- Add app/db/ abstraction layer: Backend enum, DbConn wrapper, dialect helper (q() for ? vs %s paramstyle), get_conn(), tenant_id() - Auto-detect backend from DATABASE_URL; SQLite remains default when unset — no config change for local deployments - Add tenant_id column to all three logical DBs (main, context, incidents); idempotent ALTER TABLE migration runs before schema scripts on existing DBs - All INSERTs inject tenant_id; SELECTs use (tenant_id = ? OR tenant_id = '') for backward compat with pre-namespacing rows - Add docker-compose.yml with named volume turnstone_pgdata (survives rebuilds) and optional external Postgres support via DATABASE_URL override - Add scripts/migrate_sqlite_to_postgres.py — one-shot idempotent migration for existing SQLite data; ON CONFLICT DO NOTHING for safe re-runs - Fix SSH glean path in pipeline.py to use ensure_schema + get_conn (was still using raw sqlite3.connect + old _SCHEMA without tenant_id) - Fix FTS5 JOIN ambiguity: qualify repeat_count as f.repeat_count in search - Update all tests to use ensure_*_schema fixtures; add row_factory where needed - 394/394 tests passing Closes: #42 Closes: #50
93 lines
3.5 KiB
Python
93 lines
3.5 KiB
Python
"""Per-backend SQL fragments and placeholder rewriting.
|
|
|
|
All production SQL should be written with SQLite-style `?` placeholders.
|
|
Call q(sql) before passing to execute/executemany — it rewrites to %s for
|
|
Postgres and leaves SQLite queries untouched.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
from app.db.backend import BACKEND, Backend
|
|
|
|
|
|
def q(sql: str) -> str:
    """Rewrite SQLite-style ``?`` placeholders to ``%s`` for Postgres.

    On any backend other than Postgres the statement is returned unchanged.
    On Postgres, every ``?`` that sits outside a quoted region becomes
    ``%s``. A ``?`` inside a single-quoted string literal or a double-quoted
    identifier is literal text rather than a bind parameter, so it is left
    alone; a blanket ``str.replace`` would corrupt such literals and add a
    spurious parameter slot.

    Args:
        sql: SQL statement written with ``?`` placeholders.

    Returns:
        The statement with its placeholders adapted to the active backend.
    """
    if BACKEND != Backend.POSTGRES or "?" not in sql:
        return sql

    pieces: list[str] = []
    open_quote = ""  # quote character of the region we are inside, "" when outside
    for ch in sql:
        if open_quote:
            # A doubled quote ('' or "") closes and immediately reopens the
            # region, so SQL's escaped quotes need no special handling.
            if ch == open_quote:
                open_quote = ""
        elif ch in "'\"":
            open_quote = ch
        elif ch == "?":
            pieces.append("%s")
            continue
        pieces.append(ch)
    # NOTE(review): a literal % in sql (e.g. in a LIKE pattern) is passed
    # through unescaped — confirm the Postgres driver's %-style parameter
    # handling tolerates that for the queries routed through here.
    return "".join(pieces)
|
|
|
|
|
|
class _Fragments:
    """Collection of SQL snippets whose text depends on the active backend.

    Every member consults the module-level ``BACKEND`` at call time and
    returns the Postgres spelling when it equals ``Backend.POSTGRES``,
    otherwise the SQLite spelling.
    """

    @property
    def insert_or_ignore(self) -> str:
        """INSERT verb: ``INSERT`` on Postgres, ``INSERT OR IGNORE`` otherwise."""
        if BACKEND != Backend.POSTGRES:
            return "INSERT OR IGNORE"
        return "INSERT"

    @property
    def on_conflict_ignore(self) -> str:
        """No-op sentinel: always the empty string, on every backend.

        A Postgres ``ON CONFLICT`` clause needs the conflicting column
        name(s), which only the caller knows (for log_entries it is
        ``ON CONFLICT (tenant_id, id) DO NOTHING``). For log_entries use
        insert_ignore_entries() together with entries_conflict_clause().
        """
        return ""

    def insert_ignore_entries(self) -> str:
        """Return the ``INSERT ... INTO log_entries`` statement prefix.

        On Postgres this is a plain ``INSERT INTO``; the ignore-on-conflict
        behaviour there comes from entries_conflict_clause(), which the
        caller is presumably expected to append after the VALUES list.
        """
        verb = "INSERT" if BACKEND == Backend.POSTGRES else "INSERT OR IGNORE"
        return verb + " INTO log_entries"

    def entries_conflict_clause(self) -> str:
        """Return the trailing conflict clause for log_entries inserts.

        Empty on SQLite, where ``INSERT OR IGNORE`` already covers it.
        """
        if BACKEND != Backend.POSTGRES:
            return ""
        return "ON CONFLICT (tenant_id, id) DO NOTHING"

    def fingerprint_upsert(self) -> str:
        """Return the complete upsert statement for glean_fingerprints.

        The statement takes five parameters in the order tenant_id, path,
        mtime, size, gleaned_at. The placeholders are already in the
        backend's own paramstyle (``%s`` on Postgres, ``?`` on SQLite).
        """
        target = "glean_fingerprints (tenant_id, path, mtime, size, gleaned_at)"
        if BACKEND != Backend.POSTGRES:
            return f"INSERT OR REPLACE INTO {target} VALUES (?,?,?,?,?)"
        refresh = "mtime=EXCLUDED.mtime, size=EXCLUDED.size, gleaned_at=EXCLUDED.gleaned_at"
        return (
            f"INSERT INTO {target}"
            " VALUES (%s, %s, %s, %s, %s)"
            " ON CONFLICT (tenant_id, path)"
            f" DO UPDATE SET {refresh}"
        )

    def source_group_expr(self, col: str = "source_id") -> str:
        """SQL expression that collapses prefix:host:unit → prefix:host stem.

        Values with fewer than two ':' separators are returned as-is by the
        expression.

        Args:
            col: Column name or expression to collapse. It is interpolated
                into the SQL text verbatim, so it must not come from
                untrusted input.

        Returns:
            A ``CASE ... END`` expression in the active backend's dialect.
        """
        if BACKEND != Backend.POSTGRES:
            # SQLite has no split function: locate the first ':' and then the
            # second ':' relative to it, and keep everything before the latter.
            return f"""
                CASE
                    WHEN INSTR(SUBSTR({col}, INSTR({col}, ':')+1), ':') > 0
                    THEN SUBSTR({col}, 1,
                                INSTR({col}, ':')
                                + INSTR(SUBSTR({col}, INSTR({col}, ':')+1), ':')
                                - 1)
                    ELSE {col}
                END
            """
        return f"""
                CASE
                    WHEN array_length(string_to_array({col}, ':'), 1) >= 3
                    THEN split_part({col}, ':', 1) || ':' || split_part({col}, ':', 2)
                    ELSE {col}
                END
            """

    def fts_match_clause(self) -> str:
        """WHERE-clause fragment for a full-text query.

        The caller supplies the search string as the single bind parameter.
        """
        if BACKEND != Backend.POSTGRES:
            return "log_fts MATCH ?"
        return "text_tsv @@ websearch_to_tsquery('english', %s)"

    def fts_rank_expr(self) -> str:
        """ORDER BY expression that puts the best full-text match first.

        On Postgres the fragment contains its own placeholder, so the search
        string has to be passed a second time as a parameter.
        """
        if BACKEND != Backend.POSTGRES:
            # FTS5 rank is negative BM25; ASC = most-negative = best match
            return "rank ASC"
        # Higher ts_rank means a better match, hence DESC.
        return "ts_rank(text_tsv, websearch_to_tsquery('english', %s)) DESC"
|
|
|
|
|
|
# Shared module-level instance of the fragment helper. _Fragments defines no
# instance state in this file; each member reads BACKEND when it is called.
frag = _Fragments()
|