turnstone/app/services/search.py
pyr0ball 784a4072b4 feat: SSE streaming diagnose, severity filter pills, per-source-cap search
- diagnose_stream() async generator: status/summary/entries/reasoning/done events
- POST /api/diagnose/stream SSE endpoint wired in rest.py
- entries_in_window() gains per_source_cap to prevent high-volume sources crowding results
- QuickCapture: severity filter pills, filtered entries view, pipeline status spinner
- llm.py: remove overly broad HTTPStatusError re-raise
2026-05-13 15:45:35 -07:00

494 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""FTS5-based log search with severity, source, and pattern filters."""
from __future__ import annotations
import json
import logging
import re
import sqlite3
from dataclasses import dataclass
from pathlib import Path
logger = logging.getLogger(__name__)
@dataclass(frozen=True)
class SearchResult:
    """One log entry returned by the search helpers in this module.

    Instances are immutable (``frozen=True``). Field order is part of the
    positional constructor and must not be rearranged.
    """

    # Identity and origin of the entry.
    entry_id: str  # ``log_entries.id`` (stored as ``entry_id`` in ``log_fts``)
    source_id: str
    sequence: int

    # Classification; both may be missing on a row.
    timestamp_iso: str | None
    severity: str | None

    # Ingest bookkeeping surfaced to callers.
    repeat_count: int  # ``format_results`` flags values greater than 1
    out_of_order: bool  # coerced with ``bool()`` from the stored column
    matched_patterns: list[str]  # decoded from the JSON text column

    # Payload and ordering.
    text: str
    rank: float  # FTS5 ``rank`` for ``search``; 0.0 for plain SQL scans
def build_fts_index(db_path: Path) -> None:
    """Build (or incrementally refresh) the FTS5 index from ``log_entries``.

    Safe to re-run: rows whose ``id`` is already present in ``log_fts`` are
    skipped. If the existing ``log_fts`` table is stale (it has no
    ``sequence`` column) it is dropped and recreated from scratch.

    Args:
        db_path: Path to the SQLite database file.

    Raises:
        sqlite3.Error: Propagated from SQLite (for example when
            ``log_entries`` does not exist). The connection is closed
            regardless of the outcome.
    """
    conn = sqlite3.connect(str(db_path))
    try:
        conn.execute("PRAGMA journal_mode=WAL")

        # Probe for the ``sequence`` column. The probe also fails when the
        # table does not exist yet; DROP ... IF EXISTS is harmless then.
        try:
            conn.execute("SELECT sequence FROM log_fts LIMIT 0")
        except sqlite3.OperationalError:
            conn.execute("DROP TABLE IF EXISTS log_fts")

        conn.executescript("""
            CREATE VIRTUAL TABLE IF NOT EXISTS log_fts USING fts5(
                text,
                entry_id UNINDEXED,
                source_id UNINDEXED,
                sequence UNINDEXED,
                severity UNINDEXED,
                timestamp_iso UNINDEXED,
                matched_patterns UNINDEXED,
                repeat_count UNINDEXED,
                out_of_order UNINDEXED,
                tokenize = 'porter ascii'
            );
        """)

        # Only insert rows not already indexed.
        conn.execute("""
            INSERT INTO log_fts(text, entry_id, source_id, sequence, severity,
                                timestamp_iso, matched_patterns,
                                repeat_count, out_of_order)
            SELECT e.text, e.id, e.source_id, e.sequence, e.severity,
                   e.timestamp_iso, e.matched_patterns,
                   e.repeat_count, e.out_of_order
            FROM log_entries e
            WHERE e.id NOT IN (SELECT entry_id FROM log_fts WHERE entry_id IS NOT NULL)
        """)
        conn.commit()
    finally:
        # Release the handle even when a statement above raised.
        conn.close()
def _sanitize_fts_query(raw: str, or_mode: bool = False) -> str:
    """Turn free-form user text into a safe FTS5 MATCH expression.

    Every character outside ``[a-zA-Z0-9 _]`` is replaced by a space, which
    removes the FTS5 operator characters (``" * + - ( ) ^ ~ : ?``) along
    with any other punctuation or non-ASCII text. The remaining
    whitespace-separated tokens are joined into the expression.

    The case-sensitive FTS5 keywords ``AND``, ``OR`` and ``NOT`` are emitted
    as double-quoted string literals. Left bare, a pasted log line such as
    ``relation DOES NOT EXIST`` would be parsed as a boolean expression (or
    fail with a syntax error) instead of being searched for verbatim.

    Args:
        raw: Untrusted query text.
        or_mode: When True, join tokens with ``OR`` (any-of) instead of the
            implicit AND (all-of) that FTS5 applies to adjacent terms.

    Returns:
        The MATCH expression, or ``'""'`` (an empty phrase) when no usable
        token remains.
    """
    cleaned = re.sub(r"[^a-zA-Z0-9 _]", " ", raw)
    tokens = [
        f'"{tok}"' if tok in ("AND", "OR", "NOT") else tok
        for tok in cleaned.split()
    ]
    if not tokens:
        return '""'
    return (" OR " if or_mode else " ").join(tokens)
def search(
    db_path: Path,
    query: str,
    severity: str | None = None,
    source_filter: str | None = None,
    pattern_filter: str | None = None,
    since: str | None = None,
    until: str | None = None,
    limit: int = 20,
    include_repeats: bool = False,
    or_mode: bool = False,
) -> list[SearchResult]:
    """Full-text search over ``log_fts`` with optional filters.

    Args:
        db_path: Path to the SQLite database file.
        query: Free-form text; sanitized before being used in MATCH.
        severity: Exact severity to match (upper-cased before comparison).
        source_filter: Substring that ``source_id`` must contain.
        pattern_filter: Pattern name that must appear, double-quoted, inside
            the ``matched_patterns`` text column.
        since: Inclusive lower bound compared against ``timestamp_iso``.
        until: Inclusive upper bound compared against ``timestamp_iso``.
        limit: Maximum number of rows returned.
        include_repeats: When False, only rows with ``repeat_count = 1``
            are returned.
        or_mode: Match any query token instead of all of them.

    Returns:
        Results ordered by FTS5 ``rank``. An empty list is returned when the
        query raises ``sqlite3.OperationalError`` (for example because the
        index has not been built yet); a warning is logged in that case.
    """
    fts_query = _sanitize_fts_query(query, or_mode=or_mode)
    conditions = ["log_fts MATCH ?"]
    params: list = [fts_query]
    if severity:
        conditions.append("severity = ?")
        params.append(severity.upper())
    if source_filter:
        conditions.append("source_id LIKE ?")
        params.append(f"%{source_filter}%")
    if pattern_filter:
        conditions.append("matched_patterns LIKE ?")
        params.append(f'%"{pattern_filter}"%')
    if since:
        conditions.append("timestamp_iso >= ?")
        params.append(since)
    if until:
        conditions.append("timestamp_iso <= ?")
        params.append(until)
    if not include_repeats:
        conditions.append("repeat_count = 1")
    # Only fixed fragments are joined into the SQL text; values are bound.
    where = " AND ".join(conditions)
    params.append(limit)

    conn = sqlite3.connect(str(db_path))
    try:
        conn.execute("PRAGMA journal_mode=WAL")
        conn.row_factory = sqlite3.Row
        try:
            rows = conn.execute(
                f"""
                SELECT entry_id, source_id, sequence, timestamp_iso, severity,
                       repeat_count, out_of_order, matched_patterns, text, rank
                FROM log_fts
                WHERE {where}
                ORDER BY rank
                LIMIT ?
                """,
                params,
            ).fetchall()
        except sqlite3.OperationalError as e:
            logger.warning("FTS query failed (%s) — index may not be built yet", e)
            return []
        return [
            SearchResult(
                entry_id=r["entry_id"],
                source_id=r["source_id"],
                sequence=r["sequence"],
                timestamp_iso=r["timestamp_iso"],
                severity=r["severity"],
                repeat_count=r["repeat_count"],
                out_of_order=bool(r["out_of_order"]),
                matched_patterns=json.loads(r["matched_patterns"] or "[]"),
                text=r["text"],
                rank=r["rank"],
            )
            for r in rows
        ]
    finally:
        # Single close point: also covers failures while decoding rows.
        conn.close()
def entries_in_window(
    db_path: Path,
    since: str | None,
    until: str | None,
    severity: str | None = None,
    source_filter: str | None = None,
    limit: int = 100,
    per_source_cap: int | None = None,
) -> list[SearchResult]:
    """Return log entries within a time window using a plain SQL scan (no FTS).

    Used as a fallback when keyword search returns nothing, so the raw log
    activity in the window is still available even if no keywords match.
    Only rows with ``repeat_count = 1`` are considered.

    Args:
        db_path: Path to the SQLite database file.
        since: Inclusive lower bound on ``timestamp_iso``; None for no bound.
        until: Inclusive upper bound on ``timestamp_iso``; None for no bound.
        severity: Exact severity to match (upper-cased before comparison).
        source_filter: Substring that ``source_id`` must contain.
        limit: Maximum number of rows returned overall.
        per_source_cap: When set, keeps at most this many rows per
            ``source_id`` so high-volume sources do not crowd out
            lower-volume ones. Within each source the most severe rows are
            kept first (CRITICAL/EMERGENCY/ALERT, then ERROR, then
            WARN/WARNING, then everything else), earliest first.

    Returns:
        Matching entries ordered by ``timestamp_iso`` ascending, each with
        ``rank`` set to 0.0.
    """
    conditions: list[str] = ["repeat_count = 1"]
    params: list = []
    if since:
        conditions.append("timestamp_iso >= ?")
        params.append(since)
    if until:
        conditions.append("timestamp_iso <= ?")
        params.append(until)
    if severity:
        conditions.append("severity = ?")
        params.append(severity.upper())
    if source_filter:
        conditions.append("source_id LIKE ?")
        params.append(f"%{source_filter}%")
    where = " AND ".join(conditions)

    if per_source_cap is not None:
        # Window function caps rows per source. The priority only decides
        # which rows survive the cap; final ordering is chronological.
        # EMERGENCY/ALERT are ranked with CRITICAL because the aggregate
        # queries in this module count them as error-class severities.
        query = f"""
            WITH ranked AS (
                SELECT id as entry_id, source_id, sequence, timestamp_iso, severity,
                       repeat_count, out_of_order, matched_patterns, text, 0.0 as rank,
                       ROW_NUMBER() OVER (
                           PARTITION BY source_id
                           ORDER BY
                               CASE UPPER(severity)
                                   WHEN 'CRITICAL' THEN 0
                                   WHEN 'EMERGENCY' THEN 0
                                   WHEN 'ALERT' THEN 0
                                   WHEN 'ERROR' THEN 1
                                   WHEN 'WARN' THEN 2
                                   WHEN 'WARNING' THEN 2
                                   ELSE 3
                               END,
                               timestamp_iso
                       ) AS rn
                FROM log_entries
                WHERE {where}
            )
            SELECT entry_id, source_id, sequence, timestamp_iso, severity,
                   repeat_count, out_of_order, matched_patterns, text, rank
            FROM ranked
            WHERE rn <= ?
            ORDER BY timestamp_iso ASC
            LIMIT ?
        """
        params.extend([per_source_cap, limit])
    else:
        query = f"""
            SELECT id as entry_id, source_id, sequence, timestamp_iso, severity,
                   repeat_count, out_of_order, matched_patterns, text, 0.0 as rank
            FROM log_entries
            WHERE {where}
            ORDER BY timestamp_iso ASC
            LIMIT ?
        """
        params.append(limit)

    conn = sqlite3.connect(str(db_path))
    try:
        conn.execute("PRAGMA journal_mode=WAL")
        conn.row_factory = sqlite3.Row
        rows = conn.execute(query, params).fetchall()
    finally:
        # Close even when the query raises (e.g. missing table).
        conn.close()

    return [
        SearchResult(
            entry_id=r["entry_id"],
            source_id=r["source_id"],
            sequence=r["sequence"],
            timestamp_iso=r["timestamp_iso"],
            severity=r["severity"],
            repeat_count=r["repeat_count"],
            out_of_order=bool(r["out_of_order"]),
            matched_patterns=json.loads(r["matched_patterns"] or "[]"),
            text=r["text"],
            rank=r["rank"],
        )
        for r in rows
    ]
def recent_source_errors(
    db_path: Path,
    source_filter: str,
    severity: str = "ERROR",
    limit: int = 10,
    since: str | None = None,
    until: str | None = None,
) -> list[SearchResult]:
    """Plain-SQL scan: most recent entries of one severity from a named source.

    Bypasses FTS ranking so text content does not affect which errors
    surface. Used by diagnose when FTS keyword search returns nothing for a
    known source. Only rows with ``repeat_count = 1`` are considered.

    Args:
        db_path: Path to the SQLite database file.
        source_filter: Substring that ``source_id`` must contain.
        severity: Exact severity to match (upper-cased before comparison).
        limit: Maximum number of rows returned.
        since: Optional inclusive lower bound on ``timestamp_iso``.
        until: Optional inclusive upper bound on ``timestamp_iso``.

    Returns:
        Matching entries, newest first, each with ``rank`` set to 0.0.
    """
    conditions = [
        "source_id LIKE ?",
        "severity = ?",
        "repeat_count = 1",
    ]
    params: list = [f"%{source_filter}%", severity.upper()]
    if since:
        conditions.append("timestamp_iso >= ?")
        params.append(since)
    if until:
        conditions.append("timestamp_iso <= ?")
        params.append(until)
    params.append(limit)
    where = " AND ".join(conditions)

    conn = sqlite3.connect(str(db_path))
    try:
        conn.execute("PRAGMA journal_mode=WAL")
        conn.row_factory = sqlite3.Row
        rows = conn.execute(
            f"""
            SELECT id as entry_id, source_id, sequence, timestamp_iso, severity,
                   repeat_count, out_of_order, matched_patterns, text, 0.0 as rank
            FROM log_entries
            WHERE {where}
            ORDER BY timestamp_iso DESC
            LIMIT ?
            """,
            params,
        ).fetchall()
    finally:
        # Close even when the query raises (e.g. missing table).
        conn.close()

    return [
        SearchResult(
            entry_id=r["entry_id"],
            source_id=r["source_id"],
            sequence=r["sequence"],
            timestamp_iso=r["timestamp_iso"],
            severity=r["severity"],
            repeat_count=r["repeat_count"],
            out_of_order=bool(r["out_of_order"]),
            matched_patterns=json.loads(r["matched_patterns"] or "[]"),
            text=r["text"],
            rank=r["rank"],
        )
        for r in rows
    ]
def list_sources(db_path: Path) -> list[dict]:
    """Return distinct sources with entry counts and time ranges.

    Args:
        db_path: Path to the SQLite database file.

    Returns:
        One dict per ``source_id`` with keys ``source_id``, ``entry_count``,
        ``earliest``, ``latest`` and ``error_count`` (rows whose severity is
        ERROR, CRITICAL, EMERGENCY or ALERT), ordered by ``entry_count``
        descending.

    Raises:
        sqlite3.Error: Propagated from SQLite (for example when
            ``log_entries`` does not exist). The connection is closed
            regardless of the outcome.
    """
    conn = sqlite3.connect(str(db_path))
    try:
        conn.execute("PRAGMA journal_mode=WAL")
        rows = conn.execute("""
            SELECT
                source_id,
                COUNT(*) as entry_count,
                MIN(timestamp_iso) as earliest,
                MAX(timestamp_iso) as latest,
                SUM(CASE WHEN severity IN ('ERROR','CRITICAL','EMERGENCY','ALERT') THEN 1 ELSE 0 END) as error_count
            FROM log_entries
            GROUP BY source_id
            ORDER BY entry_count DESC
        """).fetchall()
    finally:
        conn.close()

    return [
        {
            "source_id": source_id,
            "entry_count": entry_count,
            "earliest": earliest,
            "latest": latest,
            "error_count": error_count,
        }
        for source_id, entry_count, earliest, latest, error_count in rows
    ]
def _compile_overrides(overrides: list[dict]) -> list[tuple[re.Pattern[str], str]]:
    """Return ``(compiled_pattern, override_severity)`` pairs for enabled rules.

    Rules are best-effort: a rule is skipped (with a logged warning) when it
    lacks the ``pattern`` or ``override_severity`` key, or when its pattern
    cannot be compiled. A rule with ``enabled`` set to a falsy value is
    skipped silently; a missing ``enabled`` key counts as enabled.

    Args:
        overrides: Rule dicts with keys ``pattern``, ``override_severity``
            and optionally ``enabled``.

    Returns:
        Compiled case-insensitive patterns paired with their replacement
        severity, in the same order as the input rules.
    """
    # Same logger name as the module-level logger.
    log = logging.getLogger(__name__)
    compiled: list[tuple[re.Pattern[str], str]] = []
    for rule in overrides:
        if not rule.get("enabled", True):
            continue
        try:
            pattern = re.compile(rule["pattern"], re.IGNORECASE)
            new_severity = rule["override_severity"]
        except KeyError as exc:
            log.warning("Skipping severity override missing key %s: %r", exc, rule)
            continue
        except (re.error, TypeError) as exc:
            log.warning("Skipping severity override with invalid pattern (%s): %r", exc, rule)
            continue
        compiled.append((pattern, new_severity))
    return compiled
def _apply_overrides(text: str, original_severity: str, rules: list[tuple[re.Pattern[str], str]]) -> str:
    """Return the severity of the first rule whose pattern is found in *text*.

    Rules are tried in order; when none matches, *original_severity* is
    returned unchanged.
    """
    first_hit = (severity for regex, severity in rules if regex.search(text))
    return next(first_hit, original_severity)
def stats_summary(db_path: Path, window_hours: int = 24, severity_overrides: list[dict] | None = None) -> dict:
    """Return aggregate health stats for the dashboard.

    Queries plain ``log_entries`` (not FTS) so it works even before the
    index is built. Only rows with ``repeat_count = 1`` are counted.

    Args:
        db_path: Path to the SQLite database file.
        window_hours: Size of the look-back window for the counts and the
            per-source breakdown.
        severity_overrides: Optional override rules; CRITICAL entries whose
            text matches a rule that maps to another severity are left out
            of ``recent_criticals`` and counted in ``suppressed_criticals``.

    Returns:
        Dict with keys ``window_hours``, ``total_24h``, ``criticals_24h``,
        ``errors_24h``, ``source_health``, ``recent_criticals`` (at most 5),
        ``suppressed_criticals`` and ``last_ingested``. The ``*_24h`` key
        names are fixed even when ``window_hours`` differs from 24.
    """
    rules = _compile_overrides(severity_overrides or [])

    # The window size is bound as a parameter instead of being spliced into
    # the SQL text, so a non-integer value cannot alter the statement.
    window_modifier = f"-{window_hours} hours"
    since_expr = "strftime('%Y-%m-%dT%H:%M:%S', 'now', ?)"

    conn = sqlite3.connect(str(db_path))
    try:
        conn.execute("PRAGMA journal_mode=WAL")
        conn.row_factory = sqlite3.Row

        # Overall counts in window.
        row = conn.execute(
            f"""
            SELECT
                COUNT(*) AS total,
                SUM(CASE WHEN severity = 'CRITICAL' THEN 1 ELSE 0 END) AS criticals,
                SUM(CASE WHEN severity IN ('ERROR','CRITICAL','EMERGENCY','ALERT') THEN 1 ELSE 0 END) AS errors
            FROM log_entries
            WHERE timestamp_iso >= {since_expr}
              AND repeat_count = 1
            """,
            (window_modifier,),
        ).fetchone()
        # SUM() yields NULL over an empty set, hence the ``or 0``.
        total_24h = int(row["total"] or 0)
        criticals_24h = int(row["criticals"] or 0)
        errors_24h = int(row["errors"] or 0)

        # Per-source breakdown.
        source_rows = conn.execute(
            f"""
            SELECT
                source_id,
                COUNT(*) AS entry_count,
                SUM(CASE WHEN severity IN ('ERROR','CRITICAL','EMERGENCY','ALERT') THEN 1 ELSE 0 END) AS error_count,
                MAX(timestamp_iso) AS latest
            FROM log_entries
            WHERE timestamp_iso >= {since_expr}
              AND repeat_count = 1
            GROUP BY source_id
            ORDER BY error_count DESC, entry_count DESC
            """,
            (window_modifier,),
        ).fetchall()
        source_health = [
            {
                "source_id": r["source_id"],
                "entry_count": int(r["entry_count"]),
                "error_count": int(r["error_count"]),
                "latest": r["latest"],
            }
            for r in source_rows
        ]

        # Fetch candidate criticals (more than needed so override filtering
        # does not leave too few). This query is not limited to the window.
        crit_rows = conn.execute("""
            SELECT id as entry_id, source_id, timestamp_iso, severity, text
            FROM log_entries
            WHERE severity = 'CRITICAL' AND repeat_count = 1
            ORDER BY timestamp_iso DESC
            LIMIT 25
        """).fetchall()

        # Apply overrides: skip entries whose effective severity is no
        # longer CRITICAL. Suppressions are counted only among the
        # candidates examined before five criticals were collected.
        suppressed = 0
        recent_criticals = []
        for r in crit_rows:
            effective = _apply_overrides(r["text"], r["severity"], rules)
            if effective == "CRITICAL":
                recent_criticals.append({
                    "entry_id": r["entry_id"],
                    "source_id": r["source_id"],
                    "timestamp_iso": r["timestamp_iso"],
                    "severity": r["severity"],
                    "text": r["text"],
                })
                if len(recent_criticals) == 5:
                    break
            else:
                suppressed += 1

        # When did we last ingest anything?
        last_row = conn.execute("SELECT MAX(ingest_time) AS t FROM log_entries").fetchone()
        last_ingested: str | None = last_row["t"] if last_row else None
    finally:
        # Release the handle even when one of the queries raised.
        conn.close()

    return {
        "window_hours": window_hours,
        "total_24h": total_24h,
        "criticals_24h": criticals_24h,
        "errors_24h": errors_24h,
        "source_health": source_health,
        "recent_criticals": recent_criticals,
        "suppressed_criticals": suppressed,
        "last_ingested": last_ingested,
    }
def format_results(results: list[SearchResult], max_text: int = 300) -> str:
    """Format search results as readable text for LLM context.

    Each result becomes a header line ``[timestamp | source | severity
    flags]`` followed by the entry text; results are separated by a blank
    line. Flags are added for repeated entries, out-of-order entries and
    matched patterns.

    Args:
        results: Entries to render.
        max_text: Maximum number of characters of each entry's text to
            include. Longer text is cut and marked with a trailing ``…``.

    Returns:
        The formatted block, or ``"No matching log entries found."`` when
        *results* is empty.
    """
    if not results:
        return "No matching log entries found."
    lines = []
    for r in results:
        ts = r.timestamp_iso or "no-timestamp"
        sev = r.severity or "?"
        src = r.source_id
        flags = []
        if r.repeat_count > 1:
            flags.append(f"repeat×{r.repeat_count}")
        if r.out_of_order:
            flags.append("out-of-order")
        if r.matched_patterns:
            flags.append(f"[{', '.join(r.matched_patterns)}]")
        flag_str = f" {' '.join(flags)}" if flags else ""
        text = r.text[:max_text]
        if len(r.text) > max_text:
            # Mark the cut so the reader knows the entry continues.
            text += "…"
        lines.append(f"[{ts} | {src} | {sev}{flag_str}]\n{text}")
    return "\n\n".join(lines)