kiwi/app/services/barcode_scanner.py
pyr0ball 33a5cdec37 feat: cloud auth bypass, VRAM leasing, barcode EXIF fix, pipeline improvements
- cloud_session.py: CLOUD_AUTH_BYPASS_IPS with CIDR support; X-Real-IP for
  Docker bridge NAT-aware client IP resolution; local-dev DB path under
  CLOUD_DATA_ROOT for bypass sessions
- compose.cloud.yml: thread CLOUD_AUTH_BYPASS_IPS from shell env; document
  Docker bridge CIDR requirement in .env.example
- nginx.cloud.conf + nginx.conf: client_max_body_size 20m for barcode uploads
- barcode_scanner.py: EXIF orientation correction (PIL ImageOps.exif_transpose)
  before cv2 decode; rotation coverage extended to [90, 180, 270, 45, 135]
  to catch sideways barcodes the 270° case was missing
- llm_recipe.py: CF-core VRAM lease acquire/release wrapping LLMRouter calls
- tasks/runner.py + config.py: COORDINATOR_URL + recipe_llm VRAM budget (4GB)
- recipes.py: per-request Store creation inside asyncio.to_thread worker to
  avoid SQLite check_same_thread violations
- download_datasets.py: HF_PARQUET_FILES strategy for repos without dataset
  builders (lishuyang/recipepairs direct parquet download)
- derive_substitutions.py: use recipepairs_recipes.parquet for ingredient
  lookup; numpy array detection; JSON category parsing
- test_build_flavorgraph_index.py: rewritten for CSV-based index format
- pyproject.toml: add Pillow>=10.0 for EXIF rotation support
2026-04-01 16:06:23 -07:00

396 lines
13 KiB
Python

"""
Barcode scanning service using pyzbar.
This module provides functionality to detect and decode barcodes
from images (UPC, EAN, QR codes, etc.).
"""
import io
import cv2
import numpy as np
from pyzbar import pyzbar
from pathlib import Path
from typing import List, Dict, Any, Optional
import logging
try:
from PIL import Image as _PILImage
_HAS_PIL = True
except ImportError:
_HAS_PIL = False
logger = logging.getLogger(__name__)
class BarcodeScanner:
"""
Service for scanning barcodes from images.
Supports various barcode formats:
- UPC-A, UPC-E
- EAN-8, EAN-13
- Code 39, Code 128
- QR codes
- And more via pyzbar/libzbar
"""
def scan_image(self, image_path: Path) -> List[Dict[str, Any]]:
"""
Scan an image for barcodes.
Args:
image_path: Path to the image file
Returns:
List of detected barcodes, each as a dictionary with:
- data: Barcode data (string)
- type: Barcode type (e.g., 'EAN13', 'QRCODE')
- quality: Quality score (0-100)
- rect: Bounding box (x, y, width, height)
"""
try:
# Read image
image = cv2.imread(str(image_path))
if image is None:
logger.error(f"Failed to load image: {image_path}")
return []
# Convert to grayscale for better detection
gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
# Try multiple preprocessing techniques and rotations for better detection
barcodes = []
# 1. Try on original grayscale
barcodes.extend(self._detect_barcodes(gray, image))
# 2. Try with adaptive thresholding (helps with poor lighting)
if not barcodes:
thresh = cv2.adaptiveThreshold(
gray, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
cv2.THRESH_BINARY, 11, 2
)
barcodes.extend(self._detect_barcodes(thresh, image))
# 3. Try with sharpening (helps with blurry images)
if not barcodes:
kernel = np.array([[-1, -1, -1],
[-1, 9, -1],
[-1, -1, -1]])
sharpened = cv2.filter2D(gray, -1, kernel)
barcodes.extend(self._detect_barcodes(sharpened, image))
# 4. Try rotations if still no barcodes found (handles tilted/rotated barcodes)
if not barcodes:
logger.info("No barcodes found in standard orientation, trying rotations...")
for angle in [90, 180, 270, 45, 135]:
rotated_gray = self._rotate_image(gray, angle)
rotated_color = self._rotate_image(image, angle)
detected = self._detect_barcodes(rotated_gray, rotated_color)
if detected:
logger.info(f"Found barcode(s) at {angle}° rotation")
barcodes.extend(detected)
break # Stop after first successful rotation
# Remove duplicates (same data)
unique_barcodes = self._deduplicate_barcodes(barcodes)
logger.info(f"Found {len(unique_barcodes)} barcode(s) in {image_path}")
return unique_barcodes
except Exception as e:
logger.error(f"Error scanning image {image_path}: {e}")
return []
def _detect_barcodes(
self,
image: np.ndarray,
original_image: np.ndarray
) -> List[Dict[str, Any]]:
"""
Detect barcodes in a preprocessed image.
Args:
image: Preprocessed image (grayscale)
original_image: Original color image (for quality assessment)
Returns:
List of detected barcodes
"""
detected = pyzbar.decode(image)
barcodes = []
for barcode in detected:
# Decode barcode data
barcode_data = barcode.data.decode("utf-8")
barcode_type = barcode.type
# Get bounding box
rect = barcode.rect
bbox = {
"x": rect.left,
"y": rect.top,
"width": rect.width,
"height": rect.height,
}
# Assess quality of barcode region
quality = self._assess_barcode_quality(original_image, bbox)
barcodes.append({
"data": barcode_data,
"type": barcode_type,
"quality": quality,
"rect": bbox,
})
return barcodes
def _assess_barcode_quality(
self,
image: np.ndarray,
bbox: Dict[str, int]
) -> int:
"""
Assess the quality of a detected barcode.
Args:
image: Original image
bbox: Bounding box of barcode
Returns:
Quality score (0-100)
"""
try:
# Extract barcode region
x, y, w, h = bbox["x"], bbox["y"], bbox["width"], bbox["height"]
# Add padding
pad = 10
y1 = max(0, y - pad)
y2 = min(image.shape[0], y + h + pad)
x1 = max(0, x - pad)
x2 = min(image.shape[1], x + w + pad)
region = image[y1:y2, x1:x2]
if region.size == 0:
return 50
# Convert to grayscale if needed
if len(region.shape) == 3:
region = cv2.cvtColor(region, cv2.COLOR_BGR2GRAY)
# Calculate sharpness (Laplacian variance)
laplacian_var = cv2.Laplacian(region, cv2.CV_64F).var()
sharpness_score = min(100, laplacian_var / 10) # Normalize
# Calculate contrast
min_val, max_val = region.min(), region.max()
contrast = (max_val - min_val) / 255.0 * 100
# Calculate size score (larger is better, up to a point)
area = w * h
size_score = min(100, area / 100) # Normalize
# Weighted average
quality = (sharpness_score * 0.4 + contrast * 0.4 + size_score * 0.2)
return int(quality)
except Exception as e:
logger.warning(f"Error assessing barcode quality: {e}")
return 50
def _rotate_image(self, image: np.ndarray, angle: float) -> np.ndarray:
"""
Rotate an image by a given angle.
Args:
image: Input image
angle: Rotation angle in degrees (any angle, but optimized for 90° increments)
Returns:
Rotated image
"""
# Use fast optimized rotation for common angles
if angle == 90:
return cv2.rotate(image, cv2.ROTATE_90_CLOCKWISE)
elif angle == 180:
return cv2.rotate(image, cv2.ROTATE_180)
elif angle == 270:
return cv2.rotate(image, cv2.ROTATE_90_COUNTERCLOCKWISE)
elif angle == 0:
return image
else:
# For arbitrary angles, use affine transformation
(h, w) = image.shape[:2]
center = (w // 2, h // 2)
# Get rotation matrix
M = cv2.getRotationMatrix2D(center, angle, 1.0)
# Calculate new bounding dimensions
cos = np.abs(M[0, 0])
sin = np.abs(M[0, 1])
new_w = int((h * sin) + (w * cos))
new_h = int((h * cos) + (w * sin))
# Adjust rotation matrix for new dimensions
M[0, 2] += (new_w / 2) - center[0]
M[1, 2] += (new_h / 2) - center[1]
# Perform rotation
return cv2.warpAffine(image, M, (new_w, new_h),
flags=cv2.INTER_CUBIC,
borderMode=cv2.BORDER_REPLICATE)
def _deduplicate_barcodes(
self,
barcodes: List[Dict[str, Any]]
) -> List[Dict[str, Any]]:
"""
Remove duplicate barcodes (same data).
If multiple detections of the same barcode, keep the one
with the highest quality score.
Args:
barcodes: List of detected barcodes
Returns:
Deduplicated list
"""
seen = {}
for barcode in barcodes:
data = barcode["data"]
if data not in seen or barcode["quality"] > seen[data]["quality"]:
seen[data] = barcode
return list(seen.values())
def _fix_exif_orientation(self, image_bytes: bytes) -> bytes:
"""Apply EXIF orientation correction so cv2 sees an upright image.
Phone cameras embed rotation in EXIF; cv2.imdecode ignores it,
so a photo taken in portrait may arrive physically sideways in memory.
"""
if not _HAS_PIL:
return image_bytes
try:
pil = _PILImage.open(io.BytesIO(image_bytes))
pil = _PILImage.fromarray(np.array(pil)) # strips EXIF but applies orientation via PIL
# Use ImageOps.exif_transpose for proper EXIF-aware rotation
import PIL.ImageOps
pil = PIL.ImageOps.exif_transpose(pil)
buf = io.BytesIO()
pil.save(buf, format="JPEG")
return buf.getvalue()
except Exception:
return image_bytes
def scan_from_bytes(self, image_bytes: bytes) -> List[Dict[str, Any]]:
"""
Scan barcodes from image bytes (uploaded file).
Args:
image_bytes: Image data as bytes
Returns:
List of detected barcodes
"""
try:
# Apply EXIF orientation correction first (phone cameras embed rotation in EXIF;
# cv2.imdecode ignores it, causing sideways barcodes to appear rotated in memory).
image_bytes = self._fix_exif_orientation(image_bytes)
# Convert bytes to numpy array
nparr = np.frombuffer(image_bytes, np.uint8)
image = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
if image is None:
logger.error("Failed to decode image from bytes")
return []
# Convert to grayscale
gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
# Try multiple approaches for better detection
barcodes = []
# 1. Try original orientation
barcodes.extend(self._detect_barcodes(gray, image))
# 2. Try with adaptive thresholding
if not barcodes:
thresh = cv2.adaptiveThreshold(
gray, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
cv2.THRESH_BINARY, 11, 2
)
barcodes.extend(self._detect_barcodes(thresh, image))
# 3. Try all 90° rotations + common tilt angles
# 90/270 catches truly sideways barcodes; 180 catches upside-down;
# 45/135 catches tilted barcodes on flat surfaces.
if not barcodes:
logger.info("No barcodes found in uploaded image, trying rotations...")
for angle in [90, 180, 270, 45, 135]:
rotated_gray = self._rotate_image(gray, angle)
rotated_color = self._rotate_image(image, angle)
detected = self._detect_barcodes(rotated_gray, rotated_color)
if detected:
logger.info(f"Found barcode(s) in uploaded image at {angle}° rotation")
barcodes.extend(detected)
break
return self._deduplicate_barcodes(barcodes)
except Exception as e:
logger.error(f"Error scanning image from bytes: {e}")
return []
def validate_barcode(self, barcode: str, barcode_type: str) -> bool:
"""
Validate a barcode using check digits (for EAN/UPC).
Args:
barcode: Barcode string
barcode_type: Type of barcode (e.g., 'EAN13', 'UPCA')
Returns:
True if valid, False otherwise
"""
if barcode_type in ["EAN13", "UPCA"]:
return self._validate_ean13(barcode)
elif barcode_type == "EAN8":
return self._validate_ean8(barcode)
# For other types, assume valid if detected
return True
def _validate_ean13(self, barcode: str) -> bool:
"""Validate EAN-13 barcode using check digit."""
if len(barcode) != 13 or not barcode.isdigit():
return False
# Calculate check digit
odd_sum = sum(int(barcode[i]) for i in range(0, 12, 2))
even_sum = sum(int(barcode[i]) for i in range(1, 12, 2))
total = odd_sum + (even_sum * 3)
check_digit = (10 - (total % 10)) % 10
return int(barcode[12]) == check_digit
def _validate_ean8(self, barcode: str) -> bool:
"""Validate EAN-8 barcode using check digit."""
if len(barcode) != 8 or not barcode.isdigit():
return False
# Calculate check digit
odd_sum = sum(int(barcode[i]) for i in range(1, 7, 2))
even_sum = sum(int(barcode[i]) for i in range(0, 7, 2))
total = (odd_sum * 3) + even_sum
check_digit = (10 - (total % 10)) % 10
return int(barcode[7]) == check_digit