commit bbe4b1e36069d6799420ec0792bc9c272a6484c8 Author: pyr0ball Date: Fri May 8 12:12:34 2026 -0700 feat: initial Turnstone POC — ingest, FTS search, MCP server Ingest pipeline (journald / Caddy / Docker-wrapped formats) with per-source state tracking (repeat dedup, out-of-order detection), named pattern tagging at ingest time, and idempotent SHA1-keyed writes. FTS5 search layer with porter stemmer, severity/source/pattern/time filters, and BM25 ranking. MCP server (FastMCP stdio) with three tools: search_logs, diagnose, list_log_sources — compatible with both Claude Code and Copilot CLI. WAL mode enabled on all connections. FTS index auto-built after ingest. MCP configs included for Claude Code (.mcp.json) and Copilot CLI (.github/copilot/mcp.json). diff --git a/.github/copilot/mcp.json b/.github/copilot/mcp.json new file mode 100644 index 0000000..e6737de --- /dev/null +++ b/.github/copilot/mcp.json @@ -0,0 +1,15 @@ +{ + "servers": { + "turnstone": { + "type": "stdio", + "command": "conda", + "args": [ + "run", "--no-capture-output", "-n", "cf", + "python", "-m", "app.mcp_server" + ], + "env": { + "TURNSTONE_DB": "/Library/Development/CircuitForge/turnstone/data/turnstone.db" + } + } + } +} diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..fbee7af --- /dev/null +++ b/.gitignore @@ -0,0 +1,10 @@ +data/ +corpus/raw/ +__pycache__/ +*.pyc +*.pyo +*.pyd +.env +*.db +*.db-wal +*.db-shm diff --git a/.mcp.json b/.mcp.json new file mode 100644 index 0000000..83f5eed --- /dev/null +++ b/.mcp.json @@ -0,0 +1,15 @@ +{ + "mcpServers": { + "turnstone": { + "command": "conda", + "args": [ + "run", "--no-capture-output", "-n", "cf", + "python", "-m", "app.mcp_server" + ], + "cwd": "/Library/Development/CircuitForge/turnstone", + "env": { + "TURNSTONE_DB": "/Library/Development/CircuitForge/turnstone/data/turnstone.db" + } + } + } +} diff --git a/app/__init__.py b/app/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/api/__init__.py b/app/api/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/ingest/__init__.py b/app/ingest/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/ingest/base.py b/app/ingest/base.py new file mode 100644 index 0000000..9a9bcdc --- /dev/null +++ b/app/ingest/base.py @@ -0,0 +1,102 @@ +"""Shared utilities for all Turnstone log ingestors.""" +from __future__ import annotations + +import hashlib +import re +from datetime import datetime, timezone +from pathlib import Path +from typing import Iterator + +import yaml + +from app.services.models import LogPattern, RetrievedEntry + +SYSLOG_PRIORITY: dict[str, str] = { + "0": "EMERGENCY", "1": "ALERT", "2": "CRITICAL", + "3": "ERROR", "4": "WARN", "5": "NOTICE", + "6": "INFO", "7": "DEBUG", +} + +_SEVERITY_RE = re.compile( + r"\b(EMERGENCY|ALERT|CRITICAL|ERROR|WARN(?:ING)?|NOTICE|INFO|DEBUG)\b", + re.IGNORECASE, +) + + +def load_patterns(path: Path) -> list[LogPattern]: + if not path.exists(): + return [] + raw = yaml.safe_load(path.read_text()) + return [ + LogPattern( + name=p["name"], + pattern=p["pattern"], + severity=p["severity"], + description=p["description"], + ) + for p in raw.get("patterns", []) + ] + + +def _compile(patterns: list[LogPattern]) -> list[tuple[LogPattern, re.Pattern]]: + return [(p, re.compile(p.pattern, re.IGNORECASE)) for p in patterns] + + +def apply_patterns( + text: str, + compiled: list[tuple[LogPattern, re.Pattern]], +) -> tuple[str, ...]: + return tuple(p.name for p, rx in compiled if rx.search(text)) + + +def detect_severity(text: str) -> str | None: + m = _SEVERITY_RE.search(text) + return m.group(0).upper() if m else None + + +def epoch_micros_to_iso(micros: str | int) -> str: + ts = int(micros) / 1_000_000 + return datetime.fromtimestamp(ts, tz=timezone.utc).isoformat() + + +def epoch_float_to_iso(ts: float) -> str: + return datetime.fromtimestamp(ts, tz=timezone.utc).isoformat() + + +def now_iso() -> str: + return datetime.now(tz=timezone.utc).isoformat() + + +def make_entry_id(source_id: str, sequence: int, text: str) -> str: + key = f"{source_id}:{sequence}:{text[:64]}" + return hashlib.sha1(key.encode()).hexdigest() + + +class SourceState: + """Tracks per-source state for repeat and out-of-order detection.""" + + def __init__(self) -> None: + self._last_text: str = "" + self._last_iso: str | None = None + self._repeat: int = 0 + self.sequence: int = 0 + + def observe( + self, text: str, timestamp_iso: str | None + ) -> tuple[int, bool]: + """Return (repeat_count, out_of_order) for this entry.""" + self.sequence += 1 + + if text == self._last_text: + self._repeat += 1 + else: + self._repeat = 1 + self._last_text = text + + out_of_order = False + if timestamp_iso and self._last_iso: + out_of_order = timestamp_iso < self._last_iso + if timestamp_iso: + self._last_iso = timestamp_iso + + return self._repeat, out_of_order diff --git a/app/ingest/caddy.py b/app/ingest/caddy.py new file mode 100644 index 0000000..0cf2319 --- /dev/null +++ b/app/ingest/caddy.py @@ -0,0 +1,85 @@ +"""Caddy structured JSON access log parser.""" +from __future__ import annotations + +import json +from typing import Iterator + +from app.ingest.base import ( + SourceState, apply_patterns, epoch_float_to_iso, + make_entry_id, now_iso, +) +from app.services.models import LogPattern, RetrievedEntry + +_LEVEL_MAP = {"debug": "DEBUG", "info": "INFO", "warn": "WARN", "error": "ERROR"} + + +def _summarise(entry: dict) -> str: + """Build a human-readable text representation of a Caddy log entry.""" + msg = entry.get("msg", entry.get("message", "")) + req = entry.get("request", {}) + if req: + method = req.get("method", "") + host = req.get("host", "") + uri = req.get("uri", "") + status = entry.get("status", "") + duration = entry.get("duration", "") + err = entry.get("error", "") + parts = [msg, f"{method} {host}{uri}" if method else "", f"status={status}" if status else ""] + if duration: + parts.append(f"duration={duration:.3f}s") + if err: + parts.append(f"error={err}") + return " ".join(p for p in parts if p) + # Non-access log entries (TLS, config, etc.) + err = entry.get("error", "") + return f"{msg} {err}".strip() if err else msg + + +def parse( + lines: Iterator[str], + source_id: str, + compiled_patterns: list[tuple[LogPattern, object]], + ingest_time: str | None = None, +) -> Iterator[RetrievedEntry]: + ingest_time = ingest_time or now_iso() + state = SourceState() + + for raw_line in lines: + raw_line = raw_line.strip() + if not raw_line: + continue + try: + entry = json.loads(raw_line) + except json.JSONDecodeError: + continue + + if "ts" not in entry: + continue + + ts_float = float(entry["ts"]) + ts_iso = epoch_float_to_iso(ts_float) + ts_raw = str(entry["ts"]) + + level_raw = entry.get("level", "info") + severity = _LEVEL_MAP.get(level_raw.lower(), level_raw.upper()) + + text = _summarise(entry) + if not text: + continue + + repeat, out_of_order = state.observe(text, ts_iso) + matched = apply_patterns(text, compiled_patterns) + + yield RetrievedEntry( + entry_id=make_entry_id(source_id, state.sequence, text), + source_id=source_id, + sequence=state.sequence, + timestamp_raw=ts_raw, + timestamp_iso=ts_iso, + ingest_time=ingest_time, + severity=severity, + repeat_count=repeat, + out_of_order=out_of_order, + matched_patterns=matched, + text=text, + ) diff --git a/app/ingest/docker_log.py b/app/ingest/docker_log.py new file mode 100644 index 0000000..c383571 --- /dev/null +++ b/app/ingest/docker_log.py @@ -0,0 +1,66 @@ +"""Docker-wrapped log parser (Turnstone's custom corpus format).""" +from __future__ import annotations + +import json +from typing import Iterator + +from app.ingest.base import ( + SourceState, apply_patterns, detect_severity, + make_entry_id, now_iso, +) +from app.services.models import LogPattern, RetrievedEntry + + +def parse( + lines: Iterator[str], + source_id: str, + compiled_patterns: list[tuple[LogPattern, object]], + ingest_time: str | None = None, +) -> Iterator[RetrievedEntry]: + ingest_time = ingest_time or now_iso() + # One SourceState per container name + states: dict[str, SourceState] = {} + + for raw_line in lines: + raw_line = raw_line.strip() + if not raw_line: + continue + try: + entry = json.loads(raw_line) + except json.JSONDecodeError: + continue + + src = entry.get("SOURCE", source_id) + text = entry.get("MESSAGE", "").strip() + if not text: + continue + + # Try to parse inner JSON (Docker often logs JSON itself) + try: + inner = json.loads(text) + if isinstance(inner, dict): + text = inner.get("msg") or inner.get("message") or inner.get("MESSAGE") or text + except (json.JSONDecodeError, TypeError): + pass + + if src not in states: + states[src] = SourceState() + state = states[src] + + severity = detect_severity(text) + repeat, out_of_order = state.observe(text, None) + matched = apply_patterns(text, compiled_patterns) + + yield RetrievedEntry( + entry_id=make_entry_id(src, state.sequence, text), + source_id=src, + sequence=state.sequence, + timestamp_raw=None, + timestamp_iso=None, + ingest_time=ingest_time, + severity=severity, + repeat_count=repeat, + out_of_order=False, # no timestamps to compare + matched_patterns=matched, + text=text, + ) diff --git a/app/ingest/journald.py b/app/ingest/journald.py new file mode 100644 index 0000000..220e9c8 --- /dev/null +++ b/app/ingest/journald.py @@ -0,0 +1,75 @@ +"""Journald JSON (-o json) log parser.""" +from __future__ import annotations + +import json +from typing import Iterator + +from app.ingest.base import ( + SourceState, apply_patterns, epoch_micros_to_iso, + make_entry_id, now_iso, SYSLOG_PRIORITY, +) +from app.services.models import LogPattern, RetrievedEntry + + +def _extract_message(raw: dict) -> str: + msg = raw.get("MESSAGE", "") + # journald encodes binary messages as arrays of ints + if isinstance(msg, list): + try: + return bytes(msg).decode("utf-8", errors="replace") + except Exception: + return repr(msg) + return str(msg) + + +def parse( + lines: Iterator[str], + source_id: str, + compiled_patterns: list[tuple[LogPattern, object]], + ingest_time: str | None = None, +) -> Iterator[RetrievedEntry]: + ingest_time = ingest_time or now_iso() + state = SourceState() + + for raw_line in lines: + raw_line = raw_line.strip() + if not raw_line: + continue + try: + entry = json.loads(raw_line) + except json.JSONDecodeError: + continue + + if "__REALTIME_TIMESTAMP" not in entry: + continue + + text = _extract_message(entry) + if not text: + continue + + ts_raw = entry["__REALTIME_TIMESTAMP"] + ts_iso = epoch_micros_to_iso(ts_raw) + + priority = entry.get("PRIORITY", "") + severity = SYSLOG_PRIORITY.get(str(priority)) + + hostname = entry.get("_HOSTNAME", "") + unit = entry.get("_SYSTEMD_UNIT") or entry.get("SYSLOG_IDENTIFIER", "") + src = f"{source_id}:{hostname}:{unit}" if hostname else source_id + + repeat, out_of_order = state.observe(text, ts_iso) + matched = apply_patterns(text, compiled_patterns) + + yield RetrievedEntry( + entry_id=make_entry_id(src, state.sequence, text), + source_id=src, + sequence=state.sequence, + timestamp_raw=ts_raw, + timestamp_iso=ts_iso, + ingest_time=ingest_time, + severity=severity, + repeat_count=repeat, + out_of_order=out_of_order, + matched_patterns=matched, + text=text, + ) diff --git a/app/ingest/pipeline.py b/app/ingest/pipeline.py new file mode 100644 index 0000000..ea25806 --- /dev/null +++ b/app/ingest/pipeline.py @@ -0,0 +1,146 @@ +"""Ingest pipeline: auto-detect format, parse, write to SQLite.""" +from __future__ import annotations + +import json +import logging +import re +import sqlite3 +from pathlib import Path +from typing import Iterator + +from app.ingest import caddy, docker_log, journald +from app.ingest.base import _compile, load_patterns, now_iso +from app.services.models import LogPattern, RetrievedEntry +from app.services.search import build_fts_index + +logger = logging.getLogger(__name__) + +_SCHEMA = """ +CREATE TABLE IF NOT EXISTS log_entries ( + id TEXT PRIMARY KEY, + source_id TEXT NOT NULL, + sequence INTEGER NOT NULL, + timestamp_raw TEXT, + timestamp_iso TEXT, + ingest_time TEXT NOT NULL, + severity TEXT, + repeat_count INTEGER DEFAULT 1, + out_of_order INTEGER DEFAULT 0, + matched_patterns TEXT DEFAULT '[]', + text TEXT NOT NULL +); +CREATE INDEX IF NOT EXISTS idx_source ON log_entries(source_id); +CREATE INDEX IF NOT EXISTS idx_timestamp ON log_entries(timestamp_iso); +CREATE INDEX IF NOT EXISTS idx_severity ON log_entries(severity); +CREATE INDEX IF NOT EXISTS idx_patterns ON log_entries(matched_patterns); +""" + + +def _detect_format(first_line: str) -> str: + try: + obj = json.loads(first_line) + if "__REALTIME_TIMESTAMP" in obj: + return "journald" + if "SOURCE" in obj and str(obj.get("SOURCE", "")).startswith("docker:"): + return "docker" + if "ts" in obj and ("msg" in obj or "message" in obj or "request" in obj): + return "caddy" + except (json.JSONDecodeError, AttributeError): + pass + return "unknown" + + +def _parse_file( + path: Path, + compiled: list[tuple[LogPattern, object]], + ingest_time: str, +) -> Iterator[RetrievedEntry]: + source_id = path.stem + + with path.open("r", errors="replace") as f: + lines = iter(f) + try: + first = next(lines) + except StopIteration: + return + + fmt = _detect_format(first.strip()) + logger.info("Detected format %r for %s", fmt, path.name) + + def all_lines(): + yield first + yield from lines + + if fmt == "journald": + yield from journald.parse(all_lines(), source_id, compiled, ingest_time) + elif fmt == "docker": + yield from docker_log.parse(all_lines(), source_id, compiled, ingest_time) + elif fmt == "caddy": + yield from caddy.parse(all_lines(), source_id, compiled, ingest_time) + else: + logger.warning("Unknown format in %s — skipping", path.name) + + +def _write_batch(conn: sqlite3.Connection, batch: list[RetrievedEntry]) -> None: + conn.executemany( + """ + INSERT OR IGNORE INTO log_entries + (id, source_id, sequence, timestamp_raw, timestamp_iso, + ingest_time, severity, repeat_count, out_of_order, + matched_patterns, text) + VALUES (?,?,?,?,?,?,?,?,?,?,?) + """, + [ + ( + e.entry_id, e.source_id, e.sequence, + e.timestamp_raw, e.timestamp_iso, e.ingest_time, + e.severity, e.repeat_count, int(e.out_of_order), + json.dumps(list(e.matched_patterns)), e.text, + ) + for e in batch + ], + ) + + +def ingest( + corpus_dir: Path, + db_path: Path, + pattern_file: Path | None = None, + batch_size: int = 1000, +) -> dict[str, int]: + pattern_file = pattern_file or Path("patterns/default.yaml") + patterns = load_patterns(pattern_file) + compiled = _compile(patterns) + ingest_time = now_iso() + + conn = sqlite3.connect(str(db_path)) + conn.execute("PRAGMA journal_mode=WAL") + conn.executescript(_SCHEMA) + conn.commit() + + stats: dict[str, int] = {} + + for jsonl_file in sorted(corpus_dir.glob("*.jsonl")): + count = 0 + batch: list[RetrievedEntry] = [] + for entry in _parse_file(jsonl_file, compiled, ingest_time): + batch.append(entry) + if len(batch) >= batch_size: + _write_batch(conn, batch) + conn.commit() + count += len(batch) + batch.clear() + if batch: + _write_batch(conn, batch) + conn.commit() + count += len(batch) + stats[jsonl_file.name] = count + logger.info("Ingested %d entries from %s", count, jsonl_file.name) + + conn.close() + + logger.info("Building FTS index...") + build_fts_index(db_path) + logger.info("FTS index ready") + + return stats diff --git a/app/mcp_server.py b/app/mcp_server.py new file mode 100644 index 0000000..3ba8d6a --- /dev/null +++ b/app/mcp_server.py @@ -0,0 +1,198 @@ +"""Turnstone MCP server — log search and diagnostics via stdio transport. + +Works with both Claude Code and Copilot CLI (both use MCP stdio). + +Run directly: python -m app.mcp_server +Or via: python app/mcp_server.py + +Configure TURNSTONE_DB env var to override the default DB path. +""" +from __future__ import annotations + +import logging +import os +import sqlite3 +import sys +from pathlib import Path + +from mcp.server.fastmcp import FastMCP + +# All logging must go to stderr — stdout is reserved for MCP JSON-RPC +logging.basicConfig( + stream=sys.stderr, + level=logging.INFO, + format="%(levelname)s %(message)s", +) +logger = logging.getLogger(__name__) + +# Allow running as a script from the project root +sys.path.insert(0, str(Path(__file__).parent.parent)) + +from app.services.search import ( + build_fts_index, + format_results, + list_sources, + search, +) + +_DEFAULT_DB = Path(__file__).parent.parent / "data" / "turnstone.db" +DB_PATH = Path(os.environ.get("TURNSTONE_DB", _DEFAULT_DB)) + +mcp = FastMCP( + "turnstone", + instructions=( + "Turnstone is a diagnostic intelligence layer for server, service, and device logs. " + "Call list_log_sources first to understand what data is available and identify " + "source_id values for filtering. Use search_logs for targeted FTS queries. " + "Use diagnose for symptom-driven investigation — it runs layered searches and " + "returns evidence ranked by severity and relevance." + ), +) + +_index_ready = False + + +def _ensure_index() -> None: + """Build FTS index on first use; skip if already present.""" + global _index_ready + if _index_ready: + return + + try: + conn = sqlite3.connect(str(DB_PATH)) + count = conn.execute("SELECT COUNT(*) FROM log_fts").fetchone()[0] + conn.close() + if count > 0: + _index_ready = True + logger.info("FTS index present (%d entries)", count) + return + except sqlite3.OperationalError: + pass + + logger.info("Building FTS index for %s — this may take a minute...", DB_PATH) + build_fts_index(DB_PATH) + _index_ready = True + logger.info("FTS index ready") + + +@mcp.tool() +def search_logs( + query: str, + severity: str | None = None, + source: str | None = None, + pattern: str | None = None, + since: str | None = None, + until: str | None = None, + limit: int = 20, + include_repeats: bool = False, +) -> str: + """Search log entries using full-text search with optional filters. + + Args: + query: FTS5 search expression. Supports AND, OR, NOT, phrase quotes, prefix*. + Example: '"connection refused" OR "connection lost"' + severity: Filter by level — EMERGENCY, ALERT, CRITICAL, ERROR, WARN, NOTICE, INFO, DEBUG. + source: Partial match on source_id. Format is 'corpus:host:service'. + Example: 'example-node:caddy' matches all Caddy entries from example-node. + pattern: Filter by named pattern tag applied at ingest time. + Known tags: auth_failure, connection_lost, oom, segfault, disk_full, + timeout, caddy_tls_error, caddy_config_error, caddy_auth_error, + caddy_upstream_error, service_restart, service_update, + power_failure, network_interface, ip_change. + since: Earliest timestamp, ISO 8601 (e.g. 2026-05-06T18:00:00). + until: Latest timestamp, ISO 8601. + limit: Result count cap (default 20, max 100). + include_repeats: Include deduplicated repeat log lines (default False). + + Returns: + Formatted log entries ranked by FTS relevance, or a no-results message. + """ + _ensure_index() + results = search( + DB_PATH, + query=query, + severity=severity, + source_filter=source, + pattern_filter=pattern, + since=since, + until=until, + limit=min(limit, 100), + include_repeats=include_repeats, + ) + return format_results(results) + + +@mcp.tool() +def diagnose( + symptom: str, + source: str | None = None, + since: str | None = None, + until: str | None = None, +) -> str: + """Investigate a symptom or problem description across the log corpus. + + Runs layered searches — broad relevance first, then CRITICAL/ERROR filtered — + and merges results deduplicated by entry ID, sorted chronologically. + Use this for open-ended questions like "why did auth break?" or + "what happened during the network outage?". + + Args: + symptom: Plain-language description of the problem or symptom. + source: Limit to a host or service (partial match on source_id). + since: Earliest timestamp, ISO 8601. + until: Latest timestamp, ISO 8601. + + Returns: + Consolidated log evidence sorted by time, formatted for analysis. + """ + _ensure_index() + + common = dict(source_filter=source, since=since, until=until, include_repeats=False) + + broad = search(DB_PATH, query=symptom, limit=15, **common) + critical = search(DB_PATH, query=symptom, severity="CRITICAL", limit=5, **common) + errors = search(DB_PATH, query=symptom, severity="ERROR", limit=10, **common) + + seen: set[str] = set() + combined = [] + for r in broad + critical + errors: + if r.entry_id not in seen: + seen.add(r.entry_id) + combined.append(r) + + combined.sort(key=lambda r: (r.timestamp_iso or "\xff", r.sequence)) + return format_results(combined[:20]) + + +@mcp.tool() +def list_log_sources() -> str: + """List all log sources in the corpus with entry counts and time ranges. + + Call this before searching to understand what data is available and to + identify source_id values for use in the source= filter parameter. + + Returns: + Each source on its own block with entry count, error count, and time range. + """ + sources = list_sources(DB_PATH) + if not sources: + return "No log sources found. Has the corpus been ingested? Run: python scripts/ingest_corpus.py" + + lines = [f"Corpus: {DB_PATH}", f"Sources ({len(sources)} total):\n"] + for s in sources: + err_note = f" — {s['error_count']:,} errors" if s["error_count"] else "" + lines.append( + f" {s['source_id']}\n" + f" entries: {s['entry_count']:,}{err_note}\n" + f" range: {s['earliest'] or 'unknown'} → {s['latest'] or 'unknown'}" + ) + return "\n".join(lines) + + +if __name__ == "__main__": + if not DB_PATH.exists(): + logger.error("Database not found: %s", DB_PATH) + logger.error("Run: python scripts/ingest_corpus.py ") + sys.exit(1) + logger.info("Starting Turnstone MCP server (DB: %s)", DB_PATH) + mcp.run() diff --git a/app/services/__init__.py b/app/services/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/services/models.py b/app/services/models.py new file mode 100644 index 0000000..89eb455 --- /dev/null +++ b/app/services/models.py @@ -0,0 +1,33 @@ +"""Core data models for Turnstone log retrieval.""" +from __future__ import annotations + +from dataclasses import dataclass, field + + +@dataclass(frozen=True) +class RetrievedEntry: + """A log entry returned by the retriever, with source metadata and scores.""" + + entry_id: str + source_id: str # log file path or service name + sequence: int # original line number — ingest order, not wall-clock order + timestamp_raw: str | None # timestamp as it appeared in the log + timestamp_iso: str | None # parsed to ISO 8601 for sorting; None if unparseable + ingest_time: str # when Turnstone indexed this entry (wall clock) + severity: str | None # ERROR / WARN / INFO / DEBUG / None if not detected + repeat_count: int # collapsed duplicate count (1 = unique) + out_of_order: bool # True when timestamp precedes predecessor's timestamp + matched_patterns: tuple[str, ...] = field(default_factory=tuple) # named pattern hits + text: str = "" + bm25_score: float = 0.0 + vector_score: float | None = None + + +@dataclass(frozen=True) +class LogPattern: + """A named regex pattern for tagging entries at ingest time.""" + + name: str # e.g. "device_disconnect", "auth_failure" + pattern: str # regex string + severity: str # suggested severity if not present in log line + description: str # human-readable explanation for the UI diff --git a/app/services/search.py b/app/services/search.py new file mode 100644 index 0000000..0b3772e --- /dev/null +++ b/app/services/search.py @@ -0,0 +1,200 @@ +"""FTS5-based log search with severity, source, and pattern filters.""" +from __future__ import annotations + +import json +import logging +import sqlite3 +from dataclasses import dataclass +from pathlib import Path + +logger = logging.getLogger(__name__) + + +@dataclass(frozen=True) +class SearchResult: + entry_id: str + source_id: str + sequence: int + timestamp_iso: str | None + severity: str | None + repeat_count: int + out_of_order: bool + matched_patterns: list[str] + text: str + rank: float + + +def build_fts_index(db_path: Path) -> None: + """Build (or rebuild) the FTS5 index from log_entries. Safe to re-run. + + Drops and recreates the table if the schema is stale (missing sequence column). + """ + conn = sqlite3.connect(str(db_path)) + conn.execute("PRAGMA journal_mode=WAL") + + # Check whether existing table has the sequence column; rebuild if not. + needs_rebuild = False + try: + conn.execute("SELECT sequence FROM log_fts LIMIT 0") + except sqlite3.OperationalError: + needs_rebuild = True + + if needs_rebuild: + conn.execute("DROP TABLE IF EXISTS log_fts") + + conn.executescript(""" + CREATE VIRTUAL TABLE IF NOT EXISTS log_fts USING fts5( + text, + entry_id UNINDEXED, + source_id UNINDEXED, + sequence UNINDEXED, + severity UNINDEXED, + timestamp_iso UNINDEXED, + matched_patterns UNINDEXED, + repeat_count UNINDEXED, + out_of_order UNINDEXED, + tokenize = 'porter ascii' + ); + """) + # Only insert rows not already indexed + conn.execute(""" + INSERT INTO log_fts(text, entry_id, source_id, sequence, severity, + timestamp_iso, matched_patterns, + repeat_count, out_of_order) + SELECT e.text, e.id, e.source_id, e.sequence, e.severity, + e.timestamp_iso, e.matched_patterns, + e.repeat_count, e.out_of_order + FROM log_entries e + WHERE e.id NOT IN (SELECT entry_id FROM log_fts WHERE entry_id IS NOT NULL) + """) + conn.commit() + conn.close() + + +def search( + db_path: Path, + query: str, + severity: str | None = None, + source_filter: str | None = None, + pattern_filter: str | None = None, + since: str | None = None, + until: str | None = None, + limit: int = 20, + include_repeats: bool = False, +) -> list[SearchResult]: + """Full-text search with optional filters. Returns results ranked by relevance.""" + conn = sqlite3.connect(str(db_path)) + conn.execute("PRAGMA journal_mode=WAL") + conn.row_factory = sqlite3.Row + + conditions = ["log_fts MATCH ?"] + params: list = [query] + + if severity: + conditions.append("severity = ?") + params.append(severity.upper()) + if source_filter: + conditions.append("source_id LIKE ?") + params.append(f"%{source_filter}%") + if pattern_filter: + conditions.append("matched_patterns LIKE ?") + params.append(f'%"{pattern_filter}"%') + if since: + conditions.append("timestamp_iso >= ?") + params.append(since) + if until: + conditions.append("timestamp_iso <= ?") + params.append(until) + if not include_repeats: + conditions.append("repeat_count = 1") + + where = " AND ".join(conditions) + params.append(limit) + + try: + rows = conn.execute( + f""" + SELECT entry_id, source_id, sequence, timestamp_iso, severity, + repeat_count, out_of_order, matched_patterns, text, rank + FROM log_fts + WHERE {where} + ORDER BY rank + LIMIT ? + """, + params, + ).fetchall() + except sqlite3.OperationalError as e: + logger.warning("FTS query failed (%s) — index may not be built yet", e) + conn.close() + return [] + + results = [ + SearchResult( + entry_id=r["entry_id"], + source_id=r["source_id"], + sequence=r["sequence"], + timestamp_iso=r["timestamp_iso"], + severity=r["severity"], + repeat_count=r["repeat_count"], + out_of_order=bool(r["out_of_order"]), + matched_patterns=json.loads(r["matched_patterns"] or "[]"), + text=r["text"], + rank=r["rank"], + ) + for r in rows + ] + conn.close() + return results + + +def list_sources(db_path: Path) -> list[dict]: + """Return distinct sources with entry counts and time ranges.""" + conn = sqlite3.connect(str(db_path)) + conn.execute("PRAGMA journal_mode=WAL") + rows = conn.execute(""" + SELECT + source_id, + COUNT(*) as entry_count, + MIN(timestamp_iso) as earliest, + MAX(timestamp_iso) as latest, + SUM(CASE WHEN severity IN ('ERROR','CRITICAL','EMERGENCY','ALERT') THEN 1 ELSE 0 END) as error_count + FROM log_entries + GROUP BY source_id + ORDER BY entry_count DESC + """).fetchall() + conn.close() + return [ + { + "source_id": r[0], + "entry_count": r[1], + "earliest": r[2], + "latest": r[3], + "error_count": r[4], + } + for r in rows + ] + + +def format_results(results: list[SearchResult], max_text: int = 300) -> str: + """Format search results as readable text for LLM context.""" + if not results: + return "No matching log entries found." + + lines = [] + for r in results: + ts = r.timestamp_iso or "no-timestamp" + sev = r.severity or "?" + src = r.source_id + flags = [] + if r.repeat_count > 1: + flags.append(f"repeat×{r.repeat_count}") + if r.out_of_order: + flags.append("out-of-order") + if r.matched_patterns: + flags.append(f"[{', '.join(r.matched_patterns)}]") + flag_str = f" {' '.join(flags)}" if flags else "" + + text = r.text[:max_text] + ("…" if len(r.text) > max_text else "") + lines.append(f"[{ts} | {src} | {sev}{flag_str}]\n{text}") + + return "\n\n".join(lines) diff --git a/patterns/default.yaml b/patterns/default.yaml new file mode 100644 index 0000000..3c3d0d4 --- /dev/null +++ b/patterns/default.yaml @@ -0,0 +1,88 @@ +# Turnstone pattern library — named regex patterns for log tagging at ingest time. +# Each matched pattern name is stored on RetrievedEntry.matched_patterns and +# used to boost retrieval relevance for diagnostic queries. +# +# Add domain-specific patterns here. Patterns are applied in order; multiple +# can match a single entry. + +patterns: + - name: service_restart + pattern: "(restarting|restart requested|service.*start)" + severity: WARN + description: Service restart detected + + - name: connection_lost + pattern: "(connection (lost|dropped|refused|timed? out)|disconnect(ed)?)" + severity: ERROR + description: Network or device connection failure + + - name: auth_failure + pattern: "(auth(entication)? (failed?|error|denied)|permission denied|unauthorized)" + severity: ERROR + description: Authentication or authorization failure + + - name: oom + pattern: "(out of memory|OOM|killed process|cannot allocate)" + severity: CRITICAL + description: Out-of-memory condition + + - name: segfault + pattern: "(segmentation fault|segfault|SIGSEGV|core dump)" + severity: CRITICAL + description: Process crash or memory corruption + + - name: disk_full + pattern: "(no space left|disk full|filesystem.*full|ENOSPC)" + severity: ERROR + description: Storage capacity exhausted + + - name: timeout + pattern: "(timed? out|deadline exceeded|operation timed?)" + severity: WARN + description: Operation timeout + + - name: caddy_tls_error + pattern: "(acme|certificate|tls).*(error|fail|invalid|expired|renew)" + severity: ERROR + description: Caddy TLS or certificate error + + - name: caddy_config_error + pattern: "(config|caddyfile|directive).*(error|invalid|unknown|unrecognized)" + severity: ERROR + description: Caddy configuration error + + - name: caddy_auth_error + pattern: "(forward_auth|basicauth|basic_auth).*(error|fail|denied|invalid|unreachable)" + severity: ERROR + description: Caddy authentication middleware failure + + - name: caddy_upstream_error + pattern: "(upstream|backend|reverse.proxy).*(error|fail|unreachable|refused|timeout)" + severity: ERROR + description: Caddy upstream/backend failure + + - name: service_update + pattern: "(upgraded?|updated?|installing|dpkg|apt|package).*(caddy|nginx|apache|proxy)" + severity: INFO + description: Web server package update detected + + - name: power_failure + pattern: "(power (fail|loss|outage|cut)|ups|battery|shutdown.*power|lost power)" + severity: CRITICAL + description: Power failure or UPS event + + - name: network_interface + pattern: "(eth[0-9]|ens[0-9]|enp[0-9]|wlan[0-9]).*(down|up|carrier|link)" + severity: WARN + description: Network interface state change + + - name: ip_change + pattern: "(new ip|ip.*(changed|assigned|address)|dhcp.*(ack|offer|bound|renew))" + severity: INFO + description: IP address change or DHCP event + + # Add device/service-specific patterns below this line: + # - name: ext_device_device_error + # pattern: "ERR-\d{4}" + # severity: ERROR + # description: EXT_DEVICE device error code diff --git a/scripts/build_fts_index.py b/scripts/build_fts_index.py new file mode 100644 index 0000000..d0d4677 --- /dev/null +++ b/scripts/build_fts_index.py @@ -0,0 +1,21 @@ +"""CLI: build (or update) the FTS5 full-text search index after ingest.""" +from __future__ import annotations + +import sys +from pathlib import Path + +sys.path.insert(0, str(Path(__file__).parent.parent)) + +from app.services.search import build_fts_index + +if __name__ == "__main__": + db_path = Path(sys.argv[1]) if len(sys.argv) > 1 else Path("data/turnstone.db") + + if not db_path.exists(): + print(f"ERROR: database not found: {db_path}", file=sys.stderr) + print("Run ingest first: python scripts/ingest_corpus.py", file=sys.stderr) + sys.exit(1) + + print(f"Building FTS index for {db_path} ...") + build_fts_index(db_path) + print("Done.") diff --git a/scripts/ingest_corpus.py b/scripts/ingest_corpus.py new file mode 100644 index 0000000..d39da6a --- /dev/null +++ b/scripts/ingest_corpus.py @@ -0,0 +1,28 @@ +"""CLI: ingest a corpus directory into the Turnstone SQLite database.""" +from __future__ import annotations + +import logging +import sys +from pathlib import Path + +logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s") + +# Allow running from repo root +sys.path.insert(0, str(Path(__file__).parent.parent)) + +from app.ingest.pipeline import ingest + +if __name__ == "__main__": + corpus_dir = Path(sys.argv[1]) if len(sys.argv) > 1 else Path("corpus/raw") + db_path = Path(sys.argv[2]) if len(sys.argv) > 2 else Path("data/turnstone.db") + pattern_file = Path("patterns/default.yaml") + + db_path.parent.mkdir(parents=True, exist_ok=True) + + print(f"Ingesting {corpus_dir} → {db_path}") + stats = ingest(corpus_dir, db_path, pattern_file) + + total = sum(stats.values()) + for fname, count in sorted(stats.items()): + print(f" {fname}: {count:,}") + print(f" TOTAL: {total:,} entries") diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29