diff --git a/app/ingest/pipeline.py b/app/ingest/pipeline.py
index ddc3c13..cbd9fee 100644
--- a/app/ingest/pipeline.py
+++ b/app/ingest/pipeline.py
@@ -38,6 +38,7 @@ CREATE INDEX IF NOT EXISTS idx_patterns ON log_entries(matched_patterns);
CREATE TABLE IF NOT EXISTS incidents (
id TEXT PRIMARY KEY,
label TEXT NOT NULL,
+ issue_type TEXT NOT NULL DEFAULT '',
started_at TEXT,
ended_at TEXT,
notes TEXT NOT NULL DEFAULT '',
@@ -45,14 +46,36 @@ CREATE TABLE IF NOT EXISTS incidents (
severity TEXT NOT NULL DEFAULT 'medium'
);
CREATE INDEX IF NOT EXISTS idx_incidents_time ON incidents(started_at, ended_at);
+
+CREATE TABLE IF NOT EXISTS received_bundles (
+ id TEXT PRIMARY KEY,
+ source_host TEXT NOT NULL,
+ issue_type TEXT NOT NULL DEFAULT '',
+ label TEXT NOT NULL,
+ severity TEXT NOT NULL DEFAULT 'medium',
+ started_at TEXT,
+ bundled_at TEXT NOT NULL,
+ entry_count INTEGER NOT NULL DEFAULT 0,
+ bundle_json TEXT NOT NULL
+);
+CREATE INDEX IF NOT EXISTS idx_bundles_bundled ON received_bundles(bundled_at);
+CREATE INDEX IF NOT EXISTS idx_bundles_type ON received_bundles(issue_type);
"""
def ensure_schema(db_path: Path) -> None:
- """Create all tables if they don't exist. Safe to call on every startup."""
+ """Create all tables and apply additive migrations. Safe to call on every startup."""
conn = sqlite3.connect(str(db_path))
conn.execute("PRAGMA journal_mode=WAL")
conn.executescript(_SCHEMA)
+ # Additive column migrations — ALTER TABLE silently skips if column exists
+ for stmt in [
+ "ALTER TABLE incidents ADD COLUMN issue_type TEXT NOT NULL DEFAULT ''",
+ ]:
+ try:
+ conn.execute(stmt)
+ except sqlite3.OperationalError:
+ pass
conn.commit()
conn.close()
diff --git a/app/rest.py b/app/rest.py
index aa57470..9da8a62 100644
--- a/app/rest.py
+++ b/app/rest.py
@@ -7,7 +7,10 @@ Caddy (menagerie.circuitforge.tech/turnstone) without prefix stripping.
from __future__ import annotations
import dataclasses
+import json
import os
+import urllib.error
+import urllib.request
from pathlib import Path
from typing import Annotated
@@ -19,11 +22,15 @@ from pydantic import BaseModel
from app.ingest.pipeline import ensure_schema
from app.services.incidents import (
+ build_bundle,
create_incident,
delete_incident,
+ get_bundle,
get_incident,
get_incident_entries,
+ list_bundles,
list_incidents,
+ store_bundle,
)
from app.services.search import (
search as _search,
@@ -35,6 +42,8 @@ from app.services.search import (
DB_PATH = Path(os.environ.get("TURNSTONE_DB", Path(__file__).parent.parent / "data" / "turnstone.db"))
DIST_DIR = Path(__file__).parent.parent / "web" / "dist"
+SOURCE_HOST = os.environ.get("TURNSTONE_SOURCE_HOST", "unknown")
+BUNDLE_ENDPOINT = os.environ.get("TURNSTONE_BUNDLE_ENDPOINT", "")
app = FastAPI(title="Turnstone API", version="0.1.0", docs_url="/turnstone/docs", redoc_url=None)
@@ -53,6 +62,7 @@ def _startup() -> None:
class IncidentCreate(BaseModel):
label: str
+ issue_type: str = ""
started_at: str | None = None
ended_at: str | None = None
notes: str = ""
@@ -174,6 +184,7 @@ def create_incident_endpoint(body: IncidentCreate) -> dict:
incident = create_incident(
DB_PATH,
label=body.label,
+ issue_type=body.issue_type,
started_at=body.started_at,
ended_at=body.ended_at,
notes=body.notes,
@@ -206,6 +217,58 @@ def delete_incident_endpoint(incident_id: str) -> dict:
return {"deleted": incident_id}
+@router.get("/api/incidents/{incident_id}/bundle")
+def get_incident_bundle(incident_id: str) -> dict:
+ incident = get_incident(DB_PATH, incident_id)
+ if not incident:
+ raise HTTPException(status_code=404, detail="Incident not found")
+ return build_bundle(DB_PATH, incident, source_host=SOURCE_HOST)
+
+
+@router.post("/api/incidents/{incident_id}/send")
+def send_incident_bundle(incident_id: str) -> dict:
+ if not BUNDLE_ENDPOINT:
+ raise HTTPException(status_code=503, detail="TURNSTONE_BUNDLE_ENDPOINT not configured")
+ incident = get_incident(DB_PATH, incident_id)
+ if not incident:
+ raise HTTPException(status_code=404, detail="Incident not found")
+ bundle = build_bundle(DB_PATH, incident, source_host=SOURCE_HOST)
+ payload = json.dumps(bundle).encode()
+ req = urllib.request.Request(
+ BUNDLE_ENDPOINT,
+ data=payload,
+ headers={"Content-Type": "application/json"},
+ method="POST",
+ )
+ try:
+ with urllib.request.urlopen(req, timeout=15) as resp:
+ return {"sent": True, "status": resp.status, "entry_count": len(bundle["log_entries"])}
+ except urllib.error.HTTPError as exc:
+ raise HTTPException(status_code=502, detail=f"Receiver returned {exc.code}") from exc
+ except OSError as exc:
+ raise HTTPException(status_code=502, detail=f"Send failed: {exc}") from exc
+
+
+@router.post("/api/bundles")
+def receive_bundle(bundle: dict) -> dict:
+ record = store_bundle(DB_PATH, bundle)
+ return {"id": record.id, "entry_count": record.entry_count}
+
+
+@router.get("/api/bundles")
+def list_bundles_endpoint() -> dict:
+ bundles = list_bundles(DB_PATH)
+ return {"bundles": [dataclasses.asdict(b) for b in bundles]}
+
+
+@router.get("/api/bundles/{bundle_id}")
+def get_bundle_endpoint(bundle_id: str) -> dict:
+ bundle = get_bundle(DB_PATH, bundle_id)
+ if not bundle:
+ raise HTTPException(status_code=404, detail="Bundle not found")
+ return dataclasses.asdict(bundle)
+
+
app.include_router(router)
diff --git a/app/services/incidents.py b/app/services/incidents.py
index 7d1df8a..9699ba0 100644
--- a/app/services/incidents.py
+++ b/app/services/incidents.py
@@ -1,12 +1,13 @@
-"""CRUD operations for user-tagged incidents."""
+"""CRUD operations for user-tagged incidents and received log bundles."""
from __future__ import annotations
+import json
import sqlite3
import uuid
from pathlib import Path
from app.ingest.base import now_iso
-from app.services.models import Incident
+from app.services.models import Incident, ReceivedBundle
from app.services.search import SearchResult, entries_in_window, search
@@ -14,6 +15,7 @@ def _row_to_incident(row: sqlite3.Row) -> Incident:
return Incident(
id=row["id"],
label=row["label"],
+ issue_type=row["issue_type"] if "issue_type" in row.keys() else "",
started_at=row["started_at"],
ended_at=row["ended_at"],
notes=row["notes"],
@@ -22,9 +24,24 @@ def _row_to_incident(row: sqlite3.Row) -> Incident:
)
+def _row_to_bundle(row: sqlite3.Row) -> ReceivedBundle:
+ return ReceivedBundle(
+ id=row["id"],
+ source_host=row["source_host"],
+ issue_type=row["issue_type"],
+ label=row["label"],
+ severity=row["severity"],
+ started_at=row["started_at"],
+ bundled_at=row["bundled_at"],
+ entry_count=row["entry_count"],
+ bundle_json=row["bundle_json"],
+ )
+
+
def create_incident(
db_path: Path,
label: str,
+ issue_type: str = "",
started_at: str | None = None,
ended_at: str | None = None,
notes: str = "",
@@ -33,6 +50,7 @@ def create_incident(
incident = Incident(
id=str(uuid.uuid4()),
label=label,
+ issue_type=issue_type,
started_at=started_at,
ended_at=ended_at,
notes=notes,
@@ -42,10 +60,10 @@ def create_incident(
conn = sqlite3.connect(str(db_path))
conn.execute("PRAGMA journal_mode=WAL")
conn.execute(
- "INSERT INTO incidents (id, label, started_at, ended_at, notes, created_at, severity) "
- "VALUES (?, ?, ?, ?, ?, ?, ?)",
- (incident.id, incident.label, incident.started_at, incident.ended_at,
- incident.notes, incident.created_at, incident.severity),
+ "INSERT INTO incidents (id, label, issue_type, started_at, ended_at, notes, created_at, severity) "
+ "VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
+ (incident.id, incident.label, incident.issue_type, incident.started_at,
+ incident.ended_at, incident.notes, incident.created_at, incident.severity),
)
conn.commit()
conn.close()
@@ -88,12 +106,7 @@ def get_incident_entries(
incident: Incident,
limit: int = 100,
) -> list[SearchResult]:
- """Return log entries associated with an incident's time window.
-
- Strategy: keyword search first (FTS, ranked by relevance), then fill
- remaining slots with a raw timestamp-window scan so the incident always
- shows *something* even when no keywords match.
- """
+ """Return log entries associated with an incident's time window."""
half = limit // 2
common: dict = dict(since=incident.started_at, until=incident.ended_at, limit=half)
@@ -108,7 +121,6 @@ def get_incident_entries(
seen.add(entry.entry_id)
combined.append(entry)
- # Fallback: fill remaining slots from raw window scan (errors first, then all)
if len(combined) < limit:
for entry in entries_in_window(db_path, incident.started_at, incident.ended_at, severity="ERROR", limit=half):
if entry.entry_id not in seen:
@@ -123,3 +135,89 @@ def get_incident_entries(
combined.sort(key=lambda e: (e.timestamp_iso or "\xff", e.sequence))
return combined[:limit]
+
+
+def build_bundle(
+ db_path: Path,
+ incident: Incident,
+ source_host: str,
+ limit: int = 200,
+) -> dict:
+ """Assemble a labeled bundle: incident metadata + related log entries."""
+ entries = get_incident_entries(db_path, incident, limit=limit)
+ return {
+ "bundle_version": 1,
+ "source_host": source_host,
+ "bundled_at": now_iso(),
+ "incident": {
+ "id": incident.id,
+ "label": incident.label,
+ "issue_type": incident.issue_type,
+ "started_at": incident.started_at,
+ "ended_at": incident.ended_at,
+ "severity": incident.severity,
+ "notes": incident.notes,
+ },
+ "log_entries": [
+ {
+ "entry_id": e.entry_id,
+ "source_id": e.source_id,
+ "timestamp_iso": e.timestamp_iso,
+ "severity": e.severity,
+ "text": e.text,
+ "matched_patterns": list(e.matched_patterns),
+ }
+ for e in entries
+ ],
+ }
+
+
+def store_bundle(db_path: Path, bundle: dict) -> ReceivedBundle:
+ """Store an incoming bundle from a remote Turnstone instance."""
+ inc = bundle.get("incident", {})
+ record = ReceivedBundle(
+ id=str(uuid.uuid4()),
+ source_host=bundle.get("source_host", "unknown"),
+ issue_type=inc.get("issue_type", ""),
+ label=inc.get("label", ""),
+ severity=inc.get("severity", "medium"),
+ started_at=inc.get("started_at"),
+ bundled_at=bundle.get("bundled_at", now_iso()),
+ entry_count=len(bundle.get("log_entries", [])),
+ bundle_json=json.dumps(bundle),
+ )
+ conn = sqlite3.connect(str(db_path))
+ conn.execute("PRAGMA journal_mode=WAL")
+ conn.execute(
+ "INSERT INTO received_bundles "
+ "(id, source_host, issue_type, label, severity, started_at, bundled_at, entry_count, bundle_json) "
+ "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
+ (record.id, record.source_host, record.issue_type, record.label,
+ record.severity, record.started_at, record.bundled_at, record.entry_count, record.bundle_json),
+ )
+ conn.commit()
+ conn.close()
+ return record
+
+
+def list_bundles(db_path: Path) -> list[ReceivedBundle]:
+ conn = sqlite3.connect(str(db_path))
+ conn.execute("PRAGMA journal_mode=WAL")
+ conn.row_factory = sqlite3.Row
+ rows = conn.execute(
+ "SELECT id, source_host, issue_type, label, severity, started_at, bundled_at, entry_count, bundle_json "
+ "FROM received_bundles ORDER BY bundled_at DESC"
+ ).fetchall()
+ conn.close()
+ return [_row_to_bundle(r) for r in rows]
+
+
+def get_bundle(db_path: Path, bundle_id: str) -> ReceivedBundle | None:
+ conn = sqlite3.connect(str(db_path))
+ conn.execute("PRAGMA journal_mode=WAL")
+ conn.row_factory = sqlite3.Row
+ row = conn.execute(
+ "SELECT * FROM received_bundles WHERE id = ?", (bundle_id,)
+ ).fetchone()
+ conn.close()
+ return _row_to_bundle(row) if row else None
diff --git a/app/services/models.py b/app/services/models.py
index 2ec1a8d..e551135 100644
--- a/app/services/models.py
+++ b/app/services/models.py
@@ -39,8 +39,24 @@ class Incident:
id: str # UUID
label: str # free-text description ("plex crash", "audio broken")
+ issue_type: str # short category tag for pattern building ("qbit_stall", "auth_failure")
started_at: str | None # ISO timestamp; None = open-ended start
ended_at: str | None # ISO timestamp; None = open-ended end
notes: str # additional context
created_at: str # wall-clock when this was tagged
severity: str # user-assigned: low / medium / high / critical
+
+
+@dataclass(frozen=True)
+class ReceivedBundle:
+ """A labeled incident bundle received from a remote Turnstone instance."""
+
+ id: str
+ source_host: str
+ issue_type: str
+ label: str
+ severity: str
+ started_at: str | None
+ bundled_at: str
+ entry_count: int
+ bundle_json: str # full bundle serialized as JSON string
diff --git a/podman-standalone.sh b/podman-standalone.sh
index 0c63999..1fe5dc2 100755
--- a/podman-standalone.sh
+++ b/podman-standalone.sh
@@ -59,6 +59,15 @@ DATA_DIR=/opt/turnstone/data
PATTERNS_DIR=/opt/turnstone/patterns
TZ=America/Los_Angeles
+# ── Bundle push configuration ────────────────────────────────────────────────
+# Set TURNSTONE_BUNDLE_ENDPOINT before running this script to enable the
+# "Send Bundle" button in the Incidents UI:
+#
+# export TURNSTONE_BUNDLE_ENDPOINT=https://turnstone.circuitforge.tech/turnstone/api/bundles
+# bash /opt/turnstone/podman-standalone.sh
+#
+# TURNSTONE_SOURCE_HOST is auto-detected from `hostname` — override if needed.
+
# ── Log source bind mounts ────────────────────────────────────────────────────
# Add or remove mount flags below for each service whose logs you want to ingest.
# Inside the container, paths appear under /logs/ Labeled incident bundles sent from remote Turnstone instances. Use these to build detection signatures. No bundles received yet. Bundles arrive when a remote Turnstone instance sends a labeled incident. {{ expandedEntries.length }} log entriesReceived Bundles
+