turnstone/app/glean/docker_log.py
pyr0ball aa80f307fe refactor: rename ingest → glean throughout codebase
Renames the app/ingest/ package to app/glean/ and updates all
references across Python modules, shell scripts, Vue components,
tests, and documentation.

Intentionally preserved:
- SQLite column name ingest_time (avoids schema migration)
- RetrievedEntry.ingest_time field (maps to the column above)
- Any public-facing JSON keys that reference ingest_time

Changes by category:
- app/ingest/ → app/glean/ (full package move, all parsers)
- app/tasks/ingest_scheduler.py → app/tasks/glean_scheduler.py
- scripts/ingest_corpus.py → scripts/glean_corpus.py
- tests/test_ingest_*.py → tests/test_glean_*.py
- Docstrings, log messages, comments: ingest → glean
- Env var: TURNSTONE_INGEST_INTERVAL → TURNSTONE_GLEAN_INTERVAL
- Shell scripts: glean.log, glean_corpus.py references
- README.md: multi-source ingest → multi-source glean
- .env.example: updated env var name
- patterns/: new diagnostic patterns from 2026-05-20 SSH incident
  (service_crash_loop, pkg_daemon_restart, ssh_forward_conflict)
- SourcesView.vue: pipeline label updated
- All test import paths updated to app.glean.*

285 tests passing.
2026-05-20 23:02:55 -07:00

"""Docker-wrapped log parser (Turnstone's custom corpus format)."""
from __future__ import annotations

import json
from typing import Iterator

from app.glean.base import (
    SourceState, apply_patterns, detect_severity,
    make_entry_id, now_iso,
)
from app.services.models import LogPattern, RetrievedEntry


def parse(
    lines: Iterator[str],
    source_id: str,
    compiled_patterns: list[tuple[LogPattern, object]],
    ingest_time: str | None = None,
) -> Iterator[RetrievedEntry]:
    """Yield one RetrievedEntry per wrapped line that carries a non-empty MESSAGE.

    Blank lines, lines that are not valid JSON objects, and lines without a
    usable MESSAGE are skipped silently.
    """
    ingest_time = ingest_time or now_iso()

    # One SourceState per container name
    states: dict[str, SourceState] = {}

    for raw_line in lines:
        raw_line = raw_line.strip()
        if not raw_line:
            continue
        try:
            entry = json.loads(raw_line)
        except json.JSONDecodeError:
            continue
        # Valid JSON that is not an object (e.g. a bare list or number) has no .get()
        if not isinstance(entry, dict):
            continue

        src = entry.get("SOURCE", source_id)
        # MESSAGE may be missing, null, or a non-string value; skip those lines
        text = entry.get("MESSAGE") or ""
        if not isinstance(text, str):
            continue
        text = text.strip()
        if not text:
            continue

        # Try to parse inner JSON (Docker often logs JSON itself)
        try:
            inner = json.loads(text)
            if isinstance(inner, dict):
                text = inner.get("msg") or inner.get("message") or inner.get("MESSAGE") or text
        except (json.JSONDecodeError, TypeError):
            pass

        if src not in states:
            states[src] = SourceState()
        state = states[src]

        severity = detect_severity(text)
        # The out-of-order flag from observe() is discarded: there is no timestamp to compare
        repeat, _ = state.observe(text, None)
        matched = apply_patterns(text, compiled_patterns)

        yield RetrievedEntry(
            entry_id=make_entry_id(src, state.sequence, text),
            source_id=src,
            sequence=state.sequence,
            timestamp_raw=None,
            timestamp_iso=None,
            ingest_time=ingest_time,
            severity=severity,
            repeat_count=repeat,
            out_of_order=False,  # no timestamps to compare
            matched_patterns=matched,
            text=text,
        )
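
The unwrapping step in parse() can be tried in isolation. The block below is a minimal standalone sketch, not part of the module: `unwrap_line` is a hypothetical helper name, the sample records are invented for illustration, and the sketch leaves out everything that depends on `app.glean.base` (severity detection, repeat tracking, pattern matching). It assumes only what the parser above shows: each line is a JSON object with an optional `SOURCE` key and a `MESSAGE` key, and `MESSAGE` may itself contain a JSON object with a `msg`, `message`, or `MESSAGE` field.

```python
import json
from typing import Optional, Tuple


def unwrap_line(raw_line: str, default_source: str) -> Optional[Tuple[str, str]]:
    """Return (source, text) for one wrapped line, or None if the line is skipped.

    Hypothetical helper mirroring the skip rules and inner-JSON unwrap of parse().
    """
    raw_line = raw_line.strip()
    if not raw_line:
        return None
    try:
        entry = json.loads(raw_line)
    except json.JSONDecodeError:
        return None
    if not isinstance(entry, dict):
        return None

    src = entry.get("SOURCE", default_source)
    text = entry.get("MESSAGE") or ""
    if not isinstance(text, str) or not text.strip():
        return None
    text = text.strip()

    # If the message is itself a JSON object, prefer its message field.
    try:
        inner = json.loads(text)
    except json.JSONDecodeError:
        inner = None
    if isinstance(inner, dict):
        text = inner.get("msg") or inner.get("message") or inner.get("MESSAGE") or text
    return src, text


# Invented sample records: one with nested JSON, one plain, one malformed.
wrapped = json.dumps(
    {"SOURCE": "web", "MESSAGE": json.dumps({"level": "error", "msg": "upstream timed out"})}
)
plain = json.dumps({"MESSAGE": "plain text line"})

for line in (wrapped, plain, "not json at all"):
    print(unwrap_line(line, "fallback"))
```

The nested record resolves to its inner `msg`, the plain record falls back to the caller's source id, and the malformed line is dropped rather than raising, which matches how parse() treats undecodable input.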