"""Blocklist candidate extraction, management, and telemetry matching."""

from __future__ import annotations

import dataclasses
import json
import re
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import Any

import yaml

from app.db import get_conn, resolve_tenant_id

# ---------------------------------------------------------------------------
# Data models
# ---------------------------------------------------------------------------


@dataclasses.dataclass(frozen=True)
class TelemetryRule:
    """One named telemetry rule: a set of domains plus descriptive metadata."""

    name: str
    # Lower-cased domains with leading/trailing dots removed
    # (normalised by load_telemetry_rules).
    domains: tuple[str, ...]
    category: str
    description: str


@dataclasses.dataclass
class BlocklistCandidate:
    """In-memory view of one row of the ``blocklist_candidates`` table."""

    id: str
    domain_or_ip: str
    source_device_ip: str | None
    source_device_name: str | None
    first_seen: str
    last_seen: str
    hit_count: int
    status: str
    pushed_at: str | None
    # IDs of the log entries that produced this candidate (decoded from JSON).
    log_evidence: list[str]
    matched_rule: str | None
    llm_score: float | None
    llm_reason: str | None


# ---------------------------------------------------------------------------
# Telemetry list
# ---------------------------------------------------------------------------


def load_telemetry_rules(path: Path) -> list[TelemetryRule]:
    """Load telemetry rules from a YAML file.

    The file is expected to hold a mapping with a ``rules`` list; each rule
    needs ``name``, ``domains`` and ``category`` and may carry an optional
    ``description``.  Domains are lower-cased and stripped of surrounding
    dots so they compare cleanly in :func:`matches_telemetry`.

    An empty file, or a file whose ``rules`` key is missing or null, yields
    an empty list.

    Raises:
        KeyError: if a rule lacks one of the required keys.
    """
    # safe_load returns None for an empty document; fall back to an empty
    # mapping so the lookup below cannot fail on NoneType.
    data = yaml.safe_load(path.read_text(encoding="utf-8")) or {}
    # "rules:" with no value parses as None, hence the second fallback.
    raw_rules = data.get("rules") or []
    return [
        TelemetryRule(
            name=r["name"],
            domains=tuple(d.lower().strip(".") for d in r["domains"]),
            category=r["category"],
            description=r.get("description", ""),
        )
        for r in raw_rules
    ]


def matches_telemetry(domain: str, rules: list[TelemetryRule]) -> TelemetryRule | None:
    """Return the first rule whose domains include domain or a parent domain, else None.

    Matching is case-insensitive and ignores leading/trailing dots on
    ``domain``.  A rule domain ``example.com`` matches ``example.com`` and
    ``a.example.com`` but not ``notexample.com``.
    """
    d = domain.lower().strip(".")
    for rule in rules:
        for rd in rule.domains:
            # The "." prefix restricts suffix matches to whole labels.
            if d == rd or d.endswith("." + rd):
                return rule
    return None


# ---------------------------------------------------------------------------
# Regex extractors for router log entries
# ---------------------------------------------------------------------------

# Matches text of the form "query[A] <domain> from <src>".  The group names
# are the ones run_scan reads via m.group("domain") / m.group("src").
_DNSMASQ_RE = re.compile(
    r"query\[A{1,4}\]\s+(?P<domain>\S+)\s+from\s+(?P<src>[\d.]+)"
)
# Matches text containing "SRC=<src> ... DST=<dst>".  The group names are
# the ones run_scan reads via m.group("src") / m.group("dst").
_IPTABLES_RE = re.compile(
    r"SRC=(?P<src>[\d.]+).*?DST=(?P<dst>[\d.a-zA-Z.-]+)"
)

_VALID_STATUSES = {"pending", "approved", "rejected", "pushed", "unblocked"}

# ---------------------------------------------------------------------------
# DB helpers
# ---------------------------------------------------------------------------


def _now_iso() -> str:
    """Return the current UTC time as an ISO-8601 string."""
    return datetime.now(timezone.utc).isoformat()


def _row_to_candidate(row: Any) -> BlocklistCandidate:
    """Build a BlocklistCandidate from a row addressable by column name.

    ``log_evidence`` is stored as JSON text; NULL or empty decodes to ``[]``.
    """
    return BlocklistCandidate(
        id=row["id"],
        domain_or_ip=row["domain_or_ip"],
        source_device_ip=row["source_device_ip"],
        source_device_name=row["source_device_name"],
        first_seen=row["first_seen"],
        last_seen=row["last_seen"],
        hit_count=row["hit_count"],
        status=row["status"],
        pushed_at=row["pushed_at"],
        log_evidence=json.loads(row["log_evidence"] or "[]"),
        matched_rule=row["matched_rule"],
        llm_score=row["llm_score"],
        llm_reason=row["llm_reason"],
    )


def _upsert_candidate(
    conn: Any,
    domain_or_ip: str,
    source_device_ip: str | None,
    source_device_name: str | None,
    matched_rule: str | None,
    entry_id: str,
    now: str,
) -> bool:
    """Insert or update a candidate. Returns True if a new row was created.

    A candidate is keyed on (domain_or_ip, source_device_ip) within the
    tenant returned by ``resolve_tenant_id()``; rows whose ``tenant_id`` is
    the empty string are matched as well.  On update, ``last_seen`` is set
    to ``now``, ``hit_count`` is incremented, and ``entry_id`` is appended
    to the evidence list (kept to the 10 most recent IDs).  The caller is
    responsible for committing.
    """
    tid = resolve_tenant_id()
    # "IS ?" rather than "= ?" so a None source_device_ip matches NULL.
    row = conn.execute(
        "SELECT id, hit_count, log_evidence FROM blocklist_candidates "
        "WHERE domain_or_ip = ? AND source_device_ip IS ? AND (tenant_id = ? OR tenant_id = '')",
        (domain_or_ip, source_device_ip, tid),
    ).fetchone()

    if row is None:
        conn.execute(
            """INSERT INTO blocklist_candidates
               (id, tenant_id, domain_or_ip, source_device_ip, source_device_name,
                first_seen, last_seen, hit_count, status, pushed_at,
                log_evidence, matched_rule)
               VALUES (?, ?, ?, ?, ?, ?, ?, 1, 'pending', NULL, ?, ?)""",
            (
                str(uuid.uuid4()),
                tid,
                domain_or_ip,
                source_device_ip,
                source_device_name,
                now,
                now,
                json.dumps([entry_id]),
                matched_rule,
            ),
        )
        return True

    existing_id = row["id"]
    hit_count = row["hit_count"]
    existing_evidence = row["log_evidence"]
    evidence = json.loads(existing_evidence or "[]")
    if entry_id not in evidence:
        evidence.append(entry_id)
    evidence = evidence[-10:]  # cap at 10
    conn.execute(
        "UPDATE blocklist_candidates SET last_seen=?, hit_count=?, log_evidence=? WHERE id=?",
        (now, hit_count + 1, json.dumps(evidence), existing_id),
    )
    return False


# ---------------------------------------------------------------------------
# Extraction scan
# ---------------------------------------------------------------------------


def run_scan(
    db_path: Path,
    router_source_ids: list[str],
    device_map: dict[str, str],
    telemetry_rules: list[TelemetryRule],
) -> int:
    """Scan log_entries from router sources, upsert blocklist candidates.

    Only entries whose source IP is in device_map are recorded.
    Returns the total number of rows created or updated.

    Args:
        db_path: database location handed to ``get_conn``.
        router_source_ids: ``log_entries.source_id`` values to scan.
        device_map: source IP -> device name.
        telemetry_rules: rules used to tag a destination via
            :func:`matches_telemetry`.

    Returns 0 without touching the database when ``router_source_ids`` or
    ``device_map`` is empty.
    """
    if not router_source_ids or not device_map:
        return 0

    # One "?" per source id; only placeholders are interpolated, the values
    # themselves are bound as parameters.
    placeholders = ",".join("?" for _ in router_source_ids)
    now = _now_iso()
    count = 0
    tid = resolve_tenant_id()

    with get_conn(db_path) as conn:
        rows = conn.execute(
            f"SELECT id, text FROM log_entries WHERE source_id IN ({placeholders}) AND (tenant_id = ? OR tenant_id = '')",  # noqa: S608
            (*router_source_ids, tid),
        ).fetchall()

        for row in rows:
            entry_id, text = row["id"], row["text"]
            src_ip: str | None = None
            dst: str | None = None

            # Try the dnsmasq query format first, then the iptables format.
            m = _DNSMASQ_RE.search(text)
            if m:
                src_ip = m.group("src")
                dst = m.group("domain")
            else:
                m = _IPTABLES_RE.search(text)
                if m:
                    src_ip = m.group("src")
                    dst = m.group("dst")

            # Skip unparseable lines and traffic from unknown devices.
            if src_ip is None or src_ip not in device_map:
                continue

            device_name = device_map[src_ip]
            rule = matches_telemetry(dst, telemetry_rules) if dst else None
            matched_rule_name = rule.name if rule else None

            _upsert_candidate(conn, dst or "unknown", src_ip, device_name, matched_rule_name, entry_id, now)
            count += 1

        conn.commit()

    return count


# ---------------------------------------------------------------------------
# Candidate CRUD
# ---------------------------------------------------------------------------

# Column list shared by every candidate read; must stay in step with the
# keys _row_to_candidate reads.
_CANDIDATE_SELECT = (
    "SELECT id,domain_or_ip,source_device_ip,source_device_name,"
    "first_seen,last_seen,hit_count,status,pushed_at,log_evidence,"
    "matched_rule,llm_score,llm_reason FROM blocklist_candidates"
)


def list_candidates(
    db_path: Path,
    status: str | None = None,
    device_ip: str | None = None,
) -> list[BlocklistCandidate]:
    """Return the current tenant's candidates, most recently seen first.

    Args:
        db_path: database location handed to ``get_conn``.
        status: keep only this status; ``None``, ``""`` or ``"all"`` disables
            the filter.
        device_ip: keep only candidates from this source device IP; falsy
            disables the filter.
    """
    tid = resolve_tenant_id()
    conditions = ["(tenant_id = ? OR tenant_id = '')"]
    params: list[Any] = [tid]
    if status and status != "all":
        conditions.append("status = ?")
        params.append(status)
    if device_ip:
        conditions.append("source_device_ip = ?")
        params.append(device_ip)

    # Only fixed condition fragments are joined here; values stay bound.
    where = " AND ".join(conditions)
    with get_conn(db_path) as conn:
        rows = conn.execute(
            f"{_CANDIDATE_SELECT} WHERE {where} ORDER BY last_seen DESC",  # noqa: S608
            params,
        ).fetchall()
    return [_row_to_candidate(r) for r in rows]


def _get_candidate(conn: Any, candidate_id: str) -> BlocklistCandidate:
    """Fetch one candidate by ID on an open connection.

    Raises:
        KeyError: if no row has that ID.
    """
    # NOTE(review): this lookup is by id only and is not tenant-scoped,
    # unlike list_candidates - confirm that is intended.
    row = conn.execute(
        f"{_CANDIDATE_SELECT} WHERE id=?",  # noqa: S608
        (candidate_id,),
    ).fetchone()
    if row is None:
        raise KeyError(f"Candidate {candidate_id!r} not found")
    return _row_to_candidate(row)


def get_candidate(db_path: Path, candidate_id: str) -> BlocklistCandidate:
    """Fetch a single candidate by ID. Raises KeyError if not found."""
    with get_conn(db_path) as conn:
        return _get_candidate(conn, candidate_id)


def update_candidate_status(db_path: Path, candidate_id: str, new_status: str) -> BlocklistCandidate:
    """Set a candidate's status and return the refreshed candidate.

    Raises:
        ValueError: if ``new_status`` is not one of ``_VALID_STATUSES``.
        KeyError: if no candidate has that ID (raised by the re-read after
            the UPDATE has been committed).
    """
    if new_status not in _VALID_STATUSES:
        raise ValueError(f"Invalid status {new_status!r}. Must be one of {_VALID_STATUSES}")
    with get_conn(db_path) as conn:
        conn.execute("UPDATE blocklist_candidates SET status=? WHERE id=?", (new_status, candidate_id))
        conn.commit()
        return _get_candidate(conn, candidate_id)


def mark_pushed(db_path: Path, candidate_id: str) -> BlocklistCandidate:
    """Mark a candidate as ``pushed``, stamping ``pushed_at`` with the current UTC time.

    Raises:
        KeyError: if no candidate has that ID.
    """
    with get_conn(db_path) as conn:
        conn.execute(
            "UPDATE blocklist_candidates SET status='pushed', pushed_at=? WHERE id=?",
            (_now_iso(), candidate_id),
        )
        conn.commit()
        return _get_candidate(conn, candidate_id)


def mark_unblocked(db_path: Path, candidate_id: str) -> BlocklistCandidate:
    """Mark a candidate as ``unblocked`` (``pushed_at`` is left untouched).

    Raises:
        KeyError: if no candidate has that ID.
    """
    with get_conn(db_path) as conn:
        conn.execute("UPDATE blocklist_candidates SET status='unblocked' WHERE id=?", (candidate_id,))
        conn.commit()
        return _get_candidate(conn, candidate_id)