cf-vision/cf_vision/camera.py
pyr0ball 353525c1f4 feat: initial cf-vision scaffold — ImageFrame API, stub inference modules
- cf_vision/models.py: ImageFrame + ImageElement + BoundingBox (MIT)
  Full Dolphin-v2 element taxonomy (21 types), convenience accessors
  (text_blocks, barcodes, tables, full_text)
- cf_vision/router.py: VisionRouter — mock + real paths, task routing
  (document, barcode, receipt, general)
- cf_vision/barcode.py: BarcodeScanner — pyzbar wrapper, CPU-only, MIT
- cf_vision/ocr.py: DolphinOCR — ByteDance/Dolphin-v2 async stub (BSL 1.1)
- cf_vision/receipt.py: ReceiptParser stub — Kiwi Phase 2 target (BSL 1.1)
- cf_vision/camera.py: CameraCapture — OpenCV single-frame capture (MIT)
- pyproject.toml: inference / barcode / camera optional extras
- .env.example: HF_TOKEN, CF_VISION_DEVICE, CF_VISION_MOCK
- README: module map, ImageFrame API reference, consumer roadmap
- tests: 6 passing (ImageFrame accessors, VisionRouter mock/real)

Extracted from circuitforge_core.vision per cf-core#36.
2026-04-06 17:59:00 -07:00

79 lines
2.5 KiB
Python

# cf_vision/camera.py — camera capture and preprocessing
#
# MIT licensed. Uses OpenCV for capture; no GPU required for capture itself.
# Requires [camera] extras: pip install cf-vision[camera]
#
# Planned consumers:
# Kiwi — live barcode scan from phone camera
# Godwit — fingerprint/ID document capture for emergency identity recovery
from __future__ import annotations
import asyncio
import logging
logger = logging.getLogger(__name__)
class CameraCapture:
"""
Single-frame camera capture with preprocessing.
Captures one frame from a camera device, normalises it to JPEG bytes
suitable for VisionRouter.analyze() or BarcodeScanner.scan().
Usage
-----
capture = CameraCapture(device_index=0)
jpeg_bytes = await capture.capture_async()
frame = router.analyze(jpeg_bytes, task="barcode")
Requires: pip install cf-vision[camera]
"""
def __init__(
self,
device_index: int = 0,
width: int = 1280,
height: int = 720,
jpeg_quality: int = 92,
) -> None:
self._device_index = device_index
self._width = width
self._height = height
self._jpeg_quality = jpeg_quality
def capture(self) -> bytes:
"""
Capture one frame and return JPEG bytes.
Stub: raises NotImplementedError until Kiwi Phase 2.
"""
try:
import cv2
except ImportError as exc:
raise ImportError(
"OpenCV is required for camera capture. "
"Install with: pip install cf-vision[camera]"
) from exc
cap = cv2.VideoCapture(self._device_index)
try:
cap.set(cv2.CAP_PROP_FRAME_WIDTH, self._width)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, self._height)
ok, frame = cap.read()
if not ok or frame is None:
raise RuntimeError(
f"Camera device {self._device_index} did not return a frame."
)
ok, buf = cv2.imencode(
".jpg", frame, [cv2.IMWRITE_JPEG_QUALITY, self._jpeg_quality]
)
if not ok:
raise RuntimeError("JPEG encoding failed")
return bytes(buf)
finally:
cap.release()
async def capture_async(self) -> bytes:
"""capture() without blocking the event loop."""
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, self.capture)