feat(blocklist): blocklist_candidates schema + tests
Add blocklist_candidates table and indexes to _SCHEMA in pipeline.py. Add TestSchema tests verifying table existence, column set, and status/hit_count defaults. All 193 tests pass.
This commit is contained in:
parent
9c3f494708
commit
bbb4605829
2 changed files with 62 additions and 0 deletions
|
|
@ -91,6 +91,25 @@ CREATE TABLE IF NOT EXISTS context_chunks (
|
||||||
embedding BLOB
|
embedding BLOB
|
||||||
);
|
);
|
||||||
CREATE INDEX IF NOT EXISTS idx_chunks_doc ON context_chunks(document_id);
|
CREATE INDEX IF NOT EXISTS idx_chunks_doc ON context_chunks(document_id);
|
||||||
|
|
||||||
|
CREATE TABLE IF NOT EXISTS blocklist_candidates (
|
||||||
|
id TEXT PRIMARY KEY,
|
||||||
|
domain_or_ip TEXT NOT NULL,
|
||||||
|
source_device_ip TEXT,
|
||||||
|
source_device_name TEXT,
|
||||||
|
first_seen TEXT NOT NULL,
|
||||||
|
last_seen TEXT NOT NULL,
|
||||||
|
hit_count INTEGER DEFAULT 1,
|
||||||
|
status TEXT DEFAULT 'pending',
|
||||||
|
pushed_at TEXT,
|
||||||
|
log_evidence TEXT DEFAULT '[]',
|
||||||
|
matched_rule TEXT,
|
||||||
|
llm_score REAL,
|
||||||
|
llm_reason TEXT
|
||||||
|
);
|
||||||
|
CREATE INDEX IF NOT EXISTS idx_blocklist_device ON blocklist_candidates(source_device_ip);
|
||||||
|
CREATE INDEX IF NOT EXISTS idx_blocklist_status ON blocklist_candidates(status);
|
||||||
|
CREATE INDEX IF NOT EXISTS idx_blocklist_domain ON blocklist_candidates(domain_or_ip);
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
|
|
||||||
43
tests/test_service_blocklist.py
Normal file
43
tests/test_service_blocklist.py
Normal file
|
|
@ -0,0 +1,43 @@
|
||||||
|
"""Tests for blocklist service — schema, extraction, candidate management."""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import sqlite3
|
||||||
|
import pytest
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
|
||||||
|
class TestSchema:
|
||||||
|
def test_blocklist_candidates_table_exists(self, tmp_path):
|
||||||
|
from app.ingest.pipeline import ensure_schema
|
||||||
|
db = tmp_path / "test.db"
|
||||||
|
ensure_schema(db)
|
||||||
|
conn = sqlite3.connect(str(db))
|
||||||
|
tables = {r[0] for r in conn.execute("SELECT name FROM sqlite_master WHERE type='table'").fetchall()}
|
||||||
|
assert "blocklist_candidates" in tables
|
||||||
|
|
||||||
|
def test_blocklist_candidates_columns(self, tmp_path):
|
||||||
|
from app.ingest.pipeline import ensure_schema
|
||||||
|
db = tmp_path / "test.db"
|
||||||
|
ensure_schema(db)
|
||||||
|
conn = sqlite3.connect(str(db))
|
||||||
|
cols = {r[1] for r in conn.execute("PRAGMA table_info(blocklist_candidates)").fetchall()}
|
||||||
|
assert cols >= {
|
||||||
|
"id", "domain_or_ip", "source_device_ip", "source_device_name",
|
||||||
|
"first_seen", "last_seen", "hit_count", "status", "pushed_at",
|
||||||
|
"log_evidence", "matched_rule", "llm_score", "llm_reason",
|
||||||
|
}
|
||||||
|
|
||||||
|
def test_status_default_is_pending(self, tmp_path):
|
||||||
|
from app.ingest.pipeline import ensure_schema
|
||||||
|
import uuid
|
||||||
|
db = tmp_path / "test.db"
|
||||||
|
ensure_schema(db)
|
||||||
|
conn = sqlite3.connect(str(db))
|
||||||
|
conn.execute(
|
||||||
|
"INSERT INTO blocklist_candidates (id, domain_or_ip, first_seen, last_seen) VALUES (?, ?, ?, ?)",
|
||||||
|
(str(uuid.uuid4()), "samsungads.com", "2026-05-14T00:00:00+00:00", "2026-05-14T00:00:00+00:00"),
|
||||||
|
)
|
||||||
|
conn.commit()
|
||||||
|
row = conn.execute("SELECT status, hit_count FROM blocklist_candidates").fetchone()
|
||||||
|
assert row[0] == "pending"
|
||||||
|
assert row[1] == 1
|
||||||
Loading…
Reference in a new issue