"""Export ERROR/CRITICAL log entries and labeled incidents to Avocet corpus endpoint. Run periodically alongside ingest_corpus.py (same cron schedule recommended). Watermarks are stored as plain text files next to the DB: corpus_watermark.txt — last exported log_entries rowid incident_watermark.txt — last exported incident created_at timestamp Required env vars: AVOCET_CORPUS_ENDPOINT URL to POST batches to AVOCET_CONSENT_TOKEN Per-node consent token (issued by CF) Optional env vars: TURNSTONE_DB Path to turnstone.db (default: /data/turnstone.db) TURNSTONE_SOURCE_HOST Node identifier (default: system hostname) Exit codes: 0 — success (including no-op when endpoint not configured) 1 — configuration error or failed POST """ from __future__ import annotations import json import logging import os import socket import sqlite3 import sys import uuid from datetime import datetime, timezone from pathlib import Path import httpx logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s") log = logging.getLogger(__name__) sys.path.insert(0, str(Path(__file__).parent.parent)) BATCH_LIMIT = 500 BATCH_VERSION = 1 DB_PATH = Path(os.environ.get("TURNSTONE_DB", "/data/turnstone.db")) CORPUS_ENDPOINT = os.environ.get("AVOCET_CORPUS_ENDPOINT", "") CONSENT_TOKEN = os.environ.get("AVOCET_CONSENT_TOKEN", "") SOURCE_HOST = os.environ.get("TURNSTONE_SOURCE_HOST", socket.gethostname()) def _watermark_path(db_path: Path, name: str) -> Path: return db_path.parent / name def read_rowid_watermark(db_path: Path) -> int: path = _watermark_path(db_path, "corpus_watermark.txt") if path.exists(): try: return int(path.read_text().strip()) except (ValueError, OSError): pass return 0 def write_rowid_watermark(db_path: Path, rowid: int) -> None: _watermark_path(db_path, "corpus_watermark.txt").write_text(str(rowid)) def read_ts_watermark(db_path: Path) -> str: path = _watermark_path(db_path, "incident_watermark.txt") if path.exists(): return path.read_text().strip() or "1970-01-01T00:00:00" return "1970-01-01T00:00:00" def write_ts_watermark(db_path: Path, ts: str) -> None: _watermark_path(db_path, "incident_watermark.txt").write_text(ts) def post_batch(endpoint: str, token: str, payload: dict) -> None: resp = httpx.post( endpoint, json=payload, headers={"Authorization": f"Bearer {token}"}, timeout=30.0, ) resp.raise_for_status() def export_raw_entries(conn: sqlite3.Connection, db_path: Path, endpoint: str, token: str) -> int: last_rowid = read_rowid_watermark(db_path) rows = conn.execute( "SELECT rowid, id, source_id, timestamp_iso, ingest_time, severity, matched_patterns, text " "FROM log_entries " "WHERE severity IN ('ERROR', 'CRITICAL') AND rowid > ? " "ORDER BY rowid LIMIT ?", (last_rowid, BATCH_LIMIT), ).fetchall() if not rows: log.info("No new ERROR/CRITICAL entries since rowid %d", last_rowid) return 0 entries = [ { "entry_id": row["id"], "source_id": row["source_id"], "timestamp_iso": row["timestamp_iso"], "ingest_time": row["ingest_time"], "severity": row["severity"], "matched_patterns": json.loads(row["matched_patterns"] or "[]"), "text": row["text"], } for row in rows ] new_watermark = rows[-1]["rowid"] payload = { "batch_version": BATCH_VERSION, "batch_id": str(uuid.uuid4()), "pushed_at": datetime.now(timezone.utc).isoformat(), "source_host": SOURCE_HOST, "batch_type": "raw_entries", "watermark_from": last_rowid, "watermark_to": new_watermark, "entries": entries, } post_batch(endpoint, token, payload) write_rowid_watermark(db_path, new_watermark) log.info("Exported %d entries (rowid %d → %d)", len(rows), last_rowid, new_watermark) return len(rows) def export_incidents(conn: sqlite3.Connection, db_path: Path, endpoint: str, token: str) -> int: last_ts = read_ts_watermark(db_path) rows = conn.execute( "SELECT id, label, issue_type, started_at, ended_at, notes, created_at, severity " "FROM incidents WHERE created_at > ? ORDER BY created_at LIMIT ?", (last_ts, BATCH_LIMIT), ).fetchall() if not rows: log.info("No new incidents since %s", last_ts) return 0 incidents = [dict(row) for row in rows] new_watermark = rows[-1]["created_at"] payload = { "batch_version": BATCH_VERSION, "batch_id": str(uuid.uuid4()), "pushed_at": datetime.now(timezone.utc).isoformat(), "source_host": SOURCE_HOST, "batch_type": "incident_bundles", "watermark_from": last_ts, "watermark_to": new_watermark, "entries": incidents, } post_batch(endpoint, token, payload) write_ts_watermark(db_path, new_watermark) log.info("Exported %d incidents (up to %s)", len(rows), new_watermark) return len(rows) def main() -> int: if not CORPUS_ENDPOINT: log.info("AVOCET_CORPUS_ENDPOINT not set — skipping corpus export") return 0 if not CONSENT_TOKEN: log.error("AVOCET_CONSENT_TOKEN not set") return 1 if not DB_PATH.exists(): log.error("DB not found: %s", DB_PATH) return 1 conn = sqlite3.connect(str(DB_PATH)) conn.row_factory = sqlite3.Row try: entry_count = export_raw_entries(conn, DB_PATH, CORPUS_ENDPOINT, CONSENT_TOKEN) incident_count = export_incidents(conn, DB_PATH, CORPUS_ENDPOINT, CONSENT_TOKEN) log.info("Done — %d entries, %d incidents exported", entry_count, incident_count) return 0 except httpx.HTTPStatusError as exc: log.error("HTTP %s from Avocet: %s", exc.response.status_code, exc.response.text[:200]) return 1 except Exception as exc: log.error("Export failed: %s", exc) return 1 finally: conn.close() if __name__ == "__main__": sys.exit(main())