"""Shared utilities for all Turnstone log ingestors.""" from __future__ import annotations import hashlib import re from datetime import datetime, timezone from pathlib import Path from typing import Iterator import yaml from app.services.models import LogPattern, RetrievedEntry SYSLOG_PRIORITY: dict[str, str] = { "0": "EMERGENCY", "1": "ALERT", "2": "CRITICAL", "3": "ERROR", "4": "WARN", "5": "NOTICE", "6": "INFO", "7": "DEBUG", } _SEVERITY_RE = re.compile( r"\b(EMERGENCY|ALERT|CRITICAL|ERROR|WARN(?:ING)?|NOTICE|INFO|DEBUG)\b", re.IGNORECASE, ) def load_patterns(path: Path) -> list[LogPattern]: if not path.exists(): return [] raw = yaml.safe_load(path.read_text()) return [ LogPattern( name=p["name"], pattern=p["pattern"], severity=p["severity"], description=p["description"], ) for p in raw.get("patterns", []) ] def _compile(patterns: list[LogPattern]) -> list[tuple[LogPattern, re.Pattern]]: return [(p, re.compile(p.pattern, re.IGNORECASE)) for p in patterns] def load_compiled_patterns(path: Path) -> list[tuple[LogPattern, object]]: """Load and compile patterns from a YAML file. Public API over the private _compile.""" return _compile(load_patterns(path)) def apply_patterns( text: str, compiled: list[tuple[LogPattern, re.Pattern]], ) -> tuple[str, ...]: return tuple(p.name for p, rx in compiled if rx.search(text)) def detect_severity(text: str) -> str | None: m = _SEVERITY_RE.search(text) return m.group(0).upper() if m else None def epoch_micros_to_iso(micros: str | int) -> str: ts = int(micros) / 1_000_000 return datetime.fromtimestamp(ts, tz=timezone.utc).isoformat() def epoch_float_to_iso(ts: float) -> str: return datetime.fromtimestamp(ts, tz=timezone.utc).isoformat() def now_iso() -> str: return datetime.now(tz=timezone.utc).isoformat() def make_entry_id(source_id: str, sequence: int, text: str) -> str: key = f"{source_id}:{sequence}:{text[:64]}" return hashlib.sha1(key.encode()).hexdigest() class SourceState: """Tracks per-source state for repeat and out-of-order detection.""" def __init__(self) -> None: self._last_text: str = "" self._last_iso: str | None = None self._repeat: int = 0 self.sequence: int = 0 def observe( self, text: str, timestamp_iso: str | None ) -> tuple[int, bool]: """Return (repeat_count, out_of_order) for this entry.""" self.sequence += 1 if text == self._last_text: self._repeat += 1 else: self._repeat = 1 self._last_text = text out_of_order = False if timestamp_iso and self._last_iso: out_of_order = timestamp_iso < self._last_iso if timestamp_iso: self._last_iso = timestamp_iso return self._repeat, out_of_order