Both log_fts and log_entries have timestamp_iso, severity, source_id, and matched_patterns columns. After the JOIN, unqualified references to any of these caused SQLite to raise 'ambiguous column name', silently falling back to the non-FTS scan path on every time-filtered or severity-filtered query. Prefix all filter conditions that touch FTS-mirror columns with f. to resolve the ambiguity. The e. prefix on tenant_id was already correct since tenant_id is not present in the FTS virtual table.
755 lines
24 KiB
Python
755 lines
24 KiB
Python
"""FTS-based log search with optional hybrid BM25 + vector re-ranking.
|
||
|
||
SQLite backend: FTS5 virtual table with Porter stemmer.
|
||
Postgres backend: tsvector column with GIN index + websearch_to_tsquery.
|
||
"""
|
||
from __future__ import annotations
|
||
|
||
import json
|
||
import logging
|
||
import re
|
||
import sqlite3
|
||
from dataclasses import dataclass
|
||
from datetime import datetime, timedelta, timezone
|
||
from pathlib import Path
|
||
|
||
from app.db import BACKEND, Backend, frag, get_conn, resolve_tenant_id
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
@dataclass(frozen=True)
class SearchResult:
    """One log entry returned by a search or plain-SQL scan.

    Immutable (frozen dataclass).  Field order matters: instances may be
    built positionally.
    """

    entry_id: str  # taken from log_entries.id / log_fts.entry_id
    source_id: str
    sequence: int
    timestamp_iso: str | None  # may be None; formatters fall back to a placeholder
    severity: str | None
    repeat_count: int
    out_of_order: bool
    matched_patterns: list[str]  # decoded from the JSON text stored in the column
    text: str
    rank: float  # backend relevance score; plain-SQL scans fill in 0.0
|
||
|
||
|
||
def build_fts_index(db_path: Path) -> None:
    """Create the SQLite FTS5 table if needed and backfill it from log_entries.

    Re-running is safe: the table is created with IF NOT EXISTS and only
    entries whose id is not already present in log_fts are inserted.

    On the Postgres backend this returns immediately without touching the
    database (the tsvector column is expected to be maintained by a trigger).
    """
    if BACKEND == Backend.POSTGRES:
        return

    create_sql = """
            CREATE VIRTUAL TABLE IF NOT EXISTS log_fts USING fts5(
                text,
                entry_id UNINDEXED,
                source_id UNINDEXED,
                sequence UNINDEXED,
                severity UNINDEXED,
                timestamp_iso UNINDEXED,
                matched_patterns UNINDEXED,
                repeat_count UNINDEXED,
                out_of_order UNINDEXED,
                tokenize = 'porter ascii'
            )
        """
    backfill_sql = """
            INSERT INTO log_fts(text, entry_id, source_id, sequence, severity,
                                timestamp_iso, matched_patterns,
                                repeat_count, out_of_order)
            SELECT e.text, e.id, e.source_id, e.sequence, e.severity,
                   e.timestamp_iso, e.matched_patterns,
                   e.repeat_count, e.out_of_order
            FROM log_entries e
            WHERE e.id NOT IN (SELECT entry_id FROM log_fts WHERE entry_id IS NOT NULL)
        """

    with get_conn(db_path) as conn:
        # Probe for the `sequence` column.  Any failure (table missing, column
        # missing, ...) means the existing table is unusable: drop it so the
        # CREATE below starts from scratch.
        try:
            conn.execute("SELECT sequence FROM log_fts LIMIT 0")
        except Exception:
            conn.execute("DROP TABLE IF EXISTS log_fts")
            conn.commit()

        conn.execute(create_sql)
        conn.execute(backfill_sql)
        conn.commit()
|
||
|
||
|
||
def _sanitize_fts_query(raw: str, or_mode: bool = False) -> str:
    """Turn free-form user text into a safe FTS5 MATCH expression.

    Every character other than ASCII letters, digits, space and underscore is
    replaced by a space (this removes the FTS5 operator characters
    ``" * + - ( ) ^ ~ : ?``), and the remainder is split on whitespace.

    Tokens that are FTS5 query keywords (``AND``, ``OR``, ``NOT``, ``NEAR`` --
    upper-case only) are wrapped in double quotes so they are matched as
    ordinary terms.  Left bare, a user query such as ``disk NOT found`` or a
    literal ``OR`` in ``or_mode`` produces a MATCH syntax error.

    Args:
        raw: The user-supplied query text.
        or_mode: Join tokens with ``OR`` (any-of) instead of a space
            (implicit AND, all-of).

    Returns:
        The MATCH expression, or ``'""'`` (an empty phrase) when no tokens
        survive sanitisation.
    """
    reserved = {"AND", "OR", "NOT", "NEAR"}

    tokens = re.sub(r"[^a-zA-Z0-9 _]", " ", raw).split()
    if not tokens:
        return '""'

    # Tokens contain only [A-Za-z0-9_] at this point, so quoting cannot be
    # broken by an embedded double quote.
    safe_tokens = [f'"{tok}"' if tok in reserved else tok for tok in tokens]
    joiner = " OR " if or_mode else " "
    return joiner.join(safe_tokens)
|
||
|
||
|
||
def search(
    db_path: Path,
    query: str,
    severity: str | None = None,
    source_filter: str | None = None,
    pattern_filter: str | None = None,
    since: str | None = None,
    until: str | None = None,
    limit: int = 20,
    include_repeats: bool = False,
    or_mode: bool = False,
    semantic: bool = False,
) -> list[SearchResult]:
    """Public entry point for log search; results come back ranked by relevance.

    With ``semantic=False`` (the default) the query goes straight to the BM25
    path.  With ``semantic=True`` it is routed through the hybrid path, which
    re-ranks a BM25 candidate pool with embedding similarity and itself falls
    back to plain BM25 when no embedder is usable.

    All filter arguments are forwarded unchanged to the selected path.
    """
    # Both paths take the same filter keywords; only the target differs.
    run = _hybrid_search if semantic else _bm25_search
    return run(
        db_path,
        query,
        severity=severity,
        source_filter=source_filter,
        pattern_filter=pattern_filter,
        since=since,
        until=until,
        limit=limit,
        include_repeats=include_repeats,
        or_mode=or_mode,
    )
|
||
|
||
|
||
def _hybrid_search(
    db_path: Path,
    query: str,
    severity: str | None = None,
    source_filter: str | None = None,
    pattern_filter: str | None = None,
    since: str | None = None,
    until: str | None = None,
    limit: int = 20,
    include_repeats: bool = False,
    or_mode: bool = False,
    alpha: float = 0.6,
    beta: float = 0.4,
) -> list[SearchResult]:
    """Late-fusion hybrid search: BM25 candidates re-ranked by embedding similarity.

    An oversized BM25 pool (``max(limit * 5, 100)`` rows) is fetched, the query
    and every candidate text are embedded, and each candidate is scored as

        hybrid = alpha * (|rank| / max |rank| in pool) + beta * max(cosine, 0)

    The absolute value is used because FTS5 ranks are negative with "more
    negative" meaning "better"; after normalisation the best BM25 hit is 1.0.

    Returns the top ``limit`` candidates by hybrid score.  If embeddings are
    unavailable, no embedder is configured, or embedding raises, the first
    ``limit`` BM25 candidates are returned instead.
    """
    from app.services.embeddings import EMBEDDING_AVAILABLE, cosine_similarity, get_embedder

    pool = _bm25_search(
        db_path,
        query,
        severity=severity,
        source_filter=source_filter,
        pattern_filter=pattern_filter,
        since=since,
        until=until,
        limit=max(limit * 5, 100),
        include_repeats=include_repeats,
        or_mode=or_mode,
    )
    if not pool:
        return []

    bm25_only = pool[:limit]

    if not EMBEDDING_AVAILABLE:
        return bm25_only

    embedder = get_embedder()
    if embedder is None:
        return bm25_only

    try:
        query_vec = embedder.embed(query)
        pool_vecs = embedder.embed_batch([cand.text for cand in pool])
    except Exception as exc:
        logger.warning("Hybrid search embedding failed (%s) — falling back to BM25", exc)
        return bm25_only

    magnitudes = [abs(cand.rank) for cand in pool]
    # Guard against an all-zero pool so the division below is always defined.
    top_magnitude = max(magnitudes) or 1.0

    scored: list[tuple[float, SearchResult]] = [
        (
            alpha * (magnitude / top_magnitude)
            + beta * max(cosine_similarity(query_vec, vec), 0.0),
            cand,
        )
        for cand, magnitude, vec in zip(pool, magnitudes, pool_vecs)
    ]

    # sorted() is stable, so equal scores keep their BM25 order.
    ranked = sorted(scored, key=lambda pair: pair[0], reverse=True)
    return [cand for _, cand in ranked[:limit]]
|
||
|
||
|
||
def _bm25_search(
    db_path: Path,
    query: str,
    severity: str | None = None,
    source_filter: str | None = None,
    pattern_filter: str | None = None,
    since: str | None = None,
    until: str | None = None,
    limit: int = 20,
    include_repeats: bool = False,
    or_mode: bool = False,
) -> list[SearchResult]:
    """Resolve the tenant and run the keyword search on the active backend.

    Postgres goes to the tsvector implementation, everything else to the
    SQLite FTS5 implementation.  ``or_mode`` is only forwarded to the SQLite
    path; the Postgres path does not accept it.
    """
    tenant = resolve_tenant_id()

    shared_filters = {
        "severity": severity,
        "source_filter": source_filter,
        "pattern_filter": pattern_filter,
        "since": since,
        "until": until,
        "limit": limit,
        "include_repeats": include_repeats,
    }

    if BACKEND == Backend.POSTGRES:
        return _pg_fts_search(db_path, query, tenant, **shared_filters)

    return _sqlite_fts_search(db_path, query, tenant, or_mode=or_mode, **shared_filters)
|
||
|
||
|
||
def _sqlite_fts_search(
    db_path: Path,
    query: str,
    tid: str,
    severity: str | None,
    source_filter: str | None,
    pattern_filter: str | None,
    since: str | None,
    until: str | None,
    limit: int,
    include_repeats: bool,
    or_mode: bool,
) -> list[SearchResult]:
    """Run the FTS5 MATCH query on SQLite and return rows ordered by rank.

    log_fts is joined to log_entries so the tenant check can use
    ``e.tenant_id``.  Every other filter is qualified with ``f.`` because the
    same column names exist on both sides of the join.

    Opens its own sqlite3 connection to ``db_path`` and always closes it.  An
    ``sqlite3.OperationalError`` from the query is logged as a warning and
    yields an empty list.
    """
    clauses = [
        "log_fts MATCH ?",
        "(e.tenant_id = ? OR e.tenant_id = '')",
    ]
    args: list = [_sanitize_fts_query(query, or_mode=or_mode), tid]

    # (SQL fragment, bound value); a None value means the filter was not requested.
    optional_filters = (
        ("f.severity = ?", severity.upper() if severity else None),
        ("f.source_id LIKE ?", f"%{source_filter}%" if source_filter else None),
        ("f.matched_patterns LIKE ?", f'%"{pattern_filter}"%' if pattern_filter else None),
        ("f.timestamp_iso >= ?", since if since else None),
        ("f.timestamp_iso <= ?", until if until else None),
    )
    for fragment, bound in optional_filters:
        if bound is not None:
            clauses.append(fragment)
            args.append(bound)

    if not include_repeats:
        clauses.append("f.repeat_count = 1")

    where = " AND ".join(clauses)
    args.append(limit)

    connection = sqlite3.connect(str(db_path), timeout=30.0)
    connection.row_factory = sqlite3.Row
    try:
        cursor = connection.execute(
            f"""
            SELECT f.entry_id, f.source_id, f.sequence, f.timestamp_iso, f.severity,
                   f.repeat_count, f.out_of_order, f.matched_patterns, f.text, f.rank
            FROM log_fts f
            JOIN log_entries e ON e.id = f.entry_id
            WHERE {where}
            ORDER BY f.rank
            LIMIT ?
            """,
            args,
        )
        hits = cursor.fetchall()
    except sqlite3.OperationalError as exc:
        logger.warning("FTS query failed (%s) — index may not be built yet", exc)
        return []
    finally:
        connection.close()

    found: list[SearchResult] = []
    for hit in hits:
        found.append(
            SearchResult(
                entry_id=hit["entry_id"],
                source_id=hit["source_id"],
                sequence=hit["sequence"],
                timestamp_iso=hit["timestamp_iso"],
                severity=hit["severity"],
                repeat_count=hit["repeat_count"],
                out_of_order=bool(hit["out_of_order"]),
                # Stored as JSON text; NULL/empty decodes to an empty list.
                matched_patterns=json.loads(hit["matched_patterns"] or "[]"),
                text=hit["text"],
                rank=float(hit["rank"]),
            )
        )
    return found
|
||
|
||
|
||
def _pg_fts_search(
    db_path: Path,
    query: str,
    tid: str,
    severity: str | None,
    source_filter: str | None,
    pattern_filter: str | None,
    since: str | None,
    until: str | None,
    limit: int,
    include_repeats: bool,
) -> list[SearchResult]:
    """Postgres FTS via the ``text_tsv`` column and ``websearch_to_tsquery``.

    Args:
        db_path: Passed through to ``get_conn``.
        query: Raw user query; parsed server-side by ``websearch_to_tsquery``.
        tid: Tenant id; rows with this tenant or an empty tenant are visible.
        severity: Exact severity match (upper-cased) when truthy.
        source_filter: Substring match on ``source_id`` when truthy.
        pattern_filter: Matches the quoted name inside ``matched_patterns``.
        since / until: Inclusive bounds on ``timestamp_iso`` when truthy.
        limit: Maximum number of rows.
        include_repeats: When False, only rows with ``repeat_count = 1``.

    Returns:
        Results ordered by ``ts_rank`` descending.
    """
    tsq = "websearch_to_tsquery('english', %s)"
    conditions = [
        f"text_tsv @@ {tsq}",
        "(tenant_id = %s OR tenant_id = '')",
    ]
    where_params: list = [query, tid]

    if severity:
        conditions.append("severity = %s")
        where_params.append(severity.upper())
    if source_filter:
        conditions.append("source_id LIKE %s")
        where_params.append(f"%{source_filter}%")
    if pattern_filter:
        conditions.append("matched_patterns LIKE %s")
        where_params.append(f'%"{pattern_filter}"%')
    if since:
        conditions.append("timestamp_iso >= %s")
        where_params.append(since)
    if until:
        conditions.append("timestamp_iso <= %s")
        where_params.append(until)
    if not include_repeats:
        conditions.append("repeat_count = 1")

    where = " AND ".join(conditions)

    # Positional placeholders are bound in the order they appear in the SQL
    # text.  The ts_rank() placeholder lives in the SELECT list, i.e. *before*
    # the WHERE clause, so its value must come first; LIMIT comes last.
    # (Appending the rank query after the WHERE params shifted every binding
    # by one: the tsquery received the tenant id, the tenant check received
    # the next filter value, and so on.)
    params: list = [query, *where_params, limit]

    with get_conn(db_path) as conn:
        rows = conn.execute(
            f"""
            SELECT id AS entry_id, source_id, sequence, timestamp_iso, severity,
                   repeat_count, out_of_order, matched_patterns, text,
                   ts_rank(text_tsv, {tsq}) AS rank
            FROM log_entries
            WHERE {where}
            ORDER BY rank DESC
            LIMIT %s
            """,
            params,
        ).fetchall()

    return [
        SearchResult(
            entry_id=r["entry_id"],
            source_id=r["source_id"],
            sequence=r["sequence"],
            timestamp_iso=r["timestamp_iso"],
            severity=r["severity"],
            repeat_count=r["repeat_count"],
            out_of_order=bool(r["out_of_order"]),
            # Stored as JSON text; NULL/empty decodes to an empty list.
            matched_patterns=json.loads(r["matched_patterns"] or "[]"),
            text=r["text"],
            rank=float(r["rank"]),
        )
        for r in rows
    ]
|
||
|
||
|
||
def entries_in_window(
    db_path: Path,
    since: str | None,
    until: str | None,
    severity: str | None = None,
    source_filter: str | None = None,
    limit: int = 100,
    per_source_cap: int | None = None,
) -> list[SearchResult]:
    """Return log entries within a time window using a plain SQL scan (no FTS).

    Used as a fallback when keyword search returns nothing, so the raw log
    activity in the window can still be shown.  Only rows with
    ``repeat_count = 1`` that belong to the resolved tenant (or have an empty
    tenant) are considered.

    Args:
        db_path: Passed through to ``get_conn``.
        since / until: Inclusive ``timestamp_iso`` bounds; skipped when falsy.
        severity: Exact severity match (upper-cased) when truthy.
        source_filter: Substring match on ``source_id`` when truthy.
        limit: Maximum number of rows returned overall.
        per_source_cap: When not None, at most this many rows are kept per
            ``source_id``.  Within each source the most severe rows are kept
            first (critical/emergency/alert, then error, then warn/warning,
            then everything else), oldest first within a severity bucket.

    Returns:
        Matching entries ordered by ``timestamp_iso`` ascending, each with
        ``rank`` set to 0.0.
    """
    tid = resolve_tenant_id()
    conditions: list[str] = [
        "repeat_count = 1",
        "(tenant_id = ? OR tenant_id = '')",
    ]
    params: list = [tid]

    if since:
        conditions.append("timestamp_iso >= ?")
        params.append(since)
    if until:
        conditions.append("timestamp_iso <= ?")
        params.append(until)
    if severity:
        conditions.append("severity = ?")
        params.append(severity.upper())
    if source_filter:
        conditions.append("source_id LIKE ?")
        params.append(f"%{source_filter}%")

    where = " AND ".join(conditions)

    if per_source_cap is not None:
        # The severity buckets mirror the sets used elsewhere in this module:
        # EMERGENCY/ALERT count as errors and WARNING is an alias of WARN.
        # Without them those rows would land in the lowest-priority bucket and
        # be the first ones cut by the per-source cap.
        sql = f"""
            WITH ranked AS (
                SELECT id as entry_id, source_id, sequence, timestamp_iso, severity,
                       repeat_count, out_of_order, matched_patterns, text, 0.0 as rank,
                       ROW_NUMBER() OVER (
                           PARTITION BY source_id
                           ORDER BY
                               CASE
                                   WHEN UPPER(severity) IN ('CRITICAL','EMERGENCY','ALERT') THEN 0
                                   WHEN UPPER(severity) = 'ERROR' THEN 1
                                   WHEN UPPER(severity) IN ('WARN','WARNING') THEN 2
                                   ELSE 3
                               END,
                               timestamp_iso
                       ) AS rn
                FROM log_entries
                WHERE {where}
            )
            SELECT entry_id, source_id, sequence, timestamp_iso, severity,
                   repeat_count, out_of_order, matched_patterns, text, rank
            FROM ranked
            WHERE rn <= ?
            ORDER BY timestamp_iso ASC
            LIMIT ?
        """
        params.extend([per_source_cap, limit])
    else:
        sql = f"""
            SELECT id as entry_id, source_id, sequence, timestamp_iso, severity,
                   repeat_count, out_of_order, matched_patterns, text, 0.0 as rank
            FROM log_entries
            WHERE {where}
            ORDER BY timestamp_iso ASC
            LIMIT ?
        """
        params.append(limit)

    with get_conn(db_path) as conn:
        rows = conn.execute(sql, params).fetchall()

    return [
        SearchResult(
            entry_id=r["entry_id"],
            source_id=r["source_id"],
            sequence=r["sequence"],
            timestamp_iso=r["timestamp_iso"],
            severity=r["severity"],
            repeat_count=r["repeat_count"],
            out_of_order=bool(r["out_of_order"]),
            # Stored as JSON text; NULL/empty decodes to an empty list.
            matched_patterns=json.loads(r["matched_patterns"] or "[]"),
            text=r["text"],
            rank=float(r["rank"]),
        )
        for r in rows
    ]
|
||
|
||
|
||
def recent_source_errors(
    db_path: Path,
    source_filter: str,
    severity: str = "ERROR",
    limit: int = 10,
    since: str | None = None,
    until: str | None = None,
) -> list[SearchResult]:
    """Newest entries of one severity from sources matching ``source_filter``.

    A plain SQL scan over log_entries (no FTS), so entry text plays no part in
    which rows are returned.  Only ``repeat_count = 1`` rows visible to the
    resolved tenant are considered; ``since``/``until`` add inclusive
    ``timestamp_iso`` bounds when truthy.

    Returns up to ``limit`` results, newest first, each with ``rank`` 0.0.
    """
    tenant = resolve_tenant_id()

    clauses = [
        "source_id LIKE ?",
        "severity = ?",
        "repeat_count = 1",
        "(tenant_id = ? OR tenant_id = '')",
    ]
    args: list = [f"%{source_filter}%", severity.upper(), tenant]

    for fragment, bound in (("timestamp_iso >= ?", since), ("timestamp_iso <= ?", until)):
        if bound:
            clauses.append(fragment)
            args.append(bound)

    args.append(limit)
    where = " AND ".join(clauses)

    statement = f"""
            SELECT id as entry_id, source_id, sequence, timestamp_iso, severity,
                   repeat_count, out_of_order, matched_patterns, text, 0.0 as rank
            FROM log_entries
            WHERE {where}
            ORDER BY timestamp_iso DESC
            LIMIT ?
            """

    with get_conn(db_path) as conn:
        fetched = conn.execute(statement, args).fetchall()

    found: list[SearchResult] = []
    for rec in fetched:
        found.append(
            SearchResult(
                entry_id=rec["entry_id"],
                source_id=rec["source_id"],
                sequence=rec["sequence"],
                timestamp_iso=rec["timestamp_iso"],
                severity=rec["severity"],
                repeat_count=rec["repeat_count"],
                out_of_order=bool(rec["out_of_order"]),
                # Stored as JSON text; NULL/empty decodes to an empty list.
                matched_patterns=json.loads(rec["matched_patterns"] or "[]"),
                text=rec["text"],
                rank=float(rec["rank"]),
            )
        )
    return found
|
||
|
||
|
||
def list_sources(db_path: Path) -> list[dict]:
    """Summarise log sources for the resolved tenant, one dict per source group.

    Rows are grouped by the SQL expression that ``frag.source_group_expr``
    builds for ``source_id`` (intended to collapse IDs such as
    ``muninn-journal:Muninn:ssh.service`` to ``muninn-journal:Muninn``).

    Each dict carries: ``source_id`` (the group id), ``unit_count`` (distinct
    source_ids merged into the group), ``entry_count``, ``earliest`` and
    ``latest`` timestamps, and ``error_count`` (ERROR/CRITICAL/EMERGENCY/ALERT
    rows).  Groups are ordered by ``entry_count`` descending.
    """
    tenant = resolve_tenant_id()
    grouping = frag.source_group_expr("source_id")

    statement = f"""
            SELECT
                {grouping} AS group_id,
                COUNT(DISTINCT source_id) AS unit_count,
                COUNT(*) AS entry_count,
                MIN(timestamp_iso) AS earliest,
                MAX(timestamp_iso) AS latest,
                SUM(CASE WHEN severity IN ('ERROR','CRITICAL','EMERGENCY','ALERT')
                         THEN 1 ELSE 0 END) AS error_count
            FROM log_entries
            WHERE (tenant_id = ? OR tenant_id = '')
            GROUP BY group_id
            ORDER BY entry_count DESC
            """

    with get_conn(db_path) as conn:
        groups = conn.execute(statement, (tenant,)).fetchall()

    summaries: list[dict] = []
    for group in groups:
        # The public key is "source_id" even though the value is the group id.
        summary = {"source_id": group["group_id"]}
        for column in ("unit_count", "entry_count", "earliest", "latest", "error_count"):
            summary[column] = group[column]
        summaries.append(summary)
    return summaries
|
||
|
||
|
||
def _compile_overrides(overrides: list[dict]) -> list[tuple[re.Pattern[str], str]]:
    """Return ``(compiled_pattern, override_severity)`` pairs for enabled rules.

    A rule is skipped when its ``enabled`` key is present and falsy.  Patterns
    are compiled case-insensitively.

    Malformed rules never abort the whole batch: a rule with an invalid regex,
    a missing ``pattern``/``override_severity`` key, or a non-string pattern
    is logged at warning level and skipped, so one bad entry cannot take down
    the caller.

    Args:
        overrides: Rule dicts with ``pattern``, ``override_severity`` and an
            optional ``enabled`` flag (default True).

    Returns:
        Compiled rules in their original order.
    """
    log = logging.getLogger(__name__)
    compiled: list[tuple[re.Pattern[str], str]] = []
    for rule in overrides:
        if not rule.get("enabled", True):
            continue
        try:
            pattern = re.compile(rule["pattern"], re.IGNORECASE)
            new_severity = rule["override_severity"]
        except (re.error, KeyError, TypeError) as exc:
            # Best-effort: keep the remaining rules usable, but leave a trace
            # instead of dropping the bad rule silently.
            log.warning("Skipping invalid severity override %r (%s)", rule, exc)
            continue
        compiled.append((pattern, new_severity))
    return compiled
|
||
|
||
|
||
def _apply_overrides(text: str, original_severity: str, rules: list[tuple[re.Pattern[str], str]]) -> str:
    """Return the severity of the first rule whose pattern is found in ``text``.

    Rules are tried in order; when none matches, ``original_severity`` is
    returned unchanged.
    """
    matching = (severity for regex, severity in rules if regex.search(text))
    return next(matching, original_severity)
|
||
|
||
|
||
def stats_summary(db_path: Path, window_hours: int = 24, severity_overrides: list[dict] | None = None) -> dict:
    """Collect the aggregate health numbers shown on the dashboard.

    Everything is read from log_entries directly (never from the FTS table).
    Only ``repeat_count = 1`` rows visible to the resolved tenant are counted.

    Args:
        db_path: Passed through to ``get_conn``.
        window_hours: Size of the look-back window, measured from now (UTC).
        severity_overrides: Optional override rules; a CRITICAL row whose text
            matches a rule that maps it to another severity is left out of
            ``recent_criticals`` and counted in ``suppressed_criticals``.

    Returns:
        A dict with ``window_hours``, ``total_24h``, ``criticals_24h``,
        ``errors_24h``, ``source_health``, ``recent_criticals`` (at most 5),
        ``suppressed_criticals``, ``last_gleaned`` and ``timeline_events``
        (at most 300).  The ``*_24h`` keys cover ``window_hours``, whatever
        its value.
    """
    override_rules = _compile_overrides(severity_overrides or [])
    tenant = resolve_tenant_id()
    grouping = frag.source_group_expr("source_id")
    cutoff = datetime.now(timezone.utc) - timedelta(hours=window_hours)
    since_iso = cutoff.strftime("%Y-%m-%dT%H:%M:%S")

    def as_event(record) -> dict:
        """Project a row onto the event shape used by both event lists."""
        return {
            "entry_id": record["entry_id"],
            "source_id": record["source_id"],
            "timestamp_iso": record["timestamp_iso"],
            "severity": record["severity"],
            "text": record["text"],
        }

    with get_conn(db_path) as conn:
        totals = conn.execute(
            """
            SELECT
                COUNT(*) AS total,
                SUM(CASE WHEN severity = 'CRITICAL' THEN 1 ELSE 0 END) AS criticals,
                SUM(CASE WHEN severity IN ('ERROR','CRITICAL','EMERGENCY','ALERT') THEN 1 ELSE 0 END) AS errors
            FROM log_entries
            WHERE timestamp_iso >= ?
              AND repeat_count = 1
              AND (tenant_id = ? OR tenant_id = '')
            """,
            (since_iso, tenant),
        ).fetchone()
        # SUM() over zero rows is NULL, hence the `or 0`.
        window_total = int(totals["total"] or 0)
        window_criticals = int(totals["criticals"] or 0)
        window_errors = int(totals["errors"] or 0)

        per_source = conn.execute(
            f"""
            SELECT
                {grouping} AS group_id,
                COUNT(*) AS entry_count,
                SUM(CASE WHEN severity IN ('ERROR','CRITICAL','EMERGENCY','ALERT') THEN 1 ELSE 0 END) AS error_count,
                MAX(timestamp_iso) AS latest
            FROM log_entries
            WHERE timestamp_iso >= ?
              AND repeat_count = 1
              AND (tenant_id = ? OR tenant_id = '')
            GROUP BY group_id
            ORDER BY error_count DESC, entry_count DESC
            """,
            (since_iso, tenant),
        ).fetchall()

        # Note: this query is not restricted to the look-back window.
        critical_rows = conn.execute(
            """
            SELECT id as entry_id, source_id, timestamp_iso, severity, text
            FROM log_entries
            WHERE severity = 'CRITICAL'
              AND repeat_count = 1
              AND (tenant_id = ? OR tenant_id = '')
            ORDER BY timestamp_iso DESC
            LIMIT 25
            """,
            (tenant,),
        ).fetchall()

        timeline_rows = conn.execute(
            """
            SELECT id as entry_id, source_id, timestamp_iso, severity, text
            FROM log_entries
            WHERE severity IN ('CRITICAL','ERROR','WARN','WARNING','EMERGENCY','ALERT')
              AND timestamp_iso >= ?
              AND timestamp_iso IS NOT NULL
              AND repeat_count = 1
              AND (tenant_id = ? OR tenant_id = '')
            ORDER BY timestamp_iso DESC
            LIMIT 300
            """,
            (since_iso, tenant),
        ).fetchall()

        newest_ingest = conn.execute(
            "SELECT MAX(ingest_time) AS t FROM log_entries WHERE (tenant_id = ? OR tenant_id = '')",
            (tenant,),
        ).fetchone()

    source_health = []
    for group in per_source:
        source_health.append(
            {
                "source_id": group["group_id"],
                "entry_count": int(group["entry_count"]),
                "error_count": int(group["error_count"]),
                "latest": group["latest"],
            }
        )

    # Walk the newest criticals, dropping the ones an override re-classifies,
    # and stop as soon as five genuine criticals have been collected.
    suppressed = 0
    recent_criticals: list[dict] = []
    for record in critical_rows:
        effective = _apply_overrides(record["text"], record["severity"], override_rules)
        if effective != "CRITICAL":
            suppressed += 1
            continue
        recent_criticals.append(as_event(record))
        if len(recent_criticals) == 5:
            break

    timeline_events = [as_event(record) for record in timeline_rows]

    last_gleaned: str | None = None
    if newest_ingest:
        last_gleaned = newest_ingest["t"]

    return {
        "window_hours": window_hours,
        "total_24h": window_total,
        "criticals_24h": window_criticals,
        "errors_24h": window_errors,
        "source_health": source_health,
        "recent_criticals": recent_criticals,
        "suppressed_criticals": suppressed,
        "last_gleaned": last_gleaned,
        "timeline_events": timeline_events,
    }
|
||
|
||
|
||
def format_results(results: list[SearchResult], max_text: int = 300) -> str:
    """Render search results as plain text suitable for LLM context.

    Each result becomes a header line
    ``[timestamp | source | severity flags...]`` followed by its text, cut to
    ``max_text`` characters with a trailing ellipsis when truncated.  Results
    are separated by a blank line.  An empty list yields a fixed
    "no matches" sentence.
    """
    if not results:
        return "No matching log entries found."

    def render(entry) -> str:
        # Optional markers, always in this order: repeats, ordering, patterns.
        markers = []
        if entry.repeat_count > 1:
            markers.append(f"repeat×{entry.repeat_count}")
        if entry.out_of_order:
            markers.append("out-of-order")
        if entry.matched_patterns:
            markers.append(f"[{', '.join(entry.matched_patterns)}]")
        suffix = (" " + " ".join(markers)) if markers else ""

        when = entry.timestamp_iso or "no-timestamp"
        level = entry.severity or "?"

        body = entry.text[:max_text]
        if len(entry.text) > max_text:
            body += "…"

        return f"[{when} | {entry.source_id} | {level}{suffix}]\n{body}"

    return "\n\n".join(render(entry) for entry in results)
|