signal_bus module — generic SSE event publisher for real-time signal streams #58

Open
opened 2026-04-26 21:50:23 -07:00 by pyr0ball · 0 comments
Owner

Both Merlin (gesture events) and Linnet (transcription/tone events) need a real-time event
stream that local services can subscribe to. Extract into cf-core rather than duplicating.

Interface:

from circuitforge_core.signal_bus import SignalBus, SignalEvent
bus = SignalBus()
# Producer
bus.publish(SignalEvent(source='merlin', kind='gesture', payload={'name': 'open_palm', 'confidence': 0.94}))
# Consumer — SSE endpoint
@app.get('/events')
async def events(request: Request):
    return bus.subscribe(request)  # StreamingResponse

Implementation: SSE via FastAPI StreamingResponse. Bounded asyncio.Queue per subscriber
(drop oldest on overflow). publish() safe to call from sync threads via loop.call_soon_threadsafe().

BCI adaptive feedback loop use case (Phase C): The signal bus also carries the closed-loop
neurofeedback channel. Merlin publishes decoded EEG state (e.g. 'alpha_rising', 'motor_intent_left');
the training UI subscribes and renders real-time visual/audio cues that the user reacts to.
This closes the feedback loop: stimulus -> EEG response -> decoded state -> next stimulus.
The bus must carry sub-100ms round-trip latency to be useful for neurofeedback.

Products that will use this: Merlin (gestures, EEG state), Linnet (transcription events),
future products needing real-time signal streaming.

MIT licensed.

Both Merlin (gesture events) and Linnet (transcription/tone events) need a real-time event stream that local services can subscribe to. Extract into cf-core rather than duplicating. **Interface:** ```python from circuitforge_core.signal_bus import SignalBus, SignalEvent bus = SignalBus() # Producer bus.publish(SignalEvent(source='merlin', kind='gesture', payload={'name': 'open_palm', 'confidence': 0.94})) # Consumer — SSE endpoint @app.get('/events') async def events(request: Request): return bus.subscribe(request) # StreamingResponse ``` **Implementation:** SSE via FastAPI StreamingResponse. Bounded asyncio.Queue per subscriber (drop oldest on overflow). publish() safe to call from sync threads via loop.call_soon_threadsafe(). **BCI adaptive feedback loop use case (Phase C):** The signal bus also carries the closed-loop neurofeedback channel. Merlin publishes decoded EEG state (e.g. 'alpha_rising', 'motor_intent_left'); the training UI subscribes and renders real-time visual/audio cues that the user reacts to. This closes the feedback loop: stimulus -> EEG response -> decoded state -> next stimulus. The bus must carry sub-100ms round-trip latency to be useful for neurofeedback. Products that will use this: Merlin (gestures, EEG state), Linnet (transcription events), future products needing real-time signal streaming. MIT licensed.
Sign in to join this conversation.
No milestone
No project
No assignees
1 participant
Notifications
Due date
The due date is invalid or out of range. Please use the format "yyyy-mm-dd".

No due date set.

Dependencies

No dependencies set.

Reference: Circuit-Forge/circuitforge-core#58
No description provided.