feat: initial Turnstone POC — ingest, FTS search, MCP server

Ingest pipeline (journald / Caddy / Docker-wrapped formats) with
per-source state tracking (repeat dedup, out-of-order detection),
named pattern tagging at ingest time, and idempotent SHA1-keyed writes.

FTS5 search layer with porter stemmer, severity/source/pattern/time
filters, and BM25 ranking. MCP server (FastMCP stdio) with three tools:
search_logs, diagnose, list_log_sources — compatible with both
Claude Code and Copilot CLI.

WAL mode enabled on all connections. FTS index auto-built after ingest.
MCP configs included for Claude Code (.mcp.json) and Copilot CLI
(.github/copilot/mcp.json).
This commit is contained in:
pyr0ball 2026-05-08 12:12:34 -07:00
commit 64c3996aa1
19 changed files with 1082 additions and 0 deletions

15
.github/copilot/mcp.json vendored Normal file
View file

@ -0,0 +1,15 @@
{
"servers": {
"turnstone": {
"type": "stdio",
"command": "conda",
"args": [
"run", "--no-capture-output", "-n", "cf",
"python", "-m", "app.mcp_server"
],
"env": {
"TURNSTONE_DB": "/Library/Development/CircuitForge/turnstone/data/turnstone.db"
}
}
}
}

10
.gitignore vendored Normal file
View file

@ -0,0 +1,10 @@
data/
corpus/raw/
__pycache__/
*.pyc
*.pyo
*.pyd
.env
*.db
*.db-wal
*.db-shm

15
.mcp.json Normal file
View file

@ -0,0 +1,15 @@
{
"mcpServers": {
"turnstone": {
"command": "conda",
"args": [
"run", "--no-capture-output", "-n", "cf",
"python", "-m", "app.mcp_server"
],
"cwd": "/Library/Development/CircuitForge/turnstone",
"env": {
"TURNSTONE_DB": "/Library/Development/CircuitForge/turnstone/data/turnstone.db"
}
}
}
}

0
app/__init__.py Normal file
View file

0
app/api/__init__.py Normal file
View file

0
app/ingest/__init__.py Normal file
View file

102
app/ingest/base.py Normal file
View file

@ -0,0 +1,102 @@
"""Shared utilities for all Turnstone log ingestors."""
from __future__ import annotations
import hashlib
import re
from datetime import datetime, timezone
from pathlib import Path
from typing import Iterator
import yaml
from app.services.models import LogPattern, RetrievedEntry
# Severity name for each syslog priority digit, keyed by the digit as a string
# (the journald parser looks values up with str(PRIORITY)).
SYSLOG_PRIORITY: dict[str, str] = {
    "0": "EMERGENCY", "1": "ALERT", "2": "CRITICAL",
    "3": "ERROR", "4": "WARN", "5": "NOTICE",
    "6": "INFO", "7": "DEBUG",
}
# Matches the first whole-word severity keyword in free text, ignoring case.
# Both "WARN" and "WARNING" are accepted.
_SEVERITY_RE = re.compile(
    r"\b(EMERGENCY|ALERT|CRITICAL|ERROR|WARN(?:ING)?|NOTICE|INFO|DEBUG)\b",
    re.IGNORECASE,
)
def load_patterns(path: Path) -> list[LogPattern]:
    """Load named tagging patterns from a YAML file.

    The document is expected to contain a top-level ``patterns`` list whose
    items each provide ``name``, ``pattern``, ``severity`` and ``description``.

    Args:
        path: Location of the YAML pattern file.

    Returns:
        One LogPattern per listed item, in file order. An empty list when the
        file does not exist, is empty, or lists no patterns.

    Raises:
        KeyError: If an item lacks one of the four required keys.
    """
    if not path.exists():
        return []
    # safe_load() yields None for an empty or comment-only document, and a bare
    # "patterns:" key also maps to None; treat both as "no patterns" instead of
    # failing on None.get / iterating None.
    raw = yaml.safe_load(path.read_text(encoding="utf-8")) or {}
    return [
        LogPattern(
            name=p["name"],
            pattern=p["pattern"],
            severity=p["severity"],
            description=p["description"],
        )
        for p in raw.get("patterns") or []
    ]
def _compile(patterns: list[LogPattern]) -> list[tuple[LogPattern, re.Pattern]]:
return [(p, re.compile(p.pattern, re.IGNORECASE)) for p in patterns]
def apply_patterns(
    text: str,
    compiled: list[tuple[LogPattern, re.Pattern]],
) -> tuple[str, ...]:
    """Return the names of every pattern whose regex is found in ``text``.

    Names appear in the same order as the entries of ``compiled``; the tuple
    is empty when nothing matches.
    """
    hits = []
    for spec, regex in compiled:
        if regex.search(text):
            hits.append(spec.name)
    return tuple(hits)
def detect_severity(text: str) -> str | None:
    """Return the first severity keyword found in ``text``, upper-cased.

    "WARNING" is reported as "WARN" so the value agrees with the spelling used
    by SYSLOG_PRIORITY; otherwise entries tagged "WARNING" would not be found
    by an exact-match filter on "WARN".

    Returns:
        The canonical severity name, or None when no keyword is present.
    """
    m = _SEVERITY_RE.search(text)
    if m is None:
        return None
    level = m.group(0).upper()
    return "WARN" if level == "WARNING" else level
def epoch_micros_to_iso(micros: str | int) -> str:
    """Convert microseconds since the Unix epoch to a UTC ISO 8601 string.

    ``micros`` may be an int or a decimal string.
    """
    seconds = int(micros) / 1_000_000
    moment = datetime.fromtimestamp(seconds, tz=timezone.utc)
    return moment.isoformat()
def epoch_float_to_iso(ts: float) -> str:
    """Convert seconds since the Unix epoch to a UTC ISO 8601 string."""
    moment = datetime.fromtimestamp(ts, tz=timezone.utc)
    return moment.isoformat()
def now_iso() -> str:
    """Return the current time as a timezone-aware UTC ISO 8601 string."""
    current = datetime.now(tz=timezone.utc)
    return current.isoformat()
def make_entry_id(source_id: str, sequence: int, text: str) -> str:
    """Return the SHA1 hex digest identifying one log entry.

    The digest covers the source id, the sequence number and only the first
    64 characters of ``text``, joined with colons.
    """
    head = text[:64]
    key = "{}:{}:{}".format(source_id, sequence, head)
    digest = hashlib.sha1(key.encode())
    return digest.hexdigest()
class SourceState:
    """Running state for one log stream.

    Remembers the previous entry's text and timestamp so that consecutive
    duplicates and backwards-running timestamps can be flagged, and counts
    the entries seen so far in ``sequence``.
    """

    def __init__(self) -> None:
        self._last_text: str = ""
        self._last_iso: str | None = None
        self._repeat: int = 0
        self.sequence: int = 0

    def observe(
        self, text: str, timestamp_iso: str | None
    ) -> tuple[int, bool]:
        """Record one entry and return ``(repeat_count, out_of_order)``.

        ``repeat_count`` is the length of the current run of identical
        consecutive texts (1 for a new text). ``out_of_order`` is True when
        both this entry and an earlier one carried a timestamp and this one
        sorts before the most recently remembered one (string comparison).
        """
        self.sequence += 1

        same_as_previous = text == self._last_text
        self._repeat = self._repeat + 1 if same_as_previous else 1
        self._last_text = text

        previous_iso = self._last_iso
        out_of_order = bool(timestamp_iso and previous_iso) and (
            timestamp_iso < previous_iso
        )
        # Entries without a timestamp leave the remembered timestamp untouched.
        if timestamp_iso:
            self._last_iso = timestamp_iso
        return self._repeat, out_of_order

85
app/ingest/caddy.py Normal file
View file

@ -0,0 +1,85 @@
"""Caddy structured JSON access log parser."""
from __future__ import annotations
import json
from typing import Iterator
from app.ingest.base import (
SourceState, apply_patterns, epoch_float_to_iso,
make_entry_id, now_iso,
)
from app.services.models import LogPattern, RetrievedEntry
# Canonical severity names for the lower-cased level strings handled
# explicitly; parse() upper-cases any level that is not listed here.
_LEVEL_MAP = {"debug": "DEBUG", "info": "INFO", "warn": "WARN", "error": "ERROR"}
def _summarise(entry: dict) -> str:
"""Build a human-readable text representation of a Caddy log entry."""
msg = entry.get("msg", entry.get("message", ""))
req = entry.get("request", {})
if req:
method = req.get("method", "")
host = req.get("host", "")
uri = req.get("uri", "")
status = entry.get("status", "")
duration = entry.get("duration", "")
err = entry.get("error", "")
parts = [msg, f"{method} {host}{uri}" if method else "", f"status={status}" if status else ""]
if duration:
parts.append(f"duration={duration:.3f}s")
if err:
parts.append(f"error={err}")
return " ".join(p for p in parts if p)
# Non-access log entries (TLS, config, etc.)
err = entry.get("error", "")
return f"{msg} {err}".strip() if err else msg
def parse(
    lines: Iterator[str],
    source_id: str,
    compiled_patterns: list[tuple[LogPattern, object]],
    ingest_time: str | None = None,
) -> Iterator[RetrievedEntry]:
    """Parse Caddy JSON log lines into RetrievedEntry records.

    Blank lines, lines that are not valid JSON, JSON values that are not
    objects, objects without a ``ts`` field, and entries whose summary text is
    empty are skipped.

    Args:
        lines: Raw log lines, one JSON document per line.
        source_id: Identifier stored on every yielded entry.
        compiled_patterns: (pattern, compiled regex) pairs used for tagging.
        ingest_time: Timestamp recorded on every entry; defaults to now (UTC).

    Yields:
        One RetrievedEntry per accepted line, numbered in acceptance order.
    """
    ingest_time = ingest_time or now_iso()
    state = SourceState()
    for raw_line in lines:
        raw_line = raw_line.strip()
        if not raw_line:
            continue
        try:
            entry = json.loads(raw_line)
        except json.JSONDecodeError:
            continue
        # Valid JSON that is not an object (null, a number, a list) is just
        # another malformed line; skip it instead of crashing on "in"/.get.
        if not isinstance(entry, dict) or "ts" not in entry:
            continue
        ts_raw = str(entry["ts"])
        try:
            ts_iso = epoch_float_to_iso(float(entry["ts"]))
        except (TypeError, ValueError, OverflowError, OSError):
            # Non-numeric or out-of-range timestamp: keep the entry and its raw
            # timestamp, but leave the parsed form unset.
            ts_iso = None
        level_raw = str(entry.get("level") or "info")
        severity = _LEVEL_MAP.get(level_raw.lower(), level_raw.upper())
        text = _summarise(entry)
        if not text:
            continue
        repeat, out_of_order = state.observe(text, ts_iso)
        matched = apply_patterns(text, compiled_patterns)
        yield RetrievedEntry(
            entry_id=make_entry_id(source_id, state.sequence, text),
            source_id=source_id,
            sequence=state.sequence,
            timestamp_raw=ts_raw,
            timestamp_iso=ts_iso,
            ingest_time=ingest_time,
            severity=severity,
            repeat_count=repeat,
            out_of_order=out_of_order,
            matched_patterns=matched,
            text=text,
        )

66
app/ingest/docker_log.py Normal file
View file

@ -0,0 +1,66 @@
"""Docker-wrapped log parser (Turnstone's custom corpus format)."""
from __future__ import annotations
import json
from typing import Iterator
from app.ingest.base import (
SourceState, apply_patterns, detect_severity,
make_entry_id, now_iso,
)
from app.services.models import LogPattern, RetrievedEntry
def parse(
    lines: Iterator[str],
    source_id: str,
    compiled_patterns: list[tuple[LogPattern, object]],
    ingest_time: str | None = None,
) -> Iterator[RetrievedEntry]:
    """Parse Docker-wrapped JSON log lines into RetrievedEntry records.

    Each line is a JSON object with an optional ``SOURCE`` (falls back to
    ``source_id``) and a ``MESSAGE``. When the message is itself a JSON object,
    its ``msg`` / ``message`` / ``MESSAGE`` string is used as the text.
    Sequence numbers and repeat counts are tracked separately per source.

    Blank lines, invalid JSON, non-object JSON values, and entries whose
    message is missing, empty, or not a string are skipped.

    Args:
        lines: Raw log lines, one JSON document per line.
        source_id: Source identifier used when a line has no ``SOURCE``.
        compiled_patterns: (pattern, compiled regex) pairs used for tagging.
        ingest_time: Timestamp recorded on every entry; defaults to now (UTC).

    Yields:
        One RetrievedEntry per accepted line; timestamps are always None
        because this format carries none.
    """
    ingest_time = ingest_time or now_iso()
    # One SourceState per container name
    states: dict[str, SourceState] = {}
    for raw_line in lines:
        raw_line = raw_line.strip()
        if not raw_line:
            continue
        try:
            entry = json.loads(raw_line)
        except json.JSONDecodeError:
            continue
        if not isinstance(entry, dict):
            continue
        src = entry.get("SOURCE", source_id)
        message = entry.get("MESSAGE", "")
        if not isinstance(message, str):
            continue
        text = message.strip()
        if not text:
            continue
        # Try to parse inner JSON (Docker often logs JSON itself)
        try:
            inner = json.loads(text)
        except json.JSONDecodeError:
            inner = None
        if isinstance(inner, dict):
            # Only a non-empty string may replace the text; the regex-based
            # helpers below cannot handle numbers or nested objects.
            for key in ("msg", "message", "MESSAGE"):
                candidate = inner.get(key)
                if isinstance(candidate, str) and candidate:
                    text = candidate
                    break
        state = states.setdefault(src, SourceState())
        severity = detect_severity(text)
        repeat, _ = state.observe(text, None)
        matched = apply_patterns(text, compiled_patterns)
        yield RetrievedEntry(
            entry_id=make_entry_id(src, state.sequence, text),
            source_id=src,
            sequence=state.sequence,
            timestamp_raw=None,
            timestamp_iso=None,
            ingest_time=ingest_time,
            severity=severity,
            repeat_count=repeat,
            out_of_order=False,  # no timestamps to compare
            matched_patterns=matched,
            text=text,
        )

75
app/ingest/journald.py Normal file
View file

@ -0,0 +1,75 @@
"""Journald JSON (-o json) log parser."""
from __future__ import annotations
import json
from typing import Iterator
from app.ingest.base import (
SourceState, apply_patterns, epoch_micros_to_iso,
make_entry_id, now_iso, SYSLOG_PRIORITY,
)
from app.services.models import LogPattern, RetrievedEntry
def _extract_message(raw: dict) -> str:
msg = raw.get("MESSAGE", "")
# journald encodes binary messages as arrays of ints
if isinstance(msg, list):
try:
return bytes(msg).decode("utf-8", errors="replace")
except Exception:
return repr(msg)
return str(msg)
def parse(
    lines: Iterator[str],
    source_id: str,
    compiled_patterns: list[tuple[LogPattern, object]],
    ingest_time: str | None = None,
) -> Iterator[RetrievedEntry]:
    """Parse journald JSON (``-o json``) lines into RetrievedEntry records.

    Blank lines, invalid JSON, non-object JSON values, objects without
    ``__REALTIME_TIMESTAMP`` and entries with an empty message are skipped.
    Severity comes from the numeric ``PRIORITY`` field. When ``_HOSTNAME`` is
    present the stored source id becomes ``<source_id>:<hostname>:<unit>``.

    Args:
        lines: Raw log lines, one JSON document per line.
        source_id: Base source identifier for this input.
        compiled_patterns: (pattern, compiled regex) pairs used for tagging.
        ingest_time: Timestamp recorded on every entry; defaults to now (UTC).

    Yields:
        One RetrievedEntry per accepted line, numbered in acceptance order.
    """
    ingest_time = ingest_time or now_iso()
    state = SourceState()
    for raw_line in lines:
        raw_line = raw_line.strip()
        if not raw_line:
            continue
        try:
            entry = json.loads(raw_line)
        except json.JSONDecodeError:
            continue
        # Valid JSON that is not an object is treated like any malformed line.
        if not isinstance(entry, dict) or "__REALTIME_TIMESTAMP" not in entry:
            continue
        text = _extract_message(entry)
        if not text:
            continue
        ts_raw = entry["__REALTIME_TIMESTAMP"]
        try:
            ts_iso = epoch_micros_to_iso(ts_raw)
        except (TypeError, ValueError, OverflowError, OSError):
            # Unparseable timestamp: keep the entry, leave the ISO form unset.
            ts_iso = None
        priority = entry.get("PRIORITY", "")
        severity = SYSLOG_PRIORITY.get(str(priority))
        hostname = entry.get("_HOSTNAME", "")
        unit = entry.get("_SYSTEMD_UNIT") or entry.get("SYSLOG_IDENTIFIER", "")
        src = f"{source_id}:{hostname}:{unit}" if hostname else source_id
        repeat, out_of_order = state.observe(text, ts_iso)
        matched = apply_patterns(text, compiled_patterns)
        yield RetrievedEntry(
            entry_id=make_entry_id(src, state.sequence, text),
            source_id=src,
            sequence=state.sequence,
            timestamp_raw=ts_raw,
            timestamp_iso=ts_iso,
            ingest_time=ingest_time,
            severity=severity,
            repeat_count=repeat,
            out_of_order=out_of_order,
            matched_patterns=matched,
            text=text,
        )

146
app/ingest/pipeline.py Normal file
View file

@ -0,0 +1,146 @@
"""Ingest pipeline: auto-detect format, parse, write to SQLite."""
from __future__ import annotations
import json
import logging
import re
import sqlite3
from pathlib import Path
from typing import Iterator
from app.ingest import caddy, docker_log, journald
from app.ingest.base import _compile, load_patterns, now_iso
from app.services.models import LogPattern, RetrievedEntry
from app.services.search import build_fts_index
logger = logging.getLogger(__name__)
# DDL for the log_entries table and its lookup indexes. Every statement uses
# IF NOT EXISTS, so the script can be re-run against an existing database.
# matched_patterns holds a JSON array serialised as text (default '[]').
_SCHEMA = """
CREATE TABLE IF NOT EXISTS log_entries (
id TEXT PRIMARY KEY,
source_id TEXT NOT NULL,
sequence INTEGER NOT NULL,
timestamp_raw TEXT,
timestamp_iso TEXT,
ingest_time TEXT NOT NULL,
severity TEXT,
repeat_count INTEGER DEFAULT 1,
out_of_order INTEGER DEFAULT 0,
matched_patterns TEXT DEFAULT '[]',
text TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_source ON log_entries(source_id);
CREATE INDEX IF NOT EXISTS idx_timestamp ON log_entries(timestamp_iso);
CREATE INDEX IF NOT EXISTS idx_severity ON log_entries(severity);
CREATE INDEX IF NOT EXISTS idx_patterns ON log_entries(matched_patterns);
"""
def _detect_format(first_line: str) -> str:
try:
obj = json.loads(first_line)
if "__REALTIME_TIMESTAMP" in obj:
return "journald"
if "SOURCE" in obj and str(obj.get("SOURCE", "")).startswith("docker:"):
return "docker"
if "ts" in obj and ("msg" in obj or "message" in obj or "request" in obj):
return "caddy"
except (json.JSONDecodeError, AttributeError):
pass
return "unknown"
def _parse_file(
    path: Path,
    compiled: list[tuple[LogPattern, object]],
    ingest_time: str,
) -> Iterator[RetrievedEntry]:
    """Detect the format of one log file and yield its parsed entries.

    The format is decided from the first non-blank line; the file stem is used
    as the source id handed to the format-specific parser. Files that are
    empty, contain only blank lines, or have an unrecognised format yield
    nothing (the latter with a warning).

    Args:
        path: Log file to read.
        compiled: (pattern, compiled regex) pairs passed on to the parser.
        ingest_time: Timestamp recorded on every entry.

    Yields:
        Entries produced by the journald, docker or caddy parser.
    """
    source_id = path.stem
    # Read as UTF-8 explicitly rather than with the platform default encoding;
    # undecodable bytes are replaced instead of aborting the file.
    with path.open("r", encoding="utf-8", errors="replace") as f:
        lines = iter(f)
        # Skip leading blank lines so they do not force an "unknown" verdict.
        first = next((ln for ln in lines if ln.strip()), None)
        if first is None:
            return
        fmt = _detect_format(first.strip())
        logger.info("Detected format %r for %s", fmt, path.name)
        parsers = {
            "journald": journald.parse,
            "docker": docker_log.parse,
            "caddy": caddy.parse,
        }
        parser = parsers.get(fmt)
        if parser is None:
            logger.warning("Unknown format in %s — skipping", path.name)
            return

        def all_lines():
            # Re-attach the line consumed for detection to the rest of the file.
            yield first
            yield from lines

        yield from parser(all_lines(), source_id, compiled, ingest_time)
def _write_batch(conn: sqlite3.Connection, batch: list[RetrievedEntry]) -> None:
conn.executemany(
"""
INSERT OR IGNORE INTO log_entries
(id, source_id, sequence, timestamp_raw, timestamp_iso,
ingest_time, severity, repeat_count, out_of_order,
matched_patterns, text)
VALUES (?,?,?,?,?,?,?,?,?,?,?)
""",
[
(
e.entry_id, e.source_id, e.sequence,
e.timestamp_raw, e.timestamp_iso, e.ingest_time,
e.severity, e.repeat_count, int(e.out_of_order),
json.dumps(list(e.matched_patterns)), e.text,
)
for e in batch
],
)
def ingest(
    corpus_dir: Path,
    db_path: Path,
    pattern_file: Path | None = None,
    batch_size: int = 1000,
) -> dict[str, int]:
    """Ingest every ``*.jsonl`` file in a directory into the SQLite database.

    Creates the schema if needed, parses the files in sorted name order,
    writes entries in batches (committing after each batch), and finally
    builds the FTS index.

    Args:
        corpus_dir: Directory scanned (non-recursively) for ``*.jsonl`` files.
        db_path: SQLite database file to write to.
        pattern_file: YAML pattern file; defaults to ``patterns/default.yaml``
            relative to the current working directory.
        batch_size: Number of entries written per batch.

    Returns:
        Mapping of file name to the number of entries parsed from it. Entries
        ignored by the database as duplicates are still counted.
    """
    pattern_file = pattern_file or Path("patterns/default.yaml")
    patterns = load_patterns(pattern_file)
    compiled = _compile(patterns)
    ingest_time = now_iso()
    stats: dict[str, int] = {}
    conn = sqlite3.connect(str(db_path))
    # Close the connection even when parsing or writing fails part-way through.
    try:
        conn.execute("PRAGMA journal_mode=WAL")
        conn.executescript(_SCHEMA)
        conn.commit()
        for jsonl_file in sorted(corpus_dir.glob("*.jsonl")):
            count = 0
            batch: list[RetrievedEntry] = []
            for entry in _parse_file(jsonl_file, compiled, ingest_time):
                batch.append(entry)
                if len(batch) >= batch_size:
                    _write_batch(conn, batch)
                    conn.commit()
                    count += len(batch)
                    batch.clear()
            # Flush the final, partially filled batch.
            if batch:
                _write_batch(conn, batch)
                conn.commit()
                count += len(batch)
            stats[jsonl_file.name] = count
            logger.info("Ingested %d entries from %s", count, jsonl_file.name)
    finally:
        conn.close()
    logger.info("Building FTS index...")
    build_fts_index(db_path)
    logger.info("FTS index ready")
    return stats

198
app/mcp_server.py Normal file
View file

@ -0,0 +1,198 @@
"""Turnstone MCP server — log search and diagnostics via stdio transport.
Works with both Claude Code and Copilot CLI (both use MCP stdio).
Run directly: python -m app.mcp_server
Or via: python app/mcp_server.py
Configure TURNSTONE_DB env var to override the default DB path.
"""
from __future__ import annotations
import logging
import os
import sqlite3
import sys
from pathlib import Path
from mcp.server.fastmcp import FastMCP
# All logging must go to stderr — stdout is reserved for MCP JSON-RPC
logging.basicConfig(
    stream=sys.stderr,
    level=logging.INFO,
    format="%(levelname)s %(message)s",
)
logger = logging.getLogger(__name__)
# Allow running as a script from the project root.
# This must stay ahead of the app.* import below, which relies on it.
sys.path.insert(0, str(Path(__file__).parent.parent))
from app.services.search import (
    build_fts_index,
    format_results,
    list_sources,
    search,
)
# Default database location: <parent of this package>/data/turnstone.db.
_DEFAULT_DB = Path(__file__).parent.parent / "data" / "turnstone.db"
# The TURNSTONE_DB environment variable, when set, overrides the default path.
DB_PATH = Path(os.environ.get("TURNSTONE_DB", _DEFAULT_DB))
# Server instance the @mcp.tool() functions below register themselves on.
mcp = FastMCP(
    "turnstone",
    instructions=(
        "Turnstone is a diagnostic intelligence layer for server, service, and device logs. "
        "Call list_log_sources first to understand what data is available and identify "
        "source_id values for filtering. Use search_logs for targeted FTS queries. "
        "Use diagnose for symptom-driven investigation — it runs layered searches and "
        "returns evidence ranked by severity and relevance."
    ),
)
# Set once the FTS index has been found or built, so later tool calls skip the check.
_index_ready = False


def _ensure_index() -> None:
    """Build the FTS index on first use; skip if it is already populated.

    The index counts as present when the ``log_fts`` table exists and holds at
    least one row. Otherwise (missing or empty table) it is built from
    ``log_entries``.
    """
    global _index_ready
    if _index_ready:
        return
    conn = sqlite3.connect(str(DB_PATH))
    try:
        count = conn.execute("SELECT COUNT(*) FROM log_fts").fetchone()[0]
    except sqlite3.OperationalError:
        # The query failed (e.g. log_fts does not exist yet); fall through and build.
        count = 0
    finally:
        # Close on both paths; the probe connection must not leak when the
        # query raises.
        conn.close()
    if count > 0:
        _index_ready = True
        logger.info("FTS index present (%d entries)", count)
        return
    logger.info("Building FTS index for %s — this may take a minute...", DB_PATH)
    build_fts_index(DB_PATH)
    _index_ready = True
    logger.info("FTS index ready")
@mcp.tool()
def search_logs(
    query: str,
    severity: str | None = None,
    source: str | None = None,
    pattern: str | None = None,
    since: str | None = None,
    until: str | None = None,
    limit: int = 20,
    include_repeats: bool = False,
) -> str:
    """Search log entries using full-text search with optional filters.

    Args:
        query: FTS5 search expression. Supports AND, OR, NOT, phrase quotes, prefix*.
            Example: '"connection refused" OR "connection lost"'
        severity: Filter by level: EMERGENCY, ALERT, CRITICAL, ERROR, WARN, NOTICE, INFO, DEBUG.
        source: Partial match on source_id. Format is 'corpus:host:service'.
            Example: 'example-node:caddy' matches all Caddy entries from example-node.
        pattern: Filter by named pattern tag applied at ingest time.
            Known tags: auth_failure, connection_lost, oom, segfault, disk_full,
            timeout, caddy_tls_error, caddy_config_error, caddy_auth_error,
            caddy_upstream_error, service_restart, service_update,
            power_failure, network_interface, ip_change.
        since: Earliest timestamp, ISO 8601 (e.g. 2026-05-06T18:00:00).
        until: Latest timestamp, ISO 8601.
        limit: Result count cap (default 20, min 1, max 100).
        include_repeats: Include deduplicated repeat log lines (default False).

    Returns:
        Formatted log entries ranked by FTS relevance, or a no-results message.
    """
    _ensure_index()
    # Clamp to 1..100. The lower bound matters: SQLite treats a negative LIMIT
    # as "no limit", which would bypass the documented cap of 100.
    capped_limit = max(1, min(limit, 100))
    results = search(
        DB_PATH,
        query=query,
        severity=severity,
        source_filter=source,
        pattern_filter=pattern,
        since=since,
        until=until,
        limit=capped_limit,
        include_repeats=include_repeats,
    )
    return format_results(results)
def _symptom_to_fts_query(symptom: str) -> str:
    """Turn free text into a safe FTS5 expression: quoted terms joined by OR."""
    # Anything but letters, digits and underscores becomes a separator, so
    # punctuation such as "?" or "'" can never reach the FTS5 query parser.
    cleaned = "".join(ch if ch.isalnum() or ch == "_" else " " for ch in symptom)
    # De-duplicate while keeping the original word order.
    terms = list(dict.fromkeys(cleaned.split()))
    # Quoting makes words like AND/OR/NOT plain terms rather than operators.
    return " OR ".join(f'"{term}"' for term in terms)


@mcp.tool()
def diagnose(
    symptom: str,
    source: str | None = None,
    since: str | None = None,
    until: str | None = None,
) -> str:
    """Investigate a symptom or problem description across the log corpus.

    The symptom is split into words and searched as "any of these words".
    Runs layered searches — broad relevance first, then CRITICAL and ERROR
    filtered — and merges results deduplicated by entry ID, sorted
    chronologically. Use this for open-ended questions like
    "why did auth break?" or "what happened during the network outage?".

    Args:
        symptom: Plain-language description of the problem or symptom.
        source: Limit to a host or service (partial match on source_id).
        since: Earliest timestamp, ISO 8601.
        until: Latest timestamp, ISO 8601.

    Returns:
        Consolidated log evidence sorted by time, formatted for analysis
        (at most 20 entries), or a no-results message.
    """
    _ensure_index()
    fts_query = _symptom_to_fts_query(symptom)
    if not fts_query:
        # Nothing searchable in the symptom (empty or punctuation only).
        return format_results([])
    common = dict(source_filter=source, since=since, until=until, include_repeats=False)
    broad = search(DB_PATH, query=fts_query, limit=15, **common)
    critical = search(DB_PATH, query=fts_query, severity="CRITICAL", limit=5, **common)
    errors = search(DB_PATH, query=fts_query, severity="ERROR", limit=10, **common)
    seen: set[str] = set()
    combined = []
    for r in broad + critical + errors:
        if r.entry_id not in seen:
            seen.add(r.entry_id)
            combined.append(r)
    # Entries without a timestamp sort after all timestamped ones.
    combined.sort(key=lambda r: (r.timestamp_iso or "\xff", r.sequence))
    return format_results(combined[:20])
@mcp.tool()
def list_log_sources() -> str:
    """List all log sources in the corpus with entry counts and time ranges.

    Call this before searching to understand what data is available and to
    identify source_id values for use in the source= filter parameter.

    Returns:
        Each source on its own block with entry count, error count, and time range.
    """
    sources = list_sources(DB_PATH)
    if not sources:
        return "No log sources found. Has the corpus been ingested? Run: python scripts/ingest_corpus.py"
    lines = [f"Corpus: {DB_PATH}", f"Sources ({len(sources)} total):\n"]
    for s in sources:
        # The leading separator keeps the error count from running into the
        # entry count (e.g. "1,234" + "5 errors" must not read "1,2345 errors").
        err_note = f" — {s['error_count']:,} errors" if s["error_count"] else ""
        lines.append(
            f" {s['source_id']}\n"
            f" entries: {s['entry_count']:,}{err_note}\n"
            f" range: {s['earliest'] or 'unknown'} → {s['latest'] or 'unknown'}"
        )
    return "\n".join(lines)
if __name__ == "__main__":
    # Serve only when the database file is present; otherwise explain how to
    # create it and exit with status 1.
    if DB_PATH.exists():
        logger.info("Starting Turnstone MCP server (DB: %s)", DB_PATH)
        mcp.run()
    else:
        logger.error("Database not found: %s", DB_PATH)
        logger.error("Run: python scripts/ingest_corpus.py <corpus_dir> <db_path>")
        sys.exit(1)

0
app/services/__init__.py Normal file
View file

33
app/services/models.py Normal file
View file

@ -0,0 +1,33 @@
"""Core data models for Turnstone log retrieval."""
from __future__ import annotations
from dataclasses import dataclass, field
@dataclass(frozen=True)
class RetrievedEntry:
    """Immutable record describing one log entry and its ingest metadata."""

    # --- identity and ordering ---
    entry_id: str
    source_id: str  # log file path or service name
    sequence: int  # position in ingest order, not wall-clock order

    # --- timestamps ---
    timestamp_raw: str | None  # timestamp as it appeared in the log
    timestamp_iso: str | None  # ISO 8601 form used for sorting; None if unparseable
    ingest_time: str  # when Turnstone indexed this entry (wall clock)

    # --- classification ---
    severity: str | None  # e.g. ERROR / WARN / INFO / DEBUG; None if not detected
    repeat_count: int  # collapsed duplicate count (1 = unique)
    out_of_order: bool  # True when the timestamp precedes its predecessor's

    # --- content and scores (all optional) ---
    matched_patterns: tuple[str, ...] = field(default_factory=tuple)  # named pattern hits
    text: str = ""
    bm25_score: float = 0.0
    vector_score: float | None = None
@dataclass(frozen=True)
class LogPattern:
    """Immutable definition of one named regex used to tag entries at ingest time."""

    name: str  # tag stored on matching entries, e.g. "auth_failure"
    pattern: str  # regular-expression source text
    severity: str  # suggested severity if not present in the log line
    description: str  # human-readable explanation for the UI

200
app/services/search.py Normal file
View file

@ -0,0 +1,200 @@
"""FTS5-based log search with severity, source, and pattern filters."""
from __future__ import annotations
import json
import logging
import sqlite3
from dataclasses import dataclass
from pathlib import Path
logger = logging.getLogger(__name__)
@dataclass(frozen=True)
class SearchResult:
    """One row returned by a full-text search, in immutable form."""

    entry_id: str
    source_id: str
    sequence: int
    timestamp_iso: str | None  # None when the entry has no parsed timestamp
    severity: str | None
    repeat_count: int
    out_of_order: bool
    matched_patterns: list[str]  # pattern tags decoded from the stored JSON array
    text: str
    rank: float  # FTS5 rank value the results are ordered by
def build_fts_index(db_path: Path) -> None:
    """Build (or update) the FTS5 index from log_entries. Safe to re-run.

    Drops and recreates the ``log_fts`` table if it is missing or stale (no
    ``sequence`` column), then copies over every ``log_entries`` row whose id
    is not indexed yet.

    Args:
        db_path: SQLite database file containing the ``log_entries`` table.

    Raises:
        sqlite3.OperationalError: If ``log_entries`` does not exist (or another
            statement fails); the connection is still closed.
    """
    conn = sqlite3.connect(str(db_path))
    # Close the connection even when a statement fails.
    try:
        conn.execute("PRAGMA journal_mode=WAL")
        # Check whether existing table has the sequence column; rebuild if not.
        try:
            conn.execute("SELECT sequence FROM log_fts LIMIT 0")
        except sqlite3.OperationalError:
            conn.execute("DROP TABLE IF EXISTS log_fts")
        conn.executescript("""
            CREATE VIRTUAL TABLE IF NOT EXISTS log_fts USING fts5(
                text,
                entry_id UNINDEXED,
                source_id UNINDEXED,
                sequence UNINDEXED,
                severity UNINDEXED,
                timestamp_iso UNINDEXED,
                matched_patterns UNINDEXED,
                repeat_count UNINDEXED,
                out_of_order UNINDEXED,
                tokenize = 'porter ascii'
            );
        """)
        # Only insert rows not already indexed
        conn.execute("""
            INSERT INTO log_fts(text, entry_id, source_id, sequence, severity,
                                timestamp_iso, matched_patterns,
                                repeat_count, out_of_order)
            SELECT e.text, e.id, e.source_id, e.sequence, e.severity,
                   e.timestamp_iso, e.matched_patterns,
                   e.repeat_count, e.out_of_order
            FROM log_entries e
            WHERE e.id NOT IN (SELECT entry_id FROM log_fts WHERE entry_id IS NOT NULL)
        """)
        conn.commit()
    finally:
        conn.close()
def search(
    db_path: Path,
    query: str,
    severity: str | None = None,
    source_filter: str | None = None,
    pattern_filter: str | None = None,
    since: str | None = None,
    until: str | None = None,
    limit: int = 20,
    include_repeats: bool = False,
) -> list[SearchResult]:
    """Full-text search with optional filters. Returns results ranked by relevance.

    Args:
        db_path: SQLite database file holding the ``log_fts`` index.
        query: FTS5 MATCH expression.
        severity: Exact severity to keep (compared upper-cased).
        source_filter: Substring that ``source_id`` must contain.
        pattern_filter: Pattern tag that must appear in ``matched_patterns``.
        since: Inclusive lower bound on ``timestamp_iso`` (string comparison).
        until: Inclusive upper bound on ``timestamp_iso`` (string comparison).
        limit: Maximum number of rows returned.
        include_repeats: When False, only rows with ``repeat_count = 1`` are kept.

    Returns:
        Matching rows ordered by FTS rank. An empty list when the query fails
        with sqlite3.OperationalError (a warning is logged).
    """
    conditions = ["log_fts MATCH ?"]
    params: list = [query]
    if severity:
        conditions.append("severity = ?")
        params.append(severity.upper())
    if source_filter:
        conditions.append("source_id LIKE ?")
        params.append(f"%{source_filter}%")
    if pattern_filter:
        # Tags are stored as a JSON array, so match the quoted tag name.
        conditions.append("matched_patterns LIKE ?")
        params.append(f'%"{pattern_filter}"%')
    if since:
        conditions.append("timestamp_iso >= ?")
        params.append(since)
    if until:
        conditions.append("timestamp_iso <= ?")
        params.append(until)
    if not include_repeats:
        conditions.append("repeat_count = 1")
    # Only fixed condition fragments are interpolated; all values are bound.
    where = " AND ".join(conditions)
    params.append(limit)

    conn = sqlite3.connect(str(db_path))
    # Close the connection on every path, including unexpected errors while
    # querying or decoding rows.
    try:
        conn.execute("PRAGMA journal_mode=WAL")
        conn.row_factory = sqlite3.Row
        try:
            rows = conn.execute(
                f"""
                SELECT entry_id, source_id, sequence, timestamp_iso, severity,
                       repeat_count, out_of_order, matched_patterns, text, rank
                FROM log_fts
                WHERE {where}
                ORDER BY rank
                LIMIT ?
                """,
                params,
            ).fetchall()
        except sqlite3.OperationalError as e:
            logger.warning("FTS query failed (%s) — index may not be built yet", e)
            return []
        return [
            SearchResult(
                entry_id=r["entry_id"],
                source_id=r["source_id"],
                sequence=r["sequence"],
                timestamp_iso=r["timestamp_iso"],
                severity=r["severity"],
                repeat_count=r["repeat_count"],
                out_of_order=bool(r["out_of_order"]),
                matched_patterns=json.loads(r["matched_patterns"] or "[]"),
                text=r["text"],
                rank=r["rank"],
            )
            for r in rows
        ]
    finally:
        conn.close()
def list_sources(db_path: Path) -> list[dict]:
    """Return distinct sources with entry counts and time ranges.

    Args:
        db_path: SQLite database file holding the ``log_entries`` table.

    Returns:
        One dict per source_id with keys ``source_id``, ``entry_count``,
        ``earliest``, ``latest`` and ``error_count`` (entries whose severity is
        ERROR, CRITICAL, EMERGENCY or ALERT), ordered by entry count
        descending. An empty list when the query fails with
        sqlite3.OperationalError, e.g. because nothing has been ingested yet.
    """
    conn = sqlite3.connect(str(db_path))
    try:
        conn.execute("PRAGMA journal_mode=WAL")
        rows = conn.execute("""
            SELECT
                source_id,
                COUNT(*) as entry_count,
                MIN(timestamp_iso) as earliest,
                MAX(timestamp_iso) as latest,
                SUM(CASE WHEN severity IN ('ERROR','CRITICAL','EMERGENCY','ALERT') THEN 1 ELSE 0 END) as error_count
            FROM log_entries
            GROUP BY source_id
            ORDER BY entry_count DESC
        """).fetchall()
    except sqlite3.OperationalError as e:
        # Mirror search(): a database without the table reports "no data"
        # rather than raising out of the caller.
        logging.getLogger(__name__).warning("Listing sources failed (%s)", e)
        return []
    finally:
        conn.close()
    keys = ("source_id", "entry_count", "earliest", "latest", "error_count")
    return [dict(zip(keys, r)) for r in rows]
def format_results(results: list[SearchResult], max_text: int = 300) -> str:
    """Format search results as readable text for LLM context.

    Each result becomes a header line
    ``[timestamp | source | severity flags]`` followed by the entry text.
    Flags list the repeat count (when above 1), an out-of-order marker and the
    matched pattern tags.

    Args:
        results: Results to render, in display order.
        max_text: Maximum number of text characters shown per entry; longer
            texts are cut and marked with a trailing "…".

    Returns:
        The blocks joined by blank lines, or a fixed no-results message.
    """
    if not results:
        return "No matching log entries found."
    blocks = []
    for r in results:
        ts = r.timestamp_iso or "no-timestamp"
        sev = r.severity or "?"
        src = r.source_id
        flags = []
        if r.repeat_count > 1:
            flags.append(f"repeat×{r.repeat_count}")
        if r.out_of_order:
            flags.append("out-of-order")
        if r.matched_patterns:
            flags.append(f"[{', '.join(r.matched_patterns)}]")
        flag_str = f" {' '.join(flags)}" if flags else ""
        # Mark truncated text so the reader knows the entry continues.
        text = r.text[:max_text] + ("…" if len(r.text) > max_text else "")
        blocks.append(f"[{ts} | {src} | {sev}{flag_str}]\n{text}")
    return "\n\n".join(blocks)

88
patterns/default.yaml Normal file
View file

@ -0,0 +1,88 @@
# Turnstone pattern library — named regex patterns for log tagging at ingest time.
# Each matched pattern name is stored on RetrievedEntry.matched_patterns and
# used to boost retrieval relevance for diagnostic queries.
#
# Add domain-specific patterns here. Patterns are applied in order; multiple
# can match a single entry.
patterns:
  - name: service_restart
    pattern: "(restarting|restart requested|service.*start)"
    severity: WARN
    description: Service restart detected
  - name: connection_lost
    pattern: "(connection (lost|dropped|refused|timed? out)|disconnect(ed)?)"
    severity: ERROR
    description: Network or device connection failure
  - name: auth_failure
    pattern: "(auth(entication)? (failed?|error|denied)|permission denied|unauthorized)"
    severity: ERROR
    description: Authentication or authorization failure
  # Patterns are searched case-insensitively and unanchored, so short tokens
  # are fenced with letter lookarounds: a bare OOM would also hit "room"/"zoom".
  - name: oom
    pattern: "(out of memory|(?<![A-Za-z])OOM(?![A-Za-z])|killed process|cannot allocate)"
    severity: CRITICAL
    description: Out-of-memory condition
  - name: segfault
    pattern: "(segmentation fault|segfault|SIGSEGV|core dump)"
    severity: CRITICAL
    description: Process crash or memory corruption
  - name: disk_full
    pattern: "(no space left|disk full|filesystem.*full|ENOSPC)"
    severity: ERROR
    description: Storage capacity exhausted
  - name: timeout
    pattern: "(timed? out|deadline exceeded|operation timed?)"
    severity: WARN
    description: Operation timeout
  - name: caddy_tls_error
    pattern: "(acme|certificate|tls).*(error|fail|invalid|expired|renew)"
    severity: ERROR
    description: Caddy TLS or certificate error
  - name: caddy_config_error
    pattern: "(config|caddyfile|directive).*(error|invalid|unknown|unrecognized)"
    severity: ERROR
    description: Caddy configuration error
  - name: caddy_auth_error
    pattern: "(forward_auth|basicauth|basic_auth).*(error|fail|denied|invalid|unreachable)"
    severity: ERROR
    description: Caddy authentication middleware failure
  - name: caddy_upstream_error
    pattern: "(upstream|backend|reverse.proxy).*(error|fail|unreachable|refused|timeout)"
    severity: ERROR
    description: Caddy upstream/backend failure
  # "apt" is fenced so it does not match inside words such as "adapter".
  - name: service_update
    pattern: "(upgraded?|updated?|installing|dpkg|(?<![A-Za-z])apt(?![A-Za-z])|package).*(caddy|nginx|apache|proxy)"
    severity: INFO
    description: Web server package update detected
  # "ups" is fenced so it does not match inside "upstream", "groups" or "backups".
  - name: power_failure
    pattern: "(power (fail|loss|outage|cut)|(?<![A-Za-z])ups(?![A-Za-z])|battery|shutdown.*power|lost power)"
    severity: CRITICAL
    description: Power failure or UPS event
  - name: network_interface
    pattern: "(eth[0-9]|ens[0-9]|enp[0-9]|wlan[0-9]).*(down|up|carrier|link)"
    severity: WARN
    description: Network interface state change
  - name: ip_change
    pattern: "(new ip|ip.*(changed|assigned|address)|dhcp.*(ack|offer|bound|renew))"
    severity: INFO
    description: IP address change or DHCP event
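# Add device/service-specific patterns below this line.
# Use single quotes when a pattern contains backslashes: YAML double-quoted
# strings reject unknown escapes such as \d.
# - name: avcx_device_error
#   pattern: 'ERR-\d{4}'
#   severity: ERROR
#   description: AVCX device error code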

View file

@ -0,0 +1,21 @@
"""CLI: build (or update) the FTS5 full-text search index after ingest."""
from __future__ import annotations
import sys
from pathlib import Path
# Make the parent of this script's directory importable so "app" resolves.
sys.path.insert(0, str(Path(__file__).parent.parent))
from app.services.search import build_fts_index


def _main() -> None:
    """Build the FTS index for the database named by argv[1] (or the default)."""
    args = sys.argv[1:]
    db_path = Path(args[0]) if args else Path("data/turnstone.db")
    if db_path.exists():
        print(f"Building FTS index for {db_path} ...")
        build_fts_index(db_path)
        print("Done.")
        return
    # Without a database there is nothing to index; report on stderr and fail.
    print(f"ERROR: database not found: {db_path}", file=sys.stderr)
    print("Run ingest first: python scripts/ingest_corpus.py", file=sys.stderr)
    sys.exit(1)


if __name__ == "__main__":
    _main()

28
scripts/ingest_corpus.py Normal file
View file

@ -0,0 +1,28 @@
"""CLI: ingest a corpus directory into the Turnstone SQLite database."""
from __future__ import annotations
import logging
import sys
from pathlib import Path
logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s")
# Allow running from repo root
sys.path.insert(0, str(Path(__file__).parent.parent))
from app.ingest.pipeline import ingest

if __name__ == "__main__":
    # Positional arguments: [corpus_dir] [db_path]. Both defaults, and the
    # pattern file, are relative to the current working directory.
    corpus_dir = Path(sys.argv[1]) if len(sys.argv) > 1 else Path("corpus/raw")
    db_path = Path(sys.argv[2]) if len(sys.argv) > 2 else Path("data/turnstone.db")
    pattern_file = Path("patterns/default.yaml")
    # Create the database's parent directory if it does not exist yet.
    db_path.parent.mkdir(parents=True, exist_ok=True)
    # Separate source and destination; without the arrow the two paths ran
    # together into one unreadable string.
    print(f"Ingesting {corpus_dir} → {db_path}")
    stats = ingest(corpus_dir, db_path, pattern_file)
    total = sum(stats.values())
    for fname, count in sorted(stats.items()):
        print(f" {fname}: {count:,}")
    print(f" TOTAL: {total:,} entries")

0
tests/__init__.py Normal file
View file