feat: initial Turnstone POC — ingest, FTS search, MCP server
Ingest pipeline (journald / Caddy / Docker-wrapped formats) with per-source state tracking (repeat dedup, out-of-order detection), named pattern tagging at ingest time, and idempotent SHA1-keyed writes. FTS5 search layer with porter stemmer, severity/source/pattern/time filters, and BM25 ranking. MCP server (FastMCP stdio) with three tools: search_logs, diagnose, list_log_sources — compatible with both Claude Code and Copilot CLI. WAL mode enabled on all connections. FTS index auto-built after ingest. MCP configs included for Claude Code (.mcp.json) and Copilot CLI (.github/copilot/mcp.json).
This commit is contained in:
commit
3e6eabb7ce
19 changed files with 1082 additions and 0 deletions
15
.github/copilot/mcp.json
vendored
Normal file
15
.github/copilot/mcp.json
vendored
Normal file
|
|
@ -0,0 +1,15 @@
|
|||
{
|
||||
"servers": {
|
||||
"turnstone": {
|
||||
"type": "stdio",
|
||||
"command": "conda",
|
||||
"args": [
|
||||
"run", "--no-capture-output", "-n", "cf",
|
||||
"python", "-m", "app.mcp_server"
|
||||
],
|
||||
"env": {
|
||||
"TURNSTONE_DB": "/Library/Development/CircuitForge/turnstone/data/turnstone.db"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
10
.gitignore
vendored
Normal file
10
.gitignore
vendored
Normal file
|
|
@ -0,0 +1,10 @@
|
|||
data/
|
||||
corpus/raw/
|
||||
__pycache__/
|
||||
*.pyc
|
||||
*.pyo
|
||||
*.pyd
|
||||
.env
|
||||
*.db
|
||||
*.db-wal
|
||||
*.db-shm
|
||||
15
.mcp.json
Normal file
15
.mcp.json
Normal file
|
|
@ -0,0 +1,15 @@
|
|||
{
|
||||
"mcpServers": {
|
||||
"turnstone": {
|
||||
"command": "conda",
|
||||
"args": [
|
||||
"run", "--no-capture-output", "-n", "cf",
|
||||
"python", "-m", "app.mcp_server"
|
||||
],
|
||||
"cwd": "/Library/Development/CircuitForge/turnstone",
|
||||
"env": {
|
||||
"TURNSTONE_DB": "/Library/Development/CircuitForge/turnstone/data/turnstone.db"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
0
app/__init__.py
Normal file
0
app/__init__.py
Normal file
0
app/api/__init__.py
Normal file
0
app/api/__init__.py
Normal file
0
app/ingest/__init__.py
Normal file
0
app/ingest/__init__.py
Normal file
102
app/ingest/base.py
Normal file
102
app/ingest/base.py
Normal file
|
|
@ -0,0 +1,102 @@
|
|||
"""Shared utilities for all Turnstone log ingestors."""
|
||||
from __future__ import annotations
|
||||
|
||||
import hashlib
|
||||
import re
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import Iterator
|
||||
|
||||
import yaml
|
||||
|
||||
from app.services.models import LogPattern, RetrievedEntry
|
||||
|
||||
SYSLOG_PRIORITY: dict[str, str] = {
|
||||
"0": "EMERGENCY", "1": "ALERT", "2": "CRITICAL",
|
||||
"3": "ERROR", "4": "WARN", "5": "NOTICE",
|
||||
"6": "INFO", "7": "DEBUG",
|
||||
}
|
||||
|
||||
_SEVERITY_RE = re.compile(
|
||||
r"\b(EMERGENCY|ALERT|CRITICAL|ERROR|WARN(?:ING)?|NOTICE|INFO|DEBUG)\b",
|
||||
re.IGNORECASE,
|
||||
)
|
||||
|
||||
|
||||
def load_patterns(path: Path) -> list[LogPattern]:
|
||||
if not path.exists():
|
||||
return []
|
||||
raw = yaml.safe_load(path.read_text())
|
||||
return [
|
||||
LogPattern(
|
||||
name=p["name"],
|
||||
pattern=p["pattern"],
|
||||
severity=p["severity"],
|
||||
description=p["description"],
|
||||
)
|
||||
for p in raw.get("patterns", [])
|
||||
]
|
||||
|
||||
|
||||
def _compile(patterns: list[LogPattern]) -> list[tuple[LogPattern, re.Pattern]]:
|
||||
return [(p, re.compile(p.pattern, re.IGNORECASE)) for p in patterns]
|
||||
|
||||
|
||||
def apply_patterns(
|
||||
text: str,
|
||||
compiled: list[tuple[LogPattern, re.Pattern]],
|
||||
) -> tuple[str, ...]:
|
||||
return tuple(p.name for p, rx in compiled if rx.search(text))
|
||||
|
||||
|
||||
def detect_severity(text: str) -> str | None:
|
||||
m = _SEVERITY_RE.search(text)
|
||||
return m.group(0).upper() if m else None
|
||||
|
||||
|
||||
def epoch_micros_to_iso(micros: str | int) -> str:
|
||||
ts = int(micros) / 1_000_000
|
||||
return datetime.fromtimestamp(ts, tz=timezone.utc).isoformat()
|
||||
|
||||
|
||||
def epoch_float_to_iso(ts: float) -> str:
|
||||
return datetime.fromtimestamp(ts, tz=timezone.utc).isoformat()
|
||||
|
||||
|
||||
def now_iso() -> str:
|
||||
return datetime.now(tz=timezone.utc).isoformat()
|
||||
|
||||
|
||||
def make_entry_id(source_id: str, sequence: int, text: str) -> str:
|
||||
key = f"{source_id}:{sequence}:{text[:64]}"
|
||||
return hashlib.sha1(key.encode()).hexdigest()
|
||||
|
||||
|
||||
class SourceState:
|
||||
"""Tracks per-source state for repeat and out-of-order detection."""
|
||||
|
||||
def __init__(self) -> None:
|
||||
self._last_text: str = ""
|
||||
self._last_iso: str | None = None
|
||||
self._repeat: int = 0
|
||||
self.sequence: int = 0
|
||||
|
||||
def observe(
|
||||
self, text: str, timestamp_iso: str | None
|
||||
) -> tuple[int, bool]:
|
||||
"""Return (repeat_count, out_of_order) for this entry."""
|
||||
self.sequence += 1
|
||||
|
||||
if text == self._last_text:
|
||||
self._repeat += 1
|
||||
else:
|
||||
self._repeat = 1
|
||||
self._last_text = text
|
||||
|
||||
out_of_order = False
|
||||
if timestamp_iso and self._last_iso:
|
||||
out_of_order = timestamp_iso < self._last_iso
|
||||
if timestamp_iso:
|
||||
self._last_iso = timestamp_iso
|
||||
|
||||
return self._repeat, out_of_order
|
||||
85
app/ingest/caddy.py
Normal file
85
app/ingest/caddy.py
Normal file
|
|
@ -0,0 +1,85 @@
|
|||
"""Caddy structured JSON access log parser."""
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from typing import Iterator
|
||||
|
||||
from app.ingest.base import (
|
||||
SourceState, apply_patterns, epoch_float_to_iso,
|
||||
make_entry_id, now_iso,
|
||||
)
|
||||
from app.services.models import LogPattern, RetrievedEntry
|
||||
|
||||
_LEVEL_MAP = {"debug": "DEBUG", "info": "INFO", "warn": "WARN", "error": "ERROR"}
|
||||
|
||||
|
||||
def _summarise(entry: dict) -> str:
|
||||
"""Build a human-readable text representation of a Caddy log entry."""
|
||||
msg = entry.get("msg", entry.get("message", ""))
|
||||
req = entry.get("request", {})
|
||||
if req:
|
||||
method = req.get("method", "")
|
||||
host = req.get("host", "")
|
||||
uri = req.get("uri", "")
|
||||
status = entry.get("status", "")
|
||||
duration = entry.get("duration", "")
|
||||
err = entry.get("error", "")
|
||||
parts = [msg, f"{method} {host}{uri}" if method else "", f"status={status}" if status else ""]
|
||||
if duration:
|
||||
parts.append(f"duration={duration:.3f}s")
|
||||
if err:
|
||||
parts.append(f"error={err}")
|
||||
return " ".join(p for p in parts if p)
|
||||
# Non-access log entries (TLS, config, etc.)
|
||||
err = entry.get("error", "")
|
||||
return f"{msg} {err}".strip() if err else msg
|
||||
|
||||
|
||||
def parse(
|
||||
lines: Iterator[str],
|
||||
source_id: str,
|
||||
compiled_patterns: list[tuple[LogPattern, object]],
|
||||
ingest_time: str | None = None,
|
||||
) -> Iterator[RetrievedEntry]:
|
||||
ingest_time = ingest_time or now_iso()
|
||||
state = SourceState()
|
||||
|
||||
for raw_line in lines:
|
||||
raw_line = raw_line.strip()
|
||||
if not raw_line:
|
||||
continue
|
||||
try:
|
||||
entry = json.loads(raw_line)
|
||||
except json.JSONDecodeError:
|
||||
continue
|
||||
|
||||
if "ts" not in entry:
|
||||
continue
|
||||
|
||||
ts_float = float(entry["ts"])
|
||||
ts_iso = epoch_float_to_iso(ts_float)
|
||||
ts_raw = str(entry["ts"])
|
||||
|
||||
level_raw = entry.get("level", "info")
|
||||
severity = _LEVEL_MAP.get(level_raw.lower(), level_raw.upper())
|
||||
|
||||
text = _summarise(entry)
|
||||
if not text:
|
||||
continue
|
||||
|
||||
repeat, out_of_order = state.observe(text, ts_iso)
|
||||
matched = apply_patterns(text, compiled_patterns)
|
||||
|
||||
yield RetrievedEntry(
|
||||
entry_id=make_entry_id(source_id, state.sequence, text),
|
||||
source_id=source_id,
|
||||
sequence=state.sequence,
|
||||
timestamp_raw=ts_raw,
|
||||
timestamp_iso=ts_iso,
|
||||
ingest_time=ingest_time,
|
||||
severity=severity,
|
||||
repeat_count=repeat,
|
||||
out_of_order=out_of_order,
|
||||
matched_patterns=matched,
|
||||
text=text,
|
||||
)
|
||||
66
app/ingest/docker_log.py
Normal file
66
app/ingest/docker_log.py
Normal file
|
|
@ -0,0 +1,66 @@
|
|||
"""Docker-wrapped log parser (Turnstone's custom corpus format)."""
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from typing import Iterator
|
||||
|
||||
from app.ingest.base import (
|
||||
SourceState, apply_patterns, detect_severity,
|
||||
make_entry_id, now_iso,
|
||||
)
|
||||
from app.services.models import LogPattern, RetrievedEntry
|
||||
|
||||
|
||||
def parse(
|
||||
lines: Iterator[str],
|
||||
source_id: str,
|
||||
compiled_patterns: list[tuple[LogPattern, object]],
|
||||
ingest_time: str | None = None,
|
||||
) -> Iterator[RetrievedEntry]:
|
||||
ingest_time = ingest_time or now_iso()
|
||||
# One SourceState per container name
|
||||
states: dict[str, SourceState] = {}
|
||||
|
||||
for raw_line in lines:
|
||||
raw_line = raw_line.strip()
|
||||
if not raw_line:
|
||||
continue
|
||||
try:
|
||||
entry = json.loads(raw_line)
|
||||
except json.JSONDecodeError:
|
||||
continue
|
||||
|
||||
src = entry.get("SOURCE", source_id)
|
||||
text = entry.get("MESSAGE", "").strip()
|
||||
if not text:
|
||||
continue
|
||||
|
||||
# Try to parse inner JSON (Docker often logs JSON itself)
|
||||
try:
|
||||
inner = json.loads(text)
|
||||
if isinstance(inner, dict):
|
||||
text = inner.get("msg") or inner.get("message") or inner.get("MESSAGE") or text
|
||||
except (json.JSONDecodeError, TypeError):
|
||||
pass
|
||||
|
||||
if src not in states:
|
||||
states[src] = SourceState()
|
||||
state = states[src]
|
||||
|
||||
severity = detect_severity(text)
|
||||
repeat, out_of_order = state.observe(text, None)
|
||||
matched = apply_patterns(text, compiled_patterns)
|
||||
|
||||
yield RetrievedEntry(
|
||||
entry_id=make_entry_id(src, state.sequence, text),
|
||||
source_id=src,
|
||||
sequence=state.sequence,
|
||||
timestamp_raw=None,
|
||||
timestamp_iso=None,
|
||||
ingest_time=ingest_time,
|
||||
severity=severity,
|
||||
repeat_count=repeat,
|
||||
out_of_order=False, # no timestamps to compare
|
||||
matched_patterns=matched,
|
||||
text=text,
|
||||
)
|
||||
75
app/ingest/journald.py
Normal file
75
app/ingest/journald.py
Normal file
|
|
@ -0,0 +1,75 @@
|
|||
"""Journald JSON (-o json) log parser."""
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from typing import Iterator
|
||||
|
||||
from app.ingest.base import (
|
||||
SourceState, apply_patterns, epoch_micros_to_iso,
|
||||
make_entry_id, now_iso, SYSLOG_PRIORITY,
|
||||
)
|
||||
from app.services.models import LogPattern, RetrievedEntry
|
||||
|
||||
|
||||
def _extract_message(raw: dict) -> str:
|
||||
msg = raw.get("MESSAGE", "")
|
||||
# journald encodes binary messages as arrays of ints
|
||||
if isinstance(msg, list):
|
||||
try:
|
||||
return bytes(msg).decode("utf-8", errors="replace")
|
||||
except Exception:
|
||||
return repr(msg)
|
||||
return str(msg)
|
||||
|
||||
|
||||
def parse(
|
||||
lines: Iterator[str],
|
||||
source_id: str,
|
||||
compiled_patterns: list[tuple[LogPattern, object]],
|
||||
ingest_time: str | None = None,
|
||||
) -> Iterator[RetrievedEntry]:
|
||||
ingest_time = ingest_time or now_iso()
|
||||
state = SourceState()
|
||||
|
||||
for raw_line in lines:
|
||||
raw_line = raw_line.strip()
|
||||
if not raw_line:
|
||||
continue
|
||||
try:
|
||||
entry = json.loads(raw_line)
|
||||
except json.JSONDecodeError:
|
||||
continue
|
||||
|
||||
if "__REALTIME_TIMESTAMP" not in entry:
|
||||
continue
|
||||
|
||||
text = _extract_message(entry)
|
||||
if not text:
|
||||
continue
|
||||
|
||||
ts_raw = entry["__REALTIME_TIMESTAMP"]
|
||||
ts_iso = epoch_micros_to_iso(ts_raw)
|
||||
|
||||
priority = entry.get("PRIORITY", "")
|
||||
severity = SYSLOG_PRIORITY.get(str(priority))
|
||||
|
||||
hostname = entry.get("_HOSTNAME", "")
|
||||
unit = entry.get("_SYSTEMD_UNIT") or entry.get("SYSLOG_IDENTIFIER", "")
|
||||
src = f"{source_id}:{hostname}:{unit}" if hostname else source_id
|
||||
|
||||
repeat, out_of_order = state.observe(text, ts_iso)
|
||||
matched = apply_patterns(text, compiled_patterns)
|
||||
|
||||
yield RetrievedEntry(
|
||||
entry_id=make_entry_id(src, state.sequence, text),
|
||||
source_id=src,
|
||||
sequence=state.sequence,
|
||||
timestamp_raw=ts_raw,
|
||||
timestamp_iso=ts_iso,
|
||||
ingest_time=ingest_time,
|
||||
severity=severity,
|
||||
repeat_count=repeat,
|
||||
out_of_order=out_of_order,
|
||||
matched_patterns=matched,
|
||||
text=text,
|
||||
)
|
||||
146
app/ingest/pipeline.py
Normal file
146
app/ingest/pipeline.py
Normal file
|
|
@ -0,0 +1,146 @@
|
|||
"""Ingest pipeline: auto-detect format, parse, write to SQLite."""
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import logging
|
||||
import re
|
||||
import sqlite3
|
||||
from pathlib import Path
|
||||
from typing import Iterator
|
||||
|
||||
from app.ingest import caddy, docker_log, journald
|
||||
from app.ingest.base import _compile, load_patterns, now_iso
|
||||
from app.services.models import LogPattern, RetrievedEntry
|
||||
from app.services.search import build_fts_index
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
_SCHEMA = """
|
||||
CREATE TABLE IF NOT EXISTS log_entries (
|
||||
id TEXT PRIMARY KEY,
|
||||
source_id TEXT NOT NULL,
|
||||
sequence INTEGER NOT NULL,
|
||||
timestamp_raw TEXT,
|
||||
timestamp_iso TEXT,
|
||||
ingest_time TEXT NOT NULL,
|
||||
severity TEXT,
|
||||
repeat_count INTEGER DEFAULT 1,
|
||||
out_of_order INTEGER DEFAULT 0,
|
||||
matched_patterns TEXT DEFAULT '[]',
|
||||
text TEXT NOT NULL
|
||||
);
|
||||
CREATE INDEX IF NOT EXISTS idx_source ON log_entries(source_id);
|
||||
CREATE INDEX IF NOT EXISTS idx_timestamp ON log_entries(timestamp_iso);
|
||||
CREATE INDEX IF NOT EXISTS idx_severity ON log_entries(severity);
|
||||
CREATE INDEX IF NOT EXISTS idx_patterns ON log_entries(matched_patterns);
|
||||
"""
|
||||
|
||||
|
||||
def _detect_format(first_line: str) -> str:
|
||||
try:
|
||||
obj = json.loads(first_line)
|
||||
if "__REALTIME_TIMESTAMP" in obj:
|
||||
return "journald"
|
||||
if "SOURCE" in obj and str(obj.get("SOURCE", "")).startswith("docker:"):
|
||||
return "docker"
|
||||
if "ts" in obj and ("msg" in obj or "message" in obj or "request" in obj):
|
||||
return "caddy"
|
||||
except (json.JSONDecodeError, AttributeError):
|
||||
pass
|
||||
return "unknown"
|
||||
|
||||
|
||||
def _parse_file(
|
||||
path: Path,
|
||||
compiled: list[tuple[LogPattern, object]],
|
||||
ingest_time: str,
|
||||
) -> Iterator[RetrievedEntry]:
|
||||
source_id = path.stem
|
||||
|
||||
with path.open("r", errors="replace") as f:
|
||||
lines = iter(f)
|
||||
try:
|
||||
first = next(lines)
|
||||
except StopIteration:
|
||||
return
|
||||
|
||||
fmt = _detect_format(first.strip())
|
||||
logger.info("Detected format %r for %s", fmt, path.name)
|
||||
|
||||
def all_lines():
|
||||
yield first
|
||||
yield from lines
|
||||
|
||||
if fmt == "journald":
|
||||
yield from journald.parse(all_lines(), source_id, compiled, ingest_time)
|
||||
elif fmt == "docker":
|
||||
yield from docker_log.parse(all_lines(), source_id, compiled, ingest_time)
|
||||
elif fmt == "caddy":
|
||||
yield from caddy.parse(all_lines(), source_id, compiled, ingest_time)
|
||||
else:
|
||||
logger.warning("Unknown format in %s — skipping", path.name)
|
||||
|
||||
|
||||
def _write_batch(conn: sqlite3.Connection, batch: list[RetrievedEntry]) -> None:
|
||||
conn.executemany(
|
||||
"""
|
||||
INSERT OR IGNORE INTO log_entries
|
||||
(id, source_id, sequence, timestamp_raw, timestamp_iso,
|
||||
ingest_time, severity, repeat_count, out_of_order,
|
||||
matched_patterns, text)
|
||||
VALUES (?,?,?,?,?,?,?,?,?,?,?)
|
||||
""",
|
||||
[
|
||||
(
|
||||
e.entry_id, e.source_id, e.sequence,
|
||||
e.timestamp_raw, e.timestamp_iso, e.ingest_time,
|
||||
e.severity, e.repeat_count, int(e.out_of_order),
|
||||
json.dumps(list(e.matched_patterns)), e.text,
|
||||
)
|
||||
for e in batch
|
||||
],
|
||||
)
|
||||
|
||||
|
||||
def ingest(
|
||||
corpus_dir: Path,
|
||||
db_path: Path,
|
||||
pattern_file: Path | None = None,
|
||||
batch_size: int = 1000,
|
||||
) -> dict[str, int]:
|
||||
pattern_file = pattern_file or Path("patterns/default.yaml")
|
||||
patterns = load_patterns(pattern_file)
|
||||
compiled = _compile(patterns)
|
||||
ingest_time = now_iso()
|
||||
|
||||
conn = sqlite3.connect(str(db_path))
|
||||
conn.execute("PRAGMA journal_mode=WAL")
|
||||
conn.executescript(_SCHEMA)
|
||||
conn.commit()
|
||||
|
||||
stats: dict[str, int] = {}
|
||||
|
||||
for jsonl_file in sorted(corpus_dir.glob("*.jsonl")):
|
||||
count = 0
|
||||
batch: list[RetrievedEntry] = []
|
||||
for entry in _parse_file(jsonl_file, compiled, ingest_time):
|
||||
batch.append(entry)
|
||||
if len(batch) >= batch_size:
|
||||
_write_batch(conn, batch)
|
||||
conn.commit()
|
||||
count += len(batch)
|
||||
batch.clear()
|
||||
if batch:
|
||||
_write_batch(conn, batch)
|
||||
conn.commit()
|
||||
count += len(batch)
|
||||
stats[jsonl_file.name] = count
|
||||
logger.info("Ingested %d entries from %s", count, jsonl_file.name)
|
||||
|
||||
conn.close()
|
||||
|
||||
logger.info("Building FTS index...")
|
||||
build_fts_index(db_path)
|
||||
logger.info("FTS index ready")
|
||||
|
||||
return stats
|
||||
198
app/mcp_server.py
Normal file
198
app/mcp_server.py
Normal file
|
|
@ -0,0 +1,198 @@
|
|||
"""Turnstone MCP server — log search and diagnostics via stdio transport.
|
||||
|
||||
Works with both Claude Code and Copilot CLI (both use MCP stdio).
|
||||
|
||||
Run directly: python -m app.mcp_server
|
||||
Or via: python app/mcp_server.py
|
||||
|
||||
Configure TURNSTONE_DB env var to override the default DB path.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import os
|
||||
import sqlite3
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
from mcp.server.fastmcp import FastMCP
|
||||
|
||||
# All logging must go to stderr — stdout is reserved for MCP JSON-RPC
|
||||
logging.basicConfig(
|
||||
stream=sys.stderr,
|
||||
level=logging.INFO,
|
||||
format="%(levelname)s %(message)s",
|
||||
)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Allow running as a script from the project root
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent))
|
||||
|
||||
from app.services.search import (
|
||||
build_fts_index,
|
||||
format_results,
|
||||
list_sources,
|
||||
search,
|
||||
)
|
||||
|
||||
_DEFAULT_DB = Path(__file__).parent.parent / "data" / "turnstone.db"
|
||||
DB_PATH = Path(os.environ.get("TURNSTONE_DB", _DEFAULT_DB))
|
||||
|
||||
mcp = FastMCP(
|
||||
"turnstone",
|
||||
instructions=(
|
||||
"Turnstone is a diagnostic intelligence layer for server, service, and device logs. "
|
||||
"Call list_log_sources first to understand what data is available and identify "
|
||||
"source_id values for filtering. Use search_logs for targeted FTS queries. "
|
||||
"Use diagnose for symptom-driven investigation — it runs layered searches and "
|
||||
"returns evidence ranked by severity and relevance."
|
||||
),
|
||||
)
|
||||
|
||||
_index_ready = False
|
||||
|
||||
|
||||
def _ensure_index() -> None:
|
||||
"""Build FTS index on first use; skip if already present."""
|
||||
global _index_ready
|
||||
if _index_ready:
|
||||
return
|
||||
|
||||
try:
|
||||
conn = sqlite3.connect(str(DB_PATH))
|
||||
count = conn.execute("SELECT COUNT(*) FROM log_fts").fetchone()[0]
|
||||
conn.close()
|
||||
if count > 0:
|
||||
_index_ready = True
|
||||
logger.info("FTS index present (%d entries)", count)
|
||||
return
|
||||
except sqlite3.OperationalError:
|
||||
pass
|
||||
|
||||
logger.info("Building FTS index for %s — this may take a minute...", DB_PATH)
|
||||
build_fts_index(DB_PATH)
|
||||
_index_ready = True
|
||||
logger.info("FTS index ready")
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
def search_logs(
|
||||
query: str,
|
||||
severity: str | None = None,
|
||||
source: str | None = None,
|
||||
pattern: str | None = None,
|
||||
since: str | None = None,
|
||||
until: str | None = None,
|
||||
limit: int = 20,
|
||||
include_repeats: bool = False,
|
||||
) -> str:
|
||||
"""Search log entries using full-text search with optional filters.
|
||||
|
||||
Args:
|
||||
query: FTS5 search expression. Supports AND, OR, NOT, phrase quotes, prefix*.
|
||||
Example: '"connection refused" OR "connection lost"'
|
||||
severity: Filter by level — EMERGENCY, ALERT, CRITICAL, ERROR, WARN, NOTICE, INFO, DEBUG.
|
||||
source: Partial match on source_id. Format is 'corpus:host:service'.
|
||||
Example: 'xanderland:caddy' matches all Caddy entries from xanderland.
|
||||
pattern: Filter by named pattern tag applied at ingest time.
|
||||
Known tags: auth_failure, connection_lost, oom, segfault, disk_full,
|
||||
timeout, caddy_tls_error, caddy_config_error, caddy_auth_error,
|
||||
caddy_upstream_error, service_restart, service_update,
|
||||
power_failure, network_interface, ip_change.
|
||||
since: Earliest timestamp, ISO 8601 (e.g. 2026-05-06T18:00:00).
|
||||
until: Latest timestamp, ISO 8601.
|
||||
limit: Result count cap (default 20, max 100).
|
||||
include_repeats: Include deduplicated repeat log lines (default False).
|
||||
|
||||
Returns:
|
||||
Formatted log entries ranked by FTS relevance, or a no-results message.
|
||||
"""
|
||||
_ensure_index()
|
||||
results = search(
|
||||
DB_PATH,
|
||||
query=query,
|
||||
severity=severity,
|
||||
source_filter=source,
|
||||
pattern_filter=pattern,
|
||||
since=since,
|
||||
until=until,
|
||||
limit=min(limit, 100),
|
||||
include_repeats=include_repeats,
|
||||
)
|
||||
return format_results(results)
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
def diagnose(
|
||||
symptom: str,
|
||||
source: str | None = None,
|
||||
since: str | None = None,
|
||||
until: str | None = None,
|
||||
) -> str:
|
||||
"""Investigate a symptom or problem description across the log corpus.
|
||||
|
||||
Runs layered searches — broad relevance first, then CRITICAL/ERROR filtered —
|
||||
and merges results deduplicated by entry ID, sorted chronologically.
|
||||
Use this for open-ended questions like "why did auth break?" or
|
||||
"what happened during the network outage?".
|
||||
|
||||
Args:
|
||||
symptom: Plain-language description of the problem or symptom.
|
||||
source: Limit to a host or service (partial match on source_id).
|
||||
since: Earliest timestamp, ISO 8601.
|
||||
until: Latest timestamp, ISO 8601.
|
||||
|
||||
Returns:
|
||||
Consolidated log evidence sorted by time, formatted for analysis.
|
||||
"""
|
||||
_ensure_index()
|
||||
|
||||
common = dict(source_filter=source, since=since, until=until, include_repeats=False)
|
||||
|
||||
broad = search(DB_PATH, query=symptom, limit=15, **common)
|
||||
critical = search(DB_PATH, query=symptom, severity="CRITICAL", limit=5, **common)
|
||||
errors = search(DB_PATH, query=symptom, severity="ERROR", limit=10, **common)
|
||||
|
||||
seen: set[str] = set()
|
||||
combined = []
|
||||
for r in broad + critical + errors:
|
||||
if r.entry_id not in seen:
|
||||
seen.add(r.entry_id)
|
||||
combined.append(r)
|
||||
|
||||
combined.sort(key=lambda r: (r.timestamp_iso or "\xff", r.sequence))
|
||||
return format_results(combined[:20])
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
def list_log_sources() -> str:
|
||||
"""List all log sources in the corpus with entry counts and time ranges.
|
||||
|
||||
Call this before searching to understand what data is available and to
|
||||
identify source_id values for use in the source= filter parameter.
|
||||
|
||||
Returns:
|
||||
Each source on its own block with entry count, error count, and time range.
|
||||
"""
|
||||
sources = list_sources(DB_PATH)
|
||||
if not sources:
|
||||
return "No log sources found. Has the corpus been ingested? Run: python scripts/ingest_corpus.py"
|
||||
|
||||
lines = [f"Corpus: {DB_PATH}", f"Sources ({len(sources)} total):\n"]
|
||||
for s in sources:
|
||||
err_note = f" — {s['error_count']:,} errors" if s["error_count"] else ""
|
||||
lines.append(
|
||||
f" {s['source_id']}\n"
|
||||
f" entries: {s['entry_count']:,}{err_note}\n"
|
||||
f" range: {s['earliest'] or 'unknown'} → {s['latest'] or 'unknown'}"
|
||||
)
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
if not DB_PATH.exists():
|
||||
logger.error("Database not found: %s", DB_PATH)
|
||||
logger.error("Run: python scripts/ingest_corpus.py <corpus_dir> <db_path>")
|
||||
sys.exit(1)
|
||||
logger.info("Starting Turnstone MCP server (DB: %s)", DB_PATH)
|
||||
mcp.run()
|
||||
0
app/services/__init__.py
Normal file
0
app/services/__init__.py
Normal file
33
app/services/models.py
Normal file
33
app/services/models.py
Normal file
|
|
@ -0,0 +1,33 @@
|
|||
"""Core data models for Turnstone log retrieval."""
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass, field
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class RetrievedEntry:
|
||||
"""A log entry returned by the retriever, with source metadata and scores."""
|
||||
|
||||
entry_id: str
|
||||
source_id: str # log file path or service name
|
||||
sequence: int # original line number — ingest order, not wall-clock order
|
||||
timestamp_raw: str | None # timestamp as it appeared in the log
|
||||
timestamp_iso: str | None # parsed to ISO 8601 for sorting; None if unparseable
|
||||
ingest_time: str # when Turnstone indexed this entry (wall clock)
|
||||
severity: str | None # ERROR / WARN / INFO / DEBUG / None if not detected
|
||||
repeat_count: int # collapsed duplicate count (1 = unique)
|
||||
out_of_order: bool # True when timestamp precedes predecessor's timestamp
|
||||
matched_patterns: tuple[str, ...] = field(default_factory=tuple) # named pattern hits
|
||||
text: str = ""
|
||||
bm25_score: float = 0.0
|
||||
vector_score: float | None = None
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class LogPattern:
|
||||
"""A named regex pattern for tagging entries at ingest time."""
|
||||
|
||||
name: str # e.g. "device_disconnect", "auth_failure"
|
||||
pattern: str # regex string
|
||||
severity: str # suggested severity if not present in log line
|
||||
description: str # human-readable explanation for the UI
|
||||
200
app/services/search.py
Normal file
200
app/services/search.py
Normal file
|
|
@ -0,0 +1,200 @@
|
|||
"""FTS5-based log search with severity, source, and pattern filters."""
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import logging
|
||||
import sqlite3
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class SearchResult:
|
||||
entry_id: str
|
||||
source_id: str
|
||||
sequence: int
|
||||
timestamp_iso: str | None
|
||||
severity: str | None
|
||||
repeat_count: int
|
||||
out_of_order: bool
|
||||
matched_patterns: list[str]
|
||||
text: str
|
||||
rank: float
|
||||
|
||||
|
||||
def build_fts_index(db_path: Path) -> None:
|
||||
"""Build (or rebuild) the FTS5 index from log_entries. Safe to re-run.
|
||||
|
||||
Drops and recreates the table if the schema is stale (missing sequence column).
|
||||
"""
|
||||
conn = sqlite3.connect(str(db_path))
|
||||
conn.execute("PRAGMA journal_mode=WAL")
|
||||
|
||||
# Check whether existing table has the sequence column; rebuild if not.
|
||||
needs_rebuild = False
|
||||
try:
|
||||
conn.execute("SELECT sequence FROM log_fts LIMIT 0")
|
||||
except sqlite3.OperationalError:
|
||||
needs_rebuild = True
|
||||
|
||||
if needs_rebuild:
|
||||
conn.execute("DROP TABLE IF EXISTS log_fts")
|
||||
|
||||
conn.executescript("""
|
||||
CREATE VIRTUAL TABLE IF NOT EXISTS log_fts USING fts5(
|
||||
text,
|
||||
entry_id UNINDEXED,
|
||||
source_id UNINDEXED,
|
||||
sequence UNINDEXED,
|
||||
severity UNINDEXED,
|
||||
timestamp_iso UNINDEXED,
|
||||
matched_patterns UNINDEXED,
|
||||
repeat_count UNINDEXED,
|
||||
out_of_order UNINDEXED,
|
||||
tokenize = 'porter ascii'
|
||||
);
|
||||
""")
|
||||
# Only insert rows not already indexed
|
||||
conn.execute("""
|
||||
INSERT INTO log_fts(text, entry_id, source_id, sequence, severity,
|
||||
timestamp_iso, matched_patterns,
|
||||
repeat_count, out_of_order)
|
||||
SELECT e.text, e.id, e.source_id, e.sequence, e.severity,
|
||||
e.timestamp_iso, e.matched_patterns,
|
||||
e.repeat_count, e.out_of_order
|
||||
FROM log_entries e
|
||||
WHERE e.id NOT IN (SELECT entry_id FROM log_fts WHERE entry_id IS NOT NULL)
|
||||
""")
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
|
||||
def search(
|
||||
db_path: Path,
|
||||
query: str,
|
||||
severity: str | None = None,
|
||||
source_filter: str | None = None,
|
||||
pattern_filter: str | None = None,
|
||||
since: str | None = None,
|
||||
until: str | None = None,
|
||||
limit: int = 20,
|
||||
include_repeats: bool = False,
|
||||
) -> list[SearchResult]:
|
||||
"""Full-text search with optional filters. Returns results ranked by relevance."""
|
||||
conn = sqlite3.connect(str(db_path))
|
||||
conn.execute("PRAGMA journal_mode=WAL")
|
||||
conn.row_factory = sqlite3.Row
|
||||
|
||||
conditions = ["log_fts MATCH ?"]
|
||||
params: list = [query]
|
||||
|
||||
if severity:
|
||||
conditions.append("severity = ?")
|
||||
params.append(severity.upper())
|
||||
if source_filter:
|
||||
conditions.append("source_id LIKE ?")
|
||||
params.append(f"%{source_filter}%")
|
||||
if pattern_filter:
|
||||
conditions.append("matched_patterns LIKE ?")
|
||||
params.append(f'%"{pattern_filter}"%')
|
||||
if since:
|
||||
conditions.append("timestamp_iso >= ?")
|
||||
params.append(since)
|
||||
if until:
|
||||
conditions.append("timestamp_iso <= ?")
|
||||
params.append(until)
|
||||
if not include_repeats:
|
||||
conditions.append("repeat_count = 1")
|
||||
|
||||
where = " AND ".join(conditions)
|
||||
params.append(limit)
|
||||
|
||||
try:
|
||||
rows = conn.execute(
|
||||
f"""
|
||||
SELECT entry_id, source_id, sequence, timestamp_iso, severity,
|
||||
repeat_count, out_of_order, matched_patterns, text, rank
|
||||
FROM log_fts
|
||||
WHERE {where}
|
||||
ORDER BY rank
|
||||
LIMIT ?
|
||||
""",
|
||||
params,
|
||||
).fetchall()
|
||||
except sqlite3.OperationalError as e:
|
||||
logger.warning("FTS query failed (%s) — index may not be built yet", e)
|
||||
conn.close()
|
||||
return []
|
||||
|
||||
results = [
|
||||
SearchResult(
|
||||
entry_id=r["entry_id"],
|
||||
source_id=r["source_id"],
|
||||
sequence=r["sequence"],
|
||||
timestamp_iso=r["timestamp_iso"],
|
||||
severity=r["severity"],
|
||||
repeat_count=r["repeat_count"],
|
||||
out_of_order=bool(r["out_of_order"]),
|
||||
matched_patterns=json.loads(r["matched_patterns"] or "[]"),
|
||||
text=r["text"],
|
||||
rank=r["rank"],
|
||||
)
|
||||
for r in rows
|
||||
]
|
||||
conn.close()
|
||||
return results
|
||||
|
||||
|
||||
def list_sources(db_path: Path) -> list[dict]:
|
||||
"""Return distinct sources with entry counts and time ranges."""
|
||||
conn = sqlite3.connect(str(db_path))
|
||||
conn.execute("PRAGMA journal_mode=WAL")
|
||||
rows = conn.execute("""
|
||||
SELECT
|
||||
source_id,
|
||||
COUNT(*) as entry_count,
|
||||
MIN(timestamp_iso) as earliest,
|
||||
MAX(timestamp_iso) as latest,
|
||||
SUM(CASE WHEN severity IN ('ERROR','CRITICAL','EMERGENCY','ALERT') THEN 1 ELSE 0 END) as error_count
|
||||
FROM log_entries
|
||||
GROUP BY source_id
|
||||
ORDER BY entry_count DESC
|
||||
""").fetchall()
|
||||
conn.close()
|
||||
return [
|
||||
{
|
||||
"source_id": r[0],
|
||||
"entry_count": r[1],
|
||||
"earliest": r[2],
|
||||
"latest": r[3],
|
||||
"error_count": r[4],
|
||||
}
|
||||
for r in rows
|
||||
]
|
||||
|
||||
|
||||
def format_results(results: list[SearchResult], max_text: int = 300) -> str:
|
||||
"""Format search results as readable text for LLM context."""
|
||||
if not results:
|
||||
return "No matching log entries found."
|
||||
|
||||
lines = []
|
||||
for r in results:
|
||||
ts = r.timestamp_iso or "no-timestamp"
|
||||
sev = r.severity or "?"
|
||||
src = r.source_id
|
||||
flags = []
|
||||
if r.repeat_count > 1:
|
||||
flags.append(f"repeat×{r.repeat_count}")
|
||||
if r.out_of_order:
|
||||
flags.append("out-of-order")
|
||||
if r.matched_patterns:
|
||||
flags.append(f"[{', '.join(r.matched_patterns)}]")
|
||||
flag_str = f" {' '.join(flags)}" if flags else ""
|
||||
|
||||
text = r.text[:max_text] + ("…" if len(r.text) > max_text else "")
|
||||
lines.append(f"[{ts} | {src} | {sev}{flag_str}]\n{text}")
|
||||
|
||||
return "\n\n".join(lines)
|
||||
88
patterns/default.yaml
Normal file
88
patterns/default.yaml
Normal file
|
|
@ -0,0 +1,88 @@
|
|||
# Turnstone pattern library — named regex patterns for log tagging at ingest time.
|
||||
# Each matched pattern name is stored on RetrievedEntry.matched_patterns and
|
||||
# used to boost retrieval relevance for diagnostic queries.
|
||||
#
|
||||
# Add domain-specific patterns here. Patterns are applied in order; multiple
|
||||
# can match a single entry.
|
||||
|
||||
patterns:
|
||||
- name: service_restart
|
||||
pattern: "(restarting|restart requested|service.*start)"
|
||||
severity: WARN
|
||||
description: Service restart detected
|
||||
|
||||
- name: connection_lost
|
||||
pattern: "(connection (lost|dropped|refused|timed? out)|disconnect(ed)?)"
|
||||
severity: ERROR
|
||||
description: Network or device connection failure
|
||||
|
||||
- name: auth_failure
|
||||
pattern: "(auth(entication)? (failed?|error|denied)|permission denied|unauthorized)"
|
||||
severity: ERROR
|
||||
description: Authentication or authorization failure
|
||||
|
||||
- name: oom
|
||||
pattern: "(out of memory|OOM|killed process|cannot allocate)"
|
||||
severity: CRITICAL
|
||||
description: Out-of-memory condition
|
||||
|
||||
- name: segfault
|
||||
pattern: "(segmentation fault|segfault|SIGSEGV|core dump)"
|
||||
severity: CRITICAL
|
||||
description: Process crash or memory corruption
|
||||
|
||||
- name: disk_full
|
||||
pattern: "(no space left|disk full|filesystem.*full|ENOSPC)"
|
||||
severity: ERROR
|
||||
description: Storage capacity exhausted
|
||||
|
||||
- name: timeout
|
||||
pattern: "(timed? out|deadline exceeded|operation timed?)"
|
||||
severity: WARN
|
||||
description: Operation timeout
|
||||
|
||||
- name: caddy_tls_error
|
||||
pattern: "(acme|certificate|tls).*(error|fail|invalid|expired|renew)"
|
||||
severity: ERROR
|
||||
description: Caddy TLS or certificate error
|
||||
|
||||
- name: caddy_config_error
|
||||
pattern: "(config|caddyfile|directive).*(error|invalid|unknown|unrecognized)"
|
||||
severity: ERROR
|
||||
description: Caddy configuration error
|
||||
|
||||
- name: caddy_auth_error
|
||||
pattern: "(forward_auth|basicauth|basic_auth).*(error|fail|denied|invalid|unreachable)"
|
||||
severity: ERROR
|
||||
description: Caddy authentication middleware failure
|
||||
|
||||
- name: caddy_upstream_error
|
||||
pattern: "(upstream|backend|reverse.proxy).*(error|fail|unreachable|refused|timeout)"
|
||||
severity: ERROR
|
||||
description: Caddy upstream/backend failure
|
||||
|
||||
- name: service_update
|
||||
pattern: "(upgraded?|updated?|installing|dpkg|apt|package).*(caddy|nginx|apache|proxy)"
|
||||
severity: INFO
|
||||
description: Web server package update detected
|
||||
|
||||
- name: power_failure
|
||||
pattern: "(power (fail|loss|outage|cut)|ups|battery|shutdown.*power|lost power)"
|
||||
severity: CRITICAL
|
||||
description: Power failure or UPS event
|
||||
|
||||
- name: network_interface
|
||||
pattern: "(eth[0-9]|ens[0-9]|enp[0-9]|wlan[0-9]).*(down|up|carrier|link)"
|
||||
severity: WARN
|
||||
description: Network interface state change
|
||||
|
||||
- name: ip_change
|
||||
pattern: "(new ip|ip.*(changed|assigned|address)|dhcp.*(ack|offer|bound|renew))"
|
||||
severity: INFO
|
||||
description: IP address change or DHCP event
|
||||
|
||||
# Add device/service-specific patterns below this line:
|
||||
# - name: avcx_device_error
|
||||
# pattern: "ERR-\d{4}"
|
||||
# severity: ERROR
|
||||
# description: AVCX device error code
|
||||
21
scripts/build_fts_index.py
Normal file
21
scripts/build_fts_index.py
Normal file
|
|
@ -0,0 +1,21 @@
|
|||
"""CLI: build (or update) the FTS5 full-text search index after ingest."""
|
||||
from __future__ import annotations
|
||||
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent))
|
||||
|
||||
from app.services.search import build_fts_index
|
||||
|
||||
if __name__ == "__main__":
|
||||
db_path = Path(sys.argv[1]) if len(sys.argv) > 1 else Path("data/turnstone.db")
|
||||
|
||||
if not db_path.exists():
|
||||
print(f"ERROR: database not found: {db_path}", file=sys.stderr)
|
||||
print("Run ingest first: python scripts/ingest_corpus.py", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
print(f"Building FTS index for {db_path} ...")
|
||||
build_fts_index(db_path)
|
||||
print("Done.")
|
||||
28
scripts/ingest_corpus.py
Normal file
28
scripts/ingest_corpus.py
Normal file
|
|
@ -0,0 +1,28 @@
|
|||
"""CLI: ingest a corpus directory into the Turnstone SQLite database."""
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s")
|
||||
|
||||
# Allow running from repo root
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent))
|
||||
|
||||
from app.ingest.pipeline import ingest
|
||||
|
||||
if __name__ == "__main__":
|
||||
corpus_dir = Path(sys.argv[1]) if len(sys.argv) > 1 else Path("corpus/raw")
|
||||
db_path = Path(sys.argv[2]) if len(sys.argv) > 2 else Path("data/turnstone.db")
|
||||
pattern_file = Path("patterns/default.yaml")
|
||||
|
||||
db_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
print(f"Ingesting {corpus_dir} → {db_path}")
|
||||
stats = ingest(corpus_dir, db_path, pattern_file)
|
||||
|
||||
total = sum(stats.values())
|
||||
for fname, count in sorted(stats.items()):
|
||||
print(f" {fname}: {count:,}")
|
||||
print(f" TOTAL: {total:,} entries")
|
||||
0
tests/__init__.py
Normal file
0
tests/__init__.py
Normal file
Loading…
Reference in a new issue