"""Glean pipeline: auto-detect format, parse, write to SQLite or Postgres.""" from __future__ import annotations import json import logging import re import sqlite3 # still used in migrate_incidents_to_dedicated_db (SQLite-only migration) from pathlib import Path from typing import Any, Iterator from app.db import ( frag, get_conn, resolve_tenant_id, ) from app.db.schema import ( ensure_context_schema, ensure_incidents_schema, ensure_schema, migrate_incidents_to_dedicated_db, ) import yaml from app.glean import caddy, dmesg_log, docker_log, journald, plaintext, plex, qbittorrent, servarr, syslog, wazuh from app.glean.base import _compile, load_patterns, now_iso from app.glean.ssh import ( SSHTransport, SSHConnectionError, SSHCommandError, _build_docker_command, _build_journald_command, _build_plaintext_command, _build_syslog_command, ) from app.services.models import LogPattern, RetrievedEntry from app.services.search import build_fts_index logger = logging.getLogger(__name__) _SCHEMA = """ CREATE TABLE IF NOT EXISTS log_entries ( id TEXT PRIMARY KEY, source_id TEXT NOT NULL, sequence INTEGER NOT NULL, timestamp_raw TEXT, timestamp_iso TEXT, ingest_time TEXT NOT NULL, severity TEXT, repeat_count INTEGER DEFAULT 1, out_of_order INTEGER DEFAULT 0, matched_patterns TEXT DEFAULT '[]', text TEXT NOT NULL ); CREATE INDEX IF NOT EXISTS idx_source ON log_entries(source_id); CREATE INDEX IF NOT EXISTS idx_timestamp ON log_entries(timestamp_iso); CREATE INDEX IF NOT EXISTS idx_ts_repeat ON log_entries(timestamp_iso, repeat_count); CREATE INDEX IF NOT EXISTS idx_severity ON log_entries(severity); CREATE INDEX IF NOT EXISTS idx_patterns ON log_entries(matched_patterns); -- incidents tables moved to ensure_incidents_schema() / INCIDENTS_DB_PATH -- kept here as no-ops so legacy single-file deployments still work CREATE TABLE IF NOT EXISTS incidents ( id TEXT PRIMARY KEY, label TEXT NOT NULL, issue_type TEXT NOT NULL DEFAULT '', started_at TEXT, ended_at TEXT, notes TEXT NOT NULL DEFAULT '', created_at TEXT NOT NULL, severity TEXT NOT NULL DEFAULT 'medium' ); CREATE TABLE IF NOT EXISTS received_bundles ( id TEXT PRIMARY KEY, source_host TEXT NOT NULL, issue_type TEXT NOT NULL DEFAULT '', label TEXT NOT NULL, severity TEXT NOT NULL DEFAULT 'medium', started_at TEXT, bundled_at TEXT NOT NULL, entry_count INTEGER NOT NULL DEFAULT 0, bundle_json TEXT NOT NULL ); CREATE TABLE IF NOT EXISTS sent_bundles ( id TEXT PRIMARY KEY, incident_id TEXT NOT NULL, exported_at TEXT NOT NULL, sanitized INTEGER NOT NULL DEFAULT 0, entry_count INTEGER NOT NULL DEFAULT 0, bundle_json TEXT NOT NULL ); -- context tables moved to ensure_context_schema() / CONTEXT_DB_PATH -- kept here as no-ops so legacy single-file deployments still work CREATE TABLE IF NOT EXISTS context_facts ( id TEXT PRIMARY KEY, category TEXT NOT NULL, key TEXT NOT NULL, value TEXT NOT NULL, source TEXT, created_at TEXT NOT NULL ); CREATE INDEX IF NOT EXISTS idx_facts_category ON context_facts(category); CREATE INDEX IF NOT EXISTS idx_facts_key ON context_facts(key); CREATE TABLE IF NOT EXISTS context_documents ( id TEXT PRIMARY KEY, filename TEXT NOT NULL, doc_type TEXT NOT NULL, full_text TEXT NOT NULL, file_size INTEGER, uploaded_at TEXT NOT NULL ); CREATE TABLE IF NOT EXISTS context_chunks ( id TEXT PRIMARY KEY, document_id TEXT NOT NULL REFERENCES context_documents(id) ON DELETE CASCADE, chunk_index INTEGER NOT NULL, text TEXT NOT NULL, embedding BLOB ); CREATE INDEX IF NOT EXISTS idx_chunks_doc ON context_chunks(document_id); CREATE TABLE IF NOT EXISTS blocklist_candidates ( id TEXT PRIMARY KEY, domain_or_ip TEXT NOT NULL, source_device_ip TEXT, source_device_name TEXT, first_seen TEXT NOT NULL, last_seen TEXT NOT NULL, hit_count INTEGER DEFAULT 1, status TEXT DEFAULT 'pending', pushed_at TEXT, log_evidence TEXT DEFAULT '[]', matched_rule TEXT, llm_score REAL, llm_reason TEXT ); CREATE INDEX IF NOT EXISTS idx_blocklist_device ON blocklist_candidates(source_device_ip); CREATE INDEX IF NOT EXISTS idx_blocklist_status ON blocklist_candidates(status); CREATE INDEX IF NOT EXISTS idx_blocklist_domain ON blocklist_candidates(domain_or_ip); CREATE TABLE IF NOT EXISTS glean_fingerprints ( path TEXT PRIMARY KEY, mtime REAL NOT NULL, size INTEGER NOT NULL, gleaned_at TEXT NOT NULL ); """ _CONTEXT_SCHEMA = """ CREATE TABLE IF NOT EXISTS context_facts ( id TEXT PRIMARY KEY, category TEXT NOT NULL, key TEXT NOT NULL, value TEXT NOT NULL, source TEXT, created_at TEXT NOT NULL ); CREATE INDEX IF NOT EXISTS idx_facts_category ON context_facts(category); CREATE INDEX IF NOT EXISTS idx_facts_key ON context_facts(key); CREATE TABLE IF NOT EXISTS context_documents ( id TEXT PRIMARY KEY, filename TEXT NOT NULL, doc_type TEXT NOT NULL, full_text TEXT NOT NULL, file_size INTEGER, uploaded_at TEXT NOT NULL ); CREATE TABLE IF NOT EXISTS context_chunks ( id TEXT PRIMARY KEY, document_id TEXT NOT NULL REFERENCES context_documents(id) ON DELETE CASCADE, chunk_index INTEGER NOT NULL, text TEXT NOT NULL, embedding BLOB ); CREATE INDEX IF NOT EXISTS idx_chunks_doc ON context_chunks(document_id); """ # ensure_schema / ensure_context_schema / ensure_incidents_schema / migrate_incidents_to_dedicated_db # are now implemented in app/db/schema.py and re-exported via app/db/__init__.py. # The imports at the top of this file bring them in; these names are kept as module-level # symbols so existing callers (rest.py, tests) still find them here without changes. # _INCIDENTS_SCHEMA and its ensure_/migrate_ functions moved to app/db/schema.py def _fingerprint(path: Path) -> tuple[float, int]: """Return (mtime, size) for a file — cheap identity check, no content read needed.""" st = path.stat() return st.st_mtime, st.st_size def _fp_unchanged(conn: Any, path: Path, mtime: float, size: int) -> bool: """Return True only when the stored fingerprint exactly matches (mtime, size).""" tid = resolve_tenant_id() row = conn.execute( "SELECT mtime, size FROM glean_fingerprints WHERE path = ? AND (tenant_id = ? OR tenant_id = '')", (str(path), tid), ).fetchone() if row is None: return False return row["mtime"] == mtime and row["size"] == size def _save_fingerprint( conn: Any, path: Path, mtime: float, size: int, gleaned_at: str, ) -> None: """Upsert the fingerprint for *path* after a successful glean.""" tid = resolve_tenant_id() conn.execute(frag.fingerprint_upsert(), (tid, str(path), mtime, size, gleaned_at)) def _detect_format(first_line: str) -> str: try: obj = json.loads(first_line) if "__REALTIME_TIMESTAMP" in obj: return "journald" if "SOURCE" in obj and str(obj.get("SOURCE", "")).startswith("docker:"): return "docker" if wazuh.is_wazuh_alert(obj): return "wazuh" if "ts" in obj and ("msg" in obj or "message" in obj or "request" in obj): return "caddy" except (json.JSONDecodeError, AttributeError): pass if plex.is_plex_log(first_line): return "plex" if qbittorrent.is_qbit_log(first_line): return "qbittorrent" if servarr.is_servarr_log(first_line): return "servarr" if dmesg_log.is_dmesg_log(first_line): return "dmesg" if syslog.is_syslog(first_line): return "syslog" return "plaintext" def _parse_file( path: Path, compiled: list[tuple[LogPattern, object]], ingest_time: str, source_id: str | None = None, ) -> Iterator[RetrievedEntry]: source_id = source_id or path.stem with path.open("r", errors="replace") as f: lines = iter(f) try: first = next(lines) except StopIteration: return fmt = _detect_format(first.strip()) logger.info("Detected format %r for %s", fmt, path.name) def all_lines(): yield first yield from lines if fmt == "journald": yield from journald.parse(all_lines(), source_id, compiled, ingest_time) elif fmt == "wazuh": yield from wazuh.parse(all_lines(), source_id, compiled, ingest_time) elif fmt == "docker": yield from docker_log.parse(all_lines(), source_id, compiled, ingest_time) elif fmt == "caddy": yield from caddy.parse(all_lines(), source_id, compiled, ingest_time) elif fmt == "plex": yield from plex.parse(all_lines(), source_id, compiled, ingest_time) elif fmt == "qbittorrent": yield from qbittorrent.parse(all_lines(), source_id, compiled, ingest_time) elif fmt == "servarr": yield from servarr.parse(all_lines(), source_id, compiled, ingest_time) elif fmt == "dmesg": yield from dmesg_log.parse(all_lines(), source_id, compiled, ingest_time) elif fmt == "syslog": yield from syslog.parse(all_lines(), source_id, compiled, ingest_time) else: yield from plaintext.parse(all_lines(), source_id, compiled, ingest_time) def _write_batch(conn: Any, batch: list[RetrievedEntry]) -> None: tid = resolve_tenant_id() conflict = frag.entries_conflict_clause() sql = f""" {frag.insert_ignore_entries()} (tenant_id, id, source_id, sequence, timestamp_raw, timestamp_iso, ingest_time, severity, repeat_count, out_of_order, matched_patterns, text) VALUES (?,?,?,?,?,?,?,?,?,?,?,?) {conflict} """ conn.executemany( sql, [ ( tid, e.entry_id, e.source_id, e.sequence, e.timestamp_raw, e.timestamp_iso, e.ingest_time, e.severity, e.repeat_count, int(e.out_of_order), json.dumps(list(e.matched_patterns)), e.text, ) for e in batch ], ) def _glean_files( files: list[Path], db_path: Path, pattern_file: Path | None = None, batch_size: int = 1000, source_id_map: dict[Path, str] | None = None, force: bool = False, ) -> dict[str, int]: pattern_file = pattern_file or Path("patterns/default.yaml") patterns = load_patterns(pattern_file) compiled = _compile(patterns) ingest_time = now_iso() source_id_map = source_id_map or {} ensure_schema(db_path) with get_conn(db_path) as conn: stats: dict[str, int] = {} skipped: list[str] = [] for log_file in files: source_id = source_id_map.get(log_file, log_file.stem) mtime, size = _fingerprint(log_file) if not force and _fp_unchanged(conn, log_file, mtime, size): logger.debug("Skipping unchanged file: %s", log_file.name) skipped.append(log_file.name) stats[source_id] = stats.get(source_id, 0) continue count = 0 batch: list[RetrievedEntry] = [] for entry in _parse_file(log_file, compiled, ingest_time, source_id=source_id): batch.append(entry) if len(batch) >= batch_size: _write_batch(conn, batch) conn.commit() count += len(batch) batch.clear() if batch: _write_batch(conn, batch) conn.commit() count += len(batch) _save_fingerprint(conn, log_file, mtime, size, ingest_time) conn.commit() stats[source_id] = stats.get(source_id, 0) + count logger.info("Gleaned %d entries from %s (source: %s)", count, log_file.name, source_id) if skipped: logger.info("Skipped %d unchanged file(s): %s", len(skipped), ", ".join(skipped)) logger.info("Building FTS index...") build_fts_index(db_path) logger.info("FTS index ready") return stats def _stream_and_write( transport: SSHTransport, cmd: str, parser, source_id: str, compiled: list[tuple[LogPattern, object]], ingest_time: str, conn: Any, batch_size: int, ) -> int: """Stream *cmd* output through *parser* and write entries to *conn*. Catches SSHCommandError per-item so one bad command doesn't abort the rest of the glean items for this host. Returns the number of entries written. """ count = 0 batch: list[RetrievedEntry] = [] try: for entry in parser(transport.exec_stream(cmd), source_id, compiled, ingest_time): batch.append(entry) if len(batch) >= batch_size: _write_batch(conn, batch) conn.commit() count += len(batch) batch.clear() if batch: _write_batch(conn, batch) conn.commit() count += len(batch) except SSHCommandError as exc: logger.warning("SSH command failed for source %r (cmd: %s): %s", source_id, cmd, exc) logger.info("Gleaned %d entries from SSH source %s", count, source_id) return count def _glean_ssh_source( src: dict, # type: ignore[type-arg] compiled: list[tuple[LogPattern, object]], ingest_time: str, conn: Any, batch_size: int, ) -> dict[str, int]: """Open one SSHTransport connection for *src* and glean all its glean items. One SSH connection is shared across all items in the ``glean:`` list so the handshake overhead is paid only once per host per glean run. Returns a stats dict mapping ``{source_id: entry_count}`` for each item. Gracefully skips the entire source on SSHConnectionError. """ host_id = src.get("id", src.get("host", "unknown")) host = src["host"] user = src["user"] key_path = str(Path(src["key_path"]).expanduser()) port = int(src.get("port", 22)) glean_items: list[dict] = src.get("glean", []) # type: ignore[type-arg] stats: dict[str, int] = {} try: with SSHTransport(host=host, user=user, key_path=key_path, port=port) as t: for item in glean_items: item_type = item.get("type", "plaintext") # Per-item source_id — falls back to host_id/type for un-labelled items item_id = item.get("id") or f"{host_id}/{item_type}" if item_type == "journald": cmd = _build_journald_command(item) count = _stream_and_write( t, cmd, journald.parse, item_id, compiled, ingest_time, conn, batch_size ) stats[item_id] = stats.get(item_id, 0) + count elif item_type == "syslog": cmd = _build_syslog_command(item) count = _stream_and_write( t, cmd, syslog.parse, item_id, compiled, ingest_time, conn, batch_size ) stats[item_id] = stats.get(item_id, 0) + count elif item_type == "plaintext": cmd = _build_plaintext_command(item) count = _stream_and_write( t, cmd, plaintext.parse, item_id, compiled, ingest_time, conn, batch_size ) stats[item_id] = stats.get(item_id, 0) + count elif item_type == "docker": cmds = _build_docker_command(item) if isinstance(cmds, str): cmds = [cmds] containers: list[str] = item.get("containers", []) for i, cmd in enumerate(cmds): # Use the container name as the final path segment when available container_name = containers[i] if i < len(containers) else str(i) container_id = f"{item_id}/{container_name}" if len(cmds) > 1 else item_id count = _stream_and_write( t, cmd, docker_log.parse, container_id, compiled, ingest_time, conn, batch_size, ) stats[container_id] = stats.get(container_id, 0) + count else: logger.warning( "Unknown SSH glean type %r for source %r — skipping item", item_type, host_id, ) except SSHConnectionError as exc: logger.warning("SSH connection failed for source %r: %s", host_id, exc) return stats def glean_ssh_source( src: dict, # type: ignore[type-arg] db_path: Path, pattern_file: Path | None = None, batch_size: int = 1000, ) -> dict[str, int]: """Glean a single SSH source dict and write results to *db_path*. Public wrapper around :func:`_glean_ssh_source` for the REST layer. Manages the DB connection, pattern compilation, and FTS rebuild so callers don't have to deal with those lifecycle concerns. Returns stats mapping ``{sub_source_id: entry_count}``. """ effective_pattern_file = pattern_file or Path("patterns/default.yaml") compiled = _compile(load_patterns(effective_pattern_file)) ingest_time = now_iso() ensure_schema(db_path) with get_conn(db_path) as conn: stats = _glean_ssh_source(src, compiled, ingest_time, conn, batch_size) logger.info("Rebuilding FTS index after SSH source glean...") build_fts_index(db_path) return stats def glean_dir( corpus_dir: Path, db_path: Path, pattern_file: Path | None = None, batch_size: int = 1000, force: bool = False, ) -> dict[str, int]: """Glean all .jsonl and .log files from a corpus directory. Pass ``force=True`` to bypass fingerprint checks and re-glean all files regardless of whether they have changed since the last run. """ files = sorted(corpus_dir.rglob("*.jsonl")) + sorted(corpus_dir.rglob("*.log")) return _glean_files(files, db_path, pattern_file, batch_size, force=force) def glean_file( log_file: Path, db_path: Path, pattern_file: Path | None = None, force: bool = False, ) -> dict[str, int]: """Glean a single log file (any supported format). Pass ``force=True`` to re-glean even when the file fingerprint is unchanged. """ return _glean_files([log_file], db_path, pattern_file, force=force) def glean_sources( sources_file: Path, db_path: Path, pattern_file: Path | None = None, batch_size: int = 1000, force: bool = False, ) -> dict[str, int]: """Glean all sources listed in a sources.yaml config file. Supports two source types: Local file sources (default): sources: - id: sonarr path: /opt/sonarr/config/logs/sonarr.0.txt SSH remote sources (transport: ssh): sources: - id: rack01 transport: ssh host: 192.168.1.10 user: admin key_path: ~/.ssh/id_ed25519 glean: - type: journald args: ["--since", "2 hours ago"] - type: syslog path: /var/log/syslog - type: plaintext path: /var/log/app/error.log - type: docker containers: [myapp, nginx] Missing local paths and SSH connection failures are logged as warnings so the cron keeps running when a source is temporarily down. """ with open(sources_file) as f: config = yaml.safe_load(f) local_sources: list[dict] = [] # type: ignore[type-arg] ssh_sources: list[dict] = [] # type: ignore[type-arg] for src in config.get("sources", []): if src.get("transport") == "ssh": ssh_sources.append(src) else: local_sources.append(src) # ── Local file sources ───────────────────────────────────────────────── files: list[Path] = [] source_id_map: dict[Path, str] = {} for src in local_sources: path = Path(src["path"]) if not path.exists(): logger.warning("Source %r not found, skipping: %s", src.get("id", "?"), path) continue files.append(path) if "id" in src: source_id_map[path] = src["id"] if not files and not ssh_sources: logger.warning("No sources found — check sources.yaml paths") return {} stats: dict[str, int] = {} if files: stats.update(_glean_files(files, db_path, pattern_file, batch_size, source_id_map, force=force)) # ── SSH remote sources ───────────────────────────────────────────────── if not ssh_sources: return stats # Compile patterns once, share across all SSH sources in this run. effective_pattern_file = pattern_file or Path("patterns/default.yaml") compiled = _compile(load_patterns(effective_pattern_file)) ingest_time = now_iso() ensure_schema(db_path) with get_conn(db_path) as conn: for src in ssh_sources: ssh_stats = _glean_ssh_source(src, compiled, ingest_time, conn, batch_size) for k, v in ssh_stats.items(): stats[k] = stats.get(k, 0) + v conn.commit() # Rebuild FTS only when SSH sources added entries (_glean_files already # rebuilds when local sources are present; safe to call again if both ran). if ssh_sources: logger.info("Rebuilding FTS index after SSH glean...") build_fts_index(db_path) return stats