"""Tests for app/services/diagnose/timeline.py — TimelineReconstructor.""" from __future__ import annotations from app.services.diagnose.timeline import TimelineReconstructor from app.services.diagnose.models import TimelineResult from app.services.search import SearchResult # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _make_entry( entry_id: str = "e1", source_id: str = "src-a", timestamp_iso: str | None = "2026-01-01T00:00:00+00:00", severity: str | None = "INFO", rank: float = 0.0, text: str = "log line", matched_patterns: list[str] | None = None, sequence: int = 1, ) -> SearchResult: return SearchResult( entry_id=entry_id, source_id=source_id, sequence=sequence, timestamp_iso=timestamp_iso, severity=severity, repeat_count=1, out_of_order=False, matched_patterns=matched_patterns or [], text=text, rank=rank, ) def _ts(offset_seconds: int) -> str: """Return an ISO timestamp offset_seconds after 2026-01-01T00:00:00+00:00.""" from datetime import datetime, timezone, timedelta base = datetime(2026, 1, 1, 0, 0, 0, tzinfo=timezone.utc) dt = base + timedelta(seconds=offset_seconds) return dt.isoformat() # --------------------------------------------------------------------------- # Test cases # --------------------------------------------------------------------------- class TestEmptyInput: def test_empty_returns_empty_timeline(self): rt = TimelineReconstructor() result = rt.reconstruct([]) assert result == TimelineResult( clusters=(), total_entries=0, gap_count=0, burst_count=0, window_start=None, window_end=None, dominant_sources=(), ) class TestSingleEntry: def test_single_entry_one_cluster(self): rt = TimelineReconstructor() entry = _make_entry(entry_id="e1", timestamp_iso=_ts(0)) result = rt.reconstruct([entry]) assert len(result.clusters) == 1 cluster = result.clusters[0] assert cluster.gap_before_seconds == 0.0 assert cluster.burst is False assert result.total_entries == 1 class TestClusteringWithinWindow: def test_two_entries_10s_apart_same_cluster(self): rt = TimelineReconstructor(cluster_window_seconds=30) entries = [ _make_entry(entry_id="e1", timestamp_iso=_ts(0)), _make_entry(entry_id="e2", timestamp_iso=_ts(10)), ] result = rt.reconstruct(entries) assert len(result.clusters) == 1 assert len(result.clusters[0].entries) == 2 class TestClusteringOutsideWindow: def test_two_entries_60s_apart_two_clusters(self): rt = TimelineReconstructor(cluster_window_seconds=30) entries = [ _make_entry(entry_id="e1", timestamp_iso=_ts(0)), _make_entry(entry_id="e2", timestamp_iso=_ts(60)), ] result = rt.reconstruct(entries) assert len(result.clusters) == 2 second_cluster = result.clusters[1] assert second_cluster.gap_before_seconds >= 60.0 def test_gap_count_correct_for_60s_gap(self): rt = TimelineReconstructor(cluster_window_seconds=30) entries = [ _make_entry(entry_id="e1", timestamp_iso=_ts(0)), _make_entry(entry_id="e2", timestamp_iso=_ts(60)), ] result = rt.reconstruct(entries) assert result.gap_count == 1 class TestBurst: def test_15_entries_within_3s_is_burst(self): rt = TimelineReconstructor( cluster_window_seconds=30, burst_threshold=10, burst_window_seconds=5, ) # All 15 entries within a 3-second window — well under burst_window=5 entries = [ _make_entry(entry_id=f"e{i}", timestamp_iso=_ts(i % 3), sequence=i) for i in range(15) ] result = rt.reconstruct(entries) # All should land in one cluster assert len(result.clusters) == 1 assert result.clusters[0].burst is True assert result.burst_count == 1 class TestNullTimestamps: def test_null_timestamp_joins_current_cluster(self): rt = TimelineReconstructor(cluster_window_seconds=30) entries = [ _make_entry(entry_id="e1", timestamp_iso=_ts(0)), _make_entry(entry_id="e2", timestamp_iso=None), ] # Should not raise, and null entry should join the existing cluster result = rt.reconstruct(entries) assert len(result.clusters) == 1 assert "e2" in result.clusters[0].entries def test_null_timestamp_does_not_start_new_cluster(self): rt = TimelineReconstructor(cluster_window_seconds=30) entries = [ _make_entry(entry_id="e1", timestamp_iso=_ts(0)), _make_entry(entry_id="e2", timestamp_iso=None), _make_entry(entry_id="e3", timestamp_iso=_ts(5)), ] result = rt.reconstruct(entries) # e3 is within 30s of e1, so all three in one cluster assert len(result.clusters) == 1 def test_all_null_timestamps_one_cluster_no_crash(self): rt = TimelineReconstructor() entries = [ _make_entry(entry_id="e1", timestamp_iso=None), _make_entry(entry_id="e2", timestamp_iso=None), ] result = rt.reconstruct(entries) assert len(result.clusters) == 1 cluster = result.clusters[0] assert cluster.start_iso is None assert cluster.end_iso is None assert result.window_start is None assert result.window_end is None class TestDominantSources: def test_dominant_sources_ordered_by_count_descending(self): rt = TimelineReconstructor() # src-b has 3 entries, src-a has 1 entries = [ _make_entry(entry_id="e1", source_id="src-a", timestamp_iso=_ts(0)), _make_entry(entry_id="e2", source_id="src-b", timestamp_iso=_ts(1)), _make_entry(entry_id="e3", source_id="src-b", timestamp_iso=_ts(2)), _make_entry(entry_id="e4", source_id="src-b", timestamp_iso=_ts(3)), ] result = rt.reconstruct(entries) assert result.dominant_sources[0] == "src-b" assert result.dominant_sources[1] == "src-a" class TestRepresentativeText: def test_representative_text_uses_highest_rank(self): rt = TimelineReconstructor() entries = [ _make_entry(entry_id="e1", timestamp_iso=_ts(0), rank=-5.0, text="low score"), _make_entry(entry_id="e2", timestamp_iso=_ts(1), rank=-1.0, text="high score"), ] result = rt.reconstruct(entries) assert result.clusters[0].representative_text == "high score" def test_representative_text_tiebreak_on_longest_text(self): rt = TimelineReconstructor() entries = [ _make_entry(entry_id="e1", timestamp_iso=_ts(0), rank=0.0, text="short"), _make_entry(entry_id="e2", timestamp_iso=_ts(1), rank=0.0, text="much longer text here"), ] result = rt.reconstruct(entries) assert result.clusters[0].representative_text == "much longer text here" class TestClusterId: def test_cluster_id_is_12_char_hex(self): rt = TimelineReconstructor() entry = _make_entry(entry_id="abc123", timestamp_iso=_ts(0)) result = rt.reconstruct([entry]) cluster_id = result.clusters[0].cluster_id assert len(cluster_id) == 12 assert all(c in "0123456789abcdef" for c in cluster_id) class TestSeverity: def test_critical_wins_over_error(self): rt = TimelineReconstructor() entries = [ _make_entry(entry_id="e1", timestamp_iso=_ts(0), severity="ERROR"), _make_entry(entry_id="e2", timestamp_iso=_ts(1), severity="CRITICAL"), _make_entry(entry_id="e3", timestamp_iso=_ts(2), severity="INFO"), ] result = rt.reconstruct(entries) assert result.clusters[0].severity == "CRITICAL" class TestPatternTags: def test_pattern_tags_union_across_entries(self): rt = TimelineReconstructor() entries = [ _make_entry(entry_id="e1", timestamp_iso=_ts(0), matched_patterns=["oom-killer"]), _make_entry(entry_id="e2", timestamp_iso=_ts(1), matched_patterns=["disk-full"]), ] result = rt.reconstruct(entries) tags = set(result.clusters[0].pattern_tags) assert "oom-killer" in tags assert "disk-full" in tags