# Digest Scrape Queue Implementation Plan

> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.

**Goal:** Add a persistent digest queue so users can click the 📰 Digest chip on a signal banner, browse extracted job links from queued digest emails, and send selected URLs through the existing discovery pipeline as pending jobs.

**Architecture:** A new `digest_queue` table in `staging.db` stores queued digest emails (foreign-keyed to `job_contacts`). Five new FastAPI endpoints in `dev-api.py` handle list/add/extract/queue-jobs/delete. A new Pinia store + `DigestView.vue` page provide the browse UI. The existing reclassify chip handler gets a third fire-and-forget API call for digest entries.

**Tech Stack:** Python / FastAPI / SQLite / Vue 3 / TypeScript / Pinia / Heroicons Vue

---

## File Map

| File | What changes |
|---|---|
| `scripts/db.py` | Add `CREATE_DIGEST_QUEUE` string; call it in `init_db()` |
| `dev-api.py` | `@app.on_event("startup")` for digest table; `_score_url()` + `_extract_links()` helpers; 5 new endpoints; `DigestQueueBody` + `QueueJobsBody` Pydantic models |
| `tests/test_dev_api_digest.py` | New file — 14 tests with isolated tmp_db fixture |
| `web/src/stores/digest.ts` | New Pinia store — `DigestEntry`, `DigestLink` types; `fetchAll()`, `remove()` actions |
| `web/src/views/DigestView.vue` | New page — entry list, expand/extract/select/queue UI |
| `web/src/router/index.ts` | Add `/digest` route |
| `web/src/components/AppNav.vue` | Add `NewspaperIcon` import; add Digest nav item; import digest store; reactive badge in template |
| `web/src/components/InterviewCard.vue` | Third fire-and-forget call in `reclassifySignal` digest branch |
| `web/src/views/InterviewsView.vue` | Third fire-and-forget call in `reclassifyPreSignal` digest branch |

---

## Task 1: DB schema — `digest_queue` table

**Files:**

- Modify: `scripts/db.py:138,188-197`
- Modify: `dev-api.py` (add startup event after `_strip_html` definition)

- [ ] **Step 1: Add `CREATE_DIGEST_QUEUE` string to `scripts/db.py`**

After the `CREATE_SURVEY_RESPONSES` string (around line 138), insert:

```python
CREATE_DIGEST_QUEUE = """
CREATE TABLE IF NOT EXISTS digest_queue (
    id INTEGER PRIMARY KEY,
    job_contact_id INTEGER NOT NULL REFERENCES job_contacts(id),
    created_at TEXT DEFAULT (datetime('now')),
    UNIQUE(job_contact_id)
)
"""
```

- [ ] **Step 2: Call it in `init_db()`**

In `init_db()` (around line 195), add after the existing `CREATE_SURVEY_RESPONSES` call:

```python
conn.execute(CREATE_DIGEST_QUEUE)
```

The full `init_db` body should now be:

```python
def init_db(db_path: Path = DEFAULT_DB) -> None:
    """Create tables if they don't exist, then run migrations."""
    conn = sqlite3.connect(db_path)
    conn.execute(CREATE_JOBS)
    conn.execute(CREATE_JOB_CONTACTS)
    conn.execute(CREATE_COMPANY_RESEARCH)
    conn.execute(CREATE_BACKGROUND_TASKS)
    conn.execute(CREATE_SURVEY_RESPONSES)
    conn.execute(CREATE_DIGEST_QUEUE)
    conn.commit()
    conn.close()
    _migrate_db(db_path)
```

- [ ] **Step 3: Add startup event to `dev-api.py`**

After the `_strip_html` function definition, add:

```python
@app.on_event("startup")
def _startup():
    """Ensure digest_queue table exists (dev-api may run against an existing DB)."""
    db = _get_db()
    db.execute("""
        CREATE TABLE IF NOT EXISTS digest_queue (
            id INTEGER PRIMARY KEY,
            job_contact_id INTEGER NOT NULL REFERENCES job_contacts(id),
            created_at TEXT DEFAULT (datetime('now')),
            UNIQUE(job_contact_id)
        )
    """)
    db.commit()
    db.close()
```

- [ ] **Step 4: Verify schema creation**

```bash
cd /Library/Development/CircuitForge/peregrine/.worktrees/feature-vue-spa
conda run -n job-seeker python -c "
from scripts.db import init_db
import tempfile, os
with tempfile.TemporaryDirectory() as d:
    p = os.path.join(d, 'staging.db')
    init_db(p)
    import sqlite3
    con = sqlite3.connect(p)
    tables = con.execute(\"SELECT name FROM sqlite_master WHERE type='table'\").fetchall()
    print([t[0] for t in tables])
"
```

Expected: list includes `digest_queue`

- [ ] **Step 5: Commit**

```bash
git add scripts/db.py dev-api.py
git commit -m "feat: add digest_queue table to schema and dev-api startup"
```

---

## Task 2: `GET` + `POST /api/digest-queue` endpoints + tests

**Files:**

- Create: `tests/test_dev_api_digest.py`
- Modify: `dev-api.py` (append after reclassify endpoint)

- [ ] **Step 1: Create test file with fixture + GET + POST tests**

Create `/Library/Development/CircuitForge/peregrine/.worktrees/feature-vue-spa/tests/test_dev_api_digest.py`:

```python
"""Tests for digest queue API endpoints."""
import sqlite3
import os

import pytest
from fastapi.testclient import TestClient


@pytest.fixture()
def tmp_db(tmp_path):
    """Create minimal schema in a temp dir with one job_contacts row."""
    db_path = str(tmp_path / "staging.db")
    con = sqlite3.connect(db_path)
    con.executescript("""
        CREATE TABLE jobs (
            id INTEGER PRIMARY KEY,
            title TEXT,
            company TEXT,
            url TEXT UNIQUE,
            location TEXT,
            is_remote INTEGER DEFAULT 0,
            salary TEXT,
            match_score REAL,
            keyword_gaps TEXT,
            status TEXT DEFAULT 'pending',
            date_found TEXT,
            description TEXT,
            source TEXT
        );
        CREATE TABLE job_contacts (
            id INTEGER PRIMARY KEY,
            job_id INTEGER,
            subject TEXT,
            received_at TEXT,
            stage_signal TEXT,
            suggestion_dismissed INTEGER DEFAULT 0,
            body TEXT,
            from_addr TEXT
        );
        CREATE TABLE digest_queue (
            id INTEGER PRIMARY KEY,
            job_contact_id INTEGER NOT NULL REFERENCES job_contacts(id),
            created_at TEXT DEFAULT (datetime('now')),
            UNIQUE(job_contact_id)
        );
        INSERT INTO jobs (id, title, company, url, status, source, date_found)
        VALUES (1, 'Engineer', 'Acme', 'https://acme.com/job/1', 'applied', 'test', '2026-03-19');
        -- body must contain exactly one greenhouse.io link and one lever.co link:
        -- the Task 3 extract-links tests assert on them.
        INSERT INTO job_contacts (id, job_id, subject, received_at, stage_signal, body, from_addr)
        VALUES (
            10, 1, 'TechCrunch Jobs Weekly', '2026-03-19T10:00:00', 'digest',
            'Apply at <a href="https://boards.greenhouse.io/acme/jobs/123">Senior Engineer</a>
             or <a href="https://jobs.lever.co/acme/staff-designer">Staff Designer</a>.
             Unsubscribe: https://unsubscribe.example.com/remove',
            'digest@techcrunch.com'
        );
    """)
    con.close()
    return db_path


@pytest.fixture()
def client(tmp_db, monkeypatch):
    monkeypatch.setenv("STAGING_DB", tmp_db)
    import importlib
    import dev_api
    importlib.reload(dev_api)
    return TestClient(dev_api.app)


# ── GET /api/digest-queue ───────────────────────────────────────────────────

def test_digest_queue_list_empty(client):
    resp = client.get("/api/digest-queue")
    assert resp.status_code == 200
    assert resp.json() == []


def test_digest_queue_list_with_entry(client, tmp_db):
    con = sqlite3.connect(tmp_db)
    con.execute("INSERT INTO digest_queue (job_contact_id) VALUES (10)")
    con.commit()
    con.close()
    resp = client.get("/api/digest-queue")
    assert resp.status_code == 200
    entries = resp.json()
    assert len(entries) == 1
    assert entries[0]["job_contact_id"] == 10
    assert entries[0]["subject"] == "TechCrunch Jobs Weekly"
    assert entries[0]["from_addr"] == "digest@techcrunch.com"
    assert "body" in entries[0]
    assert "created_at" in entries[0]


# ── POST /api/digest-queue ──────────────────────────────────────────────────

def test_digest_queue_add(client, tmp_db):
    resp = client.post("/api/digest-queue", json={"job_contact_id": 10})
    assert resp.status_code == 200
    data = resp.json()
    assert data["ok"] is True
    assert data["created"] is True
    con = sqlite3.connect(tmp_db)
    row = con.execute("SELECT * FROM digest_queue WHERE job_contact_id = 10").fetchone()
    con.close()
    assert row is not None


def test_digest_queue_add_duplicate(client):
    client.post("/api/digest-queue", json={"job_contact_id": 10})
    resp = client.post("/api/digest-queue", json={"job_contact_id": 10})
    assert resp.status_code == 200
    data = resp.json()
    assert data["ok"] is True
    assert data["created"] is False


def test_digest_queue_add_missing_contact(client):
    resp = client.post("/api/digest-queue", json={"job_contact_id": 9999})
    assert resp.status_code == 404
```

- [ ] **Step 2: Run tests — expect failures**

```bash
cd /Library/Development/CircuitForge/peregrine/.worktrees/feature-vue-spa
/devl/miniconda3/envs/job-seeker/bin/pytest tests/test_dev_api_digest.py -v 2>&1 | tail -20
```

Expected: 4 FAILED, 1 PASSED. `test_digest_queue_add_missing_contact` passes immediately because a POST to a non-existent endpoint returns 404, matching the assertion — this is expected and fine.

- [ ] **Step 3: Implement `GET /api/digest-queue` + `POST /api/digest-queue` in `dev-api.py`**

After the reclassify endpoint (after line ~428), add:

```python
# ── Digest queue models ─────────────────────────────────────────────────────

class DigestQueueBody(BaseModel):
    job_contact_id: int


# ── GET /api/digest-queue ───────────────────────────────────────────────────

@app.get("/api/digest-queue")
def list_digest_queue():
    db = _get_db()
    rows = db.execute(
        """SELECT dq.id, dq.job_contact_id, dq.created_at,
                  jc.subject, jc.from_addr, jc.received_at, jc.body
           FROM digest_queue dq
           JOIN job_contacts jc ON jc.id = dq.job_contact_id
           ORDER BY dq.created_at DESC"""
    ).fetchall()
    db.close()
    return [
        {
            "id": r["id"],
            "job_contact_id": r["job_contact_id"],
            "created_at": r["created_at"],
            "subject": r["subject"],
            "from_addr": r["from_addr"],
            "received_at": r["received_at"],
            "body": _strip_html(r["body"] or ""),
        }
        for r in rows
    ]


# ── POST /api/digest-queue ──────────────────────────────────────────────────

@app.post("/api/digest-queue")
def add_to_digest_queue(body: DigestQueueBody):
    db = _get_db()
    exists = db.execute(
        "SELECT 1 FROM job_contacts WHERE id = ?", (body.job_contact_id,)
    ).fetchone()
    if not exists:
        db.close()
        raise HTTPException(404, "job_contact_id not found")
    result = db.execute(
        "INSERT OR IGNORE INTO digest_queue (job_contact_id) VALUES (?)",
        (body.job_contact_id,),
    )
    db.commit()
    created = result.rowcount > 0
    db.close()
    return {"ok": True, "created": created}
```

- [ ] **Step 4: Run tests — expect all pass**

```bash
/devl/miniconda3/envs/job-seeker/bin/pytest tests/test_dev_api_digest.py -v 2>&1 | tail -15
```

Expected: 5 PASSED

- [ ] **Step 5: Run full suite to catch regressions**

```bash
/devl/miniconda3/envs/job-seeker/bin/pytest tests/ -v 2>&1 | tail -5
```

Expected: all previously passing tests still pass

- [ ] **Step 6: Commit**

```bash
git add dev-api.py tests/test_dev_api_digest.py
git commit -m "feat: add GET/POST /api/digest-queue endpoints"
```

---

## Task 3: `POST /api/digest-queue/{id}/extract-links` + tests

**Files:**

- Modify: `tests/test_dev_api_digest.py` (append tests)
- Modify: `dev-api.py` (append helpers + endpoint)

- [ ] **Step 1: Add extract-links tests to `test_dev_api_digest.py`**

Append to `tests/test_dev_api_digest.py`:

```python
# ── POST /api/digest-queue/{id}/extract-links ───────────────────────────────

def _add_digest_entry(tmp_db, contact_id=10):
    """Helper: insert a digest_queue row and return its id."""
    con = sqlite3.connect(tmp_db)
    cur = con.execute("INSERT INTO digest_queue (job_contact_id) VALUES (?)", (contact_id,))
    entry_id = cur.lastrowid
    con.commit()
    con.close()
    return entry_id


def test_digest_extract_links(client, tmp_db):
    entry_id = _add_digest_entry(tmp_db)
    resp = client.post(f"/api/digest-queue/{entry_id}/extract-links")
    assert resp.status_code == 200
    links = resp.json()["links"]
    # greenhouse.io link should be present with score=2
    gh_links = [l for l in links if "greenhouse.io" in l["url"]]
    assert len(gh_links) == 1
    assert gh_links[0]["score"] == 2
    # lever.co link should be present with score=2
    lever_links = [l for l in links if "lever.co" in l["url"]]
    assert len(lever_links) == 1
    assert lever_links[0]["score"] == 2


def test_digest_extract_links_filters_trackers(client, tmp_db):
    entry_id = _add_digest_entry(tmp_db)
    resp = client.post(f"/api/digest-queue/{entry_id}/extract-links")
    assert resp.status_code == 200
    links = resp.json()["links"]
    urls = [l["url"] for l in links]
    # Unsubscribe URL should be excluded
    assert not any("unsubscribe" in u for u in urls)


def test_digest_extract_links_404(client):
    resp = client.post("/api/digest-queue/9999/extract-links")
    assert resp.status_code == 404
```

- [ ] **Step 2: Run new tests — expect failures**

```bash
/devl/miniconda3/envs/job-seeker/bin/pytest tests/test_dev_api_digest.py::test_digest_extract_links tests/test_dev_api_digest.py::test_digest_extract_links_filters_trackers tests/test_dev_api_digest.py::test_digest_extract_links_404 -v 2>&1 | tail -10
```

Expected: 2 FAILED, 1 PASSED. `test_digest_extract_links_404` passes immediately for the same reason as in Task 2 — a POST to the not-yet-existing route already returns 404.

- [ ] **Step 3: Add `_score_url()` and `_extract_links()` to `dev-api.py`**

First, add `urlparse` to the import block at the **top of `dev-api.py`** (around line 10, with the other stdlib imports); the helper regexes also need `re`, so add `import re` alongside it if `dev-api.py` does not already import it:

```python
from urllib.parse import urlparse
```

Then, after the `_startup` event function (before the endpoint definitions), add the helpers:

```python
# ── Link extraction helpers ─────────────────────────────────────────────────

_JOB_DOMAINS = frozenset({
    'greenhouse.io', 'lever.co', 'workday.com', 'linkedin.com',
    'ashbyhq.com', 'smartrecruiters.com', 'icims.com', 'taleo.net',
    'jobvite.com', 'breezy.hr', 'recruitee.com', 'bamboohr.com',
    'myworkdayjobs.com',
})
_JOB_PATH_SEGMENTS = frozenset({'careers', 'jobs'})
_FILTER_RE = re.compile(
    r'(unsubscribe|mailto:|/track/|pixel\.|\.gif|\.png|\.jpg'
    r'|/open\?|/click\?|list-unsubscribe)',
    re.I,
)
_URL_RE = re.compile(r'https?://[^\s<>"\')\]]+', re.I)


def _score_url(url: str) -> int:
    """Return 2 for likely job URLs, 1 for others, -1 to exclude."""
    if _FILTER_RE.search(url):
        return -1
    parsed = urlparse(url)
    hostname = (parsed.hostname or '').lower()
    path = parsed.path.lower()
    for domain in _JOB_DOMAINS:
        if domain in hostname or domain in path:
            return 2
    for seg in _JOB_PATH_SEGMENTS:
        if f'/{seg}/' in path or path.startswith(f'/{seg}'):
            return 2
    return 1


def _extract_links(body: str) -> list[dict]:
    """Extract and rank URLs from raw HTML email body."""
    if not body:
        return []
    seen: set[str] = set()
    results = []
    for m in _URL_RE.finditer(body):
        url = m.group(0).rstrip('.,;)')
        if url in seen:
            continue
        seen.add(url)
        score = _score_url(url)
        if score < 0:
            continue
        start = max(0, m.start() - 60)
        hint = body[start:m.start()].strip().split('\n')[-1].strip()
        results.append({'url': url, 'score': score, 'hint': hint})
    results.sort(key=lambda x: -x['score'])
    return results
```

- [ ] **Step 4: Add `POST /api/digest-queue/{id}/extract-links` endpoint**

After `add_to_digest_queue`, add:

```python
# ── POST /api/digest-queue/{id}/extract-links ───────────────────────────────

@app.post("/api/digest-queue/{digest_id}/extract-links")
def extract_digest_links(digest_id: int):
    db = _get_db()
    row = db.execute(
        """SELECT jc.body
           FROM digest_queue dq
           JOIN job_contacts jc ON jc.id = dq.job_contact_id
           WHERE dq.id = ?""",
        (digest_id,),
    ).fetchone()
    db.close()
    if not row:
        raise HTTPException(404, "Digest entry not found")
    return {"links": _extract_links(row["body"] or "")}
```

- [ ] **Step 5: Run tests — expect all pass**

```bash
/devl/miniconda3/envs/job-seeker/bin/pytest tests/test_dev_api_digest.py -v 2>&1 | tail -15
```

Expected: 8 PASSED

- [ ] **Step 6: Commit**

```bash
git add dev-api.py tests/test_dev_api_digest.py
git commit -m "feat: add /extract-links endpoint with URL scoring"
```

---

## Task 4: `POST /api/digest-queue/{id}/queue-jobs` + `DELETE /api/digest-queue/{id}` + tests

**Files:**

- Modify: `tests/test_dev_api_digest.py` (append tests)
- Modify: `dev-api.py` (append models + endpoints)

- [ ] **Step 1: Add queue-jobs and delete tests**

Append to `tests/test_dev_api_digest.py`:

```python
# ── POST /api/digest-queue/{id}/queue-jobs ──────────────────────────────────

def test_digest_queue_jobs(client, tmp_db):
    entry_id = _add_digest_entry(tmp_db)
    resp = client.post(
        f"/api/digest-queue/{entry_id}/queue-jobs",
        json={"urls": ["https://greenhouse.io/acme/jobs/456"]},
    )
    assert resp.status_code == 200
    data = resp.json()
    assert data["queued"] == 1
    assert data["skipped"] == 0
    con = sqlite3.connect(tmp_db)
    row = con.execute(
        "SELECT source, status FROM jobs WHERE url = 'https://greenhouse.io/acme/jobs/456'"
    ).fetchone()
    con.close()
    assert row is not None
    assert row[0] == "digest"
    assert row[1] == "pending"


def test_digest_queue_jobs_skips_duplicates(client, tmp_db):
    entry_id = _add_digest_entry(tmp_db)
    resp = client.post(
        f"/api/digest-queue/{entry_id}/queue-jobs",
        json={"urls": [
            "https://greenhouse.io/acme/jobs/789",
            "https://greenhouse.io/acme/jobs/789",  # same URL twice in one call
        ]},
    )
    assert resp.status_code == 200
    data = resp.json()
    assert data["queued"] == 1
    assert data["skipped"] == 1
    con = sqlite3.connect(tmp_db)
    count = con.execute(
        "SELECT COUNT(*) FROM jobs WHERE url = 'https://greenhouse.io/acme/jobs/789'"
    ).fetchone()[0]
    con.close()
    assert count == 1


def test_digest_queue_jobs_skips_invalid_urls(client, tmp_db):
    entry_id = _add_digest_entry(tmp_db)
    resp = client.post(
        f"/api/digest-queue/{entry_id}/queue-jobs",
        json={"urls": ["", "ftp://bad.example.com", "https://valid.greenhouse.io/job/1"]},
    )
    assert resp.status_code == 200
    data = resp.json()
    assert data["queued"] == 1
    assert data["skipped"] == 2


def test_digest_queue_jobs_empty_urls(client, tmp_db):
    entry_id = _add_digest_entry(tmp_db)
    resp = client.post(f"/api/digest-queue/{entry_id}/queue-jobs", json={"urls": []})
    assert resp.status_code == 400


def test_digest_queue_jobs_404(client):
    resp = client.post("/api/digest-queue/9999/queue-jobs", json={"urls": ["https://example.com"]})
    assert resp.status_code == 404


# ── DELETE /api/digest-queue/{id} ───────────────────────────────────────────

def test_digest_delete(client, tmp_db):
    entry_id = _add_digest_entry(tmp_db)
    resp = client.delete(f"/api/digest-queue/{entry_id}")
    assert resp.status_code == 200
    assert resp.json()["ok"] is True
    # Second delete → 404
    resp2 = client.delete(f"/api/digest-queue/{entry_id}")
    assert resp2.status_code == 404
```

- [ ] **Step 2: Run new tests — expect failures**

```bash
/devl/miniconda3/envs/job-seeker/bin/pytest tests/test_dev_api_digest.py -k "queue_jobs or delete" -v 2>&1 | tail -15
```

Expected: 5 FAILED, 1 PASSED. As in the previous tasks, `test_digest_queue_jobs_404` passes immediately because the not-yet-existing route already returns 404.

- [ ] **Step 3: Add `QueueJobsBody` model + both endpoints to `dev-api.py`**

The `date_found` value uses `datetime`, so make sure `from datetime import datetime` is present at the top of `dev-api.py`:

```python
class QueueJobsBody(BaseModel):
    urls: list[str]


# ── POST /api/digest-queue/{id}/queue-jobs ──────────────────────────────────

@app.post("/api/digest-queue/{digest_id}/queue-jobs")
def queue_digest_jobs(digest_id: int, body: QueueJobsBody):
    if not body.urls:
        raise HTTPException(400, "urls must not be empty")
    db = _get_db()
    exists = db.execute(
        "SELECT 1 FROM digest_queue WHERE id = ?", (digest_id,)
    ).fetchone()
    db.close()
    if not exists:
        raise HTTPException(404, "Digest entry not found")
    try:
        from scripts.db import insert_job
    except ImportError:
        raise HTTPException(500, "scripts.db not available")
    queued = 0
    skipped = 0
    for url in body.urls:
        if not url or not url.startswith(('http://', 'https://')):
            skipped += 1
            continue
        result = insert_job(DB_PATH, {
            'url': url,
            'title': '',
            'company': '',
            'source': 'digest',
            'date_found': datetime.utcnow().isoformat(),
        })
        if result:
            queued += 1
        else:
            skipped += 1
    return {"ok": True, "queued": queued, "skipped": skipped}


# ── DELETE /api/digest-queue/{id} ───────────────────────────────────────────

@app.delete("/api/digest-queue/{digest_id}")
def delete_digest_entry(digest_id: int):
    db = _get_db()
    result = db.execute("DELETE FROM digest_queue WHERE id = ?", (digest_id,))
    db.commit()
    rowcount = result.rowcount
    db.close()
    if rowcount == 0:
        raise HTTPException(404, "Digest entry not found")
    return {"ok": True}
```

- [ ] **Step 4: Run full digest test suite — expect all pass**

```bash
/devl/miniconda3/envs/job-seeker/bin/pytest tests/test_dev_api_digest.py -v 2>&1 | tail -20
```

Expected: 14 PASSED

- [ ] **Step 5: Run full test suite**

```bash
/devl/miniconda3/envs/job-seeker/bin/pytest tests/ -v 2>&1 | tail -5
```

Expected: all previously passing tests still pass

- [ ] **Step 6: Commit**

```bash
git add dev-api.py tests/test_dev_api_digest.py
git commit -m "feat: add queue-jobs and delete digest endpoints"
```

---
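The link-scoring heuristic at the heart of Tasks 3–4 can be exercised on its own before it lands in `dev-api.py`. The sketch below is a condensed, self-contained restatement of the `_score_url` / `_extract_links` idea — the domain list and filter regex are trimmed, and `_demo_body` is invented sample HTML, not a real digest:

```python
import re
from urllib.parse import urlparse

# Trimmed versions of the plan's _JOB_DOMAINS / _FILTER_RE / _URL_RE.
JOB_DOMAINS = ("greenhouse.io", "lever.co", "workday.com", "ashbyhq.com")
FILTER_RE = re.compile(r"(unsubscribe|/track/|pixel\.|\.gif|/click\?)", re.I)
URL_RE = re.compile(r'https?://[^\s<>"\')\]]+', re.I)


def score_url(url: str) -> int:
    """2 = likely job posting, 1 = other content, -1 = tracker (excluded)."""
    if FILTER_RE.search(url):
        return -1
    parsed = urlparse(url)
    host = (parsed.hostname or "").lower()
    if any(domain in host for domain in JOB_DOMAINS):
        return 2
    if parsed.path.lower().startswith(("/careers", "/jobs")):
        return 2
    return 1


def extract_links(body: str) -> list[dict]:
    """Pull unique URLs from an email body, drop trackers, rank job links first."""
    seen, out = set(), []
    for m in URL_RE.finditer(body):
        url = m.group(0).rstrip(".,;)")  # trim trailing punctuation
        if url in seen:
            continue
        seen.add(url)
        s = score_url(url)
        if s >= 0:
            out.append({"url": url, "score": s})
    return sorted(out, key=lambda x: -x["score"])


_demo_body = (
    'Roles: <a href="https://boards.greenhouse.io/acme/jobs/123">Engineer</a> '
    'and <a href="https://example.com/blog/hiring-trends">a blog post</a>. '
    '<a href="https://unsubscribe.example.com/remove">Unsubscribe</a>'
)
links = extract_links(_demo_body)
print(links[0]["url"], links[0]["score"])
# → https://boards.greenhouse.io/acme/jobs/123 2
```

Scoring rather than hard-filtering is deliberate: uncertain URLs (score 1) stay visible in the UI with job-likely links sorted first, so the user makes the final call instead of the regex.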
## Task 5: Pinia store — `web/src/stores/digest.ts`

**Files:**

- Create: `web/src/stores/digest.ts`

- [ ] **Step 1: Create the store**

```typescript
import { defineStore } from 'pinia'
import { ref } from 'vue'
import { useApiFetch } from '@/composables/useApi'

export interface DigestEntry {
  id: number
  job_contact_id: number
  created_at: string
  subject: string
  from_addr: string | null
  received_at: string
  body: string | null
}

export interface DigestLink {
  url: string
  score: number // 2 = job-likely, 1 = other
  hint: string
}

export const useDigestStore = defineStore('digest', () => {
  const entries = ref