docs: revise queue optimizer spec after review

Addresses 16 review findings across two passes:
- Clarify _active.pop/double-decrement non-issue
- Fix app.py change target (inline SQL, not kill_stuck_tasks)
- Scope durability to LLM types only
- Add _budgets to state table with load logic
- Fix singleton safety explanation (lock, not GIL)
- Ghost row fix: mark dropped tasks failed in DB
- Document static _available_vram as known limitation
- Fix test_llm_tasks_batch_by_type description
- Eliminate circular import via routing split in submit_task()
- Add missing budget warning at construction
This commit is contained in:
pyr0ball 2026-03-14 16:46:38 -07:00
parent 9fcfe7daa1
commit 7983f3365d

View file

@ -13,7 +13,7 @@ On single-GPU and CPU-only systems, the background task runner spawns a daemon t
The root issue is that `submit_task()` is a spawn-per-task model with no scheduling layer. SQLite's `background_tasks` table is a status log, not a consumed work queue. The root issue is that `submit_task()` is a spawn-per-task model with no scheduling layer. SQLite's `background_tasks` table is a status log, not a consumed work queue.
Additionally, on restart all `queued` tasks are cleared to `failed`, discarding pending work. Additionally, on restart all `queued` and `running` tasks are cleared to `failed` (inline SQL in `app.py`'s `_startup()`), discarding pending work that had not yet started executing.
--- ---
@ -22,7 +22,7 @@ Additionally, on restart all `queued` tasks are cleared to `failed`, discarding
- Eliminate unnecessary model switching by batching LLM tasks by type - Eliminate unnecessary model switching by batching LLM tasks by type
- Allow concurrent model execution when VRAM permits multiple models simultaneously - Allow concurrent model execution when VRAM permits multiple models simultaneously
- Preserve FIFO ordering within each task type - Preserve FIFO ordering within each task type
- Survive process restarts — `queued` tasks resume after restart - Survive process restarts — `queued` tasks resume after restart; only `running` tasks (whose results are unknown) are reset to `failed`
- Apply to all tiers (no tier gating) - Apply to all tiers (no tier gating)
- Keep non-LLM tasks (discovery, email sync, scrape, enrich) unaffected — they continue to spawn free threads - Keep non-LLM tasks (discovery, email sync, scrape, enrich) unaffected — they continue to spawn free threads
@ -34,6 +34,8 @@ Additionally, on restart all `queued` tasks are cleared to `failed`, discarding
- Adding new task types - Adding new task types
- Tier gating on the scheduler - Tier gating on the scheduler
- Persistent task history in memory - Persistent task history in memory
- Durability for non-LLM task types (discovery, email_sync, etc. — these do not survive restarts, same as current behavior)
- Dynamic VRAM tracking — `_available_vram` is read once at startup and not refreshed (see Known Limitations)
--- ---
@ -41,40 +43,66 @@ Additionally, on restart all `queued` tasks are cleared to `failed`, discarding
### Task Classification ### Task Classification
``` ```python
LLM_TASK_TYPES = {"cover_letter", "company_research", "wizard_generate"} LLM_TASK_TYPES = {"cover_letter", "company_research", "wizard_generate"}
``` ```
All other task types (`discovery`, `email_sync`, `scrape_url`, `enrich_descriptions`, The routing rule is: if `task_type in LLM_TASK_TYPES`, route through the scheduler. Everything else spawns a free thread unchanged from the current implementation. **Future task types default to bypass mode** unless explicitly added to `LLM_TASK_TYPES` — which is the safe default (bypass = current behavior).
`enrich_craigslist`, `prepare_training`) bypass the scheduler and spawn free threads,
unchanged from the current implementation. `LLM_TASK_TYPES` is defined in `scripts/task_scheduler.py` and imported by `scripts/task_runner.py` for routing. This import direction (task_runner imports from task_scheduler) avoids circular imports because `task_scheduler.py` does **not** import from `task_runner.py`.
Current non-LLM types (all bypass scheduler): `discovery`, `email_sync`, `scrape_url`, `enrich_descriptions`, `enrich_craigslist`, `prepare_training`.
### Routing in `submit_task()` — No Circular Import
The routing split lives entirely in `submit_task()` in `task_runner.py`:
```python
def submit_task(db_path, task_type, job_id=None, params=None):
task_id, is_new = insert_task(db_path, task_type, job_id or 0, params=params)
if is_new:
from scripts.task_scheduler import get_scheduler, LLM_TASK_TYPES
if task_type in LLM_TASK_TYPES:
get_scheduler(db_path).enqueue(task_id, task_type, job_id or 0, params)
else:
t = threading.Thread(
target=_run_task,
args=(db_path, task_id, task_type, job_id or 0, params),
daemon=True,
)
t.start()
return task_id, is_new
```
`TaskScheduler.enqueue()` only handles LLM task types and never imports or calls `_run_task`. This eliminates any circular import between `task_runner` and `task_scheduler`.
### Component Overview ### Component Overview
``` ```
submit_task() ──→ TaskScheduler.enqueue(task_id, task_type, job_id, params) submit_task()
├── LLM task? ──→ per-type deque ──→ Scheduler loop ├── task_type in LLM_TASK_TYPES?
│ │ │ │ yes │ no
└── Non-LLM task? ──→ spawn thread (unchanged) │ ▼ ▼
│ get_scheduler().enqueue() spawn free thread (unchanged)
┌─────────────────────────────┘ │ │
│ ▼
Scheduling cycle │ per-type deque
(wakes on enqueue or batch completion) │ │
│ ▼
Clean up finished batches, release VRAM │ Scheduler loop (daemon thread)
│ (wakes on enqueue or batch completion)
Sort eligible types by queue depth (desc) │ │
│ Sort eligible types by queue depth (desc)
For each type: │ │
reserved_vram + budget[type] ≤ available_vram? │ For each type:
│ yes │ no │ reserved_vram + budget[type] ≤ available_vram?
▼ ▼ │ │ yes │ no
Start batch worker skip (wait for slot) │ ▼ ▼
(serial: one task at a time) │ Start batch worker skip (wait for slot)
│ (serial: one task at a time)
Batch worker signals done → scheduler re-evaluates │ │
│ Batch worker signals done → scheduler re-evaluates
``` ```
### New File: `scripts/task_scheduler.py` ### New File: `scripts/task_scheduler.py`
@ -85,12 +113,31 @@ submit_task() ──→ TaskScheduler.enqueue(task_id, task_type, job_id, para
|---|---|---| |---|---|---|
| `_queues` | `dict[str, deque[TaskSpec]]` | Per-type pending task deques | | `_queues` | `dict[str, deque[TaskSpec]]` | Per-type pending task deques |
| `_active` | `dict[str, Thread]` | Currently running batch worker per type | | `_active` | `dict[str, Thread]` | Currently running batch worker per type |
| `_reserved_vram` | `float` | Sum of VRAM budgets for active batches | | `_budgets` | `dict[str, float]` | VRAM budget per task type (GB). Loaded at construction by merging `DEFAULT_VRAM_BUDGETS` with `scheduler.vram_budgets` from `config/llm.yaml`. Config path derived from `db_path` (e.g. `db_path.parent.parent / "config/llm.yaml"`). Missing file or key → defaults used as-is. At construction, a warning is logged for any type in `LLM_TASK_TYPES` with no budget entry after the merge. |
| `_available_vram` | `float` | Total VRAM from `get_gpus()`; 999.0 on CPU-only | | `_reserved_vram` | `float` | Sum of `_budgets` values for currently active type batches |
| `_available_vram` | `float` | Total VRAM from `get_gpus()` summed across all GPUs at construction; 999.0 on CPU-only systems. Static — not refreshed after startup (see Known Limitations). |
| `_max_queue_depth` | `int` | Max tasks per type queue before drops. From `scheduler.max_queue_depth` in config; default 500. |
| `_lock` | `threading.Lock` | Protects all mutable scheduler state | | `_lock` | `threading.Lock` | Protects all mutable scheduler state |
| `_wake` | `threading.Event` | Pulsed on enqueue or batch completion | | `_wake` | `threading.Event` | Pulsed on enqueue or batch completion |
| `_stop` | `threading.Event` | Set by `shutdown()` to terminate the loop | | `_stop` | `threading.Event` | Set by `shutdown()` to terminate the loop |
**Default VRAM budgets (module-level constant):**
```python
DEFAULT_VRAM_BUDGETS: dict[str, float] = {
"cover_letter": 2.5, # alex-cover-writer:latest (~2GB GGUF + headroom)
"company_research": 5.0, # llama3.1:8b or vllm model
"wizard_generate": 2.5, # same model family as cover_letter
}
```
At construction, the scheduler validates that every type in `LLM_TASK_TYPES` has an entry
in the merged `_budgets`. If any type is missing, a warning is logged:
```
WARNING task_scheduler: No VRAM budget defined for LLM task type 'foo' — defaulting to 0.0 GB (unlimited concurrency for this type)
```
**Scheduler loop:** **Scheduler loop:**
```python ```python
@ -99,7 +146,11 @@ while not _stop.is_set():
_wake.clear() _wake.clear()
with _lock: with _lock:
# Release finished batches # Defense in depth: reap dead threads not yet cleaned by their finally block.
# In the normal path, a batch worker's finally block calls _active.pop() and
# decrements _reserved_vram BEFORE firing _wake — so by the time we scan here,
# the entry is already gone and there is no double-decrement risk.
# This reap only catches threads killed externally (daemon exit on shutdown).
for t, thread in list(_active.items()): for t, thread in list(_active.items()):
if not thread.is_alive(): if not thread.is_alive():
_reserved_vram -= _budgets.get(t, 0) _reserved_vram -= _budgets.get(t, 0)
@ -112,7 +163,7 @@ while not _stop.is_set():
reverse=True, reverse=True,
) )
for task_type in candidates: for task_type in candidates:
budget = _budgets.get(task_type, DEFAULT_VRAM_BUDGETS.get(task_type, 0)) budget = _budgets.get(task_type, 0)
if _reserved_vram + budget <= _available_vram: if _reserved_vram + budget <= _available_vram:
thread = Thread(target=_batch_worker, args=(task_type,), daemon=True) thread = Thread(target=_batch_worker, args=(task_type,), daemon=True)
_active[task_type] = thread _active[task_type] = thread
@ -122,6 +173,11 @@ while not _stop.is_set():
**Batch worker:** **Batch worker:**
The `finally` block is the single authoritative path for releasing `_reserved_vram` and
removing the entry from `_active`. Because `_active.pop` runs in `finally` before
`_wake.set()`, the scheduler loop's dead-thread scan will never find this entry —
no double-decrement is possible in the normal execution path.
```python ```python
def _batch_worker(task_type: str) -> None: def _batch_worker(task_type: str) -> None:
try: try:
@ -138,95 +194,137 @@ def _batch_worker(task_type: str) -> None:
_wake.set() _wake.set()
``` ```
Tasks arriving mid-batch for an already-active type are appended to the deque and `_run_task` here refers to `task_runner._run_task`, passed in as a callable at
picked up naturally by the running batch worker — no re-scheduling needed. construction (e.g. `self._run_task = run_task_fn`). The caller (`task_runner.py`)
passes `_run_task` when constructing the scheduler, avoiding any import of `task_runner`
from within `task_scheduler`.
**Singleton access:** **`enqueue()` method:**
`enqueue()` only accepts LLM task types. Non-LLM routing is handled in `submit_task()`
before `enqueue()` is called (see Routing section above).
```python
def enqueue(self, task_id: int, task_type: str, job_id: int, params: str | None) -> None:
with self._lock:
q = self._queues.setdefault(task_type, deque())
if len(q) >= self._max_queue_depth:
logger.warning(
"Queue depth limit reached for %s (max=%d) — task %d dropped",
task_type, self._max_queue_depth, task_id,
)
update_task_status(self._db_path, task_id, "failed",
error="Queue depth limit reached")
return
q.append(TaskSpec(task_id, job_id, params))
self._wake.set()
```
When a task is dropped at the depth limit, `update_task_status()` marks it `failed` in
SQLite immediately — the row inserted by `insert_task()` is never left as a permanent
ghost in `queued` state.
**Singleton access — thread-safe initialization:**
```python ```python
_scheduler: TaskScheduler | None = None _scheduler: TaskScheduler | None = None
_scheduler_lock = threading.Lock()
def get_scheduler(db_path: Path) -> TaskScheduler: def get_scheduler(db_path: Path) -> TaskScheduler:
global _scheduler global _scheduler
if _scheduler is None: if _scheduler is None: # fast path — avoids lock on steady state
_scheduler = TaskScheduler(db_path) with _scheduler_lock:
_scheduler.start() if _scheduler is None: # re-check under lock (double-checked locking)
_scheduler = TaskScheduler(db_path)
_scheduler.start()
return _scheduler return _scheduler
def reset_scheduler() -> None: def reset_scheduler() -> None:
"""Tear down and clear singleton. Test teardown only.""" """Tear down and clear singleton. Test teardown only."""
global _scheduler global _scheduler
if _scheduler: with _scheduler_lock:
_scheduler.shutdown() if _scheduler:
_scheduler = None _scheduler.shutdown()
_scheduler = None
``` ```
### VRAM Budget Configuration The safety guarantee comes from the **inner `with _scheduler_lock:` block and re-check**,
not from GIL atomicity. The outer `if _scheduler is None` is a performance optimization
(avoid acquiring the lock on every `submit_task()` call once the scheduler is running).
Two threads racing at startup will both pass the outer check, but only one will win the
inner lock and construct the scheduler; the other will see a non-None value on its
inner re-check and return the already-constructed instance.
Declared in `config/llm.yaml` under a `scheduler:` key: ---
```yaml ## Required Call Ordering in `app.py`
scheduler:
vram_budgets:
cover_letter: 2.5 # alex-cover-writer:latest (~2GB GGUF + headroom)
company_research: 5.0 # llama3.1:8b or vllm model
wizard_generate: 2.5 # same model family as cover_letter
max_queue_depth: 500
```
Defaults (used when key absent — backwards compatible with existing installs): `reset_running_tasks()` **must complete before** `get_scheduler()` is ever called.
The scheduler's durability query reads `status='queued'` rows; if `reset_running_tasks()`
has not yet run, a row stuck in `status='running'` from a prior crash would be loaded
into the deque and re-executed, producing a duplicate result.
In practice, the first call to `get_scheduler()` is triggered by the `submit_task()` call
inside `_startup()`'s SearXNG auto-recovery block — not by a user action. The ordering
holds because `reset_running_tasks()` is called on an earlier line within the same
`_startup()` function body. **Do not reorder these calls.**
```python ```python
DEFAULT_VRAM_BUDGETS = { @st.cache_resource
"cover_letter": 2.5, def _startup() -> None:
"company_research": 5.0, # Step 1: Reset interrupted tasks — MUST come first
"wizard_generate": 2.5, from scripts.db import reset_running_tasks
} reset_running_tasks(get_db_path())
# Step 2 (later in same function): SearXNG re-queue calls submit_task(),
# which triggers get_scheduler() for the first time. Ordering is guaranteed
# because _startup() runs synchronously and step 1 is already complete.
conn = sqlite3.connect(get_db_path())
# ... existing SearXNG re-queue logic using conn ...
conn.close()
``` ```
`_available_vram` is read from `preflight.get_gpus()` at scheduler startup (sum across
all GPUs). CPU-only systems get `_available_vram = 999.0`, allowing all type batches to
run concurrently — preserving existing behavior on CPU installs.
### Memory Safety
- **Batch worker `finally` block** — always releases `_reserved_vram` and fires `_wake`,
even if `_run_task()` raises. Prevents permanently wedged VRAM reservations.
- **Scheduler loop reaps dead threads**`thread.is_alive()` check catches any worker
that exits without firing `_wake` (defense in depth).
- **Max queue depth**`enqueue()` rejects tasks past `max_queue_depth` with a logged
warning. Prevents unbounded memory growth under pathological conditions.
- **No in-memory history** — completed/failed state lives exclusively in SQLite. Deques
hold only pending `TaskSpec` namedtuples. Memory footprint is `O(pending tasks)`.
- **`reset_scheduler()`** — explicit teardown for test isolation. Sets `_stop` event,
joins the scheduler thread (with timeout), clears the module-level reference.
--- ---
## Changes to Existing Files ## Changes to Existing Files
### `scripts/task_runner.py` ### `scripts/task_runner.py`
`submit_task()` becomes a thin shim: `submit_task()` gains routing logic; `_run_task` is passed to the scheduler at first call:
```python ```python
def submit_task(db_path, task_type, job_id=None, params=None): def submit_task(db_path, task_type, job_id=None, params=None):
task_id, is_new = insert_task(db_path, task_type, job_id or 0, params=params) task_id, is_new = insert_task(db_path, task_type, job_id or 0, params=params)
if is_new: if is_new:
from scripts.task_scheduler import get_scheduler from scripts.task_scheduler import get_scheduler, LLM_TASK_TYPES
get_scheduler(db_path).enqueue(task_id, task_type, job_id or 0, params) if task_type in LLM_TASK_TYPES:
get_scheduler(db_path, run_task_fn=_run_task).enqueue(
task_id, task_type, job_id or 0, params
)
else:
t = threading.Thread(
target=_run_task,
args=(db_path, task_id, task_type, job_id or 0, params),
daemon=True,
)
t.start()
return task_id, is_new return task_id, is_new
``` ```
`_run_task()` and all task handler branches remain unchanged. `get_scheduler()` accepts `run_task_fn` only on first call (when constructing); subsequent
calls ignore it (singleton already initialized). `_run_task()` and all handler branches
remain unchanged.
### `scripts/db.py` ### `scripts/db.py`
Add `reset_running_tasks()` helper (alongside existing `kill_stuck_tasks()`): Add `reset_running_tasks()` alongside the existing `kill_stuck_tasks()`. Like
`kill_stuck_tasks()`, it uses a plain `sqlite3.connect()` — consistent with the
existing pattern in this file, and appropriate because this call happens before the
app's connection pooling is established:
```python ```python
def reset_running_tasks(db_path: Path = DEFAULT_DB) -> int: def reset_running_tasks(db_path: Path = DEFAULT_DB) -> int:
"""On restart: mark in-flight tasks failed. Queued tasks are left for scheduler.""" """On restart: mark in-flight tasks failed. Queued tasks survive for the scheduler."""
conn = sqlite3.connect(db_path) conn = sqlite3.connect(db_path)
count = conn.execute( count = conn.execute(
"UPDATE background_tasks SET status='failed', error='Interrupted by restart'," "UPDATE background_tasks SET status='failed', error='Interrupted by restart',"
@ -239,20 +337,37 @@ def reset_running_tasks(db_path: Path = DEFAULT_DB) -> int:
### `app/app.py` ### `app/app.py`
Replace `kill_stuck_tasks()` call with `reset_running_tasks()` on startup: Inside `_startup()`, replace the inline SQL block that wipes both `queued` and `running`
rows with a call to `reset_running_tasks()`. The replacement must be the **first operation
in `_startup()`** — before the SearXNG re-queue logic that calls `submit_task()`:
```python ```python
# Before # REMOVE this block:
kill_stuck_tasks(db_path) conn.execute(
"UPDATE background_tasks SET status='failed', error='Interrupted by server restart',"
" finished_at=datetime('now') WHERE status IN ('queued','running')"
)
# After — queued tasks survive for the scheduler to resume # ADD at the top of _startup(), before any submit_task() calls:
reset_running_tasks(db_path) from scripts.db import reset_running_tasks
# Scheduler reads surviving 'queued' rows during get_scheduler() startup reset_running_tasks(get_db_path())
``` ```
The existing `conn` used for subsequent SearXNG logic is unaffected — `reset_running_tasks()`
opens and closes its own connection.
### `config/llm.yaml.example` ### `config/llm.yaml.example`
Add `scheduler:` section documenting VRAM budget keys. Add `scheduler:` section:
```yaml
scheduler:
vram_budgets:
cover_letter: 2.5 # alex-cover-writer:latest (~2GB GGUF + headroom)
company_research: 5.0 # llama3.1:8b or vllm model
wizard_generate: 2.5 # same model family as cover_letter
max_queue_depth: 500
```
--- ---
@ -262,49 +377,91 @@ No schema changes. The existing `background_tasks` table supports all scheduler
| Column | Scheduler use | | Column | Scheduler use |
|---|---| |---|---|
| `task_type` | Queue routing | | `task_type` | Queue routing — determines which deque receives the task |
| `status` | `queued`pending; `running` → active; `completed`/`failed` → done | | `status` | `queued`in deque; `running` → batch worker executing; `completed`/`failed` → done |
| `created_at` | FIFO ordering within type | | `created_at` | FIFO ordering within type (durability startup query sorts by this) |
| `params` | Passed through to `_run_task()` unchanged | | `params` | Passed through to `_run_task()` unchanged |
--- ---
## Durability ## Durability
On startup, `TaskScheduler.__init__()` queries: Scope: **LLM task types only** (`cover_letter`, `company_research`, `wizard_generate`).
Non-LLM tasks do not survive restarts, same as current behavior.
On construction, `TaskScheduler.__init__()` queries:
```sql ```sql
SELECT id, task_type, job_id, params SELECT id, task_type, job_id, params
FROM background_tasks FROM background_tasks
WHERE status = 'queued' WHERE status = 'queued'
AND task_type IN ('cover_letter', 'company_research', 'wizard_generate')
ORDER BY created_at ASC ORDER BY created_at ASC
``` ```
LLM tasks are pushed onto their respective deques. Non-LLM tasks (which don't survive Results are pushed onto their respective deques. This query runs inside `__init__` before
restarts under the current model) are re-spawned as free threads. `start()` is called (before the scheduler loop thread exists), so there is no concurrency
concern with deque population.
`running` rows are reset to `failed` by `reset_running_tasks()` before the scheduler `running` rows are reset to `failed` by `reset_running_tasks()` before `get_scheduler()`
starts — their results are unknown and must be re-submitted by the user. is called — see Required Call Ordering above.
---
## Known Limitations
**Static `_available_vram`:** Total GPU VRAM is read from `get_gpus()` once at scheduler
construction and never refreshed. Changes after startup — another process releasing VRAM,
a GPU going offline, Ollama unloading a model — are not reflected. The scheduler's
correctness depends on per-task VRAM budgets being conservative estimates of **peak model
footprint** (not free VRAM at a given moment). On a system where Ollama and vLLM share
the GPU, budgets should account for both models potentially resident simultaneously.
Dynamic VRAM polling is a future enhancement.
---
## Memory Safety
- **`finally` block owns VRAM release** — batch worker always decrements `_reserved_vram`
and removes its `_active` entry before firing `_wake`, even on exception. The scheduler
loop's dead-thread scan is defense in depth for externally-killed daemons only; it cannot
double-decrement because `_active.pop` in `finally` runs first.
- **Max queue depth with DB cleanup**`enqueue()` rejects tasks past `max_queue_depth`,
logs a warning, and immediately marks the dropped task `failed` in SQLite to prevent
permanent ghost rows in `queued` state.
- **No in-memory history** — deques hold only pending `TaskSpec` namedtuples. Completed
and failed state lives exclusively in SQLite. Memory footprint is `O(pending tasks)`.
- **Thread-safe singleton** — double-checked locking with `_scheduler_lock` prevents
double-construction. Safety comes from the inner lock + re-check; the outer `None`
check is a performance optimization only.
- **Missing budget warning** — any `LLM_TASK_TYPES` entry with no budget entry after
config merge logs a warning at construction; defaults to 0.0 GB (unlimited concurrency
for that type). This prevents silent incorrect scheduling for future task types.
- **`reset_scheduler()`** — explicit teardown for test isolation: sets `_stop`, joins
scheduler thread with timeout, clears module-level reference under `_scheduler_lock`.
--- ---
## Testing (`tests/test_task_scheduler.py`) ## Testing (`tests/test_task_scheduler.py`)
All tests mock `_run_task` to avoid real LLM calls. `reset_scheduler()` is called in
an `autouse` fixture for isolation between test cases.
| Test | What it verifies | | Test | What it verifies |
|---|---| |---|---|
| `test_llm_tasks_batch_by_type` | N cover_letter + M research enqueued; all cover_letters execute before any research when VRAM only fits one model | | `test_deepest_queue_wins_first_slot` | N cover_letter + M research enqueued (N > M); cover_letter batch starts first when `_available_vram` only fits one model budget, because it has the deeper queue |
| `test_fifo_within_type` | Arrival order preserved within a type batch | | `test_fifo_within_type` | Arrival order preserved within a type batch |
| `test_concurrent_batches_when_vram_allows` | Two type batches start simultaneously when `available_vram` fits both budgets | | `test_concurrent_batches_when_vram_allows` | Two type batches start simultaneously when `_available_vram` fits both budgets combined |
| `test_new_tasks_picked_up_mid_batch` | Task enqueued while batch is active is consumed by the running worker | | `test_new_tasks_picked_up_mid_batch` | Task enqueued via `enqueue()` while a batch is active is consumed by the running worker in the same batch |
| `test_worker_crash_releases_vram` | `_run_task` raises; `_reserved_vram` returns to 0; scheduler continues | | `test_worker_crash_releases_vram` | `_run_task` raises; `_reserved_vram` returns to 0; scheduler continues; no double-decrement |
| `test_non_llm_tasks_bypass_scheduler` | `discovery`, `email_sync` etc. spawn free threads; scheduler deques untouched | | `test_non_llm_tasks_bypass_scheduler` | `discovery`, `email_sync` etc. spawn free threads via `submit_task()`; scheduler deques untouched |
| `test_durability_on_startup` | DB has existing `queued` rows; scheduler re-enqueues them on init | | `test_durability_llm_tasks_on_startup` | DB has existing `queued` LLM-type rows; scheduler loads them into deques on construction |
| `test_running_rows_reset_on_startup` | `running` rows → `failed` via `reset_running_tasks()`; `queued` rows untouched | | `test_durability_excludes_non_llm` | `queued` non-LLM rows in DB are not loaded into deques on startup |
| `test_max_queue_depth` | Enqueue past limit logs warning and does not crash | | `test_running_rows_reset_before_scheduler` | `reset_running_tasks()` sets `running``failed`; `queued` rows untouched |
| `test_reset_scheduler_cleans_up` | `reset_scheduler()` stops loop thread; no lingering threads | | `test_max_queue_depth_marks_failed` | Enqueue past limit logs warning, does not add to deque, and marks task `failed` in DB |
| `test_missing_budget_logs_warning` | Type in `LLM_TASK_TYPES` with no budget entry at construction logs a warning |
All tests mock `_run_task` to avoid real LLM calls. `reset_scheduler()` called in | `test_singleton_thread_safe` | Concurrent calls to `get_scheduler()` produce exactly one scheduler instance |
teardown for isolation. | `test_reset_scheduler_cleans_up` | `reset_scheduler()` stops loop thread; no lingering threads after call |
--- ---
@ -312,9 +469,9 @@ teardown for isolation.
| File | Change | | File | Change |
|---|---| |---|---|
| `scripts/task_scheduler.py` | **New** — ~160 lines | | `scripts/task_scheduler.py` | **New** — ~180 lines |
| `scripts/task_runner.py` | `submit_task()` shim — ~8 lines changed | | `scripts/task_runner.py` | `submit_task()` routing shim — ~12 lines changed |
| `scripts/db.py` | `reset_running_tasks()` — ~10 lines added | | `scripts/db.py` | `reset_running_tasks()` added — ~10 lines |
| `app/app.py` | Startup: `kill_stuck_tasks``reset_running_tasks` | | `app/app.py` | `_startup()`: inline SQL block → `reset_running_tasks()` call, placed first |
| `config/llm.yaml.example` | Add `scheduler:` section | | `config/llm.yaml.example` | Add `scheduler:` section |
| `tests/test_task_scheduler.py` | **New** — ~200 lines | | `tests/test_task_scheduler.py` | **New** — ~240 lines |