fix: group journal sources by prefix:host stem in source health
source_ids with 3+ colon segments (e.g. muninn-journal:Muninn:ssh.service) are now aggregated by their prefix:host key at the SQL level in both list_sources() and stats_summary(). This collapses ~19K transient systemd unit rows (crash-loop scope entries from Muninn) into ~24 grouped rows.

- list_sources: SQL CASE/INSTR group-by stem + unit_count field
- stats_summary: same stem grouping for dashboard source health table
- delete endpoint: LIKE-based cascade delete covers grouped stems
- SourcesView: unit_count badge (e.g. "2686 units") on grouped rows; delete confirmation names the unit count when deleting a group
- Bump version to v0.6.1
This commit is contained in:
parent
9cd7450591
commit
876cfb9a63
3 changed files with 65 additions and 19 deletions
14
app/rest.py
14
app/rest.py
|
|
@ -187,7 +187,7 @@ async def _lifespan(app: FastAPI):
|
|||
pass
|
||||
|
||||
|
||||
app = FastAPI(title="Turnstone API", version="0.6.0", docs_url="/turnstone/docs", redoc_url=None, lifespan=_lifespan)
|
||||
app = FastAPI(title="Turnstone API", version="0.6.1", docs_url="/turnstone/docs", redoc_url=None, lifespan=_lifespan)
|
||||
|
||||
app.add_middleware(
|
||||
CORSMiddleware,
|
||||
|
|
@ -619,8 +619,16 @@ def delete_source(source_id: str) -> dict:
|
|||
conn = sqlite3.connect(str(DB_PATH), timeout=30.0)
|
||||
conn.execute("PRAGMA journal_mode=WAL")
|
||||
try:
|
||||
conn.execute("DELETE FROM log_fts WHERE source_id = ?", (source_id,))
|
||||
cur = conn.execute("DELETE FROM log_entries WHERE source_id = ?", (source_id,))
|
||||
# Exact match covers ungrouped IDs; LIKE match covers grouped stems
|
||||
# (e.g. "muninn-journal:Muninn" deletes all "muninn-journal:Muninn:*" units).
|
||||
conn.execute(
|
||||
"DELETE FROM log_fts WHERE source_id = ? OR source_id LIKE ? || ':%'",
|
||||
(source_id, source_id),
|
||||
)
|
||||
cur = conn.execute(
|
||||
"DELETE FROM log_entries WHERE source_id = ? OR source_id LIKE ? || ':%'",
|
||||
(source_id, source_id),
|
||||
)
|
||||
deleted = cur.rowcount
|
||||
conn.commit()
|
||||
finally:
|
||||
|
|
|
|||
|
|
@ -428,28 +428,45 @@ def recent_source_errors(
|
|||
|
||||
|
||||
def list_sources(db_path: Path) -> list[dict]:
|
||||
"""Return distinct sources with entry counts and time ranges."""
|
||||
"""Return sources with entry counts, grouped by prefix:host stem.
|
||||
|
||||
source_ids with three or more colon-separated segments (e.g.
|
||||
``muninn-journal:Muninn:ssh.service``) are collapsed to their first two
|
||||
segments (``muninn-journal:Muninn``). Single- or two-segment IDs are
|
||||
returned as-is. ``unit_count`` reports how many distinct sub-units were
|
||||
merged into each row.
|
||||
"""
|
||||
conn = sqlite3.connect(str(db_path), timeout=30.0)
|
||||
conn.execute("PRAGMA journal_mode=WAL")
|
||||
rows = conn.execute("""
|
||||
SELECT
|
||||
source_id,
|
||||
COUNT(*) as entry_count,
|
||||
MIN(timestamp_iso) as earliest,
|
||||
MAX(timestamp_iso) as latest,
|
||||
SUM(CASE WHEN severity IN ('ERROR','CRITICAL','EMERGENCY','ALERT') THEN 1 ELSE 0 END) as error_count
|
||||
CASE
|
||||
WHEN INSTR(SUBSTR(source_id, INSTR(source_id, ':')+1), ':') > 0
|
||||
THEN SUBSTR(source_id, 1,
|
||||
INSTR(source_id, ':')
|
||||
+ INSTR(SUBSTR(source_id, INSTR(source_id, ':')+1), ':')
|
||||
- 1)
|
||||
ELSE source_id
|
||||
END AS group_id,
|
||||
COUNT(DISTINCT source_id) AS unit_count,
|
||||
COUNT(*) AS entry_count,
|
||||
MIN(timestamp_iso) AS earliest,
|
||||
MAX(timestamp_iso) AS latest,
|
||||
SUM(CASE WHEN severity IN ('ERROR','CRITICAL','EMERGENCY','ALERT')
|
||||
THEN 1 ELSE 0 END) AS error_count
|
||||
FROM log_entries
|
||||
GROUP BY source_id
|
||||
GROUP BY group_id
|
||||
ORDER BY entry_count DESC
|
||||
""").fetchall()
|
||||
conn.close()
|
||||
return [
|
||||
{
|
||||
"source_id": r[0],
|
||||
"entry_count": r[1],
|
||||
"earliest": r[2],
|
||||
"latest": r[3],
|
||||
"error_count": r[4],
|
||||
"unit_count": r[1],
|
||||
"entry_count": r[2],
|
||||
"earliest": r[3],
|
||||
"latest": r[4],
|
||||
"error_count": r[5],
|
||||
}
|
||||
for r in rows
|
||||
]
|
||||
|
|
@ -502,22 +519,29 @@ def stats_summary(db_path: Path, window_hours: int = 24, severity_overrides: lis
|
|||
criticals_24h = int(row["criticals"] or 0)
|
||||
errors_24h = int(row["errors"] or 0)
|
||||
|
||||
# Per-source breakdown
|
||||
# Per-source breakdown — grouped by prefix:host stem (same logic as list_sources).
|
||||
source_rows = conn.execute(f"""
|
||||
SELECT
|
||||
source_id,
|
||||
CASE
|
||||
WHEN INSTR(SUBSTR(source_id, INSTR(source_id, ':')+1), ':') > 0
|
||||
THEN SUBSTR(source_id, 1,
|
||||
INSTR(source_id, ':')
|
||||
+ INSTR(SUBSTR(source_id, INSTR(source_id, ':')+1), ':')
|
||||
- 1)
|
||||
ELSE source_id
|
||||
END AS group_id,
|
||||
COUNT(*) AS entry_count,
|
||||
SUM(CASE WHEN severity IN ('ERROR','CRITICAL','EMERGENCY','ALERT') THEN 1 ELSE 0 END) AS error_count,
|
||||
MAX(timestamp_iso) AS latest
|
||||
FROM log_entries
|
||||
WHERE timestamp_iso >= {since_expr}
|
||||
AND repeat_count = 1
|
||||
GROUP BY source_id
|
||||
GROUP BY group_id
|
||||
ORDER BY error_count DESC, entry_count DESC
|
||||
""").fetchall()
|
||||
source_health = [
|
||||
{
|
||||
"source_id": r["source_id"],
|
||||
"source_id": r["group_id"],
|
||||
"entry_count": int(r["entry_count"]),
|
||||
"error_count": int(r["error_count"]),
|
||||
"latest": r["latest"],
|
||||
|
|
|
|||
|
|
@ -90,6 +90,13 @@
|
|||
class="px-1.5 py-0.5 rounded text-[10px] font-medium
|
||||
bg-surface-raised text-text-dim border border-surface-border"
|
||||
>{{ gtype }}</span>
|
||||
<!-- Unit count badge for grouped journal sources -->
|
||||
<span
|
||||
v-if="src.unit_count && src.unit_count > 1"
|
||||
class="px-1.5 py-0.5 rounded text-[10px] font-medium
|
||||
bg-surface-raised text-text-dim border border-surface-border"
|
||||
:title="`${src.unit_count} systemd units aggregated into this source`"
|
||||
>{{ src.unit_count }} units</span>
|
||||
<!-- Upload badge for DB-only sources not in sources.yaml -->
|
||||
<span
|
||||
v-if="src.dbOnly"
|
||||
|
|
@ -158,6 +165,7 @@ interface SourceRow {
|
|||
// Local-specific
|
||||
path?: string
|
||||
// DB stats (always present, default 0/null)
|
||||
unit_count?: number
|
||||
entry_count: number
|
||||
error_count: number
|
||||
earliest: string | null
|
||||
|
|
@ -169,6 +177,7 @@ interface SourceRow {
|
|||
interface ConfiguredSource extends Omit<SourceRow, 'dbOnly'> {}
|
||||
interface DbSource {
|
||||
source_id: string
|
||||
unit_count: number
|
||||
entry_count: number
|
||||
error_count: number
|
||||
earliest: string | null
|
||||
|
|
@ -242,6 +251,7 @@ async function loadSources(): Promise<void> {
|
|||
.map(db => ({
|
||||
id: db.source_id,
|
||||
transport: 'local' as const,
|
||||
unit_count: db.unit_count,
|
||||
entry_count: db.entry_count,
|
||||
error_count: db.error_count,
|
||||
earliest: db.earliest,
|
||||
|
|
@ -267,7 +277,11 @@ function setBusy(id: string, on: boolean): void {
|
|||
}
|
||||
|
||||
async function deleteSource(sourceId: string): Promise<void> {
|
||||
if (!confirm(`Delete all entries for "${sourceId}"? This cannot be undone.`)) return
|
||||
const row = sources.value.find(s => s.id === sourceId)
|
||||
const label = row?.unit_count && row.unit_count > 1
|
||||
? `all ${row.unit_count} units under "${sourceId}"`
|
||||
: `"${sourceId}"`
|
||||
if (!confirm(`Delete all entries for ${label}? This cannot be undone.`)) return
|
||||
setBusy(sourceId, true)
|
||||
actionMsg.value = ''
|
||||
try {
|
||||
|
|
|
|||
Loading…
Reference in a new issue