turnstone/app/ingest/pipeline.py
pyr0ball f9ab4e5bb0 feat: incident tagging — DB schema, CRUD service, REST API (#1)
- Add `incidents` table to SQLite schema (id, label, started_at, ended_at,
  notes, created_at, severity)
- Extract `ensure_schema()` from ingest pipeline so tables are always
  created at startup, not only during ingest
- New `app/services/incidents.py`: create/list/get/delete + time-window
  entry association (FTS keyword search + raw window fallback)
- New `entries_in_window()` in search.py: plain SQL scan for incident
  detail when keyword FTS returns nothing
- REST endpoints: POST/GET /api/incidents, GET/DELETE /api/incidents/{id}
- Incident detail returns up to 100 associated log entries sorted by
  timestamp, prioritising FTS keyword hits then ERROR/CRITICAL then all
2026-05-09 15:37:14 -07:00

190 lines
5.8 KiB
Python

"""Ingest pipeline: auto-detect format, parse, write to SQLite."""
from __future__ import annotations
import json
import logging
import re
import sqlite3
from pathlib import Path
from typing import Iterator
from app.ingest import caddy, docker_log, journald, plaintext, plex
from app.ingest.base import _compile, load_patterns, now_iso
from app.services.models import LogPattern, RetrievedEntry
from app.services.search import build_fts_index
logger = logging.getLogger(__name__)
_SCHEMA = """
CREATE TABLE IF NOT EXISTS log_entries (
id TEXT PRIMARY KEY,
source_id TEXT NOT NULL,
sequence INTEGER NOT NULL,
timestamp_raw TEXT,
timestamp_iso TEXT,
ingest_time TEXT NOT NULL,
severity TEXT,
repeat_count INTEGER DEFAULT 1,
out_of_order INTEGER DEFAULT 0,
matched_patterns TEXT DEFAULT '[]',
text TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_source ON log_entries(source_id);
CREATE INDEX IF NOT EXISTS idx_timestamp ON log_entries(timestamp_iso);
CREATE INDEX IF NOT EXISTS idx_severity ON log_entries(severity);
CREATE INDEX IF NOT EXISTS idx_patterns ON log_entries(matched_patterns);
CREATE TABLE IF NOT EXISTS incidents (
id TEXT PRIMARY KEY,
label TEXT NOT NULL,
started_at TEXT,
ended_at TEXT,
notes TEXT NOT NULL DEFAULT '',
created_at TEXT NOT NULL,
severity TEXT NOT NULL DEFAULT 'medium'
);
CREATE INDEX IF NOT EXISTS idx_incidents_time ON incidents(started_at, ended_at);
"""
def ensure_schema(db_path: Path) -> None:
"""Create all tables if they don't exist. Safe to call on every startup."""
conn = sqlite3.connect(str(db_path))
conn.execute("PRAGMA journal_mode=WAL")
conn.executescript(_SCHEMA)
conn.commit()
conn.close()
def _detect_format(first_line: str) -> str:
try:
obj = json.loads(first_line)
if "__REALTIME_TIMESTAMP" in obj:
return "journald"
if "SOURCE" in obj and str(obj.get("SOURCE", "")).startswith("docker:"):
return "docker"
if "ts" in obj and ("msg" in obj or "message" in obj or "request" in obj):
return "caddy"
except (json.JSONDecodeError, AttributeError):
pass
if plex.is_plex_log(first_line):
return "plex"
return "plaintext"
def _parse_file(
path: Path,
compiled: list[tuple[LogPattern, object]],
ingest_time: str,
) -> Iterator[RetrievedEntry]:
source_id = path.stem
with path.open("r", errors="replace") as f:
lines = iter(f)
try:
first = next(lines)
except StopIteration:
return
fmt = _detect_format(first.strip())
logger.info("Detected format %r for %s", fmt, path.name)
def all_lines():
yield first
yield from lines
if fmt == "journald":
yield from journald.parse(all_lines(), source_id, compiled, ingest_time)
elif fmt == "docker":
yield from docker_log.parse(all_lines(), source_id, compiled, ingest_time)
elif fmt == "caddy":
yield from caddy.parse(all_lines(), source_id, compiled, ingest_time)
elif fmt == "plex":
yield from plex.parse(all_lines(), source_id, compiled, ingest_time)
else:
yield from plaintext.parse(all_lines(), source_id, compiled, ingest_time)
def _write_batch(conn: sqlite3.Connection, batch: list[RetrievedEntry]) -> None:
conn.executemany(
"""
INSERT OR IGNORE INTO log_entries
(id, source_id, sequence, timestamp_raw, timestamp_iso,
ingest_time, severity, repeat_count, out_of_order,
matched_patterns, text)
VALUES (?,?,?,?,?,?,?,?,?,?,?)
""",
[
(
e.entry_id, e.source_id, e.sequence,
e.timestamp_raw, e.timestamp_iso, e.ingest_time,
e.severity, e.repeat_count, int(e.out_of_order),
json.dumps(list(e.matched_patterns)), e.text,
)
for e in batch
],
)
def _ingest_files(
files: list[Path],
db_path: Path,
pattern_file: Path | None = None,
batch_size: int = 1000,
) -> dict[str, int]:
pattern_file = pattern_file or Path("patterns/default.yaml")
patterns = load_patterns(pattern_file)
compiled = _compile(patterns)
ingest_time = now_iso()
conn = sqlite3.connect(str(db_path))
conn.execute("PRAGMA journal_mode=WAL")
conn.executescript(_SCHEMA)
conn.commit()
stats: dict[str, int] = {}
for log_file in files:
count = 0
batch: list[RetrievedEntry] = []
for entry in _parse_file(log_file, compiled, ingest_time):
batch.append(entry)
if len(batch) >= batch_size:
_write_batch(conn, batch)
conn.commit()
count += len(batch)
batch.clear()
if batch:
_write_batch(conn, batch)
conn.commit()
count += len(batch)
stats[log_file.name] = count
logger.info("Ingested %d entries from %s", count, log_file.name)
conn.close()
logger.info("Building FTS index...")
build_fts_index(db_path)
logger.info("FTS index ready")
return stats
def ingest(
corpus_dir: Path,
db_path: Path,
pattern_file: Path | None = None,
batch_size: int = 1000,
) -> dict[str, int]:
"""Ingest all .jsonl and .log files from a corpus directory."""
files = sorted(corpus_dir.glob("*.jsonl")) + sorted(corpus_dir.glob("*.log"))
return _ingest_files(files, db_path, pattern_file, batch_size)
def ingest_file(
log_file: Path,
db_path: Path,
pattern_file: Path | None = None,
) -> dict[str, int]:
"""Ingest a single log file (any supported format)."""
return _ingest_files([log_file], db_path, pattern_file)