fix: bypass FTS ranking for named-source error retrieval

When diagnose() auto-detects a source name, FTS keyword scoring can
bury real errors whose text doesn't match the symptom query. Add
recent_source_errors() — a plain-SQL scan ordered by timestamp — so
the most recent errors from a known service always surface regardless
of keyword overlap.
This commit is contained in:
pyr0ball 2026-05-10 08:14:23 -07:00
parent 58b179b275
commit 5a8dc731b8
2 changed files with 79 additions and 12 deletions

View file

@ -25,7 +25,12 @@ from app.services.incidents import (
get_incident_entries, get_incident_entries,
list_incidents, list_incidents,
) )
from app.services.search import search as _search, list_sources as _list_sources, format_results from app.services.search import (
search as _search,
list_sources as _list_sources,
recent_source_errors as _source_errors,
format_results,
)
DB_PATH = Path(os.environ.get("TURNSTONE_DB", Path(__file__).parent.parent / "data" / "turnstone.db")) DB_PATH = Path(os.environ.get("TURNSTONE_DB", Path(__file__).parent.parent / "data" / "turnstone.db"))
DIST_DIR = Path(__file__).parent.parent / "web" / "dist" DIST_DIR = Path(__file__).parent.parent / "web" / "dist"
@ -120,21 +125,19 @@ def diagnose(
critical = _search(DB_PATH, query=q, severity="CRITICAL", limit=5, **common) critical = _search(DB_PATH, query=q, severity="CRITICAL", limit=5, **common)
errors = _search(DB_PATH, query=q, severity="ERROR", limit=10, **common) errors = _search(DB_PATH, query=q, severity="ERROR", limit=10, **common)
# When a source was auto-detected, also pull its most recent errors unconstrained # When a source was auto-detected, also pull its most recent errors via plain SQL
# the user named a service, so show what's actually broken there even if their # FTS ranking can bury real errors from the named service if their text doesn't
# symptom keywords don't appear literally in the error text. # match the symptom keywords. Plain-SQL scan returns actual recent errors regardless.
source_errors: list = [] source_errors: list = []
if detected_source and not source and not errors: if detected_source and not source and not errors:
source_errors = _search( source_errors = _source_errors(
DB_PATH, query="error warning fail", severity="ERROR", DB_PATH, source_filter=detected_source, severity="ERROR",
limit=10, or_mode=True, limit=10, since=since, until=until,
source_filter=detected_source, since=since, until=until, include_repeats=False,
) )
if not source_errors: if not source_errors:
source_errors = _search( source_errors = _source_errors(
DB_PATH, query="error warning fail", severity="CRITICAL", DB_PATH, source_filter=detected_source, severity="CRITICAL",
limit=5, or_mode=True, limit=5, since=since, until=until,
source_filter=detected_source, since=since, until=until, include_repeats=False,
) )
seen: set[str] = set() seen: set[str] = set()

View file

@ -225,6 +225,70 @@ def entries_in_window(
] ]
def recent_source_errors(
    db_path: Path,
    source_filter: str,
    severity: str = "ERROR",
    limit: int = 10,
    since: str | None = None,
    until: str | None = None,
) -> list[SearchResult]:
    """Return the most recent entries of one severity from a named source.

    Plain-SQL scan over ``log_entries`` that bypasses FTS ranking, so the
    text of an entry has no influence on whether it surfaces. Used by
    diagnose when FTS keyword search returns nothing for a known source.

    Args:
        db_path: Path to the SQLite database file.
        source_filter: Substring matched against ``source_id`` with
            ``LIKE '%...%'``. NOTE: ``%`` and ``_`` inside the value are
            not escaped, so they act as LIKE wildcards.
        severity: Severity to match; upper-cased before comparison.
        limit: Maximum number of rows returned.
        since: Optional inclusive lower bound compared against
            ``timestamp_iso``.
        until: Optional inclusive upper bound compared against
            ``timestamp_iso``.

    Returns:
        Matching entries ordered newest first (``timestamp_iso DESC``).
        Only rows with ``repeat_count = 1`` are considered, and every
        result carries ``rank=0.0`` because no FTS scoring takes place.

    Raises:
        sqlite3.Error: If the database cannot be queried. The connection
            is closed before the error propagates.
    """
    conditions = [
        "source_id LIKE ?",
        "severity = ?",
        "repeat_count = 1",
    ]
    params: list = [f"%{source_filter}%", severity.upper()]
    if since:
        conditions.append("timestamp_iso >= ?")
        params.append(since)
    if until:
        conditions.append("timestamp_iso <= ?")
        params.append(until)
    # LIMIT is the last placeholder in the statement, so it is bound last.
    params.append(limit)
    # Only fixed SQL fragments are interpolated; every value is bound via "?".
    where = " AND ".join(conditions)

    conn = sqlite3.connect(str(db_path))
    try:
        conn.execute("PRAGMA journal_mode=WAL")
        conn.row_factory = sqlite3.Row
        rows = conn.execute(
            f"""
            SELECT id as entry_id, source_id, sequence, timestamp_iso, severity,
                   repeat_count, out_of_order, matched_patterns, text, 0.0 as rank
            FROM log_entries
            WHERE {where}
            ORDER BY timestamp_iso DESC
            LIMIT ?
            """,
            params,
        ).fetchall()
    finally:
        # Release the handle even when the PRAGMA or the query raises.
        conn.close()

    return [
        SearchResult(
            entry_id=r["entry_id"],
            source_id=r["source_id"],
            sequence=r["sequence"],
            timestamp_iso=r["timestamp_iso"],
            severity=r["severity"],
            repeat_count=r["repeat_count"],
            out_of_order=bool(r["out_of_order"]),
            # A NULL or empty column decodes to an empty pattern list.
            matched_patterns=json.loads(r["matched_patterns"] or "[]"),
            text=r["text"],
            rank=r["rank"],
        )
        for r in rows
    ]
def list_sources(db_path: Path) -> list[dict]: def list_sources(db_path: Path) -> list[dict]:
"""Return distinct sources with entry counts and time ranges.""" """Return distinct sources with entry counts and time ranges."""
conn = sqlite3.connect(str(db_path)) conn = sqlite3.connect(str(db_path))