feat(classifier): add Hybrid-BERT label mapping shim (#41)

Adds _HYBRID_BERT_LABEL_MAP to translate the 7-class output vocabulary of
krishnas4415/log-anomaly-detection-models (Hybrid-BERT, MIT) to Turnstone
SeverityLabel. _map_label now checks the Hybrid-BERT map before the standard
map so either model family works via TURNSTONE_CLASSIFIER_MODEL without any
additional code path.

Mapping (confirmed from model config.json):
  normal            → INFO
  security_anomaly  → ERROR
  system_failure    → CRITICAL
  performance_issue → WARN
  network_anomaly   → WARN
  config_error      → ERROR
  hardware_issue    → CRITICAL

Keyword-based CRITICAL promotion and low-confidence DEBUG demotion apply on
top of the base mapping (same rules as the standard vocabulary).

11 new tests covering all 7 Hybrid-BERT labels, case-insensitivity, and
regression on standard-vocabulary labels. 372 tests passing total.

Note: custom loading code for the non-standard .pt checkpoint format is
explicitly out of scope — evaluate better-packaged HF alternatives first
(see #41 for candidate list).

Closes: #41
This commit is contained in:
pyr0ball 2026-06-01 16:20:31 -07:00
parent 1131816666
commit 503a36d76c
2 changed files with 81 additions and 4 deletions

View file

@ -62,6 +62,19 @@ _LABEL_MAP: dict[str, SeverityLabel] = {
"CRITICAL": "CRITICAL",
}
# Label shim for krishnas4415/log-anomaly-detection-models (Hybrid-BERT, MIT).
# Maps the model's 7-class output vocabulary to Turnstone SeverityLabel.
# Checked against the model config.json — labels confirmed in turnstone#41.
_HYBRID_BERT_LABEL_MAP: dict[str, SeverityLabel] = {
"NORMAL": "INFO",
"SECURITY_ANOMALY": "ERROR",
"SYSTEM_FAILURE": "CRITICAL",
"PERFORMANCE_ISSUE": "WARN",
"NETWORK_ANOMALY": "WARN",
"CONFIG_ERROR": "ERROR",
"HARDWARE_ISSUE": "CRITICAL",
}
_CRITICAL_KEYWORDS: frozenset[str] = frozenset(
{
"panic",
@ -87,15 +100,25 @@ _SEVERITY_ORDER: dict[str | None, int] = {
def _map_label(label: str, score: float, text: str) -> SeverityLabel:
"""Apply the severity shim: promote to CRITICAL or demote to DEBUG where warranted."""
"""Translate a raw model output label to a Turnstone SeverityLabel.
Handles two model vocabularies:
- Standard (ERROR/WARN/INFO/CRITICAL/DEBUG) byviz/bylastic_classification_logs
- Hybrid-BERT (normal/security_anomaly/) krishnas4415/log-anomaly-detection-models
Applies keyword-based CRITICAL promotion and low-confidence DEBUG demotion
on top of the base mapping.
"""
upper = label.upper()
if upper == "ERROR" and score > 0.95 and any(
# Resolve via Hybrid-BERT map first, then standard map, then UNKNOWN.
base: SeverityLabel = _HYBRID_BERT_LABEL_MAP.get(upper) or _LABEL_MAP.get(upper, "UNKNOWN") # type: ignore[assignment]
if base == "ERROR" and score > 0.95 and any(
k in text.lower() for k in _CRITICAL_KEYWORDS
):
return "CRITICAL"
if upper == "INFO" and score < 0.4:
if base == "INFO" and score < 0.4:
return "DEBUG"
return _LABEL_MAP.get(upper, "UNKNOWN") # type: ignore[return-value]
return base
def _highest_from_tags(

View file

@ -243,3 +243,57 @@ class TestEdgeCases:
with pytest.raises(FrozenInstanceError):
result.classifier_used = "ml" # type: ignore[misc]
# ---------------------------------------------------------------------------
# Hybrid-BERT label mapping shim (turnstone#41)
# ---------------------------------------------------------------------------
class TestHybridBertLabelMap:
"""_map_label must translate Hybrid-BERT vocabulary to SeverityLabel."""
def _run(self, label: str, score: float = 0.9, text: str = "log line") -> str:
from app.services.diagnose.classifier import _map_label
return _map_label(label, score, text)
def test_normal_maps_to_info(self) -> None:
assert self._run("normal") == "INFO"
def test_security_anomaly_maps_to_error(self) -> None:
assert self._run("security_anomaly") == "ERROR"
def test_system_failure_maps_to_critical(self) -> None:
assert self._run("system_failure") == "CRITICAL"
def test_performance_issue_maps_to_warn(self) -> None:
assert self._run("performance_issue") == "WARN"
def test_network_anomaly_maps_to_warn(self) -> None:
assert self._run("network_anomaly") == "WARN"
def test_config_error_maps_to_error(self) -> None:
assert self._run("config_error") == "ERROR"
def test_hardware_issue_maps_to_critical(self) -> None:
assert self._run("hardware_issue") == "CRITICAL"
def test_hybrid_bert_labels_are_case_insensitive(self) -> None:
from app.services.diagnose.classifier import _map_label
assert _map_label("SECURITY_ANOMALY", 0.9, "x") == "ERROR"
assert _map_label("Security_Anomaly", 0.9, "x") == "ERROR"
def test_system_failure_critical_promotion_not_doubled(self) -> None:
"""system_failure already maps to CRITICAL — keyword promotion is a no-op."""
assert self._run("system_failure", score=0.99, text="kernel panic") == "CRITICAL"
def test_normal_low_confidence_demotes_to_debug(self) -> None:
"""normal + low score → INFO base → DEBUG (same demotion rule as INFO)."""
assert self._run("normal", score=0.2) == "DEBUG"
def test_standard_labels_still_work(self) -> None:
"""Existing standard-vocabulary labels must not be broken by the shim."""
from app.services.diagnose.classifier import _map_label
assert _map_label("ERROR", 0.9, "x") == "ERROR"
assert _map_label("WARNING", 0.9, "x") == "WARN"
assert _map_label("CRITICAL", 0.9, "x") == "CRITICAL"