cf-voice/cf_voice/context.py
pyr0ball 24f04b67db feat: full voice pipeline — AST acoustic, accent, privacy, prosody, dimensional, trajectory, telephony, FastAPI app
New modules shipped (from Linnet integration):
- acoustic.py: AST (MIT/ast-finetuned-audioset-10-10-0.4593) replaces YAMNet stub;
  527 AudioSet classes mapped to queue/speaker/environ/scene labels; _LABEL_MAP
  includes hold_music, ringback, DTMF, background_shift, AMD signal chain
- accent.py: facebook/mms-lid-126 language ID → regional accent labels
  (en_gb, en_us, en_au, fr, es, de, zh, …); lazy-loaded, gated by CF_VOICE_ACCENT
- privacy.py: compound privacy risk scorer — public_env, background_voices,
  nature scene, accent signals; returns 0–3 score without storing any audio
- prosody.py: openSMILE-backed prosody extractor (sarcasm_risk, flat_f0_score,
  speech_rate, pitch_range); mock mode returns neutral values
- dimensional.py: audeering/wav2vec2-large-robust-12-ft-emotion-msp-dim
  valence/arousal/dominance scorer; gated by CF_VOICE_DIMENSIONAL
- trajectory.py: rolling buffer for arousal/valence deltas, trend detection
  (escalating/suppressed/stable), coherence scoring, suppression/reframe flags
- telephony.py: TelephonyBackend Protocol + MockTelephonyBackend + SignalWireBackend
  + FreeSWITCHBackend; CallSession dataclass; make_telephony() factory
- app.py: FastAPI service (port 8007) — /health + /classify; accepts base64 PCM
  chunks, returns full AudioEventOut including dimensional/prosody/accent fields
- prefs.py: voice preference helpers (elcor_mode, confidence_threshold,
  whisper_model, elcor_prior_frames); cf-core and env-var fallback

Tests: fix stale tests (YAMNetAcousticBackend → ASTAcousticBackend, scene field
added to AcousticResult, speaker_at gap now resolves dominant speaker not UNKNOWN,
make_io real path returns MicVoiceIO when sounddevice installed). 78 tests passing.

Closes #2, #3.
2026-04-18 22:36:58 -07:00

744 lines
31 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# cf_voice/context.py — parallel audio context classifier (orchestrator)
#
# BSL 1.1 when real inference models are integrated.
# Mock mode: MIT licensed (no real inference).
#
# Runs three classifiers in parallel against the same audio window:
# 1. Tone/affect (classify.py) — wav2vec2 SER + librosa prosody
# 2. Queue/environ (acoustic.py) — AST acoustic event detection (replaced the YAMNet stub)
# 3. Speaker type/VAD (diarize.py) — pyannote.audio (Navigation v0.2.x)
#
# Combined output is a list[AudioEvent] per window, merged into VoiceFrame
# for the streaming path.
#
# Elcor mode reads from cf-core preferences (cf_voice.prefs) so that the
# annotation format is user-configurable without per-request flags.
from __future__ import annotations
import asyncio
import logging
import os
from typing import AsyncIterator
from cf_voice.acoustic import MockAcousticBackend, make_acoustic
from cf_voice.events import AudioEvent, ToneEvent, tone_event_from_voice_frame
from cf_voice.io import MockVoiceIO, VoiceIO, make_io
from cf_voice.models import VoiceFrame
from cf_voice.prefs import get_elcor_prior_frames, is_elcor_enabled
logger = logging.getLogger(__name__)
# ── Per-model download/load status registry ───────────────────────────────────
# Written by the _load_* methods on ContextClassifier; read by the /health
# endpoint in app.py.
# Values: "disabled" | "loading" | "ready" | "error"
# Thread-safe: individual str assignment is atomic in CPython.
model_status: dict[str, str] = {}
# ── No-op coroutines for disabled/unavailable classifiers ─────────────────────
async def _noop_stt() -> None:
"""Placeholder when STT is disabled or unavailable."""
return None
async def _noop_diarize() -> list:
"""Placeholder when diarization is disabled or unavailable."""
return []
# ─────────────────────────────────────────────────────────────────────────────
class ContextClassifier:
    """
    High-level voice context classifier (orchestrator).

    Wraps a VoiceIO source and runs several parallel classifiers on each
    audio window: tone (SER), queue/environ/scene (acoustic backend built by
    make_acoustic), and speaker (pyannote diarization), plus optional STT,
    dimensional (valence/arousal/dominance), prosodic, and accent
    classifiers — each lazily loaded and env-var gated.

    In mock mode all classifiers produce synthetic events — no GPU,
    microphone, or HuggingFace token required.

    Usage
    -----
        classifier = ContextClassifier.from_env()
        async for frame in classifier.stream():
            print(frame.label, frame.confidence)

    For the full multi-class event list (queue + speaker + tone):

        events = classifier.classify_chunk(audio_b64, timestamp=4.5)
    """
    def __init__(
        self,
        io: VoiceIO,
        user_id: str | None = None,
        store=None,
    ) -> None:
        """
        Parameters
        ----------
        io       Audio source; MockVoiceIO switches the whole pipeline to
                 synthetic output.
        user_id  Forwarded to cf-core preference lookups (Elcor mode,
                 prior-frame window).
        store    cf-core preference store; presumably None falls back to
                 env vars inside cf_voice.prefs — confirm against prefs.py.
        """
        self._io = io
        self._user_id = user_id
        self._store = store
        # Acoustic backend is mock whenever the IO is mock or CF_VOICE_MOCK=1.
        self._acoustic = make_acoustic(
            mock=isinstance(io, MockVoiceIO)
            or os.environ.get("CF_VOICE_MOCK", "") == "1"
        )
        # Lazy — loaded on first real classify call, then reused.
        self._tone: "ToneClassifier | None" = None
        # STT: loaded if faster-whisper is installed. Controlled by CF_VOICE_STT (default: 1).
        self._stt: "WhisperSTT | None" = None
        self._stt_loaded: bool = False  # False = not yet attempted
        # Diarizer: optional — requires HF_TOKEN and CF_VOICE_DIARIZE=1.
        self._diarizer: "Diarizer | None" = None
        self._diarizer_loaded: bool = False
        # Per-session speaker label tracker — maps pyannote IDs → "Speaker A/B/..."
        # Reset at session end (when the ContextClassifier is stopped).
        from cf_voice.diarize import SpeakerTracker
        self._speaker_tracker: SpeakerTracker = SpeakerTracker()
        # One-at-a-time GPU classify gate. All three models share the same GPU;
        # running them "in parallel" just serializes at the CUDA level while
        # filling the thread pool. Drop incoming frames when a classify is
        # already in flight — freshness beats completeness for real-time audio.
        self._classify_lock: asyncio.Lock = asyncio.Lock()
        # Dimensional classifier (audeering) — lazy, CF_VOICE_DIMENSIONAL=1
        self._dimensional: "DimensionalClassifier | None" = None
        self._dimensional_loaded: bool = False
        # Prosodic extractor (openSMILE) — lazy, CF_VOICE_PROSODY=1
        self._prosodic: "ProsodicExtractor | None" = None
        self._prosodic_loaded: bool = False
        # Per-speaker rolling dimensional buffers for trajectory/coherence signals.
        # Keys are speaker_id strings; values are deques of DimensionalResult.
        # Reset at session end alongside SpeakerTracker.
        from collections import deque as _deque
        from cf_voice.trajectory import BUFFER_WINDOW
        self._dim_buffer: dict[str, "_deque"] = {}
        self._last_ser_affect: dict[str, str] = {}
        self._buffer_window = BUFFER_WINDOW
        # Accent classifier — lazy, gated by CF_VOICE_ACCENT=1
        self._accent: "MockAccentClassifier | AccentClassifier | None" = None
        self._accent_loaded: bool = False
@classmethod
def from_env(
cls,
interval_s: float = 2.5,
user_id: str | None = None,
store=None,
) -> "ContextClassifier":
"""
Create a ContextClassifier from environment.
CF_VOICE_MOCK=1 activates full mock mode (no GPU, no audio hardware).
If real audio hardware is unavailable (faster-whisper not installed),
falls back to mock mode automatically.
user_id + store are forwarded to cf-core preferences for Elcor/threshold
lookups.
"""
if os.environ.get("CF_VOICE_MOCK", "") == "1":
return cls.mock(interval_s=interval_s, user_id=user_id, store=store)
try:
io = make_io(interval_s=interval_s)
except (NotImplementedError, ImportError):
# Real audio hardware or inference extras unavailable — fall back to
# mock mode so the coordinator starts cleanly on headless nodes.
return cls.mock(interval_s=interval_s, user_id=user_id, store=store)
return cls(io=io, user_id=user_id, store=store)
@classmethod
def mock(
cls,
interval_s: float = 2.5,
seed: int | None = None,
user_id: str | None = None,
store=None,
) -> "ContextClassifier":
"""Create a ContextClassifier backed by MockVoiceIO. Useful in tests."""
return cls(
io=MockVoiceIO(interval_s=interval_s, seed=seed),
user_id=user_id,
store=store,
)
async def stream(self) -> AsyncIterator[VoiceFrame]:
"""
Yield enriched VoiceFrames continuously.
Stub: frames from the IO layer pass through unchanged.
Real (Navigation v0.2.x): acoustic + diarization enrichment runs here.
"""
async for frame in self._io.stream():
yield self._enrich(frame)
async def stop(self) -> None:
await self._io.stop()
self._speaker_tracker.reset()
self._dim_buffer.clear()
self._last_ser_affect.clear()
def classify_chunk(
self,
audio_b64: str | None = None,
timestamp: float = 0.0,
prior_frames: int | None = None,
elcor: bool | None = None,
session_id: str = "",
) -> list[AudioEvent]:
"""
Classify a single audio window and return all AudioEvents.
Returns a heterogeneous list containing zero or one of each:
- ToneEvent (event_type="tone")
- AudioEvent (event_type="queue")
- AudioEvent (event_type="speaker")
- AudioEvent (event_type="environ")
This is the request-response path used by the cf-orch SSE endpoint.
The streaming path (async generator) is for continuous consumers.
audio_b64 Base64-encoded PCM int16 mono 16kHz bytes.
Pass None in mock mode (ignored).
timestamp Session-relative seconds since capture started.
prior_frames Rolling context window size for Elcor LLM.
Defaults to user preference (PREF_ELCOR_PRIOR_FRAMES).
elcor Override Elcor mode for this request.
None = read from user preference (PREF_ELCOR_MODE).
session_id Caller-assigned correlation ID for the session.
"""
use_elcor = elcor if elcor is not None else is_elcor_enabled(
user_id=self._user_id, store=self._store
)
context_frames = prior_frames if prior_frames is not None else get_elcor_prior_frames(
user_id=self._user_id, store=self._store
)
if isinstance(self._io, MockVoiceIO) or os.environ.get("CF_VOICE_MOCK", "") == "1":
return self._classify_mock(timestamp, context_frames, use_elcor, session_id)
if not audio_b64:
return []
return self._classify_real(audio_b64, timestamp, use_elcor, session_id)
async def classify_chunk_async(
self,
audio_b64: str | None = None,
timestamp: float = 0.0,
prior_frames: int | None = None,
elcor: bool | None = None,
session_id: str = "",
language: str | None = None,
num_speakers: int | None = None,
) -> list[AudioEvent]:
"""
Async variant of classify_chunk.
Runs tone, STT, diarization, and acoustic classification in parallel
using asyncio.gather(). Use this from async contexts (FastAPI routes)
to get true concurrency across all four inference paths.
"""
use_elcor = elcor if elcor is not None else is_elcor_enabled(
user_id=self._user_id, store=self._store
)
context_frames = prior_frames if prior_frames is not None else get_elcor_prior_frames(
user_id=self._user_id, store=self._store
)
if isinstance(self._io, MockVoiceIO) or os.environ.get("CF_VOICE_MOCK", "") == "1":
return self._classify_mock(timestamp, context_frames, use_elcor, session_id)
if not audio_b64:
return []
# Drop frame if a classify is already in flight — GPU models serialize
# anyway, so queuing just adds latency without improving output.
if self._classify_lock.locked():
logger.debug("classify busy — dropping frame at t=%.2f", timestamp)
return []
async with self._classify_lock:
# Diarization (pyannote) can take 38 s on first invocations even with GPU.
# 25 s gives enough headroom without stalling the stream for too long.
try:
return await asyncio.wait_for(
self._classify_real_async(audio_b64, timestamp, use_elcor, session_id, language, num_speakers),
timeout=25.0,
)
except asyncio.TimeoutError:
logger.warning("classify_real_async timed out at t=%.2f — dropping frame", timestamp)
return []
def _classify_mock(
self,
timestamp: float,
prior_frames: int,
elcor: bool,
session_id: str,
) -> list[AudioEvent]:
"""
Synthetic multi-class event batch.
Tone event comes from the MockVoiceIO RNG (consistent seed behaviour).
Queue/speaker/environ come from MockAcousticBackend (call lifecycle simulation).
"""
rng = self._io._rng # type: ignore[attr-defined]
label = rng.choice(self._io._labels) # type: ignore[attr-defined]
shift = rng.uniform(0.1, 0.7) if prior_frames > 0 else 0.0
frame = VoiceFrame(
label=label,
confidence=rng.uniform(0.6, 0.97),
speaker_id=rng.choice(self._io._speakers), # type: ignore[attr-defined]
shift_magnitude=round(shift, 3),
timestamp=timestamp,
)
tone: ToneEvent = tone_event_from_voice_frame(
frame_label=frame.label,
frame_confidence=frame.confidence,
shift_magnitude=frame.shift_magnitude,
timestamp=frame.timestamp,
elcor=elcor,
)
tone.session_id = session_id
acoustic = self._acoustic.classify_window(b"", timestamp=timestamp)
events: list[AudioEvent] = [tone]
if acoustic.queue:
events.append(acoustic.queue)
if acoustic.speaker:
events.append(acoustic.speaker)
if acoustic.environ:
events.append(acoustic.environ)
if acoustic.scene:
events.append(acoustic.scene)
return events
def _classify_real(
self,
audio_b64: str,
timestamp: float,
elcor: bool,
session_id: str,
) -> list[AudioEvent]:
"""
Real inference path — used when CF_VOICE_MOCK is unset.
Tone: wav2vec2 SER via ToneClassifier (classify.py).
Acoustic: YAMNet via YAMNetAcousticBackend (Navigation v0.2.x stub).
Speaker: pyannote VAD (diarize.py) — merged in ContextClassifier, not here.
"""
import base64
import numpy as np
from cf_voice.classify import ToneClassifier
pcm = base64.b64decode(audio_b64)
audio = np.frombuffer(pcm, dtype=np.int16).astype(np.float32) / 32_768.0
if self._tone is None:
self._tone = ToneClassifier.from_env()
tone_result = self._tone.classify(audio)
frame = VoiceFrame(
label=tone_result.label,
confidence=tone_result.confidence,
speaker_id="speaker_a",
shift_magnitude=0.0,
timestamp=timestamp,
)
tone: ToneEvent = tone_event_from_voice_frame(
frame_label=frame.label,
frame_confidence=frame.confidence,
shift_magnitude=frame.shift_magnitude,
timestamp=frame.timestamp,
elcor=elcor,
)
tone.session_id = session_id
events: list[AudioEvent] = [tone]
# Acoustic events: Navigation v0.2.x (YAMNet not yet implemented)
# YAMNetAcousticBackend raises NotImplementedError at construction —
# we catch and log rather than failing the entire classify call.
try:
acoustic = self._acoustic.classify_window(audio.tobytes(), timestamp=timestamp)
if acoustic.queue:
events.append(acoustic.queue)
if acoustic.speaker:
events.append(acoustic.speaker)
if acoustic.environ:
events.append(acoustic.environ)
if acoustic.scene:
events.append(acoustic.scene)
except NotImplementedError:
pass
return events
def _load_stt(self) -> "WhisperSTT | None":
"""Lazy-load WhisperSTT once. Returns None if unavailable or disabled."""
if self._stt_loaded:
return self._stt
self._stt_loaded = True
if os.environ.get("CF_VOICE_STT", "1") != "1":
model_status["stt"] = "disabled"
return None
model_status["stt"] = "loading"
try:
from cf_voice.stt import WhisperSTT
self._stt = WhisperSTT.from_env()
model_status["stt"] = "ready"
logger.info("WhisperSTT loaded (model=%s)", os.environ.get("CF_VOICE_WHISPER_MODEL", "small"))
except Exception as exc:
model_status["stt"] = "error"
logger.warning("WhisperSTT unavailable: %s", exc)
return self._stt
def _load_diarizer(self) -> "Diarizer | None":
"""Lazy-load Diarizer once. Returns None if HF_TOKEN absent or CF_VOICE_DIARIZE!=1."""
if self._diarizer_loaded:
return self._diarizer
self._diarizer_loaded = True
if os.environ.get("CF_VOICE_DIARIZE", "0") != "1":
model_status["diarizer"] = "disabled"
return None
model_status["diarizer"] = "loading"
try:
from cf_voice.diarize import Diarizer
self._diarizer = Diarizer.from_env()
model_status["diarizer"] = "ready"
logger.info("Diarizer loaded")
except Exception as exc:
model_status["diarizer"] = "error"
logger.warning("Diarizer unavailable: %s", exc)
return self._diarizer
def _load_dimensional(self) -> "DimensionalClassifier | None":
"""Lazy-load DimensionalClassifier once. Returns None if CF_VOICE_DIMENSIONAL!=1."""
if self._dimensional_loaded:
return self._dimensional
self._dimensional_loaded = True
if os.environ.get("CF_VOICE_DIMENSIONAL", "0") != "1":
model_status["dimensional"] = "disabled"
return None
model_status["dimensional"] = "loading"
try:
from cf_voice.dimensional import DimensionalClassifier
self._dimensional = DimensionalClassifier()
model_status["dimensional"] = "ready"
logger.info("DimensionalClassifier loaded (audeering VAD model)")
except Exception as exc:
model_status["dimensional"] = "error"
logger.warning("DimensionalClassifier unavailable: %s", exc)
return self._dimensional
def _load_accent(self) -> "MockAccentClassifier | AccentClassifier | None":
"""Lazy-load AccentClassifier once. Returns None if CF_VOICE_ACCENT!=1."""
if self._accent_loaded:
return self._accent
self._accent_loaded = True
from cf_voice.accent import make_accent_classifier
result = make_accent_classifier(
mock=isinstance(self._io, MockVoiceIO) or os.environ.get("CF_VOICE_MOCK", "") == "1"
)
self._accent = result
if result is None:
model_status["accent"] = "disabled"
else:
model_status["accent"] = "ready"
logger.info("AccentClassifier loaded (mock=%s)", isinstance(result, type(result).__mro__[0]))
return self._accent
def _load_prosodic(self) -> "ProsodicExtractor | None":
"""Lazy-load ProsodicExtractor once. Returns None if CF_VOICE_PROSODY!=1."""
if self._prosodic_loaded:
return self._prosodic
self._prosodic_loaded = True
if os.environ.get("CF_VOICE_PROSODY", "0") != "1":
model_status["prosody"] = "disabled"
return None
model_status["prosody"] = "loading"
try:
from cf_voice.prosody import ProsodicExtractor
self._prosodic = ProsodicExtractor()
model_status["prosody"] = "ready"
logger.info("ProsodicExtractor loaded (openSMILE eGeMAPS)")
except Exception as exc:
model_status["prosody"] = "error"
logger.warning("ProsodicExtractor unavailable: %s", exc)
return self._prosodic
async def prewarm(self) -> None:
"""Pre-load all configured models in a thread-pool so downloads happen at
startup rather than on the first classify call. Safe to call multiple times
(each _load_* method is idempotent after the first call)."""
if isinstance(self._io, MockVoiceIO) or os.environ.get("CF_VOICE_MOCK", "") == "1":
return
loop = asyncio.get_running_loop()
# Load each model in its own executor slot so status updates are visible
# as each one completes rather than all at once.
await loop.run_in_executor(None, self._load_stt)
await loop.run_in_executor(None, self._load_diarizer)
await loop.run_in_executor(None, self._load_dimensional)
await loop.run_in_executor(None, self._load_prosodic)
logger.info("cf-voice prewarm complete: %s", model_status)
    async def _classify_real_async(
        self,
        audio_b64: str,
        timestamp: float,
        elcor: bool,
        session_id: str,
        language: str | None = None,
        num_speakers: int | None = None,
    ) -> list[AudioEvent]:
        """
        Real inference path running all classifiers in parallel.

        Tone (wav2vec2) + STT (Whisper) + Diarization (pyannote, optional) +
        Acoustic (AST) all run concurrently via asyncio.gather(). Each result
        is type-checked after gather — a single classifier failure does not
        abort the call (only a tone failure drops the window).

        Transcript text is fed back to ToneClassifier as a weak signal (e.g.
        "unfortunately" biases toward apologetic). Diarizer output sets the
        speaker_id on the VoiceFrame.
        """
        import base64
        from functools import partial
        import numpy as np
        from cf_voice.classify import ToneClassifier, _apply_transcript_hints, _AFFECT_TO_LABEL
        pcm = base64.b64decode(audio_b64)
        # int16 PCM → float32 samples in [-1, 1)
        audio = np.frombuffer(pcm, dtype=np.int16).astype(np.float32) / 32_768.0
        # Lazy-load models on first real call
        if self._tone is None:
            self._tone = ToneClassifier.from_env()
        stt = self._load_stt()
        diarizer = self._load_diarizer()
        dimensional = self._load_dimensional()
        prosodic = self._load_prosodic()
        accent_clf = self._load_accent()
        # Build coroutines — all run in thread pool executors internally.
        # Dimensional, prosodic, and accent run in parallel with SER/STT/diarization.
        # Disabled classifiers are replaced by no-op coroutines so the gather
        # result tuple always has the same arity.
        tone_coro = self._tone.classify_async(audio)
        stt_coro = stt.transcribe_chunk_async(pcm, language=language) if stt else _noop_stt()
        diarize_coro = diarizer.diarize_async(audio, num_speakers=num_speakers) if diarizer else _noop_diarize()
        loop = asyncio.get_running_loop()
        # NOTE(review): the acoustic and accent backends receive float32 bytes
        # here, while the mock path hands classify_window b"" — confirm the
        # real backends expect float32, not the original int16 PCM.
        acoustic_coro = loop.run_in_executor(
            None, partial(self._acoustic.classify_window, audio.tobytes(), timestamp)
        )
        dimensional_coro = dimensional.classify_async(audio) if dimensional else _noop_stt()
        prosodic_coro = prosodic.extract_async(audio) if prosodic else _noop_stt()
        accent_coro = loop.run_in_executor(
            None, partial(accent_clf.classify, audio.tobytes())
        ) if accent_clf else _noop_stt()
        (
            tone_result, stt_result, diarize_segs, acoustic,
            dimensional_result, prosodic_result, accent_result,
        ) = await asyncio.gather(
            tone_coro, stt_coro, diarize_coro, acoustic_coro,
            dimensional_coro, prosodic_coro, accent_coro,
            return_exceptions=True,
        )
        # Extract transcript text (STT optional)
        transcript = ""
        if stt_result and not isinstance(stt_result, BaseException):
            transcript = stt_result.text  # type: ignore[union-attr]
        # Apply transcript weak signal to affect if STT produced text
        if transcript and not isinstance(tone_result, BaseException):
            new_affect = _apply_transcript_hints(tone_result.affect, transcript)  # type: ignore[union-attr]
            if new_affect != tone_result.affect:  # type: ignore[union-attr]
                from cf_voice.classify import ToneResult
                # Rebuild the ToneResult with the hinted affect; label is
                # remapped via _AFFECT_TO_LABEL (falls back to the old label).
                tone_result = ToneResult(  # type: ignore[assignment]
                    label=_AFFECT_TO_LABEL.get(new_affect, tone_result.label),  # type: ignore[union-attr]
                    affect=new_affect,
                    confidence=tone_result.confidence,  # type: ignore[union-attr]
                    prosody_flags=tone_result.prosody_flags,  # type: ignore[union-attr]
                )
        # Get speaker_id from diarization (falls back to "speaker_a")
        speaker_id = "speaker_a"
        if isinstance(diarize_segs, BaseException):
            logger.warning("Diarizer failed in gather: %s", diarize_segs)
        elif diarizer and diarize_segs is not None:
            # Resolve the speaker active at the window midpoint
            # (sample count / 2, at 16 kHz → seconds).
            window_mid = len(audio) / 2.0 / 16_000.0
            speaker_id = diarizer.speaker_at(  # type: ignore[arg-type]
                diarize_segs, window_mid, tracker=self._speaker_tracker
            )
            logger.debug("diarize: segs=%d speaker=%s mid=%.3f", len(diarize_segs), speaker_id, window_mid)
        if isinstance(tone_result, BaseException):
            # Tone is the one mandatory classifier — without it there is no
            # frame to build, so the whole window is dropped.
            logger.warning("Tone classifier failed: %s", tone_result)
            return []
        # Unpack dimensional result (None when classifier is disabled or failed)
        dim = None
        if dimensional_result and not isinstance(dimensional_result, BaseException):
            dim = dimensional_result
        # Unpack prosodic result. If dimensional is also available, pass the
        # calm-positive score so sarcasm_risk benefits from both signals.
        pros = None
        if prosodic_result and not isinstance(prosodic_result, BaseException):
            if dim is not None:
                # Re-compute sarcasm_risk with dimensional context
                from cf_voice.prosody import _compute_sarcasm_risk
                calm_pos = dim.calm_positive_score()
                updated_risk = _compute_sarcasm_risk(
                    flat_f0=prosodic_result.flat_f0_score,  # type: ignore[union-attr]
                    calm_positive=calm_pos,
                )
                from cf_voice.prosody import ProsodicSignal
                # Copy every field; only sarcasm_risk is replaced.
                pros = ProsodicSignal(
                    f0_mean=prosodic_result.f0_mean,  # type: ignore[union-attr]
                    f0_std=prosodic_result.f0_std,  # type: ignore[union-attr]
                    jitter=prosodic_result.jitter,  # type: ignore[union-attr]
                    shimmer=prosodic_result.shimmer,  # type: ignore[union-attr]
                    loudness=prosodic_result.loudness,  # type: ignore[union-attr]
                    flat_f0_score=prosodic_result.flat_f0_score,  # type: ignore[union-attr]
                    sarcasm_risk=updated_risk,
                )
            else:
                pros = prosodic_result
        frame = VoiceFrame(
            label=tone_result.label,  # type: ignore[union-attr]
            confidence=tone_result.confidence,  # type: ignore[union-attr]
            speaker_id=speaker_id,
            shift_magnitude=0.0,
            timestamp=timestamp,
            valence=dim.valence if dim else None,
            arousal=dim.arousal if dim else None,
            dominance=dim.dominance if dim else None,
            sarcasm_risk=pros.sarcasm_risk if pros else None,
            flat_f0_score=pros.flat_f0_score if pros else None,
        )
        tone_event: ToneEvent = tone_event_from_voice_frame(
            frame_label=frame.label,
            frame_confidence=frame.confidence,
            shift_magnitude=frame.shift_magnitude,
            timestamp=frame.timestamp,
            elcor=elcor,
        )
        tone_event.session_id = session_id
        tone_event.speaker_id = speaker_id
        # Attach dimensional and prosodic results to the wire event
        tone_event.valence = frame.valence
        tone_event.arousal = frame.arousal
        tone_event.dominance = frame.dominance
        tone_event.sarcasm_risk = frame.sarcasm_risk
        tone_event.flat_f0_score = frame.flat_f0_score
        # Trajectory and coherence signals — only when dimensional is running
        if dim:
            from collections import deque as _deque
            from cf_voice.trajectory import compute_trajectory
            spk_buffer = self._dim_buffer.setdefault(
                speaker_id, _deque(maxlen=self._buffer_window)
            )
            prior_affect = self._last_ser_affect.get(speaker_id)
            traj, coher = compute_trajectory(
                spk_buffer, dim, tone_result.affect, prior_affect  # type: ignore[union-attr]
            )
            # Update buffer and affect history after computing (not before)
            spk_buffer.append(dim)
            self._last_ser_affect[speaker_id] = tone_result.affect  # type: ignore[union-attr]
            # Deltas/trend are withheld until the buffer has a baseline.
            tone_event.arousal_delta = traj.arousal_delta if traj.baseline_established else None
            tone_event.valence_delta = traj.valence_delta if traj.baseline_established else None
            tone_event.trend = traj.trend if traj.baseline_established else None
            tone_event.coherence_score = coher.coherence_score
            tone_event.suppression_flag = coher.suppression_flag
            tone_event.reframe_type = coher.reframe_type if coher.reframe_type != "none" else None
            tone_event.affect_divergence = coher.affect_divergence
            logger.debug(
                "Dimensional: valence=%.3f arousal=%.3f dominance=%.3f quadrant=%s "
                "trend=%s coherence=%.2f suppressed=%s reframe=%s",
                dim.valence, dim.arousal, dim.dominance, dim.affect_quadrant(),
                traj.trend, coher.coherence_score, coher.suppression_flag, coher.reframe_type,
            )
        if pros:
            logger.debug(
                "Prosodic: flat_f0=%.3f sarcasm_risk=%.3f",
                pros.flat_f0_score, pros.sarcasm_risk,
            )
        events: list[AudioEvent] = [tone_event]
        # Emit transcript event so consumers can display live STT
        if transcript:
            events.append(AudioEvent(
                timestamp=timestamp,
                event_type="transcript",  # type: ignore[arg-type]
                label=transcript,
                confidence=1.0,
                speaker_id=speaker_id,
            ))
        # Acoustic events (queue / speaker type / environ / scene) — labels
        # are also collected as inputs to the privacy scorer below.
        scene_label: str | None = None
        environ_labels: list[str] = []
        speaker_label: str | None = None
        if not isinstance(acoustic, BaseException):
            if acoustic.queue:  # type: ignore[union-attr]
                events.append(acoustic.queue)  # type: ignore[union-attr]
            if acoustic.speaker:  # type: ignore[union-attr]
                events.append(acoustic.speaker)  # type: ignore[union-attr]
                speaker_label = acoustic.speaker.label  # type: ignore[union-attr]
            if acoustic.environ:  # type: ignore[union-attr]
                events.append(acoustic.environ)  # type: ignore[union-attr]
                environ_labels = [acoustic.environ.label]  # type: ignore[union-attr]
            if acoustic.scene:  # type: ignore[union-attr]
                events.append(acoustic.scene)  # type: ignore[union-attr]
                scene_label = acoustic.scene.label  # type: ignore[union-attr]
        # Accent event (optional — gated by CF_VOICE_ACCENT=1)
        accent_region: str | None = None
        if accent_result and not isinstance(accent_result, BaseException):
            accent_region = accent_result.region  # type: ignore[union-attr]
            events.append(AudioEvent(
                timestamp=timestamp,
                event_type="accent",  # type: ignore[arg-type]
                label=accent_region,
                confidence=accent_result.confidence,  # type: ignore[union-attr]
                speaker_id=speaker_id,
            ))
        # Privacy risk scoring — local only, never transmitted
        from cf_voice.privacy import score_privacy_risk
        risk = score_privacy_risk(
            scene=scene_label,
            environ_labels=environ_labels,
            speaker=speaker_label,
            accent=accent_region,
        )
        if risk.level != "low":
            logger.info(
                "privacy_risk=%s flags=%s session=%s",
                risk.level, risk.flags, session_id,
            )
        # Attach risk to the tone event so Linnet can surface the gate
        tone_event.prosody_flags = list(tone_event.prosody_flags) + [f"privacy:{risk.level}"]
        return events
def _enrich(self, frame: VoiceFrame) -> VoiceFrame:
"""
Apply tone classification to a raw frame (streaming path).
Stub: identity transform — returns frame unchanged.
Real (Navigation v0.2.x): replace label + confidence with classifier output.
"""
return frame