"""Tests for blocklist service — schema, extraction, candidate management.""" from __future__ import annotations import sqlite3 import pytest from pathlib import Path class TestSchema: def test_blocklist_candidates_table_exists(self, tmp_path): from app.glean.pipeline import ensure_schema db = tmp_path / "test.db" ensure_schema(db) conn = sqlite3.connect(str(db)) tables = {r[0] for r in conn.execute("SELECT name FROM sqlite_master WHERE type='table'").fetchall()} assert "blocklist_candidates" in tables def test_blocklist_candidates_columns(self, tmp_path): from app.glean.pipeline import ensure_schema db = tmp_path / "test.db" ensure_schema(db) conn = sqlite3.connect(str(db)) cols = {r[1] for r in conn.execute("PRAGMA table_info(blocklist_candidates)").fetchall()} assert cols >= { "id", "domain_or_ip", "source_device_ip", "source_device_name", "first_seen", "last_seen", "hit_count", "status", "pushed_at", "log_evidence", "matched_rule", "llm_score", "llm_reason", } def test_status_default_is_pending(self, tmp_path): from app.glean.pipeline import ensure_schema import uuid db = tmp_path / "test.db" ensure_schema(db) conn = sqlite3.connect(str(db)) conn.execute( "INSERT INTO blocklist_candidates (id, domain_or_ip, first_seen, last_seen) VALUES (?, ?, ?, ?)", (str(uuid.uuid4()), "samsungads.com", "2026-05-14T00:00:00+00:00", "2026-05-14T00:00:00+00:00"), ) conn.commit() row = conn.execute("SELECT status, hit_count FROM blocklist_candidates").fetchone() assert row[0] == "pending" assert row[1] == 1 class TestTelemetry: def _rules(self): from app.services.blocklist import load_telemetry_rules yaml_path = Path(__file__).parent.parent / "patterns" / "telemetry.yaml" return load_telemetry_rules(yaml_path) def test_load_returns_rules(self): rules = self._rules() assert len(rules) >= 3 def test_samsung_rule_present(self): rules = self._rules() names = [r.name for r in rules] assert "samsung_ads" in names def test_exact_domain_match(self): from app.services.blocklist import matches_telemetry rules = self._rules() result = matches_telemetry("samsungads.com", rules) assert result is not None assert result.name == "samsung_ads" def test_subdomain_match(self): from app.services.blocklist import matches_telemetry rules = self._rules() result = matches_telemetry("sub.samsungads.com", rules) assert result is not None assert result.name == "samsung_ads" def test_no_match_returns_none(self): from app.services.blocklist import matches_telemetry rules = self._rules() result = matches_telemetry("google.com", rules) assert result is None def test_belkin_match(self): from app.services.blocklist import matches_telemetry rules = self._rules() result = matches_telemetry("api.xbcs.net", rules) assert result is not None assert result.category == "belkin" class TestExtraction: @pytest.fixture def db(self, tmp_path): from app.glean.pipeline import ensure_schema p = tmp_path / "test.db" ensure_schema(p) return p @pytest.fixture def rules(self): from app.services.blocklist import load_telemetry_rules return load_telemetry_rules( Path(__file__).parent.parent / "patterns" / "telemetry.yaml" ) def test_dnsmasq_entry_extracted(self, db, rules): import sqlite3 from app.services.blocklist import run_scan conn = sqlite3.connect(str(db)) conn.execute( """INSERT INTO log_entries (id, source_id, sequence, ingest_time, text) VALUES ('e1', 'router:syslog', 1, '2026-05-14T00:00:00+00:00', 'dnsmasq[123]: query[A] samsungads.com from 192.168.1.45')""" ) conn.commit() conn.close() count = run_scan( db, router_source_ids=["router:syslog"], device_map={"192.168.1.45": "Samsung Projector"}, telemetry_rules=rules, ) assert count >= 1 conn = sqlite3.connect(str(db)) row = conn.execute( "SELECT domain_or_ip, source_device_name, matched_rule, status FROM blocklist_candidates" ).fetchone() conn.close() assert row[0] == "samsungads.com" assert row[1] == "Samsung Projector" assert row[2] == "samsung_ads" assert row[3] == "pending" def test_iptables_entry_extracted(self, db, rules): import sqlite3 from app.services.blocklist import run_scan conn = sqlite3.connect(str(db)) conn.execute( """INSERT INTO log_entries (id, source_id, sequence, ingest_time, text) VALUES ('e2', 'router:fw', 1, '2026-05-14T00:00:00+00:00', 'kernel: FORWARD SRC=192.168.1.67 DST=52.11.243.144 PROTO=TCP DPT=443')""" ) conn.commit() conn.close() count = run_scan( db, router_source_ids=["router:fw"], device_map={"192.168.1.67": "Belkin Switch 1"}, telemetry_rules=rules, ) assert count >= 1 conn = sqlite3.connect(str(db)) row = conn.execute("SELECT domain_or_ip, source_device_name FROM blocklist_candidates").fetchone() conn.close() assert row[0] == "52.11.243.144" assert row[1] == "Belkin Switch 1" def test_unknown_device_skipped(self, db, rules): import sqlite3 from app.services.blocklist import run_scan conn = sqlite3.connect(str(db)) conn.execute( """INSERT INTO log_entries (id, source_id, sequence, ingest_time, text) VALUES ('e3', 'router:syslog', 1, '2026-05-14T00:00:00+00:00', 'dnsmasq[123]: query[A] samsungads.com from 10.0.0.99')""" ) conn.commit() conn.close() count = run_scan( db, router_source_ids=["router:syslog"], device_map={"192.168.1.45": "Samsung Projector"}, telemetry_rules=rules, ) assert count == 0 def test_dedup_upsert_increments_hit_count(self, db, rules): import sqlite3 from app.services.blocklist import run_scan conn = sqlite3.connect(str(db)) for i in range(3): conn.execute( f"""INSERT INTO log_entries (id, source_id, sequence, ingest_time, text) VALUES ('e{i}', 'router:syslog', {i}, '2026-05-14T00:00:00+00:00', 'dnsmasq[123]: query[A] samsungads.com from 192.168.1.45')""" ) conn.commit() conn.close() run_scan(db, ["router:syslog"], {"192.168.1.45": "Projector"}, rules) conn = sqlite3.connect(str(db)) rows = conn.execute("SELECT hit_count FROM blocklist_candidates").fetchall() conn.close() assert len(rows) == 1 # one row, not three assert rows[0][0] == 3 class TestCandidateManagement: @pytest.fixture def db_with_candidate(self, tmp_path): from app.glean.pipeline import ensure_schema import sqlite3, uuid db = tmp_path / "test.db" ensure_schema(db) conn = sqlite3.connect(str(db)) cid = str(uuid.uuid4()) conn.execute( """INSERT INTO blocklist_candidates (id, domain_or_ip, first_seen, last_seen) VALUES (?, 'samsungads.com', '2026-05-14T00:00:00+00:00', '2026-05-14T00:00:00+00:00')""", (cid,), ) conn.commit() conn.close() return db, cid def test_list_candidates_returns_all(self, db_with_candidate): from app.services.blocklist import list_candidates db, _ = db_with_candidate results = list_candidates(db) assert len(results) == 1 assert results[0].domain_or_ip == "samsungads.com" def test_list_candidates_filter_by_status(self, db_with_candidate): from app.services.blocklist import list_candidates db, _ = db_with_candidate assert len(list_candidates(db, status="pending")) == 1 assert len(list_candidates(db, status="pushed")) == 0 def test_update_status_to_approved(self, db_with_candidate): from app.services.blocklist import update_candidate_status, list_candidates db, cid = db_with_candidate candidate = update_candidate_status(db, cid, "approved") assert candidate.status == "approved" assert list_candidates(db, status="approved")[0].status == "approved" def test_update_status_invalid_raises(self, db_with_candidate): from app.services.blocklist import update_candidate_status db, cid = db_with_candidate with pytest.raises(ValueError, match="Invalid status"): update_candidate_status(db, cid, "hacked") def test_mark_pushed_sets_status_and_timestamp(self, db_with_candidate): from app.services.blocklist import update_candidate_status, mark_pushed db, cid = db_with_candidate update_candidate_status(db, cid, "approved") candidate = mark_pushed(db, cid) assert candidate.status == "pushed" assert candidate.pushed_at is not None def test_mark_unblocked(self, db_with_candidate): from app.services.blocklist import update_candidate_status, mark_pushed, mark_unblocked db, cid = db_with_candidate update_candidate_status(db, cid, "approved") mark_pushed(db, cid) candidate = mark_unblocked(db, cid) assert candidate.status == "unblocked"