turnstone/app/services/search.py
pyr0ball bbe4b1e360 feat: initial Turnstone POC — ingest, FTS search, MCP server
Ingest pipeline (journald / Caddy / Docker-wrapped formats) with
per-source state tracking (repeat dedup, out-of-order detection),
named pattern tagging at ingest time, and idempotent SHA1-keyed writes.

FTS5 search layer with porter stemmer, severity/source/pattern/time
filters, and BM25 ranking. MCP server (FastMCP stdio) with three tools:
search_logs, diagnose, list_log_sources — compatible with both
Claude Code and Copilot CLI.

WAL mode enabled on all connections. FTS index auto-built after ingest.
MCP configs included for Claude Code (.mcp.json) and Copilot CLI
(.github/copilot/mcp.json).
2026-05-08 12:12:34 -07:00

200 lines
6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""FTS5-based log search with severity, source, and pattern filters."""
from __future__ import annotations

import json
import logging
import sqlite3
from dataclasses import dataclass, field
from pathlib import Path

logger = logging.getLogger(__name__)
@dataclass(frozen=True)
class SearchResult:
    """One row of the ``log_fts`` index, as returned by ``search``.

    Instances are immutable. ``matched_patterns`` is a list (unhashable), so
    it is excluded from the generated ``__hash__``; it still takes part in
    ``==``. This keeps results usable in sets and as dict keys, which a
    frozen dataclass would otherwise refuse with ``TypeError``.
    """

    # Value of the ``entry_id`` column (filled from ``log_entries.id``).
    entry_id: str
    # Identifier of the log source the entry belongs to.
    source_id: str
    # Value of the ``sequence`` column copied from ``log_entries``.
    sequence: int
    # Timestamp string as stored; None when the entry has no timestamp.
    timestamp_iso: str | None
    # Severity label as stored; None when unknown.
    severity: str | None
    # Value of the ``repeat_count`` column.
    repeat_count: int
    # Truthiness of the stored ``out_of_order`` column.
    out_of_order: bool
    # Pattern names decoded from the stored JSON text. Excluded from the hash
    # because lists are unhashable; equality still compares it.
    matched_patterns: list[str] = field(hash=False)
    # Indexed log text.
    text: str
    # FTS5 ``rank`` value for the query; ``search`` orders by it ascending.
    rank: float
def build_fts_index(db_path: Path) -> None:
    """Build (or incrementally refresh) the FTS5 index from ``log_entries``.

    Safe to re-run: rows whose id is already present in ``log_fts`` are
    skipped. If ``log_fts`` is missing or has a stale schema (no ``sequence``
    column) it is dropped and recreated, so every row is indexed again.

    Args:
        db_path: Path to the SQLite database file.

    Raises:
        sqlite3.OperationalError: e.g. when ``log_entries`` does not exist.
            The connection is closed before the error propagates.
    """
    conn = sqlite3.connect(str(db_path))
    try:
        conn.execute("PRAGMA journal_mode=WAL")

        # Probe for the ``sequence`` column. The probe fails both when the
        # table does not exist yet and when it predates that column; in
        # either case start from a clean table.
        try:
            conn.execute("SELECT sequence FROM log_fts LIMIT 0")
        except sqlite3.OperationalError:
            conn.execute("DROP TABLE IF EXISTS log_fts")

        conn.executescript("""
            CREATE VIRTUAL TABLE IF NOT EXISTS log_fts USING fts5(
                text,
                entry_id UNINDEXED,
                source_id UNINDEXED,
                sequence UNINDEXED,
                severity UNINDEXED,
                timestamp_iso UNINDEXED,
                matched_patterns UNINDEXED,
                repeat_count UNINDEXED,
                out_of_order UNINDEXED,
                tokenize = 'porter ascii'
            );
        """)

        # Only insert rows not already indexed.
        conn.execute("""
            INSERT INTO log_fts(text, entry_id, source_id, sequence, severity,
                                timestamp_iso, matched_patterns,
                                repeat_count, out_of_order)
            SELECT e.text, e.id, e.source_id, e.sequence, e.severity,
                   e.timestamp_iso, e.matched_patterns,
                   e.repeat_count, e.out_of_order
            FROM log_entries e
            WHERE e.id NOT IN (
                SELECT entry_id FROM log_fts WHERE entry_id IS NOT NULL
            )
        """)
        conn.commit()
    finally:
        # Always release the handle, including when a statement above raises.
        conn.close()
def search(
    db_path: Path,
    query: str,
    severity: str | None = None,
    source_filter: str | None = None,
    pattern_filter: str | None = None,
    since: str | None = None,
    until: str | None = None,
    limit: int = 20,
    include_repeats: bool = False,
) -> list[SearchResult]:
    """Full-text search over ``log_fts`` with optional filters.

    Args:
        db_path: Path to the SQLite database file.
        query: FTS5 MATCH expression.
        severity: Exact severity to keep; upper-cased before comparison.
        source_filter: Substring that ``source_id`` must contain (SQL LIKE,
            so ``%`` and ``_`` in the value act as wildcards).
        pattern_filter: Pattern name that must appear, double-quoted, inside
            the stored ``matched_patterns`` text (SQL LIKE).
        since: Inclusive lower bound compared against ``timestamp_iso`` by
            SQLite as a plain value; rows with a NULL timestamp never match.
        until: Inclusive upper bound, compared the same way as ``since``.
        limit: Maximum number of rows returned.
        include_repeats: When False (default) only rows whose
            ``repeat_count`` equals 1 are returned.

    Returns:
        Matching entries ordered by FTS5 ``rank``. An empty list is returned
        (and a warning logged) when SQLite rejects the query, e.g. because
        the index has not been built.
    """
    # Build the WHERE clause from fixed fragments only; every user-supplied
    # value travels as a bound parameter.
    conditions = ["log_fts MATCH ?"]
    params: list[object] = [query]
    if severity:
        conditions.append("severity = ?")
        params.append(severity.upper())
    if source_filter:
        conditions.append("source_id LIKE ?")
        params.append(f"%{source_filter}%")
    if pattern_filter:
        conditions.append("matched_patterns LIKE ?")
        params.append(f'%"{pattern_filter}"%')
    if since:
        conditions.append("timestamp_iso >= ?")
        params.append(since)
    if until:
        conditions.append("timestamp_iso <= ?")
        params.append(until)
    if not include_repeats:
        conditions.append("repeat_count = 1")
    where = " AND ".join(conditions)
    params.append(limit)

    conn = sqlite3.connect(str(db_path))
    try:
        conn.execute("PRAGMA journal_mode=WAL")
        conn.row_factory = sqlite3.Row
        try:
            rows = conn.execute(
                f"""
                SELECT entry_id, source_id, sequence, timestamp_iso, severity,
                       repeat_count, out_of_order, matched_patterns, text, rank
                FROM log_fts
                WHERE {where}
                ORDER BY rank
                LIMIT ?
                """,
                params,
            ).fetchall()
        except sqlite3.OperationalError as e:
            # Deliberate best-effort: a missing index or a rejected MATCH
            # expression yields "no results" rather than an exception.
            logger.warning("FTS query failed (%s) — index may not be built yet", e)
            return []

        return [
            SearchResult(
                entry_id=r["entry_id"],
                source_id=r["source_id"],
                sequence=r["sequence"],
                timestamp_iso=r["timestamp_iso"],
                severity=r["severity"],
                repeat_count=r["repeat_count"],
                out_of_order=bool(r["out_of_order"]),
                # Stored as JSON text; NULL/empty decodes to an empty list.
                matched_patterns=json.loads(r["matched_patterns"] or "[]"),
                text=r["text"],
                rank=r["rank"],
            )
            for r in rows
        ]
    finally:
        # Single exit point for the handle: covers the early return above and
        # any unexpected error while decoding rows.
        conn.close()
def list_sources(db_path: Path) -> list[dict]:
    """Return distinct sources with entry counts and time ranges.

    Args:
        db_path: Path to the SQLite database file.

    Returns:
        One dict per ``source_id`` in ``log_entries``, largest source first,
        with keys ``source_id``, ``entry_count``, ``earliest``, ``latest``
        (min/max of ``timestamp_iso``) and ``error_count`` (rows whose
        severity is ERROR, CRITICAL, EMERGENCY or ALERT).

    Raises:
        sqlite3.OperationalError: e.g. when ``log_entries`` does not exist.
            The connection is closed before the error propagates.
    """
    conn = sqlite3.connect(str(db_path))
    try:
        conn.execute("PRAGMA journal_mode=WAL")
        rows = conn.execute("""
            SELECT
                source_id,
                COUNT(*) as entry_count,
                MIN(timestamp_iso) as earliest,
                MAX(timestamp_iso) as latest,
                SUM(CASE WHEN severity IN ('ERROR','CRITICAL','EMERGENCY','ALERT') THEN 1 ELSE 0 END) as error_count
            FROM log_entries
            GROUP BY source_id
            ORDER BY entry_count DESC
        """).fetchall()
    finally:
        # Release the handle even when the query itself fails.
        conn.close()

    return [
        {
            "source_id": source_id,
            "entry_count": entry_count,
            "earliest": earliest,
            "latest": latest,
            "error_count": error_count,
        }
        for source_id, entry_count, earliest, latest, error_count in rows
    ]
def format_results(results: list[SearchResult], max_text: int = 300) -> str:
    """Format search results as readable text for LLM context.

    Each result becomes a two-line block: a bracketed header with timestamp,
    source, severity and optional flags (repeat count, out-of-order marker,
    matched pattern names), followed by the entry text. Blocks are separated
    by a blank line.

    Args:
        results: Search results to render.
        max_text: Maximum number of characters of each entry's text to show.
            Longer text is cut and marked with a trailing ellipsis.

    Returns:
        The rendered text, or a fixed "no matches" message for empty input.
    """
    if not results:
        return "No matching log entries found."

    blocks = []
    for r in results:
        ts = r.timestamp_iso or "no-timestamp"
        sev = r.severity or "?"
        src = r.source_id

        flags = []
        if r.repeat_count > 1:
            flags.append(f"repeat×{r.repeat_count}")
        if r.out_of_order:
            flags.append("out-of-order")
        if r.matched_patterns:
            flags.append(f"[{', '.join(r.matched_patterns)}]")
        flag_str = f" {' '.join(flags)}" if flags else ""

        text = r.text
        if len(text) > max_text:
            # Mark the cut so truncated text is distinguishable from a
            # complete entry ("\u2026" is the horizontal ellipsis).
            text = text[:max_text] + "\u2026"

        blocks.append(f"[{ts} | {src} | {sev}{flag_str}]\n{text}")
    return "\n\n".join(blocks)