- Add app/db/ abstraction layer: Backend enum, DbConn wrapper, dialect helper (q() for ? vs %s paramstyle), get_conn(), tenant_id() - Auto-detect backend from DATABASE_URL; SQLite remains default when unset — no config change for local deployments - Add tenant_id column to all three logical DBs (main, context, incidents); idempotent ALTER TABLE migration runs before schema scripts on existing DBs - All INSERTs inject tenant_id; SELECTs use (tenant_id = ? OR tenant_id = '') for backward compat with pre-namespacing rows - Add docker-compose.yml with named volume turnstone_pgdata (survives rebuilds) and optional external Postgres support via DATABASE_URL override - Add scripts/migrate_sqlite_to_postgres.py — one-shot idempotent migration for existing SQLite data; ON CONFLICT DO NOTHING for safe re-runs - Fix SSH glean path in pipeline.py to use ensure_schema + get_conn (was still using raw sqlite3.connect + old _SCHEMA without tenant_id) - Fix FTS5 JOIN ambiguity: qualify repeat_count as f.repeat_count in search - Update all tests to use ensure_*_schema fixtures; add row_factory where needed - 394/394 tests passing Closes: #42 Closes: #50
136 lines
4.1 KiB
Python
136 lines
4.1 KiB
Python
"""Uniform connection wrapper over sqlite3 and psycopg3.
|
|
|
|
Usage:
|
|
with get_conn(db_path) as conn:
|
|
conn.execute("SELECT ...", (param,))
|
|
conn.commit()
|
|
|
|
For Postgres, db_path is ignored — all connections go through the shared pool.
|
|
The pool is initialized lazily on first use from DATABASE_URL.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import os
|
|
import sqlite3
|
|
from contextlib import contextmanager
|
|
from pathlib import Path
|
|
from typing import Any, Generator
|
|
|
|
from app.db.backend import BACKEND, Backend
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
_pool: Any = None # psycopg_pool.ConnectionPool, typed as Any to avoid import-time errors
|
|
|
|
|
|
class _NopCursor:
    """Empty stand-in cursor for statements that are skipped instead of executed.

    Handed back when a PRAGMA (or other SQLite-only statement) is dropped on
    Postgres, so calling code can keep using the usual cursor accessors and
    simply observe "no rows".
    """

    # No statement ran, so no rows were affected.
    rowcount = 0

    def fetchone(self) -> None:
        """Return None: there is never a row to fetch."""
        return None

    def fetchall(self) -> list:
        """Return a new empty list: the result set is always empty."""
        return list()

    def __iter__(self):
        """Iterate over the (empty) result set."""
        return iter(self.fetchall())
|
|
|
|
|
|
class DbConn:
    """Wrap a raw sqlite3 or psycopg connection behind one uniform execute API.

    Callers write SQLite-flavoured SQL (``?`` placeholders, optional PRAGMA
    statements). On the Postgres backend each statement is translated before
    it is sent: PRAGMA statements are skipped and ``?`` placeholders are
    rewritten to ``%s``.

    Row access is always dict-like:
      - SQLite:   conn.row_factory = sqlite3.Row (supports row["col"] and row[0])
      - Postgres: row_factory = dict_row (returns plain dicts)
    This wrapper does not set the row factory itself; it must already be
    configured on the raw connection passed in.
    """

    __slots__ = ("_c", "_backend")

    def __init__(self, raw: Any, backend: Backend) -> None:
        """Store the raw driver connection and the backend it belongs to."""
        self._c = raw
        self._backend = backend

    @staticmethod
    def _qmark_to_format(sql: str) -> str:
        """Rewrite ``?`` placeholders to ``%s`` without touching quoted text.

        A blind ``str.replace("?", "%s")`` would also rewrite question marks
        inside string literals, quoted identifiers and comments, corrupting
        the literal and changing the number of placeholders. Those spans are
        matched first and copied through verbatim; only a bare ``?`` outside
        them is converted.
        """
        import re  # local import: keeps this block self-contained

        pattern = (
            r"'[^']*'"       # string literal ('' escapes read as adjacent literals)
            r'|"[^"]*"'      # quoted identifier
            r"|--[^\n]*"     # line comment
            r"|/\*.*?\*/"    # block comment
            r"|\?"           # bare placeholder
        )

        def _swap(match: Any) -> str:
            token = match.group()
            return "%s" if token == "?" else token

        return re.sub(pattern, _swap, sql, flags=re.DOTALL)

    def _prep(self, sql: str) -> str | None:
        """Return None to skip (PRAGMA on Postgres), else return ready-to-execute SQL."""
        stripped = sql.strip()
        if self._backend != Backend.POSTGRES:
            return stripped
        if stripped.lower().startswith("pragma"):
            return None
        return self._qmark_to_format(stripped)

    def execute(self, sql: str, params: Any = ()) -> Any:
        """Execute one statement and return the driver cursor.

        Returns a ``_NopCursor`` when the statement is skipped on this backend.
        """
        prepared = self._prep(sql)
        if prepared is None:
            return _NopCursor()
        return self._c.execute(prepared, params)

    def executemany(self, sql: str, params_seq: Any) -> Any:
        """Execute one statement once per parameter set and return the cursor.

        Returns a ``_NopCursor`` when the statement is skipped on this backend.
        """
        prepared = self._prep(sql)
        if prepared is None:
            return _NopCursor()
        if self._backend == Backend.POSTGRES:
            # psycopg connections offer an execute() shortcut but, unlike
            # sqlite3, no executemany(); go through an explicit cursor.
            cursor = self._c.cursor()
            cursor.executemany(prepared, params_seq)
            return cursor
        return self._c.executemany(prepared, params_seq)

    def commit(self) -> None:
        """Commit the current transaction on the underlying connection."""
        self._c.commit()

    def close(self) -> None:
        """Close the underlying connection."""
        self._c.close()

    def __enter__(self) -> "DbConn":
        return self

    def __exit__(self, *_: Any) -> None:
        # Closes only; nothing is committed or rolled back here.
        self.close()
|
|
|
|
|
|
def _get_pool() -> Any:
    """Return the shared Postgres connection pool, creating it on first use.

    The pool is built from the ``DATABASE_URL`` environment variable and cached
    in the module-level ``_pool`` so later calls reuse it.

    Raises:
        RuntimeError: if ``psycopg_pool`` cannot be imported, or if
            ``DATABASE_URL`` is not set.
    """
    global _pool
    if _pool is not None:
        return _pool

    # Each try guards exactly one failure mode, so an unrelated ImportError or
    # KeyError raised while constructing the pool is not misreported as a
    # missing dependency or a missing DATABASE_URL.
    try:
        from psycopg_pool import ConnectionPool  # type: ignore[import]
    except ImportError as exc:
        raise RuntimeError(
            "psycopg[binary,pool] is required for Postgres backend. "
            "Run: pip install 'psycopg[binary,pool]'"
        ) from exc

    try:
        url = os.environ["DATABASE_URL"]
    except KeyError:
        raise RuntimeError("DATABASE_URL must be set when using Postgres backend") from None

    _pool = ConnectionPool(url, min_size=2, max_size=10, open=True)
    logger.info("Postgres connection pool opened (DATABASE_URL set)")
    return _pool
|
|
|
|
|
|
@contextmanager
def get_conn(db_path: Path | None = None) -> Generator[DbConn, None, None]:
    """Context manager yielding a DbConn for the configured backend.

    SQLite: opens ``db_path`` (required) with a 30 second timeout, sets
    ``sqlite3.Row`` as the row factory, applies the WAL and foreign-key
    PRAGMAs, and closes the connection when the block exits.

    Postgres: ``db_path`` is not used; a connection is borrowed from the shared
    pool via ``pool.connection()`` and given the ``dict_row`` row factory.

    Raises:
        ValueError: on the SQLite backend when ``db_path`` is None.
    """
    if BACKEND != Backend.POSTGRES:
        if db_path is None:
            raise ValueError("db_path is required for SQLite backend")
        sqlite_conn = sqlite3.connect(str(db_path), timeout=30.0)
        sqlite_conn.row_factory = sqlite3.Row
        try:
            for pragma in ("PRAGMA journal_mode=WAL", "PRAGMA foreign_keys=ON"):
                sqlite_conn.execute(pragma)
            yield DbConn(sqlite_conn, BACKEND)
        finally:
            sqlite_conn.close()
        return

    # Postgres: obtain the pool first, then import the row factory lazily so
    # psycopg is only needed when this backend is actually selected.
    pg_pool = _get_pool()
    from psycopg.rows import dict_row  # type: ignore[import]

    with pg_pool.connection() as pg_conn:
        pg_conn.row_factory = dict_row
        yield DbConn(pg_conn, BACKEND)
|
|
|
|
|
|
def close_pool() -> None:
    """Shut down the Postgres connection pool; intended for application shutdown.

    Does nothing when no pool has been created. Otherwise closes the pool and
    clears the module-level reference.
    """
    global _pool
    if _pool is None:
        return
    _pool.close()
    _pool = None
    logger.info("Postgres connection pool closed")
|