turnstone/tests/test_service_blocklist.py
pyr0ball aa80f307fe refactor: rename ingest → glean throughout codebase
Renames the app/ingest/ package to app/glean/ and updates all
references across Python modules, shell scripts, Vue components,
tests, and documentation.

Intentionally preserved:
- SQLite column name ingest_time (avoids schema migration)
- RetrievedEntry.ingest_time field (maps to the column above)
- Any public-facing JSON keys that reference ingest_time

Changes by category:
- app/ingest/ → app/glean/ (full package move, all parsers)
- app/tasks/ingest_scheduler.py → app/tasks/glean_scheduler.py
- scripts/ingest_corpus.py → scripts/glean_corpus.py
- tests/test_ingest_*.py → tests/test_glean_*.py
- Docstrings, log messages, comments: ingest → glean
- Env var: TURNSTONE_INGEST_INTERVAL → TURNSTONE_GLEAN_INTERVAL
- Shell scripts: glean.log, glean_corpus.py references
- README.md: multi-source ingest → multi-source glean
- .env.example: updated env var name
- patterns/: new diagnostic patterns from 2026-05-20 SSH incident
  (service_crash_loop, pkg_daemon_restart, ssh_forward_conflict)
- SourcesView.vue: pipeline label updated
- All test import paths updated to app.glean.*

285 tests passing.
2026-05-20 23:02:55 -07:00

254 lines
9.7 KiB
Python

"""Tests for blocklist service — schema, extraction, candidate management."""
from __future__ import annotations

import sqlite3
from contextlib import closing
from pathlib import Path

import pytest
class TestSchema:
    """Schema checks for the ``blocklist_candidates`` table created by ``ensure_schema``."""

    @staticmethod
    def _init_db(tmp_path):
        """Create ``test.db`` under *tmp_path*, apply the schema, and return its path."""
        # Imported lazily, like every other project import in this file.
        from app.glean.pipeline import ensure_schema

        db = tmp_path / "test.db"
        ensure_schema(db)
        return db

    def test_blocklist_candidates_table_exists(self, tmp_path):
        """The table is listed in ``sqlite_master`` after schema creation."""
        db = self._init_db(tmp_path)
        # closing() releases the handle even if the query raises; sqlite3's own
        # context manager only wraps a transaction and leaves the connection open.
        with closing(sqlite3.connect(str(db))) as conn:
            tables = {
                row[0]
                for row in conn.execute(
                    "SELECT name FROM sqlite_master WHERE type='table'"
                )
            }
        assert "blocklist_candidates" in tables

    def test_blocklist_candidates_columns(self, tmp_path):
        """All expected columns exist; additional columns are tolerated."""
        db = self._init_db(tmp_path)
        with closing(sqlite3.connect(str(db))) as conn:
            # Index 1 of each PRAGMA table_info row is the column name.
            cols = {
                row[1]
                for row in conn.execute("PRAGMA table_info(blocklist_candidates)")
            }
        # Superset comparison: the schema may grow without breaking this test.
        assert cols >= {
            "id", "domain_or_ip", "source_device_ip", "source_device_name",
            "first_seen", "last_seen", "hit_count", "status", "pushed_at",
            "log_evidence", "matched_rule", "llm_score", "llm_reason",
        }

    def test_status_default_is_pending(self, tmp_path):
        """A row inserted without status/hit_count reads back ``pending`` and ``1``."""
        import uuid

        db = self._init_db(tmp_path)
        with closing(sqlite3.connect(str(db))) as conn:
            # Only the four listed columns are supplied, so status and
            # hit_count must come from the column defaults.
            conn.execute(
                "INSERT INTO blocklist_candidates (id, domain_or_ip, first_seen, last_seen) VALUES (?, ?, ?, ?)",
                (str(uuid.uuid4()), "samsungads.com", "2026-05-14T00:00:00+00:00", "2026-05-14T00:00:00+00:00"),
            )
            conn.commit()
            row = conn.execute(
                "SELECT status, hit_count FROM blocklist_candidates"
            ).fetchone()
        assert row[0] == "pending"
        assert row[1] == 1
class TestTelemetry:
    """Loading telemetry rules from YAML and matching hostnames against them."""

    def _rules(self):
        """Load rules from ``patterns/telemetry.yaml`` under the parent of this file's directory."""
        from app.services.blocklist import load_telemetry_rules

        base_dir = Path(__file__).parent.parent
        return load_telemetry_rules(base_dir / "patterns" / "telemetry.yaml")

    def _match(self, host):
        """Run ``matches_telemetry`` for *host* against a freshly loaded rule set."""
        from app.services.blocklist import matches_telemetry

        loaded = self._rules()
        return matches_telemetry(host, loaded)

    def test_load_returns_rules(self):
        """The rule file yields at least three rules."""
        assert len(self._rules()) >= 3

    def test_samsung_rule_present(self):
        """A rule named ``samsung_ads`` is among the loaded rules."""
        rule_names = [rule.name for rule in self._rules()]
        assert "samsung_ads" in rule_names

    def test_exact_domain_match(self):
        """The bare domain matches the ``samsung_ads`` rule."""
        hit = self._match("samsungads.com")
        assert hit is not None
        assert hit.name == "samsung_ads"

    def test_subdomain_match(self):
        """A subdomain of the listed domain matches the same rule."""
        hit = self._match("sub.samsungads.com")
        assert hit is not None
        assert hit.name == "samsung_ads"

    def test_no_match_returns_none(self):
        """A host not covered by any rule yields ``None``."""
        assert self._match("google.com") is None

    def test_belkin_match(self):
        """``api.xbcs.net`` matches a rule whose category is ``belkin``."""
        hit = self._match("api.xbcs.net")
        assert hit is not None
        assert hit.category == "belkin"
class TestExtraction:
    """Tests for ``run_scan``: turning router log entries into blocklist candidates."""

    # Single parameterised insert shared by all tests; values are always bound,
    # never interpolated into the SQL text.
    _INSERT_LOG_ENTRY = (
        "INSERT INTO log_entries (id, source_id, sequence, ingest_time, text) "
        "VALUES (?, ?, ?, ?, ?)"
    )
    _INGEST_TIME = "2026-05-14T00:00:00+00:00"

    @pytest.fixture
    def db(self, tmp_path):
        """Path to a fresh SQLite file with the schema applied."""
        from app.glean.pipeline import ensure_schema

        path = tmp_path / "test.db"
        ensure_schema(path)
        return path

    @pytest.fixture
    def rules(self):
        """Telemetry rules loaded from ``patterns/telemetry.yaml``."""
        from app.services.blocklist import load_telemetry_rules

        return load_telemetry_rules(
            Path(__file__).parent.parent / "patterns" / "telemetry.yaml"
        )

    @classmethod
    def _insert_log_entries(cls, db, entries):
        """Insert log rows into ``log_entries`` and commit.

        *entries* is an iterable of ``(id, source_id, sequence, text)`` tuples;
        every row gets the same fixed ``ingest_time``.
        """
        params = [
            (entry_id, source_id, sequence, cls._INGEST_TIME, text)
            for entry_id, source_id, sequence, text in entries
        ]
        # closing() guarantees the handle is released even if the insert fails.
        with closing(sqlite3.connect(str(db))) as conn:
            conn.executemany(cls._INSERT_LOG_ENTRY, params)
            conn.commit()

    @staticmethod
    def _select(db, sql):
        """Run a read-only query against *db* and return all rows as a list."""
        with closing(sqlite3.connect(str(db))) as conn:
            return conn.execute(sql).fetchall()

    def test_dnsmasq_entry_extracted(self, db, rules):
        """A dnsmasq query line from a mapped device produces a pending candidate."""
        from app.services.blocklist import run_scan

        self._insert_log_entries(
            db,
            [(
                "e1", "router:syslog", 1,
                "dnsmasq[123]: query[A] samsungads.com from 192.168.1.45",
            )],
        )
        count = run_scan(
            db,
            router_source_ids=["router:syslog"],
            device_map={"192.168.1.45": "Samsung Projector"},
            telemetry_rules=rules,
        )
        assert count >= 1
        rows = self._select(
            db,
            "SELECT domain_or_ip, source_device_name, matched_rule, status FROM blocklist_candidates",
        )
        # Fail with a clear message instead of a TypeError on a missing row.
        assert rows, "run_scan reported a candidate but none was stored"
        row = rows[0]
        assert row[0] == "samsungads.com"
        assert row[1] == "Samsung Projector"
        assert row[2] == "samsung_ads"
        assert row[3] == "pending"

    def test_iptables_entry_extracted(self, db, rules):
        """A firewall FORWARD line yields a candidate keyed by the destination IP."""
        from app.services.blocklist import run_scan

        self._insert_log_entries(
            db,
            [(
                "e2", "router:fw", 1,
                "kernel: FORWARD SRC=192.168.1.67 DST=52.11.243.144 PROTO=TCP DPT=443",
            )],
        )
        count = run_scan(
            db,
            router_source_ids=["router:fw"],
            device_map={"192.168.1.67": "Belkin Switch 1"},
            telemetry_rules=rules,
        )
        assert count >= 1
        rows = self._select(
            db,
            "SELECT domain_or_ip, source_device_name FROM blocklist_candidates",
        )
        assert rows, "run_scan reported a candidate but none was stored"
        row = rows[0]
        assert row[0] == "52.11.243.144"
        assert row[1] == "Belkin Switch 1"

    def test_unknown_device_skipped(self, db, rules):
        """A query from an IP absent from ``device_map`` produces no candidates."""
        from app.services.blocklist import run_scan

        self._insert_log_entries(
            db,
            [(
                "e3", "router:syslog", 1,
                "dnsmasq[123]: query[A] samsungads.com from 10.0.0.99",
            )],
        )
        count = run_scan(
            db,
            router_source_ids=["router:syslog"],
            device_map={"192.168.1.45": "Samsung Projector"},
            telemetry_rules=rules,
        )
        assert count == 0

    def test_dedup_upsert_increments_hit_count(self, db, rules):
        """Three identical log lines collapse into one candidate with ``hit_count`` 3."""
        from app.services.blocklist import run_scan

        self._insert_log_entries(
            db,
            [
                (
                    f"e{i}", "router:syslog", i,
                    "dnsmasq[123]: query[A] samsungads.com from 192.168.1.45",
                )
                for i in range(3)
            ],
        )
        # Keyword arguments, matching how the other tests call run_scan.
        run_scan(
            db,
            router_source_ids=["router:syslog"],
            device_map={"192.168.1.45": "Projector"},
            telemetry_rules=rules,
        )
        rows = self._select(db, "SELECT hit_count FROM blocklist_candidates")
        assert len(rows) == 1  # one row, not three
        assert rows[0][0] == 3
class TestCandidateManagement:
    """Tests for listing candidates and moving them through status transitions."""

    @pytest.fixture
    def db_with_candidate(self, tmp_path):
        """Return ``(db_path, candidate_id)`` for a database holding one candidate.

        The row is inserted without an explicit status, so it relies on the
        column default (the filter test below expects ``pending``).
        """
        from app.glean.pipeline import ensure_schema
        import uuid

        db = tmp_path / "test.db"
        ensure_schema(db)
        cid = str(uuid.uuid4())
        # closing() releases the handle even if the insert raises.
        with closing(sqlite3.connect(str(db))) as conn:
            conn.execute(
                """INSERT INTO blocklist_candidates
                (id, domain_or_ip, first_seen, last_seen)
                VALUES (?, 'samsungads.com', '2026-05-14T00:00:00+00:00', '2026-05-14T00:00:00+00:00')""",
                (cid,),
            )
            conn.commit()
        return db, cid

    def test_list_candidates_returns_all(self, db_with_candidate):
        """Without a filter, the single seeded candidate is returned."""
        from app.services.blocklist import list_candidates

        db, _ = db_with_candidate
        results = list_candidates(db)
        assert len(results) == 1
        assert results[0].domain_or_ip == "samsungads.com"

    def test_list_candidates_filter_by_status(self, db_with_candidate):
        """The ``status`` filter includes matching rows and excludes the rest."""
        from app.services.blocklist import list_candidates

        db, _ = db_with_candidate
        assert len(list_candidates(db, status="pending")) == 1
        assert len(list_candidates(db, status="pushed")) == 0

    def test_update_status_to_approved(self, db_with_candidate):
        """Approving a candidate is reflected in the return value and persisted."""
        from app.services.blocklist import update_candidate_status, list_candidates

        db, cid = db_with_candidate
        candidate = update_candidate_status(db, cid, "approved")
        assert candidate.status == "approved"
        approved = list_candidates(db, status="approved")
        # Filtering by "approved" and then checking the status alone proves
        # little, so also verify it is the seeded row and that it left "pending".
        assert len(approved) == 1
        assert approved[0].status == "approved"
        assert approved[0].domain_or_ip == "samsungads.com"
        assert len(list_candidates(db, status="pending")) == 0

    def test_update_status_invalid_raises(self, db_with_candidate):
        """An unknown status is rejected with ``ValueError``."""
        from app.services.blocklist import update_candidate_status

        db, cid = db_with_candidate
        with pytest.raises(ValueError, match="Invalid status"):
            update_candidate_status(db, cid, "hacked")

    def test_mark_pushed_sets_status_and_timestamp(self, db_with_candidate):
        """Pushing an approved candidate sets ``pushed`` status and ``pushed_at``."""
        from app.services.blocklist import update_candidate_status, mark_pushed

        db, cid = db_with_candidate
        update_candidate_status(db, cid, "approved")
        candidate = mark_pushed(db, cid)
        assert candidate.status == "pushed"
        assert candidate.pushed_at is not None

    def test_mark_unblocked(self, db_with_candidate):
        """A pushed candidate can be moved to ``unblocked``."""
        from app.services.blocklist import update_candidate_status, mark_pushed, mark_unblocked

        db, cid = db_with_candidate
        update_candidate_status(db, cid, "approved")
        mark_pushed(db, cid)
        candidate = mark_unblocked(db, cid)
        assert candidate.status == "unblocked"