"""Stage 1: Timeline Reconstructor — pure Python, no ML.""" from __future__ import annotations import hashlib import logging from collections import defaultdict from datetime import datetime, timezone from app.services.diagnose.models import EventCluster, TimelineResult from app.services.search import SearchResult logger = logging.getLogger(__name__) _SEVERITY_ORDER: dict[str | None, int] = { "CRITICAL": 5, "ERROR": 4, "WARN": 3, "WARNING": 3, "INFO": 2, "DEBUG": 1, None: 0, } def _parse_iso(s: str) -> datetime: """Parse ISO 8601 string to UTC-aware datetime.""" dt = datetime.fromisoformat(s) if dt.tzinfo is None: dt = dt.replace(tzinfo=timezone.utc) return dt.astimezone(timezone.utc) def _highest_severity(entries: list[SearchResult]) -> str: """Return the highest severity label across all entries.""" best: str | None = None best_rank = -1 for entry in entries: sev = entry.severity rank = _SEVERITY_ORDER.get(sev, 0) if rank > best_rank: best_rank = rank best = sev # SeverityLabel requires a valid literal; fall back to "UNKNOWN" if None if best is None: return "UNKNOWN" # Normalise WARNING -> WARN for the output type if best == "WARNING": return "WARN" return best def _representative_text(entries: list[SearchResult]) -> str: """Return text of the entry with highest rank; tie-break on longest text.""" if not entries: return "" best = max(entries, key=lambda e: (e.rank, len(e.text))) return best.text def _cluster_id(entry_ids: list[str]) -> str: """Compute a 12-char hex cluster ID from a sorted list of entry IDs.""" payload = ",".join(sorted(entry_ids)).encode() return hashlib.sha1(payload).hexdigest()[:12] # noqa: S324 — not used for security def _build_cluster( cluster_entries: list[SearchResult], gap_before_seconds: float, burst_threshold: int, burst_window_seconds: int, ) -> EventCluster: """Construct an EventCluster from a list of SearchResult entries.""" timestamps = [ _parse_iso(e.timestamp_iso) for e in cluster_entries if e.timestamp_iso is not None ] start_iso: str | None = None end_iso: str | None = None duration_seconds = 0.0 if timestamps: ts_min = min(timestamps) ts_max = max(timestamps) start_iso = ts_min.isoformat() end_iso = ts_max.isoformat() duration_seconds = (ts_max - ts_min).total_seconds() entry_ids = [e.entry_id for e in cluster_entries] burst = ( len(cluster_entries) >= burst_threshold and duration_seconds <= burst_window_seconds ) return EventCluster( cluster_id=_cluster_id(entry_ids), entries=tuple(entry_ids), start_iso=start_iso, end_iso=end_iso, duration_seconds=duration_seconds, source_ids=tuple(sorted(set(e.source_id for e in cluster_entries))), pattern_tags=tuple( sorted(set(tag for e in cluster_entries for tag in e.matched_patterns)) ), severity=_highest_severity(cluster_entries), # type: ignore[arg-type] burst=burst, gap_before_seconds=gap_before_seconds, representative_text=_representative_text(cluster_entries), ) class TimelineReconstructor: """Reconstruct a structured timeline of event clusters from log entries. Pure Python — no ML or LLM calls. Designed as Stage 1 of the multi-agent diagnose pipeline. """ def __init__( self, cluster_window_seconds: int = 30, burst_threshold: int = 10, burst_window_seconds: int = 5, ) -> None: self._cluster_window = cluster_window_seconds self._burst_threshold = burst_threshold self._burst_window = burst_window_seconds def reconstruct(self, entries: list[SearchResult]) -> TimelineResult: """Build a TimelineResult from a flat list of SearchResult entries. Entries are sorted by timestamp_iso ascending; entries without a timestamp are appended at the end and always join the current cluster. """ if not entries: return TimelineResult( clusters=(), total_entries=0, window_start=None, window_end=None, gap_count=0, burst_count=0, dominant_sources=(), ) # Sort: timestamped entries first (ascending), then None-timestamp entries def _sort_key(e: SearchResult) -> tuple[int, str]: if e.timestamp_iso is None: return (1, "") return (0, e.timestamp_iso) sorted_entries = sorted(entries, key=_sort_key) # Cluster using sliding window anchored to the first entry in each cluster raw_clusters: list[list[SearchResult]] = [] current: list[SearchResult] = [] cluster_anchor: datetime | None = None for entry in sorted_entries: if not current: # Start first cluster current.append(entry) if entry.timestamp_iso is not None: cluster_anchor = _parse_iso(entry.timestamp_iso) continue if entry.timestamp_iso is None: # No timestamp — always joins the current cluster current.append(entry) continue entry_dt = _parse_iso(entry.timestamp_iso) if cluster_anchor is None: # Current cluster has no anchor yet — set it, stay in cluster cluster_anchor = entry_dt current.append(entry) continue delta = (entry_dt - cluster_anchor).total_seconds() if delta > self._cluster_window: # Start a new cluster raw_clusters.append(current) current = [entry] cluster_anchor = entry_dt else: current.append(entry) if current: raw_clusters.append(current) # Build EventCluster objects clusters: list[EventCluster] = [] prev_end_iso: str | None = None for cluster_entries in raw_clusters: gap_before = 0.0 if prev_end_iso is not None: # Find start of this cluster ts_list = [ _parse_iso(e.timestamp_iso) for e in cluster_entries if e.timestamp_iso is not None ] if ts_list: this_start = min(ts_list) prev_end = _parse_iso(prev_end_iso) gap_before = (this_start - prev_end).total_seconds() cluster = _build_cluster( cluster_entries, gap_before_seconds=gap_before, burst_threshold=self._burst_threshold, burst_window_seconds=self._burst_window, ) clusters.append(cluster) if cluster.end_iso is not None: prev_end_iso = cluster.end_iso clusters_tuple = tuple(clusters) # Dominant sources: sort by total entry count descending source_counts: dict[str, int] = defaultdict(int) for entry in entries: source_counts[entry.source_id] += 1 dominant_sources = tuple( src for src, _ in sorted(source_counts.items(), key=lambda kv: -kv[1]) ) window_start = clusters[0].start_iso if clusters else None window_end = clusters[-1].end_iso if clusters else None gap_count = sum(1 for c in clusters if c.gap_before_seconds > 30) burst_count = sum(1 for c in clusters if c.burst) return TimelineResult( clusters=clusters_tuple, total_entries=len(entries), window_start=window_start, window_end=window_end, gap_count=gap_count, burst_count=burst_count, dominant_sources=dominant_sources, )