diff --git a/circuitforge_core/documents/__init__.py b/circuitforge_core/documents/__init__.py new file mode 100644 index 0000000..cbf6899 --- /dev/null +++ b/circuitforge_core/documents/__init__.py @@ -0,0 +1,18 @@ +""" +circuitforge_core.documents — shared document ingestion pipeline. + +Primary entry point:: + + from circuitforge_core.documents import ingest, StructuredDocument + + doc: StructuredDocument = ingest(image_bytes, hint="auto") +""" +from .ingest import ingest +from .models import Element, ParsedTable, StructuredDocument + +__all__ = [ + "ingest", + "Element", + "ParsedTable", + "StructuredDocument", +] diff --git a/circuitforge_core/documents/client.py b/circuitforge_core/documents/client.py new file mode 100644 index 0000000..d1c83ae --- /dev/null +++ b/circuitforge_core/documents/client.py @@ -0,0 +1,84 @@ +""" +circuitforge_core.documents.client — HTTP client for the cf-docuvision service. + +Thin wrapper around the cf-docuvision FastAPI service's POST /extract endpoint. +Used by ingest() as the primary path; callers should not use this directly. +""" +from __future__ import annotations + +import base64 +import logging +from typing import Any + +import requests + +from .models import Element, ParsedTable, StructuredDocument + +logger = logging.getLogger(__name__) + +_DEFAULT_TIMEOUT_S = 60 + + +class DocuvisionClient: + """Synchronous HTTP client for cf-docuvision. + + Args: + base_url: Root URL of the cf-docuvision service, e.g. 'http://localhost:8003' + timeout: Request timeout in seconds. + """ + + def __init__(self, base_url: str = "http://localhost:8003", timeout: int = _DEFAULT_TIMEOUT_S) -> None: + self.base_url = base_url.rstrip("/") + self.timeout = timeout + + def is_healthy(self) -> bool: + """Return True if the service responds to GET /health.""" + try: + resp = requests.get(f"{self.base_url}/health", timeout=3) + return resp.status_code == 200 + except Exception: + return False + + def extract(self, image_bytes: bytes, hint: str = "auto") -> StructuredDocument: + """ + Submit image bytes to cf-docuvision and return a StructuredDocument. + + Raises: + requests.HTTPError: if the service returns a non-2xx status. + requests.ConnectionError / requests.Timeout: if the service is unreachable. + """ + payload = { + "image_b64": base64.b64encode(image_bytes).decode(), + "hint": hint, + } + resp = requests.post( + f"{self.base_url}/extract", + json=payload, + timeout=self.timeout, + ) + resp.raise_for_status() + return _parse_response(resp.json()) + + +def _parse_response(data: dict[str, Any]) -> StructuredDocument: + elements = [ + Element( + type=e["type"], + text=e["text"], + bbox=tuple(e["bbox"]) if e.get("bbox") else None, + ) + for e in data.get("elements", []) + ] + tables = [ + ParsedTable( + html=t["html"], + bbox=tuple(t["bbox"]) if t.get("bbox") else None, + ) + for t in data.get("tables", []) + ] + return StructuredDocument( + elements=elements, + raw_text=data.get("raw_text", ""), + tables=tables, + metadata=data.get("metadata", {}), + ) diff --git a/circuitforge_core/documents/ingest.py b/circuitforge_core/documents/ingest.py new file mode 100644 index 0000000..8d2304b --- /dev/null +++ b/circuitforge_core/documents/ingest.py @@ -0,0 +1,137 @@ +""" +circuitforge_core.documents.ingest — public document ingestion entry point. + +Primary path: cf-docuvision HTTP service (Dolphin-v2, layout-aware). +Fallback path: LLMRouter vision call (lower fidelity, no layout/bbox). + +Usage:: + + from circuitforge_core.documents import ingest + + with open("receipt.jpg", "rb") as f: + doc = ingest(f.read(), hint="table") + + print(doc.raw_text) + for table in doc.tables: + print(table.html) +""" +from __future__ import annotations + +import base64 +import logging +import os +from typing import Any + +from .client import DocuvisionClient +from .models import Element, StructuredDocument + +logger = logging.getLogger(__name__) + +_DOCUVISION_URL_ENV = "CF_DOCUVISION_URL" +_DOCUVISION_URL_DEFAULT = "http://localhost:8003" + +_LLM_FALLBACK_PROMPTS: dict[str, str] = { + "auto": "Extract all text from this document. Return a JSON array of {\"type\": ..., \"text\": ...} objects.", + "table": "Extract all tables from this document as HTML. Return a JSON array including {\"type\": \"table\", \"html\": ..., \"text\": ...} objects.", + "text": "Extract all text from this document preserving headings and paragraphs. Return a JSON array of {\"type\": ..., \"text\": ...} objects.", + "form": "Extract all form field labels and values from this document. Return a JSON array of {\"type\": ..., \"text\": ...} objects.", +} + + +def ingest( + image_bytes: bytes, + hint: str = "auto", + *, + docuvision_url: str | None = None, + llm_router: Any | None = None, + llm_config_path: Any | None = None, +) -> StructuredDocument: + """ + Ingest an image and return a StructuredDocument. + + Tries cf-docuvision first; falls back to LLMRouter vision if the service is + unavailable or fails. If neither is available, returns an empty document. + + Args: + image_bytes: Raw bytes of the image (JPEG, PNG, etc.). + hint: Extraction mode: "auto" | "table" | "text" | "form". + docuvision_url: Override service URL (defaults to CF_DOCUVISION_URL env or localhost:8003). + llm_router: Pre-built LLMRouter instance for fallback (optional). + llm_config_path: Path to llm.yaml for lazy-constructing LLMRouter if needed. + + Returns: + StructuredDocument — always, even on total failure (empty document). + """ + url = docuvision_url or os.environ.get(_DOCUVISION_URL_ENV, _DOCUVISION_URL_DEFAULT) + client = DocuvisionClient(base_url=url) + + # ── primary: cf-docuvision ──────────────────────────────────────────────── + try: + if client.is_healthy(): + doc = client.extract(image_bytes, hint=hint) + logger.debug("ingest: cf-docuvision succeeded (%d elements)", len(doc.elements)) + return doc + logger.debug("ingest: cf-docuvision unhealthy, falling back to LLM") + except Exception as exc: + logger.warning("ingest: cf-docuvision failed (%s), falling back to LLM", exc) + + # ── fallback: LLMRouter vision ──────────────────────────────────────────── + router = llm_router or _build_llm_router(llm_config_path) + if router is None: + logger.warning("ingest: no LLM router available; returning empty document") + return StructuredDocument(metadata={"source": "none", "hint": hint}) + + try: + return _llm_ingest(router, image_bytes, hint) + except Exception as exc: + logger.warning("ingest: LLM fallback failed (%s); returning empty document", exc) + return StructuredDocument(metadata={"source": "llm_error", "hint": hint, "error": str(exc)}) + + +# ── helpers ─────────────────────────────────────────────────────────────────── + +def _build_llm_router(config_path: Any | None) -> Any | None: + """Lazily construct an LLMRouter; return None if unavailable.""" + try: + from circuitforge_core.llm import LLMRouter + kwargs: dict[str, Any] = {} + if config_path is not None: + kwargs["config_path"] = config_path + return LLMRouter(**kwargs) + except Exception as exc: + logger.debug("ingest: could not build LLMRouter: %s", exc) + return None + + +def _llm_ingest(router: Any, image_bytes: bytes, hint: str) -> StructuredDocument: + """Use LLMRouter's vision capability to extract document text.""" + import json + + prompt = _LLM_FALLBACK_PROMPTS.get(hint, _LLM_FALLBACK_PROMPTS["auto"]) + b64 = base64.b64encode(image_bytes).decode() + + raw = router.generate_vision( + prompt=prompt, + image_b64=b64, + ) + + # Try to parse structured output; fall back to single paragraph + elements: list[Element] = [] + try: + parsed = json.loads(raw) + if isinstance(parsed, list): + for item in parsed: + elements.append(Element( + type=item.get("type", "paragraph"), + text=item.get("text", ""), + )) + except (json.JSONDecodeError, TypeError): + elements = [Element(type="paragraph", text=raw.strip())] + + raw_text = "\n".join(e.text for e in elements) + return StructuredDocument( + elements=elements, + raw_text=raw_text, + tables=[], + metadata={"source": "llm_fallback", "hint": hint}, + ) diff --git a/circuitforge_core/documents/models.py b/circuitforge_core/documents/models.py new file mode 100644 index 0000000..112fee9 --- /dev/null +++ b/circuitforge_core/documents/models.py @@ -0,0 +1,53 @@ +""" +circuitforge_core.documents.models — shared document data types. + +These are the canonical output types from the document ingestion pipeline. +All consumers (kiwi, falcon, peregrine, godwit, …) receive a StructuredDocument +regardless of whether Dolphin-v2 or LLM fallback was used. +""" +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import Any + + +@dataclass(frozen=True) +class Element: + """A single logical content unit within a document. + + type: one of heading | paragraph | list | table | figure | formula | code + text: extracted plain text (for tables: may be row summary or empty) + bbox: normalised [x0, y0, x1, y1] in 0-1 space, None when unavailable + """ + type: str + text: str + bbox: tuple[float, float, float, float] | None = None + + +@dataclass(frozen=True) +class ParsedTable: + """An extracted table rendered as HTML.""" + html: str + bbox: tuple[float, float, float, float] | None = None + + +@dataclass +class StructuredDocument: + """ + The canonical result of document ingestion. + + Produced by ingest() for any input image regardless of which backend + (cf-docuvision or LLM fallback) processed it. + """ + elements: list[Element] = field(default_factory=list) + raw_text: str = "" + tables: list[ParsedTable] = field(default_factory=list) + metadata: dict[str, Any] = field(default_factory=dict) + + @property + def headings(self) -> list[Element]: + return [e for e in self.elements if e.type == "heading"] + + @property + def paragraphs(self) -> list[Element]: + return [e for e in self.elements if e.type == "paragraph"] diff --git a/circuitforge_core/hardware/__init__.py b/circuitforge_core/hardware/__init__.py new file mode 100644 index 0000000..1608b4d --- /dev/null +++ b/circuitforge_core/hardware/__init__.py @@ -0,0 +1,30 @@ +# circuitforge_core/hardware/__init__.py +""" +Hardware detection and LLM profile generation. + +Typical usage:: + + from circuitforge_core.hardware import detect_hardware, generate_profile + import yaml + + spec = detect_hardware() + config = generate_profile(spec) + print(yaml.dump(config.to_dict())) + print("Recommended profile:", config.profile_name) +""" +from .detect import detect_hardware, detect_hardware_json +from .generator import generate_profile +from .models import HardwareSpec, LLMBackendConfig, LLMConfig +from .tiers import VRAM_TIERS, VramTier, select_tier + +__all__ = [ + "detect_hardware", + "detect_hardware_json", + "generate_profile", + "HardwareSpec", + "LLMBackendConfig", + "LLMConfig", + "VRAM_TIERS", + "VramTier", + "select_tier", +] diff --git a/circuitforge_core/hardware/detect.py b/circuitforge_core/hardware/detect.py new file mode 100644 index 0000000..49b522f --- /dev/null +++ b/circuitforge_core/hardware/detect.py @@ -0,0 +1,196 @@ +# circuitforge_core/hardware/detect.py +""" +Cross-platform hardware auto-detection. + +Reads GPU info from: + - nvidia-smi (NVIDIA, Linux/Windows) + - rocm-smi (AMD, Linux) + - system_profiler (Apple Silicon, macOS) + - /proc/meminfo (Linux RAM) + - psutil (cross-platform RAM fallback) + +Returns a HardwareSpec. On detection failure, returns a conservative +CPU-only spec so callers always get a usable result. +""" +from __future__ import annotations + +import json +import platform +import re +import subprocess +import sys +from pathlib import Path + +from .models import HardwareSpec + + +def _run(*args: str, timeout: int = 5) -> str: + """Run a subprocess and return stdout, or empty string on any error.""" + try: + result = subprocess.run( + list(args), capture_output=True, text=True, timeout=timeout + ) + return result.stdout.strip() + except Exception: + return "" + + +def _ram_mb() -> int: + """Return total system RAM in MB.""" + # psutil is optional but preferred + try: + import psutil # type: ignore[import-untyped] + return psutil.virtual_memory().total // (1024 * 1024) + except ImportError: + pass + + # Linux /proc/meminfo fallback + mem_info = Path("/proc/meminfo") + if mem_info.exists(): + for line in mem_info.read_text().splitlines(): + if line.startswith("MemTotal:"): + kb = int(line.split()[1]) + return kb // 1024 + + return 0 + + +def _detect_nvidia() -> tuple[int, int, str, str, str] | None: + """ + Returns (vram_mb, gpu_count, gpu_name, cuda_version, vendor) or None. + Uses nvidia-smi --query-gpu for reliable machine-parseable output. + """ + out = _run( + "nvidia-smi", + "--query-gpu=name,memory.total", + "--format=csv,noheader,nounits", + ) + if not out: + return None + + lines = [l.strip() for l in out.splitlines() if l.strip()] + if not lines: + return None + + gpu_count = len(lines) + # Use the first GPU's VRAM as the representative value + parts = lines[0].split(",") + if len(parts) < 2: + return None + + gpu_name = parts[0].strip() + try: + vram_mb = int(parts[1].strip()) + except ValueError: + return None + + # CUDA version from nvidia-smi header + header = _run("nvidia-smi", "--query", "--display=COMPUTE") + cuda_match = re.search(r"CUDA Version\s*:\s*([\d.]+)", header) + cuda_version = cuda_match.group(1) if cuda_match else "" + + return vram_mb, gpu_count, gpu_name, cuda_version, "nvidia" + + +def _detect_amd() -> tuple[int, int, str, str, str] | None: + """Returns (vram_mb, gpu_count, gpu_name, rocm_version, vendor) or None.""" + out = _run("rocm-smi", "--showmeminfo", "vram", "--json") + if not out: + return None + + try: + data = json.loads(out) + except json.JSONDecodeError: + return None + + cards = [k for k in data if k.startswith("card")] + if not cards: + return None + + gpu_count = len(cards) + first = data[cards[0]] + try: + vram_mb = int(first.get("VRAM Total Memory (B)", 0)) // (1024 * 1024) + except (ValueError, TypeError): + return None + + gpu_name = first.get("Card series", "AMD GPU") + rocm_out = _run("rocminfo") + rocm_match = re.search(r"ROCm Runtime Version\s*:\s*([\d.]+)", rocm_out) + rocm_version = rocm_match.group(1) if rocm_match else "" + + return vram_mb, gpu_count, gpu_name, rocm_version, "amd" + + +def _detect_apple() -> tuple[int, int, str, str, str] | None: + """ + Returns (unified_ram_mb, 1, gpu_name, '', 'apple') or None. + Apple Silicon shares RAM between CPU and GPU; we report total RAM as VRAM. + """ + if platform.system() != "Darwin": + return None + + # Check for Apple Silicon + arm_check = _run("sysctl", "-n", "hw.optional.arm64") + if arm_check.strip() != "1": + return None + + out = _run("system_profiler", "SPHardwareDataType", "-json") + try: + data = json.loads(out) + hw = data["SPHardwareDataType"][0] + chip = hw.get("chip_type", "Apple Silicon") + ram_str = hw.get("physical_memory", "0 GB") + ram_gb = int(re.search(r"(\d+)", ram_str).group(1)) # type: ignore[union-attr] + vram_mb = ram_gb * 1024 # unified memory + except Exception: + return None + + return vram_mb, 1, chip, "", "apple" + + +def detect_hardware() -> HardwareSpec: + """ + Auto-detect hardware and return a HardwareSpec. + + Detection order: NVIDIA → AMD → Apple → CPU fallback. + Never raises — returns a CPU-only spec on any detection failure. + """ + ram_mb = _ram_mb() + + for detector in (_detect_nvidia, _detect_amd, _detect_apple): + try: + result = detector() + except Exception: + result = None + if result is not None: + vram_mb, gpu_count, gpu_name, version, vendor = result + return HardwareSpec( + vram_mb=vram_mb, + ram_mb=ram_mb, + gpu_count=gpu_count, + gpu_vendor=vendor, + gpu_name=gpu_name, + cuda_version=version if vendor == "nvidia" else "", + rocm_version=version if vendor == "amd" else "", + ) + + # CPU-only fallback + return HardwareSpec( + vram_mb=0, + ram_mb=ram_mb, + gpu_count=0, + gpu_vendor="cpu", + gpu_name="", + ) + + +def detect_hardware_json() -> str: + """Return detect_hardware() result as a JSON string (for CLI / one-liner use).""" + import dataclasses + spec = detect_hardware() + return json.dumps(dataclasses.asdict(spec), indent=2) + + +if __name__ == "__main__": + print(detect_hardware_json()) diff --git a/circuitforge_core/hardware/generator.py b/circuitforge_core/hardware/generator.py new file mode 100644 index 0000000..ff012b1 --- /dev/null +++ b/circuitforge_core/hardware/generator.py @@ -0,0 +1,91 @@ +# circuitforge_core/hardware/generator.py +""" +Profile generator: HardwareSpec → LLMConfig. + +`generate_profile()` is the main entry point. It selects the appropriate +VRAM tier, builds a complete LLMConfig dict ready to write as llm.yaml, +and returns the matching public GpuProfile name for orch use. +""" +from __future__ import annotations + +from .models import HardwareSpec, LLMBackendConfig, LLMConfig +from .tiers import select_tier + + +# Default backend URLs — overridable for non-standard setups +_OLLAMA_URL = "http://localhost:11434" +_VLLM_URL = "http://localhost:8000" +_VISION_URL = "http://localhost:8002" +_DOCUVISION_URL = "http://localhost:8003" + + +def generate_profile( + spec: HardwareSpec, + *, + ollama_url: str = _OLLAMA_URL, + vllm_url: str = _VLLM_URL, + vision_url: str = _VISION_URL, + docuvision_url: str = _DOCUVISION_URL, +) -> LLMConfig: + """ + Map a HardwareSpec to an LLMConfig. + + Returns an LLMConfig whose `profile_name` matches a public GpuProfile YAML + in `circuitforge_core/resources/profiles/public/` so the orch can load the + correct service allocation profile automatically. + """ + tier = select_tier(spec.vram_mb) + has_vllm = "vllm" in tier.services + has_vision = "cf-vision" in tier.services + has_docuvision = "cf-docuvision" in tier.services + + backends: dict[str, LLMBackendConfig] = {} + + # Ollama is always available (CPU fallback) + backends["ollama"] = LLMBackendConfig( + enabled=True, + url=ollama_url, + model=tier.ollama_model, + ) + + # vllm — only on GPU tiers that can fit a model + if has_vllm and tier.vllm_candidates: + backends["vllm"] = LLMBackendConfig( + enabled=True, + url=vllm_url, + model_candidates=list(tier.vllm_candidates), + ) + + # Vision service + if has_vision: + backends["vision_service"] = LLMBackendConfig( + enabled=True, + url=vision_url, + ) + + # Docuvision service + if has_docuvision: + backends["docuvision_service"] = LLMBackendConfig( + enabled=True, + url=docuvision_url, + ) + + # Fallback order: prefer vllm over ollama when available (faster for batch) + if has_vllm: + fallback = ["vllm", "ollama"] + research_fallback = ["vllm", "ollama"] + else: + fallback = ["ollama"] + research_fallback = ["ollama"] + + vision_fallback = ( + ["vision_service"] if has_vision else [] + ) + ["ollama"] + + return LLMConfig( + profile_name=tier.profile_name, + backends=backends, + fallback_order=fallback, + research_fallback_order=research_fallback, + vision_fallback_order=vision_fallback, + ) diff --git a/circuitforge_core/hardware/models.py b/circuitforge_core/hardware/models.py new file mode 100644 index 0000000..e640832 --- /dev/null +++ b/circuitforge_core/hardware/models.py @@ -0,0 +1,60 @@ +# circuitforge_core/hardware/models.py +"""Data models for hardware detection and LLM configuration output.""" +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import Any + + +@dataclass(frozen=True) +class HardwareSpec: + """Describes a user's hardware as detected or manually entered.""" + + vram_mb: int # total VRAM per primary GPU (0 = CPU-only) + ram_mb: int # total system RAM + gpu_count: int # number of GPUs + gpu_vendor: str # "nvidia" | "amd" | "apple" | "cpu" + gpu_name: str = "" # human-readable card name, e.g. "RTX 4080" + cuda_version: str = "" # e.g. "12.4" (empty if not CUDA) + rocm_version: str = "" # e.g. "5.7" (empty if not ROCm) + + +@dataclass +class LLMBackendConfig: + """Configuration for a single LLM backend.""" + + enabled: bool + url: str + model: str = "" + model_candidates: list[str] = field(default_factory=list) + + def to_dict(self) -> dict[str, Any]: + d: dict[str, Any] = {"enabled": self.enabled, "url": self.url} + if self.model: + d["model"] = self.model + if self.model_candidates: + d["model_candidates"] = self.model_candidates + return d + + +@dataclass +class LLMConfig: + """ + Ready-to-serialize llm.yaml configuration. + + Matches the schema consumed by LLMRouter in circuitforge products. + """ + + profile_name: str # e.g. "single-gpu-8gb" — matches a public GpuProfile + backends: dict[str, LLMBackendConfig] = field(default_factory=dict) + fallback_order: list[str] = field(default_factory=list) + research_fallback_order: list[str] = field(default_factory=list) + vision_fallback_order: list[str] = field(default_factory=list) + + def to_dict(self) -> dict[str, Any]: + return { + "backends": {k: v.to_dict() for k, v in self.backends.items()}, + "fallback_order": self.fallback_order, + "research_fallback_order": self.research_fallback_order, + "vision_fallback_order": self.vision_fallback_order, + } diff --git a/circuitforge_core/hardware/tiers.py b/circuitforge_core/hardware/tiers.py new file mode 100644 index 0000000..a4b7229 --- /dev/null +++ b/circuitforge_core/hardware/tiers.py @@ -0,0 +1,104 @@ +# circuitforge_core/hardware/tiers.py +""" +VRAM tier ladder and model catalog. + +Tiers map hardware VRAM (per-GPU) to: + - profile_name: matching public GpuProfile YAML in profiles/public/ + - ollama_model: best default Ollama model for this tier + - vllm_candidates: ordered list of HF model dirs to try via cf-orch/vllm + - services: which cf-* managed services are available at this tier + - llm_max_params: rough upper bound, for human-readable display +""" +from __future__ import annotations + +from dataclasses import dataclass, field + + +@dataclass(frozen=True) +class VramTier: + vram_min_mb: int # inclusive lower bound (0 = CPU-only) + vram_max_mb: int # exclusive upper bound (use sys.maxsize for the top tier) + profile_name: str # public GpuProfile YAML stem + ollama_model: str # e.g. "qwen2.5:7b-instruct-q4_k_m" + vllm_candidates: list[str] = field(default_factory=list) + services: list[str] = field(default_factory=list) + llm_max_params: str = "" # human-readable, e.g. "7b-q4" + + +# Ordered from smallest to largest — first match wins in select_tier(). +VRAM_TIERS: list[VramTier] = [ + VramTier( + vram_min_mb=0, + vram_max_mb=1, + profile_name="cpu-16gb", + ollama_model="qwen2.5:1.5b-instruct-q4_k_m", + vllm_candidates=[], + services=["ollama", "cf-stt", "cf-tts"], + llm_max_params="3b-q4", + ), + VramTier( + vram_min_mb=1, + vram_max_mb=3_000, + profile_name="single-gpu-2gb", + ollama_model="qwen2.5:1.5b-instruct-q4_k_m", + vllm_candidates=[], + services=["ollama"], + llm_max_params="1.5b", + ), + VramTier( + vram_min_mb=3_000, + vram_max_mb=5_000, + profile_name="single-gpu-4gb", + ollama_model="qwen2.5:3b-instruct-q4_k_m", + vllm_candidates=[], + services=["ollama", "cf-vision", "cf-stt", "cf-tts"], + llm_max_params="3b", + ), + VramTier( + vram_min_mb=5_000, + vram_max_mb=7_000, + profile_name="single-gpu-6gb", + ollama_model="qwen2.5:7b-instruct-q4_k_m", + vllm_candidates=["Qwen2.5-3B-Instruct", "Phi-4-mini-instruct"], + services=["ollama", "vllm", "cf-vision", "cf-docuvision", "cf-stt", "cf-tts"], + llm_max_params="7b-q4", + ), + VramTier( + vram_min_mb=7_000, + vram_max_mb=12_000, + profile_name="single-gpu-8gb", + ollama_model="qwen2.5:7b-instruct", + vllm_candidates=["Qwen2.5-3B-Instruct", "Phi-4-mini-instruct"], + services=["ollama", "vllm", "cf-vision", "cf-docuvision", "cf-stt", "cf-tts"], + llm_max_params="8b", + ), + VramTier( + vram_min_mb=12_000, + vram_max_mb=20_000, + profile_name="single-gpu-16gb", + ollama_model="qwen2.5:14b-instruct-q4_k_m", + vllm_candidates=["Qwen2.5-14B-Instruct", "Qwen2.5-3B-Instruct", "Phi-4-mini-instruct"], + services=["ollama", "vllm", "cf-vision", "cf-docuvision", "cf-stt", "cf-tts", + "cf-embed", "cf-classify"], + llm_max_params="14b", + ), + VramTier( + vram_min_mb=20_000, + vram_max_mb=10 ** 9, + profile_name="single-gpu-24gb", + ollama_model="qwen2.5:32b-instruct-q4_k_m", + vllm_candidates=["Qwen2.5-14B-Instruct", "Qwen2.5-3B-Instruct", "Phi-4-mini-instruct"], + services=["ollama", "vllm", "cf-vision", "cf-docuvision", "cf-stt", "cf-tts", + "cf-embed", "cf-classify", "comfyui"], + llm_max_params="32b-q4", + ), +] + + +def select_tier(vram_mb: int) -> VramTier: + """Return the best matching tier for the given per-GPU VRAM in MB.""" + for tier in VRAM_TIERS: + if tier.vram_min_mb <= vram_mb < tier.vram_max_mb: + return tier + # Fallback: return the top tier for unusually large VRAM + return VRAM_TIERS[-1] diff --git a/circuitforge_core/resources/docuvision/__init__.py b/circuitforge_core/resources/docuvision/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/circuitforge_core/resources/docuvision/app.py b/circuitforge_core/resources/docuvision/app.py new file mode 100644 index 0000000..3501b45 --- /dev/null +++ b/circuitforge_core/resources/docuvision/app.py @@ -0,0 +1,250 @@ +""" +cf-docuvision — managed document understanding service. + +Wraps ByteDance/Dolphin-v2 (Qwen2.5-VL backbone) behind a simple HTTP API. +Managed by cf-orch; started/stopped as a ProcessSpec service. + +API +--- +GET /health → {"status": "ok", "model": ""} +POST /extract → ExtractResponse + +Usage (standalone):: + + python -m circuitforge_core.resources.docuvision.app \\ + --model /Library/Assets/LLM/docuvision/models/dolphin-v2 \\ + --port 8003 --gpu-id 0 +""" +from __future__ import annotations + +import argparse +import base64 +import io +import json +import logging +from contextlib import asynccontextmanager +from typing import Any + +import uvicorn +from fastapi import FastAPI, HTTPException +from pydantic import BaseModel + +logger = logging.getLogger(__name__) + +# Module-level state — populated by _load_model() on first /extract call +_model: Any = None +_processor: Any = None +_model_path: str = "" +_device: str = "cpu" + + +# ── lazy loader ─────────────────────────────────────────────────────────────── + +def _load_model() -> None: + """Lazy-load Dolphin-v2. Called once on first /extract request.""" + global _model, _processor, _device + + if _model is not None: + return + + import torch + from transformers import AutoProcessor, AutoModelForCausalLM + + logger.info("Loading Dolphin-v2 from %s ...", _model_path) + _device = "cuda" if torch.cuda.is_available() else "cpu" + + _processor = AutoProcessor.from_pretrained( + _model_path, + trust_remote_code=True, + ) + _model = AutoModelForCausalLM.from_pretrained( + _model_path, + trust_remote_code=True, + torch_dtype=torch.float16 if _device == "cuda" else torch.float32, + device_map=_device, + ) + _model.eval() + logger.info("Dolphin-v2 loaded on %s", _device) + + +# ── FastAPI app ─────────────────────────────────────────────────────────────── + +@asynccontextmanager +async def _lifespan(app: FastAPI): + yield + + +app = FastAPI(title="cf-docuvision", lifespan=_lifespan) + + +# ── request / response models ───────────────────────────────────────────────── + +class ExtractRequest(BaseModel): + """ + Either image_b64 (base64-encoded bytes) or image_path (absolute path) must + be provided. hint guides the extraction mode: + - "auto" - Dolphin-v2 detects layout and element types automatically + - "table" - optimise for tabular data (receipts, invoices, forms) + - "text" - optimise for dense prose (contracts, letters) + - "form" - optimise for form field extraction + """ + image_b64: str | None = None + image_path: str | None = None + hint: str = "auto" + + +class ElementOut(BaseModel): + type: str # heading | paragraph | list | table | figure | formula | code + text: str + bbox: list[float] | None = None # [x0, y0, x1, y1] normalised 0-1 if available + + +class TableOut(BaseModel): + html: str + bbox: list[float] | None = None + + +class ExtractResponse(BaseModel): + elements: list[ElementOut] + raw_text: str + tables: list[TableOut] + metadata: dict[str, Any] + + +# ── helpers ─────────────────────────────────────────────────────────────────── + +_HINT_PROMPTS: dict[str, str] = { + "auto": "Parse this document. Extract all elements with their types and text content.", + "table": "Extract all tables from this document as structured HTML. Also extract any line-item text.", + "text": "Extract all text from this document preserving paragraph and heading structure.", + "form": "Extract all form fields from this document. Return field labels and their values.", +} + + +def _image_from_request(req: ExtractRequest): + """Return a PIL Image from either image_b64 or image_path.""" + from PIL import Image + + if req.image_b64: + img_bytes = base64.b64decode(req.image_b64) + return Image.open(io.BytesIO(img_bytes)).convert("RGB") + + if req.image_path: + from pathlib import Path + p = Path(req.image_path) + if not p.exists(): + raise HTTPException(status_code=404, detail=f"image_path not found: {req.image_path}") + return Image.open(p).convert("RGB") + + raise HTTPException(status_code=422, detail="Either image_b64 or image_path must be provided") + + +def _parse_dolphin_output(raw: str) -> tuple[list[ElementOut], list[TableOut], str]: + """ + Parse Dolphin-v2's structured output into elements and tables. + + Dolphin-v2 returns a JSON array of element dicts with keys: + type, text, [html], [bbox] + + Falls back gracefully if the model returns plain text instead. + """ + elements: list[ElementOut] = [] + tables: list[TableOut] = [] + + # Try JSON parse first + try: + parsed = json.loads(raw) + if isinstance(parsed, list): + for item in parsed: + etype = item.get("type", "paragraph") + text = item.get("text", "") + bbox = item.get("bbox") + if etype == "table": + tables.append(TableOut(html=item.get("html", text), bbox=bbox)) + elements.append(ElementOut(type=etype, text=text, bbox=bbox)) + raw_text = "\n".join(e.text for e in elements) + return elements, tables, raw_text + except (json.JSONDecodeError, TypeError): + pass + + # Plain-text fallback: treat entire output as a single paragraph + elements = [ElementOut(type="paragraph", text=raw.strip())] + return elements, tables, raw.strip() + + +# ── routes ──────────────────────────────────────────────────────────────────── + +@app.get("/health") +async def health() -> dict[str, str]: + return {"status": "ok", "model": _model_path} + + +@app.post("/extract", response_model=ExtractResponse) +async def extract(req: ExtractRequest) -> ExtractResponse: + _load_model() + + image = _image_from_request(req) + prompt = _HINT_PROMPTS.get(req.hint, _HINT_PROMPTS["auto"]) + + import torch + + inputs = _processor( + text=prompt, + images=image, + return_tensors="pt", + ).to(_device) + + with torch.no_grad(): + output_ids = _model.generate( + **inputs, + max_new_tokens=2048, + do_sample=False, + ) + + # Decode only the newly generated tokens + input_len = inputs["input_ids"].shape[1] + raw_output = _processor.decode( + output_ids[0][input_len:], + skip_special_tokens=True, + ) + + elements, tables, raw_text = _parse_dolphin_output(raw_output) + + w, h = image.size + + return ExtractResponse( + elements=elements, + raw_text=raw_text, + tables=tables, + metadata={ + "hint": req.hint, + "width": w, + "height": h, + "model": _model_path, + "device": _device, + }, + ) + + +# ── CLI entry point ─────────────────────────────────────────────────────────── + +def main() -> None: + parser = argparse.ArgumentParser(description="cf-docuvision service") + parser.add_argument("--model", required=True, help="Path to Dolphin-v2 model directory") + parser.add_argument("--port", type=int, default=8003) + parser.add_argument("--host", default="0.0.0.0") + parser.add_argument("--gpu-id", type=int, default=0) + args = parser.parse_args() + + global _model_path + _model_path = args.model + + import os + os.environ.setdefault("CUDA_VISIBLE_DEVICES", str(args.gpu_id)) + + logging.basicConfig(level=logging.INFO, format="%(levelname)s %(name)s: %(message)s") + uvicorn.run(app, host=args.host, port=args.port) + + +if __name__ == "__main__": + main() diff --git a/circuitforge_core/resources/profiles/public/single-gpu-16gb.yaml b/circuitforge_core/resources/profiles/public/single-gpu-16gb.yaml index 23b6e54..457c063 100644 --- a/circuitforge_core/resources/profiles/public/single-gpu-16gb.yaml +++ b/circuitforge_core/resources/profiles/public/single-gpu-16gb.yaml @@ -27,6 +27,13 @@ services: priority: 2 shared: true max_concurrent: 3 + managed: + type: process + exec_path: "/devl/miniconda3/envs/cf/bin/python" + args_template: "-m circuitforge_core.resources.docuvision.app --model /Library/Assets/LLM/docuvision/models/dolphin-v2 --port {port} --gpu-id {gpu_id}" + port: 8003 + host_port: 8003 + cwd: "/Library/Development/CircuitForge/circuitforge-core" cf-stt: max_mb: 1200 priority: 2 diff --git a/circuitforge_core/resources/profiles/public/single-gpu-24gb.yaml b/circuitforge_core/resources/profiles/public/single-gpu-24gb.yaml index 243adf9..5e933bc 100644 --- a/circuitforge_core/resources/profiles/public/single-gpu-24gb.yaml +++ b/circuitforge_core/resources/profiles/public/single-gpu-24gb.yaml @@ -27,6 +27,13 @@ services: priority: 2 shared: true max_concurrent: 4 + managed: + type: process + exec_path: "/devl/miniconda3/envs/cf/bin/python" + args_template: "-m circuitforge_core.resources.docuvision.app --model /Library/Assets/LLM/docuvision/models/dolphin-v2 --port {port} --gpu-id {gpu_id}" + port: 8003 + host_port: 8003 + cwd: "/Library/Development/CircuitForge/circuitforge-core" cf-stt: max_mb: 1200 priority: 2 diff --git a/circuitforge_core/resources/profiles/public/single-gpu-6gb.yaml b/circuitforge_core/resources/profiles/public/single-gpu-6gb.yaml index 935a536..1e8d6b3 100644 --- a/circuitforge_core/resources/profiles/public/single-gpu-6gb.yaml +++ b/circuitforge_core/resources/profiles/public/single-gpu-6gb.yaml @@ -27,6 +27,13 @@ services: priority: 2 shared: true max_concurrent: 1 + managed: + type: process + exec_path: "/devl/miniconda3/envs/cf/bin/python" + args_template: "-m circuitforge_core.resources.docuvision.app --model /Library/Assets/LLM/docuvision/models/dolphin-v2 --port {port} --gpu-id {gpu_id}" + port: 8003 + host_port: 8003 + cwd: "/Library/Development/CircuitForge/circuitforge-core" cf-stt: max_mb: 600 priority: 2 diff --git a/circuitforge_core/resources/profiles/public/single-gpu-8gb.yaml b/circuitforge_core/resources/profiles/public/single-gpu-8gb.yaml index 7d62939..ae3b170 100644 --- a/circuitforge_core/resources/profiles/public/single-gpu-8gb.yaml +++ b/circuitforge_core/resources/profiles/public/single-gpu-8gb.yaml @@ -27,6 +27,13 @@ services: priority: 2 shared: true max_concurrent: 2 + managed: + type: process + exec_path: "/devl/miniconda3/envs/cf/bin/python" + args_template: "-m circuitforge_core.resources.docuvision.app --model /Library/Assets/LLM/docuvision/models/dolphin-v2 --port {port} --gpu-id {gpu_id}" + port: 8003 + host_port: 8003 + cwd: "/Library/Development/CircuitForge/circuitforge-core" cf-stt: max_mb: 1200 priority: 2 diff --git a/tests/test_documents/__init__.py b/tests/test_documents/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_documents/test_client.py b/tests/test_documents/test_client.py new file mode 100644 index 0000000..fae9557 --- /dev/null +++ b/tests/test_documents/test_client.py @@ -0,0 +1,93 @@ +# tests/test_documents/test_client.py +"""Unit tests for DocuvisionClient.""" +from __future__ import annotations + +import base64 +import json +from unittest.mock import MagicMock, patch + +import pytest + +from circuitforge_core.documents.client import DocuvisionClient + + +def _mock_response(data: dict, status: int = 200) -> MagicMock: + resp = MagicMock() + resp.status_code = status + resp.json.return_value = data + resp.raise_for_status = MagicMock() + if status >= 400: + import requests + resp.raise_for_status.side_effect = requests.HTTPError(response=resp) + return resp + + +_EXTRACT_RESPONSE = { + "elements": [ + {"type": "heading", "text": "Invoice", "bbox": [0.0, 0.0, 1.0, 0.1]}, + {"type": "paragraph", "text": "Due: $100"}, + ], + "raw_text": "Invoice\nDue: $100", + "tables": [ + {"html": "
$100
", "bbox": None}, + ], + "metadata": {"hint": "auto", "width": 800, "height": 1200, "model": "/fake/model", "device": "cpu"}, +} + + +def test_is_healthy_true(): + with patch("requests.get", return_value=_mock_response({}, 200)): + client = DocuvisionClient("http://localhost:8003") + assert client.is_healthy() is True + + +def test_is_healthy_false_on_error(): + with patch("requests.get", side_effect=ConnectionError("refused")): + client = DocuvisionClient("http://localhost:8003") + assert client.is_healthy() is False + + +def test_is_healthy_false_on_500(): + with patch("requests.get", return_value=_mock_response({}, 500)): + client = DocuvisionClient("http://localhost:8003") + assert client.is_healthy() is False + + +def test_extract_returns_structured_document(): + with patch("requests.post", return_value=_mock_response(_EXTRACT_RESPONSE)): + client = DocuvisionClient() + doc = client.extract(b"fake-image-bytes", hint="auto") + + assert doc.raw_text == "Invoice\nDue: $100" + assert len(doc.elements) == 2 + assert doc.elements[0].type == "heading" + assert doc.elements[0].bbox == (0.0, 0.0, 1.0, 0.1) + assert len(doc.tables) == 1 + assert "$100" in doc.tables[0].html + + +def test_extract_sends_base64_image(): + with patch("requests.post", return_value=_mock_response(_EXTRACT_RESPONSE)) as mock_post: + client = DocuvisionClient() + client.extract(b"pixels", hint="table") + + call_json = mock_post.call_args.kwargs["json"] + assert call_json["hint"] == "table" + assert base64.b64decode(call_json["image_b64"]) == b"pixels" + + +def test_extract_raises_on_http_error(): + import requests as req_lib + with patch("requests.post", return_value=_mock_response({}, 422)): + client = DocuvisionClient() + with pytest.raises(req_lib.HTTPError): + client.extract(b"bad") + + +def test_extract_table_bbox_none_when_missing(): + response = dict(_EXTRACT_RESPONSE) + response["tables"] = [{"html": ""}] + with patch("requests.post", return_value=_mock_response(response)): + client = DocuvisionClient() + doc = client.extract(b"img") + assert doc.tables[0].bbox is None diff --git a/tests/test_documents/test_ingest.py b/tests/test_documents/test_ingest.py new file mode 100644 index 0000000..fe8b354 --- /dev/null +++ b/tests/test_documents/test_ingest.py @@ -0,0 +1,140 @@ +# tests/test_documents/test_ingest.py +"""Unit tests for circuitforge_core.documents.ingest.""" +from __future__ import annotations + +import json +from unittest.mock import MagicMock, patch + +import pytest + +from circuitforge_core.documents import ingest +from circuitforge_core.documents.models import Element, StructuredDocument + + +def _healthy_client(doc: StructuredDocument) -> MagicMock: + c = MagicMock() + c.is_healthy.return_value = True + c.extract.return_value = doc + return c + + +def _unhealthy_client() -> MagicMock: + c = MagicMock() + c.is_healthy.return_value = False + return c + + +def _doc_with_text(text: str) -> StructuredDocument: + return StructuredDocument( + elements=[Element(type="paragraph", text=text)], + raw_text=text, + metadata={"source": "docuvision"}, + ) + + +# ── primary path ────────────────────────────────────────────────────────────── + +def test_ingest_uses_docuvision_when_healthy(): + expected = _doc_with_text("hello world") + with patch("circuitforge_core.documents.ingest.DocuvisionClient", return_value=_healthy_client(expected)): + result = ingest(b"imgbytes", hint="auto") + + assert result.raw_text == "hello world" + assert result.elements[0].type == "paragraph" + + +def test_ingest_passes_hint_to_client(): + expected = _doc_with_text("table data") + with patch("circuitforge_core.documents.ingest.DocuvisionClient") as MockClient: + mock_instance = _healthy_client(expected) + MockClient.return_value = mock_instance + ingest(b"imgbytes", hint="table") + + mock_instance.extract.assert_called_once_with(b"imgbytes", hint="table") + + +def test_ingest_uses_custom_url(): + expected = _doc_with_text("text") + with patch("circuitforge_core.documents.ingest.DocuvisionClient") as MockClient: + mock_instance = _healthy_client(expected) + MockClient.return_value = mock_instance + ingest(b"imgbytes", docuvision_url="http://myhost:9000") + + MockClient.assert_called_once_with(base_url="http://myhost:9000") + + +# ── fallback path ───────────────────────────────────────────────────────────── + +def test_ingest_falls_back_to_llm_when_docuvision_unhealthy(): + mock_router = MagicMock() + mock_router.generate_vision.return_value = json.dumps([ + {"type": "paragraph", "text": "fallback text"}, + ]) + + with patch("circuitforge_core.documents.ingest.DocuvisionClient", return_value=_unhealthy_client()): + result = ingest(b"imgbytes", llm_router=mock_router) + + assert result.raw_text == "fallback text" + assert result.metadata["source"] == "llm_fallback" + + +def test_ingest_llm_fallback_plain_text(): + """LLM returns plain text (not JSON) → single paragraph element.""" + mock_router = MagicMock() + mock_router.generate_vision.return_value = "This is plain text output." + + with patch("circuitforge_core.documents.ingest.DocuvisionClient", return_value=_unhealthy_client()): + result = ingest(b"imgbytes", llm_router=mock_router) + + assert len(result.elements) == 1 + assert result.elements[0].type == "paragraph" + assert "plain text" in result.elements[0].text + + +def test_ingest_returns_empty_doc_when_no_llm(): + with patch("circuitforge_core.documents.ingest.DocuvisionClient", return_value=_unhealthy_client()), \ + patch("circuitforge_core.documents.ingest._build_llm_router", return_value=None): + result = ingest(b"imgbytes") + + assert isinstance(result, StructuredDocument) + assert result.elements == [] + assert result.metadata["source"] == "none" + + +def test_ingest_returns_empty_doc_on_docuvision_exception(): + failing_client = MagicMock() + failing_client.is_healthy.return_value = True + failing_client.extract.side_effect = ConnectionError("refused") + + with patch("circuitforge_core.documents.ingest.DocuvisionClient", return_value=failing_client), \ + patch("circuitforge_core.documents.ingest._build_llm_router", return_value=None): + result = ingest(b"imgbytes") + + assert isinstance(result, StructuredDocument) + assert result.metadata["source"] == "none" + + +def test_ingest_returns_empty_doc_on_llm_exception(): + mock_router = MagicMock() + mock_router.generate_vision.side_effect = RuntimeError("GPU OOM") + + with patch("circuitforge_core.documents.ingest.DocuvisionClient", return_value=_unhealthy_client()): + result = ingest(b"imgbytes", llm_router=mock_router) + + assert isinstance(result, StructuredDocument) + assert result.metadata["source"] == "llm_error" + assert "GPU OOM" in result.metadata["error"] + + +# ── CF_DOCUVISION_URL env var ───────────────────────────────────────────────── + +def test_ingest_reads_url_from_env(monkeypatch): + monkeypatch.setenv("CF_DOCUVISION_URL", "http://envhost:7777") + expected = _doc_with_text("env-routed") + + with patch("circuitforge_core.documents.ingest.DocuvisionClient") as MockClient: + mock_instance = _healthy_client(expected) + MockClient.return_value = mock_instance + ingest(b"imgbytes") + + MockClient.assert_called_once_with(base_url="http://envhost:7777") diff --git a/tests/test_documents/test_models.py b/tests/test_documents/test_models.py new file mode 100644 index 0000000..6a46a40 --- /dev/null +++ b/tests/test_documents/test_models.py @@ -0,0 +1,49 @@ +# tests/test_documents/test_models.py +"""Unit tests for circuitforge_core.documents.models.""" +from __future__ import annotations + +import pytest +from circuitforge_core.documents.models import Element, ParsedTable, StructuredDocument + + +def test_element_is_frozen(): + e = Element(type="heading", text="Title") + with pytest.raises(Exception): + e.text = "changed" # type: ignore[misc] + + +def test_element_bbox_optional(): + e = Element(type="paragraph", text="hello") + assert e.bbox is None + + +def test_parsed_table_frozen(): + t = ParsedTable(html="
") + with pytest.raises(Exception): + t.html = "changed" # type: ignore[misc] + + +def test_structured_document_defaults(): + doc = StructuredDocument() + assert doc.elements == [] + assert doc.raw_text == "" + assert doc.tables == [] + assert doc.metadata == {} + + +def test_structured_document_headings_filter(): + doc = StructuredDocument(elements=[ + Element(type="heading", text="H1"), + Element(type="paragraph", text="body"), + Element(type="heading", text="H2"), + ]) + assert [e.text for e in doc.headings] == ["H1", "H2"] + + +def test_structured_document_paragraphs_filter(): + doc = StructuredDocument(elements=[ + Element(type="heading", text="H1"), + Element(type="paragraph", text="body"), + ]) + assert len(doc.paragraphs) == 1 + assert doc.paragraphs[0].text == "body" diff --git a/tests/test_hardware/__init__.py b/tests/test_hardware/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_hardware/test_detect.py b/tests/test_hardware/test_detect.py new file mode 100644 index 0000000..959f144 --- /dev/null +++ b/tests/test_hardware/test_detect.py @@ -0,0 +1,107 @@ +# tests/test_hardware/test_detect.py +"""Tests for hardware auto-detection (subprocess mocked throughout).""" +import json +from unittest.mock import MagicMock, patch + +import pytest + +from circuitforge_core.hardware.detect import detect_hardware, detect_hardware_json +from circuitforge_core.hardware.models import HardwareSpec + + +_NVIDIA_SMI_OUTPUT = "NVIDIA GeForce RTX 4080, 16376\n" +_ROCM_SMI_JSON = json.dumps({ + "card0": { + "Card series": "Radeon RX 7900 XTX", + "VRAM Total Memory (B)": str(24 * 1024 * 1024 * 1024), + } +}) +_SYSTEM_PROFILER_JSON = json.dumps({ + "SPHardwareDataType": [{ + "chip_type": "Apple M2 Pro", + "physical_memory": "32 GB", + }] +}) + + +def _mock_run(outputs: dict[str, str]): + """Return a _run replacement that maps first arg → output.""" + def fake_run(*args, **kwargs): + cmd = args[0] if args else "" + return outputs.get(cmd, "") + return fake_run + + +class TestDetectNvidia: + def test_returns_nvidia_spec(self): + with patch("circuitforge_core.hardware.detect._run", + side_effect=lambda *a, **kw: _NVIDIA_SMI_OUTPUT if "query-gpu" in " ".join(a) else ""), \ + patch("circuitforge_core.hardware.detect._ram_mb", return_value=32768): + spec = detect_hardware() + assert spec.gpu_vendor == "nvidia" + assert spec.vram_mb == 16376 + assert spec.gpu_name == "NVIDIA GeForce RTX 4080" + assert spec.ram_mb == 32768 + + def test_gpu_count_from_line_count(self): + dual_gpu = "RTX 4080, 16376\nRTX 3090, 24576\n" + with patch("circuitforge_core.hardware.detect._run", + side_effect=lambda *a, **kw: dual_gpu if "query-gpu" in " ".join(a) else ""), \ + patch("circuitforge_core.hardware.detect._ram_mb", return_value=65536): + spec = detect_hardware() + assert spec.gpu_count == 2 + assert spec.vram_mb == 16376 # first GPU + + +class TestDetectAmd: + def test_returns_amd_spec_when_nvidia_absent(self): + with patch("circuitforge_core.hardware.detect._run", + side_effect=lambda *a, **kw: + "" if "nvidia" in a[0] else + _ROCM_SMI_JSON if "rocm-smi" in a[0] else ""), \ + patch("circuitforge_core.hardware.detect._ram_mb", return_value=65536): + spec = detect_hardware() + assert spec.gpu_vendor == "amd" + assert spec.vram_mb == 24576 + assert "7900" in spec.gpu_name + + +class TestDetectApple: + def test_returns_apple_spec_on_macos(self): + with patch("platform.system", return_value="Darwin"), \ + patch("circuitforge_core.hardware.detect._run", + side_effect=lambda *a, **kw: + "1" if "arm64" in " ".join(a) else + _SYSTEM_PROFILER_JSON if "SPHardware" in " ".join(a) else ""), \ + patch("circuitforge_core.hardware.detect._ram_mb", return_value=32768): + spec = detect_hardware() + assert spec.gpu_vendor == "apple" + assert spec.vram_mb == 32768 # 32 GB unified + assert "M2" in spec.gpu_name + + +class TestCpuFallback: + def test_cpu_fallback_when_no_gpu_detected(self): + with patch("circuitforge_core.hardware.detect._run", return_value=""), \ + patch("platform.system", return_value="Linux"), \ + patch("circuitforge_core.hardware.detect._ram_mb", return_value=16384): + spec = detect_hardware() + assert spec.gpu_vendor == "cpu" + assert spec.vram_mb == 0 + assert spec.gpu_count == 0 + + def test_never_raises(self): + with patch("circuitforge_core.hardware.detect._run", side_effect=RuntimeError("boom")), \ + patch("circuitforge_core.hardware.detect._ram_mb", return_value=0): + spec = detect_hardware() + assert isinstance(spec, HardwareSpec) + + +class TestDetectHardwareJson: + def test_returns_valid_json(self): + with patch("circuitforge_core.hardware.detect._run", return_value=""), \ + patch("circuitforge_core.hardware.detect._ram_mb", return_value=8192): + out = detect_hardware_json() + data = json.loads(out) + assert "vram_mb" in data + assert "gpu_vendor" in data diff --git a/tests/test_hardware/test_generator.py b/tests/test_hardware/test_generator.py new file mode 100644 index 0000000..a0c959a --- /dev/null +++ b/tests/test_hardware/test_generator.py @@ -0,0 +1,71 @@ +# tests/test_hardware/test_generator.py +"""Tests for the LLMConfig profile generator.""" +import pytest + +from circuitforge_core.hardware.generator import generate_profile +from circuitforge_core.hardware.models import HardwareSpec + + +def _spec(vram_mb: int, gpu_vendor: str = "nvidia") -> HardwareSpec: + return HardwareSpec( + vram_mb=vram_mb, ram_mb=32768, gpu_count=1 if vram_mb > 0 else 0, + gpu_vendor=gpu_vendor, + ) + + +class TestGenerateProfile: + def test_cpu_only_no_vllm_backend(self): + config = generate_profile(_spec(0, "cpu")) + assert config.profile_name == "cpu-16gb" + assert "vllm" not in config.backends + assert config.backends["ollama"].enabled + + def test_cpu_only_fallback_order_ollama_only(self): + config = generate_profile(_spec(0, "cpu")) + assert config.fallback_order == ["ollama"] + + def test_6gb_has_vllm_backend(self): + config = generate_profile(_spec(6144)) + assert "vllm" in config.backends + assert config.backends["vllm"].enabled + assert config.backends["vllm"].model_candidates + + def test_6gb_has_vision_service(self): + config = generate_profile(_spec(6144)) + assert "vision_service" in config.backends + + def test_6gb_has_docuvision_service(self): + config = generate_profile(_spec(6144)) + assert "docuvision_service" in config.backends + + def test_4gb_no_docuvision(self): + config = generate_profile(_spec(4096)) + assert "docuvision_service" not in config.backends + + def test_vllm_tier_fallback_order_prefers_vllm(self): + config = generate_profile(_spec(8192)) + assert config.fallback_order[0] == "vllm" + + def test_16gb_profile_name(self): + config = generate_profile(_spec(16384)) + assert config.profile_name == "single-gpu-16gb" + + def test_to_dict_roundtrip(self): + config = generate_profile(_spec(8192)) + d = config.to_dict() + assert "backends" in d + assert "fallback_order" in d + assert "ollama" in d["backends"] + assert d["backends"]["ollama"]["enabled"] is True + + def test_custom_ollama_url(self): + config = generate_profile(_spec(8192), ollama_url="http://10.0.0.1:11434") + assert config.backends["ollama"].url == "http://10.0.0.1:11434" + + def test_vllm_candidates_populated(self): + config = generate_profile(_spec(8192)) + assert len(config.backends["vllm"].model_candidates) >= 1 + + def test_vision_fallback_includes_vision_service(self): + config = generate_profile(_spec(8192)) + assert "vision_service" in config.vision_fallback_order diff --git a/tests/test_hardware/test_tiers.py b/tests/test_hardware/test_tiers.py new file mode 100644 index 0000000..b0d533b --- /dev/null +++ b/tests/test_hardware/test_tiers.py @@ -0,0 +1,67 @@ +# tests/test_hardware/test_tiers.py +"""Tests for VRAM tier ladder selection.""" +import pytest + +from circuitforge_core.hardware.tiers import VRAM_TIERS, select_tier + + +class TestSelectTier: + def test_zero_vram_returns_cpu_tier(self): + tier = select_tier(0) + assert tier.profile_name == "cpu-16gb" + assert "vllm" not in tier.services + + def test_2gb_gpu(self): + tier = select_tier(2048) + assert tier.profile_name == "single-gpu-2gb" + assert "ollama" in tier.services + + def test_4gb_gpu(self): + tier = select_tier(4096) + assert tier.profile_name == "single-gpu-4gb" + assert "cf-vision" in tier.services + assert "vllm" not in tier.services + + def test_6gb_gpu(self): + tier = select_tier(6144) + assert tier.profile_name == "single-gpu-6gb" + assert "vllm" in tier.services + assert "cf-docuvision" in tier.services + + def test_8gb_gpu(self): + tier = select_tier(8192) + assert tier.profile_name == "single-gpu-8gb" + assert "vllm" in tier.services + + def test_16gb_gpu(self): + tier = select_tier(16384) + assert tier.profile_name == "single-gpu-16gb" + assert "cf-embed" in tier.services + assert "cf-classify" in tier.services + + def test_24gb_gpu(self): + tier = select_tier(24576) + assert tier.profile_name == "single-gpu-24gb" + assert "comfyui" in tier.services + + def test_boundary_exact_6gb(self): + """Exactly 6GB should land in the 6GB tier, not 4GB.""" + tier = select_tier(6000) + assert tier.profile_name == "single-gpu-6gb" + + def test_boundary_just_below_6gb(self): + tier = select_tier(4999) + assert tier.profile_name == "single-gpu-4gb" + + def test_extremely_large_vram_returns_top_tier(self): + tier = select_tier(80 * 1024) # 80GB (H100) + assert tier.profile_name == "single-gpu-24gb" + + def test_all_tiers_have_ollama(self): + for t in VRAM_TIERS: + assert "ollama" in t.services, f"{t.profile_name} missing ollama" + + def test_vllm_tiers_have_candidates(self): + for t in VRAM_TIERS: + if "vllm" in t.services: + assert t.vllm_candidates, f"{t.profile_name} has vllm but no candidates" diff --git a/tests/test_resources/test_coordinator_probe.py b/tests/test_resources/test_coordinator_probe.py new file mode 100644 index 0000000..52a86f0 --- /dev/null +++ b/tests/test_resources/test_coordinator_probe.py @@ -0,0 +1,215 @@ +# tests/test_resources/test_coordinator_probe.py +""" +Unit tests for _run_instance_probe_loop in coordinator/app.py. + +Covers: + - healthy path: /health → 200 → state transitions starting → running + - timeout path: no healthy response within _PROBE_TIMEOUT_S → starting → stopped + - cleanup path: non-starting instance cleans up its start_times entry +""" +from __future__ import annotations + +import asyncio +from unittest.mock import MagicMock, patch + +import pytest + +from circuitforge_core.resources.coordinator.app import ( + _PROBE_TIMEOUT_S, + _run_instance_probe_loop, +) +from circuitforge_core.resources.coordinator.service_registry import ServiceInstance, ServiceRegistry + + +# ── helpers ────────────────────────────────────────────────────────────────── + +def _inst(**kwargs) -> ServiceInstance: + defaults = dict( + service="vllm", node_id="node1", gpu_id=0, + state="starting", model="qwen", url="http://localhost:8000", + ) + defaults.update(kwargs) + return ServiceInstance(**defaults) + + +def _registry(*instances: ServiceInstance) -> MagicMock: + reg = MagicMock(spec=ServiceRegistry) + reg.all_instances.return_value = list(instances) + return reg + + +def _health_resp(status: int = 200) -> MagicMock: + """Context-manager mock that simulates an HTTP response.""" + resp = MagicMock() + resp.status = status + resp.__enter__ = lambda s: resp + resp.__exit__ = MagicMock(return_value=False) + return resp + + +async def _one_tick(coro_fn, registry, *, time_val: float = 1000.0, **url_patch): + """ + Run the probe loop for exactly one iteration then cancel it. + + asyncio.sleep is patched to return immediately on the first call + and raise CancelledError on the second (ending the loop cleanly). + """ + calls = 0 + + async def _fake_sleep(_delay): + nonlocal calls + calls += 1 + if calls > 1: + raise asyncio.CancelledError() + + patches = [ + patch("asyncio.sleep", new=_fake_sleep), + patch("time.time", return_value=time_val), + ] + if url_patch: + patches.append(patch("urllib.request.urlopen", **url_patch)) + + ctx = [p.__enter__() for p in patches] + try: + await coro_fn(registry) + except asyncio.CancelledError: + pass + finally: + for p in reversed(patches): + p.__exit__(None, None, None) + + +# ── tests ──────────────────────────────────────────────────────────────────── + +@pytest.mark.asyncio +async def test_probe_transitions_starting_to_running(): + """GET /health → 200 while in starting state → upsert_instance(state='running').""" + reg = _registry(_inst(state="starting", url="http://localhost:8000")) + + calls = 0 + + async def fake_sleep(_delay): + nonlocal calls + calls += 1 + if calls > 1: + raise asyncio.CancelledError() + + with patch("asyncio.sleep", new=fake_sleep), \ + patch("time.time", return_value=1000.0), \ + patch("urllib.request.urlopen", return_value=_health_resp(200)): + try: + await _run_instance_probe_loop(reg) + except asyncio.CancelledError: + pass + + reg.upsert_instance.assert_called_once_with( + service="vllm", node_id="node1", gpu_id=0, + state="running", model="qwen", url="http://localhost:8000", + ) + + +@pytest.mark.asyncio +async def test_probe_transitions_starting_to_stopped_on_timeout(): + """No healthy response + time past _PROBE_TIMEOUT_S → upsert_instance(state='stopped'). + + Tick 1: seeds start_times[key] = 1000.0 + Tick 2: time has advanced past _PROBE_TIMEOUT_S → timeout fires → stopped + Tick 3: CancelledError exits the loop + """ + reg = _registry(_inst(state="starting", url="http://localhost:8000")) + + tick = 0 + # Tick 1: t=1000 (seed); Tick 2: t=far_future (timeout fires) + times = [1000.0, 1000.0 + _PROBE_TIMEOUT_S + 1.0] + + async def fake_sleep(_delay): + nonlocal tick + tick += 1 + if tick > 2: + raise asyncio.CancelledError() + + with patch("asyncio.sleep", new=fake_sleep), \ + patch("time.time", side_effect=times * 10), \ + patch("urllib.request.urlopen", side_effect=OSError("connection refused")): + try: + await _run_instance_probe_loop(reg) + except asyncio.CancelledError: + pass + + reg.upsert_instance.assert_called_once_with( + service="vllm", node_id="node1", gpu_id=0, + state="stopped", model="qwen", url="http://localhost:8000", + ) + + +@pytest.mark.asyncio +async def test_probe_cleans_up_start_times_for_non_starting(): + """ + An instance that is no longer in 'starting' state should not cause + upsert_instance to be called, and its key should be removed from start_times. + + We verify this indirectly: run two ticks — first with state='starting' (seeds + the key and transitions to running), second with the updated registry returning + state='running' (should not call upsert again). + """ + starting_inst = _inst(state="starting", url="http://localhost:8000") + running_inst = _inst(state="running", url="http://localhost:8000") + + tick = 0 + + # First tick: instance is starting → transitions to running + # Second tick: registry now returns running → no upsert + # Third tick: cancel + def instances_side_effect(): + if tick <= 1: + return [starting_inst] + return [running_inst] + + reg = MagicMock(spec=ServiceRegistry) + reg.all_instances.side_effect = instances_side_effect + + async def fake_sleep(_delay): + nonlocal tick + tick += 1 + if tick > 2: + raise asyncio.CancelledError() + + with patch("asyncio.sleep", new=fake_sleep), \ + patch("time.time", return_value=1000.0), \ + patch("urllib.request.urlopen", return_value=_health_resp(200)): + try: + await _run_instance_probe_loop(reg) + except asyncio.CancelledError: + pass + + # upsert should have been called exactly once (the starting→running transition) + assert reg.upsert_instance.call_count == 1 + reg.upsert_instance.assert_called_once_with( + service="vllm", node_id="node1", gpu_id=0, + state="running", model="qwen", url="http://localhost:8000", + ) + + +@pytest.mark.asyncio +async def test_probe_no_url_does_not_attempt_health_check(): + """Instance with no URL stays in starting state (no health check, no timeout yet).""" + reg = _registry(_inst(state="starting", url=None)) + + tick = 0 + + async def fake_sleep(_delay): + nonlocal tick + tick += 1 + if tick > 1: + raise asyncio.CancelledError() + + with patch("asyncio.sleep", new=fake_sleep), \ + patch("time.time", return_value=1000.0), \ + patch("urllib.request.urlopen") as mock_urlopen: + try: + await _run_instance_probe_loop(reg) + except asyncio.CancelledError: + pass + + mock_urlopen.assert_not_called() + reg.upsert_instance.assert_not_called() diff --git a/tests/test_resources/test_docuvision.py b/tests/test_resources/test_docuvision.py new file mode 100644 index 0000000..4b4f9f0 --- /dev/null +++ b/tests/test_resources/test_docuvision.py @@ -0,0 +1,215 @@ +# tests/test_resources/test_docuvision.py +""" +Unit tests for cf-docuvision FastAPI service (circuitforge_core/resources/docuvision/app.py). + +Covers: + - GET /health → status + model path + - POST /extract → image_b64, image_path, hint routing, metadata fields + - _parse_dolphin_output → JSON list path, table detection, plain-text fallback + - _image_from_request → missing both fields → 422; bad image_path → 404 +""" +from __future__ import annotations + +import base64 +import io +import json +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest +from fastapi.testclient import TestClient +from PIL import Image + +import circuitforge_core.resources.docuvision.app as docuvision_module +from circuitforge_core.resources.docuvision.app import ( + _parse_dolphin_output, + app, +) + + +# ── fixtures ────────────────────────────────────────────────────────────────── + +def _make_jpeg_b64(width: int = 10, height: int = 10) -> str: + """Return a base64-encoded 10x10 white JPEG.""" + img = Image.new("RGB", (width, height), color=(255, 255, 255)) + buf = io.BytesIO() + img.save(buf, format="JPEG") + return base64.b64encode(buf.getvalue()).decode() + + +@pytest.fixture(autouse=True) +def _reset_module_state(): + """Reset module-level model state between tests.""" + docuvision_module._model = None + docuvision_module._processor = None + docuvision_module._model_path = "/fake/model" + docuvision_module._device = "cpu" + yield + docuvision_module._model = None + docuvision_module._processor = None + + +@pytest.fixture +def mock_model(): + """ + Inject fake model + processor into the module so _load_model() is skipped. + + The processor returns a dict-like with 'input_ids'; the model generate() + returns a tensor-like whose decode produces a JSON string. + """ + fake_ids = MagicMock() + fake_ids.shape = [1, 5] # input_len = 5 + + fake_inputs = {"input_ids": fake_ids} + fake_inputs_obj = MagicMock() + fake_inputs_obj.__getitem__ = lambda self, k: fake_inputs[k] + fake_inputs_obj.to = lambda device: fake_inputs_obj + + fake_output = MagicMock() + fake_output.__getitem__ = lambda self, idx: MagicMock() # output_ids[0] + + fake_model = MagicMock() + fake_model.generate.return_value = fake_output + + fake_processor = MagicMock() + fake_processor.return_value = fake_inputs_obj + fake_processor.decode.return_value = json.dumps([ + {"type": "heading", "text": "Invoice", "bbox": [0.0, 0.0, 1.0, 0.1]}, + {"type": "table", "text": "row1", "html": "
row1
", + "bbox": [0.0, 0.1, 1.0, 0.5]}, + ]) + + docuvision_module._model = fake_model + docuvision_module._processor = fake_processor + return fake_model, fake_processor + + +@pytest.fixture +def client(): + return TestClient(app) + + +# ── health ──────────────────────────────────────────────────────────────────── + +def test_health_returns_ok(client): + resp = client.get("/health") + assert resp.status_code == 200 + data = resp.json() + assert data["status"] == "ok" + assert data["model"] == "/fake/model" + + +# ── _parse_dolphin_output ──────────────────────────────────────────────────── + +def test_parse_json_list_elements(): + raw = json.dumps([ + {"type": "heading", "text": "Title"}, + {"type": "paragraph", "text": "Body text"}, + ]) + elements, tables, raw_text = _parse_dolphin_output(raw) + assert len(elements) == 2 + assert elements[0].type == "heading" + assert elements[0].text == "Title" + assert elements[1].type == "paragraph" + assert raw_text == "Title\nBody text" + assert tables == [] + + +def test_parse_json_table_extracted(): + raw = json.dumps([ + {"type": "table", "text": "row", "html": "
A
", + "bbox": [0.0, 0.0, 1.0, 0.5]}, + ]) + elements, tables, raw_text = _parse_dolphin_output(raw) + assert len(tables) == 1 + assert tables[0].html == "
A
" + assert tables[0].bbox == [0.0, 0.0, 1.0, 0.5] + assert len(elements) == 1 + assert elements[0].type == "table" + + +def test_parse_plain_text_fallback(): + raw = "This is not JSON at all." + elements, tables, raw_text = _parse_dolphin_output(raw) + assert len(elements) == 1 + assert elements[0].type == "paragraph" + assert elements[0].text == raw + assert tables == [] + assert raw_text == raw + + +def test_parse_empty_string_fallback(): + elements, tables, raw_text = _parse_dolphin_output("") + assert len(elements) == 1 + assert elements[0].type == "paragraph" + assert elements[0].text == "" + + +def test_parse_json_missing_type_defaults_to_paragraph(): + raw = json.dumps([{"text": "no type field"}]) + elements, tables, _ = _parse_dolphin_output(raw) + assert elements[0].type == "paragraph" + + +# ── POST /extract ───────────────────────────────────────────────────────────── + +def test_extract_image_b64(client, mock_model): + resp = client.post("/extract", json={"image_b64": _make_jpeg_b64(), "hint": "auto"}) + assert resp.status_code == 200 + data = resp.json() + assert "elements" in data + assert "raw_text" in data + assert "tables" in data + assert data["metadata"]["hint"] == "auto" + assert data["metadata"]["model"] == "/fake/model" + assert data["metadata"]["width"] == 10 + assert data["metadata"]["height"] == 10 + + +def test_extract_hint_table_routes_correct_prompt(client, mock_model): + _, fake_processor = mock_model + resp = client.post("/extract", json={"image_b64": _make_jpeg_b64(), "hint": "table"}) + assert resp.status_code == 200 + # Verify processor was called with the table-specific prompt + call_kwargs = fake_processor.call_args + assert "table" in call_kwargs.kwargs.get("text", "") or \ + "table" in str(call_kwargs) + + +def test_extract_hint_unknown_falls_back_to_auto(client, mock_model): + """An unrecognised hint silently falls back to the 'auto' prompt.""" + resp = client.post("/extract", json={"image_b64": _make_jpeg_b64(), "hint": "nonsense"}) + assert resp.status_code == 200 + + +def test_extract_image_path(tmp_path, client, mock_model): + img_file = tmp_path / "doc.png" + Image.new("RGB", (8, 8), color=(0, 0, 0)).save(img_file) + resp = client.post("/extract", json={"image_path": str(img_file)}) + assert resp.status_code == 200 + assert resp.json()["metadata"]["width"] == 8 + + +def test_extract_image_path_not_found(client, mock_model): + resp = client.post("/extract", json={"image_path": "/nonexistent/path/img.png"}) + assert resp.status_code == 404 + + +def test_extract_no_image_raises_422(client, mock_model): + resp = client.post("/extract", json={"hint": "auto"}) + assert resp.status_code == 422 + + +def test_extract_response_includes_tables(client, mock_model): + """Verify table objects surface in response when model returns table elements.""" + resp = client.post("/extract", json={"image_b64": _make_jpeg_b64()}) + assert resp.status_code == 200 + data = resp.json() + assert len(data["tables"]) == 1 + assert "" in data["tables"][0]["html"] + + +def test_extract_device_in_metadata(client, mock_model): + resp = client.post("/extract", json={"image_b64": _make_jpeg_b64()}) + assert resp.status_code == 200 + assert "device" in resp.json()["metadata"]