turnstone/tests/test_diagnose_classifier.py
pyr0ball 6ea8fbfec1 feat: Stage 2 — SeverityClassifier for multi-agent diagnose pipeline (issue #29)
Three-path classification: ML (transformers pipeline, lazy singleton) →
pattern_tags (YAML pattern severity dict) → regex (detect_severity).

- Path A: HF text-classification pipeline loaded lazily on first classify()
  call via module-level singleton; shim promotes ERROR+keyword hits to CRITICAL
  and demotes low-confidence INFO to DEBUG.
- Path B: maps cluster.pattern_tags through the loaded pattern severity dict;
  picks the highest severity across matching tags.
- Path C: falls back to detect_severity() regex scan on representative_text;
  defaults to INFO when no keyword matches.
- Pattern file resolved from constructor arg or TURNSTONE_PATTERNS env var
  (mirrors app/rest.py convention).
- No crash when transformers is not installed; ImportError on per-cluster ML
  inference triggers clean per-cluster fallback to pattern_tags/regex.
- ClassifiedTimeline.classifier_used reflects the primary session path.

Tests (10 new, 328 total, all passing):
- ML ERROR, CRITICAL promotion, DEBUG demotion, WARNING→WARN
- pattern_tags resolution from YAML fixture
- regex ERROR detection and INFO default
- ImportError clean fallback
- empty timeline no-crash
- ClassifiedTimeline FrozenInstanceError on mutation

Closes: #29
2026-05-25 13:27:17 -07:00

245 lines
8.8 KiB
Python

"""Tests for app/services/diagnose/classifier.py — SeverityClassifier.
All ML-path tests mock ``transformers.pipeline`` so no model weights are
downloaded during the test suite.
"""
from __future__ import annotations
from dataclasses import FrozenInstanceError
from pathlib import Path
from typing import Any
from unittest.mock import MagicMock, patch
import pytest
import app.services.diagnose.classifier as clf_module
from app.services.diagnose.classifier import SeverityClassifier
from app.services.diagnose.models import ClassifiedTimeline, EventCluster, TimelineResult
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture(autouse=True)
def reset_ml_singleton():
    """Isolate each test from the module-level ML classifier singleton.

    ``clf_module._ml_classifier`` is set to ``None`` before the test body
    runs and once more after it has finished.
    """

    def _drop_cached_pipeline() -> None:
        clf_module._ml_classifier = None

    _drop_cached_pipeline()
    yield
    _drop_cached_pipeline()
# ---------------------------------------------------------------------------
# Test-object builders
# ---------------------------------------------------------------------------
def _make_cluster(
    representative_text: str = "test log",
    pattern_tags: tuple[str, ...] = (),
    severity: str = "INFO",
    *,
    cluster_id: str = "abc123",
) -> EventCluster:
    """Build a minimal ``EventCluster`` for classifier tests.

    Only the fields the tests vary are parameters; every other field is
    filled with a fixed neutral value (single entry ``"e1"``, single source
    ``"src"``, no timestamps, zero duration, no burst, no preceding gap).

    Args:
        representative_text: Text stored on the cluster.
        pattern_tags: Pattern names attached to the cluster.
        severity: Severity stored on the cluster itself.
        cluster_id: Identifier of the cluster.  Defaults to ``"abc123"``,
            the key the existing tests look up in ``cluster_severities``;
            pass distinct values when a timeline needs several clusters.

    Returns:
        The constructed ``EventCluster``.
    """
    return EventCluster(
        cluster_id=cluster_id,
        entries=("e1",),
        start_iso=None,
        end_iso=None,
        duration_seconds=0.0,
        source_ids=("src",),
        pattern_tags=pattern_tags,
        # A plain ``str`` is accepted here for test convenience; presumably
        # the model declares a narrower type, hence the suppression.
        severity=severity,  # type: ignore[arg-type]
        burst=False,
        gap_before_seconds=0.0,
        representative_text=representative_text,
    )
def _make_timeline(clusters: tuple[EventCluster, ...] = ()) -> TimelineResult:
    """Wrap *clusters* in a ``TimelineResult`` with neutral summary fields.

    Everything except ``clusters`` is fixed: zero counts, no window bounds
    and no dominant sources.
    """
    neutral_fields: dict[str, Any] = {
        "total_entries": 0,
        "window_start": None,
        "window_end": None,
        "gap_count": 0,
        "burst_count": 0,
        "dominant_sources": (),
    }
    return TimelineResult(clusters=clusters, **neutral_fields)
def _mock_hf_pipeline(label: str, score: float) -> MagicMock:
"""Return a mock HF pipeline callable that always yields one result."""
pipe = MagicMock()
pipe.return_value = [{"label": label, "score": score}]
return pipe
# ---------------------------------------------------------------------------
# Path A — ML classification
# ---------------------------------------------------------------------------
class TestMLPath:
    """Path A: severities produced by the (mocked) ML pipeline."""

    @staticmethod
    def _classify_with_pipeline(
        label: str, score: float, text: str
    ) -> ClassifiedTimeline:
        """Classify a single cluster holding *text* with the ML loader mocked.

        ``_get_ml_classifier`` is patched to return a mock pipeline that
        always predicts *label* with *score*.

        The helper also asserts that the mock was actually invoked.
        ``classifier_used`` stays ``"ml"`` even when a cluster falls back to
        another path, and some texts used here ("disk error detected",
        "kernel panic ...") contain severity keywords, so without this check
        a silent fallback could let the test pass without exercising the ML
        path at all.

        Args:
            label: Label the mock pipeline reports.
            score: Confidence the mock pipeline reports.
            text: ``representative_text`` of the single cluster.

        Returns:
            The ``ClassifiedTimeline`` returned by ``classify``.
        """
        pipe = _mock_hf_pipeline(label, score)
        with patch(
            "app.services.diagnose.classifier._get_ml_classifier", return_value=pipe
        ):
            clf = SeverityClassifier(model_id="fake/model")
            result = clf.classify(_make_timeline((_make_cluster(text),)))
        pipe.assert_called()
        return result

    def test_ml_error_maps_to_error(self) -> None:
        """ML returning ERROR with score 0.98 → cluster severity ERROR."""
        result = self._classify_with_pipeline("ERROR", 0.98, "disk error detected")
        assert result.cluster_severities["abc123"] == "ERROR"
        assert result.classifier_used == "ml"
        assert result.model_id == "fake/model"

    def test_ml_critical_promotion(self) -> None:
        """ERROR + score > 0.95 + 'kernel panic' in text → promoted to CRITICAL."""
        result = self._classify_with_pipeline(
            "ERROR", 0.97, "kernel panic: not syncing VFS"
        )
        assert result.cluster_severities["abc123"] == "CRITICAL"

    def test_ml_debug_demotion(self) -> None:
        """INFO + score < 0.4 → demoted to DEBUG."""
        result = self._classify_with_pipeline("INFO", 0.3, "routine ping")
        assert result.cluster_severities["abc123"] == "DEBUG"

    def test_ml_warning_maps_to_warn(self) -> None:
        """ML returning WARNING → mapped to WARN."""
        result = self._classify_with_pipeline("WARNING", 0.85, "low disk space")
        assert result.cluster_severities["abc123"] == "WARN"
# ---------------------------------------------------------------------------
# Path B — pattern_tags fallback
# ---------------------------------------------------------------------------
class TestPatternTagsPath:
    """Path B: severities resolved from the cluster's pattern tags."""

    def test_pattern_tags_resolve_error_severity(self, tmp_path: Path) -> None:
        """Cluster with pattern_tag 'service_crash_loop' → ERROR from pattern file."""
        pattern_yaml = tmp_path / "default.yaml"
        # The keys of a list item must be indented deeper than its "-"
        # marker; otherwise they are not part of the item's mapping and the
        # document is not valid YAML.
        pattern_yaml.write_text(
            "patterns:\n"
            "  - name: service_crash_loop\n"
            "    pattern: crash\n"
            "    severity: ERROR\n"
            "    description: Service crashed in a loop\n",
            encoding="utf-8",
        )
        # An empty model_id leaves the ML path out of play for this test.
        clf = SeverityClassifier(model_id="", pattern_file=pattern_yaml)
        cluster = _make_cluster(
            representative_text="service crashed",
            pattern_tags=("service_crash_loop",),
        )
        result = clf.classify(_make_timeline((cluster,)))
        assert result.cluster_severities["abc123"] == "ERROR"
        assert result.classifier_used == "pattern_tags"
        assert result.model_id is None
# ---------------------------------------------------------------------------
# Path C — regex fallback
# ---------------------------------------------------------------------------
class TestRegexPath:
    """Path C: severities derived from the cluster text by regex."""

    def test_regex_detects_error(self) -> None:
        """No ML, no pattern file: 'ERROR: disk full' → ERROR via regex."""
        timeline = _make_timeline((_make_cluster("ERROR: disk full"),))
        classified = SeverityClassifier(model_id="").classify(timeline)
        assert classified.cluster_severities["abc123"] == "ERROR"
        assert classified.classifier_used == "regex"

    def test_regex_defaults_to_info_when_no_match(self) -> None:
        """No severity keyword in text → defaults to INFO."""
        quiet_cluster = _make_cluster("mount: disk mounted successfully")
        timeline = _make_timeline((quiet_cluster,))
        classified = SeverityClassifier(model_id="").classify(timeline)
        assert classified.cluster_severities["abc123"] == "INFO"
# ---------------------------------------------------------------------------
# Fallback behaviour
# ---------------------------------------------------------------------------
class TestImportErrorFallback:
    """Behaviour when loading the ML classifier raises ``ImportError``."""

    def test_transformers_import_error_falls_back_to_pattern_tags(
        self, tmp_path: Path
    ) -> None:
        """ImportError from transformers → clean fallback to pattern_tags path."""
        pattern_yaml = tmp_path / "default.yaml"
        # The keys of a list item must be indented deeper than its "-"
        # marker; otherwise they are not part of the item's mapping and the
        # document is not valid YAML.
        pattern_yaml.write_text(
            "patterns:\n"
            "  - name: auth_failure\n"
            "    pattern: auth\n"
            "    severity: ERROR\n"
            "    description: Auth failure\n",
            encoding="utf-8",
        )

        def _raising_get_ml(*_args: Any, **_kwargs: Any) -> None:
            # Simulates an environment where ``transformers`` is missing.
            raise ImportError("No module named 'transformers'")

        with patch(
            "app.services.diagnose.classifier._get_ml_classifier",
            side_effect=_raising_get_ml,
        ):
            clf = SeverityClassifier(model_id="fake/model", pattern_file=pattern_yaml)
            cluster = _make_cluster(
                representative_text="auth failed",
                pattern_tags=("auth_failure",),
            )
            result = clf.classify(_make_timeline((cluster,)))
        # ML was attempted (classifier_used == "ml") but pattern_tags resolved it
        assert result.classifier_used == "ml"
        assert result.cluster_severities["abc123"] == "ERROR"
# ---------------------------------------------------------------------------
# Edge cases
# ---------------------------------------------------------------------------
class TestEdgeCases:
    """Boundary cases: empty input and immutability of the result."""

    def test_empty_timeline_produces_empty_severities(self) -> None:
        """TimelineResult with no clusters → empty cluster_severities, no crash."""
        empty_timeline = _make_timeline()
        classified = SeverityClassifier(model_id="").classify(empty_timeline)
        assert isinstance(classified, ClassifiedTimeline)
        assert classified.cluster_severities == {}
        assert classified.classifier_used == "regex"

    def test_classified_timeline_is_frozen(self) -> None:
        """ClassifiedTimeline must be frozen (FrozenInstanceError on mutation)."""
        one_cluster_timeline = _make_timeline((_make_cluster(),))
        classified = SeverityClassifier(model_id="").classify(one_cluster_timeline)
        with pytest.raises(FrozenInstanceError):
            classified.classifier_used = "ml"  # type: ignore[misc]