fix: group journal sources by prefix:host stem in source health

source_ids with 3+ colon segments (e.g. muninn-journal:Muninn:ssh.service)
are now aggregated by their prefix:host key at the SQL level in both
list_sources() and stats_summary(). This collapses ~19K transient systemd
unit rows (crash-loop scope entries from Muninn) into ~24 grouped rows.

- list_sources: SQL CASE/INSTR group-by stem + unit_count field
- stats_summary: same stem grouping for dashboard source health table
- delete endpoint: LIKE-based cascade delete covers grouped stems
- SourcesView: unit_count badge (e.g. "2686 units") on grouped rows;
  delete confirmation names the unit count when deleting a group
- Bump version to v0.6.1
This commit is contained in:
pyr0ball 2026-06-02 04:35:26 -07:00
parent 354513796a
commit 92d7c21530
3 changed files with 65 additions and 19 deletions

View file

@ -187,7 +187,7 @@ async def _lifespan(app: FastAPI):
pass pass
app = FastAPI(title="Turnstone API", version="0.6.0", docs_url="/turnstone/docs", redoc_url=None, lifespan=_lifespan) app = FastAPI(title="Turnstone API", version="0.6.1", docs_url="/turnstone/docs", redoc_url=None, lifespan=_lifespan)
app.add_middleware( app.add_middleware(
CORSMiddleware, CORSMiddleware,
@ -619,8 +619,16 @@ def delete_source(source_id: str) -> dict:
conn = sqlite3.connect(str(DB_PATH), timeout=30.0) conn = sqlite3.connect(str(DB_PATH), timeout=30.0)
conn.execute("PRAGMA journal_mode=WAL") conn.execute("PRAGMA journal_mode=WAL")
try: try:
conn.execute("DELETE FROM log_fts WHERE source_id = ?", (source_id,)) # Exact match covers ungrouped IDs; LIKE match covers grouped stems
cur = conn.execute("DELETE FROM log_entries WHERE source_id = ?", (source_id,)) # (e.g. "muninn-journal:Muninn" deletes all "muninn-journal:Muninn:*" units).
conn.execute(
"DELETE FROM log_fts WHERE source_id = ? OR source_id LIKE ? || ':%'",
(source_id, source_id),
)
cur = conn.execute(
"DELETE FROM log_entries WHERE source_id = ? OR source_id LIKE ? || ':%'",
(source_id, source_id),
)
deleted = cur.rowcount deleted = cur.rowcount
conn.commit() conn.commit()
finally: finally:

View file

@ -428,28 +428,45 @@ def recent_source_errors(
def list_sources(db_path: Path) -> list[dict]: def list_sources(db_path: Path) -> list[dict]:
"""Return distinct sources with entry counts and time ranges.""" """Return sources with entry counts, grouped by prefix:host stem.
source_ids with three or more colon-separated segments (e.g.
``muninn-journal:Muninn:ssh.service``) are collapsed to their first two
segments (``muninn-journal:Muninn``). Single- or two-segment IDs are
returned as-is. ``unit_count`` reports how many distinct sub-units were
merged into each row.
"""
conn = sqlite3.connect(str(db_path), timeout=30.0) conn = sqlite3.connect(str(db_path), timeout=30.0)
conn.execute("PRAGMA journal_mode=WAL") conn.execute("PRAGMA journal_mode=WAL")
rows = conn.execute(""" rows = conn.execute("""
SELECT SELECT
source_id, CASE
COUNT(*) as entry_count, WHEN INSTR(SUBSTR(source_id, INSTR(source_id, ':')+1), ':') > 0
MIN(timestamp_iso) as earliest, THEN SUBSTR(source_id, 1,
MAX(timestamp_iso) as latest, INSTR(source_id, ':')
SUM(CASE WHEN severity IN ('ERROR','CRITICAL','EMERGENCY','ALERT') THEN 1 ELSE 0 END) as error_count + INSTR(SUBSTR(source_id, INSTR(source_id, ':')+1), ':')
- 1)
ELSE source_id
END AS group_id,
COUNT(DISTINCT source_id) AS unit_count,
COUNT(*) AS entry_count,
MIN(timestamp_iso) AS earliest,
MAX(timestamp_iso) AS latest,
SUM(CASE WHEN severity IN ('ERROR','CRITICAL','EMERGENCY','ALERT')
THEN 1 ELSE 0 END) AS error_count
FROM log_entries FROM log_entries
GROUP BY source_id GROUP BY group_id
ORDER BY entry_count DESC ORDER BY entry_count DESC
""").fetchall() """).fetchall()
conn.close() conn.close()
return [ return [
{ {
"source_id": r[0], "source_id": r[0],
"entry_count": r[1], "unit_count": r[1],
"earliest": r[2], "entry_count": r[2],
"latest": r[3], "earliest": r[3],
"error_count": r[4], "latest": r[4],
"error_count": r[5],
} }
for r in rows for r in rows
] ]
@ -502,22 +519,29 @@ def stats_summary(db_path: Path, window_hours: int = 24, severity_overrides: lis
criticals_24h = int(row["criticals"] or 0) criticals_24h = int(row["criticals"] or 0)
errors_24h = int(row["errors"] or 0) errors_24h = int(row["errors"] or 0)
# Per-source breakdown # Per-source breakdown — grouped by prefix:host stem (same logic as list_sources).
source_rows = conn.execute(f""" source_rows = conn.execute(f"""
SELECT SELECT
source_id, CASE
WHEN INSTR(SUBSTR(source_id, INSTR(source_id, ':')+1), ':') > 0
THEN SUBSTR(source_id, 1,
INSTR(source_id, ':')
+ INSTR(SUBSTR(source_id, INSTR(source_id, ':')+1), ':')
- 1)
ELSE source_id
END AS group_id,
COUNT(*) AS entry_count, COUNT(*) AS entry_count,
SUM(CASE WHEN severity IN ('ERROR','CRITICAL','EMERGENCY','ALERT') THEN 1 ELSE 0 END) AS error_count, SUM(CASE WHEN severity IN ('ERROR','CRITICAL','EMERGENCY','ALERT') THEN 1 ELSE 0 END) AS error_count,
MAX(timestamp_iso) AS latest MAX(timestamp_iso) AS latest
FROM log_entries FROM log_entries
WHERE timestamp_iso >= {since_expr} WHERE timestamp_iso >= {since_expr}
AND repeat_count = 1 AND repeat_count = 1
GROUP BY source_id GROUP BY group_id
ORDER BY error_count DESC, entry_count DESC ORDER BY error_count DESC, entry_count DESC
""").fetchall() """).fetchall()
source_health = [ source_health = [
{ {
"source_id": r["source_id"], "source_id": r["group_id"],
"entry_count": int(r["entry_count"]), "entry_count": int(r["entry_count"]),
"error_count": int(r["error_count"]), "error_count": int(r["error_count"]),
"latest": r["latest"], "latest": r["latest"],

View file

@ -90,6 +90,13 @@
class="px-1.5 py-0.5 rounded text-[10px] font-medium class="px-1.5 py-0.5 rounded text-[10px] font-medium
bg-surface-raised text-text-dim border border-surface-border" bg-surface-raised text-text-dim border border-surface-border"
>{{ gtype }}</span> >{{ gtype }}</span>
<!-- Unit count badge for grouped journal sources -->
<span
v-if="src.unit_count && src.unit_count > 1"
class="px-1.5 py-0.5 rounded text-[10px] font-medium
bg-surface-raised text-text-dim border border-surface-border"
:title="`${src.unit_count} systemd units aggregated into this source`"
>{{ src.unit_count }} units</span>
<!-- Upload badge for DB-only sources not in sources.yaml --> <!-- Upload badge for DB-only sources not in sources.yaml -->
<span <span
v-if="src.dbOnly" v-if="src.dbOnly"
@ -158,6 +165,7 @@ interface SourceRow {
// Local-specific // Local-specific
path?: string path?: string
// DB stats (always present, default 0/null) // DB stats (always present, default 0/null)
unit_count?: number
entry_count: number entry_count: number
error_count: number error_count: number
earliest: string | null earliest: string | null
@ -169,6 +177,7 @@ interface SourceRow {
interface ConfiguredSource extends Omit<SourceRow, 'dbOnly'> {} interface ConfiguredSource extends Omit<SourceRow, 'dbOnly'> {}
interface DbSource { interface DbSource {
source_id: string source_id: string
unit_count: number
entry_count: number entry_count: number
error_count: number error_count: number
earliest: string | null earliest: string | null
@ -242,6 +251,7 @@ async function loadSources(): Promise<void> {
.map(db => ({ .map(db => ({
id: db.source_id, id: db.source_id,
transport: 'local' as const, transport: 'local' as const,
unit_count: db.unit_count,
entry_count: db.entry_count, entry_count: db.entry_count,
error_count: db.error_count, error_count: db.error_count,
earliest: db.earliest, earliest: db.earliest,
@ -267,7 +277,11 @@ function setBusy(id: string, on: boolean): void {
} }
async function deleteSource(sourceId: string): Promise<void> { async function deleteSource(sourceId: string): Promise<void> {
if (!confirm(`Delete all entries for "${sourceId}"? This cannot be undone.`)) return const row = sources.value.find(s => s.id === sourceId)
const label = row?.unit_count && row.unit_count > 1
? `all ${row.unit_count} units under "${sourceId}"`
: `"${sourceId}"`
if (!confirm(`Delete all entries for ${label}? This cannot be undone.`)) return
setBusy(sourceId, true) setBusy(sourceId, true)
actionMsg.value = '' actionMsg.value = ''
try { try {