Merge feat/41-hybrid-bert-shim: Hybrid-BERT label mapping shim (#41)

This commit is contained in:
pyr0ball 2026-06-01 19:59:34 -07:00
commit cfddff6a2a
2 changed files with 81 additions and 4 deletions

View file

@ -62,6 +62,19 @@ _LABEL_MAP: dict[str, SeverityLabel] = {
"CRITICAL": "CRITICAL", "CRITICAL": "CRITICAL",
} }
# Label shim for krishnas4415/log-anomaly-detection-models (Hybrid-BERT, MIT).
# Maps the model's 7-class output vocabulary to Turnstone SeverityLabel.
# Checked against the model config.json — labels confirmed in turnstone#41.
_HYBRID_BERT_LABEL_MAP: dict[str, SeverityLabel] = {
"NORMAL": "INFO",
"SECURITY_ANOMALY": "ERROR",
"SYSTEM_FAILURE": "CRITICAL",
"PERFORMANCE_ISSUE": "WARN",
"NETWORK_ANOMALY": "WARN",
"CONFIG_ERROR": "ERROR",
"HARDWARE_ISSUE": "CRITICAL",
}
_CRITICAL_KEYWORDS: frozenset[str] = frozenset( _CRITICAL_KEYWORDS: frozenset[str] = frozenset(
{ {
"panic", "panic",
@ -87,15 +100,25 @@ _SEVERITY_ORDER: dict[str | None, int] = {
def _map_label(label: str, score: float, text: str) -> SeverityLabel: def _map_label(label: str, score: float, text: str) -> SeverityLabel:
"""Apply the severity shim: promote to CRITICAL or demote to DEBUG where warranted.""" """Translate a raw model output label to a Turnstone SeverityLabel.
Handles two model vocabularies:
- Standard (ERROR/WARN/INFO/CRITICAL/DEBUG) byviz/bylastic_classification_logs
- Hybrid-BERT (normal/security_anomaly/) krishnas4415/log-anomaly-detection-models
Applies keyword-based CRITICAL promotion and low-confidence DEBUG demotion
on top of the base mapping.
"""
upper = label.upper() upper = label.upper()
if upper == "ERROR" and score > 0.95 and any( # Resolve via Hybrid-BERT map first, then standard map, then UNKNOWN.
base: SeverityLabel = _HYBRID_BERT_LABEL_MAP.get(upper) or _LABEL_MAP.get(upper, "UNKNOWN") # type: ignore[assignment]
if base == "ERROR" and score > 0.95 and any(
k in text.lower() for k in _CRITICAL_KEYWORDS k in text.lower() for k in _CRITICAL_KEYWORDS
): ):
return "CRITICAL" return "CRITICAL"
if upper == "INFO" and score < 0.4: if base == "INFO" and score < 0.4:
return "DEBUG" return "DEBUG"
return _LABEL_MAP.get(upper, "UNKNOWN") # type: ignore[return-value] return base
def _highest_from_tags( def _highest_from_tags(

View file

@ -243,3 +243,57 @@ class TestEdgeCases:
with pytest.raises(FrozenInstanceError): with pytest.raises(FrozenInstanceError):
result.classifier_used = "ml" # type: ignore[misc] result.classifier_used = "ml" # type: ignore[misc]
# ---------------------------------------------------------------------------
# Hybrid-BERT label mapping shim (turnstone#41)
# ---------------------------------------------------------------------------
class TestHybridBertLabelMap:
"""_map_label must translate Hybrid-BERT vocabulary to SeverityLabel."""
def _run(self, label: str, score: float = 0.9, text: str = "log line") -> str:
from app.services.diagnose.classifier import _map_label
return _map_label(label, score, text)
def test_normal_maps_to_info(self) -> None:
assert self._run("normal") == "INFO"
def test_security_anomaly_maps_to_error(self) -> None:
assert self._run("security_anomaly") == "ERROR"
def test_system_failure_maps_to_critical(self) -> None:
assert self._run("system_failure") == "CRITICAL"
def test_performance_issue_maps_to_warn(self) -> None:
assert self._run("performance_issue") == "WARN"
def test_network_anomaly_maps_to_warn(self) -> None:
assert self._run("network_anomaly") == "WARN"
def test_config_error_maps_to_error(self) -> None:
assert self._run("config_error") == "ERROR"
def test_hardware_issue_maps_to_critical(self) -> None:
assert self._run("hardware_issue") == "CRITICAL"
def test_hybrid_bert_labels_are_case_insensitive(self) -> None:
from app.services.diagnose.classifier import _map_label
assert _map_label("SECURITY_ANOMALY", 0.9, "x") == "ERROR"
assert _map_label("Security_Anomaly", 0.9, "x") == "ERROR"
def test_system_failure_critical_promotion_not_doubled(self) -> None:
"""system_failure already maps to CRITICAL — keyword promotion is a no-op."""
assert self._run("system_failure", score=0.99, text="kernel panic") == "CRITICAL"
def test_normal_low_confidence_demotes_to_debug(self) -> None:
"""normal + low score → INFO base → DEBUG (same demotion rule as INFO)."""
assert self._run("normal", score=0.2) == "DEBUG"
def test_standard_labels_still_work(self) -> None:
"""Existing standard-vocabulary labels must not be broken by the shim."""
from app.services.diagnose.classifier import _map_label
assert _map_label("ERROR", 0.9, "x") == "ERROR"
assert _map_label("WARNING", 0.9, "x") == "WARN"
assert _map_label("CRITICAL", 0.9, "x") == "CRITICAL"