From 4c2370f1dedeef95ac0ae0ae03e29428789f1237 Mon Sep 17 00:00:00 2001 From: pyr0ball Date: Mon, 4 May 2026 17:24:50 -0700 Subject: [PATCH] feat(api): add library CRUD endpoints and FastAPI factory Implements GET/DELETE /api/library, POST /api/library/{id}/reingest, POST /api/library/scan, and GET /api/library/{id}/status. Adds FastAPI app factory with lifespan migrations, BM25 singleton wiring, get_db dependency, ingest task registry with cf-orch/BackgroundTasks fallback, and placeholder search/chat routers. All 5 new tests pass (14 total). --- app/api/chat.py | 5 ++ app/api/ingest.py | 24 ++++++ app/api/library.py | 173 ++++++++++++++++++++++++++++++++++++++ app/api/search.py | 5 ++ app/deps.py | 19 +++++ app/main.py | 46 ++++++++++ scripts/ingest_pdf.py | 11 +++ tests/conftest.py | 44 ++++++++++ tests/test_library_api.py | 54 ++++++++++++ 9 files changed, 381 insertions(+) create mode 100644 app/api/chat.py create mode 100644 app/api/ingest.py create mode 100644 app/api/library.py create mode 100644 app/api/search.py create mode 100644 app/deps.py create mode 100644 app/main.py create mode 100644 scripts/ingest_pdf.py create mode 100644 tests/conftest.py create mode 100644 tests/test_library_api.py diff --git a/app/api/chat.py b/app/api/chat.py new file mode 100644 index 0000000..8260801 --- /dev/null +++ b/app/api/chat.py @@ -0,0 +1,5 @@ +# app/api/chat.py +"""RAG chat API — streaming LLM responses with page citations (Task 6).""" +from fastapi import APIRouter + +router = APIRouter(prefix="/api/chat", tags=["chat"]) diff --git a/app/api/ingest.py b/app/api/ingest.py new file mode 100644 index 0000000..318c948 --- /dev/null +++ b/app/api/ingest.py @@ -0,0 +1,24 @@ +# app/api/ingest.py +"""Ingest job status polling (proxies cf-orch or checks in-memory registry).""" +from __future__ import annotations + +from fastapi import APIRouter, HTTPException + +router = APIRouter(prefix="/api/ingest", tags=["ingest"]) + +# Populated by _run_ingest_background when cf-orch is unavailable +_task_registry: dict[str, dict] = {} + + +@router.get("/{task_id}") +def get_task_status(task_id: str) -> dict: + # Check in-memory registry first (BackgroundTasks fallback) + if task_id in _task_registry: + return _task_registry[task_id] + + # Try cf-orch + try: + from circuitforge_core.tasks import get_task_status as orch_status # type: ignore[import] + return orch_status(task_id) + except Exception: + raise HTTPException(status_code=404, detail="Task not found") diff --git a/app/api/library.py b/app/api/library.py new file mode 100644 index 0000000..2f10834 --- /dev/null +++ b/app/api/library.py @@ -0,0 +1,173 @@ +# app/api/library.py +""" +Document library management API. + +All endpoints in this module are MIT — no tier gate. +""" +from __future__ import annotations + +import logging +import sqlite3 +import uuid +from pathlib import Path +from typing import Callable + +from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException + +from app.config import WATCH_DIR, DB_PATH, VEC_DB_PATH +from app.deps import get_db + +logger = logging.getLogger(__name__) +router = APIRouter(prefix="/api/library", tags=["library"]) + +# Injected by main.py after _bm25 is created +_mark_bm25_dirty: Callable[[], None] | None = None + + +def _dispatch_ingest( + doc_id: str, + file_path: str, + background_tasks: BackgroundTasks, +) -> str: + """Dispatch an ingest task. Tries cf-orch; falls back to BackgroundTasks.""" + task_id = str(uuid.uuid4()) + args = { + "doc_id": doc_id, + "file_path": file_path, + "db_path": DB_PATH, + "vec_db_path": VEC_DB_PATH, + } + + try: + from circuitforge_core.tasks import dispatch_task # type: ignore[import] + task_id = dispatch_task(caller="pagepiper/ingest_pdf", args=args) + logger.info("Dispatched cf-orch ingest task %s for doc %s", task_id, doc_id) + except Exception: + from scripts.ingest_pdf import run as run_ingest + background_tasks.add_task(_run_ingest_background, run_ingest, args, task_id) + logger.info( + "cf-orch unavailable — running ingest in background thread (task %s)", task_id + ) + + return task_id + + +def _run_ingest_background(run_fn, args: dict, task_id: str) -> None: + from app.api.ingest import _task_registry + _task_registry[task_id] = {"status": "running", "progress": 0} + try: + run_fn(**args) + _task_registry[task_id] = {"status": "complete", "progress": 100} + if _mark_bm25_dirty: + _mark_bm25_dirty() + except Exception as exc: + _task_registry[task_id] = {"status": "error", "error": str(exc)} + + +@router.get("") +def list_library(db: sqlite3.Connection = Depends(get_db)) -> list[dict]: + rows = db.execute( + "SELECT id, title, file_path, status, task_id, page_count, created_at" + " FROM documents ORDER BY created_at DESC" + ).fetchall() + return [dict(r) for r in rows] + + +@router.post("/scan", status_code=202) +def scan_library( + background_tasks: BackgroundTasks, + db: sqlite3.Connection = Depends(get_db), +) -> dict: + """Scan the watched directory and queue ingest for any new PDFs.""" + watch = WATCH_DIR + if not watch.exists(): + raise HTTPException(status_code=404, detail=f"Watch directory not found: {watch}") + + pdfs = list(watch.glob("**/*.pdf")) + queued = [] + + for pdf_path in pdfs: + path_str = str(pdf_path.resolve()) + existing = db.execute( + "SELECT id, status FROM documents WHERE file_path = ?", [path_str] + ).fetchone() + + if existing and existing["status"] == "ready": + continue # already indexed + + if existing: + doc_id = existing["id"] + else: + title = pdf_path.stem.replace("_", " ").replace("-", " ").title() + doc_id = db.execute( + "INSERT INTO documents(title, file_path, status) VALUES (?,?,?) RETURNING id", + [title, path_str, "pending"], + ).fetchone()[0] + db.commit() + + task_id = _dispatch_ingest(doc_id, path_str, background_tasks) + db.execute( + "UPDATE documents SET status='processing', task_id=? WHERE id=?", + [task_id, doc_id], + ) + db.commit() + queued.append({"doc_id": doc_id, "task_id": task_id}) + + return {"discovered": len(pdfs), "queued": len(queued), "tasks": queued} + + +@router.post("/{doc_id}/reingest", status_code=202) +def reingest_document( + doc_id: str, + background_tasks: BackgroundTasks, + db: sqlite3.Connection = Depends(get_db), +) -> dict: + row = db.execute("SELECT file_path FROM documents WHERE id=?", [doc_id]).fetchone() + if not row: + raise HTTPException(status_code=404, detail="Document not found") + + task_id = _dispatch_ingest(doc_id, row["file_path"], background_tasks) + db.execute( + "UPDATE documents SET status='processing', task_id=?, error_msg=NULL WHERE id=?", + [task_id, doc_id], + ) + db.commit() + return {"doc_id": doc_id, "task_id": task_id} + + +@router.delete("/{doc_id}", status_code=204) +def delete_document( + doc_id: str, + db: sqlite3.Connection = Depends(get_db), +) -> None: + row = db.execute("SELECT id FROM documents WHERE id=?", [doc_id]).fetchone() + if not row: + raise HTTPException(status_code=404, detail="Document not found") + + db.execute("DELETE FROM documents WHERE id=?", [doc_id]) + db.commit() + + # Remove embeddings from vector store + try: + from circuitforge_core.vector.sqlite_vec import LocalSQLiteVecStore # type: ignore[import] + store = LocalSQLiteVecStore(db_path=VEC_DB_PATH, table="page_vecs", dimensions=768) + store.delete_where({"doc_id": doc_id}) + except Exception as exc: + logger.warning("Could not remove vectors for doc %s: %s", doc_id, exc) + + if _mark_bm25_dirty: + _mark_bm25_dirty() + + +@router.get("/{doc_id}/status") +def document_status( + doc_id: str, + db: sqlite3.Connection = Depends(get_db), +) -> dict: + row = db.execute( + "SELECT id, status, task_id, page_count, error_msg FROM documents WHERE id=?", + [doc_id], + ).fetchone() + if not row: + raise HTTPException(status_code=404, detail="Document not found") + return dict(row) diff --git a/app/api/search.py b/app/api/search.py new file mode 100644 index 0000000..7c47da4 --- /dev/null +++ b/app/api/search.py @@ -0,0 +1,5 @@ +# app/api/search.py +"""Search API — BM25 keyword and RAG vector search (Task 5).""" +from fastapi import APIRouter + +router = APIRouter(prefix="/api/search", tags=["search"]) diff --git a/app/deps.py b/app/deps.py new file mode 100644 index 0000000..e795afb --- /dev/null +++ b/app/deps.py @@ -0,0 +1,19 @@ +# app/deps.py +"""FastAPI dependency providers.""" +from __future__ import annotations + +import sqlite3 +from typing import Generator + +from app.config import DB_PATH + + +def get_db() -> Generator[sqlite3.Connection, None, None]: + conn = sqlite3.connect(DB_PATH) + conn.execute("PRAGMA foreign_keys = ON") + conn.execute("PRAGMA journal_mode = WAL") + conn.row_factory = sqlite3.Row + try: + yield conn + finally: + conn.close() diff --git a/app/main.py b/app/main.py new file mode 100644 index 0000000..7469d3d --- /dev/null +++ b/app/main.py @@ -0,0 +1,46 @@ +# app/main.py +"""FastAPI application factory for pagepiper.""" +from __future__ import annotations + +import logging +from contextlib import asynccontextmanager + +from fastapi import FastAPI + +from app.config import DB_PATH +from app.services.bm25_index import BM25Index + +logger = logging.getLogger("pagepiper") + +# Module-level BM25 singleton — shared across all requests +_bm25 = BM25Index() + + +def _apply_migrations() -> None: + from scripts.db_migrate import migrate + migrate(DB_PATH) + + +@asynccontextmanager +async def lifespan(app: FastAPI): + _apply_migrations() + _bm25.mark_dirty() # will rebuild on first search + yield + + +app = FastAPI(title="Pagepiper", lifespan=lifespan) + +# Wire BM25 dirty callback into library router +from app.api import library as _lib_module # noqa: E402 +_lib_module._mark_bm25_dirty = _bm25.mark_dirty + +# Register routers +from app.api.library import router as library_router # noqa: E402 +from app.api.ingest import router as ingest_router # noqa: E402 +from app.api.search import router as search_router # noqa: E402 +from app.api.chat import router as chat_router # noqa: E402 + +app.include_router(library_router) +app.include_router(ingest_router) +app.include_router(search_router) +app.include_router(chat_router) diff --git a/scripts/ingest_pdf.py b/scripts/ingest_pdf.py new file mode 100644 index 0000000..ff87b6f --- /dev/null +++ b/scripts/ingest_pdf.py @@ -0,0 +1,11 @@ +# scripts/ingest_pdf.py +"""Ingest script stub — full implementation in T5.""" +from __future__ import annotations + +import logging + +logger = logging.getLogger("pagepiper.ingest") + + +def run(doc_id: str, file_path: str, db_path: str, vec_db_path: str) -> None: + logger.info("Stub ingest run for doc %s at %s", doc_id, file_path) diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..c204030 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,44 @@ +# tests/conftest.py +"""Shared fixtures for pagepiper test suite.""" +from __future__ import annotations + +import sqlite3 +from pathlib import Path + +import pytest +from fastapi.testclient import TestClient + + +@pytest.fixture +def test_db(tmp_path) -> str: + db_path = str(tmp_path / "test.db") + schema = Path("migrations/001_initial_schema.sql").read_text() + conn = sqlite3.connect(db_path) + conn.executescript(schema) + conn.commit() + conn.close() + return db_path + + +@pytest.fixture +def client(test_db, tmp_path, monkeypatch): + monkeypatch.setenv("PAGEPIPER_DATA_DIR", str(tmp_path)) + monkeypatch.setenv("PAGEPIPER_WATCH_DIR", str(tmp_path / "books")) + (tmp_path / "books").mkdir(exist_ok=True) + + from app.main import app, _bm25 + from app.deps import get_db + + def override_db(): + conn = sqlite3.connect(test_db) + conn.execute("PRAGMA foreign_keys = ON") + conn.row_factory = sqlite3.Row + try: + yield conn + finally: + conn.close() + + app.dependency_overrides[get_db] = override_db + _bm25.mark_dirty() # clear any state from previous tests + yield TestClient(app) + app.dependency_overrides.clear() diff --git a/tests/test_library_api.py b/tests/test_library_api.py new file mode 100644 index 0000000..180d14b --- /dev/null +++ b/tests/test_library_api.py @@ -0,0 +1,54 @@ +# tests/test_library_api.py +"""Tests for GET/POST /api/library endpoints.""" +from __future__ import annotations + +import sqlite3 + + +def _add_doc(db_path: str, title: str, path: str, status: str = "ready") -> str: + conn = sqlite3.connect(db_path) + doc_id = conn.execute( + "INSERT INTO documents(title, file_path, status) VALUES (?,?,?) RETURNING id", + [title, path, status], + ).fetchone()[0] + conn.commit() + conn.close() + return doc_id + + +def test_list_library_empty(client): + resp = client.get("/api/library") + assert resp.status_code == 200 + assert resp.json() == [] + + +def test_list_library_returns_documents(client, test_db): + _add_doc(test_db, "Player's Handbook", "/books/phb.pdf") + resp = client.get("/api/library") + assert resp.status_code == 200 + docs = resp.json() + assert len(docs) == 1 + assert docs[0]["title"] == "Player's Handbook" + assert "status" in docs[0] + + +def test_delete_document_removes_record(client, test_db): + doc_id = _add_doc(test_db, "Monster Manual", "/books/mm.pdf") + resp = client.delete(f"/api/library/{doc_id}") + assert resp.status_code == 204 + resp2 = client.get("/api/library") + assert resp2.json() == [] + + +def test_delete_nonexistent_returns_404(client): + resp = client.delete("/api/library/does-not-exist") + assert resp.status_code == 404 + + +def test_reingest_returns_task_id(client, test_db, tmp_path): + pdf_path = str(tmp_path / "books" / "test.pdf") + open(pdf_path, "wb").write(b"%PDF-1.4") + doc_id = _add_doc(test_db, "Test Book", pdf_path) + resp = client.post(f"/api/library/{doc_id}/reingest") + assert resp.status_code == 202 + assert "task_id" in resp.json()