diff --git a/app/ingest/pipeline.py b/app/ingest/pipeline.py index ddc3c13..cbd9fee 100644 --- a/app/ingest/pipeline.py +++ b/app/ingest/pipeline.py @@ -38,6 +38,7 @@ CREATE INDEX IF NOT EXISTS idx_patterns ON log_entries(matched_patterns); CREATE TABLE IF NOT EXISTS incidents ( id TEXT PRIMARY KEY, label TEXT NOT NULL, + issue_type TEXT NOT NULL DEFAULT '', started_at TEXT, ended_at TEXT, notes TEXT NOT NULL DEFAULT '', @@ -45,14 +46,36 @@ CREATE TABLE IF NOT EXISTS incidents ( severity TEXT NOT NULL DEFAULT 'medium' ); CREATE INDEX IF NOT EXISTS idx_incidents_time ON incidents(started_at, ended_at); + +CREATE TABLE IF NOT EXISTS received_bundles ( + id TEXT PRIMARY KEY, + source_host TEXT NOT NULL, + issue_type TEXT NOT NULL DEFAULT '', + label TEXT NOT NULL, + severity TEXT NOT NULL DEFAULT 'medium', + started_at TEXT, + bundled_at TEXT NOT NULL, + entry_count INTEGER NOT NULL DEFAULT 0, + bundle_json TEXT NOT NULL +); +CREATE INDEX IF NOT EXISTS idx_bundles_bundled ON received_bundles(bundled_at); +CREATE INDEX IF NOT EXISTS idx_bundles_type ON received_bundles(issue_type); """ def ensure_schema(db_path: Path) -> None: - """Create all tables if they don't exist. Safe to call on every startup.""" + """Create all tables and apply additive migrations. Safe to call on every startup.""" conn = sqlite3.connect(str(db_path)) conn.execute("PRAGMA journal_mode=WAL") conn.executescript(_SCHEMA) + # Additive column migrations — ALTER TABLE silently skips if column exists + for stmt in [ + "ALTER TABLE incidents ADD COLUMN issue_type TEXT NOT NULL DEFAULT ''", + ]: + try: + conn.execute(stmt) + except sqlite3.OperationalError: + pass conn.commit() conn.close() diff --git a/app/rest.py b/app/rest.py index aa57470..9da8a62 100644 --- a/app/rest.py +++ b/app/rest.py @@ -7,7 +7,10 @@ Caddy (menagerie.circuitforge.tech/turnstone) without prefix stripping. from __future__ import annotations import dataclasses +import json import os +import urllib.error +import urllib.request from pathlib import Path from typing import Annotated @@ -19,11 +22,15 @@ from pydantic import BaseModel from app.ingest.pipeline import ensure_schema from app.services.incidents import ( + build_bundle, create_incident, delete_incident, + get_bundle, get_incident, get_incident_entries, + list_bundles, list_incidents, + store_bundle, ) from app.services.search import ( search as _search, @@ -35,6 +42,8 @@ from app.services.search import ( DB_PATH = Path(os.environ.get("TURNSTONE_DB", Path(__file__).parent.parent / "data" / "turnstone.db")) DIST_DIR = Path(__file__).parent.parent / "web" / "dist" +SOURCE_HOST = os.environ.get("TURNSTONE_SOURCE_HOST", "unknown") +BUNDLE_ENDPOINT = os.environ.get("TURNSTONE_BUNDLE_ENDPOINT", "") app = FastAPI(title="Turnstone API", version="0.1.0", docs_url="/turnstone/docs", redoc_url=None) @@ -53,6 +62,7 @@ def _startup() -> None: class IncidentCreate(BaseModel): label: str + issue_type: str = "" started_at: str | None = None ended_at: str | None = None notes: str = "" @@ -174,6 +184,7 @@ def create_incident_endpoint(body: IncidentCreate) -> dict: incident = create_incident( DB_PATH, label=body.label, + issue_type=body.issue_type, started_at=body.started_at, ended_at=body.ended_at, notes=body.notes, @@ -206,6 +217,58 @@ def delete_incident_endpoint(incident_id: str) -> dict: return {"deleted": incident_id} +@router.get("/api/incidents/{incident_id}/bundle") +def get_incident_bundle(incident_id: str) -> dict: + incident = get_incident(DB_PATH, incident_id) + if not incident: + raise HTTPException(status_code=404, detail="Incident not found") + return build_bundle(DB_PATH, incident, source_host=SOURCE_HOST) + + +@router.post("/api/incidents/{incident_id}/send") +def send_incident_bundle(incident_id: str) -> dict: + if not BUNDLE_ENDPOINT: + raise HTTPException(status_code=503, detail="TURNSTONE_BUNDLE_ENDPOINT not configured") + incident = get_incident(DB_PATH, incident_id) + if not incident: + raise HTTPException(status_code=404, detail="Incident not found") + bundle = build_bundle(DB_PATH, incident, source_host=SOURCE_HOST) + payload = json.dumps(bundle).encode() + req = urllib.request.Request( + BUNDLE_ENDPOINT, + data=payload, + headers={"Content-Type": "application/json"}, + method="POST", + ) + try: + with urllib.request.urlopen(req, timeout=15) as resp: + return {"sent": True, "status": resp.status, "entry_count": len(bundle["log_entries"])} + except urllib.error.HTTPError as exc: + raise HTTPException(status_code=502, detail=f"Receiver returned {exc.code}") from exc + except OSError as exc: + raise HTTPException(status_code=502, detail=f"Send failed: {exc}") from exc + + +@router.post("/api/bundles") +def receive_bundle(bundle: dict) -> dict: + record = store_bundle(DB_PATH, bundle) + return {"id": record.id, "entry_count": record.entry_count} + + +@router.get("/api/bundles") +def list_bundles_endpoint() -> dict: + bundles = list_bundles(DB_PATH) + return {"bundles": [dataclasses.asdict(b) for b in bundles]} + + +@router.get("/api/bundles/{bundle_id}") +def get_bundle_endpoint(bundle_id: str) -> dict: + bundle = get_bundle(DB_PATH, bundle_id) + if not bundle: + raise HTTPException(status_code=404, detail="Bundle not found") + return dataclasses.asdict(bundle) + + app.include_router(router) diff --git a/app/services/incidents.py b/app/services/incidents.py index 7d1df8a..9699ba0 100644 --- a/app/services/incidents.py +++ b/app/services/incidents.py @@ -1,12 +1,13 @@ -"""CRUD operations for user-tagged incidents.""" +"""CRUD operations for user-tagged incidents and received log bundles.""" from __future__ import annotations +import json import sqlite3 import uuid from pathlib import Path from app.ingest.base import now_iso -from app.services.models import Incident +from app.services.models import Incident, ReceivedBundle from app.services.search import SearchResult, entries_in_window, search @@ -14,6 +15,7 @@ def _row_to_incident(row: sqlite3.Row) -> Incident: return Incident( id=row["id"], label=row["label"], + issue_type=row["issue_type"] if "issue_type" in row.keys() else "", started_at=row["started_at"], ended_at=row["ended_at"], notes=row["notes"], @@ -22,9 +24,24 @@ def _row_to_incident(row: sqlite3.Row) -> Incident: ) +def _row_to_bundle(row: sqlite3.Row) -> ReceivedBundle: + return ReceivedBundle( + id=row["id"], + source_host=row["source_host"], + issue_type=row["issue_type"], + label=row["label"], + severity=row["severity"], + started_at=row["started_at"], + bundled_at=row["bundled_at"], + entry_count=row["entry_count"], + bundle_json=row["bundle_json"], + ) + + def create_incident( db_path: Path, label: str, + issue_type: str = "", started_at: str | None = None, ended_at: str | None = None, notes: str = "", @@ -33,6 +50,7 @@ def create_incident( incident = Incident( id=str(uuid.uuid4()), label=label, + issue_type=issue_type, started_at=started_at, ended_at=ended_at, notes=notes, @@ -42,10 +60,10 @@ def create_incident( conn = sqlite3.connect(str(db_path)) conn.execute("PRAGMA journal_mode=WAL") conn.execute( - "INSERT INTO incidents (id, label, started_at, ended_at, notes, created_at, severity) " - "VALUES (?, ?, ?, ?, ?, ?, ?)", - (incident.id, incident.label, incident.started_at, incident.ended_at, - incident.notes, incident.created_at, incident.severity), + "INSERT INTO incidents (id, label, issue_type, started_at, ended_at, notes, created_at, severity) " + "VALUES (?, ?, ?, ?, ?, ?, ?, ?)", + (incident.id, incident.label, incident.issue_type, incident.started_at, + incident.ended_at, incident.notes, incident.created_at, incident.severity), ) conn.commit() conn.close() @@ -88,12 +106,7 @@ def get_incident_entries( incident: Incident, limit: int = 100, ) -> list[SearchResult]: - """Return log entries associated with an incident's time window. - - Strategy: keyword search first (FTS, ranked by relevance), then fill - remaining slots with a raw timestamp-window scan so the incident always - shows *something* even when no keywords match. - """ + """Return log entries associated with an incident's time window.""" half = limit // 2 common: dict = dict(since=incident.started_at, until=incident.ended_at, limit=half) @@ -108,7 +121,6 @@ def get_incident_entries( seen.add(entry.entry_id) combined.append(entry) - # Fallback: fill remaining slots from raw window scan (errors first, then all) if len(combined) < limit: for entry in entries_in_window(db_path, incident.started_at, incident.ended_at, severity="ERROR", limit=half): if entry.entry_id not in seen: @@ -123,3 +135,89 @@ def get_incident_entries( combined.sort(key=lambda e: (e.timestamp_iso or "\xff", e.sequence)) return combined[:limit] + + +def build_bundle( + db_path: Path, + incident: Incident, + source_host: str, + limit: int = 200, +) -> dict: + """Assemble a labeled bundle: incident metadata + related log entries.""" + entries = get_incident_entries(db_path, incident, limit=limit) + return { + "bundle_version": 1, + "source_host": source_host, + "bundled_at": now_iso(), + "incident": { + "id": incident.id, + "label": incident.label, + "issue_type": incident.issue_type, + "started_at": incident.started_at, + "ended_at": incident.ended_at, + "severity": incident.severity, + "notes": incident.notes, + }, + "log_entries": [ + { + "entry_id": e.entry_id, + "source_id": e.source_id, + "timestamp_iso": e.timestamp_iso, + "severity": e.severity, + "text": e.text, + "matched_patterns": list(e.matched_patterns), + } + for e in entries + ], + } + + +def store_bundle(db_path: Path, bundle: dict) -> ReceivedBundle: + """Store an incoming bundle from a remote Turnstone instance.""" + inc = bundle.get("incident", {}) + record = ReceivedBundle( + id=str(uuid.uuid4()), + source_host=bundle.get("source_host", "unknown"), + issue_type=inc.get("issue_type", ""), + label=inc.get("label", ""), + severity=inc.get("severity", "medium"), + started_at=inc.get("started_at"), + bundled_at=bundle.get("bundled_at", now_iso()), + entry_count=len(bundle.get("log_entries", [])), + bundle_json=json.dumps(bundle), + ) + conn = sqlite3.connect(str(db_path)) + conn.execute("PRAGMA journal_mode=WAL") + conn.execute( + "INSERT INTO received_bundles " + "(id, source_host, issue_type, label, severity, started_at, bundled_at, entry_count, bundle_json) " + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", + (record.id, record.source_host, record.issue_type, record.label, + record.severity, record.started_at, record.bundled_at, record.entry_count, record.bundle_json), + ) + conn.commit() + conn.close() + return record + + +def list_bundles(db_path: Path) -> list[ReceivedBundle]: + conn = sqlite3.connect(str(db_path)) + conn.execute("PRAGMA journal_mode=WAL") + conn.row_factory = sqlite3.Row + rows = conn.execute( + "SELECT id, source_host, issue_type, label, severity, started_at, bundled_at, entry_count, bundle_json " + "FROM received_bundles ORDER BY bundled_at DESC" + ).fetchall() + conn.close() + return [_row_to_bundle(r) for r in rows] + + +def get_bundle(db_path: Path, bundle_id: str) -> ReceivedBundle | None: + conn = sqlite3.connect(str(db_path)) + conn.execute("PRAGMA journal_mode=WAL") + conn.row_factory = sqlite3.Row + row = conn.execute( + "SELECT * FROM received_bundles WHERE id = ?", (bundle_id,) + ).fetchone() + conn.close() + return _row_to_bundle(row) if row else None diff --git a/app/services/models.py b/app/services/models.py index 2ec1a8d..e551135 100644 --- a/app/services/models.py +++ b/app/services/models.py @@ -39,8 +39,24 @@ class Incident: id: str # UUID label: str # free-text description ("plex crash", "audio broken") + issue_type: str # short category tag for pattern building ("qbit_stall", "auth_failure") started_at: str | None # ISO timestamp; None = open-ended start ended_at: str | None # ISO timestamp; None = open-ended end notes: str # additional context created_at: str # wall-clock when this was tagged severity: str # user-assigned: low / medium / high / critical + + +@dataclass(frozen=True) +class ReceivedBundle: + """A labeled incident bundle received from a remote Turnstone instance.""" + + id: str + source_host: str + issue_type: str + label: str + severity: str + started_at: str | None + bundled_at: str + entry_count: int + bundle_json: str # full bundle serialized as JSON string diff --git a/podman-standalone.sh b/podman-standalone.sh index e4e83cb..da360ff 100755 --- a/podman-standalone.sh +++ b/podman-standalone.sh @@ -59,6 +59,15 @@ DATA_DIR=/opt/turnstone/data PATTERNS_DIR=/opt/turnstone/patterns TZ=America/Los_Angeles +# ── Bundle push configuration ──────────────────────────────────────────────── +# Set TURNSTONE_BUNDLE_ENDPOINT before running this script to enable the +# "Send Bundle" button in the Incidents UI: +# +# export TURNSTONE_BUNDLE_ENDPOINT=https://turnstone.circuitforge.tech/turnstone/api/bundles +# bash /opt/turnstone/podman-standalone.sh +# +# TURNSTONE_SOURCE_HOST is auto-detected from `hostname` — override if needed. + # ── Log source bind mounts ──────────────────────────────────────────────────── # Add or remove mount flags below for each service whose logs you want to ingest. # Inside the container, paths appear under /logs// @@ -81,6 +90,8 @@ podman run -d \ -v "${PATTERNS_DIR}:/patterns:Z" \ -v "${QBIT_LOGS}:/logs/qbittorrent:ro" \ -e TURNSTONE_DB=/data/turnstone.db \ + -e TURNSTONE_SOURCE_HOST="$(hostname)" \ + -e TURNSTONE_BUNDLE_ENDPOINT="${TURNSTONE_BUNDLE_ENDPOINT:-}" \ -e PYTHONUNBUFFERED=1 \ -e TZ="${TZ}" \ --health-cmd="curl -f http://localhost:8534/turnstone/health || exit 1" \ diff --git a/web/src/App.vue b/web/src/App.vue index ebe22fd..cd42978 100644 --- a/web/src/App.vue +++ b/web/src/App.vue @@ -40,6 +40,7 @@ const navLinks = [ { to: '/search', label: 'Search' }, { to: '/diagnose', label: 'Diagnose' }, { to: '/incidents', label: 'Incidents' }, + { to: '/bundles', label: 'Bundles' }, { to: '/sources', label: 'Sources' }, ] diff --git a/web/src/router/index.ts b/web/src/router/index.ts index 238b906..42dbafe 100644 --- a/web/src/router/index.ts +++ b/web/src/router/index.ts @@ -4,6 +4,7 @@ import LogSearchView from '@/views/LogSearchView.vue' import DiagnoseView from '@/views/DiagnoseView.vue' import SourcesView from '@/views/SourcesView.vue' import IncidentsView from '@/views/IncidentsView.vue' +import BundlesView from '@/views/BundlesView.vue' export default createRouter({ history: createWebHistory(import.meta.env.BASE_URL), @@ -12,7 +13,8 @@ export default createRouter({ { path: '/dashboard', component: DashboardView }, { path: '/search', component: LogSearchView }, { path: '/diagnose', component: DiagnoseView }, - { path: '/sources', component: SourcesView }, { path: '/incidents', component: IncidentsView }, + { path: '/bundles', component: BundlesView }, + { path: '/sources', component: SourcesView }, ], }) diff --git a/web/src/views/BundlesView.vue b/web/src/views/BundlesView.vue new file mode 100644 index 0000000..abbe90c --- /dev/null +++ b/web/src/views/BundlesView.vue @@ -0,0 +1,183 @@ + + + diff --git a/web/src/views/IncidentsView.vue b/web/src/views/IncidentsView.vue index 39b29e8..4a6c2fe 100644 --- a/web/src/views/IncidentsView.vue +++ b/web/src/views/IncidentsView.vue @@ -83,6 +83,28 @@

+ +
+ + + + +
+
@@ -134,6 +156,7 @@ Description + Type Severity Window Tagged @@ -148,6 +171,10 @@ @click="selectIncident(inc)" > {{ inc.label }} + + {{ inc.issue_type }} + + {{ inc.severity }} @@ -171,8 +198,21 @@
-

{{ selected.label }}

- +
+

{{ selected.label }}

+ {{ selected.issue_type }} +
+
+ + {{ sendStatus.msg }} + +
Fetching log entries…
@@ -210,6 +250,7 @@ const BASE = import.meta.env.BASE_URL.replace(/\/$/, '') interface Incident { id: string label: string + issue_type: string started_at: string | null ended_at: string | null notes: string @@ -253,6 +294,7 @@ function getQuickRange(preset: string): { started_at: string | null; ended_at: s // ── form state ────────────────────────────────────────────── const form = ref({ label: '', + issue_type: '', severity: 'medium', notes: '', started_at: null as string | null, @@ -307,6 +349,7 @@ async function submitIncident() { headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ label: form.value.label, + issue_type: form.value.issue_type, severity: form.value.severity, notes: form.value.notes, started_at, @@ -316,7 +359,7 @@ async function submitIncident() { if (!res.ok) throw new Error(await res.text()) const created: Incident = await res.json() incidents.value.unshift(created) - form.value = { label: '', severity: 'medium', notes: '', started_at: null, ended_at: null, started_at_local: '', ended_at_local: '' } + form.value = { label: '', issue_type: '', severity: 'medium', notes: '', started_at: null, ended_at: null, started_at_local: '', ended_at_local: '' } activePreset.value = null showCustomPicker.value = false } catch (e: unknown) { @@ -346,14 +389,17 @@ async function deleteIncident(id: string) { } // ── detail drawer ───────────────────────────────────────────── -const selected = ref(null) +const selected = ref(null) const selectedEntries = ref([]) -const entriesLoading = ref(false) +const entriesLoading = ref(false) +const sending = ref(false) +const sendStatus = ref<{ ok: boolean; msg: string } | null>(null) async function selectIncident(inc: Incident) { selected.value = inc selectedEntries.value = [] entriesLoading.value = true + sendStatus.value = null try { const res = await fetch(`${BASE}/api/incidents/${inc.id}`) if (res.ok) { @@ -365,6 +411,25 @@ async function selectIncident(inc: Incident) { } } +async function sendBundle(id: string) { + sending.value = true + sendStatus.value = null + try { + const res = await fetch(`${BASE}/api/incidents/${id}/send`, { method: 'POST' }) + if (res.ok) { + const data = await res.json() + sendStatus.value = { ok: true, msg: `Sent ${data.entry_count} entries` } + } else { + const err = await res.json().catch(() => ({ detail: res.statusText })) + sendStatus.value = { ok: false, msg: err.detail ?? 'Send failed' } + } + } catch (e) { + sendStatus.value = { ok: false, msg: 'Network error' } + } finally { + sending.value = false + } +} + // ── helpers ─────────────────────────────────────────────────── function severityStyle(sev: string): Record { const k = sev?.toLowerCase() ?? 'low'