The watcher, REST endpoints, services (search, incidents, blocklist),
MCP server, context retriever, embedder, glean_scheduler, and
doc_upload all used Python sqlite3's default 5-second busy timeout.
During the write phases of a collect glean, watcher flush threads hit
'database is locked' errors whenever the glean held the write lock for
longer than 5 seconds.
All connections now use timeout=30.0, matching the pipeline fix
from commit 5a9281a. No logic changes.
198 lines · 6.6 KiB · Python
"""Turnstone MCP server — log search and diagnostics via stdio transport.

Works with both Claude Code and Copilot CLI (both use MCP stdio).

Run directly:  python -m app.mcp_server
       or via: python app/mcp_server.py

Set the TURNSTONE_DB environment variable to override the default
database path (data/turnstone.db under the project root).
"""

from __future__ import annotations

import logging
import os
import sqlite3
import sys
from contextlib import closing
from pathlib import Path

from mcp.server.fastmcp import FastMCP

# stdout carries the MCP JSON-RPC stream, so every log record is routed to
# stderr instead; anything printed to stdout would corrupt the protocol.
logging.basicConfig(
    format="%(levelname)s %(message)s",
    level=logging.INFO,
    stream=sys.stderr,
)
logger = logging.getLogger(__name__)


# When launched as `python app/mcp_server.py`, sys.path[0] is the app/
# directory itself, so the top-level `app` package would not resolve.
# Prepend the project root (the parent of app/) so the absolute import
# below works when the file is run as a script.
sys.path.insert(0, os.fspath(Path(__file__).parent.parent))

from app.services.search import (  # noqa: E402 — must follow the sys.path tweak
    build_fts_index,
    format_results,
    list_sources,
    search,
)


_DEFAULT_DB = Path(__file__).parent.parent / "data" / "turnstone.db"
|
|
DB_PATH = Path(os.environ.get("TURNSTONE_DB", _DEFAULT_DB))
|
|
|
|
# The single FastMCP application; the tool functions below register on it
# with @mcp.tool(). The `instructions` text tells clients which tool to
# call first and what each one is for.
mcp = FastMCP(
    "turnstone",
    instructions=(
        "Turnstone is a diagnostic intelligence layer for server, service, and device logs. "
        "Call list_log_sources first to understand what data is available and identify "
        "source_id values for filtering. "
        "Use search_logs for targeted FTS queries. "
        "Use diagnose for symptom-driven investigation — it runs layered searches and "
        "returns evidence ranked by severity and relevance."
    ),
)

# Set once the FTS index is known to have rows (or has just been built), so
# later tool calls skip the probe query entirely.
_index_ready = False


def _ensure_index() -> None:
    """Build the FTS index on first use; skip the build if it already has rows.

    Probes ``log_fts`` with a row count. If the probe raises
    ``sqlite3.OperationalError`` (e.g. the table does not exist yet) or the
    table is empty, ``build_fts_index(DB_PATH)`` is called. Once the index is
    confirmed or built, the module-level ``_index_ready`` flag short-circuits
    every later call.
    """
    global _index_ready
    if _index_ready:
        return

    try:
        # closing() releases the probe connection even when the query fails;
        # sqlite3's own `with conn:` only commits/rolls back and would leave it
        # open. The 30 s busy timeout tolerates a concurrent glean holding the
        # write lock.
        with closing(sqlite3.connect(str(DB_PATH), timeout=30.0)) as conn:
            count = conn.execute("SELECT COUNT(*) FROM log_fts").fetchone()[0]
    except sqlite3.OperationalError as exc:
        # Expected before the first build; record why we are (re)building.
        logger.debug("FTS index probe failed (%s); building it", exc)
    else:
        if count > 0:
            _index_ready = True
            logger.info("FTS index present (%d entries)", count)
            return

    logger.info("Building FTS index for %s — this may take a minute...", DB_PATH)
    build_fts_index(DB_PATH)
    _index_ready = True
    logger.info("FTS index ready")


@mcp.tool()
def search_logs(
    query: str,
    severity: str | None = None,
    source: str | None = None,
    pattern: str | None = None,
    since: str | None = None,
    until: str | None = None,
    limit: int = 20,
    include_repeats: bool = False,
) -> str:
    """Search log entries using full-text search with optional filters.

    Args:
        query: FTS5 search expression. Supports AND, OR, NOT, phrase quotes, prefix*.
            Example: '"connection refused" OR "connection lost"'
        severity: Filter by level — EMERGENCY, ALERT, CRITICAL, ERROR, WARN, NOTICE, INFO, DEBUG.
        source: Partial match on source_id. Format is 'corpus:host:service'.
            Example: 'example-node:caddy' matches all Caddy entries from example-node.
        pattern: Filter by named pattern tag applied at glean time.
            Known tags: auth_failure, connection_lost, oom, segfault, disk_full,
            timeout, caddy_tls_error, caddy_config_error, caddy_auth_error,
            caddy_upstream_error, service_restart, service_update,
            power_failure, network_interface, ip_change.
        since: Earliest timestamp, ISO 8601 (e.g. 2026-05-06T18:00:00).
        until: Latest timestamp, ISO 8601.
        limit: Result count cap (default 20, clamped to the range 1-100).
        include_repeats: Include deduplicated repeat log lines (default False).

    Returns:
        Formatted log entries ranked by FTS relevance, or a no-results message.
    """
    _ensure_index()
    # Clamp on both ends. `min(limit, 100)` alone let zero/negative values
    # through; if search() forwards them to SQL LIMIT, SQLite treats a
    # negative LIMIT as "no upper bound", bypassing the 100-row cap.
    # NOTE(review): confirm how search() uses `limit`.
    capped_limit = max(1, min(limit, 100))
    results = search(
        DB_PATH,
        query=query,
        severity=severity,
        source_filter=source,
        pattern_filter=pattern,
        since=since,
        until=until,
        limit=capped_limit,
        include_repeats=include_repeats,
    )
    return format_results(results)


@mcp.tool()
def diagnose(
    symptom: str,
    source: str | None = None,
    since: str | None = None,
    until: str | None = None,
) -> str:
    """Investigate a symptom or problem description across the log corpus.

    Runs layered searches — broad relevance plus CRITICAL- and ERROR-filtered —
    and keeps up to 20 entries deduplicated by entry ID, preferring CRITICAL,
    then ERROR, then broad matches. The kept entries are sorted chronologically.
    Use this for open-ended questions like "why did auth break?" or
    "what happened during the network outage?".

    Args:
        symptom: Plain-language description of the problem or symptom.
        source: Limit to a host or service (partial match on source_id).
        since: Earliest timestamp, ISO 8601.
        until: Latest timestamp, ISO 8601.

    Returns:
        Consolidated log evidence sorted by time, formatted for analysis.
    """
    _ensure_index()

    max_results = 20
    common = dict(source_filter=source, since=since, until=until, include_repeats=False)

    broad = search(DB_PATH, query=symptom, limit=15, **common)
    critical = search(DB_PATH, query=symptom, severity="CRITICAL", limit=5, **common)
    errors = search(DB_PATH, query=symptom, severity="ERROR", limit=10, **common)

    # Select by priority first, then sort for presentation. Sorting by time
    # before truncating kept only the 20 *oldest* of up to 30 distinct hits,
    # silently dropping the most recent ones, including CRITICAL/ERROR evidence.
    seen: set[str] = set()
    selected = []
    for r in critical + errors + broad:
        if r.entry_id in seen:
            continue
        seen.add(r.entry_id)
        selected.append(r)
        if len(selected) == max_results:
            break

    # Undated entries ("\xff" sorts after any ASCII ISO timestamp) go last.
    selected.sort(key=lambda r: (r.timestamp_iso or "\xff", r.sequence))
    return format_results(selected)


@mcp.tool()
def list_log_sources() -> str:
    """List all log sources in the corpus with entry counts and time ranges.

    Call this before searching to understand what data is available and to
    identify source_id values for use in the source= filter parameter.

    Returns:
        Each source on its own block with entry count, error count, and time range.
    """
    sources = list_sources(DB_PATH)
    if not sources:
        return "No log sources found. Has the corpus been gleaned? Run: python scripts/glean_corpus.py"

    # Render one source as a three-line block; the error note is omitted
    # when the source's error_count is falsy.
    def describe(src) -> str:
        errs = src["error_count"]
        err_note = f" — {errs:,} errors" if errs else ""
        return "\n".join((
            f" {src['source_id']}",
            f" entries: {src['entry_count']:,}{err_note}",
            f" range: {src['earliest'] or 'unknown'} → {src['latest'] or 'unknown'}",
        ))

    # The header's trailing "\n" leaves a blank line before the first source.
    header = [f"Corpus: {DB_PATH}", f"Sources ({len(sources)} total):\n"]
    return "\n".join(header + [describe(src) for src in sources])


if __name__ == "__main__":
    # Fail fast with a helpful message instead of letting sqlite3.connect()
    # create an empty database at DB_PATH. is_file() (rather than exists())
    # also rejects a directory, e.g. TURNSTONE_DB pointing at a folder.
    if not DB_PATH.is_file():
        logger.error("Database not found: %s", DB_PATH)
        logger.error("Run: python scripts/glean_corpus.py <corpus_dir> <db_path>")
        sys.exit(1)
    logger.info("Starting Turnstone MCP server (DB: %s)", DB_PATH)
    # Serves over stdio, FastMCP's default transport.
    mcp.run()