64 lines
1.8 KiB
Python
64 lines
1.8 KiB
Python
"""Blocklist candidate extraction, management, and telemetry matching."""
|
|
from __future__ import annotations
|
|
|
|
import dataclasses
|
|
from pathlib import Path
|
|
|
|
import yaml
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Data models
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@dataclasses.dataclass(frozen=True)
class TelemetryRule:
    """Immutable description of one telemetry rule.

    Instances are frozen, so attributes cannot be reassigned after
    construction, and the generated ``__eq__``/``__hash__`` make rules
    comparable and hashable by value.
    """

    # Identifier of the rule.
    name: str
    # Domains covered by the rule, kept as a tuple so the rule stays hashable.
    domains: tuple[str, ...]
    # Free-form category label; the set of valid values is not defined here.
    category: str
    # Human-readable explanation of the rule.
    description: str
|
|
|
|
|
|
@dataclasses.dataclass
class BlocklistCandidate:
    """Mutable record for a single blocklist candidate.

    Every field is required at construction time (none has a default), and
    the dataclass is not frozen, so attributes may be updated in place.
    """

    # Identifier of the candidate.
    id: str
    # The domain name or IP address being considered.
    domain_or_ip: str

    # Originating device, when known.
    source_device_ip: str | None
    source_device_name: str | None

    # Observation window, stored as strings.
    # NOTE(review): presumably timestamps; the format is not shown here.
    first_seen: str
    last_seen: str
    hit_count: int

    # Workflow state; the allowed values are not defined in this section.
    status: str
    # NOTE(review): presumably set once the candidate has been pushed -- confirm.
    pushed_at: str | None

    # Supporting log lines for the candidate.
    log_evidence: list[str]
    # NOTE(review): presumably the name of a matching TelemetryRule -- confirm.
    matched_rule: str | None

    # Optional score and explanation; the scale of the score is not shown here.
    llm_score: float | None
    llm_reason: str | None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Telemetry list
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def load_telemetry_rules(path: Path) -> list[TelemetryRule]:
    """Load telemetry rules from a YAML file.

    The document is expected to be a mapping with an optional ``rules`` list.
    Each entry must provide ``name``, ``domains`` and ``category``;
    ``description`` is optional and defaults to ``""``.

    Args:
        path: Location of the YAML file.

    Returns:
        One ``TelemetryRule`` per entry, in file order. Domains are
        lower-cased and stripped of leading/trailing dots. An empty document,
        or one whose ``rules`` key is missing or null, yields an empty list.

    Raises:
        OSError: If the file cannot be read.
        KeyError: If an entry lacks ``name``, ``domains`` or ``category``.
    """
    # Read as UTF-8 explicitly so the result does not depend on the locale.
    # safe_load returns None for an empty document; fall back to a mapping so
    # the lookup below does not fail on it.
    data = yaml.safe_load(path.read_text(encoding="utf-8")) or {}
    # A bare ``rules:`` key parses as None; treat it like a missing key.
    entries = data.get("rules") or []
    return [
        TelemetryRule(
            name=entry["name"],
            domains=tuple(d.lower().strip(".") for d in entry["domains"]),
            category=entry["category"],
            description=entry.get("description", ""),
        )
        for entry in entries
    ]
|
|
|
|
|
|
def matches_telemetry(domain: str, rules: list[TelemetryRule]) -> TelemetryRule | None:
    """Find the first rule covering ``domain`` or one of its parent domains.

    ``domain`` is lower-cased and stripped of leading/trailing dots before
    comparison. Rule domains are compared exactly as stored; they are not
    normalized here. A rule matches when one of its domains equals the
    normalized name or is a dot-separated suffix of it.

    Args:
        domain: Domain name to look up.
        rules: Rules to search, in priority order.

    Returns:
        The first matching rule in ``rules``, or ``None`` when nothing matches.
    """
    normalized = domain.lower().strip(".")
    for candidate in rules:
        # The "." prefix keeps "notexample.com" from matching "example.com".
        if any(
            normalized == known or normalized.endswith("." + known)
            for known in candidate.domains
        ):
            return candidate
    return None
|