cf-voice/cf_voice/app.py
pyr0ball 24f04b67db feat: full voice pipeline — AST acoustic, accent, privacy, prosody, dimensional, trajectory, telephony, FastAPI app
New modules shipped (from Linnet integration):
- acoustic.py: AST (MIT/ast-finetuned-audioset-10-10-0.4593) replaces YAMNet stub;
  527 AudioSet classes mapped to queue/speaker/environ/scene labels; _LABEL_MAP
  includes hold_music, ringback, DTMF, background_shift, AMD signal chain
- accent.py: facebook/mms-lid-126 language ID → regional accent labels
  (en_gb, en_us, en_au, fr, es, de, zh, …); lazy-loaded, gated by CF_VOICE_ACCENT
- privacy.py: compound privacy risk scorer — public_env, background_voices,
  nature scene, accent signals; returns 0–3 score without storing any audio
- prosody.py: openSMILE-backed prosody extractor (sarcasm_risk, flat_f0_score,
  speech_rate, pitch_range); mock mode returns neutral values
- dimensional.py: audeering/wav2vec2-large-robust-12-ft-emotion-msp-dim
  valence/arousal/dominance scorer; gated by CF_VOICE_DIMENSIONAL
- trajectory.py: rolling buffer for arousal/valence deltas, trend detection
  (escalating/suppressed/stable), coherence scoring, suppression/reframe flags
- telephony.py: TelephonyBackend Protocol + MockTelephonyBackend + SignalWireBackend
  + FreeSWITCHBackend; CallSession dataclass; make_telephony() factory
- app.py: FastAPI service (port 8007) — /health + /classify; accepts base64 PCM
  chunks, returns full AudioEventOut including dimensional/prosody/accent fields
- prefs.py: voice preference helpers (elcor_mode, confidence_threshold,
  whisper_model, elcor_prior_frames); cf-core and env-var fallback

Tests: fix stale tests (YAMNetAcousticBackend → ASTAcousticBackend, scene field
added to AcousticResult, speaker_at gap now resolves dominant speaker not UNKNOWN,
make_io real path returns MicVoiceIO when sounddevice installed). 78 tests passing.

Closes #2, #3.
2026-04-18 22:36:58 -07:00

197 lines
8.1 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
cf-voice FastAPI service — managed by cf-orch.
Tone/affect classification sidecar for Linnet and any product that needs
real-time audio context annotation. Wraps ContextClassifier so it runs as an
independent managed process rather than embedded in the consumer's process.
Endpoints:
GET /health → {"status": "ok", "mode": "mock"|"real"}
POST /classify → ClassifyResponse
Usage:
python -m cf_voice.app --port 8007 --gpu-id 0
Mock mode (no GPU, no audio hardware required):
CF_VOICE_MOCK=1 python -m cf_voice.app --port 8007
"""
from __future__ import annotations
import argparse
import logging
import os
import uvicorn
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from cf_voice.context import ContextClassifier, model_status
logger = logging.getLogger(__name__)

# Module-level state shared between create_app() and its route closures.
# Both are populated exactly once, inside create_app().
_classifier: ContextClassifier | None = None
_mock_mode: bool = False
# ── Request / response models ─────────────────────────────────────────────────
class ClassifyRequest(BaseModel):
    """Request body for POST /classify: one audio chunk plus per-call hints."""

    # base64-encoded PCM int16 mono 16kHz; None in mock mode
    audio_chunk: str | None = None
    # Capture timestamp of the chunk; forwarded to the classifier and echoed
    # on events by the backend.
    timestamp: float = 0.0
    # elcor-mode toggle forwarded to the classifier; None = use server default.
    elcor: bool | None = None
    # Number of prior context frames for the classifier; None = backend default.
    prior_frames: int | None = None
    # Opaque caller session key, forwarded to the classifier as-is.
    session_id: str = ""
    # BCP-47 hint for Whisper ("en", "es", ...); None = auto-detect
    language: str | None = None
    # pyannote hint: None = auto; an int pins the speaker count.
    # NOTE(review): original comment read "18 = fixed min+max" — likely a
    # garbled "N = fixed min+max"; confirm against the diarizer's contract.
    num_speakers: int | None = None
class AudioEventOut(BaseModel):
    """One classified audio event as serialized back to the caller.

    The optional analysis fields below are populated only for tone events
    (see the /classify handler's `is_tone` gating); other event types leave
    them as None.
    """

    event_type: str
    label: str
    confidence: float
    timestamp: float
    # Diarized speaker identity; falls back to "speaker_a" when absent/empty.
    speaker_id: str = "speaker_a"
    subtext: str | None = None
    affect: str | None = None
    shift_magnitude: float | None = None
    shift_direction: str | None = None
    # pydantic deep-copies mutable field defaults per instance, so a shared
    # [] default is safe here (unlike a plain function default).
    prosody_flags: list[str] = []
    # Dimensional emotion (audeering model) — None when classifier disabled
    valence: float | None = None
    arousal: float | None = None
    dominance: float | None = None
    # Prosodic signals (openSMILE) — None when extractor disabled
    sarcasm_risk: float | None = None
    flat_f0_score: float | None = None
    # Trajectory signals — None until BASELINE_MIN frames buffered per speaker
    arousal_delta: float | None = None
    valence_delta: float | None = None
    trend: str | None = None
    # Coherence signals (SER vs VAD)
    coherence_score: float | None = None
    suppression_flag: bool | None = None
    reframe_type: str | None = None
    affect_divergence: float | None = None
class ClassifyResponse(BaseModel):
    """Response body for POST /classify: all events emitted for the chunk."""

    events: list[AudioEventOut]
# ── App factory ───────────────────────────────────────────────────────────────
def create_app(gpu_id: int = 0, mock: bool = False) -> FastAPI:
    """Build and return the cf-voice FastAPI application.

    Initialises the module-level ContextClassifier (mock or real) and wires
    up the /health and /classify endpoints.

    Args:
        gpu_id: CUDA device index exposed to the inference backends. Only
            applied in real mode, and only when the caller/orchestrator has
            not already set CUDA_VISIBLE_DEVICES (setdefault).
        mock: Force mock mode; CF_VOICE_MOCK=1 in the environment enables it
            as well.

    Returns:
        A configured FastAPI app.
    """
    global _classifier, _mock_mode
    # Signal GPU to the inference backends (wav2vec2 loads via transformers
    # pipeline); setdefault preserves an explicit caller-provided mapping.
    if not mock:
        os.environ.setdefault("CUDA_VISIBLE_DEVICES", str(gpu_id))
    _mock_mode = mock or os.environ.get("CF_VOICE_MOCK", "") == "1"
    _classifier = ContextClassifier.mock() if _mock_mode else ContextClassifier.from_env()
    logger.info("cf-voice ready: mode=%s", "mock" if _mock_mode else "real")
    app = FastAPI(title="cf-voice", version="0.1.0")

    @app.on_event("startup")
    async def _startup_prewarm() -> None:
        """Pre-warm all configured models so downloads happen at startup, not
        on the first classify call (which has a hard 9-second timeout)."""
        if _classifier is not None:
            import asyncio as _asyncio
            # FIX: anchor the task on app.state. The event loop holds only
            # weak references to tasks, so a discarded create_task() result
            # can be garbage-collected before prewarm() finishes.
            app.state.prewarm_task = _asyncio.create_task(_classifier.prewarm())

    @app.get("/health")
    def health() -> dict:
        """Liveness probe plus model/config status for the orchestrator."""
        result: dict = {
            "status": "ok",
            "mode": "mock" if _mock_mode else "real",
            "models": dict(model_status),
        }
        # Surface misconfigured-but-silent diarizer so Linnet can warn the user.
        # Check env vars only — no model loading needed at health-check time.
        warnings: list[str] = []
        if os.environ.get("CF_VOICE_DIARIZE", "0") == "1":
            token = os.environ.get("HF_TOKEN", "").strip()
            if not token:
                warnings.append(
                    "Diarization is enabled (CF_VOICE_DIARIZE=1) but HF_TOKEN is not set. "
                    "Speaker identity badges will not appear. "
                    "Set HF_TOKEN in your .env and accept pyannote model terms at huggingface.co."
                )
        if warnings:
            result["warnings"] = warnings
        return result

    @app.post("/classify")
    async def classify(req: ClassifyRequest) -> ClassifyResponse:
        """Classify one audio chunk and return every emitted event.

        Raises:
            HTTPException(503): server up but classifier not initialised.
            HTTPException(501): the configured backend does not implement
                the requested capability.
        """
        if _classifier is None:
            raise HTTPException(503, detail="classifier not initialised")
        try:
            events = await _classifier.classify_chunk_async(
                audio_b64=req.audio_chunk,
                timestamp=req.timestamp,
                prior_frames=req.prior_frames,
                elcor=req.elcor,
                session_id=req.session_id,
                language=req.language,
                num_speakers=req.num_speakers,
            )
        except NotImplementedError as exc:
            # FIX: chain the cause so server tracebacks retain the backend's
            # original error instead of "during handling ... another occurred".
            raise HTTPException(501, detail=str(exc)) from exc
        # Imported here rather than at module top — presumably to dodge an
        # import cycle through cf_voice.events; confirm before hoisting.
        from cf_voice.events import ToneEvent
        out: list[AudioEventOut] = []
        for e in events:
            # Tone-only fields are surfaced exclusively for ToneEvents so
            # other event types never leak stale attribute values.
            is_tone = isinstance(e, ToneEvent)
            out.append(AudioEventOut(
                event_type=e.event_type,
                label=e.label,
                confidence=round(e.confidence, 4),
                timestamp=e.timestamp,
                speaker_id=getattr(e, "speaker_id", "speaker_a") or "speaker_a",
                subtext=getattr(e, "subtext", None),
                affect=getattr(e, "affect", None) if is_tone else None,
                shift_magnitude=getattr(e, "shift_magnitude", None) if is_tone else None,
                shift_direction=getattr(e, "shift_direction", None) if is_tone else None,
                prosody_flags=getattr(e, "prosody_flags", []) if is_tone else [],
                valence=getattr(e, "valence", None) if is_tone else None,
                arousal=getattr(e, "arousal", None) if is_tone else None,
                dominance=getattr(e, "dominance", None) if is_tone else None,
                sarcasm_risk=getattr(e, "sarcasm_risk", None) if is_tone else None,
                flat_f0_score=getattr(e, "flat_f0_score", None) if is_tone else None,
                arousal_delta=getattr(e, "arousal_delta", None) if is_tone else None,
                valence_delta=getattr(e, "valence_delta", None) if is_tone else None,
                trend=getattr(e, "trend", None) if is_tone else None,
                coherence_score=getattr(e, "coherence_score", None) if is_tone else None,
                suppression_flag=getattr(e, "suppression_flag", None) if is_tone else None,
                reframe_type=getattr(e, "reframe_type", None) if is_tone else None,
                affect_divergence=getattr(e, "affect_divergence", None) if is_tone else None,
            ))
        return ClassifyResponse(events=out)

    return app
# ── CLI entrypoint ────────────────────────────────────────────────────────────
def _parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="cf-voice tone classification server")
parser.add_argument("--port", type=int, default=8007)
parser.add_argument("--host", default="0.0.0.0")
parser.add_argument("--gpu-id", type=int, default=0)
parser.add_argument("--mock", action="store_true",
help="Run in mock mode (no GPU, no audio hardware needed)")
return parser.parse_args()
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO,
format="%(asctime)s %(levelname)s %(name)s%(message)s")
args = _parse_args()
app = create_app(gpu_id=args.gpu_id, mock=args.mock)
uvicorn.run(app, host=args.host, port=args.port, log_level="info")