refactor: split TimelineReconstructor.reconstruct into helpers, fix magic number + error handling

- Add gap_significance_seconds constructor param (default 30) to replace hardcoded magic number in gap_count computation
- _parse_iso now returns datetime | None with try/except on ValueError; all callers handle None return by treating malformed timestamps as absent
- Extract reconstruct into four private helpers: _sort_entries, _group_into_raw_clusters, _build_cluster, _dominant_sources_tuple
- Promote _sort_key to module-level function (was nested inside reconstruct)
- Rename old module-level _build_cluster to _make_event_cluster to avoid name collision with new instance method
- Add explanatory comment to type: ignore[arg-type] at _highest_severity call site
- Black-formatted
This commit is contained in:
pyr0ball 2026-05-25 13:22:18 -07:00
parent a6e65ec95d
commit 65bbc438de

View file

@ -1,4 +1,5 @@
"""Stage 1: Timeline Reconstructor — pure Python, no ML.""" """Stage 1: Timeline Reconstructor — pure Python, no ML."""
from __future__ import annotations from __future__ import annotations
import hashlib import hashlib
@ -22,14 +23,26 @@ _SEVERITY_ORDER: dict[str | None, int] = {
} }
def _parse_iso(s: str) -> datetime: def _parse_iso(s: str) -> datetime | None:
"""Parse ISO 8601 string to UTC-aware datetime.""" """Parse ISO 8601 string to UTC-aware datetime. Returns None on parse failure."""
dt = datetime.fromisoformat(s) try:
dt = datetime.fromisoformat(s)
except ValueError:
logger.warning("Unparseable timestamp in log entry, treating as None: %r", s)
return None
if dt.tzinfo is None: if dt.tzinfo is None:
logger.debug("Naive timestamp treated as UTC: %s", s)
dt = dt.replace(tzinfo=timezone.utc) dt = dt.replace(tzinfo=timezone.utc)
return dt.astimezone(timezone.utc) return dt.astimezone(timezone.utc)
def _sort_key(e: SearchResult) -> tuple[int, str]:
"""Sort key: timestamped entries first (ascending), then None-timestamp entries."""
if e.timestamp_iso is None:
return (1, "")
return (0, e.timestamp_iso)
def _highest_severity(entries: list[SearchResult]) -> str: def _highest_severity(entries: list[SearchResult]) -> str:
"""Return the highest severity label across all entries.""" """Return the highest severity label across all entries."""
best: str | None = None best: str | None = None
@ -63,7 +76,7 @@ def _cluster_id(entry_ids: list[str]) -> str:
return hashlib.sha1(payload).hexdigest()[:12] # noqa: S324 — not used for security return hashlib.sha1(payload).hexdigest()[:12] # noqa: S324 — not used for security
def _build_cluster( def _make_event_cluster(
cluster_entries: list[SearchResult], cluster_entries: list[SearchResult],
gap_before_seconds: float, gap_before_seconds: float,
burst_threshold: int, burst_threshold: int,
@ -71,9 +84,11 @@ def _build_cluster(
) -> EventCluster: ) -> EventCluster:
"""Construct an EventCluster from a list of SearchResult entries.""" """Construct an EventCluster from a list of SearchResult entries."""
timestamps = [ timestamps = [
_parse_iso(e.timestamp_iso) ts
for e in cluster_entries for e in cluster_entries
if e.timestamp_iso is not None if e.timestamp_iso is not None
for ts in (_parse_iso(e.timestamp_iso),)
if ts is not None
] ]
start_iso: str | None = None start_iso: str | None = None
@ -103,7 +118,7 @@ def _build_cluster(
pattern_tags=tuple( pattern_tags=tuple(
sorted(set(tag for e in cluster_entries for tag in e.matched_patterns)) sorted(set(tag for e in cluster_entries for tag in e.matched_patterns))
), ),
severity=_highest_severity(cluster_entries), # type: ignore[arg-type] severity=_highest_severity(cluster_entries), # type: ignore[arg-type] # SeverityLabel is a Literal; _highest_severity returns a compatible str
burst=burst, burst=burst,
gap_before_seconds=gap_before_seconds, gap_before_seconds=gap_before_seconds,
representative_text=_representative_text(cluster_entries), representative_text=_representative_text(cluster_entries),
@ -122,44 +137,27 @@ class TimelineReconstructor:
cluster_window_seconds: int = 30, cluster_window_seconds: int = 30,
burst_threshold: int = 10, burst_threshold: int = 10,
burst_window_seconds: int = 5, burst_window_seconds: int = 5,
gap_significance_seconds: int = 30,
) -> None: ) -> None:
self._cluster_window = cluster_window_seconds self._cluster_window = cluster_window_seconds
self._burst_threshold = burst_threshold self._burst_threshold = burst_threshold
self._burst_window = burst_window_seconds self._burst_window = burst_window_seconds
self._gap_significance_seconds: int = gap_significance_seconds
def reconstruct(self, entries: list[SearchResult]) -> TimelineResult: def _sort_entries(self, entries: list[SearchResult]) -> list[SearchResult]:
"""Build a TimelineResult from a flat list of SearchResult entries. """Sort entries: timestamped first (ascending), then None-timestamp entries."""
return sorted(entries, key=_sort_key)
Entries are sorted by timestamp_iso ascending; entries without a def _group_into_raw_clusters(
timestamp are appended at the end and always join the current cluster. self, sorted_entries: list[SearchResult]
""" ) -> list[list[SearchResult]]:
if not entries: """Group sorted entries into time-window clusters."""
return TimelineResult(
clusters=(),
total_entries=0,
window_start=None,
window_end=None,
gap_count=0,
burst_count=0,
dominant_sources=(),
)
# Sort: timestamped entries first (ascending), then None-timestamp entries
def _sort_key(e: SearchResult) -> tuple[int, str]:
if e.timestamp_iso is None:
return (1, "")
return (0, e.timestamp_iso)
sorted_entries = sorted(entries, key=_sort_key)
# Cluster using sliding window anchored to the first entry in each cluster
raw_clusters: list[list[SearchResult]] = [] raw_clusters: list[list[SearchResult]] = []
current: list[SearchResult] = [] current: list[SearchResult] = []
cluster_anchor: datetime | None = None cluster_anchor: datetime | None = None
for entry in sorted_entries: for entry in sorted_entries:
if not current: if not current:
# Start first cluster
current.append(entry) current.append(entry)
if entry.timestamp_iso is not None: if entry.timestamp_iso is not None:
cluster_anchor = _parse_iso(entry.timestamp_iso) cluster_anchor = _parse_iso(entry.timestamp_iso)
@ -172,6 +170,11 @@ class TimelineReconstructor:
entry_dt = _parse_iso(entry.timestamp_iso) entry_dt = _parse_iso(entry.timestamp_iso)
if entry_dt is None:
# Malformed timestamp — treat same as None: join current cluster
current.append(entry)
continue
if cluster_anchor is None: if cluster_anchor is None:
# Current cluster has no anchor yet — set it, stay in cluster # Current cluster has no anchor yet — set it, stay in cluster
cluster_anchor = entry_dt cluster_anchor = entry_dt
@ -180,7 +183,6 @@ class TimelineReconstructor:
delta = (entry_dt - cluster_anchor).total_seconds() delta = (entry_dt - cluster_anchor).total_seconds()
if delta > self._cluster_window: if delta > self._cluster_window:
# Start a new cluster
raw_clusters.append(current) raw_clusters.append(current)
current = [entry] current = [entry]
cluster_anchor = entry_dt cluster_anchor = entry_dt
@ -190,56 +192,81 @@ class TimelineReconstructor:
if current: if current:
raw_clusters.append(current) raw_clusters.append(current)
# Build EventCluster objects return raw_clusters
clusters: list[EventCluster] = []
prev_end_iso: str | None = None
for cluster_entries in raw_clusters: def _build_cluster(
gap_before = 0.0 self,
if prev_end_iso is not None: cluster_entries: list[SearchResult],
# Find start of this cluster prev_end_iso: str | None,
ts_list = [ ) -> EventCluster:
_parse_iso(e.timestamp_iso) """Build an EventCluster from a list of SearchResult entries."""
for e in cluster_entries gap_before = 0.0
if e.timestamp_iso is not None if prev_end_iso is not None:
] ts_list = [
if ts_list: ts
this_start = min(ts_list) for e in cluster_entries
prev_end = _parse_iso(prev_end_iso) if e.timestamp_iso is not None
for ts in (_parse_iso(e.timestamp_iso),)
if ts is not None
]
if ts_list:
this_start = min(ts_list)
prev_end = _parse_iso(prev_end_iso)
if prev_end is not None:
gap_before = (this_start - prev_end).total_seconds() gap_before = (this_start - prev_end).total_seconds()
cluster = _build_cluster( return _make_event_cluster(
cluster_entries, cluster_entries,
gap_before_seconds=gap_before, gap_before_seconds=gap_before,
burst_threshold=self._burst_threshold, burst_threshold=self._burst_threshold,
burst_window_seconds=self._burst_window, burst_window_seconds=self._burst_window,
) )
clusters.append(cluster)
if cluster.end_iso is not None:
prev_end_iso = cluster.end_iso
clusters_tuple = tuple(clusters) def _dominant_sources_tuple(self, entries: list[SearchResult]) -> tuple[str, ...]:
"""Return source_ids sorted by total entry count descending."""
# Dominant sources: sort by total entry count descending
source_counts: dict[str, int] = defaultdict(int) source_counts: dict[str, int] = defaultdict(int)
for entry in entries: for entry in entries:
source_counts[entry.source_id] += 1 source_counts[entry.source_id] += 1
dominant_sources = tuple( return tuple(
src src for src, _ in sorted(source_counts.items(), key=lambda kv: -kv[1])
for src, _ in sorted(source_counts.items(), key=lambda kv: -kv[1])
) )
window_start = clusters[0].start_iso if clusters else None def reconstruct(self, entries: list[SearchResult]) -> TimelineResult:
window_end = clusters[-1].end_iso if clusters else None """Build a structured timeline from a flat list of log entries."""
gap_count = sum(1 for c in clusters if c.gap_before_seconds > 30) if not entries:
burst_count = sum(1 for c in clusters if c.burst) return TimelineResult(
clusters=(),
total_entries=0,
window_start=None,
window_end=None,
gap_count=0,
burst_count=0,
dominant_sources=(),
)
sorted_entries = self._sort_entries(entries)
raw_clusters = self._group_into_raw_clusters(sorted_entries)
clusters: list[EventCluster] = []
prev_end: str | None = None
for raw in raw_clusters:
c = self._build_cluster(raw, prev_end)
clusters.append(c)
prev_end = c.end_iso
clusters_tuple = tuple(clusters)
gap_count = sum(
1
for c in clusters_tuple
if c.gap_before_seconds > self._gap_significance_seconds
)
return TimelineResult( return TimelineResult(
clusters=clusters_tuple, clusters=clusters_tuple,
total_entries=len(entries), total_entries=len(entries),
window_start=window_start, window_start=clusters_tuple[0].start_iso if clusters_tuple else None,
window_end=window_end, window_end=clusters_tuple[-1].end_iso if clusters_tuple else None,
gap_count=gap_count, gap_count=gap_count,
burst_count=burst_count, burst_count=sum(1 for c in clusters_tuple if c.burst),
dominant_sources=dominant_sources, dominant_sources=self._dominant_sources_tuple(entries),
) )