turnstone/app/services/blocklist.py
pyr0ball e543ab70f7 feat: dual-backend SQLite/Postgres + multi-tenant source namespacing
- Add app/db/ abstraction layer: Backend enum, DbConn wrapper,
  dialect helper (q() for ? vs %s paramstyle), get_conn(), tenant_id()
- Auto-detect backend from DATABASE_URL; SQLite remains default when
  unset — no config change for local deployments
- Add tenant_id column to all three logical DBs (main, context, incidents);
  idempotent ALTER TABLE migration runs before schema scripts on existing DBs
- All INSERTs inject tenant_id; SELECTs use (tenant_id = ? OR tenant_id = '')
  for backward compat with pre-namespacing rows
- Add docker-compose.yml with named volume turnstone_pgdata (survives rebuilds)
  and optional external Postgres support via DATABASE_URL override
- Add scripts/migrate_sqlite_to_postgres.py — one-shot idempotent migration
  for existing SQLite data; ON CONFLICT DO NOTHING for safe re-runs
- Fix SSH glean path in pipeline.py to use ensure_schema + get_conn
  (was still using raw sqlite3.connect + old _SCHEMA without tenant_id)
- Fix FTS5 JOIN ambiguity: qualify repeat_count as f.repeat_count in search
- Update all tests to use ensure_*_schema fixtures; add row_factory where needed
- 394/394 tests passing

Closes: #42
Closes: #50
2026-06-08 08:37:54 -07:00

291 lines
9.4 KiB
Python

"""Blocklist candidate extraction, management, and telemetry matching."""
from __future__ import annotations
import dataclasses
import json
import re
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from app.db import get_conn, resolve_tenant_id
import yaml
# ---------------------------------------------------------------------------
# Data models
# ---------------------------------------------------------------------------
@dataclasses.dataclass(frozen=True)
class TelemetryRule:
    """One named telemetry rule loaded from the YAML rules file.

    Instances are immutable (``frozen=True``).
    """

    # Rule identifier; run_scan stores it on candidates as ``matched_rule``.
    name: str
    # Domains covered by this rule. load_telemetry_rules lower-cases them and
    # strips leading/trailing dots; matches_telemetry also matches subdomains.
    domains: tuple[str, ...]
    # Category label, taken from the YAML entry's ``category`` key.
    category: str
    # Human-readable description ("" when the YAML entry omits it).
    description: str
@dataclasses.dataclass
class BlocklistCandidate:
    """A domain/IP observed from a known device that may be pushed to a blocklist.

    Mirrors one row of the ``blocklist_candidates`` table (see
    _row_to_candidate for the column mapping).
    """

    # Row id; rows created by _upsert_candidate use a uuid4 string.
    id: str
    # Queried domain (dnsmasq) or destination (iptables DST=) that was seen.
    domain_or_ip: str
    # IP of the local device that generated the traffic, if known.
    source_device_ip: str | None
    # Friendly device name looked up from the scan's device_map.
    source_device_name: str | None
    # ISO-8601 UTC timestamps (from _now_iso) of the first and latest sighting.
    first_seen: str
    last_seen: str
    # Number of upserts recorded for this candidate; new rows start at 1.
    hit_count: int
    # One of _VALID_STATUSES; new rows are inserted as 'pending'.
    status: str
    # ISO-8601 UTC timestamp set by mark_pushed, otherwise None.
    pushed_at: str | None
    # Up to 10 most recent log_entries ids that produced this candidate
    # (stored as a JSON array string in the DB).
    log_evidence: list[str]
    # Name of the TelemetryRule that matched the domain, if any.
    matched_rule: str | None
    # Not written anywhere in this module; presumably filled by a separate
    # LLM scoring step — TODO confirm.
    llm_score: float | None
    llm_reason: str | None
# ---------------------------------------------------------------------------
# Telemetry list
# ---------------------------------------------------------------------------
def load_telemetry_rules(path: Path) -> list[TelemetryRule]:
    """Load telemetry rules from a YAML file.

    Expected layout (the keys read below)::

        rules:
          - name: ...
            domains: [...]
            category: ...
            description: ...   # optional

    Domains are lower-cased and stripped of leading/trailing dots so they
    compare cleanly in matches_telemetry. An empty file, or a ``rules:`` key
    with no value, yields an empty list instead of crashing.

    Raises:
        KeyError: if a rule entry lacks ``name``, ``domains`` or ``category``.
    """
    # safe_load: the rules file is plain data; never construct arbitrary objects.
    # An empty document parses to None, so fall back to an empty mapping.
    data = yaml.safe_load(path.read_text(encoding="utf-8")) or {}
    return [
        TelemetryRule(
            name=r["name"],
            domains=tuple(d.lower().strip(".") for d in r["domains"]),
            category=r["category"],
            # An explicit `description:` with no value parses to None; keep the
            # field a str as the dataclass declares.
            description=r.get("description") or "",
        )
        for r in data.get("rules") or []
    ]
def matches_telemetry(domain: str, rules: list[TelemetryRule]) -> TelemetryRule | None:
    """Find the first rule covering *domain*.

    A rule covers a domain when one of its entries equals the domain or is a
    parent of it (``a.b.example.com`` is covered by ``example.com``). The
    comparison is case-insensitive and ignores leading/trailing dots on
    *domain*. Returns None when no rule applies.
    """
    needle = domain.lower().strip(".")
    for candidate_rule in rules:
        if any(needle == base or needle.endswith(f".{base}") for base in candidate_rule.domains):
            return candidate_rule
    return None
# ---------------------------------------------------------------------------
# Regex extractors for router log entries
# ---------------------------------------------------------------------------
# dnsmasq query lines, e.g. "query[A] example.com from 192.168.1.10".
# A{1,4} accepts A and AAAA queries (it would also admit AA/AAA).
# The source address is dotted-decimal only, i.e. IPv4.
_DNSMASQ_RE = re.compile(
    r"query\[A{1,4}\]\s+(?P<domain>\S+)\s+from\s+(?P<src>[\d.]+)"
)
# Firewall log lines carrying "SRC=<ipv4> ... DST=<addr>"; the DST class
# also admits letters and hyphens, so hostnames can be captured too.
_IPTABLES_RE = re.compile(
    r"SRC=(?P<src>[\d.]+).*?DST=(?P<dst>[\d.a-zA-Z.-]+)"
)
# Statuses accepted by update_candidate_status().
_VALID_STATUSES = {"pending", "approved", "rejected", "pushed", "unblocked"}
# ---------------------------------------------------------------------------
# DB helpers
# ---------------------------------------------------------------------------
def _now_iso() -> str:
    """Return the current moment in UTC as an ISO-8601 string.

    The result carries an explicit ``+00:00`` offset; this module uses it for
    the ``first_seen``/``last_seen``/``pushed_at`` timestamp columns.
    """
    utc_now = datetime.now(tz=timezone.utc)
    return utc_now.isoformat()
def _row_to_candidate(row: Any) -> BlocklistCandidate:
    """Build a BlocklistCandidate from a DB row accessed by column name.

    Every column is copied through unchanged except ``log_evidence``, which
    is stored as a JSON array string and decoded here (NULL/empty -> ``[]``).
    """
    passthrough = (
        "id",
        "domain_or_ip",
        "source_device_ip",
        "source_device_name",
        "first_seen",
        "last_seen",
        "hit_count",
        "status",
        "pushed_at",
        "matched_rule",
        "llm_score",
        "llm_reason",
    )
    kwargs = {column: row[column] for column in passthrough}
    kwargs["log_evidence"] = json.loads(row["log_evidence"] or "[]")
    return BlocklistCandidate(**kwargs)
def _upsert_candidate(
    conn: Any,
    domain_or_ip: str,
    source_device_ip: str | None,
    source_device_name: str | None,
    matched_rule: str | None,
    entry_id: str,
    now: str,
) -> bool:
    """Insert or update a candidate. Returns True if a new row was created.

    A candidate is identified by ``(domain_or_ip, source_device_ip)`` within
    the current tenant. Legacy rows with an empty ``tenant_id`` also match.

    * New candidate: inserted with ``hit_count=1``, status ``'pending'`` and
      ``entry_id`` as its only evidence.
    * Existing candidate: ``last_seen`` is set to *now*, ``hit_count`` is
      incremented, and *entry_id* is appended to the evidence list. The list
      is kept to the 10 most recent distinct ids.

    The caller owns the transaction; this function does not commit.
    """
    tid = resolve_tenant_id()
    # `source_device_ip IS ?` is SQLite-only: PostgreSQL accepts IS only with
    # NULL/TRUE/FALSE/UNKNOWN. Build a portable NULL-safe match instead.
    if source_device_ip is None:
        ip_clause = "source_device_ip IS NULL"
        params: tuple[Any, ...] = (domain_or_ip, tid)
    else:
        ip_clause = "source_device_ip = ?"
        params = (domain_or_ip, source_device_ip, tid)
    row = conn.execute(
        "SELECT id, hit_count, log_evidence FROM blocklist_candidates "
        f"WHERE domain_or_ip = ? AND {ip_clause} AND (tenant_id = ? OR tenant_id = '')",  # noqa: S608
        params,
    ).fetchone()
    if row is None:
        conn.execute(
            """INSERT INTO blocklist_candidates
            (id, tenant_id, domain_or_ip, source_device_ip, source_device_name,
            first_seen, last_seen, hit_count, status, pushed_at, log_evidence, matched_rule)
            VALUES (?, ?, ?, ?, ?, ?, ?, 1, 'pending', NULL, ?, ?)""",
            (
                str(uuid.uuid4()), tid, domain_or_ip, source_device_ip, source_device_name,
                now, now, json.dumps([entry_id]), matched_rule,
            ),
        )
        return True
    evidence = json.loads(row["log_evidence"] or "[]")
    if entry_id not in evidence:
        evidence.append(entry_id)
    evidence = evidence[-10:]  # cap at the 10 most recent ids
    # NOTE(review): hit_count is incremented on every call, even when entry_id
    # was already recorded, so re-scanning the same log entries inflates it.
    conn.execute(
        "UPDATE blocklist_candidates SET last_seen=?, hit_count=?, log_evidence=? WHERE id=?",
        (now, row["hit_count"] + 1, json.dumps(evidence), row["id"]),
    )
    return False
# ---------------------------------------------------------------------------
# Extraction scan
# ---------------------------------------------------------------------------
def _parse_router_line(text: str) -> tuple[str, str] | None:
    """Extract ``(source_ip, destination)`` from one router log line.

    Tries the dnsmasq query pattern first, then the iptables SRC/DST pattern.
    Returns None when neither matches.
    """
    m = _DNSMASQ_RE.search(text)
    if m:
        return m.group("src"), m.group("domain")
    m = _IPTABLES_RE.search(text)
    if m:
        return m.group("src"), m.group("dst")
    return None


def run_scan(
    db_path: Path,
    router_source_ids: list[str],
    device_map: dict[str, str],
    telemetry_rules: list[TelemetryRule],
) -> int:
    """Scan log_entries from router sources and upsert blocklist candidates.

    Only entries whose parsed source IP is a key of *device_map* are recorded.
    Each recorded entry is matched against *telemetry_rules* and upserted via
    _upsert_candidate. All writes are committed once at the end.

    Returns the number of log entries that produced an upsert. A candidate
    hit by several entries is counted once per entry, so this is not a count
    of distinct rows.
    """
    if not router_source_ids or not device_map:
        return 0
    placeholders = ",".join("?" for _ in router_source_ids)
    now = _now_iso()
    count = 0
    tid = resolve_tenant_id()
    with get_conn(db_path) as conn:
        rows = conn.execute(
            f"SELECT id, text FROM log_entries WHERE source_id IN ({placeholders}) AND (tenant_id = ? OR tenant_id = '')",  # noqa: S608
            (*router_source_ids, tid),
        ).fetchall()
        for row in rows:
            entry_id, text = row["id"], row["text"]
            parsed = _parse_router_line(text)
            if parsed is None:
                continue
            src_ip, dst = parsed
            if src_ip not in device_map:
                continue
            device_name = device_map[src_ip]
            rule = matches_telemetry(dst, telemetry_rules) if dst else None
            matched_rule_name = rule.name if rule else None
            _upsert_candidate(conn, dst or "unknown", src_ip, device_name, matched_rule_name, entry_id, now)
            count += 1
        conn.commit()
    return count
# ---------------------------------------------------------------------------
# Candidate CRUD
# ---------------------------------------------------------------------------
# Columns read by _row_to_candidate, in the order the SELECT lists them.
_CANDIDATE_COLUMNS = (
    "id",
    "domain_or_ip",
    "source_device_ip",
    "source_device_name",
    "first_seen",
    "last_seen",
    "hit_count",
    "status",
    "pushed_at",
    "log_evidence",
    "matched_rule",
    "llm_score",
    "llm_reason",
)
# Base SELECT (no WHERE clause) shared by the candidate read paths.
_CANDIDATE_SELECT = "SELECT " + ",".join(_CANDIDATE_COLUMNS) + " FROM blocklist_candidates"
def list_candidates(
    db_path: Path,
    status: str | None = None,
    device_ip: str | None = None,
) -> list[BlocklistCandidate]:
    """Return the current tenant's candidates, most recently seen first.

    Legacy rows with an empty ``tenant_id`` are included. *status* filters by
    exact status unless it is falsy or ``"all"``. *device_ip* filters by
    source device IP when truthy.
    """
    where_parts = ["(tenant_id = ? OR tenant_id = '')"]
    bind: list[Any] = [resolve_tenant_id()]
    wants_status_filter = bool(status) and status != "all"
    if wants_status_filter:
        where_parts.append("status = ?")
        bind.append(status)
    if device_ip:
        where_parts.append("source_device_ip = ?")
        bind.append(device_ip)
    sql = f"{_CANDIDATE_SELECT} WHERE {' AND '.join(where_parts)} ORDER BY last_seen DESC"  # noqa: S608
    with get_conn(db_path) as conn:
        fetched = conn.execute(sql, bind).fetchall()
    return list(map(_row_to_candidate, fetched))
def _get_candidate(conn: Any, candidate_id: str) -> BlocklistCandidate:
    """Fetch one candidate by id on an already-open connection.

    The lookup is scoped to the current tenant, plus legacy rows with an
    empty ``tenant_id``. This is the same filter list_candidates and
    _upsert_candidate use, so knowing another tenant's candidate id is not
    enough to read it.

    Raises:
        KeyError: if no visible candidate has *candidate_id*.
    """
    row = conn.execute(
        f"{_CANDIDATE_SELECT} WHERE id=? AND (tenant_id = ? OR tenant_id = '')",  # noqa: S608
        (candidate_id, resolve_tenant_id()),
    ).fetchone()
    if row is None:
        raise KeyError(f"Candidate {candidate_id!r} not found")
    return _row_to_candidate(row)
def get_candidate(db_path: Path, candidate_id: str) -> BlocklistCandidate:
    """Open a connection to *db_path* and return the candidate with *candidate_id*.

    Raises:
        KeyError: propagated from _get_candidate when no row matches.
    """
    with get_conn(db_path) as conn:
        candidate = _get_candidate(conn, candidate_id)
    return candidate
def update_candidate_status(db_path: Path, candidate_id: str, new_status: str) -> BlocklistCandidate:
    """Set a candidate's status, commit, and return the refreshed candidate.

    The UPDATE is restricted to the current tenant's rows, plus legacy rows
    with an empty ``tenant_id``, so one tenant cannot change another's
    candidate by id.

    Raises:
        ValueError: if *new_status* is not one of _VALID_STATUSES.
        KeyError: from _get_candidate if the candidate cannot be found.
    """
    if new_status not in _VALID_STATUSES:
        raise ValueError(f"Invalid status {new_status!r}. Must be one of {_VALID_STATUSES}")
    with get_conn(db_path) as conn:
        conn.execute(
            "UPDATE blocklist_candidates SET status=? "
            "WHERE id=? AND (tenant_id = ? OR tenant_id = '')",
            (new_status, candidate_id, resolve_tenant_id()),
        )
        conn.commit()
        return _get_candidate(conn, candidate_id)
def mark_pushed(db_path: Path, candidate_id: str) -> BlocklistCandidate:
    """Mark a candidate as pushed, stamping ``pushed_at`` with the current UTC time.

    The UPDATE is restricted to the current tenant's rows, plus legacy rows
    with an empty ``tenant_id``. Commits and returns the refreshed candidate.

    Raises:
        KeyError: from _get_candidate if the candidate cannot be found.
    """
    with get_conn(db_path) as conn:
        conn.execute(
            "UPDATE blocklist_candidates SET status='pushed', pushed_at=? "
            "WHERE id=? AND (tenant_id = ? OR tenant_id = '')",
            (_now_iso(), candidate_id, resolve_tenant_id()),
        )
        conn.commit()
        return _get_candidate(conn, candidate_id)
def mark_unblocked(db_path: Path, candidate_id: str) -> BlocklistCandidate:
    """Set a candidate's status to 'unblocked', commit, and return it.

    The UPDATE is restricted to the current tenant's rows, plus legacy rows
    with an empty ``tenant_id``. ``pushed_at`` is left unchanged.

    Raises:
        KeyError: from _get_candidate if the candidate cannot be found.
    """
    with get_conn(db_path) as conn:
        conn.execute(
            "UPDATE blocklist_candidates SET status='unblocked' "
            "WHERE id=? AND (tenant_id = ? OR tenant_id = '')",
            (candidate_id, resolve_tenant_id()),
        )
        conn.commit()
        return _get_candidate(conn, candidate_id)