From c813832cbe4e9f3bc1a2b57659c68d65cd0b5e86 Mon Sep 17 00:00:00 2001 From: pyr0ball Date: Fri, 15 May 2026 21:05:49 -0700 Subject: [PATCH] feat(blocklist): extraction scan + candidate CRUD + full test suite --- app/services/blocklist.py | 224 ++++++++++++++++++++++++++++++++ tests/test_service_blocklist.py | 168 ++++++++++++++++++++++++ 2 files changed, 392 insertions(+) diff --git a/app/services/blocklist.py b/app/services/blocklist.py index 150dce0..513a778 100644 --- a/app/services/blocklist.py +++ b/app/services/blocklist.py @@ -2,6 +2,11 @@ from __future__ import annotations import dataclasses +import json +import re +import sqlite3 +import uuid +from datetime import datetime, timezone from pathlib import Path import yaml @@ -62,3 +67,222 @@ def matches_telemetry(domain: str, rules: list[TelemetryRule]) -> TelemetryRule if d == rd or d.endswith("." + rd): return rule return None + + +# --------------------------------------------------------------------------- +# Regex extractors for router log entries +# --------------------------------------------------------------------------- + +_DNSMASQ_RE = re.compile( + r"query\[A{1,4}\]\s+(?P\S+)\s+from\s+(?P[\d.]+)" +) +_IPTABLES_RE = re.compile( + r"SRC=(?P[\d.]+).*?DST=(?P[\d.a-zA-Z.-]+)" +) + +_VALID_STATUSES = {"pending", "approved", "rejected", "pushed", "unblocked"} + + +# --------------------------------------------------------------------------- +# DB helpers +# --------------------------------------------------------------------------- + +def _now_iso() -> str: + return datetime.now(timezone.utc).isoformat() + + +def _row_to_candidate(row: tuple) -> BlocklistCandidate: + return BlocklistCandidate( + id=row[0], + domain_or_ip=row[1], + source_device_ip=row[2], + source_device_name=row[3], + first_seen=row[4], + last_seen=row[5], + hit_count=row[6], + status=row[7], + pushed_at=row[8], + log_evidence=json.loads(row[9] or "[]"), + matched_rule=row[10], + llm_score=row[11], + llm_reason=row[12], + ) + + +def _upsert_candidate( + conn: sqlite3.Connection, + domain_or_ip: str, + source_device_ip: str | None, + source_device_name: str | None, + matched_rule: str | None, + entry_id: str, + now: str, +) -> bool: + """Insert or update a candidate. Returns True if a new row was created.""" + row = conn.execute( + "SELECT id, hit_count, log_evidence FROM blocklist_candidates " + "WHERE domain_or_ip = ? AND source_device_ip IS ?", + (domain_or_ip, source_device_ip), + ).fetchone() + + if row is None: + conn.execute( + """INSERT INTO blocklist_candidates + (id, domain_or_ip, source_device_ip, source_device_name, + first_seen, last_seen, hit_count, status, pushed_at, log_evidence, matched_rule) + VALUES (?, ?, ?, ?, ?, ?, 1, 'pending', NULL, ?, ?)""", + ( + str(uuid.uuid4()), domain_or_ip, source_device_ip, source_device_name, + now, now, json.dumps([entry_id]), matched_rule, + ), + ) + return True + + existing_id, hit_count, existing_evidence = row + evidence = json.loads(existing_evidence or "[]") + if entry_id not in evidence: + evidence.append(entry_id) + evidence = evidence[-10:] # cap at 10 + conn.execute( + "UPDATE blocklist_candidates SET last_seen=?, hit_count=?, log_evidence=? WHERE id=?", + (now, hit_count + 1, json.dumps(evidence), existing_id), + ) + return False + + +# --------------------------------------------------------------------------- +# Extraction scan +# --------------------------------------------------------------------------- + +def run_scan( + db_path: Path, + router_source_ids: list[str], + device_map: dict[str, str], + telemetry_rules: list[TelemetryRule], +) -> int: + """Scan log_entries from router sources, upsert blocklist candidates. + + Only entries whose source IP is in device_map are recorded. + Returns the total number of rows created or updated. + """ + if not router_source_ids or not device_map: + return 0 + + placeholders = ",".join("?" for _ in router_source_ids) + now = _now_iso() + count = 0 + + conn = sqlite3.connect(str(db_path)) + try: + rows = conn.execute( + f"SELECT id, text FROM log_entries WHERE source_id IN ({placeholders})", + router_source_ids, + ).fetchall() + + for entry_id, text in rows: + src_ip: str | None = None + dst: str | None = None + + m = _DNSMASQ_RE.search(text) + if m: + src_ip = m.group("src") + dst = m.group("domain") + else: + m = _IPTABLES_RE.search(text) + if m: + src_ip = m.group("src") + dst = m.group("dst") + + if src_ip is None or src_ip not in device_map: + continue + + device_name = device_map[src_ip] + rule = matches_telemetry(dst, telemetry_rules) if dst else None + matched_rule_name = rule.name if rule else None + + _upsert_candidate(conn, dst or "unknown", src_ip, device_name, matched_rule_name, entry_id, now) + count += 1 + + conn.commit() + finally: + conn.close() + + return count + + +# --------------------------------------------------------------------------- +# Candidate CRUD +# --------------------------------------------------------------------------- + +_CANDIDATE_SELECT = ( + "SELECT id,domain_or_ip,source_device_ip,source_device_name," + "first_seen,last_seen,hit_count,status,pushed_at,log_evidence," + "matched_rule,llm_score,llm_reason FROM blocklist_candidates" +) + + +def list_candidates( + db_path: Path, + status: str | None = None, + device_ip: str | None = None, +) -> list[BlocklistCandidate]: + conn = sqlite3.connect(str(db_path)) + try: + query = f"{_CANDIDATE_SELECT} WHERE 1=1" + params: list = [] + if status and status != "all": + query += " AND status = ?" + params.append(status) + if device_ip: + query += " AND source_device_ip = ?" + params.append(device_ip) + query += " ORDER BY last_seen DESC" + rows = conn.execute(query, params).fetchall() + finally: + conn.close() + return [_row_to_candidate(r) for r in rows] + + +def _get_candidate(conn: sqlite3.Connection, candidate_id: str) -> BlocklistCandidate: + row = conn.execute( + f"{_CANDIDATE_SELECT} WHERE id=?", + (candidate_id,), + ).fetchone() + if row is None: + raise KeyError(f"Candidate {candidate_id!r} not found") + return _row_to_candidate(row) + + +def update_candidate_status(db_path: Path, candidate_id: str, new_status: str) -> BlocklistCandidate: + if new_status not in _VALID_STATUSES: + raise ValueError(f"Invalid status {new_status!r}. Must be one of {_VALID_STATUSES}") + conn = sqlite3.connect(str(db_path)) + try: + conn.execute("UPDATE blocklist_candidates SET status=? WHERE id=?", (new_status, candidate_id)) + conn.commit() + return _get_candidate(conn, candidate_id) + finally: + conn.close() + + +def mark_pushed(db_path: Path, candidate_id: str) -> BlocklistCandidate: + conn = sqlite3.connect(str(db_path)) + try: + conn.execute( + "UPDATE blocklist_candidates SET status='pushed', pushed_at=? WHERE id=?", + (_now_iso(), candidate_id), + ) + conn.commit() + return _get_candidate(conn, candidate_id) + finally: + conn.close() + + +def mark_unblocked(db_path: Path, candidate_id: str) -> BlocklistCandidate: + conn = sqlite3.connect(str(db_path)) + try: + conn.execute("UPDATE blocklist_candidates SET status='unblocked' WHERE id=?", (candidate_id,)) + conn.commit() + return _get_candidate(conn, candidate_id) + finally: + conn.close() diff --git a/tests/test_service_blocklist.py b/tests/test_service_blocklist.py index 01e3bbf..893a076 100644 --- a/tests/test_service_blocklist.py +++ b/tests/test_service_blocklist.py @@ -84,3 +84,171 @@ class TestTelemetry: result = matches_telemetry("api.xbcs.net", rules) assert result is not None assert result.category == "belkin" + + +class TestExtraction: + @pytest.fixture + def db(self, tmp_path): + from app.ingest.pipeline import ensure_schema + p = tmp_path / "test.db" + ensure_schema(p) + return p + + @pytest.fixture + def rules(self): + from app.services.blocklist import load_telemetry_rules + return load_telemetry_rules( + Path(__file__).parent.parent / "patterns" / "telemetry.yaml" + ) + + def test_dnsmasq_entry_extracted(self, db, rules): + import sqlite3 + from app.services.blocklist import run_scan + conn = sqlite3.connect(str(db)) + conn.execute( + """INSERT INTO log_entries (id, source_id, sequence, ingest_time, text) + VALUES ('e1', 'router:syslog', 1, '2026-05-14T00:00:00+00:00', + 'dnsmasq[123]: query[A] samsungads.com from 192.168.1.45')""" + ) + conn.commit() + conn.close() + count = run_scan( + db, + router_source_ids=["router:syslog"], + device_map={"192.168.1.45": "Samsung Projector"}, + telemetry_rules=rules, + ) + assert count >= 1 + conn = sqlite3.connect(str(db)) + row = conn.execute( + "SELECT domain_or_ip, source_device_name, matched_rule, status FROM blocklist_candidates" + ).fetchone() + conn.close() + assert row[0] == "samsungads.com" + assert row[1] == "Samsung Projector" + assert row[2] == "samsung_ads" + assert row[3] == "pending" + + def test_iptables_entry_extracted(self, db, rules): + import sqlite3 + from app.services.blocklist import run_scan + conn = sqlite3.connect(str(db)) + conn.execute( + """INSERT INTO log_entries (id, source_id, sequence, ingest_time, text) + VALUES ('e2', 'router:fw', 1, '2026-05-14T00:00:00+00:00', + 'kernel: FORWARD SRC=192.168.1.67 DST=52.11.243.144 PROTO=TCP DPT=443')""" + ) + conn.commit() + conn.close() + count = run_scan( + db, + router_source_ids=["router:fw"], + device_map={"192.168.1.67": "Belkin Switch 1"}, + telemetry_rules=rules, + ) + assert count >= 1 + conn = sqlite3.connect(str(db)) + row = conn.execute("SELECT domain_or_ip, source_device_name FROM blocklist_candidates").fetchone() + conn.close() + assert row[0] == "52.11.243.144" + assert row[1] == "Belkin Switch 1" + + def test_unknown_device_skipped(self, db, rules): + import sqlite3 + from app.services.blocklist import run_scan + conn = sqlite3.connect(str(db)) + conn.execute( + """INSERT INTO log_entries (id, source_id, sequence, ingest_time, text) + VALUES ('e3', 'router:syslog', 1, '2026-05-14T00:00:00+00:00', + 'dnsmasq[123]: query[A] samsungads.com from 10.0.0.99')""" + ) + conn.commit() + conn.close() + count = run_scan( + db, + router_source_ids=["router:syslog"], + device_map={"192.168.1.45": "Samsung Projector"}, + telemetry_rules=rules, + ) + assert count == 0 + + def test_dedup_upsert_increments_hit_count(self, db, rules): + import sqlite3 + from app.services.blocklist import run_scan + conn = sqlite3.connect(str(db)) + for i in range(3): + conn.execute( + f"""INSERT INTO log_entries (id, source_id, sequence, ingest_time, text) + VALUES ('e{i}', 'router:syslog', {i}, '2026-05-14T00:00:00+00:00', + 'dnsmasq[123]: query[A] samsungads.com from 192.168.1.45')""" + ) + conn.commit() + conn.close() + run_scan(db, ["router:syslog"], {"192.168.1.45": "Projector"}, rules) + conn = sqlite3.connect(str(db)) + rows = conn.execute("SELECT hit_count FROM blocklist_candidates").fetchall() + conn.close() + assert len(rows) == 1 # one row, not three + assert rows[0][0] == 3 + + +class TestCandidateManagement: + @pytest.fixture + def db_with_candidate(self, tmp_path): + from app.ingest.pipeline import ensure_schema + import sqlite3, uuid + db = tmp_path / "test.db" + ensure_schema(db) + conn = sqlite3.connect(str(db)) + cid = str(uuid.uuid4()) + conn.execute( + """INSERT INTO blocklist_candidates + (id, domain_or_ip, first_seen, last_seen) + VALUES (?, 'samsungads.com', '2026-05-14T00:00:00+00:00', '2026-05-14T00:00:00+00:00')""", + (cid,), + ) + conn.commit() + conn.close() + return db, cid + + def test_list_candidates_returns_all(self, db_with_candidate): + from app.services.blocklist import list_candidates + db, _ = db_with_candidate + results = list_candidates(db) + assert len(results) == 1 + assert results[0].domain_or_ip == "samsungads.com" + + def test_list_candidates_filter_by_status(self, db_with_candidate): + from app.services.blocklist import list_candidates + db, _ = db_with_candidate + assert len(list_candidates(db, status="pending")) == 1 + assert len(list_candidates(db, status="pushed")) == 0 + + def test_update_status_to_approved(self, db_with_candidate): + from app.services.blocklist import update_candidate_status, list_candidates + db, cid = db_with_candidate + candidate = update_candidate_status(db, cid, "approved") + assert candidate.status == "approved" + assert list_candidates(db, status="approved")[0].status == "approved" + + def test_update_status_invalid_raises(self, db_with_candidate): + from app.services.blocklist import update_candidate_status + db, cid = db_with_candidate + with pytest.raises(ValueError, match="Invalid status"): + update_candidate_status(db, cid, "hacked") + + def test_mark_pushed_sets_status_and_timestamp(self, db_with_candidate): + from app.services.blocklist import update_candidate_status, mark_pushed + db, cid = db_with_candidate + update_candidate_status(db, cid, "approved") + candidate = mark_pushed(db, cid) + assert candidate.status == "pushed" + assert candidate.pushed_at is not None + + def test_mark_unblocked(self, db_with_candidate): + from app.services.blocklist import update_candidate_status, mark_pushed, mark_unblocked + db, cid = db_with_candidate + update_candidate_status(db, cid, "approved") + mark_pushed(db, cid) + candidate = mark_unblocked(db, cid) + assert candidate.status == "unblocked"