feat(blocklist): telemetry YAML list + loader + domain matcher
Adds patterns/telemetry.yaml with 6 rule groups (samsung, belkin, roku, lg, amazon, advertising). Adds app/services/blocklist.py with TelemetryRule and BlocklistCandidate dataclasses, load_telemetry_rules(), and matches_telemetry() with exact and subdomain matching. 6 new TestTelemetry tests pass; 199 total passing.
This commit is contained in:
parent
4d7c436721
commit
f469692c52
3 changed files with 158 additions and 0 deletions
69
app/services/blocklist.py
Normal file
69
app/services/blocklist.py
Normal file
|
|
@ -0,0 +1,69 @@
|
|||
"""Blocklist candidate extraction, management, and telemetry matching."""
|
||||
from __future__ import annotations
|
||||
|
||||
import dataclasses
|
||||
import json
|
||||
import re
|
||||
import sqlite3
|
||||
import uuid
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
|
||||
import yaml
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Data models
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@dataclasses.dataclass(frozen=True)
class TelemetryRule:
    """Immutable description of one telemetry rule.

    Instances are frozen, so attributes cannot be reassigned after
    construction, and every field is required.
    """

    # Identifier of the rule.
    name: str
    # Domain names the rule covers, stored as a tuple.
    domains: tuple[str, ...]
    # Grouping label for the rule.
    category: str
    # Free-form explanatory text.
    description: str
|
||||
|
||||
|
||||
@dataclasses.dataclass
class BlocklistCandidate:
    """Mutable record for a single blocklist candidate.

    Every field is required at construction time; none has a default,
    including the ones typed as optional.
    """

    # Identity of the candidate and the domain or IP it refers to.
    id: str
    domain_or_ip: str

    # Originating device; either value may be None.
    source_device_ip: str | None
    source_device_name: str | None

    # Observation window and count.
    # NOTE(review): timestamps are plain strings and no format is enforced
    # here -- presumably ISO-8601; confirm against the code that writes them.
    first_seen: str
    last_seen: str
    hit_count: int

    # Lifecycle state.
    # NOTE(review): the set of valid status values is not defined in this
    # block -- confirm against the storage schema.
    status: str
    pushed_at: str | None

    # Supporting evidence and scoring.
    log_evidence: list[str]
    # Presumably the name of the TelemetryRule that matched -- TODO confirm.
    matched_rule: str | None
    llm_score: float | None
    llm_reason: str | None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Telemetry list
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def load_telemetry_rules(path: Path) -> list[TelemetryRule]:
    """Load telemetry rules from a YAML file.

    The document is expected to be a mapping with a top-level ``rules``
    list. Each entry must provide ``name``, ``domains`` and ``category``
    and may provide ``description``. Domain names are lower-cased and
    stripped of leading/trailing dots.

    Args:
        path: Location of the YAML rule file.

    Returns:
        One ``TelemetryRule`` per entry, in file order. An empty document,
        or one whose ``rules`` key is missing or null, yields ``[]``.

    Raises:
        KeyError: If an entry lacks ``name``, ``domains`` or ``category``.
    """
    # Decode explicitly as UTF-8 so parsing does not depend on the host locale.
    text = path.read_text(encoding="utf-8")
    # safe_load returns None for an empty document; fall back to an empty
    # mapping instead of failing on ``None.get``.
    document = yaml.safe_load(text) or {}
    # ``rules:`` with no value parses to None, which is treated as "no rules".
    entries = document.get("rules") or []
    return [
        TelemetryRule(
            name=entry["name"],
            domains=tuple(d.lower().strip(".") for d in entry["domains"]),
            category=entry["category"],
            # A null description becomes "" so the field is always a str.
            description=entry.get("description") or "",
        )
        for entry in entries
    ]
|
||||
|
||||
|
||||
def matches_telemetry(domain: str, rules: list[TelemetryRule]) -> TelemetryRule | None:
    """Find the first telemetry rule that covers ``domain``.

    The queried name is lower-cased and stripped of leading/trailing dots
    before comparison. Rule domains are compared as stored, without further
    normalisation. A rule covers the name when one of its domains equals it
    or is a parent of it on a label boundary, so ``sub.example.com`` is
    covered by ``example.com`` while ``notexample.com`` is not.

    Args:
        domain: Host name to look up.
        rules: Rules to search; earlier rules take precedence.

    Returns:
        The first covering rule in ``rules`` order, or ``None`` if no rule
        covers the name.
    """
    needle = domain.lower().strip(".")

    def _covers(known: str) -> bool:
        # The "." prefix pins the suffix test to a label boundary.
        return needle == known or needle.endswith("." + known)

    return next(
        (rule for rule in rules if any(map(_covers, rule.domains))),
        None,
    )
|
||||
46
patterns/telemetry.yaml
Normal file
46
patterns/telemetry.yaml
Normal file
|
|
@ -0,0 +1,46 @@
|
|||
version: 1
|
||||
rules:
|
||||
- name: samsung_ads
|
||||
domains:
|
||||
- samsungads.com
|
||||
- samsungcloudsolution.com
|
||||
- samsungrm.net
|
||||
- samsungacr.com
|
||||
category: samsung
|
||||
description: Samsung Smart TV advertising and telemetry
|
||||
|
||||
- name: belkin_wemo
|
||||
domains:
|
||||
- api.xbcs.net
|
||||
- wemo.belkin.com
|
||||
- statistics.belkin.com
|
||||
category: belkin
|
||||
description: Belkin/WeMo smart device telemetry
|
||||
|
||||
- name: roku_telemetry
|
||||
domains:
|
||||
- logs.roku.com
|
||||
- scribe.logs.roku.com
|
||||
category: roku
|
||||
description: Roku device telemetry
|
||||
|
||||
- name: lg_telemetry
|
||||
domains:
|
||||
- us.lgappstv.com
|
||||
- lgtvcommon.com
|
||||
- lgtvsdp.com
|
||||
category: lg
|
||||
description: LG Smart TV telemetry
|
||||
|
||||
- name: amazon_iot
|
||||
domains:
|
||||
- device-metrics-us.amazon.com
|
||||
category: amazon
|
||||
description: Amazon device telemetry
|
||||
|
||||
- name: ad_networks
|
||||
domains:
|
||||
- doubleclick.net
|
||||
- googleads.g.doubleclick.net
|
||||
category: advertising
|
||||
description: Common advertising networks served to IoT devices
|
||||
|
|
@ -41,3 +41,46 @@ class TestSchema:
|
|||
row = conn.execute("SELECT status, hit_count FROM blocklist_candidates").fetchone()
|
||||
assert row[0] == "pending"
|
||||
assert row[1] == 1
|
||||
|
||||
|
||||
class TestTelemetry:
    """Exercise the telemetry rule loader and matcher against the bundled YAML."""

    def _rules(self):
        """Return the rules parsed from ``patterns/telemetry.yaml``."""
        from app.services.blocklist import load_telemetry_rules

        # Resolved relative to this test file: one directory up, then patterns/.
        project_root = Path(__file__).parent.parent
        return load_telemetry_rules(project_root / "patterns" / "telemetry.yaml")

    def test_load_returns_rules(self):
        assert len(self._rules()) >= 3

    def test_samsung_rule_present(self):
        assert "samsung_ads" in [rule.name for rule in self._rules()]

    def test_exact_domain_match(self):
        from app.services.blocklist import matches_telemetry

        hit = matches_telemetry("samsungads.com", self._rules())
        assert hit is not None
        assert hit.name == "samsung_ads"

    def test_subdomain_match(self):
        from app.services.blocklist import matches_telemetry

        hit = matches_telemetry("sub.samsungads.com", self._rules())
        assert hit is not None
        assert hit.name == "samsung_ads"

    def test_no_match_returns_none(self):
        from app.services.blocklist import matches_telemetry

        assert matches_telemetry("google.com", self._rules()) is None

    def test_belkin_match(self):
        from app.services.blocklist import matches_telemetry

        hit = matches_telemetry("api.xbcs.net", self._rules())
        assert hit is not None
        assert hit.category == "belkin"
|
||||
|
|
|
|||
Loading…
Reference in a new issue