"""FTS5-based log search with severity, source, and pattern filters.""" from __future__ import annotations import json import logging import sqlite3 from dataclasses import dataclass from pathlib import Path logger = logging.getLogger(__name__) @dataclass(frozen=True) class SearchResult: entry_id: str source_id: str sequence: int timestamp_iso: str | None severity: str | None repeat_count: int out_of_order: bool matched_patterns: list[str] text: str rank: float def build_fts_index(db_path: Path) -> None: """Build (or rebuild) the FTS5 index from log_entries. Safe to re-run. Drops and recreates the table if the schema is stale (missing sequence column). """ conn = sqlite3.connect(str(db_path)) conn.execute("PRAGMA journal_mode=WAL") # Check whether existing table has the sequence column; rebuild if not. needs_rebuild = False try: conn.execute("SELECT sequence FROM log_fts LIMIT 0") except sqlite3.OperationalError: needs_rebuild = True if needs_rebuild: conn.execute("DROP TABLE IF EXISTS log_fts") conn.executescript(""" CREATE VIRTUAL TABLE IF NOT EXISTS log_fts USING fts5( text, entry_id UNINDEXED, source_id UNINDEXED, sequence UNINDEXED, severity UNINDEXED, timestamp_iso UNINDEXED, matched_patterns UNINDEXED, repeat_count UNINDEXED, out_of_order UNINDEXED, tokenize = 'porter ascii' ); """) # Only insert rows not already indexed conn.execute(""" INSERT INTO log_fts(text, entry_id, source_id, sequence, severity, timestamp_iso, matched_patterns, repeat_count, out_of_order) SELECT e.text, e.id, e.source_id, e.sequence, e.severity, e.timestamp_iso, e.matched_patterns, e.repeat_count, e.out_of_order FROM log_entries e WHERE e.id NOT IN (SELECT entry_id FROM log_fts WHERE entry_id IS NOT NULL) """) conn.commit() conn.close() def search( db_path: Path, query: str, severity: str | None = None, source_filter: str | None = None, pattern_filter: str | None = None, since: str | None = None, until: str | None = None, limit: int = 20, include_repeats: bool = False, ) -> list[SearchResult]: """Full-text search with optional filters. Returns results ranked by relevance.""" conn = sqlite3.connect(str(db_path)) conn.execute("PRAGMA journal_mode=WAL") conn.row_factory = sqlite3.Row conditions = ["log_fts MATCH ?"] params: list = [query] if severity: conditions.append("severity = ?") params.append(severity.upper()) if source_filter: conditions.append("source_id LIKE ?") params.append(f"%{source_filter}%") if pattern_filter: conditions.append("matched_patterns LIKE ?") params.append(f'%"{pattern_filter}"%') if since: conditions.append("timestamp_iso >= ?") params.append(since) if until: conditions.append("timestamp_iso <= ?") params.append(until) if not include_repeats: conditions.append("repeat_count = 1") where = " AND ".join(conditions) params.append(limit) try: rows = conn.execute( f""" SELECT entry_id, source_id, sequence, timestamp_iso, severity, repeat_count, out_of_order, matched_patterns, text, rank FROM log_fts WHERE {where} ORDER BY rank LIMIT ? """, params, ).fetchall() except sqlite3.OperationalError as e: logger.warning("FTS query failed (%s) — index may not be built yet", e) conn.close() return [] results = [ SearchResult( entry_id=r["entry_id"], source_id=r["source_id"], sequence=r["sequence"], timestamp_iso=r["timestamp_iso"], severity=r["severity"], repeat_count=r["repeat_count"], out_of_order=bool(r["out_of_order"]), matched_patterns=json.loads(r["matched_patterns"] or "[]"), text=r["text"], rank=r["rank"], ) for r in rows ] conn.close() return results def list_sources(db_path: Path) -> list[dict]: """Return distinct sources with entry counts and time ranges.""" conn = sqlite3.connect(str(db_path)) conn.execute("PRAGMA journal_mode=WAL") rows = conn.execute(""" SELECT source_id, COUNT(*) as entry_count, MIN(timestamp_iso) as earliest, MAX(timestamp_iso) as latest, SUM(CASE WHEN severity IN ('ERROR','CRITICAL','EMERGENCY','ALERT') THEN 1 ELSE 0 END) as error_count FROM log_entries GROUP BY source_id ORDER BY entry_count DESC """).fetchall() conn.close() return [ { "source_id": r[0], "entry_count": r[1], "earliest": r[2], "latest": r[3], "error_count": r[4], } for r in rows ] def format_results(results: list[SearchResult], max_text: int = 300) -> str: """Format search results as readable text for LLM context.""" if not results: return "No matching log entries found." lines = [] for r in results: ts = r.timestamp_iso or "no-timestamp" sev = r.severity or "?" src = r.source_id flags = [] if r.repeat_count > 1: flags.append(f"repeat×{r.repeat_count}") if r.out_of_order: flags.append("out-of-order") if r.matched_patterns: flags.append(f"[{', '.join(r.matched_patterns)}]") flag_str = f" {' '.join(flags)}" if flags else "" text = r.text[:max_text] + ("…" if len(r.text) > max_text else "") lines.append(f"[{ts} | {src} | {sev}{flag_str}]\n{text}") return "\n\n".join(lines)