feat(diagnose): 5-stage multi-agent diagnose pipeline (#29) #39
3 changed files with 251 additions and 20 deletions
|
|
@ -391,6 +391,39 @@ def _glean_ssh_source(
|
|||
return stats
|
||||
|
||||
|
||||
def glean_ssh_source(
|
||||
src: dict, # type: ignore[type-arg]
|
||||
db_path: Path,
|
||||
pattern_file: Path | None = None,
|
||||
batch_size: int = 1000,
|
||||
) -> dict[str, int]:
|
||||
"""Glean a single SSH source dict and write results to *db_path*.
|
||||
|
||||
Public wrapper around :func:`_glean_ssh_source` for the REST layer.
|
||||
Manages the DB connection, pattern compilation, and FTS rebuild so callers
|
||||
don't have to deal with those lifecycle concerns.
|
||||
|
||||
Returns stats mapping ``{sub_source_id: entry_count}``.
|
||||
"""
|
||||
effective_pattern_file = pattern_file or Path("patterns/default.yaml")
|
||||
compiled = _compile(load_patterns(effective_pattern_file))
|
||||
ingest_time = now_iso()
|
||||
|
||||
conn = sqlite3.connect(str(db_path))
|
||||
conn.execute("PRAGMA journal_mode=WAL")
|
||||
conn.executescript(_SCHEMA)
|
||||
conn.commit()
|
||||
|
||||
try:
|
||||
stats = _glean_ssh_source(src, compiled, ingest_time, conn, batch_size)
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
logger.info("Rebuilding FTS index after SSH source glean...")
|
||||
build_fts_index(db_path)
|
||||
return stats
|
||||
|
||||
|
||||
def glean_dir(
|
||||
corpus_dir: Path,
|
||||
db_path: Path,
|
||||
|
|
|
|||
87
app/rest.py
87
app/rest.py
|
|
@ -27,7 +27,7 @@ from fastapi.responses import FileResponse, RedirectResponse, StreamingResponse
|
|||
from fastapi.staticfiles import StaticFiles
|
||||
from pydantic import BaseModel
|
||||
|
||||
from app.glean.pipeline import ensure_schema, glean_file as _glean_file
|
||||
from app.glean.pipeline import ensure_schema, glean_file as _glean_file, glean_ssh_source as _glean_ssh_source
|
||||
from app.glean.base import load_compiled_patterns, now_iso
|
||||
from app.glean.tautulli import parse_webhook as _parse_tautulli
|
||||
from app.glean.wazuh import is_wazuh_alert as _is_wazuh_alert, parse as _parse_wazuh
|
||||
|
|
@ -433,6 +433,72 @@ def list_sources() -> dict:
|
|||
return {"sources": _list_sources(DB_PATH)}
|
||||
|
||||
|
||||
@router.get("/api/sources/configured")
|
||||
def list_configured_sources() -> dict:
|
||||
"""Return every source in sources.yaml, enriched with DB stats.
|
||||
|
||||
Unlike ``/api/sources`` (which is DB-only), this endpoint reads sources.yaml
|
||||
so SSH sources appear even before their first successful glean. DB entry
|
||||
counts, error counts, and timestamps are aggregated and merged in.
|
||||
|
||||
For SSH sources, sub-source IDs (e.g. ``rack01/journald``) are summed to
|
||||
produce a single aggregate stat row for the top-level host entry.
|
||||
"""
|
||||
sources_file = PATTERN_DIR / "sources.yaml"
|
||||
if not sources_file.exists():
|
||||
return {"sources": []}
|
||||
|
||||
with open(sources_file) as f:
|
||||
config = yaml.safe_load(f) or {}
|
||||
|
||||
# Fetch all DB source stats once; key by source_id for O(1) lookup.
|
||||
db_stats: dict[str, dict] = {}
|
||||
try:
|
||||
for row in _list_sources(DB_PATH):
|
||||
db_stats[row["source_id"]] = row
|
||||
except Exception:
|
||||
pass # DB may not exist on first run
|
||||
|
||||
result = []
|
||||
for src in config.get("sources", []):
|
||||
transport = src.get("transport", "local")
|
||||
src_id = src.get("id", "")
|
||||
|
||||
entry: dict = {"id": src_id, "transport": transport}
|
||||
|
||||
if transport != "ssh":
|
||||
entry["path"] = src.get("path", "")
|
||||
db = db_stats.get(src_id, {})
|
||||
entry["entry_count"] = db.get("entry_count", 0)
|
||||
entry["error_count"] = db.get("error_count", 0)
|
||||
entry["earliest"] = db.get("earliest")
|
||||
entry["latest"] = db.get("latest")
|
||||
else:
|
||||
entry["host"] = src.get("host", "")
|
||||
entry["user"] = src.get("user", "")
|
||||
glean_items: list[dict] = src.get("glean", [])
|
||||
entry["glean_types"] = sorted({item.get("type", "plaintext") for item in glean_items})
|
||||
entry["glean_items"] = glean_items
|
||||
|
||||
# Aggregate sub-source DB rows that belong to this SSH host.
|
||||
# Sub-sources use IDs like "{host_id}/{type}" or "{host_id}/{type}/{container}".
|
||||
prefix = src_id + "/"
|
||||
matching_rows = [
|
||||
v for k, v in db_stats.items()
|
||||
if k.startswith(prefix) or k == src_id
|
||||
]
|
||||
entry["entry_count"] = sum(r.get("entry_count", 0) for r in matching_rows)
|
||||
entry["error_count"] = sum(r.get("error_count", 0) for r in matching_rows)
|
||||
earliests = [r["earliest"] for r in matching_rows if r.get("earliest")]
|
||||
latests = [r["latest"] for r in matching_rows if r.get("latest")]
|
||||
entry["earliest"] = min(earliests) if earliests else None
|
||||
entry["latest"] = max(latests) if latests else None
|
||||
|
||||
result.append(entry)
|
||||
|
||||
return {"sources": result}
|
||||
|
||||
|
||||
@router.delete("/api/sources/{source_id}")
|
||||
def delete_source(source_id: str) -> dict:
|
||||
"""Delete all log entries (and FTS index rows) for a given source."""
|
||||
|
|
@ -450,7 +516,13 @@ def delete_source(source_id: str) -> dict:
|
|||
|
||||
@router.post("/api/sources/{source_id}/glean")
|
||||
def reglean_source(source_id: str, background_tasks: BackgroundTasks) -> dict:
|
||||
"""Trigger a re-glean for a configured source from sources.yaml."""
|
||||
"""Trigger a re-glean for a configured source from sources.yaml.
|
||||
|
||||
Handles both local file sources and SSH remote sources. For SSH sources,
|
||||
the glean runs in the foreground and rebuilds the FTS index before returning
|
||||
(same behaviour as local sources — callers can rely on the count being final
|
||||
when the response arrives).
|
||||
"""
|
||||
sources_file = PATTERN_DIR / "sources.yaml"
|
||||
if not sources_file.exists():
|
||||
raise HTTPException(status_code=404, detail="sources.yaml not found")
|
||||
|
|
@ -459,7 +531,16 @@ def reglean_source(source_id: str, background_tasks: BackgroundTasks) -> dict:
|
|||
matching = [s for s in config.get("sources", []) if s.get("id") == source_id]
|
||||
if not matching:
|
||||
raise HTTPException(status_code=404, detail=f"Source {source_id!r} not in sources.yaml")
|
||||
src_path = Path(matching[0]["path"])
|
||||
|
||||
src = matching[0]
|
||||
|
||||
if src.get("transport") == "ssh":
|
||||
# SSH sources: open connection, glean all items, rebuild FTS inline.
|
||||
stats = _glean_ssh_source(src, DB_PATH, PATTERN_FILE)
|
||||
return {"source_id": source_id, "gleaned": sum(stats.values())}
|
||||
|
||||
# Local file source.
|
||||
src_path = Path(src["path"])
|
||||
if not src_path.exists():
|
||||
raise HTTPException(status_code=422, detail=f"Path does not exist: {src_path}")
|
||||
stats = _glean_file(src_path, DB_PATH, PATTERN_FILE)
|
||||
|
|
|
|||
|
|
@ -26,7 +26,7 @@
|
|||
|
||||
<div v-else class="rounded border border-surface-border overflow-hidden">
|
||||
<div class="overflow-x-auto">
|
||||
<table class="w-full text-sm min-w-[560px]">
|
||||
<table class="w-full text-sm min-w-[620px]">
|
||||
<thead class="bg-surface-raised border-b border-surface-border">
|
||||
<tr>
|
||||
<th class="text-left px-4 py-2.5 text-text-dim font-medium text-xs uppercase tracking-wider">Source</th>
|
||||
|
|
@ -40,29 +40,72 @@
|
|||
<tbody>
|
||||
<tr
|
||||
v-for="src in sources"
|
||||
:key="src.source_id"
|
||||
:key="src.id"
|
||||
class="border-b border-surface-border hover:bg-surface-raised transition-colors"
|
||||
>
|
||||
<td class="px-4 py-2.5 text-accent">{{ src.source_id }}</td>
|
||||
<td class="px-4 py-2.5 text-text-muted text-right tabular-nums">{{ src.entry_count.toLocaleString() }}</td>
|
||||
<!-- Source name + badges -->
|
||||
<td class="px-4 py-2.5">
|
||||
<div class="flex flex-wrap items-center gap-1.5">
|
||||
<span class="text-accent font-mono text-xs">{{ src.id }}</span>
|
||||
<!-- SSH transport badge -->
|
||||
<span
|
||||
v-if="src.transport === 'ssh'"
|
||||
class="inline-flex items-center gap-1 px-1.5 py-0.5 rounded text-[10px] font-medium
|
||||
bg-blue-900/30 text-blue-400 border border-blue-800/40"
|
||||
:title="`SSH: ${src.user}@${src.host}`"
|
||||
>
|
||||
<svg class="w-2.5 h-2.5" viewBox="0 0 16 16" fill="currentColor" aria-hidden="true">
|
||||
<path d="M2 3a1 1 0 011-1h10a1 1 0 011 1v2a1 1 0 01-1 1H3a1 1 0 01-1-1V3zm0 5a1 1 0 011-1h4a1 1 0 110 2H3a1 1 0 01-1-1zm0 4a1 1 0 011-1h2a1 1 0 110 2H3a1 1 0 01-1-1z"/>
|
||||
</svg>
|
||||
ssh
|
||||
</span>
|
||||
<!-- Glean-type pills for SSH sources -->
|
||||
<span
|
||||
v-for="gtype in (src.glean_types ?? [])"
|
||||
:key="gtype"
|
||||
class="px-1.5 py-0.5 rounded text-[10px] font-medium
|
||||
bg-surface-raised text-text-dim border border-surface-border"
|
||||
>{{ gtype }}</span>
|
||||
<!-- Upload badge for DB-only sources not in sources.yaml -->
|
||||
<span
|
||||
v-if="src.dbOnly"
|
||||
class="px-1.5 py-0.5 rounded text-[10px] font-medium
|
||||
bg-surface-raised text-text-dim border border-surface-border"
|
||||
>uploaded</span>
|
||||
</div>
|
||||
<!-- SSH host subtitle -->
|
||||
<div v-if="src.transport === 'ssh'" class="text-text-dim text-xs mt-0.5 font-mono">
|
||||
{{ src.user }}@{{ src.host }}
|
||||
</div>
|
||||
</td>
|
||||
|
||||
<!-- Entry count -->
|
||||
<td class="px-4 py-2.5 text-text-muted text-right tabular-nums">
|
||||
{{ src.entry_count.toLocaleString() }}
|
||||
</td>
|
||||
|
||||
<!-- Error count -->
|
||||
<td class="px-4 py-2.5 text-right tabular-nums">
|
||||
<span :class="src.error_count > 0 ? 'text-sev-error' : 'text-text-dim'">
|
||||
{{ src.error_count.toLocaleString() }}
|
||||
</span>
|
||||
</td>
|
||||
|
||||
<td class="px-4 py-2.5 text-text-dim text-xs">{{ formatTs(src.earliest) }}</td>
|
||||
<td class="px-4 py-2.5 text-text-dim text-xs">{{ formatTs(src.latest) }}</td>
|
||||
|
||||
<!-- Actions -->
|
||||
<td class="px-4 py-2.5">
|
||||
<div class="flex items-center justify-end gap-2">
|
||||
<button
|
||||
:disabled="busy.has(src.source_id)"
|
||||
@click="reglean(src.source_id)"
|
||||
:disabled="busy.has(src.id) || src.dbOnly"
|
||||
@click="reglean(src.id)"
|
||||
class="text-text-dim hover:text-accent transition-colors text-xs px-2 py-1 rounded hover:bg-surface disabled:opacity-40"
|
||||
title="Re-glean from sources.yaml"
|
||||
>{{ busy.has(src.source_id) ? '…' : 'reglean' }}</button>
|
||||
:title="src.dbOnly ? 'Not in sources.yaml — cannot re-glean' : 'Re-glean from sources.yaml'"
|
||||
>{{ busy.has(src.id) ? '…' : 'reglean' }}</button>
|
||||
<button
|
||||
:disabled="busy.has(src.source_id)"
|
||||
@click="deleteSource(src.source_id)"
|
||||
:disabled="busy.has(src.id)"
|
||||
@click="deleteSource(src.id)"
|
||||
class="text-text-dim hover:text-sev-error transition-colors text-xs px-2 py-1 rounded hover:bg-surface disabled:opacity-40"
|
||||
title="Delete all entries for this source"
|
||||
>delete</button>
|
||||
|
|
@ -78,9 +121,36 @@
|
|||
|
||||
<script setup lang="ts">
|
||||
import { ref, onMounted } from 'vue'
|
||||
import type { LogSource } from '@/stores/search'
|
||||
|
||||
const sources = ref<LogSource[]>([])
|
||||
// Unified source row shown in the table (merges configured + DB-only sources).
|
||||
interface SourceRow {
|
||||
id: string
|
||||
transport: 'local' | 'ssh'
|
||||
// SSH-specific
|
||||
host?: string
|
||||
user?: string
|
||||
glean_types?: string[]
|
||||
// Local-specific
|
||||
path?: string
|
||||
// DB stats (always present, default 0/null)
|
||||
entry_count: number
|
||||
error_count: number
|
||||
earliest: string | null
|
||||
latest: string | null
|
||||
// True when this source exists in the DB but not in sources.yaml (e.g. uploads)
|
||||
dbOnly?: boolean
|
||||
}
|
||||
|
||||
interface ConfiguredSource extends Omit<SourceRow, 'dbOnly'> {}
|
||||
interface DbSource {
|
||||
source_id: string
|
||||
entry_count: number
|
||||
error_count: number
|
||||
earliest: string | null
|
||||
latest: string | null
|
||||
}
|
||||
|
||||
const sources = ref<SourceRow[]>([])
|
||||
const loading = ref(true)
|
||||
const busy = ref(new Set<string>())
|
||||
const actionMsg = ref('')
|
||||
|
|
@ -90,11 +160,52 @@ const BASE = import.meta.env.BASE_URL.replace(/\/$/, '')
|
|||
|
||||
async function loadSources(): Promise<void> {
|
||||
try {
|
||||
const res = await fetch(`${BASE}/api/sources`)
|
||||
if (res.ok) {
|
||||
const data = await res.json()
|
||||
sources.value = data.sources
|
||||
// Primary list: configured sources from sources.yaml (enriched with DB stats).
|
||||
// This makes SSH sources visible even before their first glean.
|
||||
const [configuredRes, dbRes] = await Promise.all([
|
||||
fetch(`${BASE}/api/sources/configured`),
|
||||
fetch(`${BASE}/api/sources`),
|
||||
])
|
||||
|
||||
const configuredData = configuredRes.ok ? await configuredRes.json() : { sources: [] }
|
||||
const dbData = dbRes.ok ? await dbRes.json() : { sources: [] }
|
||||
|
||||
const configuredSources: ConfiguredSource[] = configuredData.sources ?? []
|
||||
const dbSources: DbSource[] = dbData.sources ?? []
|
||||
|
||||
// Build a set of all IDs represented by configured sources.
|
||||
// SSH sources own all sub-source IDs like "rack01/journald" too.
|
||||
const coveredIds = new Set<string>()
|
||||
for (const s of configuredSources) {
|
||||
coveredIds.add(s.id)
|
||||
}
|
||||
|
||||
// For SSH sources, also mark sub-source IDs (rack01/…) as covered so they
|
||||
// don't appear as separate "uploaded" rows.
|
||||
for (const s of configuredSources) {
|
||||
if (s.transport === 'ssh') {
|
||||
for (const db of dbSources) {
|
||||
if (db.source_id.startsWith(s.id + '/') || db.source_id === s.id) {
|
||||
coveredIds.add(db.source_id)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// DB-only sources: uploaded files or manually gleaned sources not in sources.yaml.
|
||||
const dbOnly: SourceRow[] = dbSources
|
||||
.filter(db => !coveredIds.has(db.source_id))
|
||||
.map(db => ({
|
||||
id: db.source_id,
|
||||
transport: 'local' as const,
|
||||
entry_count: db.entry_count,
|
||||
error_count: db.error_count,
|
||||
earliest: db.earliest,
|
||||
latest: db.latest,
|
||||
dbOnly: true,
|
||||
}))
|
||||
|
||||
sources.value = [...configuredSources as SourceRow[], ...dbOnly]
|
||||
} finally {
|
||||
loading.value = false
|
||||
}
|
||||
|
|
@ -118,7 +229,13 @@ async function deleteSource(sourceId: string): Promise<void> {
|
|||
const data = await res.json()
|
||||
actionMsg.value = `Deleted ${data.deleted.toLocaleString()} entries for "${sourceId}"`
|
||||
actionError.value = false
|
||||
sources.value = sources.value.filter(s => s.source_id !== sourceId)
|
||||
// Remove DB-only rows; zero-out configured-source stats instead of hiding.
|
||||
sources.value = sources.value
|
||||
.filter(s => !(s.id === sourceId && s.dbOnly))
|
||||
.map(s => s.id === sourceId
|
||||
? { ...s, entry_count: 0, error_count: 0, earliest: null, latest: null }
|
||||
: s
|
||||
)
|
||||
} else {
|
||||
const data = await res.json()
|
||||
actionMsg.value = data.detail ?? 'Delete failed'
|
||||
|
|
|
|||
Loading…
Reference in a new issue