fix: async survey/analyze via task queue (#107)

Move POST /api/jobs/:id/survey/analyze off the FastAPI worker thread
by routing it through the LLM task queue (same pattern as cover_letter,
company_research, resume_optimize).

- Extract prompt builders + run_survey_analyze() to scripts/survey_assistant.py
- Add survey_analyze to LLM_TASK_TYPES (task_scheduler.py) with 2.5 GB VRAM budget
  (text mode: phi3:mini; visual mode uses vision service's own VRAM pool)
- Add elif branch in task_runner._run_task; result stored as JSON in error col
- Replace sync endpoint body with submit_task(); add GET /survey/analyze/task poll
- Update survey.ts store: analyze() now fires task + polls at 3s interval;
  silently attaches to existing in-flight task when is_new=false
- SurveyView button label shows task stage while polling

Fixes load-test spike: ~22 greenlets blocking on LLM inference at 100 concurrent
users, causing 90s poll timeouts on cover_letter and research tasks.
This commit is contained in:
pyr0ball 2026-04-20 11:06:14 -07:00
parent acc04b04eb
commit 9101e716ba
6 changed files with 254 additions and 90 deletions

View file

@ -1297,40 +1297,10 @@ def calendar_push(job_id: int):
from scripts.llm_router import LLMRouter
from scripts.db import insert_survey_response, get_survey_responses
_SURVEY_SYSTEM = (
"You are a job application advisor helping a candidate answer a culture-fit survey. "
"The candidate values collaborative teamwork, clear communication, growth, and impact. "
"Choose answers that present them in the best professional light."
)
def _build_text_prompt(text: str, mode: str) -> str:
if mode == "quick":
return (
"Answer each survey question below. For each, give ONLY the letter of the best "
"option and a single-sentence reason. Format exactly as:\n"
"1. B — reason here\n2. A — reason here\n\n"
f"Survey:\n{text}"
)
return (
"Analyze each survey question below. For each question:\n"
"- Briefly evaluate each option (1 sentence each)\n"
"- State your recommendation with reasoning\n\n"
f"Survey:\n{text}"
)
def _build_image_prompt(mode: str) -> str:
if mode == "quick":
return (
"This is a screenshot of a culture-fit survey. Read all questions and answer each "
"with the letter of the best option for a collaborative, growth-oriented candidate. "
"Format: '1. B — brief reason' on separate lines."
)
return (
"This is a screenshot of a culture-fit survey. For each question, evaluate each option "
"and recommend the best choice for a collaborative, growth-oriented candidate. "
"Include a brief breakdown per option and a clear recommendation."
from scripts.survey_assistant import (
SURVEY_SYSTEM as _SURVEY_SYSTEM,
build_text_prompt as _build_text_prompt,
build_image_prompt as _build_image_prompt,
)
@ -1353,29 +1323,62 @@ class SurveyAnalyzeBody(BaseModel):
def survey_analyze(job_id: int, body: SurveyAnalyzeBody):
if body.mode not in ("quick", "detailed"):
raise HTTPException(400, f"Invalid mode: {body.mode!r}")
import json as _json
from scripts.task_runner import submit_task
params = _json.dumps({
"text": body.text,
"image_b64": body.image_b64,
"mode": body.mode,
})
try:
router = LLMRouter()
if body.image_b64:
prompt = _build_image_prompt(body.mode)
output = router.complete(
prompt,
images=[body.image_b64],
fallback_order=router.config.get("vision_fallback_order"),
task_id, is_new = submit_task(
db_path=Path(_request_db.get() or DB_PATH),
task_type="survey_analyze",
job_id=job_id,
params=params,
)
source = "screenshot"
else:
prompt = _build_text_prompt(body.text or "", body.mode)
output = router.complete(
prompt,
system=_SURVEY_SYSTEM,
fallback_order=router.config.get("research_fallback_order"),
)
source = "text_paste"
return {"output": output, "source": source}
return {"task_id": task_id, "is_new": is_new}
except Exception as e:
raise HTTPException(500, str(e))
# ── GET /api/jobs/:id/survey/analyze/task ────────────────────────────────────
@app.get("/api/jobs/{job_id}/survey/analyze/task")
def survey_analyze_task(job_id: int, task_id: Optional[int] = None):
    """Poll the status of a survey_analyze background task.

    If *task_id* is given, look up that exact task — scoped to *job_id* so a
    task_id belonging to another job matches nothing. Otherwise fall back to
    the most recent survey_analyze task for this job.

    Returns a dict with keys ``status``, ``stage``, ``result``, ``message``.
    The task runner stores the completed result as JSON in the ``error``
    column, so on a completed task that column is decoded into ``result``;
    on failure its raw text is surfaced as ``message``.
    """
    import json as _json
    db = _get_db()
    if task_id is not None:
        # Explicit task: also filter by job_id (authorization-by-scoping).
        row = db.execute(
            "SELECT status, stage, error FROM background_tasks WHERE id = ? AND job_id = ?",
            (task_id, job_id),
        ).fetchone()
    else:
        # No task_id supplied: latest survey_analyze task for the job.
        row = db.execute(
            "SELECT status, stage, error FROM background_tasks "
            "WHERE task_type = 'survey_analyze' AND job_id = ? "
            "ORDER BY id DESC LIMIT 1",
            (job_id,),
        ).fetchone()
    db.close()
    if not row:
        # Distinct "none" status so the client can tell "no task exists"
        # from a queued/running task.
        return {"status": "none", "stage": None, "result": None, "message": None}
    result = None
    message = row["error"]
    if row["status"] == "completed" and row["error"]:
        try:
            # Completed tasks carry their JSON payload in the error column.
            result = _json.loads(row["error"])
            message = None
        except (ValueError, TypeError):
            # Not JSON — leave it exposed as a plain message.
            pass
    return {
        "status": row["status"],
        "stage": row["stage"],
        "result": result,
        "message": message,
    }
class SurveySaveBody(BaseModel):
survey_name: Optional[str] = None
mode: str

View file

@ -0,0 +1,86 @@
# MIT License — see LICENSE
"""Survey assistant: prompt builders and LLM inference for culture-fit survey analysis.
Extracted from dev-api.py so task_runner can import this without importing the
FastAPI application. Callable directly or via the survey_analyze background task.
"""
from __future__ import annotations
import json
import logging
from pathlib import Path
from typing import Optional
log = logging.getLogger(__name__)
# System prompt used for text-mode survey analysis; steers the model toward
# answers that favor the candidate. (Image mode sends no system prompt —
# see run_survey_analyze.)
SURVEY_SYSTEM = (
    "You are a job application advisor helping a candidate answer a culture-fit survey. "
    "The candidate values collaborative teamwork, clear communication, growth, and impact. "
    "Choose answers that present them in the best professional light."
)
def build_text_prompt(text: str, mode: str) -> str:
    """Build the user prompt for a pasted-text survey.

    ``mode == "quick"`` asks for letter-plus-one-sentence answers in a fixed
    format; any other mode gets the detailed per-option analysis prompt.
    """
    quick_header = (
        "Answer each survey question below. For each, give ONLY the letter of the best "
        "option and a single-sentence reason. Format exactly as:\n"
        "1. B — reason here\n2. A — reason here"
    )
    detailed_header = (
        "Analyze each survey question below. For each question:\n"
        "- Briefly evaluate each option (1 sentence each)\n"
        "- State your recommendation with reasoning"
    )
    header = quick_header if mode == "quick" else detailed_header
    return f"{header}\n\nSurvey:\n{text}"
def build_image_prompt(mode: str) -> str:
    """Build the vision prompt for a survey screenshot.

    ``mode == "quick"`` requests one-line letter answers; every other mode
    requests a per-option breakdown with a recommendation.
    """
    if mode != "quick":
        return (
            "This is a screenshot of a culture-fit survey. For each question, evaluate each option "
            "and recommend the best choice for a collaborative, growth-oriented candidate. "
            "Include a brief breakdown per option and a clear recommendation."
        )
    return (
        "This is a screenshot of a culture-fit survey. Read all questions and answer each "
        "with the letter of the best option for a collaborative, growth-oriented candidate. "
        "Format: '1. B — brief reason' on separate lines."
    )
def run_survey_analyze(
    text: Optional[str],
    image_b64: Optional[str],
    mode: str,
    config_path: Optional[Path] = None,
) -> dict:
    """Run LLM inference for survey analysis.

    Screenshot input (``image_b64`` set) takes precedence over pasted text
    and goes through the vision fallback chain with no system prompt; text
    input uses the research fallback chain plus SURVEY_SYSTEM.

    Returns ``{"output": str, "source": "text_paste" | "screenshot"}``.
    Raises on LLM failure — the caller is responsible for error handling.
    """
    from scripts.llm_router import LLMRouter

    if config_path:
        router = LLMRouter(config_path=config_path)
    else:
        router = LLMRouter()

    if image_b64:
        # Vision path: the screenshot carries the questions; prompt only
        # tells the model how to answer.
        completion = router.complete(
            build_image_prompt(mode),
            images=[image_b64],
            fallback_order=router.config.get("vision_fallback_order"),
        )
        return {"output": completion, "source": "screenshot"}

    # Text path: survey content is embedded in the prompt itself.
    completion = router.complete(
        build_text_prompt(text or "", mode),
        system=SURVEY_SYSTEM,
        fallback_order=router.config.get("research_fallback_order"),
    )
    return {"output": completion, "source": "text_paste"}

View file

@ -404,6 +404,24 @@ def _run_task(db_path: Path, task_id: int, task_type: str, job_id: int,
save_optimized_resume(db_path, job_id=job_id,
text="", gap_report=gap_report)
elif task_type == "survey_analyze":
import json as _json
from scripts.survey_assistant import run_survey_analyze
p = _json.loads(params or "{}")
_cfg_path = Path(db_path).parent / "config" / "llm.yaml"
update_task_stage(db_path, task_id, "analyzing survey")
result = run_survey_analyze(
text=p.get("text"),
image_b64=p.get("image_b64"),
mode=p.get("mode", "quick"),
config_path=_cfg_path if _cfg_path.exists() else None,
)
update_task_status(
db_path, task_id, "completed",
error=_json.dumps(result),
)
return
elif task_type == "prepare_training":
from scripts.prepare_training_data import build_records, write_jsonl, DEFAULT_OUTPUT
records = build_records()

View file

@ -34,6 +34,7 @@ LLM_TASK_TYPES: frozenset[str] = frozenset({
"company_research",
"wizard_generate",
"resume_optimize",
"survey_analyze",
})
# Conservative peak VRAM estimates (GB) per task type.
@ -43,6 +44,7 @@ DEFAULT_VRAM_BUDGETS: dict[str, float] = {
"company_research": 5.0, # llama3.1:8b or vllm model
"wizard_generate": 2.5, # same model family as cover_letter
"resume_optimize": 5.0, # section-by-section rewrite; same budget as research
"survey_analyze": 2.5, # text: phi3:mini; visual: vision service (own VRAM pool)
}
_DEFAULT_MAX_QUEUE_DEPTH = 500

View file

@ -28,14 +28,33 @@ export interface SurveyResponse {
created_at: string | null
}
/**
 * Shape of GET /api/jobs/:id/survey/analyze/task responses.
 * 'none' = server found no task for the job; null = not yet polled locally.
 */
interface TaskStatus {
  status: 'queued' | 'running' | 'completed' | 'failed' | 'none' | null
  stage: string | null
  // Present only when status === 'completed' and the stored payload parsed.
  result: { output: string; source: string } | null
  // Error text on failure; null otherwise.
  message: string | null
}
export const useSurveyStore = defineStore('survey', () => {
const analysis = ref<SurveyAnalysis | null>(null)
const history = ref<SurveyResponse[]>([])
const loading = ref(false)
const saving = ref(false)
const error = ref<string | null>(null)
const taskStatus = ref<TaskStatus>({ status: null, stage: null, result: null, message: null })
const visionAvailable = ref(false)
const currentJobId = ref<number | null>(null)
// Pending analyze payload held across the poll lifecycle so rawInput/mode survive
const _pendingPayload = ref<{ text?: string; image_b64?: string; mode: 'quick' | 'detailed' } | null>(null)
let pollInterval: ReturnType<typeof setInterval> | null = null
// Stop the active poll timer, if any; safe to call repeatedly.
function _clearInterval() {
  if (pollInterval === null) return
  clearInterval(pollInterval)
  pollInterval = null
}
async function fetchFor(jobId: number) {
if (jobId !== currentJobId.value) {
@ -43,6 +62,7 @@ export const useSurveyStore = defineStore('survey', () => {
history.value = []
error.value = null
visionAvailable.value = false
taskStatus.value = { status: null, stage: null, result: null, message: null }
currentJobId.value = jobId
}
@ -69,23 +89,55 @@ export const useSurveyStore = defineStore('survey', () => {
jobId: number,
payload: { text?: string; image_b64?: string; mode: 'quick' | 'detailed' }
) {
_clearInterval()
loading.value = true
error.value = null
const { data, error: fetchError } = await useApiFetch<{ output: string; source: string }>(
_pendingPayload.value = payload
const { data, error: fetchError } = await useApiFetch<{ task_id: number; is_new: boolean }>(
`/api/jobs/${jobId}/survey/analyze`,
{ method: 'POST', body: JSON.stringify(payload) }
)
loading.value = false
if (fetchError || !data) {
error.value = 'Analysis failed. Please try again.'
loading.value = false
error.value = 'Failed to start analysis. Please try again.'
return
}
analysis.value = {
output: data.output,
source: isValidSource(data.source) ? data.source : 'text_paste',
mode: payload.mode,
rawInput: payload.text ?? null,
// Silently attach to the existing task if is_new=false — same task_id, same poll
taskStatus.value = { status: 'queued', stage: null, result: null, message: null }
pollTask(jobId, data.task_id)
}
// Poll the survey_analyze task every 3s until it reaches a terminal state.
// Only one interval is ever live: a fresh call cancels the previous timer.
function pollTask(jobId: number, taskId: number) {
  _clearInterval()
  pollInterval = setInterval(async () => {
    const { data } = await useApiFetch<TaskStatus>(
      `/api/jobs/${jobId}/survey/analyze/task?task_id=${taskId}`
    )
    // Transient fetch failure — keep the interval running and retry.
    if (!data) return
    taskStatus.value = data
    if (data.status === 'completed' || data.status === 'failed') {
      _clearInterval()
      loading.value = false
      if (data.status === 'completed' && data.result) {
        // Rehydrate the analysis from the payload captured when analyze()
        // fired, so mode/rawInput survive the async round-trip.
        const payload = _pendingPayload.value
        analysis.value = {
          output: data.result.output,
          source: isValidSource(data.result.source) ? data.result.source : 'text_paste',
          mode: payload?.mode ?? 'quick',
          rawInput: payload?.text ?? null,
        }
      } else if (data.status === 'failed') {
        error.value = data.message ?? 'Analysis failed. Please try again.'
      }
      // Terminal either way — the pending payload is no longer needed.
      _pendingPayload.value = null
    }
  }, 3000)
}
async function saveResponse(
@ -113,7 +165,6 @@ export const useSurveyStore = defineStore('survey', () => {
error.value = 'Save failed. Your analysis is preserved — try again.'
return
}
// Prepend the saved response to history
const now = new Date().toISOString()
const saved: SurveyResponse = {
id: data.id,
@ -132,13 +183,16 @@ export const useSurveyStore = defineStore('survey', () => {
}
// Reset the store to its pristine state (e.g. when leaving the job view).
function clear() {
  // Stop any in-flight poll before wiping state.
  _clearInterval()
  _pendingPayload.value = null
  currentJobId.value = null
  analysis.value = null
  history.value = []
  taskStatus.value = { status: null, stage: null, result: null, message: null }
  error.value = null
  loading.value = false
  saving.value = false
  visionAvailable.value = false
}
return {
@ -147,6 +201,7 @@ export const useSurveyStore = defineStore('survey', () => {
loading,
saving,
error,
taskStatus,
visionAvailable,
currentJobId,
fetchFor,

View file

@ -269,7 +269,7 @@ function toggleHistoryEntry(id: number) {
@click="runAnalyze"
>
<span v-if="surveyStore.loading" class="spinner" aria-hidden="true"></span>
{{ surveyStore.loading ? 'Analyzing…' : '🔍 Analyze' }}
{{ surveyStore.loading ? (surveyStore.taskStatus.stage ? surveyStore.taskStatus.stage + '…' : 'Analyzing…') : '🔍 Analyze' }}
</button>
<!-- Analyze error -->