feat(db): add reset_running_tasks() for durable scheduler restart
This commit is contained in:
parent
12974f030c
commit
376e028af5
2 changed files with 70 additions and 0 deletions
|
|
@ -366,6 +366,18 @@ def kill_stuck_tasks(db_path: Path = DEFAULT_DB) -> int:
|
|||
return count
|
||||
|
||||
|
||||
def reset_running_tasks(db_path: Path = DEFAULT_DB) -> int:
|
||||
"""On restart: mark in-flight tasks failed. Queued tasks survive for the scheduler."""
|
||||
conn = sqlite3.connect(db_path)
|
||||
count = conn.execute(
|
||||
"UPDATE background_tasks SET status='failed', error='Interrupted by restart',"
|
||||
" finished_at=datetime('now') WHERE status='running'"
|
||||
).rowcount
|
||||
conn.commit()
|
||||
conn.close()
|
||||
return count
|
||||
|
||||
|
||||
def purge_email_data(db_path: Path = DEFAULT_DB) -> tuple[int, int]:
|
||||
"""Delete all job_contacts rows and email-sourced pending jobs.
|
||||
Returns (contacts_deleted, jobs_deleted).
|
||||
|
|
|
|||
58
tests/test_task_scheduler.py
Normal file
58
tests/test_task_scheduler.py
Normal file
|
|
@ -0,0 +1,58 @@
|
|||
# tests/test_task_scheduler.py
|
||||
"""Tests for scripts/task_scheduler.py and related db helpers."""
|
||||
import sqlite3
|
||||
import threading
|
||||
import time
|
||||
from collections import deque
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
from scripts.db import init_db, reset_running_tasks
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def tmp_db(tmp_path):
|
||||
db = tmp_path / "test.db"
|
||||
init_db(db)
|
||||
return db
|
||||
|
||||
|
||||
def test_reset_running_tasks_resets_only_running(tmp_db):
|
||||
"""reset_running_tasks() marks running→failed but leaves queued untouched."""
|
||||
conn = sqlite3.connect(tmp_db)
|
||||
conn.execute(
|
||||
"INSERT INTO background_tasks (task_type, job_id, status) VALUES (?,?,?)",
|
||||
("cover_letter", 1, "running"),
|
||||
)
|
||||
conn.execute(
|
||||
"INSERT INTO background_tasks (task_type, job_id, status) VALUES (?,?,?)",
|
||||
("company_research", 2, "queued"),
|
||||
)
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
count = reset_running_tasks(tmp_db)
|
||||
|
||||
conn = sqlite3.connect(tmp_db)
|
||||
rows = {r[0]: r[1] for r in conn.execute(
|
||||
"SELECT task_type, status FROM background_tasks"
|
||||
).fetchall()}
|
||||
conn.close()
|
||||
|
||||
assert count == 1
|
||||
assert rows["cover_letter"] == "failed"
|
||||
assert rows["company_research"] == "queued"
|
||||
|
||||
|
||||
def test_reset_running_tasks_returns_zero_when_nothing_running(tmp_db):
|
||||
"""Returns 0 when no running tasks exist."""
|
||||
conn = sqlite3.connect(tmp_db)
|
||||
conn.execute(
|
||||
"INSERT INTO background_tasks (task_type, job_id, status) VALUES (?,?,?)",
|
||||
("cover_letter", 1, "queued"),
|
||||
)
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
assert reset_running_tasks(tmp_db) == 0
|
||||
Loading…
Reference in a new issue