diff --git a/scripts/ingest_pdf.py b/scripts/ingest_pdf.py
index ff87b6f..5115480 100644
--- a/scripts/ingest_pdf.py
+++ b/scripts/ingest_pdf.py
@@ -1,11 +1,170 @@
 # scripts/ingest_pdf.py
-"""Ingest script stub — full implementation in T5."""
+"""
+cf-orch task: pagepiper/ingest_pdf
+
+Extracts text from a PDF, stores page chunks in SQLite, and (if Ollama is
+configured) generates embeddings and stores them in the sqlite-vec store.
+
+Entry point:
+    python scripts/ingest_pdf.py --doc-id X --file-path Y --db-path Z --vec-db-path W
+"""
 from __future__ import annotations
 
 import logging
+import os
+import sqlite3
+from pathlib import Path
 
 logger = logging.getLogger("pagepiper.ingest")
 
 
+def _update_status(
+    conn: sqlite3.Connection,
+    doc_id: str,
+    status: str,
+    page_count: int | None = None,
+    error_msg: str | None = None,
+) -> None:
+    """Set the status of one ``documents`` row and commit.
+
+    ``page_count`` and ``error_msg`` are written only when not ``None``; when
+    both are supplied, both are stored. The commit also flushes any other
+    work pending on *conn*, so roll back first if that must be discarded.
+    """
+    # Only fixed column fragments are interpolated; every value is bound.
+    assignments = ["status=?"]
+    params: list[str | int] = [status]
+    if page_count is not None:
+        assignments.append("page_count=?")
+        params.append(page_count)
+    if error_msg is not None:
+        assignments.append("error_msg=?")
+        params.append(error_msg)
+    assignments.append("updated_at=datetime('now')")
+    params.append(doc_id)
+    conn.execute(
+        f"UPDATE documents SET {', '.join(assignments)} WHERE id=?",
+        params,
+    )
+    conn.commit()
+
+
 def run(doc_id: str, file_path: str, db_path: str, vec_db_path: str) -> None:
-    logger.info("Stub ingest run for doc %s at %s", doc_id, file_path)
+    """Run the full ingest pipeline for one PDF.
+
+    Called by cf-orch or BackgroundTasks. Marks the document "processing",
+    replaces its page_chunks rows, embeds the pages when PAGEPIPER_OLLAMA_URL
+    is set, then marks it "ready". On any failure the uncommitted chunk
+    writes are rolled back, the document is marked "error", and the
+    exception is re-raised.
+    """
+    from circuitforge_core.documents.pdf import PDFExtractor
+
+    conn = sqlite3.connect(db_path)
+    conn.execute("PRAGMA foreign_keys = ON")
+
+    try:
+        _update_status(conn, doc_id, "processing")
+
+        # Step 1: Extract page chunks
+        logger.info("Extracting text from %s", file_path)
+        extractor = PDFExtractor(ocr_min_words=10)
+        chunks = extractor.chunk_pages(file_path)
+        logger.info("Extracted %d pages", len(chunks))
+
+        # Step 2: Store chunks (replace any existing for this doc)
+        conn.execute("DELETE FROM page_chunks WHERE doc_id=?", [doc_id])
+        chunk_rows: list[tuple[str, int, str]] = []
+        for chunk in chunks:
+            row = conn.execute(
+                """INSERT INTO page_chunks(doc_id, page_number, text, source, word_count)
+                   VALUES (?,?,?,?,?) RETURNING id""",
+                [doc_id, chunk.page_number, chunk.text, chunk.source, chunk.word_count],
+            ).fetchone()
+            chunk_rows.append((row[0], chunk.page_number, chunk.text))
+        conn.commit()
+
+        # Step 3: Embed and store vectors if Ollama is configured (BYOK gate)
+        ollama_url = os.environ.get("PAGEPIPER_OLLAMA_URL", "").strip()
+        if ollama_url and chunks:
+            logger.info("Embedding %d pages via Ollama at %s", len(chunks), ollama_url)
+            from circuitforge_core.llm import LLMRouter
+            from circuitforge_core.vector.sqlite_vec import LocalSQLiteVecStore
+
+            _clean = ollama_url.rstrip("/")
+            base_url = _clean if _clean.endswith("/v1") else _clean + "/v1"
+            router = LLMRouter({
+                "fallback_order": ["ollama"],
+                "backends": {
+                    "ollama": {
+                        "type": "openai_compat",
+                        "base_url": base_url,
+                        "model": os.environ.get("PAGEPIPER_CHAT_MODEL", "mistral:7b"),
+                        "embedding_model": os.environ.get(
+                            "PAGEPIPER_EMBED_MODEL", "nomic-embed-text"
+                        ),
+                        "supports_images": False,
+                    }
+                },
+            })
+            vec_store = LocalSQLiteVecStore(
+                db_path=vec_db_path, table="page_vecs", dimensions=768
+            )
+            # Remove old vectors for this doc before re-inserting
+            vec_store.delete_where({"doc_id": doc_id})
+
+            texts = [text for _, _, text in chunk_rows]
+            vectors = router.embed(texts)
+            # zip() would silently drop pages if the backend returned fewer
+            # vectors than texts, so fail loudly instead.
+            if len(vectors) != len(chunk_rows):
+                raise ValueError(
+                    f"Embedding backend returned {len(vectors)} vectors "
+                    f"for {len(chunk_rows)} pages"
+                )
+
+            for (chunk_id, page_number, _), vector in zip(chunk_rows, vectors):
+                vec_store.upsert(
+                    entry_id=chunk_id,
+                    vector=vector,
+                    metadata={"doc_id": doc_id, "page_number": page_number},
+                )
+            logger.info("Stored %d embeddings", len(vectors))
+
+        _update_status(conn, doc_id, "ready", page_count=len(chunks))
+        logger.info("Ingest complete for doc %s (%d pages)", doc_id, len(chunks))
+
+    except Exception as exc:
+        logger.error("Ingest failed for doc %s: %s", doc_id, exc, exc_info=True)
+        try:
+            # _update_status commits, so discard the uncommitted chunk writes
+            # first; otherwise a half-replaced page_chunks set is persisted.
+            conn.rollback()
+            _update_status(conn, doc_id, "error", error_msg=str(exc))
+        except sqlite3.Error:
+            # Bookkeeping failures must not mask the original exception.
+            logger.exception("Could not record error status for doc %s", doc_id)
+        raise
+    finally:
+        conn.close()
+
+
+if __name__ == "__main__":
+    import argparse
+
+    logging.basicConfig(level=logging.INFO)
+
+    parser = argparse.ArgumentParser(
+        description="Ingest a PDF (cf-orch task entry point)"
+    )
+    parser.add_argument("--doc-id", required=True)
+    parser.add_argument("--file-path", required=True)
+    parser.add_argument("--db-path", required=True)
+    parser.add_argument("--vec-db-path", required=True)
+    a = parser.parse_args()
+    run(
+        doc_id=a.doc_id,
+        file_path=a.file_path,
+        db_path=a.db_path,
+        vec_db_path=a.vec_db_path,
+    )
diff --git a/tests/test_ingest.py b/tests/test_ingest.py
new file mode 100644
index 0000000..2e82838
--- /dev/null
+++ b/tests/test_ingest.py
@@ -0,0 +1,152 @@
+# tests/test_ingest.py
+"""Unit tests for scripts/ingest_pdf.py."""
+from __future__ import annotations
+
+import sqlite3
+from pathlib import Path
+from unittest.mock import MagicMock, patch
+
+import pytest
+
+# Anchor the schema to this file so the tests do not depend on the cwd.
+_SCHEMA_PATH = (
+    Path(__file__).resolve().parent.parent / "migrations" / "001_initial_schema.sql"
+)
+
+
+@pytest.fixture(autouse=True)
+def _no_ollama(monkeypatch) -> None:
+    """Keep tests offline even if the developer's shell exports the URL."""
+    monkeypatch.delenv("PAGEPIPER_OLLAMA_URL", raising=False)
+
+
+@pytest.fixture
+def ingest_db(tmp_path) -> tuple[str, str]:
+    """Return (db_path, vec_db_path) for a DB built from the initial schema with pending doc ``d1``."""
+    db_path = str(tmp_path / "test.db")
+    schema = _SCHEMA_PATH.read_text(encoding="utf-8")
+    conn = sqlite3.connect(db_path)
+    conn.executescript(schema)
+    conn.execute(
+        "INSERT INTO documents(id, title, file_path, status) VALUES ('d1','Test','test.pdf','pending')"
+    )
+    conn.commit()
+    conn.close()
+    vec_db_path = str(tmp_path / "vecs.db")
+    return db_path, vec_db_path
+
+
+def _make_mock_chunk(page_number: int = 1, text: str = "Some page text about rules.") -> MagicMock:
+    chunk = MagicMock()
+    chunk.page_number = page_number
+    chunk.text = text
+    chunk.source = "text_layer"
+    chunk.word_count = len(text.split())
+    return chunk
+
+
+def test_ingest_sets_status_ready_on_success(ingest_db):
+    db_path, vec_db_path = ingest_db
+
+    mock_extractor = MagicMock()
+    mock_extractor.chunk_pages.return_value = [_make_mock_chunk()]
+
+    with patch("circuitforge_core.documents.pdf.PDFExtractor", return_value=mock_extractor):
+        from scripts.ingest_pdf import run
+        run(doc_id="d1", file_path="test.pdf", db_path=db_path, vec_db_path=vec_db_path)
+
+    conn = sqlite3.connect(db_path)
+    row = conn.execute("SELECT status, page_count FROM documents WHERE id='d1'").fetchone()
+    conn.close()
+    assert row[0] == "ready"
+    assert row[1] == 1
+
+
+def test_ingest_stores_page_chunks(ingest_db):
+    db_path, vec_db_path = ingest_db
+
+    mock_extractor = MagicMock()
+    chunks = [_make_mock_chunk(page_number=i + 1, text=f"Page {i+1} text content.") for i in range(3)]
+    mock_extractor.chunk_pages.return_value = chunks
+
+    with patch("circuitforge_core.documents.pdf.PDFExtractor", return_value=mock_extractor):
+        from scripts.ingest_pdf import run
+        run(doc_id="d1", file_path="test.pdf", db_path=db_path, vec_db_path=vec_db_path)
+
+    conn = sqlite3.connect(db_path)
+    rows = conn.execute(
+        "SELECT page_number, text FROM page_chunks WHERE doc_id='d1' ORDER BY page_number"
+    ).fetchall()
+    conn.close()
+    assert len(rows) == 3
+    assert rows[0][0] == 1
+    assert "Page 1" in rows[0][1]
+
+
+def test_ingest_sets_error_status_on_failure(ingest_db):
+    db_path, vec_db_path = ingest_db
+
+    with patch("circuitforge_core.documents.pdf.PDFExtractor", side_effect=RuntimeError("PDF corrupt")):
+        from scripts.ingest_pdf import run
+        with pytest.raises(RuntimeError):
+            run(doc_id="d1", file_path="bad.pdf", db_path=db_path, vec_db_path=vec_db_path)
+
+    conn = sqlite3.connect(db_path)
+    row = conn.execute("SELECT status, error_msg FROM documents WHERE id='d1'").fetchone()
+    conn.close()
+    assert row[0] == "error"
+    assert "PDF corrupt" in row[1]
+
+
+def test_ingest_skips_embeddings_without_ollama_url(ingest_db, monkeypatch):
+    """When PAGEPIPER_OLLAMA_URL is unset, no vec DB file should be created."""
+    db_path, vec_db_path = ingest_db
+    monkeypatch.delenv("PAGEPIPER_OLLAMA_URL", raising=False)
+
+    mock_extractor = MagicMock()
+    mock_extractor.chunk_pages.return_value = [_make_mock_chunk()]
+
+    with patch("circuitforge_core.documents.pdf.PDFExtractor", return_value=mock_extractor):
+        from scripts.ingest_pdf import run
+        run(doc_id="d1", file_path="test.pdf", db_path=db_path, vec_db_path=vec_db_path)
+
+    # No embeddings were requested, so the vec DB should not have been created
+    assert not Path(vec_db_path).exists(), "vec DB should not be created without OLLAMA_URL"
+
+    # Document should still be ready with chunks stored
+    conn = sqlite3.connect(db_path)
+    status = conn.execute("SELECT status FROM documents WHERE id='d1'").fetchone()[0]
+    chunk_count = conn.execute(
+        "SELECT COUNT(*) FROM page_chunks WHERE doc_id='d1'"
+    ).fetchone()[0]
+    conn.close()
+    assert status == "ready"
+    assert chunk_count == 1
+
+
+def test_ingest_replaces_existing_chunks_on_reingest(ingest_db):
+    """Re-running ingest for the same doc_id replaces old page_chunks."""
+    db_path, vec_db_path = ingest_db
+
+    mock_extractor = MagicMock()
+
+    # First ingest: 3 pages
+    mock_extractor.chunk_pages.return_value = [
+        _make_mock_chunk(page_number=i + 1, text=f"Original page {i+1}.") for i in range(3)
+    ]
+    with patch("circuitforge_core.documents.pdf.PDFExtractor", return_value=mock_extractor):
+        from scripts.ingest_pdf import run
+        run(doc_id="d1", file_path="test.pdf", db_path=db_path, vec_db_path=vec_db_path)
+
+    # Second ingest: 1 page (simulating a re-ingest after file change)
+    mock_extractor.chunk_pages.return_value = [_make_mock_chunk(text="Updated single page.")]
+    with patch("circuitforge_core.documents.pdf.PDFExtractor", return_value=mock_extractor):
+        run(doc_id="d1", file_path="test.pdf", db_path=db_path, vec_db_path=vec_db_path)
+
+    conn = sqlite3.connect(db_path)
+    rows = conn.execute(
+        "SELECT text FROM page_chunks WHERE doc_id='d1'"
+    ).fetchall()
+    conn.close()
+    assert len(rows) == 1
+    assert "Updated" in rows[0][0]
diff --git a/tests/test_library_api.py b/tests/test_library_api.py
index 180d14b..b1a16a5 100644
--- a/tests/test_library_api.py
+++ b/tests/test_library_api.py
@@ -52,3 +52,19 @@ def test_reingest_returns_task_id(client, test_db, tmp_path):
     resp = client.post(f"/api/library/{doc_id}/reingest")
     assert resp.status_code == 202
     assert "task_id" in resp.json()
+
+
+def test_reingest_updates_status_to_processing(client, test_db, tmp_path):
+    from pathlib import Path
+    pdf_path = str(tmp_path / "books" / "dm_guide.pdf")
+    # write_bytes() does not create missing parent directories.
+    Path(pdf_path).parent.mkdir(parents=True, exist_ok=True)
+    Path(pdf_path).write_bytes(b"%PDF-1.4 empty fixture")
+    doc_id = _add_doc(test_db, "DM Guide", pdf_path)
+
+    resp = client.post(f"/api/library/{doc_id}/reingest")
+    assert resp.status_code == 202
+
+    # The ingest may already have finished (or failed on the fixture PDF) by the time we poll
+    status_resp = client.get(f"/api/library/{doc_id}/status")
+    assert status_resp.json()["status"] in ("processing", "error", "ready")