154 lines
5.7 KiB
Python
154 lines
5.7 KiB
Python
# scripts/ingest_pdf.py
|
|
"""
|
|
cf-orch task: pagepiper/ingest_pdf
|
|
|
|
Extracts text from a PDF, stores page chunks in SQLite, and (if Ollama is
|
|
configured) generates embeddings and stores them in the sqlite-vec store.
|
|
|
|
Entry point:
|
|
python scripts/ingest_pdf.py --doc-id X --file-path Y --db-path Z --vec-db-path W
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import os
|
|
import sqlite3
|
|
from pathlib import Path
|
|
|
|
logger = logging.getLogger("pagepiper.ingest")
|
|
|
|
# Pages to embed per Ollama API call — avoids hitting request size limits on large PDFs
|
|
EMBED_BATCH_SIZE = 64
|
|
|
|
|
|
def _update_status(
    conn: sqlite3.Connection,
    doc_id: str,
    status: str,
    page_count: int | None = None,
    error_msg: str | None = None,
) -> None:
    """Update the status of one ``documents`` row and commit.

    ``status`` and ``updated_at`` are always written. ``page_count`` and
    ``error_msg`` are written only when they are not ``None``; a ``None``
    value leaves the stored column untouched (it is not reset to NULL).
    Both optional fields may be supplied in the same call.

    Args:
        conn: Open connection to the database holding the ``documents`` table.
        doc_id: Value matched against ``documents.id``.
        status: New value for the ``status`` column.
        page_count: New value for ``page_count``, or ``None`` to leave it as is.
        error_msg: New value for ``error_msg``, or ``None`` to leave it as is.
    """
    # Column names below are fixed literals; only the values are bound as
    # parameters, so assembling the SET list dynamically is injection-safe.
    assignments = ["status=?"]
    params: list[object] = [status]

    if page_count is not None:
        assignments.append("page_count=?")
        params.append(page_count)
    if error_msg is not None:
        assignments.append("error_msg=?")
        params.append(error_msg)

    assignments.append("updated_at=datetime('now')")
    params.append(doc_id)

    conn.execute(
        f"UPDATE documents SET {', '.join(assignments)} WHERE id=?",
        params,
    )
    # Commits the whole pending transaction on this connection, not just
    # the UPDATE above.
    conn.commit()
|
|
|
|
|
|
def _store_chunks(conn: sqlite3.Connection, doc_id: str, chunks) -> list[tuple[object, int, str]]:
    """Replace the ``page_chunks`` rows of *doc_id* and return the new rows.

    Args:
        conn: Open connection to the main database.
        doc_id: Document whose chunks are being replaced.
        chunks: Iterable of extracted page chunks exposing ``page_number``,
            ``text``, ``source`` and ``word_count`` attributes.

    Returns:
        One ``(chunk_id, page_number, text)`` tuple per inserted chunk, in
        input order. ``chunk_id`` is whatever the database returns for ``id``.
    """
    # The DELETE and all INSERTs share one transaction and are committed
    # together, so a failure part-way leaves nothing committed from here.
    conn.execute("DELETE FROM page_chunks WHERE doc_id=?", [doc_id])
    chunk_rows: list[tuple[object, int, str]] = []
    for chunk in chunks:
        row = conn.execute(
            """INSERT INTO page_chunks(doc_id, page_number, text, source, word_count)
               VALUES (?,?,?,?,?) RETURNING id""",
            [doc_id, chunk.page_number, chunk.text, chunk.source, chunk.word_count],
        ).fetchone()
        chunk_rows.append((row[0], chunk.page_number, chunk.text))
    conn.commit()
    return chunk_rows


def _embed_and_store(
    doc_id: str,
    chunk_rows: list[tuple[object, int, str]],
    ollama_url: str,
    vec_db_path: str,
) -> int:
    """Embed every chunk text via Ollama and store the vectors for *doc_id*.

    Args:
        doc_id: Document the vectors belong to (stored as vector metadata).
        chunk_rows: ``(chunk_id, page_number, text)`` tuples to embed.
        ollama_url: Base URL of the Ollama server, with or without ``/v1``.
        vec_db_path: Path handed to the sqlite-vec store.

    Returns:
        Number of vectors stored.

    Raises:
        ValueError: If the embedder returns a different number of vectors
            than texts submitted.
    """
    logger.info("Embedding %d pages via Ollama at %s", len(chunk_rows), ollama_url)
    from circuitforge_core.llm import LLMRouter
    from circuitforge_core.vector.sqlite_vec import LocalSQLiteVecStore

    # The router is configured as an OpenAI-compatible backend, so make
    # sure the base URL ends in exactly one "/v1".
    trimmed = ollama_url.rstrip("/")
    base_url = trimmed if trimmed.endswith("/v1") else trimmed + "/v1"
    router = LLMRouter({
        "fallback_order": ["ollama"],
        "backends": {
            "ollama": {
                "type": "openai_compat",
                "base_url": base_url,
                "model": os.environ.get("PAGEPIPER_CHAT_MODEL", "mistral:7b"),
                "embedding_model": os.environ.get(
                    "PAGEPIPER_EMBED_MODEL", "nomic-embed-text"
                ),
                "supports_images": False,
            }
        },
    })
    # NOTE(review): dimensions is fixed at 768, presumably the output size
    # of the default embedding model — confirm it still holds when
    # PAGEPIPER_EMBED_MODEL is overridden.
    vec_store = LocalSQLiteVecStore(
        db_path=vec_db_path, table="page_vecs", dimensions=768
    )
    # Remove old vectors before re-inserting. If embedding fails after this
    # point the document has no (or only partial) vectors until it is
    # re-ingested.
    vec_store.delete_where({"doc_id": doc_id})

    texts = [text for _, _, text in chunk_rows]
    vectors: list[list[float]] = []
    # Batch the requests to stay under the API's request size limits.
    for start in range(0, len(texts), EMBED_BATCH_SIZE):
        vectors.extend(router.embed(texts[start : start + EMBED_BATCH_SIZE]))

    # zip() would silently drop the unmatched tail, so a short response
    # would otherwise leave pages without embeddings while the document is
    # still reported as ready.
    if len(vectors) != len(chunk_rows):
        raise ValueError(
            f"Embedding count mismatch for doc {doc_id}: "
            f"expected {len(chunk_rows)}, got {len(vectors)}"
        )

    for (chunk_id, page_number, _), vector in zip(chunk_rows, vectors):
        vec_store.upsert(
            id=chunk_id,
            vector=vector,
            metadata={"doc_id": doc_id, "page_number": page_number},
        )
    return len(vectors)


def run(doc_id: str, file_path: str, db_path: str, vec_db_path: str) -> None:
    """Run the full ingest pipeline for one PDF. Called by cf-orch or BackgroundTasks.

    Steps: mark the document "processing", extract page chunks from the
    PDF, replace the document's rows in ``page_chunks``, optionally embed
    the chunks (only when ``PAGEPIPER_OLLAMA_URL`` is set and at least one
    chunk was extracted), then mark the document "ready" with its page count.

    Args:
        doc_id: Identifier of the row in ``documents`` being ingested.
        file_path: Path of the PDF to extract.
        db_path: Path of the main SQLite database.
        vec_db_path: Path of the sqlite-vec database for embeddings.

    Raises:
        Exception: Any failure is logged, recorded on the document as
            status "error" (best effort) and then re-raised unchanged.
    """
    from circuitforge_core.documents.pdf import PDFExtractor

    conn: sqlite3.Connection | None = None
    try:
        conn = sqlite3.connect(db_path)
        conn.execute("PRAGMA foreign_keys = ON")
        _update_status(conn, doc_id, "processing")

        # Step 1: Extract page chunks
        logger.info("Extracting text from %s", file_path)
        extractor = PDFExtractor(ocr_min_words=10)
        chunks = extractor.chunk_pages(file_path)
        logger.info("Extracted %d pages", len(chunks))

        # Step 2: Store chunks (replace any existing for this doc)
        chunk_rows = _store_chunks(conn, doc_id, chunks)

        # Step 3: Embed and store vectors if Ollama is configured (BYOK gate)
        ollama_url = os.environ.get("PAGEPIPER_OLLAMA_URL", "").strip()
        if ollama_url and chunks:
            stored = _embed_and_store(doc_id, chunk_rows, ollama_url, vec_db_path)
            logger.info("Stored %d embeddings", stored)

        _update_status(conn, doc_id, "ready", page_count=len(chunks))
        logger.info("Ingest complete for doc %s (%d pages)", doc_id, len(chunks))

    except Exception as exc:
        logger.error("Ingest failed for doc %s: %s", doc_id, exc, exc_info=True)
        if conn is not None:
            try:
                # _update_status commits. Without this rollback, a failure
                # during Step 2 would commit the DELETE together with a
                # partial set of INSERTs. Rolling back keeps the previous
                # chunks intact instead.
                conn.rollback()
                _update_status(conn, doc_id, "error", error_msg=str(exc))
            except Exception:
                # Best effort only: the original exception is what matters
                # to the caller, so log this one and fall through to raise.
                logger.warning(
                    "Could not write error status for doc %s", doc_id, exc_info=True
                )
        raise
    finally:
        if conn is not None:
            conn.close()
|
|
|
|
|
|
if __name__ == "__main__":
    import argparse

    logging.basicConfig(level=logging.INFO)

    # Command-line entry point used by cf-orch: four mandatory string
    # options, declared in the order they appear in --help.
    cli = argparse.ArgumentParser(
        description="Ingest a PDF (cf-orch task entry point)"
    )
    for flag in ("--doc-id", "--file-path", "--db-path", "--vec-db-path"):
        cli.add_argument(flag, required=True)

    # argparse stores "--doc-id" as attribute "doc_id" (and likewise for the
    # others), which are exactly run()'s keyword names.
    run(**vars(cli.parse_args()))
|