"""Glean pipeline: auto-detect format, parse, write to SQLite.""" from __future__ import annotations import json import logging import re import sqlite3 from pathlib import Path from typing import Iterator import yaml from app.glean import caddy, dmesg_log, docker_log, journald, plaintext, plex, qbittorrent, servarr, syslog, wazuh from app.glean.base import _compile, load_patterns, now_iso from app.glean.ssh import ( SSHTransport, SSHConnectionError, SSHCommandError, _build_docker_command, _build_journald_command, _build_plaintext_command, _build_syslog_command, ) from app.services.models import LogPattern, RetrievedEntry from app.services.search import build_fts_index logger = logging.getLogger(__name__) _SCHEMA = """ CREATE TABLE IF NOT EXISTS log_entries ( id TEXT PRIMARY KEY, source_id TEXT NOT NULL, sequence INTEGER NOT NULL, timestamp_raw TEXT, timestamp_iso TEXT, ingest_time TEXT NOT NULL, severity TEXT, repeat_count INTEGER DEFAULT 1, out_of_order INTEGER DEFAULT 0, matched_patterns TEXT DEFAULT '[]', text TEXT NOT NULL ); CREATE INDEX IF NOT EXISTS idx_source ON log_entries(source_id); CREATE INDEX IF NOT EXISTS idx_timestamp ON log_entries(timestamp_iso); CREATE INDEX IF NOT EXISTS idx_ts_repeat ON log_entries(timestamp_iso, repeat_count); CREATE INDEX IF NOT EXISTS idx_severity ON log_entries(severity); CREATE INDEX IF NOT EXISTS idx_patterns ON log_entries(matched_patterns); CREATE TABLE IF NOT EXISTS incidents ( id TEXT PRIMARY KEY, label TEXT NOT NULL, issue_type TEXT NOT NULL DEFAULT '', started_at TEXT, ended_at TEXT, notes TEXT NOT NULL DEFAULT '', created_at TEXT NOT NULL, severity TEXT NOT NULL DEFAULT 'medium' ); CREATE INDEX IF NOT EXISTS idx_incidents_time ON incidents(started_at, ended_at); CREATE TABLE IF NOT EXISTS received_bundles ( id TEXT PRIMARY KEY, source_host TEXT NOT NULL, issue_type TEXT NOT NULL DEFAULT '', label TEXT NOT NULL, severity TEXT NOT NULL DEFAULT 'medium', started_at TEXT, bundled_at TEXT NOT NULL, entry_count INTEGER NOT NULL DEFAULT 0, bundle_json TEXT NOT NULL ); CREATE INDEX IF NOT EXISTS idx_bundles_bundled ON received_bundles(bundled_at); CREATE INDEX IF NOT EXISTS idx_bundles_type ON received_bundles(issue_type); CREATE TABLE IF NOT EXISTS context_facts ( id TEXT PRIMARY KEY, category TEXT NOT NULL, key TEXT NOT NULL, value TEXT NOT NULL, source TEXT, created_at TEXT NOT NULL ); CREATE INDEX IF NOT EXISTS idx_facts_category ON context_facts(category); CREATE INDEX IF NOT EXISTS idx_facts_key ON context_facts(key); CREATE TABLE IF NOT EXISTS context_documents ( id TEXT PRIMARY KEY, filename TEXT NOT NULL, doc_type TEXT NOT NULL, full_text TEXT NOT NULL, file_size INTEGER, uploaded_at TEXT NOT NULL ); CREATE TABLE IF NOT EXISTS context_chunks ( id TEXT PRIMARY KEY, document_id TEXT NOT NULL REFERENCES context_documents(id) ON DELETE CASCADE, chunk_index INTEGER NOT NULL, text TEXT NOT NULL, embedding BLOB ); CREATE INDEX IF NOT EXISTS idx_chunks_doc ON context_chunks(document_id); CREATE TABLE IF NOT EXISTS blocklist_candidates ( id TEXT PRIMARY KEY, domain_or_ip TEXT NOT NULL, source_device_ip TEXT, source_device_name TEXT, first_seen TEXT NOT NULL, last_seen TEXT NOT NULL, hit_count INTEGER DEFAULT 1, status TEXT DEFAULT 'pending', pushed_at TEXT, log_evidence TEXT DEFAULT '[]', matched_rule TEXT, llm_score REAL, llm_reason TEXT ); CREATE INDEX IF NOT EXISTS idx_blocklist_device ON blocklist_candidates(source_device_ip); CREATE INDEX IF NOT EXISTS idx_blocklist_status ON blocklist_candidates(status); CREATE INDEX IF NOT EXISTS idx_blocklist_domain ON blocklist_candidates(domain_or_ip); """ def ensure_schema(db_path: Path) -> None: """Create all tables and apply additive migrations. Safe to call on every startup.""" conn = sqlite3.connect(str(db_path)) conn.execute("PRAGMA journal_mode=WAL") conn.executescript(_SCHEMA) # Additive column migrations — ALTER TABLE silently skips if column exists for stmt in [ "ALTER TABLE incidents ADD COLUMN issue_type TEXT NOT NULL DEFAULT ''", ]: try: conn.execute(stmt) except sqlite3.OperationalError: pass conn.commit() conn.close() def _detect_format(first_line: str) -> str: try: obj = json.loads(first_line) if "__REALTIME_TIMESTAMP" in obj: return "journald" if "SOURCE" in obj and str(obj.get("SOURCE", "")).startswith("docker:"): return "docker" if wazuh.is_wazuh_alert(obj): return "wazuh" if "ts" in obj and ("msg" in obj or "message" in obj or "request" in obj): return "caddy" except (json.JSONDecodeError, AttributeError): pass if plex.is_plex_log(first_line): return "plex" if qbittorrent.is_qbit_log(first_line): return "qbittorrent" if servarr.is_servarr_log(first_line): return "servarr" if dmesg_log.is_dmesg_log(first_line): return "dmesg" if syslog.is_syslog(first_line): return "syslog" return "plaintext" def _parse_file( path: Path, compiled: list[tuple[LogPattern, object]], ingest_time: str, source_id: str | None = None, ) -> Iterator[RetrievedEntry]: source_id = source_id or path.stem with path.open("r", errors="replace") as f: lines = iter(f) try: first = next(lines) except StopIteration: return fmt = _detect_format(first.strip()) logger.info("Detected format %r for %s", fmt, path.name) def all_lines(): yield first yield from lines if fmt == "journald": yield from journald.parse(all_lines(), source_id, compiled, ingest_time) elif fmt == "wazuh": yield from wazuh.parse(all_lines(), source_id, compiled, ingest_time) elif fmt == "docker": yield from docker_log.parse(all_lines(), source_id, compiled, ingest_time) elif fmt == "caddy": yield from caddy.parse(all_lines(), source_id, compiled, ingest_time) elif fmt == "plex": yield from plex.parse(all_lines(), source_id, compiled, ingest_time) elif fmt == "qbittorrent": yield from qbittorrent.parse(all_lines(), source_id, compiled, ingest_time) elif fmt == "servarr": yield from servarr.parse(all_lines(), source_id, compiled, ingest_time) elif fmt == "dmesg": yield from dmesg_log.parse(all_lines(), source_id, compiled, ingest_time) elif fmt == "syslog": yield from syslog.parse(all_lines(), source_id, compiled, ingest_time) else: yield from plaintext.parse(all_lines(), source_id, compiled, ingest_time) def _write_batch(conn: sqlite3.Connection, batch: list[RetrievedEntry]) -> None: conn.executemany( """ INSERT OR IGNORE INTO log_entries (id, source_id, sequence, timestamp_raw, timestamp_iso, ingest_time, severity, repeat_count, out_of_order, matched_patterns, text) VALUES (?,?,?,?,?,?,?,?,?,?,?) """, [ ( e.entry_id, e.source_id, e.sequence, e.timestamp_raw, e.timestamp_iso, e.ingest_time, e.severity, e.repeat_count, int(e.out_of_order), json.dumps(list(e.matched_patterns)), e.text, ) for e in batch ], ) def _glean_files( files: list[Path], db_path: Path, pattern_file: Path | None = None, batch_size: int = 1000, source_id_map: dict[Path, str] | None = None, ) -> dict[str, int]: pattern_file = pattern_file or Path("patterns/default.yaml") patterns = load_patterns(pattern_file) compiled = _compile(patterns) ingest_time = now_iso() source_id_map = source_id_map or {} conn = sqlite3.connect(str(db_path)) conn.execute("PRAGMA journal_mode=WAL") conn.executescript(_SCHEMA) conn.commit() stats: dict[str, int] = {} for log_file in files: source_id = source_id_map.get(log_file, log_file.stem) count = 0 batch: list[RetrievedEntry] = [] for entry in _parse_file(log_file, compiled, ingest_time, source_id=source_id): batch.append(entry) if len(batch) >= batch_size: _write_batch(conn, batch) conn.commit() count += len(batch) batch.clear() if batch: _write_batch(conn, batch) conn.commit() count += len(batch) stats[source_id] = stats.get(source_id, 0) + count logger.info("Gleaned %d entries from %s (source: %s)", count, log_file.name, source_id) conn.close() logger.info("Building FTS index...") build_fts_index(db_path) logger.info("FTS index ready") return stats def _stream_and_write( transport: SSHTransport, cmd: str, parser, source_id: str, compiled: list[tuple[LogPattern, object]], ingest_time: str, conn: sqlite3.Connection, batch_size: int, ) -> int: """Stream *cmd* output through *parser* and write entries to *conn*. Catches SSHCommandError per-item so one bad command doesn't abort the rest of the glean items for this host. Returns the number of entries written. """ count = 0 batch: list[RetrievedEntry] = [] try: for entry in parser(transport.exec_stream(cmd), source_id, compiled, ingest_time): batch.append(entry) if len(batch) >= batch_size: _write_batch(conn, batch) conn.commit() count += len(batch) batch.clear() if batch: _write_batch(conn, batch) conn.commit() count += len(batch) except SSHCommandError as exc: logger.warning("SSH command failed for source %r (cmd: %s): %s", source_id, cmd, exc) logger.info("Gleaned %d entries from SSH source %s", count, source_id) return count def _glean_ssh_source( src: dict, # type: ignore[type-arg] compiled: list[tuple[LogPattern, object]], ingest_time: str, conn: sqlite3.Connection, batch_size: int, ) -> dict[str, int]: """Open one SSHTransport connection for *src* and glean all its glean items. One SSH connection is shared across all items in the ``glean:`` list so the handshake overhead is paid only once per host per glean run. Returns a stats dict mapping ``{source_id: entry_count}`` for each item. Gracefully skips the entire source on SSHConnectionError. """ host_id = src.get("id", src.get("host", "unknown")) host = src["host"] user = src["user"] key_path = str(Path(src["key_path"]).expanduser()) port = int(src.get("port", 22)) glean_items: list[dict] = src.get("glean", []) # type: ignore[type-arg] stats: dict[str, int] = {} try: with SSHTransport(host=host, user=user, key_path=key_path, port=port) as t: for item in glean_items: item_type = item.get("type", "plaintext") # Per-item source_id — falls back to host_id/type for un-labelled items item_id = item.get("id") or f"{host_id}/{item_type}" if item_type == "journald": cmd = _build_journald_command(item) count = _stream_and_write( t, cmd, journald.parse, item_id, compiled, ingest_time, conn, batch_size ) stats[item_id] = stats.get(item_id, 0) + count elif item_type == "syslog": cmd = _build_syslog_command(item) count = _stream_and_write( t, cmd, syslog.parse, item_id, compiled, ingest_time, conn, batch_size ) stats[item_id] = stats.get(item_id, 0) + count elif item_type == "plaintext": cmd = _build_plaintext_command(item) count = _stream_and_write( t, cmd, plaintext.parse, item_id, compiled, ingest_time, conn, batch_size ) stats[item_id] = stats.get(item_id, 0) + count elif item_type == "docker": cmds = _build_docker_command(item) if isinstance(cmds, str): cmds = [cmds] containers: list[str] = item.get("containers", []) for i, cmd in enumerate(cmds): # Use the container name as the final path segment when available container_name = containers[i] if i < len(containers) else str(i) container_id = f"{item_id}/{container_name}" if len(cmds) > 1 else item_id count = _stream_and_write( t, cmd, docker_log.parse, container_id, compiled, ingest_time, conn, batch_size, ) stats[container_id] = stats.get(container_id, 0) + count else: logger.warning( "Unknown SSH glean type %r for source %r — skipping item", item_type, host_id, ) except SSHConnectionError as exc: logger.warning("SSH connection failed for source %r: %s", host_id, exc) return stats def glean_dir( corpus_dir: Path, db_path: Path, pattern_file: Path | None = None, batch_size: int = 1000, ) -> dict[str, int]: """Glean all .jsonl and .log files from a corpus directory.""" files = sorted(corpus_dir.glob("*.jsonl")) + sorted(corpus_dir.glob("*.log")) return _glean_files(files, db_path, pattern_file, batch_size) def glean_file( log_file: Path, db_path: Path, pattern_file: Path | None = None, ) -> dict[str, int]: """Glean a single log file (any supported format).""" return _glean_files([log_file], db_path, pattern_file) def glean_sources( sources_file: Path, db_path: Path, pattern_file: Path | None = None, batch_size: int = 1000, ) -> dict[str, int]: """Glean all sources listed in a sources.yaml config file. Supports two source types: Local file sources (default): sources: - id: sonarr path: /opt/sonarr/config/logs/sonarr.0.txt SSH remote sources (transport: ssh): sources: - id: rack01 transport: ssh host: 192.168.1.10 user: admin key_path: ~/.ssh/id_ed25519 glean: - type: journald args: ["--since", "2 hours ago"] - type: syslog path: /var/log/syslog - type: plaintext path: /var/log/app/error.log - type: docker containers: [myapp, nginx] Missing local paths and SSH connection failures are logged as warnings so the cron keeps running when a source is temporarily down. """ with open(sources_file) as f: config = yaml.safe_load(f) local_sources: list[dict] = [] # type: ignore[type-arg] ssh_sources: list[dict] = [] # type: ignore[type-arg] for src in config.get("sources", []): if src.get("transport") == "ssh": ssh_sources.append(src) else: local_sources.append(src) # ── Local file sources ───────────────────────────────────────────────── files: list[Path] = [] source_id_map: dict[Path, str] = {} for src in local_sources: path = Path(src["path"]) if not path.exists(): logger.warning("Source %r not found, skipping: %s", src.get("id", "?"), path) continue files.append(path) if "id" in src: source_id_map[path] = src["id"] if not files and not ssh_sources: logger.warning("No sources found — check sources.yaml paths") return {} stats: dict[str, int] = {} if files: stats.update(_glean_files(files, db_path, pattern_file, batch_size, source_id_map)) # ── SSH remote sources ───────────────────────────────────────────────── if not ssh_sources: return stats # Compile patterns once, share across all SSH sources in this run. effective_pattern_file = pattern_file or Path("patterns/default.yaml") compiled = _compile(load_patterns(effective_pattern_file)) ingest_time = now_iso() conn = sqlite3.connect(str(db_path)) conn.execute("PRAGMA journal_mode=WAL") conn.executescript(_SCHEMA) conn.commit() try: for src in ssh_sources: ssh_stats = _glean_ssh_source(src, compiled, ingest_time, conn, batch_size) for k, v in ssh_stats.items(): stats[k] = stats.get(k, 0) + v finally: conn.close() # Rebuild FTS only when SSH sources added entries (_glean_files already # rebuilds when local sources are present; safe to call again if both ran). if ssh_sources: logger.info("Rebuilding FTS index after SSH glean...") build_fts_index(db_path) return stats