- cf_voice/stt.py: WhisperSTT async wrapper (faster-whisper, thread-pool executor, rolling 50-word session prompt for cross-chunk context continuity)
- cf_voice/classify.py: ToneClassifier — wav2vec2 SER + librosa prosody flags (energy, ZCR speech rate, YIN pitch contour) mapped to AFFECT_LABELS
- cf_voice/diarize.py: Diarizer async wrapper around pyannote/speaker-diarization-3.1; speaker_at() helper for Navigation v0.2.x wiring
- cf_voice/capture.py: MicVoiceIO — sounddevice 16kHz mono capture, 2s window accumulation, parallel STT+classify tasks, shift_magnitude from confidence delta
- cf_voice/io.py: make_io() now returns MicVoiceIO when CF_VOICE_MOCK is unset
- cf_voice/context.py: classify_chunk() split into mock/real paths; real path decodes base64 PCM and runs ToneClassifier synchronously (cf-orch endpoint)
- pyproject.toml: inference extras expanded (faster-whisper, sounddevice, librosa, python-dotenv)
- .env.example: HF_TOKEN, CF_VOICE_WHISPER_MODEL, CF_VOICE_DEVICE, CF_VOICE_MOCK, CF_VOICE_CONFIDENCE_THRESHOLD

Prior art ported from: Plex-Scripts/transcription/diarization.py (pyannote setup), devl/ogma/backend/speech/transcription_engine.py (faster-whisper preprocessing and session prompt pattern).
192 lines
7.3 KiB
Python
192 lines
7.3 KiB
Python
# cf_voice/capture.py — real microphone capture
|
||
#
|
||
# MIT licensed. This layer handles audio I/O and buffering only.
|
||
# Inference (STT, classify) runs here but is imported lazily so that mock
|
||
# mode works without the [inference] extras installed.
|
||
from __future__ import annotations
|
||
|
||
import asyncio
|
||
import logging
|
||
import time
|
||
from typing import AsyncIterator
|
||
|
||
import numpy as np
|
||
|
||
from cf_voice.models import VoiceFrame
|
||
from cf_voice.io import VoiceIO
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
_SAMPLE_RATE = 16_000
|
||
_CHUNK_FRAMES = 1_600 # 100ms of audio at 16kHz
|
||
_WINDOW_CHUNKS = 20 # 20 × 100ms = 2s STT window
|
||
|
||
# Skip windows whose RMS is below this (microphone noise floor)
|
||
_SILENCE_RMS = 0.008
|
||
|
||
|
||
class MicVoiceIO(VoiceIO):
    """
    Real microphone capture producing enriched VoiceFrames.

    Capture loop (sounddevice callback → asyncio.Queue):
    ┌──────────────────────────────────────────────────────────────┐
    │ sounddevice InputStream                                      │
    │   100ms PCM Int16 chunks → _queue                            │
    └──────────────────────────────────────────────────────────────┘
                  ↓ accumulated 20× (2s window)
    ┌──────────────────────────────────────────────────────────────┐
    │ Parallel async tasks (both run in thread pool)               │
    │   WhisperSTT.transcribe_chunk_async()  → transcript          │
    │   ToneClassifier.classify_async()      → ToneResult          │
    └──────────────────────────────────────────────────────────────┘
                  ↓ combined
    VoiceFrame(label, confidence, speaker_id, shift_magnitude, ...)

    speaker_id is always "speaker_a" until Navigation v0.2.x wires in the
    Diarizer. That integration happens in ContextClassifier, not here.

    Requires: pip install cf-voice[inference]
    """

    def __init__(
        self,
        device_index: int | None = None,
    ) -> None:
        """Build STT + tone classifier from env and prepare capture state.

        :param device_index: sounddevice input device index, or None for
            the system default input.
        :raises ImportError: when the [inference] extras are not installed.
        """
        # Lazy import so that importing MicVoiceIO doesn't blow up without
        # inference deps — only instantiation requires them.
        try:
            from cf_voice.stt import WhisperSTT
            from cf_voice.classify import ToneClassifier
        except ImportError as exc:
            raise ImportError(
                "Real audio capture requires the [inference] extras. "
                "Install with: pip install cf-voice[inference]"
            ) from exc

        self._stt = WhisperSTT.from_env()
        self._classifier = ToneClassifier.from_env()
        self._device_index = device_index
        self._running = False
        # Bounded so a stalled consumer drops audio instead of growing RAM;
        # 200 × 100ms chunks ≈ 20s of backlog.
        self._queue: asyncio.Queue[bytes] = asyncio.Queue(maxsize=200)
        # Previous window's tone result, used to derive shift_magnitude.
        self._prev_label: str = ""
        self._prev_confidence: float = 0.0

    async def stream(self) -> AsyncIterator[VoiceFrame]:  # type: ignore[override]
        """Yield one VoiceFrame per non-silent 2s window until stop() is called.

        :raises ImportError: when sounddevice is not installed.
        """
        try:
            import sounddevice as sd
        except ImportError as exc:
            raise ImportError(
                "sounddevice is required for microphone capture. "
                "Install with: pip install cf-voice[inference]"
            ) from exc

        self._running = True
        self._stt.reset_session()

        # BUGFIX: asyncio.get_event_loop() is deprecated inside a coroutine
        # (and may create a new loop on some versions); we are guaranteed to
        # be running on a loop here, so use get_running_loop().
        loop = asyncio.get_running_loop()

        def _enqueue(pcm: bytes) -> None:
            # Executes on the event-loop thread via call_soon_threadsafe —
            # the only place QueueFull can actually be observed. (The old
            # code caught QueueFull around call_soon_threadsafe in the audio
            # thread, which never fires: put_nowait runs later on the loop.)
            try:
                self._queue.put_nowait(pcm)
            except asyncio.QueueFull:
                logger.warning("Audio queue full — dropping chunk (inference too slow?)")

        def _sd_callback(
            indata: np.ndarray, frames: int, time_info, status
        ) -> None:
            # Runs on sounddevice's audio thread: never touch asyncio
            # primitives directly here, only hand off via the loop.
            if status:
                logger.debug("sounddevice status: %s", status)
            # float32 [-1, 1] mono → 16-bit little-endian PCM bytes.
            pcm = (indata[:, 0] * 32_767).astype(np.int16).tobytes()
            loop.call_soon_threadsafe(_enqueue, pcm)

        input_stream = sd.InputStream(
            samplerate=_SAMPLE_RATE,
            channels=1,
            dtype="float32",
            blocksize=_CHUNK_FRAMES,
            device=self._device_index,
            callback=_sd_callback,
        )

        window: list[bytes] = []
        session_start = time.monotonic()

        with input_stream:
            while self._running:
                try:
                    chunk = await asyncio.wait_for(self._queue.get(), timeout=0.5)
                except asyncio.TimeoutError:
                    # Periodic wake-up so stop() takes effect even in silence.
                    continue

                window.append(chunk)

                if len(window) < _WINDOW_CHUNKS:
                    continue

                # 2-second window ready
                raw = b"".join(window)
                window.clear()

                audio = (
                    np.frombuffer(raw, dtype=np.int16).astype(np.float32) / 32_768.0
                )

                # Skip windows below the microphone noise floor.
                rms = float(np.sqrt(np.mean(audio ** 2)))
                if rms < _SILENCE_RMS:
                    continue

                # Run STT and tone classification in parallel; gather wraps
                # both coroutines in tasks, so they overlap in the pool.
                stt_result, tone = await asyncio.gather(
                    self._stt.transcribe_chunk_async(raw),
                    self._classifier.classify_async(audio),
                )

                # Update transcript on tone result now that STT is done
                # (re-classify with text is cheap enough to do inline)
                if stt_result.text:
                    tone = await self._classifier.classify_async(
                        audio, stt_result.text
                    )

                shift = _compute_shift(
                    self._prev_label,
                    self._prev_confidence,
                    tone.label,
                    tone.confidence,
                )
                self._prev_label = tone.label
                self._prev_confidence = tone.confidence

                yield VoiceFrame(
                    label=tone.label,
                    confidence=tone.confidence,
                    speaker_id="speaker_a",  # Navigation v0.2.x: diarizer wired here
                    shift_magnitude=round(shift, 3),
                    timestamp=round(time.monotonic() - session_start, 2),
                )

    async def stop(self) -> None:
        """Ask the stream() loop to exit; takes effect within ~0.5s."""
        self._running = False
|
||
|
||
|
||
# ── Helpers ───────────────────────────────────────────────────────────────────
|
||
|
||
def _compute_shift(
|
||
prev_label: str,
|
||
prev_confidence: float,
|
||
curr_label: str,
|
||
curr_confidence: float,
|
||
) -> float:
|
||
"""
|
||
Compute shift_magnitude for a VoiceFrame.
|
||
|
||
0.0 when the label hasn't changed.
|
||
Higher when the label changes with high confidence in both directions.
|
||
Capped at 1.0.
|
||
"""
|
||
if not prev_label or curr_label == prev_label:
|
||
return 0.0
|
||
# A high-confidence transition in both frames = large shift
|
||
return min(1.0, (prev_confidence + curr_confidence) / 2.0)
|