"""Ingest pipeline: auto-detect format, parse, write to SQLite."""
from __future__ import annotations

import json
import logging
import re
import sqlite3
from contextlib import closing
from pathlib import Path
from typing import Iterator

import yaml

from app.ingest import (
    caddy,
    dmesg_log,
    docker_log,
    journald,
    plaintext,
    plex,
    qbittorrent,
    servarr,
    syslog,
)
from app.ingest.base import _compile, load_patterns, now_iso
from app.services.models import LogPattern, RetrievedEntry
from app.services.search import build_fts_index

logger = logging.getLogger(__name__)

# Full database schema. Every statement uses IF NOT EXISTS so the script can be
# executed repeatedly against an existing database.
_SCHEMA = """
CREATE TABLE IF NOT EXISTS log_entries (
    id TEXT PRIMARY KEY,
    source_id TEXT NOT NULL,
    sequence INTEGER NOT NULL,
    timestamp_raw TEXT,
    timestamp_iso TEXT,
    ingest_time TEXT NOT NULL,
    severity TEXT,
    repeat_count INTEGER DEFAULT 1,
    out_of_order INTEGER DEFAULT 0,
    matched_patterns TEXT DEFAULT '[]',
    text TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_source ON log_entries(source_id);
CREATE INDEX IF NOT EXISTS idx_timestamp ON log_entries(timestamp_iso);
CREATE INDEX IF NOT EXISTS idx_ts_repeat ON log_entries(timestamp_iso, repeat_count);
CREATE INDEX IF NOT EXISTS idx_severity ON log_entries(severity);
CREATE INDEX IF NOT EXISTS idx_patterns ON log_entries(matched_patterns);

CREATE TABLE IF NOT EXISTS incidents (
    id TEXT PRIMARY KEY,
    label TEXT NOT NULL,
    issue_type TEXT NOT NULL DEFAULT '',
    started_at TEXT,
    ended_at TEXT,
    notes TEXT NOT NULL DEFAULT '',
    created_at TEXT NOT NULL,
    severity TEXT NOT NULL DEFAULT 'medium'
);
CREATE INDEX IF NOT EXISTS idx_incidents_time ON incidents(started_at, ended_at);

CREATE TABLE IF NOT EXISTS received_bundles (
    id TEXT PRIMARY KEY,
    source_host TEXT NOT NULL,
    issue_type TEXT NOT NULL DEFAULT '',
    label TEXT NOT NULL,
    severity TEXT NOT NULL DEFAULT 'medium',
    started_at TEXT,
    bundled_at TEXT NOT NULL,
    entry_count INTEGER NOT NULL DEFAULT 0,
    bundle_json TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_bundles_bundled ON received_bundles(bundled_at);
CREATE INDEX IF NOT EXISTS idx_bundles_type ON received_bundles(issue_type);

CREATE TABLE IF NOT EXISTS context_facts (
    id TEXT PRIMARY KEY,
    category TEXT NOT NULL,
    key TEXT NOT NULL,
    value TEXT NOT NULL,
    source TEXT,
    created_at TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_facts_category ON context_facts(category);
CREATE INDEX IF NOT EXISTS idx_facts_key ON context_facts(key);

CREATE TABLE IF NOT EXISTS context_documents (
    id TEXT PRIMARY KEY,
    filename TEXT NOT NULL,
    doc_type TEXT NOT NULL,
    full_text TEXT NOT NULL,
    file_size INTEGER,
    uploaded_at TEXT NOT NULL
);

CREATE TABLE IF NOT EXISTS context_chunks (
    id TEXT PRIMARY KEY,
    document_id TEXT NOT NULL REFERENCES context_documents(id) ON DELETE CASCADE,
    chunk_index INTEGER NOT NULL,
    text TEXT NOT NULL,
    embedding BLOB
);
CREATE INDEX IF NOT EXISTS idx_chunks_doc ON context_chunks(document_id);
"""

# Additive column migrations applied after _SCHEMA. SQLite has no
# "ADD COLUMN IF NOT EXISTS", so re-running one raises OperationalError
# ("duplicate column name"), which ensure_schema() tolerates.
_MIGRATIONS = (
    "ALTER TABLE incidents ADD COLUMN issue_type TEXT NOT NULL DEFAULT ''",
)

# Format name (as returned by _detect_format) -> module whose parse() handles
# it. Formats not listed here fall back to the plaintext parser.
_PARSER_MODULES = {
    "journald": journald,
    "docker": docker_log,
    "caddy": caddy,
    "plex": plex,
    "qbittorrent": qbittorrent,
    "servarr": servarr,
    "dmesg": dmesg_log,
    "syslog": syslog,
}


def ensure_schema(db_path: Path) -> None:
    """Create all tables and apply additive migrations.

    Safe to call on every startup: the schema script only uses
    ``IF NOT EXISTS`` statements and already-applied column migrations are
    skipped. The connection is closed even if a statement fails.
    """
    with closing(sqlite3.connect(str(db_path))) as conn:
        conn.execute("PRAGMA journal_mode=WAL")
        conn.executescript(_SCHEMA)
        for stmt in _MIGRATIONS:
            try:
                conn.execute(stmt)
            except sqlite3.OperationalError as exc:
                # The expected failure is the column already existing; any
                # other failure is surfaced in the log but stays non-fatal so
                # startup behaviour is unchanged.
                if "duplicate column name" not in str(exc).lower():
                    logger.warning("Schema migration failed (%s): %s", stmt, exc)
        conn.commit()


def _detect_format(first_line: str) -> str:
    """Guess the log format from the first line of a file.

    JSON objects are checked for journald, docker and caddy marker keys; any
    other line is passed to the per-format sniffers in a fixed order.
    Returns one of the format names understood by ``_parse_file``, with
    ``"plaintext"`` as the fallback.
    """
    try:
        obj = json.loads(first_line)
    except json.JSONDecodeError:
        obj = None

    # Only JSON *objects* carry the marker keys. Scalars ("123", "null") used
    # to raise TypeError on the membership tests, and JSON strings or lists
    # could be mistaken for caddy, so they now fall through to the sniffers.
    if isinstance(obj, dict):
        if "__REALTIME_TIMESTAMP" in obj:
            return "journald"
        if str(obj.get("SOURCE", "")).startswith("docker:"):
            return "docker"
        if "ts" in obj and ("msg" in obj or "message" in obj or "request" in obj):
            return "caddy"

    if plex.is_plex_log(first_line):
        return "plex"
    if qbittorrent.is_qbit_log(first_line):
        return "qbittorrent"
    if servarr.is_servarr_log(first_line):
        return "servarr"
    if dmesg_log.is_dmesg_log(first_line):
        return "dmesg"
    if syslog.is_syslog(first_line):
        return "syslog"
    return "plaintext"


def _parse_file(
    path: Path,
    compiled: list[tuple[LogPattern, object]],
    ingest_time: str,
    source_id: str | None = None,
) -> Iterator[RetrievedEntry]:
    """Yield parsed entries from one log file.

    The format is detected from the first line (whitespace-stripped) and the
    whole file, first line included, is streamed through the matching
    parser. An empty file yields nothing. ``source_id`` defaults to the
    file's stem.
    """
    source_id = source_id or path.stem
    # NOTE(review): no explicit encoding, so the platform default is used;
    # undecodable bytes are replaced rather than raising.
    with path.open("r", errors="replace") as f:
        lines = iter(f)
        try:
            first = next(lines)
        except StopIteration:
            return

        fmt = _detect_format(first.strip())
        logger.info("Detected format %r for %s", fmt, path.name)

        def all_lines():
            # Re-attach the line consumed for detection ahead of the rest.
            yield first
            yield from lines

        parser = _PARSER_MODULES.get(fmt, plaintext)
        # Must stay inside the ``with`` block: lines are read lazily.
        yield from parser.parse(all_lines(), source_id, compiled, ingest_time)


def _write_batch(conn: sqlite3.Connection, batch: list[RetrievedEntry]) -> None:
    """Insert a batch of entries into ``log_entries``.

    Uses ``INSERT OR IGNORE``, so a row whose id already exists is skipped.
    ``matched_patterns`` is stored as a JSON list and ``out_of_order`` as an
    integer. The caller is responsible for committing.
    """
    conn.executemany(
        """
        INSERT OR IGNORE INTO log_entries
            (id, source_id, sequence, timestamp_raw, timestamp_iso, ingest_time,
             severity, repeat_count, out_of_order, matched_patterns, text)
        VALUES (?,?,?,?,?,?,?,?,?,?,?)
        """,
        [
            (
                e.entry_id,
                e.source_id,
                e.sequence,
                e.timestamp_raw,
                e.timestamp_iso,
                e.ingest_time,
                e.severity,
                e.repeat_count,
                int(e.out_of_order),
                json.dumps(list(e.matched_patterns)),
                e.text,
            )
            for e in batch
        ],
    )


def _ingest_one(
    conn: sqlite3.Connection,
    log_file: Path,
    source_id: str,
    compiled: list[tuple[LogPattern, object]],
    ingest_time: str,
    batch_size: int,
) -> int:
    """Parse one file and write it in committed batches; return entries written."""
    count = 0
    batch: list[RetrievedEntry] = []
    for entry in _parse_file(log_file, compiled, ingest_time, source_id=source_id):
        batch.append(entry)
        if len(batch) >= batch_size:
            _write_batch(conn, batch)
            conn.commit()
            count += len(batch)
            batch.clear()
    if batch:
        # Flush the final partial batch.
        _write_batch(conn, batch)
        conn.commit()
        count += len(batch)
    return count


def _ingest_files(
    files: list[Path],
    db_path: Path,
    pattern_file: Path | None = None,
    batch_size: int = 1000,
    source_id_map: dict[Path, str] | None = None,
) -> dict[str, int]:
    """Ingest the given files into the database and rebuild the FTS index.

    Args:
        files: Log files to parse, in order.
        db_path: SQLite database file; the schema is created if missing.
        pattern_file: Pattern definitions; defaults to ``patterns/default.yaml``.
        batch_size: Number of entries written per commit.
        source_id_map: Optional per-file source id; files not listed use
            their stem.

    Returns:
        Mapping of source id to the number of entries submitted for insert
        (rows ignored as duplicates by the database are still counted).
    """
    pattern_file = pattern_file or Path("patterns/default.yaml")
    patterns = load_patterns(pattern_file)
    compiled = _compile(patterns)
    ingest_time = now_iso()
    source_id_map = source_id_map or {}

    stats: dict[str, int] = {}
    # closing() guarantees the connection is released even when a parser or
    # a write raises part-way through.
    with closing(sqlite3.connect(str(db_path))) as conn:
        conn.execute("PRAGMA journal_mode=WAL")
        conn.executescript(_SCHEMA)
        conn.commit()

        for log_file in files:
            source_id = source_id_map.get(log_file, log_file.stem)
            count = _ingest_one(
                conn, log_file, source_id, compiled, ingest_time, batch_size
            )
            stats[source_id] = stats.get(source_id, 0) + count
            logger.info(
                "Ingested %d entries from %s (source: %s)",
                count,
                log_file.name,
                source_id,
            )

    logger.info("Building FTS index...")
    build_fts_index(db_path)
    logger.info("FTS index ready")
    return stats


def ingest(
    corpus_dir: Path,
    db_path: Path,
    pattern_file: Path | None = None,
    batch_size: int = 1000,
) -> dict[str, int]:
    """Ingest all .jsonl and .log files from a corpus directory."""
    files = sorted(corpus_dir.glob("*.jsonl")) + sorted(corpus_dir.glob("*.log"))
    return _ingest_files(files, db_path, pattern_file, batch_size)


def ingest_file(
    log_file: Path,
    db_path: Path,
    pattern_file: Path | None = None,
    batch_size: int = 1000,
) -> dict[str, int]:
    """Ingest a single log file (any supported format)."""
    return _ingest_files([log_file], db_path, pattern_file, batch_size)


def ingest_sources(
    sources_file: Path,
    db_path: Path,
    pattern_file: Path | None = None,
    batch_size: int = 1000,
) -> dict[str, int]:
    """Ingest all sources listed in a sources.yaml config file.

    sources.yaml format:

        sources:
          - id: sonarr
            path: /opt/sonarr/config/logs/sonarr.0.txt
          - id: qbittorrent
            path: /opt/qbittorrent/config/data/logs/qbittorrent.log

    Missing paths are skipped with a warning so the cron keeps running when a
    service is temporarily down. An empty config, a missing/null ``sources``
    key, or an entry without ``path`` is likewise skipped with a warning
    instead of raising.

    Returns the per-source entry counts, or ``{}`` when no listed file exists.
    """
    with open(sources_file, encoding="utf-8") as f:
        # safe_load returns None for an empty document.
        config = yaml.safe_load(f) or {}

    if not isinstance(config, dict):
        logger.warning("Unexpected top-level structure in %s; expected a mapping", sources_file)
        config = {}

    files: list[Path] = []
    source_id_map: dict[Path, str] = {}
    # ``sources:`` with no value loads as None, hence the ``or []``.
    for src in config.get("sources") or []:
        raw_path = src.get("path") if isinstance(src, dict) else None
        if not raw_path:
            logger.warning("Source entry has no path, skipping: %r", src)
            continue
        path = Path(raw_path)
        if not path.exists():
            logger.warning("Source %r not found, skipping: %s", src.get("id", "?"), path)
            continue
        files.append(path)
        if "id" in src:
            source_id_map[path] = src["id"]

    if not files:
        logger.warning("No source files found — check sources.yaml paths")
        return {}

    return _ingest_files(files, db_path, pattern_file, batch_size, source_id_map)