feat: Stage 2 — SeverityClassifier for multi-agent diagnose pipeline (issue #29)
Three-path classification: ML (transformers pipeline, lazy singleton) → pattern_tags (YAML pattern severity dict) → regex (detect_severity). - Path A: HF text-classification pipeline loaded lazily on first classify() call via module-level singleton; shim promotes ERROR+keyword hits to CRITICAL and demotes low-confidence INFO to DEBUG. - Path B: maps cluster.pattern_tags through the loaded pattern severity dict; picks the highest severity across matching tags. - Path C: falls back to detect_severity() regex scan on representative_text; defaults to INFO when no keyword matches. - Pattern file resolved from constructor arg or TURNSTONE_PATTERNS env var (mirrors app/rest.py convention). - No crash when transformers is not installed; ImportError on per-cluster ML inference triggers clean per-cluster fallback to pattern_tags/regex. - ClassifiedTimeline.classifier_used reflects the primary session path. Tests (10 new, 328 total, all passing): - ML ERROR, CRITICAL promotion, DEBUG demotion, WARNING→WARN - pattern_tags resolution from YAML fixture - regex ERROR detection and INFO default - ImportError clean fallback - empty timeline no-crash - ClassifiedTimeline FrozenInstanceError on mutation Closes: #29
This commit is contained in:
parent
7abb76e628
commit
6ea8fbfec1
2 changed files with 494 additions and 0 deletions
249
app/services/diagnose/classifier.py
Normal file
249
app/services/diagnose/classifier.py
Normal file
|
|
@ -0,0 +1,249 @@
|
|||
"""Stage 2: Severity Classifier — ML with two fallback levels.
|
||||
|
||||
Classification strategy (in priority order):
|
||||
|
||||
Path A — ML: Hugging Face text-classification pipeline, loaded lazily.
|
||||
Path B — pattern_tags: Map cluster.pattern_tags through the loaded pattern
|
||||
severity dict; pick the highest severity across matching tags.
|
||||
Path C — regex: Call detect_severity() from app.glean.base on the cluster's
|
||||
representative_text.
|
||||
|
||||
Each cluster is classified independently. The ``classifier_used`` field on the
|
||||
returned ``ClassifiedTimeline`` reflects the primary path (the one that governed
|
||||
the overall classification session, not individual cluster fallbacks).
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import os
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
from app.services.diagnose.models import (
|
||||
ClassifiedTimeline,
|
||||
EventCluster,
|
||||
SeverityLabel,
|
||||
TimelineResult,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Module-level ML singleton — reset to None between tests via the fixture
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
_ml_classifier: Any | None = None
# (model_id, device) the cached pipeline was built for; None until the first load.
_ml_classifier_key: tuple[str, str] | None = None


def _get_ml_classifier(model_id: str, device: str) -> Any:
    """Return the cached HF text-classification pipeline, loading it when needed.

    The pipeline is built on the first call and reused afterwards.  It is
    rebuilt when the requested ``(model_id, device)`` differs from the pair the
    cached pipeline was created for, so a classifier configured for one model
    is never handed a pipeline that was loaded for another.  Only one pipeline
    is kept at a time.

    Setting the module-level ``_ml_classifier`` back to ``None`` forces a
    reload on the next call.

    Parameters
    ----------
    model_id:
        Hugging Face model identifier, forwarded as ``model=``.
    device:
        Device string, forwarded as ``device=``.

    Raises
    ------
    ImportError
        If ``transformers`` is not installed.  Any exception raised while
        constructing the pipeline also propagates; the previous cache entry
        (if any) is left untouched in that case.
    """
    global _ml_classifier, _ml_classifier_key  # noqa: PLW0603
    requested = (model_id, device)
    if _ml_classifier is None or _ml_classifier_key != requested:
        # Imported lazily so the module stays importable without transformers.
        from transformers import pipeline as hf_pipeline  # type: ignore[import-untyped]

        loaded = hf_pipeline("text-classification", model=model_id, device=device)
        # Publish pipeline and key together, only after a successful load.
        _ml_classifier, _ml_classifier_key = loaded, requested
    return _ml_classifier
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Label mapping
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
# Maps upper-cased raw labels (ML output or regex hits) onto SeverityLabel values.
_LABEL_MAP: dict[str, SeverityLabel] = {
    "ERROR": "ERROR",
    "WARNING": "WARN",
    "WARN": "WARN",
    "INFO": "INFO",
    "DEBUG": "DEBUG",
    "CRITICAL": "CRITICAL",
}

# Lower-case phrases that promote a high-confidence ERROR to CRITICAL.
_CRITICAL_KEYWORDS: frozenset[str] = frozenset(
    {
        "panic",
        "oom",
        "fatal",
        "critical",
        "kernel panic",
        "out of memory",
        "segfault",
        "segmentation fault",
    }
)

# Relative ranking used to pick the most severe label; higher is more severe.
_SEVERITY_ORDER: dict[str | None, int] = {
    "CRITICAL": 5,
    "ERROR": 4,
    "WARN": 3,
    "WARNING": 3,
    "INFO": 2,
    "DEBUG": 1,
    None: 0,
}


def _has_critical_keyword(text: str) -> bool:
    """Return True when *text* contains a critical keyword at the start of a token.

    The comparison is case-insensitive.  A hit only counts when the character
    before it is not alphanumeric, so "oom-killer" and "panicked" match while
    "room" and "zoom" do not.
    """
    lowered = text.lower()
    for keyword in _CRITICAL_KEYWORDS:
        start = lowered.find(keyword)
        while start != -1:
            if start == 0 or not lowered[start - 1].isalnum():
                return True
            start = lowered.find(keyword, start + 1)
    return False


def _map_label(label: str, score: float, text: str) -> SeverityLabel:
    """Translate a raw classifier label into a severity, applying the shim.

    Rules, in order:

    * ``ERROR`` with ``score > 0.95`` and a critical keyword in *text* is
      promoted to ``CRITICAL``.
    * ``INFO`` with ``score < 0.4`` is demoted to ``DEBUG``.
    * Anything else goes through ``_LABEL_MAP``; labels missing from the map
      yield ``"UNKNOWN"``.

    Parameters
    ----------
    label:
        Raw label string; compared case-insensitively.
    score:
        Confidence reported alongside the label.
    text:
        The text that was classified, scanned for critical keywords.
    """
    upper = label.upper()
    if upper == "ERROR" and score > 0.95 and _has_critical_keyword(text):
        return "CRITICAL"
    if upper == "INFO" and score < 0.4:
        return "DEBUG"
    return _LABEL_MAP.get(upper, "UNKNOWN")  # type: ignore[return-value]


def _highest_from_tags(
    tags: tuple[str, ...], severity_map: dict[str, str]
) -> SeverityLabel | None:
    """Return the highest severity among the *tags* that appear in *severity_map*.

    Severities are upper-cased (and ``WARNING`` folded into ``WARN``) *before*
    they are ranked, so a pattern file that spells ``error`` in lower case is
    ranked the same as ``ERROR``.  Tags missing from the map, and entries whose
    severity is not a non-empty string, are ignored.

    A severity that is not in ``_SEVERITY_ORDER`` ranks below every known one;
    it is returned (upper-cased) only when no tag resolves to a known
    severity, independent of tag order.

    Returns ``None`` when no tag contributes a severity.
    """
    best: str | None = None
    best_rank = -1
    for tag in tags:
        raw = severity_map.get(tag)
        if not isinstance(raw, str):
            continue
        candidate = raw.strip().upper()
        if not candidate:
            continue
        if candidate == "WARNING":
            candidate = "WARN"
        rank = _SEVERITY_ORDER.get(candidate, 0)
        if rank > best_rank:
            best, best_rank = candidate, rank
    return best  # type: ignore[return-value]
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# SeverityClassifier
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class SeverityClassifier:
    """Classify each EventCluster's severity using ML, patterns, or regex fallback.

    For every cluster the paths are tried in order (ML, then pattern_tags,
    then regex) and the first one that yields a label wins.  The regex path
    always yields a label, so every cluster receives a severity.

    Parameters
    ----------
    model_id:
        Hugging Face model identifier. When empty (default), ML is skipped.
    device:
        Torch device string passed to the HF pipeline (e.g. ``"cpu"`` or ``"cuda:0"``).
    pattern_file:
        Path to the YAML pattern file. When ``None`` the classifier reads
        ``TURNSTONE_PATTERNS`` env var (same logic as ``app/rest.py``).
    """

    def __init__(
        self,
        model_id: str = "",
        device: str = "cpu",
        pattern_file: Path | None = None,
    ) -> None:
        self._model_id = model_id
        self._device = device
        self._pattern_file: Path | None = pattern_file
        self._pattern_severity: dict[str, str] = {}
        self._patterns_loaded = False
        # True once the pipeline could not be obtained during the current
        # classify() call; stops further load attempts for that call.
        self._ml_load_failed = False

    # ------------------------------------------------------------------
    # Lazy loaders
    # ------------------------------------------------------------------

    def _resolve_pattern_file(self) -> Path | None:
        """Resolve pattern file from constructor arg or env var.

        The constructor argument wins.  Otherwise ``TURNSTONE_PATTERNS`` is
        treated as a directory and ``default.yaml`` inside it is used.
        Returns ``None`` when neither is available.
        """
        if self._pattern_file is not None:
            return self._pattern_file
        env_dir = os.environ.get("TURNSTONE_PATTERNS")
        if env_dir:
            return Path(env_dir) / "default.yaml"
        return None

    def _ensure_patterns_loaded(self) -> None:
        """Populate _pattern_severity from the pattern YAML file (once).

        The "loaded" flag is set only after the load has succeeded (or after
        it has been established that there is no file to load).  If loading
        raises, the exception propagates and the next call tries again instead
        of silently continuing with an empty pattern table.
        """
        if self._patterns_loaded:
            return
        path = self._resolve_pattern_file()
        if path is not None:
            from app.glean.base import load_patterns

            self._pattern_severity = {p.name: p.severity for p in load_patterns(path)}
        self._patterns_loaded = True

    # ------------------------------------------------------------------
    # Per-cluster classification helpers
    # ------------------------------------------------------------------

    def _classify_cluster_ml(self, cluster: EventCluster) -> SeverityLabel | None:
        """Attempt ML classification. Returns None on any failure.

        Obtaining the pipeline and running inference are guarded separately:
        a pipeline that cannot be obtained (e.g. ``transformers`` missing) is
        logged once and not retried for the remaining clusters of the current
        ``classify()`` call, while an inference error only affects the cluster
        at hand.
        """
        if self._ml_load_failed:
            return None

        try:
            pipe = _get_ml_classifier(self._model_id, self._device)
        except Exception:  # noqa: BLE001
            self._ml_load_failed = True
            logger.warning(
                "ML classifier %r unavailable — falling back for this session",
                self._model_id,
                exc_info=True,
            )
            return None

        try:
            results = pipe(cluster.representative_text)
            if not results:
                return None
            hit = results[0]
            return _map_label(hit["label"], hit["score"], cluster.representative_text)
        except Exception:  # noqa: BLE001
            logger.warning(
                "ML inference failed for cluster %s — falling back",
                cluster.cluster_id,
                exc_info=True,
            )
            return None

    def _classify_cluster_pattern_tags(
        self, cluster: EventCluster
    ) -> SeverityLabel | None:
        """Derive severity from the cluster's pattern_tags. Returns None if no match."""
        return _highest_from_tags(cluster.pattern_tags, self._pattern_severity)

    def _classify_cluster_regex(self, cluster: EventCluster) -> SeverityLabel:
        """Classify by scanning representative_text with the severity regex.

        Returns ``"INFO"`` when ``detect_severity`` finds nothing or reports a
        label that is not in ``_LABEL_MAP``.
        """
        from app.glean.base import detect_severity

        raw = detect_severity(cluster.representative_text)
        if raw is None:
            return "INFO"
        return _LABEL_MAP.get(raw.upper(), "INFO")  # type: ignore[return-value]

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def classify(self, timeline: TimelineResult) -> ClassifiedTimeline:
        """Classify every cluster in *timeline* and return a ``ClassifiedTimeline``.

        ``classifier_used`` names the primary path for the session: ``"ml"``
        when a model id is configured, else ``"pattern_tags"`` when a pattern
        table was loaded, else ``"regex"``.  Individual clusters may still
        have been resolved by a lower path.
        """
        self._ensure_patterns_loaded()
        # Give the ML path a fresh chance on every call.
        self._ml_load_failed = False

        # Determine which primary path governs this session
        ml_available = bool(self._model_id)
        patterns_available = bool(self._pattern_severity)

        if ml_available:
            classifier_used: str = "ml"
        elif patterns_available:
            classifier_used = "pattern_tags"
        else:
            classifier_used = "regex"

        cluster_severities: dict[str, SeverityLabel] = {}

        for cluster in timeline.clusters:
            severity: SeverityLabel | None = None

            if ml_available:
                severity = self._classify_cluster_ml(cluster)

            if severity is None and patterns_available:
                severity = self._classify_cluster_pattern_tags(cluster)

            if severity is None:
                severity = self._classify_cluster_regex(cluster)

            cluster_severities[cluster.cluster_id] = severity

        return ClassifiedTimeline(
            timeline=timeline,
            cluster_severities=cluster_severities,
            classifier_used=classifier_used,  # type: ignore[arg-type]
            model_id=self._model_id if ml_available else None,
        )
|
||||
245
tests/test_diagnose_classifier.py
Normal file
245
tests/test_diagnose_classifier.py
Normal file
|
|
@ -0,0 +1,245 @@
|
|||
"""Tests for app/services/diagnose/classifier.py — SeverityClassifier.
|
||||
|
||||
All ML-path tests patch ``_get_ml_classifier`` so ``transformers`` is never
imported and no model weights are downloaded during the test suite.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import FrozenInstanceError
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
import app.services.diagnose.classifier as clf_module
|
||||
from app.services.diagnose.classifier import SeverityClassifier
|
||||
from app.services.diagnose.models import ClassifiedTimeline, EventCluster, TimelineResult
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Fixtures
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
def reset_ml_singleton():
    """Clear the classifier module's cached ML pipeline around every test.

    The cache is emptied before the test body runs and again afterwards, so
    no test observes a pipeline left behind by another one.
    """

    def _clear() -> None:
        clf_module._ml_classifier = None

    _clear()
    yield
    _clear()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Test-object builders
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _make_cluster(
    representative_text: str = "test log",
    pattern_tags: tuple[str, ...] = (),
    severity: str = "INFO",
) -> EventCluster:
    """Build a minimal ``EventCluster`` for classifier tests.

    Only the text, the pattern tags and the severity vary; every other field
    is a fixed placeholder.  The cluster id is always ``"abc123"``, which is
    the key the tests look up in ``cluster_severities``.
    """
    fields: dict[str, Any] = {
        "cluster_id": "abc123",
        "entries": ("e1",),
        "start_iso": None,
        "end_iso": None,
        "duration_seconds": 0.0,
        "source_ids": ("src",),
        "pattern_tags": pattern_tags,
        "severity": severity,
        "burst": False,
        "gap_before_seconds": 0.0,
        "representative_text": representative_text,
    }
    return EventCluster(**fields)
|
||||
|
||||
|
||||
def _make_timeline(clusters: tuple[EventCluster, ...] = ()) -> TimelineResult:
    """Wrap *clusters* in a ``TimelineResult`` whose summary fields are all empty/zero."""
    summary: dict[str, Any] = {
        "total_entries": 0,
        "window_start": None,
        "window_end": None,
        "gap_count": 0,
        "burst_count": 0,
        "dominant_sources": (),
    }
    return TimelineResult(clusters=clusters, **summary)
|
||||
|
||||
|
||||
def _mock_hf_pipeline(label: str, score: float) -> MagicMock:
    """Return a stand-in HF pipeline whose every call yields one ``{label, score}`` hit."""
    single_hit = {"label": label, "score": score}
    return MagicMock(return_value=[single_hit])
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Path A — ML classification
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestMLPath:
    """Path A: severities produced by the (patched) ML pipeline and its shim."""

    @staticmethod
    def _classify_with_pipeline(label: str, score: float, text: str) -> ClassifiedTimeline:
        """Classify a one-cluster timeline with the pipeline loader patched.

        The loader is replaced for the duration of the call by a mock that
        answers every input with *label* / *score*.
        """
        fake_pipe = _mock_hf_pipeline(label, score)
        with patch(
            "app.services.diagnose.classifier._get_ml_classifier",
            return_value=fake_pipe,
        ):
            classifier = SeverityClassifier(model_id="fake/model")
            return classifier.classify(_make_timeline((_make_cluster(text),)))

    def test_ml_error_maps_to_error(self) -> None:
        """ML returning ERROR with score 0.98 → cluster severity ERROR."""
        outcome = self._classify_with_pipeline("ERROR", 0.98, "disk error detected")

        assert outcome.cluster_severities["abc123"] == "ERROR"
        assert outcome.classifier_used == "ml"
        assert outcome.model_id == "fake/model"

    def test_ml_critical_promotion(self) -> None:
        """ERROR + score > 0.95 + 'kernel panic' in text → promoted to CRITICAL."""
        outcome = self._classify_with_pipeline(
            "ERROR", 0.97, "kernel panic: not syncing VFS"
        )

        assert outcome.cluster_severities["abc123"] == "CRITICAL"

    def test_ml_debug_demotion(self) -> None:
        """INFO + score < 0.4 → demoted to DEBUG."""
        outcome = self._classify_with_pipeline("INFO", 0.3, "routine ping")

        assert outcome.cluster_severities["abc123"] == "DEBUG"

    def test_ml_warning_maps_to_warn(self) -> None:
        """ML returning WARNING → mapped to WARN."""
        outcome = self._classify_with_pipeline("WARNING", 0.85, "low disk space")

        assert outcome.cluster_severities["abc123"] == "WARN"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Path B — pattern_tags fallback
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestPatternTagsPath:
    """Path B: severity resolved from the cluster's pattern_tags."""

    def test_pattern_tags_resolve_error_severity(self, tmp_path: Path) -> None:
        """Cluster with pattern_tag 'service_crash_loop' → ERROR from pattern file."""
        pattern_yaml = tmp_path / "default.yaml"
        # The per-pattern keys must be nested under the "- name" list item;
        # written flush with the dash, the document is not valid YAML.
        pattern_yaml.write_text(
            "patterns:\n"
            "  - name: service_crash_loop\n"
            "    pattern: crash\n"
            "    severity: ERROR\n"
            "    description: Service crashed in a loop\n"
        )
        clf = SeverityClassifier(model_id="", pattern_file=pattern_yaml)
        cluster = _make_cluster(
            representative_text="service crashed",
            pattern_tags=("service_crash_loop",),
        )
        result = clf.classify(_make_timeline((cluster,)))

        assert result.cluster_severities["abc123"] == "ERROR"
        assert result.classifier_used == "pattern_tags"
        assert result.model_id is None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Path C — regex fallback
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestRegexPath:
    """Path C: regex scan of representative_text, with INFO as the default."""

    def test_regex_detects_error(self, monkeypatch: pytest.MonkeyPatch) -> None:
        """No ML, no pattern file: 'ERROR: disk full' → ERROR via regex."""
        # An ambient TURNSTONE_PATTERNS would make the classifier load a
        # pattern file and report "pattern_tags" instead of "regex".
        monkeypatch.delenv("TURNSTONE_PATTERNS", raising=False)

        clf = SeverityClassifier(model_id="")
        result = clf.classify(
            _make_timeline((_make_cluster("ERROR: disk full"),))
        )

        assert result.cluster_severities["abc123"] == "ERROR"
        assert result.classifier_used == "regex"

    def test_regex_defaults_to_info_when_no_match(
        self, monkeypatch: pytest.MonkeyPatch
    ) -> None:
        """No severity keyword in text → defaults to INFO."""
        # Keep the run independent of the developer's environment.
        monkeypatch.delenv("TURNSTONE_PATTERNS", raising=False)

        clf = SeverityClassifier(model_id="")
        result = clf.classify(
            _make_timeline((_make_cluster("mount: disk mounted successfully"),))
        )

        assert result.cluster_severities["abc123"] == "INFO"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Fallback behaviour
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestImportErrorFallback:
    """Behaviour when the ML pipeline cannot be obtained."""

    def test_transformers_import_error_falls_back_to_pattern_tags(
        self, tmp_path: Path
    ) -> None:
        """ImportError from transformers → clean fallback to pattern_tags path."""
        pattern_yaml = tmp_path / "default.yaml"
        # The per-pattern keys must be nested under the "- name" list item;
        # written flush with the dash, the document is not valid YAML.
        pattern_yaml.write_text(
            "patterns:\n"
            "  - name: auth_failure\n"
            "    pattern: auth\n"
            "    severity: ERROR\n"
            "    description: Auth failure\n"
        )

        # An exception instance as side_effect is raised on every call to the
        # patched loader.
        with patch(
            "app.services.diagnose.classifier._get_ml_classifier",
            side_effect=ImportError("No module named 'transformers'"),
        ):
            clf = SeverityClassifier(model_id="fake/model", pattern_file=pattern_yaml)
            cluster = _make_cluster(
                representative_text="auth failed",
                pattern_tags=("auth_failure",),
            )
            result = clf.classify(_make_timeline((cluster,)))

        # ML was attempted (classifier_used == "ml") but pattern_tags resolved it
        assert result.classifier_used == "ml"
        assert result.cluster_severities["abc123"] == "ERROR"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Edge cases
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestEdgeCases:
    """Empty input and immutability of the result object."""

    def test_empty_timeline_produces_empty_severities(
        self, monkeypatch: pytest.MonkeyPatch
    ) -> None:
        """TimelineResult with no clusters → empty cluster_severities, no crash."""
        # An ambient TURNSTONE_PATTERNS would make the classifier load a
        # pattern file and report "pattern_tags" instead of "regex".
        monkeypatch.delenv("TURNSTONE_PATTERNS", raising=False)

        clf = SeverityClassifier(model_id="")
        result = clf.classify(_make_timeline())

        assert isinstance(result, ClassifiedTimeline)
        assert result.cluster_severities == {}
        assert result.classifier_used == "regex"

    def test_classified_timeline_is_frozen(
        self, monkeypatch: pytest.MonkeyPatch
    ) -> None:
        """ClassifiedTimeline must be frozen (FrozenInstanceError on mutation)."""
        # Keep the run independent of the developer's environment.
        monkeypatch.delenv("TURNSTONE_PATTERNS", raising=False)

        clf = SeverityClassifier(model_id="")
        result = clf.classify(_make_timeline((_make_cluster(),)))

        with pytest.raises(FrozenInstanceError):
            result.classifier_used = "ml"  # type: ignore[misc]
|
||||
Loading…
Reference in a new issue