- Add app/db/ abstraction layer: Backend enum, DbConn wrapper, dialect helper (q() for ? vs %s paramstyle), get_conn(), tenant_id() - Auto-detect backend from DATABASE_URL; SQLite remains default when unset — no config change for local deployments - Add tenant_id column to all three logical DBs (main, context, incidents); idempotent ALTER TABLE migration runs before schema scripts on existing DBs - All INSERTs inject tenant_id; SELECTs use (tenant_id = ? OR tenant_id = '') for backward compat with pre-namespacing rows - Add docker-compose.yml with named volume turnstone_pgdata (survives rebuilds) and optional external Postgres support via DATABASE_URL override - Add scripts/migrate_sqlite_to_postgres.py — one-shot idempotent migration for existing SQLite data; ON CONFLICT DO NOTHING for safe re-runs - Fix SSH glean path in pipeline.py to use ensure_schema + get_conn (was still using raw sqlite3.connect + old _SCHEMA without tenant_id) - Fix FTS5 JOIN ambiguity: qualify repeat_count as f.repeat_count in search - Update all tests to use ensure_*_schema fixtures; add row_factory where needed - 394/394 tests passing Closes: #42 Closes: #50
204 lines
7.9 KiB
Python
204 lines
7.9 KiB
Python
#!/usr/bin/env python3
|
|
"""One-shot migration: copy data from existing SQLite DBs into Postgres.
|
|
|
|
Usage:
|
|
DATABASE_URL=postgresql://... python scripts/migrate_sqlite_to_postgres.py \
|
|
--main-db data/turnstone.db \
|
|
--context-db data/turnstone-context.db \
|
|
--incidents-db data/turnstone-incidents.db \
|
|
[--tenant-id heimdall]
|
|
|
|
The script is idempotent: rows already present in Postgres (same id) are skipped.
|
|
It must be run ONCE per node after deploying the shared Postgres backend.
|
|
|
|
Prerequisites:
|
|
pip install 'psycopg[binary,pool]'
|
|
Set DATABASE_URL to the target Postgres connection string.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import os
|
|
import sqlite3
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
# Allow running from the project root without installing the package
|
|
sys.path.insert(0, str(Path(__file__).parent.parent))
|
|
|
|
|
|
def _pg_connect():
    """Open a connection to the Postgres database named by ``DATABASE_URL``.

    The configuration check runs before the driver import so that a missing
    ``DATABASE_URL`` always produces the short error message and exit code 1,
    even on a machine where ``psycopg`` has not been installed yet.

    Returns:
        The connection returned by ``psycopg.connect``, opened with
        ``autocommit=False`` so callers control commit boundaries themselves.

    Raises:
        SystemExit: with code 1 when ``DATABASE_URL`` is unset or empty.
        ImportError: when ``psycopg`` is not installed.
    """
    url = os.environ.get("DATABASE_URL")
    if not url:
        print("ERROR: DATABASE_URL not set", file=sys.stderr)
        sys.exit(1)

    # Imported lazily: the driver is an optional dependency that only this
    # migration path needs.
    import psycopg  # type: ignore[import]

    return psycopg.connect(url, autocommit=False)
|
|
|
|
|
|
def _ensure_schema_pg() -> None:
    """Run the three project schema helpers, then report success.

    Calls ``ensure_schema``, ``ensure_context_schema`` and
    ``ensure_incidents_schema`` in that order, each with a throwaway path.
    """
    from app.db.schema import ensure_schema, ensure_context_schema, ensure_incidents_schema
    from pathlib import Path

    # The helpers require a db_path argument; it is ignored for Postgres,
    # so a placeholder is passed.
    placeholder = Path("/dev/null")
    for ensure in (ensure_schema, ensure_context_schema, ensure_incidents_schema):
        ensure(placeholder)

    print("Postgres schema verified")
|
|
|
|
|
|
def _migrate_table(
    src_conn: sqlite3.Connection,
    dst_conn,
    table: str,
    tenant_id: str,
    columns: list[str],
    conflict_cols: list[str],
) -> int:
    """Copy rows from a SQLite table into the same-named Postgres table.

    Rows are inserted with ``INSERT ... ON CONFLICT (...) DO NOTHING`` so the
    copy can be re-run safely; rows that already exist are counted as skipped.

    Args:
        src_conn: Open SQLite connection to read from. It is not modified.
        dst_conn: Destination connection. It must provide ``cursor()`` usable
            as a context manager, plus ``commit()`` and ``rollback()``.
        table: Table name, used verbatim in the SQL (trusted, not user input).
        tenant_id: Value used for the ``tenant_id`` column when the source row
            has no such column or holds an empty/NULL value there.
        columns: Destination columns to populate, in insert order. A column
            missing from the source table is inserted as NULL.
        conflict_cols: Columns forming the ``ON CONFLICT`` target.

    Returns:
        The number of rows inserted, as the sum of the cursor's ``rowcount``.
        Returns 0 when the source table is missing or empty.

    Raises:
        Exception: any error raised while inserting is re-raised after the
            destination transaction has been rolled back.
    """
    # Probe for the table without fetching data; the cursor description also
    # tells us which columns the source has.
    try:
        probe = src_conn.execute(f"SELECT * FROM {table} LIMIT 0")  # noqa: S608
    except sqlite3.OperationalError:
        print(f" {table}: not found in SQLite — skipping")
        return 0
    # Lower-cased because sqlite3.Row name lookup is case-insensitive.
    available = {desc[0].lower() for desc in probe.description}

    # A cursor-level row factory gives name-based access without changing the
    # row_factory of the caller's connection.
    src_cur = src_conn.cursor()
    src_cur.row_factory = sqlite3.Row
    rows = src_cur.execute(f"SELECT * FROM {table}").fetchall()  # noqa: S608
    if not rows:
        print(f" {table}: empty — skipping")
        return 0

    # Decide once per table which requested columns exist in the source,
    # instead of raising and catching an exception per row and per column.
    col_spec = [(col, col.lower() in available) for col in columns]
    missing = [col for col, present in col_spec if not present and col != "tenant_id"]
    if missing:
        print(f" {table}: source lacks column(s) {', '.join(missing)} — inserting NULL")

    # Build INSERT ... ON CONFLICT DO NOTHING
    col_list = ", ".join(columns)
    placeholders = ", ".join("%s" for _ in columns)
    conflict = ", ".join(conflict_cols)
    sql = (
        f"INSERT INTO {table} ({col_list}) VALUES ({placeholders}) "  # noqa: S608
        f"ON CONFLICT ({conflict}) DO NOTHING"
    )

    inserted = 0
    try:
        with dst_conn.cursor() as cur:
            for row in rows:
                vals = []
                for col, present in col_spec:
                    val = row[col] if present else None
                    if col == "tenant_id":
                        # Rows without a tenant (missing column, NULL or '')
                        # are attributed to the migrating tenant.
                        val = val or tenant_id
                    vals.append(val)
                cur.execute(sql, vals)
                inserted += cur.rowcount
        dst_conn.commit()
    except Exception:
        # Leave the destination connection usable rather than stuck in a
        # failed, half-written transaction.
        dst_conn.rollback()
        raise

    print(f" {table}: {inserted}/{len(rows)} rows inserted ({len(rows) - inserted} skipped)")
    return inserted
|
|
|
|
|
|
# Migration plan: (label, argparse attribute holding the SQLite path, tables).
# Each table entry is (table name, destination columns, ON CONFLICT columns).
_SQLITE_SOURCES = (
    ("main", "main_db", (
        ("log_entries",
         ["tenant_id", "id", "source_id", "sequence", "timestamp_raw",
          "timestamp_iso", "ingest_time", "severity", "repeat_count",
          "out_of_order", "matched_patterns", "text"],
         ["tenant_id", "id"]),
        ("glean_fingerprints",
         ["tenant_id", "path", "mtime", "size", "gleaned_at"],
         ["tenant_id", "path"]),
        ("blocklist_candidates",
         ["id", "tenant_id", "domain_or_ip", "source_device_ip", "source_device_name",
          "first_seen", "last_seen", "hit_count", "status", "pushed_at",
          "log_evidence", "matched_rule", "llm_score", "llm_reason"],
         ["id"]),
    )),
    ("context", "context_db", (
        ("context_facts",
         ["id", "tenant_id", "category", "key", "value", "source", "created_at"],
         ["id"]),
        ("context_documents",
         ["id", "tenant_id", "filename", "doc_type", "full_text", "file_size", "uploaded_at"],
         ["id"]),
        ("context_chunks",
         ["id", "tenant_id", "document_id", "chunk_index", "text"],
         ["id"]),
    )),
    ("incidents", "incidents_db", (
        ("incidents",
         ["id", "tenant_id", "label", "issue_type", "started_at", "ended_at",
          "notes", "created_at", "severity"],
         ["id"]),
        ("received_bundles",
         ["id", "tenant_id", "source_host", "issue_type", "label", "severity",
          "started_at", "bundled_at", "entry_count", "bundle_json"],
         ["id"]),
        ("sent_bundles",
         ["id", "tenant_id", "incident_id", "exported_at", "sanitized",
          "entry_count", "bundle_json"],
         ["id"]),
    )),
)


def _migrate_sqlite_file(pg, label: str, db_path: Path, tenant_id: str, tables) -> int:
    """Migrate every listed table from one SQLite file; return rows inserted.

    A missing file is reported and skipped (returns 0). The SQLite connection
    is closed even when a table copy raises.
    """
    if not db_path.exists():
        print(f"{label.capitalize()} DB not found at {db_path} — skipping")
        return 0

    print(f"\nMigrating {label} DB: {db_path}")
    src = sqlite3.connect(str(db_path))
    try:
        return sum(
            _migrate_table(src, pg, table, tenant_id,
                           columns=columns, conflict_cols=conflict_cols)
            for table, columns, conflict_cols in tables
        )
    finally:
        src.close()


def main() -> None:
    """Command-line entry point: copy the three SQLite DBs into Postgres.

    Steps: parse arguments, require ``DATABASE_URL``, resolve the tenant ID
    (``--tenant-id``, else ``TURNSTONE_TENANT_ID``, else the hostname), ensure
    the destination schema, then migrate each SQLite file that exists.

    Raises:
        SystemExit: with code 1 when ``DATABASE_URL`` is unset or empty.
    """
    parser = argparse.ArgumentParser(description="Migrate Turnstone SQLite → Postgres")
    parser.add_argument("--main-db", default="data/turnstone.db")
    parser.add_argument("--context-db", default="data/turnstone-context.db")
    parser.add_argument("--incidents-db", default="data/turnstone-incidents.db")
    parser.add_argument(
        "--tenant-id",
        default=None,
        help="Override tenant ID (default: $TURNSTONE_TENANT_ID, else socket.gethostname())",
    )
    args = parser.parse_args()

    # Fail fast, before any schema work. The schema helpers consult
    # DATABASE_URL themselves; with it unset they would presumably select the
    # SQLite backend and "verify" the wrong database — TODO confirm.
    if not os.environ.get("DATABASE_URL"):
        print("ERROR: DATABASE_URL not set", file=sys.stderr)
        sys.exit(1)

    if args.tenant_id:
        # Exported so code that reads the tenant from the environment sees
        # the same value as this script.
        os.environ["TURNSTONE_TENANT_ID"] = args.tenant_id

    import socket

    tenant_id = os.environ.get("TURNSTONE_TENANT_ID") or socket.gethostname()
    print(f"Migrating as tenant_id={tenant_id!r}")

    # Ensure Postgres schema exists first
    _ensure_schema_pg()

    pg = _pg_connect()
    try:
        total = sum(
            _migrate_sqlite_file(pg, label, Path(getattr(args, attr)), tenant_id, tables)
            for label, attr, tables in _SQLITE_SOURCES
        )
    finally:
        # Release the Postgres connection even if a table copy failed.
        pg.close()

    print(f"\nDone. Total rows inserted: {total}")
|
|
|
|
|
|
# Script entry point: run the migration only when this file is executed
# directly, never as a side effect of importing it.
if __name__ == "__main__":
    main()
|