linnet/app/services/session_store.py
pyr0ball 7e14f9135e feat: Notation v0.1.x scaffold — full backend + frontend + tests
FastAPI backend (port 8522):
- Session lifecycle: POST /session/start, DELETE /session/{id}/end, GET /session/{id}
- SSE stream: GET /session/{id}/stream — per-subscriber asyncio.Queue fan-out, 15s heartbeat
- History: GET /session/{id}/history with min_confidence + limit filters
- Audio: WS /session/{id}/audio — binary PCM ingestion stub (real inference in v0.2.x)
- Export: GET /session/{id}/export — downloadable JSON session log
- ContextClassifier background task per session (CF_VOICE_MOCK=1 in dev)
- ToneEvent SSE wire format per cf-core#40 (locked field names)
- Tier gate: CFG-LNNT- prefix check, 402 for paid features

Vue 3 frontend (port 8521, Vite + UnoCSS + Pinia):
- NowPanel: affect-aware border tint, subtext, prosody flags, shift indicator
- HistoryStrip: horizontal scroll, last 8 events with affect color
- ComposeBar: start/stop session, SSE connection lifecycle
- useToneStream: EventSource composable
- useAudioCapture: AudioWorklet → Int16 PCM → WebSocket (v0.1.x stub)
- audio-processor.js: 100ms chunk accumulator in AudioWorklet thread
- Respects prefers-reduced-motion globally
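
The capture path listed above (float samples, Int16 PCM, 100ms chunks) runs in JavaScript inside an AudioWorklet. The sketch below restates the same accumulate-and-convert logic in Python so it can be compared with the backend's ingestion side. The 16 kHz sample rate, the little-endian byte order, the scaling by 32767, and the names `ChunkAccumulator` and `float_to_int16` are assumptions; the source does not specify them.

```python
from __future__ import annotations

import struct

SAMPLE_RATE = 16_000               # assumed; not stated in the source
CHUNK_SAMPLES = SAMPLE_RATE // 10  # 100 ms of audio at the assumed rate


def float_to_int16(sample: float) -> int:
    """Clamp a float sample to [-1.0, 1.0] and scale it to the Int16 range."""
    clamped = max(-1.0, min(1.0, sample))
    return int(clamped * 32767)


class ChunkAccumulator:
    """Hypothetical accumulator that emits fixed-size Int16 PCM chunks."""

    def __init__(self, chunk_samples: int = CHUNK_SAMPLES) -> None:
        self._chunk_samples = chunk_samples
        self._pending: list[int] = []

    def push(self, samples: list[float]) -> list[bytes]:
        """Buffer samples and return every complete chunk as packed bytes."""
        self._pending.extend(float_to_int16(s) for s in samples)
        chunks: list[bytes] = []
        while len(self._pending) >= self._chunk_samples:
            head = self._pending[: self._chunk_samples]
            del self._pending[: self._chunk_samples]
            # '<h' = little-endian signed 16-bit integer (assumed wire order).
            chunks.append(struct.pack(f"<{len(head)}h", *head))
        return chunks
```

Leftover samples stay buffered until the next call, so chunk boundaries do not depend on how many samples arrive per callback.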

26 tests passing, manage.sh, Dockerfile, compose.yml included.
2026-04-06 18:23:52 -07:00


# app/services/session_store.py: session lifecycle and classifier management
from __future__ import annotations

import asyncio
import logging

from cf_voice.context import ContextClassifier

from app.models.session import Session
from app.models.tone_event import ToneEvent
from app.services.annotator import annotate

logger = logging.getLogger(__name__)

# Module-level singleton store, one per process
_sessions: dict[str, Session] = {}
_tasks: dict[str, asyncio.Task] = {}


def create_session(elcor: bool = False) -> Session:
    """Create a new session and start its ContextClassifier background task."""
    session = Session(elcor=elcor)
    _sessions[session.session_id] = session
    task = asyncio.create_task(
        _run_classifier(session),
        name=f"classifier-{session.session_id}",
    )
    _tasks[session.session_id] = task
    session.state = "running"
    logger.info("Session %s started", session.session_id)
    return session


def get_session(session_id: str) -> Session | None:
    return _sessions.get(session_id)


def end_session(session_id: str) -> bool:
    """Stop and remove a session. Returns True if it existed."""
    session = _sessions.pop(session_id, None)
    if session is None:
        return False
    session.state = "stopped"
    task = _tasks.pop(session_id, None)
    if task and not task.done():
        task.cancel()
    logger.info("Session %s ended", session_id)
    return True


async def _run_classifier(session: Session) -> None:
    """
    Background task: stream VoiceFrames from cf-voice and broadcast ToneEvents.

    Starts the ContextClassifier (mock or real depending on CF_VOICE_MOCK),
    converts each frame via annotator.annotate(), and fans out to subscribers.
    """
    classifier = ContextClassifier.from_env()
    try:
        async for frame in classifier.stream():
            if session.state == "stopped":
                break
            event = annotate(frame, session_id=session.session_id, elcor=session.elcor)
            if event is not None:
                session.broadcast(event)
    except asyncio.CancelledError:
        pass
    finally:
        await classifier.stop()
        session.state = "stopped"
        logger.info("Classifier stopped for session %s", session.session_id)
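
The cancel-then-cleanup flow used by end_session and _run_classifier can be exercised without cf-voice installed. The sketch below is self-contained and uses a hypothetical `FakeClassifier` in place of ContextClassifier; `run` and `demo` are illustrative names as well, and only the `stream()` / `stop()` shape is taken from the module above.

```python
from __future__ import annotations

import asyncio


class FakeClassifier:
    """Hypothetical stand-in for ContextClassifier; emits integers forever."""

    def __init__(self) -> None:
        self.stopped = False

    async def stream(self):
        n = 0
        while True:
            await asyncio.sleep(0)  # cancellation is delivered at this await
            yield n
            n += 1

    async def stop(self) -> None:
        self.stopped = True


async def run(classifier: FakeClassifier, received: list[int]) -> None:
    """Same try/except/finally shape as _run_classifier, minus annotation."""
    try:
        async for frame in classifier.stream():
            received.append(frame)
    except asyncio.CancelledError:
        pass  # swallow cancellation so cleanup below runs as a normal exit
    finally:
        await classifier.stop()


async def demo() -> tuple[bool, bool]:
    classifier = FakeClassifier()
    received: list[int] = []
    task = asyncio.create_task(run(classifier, received))
    await asyncio.sleep(0.01)  # let the task produce some frames
    task.cancel()
    # Wait for the task so cleanup has finished before we inspect state.
    await asyncio.gather(task, return_exceptions=True)
    return classifier.stopped, len(received) > 0
```

Note that end_session only calls task.cancel() and returns without awaiting the task, so in the module above classifier.stop() runs on a later event-loop iteration. A caller that needs cleanup to be complete before proceeding would have to await the task, as `demo` does here.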