turnstone/scripts/export_corpus.py
pyr0ball afcac6ff05 feat: periodic corpus export — push ERROR/CRITICAL entries and incidents to Avocet
Watermark-based batch export script (scripts/export_corpus.py) pushes up to 500
ERROR/CRITICAL entries and labeled incidents per run to AVOCET_CORPUS_ENDPOINT.
Uses SQLite rowid watermark (entry log) and ISO timestamp watermark (incidents).
Skips silently when AVOCET_CORPUS_ENDPOINT is not set. 19 tests. Closes turnstone#6.
2026-05-11 17:08:35 -07:00

194 lines
6.1 KiB
Python

"""Export ERROR/CRITICAL log entries and labeled incidents to Avocet corpus endpoint.
Run periodically alongside ingest_corpus.py (same cron schedule recommended).
Watermarks are stored as plain text files next to the DB:
corpus_watermark.txt — last exported log_entries rowid
incident_watermark.txt — last exported incident created_at timestamp
Required env vars:
AVOCET_CORPUS_ENDPOINT URL to POST batches to
AVOCET_CONSENT_TOKEN Per-node consent token (issued by CF)
Optional env vars:
TURNSTONE_DB Path to turnstone.db (default: /data/turnstone.db)
TURNSTONE_SOURCE_HOST Node identifier (default: system hostname)
Exit codes:
0 — success (including no-op when endpoint not configured)
1 — configuration error or failed POST
"""
from __future__ import annotations
import json
import logging
import os
import socket
import sqlite3
import sys
import uuid
from datetime import datetime, timezone
from pathlib import Path
import httpx
# Configure root logging at import time so every message is prefixed with its level.
logging.basicConfig(format="%(levelname)s %(message)s", level=logging.INFO)
log = logging.getLogger(__name__)

# Prepend the directory two levels above this file (presumably the repository
# root, since the script lives in scripts/) to the import path.
sys.path.insert(0, str(Path(__file__).parent.parent))

# Maximum number of rows fetched per batch type in a single run.
BATCH_LIMIT = 500
# Schema version stamped into every payload sent to Avocet.
BATCH_VERSION = 1

# Runtime configuration, read once from the environment at import time.
DB_PATH = Path(os.getenv("TURNSTONE_DB", "/data/turnstone.db"))
CORPUS_ENDPOINT = os.getenv("AVOCET_CORPUS_ENDPOINT", "")
CONSENT_TOKEN = os.getenv("AVOCET_CONSENT_TOKEN", "")
SOURCE_HOST = os.getenv("TURNSTONE_SOURCE_HOST", socket.gethostname())
def _watermark_path(db_path: Path, name: str) -> Path:
return db_path.parent / name
def read_rowid_watermark(db_path: Path) -> int:
    """Return the last exported ``log_entries`` rowid, or 0 when none is usable.

    The value is read from ``corpus_watermark.txt`` next to the DB. A missing
    file means nothing has been exported yet and yields 0 silently. A file
    that cannot be read or does not hold an integer also yields 0, but a
    warning is logged: restarting from rowid 0 may re-send entries that were
    already exported.
    """
    path = _watermark_path(db_path, "corpus_watermark.txt")
    try:
        return int(path.read_text().strip())
    except FileNotFoundError:
        return 0
    except (ValueError, OSError) as exc:
        # Best-effort fallback kept from the original, but no longer silent.
        log.warning(
            "Ignoring unusable watermark %s (%s); restarting from rowid 0", path, exc
        )
        return 0
def write_rowid_watermark(db_path: Path, rowid: int) -> None:
    """Persist *rowid* as the last exported ``log_entries`` rowid.

    The value is written to a temporary sibling file, which is then moved
    over ``corpus_watermark.txt`` with ``os.replace``. A crash mid-write
    therefore leaves either the old or the new watermark on disk, never a
    truncated file. The reader would treat a truncated file as rowid 0 and
    re-export everything.
    """
    path = _watermark_path(db_path, "corpus_watermark.txt")
    tmp_path = path.with_name(path.name + ".tmp")
    tmp_path.write_text(str(rowid))
    os.replace(tmp_path, path)
def read_ts_watermark(db_path: Path) -> str:
    """Return the ``created_at`` value of the last exported incident.

    Falls back to ``"1970-01-01T00:00:00"`` when ``incident_watermark.txt``
    is missing or blank. That default presumably sorts before every stored
    ``created_at``, so all incidents are selected.
    """
    epoch = "1970-01-01T00:00:00"
    path = _watermark_path(db_path, "incident_watermark.txt")
    if not path.exists():
        return epoch
    stored = path.read_text().strip()
    return stored if stored else epoch
def write_ts_watermark(db_path: Path, ts: str) -> None:
    """Persist *ts* as the ``created_at`` of the last exported incident.

    The value is written to a temporary sibling file, which is then moved
    over ``incident_watermark.txt`` with ``os.replace``. A crash mid-write
    therefore leaves either the old or the new watermark on disk, never an
    empty file. The reader would treat an empty file as the epoch and
    re-export every incident.
    """
    path = _watermark_path(db_path, "incident_watermark.txt")
    tmp_path = path.with_name(path.name + ".tmp")
    tmp_path.write_text(ts)
    os.replace(tmp_path, path)
def post_batch(endpoint: str, token: str, payload: dict) -> None:
    """POST *payload* as JSON to *endpoint*, authenticating with *token*.

    Uses a 30-second timeout. Raises ``httpx.HTTPStatusError`` for an error
    status code. Transport errors raised by httpx propagate unchanged.
    """
    auth = {"Authorization": f"Bearer {token}"}
    response = httpx.post(endpoint, json=payload, headers=auth, timeout=30.0)
    response.raise_for_status()
def export_raw_entries(conn: sqlite3.Connection, db_path: Path, endpoint: str, token: str) -> int:
    """Push the next batch of ERROR/CRITICAL ``log_entries`` rows to *endpoint*.

    Selects up to BATCH_LIMIT matching rows whose rowid is above the stored
    watermark, in rowid order. It POSTs them as one ``raw_entries`` batch and
    only then advances the watermark to the highest rowid sent. If the POST
    raises, the watermark is unchanged and the same rows are retried next run.

    NOTE(review): SQLite may reuse rowids after the highest-rowid rows are
    deleted (unless the table uses AUTOINCREMENT). Confirm that log_entries is
    never pruned from the top, or such reused rows would fall below the watermark.

    Args:
        conn: Open connection whose rows support access by column name
            (``main`` sets ``sqlite3.Row``).
        db_path: DB file path; the watermark file lives in the same directory.
        endpoint: Avocet corpus URL.
        token: Consent token, sent as a bearer token.

    Returns:
        Number of entries exported (0 when there was nothing new).
    """
    last_rowid = read_rowid_watermark(db_path)
    rows = conn.execute(
        "SELECT rowid, id, source_id, timestamp_iso, ingest_time, severity, matched_patterns, text "
        "FROM log_entries "
        "WHERE severity IN ('ERROR', 'CRITICAL') AND rowid > ? "
        "ORDER BY rowid LIMIT ?",
        (last_rowid, BATCH_LIMIT),
    ).fetchall()
    if not rows:
        log.info("No new ERROR/CRITICAL entries since rowid %d", last_rowid)
        return 0

    entries = [
        {
            "entry_id": row["id"],
            "source_id": row["source_id"],
            "timestamp_iso": row["timestamp_iso"],
            "ingest_time": row["ingest_time"],
            "severity": row["severity"],
            # Stored as JSON text; NULL or empty is treated as "no matches".
            "matched_patterns": json.loads(row["matched_patterns"] or "[]"),
            "text": row["text"],
        }
        for row in rows
    ]
    new_watermark = rows[-1]["rowid"]
    payload = {
        "batch_version": BATCH_VERSION,
        "batch_id": str(uuid.uuid4()),
        "pushed_at": datetime.now(timezone.utc).isoformat(),
        "source_host": SOURCE_HOST,
        "batch_type": "raw_entries",
        "watermark_from": last_rowid,
        "watermark_to": new_watermark,
        "entries": entries,
    }
    post_batch(endpoint, token, payload)
    # Advance only after a successful POST. If this write fails, the batch
    # is sent again on the next run.
    write_rowid_watermark(db_path, new_watermark)
    log.info("Exported %d entries (rowid %d -> %d)", len(rows), last_rowid, new_watermark)
    return len(rows)
def export_incidents(conn: sqlite3.Connection, db_path: Path, endpoint: str, token: str) -> int:
    """Push the next batch of newly created incidents to *endpoint*.

    Selects up to BATCH_LIMIT incidents whose ``created_at`` is strictly
    greater than the stored timestamp watermark, ordered by
    ``(created_at, id)``. It POSTs them as one ``incident_bundles`` batch,
    then advances the watermark to the ``created_at`` of the last incident sent.

    The next run also filters on ``created_at > watermark``, so a batch must
    not end part-way through a group of incidents that share one
    ``created_at``; the unsent rest of that group would never be selected.
    One extra row is fetched to detect such a split. If the boundary splits
    a group, that partial group is held back for the next run. Only when the
    whole batch shares a single ``created_at`` is it sent as-is, with a
    warning, because it cannot be split without stalling.

    NOTE(review): incidents with a NULL ``created_at``, or inserted after an
    export with a ``created_at`` equal to the current watermark, are still
    never selected. Fixing that needs a composite (created_at, id) watermark.

    Args:
        conn: Open connection whose rows support access by column name
            (``main`` sets ``sqlite3.Row``).
        db_path: DB file path; the watermark file lives in the same directory.
        endpoint: Avocet corpus URL.
        token: Consent token, sent as a bearer token.

    Returns:
        Number of incidents exported (0 when there was nothing new).
    """
    last_ts = read_ts_watermark(db_path)
    fetched = conn.execute(
        "SELECT id, label, issue_type, started_at, ended_at, notes, created_at, severity "
        "FROM incidents WHERE created_at > ? ORDER BY created_at, id LIMIT ?",
        (last_ts, BATCH_LIMIT + 1),
    ).fetchall()
    rows = fetched[:BATCH_LIMIT]
    if not rows:
        log.info("No new incidents since %s", last_ts)
        return 0

    if len(fetched) > len(rows):
        # The row just past the batch tells us whether the batch boundary
        # falls inside a group of incidents sharing one created_at.
        overflow_ts = fetched[len(rows)]["created_at"]
        if rows[-1]["created_at"] == overflow_ts:
            complete = [row for row in rows if row["created_at"] != overflow_ts]
            if complete:
                rows = complete
            else:
                log.warning(
                    "All %d incidents in batch share created_at %s; "
                    "further incidents with that timestamp will be skipped",
                    len(rows),
                    overflow_ts,
                )

    incidents = [dict(row) for row in rows]
    new_watermark = rows[-1]["created_at"]
    payload = {
        "batch_version": BATCH_VERSION,
        "batch_id": str(uuid.uuid4()),
        "pushed_at": datetime.now(timezone.utc).isoformat(),
        "source_host": SOURCE_HOST,
        "batch_type": "incident_bundles",
        "watermark_from": last_ts,
        "watermark_to": new_watermark,
        "entries": incidents,
    }
    post_batch(endpoint, token, payload)
    # Advance only after a successful POST. If this write fails, the batch
    # is sent again on the next run.
    write_ts_watermark(db_path, new_watermark)
    log.info("Exported %d incidents (up to %s)", len(rows), new_watermark)
    return len(rows)
def main() -> int:
    """Run one export pass and return the process exit code.

    Returns 0 on success, or when AVOCET_CORPUS_ENDPOINT is unset (export
    disabled). Returns 1 when the consent token or the DB file is missing,
    or when any export step raises.
    """
    if not CORPUS_ENDPOINT:
        log.info("AVOCET_CORPUS_ENDPOINT not set — skipping corpus export")
        return 0
    if not CONSENT_TOKEN:
        log.error("AVOCET_CONSENT_TOKEN not set")
        return 1
    if not DB_PATH.exists():
        log.error("DB not found: %s", DB_PATH)
        return 1

    conn = sqlite3.connect(str(DB_PATH))
    # The exporters read columns by name.
    conn.row_factory = sqlite3.Row
    try:
        # Incidents are only attempted after raw entries succeed. A failure
        # in either step aborts the run before that step's watermark advances.
        entry_count = export_raw_entries(conn, DB_PATH, CORPUS_ENDPOINT, CONSENT_TOKEN)
        incident_count = export_incidents(conn, DB_PATH, CORPUS_ENDPOINT, CONSENT_TOKEN)
        log.info("Done — %d entries, %d incidents exported", entry_count, incident_count)
        return 0
    except httpx.HTTPStatusError as exc:
        # Log only a prefix of the response body to keep cron logs readable.
        log.error("HTTP %s from Avocet: %s", exc.response.status_code, exc.response.text[:200])
        return 1
    except Exception as exc:
        # Top-level boundary: keep the traceback so unexpected failures
        # (e.g. transport, SQL or JSON errors) can be diagnosed from the logs.
        log.exception("Export failed: %s", exc)
        return 1
    finally:
        conn.close()
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    raise SystemExit(main())