peregrine/scripts/db.py
pyr0ball edb169959a feat: wizard fields in UserProfile + params column in background_tasks
- Add tier, dev_tier_override, wizard_complete, wizard_step, dismissed_banners
  fields to UserProfile with defaults and effective_tier property
- Add params TEXT column to background_tasks table (CREATE + migration)
- Update insert_task() to accept params with params-aware dedup logic
- Update submit_task() and _run_task() to thread params through
- Add test_wizard_defaults, test_effective_tier_override,
  test_effective_tier_no_override, and test_insert_task_with_params
2026-02-25 07:27:14 -08:00

745 lines
26 KiB
Python

"""
SQLite staging layer for job listings.
Jobs flow: pending → approved/rejected → applied → synced
applied → phone_screen → interviewing → offer → hired (or rejected)
"""
import sqlite3
from datetime import datetime
from pathlib import Path
from typing import Optional
DEFAULT_DB = Path(__file__).parent.parent / "staging.db"
CREATE_JOBS = """
CREATE TABLE IF NOT EXISTS jobs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT,
company TEXT,
url TEXT UNIQUE,
source TEXT,
location TEXT,
is_remote INTEGER DEFAULT 0,
salary TEXT,
description TEXT,
match_score REAL,
keyword_gaps TEXT,
date_found TEXT,
status TEXT DEFAULT 'pending',
notion_page_id TEXT,
cover_letter TEXT,
applied_at TEXT
);
"""
CREATE_JOB_CONTACTS = """
CREATE TABLE IF NOT EXISTS job_contacts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
job_id INTEGER NOT NULL,
direction TEXT DEFAULT 'inbound',
subject TEXT,
from_addr TEXT,
to_addr TEXT,
body TEXT,
received_at TEXT,
is_response_needed INTEGER DEFAULT 0,
responded_at TEXT,
message_id TEXT,
FOREIGN KEY (job_id) REFERENCES jobs(id)
);
"""
_CONTACT_MIGRATIONS = [
("message_id", "TEXT"),
("stage_signal", "TEXT"),
("suggestion_dismissed", "INTEGER DEFAULT 0"),
]
_RESEARCH_MIGRATIONS = [
("tech_brief", "TEXT"),
("funding_brief", "TEXT"),
("competitors_brief", "TEXT"),
("red_flags", "TEXT"),
("scrape_used", "INTEGER"), # 1 = SearXNG contributed data, 0 = LLM-only
("accessibility_brief", "TEXT"), # Inclusion & Accessibility section
]
CREATE_COMPANY_RESEARCH = """
CREATE TABLE IF NOT EXISTS company_research (
id INTEGER PRIMARY KEY AUTOINCREMENT,
job_id INTEGER NOT NULL UNIQUE,
generated_at TEXT,
company_brief TEXT,
ceo_brief TEXT,
talking_points TEXT,
raw_output TEXT,
tech_brief TEXT,
funding_brief TEXT,
competitors_brief TEXT,
red_flags TEXT,
FOREIGN KEY (job_id) REFERENCES jobs(id)
);
"""
CREATE_BACKGROUND_TASKS = """
CREATE TABLE IF NOT EXISTS background_tasks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
task_type TEXT NOT NULL,
job_id INTEGER DEFAULT 0,
params TEXT,
status TEXT NOT NULL DEFAULT 'queued',
error TEXT,
created_at DATETIME DEFAULT (datetime('now')),
started_at DATETIME,
finished_at DATETIME,
stage TEXT,
updated_at DATETIME
)
"""
CREATE_SURVEY_RESPONSES = """
CREATE TABLE IF NOT EXISTS survey_responses (
id INTEGER PRIMARY KEY AUTOINCREMENT,
job_id INTEGER NOT NULL REFERENCES jobs(id),
survey_name TEXT,
received_at DATETIME,
source TEXT,
raw_input TEXT,
image_path TEXT,
mode TEXT,
llm_output TEXT,
reported_score TEXT,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
"""
_MIGRATIONS = [
("cover_letter", "TEXT"),
("applied_at", "TEXT"),
("interview_date", "TEXT"),
("rejection_stage", "TEXT"),
("phone_screen_at", "TEXT"),
("interviewing_at", "TEXT"),
("offer_at", "TEXT"),
("hired_at", "TEXT"),
("survey_at", "TEXT"),
]
def _migrate_db(db_path: Path) -> None:
"""Add new columns to existing tables without breaking old data."""
conn = sqlite3.connect(db_path)
for col, coltype in _MIGRATIONS:
try:
conn.execute(f"ALTER TABLE jobs ADD COLUMN {col} {coltype}")
except sqlite3.OperationalError:
pass # column already exists
for col, coltype in _CONTACT_MIGRATIONS:
try:
conn.execute(f"ALTER TABLE job_contacts ADD COLUMN {col} {coltype}")
except sqlite3.OperationalError:
pass
for col, coltype in _RESEARCH_MIGRATIONS:
try:
conn.execute(f"ALTER TABLE company_research ADD COLUMN {col} {coltype}")
except sqlite3.OperationalError:
pass
try:
conn.execute("ALTER TABLE background_tasks ADD COLUMN stage TEXT")
except sqlite3.OperationalError:
pass
try:
conn.execute("ALTER TABLE background_tasks ADD COLUMN updated_at TEXT")
except sqlite3.OperationalError:
pass
try:
conn.execute("ALTER TABLE background_tasks ADD COLUMN params TEXT")
except sqlite3.OperationalError:
pass # column already exists
conn.commit()
conn.close()
def init_db(db_path: Path = DEFAULT_DB) -> None:
"""Create tables if they don't exist, then run migrations."""
conn = sqlite3.connect(db_path)
conn.execute(CREATE_JOBS)
conn.execute(CREATE_JOB_CONTACTS)
conn.execute(CREATE_COMPANY_RESEARCH)
conn.execute(CREATE_BACKGROUND_TASKS)
conn.execute(CREATE_SURVEY_RESPONSES)
conn.commit()
conn.close()
_migrate_db(db_path)
def insert_job(db_path: Path = DEFAULT_DB, job: dict = None) -> Optional[int]:
"""Insert a job. Returns row id, or None if URL already exists."""
if job is None:
return None
conn = sqlite3.connect(db_path)
try:
cursor = conn.execute(
"""INSERT INTO jobs
(title, company, url, source, location, is_remote, salary, description, date_found)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(
job.get("title", ""),
job.get("company", ""),
job.get("url", ""),
job.get("source", ""),
job.get("location", ""),
int(bool(job.get("is_remote", False))),
job.get("salary", ""),
job.get("description", ""),
job.get("date_found", ""),
),
)
conn.commit()
return cursor.lastrowid
except sqlite3.IntegrityError:
return None # duplicate URL
finally:
conn.close()
def get_job_by_id(db_path: Path = DEFAULT_DB, job_id: int = None) -> Optional[dict]:
"""Return a single job by ID, or None if not found."""
conn = sqlite3.connect(db_path)
conn.row_factory = sqlite3.Row
row = conn.execute("SELECT * FROM jobs WHERE id=?", (job_id,)).fetchone()
conn.close()
return dict(row) if row else None
def get_jobs_by_status(db_path: Path = DEFAULT_DB, status: str = "pending") -> list[dict]:
"""Return all jobs with the given status as a list of dicts."""
conn = sqlite3.connect(db_path)
conn.row_factory = sqlite3.Row
cursor = conn.execute(
"SELECT * FROM jobs WHERE status = ? ORDER BY date_found DESC, id DESC",
(status,),
)
rows = [dict(row) for row in cursor.fetchall()]
conn.close()
return rows
def get_email_leads(db_path: Path = DEFAULT_DB) -> list[dict]:
"""Return pending jobs with source='email', newest first."""
conn = sqlite3.connect(db_path)
conn.row_factory = sqlite3.Row
rows = conn.execute(
"SELECT * FROM jobs WHERE source = 'email' AND status = 'pending' "
"ORDER BY date_found DESC, id DESC"
).fetchall()
conn.close()
return [dict(r) for r in rows]
def get_job_counts(db_path: Path = DEFAULT_DB) -> dict:
"""Return counts per status."""
conn = sqlite3.connect(db_path)
cursor = conn.execute(
"SELECT status, COUNT(*) as n FROM jobs GROUP BY status"
)
counts = {row[0]: row[1] for row in cursor.fetchall()}
conn.close()
return counts
def update_job_status(db_path: Path = DEFAULT_DB, ids: list[int] = None, status: str = "approved") -> None:
"""Batch-update status for a list of job IDs."""
if not ids:
return
conn = sqlite3.connect(db_path)
conn.execute(
f"UPDATE jobs SET status = ? WHERE id IN ({','.join('?' * len(ids))})",
[status] + list(ids),
)
conn.commit()
conn.close()
def get_existing_urls(db_path: Path = DEFAULT_DB) -> set[str]:
"""Return all URLs already in staging (any status)."""
conn = sqlite3.connect(db_path)
cursor = conn.execute("SELECT url FROM jobs")
urls = {row[0] for row in cursor.fetchall()}
conn.close()
return urls
def write_match_scores(db_path: Path = DEFAULT_DB, job_id: int = None,
score: float = 0.0, gaps: str = "") -> None:
"""Write match score and keyword gaps back to a job row."""
conn = sqlite3.connect(db_path)
conn.execute(
"UPDATE jobs SET match_score = ?, keyword_gaps = ? WHERE id = ?",
(score, gaps, job_id),
)
conn.commit()
conn.close()
def update_cover_letter(db_path: Path = DEFAULT_DB, job_id: int = None, text: str = "") -> None:
"""Persist a generated/edited cover letter for a job."""
if job_id is None:
return
conn = sqlite3.connect(db_path)
conn.execute("UPDATE jobs SET cover_letter = ? WHERE id = ?", (text, job_id))
conn.commit()
conn.close()
_UPDATABLE_JOB_COLS = {
"title", "company", "url", "source", "location", "is_remote",
"salary", "description", "match_score", "keyword_gaps",
}
def update_job_fields(db_path: Path = DEFAULT_DB, job_id: int = None,
fields: dict = None) -> None:
"""Update arbitrary job columns. Unknown keys are silently ignored."""
if job_id is None or not fields:
return
safe = {k: v for k, v in fields.items() if k in _UPDATABLE_JOB_COLS}
if not safe:
return
conn = sqlite3.connect(db_path)
sets = ", ".join(f"{col} = ?" for col in safe)
conn.execute(
f"UPDATE jobs SET {sets} WHERE id = ?",
(*safe.values(), job_id),
)
conn.commit()
conn.close()
def mark_applied(db_path: Path = DEFAULT_DB, ids: list[int] = None) -> None:
"""Set status='applied' and record today's date for a list of job IDs."""
if not ids:
return
today = datetime.now().isoformat()[:10]
conn = sqlite3.connect(db_path)
conn.execute(
f"UPDATE jobs SET status = 'applied', applied_at = ? WHERE id IN ({','.join('?' * len(ids))})",
[today] + list(ids),
)
conn.commit()
conn.close()
def kill_stuck_tasks(db_path: Path = DEFAULT_DB) -> int:
"""Mark all queued/running background tasks as failed. Returns count killed."""
conn = sqlite3.connect(db_path)
count = conn.execute(
"UPDATE background_tasks SET status='failed', error='Killed by user',"
" finished_at=datetime('now') WHERE status IN ('queued','running')"
).rowcount
conn.commit()
conn.close()
return count
def purge_email_data(db_path: Path = DEFAULT_DB) -> tuple[int, int]:
"""Delete all job_contacts rows and email-sourced pending jobs.
Returns (contacts_deleted, jobs_deleted).
"""
conn = sqlite3.connect(db_path)
c1 = conn.execute("DELETE FROM job_contacts").rowcount
c2 = conn.execute("DELETE FROM jobs WHERE source='email'").rowcount
conn.commit()
conn.close()
return c1, c2
def purge_jobs(db_path: Path = DEFAULT_DB, statuses: list[str] = None) -> int:
"""Delete jobs matching given statuses. Returns number of rows deleted.
If statuses is None or empty, deletes ALL jobs (full reset).
"""
conn = sqlite3.connect(db_path)
if statuses:
placeholders = ",".join("?" * len(statuses))
cur = conn.execute(f"DELETE FROM jobs WHERE status IN ({placeholders})", statuses)
else:
cur = conn.execute("DELETE FROM jobs")
count = cur.rowcount
conn.commit()
conn.close()
return count
def purge_non_remote(db_path: Path = DEFAULT_DB) -> int:
"""Delete non-remote jobs that are not yet in the active pipeline.
Preserves applied, phone_screen, interviewing, offer, hired, and synced records.
Returns number of rows deleted.
"""
_safe = ("applied", "phone_screen", "interviewing", "offer", "hired", "synced")
placeholders = ",".join("?" * len(_safe))
conn = sqlite3.connect(db_path)
count = conn.execute(
f"DELETE FROM jobs WHERE (is_remote = 0 OR is_remote IS NULL)"
f" AND status NOT IN ({placeholders})",
_safe,
).rowcount
conn.commit()
conn.close()
return count
def archive_jobs(db_path: Path = DEFAULT_DB, statuses: list[str] = None) -> int:
"""Set status='archived' for jobs matching given statuses.
Archived jobs stay in the DB (preserving dedup by URL) but are invisible
to Job Review and other pipeline views.
Returns number of rows updated.
"""
if not statuses:
return 0
placeholders = ",".join("?" * len(statuses))
conn = sqlite3.connect(db_path)
count = conn.execute(
f"UPDATE jobs SET status = 'archived' WHERE status IN ({placeholders})",
statuses,
).rowcount
conn.commit()
conn.close()
return count
# ── Interview pipeline helpers ────────────────────────────────────────────────
_STAGE_TS_COL = {
"phone_screen": "phone_screen_at",
"interviewing": "interviewing_at",
"offer": "offer_at",
"hired": "hired_at",
"survey": "survey_at",
}
def get_interview_jobs(db_path: Path = DEFAULT_DB) -> dict[str, list[dict]]:
"""Return jobs grouped by interview/post-apply stage."""
stages = ["applied", "survey", "phone_screen", "interviewing", "offer", "hired", "rejected"]
conn = sqlite3.connect(db_path)
conn.row_factory = sqlite3.Row
result: dict[str, list[dict]] = {}
for stage in stages:
cursor = conn.execute(
"SELECT * FROM jobs WHERE status = ? ORDER BY applied_at DESC, id DESC",
(stage,),
)
result[stage] = [dict(row) for row in cursor.fetchall()]
conn.close()
return result
def advance_to_stage(db_path: Path = DEFAULT_DB, job_id: int = None, stage: str = "") -> None:
"""Move a job to the next interview stage and record a timestamp."""
now = datetime.now().isoformat()[:16]
ts_col = _STAGE_TS_COL.get(stage)
conn = sqlite3.connect(db_path)
if ts_col:
conn.execute(
f"UPDATE jobs SET status = ?, {ts_col} = ? WHERE id = ?",
(stage, now, job_id),
)
else:
conn.execute("UPDATE jobs SET status = ? WHERE id = ?", (stage, job_id))
conn.commit()
conn.close()
def reject_at_stage(db_path: Path = DEFAULT_DB, job_id: int = None,
rejection_stage: str = "") -> None:
"""Mark a job as rejected and record at which stage it was rejected."""
conn = sqlite3.connect(db_path)
conn.execute(
"UPDATE jobs SET status = 'rejected', rejection_stage = ? WHERE id = ?",
(rejection_stage, job_id),
)
conn.commit()
conn.close()
def set_interview_date(db_path: Path = DEFAULT_DB, job_id: int = None,
date_str: str = "") -> None:
"""Persist an interview date for a job."""
conn = sqlite3.connect(db_path)
conn.execute("UPDATE jobs SET interview_date = ? WHERE id = ?", (date_str, job_id))
conn.commit()
conn.close()
# ── Contact log helpers ───────────────────────────────────────────────────────
def add_contact(db_path: Path = DEFAULT_DB, job_id: int = None,
direction: str = "inbound", subject: str = "",
from_addr: str = "", to_addr: str = "",
body: str = "", received_at: str = "",
message_id: str = "",
stage_signal: str = "") -> int:
"""Log an email contact. Returns the new row id."""
ts = received_at or datetime.now().isoformat()[:16]
conn = sqlite3.connect(db_path)
cur = conn.execute(
"""INSERT INTO job_contacts
(job_id, direction, subject, from_addr, to_addr, body,
received_at, message_id, stage_signal)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(job_id, direction, subject, from_addr, to_addr, body,
ts, message_id, stage_signal or None),
)
conn.commit()
row_id = cur.lastrowid
conn.close()
return row_id
def get_contacts(db_path: Path = DEFAULT_DB, job_id: int = None) -> list[dict]:
"""Return all contact log entries for a job, oldest first."""
conn = sqlite3.connect(db_path)
conn.row_factory = sqlite3.Row
cursor = conn.execute(
"SELECT * FROM job_contacts WHERE job_id = ? ORDER BY received_at ASC",
(job_id,),
)
rows = [dict(row) for row in cursor.fetchall()]
conn.close()
return rows
def get_unread_stage_signals(db_path: Path = DEFAULT_DB,
job_id: int = None) -> list[dict]:
"""Return inbound contacts with a non-neutral, non-dismissed stage signal."""
conn = sqlite3.connect(db_path)
conn.row_factory = sqlite3.Row
rows = conn.execute(
"""SELECT * FROM job_contacts
WHERE job_id = ?
AND direction = 'inbound'
AND stage_signal IS NOT NULL
AND stage_signal != 'neutral'
AND (suggestion_dismissed IS NULL OR suggestion_dismissed = 0)
ORDER BY received_at ASC""",
(job_id,),
).fetchall()
conn.close()
return [dict(r) for r in rows]
def dismiss_stage_signal(db_path: Path = DEFAULT_DB,
contact_id: int = None) -> None:
"""Mark a stage signal suggestion as dismissed."""
conn = sqlite3.connect(db_path)
conn.execute(
"UPDATE job_contacts SET suggestion_dismissed = 1 WHERE id = ?",
(contact_id,),
)
conn.commit()
conn.close()
def get_all_message_ids(db_path: Path = DEFAULT_DB) -> set[str]:
"""Return all known Message-IDs across all job contacts."""
conn = sqlite3.connect(db_path)
rows = conn.execute(
"SELECT message_id FROM job_contacts WHERE message_id IS NOT NULL AND message_id != ''"
).fetchall()
conn.close()
return {r[0] for r in rows}
# ── Company research helpers ──────────────────────────────────────────────────
def save_research(db_path: Path = DEFAULT_DB, job_id: int = None,
company_brief: str = "", ceo_brief: str = "",
talking_points: str = "", raw_output: str = "",
tech_brief: str = "", funding_brief: str = "",
competitors_brief: str = "", red_flags: str = "",
accessibility_brief: str = "",
scrape_used: int = 0) -> None:
"""Insert or replace a company research record for a job."""
now = datetime.now().isoformat()[:16]
conn = sqlite3.connect(db_path)
conn.execute(
"""INSERT INTO company_research
(job_id, generated_at, company_brief, ceo_brief, talking_points,
raw_output, tech_brief, funding_brief, competitors_brief, red_flags,
accessibility_brief, scrape_used)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(job_id) DO UPDATE SET
generated_at = excluded.generated_at,
company_brief = excluded.company_brief,
ceo_brief = excluded.ceo_brief,
talking_points = excluded.talking_points,
raw_output = excluded.raw_output,
tech_brief = excluded.tech_brief,
funding_brief = excluded.funding_brief,
competitors_brief = excluded.competitors_brief,
red_flags = excluded.red_flags,
accessibility_brief = excluded.accessibility_brief,
scrape_used = excluded.scrape_used""",
(job_id, now, company_brief, ceo_brief, talking_points, raw_output,
tech_brief, funding_brief, competitors_brief, red_flags,
accessibility_brief, scrape_used),
)
conn.commit()
conn.close()
def get_research(db_path: Path = DEFAULT_DB, job_id: int = None) -> Optional[dict]:
"""Return the company research record for a job, or None if absent."""
conn = sqlite3.connect(db_path)
conn.row_factory = sqlite3.Row
cursor = conn.execute(
"SELECT * FROM company_research WHERE job_id = ?", (job_id,)
)
row = cursor.fetchone()
conn.close()
return dict(row) if row else None
# ── Survey response helpers ───────────────────────────────────────────────────
def insert_survey_response(
db_path: Path = DEFAULT_DB,
job_id: int = None,
survey_name: str = "",
received_at: str = "",
source: str = "text_paste",
raw_input: str = "",
image_path: str = "",
mode: str = "quick",
llm_output: str = "",
reported_score: str = "",
) -> int:
"""Insert a survey response row. Returns the new row id."""
conn = sqlite3.connect(db_path)
cur = conn.execute(
"""INSERT INTO survey_responses
(job_id, survey_name, received_at, source, raw_input,
image_path, mode, llm_output, reported_score)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(job_id, survey_name or None, received_at or None,
source, raw_input or None, image_path or None,
mode, llm_output, reported_score or None),
)
conn.commit()
row_id = cur.lastrowid
conn.close()
return row_id
def get_survey_responses(db_path: Path = DEFAULT_DB, job_id: int = None) -> list[dict]:
"""Return all survey responses for a job, newest first."""
conn = sqlite3.connect(db_path)
conn.row_factory = sqlite3.Row
rows = conn.execute(
"SELECT * FROM survey_responses WHERE job_id = ? ORDER BY created_at DESC",
(job_id,),
).fetchall()
conn.close()
return [dict(r) for r in rows]
# ── Background task helpers ───────────────────────────────────────────────────
def insert_task(db_path: Path = DEFAULT_DB, task_type: str = "",
job_id: int = None,
params: Optional[str] = None) -> tuple[int, bool]:
"""Insert a new background task.
Returns (task_id, True) if inserted, or (existing_id, False) if a
queued/running task for the same (task_type, job_id) already exists.
Dedup key: (task_type, job_id) when params is None;
(task_type, job_id, params) when params is provided.
"""
conn = sqlite3.connect(db_path)
try:
if params is not None:
existing = conn.execute(
"SELECT id FROM background_tasks WHERE task_type=? AND job_id=? "
"AND params=? AND status IN ('queued','running')",
(task_type, job_id, params),
).fetchone()
else:
existing = conn.execute(
"SELECT id FROM background_tasks WHERE task_type=? AND job_id=? "
"AND status IN ('queued','running')",
(task_type, job_id),
).fetchone()
if existing:
return existing[0], False
cur = conn.execute(
"INSERT INTO background_tasks (task_type, job_id, params) VALUES (?,?,?)",
(task_type, job_id, params),
)
conn.commit()
return cur.lastrowid, True
finally:
conn.close()
def update_task_status(db_path: Path = DEFAULT_DB, task_id: int = None,
status: str = "", error: Optional[str] = None) -> None:
"""Update a task's status and set the appropriate timestamp."""
now = datetime.now().isoformat()[:16]
conn = sqlite3.connect(db_path)
if status == "running":
conn.execute(
"UPDATE background_tasks SET status=?, started_at=?, updated_at=? WHERE id=?",
(status, now, now, task_id),
)
elif status in ("completed", "failed"):
conn.execute(
"UPDATE background_tasks SET status=?, finished_at=?, updated_at=?, error=? WHERE id=?",
(status, now, now, error, task_id),
)
else:
conn.execute(
"UPDATE background_tasks SET status=?, updated_at=? WHERE id=?",
(status, now, task_id),
)
conn.commit()
conn.close()
def update_task_stage(db_path: Path = DEFAULT_DB, task_id: int = None,
stage: str = "") -> None:
"""Update the stage label on a running task (for progress display)."""
conn = sqlite3.connect(db_path)
conn.execute("UPDATE background_tasks SET stage=? WHERE id=?", (stage, task_id))
conn.commit()
conn.close()
def get_active_tasks(db_path: Path = DEFAULT_DB) -> list[dict]:
"""Return all queued/running tasks with job title and company joined in."""
conn = sqlite3.connect(db_path)
conn.row_factory = sqlite3.Row
rows = conn.execute("""
SELECT bt.*, j.title, j.company
FROM background_tasks bt
LEFT JOIN jobs j ON j.id = bt.job_id
WHERE bt.status IN ('queued', 'running')
ORDER BY bt.created_at ASC
""").fetchall()
conn.close()
return [dict(r) for r in rows]
def get_task_for_job(db_path: Path = DEFAULT_DB, task_type: str = "",
job_id: int = None) -> Optional[dict]:
"""Return the most recent task row for a (task_type, job_id) pair, or None."""
conn = sqlite3.connect(db_path)
conn.row_factory = sqlite3.Row
row = conn.execute(
"""SELECT * FROM background_tasks
WHERE task_type=? AND job_id=?
ORDER BY id DESC LIMIT 1""",
(task_type, job_id),
).fetchone()
conn.close()
return dict(row) if row else None