diff --git a/.gitignore b/.gitignore
index fd29395..a9e2b5e 100644
--- a/.gitignore
+++ b/.gitignore
@@ -35,6 +35,9 @@ config/user.yaml.working
 # Claude context files — kept out of version control
 CLAUDE.md
+.superpowers/
+pytest-output.txt
+docs/superpowers/
 data/email_score.jsonl
 data/email_label_queue.jsonl
diff --git a/docs/superpowers/plans/2026-03-14-llm-queue-optimizer.md b/docs/superpowers/plans/2026-03-14-llm-queue-optimizer.md
deleted file mode 100644
index ef0dfbf..0000000
--- a/docs/superpowers/plans/2026-03-14-llm-queue-optimizer.md
+++ /dev/null
@@ -1,1306 +0,0 @@
-# LLM Queue Optimizer Implementation Plan
-
-> **For agentic workers:** REQUIRED: Use superpowers:subagent-driven-development (if subagents available) or superpowers:executing-plans to implement this plan. Steps use checkbox (`- [ ]`) syntax for tracking.
-
-**Goal:** Replace Peregrine's spawn-per-task LLM threading model with a resource-aware batch scheduler that groups tasks by model type, respects VRAM budgets, and survives process restarts.
-
-**Architecture:** A new `TaskScheduler` singleton (in `scripts/task_scheduler.py`) maintains per-type deques for LLM tasks (`cover_letter`, `company_research`, `wizard_generate`). A scheduler daemon thread picks the deepest queue that fits in available VRAM and runs it serially; multiple type batches may overlap when VRAM allows. Non-LLM tasks (`discovery`, `email_sync`, etc.) continue to spawn free threads unchanged. On restart, `queued` LLM tasks are re-loaded from SQLite; only `running` tasks (results unknown) are reset to `failed`.
-
-**Tech Stack:** Python 3.12, SQLite (via `scripts/db.py`), `threading`, `collections.deque`, `scripts/preflight.py` (VRAM detection), pytest
-
-**Spec:** `docs/superpowers/specs/2026-03-14-llm-queue-optimizer-design.md`
-
-**Worktree:** `/Library/Development/CircuitForge/peregrine/.worktrees/feature-llm-queue-optimizer/`
-
-**All commands run from worktree root.** Pytest: `/devl/miniconda3/envs/job-seeker/bin/pytest`
-
----
-
-## Chunk 1: Foundation
-
-Tasks 1–3. DB helper, config update, and skeleton module. No threading yet.
-
----
-
-### Task 1: `reset_running_tasks()` in `scripts/db.py`
-
-Adds a focused restart-safe helper that resets only `running` tasks to `failed`, leaving `queued` rows untouched for the scheduler to resume.
-
-**Files:**
-- Modify: `scripts/db.py` (after `kill_stuck_tasks()`, ~line 367)
-- Create: `tests/test_task_scheduler.py` (first test)
-
-- [ ] **Step 1: Create the test file with the first failing test**
-
-Create `tests/test_task_scheduler.py`:
-
-```python
-# tests/test_task_scheduler.py
-"""Tests for scripts/task_scheduler.py and related db helpers."""
-import sqlite3
-import threading
-import time
-from collections import deque
-from pathlib import Path
-
-import pytest
-
-from scripts.db import init_db, reset_running_tasks
-
-
-@pytest.fixture
-def tmp_db(tmp_path):
-    db = tmp_path / "test.db"
-    init_db(db)
-    return db
-
-
-def test_reset_running_tasks_resets_only_running(tmp_db):
-    """reset_running_tasks() marks running→failed but leaves queued untouched."""
-    conn = sqlite3.connect(tmp_db)
-    conn.execute(
-        "INSERT INTO background_tasks (task_type, job_id, status) VALUES (?,?,?)",
-        ("cover_letter", 1, "running"),
-    )
-    conn.execute(
-        "INSERT INTO background_tasks (task_type, job_id, status) VALUES (?,?,?)",
-        ("company_research", 2, "queued"),
-    )
-    conn.commit()
-    conn.close()
-
-    count = reset_running_tasks(tmp_db)
-
-    conn = sqlite3.connect(tmp_db)
-    rows = {r[0]: r[1] for r in conn.execute(
-        "SELECT task_type, status FROM background_tasks"
-    ).fetchall()}
-    conn.close()
-
-    assert count == 1
-    assert rows["cover_letter"] == "failed"
-    assert rows["company_research"] == "queued"
-
-
-def test_reset_running_tasks_returns_zero_when_nothing_running(tmp_db):
-    """Returns 0 when no running tasks exist."""
-    conn = sqlite3.connect(tmp_db)
-    conn.execute(
-        "INSERT INTO background_tasks (task_type, job_id, status) VALUES (?,?,?)",
-        ("cover_letter", 1, "queued"),
-    )
-    conn.commit()
-    conn.close()
-
-    assert reset_running_tasks(tmp_db) == 0
-```
-
-- [ ] **Step 2: Run tests to confirm they fail**
-
-```bash
-/devl/miniconda3/envs/job-seeker/bin/pytest tests/test_task_scheduler.py -v
-```
-
-Expected: `ImportError: cannot import name 'reset_running_tasks' from 'scripts.db'`
-
-- [ ] **Step 3: Add `reset_running_tasks()` to `scripts/db.py`**
-
-Insert after `kill_stuck_tasks()` (~line 367):
-
-```python
-def reset_running_tasks(db_path: Path = DEFAULT_DB) -> int:
-    """On restart: mark in-flight tasks failed. Queued tasks survive for the scheduler."""
-    conn = sqlite3.connect(db_path)
-    count = conn.execute(
-        "UPDATE background_tasks SET status='failed', error='Interrupted by restart',"
-        " finished_at=datetime('now') WHERE status='running'"
-    ).rowcount
-    conn.commit()
-    conn.close()
-    return count
-```
-
-- [ ] **Step 4: Run tests to confirm they pass**
-
-```bash
-/devl/miniconda3/envs/job-seeker/bin/pytest tests/test_task_scheduler.py -v
-```
-
-Expected: `2 passed`
-
-- [ ] **Step 5: Commit**
-
-```bash
-git add scripts/db.py tests/test_task_scheduler.py
-git commit -m "feat(db): add reset_running_tasks() for durable scheduler restart"
-```
-
----
-
-### Task 2: Add `scheduler:` section to `config/llm.yaml.example`
-
-Documents VRAM budgets so operators know what to configure.
-
-**Files:**
-- Modify: `config/llm.yaml.example` (append at end)
-
-- [ ] **Step 1: Append scheduler config section**
-
-Add to the end of `config/llm.yaml.example`:
-
-```yaml
-
-# ── Scheduler — LLM batch queue optimizer ─────────────────────────────────────
-# The scheduler batches LLM tasks by model type to avoid GPU model switching.
-# VRAM budgets are conservative peak estimates (GB) for each task type.
-# Increase if your models are larger; decrease if tasks share GPU memory well.
-scheduler:
-  vram_budgets:
-    cover_letter: 2.5       # alex-cover-writer:latest (~2GB GGUF + headroom)
-    company_research: 5.0   # llama3.1:8b or vllm model
-    wizard_generate: 2.5    # same model family as cover_letter
-  max_queue_depth: 500      # max pending tasks per type before drops (with logged warning)
-```
-
-- [ ] **Step 2: Verify the file is valid YAML**
-
-```bash
-conda run -n job-seeker python -c "import yaml; yaml.safe_load(open('config/llm.yaml.example'))"
-```
-
-Expected: no output (no error)
-
-- [ ] **Step 3: Commit**
-
-```bash
-git add config/llm.yaml.example
-git commit -m "docs(config): add scheduler VRAM budget config to llm.yaml.example"
-```
-
----
-
-### Task 3: Create `scripts/task_scheduler.py` skeleton
-
-Establishes the module with constants, `TaskSpec`, and an empty `TaskScheduler` class. Subsequent tasks fill in the implementation method by method under TDD.
-
-**Files:**
-- Create: `scripts/task_scheduler.py`
-
-- [ ] **Step 1: Create the skeleton file**
-
-Create `scripts/task_scheduler.py`:
-
-```python
-# scripts/task_scheduler.py
-"""Resource-aware batch scheduler for LLM background tasks.
-
-Routes LLM task types through per-type deques with VRAM-aware scheduling.
-Non-LLM tasks bypass this module — routing lives in scripts/task_runner.py.
-
-Public API:
-    LLM_TASK_TYPES    — set of task type strings routed through the scheduler
-    get_scheduler()   — lazy singleton accessor
-    reset_scheduler() — test teardown only
-"""
-import logging
-import sqlite3
-import threading
-from collections import deque, namedtuple
-from pathlib import Path
-from typing import Callable, Optional
-
-# Module-level import so tests can monkeypatch scripts.task_scheduler._get_gpus
-try:
-    from scripts.preflight import get_gpus as _get_gpus
-except Exception:  # graceful degradation if preflight unavailable
-    _get_gpus = lambda: []
-
-logger = logging.getLogger(__name__)
-
-# Task types that go through the scheduler (all others spawn free threads)
-LLM_TASK_TYPES: frozenset[str] = frozenset({
-    "cover_letter",
-    "company_research",
-    "wizard_generate",
-})
-
-# Conservative peak VRAM estimates (GB) per task type.
-# Overridable per-install via scheduler.vram_budgets in config/llm.yaml.
-DEFAULT_VRAM_BUDGETS: dict[str, float] = {
-    "cover_letter": 2.5,      # alex-cover-writer:latest (~2GB GGUF + headroom)
-    "company_research": 5.0,  # llama3.1:8b or vllm model
-    "wizard_generate": 2.5,   # same model family as cover_letter
-}
-
-# Lightweight task descriptor stored in per-type deques
-TaskSpec = namedtuple("TaskSpec", ["id", "job_id", "params"])
-
-
-class TaskScheduler:
-    """Resource-aware LLM task batch scheduler. Use get_scheduler() — not direct construction."""
-    pass
-
-
-# ── Singleton ─────────────────────────────────────────────────────────────────
-
-_scheduler: Optional[TaskScheduler] = None
-_scheduler_lock = threading.Lock()
-
-
-def get_scheduler(db_path: Path, run_task_fn: Optional[Callable] = None) -> TaskScheduler:
-    """Return the process-level TaskScheduler singleton, constructing it if needed.
-
-    run_task_fn is required on the first call (when the singleton is constructed);
-    ignored on subsequent calls. Pass scripts.task_runner._run_task.
-    """
-    raise NotImplementedError
-
-
-def reset_scheduler() -> None:
-    """Shut down and clear the singleton. TEST TEARDOWN ONLY — not for production use."""
-    raise NotImplementedError
-```
-
-- [ ] **Step 2: Verify the module imports cleanly**
-
-```bash
-conda run -n job-seeker python -c "from scripts.task_scheduler import LLM_TASK_TYPES, TaskSpec, TaskScheduler; print('ok')"
-```
-
-Expected: `ok`
-
-- [ ] **Step 3: Commit**
-
-```bash
-git add scripts/task_scheduler.py
-git commit -m "feat(scheduler): add task_scheduler.py skeleton with constants and TaskSpec"
-```
-
----
-
-## Chunk 2: Scheduler Core
-
-Tasks 4–6. Implements `TaskScheduler` method-by-method under TDD: init, enqueue, scheduler loop, and batch workers.
-
----
-
-### Task 4: `TaskScheduler.__init__()` — budget loading and VRAM detection
-
-**Files:**
-- Modify: `scripts/task_scheduler.py` (replace `pass` in class body)
-- Modify: `tests/test_task_scheduler.py` (add tests)
-
-- [ ] **Step 1: Add failing tests**
-
-Append to `tests/test_task_scheduler.py`:
-
-```python
-from scripts.task_scheduler import (
-    TaskScheduler, LLM_TASK_TYPES, DEFAULT_VRAM_BUDGETS,
-    get_scheduler, reset_scheduler,
-)
-
-
-def _noop_run_task(*args, **kwargs):
-    """Stand-in for _run_task that does nothing."""
-    pass
-
-
-@pytest.fixture(autouse=True)
-def clean_scheduler():
-    """Reset singleton between every test."""
-    yield
-    reset_scheduler()
-
-
-def test_default_budgets_used_when_no_config(tmp_db):
-    """Scheduler falls back to DEFAULT_VRAM_BUDGETS when config key absent."""
-    s = TaskScheduler(tmp_db, _noop_run_task)
-    assert s._budgets == DEFAULT_VRAM_BUDGETS
-
-
-def test_config_budgets_override_defaults(tmp_db, tmp_path):
-    """Values in llm.yaml scheduler.vram_budgets override defaults."""
-    config_dir = tmp_db.parent.parent / "config"
-    config_dir.mkdir(parents=True, exist_ok=True)
-    (config_dir / "llm.yaml").write_text(
-        "scheduler:\n  vram_budgets:\n    cover_letter: 9.9\n"
-    )
-    s = TaskScheduler(tmp_db, _noop_run_task)
-    assert s._budgets["cover_letter"] == 9.9
-    # Non-overridden keys still use defaults
-    assert s._budgets["company_research"] == DEFAULT_VRAM_BUDGETS["company_research"]
-
-
-def test_missing_budget_logs_warning(tmp_db, caplog):
-    """A type in LLM_TASK_TYPES with no budget entry logs a warning."""
-    import logging
-    # Temporarily add a type with no budget; LLM_TASK_TYPES is an immutable
-    # frozenset, so keeping a reference is enough to restore it later.
-    from scripts import task_scheduler as ts
-    original = LLM_TASK_TYPES
-    ts.LLM_TASK_TYPES = frozenset(LLM_TASK_TYPES | {"orphan_type"})
-    try:
-        with caplog.at_level(logging.WARNING, logger="scripts.task_scheduler"):
-            s = TaskScheduler(tmp_db, _noop_run_task)
-        assert any("orphan_type" in r.message for r in caplog.records)
-    finally:
-        ts.LLM_TASK_TYPES = original
-
-
-def test_cpu_only_system_gets_unlimited_vram(tmp_db, monkeypatch):
-    """_available_vram is 999.0 when _get_gpus() returns empty list."""
-    # Patch the module-level _get_gpus in task_scheduler (not preflight)
-    # so __init__'s _ts_mod._get_gpus() call picks up the mock.
-    monkeypatch.setattr("scripts.task_scheduler._get_gpus", lambda: [])
-    s = TaskScheduler(tmp_db, _noop_run_task)
-    assert s._available_vram == 999.0
-
-
-def test_gpu_vram_summed_across_all_gpus(tmp_db, monkeypatch):
-    """_available_vram sums vram_total_gb across all detected GPUs."""
-    fake_gpus = [
-        {"name": "RTX 3090", "vram_total_gb": 24.0, "vram_free_gb": 20.0},
-        {"name": "RTX 3090", "vram_total_gb": 24.0, "vram_free_gb": 18.0},
-    ]
-    monkeypatch.setattr("scripts.task_scheduler._get_gpus", lambda: fake_gpus)
-    s = TaskScheduler(tmp_db, _noop_run_task)
-    assert s._available_vram == 48.0
-```
-
-- [ ] **Step 2: Run to confirm failures**
-
-```bash
-/devl/miniconda3/envs/job-seeker/bin/pytest tests/test_task_scheduler.py -v -k "budget or vram or warning"
-```
-
-Expected: multiple failures — `TaskScheduler.__init__` not implemented yet
-
-- [ ] **Step 3: Implement `__init__`**
-
-Replace `pass` in the `TaskScheduler` class with:
-
-```python
-def __init__(self, db_path: Path, run_task_fn: Callable) -> None:
-    self._db_path = db_path
-    self._run_task = run_task_fn
-
-    self._lock = threading.Lock()
-    self._wake = threading.Event()
-    self._stop = threading.Event()
-    self._queues: dict[str, deque] = {}
-    self._active: dict[str, threading.Thread] = {}
-    self._reserved_vram: float = 0.0
-    self._thread: Optional[threading.Thread] = None
-
-    # Load VRAM budgets: defaults + optional config overrides
-    self._budgets: dict[str, float] = dict(DEFAULT_VRAM_BUDGETS)
-    config_path = db_path.parent.parent / "config" / "llm.yaml"
-    self._max_queue_depth: int = 500
-    if config_path.exists():
-        try:
-            import yaml
-            with open(config_path) as f:
-                cfg = yaml.safe_load(f) or {}
-            sched_cfg = cfg.get("scheduler", {})
-            self._budgets.update(sched_cfg.get("vram_budgets", {}))
-            self._max_queue_depth = sched_cfg.get("max_queue_depth", 500)
-        except Exception as exc:
-            logger.warning("Failed to load scheduler config from %s: %s", config_path, exc)
-
-    # Warn on LLM types with no budget entry after merge
-    for t in LLM_TASK_TYPES:
-        if t not in self._budgets:
-            logger.warning(
-                "No VRAM budget defined for LLM task type %r — "
-                "defaulting to 0.0 GB (unlimited concurrency for this type)", t
-            )
-
-    # Detect total GPU VRAM; fall back to unlimited (999) on CPU-only systems.
-    # Uses module-level _get_gpus so tests can monkeypatch scripts.task_scheduler._get_gpus.
-    try:
-        from scripts import task_scheduler as _ts_mod
-        gpus = _ts_mod._get_gpus()
-        self._available_vram: float = (
-            sum(g["vram_total_gb"] for g in gpus) if gpus else 999.0
-        )
-    except Exception:
-        self._available_vram = 999.0
-```
-
-- [ ] **Step 4: Run tests to confirm they pass**
-
-```bash
-/devl/miniconda3/envs/job-seeker/bin/pytest tests/test_task_scheduler.py -v -k "budget or vram or warning"
-```
-
-Expected: 5 passed
-
-- [ ] **Step 5: Commit**
-
-```bash
-git add scripts/task_scheduler.py tests/test_task_scheduler.py
-git commit -m "feat(scheduler): implement TaskScheduler.__init__ with budget loading and VRAM detection"
-```
-
----
-
-### Task 5: `TaskScheduler.enqueue()` — depth guard and ghost-row cleanup
-
-**Files:**
-- Modify: `scripts/task_scheduler.py` (add `enqueue` method)
-- Modify: `tests/test_task_scheduler.py` (add tests)
-
-- [ ] **Step 1: Add failing tests**
-
-Append to `tests/test_task_scheduler.py`:
-
-```python
-def test_enqueue_adds_taskspec_to_deque(tmp_db):
-    """enqueue() appends a TaskSpec to the correct per-type deque."""
-    s = TaskScheduler(tmp_db, _noop_run_task)
-    s.enqueue(1, "cover_letter", 10, None)
-    s.enqueue(2, "cover_letter", 11, '{"key": "val"}')
-
-    assert len(s._queues["cover_letter"]) == 2
-    assert s._queues["cover_letter"][0].id == 1
-    assert s._queues["cover_letter"][1].id == 2
-
-
-def test_enqueue_wakes_scheduler(tmp_db):
-    """enqueue() sets the _wake event so the scheduler loop re-evaluates."""
-    s = TaskScheduler(tmp_db, _noop_run_task)
-    assert not s._wake.is_set()
-    s.enqueue(1, "cover_letter", 10, None)
-    assert s._wake.is_set()
-
-
-def test_max_queue_depth_marks_task_failed(tmp_db):
-    """When queue is at max_queue_depth, dropped task is marked failed in DB."""
-    from scripts.db import insert_task
-
-    s = TaskScheduler(tmp_db, _noop_run_task)
-    s._max_queue_depth = 2
-
-    # Fill the queue to the limit via direct deque manipulation (no DB rows needed)
-    from scripts.task_scheduler import TaskSpec
-    s._queues.setdefault("cover_letter", deque())
-    s._queues["cover_letter"].append(TaskSpec(99, 1, None))
-    s._queues["cover_letter"].append(TaskSpec(100, 2, None))
-
-    # Insert a real DB row for the task we're about to drop
-    task_id, _ = insert_task(tmp_db, "cover_letter", 3)
-
-    # This enqueue should be rejected and the DB row marked failed
-    s.enqueue(task_id, "cover_letter", 3, None)
-
-    conn = sqlite3.connect(tmp_db)
-    row = conn.execute(
-        "SELECT status, error FROM background_tasks WHERE id=?", (task_id,)
-    ).fetchone()
-    conn.close()
-
-    assert row[0] == "failed"
-    assert "depth" in row[1].lower()
-    # Queue length unchanged
-    assert len(s._queues["cover_letter"]) == 2
-
-
-def test_max_queue_depth_logs_warning(tmp_db, caplog):
-    """Queue depth overflow logs a WARNING."""
-    import logging
-    from scripts.db import insert_task
-    from scripts.task_scheduler import TaskSpec
-
-    s = TaskScheduler(tmp_db, _noop_run_task)
-    s._max_queue_depth = 0  # immediately at limit
-
-    task_id, _ = insert_task(tmp_db, "cover_letter", 1)
-    with caplog.at_level(logging.WARNING, logger="scripts.task_scheduler"):
-        s.enqueue(task_id, "cover_letter", 1, None)
-
-    assert any("depth" in r.message.lower() for r in caplog.records)
-```
-
-- [ ] **Step 2: Run to confirm failures**
-
-```bash
-/devl/miniconda3/envs/job-seeker/bin/pytest tests/test_task_scheduler.py -v -k "enqueue or depth"
-```
-
-Expected: failures — `enqueue` not defined
-
-- [ ] **Step 3: Implement `enqueue()`**
-
-Add method to `TaskScheduler` (after `__init__`):
-
-```python
-def enqueue(self, task_id: int, task_type: str, job_id: int,
-            params: Optional[str]) -> None:
-    """Add an LLM task to the scheduler queue.
-
-    If the queue for this type is at max_queue_depth, the task is marked
-    failed in SQLite immediately (no ghost queued rows) and a warning is logged.
-    """
-    from scripts.db import update_task_status
-
-    with self._lock:
-        q = self._queues.setdefault(task_type, deque())
-        if len(q) >= self._max_queue_depth:
-            logger.warning(
-                "Queue depth limit reached for %s (max=%d) — task %d dropped",
-                task_type, self._max_queue_depth, task_id,
-            )
-            update_task_status(self._db_path, task_id, "failed",
-                               error="Queue depth limit reached")
-            return
-        q.append(TaskSpec(task_id, job_id, params))
-
-    self._wake.set()
-```
-
-- [ ] **Step 4: Run tests to confirm they pass**
-
-```bash
-/devl/miniconda3/envs/job-seeker/bin/pytest tests/test_task_scheduler.py -v -k "enqueue or depth"
-```
-
-Expected: 4 passed
-
-- [ ] **Step 5: Commit**
-
-```bash
-git add scripts/task_scheduler.py tests/test_task_scheduler.py
-git commit -m "feat(scheduler): implement enqueue() with depth guard and ghost-row cleanup"
-```
-
----
-
-### Task 6: Scheduler loop, batch worker, `start()`, and `shutdown()`
-
-The core execution engine. The scheduler loop picks the deepest eligible queue and starts a serial batch worker for it.
- -**Files:** -- Modify: `scripts/task_scheduler.py` (add `start`, `shutdown`, `_scheduler_loop`, `_batch_worker`) -- Modify: `tests/test_task_scheduler.py` (add threading tests) - -- [ ] **Step 1: Add failing tests** - -Append to `tests/test_task_scheduler.py`: - -```python -# ── Threading helpers ───────────────────────────────────────────────────────── - -def _make_recording_run_task(log: list, done_event: threading.Event, expected: int): - """Returns a mock _run_task that records (task_id, task_type) and sets done when expected count reached.""" - def _run(db_path, task_id, task_type, job_id, params): - log.append((task_id, task_type)) - if len(log) >= expected: - done_event.set() - return _run - - -def _start_scheduler(tmp_db, run_task_fn, available_vram=999.0): - s = TaskScheduler(tmp_db, run_task_fn) - s._available_vram = available_vram - s.start() - return s - - -# ── Tests ───────────────────────────────────────────────────────────────────── - -def test_deepest_queue_wins_first_slot(tmp_db): - """Type with more queued tasks starts first when VRAM only fits one type.""" - log, done = [], threading.Event() - - # Build scheduler but DO NOT start it yet — enqueue all tasks first - # so the scheduler sees the full picture on its very first wake. - run_task_fn = _make_recording_run_task(log, done, 4) - s = TaskScheduler(tmp_db, run_task_fn) - s._available_vram = 3.0 # fits cover_letter (2.5) but not +company_research (5.0) - - # Enqueue cover_letter (3 tasks) and company_research (1 task) before start. - # cover_letter has the deeper queue and must win the first batch slot. 
- for i in range(3): - s.enqueue(i + 1, "cover_letter", i + 1, None) - s.enqueue(4, "company_research", 4, None) - - s.start() # scheduler now sees all tasks atomically on its first iteration - assert done.wait(timeout=5.0), "timed out — not all 4 tasks completed" - s.shutdown() - - assert len(log) == 4 - cl = [i for i, (_, t) in enumerate(log) if t == "cover_letter"] - cr = [i for i, (_, t) in enumerate(log) if t == "company_research"] - assert len(cl) == 3 and len(cr) == 1 - assert max(cl) < min(cr), "All cover_letter tasks must finish before company_research starts" - - -def test_fifo_within_type(tmp_db): - """Tasks of the same type execute in arrival (FIFO) order.""" - log, done = [], threading.Event() - s = _start_scheduler(tmp_db, _make_recording_run_task(log, done, 3)) - - for task_id in [10, 20, 30]: - s.enqueue(task_id, "cover_letter", task_id, None) - - assert done.wait(timeout=5.0), "timed out — not all 3 tasks completed" - s.shutdown() - - assert [task_id for task_id, _ in log] == [10, 20, 30] - - -def test_concurrent_batches_when_vram_allows(tmp_db): - """Two type batches start simultaneously when VRAM fits both.""" - started = {"cover_letter": threading.Event(), "company_research": threading.Event()} - all_done = threading.Event() - log = [] - - def run_task(db_path, task_id, task_type, job_id, params): - started[task_type].set() - log.append(task_type) - if len(log) >= 2: - all_done.set() - - # VRAM=10.0 fits both cover_letter (2.5) and company_research (5.0) simultaneously - s = _start_scheduler(tmp_db, run_task, available_vram=10.0) - s.enqueue(1, "cover_letter", 1, None) - s.enqueue(2, "company_research", 2, None) - - all_done.wait(timeout=5.0) - s.shutdown() - - # Both types should have started (possibly overlapping) - assert started["cover_letter"].is_set() - assert started["company_research"].is_set() - - -def test_new_tasks_picked_up_mid_batch(tmp_db): - """A task enqueued while a batch is running is consumed in the same batch.""" - log, done 
= [], threading.Event() - task1_started = threading.Event() # fires when task 1 begins executing - task2_ready = threading.Event() # fires when task 2 has been enqueued - - def run_task(db_path, task_id, task_type, job_id, params): - if task_id == 1: - task1_started.set() # signal: task 1 is now running - task2_ready.wait(timeout=2.0) # wait for task 2 to be in the deque - log.append(task_id) - if len(log) >= 2: - done.set() - - s = _start_scheduler(tmp_db, run_task) - s.enqueue(1, "cover_letter", 1, None) - task1_started.wait(timeout=2.0) # wait until task 1 is actually executing - s.enqueue(2, "cover_letter", 2, None) - task2_ready.set() # unblock task 1 so it finishes - - assert done.wait(timeout=5.0), "timed out — task 2 never picked up mid-batch" - s.shutdown() - - assert log == [1, 2] - - -def test_worker_crash_releases_vram(tmp_db): - """If _run_task raises, _reserved_vram returns to 0 and scheduler continues.""" - log, done = [], threading.Event() - - def run_task(db_path, task_id, task_type, job_id, params): - if task_id == 1: - raise RuntimeError("simulated failure") - log.append(task_id) - done.set() - - s = _start_scheduler(tmp_db, run_task, available_vram=3.0) - s.enqueue(1, "cover_letter", 1, None) - s.enqueue(2, "cover_letter", 2, None) - - assert done.wait(timeout=5.0), "timed out — task 2 never completed after task 1 crash" - s.shutdown() - - # Second task still ran, VRAM was released - assert 2 in log - assert s._reserved_vram == 0.0 -``` - -- [ ] **Step 2: Run to confirm failures** - -```bash -/devl/miniconda3/envs/job-seeker/bin/pytest tests/test_task_scheduler.py -v -k "batch or fifo or concurrent or mid_batch or crash" -``` - -Expected: failures — `start`, `shutdown` not defined - -- [ ] **Step 3: Implement `start()`, `shutdown()`, `_scheduler_loop()`, `_batch_worker()`** - -Add these methods to `TaskScheduler`: - -```python -def start(self) -> None: - """Start the background scheduler loop thread. 
Call once after construction.""" - self._thread = threading.Thread( - target=self._scheduler_loop, name="task-scheduler", daemon=True - ) - self._thread.start() - -def shutdown(self, timeout: float = 5.0) -> None: - """Signal the scheduler to stop and wait for it to exit.""" - self._stop.set() - self._wake.set() # unblock any wait() - if self._thread and self._thread.is_alive(): - self._thread.join(timeout=timeout) - -def _scheduler_loop(self) -> None: - """Main scheduler daemon — wakes on enqueue or batch completion.""" - while not self._stop.is_set(): - self._wake.wait(timeout=30) - self._wake.clear() - - with self._lock: - # Defense in depth: reap externally-killed batch threads. - # In normal operation _active.pop() runs in finally before _wake fires, - # so this reap finds nothing — no double-decrement risk. - for t, thread in list(self._active.items()): - if not thread.is_alive(): - self._reserved_vram -= self._budgets.get(t, 0.0) - del self._active[t] - - # Start new type batches while VRAM allows - candidates = sorted( - [t for t in self._queues if self._queues[t] and t not in self._active], - key=lambda t: len(self._queues[t]), - reverse=True, - ) - for task_type in candidates: - budget = self._budgets.get(task_type, 0.0) - if self._reserved_vram + budget <= self._available_vram: - thread = threading.Thread( - target=self._batch_worker, - args=(task_type,), - name=f"batch-{task_type}", - daemon=True, - ) - self._active[task_type] = thread - self._reserved_vram += budget - thread.start() - -def _batch_worker(self, task_type: str) -> None: - """Serial consumer for one task type. Runs until the type's deque is empty.""" - try: - while True: - with self._lock: - q = self._queues.get(task_type) - if not q: - break - task = q.popleft() - # _run_task is scripts.task_runner._run_task (passed at construction) - self._run_task( - self._db_path, task.id, task_type, task.job_id, task.params - ) - finally: - # Always release — even if _run_task raises. 
- # _active.pop here prevents the scheduler loop reap from double-decrementing. - with self._lock: - self._active.pop(task_type, None) - self._reserved_vram -= self._budgets.get(task_type, 0.0) - self._wake.set() -``` - -- [ ] **Step 4: Run tests to confirm they pass** - -```bash -/devl/miniconda3/envs/job-seeker/bin/pytest tests/test_task_scheduler.py -v -k "batch or fifo or concurrent or mid_batch or crash" -``` - -Expected: 5 passed - -- [ ] **Step 5: Run all scheduler tests so far** - -```bash -/devl/miniconda3/envs/job-seeker/bin/pytest tests/test_task_scheduler.py -v -``` - -Expected: all passing (no regressions) - -- [ ] **Step 6: Commit** - -```bash -git add scripts/task_scheduler.py tests/test_task_scheduler.py -git commit -m "feat(scheduler): implement scheduler loop and batch worker with VRAM-aware scheduling" -``` - ---- - -## Chunk 3: Integration - -Tasks 7–11. Singleton, durability, routing shim, app.py startup change, and full suite verification. - ---- - -### Task 7: Singleton — `get_scheduler()` and `reset_scheduler()` - -**Files:** -- Modify: `scripts/task_scheduler.py` (implement the two functions) -- Modify: `tests/test_task_scheduler.py` (add tests) - -- [ ] **Step 1: Add failing tests** - -Append to `tests/test_task_scheduler.py`: - -```python -def test_get_scheduler_returns_singleton(tmp_db): - """Multiple calls to get_scheduler() return the same instance.""" - s1 = get_scheduler(tmp_db, _noop_run_task) - s2 = get_scheduler(tmp_db, _noop_run_task) - assert s1 is s2 - - -def test_singleton_thread_safe(tmp_db): - """Concurrent get_scheduler() calls produce exactly one instance.""" - instances = [] - errors = [] - - def _get(): - try: - instances.append(get_scheduler(tmp_db, _noop_run_task)) - except Exception as e: - errors.append(e) - - threads = [threading.Thread(target=_get) for _ in range(20)] - for t in threads: - t.start() - for t in threads: - t.join() - - assert not errors - assert len(set(id(s) for s in instances)) == 1 # all the same 
object - - -def test_reset_scheduler_cleans_up(tmp_db): - """reset_scheduler() shuts down the scheduler; no threads linger.""" - s = get_scheduler(tmp_db, _noop_run_task) - thread = s._thread - assert thread.is_alive() - - reset_scheduler() - - thread.join(timeout=2.0) - assert not thread.is_alive() - - # After reset, get_scheduler creates a fresh instance - s2 = get_scheduler(tmp_db, _noop_run_task) - assert s2 is not s -``` - -- [ ] **Step 2: Run to confirm failures** - -```bash -/devl/miniconda3/envs/job-seeker/bin/pytest tests/test_task_scheduler.py -v -k "singleton or reset" -``` - -Expected: failures — `get_scheduler` / `reset_scheduler` raise `NotImplementedError` - -- [ ] **Step 3: Implement `get_scheduler()` and `reset_scheduler()`** - -Replace the `raise NotImplementedError` stubs at the bottom of `scripts/task_scheduler.py`: - -```python -def get_scheduler(db_path: Path, run_task_fn: Callable = None) -> TaskScheduler: - """Return the process-level TaskScheduler singleton, constructing it if needed. - - run_task_fn is required on the first call; ignored on subsequent calls. - Safety: inner lock + double-check prevents double-construction under races. - The outer None check is a fast-path performance optimisation only. - """ - global _scheduler - if _scheduler is None: # fast path — avoids lock on steady state - with _scheduler_lock: - if _scheduler is None: # re-check under lock (double-checked locking) - if run_task_fn is None: - raise ValueError("run_task_fn required on first get_scheduler() call") - _scheduler = TaskScheduler(db_path, run_task_fn) - _scheduler.start() - return _scheduler - - -def reset_scheduler() -> None: - """Shut down and clear the singleton. 
TEST TEARDOWN ONLY.""" - global _scheduler - with _scheduler_lock: - if _scheduler is not None: - _scheduler.shutdown() - _scheduler = None -``` - -- [ ] **Step 4: Run tests to confirm they pass** - -```bash -/devl/miniconda3/envs/job-seeker/bin/pytest tests/test_task_scheduler.py -v -k "singleton or reset" -``` - -Expected: 3 passed - -- [ ] **Step 5: Commit** - -```bash -git add scripts/task_scheduler.py tests/test_task_scheduler.py -git commit -m "feat(scheduler): implement thread-safe singleton get_scheduler/reset_scheduler" -``` - ---- - -### Task 8: Durability — re-queue surviving `queued` rows on startup - -On construction, the scheduler loads pre-existing `queued` LLM tasks from SQLite into deques, so they execute after restart without user re-submission. - -**Files:** -- Modify: `scripts/task_scheduler.py` (add durability query to `__init__`) -- Modify: `tests/test_task_scheduler.py` (add tests) - -- [ ] **Step 1: Add failing tests** - -Append to `tests/test_task_scheduler.py`: - -```python -def test_durability_loads_queued_llm_tasks_on_startup(tmp_db): - """Scheduler loads pre-existing queued LLM tasks into deques at construction.""" - from scripts.db import insert_task - - # Pre-insert queued rows simulating a prior run - id1, _ = insert_task(tmp_db, "cover_letter", 1) - id2, _ = insert_task(tmp_db, "company_research", 2) - - s = TaskScheduler(tmp_db, _noop_run_task) - - assert len(s._queues.get("cover_letter", [])) == 1 - assert s._queues["cover_letter"][0].id == id1 - assert len(s._queues.get("company_research", [])) == 1 - assert s._queues["company_research"][0].id == id2 - - -def test_durability_excludes_non_llm_queued_tasks(tmp_db): - """Non-LLM queued tasks are not loaded into the scheduler deques.""" - from scripts.db import insert_task - - insert_task(tmp_db, "discovery", 0) - insert_task(tmp_db, "email_sync", 0) - - s = TaskScheduler(tmp_db, _noop_run_task) - - assert "discovery" not in s._queues or len(s._queues["discovery"]) == 0 - assert 
"email_sync" not in s._queues or len(s._queues["email_sync"]) == 0 - - -def test_durability_preserves_fifo_order(tmp_db): - """Queued tasks are loaded in created_at (FIFO) order.""" - conn = sqlite3.connect(tmp_db) - # Insert with explicit timestamps to control order - conn.execute( - "INSERT INTO background_tasks (task_type, job_id, params, status, created_at)" - " VALUES (?,?,?,?,?)", ("cover_letter", 1, None, "queued", "2026-01-01 10:00:00") - ) - conn.execute( - "INSERT INTO background_tasks (task_type, job_id, params, status, created_at)" - " VALUES (?,?,?,?,?)", ("cover_letter", 2, None, "queued", "2026-01-01 09:00:00") - ) - conn.commit() - ids = [r[0] for r in conn.execute( - "SELECT id FROM background_tasks ORDER BY created_at ASC" - ).fetchall()] - conn.close() - - s = TaskScheduler(tmp_db, _noop_run_task) - - loaded_ids = [t.id for t in s._queues["cover_letter"]] - assert loaded_ids == ids -``` - -- [ ] **Step 2: Run to confirm failures** - -```bash -/devl/miniconda3/envs/job-seeker/bin/pytest tests/test_task_scheduler.py -v -k "durability" -``` - -Expected: failures — deques empty on construction (durability not implemented yet) - -- [ ] **Step 3: Add durability query to `__init__`** - -Append to the end of `TaskScheduler.__init__()` (after VRAM detection): - -```python - # Durability: reload surviving 'queued' LLM tasks from prior run - self._load_queued_tasks() -``` - -Add the private method to `TaskScheduler`: - -```python -def _load_queued_tasks(self) -> None: - """Load pre-existing queued LLM tasks from SQLite into deques (called once in __init__).""" - llm_types = sorted(LLM_TASK_TYPES) # sorted for deterministic SQL params in logs - placeholders = ",".join("?" 
* len(llm_types)) - conn = sqlite3.connect(self._db_path) - rows = conn.execute( - f"SELECT id, task_type, job_id, params FROM background_tasks" - f" WHERE status='queued' AND task_type IN ({placeholders})" - f" ORDER BY created_at ASC", - llm_types, - ).fetchall() - conn.close() - - for row_id, task_type, job_id, params in rows: - q = self._queues.setdefault(task_type, deque()) - q.append(TaskSpec(row_id, job_id, params)) - - if rows: - logger.info("Scheduler: resumed %d queued task(s) from prior run", len(rows)) -``` - -- [ ] **Step 4: Run tests to confirm they pass** - -```bash -/devl/miniconda3/envs/job-seeker/bin/pytest tests/test_task_scheduler.py -v -k "durability" -``` - -Expected: 3 passed - -- [ ] **Step 5: Commit** - -```bash -git add scripts/task_scheduler.py tests/test_task_scheduler.py -git commit -m "feat(scheduler): add durability — re-queue surviving LLM tasks on startup" -``` - ---- - -### Task 9: `submit_task()` routing shim in `task_runner.py` - -Replaces the old spawn-per-task model with scheduler routing for LLM tasks while leaving non-LLM tasks unchanged. 
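The routing decision itself is a small predicate. As a standalone illustration (not the real `submit_task` — the `route` helper name here is hypothetical, though `LLM_TASK_TYPES` matches the three types this plan names):

```python
# Hypothetical sketch of the shim's branch logic, isolated from task_runner.
# LLM_TASK_TYPES mirrors the types named in this plan's architecture section.
LLM_TASK_TYPES = {"cover_letter", "company_research", "wizard_generate"}


def route(task_type: str) -> str:
    """Return which execution path a task takes: the VRAM-aware scheduler
    for LLM work, a free daemon thread for everything else."""
    return "scheduler" if task_type in LLM_TASK_TYPES else "thread"
```

So `route("cover_letter")` yields `"scheduler"` while `route("discovery")` yields `"thread"` — the same split the shim below enforces with `enqueue()` versus `threading.Thread`.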
- -**Files:** -- Modify: `scripts/task_runner.py` (`submit_task` function) -- Modify: `tests/test_task_scheduler.py` (add integration test) - -- [ ] **Step 1: Add failing test** - -Append to `tests/test_task_scheduler.py`: - -```python -def test_non_llm_tasks_bypass_scheduler(tmp_db): - """submit_task() for non-LLM types invokes _run_task directly, not enqueue().""" - from scripts import task_runner - - # Initialize the singleton properly so submit_task routes correctly - s = get_scheduler(tmp_db, _noop_run_task) - - run_task_calls = [] - enqueue_calls = [] - - def recording_run_task(*args, **kwargs): - run_task_calls.append(args[2]) # task_type is 3rd arg - - def recording_enqueue(task_id, task_type, job_id, params): - enqueue_calls.append(task_type) - - import unittest.mock as mock - with mock.patch.object(task_runner, "_run_task", recording_run_task), \ - mock.patch.object(s, "enqueue", recording_enqueue): - task_runner.submit_task(tmp_db, "discovery", 0) - - # discovery goes directly to _run_task; enqueue is never called - assert "discovery" not in enqueue_calls - # The scheduler deque is untouched - assert "discovery" not in s._queues or len(s._queues["discovery"]) == 0 - - -def test_llm_tasks_routed_to_scheduler(tmp_db): - """submit_task() for LLM types calls enqueue(), not _run_task directly.""" - from scripts import task_runner - - s = get_scheduler(tmp_db, _noop_run_task) - - enqueue_calls = [] - original_enqueue = s.enqueue - - import unittest.mock as mock - with mock.patch.object(s, "enqueue", side_effect=lambda *a, **kw: enqueue_calls.append(a[1]) or original_enqueue(*a, **kw)): - task_runner.submit_task(tmp_db, "cover_letter", 1) - - assert "cover_letter" in enqueue_calls -``` - -- [ ] **Step 2: Run to confirm failures** - -```bash -/devl/miniconda3/envs/job-seeker/bin/pytest tests/test_task_scheduler.py -v -k "bypass or routed" -``` - -Expected: failures — `submit_task` still
spawns threads for all types - -- [ ] **Step 3: Update `submit_task()` in `scripts/task_runner.py`** - -Replace the existing `submit_task` function: - -```python -def submit_task(db_path: Path = DEFAULT_DB, task_type: str = "", - job_id: int = None, - params: str | None = None) -> tuple[int, bool]: - """Submit a background task. - - LLM task types (cover_letter, company_research, wizard_generate) are routed - through the TaskScheduler for VRAM-aware batch scheduling. - All other types spawn a free daemon thread as before. - - Returns (task_id, True) if a new task was queued. - Returns (existing_id, False) if an identical task is already in-flight. - """ - task_id, is_new = insert_task(db_path, task_type, job_id or 0, params=params) - if is_new: - from scripts.task_scheduler import get_scheduler, LLM_TASK_TYPES - if task_type in LLM_TASK_TYPES: - get_scheduler(db_path, run_task_fn=_run_task).enqueue( - task_id, task_type, job_id or 0, params - ) - else: - t = threading.Thread( - target=_run_task, - args=(db_path, task_id, task_type, job_id or 0, params), - daemon=True, - ) - t.start() - return task_id, is_new -``` - -- [ ] **Step 4: Run tests to confirm they pass** - -```bash -/devl/miniconda3/envs/job-seeker/bin/pytest tests/test_task_scheduler.py -v -k "bypass or routed" -``` - -Expected: 2 passed - -- [ ] **Step 5: Commit** - -```bash -git add scripts/task_runner.py tests/test_task_scheduler.py -git commit -m "feat(task_runner): route LLM tasks through scheduler in submit_task()" -``` - ---- - -### Task 10: `app.py` startup — replace inline SQL with `reset_running_tasks()` - -Enables durability by leaving `queued` rows intact on restart. - -**Files:** -- Modify: `app/app.py` (`_startup` function) - -- [ ] **Step 1: Locate the exact lines to change in `app/app.py`** - -The block to replace is inside `_startup()`. 
It looks like: - -```python -conn.execute( - "UPDATE background_tasks SET status='failed', error='Interrupted by server restart'," - " finished_at=datetime('now') WHERE status IN ('queued','running')" -) -conn.commit() -``` - -- [ ] **Step 2: Replace the inline SQL block** - -In `app/app.py`, find `_startup()`. At the start of the function body, **before** the existing `conn = sqlite3.connect(get_db_path())` block, add: - -```python - # Reset only in-flight tasks — queued tasks survive for the scheduler to resume. - # MUST run before any submit_task() call in this function. - from scripts.db import reset_running_tasks - reset_running_tasks(get_db_path()) -``` - -Then delete the inline SQL block and its `conn.commit()` call. Leave the `conn = sqlite3.connect(...)` that follows (used by the SearXNG re-queue logic) untouched. - -The result should look like: - -```python -@st.cache_resource -def _startup() -> None: - """Runs exactly once per server lifetime (st.cache_resource). - 1. Marks zombie tasks as failed. - 2. Auto-queues re-runs for any research generated without SearXNG data, - if SearXNG is now reachable. - """ - # Reset only in-flight tasks — queued tasks survive for the scheduler to resume. - # MUST run before any submit_task() call in this function. - from scripts.db import reset_running_tasks - reset_running_tasks(get_db_path()) - - conn = sqlite3.connect(get_db_path()) - # ... remainder of function unchanged ... 
-``` - -- [ ] **Step 3: Verify the app module has valid syntax** - -```bash -conda run -n job-seeker python -m py_compile app/app.py && echo "syntax ok" -``` - -Expected: `syntax ok` (avoids executing Streamlit module-level code which would fail outside a server context) - -- [ ] **Step 4: Commit** - -```bash -git add app/app.py -git commit -m "feat(app): use reset_running_tasks() on startup to preserve queued tasks" -``` - ---- - -### Task 11: Full suite verification - -Run the complete test suite against the baseline (pre-existing failure already documented in issue #12). - -**Files:** none — verification only - -- [ ] **Step 1: Run the full test suite excluding the known pre-existing failure** - -```bash -/devl/miniconda3/envs/job-seeker/bin/pytest tests/ -v -k "not test_generate_calls_llm_router" 2>&1 | tail -10 -``` - -Expected: `N passed` with zero failures. Any failure here is a regression introduced by this feature. - -- [ ] **Step 1b: Confirm the pre-existing failure still exists (and only that one)** - -```bash -/devl/miniconda3/envs/job-seeker/bin/pytest tests/ -v 2>&1 | grep -E "FAILED|passed|failed" | tail -5 -``` - -Expected: exactly `1 failed` (the pre-existing `test_generate_calls_llm_router`, tracked in issue #12) - -- [ ] **Step 2: Verify no regressions in task runner tests** - -```bash -/devl/miniconda3/envs/job-seeker/bin/pytest tests/ -v -k "task_runner or task_scheduler" 2>&1 | tail -20 -``` - -Expected: all passing - -- [ ] **Step 3: Final commit — update branch with feature complete marker** - -```bash -git commit --allow-empty -m "feat: LLM queue optimizer complete — closes #2 - -Resource-aware batch scheduler for LLM tasks: -- scripts/task_scheduler.py (new): TaskScheduler singleton with VRAM-aware - batch scheduling, durability, thread-safe singleton, memory safety -- scripts/task_runner.py: submit_task() routes LLM types through scheduler -- scripts/db.py: reset_running_tasks() for durable restart behavior -- app/app.py: _startup() 
preserves queued tasks on restart -- config/llm.yaml.example: scheduler VRAM budget config documented -- tests/test_task_scheduler.py (new): 13 tests covering all behaviors - -Pre-existing failure: test_generate_calls_llm_router (issue #12, unrelated)" -``` diff --git a/docs/superpowers/plans/2026-03-15-jobgether-integration.md b/docs/superpowers/plans/2026-03-15-jobgether-integration.md deleted file mode 100644 index b08ffa2..0000000 --- a/docs/superpowers/plans/2026-03-15-jobgether-integration.md +++ /dev/null @@ -1,700 +0,0 @@ -# Jobgether Integration Implementation Plan - -> **For agentic workers:** REQUIRED: Use superpowers:subagent-driven-development (if subagents available) or superpowers:executing-plans to implement this plan. Steps use checkbox (`- [ ]`) syntax for tracking. - -**Goal:** Filter Jobgether listings out of all other scrapers, add a dedicated Jobgether scraper and URL scraper (Playwright-based), and add recruiter-aware cover letter framing for Jobgether jobs. - -**Architecture:** Blocklist config handles filtering with zero code changes. A new `_scrape_jobgether()` in `scrape_url.py` handles manual URL imports via Playwright with URL slug fallback. A new `scripts/custom_boards/jobgether.py` handles discovery. Cover letter framing is an `is_jobgether` flag threaded from `task_runner.py` → `generate()` → `build_prompt()`. - -**Tech Stack:** Python, Playwright (already installed), SQLite, PyTest, YAML config - -**Spec:** `/Library/Development/CircuitForge/peregrine/docs/superpowers/specs/2026-03-15-jobgether-integration-design.md` - ---- - -## Worktree Setup - -- [ ] **Create worktree for this feature** - -```bash -cd /Library/Development/CircuitForge/peregrine -git worktree add .worktrees/jobgether-integration -b feature/jobgether-integration -``` - -All implementation work happens in `/Library/Development/CircuitForge/peregrine/.worktrees/jobgether-integration/`. 
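The "zero code changes" claim in the architecture section rests on the existing blocklist filter doing case-insensitive substring matching on company names. A simplified stand-in for that behavior (hypothetical — the real `_is_blocklisted` in `scripts/discover.py` presumably also consults industries and locations):

```python
def is_blocklisted_company(job: dict, blocklist: dict) -> bool:
    """Case-insensitive substring match on the company name, so a blocklist
    entry 'jobgether' catches both 'Jobgether' and 'jobgether inc'.
    Simplified illustration only, not the production helper."""
    company = job.get("company", "").lower()
    return any(name.lower() in company for name in blocklist.get("companies", []))
```

With matching this loose, adding one YAML entry is enough to drop Jobgether rows from every scraper's output.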
- ---- - -## Chunk 1: Blocklist filter + scrape_url.py - -### Task 1: Add Jobgether to blocklist - -**Files:** -- Modify: `/Library/Development/CircuitForge/peregrine/config/blocklist.yaml` - -- [ ] **Step 1: Edit blocklist.yaml** - -```yaml -companies: - - jobgether -``` - -- [ ] **Step 2: Verify the existing `_is_blocklisted` test passes (or write one)** - -Check `/Library/Development/CircuitForge/peregrine/tests/test_discover.py` for existing blocklist tests. If none cover company matching, add: - -```python -def test_is_blocklisted_jobgether(): - from scripts.discover import _is_blocklisted - blocklist = {"companies": ["jobgether"], "industries": [], "locations": []} - assert _is_blocklisted({"company": "Jobgether", "location": "", "description": ""}, blocklist) - assert _is_blocklisted({"company": "jobgether inc", "location": "", "description": ""}, blocklist) - assert not _is_blocklisted({"company": "Acme Corp", "location": "", "description": ""}, blocklist) -``` - -Run: `conda run -n job-seeker python -m pytest tests/test_discover.py -v -k "blocklist"` -Expected: PASS - -- [ ] **Step 3: Commit** - -```bash -git add config/blocklist.yaml tests/test_discover.py -git commit -m "feat: filter Jobgether listings via blocklist" -``` - ---- - -### Task 2: Add Jobgether detection to scrape_url.py - -**Files:** -- Modify: `/Library/Development/CircuitForge/peregrine/scripts/scrape_url.py` -- Modify: `/Library/Development/CircuitForge/peregrine/tests/test_scrape_url.py` - -- [ ] **Step 1: Write failing tests** - -In `/Library/Development/CircuitForge/peregrine/tests/test_scrape_url.py`, add: - -```python -def test_detect_board_jobgether(): - from scripts.scrape_url import _detect_board - assert _detect_board("https://jobgether.com/offer/69b42d9d24d79271ee0618e8-csm---resware") == "jobgether" - assert _detect_board("https://www.jobgether.com/offer/abc-role---company") == "jobgether" - - -def test_jobgether_slug_company_extraction(): - from scripts.scrape_url import 
_company_from_jobgether_url - assert _company_from_jobgether_url( - "https://jobgether.com/offer/69b42d9d24d79271ee0618e8-customer-success-manager---resware" - ) == "Resware" - assert _company_from_jobgether_url( - "https://jobgether.com/offer/abc123-director-of-cs---acme-corp" - ) == "Acme Corp" - assert _company_from_jobgether_url( - "https://jobgether.com/offer/abc123-no-separator-here" - ) == "" - - -def test_scrape_jobgether_no_playwright(tmp_path): - """When Playwright is unavailable, _scrape_jobgether falls back to URL slug for company.""" - # Patch playwright.sync_api to None in sys.modules so the local import inside - # _scrape_jobgether raises ImportError at call time (local imports run at call time, - # not at module load time — so no reload needed). - import sys - import unittest.mock as mock - - url = "https://jobgether.com/offer/69b42d9d24d79271ee0618e8-customer-success-manager---resware" - with mock.patch.dict(sys.modules, {"playwright": None, "playwright.sync_api": None}): - from scripts.scrape_url import _scrape_jobgether - result = _scrape_jobgether(url) - - assert result.get("company") == "Resware" - assert result.get("source") == "jobgether" -``` - -Run: `conda run -n job-seeker python -m pytest tests/test_scrape_url.py::test_detect_board_jobgether tests/test_scrape_url.py::test_jobgether_slug_company_extraction tests/test_scrape_url.py::test_scrape_jobgether_no_playwright -v` -Expected: FAIL (functions not yet defined) - -- [ ] **Step 2: Add `_company_from_jobgether_url()` to scrape_url.py** - -Add after the `_STRIP_PARAMS` block (around line 34): - -```python -def _company_from_jobgether_url(url: str) -> str: - """Extract company name from Jobgether offer URL slug. - - Slug format: /offer/{24-hex-hash}-{title-slug}---{company-slug} - Triple-dash separator delimits title from company. - Returns title-cased company name, or "" if pattern not found. 
- """ - m = re.search(r"---([^/?]+)$", urlparse(url).path) - if not m: - print(f"[scrape_url] Jobgether URL slug: no company separator found in {url}") - return "" - return m.group(1).replace("-", " ").title() -``` - -- [ ] **Step 3: Add `"jobgether"` branch to `_detect_board()`** - -In `/Library/Development/CircuitForge/peregrine/scripts/scrape_url.py`, modify `_detect_board()` (add before `return "generic"`): - -```python - if "jobgether.com" in url_lower: - return "jobgether" -``` - -- [ ] **Step 4: Add `_scrape_jobgether()` function** - -Add after `_scrape_glassdoor()` (around line 137): - -```python -def _scrape_jobgether(url: str) -> dict: - """Scrape a Jobgether offer page using Playwright to bypass 403. - - Falls back to URL slug for company name when Playwright is unavailable. - Does not use requests — no raise_for_status(). - """ - try: - from playwright.sync_api import sync_playwright - except ImportError: - company = _company_from_jobgether_url(url) - if company: - print(f"[scrape_url] Jobgether: Playwright not installed, using slug fallback → {company}") - return {"company": company, "source": "jobgether"} if company else {} - - try: - with sync_playwright() as p: - browser = p.chromium.launch(headless=True) - try: - ctx = browser.new_context(user_agent=_HEADERS["User-Agent"]) - page = ctx.new_page() - page.goto(url, timeout=30_000) - page.wait_for_load_state("networkidle", timeout=20_000) - - result = page.evaluate("""() => { - const title = document.querySelector('h1')?.textContent?.trim() || ''; - const company = document.querySelector('[class*="company"], [class*="employer"], [data-testid*="company"]') - ?.textContent?.trim() || ''; - const location = document.querySelector('[class*="location"], [data-testid*="location"]') - ?.textContent?.trim() || ''; - const desc = document.querySelector('[class*="description"], [class*="job-desc"], article') - ?.innerText?.trim() || ''; - return { title, company, location, description: desc }; - }""") - 
finally: - browser.close() - - # Fall back to slug for company if DOM extraction missed it - if not result.get("company"): - result["company"] = _company_from_jobgether_url(url) - - result["source"] = "jobgether" - return {k: v for k, v in result.items() if v} - - except Exception as exc: - print(f"[scrape_url] Jobgether Playwright error for {url}: {exc}") - # Last resort: slug fallback - company = _company_from_jobgether_url(url) - return {"company": company, "source": "jobgether"} if company else {} -``` - -> ⚠️ **The CSS selectors in the `page.evaluate()` call are placeholders.** Before committing, inspect `https://jobgether.com/offer/` in a browser to find the actual class names for title, company, location, and description. Update the selectors accordingly. - -- [ ] **Step 5: Add dispatch branch in `scrape_job_url()`** - -In the `if board == "linkedin":` dispatch chain (around line 208), add before the `else`: - -```python - elif board == "jobgether": - fields = _scrape_jobgether(url) -``` - -- [ ] **Step 6: Run tests to verify they pass** - -Run: `conda run -n job-seeker python -m pytest tests/test_scrape_url.py -v` -Expected: All PASS (including pre-existing tests) - -- [ ] **Step 7: Commit** - -```bash -git add scripts/scrape_url.py tests/test_scrape_url.py -git commit -m "feat: add Jobgether URL detection and scraper to scrape_url.py" -``` - ---- - -## Chunk 2: Jobgether custom board scraper - -> ⚠️ **Pre-condition:** Before writing the scraper, inspect `https://jobgether.com/remote-jobs` live to determine the actual URL/filter param format and DOM card selectors. Use the Playwright MCP browser tool or Chrome devtools. Record: (1) the query param for job title search, (2) the job card CSS selectors for title, company, URL, location, salary. 
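Once the query param is recorded, constructing the search URL is mechanical. A stdlib sketch — the `search` param name and `build_search_url` helper are guesses pending the live inspection:

```python
from urllib.parse import urlencode

BASE_SEARCH = "https://jobgether.com/remote-jobs"


def build_search_url(title: str, query_param: str = "search") -> str:
    # query_param="search" is an unverified placeholder — swap in the
    # value captured during live inspection before relying on this.
    return f"{BASE_SEARCH}?{urlencode({query_param: title})}"
```

`urlencode` handles the space-to-`+` escaping, so `build_search_url("Customer Success Manager")` produces `https://jobgether.com/remote-jobs?search=Customer+Success+Manager`.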
- -### Task 3: Inspect Jobgether search live - -**Files:** None (research step) - -- [ ] **Step 1: Navigate to Jobgether remote jobs and inspect search params** - -Using browser devtools or Playwright network capture, navigate to `https://jobgether.com/remote-jobs`, search for "Customer Success Manager", and capture: -- The resulting URL (query params) -- Network requests (XHR/fetch) if the page uses API calls -- CSS selectors for job card elements - -Record findings here before proceeding. - -- [ ] **Step 2: Test a Playwright page.evaluate() extraction manually** - -```python -# Run interactively to validate selectors -from playwright.sync_api import sync_playwright -with sync_playwright() as p: - browser = p.chromium.launch(headless=False) # headless=False to see the page - page = browser.new_page() - page.goto("https://jobgether.com/remote-jobs") - page.wait_for_load_state("networkidle") - # Test your selectors here - cards = page.query_selector_all("[YOUR_CARD_SELECTOR]") - print(len(cards)) - browser.close() -``` - ---- - -### Task 4: Write jobgether.py scraper - -**Files:** -- Create: `/Library/Development/CircuitForge/peregrine/scripts/custom_boards/jobgether.py` -- Modify: `/Library/Development/CircuitForge/peregrine/tests/test_discover.py` (or create `tests/test_jobgether.py`) - -- [ ] **Step 1: Write failing test** - -In `/Library/Development/CircuitForge/peregrine/tests/test_discover.py` (or a new `tests/test_jobgether.py`): - -```python -def test_jobgether_scraper_returns_empty_on_missing_playwright(monkeypatch): - """Graceful fallback when Playwright is unavailable.""" - import scripts.custom_boards.jobgether as jg - monkeypatch.setattr("scripts.custom_boards.jobgether.sync_playwright", None) - result = jg.scrape({"titles": ["Customer Success Manager"]}, "Remote", results_wanted=5) - assert result == [] - - -def test_jobgether_scraper_respects_results_wanted(monkeypatch): - """Scraper caps results at results_wanted.""" - import 
scripts.custom_boards.jobgether as jg - - fake_jobs = [ - {"title": f"CSM {i}", "href": f"/offer/abc{i}-csm---acme", "company": f"Acme {i}", - "location": "Remote", "is_remote": True, "salary": ""} - for i in range(20) - ] - - class FakePage: - def goto(self, *a, **kw): pass - def wait_for_load_state(self, *a, **kw): pass - def evaluate(self, _): return fake_jobs - - class FakeCtx: - def new_page(self): return FakePage() - - class FakeBrowser: - def new_context(self, **kw): return FakeCtx() - def close(self): pass - - class FakeChromium: - def launch(self, **kw): return FakeBrowser() - - class FakeP: - chromium = FakeChromium() - def __enter__(self): return self - def __exit__(self, *a): pass - - monkeypatch.setattr("scripts.custom_boards.jobgether.sync_playwright", lambda: FakeP()) - result = jg.scrape({"titles": ["CSM"]}, "Remote", results_wanted=5) - assert len(result) <= 5 -``` - -Run: `conda run -n job-seeker python -m pytest tests/ -v -k "jobgether"` -Expected: FAIL (module not found) - -- [ ] **Step 2: Create `scripts/custom_boards/jobgether.py`** - -```python -"""Jobgether scraper — Playwright-based (requires chromium installed). - -Jobgether (jobgether.com) is a remote-work job aggregator. It blocks plain -requests with 403, so we use Playwright to render the page and extract cards. - -Install Playwright: conda run -n job-seeker pip install playwright && - conda run -n job-seeker python -m playwright install chromium - -Returns a list of dicts compatible with scripts.db.insert_job(). 
-""" -from __future__ import annotations - -import re -import time -from typing import Any - -_BASE = "https://jobgether.com" -_SEARCH_PATH = "/remote-jobs" - -# TODO: Replace with confirmed query param key after live inspection (Task 3) -_QUERY_PARAM = "search" - -# Module-level import so tests can monkeypatch scripts.custom_boards.jobgether.sync_playwright -try: - from playwright.sync_api import sync_playwright -except ImportError: - sync_playwright = None - - -def scrape(profile: dict, location: str, results_wanted: int = 50) -> list[dict]: - """ - Scrape job listings from Jobgether using Playwright. - - Args: - profile: Search profile dict (uses 'titles'). - location: Location string — Jobgether is remote-focused; location used - only if the site exposes a location filter. - results_wanted: Maximum results to return across all titles. - - Returns: - List of job dicts with keys: title, company, url, source, location, - is_remote, salary, description. - """ - if sync_playwright is None: - print( - " [jobgether] playwright not installed.\n" - " Install: conda run -n job-seeker pip install playwright && " - "conda run -n job-seeker python -m playwright install chromium" - ) - return [] - - results: list[dict] = [] - seen_urls: set[str] = set() - - with sync_playwright() as p: - browser = p.chromium.launch(headless=True) - ctx = browser.new_context( - user_agent=( - "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 " - "(KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36" - ) - ) - page = ctx.new_page() - - for title in profile.get("titles", []): - if len(results) >= results_wanted: - break - - # TODO: Confirm URL param format from live inspection (Task 3) - url = f"{_BASE}{_SEARCH_PATH}?{_QUERY_PARAM}={title.replace(' ', '+')}" - - try: - page.goto(url, timeout=30_000) - page.wait_for_load_state("networkidle", timeout=20_000) - except Exception as exc: - print(f" [jobgether] Page load error for '{title}': {exc}") - continue - - # TODO: Replace JS selector with 
confirmed card selector from Task 3 - try: - raw_jobs: list[dict[str, Any]] = page.evaluate(_extract_jobs_js()) - except Exception as exc: - print(f" [jobgether] JS extract error for '{title}': {exc}") - continue - - if not raw_jobs: - print(f" [jobgether] No cards found for '{title}' — selector may need updating") - continue - - for job in raw_jobs: - href = job.get("href", "") - if not href: - continue - full_url = _BASE + href if href.startswith("/") else href - if full_url in seen_urls: - continue - seen_urls.add(full_url) - - results.append({ - "title": job.get("title", ""), - "company": job.get("company", ""), - "url": full_url, - "source": "jobgether", - "location": job.get("location") or "Remote", - "is_remote": True, # Jobgether is remote-focused - "salary": job.get("salary") or "", - "description": "", # not in card view; scrape_url fills in - }) - - if len(results) >= results_wanted: - break - - time.sleep(1) # polite pacing between titles - - browser.close() - - return results[:results_wanted] - - -def _extract_jobs_js() -> str: - """JS to run in page context — extracts job data from rendered card elements. - - TODO: Replace selectors with confirmed values from Task 3 live inspection. - """ - return """() => { - // TODO: replace '[class*=job-card]' with confirmed card selector - const cards = document.querySelectorAll('[class*="job-card"], [data-testid*="job"]'); - return Array.from(cards).map(card => { - // TODO: replace these selectors with confirmed values - const titleEl = card.querySelector('h2, h3, [class*="title"]'); - const companyEl = card.querySelector('[class*="company"], [class*="employer"]'); - const linkEl = card.querySelector('a'); - const salaryEl = card.querySelector('[class*="salary"]'); - const locationEl = card.querySelector('[class*="location"]'); - return { - title: titleEl ? titleEl.textContent.trim() : null, - company: companyEl ? companyEl.textContent.trim() : null, - href: linkEl ? 
linkEl.getAttribute('href') : null, - salary: salaryEl ? salaryEl.textContent.trim() : null, - location: locationEl ? locationEl.textContent.trim() : null, - is_remote: true, - }; - }).filter(j => j.title && j.href); - }""" -``` - -- [ ] **Step 3: Run tests** - -Run: `conda run -n job-seeker python -m pytest tests/ -v -k "jobgether"` -Expected: PASS - -- [ ] **Step 4: Commit** - -```bash -git add scripts/custom_boards/jobgether.py tests/test_discover.py -git commit -m "feat: add Jobgether custom board scraper (selectors pending live inspection)" -``` - ---- - -## Chunk 3: Registration, config, cover letter framing - -### Task 5: Register scraper in discover.py + update search_profiles.yaml - -**Files:** -- Modify: `/Library/Development/CircuitForge/peregrine/scripts/discover.py` -- Modify: `/Library/Development/CircuitForge/peregrine/config/search_profiles.yaml` -- Modify: `/Library/Development/CircuitForge/peregrine/config/search_profiles.yaml.example` (if it exists) - -- [ ] **Step 1: Add import to discover.py import block (lines 20–22)** - -`jobgether.py` absorbs the Playwright `ImportError` internally (module-level `try/except`), so it always imports successfully. Match the existing pattern exactly: - -```python -from scripts.custom_boards import jobgether as _jobgether -``` - -- [ ] **Step 2: Add to CUSTOM_SCRAPERS dict literal (lines 30–34)** - -```python -CUSTOM_SCRAPERS: dict[str, object] = { - "adzuna": _adzuna.scrape, - "theladders": _theladders.scrape, - "craigslist": _craigslist.scrape, - "jobgether": _jobgether.scrape, -} -``` - -When Playwright is absent, `_jobgether.scrape()` returns `[]` gracefully — no special guard needed in `discover.py`. - -- [ ] **Step 3: Add `jobgether` to remote-eligible profiles in search_profiles.yaml** - -Add `- jobgether` to the `custom_boards` list for every profile that has `Remote` in its `locations`. Based on the current file, that means: `cs_leadership`, `music_industry`, `animal_welfare`, `education`. 
Do NOT add it to `default` (locations: San Francisco CA only). - -- [ ] **Step 4: Run discover tests** - -Run: `conda run -n job-seeker python -m pytest tests/test_discover.py -v` -Expected: All PASS - -- [ ] **Step 5: Commit** - -```bash -git add scripts/discover.py config/search_profiles.yaml -git commit -m "feat: register Jobgether scraper and add to remote search profiles" -``` - ---- - -### Task 6: Cover letter recruiter framing - -**Files:** -- Modify: `/Library/Development/CircuitForge/peregrine/scripts/generate_cover_letter.py` -- Modify: `/Library/Development/CircuitForge/peregrine/scripts/task_runner.py` -- Modify: `/Library/Development/CircuitForge/peregrine/tests/test_match.py` or add `tests/test_cover_letter.py` - -- [ ] **Step 1: Write failing test** - -Create or add to `/Library/Development/CircuitForge/peregrine/tests/test_cover_letter.py`: - -```python -def test_build_prompt_jobgether_framing_unknown_company(): - from scripts.generate_cover_letter import build_prompt - prompt = build_prompt( - title="Customer Success Manager", - company="Jobgether", - description="CSM role at an undisclosed company.", - examples=[], - is_jobgether=True, - ) - assert "Your client" in prompt - assert "recruiter" in prompt.lower() or "jobgether" in prompt.lower() - - -def test_build_prompt_jobgether_framing_known_company(): - from scripts.generate_cover_letter import build_prompt - prompt = build_prompt( - title="Customer Success Manager", - company="Resware", - description="CSM role at Resware.", - examples=[], - is_jobgether=True, - ) - assert "Your client at Resware" in prompt - - -def test_build_prompt_no_jobgether_framing_by_default(): - from scripts.generate_cover_letter import build_prompt - prompt = build_prompt( - title="Customer Success Manager", - company="Acme Corp", - description="CSM role.", - examples=[], - ) - assert "Your client" not in prompt -``` - -Run: `conda run -n job-seeker python -m pytest tests/test_cover_letter.py -v` -Expected: FAIL - -- [ 
] **Step 2: Add `is_jobgether` to `build_prompt()` in generate_cover_letter.py** - -Modify the `build_prompt()` signature (line 186): - -```python -def build_prompt( - title: str, - company: str, - description: str, - examples: list[dict], - mission_hint: str | None = None, - is_jobgether: bool = False, -) -> str: -``` - -Add the recruiter hint block after the `mission_hint` block (after line 203): - -```python - if is_jobgether: - if company and company.lower() != "jobgether": - recruiter_note = ( - f"🤝 Recruiter context: This listing is posted by Jobgether on behalf of " - f"{company}. Address the cover letter to the Jobgether recruiter, not directly " - f"to the hiring company. Use framing like 'Your client at {company} will " - f"appreciate...' rather than addressing {company} directly. The role " - f"requirements are those of the actual employer." - ) - else: - recruiter_note = ( - "🤝 Recruiter context: This listing is posted by Jobgether on behalf of an " - "undisclosed employer. Address the cover letter to the Jobgether recruiter. " - "Use framing like 'Your client will appreciate...' rather than addressing " - "the company directly." 
- ) - parts.append(f"{recruiter_note}\n") -``` - -- [ ] **Step 3: Add `is_jobgether` to `generate()` signature** - -Modify `generate()` (line 233): - -```python -def generate( - title: str, - company: str, - description: str = "", - previous_result: str = "", - feedback: str = "", - is_jobgether: bool = False, - _router=None, -) -> str: -``` - -Pass it through to `build_prompt()` (line 254): - -```python - prompt = build_prompt(title, company, description, examples, - mission_hint=mission_hint, is_jobgether=is_jobgether) -``` - -- [ ] **Step 4: Pass `is_jobgether` from task_runner.py** - -In `/Library/Development/CircuitForge/peregrine/scripts/task_runner.py`, modify the `generate()` call inside the `cover_letter` task block (`elif task_type == "cover_letter":` starts at line 152; the `generate()` call is at ~line 156): - -```python - elif task_type == "cover_letter": - import json as _json - p = _json.loads(params or "{}") - from scripts.generate_cover_letter import generate - result = generate( - job.get("title", ""), - job.get("company", ""), - job.get("description", ""), - previous_result=p.get("previous_result", ""), - feedback=p.get("feedback", ""), - is_jobgether=job.get("source") == "jobgether", - ) - update_cover_letter(db_path, job_id, result) -``` - -- [ ] **Step 5: Run tests** - -Run: `conda run -n job-seeker python -m pytest tests/test_cover_letter.py -v` -Expected: All PASS - -- [ ] **Step 6: Run full test suite** - -Run: `conda run -n job-seeker python -m pytest tests/ -v` -Expected: All PASS - -- [ ] **Step 7: Commit** - -```bash -git add scripts/generate_cover_letter.py scripts/task_runner.py tests/test_cover_letter.py -git commit -m "feat: add Jobgether recruiter framing to cover letter generation" -``` - ---- - -## Final: Merge - -- [ ] **Merge worktree branch to main** - -```bash -cd /Library/Development/CircuitForge/peregrine -git merge feature/jobgether-integration -git worktree remove .worktrees/jobgether-integration -``` - -- [ ] **Push to 
remote** - -```bash -git push origin main -``` - ---- - -## Manual verification after merge - -1. Add the stuck Jobgether manual import (job 2286) — delete the old stuck row and re-add the URL via "Add Jobs by URL" in the Home page. Verify the scraper resolves company = "Resware". -2. Run a short discovery (`discover.py` with `results_per_board: 5`) and confirm no `company="Jobgether"` rows appear in `staging.db`. -3. Generate a cover letter for a Jobgether-sourced job and confirm recruiter framing appears. diff --git a/docs/superpowers/plans/2026-03-16-e2e-test-harness.md b/docs/superpowers/plans/2026-03-16-e2e-test-harness.md deleted file mode 100644 index 75d8726..0000000 --- a/docs/superpowers/plans/2026-03-16-e2e-test-harness.md +++ /dev/null @@ -1,1572 +0,0 @@ -# E2E Test Harness Implementation Plan - -> **For agentic workers:** REQUIRED: Use superpowers:subagent-driven-development (if subagents available) or superpowers:executing-plans to implement this plan. Steps use checkbox (`- [ ]`) syntax for tracking. - -**Goal:** Build a multi-mode Playwright/pytest E2E harness that smoke-tests every Peregrine page and audits every interactable element across demo, cloud, and local instances, reporting unexpected errors and expected-failure regressions. - -**Architecture:** Mode-parameterized pytest suite under `tests/e2e/` isolated from unit tests. Each mode (demo/cloud/local) declares its base URL, auth setup, and expected-failure patterns. A shared `conftest.py` provides Streamlit-aware helpers (settle waiter, DOM error scanner, console capture). Smoke pass checks pages on load; interaction pass dynamically discovers and clicks every button/tab/select, diffing errors before/after each click. - -**Tech Stack:** Python 3.11, pytest, pytest-playwright, playwright (Chromium), pytest-json-report, python-dotenv. All installed in existing `job-seeker` conda env. 
- ---- - -## File Map - -| Action | Path | Responsibility | -|--------|------|----------------| -| Create | `tests/e2e/__init__.py` | Package marker | -| Create | `tests/e2e/conftest.py` | `--mode` option, browser fixture, Streamlit helpers, cloud auth | -| Create | `tests/e2e/models.py` | `ErrorRecord` dataclass, `ModeConfig` dataclass | -| Create | `tests/e2e/modes/__init__.py` | Package marker | -| Create | `tests/e2e/modes/demo.py` | Demo mode config (port 8504, expected_failures list) | -| Create | `tests/e2e/modes/cloud.py` | Cloud mode config (port 8505, Directus JWT auth) | -| Create | `tests/e2e/modes/local.py` | Local mode config (port 8502, no auth) | -| Create | `tests/e2e/pages/__init__.py` | Package marker | -| Create | `tests/e2e/pages/base_page.py` | `BasePage`: navigate, error scan, screenshot on fail | -| Create | `tests/e2e/pages/home_page.py` | Home page object + interactable inventory | -| Create | `tests/e2e/pages/job_review_page.py` | Job Review page object | -| Create | `tests/e2e/pages/apply_page.py` | Apply Workspace page object | -| Create | `tests/e2e/pages/interviews_page.py` | Interviews kanban page object | -| Create | `tests/e2e/pages/interview_prep_page.py` | Interview Prep page object | -| Create | `tests/e2e/pages/survey_page.py` | Survey Assistant page object | -| Create | `tests/e2e/pages/settings_page.py` | Settings page object (tab-aware) | -| Create | `tests/e2e/test_smoke.py` | Parametrized smoke pass | -| Create | `tests/e2e/test_interactions.py` | Parametrized interaction pass | -| Create | `tests/e2e/results/.gitkeep` | Keeps results dir in git; outputs gitignored | -| Create | `compose.e2e.yml` | Cloud instance E2E overlay (informational env vars) | -| Modify | `pytest.ini` | Add `--ignore=tests/e2e` to `addopts` | -| Modify | `requirements.txt` | Add pytest-playwright, pytest-json-report | - -**Unit tests for helpers live at:** `tests/e2e/test_helpers.py` — tests for `diff_errors`, `ErrorRecord`, `ModeConfig`, fnmatch 
pattern validation, and JWT auth logic (mocked). - ---- - -## Task 0: Virtual Display Setup (Xvfb) - -**Files:** -- Modify: `manage.sh` (add `xvfb-run` wrapper for headed E2E sessions) - -Heimdall has no physical display. Playwright runs headless by default (no display needed), but headed mode for debugging requires a virtual framebuffer. This is the same Xvfb setup planned for browser-based scraping — set it up once here. - -- [ ] **Step 1: Check if Xvfb is installed** - -```bash -which Xvfb && Xvfb -help 2>&1 | head -3 -``` - -If missing: -```bash -sudo apt-get install -y xvfb -``` - -- [ ] **Step 2: Verify `pyvirtualdisplay` is available (optional Python wrapper)** - -```bash -# Braces group the fallback so "installed" only prints after a successful -# install (a bare `a || b && c` runs c even when the import check succeeds). -conda run -n job-seeker python -c "from pyvirtualdisplay import Display; print('ok')" 2>/dev/null || \ - { conda run -n job-seeker pip install pyvirtualdisplay && echo "installed"; } -``` - -- [ ] **Step 3: Add `xvfb-run` wrapper to manage.sh e2e subcommand** - -When `E2E_HEADLESS=false`, wrap the pytest call with `xvfb-run`. Use a bash array for the wrapper — a quoted string like `RUNNER="xvfb-run --server-args='-screen 0 ...'"` word-splits on unquoted expansion and leaves the single quotes literal: - -```bash -e2e) - MODE="${2:-demo}" - RESULTS_DIR="tests/e2e/results/${MODE}" - mkdir -p "${RESULTS_DIR}" - HEADLESS="${E2E_HEADLESS:-true}" - if [ "$HEADLESS" = "false" ]; then - RUNNER=(xvfb-run --auto-servernum --server-args "-screen 0 1280x900x24") - else - RUNNER=() - fi - "${RUNNER[@]}" conda run -n job-seeker pytest tests/e2e/ \ - --mode="${MODE}" \ - --json-report \ - --json-report-file="${RESULTS_DIR}/report.json" \ - -v "${@:3}" - ;; -``` - -- [ ] **Step 4: Test headless mode works (no display needed)** - -```bash -conda run -n job-seeker python -c " -from playwright.sync_api import sync_playwright -with sync_playwright() as p: - b = p.chromium.launch(headless=True) - page = b.new_page() - page.goto('about:blank') - b.close() - print('headless ok') -" -``` - -Expected: `headless ok` - -- [ ] **Step 5: Test headed mode via xvfb-run** - -```bash -xvfb-run --auto-servernum conda run -n job-seeker python -c " -from playwright.sync_api import sync_playwright -with 
sync_playwright() as p: - b = p.chromium.launch(headless=False) - page = b.new_page() - page.goto('about:blank') - title = page.title() - b.close() - print('headed ok, title:', title) -" -``` - -Expected: `headed ok, title: ` - -- [ ] **Step 6: Commit** - -```bash -git add manage.sh -git commit -m "chore(e2e): add xvfb-run wrapper for headed debugging sessions" -``` - ---- - -## Task 1: Install Dependencies + Scaffold Structure - -**Files:** -- Modify: `requirements.txt` -- Modify: `pytest.ini` -- Create: `tests/e2e/__init__.py`, `tests/e2e/modes/__init__.py`, `tests/e2e/pages/__init__.py`, `tests/e2e/results/.gitkeep` - -- [ ] **Step 1: Install new packages into conda env** - -```bash -conda run -n job-seeker pip install pytest-playwright pytest-json-report -conda run -n job-seeker playwright install chromium -``` - -Expected: `playwright install chromium` downloads ~200MB Chromium binary. No errors. - -- [ ] **Step 2: Verify playwright is importable** - -```bash -conda run -n job-seeker python -c "from playwright.sync_api import sync_playwright; print('ok')" -conda run -n job-seeker python -c "import pytest_playwright; print('ok')" -``` - -Expected: both print `ok`. - -- [ ] **Step 3: Add deps to requirements.txt** - -Add after the `playwright>=1.40` line (already present for LinkedIn scraper): - -``` -pytest-playwright>=0.4 -pytest-json-report>=1.5 -``` - -- [ ] **Step 4: Isolate E2E from unit tests** - -`test_helpers.py` (unit tests for models/helpers) must be reachable by `pytest tests/` -without triggering E2E browser tests. Put it at `tests/test_e2e_helpers.py` — inside -`tests/` but outside `tests/e2e/`. The browser-dependent tests (`test_smoke.py`, -`test_interactions.py`) live in `tests/e2e/` and are only collected when explicitly -targeted with `pytest tests/e2e/ --mode=`. 
- -Add a `tests/e2e/conftest.py` guard that skips E2E collection if `--mode` is not -provided (belt-and-suspenders — prevents accidental collection if someone runs -`pytest tests/e2e/` without `--mode`): - -```python -# at top of tests/e2e/conftest.py — added in Task 4 -# NOTE: pytest_addoption gives --mode a default of "demo", so getoption() -# never returns None here — detect an explicitly passed flag via the raw -# invocation args instead (otherwise this guard is dead code). -def pytest_collection_modifyitems(config, items): - explicit = any(a == "--mode" or a.startswith("--mode=") for a in config.invocation_params.args) - if not explicit: - skip = pytest.mark.skip(reason="E2E tests require an explicit --mode flag") - for item in items: - item.add_marker(skip) -``` - -Note: `test_helpers.py` in the file map above refers to `tests/test_e2e_helpers.py`. -Update the file map entry accordingly. - -- [ ] **Step 5: Create directory skeleton** - -```bash -mkdir -p /Library/Development/CircuitForge/peregrine/tests/e2e/modes -mkdir -p /Library/Development/CircuitForge/peregrine/tests/e2e/pages -mkdir -p /Library/Development/CircuitForge/peregrine/tests/e2e/results -touch tests/e2e/__init__.py -touch tests/e2e/modes/__init__.py -touch tests/e2e/pages/__init__.py -touch tests/e2e/results/.gitkeep -``` - -- [ ] **Step 6: Add results output to .gitignore** - -Add to `.gitignore`: -``` -tests/e2e/results/demo/ -tests/e2e/results/cloud/ -tests/e2e/results/local/ -``` - -- [ ] **Step 7: Verify unit tests still pass (nothing broken)** - -```bash -conda run -n job-seeker pytest tests/ -x -q 2>&1 | tail -5 -``` - -Expected: same pass count as before, no collection errors.
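Task 1's file list includes `pytest.ini`, but the steps above never show the edit itself — the file map only names it. A sketch of the intended change, assuming the existing file already has a `[pytest]` section (any options already present in `addopts` stay in place):

```ini
[pytest]
# Existing addopts retained; --ignore keeps `pytest tests/` from collecting E2E.
addopts = --ignore=tests/e2e
```

With this in place, `pytest tests/e2e/ --mode=demo` still works because an explicit path argument overrides the `--ignore` in `addopts` only if passed together with `--override-ini` or the path is targeted directly, which is exactly how the harness is invoked.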
- -- [ ] **Step 8: Commit** - -```bash -git add requirements.txt pytest.ini tests/e2e/ .gitignore -git commit -m "chore(e2e): scaffold E2E harness directory and install deps" -``` - ---- - -## Task 2: Models — `ErrorRecord` and `ModeConfig` (TDD) - -**Files:** -- Create: `tests/e2e/models.py` -- Create: `tests/e2e/test_helpers.py` (unit tests for models + helpers) - -- [ ] **Step 1: Write failing tests for `ErrorRecord`** - -Create `tests/e2e/test_helpers.py`: - -```python -"""Unit tests for E2E harness models and helper utilities.""" -import fnmatch -import pytest -from tests.e2e.models import ErrorRecord, ModeConfig, diff_errors - - -def test_error_record_equality(): - a = ErrorRecord(type="exception", message="boom", element_html="
<div>boom</div>") - b = ErrorRecord(type="exception", message="boom", element_html="<div>boom</div>
") - assert a == b - - -def test_error_record_inequality(): - a = ErrorRecord(type="exception", message="boom", element_html="") - b = ErrorRecord(type="alert", message="boom", element_html="") - assert a != b - - -def test_diff_errors_returns_new_only(): - before = [ErrorRecord("exception", "old error", "")] - after = [ - ErrorRecord("exception", "old error", ""), - ErrorRecord("alert", "new error", ""), - ] - result = diff_errors(before, after) - assert result == [ErrorRecord("alert", "new error", "")] - - -def test_diff_errors_empty_when_no_change(): - errors = [ErrorRecord("exception", "x", "")] - assert diff_errors(errors, errors) == [] - - -def test_diff_errors_empty_before(): - after = [ErrorRecord("alert", "boom", "")] - assert diff_errors([], after) == after - - -def test_mode_config_expected_failure_match(): - config = ModeConfig( - name="demo", - base_url="http://localhost:8504", - auth_setup=lambda ctx: None, - expected_failures=["Fetch*", "Generate Cover Letter"], - results_dir=None, - settings_tabs=["👤 My Profile"], - ) - assert config.matches_expected_failure("Fetch New Jobs") - assert config.matches_expected_failure("Generate Cover Letter") - assert not config.matches_expected_failure("View Jobs") - - -def test_mode_config_no_expected_failures(): - config = ModeConfig( - name="local", - base_url="http://localhost:8502", - auth_setup=lambda ctx: None, - expected_failures=[], - results_dir=None, - settings_tabs=[], - ) - assert not config.matches_expected_failure("Fetch New Jobs") -``` - -- [ ] **Step 2: Run test — confirm it fails (models don't exist yet)** - -```bash -conda run -n job-seeker pytest tests/e2e/test_helpers.py -v 2>&1 | head -20 -``` - -Expected: `ImportError` or `ModuleNotFoundError` — models not yet written. 
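The `expected_failures` assertions above hinge on `fnmatch` glob semantics. A standalone sketch of the matching behavior the Step 1 tests imply — note a glob must cover the *whole* label, and `fnmatchcase` is used here as an assumption to stay case-sensitive on every platform (plain `fnmatch.fnmatch` case-folds on Windows):

```python
import fnmatch

# Hypothetical demo-mode patterns, mirroring the ones asserted in the tests above.
patterns = ["Fetch*", "Generate Cover Letter"]

def is_expected_failure(label: str) -> bool:
    # any()-over-globs — the same shape ModeConfig.matches_expected_failure uses.
    return any(fnmatch.fnmatchcase(label, p) for p in patterns)

assert is_expected_failure("Fetch New Jobs")         # prefix glob
assert is_expected_failure("Generate Cover Letter")  # exact match
assert not is_expected_failure("Prefetch New Jobs")  # glob anchors to the whole label
assert not is_expected_failure("View Jobs")
```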
- -- [ ] **Step 3: Write `models.py`** - -Create `tests/e2e/models.py`: - -```python -"""Shared data models for the Peregrine E2E test harness.""" -from __future__ import annotations -import fnmatch -from dataclasses import dataclass, field -from pathlib import Path -from typing import Callable, Any - - -@dataclass(frozen=True) -class ErrorRecord: - type: str # "exception" | "alert" - message: str - element_html: str - - def __eq__(self, other: object) -> bool: - if not isinstance(other, ErrorRecord): - return NotImplemented - return (self.type, self.message) == (other.type, other.message) - - def __hash__(self) -> int: - return hash((self.type, self.message)) - - -def diff_errors(before: list[ErrorRecord], after: list[ErrorRecord]) -> list[ErrorRecord]: - """Return errors in `after` that were not present in `before`.""" - before_set = set(before) - return [e for e in after if e not in before_set] - - -@dataclass -class ModeConfig: - name: str - base_url: str - auth_setup: Callable[[Any], None] # (BrowserContext) -> None - expected_failures: list[str] # fnmatch glob patterns against element labels - results_dir: Path | None - settings_tabs: list[str] # tabs expected to be present in this mode - - def matches_expected_failure(self, label: str) -> bool: - """Return True if label matches any expected_failure pattern (fnmatch).""" - return any(fnmatch.fnmatch(label, pattern) for pattern in self.expected_failures) -``` - -- [ ] **Step 4: Run tests — confirm they pass** - -```bash -conda run -n job-seeker pytest tests/e2e/test_helpers.py -v -``` - -Expected: 7 tests, all PASS. 
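A design note on the hand-written `__eq__`/`__hash__` in `models.py`: the same "identity ignores `element_html`" semantics can be expressed declaratively with `field(compare=False)`, letting the frozen dataclass generate both methods. A sketch of that alternative (equivalent behavior, not the plan's chosen spelling):

```python
from dataclasses import dataclass, field

@dataclass(frozen=True)
class ErrorRecordAlt:
    type: str
    message: str
    # compare=False drops this field from the generated __eq__ and, because
    # the dataclass is frozen, from the generated __hash__ as well.
    element_html: str = field(default="", compare=False)

a = ErrorRecordAlt("exception", "boom", "<div>boom</div>")
b = ErrorRecordAlt("exception", "boom", "")
assert a == b
assert hash(a) == hash(b)
assert len({a, b}) == 1  # sets deduplicate on (type, message) only
```

Either spelling satisfies the Step 1 tests; the explicit methods make the comparison fields more obvious at a glance, which is why the plan keeps them.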
- -- [ ] **Step 5: Commit** - -```bash -git add tests/e2e/models.py tests/e2e/test_helpers.py -git commit -m "feat(e2e): add ErrorRecord, ModeConfig, diff_errors models with tests" -``` - ---- - -## Task 3: Mode Configs — demo, cloud, local - -**Files:** -- Create: `tests/e2e/modes/demo.py` -- Create: `tests/e2e/modes/cloud.py` -- Create: `tests/e2e/modes/local.py` - -No browser needed yet — these are pure data/config. Tests for auth logic (cloud) come in Task 4. - -- [ ] **Step 1: Write `modes/demo.py`** - -```python -"""Demo mode config — port 8504, DEMO_MODE=true, LLM/scraping neutered.""" -from pathlib import Path -from tests.e2e.models import ModeConfig - -# Base tabs present in all modes -_BASE_SETTINGS_TABS = [ - "👤 My Profile", "📝 Resume Profile", "🔎 Search", - "⚙️ System", "🎯 Fine-Tune", "🔑 License", "💾 Data", -] - -DEMO = ModeConfig( - name="demo", - base_url="http://localhost:8504", - auth_setup=lambda ctx: None, # no auth in demo mode - expected_failures=[ - "Fetch*", # "Fetch New Jobs" — discovery blocked - "Generate Cover Letter*", # LLM blocked - "Generate*", # any other Generate button - "Analyze Screenshot*", # vision service blocked - "Push to Calendar*", # calendar push blocked - "Sync Email*", # email sync blocked - "Start Email Sync*", - ], - results_dir=Path("tests/e2e/results/demo"), - settings_tabs=_BASE_SETTINGS_TABS, # no Privacy or Developer tab in demo -) -``` - -- [ ] **Step 2: Write `modes/local.py`** - -```python -"""Local mode config — port 8502, full features, no auth.""" -from pathlib import Path -from tests.e2e.models import ModeConfig - -_BASE_SETTINGS_TABS = [ - "👤 My Profile", "📝 Resume Profile", "🔎 Search", - "⚙️ System", "🎯 Fine-Tune", "🔑 License", "💾 Data", -] - -LOCAL = ModeConfig( - name="local", - base_url="http://localhost:8502", - auth_setup=lambda ctx: None, - expected_failures=[], - results_dir=Path("tests/e2e/results/local"), - settings_tabs=_BASE_SETTINGS_TABS, -) -``` - -- [ ] **Step 3: Write `modes/cloud.py` (auth 
logic placeholder — full impl in Task 4)** - -```python -"""Cloud mode config — port 8505, CLOUD_MODE=true, Directus JWT auth.""" -from __future__ import annotations -import os -import time -import logging -from pathlib import Path -from typing import Any - -import requests -from dotenv import load_dotenv - -from tests.e2e.models import ModeConfig - -load_dotenv(".env.e2e") - -log = logging.getLogger(__name__) - -_BASE_SETTINGS_TABS = [ - "👤 My Profile", "📝 Resume Profile", "🔎 Search", - "⚙️ System", "🎯 Fine-Tune", "🔑 License", "💾 Data", "🔒 Privacy", -] - -# Token cache — refreshed if within 100s of expiry -_token_cache: dict[str, Any] = {"token": None, "expires_at": 0.0} - - -def _get_jwt() -> str: - """ - Acquire a Directus JWT for the e2e test user. - Strategy A: user/pass login (preferred). - Strategy B: persistent JWT from E2E_DIRECTUS_JWT env var. - Caches the token and refreshes 100s before expiry. - """ - # Strategy B fallback first check - if not os.environ.get("E2E_DIRECTUS_EMAIL"): - jwt = os.environ.get("E2E_DIRECTUS_JWT", "") - if not jwt: - raise RuntimeError("Cloud mode requires E2E_DIRECTUS_EMAIL+PASSWORD or E2E_DIRECTUS_JWT in .env.e2e") - return jwt - - # Check cache - if _token_cache["token"] and time.time() < _token_cache["expires_at"] - 100: - return _token_cache["token"] - - # Strategy A: fresh login - directus_url = os.environ.get("E2E_DIRECTUS_URL", "http://172.31.0.2:8055") - resp = requests.post( - f"{directus_url}/auth/login", - json={ - "email": os.environ["E2E_DIRECTUS_EMAIL"], - "password": os.environ["E2E_DIRECTUS_PASSWORD"], - }, - timeout=10, - ) - resp.raise_for_status() - data = resp.json()["data"] - token = data["access_token"] - expires_in_ms = data.get("expires", 900_000) - - _token_cache["token"] = token - _token_cache["expires_at"] = time.time() + (expires_in_ms / 1000) - log.info("Acquired Directus JWT for e2e test user (expires in %ds)", expires_in_ms // 1000) - return token - - -def _cloud_auth_setup(context: Any) -> None: 
- """Inject X-CF-Session header with real Directus JWT into all browser requests.""" - jwt = _get_jwt() - # X-CF-Session value is parsed by cloud_session.py as a cookie-format string: - # it looks for cf_session= within the header value. - context.set_extra_http_headers({"X-CF-Session": f"cf_session={jwt}"}) - - -CLOUD = ModeConfig( - name="cloud", - base_url="http://localhost:8505", - auth_setup=_cloud_auth_setup, - expected_failures=[], - results_dir=Path("tests/e2e/results/cloud"), - settings_tabs=_BASE_SETTINGS_TABS, -) -``` - -- [ ] **Step 4: Add JWT auth tests to `tests/test_e2e_helpers.py`** - -Append to `tests/test_e2e_helpers.py` (note: outside `tests/e2e/`): - -```python -from unittest.mock import patch, MagicMock -import time - - -def test_get_jwt_strategy_b_fallback(monkeypatch): - """Falls back to persistent JWT when no email env var set.""" - monkeypatch.delenv("E2E_DIRECTUS_EMAIL", raising=False) - monkeypatch.setenv("E2E_DIRECTUS_JWT", "persistent.jwt.token") - # Reset module-level cache - import tests.e2e.modes.cloud as cloud_mod - cloud_mod._token_cache.update({"token": None, "expires_at": 0.0}) - assert cloud_mod._get_jwt() == "persistent.jwt.token" - - -def test_get_jwt_strategy_b_raises_if_no_token(monkeypatch): - """Raises if neither email nor JWT env var is set.""" - monkeypatch.delenv("E2E_DIRECTUS_EMAIL", raising=False) - monkeypatch.delenv("E2E_DIRECTUS_JWT", raising=False) - import tests.e2e.modes.cloud as cloud_mod - cloud_mod._token_cache.update({"token": None, "expires_at": 0.0}) - with pytest.raises(RuntimeError, match="Cloud mode requires"): - cloud_mod._get_jwt() - - -def test_get_jwt_strategy_a_login(monkeypatch): - """Strategy A: calls Directus /auth/login and caches token.""" - monkeypatch.setenv("E2E_DIRECTUS_EMAIL", "e2e@circuitforge.tech") - monkeypatch.setenv("E2E_DIRECTUS_PASSWORD", "testpass") - monkeypatch.setenv("E2E_DIRECTUS_URL", "http://fake-directus:8055") - - import tests.e2e.modes.cloud as cloud_mod - 
cloud_mod._token_cache.update({"token": None, "expires_at": 0.0}) - - mock_resp = MagicMock() - mock_resp.json.return_value = {"data": {"access_token": "fresh.jwt", "expires": 900_000}} - mock_resp.raise_for_status = lambda: None - - with patch("tests.e2e.modes.cloud.requests.post", return_value=mock_resp) as mock_post: - token = cloud_mod._get_jwt() - - assert token == "fresh.jwt" - mock_post.assert_called_once() - assert cloud_mod._token_cache["token"] == "fresh.jwt" - - -def test_get_jwt_uses_cache(monkeypatch): - """Returns cached token if not yet expired.""" - monkeypatch.setenv("E2E_DIRECTUS_EMAIL", "e2e@circuitforge.tech") - import tests.e2e.modes.cloud as cloud_mod - cloud_mod._token_cache.update({"token": "cached.jwt", "expires_at": time.time() + 500}) - with patch("tests.e2e.modes.cloud.requests.post") as mock_post: - token = cloud_mod._get_jwt() - assert token == "cached.jwt" - mock_post.assert_not_called() -``` - -- [ ] **Step 5: Run tests** - -```bash -conda run -n job-seeker pytest tests/test_e2e_helpers.py -v -``` - -Expected: 11 tests, all PASS. - -- [ ] **Step 6: Commit** - -```bash -git add tests/e2e/modes/ tests/e2e/test_helpers.py -git commit -m "feat(e2e): add mode configs (demo/cloud/local) with Directus JWT auth" -``` - ---- - -## Task 4: `conftest.py` — Browser Fixtures + Streamlit Helpers - -**Files:** -- Create: `tests/e2e/conftest.py` - -This is the heart of the harness. No unit tests for the browser fixtures themselves (they require a live browser), but the helper functions that don't touch the browser get tested in `test_helpers.py`. - -- [ ] **Step 1: Add `get_page_errors` and `get_console_errors` tests to `test_helpers.py`** - -These functions take a `page` object. 
We can test them with a mock that mimics Playwright's `page.query_selector_all()` and `page.evaluate()` return shapes: - -```python -def test_get_page_errors_finds_exceptions(monkeypatch): - """get_page_errors returns ErrorRecord for stException elements.""" - from tests.e2e.conftest import get_page_errors - - mock_el = MagicMock() - mock_el.get_attribute.return_value = None # no kind attr - mock_el.inner_text.return_value = "RuntimeError: boom" - mock_el.inner_html.return_value = "
<div>RuntimeError: boom</div>
" - - mock_page = MagicMock() - mock_page.query_selector_all.side_effect = lambda sel: ( - [mock_el] if "stException" in sel else [] - ) - - errors = get_page_errors(mock_page) - assert len(errors) == 1 - assert errors[0].type == "exception" - assert "boom" in errors[0].message - - -def test_get_page_errors_finds_alert_errors(monkeypatch): - """get_page_errors returns ErrorRecord for stAlert with stAlertContentError child. - - In Streamlit 1.35+, st.error() renders a child [data-testid="stAlertContentError"]. - The kind attribute is a React prop — it is NOT available via get_attribute() in the DOM. - Detection must use the child element, not the attribute. - """ - from tests.e2e.conftest import get_page_errors - - # Mock the child error element that Streamlit 1.35+ renders inside st.error() - mock_child = MagicMock() - - mock_el = MagicMock() - mock_el.query_selector.return_value = mock_child # stAlertContentError found - mock_el.inner_text.return_value = "Something went wrong" - mock_el.inner_html.return_value = "
<div>Something went wrong</div>
" - - mock_page = MagicMock() - mock_page.query_selector_all.side_effect = lambda sel: ( - [] if "stException" in sel else [mock_el] - ) - - errors = get_page_errors(mock_page) - assert len(errors) == 1 - assert errors[0].type == "alert" - - -def test_get_page_errors_ignores_non_error_alerts(monkeypatch): - """get_page_errors does NOT flag st.warning() or st.info() alerts.""" - from tests.e2e.conftest import get_page_errors - - mock_el = MagicMock() - mock_el.query_selector.return_value = None # no stAlertContentError child - mock_el.inner_text.return_value = "Just a warning" - - mock_page = MagicMock() - mock_page.query_selector_all.side_effect = lambda sel: ( - [] if "stException" in sel else [mock_el] - ) - - errors = get_page_errors(mock_page) - assert errors == [] - - -def test_get_console_errors_filters_noise(): - """get_console_errors filters benign Streamlit WebSocket reconnect messages.""" - from tests.e2e.conftest import get_console_errors - - messages = [ - MagicMock(type="error", text="WebSocket connection closed"), # benign - MagicMock(type="error", text="TypeError: cannot read property"), # real - MagicMock(type="log", text="irrelevant"), - ] - errors = get_console_errors(messages) - assert errors == ["TypeError: cannot read property"] -``` - -- [ ] **Step 2: Run tests — confirm they fail (conftest not yet written)** - -```bash -conda run -n job-seeker pytest tests/e2e/test_helpers.py::test_get_page_errors_finds_exceptions -v 2>&1 | tail -5 -``` - -Expected: `ImportError` from `tests.e2e.conftest`. - -- [ ] **Step 3: Write `tests/e2e/conftest.py`** - -```python -""" -Peregrine E2E test harness — shared fixtures and Streamlit helpers. 
- -Run with: pytest tests/e2e/ --mode=demo|cloud|local|all -""" -from __future__ import annotations -import os -import time -import logging -from pathlib import Path -from typing import Generator - -import pytest -from dotenv import load_dotenv -from playwright.sync_api import Page, BrowserContext, sync_playwright - -from tests.e2e.models import ErrorRecord, ModeConfig, diff_errors -from tests.e2e.modes.demo import DEMO -from tests.e2e.modes.cloud import CLOUD -from tests.e2e.modes.local import LOCAL - -load_dotenv(".env.e2e") -log = logging.getLogger(__name__) - -_ALL_MODES = {"demo": DEMO, "cloud": CLOUD, "local": LOCAL} - -# ── Noise filter for console errors ────────────────────────────────────────── -_CONSOLE_NOISE = [ - "WebSocket connection", - "WebSocket is closed", - "_stcore/stream", - "favicon.ico", -] - - -# ── pytest option ───────────────────────────────────────────────────────────── -def pytest_addoption(parser): - parser.addoption( - "--mode", - action="store", - default="demo", - choices=["demo", "cloud", "local", "all"], - help="Which Peregrine instance(s) to test against", - ) - - -def pytest_configure(config): - config.addinivalue_line("markers", "e2e: mark test as E2E (requires running Peregrine instance)") - - -# ── Active mode(s) fixture ──────────────────────────────────────────────────── -@pytest.fixture(scope="session") -def active_modes(pytestconfig) -> list[ModeConfig]: - mode_arg = pytestconfig.getoption("--mode") - if mode_arg == "all": - return list(_ALL_MODES.values()) - return [_ALL_MODES[mode_arg]] - - -# ── Browser fixture (session-scoped, headless by default) ───────────────────── -@pytest.fixture(scope="session") -def browser_context_args(): - return { - "viewport": {"width": 1280, "height": 900}, - "ignore_https_errors": True, - } - - -# ── Instance availability guard ─────────────────────────────────────────────── -@pytest.fixture(scope="session", autouse=True) -def assert_instances_reachable(active_modes): - """Fail fast with 
a clear message if any target instance is not running.""" - import socket - for mode in active_modes: - from urllib.parse import urlparse - parsed = urlparse(mode.base_url) - host, port = parsed.hostname, parsed.port or 80 - try: - with socket.create_connection((host, port), timeout=3): - pass - except OSError: - pytest.exit( - f"[{mode.name}] Instance not reachable at {mode.base_url} — " - "start the instance before running E2E tests.", - returncode=1, - ) - - -# ── Per-mode browser context with auth injected ─────────────────────────────── -@pytest.fixture(scope="session") -def mode_contexts(active_modes, playwright) -> dict[str, BrowserContext]: - """One browser context per active mode, with auth injected via route handler. - - Cloud mode uses context.route() to inject a fresh JWT on every request — - this ensures the token cache refresh logic in cloud.py is exercised mid-run, - even if a test session exceeds the 900s Directus JWT TTL. - """ - from tests.e2e.modes.cloud import _get_jwt - - headless = os.environ.get("E2E_HEADLESS", "true").lower() != "false" - slow_mo = int(os.environ.get("E2E_SLOW_MO", "0")) - browser = playwright.chromium.launch(headless=headless, slow_mo=slow_mo) - contexts = {} - for mode in active_modes: - ctx = browser.new_context(viewport={"width": 1280, "height": 900}) - if mode.name == "cloud": - # Route-based JWT injection: _get_jwt() is called on each request, - # so the token cache refresh fires naturally during long runs. - def _inject_jwt(route, request): - jwt = _get_jwt() - headers = {**request.headers, "x-cf-session": f"cf_session={jwt}"} - route.continue_(headers=headers) - ctx.route(f"{mode.base_url}/**", _inject_jwt) - else: - mode.auth_setup(ctx) - contexts[mode.name] = ctx - yield contexts - browser.close() - - -# ── Streamlit helper: wait for page to settle ───────────────────────────────── -def wait_for_streamlit(page: Page, timeout: int = 10_000) -> None: - """ - Wait until Streamlit has finished rendering: - 1. 
No stSpinner visible - 2. No stStatusWidget showing 'running' - 3. 2000ms idle window (accounts for 3s fragment poller between ticks) - - NOTE: Do NOT use page.wait_for_load_state("networkidle") — Playwright's - networkidle uses a hard-coded 500ms idle window which is too short for - Peregrine's sidebar fragment poller (fires every 3s). We implement our - own 2000ms window instead. - """ - # Wait for spinners to clear - try: - page.wait_for_selector('[data-testid="stSpinner"]', state="hidden", timeout=timeout) - except Exception: - pass # spinner may not be present at all — not an error - # Wait for status widget to stop showing 'running' - try: - page.wait_for_function( - "() => !document.querySelector('[data-testid=\"stStatusWidget\"]')" - "?.textContent?.includes('running')", - timeout=5_000, - ) - except Exception: - pass - # 2000ms settle window — long enough to confirm quiet between fragment poll ticks - page.wait_for_timeout(2_000) - - -# ── Streamlit helper: scan DOM for errors ──────────────────────────────────── -def get_page_errors(page) -> list[ErrorRecord]: - """ - Scan the DOM for Streamlit error indicators: - - [data-testid="stException"] — unhandled Python exceptions - - [data-testid="stAlert"] with kind="error" — st.error() calls - """ - errors: list[ErrorRecord] = [] - - for el in page.query_selector_all('[data-testid="stException"]'): - errors.append(ErrorRecord( - type="exception", - message=el.inner_text()[:500], - element_html=el.inner_html()[:1000], - )) - - for el in page.query_selector_all('[data-testid="stAlert"]'): - # In Streamlit 1.35+, st.error() renders a child [data-testid="stAlertContentError"]. - # The `kind` attribute is a React prop, not a DOM attribute — get_attribute("kind") - # always returns None in production. Use child element detection as the authoritative check. 
- if el.query_selector('[data-testid="stAlertContentError"]'): - errors.append(ErrorRecord( - type="alert", - message=el.inner_text()[:500], - element_html=el.inner_html()[:1000], - )) - - return errors - - -# ── Streamlit helper: capture console errors ────────────────────────────────── -def get_console_errors(messages) -> list[str]: - """Filter browser console messages to real errors, excluding Streamlit noise.""" - result = [] - for msg in messages: - if msg.type != "error": - continue - text = msg.text - if any(noise in text for noise in _CONSOLE_NOISE): - continue - result.append(text) - return result - - -# ── Screenshot helper ───────────────────────────────────────────────────────── -def screenshot_on_fail(page: Page, mode_name: str, test_name: str) -> Path: - results_dir = Path(f"tests/e2e/results/{mode_name}/screenshots") - results_dir.mkdir(parents=True, exist_ok=True) - path = results_dir / f"{test_name}.png" - page.screenshot(path=str(path), full_page=True) - return path -``` - -- [ ] **Step 4: Run helper tests — confirm they pass** - -```bash -conda run -n job-seeker pytest tests/e2e/test_helpers.py -v -``` - -Expected: all tests PASS (including the new `get_page_errors` and `get_console_errors` tests). 
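The console filter is pure logic, so `test_helpers.py` can exercise it without a browser. A minimal sketch of that kind of assertion — the noise patterns and the stub message shape below are placeholders, not Peregrine's real `_CONSOLE_NOISE` list:

```python
from types import SimpleNamespace

# Placeholder noise patterns — the real list lives in tests/e2e/conftest.py
_CONSOLE_NOISE = ["favicon.ico", "Unrecognized feature"]

def get_console_errors(messages) -> list[str]:
    """Same filtering logic as the conftest helper: errors only, noise excluded."""
    return [
        m.text for m in messages
        if m.type == "error" and not any(noise in m.text for noise in _CONSOLE_NOISE)
    ]

# Stubs mimic Playwright's ConsoleMessage interface (.type / .text)
msgs = [
    SimpleNamespace(type="error", text="TypeError: x is undefined"),
    SimpleNamespace(type="error", text="GET /favicon.ico 404 (Not Found)"),
    SimpleNamespace(type="warning", text="slow network detected"),
]
assert get_console_errors(msgs) == ["TypeError: x is undefined"]
```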
-
-- [ ] **Step 5: Commit**
-
-```bash
-git add tests/e2e/conftest.py tests/e2e/test_helpers.py
-git commit -m "feat(e2e): add conftest with Streamlit helpers, browser fixtures, console filter"
-```
-
----
-
-## Task 5: `BasePage` + Page Objects
-
-**Files:**
-- Create: `tests/e2e/pages/base_page.py`
-- Create: `tests/e2e/pages/home_page.py`
-- Create: `tests/e2e/pages/job_review_page.py`
-- Create: `tests/e2e/pages/apply_page.py`
-- Create: `tests/e2e/pages/interviews_page.py`
-- Create: `tests/e2e/pages/interview_prep_page.py`
-- Create: `tests/e2e/pages/survey_page.py`
-- Create: `tests/e2e/pages/settings_page.py`
-
-- [ ] **Step 1: Write `base_page.py`**
-
-```python
-"""Base page object — navigation, error capture, interactable discovery."""
-from __future__ import annotations
-import logging
-import warnings
-import fnmatch
-from dataclasses import dataclass
-
-from playwright.sync_api import Page
-
-from tests.e2e.conftest import wait_for_streamlit, get_page_errors, get_console_errors
-from tests.e2e.models import ErrorRecord, ModeConfig
-
-log = logging.getLogger(__name__)
-
-# Selectors for interactive elements to audit
-INTERACTABLE_SELECTORS = [
-    '[data-testid="baseButton-primary"] button',
-    '[data-testid="baseButton-secondary"] button',
-    '[data-testid="stTab"] button[role="tab"]',
-    '[data-testid="stSelectbox"]',
-    '[data-testid="stCheckbox"] input',
-]
-
-
-@dataclass
-class InteractableElement:
-    label: str
-    selector: str
-    index: int  # nth non-sidebar match for this selector
-
-
-class BasePage:
-    """Base page object for all Peregrine pages."""
-
-    nav_label: str = ""  # sidebar nav link text — override in subclass
-
-    def __init__(self, page: Page, mode: ModeConfig, console_messages: list):
-        self.page = page
-        self.mode = mode
-        self._console_messages = console_messages
-
-    def navigate(self) -> None:
-        """Navigate to this page by clicking its sidebar nav link."""
-        sidebar = self.page.locator('[data-testid="stSidebarNav"]')
-        sidebar.get_by_text(self.nav_label, exact=False).first.click()
-        wait_for_streamlit(self.page)
-
-    def get_errors(self) -> list[ErrorRecord]:
-        return get_page_errors(self.page)
-
-    def get_console_errors(self) -> list[str]:
-        return get_console_errors(self._console_messages)
-
-    def discover_interactables(self, skip_sidebar: bool = True) -> list[InteractableElement]:
-        """
-        Find all interactive elements on the current page.
-        Excludes sidebar elements (navigation handled separately).
-
-        `index` counts non-sidebar matches only, so it stays aligned with the
-        sidebar-filtered list the interaction pass rebuilds before clicking.
-        """
-        found: list[InteractableElement] = []
-
-        for selector in INTERACTABLE_SELECTORS:
-            elements = self.page.query_selector_all(selector)
-            content_index = 0
-            for el in elements:
-                # Skip sidebar elements — and do NOT advance the index for them,
-                # or it drifts from the filtered list used at click time
-                if skip_sidebar and el.evaluate(
-                    "el => el.closest('[data-testid=\"stSidebar\"]') !== null"
-                ):
-                    continue
-                label = (el.inner_text() or el.get_attribute("aria-label") or f"element-{content_index}").strip()
-                label = label[:80]  # truncate for report readability
-                found.append(InteractableElement(label=label, selector=selector, index=content_index))
-                content_index += 1
-
-        # Warn on ambiguous expected_failure patterns
-        for pattern in self.mode.expected_failures:
-            matches = [e for e in found if fnmatch.fnmatch(e.label, pattern)]
-            if len(matches) > 1:
-                warnings.warn(
-                    f"expected_failure pattern '{pattern}' matches {len(matches)} elements: "
-                    + ", ".join(f'"{m.label}"' for m in matches),
-                    stacklevel=2,
-                )
-
-        return found
-```
-
-- [ ] **Step 2: Write page objects for all 7 pages**
-
-Each page object only needs to declare its `nav_label`. Significant page-specific logic goes here later if needed (e.g., Settings tab iteration).
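One note on the `expected_failures` warning in `BasePage` above: it relies on shell-style glob matching, and `ModeConfig.matches_expected_failure` (used later by the interaction pass) presumably shares the same semantics. A standalone illustration — the patterns here are examples, not the real demo-mode list:

```python
import fnmatch

# Example patterns only — the real list lives on each ModeConfig
expected_failures = ["Generate *", "Research Company"]

def matches_expected_failure(label: str) -> bool:
    """Shell-style glob match, the semantics fnmatch gives the BasePage warning."""
    return any(fnmatch.fnmatch(label, pattern) for pattern in expected_failures)

assert matches_expected_failure("Generate Cover Letter")  # wildcard match
assert matches_expected_failure("Research Company")       # exact match
assert not matches_expected_failure("Save Settings")      # no pattern applies
```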
- -Create `tests/e2e/pages/home_page.py`: -```python -from tests.e2e.pages.base_page import BasePage - -class HomePage(BasePage): - nav_label = "Home" -``` - -Create `tests/e2e/pages/job_review_page.py`: -```python -from tests.e2e.pages.base_page import BasePage - -class JobReviewPage(BasePage): - nav_label = "Job Review" -``` - -Create `tests/e2e/pages/apply_page.py`: -```python -from tests.e2e.pages.base_page import BasePage - -class ApplyPage(BasePage): - nav_label = "Apply Workspace" -``` - -Create `tests/e2e/pages/interviews_page.py`: -```python -from tests.e2e.pages.base_page import BasePage - -class InterviewsPage(BasePage): - nav_label = "Interviews" -``` - -Create `tests/e2e/pages/interview_prep_page.py`: -```python -from tests.e2e.pages.base_page import BasePage - -class InterviewPrepPage(BasePage): - nav_label = "Interview Prep" -``` - -Create `tests/e2e/pages/survey_page.py`: -```python -from tests.e2e.pages.base_page import BasePage - -class SurveyPage(BasePage): - nav_label = "Survey Assistant" -``` - -Create `tests/e2e/pages/settings_page.py`: -```python -"""Settings page — tab-aware page object.""" -from __future__ import annotations -import logging - -from tests.e2e.pages.base_page import BasePage, InteractableElement -from tests.e2e.conftest import wait_for_streamlit - -log = logging.getLogger(__name__) - - -class SettingsPage(BasePage): - nav_label = "Settings" - - def discover_interactables(self, skip_sidebar: bool = True) -> list[InteractableElement]: - """ - Settings has multiple tabs. Click each expected tab, collect interactables - within it, then return the full combined list. - """ - all_elements: list[InteractableElement] = [] - tab_labels = self.mode.settings_tabs - - for tab_label in tab_labels: - # Click the tab - # Match on full label text — Playwright's filter(has_text=) handles emoji correctly. 
- # Do NOT use tab_label.split()[-1]: "My Profile" and "Resume Profile" both end - # in "Profile" causing a collision that silently skips Resume Profile's interactables. - tab_btn = self.page.locator( - '[data-testid="stTab"] button[role="tab"]' - ).filter(has_text=tab_label) - if tab_btn.count() == 0: - log.warning("Settings tab not found: %s", tab_label) - continue - tab_btn.first.click() - wait_for_streamlit(self.page) - - # Collect non-tab interactables within this tab's content - tab_elements = super().discover_interactables(skip_sidebar=skip_sidebar) - # Exclude the tab buttons themselves (already clicked) - tab_elements = [ - e for e in tab_elements - if 'role="tab"' not in e.selector - ] - all_elements.extend(tab_elements) - - return all_elements -``` - -- [ ] **Step 3: Verify imports work** - -```bash -conda run -n job-seeker python -c " -from tests.e2e.pages.home_page import HomePage -from tests.e2e.pages.settings_page import SettingsPage -print('page objects ok') -" -``` - -Expected: `page objects ok` - -- [ ] **Step 4: Commit** - -```bash -git add tests/e2e/pages/ -git commit -m "feat(e2e): add BasePage and 7 page objects" -``` - ---- - -## Task 6: Smoke Tests - -**Files:** -- Create: `tests/e2e/test_smoke.py` - -- [ ] **Step 1: Write `test_smoke.py`** - -```python -""" -Smoke pass — navigate each page, wait for Streamlit to settle, assert no errors on load. -Errors on page load are always real bugs (not mode-specific). 
-
-Run: pytest tests/e2e/test_smoke.py --mode=demo
-"""
-from __future__ import annotations
-import pytest
-
-from tests.e2e.conftest import wait_for_streamlit, screenshot_on_fail
-from tests.e2e.pages.home_page import HomePage
-from tests.e2e.pages.job_review_page import JobReviewPage
-from tests.e2e.pages.apply_page import ApplyPage
-from tests.e2e.pages.interviews_page import InterviewsPage
-from tests.e2e.pages.interview_prep_page import InterviewPrepPage
-from tests.e2e.pages.survey_page import SurveyPage
-from tests.e2e.pages.settings_page import SettingsPage
-
-PAGE_CLASSES = [
-    HomePage, JobReviewPage, ApplyPage, InterviewsPage,
-    InterviewPrepPage, SurveyPage, SettingsPage,
-]
-
-
-@pytest.mark.e2e
-def test_smoke_all_pages(active_modes, mode_contexts):
-    """For each active mode: navigate to every page and assert no errors on load."""
-    failures: list[str] = []
-
-    for mode in active_modes:
-        ctx = mode_contexts[mode.name]
-        page = ctx.new_page()
-        console_msgs: list = []
-        page.on("console", lambda msg: console_msgs.append(msg))
-
-        # Navigate to app root first to establish session
-        page.goto(mode.base_url)
-        wait_for_streamlit(page)
-
-        for PageClass in PAGE_CLASSES:
-            # Reset BEFORE navigating, so console errors raised while this
-            # page loads are still in the buffer for the check below
-            console_msgs.clear()
-            pg = PageClass(page, mode, console_msgs)
-            pg.navigate()
-
-            dom_errors = pg.get_errors()
-            console_errors = pg.get_console_errors()
-
-            if dom_errors or console_errors:
-                shot_path = screenshot_on_fail(page, mode.name, f"smoke_{PageClass.__name__}")
-                detail = "\n".join(
-                    [f"  DOM: {e.message}" for e in dom_errors]
-                    + [f"  Console: {e}" for e in console_errors]
-                )
-                failures.append(
-                    f"[{mode.name}] {PageClass.nav_label} — errors on load:\n{detail}\n  screenshot: {shot_path}"
-                )
-
-        page.close()
-
-    if failures:
-        pytest.fail("Smoke test failures:\n\n" + "\n\n".join(failures))
-```
-
-- [ ] **Step 2: Run smoke test against demo mode (demo must be running at 8504)**
-
-```bash
-conda run -n job-seeker pytest tests/e2e/test_smoke.py --mode=demo -v -s 2>&1 | tail -30
-```
-
-Expected: test runs and reports results. Failures are expected — that's the point of this tool. Record what breaks.
-
-- [ ] **Step 3: Commit**
-
-```bash
-git add tests/e2e/test_smoke.py
-git commit -m "feat(e2e): add smoke test pass for all pages across modes"
-```
-
----
-
-## Task 7: Interaction Tests
-
-**Files:**
-- Create: `tests/e2e/test_interactions.py`
-
-- [ ] **Step 1: Write `test_interactions.py`**
-
-```python
-"""
-Interaction pass — discover every interactable element on each page, click it,
-diff errors before/after. Demo mode XFAIL patterns are checked; unexpected passes
-are flagged as regressions.
-
-Run: pytest tests/e2e/test_interactions.py --mode=demo -v
-"""
-from __future__ import annotations
-import pytest
-
-from tests.e2e.conftest import wait_for_streamlit, screenshot_on_fail
-from tests.e2e.models import diff_errors
-from tests.e2e.pages.home_page import HomePage
-from tests.e2e.pages.job_review_page import JobReviewPage
-from tests.e2e.pages.apply_page import ApplyPage
-from tests.e2e.pages.interviews_page import InterviewsPage
-from tests.e2e.pages.interview_prep_page import InterviewPrepPage
-from tests.e2e.pages.survey_page import SurveyPage
-from tests.e2e.pages.settings_page import SettingsPage
-
-PAGE_CLASSES = [
-    HomePage, JobReviewPage, ApplyPage, InterviewsPage,
-    InterviewPrepPage, SurveyPage, SettingsPage,
-]
-
-
-@pytest.mark.e2e
-def test_interactions_all_pages(active_modes, mode_contexts):
-    """
-    For each active mode and page: click every discovered interactable,
-    diff errors, XFAIL expected demo failures, FAIL on unexpected errors.
-    XPASS (expected failure that didn't fail) is also reported.
- """ - failures: list[str] = [] - xfails: list[str] = [] - xpasses: list[str] = [] - - for mode in active_modes: - ctx = mode_contexts[mode.name] - page = ctx.new_page() - console_msgs: list = [] - page.on("console", lambda msg: console_msgs.append(msg)) - - page.goto(mode.base_url) - wait_for_streamlit(page) - - for PageClass in PAGE_CLASSES: - pg = PageClass(page, mode, console_msgs) - pg.navigate() - - elements = pg.discover_interactables() - - for element in elements: - # Reset to this page before each interaction - pg.navigate() - - before = pg.get_errors() - - # Interact with element (click for buttons/tabs/checkboxes, open for selects) - try: - all_matches = page.query_selector_all(element.selector) - # Filter out sidebar elements - content_matches = [ - el for el in all_matches - if not el.evaluate( - "el => el.closest('[data-testid=\"stSidebar\"]') !== null" - ) - ] - if element.index < len(content_matches): - content_matches[element.index].click() - else: - continue # element disappeared after navigation reset - except Exception as e: - failures.append( - f"[{mode.name}] {PageClass.nav_label} / '{element.label}' — " - f"could not interact: {e}" - ) - continue - - wait_for_streamlit(page) - after = pg.get_errors() - new_errors = diff_errors(before, after) - - is_expected = mode.matches_expected_failure(element.label) - - if new_errors: - if is_expected: - xfails.append( - f"[{mode.name}] {PageClass.nav_label} / '{element.label}' " - f"(expected) — {new_errors[0].message[:120]}" - ) - else: - shot = screenshot_on_fail( - page, mode.name, - f"interact_{PageClass.__name__}_{element.label[:30]}" - ) - failures.append( - f"[{mode.name}] {PageClass.nav_label} / '{element.label}' — " - f"unexpected error: {new_errors[0].message[:200]}\n screenshot: {shot}" - ) - else: - if is_expected: - xpasses.append( - f"[{mode.name}] {PageClass.nav_label} / '{element.label}' " - f"— expected to fail but PASSED (neutering guard may be broken!)" - ) - - page.close() - - # 
Report summary - report_lines = [] - if xfails: - report_lines.append(f"XFAIL ({len(xfails)} expected failures, demo mode working correctly):") - report_lines.extend(f" {x}" for x in xfails) - if xpasses: - report_lines.append(f"\nXPASS — REGRESSION ({len(xpasses)} neutering guards broken!):") - report_lines.extend(f" {x}" for x in xpasses) - if failures: - report_lines.append(f"\nFAIL ({len(failures)} unexpected errors):") - report_lines.extend(f" {x}" for x in failures) - - if report_lines: - print("\n\n=== E2E Interaction Report ===\n" + "\n".join(report_lines)) - - # XPASSes are regressions — fail the test - if xpasses or failures: - pytest.fail( - f"{len(failures)} unexpected error(s), {len(xpasses)} xpass regression(s). " - "See report above." - ) -``` - -- [ ] **Step 2: Run interaction test against demo** - -```bash -conda run -n job-seeker pytest tests/e2e/test_interactions.py --mode=demo -v -s 2>&1 | tail -40 -``` - -Expected: test runs; XFAILs are logged (LLM buttons in demo mode), any unexpected errors are reported as FAILs. First run will reveal what demo seed data gaps exist. - -- [ ] **Step 3: Commit** - -```bash -git add tests/e2e/test_interactions.py -git commit -m "feat(e2e): add interaction audit pass with XFAIL/XPASS reporting" -``` - ---- - -## Task 8: `compose.e2e.yml`, Reporting Config + Prerequisites - -**Note:** `.env.e2e` and `.env.e2e.example` were already created during pre-implementation -setup (Directus test user provisioned at `e2e@circuitforge.tech`, credentials stored). -This task verifies they exist and adds the remaining config files. - -**Files:** -- Create: `compose.e2e.yml` - -- [ ] **Step 1: Verify `.env.e2e` and `.env.e2e.example` exist** - -```bash -ls -la .env.e2e .env.e2e.example -``` - -Expected: both files present. If `.env.e2e` is missing, copy from example and fill in credentials. 
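If the suite doesn't already load these credentials via `python-dotenv` or compose's `--env-file`, a minimal KEY=VALUE parser is enough. A sketch — the filename comes from this task, but the parsing rules are assumed (no interpolation, only simple double-quote stripping):

```python
from pathlib import Path

def load_env_file(path: str = ".env.e2e") -> dict[str, str]:
    """Parse simple KEY=VALUE lines; blank lines and # comments are skipped."""
    env: dict[str, str] = {}
    for raw in Path(path).read_text().splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        env[key.strip()] = value.strip().strip('"')
    return env
```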
- -- [ ] **Step 2: Seed `background_tasks` table to empty state for cloud/local runs** - -Cloud and local mode instances may have background tasks in their DBs that cause -Peregrine's sidebar fragment poller to fire continuously, interfering with -`wait_for_streamlit`. Clear completed/stuck tasks before running E2E: - -```bash -# For cloud instance DB (e2e-test-runner user) -sqlite3 /devl/menagerie-data/e2e-test-runner/peregrine/staging.db \ - "DELETE FROM background_tasks WHERE status IN ('completed','failed','running');" - -# For local instance DB -sqlite3 data/staging.db \ - "DELETE FROM background_tasks WHERE status IN ('completed','failed','running');" -``` - -Add this as a step in the `manage.sh e2e` subcommand — run before pytest. - -- [ ] **Step 3: Write `compose.e2e.yml`** - -```yaml -# compose.e2e.yml — E2E test overlay for cloud instance -# Usage: docker compose -f compose.cloud.yml -f compose.e2e.yml up -d -# -# No secrets here — credentials live in .env.e2e (gitignored) -# This file is safe to commit. 
-services:
-  peregrine-cloud:
-    environment:
-      - E2E_TEST_USER_ID=e2e-test-runner
-      - E2E_TEST_USER_EMAIL=e2e@circuitforge.tech
-```
-
-- [ ] **Step 4: Add `--json-report` to E2E run commands in manage.sh**
-
-Find the section in `manage.sh` that handles test commands, or add a new `e2e` subcommand:
-
-```bash
-e2e)
-    MODE="${2:-demo}"
-    RESULTS_DIR="tests/e2e/results/${MODE}"
-    mkdir -p "${RESULTS_DIR}"
-    # "${@:3}" forwards only the args after "e2e <mode>" (e.g. a test path);
-    # a bare "$@" would re-pass the subcommand and mode to pytest
-    conda run -n job-seeker pytest tests/e2e/ \
-        --mode="${MODE}" \
-        --json-report \
-        --json-report-file="${RESULTS_DIR}/report.json" \
-        --screenshot=on \
-        -v "${@:3}"
-    ;;
-```
-
-- [ ] **Step 5: Add results dirs to `.gitignore`**
-
-Ensure these lines are in `.gitignore` (from Task 1, verify they're present):
-```
-tests/e2e/results/demo/
-tests/e2e/results/cloud/
-tests/e2e/results/local/
-```
-
-- [ ] **Step 6: Test the manage.sh e2e command**
-
-```bash
-bash manage.sh e2e demo 2>&1 | tail -20
-```
-
-Expected: pytest runs with JSON report output.
-
-- [ ] **Step 7: Commit**
-
-```bash
-git add compose.e2e.yml manage.sh
-git commit -m "feat(e2e): add compose.e2e.yml overlay and manage.sh e2e subcommand"
-```
-
----
-
-## Task 9: Final Verification Run
-
-- [ ] **Step 1: Run full unit test suite — verify nothing broken**
-
-```bash
-conda run -n job-seeker pytest tests/ -q 2>&1 | tail -10
-```
-
-Expected: same pass count as before this feature branch, no regressions.
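To make the "same pass count" check mechanical, the baseline can be captured as a number and compared after the run. A small sketch that parses pytest's standard summary tail (the `N passed, M skipped in Xs` line):

```python
import re

def passed_count(summary_line: str) -> int:
    """Pull the pass count out of pytest's final summary line."""
    match = re.search(r"(\d+) passed", summary_line)
    return int(match.group(1)) if match else 0

assert passed_count("212 passed, 3 skipped in 41.20s") == 212
assert passed_count("no tests ran in 0.01s") == 0
```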
- -- [ ] **Step 5: Open issues for each unexpected FAIL** - -For each unexpected error surfaced by the interaction pass, open a Forgejo issue: -```bash -# Example — adapt per actual failures found -gh issue create --repo git.opensourcesolarpunk.com/Circuit-Forge/peregrine \ - --title "demo: / -``` -Add "Survey →" button with stages `['survey', 'phone_screen', 'interviewing', 'offer']` using the same `card-action` class. - -**InterviewsView.vue:** There are **3** `` instances (kanban columns: phoneScreen line ~462, interviewing ~475, offerHired ~488) — NOT 4. The `survey`-stage jobs live in the pre-list section (lines ~372–432) which renders plain `
` elements, not ``. - -Two changes needed: -1. Add `@survey="router.push('/survey/' + $event)"` to all 3 `` instances (same pattern as `@prep`). -2. Add a "Survey →" button directly to the pre-list row template for `survey`-stage jobs. The pre-list row is at line ~373 inside `v-for="job in pagedApplied"`. Add a button after the existing `btn-move-pre`: -```html - -``` - -**Mount guard:** Read `route.params.id` → redirect to `/interviews` if missing or non-numeric. Look up job in `interviewsStore.jobs`; if status not in `['survey', 'phone_screen', 'interviewing', 'offer']`, redirect. Call `surveyStore.fetchFor(jobId)` on mount; `surveyStore.clear()` on unmount. - -**useApiFetch body pattern:** Look at how `InterviewPrepView.vue` makes POST calls if needed — but the store handles all API calls; the view only calls store methods. - -Look at `web/src/views/InterviewPrepView.vue` as the reference for how views use stores, handle route guards, and apply CSS variables. The theme variables file is at `web/src/assets/theme.css` or similar — check what exists. - -- [ ] **Step 1: Verify existing theme variables and CSS patterns** - -```bash -grep -r "var(--space\|var(--color\|var(--font" web/src/assets/ web/src/views/InterviewPrepView.vue 2>/dev/null | head -20 -ls web/src/assets/ -``` - -This confirms which CSS variables are available for the layout. - -- [ ] **Step 2: Add `/survey/:id` route to router** - -In `web/src/router/index.ts`, add after the existing `/survey` line: -```typescript -{ path: '/survey/:id', component: () => import('../views/SurveyView.vue') }, -``` - -The existing `/survey` (no-id) route will continue to load `SurveyView.vue`, which is fine — the component mount guard immediately redirects to `/interviews` when `jobId` is missing/NaN. No router-level redirect needed. - -- [ ] **Step 3: Implement SurveyView.vue** - -Replace the placeholder stub entirely. Key sections: - -```vue - - -