From 0013ae916d5e993928aeee28964c8092d23456a2 Mon Sep 17 00:00:00 2001
From: pyr0ball
Date: Fri, 15 May 2026 20:54:40 -0700
Subject: [PATCH] feat(blocklist): telemetry YAML list + loader + domain matcher

Adds patterns/telemetry.yaml with 6 rule groups (samsung, belkin, roku, lg,
amazon, advertising). Adds app/services/blocklist.py with TelemetryRule and
BlocklistCandidate dataclasses, load_telemetry_rules(), and
matches_telemetry() with exact and subdomain matching.

6 new TestTelemetry tests pass; 199 total passing.
---
Review note: the app/services/blocklist.py hunk was amended during review
(empty-YAML handling, explicit UTF-8 read, shared domain normalization,
docstrings). The blob id on its index line was not regenerated.

 app/services/blocklist.py       | 102 ++++++++++++++++++++++++++++++++
 patterns/telemetry.yaml         |  46 ++++++++++++++
 tests/test_service_blocklist.py |  43 ++++++++++++++
 3 files changed, 191 insertions(+)
 create mode 100644 app/services/blocklist.py
 create mode 100644 patterns/telemetry.yaml

diff --git a/app/services/blocklist.py b/app/services/blocklist.py
new file mode 100644
index 0000000..588966e
--- /dev/null
+++ b/app/services/blocklist.py
@@ -0,0 +1,102 @@
+"""Blocklist candidate extraction, management, and telemetry matching."""
+from __future__ import annotations
+
+import dataclasses
+import json
+import re
+import sqlite3
+import uuid
+from datetime import datetime, timezone
+from pathlib import Path
+
+import yaml
+
+
+# ---------------------------------------------------------------------------
+# Data models
+# ---------------------------------------------------------------------------
+
+@dataclasses.dataclass(frozen=True)
+class TelemetryRule:
+    """Named group of telemetry domains sharing a category."""
+
+    name: str
+    domains: tuple[str, ...]
+    category: str
+    description: str
+
+
+@dataclasses.dataclass
+class BlocklistCandidate:
+    """Domain or IP under consideration for the blocklist."""
+
+    id: str
+    domain_or_ip: str
+    source_device_ip: str | None
+    source_device_name: str | None
+    first_seen: str
+    last_seen: str
+    hit_count: int
+    status: str
+    pushed_at: str | None
+    log_evidence: list[str]
+    matched_rule: str | None
+    llm_score: float | None
+    llm_reason: str | None
+
+
+# ---------------------------------------------------------------------------
+# Telemetry list
+# ---------------------------------------------------------------------------
+
+def _normalize_domain(domain: str) -> str:
+    """Lower-case a hostname and drop surrounding whitespace and dots."""
+    return domain.strip().lower().strip(".")
+
+
+def load_telemetry_rules(path: Path) -> list[TelemetryRule]:
+    """Load telemetry rules from a YAML file.
+
+    Args:
+        path: YAML file with a top-level ``rules`` list. Every entry needs
+            ``name``, ``domains`` and ``category``; ``description`` is
+            optional and defaults to an empty string.
+
+    Returns:
+        One rule per entry, in file order, with normalized domains. The
+        list is empty when the file is empty or defines no rules.
+
+    Raises:
+        KeyError: If an entry lacks a required key.
+    """
+    # safe_load() yields None for an empty document, and a bare "rules:" key
+    # also loads as None, so substitute empty containers in both cases.
+    data = yaml.safe_load(path.read_text(encoding="utf-8")) or {}
+    return [
+        TelemetryRule(
+            name=r["name"],
+            domains=tuple(_normalize_domain(d) for d in r["domains"]),
+            category=r["category"],
+            description=r.get("description", ""),
+        )
+        for r in data.get("rules") or []
+    ]
+
+
+def matches_telemetry(domain: str, rules: list[TelemetryRule]) -> TelemetryRule | None:
+    """Return the first rule that lists ``domain`` or one of its parent domains.
+
+    Matching is case-insensitive and ignores surrounding whitespace and dots.
+    A rule domain matches itself and any subdomain, but not a longer name
+    that merely ends with the same characters.
+
+    Returns:
+        The matching rule, or None when nothing matches or ``domain`` is empty.
+    """
+    d = _normalize_domain(domain)
+    if not d:
+        return None
+    for rule in rules:
+        if any(d == rd or d.endswith("." + rd) for rd in rule.domains):
+            return rule
+    return None
diff --git a/patterns/telemetry.yaml b/patterns/telemetry.yaml
new file mode 100644
index 0000000..5be9873
--- /dev/null
+++ b/patterns/telemetry.yaml
@@ -0,0 +1,46 @@
+version: 1
+rules:
+  - name: samsung_ads
+    domains:
+      - samsungads.com
+      - samsungcloudsolution.com
+      - samsungrm.net
+      - samsungacr.com
+    category: samsung
+    description: Samsung Smart TV advertising and telemetry
+
+  - name: belkin_wemo
+    domains:
+      - api.xbcs.net
+      - wemo.belkin.com
+      - statistics.belkin.com
+    category: belkin
+    description: Belkin/WeMo smart device telemetry
+
+  - name: roku_telemetry
+    domains:
+      - logs.roku.com
+      - scribe.logs.roku.com
+    category: roku
+    description: Roku device telemetry
+
+  - name: lg_telemetry
+    domains:
+      - us.lgappstv.com
+      - lgtvcommon.com
+      - lgtvsdp.com
+    category: lg
+    description: LG Smart TV telemetry
+
+  - name: amazon_iot
+    domains:
+      - device-metrics-us.amazon.com
+    category: amazon
+    description: Amazon device telemetry
+
+  - name: ad_networks
+    domains:
+      - doubleclick.net
+      - googleads.g.doubleclick.net
+    category: advertising
+    description: Common advertising networks served to IoT devices
diff --git a/tests/test_service_blocklist.py b/tests/test_service_blocklist.py
index 7b3fc4e..01e3bbf 100644
--- a/tests/test_service_blocklist.py
+++ b/tests/test_service_blocklist.py
@@ -41,3 +41,46 @@ class TestSchema:
         row = conn.execute("SELECT status, hit_count FROM blocklist_candidates").fetchone()
         assert row[0] == "pending"
         assert row[1] == 1
+
+
+class TestTelemetry:
+    def _rules(self):
+        from app.services.blocklist import load_telemetry_rules
+        yaml_path = Path(__file__).parent.parent / "patterns" / "telemetry.yaml"
+        return load_telemetry_rules(yaml_path)
+
+    def test_load_returns_rules(self):
+        rules = self._rules()
+        assert len(rules) >= 3
+
+    def test_samsung_rule_present(self):
+        rules = self._rules()
+        names = [r.name for r in rules]
+        assert "samsung_ads" in names
+
+    def test_exact_domain_match(self):
+        from app.services.blocklist import matches_telemetry
+        rules = self._rules()
+        result = matches_telemetry("samsungads.com", rules)
+        assert result is not None
+        assert result.name == "samsung_ads"
+
+    def test_subdomain_match(self):
+        from app.services.blocklist import matches_telemetry
+        rules = self._rules()
+        result = matches_telemetry("sub.samsungads.com", rules)
+        assert result is not None
+        assert result.name == "samsung_ads"
+
+    def test_no_match_returns_none(self):
+        from app.services.blocklist import matches_telemetry
+        rules = self._rules()
+        result = matches_telemetry("google.com", rules)
+        assert result is None
+
+    def test_belkin_match(self):
+        from app.services.blocklist import matches_telemetry
+        rules = self._rules()
+        result = matches_telemetry("api.xbcs.net", rules)
+        assert result is not None
+        assert result.category == "belkin"