"""Context fact and document CRUD — MIT licensed."""

from __future__ import annotations

import sqlite3
import uuid
from contextlib import closing
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path


@dataclass(frozen=True)
class ContextFact:
    """Immutable record mirroring one row of the ``context_facts`` table."""

    id: str
    category: str
    key: str
    value: str
    source: str | None
    created_at: str  # add_fact() stores an ISO-8601 UTC timestamp here


@dataclass(frozen=True)
class ContextDocument:
    """Immutable record mirroring one row of the ``context_documents`` table."""

    id: str
    filename: str
    doc_type: str
    full_text: str
    file_size: int | None
    uploaded_at: str  # add_document() stores an ISO-8601 UTC timestamp here


def _connect(db_path: Path) -> sqlite3.Connection:
    """Open a connection to *db_path* configured the way this module expects.

    The connection uses WAL journaling, has foreign-key enforcement switched
    on, and yields ``sqlite3.Row`` rows so columns can be read by name.
    The caller owns the returned connection and is responsible for closing it.

    Raises:
        sqlite3.Error: if the database cannot be opened or configured.
    """
    conn = sqlite3.connect(str(db_path))
    try:
        conn.execute("PRAGMA journal_mode=WAL")
        conn.execute("PRAGMA foreign_keys=ON")
    except BaseException:
        # Do not leak the handle when configuration fails; re-raise unchanged.
        conn.close()
        raise
    conn.row_factory = sqlite3.Row
    return conn


def _fact_from_row(row: sqlite3.Row) -> ContextFact:
    """Build a ContextFact from a ``context_facts`` row."""
    return ContextFact(
        id=row["id"],
        category=row["category"],
        key=row["key"],
        value=row["value"],
        source=row["source"],
        created_at=row["created_at"],
    )


def _document_from_row(row: sqlite3.Row) -> ContextDocument:
    """Build a ContextDocument from a ``context_documents`` row."""
    return ContextDocument(
        id=row["id"],
        filename=row["filename"],
        doc_type=row["doc_type"],
        full_text=row["full_text"],
        file_size=row["file_size"],
        uploaded_at=row["uploaded_at"],
    )


def add_fact(
    db_path: Path,
    category: str,
    key: str,
    value: str,
    source: str | None = None,
) -> ContextFact:
    """Insert a new fact and return it.

    A random UUID4 is generated for the id and the current UTC time (ISO-8601)
    is recorded as ``created_at``.

    Raises:
        sqlite3.Error: if the insert or commit fails. The connection is
            closed in that case too, so nothing is left half-written.
    """
    fact = ContextFact(
        id=str(uuid.uuid4()),
        category=category,
        key=key,
        value=value,
        source=source,
        created_at=datetime.now(timezone.utc).isoformat(),
    )
    # closing() guarantees conn.close() even when execute/commit raises.
    with closing(_connect(db_path)) as conn:
        conn.execute(
            "INSERT INTO context_facts(id, category, key, value, source, created_at)"
            " VALUES (?,?,?,?,?,?)",
            (fact.id, fact.category, fact.key, fact.value, fact.source, fact.created_at),
        )
        conn.commit()
    return fact


def list_facts(db_path: Path, category: str | None = None) -> list[ContextFact]:
    """Return stored facts, optionally restricted to one category.

    With a non-empty *category* the matching facts are returned ordered by
    ``created_at``. With ``None`` or an empty string every fact is returned,
    ordered by category and then ``created_at``.

    Raises:
        sqlite3.Error: if the query fails.
    """
    with closing(_connect(db_path)) as conn:
        # Truthiness check is deliberate: "" behaves like "no filter".
        if category:
            rows = conn.execute(
                "SELECT * FROM context_facts WHERE category=? ORDER BY created_at",
                (category,),
            ).fetchall()
        else:
            rows = conn.execute(
                "SELECT * FROM context_facts ORDER BY category, created_at"
            ).fetchall()
    return [_fact_from_row(row) for row in rows]


def delete_fact(db_path: Path, fact_id: str) -> bool:
    """Delete the fact with id *fact_id*.

    Returns:
        True if a row was deleted, False if no fact had that id.

    Raises:
        sqlite3.Error: if the delete or commit fails.
    """
    with closing(_connect(db_path)) as conn:
        cursor = conn.execute("DELETE FROM context_facts WHERE id=?", (fact_id,))
        conn.commit()
        return cursor.rowcount > 0


def add_document(
    db_path: Path,
    filename: str,
    doc_type: str,
    full_text: str,
    file_size: int | None = None,
) -> ContextDocument:
    """Insert a new document and return it.

    A random UUID4 is generated for the id and the current UTC time (ISO-8601)
    is recorded as ``uploaded_at``.

    Raises:
        sqlite3.Error: if the insert or commit fails.
    """
    doc = ContextDocument(
        id=str(uuid.uuid4()),
        filename=filename,
        doc_type=doc_type,
        full_text=full_text,
        file_size=file_size,
        uploaded_at=datetime.now(timezone.utc).isoformat(),
    )
    with closing(_connect(db_path)) as conn:
        conn.execute(
            "INSERT INTO context_documents(id, filename, doc_type, full_text, file_size, uploaded_at)"
            " VALUES (?,?,?,?,?,?)",
            (doc.id, doc.filename, doc.doc_type, doc.full_text, doc.file_size, doc.uploaded_at),
        )
        conn.commit()
    return doc


def list_documents(db_path: Path) -> list[ContextDocument]:
    """Return every stored document, most recently uploaded first.

    Raises:
        sqlite3.Error: if the query fails.
    """
    with closing(_connect(db_path)) as conn:
        rows = conn.execute(
            "SELECT id, filename, doc_type, full_text, file_size, uploaded_at"
            " FROM context_documents ORDER BY uploaded_at DESC"
        ).fetchall()
    return [_document_from_row(row) for row in rows]


def delete_document(db_path: Path, doc_id: str) -> bool:
    """Delete the document with id *doc_id*.

    Returns:
        True if a row was deleted, False if no document had that id.

    Raises:
        sqlite3.Error: if the delete or commit fails.
    """
    with closing(_connect(db_path)) as conn:
        cursor = conn.execute("DELETE FROM context_documents WHERE id=?", (doc_id,))
        conn.commit()
        return cursor.rowcount > 0