"""Journald JSON (-o json) log parser."""

from __future__ import annotations

import json
from typing import Iterator

from app.ingest.base import (
    SourceState,
    apply_patterns,
    epoch_micros_to_iso,
    make_entry_id,
    now_iso,
    SYSLOG_PRIORITY,
)
from app.services.models import LogPattern, RetrievedEntry


def _extract_message(raw: dict) -> str:
    """Return the MESSAGE field of one journald JSON record as text.

    ``journalctl -o json`` does not always emit MESSAGE as a plain string:

    * ``null`` is emitted for fields too large to print (over 4096 bytes
      unless ``--all`` is passed);
    * an array of integers, one per byte, is emitted for messages containing
      non-printable or non-UTF-8 bytes.

    Args:
        raw: One decoded journald record.

    Returns:
        The message text, or ``""`` when MESSAGE is missing or null. Byte
        arrays are decoded as UTF-8 with invalid sequences replaced. A list
        that is not a valid byte array falls back to its ``repr``.
    """
    msg = raw.get("MESSAGE")
    if msg is None:
        # A null MESSAGE must not turn into the literal text "None"
        # (str(None)); treat it like a missing message so the caller skips it.
        return ""
    if isinstance(msg, list):
        try:
            return bytes(msg).decode("utf-8", errors="replace")
        except (TypeError, ValueError):
            # Not a list of ints in range(256) -- e.g. a multi-valued field,
            # which journalctl emits as an array of the individual values.
            return repr(msg)
    return str(msg)


def parse(
    lines: Iterator[str],
    source_id: str,
    compiled_patterns: list[tuple[LogPattern, object]],
    ingest_time: str | None = None,
) -> Iterator[RetrievedEntry]:
    """Parse ``journalctl -o json`` output into RetrievedEntry records.

    Each input line is expected to hold one JSON object. A line is skipped
    silently when it:

    * is blank;
    * is not valid JSON, or is JSON but not an object;
    * lacks ``__REALTIME_TIMESTAMP``;
    * yields an empty message.

    Args:
        lines: Raw text lines of journalctl JSON output.
        source_id: Identifier of the log source. When a record carries a
            non-empty ``_HOSTNAME``, the entry's source id becomes
            ``"<source_id>:<hostname>:<unit>"``.
        compiled_patterns: Patterns handed to ``apply_patterns`` for each
            message.
        ingest_time: Ingest timestamp stamped on every entry. If falsy,
            ``now_iso()`` is used, computed once for the whole call.

    Yields:
        One RetrievedEntry per accepted record, in input order.
    """
    ingest_time = ingest_time or now_iso()
    state = SourceState()
    for raw_line in lines:
        raw_line = raw_line.strip()
        if not raw_line:
            continue
        try:
            entry = json.loads(raw_line)
        except json.JSONDecodeError:
            continue
        # Valid JSON need not be an object (e.g. "42" or a quoted string).
        # Membership tests / subscripts on such values would raise TypeError
        # and abort the whole generator.
        if not isinstance(entry, dict) or "__REALTIME_TIMESTAMP" not in entry:
            continue

        text = _extract_message(entry)
        if not text:
            continue

        ts_raw = entry["__REALTIME_TIMESTAMP"]
        ts_iso = epoch_micros_to_iso(ts_raw)

        priority = entry.get("PRIORITY", "")
        # SYSLOG_PRIORITY is looked up by string key; unknown priorities map
        # to None.
        severity = SYSLOG_PRIORITY.get(str(priority))

        hostname = entry.get("_HOSTNAME", "")
        unit = entry.get("_SYSTEMD_UNIT") or entry.get("SYSLOG_IDENTIFIER", "")
        src = f"{source_id}:{hostname}:{unit}" if hostname else source_id

        # The per-source state tracks repeats and timestamp ordering; its
        # sequence counter is read below for the entry id and sequence.
        repeat, out_of_order = state.observe(text, ts_iso)
        matched = apply_patterns(text, compiled_patterns)

        yield RetrievedEntry(
            entry_id=make_entry_id(src, state.sequence, text),
            source_id=src,
            sequence=state.sequence,
            timestamp_raw=ts_raw,
            timestamp_iso=ts_iso,
            ingest_time=ingest_time,
            severity=severity,
            repeat_count=repeat,
            out_of_order=out_of_order,
            matched_patterns=matched,
            text=text,
        )