sparrow/app/api/events_store.py
pyr0ball a6f60c9e07 feat: implement Sparrow backend (v0.1.0)
Full FastAPI backend for the AI music continuation editor:

Services
- chain.py: chain + node CRUD, commit/discard, recursive CTE spine query
- musicgen.py: MusicGenClient with cf-orch allocation + mock mode (CF_MUSICGEN_MOCK=1)
- stems.py: Demucs 4-stem separation subprocess wrapper + mock mode
- export.py: ffmpeg concat demuxer to stitch committed spine into WAV/MP3

API endpoints
- chains: CRUD, multipart audio upload (WAV/MP3/FLAC/OGG/M4A/AIFF)
- nodes: branch creation (202 + BackgroundTasks), commit, discard, audio stream
- gpu: cf-orch capacity status; session allocation stubbed pending cf-orch#43
- stems: Paid-tier stem separation (Demucs, gated via tiers.py)
- export: POST /{chain_id}/export → FileResponse download
- events: SSE stream (node-status events) per chain via asyncio Queue pub/sub

Infrastructure
- lifespan: reads SPARROW_DB_PATH/DATA_DIR at startup (not import time)
- events_store: subscribe/unsubscribe/broadcast pattern for SSE
- CORS: open in dev, SPARROW_CORS_ORIGINS in production
- Background generation opens its own DB connection (WAL-safe)

Tests: 30/30 passing across service units and API integration
2026-04-17 15:22:37 -07:00


# app/api/events_store.py — Chain-scoped pub/sub for SSE node-status events
#
# Each SSE connection subscribes with subscribe(chain_id) → Queue.
# Background generation tasks call broadcast(chain_id, event) to push
# node state transitions (pending → generating → ready/error) to all listeners.
from __future__ import annotations

import asyncio
from collections import defaultdict
from typing import Any

# chain_id → list of subscriber queues
_listeners: dict[str, list[asyncio.Queue[Any]]] = defaultdict(list)


def subscribe(chain_id: str) -> asyncio.Queue[Any]:
    """Register a new SSE listener for chain_id. Returns its Queue."""
    q: asyncio.Queue[Any] = asyncio.Queue()
    _listeners[chain_id].append(q)
    return q


def unsubscribe(chain_id: str, q: asyncio.Queue[Any]) -> None:
    """Remove a listener Queue. Safe to call if already removed."""
    try:
        _listeners[chain_id].remove(q)
    except ValueError:
        pass


async def broadcast(chain_id: str, event: dict[str, Any]) -> None:
    """Push event dict to all active listeners for chain_id."""
    for q in list(_listeners.get(chain_id, [])):
        await q.put(event)