Compare commits

8 commits

Author SHA1 Message Date
9d33b1ab54 docs: add status badge (beta) 2026-05-15 20:13:53 -07:00
bcd321367e feat: GET /api/library/sample-chunks for Avocet embed bench (closes #6)
Returns up to N randomly sampled page chunks (default 50, max 200) with
chunk_id, doc_id, page_number, and text fields. No tier gate — internal
tool endpoint for same-host corpus benchmarking. Returns [] on empty library.
2026-05-13 23:01:16 -07:00
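The sampling behaviour described in this commit can be sketched roughly as below. This is a minimal illustration, not the project's route handler: `sample_chunks` here is a plain function rather than a FastAPI endpoint, and the in-memory `page_chunks` table definition is a hypothetical stand-in for the real schema.

```python
import sqlite3


def sample_chunks(db: sqlite3.Connection, limit: int = 50) -> list[dict]:
    """Return up to `limit` random chunks, clamping limit to the range 1..200."""
    limit = max(1, min(limit, 200))
    rows = db.execute(
        "SELECT id, doc_id, page_number, text FROM page_chunks "
        "ORDER BY RANDOM() LIMIT ?",
        [limit],
    ).fetchall()
    return [
        {
            "chunk_id": r["id"],
            "doc_id": r["doc_id"],
            "page_number": r["page_number"],
            "text": r["text"],
        }
        for r in rows
    ]


# Hypothetical schema for illustration only; the real table may differ.
db = sqlite3.connect(":memory:")
db.row_factory = sqlite3.Row
db.execute(
    "CREATE TABLE page_chunks (id TEXT, doc_id TEXT, page_number INTEGER, text TEXT)"
)
db.executemany(
    "INSERT INTO page_chunks VALUES (?, ?, ?, ?)",
    [(f"c{i}", "doc-1", i, f"page {i} text") for i in range(300)],
)

print(len(sample_chunks(db, limit=1000)))  # request is clamped to the 200-row maximum
```

`ORDER BY RANDOM()` scans the whole table, which seems acceptable for an internal benchmarking endpoint but may be worth revisiting for very large libraries.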
1e066cf66c feat: encryption at rest infrastructure for cloud user data (closes #5)
Implements Option B (fscrypt) from the issue design: OS-level filesystem
encryption for per-user data directories on the cloud host.

- app/startup.py: warn_if_unencrypted() checks for fscrypt at startup in
  cloud mode and logs a SECURITY warning if the users/ directory is not
  encrypted — catches misconfigured deployments before any data is stored
- app/main.py: call warn_if_unencrypted() during lifespan in cloud mode
- scripts/setup_cloud_fscrypt.sh: operator script to encrypt a user's
  data directory with fscrypt (run as root on host before container start);
  supports --list and --status subcommands

Key management note: current implementation uses pam_passphrase protector.
For unattended server boot, integrate a raw_key protector from a secrets
manager (Vault, AWS Secrets Manager, etc.) — see script comments.

SQLCipher (Option A) deferred: sqlite-vec virtual table compatibility with
SQLCipher's encrypted VFS needs investigation before committing to that path.
2026-05-13 18:35:17 -07:00
8eef52a054 feat: per-user database isolation for cloud instances (closes #4)
Implements Option A from the issue design: each cloud user gets their own
data directory (DATA_DIR/users/{user_id}/) with separate pagepiper.db,
pagepiper_vecs.db, uploads/, and books/. Local mode is unchanged.

Key changes:
- app/startup.py: extract apply_migrations, reembed_docs,
  check_and_rebuild_vec_schema out of main.py (no circular imports)
- app/config.py: add LOCAL_USER_ID constant and user_data_dir() helper
- app/cloud_session.py: extract resolve_authenticated_user(); require_paid_tier
  now returns user_id (str) instead of None
- app/deps.py: add UserCtx dataclass (db_path, vec_db_path, data_dir,
  watch_dir, bm25) + get_user_ctx dependency; per-user startup guard runs
  migrations + vec schema check once per process per user
- app/main.py: _bm25 singleton -> _bm25_map dict keyed by user_id;
  add _get_bm25_for(); lifespan only runs startup checks in local mode
- app/api/library.py, search.py, chat.py: thread UserCtx through all
  endpoints; remove module-level _mark_bm25_dirty injection pattern
- tests/conftest.py: override get_user_ctx in addition to get_db so all
  endpoints get a consistent test UserCtx
2026-05-13 16:31:51 -07:00
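The directory layout this commit describes can be illustrated with a small sketch. The helper `user_paths` and its `cloud_mode` flag are hypothetical names introduced here for illustration; the project itself uses `user_data_dir()` and `UserCtx`. The watch directory is left out because, in local mode, it comes from separate configuration.

```python
import tempfile
from pathlib import Path

LOCAL_USER_ID = "__local__"


def user_paths(data_dir: Path, user_id: str, cloud_mode: bool) -> dict[str, Path]:
    """Resolve database and upload paths for one user (illustrative helper)."""
    # Local mode keeps the flat layout; cloud mode nests under users/{user_id}/.
    base = data_dir / "users" / user_id if cloud_mode else data_dir
    base.mkdir(parents=True, exist_ok=True)
    return {
        "db": base / "pagepiper.db",
        "vec_db": base / "pagepiper_vecs.db",
        "uploads": base / "uploads",
    }


root = Path(tempfile.mkdtemp())
cloud = user_paths(root, "user-123", cloud_mode=True)
local = user_paths(root, LOCAL_USER_ID, cloud_mode=False)
print(cloud["db"].relative_to(root))
print(local["db"].relative_to(root))
```

Keeping local mode on the flat layout means existing single-user installs need no data migration, which matches the commit's note that local mode is unchanged.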
df9e91ad89 chore: standardize cloud commands to hyphen syntax + add update command
Closes #1: rename cloud:* subcommands to cloud-* (hyphen is the
conventional CLI separator; colon syntax is non-standard).

Closes #2: add update (git pull + rebuild) for both local and cloud
stacks, covering the common deploy-from-git workflow.
2026-05-13 16:12:12 -07:00
3765fbc0f9 fix: quote-first prompt structure + escape phrase post-processing to kill hallucinations
Three-layer approach to stop the 7B model from supplementing retrieved context
with training-data knowledge:

1. system prompt redesigned: 'no memory of books/stories/authors' eliminates
   the model's self-permission to draw on parametric knowledge

2. quote-first prompt structure: model must commit to a specific quoted passage
   before generating an answer — explicit NOT FOUND required when excerpts lack
   the answer, preventing the 'excerpt doesn't say X... however in the series...'
   escape pattern

3. _strip_escape() post-processor: catches any residual leakage by scanning for
   known escape phrases ('in the series', 'by terry goodkind', 'it can be assumed',
   etc.) and replacing the response with the canned no-answer message
2026-05-06 10:30:11 -07:00
32cb21e2cd fix: reinforce no-hallucination constraint in user-turn prompt; cap per-doc retrieval
synthesizer: repeat the no-outside-knowledge rule inside the user message turn.
Small models (7B) follow user-turn instructions more reliably than system-prompt
instructions alone when parametric memory competes with the retrieved context

retriever: cap each document to max(2, top_k//3) slots in the ranked list so
one book cannot flood all result slots on character-name BM25 matches — forces
coverage across more documents when the answer may be in any of them
2026-05-06 10:26:51 -07:00
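As a worked illustration of the per-document cap, the sketch below applies the `max(2, top_k // 3)` rule to a toy ranking. The function name `cap_per_document` and the `(doc_id, score)` tuple shape are assumptions made for this example; the real retriever works on its own chunk objects.

```python
def cap_per_document(
    ranked: list[tuple[str, float]], top_k: int
) -> list[tuple[str, float]]:
    """Keep at most max(2, top_k // 3) results per doc_id, up to top_k in total.

    `ranked` is a list of (doc_id, score) pairs already sorted best-first.
    """
    max_per_doc = max(2, top_k // 3)
    kept: list[tuple[str, float]] = []
    counts: dict[str, int] = {}
    for doc_id, score in ranked:
        if len(kept) >= top_k:
            break
        if counts.get(doc_id, 0) < max_per_doc:
            kept.append((doc_id, score))
            counts[doc_id] = counts.get(doc_id, 0) + 1
    return kept


# One book ("A") dominates the raw ranking; "B" and "C" score lower.
raw = [("A", 0.9), ("A", 0.8), ("A", 0.7), ("A", 0.6), ("B", 0.5), ("C", 0.4), ("B", 0.3)]
print(cap_per_document(raw, top_k=6))
```

With `top_k=6` the cap is 2 per document, so the third and fourth "A" hits are skipped and lower-ranked hits from "B" and "C" are kept instead. Note that the result can hold fewer than `top_k` items when few documents match.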
347b391c6e fix: prevent LLM hallucination when retrieval returns low-signal results
- Strengthen synthesizer system prompt: hard 'respond with exactly' constraint
  instead of soft 'say so'; removes any wiggle room for the model to supplement
  from training data
- Add early return in synthesize() when chunks is empty (belt-and-suspenders
  alongside the existing guard in chat.py)
- Add MIN_SIGNAL threshold (0.01) in retriever: if the top combined score is
  below the threshold, return empty so the caller's no-results path fires instead
  of sending noise chunks to the LLM
2026-05-06 10:17:51 -07:00
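The low-signal guard can be sketched as follows. This is an illustration under assumptions: the equal 0.5 weights and the `1 / (1 + distance)` transform follow the retriever code shown later in this diff, but `combined_score` and `filter_low_signal` are hypothetical names, and the BM25 input is assumed to be already scaled to a comparable range.

```python
from __future__ import annotations

MIN_SIGNAL = 0.01


def combined_score(bm25: float, vector_distance: float | None) -> float:
    """Blend a BM25 score with a vector distance (smaller distance = better)."""
    # Assumption: bm25 has already been scaled to a range comparable with vec.
    vec = 1.0 / (1.0 + vector_distance) if vector_distance is not None else 0.0
    return bm25 * 0.5 + vec * 0.5


def filter_low_signal(scores: list[float]) -> list[float]:
    """Return [] when even the best score is below MIN_SIGNAL."""
    ranked = sorted(scores, reverse=True)
    if ranked and ranked[0] < MIN_SIGNAL:
        return []
    return ranked


print(combined_score(0.0, 1.0))            # vector-only hit at distance 1.0
print(filter_low_signal([0.004, 0.001]))   # best score is under the threshold
print(filter_low_signal([0.3, 0.005]))     # best score clears the threshold
```

Only the top score is tested, so a single strong hit keeps the whole ranked list, including weaker entries beneath it.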
17 changed files with 768 additions and 213 deletions

@ -2,6 +2,7 @@
**Search your document library. Get answers with exact page citations.**
[![Status](https://img.shields.io/badge/status-beta-blue)](https://git.opensourcesolarpunk.com/Circuit-Forge/pagepiper)
[![License: MIT / BSL 1.1](https://img.shields.io/badge/license-MIT%20%2F%20BSL%201.1-blue)](LICENSE)
[![Version](https://img.shields.io/badge/version-v0.1.0-orange)](https://git.opensourcesolarpunk.com/Circuit-Forge/pagepiper/releases)

@ -10,9 +10,11 @@ from __future__ import annotations
import logging
import os
from fastapi import APIRouter, HTTPException
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
from app.cloud_session import require_paid_tier
from app.deps import UserCtx, get_user_ctx
from app.services.retriever import Retriever
from app.services.synthesizer import Synthesizer
@ -56,21 +58,6 @@ def _get_llm_router():
return LLMRouter(cfg)
def _get_db_path() -> str:
"""Read lazily so test fixtures take effect."""
import pathlib
data_dir = pathlib.Path(os.environ.get("PAGEPIPER_DATA_DIR", "data"))
return str(data_dir / "pagepiper.db")
def _get_vec_db_path() -> str:
import pathlib
data_dir = pathlib.Path(os.environ.get("PAGEPIPER_DATA_DIR", "data"))
return str(data_dir / "pagepiper_vecs.db")
def _require_llm():
"""Return LLMRouter or raise 402."""
llm = _get_llm_router()
@ -89,18 +76,20 @@ def _require_llm():
@router.post("")
def chat(req: ChatRequest) -> ChatResponse:
def chat(
req: ChatRequest,
ctx: UserCtx = Depends(get_user_ctx),
_tier: str = Depends(require_paid_tier),
) -> ChatResponse:
llm = _require_llm()
from app.main import _bm25
retriever = Retriever(_bm25)
retriever = Retriever(ctx.bm25)
chunks = retriever.hybrid_search(
query=req.message,
top_k=req.top_k,
doc_ids=req.doc_ids,
db_path=_get_db_path(),
vec_db_path=_get_vec_db_path(),
db_path=ctx.db_path,
vec_db_path=ctx.vec_db_path,
llm=llm,
)
@ -141,7 +130,10 @@ def chat_feedback_status() -> dict:
@router.post("/feedback")
def submit_chat_feedback(req: ChatFeedbackRequest) -> dict:
def submit_chat_feedback(
req: ChatFeedbackRequest,
ctx: UserCtx = Depends(get_user_ctx),
) -> dict:
import json
import sqlite3
@ -149,8 +141,7 @@ def submit_chat_feedback(req: ChatFeedbackRequest) -> dict:
from fastapi import HTTPException
raise HTTPException(status_code=422, detail="rating must be 1 or -1")
db_path = _get_db_path()
con = sqlite3.connect(db_path)
con = sqlite3.connect(ctx.db_path)
try:
con.execute(
"INSERT INTO chat_feedback (rating, question, answer, doc_ids) VALUES (?, ?, ?, ?)",

@ -14,26 +14,25 @@ from typing import Callable
from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, UploadFile
from app.config import WATCH_DIR, DB_PATH, VEC_DB_PATH, DATA_DIR
from app.deps import get_db
from app.config import VEC_DIMENSIONS
from app.deps import UserCtx, get_db, get_user_ctx
_MAX_UPLOAD_BYTES = 200 * 1024 * 1024 # 200 MB
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/library", tags=["library"])
# Injected by main.py after _bm25 is created
_mark_bm25_dirty: Callable[[], None] | None = None
_INGEST_TASKS = {
".pdf": "pagepiper/ingest_pdf",
".epub": "pagepiper/ingest_epub",
".docx": "pagepiper/ingest_docx",
}
_INGEST_RUNNERS = {
".pdf": "scripts.ingest_pdf",
".epub": "scripts.ingest_epub",
".docx": "scripts.ingest_docx",
}
@ -41,24 +40,22 @@ def _dispatch_ingest(
doc_id: str,
file_path: str,
background_tasks: BackgroundTasks,
data_dir: Path,
mark_dirty_fn: Callable[[], None],
) -> str:
"""Dispatch an ingest task. Tries cf-orch; falls back to BackgroundTasks."""
import importlib
import os as _os
from pathlib import Path as _Path
suffix = _Path(file_path).suffix.lower()
suffix = Path(file_path).suffix.lower()
task_name = _INGEST_TASKS.get(suffix, "pagepiper/ingest_pdf")
runner_module = _INGEST_RUNNERS.get(suffix, "scripts.ingest_pdf")
# Read lazily so test fixtures (monkeypatch.setenv) take effect
_data_dir = _Path(_os.environ.get("PAGEPIPER_DATA_DIR", "data"))
task_id = str(uuid.uuid4())
args = {
"doc_id": doc_id,
"file_path": file_path,
"db_path": str(_data_dir / "pagepiper.db"),
"vec_db_path": str(_data_dir / "pagepiper_vecs.db"),
"db_path": str(data_dir / "pagepiper.db"),
"vec_db_path": str(data_dir / "pagepiper_vecs.db"),
}
try:
@ -67,7 +64,7 @@ def _dispatch_ingest(
logger.info("Dispatched cf-orch ingest task %s for doc %s", task_id, doc_id)
except Exception:
mod = importlib.import_module(runner_module)
background_tasks.add_task(_run_ingest_background, mod.run, args, task_id)
background_tasks.add_task(_run_ingest_background, mod.run, args, task_id, mark_dirty_fn)
logger.info(
"cf-orch unavailable — running ingest in background thread (task %s)", task_id
)
@ -75,19 +72,47 @@ def _dispatch_ingest(
return task_id
def _run_ingest_background(run_fn: Callable[..., None], args: dict, task_id: str) -> None:
def _run_ingest_background(
run_fn: Callable[..., None],
args: dict,
task_id: str,
mark_dirty_fn: Callable[[], None] | None = None,
) -> None:
from app.api.ingest import _task_registry
_task_registry[task_id] = {"status": "running", "progress": 0}
try:
run_fn(**args)
_task_registry[task_id] = {"status": "complete", "progress": 100}
if _mark_bm25_dirty:
_mark_bm25_dirty()
if mark_dirty_fn:
mark_dirty_fn()
except Exception as exc:
logger.exception("Ingest task %s failed", task_id)
_task_registry[task_id] = {"status": "error", "error": str(exc)}
@router.get("/sample-chunks")
def sample_chunks(
limit: int = 50,
db: sqlite3.Connection = Depends(get_db),
) -> list[dict]:
"""Return up to `limit` randomly sampled page chunks for corpus benchmarking.
No tier gate: internal tool, same-host access only. Returns [] if empty.
"""
if limit < 1:
limit = 1
elif limit > 200:
limit = 200
rows = db.execute(
"SELECT id, doc_id, page_number, text FROM page_chunks ORDER BY RANDOM() LIMIT ?",
[limit],
).fetchall()
return [
{"chunk_id": r["id"], "doc_id": r["doc_id"], "page_number": r["page_number"], "text": r["text"]}
for r in rows
]
@router.get("")
def list_library(db: sqlite3.Connection = Depends(get_db)) -> list[dict]:
rows = db.execute(
@ -101,13 +126,18 @@ def list_library(db: sqlite3.Connection = Depends(get_db)) -> list[dict]:
def scan_library(
background_tasks: BackgroundTasks,
db: sqlite3.Connection = Depends(get_db),
ctx: UserCtx = Depends(get_user_ctx),
) -> dict:
"""Scan the watched directory and queue ingest for any new PDFs."""
watch = WATCH_DIR
watch = ctx.watch_dir
if not watch.exists():
raise HTTPException(status_code=404, detail=f"Watch directory not found: {watch}")
pdfs = list(watch.glob("**/*.pdf")) + list(watch.glob("**/*.epub"))
pdfs = (
list(watch.glob("**/*.pdf"))
+ list(watch.glob("**/*.epub"))
+ list(watch.glob("**/*.docx"))
)
queued = []
for pdf_path in pdfs:
@ -117,7 +147,7 @@ def scan_library(
).fetchone()
if existing and existing["status"] == "ready":
continue # already indexed
continue
if existing:
doc_id = existing["id"]
@ -129,7 +159,9 @@ def scan_library(
).fetchone()[0]
db.commit()
task_id = _dispatch_ingest(doc_id, path_str, background_tasks)
task_id = _dispatch_ingest(
doc_id, path_str, background_tasks, ctx.data_dir, ctx.bm25.mark_dirty
)
db.execute(
"UPDATE documents SET status='processing', task_id=? WHERE id=?",
[task_id, doc_id],
@ -145,12 +177,15 @@ def reingest_document(
doc_id: str,
background_tasks: BackgroundTasks,
db: sqlite3.Connection = Depends(get_db),
ctx: UserCtx = Depends(get_user_ctx),
) -> dict:
row = db.execute("SELECT file_path FROM documents WHERE id=?", [doc_id]).fetchone()
if not row:
raise HTTPException(status_code=404, detail="Document not found")
task_id = _dispatch_ingest(doc_id, row["file_path"], background_tasks)
task_id = _dispatch_ingest(
doc_id, row["file_path"], background_tasks, ctx.data_dir, ctx.bm25.mark_dirty
)
db.execute(
"UPDATE documents SET status='processing', task_id=?, error_msg=NULL WHERE id=?",
[task_id, doc_id],
@ -163,6 +198,7 @@ def reingest_document(
def delete_document(
doc_id: str,
db: sqlite3.Connection = Depends(get_db),
ctx: UserCtx = Depends(get_user_ctx),
) -> None:
row = db.execute("SELECT id FROM documents WHERE id=?", [doc_id]).fetchone()
if not row:
@ -171,23 +207,21 @@ def delete_document(
db.execute("DELETE FROM documents WHERE id=?", [doc_id])
db.commit()
# Remove embeddings from vector store
try:
from circuitforge_core.vector.sqlite_vec import LocalSQLiteVecStore # type: ignore[import]
from app.config import VEC_DIMENSIONS
store = LocalSQLiteVecStore(db_path=VEC_DB_PATH, table="page_vecs", dimensions=VEC_DIMENSIONS)
store = LocalSQLiteVecStore(
db_path=ctx.vec_db_path, table="page_vecs", dimensions=VEC_DIMENSIONS
)
store.delete_where({"doc_id": doc_id})
except Exception as exc:
logger.warning("Could not remove vectors for doc %s: %s", doc_id, exc)
if _mark_bm25_dirty:
_mark_bm25_dirty()
ctx.bm25.mark_dirty()
def _get_vec_count(doc_id: str) -> int:
"""Return how many vectors have been stored for this doc. Returns 0 on any error."""
def _get_vec_count(doc_id: str, vec_db_path: str) -> int:
try:
conn = sqlite3.connect(VEC_DB_PATH)
conn = sqlite3.connect(vec_db_path)
count = conn.execute(
"SELECT COUNT(*) FROM page_vecs_meta WHERE json_extract(metadata, '$.doc_id') = ?",
[doc_id],
@ -202,6 +236,7 @@ def _get_vec_count(doc_id: str) -> int:
def document_status(
doc_id: str,
db: sqlite3.Connection = Depends(get_db),
ctx: UserCtx = Depends(get_user_ctx),
) -> dict:
row = db.execute(
"SELECT id, status, task_id, page_count, error_msg FROM documents WHERE id=?",
@ -210,7 +245,7 @@ def document_status(
if not row:
raise HTTPException(status_code=404, detail="Document not found")
result = dict(row)
result["vec_count"] = _get_vec_count(doc_id)
result["vec_count"] = _get_vec_count(doc_id, ctx.vec_db_path)
return result
@ -219,18 +254,19 @@ def upload_document(
file: UploadFile,
background_tasks: BackgroundTasks,
db: sqlite3.Connection = Depends(get_db),
ctx: UserCtx = Depends(get_user_ctx),
) -> dict:
"""Accept a PDF/EPUB/DOCX upload, save to the user's uploads/ directory, and queue for indexing."""
name = Path(file.filename or "").name
suffix = Path(name).suffix.lower()
if suffix not in _INGEST_TASKS:
raise HTTPException(status_code=400, detail="Supported formats: PDF, EPUB")
raise HTTPException(status_code=400, detail="Supported formats: PDF, EPUB, DOCX")
content = file.file.read()
if len(content) > _MAX_UPLOAD_BYTES:
raise HTTPException(status_code=413, detail="File exceeds 200 MB limit")
upload_dir = DATA_DIR / "uploads"
upload_dir = ctx.data_dir / "uploads"
upload_dir.mkdir(parents=True, exist_ok=True)
dest = upload_dir / name
dest.write_bytes(content)
@ -253,7 +289,9 @@ def upload_document(
).fetchone()[0]
db.commit()
task_id = _dispatch_ingest(doc_id, path_str, background_tasks)
task_id = _dispatch_ingest(
doc_id, path_str, background_tasks, ctx.data_dir, ctx.bm25.mark_dirty
)
db.execute(
"UPDATE documents SET status='processing', task_id=? WHERE id=?",
[task_id, doc_id],

@ -7,13 +7,11 @@ MIT — no tier gate. No Ollama required.
from __future__ import annotations
import logging
import os
from typing import Annotated
from fastapi import APIRouter, Depends
from pydantic import BaseModel, Field
from app.services.bm25_index import BM25Index
from app.deps import UserCtx, get_user_ctx
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/search", tags=["search"])
@ -29,32 +27,17 @@ class SearchResult(BaseModel):
chunk_id: str
doc_id: str
page_number: int
text_snippet: str # first 300 chars of the page text
text_snippet: str
bm25_score: float
def _get_bm25() -> BM25Index:
import app.main as _main
bm25 = getattr(_main, "_bm25", None)
if bm25 is None:
raise RuntimeError("BM25 index not initialised — app.main not loaded")
return bm25
def _get_db_path() -> str:
"""Read lazily so test fixtures (monkeypatch.setattr) take effect."""
import pathlib
data_dir = pathlib.Path(os.environ.get("PAGEPIPER_DATA_DIR", "data"))
return str(data_dir / "pagepiper.db")
@router.post("")
def search(
req: SearchRequest,
bm25: Annotated[BM25Index, Depends(_get_bm25)],
ctx: UserCtx = Depends(get_user_ctx),
) -> list[SearchResult]:
bm25.ensure_fresh(_get_db_path())
hits = bm25.query(req.query, top_k=req.top_k, doc_ids=req.doc_ids)
ctx.bm25.ensure_fresh(ctx.db_path)
hits = ctx.bm25.query(req.query, top_k=req.top_k, doc_ids=req.doc_ids)
return [
SearchResult(
chunk_id=h.chunk_id,

app/cloud_session.py (new file, 131 lines)

@ -0,0 +1,131 @@
# app/cloud_session.py
"""Cloud session auth for Pagepiper — validates cf_session cookie via Directus + Heimdall.
In local mode (CLOUD_MODE unset or false), require_paid_tier is a no-op.
In cloud mode, the Caddy proxy forwards the browser's Cookie header as
X-CF-Session. This module extracts cf_session, validates it against
Directus /users/me, then checks the user's Pagepiper tier via Heimdall.
Auto-provisions a free tier key for new users.
"""
from __future__ import annotations
import logging
import os
import re
import httpx
from fastapi import HTTPException, Request
log = logging.getLogger(__name__)
CLOUD_MODE: bool = os.environ.get("CLOUD_MODE", "").lower() in ("1", "true", "yes")
DIRECTUS_URL: str = os.environ.get("DIRECTUS_URL", "http://172.31.0.3:8055").rstrip("/")
HEIMDALL_URL: str = os.environ.get("HEIMDALL_URL", "https://license.circuitforge.tech").rstrip("/")
HEIMDALL_ADMIN_TOKEN: str = os.environ.get("HEIMDALL_ADMIN_TOKEN", "")
_TIER_ORDER = {"free": 0, "paid": 1, "premium": 2, "ultra": 3}
def _extract_session_token(cookie_header: str) -> str:
m = re.search(r"(?:^|;)\s*cf_session=([^;]+)", cookie_header)
return m.group(1).strip() if m else ""
def _get_user_id(jwt: str) -> str | None:
try:
resp = httpx.get(
f"{DIRECTUS_URL}/users/me",
headers={"Authorization": f"Bearer {jwt}"},
timeout=5.0,
)
if resp.status_code == 200:
return resp.json().get("data", {}).get("id")
except Exception as exc:
log.warning("Directus session check failed: %s", exc)
return None
def _ensure_provisioned(user_id: str) -> None:
if not HEIMDALL_ADMIN_TOKEN:
return
try:
httpx.post(
f"{HEIMDALL_URL}/admin/provision",
json={"directus_user_id": user_id, "product": "pagepiper", "tier": "free"},
headers={"Authorization": f"Bearer {HEIMDALL_ADMIN_TOKEN}"},
timeout=5.0,
)
except Exception as exc:
log.warning("Heimdall provision failed for user %s: %s", user_id, exc)
def _get_tier(user_id: str) -> str:
if not HEIMDALL_ADMIN_TOKEN:
return "free"
try:
resp = httpx.get(
f"{HEIMDALL_URL}/admin/cloud/resolve",
params={"directus_user_id": user_id, "product": "pagepiper"},
headers={"Authorization": f"Bearer {HEIMDALL_ADMIN_TOKEN}"},
timeout=5.0,
)
if resp.status_code == 200:
return resp.json().get("tier", "free")
except Exception as exc:
log.warning("Heimdall tier check failed for user %s: %s", user_id, exc)
return "free"
def resolve_authenticated_user(request: Request) -> str:
"""Validate the session cookie and return the Directus user_id. Raises 401 if invalid."""
cookie_header = request.headers.get("x-cf-session", "")
jwt = _extract_session_token(cookie_header)
if not jwt:
raise HTTPException(
status_code=401,
detail={
"error": "auth_required",
"message": "Sign in at circuitforge.tech to use Pagepiper cloud.",
},
)
user_id = _get_user_id(jwt)
if not user_id:
raise HTTPException(
status_code=401,
detail={
"error": "session_invalid",
"message": "Your session has expired. Sign in again at circuitforge.tech.",
},
)
_ensure_provisioned(user_id)
return user_id
def require_paid_tier(request: Request) -> str:
"""FastAPI dependency — 401 if no valid session, 402 if tier < paid. Returns user_id.
In local mode (CLOUD_MODE not set), returns LOCAL_USER_ID without any auth check.
"""
if not CLOUD_MODE:
from app.config import LOCAL_USER_ID
return LOCAL_USER_ID
user_id = resolve_authenticated_user(request)
tier = _get_tier(user_id)
if _TIER_ORDER.get(tier, 0) < _TIER_ORDER["paid"]:
raise HTTPException(
status_code=402,
detail={
"error": "upgrade_required",
"message": (
"RAG chat requires a Paid tier Pagepiper license. "
"Upgrade at circuitforge.tech/software/pagepiper."
),
},
)
return user_id
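To make the cookie parsing and tier comparison above concrete, here is a small standalone sketch. The regex and tier table are copied from the module; `extract_session_token` and `meets_paid_tier` are illustrative names, and no network calls to Directus or Heimdall are made.

```python
import re

_TIER_ORDER = {"free": 0, "paid": 1, "premium": 2, "ultra": 3}


def extract_session_token(cookie_header: str) -> str:
    """Pull the cf_session value out of a raw Cookie header, or return ""."""
    # The (?:^|;) anchor prevents matching a cookie whose name merely ends in cf_session.
    m = re.search(r"(?:^|;)\s*cf_session=([^;]+)", cookie_header)
    return m.group(1).strip() if m else ""


def meets_paid_tier(tier: str) -> bool:
    """True when the tier ranks at or above "paid"."""
    # Unknown tier names fall back to rank 0, so they are treated like "free".
    return _TIER_ORDER.get(tier, 0) >= _TIER_ORDER["paid"]


print(extract_session_token("theme=dark; cf_session=abc.def; lang=en"))
print(extract_session_token("xcf_session=tok"))
print(meets_paid_tier("premium"), meets_paid_tier("free"))
```

Defaulting unknown tiers to rank 0 fails closed: a typo or new tier name from the licensing service results in a 402 rather than accidental access.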

@ -12,16 +12,36 @@ VEC_DB_PATH = str(DATA_DIR / "pagepiper_vecs.db")
WATCH_DIR = Path(os.environ.get("PAGEPIPER_WATCH_DIR", "books"))
VEC_DIMENSIONS = int(os.environ.get("PAGEPIPER_EMBED_DIMS", "1024"))
LOCAL_USER_ID = "__local__"
def user_data_dir(user_id: str) -> Path:
"""Return (and create) the per-user data directory under DATA_DIR/users/."""
d = DATA_DIR / "users" / user_id
d.mkdir(parents=True, exist_ok=True)
return d
def get_llm_config() -> dict | None:
"""Build LLMRouter config from env vars. Returns None if PAGEPIPER_OLLAMA_URL is unset."""
"""Build LLMRouter config from env vars.
Returns None only when neither PAGEPIPER_OLLAMA_URL nor CF_ORCH_URL is set.
CF_ORCH_URL alone is sufficient: the coordinator resolves the service URL at
allocation time, so PAGEPIPER_OLLAMA_URL becomes optional.
"""
url = os.environ.get("PAGEPIPER_OLLAMA_URL", "").strip()
if not url:
orch_url = os.environ.get("CF_ORCH_URL", "").strip()
if not url and not orch_url:
return None
_clean = url.rstrip("/")
_base_url = _clean if _clean.endswith("/v1") else _clean + "/v1"
chat_model = os.environ.get("PAGEPIPER_CHAT_MODEL", "mistral:7b")
_base_url = ""
if url:
_clean = url.rstrip("/")
_base_url = _clean if _clean.endswith("/v1") else _clean + "/v1"
backend: dict = {
"type": "openai_compat",
"base_url": _base_url,
@ -30,12 +50,9 @@ def get_llm_config() -> dict | None:
"supports_images": False,
}
# Wire cf-orch allocation when coordinator is configured so the model stays warm
# and cold-start latency doesn't cause chat timeouts.
orch_url = os.environ.get("CF_ORCH_URL", "").strip()
if orch_url:
backend["cf_orch"] = {
"service": "ollama",
"service": os.environ.get("PAGEPIPER_ORCH_SERVICE", "ollama"),
"model_candidates": [chat_model],
"ttl_s": 3600,
}
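The base-URL handling in this hunk can be isolated into a tiny sketch. `normalize_base_url` is a hypothetical helper name used only for this example; the logic mirrors the `_clean` / `_base_url` lines above, including the empty-string case that now applies when only CF_ORCH_URL is set.

```python
def normalize_base_url(url: str) -> str:
    """Return an OpenAI-compatible base URL ending in /v1, or "" if url is empty."""
    url = url.strip()
    if not url:
        # No direct URL configured; the orchestrator is expected to supply one later.
        return ""
    clean = url.rstrip("/")
    return clean if clean.endswith("/v1") else clean + "/v1"


print(normalize_base_url("http://localhost:11434/"))
print(normalize_base_url("http://localhost:11434/v1/"))
print(repr(normalize_base_url("")))
```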

@ -3,13 +3,75 @@
from __future__ import annotations
import sqlite3
from dataclasses import dataclass
from pathlib import Path
from typing import Generator
from app.config import DB_PATH
from fastapi import Depends, Request
from app.config import DATA_DIR, LOCAL_USER_ID
from app.services.bm25_index import BM25Index
def get_db() -> Generator[sqlite3.Connection, None, None]:
conn = sqlite3.connect(DB_PATH, check_same_thread=False)
@dataclass
class UserCtx:
"""Per-request context routing DB paths and BM25 to the right user."""
user_id: str
db_path: str
vec_db_path: str
data_dir: Path
watch_dir: Path
bm25: BM25Index
_user_startup_done: set[str] = set()
def _run_user_startup(user_id: str, user_dir: Path) -> None:
"""Run migrations and vec schema check once per process lifetime per user."""
if user_id in _user_startup_done:
return
_user_startup_done.add(user_id)
from app.config import VEC_DIMENSIONS
from app.startup import apply_migrations, check_and_rebuild_vec_schema
apply_migrations(str(user_dir / "pagepiper.db"))
check_and_rebuild_vec_schema(
str(user_dir / "pagepiper_vecs.db"), VEC_DIMENSIONS, str(user_dir / "pagepiper.db")
)
def get_user_ctx(request: Request) -> UserCtx:
"""Resolve the per-user data directory, DB paths, and BM25 instance for this request."""
import app.main as _main
from app.cloud_session import CLOUD_MODE
if CLOUD_MODE:
from app.cloud_session import resolve_authenticated_user
from app.config import user_data_dir
user_id = resolve_authenticated_user(request)
user_dir = user_data_dir(user_id)
_run_user_startup(user_id, user_dir)
watch_dir = user_dir / "books"
watch_dir.mkdir(parents=True, exist_ok=True)
else:
from app.config import WATCH_DIR
user_id = LOCAL_USER_ID
user_dir = DATA_DIR
watch_dir = WATCH_DIR
return UserCtx(
user_id=user_id,
db_path=str(user_dir / "pagepiper.db"),
vec_db_path=str(user_dir / "pagepiper_vecs.db"),
data_dir=user_dir,
watch_dir=watch_dir,
bm25=_main._get_bm25_for(user_id),
)
def get_db(ctx: UserCtx = Depends(get_user_ctx)) -> Generator[sqlite3.Connection, None, None]:
conn = sqlite3.connect(ctx.db_path, check_same_thread=False)
conn.execute("PRAGMA foreign_keys = ON")
conn.execute("PRAGMA journal_mode = WAL")
conn.row_factory = sqlite3.Row

@ -4,9 +4,6 @@ from __future__ import annotations
import logging
import os
import re
import sqlite3
import threading
from contextlib import asynccontextmanager
from fastapi import FastAPI
@ -16,110 +13,40 @@ from app.services.bm25_index import BM25Index
logger = logging.getLogger("pagepiper")
# Module-level BM25 singleton — shared across all requests
_bm25 = BM25Index()
# Per-user BM25 registry — keyed by user_id; "__local__" for single-user mode
_bm25_map: dict[str, BM25Index] = {}
def _apply_migrations() -> None:
from scripts.db_migrate import migrate
migrate(DB_PATH)
def _reembed_docs(docs: list[tuple[str, str]], db_path: str, vec_db_path: str) -> None:
"""Re-run full ingest for a list of (doc_id, file_path) sequentially."""
for doc_id, file_path in docs:
suffix = os.path.splitext(file_path)[1].lower()
try:
if suffix == ".epub":
from scripts.ingest_epub import run
else:
from scripts.ingest_pdf import run
logger.info("Auto re-embed: starting %s", os.path.basename(file_path))
run(doc_id=doc_id, file_path=file_path, db_path=db_path, vec_db_path=vec_db_path)
except Exception as exc:
logger.error("Auto re-embed failed for doc %s: %s", doc_id[:8], exc)
def _check_vec_schema(vec_db_path: str, expected_dims: int, db_path: str) -> None:
"""Drop the vec DB if its stored dimension doesn't match config, then queue re-embed.
sqlite-vec bakes the embedding dimension into the virtual table DDL, so changing
models requires dropping and recreating the whole file. Catches the mismatch at
startup rather than surfacing it as an obscure OperationalError mid-request.
"""
if not os.path.exists(vec_db_path):
return
try:
conn = sqlite3.connect(vec_db_path)
row = conn.execute(
"SELECT sql FROM sqlite_master WHERE name='page_vecs_vecs'"
).fetchone()
conn.close()
except Exception as exc:
logger.warning("Vec schema check could not read %s (non-fatal): %s", vec_db_path, exc)
return
if not row:
return # table not yet created — first embed will build it with the right dims
m = re.search(r'float\[(\d+)\]', row[0])
if not m:
return
actual_dims = int(m.group(1))
if actual_dims == expected_dims:
return
logger.warning(
"Vec DB dimension mismatch: stored=%d, configured=%d — dropping %s and queuing re-embed",
actual_dims, expected_dims, vec_db_path,
)
try:
os.remove(vec_db_path)
except OSError as exc:
logger.error(
"Could not delete stale vec DB %s: %s — fix permissions and restart", vec_db_path, exc
)
return
# Collect all ready docs so we can rebuild their embeddings in the background.
try:
conn = sqlite3.connect(db_path)
docs = conn.execute(
"SELECT id, file_path FROM documents WHERE status='ready'"
).fetchall()
conn.close()
except Exception as exc:
logger.warning("Could not query documents for re-embed: %s", exc)
return
if not docs:
return
logger.info("Queuing re-embed for %d document(s) in background", len(docs))
threading.Thread(
target=_reembed_docs,
args=(docs, db_path, vec_db_path),
daemon=True,
name="pagepiper-reembed",
).start()
def _get_bm25_for(user_id: str) -> BM25Index:
if user_id not in _bm25_map:
_bm25_map[user_id] = BM25Index()
return _bm25_map[user_id]
@asynccontextmanager
async def lifespan(app: FastAPI):
_apply_migrations()
from app.cloud_session import CLOUD_MODE
from app.config import LOCAL_USER_ID
from app.startup import apply_migrations, check_and_rebuild_vec_schema
embed_model = os.environ.get("PAGEPIPER_EMBED_MODEL", "nomic-embed-text")
logger.info("Pagepiper starting — embed model: %s, dims: %d", embed_model, VEC_DIMENSIONS)
_check_vec_schema(VEC_DB_PATH, VEC_DIMENSIONS, DB_PATH)
_bm25.mark_dirty() # will rebuild on first search
if CLOUD_MODE:
from app.startup import warn_if_unencrypted
from app.config import DATA_DIR
warn_if_unencrypted(str(DATA_DIR))
else:
# In cloud mode, per-user migration and vec schema check run on first request (deps.py).
apply_migrations(DB_PATH)
check_and_rebuild_vec_schema(VEC_DB_PATH, VEC_DIMENSIONS, DB_PATH)
_get_bm25_for(LOCAL_USER_ID).mark_dirty()
yield
app = FastAPI(title="Pagepiper", lifespan=lifespan)
# Wire BM25 dirty callback into library router
from app.api import library as _lib_module # noqa: E402
_lib_module._mark_bm25_dirty = _bm25.mark_dirty
# Register routers
from app.api.library import router as library_router # noqa: E402
from app.api.ingest import router as ingest_router # noqa: E402

@ -170,7 +170,30 @@ class Retriever:
vec = (1.0 / (1.0 + r.vector_score)) if r.vector_score is not None else 0.0
return bm25 * 0.5 + vec * 0.5
ranked = sorted(merged.values(), key=_combined, reverse=True)[:top_k]
all_ranked = sorted(merged.values(), key=_combined, reverse=True)
# Discard results where the best match is pure noise (neither BM25 term
# overlap nor vector similarity exceeded the minimum signal threshold).
# This lets the caller's empty-result guard fire instead of sending
# low-confidence chunks to the LLM where it fills gaps with training data.
MIN_SIGNAL = 0.01
if all_ranked and _combined(all_ranked[0]) < MIN_SIGNAL:
return []
# Cap per-document contribution to max_per_doc of top_k so that one book
# does not crowd out all slots when the query matches it heavily by name
# alone (e.g. a character name that appears in every chapter).
max_per_doc = max(2, top_k // 3)
ranked: list[RetrievedChunk] = []
doc_counts: dict[str, int] = {}
for r in all_ranked:
if len(ranked) >= top_k:
break
count = doc_counts.get(r.doc_id, 0)
if count < max_per_doc:
ranked.append(r)
doc_counts[r.doc_id] = count + 1
adjacent = _fetch_adjacent(ranked, db_path)
return ranked + adjacent
@ -189,5 +212,8 @@ class Retriever:
)
for r in self._bm25.query(query, top_k=top_k, doc_ids=doc_ids)
]
MIN_SIGNAL = 0.01
if hits and hits[0].bm25_score < MIN_SIGNAL:
return []
adjacent = _fetch_adjacent(hits, db_path)
return hits + adjacent

@ -11,12 +11,52 @@ from dataclasses import dataclass
from app.services.retriever import RetrievedChunk
_SYSTEM_PROMPT = (
"You are a helpful document assistant. "
"Answer the user's question using ONLY the provided document excerpts. "
"For each claim, cite the source page as [p.N]. "
"If the excerpts are insufficient, say so. Do not invent information."
"You are a strict document retrieval assistant. "
"Your sole job is to extract and present information from the document excerpts given to you. "
"You have no memory of books, stories, or authors. "
"If the excerpts do not contain the answer, say so and stop. Never guess."
)
_NO_RESULTS_ANSWER = (
"I could not find any relevant passages in the indexed documents for that question. "
"Try rephrasing, or check that the relevant document has been ingested."
)
# Phrases the model uses when it escapes the provided context and pulls from
# training data. Any response containing one of these is replaced with the
# canned no-answer message.
_ESCAPE_PHRASES = [
"in the series",
"in the novel",
"in the book",
"in the context of the series",
"it can be assumed",
"based on my knowledge",
"based on the broader",
"the broader story",
"by terry goodkind",
"sword of truth",
"legend of the seeker",
"throughout the series",
"throughout the novel",
"throughout the book",
]
def _strip_escape(response: str) -> str:
"""Replace responses that leaked outside the provided context with the canned message.
Detects the 'helpful override' pattern where the model acknowledges the
excerpts lack the answer but supplements from training data anyway.
"""
lower = response.lower()
if any(phrase in lower for phrase in _ESCAPE_PHRASES):
return (
"I could not find an answer to that question in the indexed documents. "
"The answer may be in a document that has not been ingested yet."
)
return response
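
Review note: the filter is a plain case-insensitive substring match, so it is worth seeing what it catches. The sketch below uses a shortened, hypothetical phrase tuple (`ESCAPE_PHRASES`) and a hypothetical `CANNED` constant rather than the full list in the diff.

```python
# Shortened, hypothetical subset of the phrase list in the diff.
ESCAPE_PHRASES = ("in the series", "in the book", "based on my knowledge")
CANNED = "I could not find an answer to that question in the indexed documents."


def strip_escape(response: str) -> str:
    """Return the canned message if any escape phrase occurs, else the response."""
    lower = response.lower()
    if any(phrase in lower for phrase in ESCAPE_PHRASES):
        return CANNED
    return response


leaked = "The excerpt does not say. However, In The Series he is a wizard."
grounded = "Grappling requires a free hand [p.12]."
quoted = "The rule is restated in the book's appendix [p.40]."

print(strip_escape(leaked) == CANNED)    # matched despite mixed case
print(strip_escape(grounded))            # passes through unchanged
print(strip_escape(quoted) == CANNED)    # grounded answer, but still replaced
```

The third case shows the trade-off of substring matching: a grounded answer that happens to contain a listed phrase is also replaced with the canned message.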
@dataclass(frozen=True)
class Citation:
@ -42,13 +82,30 @@ class Synthesizer:
history: list[dict],
chunks: list[RetrievedChunk],
) -> SynthesisResult:
if not chunks:
return SynthesisResult(answer=_NO_RESULTS_ANSWER, citations=())
# 1500 chars (~300 words) per chunk: enough to capture definitions that
# appear mid-paragraph without blowing past a 32k-context model's limit.
context_parts = [f"[p.{c.page_number}]\n{c.text[:1500]}" for c in chunks]
context = "\n\n---\n\n".join(context_parts)
prompt = f"Document excerpts:\n\n{context}\n\nQuestion: {message}"
# Quote-first structure: the model must commit to a grounding passage
# before generating an answer. Forces an explicit "NOT FOUND" admission
# when the excerpt doesn't contain the answer, rather than the "the excerpt
# doesn't say... however, in the series..." escape pattern.
prompt = (
f"Excerpts from the indexed documents:\n\n{context}\n\n"
f"---\n\n"
f"Question: {message}\n\n"
f"Step 1 — Find the relevant passage: Quote the exact sentence(s) from "
f"the excerpts above that answer the question, or write NOT FOUND.\n\n"
f"Step 2 — Answer: Based solely on what you quoted in Step 1, answer "
f"the question with page citations [p.N]. If Step 1 is NOT FOUND, "
f"write: \"I could not find an answer to that question in the indexed documents.\""
)
answer = self._llm.complete(prompt, system=_SYSTEM_PROMPT)
answer = _strip_escape(answer)
citations = tuple(
Citation(
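
Review note: the quote-first prompt assembly can be sketched without the LLM client. The version below assumes a hypothetical `Chunk` dataclass and `build_prompt` helper, and abbreviates the step wording; only the 1500-character truncation and the `[p.N]` labels are taken from the diff.

```python
from dataclasses import dataclass

MAX_CHARS = 1500  # per-chunk truncation used in the diff


@dataclass(frozen=True)
class Chunk:
    """Hypothetical stand-in for RetrievedChunk."""
    page_number: int
    text: str


def build_prompt(message: str, chunks: list[Chunk]) -> str:
    """Assemble a quote-first prompt: excerpts, question, then the two steps."""
    context = "\n\n---\n\n".join(
        f"[p.{c.page_number}]\n{c.text[:MAX_CHARS]}" for c in chunks
    )
    return (
        f"Excerpts from the indexed documents:\n\n{context}\n\n"
        "---\n\n"
        f"Question: {message}\n\n"
        "Step 1: Quote the exact sentence(s) that answer the question, "
        "or write NOT FOUND.\n\n"
        "Step 2: Answer using only the Step 1 quote, citing pages as [p.N]."
    )


prompt = build_prompt(
    "What is grappling?",
    [Chunk(3, "z" * 2000), Chunk(7, "Grapple rules.")],
)
print(prompt[:60])
```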

app/startup.py Normal file

@ -0,0 +1,137 @@
# app/startup.py
"""DB migration and vec schema check utilities — called at startup and on first user request."""
from __future__ import annotations
import logging
import os
import re
import sqlite3
import subprocess
import threading
logger = logging.getLogger("pagepiper")
def warn_if_unencrypted(data_dir: str) -> None:
"""Log a warning if cloud mode is running without fscrypt encryption.
Checks whether the users/ subdirectory of data_dir is fscrypt-encrypted.
Non-fatal: warns but does not block startup.
"""
users_dir = os.path.join(data_dir, "users")
os.makedirs(users_dir, exist_ok=True)
if not _fscrypt_available():
logger.warning(
"SECURITY: fscrypt not found on this system. Cloud user data at %s is stored "
"unencrypted. Install fscrypt and run scripts/setup_cloud_fscrypt.sh to enable "
"encryption at rest.",
users_dir,
)
return
try:
result = subprocess.run(
["fscrypt", "status", users_dir],
capture_output=True, text=True, timeout=5,
)
if "Encrypted" not in result.stdout:
logger.warning(
"SECURITY: user data directory %s is not fscrypt-encrypted. "
"Run: sudo scripts/setup_cloud_fscrypt.sh <user_id>",
users_dir,
)
except Exception as exc:
logger.debug("fscrypt status check failed (non-fatal): %s", exc)
def _fscrypt_available() -> bool:
try:
subprocess.run(["fscrypt", "--version"], capture_output=True, timeout=2)
return True
except (FileNotFoundError, subprocess.TimeoutExpired):
return False
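
Review note: `_fscrypt_available()` detects the binary by running `fscrypt --version`. A lighter alternative, shown here as a sketch and not what the diff does, is to look the name up on `PATH` with `shutil.which`. The helper name `tool_available` is hypothetical.

```python
import shutil


def tool_available(name: str) -> bool:
    """Return True if `name` resolves to an executable on PATH."""
    return shutil.which(name) is not None


# A name that should not exist on any system, to show the negative case.
print(tool_available("no-such-binary-pagepiper-sketch"))
```

This avoids spawning a subprocess, but it only proves the binary exists, not that it runs; the subprocess approach in the diff also catches a broken install that times out.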
def apply_migrations(db_path: str) -> None:
from scripts.db_migrate import migrate
migrate(db_path)
def reembed_docs(docs: list[tuple[str, str]], db_path: str, vec_db_path: str) -> None:
for doc_id, file_path in docs:
suffix = os.path.splitext(file_path)[1].lower()
try:
if suffix == ".epub":
from scripts.ingest_epub import run
elif suffix == ".docx":
from scripts.ingest_docx import run
else:
from scripts.ingest_pdf import run
logger.info("Auto re-embed: starting %s", os.path.basename(file_path))
run(doc_id=doc_id, file_path=file_path, db_path=db_path, vec_db_path=vec_db_path)
except Exception as exc:
logger.error("Auto re-embed failed for doc %s: %s", doc_id[:8], exc)
def check_and_rebuild_vec_schema(vec_db_path: str, expected_dims: int, db_path: str) -> None:
"""Drop the vec DB if its stored dimension doesn't match config, then queue re-embed.
sqlite-vec bakes the embedding dimension into the virtual table DDL, so changing
models requires dropping and recreating the whole file. Catches the mismatch at
startup rather than surfacing it as an obscure OperationalError mid-request.
"""
if not os.path.exists(vec_db_path):
return
try:
conn = sqlite3.connect(vec_db_path)
row = conn.execute(
"SELECT sql FROM sqlite_master WHERE name='page_vecs_vecs'"
).fetchone()
conn.close()
except Exception as exc:
logger.warning("Vec schema check could not read %s (non-fatal): %s", vec_db_path, exc)
return
if not row:
return
m = re.search(r'float\[(\d+)\]', row[0])
if not m:
return
actual_dims = int(m.group(1))
if actual_dims == expected_dims:
return
logger.warning(
"Vec DB dimension mismatch: stored=%d, configured=%d — dropping %s and queuing re-embed",
actual_dims, expected_dims, vec_db_path,
)
try:
os.remove(vec_db_path)
except OSError as exc:
logger.error(
"Could not delete stale vec DB %s: %s — fix permissions and restart", vec_db_path, exc
)
return
try:
conn = sqlite3.connect(db_path)
docs = conn.execute(
"SELECT id, file_path FROM documents WHERE status='ready'"
).fetchall()
conn.close()
except Exception as exc:
logger.warning("Could not query documents for re-embed: %s", exc)
return
if not docs:
return
logger.info("Queuing re-embed for %d document(s) in background", len(docs))
threading.Thread(
target=reembed_docs,
args=(docs, db_path, vec_db_path),
daemon=True,
name="pagepiper-reembed",
).start()
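
Review note: the dimension check hinges on pulling the integer out of `float[N]` in the stored DDL. The sketch below isolates that step; `SAMPLE_DDL` is a hypothetical definition in sqlite-vec's `vec0` style (the real table DDL is not shown in this diff), and `stored_dims` / `needs_rebuild` are hypothetical helper names.

```python
import re
from typing import Optional

# Hypothetical DDL; the project's actual virtual table definition may differ.
SAMPLE_DDL = "CREATE VIRTUAL TABLE page_vecs USING vec0(embedding float[384])"


def stored_dims(ddl: str) -> Optional[int]:
    """Extract N from the first float[N] column in the DDL, or None if absent."""
    match = re.search(r"float\[(\d+)\]", ddl)
    return int(match.group(1)) if match else None


def needs_rebuild(ddl: str, expected_dims: int) -> bool:
    """True only when a dimension was found and it differs from the config."""
    actual = stored_dims(ddl)
    return actual is not None and actual != expected_dims


print(stored_dims(SAMPLE_DDL))
print(needs_rebuild(SAMPLE_DDL, 768))
```

As in the function above, an unparseable DDL is treated as "leave the file alone" rather than as a mismatch.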


@ -12,8 +12,8 @@ OVERRIDE_ARGS=()
[[ -f "compose.override.yml" ]] && OVERRIDE_ARGS=(-f compose.override.yml)
usage() {
echo "Usage: $0 {start|stop|restart|status|logs [svc]|open|build|test"
echo " |cloud:start|cloud:stop|cloud:restart|cloud:status|cloud:logs [svc]|cloud:build}"
echo "Usage: $0 {start|stop|restart|status|logs [svc]|open|build|test|update"
echo " |cloud-start|cloud-stop|cloud-restart|cloud-status|cloud-logs [svc]|cloud-build|cloud-update}"
exit 1
}
@ -48,27 +48,39 @@ case "$cmd" in
test)
conda run -n cf pytest tests/ -v
;;
cloud:start)
update)
git pull
docker compose -f "$COMPOSE_FILE" "${OVERRIDE_ARGS[@]}" down
docker compose -f "$COMPOSE_FILE" "${OVERRIDE_ARGS[@]}" up -d --build
echo "Pagepiper updated and running → http://localhost:${WEB_PORT}"
;;
cloud-start)
docker compose -f "$COMPOSE_CLOUD_FILE" -p "$CLOUD_PROJECT" up -d --build
echo "Pagepiper cloud running → http://localhost:${CLOUD_WEB_PORT}"
;;
cloud:stop)
cloud-stop)
docker compose -f "$COMPOSE_CLOUD_FILE" -p "$CLOUD_PROJECT" down
;;
cloud:restart)
cloud-restart)
docker compose -f "$COMPOSE_CLOUD_FILE" -p "$CLOUD_PROJECT" down
docker compose -f "$COMPOSE_CLOUD_FILE" -p "$CLOUD_PROJECT" up -d --build
echo "Pagepiper cloud running → http://localhost:${CLOUD_WEB_PORT}"
;;
cloud:status)
cloud-status)
docker compose -f "$COMPOSE_CLOUD_FILE" -p "$CLOUD_PROJECT" ps
;;
cloud:logs)
cloud-logs)
docker compose -f "$COMPOSE_CLOUD_FILE" -p "$CLOUD_PROJECT" logs -f "${1:-}"
;;
cloud:build)
cloud-build)
docker compose -f "$COMPOSE_CLOUD_FILE" -p "$CLOUD_PROJECT" build --no-cache
;;
cloud-update)
git pull
docker compose -f "$COMPOSE_CLOUD_FILE" -p "$CLOUD_PROJECT" down
docker compose -f "$COMPOSE_CLOUD_FILE" -p "$CLOUD_PROJECT" up -d --build
echo "Pagepiper cloud updated and running → http://localhost:${CLOUD_WEB_PORT}"
;;
*)
usage
;;

scripts/setup_cloud_fscrypt.sh Executable file

@ -0,0 +1,120 @@
#!/usr/bin/env bash
# setup_cloud_fscrypt.sh — encrypt a cloud user's data directory with fscrypt.
#
# Run as root on the HOST (not inside the container) before first deployment.
# Requires: fscrypt >= 0.3, Linux kernel >= 4.1, ext4/f2fs filesystem.
#
# Usage:
# sudo ./scripts/setup_cloud_fscrypt.sh <user_id>
# sudo ./scripts/setup_cloud_fscrypt.sh --list # show all encrypted dirs
# sudo ./scripts/setup_cloud_fscrypt.sh --status <user_id>
#
# Environment:
# PAGEPIPER_DATA_DIR — base data directory (default: /devl/pagepiper-cloud-data)
#
# Key management:
# Keys are stored in the system protector backed by a passphrase or root keyring.
# For unattended unlock on server boot, use a raw_key protector derived from a
# secret in HashiCorp Vault or similar; see docs/encryption.md for details.
set -euo pipefail
DATA_DIR="${PAGEPIPER_DATA_DIR:-/devl/pagepiper-cloud-data}"
USERS_DIR="$DATA_DIR/users"
_usage() {
grep '^# ' "$0" | cut -c3-
exit 1
}
_require_root() {
if [[ "$EUID" -ne 0 ]]; then
echo "ERROR: this script must be run as root" >&2
exit 1
fi
}
_require_fscrypt() {
if ! command -v fscrypt &>/dev/null; then
echo "ERROR: fscrypt not found. Install with: apt-get install fscrypt" >&2
exit 1
fi
}
_check_fscrypt_setup() {
local mnt
mnt=$(df -P "$DATA_DIR" | tail -1 | awk '{print $6}')
if ! fscrypt status "$mnt" &>/dev/null; then
echo "Initialising fscrypt on $mnt..."
fscrypt setup --quiet "$mnt"
echo "fscrypt setup complete on $mnt"
fi
}
cmd="${1:-}"
case "$cmd" in
--list)
_require_root
_require_fscrypt
echo "Encrypted user directories under $USERS_DIR:"
find "$USERS_DIR" -maxdepth 1 -mindepth 1 -type d 2>/dev/null | while read -r dir; do
if fscrypt status "$dir" 2>/dev/null | grep -q "Encrypted"; then
echo " [encrypted] $dir"
else
echo " [plain] $dir"
fi
done
;;
--status)
_require_root
_require_fscrypt
user_id="${2:-}"
[[ -z "$user_id" ]] && { echo "Usage: $0 --status <user_id>" >&2; exit 1; }
user_dir="$USERS_DIR/$user_id"
if [[ ! -d "$user_dir" ]]; then
echo "Directory $user_dir does not exist"
exit 1
fi
fscrypt status "$user_dir"
;;
"")
_usage
;;
-*)
_usage
;;
*)
# Encrypt a user's directory
user_id="$1"
_require_root
_require_fscrypt
user_dir="$USERS_DIR/$user_id"
if [[ ! -d "$user_dir" ]]; then
echo "Creating user directory: $user_dir"
mkdir -p "$user_dir"
fi
if fscrypt status "$user_dir" 2>/dev/null | grep -q "Encrypted"; then
echo "Directory $user_dir is already encrypted."
exit 0
fi
# fscrypt can only encrypt an empty directory; it does not encrypt existing
# files in place, so refuse here rather than let `fscrypt encrypt` fail.
if [[ -n "$(ls -A "$user_dir")" ]]; then
echo "ERROR: $user_dir is non-empty. fscrypt can only encrypt an empty directory." >&2
echo "Stop the container, move the existing files aside, re-run this script," >&2
echo "then copy the files back into the encrypted directory." >&2
exit 1
fi
_check_fscrypt_setup
echo "Encrypting $user_dir..."
fscrypt encrypt "$user_dir" --source=pam_passphrase --quiet
echo "Encryption set up for user $user_id. Directory: $user_dir"
echo ""
echo "IMPORTANT: unlock the directory before starting the container:"
echo " fscrypt unlock $user_dir"
;;
esac


@ -27,13 +27,32 @@ def client(test_db, tmp_path, monkeypatch):
(tmp_path / "books").mkdir(exist_ok=True)
import app.main as _main_module
from app.main import app, _bm25
from app.deps import get_db
from app.config import LOCAL_USER_ID
from app.deps import UserCtx, get_db, get_user_ctx
from app.main import app
from app.services.bm25_index import BM25Index
from app.startup import apply_migrations, check_and_rebuild_vec_schema
# Suppress startup side effects — test_db fixture already applies the schema,
# and vec schema validation is tested separately in test_startup.py
monkeypatch.setattr(_main_module, "_apply_migrations", lambda: None)
monkeypatch.setattr(_main_module, "_check_vec_schema", lambda *a, **kw: None)
monkeypatch.setattr(_main_module, "_apply_migrations", lambda: None, raising=False)
monkeypatch.setattr(
"app.startup.apply_migrations", lambda *a, **kw: None
)
monkeypatch.setattr(
"app.startup.check_and_rebuild_vec_schema", lambda *a, **kw: None
)
test_bm25 = BM25Index()
test_bm25.mark_dirty()
def override_user_ctx():
return UserCtx(
user_id=LOCAL_USER_ID,
db_path=test_db,
vec_db_path=str(tmp_path / "test_vecs.db"),
data_dir=Path(tmp_path),
watch_dir=Path(tmp_path) / "books",
bm25=test_bm25,
)
def override_db():
conn = sqlite3.connect(test_db)
@ -44,7 +63,7 @@ def client(test_db, tmp_path, monkeypatch):
finally:
conn.close()
app.dependency_overrides[get_user_ctx] = override_user_ctx
app.dependency_overrides[get_db] = override_db
_bm25.mark_dirty() # clear any state from previous tests
yield TestClient(app)
app.dependency_overrides.clear()


@ -66,3 +66,40 @@ def test_reingest_updates_status_to_processing(client, test_db, tmp_path):
# Document should be in processing state (or beyond if stub ingest ran instantly)
status_resp = client.get(f"/api/library/{doc_id}/status")
assert status_resp.json()["status"] in ("processing", "error", "ready")
def _add_chunks(db_path: str, doc_id: str, count: int) -> None:
conn = sqlite3.connect(db_path)
for i in range(count):
conn.execute(
"INSERT INTO page_chunks(doc_id, page_number, text, source, word_count) VALUES (?,?,?,?,?)",
[doc_id, i + 1, f"Page {i + 1} text content.", "text_layer", 4],
)
conn.commit()
conn.close()
def test_sample_chunks_empty_returns_empty(client):
resp = client.get("/api/library/sample-chunks")
assert resp.status_code == 200
assert resp.json() == []
def test_sample_chunks_returns_fields(client, test_db):
doc_id = _add_doc(test_db, "Monster Manual", "/books/mm.pdf")
_add_chunks(test_db, doc_id, 5)
resp = client.get("/api/library/sample-chunks?limit=3")
assert resp.status_code == 200
chunks = resp.json()
assert len(chunks) == 3
for c in chunks:
assert {"chunk_id", "doc_id", "page_number", "text"} == set(c.keys())
assert c["doc_id"] == doc_id
def test_sample_chunks_limit_capped_at_200(client, test_db):
doc_id = _add_doc(test_db, "Big Book", "/books/big.pdf")
_add_chunks(test_db, doc_id, 10)
resp = client.get("/api/library/sample-chunks?limit=9999")
assert resp.status_code == 200
assert len(resp.json()) <= 200
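
Review note: these tests pin down the endpoint's contract (four fields, default 50, cap 200, `[]` on an empty library). One way the handler's core could look is sketched below with an in-memory SQLite database. The `id` primary-key column, the `SCHEMA` string, the `sample_chunks` helper, and the lower clamp to 1 are all assumptions; the real handler is not part of this hunk.

```python
import sqlite3

DEFAULT_LIMIT = 50
MAX_LIMIT = 200

# Assumed minimal schema; the real page_chunks table has more columns.
SCHEMA = (
    "CREATE TABLE page_chunks("
    "id INTEGER PRIMARY KEY, doc_id TEXT, page_number INTEGER, text TEXT)"
)


def sample_chunks(conn: sqlite3.Connection, limit: int = DEFAULT_LIMIT) -> list[dict]:
    """Return up to `limit` randomly ordered chunks, clamped to 1..MAX_LIMIT."""
    limit = max(1, min(limit, MAX_LIMIT))
    rows = conn.execute(
        "SELECT id, doc_id, page_number, text FROM page_chunks "
        "ORDER BY RANDOM() LIMIT ?",
        (limit,),
    ).fetchall()
    return [
        {"chunk_id": r[0], "doc_id": r[1], "page_number": r[2], "text": r[3]}
        for r in rows
    ]


conn = sqlite3.connect(":memory:")
conn.execute(SCHEMA)
print(sample_chunks(conn))  # empty library
```

`ORDER BY RANDOM()` scans the whole table, which is acceptable for a benchmarking helper but would be worth revisiting for very large libraries.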


@ -20,9 +20,7 @@ def _add_chunks(db_path: str, doc_id: str, chunks: list[dict]) -> None:
conn.close()
def test_search_returns_results(client, test_db, monkeypatch):
import app.api.search as _search_mod
monkeypatch.setattr(_search_mod, "_get_db_path", lambda: test_db)
def test_search_returns_results(client, test_db):
# BM25Okapi IDF is 0 when df == N/2 (e.g. 2 docs, 1 match → log(1.0) = 0).
# Add a 3rd unrelated chunk so relevant terms score above zero.
_add_chunks(test_db, "book-a", [
@ -46,9 +44,7 @@ def test_search_empty_index_returns_empty(client):
assert resp.json() == []
def test_search_filters_by_doc_ids(client, test_db, monkeypatch):
import app.api.search as _search_mod
monkeypatch.setattr(_search_mod, "_get_db_path", lambda: test_db)
def test_search_filters_by_doc_ids(client, test_db):
# Three chunks so BM25Okapi IDF is non-zero for terms appearing in one doc.
_add_chunks(test_db, "book-a", [
{"page_number": 1, "text": "Grapple rules for melee attacks."},


@ -9,7 +9,8 @@ from unittest.mock import MagicMock, patch
import pytest
from app.main import _check_vec_schema, _reembed_docs
from app.startup import check_and_rebuild_vec_schema as _check_vec_schema
from app.startup import reembed_docs as _reembed_docs
def _make_vec_db(path: str, dims: int) -> None: