"""Tests for scripts/export_corpus.py""" from __future__ import annotations import json import sqlite3 import sys from pathlib import Path from unittest.mock import MagicMock, patch import pytest sys.path.insert(0, str(Path(__file__).parent.parent)) sys.path.insert(0, str(Path(__file__).parent.parent / "scripts")) import export_corpus as ec # ── Helpers ──────────────────────────────────────────────────────────────────── def _make_db(tmp_path: Path) -> sqlite3.Connection: db = tmp_path / "turnstone.db" conn = sqlite3.connect(str(db)) conn.row_factory = sqlite3.Row conn.execute(""" CREATE TABLE log_entries ( id TEXT PRIMARY KEY, source_id TEXT, sequence INTEGER, timestamp_raw TEXT, timestamp_iso TEXT, ingest_time TEXT, severity TEXT, repeat_count INTEGER DEFAULT 1, out_of_order INTEGER DEFAULT 0, matched_patterns TEXT DEFAULT '[]', text TEXT ) """) conn.execute(""" CREATE TABLE incidents ( id TEXT PRIMARY KEY, label TEXT, issue_type TEXT, started_at TEXT, ended_at TEXT, notes TEXT, created_at TEXT, severity TEXT DEFAULT 'medium' ) """) conn.commit() return conn def _insert_entry(conn: sqlite3.Connection, rowid_hint: str, severity: str = "ERROR") -> None: conn.execute( "INSERT INTO log_entries VALUES (?,?,?,?,?,?,?,?,?,?,?)", (rowid_hint, "sonarr", 1, None, "2026-05-11T10:00:00", "2026-05-11T10:00:05", severity, 1, 0, '[]', f"Log line for {rowid_hint}"), ) conn.commit() def _insert_incident(conn: sqlite3.Connection, created_at: str) -> None: conn.execute( "INSERT INTO incidents VALUES (?,?,?,?,?,?,?,?)", (f"inc-{created_at}", "plex crash", "plex_crash", "2026-05-11T09:55:00", "2026-05-11T10:05:00", "audio stopped", created_at, "high"), ) conn.commit() # ── Watermark helpers ────────────────────────────────────────────────────────── def test_read_rowid_watermark_missing(tmp_path): db = tmp_path / "t.db" assert ec.read_rowid_watermark(db) == 0 def test_read_rowid_watermark_valid(tmp_path): db = tmp_path / "t.db" (tmp_path / "corpus_watermark.txt").write_text("42") assert ec.read_rowid_watermark(db) == 42 def test_read_rowid_watermark_corrupt(tmp_path): db = tmp_path / "t.db" (tmp_path / "corpus_watermark.txt").write_text("not-a-number") assert ec.read_rowid_watermark(db) == 0 def test_write_read_rowid_roundtrip(tmp_path): db = tmp_path / "t.db" ec.write_rowid_watermark(db, 99) assert ec.read_rowid_watermark(db) == 99 def test_read_ts_watermark_missing(tmp_path): db = tmp_path / "t.db" assert ec.read_ts_watermark(db) == "1970-01-01T00:00:00" def test_write_read_ts_roundtrip(tmp_path): db = tmp_path / "t.db" ec.write_ts_watermark(db, "2026-05-11T12:00:00") assert ec.read_ts_watermark(db) == "2026-05-11T12:00:00" # ── export_raw_entries ───────────────────────────────────────────────────────── def test_export_raw_entries_empty(tmp_path): conn = _make_db(tmp_path) db = tmp_path / "turnstone.db" posted = [] with patch.object(ec, "post_batch", side_effect=lambda *a, **kw: posted.append(a[2])): count = ec.export_raw_entries(conn, db, "http://avocet/", "tok") assert count == 0 assert posted == [] def test_export_raw_entries_skips_warn(tmp_path): conn = _make_db(tmp_path) db = tmp_path / "turnstone.db" _insert_entry(conn, "w1", severity="WARN") posted = [] with patch.object(ec, "post_batch", side_effect=lambda *a, **kw: posted.append(a[2])): count = ec.export_raw_entries(conn, db, "http://avocet/", "tok") assert count == 0 assert posted == [] def test_export_raw_entries_sends_errors(tmp_path): conn = _make_db(tmp_path) db = tmp_path / "turnstone.db" _insert_entry(conn, "e1", severity="ERROR") _insert_entry(conn, "e2", severity="CRITICAL") posted = [] with patch.object(ec, "post_batch", side_effect=lambda *a, **kw: posted.append(a[2])): count = ec.export_raw_entries(conn, db, "http://avocet/", "tok") assert count == 2 assert len(posted) == 1 payload = posted[0] assert payload["batch_type"] == "raw_entries" assert len(payload["entries"]) == 2 assert all(e["severity"] in ("ERROR", "CRITICAL") for e in payload["entries"]) def test_export_raw_entries_updates_watermark(tmp_path): conn = _make_db(tmp_path) db = tmp_path / "turnstone.db" _insert_entry(conn, "e1", severity="ERROR") with patch.object(ec, "post_batch"): ec.export_raw_entries(conn, db, "http://avocet/", "tok") assert ec.read_rowid_watermark(db) > 0 def test_export_raw_entries_respects_watermark(tmp_path): conn = _make_db(tmp_path) db = tmp_path / "turnstone.db" _insert_entry(conn, "e1", severity="ERROR") # set watermark past first entry with patch.object(ec, "post_batch"): ec.export_raw_entries(conn, db, "http://avocet/", "tok") # insert a second entry and export again _insert_entry(conn, "e2", severity="ERROR") posted = [] with patch.object(ec, "post_batch", side_effect=lambda *a, **kw: posted.append(a[2])): count = ec.export_raw_entries(conn, db, "http://avocet/", "tok") assert count == 1 assert len(posted[0]["entries"]) == 1 assert posted[0]["entries"][0]["entry_id"] == "e2" def test_export_raw_entries_no_watermark_update_on_failure(tmp_path): conn = _make_db(tmp_path) db = tmp_path / "turnstone.db" _insert_entry(conn, "e1", severity="ERROR") import httpx as _httpx mock_resp = MagicMock() mock_resp.status_code = 403 mock_resp.text = "Forbidden" exc = _httpx.HTTPStatusError("403", request=MagicMock(), response=mock_resp) with patch.object(ec, "post_batch", side_effect=exc): with pytest.raises(_httpx.HTTPStatusError): ec.export_raw_entries(conn, db, "http://avocet/", "tok") assert ec.read_rowid_watermark(db) == 0 # ── export_incidents ─────────────────────────────────────────────────────────── def test_export_incidents_empty(tmp_path): conn = _make_db(tmp_path) db = tmp_path / "turnstone.db" posted = [] with patch.object(ec, "post_batch", side_effect=lambda *a, **kw: posted.append(a[2])): count = ec.export_incidents(conn, db, "http://avocet/", "tok") assert count == 0 assert posted == [] def test_export_incidents_sends_payload(tmp_path): conn = _make_db(tmp_path) db = tmp_path / "turnstone.db" _insert_incident(conn, "2026-05-11T10:00:00") posted = [] with patch.object(ec, "post_batch", side_effect=lambda *a, **kw: posted.append(a[2])): count = ec.export_incidents(conn, db, "http://avocet/", "tok") assert count == 1 payload = posted[0] assert payload["batch_type"] == "incident_bundles" assert payload["entries"][0]["label"] == "plex crash" def test_export_incidents_updates_watermark(tmp_path): conn = _make_db(tmp_path) db = tmp_path / "turnstone.db" _insert_incident(conn, "2026-05-11T10:00:00") with patch.object(ec, "post_batch"): ec.export_incidents(conn, db, "http://avocet/", "tok") assert ec.read_ts_watermark(db) == "2026-05-11T10:00:00" # ── main ─────────────────────────────────────────────────────────────────────── def test_main_no_endpoint(monkeypatch): monkeypatch.setattr(ec, "CORPUS_ENDPOINT", "") assert ec.main() == 0 def test_main_no_token(monkeypatch): monkeypatch.setattr(ec, "CORPUS_ENDPOINT", "http://avocet/") monkeypatch.setattr(ec, "CONSENT_TOKEN", "") assert ec.main() == 1 def test_main_db_missing(monkeypatch, tmp_path): monkeypatch.setattr(ec, "CORPUS_ENDPOINT", "http://avocet/") monkeypatch.setattr(ec, "CONSENT_TOKEN", "tok") monkeypatch.setattr(ec, "DB_PATH", tmp_path / "missing.db") assert ec.main() == 1 def test_main_success(monkeypatch, tmp_path): conn = _make_db(tmp_path) conn.close() db = tmp_path / "turnstone.db" monkeypatch.setattr(ec, "CORPUS_ENDPOINT", "http://avocet/") monkeypatch.setattr(ec, "CONSENT_TOKEN", "tok") monkeypatch.setattr(ec, "DB_PATH", db) with patch.object(ec, "post_batch"): result = ec.main() assert result == 0