Renames the app/ingest/ package to app/glean/ and updates all references across Python modules, shell scripts, Vue components, tests, and documentation.

Intentionally preserved:
- SQLite column name ingest_time (avoids schema migration)
- RetrievedEntry.ingest_time field (maps to the column above)
- Any public-facing JSON keys that reference ingest_time

Changes by category:
- app/ingest/ → app/glean/ (full package move, all parsers)
- app/tasks/ingest_scheduler.py → app/tasks/glean_scheduler.py
- scripts/ingest_corpus.py → scripts/glean_corpus.py
- tests/test_ingest_*.py → tests/test_glean_*.py
- Docstrings, log messages, comments: ingest → glean
- Env var: TURNSTONE_INGEST_INTERVAL → TURNSTONE_GLEAN_INTERVAL
- Shell scripts: glean.log, glean_corpus.py references
- README.md: multi-source ingest → multi-source glean
- .env.example: updated env var name
- patterns/: new diagnostic patterns from 2026-05-20 SSH incident (service_crash_loop, pkg_daemon_restart, ssh_forward_conflict)
- SourcesView.vue: pipeline label updated
- All test import paths updated to app.glean.*

285 tests passing.
85 lines
2.6 KiB
Python
85 lines
2.6 KiB
Python
"""Caddy structured JSON access log parser."""
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
from typing import Iterator
|
|
|
|
from app.glean.base import (
|
|
SourceState, apply_patterns, epoch_float_to_iso,
|
|
make_entry_id, now_iso,
|
|
)
|
|
from app.services.models import LogPattern, RetrievedEntry
|
|
|
|
_LEVEL_MAP = {"debug": "DEBUG", "info": "INFO", "warn": "WARN", "error": "ERROR"}
|
|
|
|
|
|
def _summarise(entry: dict) -> str:
|
|
"""Build a human-readable text representation of a Caddy log entry."""
|
|
msg = entry.get("msg", entry.get("message", ""))
|
|
req = entry.get("request", {})
|
|
if req:
|
|
method = req.get("method", "")
|
|
host = req.get("host", "")
|
|
uri = req.get("uri", "")
|
|
status = entry.get("status", "")
|
|
duration = entry.get("duration", "")
|
|
err = entry.get("error", "")
|
|
parts = [msg, f"{method} {host}{uri}" if method else "", f"status={status}" if status else ""]
|
|
if duration:
|
|
parts.append(f"duration={duration:.3f}s")
|
|
if err:
|
|
parts.append(f"error={err}")
|
|
return " ".join(p for p in parts if p)
|
|
# Non-access log entries (TLS, config, etc.)
|
|
err = entry.get("error", "")
|
|
return f"{msg} {err}".strip() if err else msg
|
|
|
|
|
|
def parse(
    lines: Iterator[str],
    source_id: str,
    compiled_patterns: list[tuple[LogPattern, object]],
    ingest_time: str | None = None,
) -> Iterator[RetrievedEntry]:
    """Yield one ``RetrievedEntry`` per usable line of a Caddy JSON log.

    A line is skipped, never raised on, when it is blank, is not valid
    JSON, does not decode to a JSON object, has no ``ts`` field or a
    non-numeric one, or summarises to empty text.

    Args:
        lines: Raw log lines; surrounding whitespace is stripped.
        source_id: Identifier stamped on every entry and passed to
            ``make_entry_id``.
        compiled_patterns: Pattern pairs handed to ``apply_patterns``
            unchanged.
        ingest_time: Value recorded as ``ingest_time`` on every entry;
            when falsy, ``now_iso()`` is evaluated once for the whole call.

    Yields:
        Entries in input order, with repeat/out-of-order information taken
        from a ``SourceState`` that lives for the duration of this call.
    """
    ingest_time = ingest_time or now_iso()
    state = SourceState()

    for raw_line in lines:
        raw_line = raw_line.strip()
        if not raw_line:
            continue
        try:
            entry = json.loads(raw_line)
        except json.JSONDecodeError:
            continue

        # json.loads accepts any JSON value (number, string, list, ...);
        # only an object can be a log record.
        if not isinstance(entry, dict) or "ts" not in entry:
            continue

        ts_value = entry["ts"]
        try:
            ts_float = float(ts_value)
        except (TypeError, ValueError):
            # Non-epoch timestamps (e.g. a formatted date string, null) are
            # treated like any other malformed line instead of aborting
            # the whole stream.
            continue
        ts_iso = epoch_float_to_iso(ts_float)
        ts_raw = str(ts_value)

        level_raw = entry.get("level", "info")
        if not isinstance(level_raw, str):
            # A null or non-string level is handled as if it were missing.
            level_raw = "info"
        # Unknown level names are kept, upper-cased, rather than dropped.
        severity = _LEVEL_MAP.get(level_raw.lower(), level_raw.upper())

        text = _summarise(entry)
        if not text:
            continue

        # observe() must run before state.sequence is read below, as in the
        # original statement order.
        repeat, out_of_order = state.observe(text, ts_iso)
        matched = apply_patterns(text, compiled_patterns)

        yield RetrievedEntry(
            entry_id=make_entry_id(source_id, state.sequence, text),
            source_id=source_id,
            sequence=state.sequence,
            timestamp_raw=ts_raw,
            timestamp_iso=ts_iso,
            ingest_time=ingest_time,
            severity=severity,
            repeat_count=repeat,
            out_of_order=out_of_order,
            matched_patterns=matched,
            text=text,
        )
|