"""
cf-voice FastAPI service — managed by cf-orch.

Tone/affect classification sidecar for Linnet and any product that needs
real-time audio context annotation. Wraps ContextClassifier so it runs as an
independent managed process rather than embedded in the consumer's process.

Endpoints:
  GET  /health    → {"status": "ok", "mode": "mock"|"real", "models": {...}}
                    plus an optional "warnings" list for misconfiguration
  POST /classify  → ClassifyResponse

Usage:
  python -m cf_voice.app --port 8007 --gpu-id 0

Mock mode (no GPU, no audio hardware required):
  CF_VOICE_MOCK=1 python -m cf_voice.app --port 8007
"""
from __future__ import annotations

import argparse
import asyncio
import logging
import os
from typing import Any

import uvicorn
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

from cf_voice.context import ContextClassifier, model_status

logger = logging.getLogger(__name__)

_classifier: ContextClassifier | None = None
_mock_mode: bool = False

# Strong references to fire-and-forget tasks. The event loop only keeps weak
# references to tasks, so an unreferenced task may be garbage-collected before
# it finishes.
_background_tasks: set[asyncio.Task] = set()


# ── Request / response models ─────────────────────────────────────────────────


class ClassifyRequest(BaseModel):
    """Body of POST /classify."""

    audio_chunk: str | None = None  # base64-encoded PCM int16 mono 16kHz; None in mock mode
    timestamp: float = 0.0
    elcor: bool | None = None
    prior_frames: int | None = None
    session_id: str = ""
    language: str | None = None  # BCP-47 hint for Whisper ("en", "es", …); None = auto-detect
    num_speakers: int | None = None  # pyannote hint: None = auto; 1–8 = fixed min+max


class AudioEventOut(BaseModel):
    """Wire representation of one classifier event.

    Every field after ``subtext`` is only populated for tone events; for all
    other event types it keeps its default.
    """

    event_type: str
    label: str
    confidence: float
    timestamp: float
    speaker_id: str = "speaker_a"
    subtext: str | None = None
    affect: str | None = None
    shift_magnitude: float | None = None
    shift_direction: str | None = None
    prosody_flags: list[str] = []
    # Dimensional emotion (audeering model) — None when classifier disabled
    valence: float | None = None
    arousal: float | None = None
    dominance: float | None = None
    # Prosodic signals (openSMILE) — None when extractor disabled
    sarcasm_risk: float | None = None
    flat_f0_score: float | None = None
    # Trajectory signals — None until BASELINE_MIN frames buffered per speaker
    arousal_delta: float | None = None
    valence_delta: float | None = None
    trend: str | None = None
    # Coherence signals (SER vs VAD)
    coherence_score: float | None = None
    suppression_flag: bool | None = None
    reframe_type: str | None = None
    affect_divergence: float | None = None


class ClassifyResponse(BaseModel):
    """Response body of POST /classify."""

    events: list[AudioEventOut]


# Optional AudioEventOut fields that are copied from tone events only and
# default to None otherwise. ``prosody_flags`` is handled separately because
# its fallback is an empty list rather than None.
_TONE_ONLY_FIELDS: tuple[str, ...] = (
    "affect",
    "shift_magnitude",
    "shift_direction",
    "valence",
    "arousal",
    "dominance",
    "sarcasm_risk",
    "flat_f0_score",
    "arousal_delta",
    "valence_delta",
    "trend",
    "coherence_score",
    "suppression_flag",
    "reframe_type",
    "affect_divergence",
)


def _event_to_out(event: Any, is_tone: bool) -> AudioEventOut:
    """Convert one classifier event into its response model.

    Args:
        event: Object exposing ``event_type``, ``label``, ``confidence`` and
            ``timestamp``; every other attribute is read with ``getattr`` and
            may be absent.
        is_tone: Whether *event* is a tone event. Tone-only fields are copied
            only when this is true.

    Returns:
        The populated ``AudioEventOut``.
    """
    tone_values = {
        name: getattr(event, name, None) if is_tone else None
        for name in _TONE_ONLY_FIELDS
    }
    # A missing or None flag list becomes [] so response validation cannot
    # fail on it (same fallback style as speaker_id below).
    prosody_flags = (getattr(event, "prosody_flags", None) or []) if is_tone else []
    return AudioEventOut(
        event_type=event.event_type,
        label=event.label,
        confidence=round(event.confidence, 4),
        timestamp=event.timestamp,
        speaker_id=getattr(event, "speaker_id", "speaker_a") or "speaker_a",
        subtext=getattr(event, "subtext", None),
        prosody_flags=prosody_flags,
        **tone_values,
    )


def _on_prewarm_done(task: asyncio.Task) -> None:
    """Release the finished pre-warm task and log its failure, if any."""
    _background_tasks.discard(task)
    if task.cancelled():
        return
    exc = task.exception()
    if exc is not None:
        logger.error("cf-voice model pre-warm failed", exc_info=exc)


# ── App factory ───────────────────────────────────────────────────────────────


def create_app(gpu_id: int = 0, mock: bool = False) -> FastAPI:
    """Build the FastAPI application and initialise the module-level classifier.

    Args:
        gpu_id: GPU index exported as ``CUDA_VISIBLE_DEVICES`` (only if that
            variable is not already set) when running in real mode.
        mock: Force mock mode. Mock mode is also enabled when the environment
            variable ``CF_VOICE_MOCK`` equals ``"1"``.

    Returns:
        The configured application exposing ``/health`` and ``/classify``.
    """
    global _classifier, _mock_mode

    # Resolve the effective mode first so that env-triggered mock mode does
    # not pin a GPU either.
    _mock_mode = mock or os.environ.get("CF_VOICE_MOCK", "") == "1"

    # Signal GPU to the inference backends (wav2vec2 loads via transformers pipeline)
    if not _mock_mode:
        os.environ.setdefault("CUDA_VISIBLE_DEVICES", str(gpu_id))

    _classifier = ContextClassifier.mock() if _mock_mode else ContextClassifier.from_env()
    logger.info("cf-voice ready: mode=%s", "mock" if _mock_mode else "real")

    app = FastAPI(title="cf-voice", version="0.1.0")

    @app.on_event("startup")
    async def _startup_prewarm() -> None:
        """Pre-warm all configured models so downloads happen at startup,
        not on the first classify call (which has a hard 9-second timeout)."""
        if _classifier is None:
            return
        # Run in the background so startup is not blocked; keep a reference
        # until completion and surface failures in the log.
        task = asyncio.create_task(_classifier.prewarm())
        _background_tasks.add(task)
        task.add_done_callback(_on_prewarm_done)

    @app.get("/health")
    def health() -> dict:
        """Report service status, mode, model status and config warnings."""
        result: dict = {
            "status": "ok",
            "mode": "mock" if _mock_mode else "real",
            "models": dict(model_status),
        }
        # Surface misconfigured-but-silent diarizer so Linnet can warn the user.
        # Check env vars only — no model loading needed at health-check time.
        warnings: list[str] = []
        if os.environ.get("CF_VOICE_DIARIZE", "0") == "1":
            token = os.environ.get("HF_TOKEN", "").strip()
            if not token:
                warnings.append(
                    "Diarization is enabled (CF_VOICE_DIARIZE=1) but HF_TOKEN is not set. "
                    "Speaker identity badges will not appear. "
                    "Set HF_TOKEN in your .env and accept pyannote model terms at huggingface.co."
                )
        if warnings:
            result["warnings"] = warnings
        return result

    @app.post("/classify")
    async def classify(req: ClassifyRequest) -> ClassifyResponse:
        """Classify one audio chunk and return the resulting events.

        Raises:
            HTTPException: 503 if the classifier is not initialised; 501 if
                the classifier raises ``NotImplementedError``.
        """
        if _classifier is None:
            raise HTTPException(503, detail="classifier not initialised")
        try:
            events = await _classifier.classify_chunk_async(
                audio_b64=req.audio_chunk,
                timestamp=req.timestamp,
                prior_frames=req.prior_frames,
                elcor=req.elcor,
                session_id=req.session_id,
                language=req.language,
                num_speakers=req.num_speakers,
            )
        except NotImplementedError as exc:
            raise HTTPException(501, detail=str(exc)) from exc

        # Imported here rather than at module top, as in the original code.
        from cf_voice.events import ToneEvent

        out = [_event_to_out(e, isinstance(e, ToneEvent)) for e in events]
        return ClassifyResponse(events=out)

    return app


# ── CLI entrypoint ────────────────────────────────────────────────────────────


def _parse_args() -> argparse.Namespace:
    """Parse the command-line options of the server."""
    parser = argparse.ArgumentParser(description="cf-voice tone classification server")
    parser.add_argument("--port", type=int, default=8007)
    parser.add_argument("--host", default="0.0.0.0")
    parser.add_argument("--gpu-id", type=int, default=0)
    parser.add_argument(
        "--mock",
        action="store_true",
        help="Run in mock mode (no GPU, no audio hardware needed)",
    )
    return parser.parse_args()


def main() -> None:
    """Configure logging, build the app from CLI options and serve it."""
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s %(name)s — %(message)s",
    )
    args = _parse_args()
    app = create_app(gpu_id=args.gpu_id, mock=args.mock)
    uvicorn.run(app, host=args.host, port=args.port, log_level="info")


if __name__ == "__main__":
    main()