turnstone/app/ingest/caddy.py
pyr0ball 3e6eabb7ce feat: initial Turnstone POC — ingest, FTS search, MCP server
Ingest pipeline (journald / Caddy / Docker-wrapped formats) with
per-source state tracking (repeat dedup, out-of-order detection),
named pattern tagging at ingest time, and idempotent SHA1-keyed writes.

FTS5 search layer with porter stemmer, severity/source/pattern/time
filters, and BM25 ranking. MCP server (FastMCP stdio) with three tools:
search_logs, diagnose, list_log_sources — compatible with both
Claude Code and Copilot CLI.

WAL mode enabled on all connections. FTS index auto-built after ingest.
MCP configs included for Claude Code (.mcp.json) and Copilot CLI
(.github/copilot/mcp.json).
2026-05-08 12:12:34 -07:00

85 lines
2.6 KiB
Python

"""Caddy structured JSON access log parser."""
from __future__ import annotations
import json
from typing import Iterator
from app.ingest.base import (
SourceState, apply_patterns, epoch_float_to_iso,
make_entry_id, now_iso,
)
from app.services.models import LogPattern, RetrievedEntry
# Lower-cased Caddy ``level`` value -> normalised severity label.
# Levels not listed here are handled by the caller's fallback.
_LEVEL_MAP = {
    "debug": "DEBUG",
    "info": "INFO",
    "warn": "WARN",
    "error": "ERROR",
}
def _summarise(entry: dict) -> str:
    """Build a human-readable text representation of a Caddy log entry.

    Access-log entries (those with a non-empty ``request`` object) are
    rendered as the space-joined sequence
    ``<msg> <method> <host><uri> status=<status> duration=<d> error=<err>``
    with every empty or missing part left out. All other entries (TLS,
    config, etc.) are rendered as ``<msg> <error>``.

    Args:
        entry: One decoded Caddy JSON log object.

    Returns:
        The summary text; an empty string when the entry carries nothing
        to summarise. The result is always a ``str``.
    """
    msg = entry.get("msg", entry.get("message", ""))
    if not isinstance(msg, str):
        # A JSON null or number here would otherwise leak a non-str return
        # value on the non-access path and break the join on the access path.
        msg = "" if msg is None else str(msg)
    err = entry.get("error", "")

    req = entry.get("request", {})
    if not (isinstance(req, dict) and req):
        # Non-access log entries (TLS, config, etc.). A ``request`` value
        # that is not an object is treated the same way instead of raising.
        return f"{msg} {err}".strip() if err else msg

    method = req.get("method", "")
    host = req.get("host", "")
    uri = req.get("uri", "")
    status = entry.get("status", "")
    duration = entry.get("duration", "")

    parts = [msg]
    if method:
        parts.append(f"{method} {host}{uri}")
    # Truthiness checks are kept on purpose: falsy values (0, "") are
    # omitted exactly as before so previously generated text is unchanged.
    if status:
        parts.append(f"status={status}")
    if duration:
        if isinstance(duration, (int, float)):
            parts.append(f"duration={duration:.3f}s")
        else:
            # Non-numeric durations (e.g. "1.5ms") cannot take the ".3f"
            # format spec; emit them verbatim, unit included.
            parts.append(f"duration={duration}")
    if err:
        parts.append(f"error={err}")
    return " ".join(p for p in parts if p)
def parse(
    lines: Iterator[str],
    source_id: str,
    compiled_patterns: list[tuple[LogPattern, object]],
    ingest_time: str | None = None,
) -> Iterator[RetrievedEntry]:
    """Parse Caddy JSON log lines into ``RetrievedEntry`` records.

    Lines are skipped (never raised on) when they are blank, are not valid
    JSON, do not decode to a JSON object, lack a ``ts`` field, carry a
    ``ts`` that cannot be converted to a timestamp, or summarise to empty
    text. Skipped lines are not passed to the per-source state tracker.

    Args:
        lines: Raw log lines, one JSON document per line.
        source_id: Identifier of the log source, stored on every entry and
            passed to ``make_entry_id``.
        compiled_patterns: Patterns handed to ``apply_patterns`` for
            tagging each entry's text.
        ingest_time: Timestamp recorded on every yielded entry; defaults
            to ``now_iso()`` evaluated once per call.

    Yields:
        One ``RetrievedEntry`` per accepted log line, in input order.
    """
    ingest_time = ingest_time or now_iso()
    state = SourceState()

    for raw_line in lines:
        raw_line = raw_line.strip()
        if not raw_line:
            continue

        try:
            entry = json.loads(raw_line)
        except json.JSONDecodeError:
            continue
        # Valid JSON is not necessarily an object: a bare number, string or
        # list would break the key lookups below.
        if not isinstance(entry, dict) or "ts" not in entry:
            continue

        try:
            ts_iso = epoch_float_to_iso(float(entry["ts"]))
        except (TypeError, ValueError, OverflowError):
            # Non-numeric or out-of-range timestamp (e.g. a string from a
            # non-default time format). One bad line must not abort the
            # whole generator, so it is skipped like other malformed input.
            continue
        ts_raw = str(entry["ts"])

        level_raw = entry.get("level", "info")
        if not isinstance(level_raw, str):
            # null / numeric levels have no .lower(); use the default level.
            level_raw = "info"
        severity = _LEVEL_MAP.get(level_raw.lower(), level_raw.upper())

        text = _summarise(entry)
        if not text:
            continue

        repeat, out_of_order = state.observe(text, ts_iso)
        matched = apply_patterns(text, compiled_patterns)
        yield RetrievedEntry(
            entry_id=make_entry_id(source_id, state.sequence, text),
            source_id=source_id,
            sequence=state.sequence,
            timestamp_raw=ts_raw,
            timestamp_iso=ts_iso,
            ingest_time=ingest_time,
            severity=severity,
            repeat_count=repeat,
            out_of_order=out_of_order,
            matched_patterns=matched,
            text=text,
        )