"""Periodic batch ingest scheduler with optional CF submission. Runs ingest_sources on a configurable interval (TURNSTONE_INGEST_INTERVAL env var, default 900s / 15 min). Set to 0 to disable. When TURNSTONE_SUBMIT_ENDPOINT is set, pushes pattern-matched entries to a remote Turnstone instance (the CF receiving store) after each ingest run. """ from __future__ import annotations import asyncio import json import logging import sqlite3 from dataclasses import dataclass, field from datetime import datetime, timedelta, timezone from pathlib import Path from typing import Any import httpx from app.ingest.pipeline import ingest_sources logger = logging.getLogger(__name__) _lock = asyncio.Lock() @dataclass class IngestState: last_run_at: str | None = None last_duration_s: float | None = None last_stats: dict[str, int] = field(default_factory=dict) last_error: str | None = None run_count: int = 0 next_run_at: str | None = None running: bool = False last_submitted_at: str | None = None last_submit_count: int = 0 last_submit_error: str | None = None _state = IngestState() def get_state() -> IngestState: return _state def _query_matched_since(db_path: Path, since: str | None) -> list[dict]: """Return entries with non-empty matched_patterns, optionally filtered by ingest_time.""" conn = sqlite3.connect(str(db_path)) conn.row_factory = sqlite3.Row try: if since: rows = conn.execute( """ SELECT id, source_id, sequence, timestamp_raw, timestamp_iso, ingest_time, severity, repeat_count, out_of_order, matched_patterns, text FROM log_entries WHERE matched_patterns != '[]' AND ingest_time > ? ORDER BY ingest_time LIMIT 5000 """, (since,), ).fetchall() else: rows = conn.execute( """ SELECT id, source_id, sequence, timestamp_raw, timestamp_iso, ingest_time, severity, repeat_count, out_of_order, matched_patterns, text FROM log_entries WHERE matched_patterns != '[]' ORDER BY ingest_time DESC LIMIT 5000 """, ).fetchall() return [dict(r) for r in rows] finally: conn.close() async def submit_matched( db_path: Path, submit_endpoint: str, source_host: str, since: str | None = None, ) -> dict[str, Any]: """Push pattern-matched entries to the remote CF receiving instance.""" loop = asyncio.get_running_loop() entries = await loop.run_in_executor( None, lambda: _query_matched_since(db_path, since) ) if not entries: return {"ok": True, "submitted": 0, "skipped": True} url = f"{submit_endpoint.rstrip('/')}/turnstone/api/ingest/batch" payload = {"source_host": source_host, "entries": entries} try: async with httpx.AsyncClient(timeout=30.0) as client: resp = await client.post(url, json=payload) resp.raise_for_status() result = resp.json() submitted = result.get("ingested", len(entries)) _state.last_submitted_at = datetime.now(tz=timezone.utc).isoformat() _state.last_submit_count = submitted _state.last_submit_error = None logger.info("Submitted %d matched entries to %s", submitted, submit_endpoint) return {"ok": True, "submitted": submitted} except Exception as exc: _state.last_submit_error = str(exc) logger.warning("Submission to %s failed: %s", submit_endpoint, exc) return {"ok": False, "error": str(exc)} async def run_once( sources_file: Path, db_path: Path, pattern_file: Path | None = None, submit_endpoint: str | None = None, source_host: str = "unknown", ) -> dict[str, Any]: """Ingest all sources once, then submit matched entries if configured.""" if _lock.locked(): return {"ok": False, "error": "ingest already running", "skipped": True} async with _lock: _state.running = True started = datetime.now(tz=timezone.utc) try: loop = asyncio.get_running_loop() stats: dict[str, int] = await loop.run_in_executor( None, lambda: ingest_sources(sources_file, db_path, pattern_file), ) duration = (datetime.now(tz=timezone.utc) - started).total_seconds() _state.last_run_at = started.isoformat() _state.last_duration_s = round(duration, 2) _state.last_stats = stats _state.last_error = None _state.run_count += 1 logger.info("Batch ingest complete in %.1fs — %s", duration, stats) except Exception as exc: duration = (datetime.now(tz=timezone.utc) - started).total_seconds() _state.last_run_at = started.isoformat() _state.last_duration_s = round(duration, 2) _state.last_error = str(exc) _state.run_count += 1 logger.error("Batch ingest failed: %s", exc) _state.running = False return {"ok": False, "error": str(exc)} finally: _state.running = False if submit_endpoint: await submit_matched(db_path, submit_endpoint, source_host, since=_state.last_submitted_at) return {"ok": True, "stats": _state.last_stats, "duration_s": _state.last_duration_s} async def scheduler_loop( sources_file: Path, db_path: Path, pattern_file: Path | None, interval_s: int, submit_endpoint: str | None = None, source_host: str = "unknown", ) -> None: """Run ingest + optional submission every interval_s seconds until cancelled.""" logger.info("Ingest scheduler started — interval %ds, sources: %s", interval_s, sources_file) if submit_endpoint: logger.info("Submission enabled — endpoint: %s", submit_endpoint) while True: await run_once(sources_file, db_path, pattern_file, submit_endpoint, source_host) next_run = datetime.now(tz=timezone.utc) + timedelta(seconds=interval_s) _state.next_run_at = next_run.isoformat() try: await asyncio.sleep(interval_s) except asyncio.CancelledError: logger.info("Ingest scheduler cancelled") _state.next_run_at = None raise