"""Stage 1: Timeline Reconstructor — pure Python, no ML."""

from __future__ import annotations

import hashlib
import logging
from collections import defaultdict
from datetime import datetime, timezone

from app.services.diagnose.models import EventCluster, TimelineResult
from app.services.search import SearchResult

logger = logging.getLogger(__name__)

# Rank of each recognised severity label; a higher number is more severe.
# "WARN" and "WARNING" share a rank; a missing severity (None) ranks lowest.
_SEVERITY_ORDER: dict[str | None, int] = {
    "CRITICAL": 5,
    "ERROR": 4,
    "WARN": 3,
    "WARNING": 3,
    "INFO": 2,
    "DEBUG": 1,
    None: 0,
}


def _parse_iso(s: str) -> datetime | None:
    """Parse an ISO 8601 string into a UTC-aware datetime.

    Naive timestamps (no offset) are assumed to be UTC. A trailing ``Z``
    designator is accepted even on interpreters whose
    ``datetime.fromisoformat`` rejects it.

    Returns:
        The parsed instant converted to UTC, or ``None`` (after logging a
        warning) when the string cannot be parsed.
    """
    # Try the string as-is first so anything that parsed before still parses
    # identically; only fall back to rewriting "Z" when the native parse fails.
    candidates = [s]
    if s.endswith("Z"):
        candidates.append(s[:-1] + "+00:00")

    dt: datetime | None = None
    for candidate in candidates:
        try:
            dt = datetime.fromisoformat(candidate)
        except ValueError:
            continue
        break

    if dt is None:
        logger.warning("Unparseable timestamp in log entry, treating as None: %r", s)
        return None

    if dt.tzinfo is None:
        logger.debug("Naive timestamp treated as UTC: %s", s)
        dt = dt.replace(tzinfo=timezone.utc)
    return dt.astimezone(timezone.utc)


def _parsed_timestamps(entries: list[SearchResult]) -> list[datetime]:
    """Return the UTC datetimes of every entry whose timestamp parses.

    Entries with a ``None`` or unparseable ``timestamp_iso`` are skipped.
    """
    parsed: list[datetime] = []
    for entry in entries:
        if entry.timestamp_iso is None:
            continue
        ts = _parse_iso(entry.timestamp_iso)
        if ts is not None:
            parsed.append(ts)
    return parsed


def _sort_key(e: SearchResult) -> tuple[int, str]:
    """Sort key: timestamped entries first (chronological), then the rest.

    The key is built from the UTC-normalised instant rather than the raw
    string, because raw ISO strings with different UTC offsets do not sort
    chronologically. Entries whose timestamp is ``None`` or unparseable share
    the trailing ``(1, "")`` key, so a stable sort keeps their input order.
    """
    if e.timestamp_iso is None:
        return (1, "")
    parsed = _parse_iso(e.timestamp_iso)
    if parsed is None:
        # Grouping treats a malformed timestamp like a missing one; sort it
        # the same way so it cannot land between valid timestamps.
        return (1, "")
    # All values are UTC, so isoformat() yields fixed-width fields; "+" sorts
    # before ".", so a zero-microsecond value precedes a fractional one within
    # the same second. String order therefore equals chronological order.
    return (0, parsed.isoformat())


def _highest_severity(entries: list[SearchResult]) -> str:
    """Return the highest severity label across all entries.

    Labels are ranked via ``_SEVERITY_ORDER``; labels not present in that
    table rank 0, the same as ``None``. On equal rank the earliest entry wins.

    Returns:
        ``"UNKNOWN"`` when the winning severity is ``None`` (including when
        ``entries`` is empty), ``"WARN"`` when it is ``"WARNING"``, otherwise
        the winning label unchanged.
    """
    best: str | None = None
    best_rank = -1
    for entry in entries:
        sev = entry.severity
        rank = _SEVERITY_ORDER.get(sev, 0)
        if rank > best_rank:
            best_rank = rank
            best = sev
    # SeverityLabel requires a valid literal; fall back to "UNKNOWN" if None
    if best is None:
        return "UNKNOWN"
    # Normalise WARNING -> WARN for the output type
    if best == "WARNING":
        return "WARN"
    return best


def _representative_text(entries: list[SearchResult]) -> str:
    """Return text of the entry with highest rank; tie-break on longest text.

    Returns an empty string when ``entries`` is empty.
    """
    if not entries:
        return ""
    best = max(entries, key=lambda e: (e.rank, len(e.text)))
    return best.text


def _cluster_id(entry_ids: list[str]) -> str:
    """Compute a 12-char hex cluster ID from a list of entry IDs.

    The IDs are sorted before hashing, so the result does not depend on the
    order in which they are supplied.
    """
    payload = ",".join(sorted(entry_ids)).encode()
    return hashlib.sha1(payload).hexdigest()[:12]  # noqa: S324 — not used for security


def _make_event_cluster(
    cluster_entries: list[SearchResult],
    gap_before_seconds: float,
    burst_threshold: int,
    burst_window_seconds: int,
) -> EventCluster:
    """Construct an EventCluster from a list of SearchResult entries.

    Args:
        cluster_entries: Entries belonging to the cluster.
        gap_before_seconds: Gap to the previous cluster, stored verbatim.
        burst_threshold: Minimum entry count for the cluster to be a burst.
        burst_window_seconds: Maximum cluster duration for it to be a burst.

    Returns:
        The cluster. ``start_iso`` / ``end_iso`` are the earliest / latest
        parseable timestamps as UTC ISO strings, or ``None`` when no entry
        has a parseable timestamp (``duration_seconds`` is then ``0.0``).
    """
    timestamps = _parsed_timestamps(cluster_entries)

    start_iso: str | None = None
    end_iso: str | None = None
    duration_seconds = 0.0
    if timestamps:
        ts_min = min(timestamps)
        ts_max = max(timestamps)
        start_iso = ts_min.isoformat()
        end_iso = ts_max.isoformat()
        duration_seconds = (ts_max - ts_min).total_seconds()

    entry_ids = [e.entry_id for e in cluster_entries]
    burst = (
        len(cluster_entries) >= burst_threshold
        and duration_seconds <= burst_window_seconds
    )

    return EventCluster(
        cluster_id=_cluster_id(entry_ids),
        entries=tuple(entry_ids),
        start_iso=start_iso,
        end_iso=end_iso,
        duration_seconds=duration_seconds,
        source_ids=tuple(sorted({e.source_id for e in cluster_entries})),
        pattern_tags=tuple(
            sorted({tag for e in cluster_entries for tag in e.matched_patterns})
        ),
        severity=_highest_severity(cluster_entries),  # type: ignore[arg-type]  # SeverityLabel is a Literal; _highest_severity returns a compatible str
        burst=burst,
        gap_before_seconds=gap_before_seconds,
        representative_text=_representative_text(cluster_entries),
    )


class TimelineReconstructor:
    """Reconstruct a structured timeline of event clusters from log entries.

    Pure Python — no ML or LLM calls. Designed as Stage 1 of the multi-agent
    diagnose pipeline.
    """

    def __init__(
        self,
        cluster_window_seconds: int = 30,
        burst_threshold: int = 10,
        burst_window_seconds: int = 5,
        gap_significance_seconds: int = 30,
    ) -> None:
        """Store the clustering parameters.

        Args:
            cluster_window_seconds: Maximum distance, in seconds, between a
                cluster's first timestamped entry and any later entry that
                still joins that cluster.
            burst_threshold: Minimum entry count for a cluster to be a burst.
            burst_window_seconds: Maximum duration for a cluster to be a burst.
            gap_significance_seconds: A cluster's ``gap_before_seconds`` must
                exceed this to be counted in ``gap_count``.
        """
        self._cluster_window = cluster_window_seconds
        self._burst_threshold = burst_threshold
        self._burst_window = burst_window_seconds
        self._gap_significance_seconds: int = gap_significance_seconds

    def _sort_entries(self, entries: list[SearchResult]) -> list[SearchResult]:
        """Sort entries: timestamped first (chronological), then the rest.

        Returns a new list; the input list is not modified.
        """
        return sorted(entries, key=_sort_key)

    def _group_into_raw_clusters(
        self, sorted_entries: list[SearchResult]
    ) -> list[list[SearchResult]]:
        """Group sorted entries into time-window clusters.

        Each cluster is anchored at its first parseable timestamp. An entry
        starts a new cluster only when it lies more than the configured
        cluster window after the current anchor; entries without a usable
        timestamp always join the current cluster.
        """
        raw_clusters: list[list[SearchResult]] = []
        current: list[SearchResult] = []
        cluster_anchor: datetime | None = None

        for entry in sorted_entries:
            if not current:
                current.append(entry)
                if entry.timestamp_iso is not None:
                    cluster_anchor = _parse_iso(entry.timestamp_iso)
                continue

            if entry.timestamp_iso is None:
                # No timestamp — always joins the current cluster
                current.append(entry)
                continue

            entry_dt = _parse_iso(entry.timestamp_iso)
            if entry_dt is None:
                # Malformed timestamp — treat same as None: join current cluster
                current.append(entry)
                continue

            if cluster_anchor is None:
                # Current cluster has no anchor yet — set it, stay in cluster
                cluster_anchor = entry_dt
                current.append(entry)
                continue

            delta = (entry_dt - cluster_anchor).total_seconds()
            if delta > self._cluster_window:
                raw_clusters.append(current)
                current = [entry]
                cluster_anchor = entry_dt
            else:
                current.append(entry)

        if current:
            raw_clusters.append(current)
        return raw_clusters

    def _build_cluster(
        self,
        cluster_entries: list[SearchResult],
        prev_end_iso: str | None,
    ) -> EventCluster:
        """Build an EventCluster from a list of SearchResult entries.

        Args:
            cluster_entries: Entries belonging to the cluster.
            prev_end_iso: ``end_iso`` of the preceding cluster, or ``None``.

        Returns:
            The cluster, with ``gap_before_seconds`` set to the distance from
            ``prev_end_iso`` to this cluster's earliest parseable timestamp.
            The gap is ``0.0`` when ``prev_end_iso`` is ``None`` or
            unparseable, or when no entry has a parseable timestamp.
        """
        gap_before = 0.0
        if prev_end_iso is not None:
            ts_list = _parsed_timestamps(cluster_entries)
            if ts_list:
                this_start = min(ts_list)
                prev_end = _parse_iso(prev_end_iso)
                if prev_end is not None:
                    gap_before = (this_start - prev_end).total_seconds()

        return _make_event_cluster(
            cluster_entries,
            gap_before_seconds=gap_before,
            burst_threshold=self._burst_threshold,
            burst_window_seconds=self._burst_window,
        )

    def _dominant_sources_tuple(self, entries: list[SearchResult]) -> tuple[str, ...]:
        """Return source_ids sorted by total entry count descending.

        Sources with equal counts keep the order of first appearance.
        """
        source_counts: dict[str, int] = defaultdict(int)
        for entry in entries:
            source_counts[entry.source_id] += 1
        return tuple(
            src for src, _ in sorted(source_counts.items(), key=lambda kv: -kv[1])
        )

    def reconstruct(self, entries: list[SearchResult]) -> TimelineResult:
        """Build a structured timeline from a flat list of log entries.

        Args:
            entries: Log entries in any order.

        Returns:
            A TimelineResult holding the clusters in chronological order, the
            overall window (start of the first cluster, end of the last), the
            number of clusters whose preceding gap exceeds the configured
            significance threshold, the number of burst clusters, and the
            source IDs ordered by entry count. An empty input yields an empty
            result.
        """
        if not entries:
            return TimelineResult(
                clusters=(),
                total_entries=0,
                window_start=None,
                window_end=None,
                gap_count=0,
                burst_count=0,
                dominant_sources=(),
            )

        sorted_entries = self._sort_entries(entries)
        raw_clusters = self._group_into_raw_clusters(sorted_entries)

        clusters: list[EventCluster] = []
        prev_end: str | None = None
        for raw in raw_clusters:
            c = self._build_cluster(raw, prev_end)
            clusters.append(c)
            prev_end = c.end_iso

        clusters_tuple = tuple(clusters)
        gap_count = sum(
            1
            for c in clusters_tuple
            if c.gap_before_seconds > self._gap_significance_seconds
        )

        return TimelineResult(
            clusters=clusters_tuple,
            total_entries=len(entries),
            window_start=clusters_tuple[0].start_iso if clusters_tuple else None,
            window_end=clusters_tuple[-1].end_iso if clusters_tuple else None,
            gap_count=gap_count,
            burst_count=sum(1 for c in clusters_tuple if c.burst),
            dominant_sources=self._dominant_sources_tuple(entries),
        )