"""Per-backend SQL fragments and placeholder rewriting. All production SQL should be written with SQLite-style `?` placeholders. Call q(sql) before passing to execute/executemany — it rewrites to %s for Postgres and leaves SQLite queries untouched. """ from __future__ import annotations from app.db.backend import BACKEND, Backend def q(sql: str) -> str: """Rewrite ? placeholders to %s for Postgres; no-op for SQLite.""" if BACKEND == Backend.POSTGRES: return sql.replace("?", "%s") return sql class _Fragments: """SQL fragments that differ between backends.""" @property def insert_or_ignore(self) -> str: return "INSERT" if BACKEND == Backend.POSTGRES else "INSERT OR IGNORE" @property def on_conflict_ignore(self) -> str: # Caller must substitute the column name(s) at use time when using Postgres. # For log_entries: ON CONFLICT (tenant_id, id) DO NOTHING # For generic use this property is a no-op sentinel; prefer insert_ignore_into(). return "" def insert_ignore_entries(self) -> str: """Full INSERT ... ON CONFLICT clause for log_entries.""" if BACKEND == Backend.POSTGRES: return "INSERT INTO log_entries" return "INSERT OR IGNORE INTO log_entries" def entries_conflict_clause(self) -> str: if BACKEND == Backend.POSTGRES: return "ON CONFLICT (tenant_id, id) DO NOTHING" return "" def fingerprint_upsert(self) -> str: if BACKEND == Backend.POSTGRES: return ( "INSERT INTO glean_fingerprints (tenant_id, path, mtime, size, gleaned_at)" " VALUES (%s, %s, %s, %s, %s)" " ON CONFLICT (tenant_id, path)" " DO UPDATE SET mtime=EXCLUDED.mtime, size=EXCLUDED.size, gleaned_at=EXCLUDED.gleaned_at" ) return ( "INSERT OR REPLACE INTO glean_fingerprints (tenant_id, path, mtime, size, gleaned_at)" " VALUES (?,?,?,?,?)" ) def source_group_expr(self, col: str = "source_id") -> str: """SQL expression that collapses prefix:host:unit → prefix:host stem.""" if BACKEND == Backend.POSTGRES: return f""" CASE WHEN array_length(string_to_array({col}, ':'), 1) >= 3 THEN split_part({col}, ':', 1) || ':' || split_part({col}, ':', 2) ELSE {col} END """ return f""" CASE WHEN INSTR(SUBSTR({col}, INSTR({col}, ':')+1), ':') > 0 THEN SUBSTR({col}, 1, INSTR({col}, ':') + INSTR(SUBSTR({col}, INSTR({col}, ':')+1), ':') - 1) ELSE {col} END """ def fts_match_clause(self) -> str: """WHERE clause fragment for FTS query. Caller supplies the query param.""" if BACKEND == Backend.POSTGRES: return "text_tsv @@ websearch_to_tsquery('english', %s)" return "log_fts MATCH ?" def fts_rank_expr(self) -> str: """ORDER BY expression for FTS rank (best match first). Postgres needs the query twice.""" if BACKEND == Backend.POSTGRES: # ts_rank returns 0..1 where higher is better; pass the query again as param return "ts_rank(text_tsv, websearch_to_tsquery('english', %s)) DESC" # FTS5 rank is negative BM25; ASC = most-negative = best match return "rank ASC" frag = _Fragments()