# cf_voice/capture.py — real microphone capture
#
# MIT licensed. This layer handles audio I/O and buffering only.
# Inference (STT, classify) runs here but is imported lazily so that mock
# mode works without the [inference] extras installed.
from __future__ import annotations

import asyncio
import logging
import time
from typing import AsyncIterator

import numpy as np

from cf_voice.models import VoiceFrame
from cf_voice.io import VoiceIO

logger = logging.getLogger(__name__)

_SAMPLE_RATE = 16_000
_CHUNK_FRAMES = 1_600  # 100ms of audio at 16kHz
_WINDOW_CHUNKS = 20  # 20 × 100ms = 2s STT window

# Skip windows whose RMS is below this (microphone noise floor)
_SILENCE_RMS = 0.008


class MicVoiceIO(VoiceIO):
    """
    Real microphone capture producing enriched VoiceFrames.

    Capture loop (sounddevice callback → asyncio.Queue):

      ┌──────────────────────────────────────────────────────────────┐
      │  sounddevice InputStream                                     │
      │  100ms PCM Int16 chunks → _queue                             │
      └──────────────────────────────────────────────────────────────┘
                        ↓ accumulated 20× (2s window)
      ┌──────────────────────────────────────────────────────────────┐
      │  Parallel async tasks (both run in thread pool)              │
      │    WhisperSTT.transcribe_chunk_async() → transcript          │
      │    ToneClassifier.classify_async()     → ToneResult          │
      └──────────────────────────────────────────────────────────────┘
                        ↓ combined
      VoiceFrame(label, confidence, speaker_id, shift_magnitude, ...)

    speaker_id is always "speaker_a" until Navigation v0.2.x wires in the
    Diarizer. That integration happens in ContextClassifier, not here.

    Requires: pip install cf-voice[inference]
    """

    def __init__(
        self,
        device_index: int | None = None,
    ) -> None:
        """
        Build the STT and tone-classifier backends from the environment.

        Args:
            device_index: Passed through to sounddevice as the input
                ``device``; ``None`` leaves the choice to sounddevice.

        Raises:
            ImportError: If the [inference] extras are not installed.
        """
        # Lazy import so that importing MicVoiceIO doesn't blow up without
        # inference deps — only instantiation requires them.
        try:
            from cf_voice.stt import WhisperSTT
            from cf_voice.classify import ToneClassifier
        except ImportError as exc:
            raise ImportError(
                "Real audio capture requires the [inference] extras. "
                "Install with: pip install cf-voice[inference]"
            ) from exc

        self._stt = WhisperSTT.from_env()
        self._classifier = ToneClassifier.from_env()
        self._device_index = device_index
        self._running = False
        # Bounded so a stalled consumer drops audio instead of growing memory.
        self._queue: asyncio.Queue[bytes] = asyncio.Queue(maxsize=200)
        # Previous frame's label/confidence, used to compute shift_magnitude.
        self._prev_label: str = ""
        self._prev_confidence: float = 0.0

    async def stream(self) -> AsyncIterator[VoiceFrame]:  # type: ignore[override]
        """
        Capture from the microphone and yield one VoiceFrame per window.

        Chunks of ``_CHUNK_FRAMES`` samples are collected until
        ``_WINDOW_CHUNKS`` of them are available. Windows whose RMS is below
        ``_SILENCE_RMS`` are skipped; every other window is transcribed and
        tone-classified, then emitted as a VoiceFrame. The loop runs until
        ``stop()`` clears the running flag.

        Raises:
            ImportError: If ``sounddevice`` is not installed.
        """
        try:
            import sounddevice as sd
        except ImportError as exc:
            raise ImportError(
                "sounddevice is required for microphone capture. "
                "Install with: pip install cf-voice[inference]"
            ) from exc

        self._running = True
        self._stt.reset_session()

        # Discard chunks left behind by a previous session so stale audio is
        # not mixed into the first window of this one.
        while not self._queue.empty():
            self._queue.get_nowait()

        # We are inside a coroutine, so a running loop is guaranteed.
        loop = asyncio.get_running_loop()

        def _enqueue(pcm: bytes) -> None:
            # Executes on the event loop thread. QueueFull has to be handled
            # here: call_soon_threadsafe() only schedules the call, so the
            # exception never reaches the audio callback that scheduled it.
            try:
                self._queue.put_nowait(pcm)
            except asyncio.QueueFull:
                logger.warning("Audio queue full — dropping chunk (inference too slow?)")

        def _sd_callback(
            indata: np.ndarray, frames: int, time_info, status
        ) -> None:
            # Invoked by sounddevice outside the event loop; only convert the
            # samples and hand them to the loop thread.
            if status:
                logger.debug("sounddevice status: %s", status)
            # Clip before scaling so out-of-range samples saturate rather
            # than overflow in the int16 cast.
            mono = np.clip(indata[:, 0], -1.0, 1.0)
            pcm = (mono * 32_767).astype(np.int16).tobytes()
            loop.call_soon_threadsafe(_enqueue, pcm)

        input_stream = sd.InputStream(
            samplerate=_SAMPLE_RATE,
            channels=1,
            dtype="float32",
            blocksize=_CHUNK_FRAMES,
            device=self._device_index,
            callback=_sd_callback,
        )

        window: list[bytes] = []
        session_start = time.monotonic()

        with input_stream:
            while self._running:
                # Time out periodically so a stop() request is noticed even
                # when no audio is arriving.
                try:
                    chunk = await asyncio.wait_for(self._queue.get(), timeout=0.5)
                except asyncio.TimeoutError:
                    continue

                window.append(chunk)
                if len(window) < _WINDOW_CHUNKS:
                    continue

                # 2-second window ready
                raw = b"".join(window)
                window.clear()

                audio = (
                    np.frombuffer(raw, dtype=np.int16).astype(np.float32)
                    / 32_768.0
                )

                # Skip silent windows
                rms = float(np.sqrt(np.mean(audio ** 2)))
                if rms < _SILENCE_RMS:
                    continue

                # Run STT and tone classification in parallel
                stt_result, tone = await asyncio.gather(
                    self._stt.transcribe_chunk_async(raw),
                    self._classifier.classify_async(audio),
                )

                # Update transcript on tone result now that STT is done
                # (re-classify with text is cheap enough to do inline)
                if stt_result.text:
                    tone = await self._classifier.classify_async(
                        audio, stt_result.text
                    )

                shift = _compute_shift(
                    self._prev_label,
                    self._prev_confidence,
                    tone.label,
                    tone.confidence,
                )
                self._prev_label = tone.label
                self._prev_confidence = tone.confidence

                yield VoiceFrame(
                    label=tone.label,
                    confidence=tone.confidence,
                    speaker_id="speaker_a",  # Navigation v0.2.x: diarizer wired here
                    shift_magnitude=round(shift, 3),
                    timestamp=round(time.monotonic() - session_start, 2),
                )

    async def stop(self) -> None:
        """Ask the capture loop in ``stream()`` to exit on its next check."""
        self._running = False


# ── Helpers ───────────────────────────────────────────────────────────────────


def _compute_shift(
    prev_label: str,
    prev_confidence: float,
    curr_label: str,
    curr_confidence: float,
) -> float:
    """
    Compute shift_magnitude for a VoiceFrame.

    Returns 0.0 when there is no previous label or the label hasn't changed.
    Otherwise returns the mean of the previous and current confidences, so a
    transition between two high-confidence frames scores highest.
    Capped at 1.0.
    """
    if not prev_label or curr_label == prev_label:
        return 0.0
    # A high-confidence transition in both frames = large shift
    return min(1.0, (prev_confidence + curr_confidence) / 2.0)