# app/api/events_store.py — Chain-scoped pub/sub for SSE node-status events
#
# Each SSE connection subscribes with subscribe(chain_id) → Queue.
# Background generation tasks call broadcast(chain_id, event) to push
# node state transitions (pending → generating → ready/error) to all listeners.

from __future__ import annotations

import asyncio
from collections import defaultdict
from typing import Any

# chain_id → list of subscriber queues.
# Entries are created on first subscribe() and dropped by unsubscribe() once
# the last queue for that chain has been removed.
_listeners: dict[str, list[asyncio.Queue[Any]]] = defaultdict(list)


def subscribe(chain_id: str) -> asyncio.Queue[Any]:
    """Register a new SSE listener for chain_id. Returns its Queue.

    The queue is created without a maxsize, and every later
    broadcast(chain_id, ...) puts its event on it until the queue is
    passed to unsubscribe().
    """
    q: asyncio.Queue[Any] = asyncio.Queue()
    _listeners[chain_id].append(q)
    return q


def unsubscribe(chain_id: str, q: asyncio.Queue[Any]) -> None:
    """Remove a listener Queue. Safe to call if already removed.

    Unknown chain ids and queues that are not registered are ignored.
    When the last queue of a chain is removed, the chain's entry is
    deleted so the registry does not accumulate empty lists.
    """
    # Use .get() rather than indexing: indexing a defaultdict would insert
    # an empty list for a chain id that was never subscribed.
    queues = _listeners.get(chain_id)
    if queues is None:
        return

    try:
        queues.remove(q)
    except ValueError:
        # Already removed (or never registered under this chain).
        pass

    if not queues:
        del _listeners[chain_id]


async def broadcast(chain_id: str, event: dict[str, Any]) -> None:
    """Push event dict to all active listeners for chain_id.

    Does nothing when the chain has no listeners. The same event object
    is put on every queue; it is not copied per listener.
    """
    # Iterate over a copy so changes to the registered list do not affect
    # this loop. .get() avoids creating an entry for chains nobody listens to.
    for q in list(_listeners.get(chain_id, [])):
        await q.put(event)