feat(ingest): add full PDF ingest pipeline (cf-orch task, BYOK embed)
This commit is contained in:
parent
751faf1679
commit
f4574dd05e
3 changed files with 288 additions and 2 deletions
|
|
@ -1,11 +1,143 @@
|
|||
# scripts/ingest_pdf.py
|
||||
"""Ingest script stub — full implementation in T5."""
|
||||
"""
|
||||
cf-orch task: pagepiper/ingest_pdf
|
||||
|
||||
Extracts text from a PDF, stores page chunks in SQLite, and (if Ollama is
|
||||
configured) generates embeddings and stores them in the sqlite-vec store.
|
||||
|
||||
Entry point:
|
||||
python scripts/ingest_pdf.py --doc-id X --file-path Y --db-path Z --vec-db-path W
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import os
|
||||
import sqlite3
|
||||
from pathlib import Path
|
||||
|
||||
logger = logging.getLogger("pagepiper.ingest")
|
||||
|
||||
|
||||
def _update_status(
|
||||
conn: sqlite3.Connection,
|
||||
doc_id: str,
|
||||
status: str,
|
||||
page_count: int | None = None,
|
||||
error_msg: str | None = None,
|
||||
) -> None:
|
||||
if page_count is not None:
|
||||
conn.execute(
|
||||
"UPDATE documents SET status=?, page_count=?, updated_at=datetime('now') WHERE id=?",
|
||||
[status, page_count, doc_id],
|
||||
)
|
||||
elif error_msg is not None:
|
||||
conn.execute(
|
||||
"UPDATE documents SET status=?, error_msg=?, updated_at=datetime('now') WHERE id=?",
|
||||
[status, error_msg, doc_id],
|
||||
)
|
||||
else:
|
||||
conn.execute(
|
||||
"UPDATE documents SET status=?, updated_at=datetime('now') WHERE id=?",
|
||||
[status, doc_id],
|
||||
)
|
||||
conn.commit()
|
||||
|
||||
|
||||
def run(doc_id: str, file_path: str, db_path: str, vec_db_path: str) -> None:
|
||||
logger.info("Stub ingest run for doc %s at %s", doc_id, file_path)
|
||||
"""Run the full ingest pipeline for one PDF. Called by cf-orch or BackgroundTasks."""
|
||||
from circuitforge_core.documents.pdf import PDFExtractor
|
||||
|
||||
conn = sqlite3.connect(db_path)
|
||||
conn.execute("PRAGMA foreign_keys = ON")
|
||||
|
||||
try:
|
||||
_update_status(conn, doc_id, "processing")
|
||||
|
||||
# Step 1: Extract page chunks
|
||||
logger.info("Extracting text from %s", file_path)
|
||||
extractor = PDFExtractor(ocr_min_words=10)
|
||||
chunks = extractor.chunk_pages(file_path)
|
||||
logger.info("Extracted %d pages", len(chunks))
|
||||
|
||||
# Step 2: Store chunks (replace any existing for this doc)
|
||||
conn.execute("DELETE FROM page_chunks WHERE doc_id=?", [doc_id])
|
||||
chunk_rows: list[tuple[str, int, str]] = []
|
||||
for chunk in chunks:
|
||||
row = conn.execute(
|
||||
"""INSERT INTO page_chunks(doc_id, page_number, text, source, word_count)
|
||||
VALUES (?,?,?,?,?) RETURNING id""",
|
||||
[doc_id, chunk.page_number, chunk.text, chunk.source, chunk.word_count],
|
||||
).fetchone()
|
||||
chunk_rows.append((row[0], chunk.page_number, chunk.text))
|
||||
conn.commit()
|
||||
|
||||
# Step 3: Embed and store vectors if Ollama is configured (BYOK gate)
|
||||
ollama_url = os.environ.get("PAGEPIPER_OLLAMA_URL", "").strip()
|
||||
if ollama_url and chunks:
|
||||
logger.info("Embedding %d pages via Ollama at %s", len(chunks), ollama_url)
|
||||
from circuitforge_core.llm import LLMRouter
|
||||
from circuitforge_core.vector.sqlite_vec import LocalSQLiteVecStore
|
||||
|
||||
_clean = ollama_url.rstrip("/")
|
||||
base_url = _clean if _clean.endswith("/v1") else _clean + "/v1"
|
||||
router = LLMRouter({
|
||||
"fallback_order": ["ollama"],
|
||||
"backends": {
|
||||
"ollama": {
|
||||
"type": "openai_compat",
|
||||
"base_url": base_url,
|
||||
"model": os.environ.get("PAGEPIPER_CHAT_MODEL", "mistral:7b"),
|
||||
"embedding_model": os.environ.get(
|
||||
"PAGEPIPER_EMBED_MODEL", "nomic-embed-text"
|
||||
),
|
||||
"supports_images": False,
|
||||
}
|
||||
},
|
||||
})
|
||||
vec_store = LocalSQLiteVecStore(
|
||||
db_path=vec_db_path, table="page_vecs", dimensions=768
|
||||
)
|
||||
# Remove old vectors for this doc before re-inserting
|
||||
vec_store.delete_where({"doc_id": doc_id})
|
||||
|
||||
texts = [text for _, _, text in chunk_rows]
|
||||
vectors = router.embed(texts)
|
||||
|
||||
for (chunk_id, page_number, _), vector in zip(chunk_rows, vectors):
|
||||
vec_store.upsert(
|
||||
entry_id=chunk_id,
|
||||
vector=vector,
|
||||
metadata={"doc_id": doc_id, "page_number": page_number},
|
||||
)
|
||||
logger.info("Stored %d embeddings", len(vectors))
|
||||
|
||||
_update_status(conn, doc_id, "ready", page_count=len(chunks))
|
||||
logger.info("Ingest complete for doc %s (%d pages)", doc_id, len(chunks))
|
||||
|
||||
except Exception as exc:
|
||||
logger.error("Ingest failed for doc %s: %s", doc_id, exc, exc_info=True)
|
||||
_update_status(conn, doc_id, "error", error_msg=str(exc))
|
||||
raise
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import argparse
|
||||
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Ingest a PDF (cf-orch task entry point)"
|
||||
)
|
||||
parser.add_argument("--doc-id", required=True)
|
||||
parser.add_argument("--file-path", required=True)
|
||||
parser.add_argument("--db-path", required=True)
|
||||
parser.add_argument("--vec-db-path", required=True)
|
||||
a = parser.parse_args()
|
||||
run(
|
||||
doc_id=a.doc_id,
|
||||
file_path=a.file_path,
|
||||
db_path=a.db_path,
|
||||
vec_db_path=a.vec_db_path,
|
||||
)
|
||||
|
|
|
|||
140
tests/test_ingest.py
Normal file
140
tests/test_ingest.py
Normal file
|
|
@ -0,0 +1,140 @@
|
|||
# tests/test_ingest.py
|
||||
"""Unit tests for scripts/ingest_pdf.py."""
|
||||
from __future__ import annotations
|
||||
|
||||
import sqlite3
|
||||
from pathlib import Path
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def ingest_db(tmp_path) -> tuple[str, str]:
|
||||
db_path = str(tmp_path / "test.db")
|
||||
schema = Path("migrations/001_initial_schema.sql").read_text()
|
||||
conn = sqlite3.connect(db_path)
|
||||
conn.executescript(schema)
|
||||
conn.execute(
|
||||
"INSERT INTO documents(id, title, file_path, status) VALUES ('d1','Test','test.pdf','pending')"
|
||||
)
|
||||
conn.commit()
|
||||
conn.close()
|
||||
vec_db_path = str(tmp_path / "vecs.db")
|
||||
return db_path, vec_db_path
|
||||
|
||||
|
||||
def _make_mock_chunk(page_number: int = 1, text: str = "Some page text about rules.") -> MagicMock:
|
||||
chunk = MagicMock()
|
||||
chunk.page_number = page_number
|
||||
chunk.text = text
|
||||
chunk.source = "text_layer"
|
||||
chunk.word_count = len(text.split())
|
||||
return chunk
|
||||
|
||||
|
||||
def test_ingest_sets_status_ready_on_success(ingest_db):
|
||||
db_path, vec_db_path = ingest_db
|
||||
|
||||
mock_extractor = MagicMock()
|
||||
mock_extractor.chunk_pages.return_value = [_make_mock_chunk()]
|
||||
|
||||
with patch("circuitforge_core.documents.pdf.PDFExtractor", return_value=mock_extractor):
|
||||
from scripts.ingest_pdf import run
|
||||
run(doc_id="d1", file_path="test.pdf", db_path=db_path, vec_db_path=vec_db_path)
|
||||
|
||||
conn = sqlite3.connect(db_path)
|
||||
row = conn.execute("SELECT status, page_count FROM documents WHERE id='d1'").fetchone()
|
||||
conn.close()
|
||||
assert row[0] == "ready"
|
||||
assert row[1] == 1
|
||||
|
||||
|
||||
def test_ingest_stores_page_chunks(ingest_db):
|
||||
db_path, vec_db_path = ingest_db
|
||||
|
||||
mock_extractor = MagicMock()
|
||||
chunks = [_make_mock_chunk(page_number=i + 1, text=f"Page {i+1} text content.") for i in range(3)]
|
||||
mock_extractor.chunk_pages.return_value = chunks
|
||||
|
||||
with patch("circuitforge_core.documents.pdf.PDFExtractor", return_value=mock_extractor):
|
||||
from scripts.ingest_pdf import run
|
||||
run(doc_id="d1", file_path="test.pdf", db_path=db_path, vec_db_path=vec_db_path)
|
||||
|
||||
conn = sqlite3.connect(db_path)
|
||||
rows = conn.execute(
|
||||
"SELECT page_number, text FROM page_chunks WHERE doc_id='d1' ORDER BY page_number"
|
||||
).fetchall()
|
||||
conn.close()
|
||||
assert len(rows) == 3
|
||||
assert rows[0][0] == 1
|
||||
assert "Page 1" in rows[0][1]
|
||||
|
||||
|
||||
def test_ingest_sets_error_status_on_failure(ingest_db):
|
||||
db_path, vec_db_path = ingest_db
|
||||
|
||||
with patch("circuitforge_core.documents.pdf.PDFExtractor", side_effect=RuntimeError("PDF corrupt")):
|
||||
from scripts.ingest_pdf import run
|
||||
with pytest.raises(RuntimeError):
|
||||
run(doc_id="d1", file_path="bad.pdf", db_path=db_path, vec_db_path=vec_db_path)
|
||||
|
||||
conn = sqlite3.connect(db_path)
|
||||
row = conn.execute("SELECT status, error_msg FROM documents WHERE id='d1'").fetchone()
|
||||
conn.close()
|
||||
assert row[0] == "error"
|
||||
assert "PDF corrupt" in row[1]
|
||||
|
||||
|
||||
def test_ingest_skips_embeddings_without_ollama_url(ingest_db, monkeypatch):
|
||||
"""When PAGEPIPER_OLLAMA_URL is unset, no vec DB file should be created."""
|
||||
db_path, vec_db_path = ingest_db
|
||||
monkeypatch.delenv("PAGEPIPER_OLLAMA_URL", raising=False)
|
||||
|
||||
mock_extractor = MagicMock()
|
||||
mock_extractor.chunk_pages.return_value = [_make_mock_chunk()]
|
||||
|
||||
with patch("circuitforge_core.documents.pdf.PDFExtractor", return_value=mock_extractor):
|
||||
from scripts.ingest_pdf import run
|
||||
run(doc_id="d1", file_path="test.pdf", db_path=db_path, vec_db_path=vec_db_path)
|
||||
|
||||
# No embeddings were requested, so the vec DB should not have been created
|
||||
assert not Path(vec_db_path).exists(), "vec DB should not be created without OLLAMA_URL"
|
||||
|
||||
# Document should still be ready with chunks stored
|
||||
conn = sqlite3.connect(db_path)
|
||||
status = conn.execute("SELECT status FROM documents WHERE id='d1'").fetchone()[0]
|
||||
chunk_count = conn.execute(
|
||||
"SELECT COUNT(*) FROM page_chunks WHERE doc_id='d1'"
|
||||
).fetchone()[0]
|
||||
conn.close()
|
||||
assert status == "ready"
|
||||
assert chunk_count == 1
|
||||
|
||||
|
||||
def test_ingest_replaces_existing_chunks_on_reingest(ingest_db):
|
||||
"""Re-running ingest for the same doc_id replaces old page_chunks."""
|
||||
db_path, vec_db_path = ingest_db
|
||||
|
||||
mock_extractor = MagicMock()
|
||||
|
||||
# First ingest: 3 pages
|
||||
mock_extractor.chunk_pages.return_value = [
|
||||
_make_mock_chunk(page_number=i + 1, text=f"Original page {i+1}.") for i in range(3)
|
||||
]
|
||||
with patch("circuitforge_core.documents.pdf.PDFExtractor", return_value=mock_extractor):
|
||||
from scripts.ingest_pdf import run
|
||||
run(doc_id="d1", file_path="test.pdf", db_path=db_path, vec_db_path=vec_db_path)
|
||||
|
||||
# Second ingest: 1 page (simulating a re-ingest after file change)
|
||||
mock_extractor.chunk_pages.return_value = [_make_mock_chunk(text="Updated single page.")]
|
||||
with patch("circuitforge_core.documents.pdf.PDFExtractor", return_value=mock_extractor):
|
||||
run(doc_id="d1", file_path="test.pdf", db_path=db_path, vec_db_path=vec_db_path)
|
||||
|
||||
conn = sqlite3.connect(db_path)
|
||||
rows = conn.execute(
|
||||
"SELECT text FROM page_chunks WHERE doc_id='d1'"
|
||||
).fetchall()
|
||||
conn.close()
|
||||
assert len(rows) == 1
|
||||
assert "Updated" in rows[0][0]
|
||||
|
|
@ -52,3 +52,17 @@ def test_reingest_returns_task_id(client, test_db, tmp_path):
|
|||
resp = client.post(f"/api/library/{doc_id}/reingest")
|
||||
assert resp.status_code == 202
|
||||
assert "task_id" in resp.json()
|
||||
|
||||
|
||||
def test_reingest_updates_status_to_processing(client, test_db, tmp_path):
|
||||
from pathlib import Path
|
||||
pdf_path = str(tmp_path / "books" / "dm_guide.pdf")
|
||||
Path(pdf_path).write_bytes(b"%PDF-1.4 empty fixture")
|
||||
doc_id = _add_doc(test_db, "DM Guide", pdf_path)
|
||||
|
||||
resp = client.post(f"/api/library/{doc_id}/reingest")
|
||||
assert resp.status_code == 202
|
||||
|
||||
# Document should be in processing state (or beyond if stub ingest ran instantly)
|
||||
status_resp = client.get(f"/api/library/{doc_id}/status")
|
||||
assert status_resp.json()["status"] in ("processing", "error", "ready")
|
||||
|
|
|
|||
Loading…
Reference in a new issue