feat: dual-backend SQLite/Postgres + multi-tenant source namespacing

- Add app/db/ abstraction layer: Backend enum, DbConn wrapper,
  dialect helper (q() for ? vs %s paramstyle), get_conn(), resolve_tenant_id()
- Auto-detect backend from DATABASE_URL; SQLite remains default when
  unset — no config change for local deployments
- Add tenant_id column to all three logical DBs (main, context, incidents);
  idempotent ALTER TABLE migration runs before schema scripts on existing DBs
- All INSERTs inject tenant_id; SELECTs use (tenant_id = ? OR tenant_id = '')
  for backward compat with pre-namespacing rows
- Add docker-compose.yml with named volume turnstone_pgdata (survives rebuilds)
  and optional external Postgres support via DATABASE_URL override
- Add scripts/migrate_sqlite_to_postgres.py — one-shot idempotent migration
  for existing SQLite data; ON CONFLICT DO NOTHING for safe re-runs
- Fix SSH glean path in pipeline.py to use ensure_schema + get_conn
  (was still using raw sqlite3.connect + old _SCHEMA without tenant_id)
- Fix FTS5 JOIN ambiguity: qualify repeat_count as f.repeat_count in search
- Update all tests to use ensure_*_schema fixtures; add row_factory where needed
- 394/394 tests passing

Closes: #42
Closes: #50
This commit is contained in:
pyr0ball 2026-06-08 08:37:54 -07:00
parent 1de156ebde
commit 0311d72e53
26 changed files with 1584 additions and 661 deletions

View file

@ -1,12 +1,13 @@
"""Context fact and document CRUD — MIT licensed.""" """Context fact and document CRUD — MIT licensed."""
from __future__ import annotations from __future__ import annotations
import sqlite3
import uuid import uuid
from dataclasses import dataclass from dataclasses import dataclass
from datetime import datetime, timezone from datetime import datetime, timezone
from pathlib import Path from pathlib import Path
from app.db import get_conn, resolve_tenant_id
@dataclass(frozen=True) @dataclass(frozen=True)
class ContextFact: class ContextFact:
@ -28,19 +29,8 @@ class ContextDocument:
uploaded_at: str uploaded_at: str
def _connect(db_path: Path) -> sqlite3.Connection:
# timeout=30: retry for up to 30 s when another writer (e.g. the glean
# collector) holds a WAL write lock. PRAGMA busy_timeout is a SQLite-level
# hint that operates after the connection is open; the Python sqlite3 module's
# own retry loop is controlled solely by this timeout= argument.
conn = sqlite3.connect(str(db_path), timeout=30.0)
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA foreign_keys=ON")
conn.row_factory = sqlite3.Row
return conn
def add_fact(db_path: Path, category: str, key: str, value: str, source: str | None = None) -> ContextFact: def add_fact(db_path: Path, category: str, key: str, value: str, source: str | None = None) -> ContextFact:
tid = resolve_tenant_id()
fact = ContextFact( fact = ContextFact(
id=str(uuid.uuid4()), id=str(uuid.uuid4()),
category=category, category=category,
@ -49,27 +39,28 @@ def add_fact(db_path: Path, category: str, key: str, value: str, source: str | N
source=source, source=source,
created_at=datetime.now(timezone.utc).isoformat(), created_at=datetime.now(timezone.utc).isoformat(),
) )
conn = _connect(db_path) with get_conn(db_path) as conn:
conn.execute( conn.execute(
"INSERT INTO context_facts(id, category, key, value, source, created_at) VALUES (?,?,?,?,?,?)", "INSERT INTO context_facts(id, tenant_id, category, key, value, source, created_at) VALUES (?,?,?,?,?,?,?)",
(fact.id, fact.category, fact.key, fact.value, fact.source, fact.created_at), (fact.id, tid, fact.category, fact.key, fact.value, fact.source, fact.created_at),
) )
conn.commit() conn.commit()
conn.close()
return fact return fact
def list_facts(db_path: Path, category: str | None = None) -> list[ContextFact]: def list_facts(db_path: Path, category: str | None = None) -> list[ContextFact]:
conn = _connect(db_path) tid = resolve_tenant_id()
if category: with get_conn(db_path) as conn:
rows = conn.execute( if category:
"SELECT * FROM context_facts WHERE category=? ORDER BY created_at", (category,) rows = conn.execute(
).fetchall() "SELECT * FROM context_facts WHERE category=? AND (tenant_id=? OR tenant_id='') ORDER BY created_at",
else: (category, tid),
rows = conn.execute( ).fetchall()
"SELECT * FROM context_facts ORDER BY category, created_at" else:
).fetchall() rows = conn.execute(
conn.close() "SELECT * FROM context_facts WHERE (tenant_id=? OR tenant_id='') ORDER BY category, created_at",
(tid,),
).fetchall()
return [ return [
ContextFact( ContextFact(
id=r["id"], category=r["category"], key=r["key"], id=r["id"], category=r["category"], key=r["key"],
@ -80,10 +71,13 @@ def list_facts(db_path: Path, category: str | None = None) -> list[ContextFact]:
def delete_fact(db_path: Path, fact_id: str) -> bool: def delete_fact(db_path: Path, fact_id: str) -> bool:
conn = _connect(db_path) tid = resolve_tenant_id()
cursor = conn.execute("DELETE FROM context_facts WHERE id=?", (fact_id,)) with get_conn(db_path) as conn:
conn.commit() cursor = conn.execute(
conn.close() "DELETE FROM context_facts WHERE id=? AND (tenant_id=? OR tenant_id='')",
(fact_id, tid),
)
conn.commit()
return cursor.rowcount > 0 return cursor.rowcount > 0
@ -94,6 +88,7 @@ def add_document(
full_text: str, full_text: str,
file_size: int | None = None, file_size: int | None = None,
) -> ContextDocument: ) -> ContextDocument:
tid = resolve_tenant_id()
doc = ContextDocument( doc = ContextDocument(
id=str(uuid.uuid4()), id=str(uuid.uuid4()),
filename=filename, filename=filename,
@ -102,24 +97,24 @@ def add_document(
file_size=file_size, file_size=file_size,
uploaded_at=datetime.now(timezone.utc).isoformat(), uploaded_at=datetime.now(timezone.utc).isoformat(),
) )
conn = _connect(db_path) with get_conn(db_path) as conn:
conn.execute( conn.execute(
"INSERT INTO context_documents(id, filename, doc_type, full_text, file_size, uploaded_at)" "INSERT INTO context_documents(id, tenant_id, filename, doc_type, full_text, file_size, uploaded_at)"
" VALUES (?,?,?,?,?,?)", " VALUES (?,?,?,?,?,?,?)",
(doc.id, doc.filename, doc.doc_type, doc.full_text, doc.file_size, doc.uploaded_at), (doc.id, tid, doc.filename, doc.doc_type, doc.full_text, doc.file_size, doc.uploaded_at),
) )
conn.commit() conn.commit()
conn.close()
return doc return doc
def list_documents(db_path: Path) -> list[ContextDocument]: def list_documents(db_path: Path) -> list[ContextDocument]:
conn = _connect(db_path) tid = resolve_tenant_id()
rows = conn.execute( with get_conn(db_path) as conn:
"SELECT id, filename, doc_type, full_text, file_size, uploaded_at" rows = conn.execute(
" FROM context_documents ORDER BY uploaded_at DESC" "SELECT id, filename, doc_type, full_text, file_size, uploaded_at"
).fetchall() " FROM context_documents WHERE (tenant_id=? OR tenant_id='') ORDER BY uploaded_at DESC",
conn.close() (tid,),
).fetchall()
return [ return [
ContextDocument( ContextDocument(
id=r["id"], filename=r["filename"], doc_type=r["doc_type"], id=r["id"], filename=r["filename"], doc_type=r["doc_type"],
@ -130,8 +125,11 @@ def list_documents(db_path: Path) -> list[ContextDocument]:
def delete_document(db_path: Path, doc_id: str) -> bool: def delete_document(db_path: Path, doc_id: str) -> bool:
conn = _connect(db_path) tid = resolve_tenant_id()
cursor = conn.execute("DELETE FROM context_documents WHERE id=?", (doc_id,)) with get_conn(db_path) as conn:
conn.commit() cursor = conn.execute(
conn.close() "DELETE FROM context_documents WHERE id=? AND (tenant_id=? OR tenant_id='')",
(doc_id, tid),
)
conn.commit()
return cursor.rowcount > 0 return cursor.rowcount > 0

36
app/db/__init__.py Normal file
View file

@ -0,0 +1,36 @@
"""Turnstone database abstraction — unified SQLite / Postgres interface.
Public API:
BACKEND Backend.SQLITE or Backend.POSTGRES
get_conn(path) context manager yielding a DbConn
resolve_tenant_id() this node's tenant ID (env or hostname)
q(sql) rewrite ? placeholders to %s for Postgres
frag SQL fragment helpers (insert_or_ignore, source_group_expr, ...)
ensure_schema idempotent schema init
close_pool call during shutdown when using Postgres
"""
from app.db.backend import BACKEND, Backend
from app.db.conn import DbConn, close_pool, get_conn
from app.db.dialect import frag, q
from app.db.schema import (
ensure_context_schema,
ensure_incidents_schema,
ensure_schema,
migrate_incidents_to_dedicated_db,
)
from app.db.tenant import resolve_tenant_id
__all__ = [
"BACKEND",
"Backend",
"DbConn",
"close_pool",
"get_conn",
"frag",
"q",
"ensure_schema",
"ensure_context_schema",
"ensure_incidents_schema",
"migrate_incidents_to_dedicated_db",
"resolve_tenant_id",
]

20
app/db/backend.py Normal file
View file

@ -0,0 +1,20 @@
"""Backend detection — SQLITE (default) or POSTGRES based on DATABASE_URL."""
from __future__ import annotations
import os
from enum import Enum
class Backend(Enum):
    """Storage engines Turnstone can run against."""

    # Default engine, used whenever DATABASE_URL does not select Postgres.
    SQLITE = "sqlite"
    # Selected when DATABASE_URL starts with a recognised Postgres scheme.
    POSTGRES = "postgres"
def _detect() -> Backend:
    """Choose the backend from the DATABASE_URL environment variable.

    Returns Backend.POSTGRES when the value starts with one of the recognised
    Postgres URL schemes; an unset, empty, or differently-prefixed value
    yields Backend.SQLITE.
    """
    postgres_schemes = ("postgresql://", "postgres://", "postgresql+psycopg://")
    database_url = os.environ.get("DATABASE_URL", "")
    is_postgres = database_url.startswith(postgres_schemes)
    return Backend.POSTGRES if is_postgres else Backend.SQLITE


# Resolved once at import time; later changes to DATABASE_URL are not picked up.
BACKEND: Backend = _detect()

136
app/db/conn.py Normal file
View file

@ -0,0 +1,136 @@
"""Uniform connection wrapper over sqlite3 and psycopg3.
Usage:
with get_conn(db_path) as conn:
conn.execute("SELECT ...", (param,))
conn.commit()
For Postgres, db_path is ignored — all connections go through the shared pool.
The pool is initialized lazily on first use from DATABASE_URL.
"""
from __future__ import annotations
import logging
import os
import sqlite3
from contextlib import contextmanager
from pathlib import Path
from typing import Any, Generator
from app.db.backend import BACKEND, Backend
logger = logging.getLogger(__name__)
_pool: Any = None # psycopg_pool.ConnectionPool, typed as Any to avoid import-time errors
class _NopCursor:
    """Inert cursor stand-in handed back when a statement is skipped.

    DbConn returns one of these instead of running a PRAGMA on Postgres, so
    callers can still read ``rowcount``, fetch, or iterate without
    special-casing the skipped statement.
    """

    rowcount = 0

    def fetchone(self) -> None:
        """Report that there is no row to return."""
        return None

    def fetchall(self) -> list:
        """Return a fresh, empty result list."""
        return list()

    def __iter__(self):
        """Iterate over zero rows."""
        return iter(())
class DbConn:
    """Wraps a raw sqlite3 or psycopg connection with a uniform execute API.

    Callers write SQL with SQLite-style ``?`` placeholders. On Postgres the
    placeholders are rewritten to ``%s`` and PRAGMA statements are skipped
    (a _NopCursor is returned in their place).

    Row access is always dict-like (the row factory is configured by get_conn):
    - SQLite: conn.row_factory = sqlite3.Row (supports row["col"] and row[0])
    - Postgres: row_factory = dict_row (returns plain dicts)
    """

    __slots__ = ("_c", "_backend")

    def __init__(self, raw: Any, backend: Backend) -> None:
        self._c = raw
        self._backend = backend

    @staticmethod
    def _qmark_to_format(sql: str) -> str:
        """Rewrite ``?`` placeholders to ``%s``, leaving quoted text untouched.

        A ``?`` inside a single-quoted literal or a double-quoted identifier is
        data, not a placeholder, so it must not be rewritten. Doubled quotes
        (``''``) close and immediately reopen the quoted region, which keeps
        the tracking correct for escaped quotes.

        NOTE(review): literal ``%`` characters are intentionally not escaped,
        because SQL fragments elsewhere in the package already contain
        ready-made ``%s`` placeholders for Postgres.
        """
        if "?" not in sql:
            return sql
        if "'" not in sql and '"' not in sql:
            # Fast path: nothing quoted, so every ? is a placeholder.
            return sql.replace("?", "%s")
        pieces: list[str] = []
        open_quote: str | None = None
        for ch in sql:
            if open_quote is not None:
                if ch == open_quote:
                    open_quote = None
            elif ch in ("'", '"'):
                open_quote = ch
            elif ch == "?":
                pieces.append("%s")
                continue
            pieces.append(ch)
        return "".join(pieces)

    def _prep(self, sql: str) -> str | None:
        """Return None to skip (PRAGMA on Postgres), else return ready-to-execute SQL."""
        stripped = sql.strip()
        if self._backend != Backend.POSTGRES:
            return stripped
        if stripped.lower().startswith("pragma"):
            return None
        return self._qmark_to_format(stripped)

    def execute(self, sql: str, params: Any = ()) -> Any:
        """Run one statement and return the driver cursor (or a _NopCursor if skipped)."""
        prepared = self._prep(sql)
        if prepared is None:
            return _NopCursor()
        return self._c.execute(prepared, params)

    def executemany(self, sql: str, params_seq: Any) -> Any:
        """Run one statement once per parameter set and return the cursor used.

        sqlite3 connections offer executemany() directly. psycopg 3 connections
        only provide execute(); there the batch has to go through a cursor.
        """
        prepared = self._prep(sql)
        if prepared is None:
            return _NopCursor()
        conn_executemany = getattr(self._c, "executemany", None)
        if conn_executemany is not None:
            return conn_executemany(prepared, params_seq)
        cursor = self._c.cursor()
        cursor.executemany(prepared, params_seq)
        return cursor

    def commit(self) -> None:
        """Commit the current transaction on the wrapped connection."""
        self._c.commit()

    def close(self) -> None:
        """Close the wrapped connection."""
        self._c.close()

    def __enter__(self) -> "DbConn":
        return self

    def __exit__(self, *_: Any) -> None:
        # Closes unconditionally; nothing is committed or rolled back here.
        self.close()
def _get_pool() -> Any:
    """Return the shared Postgres connection pool, creating it on first use.

    The pool is built from DATABASE_URL with min_size=2 / max_size=10 and is
    opened immediately. A SQLAlchemy-style ``postgresql+psycopg://`` scheme
    (accepted by backend detection) is normalised to ``postgresql://``,
    because libpq only recognises the ``postgresql://`` and ``postgres://``
    URI designators.

    Raises:
        RuntimeError: if psycopg_pool is not installed or DATABASE_URL is unset.
    """
    global _pool
    if _pool is not None:
        return _pool

    # Each try wraps only the line it guards, so an unrelated KeyError or
    # ImportError raised while the pool is being constructed is not misreported
    # as a missing dependency or a missing environment variable.
    try:
        from psycopg_pool import ConnectionPool  # type: ignore[import]
    except ImportError as exc:
        raise RuntimeError(
            "psycopg[binary,pool] is required for Postgres backend. "
            "Run: pip install 'psycopg[binary,pool]'"
        ) from exc

    try:
        url = os.environ["DATABASE_URL"]
    except KeyError:
        raise RuntimeError("DATABASE_URL must be set when using Postgres backend") from None

    sqlalchemy_scheme = "postgresql+psycopg://"
    if url.startswith(sqlalchemy_scheme):
        url = "postgresql://" + url[len(sqlalchemy_scheme):]

    _pool = ConnectionPool(url, min_size=2, max_size=10, open=True)
    logger.info("Postgres connection pool opened (DATABASE_URL set)")
    return _pool
@contextmanager
def get_conn(db_path: Path | None = None) -> Generator[DbConn, None, None]:
    """Yield a DbConn for the active backend.

    SQLite: opens a new connection to ``db_path`` (required) with a 30 s busy
    timeout, sqlite3.Row rows, WAL journaling and foreign keys enabled, and
    closes it when the ``with`` block exits.
    Postgres: checks a connection out of the shared pool (``db_path`` is
    ignored) and configures it to return dict rows.

    Raises:
        ValueError: on the SQLite backend when ``db_path`` is None.
    """
    if BACKEND != Backend.POSTGRES:
        if db_path is None:
            raise ValueError("db_path is required for SQLite backend")
        sqlite_raw = sqlite3.connect(str(db_path), timeout=30.0)
        sqlite_raw.row_factory = sqlite3.Row
        try:
            for pragma in ("PRAGMA journal_mode=WAL", "PRAGMA foreign_keys=ON"):
                sqlite_raw.execute(pragma)
            yield DbConn(sqlite_raw, BACKEND)
        finally:
            sqlite_raw.close()
        return

    pool = _get_pool()
    from psycopg.rows import dict_row  # type: ignore[import]

    with pool.connection() as pg_raw:
        pg_raw.row_factory = dict_row
        yield DbConn(pg_raw, BACKEND)
def close_pool() -> None:
    """Shut down the Postgres connection pool; intended for application shutdown.

    Does nothing when no pool has been created. After closing, the module-level
    reference is cleared so a later _get_pool() call builds a new pool.
    """
    global _pool
    if _pool is None:
        return
    _pool.close()
    _pool = None
    logger.info("Postgres connection pool closed")

93
app/db/dialect.py Normal file
View file

@ -0,0 +1,93 @@
"""Per-backend SQL fragments and placeholder rewriting.
All production SQL should be written with SQLite-style `?` placeholders.
Call q(sql) before passing to execute/executemany — it rewrites to %s for
Postgres and leaves SQLite queries untouched.
"""
from __future__ import annotations
from app.db.backend import BACKEND, Backend
def q(sql: str) -> str:
    """Translate ``?`` placeholders to ``%s`` on Postgres; return SQLite SQL unchanged.

    The rewrite is a plain textual replace of every ``?`` in ``sql``.
    """
    return sql.replace("?", "%s") if BACKEND == Backend.POSTGRES else sql
class _Fragments:
    """SQL fragments that differ between backends.

    Every member inspects the module-level BACKEND and returns a string.
    Postgres variants that carry placeholders use ``%s`` directly; SQLite
    variants use ``?``.
    """

    @property
    def insert_or_ignore(self) -> str:
        # Postgres has no INSERT OR IGNORE; callers get a plain INSERT there.
        return "INSERT" if BACKEND == Backend.POSTGRES else "INSERT OR IGNORE"

    @property
    def on_conflict_ignore(self) -> str:
        # Always returns an empty string on both backends.
        # A real Postgres conflict clause needs the conflict column(s), e.g.
        # for log_entries: ON CONFLICT (tenant_id, id) DO NOTHING
        # For log_entries use insert_ignore_entries() together with
        # entries_conflict_clause() instead of this property.
        return ""

    def insert_ignore_entries(self) -> str:
        """Leading ``INSERT ... INTO log_entries`` keywords for an ignore-duplicates insert.

        Only the statement prefix is returned. On Postgres the duplicate
        handling comes from entries_conflict_clause(), which goes at the end
        of the statement.
        """
        if BACKEND == Backend.POSTGRES:
            return "INSERT INTO log_entries"
        return "INSERT OR IGNORE INTO log_entries"

    def entries_conflict_clause(self) -> str:
        """Trailing conflict clause for log_entries inserts; empty on SQLite."""
        if BACKEND == Backend.POSTGRES:
            return "ON CONFLICT (tenant_id, id) DO NOTHING"
        return ""

    def fingerprint_upsert(self) -> str:
        """Complete upsert statement for glean_fingerprints.

        Takes five parameters in order: tenant_id, path, mtime, size,
        gleaned_at. Postgres updates the existing (tenant_id, path) row;
        SQLite uses INSERT OR REPLACE.
        """
        if BACKEND == Backend.POSTGRES:
            return (
                "INSERT INTO glean_fingerprints (tenant_id, path, mtime, size, gleaned_at)"
                " VALUES (%s, %s, %s, %s, %s)"
                " ON CONFLICT (tenant_id, path)"
                " DO UPDATE SET mtime=EXCLUDED.mtime, size=EXCLUDED.size, gleaned_at=EXCLUDED.gleaned_at"
            )
        return (
            "INSERT OR REPLACE INTO glean_fingerprints (tenant_id, path, mtime, size, gleaned_at)"
            " VALUES (?,?,?,?,?)"
        )

    def source_group_expr(self, col: str = "source_id") -> str:
        """SQL expression that collapses prefix:host:unit → prefix:host stem.

        Values with fewer than two colons are returned unchanged. ``col`` is
        interpolated into the SQL text as-is, so it must be a trusted column
        name, never user input.
        """
        if BACKEND == Backend.POSTGRES:
            return f"""
CASE
WHEN array_length(string_to_array({col}, ':'), 1) >= 3
THEN split_part({col}, ':', 1) || ':' || split_part({col}, ':', 2)
ELSE {col}
END
"""
        return f"""
CASE
WHEN INSTR(SUBSTR({col}, INSTR({col}, ':')+1), ':') > 0
THEN SUBSTR({col}, 1,
INSTR({col}, ':')
+ INSTR(SUBSTR({col}, INSTR({col}, ':')+1), ':')
- 1)
ELSE {col}
END
"""

    def fts_match_clause(self) -> str:
        """WHERE clause fragment for FTS query. Caller supplies the query param."""
        if BACKEND == Backend.POSTGRES:
            return "text_tsv @@ websearch_to_tsquery('english', %s)"
        return "log_fts MATCH ?"

    def fts_rank_expr(self) -> str:
        """ORDER BY expression for FTS rank (best match first). Postgres needs the query twice."""
        if BACKEND == Backend.POSTGRES:
            # ts_rank returns 0..1 where higher is better; pass the query again as param
            return "ts_rank(text_tsv, websearch_to_tsquery('english', %s)) DESC"
        # FTS5 rank is negative BM25; ASC = most-negative = best match
        return "rank ASC"


# Shared instance; the class holds no state, so one object serves all callers.
frag = _Fragments()

454
app/db/schema.py Normal file
View file

@ -0,0 +1,454 @@
"""Schema creation and idempotent migrations for all Turnstone databases.
Three logical databases (main, context, incidents) map to:
- SQLite: three separate .db files (avoids write-lock contention)
- Postgres: three table-groups in one physical DB (row-level locking makes separation unnecessary)
All ensure_* functions are idempotent: safe to call on every startup.
"""
from __future__ import annotations
import logging
import sqlite3
from pathlib import Path
from app.db.backend import BACKEND, Backend
from app.db.conn import get_conn
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# SQLite DDL — kept as executescript strings (SQLite only)
# ---------------------------------------------------------------------------
# SQLite DDL for the main database, applied in one go with executescript().
# Tables: log_entries, glean_fingerprints, incidents, received_bundles,
# sent_bundles, blocklist_candidates. Every table carries a tenant_id column
# defaulting to ''. Every statement uses IF NOT EXISTS, so re-running the
# script leaves existing tables and indexes as they are.
_MAIN_SCHEMA_SQLITE = """
CREATE TABLE IF NOT EXISTS log_entries (
id TEXT NOT NULL,
tenant_id TEXT NOT NULL DEFAULT '',
source_id TEXT NOT NULL,
sequence INTEGER NOT NULL,
timestamp_raw TEXT,
timestamp_iso TEXT,
ingest_time TEXT NOT NULL,
severity TEXT,
repeat_count INTEGER DEFAULT 1,
out_of_order INTEGER DEFAULT 0,
matched_patterns TEXT DEFAULT '[]',
text TEXT NOT NULL,
PRIMARY KEY (tenant_id, id)
);
CREATE INDEX IF NOT EXISTS idx_source ON log_entries(source_id);
CREATE INDEX IF NOT EXISTS idx_tenant_src ON log_entries(tenant_id, source_id);
CREATE INDEX IF NOT EXISTS idx_timestamp ON log_entries(timestamp_iso);
CREATE INDEX IF NOT EXISTS idx_ts_repeat ON log_entries(timestamp_iso, repeat_count);
CREATE INDEX IF NOT EXISTS idx_severity ON log_entries(tenant_id, severity);
CREATE INDEX IF NOT EXISTS idx_patterns ON log_entries(matched_patterns);
CREATE TABLE IF NOT EXISTS glean_fingerprints (
tenant_id TEXT NOT NULL DEFAULT '',
path TEXT NOT NULL,
mtime REAL NOT NULL,
size INTEGER NOT NULL,
gleaned_at TEXT NOT NULL,
PRIMARY KEY (tenant_id, path)
);
CREATE TABLE IF NOT EXISTS incidents (
id TEXT PRIMARY KEY,
tenant_id TEXT NOT NULL DEFAULT '',
label TEXT NOT NULL,
issue_type TEXT NOT NULL DEFAULT '',
started_at TEXT,
ended_at TEXT,
notes TEXT NOT NULL DEFAULT '',
created_at TEXT NOT NULL,
severity TEXT NOT NULL DEFAULT 'medium'
);
CREATE INDEX IF NOT EXISTS idx_incidents_time ON incidents(started_at, ended_at);
CREATE INDEX IF NOT EXISTS idx_incidents_tenant ON incidents(tenant_id);
CREATE TABLE IF NOT EXISTS received_bundles (
id TEXT PRIMARY KEY,
tenant_id TEXT NOT NULL DEFAULT '',
source_host TEXT NOT NULL,
issue_type TEXT NOT NULL DEFAULT '',
label TEXT NOT NULL,
severity TEXT NOT NULL DEFAULT 'medium',
started_at TEXT,
bundled_at TEXT NOT NULL,
entry_count INTEGER NOT NULL DEFAULT 0,
bundle_json TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_bundles_bundled ON received_bundles(bundled_at);
CREATE INDEX IF NOT EXISTS idx_bundles_type ON received_bundles(issue_type);
CREATE TABLE IF NOT EXISTS sent_bundles (
id TEXT PRIMARY KEY,
tenant_id TEXT NOT NULL DEFAULT '',
incident_id TEXT NOT NULL,
exported_at TEXT NOT NULL,
sanitized INTEGER NOT NULL DEFAULT 0,
entry_count INTEGER NOT NULL DEFAULT 0,
bundle_json TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_sent_bundles_incident ON sent_bundles(incident_id);
CREATE INDEX IF NOT EXISTS idx_sent_bundles_time ON sent_bundles(exported_at);
CREATE TABLE IF NOT EXISTS blocklist_candidates (
id TEXT PRIMARY KEY,
tenant_id TEXT NOT NULL DEFAULT '',
domain_or_ip TEXT NOT NULL,
source_device_ip TEXT,
source_device_name TEXT,
first_seen TEXT NOT NULL,
last_seen TEXT NOT NULL,
hit_count INTEGER DEFAULT 1,
status TEXT DEFAULT 'pending',
pushed_at TEXT,
log_evidence TEXT DEFAULT '[]',
matched_rule TEXT,
llm_score REAL,
llm_reason TEXT
);
CREATE INDEX IF NOT EXISTS idx_blocklist_device ON blocklist_candidates(source_device_ip);
CREATE INDEX IF NOT EXISTS idx_blocklist_status ON blocklist_candidates(status);
CREATE INDEX IF NOT EXISTS idx_blocklist_domain ON blocklist_candidates(domain_or_ip);
CREATE INDEX IF NOT EXISTS idx_blocklist_tenant ON blocklist_candidates(tenant_id);
"""
# SQLite DDL for the context knowledge-base database, applied with
# executescript(). Tables: context_facts, context_documents, context_chunks.
# context_chunks rows reference context_documents(id) with ON DELETE CASCADE,
# which SQLite only enforces on connections that enable PRAGMA foreign_keys.
_CONTEXT_SCHEMA_SQLITE = """
CREATE TABLE IF NOT EXISTS context_facts (
id TEXT PRIMARY KEY,
tenant_id TEXT NOT NULL DEFAULT '',
category TEXT NOT NULL,
key TEXT NOT NULL,
value TEXT NOT NULL,
source TEXT,
created_at TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_facts_category ON context_facts(category);
CREATE INDEX IF NOT EXISTS idx_facts_key ON context_facts(key);
CREATE INDEX IF NOT EXISTS idx_facts_tenant ON context_facts(tenant_id);
CREATE TABLE IF NOT EXISTS context_documents (
id TEXT PRIMARY KEY,
tenant_id TEXT NOT NULL DEFAULT '',
filename TEXT NOT NULL,
doc_type TEXT NOT NULL,
full_text TEXT NOT NULL,
file_size INTEGER,
uploaded_at TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_docs_tenant ON context_documents(tenant_id);
CREATE TABLE IF NOT EXISTS context_chunks (
id TEXT PRIMARY KEY,
tenant_id TEXT NOT NULL DEFAULT '',
document_id TEXT NOT NULL REFERENCES context_documents(id) ON DELETE CASCADE,
chunk_index INTEGER NOT NULL,
text TEXT NOT NULL,
embedding BLOB
);
CREATE INDEX IF NOT EXISTS idx_chunks_doc ON context_chunks(document_id);
CREATE INDEX IF NOT EXISTS idx_chunks_tenant ON context_chunks(tenant_id);
"""
# ---------------------------------------------------------------------------
# Postgres DDL — executed statement-by-statement
# ---------------------------------------------------------------------------
# Postgres DDL for the main table group, one statement per list element.
# Compared with the SQLite script, log_entries gains a text_tsv tsvector
# column with a GIN index, kept in sync by the trig_log_entries_tsv trigger
# (created only if pg_trigger has no trigger of that name).
# The idx_source and idx_ts_repeat indexes from the SQLite script are not
# created here.
_MAIN_SCHEMA_PG_STMTS = [
"""
CREATE TABLE IF NOT EXISTS log_entries (
id TEXT NOT NULL,
tenant_id TEXT NOT NULL DEFAULT '',
source_id TEXT NOT NULL,
sequence INTEGER NOT NULL,
timestamp_raw TEXT,
timestamp_iso TEXT,
ingest_time TEXT NOT NULL,
severity TEXT,
repeat_count INTEGER DEFAULT 1,
out_of_order INTEGER DEFAULT 0,
matched_patterns TEXT DEFAULT '[]',
text TEXT NOT NULL,
text_tsv tsvector,
PRIMARY KEY (tenant_id, id)
)
""",
"CREATE INDEX IF NOT EXISTS idx_tenant_src ON log_entries(tenant_id, source_id)",
"CREATE INDEX IF NOT EXISTS idx_timestamp ON log_entries(timestamp_iso)",
"CREATE INDEX IF NOT EXISTS idx_severity ON log_entries(tenant_id, severity)",
"CREATE INDEX IF NOT EXISTS idx_patterns ON log_entries(matched_patterns)",
"CREATE INDEX IF NOT EXISTS idx_fts_gin ON log_entries USING GIN(text_tsv)",
"""
CREATE OR REPLACE FUNCTION _ts_update_text_tsv() RETURNS trigger AS $$
BEGIN
NEW.text_tsv := to_tsvector('english', COALESCE(NEW.text, ''));
RETURN NEW;
END;
$$ LANGUAGE plpgsql
""",
"""
DO $$ BEGIN
IF NOT EXISTS (
SELECT 1 FROM pg_trigger WHERE tgname = 'trig_log_entries_tsv'
) THEN
CREATE TRIGGER trig_log_entries_tsv
BEFORE INSERT OR UPDATE OF text ON log_entries
FOR EACH ROW EXECUTE FUNCTION _ts_update_text_tsv();
END IF;
END $$
""",
"""
CREATE TABLE IF NOT EXISTS glean_fingerprints (
tenant_id TEXT NOT NULL DEFAULT '',
path TEXT NOT NULL,
mtime DOUBLE PRECISION NOT NULL,
size BIGINT NOT NULL,
gleaned_at TEXT NOT NULL,
PRIMARY KEY (tenant_id, path)
)
""",
"""
CREATE TABLE IF NOT EXISTS incidents (
id TEXT PRIMARY KEY,
tenant_id TEXT NOT NULL DEFAULT '',
label TEXT NOT NULL,
issue_type TEXT NOT NULL DEFAULT '',
started_at TEXT,
ended_at TEXT,
notes TEXT NOT NULL DEFAULT '',
created_at TEXT NOT NULL,
severity TEXT NOT NULL DEFAULT 'medium'
)
""",
"CREATE INDEX IF NOT EXISTS idx_incidents_time ON incidents(started_at, ended_at)",
"CREATE INDEX IF NOT EXISTS idx_incidents_tenant ON incidents(tenant_id)",
"""
CREATE TABLE IF NOT EXISTS received_bundles (
id TEXT PRIMARY KEY,
tenant_id TEXT NOT NULL DEFAULT '',
source_host TEXT NOT NULL,
issue_type TEXT NOT NULL DEFAULT '',
label TEXT NOT NULL,
severity TEXT NOT NULL DEFAULT 'medium',
started_at TEXT,
bundled_at TEXT NOT NULL,
entry_count INTEGER NOT NULL DEFAULT 0,
bundle_json TEXT NOT NULL
)
""",
"CREATE INDEX IF NOT EXISTS idx_bundles_bundled ON received_bundles(bundled_at)",
"CREATE INDEX IF NOT EXISTS idx_bundles_type ON received_bundles(issue_type)",
"""
CREATE TABLE IF NOT EXISTS sent_bundles (
id TEXT PRIMARY KEY,
tenant_id TEXT NOT NULL DEFAULT '',
incident_id TEXT NOT NULL,
exported_at TEXT NOT NULL,
sanitized INTEGER NOT NULL DEFAULT 0,
entry_count INTEGER NOT NULL DEFAULT 0,
bundle_json TEXT NOT NULL
)
""",
"CREATE INDEX IF NOT EXISTS idx_sent_bundles_incident ON sent_bundles(incident_id)",
"CREATE INDEX IF NOT EXISTS idx_sent_bundles_time ON sent_bundles(exported_at)",
"""
CREATE TABLE IF NOT EXISTS blocklist_candidates (
id TEXT PRIMARY KEY,
tenant_id TEXT NOT NULL DEFAULT '',
domain_or_ip TEXT NOT NULL,
source_device_ip TEXT,
source_device_name TEXT,
first_seen TEXT NOT NULL,
last_seen TEXT NOT NULL,
hit_count INTEGER DEFAULT 1,
status TEXT DEFAULT 'pending',
pushed_at TEXT,
log_evidence TEXT DEFAULT '[]',
matched_rule TEXT,
llm_score DOUBLE PRECISION,
llm_reason TEXT
)
""",
"CREATE INDEX IF NOT EXISTS idx_blocklist_device ON blocklist_candidates(source_device_ip)",
"CREATE INDEX IF NOT EXISTS idx_blocklist_status ON blocklist_candidates(status)",
"CREATE INDEX IF NOT EXISTS idx_blocklist_domain ON blocklist_candidates(domain_or_ip)",
"CREATE INDEX IF NOT EXISTS idx_blocklist_tenant ON blocklist_candidates(tenant_id)",
]
# Postgres DDL for the context knowledge-base tables, one statement per list
# element. Mirrors the SQLite context script, with file_size widened to BIGINT
# and embeddings stored as BYTEA.
_CONTEXT_SCHEMA_PG_STMTS = [
"""
CREATE TABLE IF NOT EXISTS context_facts (
id TEXT PRIMARY KEY,
tenant_id TEXT NOT NULL DEFAULT '',
category TEXT NOT NULL,
key TEXT NOT NULL,
value TEXT NOT NULL,
source TEXT,
created_at TEXT NOT NULL
)
""",
"CREATE INDEX IF NOT EXISTS idx_facts_category ON context_facts(category)",
"CREATE INDEX IF NOT EXISTS idx_facts_key ON context_facts(key)",
"CREATE INDEX IF NOT EXISTS idx_facts_tenant ON context_facts(tenant_id)",
"""
CREATE TABLE IF NOT EXISTS context_documents (
id TEXT PRIMARY KEY,
tenant_id TEXT NOT NULL DEFAULT '',
filename TEXT NOT NULL,
doc_type TEXT NOT NULL,
full_text TEXT NOT NULL,
file_size BIGINT,
uploaded_at TEXT NOT NULL
)
""",
"CREATE INDEX IF NOT EXISTS idx_docs_tenant ON context_documents(tenant_id)",
"""
CREATE TABLE IF NOT EXISTS context_chunks (
id TEXT PRIMARY KEY,
tenant_id TEXT NOT NULL DEFAULT '',
document_id TEXT NOT NULL REFERENCES context_documents(id) ON DELETE CASCADE,
chunk_index INTEGER NOT NULL,
text TEXT NOT NULL,
embedding BYTEA
)
""",
"CREATE INDEX IF NOT EXISTS idx_chunks_doc ON context_chunks(document_id)",
"CREATE INDEX IF NOT EXISTS idx_chunks_tenant ON context_chunks(tenant_id)",
]
# ---------------------------------------------------------------------------
# SQLite additive column migrations — applied after CREATE TABLE on every boot
# ---------------------------------------------------------------------------
# Additive column migrations for databases created before these columns
# existed. Each statement is attempted independently by
# _run_sqlite_migrations, so a statement whose column is already present
# (or whose table does not exist yet) does not stop the ones after it.
_MAIN_MIGRATIONS_SQLITE = [
"ALTER TABLE log_entries ADD COLUMN tenant_id TEXT NOT NULL DEFAULT ''",
"ALTER TABLE incidents ADD COLUMN issue_type TEXT NOT NULL DEFAULT ''",
"ALTER TABLE incidents ADD COLUMN tenant_id TEXT NOT NULL DEFAULT ''",
"ALTER TABLE received_bundles ADD COLUMN tenant_id TEXT NOT NULL DEFAULT ''",
"ALTER TABLE sent_bundles ADD COLUMN tenant_id TEXT NOT NULL DEFAULT ''",
"ALTER TABLE blocklist_candidates ADD COLUMN tenant_id TEXT NOT NULL DEFAULT ''",
"ALTER TABLE glean_fingerprints ADD COLUMN tenant_id TEXT NOT NULL DEFAULT ''",
"ALTER TABLE glean_fingerprints ADD COLUMN mtime REAL",
"ALTER TABLE glean_fingerprints ADD COLUMN size INTEGER",
"ALTER TABLE glean_fingerprints ADD COLUMN gleaned_at TEXT",
]
# Same idea for the context database: add tenant_id to each context table.
_CONTEXT_MIGRATIONS_SQLITE = [
"ALTER TABLE context_facts ADD COLUMN tenant_id TEXT NOT NULL DEFAULT ''",
"ALTER TABLE context_documents ADD COLUMN tenant_id TEXT NOT NULL DEFAULT ''",
"ALTER TABLE context_chunks ADD COLUMN tenant_id TEXT NOT NULL DEFAULT ''",
]
def _run_sqlite_migrations(conn: sqlite3.Connection, stmts: list[str]) -> None:
    """Apply additive ALTER TABLE migrations, tolerating already-applied ones.

    Two failures are expected on a routine startup and are ignored:
    the column already exists ("duplicate column name") or the table has not
    been created yet ("no such table"). Anything else (a locked database, a
    malformed statement, an I/O error) is re-raised so it is not mistaken for
    a completed migration.

    Args:
        conn: open SQLite connection; the caller is responsible for committing.
        stmts: ALTER TABLE statements, each executed independently and in order.

    Raises:
        sqlite3.OperationalError: for any failure other than the two above.
    """
    expected_markers = ("duplicate column name", "no such table")
    for stmt in stmts:
        try:
            conn.execute(stmt)
        except sqlite3.OperationalError as exc:
            message = str(exc).lower()
            if not any(marker in message for marker in expected_markers):
                raise
def _run_pg_stmts(stmts: list[str]) -> None:
    """Execute Postgres DDL statements — each in its own transaction for IF NOT EXISTS safety.

    Opens a dedicated autocommit connection from DATABASE_URL (not the shared
    pool) and runs every non-blank statement in order. A SQLAlchemy-style
    ``postgresql+psycopg://`` scheme, which backend detection accepts, is
    normalised to ``postgresql://`` because libpq only recognises the
    ``postgresql://`` and ``postgres://`` URI designators.

    Raises:
        KeyError: if DATABASE_URL is not set.
    """
    from psycopg import connect as pg_connect  # type: ignore[import]
    import os

    url = os.environ["DATABASE_URL"]
    sqlalchemy_scheme = "postgresql+psycopg://"
    if url.startswith(sqlalchemy_scheme):
        url = "postgresql://" + url[len(sqlalchemy_scheme):]
    with pg_connect(url, autocommit=True) as conn:
        for stmt in stmts:
            stripped = stmt.strip()
            if stripped:
                conn.execute(stripped)
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
def ensure_schema(db_path: Path) -> None:
    """Ensure main log/incidents/blocklist tables exist. Idempotent.

    Postgres: runs the main DDL statement list; ``db_path`` is unused.
    SQLite: opens ``db_path``, applies the additive column migrations, commits,
    then runs the main schema script. The connection is closed even when a
    step fails.
    """
    if BACKEND == Backend.POSTGRES:
        _run_pg_stmts(_MAIN_SCHEMA_PG_STMTS)
        logger.debug("Postgres main schema verified")
        return
    conn = sqlite3.connect(str(db_path), timeout=30.0)
    try:
        conn.execute("PRAGMA journal_mode=WAL")
        # Migrations first: add tenant_id to existing tables BEFORE index creation touches it
        _run_sqlite_migrations(conn, _MAIN_MIGRATIONS_SQLITE)
        conn.commit()
        conn.executescript(_MAIN_SCHEMA_SQLITE)
    finally:
        # Previously a failing migration or script left the handle open.
        conn.close()
    logger.debug("SQLite main schema verified at %s", db_path)
def ensure_context_schema(db_path: Path) -> None:
    """Ensure context KB tables exist. Idempotent.

    Postgres: runs the context DDL statement list; ``db_path`` is unused.
    SQLite: opens ``db_path``, applies the additive column migrations, commits,
    then runs the context schema script. The connection is closed even when a
    step fails.
    """
    if BACKEND == Backend.POSTGRES:
        _run_pg_stmts(_CONTEXT_SCHEMA_PG_STMTS)
        logger.debug("Postgres context schema verified")
        return
    conn = sqlite3.connect(str(db_path), timeout=30.0)
    try:
        conn.execute("PRAGMA journal_mode=WAL")
        conn.execute("PRAGMA foreign_keys=ON")
        # Migrations run before the script so tenant_id exists on older
        # tables by the time the tenant indexes are created.
        _run_sqlite_migrations(conn, _CONTEXT_MIGRATIONS_SQLITE)
        conn.commit()
        conn.executescript(_CONTEXT_SCHEMA_SQLITE)
    finally:
        # Previously a failing migration or script left the handle open.
        conn.close()
    logger.debug("SQLite context schema verified at %s", db_path)
def migrate_incidents_to_dedicated_db(main_db: Path, incidents_db: Path) -> int:
    """One-shot migration: copy incidents/bundles rows from main DB to incidents DB.

    Safe to call on every startup — rows already in incidents_db are skipped
    (INSERT OR IGNORE). No-op for Postgres (single DB, no migration needed).

    Tables missing from the main DB are skipped. Both connections are closed
    even if a copy fails part-way.

    Returns:
        The number of source rows processed. Rows that were skipped because
        they already exist in incidents_db are still counted, so a repeat run
        reports the same total. Always 0 on Postgres.
    """
    if BACKEND == Backend.POSTGRES:
        return 0
    src = sqlite3.connect(str(main_db), timeout=30.0)
    try:
        src.row_factory = sqlite3.Row
        dst = sqlite3.connect(str(incidents_db), timeout=30.0)
        try:
            migrated = 0
            for table in ("incidents", "received_bundles", "sent_bundles"):
                try:
                    rows = src.execute(f"SELECT * FROM {table}").fetchall()  # noqa: S608
                except sqlite3.OperationalError:
                    # Table absent from the main DB: nothing to copy.
                    continue
                if not rows:
                    continue
                columns = rows[0].keys()
                cols = ", ".join(columns)
                placeholders = ", ".join("?" * len(columns))
                dst.executemany(
                    f"INSERT OR IGNORE INTO {table} ({cols}) VALUES ({placeholders})",  # noqa: S608
                    [tuple(r) for r in rows],
                )
                migrated += len(rows)
            dst.commit()
            return migrated
        finally:
            dst.close()
    finally:
        src.close()
def ensure_incidents_schema(db_path: Path) -> None:
    """Ensure incidents/bundles tables exist. Idempotent.

    For Postgres, incidents live in the same DB as log_entries (already created by
    ensure_schema), so this is a no-op — the tables were created there.

    For SQLite this applies the main migrations and the full main schema
    script to ``db_path``, so the incidents database receives every main-schema
    table, not only the incident and bundle ones. The connection is closed
    even when a step fails.
    """
    if BACKEND == Backend.POSTGRES:
        return
    conn = sqlite3.connect(str(db_path), timeout=30.0)
    try:
        conn.execute("PRAGMA journal_mode=WAL")
        # Migrations run before the script so tenant_id exists on older
        # tables by the time the tenant indexes are created.
        _run_sqlite_migrations(conn, _MAIN_MIGRATIONS_SQLITE)
        conn.commit()
        conn.executescript(_MAIN_SCHEMA_SQLITE)
    finally:
        # Previously a failing migration or script left the handle open.
        conn.close()
    logger.debug("SQLite incidents schema verified at %s", db_path)

12
app/db/tenant.py Normal file
View file

@ -0,0 +1,12 @@
"""Tenant ID resolution — TURNSTONE_TENANT_ID env var, hostname fallback."""
from __future__ import annotations
import os
import socket
from functools import lru_cache
@lru_cache(maxsize=1)
def resolve_tenant_id() -> str:
    """Return the tenant ID for this node.

    Uses the TURNSTONE_TENANT_ID environment variable when it is set to a
    non-empty value, otherwise the machine's hostname. The first result is
    cached, so later environment changes are ignored until the cache is cleared.
    """
    configured = os.environ.get("TURNSTONE_TENANT_ID")
    if configured:
        return configured
    return socket.gethostname()

View file

@ -1,18 +1,19 @@
"""Upload adapter: processes file bytes and writes to context store — MIT licensed.""" """Upload adapter: processes file bytes and writes to context store — MIT licensed."""
from __future__ import annotations from __future__ import annotations
import sqlite3
import uuid import uuid
from pathlib import Path from pathlib import Path
from typing import Any from typing import Any
from app.context.chunker import process_upload from app.context.chunker import process_upload
from app.context.store import add_document, add_fact from app.context.store import add_document, add_fact
from app.db import get_conn, resolve_tenant_id
def glean_upload(db_path: Path, filename: str, content: bytes) -> dict[str, Any]: def glean_upload(db_path: Path, filename: str, content: bytes) -> dict[str, Any]:
"""Process an uploaded file and write to context store. Returns result summary.""" """Process an uploaded file and write to context store. Returns result summary."""
doc_type, facts, chunks = process_upload(filename, content) doc_type, facts, chunks = process_upload(filename, content)
tid = resolve_tenant_id()
doc = add_document( doc = add_document(
db_path, db_path,
@ -25,15 +26,13 @@ def glean_upload(db_path: Path, filename: str, content: bytes) -> dict[str, Any]
for fact in facts: for fact in facts:
add_fact(db_path, fact.category, fact.key, fact.value, source="upload") add_fact(db_path, fact.category, fact.key, fact.value, source="upload")
conn = sqlite3.connect(str(db_path), timeout=30.0) with get_conn(db_path) as conn:
conn.execute("PRAGMA journal_mode=WAL") for i, chunk_text in enumerate(chunks):
for i, chunk_text in enumerate(chunks): conn.execute(
conn.execute( "INSERT INTO context_chunks(id, tenant_id, document_id, chunk_index, text) VALUES (?,?,?,?,?)",
"INSERT INTO context_chunks(id, document_id, chunk_index, text) VALUES (?,?,?,?)", (str(uuid.uuid4()), tid, doc.id, i, chunk_text),
(str(uuid.uuid4()), doc.id, i, chunk_text), )
) conn.commit()
conn.commit()
conn.close()
return { return {
"document_id": doc.id, "document_id": doc.id,

View file

@ -1,12 +1,24 @@
"""Glean pipeline: auto-detect format, parse, write to SQLite.""" """Glean pipeline: auto-detect format, parse, write to SQLite or Postgres."""
from __future__ import annotations from __future__ import annotations
import json import json
import logging import logging
import re import re
import sqlite3 import sqlite3 # still used in migrate_incidents_to_dedicated_db (SQLite-only migration)
from pathlib import Path from pathlib import Path
from typing import Iterator from typing import Any, Iterator
from app.db import (
frag,
get_conn,
resolve_tenant_id,
)
from app.db.schema import (
ensure_context_schema,
ensure_incidents_schema,
ensure_schema,
migrate_incidents_to_dedicated_db,
)
import yaml import yaml
@ -169,127 +181,13 @@ CREATE INDEX IF NOT EXISTS idx_chunks_doc ON context_chunks(document_id);
""" """
def ensure_schema(db_path: Path) -> None: # ensure_schema / ensure_context_schema / ensure_incidents_schema / migrate_incidents_to_dedicated_db
"""Create all tables and apply additive migrations. Safe to call on every startup.""" # are now implemented in app/db/schema.py and re-exported via app/db/__init__.py.
conn = sqlite3.connect(str(db_path), timeout=30.0) # The imports at the top of this file bring them in; these names are kept as module-level
conn.execute("PRAGMA journal_mode=WAL") # symbols so existing callers (rest.py, tests) still find them here without changes.
conn.executescript(_SCHEMA)
# Additive column migrations — ALTER TABLE silently skips if column exists
for stmt in [
"ALTER TABLE incidents ADD COLUMN issue_type TEXT NOT NULL DEFAULT ''",
]:
try:
conn.execute(stmt)
except sqlite3.OperationalError:
pass
conn.commit()
conn.close()
def ensure_context_schema(db_path: Path) -> None: # _INCIDENTS_SCHEMA and its ensure_/migrate_ functions moved to app/db/schema.py
"""Create context KB tables in a dedicated database file.
Using a separate file from the main log DB means context fact writes never
contend with the high-throughput glean scheduler, which can hold the main
DB write lock for seconds at a time when flushing large journal batches.
"""
conn = sqlite3.connect(str(db_path), timeout=30.0)
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA foreign_keys=ON")
conn.executescript(_CONTEXT_SCHEMA)
conn.commit()
conn.close()
_INCIDENTS_SCHEMA = """
CREATE TABLE IF NOT EXISTS incidents (
id TEXT PRIMARY KEY,
label TEXT NOT NULL,
issue_type TEXT NOT NULL DEFAULT '',
started_at TEXT,
ended_at TEXT,
notes TEXT NOT NULL DEFAULT '',
created_at TEXT NOT NULL,
severity TEXT NOT NULL DEFAULT 'medium'
);
CREATE INDEX IF NOT EXISTS idx_incidents_time ON incidents(started_at, ended_at);
CREATE TABLE IF NOT EXISTS received_bundles (
id TEXT PRIMARY KEY,
source_host TEXT NOT NULL,
issue_type TEXT NOT NULL DEFAULT '',
label TEXT NOT NULL,
severity TEXT NOT NULL DEFAULT 'medium',
started_at TEXT,
bundled_at TEXT NOT NULL,
entry_count INTEGER NOT NULL DEFAULT 0,
bundle_json TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_bundles_bundled ON received_bundles(bundled_at);
CREATE INDEX IF NOT EXISTS idx_bundles_type ON received_bundles(issue_type);
CREATE TABLE IF NOT EXISTS sent_bundles (
id TEXT PRIMARY KEY,
incident_id TEXT NOT NULL,
exported_at TEXT NOT NULL,
sanitized INTEGER NOT NULL DEFAULT 0,
entry_count INTEGER NOT NULL DEFAULT 0,
bundle_json TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_sent_bundles_incident ON sent_bundles(incident_id);
CREATE INDEX IF NOT EXISTS idx_sent_bundles_time ON sent_bundles(exported_at);
"""
def ensure_incidents_schema(db_path: Path) -> None:
"""Create incidents tables in a dedicated database file.
Using a separate file from the main log DB means incident writes never
contend with the FTS5 bulk-insert write lock held by the glean scheduler.
Mirrors the context_facts split (CONTEXT_DB_PATH / turnstone-context.db).
"""
conn = sqlite3.connect(str(db_path), timeout=30.0)
conn.execute("PRAGMA journal_mode=WAL")
conn.executescript(_INCIDENTS_SCHEMA)
for stmt in [
"ALTER TABLE incidents ADD COLUMN issue_type TEXT NOT NULL DEFAULT ''",
]:
try:
conn.execute(stmt)
except sqlite3.OperationalError:
pass
conn.commit()
conn.close()
def migrate_incidents_to_dedicated_db(main_db: Path, incidents_db: Path) -> int:
"""One-shot migration: copy incidents/bundles rows from main DB to incidents DB.
Safe to call on every startup rows already present in incidents_db are
skipped via INSERT OR IGNORE. Returns the count of rows migrated.
"""
src = sqlite3.connect(str(main_db), timeout=30.0)
src.row_factory = sqlite3.Row
dst = sqlite3.connect(str(incidents_db), timeout=30.0)
migrated = 0
for table in ("incidents", "received_bundles", "sent_bundles"):
try:
rows = src.execute(f"SELECT * FROM {table}").fetchall() # noqa: S608
except sqlite3.OperationalError:
continue
if not rows:
continue
cols = ", ".join(rows[0].keys())
placeholders = ", ".join("?" * len(rows[0].keys()))
dst.executemany(
f"INSERT OR IGNORE INTO {table} ({cols}) VALUES ({placeholders})", # noqa: S608
[tuple(r) for r in rows],
)
migrated += len(rows)
dst.commit()
src.close()
dst.close()
return migrated
def _fingerprint(path: Path) -> tuple[float, int]: def _fingerprint(path: Path) -> tuple[float, int]:
@ -298,36 +196,28 @@ def _fingerprint(path: Path) -> tuple[float, int]:
return st.st_mtime, st.st_size return st.st_mtime, st.st_size
def _fp_unchanged(conn: sqlite3.Connection, path: Path, mtime: float, size: int) -> bool: def _fp_unchanged(conn: Any, path: Path, mtime: float, size: int) -> bool:
"""Return True only when the stored fingerprint exactly matches (mtime, size). """Return True only when the stored fingerprint exactly matches (mtime, size)."""
tid = resolve_tenant_id()
A smaller size (log rotation) or a larger size (new lines appended) both
return False so the caller re-gleams the file.
"""
row = conn.execute( row = conn.execute(
"SELECT mtime, size FROM glean_fingerprints WHERE path = ?", "SELECT mtime, size FROM glean_fingerprints WHERE path = ? AND (tenant_id = ? OR tenant_id = '')",
(str(path),), (str(path), tid),
).fetchone() ).fetchone()
if row is None: if row is None:
return False return False
return row[0] == mtime and row[1] == size return row["mtime"] == mtime and row["size"] == size
def _save_fingerprint( def _save_fingerprint(
conn: sqlite3.Connection, conn: Any,
path: Path, path: Path,
mtime: float, mtime: float,
size: int, size: int,
gleaned_at: str, gleaned_at: str,
) -> None: ) -> None:
"""Upsert the fingerprint for *path* after a successful glean.""" """Upsert the fingerprint for *path* after a successful glean."""
conn.execute( tid = resolve_tenant_id()
""" conn.execute(frag.fingerprint_upsert(), (tid, str(path), mtime, size, gleaned_at))
INSERT OR REPLACE INTO glean_fingerprints (path, mtime, size, gleaned_at)
VALUES (?, ?, ?, ?)
""",
(str(path), mtime, size, gleaned_at),
)
def _detect_format(first_line: str) -> str: def _detect_format(first_line: str) -> str:
@ -400,18 +290,22 @@ def _parse_file(
yield from plaintext.parse(all_lines(), source_id, compiled, ingest_time) yield from plaintext.parse(all_lines(), source_id, compiled, ingest_time)
def _write_batch(conn: sqlite3.Connection, batch: list[RetrievedEntry]) -> None: def _write_batch(conn: Any, batch: list[RetrievedEntry]) -> None:
conn.executemany( tid = resolve_tenant_id()
""" conflict = frag.entries_conflict_clause()
INSERT OR IGNORE INTO log_entries sql = f"""
(id, source_id, sequence, timestamp_raw, timestamp_iso, {frag.insert_ignore_entries()}
(tenant_id, id, source_id, sequence, timestamp_raw, timestamp_iso,
ingest_time, severity, repeat_count, out_of_order, ingest_time, severity, repeat_count, out_of_order,
matched_patterns, text) matched_patterns, text)
VALUES (?,?,?,?,?,?,?,?,?,?,?) VALUES (?,?,?,?,?,?,?,?,?,?,?,?)
""", {conflict}
"""
conn.executemany(
sql,
[ [
( (
e.entry_id, e.source_id, e.sequence, tid, e.entry_id, e.source_id, e.sequence,
e.timestamp_raw, e.timestamp_iso, e.ingest_time, e.timestamp_raw, e.timestamp_iso, e.ingest_time,
e.severity, e.repeat_count, int(e.out_of_order), e.severity, e.repeat_count, int(e.out_of_order),
json.dumps(list(e.matched_patterns)), e.text, json.dumps(list(e.matched_patterns)), e.text,
@ -435,46 +329,41 @@ def _glean_files(
ingest_time = now_iso() ingest_time = now_iso()
source_id_map = source_id_map or {} source_id_map = source_id_map or {}
conn = sqlite3.connect(str(db_path), timeout=30.0) ensure_schema(db_path)
conn.execute("PRAGMA journal_mode=WAL")
conn.executescript(_SCHEMA)
conn.commit()
stats: dict[str, int] = {} with get_conn(db_path) as conn:
skipped: list[str] = [] stats: dict[str, int] = {}
skipped: list[str] = []
for log_file in files: for log_file in files:
source_id = source_id_map.get(log_file, log_file.stem) source_id = source_id_map.get(log_file, log_file.stem)
# Fingerprint check — skip files whose mtime+size haven't changed. mtime, size = _fingerprint(log_file)
mtime, size = _fingerprint(log_file) if not force and _fp_unchanged(conn, log_file, mtime, size):
if not force and _fp_unchanged(conn, log_file, mtime, size): logger.debug("Skipping unchanged file: %s", log_file.name)
logger.debug("Skipping unchanged file: %s", log_file.name) skipped.append(log_file.name)
skipped.append(log_file.name) stats[source_id] = stats.get(source_id, 0)
stats[source_id] = stats.get(source_id, 0) continue
continue
count = 0 count = 0
batch: list[RetrievedEntry] = [] batch: list[RetrievedEntry] = []
for entry in _parse_file(log_file, compiled, ingest_time, source_id=source_id): for entry in _parse_file(log_file, compiled, ingest_time, source_id=source_id):
batch.append(entry) batch.append(entry)
if len(batch) >= batch_size: if len(batch) >= batch_size:
_write_batch(conn, batch)
conn.commit()
count += len(batch)
batch.clear()
if batch:
_write_batch(conn, batch) _write_batch(conn, batch)
conn.commit() conn.commit()
count += len(batch) count += len(batch)
batch.clear()
if batch: _save_fingerprint(conn, log_file, mtime, size, ingest_time)
_write_batch(conn, batch)
conn.commit() conn.commit()
count += len(batch)
_save_fingerprint(conn, log_file, mtime, size, ingest_time) stats[source_id] = stats.get(source_id, 0) + count
conn.commit() logger.info("Gleaned %d entries from %s (source: %s)", count, log_file.name, source_id)
stats[source_id] = stats.get(source_id, 0) + count
logger.info("Gleaned %d entries from %s (source: %s)", count, log_file.name, source_id)
conn.close()
if skipped: if skipped:
logger.info("Skipped %d unchanged file(s): %s", len(skipped), ", ".join(skipped)) logger.info("Skipped %d unchanged file(s): %s", len(skipped), ", ".join(skipped))
@ -493,7 +382,7 @@ def _stream_and_write(
source_id: str, source_id: str,
compiled: list[tuple[LogPattern, object]], compiled: list[tuple[LogPattern, object]],
ingest_time: str, ingest_time: str,
conn: sqlite3.Connection, conn: Any,
batch_size: int, batch_size: int,
) -> int: ) -> int:
"""Stream *cmd* output through *parser* and write entries to *conn*. """Stream *cmd* output through *parser* and write entries to *conn*.
@ -525,7 +414,7 @@ def _glean_ssh_source(
src: dict, # type: ignore[type-arg] src: dict, # type: ignore[type-arg]
compiled: list[tuple[LogPattern, object]], compiled: list[tuple[LogPattern, object]],
ingest_time: str, ingest_time: str,
conn: sqlite3.Connection, conn: Any,
batch_size: int, batch_size: int,
) -> dict[str, int]: ) -> dict[str, int]:
"""Open one SSHTransport connection for *src* and glean all its glean items. """Open one SSHTransport connection for *src* and glean all its glean items.
@ -618,15 +507,9 @@ def glean_ssh_source(
compiled = _compile(load_patterns(effective_pattern_file)) compiled = _compile(load_patterns(effective_pattern_file))
ingest_time = now_iso() ingest_time = now_iso()
conn = sqlite3.connect(str(db_path), timeout=30.0) ensure_schema(db_path)
conn.execute("PRAGMA journal_mode=WAL") with get_conn(db_path) as conn:
conn.executescript(_SCHEMA)
conn.commit()
try:
stats = _glean_ssh_source(src, compiled, ingest_time, conn, batch_size) stats = _glean_ssh_source(src, compiled, ingest_time, conn, batch_size)
finally:
conn.close()
logger.info("Rebuilding FTS index after SSH source glean...") logger.info("Rebuilding FTS index after SSH source glean...")
build_fts_index(db_path) build_fts_index(db_path)
@ -740,18 +623,13 @@ def glean_sources(
compiled = _compile(load_patterns(effective_pattern_file)) compiled = _compile(load_patterns(effective_pattern_file))
ingest_time = now_iso() ingest_time = now_iso()
conn = sqlite3.connect(str(db_path), timeout=30.0) ensure_schema(db_path)
conn.execute("PRAGMA journal_mode=WAL") with get_conn(db_path) as conn:
conn.executescript(_SCHEMA)
conn.commit()
try:
for src in ssh_sources: for src in ssh_sources:
ssh_stats = _glean_ssh_source(src, compiled, ingest_time, conn, batch_size) ssh_stats = _glean_ssh_source(src, compiled, ingest_time, conn, batch_size)
for k, v in ssh_stats.items(): for k, v in ssh_stats.items():
stats[k] = stats.get(k, 0) + v stats[k] = stats.get(k, 0) + v
finally: conn.commit()
conn.close()
# Rebuild FTS only when SSH sources added entries (_glean_files already # Rebuild FTS only when SSH sources added entries (_glean_files already
# rebuilds when local sources are present; safe to call again if both ran). # rebuilds when local sources are present; safe to call again if both ran).

View file

@ -11,7 +11,7 @@ from __future__ import annotations
import logging import logging
import os import os
import sqlite3 import sqlite3 # still used for the pre-index-check on SQLite backend
import sys import sys
from pathlib import Path from pathlib import Path
@ -53,15 +53,15 @@ _index_ready = False
def _ensure_index() -> None: def _ensure_index() -> None:
"""Build FTS index on first use; skip if already present.""" """Build FTS index on first use; skip if already present (SQLite only)."""
global _index_ready global _index_ready
if _index_ready: if _index_ready:
return return
try: try:
conn = sqlite3.connect(str(DB_PATH), timeout=30.0) raw = sqlite3.connect(str(DB_PATH), timeout=30.0)
count = conn.execute("SELECT COUNT(*) FROM log_fts").fetchone()[0] count = raw.execute("SELECT COUNT(*) FROM log_fts").fetchone()[0]
conn.close() raw.close()
if count > 0: if count > 0:
_index_ready = True _index_ready = True
logger.info("FTS index present (%d entries)", count) logger.info("FTS index present (%d entries)", count)

View file

@ -35,7 +35,8 @@ from fastapi.responses import FileResponse, RedirectResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel from pydantic import BaseModel
from app.glean.pipeline import ensure_schema, ensure_context_schema, ensure_incidents_schema, migrate_incidents_to_dedicated_db, glean_file as _glean_file, glean_ssh_source as _glean_ssh_source from app.db import close_pool, ensure_schema, ensure_context_schema, ensure_incidents_schema, migrate_incidents_to_dedicated_db
from app.glean.pipeline import glean_file as _glean_file, glean_ssh_source as _glean_ssh_source
from app.glean.base import load_compiled_patterns, now_iso from app.glean.base import load_compiled_patterns, now_iso
from app.glean.tautulli import parse_webhook as _parse_tautulli from app.glean.tautulli import parse_webhook as _parse_tautulli
from app.glean.wazuh import is_wazuh_alert as _is_wazuh_alert, parse as _parse_wazuh from app.glean.wazuh import is_wazuh_alert as _is_wazuh_alert, parse as _parse_wazuh
@ -185,6 +186,7 @@ async def _lifespan(app: FastAPI):
await task await task
except asyncio.CancelledError: except asyncio.CancelledError:
pass pass
close_pool() # no-op if SQLite backend
app = FastAPI(title="Turnstone API", version="0.6.2", docs_url="/turnstone/docs", redoc_url=None, lifespan=_lifespan) app = FastAPI(title="Turnstone API", version="0.6.2", docs_url="/turnstone/docs", redoc_url=None, lifespan=_lifespan)

View file

@ -4,10 +4,12 @@ from __future__ import annotations
import dataclasses import dataclasses
import json import json
import re import re
import sqlite3
import uuid import uuid
from datetime import datetime, timezone from datetime import datetime, timezone
from pathlib import Path from pathlib import Path
from typing import Any
from app.db import get_conn, resolve_tenant_id
import yaml import yaml
@ -91,26 +93,26 @@ def _now_iso() -> str:
return datetime.now(timezone.utc).isoformat() return datetime.now(timezone.utc).isoformat()
def _row_to_candidate(row: tuple) -> BlocklistCandidate: def _row_to_candidate(row: Any) -> BlocklistCandidate:
return BlocklistCandidate( return BlocklistCandidate(
id=row[0], id=row["id"],
domain_or_ip=row[1], domain_or_ip=row["domain_or_ip"],
source_device_ip=row[2], source_device_ip=row["source_device_ip"],
source_device_name=row[3], source_device_name=row["source_device_name"],
first_seen=row[4], first_seen=row["first_seen"],
last_seen=row[5], last_seen=row["last_seen"],
hit_count=row[6], hit_count=row["hit_count"],
status=row[7], status=row["status"],
pushed_at=row[8], pushed_at=row["pushed_at"],
log_evidence=json.loads(row[9] or "[]"), log_evidence=json.loads(row["log_evidence"] or "[]"),
matched_rule=row[10], matched_rule=row["matched_rule"],
llm_score=row[11], llm_score=row["llm_score"],
llm_reason=row[12], llm_reason=row["llm_reason"],
) )
def _upsert_candidate( def _upsert_candidate(
conn: sqlite3.Connection, conn: Any,
domain_or_ip: str, domain_or_ip: str,
source_device_ip: str | None, source_device_ip: str | None,
source_device_name: str | None, source_device_name: str | None,
@ -119,26 +121,29 @@ def _upsert_candidate(
now: str, now: str,
) -> bool: ) -> bool:
"""Insert or update a candidate. Returns True if a new row was created.""" """Insert or update a candidate. Returns True if a new row was created."""
tid = resolve_tenant_id()
row = conn.execute( row = conn.execute(
"SELECT id, hit_count, log_evidence FROM blocklist_candidates " "SELECT id, hit_count, log_evidence FROM blocklist_candidates "
"WHERE domain_or_ip = ? AND source_device_ip IS ?", "WHERE domain_or_ip = ? AND source_device_ip IS ? AND (tenant_id = ? OR tenant_id = '')",
(domain_or_ip, source_device_ip), (domain_or_ip, source_device_ip, tid),
).fetchone() ).fetchone()
if row is None: if row is None:
conn.execute( conn.execute(
"""INSERT INTO blocklist_candidates """INSERT INTO blocklist_candidates
(id, domain_or_ip, source_device_ip, source_device_name, (id, tenant_id, domain_or_ip, source_device_ip, source_device_name,
first_seen, last_seen, hit_count, status, pushed_at, log_evidence, matched_rule) first_seen, last_seen, hit_count, status, pushed_at, log_evidence, matched_rule)
VALUES (?, ?, ?, ?, ?, ?, 1, 'pending', NULL, ?, ?)""", VALUES (?, ?, ?, ?, ?, ?, ?, 1, 'pending', NULL, ?, ?)""",
( (
str(uuid.uuid4()), domain_or_ip, source_device_ip, source_device_name, str(uuid.uuid4()), tid, domain_or_ip, source_device_ip, source_device_name,
now, now, json.dumps([entry_id]), matched_rule, now, now, json.dumps([entry_id]), matched_rule,
), ),
) )
return True return True
existing_id, hit_count, existing_evidence = row existing_id = row["id"]
hit_count = row["hit_count"]
existing_evidence = row["log_evidence"]
evidence = json.loads(existing_evidence or "[]") evidence = json.loads(existing_evidence or "[]")
if entry_id not in evidence: if entry_id not in evidence:
evidence.append(entry_id) evidence.append(entry_id)
@ -172,14 +177,16 @@ def run_scan(
now = _now_iso() now = _now_iso()
count = 0 count = 0
conn = sqlite3.connect(str(db_path), timeout=30.0) tid = resolve_tenant_id()
try: with get_conn(db_path) as conn:
rows = conn.execute( rows = conn.execute(
f"SELECT id, text FROM log_entries WHERE source_id IN ({placeholders})", f"SELECT id, text FROM log_entries WHERE source_id IN ({placeholders}) AND (tenant_id = ? OR tenant_id = '')", # noqa: S608
router_source_ids, (*router_source_ids, tid),
).fetchall() ).fetchall()
for entry_id, text in rows: for row in rows:
entry_id, text = row["id"], row["text"]
# rest of loop body follows unchanged
src_ip: str | None = None src_ip: str | None = None
dst: str | None = None dst: str | None = None
@ -204,8 +211,6 @@ def run_scan(
count += 1 count += 1
conn.commit() conn.commit()
finally:
conn.close()
return count return count
@ -226,26 +231,27 @@ def list_candidates(
status: str | None = None, status: str | None = None,
device_ip: str | None = None, device_ip: str | None = None,
) -> list[BlocklistCandidate]: ) -> list[BlocklistCandidate]:
conn = sqlite3.connect(str(db_path), timeout=30.0) tid = resolve_tenant_id()
try: conditions = ["(tenant_id = ? OR tenant_id = '')"]
query = f"{_CANDIDATE_SELECT} WHERE 1=1" params: list = [tid]
params: list = [] if status and status != "all":
if status and status != "all": conditions.append("status = ?")
query += " AND status = ?" params.append(status)
params.append(status) if device_ip:
if device_ip: conditions.append("source_device_ip = ?")
query += " AND source_device_ip = ?" params.append(device_ip)
params.append(device_ip) where = " AND ".join(conditions)
query += " ORDER BY last_seen DESC" with get_conn(db_path) as conn:
rows = conn.execute(query, params).fetchall() rows = conn.execute(
finally: f"{_CANDIDATE_SELECT} WHERE {where} ORDER BY last_seen DESC", # noqa: S608
conn.close() params,
).fetchall()
return [_row_to_candidate(r) for r in rows] return [_row_to_candidate(r) for r in rows]
def _get_candidate(conn: sqlite3.Connection, candidate_id: str) -> BlocklistCandidate: def _get_candidate(conn: Any, candidate_id: str) -> BlocklistCandidate:
row = conn.execute( row = conn.execute(
f"{_CANDIDATE_SELECT} WHERE id=?", f"{_CANDIDATE_SELECT} WHERE id=?", # noqa: S608
(candidate_id,), (candidate_id,),
).fetchone() ).fetchone()
if row is None: if row is None:
@ -255,43 +261,31 @@ def _get_candidate(conn: sqlite3.Connection, candidate_id: str) -> BlocklistCand
def get_candidate(db_path: Path, candidate_id: str) -> BlocklistCandidate: def get_candidate(db_path: Path, candidate_id: str) -> BlocklistCandidate:
"""Fetch a single candidate by ID. Raises KeyError if not found.""" """Fetch a single candidate by ID. Raises KeyError if not found."""
conn = sqlite3.connect(str(db_path), timeout=30.0) with get_conn(db_path) as conn:
try:
return _get_candidate(conn, candidate_id) return _get_candidate(conn, candidate_id)
finally:
conn.close()
def update_candidate_status(db_path: Path, candidate_id: str, new_status: str) -> BlocklistCandidate:
    """Set the status of one blocklist candidate and return the refreshed row.

    The UPDATE is restricted to rows owned by this node's tenant (or legacy
    rows whose tenant_id is ''), matching the predicate the read paths use,
    so a candidate belonging to another tenant cannot be modified by ID.

    Raises:
        ValueError: if *new_status* is not one of _VALID_STATUSES.
        KeyError: if no candidate with *candidate_id* is visible to this tenant.
    """
    if new_status not in _VALID_STATUSES:
        raise ValueError(f"Invalid status {new_status!r}. Must be one of {_VALID_STATUSES}")
    tid = resolve_tenant_id()
    with get_conn(db_path) as conn:
        cur = conn.execute(
            "UPDATE blocklist_candidates SET status=? "
            "WHERE id=? AND (tenant_id = ? OR tenant_id = '')",
            (new_status, candidate_id, tid),
        )
        # Nothing matched: report "not found" instead of returning a row
        # that was never updated.
        if cur.rowcount == 0:
            raise KeyError(candidate_id)
        conn.commit()
        return _get_candidate(conn, candidate_id)
def mark_pushed(db_path: Path, candidate_id: str) -> BlocklistCandidate:
    """Mark a candidate as pushed, stamp pushed_at, and return the refreshed row.

    The UPDATE is restricted to rows owned by this node's tenant (or legacy
    rows whose tenant_id is ''), so another tenant's candidate cannot be
    modified by ID.

    Raises:
        KeyError: if no candidate with *candidate_id* is visible to this tenant.
    """
    tid = resolve_tenant_id()
    with get_conn(db_path) as conn:
        cur = conn.execute(
            "UPDATE blocklist_candidates SET status='pushed', pushed_at=? "
            "WHERE id=? AND (tenant_id = ? OR tenant_id = '')",
            (_now_iso(), candidate_id, tid),
        )
        # Nothing matched: report "not found" instead of returning a row
        # that was never updated.
        if cur.rowcount == 0:
            raise KeyError(candidate_id)
        conn.commit()
        return _get_candidate(conn, candidate_id)
def mark_unblocked(db_path: Path, candidate_id: str) -> BlocklistCandidate:
    """Mark a candidate as unblocked and return the refreshed row.

    The UPDATE is restricted to rows owned by this node's tenant (or legacy
    rows whose tenant_id is ''), so another tenant's candidate cannot be
    modified by ID.

    Raises:
        KeyError: if no candidate with *candidate_id* is visible to this tenant.
    """
    tid = resolve_tenant_id()
    with get_conn(db_path) as conn:
        cur = conn.execute(
            "UPDATE blocklist_candidates SET status='unblocked' "
            "WHERE id=? AND (tenant_id = ? OR tenant_id = '')",
            (candidate_id, tid),
        )
        # Nothing matched: report "not found" instead of returning a row
        # that was never updated.
        if cur.rowcount == 0:
            raise KeyError(candidate_id)
        conn.commit()
        return _get_candidate(conn, candidate_id)

View file

@ -3,10 +3,10 @@ from __future__ import annotations
import json import json
import re import re
import sqlite3
import uuid import uuid
from pathlib import Path from pathlib import Path
from app.db import get_conn, resolve_tenant_id
from app.glean.base import now_iso from app.glean.base import now_iso
from app.services.models import Incident, ReceivedBundle, SentBundle from app.services.models import Incident, ReceivedBundle, SentBundle
from app.services.search import SearchResult, entries_in_window, search from app.services.search import SearchResult, entries_in_window, search
@ -26,7 +26,7 @@ def _redact_text(text: str) -> str:
return text return text
def _row_to_incident(row: sqlite3.Row) -> Incident: def _row_to_incident(row) -> Incident:
return Incident( return Incident(
id=row["id"], id=row["id"],
label=row["label"], label=row["label"],
@ -39,7 +39,7 @@ def _row_to_incident(row: sqlite3.Row) -> Incident:
) )
def _row_to_bundle(row: sqlite3.Row) -> ReceivedBundle: def _row_to_bundle(row) -> ReceivedBundle:
return ReceivedBundle( return ReceivedBundle(
id=row["id"], id=row["id"],
source_host=row["source_host"], source_host=row["source_host"],
@ -62,6 +62,7 @@ def create_incident(
notes: str = "", notes: str = "",
severity: str = "medium", severity: str = "medium",
) -> Incident: ) -> Incident:
tid = resolve_tenant_id()
incident = Incident( incident = Incident(
id=str(uuid.uuid4()), id=str(uuid.uuid4()),
label=label, label=label,
@ -72,47 +73,45 @@ def create_incident(
created_at=now_iso(), created_at=now_iso(),
severity=severity, severity=severity,
) )
conn = sqlite3.connect(str(db_path), timeout=30.0) with get_conn(db_path) as conn:
conn.execute("PRAGMA journal_mode=WAL") conn.execute(
conn.execute( "INSERT INTO incidents (id, tenant_id, label, issue_type, started_at, ended_at, notes, created_at, severity) "
"INSERT INTO incidents (id, label, issue_type, started_at, ended_at, notes, created_at, severity) " "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
"VALUES (?, ?, ?, ?, ?, ?, ?, ?)", (incident.id, tid, incident.label, incident.issue_type, incident.started_at,
(incident.id, incident.label, incident.issue_type, incident.started_at, incident.ended_at, incident.notes, incident.created_at, incident.severity),
incident.ended_at, incident.notes, incident.created_at, incident.severity), )
) conn.commit()
conn.commit()
conn.close()
return incident return incident
def list_incidents(db_path: Path) -> list[Incident]: def list_incidents(db_path: Path) -> list[Incident]:
conn = sqlite3.connect(str(db_path), timeout=30.0) tid = resolve_tenant_id()
conn.execute("PRAGMA journal_mode=WAL") with get_conn(db_path) as conn:
conn.row_factory = sqlite3.Row rows = conn.execute(
rows = conn.execute( "SELECT * FROM incidents WHERE (tenant_id = ? OR tenant_id = '') ORDER BY created_at DESC",
"SELECT * FROM incidents ORDER BY created_at DESC" (tid,),
).fetchall() ).fetchall()
conn.close()
return [_row_to_incident(r) for r in rows] return [_row_to_incident(r) for r in rows]
def get_incident(db_path: Path, incident_id: str) -> Incident | None: def get_incident(db_path: Path, incident_id: str) -> Incident | None:
conn = sqlite3.connect(str(db_path), timeout=30.0) tid = resolve_tenant_id()
conn.execute("PRAGMA journal_mode=WAL") with get_conn(db_path) as conn:
conn.row_factory = sqlite3.Row row = conn.execute(
row = conn.execute( "SELECT * FROM incidents WHERE id = ? AND (tenant_id = ? OR tenant_id = '')",
"SELECT * FROM incidents WHERE id = ?", (incident_id,) (incident_id, tid),
).fetchone() ).fetchone()
conn.close()
return _row_to_incident(row) if row else None return _row_to_incident(row) if row else None
def delete_incident(db_path: Path, incident_id: str) -> bool: def delete_incident(db_path: Path, incident_id: str) -> bool:
conn = sqlite3.connect(str(db_path), timeout=30.0) tid = resolve_tenant_id()
conn.execute("PRAGMA journal_mode=WAL") with get_conn(db_path) as conn:
cur = conn.execute("DELETE FROM incidents WHERE id = ?", (incident_id,)) cur = conn.execute(
conn.commit() "DELETE FROM incidents WHERE id = ? AND (tenant_id = ? OR tenant_id = '')",
conn.close() (incident_id, tid),
)
conn.commit()
return cur.rowcount > 0 return cur.rowcount > 0
@ -191,6 +190,7 @@ def build_bundle(
def record_sent_bundle(db_path: Path, incident_id: str, bundle: dict, sanitized: bool) -> SentBundle: def record_sent_bundle(db_path: Path, incident_id: str, bundle: dict, sanitized: bool) -> SentBundle:
"""Log an outgoing bundle export to the sent_bundles table.""" """Log an outgoing bundle export to the sent_bundles table."""
tid = resolve_tenant_id()
record = SentBundle( record = SentBundle(
id=str(uuid.uuid4()), id=str(uuid.uuid4()),
incident_id=incident_id, incident_id=incident_id,
@ -199,28 +199,25 @@ def record_sent_bundle(db_path: Path, incident_id: str, bundle: dict, sanitized:
entry_count=len(bundle.get("log_entries", [])), entry_count=len(bundle.get("log_entries", [])),
bundle_json=json.dumps(bundle), bundle_json=json.dumps(bundle),
) )
conn = sqlite3.connect(str(db_path), timeout=30.0) with get_conn(db_path) as conn:
conn.execute("PRAGMA journal_mode=WAL") conn.execute(
conn.execute( "INSERT INTO sent_bundles (id, tenant_id, incident_id, exported_at, sanitized, entry_count, bundle_json) "
"INSERT INTO sent_bundles (id, incident_id, exported_at, sanitized, entry_count, bundle_json) " "VALUES (?, ?, ?, ?, ?, ?, ?)",
"VALUES (?, ?, ?, ?, ?, ?)", (record.id, tid, record.incident_id, record.exported_at,
(record.id, record.incident_id, record.exported_at, int(record.sanitized), int(record.sanitized), record.entry_count, record.bundle_json),
record.entry_count, record.bundle_json), )
) conn.commit()
conn.commit()
conn.close()
return record return record
def list_sent_bundles(db_path: Path) -> list[SentBundle]: def list_sent_bundles(db_path: Path) -> list[SentBundle]:
conn = sqlite3.connect(str(db_path), timeout=30.0) tid = resolve_tenant_id()
conn.execute("PRAGMA journal_mode=WAL") with get_conn(db_path) as conn:
conn.row_factory = sqlite3.Row rows = conn.execute(
rows = conn.execute( "SELECT id, incident_id, exported_at, sanitized, entry_count, bundle_json "
"SELECT id, incident_id, exported_at, sanitized, entry_count, bundle_json " "FROM sent_bundles WHERE (tenant_id = ? OR tenant_id = '') ORDER BY exported_at DESC",
"FROM sent_bundles ORDER BY exported_at DESC" (tid,),
).fetchall() ).fetchall()
conn.close()
return [ return [
SentBundle( SentBundle(
id=r["id"], id=r["id"],
@ -236,6 +233,7 @@ def list_sent_bundles(db_path: Path) -> list[SentBundle]:
def store_bundle(db_path: Path, bundle: dict) -> ReceivedBundle: def store_bundle(db_path: Path, bundle: dict) -> ReceivedBundle:
"""Store an incoming bundle from a remote Turnstone instance.""" """Store an incoming bundle from a remote Turnstone instance."""
tid = resolve_tenant_id()
inc = bundle.get("incident", {}) inc = bundle.get("incident", {})
record = ReceivedBundle( record = ReceivedBundle(
id=str(uuid.uuid4()), id=str(uuid.uuid4()),
@ -248,38 +246,34 @@ def store_bundle(db_path: Path, bundle: dict) -> ReceivedBundle:
entry_count=len(bundle.get("log_entries", [])), entry_count=len(bundle.get("log_entries", [])),
bundle_json=json.dumps(bundle), bundle_json=json.dumps(bundle),
) )
conn = sqlite3.connect(str(db_path), timeout=30.0) with get_conn(db_path) as conn:
conn.execute("PRAGMA journal_mode=WAL") conn.execute(
conn.execute( "INSERT INTO received_bundles "
"INSERT INTO received_bundles " "(id, tenant_id, source_host, issue_type, label, severity, started_at, bundled_at, entry_count, bundle_json) "
"(id, source_host, issue_type, label, severity, started_at, bundled_at, entry_count, bundle_json) " "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", (record.id, tid, record.source_host, record.issue_type, record.label,
(record.id, record.source_host, record.issue_type, record.label, record.severity, record.started_at, record.bundled_at, record.entry_count, record.bundle_json),
record.severity, record.started_at, record.bundled_at, record.entry_count, record.bundle_json), )
) conn.commit()
conn.commit()
conn.close()
return record return record
def list_bundles(db_path: Path) -> list[ReceivedBundle]:
    """Return every received bundle visible to the current tenant.

    Rows are included when their ``tenant_id`` equals the value from
    ``resolve_tenant_id()`` or is empty, and are ordered by ``bundled_at``
    descending (newest first).

    Args:
        db_path: Database location handed to ``get_conn``.

    Returns:
        One ``ReceivedBundle`` per row, built with ``_row_to_bundle``.
    """
    tenant = resolve_tenant_id()
    statement = (
        "SELECT id, source_host, issue_type, label, severity, started_at, bundled_at, entry_count, bundle_json "
        "FROM received_bundles WHERE (tenant_id = ? OR tenant_id = '') ORDER BY bundled_at DESC"
    )
    with get_conn(db_path) as conn:
        cursor = conn.execute(statement, (tenant,))
        fetched = cursor.fetchall()
    bundles = []
    for record in fetched:
        bundles.append(_row_to_bundle(record))
    return bundles
def get_bundle(db_path: Path, bundle_id: str) -> ReceivedBundle | None:
    """Look up one received bundle by id, restricted to the current tenant.

    A row matches when its ``id`` equals ``bundle_id`` and its ``tenant_id``
    is either the value from ``resolve_tenant_id()`` or empty.

    Args:
        db_path: Database location handed to ``get_conn``.
        bundle_id: Primary key of the bundle to fetch.

    Returns:
        The matching ``ReceivedBundle``, or ``None`` when no row matches.
    """
    tenant = resolve_tenant_id()
    statement = "SELECT * FROM received_bundles WHERE id = ? AND (tenant_id = ? OR tenant_id = '')"
    with get_conn(db_path) as conn:
        found = conn.execute(statement, (bundle_id, tenant)).fetchone()
    if not found:
        return None
    return _row_to_bundle(found)

View file

@ -1,4 +1,8 @@
"""FTS5-based log search with optional hybrid BM25 + vector re-ranking.""" """FTS-based log search with optional hybrid BM25 + vector re-ranking.
SQLite backend: FTS5 virtual table with Porter stemmer.
Postgres backend: tsvector column with GIN index + websearch_to_tsquery.
"""
from __future__ import annotations from __future__ import annotations
import json import json
@ -6,8 +10,11 @@ import logging
import re import re
import sqlite3 import sqlite3
from dataclasses import dataclass from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from pathlib import Path from pathlib import Path
from app.db import BACKEND, Backend, frag, get_conn, resolve_tenant_id
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -28,22 +35,24 @@ class SearchResult:
def build_fts_index(db_path: Path) -> None: def build_fts_index(db_path: Path) -> None:
"""Build (or rebuild) the FTS5 index from log_entries. Safe to re-run. """Build (or rebuild) the FTS5 index from log_entries. Safe to re-run.
Drops and recreates the table if the schema is stale (missing sequence column). For Postgres, the tsvector column is maintained by a trigger this is a no-op.
""" """
conn = sqlite3.connect(str(db_path), timeout=30.0) if BACKEND == Backend.POSTGRES:
conn.execute("PRAGMA journal_mode=WAL") return
raw = sqlite3.connect(str(db_path), timeout=30.0)
raw.execute("PRAGMA journal_mode=WAL")
# Check whether existing table has the sequence column; rebuild if not.
needs_rebuild = False needs_rebuild = False
try: try:
conn.execute("SELECT sequence FROM log_fts LIMIT 0") raw.execute("SELECT sequence FROM log_fts LIMIT 0")
except sqlite3.OperationalError: except sqlite3.OperationalError:
needs_rebuild = True needs_rebuild = True
if needs_rebuild: if needs_rebuild:
conn.execute("DROP TABLE IF EXISTS log_fts") raw.execute("DROP TABLE IF EXISTS log_fts")
conn.executescript(""" raw.executescript("""
CREATE VIRTUAL TABLE IF NOT EXISTS log_fts USING fts5( CREATE VIRTUAL TABLE IF NOT EXISTS log_fts USING fts5(
text, text,
entry_id UNINDEXED, entry_id UNINDEXED,
@ -57,8 +66,7 @@ def build_fts_index(db_path: Path) -> None:
tokenize = 'porter ascii' tokenize = 'porter ascii'
); );
""") """)
# Only insert rows not already indexed raw.execute("""
conn.execute("""
INSERT INTO log_fts(text, entry_id, source_id, sequence, severity, INSERT INTO log_fts(text, entry_id, source_id, sequence, severity,
timestamp_iso, matched_patterns, timestamp_iso, matched_patterns,
repeat_count, out_of_order) repeat_count, out_of_order)
@ -68,8 +76,8 @@ def build_fts_index(db_path: Path) -> None:
FROM log_entries e FROM log_entries e
WHERE e.id NOT IN (SELECT entry_id FROM log_fts WHERE entry_id IS NOT NULL) WHERE e.id NOT IN (SELECT entry_id FROM log_fts WHERE entry_id IS NOT NULL)
""") """)
conn.commit() raw.commit()
conn.close() raw.close()
def _sanitize_fts_query(raw: str, or_mode: bool = False) -> str: def _sanitize_fts_query(raw: str, or_mode: bool = False) -> str:
@ -198,14 +206,44 @@ def _bm25_search(
include_repeats: bool = False, include_repeats: bool = False,
or_mode: bool = False, or_mode: bool = False,
) -> list[SearchResult]: ) -> list[SearchResult]:
"""Pure BM25 FTS5 search — internal helper used by both search() and _hybrid_search().""" """FTS search — BM25 via FTS5 (SQLite) or tsvector (Postgres)."""
conn = sqlite3.connect(str(db_path), timeout=30.0) tid = resolve_tenant_id()
conn.execute("PRAGMA journal_mode=WAL")
conn.row_factory = sqlite3.Row
if BACKEND == Backend.POSTGRES:
return _pg_fts_search(
db_path, query, tid,
severity=severity, source_filter=source_filter,
pattern_filter=pattern_filter, since=since, until=until,
limit=limit, include_repeats=include_repeats,
)
return _sqlite_fts_search(
db_path, query, tid,
severity=severity, source_filter=source_filter,
pattern_filter=pattern_filter, since=since, until=until,
limit=limit, include_repeats=include_repeats, or_mode=or_mode,
)
def _sqlite_fts_search(
db_path: Path,
query: str,
tid: str,
severity: str | None,
source_filter: str | None,
pattern_filter: str | None,
since: str | None,
until: str | None,
limit: int,
include_repeats: bool,
or_mode: bool,
) -> list[SearchResult]:
fts_query = _sanitize_fts_query(query, or_mode=or_mode) fts_query = _sanitize_fts_query(query, or_mode=or_mode)
conditions = ["log_fts MATCH ?"] conditions = [
params: list = [fts_query] "log_fts MATCH ?",
"(e.tenant_id = ? OR e.tenant_id = '')",
]
params: list = [fts_query, tid]
if severity: if severity:
conditions.append("severity = ?") conditions.append("severity = ?")
@ -223,29 +261,33 @@ def _bm25_search(
conditions.append("timestamp_iso <= ?") conditions.append("timestamp_iso <= ?")
params.append(until) params.append(until)
if not include_repeats: if not include_repeats:
conditions.append("repeat_count = 1") conditions.append("f.repeat_count = 1")
where = " AND ".join(conditions) where = " AND ".join(conditions)
params.append(limit) params.append(limit)
raw = sqlite3.connect(str(db_path), timeout=30.0)
raw.row_factory = sqlite3.Row
try: try:
rows = conn.execute( rows = raw.execute(
f""" f"""
SELECT entry_id, source_id, sequence, timestamp_iso, severity, SELECT f.entry_id, f.source_id, f.sequence, f.timestamp_iso, f.severity,
repeat_count, out_of_order, matched_patterns, text, rank f.repeat_count, f.out_of_order, f.matched_patterns, f.text, f.rank
FROM log_fts FROM log_fts f
JOIN log_entries e ON e.id = f.entry_id
WHERE {where} WHERE {where}
ORDER BY rank ORDER BY f.rank
LIMIT ? LIMIT ?
""", """,
params, params,
).fetchall() ).fetchall()
except sqlite3.OperationalError as e: except sqlite3.OperationalError as exc:
logger.warning("FTS query failed (%s) — index may not be built yet", e) logger.warning("FTS query failed (%s) — index may not be built yet", exc)
conn.close()
return [] return []
finally:
raw.close()
results = [ return [
SearchResult( SearchResult(
entry_id=r["entry_id"], entry_id=r["entry_id"],
source_id=r["source_id"], source_id=r["source_id"],
@ -256,12 +298,83 @@ def _bm25_search(
out_of_order=bool(r["out_of_order"]), out_of_order=bool(r["out_of_order"]),
matched_patterns=json.loads(r["matched_patterns"] or "[]"), matched_patterns=json.loads(r["matched_patterns"] or "[]"),
text=r["text"], text=r["text"],
rank=r["rank"], rank=float(r["rank"]),
)
for r in rows
]
def _pg_fts_search(
    db_path: Path,
    query: str,
    tid: str,
    severity: str | None,
    source_filter: str | None,
    pattern_filter: str | None,
    since: str | None,
    until: str | None,
    limit: int,
    include_repeats: bool,
) -> list[SearchResult]:
    """Postgres FTS via tsvector column and websearch_to_tsquery.

    Args:
        db_path: Passed through to ``get_conn``.
        query: User search text, handed to ``websearch_to_tsquery('english', ...)``.
        tid: Tenant id; rows with this ``tenant_id`` or an empty one are searched.
        severity: Optional severity filter; upper-cased before comparison.
        source_filter: Optional substring matched against ``source_id`` with LIKE.
        pattern_filter: Optional pattern name; matched as a double-quoted token
            inside the ``matched_patterns`` text.
        since: Optional inclusive lower bound on ``timestamp_iso``.
        until: Optional inclusive upper bound on ``timestamp_iso``.
        limit: Maximum number of rows returned.
        include_repeats: When false, only rows with ``repeat_count = 1`` match.

    Returns:
        Matching entries as ``SearchResult`` objects, highest ``ts_rank`` first.
    """
    tsq = "websearch_to_tsquery('english', %s)"
    conditions = [
        f"text_tsv @@ {tsq}",
        "(tenant_id = %s OR tenant_id = '')",
    ]
    where_params: list = [query, tid]

    if severity:
        conditions.append("severity = %s")
        where_params.append(severity.upper())
    if source_filter:
        conditions.append("source_id LIKE %s")
        where_params.append(f"%{source_filter}%")
    if pattern_filter:
        conditions.append("matched_patterns LIKE %s")
        where_params.append(f'%"{pattern_filter}"%')
    if since:
        conditions.append("timestamp_iso >= %s")
        where_params.append(since)
    if until:
        conditions.append("timestamp_iso <= %s")
        where_params.append(until)
    if not include_repeats:
        conditions.append("repeat_count = 1")

    where = " AND ".join(conditions)
    # Positional placeholders bind in the order they appear in the SQL text.
    # The ts_rank() tsquery lives in the SELECT list, i.e. BEFORE every WHERE
    # placeholder, so its value must come first; LIMIT is last.
    params: list = [query, *where_params, limit]

    with get_conn(db_path) as conn:
        rows = conn.execute(
            f"""
            SELECT id AS entry_id, source_id, sequence, timestamp_iso, severity,
                   repeat_count, out_of_order, matched_patterns, text,
                   ts_rank(text_tsv, {tsq}) AS rank
            FROM log_entries
            WHERE {where}
            ORDER BY rank DESC
            LIMIT %s
            """,
            params,
        ).fetchall()

    return [
        SearchResult(
            entry_id=r["entry_id"],
            source_id=r["source_id"],
            sequence=r["sequence"],
            timestamp_iso=r["timestamp_iso"],
            severity=r["severity"],
            repeat_count=r["repeat_count"],
            out_of_order=bool(r["out_of_order"]),
            # Stored as JSON text; an empty/NULL value means "no patterns".
            matched_patterns=json.loads(r["matched_patterns"] or "[]"),
            text=r["text"],
            rank=float(r["rank"]),
        )
        for r in rows
    ]
conn.close()
return results
def entries_in_window( def entries_in_window(
@ -282,12 +395,12 @@ def entries_in_window(
(e.g. network-syslog) don't crowd out lower-volume but more interesting ones. (e.g. network-syslog) don't crowd out lower-volume but more interesting ones.
Errors/warnings are ranked first within each source partition. Errors/warnings are ranked first within each source partition.
""" """
conn = sqlite3.connect(str(db_path), timeout=30.0) tid = resolve_tenant_id()
conn.execute("PRAGMA journal_mode=WAL") conditions: list[str] = [
conn.row_factory = sqlite3.Row "repeat_count = 1",
"(tenant_id = ? OR tenant_id = '')",
conditions: list[str] = ["repeat_count = 1"] ]
params: list = [] params: list = [tid]
if since: if since:
conditions.append("timestamp_iso >= ?") conditions.append("timestamp_iso >= ?")
@ -305,8 +418,7 @@ def entries_in_window(
where = " AND ".join(conditions) where = " AND ".join(conditions)
if per_source_cap is not None: if per_source_cap is not None:
# Use a window function to cap rows per source, errors/warnings first. sql = f"""
query = f"""
WITH ranked AS ( WITH ranked AS (
SELECT id as entry_id, source_id, sequence, timestamp_iso, severity, SELECT id as entry_id, source_id, sequence, timestamp_iso, severity,
repeat_count, out_of_order, matched_patterns, text, 0.0 as rank, repeat_count, out_of_order, matched_patterns, text, 0.0 as rank,
@ -333,7 +445,7 @@ def entries_in_window(
""" """
params.extend([per_source_cap, limit]) params.extend([per_source_cap, limit])
else: else:
query = f""" sql = f"""
SELECT id as entry_id, source_id, sequence, timestamp_iso, severity, SELECT id as entry_id, source_id, sequence, timestamp_iso, severity,
repeat_count, out_of_order, matched_patterns, text, 0.0 as rank repeat_count, out_of_order, matched_patterns, text, 0.0 as rank
FROM log_entries FROM log_entries
@ -343,8 +455,8 @@ def entries_in_window(
""" """
params.append(limit) params.append(limit)
rows = conn.execute(query, params).fetchall() with get_conn(db_path) as conn:
conn.close() rows = conn.execute(sql, params).fetchall()
return [ return [
SearchResult( SearchResult(
@ -357,7 +469,7 @@ def entries_in_window(
out_of_order=bool(r["out_of_order"]), out_of_order=bool(r["out_of_order"]),
matched_patterns=json.loads(r["matched_patterns"] or "[]"), matched_patterns=json.loads(r["matched_patterns"] or "[]"),
text=r["text"], text=r["text"],
rank=r["rank"], rank=float(r["rank"]),
) )
for r in rows for r in rows
] ]
@ -376,16 +488,14 @@ def recent_source_errors(
Bypasses FTS ranking so text content doesn't affect which errors surface. Bypasses FTS ranking so text content doesn't affect which errors surface.
Used by diagnose when FTS keyword search returns nothing for a known source. Used by diagnose when FTS keyword search returns nothing for a known source.
""" """
conn = sqlite3.connect(str(db_path), timeout=30.0) tid = resolve_tenant_id()
conn.execute("PRAGMA journal_mode=WAL")
conn.row_factory = sqlite3.Row
conditions = [ conditions = [
"source_id LIKE ?", "source_id LIKE ?",
"severity = ?", "severity = ?",
"repeat_count = 1", "repeat_count = 1",
"(tenant_id = ? OR tenant_id = '')",
] ]
params: list = [f"%{source_filter}%", severity.upper()] params: list = [f"%{source_filter}%", severity.upper(), tid]
if since: if since:
conditions.append("timestamp_iso >= ?") conditions.append("timestamp_iso >= ?")
@ -397,18 +507,18 @@ def recent_source_errors(
params.append(limit) params.append(limit)
where = " AND ".join(conditions) where = " AND ".join(conditions)
rows = conn.execute( with get_conn(db_path) as conn:
f""" rows = conn.execute(
SELECT id as entry_id, source_id, sequence, timestamp_iso, severity, f"""
repeat_count, out_of_order, matched_patterns, text, 0.0 as rank SELECT id as entry_id, source_id, sequence, timestamp_iso, severity,
FROM log_entries repeat_count, out_of_order, matched_patterns, text, 0.0 as rank
WHERE {where} FROM log_entries
ORDER BY timestamp_iso DESC WHERE {where}
LIMIT ? ORDER BY timestamp_iso DESC
""", LIMIT ?
params, """,
).fetchall() params,
conn.close() ).fetchall()
return [ return [
SearchResult( SearchResult(
@ -421,7 +531,7 @@ def recent_source_errors(
out_of_order=bool(r["out_of_order"]), out_of_order=bool(r["out_of_order"]),
matched_patterns=json.loads(r["matched_patterns"] or "[]"), matched_patterns=json.loads(r["matched_patterns"] or "[]"),
text=r["text"], text=r["text"],
rank=r["rank"], rank=float(r["rank"]),
) )
for r in rows for r in rows
] ]
@ -436,37 +546,34 @@ def list_sources(db_path: Path) -> list[dict]:
returned as-is. ``unit_count`` reports how many distinct sub-units were returned as-is. ``unit_count`` reports how many distinct sub-units were
merged into each row. merged into each row.
""" """
conn = sqlite3.connect(str(db_path), timeout=30.0) tid = resolve_tenant_id()
conn.execute("PRAGMA journal_mode=WAL") group_expr = frag.source_group_expr("source_id")
rows = conn.execute(""" with get_conn(db_path) as conn:
SELECT rows = conn.execute(
CASE f"""
WHEN INSTR(SUBSTR(source_id, INSTR(source_id, ':')+1), ':') > 0 SELECT
THEN SUBSTR(source_id, 1, {group_expr} AS group_id,
INSTR(source_id, ':') COUNT(DISTINCT source_id) AS unit_count,
+ INSTR(SUBSTR(source_id, INSTR(source_id, ':')+1), ':') COUNT(*) AS entry_count,
- 1) MIN(timestamp_iso) AS earliest,
ELSE source_id MAX(timestamp_iso) AS latest,
END AS group_id, SUM(CASE WHEN severity IN ('ERROR','CRITICAL','EMERGENCY','ALERT')
COUNT(DISTINCT source_id) AS unit_count, THEN 1 ELSE 0 END) AS error_count
COUNT(*) AS entry_count, FROM log_entries
MIN(timestamp_iso) AS earliest, WHERE (tenant_id = ? OR tenant_id = '')
MAX(timestamp_iso) AS latest, GROUP BY group_id
SUM(CASE WHEN severity IN ('ERROR','CRITICAL','EMERGENCY','ALERT') ORDER BY entry_count DESC
THEN 1 ELSE 0 END) AS error_count """,
FROM log_entries (tid,),
GROUP BY group_id ).fetchall()
ORDER BY entry_count DESC
""").fetchall()
conn.close()
return [ return [
{ {
"source_id": r[0], "source_id": r["group_id"],
"unit_count": r[1], "unit_count": r["unit_count"],
"entry_count": r[2], "entry_count": r["entry_count"],
"earliest": r[3], "earliest": r["earliest"],
"latest": r[4], "latest": r["latest"],
"error_count": r[5], "error_count": r["error_count"],
} }
for r in rows for r in rows
] ]
@ -498,47 +605,65 @@ def stats_summary(db_path: Path, window_hours: int = 24, severity_overrides: lis
Queries plain log_entries (not FTS) so it works even before the index is built. Queries plain log_entries (not FTS) so it works even before the index is built.
""" """
rules = _compile_overrides(severity_overrides or []) rules = _compile_overrides(severity_overrides or [])
tid = resolve_tenant_id()
group_expr = frag.source_group_expr("source_id")
since_iso = (
datetime.now(timezone.utc) - timedelta(hours=window_hours)
).strftime("%Y-%m-%dT%H:%M:%S")
conn = sqlite3.connect(str(db_path), timeout=30.0) with get_conn(db_path) as conn:
conn.execute("PRAGMA journal_mode=WAL") row = conn.execute(
conn.row_factory = sqlite3.Row """
SELECT
COUNT(*) AS total,
SUM(CASE WHEN severity = 'CRITICAL' THEN 1 ELSE 0 END) AS criticals,
SUM(CASE WHEN severity IN ('ERROR','CRITICAL','EMERGENCY','ALERT') THEN 1 ELSE 0 END) AS errors
FROM log_entries
WHERE timestamp_iso >= ?
AND repeat_count = 1
AND (tenant_id = ? OR tenant_id = '')
""",
(since_iso, tid),
).fetchone()
total_24h = int(row["total"] or 0)
criticals_24h = int(row["criticals"] or 0)
errors_24h = int(row["errors"] or 0)
since_expr = f"strftime('%Y-%m-%dT%H:%M:%S', 'now', '-{window_hours} hours')" source_rows = conn.execute(
f"""
SELECT
{group_expr} AS group_id,
COUNT(*) AS entry_count,
SUM(CASE WHEN severity IN ('ERROR','CRITICAL','EMERGENCY','ALERT') THEN 1 ELSE 0 END) AS error_count,
MAX(timestamp_iso) AS latest
FROM log_entries
WHERE timestamp_iso >= ?
AND repeat_count = 1
AND (tenant_id = ? OR tenant_id = '')
GROUP BY group_id
ORDER BY error_count DESC, entry_count DESC
""",
(since_iso, tid),
).fetchall()
# Overall counts in window crit_rows = conn.execute(
row = conn.execute(f""" """
SELECT SELECT id as entry_id, source_id, timestamp_iso, severity, text
COUNT(*) AS total, FROM log_entries
SUM(CASE WHEN severity = 'CRITICAL' THEN 1 ELSE 0 END) AS criticals, WHERE severity = 'CRITICAL'
SUM(CASE WHEN severity IN ('ERROR','CRITICAL','EMERGENCY','ALERT') THEN 1 ELSE 0 END) AS errors AND repeat_count = 1
FROM log_entries AND (tenant_id = ? OR tenant_id = '')
WHERE timestamp_iso >= {since_expr} ORDER BY timestamp_iso DESC
AND repeat_count = 1 LIMIT 25
""").fetchone() """,
total_24h = int(row["total"] or 0) (tid,),
criticals_24h = int(row["criticals"] or 0) ).fetchall()
errors_24h = int(row["errors"] or 0)
last_row = conn.execute(
"SELECT MAX(ingest_time) AS t FROM log_entries WHERE (tenant_id = ? OR tenant_id = '')",
(tid,),
).fetchone()
# Per-source breakdown — grouped by prefix:host stem (same logic as list_sources).
source_rows = conn.execute(f"""
SELECT
CASE
WHEN INSTR(SUBSTR(source_id, INSTR(source_id, ':')+1), ':') > 0
THEN SUBSTR(source_id, 1,
INSTR(source_id, ':')
+ INSTR(SUBSTR(source_id, INSTR(source_id, ':')+1), ':')
- 1)
ELSE source_id
END AS group_id,
COUNT(*) AS entry_count,
SUM(CASE WHEN severity IN ('ERROR','CRITICAL','EMERGENCY','ALERT') THEN 1 ELSE 0 END) AS error_count,
MAX(timestamp_iso) AS latest
FROM log_entries
WHERE timestamp_iso >= {since_expr}
AND repeat_count = 1
GROUP BY group_id
ORDER BY error_count DESC, entry_count DESC
""").fetchall()
source_health = [ source_health = [
{ {
"source_id": r["group_id"], "source_id": r["group_id"],
@ -549,16 +674,6 @@ def stats_summary(db_path: Path, window_hours: int = 24, severity_overrides: lis
for r in source_rows for r in source_rows
] ]
# Fetch candidate criticals (fetch more so filtering doesn't leave us with too few)
crit_rows = conn.execute("""
SELECT id as entry_id, source_id, timestamp_iso, severity, text
FROM log_entries
WHERE severity = 'CRITICAL' AND repeat_count = 1
ORDER BY timestamp_iso DESC
LIMIT 25
""").fetchall()
# Apply overrides: skip entries whose effective severity is no longer CRITICAL
suppressed = 0 suppressed = 0
recent_criticals = [] recent_criticals = []
for r in crit_rows: for r in crit_rows:
@ -576,11 +691,8 @@ def stats_summary(db_path: Path, window_hours: int = 24, severity_overrides: lis
else: else:
suppressed += 1 suppressed += 1
last_row = conn.execute("SELECT MAX(ingest_time) AS t FROM log_entries").fetchone()
last_gleaned: str | None = last_row["t"] if last_row else None last_gleaned: str | None = last_row["t"] if last_row else None
conn.close()
return { return {
"window_hours": window_hours, "window_hours": window_hours,
"total_24h": total_24h, "total_24h": total_24h,

View file

@ -11,7 +11,7 @@ from __future__ import annotations
import asyncio import asyncio
import json import json
import logging import logging
import sqlite3 from app.db import get_conn, resolve_tenant_id
from dataclasses import dataclass, field from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone from datetime import datetime, timedelta, timezone
from pathlib import Path from pathlib import Path
@ -49,9 +49,8 @@ def get_state() -> IngestState:
def _query_matched_since(db_path: Path, since: str | None) -> list[dict]: def _query_matched_since(db_path: Path, since: str | None) -> list[dict]:
"""Return entries with non-empty matched_patterns, optionally filtered by ingest_time.""" """Return entries with non-empty matched_patterns, optionally filtered by ingest_time."""
conn = sqlite3.connect(str(db_path), timeout=30.0) tid = resolve_tenant_id()
conn.row_factory = sqlite3.Row with get_conn(db_path) as conn:
try:
if since: if since:
rows = conn.execute( rows = conn.execute(
""" """
@ -59,11 +58,13 @@ def _query_matched_since(db_path: Path, since: str | None) -> list[dict]:
ingest_time, severity, repeat_count, out_of_order, ingest_time, severity, repeat_count, out_of_order,
matched_patterns, text matched_patterns, text
FROM log_entries FROM log_entries
WHERE matched_patterns != '[]' AND ingest_time > ? WHERE matched_patterns != '[]'
AND ingest_time > ?
AND (tenant_id = ? OR tenant_id = '')
ORDER BY ingest_time ORDER BY ingest_time
LIMIT 5000 LIMIT 5000
""", """,
(since,), (since, tid),
).fetchall() ).fetchall()
else: else:
rows = conn.execute( rows = conn.execute(
@ -73,13 +74,13 @@ def _query_matched_since(db_path: Path, since: str | None) -> list[dict]:
matched_patterns, text matched_patterns, text
FROM log_entries FROM log_entries
WHERE matched_patterns != '[]' WHERE matched_patterns != '[]'
AND (tenant_id = ? OR tenant_id = '')
ORDER BY ingest_time DESC ORDER BY ingest_time DESC
LIMIT 5000 LIMIT 5000
""", """,
(tid,),
).fetchall() ).fetchall()
return [dict(r) for r in rows] return [dict(r) for r in rows]
finally:
conn.close()
async def submit_matched( async def submit_matched(

View file

@ -8,7 +8,6 @@ from __future__ import annotations
import json import json
import logging import logging
import sqlite3
import subprocess import subprocess
import threading import threading
from dataclasses import dataclass, field from dataclasses import dataclass, field
@ -21,9 +20,10 @@ import yaml
from app.glean import journald as journald_parser, syslog as syslog_parser from app.glean import journald as journald_parser, syslog as syslog_parser
from app.glean import plaintext as plaintext_parser, servarr as servarr_parser, plex as plex_parser from app.glean import plaintext as plaintext_parser, servarr as servarr_parser, plex as plex_parser
from app.glean import qbittorrent as qbit_parser, caddy as caddy_parser from app.glean import qbittorrent as qbit_parser, caddy as caddy_parser
from app.glean.pipeline import _detect_format from app.db import get_conn
from app.db.schema import ensure_schema
from app.glean.pipeline import _detect_format, _write_batch
from app.glean.base import _compile, load_patterns, now_iso from app.glean.base import _compile, load_patterns, now_iso
from app.glean.pipeline import _write_batch, _SCHEMA
from app.services.search import build_fts_index from app.services.search import build_fts_index
from app.services.models import RetrievedEntry from app.services.models import RetrievedEntry
@ -111,28 +111,24 @@ class WatchSource:
patterns = load_patterns(self.pattern_file) patterns = load_patterns(self.pattern_file)
compiled = _compile(patterns) compiled = _compile(patterns)
conn = sqlite3.connect(str(self.db_path), timeout=30.0) ensure_schema(self.db_path)
conn.execute("PRAGMA journal_mode=WAL")
conn.executescript(_SCHEMA)
conn.commit()
try: with get_conn(self.db_path) as conn:
cmd = self._build_command() try:
if not cmd: cmd = self._build_command()
return if not cmd:
self._proc = subprocess.Popen( return
cmd, self._proc = subprocess.Popen(
stdout=subprocess.PIPE, cmd,
stderr=subprocess.PIPE, stdout=subprocess.PIPE,
text=True, stderr=subprocess.PIPE,
bufsize=1, text=True,
) bufsize=1,
self._drain(conn, compiled) )
except Exception as exc: self._drain(conn, compiled)
self._error = str(exc) except Exception as exc:
logger.error("Watch source %r crashed: %s", self.config.source_id, exc) self._error = str(exc)
finally: logger.error("Watch source %r crashed: %s", self.config.source_id, exc)
conn.close()
def _build_command(self) -> list[str] | None: def _build_command(self) -> list[str] | None:
t = self.config.source_type t = self.config.source_type
@ -193,7 +189,7 @@ class WatchSource:
return [] return []
def _drain(self, conn: sqlite3.Connection, compiled) -> None: def _drain(self, conn, compiled) -> None:
"""Read lines from the subprocess and flush to DB periodically.""" """Read lines from the subprocess and flush to DB periodically."""
assert self._proc is not None assert self._proc is not None
buffer: list[str] = [] buffer: list[str] = []
@ -229,7 +225,7 @@ class WatchSource:
if buffer: if buffer:
self._flush(conn, buffer, compiled, flush_count) self._flush(conn, buffer, compiled, flush_count)
def _flush(self, conn: sqlite3.Connection, lines: list[str], compiled, flush_count: int) -> int: def _flush(self, conn, lines: list[str], compiled, flush_count: int) -> int:
ingest_time = now_iso() ingest_time = now_iso()
try: try:
entries = self._parse_lines(lines, ingest_time, compiled) entries = self._parse_lines(lines, ingest_time, compiled)

50
docker-compose.yml Normal file
View file

@ -0,0 +1,50 @@
version: "3.9"
# Turnstone backed by Postgres running in its own container (the `db` service).
# Postgres data lives in the named volume `turnstone_pgdata`, so it survives image rebuilds.
# To adopt an EXISTING Postgres install, point DATABASE_URL at it and
# remove the `db` service and the `depends_on` block.
#
# Quick start:
# docker compose up -d
# # Then open http://localhost:8520
services:
db:
image: postgres:16-alpine
restart: unless-stopped
environment:
POSTGRES_DB: turnstone
POSTGRES_USER: turnstone
POSTGRES_PASSWORD: ${POSTGRES_PASSWORD:-turnstone_dev}
volumes:
- turnstone_pgdata:/var/lib/postgresql/data
healthcheck:
test: ["CMD-SHELL", "pg_isready -U turnstone -d turnstone"]
interval: 5s
timeout: 5s
retries: 5
turnstone:
build: .
restart: unless-stopped
ports:
- "${TURNSTONE_PORT:-8520}:8520"
depends_on:
db:
condition: service_healthy
environment:
# Backend selection — comment out DATABASE_URL to fall back to SQLite
DATABASE_URL: postgresql://turnstone:${POSTGRES_PASSWORD:-turnstone_dev}@db:5432/turnstone
TURNSTONE_TENANT_ID: ${TURNSTONE_TENANT_ID:-}
TURNSTONE_API_KEY: ${TURNSTONE_API_KEY:-}
TURNSTONE_GLEAN_INTERVAL: ${TURNSTONE_GLEAN_INTERVAL:-900}
TURNSTONE_SOURCE_HOST: ${TURNSTONE_SOURCE_HOST:-}
TURNSTONE_SUBMIT_ENDPOINT: ${TURNSTONE_SUBMIT_ENDPOINT:-}
volumes:
- ./patterns:/app/patterns:ro
- ./data:/app/data # optional: persists SQLite files if DATABASE_URL unset
volumes:
turnstone_pgdata:
name: turnstone_pgdata

View file

@ -1,5 +1,7 @@
fastapi>=0.110.0 fastapi>=0.110.0
uvicorn[standard]>=0.27.0 uvicorn[standard]>=0.27.0
# Postgres backend — optional; SQLite is used when DATABASE_URL is unset
psycopg[binary,pool]>=3.1.0
pydantic>=2.0.0 pydantic>=2.0.0
pyyaml>=6.0 pyyaml>=6.0
aiofiles>=23.0.0 aiofiles>=23.0.0

View file

@ -0,0 +1,204 @@
#!/usr/bin/env python3
"""One-shot migration: copy data from existing SQLite DBs into Postgres.
Usage:
DATABASE_URL=postgresql://... python scripts/migrate_sqlite_to_postgres.py \
--main-db data/turnstone.db \
--context-db data/turnstone-context.db \
--incidents-db data/turnstone-incidents.db \
[--tenant-id heimdall]
The script is idempotent: rows already present in Postgres (same id) are skipped.
It must be run ONCE per node after deploying the shared Postgres backend.
Prerequisites:
pip install 'psycopg[binary,pool]'
Set DATABASE_URL to the target Postgres connection string.
"""
from __future__ import annotations
import argparse
import os
import sqlite3
import sys
from pathlib import Path
# Allow running from the project root without installing the package
sys.path.insert(0, str(Path(__file__).parent.parent))
def _pg_connect():
import psycopg # type: ignore[import]
url = os.environ.get("DATABASE_URL")
if not url:
print("ERROR: DATABASE_URL not set", file=sys.stderr)
sys.exit(1)
return psycopg.connect(url, autocommit=False)
def _ensure_schema_pg() -> None:
    """Run the main, context and incidents schema-ensure routines, then report.

    The imports are local so the ``app`` package is only loaded when this step
    actually runs.
    """
    from app.db.schema import ensure_context_schema, ensure_incidents_schema, ensure_schema

    ensure_steps = (ensure_schema, ensure_context_schema, ensure_incidents_schema)
    for ensure in ensure_steps:
        ensure(Path("/dev/null"))  # db_path ignored for Postgres
    print("Postgres schema verified")
def _migrate_table(
src_conn: sqlite3.Connection,
dst_conn,
table: str,
tenant_id: str,
columns: list[str],
conflict_cols: list[str],
) -> int:
"""Copy rows from SQLite table to Postgres. Returns rows inserted."""
# Check if source table exists
try:
rows = src_conn.execute(f"SELECT * FROM {table} LIMIT 0").fetchall() # noqa: S608
except sqlite3.OperationalError:
print(f" {table}: not found in SQLite — skipping")
return 0
# Fetch all rows
src_conn.row_factory = sqlite3.Row
rows = src_conn.execute(f"SELECT * FROM {table}").fetchall() # noqa: S608
if not rows:
print(f" {table}: empty — skipping")
return 0
# Build INSERT ... ON CONFLICT DO NOTHING
col_list = ", ".join(columns)
placeholders = ", ".join("%s" for _ in columns)
conflict = ", ".join(conflict_cols)
sql = (
f"INSERT INTO {table} ({col_list}) VALUES ({placeholders}) " # noqa: S608
f"ON CONFLICT ({conflict}) DO NOTHING"
)
inserted = 0
with dst_conn.cursor() as cur:
for row in rows:
# Build values: inject tenant_id if not present in source row
vals = []
for col in columns:
if col == "tenant_id":
try:
val = row["tenant_id"] or tenant_id
except (IndexError, KeyError):
val = tenant_id
else:
try:
vals.append(row[col])
except (IndexError, KeyError):
vals.append(None)
continue
vals.append(val)
cur.execute(sql, vals)
inserted += cur.rowcount
dst_conn.commit()
print(f" {table}: {inserted}/{len(rows)} rows inserted ({len(rows) - inserted} skipped)")
return inserted
def _migrate_db(
    label: str,
    db_path: Path,
    dst_conn,
    tenant_id: str,
    tables: list[tuple[str, list[str], list[str]]],
) -> int:
    """Migrate every listed table of one SQLite file; return rows inserted.

    Args:
        label: Human-readable database name used in progress messages
            (e.g. ``"main"``).
        db_path: Location of the SQLite file. A missing file is reported
            and skipped rather than treated as an error.
        dst_conn: Open destination connection handed to ``_migrate_table``.
        tenant_id: Fallback tenant for rows that carry none.
        tables: ``(table, columns, conflict_cols)`` triples, migrated in order.
    """
    if not db_path.exists():
        print(f"{label.capitalize()} DB not found at {db_path} — skipping")
        return 0

    print(f"\nMigrating {label} DB: {db_path}")
    src = sqlite3.connect(str(db_path))
    try:
        src.row_factory = sqlite3.Row
        return sum(
            _migrate_table(src, dst_conn, table, tenant_id,
                           columns=columns, conflict_cols=conflict_cols)
            for table, columns, conflict_cols in tables
        )
    finally:
        # Release the SQLite handle even when a table copy fails midway.
        src.close()


def main() -> None:
    """Command-line entry point: migrate the three SQLite DBs into Postgres.

    Parses ``--main-db``, ``--context-db``, ``--incidents-db`` and
    ``--tenant-id``, ensures the Postgres schema exists, then copies each
    database's tables in turn and prints the total number of rows inserted.

    Raises:
        SystemExit: with exit code 1 when ``DATABASE_URL`` is unset or empty.
    """
    parser = argparse.ArgumentParser(description="Migrate Turnstone SQLite → Postgres")
    parser.add_argument("--main-db", default="data/turnstone.db")
    parser.add_argument("--context-db", default="data/turnstone-context.db")
    parser.add_argument("--incidents-db", default="data/turnstone-incidents.db")
    parser.add_argument("--tenant-id", default=None, help="Override tenant ID (default: socket.gethostname())")
    args = parser.parse_args()

    # Fail fast on missing configuration. Without this check the schema
    # bootstrap below would run with no Postgres target configured
    # (presumably selecting the SQLite backend against the /dev/null
    # placeholder path) before the connection step rejected the empty URL.
    if not os.environ.get("DATABASE_URL"):
        print("ERROR: DATABASE_URL not set", file=sys.stderr)
        sys.exit(1)

    if args.tenant_id:
        os.environ["TURNSTONE_TENANT_ID"] = args.tenant_id

    import socket

    tenant_id = os.environ.get("TURNSTONE_TENANT_ID") or socket.gethostname()
    print(f"Migrating as tenant_id={tenant_id!r}")

    # Ensure Postgres schema exists first
    _ensure_schema_pg()
    pg = _pg_connect()

    # (label, sqlite path, [(table, columns, conflict_cols), ...])
    plan = [
        ("main", args.main_db, [
            ("log_entries",
             ["tenant_id", "id", "source_id", "sequence", "timestamp_raw",
              "timestamp_iso", "ingest_time", "severity", "repeat_count",
              "out_of_order", "matched_patterns", "text"],
             ["tenant_id", "id"]),
            ("glean_fingerprints",
             ["tenant_id", "path", "mtime", "size", "gleaned_at"],
             ["tenant_id", "path"]),
            ("blocklist_candidates",
             ["id", "tenant_id", "domain_or_ip", "source_device_ip", "source_device_name",
              "first_seen", "last_seen", "hit_count", "status", "pushed_at",
              "log_evidence", "matched_rule", "llm_score", "llm_reason"],
             ["id"]),
        ]),
        ("context", args.context_db, [
            ("context_facts",
             ["id", "tenant_id", "category", "key", "value", "source", "created_at"],
             ["id"]),
            ("context_documents",
             ["id", "tenant_id", "filename", "doc_type", "full_text", "file_size", "uploaded_at"],
             ["id"]),
            ("context_chunks",
             ["id", "tenant_id", "document_id", "chunk_index", "text"],
             ["id"]),
        ]),
        ("incidents", args.incidents_db, [
            ("incidents",
             ["id", "tenant_id", "label", "issue_type", "started_at", "ended_at",
              "notes", "created_at", "severity"],
             ["id"]),
            ("received_bundles",
             ["id", "tenant_id", "source_host", "issue_type", "label", "severity",
              "started_at", "bundled_at", "entry_count", "bundle_json"],
             ["id"]),
            ("sent_bundles",
             ["id", "tenant_id", "incident_id", "exported_at", "sanitized",
              "entry_count", "bundle_json"],
             ["id"]),
        ]),
    ]

    total = 0
    try:
        for label, db_path, tables in plan:
            total += _migrate_db(label, Path(db_path), pg, tenant_id, tables)
    finally:
        # Always release the Postgres connection, including on failure.
        pg.close()

    print(f"\nDone. Total rows inserted: {total}")
# Script entry point: run the migration only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    main()

View file

@ -4,6 +4,7 @@ import sqlite3
from pathlib import Path from pathlib import Path
from unittest.mock import patch from unittest.mock import patch
import pytest import pytest
from app.db.schema import ensure_schema, ensure_context_schema
from app.services.llm import summarize from app.services.llm import summarize
from app.services.search import SearchResult from app.services.search import SearchResult
@ -64,36 +65,14 @@ def test_summarize_without_context_block_unchanged():
@pytest.fixture @pytest.fixture
def db_with_facts(tmp_path): def db_with_facts(tmp_path):
db_path = tmp_path / "t.db" db_path = tmp_path / "t.db"
ensure_schema(db_path)
ensure_context_schema(db_path)
conn = sqlite3.connect(str(db_path)) conn = sqlite3.connect(str(db_path))
conn.executescript(""" conn.execute(
CREATE TABLE log_entries ( "INSERT INTO context_facts(id, tenant_id, category, key, value, source, created_at) "
id TEXT PRIMARY KEY, source_id TEXT NOT NULL, sequence INTEGER NOT NULL, "VALUES (?,?,?,?,?,?,?)",
timestamp_raw TEXT, timestamp_iso TEXT, ingest_time TEXT NOT NULL, ("f1", "", "service", "plex", "port:32400", "wizard", "2026-05-13T00:00:00+00:00"),
severity TEXT, repeat_count INTEGER DEFAULT 1, out_of_order INTEGER DEFAULT 0, )
matched_patterns TEXT DEFAULT '[]', text TEXT NOT NULL
);
CREATE VIRTUAL TABLE IF NOT EXISTS log_fts USING fts5(
text, entry_id UNINDEXED, source_id UNINDEXED, sequence UNINDEXED,
severity UNINDEXED, timestamp_iso UNINDEXED, matched_patterns UNINDEXED,
repeat_count UNINDEXED, out_of_order UNINDEXED, tokenize='porter ascii'
);
CREATE TABLE context_facts (
id TEXT PRIMARY KEY, category TEXT NOT NULL, key TEXT NOT NULL,
value TEXT NOT NULL, source TEXT, created_at TEXT NOT NULL
);
CREATE TABLE context_documents (
id TEXT PRIMARY KEY, filename TEXT NOT NULL, doc_type TEXT NOT NULL,
full_text TEXT NOT NULL, file_size INTEGER, uploaded_at TEXT NOT NULL
);
CREATE TABLE context_chunks (
id TEXT PRIMARY KEY, document_id TEXT NOT NULL
REFERENCES context_documents(id) ON DELETE CASCADE,
chunk_index INTEGER NOT NULL, text TEXT NOT NULL, embedding BLOB
);
INSERT INTO context_facts VALUES (
'f1','service','plex','port:32400','wizard','2026-05-13T00:00:00+00:00'
);
""")
conn.commit() conn.commit()
conn.close() conn.close()
return db_path return db_path

View file

@ -1,8 +1,8 @@
"""End-to-end upload pipeline: file bytes → DB rows.""" """End-to-end upload pipeline: file bytes → DB rows."""
import sqlite3
import pytest import pytest
from pathlib import Path from pathlib import Path
from app.db.schema import ensure_context_schema
from app.glean.doc_upload import glean_upload from app.glean.doc_upload import glean_upload
from app.context.store import list_facts, list_documents from app.context.store import list_facts, list_documents
from app.context.chunker import UnsupportedDocType from app.context.chunker import UnsupportedDocType
@ -11,24 +11,7 @@ from app.context.chunker import UnsupportedDocType
@pytest.fixture @pytest.fixture
def db(tmp_path): def db(tmp_path):
db_path = tmp_path / "t.db" db_path = tmp_path / "t.db"
conn = sqlite3.connect(str(db_path)) ensure_context_schema(db_path)
conn.executescript("""
CREATE TABLE context_facts (
id TEXT PRIMARY KEY, category TEXT NOT NULL, key TEXT NOT NULL,
value TEXT NOT NULL, source TEXT, created_at TEXT NOT NULL
);
CREATE TABLE context_documents (
id TEXT PRIMARY KEY, filename TEXT NOT NULL, doc_type TEXT NOT NULL,
full_text TEXT NOT NULL, file_size INTEGER, uploaded_at TEXT NOT NULL
);
CREATE TABLE context_chunks (
id TEXT PRIMARY KEY, document_id TEXT NOT NULL
REFERENCES context_documents(id) ON DELETE CASCADE,
chunk_index INTEGER NOT NULL, text TEXT NOT NULL, embedding BLOB
);
""")
conn.commit()
conn.close()
return db_path return db_path

View file

@ -1,13 +1,13 @@
"""Verify the three new context tables are created by ensure_schema.""" """Verify the three context tables are created by ensure_context_schema."""
import sqlite3 import sqlite3
from pathlib import Path from pathlib import Path
import pytest import pytest
from app.glean.pipeline import ensure_schema from app.db.schema import ensure_context_schema
def test_context_tables_created(tmp_path): def test_context_tables_created(tmp_path):
db = tmp_path / "t.db" db = tmp_path / "t.db"
ensure_schema(db) ensure_context_schema(db)
conn = sqlite3.connect(str(db)) conn = sqlite3.connect(str(db))
tables = {r[0] for r in conn.execute( tables = {r[0] for r in conn.execute(
"SELECT name FROM sqlite_master WHERE type='table'" "SELECT name FROM sqlite_master WHERE type='table'"
@ -20,5 +20,5 @@ def test_context_tables_created(tmp_path):
def test_context_schema_idempotent(tmp_path): def test_context_schema_idempotent(tmp_path):
db = tmp_path / "t.db" db = tmp_path / "t.db"
ensure_schema(db) ensure_context_schema(db)
ensure_schema(db) # second call must not raise ensure_context_schema(db) # second call must not raise

View file

@ -2,6 +2,7 @@
import sqlite3 import sqlite3
import pytest import pytest
from pathlib import Path from pathlib import Path
from app.db.schema import ensure_context_schema
from app.context.store import ( from app.context.store import (
add_fact, list_facts, delete_fact, add_fact, list_facts, delete_fact,
add_document, list_documents, delete_document, add_document, list_documents, delete_document,
@ -12,24 +13,7 @@ from app.context.store import (
@pytest.fixture @pytest.fixture
def db(tmp_path): def db(tmp_path):
db_path = tmp_path / "t.db" db_path = tmp_path / "t.db"
conn = sqlite3.connect(str(db_path)) ensure_context_schema(db_path)
conn.executescript("""
CREATE TABLE context_facts (
id TEXT PRIMARY KEY, category TEXT NOT NULL, key TEXT NOT NULL,
value TEXT NOT NULL, source TEXT, created_at TEXT NOT NULL
);
CREATE TABLE context_documents (
id TEXT PRIMARY KEY, filename TEXT NOT NULL, doc_type TEXT NOT NULL,
full_text TEXT NOT NULL, file_size INTEGER, uploaded_at TEXT NOT NULL
);
CREATE TABLE context_chunks (
id TEXT PRIMARY KEY, document_id TEXT NOT NULL
REFERENCES context_documents(id) ON DELETE CASCADE,
chunk_index INTEGER NOT NULL, text TEXT NOT NULL, embedding BLOB
);
""")
conn.commit()
conn.close()
return db_path return db_path

View file

@ -2,21 +2,14 @@
import sqlite3 import sqlite3
import pytest import pytest
from pathlib import Path from pathlib import Path
from app.db.schema import ensure_context_schema
from app.context.wizard import get_schema, advance_step, is_complete, apply_session, TOTAL_STEPS from app.context.wizard import get_schema, advance_step, is_complete, apply_session, TOTAL_STEPS
@pytest.fixture @pytest.fixture
def db(tmp_path): def db(tmp_path):
db_path = tmp_path / "t.db" db_path = tmp_path / "t.db"
conn = sqlite3.connect(str(db_path)) ensure_context_schema(db_path)
conn.executescript("""
CREATE TABLE context_facts (
id TEXT PRIMARY KEY, category TEXT NOT NULL, key TEXT NOT NULL,
value TEXT NOT NULL, source TEXT, created_at TEXT NOT NULL
);
""")
conn.commit()
conn.close()
return db_path return db_path

View file

@ -51,12 +51,14 @@ class TestFingerprintHelpers:
def test_fp_unchanged_returns_false_when_no_record(self, db_path: Path, log_file: Path) -> None: def test_fp_unchanged_returns_false_when_no_record(self, db_path: Path, log_file: Path) -> None:
conn = sqlite3.connect(str(db_path)) conn = sqlite3.connect(str(db_path))
conn.row_factory = sqlite3.Row
mtime, size = _fingerprint(log_file) mtime, size = _fingerprint(log_file)
assert _fp_unchanged(conn, log_file, mtime, size) is False assert _fp_unchanged(conn, log_file, mtime, size) is False
conn.close() conn.close()
def test_fp_unchanged_returns_true_after_save(self, db_path: Path, log_file: Path) -> None: def test_fp_unchanged_returns_true_after_save(self, db_path: Path, log_file: Path) -> None:
conn = sqlite3.connect(str(db_path)) conn = sqlite3.connect(str(db_path))
conn.row_factory = sqlite3.Row
mtime, size = _fingerprint(log_file) mtime, size = _fingerprint(log_file)
_save_fingerprint(conn, log_file, mtime, size, now_iso()) _save_fingerprint(conn, log_file, mtime, size, now_iso())
conn.commit() conn.commit()
@ -65,6 +67,7 @@ class TestFingerprintHelpers:
def test_fp_unchanged_returns_false_on_size_change(self, db_path: Path, log_file: Path) -> None: def test_fp_unchanged_returns_false_on_size_change(self, db_path: Path, log_file: Path) -> None:
conn = sqlite3.connect(str(db_path)) conn = sqlite3.connect(str(db_path))
conn.row_factory = sqlite3.Row
mtime, size = _fingerprint(log_file) mtime, size = _fingerprint(log_file)
_save_fingerprint(conn, log_file, mtime, size, now_iso()) _save_fingerprint(conn, log_file, mtime, size, now_iso())
conn.commit() conn.commit()
@ -74,6 +77,7 @@ class TestFingerprintHelpers:
def test_fp_unchanged_returns_false_on_mtime_change(self, db_path: Path, log_file: Path) -> None: def test_fp_unchanged_returns_false_on_mtime_change(self, db_path: Path, log_file: Path) -> None:
conn = sqlite3.connect(str(db_path)) conn = sqlite3.connect(str(db_path))
conn.row_factory = sqlite3.Row
mtime, size = _fingerprint(log_file) mtime, size = _fingerprint(log_file)
_save_fingerprint(conn, log_file, mtime, size, now_iso()) _save_fingerprint(conn, log_file, mtime, size, now_iso())
conn.commit() conn.commit()

View file

@ -33,12 +33,11 @@ def db(tmp_path: Path) -> Path:
("database connection refused backend gone away", "ERROR"), ("database connection refused backend gone away", "ERROR"),
("mDNS avahi heartbeat ok", "INFO"), ("mDNS avahi heartbeat ok", "INFO"),
]): ]):
# Columns: id, source_id, sequence, timestamp_raw, timestamp_iso,
# ingest_time, severity, repeat_count, out_of_order,
# matched_patterns, text
conn.execute( conn.execute(
"INSERT INTO log_entries VALUES (?,?,?,?,?,?,?,?,?,?,?)", "INSERT INTO log_entries(id, tenant_id, source_id, sequence, timestamp_raw, "
(str(uuid.uuid4()), "src", i, None, None, "2026-01-01T00:00:00", sev, 1, 0, "[]", text), "timestamp_iso, ingest_time, severity, repeat_count, out_of_order, "
"matched_patterns, text) VALUES (?,?,?,?,?,?,?,?,?,?,?,?)",
(str(uuid.uuid4()), "", "src", i, None, None, "2026-01-01T00:00:00", sev, 1, 0, "[]", text),
) )
conn.commit() conn.commit()
conn.close() conn.close()