feat: periodic corpus export — push ERROR/CRITICAL entries and incidents to Avocet

Watermark-based batch export script (scripts/export_corpus.py) pushes up to 500
ERROR/CRITICAL entries and labeled incidents per run to AVOCET_CORPUS_ENDPOINT.
Uses SQLite rowid watermark (entry log) and ISO timestamp watermark (incidents).
Skips silently when AVOCET_CORPUS_ENDPOINT is not set. 19 tests. Closes turnstone#6.
This commit is contained in:
pyr0ball 2026-05-11 17:08:35 -07:00
parent 02866e6882
commit b2011d18b6
2 changed files with 457 additions and 0 deletions

194
scripts/export_corpus.py Normal file
View file

@ -0,0 +1,194 @@
"""Export ERROR/CRITICAL log entries and labeled incidents to Avocet corpus endpoint.
Run periodically alongside ingest_corpus.py (same cron schedule recommended).
Watermarks are stored as plain text files next to the DB:
corpus_watermark.txt last exported log_entries rowid
incident_watermark.txt last exported incident created_at timestamp
Required env vars:
AVOCET_CORPUS_ENDPOINT URL to POST batches to
AVOCET_CONSENT_TOKEN Per-node consent token (issued by CF)
Optional env vars:
TURNSTONE_DB Path to turnstone.db (default: /data/turnstone.db)
TURNSTONE_SOURCE_HOST Node identifier (default: system hostname)
Exit codes:
0 success (including no-op when endpoint not configured)
1 configuration error or failed POST
"""
from __future__ import annotations
import json
import logging
import os
import socket
import sqlite3
import sys
import uuid
from datetime import datetime, timezone
from pathlib import Path
import httpx
logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s")
log = logging.getLogger(__name__)
sys.path.insert(0, str(Path(__file__).parent.parent))
BATCH_LIMIT = 500
BATCH_VERSION = 1
DB_PATH = Path(os.environ.get("TURNSTONE_DB", "/data/turnstone.db"))
CORPUS_ENDPOINT = os.environ.get("AVOCET_CORPUS_ENDPOINT", "")
CONSENT_TOKEN = os.environ.get("AVOCET_CONSENT_TOKEN", "")
SOURCE_HOST = os.environ.get("TURNSTONE_SOURCE_HOST", socket.gethostname())
def _watermark_path(db_path: Path, name: str) -> Path:
return db_path.parent / name
def read_rowid_watermark(db_path: Path) -> int:
path = _watermark_path(db_path, "corpus_watermark.txt")
if path.exists():
try:
return int(path.read_text().strip())
except (ValueError, OSError):
pass
return 0
def write_rowid_watermark(db_path: Path, rowid: int) -> None:
_watermark_path(db_path, "corpus_watermark.txt").write_text(str(rowid))
def read_ts_watermark(db_path: Path) -> str:
path = _watermark_path(db_path, "incident_watermark.txt")
if path.exists():
return path.read_text().strip() or "1970-01-01T00:00:00"
return "1970-01-01T00:00:00"
def write_ts_watermark(db_path: Path, ts: str) -> None:
_watermark_path(db_path, "incident_watermark.txt").write_text(ts)
def post_batch(endpoint: str, token: str, payload: dict) -> None:
resp = httpx.post(
endpoint,
json=payload,
headers={"Authorization": f"Bearer {token}"},
timeout=30.0,
)
resp.raise_for_status()
def export_raw_entries(conn: sqlite3.Connection, db_path: Path, endpoint: str, token: str) -> int:
last_rowid = read_rowid_watermark(db_path)
rows = conn.execute(
"SELECT rowid, id, source_id, timestamp_iso, ingest_time, severity, matched_patterns, text "
"FROM log_entries "
"WHERE severity IN ('ERROR', 'CRITICAL') AND rowid > ? "
"ORDER BY rowid LIMIT ?",
(last_rowid, BATCH_LIMIT),
).fetchall()
if not rows:
log.info("No new ERROR/CRITICAL entries since rowid %d", last_rowid)
return 0
entries = [
{
"entry_id": row["id"],
"source_id": row["source_id"],
"timestamp_iso": row["timestamp_iso"],
"ingest_time": row["ingest_time"],
"severity": row["severity"],
"matched_patterns": json.loads(row["matched_patterns"] or "[]"),
"text": row["text"],
}
for row in rows
]
new_watermark = rows[-1]["rowid"]
payload = {
"batch_version": BATCH_VERSION,
"batch_id": str(uuid.uuid4()),
"pushed_at": datetime.now(timezone.utc).isoformat(),
"source_host": SOURCE_HOST,
"batch_type": "raw_entries",
"watermark_from": last_rowid,
"watermark_to": new_watermark,
"entries": entries,
}
post_batch(endpoint, token, payload)
write_rowid_watermark(db_path, new_watermark)
log.info("Exported %d entries (rowid %d%d)", len(rows), last_rowid, new_watermark)
return len(rows)
def export_incidents(conn: sqlite3.Connection, db_path: Path, endpoint: str, token: str) -> int:
last_ts = read_ts_watermark(db_path)
rows = conn.execute(
"SELECT id, label, issue_type, started_at, ended_at, notes, created_at, severity "
"FROM incidents WHERE created_at > ? ORDER BY created_at LIMIT ?",
(last_ts, BATCH_LIMIT),
).fetchall()
if not rows:
log.info("No new incidents since %s", last_ts)
return 0
incidents = [dict(row) for row in rows]
new_watermark = rows[-1]["created_at"]
payload = {
"batch_version": BATCH_VERSION,
"batch_id": str(uuid.uuid4()),
"pushed_at": datetime.now(timezone.utc).isoformat(),
"source_host": SOURCE_HOST,
"batch_type": "incident_bundles",
"watermark_from": last_ts,
"watermark_to": new_watermark,
"entries": incidents,
}
post_batch(endpoint, token, payload)
write_ts_watermark(db_path, new_watermark)
log.info("Exported %d incidents (up to %s)", len(rows), new_watermark)
return len(rows)
def main() -> int:
if not CORPUS_ENDPOINT:
log.info("AVOCET_CORPUS_ENDPOINT not set — skipping corpus export")
return 0
if not CONSENT_TOKEN:
log.error("AVOCET_CONSENT_TOKEN not set")
return 1
if not DB_PATH.exists():
log.error("DB not found: %s", DB_PATH)
return 1
conn = sqlite3.connect(str(DB_PATH))
conn.row_factory = sqlite3.Row
try:
entry_count = export_raw_entries(conn, DB_PATH, CORPUS_ENDPOINT, CONSENT_TOKEN)
incident_count = export_incidents(conn, DB_PATH, CORPUS_ENDPOINT, CONSENT_TOKEN)
log.info("Done — %d entries, %d incidents exported", entry_count, incident_count)
return 0
except httpx.HTTPStatusError as exc:
log.error("HTTP %s from Avocet: %s", exc.response.status_code, exc.response.text[:200])
return 1
except Exception as exc:
log.error("Export failed: %s", exc)
return 1
finally:
conn.close()
if __name__ == "__main__":
sys.exit(main())

263
tests/test_export_corpus.py Normal file
View file

@ -0,0 +1,263 @@
"""Tests for scripts/export_corpus.py"""
from __future__ import annotations
import json
import sqlite3
import sys
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
sys.path.insert(0, str(Path(__file__).parent.parent))
sys.path.insert(0, str(Path(__file__).parent.parent / "scripts"))
import export_corpus as ec
# ── Helpers ────────────────────────────────────────────────────────────────────
def _make_db(tmp_path: Path) -> sqlite3.Connection:
db = tmp_path / "turnstone.db"
conn = sqlite3.connect(str(db))
conn.row_factory = sqlite3.Row
conn.execute("""
CREATE TABLE log_entries (
id TEXT PRIMARY KEY, source_id TEXT, sequence INTEGER,
timestamp_raw TEXT, timestamp_iso TEXT, ingest_time TEXT,
severity TEXT, repeat_count INTEGER DEFAULT 1,
out_of_order INTEGER DEFAULT 0,
matched_patterns TEXT DEFAULT '[]', text TEXT
)
""")
conn.execute("""
CREATE TABLE incidents (
id TEXT PRIMARY KEY, label TEXT, issue_type TEXT,
started_at TEXT, ended_at TEXT, notes TEXT,
created_at TEXT, severity TEXT DEFAULT 'medium'
)
""")
conn.commit()
return conn
def _insert_entry(conn: sqlite3.Connection, rowid_hint: str, severity: str = "ERROR") -> None:
conn.execute(
"INSERT INTO log_entries VALUES (?,?,?,?,?,?,?,?,?,?,?)",
(rowid_hint, "sonarr", 1, None, "2026-05-11T10:00:00", "2026-05-11T10:00:05",
severity, 1, 0, '[]', f"Log line for {rowid_hint}"),
)
conn.commit()
def _insert_incident(conn: sqlite3.Connection, created_at: str) -> None:
conn.execute(
"INSERT INTO incidents VALUES (?,?,?,?,?,?,?,?)",
(f"inc-{created_at}", "plex crash", "plex_crash", "2026-05-11T09:55:00",
"2026-05-11T10:05:00", "audio stopped", created_at, "high"),
)
conn.commit()
# ── Watermark helpers ──────────────────────────────────────────────────────────
def test_read_rowid_watermark_missing(tmp_path):
db = tmp_path / "t.db"
assert ec.read_rowid_watermark(db) == 0
def test_read_rowid_watermark_valid(tmp_path):
db = tmp_path / "t.db"
(tmp_path / "corpus_watermark.txt").write_text("42")
assert ec.read_rowid_watermark(db) == 42
def test_read_rowid_watermark_corrupt(tmp_path):
db = tmp_path / "t.db"
(tmp_path / "corpus_watermark.txt").write_text("not-a-number")
assert ec.read_rowid_watermark(db) == 0
def test_write_read_rowid_roundtrip(tmp_path):
db = tmp_path / "t.db"
ec.write_rowid_watermark(db, 99)
assert ec.read_rowid_watermark(db) == 99
def test_read_ts_watermark_missing(tmp_path):
db = tmp_path / "t.db"
assert ec.read_ts_watermark(db) == "1970-01-01T00:00:00"
def test_write_read_ts_roundtrip(tmp_path):
db = tmp_path / "t.db"
ec.write_ts_watermark(db, "2026-05-11T12:00:00")
assert ec.read_ts_watermark(db) == "2026-05-11T12:00:00"
# ── export_raw_entries ─────────────────────────────────────────────────────────
def test_export_raw_entries_empty(tmp_path):
conn = _make_db(tmp_path)
db = tmp_path / "turnstone.db"
posted = []
with patch.object(ec, "post_batch", side_effect=lambda *a, **kw: posted.append(a[2])):
count = ec.export_raw_entries(conn, db, "http://avocet/", "tok")
assert count == 0
assert posted == []
def test_export_raw_entries_skips_warn(tmp_path):
conn = _make_db(tmp_path)
db = tmp_path / "turnstone.db"
_insert_entry(conn, "w1", severity="WARN")
posted = []
with patch.object(ec, "post_batch", side_effect=lambda *a, **kw: posted.append(a[2])):
count = ec.export_raw_entries(conn, db, "http://avocet/", "tok")
assert count == 0
assert posted == []
def test_export_raw_entries_sends_errors(tmp_path):
conn = _make_db(tmp_path)
db = tmp_path / "turnstone.db"
_insert_entry(conn, "e1", severity="ERROR")
_insert_entry(conn, "e2", severity="CRITICAL")
posted = []
with patch.object(ec, "post_batch", side_effect=lambda *a, **kw: posted.append(a[2])):
count = ec.export_raw_entries(conn, db, "http://avocet/", "tok")
assert count == 2
assert len(posted) == 1
payload = posted[0]
assert payload["batch_type"] == "raw_entries"
assert len(payload["entries"]) == 2
assert all(e["severity"] in ("ERROR", "CRITICAL") for e in payload["entries"])
def test_export_raw_entries_updates_watermark(tmp_path):
conn = _make_db(tmp_path)
db = tmp_path / "turnstone.db"
_insert_entry(conn, "e1", severity="ERROR")
with patch.object(ec, "post_batch"):
ec.export_raw_entries(conn, db, "http://avocet/", "tok")
assert ec.read_rowid_watermark(db) > 0
def test_export_raw_entries_respects_watermark(tmp_path):
conn = _make_db(tmp_path)
db = tmp_path / "turnstone.db"
_insert_entry(conn, "e1", severity="ERROR")
# set watermark past first entry
with patch.object(ec, "post_batch"):
ec.export_raw_entries(conn, db, "http://avocet/", "tok")
# insert a second entry and export again
_insert_entry(conn, "e2", severity="ERROR")
posted = []
with patch.object(ec, "post_batch", side_effect=lambda *a, **kw: posted.append(a[2])):
count = ec.export_raw_entries(conn, db, "http://avocet/", "tok")
assert count == 1
assert len(posted[0]["entries"]) == 1
assert posted[0]["entries"][0]["entry_id"] == "e2"
def test_export_raw_entries_no_watermark_update_on_failure(tmp_path):
conn = _make_db(tmp_path)
db = tmp_path / "turnstone.db"
_insert_entry(conn, "e1", severity="ERROR")
import httpx as _httpx
mock_resp = MagicMock()
mock_resp.status_code = 403
mock_resp.text = "Forbidden"
exc = _httpx.HTTPStatusError("403", request=MagicMock(), response=mock_resp)
with patch.object(ec, "post_batch", side_effect=exc):
with pytest.raises(_httpx.HTTPStatusError):
ec.export_raw_entries(conn, db, "http://avocet/", "tok")
assert ec.read_rowid_watermark(db) == 0
# ── export_incidents ───────────────────────────────────────────────────────────
def test_export_incidents_empty(tmp_path):
conn = _make_db(tmp_path)
db = tmp_path / "turnstone.db"
posted = []
with patch.object(ec, "post_batch", side_effect=lambda *a, **kw: posted.append(a[2])):
count = ec.export_incidents(conn, db, "http://avocet/", "tok")
assert count == 0
assert posted == []
def test_export_incidents_sends_payload(tmp_path):
conn = _make_db(tmp_path)
db = tmp_path / "turnstone.db"
_insert_incident(conn, "2026-05-11T10:00:00")
posted = []
with patch.object(ec, "post_batch", side_effect=lambda *a, **kw: posted.append(a[2])):
count = ec.export_incidents(conn, db, "http://avocet/", "tok")
assert count == 1
payload = posted[0]
assert payload["batch_type"] == "incident_bundles"
assert payload["entries"][0]["label"] == "plex crash"
def test_export_incidents_updates_watermark(tmp_path):
conn = _make_db(tmp_path)
db = tmp_path / "turnstone.db"
_insert_incident(conn, "2026-05-11T10:00:00")
with patch.object(ec, "post_batch"):
ec.export_incidents(conn, db, "http://avocet/", "tok")
assert ec.read_ts_watermark(db) == "2026-05-11T10:00:00"
# ── main ───────────────────────────────────────────────────────────────────────
def test_main_no_endpoint(monkeypatch):
monkeypatch.setattr(ec, "CORPUS_ENDPOINT", "")
assert ec.main() == 0
def test_main_no_token(monkeypatch):
monkeypatch.setattr(ec, "CORPUS_ENDPOINT", "http://avocet/")
monkeypatch.setattr(ec, "CONSENT_TOKEN", "")
assert ec.main() == 1
def test_main_db_missing(monkeypatch, tmp_path):
monkeypatch.setattr(ec, "CORPUS_ENDPOINT", "http://avocet/")
monkeypatch.setattr(ec, "CONSENT_TOKEN", "tok")
monkeypatch.setattr(ec, "DB_PATH", tmp_path / "missing.db")
assert ec.main() == 1
def test_main_success(monkeypatch, tmp_path):
conn = _make_db(tmp_path)
conn.close()
db = tmp_path / "turnstone.db"
monkeypatch.setattr(ec, "CORPUS_ENDPOINT", "http://avocet/")
monkeypatch.setattr(ec, "CONSENT_TOKEN", "tok")
monkeypatch.setattr(ec, "DB_PATH", db)
with patch.object(ec, "post_batch"):
result = ec.main()
assert result == 0