cf-voice/cf_voice/capture.py
pyr0ball fed6388b99 feat: real inference pipeline — STT, tone classifier, diarization, mic capture
- cf_voice/stt.py: WhisperSTT async wrapper (faster-whisper, thread-pool executor,
  rolling 50-word session prompt for cross-chunk context continuity)
- cf_voice/classify.py: ToneClassifier — wav2vec2 SER + librosa prosody flags
  (energy, ZCR speech rate, YIN pitch contour) mapped to AFFECT_LABELS
- cf_voice/diarize.py: Diarizer async wrapper around pyannote/speaker-diarization-3.1;
  speaker_at() helper for Navigation v0.2.x wiring
- cf_voice/capture.py: MicVoiceIO — sounddevice 16kHz mono capture, 2s window
  accumulation, parallel STT+classify tasks, shift_magnitude from confidence delta
- cf_voice/io.py: make_io() now returns MicVoiceIO when CF_VOICE_MOCK is unset
- cf_voice/context.py: classify_chunk() split into mock/real paths; real path
  decodes base64 PCM and runs ToneClassifier synchronously (cf-orch endpoint)
- pyproject.toml: inference extras expanded (faster-whisper, sounddevice,
  librosa, python-dotenv)
- .env.example: HF_TOKEN, CF_VOICE_WHISPER_MODEL, CF_VOICE_DEVICE, CF_VOICE_MOCK,
  CF_VOICE_CONFIDENCE_THRESHOLD

Prior art ported from: Plex-Scripts/transcription/diarization.py (pyannote
setup), devl/ogma/backend/speech/transcription_engine.py (faster-whisper
preprocessing and session prompt pattern).
2026-04-06 17:33:51 -07:00

192 lines
7.3 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# cf_voice/capture.py — real microphone capture
#
# MIT licensed. This layer handles audio I/O and buffering only.
# Inference (STT, classify) runs here but is imported lazily so that mock
# mode works without the [inference] extras installed.
from __future__ import annotations
import asyncio
import logging
import time
from typing import AsyncIterator
import numpy as np
from cf_voice.models import VoiceFrame
from cf_voice.io import VoiceIO
# Module-level logger; follows the standard one-logger-per-module convention.
logger = logging.getLogger(__name__)
# Capture format: 16 kHz mono, matching what the STT/classifier models expect.
_SAMPLE_RATE = 16_000
_CHUNK_FRAMES = 1_600 # 100ms of audio at 16kHz
_WINDOW_CHUNKS = 20 # 20 × 100ms = 2s STT window
# Skip windows whose RMS is below this (microphone noise floor)
_SILENCE_RMS = 0.008
class MicVoiceIO(VoiceIO):
    """
    Real microphone capture producing enriched VoiceFrames.

    Pipeline:
      1. A sounddevice InputStream delivers 100 ms float32 mono chunks at
         16 kHz; the audio callback converts each to Int16 PCM and hands it
         to the event loop, which enqueues it on ``_queue``.
      2. ``stream()`` accumulates 20 chunks (a 2 s window), skips windows
         below the silence floor, then runs WhisperSTT and ToneClassifier
         concurrently (both off-thread).
      3. Results combine into a VoiceFrame whose ``shift_magnitude`` comes
         from the label/confidence delta against the previous window.

    ``speaker_id`` is always "speaker_a" until Navigation v0.2.x wires in the
    Diarizer. That integration happens in ContextClassifier, not here.

    Requires: pip install cf-voice[inference]
    """

    def __init__(
        self,
        device_index: int | None = None,
    ) -> None:
        """
        Args:
            device_index: sounddevice input device index, or None for the
                system default input device.

        Raises:
            ImportError: if the [inference] extras are not installed.
        """
        # Lazy import so that importing MicVoiceIO doesn't blow up without
        # inference deps — only instantiation requires them.
        try:
            from cf_voice.stt import WhisperSTT
            from cf_voice.classify import ToneClassifier
        except ImportError as exc:
            raise ImportError(
                "Real audio capture requires the [inference] extras. "
                "Install with: pip install cf-voice[inference]"
            ) from exc
        self._stt = WhisperSTT.from_env()
        self._classifier = ToneClassifier.from_env()
        self._device_index = device_index
        self._running = False
        # 200 × 100ms chunks ≈ 20s of backlog before chunks get dropped.
        self._queue: asyncio.Queue[bytes] = asyncio.Queue(maxsize=200)
        self._prev_label: str = ""
        self._prev_confidence: float = 0.0

    def _enqueue_pcm(self, pcm: bytes) -> None:
        """Enqueue one PCM chunk, dropping it with a warning when full.

        Runs on the event-loop thread (scheduled via call_soon_threadsafe),
        which is the only place the QueueFull from put_nowait can actually be
        caught. Wrapping call_soon_threadsafe itself in try/except — as the
        previous revision did — never fires: call_soon_threadsafe merely
        *schedules* the call, and the exception surfaces later in the loop.
        """
        try:
            self._queue.put_nowait(pcm)
        except asyncio.QueueFull:
            logger.warning("Audio queue full — dropping chunk (inference too slow?)")

    async def stream(self) -> AsyncIterator[VoiceFrame]:  # type: ignore[override]
        """Yield one VoiceFrame per non-silent 2-second audio window.

        Runs until :meth:`stop` is called.

        Raises:
            ImportError: if sounddevice is not installed.
        """
        try:
            import sounddevice as sd
        except ImportError as exc:
            raise ImportError(
                "sounddevice is required for microphone capture. "
                "Install with: pip install cf-voice[inference]"
            ) from exc
        self._running = True
        self._stt.reset_session()
        # get_running_loop() is the supported way to grab the loop from a
        # coroutine; get_event_loop() is deprecated for this use.
        loop = asyncio.get_running_loop()

        def _sd_callback(
            indata: np.ndarray, frames: int, time_info, status
        ) -> None:
            # Runs on sounddevice's audio thread: do nothing here beyond
            # converting the chunk and handing it to the event loop.
            if status:
                logger.debug("sounddevice status: %s", status)
            pcm = (indata[:, 0] * 32_767).astype(np.int16).tobytes()
            loop.call_soon_threadsafe(self._enqueue_pcm, pcm)

        input_stream = sd.InputStream(
            samplerate=_SAMPLE_RATE,
            channels=1,
            dtype="float32",
            blocksize=_CHUNK_FRAMES,
            device=self._device_index,
            callback=_sd_callback,
        )
        window: list[bytes] = []
        session_start = time.monotonic()
        with input_stream:
            while self._running:
                try:
                    chunk = await asyncio.wait_for(self._queue.get(), timeout=0.5)
                except asyncio.TimeoutError:
                    # The timeout lets us re-check self._running so stop()
                    # takes effect even when the microphone goes quiet.
                    continue
                window.append(chunk)
                if len(window) < _WINDOW_CHUNKS:
                    continue
                # 2-second window ready
                raw = b"".join(window)
                window.clear()
                audio = (
                    np.frombuffer(raw, dtype=np.int16).astype(np.float32) / 32_768.0
                )
                # Skip silent windows
                rms = float(np.sqrt(np.mean(audio ** 2)))
                if rms < _SILENCE_RMS:
                    continue
                # Run STT and tone classification in parallel
                stt_task = asyncio.create_task(
                    self._stt.transcribe_chunk_async(raw)
                )
                tone_task = asyncio.create_task(
                    self._classifier.classify_async(audio)
                )
                stt_result, tone = await asyncio.gather(stt_task, tone_task)
                # Update transcript on tone result now that STT is done
                # (re-classify with text is cheap enough to do inline)
                if stt_result.text:
                    tone = await self._classifier.classify_async(
                        audio, stt_result.text
                    )
                shift = _compute_shift(
                    self._prev_label,
                    self._prev_confidence,
                    tone.label,
                    tone.confidence,
                )
                self._prev_label = tone.label
                self._prev_confidence = tone.confidence
                yield VoiceFrame(
                    label=tone.label,
                    confidence=tone.confidence,
                    speaker_id="speaker_a",  # Navigation v0.2.x: diarizer wired here
                    shift_magnitude=round(shift, 3),
                    timestamp=round(time.monotonic() - session_start, 2),
                )

    async def stop(self) -> None:
        """Ask the stream() loop to exit; it finishes within its 0.5s poll."""
        self._running = False
# ── Helpers ───────────────────────────────────────────────────────────────────
def _compute_shift(
prev_label: str,
prev_confidence: float,
curr_label: str,
curr_confidence: float,
) -> float:
"""
Compute shift_magnitude for a VoiceFrame.
0.0 when the label hasn't changed.
Higher when the label changes with high confidence in both directions.
Capped at 1.0.
"""
if not prev_label or curr_label == prev_label:
return 0.0
# A high-confidence transition in both frames = large shift
return min(1.0, (prev_confidence + curr_confidence) / 2.0)