fix: TTL sweep, immutability, service-scoped release, logger in orch alloc
- ServiceRegistry: add sweep_expired_allocations() to remove stale TTL allocations and transition instances to idle; add get_allocation() helper - AgentSupervisor._run_idle_sweep: call sweep_expired_allocations() before idle-timeout check so crashed-caller leaks are cleaned up each sweep tick - schema._parse_managed: copy raw dict before extracting 'type' key instead of mutating caller's dict with pop() - app.release_allocation: validate allocation belongs to the given service path param before releasing; return 404 if mismatch - router._try_cf_orch_alloc: replace print() with logger.warning(); add module-level logger = logging.getLogger(__name__) - tests: add test_sweep_expired_allocations covering TTL expiry and idle state transition
This commit is contained in:
parent
1a20b80a50
commit
e58c3aea23
6 changed files with 56 additions and 4 deletions
|
|
@ -3,12 +3,15 @@ LLM abstraction layer with priority fallback chain.
|
||||||
Reads config from ~/.config/circuitforge/llm.yaml.
|
Reads config from ~/.config/circuitforge/llm.yaml.
|
||||||
Tries backends in order; falls back on any error.
|
Tries backends in order; falls back on any error.
|
||||||
"""
|
"""
|
||||||
|
import logging
|
||||||
import os
|
import os
|
||||||
import yaml
|
import yaml
|
||||||
import requests
|
import requests
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from openai import OpenAI
|
from openai import OpenAI
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
CONFIG_PATH = Path.home() / ".config" / "circuitforge" / "llm.yaml"
|
CONFIG_PATH = Path.home() / ".config" / "circuitforge" / "llm.yaml"
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -62,7 +65,7 @@ class LLMRouter:
|
||||||
alloc = ctx.__enter__()
|
alloc = ctx.__enter__()
|
||||||
return (ctx, alloc)
|
return (ctx, alloc)
|
||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
print(f"[LLMRouter] cf_orch allocation failed, using base_url directly: {exc}")
|
logger.warning("[LLMRouter] cf_orch allocation failed, using base_url directly: %s", exc)
|
||||||
return None
|
return None
|
||||||
|
|
||||||
def complete(self, prompt: str, system: str | None = None,
|
def complete(self, prompt: str, system: str | None = None,
|
||||||
|
|
|
||||||
|
|
@ -147,6 +147,9 @@ class AgentSupervisor:
|
||||||
async def _run_idle_sweep(self) -> None:
|
async def _run_idle_sweep(self) -> None:
|
||||||
if self._service_registry is None:
|
if self._service_registry is None:
|
||||||
return
|
return
|
||||||
|
expired = self._service_registry.sweep_expired_allocations()
|
||||||
|
if expired:
|
||||||
|
logger.info("TTL sweep: expired %d allocation(s): %s", len(expired), expired)
|
||||||
idle_stop_config = self._build_idle_stop_config()
|
idle_stop_config = self._build_idle_stop_config()
|
||||||
if not idle_stop_config:
|
if not idle_stop_config:
|
||||||
return
|
return
|
||||||
|
|
|
||||||
|
|
@ -346,6 +346,9 @@ def create_coordinator_app(
|
||||||
|
|
||||||
@app.delete("/api/services/{service}/allocations/{allocation_id}")
|
@app.delete("/api/services/{service}/allocations/{allocation_id}")
|
||||||
async def release_allocation(service: str, allocation_id: str) -> dict[str, Any]:
|
async def release_allocation(service: str, allocation_id: str) -> dict[str, Any]:
|
||||||
|
existing = service_registry.get_allocation(allocation_id)
|
||||||
|
if existing is None or existing.service != service:
|
||||||
|
raise HTTPException(404, detail=f"Allocation {allocation_id!r} not found for service {service!r}")
|
||||||
released = service_registry.release(allocation_id)
|
released = service_registry.release(allocation_id)
|
||||||
if not released:
|
if not released:
|
||||||
raise HTTPException(404, detail=f"Allocation {allocation_id!r} not found")
|
raise HTTPException(404, detail=f"Allocation {allocation_id!r} not found")
|
||||||
|
|
|
||||||
|
|
@ -121,6 +121,25 @@ class ServiceRegistry:
|
||||||
self._instances[key] = inst
|
self._instances[key] = inst
|
||||||
return inst
|
return inst
|
||||||
|
|
||||||
|
def get_allocation(self, allocation_id: str) -> ServiceAllocation | None:
|
||||||
|
return self._allocations.get(allocation_id)
|
||||||
|
|
||||||
|
def sweep_expired_allocations(self) -> list[str]:
|
||||||
|
"""
|
||||||
|
Remove all allocations whose TTL has elapsed and transition the
|
||||||
|
corresponding instance to 'idle' if no active allocations remain.
|
||||||
|
Returns the list of expired allocation_ids.
|
||||||
|
"""
|
||||||
|
now = time.time()
|
||||||
|
expired = [
|
||||||
|
alloc_id
|
||||||
|
for alloc_id, alloc in self._allocations.items()
|
||||||
|
if alloc.expires_at > 0 and now > alloc.expires_at
|
||||||
|
]
|
||||||
|
for alloc_id in expired:
|
||||||
|
self.release(alloc_id)
|
||||||
|
return expired
|
||||||
|
|
||||||
def all_allocations(self) -> list[ServiceAllocation]:
|
def all_allocations(self) -> list[ServiceAllocation]:
|
||||||
return list(self._allocations.values())
|
return list(self._allocations.values())
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -61,11 +61,12 @@ class ServiceProfile(BaseModel):
|
||||||
return values
|
return values
|
||||||
if not isinstance(raw, dict):
|
if not isinstance(raw, dict):
|
||||||
return values
|
return values
|
||||||
spec_type = raw.pop("type", None)
|
spec_type = raw.get("type")
|
||||||
|
managed_fields = {k: v for k, v in raw.items() if k != "type"}
|
||||||
if spec_type == "docker":
|
if spec_type == "docker":
|
||||||
values["managed"] = DockerSpec(**raw)
|
values["managed"] = DockerSpec(**managed_fields)
|
||||||
elif spec_type == "process":
|
elif spec_type == "process":
|
||||||
values["managed"] = ProcessSpec(**raw)
|
values["managed"] = ProcessSpec(**managed_fields)
|
||||||
else:
|
else:
|
||||||
raise ValueError(f"Unknown managed service type: {spec_type!r}")
|
raise ValueError(f"Unknown managed service type: {spec_type!r}")
|
||||||
return values
|
return values
|
||||||
|
|
|
||||||
|
|
@ -61,3 +61,26 @@ def test_new_alloc_on_idle_instance_marks_it_running(registry):
|
||||||
model="M", url="http://h:8000")
|
model="M", url="http://h:8000")
|
||||||
registry.allocate("vllm", "heimdall", 0, "M", "http://h:8000", "x", 300.0)
|
registry.allocate("vllm", "heimdall", 0, "M", "http://h:8000", "x", 300.0)
|
||||||
assert registry.all_instances()[0].state == "running"
|
assert registry.all_instances()[0].state == "running"
|
||||||
|
|
||||||
|
|
||||||
|
def test_sweep_expired_allocations(registry):
|
||||||
|
# Register a running instance so idle-transition logic has something to act on.
|
||||||
|
registry.upsert_instance("vllm", "heimdall", 0, state="running",
|
||||||
|
model="M", url="http://h:8000")
|
||||||
|
# Create an allocation with a very short TTL (1 second).
|
||||||
|
alloc = registry.allocate("vllm", "heimdall", 0, "M", "http://h:8000", "caller", ttl_s=1)
|
||||||
|
assert registry.active_allocations("vllm", "heimdall", 0) == 1
|
||||||
|
|
||||||
|
# Wait for TTL to elapse.
|
||||||
|
time.sleep(1.1)
|
||||||
|
|
||||||
|
expired = registry.sweep_expired_allocations()
|
||||||
|
|
||||||
|
# The allocation should have been swept.
|
||||||
|
assert alloc.allocation_id in expired
|
||||||
|
assert registry.active_allocations("vllm", "heimdall", 0) == 0
|
||||||
|
|
||||||
|
# The instance should have transitioned to idle since no allocations remain.
|
||||||
|
instance = registry.all_instances()[0]
|
||||||
|
assert instance.state == "idle"
|
||||||
|
assert instance.idle_since is not None
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue