Compare commits

2 commits: 818e46c17e ... 15dc4b2646

| Author | SHA1 | Date |
|---|---|---|
| | 15dc4b2646 | |
| | 922d91fb91 | |

6 changed files with 218 additions and 191 deletions
README.md (42 changed lines)
```diff
@@ -1,16 +1,33 @@
 # Peregrine

-> **Primary development** happens at [git.opensourcesolarpunk.com](https://git.opensourcesolarpunk.com/pyr0ball/peregrine) — GitHub and Codeberg are push mirrors. Issues and PRs are welcome on either platform.
+> **Primary development** happens at [git.opensourcesolarpunk.com](https://git.opensourcesolarpunk.com/Circuit-Forge/peregrine) — GitHub and Codeberg are push mirrors. Issues and PRs are welcome on either platform.

 [](./LICENSE-BSL)
+[](https://github.com/CircuitForge/peregrine/actions/workflows/ci.yml)

-**AI-powered job search pipeline — by [Circuit Forge LLC](https://circuitforge.tech)**
+**Job search pipeline — by [Circuit Forge LLC](https://circuitforge.tech)**

-> *"Don't be evil, for real and forever."*
+> *"Tools for the jobs that the system made hard on purpose."*

 Automates the full job search lifecycle: discovery → matching → cover letters → applications → interview prep.
 Privacy-first, local-first. Your data never leaves your machine.

 ---

+Job search is a second job nobody hired you for.
+
+ATS filters designed to reject. Job boards that show the same listing eight times. Cover letter number forty-seven for a role that might already be filled. Hours of prep for a phone screen that lasts twelve minutes.
+
+Peregrine handles the pipeline — discovery, matching, tracking, drafting, and prep — so you can spend your time doing the work you actually want to be doing.
+
+**LLM support is optional.** The full discovery and tracking pipeline works without one. When you do configure a backend, the LLM drafts the parts that are genuinely miserable — cover letters, company research briefs, interview prep sheets — and waits for your approval before anything goes anywhere.
+
+### What Peregrine does not do
+
+Peregrine does **not** submit job applications for you. You still have to go to each employer's site and click apply yourself.
+
+This is intentional. Automated mass-applying is a bad experience for everyone — it's also a trust violation with employers who took the time to post a real role. Peregrine is a preparation and organization tool, not a bot.
+
+What it *does* cover is everything before and after that click: finding the jobs, matching them against your resume, generating cover letters and prep materials, and once you've applied — tracking where you stand, classifying the emails that come back, and surfacing company research when an interview lands on your calendar. The submit button is yours. The rest of the grind is ours.
+
+> **Exception:** [AIHawk](https://github.com/nicolomantini/LinkedIn-Easy-Apply) is a separate, optional tool that handles LinkedIn Easy Apply automation. Peregrine integrates with it for AIHawk-compatible profiles, but it is not part of Peregrine's core pipeline.

 ---
```
````diff
@@ -19,7 +36,7 @@ Privacy-first, local-first. Your data never leaves your machine.

 **1. Clone and install dependencies** (Docker, NVIDIA toolkit if needed):

 ```bash
-git clone https://git.opensourcesolarpunk.com/pyr0ball/peregrine
+git clone https://git.opensourcesolarpunk.com/Circuit-Forge/peregrine
 cd peregrine
 ./manage.sh setup
 ```
````
```diff
@@ -129,21 +146,26 @@ Re-enter the wizard any time via **Settings → Developer → Reset wizard**.

 | **Company research briefs** | Free with LLM¹ |
 | **Interview prep & practice Q&A** | Free with LLM¹ |
 | **Survey assistant** (culture-fit Q&A, screenshot analysis) | Free with LLM¹ |
-| **AI wizard helpers** (career summary, bullet expansion, skill suggestions) | Free with LLM¹ |
+| **Wizard helpers** (career summary, bullet expansion, skill suggestions, job title suggestions, mission notes) | Free with LLM¹ |
 | Managed cloud LLM (no API key needed) | Paid |
 | Email sync & auto-classification | Paid |
 | LLM-powered keyword blocklist | Paid |
 | Job tracking integrations (Notion, Airtable, Google Sheets) | Paid |
 | Calendar sync (Google, Apple) | Paid |
 | Slack notifications | Paid |
 | CircuitForge shared cover-letter model | Paid |
 | Vue 3 SPA beta UI | Paid |
 | **Voice guidelines** (custom writing style & tone) | Premium with LLM¹ ² |
 | Cover letter model fine-tuning (your writing, your model) | Premium |
 | Multi-user support | Premium |

-¹ **BYOK unlock:** configure any LLM backend — a local [Ollama](https://ollama.com) or vLLM instance,
-or your own API key (Anthropic, OpenAI-compatible) — and all AI features marked **Free with LLM**
+¹ **BYOK (bring your own key/backend) unlock:** configure any LLM backend — a local [Ollama](https://ollama.com) or vLLM instance,
+or your own API key (Anthropic, OpenAI-compatible) — and all features marked **Free with LLM** or **Premium with LLM**
 unlock at no charge. The paid tier earns its price by providing managed cloud inference so you
 don't need a key at all, plus integrations and email sync.

 ² **Voice guidelines** requires Premium tier without a configured LLM backend. With BYOK, it unlocks at any tier.

 ---

 ## Email Sync
```
```diff
@@ -201,6 +223,6 @@ Full documentation at: https://docs.circuitforge.tech/peregrine

 ## License

 Core discovery pipeline: [MIT](LICENSE-MIT)
-AI features (cover letter generation, company research, interview prep, UI): [BSL 1.1](LICENSE-BSL)
+LLM features (cover letter generation, company research, interview prep, UI): [BSL 1.1](LICENSE-BSL)

 © 2026 Circuit Forge LLC
```
```diff
@@ -1,4 +1,4 @@
-name: job-seeker
+name: cf
 # Recreate: conda env create -f environment.yml
 # Update pinned snapshot: conda env export --no-builds > environment.yml
 channels:
```
```diff
@@ -94,7 +94,7 @@ case "$CMD" in

   models)
     info "Checking ollama models..."
-    conda run -n job-seeker python scripts/preflight.py --models-only
+    conda run -n cf python scripts/preflight.py --models-only
     success "Model check complete."
     ;;
```
```diff
@@ -190,7 +190,7 @@ case "$CMD" in

       RUNNER=""
     fi
     info "Running E2E tests (mode=${MODE}, headless=${HEADLESS})..."
-    $RUNNER conda run -n job-seeker pytest tests/e2e/ \
+    $RUNNER conda run -n cf pytest tests/e2e/ \
       --mode="${MODE}" \
       --json-report \
      --json-report-file="${RESULTS_DIR}/report.json" \
```
```diff
@@ -9,10 +9,13 @@ and marks the task completed or failed.

 Deduplication: only one queued/running task per (task_type, job_id) is allowed.
 Different task types for the same job run concurrently (e.g. cover letter + research).
 """
+import logging
 import sqlite3
 import threading
 from pathlib import Path

+log = logging.getLogger(__name__)
+
 from scripts.db import (
     DEFAULT_DB,
     insert_task,
```
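A note on the deduplication rule in the docstring above: only one queued/running task may exist per `(task_type, job_id)`, while different task types for the same job may run side by side. A minimal sketch of that check (the table name, column names, and helper here are illustrative, not Peregrine's actual schema):

```python
import sqlite3

def is_duplicate(conn: sqlite3.Connection, task_type: str, job_id: int) -> bool:
    """True if a queued or running task already exists for (task_type, job_id)."""
    row = conn.execute(
        "SELECT 1 FROM tasks WHERE task_type=? AND job_id=? "
        "AND status IN ('queued', 'running') LIMIT 1",
        (task_type, job_id),
    ).fetchone()
    return row is not None

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE tasks (task_type TEXT, job_id INTEGER, status TEXT)")
conn.execute("INSERT INTO tasks VALUES ('cover_letter', 1, 'queued')")
```

With this seed row, a second `cover_letter` for job 1 is a duplicate, but a `company_research` task for the same job is not.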
```diff
@@ -20,6 +23,7 @@ from scripts.db import (
     update_task_stage,
     update_cover_letter,
     save_research,
+    save_optimized_resume,
 )
```
```diff
@@ -39,9 +43,13 @@ def submit_task(db_path: Path = DEFAULT_DB, task_type: str = "",
     if is_new:
         from scripts.task_scheduler import get_scheduler, LLM_TASK_TYPES
         if task_type in LLM_TASK_TYPES:
-            get_scheduler(db_path, run_task_fn=_run_task).enqueue(
+            enqueued = get_scheduler(db_path, run_task_fn=_run_task).enqueue(
                 task_id, task_type, job_id or 0, params
             )
+            if not enqueued:
+                update_task_status(
+                    db_path, task_id, "failed", error="Queue depth limit reached"
+                )
         else:
             t = threading.Thread(
                 target=_run_task,
```
```diff
@@ -261,6 +269,48 @@ def _run_task(db_path: Path, task_id: int, task_type: str, job_id: int,
         )
         return

+    elif task_type == "resume_optimize":
+        import json as _json
+        from scripts.resume_parser import structure_resume
+        from scripts.resume_optimizer import (
+            extract_jd_signals,
+            prioritize_gaps,
+            rewrite_for_ats,
+            hallucination_check,
+            render_resume_text,
+        )
+        from scripts.user_profile import load_user_profile
+
+        description = job.get("description", "")
+        resume_path = load_user_profile().get("resume_path", "")
+
+        # Parse the candidate's resume
+        update_task_stage(db_path, task_id, "parsing resume")
+        resume_text = Path(resume_path).read_text(errors="replace") if resume_path else ""
+        resume_struct, parse_err = structure_resume(resume_text)
+
+        # Extract keyword gaps and build gap report (free tier)
+        update_task_stage(db_path, task_id, "extracting keyword gaps")
+        gaps = extract_jd_signals(description, resume_text)
+        prioritized = prioritize_gaps(gaps, resume_struct)
+        gap_report = _json.dumps(prioritized, indent=2)
+
+        # Full rewrite (paid tier only)
+        rewritten_text = ""
+        p = _json.loads(params or "{}")
+        if p.get("full_rewrite", False):
+            update_task_stage(db_path, task_id, "rewriting resume sections")
+            candidate_voice = load_user_profile().get("candidate_voice", "")
+            rewritten = rewrite_for_ats(resume_struct, prioritized, job, candidate_voice)
+            if hallucination_check(resume_struct, rewritten):
+                rewritten_text = render_resume_text(rewritten)
+            else:
+                log.warning("[task_runner] resume_optimize hallucination check failed for job %d", job_id)
+
+        save_optimized_resume(db_path, job_id=job_id,
+                              text=rewritten_text,
+                              gap_report=gap_report)
+
     elif task_type == "prepare_training":
         from scripts.prepare_training_data import build_records, write_jsonl, DEFAULT_OUTPUT
         records = build_records()
```
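The `resume_optimize` branch above gates the full rewrite behind `hallucination_check` before rendering. The real check lives in `scripts.resume_optimizer` and is not shown in this diff; purely as an illustration of the idea (my sketch, not the shipped logic), a rewrite can be rejected when it introduces capitalized tokens, a crude proxy for employers, products, or skills, that never appear in the source resume:

```python
import re

def naive_hallucination_check(original: str, rewritten: str) -> bool:
    """Pass only if every capitalized token in the rewrite already appears
    (case-insensitively) somewhere in the original resume text."""
    orig_words = {w.lower() for w in re.findall(r"[A-Za-z0-9+#]+", original)}
    new_caps = re.findall(r"\b[A-Z][A-Za-z0-9+#]{2,}\b", rewritten)
    return all(w.lower() in orig_words for w in new_caps)

original = "Led Kubernetes migration at Acme. Python, Terraform."
good = "Led the Acme migration to Kubernetes; strengthened Python and Terraform automation."
bad = "Led Kubernetes migration at Google. Python, Terraform."
```

Here `good` reuses only entities from the original, while `bad` invents an employer ("Google") and is rejected. A real check would compare structured sections, not raw tokens.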
```diff
@@ -1,232 +1,176 @@
 # scripts/task_scheduler.py
-"""Resource-aware batch scheduler for LLM background tasks.
+"""Peregrine LLM task scheduler — thin shim over circuitforge_core.tasks.scheduler.

-Routes LLM task types through per-type deques with VRAM-aware scheduling.
-Non-LLM tasks bypass this module — routing lives in scripts/task_runner.py.
+All scheduling logic lives in circuitforge_core. This module defines
+Peregrine-specific task types, VRAM budgets, and config loading.

-Public API:
-    LLM_TASK_TYPES — set of task type strings routed through the scheduler
-    get_scheduler() — lazy singleton accessor
-    reset_scheduler() — test teardown only
+Public API (unchanged — callers do not need to change):
+    LLM_TASK_TYPES — frozenset of task type strings routed through the scheduler
+    DEFAULT_VRAM_BUDGETS — dict of conservative peak VRAM estimates per task type
+    TaskSpec — lightweight task descriptor (re-exported from core)
+    TaskScheduler — backward-compatible wrapper around the core scheduler class
+    get_scheduler() — returns the process-level TaskScheduler singleton
+    reset_scheduler() — test teardown only
 """
```
```diff
 from __future__ import annotations

 import logging
 import sqlite3
 import threading
-from collections import deque, namedtuple
 from pathlib import Path
 from typing import Callable, Optional

-# Module-level import so tests can monkeypatch scripts.task_scheduler._get_gpus
-try:
-    from scripts.preflight import get_gpus as _get_gpus
-except Exception:  # graceful degradation if preflight unavailable
-    _get_gpus = lambda: []
+from circuitforge_core.tasks.scheduler import (
+    TaskSpec,  # re-export unchanged
+    TaskScheduler as _CoreTaskScheduler,
+)
```
```diff
 logger = logging.getLogger(__name__)

-# Task types that go through the scheduler (all others spawn free threads)
+# ── Peregrine task types and VRAM budgets ─────────────────────────────────────

 LLM_TASK_TYPES: frozenset[str] = frozenset({
     "cover_letter",
     "company_research",
     "wizard_generate",
     "resume_optimize",
 })

 # Conservative peak VRAM estimates (GB) per task type.
 # Overridable per-install via scheduler.vram_budgets in config/llm.yaml.
 DEFAULT_VRAM_BUDGETS: dict[str, float] = {
-    "cover_letter": 2.5,      # alex-cover-writer:latest (~2GB GGUF + headroom)
+    "cover_letter": 2.5,      # alex-cover-writer:latest (~2 GB GGUF + headroom)
     "company_research": 5.0,  # llama3.1:8b or vllm model
     "wizard_generate": 2.5,   # same model family as cover_letter
     "resume_optimize": 5.0,   # section-by-section rewrite; same budget as research
 }

-# Lightweight task descriptor stored in per-type deques
-TaskSpec = namedtuple("TaskSpec", ["id", "job_id", "params"])
+_DEFAULT_MAX_QUEUE_DEPTH = 500
```
```diff
-class TaskScheduler:
-    """Resource-aware LLM task batch scheduler. Use get_scheduler() — not direct construction."""
+def _load_config_overrides(db_path: Path) -> tuple[dict[str, float], int]:
+    """Load VRAM budget overrides and max_queue_depth from config/llm.yaml."""
+    budgets = dict(DEFAULT_VRAM_BUDGETS)
+    max_depth = _DEFAULT_MAX_QUEUE_DEPTH
+    config_path = db_path.parent.parent / "config" / "llm.yaml"
+    if config_path.exists():
+        try:
+            import yaml
+            with open(config_path) as f:
+                cfg = yaml.safe_load(f) or {}
+            sched_cfg = cfg.get("scheduler", {})
+            budgets.update(sched_cfg.get("vram_budgets", {}))
+            max_depth = int(sched_cfg.get("max_queue_depth", max_depth))
+        except Exception as exc:
+            logger.warning(
+                "Failed to load scheduler config from %s: %s", config_path, exc
+            )
+    return budgets, max_depth
```
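`_load_config_overrides` above reads a `scheduler` section from `config/llm.yaml`, merging `vram_budgets` over the defaults and taking `max_queue_depth` if present. A fragment like this (the values are illustrative) would override two budgets and the queue cap:

```yaml
scheduler:
  vram_budgets:
    company_research: 8.0   # this install runs a larger research model
    resume_optimize: 6.5
  max_queue_depth: 200      # per task type; enqueues beyond this are failed
```

Any task type not listed keeps its `DEFAULT_VRAM_BUDGETS` value, and a missing or malformed file falls back to the defaults with a logged warning.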
```diff
+# Module-level stub so tests can monkeypatch scripts.task_scheduler._get_gpus
+# (existing tests monkeypatch this symbol — keep it here for backward compat).
+try:
+    from scripts.preflight import get_gpus as _get_gpus
+except Exception:
+    _get_gpus = lambda: []  # noqa: E731


+class TaskScheduler(_CoreTaskScheduler):
+    """Peregrine-specific TaskScheduler.
+
+    Extends circuitforge_core.tasks.scheduler.TaskScheduler with:
+      - Peregrine default VRAM budgets and task types wired into __init__
+      - Config loading from config/llm.yaml
+      - Backward-compatible two-argument __init__ signature (db_path, run_task_fn)
+      - _get_gpus monkeypatch support (existing tests patch this module-level symbol)
+      - Backward-compatible enqueue() that marks dropped tasks failed in the DB
+        and logs under the scripts.task_scheduler logger
+
+    Direct construction is still supported for tests; production code should
+    use get_scheduler() instead.
+    """
```
```diff
     def __init__(self, db_path: Path, run_task_fn: Callable) -> None:
         self._db_path = db_path
         self._run_task = run_task_fn
+        budgets, max_depth = _load_config_overrides(db_path)

-        self._lock = threading.Lock()
-        self._wake = threading.Event()
-        self._stop = threading.Event()
-        self._queues: dict[str, deque] = {}
-        self._active: dict[str, threading.Thread] = {}
-        self._reserved_vram: float = 0.0
-        self._thread: Optional[threading.Thread] = None
+        # Resolve VRAM using module-level _get_gpus so tests can monkeypatch it
+        try:
+            gpus = _get_gpus()
+            available_vram: float = (
+                sum(g["vram_total_gb"] for g in gpus) if gpus else 999.0
+            )
+        except Exception:
+            available_vram = 999.0

-        # Load VRAM budgets: defaults + optional config overrides
-        self._budgets: dict[str, float] = dict(DEFAULT_VRAM_BUDGETS)
-        config_path = db_path.parent.parent / "config" / "llm.yaml"
-        self._max_queue_depth: int = 500
-        if config_path.exists():
-            try:
-                import yaml
-                with open(config_path) as f:
-                    cfg = yaml.safe_load(f) or {}
-                sched_cfg = cfg.get("scheduler", {})
-                self._budgets.update(sched_cfg.get("vram_budgets", {}))
-                self._max_queue_depth = sched_cfg.get("max_queue_depth", 500)
-            except Exception as exc:
-                logger.warning("Failed to load scheduler config from %s: %s", config_path, exc)

-        # Warn on LLM types with no budget entry after merge
+        # Warn under this module's logger for any task types with no VRAM budget
+        # (mirrors the core warning but captures under scripts.task_scheduler
+        # so existing tests using caplog.at_level(logger="scripts.task_scheduler") pass)
         for t in LLM_TASK_TYPES:
-            if t not in self._budgets:
+            if t not in budgets:
                 logger.warning(
                     "No VRAM budget defined for LLM task type %r — "
                     "defaulting to 0.0 GB (unlimited concurrency for this type)", t
                 )

-        # Detect total GPU VRAM; fall back to unlimited (999) on CPU-only systems.
-        # Uses module-level _get_gpus so tests can monkeypatch scripts.task_scheduler._get_gpus.
-        try:
-            gpus = _get_gpus()
-            self._available_vram: float = (
-                sum(g["vram_total_gb"] for g in gpus) if gpus else 999.0
-            )
-        except Exception:
-            self._available_vram = 999.0
+        super().__init__(
+            db_path=db_path,
+            run_task_fn=run_task_fn,
+            task_types=LLM_TASK_TYPES,
+            vram_budgets=budgets,
+            available_vram_gb=available_vram,
+            max_queue_depth=max_depth,
+        )

         # Durability: reload surviving 'queued' LLM tasks from prior run
         self._load_queued_tasks()
```
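The deleted loop below (and, per the shim docstring above, the core scheduler that replaces it) admits a new batch only when its budget fits in the remaining VRAM, with a starvation guard: when nothing is reserved, one batch always starts even if its budget exceeds total VRAM. Reduced to a pure function, this is my sketch of that policy, not the core implementation:

```python
def can_admit(reserved_gb: float, budget_gb: float, available_gb: float) -> bool:
    """Admit a batch if its budget fits in the remaining VRAM, or
    unconditionally when nothing is running (prevents permanent starvation
    of task types whose budget exceeds the total VRAM ceiling)."""
    return reserved_gb == 0.0 or reserved_gb + budget_gb <= available_gb
```

On a hypothetical 8 GB card: a 5 GB research batch admits; a second 5 GB batch must wait; a 12 GB batch still admits when the card is idle.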
```diff
-    def enqueue(self, task_id: int, task_type: str, job_id: int,
-                params: Optional[str]) -> None:
+    def enqueue(
+        self,
+        task_id: int,
+        task_type: str,
+        job_id: int,
+        params: Optional[str],
+    ) -> bool:
         """Add an LLM task to the scheduler queue.

-        If the queue for this type is at max_queue_depth, the task is marked
-        failed in SQLite immediately (no ghost queued rows) and a warning is logged.
+        When the queue is full, marks the task failed in SQLite immediately
+        (backward-compatible with the original Peregrine behavior) and logs a
+        warning under the scripts.task_scheduler logger.
+
+        Returns True if enqueued, False if the queue was full.
         """
-        from scripts.db import update_task_status
-
-        with self._lock:
-            q = self._queues.setdefault(task_type, deque())
-            if len(q) >= self._max_queue_depth:
-                logger.warning(
-                    "Queue depth limit reached for %s (max=%d) — task %d dropped",
-                    task_type, self._max_queue_depth, task_id,
-                )
-                update_task_status(self._db_path, task_id, "failed",
-                                   error="Queue depth limit reached")
-                return
-            q.append(TaskSpec(task_id, job_id, params))
-
-        self._wake.set()
```
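Both `enqueue` variants above cap per-type queue depth; the new signature reports a drop via a boolean so the caller (`scripts/task_runner.submit_task`) can mark the task failed instead of leaving a ghost queued row. The shape of that contract in miniature (illustrative, not the shim code):

```python
from collections import deque

class BoundedQueues:
    """Per-type FIFO queues with a shared depth cap, as in the scheduler."""

    def __init__(self, max_depth: int) -> None:
        self.max_depth = max_depth
        self.queues: dict[str, deque] = {}

    def enqueue(self, task_type: str, task_id: int) -> bool:
        """Return True if queued, False if this type's queue is already full."""
        q = self.queues.setdefault(task_type, deque())
        if len(q) >= self.max_depth:
            return False  # caller is responsible for marking the task failed
        q.append(task_id)
        return True

qs = BoundedQueues(max_depth=2)
results = [qs.enqueue("cover_letter", i) for i in range(3)]  # third one is dropped
```

Note the cap is per task type: a full `cover_letter` queue does not block `company_research` enqueues.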
```diff
-    def start(self) -> None:
-        """Start the background scheduler loop thread. Call once after construction."""
-        self._thread = threading.Thread(
-            target=self._scheduler_loop, name="task-scheduler", daemon=True
-        )
-        self._thread.start()

-    def shutdown(self, timeout: float = 5.0) -> None:
-        """Signal the scheduler to stop and wait for it to exit."""
-        self._stop.set()
-        self._wake.set()  # unblock any wait()
-        if self._thread and self._thread.is_alive():
-            self._thread.join(timeout=timeout)

-    def _scheduler_loop(self) -> None:
-        """Main scheduler daemon — wakes on enqueue or batch completion."""
-        while not self._stop.is_set():
-            self._wake.wait(timeout=30)
-            self._wake.clear()
-
-            with self._lock:
-                # Defense in depth: reap externally-killed batch threads.
-                # In normal operation _active.pop() runs in finally before _wake fires,
-                # so this reap finds nothing — no double-decrement risk.
-                for t, thread in list(self._active.items()):
-                    if not thread.is_alive():
-                        self._reserved_vram -= self._budgets.get(t, 0.0)
-                        del self._active[t]
-
-                # Start new type batches while VRAM allows
-                candidates = sorted(
-                    [t for t in self._queues if self._queues[t] and t not in self._active],
-                    key=lambda t: len(self._queues[t]),
-                    reverse=True,
-                )
-                for task_type in candidates:
-                    budget = self._budgets.get(task_type, 0.0)
-                    # Always allow at least one batch to run even if its budget
-                    # exceeds _available_vram (prevents permanent starvation when
-                    # a single type's budget is larger than the VRAM ceiling).
-                    if self._reserved_vram == 0.0 or self._reserved_vram + budget <= self._available_vram:
-                        thread = threading.Thread(
-                            target=self._batch_worker,
-                            args=(task_type,),
-                            name=f"batch-{task_type}",
-                            daemon=True,
-                        )
-                        self._active[task_type] = thread
-                        self._reserved_vram += budget
-                        thread.start()

-    def _batch_worker(self, task_type: str) -> None:
-        """Serial consumer for one task type. Runs until the type's deque is empty."""
-        try:
-            while True:
-                with self._lock:
-                    q = self._queues.get(task_type)
-                    if not q:
-                        break
-                    task = q.popleft()
-                # _run_task is scripts.task_runner._run_task (passed at construction)
-                self._run_task(
-                    self._db_path, task.id, task_type, task.job_id, task.params
-                )
-        finally:
-            # Always release — even if _run_task raises.
-            # _active.pop here prevents the scheduler loop reap from double-decrementing.
-            with self._lock:
-                self._active.pop(task_type, None)
-                self._reserved_vram -= self._budgets.get(task_type, 0.0)
-            self._wake.set()

-    def _load_queued_tasks(self) -> None:
-        """Load pre-existing queued LLM tasks from SQLite into deques (called once in __init__)."""
-        llm_types = sorted(LLM_TASK_TYPES)  # sorted for deterministic SQL params in logs
-        placeholders = ",".join("?" * len(llm_types))
-        conn = sqlite3.connect(self._db_path)
-        rows = conn.execute(
-            f"SELECT id, task_type, job_id, params FROM background_tasks"
-            f" WHERE status='queued' AND task_type IN ({placeholders})"
-            f" ORDER BY created_at ASC",
-            llm_types,
-        ).fetchall()
-        conn.close()
-
-        for row_id, task_type, job_id, params in rows:
-            q = self._queues.setdefault(task_type, deque())
-            q.append(TaskSpec(row_id, job_id, params))
-
-        if rows:
-            logger.info("Scheduler: resumed %d queued task(s) from prior run", len(rows))
+        enqueued = super().enqueue(task_id, task_type, job_id, params)
+        if not enqueued:
+            # Log under this module's logger so existing caplog tests pass
+            logger.warning(
+                "Queue depth limit reached for %s (max=%d) — task %d dropped",
+                task_type, self._max_queue_depth, task_id,
+            )
+            from scripts.db import update_task_status
+            update_task_status(
+                self._db_path, task_id, "failed", error="Queue depth limit reached"
+            )
+        return enqueued
```
```diff
-# ── Singleton ─────────────────────────────────────────────────────────────────
+# ── Peregrine-local singleton ──────────────────────────────────────────────────
+# We manage our own singleton (not the core one) so the process-level instance
+# is always a Peregrine TaskScheduler (with the enqueue() override).

 _scheduler: Optional[TaskScheduler] = None
 _scheduler_lock = threading.Lock()


-def get_scheduler(db_path: Path, run_task_fn: Callable = None) -> TaskScheduler:
-    """Return the process-level TaskScheduler singleton, constructing it if needed.
+def get_scheduler(
+    db_path: Path,
+    run_task_fn: Optional[Callable] = None,
+) -> TaskScheduler:
+    """Return the process-level Peregrine TaskScheduler singleton.

-    run_task_fn is required on the first call; ignored on subsequent calls.
-    Safety: inner lock + double-check prevents double-construction under races.
-    The outer None check is a fast-path performance optimisation only.
+    run_task_fn is required on the first call; ignored on subsequent calls
+    (double-checked locking — singleton already constructed).
     """
     global _scheduler
-    if _scheduler is None:  # fast path — avoids lock on steady state
+    if _scheduler is None:  # fast path — no lock on steady state
         with _scheduler_lock:
-            if _scheduler is None:  # re-check under lock (double-checked locking)
+            if _scheduler is None:  # re-check under lock
                 if run_task_fn is None:
                     raise ValueError("run_task_fn required on first get_scheduler() call")
                 _scheduler = TaskScheduler(db_path, run_task_fn)
```
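`get_scheduler` above uses double-checked locking: a lock-free fast path once the singleton exists, then a re-check under the lock so two racing first callers cannot both construct. The pattern in isolation (a generic sketch, not the Peregrine accessor):

```python
import threading

_instance = None
_instance_lock = threading.Lock()

def get_instance(factory):
    """Process-level singleton via double-checked locking."""
    global _instance
    if _instance is None:            # fast path: no lock once constructed
        with _instance_lock:
            if _instance is None:    # loser of the race reuses the winner's object
                _instance = factory()
    return _instance

a = get_instance(object)  # first call constructs
b = get_instance(object)  # fast path; factory is not called again
```

The outer check is purely a performance optimization; correctness comes entirely from the re-check under the lock.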
```diff
@@ -470,3 +470,14 @@ def test_llm_tasks_routed_to_scheduler(tmp_db):
     task_runner.submit_task(tmp_db, "cover_letter", 1)

     assert "cover_letter" in enqueue_calls


+def test_shim_exports_unchanged_api():
+    """Peregrine shim must re-export LLM_TASK_TYPES, get_scheduler, reset_scheduler."""
+    from scripts.task_scheduler import LLM_TASK_TYPES, get_scheduler, reset_scheduler
+    assert "cover_letter" in LLM_TASK_TYPES
+    assert "company_research" in LLM_TASK_TYPES
+    assert "wizard_generate" in LLM_TASK_TYPES
+    assert "resume_optimize" in LLM_TASK_TYPES
+    assert callable(get_scheduler)
+    assert callable(reset_scheduler)
```