Returns up to N randomly sampled page chunks (default 50, max 200) with chunk_id, doc_id, page_number, and text fields. No tier gate — internal tool endpoint for same-host corpus benchmarking. Returns [] on empty library.
105 lines
3.5 KiB
Python
105 lines
3.5 KiB
Python
# tests/test_library_api.py
|
|
"""Tests for GET/POST /api/library endpoints."""
|
|
from __future__ import annotations
|
|
|
|
import sqlite3
|
|
|
|
|
|
def _add_doc(db_path: str, title: str, path: str, status: str = "ready") -> str:
    """Insert one row into ``documents`` and return its generated ``id``.

    Args:
        db_path: Filesystem path of the SQLite database to write to.
        title: Value stored in the ``title`` column.
        path: Value stored in the ``file_path`` column.
        status: Value stored in the ``status`` column (defaults to ``"ready"``).

    Returns:
        The ``id`` produced by the statement's ``RETURNING id`` clause.

    Raises:
        sqlite3.Error: If the insert fails. The connection is closed either way.
    """
    conn = sqlite3.connect(db_path)
    try:
        doc_id = conn.execute(
            "INSERT INTO documents(title, file_path, status) VALUES (?,?,?) RETURNING id",
            [title, path, status],
        ).fetchone()[0]
        conn.commit()
    finally:
        # Always release the handle so a failed insert cannot leave the test
        # database open for later fixtures or tests.
        conn.close()
    return doc_id
|
|
|
|
|
|
def test_list_library_empty(client):
    """GET /api/library on an empty library answers 200 with an empty list."""
    response = client.get("/api/library")

    assert response.status_code == 200
    payload = response.json()
    assert payload == []
|
|
|
|
|
|
def test_list_library_returns_documents(client, test_db):
    """A document inserted into the database shows up in GET /api/library."""
    _add_doc(test_db, "Player's Handbook", "/books/phb.pdf")

    response = client.get("/api/library")
    assert response.status_code == 200

    listing = response.json()
    assert len(listing) == 1

    # Exactly one entry is expected, so inspect it directly.
    entry = listing[0]
    assert entry["title"] == "Player's Handbook"
    assert "status" in entry
|
|
|
|
|
|
def test_delete_document_removes_record(client, test_db):
    """DELETE /api/library/{doc_id} answers 204 and the listing becomes empty."""
    doc_id = _add_doc(test_db, "Monster Manual", "/books/mm.pdf")

    delete_response = client.delete(f"/api/library/{doc_id}")
    assert delete_response.status_code == 204

    # The only document was just deleted, so the library must list nothing.
    listing_response = client.get("/api/library")
    assert listing_response.json() == []
|
|
|
|
|
|
def test_delete_nonexistent_returns_404(client):
    """DELETE for an id that was never inserted answers 404."""
    response = client.delete("/api/library/does-not-exist")

    assert response.status_code == 404
|
|
|
|
|
|
def test_reingest_returns_task_id(client, test_db, tmp_path):
    """POST /api/library/{doc_id}/reingest answers 202 with a ``task_id``."""
    # The fixture PDF lives in a "books" subdirectory of tmp_path. Create it
    # first: writing into a missing directory raises FileNotFoundError.
    # exist_ok keeps this harmless if another fixture already made it.
    books_dir = tmp_path / "books"
    books_dir.mkdir(parents=True, exist_ok=True)

    # write_bytes opens, writes and closes the file, so no handle is left
    # dangling the way a bare open(...).write(...) would leave one.
    pdf_file = books_dir / "test.pdf"
    pdf_file.write_bytes(b"%PDF-1.4")
    doc_id = _add_doc(test_db, "Test Book", str(pdf_file))

    resp = client.post(f"/api/library/{doc_id}/reingest")

    assert resp.status_code == 202
    assert "task_id" in resp.json()
|
|
|
|
|
|
def test_reingest_updates_status_to_processing(client, test_db, tmp_path):
    """After a reingest request the document reports a known lifecycle status."""
    from pathlib import Path

    # Create the "books" subdirectory before writing the fixture PDF into it;
    # Path.write_bytes does not create missing parent directories.
    books_dir = Path(tmp_path) / "books"
    books_dir.mkdir(parents=True, exist_ok=True)
    pdf_path = str(books_dir / "dm_guide.pdf")
    Path(pdf_path).write_bytes(b"%PDF-1.4 empty fixture")
    doc_id = _add_doc(test_db, "DM Guide", pdf_path)

    resp = client.post(f"/api/library/{doc_id}/reingest")
    assert resp.status_code == 202

    # Document should be in processing state (or beyond if stub ingest ran instantly)
    status_resp = client.get(f"/api/library/{doc_id}/status")
    assert status_resp.json()["status"] in ("processing", "error", "ready")
|
|
|
|
|
|
def _add_chunks(db_path: str, doc_id: str, count: int) -> None:
    """Insert ``count`` page chunks for ``doc_id`` into ``page_chunks``.

    Pages are numbered 1..count. Each row gets the text
    ``"Page <n> text content."``, source ``"text_layer"`` and word count 4.
    A ``count`` of zero or less inserts nothing.

    Args:
        db_path: Filesystem path of the SQLite database to write to.
        doc_id: Value stored in each row's ``doc_id`` column.
        count: Number of rows to insert.

    Raises:
        sqlite3.Error: If the insert fails. The connection is closed either way.
    """
    rows = [
        (doc_id, page, f"Page {page} text content.", "text_layer", 4)
        for page in range(1, count + 1)
    ]
    conn = sqlite3.connect(db_path)
    try:
        conn.executemany(
            "INSERT INTO page_chunks(doc_id, page_number, text, source, word_count) VALUES (?,?,?,?,?)",
            rows,
        )
        conn.commit()
    finally:
        # Close even when the insert fails so the database is not left open.
        conn.close()
|
|
|
|
|
|
def test_sample_chunks_empty_returns_empty(client):
    """GET /api/library/sample-chunks with no chunks answers 200 and ``[]``."""
    response = client.get("/api/library/sample-chunks")

    assert response.status_code == 200
    sampled = response.json()
    assert sampled == []
|
|
|
|
|
|
def test_sample_chunks_returns_fields(client, test_db):
    """``limit=3`` yields three chunks, each with exactly the four expected keys."""
    doc_id = _add_doc(test_db, "Monster Manual", "/books/mm.pdf")
    _add_chunks(test_db, doc_id, 5)

    response = client.get("/api/library/sample-chunks?limit=3")
    assert response.status_code == 200

    sampled = response.json()
    assert len(sampled) == 3

    expected_fields = {"chunk_id", "doc_id", "page_number", "text"}
    for chunk in sampled:
        # Every chunk must expose exactly these keys and belong to the only
        # document that was seeded.
        assert expected_fields == set(chunk.keys())
        assert chunk["doc_id"] == doc_id
|
|
|
|
|
|
def test_sample_chunks_limit_capped_at_200(client, test_db):
    """An oversized ``limit`` is accepted (HTTP 200) but yields at most 200 chunks."""
    doc_id = _add_doc(test_db, "Big Book", "/books/big.pdf")
    # Seed more rows than the cap. With fewer than 200 rows available the
    # length check below would pass even if the endpoint ignored the cap.
    _add_chunks(test_db, doc_id, 201)

    resp = client.get("/api/library/sample-chunks?limit=9999")

    assert resp.status_code == 200
    # 201 chunks exist and 9999 were requested, so exactly the cap comes back.
    assert len(resp.json()) == 200
|