"""Blocklist candidate extraction, management, and telemetry matching.""" from __future__ import annotations import dataclasses from pathlib import Path import yaml # --------------------------------------------------------------------------- # Data models # --------------------------------------------------------------------------- @dataclasses.dataclass(frozen=True) class TelemetryRule: name: str domains: tuple[str, ...] category: str description: str @dataclasses.dataclass class BlocklistCandidate: id: str domain_or_ip: str source_device_ip: str | None source_device_name: str | None first_seen: str last_seen: str hit_count: int status: str pushed_at: str | None log_evidence: list[str] matched_rule: str | None llm_score: float | None llm_reason: str | None # --------------------------------------------------------------------------- # Telemetry list # --------------------------------------------------------------------------- def load_telemetry_rules(path: Path) -> list[TelemetryRule]: """Load telemetry rules from a YAML file.""" data = yaml.safe_load(path.read_text()) return [ TelemetryRule( name=r["name"], domains=tuple(d.lower().strip(".") for d in r["domains"]), category=r["category"], description=r.get("description", ""), ) for r in data.get("rules", []) ] def matches_telemetry(domain: str, rules: list[TelemetryRule]) -> TelemetryRule | None: """Return the first rule whose domains include domain or a parent domain, else None.""" d = domain.lower().strip(".") for rule in rules: for rd in rule.domains: if d == rd or d.endswith("." + rd): return rule return None