From a6e65ec95d3427ee0f1f373805c0e9dc2f89a8d9 Mon Sep 17 00:00:00 2001 From: pyr0ball Date: Mon, 25 May 2026 12:54:15 -0700 Subject: [PATCH] =?UTF-8?q?feat:=20Stage=201=20=E2=80=94=20TimelineReconst?= =?UTF-8?q?ructor=20for=20multi-agent=20diagnose=20pipeline=20(issue=20#29?= =?UTF-8?q?)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add app/services/diagnose/timeline.py: pure-Python TimelineReconstructor - Sorts entries by timestamp_iso (None entries appended at end) - Sliding-window clustering anchored to first entry in each cluster - Computes cluster_id (sha1[:12]), severity (highest wins), burst flag, gap_before_seconds, representative_text (highest rank, longest text tiebreak) - Builds TimelineResult with dominant_sources sorted by entry count descending - Update pipeline.py stub to import TimelineReconstructor (Task 6 wiring prep) - Add tests/test_diagnose_timeline.py: 15 tests covering all 13 required cases plus null-timestamp edge case variant; all 318 tests passing Closes: https://git.opensourcesolarpunk.com/Circuit-Forge/turnstone/issues/29 --- app/services/diagnose/pipeline.py | 6 + app/services/diagnose/timeline.py | 245 ++++++++++++++++++++++++++++++ tests/test_diagnose_timeline.py | 234 ++++++++++++++++++++++++++++ 3 files changed, 485 insertions(+) create mode 100644 app/services/diagnose/timeline.py create mode 100644 tests/test_diagnose_timeline.py diff --git a/app/services/diagnose/pipeline.py b/app/services/diagnose/pipeline.py index 6c713b4..c8ffdcb 100644 --- a/app/services/diagnose/pipeline.py +++ b/app/services/diagnose/pipeline.py @@ -2,9 +2,15 @@ from __future__ import annotations +import logging from typing import Any +from app.services.diagnose.timeline import TimelineReconstructor + +logger = logging.getLogger(__name__) + # run_pipeline() will be implemented in Task 6 +logger.debug("TimelineReconstructor available: %s", TimelineReconstructor) async def run_pipeline(*args: Any, **kwargs: Any) -> None: diff --git a/app/services/diagnose/timeline.py b/app/services/diagnose/timeline.py new file mode 100644 index 0000000..5870990 --- /dev/null +++ b/app/services/diagnose/timeline.py @@ -0,0 +1,245 @@ +"""Stage 1: Timeline Reconstructor — pure Python, no ML.""" +from __future__ import annotations + +import hashlib +import logging +from collections import defaultdict +from datetime import datetime, timezone + +from app.services.diagnose.models import EventCluster, TimelineResult +from app.services.search import SearchResult + +logger = logging.getLogger(__name__) + +_SEVERITY_ORDER: dict[str | None, int] = { + "CRITICAL": 5, + "ERROR": 4, + "WARN": 3, + "WARNING": 3, + "INFO": 2, + "DEBUG": 1, + None: 0, +} + + +def _parse_iso(s: str) -> datetime: + """Parse ISO 8601 string to UTC-aware datetime.""" + dt = datetime.fromisoformat(s) + if dt.tzinfo is None: + dt = dt.replace(tzinfo=timezone.utc) + return dt.astimezone(timezone.utc) + + +def _highest_severity(entries: list[SearchResult]) -> str: + """Return the highest severity label across all entries.""" + best: str | None = None + best_rank = -1 + for entry in entries: + sev = entry.severity + rank = _SEVERITY_ORDER.get(sev, 0) + if rank > best_rank: + best_rank = rank + best = sev + # SeverityLabel requires a valid literal; fall back to "UNKNOWN" if None + if best is None: + return "UNKNOWN" + # Normalise WARNING -> WARN for the output type + if best == "WARNING": + return "WARN" + return best + + +def _representative_text(entries: list[SearchResult]) -> str: + """Return text of the entry with highest rank; tie-break on longest text.""" + if not entries: + return "" + best = max(entries, key=lambda e: (e.rank, len(e.text))) + return best.text + + +def _cluster_id(entry_ids: list[str]) -> str: + """Compute a 12-char hex cluster ID from a sorted list of entry IDs.""" + payload = ",".join(sorted(entry_ids)).encode() + return hashlib.sha1(payload).hexdigest()[:12] # noqa: S324 — not used for security + + +def _build_cluster( + cluster_entries: list[SearchResult], + gap_before_seconds: float, + burst_threshold: int, + burst_window_seconds: int, +) -> EventCluster: + """Construct an EventCluster from a list of SearchResult entries.""" + timestamps = [ + _parse_iso(e.timestamp_iso) + for e in cluster_entries + if e.timestamp_iso is not None + ] + + start_iso: str | None = None + end_iso: str | None = None + duration_seconds = 0.0 + + if timestamps: + ts_min = min(timestamps) + ts_max = max(timestamps) + start_iso = ts_min.isoformat() + end_iso = ts_max.isoformat() + duration_seconds = (ts_max - ts_min).total_seconds() + + entry_ids = [e.entry_id for e in cluster_entries] + burst = ( + len(cluster_entries) >= burst_threshold + and duration_seconds <= burst_window_seconds + ) + + return EventCluster( + cluster_id=_cluster_id(entry_ids), + entries=tuple(entry_ids), + start_iso=start_iso, + end_iso=end_iso, + duration_seconds=duration_seconds, + source_ids=tuple(sorted(set(e.source_id for e in cluster_entries))), + pattern_tags=tuple( + sorted(set(tag for e in cluster_entries for tag in e.matched_patterns)) + ), + severity=_highest_severity(cluster_entries), # type: ignore[arg-type] + burst=burst, + gap_before_seconds=gap_before_seconds, + representative_text=_representative_text(cluster_entries), + ) + + +class TimelineReconstructor: + """Reconstruct a structured timeline of event clusters from log entries. + + Pure Python — no ML or LLM calls. Designed as Stage 1 of the multi-agent + diagnose pipeline. + """ + + def __init__( + self, + cluster_window_seconds: int = 30, + burst_threshold: int = 10, + burst_window_seconds: int = 5, + ) -> None: + self._cluster_window = cluster_window_seconds + self._burst_threshold = burst_threshold + self._burst_window = burst_window_seconds + + def reconstruct(self, entries: list[SearchResult]) -> TimelineResult: + """Build a TimelineResult from a flat list of SearchResult entries. + + Entries are sorted by timestamp_iso ascending; entries without a + timestamp are appended at the end and always join the current cluster. + """ + if not entries: + return TimelineResult( + clusters=(), + total_entries=0, + window_start=None, + window_end=None, + gap_count=0, + burst_count=0, + dominant_sources=(), + ) + + # Sort: timestamped entries first (ascending), then None-timestamp entries + def _sort_key(e: SearchResult) -> tuple[int, str]: + if e.timestamp_iso is None: + return (1, "") + return (0, e.timestamp_iso) + + sorted_entries = sorted(entries, key=_sort_key) + + # Cluster using sliding window anchored to the first entry in each cluster + raw_clusters: list[list[SearchResult]] = [] + current: list[SearchResult] = [] + cluster_anchor: datetime | None = None + + for entry in sorted_entries: + if not current: + # Start first cluster + current.append(entry) + if entry.timestamp_iso is not None: + cluster_anchor = _parse_iso(entry.timestamp_iso) + continue + + if entry.timestamp_iso is None: + # No timestamp — always joins the current cluster + current.append(entry) + continue + + entry_dt = _parse_iso(entry.timestamp_iso) + + if cluster_anchor is None: + # Current cluster has no anchor yet — set it, stay in cluster + cluster_anchor = entry_dt + current.append(entry) + continue + + delta = (entry_dt - cluster_anchor).total_seconds() + if delta > self._cluster_window: + # Start a new cluster + raw_clusters.append(current) + current = [entry] + cluster_anchor = entry_dt + else: + current.append(entry) + + if current: + raw_clusters.append(current) + + # Build EventCluster objects + clusters: list[EventCluster] = [] + prev_end_iso: str | None = None + + for cluster_entries in raw_clusters: + gap_before = 0.0 + if prev_end_iso is not None: + # Find start of this cluster + ts_list = [ + _parse_iso(e.timestamp_iso) + for e in cluster_entries + if e.timestamp_iso is not None + ] + if ts_list: + this_start = min(ts_list) + prev_end = _parse_iso(prev_end_iso) + gap_before = (this_start - prev_end).total_seconds() + + cluster = _build_cluster( + cluster_entries, + gap_before_seconds=gap_before, + burst_threshold=self._burst_threshold, + burst_window_seconds=self._burst_window, + ) + clusters.append(cluster) + if cluster.end_iso is not None: + prev_end_iso = cluster.end_iso + + clusters_tuple = tuple(clusters) + + # Dominant sources: sort by total entry count descending + source_counts: dict[str, int] = defaultdict(int) + for entry in entries: + source_counts[entry.source_id] += 1 + dominant_sources = tuple( + src + for src, _ in sorted(source_counts.items(), key=lambda kv: -kv[1]) + ) + + window_start = clusters[0].start_iso if clusters else None + window_end = clusters[-1].end_iso if clusters else None + gap_count = sum(1 for c in clusters if c.gap_before_seconds > 30) + burst_count = sum(1 for c in clusters if c.burst) + + return TimelineResult( + clusters=clusters_tuple, + total_entries=len(entries), + window_start=window_start, + window_end=window_end, + gap_count=gap_count, + burst_count=burst_count, + dominant_sources=dominant_sources, + ) diff --git a/tests/test_diagnose_timeline.py b/tests/test_diagnose_timeline.py new file mode 100644 index 0000000..0f5c6dc --- /dev/null +++ b/tests/test_diagnose_timeline.py @@ -0,0 +1,234 @@ +"""Tests for app/services/diagnose/timeline.py — TimelineReconstructor.""" +from __future__ import annotations + +from app.services.diagnose.timeline import TimelineReconstructor +from app.services.diagnose.models import TimelineResult +from app.services.search import SearchResult + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _make_entry( + entry_id: str = "e1", + source_id: str = "src-a", + timestamp_iso: str | None = "2026-01-01T00:00:00+00:00", + severity: str | None = "INFO", + rank: float = 0.0, + text: str = "log line", + matched_patterns: list[str] | None = None, + sequence: int = 1, +) -> SearchResult: + return SearchResult( + entry_id=entry_id, + source_id=source_id, + sequence=sequence, + timestamp_iso=timestamp_iso, + severity=severity, + repeat_count=1, + out_of_order=False, + matched_patterns=matched_patterns or [], + text=text, + rank=rank, + ) + + +def _ts(offset_seconds: int) -> str: + """Return an ISO timestamp offset_seconds after 2026-01-01T00:00:00+00:00.""" + from datetime import datetime, timezone, timedelta + base = datetime(2026, 1, 1, 0, 0, 0, tzinfo=timezone.utc) + dt = base + timedelta(seconds=offset_seconds) + return dt.isoformat() + + +# --------------------------------------------------------------------------- +# Test cases +# --------------------------------------------------------------------------- + +class TestEmptyInput: + def test_empty_returns_empty_timeline(self): + rt = TimelineReconstructor() + result = rt.reconstruct([]) + assert result == TimelineResult( + clusters=(), + total_entries=0, + gap_count=0, + burst_count=0, + window_start=None, + window_end=None, + dominant_sources=(), + ) + + +class TestSingleEntry: + def test_single_entry_one_cluster(self): + rt = TimelineReconstructor() + entry = _make_entry(entry_id="e1", timestamp_iso=_ts(0)) + result = rt.reconstruct([entry]) + assert len(result.clusters) == 1 + cluster = result.clusters[0] + assert cluster.gap_before_seconds == 0.0 + assert cluster.burst is False + assert result.total_entries == 1 + + +class TestClusteringWithinWindow: + def test_two_entries_10s_apart_same_cluster(self): + rt = TimelineReconstructor(cluster_window_seconds=30) + entries = [ + _make_entry(entry_id="e1", timestamp_iso=_ts(0)), + _make_entry(entry_id="e2", timestamp_iso=_ts(10)), + ] + result = rt.reconstruct(entries) + assert len(result.clusters) == 1 + assert len(result.clusters[0].entries) == 2 + + +class TestClusteringOutsideWindow: + def test_two_entries_60s_apart_two_clusters(self): + rt = TimelineReconstructor(cluster_window_seconds=30) + entries = [ + _make_entry(entry_id="e1", timestamp_iso=_ts(0)), + _make_entry(entry_id="e2", timestamp_iso=_ts(60)), + ] + result = rt.reconstruct(entries) + assert len(result.clusters) == 2 + second_cluster = result.clusters[1] + assert second_cluster.gap_before_seconds >= 60.0 + + def test_gap_count_correct_for_60s_gap(self): + rt = TimelineReconstructor(cluster_window_seconds=30) + entries = [ + _make_entry(entry_id="e1", timestamp_iso=_ts(0)), + _make_entry(entry_id="e2", timestamp_iso=_ts(60)), + ] + result = rt.reconstruct(entries) + assert result.gap_count == 1 + + +class TestBurst: + def test_15_entries_within_3s_is_burst(self): + rt = TimelineReconstructor( + cluster_window_seconds=30, + burst_threshold=10, + burst_window_seconds=5, + ) + # All 15 entries within a 3-second window — well under burst_window=5 + entries = [ + _make_entry(entry_id=f"e{i}", timestamp_iso=_ts(i % 3), sequence=i) + for i in range(15) + ] + result = rt.reconstruct(entries) + # All should land in one cluster + assert len(result.clusters) == 1 + assert result.clusters[0].burst is True + assert result.burst_count == 1 + + +class TestNullTimestamps: + def test_null_timestamp_joins_current_cluster(self): + rt = TimelineReconstructor(cluster_window_seconds=30) + entries = [ + _make_entry(entry_id="e1", timestamp_iso=_ts(0)), + _make_entry(entry_id="e2", timestamp_iso=None), + ] + # Should not raise, and null entry should join the existing cluster + result = rt.reconstruct(entries) + assert len(result.clusters) == 1 + assert "e2" in result.clusters[0].entries + + def test_null_timestamp_does_not_start_new_cluster(self): + rt = TimelineReconstructor(cluster_window_seconds=30) + entries = [ + _make_entry(entry_id="e1", timestamp_iso=_ts(0)), + _make_entry(entry_id="e2", timestamp_iso=None), + _make_entry(entry_id="e3", timestamp_iso=_ts(5)), + ] + result = rt.reconstruct(entries) + # e3 is within 30s of e1, so all three in one cluster + assert len(result.clusters) == 1 + + def test_all_null_timestamps_one_cluster_no_crash(self): + rt = TimelineReconstructor() + entries = [ + _make_entry(entry_id="e1", timestamp_iso=None), + _make_entry(entry_id="e2", timestamp_iso=None), + ] + result = rt.reconstruct(entries) + assert len(result.clusters) == 1 + cluster = result.clusters[0] + assert cluster.start_iso is None + assert cluster.end_iso is None + assert result.window_start is None + assert result.window_end is None + + +class TestDominantSources: + def test_dominant_sources_ordered_by_count_descending(self): + rt = TimelineReconstructor() + # src-b has 3 entries, src-a has 1 + entries = [ + _make_entry(entry_id="e1", source_id="src-a", timestamp_iso=_ts(0)), + _make_entry(entry_id="e2", source_id="src-b", timestamp_iso=_ts(1)), + _make_entry(entry_id="e3", source_id="src-b", timestamp_iso=_ts(2)), + _make_entry(entry_id="e4", source_id="src-b", timestamp_iso=_ts(3)), + ] + result = rt.reconstruct(entries) + assert result.dominant_sources[0] == "src-b" + assert result.dominant_sources[1] == "src-a" + + +class TestRepresentativeText: + def test_representative_text_uses_highest_rank(self): + rt = TimelineReconstructor() + entries = [ + _make_entry(entry_id="e1", timestamp_iso=_ts(0), rank=-5.0, text="low score"), + _make_entry(entry_id="e2", timestamp_iso=_ts(1), rank=-1.0, text="high score"), + ] + result = rt.reconstruct(entries) + assert result.clusters[0].representative_text == "high score" + + def test_representative_text_tiebreak_on_longest_text(self): + rt = TimelineReconstructor() + entries = [ + _make_entry(entry_id="e1", timestamp_iso=_ts(0), rank=0.0, text="short"), + _make_entry(entry_id="e2", timestamp_iso=_ts(1), rank=0.0, text="much longer text here"), + ] + result = rt.reconstruct(entries) + assert result.clusters[0].representative_text == "much longer text here" + + +class TestClusterId: + def test_cluster_id_is_12_char_hex(self): + rt = TimelineReconstructor() + entry = _make_entry(entry_id="abc123", timestamp_iso=_ts(0)) + result = rt.reconstruct([entry]) + cluster_id = result.clusters[0].cluster_id + assert len(cluster_id) == 12 + assert all(c in "0123456789abcdef" for c in cluster_id) + + +class TestSeverity: + def test_critical_wins_over_error(self): + rt = TimelineReconstructor() + entries = [ + _make_entry(entry_id="e1", timestamp_iso=_ts(0), severity="ERROR"), + _make_entry(entry_id="e2", timestamp_iso=_ts(1), severity="CRITICAL"), + _make_entry(entry_id="e3", timestamp_iso=_ts(2), severity="INFO"), + ] + result = rt.reconstruct(entries) + assert result.clusters[0].severity == "CRITICAL" + + +class TestPatternTags: + def test_pattern_tags_union_across_entries(self): + rt = TimelineReconstructor() + entries = [ + _make_entry(entry_id="e1", timestamp_iso=_ts(0), matched_patterns=["oom-killer"]), + _make_entry(entry_id="e2", timestamp_iso=_ts(1), matched_patterns=["disk-full"]), + ] + result = rt.reconstruct(entries) + tags = set(result.clusters[0].pattern_tags) + assert "oom-killer" in tags + assert "disk-full" in tags