"""Uniform connection wrapper over sqlite3 and psycopg3. Usage: with get_conn(db_path) as conn: conn.execute("SELECT ...", (param,)) conn.commit() For Postgres, db_path is ignored — all connections go through the shared pool. The pool is initialized lazily on first use from DATABASE_URL. """ from __future__ import annotations import logging import os import sqlite3 from contextlib import contextmanager from pathlib import Path from typing import Any, Generator from app.db.backend import BACKEND, Backend logger = logging.getLogger(__name__) _pool: Any = None # psycopg_pool.ConnectionPool, typed as Any to avoid import-time errors class _NopCursor: """Returned when a PRAGMA or other SQLite-only statement is skipped on Postgres.""" rowcount = 0 def fetchall(self) -> list: return [] def fetchone(self) -> None: return None def __iter__(self): return iter([]) class DbConn: """Wraps a raw sqlite3 or psycopg connection with a uniform execute API. Row access is always dict-like: - SQLite: conn.row_factory = sqlite3.Row (supports row["col"] and row[0]) - Postgres: row_factory = dict_row (returns plain dicts) """ __slots__ = ("_c", "_backend") def __init__(self, raw: Any, backend: Backend) -> None: self._c = raw self._backend = backend def _prep(self, sql: str) -> str | None: """Return None to skip (PRAGMA on Postgres), else return ready-to-execute SQL.""" stripped = sql.strip() if self._backend == Backend.POSTGRES and stripped.lower().startswith("pragma"): return None if self._backend == Backend.POSTGRES: return stripped.replace("?", "%s") return stripped def execute(self, sql: str, params: Any = ()) -> Any: prepared = self._prep(sql) if prepared is None: return _NopCursor() return self._c.execute(prepared, params) def executemany(self, sql: str, params_seq: Any) -> Any: prepared = self._prep(sql) if prepared is None: return _NopCursor() return self._c.executemany(prepared, params_seq) def commit(self) -> None: self._c.commit() def close(self) -> None: self._c.close() def __enter__(self) -> "DbConn": return self def __exit__(self, *_: Any) -> None: self.close() def _get_pool() -> Any: global _pool if _pool is not None: return _pool try: from psycopg_pool import ConnectionPool # type: ignore[import] url = os.environ["DATABASE_URL"] _pool = ConnectionPool(url, min_size=2, max_size=10, open=True) logger.info("Postgres connection pool opened (DATABASE_URL set)") return _pool except ImportError as exc: raise RuntimeError( "psycopg[binary,pool] is required for Postgres backend. " "Run: pip install 'psycopg[binary,pool]'" ) from exc except KeyError: raise RuntimeError("DATABASE_URL must be set when using Postgres backend") from None @contextmanager def get_conn(db_path: Path | None = None) -> Generator[DbConn, None, None]: """Yield a DbConn backed by sqlite3 (db_path required) or the Postgres pool.""" if BACKEND == Backend.POSTGRES: pool = _get_pool() from psycopg.rows import dict_row # type: ignore[import] with pool.connection() as raw: raw.row_factory = dict_row yield DbConn(raw, BACKEND) else: if db_path is None: raise ValueError("db_path is required for SQLite backend") raw = sqlite3.connect(str(db_path), timeout=90.0) raw.row_factory = sqlite3.Row try: raw.execute("PRAGMA journal_mode=WAL") raw.execute("PRAGMA busy_timeout=90000") raw.execute("PRAGMA foreign_keys=ON") yield DbConn(raw, BACKEND) finally: raw.close() def close_pool() -> None: """Close the Postgres connection pool — call during application shutdown.""" global _pool if _pool is not None: _pool.close() _pool = None logger.info("Postgres connection pool closed")