feat: hardware detection, cf-docuvision service, documents ingestion pipeline #14

Merged
pyr0ball merged 1 commit from feature/hardware-docuvision into main 2026-04-02 18:55:51 -07:00
25 changed files with 2008 additions and 0 deletions

View file

@ -0,0 +1,18 @@
"""
circuitforge_core.documents shared document ingestion pipeline.
Primary entry point::
from circuitforge_core.documents import ingest, StructuredDocument
doc: StructuredDocument = ingest(image_bytes, hint="auto")
"""
from .ingest import ingest
from .models import Element, ParsedTable, StructuredDocument
__all__ = [
"ingest",
"Element",
"ParsedTable",
"StructuredDocument",
]

View file

@ -0,0 +1,84 @@
"""
circuitforge_core.documents.client HTTP client for the cf-docuvision service.
Thin wrapper around the cf-docuvision FastAPI service's POST /extract endpoint.
Used by ingest() as the primary path; callers should not use this directly.
"""
from __future__ import annotations
import base64
import logging
from typing import Any
import requests
from .models import Element, ParsedTable, StructuredDocument
logger = logging.getLogger(__name__)
_DEFAULT_TIMEOUT_S = 60
class DocuvisionClient:
    """Synchronous HTTP client for cf-docuvision.

    Args:
        base_url: Root URL of the cf-docuvision service, e.g. 'http://localhost:8003'
        timeout: Request timeout in seconds.
    """

    def __init__(self, base_url: str = "http://localhost:8003", timeout: int = _DEFAULT_TIMEOUT_S) -> None:
        # Strip any trailing slash so endpoint paths can be appended uniformly.
        self.base_url = base_url.rstrip("/")
        self.timeout = timeout

    def is_healthy(self) -> bool:
        """Return True if the service responds to GET /health."""
        try:
            status = requests.get(f"{self.base_url}/health", timeout=3).status_code
        except Exception:
            # Any transport failure (refused, DNS, timeout) counts as unhealthy.
            return False
        return status == 200

    def extract(self, image_bytes: bytes, hint: str = "auto") -> StructuredDocument:
        """
        Submit image bytes to cf-docuvision and return a StructuredDocument.

        Raises:
            requests.HTTPError: if the service returns a non-2xx status.
            requests.ConnectionError / requests.Timeout: if the service is unreachable.
        """
        body = {
            "image_b64": base64.b64encode(image_bytes).decode(),
            "hint": hint,
        }
        response = requests.post(
            f"{self.base_url}/extract",
            json=body,
            timeout=self.timeout,
        )
        response.raise_for_status()
        return _parse_response(response.json())
def _parse_response(data: dict[str, Any]) -> StructuredDocument:
    """Convert the /extract JSON payload into a StructuredDocument.

    Missing bbox values (absent key, None, or empty list) become None.
    """
    elements: list[Element] = []
    for entry in data.get("elements", []):
        box = entry.get("bbox")
        elements.append(Element(
            type=entry["type"],
            text=entry["text"],
            bbox=tuple(box) if box else None,
        ))
    tables: list[ParsedTable] = []
    for entry in data.get("tables", []):
        box = entry.get("bbox")
        tables.append(ParsedTable(
            html=entry["html"],
            bbox=tuple(box) if box else None,
        ))
    return StructuredDocument(
        elements=elements,
        raw_text=data.get("raw_text", ""),
        tables=tables,
        metadata=data.get("metadata", {}),
    )

View file

@ -0,0 +1,137 @@
"""
circuitforge_core.documents.ingest public document ingestion entry point.
Primary path: cf-docuvision HTTP service (Dolphin-v2, layout-aware).
Fallback path: LLMRouter vision call (lower fidelity, no layout/bbox).
Usage::
from circuitforge_core.documents import ingest
with open("receipt.jpg", "rb") as f:
doc = ingest(f.read(), hint="table")
print(doc.raw_text)
for table in doc.tables:
print(table.html)
"""
from __future__ import annotations
import base64
import logging
import os
from typing import Any
from .client import DocuvisionClient
from .models import Element, StructuredDocument
logger = logging.getLogger(__name__)
# Environment variable (and default) used to locate the cf-docuvision service.
_DOCUVISION_URL_ENV = "CF_DOCUVISION_URL"
_DOCUVISION_URL_DEFAULT = "http://localhost:8003"
# Per-hint prompts for the LLM vision fallback. All hints request the same
# JSON array shape so _llm_ingest() can parse every mode with one code path.
_LLM_FALLBACK_PROMPTS: dict[str, str] = {
    "auto": "Extract all text from this document. Return a JSON array of {\"type\": ..., \"text\": ...} objects.",
    "table": "Extract all tables from this document as HTML. Return a JSON array including {\"type\": \"table\", \"html\": ..., \"text\": ...} objects.",
    "text": "Extract all text from this document preserving headings and paragraphs. Return a JSON array of {\"type\": ..., \"text\": ...} objects.",
    "form": "Extract all form field labels and values from this document. Return a JSON array of {\"type\": ..., \"text\": ...} objects.",
}
def ingest(
    image_bytes: bytes,
    hint: str = "auto",
    *,
    docuvision_url: str | None = None,
    llm_router: Any | None = None,
    llm_config_path: Any | None = None,
) -> StructuredDocument:
    """
    Ingest an image and return a StructuredDocument.

    Tries cf-docuvision first; falls back to LLMRouter vision if the service is
    unavailable or fails. If neither is available, returns an empty document.

    Args:
        image_bytes: Raw bytes of the image (JPEG, PNG, etc.).
        hint: Extraction mode: "auto" | "table" | "text" | "form".
        docuvision_url: Override service URL (defaults to CF_DOCUVISION_URL env or localhost:8003).
        llm_router: Pre-built LLMRouter instance for fallback (optional).
        llm_config_path: Path to llm.yaml for lazy-constructing LLMRouter if needed.

    Returns:
        StructuredDocument always, even on total failure (empty document).
    """
    service_url = docuvision_url or os.environ.get(_DOCUVISION_URL_ENV, _DOCUVISION_URL_DEFAULT)
    client = DocuvisionClient(base_url=service_url)

    # Primary path: the layout-aware cf-docuvision service.
    try:
        if client.is_healthy():
            document = client.extract(image_bytes, hint=hint)
            logger.debug("ingest: cf-docuvision succeeded (%d elements)", len(document.elements))
            return document
        logger.debug("ingest: cf-docuvision unhealthy, falling back to LLM")
    except Exception as exc:
        logger.warning("ingest: cf-docuvision failed (%s), falling back to LLM", exc)

    # Fallback path: LLMRouter vision (lower fidelity, no layout/bbox info).
    router = llm_router or _build_llm_router(llm_config_path)
    if router is None:
        logger.warning("ingest: no LLM router available; returning empty document")
        return StructuredDocument(metadata={"source": "none", "hint": hint})
    try:
        return _llm_ingest(router, image_bytes, hint)
    except Exception as exc:
        logger.warning("ingest: LLM fallback failed (%s); returning empty document", exc)
        return StructuredDocument(metadata={"source": "llm_error", "hint": hint, "error": str(exc)})
# ── helpers ───────────────────────────────────────────────────────────────────
def _build_llm_router(config_path: Any | None) -> Any | None:
    """Lazily construct an LLMRouter; return None if unavailable.

    The import happens inside the try so a missing circuitforge_core.llm
    package degrades to "no fallback" instead of crashing ingest().
    """
    try:
        from circuitforge_core.llm import LLMRouter
        if config_path is None:
            return LLMRouter()
        return LLMRouter(config_path=config_path)
    except Exception as exc:
        logger.debug("ingest: could not build LLMRouter: %s", exc)
        return None
def _llm_ingest(router: Any, image_bytes: bytes, hint: str) -> StructuredDocument:
    """Use LLMRouter's vision capability to extract document text.

    Args:
        router: LLMRouter-like object exposing generate_vision(prompt=, image_b64=).
        image_bytes: Raw image bytes to submit.
        hint: Extraction mode; unknown hints use the "auto" prompt.

    Returns:
        StructuredDocument with metadata["source"] == "llm_fallback". The LLM
        path never produces tables or bounding boxes.
    """
    import json
    prompt = _LLM_FALLBACK_PROMPTS.get(hint, _LLM_FALLBACK_PROMPTS["auto"])
    b64 = base64.b64encode(image_bytes).decode()
    raw = router.generate_vision(
        prompt=prompt,
        image_b64=b64,
    )
    # Try to parse structured output; fall back to single paragraph
    elements: list[Element] = []
    try:
        parsed = json.loads(raw)
        if isinstance(parsed, list):
            for item in parsed:
                # Guard against non-dict entries (e.g. a JSON array of bare
                # strings), which previously raised AttributeError on .get().
                if isinstance(item, dict):
                    elements.append(Element(
                        type=item.get("type", "paragraph"),
                        text=item.get("text", ""),
                    ))
        else:
            # Valid JSON but not the expected array shape (e.g. a single
            # object). Previously this produced an EMPTY document and lost
            # the LLM's output; keep the raw text as one paragraph instead.
            elements = [Element(type="paragraph", text=raw.strip())]
    except (json.JSONDecodeError, TypeError):
        elements = [Element(type="paragraph", text=raw.strip())]
    raw_text = "\n".join(e.text for e in elements)
    return StructuredDocument(
        elements=elements,
        raw_text=raw_text,
        tables=[],
        metadata={"source": "llm_fallback", "hint": hint},
    )

View file

@ -0,0 +1,53 @@
"""
circuitforge_core.documents.models shared document data types.
These are the canonical output types from the document ingestion pipeline.
All consumers (kiwi, falcon, peregrine, godwit, ) receive a StructuredDocument
regardless of whether Dolphin-v2 or LLM fallback was used.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any
@dataclass(frozen=True)
class Element:
    """A single logical content unit within a document.

    Frozen: elements are immutable value objects, safe to share across
    StructuredDocument instances.

    type: one of heading | paragraph | list | table | figure | formula | code
    text: extracted plain text (for tables: may be row summary or empty)
    bbox: normalised [x0, y0, x1, y1] in 0-1 space, None when unavailable
    """
    type: str
    text: str
    bbox: tuple[float, float, float, float] | None = None
@dataclass(frozen=True)
class ParsedTable:
    """An extracted table rendered as HTML.

    bbox: normalised [x0, y0, x1, y1] in 0-1 space, None when unavailable.
    """
    html: str
    bbox: tuple[float, float, float, float] | None = None
@dataclass
class StructuredDocument:
    """
    The canonical result of document ingestion.

    Produced by ingest() for any input image regardless of which backend
    (cf-docuvision or LLM fallback) processed it.
    """
    elements: list[Element] = field(default_factory=list)
    raw_text: str = ""
    tables: list[ParsedTable] = field(default_factory=list)
    metadata: dict[str, Any] = field(default_factory=dict)

    @property
    def headings(self) -> list[Element]:
        """All elements whose type is exactly "heading"."""
        return [el for el in self.elements if el.type == "heading"]

    @property
    def paragraphs(self) -> list[Element]:
        """All elements whose type is exactly "paragraph"."""
        return [el for el in self.elements if el.type == "paragraph"]

View file

@ -0,0 +1,30 @@
# circuitforge_core/hardware/__init__.py
"""
Hardware detection and LLM profile generation.
Typical usage::
from circuitforge_core.hardware import detect_hardware, generate_profile
import yaml
spec = detect_hardware()
config = generate_profile(spec)
print(yaml.dump(config.to_dict()))
print("Recommended profile:", config.profile_name)
"""
from .detect import detect_hardware, detect_hardware_json
from .generator import generate_profile
from .models import HardwareSpec, LLMBackendConfig, LLMConfig
from .tiers import VRAM_TIERS, VramTier, select_tier
__all__ = [
"detect_hardware",
"detect_hardware_json",
"generate_profile",
"HardwareSpec",
"LLMBackendConfig",
"LLMConfig",
"VRAM_TIERS",
"VramTier",
"select_tier",
]

View file

@ -0,0 +1,196 @@
# circuitforge_core/hardware/detect.py
"""
Cross-platform hardware auto-detection.
Reads GPU info from:
- nvidia-smi (NVIDIA, Linux/Windows)
- rocm-smi (AMD, Linux)
- system_profiler (Apple Silicon, macOS)
- /proc/meminfo (Linux RAM)
- psutil (cross-platform RAM fallback)
Returns a HardwareSpec. On detection failure, returns a conservative
CPU-only spec so callers always get a usable result.
"""
from __future__ import annotations
import json
import platform
import re
import subprocess
import sys
from pathlib import Path
from .models import HardwareSpec
def _run(*args: str, timeout: int = 5) -> str:
"""Run a subprocess and return stdout, or empty string on any error."""
try:
result = subprocess.run(
list(args), capture_output=True, text=True, timeout=timeout
)
return result.stdout.strip()
except Exception:
return ""
def _ram_mb() -> int:
"""Return total system RAM in MB."""
# psutil is optional but preferred
try:
import psutil # type: ignore[import-untyped]
return psutil.virtual_memory().total // (1024 * 1024)
except ImportError:
pass
# Linux /proc/meminfo fallback
mem_info = Path("/proc/meminfo")
if mem_info.exists():
for line in mem_info.read_text().splitlines():
if line.startswith("MemTotal:"):
kb = int(line.split()[1])
return kb // 1024
return 0
def _detect_nvidia() -> tuple[int, int, str, str, str] | None:
    """
    Returns (vram_mb, gpu_count, gpu_name, cuda_version, vendor) or None.

    Uses nvidia-smi --query-gpu for reliable machine-parseable output.
    """
    out = _run(
        "nvidia-smi",
        "--query-gpu=name,memory.total",
        "--format=csv,noheader,nounits",
    )
    if not out:
        return None
    rows = [row.strip() for row in out.splitlines() if row.strip()]
    if not rows:
        return None
    # One CSV row per GPU; the first GPU's VRAM is the representative value.
    fields = rows[0].split(",")
    if len(fields) < 2:
        return None
    name = fields[0].strip()
    try:
        vram_mb = int(fields[1].strip())
    except ValueError:
        return None
    # CUDA version only appears in the verbose query output.
    header = _run("nvidia-smi", "--query", "--display=COMPUTE")
    match = re.search(r"CUDA Version\s*:\s*([\d.]+)", header)
    return vram_mb, len(rows), name, match.group(1) if match else "", "nvidia"
def _detect_amd() -> tuple[int, int, str, str, str] | None:
    """Returns (vram_mb, gpu_count, gpu_name, rocm_version, vendor) or None."""
    raw = _run("rocm-smi", "--showmeminfo", "vram", "--json")
    if not raw:
        return None
    try:
        info = json.loads(raw)
    except json.JSONDecodeError:
        return None
    # rocm-smi keys each GPU as "card0", "card1", ...
    cards = [key for key in info if key.startswith("card")]
    if not cards:
        return None
    primary = info[cards[0]]
    try:
        vram_mb = int(primary.get("VRAM Total Memory (B)", 0)) // (1024 * 1024)
    except (ValueError, TypeError):
        return None
    name = primary.get("Card series", "AMD GPU")
    # The ROCm runtime version comes from a separate tool (rocminfo).
    version_match = re.search(r"ROCm Runtime Version\s*:\s*([\d.]+)", _run("rocminfo"))
    version = version_match.group(1) if version_match else ""
    return vram_mb, len(cards), name, version, "amd"
def _detect_apple() -> tuple[int, int, str, str, str] | None:
    """
    Returns (unified_ram_mb, 1, gpu_name, '', 'apple') or None.

    Apple Silicon shares RAM between CPU and GPU; we report total RAM as VRAM.
    """
    if platform.system() != "Darwin":
        return None
    # Intel Macs report nothing (or "0") for the arm64 capability flag.
    if _run("sysctl", "-n", "hw.optional.arm64").strip() != "1":
        return None
    profile = _run("system_profiler", "SPHardwareDataType", "-json")
    try:
        hw = json.loads(profile)["SPHardwareDataType"][0]
        chip = hw.get("chip_type", "Apple Silicon")
        # physical_memory is a human-readable string such as "16 GB".
        ram_gb = int(re.search(r"(\d+)", hw.get("physical_memory", "0 GB")).group(1))  # type: ignore[union-attr]
    except Exception:
        return None
    return ram_gb * 1024, 1, chip, "", "apple"
def detect_hardware() -> HardwareSpec:
    """
    Auto-detect hardware and return a HardwareSpec.

    Detection order: NVIDIA, AMD, Apple, then CPU fallback.
    Never raises: returns a CPU-only spec on any detection failure.
    """
    ram_mb = _ram_mb()
    for probe in (_detect_nvidia, _detect_amd, _detect_apple):
        try:
            found = probe()
        except Exception:
            found = None
        if found is None:
            continue
        vram_mb, gpu_count, gpu_name, version, vendor = found
        return HardwareSpec(
            vram_mb=vram_mb,
            ram_mb=ram_mb,
            gpu_count=gpu_count,
            gpu_vendor=vendor,
            gpu_name=gpu_name,
            cuda_version=version if vendor == "nvidia" else "",
            rocm_version=version if vendor == "amd" else "",
        )
    # No GPU found: conservative CPU-only spec so callers always get a result.
    return HardwareSpec(
        vram_mb=0,
        ram_mb=ram_mb,
        gpu_count=0,
        gpu_vendor="cpu",
        gpu_name="",
    )
def detect_hardware_json() -> str:
    """Return detect_hardware() result as a JSON string (for CLI / one-liner use)."""
    from dataclasses import asdict
    return json.dumps(asdict(detect_hardware()), indent=2)


if __name__ == "__main__":
    print(detect_hardware_json())

View file

@ -0,0 +1,91 @@
# circuitforge_core/hardware/generator.py
"""
Profile generator: HardwareSpec LLMConfig.
`generate_profile()` is the main entry point. It selects the appropriate
VRAM tier, builds a complete LLMConfig dict ready to write as llm.yaml,
and returns the matching public GpuProfile name for orch use.
"""
from __future__ import annotations
from .models import HardwareSpec, LLMBackendConfig, LLMConfig
from .tiers import select_tier
# Default backend URLs — overridable for non-standard setups
_OLLAMA_URL = "http://localhost:11434"
_VLLM_URL = "http://localhost:8000"
_VISION_URL = "http://localhost:8002"
_DOCUVISION_URL = "http://localhost:8003"
def generate_profile(
    spec: HardwareSpec,
    *,
    ollama_url: str = _OLLAMA_URL,
    vllm_url: str = _VLLM_URL,
    vision_url: str = _VISION_URL,
    docuvision_url: str = _DOCUVISION_URL,
) -> LLMConfig:
    """
    Map a HardwareSpec to an LLMConfig.

    Returns an LLMConfig whose `profile_name` matches a public GpuProfile YAML
    in `circuitforge_core/resources/profiles/public/` so the orch can load the
    correct service allocation profile automatically.
    """
    tier = select_tier(spec.vram_mb)
    tier_services = set(tier.services)
    has_vllm = "vllm" in tier_services

    # Ollama is always available (CPU fallback)
    backends: dict[str, LLMBackendConfig] = {
        "ollama": LLMBackendConfig(
            enabled=True,
            url=ollama_url,
            model=tier.ollama_model,
        ),
    }
    # vllm: only on GPU tiers that list at least one candidate model.
    if has_vllm and tier.vllm_candidates:
        backends["vllm"] = LLMBackendConfig(
            enabled=True,
            url=vllm_url,
            model_candidates=list(tier.vllm_candidates),
        )
    if "cf-vision" in tier_services:
        backends["vision_service"] = LLMBackendConfig(enabled=True, url=vision_url)
    if "cf-docuvision" in tier_services:
        backends["docuvision_service"] = LLMBackendConfig(enabled=True, url=docuvision_url)

    # Fallback order: prefer vllm over ollama when available (faster for batch).
    # research gets its own list object so later mutation cannot alias the two.
    primary_chain = ["vllm", "ollama"] if has_vllm else ["ollama"]
    vision_chain = (["vision_service"] if "cf-vision" in tier_services else []) + ["ollama"]
    return LLMConfig(
        profile_name=tier.profile_name,
        backends=backends,
        fallback_order=primary_chain,
        research_fallback_order=list(primary_chain),
        vision_fallback_order=vision_chain,
    )

View file

@ -0,0 +1,60 @@
# circuitforge_core/hardware/models.py
"""Data models for hardware detection and LLM configuration output."""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any
@dataclass(frozen=True)
class HardwareSpec:
    """Describes a user's hardware as detected or manually entered.

    Frozen: a detection result is a value object and is never mutated.
    Exactly one of cuda_version / rocm_version is non-empty, matching
    gpu_vendor (see detect_hardware()).
    """
    vram_mb: int  # total VRAM per primary GPU (0 = CPU-only)
    ram_mb: int  # total system RAM
    gpu_count: int  # number of GPUs
    gpu_vendor: str  # "nvidia" | "amd" | "apple" | "cpu"
    gpu_name: str = ""  # human-readable card name, e.g. "RTX 4080"
    cuda_version: str = ""  # e.g. "12.4" (empty if not CUDA)
    rocm_version: str = ""  # e.g. "5.7" (empty if not ROCm)
@dataclass
class LLMBackendConfig:
    """Configuration for a single LLM backend.

    to_dict() emits only the fields that are actually set, keeping the
    generated llm.yaml minimal.
    """
    enabled: bool
    url: str
    model: str = ""
    model_candidates: list[str] = field(default_factory=list)

    def to_dict(self) -> dict[str, Any]:
        """Serialize to a plain dict, omitting empty optional fields."""
        result: dict[str, Any] = {"enabled": self.enabled, "url": self.url}
        if self.model:
            result["model"] = self.model
        if self.model_candidates:
            result["model_candidates"] = self.model_candidates
        return result
@dataclass
class LLMConfig:
    """
    Ready-to-serialize llm.yaml configuration.

    Matches the schema consumed by LLMRouter in circuitforge products.
    """
    profile_name: str  # e.g. "single-gpu-8gb" - matches a public GpuProfile
    backends: dict[str, LLMBackendConfig] = field(default_factory=dict)
    fallback_order: list[str] = field(default_factory=list)
    research_fallback_order: list[str] = field(default_factory=list)
    vision_fallback_order: list[str] = field(default_factory=list)

    def to_dict(self) -> dict[str, Any]:
        """Serialize to the llm.yaml schema.

        NOTE(review): profile_name is deliberately absent from the dict — it
        is consumed by the orch separately rather than written into llm.yaml;
        confirm against the LLMRouter schema if that ever changes.
        """
        serialized_backends = {name: backend.to_dict() for name, backend in self.backends.items()}
        return {
            "backends": serialized_backends,
            "fallback_order": self.fallback_order,
            "research_fallback_order": self.research_fallback_order,
            "vision_fallback_order": self.vision_fallback_order,
        }

View file

@ -0,0 +1,104 @@
# circuitforge_core/hardware/tiers.py
"""
VRAM tier ladder and model catalog.
Tiers map hardware VRAM (per-GPU) to:
- profile_name: matching public GpuProfile YAML in profiles/public/
- ollama_model: best default Ollama model for this tier
- vllm_candidates: ordered list of HF model dirs to try via cf-orch/vllm
- services: which cf-* managed services are available at this tier
- llm_max_params: rough upper bound, for human-readable display
"""
from __future__ import annotations
from dataclasses import dataclass, field
@dataclass(frozen=True)
class VramTier:
    """One rung of the VRAM ladder; see the module docstring for semantics.

    Ranges are half-open: a tier matches vram_min_mb <= vram < vram_max_mb.
    """
    vram_min_mb: int  # inclusive lower bound (0 = CPU-only)
    vram_max_mb: int  # exclusive upper bound (use sys.maxsize for the top tier)
    profile_name: str  # public GpuProfile YAML stem
    ollama_model: str  # e.g. "qwen2.5:7b-instruct-q4_k_m"
    vllm_candidates: list[str] = field(default_factory=list)  # ordered HF model dirs to try
    services: list[str] = field(default_factory=list)  # cf-* services available at this tier
    llm_max_params: str = ""  # human-readable, e.g. "7b-q4"
# Ordered from smallest to largest — first match wins in select_tier().
# Ranges are contiguous: each tier's vram_max_mb equals the next tier's
# vram_min_mb, so every non-negative VRAM value maps to exactly one tier.
VRAM_TIERS: list[VramTier] = [
    # No usable GPU (vram 0): CPU-only services, smallest quantized model.
    VramTier(
        vram_min_mb=0,
        vram_max_mb=1,
        profile_name="cpu-16gb",
        ollama_model="qwen2.5:1.5b-instruct-q4_k_m",
        vllm_candidates=[],
        services=["ollama", "cf-stt", "cf-tts"],
        llm_max_params="3b-q4",
    ),
    VramTier(
        vram_min_mb=1,
        vram_max_mb=3_000,
        profile_name="single-gpu-2gb",
        ollama_model="qwen2.5:1.5b-instruct-q4_k_m",
        vllm_candidates=[],
        services=["ollama"],
        llm_max_params="1.5b",
    ),
    VramTier(
        vram_min_mb=3_000,
        vram_max_mb=5_000,
        profile_name="single-gpu-4gb",
        ollama_model="qwen2.5:3b-instruct-q4_k_m",
        vllm_candidates=[],
        services=["ollama", "cf-vision", "cf-stt", "cf-tts"],
        llm_max_params="3b",
    ),
    # 6 GB is the first tier with vllm and cf-docuvision.
    VramTier(
        vram_min_mb=5_000,
        vram_max_mb=7_000,
        profile_name="single-gpu-6gb",
        ollama_model="qwen2.5:7b-instruct-q4_k_m",
        vllm_candidates=["Qwen2.5-3B-Instruct", "Phi-4-mini-instruct"],
        services=["ollama", "vllm", "cf-vision", "cf-docuvision", "cf-stt", "cf-tts"],
        llm_max_params="7b-q4",
    ),
    VramTier(
        vram_min_mb=7_000,
        vram_max_mb=12_000,
        profile_name="single-gpu-8gb",
        ollama_model="qwen2.5:7b-instruct",
        vllm_candidates=["Qwen2.5-3B-Instruct", "Phi-4-mini-instruct"],
        services=["ollama", "vllm", "cf-vision", "cf-docuvision", "cf-stt", "cf-tts"],
        llm_max_params="8b",
    ),
    VramTier(
        vram_min_mb=12_000,
        vram_max_mb=20_000,
        profile_name="single-gpu-16gb",
        ollama_model="qwen2.5:14b-instruct-q4_k_m",
        vllm_candidates=["Qwen2.5-14B-Instruct", "Qwen2.5-3B-Instruct", "Phi-4-mini-instruct"],
        services=["ollama", "vllm", "cf-vision", "cf-docuvision", "cf-stt", "cf-tts",
                  "cf-embed", "cf-classify"],
        llm_max_params="14b",
    ),
    # Top tier: open-ended upper bound.
    VramTier(
        vram_min_mb=20_000,
        vram_max_mb=10 ** 9,
        profile_name="single-gpu-24gb",
        ollama_model="qwen2.5:32b-instruct-q4_k_m",
        vllm_candidates=["Qwen2.5-14B-Instruct", "Qwen2.5-3B-Instruct", "Phi-4-mini-instruct"],
        services=["ollama", "vllm", "cf-vision", "cf-docuvision", "cf-stt", "cf-tts",
                  "cf-embed", "cf-classify", "comfyui"],
        llm_max_params="32b-q4",
    ),
]
def select_tier(vram_mb: int) -> VramTier:
    """Return the best matching tier for the given per-GPU VRAM in MB.

    Args:
        vram_mb: Per-GPU VRAM in MB. Negative values (which indicate broken
            detection data) are clamped to 0; previously they fell through
            every half-open range check and incorrectly selected the TOP tier.

    Returns:
        The first tier whose [vram_min_mb, vram_max_mb) range contains
        vram_mb; the top tier for values beyond the last upper bound.
    """
    vram_mb = max(0, vram_mb)
    for tier in VRAM_TIERS:
        if tier.vram_min_mb <= vram_mb < tier.vram_max_mb:
            return tier
    # Fallback: return the top tier for unusually large VRAM
    return VRAM_TIERS[-1]

View file

@ -0,0 +1,250 @@
"""
cf-docuvision managed document understanding service.
Wraps ByteDance/Dolphin-v2 (Qwen2.5-VL backbone) behind a simple HTTP API.
Managed by cf-orch; started/stopped as a ProcessSpec service.
API
---
GET /health {"status": "ok", "model": "<path>"}
POST /extract ExtractResponse
Usage (standalone)::
python -m circuitforge_core.resources.docuvision.app \\
--model /Library/Assets/LLM/docuvision/models/dolphin-v2 \\
--port 8003 --gpu-id 0
"""
from __future__ import annotations
import argparse
import base64
import io
import json
import logging
from contextlib import asynccontextmanager
from typing import Any
import uvicorn
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
logger = logging.getLogger(__name__)
# Module-level state — populated by _load_model() on first /extract call
_model: Any = None
_processor: Any = None
_model_path: str = ""
_device: str = "cpu"
# ── lazy loader ───────────────────────────────────────────────────────────────
def _load_model() -> None:
    """Lazy-load Dolphin-v2. Called once on first /extract request.

    Populates the module-level _model/_processor/_device globals; a no-op on
    every subsequent call. Loading is deferred so the process can start (and
    answer /health) before the model weights are read.

    NOTE(review): there is no lock around the load — fine while requests are
    served on a single event loop, but a multi-worker/threaded server could
    double-load; confirm the orch always runs one worker.
    """
    global _model, _processor, _device
    if _model is not None:
        return
    # Heavy imports deferred so importing this module stays cheap.
    import torch
    from transformers import AutoProcessor, AutoModelForCausalLM
    logger.info("Loading Dolphin-v2 from %s ...", _model_path)
    _device = "cuda" if torch.cuda.is_available() else "cpu"
    _processor = AutoProcessor.from_pretrained(
        _model_path,
        trust_remote_code=True,
    )
    # fp16 on GPU halves memory; CPU stays fp32 for compatibility.
    _model = AutoModelForCausalLM.from_pretrained(
        _model_path,
        trust_remote_code=True,
        torch_dtype=torch.float16 if _device == "cuda" else torch.float32,
        device_map=_device,
    )
    _model.eval()
    logger.info("Dolphin-v2 loaded on %s", _device)
# ── FastAPI app ───────────────────────────────────────────────────────────────
@asynccontextmanager
async def _lifespan(app: FastAPI):
    # Intentionally no startup work: the model is lazy-loaded on the first
    # /extract call instead, so /health responds immediately after start.
    yield
app = FastAPI(title="cf-docuvision", lifespan=_lifespan)
# ── request / response models ─────────────────────────────────────────────────
class ExtractRequest(BaseModel):
    """
    Either image_b64 (base64-encoded bytes) or image_path (absolute path) must
    be provided. hint guides the extraction mode:
    - "auto" - Dolphin-v2 detects layout and element types automatically
    - "table" - optimise for tabular data (receipts, invoices, forms)
    - "text" - optimise for dense prose (contracts, letters)
    - "form" - optimise for form field extraction

    Note: the either/or constraint is enforced in _image_from_request (422),
    not by pydantic validation.
    """
    image_b64: str | None = None
    image_path: str | None = None
    hint: str = "auto"
class ElementOut(BaseModel):
    """One extracted content unit, in document order."""
    type: str  # heading | paragraph | list | table | figure | formula | code
    text: str
    bbox: list[float] | None = None  # [x0, y0, x1, y1] normalised 0-1 if available
class TableOut(BaseModel):
    """One extracted table, rendered as HTML."""
    html: str
    bbox: list[float] | None = None
class ExtractResponse(BaseModel):
    """Top-level response body for POST /extract."""
    elements: list[ElementOut]
    raw_text: str
    tables: list[TableOut]
    metadata: dict[str, Any]
# ── helpers ───────────────────────────────────────────────────────────────────
# Per-hint instruction given to Dolphin-v2; unknown hints fall back to "auto"
# in the /extract route.
_HINT_PROMPTS: dict[str, str] = {
    "auto": "Parse this document. Extract all elements with their types and text content.",
    "table": "Extract all tables from this document as structured HTML. Also extract any line-item text.",
    "text": "Extract all text from this document preserving paragraph and heading structure.",
    "form": "Extract all form fields from this document. Return field labels and their values.",
}
def _image_from_request(req: ExtractRequest):
    """Return a PIL Image from either image_b64 or image_path.

    Raises:
        HTTPException 404: image_path given but does not exist.
        HTTPException 422: neither image source provided.
    """
    from PIL import Image
    if req.image_b64:
        decoded = base64.b64decode(req.image_b64)
        return Image.open(io.BytesIO(decoded)).convert("RGB")
    if not req.image_path:
        raise HTTPException(status_code=422, detail="Either image_b64 or image_path must be provided")
    from pathlib import Path
    source = Path(req.image_path)
    if not source.exists():
        raise HTTPException(status_code=404, detail=f"image_path not found: {req.image_path}")
    return Image.open(source).convert("RGB")
def _parse_dolphin_output(raw: str) -> tuple[list[ElementOut], list[TableOut], str]:
    """
    Parse Dolphin-v2's structured output into elements and tables.

    Dolphin-v2 returns a JSON array of element dicts with keys:
        type, text, [html], [bbox]
    Falls back gracefully if the model returns plain text instead.

    Returns:
        (elements, tables, raw_text) — raw_text is the newline-join of all
        element texts, or the stripped raw output in the fallback case.
    """
    elements: list[ElementOut] = []
    tables: list[TableOut] = []
    # Try JSON parse first
    try:
        parsed = json.loads(raw)
        if isinstance(parsed, list):
            for item in parsed:
                if not isinstance(item, dict):
                    # A list of bare strings/values is not the structured
                    # schema. Previously this raised an uncaught
                    # AttributeError on item.get() → a 500 response; route
                    # it into the plain-text fallback instead.
                    raise TypeError("non-dict element in model output")
                etype = item.get("type", "paragraph")
                text = item.get("text", "")
                bbox = item.get("bbox")
                if etype == "table":
                    tables.append(TableOut(html=item.get("html", text), bbox=bbox))
                elements.append(ElementOut(type=etype, text=text, bbox=bbox))
            raw_text = "\n".join(e.text for e in elements)
            return elements, tables, raw_text
    except (json.JSONDecodeError, TypeError):
        pass
    # Plain-text fallback: treat entire output as a single paragraph.
    # Reset tables too so a partially-parsed list cannot leak entries.
    elements = [ElementOut(type="paragraph", text=raw.strip())]
    return elements, [], raw.strip()
# ── routes ────────────────────────────────────────────────────────────────────
@app.get("/health")
async def health() -> dict[str, str]:
return {"status": "ok", "model": _model_path}
@app.post("/extract", response_model=ExtractResponse)
async def extract(req: ExtractRequest) -> ExtractResponse:
_load_model()
image = _image_from_request(req)
prompt = _HINT_PROMPTS.get(req.hint, _HINT_PROMPTS["auto"])
import torch
inputs = _processor(
text=prompt,
images=image,
return_tensors="pt",
).to(_device)
with torch.no_grad():
output_ids = _model.generate(
**inputs,
max_new_tokens=2048,
do_sample=False,
)
# Decode only the newly generated tokens
input_len = inputs["input_ids"].shape[1]
raw_output = _processor.decode(
output_ids[0][input_len:],
skip_special_tokens=True,
)
elements, tables, raw_text = _parse_dolphin_output(raw_output)
w, h = image.size
return ExtractResponse(
elements=elements,
raw_text=raw_text,
tables=tables,
metadata={
"hint": req.hint,
"width": w,
"height": h,
"model": _model_path,
"device": _device,
},
)
# ── CLI entry point ───────────────────────────────────────────────────────────
def main() -> None:
    """CLI entry point: parse args, record the model path, start uvicorn."""
    parser = argparse.ArgumentParser(description="cf-docuvision service")
    parser.add_argument("--model", required=True, help="Path to Dolphin-v2 model directory")
    parser.add_argument("--port", type=int, default=8003)
    parser.add_argument("--host", default="0.0.0.0")
    parser.add_argument("--gpu-id", type=int, default=0)
    args = parser.parse_args()
    # The lazy loader and routes read the model path from module state.
    global _model_path
    _model_path = args.model
    import os
    # setdefault: an operator-supplied CUDA_VISIBLE_DEVICES wins over --gpu-id.
    os.environ.setdefault("CUDA_VISIBLE_DEVICES", str(args.gpu_id))
    logging.basicConfig(level=logging.INFO, format="%(levelname)s %(name)s: %(message)s")
    uvicorn.run(app, host=args.host, port=args.port)
if __name__ == "__main__":
    main()

View file

@ -27,6 +27,13 @@ services:
priority: 2
shared: true
max_concurrent: 3
managed:
type: process
exec_path: "/devl/miniconda3/envs/cf/bin/python"
args_template: "-m circuitforge_core.resources.docuvision.app --model /Library/Assets/LLM/docuvision/models/dolphin-v2 --port {port} --gpu-id {gpu_id}"
port: 8003
host_port: 8003
cwd: "/Library/Development/CircuitForge/circuitforge-core"
cf-stt:
max_mb: 1200
priority: 2

View file

@ -27,6 +27,13 @@ services:
priority: 2
shared: true
max_concurrent: 4
managed:
type: process
exec_path: "/devl/miniconda3/envs/cf/bin/python"
args_template: "-m circuitforge_core.resources.docuvision.app --model /Library/Assets/LLM/docuvision/models/dolphin-v2 --port {port} --gpu-id {gpu_id}"
port: 8003
host_port: 8003
cwd: "/Library/Development/CircuitForge/circuitforge-core"
cf-stt:
max_mb: 1200
priority: 2

View file

@ -27,6 +27,13 @@ services:
priority: 2
shared: true
max_concurrent: 1
managed:
type: process
exec_path: "/devl/miniconda3/envs/cf/bin/python"
args_template: "-m circuitforge_core.resources.docuvision.app --model /Library/Assets/LLM/docuvision/models/dolphin-v2 --port {port} --gpu-id {gpu_id}"
port: 8003
host_port: 8003
cwd: "/Library/Development/CircuitForge/circuitforge-core"
cf-stt:
max_mb: 600
priority: 2

View file

@ -27,6 +27,13 @@ services:
priority: 2
shared: true
max_concurrent: 2
managed:
type: process
exec_path: "/devl/miniconda3/envs/cf/bin/python"
args_template: "-m circuitforge_core.resources.docuvision.app --model /Library/Assets/LLM/docuvision/models/dolphin-v2 --port {port} --gpu-id {gpu_id}"
port: 8003
host_port: 8003
cwd: "/Library/Development/CircuitForge/circuitforge-core"
cf-stt:
max_mb: 1200
priority: 2

View file

View file

@ -0,0 +1,93 @@
# tests/test_documents/test_client.py
"""Unit tests for DocuvisionClient."""
from __future__ import annotations
import base64
import json
from unittest.mock import MagicMock, patch
import pytest
from circuitforge_core.documents.client import DocuvisionClient
def _mock_response(data: dict, status: int = 200) -> MagicMock:
resp = MagicMock()
resp.status_code = status
resp.json.return_value = data
resp.raise_for_status = MagicMock()
if status >= 400:
import requests
resp.raise_for_status.side_effect = requests.HTTPError(response=resp)
return resp
# Canonical /extract payload shared by the tests below; mirrors the
# ExtractResponse schema produced by the cf-docuvision service.
_EXTRACT_RESPONSE = {
    "elements": [
        {"type": "heading", "text": "Invoice", "bbox": [0.0, 0.0, 1.0, 0.1]},
        {"type": "paragraph", "text": "Due: $100"},
    ],
    "raw_text": "Invoice\nDue: $100",
    "tables": [
        {"html": "<table><tr><td>$100</td></tr></table>", "bbox": None},
    ],
    "metadata": {"hint": "auto", "width": 800, "height": 1200, "model": "/fake/model", "device": "cpu"},
}
def test_is_healthy_true():
with patch("requests.get", return_value=_mock_response({}, 200)):
client = DocuvisionClient("http://localhost:8003")
assert client.is_healthy() is True
def test_is_healthy_false_on_error():
with patch("requests.get", side_effect=ConnectionError("refused")):
client = DocuvisionClient("http://localhost:8003")
assert client.is_healthy() is False
def test_is_healthy_false_on_500():
with patch("requests.get", return_value=_mock_response({}, 500)):
client = DocuvisionClient("http://localhost:8003")
assert client.is_healthy() is False
def test_extract_returns_structured_document():
    """extract() maps the service JSON onto a StructuredDocument."""
    with patch("requests.post", return_value=_mock_response(_EXTRACT_RESPONSE)):
        doc = DocuvisionClient().extract(b"fake-image-bytes", hint="auto")
    assert doc.raw_text == "Invoice\nDue: $100"
    assert len(doc.elements) == 2
    first = doc.elements[0]
    assert first.type == "heading"
    assert first.bbox == (0.0, 0.0, 1.0, 0.1)
    assert len(doc.tables) == 1
    assert "$100" in doc.tables[0].html


def test_extract_sends_base64_image():
    """The request body carries the hint and the image base64-encoded."""
    with patch("requests.post", return_value=_mock_response(_EXTRACT_RESPONSE)) as posted:
        DocuvisionClient().extract(b"pixels", hint="table")
    payload = posted.call_args.kwargs["json"]
    assert payload["hint"] == "table"
    assert base64.b64decode(payload["image_b64"]) == b"pixels"
def test_extract_raises_on_http_error():
    """A 4xx from the service propagates as requests.HTTPError."""
    import requests as req_lib
    with patch("requests.post", return_value=_mock_response({}, 422)):
        client = DocuvisionClient()
        with pytest.raises(req_lib.HTTPError):
            client.extract(b"bad")


def test_extract_table_bbox_none_when_missing():
    """A table entry with no bbox key yields ParsedTable.bbox is None."""
    payload = {**_EXTRACT_RESPONSE, "tables": [{"html": "<table/>"}]}
    with patch("requests.post", return_value=_mock_response(payload)):
        doc = DocuvisionClient().extract(b"img")
    assert doc.tables[0].bbox is None

View file

@ -0,0 +1,140 @@
# tests/test_documents/test_ingest.py
"""Unit tests for circuitforge_core.documents.ingest."""
from __future__ import annotations
import json
from unittest.mock import MagicMock, patch
import pytest
from circuitforge_core.documents import ingest
from circuitforge_core.documents.models import Element, StructuredDocument
def _healthy_client(doc: StructuredDocument) -> MagicMock:
c = MagicMock()
c.is_healthy.return_value = True
c.extract.return_value = doc
return c
def _unhealthy_client() -> MagicMock:
c = MagicMock()
c.is_healthy.return_value = False
return c
def _doc_with_text(text: str) -> StructuredDocument:
    """Single-paragraph StructuredDocument stamped as coming from docuvision."""
    paragraph = Element(type="paragraph", text=text)
    return StructuredDocument(
        elements=[paragraph],
        raw_text=text,
        metadata={"source": "docuvision"},
    )
# ── primary path ──────────────────────────────────────────────────────────────
def test_ingest_uses_docuvision_when_healthy():
    """A healthy cf-docuvision client is the primary extraction path."""
    doc = _doc_with_text("hello world")
    with patch("circuitforge_core.documents.ingest.DocuvisionClient", return_value=_healthy_client(doc)):
        result = ingest(b"imgbytes", hint="auto")
    assert result.raw_text == "hello world"
    assert result.elements[0].type == "paragraph"
def test_ingest_passes_hint_to_client():
    """The hint argument is forwarded verbatim to DocuvisionClient.extract."""
    client = _healthy_client(_doc_with_text("table data"))
    with patch("circuitforge_core.documents.ingest.DocuvisionClient", return_value=client):
        ingest(b"imgbytes", hint="table")
    client.extract.assert_called_once_with(b"imgbytes", hint="table")


def test_ingest_uses_custom_url():
    """An explicit docuvision_url is handed to the client constructor."""
    with patch("circuitforge_core.documents.ingest.DocuvisionClient") as client_cls:
        client_cls.return_value = _healthy_client(_doc_with_text("text"))
        ingest(b"imgbytes", docuvision_url="http://myhost:9000")
    client_cls.assert_called_once_with(base_url="http://myhost:9000")
# ── fallback path ─────────────────────────────────────────────────────────────
def test_ingest_falls_back_to_llm_when_docuvision_unhealthy():
    """With cf-docuvision down, the LLM vision router supplies the elements."""
    router = MagicMock()
    router.generate_vision.return_value = json.dumps(
        [{"type": "paragraph", "text": "fallback text"}]
    )
    with patch("circuitforge_core.documents.ingest.DocuvisionClient", return_value=_unhealthy_client()):
        result = ingest(b"imgbytes", llm_router=router)
    assert result.raw_text == "fallback text"
    assert result.metadata["source"] == "llm_fallback"


def test_ingest_llm_fallback_plain_text():
    """When the LLM returns plain text (not JSON), it becomes one paragraph."""
    router = MagicMock()
    router.generate_vision.return_value = "This is plain text output."
    with patch("circuitforge_core.documents.ingest.DocuvisionClient", return_value=_unhealthy_client()):
        result = ingest(b"imgbytes", llm_router=router)
    assert len(result.elements) == 1
    only = result.elements[0]
    assert only.type == "paragraph"
    assert "plain text" in only.text
def test_ingest_returns_empty_doc_when_no_llm():
    """No docuvision and no LLM router -> empty document, source='none'."""
    with patch("circuitforge_core.documents.ingest.DocuvisionClient", return_value=_unhealthy_client()), \
         patch("circuitforge_core.documents.ingest._build_llm_router", return_value=None):
        result = ingest(b"imgbytes")
    assert isinstance(result, StructuredDocument)
    assert result.elements == []
    assert result.metadata["source"] == "none"


def test_ingest_returns_empty_doc_on_docuvision_exception():
    """A healthy-looking client that raises mid-extract degrades to 'none'."""
    broken = MagicMock()
    broken.is_healthy.return_value = True
    broken.extract.side_effect = ConnectionError("refused")
    with patch("circuitforge_core.documents.ingest.DocuvisionClient", return_value=broken), \
         patch("circuitforge_core.documents.ingest._build_llm_router", return_value=None):
        result = ingest(b"imgbytes")
    assert isinstance(result, StructuredDocument)
    assert result.metadata["source"] == "none"


def test_ingest_returns_empty_doc_on_llm_exception():
    """An LLM failure is captured in metadata rather than raised."""
    router = MagicMock()
    router.generate_vision.side_effect = RuntimeError("GPU OOM")
    with patch("circuitforge_core.documents.ingest.DocuvisionClient", return_value=_unhealthy_client()):
        result = ingest(b"imgbytes", llm_router=router)
    assert isinstance(result, StructuredDocument)
    assert result.metadata["source"] == "llm_error"
    assert "GPU OOM" in result.metadata["error"]
# ── CF_DOCUVISION_URL env var ─────────────────────────────────────────────────
def test_ingest_reads_url_from_env(monkeypatch):
    """CF_DOCUVISION_URL overrides the default service URL."""
    monkeypatch.setenv("CF_DOCUVISION_URL", "http://envhost:7777")
    with patch("circuitforge_core.documents.ingest.DocuvisionClient") as client_cls:
        client_cls.return_value = _healthy_client(_doc_with_text("env-routed"))
        ingest(b"imgbytes")
    client_cls.assert_called_once_with(base_url="http://envhost:7777")

View file

@ -0,0 +1,49 @@
# tests/test_documents/test_models.py
"""Unit tests for circuitforge_core.documents.models."""
from __future__ import annotations
import pytest
from circuitforge_core.documents.models import Element, ParsedTable, StructuredDocument
def test_element_is_frozen():
    """Elements are immutable: attribute assignment raises."""
    element = Element(type="heading", text="Title")
    with pytest.raises(Exception):
        element.text = "changed"  # type: ignore[misc]


def test_element_bbox_optional():
    """bbox defaults to None when not supplied."""
    assert Element(type="paragraph", text="hello").bbox is None


def test_parsed_table_frozen():
    """ParsedTable is immutable: attribute assignment raises."""
    table = ParsedTable(html="<table/>")
    with pytest.raises(Exception):
        table.html = "changed"  # type: ignore[misc]
def test_structured_document_defaults():
    """A bare StructuredDocument has empty collections and empty text."""
    doc = StructuredDocument()
    assert doc.elements == []
    assert doc.raw_text == ""
    assert doc.tables == []
    assert doc.metadata == {}


def test_structured_document_headings_filter():
    """.headings yields only heading-typed elements, in order."""
    elements = [
        Element(type="heading", text="H1"),
        Element(type="paragraph", text="body"),
        Element(type="heading", text="H2"),
    ]
    doc = StructuredDocument(elements=elements)
    assert [h.text for h in doc.headings] == ["H1", "H2"]


def test_structured_document_paragraphs_filter():
    """.paragraphs yields only paragraph-typed elements."""
    doc = StructuredDocument(
        elements=[
            Element(type="heading", text="H1"),
            Element(type="paragraph", text="body"),
        ]
    )
    paragraphs = doc.paragraphs
    assert len(paragraphs) == 1
    assert paragraphs[0].text == "body"

View file

View file

@ -0,0 +1,107 @@
# tests/test_hardware/test_detect.py
"""Tests for hardware auto-detection (subprocess mocked throughout)."""
import json
from unittest.mock import MagicMock, patch
import pytest
from circuitforge_core.hardware.detect import detect_hardware, detect_hardware_json
from circuitforge_core.hardware.models import HardwareSpec
# nvidia-smi-style CSV: one GPU per line as "<name>, <vram MiB>".
_NVIDIA_SMI_OUTPUT = "NVIDIA GeForce RTX 4080, 16376\n"
# rocm-smi-style JSON: per-card dict keyed "card0", VRAM reported in bytes.
_ROCM_SMI_JSON = json.dumps({
    "card0": {
        "Card series": "Radeon RX 7900 XTX",
        "VRAM Total Memory (B)": str(24 * 1024 * 1024 * 1024),
    }
})
# macOS system_profiler-shaped JSON — presumably `system_profiler -json
# SPHardwareDataType`; confirm the exact invocation against detect.py.
_SYSTEM_PROFILER_JSON = json.dumps({
    "SPHardwareDataType": [{
        "chip_type": "Apple M2 Pro",
        "physical_memory": "32 GB",
    }]
})
def _mock_run(outputs: dict[str, str]):
"""Return a _run replacement that maps first arg → output."""
def fake_run(*args, **kwargs):
cmd = args[0] if args else ""
return outputs.get(cmd, "")
return fake_run
class TestDetectNvidia:
    def test_returns_nvidia_spec(self):
        """A single-GPU nvidia-smi answer populates vendor/VRAM/name/RAM."""
        def fake_run(*a, **kw):
            return _NVIDIA_SMI_OUTPUT if "query-gpu" in " ".join(a) else ""

        with patch("circuitforge_core.hardware.detect._run", side_effect=fake_run), \
             patch("circuitforge_core.hardware.detect._ram_mb", return_value=32768):
            spec = detect_hardware()
        assert spec.gpu_vendor == "nvidia"
        assert spec.vram_mb == 16376
        assert spec.gpu_name == "NVIDIA GeForce RTX 4080"
        assert spec.ram_mb == 32768

    def test_gpu_count_from_line_count(self):
        """Two CSV lines -> gpu_count 2; VRAM comes from the first GPU."""
        dual_gpu = "RTX 4080, 16376\nRTX 3090, 24576\n"

        def fake_run(*a, **kw):
            return dual_gpu if "query-gpu" in " ".join(a) else ""

        with patch("circuitforge_core.hardware.detect._run", side_effect=fake_run), \
             patch("circuitforge_core.hardware.detect._ram_mb", return_value=65536):
            spec = detect_hardware()
        assert spec.gpu_count == 2
        assert spec.vram_mb == 16376  # first GPU
class TestDetectAmd:
    def test_returns_amd_spec_when_nvidia_absent(self):
        """When nvidia-smi yields nothing, rocm-smi output wins."""
        def fake_run(*a, **kw):
            if "nvidia" in a[0]:
                return ""
            if "rocm-smi" in a[0]:
                return _ROCM_SMI_JSON
            return ""

        with patch("circuitforge_core.hardware.detect._run", side_effect=fake_run), \
             patch("circuitforge_core.hardware.detect._ram_mb", return_value=65536):
            spec = detect_hardware()
        assert spec.gpu_vendor == "amd"
        assert spec.vram_mb == 24576
        assert "7900" in spec.gpu_name
class TestDetectApple:
    def test_returns_apple_spec_on_macos(self):
        """On Darwin/arm64, unified memory is reported as VRAM."""
        def fake_run(*a, **kw):
            joined = " ".join(a)
            if "arm64" in joined:
                return "1"
            if "SPHardware" in joined:
                return _SYSTEM_PROFILER_JSON
            return ""

        with patch("platform.system", return_value="Darwin"), \
             patch("circuitforge_core.hardware.detect._run", side_effect=fake_run), \
             patch("circuitforge_core.hardware.detect._ram_mb", return_value=32768):
            spec = detect_hardware()
        assert spec.gpu_vendor == "apple"
        assert spec.vram_mb == 32768  # 32 GB unified
        assert "M2" in spec.gpu_name
class TestCpuFallback:
    def test_cpu_fallback_when_no_gpu_detected(self):
        """No tool output on Linux -> CPU-only spec with zero VRAM/GPUs."""
        with patch("circuitforge_core.hardware.detect._run", return_value=""), \
             patch("platform.system", return_value="Linux"), \
             patch("circuitforge_core.hardware.detect._ram_mb", return_value=16384):
            spec = detect_hardware()
        assert spec.gpu_vendor == "cpu"
        assert spec.vram_mb == 0
        assert spec.gpu_count == 0

    def test_never_raises(self):
        """Even if every probe explodes, detect_hardware returns a spec."""
        with patch("circuitforge_core.hardware.detect._run", side_effect=RuntimeError("boom")), \
             patch("circuitforge_core.hardware.detect._ram_mb", return_value=0):
            assert isinstance(detect_hardware(), HardwareSpec)
class TestDetectHardwareJson:
    def test_returns_valid_json(self):
        """detect_hardware_json() emits parseable JSON with the key fields."""
        with patch("circuitforge_core.hardware.detect._run", return_value=""), \
             patch("circuitforge_core.hardware.detect._ram_mb", return_value=8192):
            payload = json.loads(detect_hardware_json())
        assert "vram_mb" in payload
        assert "gpu_vendor" in payload

View file

@ -0,0 +1,71 @@
# tests/test_hardware/test_generator.py
"""Tests for the LLMConfig profile generator."""
import pytest
from circuitforge_core.hardware.generator import generate_profile
from circuitforge_core.hardware.models import HardwareSpec
def _spec(vram_mb: int, gpu_vendor: str = "nvidia") -> HardwareSpec:
    """HardwareSpec fixture: 32 GB RAM, one GPU whenever any VRAM is present."""
    gpu_count = 1 if vram_mb > 0 else 0
    return HardwareSpec(
        vram_mb=vram_mb,
        ram_mb=32768,
        gpu_count=gpu_count,
        gpu_vendor=gpu_vendor,
    )
class TestGenerateProfile:
    def test_cpu_only_no_vllm_backend(self):
        """CPU-only hardware gets ollama but never a vllm backend."""
        cfg = generate_profile(_spec(0, "cpu"))
        assert cfg.profile_name == "cpu-16gb"
        assert "vllm" not in cfg.backends
        assert cfg.backends["ollama"].enabled

    def test_cpu_only_fallback_order_ollama_only(self):
        cfg = generate_profile(_spec(0, "cpu"))
        assert cfg.fallback_order == ["ollama"]

    def test_6gb_has_vllm_backend(self):
        cfg = generate_profile(_spec(6144))
        assert "vllm" in cfg.backends
        assert cfg.backends["vllm"].enabled
        assert cfg.backends["vllm"].model_candidates

    def test_6gb_has_vision_service(self):
        assert "vision_service" in generate_profile(_spec(6144)).backends

    def test_6gb_has_docuvision_service(self):
        assert "docuvision_service" in generate_profile(_spec(6144)).backends

    def test_4gb_no_docuvision(self):
        assert "docuvision_service" not in generate_profile(_spec(4096)).backends

    def test_vllm_tier_fallback_order_prefers_vllm(self):
        cfg = generate_profile(_spec(8192))
        assert cfg.fallback_order[0] == "vllm"

    def test_16gb_profile_name(self):
        assert generate_profile(_spec(16384)).profile_name == "single-gpu-16gb"

    def test_to_dict_roundtrip(self):
        """to_dict() preserves backends + fallback order with plain types."""
        serialized = generate_profile(_spec(8192)).to_dict()
        assert "backends" in serialized
        assert "fallback_order" in serialized
        assert "ollama" in serialized["backends"]
        assert serialized["backends"]["ollama"]["enabled"] is True

    def test_custom_ollama_url(self):
        cfg = generate_profile(_spec(8192), ollama_url="http://10.0.0.1:11434")
        assert cfg.backends["ollama"].url == "http://10.0.0.1:11434"

    def test_vllm_candidates_populated(self):
        candidates = generate_profile(_spec(8192)).backends["vllm"].model_candidates
        assert len(candidates) >= 1

    def test_vision_fallback_includes_vision_service(self):
        cfg = generate_profile(_spec(8192))
        assert "vision_service" in cfg.vision_fallback_order

View file

@ -0,0 +1,67 @@
# tests/test_hardware/test_tiers.py
"""Tests for VRAM tier ladder selection."""
import pytest
from circuitforge_core.hardware.tiers import VRAM_TIERS, select_tier
class TestSelectTier:
    def test_zero_vram_returns_cpu_tier(self):
        tier = select_tier(0)
        assert tier.profile_name == "cpu-16gb"
        assert "vllm" not in tier.services

    def test_2gb_gpu(self):
        tier = select_tier(2048)
        assert tier.profile_name == "single-gpu-2gb"
        assert "ollama" in tier.services

    def test_4gb_gpu(self):
        tier = select_tier(4096)
        assert tier.profile_name == "single-gpu-4gb"
        assert "cf-vision" in tier.services
        assert "vllm" not in tier.services

    def test_6gb_gpu(self):
        tier = select_tier(6144)
        assert tier.profile_name == "single-gpu-6gb"
        assert "vllm" in tier.services
        assert "cf-docuvision" in tier.services

    def test_8gb_gpu(self):
        tier = select_tier(8192)
        assert tier.profile_name == "single-gpu-8gb"
        assert "vllm" in tier.services

    def test_16gb_gpu(self):
        tier = select_tier(16384)
        assert tier.profile_name == "single-gpu-16gb"
        assert "cf-embed" in tier.services
        assert "cf-classify" in tier.services

    def test_24gb_gpu(self):
        tier = select_tier(24576)
        assert tier.profile_name == "single-gpu-24gb"
        assert "comfyui" in tier.services

    def test_boundary_exact_6gb(self):
        """A nominal-6GB card (6000 MB reported) lands in the 6GB tier, not 4GB."""
        assert select_tier(6000).profile_name == "single-gpu-6gb"

    def test_boundary_just_below_6gb(self):
        assert select_tier(4999).profile_name == "single-gpu-4gb"

    def test_extremely_large_vram_returns_top_tier(self):
        h100_vram = 80 * 1024  # 80GB (H100)
        assert select_tier(h100_vram).profile_name == "single-gpu-24gb"

    def test_all_tiers_have_ollama(self):
        for t in VRAM_TIERS:
            assert "ollama" in t.services, f"{t.profile_name} missing ollama"

    def test_vllm_tiers_have_candidates(self):
        for t in VRAM_TIERS:
            if "vllm" in t.services:
                assert t.vllm_candidates, f"{t.profile_name} has vllm but no candidates"

View file

@ -0,0 +1,215 @@
# tests/test_resources/test_coordinator_probe.py
"""
Unit tests for _run_instance_probe_loop in coordinator/app.py.
Covers:
- healthy path: /health 200 state transitions starting running
- timeout path: no healthy response within _PROBE_TIMEOUT_S starting stopped
- cleanup path: non-starting instance cleans up its start_times entry
"""
from __future__ import annotations
import asyncio
from unittest.mock import MagicMock, patch
import pytest
from circuitforge_core.resources.coordinator.app import (
_PROBE_TIMEOUT_S,
_run_instance_probe_loop,
)
from circuitforge_core.resources.coordinator.service_registry import ServiceInstance, ServiceRegistry
# ── helpers ──────────────────────────────────────────────────────────────────
def _inst(**kwargs) -> ServiceInstance:
    """ServiceInstance with sensible defaults, overridable via keyword args."""
    params = {
        "service": "vllm",
        "node_id": "node1",
        "gpu_id": 0,
        "state": "starting",
        "model": "qwen",
        "url": "http://localhost:8000",
    }
    params.update(kwargs)
    return ServiceInstance(**params)


def _registry(*instances: ServiceInstance) -> MagicMock:
    """Mock ServiceRegistry whose all_instances() yields *instances*."""
    registry = MagicMock(spec=ServiceRegistry)
    registry.all_instances.return_value = list(instances)
    return registry
def _health_resp(status: int = 200) -> MagicMock:
"""Context-manager mock that simulates an HTTP response."""
resp = MagicMock()
resp.status = status
resp.__enter__ = lambda s: resp
resp.__exit__ = MagicMock(return_value=False)
return resp
async def _one_tick(coro_fn, registry, *, time_val: float = 1000.0, **url_patch):
"""
Run the probe loop for exactly one iteration then cancel it.
asyncio.sleep is patched to return immediately on the first call
and raise CancelledError on the second (ending the loop cleanly).
"""
calls = 0
async def _fake_sleep(_delay):
nonlocal calls
calls += 1
if calls > 1:
raise asyncio.CancelledError()
patches = [
patch("asyncio.sleep", new=_fake_sleep),
patch("time.time", return_value=time_val),
]
if url_patch:
patches.append(patch("urllib.request.urlopen", **url_patch))
ctx = [p.__enter__() for p in patches]
try:
await coro_fn(registry)
except asyncio.CancelledError:
pass
finally:
for p in reversed(patches):
p.__exit__(None, None, None)
# ── tests ────────────────────────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_probe_transitions_starting_to_running():
    """A 200 from GET /health while 'starting' -> upsert_instance(state='running')."""
    reg = _registry(_inst(state="starting", url="http://localhost:8000"))
    ticks = 0

    async def fake_sleep(_delay):
        nonlocal ticks
        ticks += 1
        if ticks > 1:
            raise asyncio.CancelledError()

    with patch("asyncio.sleep", new=fake_sleep), \
         patch("time.time", return_value=1000.0), \
         patch("urllib.request.urlopen", return_value=_health_resp(200)):
        try:
            await _run_instance_probe_loop(reg)
        except asyncio.CancelledError:
            pass
    reg.upsert_instance.assert_called_once_with(
        service="vllm", node_id="node1", gpu_id=0,
        state="running", model="qwen", url="http://localhost:8000",
    )
@pytest.mark.asyncio
async def test_probe_transitions_starting_to_stopped_on_timeout():
    """No healthy response with the clock past _PROBE_TIMEOUT_S -> state='stopped'.

    Tick 1 seeds start_times[key] at t=1000.0; tick 2 observes a time beyond
    the timeout, marking the instance stopped; tick 3 cancels the loop.
    """
    reg = _registry(_inst(state="starting", url="http://localhost:8000"))
    ticks = 0
    # Alternate between the seed time and a moment safely past the timeout.
    clock = [1000.0, 1000.0 + _PROBE_TIMEOUT_S + 1.0]

    async def fake_sleep(_delay):
        nonlocal ticks
        ticks += 1
        if ticks > 2:
            raise asyncio.CancelledError()

    with patch("asyncio.sleep", new=fake_sleep), \
         patch("time.time", side_effect=clock * 10), \
         patch("urllib.request.urlopen", side_effect=OSError("connection refused")):
        try:
            await _run_instance_probe_loop(reg)
        except asyncio.CancelledError:
            pass
    reg.upsert_instance.assert_called_once_with(
        service="vllm", node_id="node1", gpu_id=0,
        state="stopped", model="qwen", url="http://localhost:8000",
    )
@pytest.mark.asyncio
async def test_probe_cleans_up_start_times_for_non_starting():
    """
    Once an instance leaves the 'starting' state, the loop must not call
    upsert_instance again and should drop its start_times entry.

    Verified indirectly over two ticks: the first serves a 'starting'
    instance (seeding the key and transitioning it to running); the second
    serves the same instance already 'running', which must not upsert.
    """
    starting_inst = _inst(state="starting", url="http://localhost:8000")
    running_inst = _inst(state="running", url="http://localhost:8000")
    tick = 0

    def current_instances():
        # Ticks 0-1: still starting; afterwards the registry reports running.
        return [starting_inst] if tick <= 1 else [running_inst]

    reg = MagicMock(spec=ServiceRegistry)
    reg.all_instances.side_effect = current_instances

    async def fake_sleep(_delay):
        nonlocal tick
        tick += 1
        if tick > 2:
            raise asyncio.CancelledError()

    with patch("asyncio.sleep", new=fake_sleep), \
         patch("time.time", return_value=1000.0), \
         patch("urllib.request.urlopen", return_value=_health_resp(200)):
        try:
            await _run_instance_probe_loop(reg)
        except asyncio.CancelledError:
            pass
    # Exactly one upsert: the starting -> running transition on the first tick.
    assert reg.upsert_instance.call_count == 1
    reg.upsert_instance.assert_called_once_with(
        service="vllm", node_id="node1", gpu_id=0,
        state="running", model="qwen", url="http://localhost:8000",
    )
@pytest.mark.asyncio
async def test_probe_no_url_does_not_attempt_health_check():
    """An instance without a URL stays 'starting': no health check, no timeout yet."""
    reg = _registry(_inst(state="starting", url=None))
    ticks = 0

    async def fake_sleep(_delay):
        nonlocal ticks
        ticks += 1
        if ticks > 1:
            raise asyncio.CancelledError()

    with patch("asyncio.sleep", new=fake_sleep), \
         patch("time.time", return_value=1000.0), \
         patch("urllib.request.urlopen") as mock_urlopen:
        try:
            await _run_instance_probe_loop(reg)
        except asyncio.CancelledError:
            pass
    mock_urlopen.assert_not_called()
    reg.upsert_instance.assert_not_called()

View file

@ -0,0 +1,215 @@
# tests/test_resources/test_docuvision.py
"""
Unit tests for cf-docuvision FastAPI service (circuitforge_core/resources/docuvision/app.py).
Covers:
- GET /health status + model path
- POST /extract image_b64, image_path, hint routing, metadata fields
- _parse_dolphin_output JSON list path, table detection, plain-text fallback
- _image_from_request missing both fields 422; bad image_path 404
"""
from __future__ import annotations
import base64
import io
import json
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
from fastapi.testclient import TestClient
from PIL import Image
import circuitforge_core.resources.docuvision.app as docuvision_module
from circuitforge_core.resources.docuvision.app import (
_parse_dolphin_output,
app,
)
# ── fixtures ──────────────────────────────────────────────────────────────────
def _make_jpeg_b64(width: int = 10, height: int = 10) -> str:
    """Return a base64-encoded solid-white JPEG of the given dimensions."""
    buf = io.BytesIO()
    white = (255, 255, 255)
    Image.new("RGB", (width, height), color=white).save(buf, format="JPEG")
    return base64.b64encode(buf.getvalue()).decode()
@pytest.fixture(autouse=True)
def _reset_module_state():
    """Wipe cached model state before each test, and clear it again after."""
    for attr, value in (
        ("_model", None),
        ("_processor", None),
        ("_model_path", "/fake/model"),
        ("_device", "cpu"),
    ):
        setattr(docuvision_module, attr, value)
    yield
    docuvision_module._model = None
    docuvision_module._processor = None
@pytest.fixture
def mock_model():
    """
    Inject fake model + processor into the module so _load_model() is skipped.
    The processor returns a dict-like with 'input_ids'; the model generate()
    returns a tensor-like whose decode produces a JSON string.
    """
    # Stand-in for the tokenized prompt; presumably the app reads .shape[1]
    # as the prompt length — confirm against docuvision/app.py.
    fake_ids = MagicMock()
    fake_ids.shape = [1, 5]  # input_len = 5
    fake_inputs = {"input_ids": fake_ids}
    # Object returned by processor(...): subscriptable like a BatchEncoding,
    # with a .to(device) that is a no-op returning itself.
    fake_inputs_obj = MagicMock()
    fake_inputs_obj.__getitem__ = lambda self, k: fake_inputs[k]
    fake_inputs_obj.to = lambda device: fake_inputs_obj
    # generate() result: indexing yields another mock "tensor".
    fake_output = MagicMock()
    fake_output.__getitem__ = lambda self, idx: MagicMock()  # output_ids[0]
    fake_model = MagicMock()
    fake_model.generate.return_value = fake_output
    fake_processor = MagicMock()
    fake_processor.return_value = fake_inputs_obj
    # decode() yields a JSON list: one heading plus one table element, so
    # endpoint tests can assert both element and table handling.
    fake_processor.decode.return_value = json.dumps([
        {"type": "heading", "text": "Invoice", "bbox": [0.0, 0.0, 1.0, 0.1]},
        {"type": "table", "text": "row1", "html": "<table><tr><td>row1</td></tr></table>",
         "bbox": [0.0, 0.1, 1.0, 0.5]},
    ])
    docuvision_module._model = fake_model
    docuvision_module._processor = fake_processor
    return fake_model, fake_processor
@pytest.fixture
def client():
    """Synchronous FastAPI test client bound to the docuvision app."""
    return TestClient(app)
# ── health ────────────────────────────────────────────────────────────────────
def test_health_returns_ok(client):
    """GET /health reports ok plus the configured model path."""
    resp = client.get("/health")
    assert resp.status_code == 200
    body = resp.json()
    assert body["status"] == "ok"
    assert body["model"] == "/fake/model"
# ── _parse_dolphin_output ────────────────────────────────────────────────────
def test_parse_json_list_elements():
    """A JSON list of typed elements parses into Element objects + raw text."""
    payload = json.dumps([
        {"type": "heading", "text": "Title"},
        {"type": "paragraph", "text": "Body text"},
    ])
    elements, tables, raw_text = _parse_dolphin_output(payload)
    assert [e.type for e in elements] == ["heading", "paragraph"]
    assert elements[0].text == "Title"
    assert raw_text == "Title\nBody text"
    assert tables == []


def test_parse_json_table_extracted():
    """Table elements also surface as ParsedTable entries with bbox + html."""
    payload = json.dumps([
        {"type": "table", "text": "row", "html": "<table><tr><td>A</td></tr></table>",
         "bbox": [0.0, 0.0, 1.0, 0.5]},
    ])
    elements, tables, _ = _parse_dolphin_output(payload)
    assert len(tables) == 1
    table = tables[0]
    assert table.html == "<table><tr><td>A</td></tr></table>"
    assert table.bbox == [0.0, 0.0, 1.0, 0.5]
    assert len(elements) == 1
    assert elements[0].type == "table"


def test_parse_plain_text_fallback():
    """Non-JSON model output degrades to a single paragraph element."""
    raw = "This is not JSON at all."
    elements, tables, raw_text = _parse_dolphin_output(raw)
    assert [e.type for e in elements] == ["paragraph"]
    assert elements[0].text == raw
    assert tables == []
    assert raw_text == raw


def test_parse_empty_string_fallback():
    """Empty output still yields one (empty) paragraph element."""
    elements, tables, raw_text = _parse_dolphin_output("")
    assert len(elements) == 1
    assert elements[0].type == "paragraph"
    assert elements[0].text == ""


def test_parse_json_missing_type_defaults_to_paragraph():
    """An element with no 'type' key defaults to paragraph."""
    elements, _, _ = _parse_dolphin_output(json.dumps([{"text": "no type field"}]))
    assert elements[0].type == "paragraph"
# ── POST /extract ─────────────────────────────────────────────────────────────
def test_extract_image_b64(client, mock_model):
    """A base64 image round-trips: elements/raw_text/tables plus metadata."""
    resp = client.post("/extract", json={"image_b64": _make_jpeg_b64(), "hint": "auto"})
    assert resp.status_code == 200
    body = resp.json()
    for key in ("elements", "raw_text", "tables"):
        assert key in body
    meta = body["metadata"]
    assert meta["hint"] == "auto"
    assert meta["model"] == "/fake/model"
    assert meta["width"] == 10
    assert meta["height"] == 10


def test_extract_hint_table_routes_correct_prompt(client, mock_model):
    """hint='table' should route the table-specific prompt to the processor."""
    _, fake_processor = mock_model
    resp = client.post("/extract", json={"image_b64": _make_jpeg_b64(), "hint": "table"})
    assert resp.status_code == 200
    recorded = fake_processor.call_args
    assert "table" in recorded.kwargs.get("text", "") or \
           "table" in str(recorded)
def test_extract_hint_unknown_falls_back_to_auto(client, mock_model):
    """An unrecognised hint silently falls back to the 'auto' prompt."""
    resp = client.post("/extract", json={"image_b64": _make_jpeg_b64(), "hint": "nonsense"})
    assert resp.status_code == 200


def test_extract_image_path(tmp_path, client, mock_model):
    """image_path loads the file from disk and reports its real dimensions."""
    img_file = tmp_path / "doc.png"
    Image.new("RGB", (8, 8), color=(0, 0, 0)).save(img_file)
    resp = client.post("/extract", json={"image_path": str(img_file)})
    assert resp.status_code == 200
    assert resp.json()["metadata"]["width"] == 8


def test_extract_image_path_not_found(client, mock_model):
    """A nonexistent image_path yields 404."""
    resp = client.post("/extract", json={"image_path": "/nonexistent/path/img.png"})
    assert resp.status_code == 404


def test_extract_no_image_raises_422(client, mock_model):
    """Omitting both image_b64 and image_path yields 422."""
    resp = client.post("/extract", json={"hint": "auto"})
    assert resp.status_code == 422


def test_extract_response_includes_tables(client, mock_model):
    """Table elements from the model surface as table objects in the response."""
    resp = client.post("/extract", json={"image_b64": _make_jpeg_b64()})
    assert resp.status_code == 200
    tables = resp.json()["tables"]
    assert len(tables) == 1
    assert "<table>" in tables[0]["html"]


def test_extract_device_in_metadata(client, mock_model):
    """The inference device is always present in response metadata."""
    resp = client.post("/extract", json={"image_b64": _make_jpeg_b64()})
    assert resp.status_code == 200
    assert "device" in resp.json()["metadata"]