fix(ingest): batch embedding, connection guard, correct upsert id param, module-level imports in tests

This commit is contained in:
pyr0ball 2026-05-04 17:36:18 -07:00
parent f4574dd05e
commit c6fa9baf2c
2 changed files with 21 additions and 12 deletions

View file

@ -17,6 +17,9 @@ from pathlib import Path
logger = logging.getLogger("pagepiper.ingest")
# Pages to embed per Ollama API call — avoids hitting request size limits on large PDFs
EMBED_BATCH_SIZE = 64
def _update_status(
conn: sqlite3.Connection,
@ -47,10 +50,10 @@ def run(doc_id: str, file_path: str, db_path: str, vec_db_path: str) -> None:
"""Run the full ingest pipeline for one PDF. Called by cf-orch or BackgroundTasks."""
from circuitforge_core.documents.pdf import PDFExtractor
conn = sqlite3.connect(db_path)
conn.execute("PRAGMA foreign_keys = ON")
conn: sqlite3.Connection | None = None
try:
conn = sqlite3.connect(db_path)
conn.execute("PRAGMA foreign_keys = ON")
_update_status(conn, doc_id, "processing")
# Step 1: Extract page chunks
@ -97,15 +100,18 @@ def run(doc_id: str, file_path: str, db_path: str, vec_db_path: str) -> None:
vec_store = LocalSQLiteVecStore(
db_path=vec_db_path, table="page_vecs", dimensions=768
)
# Remove old vectors for this doc before re-inserting
# Remove old vectors before re-inserting. If embedding fails mid-way,
# old vectors are gone but new ones are partial — re-ingest recovers.
vec_store.delete_where({"doc_id": doc_id})
texts = [text for _, _, text in chunk_rows]
vectors = router.embed(texts)
vectors: list[list[float]] = []
for i in range(0, len(texts), EMBED_BATCH_SIZE):
vectors.extend(router.embed(texts[i : i + EMBED_BATCH_SIZE]))
for (chunk_id, page_number, _), vector in zip(chunk_rows, vectors):
vec_store.upsert(
entry_id=chunk_id,
id=chunk_id,
vector=vector,
metadata={"doc_id": doc_id, "page_number": page_number},
)
@ -116,10 +122,15 @@ def run(doc_id: str, file_path: str, db_path: str, vec_db_path: str) -> None:
except Exception as exc:
logger.error("Ingest failed for doc %s: %s", doc_id, exc, exc_info=True)
_update_status(conn, doc_id, "error", error_msg=str(exc))
if conn is not None:
try:
_update_status(conn, doc_id, "error", error_msg=str(exc))
except Exception:
logger.warning("Could not write error status for doc %s", doc_id)
raise
finally:
conn.close()
if conn is not None:
conn.close()
if __name__ == "__main__":

View file

@ -8,6 +8,8 @@ from unittest.mock import MagicMock, patch
import pytest
from scripts.ingest_pdf import run
@pytest.fixture
def ingest_db(tmp_path) -> tuple[str, str]:
@ -40,7 +42,6 @@ def test_ingest_sets_status_ready_on_success(ingest_db):
mock_extractor.chunk_pages.return_value = [_make_mock_chunk()]
with patch("circuitforge_core.documents.pdf.PDFExtractor", return_value=mock_extractor):
from scripts.ingest_pdf import run
run(doc_id="d1", file_path="test.pdf", db_path=db_path, vec_db_path=vec_db_path)
conn = sqlite3.connect(db_path)
@ -58,7 +59,6 @@ def test_ingest_stores_page_chunks(ingest_db):
mock_extractor.chunk_pages.return_value = chunks
with patch("circuitforge_core.documents.pdf.PDFExtractor", return_value=mock_extractor):
from scripts.ingest_pdf import run
run(doc_id="d1", file_path="test.pdf", db_path=db_path, vec_db_path=vec_db_path)
conn = sqlite3.connect(db_path)
@ -95,7 +95,6 @@ def test_ingest_skips_embeddings_without_ollama_url(ingest_db, monkeypatch):
mock_extractor.chunk_pages.return_value = [_make_mock_chunk()]
with patch("circuitforge_core.documents.pdf.PDFExtractor", return_value=mock_extractor):
from scripts.ingest_pdf import run
run(doc_id="d1", file_path="test.pdf", db_path=db_path, vec_db_path=vec_db_path)
# No embeddings were requested, so the vec DB should not have been created
@ -123,7 +122,6 @@ def test_ingest_replaces_existing_chunks_on_reingest(ingest_db):
_make_mock_chunk(page_number=i + 1, text=f"Original page {i+1}.") for i in range(3)
]
with patch("circuitforge_core.documents.pdf.PDFExtractor", return_value=mock_extractor):
from scripts.ingest_pdf import run
run(doc_id="d1", file_path="test.pdf", db_path=db_path, vec_db_path=vec_db_path)
# Second ingest: 1 page (simulating a re-ingest after file change)