254 lines
9.7 KiB
Python
254 lines
9.7 KiB
Python
"""Tests for blocklist service — schema, extraction, candidate management."""
|
|
from __future__ import annotations
|
|
|
|
import sqlite3
|
|
import pytest
|
|
from pathlib import Path
|
|
|
|
|
|
class TestSchema:
|
|
def test_blocklist_candidates_table_exists(self, tmp_path):
|
|
from app.ingest.pipeline import ensure_schema
|
|
db = tmp_path / "test.db"
|
|
ensure_schema(db)
|
|
conn = sqlite3.connect(str(db))
|
|
tables = {r[0] for r in conn.execute("SELECT name FROM sqlite_master WHERE type='table'").fetchall()}
|
|
assert "blocklist_candidates" in tables
|
|
|
|
def test_blocklist_candidates_columns(self, tmp_path):
|
|
from app.ingest.pipeline import ensure_schema
|
|
db = tmp_path / "test.db"
|
|
ensure_schema(db)
|
|
conn = sqlite3.connect(str(db))
|
|
cols = {r[1] for r in conn.execute("PRAGMA table_info(blocklist_candidates)").fetchall()}
|
|
assert cols >= {
|
|
"id", "domain_or_ip", "source_device_ip", "source_device_name",
|
|
"first_seen", "last_seen", "hit_count", "status", "pushed_at",
|
|
"log_evidence", "matched_rule", "llm_score", "llm_reason",
|
|
}
|
|
|
|
def test_status_default_is_pending(self, tmp_path):
|
|
from app.ingest.pipeline import ensure_schema
|
|
import uuid
|
|
db = tmp_path / "test.db"
|
|
ensure_schema(db)
|
|
conn = sqlite3.connect(str(db))
|
|
conn.execute(
|
|
"INSERT INTO blocklist_candidates (id, domain_or_ip, first_seen, last_seen) VALUES (?, ?, ?, ?)",
|
|
(str(uuid.uuid4()), "samsungads.com", "2026-05-14T00:00:00+00:00", "2026-05-14T00:00:00+00:00"),
|
|
)
|
|
conn.commit()
|
|
row = conn.execute("SELECT status, hit_count FROM blocklist_candidates").fetchone()
|
|
assert row[0] == "pending"
|
|
assert row[1] == 1
|
|
|
|
|
|
class TestTelemetry:
|
|
def _rules(self):
|
|
from app.services.blocklist import load_telemetry_rules
|
|
yaml_path = Path(__file__).parent.parent / "patterns" / "telemetry.yaml"
|
|
return load_telemetry_rules(yaml_path)
|
|
|
|
def test_load_returns_rules(self):
|
|
rules = self._rules()
|
|
assert len(rules) >= 3
|
|
|
|
def test_samsung_rule_present(self):
|
|
rules = self._rules()
|
|
names = [r.name for r in rules]
|
|
assert "samsung_ads" in names
|
|
|
|
def test_exact_domain_match(self):
|
|
from app.services.blocklist import matches_telemetry
|
|
rules = self._rules()
|
|
result = matches_telemetry("samsungads.com", rules)
|
|
assert result is not None
|
|
assert result.name == "samsung_ads"
|
|
|
|
def test_subdomain_match(self):
|
|
from app.services.blocklist import matches_telemetry
|
|
rules = self._rules()
|
|
result = matches_telemetry("sub.samsungads.com", rules)
|
|
assert result is not None
|
|
assert result.name == "samsung_ads"
|
|
|
|
def test_no_match_returns_none(self):
|
|
from app.services.blocklist import matches_telemetry
|
|
rules = self._rules()
|
|
result = matches_telemetry("google.com", rules)
|
|
assert result is None
|
|
|
|
def test_belkin_match(self):
|
|
from app.services.blocklist import matches_telemetry
|
|
rules = self._rules()
|
|
result = matches_telemetry("api.xbcs.net", rules)
|
|
assert result is not None
|
|
assert result.category == "belkin"
|
|
|
|
|
|
class TestExtraction:
|
|
@pytest.fixture
|
|
def db(self, tmp_path):
|
|
from app.ingest.pipeline import ensure_schema
|
|
p = tmp_path / "test.db"
|
|
ensure_schema(p)
|
|
return p
|
|
|
|
@pytest.fixture
|
|
def rules(self):
|
|
from app.services.blocklist import load_telemetry_rules
|
|
return load_telemetry_rules(
|
|
Path(__file__).parent.parent / "patterns" / "telemetry.yaml"
|
|
)
|
|
|
|
def test_dnsmasq_entry_extracted(self, db, rules):
|
|
import sqlite3
|
|
from app.services.blocklist import run_scan
|
|
conn = sqlite3.connect(str(db))
|
|
conn.execute(
|
|
"""INSERT INTO log_entries (id, source_id, sequence, ingest_time, text)
|
|
VALUES ('e1', 'router:syslog', 1, '2026-05-14T00:00:00+00:00',
|
|
'dnsmasq[123]: query[A] samsungads.com from 192.168.1.45')"""
|
|
)
|
|
conn.commit()
|
|
conn.close()
|
|
count = run_scan(
|
|
db,
|
|
router_source_ids=["router:syslog"],
|
|
device_map={"192.168.1.45": "Samsung Projector"},
|
|
telemetry_rules=rules,
|
|
)
|
|
assert count >= 1
|
|
conn = sqlite3.connect(str(db))
|
|
row = conn.execute(
|
|
"SELECT domain_or_ip, source_device_name, matched_rule, status FROM blocklist_candidates"
|
|
).fetchone()
|
|
conn.close()
|
|
assert row[0] == "samsungads.com"
|
|
assert row[1] == "Samsung Projector"
|
|
assert row[2] == "samsung_ads"
|
|
assert row[3] == "pending"
|
|
|
|
def test_iptables_entry_extracted(self, db, rules):
|
|
import sqlite3
|
|
from app.services.blocklist import run_scan
|
|
conn = sqlite3.connect(str(db))
|
|
conn.execute(
|
|
"""INSERT INTO log_entries (id, source_id, sequence, ingest_time, text)
|
|
VALUES ('e2', 'router:fw', 1, '2026-05-14T00:00:00+00:00',
|
|
'kernel: FORWARD SRC=192.168.1.67 DST=52.11.243.144 PROTO=TCP DPT=443')"""
|
|
)
|
|
conn.commit()
|
|
conn.close()
|
|
count = run_scan(
|
|
db,
|
|
router_source_ids=["router:fw"],
|
|
device_map={"192.168.1.67": "Belkin Switch 1"},
|
|
telemetry_rules=rules,
|
|
)
|
|
assert count >= 1
|
|
conn = sqlite3.connect(str(db))
|
|
row = conn.execute("SELECT domain_or_ip, source_device_name FROM blocklist_candidates").fetchone()
|
|
conn.close()
|
|
assert row[0] == "52.11.243.144"
|
|
assert row[1] == "Belkin Switch 1"
|
|
|
|
def test_unknown_device_skipped(self, db, rules):
|
|
import sqlite3
|
|
from app.services.blocklist import run_scan
|
|
conn = sqlite3.connect(str(db))
|
|
conn.execute(
|
|
"""INSERT INTO log_entries (id, source_id, sequence, ingest_time, text)
|
|
VALUES ('e3', 'router:syslog', 1, '2026-05-14T00:00:00+00:00',
|
|
'dnsmasq[123]: query[A] samsungads.com from 10.0.0.99')"""
|
|
)
|
|
conn.commit()
|
|
conn.close()
|
|
count = run_scan(
|
|
db,
|
|
router_source_ids=["router:syslog"],
|
|
device_map={"192.168.1.45": "Samsung Projector"},
|
|
telemetry_rules=rules,
|
|
)
|
|
assert count == 0
|
|
|
|
def test_dedup_upsert_increments_hit_count(self, db, rules):
|
|
import sqlite3
|
|
from app.services.blocklist import run_scan
|
|
conn = sqlite3.connect(str(db))
|
|
for i in range(3):
|
|
conn.execute(
|
|
f"""INSERT INTO log_entries (id, source_id, sequence, ingest_time, text)
|
|
VALUES ('e{i}', 'router:syslog', {i}, '2026-05-14T00:00:00+00:00',
|
|
'dnsmasq[123]: query[A] samsungads.com from 192.168.1.45')"""
|
|
)
|
|
conn.commit()
|
|
conn.close()
|
|
run_scan(db, ["router:syslog"], {"192.168.1.45": "Projector"}, rules)
|
|
conn = sqlite3.connect(str(db))
|
|
rows = conn.execute("SELECT hit_count FROM blocklist_candidates").fetchall()
|
|
conn.close()
|
|
assert len(rows) == 1 # one row, not three
|
|
assert rows[0][0] == 3
|
|
|
|
|
|
class TestCandidateManagement:
|
|
@pytest.fixture
|
|
def db_with_candidate(self, tmp_path):
|
|
from app.ingest.pipeline import ensure_schema
|
|
import sqlite3, uuid
|
|
db = tmp_path / "test.db"
|
|
ensure_schema(db)
|
|
conn = sqlite3.connect(str(db))
|
|
cid = str(uuid.uuid4())
|
|
conn.execute(
|
|
"""INSERT INTO blocklist_candidates
|
|
(id, domain_or_ip, first_seen, last_seen)
|
|
VALUES (?, 'samsungads.com', '2026-05-14T00:00:00+00:00', '2026-05-14T00:00:00+00:00')""",
|
|
(cid,),
|
|
)
|
|
conn.commit()
|
|
conn.close()
|
|
return db, cid
|
|
|
|
def test_list_candidates_returns_all(self, db_with_candidate):
|
|
from app.services.blocklist import list_candidates
|
|
db, _ = db_with_candidate
|
|
results = list_candidates(db)
|
|
assert len(results) == 1
|
|
assert results[0].domain_or_ip == "samsungads.com"
|
|
|
|
def test_list_candidates_filter_by_status(self, db_with_candidate):
|
|
from app.services.blocklist import list_candidates
|
|
db, _ = db_with_candidate
|
|
assert len(list_candidates(db, status="pending")) == 1
|
|
assert len(list_candidates(db, status="pushed")) == 0
|
|
|
|
def test_update_status_to_approved(self, db_with_candidate):
|
|
from app.services.blocklist import update_candidate_status, list_candidates
|
|
db, cid = db_with_candidate
|
|
candidate = update_candidate_status(db, cid, "approved")
|
|
assert candidate.status == "approved"
|
|
assert list_candidates(db, status="approved")[0].status == "approved"
|
|
|
|
def test_update_status_invalid_raises(self, db_with_candidate):
|
|
from app.services.blocklist import update_candidate_status
|
|
db, cid = db_with_candidate
|
|
with pytest.raises(ValueError, match="Invalid status"):
|
|
update_candidate_status(db, cid, "hacked")
|
|
|
|
def test_mark_pushed_sets_status_and_timestamp(self, db_with_candidate):
|
|
from app.services.blocklist import update_candidate_status, mark_pushed
|
|
db, cid = db_with_candidate
|
|
update_candidate_status(db, cid, "approved")
|
|
candidate = mark_pushed(db, cid)
|
|
assert candidate.status == "pushed"
|
|
assert candidate.pushed_at is not None
|
|
|
|
def test_mark_unblocked(self, db_with_candidate):
|
|
from app.services.blocklist import update_candidate_status, mark_pushed, mark_unblocked
|
|
db, cid = db_with_candidate
|
|
update_candidate_status(db, cid, "approved")
|
|
mark_pushed(db, cid)
|
|
candidate = mark_unblocked(db, cid)
|
|
assert candidate.status == "unblocked"
|