feat(blocklist): blocklist_candidates schema + tests

Add blocklist_candidates table and indexes to _SCHEMA in pipeline.py.
Add TestSchema tests verifying table existence, column set, and status/hit_count defaults.
All 193 tests pass.
This commit is contained in:
pyr0ball 2026-05-15 20:51:00 -07:00
parent 135dd02423
commit 4d7c436721
2 changed files with 62 additions and 0 deletions

View file

@ -91,6 +91,25 @@ CREATE TABLE IF NOT EXISTS context_chunks (
embedding BLOB embedding BLOB
); );
CREATE INDEX IF NOT EXISTS idx_chunks_doc ON context_chunks(document_id); CREATE INDEX IF NOT EXISTS idx_chunks_doc ON context_chunks(document_id);
CREATE TABLE IF NOT EXISTS blocklist_candidates (
id TEXT PRIMARY KEY,
domain_or_ip TEXT NOT NULL,
source_device_ip TEXT,
source_device_name TEXT,
first_seen TEXT NOT NULL,
last_seen TEXT NOT NULL,
hit_count INTEGER DEFAULT 1,
status TEXT DEFAULT 'pending',
pushed_at TEXT,
log_evidence TEXT DEFAULT '[]',
matched_rule TEXT,
llm_score REAL,
llm_reason TEXT
);
CREATE INDEX IF NOT EXISTS idx_blocklist_device ON blocklist_candidates(source_device_ip);
CREATE INDEX IF NOT EXISTS idx_blocklist_status ON blocklist_candidates(status);
CREATE INDEX IF NOT EXISTS idx_blocklist_domain ON blocklist_candidates(domain_or_ip);
""" """

View file

@ -0,0 +1,43 @@
"""Tests for blocklist service — schema, extraction, candidate management."""
from __future__ import annotations
import sqlite3
import pytest
from pathlib import Path
class TestSchema:
def test_blocklist_candidates_table_exists(self, tmp_path):
from app.ingest.pipeline import ensure_schema
db = tmp_path / "test.db"
ensure_schema(db)
conn = sqlite3.connect(str(db))
tables = {r[0] for r in conn.execute("SELECT name FROM sqlite_master WHERE type='table'").fetchall()}
assert "blocklist_candidates" in tables
def test_blocklist_candidates_columns(self, tmp_path):
from app.ingest.pipeline import ensure_schema
db = tmp_path / "test.db"
ensure_schema(db)
conn = sqlite3.connect(str(db))
cols = {r[1] for r in conn.execute("PRAGMA table_info(blocklist_candidates)").fetchall()}
assert cols >= {
"id", "domain_or_ip", "source_device_ip", "source_device_name",
"first_seen", "last_seen", "hit_count", "status", "pushed_at",
"log_evidence", "matched_rule", "llm_score", "llm_reason",
}
def test_status_default_is_pending(self, tmp_path):
from app.ingest.pipeline import ensure_schema
import uuid
db = tmp_path / "test.db"
ensure_schema(db)
conn = sqlite3.connect(str(db))
conn.execute(
"INSERT INTO blocklist_candidates (id, domain_or_ip, first_seen, last_seen) VALUES (?, ?, ?, ?)",
(str(uuid.uuid4()), "samsungads.com", "2026-05-14T00:00:00+00:00", "2026-05-14T00:00:00+00:00"),
)
conn.commit()
row = conn.execute("SELECT status, hit_count FROM blocklist_candidates").fetchone()
assert row[0] == "pending"
assert row[1] == 1