pagepiper/scripts/ingest_epub.py
pyr0ball e52bdb5128 feat: RAG retrieval quality, artifact cleaning, and ingestion progress UI
Retrieval:
- Add _fetch_adjacent() to retriever: fetches page ± 1 chunks from DB
  after ranking so mid-sentence EPUB chunk boundaries don't lose context
- Fix vec DB doc-filter: oversample to top_k*20 before Python filter
  instead of post-filtering an already-small global pool (fixes wrong-book
  results when searching within a single document)
- top_k default 5 → 10; context per chunk 500 → 1500 chars; citation
  snippet 200 → 400 chars

Artifact cleaning:
- Add scripts/text_clean.py: strips ABC Amber LIT Converter watermarks,
  processtext.com URLs, bare page numbers, piracy stamps from extracted text
- Wire clean_paragraph() into ingest_pdf.py and new ingest_epub.py

Startup validation:
- _check_vec_schema() at boot: detects embedding dimension mismatch,
  deletes stale vec DB, and queues sequential re-embed in background thread
- Sequential _reembed_docs() prevents SQLite lock races on startup re-embed

cf-orch integration:
- Wire CF_ORCH_URL / CF_LICENSE_KEY into LLMRouter backend config so
  allocate() fires and keeps the Ollama model warm between requests

Ingestion progress UI:
- GET /api/library/{doc_id}/status now returns vec_count from page_vecs_meta
- DocumentCard.vue polls status every 3 s while processing and shows
  two-phase progress: indeterminate animation during extraction,
  determinate "Embedding N/M pages" bar once vectors start landing

Other:
- Chat feedback endpoint + thumbs up/down UI (FeedbackButton.vue)
- EPUB ingest script (ingest_epub.py) with heading-based chunking
- migration 002: chat_feedback table
- README.md with setup and feature overview
2026-05-06 08:25:58 -07:00

239 lines
8.8 KiB
Python

# scripts/ingest_epub.py
"""
cf-orch task: pagepiper/ingest_epub
Extracts text from an EPUB file, stores chapter chunks in SQLite, and (if Ollama is
configured) generates embeddings and stores them in the sqlite-vec store.
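Each EPUB content document is split into chunks (each chunk is stored like a PDF page):
one chunk per heading-delimited section when the document has at least two h1-h4
headings, otherwise consecutive paragraphs are packed into chunks of roughly 500 words.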
Entry point:
python scripts/ingest_epub.py --doc-id X --file-path Y --db-path Z --vec-db-path W
"""
from __future__ import annotations
import logging
import os
import sqlite3
from dataclasses import dataclass
from pathlib import Path
# Module logger; the name is a fixed string rather than derived from __name__.
logger = logging.getLogger("pagepiper.ingest_epub")
# Number of chunk texts passed to router.embed() per call in run().
EMBED_BATCH_SIZE = 64
_WORDS_PER_CHUNK = 500  # target chunk size for word-count fallback
@dataclass
class _Chunk:
    """One unit of extracted EPUB text, written as a row of ``page_chunks``."""

    # 1-based position of the chunk within the whole document.
    page_number: int
    # Chunk body; its lines are joined with "\n".
    text: str
    # Extraction source tag; this script always writes "text".
    source: str
    # Number of whitespace-separated words in ``text``.
    word_count: int
def _paragraphs_from_soup(soup) -> list[str]:
    """Return the filtered text lines of a parsed HTML document.

    The soup is flattened to newline-separated text, split into lines, and
    handed to ``filter_paragraphs`` from ``scripts.text_clean``, which decides
    which lines survive.
    """
    from scripts.text_clean import filter_paragraphs

    flattened = soup.get_text(separator="\n", strip=True)
    lines = flattened.splitlines()
    return filter_paragraphs(lines)
def _chunks_from_paragraphs(paragraphs: list[str], start_num: int) -> list[_Chunk]:
    """Pack consecutive paragraphs into chunks of roughly ``_WORDS_PER_CHUNK`` words.

    Paragraphs are never split. The open chunk is closed as soon as adding the
    next paragraph would push it past the target, so a single paragraph longer
    than the target ends up as an oversized chunk of its own. Chunks are
    numbered consecutively starting at ``start_num``.
    """
    result: list[_Chunk] = []
    pending: list[str] = []
    pending_words = 0

    for paragraph in paragraphs:
        n_words = len(paragraph.split())
        # Close the open chunk first when this paragraph would overflow it.
        if pending and pending_words + n_words > _WORDS_PER_CHUNK:
            result.append(
                _Chunk(start_num + len(result), "\n".join(pending), "text", pending_words)
            )
            pending = []
            pending_words = 0
        pending.append(paragraph)
        pending_words += n_words

    # Whatever is left over forms the final (possibly short) chunk.
    if pending:
        result.append(
            _Chunk(start_num + len(result), "\n".join(pending), "text", pending_words)
        )
    return result
def _extract_chunks(file_path: str) -> list[_Chunk]:
    """Read an EPUB and split its content documents into numbered chunks.

    Each ``ITEM_DOCUMENT`` of the book is parsed with BeautifulSoup and handled
    in one of two ways:

    * If it contains at least two h1-h4 headings, it is split at every heading:
      the heading text starts a new section and the cleaned text of the
      following ``p`` / ``li`` / ``blockquote`` elements is appended to it.
      Text preceding the first heading forms its own section.
    * Otherwise its filtered lines are packed into ~``_WORDS_PER_CHUNK``-word
      chunks by ``_chunks_from_paragraphs``.

    Args:
        file_path: Path of the EPUB file to read.

    Returns:
        All chunks of the book in reading order, numbered from 1.
    """
    import ebooklib
    from bs4 import BeautifulSoup
    from ebooklib import epub

    from scripts.text_clean import clean_line, is_artifact_line

    heading_tags = ["h1", "h2", "h3", "h4"]
    body_tags = ["p", "li", "blockquote"]

    book = epub.read_epub(file_path, options={"ignore_ncx": True})
    all_chunks: list[_Chunk] = []

    def flush(parts: list[str]) -> None:
        """Append the collected section to ``all_chunks`` unless it is empty."""
        text = "\n".join(parts).strip()
        if text:
            all_chunks.append(
                _Chunk(len(all_chunks) + 1, text, "text", len(text.split()))
            )

    for item in book.get_items_of_type(ebooklib.ITEM_DOCUMENT):
        soup = BeautifulSoup(item.get_content(), "html.parser")

        if len(soup.find_all(heading_tags)) < 2:
            # Word-count fallback: accumulate paragraphs into ~500-word chunks
            paragraphs = _paragraphs_from_soup(soup)
            if paragraphs:
                all_chunks.extend(
                    _chunks_from_paragraphs(paragraphs, len(all_chunks) + 1)
                )
            continue

        # Heading-based split: one chunk per section
        current_parts: list[str] = []
        for elem in soup.find_all(heading_tags + body_tags):
            if elem.name in heading_tags:
                flush(current_parts)
                current_parts = [elem.get_text(" ", strip=True)]
                continue
            # find_all() also returns body tags nested inside another body tag
            # (<blockquote><p>, <li><p>, nested <li>). The outer element's
            # get_text() already contains that text, so taking the inner one
            # as well would duplicate it in the chunk.
            if elem.find_parent(body_tags) is not None:
                continue
            cleaned = clean_line(elem.get_text(" ", strip=True))
            if cleaned and not is_artifact_line(cleaned):
                current_parts.append(cleaned)
        flush(current_parts)

    return all_chunks
def _update_status(
    conn: sqlite3.Connection,
    doc_id: str,
    status: str,
    page_count: int | None = None,
    error_msg: str | None = None,
) -> None:
    """Update the status of one ``documents`` row and commit.

    ``status`` and ``updated_at`` are always written. ``page_count`` and
    ``error_msg`` are each written only when given (not ``None``); passing
    both updates both columns in the same statement.

    Args:
        conn: Open connection to the database holding ``documents``.
        doc_id: Value of ``documents.id`` identifying the row.
        status: New value for the ``status`` column.
        page_count: New value for ``page_count``; column untouched if ``None``.
        error_msg: New value for ``error_msg``; column untouched if ``None``.
    """
    # Column names below are fixed literals, so assembling the SET clause
    # dynamically is safe; every value still goes through a bound parameter.
    assignments = ["status=?"]
    params: list[object] = [status]
    if page_count is not None:
        assignments.append("page_count=?")
        params.append(page_count)
    if error_msg is not None:
        assignments.append("error_msg=?")
        params.append(error_msg)
    assignments.append("updated_at=datetime('now')")
    params.append(doc_id)

    conn.execute(
        f"UPDATE documents SET {', '.join(assignments)} WHERE id=?",
        params,
    )
    conn.commit()
def _embed_chunks(
    doc_id: str,
    chunk_rows: list[tuple[str, int, str]],
    ollama_url: str,
    vec_db_path: str,
) -> int:
    """Embed the chunk texts via Ollama and store the vectors; return their count.

    Existing vectors of ``doc_id`` are deleted from the vector store first.

    Args:
        doc_id: Document the chunks belong to.
        chunk_rows: ``(chunk_id, page_number, text)`` for every stored chunk.
        ollama_url: Base URL of the Ollama server, with or without ``/v1``.
        vec_db_path: Path of the sqlite-vec database file.

    Raises:
        ValueError: If the embedder returns a different number of vectors than
            there are chunks (nothing is upserted in that case).
    """
    from circuitforge_core.llm import LLMRouter
    from circuitforge_core.vector.sqlite_vec import LocalSQLiteVecStore

    trimmed = ollama_url.rstrip("/")
    base_url = trimmed if trimmed.endswith("/v1") else trimmed + "/v1"
    router = LLMRouter({
        "fallback_order": ["ollama"],
        "backends": {
            "ollama": {
                "type": "openai_compat",
                "base_url": base_url,
                "model": os.environ.get("PAGEPIPER_CHAT_MODEL", "mistral:7b"),
                "embedding_model": os.environ.get(
                    "PAGEPIPER_EMBED_MODEL", "nomic-embed-text"
                ),
                "supports_images": False,
            }
        },
    })
    # NOTE(review): the 1024 default may not match the default embedding model
    # (nomic-embed-text is commonly 768-dimensional) — confirm against config.
    embed_dims = int(os.environ.get("PAGEPIPER_EMBED_DIMS", "1024"))
    vec_store = LocalSQLiteVecStore(
        db_path=vec_db_path, table="page_vecs", dimensions=embed_dims
    )
    vec_store.delete_where({"doc_id": doc_id})

    texts = [text for _, _, text in chunk_rows]
    vectors: list[list[float]] = []
    for start in range(0, len(texts), EMBED_BATCH_SIZE):
        vectors.extend(router.embed(texts[start : start + EMBED_BATCH_SIZE]))

    # zip() would silently drop the tail on a short result and leave some
    # chunks without a vector; fail loudly before anything is written instead.
    if len(vectors) != len(chunk_rows):
        raise ValueError(
            f"embedding count mismatch: got {len(vectors)} vectors "
            f"for {len(chunk_rows)} chunks"
        )

    for (chunk_id, page_number, _), vector in zip(chunk_rows, vectors):
        vec_store.upsert(
            entry_id=chunk_id,
            vector=vector,
            metadata={"doc_id": doc_id, "page_number": page_number},
        )
    return len(vectors)


def run(doc_id: str, file_path: str, db_path: str, vec_db_path: str) -> None:
    """Run the full ingest pipeline for one EPUB. Called by cf-orch or BackgroundTasks.

    Steps: mark the document "processing", extract chunks, replace the
    document's rows in ``page_chunks``, optionally embed the chunks (only when
    ``PAGEPIPER_OLLAMA_URL`` is set and there is at least one chunk), then mark
    the document "ready" with its chunk count.

    Args:
        doc_id: Value of ``documents.id`` for the document being ingested.
        file_path: Path of the EPUB file.
        db_path: Path of the main SQLite database.
        vec_db_path: Path of the sqlite-vec database.

    Raises:
        Exception: Any failure outside the embedding step is logged, recorded
            on the document as status "error" (best effort), and re-raised.
    """
    conn: sqlite3.Connection | None = None
    try:
        conn = sqlite3.connect(db_path, timeout=30)
        conn.execute("PRAGMA journal_mode = WAL")
        conn.execute("PRAGMA foreign_keys = ON")
        _update_status(conn, doc_id, "processing")

        logger.info("Extracting chapters from %s", file_path)
        chunks = _extract_chunks(file_path)
        logger.info("Extracted %d chapters", len(chunks))

        # Replace any chunks left from a previous ingest of this document.
        # The DELETE and all INSERTs are committed together below.
        conn.execute("DELETE FROM page_chunks WHERE doc_id=?", [doc_id])
        chunk_rows: list[tuple[str, int, str]] = []
        for chunk in chunks:
            row = conn.execute(
                """INSERT INTO page_chunks(doc_id, page_number, text, source, word_count)
                VALUES (?,?,?,?,?) RETURNING id""",
                [doc_id, chunk.page_number, chunk.text, chunk.source, chunk.word_count],
            ).fetchone()
            chunk_rows.append((row[0], chunk.page_number, chunk.text))
        conn.commit()

        # Embedding failure is non-fatal: document remains BM25-searchable.
        ollama_url = os.environ.get("PAGEPIPER_OLLAMA_URL", "").strip()
        if ollama_url and chunks:
            try:
                logger.info(
                    "Embedding %d chapters via Ollama at %s", len(chunks), ollama_url
                )
                stored = _embed_chunks(doc_id, chunk_rows, ollama_url, vec_db_path)
                logger.info("Stored %d embeddings", stored)
            except Exception as embed_exc:
                logger.warning(
                    "Embedding skipped for doc %s — BM25 only (reason: %s)",
                    doc_id, embed_exc,
                )

        _update_status(conn, doc_id, "ready", page_count=len(chunks))
        logger.info("Ingest complete for doc %s (%d chapters)", doc_id, len(chunks))
    except Exception as exc:
        logger.error("Ingest failed for doc %s: %s", doc_id, exc, exc_info=True)
        if conn is not None:
            try:
                # Discard any half-finished DELETE/INSERT work first; otherwise
                # the commit inside _update_status() would persist it together
                # with the error status.
                conn.rollback()
                _update_status(conn, doc_id, "error", error_msg=str(exc))
            except Exception:
                logger.warning("Could not write error status for doc %s", doc_id)
        raise
    finally:
        if conn is not None:
            conn.close()
if __name__ == "__main__":
    import argparse

    logging.basicConfig(level=logging.INFO)

    # Command-line entry point: all four options are mandatory and are passed
    # straight through to run().
    cli = argparse.ArgumentParser(
        description="Ingest an EPUB (cf-orch task entry point)"
    )
    for flag in ("--doc-id", "--file-path", "--db-path", "--vec-db-path"):
        cli.add_argument(flag, required=True)
    args = cli.parse_args()

    run(
        doc_id=args.doc_id,
        file_path=args.file_path,
        db_path=args.db_path,
        vec_db_path=args.vec_db_path,
    )