feat(api): add library CRUD endpoints and FastAPI factory

Implements GET/DELETE /api/library, POST /api/library/{id}/reingest,
POST /api/library/scan, and GET /api/library/{id}/status. Adds FastAPI
app factory with lifespan migrations, BM25 singleton wiring, get_db
dependency, ingest task registry with cf-orch/BackgroundTasks fallback,
and placeholder search/chat routers. All 5 new tests pass (14 total).
This commit is contained in:
pyr0ball 2026-05-04 17:24:50 -07:00
parent 47914cebeb
commit 4c2370f1de
9 changed files with 381 additions and 0 deletions

5
app/api/chat.py Normal file
View file

@ -0,0 +1,5 @@
# app/api/chat.py
"""RAG chat API — streaming LLM responses with page citations (Task 6)."""
from fastapi import APIRouter
router = APIRouter(prefix="/api/chat", tags=["chat"])

24
app/api/ingest.py Normal file
View file

@ -0,0 +1,24 @@
# app/api/ingest.py
"""Ingest job status polling (proxies cf-orch or checks in-memory registry)."""
from __future__ import annotations
from fastapi import APIRouter, HTTPException
router = APIRouter(prefix="/api/ingest", tags=["ingest"])
# Populated by _run_ingest_background when cf-orch is unavailable
_task_registry: dict[str, dict] = {}
@router.get("/{task_id}")
def get_task_status(task_id: str) -> dict:
# Check in-memory registry first (BackgroundTasks fallback)
if task_id in _task_registry:
return _task_registry[task_id]
# Try cf-orch
try:
from circuitforge_core.tasks import get_task_status as orch_status # type: ignore[import]
return orch_status(task_id)
except Exception:
raise HTTPException(status_code=404, detail="Task not found")

173
app/api/library.py Normal file
View file

@ -0,0 +1,173 @@
# app/api/library.py
"""
Document library management API.
All endpoints in this module are MIT no tier gate.
"""
from __future__ import annotations
import logging
import sqlite3
import uuid
from pathlib import Path
from typing import Callable
from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException
from app.config import WATCH_DIR, DB_PATH, VEC_DB_PATH
from app.deps import get_db
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/library", tags=["library"])
# Injected by main.py after _bm25 is created
_mark_bm25_dirty: Callable[[], None] | None = None
def _dispatch_ingest(
doc_id: str,
file_path: str,
background_tasks: BackgroundTasks,
) -> str:
"""Dispatch an ingest task. Tries cf-orch; falls back to BackgroundTasks."""
task_id = str(uuid.uuid4())
args = {
"doc_id": doc_id,
"file_path": file_path,
"db_path": DB_PATH,
"vec_db_path": VEC_DB_PATH,
}
try:
from circuitforge_core.tasks import dispatch_task # type: ignore[import]
task_id = dispatch_task(caller="pagepiper/ingest_pdf", args=args)
logger.info("Dispatched cf-orch ingest task %s for doc %s", task_id, doc_id)
except Exception:
from scripts.ingest_pdf import run as run_ingest
background_tasks.add_task(_run_ingest_background, run_ingest, args, task_id)
logger.info(
"cf-orch unavailable — running ingest in background thread (task %s)", task_id
)
return task_id
def _run_ingest_background(run_fn, args: dict, task_id: str) -> None:
from app.api.ingest import _task_registry
_task_registry[task_id] = {"status": "running", "progress": 0}
try:
run_fn(**args)
_task_registry[task_id] = {"status": "complete", "progress": 100}
if _mark_bm25_dirty:
_mark_bm25_dirty()
except Exception as exc:
_task_registry[task_id] = {"status": "error", "error": str(exc)}
@router.get("")
def list_library(db: sqlite3.Connection = Depends(get_db)) -> list[dict]:
rows = db.execute(
"SELECT id, title, file_path, status, task_id, page_count, created_at"
" FROM documents ORDER BY created_at DESC"
).fetchall()
return [dict(r) for r in rows]
@router.post("/scan", status_code=202)
def scan_library(
background_tasks: BackgroundTasks,
db: sqlite3.Connection = Depends(get_db),
) -> dict:
"""Scan the watched directory and queue ingest for any new PDFs."""
watch = WATCH_DIR
if not watch.exists():
raise HTTPException(status_code=404, detail=f"Watch directory not found: {watch}")
pdfs = list(watch.glob("**/*.pdf"))
queued = []
for pdf_path in pdfs:
path_str = str(pdf_path.resolve())
existing = db.execute(
"SELECT id, status FROM documents WHERE file_path = ?", [path_str]
).fetchone()
if existing and existing["status"] == "ready":
continue # already indexed
if existing:
doc_id = existing["id"]
else:
title = pdf_path.stem.replace("_", " ").replace("-", " ").title()
doc_id = db.execute(
"INSERT INTO documents(title, file_path, status) VALUES (?,?,?) RETURNING id",
[title, path_str, "pending"],
).fetchone()[0]
db.commit()
task_id = _dispatch_ingest(doc_id, path_str, background_tasks)
db.execute(
"UPDATE documents SET status='processing', task_id=? WHERE id=?",
[task_id, doc_id],
)
db.commit()
queued.append({"doc_id": doc_id, "task_id": task_id})
return {"discovered": len(pdfs), "queued": len(queued), "tasks": queued}
@router.post("/{doc_id}/reingest", status_code=202)
def reingest_document(
doc_id: str,
background_tasks: BackgroundTasks,
db: sqlite3.Connection = Depends(get_db),
) -> dict:
row = db.execute("SELECT file_path FROM documents WHERE id=?", [doc_id]).fetchone()
if not row:
raise HTTPException(status_code=404, detail="Document not found")
task_id = _dispatch_ingest(doc_id, row["file_path"], background_tasks)
db.execute(
"UPDATE documents SET status='processing', task_id=?, error_msg=NULL WHERE id=?",
[task_id, doc_id],
)
db.commit()
return {"doc_id": doc_id, "task_id": task_id}
@router.delete("/{doc_id}", status_code=204)
def delete_document(
doc_id: str,
db: sqlite3.Connection = Depends(get_db),
) -> None:
row = db.execute("SELECT id FROM documents WHERE id=?", [doc_id]).fetchone()
if not row:
raise HTTPException(status_code=404, detail="Document not found")
db.execute("DELETE FROM documents WHERE id=?", [doc_id])
db.commit()
# Remove embeddings from vector store
try:
from circuitforge_core.vector.sqlite_vec import LocalSQLiteVecStore # type: ignore[import]
store = LocalSQLiteVecStore(db_path=VEC_DB_PATH, table="page_vecs", dimensions=768)
store.delete_where({"doc_id": doc_id})
except Exception as exc:
logger.warning("Could not remove vectors for doc %s: %s", doc_id, exc)
if _mark_bm25_dirty:
_mark_bm25_dirty()
@router.get("/{doc_id}/status")
def document_status(
doc_id: str,
db: sqlite3.Connection = Depends(get_db),
) -> dict:
row = db.execute(
"SELECT id, status, task_id, page_count, error_msg FROM documents WHERE id=?",
[doc_id],
).fetchone()
if not row:
raise HTTPException(status_code=404, detail="Document not found")
return dict(row)

5
app/api/search.py Normal file
View file

@ -0,0 +1,5 @@
# app/api/search.py
"""Search API — BM25 keyword and RAG vector search (Task 5)."""
from fastapi import APIRouter
router = APIRouter(prefix="/api/search", tags=["search"])

19
app/deps.py Normal file
View file

@ -0,0 +1,19 @@
# app/deps.py
"""FastAPI dependency providers."""
from __future__ import annotations
import sqlite3
from typing import Generator
from app.config import DB_PATH
def get_db() -> Generator[sqlite3.Connection, None, None]:
conn = sqlite3.connect(DB_PATH)
conn.execute("PRAGMA foreign_keys = ON")
conn.execute("PRAGMA journal_mode = WAL")
conn.row_factory = sqlite3.Row
try:
yield conn
finally:
conn.close()

46
app/main.py Normal file
View file

@ -0,0 +1,46 @@
# app/main.py
"""FastAPI application factory for pagepiper."""
from __future__ import annotations
import logging
from contextlib import asynccontextmanager
from fastapi import FastAPI
from app.config import DB_PATH
from app.services.bm25_index import BM25Index
logger = logging.getLogger("pagepiper")
# Module-level BM25 singleton — shared across all requests
_bm25 = BM25Index()
def _apply_migrations() -> None:
from scripts.db_migrate import migrate
migrate(DB_PATH)
@asynccontextmanager
async def lifespan(app: FastAPI):
_apply_migrations()
_bm25.mark_dirty() # will rebuild on first search
yield
app = FastAPI(title="Pagepiper", lifespan=lifespan)
# Wire BM25 dirty callback into library router
from app.api import library as _lib_module # noqa: E402
_lib_module._mark_bm25_dirty = _bm25.mark_dirty
# Register routers
from app.api.library import router as library_router # noqa: E402
from app.api.ingest import router as ingest_router # noqa: E402
from app.api.search import router as search_router # noqa: E402
from app.api.chat import router as chat_router # noqa: E402
app.include_router(library_router)
app.include_router(ingest_router)
app.include_router(search_router)
app.include_router(chat_router)

11
scripts/ingest_pdf.py Normal file
View file

@ -0,0 +1,11 @@
# scripts/ingest_pdf.py
"""Ingest script stub — full implementation in T5."""
from __future__ import annotations
import logging
logger = logging.getLogger("pagepiper.ingest")
def run(doc_id: str, file_path: str, db_path: str, vec_db_path: str) -> None:
logger.info("Stub ingest run for doc %s at %s", doc_id, file_path)

44
tests/conftest.py Normal file
View file

@ -0,0 +1,44 @@
# tests/conftest.py
"""Shared fixtures for pagepiper test suite."""
from __future__ import annotations
import sqlite3
from pathlib import Path
import pytest
from fastapi.testclient import TestClient
@pytest.fixture
def test_db(tmp_path) -> str:
db_path = str(tmp_path / "test.db")
schema = Path("migrations/001_initial_schema.sql").read_text()
conn = sqlite3.connect(db_path)
conn.executescript(schema)
conn.commit()
conn.close()
return db_path
@pytest.fixture
def client(test_db, tmp_path, monkeypatch):
monkeypatch.setenv("PAGEPIPER_DATA_DIR", str(tmp_path))
monkeypatch.setenv("PAGEPIPER_WATCH_DIR", str(tmp_path / "books"))
(tmp_path / "books").mkdir(exist_ok=True)
from app.main import app, _bm25
from app.deps import get_db
def override_db():
conn = sqlite3.connect(test_db)
conn.execute("PRAGMA foreign_keys = ON")
conn.row_factory = sqlite3.Row
try:
yield conn
finally:
conn.close()
app.dependency_overrides[get_db] = override_db
_bm25.mark_dirty() # clear any state from previous tests
yield TestClient(app)
app.dependency_overrides.clear()

54
tests/test_library_api.py Normal file
View file

@ -0,0 +1,54 @@
# tests/test_library_api.py
"""Tests for GET/POST /api/library endpoints."""
from __future__ import annotations
import sqlite3
def _add_doc(db_path: str, title: str, path: str, status: str = "ready") -> str:
conn = sqlite3.connect(db_path)
doc_id = conn.execute(
"INSERT INTO documents(title, file_path, status) VALUES (?,?,?) RETURNING id",
[title, path, status],
).fetchone()[0]
conn.commit()
conn.close()
return doc_id
def test_list_library_empty(client):
resp = client.get("/api/library")
assert resp.status_code == 200
assert resp.json() == []
def test_list_library_returns_documents(client, test_db):
_add_doc(test_db, "Player's Handbook", "/books/phb.pdf")
resp = client.get("/api/library")
assert resp.status_code == 200
docs = resp.json()
assert len(docs) == 1
assert docs[0]["title"] == "Player's Handbook"
assert "status" in docs[0]
def test_delete_document_removes_record(client, test_db):
doc_id = _add_doc(test_db, "Monster Manual", "/books/mm.pdf")
resp = client.delete(f"/api/library/{doc_id}")
assert resp.status_code == 204
resp2 = client.get("/api/library")
assert resp2.json() == []
def test_delete_nonexistent_returns_404(client):
resp = client.delete("/api/library/does-not-exist")
assert resp.status_code == 404
def test_reingest_returns_task_id(client, test_db, tmp_path):
pdf_path = str(tmp_path / "books" / "test.pdf")
open(pdf_path, "wb").write(b"%PDF-1.4")
doc_id = _add_doc(test_db, "Test Book", pdf_path)
resp = client.post(f"/api/library/{doc_id}/reingest")
assert resp.status_code == 202
assert "task_id" in resp.json()