"""Shared utilities for all Turnstone log ingestors."""
|
|
from __future__ import annotations
|
|
|
|
import hashlib
|
|
import re
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Iterator
|
|
|
|
import yaml
|
|
|
|
from app.services.models import LogPattern, RetrievedEntry
|
|
|
|
# Syslog priority digit (kept as a string) -> severity label.
SYSLOG_PRIORITY: dict[str, str] = {
    "0": "EMERGENCY",
    "1": "ALERT",
    "2": "CRITICAL",
    "3": "ERROR",
    "4": "WARN",
    "5": "NOTICE",
    "6": "INFO",
    "7": "DEBUG",
}
|
|
|
|
# Whole-word, case-insensitive match for a severity keyword; "WARN" and
# "WARNING" are both accepted.
_SEVERITY_RE = re.compile(
    r"\b(EMERGENCY|ALERT|CRITICAL|ERROR|WARN(?:ING)?|NOTICE|INFO|DEBUG)\b",
    flags=re.IGNORECASE,
)
|
|
|
|
|
|
def load_patterns(path: Path) -> list[LogPattern]:
    """Load log-pattern definitions from a YAML file.

    The document is expected to be a mapping with a top-level ``patterns``
    list. Each item must provide the ``name``, ``pattern``, ``severity`` and
    ``description`` keys, which are passed to ``LogPattern`` unchanged.

    Args:
        path: Location of the YAML pattern file.

    Returns:
        One ``LogPattern`` per item, in file order. The list is empty when
        the file does not exist, when the document is empty, or when the
        ``patterns`` key is missing or null.

    Raises:
        KeyError: If an item lacks one of the four required keys.

    Errors raised by ``yaml.safe_load`` for malformed YAML are not caught
    here and propagate to the caller.
    """
    if not path.exists():
        return []

    # Decode explicitly as UTF-8 so the result does not depend on the
    # platform's locale encoding.
    # yaml.safe_load returns None for an empty or comment-only document;
    # treat that as "no patterns" instead of failing on None.get().
    document = yaml.safe_load(path.read_text(encoding="utf-8")) or {}

    # A bare "patterns:" key parses to None, which is not iterable.
    entries = document.get("patterns") or []

    return [
        LogPattern(
            name=entry["name"],
            pattern=entry["pattern"],
            severity=entry["severity"],
            description=entry["description"],
        )
        for entry in entries
    ]
|
|
|
|
|
|
def _compile(patterns: list[LogPattern]) -> list[tuple[LogPattern, re.Pattern]]:
    """Pair every pattern with its compiled, case-insensitive regex.

    Args:
        patterns: Pattern definitions; each one's ``pattern`` attribute is
            compiled with ``re.IGNORECASE``.

    Returns:
        ``(pattern, compiled_regex)`` tuples in the same order as the input.
    """
    compiled: list[tuple[LogPattern, re.Pattern]] = []
    for entry in patterns:
        regex = re.compile(entry.pattern, re.IGNORECASE)
        compiled.append((entry, regex))
    return compiled
|
|
|
|
|
|
def load_compiled_patterns(path: Path) -> list[tuple[LogPattern, re.Pattern]]:
    """Load patterns from a YAML file and compile their regexes.

    Public API over the private ``_compile``: the file is read with
    ``load_patterns`` and the result is handed to ``_compile``.

    Args:
        path: Location of the YAML pattern file.

    Returns:
        ``(pattern, compiled_regex)`` tuples as produced by ``_compile``.
        The element type matches the ``compiled`` parameter of
        ``apply_patterns``, so the result can be passed to it directly.
    """
    return _compile(load_patterns(path))
|
|
|
|
|
|
def apply_patterns(
    text: str,
    compiled: list[tuple[LogPattern, re.Pattern]],
) -> tuple[str, ...]:
    """Return the names of all patterns whose regex is found in *text*.

    Args:
        text: The log text to scan.
        compiled: ``(pattern, compiled_regex)`` pairs to test against *text*.

    Returns:
        The ``name`` of each matching pattern, in the order the pairs were
        given. Empty when nothing matches.
    """
    matched_names = []
    for entry, regex in compiled:
        # search() rather than match(): a hit anywhere in the text counts.
        if regex.search(text):
            matched_names.append(entry.name)
    return tuple(matched_names)
|
|
|
|
|
|
def detect_severity(text: str) -> str | None:
    """Return the first severity keyword found in *text*, upper-cased.

    The keyword is located with ``_SEVERITY_RE``. The matched text is
    returned as written apart from upper-casing, so an input containing
    "warning" yields "WARNING" rather than "WARN".

    Args:
        text: The log text to scan.

    Returns:
        The upper-cased keyword, or ``None`` when no keyword is present.
    """
    found = _SEVERITY_RE.search(text)
    if found is None:
        return None
    return found.group(0).upper()
|
|
|
|
|
|
def epoch_micros_to_iso(micros: str | int) -> str:
    """Convert microseconds since the Unix epoch to a UTC ISO-8601 string.

    Args:
        micros: Microsecond count, as an ``int`` or a string accepted by
            ``int()``.

    Returns:
        The timezone-aware UTC timestamp formatted with
        ``datetime.isoformat()``.
    """
    seconds = int(micros) / 1_000_000
    moment = datetime.fromtimestamp(seconds, tz=timezone.utc)
    return moment.isoformat()
|
|
|
|
|
|
def epoch_float_to_iso(ts: float) -> str:
    """Convert seconds since the Unix epoch to a UTC ISO-8601 string.

    Args:
        ts: Seconds since the epoch; may carry a fractional part.

    Returns:
        The timezone-aware UTC timestamp formatted with
        ``datetime.isoformat()``.
    """
    moment = datetime.fromtimestamp(ts, tz=timezone.utc)
    return moment.isoformat()
|
|
|
|
|
|
def now_iso() -> str:
    """Return the current time as a UTC ISO-8601 string."""
    current = datetime.now(timezone.utc)
    return current.isoformat()
|
|
|
|
|
|
def make_entry_id(source_id: str, sequence: int, text: str) -> str:
    """Derive a deterministic hex id for a log entry.

    The id is the SHA-1 hex digest of ``source_id``, ``sequence`` and the
    first 64 characters of ``text`` joined with colons. Text beyond the
    64th character does not influence the result.

    Args:
        source_id: Identifier of the source the entry came from.
        sequence: Sequence number of the entry.
        text: Entry text; only its first 64 characters are hashed.

    Returns:
        The hexadecimal SHA-1 digest of the composed key.
    """
    hasher = hashlib.sha1()
    hasher.update(f"{source_id}:{sequence}:{text[:64]}".encode())
    return hasher.hexdigest()
|
|
|
|
|
|
class SourceState:
    """Tracks per-source state for repeat and out-of-order detection."""

    def __init__(self) -> None:
        # Number of entries observed so far.
        self.sequence: int = 0
        # Length of the current run of identical consecutive texts.
        self._repeat: int = 0
        # Text and timestamp remembered from earlier observe() calls.
        self._last_text: str = ""
        self._last_iso: str | None = None

    def observe(
        self, text: str, timestamp_iso: str | None
    ) -> tuple[int, bool]:
        """Return (repeat_count, out_of_order) for this entry.

        Args:
            text: The entry's text, compared for equality with the
                previous entry's text.
            timestamp_iso: The entry's timestamp string, or a falsy value
                when it has none.

        Returns:
            ``repeat_count`` is 1 for a text that differs from the previous
            one and grows by one for each identical consecutive text.
            ``out_of_order`` is True when *timestamp_iso* compares (as a
            string) lower than the last remembered timestamp.
        """
        self.sequence += 1

        if text != self._last_text:
            # A new text starts a fresh run.
            self._repeat = 1
            self._last_text = text
        else:
            self._repeat += 1

        if not timestamp_iso:
            # Without a timestamp nothing is compared, and the remembered
            # timestamp is left untouched.
            return self._repeat, False

        previous_iso = self._last_iso
        # The remembered timestamp always moves to the newest one seen,
        # even when that one is out of order.
        self._last_iso = timestamp_iso

        if not previous_iso:
            return self._repeat, False
        return self._repeat, timestamp_iso < previous_iso
|