From c6fa9baf2cb4b3d5b8412ccc9d73be663d10daf8 Mon Sep 17 00:00:00 2001 From: pyr0ball Date: Mon, 4 May 2026 17:36:18 -0700 Subject: [PATCH] fix(ingest): batch embedding, connection guard, correct upsert id param, module-level imports in tests --- scripts/ingest_pdf.py | 27 +++++++++++++++++++-------- tests/test_ingest.py | 6 ++---- 2 files changed, 21 insertions(+), 12 deletions(-) diff --git a/scripts/ingest_pdf.py b/scripts/ingest_pdf.py index 5115480..a6bf9af 100644 --- a/scripts/ingest_pdf.py +++ b/scripts/ingest_pdf.py @@ -17,6 +17,9 @@ from pathlib import Path logger = logging.getLogger("pagepiper.ingest") +# Pages to embed per Ollama API call — avoids hitting request size limits on large PDFs +EMBED_BATCH_SIZE = 64 + def _update_status( conn: sqlite3.Connection, @@ -47,10 +50,10 @@ def run(doc_id: str, file_path: str, db_path: str, vec_db_path: str) -> None: """Run the full ingest pipeline for one PDF. Called by cf-orch or BackgroundTasks.""" from circuitforge_core.documents.pdf import PDFExtractor - conn = sqlite3.connect(db_path) - conn.execute("PRAGMA foreign_keys = ON") - + conn: sqlite3.Connection | None = None try: + conn = sqlite3.connect(db_path) + conn.execute("PRAGMA foreign_keys = ON") _update_status(conn, doc_id, "processing") # Step 1: Extract page chunks @@ -97,15 +100,18 @@ def run(doc_id: str, file_path: str, db_path: str, vec_db_path: str) -> None: vec_store = LocalSQLiteVecStore( db_path=vec_db_path, table="page_vecs", dimensions=768 ) - # Remove old vectors for this doc before re-inserting + # Remove old vectors before re-inserting. If embedding fails mid-way, + # old vectors are gone but new ones are partial — re-ingest recovers. vec_store.delete_where({"doc_id": doc_id}) texts = [text for _, _, text in chunk_rows] - vectors = router.embed(texts) + vectors: list[list[float]] = [] + for i in range(0, len(texts), EMBED_BATCH_SIZE): + vectors.extend(router.embed(texts[i : i + EMBED_BATCH_SIZE])) for (chunk_id, page_number, _), vector in zip(chunk_rows, vectors): vec_store.upsert( - entry_id=chunk_id, + id=chunk_id, vector=vector, metadata={"doc_id": doc_id, "page_number": page_number}, ) @@ -116,10 +122,15 @@ def run(doc_id: str, file_path: str, db_path: str, vec_db_path: str) -> None: except Exception as exc: logger.error("Ingest failed for doc %s: %s", doc_id, exc, exc_info=True) - _update_status(conn, doc_id, "error", error_msg=str(exc)) + if conn is not None: + try: + _update_status(conn, doc_id, "error", error_msg=str(exc)) + except Exception: + logger.warning("Could not write error status for doc %s", doc_id) raise finally: - conn.close() + if conn is not None: + conn.close() if __name__ == "__main__": diff --git a/tests/test_ingest.py b/tests/test_ingest.py index 2e82838..2e42dac 100644 --- a/tests/test_ingest.py +++ b/tests/test_ingest.py @@ -8,6 +8,8 @@ from unittest.mock import MagicMock, patch import pytest +from scripts.ingest_pdf import run + @pytest.fixture def ingest_db(tmp_path) -> tuple[str, str]: @@ -40,7 +42,6 @@ def test_ingest_sets_status_ready_on_success(ingest_db): mock_extractor.chunk_pages.return_value = [_make_mock_chunk()] with patch("circuitforge_core.documents.pdf.PDFExtractor", return_value=mock_extractor): - from scripts.ingest_pdf import run run(doc_id="d1", file_path="test.pdf", db_path=db_path, vec_db_path=vec_db_path) conn = sqlite3.connect(db_path) @@ -58,7 +59,6 @@ def test_ingest_stores_page_chunks(ingest_db): mock_extractor.chunk_pages.return_value = chunks with patch("circuitforge_core.documents.pdf.PDFExtractor", return_value=mock_extractor): - from scripts.ingest_pdf import run run(doc_id="d1", file_path="test.pdf", db_path=db_path, vec_db_path=vec_db_path) conn = sqlite3.connect(db_path) @@ -95,7 +95,6 @@ def test_ingest_skips_embeddings_without_ollama_url(ingest_db, monkeypatch): mock_extractor.chunk_pages.return_value = [_make_mock_chunk()] with patch("circuitforge_core.documents.pdf.PDFExtractor", return_value=mock_extractor): - from scripts.ingest_pdf import run run(doc_id="d1", file_path="test.pdf", db_path=db_path, vec_db_path=vec_db_path) # No embeddings were requested, so the vec DB should not have been created @@ -123,7 +122,6 @@ def test_ingest_replaces_existing_chunks_on_reingest(ingest_db): _make_mock_chunk(page_number=i + 1, text=f"Original page {i+1}.") for i in range(3) ] with patch("circuitforge_core.documents.pdf.PDFExtractor", return_value=mock_extractor): - from scripts.ingest_pdf import run run(doc_id="d1", file_path="test.pdf", db_path=db_path, vec_db_path=vec_db_path) # Second ingest: 1 page (simulating a re-ingest after file change)