feat(peregrine): wire cloud_session into pages for multi-tenant db path routing

resolve_session() is a no-op in local mode — no behavior change for existing users.
In cloud mode, it injects a user-scoped db_path into st.session_state at page load.
This commit is contained in:
pyr0ball 2026-03-09 20:22:17 -07:00
parent 634e31968f
commit 5a1fceda84
5 changed files with 58 additions and 50 deletions

View file

@ -18,12 +18,14 @@ _USER_YAML = Path(__file__).parent.parent / "config" / "user.yaml"
_profile = UserProfile(_USER_YAML) if UserProfile.exists(_USER_YAML) else None
_name = _profile.name if _profile else "Job Seeker"
from scripts.db import DEFAULT_DB, init_db, get_job_counts, purge_jobs, purge_email_data, \
from scripts.db import init_db, get_job_counts, purge_jobs, purge_email_data, \
purge_non_remote, archive_jobs, kill_stuck_tasks, get_task_for_job, get_active_tasks, \
insert_job, get_existing_urls
from scripts.task_runner import submit_task
from app.cloud_session import resolve_session, get_db_path
init_db(DEFAULT_DB)
resolve_session("peregrine")
init_db(get_db_path())
def _email_configured() -> bool:
_e = Path(__file__).parent.parent / "config" / "email.yaml"
@ -136,7 +138,7 @@ st.divider()
@st.fragment(run_every=10)
def _live_counts():
counts = get_job_counts(DEFAULT_DB)
counts = get_job_counts(get_db_path())
col1, col2, col3, col4, col5 = st.columns(5)
col1.metric("Pending Review", counts.get("pending", 0))
col2.metric("Approved", counts.get("approved", 0))
@ -155,18 +157,18 @@ with left:
st.subheader("Find New Jobs")
st.caption("Scrapes all configured boards and adds new listings to your review queue.")
_disc_task = get_task_for_job(DEFAULT_DB, "discovery", 0)
_disc_task = get_task_for_job(get_db_path(), "discovery", 0)
_disc_running = _disc_task and _disc_task["status"] in ("queued", "running")
if st.button("🚀 Run Discovery", use_container_width=True, type="primary",
disabled=bool(_disc_running)):
submit_task(DEFAULT_DB, "discovery", 0)
submit_task(get_db_path(), "discovery", 0)
st.rerun()
if _disc_running:
@st.fragment(run_every=4)
def _disc_status():
t = get_task_for_job(DEFAULT_DB, "discovery", 0)
t = get_task_for_job(get_db_path(), "discovery", 0)
if t and t["status"] in ("queued", "running"):
lbl = "Queued…" if t["status"] == "queued" else "Scraping job boards… this may take a minute"
st.info(f"{lbl}")
@ -184,18 +186,18 @@ with enrich_col:
st.subheader("Enrich Descriptions")
st.caption("Re-fetch missing descriptions for any listing (LinkedIn, Indeed, Glassdoor, Adzuna, The Ladders, generic).")
_enrich_task = get_task_for_job(DEFAULT_DB, "enrich_descriptions", 0)
_enrich_task = get_task_for_job(get_db_path(), "enrich_descriptions", 0)
_enrich_running = _enrich_task and _enrich_task["status"] in ("queued", "running")
if st.button("🔍 Fill Missing Descriptions", use_container_width=True, type="primary",
disabled=bool(_enrich_running)):
submit_task(DEFAULT_DB, "enrich_descriptions", 0)
submit_task(get_db_path(), "enrich_descriptions", 0)
st.rerun()
if _enrich_running:
@st.fragment(run_every=4)
def _enrich_status():
t = get_task_for_job(DEFAULT_DB, "enrich_descriptions", 0)
t = get_task_for_job(get_db_path(), "enrich_descriptions", 0)
if t and t["status"] in ("queued", "running"):
st.info("⏳ Fetching descriptions…")
else:
@ -210,7 +212,7 @@ with enrich_col:
with mid:
unscored = sum(1 for j in __import__("scripts.db", fromlist=["get_jobs_by_status"])
.get_jobs_by_status(DEFAULT_DB, "pending")
.get_jobs_by_status(get_db_path(), "pending")
if j.get("match_score") is None and j.get("description"))
st.subheader("Score Listings")
st.caption(f"Run TF-IDF match scoring against {_name}'s resume. {unscored} pending job{'s' if unscored != 1 else ''} unscored.")
@ -231,7 +233,7 @@ with mid:
st.rerun()
with right:
approved_count = get_job_counts(DEFAULT_DB).get("approved", 0)
approved_count = get_job_counts(get_db_path()).get("approved", 0)
st.subheader("Send to Notion")
st.caption("Push all approved jobs to your Notion tracking database.")
if approved_count == 0:
@ -243,7 +245,7 @@ with right:
):
with st.spinner("Syncing to Notion…"):
from scripts.sync import sync_to_notion
count = sync_to_notion(DEFAULT_DB)
count = sync_to_notion(get_db_path())
st.success(f"Synced {count} job{'s' if count != 1 else ''} to Notion!")
st.rerun()
@ -258,18 +260,18 @@ with email_left:
"New recruiter outreach is added to your Job Review queue.")
with email_right:
_email_task = get_task_for_job(DEFAULT_DB, "email_sync", 0)
_email_task = get_task_for_job(get_db_path(), "email_sync", 0)
_email_running = _email_task and _email_task["status"] in ("queued", "running")
if st.button("📧 Sync Emails", use_container_width=True, type="primary",
disabled=bool(_email_running)):
submit_task(DEFAULT_DB, "email_sync", 0)
submit_task(get_db_path(), "email_sync", 0)
st.rerun()
if _email_running:
@st.fragment(run_every=4)
def _email_status():
t = get_task_for_job(DEFAULT_DB, "email_sync", 0)
t = get_task_for_job(get_db_path(), "email_sync", 0)
if t and t["status"] in ("queued", "running"):
st.info("⏳ Syncing emails…")
else:
@ -304,7 +306,7 @@ with url_tab:
disabled=not (url_text or "").strip()):
_urls = [u.strip() for u in url_text.strip().splitlines() if u.strip().startswith("http")]
if _urls:
_n = _queue_url_imports(DEFAULT_DB, _urls)
_n = _queue_url_imports(get_db_path(), _urls)
if _n:
st.success(f"Queued {_n} job{'s' if _n != 1 else ''} for import. Check Job Review shortly.")
else:
@ -327,7 +329,7 @@ with csv_tab:
if _csv_urls:
st.caption(f"Found {len(_csv_urls)} URL(s) in CSV.")
if st.button("📥 Import CSV Jobs", key="add_csv_btn", use_container_width=True):
_n = _queue_url_imports(DEFAULT_DB, _csv_urls)
_n = _queue_url_imports(get_db_path(),_csv_urls)
st.success(f"Queued {_n} job{'s' if _n != 1 else ''} for import.")
st.rerun()
else:
@ -337,7 +339,7 @@ with csv_tab:
@st.fragment(run_every=3)
def _scrape_status():
import sqlite3 as _sq
conn = _sq.connect(DEFAULT_DB)
conn = _sq.connect(get_db_path())
conn.row_factory = _sq.Row
rows = conn.execute(
"""SELECT bt.status, bt.error, j.title, j.company, j.url
@ -384,7 +386,7 @@ with st.expander("⚠️ Danger Zone", expanded=False):
st.warning("Are you sure? This cannot be undone.")
c1, c2 = st.columns(2)
if c1.button("Yes, purge", type="primary", use_container_width=True):
deleted = purge_jobs(DEFAULT_DB, statuses=["pending", "rejected"])
deleted = purge_jobs(get_db_path(), statuses=["pending", "rejected"])
st.success(f"Purged {deleted} jobs.")
st.session_state.pop("confirm_purge", None)
st.rerun()
@ -402,7 +404,7 @@ with st.expander("⚠️ Danger Zone", expanded=False):
st.warning("This deletes all email contacts and email-sourced jobs. Cannot be undone.")
c1, c2 = st.columns(2)
if c1.button("Yes, purge emails", type="primary", use_container_width=True):
contacts, jobs = purge_email_data(DEFAULT_DB)
contacts, jobs = purge_email_data(get_db_path())
st.success(f"Purged {contacts} email contacts, {jobs} email jobs.")
st.session_state.pop("confirm_purge", None)
st.rerun()
@ -411,11 +413,11 @@ with st.expander("⚠️ Danger Zone", expanded=False):
st.rerun()
with tasks_col:
_active = get_active_tasks(DEFAULT_DB)
_active = get_active_tasks(get_db_path())
st.markdown("**Kill stuck tasks**")
st.caption(f"Force-fail all queued/running background tasks. Currently **{len(_active)}** active.")
if st.button("⏹ Kill All Tasks", use_container_width=True, disabled=len(_active) == 0):
killed = kill_stuck_tasks(DEFAULT_DB)
killed = kill_stuck_tasks(get_db_path())
st.success(f"Killed {killed} task(s).")
st.rerun()
@ -429,8 +431,8 @@ with st.expander("⚠️ Danger Zone", expanded=False):
st.warning("This will delete ALL pending, approved, and rejected jobs, then re-scrape. Applied and synced records are kept.")
c1, c2 = st.columns(2)
if c1.button("Yes, wipe + scrape", type="primary", use_container_width=True):
purge_jobs(DEFAULT_DB, statuses=["pending", "approved", "rejected"])
submit_task(DEFAULT_DB, "discovery", 0)
purge_jobs(get_db_path(), statuses=["pending", "approved", "rejected"])
submit_task(get_db_path(), "discovery", 0)
st.session_state.pop("confirm_purge", None)
st.rerun()
if c2.button("Cancel ", use_container_width=True):
@ -451,7 +453,7 @@ with st.expander("⚠️ Danger Zone", expanded=False):
st.warning("Deletes all pending jobs. Rejected jobs are kept. Cannot be undone.")
c1, c2 = st.columns(2)
if c1.button("Yes, purge pending", type="primary", use_container_width=True):
deleted = purge_jobs(DEFAULT_DB, statuses=["pending"])
deleted = purge_jobs(get_db_path(), statuses=["pending"])
st.success(f"Purged {deleted} pending jobs.")
st.session_state.pop("confirm_purge", None)
st.rerun()
@ -469,7 +471,7 @@ with st.expander("⚠️ Danger Zone", expanded=False):
st.warning("Deletes all non-remote jobs not yet applied to. Cannot be undone.")
c1, c2 = st.columns(2)
if c1.button("Yes, purge on-site", type="primary", use_container_width=True):
deleted = purge_non_remote(DEFAULT_DB)
deleted = purge_non_remote(get_db_path())
st.success(f"Purged {deleted} non-remote jobs.")
st.session_state.pop("confirm_purge", None)
st.rerun()
@ -487,7 +489,7 @@ with st.expander("⚠️ Danger Zone", expanded=False):
st.warning("Deletes all approved-but-not-applied jobs. Cannot be undone.")
c1, c2 = st.columns(2)
if c1.button("Yes, purge approved", type="primary", use_container_width=True):
deleted = purge_jobs(DEFAULT_DB, statuses=["approved"])
deleted = purge_jobs(get_db_path(), statuses=["approved"])
st.success(f"Purged {deleted} approved jobs.")
st.session_state.pop("confirm_purge", None)
st.rerun()
@ -512,7 +514,7 @@ with st.expander("⚠️ Danger Zone", expanded=False):
st.info("Jobs will be archived (not deleted) — URLs are kept for dedup.")
c1, c2 = st.columns(2)
if c1.button("Yes, archive", type="primary", use_container_width=True):
archived = archive_jobs(DEFAULT_DB, statuses=["pending", "rejected"])
archived = archive_jobs(get_db_path(), statuses=["pending", "rejected"])
st.success(f"Archived {archived} jobs.")
st.session_state.pop("confirm_purge", None)
st.rerun()
@ -530,7 +532,7 @@ with st.expander("⚠️ Danger Zone", expanded=False):
st.info("Approved jobs will be archived (not deleted).")
c1, c2 = st.columns(2)
if c1.button("Yes, archive approved", type="primary", use_container_width=True):
archived = archive_jobs(DEFAULT_DB, statuses=["approved"])
archived = archive_jobs(get_db_path(), statuses=["approved"])
st.success(f"Archived {archived} approved jobs.")
st.session_state.pop("confirm_purge", None)
st.rerun()

View file

@ -22,6 +22,7 @@ IS_DEMO = os.environ.get("DEMO_MODE", "").lower() in ("1", "true", "yes")
import streamlit as st
from scripts.db import DEFAULT_DB, init_db, get_active_tasks
from app.feedback import inject_feedback_button
from app.cloud_session import resolve_session, get_db_path
import sqlite3
st.set_page_config(
@ -30,7 +31,8 @@ st.set_page_config(
layout="wide",
)
init_db(DEFAULT_DB)
resolve_session("peregrine")
init_db(get_db_path())
# ── Startup cleanup — runs once per server process via cache_resource ──────────
@st.cache_resource
@ -40,7 +42,7 @@ def _startup() -> None:
2. Auto-queues re-runs for any research generated without SearXNG data,
if SearXNG is now reachable.
"""
conn = sqlite3.connect(DEFAULT_DB)
conn = sqlite3.connect(get_db_path())
conn.execute(
"UPDATE background_tasks SET status='failed', error='Interrupted by server restart',"
" finished_at=datetime('now') WHERE status IN ('queued','running')"
@ -61,7 +63,7 @@ def _startup() -> None:
_ACTIVE_STAGES,
).fetchall()
for (job_id,) in rows:
submit_task(str(DEFAULT_DB), "company_research", job_id)
submit_task(str(get_db_path()), "company_research", job_id)
except Exception:
pass # never block startup
@ -113,7 +115,7 @@ pg = st.navigation(pages)
# The sidebar context WRAPS the fragment call — do not write to st.sidebar inside it.
@st.fragment(run_every=3)
def _task_indicator():
tasks = get_active_tasks(DEFAULT_DB)
tasks = get_active_tasks(get_db_path())
if not tasks:
return
st.divider()

View file

@ -15,6 +15,9 @@ sys.path.insert(0, str(Path(__file__).parent.parent.parent))
import streamlit as st
import yaml
from app.cloud_session import resolve_session, get_db_path
resolve_session("peregrine")
_ROOT = Path(__file__).parent.parent.parent
CONFIG_DIR = _ROOT / "config"
USER_YAML = CONFIG_DIR / "user.yaml"
@ -74,18 +77,16 @@ def _suggest_profile(gpus: list[str]) -> str:
def _submit_wizard_task(section: str, input_data: dict) -> int:
"""Submit a wizard_generate background task. Returns task_id."""
from scripts.db import DEFAULT_DB
from scripts.task_runner import submit_task
params = json.dumps({"section": section, "input": input_data})
task_id, _ = submit_task(DEFAULT_DB, "wizard_generate", 0, params=params)
task_id, _ = submit_task(get_db_path(), "wizard_generate", 0, params=params)
return task_id
def _poll_wizard_task(section: str) -> dict | None:
"""Return the most recent wizard_generate task row for a given section, or None."""
import sqlite3
from scripts.db import DEFAULT_DB
conn = sqlite3.connect(DEFAULT_DB)
conn = sqlite3.connect(get_db_path())
conn.row_factory = sqlite3.Row
row = conn.execute(
"SELECT * FROM background_tasks "

View file

@ -12,11 +12,13 @@ import yaml
import os as _os
from scripts.user_profile import UserProfile
from app.cloud_session import resolve_session, get_db_path
_USER_YAML = Path(__file__).parent.parent.parent / "config" / "user.yaml"
_profile = UserProfile(_USER_YAML) if UserProfile.exists(_USER_YAML) else None
_name = _profile.name if _profile else "Job Seeker"
resolve_session("peregrine")
st.title("⚙️ Settings")
CONFIG_DIR = Path(__file__).parent.parent.parent / "config"
@ -1371,12 +1373,11 @@ with tab_finetune:
st.markdown("**Step 2: Extract Training Pairs**")
import json as _json
import sqlite3 as _sqlite3
from scripts.db import DEFAULT_DB as _FT_DB
jsonl_path = _profile.docs_dir / "training_data" / "cover_letters.jsonl"
# Show task status
_ft_conn = _sqlite3.connect(_FT_DB)
_ft_conn = _sqlite3.connect(get_db_path())
_ft_conn.row_factory = _sqlite3.Row
_ft_task = _ft_conn.execute(
"SELECT * FROM background_tasks WHERE task_type='prepare_training' ORDER BY id DESC LIMIT 1"

View file

@ -26,13 +26,15 @@ from scripts.db import (
get_task_for_job,
)
from scripts.task_runner import submit_task
from app.cloud_session import resolve_session, get_db_path
DOCS_DIR = _profile.docs_dir if _profile else Path.home() / "Documents" / "JobSearch"
RESUME_YAML = Path(__file__).parent.parent.parent / "config" / "plain_text_resume.yaml"
st.title("🚀 Apply Workspace")
init_db(DEFAULT_DB)
resolve_session("peregrine")
init_db(get_db_path())
# ── PDF generation ─────────────────────────────────────────────────────────────
def _make_cover_letter_pdf(job: dict, cover_letter: str, output_dir: Path) -> Path:
@ -156,7 +158,7 @@ def _copy_btn(text: str, label: str = "📋 Copy", done: str = "✅ Copied!", he
)
# ── Job selection ──────────────────────────────────────────────────────────────
approved = get_jobs_by_status(DEFAULT_DB, "approved")
approved = get_jobs_by_status(get_db_path(), "approved")
if not approved:
st.info("No approved jobs — head to Job Review to approve some listings first.")
st.stop()
@ -219,17 +221,17 @@ with col_tools:
if _cl_key not in st.session_state:
st.session_state[_cl_key] = job.get("cover_letter") or ""
_cl_task = get_task_for_job(DEFAULT_DB, "cover_letter", selected_id)
_cl_task = get_task_for_job(get_db_path(), "cover_letter", selected_id)
_cl_running = _cl_task and _cl_task["status"] in ("queued", "running")
if st.button("✨ Generate / Regenerate", use_container_width=True, disabled=bool(_cl_running)):
submit_task(DEFAULT_DB, "cover_letter", selected_id)
submit_task(get_db_path(), "cover_letter", selected_id)
st.rerun()
if _cl_running:
@st.fragment(run_every=3)
def _cl_status_fragment():
t = get_task_for_job(DEFAULT_DB, "cover_letter", selected_id)
t = get_task_for_job(get_db_path(), "cover_letter", selected_id)
if t and t["status"] in ("queued", "running"):
lbl = "Queued…" if t["status"] == "queued" else "Generating via LLM…"
st.info(f"{lbl}")
@ -272,7 +274,7 @@ with col_tools:
key=f"cl_refine_{selected_id}"):
import json as _json
submit_task(
DEFAULT_DB, "cover_letter", selected_id,
get_db_path(), "cover_letter", selected_id,
params=_json.dumps({
"previous_result": cl_text,
"feedback": feedback_text.strip(),
@ -288,7 +290,7 @@ with col_tools:
_copy_btn(cl_text, label="📋 Copy Letter")
with c2:
if st.button("💾 Save draft", use_container_width=True):
update_cover_letter(DEFAULT_DB, selected_id, cl_text)
update_cover_letter(get_db_path(), selected_id, cl_text)
st.success("Saved!")
# PDF generation
@ -297,7 +299,7 @@ with col_tools:
with st.spinner("Generating PDF…"):
try:
pdf_path = _make_cover_letter_pdf(job, cl_text, DOCS_DIR)
update_cover_letter(DEFAULT_DB, selected_id, cl_text)
update_cover_letter(get_db_path(), selected_id, cl_text)
st.success(f"Saved: `{pdf_path.name}`")
except Exception as e:
st.error(f"PDF error: {e}")
@ -312,13 +314,13 @@ with col_tools:
with c4:
if st.button("✅ Mark as Applied", use_container_width=True, type="primary"):
if cl_text:
update_cover_letter(DEFAULT_DB, selected_id, cl_text)
mark_applied(DEFAULT_DB, [selected_id])
update_cover_letter(get_db_path(), selected_id, cl_text)
mark_applied(get_db_path(), [selected_id])
st.success("Marked as applied!")
st.rerun()
if st.button("🚫 Reject listing", use_container_width=True):
update_job_status(DEFAULT_DB, [selected_id], "rejected")
update_job_status(get_db_path(), [selected_id], "rejected")
# Advance selectbox to next job so list doesn't snap to first item
current_idx = ids.index(selected_id) if selected_id in ids else 0
if current_idx + 1 < len(ids):