"""FTS-based log search with optional hybrid BM25 + vector re-ranking. SQLite backend: FTS5 virtual table with Porter stemmer. Postgres backend: tsvector column with GIN index + websearch_to_tsquery. """ from __future__ import annotations import json import logging import re import sqlite3 from dataclasses import dataclass from datetime import datetime, timedelta, timezone from pathlib import Path from app.db import BACKEND, Backend, frag, get_conn, resolve_tenant_id logger = logging.getLogger(__name__) @dataclass(frozen=True) class SearchResult: entry_id: str source_id: str sequence: int timestamp_iso: str | None severity: str | None repeat_count: int out_of_order: bool matched_patterns: list[str] text: str rank: float def build_fts_index(db_path: Path) -> None: """Build (or rebuild) the FTS5 index from log_entries. Safe to re-run. For Postgres, the tsvector column is maintained by a trigger — this is a no-op. """ if BACKEND == Backend.POSTGRES: return raw = sqlite3.connect(str(db_path), timeout=30.0) raw.execute("PRAGMA journal_mode=WAL") needs_rebuild = False try: raw.execute("SELECT sequence FROM log_fts LIMIT 0") except sqlite3.OperationalError: needs_rebuild = True if needs_rebuild: raw.execute("DROP TABLE IF EXISTS log_fts") raw.executescript(""" CREATE VIRTUAL TABLE IF NOT EXISTS log_fts USING fts5( text, entry_id UNINDEXED, source_id UNINDEXED, sequence UNINDEXED, severity UNINDEXED, timestamp_iso UNINDEXED, matched_patterns UNINDEXED, repeat_count UNINDEXED, out_of_order UNINDEXED, tokenize = 'porter ascii' ); """) raw.execute(""" INSERT INTO log_fts(text, entry_id, source_id, sequence, severity, timestamp_iso, matched_patterns, repeat_count, out_of_order) SELECT e.text, e.id, e.source_id, e.sequence, e.severity, e.timestamp_iso, e.matched_patterns, e.repeat_count, e.out_of_order FROM log_entries e WHERE e.id NOT IN (SELECT entry_id FROM log_fts WHERE entry_id IS NOT NULL) """) raw.commit() raw.close() def _sanitize_fts_query(raw: str, or_mode: bool = False) -> str: """Strip FTS5 operator characters and return a safe MATCH expression. FTS5 reserves: " * + - ( ) ^ ~ : ? or_mode=True joins tokens with OR (any-of) instead of implicit AND (all-of). """ cleaned = re.sub(r"[^a-zA-Z0-9 _]", " ", raw) tokens = cleaned.split() if not tokens: return '""' return (" OR " if or_mode else " ").join(tokens) def search( db_path: Path, query: str, severity: str | None = None, source_filter: str | None = None, pattern_filter: str | None = None, since: str | None = None, until: str | None = None, limit: int = 20, include_repeats: bool = False, or_mode: bool = False, semantic: bool = False, ) -> list[SearchResult]: """Full-text search with optional filters. Returns results ranked by relevance. When ``semantic=True`` and an embedding backend is configured, the BM25 candidate pool is re-ranked using hybrid scoring (BM25 + cosine similarity). Falls back silently to pure BM25 when the embedder is unavailable. """ if semantic: return _hybrid_search( db_path, query, severity=severity, source_filter=source_filter, pattern_filter=pattern_filter, since=since, until=until, limit=limit, include_repeats=include_repeats, or_mode=or_mode, ) return _bm25_search( db_path, query, severity=severity, source_filter=source_filter, pattern_filter=pattern_filter, since=since, until=until, limit=limit, include_repeats=include_repeats, or_mode=or_mode, ) def _hybrid_search( db_path: Path, query: str, severity: str | None = None, source_filter: str | None = None, pattern_filter: str | None = None, since: str | None = None, until: str | None = None, limit: int = 20, include_repeats: bool = False, or_mode: bool = False, alpha: float = 0.6, beta: float = 0.4, ) -> list[SearchResult]: """BM25 + vector re-ranking (late-fusion hybrid search). Fetches an oversized BM25 candidate pool, embeds the query and each candidate text in-process, then combines scores: hybrid_score = alpha * bm25_normalized + beta * cosine_sim BM25 normalization: FTS5 rank is negative (more negative = better match). We flip the sign and divide by the pool maximum so all BM25 scores land in (0, 1] — 1.0 for the top BM25 hit, approaching 0 for the weakest. Falls back to pure BM25 when the embedding backend is unavailable. """ from app.services.embeddings import EMBEDDING_AVAILABLE, cosine_similarity, get_embedder # Fetch a large candidate pool — 5x limit, minimum 100 entries. pool_limit = max(limit * 5, 100) candidates = _bm25_search( db_path, query, severity=severity, source_filter=source_filter, pattern_filter=pattern_filter, since=since, until=until, limit=pool_limit, include_repeats=include_repeats, or_mode=or_mode, ) if not candidates: return [] if not EMBEDDING_AVAILABLE: return candidates[:limit] embedder = get_embedder() if embedder is None: return candidates[:limit] try: query_vec = embedder.embed(query) candidate_vecs = embedder.embed_batch([r.text for r in candidates]) except Exception as exc: logger.warning("Hybrid search embedding failed (%s) — falling back to BM25", exc) return candidates[:limit] # Normalize BM25 ranks: FTS5 rank is negative, flip and scale to [0, 1]. abs_ranks = [abs(r.rank) for r in candidates] max_rank = max(abs_ranks) or 1.0 scored: list[tuple[float, SearchResult]] = [] for result, abs_rank, cand_vec in zip(candidates, abs_ranks, candidate_vecs): bm25_norm = abs_rank / max_rank cos_sim = cosine_similarity(query_vec, cand_vec) hybrid = alpha * bm25_norm + beta * max(cos_sim, 0.0) scored.append((hybrid, result)) scored.sort(key=lambda x: x[0], reverse=True) return [r for _, r in scored[:limit]] def _bm25_search( db_path: Path, query: str, severity: str | None = None, source_filter: str | None = None, pattern_filter: str | None = None, since: str | None = None, until: str | None = None, limit: int = 20, include_repeats: bool = False, or_mode: bool = False, ) -> list[SearchResult]: """FTS search — BM25 via FTS5 (SQLite) or tsvector (Postgres).""" tid = resolve_tenant_id() if BACKEND == Backend.POSTGRES: return _pg_fts_search( db_path, query, tid, severity=severity, source_filter=source_filter, pattern_filter=pattern_filter, since=since, until=until, limit=limit, include_repeats=include_repeats, ) return _sqlite_fts_search( db_path, query, tid, severity=severity, source_filter=source_filter, pattern_filter=pattern_filter, since=since, until=until, limit=limit, include_repeats=include_repeats, or_mode=or_mode, ) def _sqlite_fts_search( db_path: Path, query: str, tid: str, severity: str | None, source_filter: str | None, pattern_filter: str | None, since: str | None, until: str | None, limit: int, include_repeats: bool, or_mode: bool, ) -> list[SearchResult]: fts_query = _sanitize_fts_query(query, or_mode=or_mode) conditions = [ "log_fts MATCH ?", "(e.tenant_id = ? OR e.tenant_id = '')", ] params: list = [fts_query, tid] if severity: conditions.append("severity = ?") params.append(severity.upper()) if source_filter: conditions.append("source_id LIKE ?") params.append(f"%{source_filter}%") if pattern_filter: conditions.append("matched_patterns LIKE ?") params.append(f'%"{pattern_filter}"%') if since: conditions.append("timestamp_iso >= ?") params.append(since) if until: conditions.append("timestamp_iso <= ?") params.append(until) if not include_repeats: conditions.append("f.repeat_count = 1") where = " AND ".join(conditions) params.append(limit) raw = sqlite3.connect(str(db_path), timeout=30.0) raw.row_factory = sqlite3.Row try: rows = raw.execute( f""" SELECT f.entry_id, f.source_id, f.sequence, f.timestamp_iso, f.severity, f.repeat_count, f.out_of_order, f.matched_patterns, f.text, f.rank FROM log_fts f JOIN log_entries e ON e.id = f.entry_id WHERE {where} ORDER BY f.rank LIMIT ? """, params, ).fetchall() except sqlite3.OperationalError as exc: logger.warning("FTS query failed (%s) — index may not be built yet", exc) return [] finally: raw.close() return [ SearchResult( entry_id=r["entry_id"], source_id=r["source_id"], sequence=r["sequence"], timestamp_iso=r["timestamp_iso"], severity=r["severity"], repeat_count=r["repeat_count"], out_of_order=bool(r["out_of_order"]), matched_patterns=json.loads(r["matched_patterns"] or "[]"), text=r["text"], rank=float(r["rank"]), ) for r in rows ] def _pg_fts_search( db_path: Path, query: str, tid: str, severity: str | None, source_filter: str | None, pattern_filter: str | None, since: str | None, until: str | None, limit: int, include_repeats: bool, ) -> list[SearchResult]: """Postgres FTS via tsvector column and websearch_to_tsquery.""" tsq = "websearch_to_tsquery('english', %s)" conditions = [ f"text_tsv @@ {tsq}", "(tenant_id = %s OR tenant_id = '')", ] params: list = [query, tid] if severity: conditions.append("severity = %s") params.append(severity.upper()) if source_filter: conditions.append("source_id LIKE %s") params.append(f"%{source_filter}%") if pattern_filter: conditions.append("matched_patterns LIKE %s") params.append(f'%"{pattern_filter}"%') if since: conditions.append("timestamp_iso >= %s") params.append(since) if until: conditions.append("timestamp_iso <= %s") params.append(until) if not include_repeats: conditions.append("repeat_count = 1") where = " AND ".join(conditions) # ts_rank needs the tsquery again — append it then the limit params.extend([query, limit]) with get_conn(db_path) as conn: rows = conn.execute( f""" SELECT id AS entry_id, source_id, sequence, timestamp_iso, severity, repeat_count, out_of_order, matched_patterns, text, ts_rank(text_tsv, {tsq}) AS rank FROM log_entries WHERE {where} ORDER BY rank DESC LIMIT %s """, params, ).fetchall() return [ SearchResult( entry_id=r["entry_id"], source_id=r["source_id"], sequence=r["sequence"], timestamp_iso=r["timestamp_iso"], severity=r["severity"], repeat_count=r["repeat_count"], out_of_order=bool(r["out_of_order"]), matched_patterns=json.loads(r["matched_patterns"] or "[]"), text=r["text"], rank=float(r["rank"]), ) for r in rows ] def entries_in_window( db_path: Path, since: str | None, until: str | None, severity: str | None = None, source_filter: str | None = None, limit: int = 100, per_source_cap: int | None = None, ) -> list[SearchResult]: """Return log entries within a time window using a plain SQL scan (no FTS). Used as a fallback when keyword search returns nothing — ensures incident detail always shows the raw log activity in the window even if no keywords match. per_source_cap: when set, limits rows per source_id so high-volume sources (e.g. network-syslog) don't crowd out lower-volume but more interesting ones. Errors/warnings are ranked first within each source partition. """ tid = resolve_tenant_id() conditions: list[str] = [ "repeat_count = 1", "(tenant_id = ? OR tenant_id = '')", ] params: list = [tid] if since: conditions.append("timestamp_iso >= ?") params.append(since) if until: conditions.append("timestamp_iso <= ?") params.append(until) if severity: conditions.append("severity = ?") params.append(severity.upper()) if source_filter: conditions.append("source_id LIKE ?") params.append(f"%{source_filter}%") where = " AND ".join(conditions) if per_source_cap is not None: sql = f""" WITH ranked AS ( SELECT id as entry_id, source_id, sequence, timestamp_iso, severity, repeat_count, out_of_order, matched_patterns, text, 0.0 as rank, ROW_NUMBER() OVER ( PARTITION BY source_id ORDER BY CASE UPPER(severity) WHEN 'CRITICAL' THEN 0 WHEN 'ERROR' THEN 1 WHEN 'WARN' THEN 2 ELSE 3 END, timestamp_iso ) AS rn FROM log_entries WHERE {where} ) SELECT entry_id, source_id, sequence, timestamp_iso, severity, repeat_count, out_of_order, matched_patterns, text, rank FROM ranked WHERE rn <= ? ORDER BY timestamp_iso ASC LIMIT ? """ params.extend([per_source_cap, limit]) else: sql = f""" SELECT id as entry_id, source_id, sequence, timestamp_iso, severity, repeat_count, out_of_order, matched_patterns, text, 0.0 as rank FROM log_entries WHERE {where} ORDER BY timestamp_iso ASC LIMIT ? """ params.append(limit) with get_conn(db_path) as conn: rows = conn.execute(sql, params).fetchall() return [ SearchResult( entry_id=r["entry_id"], source_id=r["source_id"], sequence=r["sequence"], timestamp_iso=r["timestamp_iso"], severity=r["severity"], repeat_count=r["repeat_count"], out_of_order=bool(r["out_of_order"]), matched_patterns=json.loads(r["matched_patterns"] or "[]"), text=r["text"], rank=float(r["rank"]), ) for r in rows ] def recent_source_errors( db_path: Path, source_filter: str, severity: str = "ERROR", limit: int = 10, since: str | None = None, until: str | None = None, ) -> list[SearchResult]: """Plain-SQL scan: most recent error entries from a named source. Bypasses FTS ranking so text content doesn't affect which errors surface. Used by diagnose when FTS keyword search returns nothing for a known source. """ tid = resolve_tenant_id() conditions = [ "source_id LIKE ?", "severity = ?", "repeat_count = 1", "(tenant_id = ? OR tenant_id = '')", ] params: list = [f"%{source_filter}%", severity.upper(), tid] if since: conditions.append("timestamp_iso >= ?") params.append(since) if until: conditions.append("timestamp_iso <= ?") params.append(until) params.append(limit) where = " AND ".join(conditions) with get_conn(db_path) as conn: rows = conn.execute( f""" SELECT id as entry_id, source_id, sequence, timestamp_iso, severity, repeat_count, out_of_order, matched_patterns, text, 0.0 as rank FROM log_entries WHERE {where} ORDER BY timestamp_iso DESC LIMIT ? """, params, ).fetchall() return [ SearchResult( entry_id=r["entry_id"], source_id=r["source_id"], sequence=r["sequence"], timestamp_iso=r["timestamp_iso"], severity=r["severity"], repeat_count=r["repeat_count"], out_of_order=bool(r["out_of_order"]), matched_patterns=json.loads(r["matched_patterns"] or "[]"), text=r["text"], rank=float(r["rank"]), ) for r in rows ] def list_sources(db_path: Path) -> list[dict]: """Return sources with entry counts, grouped by prefix:host stem. source_ids with three or more colon-separated segments (e.g. ``muninn-journal:Muninn:ssh.service``) are collapsed to their first two segments (``muninn-journal:Muninn``). Single- or two-segment IDs are returned as-is. ``unit_count`` reports how many distinct sub-units were merged into each row. """ tid = resolve_tenant_id() group_expr = frag.source_group_expr("source_id") with get_conn(db_path) as conn: rows = conn.execute( f""" SELECT {group_expr} AS group_id, COUNT(DISTINCT source_id) AS unit_count, COUNT(*) AS entry_count, MIN(timestamp_iso) AS earliest, MAX(timestamp_iso) AS latest, SUM(CASE WHEN severity IN ('ERROR','CRITICAL','EMERGENCY','ALERT') THEN 1 ELSE 0 END) AS error_count FROM log_entries WHERE (tenant_id = ? OR tenant_id = '') GROUP BY group_id ORDER BY entry_count DESC """, (tid,), ).fetchall() return [ { "source_id": r["group_id"], "unit_count": r["unit_count"], "entry_count": r["entry_count"], "earliest": r["earliest"], "latest": r["latest"], "error_count": r["error_count"], } for r in rows ] def _compile_overrides(overrides: list[dict]) -> list[tuple[re.Pattern[str], str]]: """Return (compiled_pattern, override_severity) pairs for enabled rules.""" compiled = [] for rule in overrides: if not rule.get("enabled", True): continue try: compiled.append((re.compile(rule["pattern"], re.IGNORECASE), rule["override_severity"])) except re.error: pass return compiled def _apply_overrides(text: str, original_severity: str, rules: list[tuple[re.Pattern[str], str]]) -> str: for pattern, new_sev in rules: if pattern.search(text): return new_sev return original_severity def stats_summary(db_path: Path, window_hours: int = 24, severity_overrides: list[dict] | None = None) -> dict: """Return aggregate health stats for the dashboard. Queries plain log_entries (not FTS) so it works even before the index is built. """ rules = _compile_overrides(severity_overrides or []) tid = resolve_tenant_id() group_expr = frag.source_group_expr("source_id") since_iso = ( datetime.now(timezone.utc) - timedelta(hours=window_hours) ).strftime("%Y-%m-%dT%H:%M:%S") with get_conn(db_path) as conn: row = conn.execute( """ SELECT COUNT(*) AS total, SUM(CASE WHEN severity = 'CRITICAL' THEN 1 ELSE 0 END) AS criticals, SUM(CASE WHEN severity IN ('ERROR','CRITICAL','EMERGENCY','ALERT') THEN 1 ELSE 0 END) AS errors FROM log_entries WHERE timestamp_iso >= ? AND repeat_count = 1 AND (tenant_id = ? OR tenant_id = '') """, (since_iso, tid), ).fetchone() total_24h = int(row["total"] or 0) criticals_24h = int(row["criticals"] or 0) errors_24h = int(row["errors"] or 0) source_rows = conn.execute( f""" SELECT {group_expr} AS group_id, COUNT(*) AS entry_count, SUM(CASE WHEN severity IN ('ERROR','CRITICAL','EMERGENCY','ALERT') THEN 1 ELSE 0 END) AS error_count, MAX(timestamp_iso) AS latest FROM log_entries WHERE timestamp_iso >= ? AND repeat_count = 1 AND (tenant_id = ? OR tenant_id = '') GROUP BY group_id ORDER BY error_count DESC, entry_count DESC """, (since_iso, tid), ).fetchall() crit_rows = conn.execute( """ SELECT id as entry_id, source_id, timestamp_iso, severity, text FROM log_entries WHERE severity = 'CRITICAL' AND repeat_count = 1 AND (tenant_id = ? OR tenant_id = '') ORDER BY timestamp_iso DESC LIMIT 25 """, (tid,), ).fetchall() last_row = conn.execute( "SELECT MAX(ingest_time) AS t FROM log_entries WHERE (tenant_id = ? OR tenant_id = '')", (tid,), ).fetchone() source_health = [ { "source_id": r["group_id"], "entry_count": int(r["entry_count"]), "error_count": int(r["error_count"]), "latest": r["latest"], } for r in source_rows ] suppressed = 0 recent_criticals = [] for r in crit_rows: effective = _apply_overrides(r["text"], r["severity"], rules) if effective == "CRITICAL": recent_criticals.append({ "entry_id": r["entry_id"], "source_id": r["source_id"], "timestamp_iso": r["timestamp_iso"], "severity": r["severity"], "text": r["text"], }) if len(recent_criticals) == 5: break else: suppressed += 1 last_gleaned: str | None = last_row["t"] if last_row else None return { "window_hours": window_hours, "total_24h": total_24h, "criticals_24h": criticals_24h, "errors_24h": errors_24h, "source_health": source_health, "recent_criticals": recent_criticals, "suppressed_criticals": suppressed, "last_gleaned": last_gleaned, } def format_results(results: list[SearchResult], max_text: int = 300) -> str: """Format search results as readable text for LLM context.""" if not results: return "No matching log entries found." lines = [] for r in results: ts = r.timestamp_iso or "no-timestamp" sev = r.severity or "?" src = r.source_id flags = [] if r.repeat_count > 1: flags.append(f"repeat×{r.repeat_count}") if r.out_of_order: flags.append("out-of-order") if r.matched_patterns: flags.append(f"[{', '.join(r.matched_patterns)}]") flag_str = f" {' '.join(flags)}" if flags else "" text = r.text[:max_text] + ("…" if len(r.text) > max_text else "") lines.append(f"[{ts} | {src} | {sev}{flag_str}]\n{text}") return "\n\n".join(lines)