feat(diagnose): 5-stage multi-agent diagnose pipeline (#29) #39
No reviewers
Labels
No labels
compliance
demo
deployment
docs
enhancement
parser
patterns
performance
security
ux
No milestone
No project
No assignees
1 participant
Notifications
Due date
No due date set.
Dependencies
No dependencies set.
Reference: Circuit-Forge/turnstone#39
Loading…
Reference in a new issue
No description provided.
Delete branch "feat/29-multi-agent-diagnose"
Deleting a branch is permanent. Although the deleted branch may continue to exist for a short time before it actually gets removed, it CANNOT be undone in most cases. Continue?
Summary
Replaces the single-LLM
summarize()call indiagnose_streamwith a 5-stage ML pipeline, gated behindTURNSTONE_MULTI_AGENT_DIAGNOSE=true. 100% backward compatible — existing SSE event protocol unchanged, Vue frontend requires no changes.Pipeline Architecture
New Files
app/services/diagnose/__init__.py— feature flag wiring + legacy pathapp/services/diagnose/legacy.py— verbatim copy of old diagnose.pyapp/services/diagnose/models.py— 5 frozen dataclassesapp/services/diagnose/pipeline.py— async generator orchestratorapp/services/diagnose/timeline.py— Stage 1app/services/diagnose/classifier.py— Stage 2app/services/diagnose/hypothesizer.py— Stage 3app/services/diagnose/suppressor.py— Stage 4app/services/diagnose/synthesizer.py— Stage 5tests/test_diagnose_timeline.py— 15 teststests/test_diagnose_classifier.py— 10 teststests/test_diagnose_hypothesizer.py— 12 teststests/test_diagnose_suppressor.py— 10 tests (incl. borderline boundary)tests/test_diagnose_synthesizer.py— 8 teststests/test_diagnose_pipeline.py— 13 testsTest Results
Notable Design Decisions
frozen=Truewithtuplefields (no mutable lists)MULTI_AGENT_ENABLEDevaluated at import time from env var — no runtime reload neededautousefixtures in testsasyncio.to_threadused for synchronous ML inference to avoid blocking the event looppipeline_stage(4x) andhypotheses; existingsummary/entries/reasoning/doneevents unchangedFixes Caught in Code Review
suppress_thresholdsemantics were inverted — was suppressing when similarity > 0.15 instead of > 0.85. Fixed tosuppress = max_sim >= similarity_threshold. Added borderline test.suppression_reasondisplay guard was too loose in synthesizer — tightened toif rh.suppress and rh.suppression_reasonFollow-up Issues Filed
Activation
To enable after merge:
Closing #29.
Adds SSH-based log collection from remote hosts via Paramiko. One SSH connection per host, multiple log types per connection. New files: - app/glean/ssh.py: SSHTransport context manager + command builders for journald, syslog, plaintext, and docker log types - tests/test_glean_ssh.py: 18 tests for transport layer (all mocked) - tests/test_glean_pipeline_ssh.py: 15 tests for pipeline integration Pipeline changes (app/glean/pipeline.py): - glean_sources() now splits sources into local-file and SSH categories - SSH sources use transport: ssh + glean: list schema in sources.yaml - _glean_ssh_source(): one SSHTransport per host, N commands per connection - _stream_and_write(): SSHCommandError caught per-item so one bad command does not abort the rest of the host's glean items - SSHConnectionError skips the entire host with a warning log SSH source schema (sources.yaml): - id: rack01 transport: ssh host: 192.168.1.10 user: admin key_path: ~/.ssh/id_ed25519 glean: - type: journald args: [--since, 2 hours ago] - type: syslog path: /var/log/syslog - type: plaintext path: /var/log/app/error.log - type: docker containers: [myapp, nginx] Key design decisions: - Key-based auth only (no password prompts in daemon context) - exit-status check fires after all stdout lines yielded; callers drain the iterator to trigger it - Local file sources path unchanged; SSH sources co-exist in same yaml - Docker multi-container: one exec_stream call per container, source_id scoped as host_id/type/container_name Remaining for #22: REST endpoint, SourcesView UI, sources.yaml docs. 285 → 285 tests passing (33 new SSH tests).Closes turnstone#22. ## Transport layer (app/glean/ssh.py) - SSHTransport context manager: key-only auth, paramiko backend - SSHConnectionError / SSHCommandError exception hierarchy - exec_stream() generator: yields stdout lines, raises SSHCommandError on non-zero exit (isinstance(int) guard for test-mock safety) - Command builders: _build_journald_command, _build_syslog_command, _build_plaintext_command, _build_docker_command - 18 unit tests in tests/test_glean_ssh.py ## Pipeline integration (app/glean/pipeline.py) - _stream_and_write(): per-item error isolation — SSHCommandError skips one glean item without aborting the rest of the host connection - _glean_ssh_source(): one SSHTransport per host, dispatches all glean items (journald/syslog/plaintext/docker); SSHConnectionError aborts host - glean_sources(): splits local vs SSH sources; local → _glean_files(); SSH → _glean_ssh_source(); shared compiled patterns and DB connection - glean_ssh_source(): public wrapper for REST use — manages DB connection, pattern compilation, FTS rebuild lifecycle - 15 integration tests in tests/test_glean_pipeline_ssh.py - All 285 tests passing ## REST layer (app/rest.py) - GET /api/sources/configured: reads sources.yaml and enriches with DB stats; SSH sources appear before first glean (entry_count=0); sub-source IDs (rack01/journald, rack01/docker/myapp) aggregated per host entry - POST /api/sources/{id}/glean: detects transport:ssh and dispatches to glean_ssh_source() wrapper; local sources unchanged - Import: glean_ssh_source as _glean_ssh_source ## Frontend (web/src/views/SourcesView.vue) - Fetches /api/sources/configured (primary) + /api/sources (DB-only) in parallel; merges into unified SourceRow list - SSH sources show: ssh badge (with user@host tooltip), glean-type pills (journald/syslog/docker/etc.), host subtitle - SSH sub-source IDs (rack01/journald) suppressed from the DB-only list since they are covered by the parent SSH row - DB-only sources (uploads) appear below configured sources with 'uploaded' badge; reglean button disabled (not in sources.yaml) - Delete zeroes out configured-source stats in-place rather than removing the row (so the source remains visible for re-gleaning)- Add glean_fingerprints table to schema (sha256 + mtime + size) - _fingerprint(), _fp_unchanged(), _save_fingerprint() helpers in pipeline.py - _glean_files() now checks fingerprint; skips file if hash unchanged - force=True param threads through glean_dir → glean_file → glean_sources - POST /api/tasks/glean and POST /api/sources/{id}/glean accept force=true - 14 unit tests in tests/test_glean_fingerprint.py, all passing Closes: #30- Move app/services/diagnose.py verbatim to app/services/diagnose/legacy.py - Create app/services/diagnose/__init__.py with full implementation so that patch('app.services.diagnose._HAS_DATEPARSER') targets the correct namespace and all 303 existing tests continue to pass without modification - Add app/services/diagnose/models.py with 5 pipeline dataclasses: EventCluster, TimelineResult, ClassifiedTimeline, Hypothesis, RankedHypothesis - Add app/services/diagnose/pipeline.py with run_pipeline() stub (Task 6) - Add MULTI_AGENT_ENABLED feature flag (off by default via env var) - Zero behavior change; ruff clean Closes: #29- Add app/services/diagnose/timeline.py: pure-Python TimelineReconstructor - Sorts entries by timestamp_iso (None entries appended at end) - Sliding-window clustering anchored to first entry in each cluster - Computes cluster_id (sha1[:12]), severity (highest wins), burst flag, gap_before_seconds, representative_text (highest rank, longest text tiebreak) - Builds TimelineResult with dominant_sources sorted by entry count descending - Update pipeline.py stub to import TimelineReconstructor (Task 6 wiring prep) - Add tests/test_diagnose_timeline.py: 15 tests covering all 13 required cases plus null-timestamp edge case variant; all 318 tests passing Closes: #29- Add _coerce_float() module-level helper: catches TypeError/ValueError from non-numeric LLM output (e.g. 'high', 'N/A') and returns a caller-supplied default instead of raising. - Replace float(item.get('confidence', 0.5)) with _coerce_float(item.get('confidence'), 0.5) in _parse_response. - Guard supporting_cluster_ids: tuple(item.get(...) or []) so a JSON null from the LLM does not cause TypeError('NoneType is not iterable'). - runbook_refs is hardcoded as () and not sourced from LLM output; no change needed there. - Add test_non_numeric_confidence_uses_default (Test 10) to cover the 'high' string case: asserts no exception and confidence == 0.5. - 341 tests passing (+1). Closes: #29- Add app/services/diagnose/synthesizer.py: SummarySynthesizer (Stage 5) - Builds structured LLM prompt from ranked hypotheses, timeline, RAG context - Excludes suppressed hypotheses from the narrative prompt - Deterministic fallback when no LLM configured or LLM call fails - Same cf-orch task endpoint + direct OpenAI-compat fallback pattern as other stages - Replace pipeline.py stub with full run_pipeline() async generator - Orchestrates all 5 stages via asyncio.to_thread for each synchronous stage - Yields typed SSE event dicts: status, pipeline_stage (1-4), hypotheses, reasoning, done - Suppressor counts (active vs suppressed) reported in stage 4 event message - Wire MULTI_AGENT_ENABLED feature flag into diagnose_stream() - TURNSTONE_MULTI_AGENT_DIAGNOSE=true routes through run_pipeline() - pipeline emits its own done event; legacy path unchanged when flag is false - Import of run_pipeline added to __init__.py - Add 21 new tests (350 -> 371 passing): - tests/test_diagnose_synthesizer.py: 8 tests (with/without LLM, suppressed, empty ranked, LLM failure fallback) - tests/test_diagnose_pipeline.py: 13 tests (flag off, flag on event sequence, empty entries, no LLM, stage 1 cluster count message) Closes: #29