turnstone/app/services/search.py
pyr0ball 502ff54fd0 feat(ui): security alert dedup, clickable criticals, loading shimmer
Security Alerts:
- Client-side duplicate collapsing via anomaly_label + text fingerprint
- ×N count badge chip on collapsed rows; toggle to expand
- Skeleton shimmer rows replace "Loading..." text

Dashboard:
- Clickable Recent Criticals — inline LLM explanation via SSE stream
- ±5 min time window scoped to source_id for useful context
- Explanation cache keyed by entry_id (no re-fetch on re-expand)
- Default diagnose query injected on Diagnose button navigation to
  prevent local models hallucinating from bare log data
- Stat card and source-health skeleton shimmer loading states

Backend:
- anomaly.py: 4-attempt retry on "database is locked" with 10s backoff
- search.py: migrate build_fts_index to get_conn() (WAL race fix);
  add timeline_events to stats_summary for clickable criticals feature
- theme.css: @keyframes shimmer + .loading-shimmer utility;
  prefers-reduced-motion degrades gracefully to static muted block
2026-06-13 09:32:26 -07:00

755 lines
24 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""FTS-based log search with optional hybrid BM25 + vector re-ranking.
SQLite backend: FTS5 virtual table with Porter stemmer.
Postgres backend: tsvector column with GIN index + websearch_to_tsquery.
"""
from __future__ import annotations
import json
import logging
import re
import sqlite3
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from pathlib import Path
from app.db import BACKEND, Backend, frag, get_conn, resolve_tenant_id
logger = logging.getLogger(__name__)
@dataclass(frozen=True)
class SearchResult:
    """One log entry returned by the search and scan functions in this module.

    Instances are built from ``log_fts`` / ``log_entries`` rows; the dataclass
    is frozen, so fields cannot be reassigned after construction.
    """

    # Entry identifier (``log_fts.entry_id`` / ``log_entries.id``).
    entry_id: str
    # Identifier of the source that produced the entry.
    source_id: str
    # Stored ``sequence`` column — presumably the entry's position within its
    # source; TODO confirm against the ingest code.
    sequence: int
    # Timestamp string as stored; may be missing.
    timestamp_iso: str | None
    # Severity label as stored; may be missing.
    severity: str | None
    # Stored ``repeat_count`` column; most queries here keep only rows where it is 1.
    repeat_count: int
    # Stored ``out_of_order`` flag, coerced to bool by the query functions.
    out_of_order: bool
    # Decoded from the JSON text in the ``matched_patterns`` column.
    matched_patterns: list[str]
    # Log message text.
    text: str
    # Backend relevance score (FTS5 ``rank`` or Postgres ``ts_rank``);
    # 0.0 for the plain-SQL scans, which do not rank.
    rank: float
def build_fts_index(db_path: Path) -> None:
    """Create the SQLite FTS5 table if needed and backfill it from log_entries.

    The function can be called repeatedly: the table is created with
    ``IF NOT EXISTS`` and only entries whose id is not yet present in
    ``log_fts`` are inserted.

    For Postgres, the tsvector column is maintained by a trigger — this is a no-op.
    """
    if BACKEND == Backend.POSTGRES:
        return

    create_sql = """
        CREATE VIRTUAL TABLE IF NOT EXISTS log_fts USING fts5(
            text,
            entry_id UNINDEXED,
            source_id UNINDEXED,
            sequence UNINDEXED,
            severity UNINDEXED,
            timestamp_iso UNINDEXED,
            matched_patterns UNINDEXED,
            repeat_count UNINDEXED,
            out_of_order UNINDEXED,
            tokenize = 'porter ascii'
        )
    """
    backfill_sql = """
        INSERT INTO log_fts(text, entry_id, source_id, sequence, severity,
                            timestamp_iso, matched_patterns,
                            repeat_count, out_of_order)
        SELECT e.text, e.id, e.source_id, e.sequence, e.severity,
               e.timestamp_iso, e.matched_patterns,
               e.repeat_count, e.out_of_order
        FROM log_entries e
        WHERE e.id NOT IN (SELECT entry_id FROM log_fts WHERE entry_id IS NOT NULL)
    """

    with get_conn(db_path) as conn:
        # Probe the table: the SELECT fails when log_fts is missing or has no
        # ``sequence`` column, in which case the table is dropped and rebuilt.
        try:
            conn.execute("SELECT sequence FROM log_fts LIMIT 0")
        except Exception:
            stale = True
        else:
            stale = False

        if stale:
            conn.execute("DROP TABLE IF EXISTS log_fts")
            conn.commit()

        conn.execute(create_sql)
        conn.execute(backfill_sql)
        conn.commit()
def _sanitize_fts_query(raw: str, or_mode: bool = False) -> str:
"""Strip FTS5 operator characters and return a safe MATCH expression.
FTS5 reserves: " * + - ( ) ^ ~ : ?
or_mode=True joins tokens with OR (any-of) instead of implicit AND (all-of).
"""
cleaned = re.sub(r"[^a-zA-Z0-9 _]", " ", raw)
tokens = cleaned.split()
if not tokens:
return '""'
return (" OR " if or_mode else " ").join(tokens)
def search(
    db_path: Path,
    query: str,
    severity: str | None = None,
    source_filter: str | None = None,
    pattern_filter: str | None = None,
    since: str | None = None,
    until: str | None = None,
    limit: int = 20,
    include_repeats: bool = False,
    or_mode: bool = False,
    semantic: bool = False,
) -> list[SearchResult]:
    """Run a full-text search with optional filters, ranked by relevance.

    With ``semantic=True`` the request is handed to ``_hybrid_search``, which
    re-ranks a BM25 candidate pool using embeddings and falls back to plain
    BM25 when no embedder is available. Otherwise ``_bm25_search`` is used
    directly. All filter arguments are forwarded unchanged.
    """
    runner = _hybrid_search if semantic else _bm25_search
    return runner(
        db_path,
        query,
        severity=severity,
        source_filter=source_filter,
        pattern_filter=pattern_filter,
        since=since,
        until=until,
        limit=limit,
        include_repeats=include_repeats,
        or_mode=or_mode,
    )
def _hybrid_search(
    db_path: Path,
    query: str,
    severity: str | None = None,
    source_filter: str | None = None,
    pattern_filter: str | None = None,
    since: str | None = None,
    until: str | None = None,
    limit: int = 20,
    include_repeats: bool = False,
    or_mode: bool = False,
    alpha: float = 0.6,
    beta: float = 0.4,
) -> list[SearchResult]:
    """Late-fusion hybrid search: BM25 candidates re-ranked with embeddings.

    An oversized BM25 pool is fetched, the query and every candidate text are
    embedded, and each candidate is scored as

        hybrid_score = alpha * bm25_normalized + beta * cosine_sim

    ``bm25_normalized`` is the absolute value of the candidate's rank divided
    by the largest absolute rank in the pool; negative cosine similarities are
    clamped to 0.

    The first ``limit`` BM25 candidates are returned unchanged when embeddings
    are not available, no embedder is configured, or embedding raises.
    """
    from app.services.embeddings import EMBEDDING_AVAILABLE, cosine_similarity, get_embedder

    # Candidate pool: five times the requested limit, never fewer than 100.
    candidates = _bm25_search(
        db_path,
        query,
        severity=severity,
        source_filter=source_filter,
        pattern_filter=pattern_filter,
        since=since,
        until=until,
        limit=max(limit * 5, 100),
        include_repeats=include_repeats,
        or_mode=or_mode,
    )
    if not candidates:
        return []

    bm25_only = candidates[:limit]
    if not EMBEDDING_AVAILABLE:
        return bm25_only
    embedder = get_embedder()
    if embedder is None:
        return bm25_only

    try:
        query_vec = embedder.embed(query)
        candidate_vecs = embedder.embed_batch([cand.text for cand in candidates])
    except Exception as exc:
        logger.warning("Hybrid search embedding failed (%s) — falling back to BM25", exc)
        return bm25_only

    # FTS5 ranks are negative, so work with magnitudes and scale by the pool maximum.
    magnitudes = [abs(cand.rank) for cand in candidates]
    scale = max(magnitudes) or 1.0

    scored: list[tuple[float, SearchResult]] = [
        (
            alpha * (magnitude / scale)
            + beta * max(cosine_similarity(query_vec, vec), 0.0),
            cand,
        )
        for cand, magnitude, vec in zip(candidates, magnitudes, candidate_vecs)
    ]
    scored.sort(key=lambda pair: pair[0], reverse=True)
    return [cand for _, cand in scored[:limit]]
def _bm25_search(
    db_path: Path,
    query: str,
    severity: str | None = None,
    source_filter: str | None = None,
    pattern_filter: str | None = None,
    since: str | None = None,
    until: str | None = None,
    limit: int = 20,
    include_repeats: bool = False,
    or_mode: bool = False,
) -> list[SearchResult]:
    """Dispatch a full-text search to the active backend.

    Postgres goes through ``_pg_fts_search`` (tsvector); everything else goes
    through ``_sqlite_fts_search`` (FTS5). ``or_mode`` is only forwarded to
    the SQLite implementation.
    """
    tenant = resolve_tenant_id()
    shared = {
        "severity": severity,
        "source_filter": source_filter,
        "pattern_filter": pattern_filter,
        "since": since,
        "until": until,
        "limit": limit,
        "include_repeats": include_repeats,
    }
    if BACKEND == Backend.POSTGRES:
        return _pg_fts_search(db_path, query, tenant, **shared)
    return _sqlite_fts_search(db_path, query, tenant, or_mode=or_mode, **shared)
def _sqlite_fts_search(
    db_path: Path,
    query: str,
    tid: str,
    severity: str | None,
    source_filter: str | None,
    pattern_filter: str | None,
    since: str | None,
    until: str | None,
    limit: int,
    include_repeats: bool,
    or_mode: bool,
) -> list[SearchResult]:
    """Run an FTS5 MATCH query against ``log_fts`` and return ranked results.

    ``log_fts`` is joined to ``log_entries`` so rows can be restricted to the
    given tenant (or to entries with an empty tenant id).

    Args:
        db_path: Path of the SQLite database file.
        query: Raw user query; sanitised via ``_sanitize_fts_query``.
        tid: Tenant id to filter on.
        severity: Exact severity to match (upper-cased before comparison).
        source_filter: Substring matched against ``source_id``.
        pattern_filter: Pattern name matched as a quoted string inside the
            ``matched_patterns`` text.
        since: Inclusive lower bound on ``timestamp_iso``.
        until: Inclusive upper bound on ``timestamp_iso``.
        limit: Maximum number of rows returned.
        include_repeats: When False, only rows with ``repeat_count = 1``.
        or_mode: Passed to the sanitiser; joins tokens with OR.

    Returns:
        Results ordered by FTS5 rank, or an empty list when the query raises
        ``sqlite3.OperationalError`` (logged as a warning).
    """
    fts_query = _sanitize_fts_query(query, or_mode=or_mode)
    conditions = [
        "log_fts MATCH ?",
        "(e.tenant_id = ? OR e.tenant_id = '')",
    ]
    params: list = [fts_query, tid]

    # severity, source_id, matched_patterns and timestamp_iso exist in both
    # log_fts and log_entries, so every reference must carry the table alias —
    # an unqualified name makes SQLite reject the statement as ambiguous.
    if severity:
        conditions.append("f.severity = ?")
        params.append(severity.upper())
    if source_filter:
        conditions.append("f.source_id LIKE ?")
        params.append(f"%{source_filter}%")
    if pattern_filter:
        conditions.append("f.matched_patterns LIKE ?")
        params.append(f'%"{pattern_filter}"%')
    if since:
        conditions.append("f.timestamp_iso >= ?")
        params.append(since)
    if until:
        conditions.append("f.timestamp_iso <= ?")
        params.append(until)
    if not include_repeats:
        conditions.append("f.repeat_count = 1")

    where = " AND ".join(conditions)
    params.append(limit)

    # A dedicated short-lived connection is opened here rather than going
    # through get_conn(); it is always closed in the finally clause.
    raw = sqlite3.connect(str(db_path), timeout=30.0)
    raw.row_factory = sqlite3.Row
    try:
        rows = raw.execute(
            f"""
            SELECT f.entry_id, f.source_id, f.sequence, f.timestamp_iso, f.severity,
                   f.repeat_count, f.out_of_order, f.matched_patterns, f.text, f.rank
            FROM log_fts f
            JOIN log_entries e ON e.id = f.entry_id
            WHERE {where}
            ORDER BY f.rank
            LIMIT ?
            """,
            params,
        ).fetchall()
    except sqlite3.OperationalError as exc:
        logger.warning("FTS query failed (%s) — index may not be built yet", exc)
        return []
    finally:
        raw.close()

    return [
        SearchResult(
            entry_id=r["entry_id"],
            source_id=r["source_id"],
            sequence=r["sequence"],
            timestamp_iso=r["timestamp_iso"],
            severity=r["severity"],
            repeat_count=r["repeat_count"],
            out_of_order=bool(r["out_of_order"]),
            matched_patterns=json.loads(r["matched_patterns"] or "[]"),
            text=r["text"],
            rank=float(r["rank"]),
        )
        for r in rows
    ]
def _pg_fts_search(
    db_path: Path,
    query: str,
    tid: str,
    severity: str | None,
    source_filter: str | None,
    pattern_filter: str | None,
    since: str | None,
    until: str | None,
    limit: int,
    include_repeats: bool,
) -> list[SearchResult]:
    """Postgres FTS via tsvector column and websearch_to_tsquery.

    Args:
        db_path: Passed through to ``get_conn``.
        query: User query, handed to ``websearch_to_tsquery('english', ...)``.
        tid: Tenant id; rows with this tenant id or an empty one are kept.
        severity: Exact severity to match (upper-cased before comparison).
        source_filter: Substring matched against ``source_id``.
        pattern_filter: Pattern name matched as a quoted string inside the
            ``matched_patterns`` text.
        since: Inclusive lower bound on ``timestamp_iso``.
        until: Inclusive upper bound on ``timestamp_iso``.
        limit: Maximum number of rows returned.
        include_repeats: When False, only rows with ``repeat_count = 1``.

    Returns:
        Results ordered by ``ts_rank`` descending.
    """
    tsq = "websearch_to_tsquery('english', %s)"
    conditions = [
        f"text_tsv @@ {tsq}",
        "(tenant_id = %s OR tenant_id = '')",
    ]
    where_params: list = [query, tid]
    if severity:
        conditions.append("severity = %s")
        where_params.append(severity.upper())
    if source_filter:
        conditions.append("source_id LIKE %s")
        where_params.append(f"%{source_filter}%")
    if pattern_filter:
        conditions.append("matched_patterns LIKE %s")
        where_params.append(f'%"{pattern_filter}"%')
    if since:
        conditions.append("timestamp_iso >= %s")
        where_params.append(since)
    if until:
        conditions.append("timestamp_iso <= %s")
        where_params.append(until)
    if not include_repeats:
        conditions.append("repeat_count = 1")
    where = " AND ".join(conditions)

    # %s placeholders bind positionally in the order they appear in the SQL
    # text: the ts_rank() tsquery in the SELECT list comes first, then the
    # WHERE conditions, then LIMIT. The values must follow the same order.
    params: list = [query, *where_params, limit]

    with get_conn(db_path) as conn:
        rows = conn.execute(
            f"""
            SELECT id AS entry_id, source_id, sequence, timestamp_iso, severity,
                   repeat_count, out_of_order, matched_patterns, text,
                   ts_rank(text_tsv, {tsq}) AS rank
            FROM log_entries
            WHERE {where}
            ORDER BY rank DESC
            LIMIT %s
            """,
            params,
        ).fetchall()

    return [
        SearchResult(
            entry_id=r["entry_id"],
            source_id=r["source_id"],
            sequence=r["sequence"],
            timestamp_iso=r["timestamp_iso"],
            severity=r["severity"],
            repeat_count=r["repeat_count"],
            out_of_order=bool(r["out_of_order"]),
            matched_patterns=json.loads(r["matched_patterns"] or "[]"),
            text=r["text"],
            rank=float(r["rank"]),
        )
        for r in rows
    ]
def entries_in_window(
    db_path: Path,
    since: str | None,
    until: str | None,
    severity: str | None = None,
    source_filter: str | None = None,
    limit: int = 100,
    per_source_cap: int | None = None,
) -> list[SearchResult]:
    """Return log entries within a time window using a plain SQL scan (no FTS).

    Used as a fallback when keyword search returns nothing — ensures incident
    detail always shows the raw log activity in the window even if no keywords match.

    Only rows with ``repeat_count = 1`` that belong to the current tenant (or
    have an empty tenant id) are considered.

    Args:
        db_path: Passed through to ``get_conn``.
        since: Inclusive lower bound on ``timestamp_iso``; skipped when falsy.
        until: Inclusive upper bound on ``timestamp_iso``; skipped when falsy.
        severity: Exact severity to match (upper-cased before comparison).
        source_filter: Substring matched against ``source_id``.
        limit: Maximum number of rows returned overall.
        per_source_cap: When set, limits rows per source_id so high-volume
            sources (e.g. network-syslog) don't crowd out lower-volume but
            more interesting ones. Within each source, rows are ranked by
            severity bucket (EMERGENCY/ALERT/CRITICAL, then ERROR, then
            WARN/WARNING, then everything else) and then by timestamp.

    Returns:
        Matching entries ordered by ``timestamp_iso`` ascending, each with
        ``rank`` 0.0.
    """
    tid = resolve_tenant_id()
    conditions: list[str] = [
        "repeat_count = 1",
        "(tenant_id = ? OR tenant_id = '')",
    ]
    params: list = [tid]
    if since:
        conditions.append("timestamp_iso >= ?")
        params.append(since)
    if until:
        conditions.append("timestamp_iso <= ?")
        params.append(until)
    if severity:
        conditions.append("severity = ?")
        params.append(severity.upper())
    if source_filter:
        conditions.append("source_id LIKE ?")
        params.append(f"%{source_filter}%")
    where = " AND ".join(conditions)

    if per_source_cap is not None:
        # The severity buckets cover every label this module treats as an
        # error or warning elsewhere (EMERGENCY, ALERT and WARNING included),
        # so those rows are not pushed behind routine entries by the cap.
        sql = f"""
            WITH ranked AS (
                SELECT id as entry_id, source_id, sequence, timestamp_iso, severity,
                       repeat_count, out_of_order, matched_patterns, text, 0.0 as rank,
                       ROW_NUMBER() OVER (
                           PARTITION BY source_id
                           ORDER BY
                               CASE UPPER(severity)
                                   WHEN 'EMERGENCY' THEN 0
                                   WHEN 'ALERT' THEN 0
                                   WHEN 'CRITICAL' THEN 0
                                   WHEN 'ERROR' THEN 1
                                   WHEN 'WARN' THEN 2
                                   WHEN 'WARNING' THEN 2
                                   ELSE 3
                               END,
                               timestamp_iso
                       ) AS rn
                FROM log_entries
                WHERE {where}
            )
            SELECT entry_id, source_id, sequence, timestamp_iso, severity,
                   repeat_count, out_of_order, matched_patterns, text, rank
            FROM ranked
            WHERE rn <= ?
            ORDER BY timestamp_iso ASC
            LIMIT ?
        """
        params.extend([per_source_cap, limit])
    else:
        sql = f"""
            SELECT id as entry_id, source_id, sequence, timestamp_iso, severity,
                   repeat_count, out_of_order, matched_patterns, text, 0.0 as rank
            FROM log_entries
            WHERE {where}
            ORDER BY timestamp_iso ASC
            LIMIT ?
        """
        params.append(limit)

    with get_conn(db_path) as conn:
        rows = conn.execute(sql, params).fetchall()

    return [
        SearchResult(
            entry_id=r["entry_id"],
            source_id=r["source_id"],
            sequence=r["sequence"],
            timestamp_iso=r["timestamp_iso"],
            severity=r["severity"],
            repeat_count=r["repeat_count"],
            out_of_order=bool(r["out_of_order"]),
            matched_patterns=json.loads(r["matched_patterns"] or "[]"),
            text=r["text"],
            rank=float(r["rank"]),
        )
        for r in rows
    ]
def recent_source_errors(
    db_path: Path,
    source_filter: str,
    severity: str = "ERROR",
    limit: int = 10,
    since: str | None = None,
    until: str | None = None,
) -> list[SearchResult]:
    """Fetch the newest entries of one severity from a named source, without FTS.

    Because no text ranking is involved, message content has no influence on
    which rows come back. Used by diagnose when FTS keyword search returns
    nothing for a known source.

    ``source_filter`` is matched as a substring of ``source_id``; ``since`` and
    ``until`` are optional inclusive bounds on ``timestamp_iso``. Rows are
    returned newest first, each with ``rank`` 0.0.
    """
    tenant = resolve_tenant_id()

    clauses = [
        "source_id LIKE ?",
        "severity = ?",
        "repeat_count = 1",
        "(tenant_id = ? OR tenant_id = '')",
    ]
    args: list = [f"%{source_filter}%", severity.upper(), tenant]

    for bound, clause in ((since, "timestamp_iso >= ?"), (until, "timestamp_iso <= ?")):
        if bound:
            clauses.append(clause)
            args.append(bound)

    where = " AND ".join(clauses)
    args.append(limit)

    sql = f"""
        SELECT id as entry_id, source_id, sequence, timestamp_iso, severity,
               repeat_count, out_of_order, matched_patterns, text, 0.0 as rank
        FROM log_entries
        WHERE {where}
        ORDER BY timestamp_iso DESC
        LIMIT ?
    """
    with get_conn(db_path) as conn:
        fetched = conn.execute(sql, args).fetchall()

    results: list[SearchResult] = []
    for row in fetched:
        results.append(
            SearchResult(
                entry_id=row["entry_id"],
                source_id=row["source_id"],
                sequence=row["sequence"],
                timestamp_iso=row["timestamp_iso"],
                severity=row["severity"],
                repeat_count=row["repeat_count"],
                out_of_order=bool(row["out_of_order"]),
                matched_patterns=json.loads(row["matched_patterns"] or "[]"),
                text=row["text"],
                rank=float(row["rank"]),
            )
        )
    return results
def list_sources(db_path: Path) -> list[dict]:
    """List sources with entry counts, grouped by their prefix:host stem.

    A source_id with three or more colon-separated segments (e.g.
    ``muninn-journal:Muninn:ssh.service``) is collapsed to its first two
    segments (``muninn-journal:Muninn``); shorter IDs are returned as-is.
    The grouping expression itself comes from ``frag.source_group_expr``.

    Each returned dict has ``source_id`` (the group id), ``unit_count``
    (distinct source_ids merged into the row), ``entry_count``, ``earliest``,
    ``latest`` and ``error_count``. Rows are ordered by ``entry_count``
    descending.
    """
    tenant = resolve_tenant_id()
    grouping = frag.source_group_expr("source_id")
    sql = f"""
        SELECT
            {grouping} AS group_id,
            COUNT(DISTINCT source_id) AS unit_count,
            COUNT(*) AS entry_count,
            MIN(timestamp_iso) AS earliest,
            MAX(timestamp_iso) AS latest,
            SUM(CASE WHEN severity IN ('ERROR','CRITICAL','EMERGENCY','ALERT')
                     THEN 1 ELSE 0 END) AS error_count
        FROM log_entries
        WHERE (tenant_id = ? OR tenant_id = '')
        GROUP BY group_id
        ORDER BY entry_count DESC
    """
    with get_conn(db_path) as conn:
        fetched = conn.execute(sql, (tenant,)).fetchall()

    # Columns copied through under their own names, after the renamed group id.
    passthrough = ("unit_count", "entry_count", "earliest", "latest", "error_count")
    return [
        {"source_id": row["group_id"], **{col: row[col] for col in passthrough}}
        for row in fetched
    ]
def _compile_overrides(overrides: list[dict]) -> list[tuple[re.Pattern[str], str]]:
"""Return (compiled_pattern, override_severity) pairs for enabled rules."""
compiled = []
for rule in overrides:
if not rule.get("enabled", True):
continue
try:
compiled.append((re.compile(rule["pattern"], re.IGNORECASE), rule["override_severity"]))
except re.error:
pass
return compiled
def _apply_overrides(text: str, original_severity: str, rules: list[tuple[re.Pattern[str], str]]) -> str:
for pattern, new_sev in rules:
if pattern.search(text):
return new_sev
return original_severity
def stats_summary(db_path: Path, window_hours: int = 24, severity_overrides: list[dict] | None = None) -> dict:
    """Collect the aggregate health numbers shown on the dashboard.

    Everything is read from plain ``log_entries`` (not FTS), so the summary
    works even before the index is built. All queries are restricted to the
    current tenant (or rows with an empty tenant id); all but the last-ingest
    lookup also require ``repeat_count = 1``.

    Args:
        db_path: Passed through to ``get_conn``.
        window_hours: Size of the look-back window used for the totals, the
            per-source health rows and the timeline events.
        severity_overrides: Optional override rules; a CRITICAL row whose text
            matches a rule that assigns a different severity is left out of
            ``recent_criticals`` and counted in ``suppressed_criticals``.

    Returns:
        A dict with ``window_hours``, ``total_24h``, ``criticals_24h``,
        ``errors_24h``, ``source_health``, ``recent_criticals`` (at most 5,
        taken from the 25 newest CRITICAL rows), ``suppressed_criticals``,
        ``last_gleaned`` and ``timeline_events`` (at most 300).
    """
    rules = _compile_overrides(severity_overrides or [])
    tenant = resolve_tenant_id()
    grouping = frag.source_group_expr("source_id")

    window_start = datetime.now(timezone.utc) - timedelta(hours=window_hours)
    since_iso = window_start.strftime("%Y-%m-%dT%H:%M:%S")

    totals_sql = """
        SELECT
            COUNT(*) AS total,
            SUM(CASE WHEN severity = 'CRITICAL' THEN 1 ELSE 0 END) AS criticals,
            SUM(CASE WHEN severity IN ('ERROR','CRITICAL','EMERGENCY','ALERT') THEN 1 ELSE 0 END) AS errors
        FROM log_entries
        WHERE timestamp_iso >= ?
          AND repeat_count = 1
          AND (tenant_id = ? OR tenant_id = '')
    """
    per_source_sql = f"""
        SELECT
            {grouping} AS group_id,
            COUNT(*) AS entry_count,
            SUM(CASE WHEN severity IN ('ERROR','CRITICAL','EMERGENCY','ALERT') THEN 1 ELSE 0 END) AS error_count,
            MAX(timestamp_iso) AS latest
        FROM log_entries
        WHERE timestamp_iso >= ?
          AND repeat_count = 1
          AND (tenant_id = ? OR tenant_id = '')
        GROUP BY group_id
        ORDER BY error_count DESC, entry_count DESC
    """
    # Not limited to the window: the 25 newest CRITICAL rows overall.
    criticals_sql = """
        SELECT id as entry_id, source_id, timestamp_iso, severity, text
        FROM log_entries
        WHERE severity = 'CRITICAL'
          AND repeat_count = 1
          AND (tenant_id = ? OR tenant_id = '')
        ORDER BY timestamp_iso DESC
        LIMIT 25
    """
    timeline_sql = """
        SELECT id as entry_id, source_id, timestamp_iso, severity, text
        FROM log_entries
        WHERE severity IN ('CRITICAL','ERROR','WARN','WARNING','EMERGENCY','ALERT')
          AND timestamp_iso >= ?
          AND timestamp_iso IS NOT NULL
          AND repeat_count = 1
          AND (tenant_id = ? OR tenant_id = '')
        ORDER BY timestamp_iso DESC
        LIMIT 300
    """
    last_ingest_sql = (
        "SELECT MAX(ingest_time) AS t FROM log_entries WHERE (tenant_id = ? OR tenant_id = '')"
    )

    with get_conn(db_path) as conn:
        totals = conn.execute(totals_sql, (since_iso, tenant)).fetchone()
        source_rows = conn.execute(per_source_sql, (since_iso, tenant)).fetchall()
        crit_rows = conn.execute(criticals_sql, (tenant,)).fetchall()
        timeline_rows = conn.execute(timeline_sql, (since_iso, tenant)).fetchall()
        last_row = conn.execute(last_ingest_sql, (tenant,)).fetchone()

    # SUM() yields NULL over an empty set, hence the ``or 0`` fallbacks.
    total_24h = int(totals["total"] or 0)
    criticals_24h = int(totals["criticals"] or 0)
    errors_24h = int(totals["errors"] or 0)

    def _event(row) -> dict:
        # Shared shape of recent_criticals and timeline_events items.
        return {
            "entry_id": row["entry_id"],
            "source_id": row["source_id"],
            "timestamp_iso": row["timestamp_iso"],
            "severity": row["severity"],
            "text": row["text"],
        }

    source_health = []
    for row in source_rows:
        source_health.append(
            {
                "source_id": row["group_id"],
                "entry_count": int(row["entry_count"]),
                "error_count": int(row["error_count"]),
                "latest": row["latest"],
            }
        )

    suppressed = 0
    recent_criticals: list[dict] = []
    for row in crit_rows:
        if _apply_overrides(row["text"], row["severity"], rules) != "CRITICAL":
            suppressed += 1
            continue
        recent_criticals.append(_event(row))
        # Stop scanning once five criticals have been collected.
        if len(recent_criticals) == 5:
            break

    timeline_events = [_event(row) for row in timeline_rows]

    last_gleaned: str | None = None
    if last_row:
        last_gleaned = last_row["t"]

    return {
        "window_hours": window_hours,
        "total_24h": total_24h,
        "criticals_24h": criticals_24h,
        "errors_24h": errors_24h,
        "source_health": source_health,
        "recent_criticals": recent_criticals,
        "suppressed_criticals": suppressed,
        "last_gleaned": last_gleaned,
        "timeline_events": timeline_events,
    }
def format_results(results: list[SearchResult], max_text: int = 300) -> str:
    """Format search results as readable text for LLM context.

    Each result becomes a header line ``[timestamp | source | severity flags]``
    followed by the entry text; results are separated by a blank line.

    Args:
        results: Entries to render.
        max_text: Maximum number of characters of each entry's text to keep.
            Longer text is cut and an ellipsis ("…") is appended so the reader
            can tell the entry was truncated.

    Returns:
        The formatted block, or ``"No matching log entries found."`` when
        ``results`` is empty.
    """
    if not results:
        return "No matching log entries found."
    lines = []
    for r in results:
        ts = r.timestamp_iso or "no-timestamp"
        sev = r.severity or "?"
        src = r.source_id
        flags = []
        if r.repeat_count > 1:
            flags.append(f"repeat×{r.repeat_count}")
        if r.out_of_order:
            flags.append("out-of-order")
        if r.matched_patterns:
            flags.append(f"[{', '.join(r.matched_patterns)}]")
        flag_str = f" {' '.join(flags)}" if flags else ""
        # Mark truncation explicitly; untruncated text is passed through as-is.
        text = r.text[:max_text] + ("…" if len(r.text) > max_text else "")
        lines.append(f"[{ts} | {src} | {sev}{flag_str}]\n{text}")
    return "\n\n".join(lines)