From b2011d18b673ee578a824d3581c15df888d68617 Mon Sep 17 00:00:00 2001 From: pyr0ball Date: Mon, 11 May 2026 17:08:35 -0700 Subject: [PATCH] =?UTF-8?q?feat:=20periodic=20corpus=20export=20=E2=80=94?= =?UTF-8?q?=20push=20ERROR/CRITICAL=20entries=20and=20incidents=20to=20Avo?= =?UTF-8?q?cet?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Watermark-based batch export script (scripts/export_corpus.py) pushes up to 500 ERROR/CRITICAL entries and labeled incidents per run to AVOCET_CORPUS_ENDPOINT. Uses SQLite rowid watermark (entry log) and ISO timestamp watermark (incidents). Skips silently when AVOCET_CORPUS_ENDPOINT is not set. 19 tests. Closes turnstone#6. --- scripts/export_corpus.py | 194 ++++++++++++++++++++++++++ tests/test_export_corpus.py | 263 ++++++++++++++++++++++++++++++++++++ 2 files changed, 457 insertions(+) create mode 100644 scripts/export_corpus.py create mode 100644 tests/test_export_corpus.py diff --git a/scripts/export_corpus.py b/scripts/export_corpus.py new file mode 100644 index 0000000..f8052d8 --- /dev/null +++ b/scripts/export_corpus.py @@ -0,0 +1,194 @@ +"""Export ERROR/CRITICAL log entries and labeled incidents to Avocet corpus endpoint. + +Run periodically alongside ingest_corpus.py (same cron schedule recommended). + +Watermarks are stored as plain text files next to the DB: + corpus_watermark.txt — last exported log_entries rowid + incident_watermark.txt — last exported incident created_at timestamp + +Required env vars: + AVOCET_CORPUS_ENDPOINT URL to POST batches to + AVOCET_CONSENT_TOKEN Per-node consent token (issued by CF) + +Optional env vars: + TURNSTONE_DB Path to turnstone.db (default: /data/turnstone.db) + TURNSTONE_SOURCE_HOST Node identifier (default: system hostname) + +Exit codes: + 0 — success (including no-op when endpoint not configured) + 1 — configuration error or failed POST +""" +from __future__ import annotations + +import json +import logging +import os +import socket +import sqlite3 +import sys +import uuid +from datetime import datetime, timezone +from pathlib import Path + +import httpx + +logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s") +log = logging.getLogger(__name__) + +sys.path.insert(0, str(Path(__file__).parent.parent)) + +BATCH_LIMIT = 500 +BATCH_VERSION = 1 + +DB_PATH = Path(os.environ.get("TURNSTONE_DB", "/data/turnstone.db")) +CORPUS_ENDPOINT = os.environ.get("AVOCET_CORPUS_ENDPOINT", "") +CONSENT_TOKEN = os.environ.get("AVOCET_CONSENT_TOKEN", "") +SOURCE_HOST = os.environ.get("TURNSTONE_SOURCE_HOST", socket.gethostname()) + + +def _watermark_path(db_path: Path, name: str) -> Path: + return db_path.parent / name + + +def read_rowid_watermark(db_path: Path) -> int: + path = _watermark_path(db_path, "corpus_watermark.txt") + if path.exists(): + try: + return int(path.read_text().strip()) + except (ValueError, OSError): + pass + return 0 + + +def write_rowid_watermark(db_path: Path, rowid: int) -> None: + _watermark_path(db_path, "corpus_watermark.txt").write_text(str(rowid)) + + +def read_ts_watermark(db_path: Path) -> str: + path = _watermark_path(db_path, "incident_watermark.txt") + if path.exists(): + return path.read_text().strip() or "1970-01-01T00:00:00" + return "1970-01-01T00:00:00" + + +def write_ts_watermark(db_path: Path, ts: str) -> None: + _watermark_path(db_path, "incident_watermark.txt").write_text(ts) + + +def post_batch(endpoint: str, token: str, payload: dict) -> None: + resp = httpx.post( + endpoint, + json=payload, + headers={"Authorization": f"Bearer {token}"}, + timeout=30.0, + ) + resp.raise_for_status() + + +def export_raw_entries(conn: sqlite3.Connection, db_path: Path, endpoint: str, token: str) -> int: + last_rowid = read_rowid_watermark(db_path) + rows = conn.execute( + "SELECT rowid, id, source_id, timestamp_iso, ingest_time, severity, matched_patterns, text " + "FROM log_entries " + "WHERE severity IN ('ERROR', 'CRITICAL') AND rowid > ? " + "ORDER BY rowid LIMIT ?", + (last_rowid, BATCH_LIMIT), + ).fetchall() + + if not rows: + log.info("No new ERROR/CRITICAL entries since rowid %d", last_rowid) + return 0 + + entries = [ + { + "entry_id": row["id"], + "source_id": row["source_id"], + "timestamp_iso": row["timestamp_iso"], + "ingest_time": row["ingest_time"], + "severity": row["severity"], + "matched_patterns": json.loads(row["matched_patterns"] or "[]"), + "text": row["text"], + } + for row in rows + ] + new_watermark = rows[-1]["rowid"] + + payload = { + "batch_version": BATCH_VERSION, + "batch_id": str(uuid.uuid4()), + "pushed_at": datetime.now(timezone.utc).isoformat(), + "source_host": SOURCE_HOST, + "batch_type": "raw_entries", + "watermark_from": last_rowid, + "watermark_to": new_watermark, + "entries": entries, + } + + post_batch(endpoint, token, payload) + write_rowid_watermark(db_path, new_watermark) + log.info("Exported %d entries (rowid %d → %d)", len(rows), last_rowid, new_watermark) + return len(rows) + + +def export_incidents(conn: sqlite3.Connection, db_path: Path, endpoint: str, token: str) -> int: + last_ts = read_ts_watermark(db_path) + rows = conn.execute( + "SELECT id, label, issue_type, started_at, ended_at, notes, created_at, severity " + "FROM incidents WHERE created_at > ? ORDER BY created_at LIMIT ?", + (last_ts, BATCH_LIMIT), + ).fetchall() + + if not rows: + log.info("No new incidents since %s", last_ts) + return 0 + + incidents = [dict(row) for row in rows] + new_watermark = rows[-1]["created_at"] + + payload = { + "batch_version": BATCH_VERSION, + "batch_id": str(uuid.uuid4()), + "pushed_at": datetime.now(timezone.utc).isoformat(), + "source_host": SOURCE_HOST, + "batch_type": "incident_bundles", + "watermark_from": last_ts, + "watermark_to": new_watermark, + "entries": incidents, + } + + post_batch(endpoint, token, payload) + write_ts_watermark(db_path, new_watermark) + log.info("Exported %d incidents (up to %s)", len(rows), new_watermark) + return len(rows) + + +def main() -> int: + if not CORPUS_ENDPOINT: + log.info("AVOCET_CORPUS_ENDPOINT not set — skipping corpus export") + return 0 + if not CONSENT_TOKEN: + log.error("AVOCET_CONSENT_TOKEN not set") + return 1 + if not DB_PATH.exists(): + log.error("DB not found: %s", DB_PATH) + return 1 + + conn = sqlite3.connect(str(DB_PATH)) + conn.row_factory = sqlite3.Row + try: + entry_count = export_raw_entries(conn, DB_PATH, CORPUS_ENDPOINT, CONSENT_TOKEN) + incident_count = export_incidents(conn, DB_PATH, CORPUS_ENDPOINT, CONSENT_TOKEN) + log.info("Done — %d entries, %d incidents exported", entry_count, incident_count) + return 0 + except httpx.HTTPStatusError as exc: + log.error("HTTP %s from Avocet: %s", exc.response.status_code, exc.response.text[:200]) + return 1 + except Exception as exc: + log.error("Export failed: %s", exc) + return 1 + finally: + conn.close() + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/tests/test_export_corpus.py b/tests/test_export_corpus.py new file mode 100644 index 0000000..1b1c1be --- /dev/null +++ b/tests/test_export_corpus.py @@ -0,0 +1,263 @@ +"""Tests for scripts/export_corpus.py""" +from __future__ import annotations + +import json +import sqlite3 +import sys +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest + +sys.path.insert(0, str(Path(__file__).parent.parent)) +sys.path.insert(0, str(Path(__file__).parent.parent / "scripts")) + +import export_corpus as ec + + +# ── Helpers ──────────────────────────────────────────────────────────────────── + +def _make_db(tmp_path: Path) -> sqlite3.Connection: + db = tmp_path / "turnstone.db" + conn = sqlite3.connect(str(db)) + conn.row_factory = sqlite3.Row + conn.execute(""" + CREATE TABLE log_entries ( + id TEXT PRIMARY KEY, source_id TEXT, sequence INTEGER, + timestamp_raw TEXT, timestamp_iso TEXT, ingest_time TEXT, + severity TEXT, repeat_count INTEGER DEFAULT 1, + out_of_order INTEGER DEFAULT 0, + matched_patterns TEXT DEFAULT '[]', text TEXT + ) + """) + conn.execute(""" + CREATE TABLE incidents ( + id TEXT PRIMARY KEY, label TEXT, issue_type TEXT, + started_at TEXT, ended_at TEXT, notes TEXT, + created_at TEXT, severity TEXT DEFAULT 'medium' + ) + """) + conn.commit() + return conn + + +def _insert_entry(conn: sqlite3.Connection, rowid_hint: str, severity: str = "ERROR") -> None: + conn.execute( + "INSERT INTO log_entries VALUES (?,?,?,?,?,?,?,?,?,?,?)", + (rowid_hint, "sonarr", 1, None, "2026-05-11T10:00:00", "2026-05-11T10:00:05", + severity, 1, 0, '[]', f"Log line for {rowid_hint}"), + ) + conn.commit() + + +def _insert_incident(conn: sqlite3.Connection, created_at: str) -> None: + conn.execute( + "INSERT INTO incidents VALUES (?,?,?,?,?,?,?,?)", + (f"inc-{created_at}", "plex crash", "plex_crash", "2026-05-11T09:55:00", + "2026-05-11T10:05:00", "audio stopped", created_at, "high"), + ) + conn.commit() + + +# ── Watermark helpers ────────────────────────────────────────────────────────── + +def test_read_rowid_watermark_missing(tmp_path): + db = tmp_path / "t.db" + assert ec.read_rowid_watermark(db) == 0 + + +def test_read_rowid_watermark_valid(tmp_path): + db = tmp_path / "t.db" + (tmp_path / "corpus_watermark.txt").write_text("42") + assert ec.read_rowid_watermark(db) == 42 + + +def test_read_rowid_watermark_corrupt(tmp_path): + db = tmp_path / "t.db" + (tmp_path / "corpus_watermark.txt").write_text("not-a-number") + assert ec.read_rowid_watermark(db) == 0 + + +def test_write_read_rowid_roundtrip(tmp_path): + db = tmp_path / "t.db" + ec.write_rowid_watermark(db, 99) + assert ec.read_rowid_watermark(db) == 99 + + +def test_read_ts_watermark_missing(tmp_path): + db = tmp_path / "t.db" + assert ec.read_ts_watermark(db) == "1970-01-01T00:00:00" + + +def test_write_read_ts_roundtrip(tmp_path): + db = tmp_path / "t.db" + ec.write_ts_watermark(db, "2026-05-11T12:00:00") + assert ec.read_ts_watermark(db) == "2026-05-11T12:00:00" + + +# ── export_raw_entries ───────────────────────────────────────────────────────── + +def test_export_raw_entries_empty(tmp_path): + conn = _make_db(tmp_path) + db = tmp_path / "turnstone.db" + posted = [] + + with patch.object(ec, "post_batch", side_effect=lambda *a, **kw: posted.append(a[2])): + count = ec.export_raw_entries(conn, db, "http://avocet/", "tok") + + assert count == 0 + assert posted == [] + + +def test_export_raw_entries_skips_warn(tmp_path): + conn = _make_db(tmp_path) + db = tmp_path / "turnstone.db" + _insert_entry(conn, "w1", severity="WARN") + posted = [] + + with patch.object(ec, "post_batch", side_effect=lambda *a, **kw: posted.append(a[2])): + count = ec.export_raw_entries(conn, db, "http://avocet/", "tok") + + assert count == 0 + assert posted == [] + + +def test_export_raw_entries_sends_errors(tmp_path): + conn = _make_db(tmp_path) + db = tmp_path / "turnstone.db" + _insert_entry(conn, "e1", severity="ERROR") + _insert_entry(conn, "e2", severity="CRITICAL") + posted = [] + + with patch.object(ec, "post_batch", side_effect=lambda *a, **kw: posted.append(a[2])): + count = ec.export_raw_entries(conn, db, "http://avocet/", "tok") + + assert count == 2 + assert len(posted) == 1 + payload = posted[0] + assert payload["batch_type"] == "raw_entries" + assert len(payload["entries"]) == 2 + assert all(e["severity"] in ("ERROR", "CRITICAL") for e in payload["entries"]) + + +def test_export_raw_entries_updates_watermark(tmp_path): + conn = _make_db(tmp_path) + db = tmp_path / "turnstone.db" + _insert_entry(conn, "e1", severity="ERROR") + + with patch.object(ec, "post_batch"): + ec.export_raw_entries(conn, db, "http://avocet/", "tok") + + assert ec.read_rowid_watermark(db) > 0 + + +def test_export_raw_entries_respects_watermark(tmp_path): + conn = _make_db(tmp_path) + db = tmp_path / "turnstone.db" + _insert_entry(conn, "e1", severity="ERROR") + + # set watermark past first entry + with patch.object(ec, "post_batch"): + ec.export_raw_entries(conn, db, "http://avocet/", "tok") + + # insert a second entry and export again + _insert_entry(conn, "e2", severity="ERROR") + posted = [] + with patch.object(ec, "post_batch", side_effect=lambda *a, **kw: posted.append(a[2])): + count = ec.export_raw_entries(conn, db, "http://avocet/", "tok") + + assert count == 1 + assert len(posted[0]["entries"]) == 1 + assert posted[0]["entries"][0]["entry_id"] == "e2" + + +def test_export_raw_entries_no_watermark_update_on_failure(tmp_path): + conn = _make_db(tmp_path) + db = tmp_path / "turnstone.db" + _insert_entry(conn, "e1", severity="ERROR") + + import httpx as _httpx + mock_resp = MagicMock() + mock_resp.status_code = 403 + mock_resp.text = "Forbidden" + exc = _httpx.HTTPStatusError("403", request=MagicMock(), response=mock_resp) + + with patch.object(ec, "post_batch", side_effect=exc): + with pytest.raises(_httpx.HTTPStatusError): + ec.export_raw_entries(conn, db, "http://avocet/", "tok") + + assert ec.read_rowid_watermark(db) == 0 + + +# ── export_incidents ─────────────────────────────────────────────────────────── + +def test_export_incidents_empty(tmp_path): + conn = _make_db(tmp_path) + db = tmp_path / "turnstone.db" + posted = [] + + with patch.object(ec, "post_batch", side_effect=lambda *a, **kw: posted.append(a[2])): + count = ec.export_incidents(conn, db, "http://avocet/", "tok") + + assert count == 0 + assert posted == [] + + +def test_export_incidents_sends_payload(tmp_path): + conn = _make_db(tmp_path) + db = tmp_path / "turnstone.db" + _insert_incident(conn, "2026-05-11T10:00:00") + posted = [] + + with patch.object(ec, "post_batch", side_effect=lambda *a, **kw: posted.append(a[2])): + count = ec.export_incidents(conn, db, "http://avocet/", "tok") + + assert count == 1 + payload = posted[0] + assert payload["batch_type"] == "incident_bundles" + assert payload["entries"][0]["label"] == "plex crash" + + +def test_export_incidents_updates_watermark(tmp_path): + conn = _make_db(tmp_path) + db = tmp_path / "turnstone.db" + _insert_incident(conn, "2026-05-11T10:00:00") + + with patch.object(ec, "post_batch"): + ec.export_incidents(conn, db, "http://avocet/", "tok") + + assert ec.read_ts_watermark(db) == "2026-05-11T10:00:00" + + +# ── main ─────────────────────────────────────────────────────────────────────── + +def test_main_no_endpoint(monkeypatch): + monkeypatch.setattr(ec, "CORPUS_ENDPOINT", "") + assert ec.main() == 0 + + +def test_main_no_token(monkeypatch): + monkeypatch.setattr(ec, "CORPUS_ENDPOINT", "http://avocet/") + monkeypatch.setattr(ec, "CONSENT_TOKEN", "") + assert ec.main() == 1 + + +def test_main_db_missing(monkeypatch, tmp_path): + monkeypatch.setattr(ec, "CORPUS_ENDPOINT", "http://avocet/") + monkeypatch.setattr(ec, "CONSENT_TOKEN", "tok") + monkeypatch.setattr(ec, "DB_PATH", tmp_path / "missing.db") + assert ec.main() == 1 + + +def test_main_success(monkeypatch, tmp_path): + conn = _make_db(tmp_path) + conn.close() + db = tmp_path / "turnstone.db" + monkeypatch.setattr(ec, "CORPUS_ENDPOINT", "http://avocet/") + monkeypatch.setattr(ec, "CONSENT_TOKEN", "tok") + monkeypatch.setattr(ec, "DB_PATH", db) + + with patch.object(ec, "post_batch"): + result = ec.main() + + assert result == 0