# app/api/library.py """ Document library management API. All endpoints in this module are MIT — no tier gate. """ from __future__ import annotations import logging import sqlite3 import uuid from pathlib import Path from typing import Callable from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, UploadFile from app.config import WATCH_DIR, DB_PATH, VEC_DB_PATH, DATA_DIR from app.deps import get_db _MAX_UPLOAD_BYTES = 200 * 1024 * 1024 # 200 MB logger = logging.getLogger(__name__) router = APIRouter(prefix="/api/library", tags=["library"]) # Injected by main.py after _bm25 is created _mark_bm25_dirty: Callable[[], None] | None = None _INGEST_TASKS = { ".pdf": "pagepiper/ingest_pdf", ".epub": "pagepiper/ingest_epub", } _INGEST_RUNNERS = { ".pdf": "scripts.ingest_pdf", ".epub": "scripts.ingest_epub", } def _dispatch_ingest( doc_id: str, file_path: str, background_tasks: BackgroundTasks, ) -> str: """Dispatch an ingest task. Tries cf-orch; falls back to BackgroundTasks.""" import importlib import os as _os from pathlib import Path as _Path suffix = _Path(file_path).suffix.lower() task_name = _INGEST_TASKS.get(suffix, "pagepiper/ingest_pdf") runner_module = _INGEST_RUNNERS.get(suffix, "scripts.ingest_pdf") # Read lazily so test fixtures (monkeypatch.setenv) take effect _data_dir = _Path(_os.environ.get("PAGEPIPER_DATA_DIR", "data")) task_id = str(uuid.uuid4()) args = { "doc_id": doc_id, "file_path": file_path, "db_path": str(_data_dir / "pagepiper.db"), "vec_db_path": str(_data_dir / "pagepiper_vecs.db"), } try: from circuitforge_core.tasks import dispatch_task # type: ignore[import] task_id = dispatch_task(caller=task_name, args=args) logger.info("Dispatched cf-orch ingest task %s for doc %s", task_id, doc_id) except Exception: mod = importlib.import_module(runner_module) background_tasks.add_task(_run_ingest_background, mod.run, args, task_id) logger.info( "cf-orch unavailable — running ingest in background thread (task %s)", task_id ) return task_id def _run_ingest_background(run_fn: Callable[..., None], args: dict, task_id: str) -> None: from app.api.ingest import _task_registry _task_registry[task_id] = {"status": "running", "progress": 0} try: run_fn(**args) _task_registry[task_id] = {"status": "complete", "progress": 100} if _mark_bm25_dirty: _mark_bm25_dirty() except Exception as exc: logger.exception("Ingest task %s failed", task_id) _task_registry[task_id] = {"status": "error", "error": str(exc)} @router.get("") def list_library(db: sqlite3.Connection = Depends(get_db)) -> list[dict]: rows = db.execute( "SELECT id, title, file_path, status, task_id, page_count, created_at" " FROM documents ORDER BY created_at DESC" ).fetchall() return [dict(r) for r in rows] @router.post("/scan", status_code=202) def scan_library( background_tasks: BackgroundTasks, db: sqlite3.Connection = Depends(get_db), ) -> dict: """Scan the watched directory and queue ingest for any new PDFs.""" watch = WATCH_DIR if not watch.exists(): raise HTTPException(status_code=404, detail=f"Watch directory not found: {watch}") pdfs = list(watch.glob("**/*.pdf")) + list(watch.glob("**/*.epub")) queued = [] for pdf_path in pdfs: path_str = str(pdf_path.resolve()) existing = db.execute( "SELECT id, status FROM documents WHERE file_path = ?", [path_str] ).fetchone() if existing and existing["status"] == "ready": continue # already indexed if existing: doc_id = existing["id"] else: title = pdf_path.stem.replace("_", " ").replace("-", " ").title() doc_id = db.execute( "INSERT INTO documents(title, file_path, status) VALUES (?,?,?) RETURNING id", [title, path_str, "pending"], ).fetchone()[0] db.commit() task_id = _dispatch_ingest(doc_id, path_str, background_tasks) db.execute( "UPDATE documents SET status='processing', task_id=? WHERE id=?", [task_id, doc_id], ) db.commit() queued.append({"doc_id": doc_id, "task_id": task_id}) return {"discovered": len(pdfs), "queued": len(queued), "tasks": queued} @router.post("/{doc_id}/reingest", status_code=202) def reingest_document( doc_id: str, background_tasks: BackgroundTasks, db: sqlite3.Connection = Depends(get_db), ) -> dict: row = db.execute("SELECT file_path FROM documents WHERE id=?", [doc_id]).fetchone() if not row: raise HTTPException(status_code=404, detail="Document not found") task_id = _dispatch_ingest(doc_id, row["file_path"], background_tasks) db.execute( "UPDATE documents SET status='processing', task_id=?, error_msg=NULL WHERE id=?", [task_id, doc_id], ) db.commit() return {"doc_id": doc_id, "task_id": task_id} @router.delete("/{doc_id}", status_code=204) def delete_document( doc_id: str, db: sqlite3.Connection = Depends(get_db), ) -> None: row = db.execute("SELECT id FROM documents WHERE id=?", [doc_id]).fetchone() if not row: raise HTTPException(status_code=404, detail="Document not found") db.execute("DELETE FROM documents WHERE id=?", [doc_id]) db.commit() # Remove embeddings from vector store try: from circuitforge_core.vector.sqlite_vec import LocalSQLiteVecStore # type: ignore[import] from app.config import VEC_DIMENSIONS store = LocalSQLiteVecStore(db_path=VEC_DB_PATH, table="page_vecs", dimensions=VEC_DIMENSIONS) store.delete_where({"doc_id": doc_id}) except Exception as exc: logger.warning("Could not remove vectors for doc %s: %s", doc_id, exc) if _mark_bm25_dirty: _mark_bm25_dirty() def _get_vec_count(doc_id: str) -> int: """Return how many vectors have been stored for this doc. Returns 0 on any error.""" try: conn = sqlite3.connect(VEC_DB_PATH) count = conn.execute( "SELECT COUNT(*) FROM page_vecs_meta WHERE json_extract(metadata, '$.doc_id') = ?", [doc_id], ).fetchone()[0] conn.close() return int(count) except Exception: return 0 @router.get("/{doc_id}/status") def document_status( doc_id: str, db: sqlite3.Connection = Depends(get_db), ) -> dict: row = db.execute( "SELECT id, status, task_id, page_count, error_msg FROM documents WHERE id=?", [doc_id], ).fetchone() if not row: raise HTTPException(status_code=404, detail="Document not found") result = dict(row) result["vec_count"] = _get_vec_count(doc_id) return result @router.post("/upload", status_code=202) def upload_document( file: UploadFile, background_tasks: BackgroundTasks, db: sqlite3.Connection = Depends(get_db), ) -> dict: """Accept a PDF/EPUB upload, save to data/uploads/, and queue for indexing.""" name = Path(file.filename or "").name suffix = Path(name).suffix.lower() if suffix not in _INGEST_TASKS: raise HTTPException(status_code=400, detail="Supported formats: PDF, EPUB") content = file.file.read() if len(content) > _MAX_UPLOAD_BYTES: raise HTTPException(status_code=413, detail="File exceeds 200 MB limit") upload_dir = DATA_DIR / "uploads" upload_dir.mkdir(parents=True, exist_ok=True) dest = upload_dir / name dest.write_bytes(content) path_str = str(dest.resolve()) existing = db.execute( "SELECT id, status FROM documents WHERE file_path = ?", [path_str] ).fetchone() if existing and existing["status"] == "ready": return {"doc_id": existing["id"], "task_id": None, "filename": name, "status": "already_indexed"} if existing: doc_id = existing["id"] else: title = dest.stem.replace("_", " ").replace("-", " ").title() doc_id = db.execute( "INSERT INTO documents(title, file_path, status) VALUES (?,?,?) RETURNING id", [title, path_str, "pending"], ).fetchone()[0] db.commit() task_id = _dispatch_ingest(doc_id, path_str, background_tasks) db.execute( "UPDATE documents SET status='processing', task_id=? WHERE id=?", [task_id, doc_id], ) db.commit() return {"doc_id": doc_id, "task_id": task_id, "filename": name, "status": "queued"}