"""Tests for blocklist service — schema, extraction, candidate management.""" from __future__ import annotations import sqlite3 import pytest from pathlib import Path class TestSchema: def test_blocklist_candidates_table_exists(self, tmp_path): from app.ingest.pipeline import ensure_schema db = tmp_path / "test.db" ensure_schema(db) conn = sqlite3.connect(str(db)) tables = {r[0] for r in conn.execute("SELECT name FROM sqlite_master WHERE type='table'").fetchall()} assert "blocklist_candidates" in tables def test_blocklist_candidates_columns(self, tmp_path): from app.ingest.pipeline import ensure_schema db = tmp_path / "test.db" ensure_schema(db) conn = sqlite3.connect(str(db)) cols = {r[1] for r in conn.execute("PRAGMA table_info(blocklist_candidates)").fetchall()} assert cols >= { "id", "domain_or_ip", "source_device_ip", "source_device_name", "first_seen", "last_seen", "hit_count", "status", "pushed_at", "log_evidence", "matched_rule", "llm_score", "llm_reason", } def test_status_default_is_pending(self, tmp_path): from app.ingest.pipeline import ensure_schema import uuid db = tmp_path / "test.db" ensure_schema(db) conn = sqlite3.connect(str(db)) conn.execute( "INSERT INTO blocklist_candidates (id, domain_or_ip, first_seen, last_seen) VALUES (?, ?, ?, ?)", (str(uuid.uuid4()), "samsungads.com", "2026-05-14T00:00:00+00:00", "2026-05-14T00:00:00+00:00"), ) conn.commit() row = conn.execute("SELECT status, hit_count FROM blocklist_candidates").fetchone() assert row[0] == "pending" assert row[1] == 1 class TestTelemetry: def _rules(self): from app.services.blocklist import load_telemetry_rules yaml_path = Path(__file__).parent.parent / "patterns" / "telemetry.yaml" return load_telemetry_rules(yaml_path) def test_load_returns_rules(self): rules = self._rules() assert len(rules) >= 3 def test_samsung_rule_present(self): rules = self._rules() names = [r.name for r in rules] assert "samsung_ads" in names def test_exact_domain_match(self): from app.services.blocklist import matches_telemetry rules = self._rules() result = matches_telemetry("samsungads.com", rules) assert result is not None assert result.name == "samsung_ads" def test_subdomain_match(self): from app.services.blocklist import matches_telemetry rules = self._rules() result = matches_telemetry("sub.samsungads.com", rules) assert result is not None assert result.name == "samsung_ads" def test_no_match_returns_none(self): from app.services.blocklist import matches_telemetry rules = self._rules() result = matches_telemetry("google.com", rules) assert result is None def test_belkin_match(self): from app.services.blocklist import matches_telemetry rules = self._rules() result = matches_telemetry("api.xbcs.net", rules) assert result is not None assert result.category == "belkin"