Retrieval:
- Add _fetch_adjacent() to retriever: after ranking, fetch the chunks on the
pages immediately before and after each hit (page ± 1) from the DB, so EPUB
chunk boundaries that fall mid-sentence don't lose context
- Fix vec DB doc-filter: oversample to top_k*20 before Python filter
instead of post-filtering an already-small global pool (fixes wrong-book
results when searching within a single document)
- top_k default 5 → 10; context per chunk 500 → 1500 chars; citation
snippet 200 → 400 chars
Artifact cleaning:
- Add scripts/text_clean.py: strips ABC Amber LIT Converter watermarks,
processtext.com URLs, bare page numbers, and piracy stamps from extracted text
- Wire clean_paragraph() into ingest_pdf.py and new ingest_epub.py
Startup validation:
- _check_vec_schema() at boot: detects an embedding-dimension mismatch,
deletes the stale vec DB, and queues a sequential re-embed in a background thread
- Sequential _reembed_docs() prevents SQLite lock races on startup re-embed
cf-orch integration:
- Wire CF_ORCH_URL / CF_LICENSE_KEY into LLMRouter backend config so
allocate() fires and keeps the Ollama model warm between requests
Ingestion progress UI:
- GET /api/library/{doc_id}/status now returns vec_count from page_vecs_meta
- DocumentCard.vue polls status every 3 s while processing and shows
two-phase progress: indeterminate animation during extraction,
determinate "Embedding N/M pages" bar once vectors start landing
Other:
- Chat feedback endpoint + thumbs up/down UI (FeedbackButton.vue)
- EPUB ingest script (ingest_epub.py) with heading-based chunking
- migration 002: chat_feedback table
- README.md with setup and feature overview
193 lines · 6.7 KiB · Python
# app/services/retriever.py
|
|
"""
|
|
Hybrid BM25 + semantic retriever.
|
|
|
|
BSL 1.1 — semantic path requires PAGEPIPER_OLLAMA_URL (BYOK gate).
|
|
BM25-only path is MIT and has no gate.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import sqlite3
|
|
from dataclasses import dataclass
|
|
|
|
from app.services.bm25_index import BM25Index
|
|
|
|
# Module-level logger, named after this module's import path.
logger = logging.getLogger(name=__name__)
|
|
|
|
|
|
def _fetch_adjacent(
    hits: list["RetrievedChunk"],
    db_path: str,
    window: int = 1,
) -> list["RetrievedChunk"]:
    """Return chunks on pages neighbouring each hit, excluding pages already hit.

    Definitional passages often start mid-sentence because the EPUB/PDF chunk
    boundary fell mid-paragraph. Fetching the neighbouring pages restores the
    subject so the LLM can understand 'them' / 'they' references correctly.

    Args:
        hits: Ranked chunks whose neighbouring pages should be fetched.
        db_path: Path to the SQLite database containing the ``page_chunks`` table.
        window: Number of pages on each side of a hit to include. Values below 1
            fetch nothing.

    Returns:
        A new ``RetrievedChunk`` (``bm25_score=0.0``, ``vector_score=None``) for
        every ``page_chunks`` row on a neighbouring page of the same document,
        skipping any (doc_id, page) that already contains a hit. Returns ``[]``
        when there is nothing to fetch. SQLite errors are logged and treated as
        non-fatal; rows fetched before the error are still returned.
    """
    if not hits:
        return []

    # Pages that already hold a hit need no expansion.
    existing_keys = {(c.doc_id, c.page_number) for c in hits}
    needed: dict[str, set[int]] = {}
    for c in hits:
        for delta in range(-window, window + 1):
            if delta == 0:
                continue
            adj_page = c.page_number + delta
            # NOTE(review): pages <= 0 are skipped, which assumes 1-based page
            # numbering — confirm for every ingest path.
            if adj_page > 0 and (c.doc_id, adj_page) not in existing_keys:
                needed.setdefault(c.doc_id, set()).add(adj_page)

    if not needed:
        return []

    extra: list[RetrievedChunk] = []
    conn = None
    try:
        conn = sqlite3.connect(db_path)
        conn.row_factory = sqlite3.Row
        for doc_id, pages in needed.items():
            placeholders = ",".join("?" * len(pages))
            rows = conn.execute(
                f"SELECT id, doc_id, page_number, text FROM page_chunks "
                f"WHERE doc_id=? AND page_number IN ({placeholders})",
                [doc_id] + sorted(pages),
            ).fetchall()
            for row in rows:
                extra.append(
                    RetrievedChunk(
                        chunk_id=row["id"],
                        doc_id=row["doc_id"],
                        page_number=row["page_number"],
                        text=row["text"],
                        bm25_score=0.0,
                        vector_score=None,
                    )
                )
    except sqlite3.Error as exc:
        # Context expansion is best-effort: never fail the search because of it.
        logger.warning("Context expansion query failed (non-fatal): %s", exc)
    finally:
        # Close even when a query raises; the old code leaked the connection then.
        if conn is not None:
            conn.close()

    return extra
|
|
|
|
|
|
@dataclass(frozen=True, eq=True)
class RetrievedChunk:
    """Immutable record describing one chunk handed back by the retriever.

    Bundles where the chunk lives, its text, and the raw score from each
    retrieval path (BM25 and/or vector) that surfaced it.
    """

    # Chunk identifier (``page_chunks.id`` / vector-store entry id).
    chunk_id: str
    # Identifier of the document the chunk belongs to.
    doc_id: str
    # Page number of the chunk within its document.
    page_number: int
    # The chunk's text content.
    text: str
    # BM25 score; 0.0 when the chunk was not found by the BM25 path.
    bm25_score: float
    # Raw vector-store score (an L2 distance, lower is better); None when the
    # chunk was not found by the vector path.
    vector_score: float | None
|
|
|
|
|
|
class Retriever:
    """Hybrid BM25 + semantic retriever over the ``page_chunks`` corpus.

    The semantic path needs an LLM router able to embed the query. Without one,
    or when embedding or the vector store fails, it degrades to BM25-only.
    """

    def __init__(self, bm25: BM25Index) -> None:
        self._bm25 = bm25

    def hybrid_search(
        self,
        query: str,
        top_k: int,
        doc_ids: list[str] | None,
        db_path: str,
        vec_db_path: str,
        llm,  # LLMRouter | None — caller must pass
    ) -> list[RetrievedChunk]:
        """
        Merge BM25 and semantic results.

        Falls back to BM25-only if llm is None, if embedding the query fails,
        or if the vector store cannot be opened or queried.

        Args:
            query: Free-text search query.
            top_k: Number of ranked chunks to return (before context expansion).
            doc_ids: Restrict results to these documents; falsy means no filter
                on the vector path (passed through unchanged to BM25).
            db_path: SQLite DB holding ``page_chunks`` (BM25 + adjacent fetch).
            vec_db_path: SQLite DB holding the ``page_vecs`` vector table.
            llm: Object exposing ``embed(list[str])``, or None.

        Returns:
            Up to ``top_k`` ranked chunks followed by any neighbouring-page
            chunks from ``_fetch_adjacent``; the list may exceed ``top_k``.
        """
        if llm is None:
            return self._bm25_only(query, top_k, doc_ids, db_path)

        from circuitforge_core.vector.sqlite_vec import LocalSQLiteVecStore

        self._bm25.ensure_fresh(db_path)
        bm25_hits = {
            r.chunk_id: r
            for r in self._bm25.query(query, top_k=top_k * 2, doc_ids=doc_ids)
        }

        try:
            vec = llm.embed([query])[0]
        except Exception as exc:
            logger.warning("Embed failed, falling back to BM25-only: %s", exc)
            return self._bm25_only(query, top_k, doc_ids, db_path)

        from app.config import VEC_DIMENSIONS

        # Mirror the embed fallback: a missing/stale vec DB should not fail the search.
        try:
            store = LocalSQLiteVecStore(db_path=vec_db_path, table="page_vecs", dimensions=VEC_DIMENSIONS)
            vec_hits = self._vector_hits(store, vec, top_k, doc_ids)
        except Exception as exc:
            logger.warning("Vector query failed, falling back to BM25-only: %s", exc)
            return self._bm25_only(query, top_k, doc_ids, db_path)

        merged = self._merge(bm25_hits, vec_hits)
        ranked = sorted(merged.values(), key=self._combined_score, reverse=True)[:top_k]
        adjacent = _fetch_adjacent(ranked, db_path)
        return ranked + adjacent

    @staticmethod
    def _vector_hits(store, vec, top_k: int, doc_ids: list[str] | None) -> list:
        """Query the vector store, keeping only hits from ``doc_ids`` when given."""
        # sqlite-vec applies filter_metadata as a Python post-filter after fetching k
        # nearest globally. When the corpus spans many documents and only a subset is
        # selected, most of those k candidates are from non-target docs and get dropped,
        # leaving too few vector hits. Oversample heavily and filter in Python instead.
        if doc_ids:
            wanted = set(doc_ids)
            candidates = store.query(vec, top_k=top_k * 20)
            return [h for h in candidates if h.metadata.get("doc_id") in wanted]
        # list() so callers can iterate the hits more than once.
        return list(store.query(vec, top_k=top_k * 2))

    def _merge(self, bm25_hits: dict, vec_hits: list) -> dict[str, RetrievedChunk]:
        """Combine BM25 and vector hits into one chunk per id carrying both scores."""
        # Merge: BM25 hits take priority; vector hits fill in additional results
        merged: dict[str, RetrievedChunk] = {
            cid: RetrievedChunk(
                chunk_id=cid,
                doc_id=r.doc_id,
                page_number=r.page_number,
                text=r.text,
                bm25_score=r.score,
                vector_score=None,
            )
            for cid, r in bm25_hits.items()
        }

        # Resolve texts for vector-only hits in a single corpus pass instead of
        # one full scan per hit.
        texts = self._chunk_texts({vh.entry_id for vh in vec_hits if vh.entry_id not in merged})

        for vh in vec_hits:
            existing = merged.get(vh.entry_id)
            if existing is not None:
                merged[vh.entry_id] = RetrievedChunk(
                    chunk_id=existing.chunk_id,
                    doc_id=existing.doc_id,
                    page_number=existing.page_number,
                    text=existing.text,
                    bm25_score=existing.bm25_score,
                    vector_score=vh.score,
                )
                continue
            text = texts.get(vh.entry_id)
            if text is None:
                # Vector entry with no matching chunk in the corpus (stale vec DB);
                # returning it would hand the LLM an empty passage.
                logger.debug("Skipping vector hit %s: chunk not in BM25 corpus", vh.entry_id)
                continue
            merged[vh.entry_id] = RetrievedChunk(
                chunk_id=vh.entry_id,
                doc_id=vh.metadata.get("doc_id", ""),
                page_number=int(vh.metadata.get("page_number", 0)),
                text=text,
                bm25_score=0.0,
                vector_score=vh.score,
            )
        return merged

    def _chunk_texts(self, chunk_ids: set[str]) -> dict[str, str]:
        """Map each id in ``chunk_ids`` to its text using one pass over the BM25 corpus."""
        found: dict[str, str] = {}
        if not chunk_ids:
            return found
        # _chunks is the loaded list of dicts from BM25Index; no public accessor exists
        for c in self._bm25._chunks:
            cid = c["id"]
            # First occurrence wins, matching the previous next(...) lookup.
            if cid in chunk_ids and cid not in found:
                found[cid] = c["text"]
                if len(found) == len(chunk_ids):
                    break
        return found

    @staticmethod
    def _combined_score(r: RetrievedChunk) -> float:
        """Blend BM25 and vector scores 50/50 into one higher-is-better value."""
        # sqlite-vec returns L2 distance (lower=better); invert to [0,1] higher-is-better
        vec = (1.0 / (1.0 + r.vector_score)) if r.vector_score is not None else 0.0
        # NOTE(review): bm25_score is used raw; if BM25Index scores are not on a
        # ~[0, 1] scale they will dominate the vector term — confirm.
        return r.bm25_score * 0.5 + vec * 0.5

    def _bm25_only(
        self, query: str, top_k: int, doc_ids: list[str] | None, db_path: str
    ) -> list[RetrievedChunk]:
        """Rank with BM25 alone, then append neighbouring-page context chunks."""
        self._bm25.ensure_fresh(db_path)
        hits = [
            RetrievedChunk(
                chunk_id=r.chunk_id,
                doc_id=r.doc_id,
                page_number=r.page_number,
                text=r.text,
                bm25_score=r.score,
                vector_score=None,
            )
            for r in self._bm25.query(query, top_k=top_k, doc_ids=doc_ids)
        ]
        adjacent = _fetch_adjacent(hits, db_path)
        return hits + adjacent
|