"""Tests for app/services/diagnose/classifier.py — SeverityClassifier. All ML-path tests mock ``transformers.pipeline`` so no model weights are downloaded during the test suite. """ from __future__ import annotations from dataclasses import FrozenInstanceError from pathlib import Path from typing import Any from unittest.mock import MagicMock, patch import pytest import app.services.diagnose.classifier as clf_module from app.services.diagnose.classifier import SeverityClassifier from app.services.diagnose.models import ClassifiedTimeline, EventCluster, TimelineResult # --------------------------------------------------------------------------- # Fixtures # --------------------------------------------------------------------------- @pytest.fixture(autouse=True) def reset_ml_singleton(): """Ensure the module-level ML singleton is cleared before and after each test.""" clf_module._ml_classifier = None yield clf_module._ml_classifier = None # --------------------------------------------------------------------------- # Test-object builders # --------------------------------------------------------------------------- def _make_cluster( representative_text: str = "test log", pattern_tags: tuple[str, ...] = (), severity: str = "INFO", ) -> EventCluster: return EventCluster( cluster_id="abc123", entries=("e1",), start_iso=None, end_iso=None, duration_seconds=0.0, source_ids=("src",), pattern_tags=pattern_tags, severity=severity, # type: ignore[arg-type] burst=False, gap_before_seconds=0.0, representative_text=representative_text, ) def _make_timeline(clusters: tuple[EventCluster, ...] = ()) -> TimelineResult: return TimelineResult( clusters=clusters, total_entries=0, window_start=None, window_end=None, gap_count=0, burst_count=0, dominant_sources=(), ) def _mock_hf_pipeline(label: str, score: float) -> MagicMock: """Return a mock HF pipeline callable that always yields one result.""" pipe = MagicMock() pipe.return_value = [{"label": label, "score": score}] return pipe # --------------------------------------------------------------------------- # Path A — ML classification # --------------------------------------------------------------------------- class TestMLPath: def test_ml_error_maps_to_error(self) -> None: """ML returning ERROR with score 0.98 → cluster severity ERROR.""" pipe = _mock_hf_pipeline("ERROR", 0.98) with patch( "app.services.diagnose.classifier._get_ml_classifier", return_value=pipe ): clf = SeverityClassifier(model_id="fake/model") result = clf.classify(_make_timeline(((_make_cluster("disk error detected")),))) assert result.cluster_severities["abc123"] == "ERROR" assert result.classifier_used == "ml" assert result.model_id == "fake/model" def test_ml_critical_promotion(self) -> None: """ERROR + score > 0.95 + 'kernel panic' in text → promoted to CRITICAL.""" pipe = _mock_hf_pipeline("ERROR", 0.97) with patch( "app.services.diagnose.classifier._get_ml_classifier", return_value=pipe ): clf = SeverityClassifier(model_id="fake/model") result = clf.classify( _make_timeline((_make_cluster("kernel panic: not syncing VFS"),)) ) assert result.cluster_severities["abc123"] == "CRITICAL" def test_ml_debug_demotion(self) -> None: """INFO + score < 0.4 → demoted to DEBUG.""" pipe = _mock_hf_pipeline("INFO", 0.3) with patch( "app.services.diagnose.classifier._get_ml_classifier", return_value=pipe ): clf = SeverityClassifier(model_id="fake/model") result = clf.classify(_make_timeline((_make_cluster("routine ping"),))) assert result.cluster_severities["abc123"] == "DEBUG" def test_ml_warning_maps_to_warn(self) -> None: """ML returning WARNING → mapped to WARN.""" pipe = _mock_hf_pipeline("WARNING", 0.85) with patch( "app.services.diagnose.classifier._get_ml_classifier", return_value=pipe ): clf = SeverityClassifier(model_id="fake/model") result = clf.classify(_make_timeline((_make_cluster("low disk space"),))) assert result.cluster_severities["abc123"] == "WARN" # --------------------------------------------------------------------------- # Path B — pattern_tags fallback # --------------------------------------------------------------------------- class TestPatternTagsPath: def test_pattern_tags_resolve_error_severity(self, tmp_path: Path) -> None: """Cluster with pattern_tag 'service_crash_loop' → ERROR from pattern file.""" pattern_yaml = tmp_path / "default.yaml" pattern_yaml.write_text( "patterns:\n" " - name: service_crash_loop\n" " pattern: crash\n" " severity: ERROR\n" " description: Service crashed in a loop\n" ) clf = SeverityClassifier(model_id="", pattern_file=pattern_yaml) cluster = _make_cluster( representative_text="service crashed", pattern_tags=("service_crash_loop",), ) result = clf.classify(_make_timeline((cluster,))) assert result.cluster_severities["abc123"] == "ERROR" assert result.classifier_used == "pattern_tags" assert result.model_id is None # --------------------------------------------------------------------------- # Path C — regex fallback # --------------------------------------------------------------------------- class TestRegexPath: def test_regex_detects_error(self) -> None: """No ML, no pattern file: 'ERROR: disk full' → ERROR via regex.""" clf = SeverityClassifier(model_id="") result = clf.classify( _make_timeline((_make_cluster("ERROR: disk full"),)) ) assert result.cluster_severities["abc123"] == "ERROR" assert result.classifier_used == "regex" def test_regex_defaults_to_info_when_no_match(self) -> None: """No severity keyword in text → defaults to INFO.""" clf = SeverityClassifier(model_id="") result = clf.classify( _make_timeline((_make_cluster("mount: disk mounted successfully"),)) ) assert result.cluster_severities["abc123"] == "INFO" # --------------------------------------------------------------------------- # Fallback behaviour # --------------------------------------------------------------------------- class TestImportErrorFallback: def test_transformers_import_error_falls_back_to_pattern_tags( self, tmp_path: Path ) -> None: """ImportError from transformers → clean fallback to pattern_tags path.""" pattern_yaml = tmp_path / "default.yaml" pattern_yaml.write_text( "patterns:\n" " - name: auth_failure\n" " pattern: auth\n" " severity: ERROR\n" " description: Auth failure\n" ) def _raising_get_ml(*_args: Any, **_kwargs: Any) -> None: raise ImportError("No module named 'transformers'") with patch( "app.services.diagnose.classifier._get_ml_classifier", side_effect=_raising_get_ml, ): clf = SeverityClassifier(model_id="fake/model", pattern_file=pattern_yaml) cluster = _make_cluster( representative_text="auth failed", pattern_tags=("auth_failure",), ) result = clf.classify(_make_timeline((cluster,))) # ML was attempted (classifier_used == "ml") but pattern_tags resolved it assert result.classifier_used == "ml" assert result.cluster_severities["abc123"] == "ERROR" # --------------------------------------------------------------------------- # Edge cases # --------------------------------------------------------------------------- class TestEdgeCases: def test_empty_timeline_produces_empty_severities(self) -> None: """TimelineResult with no clusters → empty cluster_severities, no crash.""" clf = SeverityClassifier(model_id="") result = clf.classify(_make_timeline()) assert isinstance(result, ClassifiedTimeline) assert result.cluster_severities == {} assert result.classifier_used == "regex" def test_classified_timeline_is_frozen(self) -> None: """ClassifiedTimeline must be frozen (FrozenInstanceError on mutation).""" clf = SeverityClassifier(model_id="") result = clf.classify(_make_timeline((_make_cluster(),))) with pytest.raises(FrozenInstanceError): result.classifier_used = "ml" # type: ignore[misc]