"""Ingest pipeline: auto-detect format, parse, write to SQLite.""" from __future__ import annotations import json import logging import re import sqlite3 from pathlib import Path from typing import Iterator from app.ingest import caddy, docker_log, journald, plaintext, plex, qbittorrent from app.ingest.base import _compile, load_patterns, now_iso from app.services.models import LogPattern, RetrievedEntry from app.services.search import build_fts_index logger = logging.getLogger(__name__) _SCHEMA = """ CREATE TABLE IF NOT EXISTS log_entries ( id TEXT PRIMARY KEY, source_id TEXT NOT NULL, sequence INTEGER NOT NULL, timestamp_raw TEXT, timestamp_iso TEXT, ingest_time TEXT NOT NULL, severity TEXT, repeat_count INTEGER DEFAULT 1, out_of_order INTEGER DEFAULT 0, matched_patterns TEXT DEFAULT '[]', text TEXT NOT NULL ); CREATE INDEX IF NOT EXISTS idx_source ON log_entries(source_id); CREATE INDEX IF NOT EXISTS idx_timestamp ON log_entries(timestamp_iso); CREATE INDEX IF NOT EXISTS idx_severity ON log_entries(severity); CREATE INDEX IF NOT EXISTS idx_patterns ON log_entries(matched_patterns); CREATE TABLE IF NOT EXISTS incidents ( id TEXT PRIMARY KEY, label TEXT NOT NULL, started_at TEXT, ended_at TEXT, notes TEXT NOT NULL DEFAULT '', created_at TEXT NOT NULL, severity TEXT NOT NULL DEFAULT 'medium' ); CREATE INDEX IF NOT EXISTS idx_incidents_time ON incidents(started_at, ended_at); """ def ensure_schema(db_path: Path) -> None: """Create all tables if they don't exist. Safe to call on every startup.""" conn = sqlite3.connect(str(db_path)) conn.execute("PRAGMA journal_mode=WAL") conn.executescript(_SCHEMA) conn.commit() conn.close() def _detect_format(first_line: str) -> str: try: obj = json.loads(first_line) if "__REALTIME_TIMESTAMP" in obj: return "journald" if "SOURCE" in obj and str(obj.get("SOURCE", "")).startswith("docker:"): return "docker" if "ts" in obj and ("msg" in obj or "message" in obj or "request" in obj): return "caddy" except (json.JSONDecodeError, AttributeError): pass if plex.is_plex_log(first_line): return "plex" if qbittorrent.is_qbit_log(first_line): return "qbittorrent" return "plaintext" def _parse_file( path: Path, compiled: list[tuple[LogPattern, object]], ingest_time: str, ) -> Iterator[RetrievedEntry]: source_id = path.stem with path.open("r", errors="replace") as f: lines = iter(f) try: first = next(lines) except StopIteration: return fmt = _detect_format(first.strip()) logger.info("Detected format %r for %s", fmt, path.name) def all_lines(): yield first yield from lines if fmt == "journald": yield from journald.parse(all_lines(), source_id, compiled, ingest_time) elif fmt == "docker": yield from docker_log.parse(all_lines(), source_id, compiled, ingest_time) elif fmt == "caddy": yield from caddy.parse(all_lines(), source_id, compiled, ingest_time) elif fmt == "plex": yield from plex.parse(all_lines(), source_id, compiled, ingest_time) elif fmt == "qbittorrent": yield from qbittorrent.parse(all_lines(), source_id, compiled, ingest_time) else: yield from plaintext.parse(all_lines(), source_id, compiled, ingest_time) def _write_batch(conn: sqlite3.Connection, batch: list[RetrievedEntry]) -> None: conn.executemany( """ INSERT OR IGNORE INTO log_entries (id, source_id, sequence, timestamp_raw, timestamp_iso, ingest_time, severity, repeat_count, out_of_order, matched_patterns, text) VALUES (?,?,?,?,?,?,?,?,?,?,?) """, [ ( e.entry_id, e.source_id, e.sequence, e.timestamp_raw, e.timestamp_iso, e.ingest_time, e.severity, e.repeat_count, int(e.out_of_order), json.dumps(list(e.matched_patterns)), e.text, ) for e in batch ], ) def _ingest_files( files: list[Path], db_path: Path, pattern_file: Path | None = None, batch_size: int = 1000, ) -> dict[str, int]: pattern_file = pattern_file or Path("patterns/default.yaml") patterns = load_patterns(pattern_file) compiled = _compile(patterns) ingest_time = now_iso() conn = sqlite3.connect(str(db_path)) conn.execute("PRAGMA journal_mode=WAL") conn.executescript(_SCHEMA) conn.commit() stats: dict[str, int] = {} for log_file in files: count = 0 batch: list[RetrievedEntry] = [] for entry in _parse_file(log_file, compiled, ingest_time): batch.append(entry) if len(batch) >= batch_size: _write_batch(conn, batch) conn.commit() count += len(batch) batch.clear() if batch: _write_batch(conn, batch) conn.commit() count += len(batch) stats[log_file.name] = count logger.info("Ingested %d entries from %s", count, log_file.name) conn.close() logger.info("Building FTS index...") build_fts_index(db_path) logger.info("FTS index ready") return stats def ingest( corpus_dir: Path, db_path: Path, pattern_file: Path | None = None, batch_size: int = 1000, ) -> dict[str, int]: """Ingest all .jsonl and .log files from a corpus directory.""" files = sorted(corpus_dir.glob("*.jsonl")) + sorted(corpus_dir.glob("*.log")) return _ingest_files(files, db_path, pattern_file, batch_size) def ingest_file( log_file: Path, db_path: Path, pattern_file: Path | None = None, ) -> dict[str, int]: """Ingest a single log file (any supported format).""" return _ingest_files([log_file], db_path, pattern_file)