Ingest pipeline (journald / Caddy / Docker-wrapped formats) with per-source state tracking (repeat dedup, out-of-order detection), named pattern tagging at ingest time, and idempotent SHA1-keyed writes. FTS5 search layer with porter stemmer, severity/source/pattern/time filters, and BM25 ranking. MCP server (FastMCP stdio) with three tools: search_logs, diagnose, list_log_sources — compatible with both Claude Code and Copilot CLI. WAL mode enabled on all connections. FTS index auto-built after ingest. MCP configs included for Claude Code (.mcp.json) and Copilot CLI (.github/copilot/mcp.json).
200 lines
6 KiB
Python
200 lines
6 KiB
Python
"""FTS5-based log search with severity, source, and pattern filters."""
|
||
from __future__ import annotations
|
||
|
||
import json
|
||
import logging
|
||
import sqlite3
|
||
from dataclasses import dataclass
|
||
from pathlib import Path
|
||
|
||
# Module-level logger; search() uses it to warn when an FTS query fails.
logger = logging.getLogger(__name__)
|
||
|
||
|
||
@dataclass(frozen=True)
class SearchResult:
    """Immutable record describing one hit returned by ``search``.

    Instances are built by ``search`` from a row of the ``log_fts`` table;
    the field order below is part of the positional constructor signature.
    """

    # Identifier of the indexed log entry.
    entry_id: str
    # Identifier of the log source the entry belongs to.
    source_id: str
    # Sequence value stored alongside the entry.
    sequence: int
    # Timestamp string, or None when the row carries no timestamp.
    timestamp_iso: str | None
    # Severity label, or None when the row carries no severity.
    severity: str | None
    # Repeat counter stored with the entry.
    repeat_count: int
    # Truthiness of the stored out-of-order flag.
    out_of_order: bool
    # Pattern names decoded from the row's JSON-encoded list.
    matched_patterns: list[str]
    # Full text of the log entry.
    text: str
    # FTS5 rank value for the query; ``search`` orders results by it ascending.
    rank: float
|
||
|
||
|
||
def build_fts_index(db_path: Path) -> None:
    """Build (or incrementally extend) the FTS5 index from ``log_entries``.

    Safe to re-run: the ``log_fts`` table is created only if missing, and
    only ``log_entries`` rows whose ``id`` is not yet present in ``log_fts``
    are inserted. Rows that are already indexed are NOT refreshed if the
    corresponding ``log_entries`` row changes afterwards.

    If an existing ``log_fts`` table lacks the ``sequence`` column (stale
    schema), it is dropped and rebuilt from scratch.

    Args:
        db_path: Path to the SQLite database file.

    Raises:
        sqlite3.OperationalError: e.g. when ``log_entries`` does not exist.
            The connection is closed before the error propagates.
    """
    conn = sqlite3.connect(str(db_path))
    try:
        conn.execute("PRAGMA journal_mode=WAL")

        # Probe for the sequence column. The probe fails both when the table
        # is missing and when its schema is stale; DROP ... IF EXISTS is
        # harmless in the former case and forces a rebuild in the latter.
        try:
            conn.execute("SELECT sequence FROM log_fts LIMIT 0")
        except sqlite3.OperationalError:
            conn.execute("DROP TABLE IF EXISTS log_fts")

        conn.executescript("""
            CREATE VIRTUAL TABLE IF NOT EXISTS log_fts USING fts5(
                text,
                entry_id UNINDEXED,
                source_id UNINDEXED,
                sequence UNINDEXED,
                severity UNINDEXED,
                timestamp_iso UNINDEXED,
                matched_patterns UNINDEXED,
                repeat_count UNINDEXED,
                out_of_order UNINDEXED,
                tokenize = 'porter ascii'
            );
        """)

        # Only insert rows not already indexed.
        conn.execute("""
            INSERT INTO log_fts(text, entry_id, source_id, sequence, severity,
                                timestamp_iso, matched_patterns,
                                repeat_count, out_of_order)
            SELECT e.text, e.id, e.source_id, e.sequence, e.severity,
                   e.timestamp_iso, e.matched_patterns,
                   e.repeat_count, e.out_of_order
            FROM log_entries e
            WHERE e.id NOT IN (SELECT entry_id FROM log_fts WHERE entry_id IS NOT NULL)
        """)
        conn.commit()
    finally:
        # Release the connection even when a statement above raises.
        conn.close()
|
||
|
||
|
||
def search(
|
||
db_path: Path,
|
||
query: str,
|
||
severity: str | None = None,
|
||
source_filter: str | None = None,
|
||
pattern_filter: str | None = None,
|
||
since: str | None = None,
|
||
until: str | None = None,
|
||
limit: int = 20,
|
||
include_repeats: bool = False,
|
||
) -> list[SearchResult]:
|
||
"""Full-text search with optional filters. Returns results ranked by relevance."""
|
||
conn = sqlite3.connect(str(db_path))
|
||
conn.execute("PRAGMA journal_mode=WAL")
|
||
conn.row_factory = sqlite3.Row
|
||
|
||
conditions = ["log_fts MATCH ?"]
|
||
params: list = [query]
|
||
|
||
if severity:
|
||
conditions.append("severity = ?")
|
||
params.append(severity.upper())
|
||
if source_filter:
|
||
conditions.append("source_id LIKE ?")
|
||
params.append(f"%{source_filter}%")
|
||
if pattern_filter:
|
||
conditions.append("matched_patterns LIKE ?")
|
||
params.append(f'%"{pattern_filter}"%')
|
||
if since:
|
||
conditions.append("timestamp_iso >= ?")
|
||
params.append(since)
|
||
if until:
|
||
conditions.append("timestamp_iso <= ?")
|
||
params.append(until)
|
||
if not include_repeats:
|
||
conditions.append("repeat_count = 1")
|
||
|
||
where = " AND ".join(conditions)
|
||
params.append(limit)
|
||
|
||
try:
|
||
rows = conn.execute(
|
||
f"""
|
||
SELECT entry_id, source_id, sequence, timestamp_iso, severity,
|
||
repeat_count, out_of_order, matched_patterns, text, rank
|
||
FROM log_fts
|
||
WHERE {where}
|
||
ORDER BY rank
|
||
LIMIT ?
|
||
""",
|
||
params,
|
||
).fetchall()
|
||
except sqlite3.OperationalError as e:
|
||
logger.warning("FTS query failed (%s) — index may not be built yet", e)
|
||
conn.close()
|
||
return []
|
||
|
||
results = [
|
||
SearchResult(
|
||
entry_id=r["entry_id"],
|
||
source_id=r["source_id"],
|
||
sequence=r["sequence"],
|
||
timestamp_iso=r["timestamp_iso"],
|
||
severity=r["severity"],
|
||
repeat_count=r["repeat_count"],
|
||
out_of_order=bool(r["out_of_order"]),
|
||
matched_patterns=json.loads(r["matched_patterns"] or "[]"),
|
||
text=r["text"],
|
||
rank=r["rank"],
|
||
)
|
||
for r in rows
|
||
]
|
||
conn.close()
|
||
return results
|
||
|
||
|
||
def list_sources(db_path: Path) -> list[dict]:
    """Return distinct sources with entry counts and time ranges.

    Args:
        db_path: Path to the SQLite database containing ``log_entries``.

    Returns:
        One dict per ``source_id``, ordered by ``entry_count`` descending,
        with keys ``source_id``, ``entry_count``, ``earliest``, ``latest``
        (min/max of ``timestamp_iso``) and ``error_count`` (rows whose
        severity is ERROR, CRITICAL, EMERGENCY or ALERT).

    Raises:
        sqlite3.OperationalError: e.g. when ``log_entries`` does not exist.
            The connection is closed before the error propagates.
    """
    conn = sqlite3.connect(str(db_path))
    try:
        conn.execute("PRAGMA journal_mode=WAL")
        rows = conn.execute("""
            SELECT
                source_id,
                COUNT(*) as entry_count,
                MIN(timestamp_iso) as earliest,
                MAX(timestamp_iso) as latest,
                SUM(CASE WHEN severity IN ('ERROR','CRITICAL','EMERGENCY','ALERT') THEN 1 ELSE 0 END) as error_count
            FROM log_entries
            GROUP BY source_id
            ORDER BY entry_count DESC
        """).fetchall()
    finally:
        # Release the connection even if the query raises.
        conn.close()

    # Keys are listed in the same order as the SELECT columns above.
    keys = ("source_id", "entry_count", "earliest", "latest", "error_count")
    return [dict(zip(keys, row)) for row in rows]
|
||
|
||
|
||
def _render_entry(hit: SearchResult, max_text: int) -> str:
    """Render one hit as a bracketed header line followed by its text."""
    markers = []
    if hit.repeat_count > 1:
        markers.append(f"repeat×{hit.repeat_count}")
    if hit.out_of_order:
        markers.append("out-of-order")
    if hit.matched_patterns:
        markers.append(f"[{', '.join(hit.matched_patterns)}]")

    # Markers, when present, follow the severity after a single space.
    suffix = (" " + " ".join(markers)) if markers else ""
    when = hit.timestamp_iso or "no-timestamp"
    level = hit.severity or "?"

    # Truncate long entries and mark the cut with an ellipsis.
    body = hit.text[:max_text]
    if len(hit.text) > max_text:
        body += "…"

    return f"[{when} | {hit.source_id} | {level}{suffix}]\n{body}"


def format_results(results: list[SearchResult], max_text: int = 300) -> str:
    """Format search results as readable text for LLM context.

    Args:
        results: Hits to render, in the order they should appear.
        max_text: Maximum number of characters of each entry's text to keep;
            longer texts are cut and suffixed with an ellipsis.

    Returns:
        The rendered entries separated by blank lines, or a fixed
        "no matches" sentence when *results* is empty.
    """
    if not results:
        return "No matching log entries found."
    return "\n\n".join(_render_entry(hit, max_text) for hit in results)
|