Compare commits

...

10 commits

Author SHA1 Message Date
95c5a12196 feat(cloud): add Heimdall tier resolution to cloud_session
Calls /admin/cloud/resolve after JWT validation to inject the user's
current subscription tier (free/paid/premium/ultra) into session_state
as cloud_tier. Cached 5 minutes via st.cache_data to avoid Heimdall
spam on every Streamlit rerun. Degrades gracefully to free on timeout
or missing token.

New env vars: HEIMDALL_URL, HEIMDALL_ADMIN_TOKEN (added to .env.example
and compose.cloud.yml). HEIMDALL_URL defaults to http://cf-license:8000
for internal Docker network access.

New helper: get_cloud_tier() — returns tier string in cloud mode, "local"
in local-first mode, so pages can distinguish self-hosted from cloud.
2026-03-10 12:31:14 -07:00
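The get_cloud_tier() helper described above lends itself to a simple gating pattern in pages. A hypothetical sketch (TIER_ORDER and tier_allows are illustrative names, not part of the repo; only the tier strings and get_cloud_tier() come from the commit message):

```python
# Hypothetical tier-gating helper built on get_cloud_tier().
# Tier names (free/paid/premium/ultra, plus "local" for self-hosted)
# come from the commit above; the ranking logic is illustrative.

TIER_ORDER = ["free", "paid", "premium", "ultra"]

def tier_allows(current: str, required: str) -> bool:
    """True if `current` meets or exceeds `required`.

    "local" (self-hosted) is treated as unrestricted, matching the
    local-first design where no cloud gating applies.
    """
    if current == "local":
        return True
    try:
        return TIER_ORDER.index(current) >= TIER_ORDER.index(required)
    except ValueError:
        return False  # unknown tier string: fail closed

# Possible usage in a page (assuming app.cloud_session is importable):
#   from app.cloud_session import get_cloud_tier
#   if not tier_allows(get_cloud_tier(), "premium"):
#       st.info("Upgrade to unlock this feature.")
#       st.stop()
```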
cf16af05d2 fix(cloud): extract cf_session cookie by name from X-CF-Session header 2026-03-10 09:22:08 -07:00
ba295cb010 docs: add cloud architecture + cloud-deployment.md
architecture.md: updated Docker Compose table (3 compose files), database
layer (Postgres platform + SQLite-per-user), cloud session middleware,
telemetry system, and cloud design decisions.

cloud-deployment.md (new): full operational runbook — env vars, data root
layout, GDPR deletion, platform DB queries, telemetry, backup/restore,
Caddy routing, demo instance, and onboarding a new app to the cloud.
2026-03-09 23:02:29 -07:00
a893ba6527 feat(cloud): fix backup/restore for cloud mode — SQLCipher encrypt/decrypt
T13: Three fixes:
1. backup.py: _decrypt_db_to_bytes() decrypts SQLCipher DB before archiving
   so the zip is portable to any local Docker install (plain SQLite).
2. backup.py: _encrypt_db_from_bytes() re-encrypts on restore in cloud mode
   so the app can open the restored DB normally.
3. 2_Settings.py: _base_dir uses get_db_path().parent in cloud mode (user's
   per-tenant data dir) instead of the hardcoded app root; db_key wired
   through both create_backup() and restore_backup() calls.

6 new cloud backup tests + 2 unit tests for SQLCipher helpers (pysqlcipher3
mocked — not available in the local conda test env). 419/419 total passing.
2026-03-09 22:41:44 -07:00
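The decrypt-before-archive flow in fix 1 above can be sketched as a copy-then-read step. The real _decrypt_db_to_bytes() uses pysqlcipher3 with a per-user key; plain sqlite3's backup API stands in here so the sketch runs anywhere (the function name and flow are illustrative, not the repo's code):

```python
# Illustrative sketch of the decrypt-before-archive idea: copy the live DB
# into a plain SQLite file and hand its bytes to the zip writer. With
# SQLCipher, the source connection would be opened with PRAGMA key and
# exported via sqlcipher_export(); plain sqlite3 shows the same shape.
import sqlite3
import tempfile
from pathlib import Path

def db_to_plain_bytes(db_path: Path) -> bytes:
    """Copy a DB into a fresh plain-SQLite file and return its bytes."""
    src = sqlite3.connect(db_path)
    with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as tmp:
        tmp_path = Path(tmp.name)
    dst = sqlite3.connect(tmp_path)
    src.backup(dst)              # full logical copy, safe while src is open
    dst.close()
    src.close()
    data = tmp_path.read_bytes() # portable plain-SQLite bytes for the zip
    tmp_path.unlink()
    return data
```

Restore would reverse the flow: write the plain bytes to a temp file, then copy into a connection opened with the per-user key when CLOUD_MODE is on.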
f230588291 feat(cloud): Privacy & Telemetry tab in Settings + update_consent()
T11: Add CLOUD_MODE-gated Privacy tab to Settings with full telemetry
consent UI — hard kill switch, anonymous usage toggle, de-identified
content sharing toggle, and time-limited support access grant. All changes
persist to telemetry_consent table via new update_consent() in telemetry.py.

Tab and all DB calls are completely no-op in local mode (CLOUD_MODE=false).
2026-03-09 22:14:22 -07:00
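The update_consent() call described above amounts to a partial upsert of a per-user consent row. A runnable sketch, assuming the column names shown later in the Privacy tab code; the real implementation targets the Postgres telemetry_consent table, with sqlite3 standing in here:

```python
# Sketch of the update_consent() shape: upsert the user's consent row,
# changing only the flags that were passed. Column names match the
# Privacy tab's consent dict; the storage backend is a stand-in.
import sqlite3

_COLS = ("all_disabled", "usage_events_enabled",
         "content_sharing_enabled", "support_access_enabled")

def update_consent(conn: sqlite3.Connection, user_id: str, **changes) -> None:
    if not changes:
        return
    bad = set(changes) - set(_COLS)
    if bad:
        raise ValueError(f"unknown consent fields: {bad}")
    conn.execute(
        "INSERT OR IGNORE INTO telemetry_consent (user_id) VALUES (?)",
        (user_id,),
    )
    # Column names are whitelisted above, so building SET clauses is safe.
    sets = ", ".join(f"{c} = ?" for c in changes)
    conn.execute(
        f"UPDATE telemetry_consent SET {sets} WHERE user_id = ?",
        (*[int(v) for v in changes.values()], user_id),
    )
    conn.commit()
```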
3b9bd5f551 feat(cloud): add compose.cloud.yml and telemetry consent middleware
T8: compose.cloud.yml — multi-tenant cloud stack on port 8505, CLOUD_MODE=true,
per-user encrypted data at /devl/menagerie-data, joins caddy-proxy_caddy-internal
network; .env.example extended with five cloud-only env vars.

T10: app/telemetry.py — log_usage_event() is the ONLY entry point to usage_events
table; hard kill switch (all_disabled) checked before any DB write; complete no-op
in local mode; swallows all exceptions so telemetry never crashes the app;
psycopg2-binary added to requirements.txt. Event calls wired into 4_Apply.py at
cover_letter_generated and job_applied. 5 tests, 413/413 total passing.
2026-03-09 22:10:18 -07:00
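The invariants listed for log_usage_event() (no-op in local mode, kill switch checked before any write, exceptions swallowed) can be sketched as a guard chain. The get_consent/write_event callables are injected stand-ins for illustration; the real module talks to the platform Postgres DB:

```python
# Sketch of the log_usage_event() contract: local mode records nothing,
# the hard kill switch wins over everything, and no exception may
# escape into the app.
import os

CLOUD_MODE = os.environ.get("CLOUD_MODE", "").lower() in ("1", "true", "yes")

def log_usage_event(user_id, app, event, *,
                    get_consent, write_event, cloud_mode=CLOUD_MODE):
    if not cloud_mode:
        return                      # local-first: never record anything
    try:
        consent = get_consent(user_id)
        if consent.get("all_disabled") or not consent.get("usage_events_enabled", True):
            return                  # kill switch / opt-out: skip the write
        write_event(user_id, app, event)
    except Exception:
        pass                        # telemetry must never crash the app
```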
357891d335 feat(peregrine): wire cloud_session into pages for multi-tenant db path routing
resolve_session() is a no-op in local mode — no behavior change for existing users.
In cloud mode, injects user-scoped db_path into st.session_state at page load.
2026-03-09 20:22:17 -07:00
6d8f4385d9 feat(peregrine): add cloud_session middleware + SQLCipher get_connection()
cloud_session.py: no-op in local mode; in cloud mode resolves Directus JWT
from X-CF-Session header to per-user db_path in st.session_state.

get_connection() in scripts/db.py: transparent SQLCipher/sqlite3 switch —
uses encrypted driver when CLOUD_MODE=true and key provided, vanilla sqlite3
otherwise. libsqlcipher-dev added to Dockerfile for Docker builds.

6 new cloud_session tests + 1 new get_connection test — 34/34 db tests pass.
2026-03-09 19:43:42 -07:00
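The transparent SQLCipher/sqlite3 switch described above can be sketched as follows. This is an assumption-laden outline, not the repo's scripts/db.py: the PRAGMA key quoting and parameter names are illustrative, and the lazy import keeps local installs free of the pysqlcipher3/libsqlcipher dependency:

```python
# Sketch of a get_connection() that picks the encrypted driver only when
# cloud mode is on AND a key was provided; everything else uses vanilla
# sqlite3, so local-first behavior is unchanged.
import os
import sqlite3

def get_connection(db_path, db_key: str = ""):
    cloud = os.environ.get("CLOUD_MODE", "").lower() in ("1", "true", "yes")
    if cloud and db_key:
        # Lazy import: only cloud deployments need pysqlcipher3.
        from pysqlcipher3 import dbapi2 as sqlcipher
        conn = sqlcipher.connect(str(db_path))
        # PRAGMA does not take bound parameters; db_key is a 64-char hex
        # string (see derive_db_key), so the raw-key syntax is used here.
        conn.execute(f"PRAGMA key = \"x'{db_key}'\"")
        return conn
    return sqlite3.connect(str(db_path))
```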
9efb6e0432 fix(peregrine): correct port comment in compose.demo.yml, update CLAUDE.md 2026-03-09 15:22:10 -07:00
a3433ab732 chore(peregrine): rename compose.menagerie.yml to compose.demo.yml
Public demo instances moving to demo.circuitforge.tech;
menagerie.circuitforge.tech reserved for cloud-hosted managed instances.
2026-03-09 14:55:38 -07:00
21 changed files with 1377 additions and 139 deletions


@ -27,3 +27,12 @@ FORGEJO_REPO=pyr0ball/peregrine
FORGEJO_API_URL=https://git.opensourcesolarpunk.com/api/v1
# GITHUB_TOKEN= # future — enable when public mirror is active
# GITHUB_REPO= # future
# Cloud multi-tenancy (compose.cloud.yml only — do not set for local installs)
CLOUD_MODE=false
CLOUD_DATA_ROOT=/devl/menagerie-data
DIRECTUS_JWT_SECRET= # must match website/.env DIRECTUS_SECRET value
CF_SERVER_SECRET= # random 64-char hex — generate: openssl rand -hex 32
PLATFORM_DB_URL=postgresql://cf_platform:<password>@host.docker.internal:5433/circuitforge_platform
HEIMDALL_URL=http://cf-license:8000 # internal Docker URL; override for external access
HEIMDALL_ADMIN_TOKEN= # must match ADMIN_TOKEN in circuitforge-license .env


@ -4,8 +4,9 @@ FROM python:3.11-slim
WORKDIR /app
# System deps for companyScraper (beautifulsoup4, fake-useragent, lxml) and PDF gen
# libsqlcipher-dev: required to build pysqlcipher3 (SQLCipher AES-256 encryption for cloud mode)
RUN apt-get update && apt-get install -y --no-install-recommends \
gcc libffi-dev curl \
gcc libffi-dev curl libsqlcipher-dev \
&& rm -rf /var/lib/apt/lists/*
COPY requirements.txt .


@ -18,12 +18,14 @@ _USER_YAML = Path(__file__).parent.parent / "config" / "user.yaml"
_profile = UserProfile(_USER_YAML) if UserProfile.exists(_USER_YAML) else None
_name = _profile.name if _profile else "Job Seeker"
from scripts.db import DEFAULT_DB, init_db, get_job_counts, purge_jobs, purge_email_data, \
from scripts.db import init_db, get_job_counts, purge_jobs, purge_email_data, \
purge_non_remote, archive_jobs, kill_stuck_tasks, get_task_for_job, get_active_tasks, \
insert_job, get_existing_urls
from scripts.task_runner import submit_task
from app.cloud_session import resolve_session, get_db_path
init_db(DEFAULT_DB)
resolve_session("peregrine")
init_db(get_db_path())
def _email_configured() -> bool:
_e = Path(__file__).parent.parent / "config" / "email.yaml"
@ -136,7 +138,7 @@ st.divider()
@st.fragment(run_every=10)
def _live_counts():
counts = get_job_counts(DEFAULT_DB)
counts = get_job_counts(get_db_path())
col1, col2, col3, col4, col5 = st.columns(5)
col1.metric("Pending Review", counts.get("pending", 0))
col2.metric("Approved", counts.get("approved", 0))
@ -155,18 +157,18 @@ with left:
st.subheader("Find New Jobs")
st.caption("Scrapes all configured boards and adds new listings to your review queue.")
_disc_task = get_task_for_job(DEFAULT_DB, "discovery", 0)
_disc_task = get_task_for_job(get_db_path(), "discovery", 0)
_disc_running = _disc_task and _disc_task["status"] in ("queued", "running")
if st.button("🚀 Run Discovery", use_container_width=True, type="primary",
disabled=bool(_disc_running)):
submit_task(DEFAULT_DB, "discovery", 0)
submit_task(get_db_path(), "discovery", 0)
st.rerun()
if _disc_running:
@st.fragment(run_every=4)
def _disc_status():
t = get_task_for_job(DEFAULT_DB, "discovery", 0)
t = get_task_for_job(get_db_path(), "discovery", 0)
if t and t["status"] in ("queued", "running"):
lbl = "Queued…" if t["status"] == "queued" else "Scraping job boards… this may take a minute"
st.info(f"{lbl}")
@ -184,18 +186,18 @@ with enrich_col:
st.subheader("Enrich Descriptions")
st.caption("Re-fetch missing descriptions for any listing (LinkedIn, Indeed, Glassdoor, Adzuna, The Ladders, generic).")
_enrich_task = get_task_for_job(DEFAULT_DB, "enrich_descriptions", 0)
_enrich_task = get_task_for_job(get_db_path(), "enrich_descriptions", 0)
_enrich_running = _enrich_task and _enrich_task["status"] in ("queued", "running")
if st.button("🔍 Fill Missing Descriptions", use_container_width=True, type="primary",
disabled=bool(_enrich_running)):
submit_task(DEFAULT_DB, "enrich_descriptions", 0)
submit_task(get_db_path(), "enrich_descriptions", 0)
st.rerun()
if _enrich_running:
@st.fragment(run_every=4)
def _enrich_status():
t = get_task_for_job(DEFAULT_DB, "enrich_descriptions", 0)
t = get_task_for_job(get_db_path(), "enrich_descriptions", 0)
if t and t["status"] in ("queued", "running"):
st.info("⏳ Fetching descriptions…")
else:
@ -210,7 +212,7 @@ with enrich_col:
with mid:
unscored = sum(1 for j in __import__("scripts.db", fromlist=["get_jobs_by_status"])
.get_jobs_by_status(DEFAULT_DB, "pending")
.get_jobs_by_status(get_db_path(), "pending")
if j.get("match_score") is None and j.get("description"))
st.subheader("Score Listings")
st.caption(f"Run TF-IDF match scoring against {_name}'s resume. {unscored} pending job{'s' if unscored != 1 else ''} unscored.")
@ -231,7 +233,7 @@ with mid:
st.rerun()
with right:
approved_count = get_job_counts(DEFAULT_DB).get("approved", 0)
approved_count = get_job_counts(get_db_path()).get("approved", 0)
st.subheader("Send to Notion")
st.caption("Push all approved jobs to your Notion tracking database.")
if approved_count == 0:
@ -243,7 +245,7 @@ with right:
):
with st.spinner("Syncing to Notion…"):
from scripts.sync import sync_to_notion
count = sync_to_notion(DEFAULT_DB)
count = sync_to_notion(get_db_path())
st.success(f"Synced {count} job{'s' if count != 1 else ''} to Notion!")
st.rerun()
@ -258,18 +260,18 @@ with email_left:
"New recruiter outreach is added to your Job Review queue.")
with email_right:
_email_task = get_task_for_job(DEFAULT_DB, "email_sync", 0)
_email_task = get_task_for_job(get_db_path(), "email_sync", 0)
_email_running = _email_task and _email_task["status"] in ("queued", "running")
if st.button("📧 Sync Emails", use_container_width=True, type="primary",
disabled=bool(_email_running)):
submit_task(DEFAULT_DB, "email_sync", 0)
submit_task(get_db_path(), "email_sync", 0)
st.rerun()
if _email_running:
@st.fragment(run_every=4)
def _email_status():
t = get_task_for_job(DEFAULT_DB, "email_sync", 0)
t = get_task_for_job(get_db_path(), "email_sync", 0)
if t and t["status"] in ("queued", "running"):
st.info("⏳ Syncing emails…")
else:
@ -304,7 +306,7 @@ with url_tab:
disabled=not (url_text or "").strip()):
_urls = [u.strip() for u in url_text.strip().splitlines() if u.strip().startswith("http")]
if _urls:
_n = _queue_url_imports(DEFAULT_DB, _urls)
_n = _queue_url_imports(get_db_path(), _urls)
if _n:
st.success(f"Queued {_n} job{'s' if _n != 1 else ''} for import. Check Job Review shortly.")
else:
@ -327,7 +329,7 @@ with csv_tab:
if _csv_urls:
st.caption(f"Found {len(_csv_urls)} URL(s) in CSV.")
if st.button("📥 Import CSV Jobs", key="add_csv_btn", use_container_width=True):
_n = _queue_url_imports(DEFAULT_DB, _csv_urls)
_n = _queue_url_imports(get_db_path(), _csv_urls)
st.success(f"Queued {_n} job{'s' if _n != 1 else ''} for import.")
st.rerun()
else:
@ -337,7 +339,7 @@ with csv_tab:
@st.fragment(run_every=3)
def _scrape_status():
import sqlite3 as _sq
conn = _sq.connect(DEFAULT_DB)
conn = _sq.connect(get_db_path())
conn.row_factory = _sq.Row
rows = conn.execute(
"""SELECT bt.status, bt.error, j.title, j.company, j.url
@ -384,7 +386,7 @@ with st.expander("⚠️ Danger Zone", expanded=False):
st.warning("Are you sure? This cannot be undone.")
c1, c2 = st.columns(2)
if c1.button("Yes, purge", type="primary", use_container_width=True):
deleted = purge_jobs(DEFAULT_DB, statuses=["pending", "rejected"])
deleted = purge_jobs(get_db_path(), statuses=["pending", "rejected"])
st.success(f"Purged {deleted} jobs.")
st.session_state.pop("confirm_purge", None)
st.rerun()
@ -402,7 +404,7 @@ with st.expander("⚠️ Danger Zone", expanded=False):
st.warning("This deletes all email contacts and email-sourced jobs. Cannot be undone.")
c1, c2 = st.columns(2)
if c1.button("Yes, purge emails", type="primary", use_container_width=True):
contacts, jobs = purge_email_data(DEFAULT_DB)
contacts, jobs = purge_email_data(get_db_path())
st.success(f"Purged {contacts} email contacts, {jobs} email jobs.")
st.session_state.pop("confirm_purge", None)
st.rerun()
@ -411,11 +413,11 @@ with st.expander("⚠️ Danger Zone", expanded=False):
st.rerun()
with tasks_col:
_active = get_active_tasks(DEFAULT_DB)
_active = get_active_tasks(get_db_path())
st.markdown("**Kill stuck tasks**")
st.caption(f"Force-fail all queued/running background tasks. Currently **{len(_active)}** active.")
if st.button("⏹ Kill All Tasks", use_container_width=True, disabled=len(_active) == 0):
killed = kill_stuck_tasks(DEFAULT_DB)
killed = kill_stuck_tasks(get_db_path())
st.success(f"Killed {killed} task(s).")
st.rerun()
@ -429,8 +431,8 @@ with st.expander("⚠️ Danger Zone", expanded=False):
st.warning("This will delete ALL pending, approved, and rejected jobs, then re-scrape. Applied and synced records are kept.")
c1, c2 = st.columns(2)
if c1.button("Yes, wipe + scrape", type="primary", use_container_width=True):
purge_jobs(DEFAULT_DB, statuses=["pending", "approved", "rejected"])
submit_task(DEFAULT_DB, "discovery", 0)
purge_jobs(get_db_path(), statuses=["pending", "approved", "rejected"])
submit_task(get_db_path(), "discovery", 0)
st.session_state.pop("confirm_purge", None)
st.rerun()
if c2.button("Cancel ", use_container_width=True):
@ -451,7 +453,7 @@ with st.expander("⚠️ Danger Zone", expanded=False):
st.warning("Deletes all pending jobs. Rejected jobs are kept. Cannot be undone.")
c1, c2 = st.columns(2)
if c1.button("Yes, purge pending", type="primary", use_container_width=True):
deleted = purge_jobs(DEFAULT_DB, statuses=["pending"])
deleted = purge_jobs(get_db_path(), statuses=["pending"])
st.success(f"Purged {deleted} pending jobs.")
st.session_state.pop("confirm_purge", None)
st.rerun()
@ -469,7 +471,7 @@ with st.expander("⚠️ Danger Zone", expanded=False):
st.warning("Deletes all non-remote jobs not yet applied to. Cannot be undone.")
c1, c2 = st.columns(2)
if c1.button("Yes, purge on-site", type="primary", use_container_width=True):
deleted = purge_non_remote(DEFAULT_DB)
deleted = purge_non_remote(get_db_path())
st.success(f"Purged {deleted} non-remote jobs.")
st.session_state.pop("confirm_purge", None)
st.rerun()
@ -487,7 +489,7 @@ with st.expander("⚠️ Danger Zone", expanded=False):
st.warning("Deletes all approved-but-not-applied jobs. Cannot be undone.")
c1, c2 = st.columns(2)
if c1.button("Yes, purge approved", type="primary", use_container_width=True):
deleted = purge_jobs(DEFAULT_DB, statuses=["approved"])
deleted = purge_jobs(get_db_path(), statuses=["approved"])
st.success(f"Purged {deleted} approved jobs.")
st.session_state.pop("confirm_purge", None)
st.rerun()
@ -512,7 +514,7 @@ with st.expander("⚠️ Danger Zone", expanded=False):
st.info("Jobs will be archived (not deleted) — URLs are kept for dedup.")
c1, c2 = st.columns(2)
if c1.button("Yes, archive", type="primary", use_container_width=True):
archived = archive_jobs(DEFAULT_DB, statuses=["pending", "rejected"])
archived = archive_jobs(get_db_path(), statuses=["pending", "rejected"])
st.success(f"Archived {archived} jobs.")
st.session_state.pop("confirm_purge", None)
st.rerun()
@ -530,7 +532,7 @@ with st.expander("⚠️ Danger Zone", expanded=False):
st.info("Approved jobs will be archived (not deleted).")
c1, c2 = st.columns(2)
if c1.button("Yes, archive approved", type="primary", use_container_width=True):
archived = archive_jobs(DEFAULT_DB, statuses=["approved"])
archived = archive_jobs(get_db_path(), statuses=["approved"])
st.success(f"Archived {archived} approved jobs.")
st.session_state.pop("confirm_purge", None)
st.rerun()


@ -22,6 +22,7 @@ IS_DEMO = os.environ.get("DEMO_MODE", "").lower() in ("1", "true", "yes")
import streamlit as st
from scripts.db import DEFAULT_DB, init_db, get_active_tasks
from app.feedback import inject_feedback_button
from app.cloud_session import resolve_session, get_db_path
import sqlite3
st.set_page_config(
@ -30,7 +31,8 @@ st.set_page_config(
layout="wide",
)
init_db(DEFAULT_DB)
resolve_session("peregrine")
init_db(get_db_path())
# ── Startup cleanup — runs once per server process via cache_resource ──────────
@st.cache_resource
@ -40,7 +42,7 @@ def _startup() -> None:
2. Auto-queues re-runs for any research generated without SearXNG data,
if SearXNG is now reachable.
"""
conn = sqlite3.connect(DEFAULT_DB)
conn = sqlite3.connect(get_db_path())
conn.execute(
"UPDATE background_tasks SET status='failed', error='Interrupted by server restart',"
" finished_at=datetime('now') WHERE status IN ('queued','running')"
@ -61,7 +63,7 @@ def _startup() -> None:
_ACTIVE_STAGES,
).fetchall()
for (job_id,) in rows:
submit_task(str(DEFAULT_DB), "company_research", job_id)
submit_task(str(get_db_path()), "company_research", job_id)
except Exception:
pass # never block startup
@ -113,7 +115,7 @@ pg = st.navigation(pages)
# The sidebar context WRAPS the fragment call — do not write to st.sidebar inside it.
@st.fragment(run_every=3)
def _task_indicator():
tasks = get_active_tasks(DEFAULT_DB)
tasks = get_active_tasks(get_db_path())
if not tasks:
return
st.divider()

app/cloud_session.py (new file, 152 lines)

@ -0,0 +1,152 @@
# peregrine/app/cloud_session.py
"""
Cloud session middleware for multi-tenant Peregrine deployment.
In local-first mode (CLOUD_MODE unset or false), all functions are no-ops.
In cloud mode (CLOUD_MODE=true), resolves the Directus session JWT from the
X-CF-Session header, validates it, and injects user_id + db_path into
st.session_state.
All Peregrine pages call get_db_path() instead of DEFAULT_DB directly to
transparently support both local and cloud deployments.
"""
import logging
import os
import re
import hmac
import hashlib
from pathlib import Path
import requests
import streamlit as st
from scripts.db import DEFAULT_DB
log = logging.getLogger(__name__)
CLOUD_MODE: bool = os.environ.get("CLOUD_MODE", "").lower() in ("1", "true", "yes")
CLOUD_DATA_ROOT: Path = Path(os.environ.get("CLOUD_DATA_ROOT", "/devl/menagerie-data"))
DIRECTUS_JWT_SECRET: str = os.environ.get("DIRECTUS_JWT_SECRET", "")
SERVER_SECRET: str = os.environ.get("CF_SERVER_SECRET", "")
# Heimdall license server — internal URL preferred when running on the same host
HEIMDALL_URL: str = os.environ.get("HEIMDALL_URL", "https://license.circuitforge.tech")
HEIMDALL_ADMIN_TOKEN: str = os.environ.get("HEIMDALL_ADMIN_TOKEN", "")
def _extract_session_token(cookie_header: str) -> str:
"""Extract cf_session value from a Cookie header string."""
m = re.search(r'(?:^|;)\s*cf_session=([^;]+)', cookie_header)
return m.group(1).strip() if m else ""
@st.cache_data(ttl=300, show_spinner=False)
def _fetch_cloud_tier(user_id: str, product: str) -> str:
"""Call Heimdall to resolve the current cloud tier for this user.
Cached per (user_id, product) for 5 minutes to avoid hammering Heimdall
on every Streamlit rerun. Returns "free" on any error so the app degrades
gracefully rather than blocking the user.
"""
if not HEIMDALL_ADMIN_TOKEN:
log.warning("HEIMDALL_ADMIN_TOKEN not set — defaulting tier to free")
return "free"
try:
resp = requests.post(
f"{HEIMDALL_URL}/admin/cloud/resolve",
json={"user_id": user_id, "product": product},
headers={"Authorization": f"Bearer {HEIMDALL_ADMIN_TOKEN}"},
timeout=5,
)
if resp.status_code == 200:
return resp.json().get("tier", "free")
if resp.status_code == 404:
# No cloud key yet — user signed up before provision ran; return free.
return "free"
log.warning("Heimdall resolve returned %s — defaulting tier to free", resp.status_code)
except Exception as exc:
log.warning("Heimdall tier resolve failed: %s — defaulting to free", exc)
return "free"
def validate_session_jwt(token: str) -> str:
"""Validate a Directus session JWT and return the user UUID. Raises on failure."""
import jwt # PyJWT — lazy import so local mode never needs it
payload = jwt.decode(token, DIRECTUS_JWT_SECRET, algorithms=["HS256"])
user_id = payload.get("id") or payload.get("sub")
if not user_id:
raise ValueError("JWT missing user id claim")
return user_id
def _user_data_path(user_id: str, app: str) -> Path:
return CLOUD_DATA_ROOT / user_id / app
def derive_db_key(user_id: str) -> str:
"""Derive a per-user SQLCipher encryption key from the server secret."""
return hmac.new(
SERVER_SECRET.encode(),
user_id.encode(),
hashlib.sha256,
).hexdigest()
def resolve_session(app: str = "peregrine") -> None:
"""
Call at the top of each Streamlit page.
In local mode: no-op.
In cloud mode: reads X-CF-Session header, validates JWT, creates user
data directory on first visit, and sets st.session_state keys:
- user_id: str
- db_path: Path
- db_key: str (SQLCipher key for this user)
- cloud_tier: str (free | paid | premium | ultra resolved from Heimdall)
Idempotent: skips if user_id is already in session_state.
"""
if not CLOUD_MODE:
return
if st.session_state.get("user_id"):
return
cookie_header = st.context.headers.get("x-cf-session", "")
session_jwt = _extract_session_token(cookie_header)
if not session_jwt:
st.error("Session token missing. Please log in at circuitforge.tech.")
st.stop()
try:
user_id = validate_session_jwt(session_jwt)
except Exception as exc:
st.error(f"Invalid session — please log in again. ({exc})")
st.stop()
user_path = _user_data_path(user_id, app)
user_path.mkdir(parents=True, exist_ok=True)
(user_path / "config").mkdir(exist_ok=True)
(user_path / "data").mkdir(exist_ok=True)
st.session_state["user_id"] = user_id
st.session_state["db_path"] = user_path / "staging.db"
st.session_state["db_key"] = derive_db_key(user_id)
st.session_state["cloud_tier"] = _fetch_cloud_tier(user_id, app)
def get_db_path() -> Path:
"""
Return the active db_path for this session.
Cloud: user-scoped path from session_state.
Local: DEFAULT_DB (from STAGING_DB env var or repo default).
"""
return st.session_state.get("db_path", DEFAULT_DB)
def get_cloud_tier() -> str:
"""
Return the current user's cloud tier.
Cloud mode: resolved from Heimdall at session start (cached 5 min).
Local mode: always returns "local" so pages can distinguish self-hosted from cloud.
"""
if not CLOUD_MODE:
return "local"
return st.session_state.get("cloud_tier", "free")


@ -15,6 +15,9 @@ sys.path.insert(0, str(Path(__file__).parent.parent.parent))
import streamlit as st
import yaml
from app.cloud_session import resolve_session, get_db_path
resolve_session("peregrine")
_ROOT = Path(__file__).parent.parent.parent
CONFIG_DIR = _ROOT / "config"
USER_YAML = CONFIG_DIR / "user.yaml"
@ -74,18 +77,16 @@ def _suggest_profile(gpus: list[str]) -> str:
def _submit_wizard_task(section: str, input_data: dict) -> int:
"""Submit a wizard_generate background task. Returns task_id."""
from scripts.db import DEFAULT_DB
from scripts.task_runner import submit_task
params = json.dumps({"section": section, "input": input_data})
task_id, _ = submit_task(DEFAULT_DB, "wizard_generate", 0, params=params)
task_id, _ = submit_task(get_db_path(), "wizard_generate", 0, params=params)
return task_id
def _poll_wizard_task(section: str) -> dict | None:
"""Return the most recent wizard_generate task row for a given section, or None."""
import sqlite3
from scripts.db import DEFAULT_DB
conn = sqlite3.connect(DEFAULT_DB)
conn = sqlite3.connect(get_db_path())
conn.row_factory = sqlite3.Row
row = conn.execute(
"SELECT * FROM background_tasks "


@ -12,11 +12,13 @@ import yaml
import os as _os
from scripts.user_profile import UserProfile
from app.cloud_session import resolve_session, get_db_path, CLOUD_MODE
_USER_YAML = Path(__file__).parent.parent.parent / "config" / "user.yaml"
_profile = UserProfile(_USER_YAML) if UserProfile.exists(_USER_YAML) else None
_name = _profile.name if _profile else "Job Seeker"
resolve_session("peregrine")
st.title("⚙️ Settings")
CONFIG_DIR = Path(__file__).parent.parent.parent / "config"
@ -63,10 +65,13 @@ _tab_names = [
"👤 My Profile", "📝 Resume Profile", "🔎 Search",
"⚙️ System", "🎯 Fine-Tune", "🔑 License", "💾 Data"
]
if CLOUD_MODE:
_tab_names.append("🔒 Privacy")
if _show_dev_tab:
_tab_names.append("🛠️ Developer")
_all_tabs = st.tabs(_tab_names)
tab_profile, tab_resume, tab_search, tab_system, tab_finetune, tab_license, tab_data = _all_tabs[:7]
tab_privacy = _all_tabs[7] if CLOUD_MODE else None
# ── Inline LLM generate buttons ───────────────────────────────────────────────
# Unlocked when user has a configured LLM backend (BYOK) OR a paid tier.
@ -1371,12 +1376,11 @@ with tab_finetune:
st.markdown("**Step 2: Extract Training Pairs**")
import json as _json
import sqlite3 as _sqlite3
from scripts.db import DEFAULT_DB as _FT_DB
jsonl_path = _profile.docs_dir / "training_data" / "cover_letters.jsonl"
# Show task status
_ft_conn = _sqlite3.connect(_FT_DB)
_ft_conn = _sqlite3.connect(get_db_path())
_ft_conn.row_factory = _sqlite3.Row
_ft_task = _ft_conn.execute(
"SELECT * FROM background_tasks WHERE task_type='prepare_training' ORDER BY id DESC LIMIT 1"
@ -1513,7 +1517,10 @@ with tab_data:
from scripts.backup import create_backup, list_backup_contents, restore_backup as _do_restore
_base_dir = Path(__file__).parent.parent.parent
# Cloud mode: per-user data lives at get_db_path().parent — not the app root.
# db_key is used to transparently decrypt on export and re-encrypt on import.
_db_key = st.session_state.get("db_key", "") if CLOUD_MODE else ""
_base_dir = get_db_path().parent if (CLOUD_MODE and st.session_state.get("db_path")) else Path(__file__).parent.parent.parent
# ── Backup ────────────────────────────────────────────────────────────────
st.markdown("### 📦 Create Backup")
@ -1521,7 +1528,7 @@ with tab_data:
if st.button("Create Backup", key="backup_create"):
with st.spinner("Creating backup…"):
try:
_zip_bytes = create_backup(_base_dir, include_db=_incl_db)
_zip_bytes = create_backup(_base_dir, include_db=_incl_db, db_key=_db_key)
_info = list_backup_contents(_zip_bytes)
from datetime import datetime as _dt
_ts = _dt.now().strftime("%Y%m%d-%H%M%S")
@ -1568,6 +1575,7 @@ with tab_data:
_zip_bytes, _base_dir,
include_db=_restore_db,
overwrite=_restore_overwrite,
db_key=_db_key,
)
st.success(f"Restored {len(_result['restored'])} files.")
with st.expander("Details"):
@ -1725,3 +1733,103 @@ if _show_dev_tab:
st.caption("Label distribution:")
for _lbl, _cnt in sorted(_label_counts.items(), key=lambda x: -x[1]):
st.caption(f" `{_lbl}`: {_cnt}")
# ── Privacy & Telemetry (cloud mode only) ─────────────────────────────────────
if CLOUD_MODE and tab_privacy is not None:
with tab_privacy:
from app.telemetry import get_consent as _get_consent, update_consent as _update_consent
st.subheader("🔒 Privacy & Telemetry")
st.caption(
"You have full, unconditional control over what data leaves your session. "
"Changes take effect immediately."
)
_uid = st.session_state.get("user_id", "")
_consent = _get_consent(_uid) if _uid else {
"all_disabled": False,
"usage_events_enabled": True,
"content_sharing_enabled": False,
"support_access_enabled": False,
}
with st.expander("📊 Usage & Telemetry", expanded=True):
st.markdown(
"CircuitForge is built by a tiny team. Anonymous usage data helps us fix the "
"parts of the job search that are broken. You can opt out at any time."
)
_all_off = st.toggle(
"🚫 Disable ALL telemetry",
value=bool(_consent.get("all_disabled", False)),
key="privacy_all_disabled",
help="Hard kill switch — overrides all options below. Nothing is written or transmitted.",
)
if _all_off != _consent.get("all_disabled", False) and _uid:
_update_consent(_uid, all_disabled=_all_off)
st.rerun()
st.divider()
_disabled = _all_off # grey out individual toggles when master switch is on
_usage_on = st.toggle(
"📈 Share anonymous usage statistics",
value=bool(_consent.get("usage_events_enabled", True)),
disabled=_disabled,
key="privacy_usage_events",
help="Feature usage, error rates, completion counts — no content, no PII.",
)
if not _disabled and _usage_on != _consent.get("usage_events_enabled", True) and _uid:
_update_consent(_uid, usage_events_enabled=_usage_on)
st.rerun()
_content_on = st.toggle(
"📝 Share de-identified content for model improvement",
value=bool(_consent.get("content_sharing_enabled", False)),
disabled=_disabled,
key="privacy_content_sharing",
help=(
"Opt-in: anonymised cover letters (PII stripped) may be used to improve "
"the CircuitForge fine-tuned model. Never shared with third parties."
),
)
if not _disabled and _content_on != _consent.get("content_sharing_enabled", False) and _uid:
_update_consent(_uid, content_sharing_enabled=_content_on)
st.rerun()
st.divider()
with st.expander("🎫 Temporary Support Access", expanded=False):
st.caption(
"Grant CircuitForge support read-only access to your session for a specific "
"support ticket. Time-limited and revocable. You will be notified when access "
"expires or is used."
)
from datetime import datetime as _dt, timedelta as _td
_hours = st.selectbox(
"Access duration", [4, 8, 24, 48, 72],
format_func=lambda h: f"{h} hours",
key="privacy_support_hours",
)
_ticket = st.text_input("Support ticket reference (optional)", key="privacy_ticket_ref")
if st.button("Grant temporary support access", key="privacy_support_grant"):
if _uid:
try:
from app.telemetry import get_platform_conn as _get_pc
_pc = _get_pc()
_expires = _dt.utcnow() + _td(hours=_hours)
with _pc.cursor() as _cur:
_cur.execute(
"INSERT INTO support_access_grants "
"(user_id, expires_at, ticket_ref) VALUES (%s, %s, %s)",
(_uid, _expires, _ticket or None),
)
_pc.commit()
st.success(
f"Support access granted until {_expires.strftime('%Y-%m-%d %H:%M')} UTC. "
"You can revoke it here at any time."
)
except Exception as _e:
st.error(f"Could not save grant: {_e}")
else:
st.warning("Session not resolved — please reload the page.")


@ -26,13 +26,16 @@ from scripts.db import (
get_task_for_job,
)
from scripts.task_runner import submit_task
from app.cloud_session import resolve_session, get_db_path
from app.telemetry import log_usage_event
DOCS_DIR = _profile.docs_dir if _profile else Path.home() / "Documents" / "JobSearch"
RESUME_YAML = Path(__file__).parent.parent.parent / "config" / "plain_text_resume.yaml"
st.title("🚀 Apply Workspace")
init_db(DEFAULT_DB)
resolve_session("peregrine")
init_db(get_db_path())
# ── PDF generation ─────────────────────────────────────────────────────────────
def _make_cover_letter_pdf(job: dict, cover_letter: str, output_dir: Path) -> Path:
@ -156,7 +159,7 @@ def _copy_btn(text: str, label: str = "📋 Copy", done: str = "✅ Copied!", he
)
# ── Job selection ──────────────────────────────────────────────────────────────
approved = get_jobs_by_status(DEFAULT_DB, "approved")
approved = get_jobs_by_status(get_db_path(), "approved")
if not approved:
st.info("No approved jobs — head to Job Review to approve some listings first.")
st.stop()
@ -219,17 +222,17 @@ with col_tools:
if _cl_key not in st.session_state:
st.session_state[_cl_key] = job.get("cover_letter") or ""
_cl_task = get_task_for_job(DEFAULT_DB, "cover_letter", selected_id)
_cl_task = get_task_for_job(get_db_path(), "cover_letter", selected_id)
_cl_running = _cl_task and _cl_task["status"] in ("queued", "running")
if st.button("✨ Generate / Regenerate", use_container_width=True, disabled=bool(_cl_running)):
submit_task(DEFAULT_DB, "cover_letter", selected_id)
submit_task(get_db_path(), "cover_letter", selected_id)
st.rerun()
if _cl_running:
@st.fragment(run_every=3)
def _cl_status_fragment():
t = get_task_for_job(DEFAULT_DB, "cover_letter", selected_id)
t = get_task_for_job(get_db_path(), "cover_letter", selected_id)
if t and t["status"] in ("queued", "running"):
lbl = "Queued…" if t["status"] == "queued" else "Generating via LLM…"
st.info(f"{lbl}")
@ -272,7 +275,7 @@ with col_tools:
key=f"cl_refine_{selected_id}"):
import json as _json
submit_task(
DEFAULT_DB, "cover_letter", selected_id,
get_db_path(), "cover_letter", selected_id,
params=_json.dumps({
"previous_result": cl_text,
"feedback": feedback_text.strip(),
@ -288,7 +291,7 @@ with col_tools:
_copy_btn(cl_text, label="📋 Copy Letter")
with c2:
if st.button("💾 Save draft", use_container_width=True):
update_cover_letter(DEFAULT_DB, selected_id, cl_text)
update_cover_letter(get_db_path(), selected_id, cl_text)
st.success("Saved!")
# PDF generation
@ -297,8 +300,10 @@ with col_tools:
with st.spinner("Generating PDF…"):
try:
pdf_path = _make_cover_letter_pdf(job, cl_text, DOCS_DIR)
update_cover_letter(DEFAULT_DB, selected_id, cl_text)
update_cover_letter(get_db_path(), selected_id, cl_text)
st.success(f"Saved: `{pdf_path.name}`")
if user_id := st.session_state.get("user_id"):
log_usage_event(user_id, "peregrine", "cover_letter_generated")
except Exception as e:
st.error(f"PDF error: {e}")
@ -312,13 +317,15 @@ with col_tools:
with c4:
if st.button("✅ Mark as Applied", use_container_width=True, type="primary"):
if cl_text:
update_cover_letter(DEFAULT_DB, selected_id, cl_text)
mark_applied(DEFAULT_DB, [selected_id])
update_cover_letter(get_db_path(), selected_id, cl_text)
mark_applied(get_db_path(), [selected_id])
st.success("Marked as applied!")
if user_id := st.session_state.get("user_id"):
log_usage_event(user_id, "peregrine", "job_applied")
st.rerun()
if st.button("🚫 Reject listing", use_container_width=True):
update_job_status(DEFAULT_DB, [selected_id], "rejected")
update_job_status(get_db_path(), [selected_id], "rejected")
# Advance selectbox to next job so list doesn't snap to first item
current_idx = ids.index(selected_id) if selected_id in ids else 0
if current_idx + 1 < len(ids):

app/telemetry.py Normal file

@ -0,0 +1,127 @@
# peregrine/app/telemetry.py
"""
Usage event telemetry for cloud-hosted Peregrine.
In local-first mode (CLOUD_MODE unset/false), all functions are no-ops:
no network calls, no DB writes, no imports of psycopg2.
In cloud mode, events are written to the platform Postgres DB ONLY after
confirming the user's telemetry consent.
THE HARD RULE: if telemetry_consent.all_disabled is True for a user,
nothing is written, no exceptions. This module is the ONLY path to
usage_events; no feature may write there directly.
"""
import os
import json
from typing import Any
CLOUD_MODE: bool = os.environ.get("CLOUD_MODE", "").lower() in ("1", "true", "yes")
PLATFORM_DB_URL: str = os.environ.get("PLATFORM_DB_URL", "")
_platform_conn = None
def get_platform_conn():
"""Lazy psycopg2 connection to the platform Postgres DB. Reconnects if closed."""
global _platform_conn
if _platform_conn is None or _platform_conn.closed:
import psycopg2
_platform_conn = psycopg2.connect(PLATFORM_DB_URL)
return _platform_conn
def get_consent(user_id: str) -> dict:
"""
Fetch telemetry consent for the user.
Returns safe defaults if record doesn't exist yet:
- usage_events_enabled: True (new cloud users start opted-in, per onboarding disclosure)
- all_disabled: False
"""
conn = get_platform_conn()
with conn.cursor() as cur:
cur.execute(
"SELECT all_disabled, usage_events_enabled "
"FROM telemetry_consent WHERE user_id = %s",
(user_id,)
)
row = cur.fetchone()
if row is None:
return {"all_disabled": False, "usage_events_enabled": True}
return {"all_disabled": row[0], "usage_events_enabled": row[1]}
def log_usage_event(
user_id: str,
app: str,
event_type: str,
metadata: dict[str, Any] | None = None,
) -> None:
"""
Write a usage event to the platform DB if consent allows.
Silent no-op in local mode. Silent no-op if telemetry is disabled.
Swallows all exceptions; telemetry must never crash the app.
Args:
user_id: Directus user UUID (from st.session_state["user_id"])
app: App slug ('peregrine', 'falcon', etc.)
event_type: Snake_case event label ('cover_letter_generated', 'job_applied', etc.)
metadata: Optional JSON-serialisable dict; NO PII
"""
if not CLOUD_MODE:
return
try:
consent = get_consent(user_id)
if consent.get("all_disabled") or not consent.get("usage_events_enabled", True):
return
conn = get_platform_conn()
with conn.cursor() as cur:
cur.execute(
"INSERT INTO usage_events (user_id, app, event_type, metadata) "
"VALUES (%s, %s, %s, %s)",
(user_id, app, event_type, json.dumps(metadata) if metadata else None),
)
conn.commit()
except Exception:
# Telemetry must never crash the app
pass
def update_consent(user_id: str, **fields) -> None:
"""
UPSERT telemetry consent for a user.
Accepted keyword args (all optional, any subset may be provided):
all_disabled: bool
usage_events_enabled: bool
content_sharing_enabled: bool
support_access_enabled: bool
Safe to call in any mode; silent no-op in local mode.
Swallows all exceptions so the Settings UI is never broken by a DB hiccup.
"""
if not CLOUD_MODE:
return
allowed = {"all_disabled", "usage_events_enabled", "content_sharing_enabled", "support_access_enabled"}
cols = {k: v for k, v in fields.items() if k in allowed}
if not cols:
return
try:
conn = get_platform_conn()
col_names = ", ".join(cols)
placeholders = ", ".join(["%s"] * len(cols))
set_clause = ", ".join(f"{k} = EXCLUDED.{k}" for k in cols)
col_vals = list(cols.values())
with conn.cursor() as cur:
cur.execute(
f"INSERT INTO telemetry_consent (user_id, {col_names}) "
f"VALUES (%s, {placeholders}) "
f"ON CONFLICT (user_id) DO UPDATE SET {set_clause}, updated_at = NOW()",
[user_id] + col_vals,
)
conn.commit()
except Exception:
pass

compose.cloud.yml Normal file

@ -0,0 +1,57 @@
# compose.cloud.yml — Multi-tenant cloud stack for menagerie.circuitforge.tech/peregrine
#
# Each authenticated user gets their own encrypted SQLite data tree at
# /devl/menagerie-data/<user-id>/peregrine/
#
# Caddy injects the Directus session cookie as X-CF-Session header before forwarding.
# cloud_session.py resolves user_id → per-user db_path at session init.
#
# Usage:
# docker compose -f compose.cloud.yml --project-name peregrine-cloud up -d
# docker compose -f compose.cloud.yml --project-name peregrine-cloud down
# docker compose -f compose.cloud.yml --project-name peregrine-cloud logs app -f
services:
app:
build: .
container_name: peregrine-cloud
ports:
- "8505:8501"
volumes:
- /devl/menagerie-data:/devl/menagerie-data # per-user data trees
environment:
- CLOUD_MODE=true
- CLOUD_DATA_ROOT=/devl/menagerie-data
- DIRECTUS_JWT_SECRET=${DIRECTUS_JWT_SECRET}
- CF_SERVER_SECRET=${CF_SERVER_SECRET}
- PLATFORM_DB_URL=${PLATFORM_DB_URL}
- HEIMDALL_URL=${HEIMDALL_URL:-http://cf-license:8000}
- HEIMDALL_ADMIN_TOKEN=${HEIMDALL_ADMIN_TOKEN}
- STAGING_DB=/devl/menagerie-data/cloud-default.db # fallback only — never used
- DOCS_DIR=/tmp/cloud-docs
- STREAMLIT_SERVER_BASE_URL_PATH=peregrine
- PYTHONUNBUFFERED=1
- DEMO_MODE=false
depends_on:
searxng:
condition: service_healthy
extra_hosts:
- "host.docker.internal:host-gateway"
restart: unless-stopped
searxng:
image: searxng/searxng:latest
volumes:
- ./docker/searxng:/etc/searxng:ro
healthcheck:
test: ["CMD", "wget", "-q", "--spider", "http://localhost:8080/"]
interval: 10s
timeout: 5s
retries: 3
restart: unless-stopped
# No host port — internal only
networks:
default:
external: true
name: caddy-proxy_caddy-internal


@ -1,17 +1,17 @@
# compose.menagerie.yml — Public demo stack for menagerie.circuitforge.tech/peregrine
# compose.demo.yml — Public demo stack for demo.circuitforge.tech/peregrine
#
# Runs a fully isolated, neutered Peregrine instance:
# - DEMO_MODE=true: blocks all LLM inference in llm_router.py
# - demo/config/: pre-seeded demo user profile, all backends disabled
# - demo/data/: isolated SQLite DB (no personal job data)
# - No personal documents mounted
# - Port 8503 (separate from the personal instance on 8502)
# - Port 8504 (separate from the personal instance on 8502)
#
# Usage:
# docker compose -f compose.menagerie.yml --project-name peregrine-demo up -d
# docker compose -f compose.menagerie.yml --project-name peregrine-demo down
# docker compose -f compose.demo.yml --project-name peregrine-demo up -d
# docker compose -f compose.demo.yml --project-name peregrine-demo down
#
# Caddy menagerie.circuitforge.tech/peregrine* → host port 8504
# Caddy demo.circuitforge.tech/peregrine* → host port 8504
services:


@ -6,87 +6,179 @@ This page describes Peregrine's system structure, layer boundaries, and key desi
## System Overview
### Pipeline
```mermaid
flowchart LR
sources["JobSpy\nCustom Boards"]
discover["discover.py"]
db[("staging.db\nSQLite")]
match["match.py\nScoring"]
review["Job Review\nApprove / Reject"]
apply["Apply Workspace\nCover letter + PDF"]
kanban["Interviews\nphone_screen → hired"]
sync["sync.py"]
notion["Notion DB"]
sources --> discover --> db --> match --> review --> apply --> kanban
db --> sync --> notion
```
┌─────────────────────────────────────────────────────────────┐
│ Docker Compose │
│ │
│ ┌──────────┐ ┌──────────┐ ┌───────┐ ┌───────────────┐ │
│ │ app │ │ ollama │ │ vllm │ │ vision │ │
│ │ :8501 │ │ :11434 │ │ :8000 │ │ :8002 │ │
│ │Streamlit │ │ Local LLM│ │ vLLM │ │ Moondream2 │ │
│ └────┬─────┘ └──────────┘ └───────┘ └───────────────┘ │
│ │ │
│ ┌────┴───────┐ ┌─────────────┐ │
│ │ searxng │ │ staging.db │ │
│ │ :8888 │ │ (SQLite) │ │
│ └────────────┘ └─────────────┘ │
└─────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ Streamlit App Layer │
│ │
│ app/app.py (entry point, navigation, sidebar task badge) │
│ │
│ app/pages/ │
│ 0_Setup.py First-run wizard (gates everything) │
│ 1_Job_Review.py Approve / reject queue │
│ 2_Settings.py All user configuration │
│ 4_Apply.py Cover letter gen + PDF export │
│ 5_Interviews.py Kanban: phone_screen → hired │
│ 6_Interview_Prep.py Research brief + practice Q&A │
│ 7_Survey.py Culture-fit survey assistant │
│ │
│ app/wizard/ │
│ step_hardware.py ... step_integrations.py │
│ tiers.py Feature gate definitions │
└─────────────────────────────────────────────────────────────┘
### Docker Compose Services
┌─────────────────────────────────────────────────────────────┐
│ Scripts Layer │
│ (framework-independent — could be called by FastAPI) │
│ │
│ discover.py JobSpy + custom board orchestration │
│ match.py Resume keyword scoring │
│ db.py All SQLite helpers (single source) │
│ llm_router.py LLM fallback chain │
│ generate_cover_letter.py Cover letter generation │
│ company_research.py Pre-interview research brief │
│ task_runner.py Background daemon thread executor │
│ imap_sync.py IMAP email fetch + classify │
│ sync.py Push to external integrations │
│ user_profile.py UserProfile wrapper for user.yaml │
│ preflight.py Port + resource check │
│ │
│ custom_boards/ Per-board scrapers │
│ integrations/ Per-service integration drivers │
│ vision_service/ FastAPI Moondream2 inference server │
└─────────────────────────────────────────────────────────────┘
Three compose files serve different deployment contexts:
┌─────────────────────────────────────────────────────────────┐
│ Config Layer │
│ │
│ config/user.yaml Personal data + wizard state │
│ config/llm.yaml LLM backends + fallback chains │
│ config/search_profiles.yaml Job search configuration │
│ config/resume_keywords.yaml Scoring keywords │
│ config/blocklist.yaml Excluded companies/domains │
│ config/email.yaml IMAP credentials │
│ config/integrations/ Per-integration credentials │
└─────────────────────────────────────────────────────────────┘
| File | Project name | Port | Purpose |
|------|-------------|------|---------|
| `compose.yml` | `peregrine` | 8502 | Local self-hosted install (default) |
| `compose.demo.yml` | `peregrine-demo` | 8504 | Public demo at `demo.circuitforge.tech/peregrine` (`DEMO_MODE=true`, no LLM) |
| `compose.cloud.yml` | `peregrine-cloud` | 8505 | Cloud managed instance at `menagerie.circuitforge.tech/peregrine` (`CLOUD_MODE=true`, per-user data) |
┌─────────────────────────────────────────────────────────────┐
│ Database Layer │
│ │
│ staging.db (SQLite, local, gitignored) │
│ │
│ jobs Core pipeline — all job data │
│ job_contacts Email thread log per job │
│ company_research LLM-generated research briefs │
│ background_tasks Async task queue state │
│ survey_responses Culture-fit survey Q&A pairs │
└─────────────────────────────────────────────────────────────┘
```mermaid
flowchart TB
subgraph local["compose.yml (local)"]
app_l["**app** :8502\nStreamlit UI"]
ollama_l["**ollama**\nLocal LLM"]
vllm_l["**vllm**\nvLLM"]
vision_l["**vision**\nMoondream2"]
searxng_l["**searxng**\nWeb Search"]
db_l[("staging.db\nSQLite")]
end
subgraph cloud["compose.cloud.yml (cloud)"]
app_c["**app** :8505\nStreamlit UI\nCLOUD_MODE=true"]
searxng_c["**searxng**\nWeb Search"]
db_c[("menagerie-data/\n&lt;user-id&gt;/staging.db\nSQLCipher")]
pg[("Postgres\nplatform DB\n:5433")]
end
```
Solid lines = always connected. Dashed lines = optional/profile-dependent backends.
### Streamlit App Layer
```mermaid
flowchart TD
entry["app/app.py\nEntry point · navigation · sidebar task badge"]
setup["0_Setup.py\nFirst-run wizard\n⚠ Gates everything"]
review["1_Job_Review.py\nApprove / reject queue"]
settings["2_Settings.py\nAll user configuration"]
apply["4_Apply.py\nCover letter gen + PDF export"]
interviews["5_Interviews.py\nKanban: phone_screen → hired"]
prep["6_Interview_Prep.py\nResearch brief + practice Q&A"]
survey["7_Survey.py\nCulture-fit survey assistant"]
wizard["app/wizard/\nstep_hardware.py … step_integrations.py\ntiers.py — feature gate definitions"]
entry --> setup
entry --> review
entry --> settings
entry --> apply
entry --> interviews
entry --> prep
entry --> survey
setup <-.->|wizard steps| wizard
```
### Scripts Layer
Framework-independent — no Streamlit imports. Can be called from CLI, FastAPI, or background threads.
| Script | Purpose |
|--------|---------|
| `discover.py` | JobSpy + custom board orchestration |
| `match.py` | Resume keyword scoring |
| `db.py` | All SQLite helpers (single source of truth) |
| `llm_router.py` | LLM fallback chain |
| `generate_cover_letter.py` | Cover letter generation |
| `company_research.py` | Pre-interview research brief |
| `task_runner.py` | Background daemon thread executor |
| `imap_sync.py` | IMAP email fetch + classify |
| `sync.py` | Push to external integrations |
| `user_profile.py` | `UserProfile` wrapper for `user.yaml` |
| `preflight.py` | Port + resource check |
| `custom_boards/` | Per-board scrapers |
| `integrations/` | Per-service integration drivers |
| `vision_service/` | FastAPI Moondream2 inference server |
### Config Layer
Plain YAML files. Gitignored files contain secrets; `.example` files are committed as templates.
| File | Purpose |
|------|---------|
| `config/user.yaml` | Personal data + wizard state |
| `config/llm.yaml` | LLM backends + fallback chains |
| `config/search_profiles.yaml` | Job search configuration |
| `config/resume_keywords.yaml` | Scoring keywords |
| `config/blocklist.yaml` | Excluded companies/domains |
| `config/email.yaml` | IMAP credentials |
| `config/integrations/` | Per-integration credentials |
### Database Layer
**Local mode** — `staging.db`: SQLite, single file, gitignored.
**Cloud mode** — Hybrid:
- **Postgres (platform layer):** account data, subscriptions, telemetry consent. Shared across all users.
- **SQLite-per-user (content layer):** each user's job data in an isolated, SQLCipher-encrypted file at `/devl/menagerie-data/<user-id>/peregrine/staging.db`. Schema is identical to local — the app sees no difference.
#### Local SQLite tables
| Table | Purpose |
|-------|---------|
| `jobs` | Core pipeline — all job data |
| `job_contacts` | Email thread log per job |
| `company_research` | LLM-generated research briefs |
| `background_tasks` | Async task queue state |
| `survey_responses` | Culture-fit survey Q&A pairs |
#### Postgres platform tables (cloud only)
| Table | Purpose |
|-------|---------|
| `subscriptions` | User tier, license JWT, product |
| `usage_events` | Anonymous usage telemetry (consent-gated) |
| `telemetry_consent` | Per-user telemetry preferences + hard kill switch |
| `support_access_grants` | Time-limited support session grants |
---
### Cloud Session Middleware
`app/cloud_session.py` handles multi-tenant routing transparently:
```
Request → Caddy injects X-CF-Session header (from Directus session cookie)
→ resolve_session() validates JWT, derives db_path + db_key
→ all DB calls use get_db_path() instead of DEFAULT_DB
```
Key functions:
| Function | Purpose |
|----------|---------|
| `resolve_session(app)` | Called at top of every page — no-op in local mode |
| `get_db_path()` | Returns per-user `db_path` (cloud) or `DEFAULT_DB` (local) |
| `derive_db_key(user_id)` | `HMAC(SERVER_SECRET, user_id)` — deterministic per-user SQLCipher key |
The app code never branches on `CLOUD_MODE` except at the entry points (`resolve_session` and `get_db_path`). Everything downstream is transparent.
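The `HMAC(SERVER_SECRET, user_id)` derivation above can be sketched in a few lines. This is a minimal illustration of the scheme as documented, not the actual `cloud_session.py` implementation, which may differ in encoding details:

```python
import hashlib
import hmac
import os

def derive_db_key(user_id: str) -> str:
    """Deterministic per-user SQLCipher key: HMAC-SHA256(CF_SERVER_SECRET, user_id).

    The same secret and user_id always yield the same key, so no key storage
    is needed. The flip side: rotating CF_SERVER_SECRET invalidates every
    existing user database (see the secret-rotation warning in the cloud docs).
    """
    secret = os.environ["CF_SERVER_SECRET"].encode()
    return hmac.new(secret, user_id.encode(), hashlib.sha256).hexdigest()
```

Because the key is derived on demand, a compromised data volume alone (without `CF_SERVER_SECRET`) does not expose any user's database.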
### Telemetry (cloud only)
`app/telemetry.py` is the **only** path to the `usage_events` table. No feature may write there directly.
```python
from app.telemetry import log_usage_event
log_usage_event(user_id, "peregrine", "cover_letter_generated", {"words": 350})
```
- Complete no-op when `CLOUD_MODE=false`
- Checks `telemetry_consent.all_disabled` first — if set, nothing is written, no exceptions
- Swallows all exceptions so telemetry never crashes the app
---
## Layer Boundaries
@ -129,7 +221,18 @@ submit_task(db_path, task_type="cover_letter", job_id=42)
submit_task(db_path, task_type="company_research", job_id=42)
```
Tasks are recorded in the `background_tasks` table with statuses: `queued → running → completed / failed`.
Tasks are recorded in the `background_tasks` table with the following state machine:
```mermaid
stateDiagram-v2
[*] --> queued : submit_task()
queued --> running : daemon picks up
running --> completed
running --> failed
queued --> failed : server restart clears stuck tasks
completed --> [*]
failed --> [*]
```
**Dedup rule:** Only one `queued` or `running` task per `(task_type, job_id)` pair is allowed at a time. Submitting a duplicate is a silent no-op.
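The dedup rule can be sketched as a guarded insert. The helper name and exact schema columns here are illustrative; the real check lives in the task submission path:

```python
import sqlite3

def submit_task_dedup(conn: sqlite3.Connection, task_type: str, job_id: int) -> bool:
    """Insert a task unless a queued/running one already exists for
    (task_type, job_id). Returns True if inserted, False on silent no-op."""
    cur = conn.execute(
        "SELECT 1 FROM background_tasks "
        "WHERE task_type = ? AND job_id = ? AND status IN ('queued', 'running')",
        (task_type, job_id),
    )
    if cur.fetchone():
        return False  # duplicate in flight: silent no-op per the dedup rule
    conn.execute(
        "INSERT INTO background_tasks (task_type, job_id, status) "
        "VALUES (?, ?, 'queued')",
        (task_type, job_id),
    )
    conn.commit()
    return True
```

Once a task reaches `completed` or `failed`, a new submission for the same pair is allowed again.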
@ -166,3 +269,18 @@ The scripts layer was deliberately kept free of Streamlit imports. This means th
### Vision service is a separate process
Moondream2 requires `torch` and `transformers`, which are incompatible with the lightweight main conda environment. The vision service runs as a separate FastAPI process in a separate conda environment (`job-seeker-vision`), keeping the main env free of GPU dependencies.
### Cloud mode is a transparent layer, not a fork
`CLOUD_MODE=true` activates two entry points (`resolve_session`, `get_db_path`) and the telemetry middleware. Every other line of app code is unchanged. There is no cloud branch, no conditional imports, no schema divergence. The local-first architecture is preserved end-to-end; the cloud layer sits on top of it.
### SQLite-per-user instead of shared Postgres
Each cloud user gets their own encrypted SQLite file. This means:
- No SQL migrations when the schema changes — new users get the latest schema, existing users keep their file as-is
- Zero risk of cross-user data leakage at the DB layer
- GDPR deletion is `rm -rf /devl/menagerie-data/<user-id>/` — auditable and complete
- The app can be tested locally with `CLOUD_MODE=false` without any Postgres dependency
The Postgres platform DB holds only account metadata (subscriptions, consent, telemetry) — never job search content.


@ -0,0 +1,198 @@
# Cloud Deployment
This page covers operating the Peregrine cloud managed instance at `menagerie.circuitforge.tech/peregrine`.
---
## Architecture Overview
```
Browser → Caddy (bastion) → host:8505 → peregrine-cloud container
┌─────────────────────────┼──────────────────────────┐
│ │ │
cloud_session.py /devl/menagerie-data/ Postgres :5433
(session routing) <user-id>/peregrine/ (platform DB)
staging.db (SQLCipher)
```
Caddy injects the Directus session cookie as `X-CF-Session`. `cloud_session.py` validates the JWT, derives the per-user db path and SQLCipher key, and injects both into `st.session_state`. All downstream DB calls are transparent — the app never knows it's multi-tenant.
---
## Compose File
```bash
# Start
docker compose -f compose.cloud.yml --project-name peregrine-cloud --env-file .env up -d
# Stop
docker compose -f compose.cloud.yml --project-name peregrine-cloud down
# Logs
docker compose -f compose.cloud.yml --project-name peregrine-cloud logs app -f
# Rebuild after code changes
docker compose -f compose.cloud.yml --project-name peregrine-cloud build app
docker compose -f compose.cloud.yml --project-name peregrine-cloud up -d
```
---
## Required Environment Variables
These must be present in `.env` (gitignored) before starting the cloud stack:
| Variable | Description | Where to find |
|----------|-------------|---------------|
| `CLOUD_MODE` | Must be `true` | Hardcoded in compose.cloud.yml |
| `CLOUD_DATA_ROOT` | Host path for per-user data trees | `/devl/menagerie-data` |
| `DIRECTUS_JWT_SECRET` | Directus signing secret — validates session JWTs | `DIRECTUS_SECRET` in `website/.env` |
| `CF_SERVER_SECRET` | Server secret for SQLCipher key derivation | Generate: `openssl rand -base64 32 \| tr -d '/=+' \| cut -c1-32` |
| `PLATFORM_DB_URL` | Postgres connection string for platform DB | `postgresql://cf_platform:<pass>@host.docker.internal:5433/circuitforge_platform` |
!!! warning "SECRET ROTATION"
`CF_SERVER_SECRET` is used to derive all per-user SQLCipher keys via `HMAC(secret, user_id)`. Rotating this secret renders all existing user databases unreadable. Do not rotate it without a migration plan.
---
## Data Root
User data lives at `/devl/menagerie-data/` on the host, bind-mounted into the container:
```
/devl/menagerie-data/
<directus-user-uuid>/
peregrine/
staging.db ← SQLCipher-encrypted (AES-256)
config/ ← llm.yaml, server.yaml, user.yaml, etc.
data/ ← documents, exports, attachments
```
The directory is created automatically on first login. The SQLCipher key for each user is derived deterministically: `HMAC-SHA256(CF_SERVER_SECRET, user_id)`.
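The layout above maps to a small amount of path logic. The helper names below are illustrative, not the actual `cloud_session.py` API:

```python
import os
from pathlib import Path

def user_data_dir(user_id: str, app: str = "peregrine") -> Path:
    """Per-tenant data dir: <CLOUD_DATA_ROOT>/<user-id>/<app>/, created on demand."""
    root = Path(os.environ.get("CLOUD_DATA_ROOT", "/devl/menagerie-data"))
    d = root / user_id / app
    d.mkdir(parents=True, exist_ok=True)
    return d

def user_db_path(user_id: str, app: str = "peregrine") -> Path:
    """Per-tenant SQLCipher database file inside the user's data dir."""
    return user_data_dir(user_id, app) / "staging.db"
```

Scoping everything under `<user-id>/` is what makes GDPR deletion a single `rm -rf` of that directory.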
### GDPR / Data deletion
To fully delete a user's data:
```bash
# Remove all content data
rm -rf /devl/menagerie-data/<user-id>/
# Remove platform DB rows (cascades)
docker exec cf-platform-db psql -U cf_platform -d circuitforge_platform \
-c "DELETE FROM subscriptions WHERE user_id = '<user-id>';"
```
---
## Platform Database
The Postgres platform DB runs as `cf-platform-db` in the website compose stack (port 5433 on host).
```bash
# Connect
docker exec cf-platform-db psql -U cf_platform -d circuitforge_platform
# Check tables
\dt
# View telemetry consent for a user
SELECT * FROM telemetry_consent WHERE user_id = '<uuid>';
# View recent usage events
SELECT user_id, event_type, occurred_at FROM usage_events
ORDER BY occurred_at DESC LIMIT 20;
```
The schema is initialised on container start from `platform-db/init.sql` in the website repo.
---
## Telemetry
`app/telemetry.py` is the **only** entry point to `usage_events`. Never write to that table directly.
```python
from app.telemetry import log_usage_event
# Fires in cloud mode only; no-op locally
log_usage_event(user_id, "peregrine", "cover_letter_generated", {"words": 350})
```
Events are blocked if:
1. `telemetry_consent.all_disabled = true` (hard kill switch, overrides all)
2. `telemetry_consent.usage_events_enabled = false`
The user controls both from Settings → 🔒 Privacy.
---
## Backup / Restore (Cloud Mode)
The Settings → 💾 Data tab handles backup/restore transparently. In cloud mode:
- **Export:** the SQLCipher-encrypted DB is decrypted before zipping — the downloaded `.zip` is a portable plain SQLite archive, compatible with any local Docker install.
- **Import:** a plain SQLite backup is re-encrypted with the user's key on restore.
The user's `base_dir` in cloud mode is `get_db_path().parent` (`/devl/menagerie-data/<user-id>/peregrine/`), not the app root.
---
## Routing (Caddy)
`menagerie.circuitforge.tech` in `/devl/caddy-proxy/Caddyfile`:
```caddy
menagerie.circuitforge.tech {
encode gzip zstd
handle /peregrine* {
reverse_proxy http://host.docker.internal:8505 {
header_up X-CF-Session {header.Cookie}
}
}
handle {
respond "This app is not yet available in the managed cloud — check back soon." 503
}
log {
output file /data/logs/menagerie.circuitforge.tech.log
format json
}
}
```
`header_up X-CF-Session {header.Cookie}` passes the full cookie header so `cloud_session.py` can extract the Directus session token.
!!! note "Caddy inode gotcha"
After editing the Caddyfile, run `docker restart caddy-proxy` — not `caddy reload`. The Edit tool creates a new inode; Docker bind mounts pin to the original inode and `caddy reload` re-reads the stale one.
---
## Demo Instance
The public demo at `demo.circuitforge.tech/peregrine` runs separately:
```bash
# Start demo
docker compose -f compose.demo.yml --project-name peregrine-demo up -d
# Rebuild after code changes
docker compose -f compose.demo.yml --project-name peregrine-demo build app
docker compose -f compose.demo.yml --project-name peregrine-demo up -d
```
`DEMO_MODE=true` blocks all LLM inference calls at `llm_router.py`. Discovery, job enrichment, and the UI work normally. Demo data lives in `demo/config/` and `demo/data/` — isolated from personal data.
---
## Adding a New App to the Cloud
To onboard a new menagerie app (e.g. `falcon`) to the cloud:
1. Add `resolve_session("falcon")` at the top of each page (calls `cloud_session.py` with the app slug)
2. Replace `DEFAULT_DB` references with `get_db_path()`
3. Add `app/telemetry.py` import and `log_usage_event()` calls at key action points
4. Create `compose.cloud.yml` following the Peregrine pattern (port, `CLOUD_MODE=true`, data mount)
5. Add a Caddy `handle /falcon*` block in `menagerie.circuitforge.tech`, routing to the new port
6. `cloud_session.py` automatically creates `<data_root>/<user-id>/falcon/` on first login


@ -1,9 +1,9 @@
site_name: Peregrine
site_description: AI-powered job search pipeline
site_author: Circuit Forge LLC
site_url: https://docs.circuitforge.io/peregrine
repo_url: https://git.circuitforge.io/circuitforge/peregrine
repo_name: circuitforge/peregrine
site_url: https://docs.circuitforge.tech/peregrine
repo_url: https://git.opensourcesolarpunk.com/pyr0ball/peregrine
repo_name: pyr0ball/peregrine
theme:
name: material
@ -32,7 +32,11 @@ theme:
markdown_extensions:
- admonition
- pymdownx.details
- pymdownx.superfences
- pymdownx.superfences:
custom_fences:
- name: mermaid
class: mermaid
format: !!python/name:pymdownx.superfences.fence_code_format
- pymdownx.highlight:
anchor_linenums: true
- pymdownx.tabbed:
@ -58,6 +62,7 @@ nav:
- Developer Guide:
- Contributing: developer-guide/contributing.md
- Architecture: developer-guide/architecture.md
- Cloud Deployment: developer-guide/cloud-deployment.md
- Adding a Scraper: developer-guide/adding-scrapers.md
- Adding an Integration: developer-guide/adding-integrations.md
- Testing: developer-guide/testing.md


@ -54,6 +54,10 @@ python-dotenv
# ── Auth / licensing ──────────────────────────────────────────────────────
PyJWT>=2.8
pysqlcipher3
# ── Cloud / telemetry ─────────────────────────────────────────────────────────
psycopg2-binary
# ── Utilities ─────────────────────────────────────────────────────────────
sqlalchemy


@ -4,6 +4,16 @@ Creates a portable zip of all gitignored configs + optionally the staging DB.
Intended for: machine migrations, Docker volume transfers, and safe wizard testing.
Supports both the Peregrine Docker instance and the legacy /devl/job-seeker install.
Cloud mode notes
----------------
In cloud mode (CLOUD_MODE=true), the staging DB is SQLCipher-encrypted.
Pass the per-user ``db_key`` to ``create_backup()`` to have it transparently
decrypt the DB before archiving, producing a portable, plain SQLite file
that works with any local Docker install.
Pass the same ``db_key`` to ``restore_backup()`` and it will re-encrypt the
plain DB on its way in, so the cloud app can open it normally.
Usage (CLI):
conda run -n job-seeker python scripts/backup.py --create backup.zip
conda run -n job-seeker python scripts/backup.py --create backup.zip --no-db
@ -21,6 +31,8 @@ from __future__ import annotations
import io
import json
import os
import tempfile
import zipfile
from datetime import datetime
from pathlib import Path
@ -62,6 +74,63 @@ _DB_CANDIDATES = ["data/staging.db", "staging.db"]
_MANIFEST_NAME = "backup-manifest.json"
# ---------------------------------------------------------------------------
# SQLCipher helpers (cloud mode only — only called when db_key is set)
# ---------------------------------------------------------------------------
def _decrypt_db_to_bytes(db_path: Path, db_key: str) -> bytes:
"""Open a SQLCipher-encrypted DB and return plain SQLite bytes.
Uses SQLCipher's ATTACH + sqlcipher_export() to produce a portable
unencrypted copy. Only called in cloud mode (db_key non-empty).
pysqlcipher3 is available in the Docker image (Dockerfile installs
libsqlcipher-dev); never called in local-mode tests.
"""
with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as tmp:
tmp_path = tmp.name
try:
from pysqlcipher3 import dbapi2 as _sqlcipher # type: ignore[import]
conn = _sqlcipher.connect(str(db_path))
conn.execute(f"PRAGMA key='{db_key}'")
conn.execute(f"ATTACH DATABASE '{tmp_path}' AS plaintext KEY ''")
conn.execute("SELECT sqlcipher_export('plaintext')")
conn.execute("DETACH DATABASE plaintext")
conn.close()
return Path(tmp_path).read_bytes()
finally:
try:
os.unlink(tmp_path)
except Exception:
pass
def _encrypt_db_from_bytes(plain_bytes: bytes, dest_path: Path, db_key: str) -> None:
"""Write plain SQLite bytes as a SQLCipher-encrypted DB at dest_path.
Used on restore in cloud mode to convert a portable plain backup into
the per-user encrypted format the app expects.
"""
dest_path.parent.mkdir(parents=True, exist_ok=True)
with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as tmp:
tmp.write(plain_bytes)
tmp_path = tmp.name
try:
from pysqlcipher3 import dbapi2 as _sqlcipher # type: ignore[import]
# Open the plain DB (empty key = no encryption in SQLCipher)
conn = _sqlcipher.connect(tmp_path)
conn.execute("PRAGMA key=''")
# Attach the encrypted destination and export there
conn.execute(f"ATTACH DATABASE '{dest_path}' AS encrypted KEY '{db_key}'")
conn.execute("SELECT sqlcipher_export('encrypted')")
conn.execute("DETACH DATABASE encrypted")
conn.close()
finally:
try:
os.unlink(tmp_path)
except Exception:
pass
# ---------------------------------------------------------------------------
# Source detection
# ---------------------------------------------------------------------------
@ -90,6 +159,7 @@ def create_backup(
base_dir: Path,
include_db: bool = True,
source_label: str | None = None,
db_key: str = "",
) -> bytes:
"""Return a zip archive as raw bytes.
@ -98,6 +168,9 @@ def create_backup(
include_db: If True, include staging.db in the archive.
source_label: Human-readable instance name stored in the manifest
(e.g. "peregrine", "job-seeker"). Auto-detected if None.
db_key: SQLCipher key for the DB (cloud mode). When set, the DB
is decrypted before archiving so the backup is portable
to any local Docker install.
"""
buf = io.BytesIO()
included: list[str] = []
@ -128,7 +201,12 @@ def create_backup(
for candidate in _DB_CANDIDATES:
p = base_dir / candidate
if p.exists():
zf.write(p, candidate)
if db_key:
# Cloud mode: decrypt to plain SQLite before archiving
plain_bytes = _decrypt_db_to_bytes(p, db_key)
zf.writestr(candidate, plain_bytes)
else:
zf.write(p, candidate)
included.append(candidate)
break
@@ -167,6 +245,7 @@ def restore_backup(
    base_dir: Path,
    include_db: bool = True,
    overwrite: bool = True,
    db_key: str = "",
) -> dict[str, list[str]]:
    """Extract a backup zip into base_dir.

@@ -175,6 +254,9 @@ def restore_backup(
        base_dir: Repo root to restore into.
        include_db: If False, skip any .db files.
        overwrite: If False, skip files that already exist.
        db_key: SQLCipher key (cloud mode). When set, any .db file in the
            zip (plain SQLite) is re-encrypted on the way in so the
            cloud app can open it normally.

    Returns:
        {"restored": [...], "skipped": [...]}
@@ -194,7 +276,12 @@ def restore_backup(
                skipped.append(name)
                continue
            dest.parent.mkdir(parents=True, exist_ok=True)
            dest.write_bytes(zf.read(name))
            raw = zf.read(name)
            if db_key and name.endswith(".db"):
                # Cloud mode: the zip contains plain SQLite — re-encrypt on restore
                _encrypt_db_from_bytes(raw, dest, db_key)
            else:
                dest.write_bytes(raw)
            restored.append(name)
    return {"restored": restored, "skipped": skipped}
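Stripped of the SQLCipher details, `create_backup`/`restore_backup` reduce to a zip round-trip with an optional transform applied to `.db` members on the way out (decrypt) and the way in (re-encrypt). A self-contained sketch of that shape — `pack`/`unpack` are illustrative names, and the default transform is an identity stand-in since pysqlcipher3 may not be installed locally:

```python
import io
import zipfile

def pack(files: dict[str, bytes], transform=lambda b: b) -> bytes:
    """Zip name -> bytes mappings, applying transform to .db members
    (the stand-in for decrypt-before-archive)."""
    buf = io.BytesIO()
    with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as zf:
        for name, data in files.items():
            zf.writestr(name, transform(data) if name.endswith(".db") else data)
    return buf.getvalue()

def unpack(blob: bytes, transform=lambda b: b) -> dict[str, bytes]:
    """Read the zip back, applying transform to .db members
    (the stand-in for re-encrypt-on-restore)."""
    with zipfile.ZipFile(io.BytesIO(blob)) as zf:
        return {
            name: transform(zf.read(name)) if name.endswith(".db") else zf.read(name)
            for name in zf.namelist()
        }

blob = pack({"staging.db": b"SQLite format 3\x00", "config/app.json": b"{}"})
restored = unpack(blob)
```

With identity transforms the round-trip is lossless; in cloud mode the real code swaps in the SQLCipher decrypt/encrypt helpers for `.db` members only.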


@@ -11,6 +11,30 @@ from typing import Optional

DEFAULT_DB = Path(os.environ.get("STAGING_DB", Path(__file__).parent.parent / "staging.db"))


def get_connection(db_path: Path = DEFAULT_DB, key: str = "") -> "sqlite3.Connection":
    """
    Open a database connection.

    In cloud mode with a key: uses SQLCipher (AES-256 encrypted, API-identical to sqlite3).
    Otherwise: vanilla sqlite3.

    Args:
        db_path: Path to the SQLite/SQLCipher database file.
        key: SQLCipher encryption key (hex string). Empty = unencrypted.
    """
    import os as _os

    cloud_mode = _os.environ.get("CLOUD_MODE", "").lower() in ("1", "true", "yes")
    if cloud_mode and key:
        from pysqlcipher3 import dbapi2 as _sqlcipher

        conn = _sqlcipher.connect(str(db_path))
        conn.execute(f"PRAGMA key='{key}'")
        return conn
    else:
        import sqlite3 as _sqlite3

        return _sqlite3.connect(str(db_path))
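For the local-first path (no `CLOUD_MODE`, no key), `get_connection` is just `sqlite3.connect`. A runnable sketch of the branch above — `get_connection_sketch` is a hypothetical stand-in with the SQLCipher arm stubbed out, since pysqlcipher3 is assumed absent here:

```python
import os
import sqlite3

def get_connection_sketch(db_path: str, key: str = ""):
    # Mirror of the cloud/local branch: only cloud mode *and* a key select SQLCipher.
    cloud_mode = os.environ.get("CLOUD_MODE", "").lower() in ("1", "true", "yes")
    if cloud_mode and key:
        raise RuntimeError("SQLCipher path — needs pysqlcipher3 (not sketched here)")
    return sqlite3.connect(db_path)

os.environ.pop("CLOUD_MODE", None)  # force the local arm
conn = get_connection_sketch(":memory:")
conn.execute("CREATE TABLE jobs (id INTEGER PRIMARY KEY, title TEXT)")
conn.execute("INSERT INTO jobs (title) VALUES ('editor')")
row = conn.execute("SELECT title FROM jobs").fetchone()
conn.close()
```

Because SQLCipher's dbapi2 is API-identical to `sqlite3`, callers never branch on which module produced the connection.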
CREATE_JOBS = """
CREATE TABLE IF NOT EXISTS jobs (
id INTEGER PRIMARY KEY AUTOINCREMENT,


@@ -4,11 +4,14 @@ from __future__ import annotations

import json
import zipfile
from pathlib import Path
from unittest.mock import MagicMock, patch

import pytest

from scripts.backup import (
    _decrypt_db_to_bytes,
    _detect_source_label,
    _encrypt_db_from_bytes,
    create_backup,
    list_backup_contents,
    restore_backup,

@@ -229,3 +232,141 @@ class TestDetectSourceLabel:
        base = tmp_path / "job-seeker"
        base.mkdir()
        assert _detect_source_label(base) == "job-seeker"
# ---------------------------------------------------------------------------
# Cloud mode — SQLCipher encrypt / decrypt (pysqlcipher3 mocked)
# ---------------------------------------------------------------------------


class _FakeCursor:
    def __enter__(self): return self
    def __exit__(self, *a): return False
    def execute(self, *a): pass
    def fetchone(self): return None


def _make_mock_sqlcipher_conn(plain_bytes: bytes, tmp_path: Path):
    """Return a mock pysqlcipher3 connection that writes plain_bytes to the
    first 'ATTACH DATABASE' path it sees (simulating sqlcipher_export)."""
    attached: dict = {}
    conn = MagicMock()

    def fake_execute(sql, *args):
        if "ATTACH DATABASE" in sql:
            # Extract path between first pair of quotes
            parts = sql.split("'")
            path = parts[1]
            attached["path"] = path
        elif "sqlcipher_export" in sql:
            # Simulate export: write plain_bytes to the attached path
            Path(attached["path"]).write_bytes(plain_bytes)

    conn.execute.side_effect = fake_execute
    conn.close = MagicMock()
    return conn
class TestCloudBackup:
    """Backup/restore with SQLCipher encryption — pysqlcipher3 mocked out."""

    def test_create_backup_decrypts_db_when_key_set(self, tmp_path):
        """With db_key, _decrypt_db_to_bytes is called and plain bytes go into zip."""
        base = _make_instance(tmp_path, "cloud-user")
        plain_db = b"SQLite format 3\x00plain-content"
        with patch("scripts.backup._decrypt_db_to_bytes", return_value=plain_db) as mock_dec:
            data = create_backup(base, include_db=True, db_key="testkey")
        mock_dec.assert_called_once()
        # The zip should contain the plain bytes, not the raw encrypted file
        with zipfile.ZipFile(__import__("io").BytesIO(data)) as zf:
            db_files = [n for n in zf.namelist() if n.endswith(".db")]
            assert len(db_files) == 1
            assert zf.read(db_files[0]) == plain_db

    def test_create_backup_no_key_reads_file_directly(self, tmp_path):
        """Without db_key, _decrypt_db_to_bytes is NOT called."""
        base = _make_instance(tmp_path, "local-user")
        with patch("scripts.backup._decrypt_db_to_bytes") as mock_dec:
            create_backup(base, include_db=True, db_key="")
        mock_dec.assert_not_called()

    def test_restore_backup_encrypts_db_when_key_set(self, tmp_path):
        """With db_key, _encrypt_db_from_bytes is called for .db files."""
        src = _make_instance(tmp_path, "cloud-src")
        dst = tmp_path / "cloud-dst"
        dst.mkdir()
        plain_db = b"SQLite format 3\x00plain-content"
        # Create a backup with plain DB bytes
        with patch("scripts.backup._decrypt_db_to_bytes", return_value=plain_db):
            data = create_backup(src, include_db=True, db_key="testkey")
        with patch("scripts.backup._encrypt_db_from_bytes") as mock_enc:
            restore_backup(data, dst, include_db=True, db_key="testkey")
        mock_enc.assert_called_once()
        call_args = mock_enc.call_args
        assert call_args[0][0] == plain_db  # plain_bytes
        assert call_args[0][2] == "testkey"  # db_key

    def test_restore_backup_no_key_writes_file_directly(self, tmp_path):
        """Without db_key, _encrypt_db_from_bytes is NOT called."""
        src = _make_instance(tmp_path, "local-src")
        dst = tmp_path / "local-dst"
        dst.mkdir()
        data = create_backup(src, include_db=True, db_key="")
        with patch("scripts.backup._encrypt_db_from_bytes") as mock_enc:
            restore_backup(data, dst, include_db=True, db_key="")
        mock_enc.assert_not_called()

    def test_decrypt_db_to_bytes_calls_sqlcipher(self, tmp_path):
        """_decrypt_db_to_bytes imports pysqlcipher3.dbapi2 and calls sqlcipher_export."""
        fake_db = tmp_path / "staging.db"
        fake_db.write_bytes(b"encrypted")
        plain_bytes = b"SQLite format 3\x00"
        mock_conn = _make_mock_sqlcipher_conn(plain_bytes, tmp_path)
        mock_module = MagicMock()
        mock_module.connect.return_value = mock_conn
        # Must set dbapi2 explicitly on the package mock so `from pysqlcipher3 import
        # dbapi2` resolves to mock_module (not a new auto-created MagicMock attr).
        mock_pkg = MagicMock()
        mock_pkg.dbapi2 = mock_module
        with patch.dict("sys.modules", {"pysqlcipher3": mock_pkg, "pysqlcipher3.dbapi2": mock_module}):
            result = _decrypt_db_to_bytes(fake_db, "testkey")
        mock_module.connect.assert_called_once_with(str(fake_db))
        assert result == plain_bytes

    def test_encrypt_db_from_bytes_calls_sqlcipher(self, tmp_path):
        """_encrypt_db_from_bytes imports pysqlcipher3.dbapi2 and calls sqlcipher_export."""
        dest = tmp_path / "staging.db"
        plain_bytes = b"SQLite format 3\x00"
        mock_conn = MagicMock()
        mock_module = MagicMock()
        mock_module.connect.return_value = mock_conn
        mock_pkg = MagicMock()
        mock_pkg.dbapi2 = mock_module
        with patch.dict("sys.modules", {"pysqlcipher3": mock_pkg, "pysqlcipher3.dbapi2": mock_module}):
            _encrypt_db_from_bytes(plain_bytes, dest, "testkey")
        mock_module.connect.assert_called_once()
        # Verify ATTACH DATABASE call included the dest path and key
        attach_calls = [
            call for call in mock_conn.execute.call_args_list
            if "ATTACH DATABASE" in str(call)
        ]
        assert len(attach_calls) == 1
        assert str(dest) in str(attach_calls[0])
        assert "testkey" in str(attach_calls[0])
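The `patch.dict("sys.modules", ...)` idiom these tests rely on is worth isolating: `from pkg import name` resolves `name` via attribute lookup on the package object, so the attribute must be set explicitly on the package mock, or MagicMock will auto-create a fresh child that the code under test never sees. A self-contained demonstration with a made-up module name:

```python
import sys
from unittest.mock import MagicMock, patch

fake_api = MagicMock()
fake_api.connect.return_value = "fake-connection"

fake_pkg = MagicMock()
fake_pkg.dbapi2 = fake_api  # explicit: `from pkg import dbapi2` reads this attribute

with patch.dict(sys.modules, {"fakecipher": fake_pkg, "fakecipher.dbapi2": fake_api}):
    # The import machinery finds "fakecipher" in sys.modules, then resolves
    # dbapi2 by getattr — landing on fake_api, not a new auto-mock.
    from fakecipher import dbapi2
    conn = dbapi2.connect("/tmp/x.db")

fake_api.connect.assert_called_once_with("/tmp/x.db")
```

Outside the `with` block, `patch.dict` restores `sys.modules`, so the fake module leaks into no other test.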


@@ -0,0 +1,96 @@
import pytest
import os
from unittest.mock import patch, MagicMock
from pathlib import Path


def test_resolve_session_is_noop_in_local_mode(monkeypatch):
    """resolve_session() does nothing when CLOUD_MODE is not set."""
    monkeypatch.delenv("CLOUD_MODE", raising=False)
    # Must reimport after env change
    import importlib
    import app.cloud_session as cs
    importlib.reload(cs)
    # Should return without touching st
    cs.resolve_session("peregrine")  # no error = pass


def test_resolve_session_sets_db_path(tmp_path, monkeypatch):
    """resolve_session() sets st.session_state.db_path from a valid JWT."""
    monkeypatch.setenv("CLOUD_MODE", "true")
    import importlib
    import app.cloud_session as cs
    importlib.reload(cs)
    mock_state = {}
    with patch.object(cs, "validate_session_jwt", return_value="user-uuid-123"), \
         patch.object(cs, "st") as mock_st, \
         patch.object(cs, "CLOUD_DATA_ROOT", tmp_path):
        mock_st.session_state = mock_state
        mock_st.context.headers = {"x-cf-session": "cf_session=valid.jwt.token"}
        cs.resolve_session("peregrine")
    assert mock_state["user_id"] == "user-uuid-123"
    assert mock_state["db_path"] == tmp_path / "user-uuid-123" / "peregrine" / "staging.db"


def test_resolve_session_creates_user_dir(tmp_path, monkeypatch):
    """resolve_session() creates the user data directory on first login."""
    monkeypatch.setenv("CLOUD_MODE", "true")
    import importlib
    import app.cloud_session as cs
    importlib.reload(cs)
    mock_state = {}
    with patch.object(cs, "validate_session_jwt", return_value="new-user"), \
         patch.object(cs, "st") as mock_st, \
         patch.object(cs, "CLOUD_DATA_ROOT", tmp_path):
        mock_st.session_state = mock_state
        mock_st.context.headers = {"x-cf-session": "cf_session=valid.jwt.token"}
        cs.resolve_session("peregrine")
    assert (tmp_path / "new-user" / "peregrine").is_dir()
    assert (tmp_path / "new-user" / "peregrine" / "config").is_dir()
    assert (tmp_path / "new-user" / "peregrine" / "data").is_dir()


def test_resolve_session_idempotent(monkeypatch):
    """resolve_session() skips if user_id already in session state."""
    monkeypatch.setenv("CLOUD_MODE", "true")
    import importlib
    import app.cloud_session as cs
    importlib.reload(cs)
    with patch.object(cs, "st") as mock_st:
        mock_st.session_state = {"user_id": "existing-user"}
        # Should not try to read headers or validate JWT
        cs.resolve_session("peregrine")
        # context.headers should never be accessed
        mock_st.context.headers.__getitem__.assert_not_called()


def test_get_db_path_returns_session_path(tmp_path, monkeypatch):
    """get_db_path() returns session-scoped path when set."""
    import importlib
    import app.cloud_session as cs
    importlib.reload(cs)
    session_db = tmp_path / "staging.db"
    with patch.object(cs, "st") as mock_st:
        mock_st.session_state = {"db_path": session_db}
        result = cs.get_db_path()
    assert result == session_db


def test_get_db_path_falls_back_to_default(monkeypatch):
    """get_db_path() returns DEFAULT_DB when no session path set."""
    monkeypatch.delenv("CLOUD_MODE", raising=False)
    import importlib
    import app.cloud_session as cs
    importlib.reload(cs)
    from scripts.db import DEFAULT_DB
    with patch.object(cs, "st") as mock_st:
        mock_st.session_state = {}
        result = cs.get_db_path()
    assert result == DEFAULT_DB


@@ -576,3 +576,17 @@ def test_insert_task_with_params(tmp_path):
    params2 = json.dumps({"section": "job_titles"})
    task_id3, is_new3 = insert_task(db, "wizard_generate", 0, params=params2)
    assert is_new3 is True


def test_get_connection_local_mode(tmp_path):
    """get_connection() returns a working sqlite3 connection in local mode (no key)."""
    from scripts.db import get_connection

    db = tmp_path / "test_conn.db"
    conn = get_connection(db)
    conn.execute("CREATE TABLE t (x INTEGER)")
    conn.execute("INSERT INTO t VALUES (42)")
    conn.commit()
    result = conn.execute("SELECT x FROM t").fetchone()
    conn.close()
    assert result[0] == 42
    assert db.exists()

tests/test_telemetry.py — new file, 85 lines added

@@ -0,0 +1,85 @@
import pytest
import os
from unittest.mock import patch, MagicMock, call


def test_no_op_in_local_mode(monkeypatch):
    """log_usage_event() is completely silent when CLOUD_MODE is not set."""
    monkeypatch.delenv("CLOUD_MODE", raising=False)
    import importlib
    import app.telemetry as tel
    importlib.reload(tel)
    # Should not raise, should not touch anything
    tel.log_usage_event("user-1", "peregrine", "any_event")


def test_event_not_logged_when_all_disabled(monkeypatch):
    """No DB write when telemetry all_disabled is True."""
    monkeypatch.setenv("CLOUD_MODE", "true")
    import importlib
    import app.telemetry as tel
    importlib.reload(tel)
    mock_conn = MagicMock()
    mock_cursor = MagicMock()
    mock_conn.cursor.return_value.__enter__ = MagicMock(return_value=mock_cursor)
    mock_conn.cursor.return_value.__exit__ = MagicMock(return_value=False)
    with patch.object(tel, "get_platform_conn", return_value=mock_conn), \
         patch.object(tel, "get_consent", return_value={"all_disabled": True, "usage_events_enabled": True}):
        tel.log_usage_event("user-1", "peregrine", "cover_letter_generated")
    mock_cursor.execute.assert_not_called()


def test_event_not_logged_when_usage_events_disabled(monkeypatch):
    """No DB write when usage_events_enabled is False."""
    monkeypatch.setenv("CLOUD_MODE", "true")
    import importlib
    import app.telemetry as tel
    importlib.reload(tel)
    mock_conn = MagicMock()
    mock_cursor = MagicMock()
    mock_conn.cursor.return_value.__enter__ = MagicMock(return_value=mock_cursor)
    mock_conn.cursor.return_value.__exit__ = MagicMock(return_value=False)
    with patch.object(tel, "get_platform_conn", return_value=mock_conn), \
         patch.object(tel, "get_consent", return_value={"all_disabled": False, "usage_events_enabled": False}):
        tel.log_usage_event("user-1", "peregrine", "cover_letter_generated")
    mock_cursor.execute.assert_not_called()


def test_event_logged_when_consent_given(monkeypatch):
    """Usage event is written to usage_events table when consent is given."""
    monkeypatch.setenv("CLOUD_MODE", "true")
    import importlib
    import app.telemetry as tel
    importlib.reload(tel)
    mock_conn = MagicMock()
    mock_cursor = MagicMock()
    mock_conn.cursor.return_value.__enter__ = MagicMock(return_value=mock_cursor)
    mock_conn.cursor.return_value.__exit__ = MagicMock(return_value=False)
    with patch.object(tel, "get_platform_conn", return_value=mock_conn), \
         patch.object(tel, "get_consent", return_value={"all_disabled": False, "usage_events_enabled": True}):
        tel.log_usage_event("user-1", "peregrine", "cover_letter_generated", {"words": 350})
    mock_cursor.execute.assert_called_once()
    sql = mock_cursor.execute.call_args[0][0]
    assert "usage_events" in sql
    mock_conn.commit.assert_called_once()


def test_telemetry_never_crashes_app(monkeypatch):
    """log_usage_event() swallows all exceptions — must never crash the app."""
    monkeypatch.setenv("CLOUD_MODE", "true")
    import importlib
    import app.telemetry as tel
    importlib.reload(tel)
    with patch.object(tel, "get_platform_conn", side_effect=Exception("DB down")):
        # Should not raise
        tel.log_usage_event("user-1", "peregrine", "any_event")
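The last test pins down the telemetry contract: consent gates decide whether anything is written, and no exception may escape to the caller. That is typically a single try/except around the whole body — a minimal sketch with an injected `write_fn` standing in for the real platform-DB insert (function and consent shape are illustrative, not the module's real API):

```python
import logging

log = logging.getLogger("telemetry-sketch")

def log_usage_event_sketch(write_fn, consent: dict, user_id: str, event: str) -> bool:
    """Best-effort event write: returns False (and stays silent) on any failure."""
    try:
        if consent.get("all_disabled") or not consent.get("usage_events_enabled"):
            return False  # consent gate — skip the write entirely
        write_fn(user_id, event)
        return True
    except Exception:
        # Swallow everything: telemetry must never take the app down.
        log.debug("telemetry write failed", exc_info=True)
        return False

def _boom(user_id, event):
    raise RuntimeError("DB down")

consent = {"all_disabled": False, "usage_events_enabled": True}
ok = log_usage_event_sketch(lambda u, e: None, consent, "user-1", "cover_letter_generated")
failed = log_usage_event_sketch(_boom, consent, "user-1", "any_event")
gated = log_usage_event_sketch(lambda u, e: None, {"all_disabled": True}, "user-1", "x")
```

Logging the swallowed exception at debug level keeps failures diagnosable without surfacing them to users.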