turnstone/app/tasks/glean_scheduler.py
pyr0ball 828b69768a refactor: rename ingest → glean throughout codebase
Renames the app/ingest/ package to app/glean/ and updates all
references across Python modules, shell scripts, Vue components,
tests, and documentation.

Intentionally preserved:
- SQLite column name ingest_time (avoids schema migration)
- RetrievedEntry.ingest_time field (maps to the column above)
- Any public-facing JSON keys that reference ingest_time

Changes by category:
- app/ingest/ → app/glean/ (full package move, all parsers)
- app/tasks/ingest_scheduler.py → app/tasks/glean_scheduler.py
- scripts/ingest_corpus.py → scripts/glean_corpus.py
- tests/test_ingest_*.py → tests/test_glean_*.py
- Docstrings, log messages, comments: ingest → glean
- Env var: TURNSTONE_INGEST_INTERVAL → TURNSTONE_GLEAN_INTERVAL
- Shell scripts: glean.log, glean_corpus.py references
- README.md: multi-source ingest → multi-source glean
- .env.example: updated env var name
- patterns/: new diagnostic patterns from 2026-05-20 SSH incident
  (service_crash_loop, pkg_daemon_restart, ssh_forward_conflict)
- SourcesView.vue: pipeline label updated
- All test import paths updated to app.glean.*

285 tests passing.
2026-05-20 23:02:55 -07:00

184 lines
6.4 KiB
Python

"""Periodic batch glean scheduler with optional CF submission.
Runs glean_sources on a configurable interval (TURNSTONE_GLEAN_INTERVAL env var,
default 900s / 15 min). Set to 0 to disable.
When TURNSTONE_SUBMIT_ENDPOINT is set, pushes pattern-matched entries to a remote
Turnstone instance (the CF receiving store) after each glean run.
"""
from __future__ import annotations
import asyncio
import json
import logging
import sqlite3
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any
import httpx
from app.glean.pipeline import glean_sources
logger = logging.getLogger(__name__)
_lock = asyncio.Lock()
@dataclass
class IngestState:
last_run_at: str | None = None
last_duration_s: float | None = None
last_stats: dict[str, int] = field(default_factory=dict)
last_error: str | None = None
run_count: int = 0
next_run_at: str | None = None
running: bool = False
last_submitted_at: str | None = None
last_submit_count: int = 0
last_submit_error: str | None = None
_state = IngestState()
def get_state() -> IngestState:
return _state
def _query_matched_since(db_path: Path, since: str | None) -> list[dict]:
"""Return entries with non-empty matched_patterns, optionally filtered by ingest_time."""
conn = sqlite3.connect(str(db_path))
conn.row_factory = sqlite3.Row
try:
if since:
rows = conn.execute(
"""
SELECT id, source_id, sequence, timestamp_raw, timestamp_iso,
ingest_time, severity, repeat_count, out_of_order,
matched_patterns, text
FROM log_entries
WHERE matched_patterns != '[]' AND ingest_time > ?
ORDER BY ingest_time
LIMIT 5000
""",
(since,),
).fetchall()
else:
rows = conn.execute(
"""
SELECT id, source_id, sequence, timestamp_raw, timestamp_iso,
ingest_time, severity, repeat_count, out_of_order,
matched_patterns, text
FROM log_entries
WHERE matched_patterns != '[]'
ORDER BY ingest_time DESC
LIMIT 5000
""",
).fetchall()
return [dict(r) for r in rows]
finally:
conn.close()
async def submit_matched(
db_path: Path,
submit_endpoint: str,
source_host: str,
since: str | None = None,
) -> dict[str, Any]:
"""Push pattern-matched entries to the remote CF receiving instance."""
loop = asyncio.get_running_loop()
entries = await loop.run_in_executor(
None, lambda: _query_matched_since(db_path, since)
)
if not entries:
return {"ok": True, "submitted": 0, "skipped": True}
url = f"{submit_endpoint.rstrip('/')}/turnstone/api/glean/batch"
payload = {"source_host": source_host, "entries": entries}
try:
async with httpx.AsyncClient(timeout=30.0) as client:
resp = await client.post(url, json=payload)
resp.raise_for_status()
result = resp.json()
submitted = result.get("gleaned", len(entries))
_state.last_submitted_at = datetime.now(tz=timezone.utc).isoformat()
_state.last_submit_count = submitted
_state.last_submit_error = None
logger.info("Submitted %d matched entries to %s", submitted, submit_endpoint)
return {"ok": True, "submitted": submitted}
except Exception as exc:
_state.last_submit_error = str(exc)
logger.warning("Submission to %s failed: %s", submit_endpoint, exc)
return {"ok": False, "error": str(exc)}
async def run_once(
sources_file: Path,
db_path: Path,
pattern_file: Path | None = None,
submit_endpoint: str | None = None,
source_host: str = "unknown",
) -> dict[str, Any]:
"""Ingest all sources once, then submit matched entries if configured."""
if _lock.locked():
return {"ok": False, "error": "glean already running", "skipped": True}
async with _lock:
_state.running = True
started = datetime.now(tz=timezone.utc)
try:
loop = asyncio.get_running_loop()
stats: dict[str, int] = await loop.run_in_executor(
None,
lambda: glean_sources(sources_file, db_path, pattern_file),
)
duration = (datetime.now(tz=timezone.utc) - started).total_seconds()
_state.last_run_at = started.isoformat()
_state.last_duration_s = round(duration, 2)
_state.last_stats = stats
_state.last_error = None
_state.run_count += 1
logger.info("Batch glean complete in %.1fs — %s", duration, stats)
except Exception as exc:
duration = (datetime.now(tz=timezone.utc) - started).total_seconds()
_state.last_run_at = started.isoformat()
_state.last_duration_s = round(duration, 2)
_state.last_error = str(exc)
_state.run_count += 1
logger.error("Batch glean failed: %s", exc)
_state.running = False
return {"ok": False, "error": str(exc)}
finally:
_state.running = False
if submit_endpoint:
await submit_matched(db_path, submit_endpoint, source_host, since=_state.last_submitted_at)
return {"ok": True, "stats": _state.last_stats, "duration_s": _state.last_duration_s}
async def scheduler_loop(
sources_file: Path,
db_path: Path,
pattern_file: Path | None,
interval_s: int,
submit_endpoint: str | None = None,
source_host: str = "unknown",
) -> None:
"""Run glean + optional submission every interval_s seconds until cancelled."""
logger.info("Ingest scheduler started — interval %ds, sources: %s", interval_s, sources_file)
if submit_endpoint:
logger.info("Submission enabled — endpoint: %s", submit_endpoint)
while True:
await run_once(sources_file, db_path, pattern_file, submit_endpoint, source_host)
next_run = datetime.now(tz=timezone.utc) + timedelta(seconds=interval_s)
_state.next_run_at = next_run.isoformat()
try:
await asyncio.sleep(interval_s)
except asyncio.CancelledError:
logger.info("Ingest scheduler cancelled")
_state.next_run_at = None
raise