Compare commits

..

No commits in common. "818e46c17e50279e77070d162cc0c1c2eba29dad" and "16d27c91fcbece697cae9085d7b580186e63e939" have entirely different histories.

13 changed files with 6031 additions and 39 deletions

View file

@@ -1,7 +1,7 @@
"""
Tier definitions and feature gates for Peregrine.
Tiers: free < paid < premium < ultra (ultra reserved; no Peregrine features use it yet)
Tiers: free < paid < premium
FEATURES maps feature key → minimum tier required.
Features not in FEATURES are available to all tiers (free).
@@ -25,11 +25,7 @@ from __future__ import annotations
import os as _os
from pathlib import Path
from circuitforge_core.tiers import (
can_use as _core_can_use,
TIERS,
tier_label as _core_tier_label,
)
TIERS = ["free", "paid", "premium"]
# Maps feature key → minimum tier string required.
# Features absent from this dict are free (available to all).
@@ -136,20 +132,25 @@ def can_use(
Returns False for unknown/invalid tier strings.
"""
effective_tier = demo_tier if (demo_tier is not None and _DEMO_MODE) else tier
# Pass Peregrine's BYOK_UNLOCKABLE via has_byok collapse — core's frozenset is empty
required = FEATURES.get(feature)
if required is None:
return True # not gated — available to all
if has_byok and feature in BYOK_UNLOCKABLE:
return True
return _core_can_use(feature, effective_tier, _features=FEATURES)
try:
return TIERS.index(effective_tier) >= TIERS.index(required)
except ValueError:
return False # invalid tier string
def tier_label(feature: str, has_byok: bool = False) -> str:
"""Return a display label for a locked feature, or '' if free/unlocked."""
if has_byok and feature in BYOK_UNLOCKABLE:
return ""
raw = _core_tier_label(feature, _features=FEATURES)
if not raw or raw == "free":
required = FEATURES.get(feature)
if required is None:
return ""
return "🔒 Paid" if raw == "paid" else "⭐ Premium"
return "🔒 Paid" if required == "paid" else "⭐ Premium"
def effective_tier(

File diff suppressed because it is too large Load diff

View file

@@ -0,0 +1,700 @@
# Jobgether Integration Implementation Plan
> **For agentic workers:** REQUIRED: Use superpowers:subagent-driven-development (if subagents available) or superpowers:executing-plans to implement this plan. Steps use checkbox (`- [ ]`) syntax for tracking.
**Goal:** Filter Jobgether listings out of all other scrapers, add a dedicated Jobgether scraper and URL scraper (Playwright-based), and add recruiter-aware cover letter framing for Jobgether jobs.
**Architecture:** Blocklist config handles filtering with zero code changes. A new `_scrape_jobgether()` in `scrape_url.py` handles manual URL imports via Playwright with URL slug fallback. A new `scripts/custom_boards/jobgether.py` handles discovery. Cover letter framing is an `is_jobgether` flag threaded from `task_runner.py` → `generate()` → `build_prompt()`.
**Tech Stack:** Python, Playwright (already installed), SQLite, PyTest, YAML config
**Spec:** `/Library/Development/CircuitForge/peregrine/docs/superpowers/specs/2026-03-15-jobgether-integration-design.md`
---
## Worktree Setup
- [ ] **Create worktree for this feature**
```bash
cd /Library/Development/CircuitForge/peregrine
git worktree add .worktrees/jobgether-integration -b feature/jobgether-integration
```
All implementation work happens in `/Library/Development/CircuitForge/peregrine/.worktrees/jobgether-integration/`.
---
## Chunk 1: Blocklist filter + scrape_url.py
### Task 1: Add Jobgether to blocklist
**Files:**
- Modify: `/Library/Development/CircuitForge/peregrine/config/blocklist.yaml`
- [ ] **Step 1: Edit blocklist.yaml**
```yaml
companies:
- jobgether
```
- [ ] **Step 2: Verify the existing `_is_blocklisted` test passes (or write one)**
Check `/Library/Development/CircuitForge/peregrine/tests/test_discover.py` for existing blocklist tests. If none cover company matching, add:
```python
def test_is_blocklisted_jobgether():
from scripts.discover import _is_blocklisted
blocklist = {"companies": ["jobgether"], "industries": [], "locations": []}
assert _is_blocklisted({"company": "Jobgether", "location": "", "description": ""}, blocklist)
assert _is_blocklisted({"company": "jobgether inc", "location": "", "description": ""}, blocklist)
assert not _is_blocklisted({"company": "Acme Corp", "location": "", "description": ""}, blocklist)
```
Run: `conda run -n job-seeker python -m pytest tests/test_discover.py -v -k "blocklist"`
Expected: PASS
- [ ] **Step 3: Commit**
```bash
git add config/blocklist.yaml tests/test_discover.py
git commit -m "feat: filter Jobgether listings via blocklist"
```
---
### Task 2: Add Jobgether detection to scrape_url.py
**Files:**
- Modify: `/Library/Development/CircuitForge/peregrine/scripts/scrape_url.py`
- Modify: `/Library/Development/CircuitForge/peregrine/tests/test_scrape_url.py`
- [ ] **Step 1: Write failing tests**
In `/Library/Development/CircuitForge/peregrine/tests/test_scrape_url.py`, add:
```python
def test_detect_board_jobgether():
from scripts.scrape_url import _detect_board
assert _detect_board("https://jobgether.com/offer/69b42d9d24d79271ee0618e8-csm---resware") == "jobgether"
assert _detect_board("https://www.jobgether.com/offer/abc-role---company") == "jobgether"
def test_jobgether_slug_company_extraction():
from scripts.scrape_url import _company_from_jobgether_url
assert _company_from_jobgether_url(
"https://jobgether.com/offer/69b42d9d24d79271ee0618e8-customer-success-manager---resware"
) == "Resware"
assert _company_from_jobgether_url(
"https://jobgether.com/offer/abc123-director-of-cs---acme-corp"
) == "Acme Corp"
assert _company_from_jobgether_url(
"https://jobgether.com/offer/abc123-no-separator-here"
) == ""
def test_scrape_jobgether_no_playwright(tmp_path):
"""When Playwright is unavailable, _scrape_jobgether falls back to URL slug for company."""
# Patch playwright.sync_api to None in sys.modules so the local import inside
# _scrape_jobgether raises ImportError at call time (local imports run at call time,
# not at module load time — so no reload needed).
import sys
import unittest.mock as mock
url = "https://jobgether.com/offer/69b42d9d24d79271ee0618e8-customer-success-manager---resware"
with mock.patch.dict(sys.modules, {"playwright": None, "playwright.sync_api": None}):
from scripts.scrape_url import _scrape_jobgether
result = _scrape_jobgether(url)
assert result.get("company") == "Resware"
assert result.get("source") == "jobgether"
```
Run: `conda run -n job-seeker python -m pytest tests/test_scrape_url.py::test_detect_board_jobgether tests/test_scrape_url.py::test_jobgether_slug_company_extraction tests/test_scrape_url.py::test_scrape_jobgether_no_playwright -v`
Expected: FAIL (functions not yet defined)
- [ ] **Step 2: Add `_company_from_jobgether_url()` to scrape_url.py**
Add after the `_STRIP_PARAMS` block (around line 34):
```python
def _company_from_jobgether_url(url: str) -> str:
"""Extract company name from Jobgether offer URL slug.
Slug format: /offer/{24-hex-hash}-{title-slug}---{company-slug}
Triple-dash separator delimits title from company.
Returns title-cased company name, or "" if pattern not found.
"""
m = re.search(r"---([^/?]+)$", urlparse(url).path)
if not m:
print(f"[scrape_url] Jobgether URL slug: no company separator found in {url}")
return ""
return m.group(1).replace("-", " ").title()
```
- [ ] **Step 3: Add `"jobgether"` branch to `_detect_board()`**
In `/Library/Development/CircuitForge/peregrine/scripts/scrape_url.py`, modify `_detect_board()` (add before `return "generic"`):
```python
if "jobgether.com" in url_lower:
return "jobgether"
```
- [ ] **Step 4: Add `_scrape_jobgether()` function**
Add after `_scrape_glassdoor()` (around line 137):
```python
def _scrape_jobgether(url: str) -> dict:
"""Scrape a Jobgether offer page using Playwright to bypass 403.
Falls back to URL slug for company name when Playwright is unavailable.
Does not use requests — no raise_for_status().
"""
try:
from playwright.sync_api import sync_playwright
except ImportError:
company = _company_from_jobgether_url(url)
if company:
print(f"[scrape_url] Jobgether: Playwright not installed, using slug fallback → {company}")
return {"company": company, "source": "jobgether"} if company else {}
try:
with sync_playwright() as p:
browser = p.chromium.launch(headless=True)
try:
ctx = browser.new_context(user_agent=_HEADERS["User-Agent"])
page = ctx.new_page()
page.goto(url, timeout=30_000)
page.wait_for_load_state("networkidle", timeout=20_000)
result = page.evaluate("""() => {
const title = document.querySelector('h1')?.textContent?.trim() || '';
const company = document.querySelector('[class*="company"], [class*="employer"], [data-testid*="company"]')
?.textContent?.trim() || '';
const location = document.querySelector('[class*="location"], [data-testid*="location"]')
?.textContent?.trim() || '';
const desc = document.querySelector('[class*="description"], [class*="job-desc"], article')
?.innerText?.trim() || '';
return { title, company, location, description: desc };
}""")
finally:
browser.close()
# Fall back to slug for company if DOM extraction missed it
if not result.get("company"):
result["company"] = _company_from_jobgether_url(url)
result["source"] = "jobgether"
return {k: v for k, v in result.items() if v}
except Exception as exc:
print(f"[scrape_url] Jobgether Playwright error for {url}: {exc}")
# Last resort: slug fallback
company = _company_from_jobgether_url(url)
return {"company": company, "source": "jobgether"} if company else {}
```
> ⚠️ **The CSS selectors in the `page.evaluate()` call are placeholders.** Before committing, inspect `https://jobgether.com/offer/` in a browser to find the actual class names for title, company, location, and description. Update the selectors accordingly.
- [ ] **Step 5: Add dispatch branch in `scrape_job_url()`**
In the `if board == "linkedin":` dispatch chain (around line 208), add before the `else`:
```python
elif board == "jobgether":
fields = _scrape_jobgether(url)
```
- [ ] **Step 6: Run tests to verify they pass**
Run: `conda run -n job-seeker python -m pytest tests/test_scrape_url.py -v`
Expected: All PASS (including pre-existing tests)
- [ ] **Step 7: Commit**
```bash
git add scripts/scrape_url.py tests/test_scrape_url.py
git commit -m "feat: add Jobgether URL detection and scraper to scrape_url.py"
```
---
## Chunk 2: Jobgether custom board scraper
> ⚠️ **Pre-condition:** Before writing the scraper, inspect `https://jobgether.com/remote-jobs` live to determine the actual URL/filter param format and DOM card selectors. Use the Playwright MCP browser tool or Chrome devtools. Record: (1) the query param for job title search, (2) the job card CSS selectors for title, company, URL, location, salary.
### Task 3: Inspect Jobgether search live
**Files:** None (research step)
- [ ] **Step 1: Navigate to Jobgether remote jobs and inspect search params**
Using browser devtools or Playwright network capture, navigate to `https://jobgether.com/remote-jobs`, search for "Customer Success Manager", and capture:
- The resulting URL (query params)
- Network requests (XHR/fetch) if the page uses API calls
- CSS selectors for job card elements
Record findings here before proceeding.
- [ ] **Step 2: Test a Playwright page.evaluate() extraction manually**
```python
# Run interactively to validate selectors
from playwright.sync_api import sync_playwright
with sync_playwright() as p:
browser = p.chromium.launch(headless=False) # headless=False to see the page
page = browser.new_page()
page.goto("https://jobgether.com/remote-jobs")
page.wait_for_load_state("networkidle")
# Test your selectors here
cards = page.query_selector_all("[YOUR_CARD_SELECTOR]")
print(len(cards))
browser.close()
```
---
### Task 4: Write jobgether.py scraper
**Files:**
- Create: `/Library/Development/CircuitForge/peregrine/scripts/custom_boards/jobgether.py`
- Modify: `/Library/Development/CircuitForge/peregrine/tests/test_discover.py` (or create `tests/test_jobgether.py`)
- [ ] **Step 1: Write failing test**
In `/Library/Development/CircuitForge/peregrine/tests/test_discover.py` (or a new `tests/test_jobgether.py`):
```python
def test_jobgether_scraper_returns_empty_on_missing_playwright(monkeypatch):
"""Graceful fallback when Playwright is unavailable."""
import scripts.custom_boards.jobgether as jg
monkeypatch.setattr("scripts.custom_boards.jobgether.sync_playwright", None)
result = jg.scrape({"titles": ["Customer Success Manager"]}, "Remote", results_wanted=5)
assert result == []
def test_jobgether_scraper_respects_results_wanted(monkeypatch):
"""Scraper caps results at results_wanted."""
import scripts.custom_boards.jobgether as jg
fake_jobs = [
{"title": f"CSM {i}", "href": f"/offer/abc{i}-csm---acme", "company": f"Acme {i}",
"location": "Remote", "is_remote": True, "salary": ""}
for i in range(20)
]
class FakePage:
def goto(self, *a, **kw): pass
def wait_for_load_state(self, *a, **kw): pass
def evaluate(self, _): return fake_jobs
class FakeCtx:
def new_page(self): return FakePage()
class FakeBrowser:
def new_context(self, **kw): return FakeCtx()
def close(self): pass
class FakeChromium:
def launch(self, **kw): return FakeBrowser()
class FakeP:
chromium = FakeChromium()
def __enter__(self): return self
def __exit__(self, *a): pass
monkeypatch.setattr("scripts.custom_boards.jobgether.sync_playwright", lambda: FakeP())
result = jg.scrape({"titles": ["CSM"]}, "Remote", results_wanted=5)
assert len(result) <= 5
```
Run: `conda run -n job-seeker python -m pytest tests/ -v -k "jobgether"`
Expected: FAIL (module not found)
- [ ] **Step 2: Create `scripts/custom_boards/jobgether.py`**
```python
"""Jobgether scraper — Playwright-based (requires chromium installed).
Jobgether (jobgether.com) is a remote-work job aggregator. It blocks plain
requests with 403, so we use Playwright to render the page and extract cards.
Install Playwright: conda run -n job-seeker pip install playwright &&
conda run -n job-seeker python -m playwright install chromium
Returns a list of dicts compatible with scripts.db.insert_job().
"""
from __future__ import annotations
import re
import time
from typing import Any
_BASE = "https://jobgether.com"
_SEARCH_PATH = "/remote-jobs"
# TODO: Replace with confirmed query param key after live inspection (Task 3)
_QUERY_PARAM = "search"
# Module-level import so tests can monkeypatch scripts.custom_boards.jobgether.sync_playwright
try:
from playwright.sync_api import sync_playwright
except ImportError:
sync_playwright = None
def scrape(profile: dict, location: str, results_wanted: int = 50) -> list[dict]:
"""
Scrape job listings from Jobgether using Playwright.
Args:
profile: Search profile dict (uses 'titles').
location: Location string — Jobgether is remote-focused; location used
only if the site exposes a location filter.
results_wanted: Maximum results to return across all titles.
Returns:
List of job dicts with keys: title, company, url, source, location,
is_remote, salary, description.
"""
if sync_playwright is None:
print(
" [jobgether] playwright not installed.\n"
" Install: conda run -n job-seeker pip install playwright && "
"conda run -n job-seeker python -m playwright install chromium"
)
return []
results: list[dict] = []
seen_urls: set[str] = set()
with sync_playwright() as p:
browser = p.chromium.launch(headless=True)
ctx = browser.new_context(
user_agent=(
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
"(KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36"
)
)
page = ctx.new_page()
for title in profile.get("titles", []):
if len(results) >= results_wanted:
break
# TODO: Confirm URL param format from live inspection (Task 3)
url = f"{_BASE}{_SEARCH_PATH}?{_QUERY_PARAM}={title.replace(' ', '+')}"
try:
page.goto(url, timeout=30_000)
page.wait_for_load_state("networkidle", timeout=20_000)
except Exception as exc:
print(f" [jobgether] Page load error for '{title}': {exc}")
continue
# TODO: Replace JS selector with confirmed card selector from Task 3
try:
raw_jobs: list[dict[str, Any]] = page.evaluate(_extract_jobs_js())
except Exception as exc:
print(f" [jobgether] JS extract error for '{title}': {exc}")
continue
if not raw_jobs:
print(f" [jobgether] No cards found for '{title}' — selector may need updating")
continue
for job in raw_jobs:
href = job.get("href", "")
if not href:
continue
full_url = _BASE + href if href.startswith("/") else href
if full_url in seen_urls:
continue
seen_urls.add(full_url)
results.append({
"title": job.get("title", ""),
"company": job.get("company", ""),
"url": full_url,
"source": "jobgether",
"location": job.get("location") or "Remote",
"is_remote": True, # Jobgether is remote-focused
"salary": job.get("salary") or "",
"description": "", # not in card view; scrape_url fills in
})
if len(results) >= results_wanted:
break
time.sleep(1) # polite pacing between titles
browser.close()
return results[:results_wanted]
def _extract_jobs_js() -> str:
"""JS to run in page context — extracts job data from rendered card elements.
TODO: Replace selectors with confirmed values from Task 3 live inspection.
"""
return """() => {
// TODO: replace '[class*=job-card]' with confirmed card selector
const cards = document.querySelectorAll('[class*="job-card"], [data-testid*="job"]');
return Array.from(cards).map(card => {
// TODO: replace these selectors with confirmed values
const titleEl = card.querySelector('h2, h3, [class*="title"]');
const companyEl = card.querySelector('[class*="company"], [class*="employer"]');
const linkEl = card.querySelector('a');
const salaryEl = card.querySelector('[class*="salary"]');
const locationEl = card.querySelector('[class*="location"]');
return {
title: titleEl ? titleEl.textContent.trim() : null,
company: companyEl ? companyEl.textContent.trim() : null,
href: linkEl ? linkEl.getAttribute('href') : null,
salary: salaryEl ? salaryEl.textContent.trim() : null,
location: locationEl ? locationEl.textContent.trim() : null,
is_remote: true,
};
}).filter(j => j.title && j.href);
}"""
```
- [ ] **Step 3: Run tests**
Run: `conda run -n job-seeker python -m pytest tests/ -v -k "jobgether"`
Expected: PASS
- [ ] **Step 4: Commit**
```bash
git add scripts/custom_boards/jobgether.py tests/test_discover.py
git commit -m "feat: add Jobgether custom board scraper (selectors pending live inspection)"
```
---
## Chunk 3: Registration, config, cover letter framing
### Task 5: Register scraper in discover.py + update search_profiles.yaml
**Files:**
- Modify: `/Library/Development/CircuitForge/peregrine/scripts/discover.py`
- Modify: `/Library/Development/CircuitForge/peregrine/config/search_profiles.yaml`
- Modify: `/Library/Development/CircuitForge/peregrine/config/search_profiles.yaml.example` (if it exists)
- [ ] **Step 1: Add import to discover.py import block (lines 20–22)**
`jobgether.py` absorbs the Playwright `ImportError` internally (module-level `try/except`), so it always imports successfully. Match the existing pattern exactly:
```python
from scripts.custom_boards import jobgether as _jobgether
```
- [ ] **Step 2: Add to CUSTOM_SCRAPERS dict literal (lines 30–34)**
```python
CUSTOM_SCRAPERS: dict[str, object] = {
"adzuna": _adzuna.scrape,
"theladders": _theladders.scrape,
"craigslist": _craigslist.scrape,
"jobgether": _jobgether.scrape,
}
```
When Playwright is absent, `_jobgether.scrape()` returns `[]` gracefully — no special guard needed in `discover.py`.
- [ ] **Step 3: Add `jobgether` to remote-eligible profiles in search_profiles.yaml**
Add `- jobgether` to the `custom_boards` list for every profile that has `Remote` in its `locations`. Based on the current file, that means: `cs_leadership`, `music_industry`, `animal_welfare`, `education`. Do NOT add it to `default` (locations: San Francisco CA only).
- [ ] **Step 4: Run discover tests**
Run: `conda run -n job-seeker python -m pytest tests/test_discover.py -v`
Expected: All PASS
- [ ] **Step 5: Commit**
```bash
git add scripts/discover.py config/search_profiles.yaml
git commit -m "feat: register Jobgether scraper and add to remote search profiles"
```
---
### Task 6: Cover letter recruiter framing
**Files:**
- Modify: `/Library/Development/CircuitForge/peregrine/scripts/generate_cover_letter.py`
- Modify: `/Library/Development/CircuitForge/peregrine/scripts/task_runner.py`
- Modify: `/Library/Development/CircuitForge/peregrine/tests/test_match.py` or add `tests/test_cover_letter.py`
- [ ] **Step 1: Write failing test**
Create or add to `/Library/Development/CircuitForge/peregrine/tests/test_cover_letter.py`:
```python
def test_build_prompt_jobgether_framing_unknown_company():
from scripts.generate_cover_letter import build_prompt
prompt = build_prompt(
title="Customer Success Manager",
company="Jobgether",
description="CSM role at an undisclosed company.",
examples=[],
is_jobgether=True,
)
assert "Your client" in prompt
assert "recruiter" in prompt.lower() or "jobgether" in prompt.lower()
def test_build_prompt_jobgether_framing_known_company():
from scripts.generate_cover_letter import build_prompt
prompt = build_prompt(
title="Customer Success Manager",
company="Resware",
description="CSM role at Resware.",
examples=[],
is_jobgether=True,
)
assert "Your client at Resware" in prompt
def test_build_prompt_no_jobgether_framing_by_default():
from scripts.generate_cover_letter import build_prompt
prompt = build_prompt(
title="Customer Success Manager",
company="Acme Corp",
description="CSM role.",
examples=[],
)
assert "Your client" not in prompt
```
Run: `conda run -n job-seeker python -m pytest tests/test_cover_letter.py -v`
Expected: FAIL
- [ ] **Step 2: Add `is_jobgether` to `build_prompt()` in generate_cover_letter.py**
Modify the `build_prompt()` signature (line 186):
```python
def build_prompt(
title: str,
company: str,
description: str,
examples: list[dict],
mission_hint: str | None = None,
is_jobgether: bool = False,
) -> str:
```
Add the recruiter hint block after the `mission_hint` block (after line 203):
```python
if is_jobgether:
if company and company.lower() != "jobgether":
recruiter_note = (
f"🤝 Recruiter context: This listing is posted by Jobgether on behalf of "
f"{company}. Address the cover letter to the Jobgether recruiter, not directly "
f"to the hiring company. Use framing like 'Your client at {company} will "
f"appreciate...' rather than addressing {company} directly. The role "
f"requirements are those of the actual employer."
)
else:
recruiter_note = (
"🤝 Recruiter context: This listing is posted by Jobgether on behalf of an "
"undisclosed employer. Address the cover letter to the Jobgether recruiter. "
"Use framing like 'Your client will appreciate...' rather than addressing "
"the company directly."
)
parts.append(f"{recruiter_note}\n")
```
- [ ] **Step 3: Add `is_jobgether` to `generate()` signature**
Modify `generate()` (line 233):
```python
def generate(
title: str,
company: str,
description: str = "",
previous_result: str = "",
feedback: str = "",
is_jobgether: bool = False,
_router=None,
) -> str:
```
Pass it through to `build_prompt()` (line 254):
```python
prompt = build_prompt(title, company, description, examples,
mission_hint=mission_hint, is_jobgether=is_jobgether)
```
- [ ] **Step 4: Pass `is_jobgether` from task_runner.py**
In `/Library/Development/CircuitForge/peregrine/scripts/task_runner.py`, modify the `generate()` call inside the `cover_letter` task block (`elif task_type == "cover_letter":` starts at line 152; the `generate()` call is at ~line 156):
```python
elif task_type == "cover_letter":
import json as _json
p = _json.loads(params or "{}")
from scripts.generate_cover_letter import generate
result = generate(
job.get("title", ""),
job.get("company", ""),
job.get("description", ""),
previous_result=p.get("previous_result", ""),
feedback=p.get("feedback", ""),
is_jobgether=job.get("source") == "jobgether",
)
update_cover_letter(db_path, job_id, result)
```
- [ ] **Step 5: Run tests**
Run: `conda run -n job-seeker python -m pytest tests/test_cover_letter.py -v`
Expected: All PASS
- [ ] **Step 6: Run full test suite**
Run: `conda run -n job-seeker python -m pytest tests/ -v`
Expected: All PASS
- [ ] **Step 7: Commit**
```bash
git add scripts/generate_cover_letter.py scripts/task_runner.py tests/test_cover_letter.py
git commit -m "feat: add Jobgether recruiter framing to cover letter generation"
```
---
## Final: Merge
- [ ] **Merge worktree branch to main**
```bash
cd /Library/Development/CircuitForge/peregrine
git merge feature/jobgether-integration
git worktree remove .worktrees/jobgether-integration
```
- [ ] **Push to remote**
```bash
git push origin main
```
---
## Manual verification after merge
1. Add the stuck Jobgether manual import (job 2286) — delete the old stuck row and re-add the URL via "Add Jobs by URL" in the Home page. Verify the scraper resolves company = "Resware".
2. Run a short discovery (`discover.py` with `results_per_board: 5`) and confirm no `company="Jobgether"` rows appear in `staging.db`.
3. Generate a cover letter for a Jobgether-sourced job and confirm recruiter framing appears.

File diff suppressed because it is too large Load diff

File diff suppressed because it is too large Load diff

View file

@@ -0,0 +1,477 @@
# LLM Queue Optimizer — Design Spec
**Date:** 2026-03-14
**Branch:** `feature/llm-queue-optimizer`
**Closes:** [#2](https://git.opensourcesolarpunk.com/Circuit-Forge/peregrine/issues/2)
**Author:** pyr0ball
---
## Problem
On single-GPU and CPU-only systems, the background task runner spawns a daemon thread for every task immediately on submission. When a user approves N jobs at once, N threads race to load their respective LLM models simultaneously, causing repeated model swaps and significant latency overhead.
The root issue is that `submit_task()` is a spawn-per-task model with no scheduling layer. SQLite's `background_tasks` table is a status log, not a consumed work queue.
Additionally, on restart all `queued` and `running` tasks are cleared to `failed` (inline SQL in `app.py`'s `_startup()`), discarding pending work that had not yet started executing.
---
## Goals
- Eliminate unnecessary model switching by batching LLM tasks by type
- Allow concurrent model execution when VRAM permits multiple models simultaneously
- Preserve FIFO ordering within each task type
- Survive process restarts — `queued` tasks resume after restart; only `running` tasks (whose results are unknown) are reset to `failed`
- Apply to all tiers (no tier gating)
- Keep non-LLM tasks (discovery, email sync, scrape, enrich) unaffected — they continue to spawn free threads
---
## Non-Goals
- Changing the LLM router fallback chain
- Adding new task types
- Tier gating on the scheduler
- Persistent task history in memory
- Durability for non-LLM task types (discovery, email_sync, etc. — these do not survive restarts, same as current behavior)
- Dynamic VRAM tracking — `_available_vram` is read once at startup and not refreshed (see Known Limitations)
---
## Architecture
### Task Classification
```python
LLM_TASK_TYPES = {"cover_letter", "company_research", "wizard_generate"}
```
The routing rule is: if `task_type in LLM_TASK_TYPES`, route through the scheduler. Everything else spawns a free thread unchanged from the current implementation. **Future task types default to bypass mode** unless explicitly added to `LLM_TASK_TYPES` — which is the safe default (bypass = current behavior).
`LLM_TASK_TYPES` is defined in `scripts/task_scheduler.py` and imported by `scripts/task_runner.py` for routing. This import direction (task_runner imports from task_scheduler) avoids circular imports because `task_scheduler.py` does **not** import from `task_runner.py`.
Current non-LLM types (all bypass scheduler): `discovery`, `email_sync`, `scrape_url`, `enrich_descriptions`, `enrich_craigslist`, `prepare_training`.
### Routing in `submit_task()` — No Circular Import
The routing split lives entirely in `submit_task()` in `task_runner.py`:
```python
def submit_task(db_path, task_type, job_id=None, params=None):
task_id, is_new = insert_task(db_path, task_type, job_id or 0, params=params)
if is_new:
from scripts.task_scheduler import get_scheduler, LLM_TASK_TYPES
if task_type in LLM_TASK_TYPES:
get_scheduler(db_path).enqueue(task_id, task_type, job_id or 0, params)
else:
t = threading.Thread(
target=_run_task,
args=(db_path, task_id, task_type, job_id or 0, params),
daemon=True,
)
t.start()
return task_id, is_new
```
`TaskScheduler.enqueue()` only handles LLM task types and never imports or calls `_run_task`. This eliminates any circular import between `task_runner` and `task_scheduler`.
### Component Overview
```
submit_task()
├── task_type in LLM_TASK_TYPES?
│ │ yes │ no
│ ▼ ▼
│ get_scheduler().enqueue() spawn free thread (unchanged)
│ │
│ ▼
│ per-type deque
│ │
│ ▼
│ Scheduler loop (daemon thread)
│ (wakes on enqueue or batch completion)
│ │
│ Sort eligible types by queue depth (desc)
│ │
│ For each type:
│ reserved_vram + budget[type] ≤ available_vram?
│ │ yes │ no
│ ▼ ▼
│ Start batch worker skip (wait for slot)
│ (serial: one task at a time)
│ │
│ Batch worker signals done → scheduler re-evaluates
```
### New File: `scripts/task_scheduler.py`
**State:**
| Attribute | Type | Purpose |
|---|---|---|
| `_queues` | `dict[str, deque[TaskSpec]]` | Per-type pending task deques |
| `_active` | `dict[str, Thread]` | Currently running batch worker per type |
| `_budgets` | `dict[str, float]` | VRAM budget per task type (GB). Loaded at construction by merging `DEFAULT_VRAM_BUDGETS` with `scheduler.vram_budgets` from `config/llm.yaml`. Config path derived from `db_path` (e.g. `db_path.parent.parent / "config/llm.yaml"`). Missing file or key → defaults used as-is. At construction, a warning is logged for any type in `LLM_TASK_TYPES` with no budget entry after the merge. |
| `_reserved_vram` | `float` | Sum of `_budgets` values for currently active type batches |
| `_available_vram` | `float` | Total VRAM from `get_gpus()` summed across all GPUs at construction; 999.0 on CPU-only systems. Static — not refreshed after startup (see Known Limitations). |
| `_max_queue_depth` | `int` | Max tasks per type queue before drops. From `scheduler.max_queue_depth` in config; default 500. |
| `_lock` | `threading.Lock` | Protects all mutable scheduler state |
| `_wake` | `threading.Event` | Pulsed on enqueue or batch completion |
| `_stop` | `threading.Event` | Set by `shutdown()` to terminate the loop |
**Default VRAM budgets (module-level constant):**
```python
DEFAULT_VRAM_BUDGETS: dict[str, float] = {
"cover_letter": 2.5, # alex-cover-writer:latest (~2GB GGUF + headroom)
"company_research": 5.0, # llama3.1:8b or vllm model
"wizard_generate": 2.5, # same model family as cover_letter
}
```
At construction, the scheduler validates that every type in `LLM_TASK_TYPES` has an entry
in the merged `_budgets`. If any type is missing, a warning is logged:
```
WARNING task_scheduler: No VRAM budget defined for LLM task type 'foo' — defaulting to 0.0 GB (unlimited concurrency for this type)
```
**Scheduler loop:**
```python
while not _stop.is_set():
_wake.wait(timeout=30)
_wake.clear()
with _lock:
# Defense in depth: reap dead threads not yet cleaned by their finally block.
# In the normal path, a batch worker's finally block calls _active.pop() and
# decrements _reserved_vram BEFORE firing _wake — so by the time we scan here,
# the entry is already gone and there is no double-decrement risk.
# This reap only catches threads killed externally (daemon exit on shutdown).
for t, thread in list(_active.items()):
if not thread.is_alive():
_reserved_vram -= _budgets.get(t, 0)
del _active[t]
# Start new batches where VRAM allows
candidates = sorted(
[t for t in _queues if _queues[t] and t not in _active],
key=lambda t: len(_queues[t]),
reverse=True,
)
for task_type in candidates:
budget = _budgets.get(task_type, 0)
if _reserved_vram + budget <= _available_vram:
thread = Thread(target=_batch_worker, args=(task_type,), daemon=True)
_active[task_type] = thread
_reserved_vram += budget
thread.start()
```
**Batch worker:**
The `finally` block is the single authoritative path for releasing `_reserved_vram` and
removing the entry from `_active`. Because `_active.pop` runs in `finally` before
`_wake.set()`, the scheduler loop's dead-thread scan will never find this entry —
no double-decrement is possible in the normal execution path.
```python
def _batch_worker(task_type: str) -> None:
try:
while True:
with _lock:
if not _queues[task_type]:
break
task = _queues[task_type].popleft()
_run_task(db_path, task.id, task_type, task.job_id, task.params)
finally:
with _lock:
_active.pop(task_type, None)
_reserved_vram -= _budgets.get(task_type, 0)
_wake.set()
```
`_run_task` here refers to `task_runner._run_task`, passed in as a callable at
construction (e.g. `self._run_task = run_task_fn`). The caller (`task_runner.py`)
passes `_run_task` when constructing the scheduler, avoiding any import of `task_runner`
from within `task_scheduler`.
**`enqueue()` method:**
`enqueue()` only accepts LLM task types. Non-LLM routing is handled in `submit_task()`
before `enqueue()` is called (see Routing section above).
```python
def enqueue(self, task_id: int, task_type: str, job_id: int, params: str | None) -> None:
with self._lock:
q = self._queues.setdefault(task_type, deque())
if len(q) >= self._max_queue_depth:
logger.warning(
"Queue depth limit reached for %s (max=%d) — task %d dropped",
task_type, self._max_queue_depth, task_id,
)
update_task_status(self._db_path, task_id, "failed",
error="Queue depth limit reached")
return
q.append(TaskSpec(task_id, job_id, params))
self._wake.set()
```
When a task is dropped at the depth limit, `update_task_status()` marks it `failed` in
SQLite immediately — the row inserted by `insert_task()` is never left as a permanent
ghost in `queued` state.
**Singleton access — thread-safe initialization:**
```python
_scheduler: TaskScheduler | None = None
_scheduler_lock = threading.Lock()
def get_scheduler(db_path: Path) -> TaskScheduler:
global _scheduler
if _scheduler is None: # fast path — avoids lock on steady state
with _scheduler_lock:
if _scheduler is None: # re-check under lock (double-checked locking)
_scheduler = TaskScheduler(db_path)
_scheduler.start()
return _scheduler
def reset_scheduler() -> None:
"""Tear down and clear singleton. Test teardown only."""
global _scheduler
with _scheduler_lock:
if _scheduler:
_scheduler.shutdown()
_scheduler = None
```
The safety guarantee comes from the **inner `with _scheduler_lock:` block and re-check**,
not from GIL atomicity. The outer `if _scheduler is None` is a performance optimization
(avoid acquiring the lock on every `submit_task()` call once the scheduler is running).
Two threads racing at startup will both pass the outer check, but only one will win the
inner lock and construct the scheduler; the other will see a non-None value on its
inner re-check and return the already-constructed instance.
---
## Required Call Ordering in `app.py`
`reset_running_tasks()` **must complete before** `get_scheduler()` is ever called.
The scheduler's durability query reads `status='queued'` rows; if `reset_running_tasks()`
has not yet run, a row stuck in `status='running'` from a prior crash would be loaded
into the deque and re-executed, producing a duplicate result.
In practice, the first call to `get_scheduler()` is triggered by the `submit_task()` call
inside `_startup()`'s SearXNG auto-recovery block — not by a user action. The ordering
holds because `reset_running_tasks()` is called on an earlier line within the same
`_startup()` function body. **Do not reorder these calls.**
```python
@st.cache_resource
def _startup() -> None:
# Step 1: Reset interrupted tasks — MUST come first
from scripts.db import reset_running_tasks
reset_running_tasks(get_db_path())
# Step 2 (later in same function): SearXNG re-queue calls submit_task(),
# which triggers get_scheduler() for the first time. Ordering is guaranteed
# because _startup() runs synchronously and step 1 is already complete.
conn = sqlite3.connect(get_db_path())
# ... existing SearXNG re-queue logic using conn ...
conn.close()
```
---
## Changes to Existing Files
### `scripts/task_runner.py`
`submit_task()` gains routing logic; `_run_task` is passed to the scheduler at first call:
```python
def submit_task(db_path, task_type, job_id=None, params=None):
task_id, is_new = insert_task(db_path, task_type, job_id or 0, params=params)
if is_new:
from scripts.task_scheduler import get_scheduler, LLM_TASK_TYPES
if task_type in LLM_TASK_TYPES:
get_scheduler(db_path, run_task_fn=_run_task).enqueue(
task_id, task_type, job_id or 0, params
)
else:
t = threading.Thread(
target=_run_task,
args=(db_path, task_id, task_type, job_id or 0, params),
daemon=True,
)
t.start()
return task_id, is_new
```
`get_scheduler()` accepts `run_task_fn` only on first call (when constructing); subsequent
calls ignore it (singleton already initialized). `_run_task()` and all handler branches
remain unchanged.
### `scripts/db.py`
Add `reset_running_tasks()` alongside the existing `kill_stuck_tasks()`. Like
`kill_stuck_tasks()`, it uses a plain `sqlite3.connect()` — consistent with the
existing pattern in this file, and appropriate because this call happens before the
app's connection pooling is established:
```python
def reset_running_tasks(db_path: Path = DEFAULT_DB) -> int:
"""On restart: mark in-flight tasks failed. Queued tasks survive for the scheduler."""
conn = sqlite3.connect(db_path)
count = conn.execute(
"UPDATE background_tasks SET status='failed', error='Interrupted by restart',"
" finished_at=datetime('now') WHERE status='running'"
).rowcount
conn.commit()
conn.close()
return count
```
### `app/app.py`
Inside `_startup()`, replace the inline SQL block that wipes both `queued` and `running`
rows with a call to `reset_running_tasks()`. The replacement must be the **first operation
in `_startup()`** — before the SearXNG re-queue logic that calls `submit_task()`:
```python
# REMOVE this block:
conn.execute(
"UPDATE background_tasks SET status='failed', error='Interrupted by server restart',"
" finished_at=datetime('now') WHERE status IN ('queued','running')"
)
# ADD at the top of _startup(), before any submit_task() calls:
from scripts.db import reset_running_tasks
reset_running_tasks(get_db_path())
```
The existing `conn` used for subsequent SearXNG logic is unaffected — `reset_running_tasks()`
opens and closes its own connection.
### `config/llm.yaml.example`
Add `scheduler:` section:
```yaml
scheduler:
vram_budgets:
cover_letter: 2.5 # alex-cover-writer:latest (~2GB GGUF + headroom)
company_research: 5.0 # llama3.1:8b or vllm model
wizard_generate: 2.5 # same model family as cover_letter
max_queue_depth: 500
```
---
## Data Model
No schema changes. The existing `background_tasks` table supports all scheduler needs:
| Column | Scheduler use |
|---|---|
| `task_type` | Queue routing — determines which deque receives the task |
| `status` | `queued` → in deque; `running` → batch worker executing; `completed`/`failed` → done |
| `created_at` | FIFO ordering within type (durability startup query sorts by this) |
| `params` | Passed through to `_run_task()` unchanged |
---
## Durability
Scope: **LLM task types only** (`cover_letter`, `company_research`, `wizard_generate`).
Non-LLM tasks do not survive restarts, same as current behavior.
On construction, `TaskScheduler.__init__()` queries:
```sql
SELECT id, task_type, job_id, params
FROM background_tasks
WHERE status = 'queued'
AND task_type IN ('cover_letter', 'company_research', 'wizard_generate')
ORDER BY created_at ASC
```
Results are pushed onto their respective deques. This query runs inside `__init__` before
`start()` is called (before the scheduler loop thread exists), so there is no concurrency
concern with deque population.
`running` rows are reset to `failed` by `reset_running_tasks()` before `get_scheduler()`
is called — see Required Call Ordering above.
---
## Known Limitations
**Static `_available_vram`:** Total GPU VRAM is read from `get_gpus()` once at scheduler
construction and never refreshed. Changes after startup — another process releasing VRAM,
a GPU going offline, Ollama unloading a model — are not reflected. The scheduler's
correctness depends on per-task VRAM budgets being conservative estimates of **peak model
footprint** (not free VRAM at a given moment). On a system where Ollama and vLLM share
the GPU, budgets should account for both models potentially resident simultaneously.
Dynamic VRAM polling is a future enhancement.
---
## Memory Safety
- **`finally` block owns VRAM release** — batch worker always decrements `_reserved_vram`
and removes its `_active` entry before firing `_wake`, even on exception. The scheduler
loop's dead-thread scan is defense in depth for externally-killed daemons only; it cannot
double-decrement because `_active.pop` in `finally` runs first.
- **Max queue depth with DB cleanup** — `enqueue()` rejects tasks past `max_queue_depth`,
logs a warning, and immediately marks the dropped task `failed` in SQLite to prevent
permanent ghost rows in `queued` state.
- **No in-memory history** — deques hold only pending `TaskSpec` namedtuples. Completed
and failed state lives exclusively in SQLite. Memory footprint is `O(pending tasks)`.
- **Thread-safe singleton** — double-checked locking with `_scheduler_lock` prevents
double-construction. Safety comes from the inner lock + re-check; the outer `None`
check is a performance optimization only.
- **Missing budget warning** — any `LLM_TASK_TYPES` entry with no budget entry after
config merge logs a warning at construction; defaults to 0.0 GB (unlimited concurrency
for that type). This prevents silent incorrect scheduling for future task types.
- **`reset_scheduler()`** — explicit teardown for test isolation: sets `_stop`, joins
scheduler thread with timeout, clears module-level reference under `_scheduler_lock`.
---
## Testing (`tests/test_task_scheduler.py`)
All tests mock `_run_task` to avoid real LLM calls. `reset_scheduler()` is called in
an `autouse` fixture for isolation between test cases.
| Test | What it verifies |
|---|---|
| `test_deepest_queue_wins_first_slot` | N cover_letter + M research enqueued (N > M); cover_letter batch starts first when `_available_vram` only fits one model budget, because it has the deeper queue |
| `test_fifo_within_type` | Arrival order preserved within a type batch |
| `test_concurrent_batches_when_vram_allows` | Two type batches start simultaneously when `_available_vram` fits both budgets combined |
| `test_new_tasks_picked_up_mid_batch` | Task enqueued via `enqueue()` while a batch is active is consumed by the running worker in the same batch |
| `test_worker_crash_releases_vram` | `_run_task` raises; `_reserved_vram` returns to 0; scheduler continues; no double-decrement |
| `test_non_llm_tasks_bypass_scheduler` | `discovery`, `email_sync` etc. spawn free threads via `submit_task()`; scheduler deques untouched |
| `test_durability_llm_tasks_on_startup` | DB has existing `queued` LLM-type rows; scheduler loads them into deques on construction |
| `test_durability_excludes_non_llm` | `queued` non-LLM rows in DB are not loaded into deques on startup |
| `test_running_rows_reset_before_scheduler` | `reset_running_tasks()` sets `running` → `failed`; `queued` rows untouched |
| `test_max_queue_depth_marks_failed` | Enqueue past limit logs warning, does not add to deque, and marks task `failed` in DB |
| `test_missing_budget_logs_warning` | Type in `LLM_TASK_TYPES` with no budget entry at construction logs a warning |
| `test_singleton_thread_safe` | Concurrent calls to `get_scheduler()` produce exactly one scheduler instance |
| `test_reset_scheduler_cleans_up` | `reset_scheduler()` stops loop thread; no lingering threads after call |
---
## Files Touched
| File | Change |
|---|---|
| `scripts/task_scheduler.py` | **New** — ~180 lines |
| `scripts/task_runner.py` | `submit_task()` routing shim — ~12 lines changed |
| `scripts/db.py` | `reset_running_tasks()` added — ~10 lines |
| `app/app.py` | `_startup()`: inline SQL block → `reset_running_tasks()` call, placed first |
| `config/llm.yaml.example` | Add `scheduler:` section |
| `tests/test_task_scheduler.py` | **New** — ~240 lines |

View file

@ -0,0 +1,173 @@
# Jobgether Integration Design
**Date:** 2026-03-15
**Status:** Approved
**Scope:** Peregrine — discovery pipeline + manual URL import
---
## Problem
Jobgether is a job aggregator that posts listings on LinkedIn and other boards with `company = "Jobgether"` rather than the actual employer. This causes two problems:
1. **Misleading listings** — Jobs appear to be at "Jobgether" rather than the real hiring company. Meg sees "Jobgether" as employer throughout the pipeline (Job Review, cover letters, company research).
2. **Broken manual import** — Direct `jobgether.com` URLs return HTTP 403 when scraped with plain `requests`, leaving jobs stuck as `title = "Importing…"`.
**Evidence from DB:** 29+ Jobgether-sourced LinkedIn listings with `company = "Jobgether"`. Actual employer is intentionally withheld by Jobgether's business model ("on behalf of a partner company").
---
## Decision: Option A — Filter + Dedicated Scraper
Drop Jobgether listings from other scrapers entirely and replace with a direct Jobgether scraper that retrieves accurate company names. Existing Jobgether-via-LinkedIn listings in the DB are left as-is for manual review/rejection.
**Why not Option B (follow-through):** LinkedIn→Jobgether→employer is a two-hop chain where the employer is deliberately hidden. Jobgether blocks `requests`. Not worth the complexity for unreliable data.
---
## Components
### 1. Jobgether company filter — `config/blocklist.yaml`
Add `"jobgether"` to the `companies` list in `config/blocklist.yaml`. The existing `_is_blocklisted()` function in `discover.py` already performs a partial case-insensitive match on the company field and applies to all scrapers (JobSpy boards + all custom boards). No code change required.
```yaml
companies:
- jobgether
```
This is the correct mechanism — it is user-visible, config-driven, and applies uniformly. Log output already reports blocklisted jobs per run.
### 2. URL handling in `scrape_url.py`
Three changes required:
**a) `_detect_board()`** — add `"jobgether"` branch returning `"jobgether"` when `"jobgether.com"` is in the URL. Must be added before the `return "generic"` fallback.
**b) dispatch block in `scrape_job_url()`** — add `elif board == "jobgether": fields = _scrape_jobgether(url)` to the `if/elif` chain (lines 208–215). Without this, the new `_detect_board()` branch silently falls through to `_scrape_generic()`.
**c) `_scrape_jobgether(url)`** — Playwright-based scraper to bypass 403. Extracts:
- `title` — job title from page heading
- `company` — actual employer name (visible on Jobgether offer pages)
- `location` — remote/location info
- `description` — full job description
- `source = "jobgether"`
Playwright errors (`playwright.sync_api.Error`, `TimeoutError`) are not subclasses of `requests.RequestException` but are caught by the existing broad `except Exception` handler in `scrape_job_url()` — no changes needed to the error handling block.
**URL slug fallback for company name (manual import path only):** Jobgether offer URLs follow the pattern:
```
https://jobgether.com/offer/{24-hex-hash}-{title-slug}---{company-slug}
```
When Playwright is unavailable, parse `company-slug` using:
```python
m = re.search(r'---([^/?]+)$', parsed_path)
company = m.group(1).replace("-", " ").title() if m else ""
```
Example: `/offer/69b42d9d24d79271ee0618e8-customer-success-manager---resware``"Resware"`.
This fallback is scoped to `_scrape_jobgether()` in `scrape_url.py` only; the discovery scraper always gets company name from the rendered DOM. `_scrape_jobgether()` does not make any `requests` calls — there is no `raise_for_status()` — so the `requests.RequestException` handler in `scrape_job_url()` is irrelevant to this path; only the broad `except Exception` applies.
**Pre-implementation checkpoint:** Confirm that Jobgether offer URLs have no tracking query params beyond UTM (already covered by `_STRIP_PARAMS`). No `canonicalize_url()` changes are expected but verify before implementation.
### 3. `scripts/custom_boards/jobgether.py`
Playwright-based search scraper following the same interface as `theladders.py`:
```python
def scrape(profile: dict, location: str, results_wanted: int = 50) -> list[dict]
```
- Base URL: `https://jobgether.com/remote-jobs`
- Search strategy: iterate over `profile["titles"]`, apply search/filter params
- **Pre-condition — do not begin implementation of this file until live URL inspection is complete.** Use browser dev tools or a Playwright `page.on("request")` capture to determine the actual query parameter format for title/location filtering. Jobgether may use URL query params, path segments, or JS-driven state — this cannot be assumed from the URL alone.
- Extraction: job cards from rendered DOM (Playwright `page.evaluate()`)
- Returns standard job dicts: `title, company, url, source, location, is_remote, salary, description`
- `source = "jobgether"`
- Graceful `ImportError` handling if Playwright not installed (same pattern as `theladders.py`)
- Polite pacing: 1s sleep between title iterations
- Company name comes from DOM; URL slug parse is not needed in this path
### 4. Registration + config
**`discover.py` — import block (lines 20–22):**
```python
from scripts.custom_boards import jobgether as _jobgether
```
**`discover.py` — `CUSTOM_SCRAPERS` dict literal (lines 30–34):**
```python
CUSTOM_SCRAPERS: dict[str, object] = {
"adzuna": _adzuna.scrape,
"theladders": _theladders.scrape,
"craigslist": _craigslist.scrape,
"jobgether": _jobgether.scrape, # ← add this line
}
```
**`config/search_profiles.yaml` (and `.example`):**
Add `jobgether` to `custom_boards` for any profile that includes `Remote` in its `locations` list. Jobgether is a remote-work-focused aggregator; adding it to location-specific non-remote profiles is not useful. Do not add a `custom_boards` key to profiles that don't already have one unless they are remote-eligible.
```yaml
custom_boards:
- jobgether
```
---
## Data Flow
```
discover.py
├── JobSpy boards → _is_blocklisted(company="jobgether") → drop → DB insert
├── custom: adzuna → _is_blocklisted(company="jobgether") → drop → DB insert
├── custom: theladders → _is_blocklisted(company="jobgether") → drop → DB insert
├── custom: craigslist → _is_blocklisted(company="jobgether") → drop → DB insert
└── custom: jobgether → (company = real employer, never "jobgether") → DB insert
scrape_url.py
└── jobgether.com URL → _detect_board() = "jobgether"
→ _scrape_jobgether()
├── Playwright available → full job fields from page
└── Playwright unavailable → company from URL slug only
```
---
## Implementation Notes
- **Slug fallback None-guard:** The regex `r'---([^/?]+)$'` returns a wrong value (not `None`) if the URL slug doesn't follow the expected format. Add a logged warning and return `""` rather than title-casing garbage.
- **Import guard in `discover.py`:** Wrap the `jobgether` import with `try/except ImportError`, setting `_jobgether = None`, and gate the `CUSTOM_SCRAPERS` registration with `if _jobgether is not None`. This ensures the graceful ImportError in `jobgether.py` (for missing Playwright) propagates cleanly to the caller rather than crashing discovery.
### 5. Cover letter recruiter framing — `scripts/generate_cover_letter.py`
When `source = "jobgether"`, inject a system hint that shifts the cover letter addressee from the employer to the Jobgether recruiter. Use Policy A: recruiter framing applies for all Jobgether-sourced jobs regardless of whether the real company name was resolved.
- If company is known (e.g. "Resware"): *"Your client at Resware will appreciate..."*
- If company is unknown: *"Your client will appreciate..."*
The real company name is always stored in the DB as resolved by the scraper — this is internal knowledge only. The framing shift is purely in the generated letter text, not in how the job is stored or displayed.
Implementation: add an `is_jobgether` flag to the cover letter prompt context (same pattern as `mission_hint` injection). Add a conditional block in the system prompt / Para 1 instructions when the flag is true.
---
## Out of Scope
- Retroactively fixing existing `company = "Jobgether"` rows in the DB (left for manual review/rejection)
- Jobgether discovery scraper — **decided against during implementation (2026-03-15)**: Cloudflare Turnstile blocks all headless browsers on all Jobgether pages; `filter-api.jobgether.com` requires auth; `robots.txt` blocks all bots. The email digest → manual URL paste → slug company extraction flow covers the actual use case.
- Jobgether authentication / logged-in scraping
- Pagination
- Dedup between Jobgether and other boards (existing URL dedup handles this)
---
## Files Changed
| File | Change |
|------|--------|
| `config/blocklist.yaml` | Add `"jobgether"` to `companies` list |
| `scripts/discover.py` | Add import + entry in `CUSTOM_SCRAPERS` dict literal |
| `scripts/scrape_url.py` | Add `_detect_board` branch, dispatch branch, `_scrape_jobgether()` |
| `scripts/custom_boards/jobgether.py` | New file — Playwright search scraper |
| `config/search_profiles.yaml` | Add `jobgether` to `custom_boards` |
| `config/search_profiles.yaml.example` | Same |

View file

@ -0,0 +1,258 @@
# UI Switcher — Design Spec
**Date:** 2026-03-22
**Status:** Approved
**Scope:** Peregrine v0.7.0
---
## Overview
Add a Reddit-style UI switcher that lets paid-tier users opt into the new Vue 3 SPA while the Streamlit UI remains the default. The Vue SPA ships merged into `main` (gated behind a paid-tier feature flag), served by a new nginx Docker service alongside Streamlit. The demo instance gets both the UI switcher (open to all visitors) and a simulated tier switcher so demo visitors can explore all feature tiers.
---
## Decisions
| Question | Decision |
|---|---|
| Switcher placement | Banner (once per session, dismissible) + Settings → System toggle |
| Vue SPA serving | New `web` Docker service (nginx) in all three compose files |
| Preference persistence | JS cookie (`prgn_ui`) as Caddy routing signal; `user.yaml` as durability layer |
| Switching mechanism | JS cookie injection via `st.components.v1.html()` (Streamlit→Vue); client-side JS (Vue→Streamlit) |
| Tier gate | `vue_ui_beta: "paid"` in `tiers.py`; bypassed in `DEMO_MODE` |
| Branch strategy | Merge `feature-vue-spa` → `main` now; future Vue work uses `feature/vue-*` → `main` PRs |
| Demo UI switcher | Open to all demo visitors (no tier gate) |
| Demo tier switcher | Slim full-width toolbar above nav; cookie-based persistence (`prgn_demo_tier`) |
| Banner dismissal | Uses existing `dismissed_banners` list in `user.yaml` (key: `ui_switcher_beta`) |
---
## Port Reference
| Compose file | Host port | Purpose |
|---|---|---|
| `compose.yml` | 8501 | Personal dev instance |
| `compose.demo.yml` | 8504 | Demo (`demo.circuitforge.tech`) |
| `compose.cloud.yml` | 8505 | Cloud managed (`menagerie.circuitforge.tech`) |
| `compose.yml` (web) | 8506 | Vue SPA — dev |
| `compose.demo.yml` (web) | 8507 | Vue SPA — demo |
| `compose.cloud.yml` (web) | 8508 | Vue SPA — cloud |
---
## Architecture
Six additive components — nothing removed from the existing stack.
### 1. `web` Docker service
A minimal nginx container serving the Vue SPA `dist/` build. Added to `compose.yml`, `compose.demo.yml`, and `compose.cloud.yml`.
- `docker/web/Dockerfile``FROM nginx:alpine`, copies `nginx.conf`, copies `web/dist/` into `/usr/share/nginx/html/`
- `docker/web/nginx.conf` — standard SPA config with `try_files $uri /index.html` fallback
- Build step is image-baked (not a bind-mount): `docker compose build web` runs `vite build` in `web/` via a multi-stage Dockerfile, then copies the resulting `dist/` into the nginx image. This ensures a fresh clone + `manage.sh start` works without a separate manual build step.
- `manage.sh` updated: `build` target runs `docker compose build web app` so both are built together.
### 2. Caddy cookie routing
Caddy inspects the `prgn_ui` cookie on all Peregrine requests. Two vhost blocks require changes:
**`menagerie.circuitforge.tech` (cloud, port 8505/8508):**
```
handle /peregrine* {
@no_session not header Cookie *cf_session*
redir @no_session https://circuitforge.tech/login?next={uri} 302
@vue_ui header Cookie *prgn_ui=vue*
handle @vue_ui {
reverse_proxy http://host.docker.internal:8508
}
handle {
reverse_proxy http://host.docker.internal:8505
}
}
```
**`demo.circuitforge.tech` (demo, port 8504/8507):**
```
handle /peregrine* {
@vue_ui header Cookie *prgn_ui=vue*
handle @vue_ui {
reverse_proxy http://host.docker.internal:8507
}
handle {
reverse_proxy http://host.docker.internal:8504
}
}
```
Error handling: a `handle_errors { ... }` block on each vhost catches 502 from the Vue SPA service, redirects to the Streamlit upstream with `?ui_fallback=1`, and includes a `Set-Cookie: prgn_ui=streamlit; Path=/` response header to clear the routing cookie.
### 3. Streamlit switch mechanism
New module `app/components/ui_switcher.py`:
- `sync_ui_cookie()` — called **in the render pass** (after `pg.run()` in `app.py`), not inside the cached startup hook. Reads `user.yaml.ui_preference`; injects JS to set/clear `prgn_ui` cookie. Cookie/user.yaml conflict: **cookie wins** — if `prgn_ui` cookie is already present, writes user.yaml to match before re-injecting. If `DEMO_MODE`, skips tier check. If not `DEMO_MODE` and not `can_use("vue_ui_beta")`, resets preference to `streamlit` and clears cookie.
- `switch_ui(to: str)` — writes `user.yaml.ui_preference`, calls `sync_ui_cookie()`, then `st.rerun()`.
- `render_banner()` — dismissible banner shown to eligible users when `ui_switcher_beta` is not in `user_profile.dismissed_banners`. On dismiss: appends `ui_switcher_beta` to `dismissed_banners`, saves `user.yaml`. On "Try it": calls `switch_ui("vue")`. Also detects `?ui_fallback=1` in `st.query_params` and shows a toast ("New UI temporarily unavailable — switched back to Classic") then clears the param.
- `render_settings_toggle()` — toggle in Settings → System → Deployment expander. Calls `switch_ui()` on change.
### 4. Vue SPA switch-back
New `web/src/components/ClassicUIButton.vue`:
```js
function switchToClassic() {
document.cookie = 'prgn_ui=streamlit; path=/; SameSite=Lax';
const url = new URL(window.location.href);
url.searchParams.set('prgn_switch', 'streamlit');
window.location.href = url.toString();
}
```
**Why the query param?** Streamlit cannot read HTTP cookies from Python — only client-side JS can. The `?prgn_switch=streamlit` param acts as a bridge: `sync_ui_cookie()` reads it via `st.query_params`, updates user.yaml to match, then clears the param. The cookie is set by the JS before the navigation so Caddy routes the request to Streamlit, and the param ensures user.yaml stays consistent with the cookie.
### 5. Tier gate
`app/wizard/tiers.py`:
```python
FEATURES: dict[str, str] = {
...
"vue_ui_beta": "paid", # add this
}
```
Not in `BYOK_UNLOCKABLE` — the Vue UI has no LLM dependency; the gate is purely about beta access management.
`can_use()` signature change — keyword-only argument with a safe default:
```python
def can_use(
tier: str,
feature: str,
has_byok: bool = False,
*,
demo_tier: str | None = None,
) -> bool:
effective_tier = demo_tier if (demo_tier and DEMO_MODE_FLAG) else tier
...
```
Argument order preserved from the existing implementation (`tier` first, `feature` second) — no existing call sites need updating. `DEMO_MODE_FLAG` is read from the environment, not from `st.session_state`, so this function is safe to call from background task threads and tests. `st.session_state.simulated_tier` is only read by the **caller** (`render_banner()`, `render_settings_toggle()`, page feature gates) which then passes it as `demo_tier=`.
### 6. Demo toolbar
New module `app/components/demo_toolbar.py`:
- `render_demo_toolbar()` — slim full-width bar rendered at the top of `app.py`'s render pass when `DEMO_MODE=true`. Shows `🎭 Demo mode · Free · Paid · Premium` pills with the active tier highlighted.
- `set_simulated_tier(tier: str)` — injects JS to set `prgn_demo_tier` cookie, updates `st.session_state.simulated_tier`, calls `st.rerun()`.
- Initialization: on each page load in demo mode, `app.py` reads `prgn_demo_tier` from `st.query_params` or the cookie (via a JS→hidden Streamlit input bridge, same pattern used by existing components) and sets `st.session_state.simulated_tier`. **Default if not set: `paid`** — shows the full feature set immediately on first demo load.
`useFeatureFlag.ts` (Vue SPA, `web/src/composables/`) is **demo-toolbar only** — it reads `prgn_demo_tier` cookie for the visual indicator in the Vue SPA's ClassicUIButton area. It is **not** an authoritative feature gate. All real feature gating in the Vue SPA will use a future `/api/features` endpoint (tracked under issue #8). This composable exists solely so the demo toolbar's simulated tier is visually consistent when the user has switched to the Vue SPA.
---
## File Changes
### New files
| File | Purpose |
|---|---|
| `app/components/ui_switcher.py` | `sync_ui_cookie`, `switch_ui`, `render_banner`, `render_settings_toggle` |
| `app/components/demo_toolbar.py` | `render_demo_toolbar`, `set_simulated_tier` |
| `docker/web/Dockerfile` | Multi-stage: `node` build stage → `nginx:alpine` serve stage |
| `docker/web/nginx.conf` | SPA-aware nginx config |
| `web/` | Vue SPA source (merged from `feature-vue-spa` worktree) |
| `web/src/components/ClassicUIButton.vue` | Switch-back button for Vue SPA nav |
| `web/src/composables/useFeatureFlag.ts` | Demo toolbar tier display (not a production gate) |
### Modified files
| File | Change |
|---|---|
| `app/app.py` | Call `sync_ui_cookie()` + `render_demo_toolbar()` + `render_banner()` in render pass |
| `app/wizard/tiers.py` | Add `vue_ui_beta: "paid"` to `FEATURES`; add `demo_tier` keyword arg to `can_use()` |
| `app/pages/2_Settings.py` | Add `render_settings_toggle()` in System → Deployment expander |
| `config/user.yaml.example` | Add `ui_preference: streamlit` |
| `scripts/user_profile.py` | Add `ui_preference` field to schema (default: `streamlit`) |
| `compose.yml` | Add `web` service (port 8506) |
| `compose.demo.yml` | Add `web` service (port 8507) |
| `compose.cloud.yml` | Add `web` service (port 8508) |
| `manage.sh` | `build` target includes `web` service |
| `/devl/caddy-proxy/Caddyfile` | Cookie routing in `menagerie.circuitforge.tech` + `demo.circuitforge.tech` peregrine blocks |
---
## Data Flow
### Streamlit → Vue
```
User clicks "Try it" banner or Settings toggle
→ switch_ui(to="vue")
→ write user.yaml: ui_preference: vue
→ sync_ui_cookie(): inject JS → document.cookie = 'prgn_ui=vue; path=/'
→ st.rerun()
→ browser reloads → Caddy sees prgn_ui=vue → :8508/:8507 (Vue SPA)
```
### Vue → Streamlit
```
User clicks "Classic UI" in Vue nav
→ document.cookie = 'prgn_ui=streamlit; path=/'
→ navigate to current URL + ?prgn_switch=streamlit
→ Caddy sees prgn_ui=streamlit → :8505/:8504 (Streamlit)
→ app.py render pass: sync_ui_cookie() sees ?prgn_switch=streamlit in st.query_params
→ writes user.yaml: ui_preference: streamlit
→ clears query param
→ injects JS to re-confirm cookie
```
### Demo tier switch
```
User clicks tier pill in demo toolbar
→ set_simulated_tier("paid")
→ inject JS → document.cookie = 'prgn_demo_tier=paid; path=/'
→ st.session_state.simulated_tier = "paid"
→ st.rerun()
→ render_banner() / page feature gates call can_use(..., demo_tier=st.session_state.simulated_tier)
```
### Cookie cleared (durability)
```
Browser cookies cleared
→ next Streamlit load: sync_ui_cookie() reads user.yaml: ui_preference: vue
→ re-injects prgn_ui=vue cookie
→ next navigation: Caddy routes to Vue SPA
```
---
## Error Handling
| Scenario | Handling |
|---|---|
| Vue SPA service down (502) | Caddy `handle_errors` sets `Set-Cookie: prgn_ui=streamlit` + redirects to Streamlit with `?ui_fallback=1` |
| `?ui_fallback=1` detected | `render_banner()` shows toast "New UI temporarily unavailable — switched back to Classic"; calls `switch_ui("streamlit")` |
| user.yaml missing/malformed | `sync_ui_cookie()` try/except defaults to `streamlit`; no crash |
| Cookie/user.yaml conflict | Cookie wins — `sync_ui_cookie()` writes user.yaml to match cookie if present |
| Tier downgrade with vue cookie | `sync_ui_cookie()` detects `not can_use("vue_ui_beta")` → clears cookie + resets user.yaml |
| Demo toolbar in non-demo mode | `render_demo_toolbar()` only called when `DEMO_MODE=true`; `prgn_demo_tier` ignored by `can_use()` outside demo |
| `can_use()` called from background thread | `demo_tier` param defaults to `None`; `DEMO_MODE_FLAG` is env-only — no `st.session_state` access in the function body; thread-safe |
| First demo load (no cookie yet) | `st.session_state.simulated_tier` initialized to `"paid"` if `prgn_demo_tier` cookie absent |
---
## Testing
- **Unit**: `sync_ui_cookie()` with all three conflict cases; `can_use("vue_ui_beta")` for free/paid/premium/demo tiers; `set_simulated_tier()` state transitions; `can_use()` called with `demo_tier=` from a non-Streamlit context (no `RuntimeError`)
- **Integration**: Caddy routing with mocked cookie headers (both directions); 502 fallback redirect + cookie clear chain
- **E2E**: Streamlit→Vue switch → verify served from Vue SPA port; Vue→Streamlit → verify Streamlit port; demo tier pill → verify feature gate state changes; cookie persistence across Streamlit restart; fresh clone `./manage.sh start` builds and serves Vue SPA correctly
---
## Out of Scope
- Vue SPA feature parity with Streamlit (tracked under issue #8)
- Removing the Streamlit UI (v1 GA milestone)
- `old.peregrine.circuitforge.tech` subdomain alias (not needed — cookie approach is sufficient)
- Authoritative Vue-side feature gating via `/api/features` endpoint (post-parity, issue #8)
- Fine-tuned model or integrations gating in the Vue SPA (future work)

View file

@ -2,9 +2,6 @@
# Extracted from environment.yml for Docker pip installs
# Keep in sync with environment.yml
# ── CircuitForge shared core ───────────────────────────────────────────────
-e ../circuitforge-core
# ── Web UI ────────────────────────────────────────────────────────────────
streamlit>=1.35
watchdog
@ -81,10 +78,3 @@ lxml
# ── Documentation ────────────────────────────────────────────────────────
mkdocs>=1.5
mkdocs-material>=9.5
# ── Vue SPA API backend ──────────────────────────────────────────────────
fastapi>=0.100.0
uvicorn[standard]>=0.20.0
PyJWT>=2.8.0
cryptography>=40.0.0
python-multipart>=0.0.6

View file

@ -9,14 +9,30 @@ from datetime import datetime
from pathlib import Path
from typing import Optional
from circuitforge_core.db import get_connection as _cf_get_connection
DEFAULT_DB = Path(os.environ.get("STAGING_DB", Path(__file__).parent.parent / "staging.db"))


def get_connection(db_path: Path = DEFAULT_DB, key: str = "") -> "sqlite3.Connection":
    """
    Open a database connection.

    In cloud mode with a key: uses SQLCipher (AES-256 encrypted, API-identical
    to sqlite3). Otherwise: vanilla sqlite3.

    Args:
        db_path: Path to the SQLite/SQLCipher database file.
        key: SQLCipher encryption key (hex string). Empty = unencrypted.

    Returns:
        An open DB-API connection (pysqlcipher3 or sqlite3).
    """
    import os as _os

    cloud_mode = _os.environ.get("CLOUD_MODE", "").lower() in ("1", "true", "yes")
    if cloud_mode and key:
        # Imported lazily so non-cloud installs don't need pysqlcipher3.
        from pysqlcipher3 import dbapi2 as _sqlcipher

        conn = _sqlcipher.connect(str(db_path))
        # PRAGMAs cannot take bound parameters, so the key must be inlined.
        # Double any single quote so a key containing ' cannot terminate the
        # string literal early (SQL-style '' escaping).
        conn.execute("PRAGMA key='{}'".format(key.replace("'", "''")))
        return conn
    import sqlite3 as _sqlite3

    return _sqlite3.connect(str(db_path))
CREATE_JOBS = """

View file

@ -2,18 +2,168 @@
LLM abstraction layer with priority fallback chain.
Reads config/llm.yaml. Tries backends in order; falls back on any error.
"""
import os
import yaml
import requests
from pathlib import Path
from circuitforge_core.llm import LLMRouter as _CoreLLMRouter
from openai import OpenAI
CONFIG_PATH = Path(__file__).parent.parent / "config" / "llm.yaml"


class LLMRouter:
    """LLM abstraction layer with a priority fallback chain.

    Backends are declared in config/llm.yaml and tried in ``fallback_order``;
    any error on one backend falls through to the next.
    """

    def __init__(self, config_path: Path = CONFIG_PATH):
        """Load backend configuration from *config_path* (YAML).

        Args:
            config_path: Path to the llm.yaml config file; defaults to the
                repository's config/llm.yaml.
        """
        with open(config_path) as f:
            self.config = yaml.safe_load(f)
def _is_reachable(self, base_url: str) -> bool:
    """Probe the backend's /health endpoint; True when it answers sanely.

    A trailing ``/v1`` (OpenAI-style base URL) is stripped before probing.
    Any network failure or timeout is treated as "down".
    """
    root = base_url.rstrip("/")
    root = root.removesuffix("/v1")
    try:
        # 5xx means the service is up but broken — treat that as unreachable too.
        return requests.get(root + "/health", timeout=2).status_code < 500
    except Exception:
        return False
def _resolve_model(self, client: OpenAI, model: str) -> str:
"""Resolve __auto__ to the first model served by vLLM."""
if model != "__auto__":
return model
models = client.models.list()
return models.data[0].id
def complete(self, prompt: str, system: str | None = None,
             model_override: str | None = None,
             fallback_order: list[str] | None = None,
             images: list[str] | None = None,
             max_tokens: int | None = None) -> str:
    """
    Generate a completion. Tries each backend in fallback_order.

    model_override: when set, replaces the configured model for
    openai_compat backends (e.g. pass a research-specific ollama model).

    fallback_order: when set, overrides config fallback_order for this
    call (e.g. pass config["research_fallback_order"] for research tasks).

    images: optional list of base64-encoded PNG/JPG strings. When provided,
    backends without supports_images=true are skipped. vision_service backends
    are only tried when images is provided.

    Raises RuntimeError if all backends are exhausted.
    """
    # Hard stop in the public demo: no inference on any backend.
    if os.environ.get("DEMO_MODE", "").lower() in ("1", "true", "yes"):
        raise RuntimeError(
            "AI inference is disabled in the public demo. "
            "Run your own instance to use AI features."
        )
    # Per-call override beats the config-wide order.
    order = fallback_order if fallback_order is not None else self.config["fallback_order"]
    for name in order:
        backend = self.config["backends"][name]
        # Backends default to enabled unless explicitly switched off in config.
        if not backend.get("enabled", True):
            print(f"[LLMRouter] {name}: disabled, skipping")
            continue
        supports_images = backend.get("supports_images", False)
        is_vision_service = backend["type"] == "vision_service"
        # vision_service only used when images provided
        if is_vision_service and not images:
            print(f"[LLMRouter] {name}: vision_service skipped (no images)")
            continue
        # non-vision backends skipped when images provided and they don't support it
        if images and not supports_images and not is_vision_service:
            print(f"[LLMRouter] {name}: no image support, skipping")
            continue
        if is_vision_service:
            # Cheap health probe first so we don't burn the 60s POST timeout
            # on a dead service.
            if not self._is_reachable(backend["base_url"]):
                print(f"[LLMRouter] {name}: unreachable, skipping")
                continue
            try:
                resp = requests.post(
                    backend["base_url"].rstrip("/") + "/analyze",
                    json={
                        "prompt": prompt,
                        # NOTE(review): only the first image is forwarded —
                        # the /analyze endpoint appears to take a single image;
                        # extra images are silently dropped here.
                        "image_base64": images[0] if images else "",
                    },
                    timeout=60,
                )
                resp.raise_for_status()
                print(f"[LLMRouter] Used backend: {name} (vision_service)")
                return resp.json()["text"]
            except Exception as e:
                # Deliberate broad catch: any failure just advances the chain.
                print(f"[LLMRouter] {name}: error — {e}, trying next")
                continue
        elif backend["type"] == "openai_compat":
            if not self._is_reachable(backend["base_url"]):
                print(f"[LLMRouter] {name}: unreachable, skipping")
                continue
            try:
                client = OpenAI(
                    base_url=backend["base_url"],
                    # Local OpenAI-compatible servers (vLLM/ollama) ignore the
                    # key but the client requires a non-empty string.
                    api_key=backend.get("api_key") or "any",
                )
                raw_model = model_override or backend["model"]
                # "__auto__" resolves to whatever model the server serves.
                model = self._resolve_model(client, raw_model)
                messages = []
                if system:
                    messages.append({"role": "system", "content": system})
                if images and supports_images:
                    # Multimodal message: text part followed by one image_url
                    # part per image (data-URL form).
                    content = [{"type": "text", "text": prompt}]
                    for img in images:
                        content.append({
                            "type": "image_url",
                            "image_url": {"url": f"data:image/png;base64,{img}"},
                        })
                    messages.append({"role": "user", "content": content})
                else:
                    messages.append({"role": "user", "content": prompt})
                create_kwargs: dict = {"model": model, "messages": messages}
                # Omit max_tokens entirely when unset so the server default applies.
                if max_tokens is not None:
                    create_kwargs["max_tokens"] = max_tokens
                resp = client.chat.completions.create(**create_kwargs)
                print(f"[LLMRouter] Used backend: {name} ({model})")
                return resp.choices[0].message.content
            except Exception as e:
                print(f"[LLMRouter] {name}: error — {e}, trying next")
                continue
        elif backend["type"] == "anthropic":
            # Key comes from the env var named in config, never from config itself.
            api_key = os.environ.get(backend["api_key_env"], "")
            if not api_key:
                print(f"[LLMRouter] {name}: {backend['api_key_env']} not set, skipping")
                continue
            try:
                # Lazy import: anthropic is only needed when this backend is hit.
                import anthropic as _anthropic
                client = _anthropic.Anthropic(api_key=api_key)
                if images and supports_images:
                    # Anthropic block format: image parts first, then the text part.
                    content = []
                    for img in images:
                        content.append({
                            "type": "image",
                            "source": {"type": "base64", "media_type": "image/png", "data": img},
                        })
                    content.append({"type": "text", "text": prompt})
                else:
                    content = prompt
                kwargs: dict = {
                    "model": backend["model"],
                    # max_tokens is required by the Anthropic API; the caller's
                    # max_tokens parameter is not applied to this backend.
                    "max_tokens": 4096,
                    "messages": [{"role": "user", "content": content}],
                }
                # Anthropic takes the system prompt as a top-level field, not a message.
                if system:
                    kwargs["system"] = system
                msg = client.messages.create(**kwargs)
                print(f"[LLMRouter] Used backend: {name}")
                return msg.content[0].text
            except Exception as e:
                print(f"[LLMRouter] {name}: error — {e}, trying next")
                continue
    raise RuntimeError("All LLM backends exhausted")
# Module-level singleton for convenience

View file

@ -24,7 +24,7 @@ def test_router_uses_first_reachable_backend():
mock_response.choices[0].message.content = "hello"
with patch.object(router, "_is_reachable", side_effect=[False, True, True, True, True]), \
patch("circuitforge_core.llm.router.OpenAI") as MockOpenAI:
patch("scripts.llm_router.OpenAI") as MockOpenAI:
instance = MockOpenAI.return_value
instance.chat.completions.create.return_value = mock_response
mock_model = MagicMock()
@ -54,7 +54,7 @@ def test_is_reachable_returns_false_on_connection_error():
router = LLMRouter(CONFIG_PATH)
with patch("circuitforge_core.llm.router.requests.get", side_effect=requests.ConnectionError):
with patch("scripts.llm_router.requests.get", side_effect=requests.ConnectionError):
result = router._is_reachable("http://localhost:9999/v1")
assert result is False
@ -92,8 +92,8 @@ def test_complete_skips_backend_without_image_support(tmp_path):
mock_resp.status_code = 200
mock_resp.json.return_value = {"text": "B — collaborative"}
with patch("circuitforge_core.llm.router.requests.get") as mock_get, \
patch("circuitforge_core.llm.router.requests.post") as mock_post:
with patch("scripts.llm_router.requests.get") as mock_get, \
patch("scripts.llm_router.requests.post") as mock_post:
# health check returns ok for vision_service
mock_get.return_value = MagicMock(status_code=200)
mock_post.return_value = mock_resp
@ -127,7 +127,7 @@ def test_complete_without_images_skips_vision_service(tmp_path):
cfg_file.write_text(yaml.dump(cfg))
router = LLMRouter(config_path=cfg_file)
with patch("circuitforge_core.llm.router.requests.post") as mock_post:
with patch("scripts.llm_router.requests.post") as mock_post:
try:
router.complete("text only prompt")
except RuntimeError:

View file

@ -8,9 +8,7 @@ from app.wizard.tiers import can_use, tier_label, TIERS, FEATURES, BYOK_UNLOCKAB
def test_tiers_list():
    # Peregrine's tier ladder is self-contained now (core dependency removed);
    # the reserved "ultra" tier is gone, so assert the exact three-tier list.
    assert TIERS == ["free", "paid", "premium"]
def test_can_use_free_feature_always():