# tests/test_library_api.py """Tests for GET/POST /api/library endpoints.""" from __future__ import annotations import sqlite3 def _add_doc(db_path: str, title: str, path: str, status: str = "ready") -> str: conn = sqlite3.connect(db_path) doc_id = conn.execute( "INSERT INTO documents(title, file_path, status) VALUES (?,?,?) RETURNING id", [title, path, status], ).fetchone()[0] conn.commit() conn.close() return doc_id def test_list_library_empty(client): resp = client.get("/api/library") assert resp.status_code == 200 assert resp.json() == [] def test_list_library_returns_documents(client, test_db): _add_doc(test_db, "Player's Handbook", "/books/phb.pdf") resp = client.get("/api/library") assert resp.status_code == 200 docs = resp.json() assert len(docs) == 1 assert docs[0]["title"] == "Player's Handbook" assert "status" in docs[0] def test_delete_document_removes_record(client, test_db): doc_id = _add_doc(test_db, "Monster Manual", "/books/mm.pdf") resp = client.delete(f"/api/library/{doc_id}") assert resp.status_code == 204 resp2 = client.get("/api/library") assert resp2.json() == [] def test_delete_nonexistent_returns_404(client): resp = client.delete("/api/library/does-not-exist") assert resp.status_code == 404 def test_reingest_returns_task_id(client, test_db, tmp_path): pdf_path = str(tmp_path / "books" / "test.pdf") open(pdf_path, "wb").write(b"%PDF-1.4") doc_id = _add_doc(test_db, "Test Book", pdf_path) resp = client.post(f"/api/library/{doc_id}/reingest") assert resp.status_code == 202 assert "task_id" in resp.json() def test_reingest_updates_status_to_processing(client, test_db, tmp_path): from pathlib import Path pdf_path = str(tmp_path / "books" / "dm_guide.pdf") Path(pdf_path).write_bytes(b"%PDF-1.4 empty fixture") doc_id = _add_doc(test_db, "DM Guide", pdf_path) resp = client.post(f"/api/library/{doc_id}/reingest") assert resp.status_code == 202 # Document should be in processing state (or beyond if stub ingest ran instantly) status_resp = client.get(f"/api/library/{doc_id}/status") assert status_resp.json()["status"] in ("processing", "error", "ready") def _add_chunks(db_path: str, doc_id: str, count: int) -> None: conn = sqlite3.connect(db_path) for i in range(count): conn.execute( "INSERT INTO page_chunks(doc_id, page_number, text, source, word_count) VALUES (?,?,?,?,?)", [doc_id, i + 1, f"Page {i + 1} text content.", "text_layer", 4], ) conn.commit() conn.close() def test_sample_chunks_empty_returns_empty(client): resp = client.get("/api/library/sample-chunks") assert resp.status_code == 200 assert resp.json() == [] def test_sample_chunks_returns_fields(client, test_db): doc_id = _add_doc(test_db, "Monster Manual", "/books/mm.pdf") _add_chunks(test_db, doc_id, 5) resp = client.get("/api/library/sample-chunks?limit=3") assert resp.status_code == 200 chunks = resp.json() assert len(chunks) == 3 for c in chunks: assert {"chunk_id", "doc_id", "page_number", "text"} == set(c.keys()) assert c["doc_id"] == doc_id def test_sample_chunks_limit_capped_at_200(client, test_db): doc_id = _add_doc(test_db, "Big Book", "/books/big.pdf") _add_chunks(test_db, doc_id, 10) resp = client.get("/api/library/sample-chunks?limit=9999") assert resp.status_code == 200 assert len(resp.json()) <= 200