Watermark-based batch export script (scripts/export_corpus.py): each run pushes up to 500 ERROR/CRITICAL log entries and up to 500 labeled incidents to AVOCET_CORPUS_ENDPOINT. Uses a SQLite rowid watermark for log entries and an ISO timestamp watermark for incidents. When AVOCET_CORPUS_ENDPOINT is not set, the script logs a skip notice and exits 0 without exporting anything. 19 tests. Closes turnstone#6.
194 lines
6.1 KiB
Python
194 lines
6.1 KiB
Python
"""Export ERROR/CRITICAL log entries and labeled incidents to Avocet corpus endpoint.

Run periodically alongside ingest_corpus.py (same cron schedule recommended).

Watermarks are stored as plain text files next to the DB:
    corpus_watermark.txt   — last exported log_entries rowid
    incident_watermark.txt — last exported incident created_at timestamp

Required env vars:
    AVOCET_CORPUS_ENDPOINT  URL to POST batches to
    AVOCET_CONSENT_TOKEN    Per-node consent token (issued by CF)

Optional env vars:
    TURNSTONE_DB            Path to turnstone.db (default: /data/turnstone.db)
    TURNSTONE_SOURCE_HOST   Node identifier (default: system hostname)

Exit codes:
    0 — success (including no-op when endpoint not configured)
    1 — configuration error or failed POST
"""
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import logging
|
|
import os
|
|
import socket
|
|
import sqlite3
|
|
import sys
|
|
import uuid
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
|
|
import httpx
|
|
|
|
# Log "LEVEL message" lines at INFO and above for the whole run.
logging.basicConfig(format="%(levelname)s %(message)s", level=logging.INFO)
log = logging.getLogger(__name__)

# Put the directory two levels above this file at the front of the import path.
sys.path.insert(0, str(Path(__file__).parent.parent))

# Row cap applied to each export query, and the version stamped on every payload.
BATCH_LIMIT = 500
BATCH_VERSION = 1

# Runtime configuration, read once from the environment at import time.
DB_PATH = Path(os.getenv("TURNSTONE_DB", "/data/turnstone.db"))
CORPUS_ENDPOINT = os.getenv("AVOCET_CORPUS_ENDPOINT", "")
CONSENT_TOKEN = os.getenv("AVOCET_CONSENT_TOKEN", "")
SOURCE_HOST = os.getenv("TURNSTONE_SOURCE_HOST", socket.gethostname())
|
|
|
|
|
|
def _watermark_path(db_path: Path, name: str) -> Path:
|
|
return db_path.parent / name
|
|
|
|
|
|
def read_rowid_watermark(db_path: Path) -> int:
    """Return the last exported ``log_entries`` rowid, or 0 if none is recorded.

    The value is read from ``corpus_watermark.txt`` in the directory that
    holds *db_path*.

    Args:
        db_path: Path to the SQLite database file.

    Returns:
        The stored rowid. 0 when the watermark file does not exist, cannot be
        read, or does not contain an integer.
    """
    path = _watermark_path(db_path, "corpus_watermark.txt")
    try:
        return int(path.read_text().strip())
    except FileNotFoundError:
        # No watermark yet (e.g. first run): start from the beginning.
        return 0
    except (ValueError, OSError) as exc:
        # Falling back to 0 re-exports from the first row, so make it visible
        # in the logs instead of swallowing the error silently.
        log.warning(
            "Ignoring unreadable rowid watermark %s (%s); restarting from rowid 0",
            path,
            exc,
        )
        return 0
|
|
|
|
|
|
def write_rowid_watermark(db_path: Path, rowid: int) -> None:
    """Persist *rowid* as the last exported ``log_entries`` rowid.

    The value is written to a temporary file in the same directory and then
    renamed over ``corpus_watermark.txt``, so a run interrupted mid-write
    leaves the previous watermark in place rather than a truncated file
    (which the reader would treat as 0 and re-export everything).

    Args:
        db_path: Path to the SQLite database file; the watermark lives in
            the same directory.
        rowid: Rowid of the last entry that was exported.

    Raises:
        OSError: If the temporary file cannot be written or renamed.
    """
    target = _watermark_path(db_path, "corpus_watermark.txt")
    scratch = target.with_name(target.name + ".tmp")
    scratch.write_text(str(rowid))
    # Same-directory rename: replaces the old watermark in a single step.
    scratch.replace(target)
|
|
|
|
|
|
def read_ts_watermark(db_path: Path) -> str:
    """Return the last exported incident ``created_at`` value.

    The value is read from ``incident_watermark.txt`` in the directory that
    holds *db_path*. Error handling mirrors ``read_rowid_watermark``: an
    unreadable watermark falls back to the starting value instead of raising.

    Args:
        db_path: Path to the SQLite database file.

    Returns:
        The stored timestamp string, or ``"1970-01-01T00:00:00"`` when the
        file is missing, empty, or cannot be read.
    """
    epoch = "1970-01-01T00:00:00"
    path = _watermark_path(db_path, "incident_watermark.txt")
    try:
        stored = path.read_text().strip()
    except FileNotFoundError:
        # No watermark yet (e.g. first run): start from the epoch.
        return epoch
    except (OSError, UnicodeDecodeError) as exc:
        # Falling back re-exports every incident, so surface it in the logs.
        log.warning(
            "Ignoring unreadable incident watermark %s (%s); restarting from %s",
            path,
            exc,
            epoch,
        )
        return epoch
    return stored or epoch
|
|
|
|
|
|
def write_ts_watermark(db_path: Path, ts: str) -> None:
    """Persist *ts* as the last exported incident ``created_at`` value.

    The value is written to a temporary file in the same directory and then
    renamed over ``incident_watermark.txt``, so a run interrupted mid-write
    leaves the previous watermark in place rather than a truncated file.

    Args:
        db_path: Path to the SQLite database file; the watermark lives in
            the same directory.
        ts: Timestamp string of the last incident that was exported.

    Raises:
        OSError: If the temporary file cannot be written or renamed.
    """
    target = _watermark_path(db_path, "incident_watermark.txt")
    scratch = target.with_name(target.name + ".tmp")
    scratch.write_text(ts)
    # Same-directory rename: replaces the old watermark in a single step.
    scratch.replace(target)
|
|
|
|
|
|
def post_batch(endpoint: str, token: str, payload: dict) -> None:
    """POST *payload* as JSON to *endpoint* with a bearer-token header.

    Args:
        endpoint: URL that receives the batch.
        token: Value sent as ``Authorization: Bearer <token>``.
        payload: JSON-serialisable batch document.

    Any exception raised by ``httpx.post`` or ``raise_for_status`` propagates
    to the caller; the request uses a 30 second timeout.
    """
    auth_headers = {"Authorization": f"Bearer {token}"}
    response = httpx.post(endpoint, json=payload, headers=auth_headers, timeout=30.0)
    response.raise_for_status()
|
|
|
|
|
|
def _decode_matched_patterns(raw: "str | None", entry_id: object) -> object:
    """Decode one row's ``matched_patterns`` JSON, tolerating malformed values."""
    if not raw:
        return []
    try:
        return json.loads(raw)
    except (TypeError, ValueError) as exc:
        # A single bad row must not abort the batch: the watermark would never
        # move past it and every later run would fail on the same row.
        log.warning(
            "Entry %s has undecodable matched_patterns (%s); exporting an empty list",
            entry_id,
            exc,
        )
        return []


def export_raw_entries(conn: sqlite3.Connection, db_path: Path, endpoint: str, token: str) -> int:
    """Export the next batch of ERROR/CRITICAL log entries.

    Selects up to ``BATCH_LIMIT`` rows from ``log_entries`` whose rowid is
    greater than the stored watermark, posts them as one ``raw_entries``
    batch, and advances the watermark only after the POST has succeeded.

    Args:
        conn: Open SQLite connection. Rows are accessed by column name, so
            its ``row_factory`` must support that (e.g. ``sqlite3.Row``).
        db_path: Path to the database file; the watermark file lives in the
            same directory.
        endpoint: URL that receives the batch.
        token: Bearer token sent with the request.

    Returns:
        Number of entries exported (0 when there is nothing new).

    Any exception from the query or from ``post_batch`` propagates; in that
    case the watermark is left untouched, so the same rows are retried on the
    next run.
    """
    last_rowid = read_rowid_watermark(db_path)
    rows = conn.execute(
        "SELECT rowid, id, source_id, timestamp_iso, ingest_time, severity, matched_patterns, text "
        "FROM log_entries "
        "WHERE severity IN ('ERROR', 'CRITICAL') AND rowid > ? "
        "ORDER BY rowid LIMIT ?",
        (last_rowid, BATCH_LIMIT),
    ).fetchall()

    if not rows:
        log.info("No new ERROR/CRITICAL entries since rowid %d", last_rowid)
        return 0

    entries = [
        {
            "entry_id": row["id"],
            "source_id": row["source_id"],
            "timestamp_iso": row["timestamp_iso"],
            "ingest_time": row["ingest_time"],
            "severity": row["severity"],
            "matched_patterns": _decode_matched_patterns(row["matched_patterns"], row["id"]),
            "text": row["text"],
        }
        for row in rows
    ]
    # Rows are ordered by rowid, so the last one carries the new high-water mark.
    new_watermark = rows[-1]["rowid"]

    payload = {
        "batch_version": BATCH_VERSION,
        "batch_id": str(uuid.uuid4()),
        "pushed_at": datetime.now(timezone.utc).isoformat(),
        "source_host": SOURCE_HOST,
        "batch_type": "raw_entries",
        "watermark_from": last_rowid,
        "watermark_to": new_watermark,
        "entries": entries,
    }

    # Post first, persist second: a failed POST must not advance the watermark.
    post_batch(endpoint, token, payload)
    write_rowid_watermark(db_path, new_watermark)
    log.info("Exported %d entries (rowid %d → %d)", len(rows), last_rowid, new_watermark)
    return len(rows)
|
|
|
|
|
|
def export_incidents(conn: sqlite3.Connection, db_path: Path, endpoint: str, token: str) -> int:
    """Export the next batch of incidents.

    Selects incidents whose ``created_at`` is greater than the stored
    watermark, posts them as one ``incident_bundles`` batch, and advances the
    watermark only after the POST has succeeded.

    The query is capped at ``BATCH_LIMIT`` rows. Because the watermark is a
    strict ``created_at >`` comparison, a cap that lands inside a group of
    incidents sharing one timestamp would cause the rest of that group to be
    skipped forever. When the batch is full, the remaining incidents with the
    final timestamp are therefore added to it, so a batch may exceed
    ``BATCH_LIMIT`` by the size of that tie group.

    Args:
        conn: Open SQLite connection. Rows are accessed by column name, so
            its ``row_factory`` must support that (e.g. ``sqlite3.Row``).
        db_path: Path to the database file; the watermark file lives in the
            same directory.
        endpoint: URL that receives the batch.
        token: Bearer token sent with the request.

    Returns:
        Number of incidents exported (0 when there is nothing new).

    Any exception from the queries or from ``post_batch`` propagates; in that
    case the watermark is left untouched.
    """
    select_incidents = (
        "SELECT id, label, issue_type, started_at, ended_at, notes, created_at, severity "
        "FROM incidents "
    )
    last_ts = read_ts_watermark(db_path)
    rows = conn.execute(
        select_incidents + "WHERE created_at > ? ORDER BY created_at LIMIT ?",
        (last_ts, BATCH_LIMIT),
    ).fetchall()

    if not rows:
        log.info("No new incidents since %s", last_ts)
        return 0

    # Rows are ordered by created_at, so the last one carries the new watermark.
    new_watermark = rows[-1]["created_at"]

    if len(rows) >= BATCH_LIMIT:
        # The LIMIT may have cut through incidents that share new_watermark.
        # Fetch the whole tie group and append the ones not already selected.
        # Assumes incidents.id identifies a row uniquely — confirm against schema.
        selected_ids = {row["id"] for row in rows}
        tied = conn.execute(
            select_incidents + "WHERE created_at = ?",
            (new_watermark,),
        ).fetchall()
        rows.extend(row for row in tied if row["id"] not in selected_ids)

    incidents = [dict(row) for row in rows]

    payload = {
        "batch_version": BATCH_VERSION,
        "batch_id": str(uuid.uuid4()),
        "pushed_at": datetime.now(timezone.utc).isoformat(),
        "source_host": SOURCE_HOST,
        "batch_type": "incident_bundles",
        "watermark_from": last_ts,
        "watermark_to": new_watermark,
        "entries": incidents,
    }

    # Post first, persist second: a failed POST must not advance the watermark.
    post_batch(endpoint, token, payload)
    write_ts_watermark(db_path, new_watermark)
    log.info("Exported %d incidents (up to %s)", len(rows), new_watermark)
    return len(rows)
|
|
|
|
|
|
def main() -> int:
    """Run both exports once and return the process exit code.

    Returns:
        0 when both exports complete, or when ``AVOCET_CORPUS_ENDPOINT`` is
        unset (the run is skipped). 1 when the consent token is missing, the
        database file does not exist, the endpoint answers with an HTTP error
        status, or any other exception is raised during export.
    """
    # Guard clauses: each precondition failure is logged and ends the run early.
    if not CORPUS_ENDPOINT:
        log.info("AVOCET_CORPUS_ENDPOINT not set — skipping corpus export")
        return 0
    if not CONSENT_TOKEN:
        log.error("AVOCET_CONSENT_TOKEN not set")
        return 1
    if not DB_PATH.exists():
        log.error("DB not found: %s", DB_PATH)
        return 1

    db = sqlite3.connect(str(DB_PATH))
    # The export functions read columns by name.
    db.row_factory = sqlite3.Row

    status = 1  # pessimistic default; only a fully successful run flips it to 0
    try:
        n_entries = export_raw_entries(db, DB_PATH, CORPUS_ENDPOINT, CONSENT_TOKEN)
        n_incidents = export_incidents(db, DB_PATH, CORPUS_ENDPOINT, CONSENT_TOKEN)
        log.info("Done — %d entries, %d incidents exported", n_entries, n_incidents)
        status = 0
    except httpx.HTTPStatusError as err:
        # Only the first 200 characters of the response body are logged.
        log.error("HTTP %s from Avocet: %s", err.response.status_code, err.response.text[:200])
    except Exception as err:
        log.error("Export failed: %s", err)
    finally:
        db.close()
    return status
|
|
|
|
|
|
# Script entry point: the value returned by main() becomes the exit status.
if __name__ == "__main__":
    raise SystemExit(main())
|