turnstone/app/services/blocklist.py
pyr0ball 8832061de2 feat(blocklist): telemetry YAML list + loader + domain matcher
Adds patterns/telemetry.yaml with 6 rule groups (samsung, belkin, roku, lg, amazon, advertising).
Adds app/services/blocklist.py with TelemetryRule and BlocklistCandidate dataclasses, load_telemetry_rules(), and matches_telemetry() with exact and subdomain matching.
6 new TestTelemetry tests pass; 199 total passing.
2026-05-15 20:54:40 -07:00

69 lines
1.9 KiB
Python

"""Blocklist candidate extraction, management, and telemetry matching."""
from __future__ import annotations
import dataclasses
import json
import re
import sqlite3
import uuid
from datetime import datetime, timezone
from pathlib import Path
import yaml
# ---------------------------------------------------------------------------
# Data models
# ---------------------------------------------------------------------------
@dataclasses.dataclass(frozen=True)
class TelemetryRule:
    """A named group of telemetry domains; instances are immutable (frozen)."""

    # Label identifying the rule group.
    name: str
    # Domains covered by the rule. load_telemetry_rules() stores them
    # lower-cased with leading/trailing dots removed.
    domains: tuple[str, ...]
    # Grouping label, read from the rule entry's "category" key by the loader.
    category: str
    # Free-text explanation; load_telemetry_rules() uses "" when the entry
    # has no "description" key.
    description: str
@dataclasses.dataclass
class BlocklistCandidate:
    """Mutable record for one domain or IP tracked as a blocklist candidate.

    Every field is required (none has a default). The code that creates and
    updates candidates is not visible alongside this class, so the notes on
    individual fields below are assumptions to confirm against that code.
    """

    # Unique identifier. Presumably a UUID string (the module imports uuid)
    # -- TODO confirm.
    id: str
    # The domain name or IP address this candidate refers to.
    domain_or_ip: str
    # Address / display name of the device associated with the candidate;
    # presumably None when the device is unknown -- TODO confirm.
    source_device_ip: str | None
    source_device_name: str | None
    # Timestamps kept as strings. NOTE(review): presumably ISO-8601 in UTC
    # (the module imports datetime/timezone) -- confirm with the writer.
    first_seen: str
    last_seen: str
    # Presumably the number of times the domain/IP has been observed.
    hit_count: int
    # Lifecycle state; the set of allowed values is not visible here.
    status: str
    # Presumably None until the candidate has been pushed -- TODO confirm
    # what "pushed" targets and the timestamp format.
    pushed_at: str | None
    # Log lines kept as supporting evidence. Mutable list; callers should not
    # share one list object between candidates.
    log_evidence: list[str]
    # Presumably the TelemetryRule.name that matched, or None -- TODO confirm.
    matched_rule: str | None
    # Presumably a score and explanation produced by an LLM review step;
    # range and meaning of the score are not visible here.
    llm_score: float | None
    llm_reason: str | None
# ---------------------------------------------------------------------------
# Telemetry list
# ---------------------------------------------------------------------------
def load_telemetry_rules(path: Path) -> list[TelemetryRule]:
    """Load telemetry rules from a YAML file.

    The document is expected to be a mapping with a top-level ``rules`` list.
    Each entry must provide ``name``, ``domains`` and ``category``, and may
    provide ``description`` (defaults to ``""``). Domains are lower-cased and
    stripped of leading/trailing dots, the same normalisation that
    ``matches_telemetry`` applies to the name it is asked about.

    Args:
        path: Location of the YAML rule file (read as UTF-8).

    Returns:
        One ``TelemetryRule`` per entry, in file order. An empty file, or a
        missing or null ``rules`` key, yields an empty list.

    Raises:
        OSError: If the file cannot be read.
        UnicodeDecodeError: If the file is not valid UTF-8.
        yaml.YAMLError: If the file is not valid YAML.
        KeyError: If an entry lacks ``name``, ``domains`` or ``category``.
    """
    # Decode explicitly as UTF-8 so parsing does not depend on the host locale.
    data = yaml.safe_load(path.read_text(encoding="utf-8"))
    # safe_load() returns None for an empty document, and a bare ``rules:``
    # key also parses to None; treat both as "no rules" instead of crashing.
    entries = (data or {}).get("rules") or []
    return [
        TelemetryRule(
            name=entry["name"],
            domains=tuple(d.lower().strip(".") for d in entry["domains"]),
            category=entry["category"],
            description=entry.get("description", ""),
        )
        for entry in entries
    ]
def matches_telemetry(domain: str, rules: list[TelemetryRule]) -> TelemetryRule | None:
    """Find the first rule that covers ``domain``.

    ``domain`` is lower-cased and stripped of leading/trailing dots, then
    compared with each rule's domains. A rule covers it when one of its
    domains equals the name exactly or is a parent of it (the name ends with
    ``"." + rule_domain``), so ``notroku.com`` is not covered by ``roku.com``.

    Args:
        domain: Host name to look up; case and surrounding dots are ignored.
        rules: Rules to scan, in priority order.

    Returns:
        The first covering rule in ``rules`` order, or ``None`` if none does.
    """
    needle = domain.lower().strip(".")
    for candidate in rules:
        covered = any(
            needle == listed or needle.endswith("." + listed)
            for listed in candidate.domains
        )
        if covered:
            return candidate
    return None