""" Merlin daemon — main camera loop + FastAPI control API. Start: conda run -n cf python -m merlin.daemon API: http://localhost:8522 GET /status → {"running": bool, "backend": str, "config": {...}} POST /toggle → start or stop the detection loop POST /config/reload → reload config from ~/.merlin/config.yaml """ from __future__ import annotations import logging import threading from contextlib import asynccontextmanager from typing import Any import cv2 import mediapipe as mp import numpy as np import uvicorn from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware from circuitforge_core.input.gestures import CameraCapture, HandsDetector from merlin.actions.executor import ActionExecutor from merlin.actions.mapper import ActionMapper from merlin.config import MerlinConfig from merlin.features.blink import BlinkDetector from merlin.features.gaze import GazeEstimator from merlin.features.hand_gesture import HandGestureDetector from merlin.features.head_pose import HeadPoseEstimator logger = logging.getLogger("merlin.daemon") logging.basicConfig(level=logging.INFO) # ---------- Shared state ---------- _running = False _loop_thread: threading.Thread | None = None _config = MerlinConfig.load() _executor = ActionExecutor() _mapper = ActionMapper(_config) def _camera_loop() -> None: global _running blink_det = BlinkDetector(threshold=_config.blink_threshold) gaze_est = GazeEstimator() head_est = HeadPoseEstimator() hand_det = HandGestureDetector() face_mesh = mp.solutions.face_mesh.FaceMesh( refine_landmarks=True, min_detection_confidence=0.7, min_tracking_confidence=0.5, ) with CameraCapture(device_index=_config.camera_device) as cam: with HandsDetector(max_hands=1) as hands_det: for frame_bgr in cam.frames(): if not _running: break rgb = frame_bgr[:, :, ::-1] face_results = face_mesh.process(rgb) if face_results.multi_face_landmarks: face = np.array( [ [p.x, p.y, p.z] for p in face_results.multi_face_landmarks[0].landmark ], dtype=np.float32, ) blink = blink_det.detect(face) if blink: action = _mapper.map(blink.value) if action: _executor.execute(action) head = head_est.update(face) if head: action = _mapper.map(head.value) if action: _executor.execute(action) hands = hands_det.detect(rgb) if hands: gesture = hand_det.detect(hands[0].points) if gesture: action = _mapper.map(gesture.value) if action: _executor.execute(action) face_mesh.close() _running = False logger.info("Camera loop stopped") # ---------- FastAPI ---------- @asynccontextmanager async def lifespan(app: FastAPI): logger.info("Merlin daemon starting on port 8522") yield global _running _running = False app = FastAPI(title="Merlin Daemon API", lifespan=lifespan) app.add_middleware( CORSMiddleware, allow_origins=["http://localhost:8521"], allow_methods=["*"], allow_headers=["*"], ) @app.get("/status") def status() -> dict[str, Any]: return { "running": _running, "backend": _executor._backend, "config": { "camera_device": _config.camera_device, "blink_threshold": _config.blink_threshold, "mappings": _config.mappings, }, } @app.post("/toggle") def toggle() -> dict[str, str]: global _running, _loop_thread if _running: _running = False return {"status": "stopped"} _running = True _loop_thread = threading.Thread(target=_camera_loop, daemon=True) _loop_thread.start() return {"status": "started"} @app.post("/config/reload") def reload_config() -> dict[str, str]: global _config _config = MerlinConfig.load() _mapper.reload(_config) return {"status": "reloaded"} if __name__ == "__main__": uvicorn.run("merlin.daemon:app", host="127.0.0.1", port=8522, reload=False)