feat(demo): add IS_DEMO write-block guard on mutating endpoints

This commit is contained in:
pyr0ball 2026-04-15 23:02:10 -07:00
parent cd5bd80a2a
commit 854b18ecbd
3 changed files with 114 additions and 2 deletions

View file

@ -46,6 +46,7 @@ DB_PATH = os.environ.get("STAGING_DB", "/devl/job-seeker/staging.db")
_CLOUD_MODE = os.environ.get("CLOUD_MODE", "").lower() in ("1", "true")
_CLOUD_DATA_ROOT = Path(os.environ.get("CLOUD_DATA_ROOT", "/devl/menagerie-data"))
_DIRECTUS_SECRET = os.environ.get("DIRECTUS_JWT_SECRET", "")
IS_DEMO: bool = os.environ.get("DEMO_MODE", "").lower() in ("1", "true", "yes")
# Per-request DB path — set by cloud_session_middleware; falls back to DB_PATH
_request_db: ContextVar[str | None] = ContextVar("_request_db", default=None)
@ -81,6 +82,12 @@ app.include_router(_feedback_router, prefix="/api/feedback")
_log = logging.getLogger("peregrine.session")
def _demo_guard() -> None:
"""Raise 403 if running in demo mode. Call at the top of any write endpoint."""
if IS_DEMO:
raise HTTPException(status_code=403, detail="demo-write-blocked")
def _resolve_cf_user_id(cookie_str: str) -> str | None:
"""Extract cf_session JWT from Cookie string and return Directus user_id.
@ -296,6 +303,7 @@ def job_counts():
@app.post("/api/jobs/{job_id}/approve")
def approve_job(job_id: int):
_demo_guard()
db = _get_db()
db.execute("UPDATE jobs SET status = 'approved' WHERE id = ?", (job_id,))
db.commit()
@ -307,6 +315,7 @@ def approve_job(job_id: int):
@app.post("/api/jobs/{job_id}/reject")
def reject_job(job_id: int):
_demo_guard()
db = _get_db()
db.execute("UPDATE jobs SET status = 'rejected' WHERE id = ?", (job_id,))
db.commit()
@ -396,6 +405,7 @@ def save_cover_letter(job_id: int, body: CoverLetterBody):
@app.post("/api/jobs/{job_id}/cover_letter/generate")
def generate_cover_letter(job_id: int):
_demo_guard()
try:
from scripts.task_runner import submit_task
task_id, is_new = submit_task(
@ -1520,6 +1530,7 @@ class HiredFeedbackPayload(BaseModel):
@app.post("/api/jobs/{job_id}/hired-feedback")
def save_hired_feedback(job_id: int, payload: HiredFeedbackPayload):
_demo_guard()
db = _get_db()
row = db.execute("SELECT status FROM jobs WHERE id = ?", (job_id,)).fetchone()
if not row:

89
tests/test_demo_guard.py Normal file
View file

@ -0,0 +1,89 @@
"""IS_DEMO write-block guard tests."""
import importlib
import os
import sqlite3
import pytest
from fastapi.testclient import TestClient
_SCHEMA = """
CREATE TABLE jobs (
id INTEGER PRIMARY KEY, title TEXT, company TEXT, url TEXT,
location TEXT, is_remote INTEGER DEFAULT 0, salary TEXT,
match_score REAL, keyword_gaps TEXT, status TEXT DEFAULT 'pending',
date_found TEXT, cover_letter TEXT, interview_date TEXT,
rejection_stage TEXT, applied_at TEXT, phone_screen_at TEXT,
interviewing_at TEXT, offer_at TEXT, hired_at TEXT,
survey_at TEXT, date_posted TEXT, hired_feedback TEXT
);
CREATE TABLE background_tasks (
id INTEGER PRIMARY KEY, task_type TEXT, job_id INTEGER,
status TEXT DEFAULT 'queued', finished_at TEXT
);
"""
def _make_db(path: str) -> None:
con = sqlite3.connect(path)
con.executescript(_SCHEMA)
con.execute(
"INSERT INTO jobs (id, title, company, url, status) VALUES (1,'UX Designer','Spotify','https://ex.com/1','pending')"
)
con.execute(
"INSERT INTO jobs (id, title, company, url, status) VALUES (2,'Designer','Figma','https://ex.com/2','hired')"
)
con.commit()
con.close()
@pytest.fixture()
def demo_client(tmp_path, monkeypatch):
db_path = str(tmp_path / "staging.db")
_make_db(db_path)
monkeypatch.setenv("DEMO_MODE", "true")
monkeypatch.setenv("STAGING_DB", db_path)
import dev_api
importlib.reload(dev_api)
return TestClient(dev_api.app)
@pytest.fixture()
def normal_client(tmp_path, monkeypatch):
db_path = str(tmp_path / "staging.db")
_make_db(db_path)
monkeypatch.delenv("DEMO_MODE", raising=False)
monkeypatch.setenv("STAGING_DB", db_path)
import dev_api
importlib.reload(dev_api)
return TestClient(dev_api.app)
class TestDemoWriteBlock:
def test_approve_blocked_in_demo(self, demo_client):
r = demo_client.post("/api/jobs/1/approve")
assert r.status_code == 403
assert r.json()["detail"] == "demo-write-blocked"
def test_reject_blocked_in_demo(self, demo_client):
r = demo_client.post("/api/jobs/1/reject")
assert r.status_code == 403
assert r.json()["detail"] == "demo-write-blocked"
def test_cover_letter_generate_blocked_in_demo(self, demo_client):
r = demo_client.post("/api/jobs/1/cover_letter/generate")
assert r.status_code == 403
assert r.json()["detail"] == "demo-write-blocked"
def test_hired_feedback_blocked_in_demo(self, demo_client):
r = demo_client.post("/api/jobs/2/hired-feedback", json={"factors": [], "notes": ""})
assert r.status_code == 403
assert r.json()["detail"] == "demo-write-blocked"
def test_approve_allowed_in_normal_mode(self, normal_client):
r = normal_client.post("/api/jobs/1/approve")
assert r.status_code != 403
def test_config_reports_is_demo_true(self, demo_client):
r = demo_client.get("/api/config/app")
assert r.status_code == 200
assert r.json()["isDemo"] is True

View file

@ -1,3 +1,5 @@
import { showToast } from './useToast'
export type ApiError =
| { kind: 'network'; message: string }
| { kind: 'http'; status: number; detail: string }
@ -12,8 +14,18 @@ export async function useApiFetch<T>(
try {
const res = await fetch(_apiBase + url, opts)
if (!res.ok) {
const detail = await res.text().catch(() => '')
return { data: null, error: { kind: 'http', status: res.status, detail } }
const rawText = await res.text().catch(() => '')
// Demo mode: show toast and swallow the error so callers don't need to handle it
if (res.status === 403) {
try {
const body = JSON.parse(rawText) as { detail?: string }
if (body.detail === 'demo-write-blocked') {
showToast('Demo mode — sign in to save changes')
return { data: null, error: null }
}
} catch { /* not JSON — fall through to normal error */ }
}
return { data: null, error: { kind: 'http', status: res.status, detail: rawText } }
}
const data = await res.json() as T
return { data, error: null }