feat(daemon): implement camera loop + FastAPI control API on port 8522

This commit is contained in:
pyr0ball 2026-04-26 21:24:09 -07:00
parent 5af689389e
commit 5d66dbb56b

152
merlin/daemon.py Normal file
View file

@ -0,0 +1,152 @@
"""
Merlin daemon main camera loop + FastAPI control API.
Start: conda run -n cf python -m merlin.daemon
API: http://localhost:8522
GET /status {"running": bool, "backend": str, "config": {...}}
POST /toggle start or stop the detection loop
POST /config/reload reload config from ~/.merlin/config.yaml
"""
from __future__ import annotations
import logging
import threading
from contextlib import asynccontextmanager
from typing import Any
import cv2
import mediapipe as mp
import numpy as np
import uvicorn
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from circuitforge_core.input.gestures import CameraCapture, HandsDetector
from merlin.actions.executor import ActionExecutor
from merlin.actions.mapper import ActionMapper
from merlin.config import MerlinConfig
from merlin.features.blink import BlinkDetector
from merlin.features.gaze import GazeEstimator
from merlin.features.hand_gesture import HandGestureDetector
from merlin.features.head_pose import HeadPoseEstimator
logger = logging.getLogger("merlin.daemon")
logging.basicConfig(level=logging.INFO)
# ---------- Shared state ----------
_running = False
_loop_thread: threading.Thread | None = None
_config = MerlinConfig.load()
_executor = ActionExecutor()
_mapper = ActionMapper(_config)
def _camera_loop() -> None:
global _running
blink_det = BlinkDetector(threshold=_config.blink_threshold)
gaze_est = GazeEstimator()
head_est = HeadPoseEstimator()
hand_det = HandGestureDetector()
face_mesh = mp.solutions.face_mesh.FaceMesh(
refine_landmarks=True,
min_detection_confidence=0.7,
min_tracking_confidence=0.5,
)
with CameraCapture(device_index=_config.camera_device) as cam:
with HandsDetector(max_hands=1) as hands_det:
for frame_bgr in cam.frames():
if not _running:
break
rgb = frame_bgr[:, :, ::-1]
face_results = face_mesh.process(rgb)
if face_results.multi_face_landmarks:
face = np.array(
[
[p.x, p.y, p.z]
for p in face_results.multi_face_landmarks[0].landmark
],
dtype=np.float32,
)
blink = blink_det.detect(face)
if blink:
action = _mapper.map(blink.value)
if action:
_executor.execute(action)
head = head_est.update(face)
if head:
action = _mapper.map(head.value)
if action:
_executor.execute(action)
hands = hands_det.detect(rgb)
if hands:
gesture = hand_det.detect(hands[0].points)
if gesture:
action = _mapper.map(gesture.value)
if action:
_executor.execute(action)
face_mesh.close()
_running = False
logger.info("Camera loop stopped")
# ---------- FastAPI ----------
@asynccontextmanager
async def lifespan(app: FastAPI):
logger.info("Merlin daemon starting on port 8522")
yield
global _running
_running = False
app = FastAPI(title="Merlin Daemon API", lifespan=lifespan)
app.add_middleware(
CORSMiddleware,
allow_origins=["http://localhost:8521"],
allow_methods=["*"],
allow_headers=["*"],
)
@app.get("/status")
def status() -> dict[str, Any]:
return {
"running": _running,
"backend": _executor._backend,
"config": {
"camera_device": _config.camera_device,
"blink_threshold": _config.blink_threshold,
"mappings": _config.mappings,
},
}
@app.post("/toggle")
def toggle() -> dict[str, str]:
global _running, _loop_thread
if _running:
_running = False
return {"status": "stopped"}
_running = True
_loop_thread = threading.Thread(target=_camera_loop, daemon=True)
_loop_thread.start()
return {"status": "started"}
@app.post("/config/reload")
def reload_config() -> dict[str, str]:
global _config
_config = MerlinConfig.load()
_mapper.reload(_config)
return {"status": "reloaded"}
if __name__ == "__main__":
uvicorn.run("merlin.daemon:app", host="127.0.0.1", port=8522, reload=False)