"""Docker-wrapped log parser (Turnstone's custom corpus format).""" from __future__ import annotations import json from typing import Iterator from app.ingest.base import ( SourceState, apply_patterns, detect_severity, make_entry_id, now_iso, ) from app.services.models import LogPattern, RetrievedEntry def parse( lines: Iterator[str], source_id: str, compiled_patterns: list[tuple[LogPattern, object]], ingest_time: str | None = None, ) -> Iterator[RetrievedEntry]: ingest_time = ingest_time or now_iso() # One SourceState per container name states: dict[str, SourceState] = {} for raw_line in lines: raw_line = raw_line.strip() if not raw_line: continue try: entry = json.loads(raw_line) except json.JSONDecodeError: continue src = entry.get("SOURCE", source_id) text = entry.get("MESSAGE", "").strip() if not text: continue # Try to parse inner JSON (Docker often logs JSON itself) try: inner = json.loads(text) if isinstance(inner, dict): text = inner.get("msg") or inner.get("message") or inner.get("MESSAGE") or text except (json.JSONDecodeError, TypeError): pass if src not in states: states[src] = SourceState() state = states[src] severity = detect_severity(text) repeat, out_of_order = state.observe(text, None) matched = apply_patterns(text, compiled_patterns) yield RetrievedEntry( entry_id=make_entry_id(src, state.sequence, text), source_id=src, sequence=state.sequence, timestamp_raw=None, timestamp_iso=None, ingest_time=ingest_time, severity=severity, repeat_count=repeat, out_of_order=False, # no timestamps to compare matched_patterns=matched, text=text, )