When diagnose() auto-detects a source name, FTS keyword scoring can bury real errors whose text doesn't match the symptom query. Add recent_source_errors() — a plain-SQL scan ordered by timestamp — so the most recent errors from a known service always surface regardless of keyword overlap.
342 lines
10 KiB
Python
342 lines
10 KiB
Python
"""FTS5-based log search with severity, source, and pattern filters."""
|
||
from __future__ import annotations
|
||
|
||
import json
|
||
import logging
|
||
import re
|
||
import sqlite3
|
||
from dataclasses import dataclass
|
||
from pathlib import Path
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
@dataclass(frozen=True)
class SearchResult:
    """A single log entry as returned by the search/scan helpers in this module.

    Instances are immutable (frozen dataclass). Field order is part of the
    constructor contract, so it must not be rearranged.
    """

    # Entry identifier (``log_fts.entry_id`` / ``log_entries.id``).
    entry_id: str
    # Identifier of the source the entry belongs to.
    source_id: str
    # Value of the entry's ``sequence`` column
    # (presumably its position within the source -- verify against the ingester).
    sequence: int
    # ISO-formatted timestamp string, or None when the entry has none.
    timestamp_iso: str | None
    # Severity label (filters in this module compare it upper-cased), or None.
    severity: str | None
    # Repeat counter; the formatter shows values greater than 1 as "repeat×N".
    repeat_count: int
    # True when the stored ``out_of_order`` flag is truthy.
    out_of_order: bool
    # Pattern names decoded from the JSON stored in ``matched_patterns``.
    matched_patterns: list[str]
    # Raw text of the log entry.
    text: str
    # FTS5 ``rank`` for full-text hits; 0.0 for plain SQL scans.
    rank: float
|
||
|
||
|
||
def build_fts_index(db_path: Path) -> None:
    """Build (or incrementally refresh) the FTS5 index ``log_fts`` from ``log_entries``.

    Safe to re-run: rows whose ``id`` is already present in ``log_fts`` are
    skipped, so repeated calls only add new entries.

    If ``log_fts`` is missing, or exists with a stale schema (no ``sequence``
    column), it is dropped and recreated before indexing.

    Args:
        db_path: Path to the SQLite database containing ``log_entries``.

    Raises:
        sqlite3.OperationalError: If ``log_entries`` is missing or FTS5 is
            unavailable. The connection is closed in every case.
    """
    conn = sqlite3.connect(str(db_path))
    try:
        conn.execute("PRAGMA journal_mode=WAL")

        # Probe for the ``sequence`` column. The probe fails both when the
        # table does not exist and when it predates that column; in either
        # case the (possibly absent) table is dropped so it is recreated below.
        try:
            conn.execute("SELECT sequence FROM log_fts LIMIT 0")
        except sqlite3.OperationalError:
            conn.execute("DROP TABLE IF EXISTS log_fts")

        conn.executescript("""
            CREATE VIRTUAL TABLE IF NOT EXISTS log_fts USING fts5(
                text,
                entry_id UNINDEXED,
                source_id UNINDEXED,
                sequence UNINDEXED,
                severity UNINDEXED,
                timestamp_iso UNINDEXED,
                matched_patterns UNINDEXED,
                repeat_count UNINDEXED,
                out_of_order UNINDEXED,
                tokenize = 'porter ascii'
            );
        """)

        # Only insert rows not already indexed.
        conn.execute("""
            INSERT INTO log_fts(text, entry_id, source_id, sequence, severity,
                                timestamp_iso, matched_patterns,
                                repeat_count, out_of_order)
            SELECT e.text, e.id, e.source_id, e.sequence, e.severity,
                   e.timestamp_iso, e.matched_patterns,
                   e.repeat_count, e.out_of_order
            FROM log_entries e
            WHERE e.id NOT IN (SELECT entry_id FROM log_fts WHERE entry_id IS NOT NULL)
        """)
        conn.commit()
    finally:
        # Release the connection even when a statement above raised.
        conn.close()
|
||
|
||
|
||
def _sanitize_fts_query(raw: str, or_mode: bool = False) -> str:
|
||
"""Strip FTS5 operator characters and return a safe MATCH expression.
|
||
|
||
FTS5 reserves: " * + - ( ) ^ ~ : ?
|
||
or_mode=True joins tokens with OR (any-of) instead of implicit AND (all-of).
|
||
"""
|
||
cleaned = re.sub(r"[^a-zA-Z0-9 _]", " ", raw)
|
||
tokens = cleaned.split()
|
||
if not tokens:
|
||
return '""'
|
||
return (" OR " if or_mode else " ").join(tokens)
|
||
|
||
|
||
def search(
    db_path: Path,
    query: str,
    severity: str | None = None,
    source_filter: str | None = None,
    pattern_filter: str | None = None,
    since: str | None = None,
    until: str | None = None,
    limit: int = 20,
    include_repeats: bool = False,
    or_mode: bool = False,
) -> list[SearchResult]:
    """Full-text search over ``log_fts`` with optional filters.

    Args:
        db_path: Path to the SQLite database holding the ``log_fts`` index.
        query: Free-form query text; sanitized before being used in MATCH.
        severity: If given, keep only entries whose severity equals this value
            (compared upper-cased).
        source_filter: If given, keep only entries whose ``source_id`` contains
            this substring (SQL ``LIKE '%...%'``).
        pattern_filter: If given, keep only entries whose JSON
            ``matched_patterns`` text contains this name in double quotes.
        since: Inclusive lower bound compared against ``timestamp_iso``.
        until: Inclusive upper bound compared against ``timestamp_iso``.
        limit: Maximum number of rows returned.
        include_repeats: When False (default), only rows with
            ``repeat_count = 1`` are returned.
        or_mode: When True, query tokens are OR-ed instead of AND-ed.

    Returns:
        Matching entries ordered by the FTS5 ``rank`` column. An empty list is
        returned (and a warning logged) when the query fails with
        ``sqlite3.OperationalError``, e.g. because the index does not exist.
    """
    conn = sqlite3.connect(str(db_path))
    try:
        conn.execute("PRAGMA journal_mode=WAL")
        conn.row_factory = sqlite3.Row

        fts_query = _sanitize_fts_query(query, or_mode=or_mode)
        conditions = ["log_fts MATCH ?"]
        params: list = [fts_query]

        if severity:
            conditions.append("severity = ?")
            params.append(severity.upper())
        if source_filter:
            conditions.append("source_id LIKE ?")
            params.append(f"%{source_filter}%")
        if pattern_filter:
            # matched_patterns holds a JSON array; match the quoted name inside it.
            conditions.append("matched_patterns LIKE ?")
            params.append(f'%"{pattern_filter}"%')
        if since:
            conditions.append("timestamp_iso >= ?")
            params.append(since)
        if until:
            conditions.append("timestamp_iso <= ?")
            params.append(until)
        if not include_repeats:
            conditions.append("repeat_count = 1")

        # Only fixed SQL fragments are interpolated; all values are bound.
        where = " AND ".join(conditions)
        params.append(limit)

        try:
            rows = conn.execute(
                f"""
                SELECT entry_id, source_id, sequence, timestamp_iso, severity,
                       repeat_count, out_of_order, matched_patterns, text, rank
                FROM log_fts
                WHERE {where}
                ORDER BY rank
                LIMIT ?
                """,
                params,
            ).fetchall()
        except sqlite3.OperationalError as e:
            logger.warning("FTS query failed (%s) — index may not be built yet", e)
            return []

        return [
            SearchResult(
                entry_id=r["entry_id"],
                source_id=r["source_id"],
                sequence=r["sequence"],
                timestamp_iso=r["timestamp_iso"],
                severity=r["severity"],
                repeat_count=r["repeat_count"],
                out_of_order=bool(r["out_of_order"]),
                matched_patterns=json.loads(r["matched_patterns"] or "[]"),
                text=r["text"],
                rank=r["rank"],
            )
            for r in rows
        ]
    finally:
        # Single close point: also covers failures while decoding rows.
        conn.close()
|
||
|
||
|
||
def entries_in_window(
    db_path: Path,
    since: str | None,
    until: str | None,
    severity: str | None = None,
    limit: int = 100,
) -> list[SearchResult]:
    """Return log entries within a time window using a plain SQL scan (no FTS).

    Used as a fallback when keyword search returns nothing, so incident detail
    can still show the raw log activity in the window even if no keywords match.

    Only rows with ``repeat_count = 1`` are considered.

    Args:
        db_path: Path to the SQLite database containing ``log_entries``.
        since: Inclusive lower bound compared against ``timestamp_iso``;
            None or empty disables the bound.
        until: Inclusive upper bound compared against ``timestamp_iso``;
            None or empty disables the bound.
        severity: If given, keep only entries with this severity (upper-cased).
        limit: Maximum number of rows returned.

    Returns:
        Entries ordered by ``timestamp_iso`` ascending, each with ``rank`` 0.0.

    Raises:
        sqlite3.OperationalError: If the query fails (e.g. ``log_entries`` is
            missing). The connection is closed in every case.
    """
    conn = sqlite3.connect(str(db_path))
    try:
        conn.execute("PRAGMA journal_mode=WAL")
        conn.row_factory = sqlite3.Row

        conditions: list[str] = ["repeat_count = 1"]
        params: list = []

        if since:
            conditions.append("timestamp_iso >= ?")
            params.append(since)
        if until:
            conditions.append("timestamp_iso <= ?")
            params.append(until)
        if severity:
            conditions.append("severity = ?")
            params.append(severity.upper())

        # Only fixed SQL fragments are interpolated; all values are bound.
        where = " AND ".join(conditions)
        params.append(limit)

        rows = conn.execute(
            f"""
            SELECT id as entry_id, source_id, sequence, timestamp_iso, severity,
                   repeat_count, out_of_order, matched_patterns, text, 0.0 as rank
            FROM log_entries
            WHERE {where}
            ORDER BY timestamp_iso ASC
            LIMIT ?
            """,
            params,
        ).fetchall()

        return [
            SearchResult(
                entry_id=r["entry_id"],
                source_id=r["source_id"],
                sequence=r["sequence"],
                timestamp_iso=r["timestamp_iso"],
                severity=r["severity"],
                repeat_count=r["repeat_count"],
                out_of_order=bool(r["out_of_order"]),
                matched_patterns=json.loads(r["matched_patterns"] or "[]"),
                text=r["text"],
                rank=r["rank"],
            )
            for r in rows
        ]
    finally:
        # Release the connection even when the query or row decoding raised.
        conn.close()
|
||
|
||
|
||
def recent_source_errors(
    db_path: Path,
    source_filter: str,
    severity: str = "ERROR",
    limit: int = 10,
    since: str | None = None,
    until: str | None = None,
) -> list[SearchResult]:
    """Plain-SQL scan: most recent entries of one severity from a named source.

    Bypasses FTS ranking, so the text content does not affect which errors
    surface. Intended for diagnose when a source name is known but keyword
    search does not surface its errors.

    Only rows with ``repeat_count = 1`` are considered.

    Args:
        db_path: Path to the SQLite database containing ``log_entries``.
        source_filter: Substring matched against ``source_id`` with SQL
            ``LIKE '%...%'`` (so ``%`` and ``_`` in it act as wildcards).
        severity: Severity to match exactly, compared upper-cased.
        limit: Maximum number of rows returned.
        since: Optional inclusive lower bound on ``timestamp_iso``.
        until: Optional inclusive upper bound on ``timestamp_iso``.

    Returns:
        Entries ordered by ``timestamp_iso`` descending (newest first), each
        with ``rank`` 0.0.

    Raises:
        sqlite3.OperationalError: If the query fails (e.g. ``log_entries`` is
            missing). The connection is closed in every case.
    """
    conn = sqlite3.connect(str(db_path))
    try:
        conn.execute("PRAGMA journal_mode=WAL")
        conn.row_factory = sqlite3.Row

        conditions = [
            "source_id LIKE ?",
            "severity = ?",
            "repeat_count = 1",
        ]
        params: list = [f"%{source_filter}%", severity.upper()]

        if since:
            conditions.append("timestamp_iso >= ?")
            params.append(since)
        if until:
            conditions.append("timestamp_iso <= ?")
            params.append(until)

        # Only fixed SQL fragments are interpolated; all values are bound.
        where = " AND ".join(conditions)
        params.append(limit)

        rows = conn.execute(
            f"""
            SELECT id as entry_id, source_id, sequence, timestamp_iso, severity,
                   repeat_count, out_of_order, matched_patterns, text, 0.0 as rank
            FROM log_entries
            WHERE {where}
            ORDER BY timestamp_iso DESC
            LIMIT ?
            """,
            params,
        ).fetchall()

        return [
            SearchResult(
                entry_id=r["entry_id"],
                source_id=r["source_id"],
                sequence=r["sequence"],
                timestamp_iso=r["timestamp_iso"],
                severity=r["severity"],
                repeat_count=r["repeat_count"],
                out_of_order=bool(r["out_of_order"]),
                matched_patterns=json.loads(r["matched_patterns"] or "[]"),
                text=r["text"],
                rank=r["rank"],
            )
            for r in rows
        ]
    finally:
        # Release the connection even when the query or row decoding raised.
        conn.close()
|
||
|
||
|
||
def list_sources(db_path: Path) -> list[dict]:
    """Return distinct sources with entry counts and time ranges.

    Args:
        db_path: Path to the SQLite database containing ``log_entries``.

    Returns:
        One dict per ``source_id``, ordered by entry count descending, with
        keys ``source_id``, ``entry_count``, ``earliest``, ``latest`` (min/max
        of ``timestamp_iso``) and ``error_count`` (rows whose severity is
        ERROR, CRITICAL, EMERGENCY or ALERT).

    Raises:
        sqlite3.OperationalError: If the query fails (e.g. ``log_entries`` is
            missing). The connection is closed in every case.
    """
    conn = sqlite3.connect(str(db_path))
    try:
        conn.execute("PRAGMA journal_mode=WAL")
        rows = conn.execute("""
            SELECT
                source_id,
                COUNT(*) as entry_count,
                MIN(timestamp_iso) as earliest,
                MAX(timestamp_iso) as latest,
                SUM(CASE WHEN severity IN ('ERROR','CRITICAL','EMERGENCY','ALERT') THEN 1 ELSE 0 END) as error_count
            FROM log_entries
            GROUP BY source_id
            ORDER BY entry_count DESC
        """).fetchall()
    finally:
        # Release the connection even when the query raised.
        conn.close()

    return [
        {
            "source_id": source_id,
            "entry_count": entry_count,
            "earliest": earliest,
            "latest": latest,
            "error_count": error_count,
        }
        for source_id, entry_count, earliest, latest, error_count in rows
    ]
|
||
|
||
|
||
def format_results(results: list[SearchResult], max_text: int = 300) -> str:
    """Render search results as plain text suitable for LLM context.

    Each entry becomes a bracketed header line
    (``[timestamp | source | severity flags]``) followed by the entry text,
    truncated to ``max_text`` characters with a trailing ellipsis when cut.
    Entries are separated by a blank line.

    Args:
        results: Entries to render.
        max_text: Maximum number of text characters kept per entry.

    Returns:
        The rendered text, or a fixed "no matches" message for empty input.
    """
    if not results:
        return "No matching log entries found."

    blocks = []
    for entry in results:
        # Optional markers appended after the severity in the header.
        markers = []
        if entry.repeat_count > 1:
            markers.append(f"repeat×{entry.repeat_count}")
        if entry.out_of_order:
            markers.append("out-of-order")
        if entry.matched_patterns:
            markers.append(f"[{', '.join(entry.matched_patterns)}]")
        suffix = " " + " ".join(markers) if markers else ""

        body = entry.text[:max_text]
        if len(entry.text) > max_text:
            body += "…"

        when = entry.timestamp_iso or "no-timestamp"
        level = entry.severity or "?"
        header = f"[{when} | {entry.source_id} | {level}{suffix}]"
        blocks.append(f"{header}\n{body}")

    return "\n\n".join(blocks)
|