From 6ea8fbfec1141da338415623a0779cbbfe74f0a3 Mon Sep 17 00:00:00 2001 From: pyr0ball Date: Mon, 25 May 2026 13:27:17 -0700 Subject: [PATCH] =?UTF-8?q?feat:=20Stage=202=20=E2=80=94=20SeverityClassif?= =?UTF-8?q?ier=20for=20multi-agent=20diagnose=20pipeline=20(issue=20#29)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three-path classification: ML (transformers pipeline, lazy singleton) → pattern_tags (YAML pattern severity dict) → regex (detect_severity). - Path A: HF text-classification pipeline loaded lazily on first classify() call via module-level singleton; shim promotes ERROR+keyword hits to CRITICAL and demotes low-confidence INFO to DEBUG. - Path B: maps cluster.pattern_tags through the loaded pattern severity dict; picks the highest severity across matching tags. - Path C: falls back to detect_severity() regex scan on representative_text; defaults to INFO when no keyword matches. - Pattern file resolved from constructor arg or TURNSTONE_PATTERNS env var (mirrors app/rest.py convention). - No crash when transformers is not installed; ImportError on per-cluster ML inference triggers clean per-cluster fallback to pattern_tags/regex. - ClassifiedTimeline.classifier_used reflects the primary session path. 
Tests (10 new, 328 total, all passing): - ML ERROR, CRITICAL promotion, DEBUG demotion, WARNING→WARN - pattern_tags resolution from YAML fixture - regex ERROR detection and INFO default - ImportError clean fallback - empty timeline no-crash - ClassifiedTimeline FrozenInstanceError on mutation Closes: https://git.opensourcesolarpunk.com/Circuit-Forge/turnstone/issues/29 --- app/services/diagnose/classifier.py | 249 ++++++++++++++++++++++++++++ tests/test_diagnose_classifier.py | 245 +++++++++++++++++++++++++++ 2 files changed, 494 insertions(+) create mode 100644 app/services/diagnose/classifier.py create mode 100644 tests/test_diagnose_classifier.py diff --git a/app/services/diagnose/classifier.py b/app/services/diagnose/classifier.py new file mode 100644 index 0000000..b7aa8ed --- /dev/null +++ b/app/services/diagnose/classifier.py @@ -0,0 +1,249 @@ +"""Stage 2: Severity Classifier — ML with two fallback levels. + +Classification strategy (in priority order): + + Path A — ML: Hugging Face text-classification pipeline, loaded lazily. + Path B — pattern_tags: Map cluster.pattern_tags through the loaded pattern + severity dict; pick the highest severity across matching tags. + Path C — regex: Call detect_severity() from app.glean.base on the cluster's + representative_text. + +Each cluster is classified independently. The ``classifier_used`` field on the +returned ``ClassifiedTimeline`` reflects the primary path (the one that governed +the overall classification session, not individual cluster fallbacks). 
+""" +from __future__ import annotations + +import logging +import os +from pathlib import Path +from typing import Any + +from app.services.diagnose.models import ( + ClassifiedTimeline, + EventCluster, + SeverityLabel, + TimelineResult, +) + +logger = logging.getLogger(__name__) + +# --------------------------------------------------------------------------- +# Module-level ML singleton — reset to None between tests via the fixture +# --------------------------------------------------------------------------- + +_ml_classifier: Any | None = None + + +def _get_ml_classifier(model_id: str, device: str) -> Any: + """Return the cached HF pipeline, loading it on first call.""" + global _ml_classifier # noqa: PLW0603 + if _ml_classifier is None: + from transformers import pipeline as hf_pipeline # type: ignore[import-untyped] + + _ml_classifier = hf_pipeline( + "text-classification", model=model_id, device=device + ) + return _ml_classifier + + +# --------------------------------------------------------------------------- +# Label mapping +# --------------------------------------------------------------------------- + +_LABEL_MAP: dict[str, SeverityLabel] = { + "ERROR": "ERROR", + "WARNING": "WARN", + "WARN": "WARN", + "INFO": "INFO", + "DEBUG": "DEBUG", + "CRITICAL": "CRITICAL", +} + +_CRITICAL_KEYWORDS: frozenset[str] = frozenset( + { + "panic", + "oom", + "fatal", + "critical", + "kernel panic", + "out of memory", + "segfault", + "segmentation fault", + } +) + +_SEVERITY_ORDER: dict[str | None, int] = { + "CRITICAL": 5, + "ERROR": 4, + "WARN": 3, + "WARNING": 3, + "INFO": 2, + "DEBUG": 1, + None: 0, +} + + +def _map_label(label: str, score: float, text: str) -> SeverityLabel: + """Apply the severity shim: promote to CRITICAL or demote to DEBUG where warranted.""" + upper = label.upper() + if upper == "ERROR" and score > 0.95 and any( + k in text.lower() for k in _CRITICAL_KEYWORDS + ): + return "CRITICAL" + if upper == "INFO" and score < 0.4: + return "DEBUG" + 
return _LABEL_MAP.get(upper, "UNKNOWN") # type: ignore[return-value] + + +def _highest_from_tags( + tags: tuple[str, ...], severity_map: dict[str, str] +) -> SeverityLabel | None: + """Return the highest severity from the pattern_tags that appear in severity_map.""" + best: str | None = None + best_rank = -1 + for tag in tags: + sev = severity_map.get(tag) + rank = _SEVERITY_ORDER.get(sev, 0) + if rank > best_rank: + best_rank = rank + best = sev + if best is None: + return None + normalised = "WARN" if best.upper() == "WARNING" else best.upper() + return normalised # type: ignore[return-value] + + +# --------------------------------------------------------------------------- +# SeverityClassifier +# --------------------------------------------------------------------------- + + +class SeverityClassifier: + """Classify each EventCluster's severity using ML, patterns, or regex fallback. + + Parameters + ---------- + model_id: + Hugging Face model identifier. When empty (default), ML is skipped. + device: + Torch device string passed to the HF pipeline (e.g. ``"cpu"`` or ``"cuda:0"``). + pattern_file: + Path to the YAML pattern file. When ``None`` the classifier reads + ``TURNSTONE_PATTERNS`` env var (same logic as ``app/rest.py``). 
+ """ + + def __init__( + self, + model_id: str = "", + device: str = "cpu", + pattern_file: Path | None = None, + ) -> None: + self._model_id = model_id + self._device = device + self._pattern_file: Path | None = pattern_file + self._pattern_severity: dict[str, str] = {} + self._patterns_loaded = False + + # ------------------------------------------------------------------ + # Lazy loaders + # ------------------------------------------------------------------ + + def _resolve_pattern_file(self) -> Path | None: + """Resolve pattern file from constructor arg or env var.""" + if self._pattern_file is not None: + return self._pattern_file + env_dir = os.environ.get("TURNSTONE_PATTERNS") + if env_dir: + return Path(env_dir) / "default.yaml" + return None + + def _ensure_patterns_loaded(self) -> None: + """Populate _pattern_severity from the pattern YAML file (once).""" + if self._patterns_loaded: + return + self._patterns_loaded = True + path = self._resolve_pattern_file() + if path is None: + return + from app.glean.base import load_patterns + + patterns = load_patterns(path) + self._pattern_severity = {p.name: p.severity for p in patterns} + + # ------------------------------------------------------------------ + # Per-cluster classification helpers + # ------------------------------------------------------------------ + + def _classify_cluster_ml(self, cluster: EventCluster) -> SeverityLabel | None: + """Attempt ML classification. 
Returns None on any inference failure.""" + try: + pipe = _get_ml_classifier(self._model_id, self._device) + results = pipe(cluster.representative_text) + if not results: + return None + hit = results[0] + return _map_label(hit["label"], hit["score"], cluster.representative_text) + except Exception: # noqa: BLE001 + logger.warning( + "ML inference failed for cluster %s — falling back", + cluster.cluster_id, + ) + return None + + def _classify_cluster_pattern_tags( + self, cluster: EventCluster + ) -> SeverityLabel | None: + """Derive severity from the cluster's pattern_tags. Returns None if no match.""" + return _highest_from_tags(cluster.pattern_tags, self._pattern_severity) + + def _classify_cluster_regex(self, cluster: EventCluster) -> SeverityLabel: + """Classify by scanning representative_text with the severity regex.""" + from app.glean.base import detect_severity + + raw = detect_severity(cluster.representative_text) + if raw is None: + return "INFO" + return _LABEL_MAP.get(raw.upper(), "INFO") # type: ignore[return-value] + + # ------------------------------------------------------------------ + # Public API + # ------------------------------------------------------------------ + + def classify(self, timeline: TimelineResult) -> ClassifiedTimeline: + """Classify every cluster in *timeline* and return a ``ClassifiedTimeline``.""" + self._ensure_patterns_loaded() + + # Determine which primary path governs this session + ml_available = bool(self._model_id) + patterns_available = bool(self._pattern_severity) + + if ml_available: + classifier_used: str = "ml" + elif patterns_available: + classifier_used = "pattern_tags" + else: + classifier_used = "regex" + + cluster_severities: dict[str, SeverityLabel] = {} + + for cluster in timeline.clusters: + severity: SeverityLabel | None = None + + if ml_available: + severity = self._classify_cluster_ml(cluster) + + if severity is None and patterns_available: + severity = self._classify_cluster_pattern_tags(cluster) + + 
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------


@pytest.fixture(autouse=True)
def reset_ml_singleton():
    """Ensure the module-level ML singleton is cleared before and after each test."""
    clf_module._ml_classifier = None
    yield
    clf_module._ml_classifier = None


# ---------------------------------------------------------------------------
# Test-object builders
# ---------------------------------------------------------------------------


def _make_cluster(
    representative_text: str = "test log",
    pattern_tags: tuple[str, ...] = (),
    severity: str = "INFO",
    cluster_id: str = "abc123",
) -> EventCluster:
    """Build a minimal EventCluster.

    ``cluster_id`` defaults to ``"abc123"`` (the id the single-cluster tests
    assert on) and can be overridden to build timelines with several
    distinct clusters.
    """
    return EventCluster(
        cluster_id=cluster_id,
        entries=("e1",),
        start_iso=None,
        end_iso=None,
        duration_seconds=0.0,
        source_ids=("src",),
        pattern_tags=pattern_tags,
        severity=severity,  # type: ignore[arg-type]
        burst=False,
        gap_before_seconds=0.0,
        representative_text=representative_text,
    )


def _make_timeline(clusters: tuple[EventCluster, ...] = ()) -> TimelineResult:
    """Build a TimelineResult that carries only the given clusters."""
    return TimelineResult(
        clusters=clusters,
        total_entries=0,
        window_start=None,
        window_end=None,
        gap_count=0,
        burst_count=0,
        dominant_sources=(),
    )


def _mock_hf_pipeline(label: str, score: float) -> MagicMock:
    """Return a mock HF pipeline callable that always yields one result."""
    pipe = MagicMock()
    pipe.return_value = [{"label": label, "score": score}]
    return pipe


# ---------------------------------------------------------------------------
# Path A — ML classification
# ---------------------------------------------------------------------------


class TestMLPath:
    def test_ml_error_maps_to_error(self) -> None:
        """ML returning ERROR with score 0.98 → cluster severity ERROR."""
        pipe = _mock_hf_pipeline("ERROR", 0.98)
        with patch(
            "app.services.diagnose.classifier._get_ml_classifier", return_value=pipe
        ):
            clf = SeverityClassifier(model_id="fake/model")
            result = clf.classify(_make_timeline((_make_cluster("disk error detected"),)))

        assert result.cluster_severities["abc123"] == "ERROR"
        assert result.classifier_used == "ml"
        assert result.model_id == "fake/model"

    def test_ml_critical_promotion(self) -> None:
        """ERROR + score > 0.95 + 'kernel panic' in text → promoted to CRITICAL."""
        pipe = _mock_hf_pipeline("ERROR", 0.97)
        with patch(
            "app.services.diagnose.classifier._get_ml_classifier", return_value=pipe
        ):
            clf = SeverityClassifier(model_id="fake/model")
            result = clf.classify(
                _make_timeline((_make_cluster("kernel panic: not syncing VFS"),))
            )

        assert result.cluster_severities["abc123"] == "CRITICAL"

    def test_ml_debug_demotion(self) -> None:
        """INFO + score < 0.4 → demoted to DEBUG."""
        pipe = _mock_hf_pipeline("INFO", 0.3)
        with patch(
            "app.services.diagnose.classifier._get_ml_classifier", return_value=pipe
        ):
            clf = SeverityClassifier(model_id="fake/model")
            result = clf.classify(_make_timeline((_make_cluster("routine ping"),)))

        assert result.cluster_severities["abc123"] == "DEBUG"

    def test_ml_warning_maps_to_warn(self) -> None:
        """ML returning WARNING → mapped to WARN."""
        pipe = _mock_hf_pipeline("WARNING", 0.85)
        with patch(
            "app.services.diagnose.classifier._get_ml_classifier", return_value=pipe
        ):
            clf = SeverityClassifier(model_id="fake/model")
            result = clf.classify(_make_timeline((_make_cluster("low disk space"),)))

        assert result.cluster_severities["abc123"] == "WARN"


# ---------------------------------------------------------------------------
# Path B — pattern_tags fallback
# ---------------------------------------------------------------------------


class TestPatternTagsPath:
    def test_pattern_tags_resolve_error_severity(self, tmp_path: Path) -> None:
        """Cluster with pattern_tag 'service_crash_loop' → ERROR from pattern file."""
        pattern_yaml = tmp_path / "default.yaml"
        pattern_yaml.write_text(
            "patterns:\n"
            "  - name: service_crash_loop\n"
            "    pattern: crash\n"
            "    severity: ERROR\n"
            "    description: Service crashed in a loop\n"
        )
        clf = SeverityClassifier(model_id="", pattern_file=pattern_yaml)
        cluster = _make_cluster(
            representative_text="service crashed",
            pattern_tags=("service_crash_loop",),
        )
        result = clf.classify(_make_timeline((cluster,)))

        assert result.cluster_severities["abc123"] == "ERROR"
        assert result.classifier_used == "pattern_tags"
        assert result.model_id is None


# ---------------------------------------------------------------------------
# Path C — regex fallback
# ---------------------------------------------------------------------------


class TestRegexPath:
    def test_regex_detects_error(self) -> None:
        """No ML, no pattern file: 'ERROR: disk full' → ERROR via regex."""
        clf = SeverityClassifier(model_id="")
        result = clf.classify(
            _make_timeline((_make_cluster("ERROR: disk full"),))
        )

        assert result.cluster_severities["abc123"] == "ERROR"
        assert result.classifier_used == "regex"

    def test_regex_defaults_to_info_when_no_match(self) -> None:
        """No severity keyword in text → defaults to INFO."""
        clf = SeverityClassifier(model_id="")
        result = clf.classify(
            _make_timeline((_make_cluster("mount: disk mounted successfully"),))
        )

        assert result.cluster_severities["abc123"] == "INFO"


# ---------------------------------------------------------------------------
# Fallback behaviour
# ---------------------------------------------------------------------------


class TestImportErrorFallback:
    def test_transformers_import_error_falls_back_to_pattern_tags(
        self, tmp_path: Path
    ) -> None:
        """ImportError from transformers → clean fallback to pattern_tags path."""
        pattern_yaml = tmp_path / "default.yaml"
        pattern_yaml.write_text(
            "patterns:\n"
            "  - name: auth_failure\n"
            "    pattern: auth\n"
            "    severity: ERROR\n"
            "    description: Auth failure\n"
        )

        def _raising_get_ml(*_args: Any, **_kwargs: Any) -> None:
            raise ImportError("No module named 'transformers'")

        with patch(
            "app.services.diagnose.classifier._get_ml_classifier",
            side_effect=_raising_get_ml,
        ):
            clf = SeverityClassifier(model_id="fake/model", pattern_file=pattern_yaml)
            cluster = _make_cluster(
                representative_text="auth failed",
                pattern_tags=("auth_failure",),
            )
            result = clf.classify(_make_timeline((cluster,)))

        # ML was attempted (classifier_used == "ml") but pattern_tags resolved it
        assert result.classifier_used == "ml"
        assert result.cluster_severities["abc123"] == "ERROR"


# ---------------------------------------------------------------------------
# Edge cases
# ---------------------------------------------------------------------------


class TestEdgeCases:
    def test_empty_timeline_produces_empty_severities(self) -> None:
        """TimelineResult with no clusters → empty cluster_severities, no crash."""
        clf = SeverityClassifier(model_id="")
        result = clf.classify(_make_timeline())

        assert isinstance(result, ClassifiedTimeline)
        assert result.cluster_severities == {}
        assert result.classifier_used == "regex"

    def test_multiple_clusters_are_classified_independently(self) -> None:
        """Two clusters with distinct ids each get their own severity entry."""
        clf = SeverityClassifier(model_id="")
        timeline = _make_timeline(
            (
                _make_cluster("ERROR: disk full", cluster_id="c1"),
                _make_cluster("mount: disk mounted successfully", cluster_id="c2"),
            )
        )
        result = clf.classify(timeline)

        assert result.cluster_severities == {"c1": "ERROR", "c2": "INFO"}

    def test_classified_timeline_is_frozen(self) -> None:
        """ClassifiedTimeline must be frozen (FrozenInstanceError on mutation)."""
        clf = SeverityClassifier(model_id="")
        result = clf.classify(_make_timeline((_make_cluster(),)))

        with pytest.raises(FrozenInstanceError):
            result.classifier_used = "ml"  # type: ignore[misc]