"""FTS5-based log search with severity, source, and pattern filters."""

from __future__ import annotations

import json
import logging
import re
import sqlite3
from contextlib import closing
from dataclasses import dataclass
from pathlib import Path

logger = logging.getLogger(__name__)

# Every character outside this whitelist is replaced by a space before the
# user query is split into tokens.
_FTS_UNSAFE_CHARS = re.compile(r"[^a-zA-Z0-9 _]")


@dataclass(frozen=True)
class SearchResult:
    """One log entry returned by ``search`` or ``entries_in_window``."""

    entry_id: str
    source_id: str
    sequence: int
    timestamp_iso: str | None
    severity: str | None
    repeat_count: int
    out_of_order: bool
    matched_patterns: list[str]  # decoded from the JSON text stored in the DB
    text: str
    rank: float  # FTS5 rank for search(); constant 0.0 for entries_in_window()


def _connect(db_path: Path) -> sqlite3.Connection:
    """Open *db_path* in WAL journal mode with name-addressable rows.

    The caller owns the returned connection and must close it (the public
    functions in this module wrap it in ``contextlib.closing``).
    """
    conn = sqlite3.connect(str(db_path))
    try:
        conn.execute("PRAGMA journal_mode=WAL")
    except sqlite3.Error:
        # Do not leak the handle if the pragma cannot be applied.
        conn.close()
        raise
    conn.row_factory = sqlite3.Row
    return conn


def _row_to_result(row: sqlite3.Row) -> SearchResult:
    """Convert a row carrying the standard result columns to a SearchResult."""
    return SearchResult(
        entry_id=row["entry_id"],
        source_id=row["source_id"],
        sequence=row["sequence"],
        timestamp_iso=row["timestamp_iso"],
        severity=row["severity"],
        repeat_count=row["repeat_count"],
        out_of_order=bool(row["out_of_order"]),
        # NULL / empty matched_patterns is treated as "no patterns".
        matched_patterns=json.loads(row["matched_patterns"] or "[]"),
        text=row["text"],
        rank=row["rank"],
    )


def build_fts_index(db_path: Path) -> None:
    """Build (or rebuild) the FTS5 index from log_entries.

    Safe to re-run: only rows whose id is not yet present in ``log_fts`` are
    inserted. The table is dropped and recreated when it is missing or stale
    (i.e. selecting its ``sequence`` column fails).

    Raises:
        sqlite3.Error: propagated from SQLite (for example when
            ``log_entries`` does not exist). The connection is closed on
            every path.
    """
    with closing(_connect(db_path)) as conn:
        # Probe for the sequence column; a failure means the table is absent
        # or was created with an older schema.
        try:
            conn.execute("SELECT sequence FROM log_fts LIMIT 0")
        except sqlite3.OperationalError:
            conn.execute("DROP TABLE IF EXISTS log_fts")

        conn.executescript("""
            CREATE VIRTUAL TABLE IF NOT EXISTS log_fts USING fts5(
                text,
                entry_id UNINDEXED,
                source_id UNINDEXED,
                sequence UNINDEXED,
                severity UNINDEXED,
                timestamp_iso UNINDEXED,
                matched_patterns UNINDEXED,
                repeat_count UNINDEXED,
                out_of_order UNINDEXED,
                tokenize = 'porter ascii'
            );
        """)

        # Only insert rows not already indexed.
        conn.execute("""
            INSERT INTO log_fts(text, entry_id, source_id, sequence, severity,
                                timestamp_iso, matched_patterns, repeat_count,
                                out_of_order)
            SELECT e.text, e.id, e.source_id, e.sequence, e.severity,
                   e.timestamp_iso, e.matched_patterns, e.repeat_count,
                   e.out_of_order
            FROM log_entries e
            WHERE e.id NOT IN (
                SELECT entry_id FROM log_fts WHERE entry_id IS NOT NULL
            )
        """)
        conn.commit()


def _sanitize_fts_query(raw: str, or_mode: bool = False) -> str:
    """Strip FTS5 operator characters and return a safe MATCH expression.

    FTS5 reserves: " * + - ( ) ^ ~ : ?
    Everything except ASCII letters, digits, spaces and underscores is
    replaced by a space, and each remaining token is wrapped in double
    quotes. Quoting matters: as a bareword, an uppercase AND / OR / NOT /
    NEAR typed by the user would be parsed as an FTS5 operator and make the
    MATCH fail with a syntax error; as a quoted string it is an ordinary
    search term.

    or_mode=True joins tokens with OR (any-of) instead of implicit AND
    (all-of).

    Returns '""' (an empty phrase) when no token survives sanitisation.
    """
    tokens = _FTS_UNSAFE_CHARS.sub(" ", raw).split()
    if not tokens:
        return '""'
    joiner = " OR " if or_mode else " "
    # Tokens cannot contain a double quote (stripped above), so no escaping
    # is needed inside the quoted strings.
    return joiner.join(f'"{token}"' for token in tokens)


def search(
    db_path: Path,
    query: str,
    severity: str | None = None,
    source_filter: str | None = None,
    pattern_filter: str | None = None,
    since: str | None = None,
    until: str | None = None,
    limit: int = 20,
    include_repeats: bool = False,
    or_mode: bool = False,
) -> list[SearchResult]:
    """Full-text search with optional filters. Returns results ranked by relevance.

    Args:
        db_path: SQLite database containing the ``log_fts`` index.
        query: Free-text query; sanitised before being handed to FTS5.
        severity: Exact severity to match (upper-cased before comparison).
        source_filter: Substring that ``source_id`` must contain.
        pattern_filter: Pattern name that must appear, double-quoted, inside
            the stored ``matched_patterns`` text.
        since: Inclusive lower bound compared against ``timestamp_iso``.
        until: Inclusive upper bound compared against ``timestamp_iso``.
        limit: Maximum number of rows returned.
        include_repeats: When False, only rows with ``repeat_count = 1`` are
            returned.
        or_mode: Match any query token instead of all of them.

    Returns:
        Matching entries ordered by FTS5 rank. An empty list is returned
        (and a warning logged) when the query raises
        ``sqlite3.OperationalError``, e.g. because the index does not exist.
    """
    conditions = ["log_fts MATCH ?"]
    params: list = [_sanitize_fts_query(query, or_mode=or_mode)]

    if severity:
        conditions.append("severity = ?")
        params.append(severity.upper())
    if source_filter:
        conditions.append("source_id LIKE ?")
        params.append(f"%{source_filter}%")
    if pattern_filter:
        conditions.append("matched_patterns LIKE ?")
        params.append(f'%"{pattern_filter}"%')
    if since:
        conditions.append("timestamp_iso >= ?")
        params.append(since)
    if until:
        conditions.append("timestamp_iso <= ?")
        params.append(until)
    if not include_repeats:
        conditions.append("repeat_count = 1")

    # Only fixed condition fragments are interpolated; every user-supplied
    # value is bound as a parameter.
    where = " AND ".join(conditions)
    params.append(limit)

    with closing(_connect(db_path)) as conn:
        try:
            rows = conn.execute(
                f"""
                SELECT entry_id, source_id, sequence, timestamp_iso, severity,
                       repeat_count, out_of_order, matched_patterns, text, rank
                FROM log_fts
                WHERE {where}
                ORDER BY rank
                LIMIT ?
                """,
                params,
            ).fetchall()
        except sqlite3.OperationalError as e:
            logger.warning("FTS query failed (%s) — index may not be built yet", e)
            return []

    return [_row_to_result(row) for row in rows]


def entries_in_window(
    db_path: Path,
    since: str | None,
    until: str | None,
    severity: str | None = None,
    limit: int = 100,
) -> list[SearchResult]:
    """Return log entries within a time window using a plain SQL scan (no FTS).

    Used as a fallback when keyword search returns nothing — ensures incident
    detail always shows the raw log activity in the window even if no
    keywords match.

    Only rows with ``repeat_count = 1`` are returned, ordered by
    ``timestamp_iso`` ascending; ``rank`` is always 0.0.

    Raises:
        sqlite3.Error: propagated from SQLite; the connection is closed on
            every path.
    """
    conditions: list[str] = ["repeat_count = 1"]
    params: list = []

    if since:
        conditions.append("timestamp_iso >= ?")
        params.append(since)
    if until:
        conditions.append("timestamp_iso <= ?")
        params.append(until)
    if severity:
        conditions.append("severity = ?")
        params.append(severity.upper())

    where = " AND ".join(conditions)
    params.append(limit)

    with closing(_connect(db_path)) as conn:
        rows = conn.execute(
            f"""
            SELECT id as entry_id, source_id, sequence, timestamp_iso, severity,
                   repeat_count, out_of_order, matched_patterns, text,
                   0.0 as rank
            FROM log_entries
            WHERE {where}
            ORDER BY timestamp_iso ASC
            LIMIT ?
            """,
            params,
        ).fetchall()

    return [_row_to_result(row) for row in rows]


def list_sources(db_path: Path) -> list[dict]:
    """Return distinct sources with entry counts and time ranges.

    Each dict has the keys ``source_id``, ``entry_count``, ``earliest``,
    ``latest`` and ``error_count`` (rows whose severity is ERROR, CRITICAL,
    EMERGENCY or ALERT). Sources are ordered by ``entry_count`` descending.

    Raises:
        sqlite3.Error: propagated from SQLite; the connection is closed on
            every path.
    """
    with closing(_connect(db_path)) as conn:
        rows = conn.execute("""
            SELECT source_id,
                   COUNT(*) as entry_count,
                   MIN(timestamp_iso) as earliest,
                   MAX(timestamp_iso) as latest,
                   SUM(CASE WHEN severity IN ('ERROR','CRITICAL','EMERGENCY','ALERT')
                            THEN 1 ELSE 0 END) as error_count
            FROM log_entries
            GROUP BY source_id
            ORDER BY entry_count DESC
        """).fetchall()

    return [
        {
            "source_id": row[0],
            "entry_count": row[1],
            "earliest": row[2],
            "latest": row[3],
            "error_count": row[4],
        }
        for row in rows
    ]


def format_results(results: list[SearchResult], max_text: int = 300) -> str:
    """Format search results as readable text for LLM context.

    Each entry becomes a ``[timestamp | source | severity flags]`` header
    line followed by its text, truncated to *max_text* characters with a
    trailing ellipsis. Entries are separated by a blank line.
    """
    if not results:
        return "No matching log entries found."

    lines = []
    for r in results:
        ts = r.timestamp_iso or "no-timestamp"
        sev = r.severity or "?"
        src = r.source_id

        flags = []
        if r.repeat_count > 1:
            flags.append(f"repeat×{r.repeat_count}")
        if r.out_of_order:
            flags.append("out-of-order")
        if r.matched_patterns:
            flags.append(f"[{', '.join(r.matched_patterns)}]")
        flag_str = f" {' '.join(flags)}" if flags else ""

        text = r.text[:max_text] + ("…" if len(r.text) > max_text else "")
        lines.append(f"[{ts} | {src} | {sev}{flag_str}]\n{text}")

    return "\n\n".join(lines)