pagepiper/app/services/retriever.py
pyr0ball e52bdb5128 feat: RAG retrieval quality, artifact cleaning, and ingestion progress UI
Retrieval:
- Add _fetch_adjacent() to retriever: fetches page ± 1 chunks from DB
  after ranking so mid-sentence EPUB chunk boundaries don't lose context
- Fix vec DB doc-filter: oversample to top_k*20 before Python filter
  instead of post-filtering an already-small global pool (fixes wrong-book
  results when searching within a single document)
- top_k default 5 → 10; context per chunk 500 → 1500 chars; citation
  snippet 200 → 400 chars

Artifact cleaning:
- Add scripts/text_clean.py: strips ABC Amber LIT Converter watermarks,
  processtext.com URLs, bare page numbers, piracy stamps from extracted text
- Wire clean_paragraph() into ingest_pdf.py and new ingest_epub.py

Startup validation:
- _check_vec_schema() at boot: detects embedding dimension mismatch,
  deletes stale vec DB, and queues sequential re-embed in background thread
- Sequential _reembed_docs() prevents SQLite lock races on startup re-embed

cf-orch integration:
- Wire CF_ORCH_URL / CF_LICENSE_KEY into LLMRouter backend config so
  allocate() fires and keeps the Ollama model warm between requests

Ingestion progress UI:
- GET /api/library/{doc_id}/status now returns vec_count from page_vecs_meta
- DocumentCard.vue polls status every 3 s while processing and shows
  two-phase progress: indeterminate animation during extraction,
  determinate "Embedding N/M pages" bar once vectors start landing

Other:
- Chat feedback endpoint + thumbs up/down UI (FeedbackButton.vue)
- EPUB ingest script (ingest_epub.py) with heading-based chunking
- migration 002: chat_feedback table
- README.md with setup and feature overview
2026-05-06 08:25:58 -07:00

193 lines
6.7 KiB
Python

# app/services/retriever.py
"""
Hybrid BM25 + semantic retriever.
BSL 1.1 — semantic path requires PAGEPIPER_OLLAMA_URL (BYOK gate).
BM25-only path is MIT and has no gate.
"""
from __future__ import annotations
import logging
import sqlite3
from dataclasses import dataclass
from app.services.bm25_index import BM25Index
logger = logging.getLogger(__name__)
def _fetch_adjacent(
hits: list["RetrievedChunk"],
db_path: str,
window: int = 1,
) -> list["RetrievedChunk"]:
"""Return chunks immediately before/after each hit that aren't already in the hit set.
Definitional passages often start mid-sentence because the EPUB/PDF chunk
boundary fell mid-paragraph. Fetching the preceding chunk restores the subject
so the LLM can understand 'them' / 'they' references correctly.
"""
if not hits:
return []
existing_keys = {(c.doc_id, c.page_number) for c in hits}
needed: dict[str, set[int]] = {}
for c in hits:
for delta in range(-window, window + 1):
if delta == 0:
continue
adj_page = c.page_number + delta
if adj_page > 0 and (c.doc_id, adj_page) not in existing_keys:
needed.setdefault(c.doc_id, set()).add(adj_page)
if not needed:
return []
extra: list[RetrievedChunk] = []
try:
conn = sqlite3.connect(db_path)
conn.row_factory = sqlite3.Row
for doc_id, pages in needed.items():
placeholders = ",".join("?" * len(pages))
rows = conn.execute(
f"SELECT id, doc_id, page_number, text FROM page_chunks "
f"WHERE doc_id=? AND page_number IN ({placeholders})",
[doc_id] + sorted(pages),
).fetchall()
for row in rows:
extra.append(
RetrievedChunk(
chunk_id=row["id"],
doc_id=row["doc_id"],
page_number=row["page_number"],
text=row["text"],
bm25_score=0.0,
vector_score=None,
)
)
conn.close()
except Exception as exc:
logger.warning("Context expansion query failed (non-fatal): %s", exc)
return extra
@dataclass(frozen=True)
class RetrievedChunk:
    """One retrieved passage together with the scores that selected it.

    Instances are immutable (``frozen=True``); build a new instance to change
    a score instead of mutating an existing one.
    """

    # Chunk identifier, matched against BM25 chunk ``id`` and vector ``entry_id``.
    chunk_id: str
    # Identifier of the document the chunk belongs to.
    doc_id: str
    # Source page; 0 when a vector hit carried no page metadata.
    page_number: int
    # Passage text used as LLM context.
    text: str
    # BM25 score; 0.0 for chunks that did not come from the BM25 index.
    bm25_score: float
    # Vector-store distance (the retriever treats lower as closer), or None if absent.
    vector_score: float | None
class Retriever:
    """Hybrid BM25 + semantic retriever over ingested page chunks.

    The BM25 path is always available. The semantic (vector) path is used only
    when the caller passes an LLM router that can embed the query and the
    vector store can be queried; otherwise results degrade to BM25-only.
    """

    def __init__(self, bm25: BM25Index) -> None:
        self._bm25 = bm25

    def hybrid_search(
        self,
        query: str,
        top_k: int,
        doc_ids: list[str] | None,
        db_path: str,
        vec_db_path: str,
        llm,  # LLMRouter | None — caller must pass
    ) -> list[RetrievedChunk]:
        """Merge BM25 and semantic results, then append adjacent-page context.

        Args:
            query: The user's query text.
            top_k: Number of ranked chunks to return (before context expansion).
            doc_ids: Restrict results to these documents; falsy searches all.
            db_path: SQLite database with ``page_chunks`` (BM25 source).
            vec_db_path: sqlite-vec database holding the ``page_vecs`` table.
            llm: Object exposing ``embed(list[str])``, or None.

        Returns:
            Up to ``top_k`` chunks ranked by combined score, followed by the
            neighbouring-page chunks returned by ``_fetch_adjacent``.

        Falls back to BM25-only if ``llm`` is None, if embedding the query
        fails, or if the vector store cannot be imported/opened/queried.
        """
        if llm is None:
            return self._bm25_only(query, top_k, doc_ids, db_path)
        from dataclasses import replace

        self._bm25.ensure_fresh(db_path)
        bm25_hits = {
            r.chunk_id: r
            for r in self._bm25.query(query, top_k=top_k * 2, doc_ids=doc_ids)
        }

        try:
            query_vec = llm.embed([query])[0]
        except Exception as exc:
            logger.warning("Embed failed, falling back to BM25-only: %s", exc)
            return self._bm25_only(query, top_k, doc_ids, db_path)

        try:
            vec_hits = self._vector_hits(query_vec, top_k, doc_ids, vec_db_path)
        except Exception as exc:
            # Same best-effort policy as the embed step. NOTE(review): one plausible
            # trigger is the vec DB being deleted/rebuilt by the startup re-embed.
            logger.warning("Vector search failed, falling back to BM25-only: %s", exc)
            return self._bm25_only(query, top_k, doc_ids, db_path)

        # Merge: BM25 hits take priority; vector hits fill in additional results.
        merged: dict[str, RetrievedChunk] = {
            cid: RetrievedChunk(
                chunk_id=cid,
                doc_id=r.doc_id,
                page_number=r.page_number,
                text=r.text,
                bm25_score=r.score,
                vector_score=None,
            )
            for cid, r in bm25_hits.items()
        }
        # Vector-only hits need their text; resolve all of them in a single pass.
        text_by_id = self._texts_for(
            {vh.entry_id for vh in vec_hits if vh.entry_id not in merged}
        )
        for vh in vec_hits:
            existing = merged.get(vh.entry_id)
            if existing is not None:
                merged[vh.entry_id] = replace(existing, vector_score=vh.score)
            else:
                merged[vh.entry_id] = RetrievedChunk(
                    chunk_id=vh.entry_id,
                    doc_id=vh.metadata.get("doc_id", ""),
                    page_number=int(vh.metadata.get("page_number", 0)),
                    # NOTE(review): "" when the BM25 index lacks this id (index out of sync?).
                    text=text_by_id.get(vh.entry_id, ""),
                    bm25_score=0.0,
                    vector_score=vh.score,
                )

        ranked = sorted(merged.values(), key=self._combined_score, reverse=True)[:top_k]
        adjacent = _fetch_adjacent(ranked, db_path)
        return ranked + adjacent

    @staticmethod
    def _vector_hits(query_vec, top_k: int, doc_ids: list[str] | None, vec_db_path: str) -> list:
        """Return nearest-neighbour hits from ``page_vecs``, restricted to ``doc_ids``."""
        from circuitforge_core.vector.sqlite_vec import LocalSQLiteVecStore
        from app.config import VEC_DIMENSIONS

        store = LocalSQLiteVecStore(db_path=vec_db_path, table="page_vecs", dimensions=VEC_DIMENSIONS)
        if not doc_ids:
            return store.query(query_vec, top_k=top_k * 2)
        # sqlite-vec applies filter_metadata as a Python post-filter after fetching k
        # nearest globally. When the corpus spans many documents and only a subset is
        # selected, most of those k candidates are from non-target docs and get dropped,
        # leaving too few vector hits. Oversample heavily and filter in Python instead.
        allowed = set(doc_ids)
        candidates = store.query(query_vec, top_k=top_k * 20)
        return [h for h in candidates if h.metadata.get("doc_id") in allowed]

    def _texts_for(self, chunk_ids: set[str]) -> dict[str, str]:
        """Map each requested chunk id to its text with one pass over the BM25 chunks."""
        texts: dict[str, str] = {}
        if not chunk_ids:
            return texts
        # _chunks is the loaded list of dicts from BM25Index; no public accessor exists
        for chunk in self._bm25._chunks:
            if chunk["id"] in chunk_ids:
                # Keep the first occurrence if an id appears more than once.
                texts.setdefault(chunk["id"], chunk["text"])
        return texts

    @staticmethod
    def _combined_score(chunk: RetrievedChunk) -> float:
        """Blend BM25 and vector relevance 50/50 into one higher-is-better score."""
        # sqlite-vec returns L2 distance (lower=better); invert to (0, 1], higher-is-better
        similarity = (
            1.0 / (1.0 + chunk.vector_score) if chunk.vector_score is not None else 0.0
        )
        # NOTE(review): bm25_score is passed through from BM25Index.query and is
        # presumably unnormalized; if so it can dominate the (0, 1] vector term.
        # Confirm before retuning the weights.
        return chunk.bm25_score * 0.5 + similarity * 0.5

    def _bm25_only(
        self, query: str, top_k: int, doc_ids: list[str] | None, db_path: str
    ) -> list[RetrievedChunk]:
        """Return the top ``top_k`` BM25 hits plus their adjacent-page context."""
        self._bm25.ensure_fresh(db_path)
        hits = [
            RetrievedChunk(
                chunk_id=r.chunk_id,
                doc_id=r.doc_id,
                page_number=r.page_number,
                text=r.text,
                bm25_score=r.score,
                vector_score=None,
            )
            for r in self._bm25.query(query, top_k=top_k, doc_ids=doc_ids)
        ]
        adjacent = _fetch_adjacent(hits, db_path)
        return hits + adjacent