turnstone/tests/test_service_blocklist.py
pyr0ball 0013ae916d feat(blocklist): telemetry YAML list + loader + domain matcher
Adds patterns/telemetry.yaml with 6 rule groups (samsung, belkin, roku, lg, amazon, advertising).
Adds app/services/blocklist.py with TelemetryRule and BlocklistCandidate dataclasses, load_telemetry_rules(), and matches_telemetry() with exact and subdomain matching.
6 new TestTelemetry tests pass; 199 total passing.
2026-05-15 20:54:40 -07:00

86 lines
3.2 KiB
Python

"""Tests for blocklist service — schema, extraction, candidate management."""
from __future__ import annotations
import sqlite3
import pytest
from pathlib import Path
class TestSchema:
    """Checks on the ``blocklist_candidates`` table after ``ensure_schema`` runs."""

    @staticmethod
    def _connect(tmp_path):
        """Run ``ensure_schema`` on a fresh DB file and return a closing connection context.

        The returned object is meant to be used as ``with ... as conn:`` so the
        SQLite handle is always released, even when an assertion fails.
        ``sqlite3.Connection`` used directly as a context manager only manages
        the transaction and does not close the connection, hence ``closing``.
        """
        from contextlib import closing

        from app.ingest.pipeline import ensure_schema

        db = tmp_path / "test.db"
        ensure_schema(db)
        return closing(sqlite3.connect(str(db)))

    def test_blocklist_candidates_table_exists(self, tmp_path):
        """The table is listed in ``sqlite_master`` once the schema is ensured."""
        with self._connect(tmp_path) as conn:
            rows = conn.execute(
                "SELECT name FROM sqlite_master WHERE type='table'"
            ).fetchall()
        tables = {r[0] for r in rows}
        assert "blocklist_candidates" in tables

    def test_blocklist_candidates_columns(self, tmp_path):
        """The table exposes at least the expected set of columns."""
        with self._connect(tmp_path) as conn:
            rows = conn.execute("PRAGMA table_info(blocklist_candidates)").fetchall()
        # Index 1 of each PRAGMA table_info row is the column name.
        cols = {r[1] for r in rows}
        # Superset check: extra columns are allowed.
        assert cols >= {
            "id", "domain_or_ip", "source_device_ip", "source_device_name",
            "first_seen", "last_seen", "hit_count", "status", "pushed_at",
            "log_evidence", "matched_rule", "llm_score", "llm_reason",
        }

    def test_status_default_is_pending(self, tmp_path):
        """A row inserted without status/hit_count gets ``'pending'`` and ``1``."""
        import uuid

        candidate_id = str(uuid.uuid4())
        seen = "2026-05-14T00:00:00+00:00"
        with self._connect(tmp_path) as conn:
            conn.execute(
                "INSERT INTO blocklist_candidates (id, domain_or_ip, first_seen, last_seen) VALUES (?, ?, ?, ?)",
                (candidate_id, "samsungads.com", seen, seen),
            )
            conn.commit()
            # Select the row we just inserted so the check does not depend on
            # the table being otherwise empty.
            row = conn.execute(
                "SELECT status, hit_count FROM blocklist_candidates WHERE id = ?",
                (candidate_id,),
            ).fetchone()
        assert row is not None
        assert row[0] == "pending"
        assert row[1] == 1
class TestTelemetry:
    """Loading of ``patterns/telemetry.yaml`` and domain matching against its rules."""

    def _rules(self):
        """Load the telemetry rules from ``patterns/telemetry.yaml``.

        The path is built relative to this file: up two directory levels,
        then into ``patterns``.
        """
        from app.services.blocklist import load_telemetry_rules

        tests_dir = Path(__file__).parent
        yaml_file = tests_dir.parent / "patterns" / "telemetry.yaml"
        return load_telemetry_rules(yaml_file)

    def _lookup(self, domain):
        """Return the result of ``matches_telemetry`` for *domain* using freshly loaded rules."""
        from app.services.blocklist import matches_telemetry

        return matches_telemetry(domain, self._rules())

    def test_load_returns_rules(self):
        """The YAML file yields at least three rules."""
        loaded = self._rules()
        assert len(loaded) >= 3

    def test_samsung_rule_present(self):
        """A rule named ``samsung_ads`` is among the loaded rules."""
        assert any(rule.name == "samsung_ads" for rule in self._rules())

    def test_exact_domain_match(self):
        """The bare domain matches the ``samsung_ads`` rule."""
        hit = self._lookup("samsungads.com")
        assert hit is not None
        assert hit.name == "samsung_ads"

    def test_subdomain_match(self):
        """A subdomain of a listed domain matches the same rule."""
        hit = self._lookup("sub.samsungads.com")
        assert hit is not None
        assert hit.name == "samsung_ads"

    def test_no_match_returns_none(self):
        """``google.com`` matches no rule, so the matcher returns ``None``."""
        assert self._lookup("google.com") is None

    def test_belkin_match(self):
        """``api.xbcs.net`` matches a rule whose category is ``belkin``."""
        hit = self._lookup("api.xbcs.net")
        assert hit is not None
        assert hit.category == "belkin"