Watcher, REST endpoints, services (search, incidents, blocklist),
MCP server, context retriever, embedder, glean_scheduler, and
doc_upload all used the default 5-second SQLite busy timeout.
During collect glean write phases, watcher flush threads were hitting
'database is locked' errors when the glean held the write lock longer
than 5 seconds.
All connections now use timeout=30.0, matching the pipeline fix
from commit 5a9281a. No logic changes.
493 lines
16 KiB
Python
493 lines
16 KiB
Python
"""FTS5-based log search with severity, source, and pattern filters."""
|
||
from __future__ import annotations
|
||
|
||
import json
|
||
import logging
|
||
import re
|
||
import sqlite3
|
||
from dataclasses import dataclass
|
||
from pathlib import Path
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
@dataclass(frozen=True)
class SearchResult:
    """Immutable record describing one log entry returned by this module.

    The same shape is produced by the FTS-backed ``search`` and by the
    plain-SQL scans (``entries_in_window``, ``recent_source_errors``).
    Field order is part of the constructor contract and must not change.
    """

    # Primary key of the row in ``log_entries`` (``entry_id`` in ``log_fts``).
    entry_id: str
    # Identifier of the log source the entry belongs to.
    source_id: str
    # Value of the ``sequence`` column for the entry.
    sequence: int
    # Timestamp string as stored in ``timestamp_iso``; may be missing.
    timestamp_iso: str | None
    # Severity label as stored; may be missing.
    severity: str | None
    # Value of the ``repeat_count`` column.
    repeat_count: int
    # Stored ``out_of_order`` flag coerced to ``bool`` by the query helpers.
    out_of_order: bool
    # Pattern names decoded from the JSON text in ``matched_patterns``.
    matched_patterns: list[str]
    # Text of the log entry.
    text: str
    # FTS5 ``rank`` for full-text hits; ``0.0`` for the plain-SQL scans.
    rank: float
|
||
|
||
|
||
def build_fts_index(db_path: Path) -> None:
    """Build (or incrementally refresh) the FTS5 index from ``log_entries``.

    Safe to re-run: rows whose ``entry_id`` is already present in ``log_fts``
    are not inserted again. If the existing ``log_fts`` table cannot serve a
    ``sequence`` column (stale schema, or the table does not exist yet), it is
    dropped and recreated before indexing.

    Args:
        db_path: Path to the SQLite database file.

    Raises:
        sqlite3.OperationalError: if indexing fails, e.g. ``log_entries`` does
            not exist. The connection is closed before the error propagates.
    """
    conn = sqlite3.connect(str(db_path), timeout=30.0)
    try:
        conn.execute("PRAGMA journal_mode=WAL")

        # Probe for the sequence column; a failure means the table is missing
        # or was created with an older schema, so it must be rebuilt.
        needs_rebuild = False
        try:
            conn.execute("SELECT sequence FROM log_fts LIMIT 0")
        except sqlite3.OperationalError:
            needs_rebuild = True

        if needs_rebuild:
            conn.execute("DROP TABLE IF EXISTS log_fts")

        conn.executescript("""
            CREATE VIRTUAL TABLE IF NOT EXISTS log_fts USING fts5(
                text,
                entry_id UNINDEXED,
                source_id UNINDEXED,
                sequence UNINDEXED,
                severity UNINDEXED,
                timestamp_iso UNINDEXED,
                matched_patterns UNINDEXED,
                repeat_count UNINDEXED,
                out_of_order UNINDEXED,
                tokenize = 'porter ascii'
            );
        """)

        # Only insert rows not already indexed.
        conn.execute("""
            INSERT INTO log_fts(text, entry_id, source_id, sequence, severity,
                                timestamp_iso, matched_patterns,
                                repeat_count, out_of_order)
            SELECT e.text, e.id, e.source_id, e.sequence, e.severity,
                   e.timestamp_iso, e.matched_patterns,
                   e.repeat_count, e.out_of_order
            FROM log_entries e
            WHERE e.id NOT IN (SELECT entry_id FROM log_fts WHERE entry_id IS NOT NULL)
        """)
        conn.commit()
    finally:
        # Always release the handle, even when a statement above raised.
        conn.close()
|
||
|
||
|
||
def _sanitize_fts_query(raw: str, or_mode: bool = False) -> str:
|
||
"""Strip FTS5 operator characters and return a safe MATCH expression.
|
||
|
||
FTS5 reserves: " * + - ( ) ^ ~ : ?
|
||
or_mode=True joins tokens with OR (any-of) instead of implicit AND (all-of).
|
||
"""
|
||
cleaned = re.sub(r"[^a-zA-Z0-9 _]", " ", raw)
|
||
tokens = cleaned.split()
|
||
if not tokens:
|
||
return '""'
|
||
return (" OR " if or_mode else " ").join(tokens)
|
||
|
||
|
||
def search(
    db_path: Path,
    query: str,
    severity: str | None = None,
    source_filter: str | None = None,
    pattern_filter: str | None = None,
    since: str | None = None,
    until: str | None = None,
    limit: int = 20,
    include_repeats: bool = False,
    or_mode: bool = False,
) -> list[SearchResult]:
    """Full-text search with optional filters. Returns results ranked by relevance.

    Args:
        db_path: Path to the SQLite database file.
        query: Free-form text; sanitized before being used as the MATCH expression.
        severity: Exact severity to match; upper-cased before comparison.
        source_filter: Substring of ``source_id`` (SQL ``LIKE``; ``%`` and ``_``
            in the value act as wildcards).
        pattern_filter: Pattern name that must appear, double-quoted, inside the
            stored ``matched_patterns`` text.
        since: Inclusive lower bound compared against ``timestamp_iso``.
        until: Inclusive upper bound compared against ``timestamp_iso``.
        limit: Maximum number of rows returned.
        include_repeats: When False, only rows with ``repeat_count = 1`` are returned.
        or_mode: Match any query token instead of all of them.

    Returns:
        Matching entries ordered by FTS5 ``rank``. An empty list is returned
        (and a warning logged) when the query raises ``sqlite3.OperationalError``.
    """
    conn = sqlite3.connect(str(db_path), timeout=30.0)
    try:
        conn.execute("PRAGMA journal_mode=WAL")
        conn.row_factory = sqlite3.Row

        fts_query = _sanitize_fts_query(query, or_mode=or_mode)
        conditions = ["log_fts MATCH ?"]
        params: list = [fts_query]

        if severity:
            conditions.append("severity = ?")
            params.append(severity.upper())
        if source_filter:
            conditions.append("source_id LIKE ?")
            params.append(f"%{source_filter}%")
        if pattern_filter:
            # matched_patterns holds JSON text, so a name is matched with its quotes.
            conditions.append("matched_patterns LIKE ?")
            params.append(f'%"{pattern_filter}"%')
        if since:
            conditions.append("timestamp_iso >= ?")
            params.append(since)
        if until:
            conditions.append("timestamp_iso <= ?")
            params.append(until)
        if not include_repeats:
            conditions.append("repeat_count = 1")

        # Only fixed condition fragments are interpolated; all values are bound.
        where = " AND ".join(conditions)
        params.append(limit)

        try:
            rows = conn.execute(
                f"""
                SELECT entry_id, source_id, sequence, timestamp_iso, severity,
                       repeat_count, out_of_order, matched_patterns, text, rank
                FROM log_fts
                WHERE {where}
                ORDER BY rank
                LIMIT ?
                """,
                params,
            ).fetchall()
        except sqlite3.OperationalError as e:
            logger.warning("FTS query failed (%s) — index may not be built yet", e)
            return []

        return [
            SearchResult(
                entry_id=r["entry_id"],
                source_id=r["source_id"],
                sequence=r["sequence"],
                timestamp_iso=r["timestamp_iso"],
                severity=r["severity"],
                repeat_count=r["repeat_count"],
                out_of_order=bool(r["out_of_order"]),
                matched_patterns=json.loads(r["matched_patterns"] or "[]"),
                text=r["text"],
                rank=r["rank"],
            )
            for r in rows
        ]
    finally:
        # Close on every path, including failures while decoding rows.
        conn.close()
|
||
|
||
|
||
def entries_in_window(
    db_path: Path,
    since: str | None,
    until: str | None,
    severity: str | None = None,
    source_filter: str | None = None,
    limit: int = 100,
    per_source_cap: int | None = None,
) -> list[SearchResult]:
    """Return log entries within a time window using a plain SQL scan (no FTS).

    Used as a fallback when keyword search returns nothing — ensures incident
    detail always shows the raw log activity in the window even if no keywords match.

    Only rows with ``repeat_count = 1`` are considered. Results are ordered by
    ``timestamp_iso`` ascending and carry ``rank = 0.0``.

    Args:
        db_path: Path to the SQLite database file.
        since: Inclusive lower bound on ``timestamp_iso``; ignored when falsy.
        until: Inclusive upper bound on ``timestamp_iso``; ignored when falsy.
        severity: Exact severity to match; upper-cased before comparison.
        source_filter: Substring of ``source_id`` (SQL ``LIKE``).
        limit: Maximum number of rows returned overall.
        per_source_cap: when set, limits rows per source_id so high-volume sources
            (e.g. network-syslog) don't crowd out lower-volume but more interesting
            ones. Errors/warnings are ranked first within each source partition.

    Returns:
        The matching entries as ``SearchResult`` objects.
    """
    conn = sqlite3.connect(str(db_path), timeout=30.0)
    try:
        conn.execute("PRAGMA journal_mode=WAL")
        conn.row_factory = sqlite3.Row

        conditions: list[str] = ["repeat_count = 1"]
        params: list = []

        if since:
            conditions.append("timestamp_iso >= ?")
            params.append(since)
        if until:
            conditions.append("timestamp_iso <= ?")
            params.append(until)
        if severity:
            conditions.append("severity = ?")
            params.append(severity.upper())
        if source_filter:
            conditions.append("source_id LIKE ?")
            params.append(f"%{source_filter}%")

        # Only fixed condition fragments are interpolated; all values are bound.
        where = " AND ".join(conditions)

        if per_source_cap is not None:
            # Use a window function to cap rows per source, errors/warnings first.
            query = f"""
                WITH ranked AS (
                    SELECT id as entry_id, source_id, sequence, timestamp_iso, severity,
                           repeat_count, out_of_order, matched_patterns, text, 0.0 as rank,
                           ROW_NUMBER() OVER (
                               PARTITION BY source_id
                               ORDER BY
                                   CASE UPPER(severity)
                                       WHEN 'CRITICAL' THEN 0
                                       WHEN 'ERROR' THEN 1
                                       WHEN 'WARN' THEN 2
                                       ELSE 3
                                   END,
                                   timestamp_iso
                           ) AS rn
                    FROM log_entries
                    WHERE {where}
                )
                SELECT entry_id, source_id, sequence, timestamp_iso, severity,
                       repeat_count, out_of_order, matched_patterns, text, rank
                FROM ranked
                WHERE rn <= ?
                ORDER BY timestamp_iso ASC
                LIMIT ?
            """
            params.extend([per_source_cap, limit])
        else:
            query = f"""
                SELECT id as entry_id, source_id, sequence, timestamp_iso, severity,
                       repeat_count, out_of_order, matched_patterns, text, 0.0 as rank
                FROM log_entries
                WHERE {where}
                ORDER BY timestamp_iso ASC
                LIMIT ?
            """
            params.append(limit)

        rows = conn.execute(query, params).fetchall()
    finally:
        # Release the handle even when the query raises.
        conn.close()

    return [
        SearchResult(
            entry_id=r["entry_id"],
            source_id=r["source_id"],
            sequence=r["sequence"],
            timestamp_iso=r["timestamp_iso"],
            severity=r["severity"],
            repeat_count=r["repeat_count"],
            out_of_order=bool(r["out_of_order"]),
            matched_patterns=json.loads(r["matched_patterns"] or "[]"),
            text=r["text"],
            rank=r["rank"],
        )
        for r in rows
    ]
|
||
|
||
|
||
def recent_source_errors(
    db_path: Path,
    source_filter: str,
    severity: str = "ERROR",
    limit: int = 10,
    since: str | None = None,
    until: str | None = None,
) -> list[SearchResult]:
    """Plain-SQL scan: most recent error entries from a named source.

    Bypasses FTS ranking so text content doesn't affect which errors surface.
    Used by diagnose when FTS keyword search returns nothing for a known source.

    Only rows with ``repeat_count = 1`` are considered. Results are ordered by
    ``timestamp_iso`` descending and carry ``rank = 0.0``.

    Args:
        db_path: Path to the SQLite database file.
        source_filter: Substring of ``source_id`` (SQL ``LIKE``).
        severity: Exact severity to match; upper-cased before comparison.
        limit: Maximum number of rows returned.
        since: Inclusive lower bound on ``timestamp_iso``; ignored when falsy.
        until: Inclusive upper bound on ``timestamp_iso``; ignored when falsy.

    Returns:
        The matching entries as ``SearchResult`` objects, newest first.
    """
    conn = sqlite3.connect(str(db_path), timeout=30.0)
    try:
        conn.execute("PRAGMA journal_mode=WAL")
        conn.row_factory = sqlite3.Row

        conditions = [
            "source_id LIKE ?",
            "severity = ?",
            "repeat_count = 1",
        ]
        params: list = [f"%{source_filter}%", severity.upper()]

        if since:
            conditions.append("timestamp_iso >= ?")
            params.append(since)
        if until:
            conditions.append("timestamp_iso <= ?")
            params.append(until)

        params.append(limit)
        # Only fixed condition fragments are interpolated; all values are bound.
        where = " AND ".join(conditions)

        rows = conn.execute(
            f"""
            SELECT id as entry_id, source_id, sequence, timestamp_iso, severity,
                   repeat_count, out_of_order, matched_patterns, text, 0.0 as rank
            FROM log_entries
            WHERE {where}
            ORDER BY timestamp_iso DESC
            LIMIT ?
            """,
            params,
        ).fetchall()
    finally:
        # Release the handle even when the query raises.
        conn.close()

    return [
        SearchResult(
            entry_id=r["entry_id"],
            source_id=r["source_id"],
            sequence=r["sequence"],
            timestamp_iso=r["timestamp_iso"],
            severity=r["severity"],
            repeat_count=r["repeat_count"],
            out_of_order=bool(r["out_of_order"]),
            matched_patterns=json.loads(r["matched_patterns"] or "[]"),
            text=r["text"],
            rank=r["rank"],
        )
        for r in rows
    ]
|
||
|
||
|
||
def list_sources(db_path: Path) -> list[dict]:
    """Return distinct sources with entry counts and time ranges.

    Args:
        db_path: Path to the SQLite database file.

    Returns:
        One dict per ``source_id`` with keys ``source_id``, ``entry_count``,
        ``earliest``, ``latest`` and ``error_count`` (rows whose severity is
        ERROR, CRITICAL, EMERGENCY or ALERT), ordered by ``entry_count``
        descending.

    Raises:
        sqlite3.OperationalError: if the query fails, e.g. ``log_entries`` does
            not exist. The connection is closed before the error propagates.
    """
    conn = sqlite3.connect(str(db_path), timeout=30.0)
    try:
        conn.execute("PRAGMA journal_mode=WAL")
        rows = conn.execute("""
            SELECT
                source_id,
                COUNT(*) as entry_count,
                MIN(timestamp_iso) as earliest,
                MAX(timestamp_iso) as latest,
                SUM(CASE WHEN severity IN ('ERROR','CRITICAL','EMERGENCY','ALERT') THEN 1 ELSE 0 END) as error_count
            FROM log_entries
            GROUP BY source_id
            ORDER BY entry_count DESC
        """).fetchall()
    finally:
        # Release the handle even when the query raises.
        conn.close()

    return [
        {
            "source_id": r[0],
            "entry_count": r[1],
            "earliest": r[2],
            "latest": r[3],
            "error_count": r[4],
        }
        for r in rows
    ]
|
||
|
||
|
||
def _compile_overrides(overrides: list[dict]) -> list[tuple[re.Pattern[str], str]]:
|
||
"""Return (compiled_pattern, override_severity) pairs for enabled rules."""
|
||
compiled = []
|
||
for rule in overrides:
|
||
if not rule.get("enabled", True):
|
||
continue
|
||
try:
|
||
compiled.append((re.compile(rule["pattern"], re.IGNORECASE), rule["override_severity"]))
|
||
except re.error:
|
||
pass
|
||
return compiled
|
||
|
||
|
||
def _apply_overrides(text: str, original_severity: str, rules: list[tuple[re.Pattern[str], str]]) -> str:
|
||
for pattern, new_sev in rules:
|
||
if pattern.search(text):
|
||
return new_sev
|
||
return original_severity
|
||
|
||
|
||
def stats_summary(db_path: Path, window_hours: int = 24, severity_overrides: list[dict] | None = None) -> dict:
    """Return aggregate health stats for the dashboard.

    Queries plain log_entries (not FTS) so it works even before the index is built.

    Args:
        db_path: Path to the SQLite database file.
        window_hours: Size of the look-back window, in hours, for the counts
            and the per-source breakdown.
        severity_overrides: Optional override rules; a CRITICAL entry whose
            text matches a rule mapping to another severity is left out of
            ``recent_criticals`` and counted in ``suppressed_criticals``.

    Returns:
        A dict with keys ``window_hours``, ``total_24h``, ``criticals_24h``,
        ``errors_24h``, ``source_health``, ``recent_criticals``,
        ``suppressed_criticals`` and ``last_gleaned``. The ``*_24h`` keys keep
        their names for any window size; they are computed over ``window_hours``.
    """
    rules = _compile_overrides(severity_overrides or [])

    # Bound as a parameter to SQLite's strftime() instead of being formatted
    # into the SQL text.
    window_modifier = f"-{window_hours} hours"

    conn = sqlite3.connect(str(db_path), timeout=30.0)
    try:
        conn.execute("PRAGMA journal_mode=WAL")
        conn.row_factory = sqlite3.Row

        # Overall counts in window
        row = conn.execute(
            """
            SELECT
                COUNT(*) AS total,
                SUM(CASE WHEN severity = 'CRITICAL' THEN 1 ELSE 0 END) AS criticals,
                SUM(CASE WHEN severity IN ('ERROR','CRITICAL','EMERGENCY','ALERT') THEN 1 ELSE 0 END) AS errors
            FROM log_entries
            WHERE timestamp_iso >= strftime('%Y-%m-%dT%H:%M:%S', 'now', ?)
              AND repeat_count = 1
            """,
            (window_modifier,),
        ).fetchone()
        # SUM() yields NULL when no row matches, hence the "or 0" fallbacks.
        total_24h = int(row["total"] or 0)
        criticals_24h = int(row["criticals"] or 0)
        errors_24h = int(row["errors"] or 0)

        # Per-source breakdown
        source_rows = conn.execute(
            """
            SELECT
                source_id,
                COUNT(*) AS entry_count,
                SUM(CASE WHEN severity IN ('ERROR','CRITICAL','EMERGENCY','ALERT') THEN 1 ELSE 0 END) AS error_count,
                MAX(timestamp_iso) AS latest
            FROM log_entries
            WHERE timestamp_iso >= strftime('%Y-%m-%dT%H:%M:%S', 'now', ?)
              AND repeat_count = 1
            GROUP BY source_id
            ORDER BY error_count DESC, entry_count DESC
            """,
            (window_modifier,),
        ).fetchall()
        source_health = [
            {
                "source_id": r["source_id"],
                "entry_count": int(r["entry_count"]),
                "error_count": int(r["error_count"]),
                "latest": r["latest"],
            }
            for r in source_rows
        ]

        # Fetch candidate criticals (fetch more so filtering doesn't leave us with too few).
        # This query is not restricted to the time window.
        crit_rows = conn.execute("""
            SELECT id as entry_id, source_id, timestamp_iso, severity, text
            FROM log_entries
            WHERE severity = 'CRITICAL' AND repeat_count = 1
            ORDER BY timestamp_iso DESC
            LIMIT 25
        """).fetchall()

        # Apply overrides: skip entries whose effective severity is no longer CRITICAL.
        # Scanning stops at the fifth kept entry, so `suppressed` only counts
        # candidates examined up to that point.
        suppressed = 0
        recent_criticals = []
        for r in crit_rows:
            effective = _apply_overrides(r["text"], r["severity"], rules)
            if effective == "CRITICAL":
                recent_criticals.append({
                    "entry_id": r["entry_id"],
                    "source_id": r["source_id"],
                    "timestamp_iso": r["timestamp_iso"],
                    "severity": r["severity"],
                    "text": r["text"],
                })
                if len(recent_criticals) == 5:
                    break
            else:
                suppressed += 1

        last_row = conn.execute("SELECT MAX(ingest_time) AS t FROM log_entries").fetchone()
        last_gleaned: str | None = last_row["t"] if last_row else None
    finally:
        # Release the handle even when one of the queries raises.
        conn.close()

    return {
        "window_hours": window_hours,
        "total_24h": total_24h,
        "criticals_24h": criticals_24h,
        "errors_24h": errors_24h,
        "source_health": source_health,
        "recent_criticals": recent_criticals,
        "suppressed_criticals": suppressed,
        "last_gleaned": last_gleaned,
    }
|
||
|
||
|
||
def format_results(results: list[SearchResult], max_text: int = 300) -> str:
    """Render search results as plain text suitable for LLM context.

    Each entry becomes a bracketed header line (timestamp, source, severity
    and any flags) followed by its text, cut to ``max_text`` characters with
    an ellipsis when truncated. Entries are separated by a blank line. A
    fixed message is returned when ``results`` is empty.
    """
    if not results:
        return "No matching log entries found."

    def _render(entry) -> str:
        # Optional markers appended after the severity in the header.
        markers = []
        if entry.repeat_count > 1:
            markers.append(f"repeat×{entry.repeat_count}")
        if entry.out_of_order:
            markers.append("out-of-order")
        if entry.matched_patterns:
            markers.append(f"[{', '.join(entry.matched_patterns)}]")
        suffix = f" {' '.join(markers)}" if markers else ""

        body = entry.text[:max_text]
        if len(entry.text) > max_text:
            body += "…"

        when = entry.timestamp_iso or "no-timestamp"
        level = entry.severity or "?"
        return f"[{when} | {entry.source_id} | {level}{suffix}]\n{body}"

    return "\n\n".join(_render(entry) for entry in results)
|