feat(db): add reset_running_tasks() for durable scheduler restart

This commit is contained in:
pyr0ball 2026-03-15 03:22:45 -07:00
parent eef2478948
commit 905db2f147
2 changed files with 70 additions and 0 deletions

View file

@ -366,6 +366,18 @@ def kill_stuck_tasks(db_path: Path = DEFAULT_DB) -> int:
return count return count
def reset_running_tasks(db_path: Path = DEFAULT_DB) -> int:
"""On restart: mark in-flight tasks failed. Queued tasks survive for the scheduler."""
conn = sqlite3.connect(db_path)
count = conn.execute(
"UPDATE background_tasks SET status='failed', error='Interrupted by restart',"
" finished_at=datetime('now') WHERE status='running'"
).rowcount
conn.commit()
conn.close()
return count
def purge_email_data(db_path: Path = DEFAULT_DB) -> tuple[int, int]: def purge_email_data(db_path: Path = DEFAULT_DB) -> tuple[int, int]:
"""Delete all job_contacts rows and email-sourced pending jobs. """Delete all job_contacts rows and email-sourced pending jobs.
Returns (contacts_deleted, jobs_deleted). Returns (contacts_deleted, jobs_deleted).

View file

@ -0,0 +1,58 @@
# tests/test_task_scheduler.py
"""Tests for scripts/task_scheduler.py and related db helpers."""
import sqlite3
import threading
import time
from collections import deque
from pathlib import Path
import pytest
from scripts.db import init_db, reset_running_tasks
@pytest.fixture
def tmp_db(tmp_path):
db = tmp_path / "test.db"
init_db(db)
return db
def test_reset_running_tasks_resets_only_running(tmp_db):
"""reset_running_tasks() marks running→failed but leaves queued untouched."""
conn = sqlite3.connect(tmp_db)
conn.execute(
"INSERT INTO background_tasks (task_type, job_id, status) VALUES (?,?,?)",
("cover_letter", 1, "running"),
)
conn.execute(
"INSERT INTO background_tasks (task_type, job_id, status) VALUES (?,?,?)",
("company_research", 2, "queued"),
)
conn.commit()
conn.close()
count = reset_running_tasks(tmp_db)
conn = sqlite3.connect(tmp_db)
rows = {r[0]: r[1] for r in conn.execute(
"SELECT task_type, status FROM background_tasks"
).fetchall()}
conn.close()
assert count == 1
assert rows["cover_letter"] == "failed"
assert rows["company_research"] == "queued"
def test_reset_running_tasks_returns_zero_when_nothing_running(tmp_db):
"""Returns 0 when no running tasks exist."""
conn = sqlite3.connect(tmp_db)
conn.execute(
"INSERT INTO background_tasks (task_type, job_id, status) VALUES (?,?,?)",
("cover_letter", 1, "queued"),
)
conn.commit()
conn.close()
assert reset_running_tasks(tmp_db) == 0