From 65bbc438de8cac0a026bbb71b213ccb180b68385 Mon Sep 17 00:00:00 2001 From: pyr0ball Date: Mon, 25 May 2026 13:22:18 -0700 Subject: [PATCH] refactor: split TimelineReconstructor.reconstruct into helpers, fix magic number + error handling - Add gap_significance_seconds constructor param (default 30) to replace hardcoded magic number in gap_count computation - _parse_iso now returns datetime | None with try/except on ValueError; all callers handle None return by treating malformed timestamps as absent - Extract reconstruct into four private helpers: _sort_entries, _group_into_raw_clusters, _build_cluster, _dominant_sources_tuple - Promote _sort_key to module-level function (was nested inside reconstruct) - Rename old module-level _build_cluster to _make_event_cluster to avoid name collision with new instance method - Add explanatory comment to type: ignore[arg-type] at _highest_severity call site - Black-formatted --- app/services/diagnose/timeline.py | 169 +++++++++++++++++------------- 1 file changed, 98 insertions(+), 71 deletions(-) diff --git a/app/services/diagnose/timeline.py b/app/services/diagnose/timeline.py index 5870990..3d557dc 100644 --- a/app/services/diagnose/timeline.py +++ b/app/services/diagnose/timeline.py @@ -1,4 +1,5 @@ """Stage 1: Timeline Reconstructor — pure Python, no ML.""" + from __future__ import annotations import hashlib @@ -22,14 +23,26 @@ _SEVERITY_ORDER: dict[str | None, int] = { } -def _parse_iso(s: str) -> datetime: - """Parse ISO 8601 string to UTC-aware datetime.""" - dt = datetime.fromisoformat(s) +def _parse_iso(s: str) -> datetime | None: + """Parse ISO 8601 string to UTC-aware datetime. Returns None on parse failure.""" + try: + dt = datetime.fromisoformat(s) + except ValueError: + logger.warning("Unparseable timestamp in log entry, treating as None: %r", s) + return None if dt.tzinfo is None: + logger.debug("Naive timestamp treated as UTC: %s", s) dt = dt.replace(tzinfo=timezone.utc) return dt.astimezone(timezone.utc) +def _sort_key(e: SearchResult) -> tuple[int, str]: + """Sort key: timestamped entries first (ascending), then None-timestamp entries.""" + if e.timestamp_iso is None: + return (1, "") + return (0, e.timestamp_iso) + + def _highest_severity(entries: list[SearchResult]) -> str: """Return the highest severity label across all entries.""" best: str | None = None @@ -63,7 +76,7 @@ def _cluster_id(entry_ids: list[str]) -> str: return hashlib.sha1(payload).hexdigest()[:12] # noqa: S324 — not used for security -def _build_cluster( +def _make_event_cluster( cluster_entries: list[SearchResult], gap_before_seconds: float, burst_threshold: int, @@ -71,9 +84,11 @@ def _build_cluster( ) -> EventCluster: """Construct an EventCluster from a list of SearchResult entries.""" timestamps = [ - _parse_iso(e.timestamp_iso) + ts for e in cluster_entries if e.timestamp_iso is not None + for ts in (_parse_iso(e.timestamp_iso),) + if ts is not None ] start_iso: str | None = None @@ -103,7 +118,7 @@ def _build_cluster( pattern_tags=tuple( sorted(set(tag for e in cluster_entries for tag in e.matched_patterns)) ), - severity=_highest_severity(cluster_entries), # type: ignore[arg-type] + severity=_highest_severity(cluster_entries), # type: ignore[arg-type] # SeverityLabel is a Literal; _highest_severity returns a compatible str burst=burst, gap_before_seconds=gap_before_seconds, representative_text=_representative_text(cluster_entries), @@ -122,44 +137,27 @@ class TimelineReconstructor: cluster_window_seconds: int = 30, burst_threshold: int = 10, burst_window_seconds: int = 5, + gap_significance_seconds: int = 30, ) -> None: self._cluster_window = cluster_window_seconds self._burst_threshold = burst_threshold self._burst_window = burst_window_seconds + self._gap_significance_seconds: int = gap_significance_seconds - def reconstruct(self, entries: list[SearchResult]) -> TimelineResult: - """Build a TimelineResult from a flat list of SearchResult entries. + def _sort_entries(self, entries: list[SearchResult]) -> list[SearchResult]: + """Sort entries: timestamped first (ascending), then None-timestamp entries.""" + return sorted(entries, key=_sort_key) - Entries are sorted by timestamp_iso ascending; entries without a - timestamp are appended at the end and always join the current cluster. - """ - if not entries: - return TimelineResult( - clusters=(), - total_entries=0, - window_start=None, - window_end=None, - gap_count=0, - burst_count=0, - dominant_sources=(), - ) - - # Sort: timestamped entries first (ascending), then None-timestamp entries - def _sort_key(e: SearchResult) -> tuple[int, str]: - if e.timestamp_iso is None: - return (1, "") - return (0, e.timestamp_iso) - - sorted_entries = sorted(entries, key=_sort_key) - - # Cluster using sliding window anchored to the first entry in each cluster + def _group_into_raw_clusters( + self, sorted_entries: list[SearchResult] + ) -> list[list[SearchResult]]: + """Group sorted entries into time-window clusters.""" raw_clusters: list[list[SearchResult]] = [] current: list[SearchResult] = [] cluster_anchor: datetime | None = None for entry in sorted_entries: if not current: - # Start first cluster current.append(entry) if entry.timestamp_iso is not None: cluster_anchor = _parse_iso(entry.timestamp_iso) @@ -172,6 +170,11 @@ class TimelineReconstructor: entry_dt = _parse_iso(entry.timestamp_iso) + if entry_dt is None: + # Malformed timestamp — treat same as None: join current cluster + current.append(entry) + continue + if cluster_anchor is None: # Current cluster has no anchor yet — set it, stay in cluster cluster_anchor = entry_dt @@ -180,7 +183,6 @@ class TimelineReconstructor: delta = (entry_dt - cluster_anchor).total_seconds() if delta > self._cluster_window: - # Start a new cluster raw_clusters.append(current) current = [entry] cluster_anchor = entry_dt @@ -190,56 +192,81 @@ class TimelineReconstructor: if current: raw_clusters.append(current) - # Build EventCluster objects - clusters: list[EventCluster] = [] - prev_end_iso: str | None = None + return raw_clusters - for cluster_entries in raw_clusters: - gap_before = 0.0 - if prev_end_iso is not None: - # Find start of this cluster - ts_list = [ - _parse_iso(e.timestamp_iso) - for e in cluster_entries - if e.timestamp_iso is not None - ] - if ts_list: - this_start = min(ts_list) - prev_end = _parse_iso(prev_end_iso) + def _build_cluster( + self, + cluster_entries: list[SearchResult], + prev_end_iso: str | None, + ) -> EventCluster: + """Build an EventCluster from a list of SearchResult entries.""" + gap_before = 0.0 + if prev_end_iso is not None: + ts_list = [ + ts + for e in cluster_entries + if e.timestamp_iso is not None + for ts in (_parse_iso(e.timestamp_iso),) + if ts is not None + ] + if ts_list: + this_start = min(ts_list) + prev_end = _parse_iso(prev_end_iso) + if prev_end is not None: gap_before = (this_start - prev_end).total_seconds() - cluster = _build_cluster( - cluster_entries, - gap_before_seconds=gap_before, - burst_threshold=self._burst_threshold, - burst_window_seconds=self._burst_window, - ) - clusters.append(cluster) - if cluster.end_iso is not None: - prev_end_iso = cluster.end_iso + return _make_event_cluster( + cluster_entries, + gap_before_seconds=gap_before, + burst_threshold=self._burst_threshold, + burst_window_seconds=self._burst_window, + ) - clusters_tuple = tuple(clusters) - - # Dominant sources: sort by total entry count descending + def _dominant_sources_tuple(self, entries: list[SearchResult]) -> tuple[str, ...]: + """Return source_ids sorted by total entry count descending.""" source_counts: dict[str, int] = defaultdict(int) for entry in entries: source_counts[entry.source_id] += 1 - dominant_sources = tuple( - src - for src, _ in sorted(source_counts.items(), key=lambda kv: -kv[1]) + return tuple( + src for src, _ in sorted(source_counts.items(), key=lambda kv: -kv[1]) ) - window_start = clusters[0].start_iso if clusters else None - window_end = clusters[-1].end_iso if clusters else None - gap_count = sum(1 for c in clusters if c.gap_before_seconds > 30) - burst_count = sum(1 for c in clusters if c.burst) + def reconstruct(self, entries: list[SearchResult]) -> TimelineResult: + """Build a structured timeline from a flat list of log entries.""" + if not entries: + return TimelineResult( + clusters=(), + total_entries=0, + window_start=None, + window_end=None, + gap_count=0, + burst_count=0, + dominant_sources=(), + ) + + sorted_entries = self._sort_entries(entries) + raw_clusters = self._group_into_raw_clusters(sorted_entries) + + clusters: list[EventCluster] = [] + prev_end: str | None = None + for raw in raw_clusters: + c = self._build_cluster(raw, prev_end) + clusters.append(c) + prev_end = c.end_iso + + clusters_tuple = tuple(clusters) + gap_count = sum( + 1 + for c in clusters_tuple + if c.gap_before_seconds > self._gap_significance_seconds + ) return TimelineResult( clusters=clusters_tuple, total_entries=len(entries), - window_start=window_start, - window_end=window_end, + window_start=clusters_tuple[0].start_iso if clusters_tuple else None, + window_end=clusters_tuple[-1].end_iso if clusters_tuple else None, gap_count=gap_count, - burst_count=burst_count, - dominant_sources=dominant_sources, + burst_count=sum(1 for c in clusters_tuple if c.burst), + dominant_sources=self._dominant_sources_tuple(entries), )