- Add tests/e2e/test_smoke.py: page-load error check for all pages - Add tests/e2e/test_interactions.py: click every interactable, diff errors, XFAIL expected demo failures, flag regressions as XPASS - Fix conftest get_page_errors() to use text_content() instead of inner_text() so errors inside collapsed expanders are captured with their actual message text (inner_text respects CSS display:none) - Fix tests/e2e/modes/demo.py base_url to include /peregrine path prefix (STREAMLIT_SERVER_BASE_URL_PATH=peregrine set in demo container) App fixes surfaced by the harness: - task_runner.py: add DEMO_MODE guard for discovery task — previously crashed with FileNotFoundError on search_profiles.yaml before any demo guard could fire; now returns friendly error immediately - 6_Interview_Prep.py: stop auto-triggering LLM session on page load in demo mode; show "AI features disabled" info instead, preventing a silent st.error() inside the collapsed Practice Q&A expander Both smoke and interaction tests now pass clean against demo mode.
283 lines
12 KiB
Python
283 lines
12 KiB
Python
# scripts/task_runner.py
|
|
"""
|
|
Background task runner for LLM generation tasks.
|
|
|
|
Submitting a task inserts a row in background_tasks and spawns a daemon thread.
|
|
The thread calls the appropriate generator, writes results to existing tables,
|
|
and marks the task completed or failed.
|
|
|
|
Deduplication: only one queued/running task per (task_type, job_id) is allowed.
|
|
Different task types for the same job run concurrently (e.g. cover letter + research).
|
|
"""
|
|
import sqlite3
|
|
import threading
|
|
from pathlib import Path
|
|
|
|
from scripts.db import (
|
|
DEFAULT_DB,
|
|
insert_task,
|
|
update_task_status,
|
|
update_task_stage,
|
|
update_cover_letter,
|
|
save_research,
|
|
)
|
|
|
|
|
|
def submit_task(db_path: Path = DEFAULT_DB, task_type: str = "",
|
|
job_id: int = None,
|
|
params: str | None = None) -> tuple[int, bool]:
|
|
"""Submit a background task.
|
|
|
|
LLM task types (cover_letter, company_research, wizard_generate) are routed
|
|
through the TaskScheduler for VRAM-aware batch scheduling.
|
|
All other types spawn a free daemon thread as before.
|
|
|
|
Returns (task_id, True) if a new task was queued.
|
|
Returns (existing_id, False) if an identical task is already in-flight.
|
|
"""
|
|
task_id, is_new = insert_task(db_path, task_type, job_id or 0, params=params)
|
|
if is_new:
|
|
from scripts.task_scheduler import get_scheduler, LLM_TASK_TYPES
|
|
if task_type in LLM_TASK_TYPES:
|
|
get_scheduler(db_path, run_task_fn=_run_task).enqueue(
|
|
task_id, task_type, job_id or 0, params
|
|
)
|
|
else:
|
|
t = threading.Thread(
|
|
target=_run_task,
|
|
args=(db_path, task_id, task_type, job_id or 0, params),
|
|
daemon=True,
|
|
)
|
|
t.start()
|
|
return task_id, is_new
|
|
|
|
|
|
_WIZARD_PROMPTS: dict[str, str] = {
|
|
"career_summary": (
|
|
"Based on the following resume text, write a concise 2-4 sentence professional "
|
|
"career summary in first person. Focus on years of experience, key skills, and "
|
|
"what makes this person distinctive. Return only the summary text, no labels.\n\n"
|
|
"Resume:\n{resume_text}"
|
|
),
|
|
"expand_bullets": (
|
|
"Rewrite these rough responsibility notes as polished STAR-format bullet points "
|
|
"(Situation/Task, Action, Result). Each bullet should start with a strong action verb. "
|
|
"Return a JSON array of bullet strings only.\n\nNotes:\n{bullet_notes}"
|
|
),
|
|
"suggest_skills": (
|
|
"Based on these work experience descriptions, suggest additional skills to add to "
|
|
"a resume. Return a JSON array of skill strings only — no explanations.\n\n"
|
|
"Experience:\n{experience_text}"
|
|
),
|
|
"voice_guidelines": (
|
|
"Analyze the writing style and tone of this resume and cover letter corpus. "
|
|
"Return 3-5 concise guidelines for maintaining this person's authentic voice in "
|
|
"future cover letters (e.g. 'Uses direct, confident statements'). "
|
|
"Return a JSON array of guideline strings.\n\nContent:\n{content}"
|
|
),
|
|
"job_titles": (
|
|
"Given these job titles and resume, suggest 5-8 additional job title variations "
|
|
"this person should search for. Return a JSON array of title strings only.\n\n"
|
|
"Current titles: {current_titles}\nResume summary: {resume_text}"
|
|
),
|
|
"keywords": (
|
|
"Based on this resume and target job titles, suggest important keywords and phrases "
|
|
"to include in job applications. Return a JSON array of keyword strings.\n\n"
|
|
"Titles: {titles}\nResume: {resume_text}"
|
|
),
|
|
"blocklist": (
|
|
"Based on this resume and job search context, suggest companies, industries, or "
|
|
"keywords to blocklist (avoid in job search results). "
|
|
"Return a JSON array of strings.\n\nContext: {resume_text}"
|
|
),
|
|
"mission_notes": (
|
|
"Based on this resume, write a short personal note (1-2 sentences) about why this "
|
|
"person might genuinely care about each of these industries: music, animal_welfare, education. "
|
|
"Return a JSON object with those three industry keys and note values. "
|
|
"If the resume shows no clear connection to an industry, set its value to empty string.\n\n"
|
|
"Resume: {resume_text}"
|
|
),
|
|
}
|
|
|
|
|
|
def _run_wizard_generate(section: str, input_data: dict) -> str:
|
|
"""Run LLM generation for a wizard section. Returns result string.
|
|
|
|
Raises ValueError for unknown sections.
|
|
Raises any LLM exception on failure.
|
|
"""
|
|
template = _WIZARD_PROMPTS.get(section)
|
|
if template is None:
|
|
raise ValueError(f"Unknown wizard_generate section: {section!r}")
|
|
# Format the prompt, substituting available keys; unknown placeholders become empty string
|
|
import re as _re
|
|
|
|
def _safe_format(tmpl: str, kwargs: dict) -> str:
|
|
"""Format template substituting available keys; leaves missing keys as empty string."""
|
|
def replacer(m):
|
|
key = m.group(1)
|
|
return str(kwargs.get(key, ""))
|
|
return _re.sub(r"\{(\w+)\}", replacer, tmpl)
|
|
|
|
prompt = _safe_format(template, {k: str(v) for k, v in input_data.items()})
|
|
# Append iterative refinement context if provided
|
|
previous_result = input_data.get("previous_result", "")
|
|
feedback = input_data.get("feedback", "")
|
|
if previous_result:
|
|
prompt += f"\n\n---\nPrevious output:\n{previous_result}"
|
|
if feedback:
|
|
prompt += f"\n\nUser feedback / requested changes:\n{feedback}\n\nPlease revise accordingly."
|
|
from scripts.llm_router import LLMRouter
|
|
return LLMRouter().complete(prompt)
|
|
|
|
|
|
def _run_task(db_path: Path, task_id: int, task_type: str, job_id: int,
|
|
params: str | None = None) -> None:
|
|
"""Thread body: run the generator and persist the result."""
|
|
# job_id == 0 means a global task (e.g. discovery) with no associated job row.
|
|
job: dict = {}
|
|
if job_id:
|
|
conn = sqlite3.connect(db_path)
|
|
conn.row_factory = sqlite3.Row
|
|
row = conn.execute("SELECT * FROM jobs WHERE id=?", (job_id,)).fetchone()
|
|
conn.close()
|
|
if row is None:
|
|
update_task_status(db_path, task_id, "failed", error=f"Job {job_id} not found")
|
|
return
|
|
job = dict(row)
|
|
|
|
update_task_status(db_path, task_id, "running")
|
|
|
|
try:
|
|
if task_type == "discovery":
|
|
import os as _os
|
|
if _os.environ.get("DEMO_MODE", "").lower() in ("1", "true", "yes"):
|
|
update_task_status(
|
|
db_path, task_id, "failed",
|
|
error="Discovery is disabled in the public demo. Run your own instance to use this feature.",
|
|
)
|
|
return
|
|
from scripts.discover import run_discovery
|
|
new_count = run_discovery(db_path)
|
|
n = new_count or 0
|
|
update_task_status(
|
|
db_path, task_id, "completed",
|
|
error=f"{n} new listing{'s' if n != 1 else ''} added",
|
|
)
|
|
return
|
|
|
|
elif task_type == "cover_letter":
|
|
import json as _json
|
|
p = _json.loads(params or "{}")
|
|
from scripts.generate_cover_letter import generate
|
|
result = generate(
|
|
job.get("title", ""),
|
|
job.get("company", ""),
|
|
job.get("description", ""),
|
|
previous_result=p.get("previous_result", ""),
|
|
feedback=p.get("feedback", ""),
|
|
is_jobgether=job.get("source") == "jobgether",
|
|
)
|
|
update_cover_letter(db_path, job_id, result)
|
|
|
|
elif task_type == "company_research":
|
|
from scripts.company_research import research_company
|
|
result = research_company(
|
|
job,
|
|
on_stage=lambda s: update_task_stage(db_path, task_id, s),
|
|
)
|
|
save_research(db_path, job_id=job_id, **result)
|
|
|
|
elif task_type == "enrich_descriptions":
|
|
from scripts.enrich_descriptions import enrich_all_descriptions
|
|
r = enrich_all_descriptions(db_path)
|
|
errs = len(r.get("errors", []))
|
|
msg = (
|
|
f"{r['succeeded']} description(s) fetched, {r['failed']} failed"
|
|
+ (f", {errs} error(s)" if errs else "")
|
|
)
|
|
update_task_status(db_path, task_id, "completed", error=msg)
|
|
return
|
|
|
|
elif task_type == "scrape_url":
|
|
from scripts.scrape_url import scrape_job_url
|
|
fields = scrape_job_url(db_path, job_id)
|
|
title = fields.get("title") or job.get("url", "?")
|
|
company = fields.get("company", "")
|
|
msg = f"{title}" + (f" @ {company}" if company else "")
|
|
update_task_status(db_path, task_id, "completed", error=msg)
|
|
# Auto-enrich company/salary for Craigslist jobs
|
|
conn = sqlite3.connect(db_path)
|
|
conn.row_factory = sqlite3.Row
|
|
job_row = conn.execute(
|
|
"SELECT source, company FROM jobs WHERE id=?", (job_id,)
|
|
).fetchone()
|
|
conn.close()
|
|
if job_row and job_row["source"] == "craigslist" and not job_row["company"]:
|
|
submit_task(db_path, "enrich_craigslist", job_id)
|
|
return
|
|
|
|
elif task_type == "enrich_craigslist":
|
|
from scripts.enrich_descriptions import enrich_craigslist_fields
|
|
extracted = enrich_craigslist_fields(db_path, job_id)
|
|
company = extracted.get("company", "")
|
|
msg = f"company={company}" if company else "no company found"
|
|
update_task_status(db_path, task_id, "completed", error=msg)
|
|
return
|
|
|
|
elif task_type == "email_sync":
|
|
try:
|
|
from scripts.imap_sync import sync_all
|
|
result = sync_all(db_path,
|
|
on_stage=lambda s: update_task_stage(db_path, task_id, s))
|
|
leads = result.get("new_leads", 0)
|
|
todo = result.get("todo_attached", 0)
|
|
errs = len(result.get("errors", []))
|
|
msg = (
|
|
f"{result['synced']} jobs updated, "
|
|
f"+{result['inbound']} in, +{result['outbound']} out"
|
|
+ (f", {leads} new lead(s)" if leads else "")
|
|
+ (f", {todo} todo attached" if todo else "")
|
|
+ (f", {errs} error(s)" if errs else "")
|
|
)
|
|
update_task_status(db_path, task_id, "completed", error=msg)
|
|
return
|
|
except FileNotFoundError:
|
|
update_task_status(db_path, task_id, "failed",
|
|
error="Email not configured — go to Settings → Email")
|
|
return
|
|
|
|
elif task_type == "wizard_generate":
|
|
import json as _json
|
|
p = _json.loads(params or "{}")
|
|
section = p.get("section", "")
|
|
input_data = p.get("input", {})
|
|
if not section:
|
|
raise ValueError("wizard_generate: 'section' key is required in params")
|
|
result = _run_wizard_generate(section, input_data)
|
|
update_task_status(
|
|
db_path, task_id, "completed",
|
|
error=_json.dumps({"section": section, "result": result}),
|
|
)
|
|
return
|
|
|
|
elif task_type == "prepare_training":
|
|
from scripts.prepare_training_data import build_records, write_jsonl, DEFAULT_OUTPUT
|
|
records = build_records()
|
|
write_jsonl(records, DEFAULT_OUTPUT)
|
|
n = len(records)
|
|
update_task_status(
|
|
db_path, task_id, "completed",
|
|
error=f"{n} training pair{'s' if n != 1 else ''} extracted",
|
|
)
|
|
return
|
|
|
|
else:
|
|
raise ValueError(f"Unknown task_type: {task_type!r}")
|
|
|
|
update_task_status(db_path, task_id, "completed")
|
|
|
|
except BaseException as exc:
|
|
# BaseException catches SystemExit (from companyScraper sys.exit calls)
|
|
# in addition to regular exceptions.
|
|
update_task_status(db_path, task_id, "failed", error=str(exc))
|