From e2658f743f9fd93763ed50b6e565faefd645c739 Mon Sep 17 00:00:00 2001 From: pyr0ball Date: Sun, 19 Apr 2026 22:11:34 -0700 Subject: [PATCH] feat(scheduler): OrchestratedScheduler for cloud/multi-GPU, configurable via env Switches to OrchestratedScheduler in cloud mode so concurrent recipe_llm jobs fan out across all registered cf-orch GPU nodes instead of serializing on one. Under load this eliminates poll timeouts from queue backup. USE_ORCH_SCHEDULER env var gives explicit control independent of CLOUD_MODE: unset follow CLOUD_MODE (cloud=orch, local=local) true OrchestratedScheduler always (e.g. multi-GPU local rig) false LocalScheduler always (e.g. cloud single-GPU dev instance) ImportError fallback: if circuitforge_orch is not installed and orch is requested, logs a warning and falls back to LocalScheduler gracefully. --- .env.example | 3 +++ app/core/config.py | 7 +++++ app/tasks/scheduler.py | 58 +++++++++++++++++++++++++++++++++++++++--- 3 files changed, 65 insertions(+), 3 deletions(-) diff --git a/.env.example b/.env.example index ba4326b..5f0e9f5 100644 --- a/.env.example +++ b/.env.example @@ -51,6 +51,9 @@ ENABLE_OCR=false DEBUG=false CLOUD_MODE=false DEMO_MODE=false +# USE_ORCH_SCHEDULER: use coordinator-aware multi-GPU scheduler instead of local FIFO. +# Unset = follow CLOUD_MODE. Set true for multi-GPU local rigs without full cloud auth. +# USE_ORCH_SCHEDULER=false # Cloud mode (set in compose.cloud.yml; also set here for reference) # CLOUD_DATA_ROOT=/devl/kiwi-cloud-data diff --git a/app/core/config.py b/app/core/config.py index 6b55f07..315fa90 100644 --- a/app/core/config.py +++ b/app/core/config.py @@ -66,6 +66,13 @@ class Settings: # Feature flags ENABLE_OCR: bool = os.environ.get("ENABLE_OCR", "false").lower() in ("1", "true", "yes") + # Use OrchestratedScheduler (coordinator-aware, multi-GPU fan-out) instead of + # LocalScheduler. Defaults to true in CLOUD_MODE; can be set independently + # for multi-GPU local rigs that don't need full cloud auth. + USE_ORCH_SCHEDULER: bool | None = ( + None if os.environ.get("USE_ORCH_SCHEDULER") is None + else os.environ.get("USE_ORCH_SCHEDULER", "").lower() in ("1", "true", "yes") + ) # Runtime DEBUG: bool = os.environ.get("DEBUG", "false").lower() in ("1", "true", "yes") diff --git a/app/tasks/scheduler.py b/app/tasks/scheduler.py index 64bd268..e16718b 100644 --- a/app/tasks/scheduler.py +++ b/app/tasks/scheduler.py @@ -1,5 +1,10 @@ # app/tasks/scheduler.py -"""Kiwi LLM task scheduler — thin shim over circuitforge_core.tasks.scheduler.""" +"""Kiwi LLM task scheduler — thin shim over circuitforge_core.tasks.scheduler. + +Local mode (CLOUD_MODE unset): LocalScheduler — simple FIFO, no coordinator. +Cloud mode (CLOUD_MODE=true): OrchestratedScheduler — coordinator-aware, fans +out concurrent jobs across all registered cf-orch GPU nodes. +""" from __future__ import annotations from pathlib import Path @@ -7,15 +12,50 @@ from pathlib import Path from circuitforge_core.tasks.scheduler import ( TaskScheduler, get_scheduler as _base_get_scheduler, - reset_scheduler, # re-export for tests + reset_scheduler as _reset_local, # re-export for tests ) +from app.cloud_session import CLOUD_MODE from app.core.config import settings from app.tasks.runner import LLM_TASK_TYPES, VRAM_BUDGETS, run_task +def _use_orch() -> bool: + """Return True if the OrchestratedScheduler should be used. + + Explicit USE_ORCH_SCHEDULER env var takes priority; falls back to CLOUD_MODE. + """ + override = settings.USE_ORCH_SCHEDULER + return CLOUD_MODE if override is None else override + + def get_scheduler(db_path: Path) -> TaskScheduler: - """Return the process-level TaskScheduler singleton for Kiwi.""" + """Return the process-level TaskScheduler singleton for Kiwi. + + OrchestratedScheduler: coordinator-aware, fans out concurrent jobs across + all registered cf-orch GPU nodes. Active when USE_ORCH_SCHEDULER=true or + CLOUD_MODE=true (and USE_ORCH_SCHEDULER is not explicitly false). + + LocalScheduler: serial FIFO, no coordinator dependency. Default for local dev. + """ + if _use_orch(): + try: + from circuitforge_orch.scheduler import get_orch_scheduler + except ImportError: + import logging + logging.getLogger(__name__).warning( + "circuitforge_orch not installed — falling back to LocalScheduler" + ) + else: + return get_orch_scheduler( + db_path=db_path, + run_task_fn=run_task, + task_types=LLM_TASK_TYPES, + vram_budgets=VRAM_BUDGETS, + coordinator_url=settings.COORDINATOR_URL, + service_name="kiwi", + ) + return _base_get_scheduler( db_path=db_path, run_task_fn=run_task, @@ -24,3 +64,15 @@ def get_scheduler(db_path: Path) -> TaskScheduler: coordinator_url=settings.COORDINATOR_URL, service_name="kiwi", ) + + +def reset_scheduler() -> None: + """Shut down and clear the active scheduler singleton. TEST TEARDOWN ONLY.""" + if _use_orch(): + try: + from circuitforge_orch.scheduler import reset_orch_scheduler + reset_orch_scheduler() + return + except ImportError: + pass + _reset_local()