feat: add resume library CRUD helpers to db.py

This commit is contained in:
pyr0ball 2026-04-12 10:39:32 -07:00
parent 6e73bfc48a
commit 365eff1506
2 changed files with 327 additions and 0 deletions

View file

@ -345,6 +345,96 @@ def get_optimized_resume(db_path: Path = DEFAULT_DB, job_id: int = None) -> dict
} }
def save_resume_draft(db_path: Path = DEFAULT_DB, job_id: int = None,
draft_json: str = "") -> None:
"""Persist a structured resume review draft (awaiting user approval)."""
if job_id is None:
return
conn = sqlite3.connect(db_path)
conn.execute(
"UPDATE jobs SET resume_draft_json = ? WHERE id = ?",
(draft_json or None, job_id),
)
conn.commit()
conn.close()
def get_resume_draft(db_path: Path = DEFAULT_DB, job_id: int = None) -> dict | None:
"""Return the pending review draft, or None if no draft is waiting."""
if job_id is None:
return None
conn = sqlite3.connect(db_path)
conn.row_factory = sqlite3.Row
row = conn.execute(
"SELECT resume_draft_json FROM jobs WHERE id = ?", (job_id,)
).fetchone()
conn.close()
if not row or not row["resume_draft_json"]:
return None
import json
try:
return json.loads(row["resume_draft_json"])
except Exception:
return None
def finalize_resume(db_path: Path = DEFAULT_DB, job_id: int = None,
final_text: str = "") -> None:
"""Save approved resume text, archive the previous version, and clear draft."""
if job_id is None:
return
import json
conn = sqlite3.connect(db_path)
conn.row_factory = sqlite3.Row
row = conn.execute(
"SELECT optimized_resume, resume_archive_json FROM jobs WHERE id = ?", (job_id,)
).fetchone()
conn.row_factory = None
# Archive current finalized version if present
archive: list = []
if row:
if row["resume_archive_json"]:
try:
archive = json.loads(row["resume_archive_json"])
except Exception:
archive = []
if row["optimized_resume"]:
from datetime import datetime
archive.append({
"archived_at": datetime.now().isoformat()[:16],
"text": row["optimized_resume"],
})
conn.execute(
"UPDATE jobs SET optimized_resume = ?, resume_draft_json = NULL, "
"resume_archive_json = ? WHERE id = ?",
(final_text, json.dumps(archive), job_id),
)
conn.commit()
conn.close()
def get_resume_archive(db_path: Path = DEFAULT_DB, job_id: int = None) -> list:
"""Return list of past finalized resume versions (newest archived first)."""
if job_id is None:
return []
conn = sqlite3.connect(db_path)
conn.row_factory = sqlite3.Row
row = conn.execute(
"SELECT resume_archive_json FROM jobs WHERE id = ?", (job_id,)
).fetchone()
conn.close()
if not row or not row["resume_archive_json"]:
return []
import json
try:
entries = json.loads(row["resume_archive_json"])
return list(reversed(entries)) # newest first
except Exception:
return []
_UPDATABLE_JOB_COLS = { _UPDATABLE_JOB_COLS = {
"title", "company", "url", "source", "location", "is_remote", "title", "company", "url", "source", "location", "is_remote",
"salary", "description", "match_score", "keyword_gaps", "salary", "description", "match_score", "keyword_gaps",
@ -831,3 +921,151 @@ def get_task_for_job(db_path: Path = DEFAULT_DB, task_type: str = "",
).fetchone() ).fetchone()
conn.close() conn.close()
return dict(row) if row else None return dict(row) if row else None
# ── Resume library helpers ────────────────────────────────────────────────────
def _resume_as_dict(row) -> dict:
"""Convert a sqlite3.Row from the resumes table to a plain dict."""
return {
"id": row["id"],
"name": row["name"],
"source": row["source"],
"job_id": row["job_id"],
"text": row["text"],
"struct_json": row["struct_json"],
"word_count": row["word_count"],
"is_default": row["is_default"],
"created_at": row["created_at"],
"updated_at": row["updated_at"],
}
def create_resume(
db_path: Path = DEFAULT_DB,
name: str = "",
text: str = "",
source: str = "manual",
job_id: int | None = None,
struct_json: str | None = None,
) -> dict:
"""Insert a new resume into the library. Returns the created row as a dict."""
word_count = len(text.split()) if text else 0
conn = sqlite3.connect(db_path)
conn.row_factory = sqlite3.Row
try:
cur = conn.execute(
"""INSERT INTO resumes (name, source, job_id, text, struct_json, word_count)
VALUES (?, ?, ?, ?, ?, ?)""",
(name, source, job_id, text, struct_json, word_count),
)
conn.commit()
row = conn.execute("SELECT * FROM resumes WHERE id=?", (cur.lastrowid,)).fetchone()
return _resume_as_dict(row)
finally:
conn.close()
def list_resumes(db_path: Path = DEFAULT_DB) -> list[dict]:
"""Return all resumes ordered by default-first then newest-first."""
conn = sqlite3.connect(db_path)
conn.row_factory = sqlite3.Row
try:
rows = conn.execute(
"SELECT * FROM resumes ORDER BY is_default DESC, created_at DESC"
).fetchall()
return [_resume_as_dict(r) for r in rows]
finally:
conn.close()
def get_resume(db_path: Path = DEFAULT_DB, resume_id: int = 0) -> dict | None:
"""Return a single resume by id, or None if not found."""
conn = sqlite3.connect(db_path)
conn.row_factory = sqlite3.Row
try:
row = conn.execute("SELECT * FROM resumes WHERE id=?", (resume_id,)).fetchone()
return _resume_as_dict(row) if row else None
finally:
conn.close()
def update_resume(
db_path: Path = DEFAULT_DB,
resume_id: int = 0,
name: str | None = None,
text: str | None = None,
) -> dict | None:
"""Update name and/or text of a resume. Returns updated row or None."""
conn = sqlite3.connect(db_path)
conn.row_factory = sqlite3.Row
try:
if name is not None:
conn.execute(
"UPDATE resumes SET name=?, updated_at=datetime('now') WHERE id=?",
(name, resume_id),
)
if text is not None:
word_count = len(text.split())
conn.execute(
"UPDATE resumes SET text=?, word_count=?, updated_at=datetime('now') WHERE id=?",
(text, word_count, resume_id),
)
conn.commit()
row = conn.execute("SELECT * FROM resumes WHERE id=?", (resume_id,)).fetchone()
return _resume_as_dict(row) if row else None
finally:
conn.close()
def delete_resume(db_path: Path = DEFAULT_DB, resume_id: int = 0) -> None:
"""Delete a resume by id."""
conn = sqlite3.connect(db_path)
try:
conn.execute("DELETE FROM resumes WHERE id=?", (resume_id,))
conn.commit()
finally:
conn.close()
def set_default_resume(db_path: Path = DEFAULT_DB, resume_id: int = 0) -> None:
"""Set one resume as default, clearing the flag on all others."""
conn = sqlite3.connect(db_path)
try:
conn.execute("UPDATE resumes SET is_default=0")
conn.execute("UPDATE resumes SET is_default=1 WHERE id=?", (resume_id,))
conn.commit()
finally:
conn.close()
def get_job_resume(db_path: Path = DEFAULT_DB, job_id: int = 0) -> dict | None:
"""Return the resume for a job: job-specific first, then default, then None."""
conn = sqlite3.connect(db_path)
conn.row_factory = sqlite3.Row
try:
row = conn.execute(
"""SELECT r.* FROM resumes r
JOIN jobs j ON j.resume_id = r.id
WHERE j.id=?""",
(job_id,),
).fetchone()
if row:
return _resume_as_dict(row)
row = conn.execute(
"SELECT * FROM resumes WHERE is_default=1 LIMIT 1"
).fetchone()
return _resume_as_dict(row) if row else None
finally:
conn.close()
def set_job_resume(db_path: Path = DEFAULT_DB, job_id: int = 0, resume_id: int = 0) -> None:
"""Attach a specific resume to a job (overrides default for that job)."""
conn = sqlite3.connect(db_path)
try:
conn.execute("UPDATE jobs SET resume_id=? WHERE id=?", (resume_id, job_id))
conn.commit()
finally:
conn.close()

89
tests/test_db_resumes.py Normal file
View file

@ -0,0 +1,89 @@
"""Tests for resume library db helpers."""
import sqlite3
import tempfile
from pathlib import Path
import pytest
from scripts.db_migrate import migrate_db
@pytest.fixture
def db(tmp_path):
path = tmp_path / "test.db"
migrate_db(path)
return path
def test_create_and_get_resume(db):
from scripts.db import create_resume, get_resume
r = create_resume(db, name="Q1 2026", text="Software engineer with 5 years experience.")
assert r["id"] > 0
assert r["name"] == "Q1 2026"
assert r["word_count"] == 6
assert r["source"] == "manual"
assert r["is_default"] == 0
fetched = get_resume(db, r["id"])
assert fetched["name"] == "Q1 2026"
def test_list_resumes(db):
from scripts.db import create_resume, list_resumes
create_resume(db, name="A", text="alpha beta")
create_resume(db, name="B", text="gamma delta")
results = list_resumes(db)
assert len(results) == 2
def test_update_resume(db):
from scripts.db import create_resume, update_resume
r = create_resume(db, name="Old name", text="old text here")
updated = update_resume(db, r["id"], name="New name", text="new text content here updated")
assert updated["name"] == "New name"
assert updated["word_count"] == 5
def test_delete_resume(db):
from scripts.db import create_resume, delete_resume, get_resume
r = create_resume(db, name="Temp", text="temp text")
delete_resume(db, r["id"])
assert get_resume(db, r["id"]) is None
def test_set_default_resume(db):
from scripts.db import create_resume, set_default_resume, list_resumes
a = create_resume(db, name="A", text="text a")
b = create_resume(db, name="B", text="text b")
set_default_resume(db, a["id"])
set_default_resume(db, b["id"])
resumes = {r["id"]: r for r in list_resumes(db)}
assert resumes[a["id"]]["is_default"] == 0
assert resumes[b["id"]]["is_default"] == 1
def test_get_job_resume_default_fallback(db):
from scripts.db import create_resume, set_default_resume, get_job_resume
# Insert a minimal job row
conn = sqlite3.connect(db)
conn.execute("INSERT INTO jobs (id, title, company, source) VALUES (1, 'Eng', 'Co', 'test')")
conn.commit()
conn.close()
r = create_resume(db, name="Default", text="default resume text")
set_default_resume(db, r["id"])
result = get_job_resume(db, 1)
assert result["id"] == r["id"]
def test_get_job_resume_job_specific_override(db):
from scripts.db import create_resume, set_default_resume, get_job_resume, set_job_resume
conn = sqlite3.connect(db)
conn.execute("INSERT INTO jobs (id, title, company, source) VALUES (1, 'Eng', 'Co', 'test')")
conn.commit()
conn.close()
default_r = create_resume(db, name="Default", text="default resume text")
set_default_resume(db, default_r["id"])
specific_r = create_resume(db, name="Specific", text="job specific resume text")
set_job_resume(db, job_id=1, resume_id=specific_r["id"])
result = get_job_resume(db, 1)
assert result["id"] == specific_r["id"]