turnstone/app/services/diagnose/timeline.py
pyr0ball f7429ee963 feat: Stage 1 — TimelineReconstructor for multi-agent diagnose pipeline (issue #29)
- Add app/services/diagnose/timeline.py: pure-Python TimelineReconstructor
  - Sorts entries by timestamp_iso (None entries appended at end)
  - Sliding-window clustering anchored to first entry in each cluster
  - Computes cluster_id (sha1[:12]), severity (highest wins), burst flag,
    gap_before_seconds, representative_text (highest rank, longest text tiebreak)
  - Builds TimelineResult with dominant_sources sorted by entry count descending
- Update pipeline.py stub to import TimelineReconstructor (Task 6 wiring prep)
- Add tests/test_diagnose_timeline.py: 15 tests covering all 13 required cases
  plus null-timestamp edge case variant; all 318 tests passing

Closes: #29
2026-05-25 12:54:15 -07:00

245 lines
8 KiB
Python

"""Stage 1: Timeline Reconstructor — pure Python, no ML."""
from __future__ import annotations
import hashlib
import logging
from collections import defaultdict
from datetime import datetime, timezone
from app.services.diagnose.models import EventCluster, TimelineResult
from app.services.search import SearchResult
logger = logging.getLogger(__name__)
# Numeric rank for each severity label; a larger number means more severe.
# "WARN" and "WARNING" share a rank, and a missing severity (None) ranks lowest.
_SEVERITY_ORDER: dict[str | None, int] = {
    "CRITICAL": 5,
    "ERROR": 4,
    "WARN": 3,
    "WARNING": 3,  # alias of WARN
    "INFO": 2,
    "DEBUG": 1,
    None: 0,  # entry carries no severity
}
def _parse_iso(s: str) -> datetime:
    """Parse an ISO 8601 string into a UTC-aware datetime.

    Args:
        s: ISO 8601 timestamp. A trailing ``Z``/``z`` UTC designator is
            accepted; a string without any offset is interpreted as UTC.

    Returns:
        The timestamp as a timezone-aware datetime converted to UTC.

    Raises:
        ValueError: If the string is not a valid ISO 8601 timestamp.
    """
    # datetime.fromisoformat only understands the "Z" suffix from Python 3.11
    # onwards; rewrite it as an explicit offset so older interpreters parse
    # it as well.
    if s.endswith(("Z", "z")):
        s = f"{s[:-1]}+00:00"
    dt = datetime.fromisoformat(s)
    if dt.tzinfo is None:
        # Naive timestamps are assumed to already be expressed in UTC.
        dt = dt.replace(tzinfo=timezone.utc)
    return dt.astimezone(timezone.utc)
def _highest_severity(entries: list[SearchResult]) -> str:
    """Return the most severe label found among *entries*.

    Labels are ranked through ``_SEVERITY_ORDER``; a label missing from that
    table ranks 0, the same as a missing severity. When several entries share
    the top rank, the earliest one in the list supplies the label.

    Returns:
        ``"UNKNOWN"`` when the list is empty or the winning entry has no
        severity, ``"WARN"`` when the winning label is ``"WARNING"``, and the
        winning label unchanged otherwise.
    """
    # max() keeps the first of several equally ranked entries.
    top_entry = max(
        entries,
        key=lambda item: _SEVERITY_ORDER.get(item.severity, 0),
        default=None,
    )
    label = None if top_entry is None else top_entry.severity
    if label is None:
        # The output type needs a concrete label, so substitute a placeholder.
        return "UNKNOWN"
    # Collapse the WARNING alias onto the canonical WARN spelling.
    return "WARN" if label == "WARNING" else label
def _representative_text(entries: list[SearchResult]) -> str:
    """Pick the text that best represents a group of entries.

    The entry with the greatest ``rank`` wins; among equal ranks the one with
    the longest text wins; if both are equal the earliest entry is kept.

    Returns:
        The winning entry's text, or ``""`` when *entries* is empty.
    """
    if not entries:
        return ""
    candidates = iter(entries)
    winner = next(candidates)
    winner_key = (winner.rank, len(winner.text))
    for candidate in candidates:
        candidate_key = (candidate.rank, len(candidate.text))
        # Strict comparison so the earliest entry survives a full tie.
        if candidate_key > winner_key:
            winner, winner_key = candidate, candidate_key
    return winner.text
def _cluster_id(entry_ids: list[str]) -> str:
    """Derive a 12-character hex identifier from a collection of entry IDs.

    The IDs are sorted before hashing, so the result does not depend on the
    order in which they are supplied.
    """
    canonical = ",".join(sorted(entry_ids))
    digest = hashlib.sha1(canonical.encode())  # noqa: S324 — not used for security
    return digest.hexdigest()[:12]
def _build_cluster(
    cluster_entries: list[SearchResult],
    gap_before_seconds: float,
    burst_threshold: int,
    burst_window_seconds: int,
) -> EventCluster:
    """Assemble an EventCluster from the entries grouped into one cluster.

    Args:
        cluster_entries: Entries belonging to the cluster.
        gap_before_seconds: Seconds separating this cluster from the previous
            one; stored on the result unchanged.
        burst_threshold: Minimum number of entries for the cluster to count
            as a burst.
        burst_window_seconds: Maximum cluster duration, in seconds, for the
            cluster to count as a burst.

    Returns:
        The populated EventCluster. Start/end are ``None`` and the duration
        is ``0.0`` when no entry carries a timestamp.
    """
    instants = [
        _parse_iso(item.timestamp_iso)
        for item in cluster_entries
        if item.timestamp_iso is not None
    ]
    start_iso: str | None
    end_iso: str | None
    if instants:
        earliest, latest = min(instants), max(instants)
        start_iso = earliest.isoformat()
        end_iso = latest.isoformat()
        span_seconds = (latest - earliest).total_seconds()
    else:
        # No timestamps at all: the cluster has no bounds and zero duration.
        start_iso = None
        end_iso = None
        span_seconds = 0.0

    ids = [item.entry_id for item in cluster_entries]
    distinct_sources = {item.source_id for item in cluster_entries}
    distinct_tags = {
        tag for item in cluster_entries for tag in item.matched_patterns
    }

    # A burst is many entries packed into a short span. A cluster without
    # timestamps has span 0.0, so it qualifies on entry count alone.
    enough_entries = len(cluster_entries) >= burst_threshold
    is_burst = enough_entries and span_seconds <= burst_window_seconds

    return EventCluster(
        cluster_id=_cluster_id(ids),
        entries=tuple(ids),
        start_iso=start_iso,
        end_iso=end_iso,
        duration_seconds=span_seconds,
        source_ids=tuple(sorted(distinct_sources)),
        pattern_tags=tuple(sorted(distinct_tags)),
        severity=_highest_severity(cluster_entries),  # type: ignore[arg-type]
        burst=is_burst,
        gap_before_seconds=gap_before_seconds,
        representative_text=_representative_text(cluster_entries),
    )
class TimelineReconstructor:
    """Reconstruct a structured timeline of event clusters from log entries.

    Pure Python — no ML or LLM calls. Designed as Stage 1 of the multi-agent
    diagnose pipeline.
    """

    def __init__(
        self,
        cluster_window_seconds: int = 30,
        burst_threshold: int = 10,
        burst_window_seconds: int = 5,
    ) -> None:
        """Store the clustering and burst-detection thresholds.

        Args:
            cluster_window_seconds: Maximum distance, in seconds, between the
                first timestamped entry of a cluster and any later entry that
                may still join it.
            burst_threshold: Minimum entry count for a cluster to be a burst.
            burst_window_seconds: Maximum cluster duration, in seconds, for a
                cluster to be a burst.
        """
        self._cluster_window = cluster_window_seconds
        self._burst_threshold = burst_threshold
        self._burst_window = burst_window_seconds

    def reconstruct(self, entries: list[SearchResult]) -> TimelineResult:
        """Build a TimelineResult from a flat list of SearchResult entries.

        Timestamped entries are ordered chronologically by their parsed UTC
        instant (not by their raw string, which mis-orders timestamps that
        use different offsets or formats) and grouped with a window anchored
        to the first entry of each cluster. Entries without a timestamp keep
        their input order and are appended to the last cluster.

        Args:
            entries: Entries to place on the timeline; the list is not
                modified.

        Returns:
            The reconstructed timeline; an empty result when *entries* is
            empty.

        Raises:
            ValueError: If an entry carries a timestamp that cannot be parsed.
        """
        if not entries:
            return TimelineResult(
                clusters=(),
                total_entries=0,
                window_start=None,
                window_end=None,
                gap_count=0,
                burst_count=0,
                dominant_sources=(),
            )

        # Parse every timestamp once and order by the actual instant. The key
        # looks only at the datetime so ties never compare entries; the sort
        # is stable, so simultaneous entries keep their input order.
        timed = sorted(
            (
                (_parse_iso(entry.timestamp_iso), entry)
                for entry in entries
                if entry.timestamp_iso is not None
            ),
            key=lambda pair: pair[0],
        )
        untimed = [entry for entry in entries if entry.timestamp_iso is None]

        # Group timestamped entries. `anchor` is the instant of the current
        # cluster's first entry; `previous` is the instant of the entry seen
        # just before, i.e. the end of the preceding cluster whenever a new
        # cluster is opened.
        raw_clusters: list[list[SearchResult]] = []
        gaps: list[float] = []
        anchor: datetime | None = None
        previous: datetime | None = None
        for when, entry in timed:
            starts_new_cluster = (
                anchor is None
                or (when - anchor).total_seconds() > self._cluster_window
            )
            if starts_new_cluster:
                gap = 0.0 if previous is None else (when - previous).total_seconds()
                gaps.append(gap)
                raw_clusters.append([entry])
                anchor = when
            else:
                raw_clusters[-1].append(entry)
            previous = when

        # Entries without a timestamp cannot be placed in time: attach them
        # to the last cluster, or let them form the only cluster.
        if untimed:
            if raw_clusters:
                raw_clusters[-1].extend(untimed)
            else:
                raw_clusters.append(untimed)
                gaps.append(0.0)

        clusters = tuple(
            _build_cluster(
                members,
                gap_before_seconds=gap,
                burst_threshold=self._burst_threshold,
                burst_window_seconds=self._burst_window,
            )
            for members, gap in zip(raw_clusters, gaps)
        )

        # Dominant sources: most frequent first; equally frequent sources
        # keep the order in which they first appear in the input.
        source_counts: dict[str, int] = defaultdict(int)
        for entry in entries:
            source_counts[entry.source_id] += 1
        dominant_sources = tuple(
            src
            for src, _ in sorted(source_counts.items(), key=lambda kv: -kv[1])
        )

        window_start = clusters[0].start_iso if clusters else None
        window_end = clusters[-1].end_iso if clusters else None
        # NOTE(review): the gap threshold is a fixed 30 seconds and does not
        # follow cluster_window_seconds — confirm this is intended.
        gap_count = sum(1 for c in clusters if c.gap_before_seconds > 30)
        burst_count = sum(1 for c in clusters if c.burst)

        return TimelineResult(
            clusters=clusters,
            total_entries=len(entries),
            window_start=window_start,
            window_end=window_end,
            gap_count=gap_count,
            burst_count=burst_count,
            dominant_sources=dominant_sources,
        )