New modules shipped (from Linnet integration):
- acoustic.py: AST (MIT/ast-finetuned-audioset-10-10-0.4593) replaces YAMNet stub; 527 AudioSet classes mapped to queue/speaker/environ/scene labels; _LABEL_MAP includes hold_music, ringback, DTMF, background_shift, AMD signal chain
- accent.py: facebook/mms-lid-126 language ID → regional accent labels (en_gb, en_us, en_au, fr, es, de, zh, …); lazy-loaded, gated by CF_VOICE_ACCENT
- privacy.py: compound privacy risk scorer — public_env, background_voices, nature scene, accent signals; returns 0–3 score without storing any audio
- prosody.py: openSMILE-backed prosody extractor (sarcasm_risk, flat_f0_score, speech_rate, pitch_range); mock mode returns neutral values
- dimensional.py: audeering/wav2vec2-large-robust-12-ft-emotion-msp-dim valence/arousal/dominance scorer; gated by CF_VOICE_DIMENSIONAL
- trajectory.py: rolling buffer for arousal/valence deltas, trend detection (escalating/suppressed/stable), coherence scoring, suppression/reframe flags
- telephony.py: TelephonyBackend Protocol + MockTelephonyBackend + SignalWireBackend + FreeSWITCHBackend; CallSession dataclass; make_telephony() factory
- app.py: FastAPI service (port 8007) — /health + /classify; accepts base64 PCM chunks, returns full AudioEventOut including dimensional/prosody/accent fields
- prefs.py: voice preference helpers (elcor_mode, confidence_threshold, whisper_model, elcor_prior_frames); cf-core and env-var fallback

Tests: fix stale tests (YAMNetAcousticBackend → ASTAcousticBackend, scene field added to AcousticResult, speaker_at gap now resolves dominant speaker not UNKNOWN, make_io real path returns MicVoiceIO when sounddevice installed). 78 tests passing.

Closes #2, #3.
131 lines
4.3 KiB
Python
131 lines
4.3 KiB
Python
# tests/test_diarize.py — SpeakerTracker and speaker_at() diarization logic
|
|
#
|
|
# All tests are pure Python — no GPU, no pyannote, no HF_TOKEN required.
|
|
# The Diarizer class itself is only tested for its from_env() guard and the
|
|
# speaker_at() method, both of which run without loading the model.
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
import pytest
|
|
|
|
from cf_voice.diarize import (
|
|
Diarizer,
|
|
SpeakerSegment,
|
|
SpeakerTracker,
|
|
SPEAKER_MULTIPLE,
|
|
SPEAKER_UNKNOWN,
|
|
)
|
|
|
|
|
|
# ── SpeakerTracker ────────────────────────────────────────────────────────────
|
|
|
|
def test_tracker_first_speaker_is_a():
    """The first raw id given to a fresh tracker is expected to map to "Speaker A"."""
    tracker = SpeakerTracker()
    first_label = tracker.label("SPEAKER_00")
    assert first_label == "Speaker A"
|
|
|
|
|
|
def test_tracker_second_speaker_is_b():
    """A second distinct raw id is expected to be labelled "Speaker B"."""
    tracker = SpeakerTracker()
    # Register the first id so the next unseen id takes the second label.
    tracker.label("SPEAKER_00")
    second_label = tracker.label("SPEAKER_01")
    assert second_label == "Speaker B"
|
|
|
|
|
|
def test_tracker_same_id_returns_same_label():
    """Labelling the same raw id twice is expected to give "Speaker A" both times."""
    tracker = SpeakerTracker()
    first_call = tracker.label("SPEAKER_00")
    repeat_call = tracker.label("SPEAKER_00")
    # Same checks as the chained comparison: stable label, and the expected one.
    assert first_call == repeat_call
    assert repeat_call == "Speaker A"
|
|
|
|
|
|
def test_tracker_26_speakers():
    """26 distinct raw ids are expected to span "Speaker A" through "Speaker Z"."""
    tracker = SpeakerTracker()
    raw_ids = [f"SPEAKER_{n:02d}" for n in range(26)]
    # Label in ascending id order; only the two endpoints are checked.
    labels = [tracker.label(raw_id) for raw_id in raw_ids]
    assert labels[0] == "Speaker A"
    assert labels[-1] == "Speaker Z"
|
|
|
|
|
|
def test_tracker_27th_speaker_wraps():
    """After 26 distinct ids, the 27th is expected to be labelled "Speaker AA"."""
    tracker = SpeakerTracker()
    # Use up the 26 single-letter labels first.
    for raw_id in (f"SPEAKER_{n:02d}" for n in range(26)):
        tracker.label(raw_id)
    assert tracker.label("SPEAKER_26") == "Speaker AA"
|
|
|
|
|
|
def test_tracker_reset_clears_map():
    """After reset(), a previously seen raw id is expected to be labelled from "Speaker A" again."""
    tracker = SpeakerTracker()
    for raw_id in ("SPEAKER_00", "SPEAKER_01"):
        tracker.label(raw_id)
    tracker.reset()
    # SPEAKER_01 was "second" before the reset; afterwards it should be
    # treated as a brand-new id and take the first label.
    label_after_reset = tracker.label("SPEAKER_01")
    assert label_after_reset == "Speaker A"
|
|
|
|
|
|
# ── Diarizer.speaker_at() ─────────────────────────────────────────────────────
|
|
|
|
def _segs(*items: tuple[str, float, float]) -> list[SpeakerSegment]:
    """Build SpeakerSegment objects from (speaker_id, start_s, end_s) tuples.

    The returned list keeps the order in which the tuples were passed.
    """
    return [
        SpeakerSegment(speaker_id=speaker_id, start_s=start_s, end_s=end_s)
        for speaker_id, start_s, end_s in items
    ]
|
|
|
|
|
|
def test_speaker_at_single_speaker():
    """A timestamp inside the only segment is expected to resolve to "Speaker A"."""
    # object.__new__ skips Diarizer.__init__ (no GPU needed).
    diarizer = object.__new__(Diarizer)
    tracker = SpeakerTracker()
    segments = _segs(("SPEAKER_00", 0.0, 2.0))
    assert diarizer.speaker_at(segments, 1.0, tracker=tracker) == "Speaker A"
|
|
|
|
|
|
def test_speaker_at_no_coverage_returns_unknown():
    """A timestamp well past the only segment is expected to yield SPEAKER_UNKNOWN."""
    diarizer = object.__new__(Diarizer)  # skip __init__
    segments = _segs(("SPEAKER_00", 0.0, 1.0))
    # 1.5s is after the segment's end at 1.0s; no tracker is passed.
    result = diarizer.speaker_at(segments, 1.5)
    assert result == SPEAKER_UNKNOWN
|
|
|
|
|
|
def test_speaker_at_empty_segments_returns_unknown():
    """With an empty segment list, speaker_at() is expected to yield SPEAKER_UNKNOWN."""
    diarizer = object.__new__(Diarizer)  # skip __init__
    result = diarizer.speaker_at([], 1.0)
    assert result == SPEAKER_UNKNOWN
|
|
|
|
|
|
def test_speaker_at_overlap_returns_multiple():
    """A timestamp covered by two speakers at once is expected to yield SPEAKER_MULTIPLE."""
    diarizer = object.__new__(Diarizer)  # skip __init__
    # SPEAKER_01 starts at 0.5s while SPEAKER_00 is still talking, so both
    # segments cover the queried timestamp of 1.0s.
    first_segment = ("SPEAKER_00", 0.0, 2.0)
    overlapping_segment = ("SPEAKER_01", 0.5, 2.0)
    segments = _segs(first_segment, overlapping_segment)
    assert diarizer.speaker_at(segments, 1.0) == SPEAKER_MULTIPLE
|
|
|
|
|
|
def test_speaker_at_boundary_inclusive():
    """Timestamps exactly on a segment's start and end are expected to count as covered."""
    diarizer = object.__new__(Diarizer)  # skip __init__
    tracker = SpeakerTracker()
    segments = _segs(("SPEAKER_00", 1.0, 2.0))
    # Check the start boundary first, then the end boundary, with one tracker.
    for boundary_s in (1.0, 2.0):
        assert diarizer.speaker_at(segments, boundary_s, tracker=tracker) == "Speaker A"
|
|
|
|
|
|
def test_speaker_at_without_tracker_returns_raw_id():
    """Without a tracker, speaker_at() is expected to return the raw speaker id."""
    diarizer = object.__new__(Diarizer)  # skip __init__
    segments = _segs(("SPEAKER_00", 0.0, 2.0))
    raw_result = diarizer.speaker_at(segments, 1.0)
    assert raw_result == "SPEAKER_00"
|
|
|
|
|
|
def test_speaker_at_two_speakers_no_overlap():
    """Two non-overlapping speakers resolve to their own labels; the gap goes to the dominant one."""
    diarizer = object.__new__(Diarizer)  # skip __init__
    tracker = SpeakerTracker()
    segments = _segs(
        ("SPEAKER_00", 0.0, 1.0),
        ("SPEAKER_01", 1.5, 2.5),
    )

    # Timestamps inside each segment; query order matters because the tracker
    # hands out labels in the order raw ids are first seen.
    inside_first = diarizer.speaker_at(segments, 0.5, tracker=tracker)
    assert inside_first == "Speaker A"
    inside_second = diarizer.speaker_at(segments, 2.0, tracker=tracker)
    assert inside_second == "Speaker B"

    # 1.2s falls in the gap between the two segments. The expectation assumes
    # speaker_at() looks at the window [0.7, 1.7] (implementation not visible
    # from this file): SPEAKER_00 covers 0.3s of it and SPEAKER_01 covers 0.2s,
    # so the dominant speaker ("Speaker A") is expected, not SPEAKER_UNKNOWN.
    in_gap = diarizer.speaker_at(segments, 1.2, tracker=tracker)
    assert in_gap == "Speaker A"
|
|
|
|
|
|
# ── Diarizer.from_env() guard ─────────────────────────────────────────────────
|
|
|
|
def test_from_env_raises_without_hf_token(monkeypatch):
    """Diarizer.from_env() is expected to raise EnvironmentError mentioning HF_TOKEN when it is unset."""
    # raising=False: do not fail if HF_TOKEN was never set in this environment.
    monkeypatch.delenv("HF_TOKEN", raising=False)
    expect_env_error = pytest.raises(EnvironmentError, match="HF_TOKEN")
    with expect_env_error:
        Diarizer.from_env()
|