Compare commits

...

10 commits
v0.1.0 ... main

Author SHA1 Message Date
9d33b1ab54 docs: add status badge (beta) 2026-05-15 20:13:53 -07:00
bcd321367e feat: GET /api/library/sample-chunks for Avocet embed bench (closes #6)
Returns up to N randomly sampled page chunks (default 50, max 200) with
chunk_id, doc_id, page_number, and text fields. No tier gate — internal
tool endpoint for same-host corpus benchmarking. Returns [] on empty library.
2026-05-13 23:01:16 -07:00
1e066cf66c feat: encryption at rest infrastructure for cloud user data (closes #5)
Implements Option B (fscrypt) from the issue design: OS-level filesystem
encryption for per-user data directories on the cloud host.

- app/startup.py: warn_if_unencrypted() checks for fscrypt at startup in
  cloud mode and logs a SECURITY warning if the users/ directory is not
  encrypted — catches misconfigured deployments before any data is stored
- app/main.py: call warn_if_unencrypted() during lifespan in cloud mode
- scripts/setup_cloud_fscrypt.sh: operator script to encrypt a user's
  data directory with fscrypt (run as root on host before container start);
  supports --list and --status subcommands

Key management note: current implementation uses pam_passphrase protector.
For unattended server boot, integrate a raw_key protector from a secrets
manager (Vault, AWS Secrets Manager, etc.) — see script comments.

SQLCipher (Option A) deferred: sqlite-vec virtual table compatibility with
SQLCipher's encrypted VFS needs investigation before committing to that path.
2026-05-13 18:35:17 -07:00
8eef52a054 feat: per-user database isolation for cloud instances (closes #4)
Implements Option A from the issue design: each cloud user gets their own
data directory (DATA_DIR/users/{user_id}/) with separate pagepiper.db,
pagepiper_vecs.db, uploads/, and books/. Local mode is unchanged.

Key changes:
- app/startup.py: extract apply_migrations, reembed_docs,
  check_and_rebuild_vec_schema out of main.py (no circular imports)
- app/config.py: add LOCAL_USER_ID constant and user_data_dir() helper
- app/cloud_session.py: extract resolve_authenticated_user(); require_paid_tier
  now returns user_id (str) instead of None
- app/deps.py: add UserCtx dataclass (db_path, vec_db_path, data_dir,
  watch_dir, bm25) + get_user_ctx dependency; per-user startup guard runs
  migrations + vec schema check once per process per user
- app/main.py: _bm25 singleton -> _bm25_map dict keyed by user_id;
  add _get_bm25_for(); lifespan only runs startup checks in local mode
- app/api/library.py, search.py, chat.py: thread UserCtx through all
  endpoints; remove module-level _mark_bm25_dirty injection pattern
- tests/conftest.py: override get_user_ctx in addition to get_db so all
  endpoints get a consistent test UserCtx
2026-05-13 16:31:51 -07:00
df9e91ad89 chore: standardize cloud commands to hyphen syntax + add update command
Closes #1: rename cloud:* subcommands to cloud-* (hyphen is the
conventional CLI separator; colon syntax is non-standard).

Closes #2: add update (git pull + rebuild) for both local and cloud
stacks, covering the common deploy-from-git workflow.
2026-05-13 16:12:12 -07:00
3765fbc0f9 fix: quote-first prompt structure + escape phrase post-processing to kill hallucinations
three-layer approach to stop 7B model from supplementing retrieved context
with training-data knowledge:

1. system prompt redesigned: 'no memory of books/stories/authors' eliminates
   the model's self-permission to draw on parametric knowledge

2. quote-first prompt structure: model must commit to a specific quoted passage
   before generating an answer — explicit NOT FOUND required when excerpts lack
   the answer, preventing the 'excerpt doesn't say X... however in the series...'
   escape pattern

3. _strip_escape() post-processor: catches any residual leakage by scanning for
   known escape phrases ('in the series', 'by terry goodkind', 'it can be assumed',
   etc.) and replacing the response with the canned no-answer message
2026-05-06 10:30:11 -07:00
32cb21e2cd fix: reinforce no-hallucination constraint in user-turn prompt; cap per-doc retrieval
synthesizer: repeat the no-outside-knowledge rule inside the user message turn —
small models (7B) follow user-turn instructions more reliably than system-prompt
alone when parametric memory competes with the retrieved context

retriever: cap each document to max(2, top_k//3) slots in the ranked list so
one book cannot flood all result slots on character-name BM25 matches — forces
coverage across more documents when the answer may be in any of them
2026-05-06 10:26:51 -07:00
347b391c6e fix: prevent LLM hallucination when retrieval returns low-signal results
- Strengthen synthesizer system prompt: hard 'respond with exactly' constraint
  instead of soft 'say so'; removes any wiggle room for the model to supplement
  from training data
- Add early return in synthesize() when chunks is empty (belt-and-suspenders
  alongside the existing guard in chat.py)
- Add MIN_SIGNAL threshold (0.01) in retriever: if the top combined score is
  below the threshold, return empty so the caller's no-results path fires instead
  of sending noise chunks to the LLM
2026-05-06 10:17:51 -07:00
895d0b6129 docs(readme): landing page rewrite — screenshots, quick start, formats table, tiers, Forgejo-primary, split license 2026-05-06 08:51:38 -07:00
b105a0fc14 docs: add Playwright screenshots for library and chat views 2026-05-06 08:39:57 -07:00
19 changed files with 839 additions and 333 deletions

192
README.md
View file

@ -1,75 +1,67 @@
# Pagepiper
**v0.1.0** | Self-hosted PDF and EPUB search for your personal library
**Search your document library. Get answers with exact page citations.**
Pagepiper lets you drop PDFs and EPUBs into a library, index them, and search across the full text. With [Ollama](https://ollama.com) configured, you also get hybrid vector search and an LLM (large language model) chat interface that cites specific page numbers when it answers.
[![Status](https://img.shields.io/badge/status-beta-blue)](https://git.opensourcesolarpunk.com/Circuit-Forge/pagepiper)
[![License: MIT / BSL 1.1](https://img.shields.io/badge/license-MIT%20%2F%20BSL%201.1-blue)](LICENSE)
[![Version](https://img.shields.io/badge/version-v0.1.0-orange)](https://git.opensourcesolarpunk.com/Circuit-Forge/pagepiper/releases)
Built for TTRPG (tabletop roleplaying game) players tired of ctrl-F'ing through Pathfinder core rulebooks. Works equally well for fan fiction EPUB collections, AO3 exports, and any personal document library.
Self-hosted PDF and EPUB search with BM25 (Best Match 25) full-text indexing and LLM (large language model) synthesis. Drop your documents in, ask a question, get an answer that tells you exactly which page to turn to.
Try it: [pagepiper.circuitforge.tech](https://pagepiper.circuitforge.tech)
Built for TTRPG (tabletop roleplaying game) players who are tired of ctrl-F'ing through six-hundred-page rulebooks. Works equally well for legal research, technical manuals, academic papers, or any personal document library you want to query in plain language.
No cloud required. Your files stay on your machine.
---
## Features
## Screenshots
| Feature | Free tier | Paid (BYOK) |
|---------|-----------|-------------|
| PDF and EPUB upload via browser drag-and-drop | Yes | Yes |
| Directory scan for existing files | Yes | Yes |
| BM25 full-text search (no LLM required) | Yes | Yes |
| Unlimited local ingestion | Yes | Yes |
| Hybrid BM25 + k-NN vector search | No | Yes (local Ollama) |
| LLM chat with page-level citations | No | Yes (local Ollama) |
| Thumbs up / down feedback on answers | No | Yes |
### Library
BYOK (bring your own key) means you supply your own Ollama instance. No cloud API keys, no usage billing.
![Library view — documents listed with ingest status and page counts](docs/screenshots/01-library.png)
**BM25** (Best Match 25) is a keyword ranking algorithm. It works without any LLM and runs entirely inside the Docker container. **k-NN** (k-nearest neighbor) vector search uses embeddings to find passages that are semantically similar to your question, even when the exact words don't match.
### Chat with citations
![Chat view — answer with source document and page number for every claim](docs/screenshots/02-chat.png)
---
## Tech Stack
## Why Pagepiper?
- **Backend:** FastAPI + SQLite (BM25 via custom BM25Index, vectors via sqlite-vec)
- **Frontend:** Vue 3 SPA served by nginx
- **Embedding model:** `nomic-embed-text` via Ollama (1024-dim, optional)
- **Chat LLM:** `mistral:7b` via Ollama (optional, any Ollama model works)
- **Deployment:** Docker Compose
- **Your library, not ours.** Documents are indexed and stored locally. Nothing is sent to a third-party service unless you explicitly configure a cloud LLM.
- **Works without an LLM.** BM25 full-text search runs entirely inside the Docker container. No Ollama, no API key, no GPU required for keyword search.
- **Answers cite their sources.** Every LLM response includes the document name and page number it drew from. You can verify or dispute every answer.
- **Hybrid search when you want it.** Connect a local Ollama instance to unlock semantic (vector) search that finds relevant passages even when your question doesn't use the exact words in the text.
- **Open ingest pipeline.** The indexing and search layer is MIT-licensed. Add support for new formats, improve the PDF parser, contribute — the community benefits directly.
---
## Quick Start (Self-Hosting)
## Quick Start
### Prerequisites
- [Docker](https://docs.docker.com/get-docker/) and Docker Compose
- PDFs or EPUBs you want to search
- Optional: [Ollama](https://ollama.com) for semantic search and RAG (retrieval-augmented generation) chat
### 1. Clone the repo
**Prerequisites:** [Docker](https://docs.docker.com/get-docker/) and Docker Compose. Optionally [Ollama](https://ollama.com) for LLM-synthesized answers.
```bash
git clone https://git.opensourcesolarpunk.com/Circuit-Forge/pagepiper
cd pagepiper
```
### 2. Configure
```bash
cp .env.example .env
./manage.sh start
```
Open [http://localhost:8521](http://localhost:8521).
### Configure
Open `.env` and set your paths:
```dotenv
# Directory to scan for PDFs/EPUBs (used by the "Scan" button in the UI)
PAGEPIPER_BOOKS_DIR=/path/to/your/pdfs
# Where Pagepiper stores its SQLite index and uploaded files
PAGEPIPER_DATA_DIR=data
PAGEPIPER_DATA_DIR=./data
# Directory to scan for existing PDFs/EPUBs (used by the Scan button)
PAGEPIPER_BOOKS_DIR=/path/to/your/documents
```
To unlock hybrid search and LLM chat, uncomment and set the Ollama block:
To unlock LLM synthesis and semantic search, add your Ollama endpoint:
```dotenv
PAGEPIPER_OLLAMA_URL=http://localhost:11434
@ -77,121 +69,81 @@ PAGEPIPER_CHAT_MODEL=mistral:7b
PAGEPIPER_EMBED_MODEL=nomic-embed-text
```
### 3. Start
### Add documents
```bash
./manage.sh start
```
**Upload via browser** — click **Upload** in the Library view. Files save to `data/uploads/` and index automatically.
Open [http://localhost:8521](http://localhost:8521).
### 4. Add documents
Two ways to add files:
**Upload via browser** (easiest for small collections): Click **Upload** in the Library view and select a PDF or EPUB. The file saves to `data/uploads/` and begins indexing automatically.
**Scan a directory** (best for large collections): Set `PAGEPIPER_BOOKS_DIR` in your `.env` to a folder of PDFs/EPUBs, then click **Scan** in the Library view. Pagepiper finds all files recursively and queues them for indexing.
### 5. Search and chat
Switch to the **Chat** tab and ask questions. On the free tier, BM25 keyword search returns matching passages. With Ollama configured, you get semantic search and an LLM-generated answer with page-number citations.
**Scan a directory** — set `PAGEPIPER_BOOKS_DIR` in `.env`, then click **Scan**. Pagepiper finds all files recursively and queues them.
---
## Ollama Setup (optional)
## Supported Formats
Install Ollama from [ollama.com](https://ollama.com), then pull the models:
```bash
ollama pull mistral:7b
ollama pull nomic-embed-text
```
On a headless Linux server, make Ollama listen on all interfaces so the Docker container can reach it:
```bash
OLLAMA_HOST=0.0.0.0 ollama serve
```
On Docker Desktop (Linux or Mac), `host.docker.internal` resolves automatically. No extra network config needed.
| Format | Ingest | Page-level citations |
|--------|--------|----------------------|
| PDF | Yes | Yes |
| EPUB | Yes | Yes (chapter/location) |
---
## Environment Variables
## Stack
| Variable | Default | Description |
|----------|---------|-------------|
| `PAGEPIPER_BOOKS_DIR` | `./books` | Host directory to scan for PDFs and EPUBs |
| `PAGEPIPER_DATA_DIR` | `./data` | SQLite index and uploaded files live here |
| `PAGEPIPER_OLLAMA_URL` | *(unset)* | Ollama base URL; leave blank for BM25-only mode |
| `PAGEPIPER_EMBED_MODEL` | `nomic-embed-text` | Ollama embedding model (1024-dim default) |
| `PAGEPIPER_EMBED_DIMS` | `1024` | Must match the embedding model's output dimensions |
| `PAGEPIPER_CHAT_MODEL` | `mistral:7b` | Ollama chat model; any Ollama model name works |
| `PAGEPIPER_CHAT_FEEDBACK` | *(unset)* | Set to `true` to enable thumbs up/down on chat answers |
| Layer | Technology |
|-------|-----------|
| Backend API | FastAPI + SQLite |
| Full-text search | BM25 (custom index, no external service) |
| Vector search | sqlite-vec + Ollama embeddings (optional) |
| LLM synthesis | Ollama (local, any model) |
| Frontend | Vue 3 SPA served by nginx |
| Deployment | Docker Compose |
Default ports: Web UI `8521`, API `8540`.
---
## Management
```bash
./manage.sh start # Build and start (dev)
./manage.sh start # Build and start
./manage.sh stop # Stop
./manage.sh restart # Restart
./manage.sh status # Show container status
./manage.sh logs [svc] # Tail logs (default: all services; pass 'api' or 'web' to filter)
./manage.sh open # Open the UI in your browser
./manage.sh build # Rebuild images without cache
./manage.sh cloud:start # Start the cloud managed instance (port 8533)
./manage.sh cloud:stop
./manage.sh cloud:restart
./manage.sh cloud:status
./manage.sh cloud:logs [svc]
./manage.sh cloud:build
./manage.sh logs [svc] # Tail logs (pass 'api' or 'web' to filter)
./manage.sh open # Open UI in browser
./manage.sh build # Rebuild images
./manage.sh test # Run test suite
```
---
## Cloud Managed Instance
## Tiers
The cloud deployment runs at [pagepiper.circuitforge.tech](https://pagepiper.circuitforge.tech) and at `menagerie.circuitforge.tech/pagepiper`. It uses `compose.cloud.yml` with LLM inference routed through the cf-orch coordinator.
| Feature | Free | Paid (BYOK) |
|---------|------|-------------|
| PDF and EPUB upload | Yes | Yes |
| Directory scan | Yes | Yes |
| BM25 full-text search | Yes | Yes |
| Unlimited local ingestion | Yes | Yes |
| Hybrid BM25 + vector search | — | Yes (local Ollama) |
| LLM synthesis with page citations | — | Yes (local Ollama) |
To run your own cloud-style deployment:
```bash
cp .env.cloud.example .env
# Edit .env: set PAGEPIPER_OLLAMA_URL and data paths
./manage.sh cloud:start
```
Cloud instance listens on port 8533. The API is internal-only; nginx proxies `/api/` to the backend.
BYOK means you supply your own Ollama instance. No cloud API keys, no usage metering.
---
## Data and Backups
## Forgejo-primary
The `data/` directory contains the SQLite index database and all uploaded files. Back it up to preserve your index. Pagepiper indexes documents at ingest time. If you modify or replace a source file, use the re-index button on the document card to rebuild its entry.
Large PDFs (hundreds of pages) can take a few minutes to index. The status badge on the document card updates as indexing progresses.
Pagepiper is developed and hosted at [git.opensourcesolarpunk.com/Circuit-Forge/pagepiper](https://git.opensourcesolarpunk.com/Circuit-Forge/pagepiper). GitHub mirrors exist for discoverability only. File issues and submit pull requests on Forgejo.
---
## Licensing
## License
Pagepiper uses a split license:
- **MIT:** BM25 full-text search, document library management, ingest pipeline, EPUB support
- **BSL 1.1:** Hybrid vector search (embedding + k-NN), RAG chat, LLM integration
BSL 1.1 is free for personal non-commercial self-hosting. SaaS re-hosting or commercial redistribution requires a license from CircuitForge. BSL 1.1 converts to MIT after four years.
License keys: [circuitforge.tech](https://circuitforge.tech)
- **MIT:** Document ingest pipeline, BM25 full-text index, library management, EPUB support — the core discovery and retrieval layer.
- **BSL 1.1 (Business Source License):** Hybrid vector search, LLM synthesis, RAG (retrieval-augmented generation) chat interface — free for personal non-commercial self-hosting; commercial use or SaaS re-hosting requires a license. Converts to MIT after four years.
---
## Contributing
Issues and PRs welcome at [git.opensourcesolarpunk.com/Circuit-Forge/pagepiper](https://git.opensourcesolarpunk.com/Circuit-Forge/pagepiper).
The ingest pipeline and BM25 index are MIT-licensed. If you build a better PDF parser or add support for additional formats (CBZ, MOBI, etc.), the community benefits directly.
*A [Circuit Forge LLC](https://circuitforge.tech) product. Privacy · Safety · Accessibility — co-equal, non-negotiable.*

View file

@ -10,9 +10,11 @@ from __future__ import annotations
import logging
import os
from fastapi import APIRouter, HTTPException
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
from app.cloud_session import require_paid_tier
from app.deps import UserCtx, get_user_ctx
from app.services.retriever import Retriever
from app.services.synthesizer import Synthesizer
@ -56,21 +58,6 @@ def _get_llm_router():
return LLMRouter(cfg)
def _get_db_path() -> str:
"""Read lazily so test fixtures take effect."""
import pathlib
data_dir = pathlib.Path(os.environ.get("PAGEPIPER_DATA_DIR", "data"))
return str(data_dir / "pagepiper.db")
def _get_vec_db_path() -> str:
import pathlib
data_dir = pathlib.Path(os.environ.get("PAGEPIPER_DATA_DIR", "data"))
return str(data_dir / "pagepiper_vecs.db")
def _require_llm():
"""Return LLMRouter or raise 402."""
llm = _get_llm_router()
@ -89,18 +76,20 @@ def _require_llm():
@router.post("")
def chat(req: ChatRequest) -> ChatResponse:
def chat(
req: ChatRequest,
ctx: UserCtx = Depends(get_user_ctx),
_tier: str = Depends(require_paid_tier),
) -> ChatResponse:
llm = _require_llm()
from app.main import _bm25
retriever = Retriever(_bm25)
retriever = Retriever(ctx.bm25)
chunks = retriever.hybrid_search(
query=req.message,
top_k=req.top_k,
doc_ids=req.doc_ids,
db_path=_get_db_path(),
vec_db_path=_get_vec_db_path(),
db_path=ctx.db_path,
vec_db_path=ctx.vec_db_path,
llm=llm,
)
@ -141,7 +130,10 @@ def chat_feedback_status() -> dict:
@router.post("/feedback")
def submit_chat_feedback(req: ChatFeedbackRequest) -> dict:
def submit_chat_feedback(
req: ChatFeedbackRequest,
ctx: UserCtx = Depends(get_user_ctx),
) -> dict:
import json
import sqlite3
@ -149,8 +141,7 @@ def submit_chat_feedback(req: ChatFeedbackRequest) -> dict:
from fastapi import HTTPException
raise HTTPException(status_code=422, detail="rating must be 1 or -1")
db_path = _get_db_path()
con = sqlite3.connect(db_path)
con = sqlite3.connect(ctx.db_path)
try:
con.execute(
"INSERT INTO chat_feedback (rating, question, answer, doc_ids) VALUES (?, ?, ?, ?)",

View file

@ -14,26 +14,25 @@ from typing import Callable
from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, UploadFile
from app.config import WATCH_DIR, DB_PATH, VEC_DB_PATH, DATA_DIR
from app.deps import get_db
from app.config import VEC_DIMENSIONS
from app.deps import UserCtx, get_db, get_user_ctx
_MAX_UPLOAD_BYTES = 200 * 1024 * 1024 # 200 MB
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/library", tags=["library"])
# Injected by main.py after _bm25 is created
_mark_bm25_dirty: Callable[[], None] | None = None
_INGEST_TASKS = {
".pdf": "pagepiper/ingest_pdf",
".epub": "pagepiper/ingest_epub",
".docx": "pagepiper/ingest_docx",
}
_INGEST_RUNNERS = {
".pdf": "scripts.ingest_pdf",
".epub": "scripts.ingest_epub",
".docx": "scripts.ingest_docx",
}
@ -41,24 +40,22 @@ def _dispatch_ingest(
doc_id: str,
file_path: str,
background_tasks: BackgroundTasks,
data_dir: Path,
mark_dirty_fn: Callable[[], None],
) -> str:
"""Dispatch an ingest task. Tries cf-orch; falls back to BackgroundTasks."""
import importlib
import os as _os
from pathlib import Path as _Path
suffix = _Path(file_path).suffix.lower()
suffix = Path(file_path).suffix.lower()
task_name = _INGEST_TASKS.get(suffix, "pagepiper/ingest_pdf")
runner_module = _INGEST_RUNNERS.get(suffix, "scripts.ingest_pdf")
# Read lazily so test fixtures (monkeypatch.setenv) take effect
_data_dir = _Path(_os.environ.get("PAGEPIPER_DATA_DIR", "data"))
task_id = str(uuid.uuid4())
args = {
"doc_id": doc_id,
"file_path": file_path,
"db_path": str(_data_dir / "pagepiper.db"),
"vec_db_path": str(_data_dir / "pagepiper_vecs.db"),
"db_path": str(data_dir / "pagepiper.db"),
"vec_db_path": str(data_dir / "pagepiper_vecs.db"),
}
try:
@ -67,7 +64,7 @@ def _dispatch_ingest(
logger.info("Dispatched cf-orch ingest task %s for doc %s", task_id, doc_id)
except Exception:
mod = importlib.import_module(runner_module)
background_tasks.add_task(_run_ingest_background, mod.run, args, task_id)
background_tasks.add_task(_run_ingest_background, mod.run, args, task_id, mark_dirty_fn)
logger.info(
"cf-orch unavailable — running ingest in background thread (task %s)", task_id
)
@ -75,19 +72,47 @@ def _dispatch_ingest(
return task_id
def _run_ingest_background(run_fn: Callable[..., None], args: dict, task_id: str) -> None:
def _run_ingest_background(
run_fn: Callable[..., None],
args: dict,
task_id: str,
mark_dirty_fn: Callable[[], None] | None = None,
) -> None:
from app.api.ingest import _task_registry
_task_registry[task_id] = {"status": "running", "progress": 0}
try:
run_fn(**args)
_task_registry[task_id] = {"status": "complete", "progress": 100}
if _mark_bm25_dirty:
_mark_bm25_dirty()
if mark_dirty_fn:
mark_dirty_fn()
except Exception as exc:
logger.exception("Ingest task %s failed", task_id)
_task_registry[task_id] = {"status": "error", "error": str(exc)}
@router.get("/sample-chunks")
def sample_chunks(
limit: int = 50,
db: sqlite3.Connection = Depends(get_db),
) -> list[dict]:
"""Return up to `limit` randomly sampled page chunks for corpus benchmarking.
No tier gate internal tool, same-host access only. Returns [] if empty.
"""
if limit < 1:
limit = 1
elif limit > 200:
limit = 200
rows = db.execute(
"SELECT id, doc_id, page_number, text FROM page_chunks ORDER BY RANDOM() LIMIT ?",
[limit],
).fetchall()
return [
{"chunk_id": r["id"], "doc_id": r["doc_id"], "page_number": r["page_number"], "text": r["text"]}
for r in rows
]
@router.get("")
def list_library(db: sqlite3.Connection = Depends(get_db)) -> list[dict]:
rows = db.execute(
@ -101,13 +126,18 @@ def list_library(db: sqlite3.Connection = Depends(get_db)) -> list[dict]:
def scan_library(
background_tasks: BackgroundTasks,
db: sqlite3.Connection = Depends(get_db),
ctx: UserCtx = Depends(get_user_ctx),
) -> dict:
"""Scan the watched directory and queue ingest for any new PDFs."""
watch = WATCH_DIR
watch = ctx.watch_dir
if not watch.exists():
raise HTTPException(status_code=404, detail=f"Watch directory not found: {watch}")
pdfs = list(watch.glob("**/*.pdf")) + list(watch.glob("**/*.epub"))
pdfs = (
list(watch.glob("**/*.pdf"))
+ list(watch.glob("**/*.epub"))
+ list(watch.glob("**/*.docx"))
)
queued = []
for pdf_path in pdfs:
@ -117,7 +147,7 @@ def scan_library(
).fetchone()
if existing and existing["status"] == "ready":
continue # already indexed
continue
if existing:
doc_id = existing["id"]
@ -129,7 +159,9 @@ def scan_library(
).fetchone()[0]
db.commit()
task_id = _dispatch_ingest(doc_id, path_str, background_tasks)
task_id = _dispatch_ingest(
doc_id, path_str, background_tasks, ctx.data_dir, ctx.bm25.mark_dirty
)
db.execute(
"UPDATE documents SET status='processing', task_id=? WHERE id=?",
[task_id, doc_id],
@ -145,12 +177,15 @@ def reingest_document(
doc_id: str,
background_tasks: BackgroundTasks,
db: sqlite3.Connection = Depends(get_db),
ctx: UserCtx = Depends(get_user_ctx),
) -> dict:
row = db.execute("SELECT file_path FROM documents WHERE id=?", [doc_id]).fetchone()
if not row:
raise HTTPException(status_code=404, detail="Document not found")
task_id = _dispatch_ingest(doc_id, row["file_path"], background_tasks)
task_id = _dispatch_ingest(
doc_id, row["file_path"], background_tasks, ctx.data_dir, ctx.bm25.mark_dirty
)
db.execute(
"UPDATE documents SET status='processing', task_id=?, error_msg=NULL WHERE id=?",
[task_id, doc_id],
@ -163,6 +198,7 @@ def reingest_document(
def delete_document(
doc_id: str,
db: sqlite3.Connection = Depends(get_db),
ctx: UserCtx = Depends(get_user_ctx),
) -> None:
row = db.execute("SELECT id FROM documents WHERE id=?", [doc_id]).fetchone()
if not row:
@ -171,23 +207,21 @@ def delete_document(
db.execute("DELETE FROM documents WHERE id=?", [doc_id])
db.commit()
# Remove embeddings from vector store
try:
from circuitforge_core.vector.sqlite_vec import LocalSQLiteVecStore # type: ignore[import]
from app.config import VEC_DIMENSIONS
store = LocalSQLiteVecStore(db_path=VEC_DB_PATH, table="page_vecs", dimensions=VEC_DIMENSIONS)
store = LocalSQLiteVecStore(
db_path=ctx.vec_db_path, table="page_vecs", dimensions=VEC_DIMENSIONS
)
store.delete_where({"doc_id": doc_id})
except Exception as exc:
logger.warning("Could not remove vectors for doc %s: %s", doc_id, exc)
if _mark_bm25_dirty:
_mark_bm25_dirty()
ctx.bm25.mark_dirty()
def _get_vec_count(doc_id: str) -> int:
"""Return how many vectors have been stored for this doc. Returns 0 on any error."""
def _get_vec_count(doc_id: str, vec_db_path: str) -> int:
try:
conn = sqlite3.connect(VEC_DB_PATH)
conn = sqlite3.connect(vec_db_path)
count = conn.execute(
"SELECT COUNT(*) FROM page_vecs_meta WHERE json_extract(metadata, '$.doc_id') = ?",
[doc_id],
@ -202,6 +236,7 @@ def _get_vec_count(doc_id: str) -> int:
def document_status(
doc_id: str,
db: sqlite3.Connection = Depends(get_db),
ctx: UserCtx = Depends(get_user_ctx),
) -> dict:
row = db.execute(
"SELECT id, status, task_id, page_count, error_msg FROM documents WHERE id=?",
@ -210,7 +245,7 @@ def document_status(
if not row:
raise HTTPException(status_code=404, detail="Document not found")
result = dict(row)
result["vec_count"] = _get_vec_count(doc_id)
result["vec_count"] = _get_vec_count(doc_id, ctx.vec_db_path)
return result
@ -219,18 +254,19 @@ def upload_document(
file: UploadFile,
background_tasks: BackgroundTasks,
db: sqlite3.Connection = Depends(get_db),
ctx: UserCtx = Depends(get_user_ctx),
) -> dict:
"""Accept a PDF/EPUB upload, save to data/uploads/, and queue for indexing."""
name = Path(file.filename or "").name
suffix = Path(name).suffix.lower()
if suffix not in _INGEST_TASKS:
raise HTTPException(status_code=400, detail="Supported formats: PDF, EPUB")
raise HTTPException(status_code=400, detail="Supported formats: PDF, EPUB, DOCX")
content = file.file.read()
if len(content) > _MAX_UPLOAD_BYTES:
raise HTTPException(status_code=413, detail="File exceeds 200 MB limit")
upload_dir = DATA_DIR / "uploads"
upload_dir = ctx.data_dir / "uploads"
upload_dir.mkdir(parents=True, exist_ok=True)
dest = upload_dir / name
dest.write_bytes(content)
@ -253,7 +289,9 @@ def upload_document(
).fetchone()[0]
db.commit()
task_id = _dispatch_ingest(doc_id, path_str, background_tasks)
task_id = _dispatch_ingest(
doc_id, path_str, background_tasks, ctx.data_dir, ctx.bm25.mark_dirty
)
db.execute(
"UPDATE documents SET status='processing', task_id=? WHERE id=?",
[task_id, doc_id],

View file

@ -7,13 +7,11 @@ MIT — no tier gate. No Ollama required.
from __future__ import annotations
import logging
import os
from typing import Annotated
from fastapi import APIRouter, Depends
from pydantic import BaseModel, Field
from app.services.bm25_index import BM25Index
from app.deps import UserCtx, get_user_ctx
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/search", tags=["search"])
@ -29,32 +27,17 @@ class SearchResult(BaseModel):
chunk_id: str
doc_id: str
page_number: int
text_snippet: str # first 300 chars of the page text
text_snippet: str
bm25_score: float
def _get_bm25() -> BM25Index:
import app.main as _main
bm25 = getattr(_main, "_bm25", None)
if bm25 is None:
raise RuntimeError("BM25 index not initialised — app.main not loaded")
return bm25
def _get_db_path() -> str:
"""Read lazily so test fixtures (monkeypatch.setattr) take effect."""
import pathlib
data_dir = pathlib.Path(os.environ.get("PAGEPIPER_DATA_DIR", "data"))
return str(data_dir / "pagepiper.db")
@router.post("")
def search(
req: SearchRequest,
bm25: Annotated[BM25Index, Depends(_get_bm25)],
ctx: UserCtx = Depends(get_user_ctx),
) -> list[SearchResult]:
bm25.ensure_fresh(_get_db_path())
hits = bm25.query(req.query, top_k=req.top_k, doc_ids=req.doc_ids)
ctx.bm25.ensure_fresh(ctx.db_path)
hits = ctx.bm25.query(req.query, top_k=req.top_k, doc_ids=req.doc_ids)
return [
SearchResult(
chunk_id=h.chunk_id,

131
app/cloud_session.py Normal file
View file

@ -0,0 +1,131 @@
# app/cloud_session.py
"""Cloud session auth for Pagepiper — validates cf_session cookie via Directus + Heimdall.
In local mode (CLOUD_MODE unset or false), require_paid_tier is a no-op.
In cloud mode, the Caddy proxy forwards the browser's Cookie header as
X-CF-Session. This module extracts cf_session, validates it against
Directus /users/me, then checks the user's Pagepiper tier via Heimdall.
Auto-provisions a free tier key for new users.
"""
from __future__ import annotations
import logging
import os
import re
import httpx
from fastapi import HTTPException, Request
log = logging.getLogger(__name__)
CLOUD_MODE: bool = os.environ.get("CLOUD_MODE", "").lower() in ("1", "true", "yes")
DIRECTUS_URL: str = os.environ.get("DIRECTUS_URL", "http://172.31.0.3:8055").rstrip("/")
HEIMDALL_URL: str = os.environ.get("HEIMDALL_URL", "https://license.circuitforge.tech").rstrip("/")
HEIMDALL_ADMIN_TOKEN: str = os.environ.get("HEIMDALL_ADMIN_TOKEN", "")
_TIER_ORDER = {"free": 0, "paid": 1, "premium": 2, "ultra": 3}
def _extract_session_token(cookie_header: str) -> str:
m = re.search(r"(?:^|;)\s*cf_session=([^;]+)", cookie_header)
return m.group(1).strip() if m else ""
def _get_user_id(jwt: str) -> str | None:
try:
resp = httpx.get(
f"{DIRECTUS_URL}/users/me",
headers={"Authorization": f"Bearer {jwt}"},
timeout=5.0,
)
if resp.status_code == 200:
return resp.json().get("data", {}).get("id")
except Exception as exc:
log.warning("Directus session check failed: %s", exc)
return None
def _ensure_provisioned(user_id: str) -> None:
if not HEIMDALL_ADMIN_TOKEN:
return
try:
httpx.post(
f"{HEIMDALL_URL}/admin/provision",
json={"directus_user_id": user_id, "product": "pagepiper", "tier": "free"},
headers={"Authorization": f"Bearer {HEIMDALL_ADMIN_TOKEN}"},
timeout=5.0,
)
except Exception as exc:
log.warning("Heimdall provision failed for user %s: %s", user_id, exc)
def _get_tier(user_id: str) -> str:
if not HEIMDALL_ADMIN_TOKEN:
return "free"
try:
resp = httpx.get(
f"{HEIMDALL_URL}/admin/cloud/resolve",
params={"directus_user_id": user_id, "product": "pagepiper"},
headers={"Authorization": f"Bearer {HEIMDALL_ADMIN_TOKEN}"},
timeout=5.0,
)
if resp.status_code == 200:
return resp.json().get("tier", "free")
except Exception as exc:
log.warning("Heimdall tier check failed for user %s: %s", user_id, exc)
return "free"
def resolve_authenticated_user(request: Request) -> str:
"""Validate the session cookie and return the Directus user_id. Raises 401 if invalid."""
cookie_header = request.headers.get("x-cf-session", "")
jwt = _extract_session_token(cookie_header)
if not jwt:
raise HTTPException(
status_code=401,
detail={
"error": "auth_required",
"message": "Sign in at circuitforge.tech to use Pagepiper cloud.",
},
)
user_id = _get_user_id(jwt)
if not user_id:
raise HTTPException(
status_code=401,
detail={
"error": "session_invalid",
"message": "Your session has expired. Sign in again at circuitforge.tech.",
},
)
_ensure_provisioned(user_id)
return user_id
def require_paid_tier(request: Request) -> str:
"""FastAPI dependency — 401 if no valid session, 402 if tier < paid. Returns user_id.
In local mode (CLOUD_MODE not set), returns LOCAL_USER_ID without any auth check.
"""
if not CLOUD_MODE:
from app.config import LOCAL_USER_ID
return LOCAL_USER_ID
user_id = resolve_authenticated_user(request)
tier = _get_tier(user_id)
if _TIER_ORDER.get(tier, 0) < _TIER_ORDER["paid"]:
raise HTTPException(
status_code=402,
detail={
"error": "upgrade_required",
"message": (
"RAG chat requires a Paid tier Pagepiper license. "
"Upgrade at circuitforge.tech/software/pagepiper."
),
},
)
return user_id

View file

@ -12,15 +12,35 @@ VEC_DB_PATH = str(DATA_DIR / "pagepiper_vecs.db")
WATCH_DIR = Path(os.environ.get("PAGEPIPER_WATCH_DIR", "books"))
VEC_DIMENSIONS = int(os.environ.get("PAGEPIPER_EMBED_DIMS", "1024"))
LOCAL_USER_ID = "__local__"
def user_data_dir(user_id: str) -> Path:
"""Return (and create) the per-user data directory under DATA_DIR/users/."""
d = DATA_DIR / "users" / user_id
d.mkdir(parents=True, exist_ok=True)
return d
def get_llm_config() -> dict | None:
"""Build LLMRouter config from env vars. Returns None if PAGEPIPER_OLLAMA_URL is unset."""
"""Build LLMRouter config from env vars.
Returns None only when neither PAGEPIPER_OLLAMA_URL nor CF_ORCH_URL is set.
CF_ORCH_URL alone is sufficient the coordinator resolves the service URL at
allocation time so PAGEPIPER_OLLAMA_URL becomes optional.
"""
url = os.environ.get("PAGEPIPER_OLLAMA_URL", "").strip()
if not url:
orch_url = os.environ.get("CF_ORCH_URL", "").strip()
if not url and not orch_url:
return None
chat_model = os.environ.get("PAGEPIPER_CHAT_MODEL", "mistral:7b")
_base_url = ""
if url:
_clean = url.rstrip("/")
_base_url = _clean if _clean.endswith("/v1") else _clean + "/v1"
chat_model = os.environ.get("PAGEPIPER_CHAT_MODEL", "mistral:7b")
backend: dict = {
"type": "openai_compat",
@ -30,12 +50,9 @@ def get_llm_config() -> dict | None:
"supports_images": False,
}
# Wire cf-orch allocation when coordinator is configured so the model stays warm
# and cold-start latency doesn't cause chat timeouts.
orch_url = os.environ.get("CF_ORCH_URL", "").strip()
if orch_url:
backend["cf_orch"] = {
"service": "ollama",
"service": os.environ.get("PAGEPIPER_ORCH_SERVICE", "ollama"),
"model_candidates": [chat_model],
"ttl_s": 3600,
}

View file

@ -3,13 +3,75 @@
from __future__ import annotations
import sqlite3
from dataclasses import dataclass
from pathlib import Path
from typing import Generator
from app.config import DB_PATH
from fastapi import Depends, Request
from app.config import DATA_DIR, LOCAL_USER_ID
from app.services.bm25_index import BM25Index
def get_db() -> Generator[sqlite3.Connection, None, None]:
conn = sqlite3.connect(DB_PATH, check_same_thread=False)
@dataclass
class UserCtx:
"""Per-request context routing DB paths and BM25 to the right user."""
user_id: str
db_path: str
vec_db_path: str
data_dir: Path
watch_dir: Path
bm25: BM25Index
_user_startup_done: set[str] = set()
def _run_user_startup(user_id: str, user_dir: Path) -> None:
"""Run migrations and vec schema check once per process lifetime per user."""
if user_id in _user_startup_done:
return
_user_startup_done.add(user_id)
from app.config import VEC_DIMENSIONS
from app.startup import apply_migrations, check_and_rebuild_vec_schema
apply_migrations(str(user_dir / "pagepiper.db"))
check_and_rebuild_vec_schema(
str(user_dir / "pagepiper_vecs.db"), VEC_DIMENSIONS, str(user_dir / "pagepiper.db")
)
def get_user_ctx(request: Request) -> UserCtx:
"""Resolve the per-user data directory, DB paths, and BM25 instance for this request."""
import app.main as _main
from app.cloud_session import CLOUD_MODE
if CLOUD_MODE:
from app.cloud_session import resolve_authenticated_user
from app.config import user_data_dir
user_id = resolve_authenticated_user(request)
user_dir = user_data_dir(user_id)
_run_user_startup(user_id, user_dir)
watch_dir = user_dir / "books"
watch_dir.mkdir(parents=True, exist_ok=True)
else:
from app.config import WATCH_DIR
user_id = LOCAL_USER_ID
user_dir = DATA_DIR
watch_dir = WATCH_DIR
return UserCtx(
user_id=user_id,
db_path=str(user_dir / "pagepiper.db"),
vec_db_path=str(user_dir / "pagepiper_vecs.db"),
data_dir=user_dir,
watch_dir=watch_dir,
bm25=_main._get_bm25_for(user_id),
)
def get_db(ctx: UserCtx = Depends(get_user_ctx)) -> Generator[sqlite3.Connection, None, None]:
conn = sqlite3.connect(ctx.db_path, check_same_thread=False)
conn.execute("PRAGMA foreign_keys = ON")
conn.execute("PRAGMA journal_mode = WAL")
conn.row_factory = sqlite3.Row

View file

@ -4,9 +4,6 @@ from __future__ import annotations
import logging
import os
import re
import sqlite3
import threading
from contextlib import asynccontextmanager
from fastapi import FastAPI
@ -16,110 +13,40 @@ from app.services.bm25_index import BM25Index
logger = logging.getLogger("pagepiper")
# Module-level BM25 singleton — shared across all requests
_bm25 = BM25Index()
# Per-user BM25 registry — keyed by user_id; "__local__" for single-user mode
_bm25_map: dict[str, BM25Index] = {}
def _apply_migrations() -> None:
from scripts.db_migrate import migrate
migrate(DB_PATH)
def _reembed_docs(docs: list[tuple[str, str]], db_path: str, vec_db_path: str) -> None:
"""Re-run full ingest for a list of (doc_id, file_path) sequentially."""
for doc_id, file_path in docs:
suffix = os.path.splitext(file_path)[1].lower()
try:
if suffix == ".epub":
from scripts.ingest_epub import run
else:
from scripts.ingest_pdf import run
logger.info("Auto re-embed: starting %s", os.path.basename(file_path))
run(doc_id=doc_id, file_path=file_path, db_path=db_path, vec_db_path=vec_db_path)
except Exception as exc:
logger.error("Auto re-embed failed for doc %s: %s", doc_id[:8], exc)
def _check_vec_schema(vec_db_path: str, expected_dims: int, db_path: str) -> None:
"""Drop the vec DB if its stored dimension doesn't match config, then queue re-embed.
sqlite-vec bakes the embedding dimension into the virtual table DDL, so changing
models requires dropping and recreating the whole file. Catches the mismatch at
startup rather than surfacing it as an obscure OperationalError mid-request.
"""
if not os.path.exists(vec_db_path):
return
try:
conn = sqlite3.connect(vec_db_path)
row = conn.execute(
"SELECT sql FROM sqlite_master WHERE name='page_vecs_vecs'"
).fetchone()
conn.close()
except Exception as exc:
logger.warning("Vec schema check could not read %s (non-fatal): %s", vec_db_path, exc)
return
if not row:
return # table not yet created — first embed will build it with the right dims
m = re.search(r'float\[(\d+)\]', row[0])
if not m:
return
actual_dims = int(m.group(1))
if actual_dims == expected_dims:
return
logger.warning(
"Vec DB dimension mismatch: stored=%d, configured=%d — dropping %s and queuing re-embed",
actual_dims, expected_dims, vec_db_path,
)
try:
os.remove(vec_db_path)
except OSError as exc:
logger.error(
"Could not delete stale vec DB %s: %s — fix permissions and restart", vec_db_path, exc
)
return
# Collect all ready docs so we can rebuild their embeddings in the background.
try:
conn = sqlite3.connect(db_path)
docs = conn.execute(
"SELECT id, file_path FROM documents WHERE status='ready'"
).fetchall()
conn.close()
except Exception as exc:
logger.warning("Could not query documents for re-embed: %s", exc)
return
if not docs:
return
logger.info("Queuing re-embed for %d document(s) in background", len(docs))
threading.Thread(
target=_reembed_docs,
args=(docs, db_path, vec_db_path),
daemon=True,
name="pagepiper-reembed",
).start()
def _get_bm25_for(user_id: str) -> BM25Index:
if user_id not in _bm25_map:
_bm25_map[user_id] = BM25Index()
return _bm25_map[user_id]
@asynccontextmanager
async def lifespan(app: FastAPI):
_apply_migrations()
from app.cloud_session import CLOUD_MODE
from app.config import LOCAL_USER_ID
from app.startup import apply_migrations, check_and_rebuild_vec_schema
embed_model = os.environ.get("PAGEPIPER_EMBED_MODEL", "nomic-embed-text")
logger.info("Pagepiper starting — embed model: %s, dims: %d", embed_model, VEC_DIMENSIONS)
_check_vec_schema(VEC_DB_PATH, VEC_DIMENSIONS, DB_PATH)
_bm25.mark_dirty() # will rebuild on first search
if CLOUD_MODE:
from app.startup import warn_if_unencrypted
from app.config import DATA_DIR
warn_if_unencrypted(str(DATA_DIR))
else:
# In cloud mode, per-user migration and vec schema check run on first request (deps.py).
apply_migrations(DB_PATH)
check_and_rebuild_vec_schema(VEC_DB_PATH, VEC_DIMENSIONS, DB_PATH)
_get_bm25_for(LOCAL_USER_ID).mark_dirty()
yield
app = FastAPI(title="Pagepiper", lifespan=lifespan)
# Wire BM25 dirty callback into library router
from app.api import library as _lib_module # noqa: E402
_lib_module._mark_bm25_dirty = _bm25.mark_dirty
# Register routers
from app.api.library import router as library_router # noqa: E402
from app.api.ingest import router as ingest_router # noqa: E402

View file

@ -170,7 +170,30 @@ class Retriever:
vec = (1.0 / (1.0 + r.vector_score)) if r.vector_score is not None else 0.0
return bm25 * 0.5 + vec * 0.5
ranked = sorted(merged.values(), key=_combined, reverse=True)[:top_k]
all_ranked = sorted(merged.values(), key=_combined, reverse=True)
# Discard results where the best match is pure noise (neither BM25 term
# overlap nor vector similarity exceeded the minimum signal threshold).
# This lets the caller's empty-result guard fire instead of sending
# low-confidence chunks to the LLM where it fills gaps with training data.
MIN_SIGNAL = 0.01
if all_ranked and _combined(all_ranked[0]) < MIN_SIGNAL:
return []
# Cap per-document contribution to max_per_doc of top_k so that one book
# does not crowd out all slots when the query matches it heavily by name
# alone (e.g. a character name that appears in every chapter).
max_per_doc = max(2, top_k // 3)
ranked: list[RetrievedChunk] = []
doc_counts: dict[str, int] = {}
for r in all_ranked:
if len(ranked) >= top_k:
break
count = doc_counts.get(r.doc_id, 0)
if count < max_per_doc:
ranked.append(r)
doc_counts[r.doc_id] = count + 1
adjacent = _fetch_adjacent(ranked, db_path)
return ranked + adjacent
@ -189,5 +212,8 @@ class Retriever:
)
for r in self._bm25.query(query, top_k=top_k, doc_ids=doc_ids)
]
MIN_SIGNAL = 0.01
if hits and hits[0].bm25_score < MIN_SIGNAL:
return []
adjacent = _fetch_adjacent(hits, db_path)
return hits + adjacent

View file

@ -11,12 +11,52 @@ from dataclasses import dataclass
from app.services.retriever import RetrievedChunk
_SYSTEM_PROMPT = (
"You are a helpful document assistant. "
"Answer the user's question using ONLY the provided document excerpts. "
"For each claim, cite the source page as [p.N]. "
"If the excerpts are insufficient, say so. Do not invent information."
"You are a strict document retrieval assistant. "
"Your sole job is to extract and present information from the document excerpts given to you. "
"You have no memory of books, stories, or authors. "
"If the excerpts do not contain the answer, say so and stop. Never guess."
)
_NO_RESULTS_ANSWER = (
"I could not find any relevant passages in the indexed documents for that question. "
"Try rephrasing, or check that the relevant document has been ingested."
)
# Phrases the model uses when it escapes the provided context and pulls from
# training data. Any response containing one of these is replaced with the
# canned no-answer message.
_ESCAPE_PHRASES = [
"in the series",
"in the novel",
"in the book",
"in the context of the series",
"it can be assumed",
"based on my knowledge",
"based on the broader",
"the broader story",
"by terry goodkind",
"sword of truth",
"legend of the seeker",
"throughout the series",
"throughout the novel",
"throughout the book",
]
def _strip_escape(response: str) -> str:
"""Replace responses that leaked outside the provided context with the canned message.
Detects the 'helpful override' pattern where the model acknowledges the
excerpts lack the answer but supplements from training data anyway.
"""
lower = response.lower()
if any(phrase in lower for phrase in _ESCAPE_PHRASES):
return (
"I could not find an answer to that question in the indexed documents. "
"The answer may be in a document that has not been ingested yet."
)
return response
@dataclass(frozen=True)
class Citation:
@ -42,13 +82,30 @@ class Synthesizer:
history: list[dict],
chunks: list[RetrievedChunk],
) -> SynthesisResult:
if not chunks:
return SynthesisResult(answer=_NO_RESULTS_ANSWER, citations=())
# 1500 chars (~300 words) per chunk: enough to capture definitions that
# appear mid-paragraph without blowing past a 32k-context model's limit.
context_parts = [f"[p.{c.page_number}]\n{c.text[:1500]}" for c in chunks]
context = "\n\n---\n\n".join(context_parts)
prompt = f"Document excerpts:\n\n{context}\n\nQuestion: {message}"
# Quote-first structure: the model must commit to a grounding passage
# before generating an answer. Forces an explicit "NOT FOUND" admission
# when the excerpt doesn't contain the answer, rather than the "the excerpt
# doesn't say... however, in the series..." escape pattern.
prompt = (
f"Excerpts from the indexed documents:\n\n{context}\n\n"
f"---\n\n"
f"Question: {message}\n\n"
f"Step 1 — Find the relevant passage: Quote the exact sentence(s) from "
f"the excerpts above that answer the question, or write NOT FOUND.\n\n"
f"Step 2 — Answer: Based solely on what you quoted in Step 1, answer "
f"the question with page citations [p.N]. If Step 1 is NOT FOUND, "
f"write: \"I could not find an answer to that question in the indexed documents.\""
)
answer = self._llm.complete(prompt, system=_SYSTEM_PROMPT)
answer = _strip_escape(answer)
citations = tuple(
Citation(

137
app/startup.py Normal file
View file

@ -0,0 +1,137 @@
# app/startup.py
"""DB migration and vec schema check utilities — called at startup and on first user request."""
from __future__ import annotations
import logging
import os
import re
import sqlite3
import subprocess
import threading
logger = logging.getLogger("pagepiper")
def warn_if_unencrypted(data_dir: str) -> None:
"""Log a warning if cloud mode is running without fscrypt encryption.
Checks whether the users/ subdirectory of data_dir is fscrypt-encrypted.
Non-fatal: warns but does not block startup.
"""
users_dir = os.path.join(data_dir, "users")
os.makedirs(users_dir, exist_ok=True)
if not _fscrypt_available():
logger.warning(
"SECURITY: fscrypt not found on this system. Cloud user data at %s is stored "
"unencrypted. Install fscrypt and run scripts/setup_cloud_fscrypt.sh to enable "
"encryption at rest.",
users_dir,
)
return
try:
result = subprocess.run(
["fscrypt", "status", users_dir],
capture_output=True, text=True, timeout=5,
)
if "Encrypted" not in result.stdout:
logger.warning(
"SECURITY: user data directory %s is not fscrypt-encrypted. "
"Run: sudo scripts/setup_cloud_fscrypt.sh <user_id>",
users_dir,
)
except Exception as exc:
logger.debug("fscrypt status check failed (non-fatal): %s", exc)
def _fscrypt_available() -> bool:
try:
subprocess.run(["fscrypt", "--version"], capture_output=True, timeout=2)
return True
except (FileNotFoundError, subprocess.TimeoutExpired):
return False
def apply_migrations(db_path: str) -> None:
from scripts.db_migrate import migrate
migrate(db_path)
def reembed_docs(docs: list[tuple[str, str]], db_path: str, vec_db_path: str) -> None:
for doc_id, file_path in docs:
suffix = os.path.splitext(file_path)[1].lower()
try:
if suffix == ".epub":
from scripts.ingest_epub import run
elif suffix == ".docx":
from scripts.ingest_docx import run
else:
from scripts.ingest_pdf import run
logger.info("Auto re-embed: starting %s", os.path.basename(file_path))
run(doc_id=doc_id, file_path=file_path, db_path=db_path, vec_db_path=vec_db_path)
except Exception as exc:
logger.error("Auto re-embed failed for doc %s: %s", doc_id[:8], exc)
def check_and_rebuild_vec_schema(vec_db_path: str, expected_dims: int, db_path: str) -> None:
"""Drop the vec DB if its stored dimension doesn't match config, then queue re-embed.
sqlite-vec bakes the embedding dimension into the virtual table DDL, so changing
models requires dropping and recreating the whole file. Catches the mismatch at
startup rather than surfacing it as an obscure OperationalError mid-request.
"""
if not os.path.exists(vec_db_path):
return
try:
conn = sqlite3.connect(vec_db_path)
row = conn.execute(
"SELECT sql FROM sqlite_master WHERE name='page_vecs_vecs'"
).fetchone()
conn.close()
except Exception as exc:
logger.warning("Vec schema check could not read %s (non-fatal): %s", vec_db_path, exc)
return
if not row:
return
m = re.search(r'float\[(\d+)\]', row[0])
if not m:
return
actual_dims = int(m.group(1))
if actual_dims == expected_dims:
return
logger.warning(
"Vec DB dimension mismatch: stored=%d, configured=%d — dropping %s and queuing re-embed",
actual_dims, expected_dims, vec_db_path,
)
try:
os.remove(vec_db_path)
except OSError as exc:
logger.error(
"Could not delete stale vec DB %s: %s — fix permissions and restart", vec_db_path, exc
)
return
try:
conn = sqlite3.connect(db_path)
docs = conn.execute(
"SELECT id, file_path FROM documents WHERE status='ready'"
).fetchall()
conn.close()
except Exception as exc:
logger.warning("Could not query documents for re-embed: %s", exc)
return
if not docs:
return
logger.info("Queuing re-embed for %d document(s) in background", len(docs))
threading.Thread(
target=reembed_docs,
args=(docs, db_path, vec_db_path),
daemon=True,
name="pagepiper-reembed",
).start()

Binary file not shown.

Before

Width:  |  Height:  |  Size: 26 KiB

After

Width:  |  Height:  |  Size: 94 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 25 KiB

After

Width:  |  Height:  |  Size: 128 KiB

View file

@ -12,8 +12,8 @@ OVERRIDE_ARGS=()
[[ -f "compose.override.yml" ]] && OVERRIDE_ARGS=(-f compose.override.yml)
usage() {
echo "Usage: $0 {start|stop|restart|status|logs [svc]|open|build|test"
echo " |cloud:start|cloud:stop|cloud:restart|cloud:status|cloud:logs [svc]|cloud:build}"
echo "Usage: $0 {start|stop|restart|status|logs [svc]|open|build|test|update"
echo " |cloud-start|cloud-stop|cloud-restart|cloud-status|cloud-logs [svc]|cloud-build|cloud-update}"
exit 1
}
@ -48,27 +48,39 @@ case "$cmd" in
test)
conda run -n cf pytest tests/ -v
;;
cloud:start)
update)
git pull
docker compose -f "$COMPOSE_FILE" "${OVERRIDE_ARGS[@]}" down
docker compose -f "$COMPOSE_FILE" "${OVERRIDE_ARGS[@]}" up -d --build
echo "Pagepiper updated and running → http://localhost:${WEB_PORT}"
;;
cloud-start)
docker compose -f "$COMPOSE_CLOUD_FILE" -p "$CLOUD_PROJECT" up -d --build
echo "Pagepiper cloud running → http://localhost:${CLOUD_WEB_PORT}"
;;
cloud:stop)
cloud-stop)
docker compose -f "$COMPOSE_CLOUD_FILE" -p "$CLOUD_PROJECT" down
;;
cloud:restart)
cloud-restart)
docker compose -f "$COMPOSE_CLOUD_FILE" -p "$CLOUD_PROJECT" down
docker compose -f "$COMPOSE_CLOUD_FILE" -p "$CLOUD_PROJECT" up -d --build
echo "Pagepiper cloud running → http://localhost:${CLOUD_WEB_PORT}"
;;
cloud:status)
cloud-status)
docker compose -f "$COMPOSE_CLOUD_FILE" -p "$CLOUD_PROJECT" ps
;;
cloud:logs)
cloud-logs)
docker compose -f "$COMPOSE_CLOUD_FILE" -p "$CLOUD_PROJECT" logs -f "${1:-}"
;;
cloud:build)
cloud-build)
docker compose -f "$COMPOSE_CLOUD_FILE" -p "$CLOUD_PROJECT" build --no-cache
;;
cloud-update)
git pull
docker compose -f "$COMPOSE_CLOUD_FILE" -p "$CLOUD_PROJECT" down
docker compose -f "$COMPOSE_CLOUD_FILE" -p "$CLOUD_PROJECT" up -d --build
echo "Pagepiper cloud updated and running → http://localhost:${CLOUD_WEB_PORT}"
;;
*)
usage
;;

120
scripts/setup_cloud_fscrypt.sh Executable file
View file

@ -0,0 +1,120 @@
#!/usr/bin/env bash
# setup_cloud_fscrypt.sh — encrypt a cloud user's data directory with fscrypt.
#
# Run as root on the HOST (not inside the container) before first deployment.
# Requires: fscrypt >= 0.3, Linux kernel >= 4.1, ext4/f2fs filesystem.
#
# Usage:
# sudo ./scripts/setup_cloud_fscrypt.sh <user_id>
# sudo ./scripts/setup_cloud_fscrypt.sh --list # show all encrypted dirs
# sudo ./scripts/setup_cloud_fscrypt.sh --status <user_id>
#
# Environment:
# PAGEPIPER_DATA_DIR — base data directory (default: /devl/pagepiper-cloud-data)
#
# Key management:
# Keys are stored in the system protector backed by a passphrase or root keyring.
# For unattended unlock on server boot, use a raw_key protector derived from a
# secret in HashiCorp Vault or similar; see docs/encryption.md for details.
set -euo pipefail
DATA_DIR="${PAGEPIPER_DATA_DIR:-/devl/pagepiper-cloud-data}"
USERS_DIR="$DATA_DIR/users"
_usage() {
grep '^# ' "$0" | cut -c3-
exit 1
}
_require_root() {
if [[ "$EUID" -ne 0 ]]; then
echo "ERROR: this script must be run as root" >&2
exit 1
fi
}
_require_fscrypt() {
if ! command -v fscrypt &>/dev/null; then
echo "ERROR: fscrypt not found. Install with: apt-get install fscrypt" >&2
exit 1
fi
}
_check_fscrypt_setup() {
local mnt
mnt=$(df -P "$DATA_DIR" | tail -1 | awk '{print $6}')
if ! fscrypt status "$mnt" &>/dev/null; then
echo "Initialising fscrypt on $mnt..."
fscrypt setup --quiet "$mnt"
echo "fscrypt setup complete on $mnt"
fi
}
cmd="${1:-}"
case "$cmd" in
--list)
_require_root
_require_fscrypt
echo "Encrypted user directories under $USERS_DIR:"
find "$USERS_DIR" -maxdepth 1 -mindepth 1 -type d 2>/dev/null | while read -r dir; do
if fscrypt status "$dir" 2>/dev/null | grep -q "Encrypted"; then
echo " [encrypted] $dir"
else
echo " [plain] $dir"
fi
done
;;
--status)
_require_root
_require_fscrypt
user_id="${2:-}"
[[ -z "$user_id" ]] && { echo "Usage: $0 --status <user_id>" >&2; exit 1; }
user_dir="$USERS_DIR/$user_id"
if [[ ! -d "$user_dir" ]]; then
echo "Directory $user_dir does not exist"
exit 1
fi
fscrypt status "$user_dir"
;;
"")
_usage
;;
-*)
_usage
;;
*)
# Encrypt a user's directory
user_id="$1"
_require_root
_require_fscrypt
user_dir="$USERS_DIR/$user_id"
if [[ ! -d "$user_dir" ]]; then
echo "Creating user directory: $user_dir"
mkdir -p "$user_dir"
fi
if fscrypt status "$user_dir" 2>/dev/null | grep -q "Encrypted"; then
echo "Directory $user_dir is already encrypted."
exit 0
fi
# Warn if directory contains existing data — fscrypt encrypt migrates in place
if [[ -n "$(ls -A "$user_dir")" ]]; then
echo "WARNING: $user_dir is non-empty. fscrypt will encrypt files in place."
echo "Ensure the container is stopped and you have a backup before continuing."
read -rp "Continue? [y/N] " confirm
[[ "$confirm" =~ ^[Yy]$ ]] || exit 1
fi
_check_fscrypt_setup
echo "Encrypting $user_dir..."
fscrypt encrypt "$user_dir" --source=pam_passphrase --quiet
echo "Encryption set up for user $user_id. Directory: $user_dir"
echo ""
echo "IMPORTANT: unlock the directory before starting the container:"
echo " fscrypt unlock $user_dir"
;;
esac

View file

@ -27,13 +27,32 @@ def client(test_db, tmp_path, monkeypatch):
(tmp_path / "books").mkdir(exist_ok=True)
import app.main as _main_module
from app.main import app, _bm25
from app.deps import get_db
from app.config import LOCAL_USER_ID
from app.deps import UserCtx, get_db, get_user_ctx
from app.main import app
from app.services.bm25_index import BM25Index
from app.startup import apply_migrations, check_and_rebuild_vec_schema
# Suppress startup side effects — test_db fixture already applies the schema,
# and vec schema validation is tested separately in test_startup.py
monkeypatch.setattr(_main_module, "_apply_migrations", lambda: None)
monkeypatch.setattr(_main_module, "_check_vec_schema", lambda *a, **kw: None)
monkeypatch.setattr(_main_module, "_apply_migrations", lambda: None, raising=False)
monkeypatch.setattr(
"app.startup.apply_migrations", lambda *a, **kw: None
)
monkeypatch.setattr(
"app.startup.check_and_rebuild_vec_schema", lambda *a, **kw: None
)
test_bm25 = BM25Index()
test_bm25.mark_dirty()
def override_user_ctx():
return UserCtx(
user_id=LOCAL_USER_ID,
db_path=test_db,
vec_db_path=str(tmp_path / "test_vecs.db"),
data_dir=Path(tmp_path),
watch_dir=Path(tmp_path) / "books",
bm25=test_bm25,
)
def override_db():
conn = sqlite3.connect(test_db)
@ -44,7 +63,7 @@ def client(test_db, tmp_path, monkeypatch):
finally:
conn.close()
app.dependency_overrides[get_user_ctx] = override_user_ctx
app.dependency_overrides[get_db] = override_db
_bm25.mark_dirty() # clear any state from previous tests
yield TestClient(app)
app.dependency_overrides.clear()

View file

@ -66,3 +66,40 @@ def test_reingest_updates_status_to_processing(client, test_db, tmp_path):
# Document should be in processing state (or beyond if stub ingest ran instantly)
status_resp = client.get(f"/api/library/{doc_id}/status")
assert status_resp.json()["status"] in ("processing", "error", "ready")
def _add_chunks(db_path: str, doc_id: str, count: int) -> None:
conn = sqlite3.connect(db_path)
for i in range(count):
conn.execute(
"INSERT INTO page_chunks(doc_id, page_number, text, source, word_count) VALUES (?,?,?,?,?)",
[doc_id, i + 1, f"Page {i + 1} text content.", "text_layer", 4],
)
conn.commit()
conn.close()
def test_sample_chunks_empty_returns_empty(client):
resp = client.get("/api/library/sample-chunks")
assert resp.status_code == 200
assert resp.json() == []
def test_sample_chunks_returns_fields(client, test_db):
doc_id = _add_doc(test_db, "Monster Manual", "/books/mm.pdf")
_add_chunks(test_db, doc_id, 5)
resp = client.get("/api/library/sample-chunks?limit=3")
assert resp.status_code == 200
chunks = resp.json()
assert len(chunks) == 3
for c in chunks:
assert {"chunk_id", "doc_id", "page_number", "text"} == set(c.keys())
assert c["doc_id"] == doc_id
def test_sample_chunks_limit_capped_at_200(client, test_db):
doc_id = _add_doc(test_db, "Big Book", "/books/big.pdf")
_add_chunks(test_db, doc_id, 10)
resp = client.get("/api/library/sample-chunks?limit=9999")
assert resp.status_code == 200
assert len(resp.json()) <= 200

View file

@ -20,9 +20,7 @@ def _add_chunks(db_path: str, doc_id: str, chunks: list[dict]) -> None:
conn.close()
def test_search_returns_results(client, test_db, monkeypatch):
import app.api.search as _search_mod
monkeypatch.setattr(_search_mod, "_get_db_path", lambda: test_db)
def test_search_returns_results(client, test_db):
# BM25Okapi IDF is 0 when df == N/2 (e.g. 2 docs, 1 match → log(1.0) = 0).
# Add a 3rd unrelated chunk so relevant terms score above zero.
_add_chunks(test_db, "book-a", [
@ -46,9 +44,7 @@ def test_search_empty_index_returns_empty(client):
assert resp.json() == []
def test_search_filters_by_doc_ids(client, test_db, monkeypatch):
import app.api.search as _search_mod
monkeypatch.setattr(_search_mod, "_get_db_path", lambda: test_db)
def test_search_filters_by_doc_ids(client, test_db):
# Three chunks so BM25Okapi IDF is non-zero for terms appearing in one doc.
_add_chunks(test_db, "book-a", [
{"page_number": 1, "text": "Grapple rules for melee attacks."},

View file

@ -9,7 +9,8 @@ from unittest.mock import MagicMock, patch
import pytest
from app.main import _check_vec_schema, _reembed_docs
from app.startup import check_and_rebuild_vec_schema as _check_vec_schema
from app.startup import reembed_docs as _reembed_docs
def _make_vec_db(path: str, dims: int) -> None: