From e58c3aea23652031c6f7bdee1df3768932a924f6 Mon Sep 17 00:00:00 2001 From: pyr0ball Date: Thu, 2 Apr 2026 12:55:38 -0700 Subject: [PATCH] fix: TTL sweep, immutability, service-scoped release, logger in orch alloc - ServiceRegistry: add sweep_expired_allocations() to remove stale TTL allocations and transition instances to idle; add get_allocation() helper - AgentSupervisor._run_idle_sweep: call sweep_expired_allocations() before idle-timeout check so crashed-caller leaks are cleaned up each sweep tick - schema._parse_managed: copy raw dict before extracting 'type' key instead of mutating caller's dict with pop() - app.release_allocation: validate allocation belongs to the given service path param before releasing; return 404 if mismatch - router._try_cf_orch_alloc: replace print() with logger.warning(); add module-level logger = logging.getLogger(__name__) - tests: add test_sweep_expired_allocations covering TTL expiry and idle state transition --- circuitforge_core/llm/router.py | 5 +++- .../resources/coordinator/agent_supervisor.py | 3 +++ .../resources/coordinator/app.py | 3 +++ .../resources/coordinator/service_registry.py | 19 +++++++++++++++ .../resources/profiles/schema.py | 7 +++--- tests/test_resources/test_service_registry.py | 23 +++++++++++++++++++ 6 files changed, 56 insertions(+), 4 deletions(-) diff --git a/circuitforge_core/llm/router.py b/circuitforge_core/llm/router.py index 7336209..3a22d81 100644 --- a/circuitforge_core/llm/router.py +++ b/circuitforge_core/llm/router.py @@ -3,12 +3,15 @@ LLM abstraction layer with priority fallback chain. Reads config from ~/.config/circuitforge/llm.yaml. Tries backends in order; falls back on any error. """ +import logging import os import yaml import requests from pathlib import Path from openai import OpenAI +logger = logging.getLogger(__name__) + CONFIG_PATH = Path.home() / ".config" / "circuitforge" / "llm.yaml" @@ -62,7 +65,7 @@ class LLMRouter: alloc = ctx.__enter__() return (ctx, alloc) except Exception as exc: - print(f"[LLMRouter] cf_orch allocation failed, using base_url directly: {exc}") + logger.warning("[LLMRouter] cf_orch allocation failed, using base_url directly: %s", exc) return None def complete(self, prompt: str, system: str | None = None, diff --git a/circuitforge_core/resources/coordinator/agent_supervisor.py b/circuitforge_core/resources/coordinator/agent_supervisor.py index b389abb..8536636 100644 --- a/circuitforge_core/resources/coordinator/agent_supervisor.py +++ b/circuitforge_core/resources/coordinator/agent_supervisor.py @@ -147,6 +147,9 @@ class AgentSupervisor: async def _run_idle_sweep(self) -> None: if self._service_registry is None: return + expired = self._service_registry.sweep_expired_allocations() + if expired: + logger.info("TTL sweep: expired %d allocation(s): %s", len(expired), expired) idle_stop_config = self._build_idle_stop_config() if not idle_stop_config: return diff --git a/circuitforge_core/resources/coordinator/app.py b/circuitforge_core/resources/coordinator/app.py index 76c104e..1fd7c91 100644 --- a/circuitforge_core/resources/coordinator/app.py +++ b/circuitforge_core/resources/coordinator/app.py @@ -346,6 +346,9 @@ def create_coordinator_app( @app.delete("/api/services/{service}/allocations/{allocation_id}") async def release_allocation(service: str, allocation_id: str) -> dict[str, Any]: + existing = service_registry.get_allocation(allocation_id) + if existing is None or existing.service != service: + raise HTTPException(404, detail=f"Allocation {allocation_id!r} not found for service {service!r}") released = service_registry.release(allocation_id) if not released: raise HTTPException(404, detail=f"Allocation {allocation_id!r} not found") diff --git a/circuitforge_core/resources/coordinator/service_registry.py b/circuitforge_core/resources/coordinator/service_registry.py index 9a2f254..c258ad7 100644 --- a/circuitforge_core/resources/coordinator/service_registry.py +++ b/circuitforge_core/resources/coordinator/service_registry.py @@ -121,6 +121,25 @@ class ServiceRegistry: self._instances[key] = inst return inst + def get_allocation(self, allocation_id: str) -> ServiceAllocation | None: + return self._allocations.get(allocation_id) + + def sweep_expired_allocations(self) -> list[str]: + """ + Remove all allocations whose TTL has elapsed and transition the + corresponding instance to 'idle' if no active allocations remain. + Returns the list of expired allocation_ids. + """ + now = time.time() + expired = [ + alloc_id + for alloc_id, alloc in self._allocations.items() + if alloc.expires_at > 0 and now > alloc.expires_at + ] + for alloc_id in expired: + self.release(alloc_id) + return expired + def all_allocations(self) -> list[ServiceAllocation]: return list(self._allocations.values()) diff --git a/circuitforge_core/resources/profiles/schema.py b/circuitforge_core/resources/profiles/schema.py index 4439039..1ba8ee5 100644 --- a/circuitforge_core/resources/profiles/schema.py +++ b/circuitforge_core/resources/profiles/schema.py @@ -61,11 +61,12 @@ class ServiceProfile(BaseModel): return values if not isinstance(raw, dict): return values - spec_type = raw.pop("type", None) + spec_type = raw.get("type") + managed_fields = {k: v for k, v in raw.items() if k != "type"} if spec_type == "docker": - values["managed"] = DockerSpec(**raw) + values["managed"] = DockerSpec(**managed_fields) elif spec_type == "process": - values["managed"] = ProcessSpec(**raw) + values["managed"] = ProcessSpec(**managed_fields) else: raise ValueError(f"Unknown managed service type: {spec_type!r}") return values diff --git a/tests/test_resources/test_service_registry.py b/tests/test_resources/test_service_registry.py index 5c19913..dc73a9c 100644 --- a/tests/test_resources/test_service_registry.py +++ b/tests/test_resources/test_service_registry.py @@ -61,3 +61,26 @@ def test_new_alloc_on_idle_instance_marks_it_running(registry): model="M", url="http://h:8000") registry.allocate("vllm", "heimdall", 0, "M", "http://h:8000", "x", 300.0) assert registry.all_instances()[0].state == "running" + + +def test_sweep_expired_allocations(registry): + # Register a running instance so idle-transition logic has something to act on. + registry.upsert_instance("vllm", "heimdall", 0, state="running", + model="M", url="http://h:8000") + # Create an allocation with a very short TTL (1 second). + alloc = registry.allocate("vllm", "heimdall", 0, "M", "http://h:8000", "caller", ttl_s=1) + assert registry.active_allocations("vllm", "heimdall", 0) == 1 + + # Wait for TTL to elapse. + time.sleep(1.1) + + expired = registry.sweep_expired_allocations() + + # The allocation should have been swept. + assert alloc.allocation_id in expired + assert registry.active_allocations("vllm", "heimdall", 0) == 0 + + # The instance should have transitioned to idle since no allocations remain. + instance = registry.all_instances()[0] + assert instance.state == "idle" + assert instance.idle_since is not None