fix: frozen dataclasses, clean __all__, improve exception logging in diagnose package
This commit is contained in:
parent
d833febe77
commit
7ee87a2982
4 changed files with 169 additions and 59 deletions
|
|
@@ -7,6 +7,7 @@ namespace, preserving backward compatibility with existing tests.
|
|||
|
||||
The verbatim original is preserved in legacy.py for reference.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
|
|
@@ -27,6 +28,7 @@ logger = logging.getLogger(__name__)
|
|||
|
||||
try:
|
||||
from dateparser.search import search_dates as _search_dates # type: ignore[import]
|
||||
|
||||
_HAS_DATEPARSER = True
|
||||
except ImportError:
|
||||
_search_dates = None # type: ignore[assignment]
|
||||
|
|
@@ -77,17 +79,27 @@ def parse_time_window(query: str) -> tuple[str | None, str | None, str]:
|
|||
results = _search_dates(
|
||||
query,
|
||||
languages=["en"],
|
||||
settings={"PREFER_DATES_FROM": "past", "TIMEZONE": tz_str, "RETURN_AS_TIMEZONE_AWARE": True},
|
||||
settings={
|
||||
"PREFER_DATES_FROM": "past",
|
||||
"TIMEZONE": tz_str,
|
||||
"RETURN_AS_TIMEZONE_AWARE": True,
|
||||
},
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning(
|
||||
"dateparser failed (%s) on query %r — falling back to 60-min window",
|
||||
type(e).__name__,
|
||||
query,
|
||||
)
|
||||
except Exception:
|
||||
logger.warning("dateparser failed on query %r — falling back to 60-min window", query)
|
||||
results = None
|
||||
if results:
|
||||
phrase, dt = results[0]
|
||||
if dt.tzinfo is None:
|
||||
dt = dt.replace(tzinfo=timezone.utc)
|
||||
else:
|
||||
dt = dt.astimezone(timezone.utc) # normalise to UTC for SQLite string compare
|
||||
dt = dt.astimezone(
|
||||
timezone.utc
|
||||
) # normalise to UTC for SQLite string compare
|
||||
since = (dt - timedelta(minutes=30)).isoformat()
|
||||
until = (dt + timedelta(minutes=30)).isoformat()
|
||||
keywords = re.sub(r"\s{2,}", " ", query.replace(phrase, " ").strip())
|
||||
|
|
@@ -116,8 +128,23 @@ def diagnose(
|
|||
else:
|
||||
keywords = query
|
||||
|
||||
keyword_hits = search(db_path, query=keywords, since=since, until=until, source_filter=source_filter, limit=150, or_mode=True)
|
||||
window_hits = entries_in_window(db_path, since=since, until=until, source_filter=source_filter, limit=50, per_source_cap=15)
|
||||
keyword_hits = search(
|
||||
db_path,
|
||||
query=keywords,
|
||||
since=since,
|
||||
until=until,
|
||||
source_filter=source_filter,
|
||||
limit=150,
|
||||
or_mode=True,
|
||||
)
|
||||
window_hits = entries_in_window(
|
||||
db_path,
|
||||
since=since,
|
||||
until=until,
|
||||
source_filter=source_filter,
|
||||
limit=50,
|
||||
per_source_cap=15,
|
||||
)
|
||||
|
||||
seen: set[str] = set()
|
||||
merged: list[SearchResult] = []
|
||||
|
|
@@ -126,7 +153,9 @@ def diagnose(
|
|||
seen.add(r.entry_id)
|
||||
merged.append(r)
|
||||
|
||||
combined = sorted(merged, key=lambda r: (r.timestamp_iso or "\xff", r.sequence))[:200]
|
||||
combined = sorted(merged, key=lambda r: (r.timestamp_iso or "\xff", r.sequence))[
|
||||
:200
|
||||
]
|
||||
|
||||
by_severity: dict[str, int] = {"CRITICAL": 0, "ERROR": 0, "WARN": 0, "INFO": 0}
|
||||
by_source: dict[str, int] = {}
|
||||
|
|
@@ -138,7 +167,9 @@ def diagnose(
|
|||
|
||||
reasoning: str | None = None
|
||||
if llm_url and llm_model:
|
||||
reasoning = summarize(query, combined, llm_url=llm_url, llm_model=llm_model, api_key=llm_api_key)
|
||||
reasoning = summarize(
|
||||
query, combined, llm_url=llm_url, llm_model=llm_model, api_key=llm_api_key
|
||||
)
|
||||
|
||||
return {
|
||||
"summary": {
|
||||
|
|
@@ -186,7 +217,9 @@ async def diagnose_stream(
|
|||
yield {"type": "status", "message": "Parsing time window…"}
|
||||
time_detected = since is not None and until is not None
|
||||
if not time_detected:
|
||||
parsed_since, parsed_until, keywords = await asyncio.to_thread(parse_time_window, query)
|
||||
parsed_since, parsed_until, keywords = await asyncio.to_thread(
|
||||
parse_time_window, query
|
||||
)
|
||||
since = since or parsed_since
|
||||
until = until or parsed_until
|
||||
time_detected = keywords != query
|
||||
|
|
@@ -206,23 +239,34 @@ async def diagnose_stream(
|
|||
keyword_hits: list[SearchResult] = []
|
||||
window_hits = await asyncio.to_thread(
|
||||
lambda: entries_in_window(
|
||||
db_path, since, until,
|
||||
source_filter=source_filter, limit=200,
|
||||
db_path,
|
||||
since,
|
||||
until,
|
||||
source_filter=source_filter,
|
||||
limit=200,
|
||||
)
|
||||
)
|
||||
else:
|
||||
keyword_hits, window_hits = await asyncio.gather(
|
||||
asyncio.to_thread(
|
||||
lambda: search(
|
||||
db_path, keywords,
|
||||
source_filter=source_filter, since=since, until=until,
|
||||
limit=150, or_mode=True,
|
||||
db_path,
|
||||
keywords,
|
||||
source_filter=source_filter,
|
||||
since=since,
|
||||
until=until,
|
||||
limit=150,
|
||||
or_mode=True,
|
||||
)
|
||||
),
|
||||
asyncio.to_thread(
|
||||
lambda: entries_in_window(
|
||||
db_path, since, until,
|
||||
source_filter=source_filter, limit=50, per_source_cap=15,
|
||||
db_path,
|
||||
since,
|
||||
until,
|
||||
source_filter=source_filter,
|
||||
limit=50,
|
||||
per_source_cap=15,
|
||||
)
|
||||
),
|
||||
)
|
||||
|
|
@@ -234,7 +278,9 @@ async def diagnose_stream(
|
|||
seen.add(r.entry_id)
|
||||
merged.append(r)
|
||||
|
||||
combined = sorted(merged, key=lambda r: (r.timestamp_iso or "\xff", r.sequence))[:200]
|
||||
combined = sorted(merged, key=lambda r: (r.timestamp_iso or "\xff", r.sequence))[
|
||||
:200
|
||||
]
|
||||
|
||||
by_severity: dict[str, int] = {"CRITICAL": 0, "ERROR": 0, "WARN": 0, "INFO": 0}
|
||||
by_source: dict[str, int] = {}
|
||||
|
|
@@ -260,7 +306,14 @@ async def diagnose_stream(
|
|||
if llm_url and llm_model and combined:
|
||||
yield {"type": "status", "message": "Analyzing with LLM…"}
|
||||
reasoning = await asyncio.to_thread(
|
||||
lambda: summarize(query, combined, llm_url, llm_model, llm_api_key, context_block=context_block)
|
||||
lambda: summarize(
|
||||
query,
|
||||
combined,
|
||||
llm_url,
|
||||
llm_model,
|
||||
llm_api_key,
|
||||
context_block=context_block,
|
||||
)
|
||||
)
|
||||
if reasoning:
|
||||
yield {"type": "reasoning", "text": reasoning}
|
||||
|
|
@@ -280,15 +333,9 @@ __all__ = [
|
|||
"diagnose",
|
||||
"diagnose_stream",
|
||||
"parse_time_window",
|
||||
"_now_iso",
|
||||
"_last_n_minutes",
|
||||
"_HAS_DATEPARSER",
|
||||
"_search_dates",
|
||||
"_RELATIVE_RE",
|
||||
"_RELATIVE_UNITS",
|
||||
"_APPROX_N",
|
||||
"_relative_window",
|
||||
]
|
||||
|
||||
# Feature flag for Task 6
|
||||
MULTI_AGENT_ENABLED = os.getenv("TURNSTONE_MULTI_AGENT_DIAGNOSE", "false").lower() == "true"
|
||||
MULTI_AGENT_ENABLED = (
|
||||
os.getenv("TURNSTONE_MULTI_AGENT_DIAGNOSE", "false").lower() == "true"
|
||||
)
|
||||
|
|
|
|||
|
|
@@ -1,4 +1,5 @@
|
|||
"""Frictionless diagnose service — NL time extraction + layered log search."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
|
|
@@ -18,6 +19,7 @@ logger = logging.getLogger(__name__)
|
|||
|
||||
try:
|
||||
from dateparser.search import search_dates as _search_dates # type: ignore[import]
|
||||
|
||||
_HAS_DATEPARSER = True
|
||||
except ImportError:
|
||||
_search_dates = None # type: ignore[assignment]
|
||||
|
|
@@ -68,17 +70,25 @@ def parse_time_window(query: str) -> tuple[str | None, str | None, str]:
|
|||
results = _search_dates(
|
||||
query,
|
||||
languages=["en"],
|
||||
settings={"PREFER_DATES_FROM": "past", "TIMEZONE": tz_str, "RETURN_AS_TIMEZONE_AWARE": True},
|
||||
settings={
|
||||
"PREFER_DATES_FROM": "past",
|
||||
"TIMEZONE": tz_str,
|
||||
"RETURN_AS_TIMEZONE_AWARE": True,
|
||||
},
|
||||
)
|
||||
except Exception:
|
||||
logger.warning("dateparser failed on query %r — falling back to 60-min window", query)
|
||||
logger.warning(
|
||||
"dateparser failed on query %r — falling back to 60-min window", query
|
||||
)
|
||||
results = None
|
||||
if results:
|
||||
phrase, dt = results[0]
|
||||
if dt.tzinfo is None:
|
||||
dt = dt.replace(tzinfo=timezone.utc)
|
||||
else:
|
||||
dt = dt.astimezone(timezone.utc) # normalise to UTC for SQLite string compare
|
||||
dt = dt.astimezone(
|
||||
timezone.utc
|
||||
) # normalise to UTC for SQLite string compare
|
||||
since = (dt - timedelta(minutes=30)).isoformat()
|
||||
until = (dt + timedelta(minutes=30)).isoformat()
|
||||
keywords = re.sub(r"\s{2,}", " ", query.replace(phrase, " ").strip())
|
||||
|
|
@@ -107,8 +117,23 @@ def diagnose(
|
|||
else:
|
||||
keywords = query
|
||||
|
||||
keyword_hits = search(db_path, query=keywords, since=since, until=until, source_filter=source_filter, limit=150, or_mode=True)
|
||||
window_hits = entries_in_window(db_path, since=since, until=until, source_filter=source_filter, limit=50, per_source_cap=15)
|
||||
keyword_hits = search(
|
||||
db_path,
|
||||
query=keywords,
|
||||
since=since,
|
||||
until=until,
|
||||
source_filter=source_filter,
|
||||
limit=150,
|
||||
or_mode=True,
|
||||
)
|
||||
window_hits = entries_in_window(
|
||||
db_path,
|
||||
since=since,
|
||||
until=until,
|
||||
source_filter=source_filter,
|
||||
limit=50,
|
||||
per_source_cap=15,
|
||||
)
|
||||
|
||||
seen: set[str] = set()
|
||||
merged: list[SearchResult] = []
|
||||
|
|
@@ -117,7 +142,9 @@ def diagnose(
|
|||
seen.add(r.entry_id)
|
||||
merged.append(r)
|
||||
|
||||
combined = sorted(merged, key=lambda r: (r.timestamp_iso or "\xff", r.sequence))[:200]
|
||||
combined = sorted(merged, key=lambda r: (r.timestamp_iso or "\xff", r.sequence))[
|
||||
:200
|
||||
]
|
||||
|
||||
by_severity: dict[str, int] = {"CRITICAL": 0, "ERROR": 0, "WARN": 0, "INFO": 0}
|
||||
by_source: dict[str, int] = {}
|
||||
|
|
@@ -129,7 +156,9 @@ def diagnose(
|
|||
|
||||
reasoning: str | None = None
|
||||
if llm_url and llm_model:
|
||||
reasoning = summarize(query, combined, llm_url=llm_url, llm_model=llm_model, api_key=llm_api_key)
|
||||
reasoning = summarize(
|
||||
query, combined, llm_url=llm_url, llm_model=llm_model, api_key=llm_api_key
|
||||
)
|
||||
|
||||
return {
|
||||
"summary": {
|
||||
|
|
@@ -177,7 +206,9 @@ async def diagnose_stream(
|
|||
yield {"type": "status", "message": "Parsing time window…"}
|
||||
time_detected = since is not None and until is not None
|
||||
if not time_detected:
|
||||
parsed_since, parsed_until, keywords = await asyncio.to_thread(parse_time_window, query)
|
||||
parsed_since, parsed_until, keywords = await asyncio.to_thread(
|
||||
parse_time_window, query
|
||||
)
|
||||
since = since or parsed_since
|
||||
until = until or parsed_until
|
||||
time_detected = keywords != query
|
||||
|
|
@@ -197,23 +228,34 @@ async def diagnose_stream(
|
|||
keyword_hits: list[SearchResult] = []
|
||||
window_hits = await asyncio.to_thread(
|
||||
lambda: entries_in_window(
|
||||
db_path, since, until,
|
||||
source_filter=source_filter, limit=200,
|
||||
db_path,
|
||||
since,
|
||||
until,
|
||||
source_filter=source_filter,
|
||||
limit=200,
|
||||
)
|
||||
)
|
||||
else:
|
||||
keyword_hits, window_hits = await asyncio.gather(
|
||||
asyncio.to_thread(
|
||||
lambda: search(
|
||||
db_path, keywords,
|
||||
source_filter=source_filter, since=since, until=until,
|
||||
limit=150, or_mode=True,
|
||||
db_path,
|
||||
keywords,
|
||||
source_filter=source_filter,
|
||||
since=since,
|
||||
until=until,
|
||||
limit=150,
|
||||
or_mode=True,
|
||||
)
|
||||
),
|
||||
asyncio.to_thread(
|
||||
lambda: entries_in_window(
|
||||
db_path, since, until,
|
||||
source_filter=source_filter, limit=50, per_source_cap=15,
|
||||
db_path,
|
||||
since,
|
||||
until,
|
||||
source_filter=source_filter,
|
||||
limit=50,
|
||||
per_source_cap=15,
|
||||
)
|
||||
),
|
||||
)
|
||||
|
|
@@ -225,7 +267,9 @@ async def diagnose_stream(
|
|||
seen.add(r.entry_id)
|
||||
merged.append(r)
|
||||
|
||||
combined = sorted(merged, key=lambda r: (r.timestamp_iso or "\xff", r.sequence))[:200]
|
||||
combined = sorted(merged, key=lambda r: (r.timestamp_iso or "\xff", r.sequence))[
|
||||
:200
|
||||
]
|
||||
|
||||
by_severity: dict[str, int] = {"CRITICAL": 0, "ERROR": 0, "WARN": 0, "INFO": 0}
|
||||
by_source: dict[str, int] = {}
|
||||
|
|
@@ -251,7 +295,14 @@ async def diagnose_stream(
|
|||
if llm_url and llm_model and combined:
|
||||
yield {"type": "status", "message": "Analyzing with LLM…"}
|
||||
reasoning = await asyncio.to_thread(
|
||||
lambda: summarize(query, combined, llm_url, llm_model, llm_api_key, context_block=context_block)
|
||||
lambda: summarize(
|
||||
query,
|
||||
combined,
|
||||
llm_url,
|
||||
llm_model,
|
||||
llm_api_key,
|
||||
context_block=context_block,
|
||||
)
|
||||
)
|
||||
if reasoning:
|
||||
yield {"type": "reasoning", "text": reasoning}
|
||||
|
|
|
|||
|
|
@@ -1,4 +1,5 @@
|
|||
"""Pipeline data types for the multi-agent diagnose pipeline."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
|
|
@@ -7,53 +8,63 @@ from typing import Literal
|
|||
SeverityLabel = Literal["CRITICAL", "ERROR", "WARN", "INFO", "DEBUG", "UNKNOWN"]
|
||||
|
||||
|
||||
@dataclass
|
||||
@dataclass(frozen=True)
|
||||
class EventCluster:
|
||||
"""A time-correlated group of log entries within the timeline."""
|
||||
|
||||
cluster_id: str
|
||||
entries: list[str] # entry_id refs
|
||||
entries: tuple[str, ...] # entry_id refs
|
||||
start_iso: str | None
|
||||
end_iso: str | None
|
||||
duration_seconds: float
|
||||
source_ids: list[str]
|
||||
pattern_tags: list[str]
|
||||
severity: SeverityLabel # highest severity from raw text
|
||||
source_ids: tuple[str, ...]
|
||||
pattern_tags: tuple[str, ...]
|
||||
severity: SeverityLabel
|
||||
burst: bool
|
||||
gap_before_seconds: float
|
||||
representative_text: str
|
||||
|
||||
|
||||
@dataclass
|
||||
@dataclass(frozen=True)
|
||||
class TimelineResult:
|
||||
clusters: list[EventCluster]
|
||||
"""Structured timeline of event clusters built from log entries."""
|
||||
|
||||
clusters: tuple[EventCluster, ...]
|
||||
total_entries: int
|
||||
window_start: str | None
|
||||
window_end: str | None
|
||||
gap_count: int
|
||||
burst_count: int
|
||||
dominant_sources: list[str]
|
||||
dominant_sources: tuple[str, ...]
|
||||
|
||||
|
||||
@dataclass
|
||||
@dataclass(frozen=True)
|
||||
class ClassifiedTimeline:
|
||||
"""Timeline annotated with ML-assigned severity per cluster."""
|
||||
|
||||
timeline: TimelineResult
|
||||
cluster_severities: dict[str, SeverityLabel]
|
||||
classifier_used: Literal["ml", "pattern_tags", "regex"]
|
||||
model_id: str | None
|
||||
|
||||
|
||||
@dataclass
|
||||
@dataclass(frozen=True)
|
||||
class Hypothesis:
|
||||
"""A root-cause hypothesis generated by Stage 3."""
|
||||
|
||||
hypothesis_id: str
|
||||
title: str
|
||||
description: str
|
||||
confidence: float
|
||||
supporting_cluster_ids: list[str]
|
||||
runbook_refs: list[str]
|
||||
supporting_cluster_ids: tuple[str, ...]
|
||||
runbook_refs: tuple[str, ...]
|
||||
severity: SeverityLabel
|
||||
|
||||
|
||||
@dataclass
|
||||
@dataclass(frozen=True)
|
||||
class RankedHypothesis:
|
||||
"""A hypothesis enriched by Stage 4 false-positive suppression."""
|
||||
|
||||
hypothesis: Hypothesis
|
||||
novelty_score: float
|
||||
similarity_to_known: float
|
||||
|
|
|
|||
|
|
@@ -1,4 +1,5 @@
|
|||
"""Multi-agent diagnose pipeline orchestrator — stub (Task 1)."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Any
|
||||
|
|
|
|||
Loading…
Reference in a new issue