feat: merge feedback-button branch — BYOK warning, PII scrub, LLM suggest, sidebar indicator

Key changes in this branch:
- BYOK cloud backend detection (scripts/byok_guard.py) with full test coverage
- Sidebar amber badge when any cloud LLM backend is active
- Activation warning + acknowledgment required when enabling cloud backend in Settings
- Privacy policy reference doc added
- Suggest search terms, resume keywords, and LLM suggest button in Settings
- Test suite anonymized: real personal data replaced with fictional Alex Rivera
- Full PII scrub from git history (name, email, phone number)
- Digest email parser design doc
- Settings widget crash fixes, Docker service controls, backup/restore script
This commit is contained in:
pyr0ball 2026-03-06 16:01:44 -08:00
commit 3fae4de3ad
21 changed files with 2158 additions and 76 deletions

View file

@ -20,3 +20,10 @@ OLLAMA_DEFAULT_MODEL=llama3.2:3b
ANTHROPIC_API_KEY=
OPENAI_COMPAT_URL=
OPENAI_COMPAT_KEY=
# Feedback button — Forgejo issue filing
FORGEJO_API_TOKEN=
FORGEJO_REPO=pyr0ball/peregrine
FORGEJO_API_URL=https://git.opensourcesolarpunk.com/api/v1
# GITHUB_TOKEN= # future — enable when public mirror is active
# GITHUB_REPO= # future

7
PRIVACY.md Normal file
View file

@ -0,0 +1,7 @@
# Privacy Policy
CircuitForge LLC's privacy policy applies to this product and is published at:
**<https://circuitforge.tech/privacy>**
Last reviewed: March 2026.

View file

@ -25,17 +25,45 @@ from scripts.task_runner import submit_task
init_db(DEFAULT_DB)
def _email_configured() -> bool:
_e = Path(__file__).parent.parent / "config" / "email.yaml"
if not _e.exists():
return False
import yaml as _yaml
_cfg = _yaml.safe_load(_e.read_text()) or {}
return bool(_cfg.get("username") or _cfg.get("user") or _cfg.get("imap_host"))
def _notion_configured() -> bool:
_n = Path(__file__).parent.parent / "config" / "notion.yaml"
if not _n.exists():
return False
import yaml as _yaml
_cfg = _yaml.safe_load(_n.read_text()) or {}
return bool(_cfg.get("token"))
def _keywords_configured() -> bool:
_k = Path(__file__).parent.parent / "config" / "resume_keywords.yaml"
if not _k.exists():
return False
import yaml as _yaml
_cfg = _yaml.safe_load(_k.read_text()) or {}
return bool(_cfg.get("keywords") or _cfg.get("required") or _cfg.get("preferred"))
_SETUP_BANNERS = [
{"key": "connect_cloud", "text": "Connect a cloud service for resume/cover letter storage",
"link_label": "Settings → Integrations"},
"link_label": "Settings → Integrations",
"done": _notion_configured},
{"key": "setup_email", "text": "Set up email sync to catch recruiter outreach",
"link_label": "Settings → Email"},
"link_label": "Settings → Email",
"done": _email_configured},
{"key": "setup_email_labels", "text": "Set up email label filters for auto-classification",
"link_label": "Settings → Email (label guide)"},
"link_label": "Settings → Email (label guide)",
"done": _email_configured},
{"key": "tune_mission", "text": "Tune your mission preferences for better cover letters",
"link_label": "Settings → My Profile"},
{"key": "configure_keywords", "text": "Configure keywords and blocklist for smarter search",
"link_label": "Settings → Search"},
"link_label": "Settings → Search",
"done": _keywords_configured},
{"key": "upload_corpus", "text": "Upload your cover letter corpus for voice fine-tuning",
"link_label": "Settings → Fine-Tune"},
{"key": "configure_linkedin", "text": "Configure LinkedIn Easy Apply automation",
@ -513,7 +541,10 @@ with st.expander("⚠️ Danger Zone", expanded=False):
# ── Setup banners ─────────────────────────────────────────────────────────────
if _profile and _profile.wizard_complete:
_dismissed = set(_profile.dismissed_banners)
_pending_banners = [b for b in _SETUP_BANNERS if b["key"] not in _dismissed]
_pending_banners = [
b for b in _SETUP_BANNERS
if b["key"] not in _dismissed and not b.get("done", lambda: False)()
]
if _pending_banners:
st.divider()
st.markdown("#### Finish setting up Peregrine")

View file

@ -21,6 +21,7 @@ IS_DEMO = os.environ.get("DEMO_MODE", "").lower() in ("1", "true", "yes")
import streamlit as st
from scripts.db import DEFAULT_DB, init_db, get_active_tasks
from app.feedback import inject_feedback_button
import sqlite3
st.set_page_config(
@ -162,7 +163,27 @@ with st.sidebar:
icon="🔒",
)
_task_indicator()
# Cloud LLM indicator — shown whenever any cloud backend is active
_llm_cfg_path = Path(__file__).parent.parent / "config" / "llm.yaml"
try:
import yaml as _yaml
from scripts.byok_guard import cloud_backends as _cloud_backends
_active_cloud = _cloud_backends(_yaml.safe_load(_llm_cfg_path.read_text(encoding="utf-8")) or {})
except Exception:
_active_cloud = []
if _active_cloud:
_provider_names = ", ".join(b.replace("_", " ").title() for b in _active_cloud)
st.warning(
f"**Cloud LLM active**\n\n"
f"{_provider_names}\n\n"
"AI features send content to this provider. "
"[Change in Settings](2_Settings)",
icon="🔓",
)
st.divider()
st.caption(f"Peregrine {_get_version()}")
inject_feedback_button(page=pg.title)
pg.run()

View file

@ -0,0 +1,31 @@
"""
Paste-from-clipboard / drag-and-drop image component.
Uses st.components.v1.declare_component so JS can return image bytes to Python
(st.components.v1.html() is one-way only). No build step required the
frontend is a single index.html file.
"""
from __future__ import annotations
import base64
from pathlib import Path
import streamlit.components.v1 as components
_FRONTEND = Path(__file__).parent / "paste_image_ui"
_paste_image = components.declare_component("paste_image", path=str(_FRONTEND))
def paste_image_component(key: str | None = None) -> bytes | None:
    """
    Render the paste/drop zone and return the submitted image, if any.

    The frontend posts back a base64 string; this wrapper decodes it to raw
    PNG/JPEG bytes. Returns None while nothing has been pasted or dropped,
    and also None if the payload fails to decode.
    """
    raw = _paste_image(key=key)
    if not raw:
        return None
    try:
        decoded = base64.b64decode(raw)
    except Exception:
        return None
    return decoded

View file

@ -0,0 +1,142 @@
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<style>
* { box-sizing: border-box; margin: 0; padding: 0; }
body {
font-family: -apple-system, BlinkMacSystemFont, "Source Sans Pro", sans-serif;
background: transparent;
}
.zone {
width: 100%;
min-height: 72px;
border: 2px dashed var(--border, #ccc);
border-radius: 8px;
display: flex;
align-items: center;
justify-content: center;
flex-direction: column;
gap: 6px;
padding: 12px 16px;
cursor: pointer;
outline: none;
transition: border-color 0.15s, background 0.15s;
color: var(--text-muted, #888);
font-size: 13px;
text-align: center;
user-select: none;
}
.zone:focus { border-color: var(--primary, #ff4b4b); background: var(--primary-faint, rgba(255,75,75,0.06)); }
.zone.dragover { border-color: var(--primary, #ff4b4b); background: var(--primary-faint, rgba(255,75,75,0.06)); }
.zone.done { border-style: solid; border-color: #00c853; color: #00c853; }
.icon { font-size: 22px; line-height: 1; }
.hint { font-size: 11px; opacity: 0.7; }
.status { margin-top: 5px; font-size: 11px; text-align: center; color: var(--text-muted, #888); min-height: 16px; }
</style>
</head>
<body>
<div class="zone" id="zone" tabindex="0" role="button"
aria-label="Click to focus, then paste with Ctrl+V, or drag and drop an image">
<span class="icon">📋</span>
<span id="mainMsg"><strong>Click here</strong>, then <strong>Ctrl+V</strong> to paste</span>
<span class="hint" id="hint">or drag &amp; drop an image file</span>
</div>
<div class="status" id="status"></div>
<script>
const zone = document.getElementById('zone');
const status = document.getElementById('status');
const mainMsg = document.getElementById('mainMsg');
const hint = document.getElementById('hint');
// ── Streamlit handshake ─────────────────────────────────────────────────
window.parent.postMessage({ type: "streamlit:componentReady", apiVersion: 1 }, "*");
function setHeight() {
const h = document.body.scrollHeight + 4;
window.parent.postMessage({ type: "streamlit:setFrameHeight", height: h }, "*");
}
setHeight();
// ── Theme ───────────────────────────────────────────────────────────────
window.addEventListener("message", (e) => {
if (e.data && e.data.type === "streamlit:render") {
const t = e.data.args && e.data.args.theme;
if (!t) return;
const r = document.documentElement;
r.style.setProperty("--primary", t.primaryColor || "#ff4b4b");
r.style.setProperty("--primary-faint", (t.primaryColor || "#ff4b4b") + "10");
r.style.setProperty("--text-muted", t.textColor ? t.textColor + "99" : "#888");
r.style.setProperty("--border", t.textColor ? t.textColor + "33" : "#ccc");
document.body.style.background = t.backgroundColor || "transparent";
}
});
// ── Image handling ──────────────────────────────────────────────────────
// Replace the drop-zone contents with a green "image ready" confirmation
// once a screenshot has been captured, then resize the iframe to fit.
function markDone() {
    zone.classList.add('done');
    // Clear children and rebuild with safe DOM methods
    while (zone.firstChild) zone.removeChild(zone.firstChild);
    const icon = document.createElement('span');
    icon.className = 'icon';
    icon.textContent = '\u2705';
    const msg = document.createElement('span');
    msg.textContent = 'Image ready \u2014 remove or replace below';
    zone.appendChild(icon);
    zone.appendChild(msg);
    setHeight();
}
// Read the image blob as a data URL, strip the "data:...;base64," prefix,
// and post the raw base64 payload back to the Streamlit host as the
// component's return value.
function sendImage(blob) {
    const reader = new FileReader();
    reader.onload = function(ev) {
        const dataUrl = ev.target.result;
        // Everything after the first comma is the base64 body.
        const b64 = dataUrl.slice(dataUrl.indexOf(',') + 1);
        window.parent.postMessage({ type: "streamlit:setComponentValue", value: b64 }, "*");
        markDone();
    };
    reader.readAsDataURL(blob);
}
// Return the first clipboard/drag item whose MIME type is an image,
// or null when the list is absent or contains no image entries.
function findImageItem(items) {
    if (!items) return null;
    let idx = 0;
    while (idx < items.length) {
        const candidate = items[idx++];
        if (candidate.type && candidate.type.indexOf('image/') === 0) {
            return candidate;
        }
    }
    return null;
}
// Ctrl+V paste (works over HTTP — uses paste event, not Clipboard API)
document.addEventListener('paste', function(e) {
const item = findImageItem(e.clipboardData && e.clipboardData.items);
if (item) { sendImage(item.getAsFile()); e.preventDefault(); }
});
// Drag and drop
zone.addEventListener('dragover', function(e) {
e.preventDefault();
zone.classList.add('dragover');
});
zone.addEventListener('dragleave', function() {
zone.classList.remove('dragover');
});
zone.addEventListener('drop', function(e) {
e.preventDefault();
zone.classList.remove('dragover');
const files = e.dataTransfer && e.dataTransfer.files;
if (files && files.length) {
for (let i = 0; i < files.length; i++) {
if (files[i].type.indexOf('image/') === 0) { sendImage(files[i]); return; }
}
}
// Fallback: dataTransfer items (e.g. dragged from browser)
const item = findImageItem(e.dataTransfer && e.dataTransfer.items);
if (item) sendImage(item.getAsFile());
});
// Click to focus so Ctrl+V lands in this iframe
zone.addEventListener('click', function() { zone.focus(); });
</script>
</body>
</html>

247
app/feedback.py Normal file
View file

@ -0,0 +1,247 @@
"""
Floating feedback button + dialog thin Streamlit shell.
All business logic lives in scripts/feedback_api.py.
"""
from __future__ import annotations
import os
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent))
import streamlit as st
# ── CSS: float the button to the bottom-right corner ─────────────────────────
# Targets the button by its aria-label (set via `help=` parameter).
_FLOAT_CSS = """
<style>
button[aria-label="Send feedback or report a bug"] {
position: fixed !important;
bottom: 2rem !important;
right: 2rem !important;
z-index: 9999 !important;
border-radius: 25px !important;
padding: 0.5rem 1.25rem !important;
box-shadow: 0 4px 16px rgba(0,0,0,0.25) !important;
font-size: 0.9rem !important;
}
</style>
"""
@st.dialog("Send Feedback", width="large")
def _feedback_dialog(page: str) -> None:
"""Two-step feedback dialog: form → consent/attachments → submit."""
from scripts.feedback_api import (
collect_context, collect_logs, collect_listings,
build_issue_body, create_forgejo_issue, upload_attachment,
)
from scripts.db import DEFAULT_DB
# ── Initialise step counter ───────────────────────────────────────────────
if "fb_step" not in st.session_state:
st.session_state.fb_step = 1
# ═════════════════════════════════════════════════════════════════════════
# STEP 1 — Form
# ═════════════════════════════════════════════════════════════════════════
if st.session_state.fb_step == 1:
st.subheader("What's on your mind?")
fb_type = st.selectbox(
"Type", ["Bug", "Feature Request", "Other"], key="fb_type"
)
fb_title = st.text_input(
"Title", placeholder="Short summary of the issue or idea", key="fb_title"
)
fb_desc = st.text_area(
"Description",
placeholder="Describe what happened or what you'd like to see...",
key="fb_desc",
)
if fb_type == "Bug":
st.text_area(
"Reproduction steps",
placeholder="1. Go to...\n2. Click...\n3. See error",
key="fb_repro",
)
col_cancel, _, col_next = st.columns([1, 3, 1])
with col_cancel:
if st.button("Cancel"):
_clear_feedback_state()
st.rerun() # intentionally closes the dialog
with col_next:
if st.button("Next →", type="primary"):
# Read widget values NOW (same rerun as the click — values are
# available here even on first click). Copy to non-widget keys
# so they survive step 2's render (Streamlit removes widget
# state for widgets that are no longer rendered).
title = fb_title.strip()
desc = fb_desc.strip()
if not title or not desc:
st.error("Please fill in both Title and Description.")
else:
st.session_state.fb_data_type = fb_type
st.session_state.fb_data_title = title
st.session_state.fb_data_desc = desc
st.session_state.fb_data_repro = st.session_state.get("fb_repro", "")
st.session_state.fb_step = 2
# ═════════════════════════════════════════════════════════════════════════
# STEP 2 — Consent + attachments
# ═════════════════════════════════════════════════════════════════════════
elif st.session_state.fb_step == 2:
st.subheader("Optional: attach diagnostic data")
# ── Diagnostic data toggle + preview ─────────────────────────────────
include_diag = st.toggle(
"Include diagnostic data (logs + recent listings)", key="fb_diag"
)
if include_diag:
with st.expander("Preview what will be sent", expanded=True):
st.caption("**App logs (last 100 lines, PII masked):**")
st.code(collect_logs(100), language=None)
st.caption("**Recent listings (title / company / URL only):**")
for j in collect_listings(DEFAULT_DB, 5):
st.write(f"- {j['title']} @ {j['company']}{j['url']}")
# ── Screenshot ────────────────────────────────────────────────────────
st.divider()
st.caption("**Screenshot** (optional)")
from app.components.paste_image import paste_image_component
# Keyed so we can reset the component when the user removes the image
if "fb_paste_key" not in st.session_state:
st.session_state.fb_paste_key = 0
pasted = paste_image_component(key=f"fb_paste_{st.session_state.fb_paste_key}")
if pasted:
st.session_state.fb_screenshot = pasted
st.caption("or upload a file:")
uploaded = st.file_uploader(
"Upload screenshot",
type=["png", "jpg", "jpeg"],
label_visibility="collapsed",
key="fb_upload",
)
if uploaded:
st.session_state.fb_screenshot = uploaded.read()
if st.session_state.get("fb_screenshot"):
st.image(
st.session_state["fb_screenshot"],
caption="Screenshot preview — this will be attached to the issue",
use_container_width=True,
)
if st.button("🗑 Remove screenshot"):
st.session_state.pop("fb_screenshot", None)
st.session_state.fb_paste_key = st.session_state.get("fb_paste_key", 0) + 1
# no st.rerun() — button click already re-renders the dialog
# ── Attribution consent ───────────────────────────────────────────────
st.divider()
submitter: str | None = None
try:
import yaml
_ROOT = Path(__file__).parent.parent
user = yaml.safe_load((_ROOT / "config" / "user.yaml").read_text()) or {}
name = (user.get("name") or "").strip()
email = (user.get("email") or "").strip()
if name or email:
label = f"Include my name & email in the report: **{name}** ({email})"
if st.checkbox(label, key="fb_attr"):
submitter = f"{name} <{email}>"
except Exception:
pass
# ── Navigation ────────────────────────────────────────────────────────
col_back, _, col_submit = st.columns([1, 3, 2])
with col_back:
if st.button("← Back"):
st.session_state.fb_step = 1
# no st.rerun() — button click already re-renders the dialog
with col_submit:
if st.button("Submit Feedback", type="primary"):
_submit(page, include_diag, submitter, collect_context,
collect_logs, collect_listings, build_issue_body,
create_forgejo_issue, upload_attachment, DEFAULT_DB)
def _submit(page, include_diag, submitter, collect_context, collect_logs,
            collect_listings, build_issue_body, create_forgejo_issue,
            upload_attachment, db_path) -> None:
    """Handle form submission: build body, file issue, upload screenshot."""
    with st.spinner("Filing issue…"):
        # Context + optional diagnostic extras gathered before building body.
        ctx = collect_context(page)
        extras: dict = {}
        if include_diag:
            extras["logs"] = collect_logs(100)
            extras["listings"] = collect_listings(db_path, 5)
        if submitter:
            extras["submitter"] = submitter
        # Map the human-facing type label to its internal key and labels.
        raw_type = st.session_state.get("fb_data_type", "Other")
        type_map = {"Bug": "bug", "Feature Request": "feature", "Other": "other"}
        type_key = type_map.get(raw_type, "other")
        label_for_type = {"bug": "bug", "feature": "feature-request"}
        labels = ["beta-feedback", "needs-triage",
                  label_for_type.get(type_key, "question")]
        # Repro steps are only meaningful for bugs.
        repro = st.session_state.get("fb_data_repro", "") if type_key == "bug" else ""
        form = {
            "type": type_key,
            "description": st.session_state.get("fb_data_desc", ""),
            "repro": repro,
        }
        body = build_issue_body(form, ctx, extras)
        try:
            issue = create_forgejo_issue(
                st.session_state.get("fb_data_title", "Feedback"), body, labels
            )
            shot = st.session_state.get("fb_screenshot")
            if shot:
                upload_attachment(issue["number"], shot)
            _clear_feedback_state()
            st.success(f"Issue filed! [View on Forgejo]({issue['url']})")
            st.balloons()
        except Exception as exc:
            st.error(f"Failed to file issue: {exc}")
def _clear_feedback_state() -> None:
    """Drop all dialog-related session-state keys so the next open starts fresh."""
    _KEYS = (
        "fb_step",
        # widget keys
        "fb_type", "fb_title", "fb_desc", "fb_repro",
        # saved data copied out of step 1
        "fb_data_type", "fb_data_title", "fb_data_desc", "fb_data_repro",
        "fb_diag", "fb_upload", "fb_attr", "fb_screenshot", "fb_paste_key",
    )
    for name in _KEYS:
        st.session_state.pop(name, None)
def inject_feedback_button(page: str = "Unknown") -> None:
"""
Inject the floating feedback button. Call once per page render in app.py.
Hidden automatically in DEMO_MODE.
"""
if os.environ.get("DEMO_MODE", "").lower() in ("1", "true", "yes"):
return
if not os.environ.get("FORGEJO_API_TOKEN"):
return # silently skip if not configured
st.markdown(_FLOAT_CSS, unsafe_allow_html=True)
if st.button(
"💬 Feedback",
key="__feedback_floating_btn__",
help="Send feedback or report a bug",
):
_feedback_dialog(page)

View file

@ -36,47 +36,18 @@ def save_yaml(path: Path, data: dict) -> None:
path.write_text(yaml.dump(data, default_flow_style=False, allow_unicode=True))
def _suggest_search_terms(current_titles: list[str], resume_path: Path) -> dict:
"""Call LLM to suggest additional job titles and exclude keywords."""
import json
import re
from scripts.llm_router import LLMRouter
from scripts.suggest_helpers import (
suggest_search_terms as _suggest_search_terms_impl,
suggest_resume_keywords as _suggest_resume_keywords,
)
resume_context = ""
if resume_path.exists():
resume = load_yaml(resume_path)
lines = []
for exp in (resume.get("experience_details") or [])[:3]:
pos = exp.get("position", "")
co = exp.get("company", "")
skills = ", ".join((exp.get("skills_acquired") or [])[:5])
lines.append(f"- {pos} at {co}: {skills}")
resume_context = "\n".join(lines)
titles_str = "\n".join(f"- {t}" for t in current_titles)
prompt = f"""You are helping a job seeker optimize their search criteria.
Their background (from resume):
{resume_context or "Customer success and technical account management leader"}
Current job titles being searched:
{titles_str}
Suggest:
1. 5-8 additional job titles they might be missing (alternative names, adjacent roles, senior variants)
2. 3-5 keywords to add to the exclusion filter (to screen out irrelevant postings)
Return ONLY valid JSON in this exact format:
{{"suggested_titles": ["Title 1", "Title 2"], "suggested_excludes": ["keyword 1", "keyword 2"]}}"""
result = LLMRouter().complete(prompt).strip()
m = re.search(r"\{.*\}", result, re.DOTALL)
if m:
try:
return json.loads(m.group())
except Exception:
pass
return {"suggested_titles": [], "suggested_excludes": []}
def _suggest_search_terms(current_titles, resume_path, blocklist=None, user_profile=None):
return _suggest_search_terms_impl(
current_titles,
resume_path,
blocklist or {},
user_profile or {},
)
_show_finetune = bool(_profile and _profile.inference_profile in ("single-gpu", "dual-gpu"))
@ -324,6 +295,18 @@ with tab_search:
st.session_state["_sp_excludes"] = "\n".join(p.get("exclude_keywords", []))
st.session_state["_sp_hash"] = _sp_hash
# Apply any pending programmatic updates BEFORE widgets are instantiated.
# Streamlit forbids writing to a widget's key after it renders on the same pass;
# button handlers write to *_pending keys instead, consumed here on the next pass.
for _pend, _wkey in [("_sp_titles_pending", "_sp_titles_multi"),
("_sp_locs_pending", "_sp_locations_multi"),
("_sp_new_title_pending", "_sp_new_title"),
("_sp_paste_titles_pending", "_sp_paste_titles"),
("_sp_new_loc_pending", "_sp_new_loc"),
("_sp_paste_locs_pending", "_sp_paste_locs")]:
if _pend in st.session_state:
st.session_state[_wkey] = st.session_state.pop(_pend)
# ── Titles ────────────────────────────────────────────────────────────────
_title_row, _suggest_btn_col = st.columns([4, 1])
with _title_row:
@ -331,7 +314,7 @@ with tab_search:
with _suggest_btn_col:
st.write("")
_run_suggest = st.button("✨ Suggest", key="sp_suggest_btn",
help="Ask the LLM to suggest additional titles and exclude keywords based on your resume")
help="Ask the LLM to suggest additional titles and smarter exclude keywords — using your blocklist, mission values, and career background.")
st.multiselect(
"Job titles",
@ -355,8 +338,8 @@ with tab_search:
st.session_state["_sp_title_options"] = _opts
if _t not in _sel:
_sel.append(_t)
st.session_state["_sp_titles_multi"] = _sel
st.session_state["_sp_new_title"] = ""
st.session_state["_sp_titles_pending"] = _sel
st.session_state["_sp_new_title_pending"] = ""
st.rerun()
with st.expander("📋 Paste a list of titles"):
st.text_area("One title per line", key="_sp_paste_titles", height=80, label_visibility="collapsed",
@ -371,23 +354,34 @@ with tab_search:
if _t not in _sel:
_sel.append(_t)
st.session_state["_sp_title_options"] = _opts
st.session_state["_sp_titles_multi"] = _sel
st.session_state["_sp_paste_titles"] = ""
st.session_state["_sp_titles_pending"] = _sel
st.session_state["_sp_paste_titles_pending"] = ""
st.rerun()
# ── LLM suggestions panel ────────────────────────────────────────────────
if _run_suggest:
_current_titles = list(st.session_state.get("_sp_titles_multi", []))
_blocklist = load_yaml(BLOCKLIST_CFG)
_user_profile = load_yaml(USER_CFG)
with st.spinner("Asking LLM for suggestions…"):
suggestions = _suggest_search_terms(_current_titles, RESUME_PATH)
# Add suggested titles to options list (not auto-selected — user picks from dropdown)
_opts = list(st.session_state.get("_sp_title_options", []))
for _t in suggestions.get("suggested_titles", []):
if _t not in _opts:
_opts.append(_t)
st.session_state["_sp_title_options"] = _opts
st.session_state["_sp_suggestions"] = suggestions
st.rerun()
try:
suggestions = _suggest_search_terms(_current_titles, RESUME_PATH, _blocklist, _user_profile)
except RuntimeError as _e:
st.warning(
f"No LLM backend available: {_e}. "
"Check that Ollama is running and has GPU access, or enable a cloud backend in Settings → System → LLM.",
icon="⚠️",
)
suggestions = None
if suggestions is not None:
# Add suggested titles to options list (not auto-selected — user picks from dropdown)
_opts = list(st.session_state.get("_sp_title_options", []))
for _t in suggestions.get("suggested_titles", []):
if _t not in _opts:
_opts.append(_t)
st.session_state["_sp_title_options"] = _opts
st.session_state["_sp_suggestions"] = suggestions
st.rerun()
if st.session_state.get("_sp_suggestions"):
sugg = st.session_state["_sp_suggestions"]
@ -436,8 +430,8 @@ with tab_search:
st.session_state["_sp_loc_options"] = _opts
if _l not in _sel:
_sel.append(_l)
st.session_state["_sp_locations_multi"] = _sel
st.session_state["_sp_new_loc"] = ""
st.session_state["_sp_locs_pending"] = _sel
st.session_state["_sp_new_loc_pending"] = ""
st.rerun()
with st.expander("📋 Paste a list of locations"):
st.text_area("One location per line", key="_sp_paste_locs", height=80, label_visibility="collapsed",
@ -452,8 +446,8 @@ with tab_search:
if _l not in _sel:
_sel.append(_l)
st.session_state["_sp_loc_options"] = _opts
st.session_state["_sp_locations_multi"] = _sel
st.session_state["_sp_paste_locs"] = ""
st.session_state["_sp_locs_pending"] = _sel
st.session_state["_sp_paste_locs_pending"] = ""
st.rerun()
st.subheader("Exclude Keywords")
@ -747,11 +741,33 @@ with tab_resume:
st.balloons()
st.divider()
st.subheader("🏷️ Skills & Keywords")
st.caption(
f"Matched against job descriptions to surface {_name}'s most relevant experience "
"and highlight keyword overlap in research briefs. Search the bundled list or add your own."
)
_kw_header_col, _kw_btn_col = st.columns([5, 1])
with _kw_header_col:
st.subheader("🏷️ Skills & Keywords")
st.caption(
f"Matched against job descriptions to surface {_name}'s most relevant experience "
"and highlight keyword overlap in research briefs. Search the bundled list or add your own."
)
with _kw_btn_col:
st.write("")
st.write("")
_run_kw_suggest = st.button(
"✨ Suggest", key="kw_suggest_btn",
help="Ask the LLM to suggest skills, domains, and keywords based on your resume.",
)
if _run_kw_suggest:
_kw_current = load_yaml(KEYWORDS_CFG) if KEYWORDS_CFG.exists() else {}
with st.spinner("Asking LLM for keyword suggestions…"):
try:
_kw_sugg = _suggest_resume_keywords(RESUME_PATH, _kw_current)
st.session_state["_kw_suggestions"] = _kw_sugg
except RuntimeError as _e:
st.warning(
f"No LLM backend available: {_e}. "
"Check that Ollama is running and has GPU access, or enable a cloud backend in Settings → System → LLM.",
icon="⚠️",
)
from scripts.skills_utils import load_suggestions as _load_sugg, filter_tag as _filter_tag
@ -815,6 +831,33 @@ with tab_resume:
save_yaml(KEYWORDS_CFG, kw_data)
st.rerun()
# ── LLM keyword suggestion chips ──────────────────────────────────────
_kw_sugg_data = st.session_state.get("_kw_suggestions")
if _kw_sugg_data:
_KW_ICONS = {"skills": "🛠️", "domains": "🏢", "keywords": "🔑"}
_any_shown = False
for _cat, _icon in _KW_ICONS.items():
_cat_sugg = [t for t in _kw_sugg_data.get(_cat, [])
if t not in kw_data.get(_cat, [])]
if not _cat_sugg:
continue
_any_shown = True
st.caption(f"**{_icon} {_cat.capitalize()} suggestions** — click to add:")
_chip_cols = st.columns(min(len(_cat_sugg), 4))
for _i, _tag in enumerate(_cat_sugg):
with _chip_cols[_i % 4]:
if st.button(f"+ {_tag}", key=f"kw_sugg_{_cat}_{_i}"):
_new_list = list(kw_data.get(_cat, [])) + [_tag]
kw_data[_cat] = _new_list
save_yaml(KEYWORDS_CFG, kw_data)
_kw_sugg_data[_cat] = [t for t in _kw_sugg_data[_cat] if t != _tag]
st.session_state["_kw_suggestions"] = _kw_sugg_data
st.rerun()
if _any_shown:
if st.button("✕ Clear suggestions", key="kw_clear_sugg"):
st.session_state.pop("_kw_suggestions", None)
st.rerun()
# ── System tab ────────────────────────────────────────────────────────────────
with tab_system:
st.caption("Infrastructure, LLM backends, integrations, and service connections.")
@ -1005,18 +1048,88 @@ with tab_system:
f"{'' if llm_backends.get(n, {}).get('enabled', True) else ''} {n}"
for n in llm_new_order
))
if st.button("💾 Save LLM settings", type="primary", key="sys_save_llm"):
save_yaml(LLM_CFG, {**llm_cfg, "backends": llm_updated_backends, "fallback_order": llm_new_order})
# ── Cloud backend warning + acknowledgment ─────────────────────────────
from scripts.byok_guard import cloud_backends as _cloud_backends
_pending_cfg = {**llm_cfg, "backends": llm_updated_backends, "fallback_order": llm_new_order}
_pending_cloud = set(_cloud_backends(_pending_cfg))
_user_cfg_for_ack = yaml.safe_load(USER_CFG.read_text(encoding="utf-8")) or {} if USER_CFG.exists() else {}
_already_acked = set(_user_cfg_for_ack.get("byok_acknowledged_backends", []))
# Intentional: once a backend is acknowledged, it stays acknowledged even if
# temporarily disabled and re-enabled. This avoids nagging returning users.
_unacknowledged = _pending_cloud - _already_acked
def _do_save_llm(ack_backends: set) -> None:
"""Write llm.yaml and update acknowledgment in user.yaml."""
save_yaml(LLM_CFG, _pending_cfg)
st.session_state.pop("_llm_order", None)
st.session_state.pop("_llm_order_cfg_key", None)
if ack_backends:
# Re-read user.yaml at save time (not at render time) to avoid
# overwriting changes made by other processes between render and save.
_uy = yaml.safe_load(USER_CFG.read_text(encoding="utf-8")) or {} if USER_CFG.exists() else {}
_uy["byok_acknowledged_backends"] = sorted(_already_acked | ack_backends)
save_yaml(USER_CFG, _uy)
st.success("LLM settings saved!")
if _unacknowledged:
_provider_labels = ", ".join(b.replace("_", " ").title() for b in sorted(_unacknowledged))
_policy_links = []
for _b in sorted(_unacknowledged):
if _b in ("anthropic", "claude_code"):
_policy_links.append("[Anthropic privacy policy](https://www.anthropic.com/privacy)")
elif _b == "openai":
_policy_links.append("[OpenAI privacy policy](https://openai.com/policies/privacy-policy)")
_policy_str = " · ".join(_policy_links) if _policy_links else "Review your provider's documentation."
st.warning(
f"**Cloud LLM active — your data will leave this machine**\n\n"
f"Enabling **{_provider_labels}** means AI features will send content "
f"directly to that provider. CircuitForge does not receive or log it, "
f"but their privacy policy governs it — not ours.\n\n"
f"**What leaves your machine:**\n"
f"- Cover letter generation: your resume, job description, and profile\n"
f"- Keyword suggestions: your skills list and resume summary\n"
f"- Survey assistant: survey question text\n"
f"- Company research / Interview prep: company name and role only\n\n"
f"**What stays local always:** your jobs database, email credentials, "
f"license key, and Notion token.\n\n"
f"For sensitive data (disability, immigration, medical), a local model is "
f"strongly recommended. These tools assist with paperwork — they don't "
f"replace professional advice.\n\n"
f"{_policy_str} · "
f"[CircuitForge privacy policy](https://circuitforge.tech/privacy)",
icon="⚠️",
)
_ack = st.checkbox(
f"I understand — content will be sent to **{_provider_labels}** when I use AI features",
key="byok_ack_checkbox",
)
_col_cancel, _col_save = st.columns(2)
if _col_cancel.button("Cancel", key="byok_cancel"):
st.session_state.pop("byok_ack_checkbox", None)
st.rerun()
if _col_save.button(
"💾 Save with cloud LLM",
type="primary",
key="sys_save_llm_cloud",
disabled=not _ack,
):
_do_save_llm(_unacknowledged)
else:
if st.button("💾 Save LLM settings", type="primary", key="sys_save_llm"):
_do_save_llm(set())
# ── Services ──────────────────────────────────────────────────────────────
with st.expander("🔌 Services", expanded=True):
import subprocess as _sp
import shutil as _shutil
import os as _os
TOKENS_CFG = CONFIG_DIR / "tokens.yaml"
COMPOSE_DIR = str(Path(__file__).parent.parent.parent)
_compose_env = {**_os.environ, "COMPOSE_PROJECT_NAME": "peregrine"}
_docker_available = bool(_shutil.which("docker"))
_sys_profile_name = _profile.inference_profile if _profile else "remote"
SYS_SERVICES = [
@ -1108,7 +1221,7 @@ with tab_system:
elif up:
if st.button("⏹ Stop", key=f"sys_svc_stop_{svc['port']}", use_container_width=True):
with st.spinner(f"Stopping {svc['name']}"):
r = _sp.run(svc["stop"], capture_output=True, text=True, cwd=svc["cwd"])
r = _sp.run(svc["stop"], capture_output=True, text=True, cwd=svc["cwd"], env=_compose_env)
st.success("Stopped.") if r.returncode == 0 else st.error(r.stderr or r.stdout)
st.rerun()
else:
@ -1119,7 +1232,7 @@ with tab_system:
_start_cmd.append(_sel)
if st.button("▶ Start", key=f"sys_svc_start_{svc['port']}", use_container_width=True, type="primary"):
with st.spinner(f"Starting {svc['name']}"):
r = _sp.run(_start_cmd, capture_output=True, text=True, cwd=svc["cwd"])
r = _sp.run(_start_cmd, capture_output=True, text=True, cwd=svc["cwd"], env=_compose_env)
st.success("Started!") if r.returncode == 0 else st.error(r.stderr or r.stdout)
st.rerun()

View file

@ -4,12 +4,20 @@ services:
app:
build: .
command: >
bash -c "streamlit run app/app.py
--server.port=8501
--server.headless=true
--server.fileWatcherType=none
2>&1 | tee /app/data/.streamlit.log"
ports:
- "${STREAMLIT_PORT:-8501}:8501"
volumes:
- ./config:/app/config
- ./data:/app/data
- ${DOCS_DIR:-~/Documents/JobSearch}:/docs
- /var/run/docker.sock:/var/run/docker.sock
- /usr/bin/docker:/usr/bin/docker:ro
environment:
- STAGING_DB=/app/data/staging.db
- DOCS_DIR=/docs
@ -20,6 +28,9 @@ services:
- PEREGRINE_GPU_NAMES=${PEREGRINE_GPU_NAMES:-}
- RECOMMENDED_PROFILE=${RECOMMENDED_PROFILE:-remote}
- STREAMLIT_SERVER_BASE_URL_PATH=${STREAMLIT_BASE_URL_PATH:-}
- FORGEJO_API_TOKEN=${FORGEJO_API_TOKEN:-}
- FORGEJO_REPO=${FORGEJO_REPO:-}
- FORGEJO_API_URL=${FORGEJO_API_URL:-}
- PYTHONUNBUFFERED=1
- PYTHONLOGGING=WARNING
depends_on:

View file

@ -12,7 +12,7 @@ streamlit-paste-button>=0.1.0
# ── Job scraping ──────────────────────────────────────────────────────────
python-jobspy>=1.1
playwright
playwright>=1.40
selenium
undetected-chromedriver
webdriver-manager

277
scripts/backup.py Normal file
View file

@ -0,0 +1,277 @@
"""Config backup / restore / teleport for Peregrine.
Creates a portable zip of all gitignored configs + optionally the staging DB.
Intended for: machine migrations, Docker volume transfers, and safe wizard testing.
Supports both the Peregrine Docker instance and the legacy /devl/job-seeker install.
Usage (CLI):
conda run -n job-seeker python scripts/backup.py --create backup.zip
conda run -n job-seeker python scripts/backup.py --create backup.zip --no-db
conda run -n job-seeker python scripts/backup.py --create backup.zip --base-dir /devl/job-seeker
conda run -n job-seeker python scripts/backup.py --restore backup.zip
conda run -n job-seeker python scripts/backup.py --list backup.zip
Usage (programmatic called from Settings UI):
from scripts.backup import create_backup, restore_backup, list_backup_contents
zip_bytes = create_backup(base_dir, include_db=True)
info = list_backup_contents(zip_bytes)
result = restore_backup(zip_bytes, base_dir, include_db=True)
"""
from __future__ import annotations
import io
import json
import zipfile
from datetime import datetime
from pathlib import Path
# ---------------------------------------------------------------------------
# Files included in every backup (relative to repo root)
# ---------------------------------------------------------------------------
# Gitignored config files that hold secrets / personal data
_SECRET_CONFIGS = [
"config/notion.yaml",
"config/tokens.yaml",
"config/email.yaml",
"config/adzuna.yaml",
"config/craigslist.yaml",
"config/user.yaml",
"config/plain_text_resume.yaml",
"config/license.json",
"config/user.yaml.working",
]
# Gitignored integration configs (glob pattern — each matching file is added)
_INTEGRATION_CONFIG_GLOB = "config/integrations/*.yaml"
# Non-secret committed configs worth preserving for portability
# (also present in the legacy /devl/job-seeker instance)
_EXTRA_CONFIGS = [
"config/llm.yaml",
"config/search_profiles.yaml",
"config/resume_keywords.yaml", # personal keyword list — present in both instances
"config/skills_suggestions.yaml",
"config/blocklist.yaml",
"config/server.yaml", # deployment config (base URL path, port) — Peregrine only
]
# Candidate DB paths (first one that exists wins)
_DB_CANDIDATES = ["data/staging.db", "staging.db"]
_MANIFEST_NAME = "backup-manifest.json"
# ---------------------------------------------------------------------------
# Source detection
# ---------------------------------------------------------------------------
def _detect_source_label(base_dir: Path) -> str:
    """Return a human-readable label for the instance being backed up.

    The label is simply the repo root's directory name — stable as long as
    the root isn't renamed, which is the normal case for both the Docker
    install (peregrine/) and the legacy Conda install (job-seeker/).

    Args:
        base_dir: The root directory being backed up.

    Returns:
        A short identifier string, e.g. "peregrine" or "job-seeker".
    """
    label = base_dir.name
    return label
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
def create_backup(
    base_dir: Path,
    include_db: bool = True,
    source_label: str | None = None,
) -> bytes:
    """Return a zip archive as raw bytes.

    Args:
        base_dir: Repo root (parent of config/ and staging.db).
        include_db: If True, include staging.db in the archive.
        source_label: Human-readable instance name stored in the manifest
            (e.g. "peregrine", "job-seeker"). Auto-detected if None.
    """
    archive = io.BytesIO()
    added: list[str] = []

    with zipfile.ZipFile(archive, "w", compression=zipfile.ZIP_DEFLATED) as zf:

        def _add(path: Path, arcname: str) -> None:
            # Write one file into the archive and record it for the manifest.
            zf.write(path, arcname)
            added.append(arcname)

        # Gitignored secret configs — missing files are skipped silently.
        for rel in _SECRET_CONFIGS:
            candidate = base_dir / rel
            if candidate.exists():
                _add(candidate, rel)

        # Per-integration configs, discovered by glob.
        for candidate in sorted(base_dir.glob(_INTEGRATION_CONFIG_GLOB)):
            _add(candidate, str(candidate.relative_to(base_dir)))

        # Committed-but-portable extra configs.
        for rel in _EXTRA_CONFIGS:
            candidate = base_dir / rel
            if candidate.exists():
                _add(candidate, rel)

        # Staging DB — the first candidate path that exists wins.
        if include_db:
            for rel in _DB_CANDIDATES:
                candidate = base_dir / rel
                if candidate.exists():
                    _add(candidate, rel)
                    break

        # Manifest describing what was archived and where it came from.
        manifest = {
            "created_at": datetime.now().isoformat(),
            "source": source_label or _detect_source_label(base_dir),
            "source_path": str(base_dir.resolve()),
            "peregrine_version": "1.0",
            "files": added,
            "includes_db": include_db and any(f.endswith(".db") for f in added),
        }
        zf.writestr(_MANIFEST_NAME, json.dumps(manifest, indent=2))

    return archive.getvalue()
def list_backup_contents(zip_bytes: bytes) -> dict:
    """Return manifest + file list from a backup zip (no extraction)."""
    with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf:
        all_names = zf.namelist()
        # Manifest is parsed separately and excluded from the file listing.
        manifest: dict = (
            json.loads(zf.read(_MANIFEST_NAME)) if _MANIFEST_NAME in all_names else {}
        )
        files = [n for n in all_names if n != _MANIFEST_NAME]
        sizes = {entry.filename: entry.file_size for entry in zf.infolist()}
    return {
        "manifest": manifest,
        "files": files,
        "sizes": sizes,
        # Uncompressed payload size, manifest excluded.
        "total_bytes": sum(sizes[n] for n in files if n in sizes),
    }
def restore_backup(
    zip_bytes: bytes,
    base_dir: Path,
    include_db: bool = True,
    overwrite: bool = True,
) -> dict[str, list[str]]:
    """Extract a backup zip into base_dir.

    Entries that would escape base_dir — absolute member names or paths
    containing ``..`` components ("zip slip") — are skipped, not written,
    so a tampered archive cannot overwrite files outside the instance root.

    Args:
        zip_bytes: Raw bytes of the backup zip.
        base_dir: Repo root to restore into.
        include_db: If False, skip any .db files.
        overwrite: If False, skip files that already exist.

    Returns:
        {"restored": [...], "skipped": [...]}
    """
    restored: list[str] = []
    skipped: list[str] = []
    root = base_dir.resolve()
    with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf:
        for name in zf.namelist():
            if name == _MANIFEST_NAME:
                continue
            if not include_db and name.endswith(".db"):
                skipped.append(name)
                continue
            dest = base_dir / name
            # Security: refuse entries that resolve outside base_dir
            # (malicious archives with "../" or absolute member names).
            try:
                dest.resolve().relative_to(root)
            except ValueError:
                skipped.append(name)
                continue
            if dest.exists() and not overwrite:
                skipped.append(name)
                continue
            dest.parent.mkdir(parents=True, exist_ok=True)
            dest.write_bytes(zf.read(name))
            restored.append(name)
    return {"restored": restored, "skipped": skipped}
# ---------------------------------------------------------------------------
# CLI entry point
# ---------------------------------------------------------------------------
def main() -> None:
    """CLI entry point: --create / --restore / --list a Peregrine backup zip.

    Exits with status 1 (after printing to stderr) when the input zip for
    --restore or --list does not exist.
    """
    import argparse
    import sys
    parser = argparse.ArgumentParser(description="Peregrine config backup / restore / teleport")
    # Exactly one action flag is required; they are mutually exclusive.
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument("--create", metavar="OUT.zip", help="Create a backup zip")
    group.add_argument("--restore", metavar="IN.zip", help="Restore from a backup zip")
    group.add_argument("--list", metavar="IN.zip", help="List contents of a backup zip")
    parser.add_argument("--no-db", action="store_true", help="Exclude staging.db (--create/--restore)")
    parser.add_argument("--no-overwrite", action="store_true",
                        help="Skip files that already exist (--restore)")
    parser.add_argument(
        "--base-dir", metavar="PATH",
        help="Root of the instance to back up/restore (default: this repo root). "
        "Use /devl/job-seeker to target the legacy Conda install.",
    )
    args = parser.parse_args()
    # Default base_dir is this repo's root (parent of scripts/).
    base_dir = Path(args.base_dir).resolve() if args.base_dir else Path(__file__).parent.parent
    if args.create:
        out = Path(args.create)
        data = create_backup(base_dir, include_db=not args.no_db)
        out.write_bytes(data)
        # Re-read the manifest so the printed summary reflects the archive itself.
        info = list_backup_contents(data)
        m = info["manifest"]
        print(f"Backup created: {out} ({len(data):,} bytes)")
        print(f" Source: {m.get('source', '?')} ({base_dir})")
        print(f" {len(info['files'])} files archived:")
        for name in info["files"]:
            size = info["sizes"].get(name, 0)
            print(f" {name} ({size:,} bytes)")
    elif args.restore:
        in_path = Path(args.restore)
        if not in_path.exists():
            print(f"ERROR: {in_path} not found", file=sys.stderr)
            sys.exit(1)
        data = in_path.read_bytes()
        result = restore_backup(data, base_dir,
                                include_db=not args.no_db,
                                overwrite=not args.no_overwrite)
        print(f"Restored {len(result['restored'])} files:")
        for name in result["restored"]:
            print(f"{name}")
        if result["skipped"]:
            print(f"Skipped {len(result['skipped'])} files:")
            for name in result["skipped"]:
                print(f" - {name}")
    elif args.list:
        in_path = Path(args.list)
        if not in_path.exists():
            print(f"ERROR: {in_path} not found", file=sys.stderr)
            sys.exit(1)
        data = in_path.read_bytes()
        info = list_backup_contents(data)
        m = info["manifest"]
        # Manifest may be absent from hand-made zips; only print it if present.
        if m:
            print(f"Created: {m.get('created_at', 'unknown')}")
            print(f"Source: {m.get('source', '?')} ({m.get('source_path', '?')})")
            print(f"Has DB: {m.get('includes_db', '?')}")
        print(f"\n{len(info['files'])} files ({info['total_bytes']:,} bytes uncompressed):")
        for name in info["files"]:
            size = info["sizes"].get(name, 0)
            print(f" {name} ({size:,} bytes)")
if __name__ == "__main__":
    main()

58
scripts/byok_guard.py Normal file
View file

@ -0,0 +1,58 @@
"""
BYOK cloud backend detection.
Determines whether LLM backends in llm.yaml send data to third-party cloud
providers. Used by Settings (activation warning) and app.py (sidebar indicator).
No Streamlit dependency — pure Python, so it's unit-testable and reusable.
"""
# 0.0.0.0 is a bind address (all interfaces), not a true loopback, but a backend
# configured to call it is talking to the local machine — treat as local.
LOCAL_URL_MARKERS = ("localhost", "127.0.0.1", "0.0.0.0")
def is_cloud_backend(name: str, cfg: dict) -> bool:
    """Return True if this backend sends prompts to a third-party cloud provider.

    Classification rules (applied in order):
        1. local: true in cfg -> always local (user override)
        2. vision_service type -> always local
        3. anthropic or claude_code type -> always cloud
        4. openai_compat whose base_url *hostname* is localhost/loopback -> local
        5. openai_compat with any other base_url -> cloud
        6. anything else -> local (unknown types assumed safe)

    The base_url hostname is extracted with urllib.parse rather than a plain
    substring match, so a cloud URL that merely contains a local marker
    (e.g. "https://localhost.example.com/v1") is still classified as cloud.

    Args:
        name: Backend name from llm.yaml (not used in classification;
            kept for call-site symmetry and future per-name rules).
        cfg: That backend's config mapping from llm.yaml.
    """
    from urllib.parse import urlparse

    if cfg.get("local", False):
        return False
    btype = cfg.get("type", "")
    if btype == "vision_service":
        return False
    if btype in ("anthropic", "claude_code"):
        return True
    if btype == "openai_compat":
        url = cfg.get("base_url", "")
        host = urlparse(url).hostname
        if host is None:
            # Scheme-less or unparseable URL: fall back to the original
            # substring heuristic so existing configs keep working.
            return not any(marker in url for marker in LOCAL_URL_MARKERS)
        # urlparse lowercases the hostname, so exact comparison is safe.
        return host not in LOCAL_URL_MARKERS
    return False
def cloud_backends(llm_cfg: dict) -> list[str]:
    """Return names of enabled cloud backends from a parsed llm.yaml dict.

    Args:
        llm_cfg: parsed contents of config/llm.yaml

    Returns:
        List of backend names that are enabled and classified as cloud.
        Empty list means fully local configuration.
    """
    flagged: list[str] = []
    for backend_name, backend_cfg in llm_cfg.get("backends", {}).items():
        # Backends default to enabled unless explicitly switched off.
        if not backend_cfg.get("enabled", True):
            continue
        if is_cloud_backend(backend_name, backend_cfg):
            flagged.append(backend_name)
    return flagged

View file

@ -3,12 +3,13 @@ SQLite staging layer for job listings.
Jobs flow: pending → approved/rejected → applied → synced
applied → phone_screen → interviewing → offer → hired (or rejected)
"""
import os
import sqlite3
from datetime import datetime
from pathlib import Path
from typing import Optional
DEFAULT_DB = Path(__file__).parent.parent / "staging.db"
DEFAULT_DB = Path(os.environ.get("STAGING_DB", Path(__file__).parent.parent / "staging.db"))
CREATE_JOBS = """
CREATE TABLE IF NOT EXISTS jobs (

223
scripts/feedback_api.py Normal file
View file

@ -0,0 +1,223 @@
"""
Feedback API — pure Python backend, no Streamlit imports.
Called directly from app/feedback.py now; wrappable in a FastAPI route later.
"""
from __future__ import annotations
import os
import platform
import re
import subprocess
from datetime import datetime, timezone
from pathlib import Path
import requests
import yaml
_ROOT = Path(__file__).parent.parent
# Patterns for personally identifying information scrubbed from outgoing text.
_EMAIL_RE = re.compile(r"[a-zA-Z0-9._%+\-]+@[a-zA-Z0-9.\-]+\.[a-zA-Z]{2,}")
_PHONE_RE = re.compile(r"(\+?1[\s\-.]?)?\(?\d{3}\)?[\s\-.]?\d{3}[\s\-.]?\d{4}")
def mask_pii(text: str) -> str:
    """Redact email addresses and phone numbers from text."""
    # Emails first, then phone numbers — each replaced with a placeholder.
    for pattern, placeholder in (
        (_EMAIL_RE, "[email redacted]"),
        (_PHONE_RE, "[phone redacted]"),
    ):
        text = pattern.sub(placeholder, text)
    return text
def collect_context(page: str) -> dict:
    """Collect app context: page, version, tier, LLM backend, OS, timestamp.

    Every lookup is best-effort: a missing git binary or an unreadable
    config degrades to a placeholder value instead of raising, since this
    runs inside the feedback form.

    Args:
        page: Name of the UI page the feedback was filed from.

    Returns:
        Dict with keys page, version, tier, llm_backend, os, timestamp
        (ISO-8601 UTC with a trailing "Z").
    """
    # App version from git — "dev" outside a git checkout or on any error.
    try:
        version = subprocess.check_output(
            ["git", "describe", "--tags", "--always"],
            cwd=_ROOT, text=True, timeout=5,
        ).strip()
    except Exception:
        version = "dev"
    # Tier from user.yaml ("unknown" if the file is missing or unreadable).
    tier = "unknown"
    try:
        user = yaml.safe_load((_ROOT / "config" / "user.yaml").read_text()) or {}
        tier = user.get("tier", "unknown")
    except Exception:
        pass
    # LLM backend from llm.yaml — report first entry in fallback_order that's enabled
    llm_backend = "unknown"
    try:
        llm = yaml.safe_load((_ROOT / "config" / "llm.yaml").read_text()) or {}
        backends = llm.get("backends", {})
        for name in llm.get("fallback_order", []):
            if backends.get(name, {}).get("enabled", False):
                llm_backend = name
                break
    except Exception:
        pass
    return {
        "page": page,
        "version": version,
        "tier": tier,
        "llm_backend": llm_backend,
        "os": platform.platform(),
        "timestamp": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
    }
def collect_logs(n: int = 100, log_path: Path | None = None) -> str:
    """Return last n lines of the Streamlit log, with PII masked.

    Args:
        n: Number of trailing lines to keep.
        log_path: Explicit log file to read; defaults to data/.streamlit.log.
    """
    target = log_path or (_ROOT / "data" / ".streamlit.log")
    if not target.exists():
        return "(no log file found)"
    tail = target.read_text(errors="replace").splitlines()[-n:]
    return mask_pii("\n".join(tail))
def collect_listings(db_path: Path | None = None, n: int = 5) -> list[dict]:
"""Return the n most-recent job listings — title, company, url only."""
import sqlite3
from scripts.db import DEFAULT_DB
path = db_path or DEFAULT_DB
with sqlite3.connect(path) as conn:
conn.row_factory = sqlite3.Row
rows = conn.execute(
"SELECT title, company, url FROM jobs ORDER BY id DESC LIMIT ?", (n,)
).fetchall()
return [{"title": r["title"], "company": r["company"], "url": r["url"]} for r in rows]
def build_issue_body(form: dict, context: dict, attachments: dict) -> str:
    """Assemble the Forgejo issue markdown body from form data, context, and attachments."""
    _TYPE_LABELS = {"bug": "🐛 Bug", "feature": "✨ Feature Request", "other": "💬 Other"}
    issue_type = form.get("type", "other")
    parts: list[str] = [
        f"## {_TYPE_LABELS.get(issue_type, '💬 Other')}",
        "",
        form.get("description", ""),
        "",
    ]
    # Reproduction steps only apply to bug reports with steps filled in.
    if issue_type == "bug" and form.get("repro"):
        parts.extend(["### Reproduction Steps", "", form["repro"], ""])
    if context:
        parts.extend(["### Context", ""])
        parts.extend(f"- **{key}:** {val}" for key, val in context.items())
        parts.append("")
    logs = attachments.get("logs")
    if logs:
        # Logs go in a collapsible block so long tails don't swamp the issue.
        parts.extend([
            "<details>",
            "<summary>App Logs (last 100 lines)</summary>",
            "",
            "```",
            logs,
            "```",
            "</details>",
            "",
        ])
    listings = attachments.get("listings")
    if listings:
        parts.extend(["### Recent Listings", ""])
        parts.extend(
            f"- [{job['title']} @ {job['company']}]({job['url']})" for job in listings
        )
        parts.append("")
    if attachments.get("submitter"):
        parts.extend(["---", f"*Submitted by: {attachments['submitter']}*"])
    return "\n".join(parts)
def _ensure_labels(
    label_names: list[str], base_url: str, headers: dict, repo: str
) -> list[int]:
    """Look up or create Forgejo labels by name. Returns list of IDs.

    Args:
        label_names: Label names the new issue should carry.
        base_url: Forgejo API root, e.g. "https://host/api/v1".
        headers: Auth headers (token) for the API calls.
        repo: "owner/name" repository slug.

    Returns:
        Label IDs for every name that could be found or created; names
        whose creation request fails are silently dropped.
    """
    # Colors used when a label has to be created; unknown names get grey.
    _COLORS = {
        "beta-feedback": "#0075ca",
        "needs-triage": "#e4e669",
        "bug": "#d73a4a",
        "feature-request": "#a2eeef",
        "question": "#d876e3",
    }
    # NOTE(review): assumes the labels endpoint returns a JSON list of
    # objects with "name" and "id" keys (Forgejo/Gitea API shape) —
    # confirm against the deployed Forgejo version.
    resp = requests.get(f"{base_url}/repos/{repo}/labels", headers=headers, timeout=10)
    existing = {lb["name"]: lb["id"] for lb in resp.json()} if resp.ok else {}
    ids: list[int] = []
    for name in label_names:
        if name in existing:
            ids.append(existing[name])
        else:
            # Label missing — create it on the fly.
            r = requests.post(
                f"{base_url}/repos/{repo}/labels",
                headers=headers,
                json={"name": name, "color": _COLORS.get(name, "#ededed")},
                timeout=10,
            )
            if r.ok:
                ids.append(r.json()["id"])
    return ids
def create_forgejo_issue(title: str, body: str, labels: list[str]) -> dict:
    """Create a Forgejo issue. Returns {"number": int, "url": str}.

    Connection settings come from the environment: FORGEJO_API_TOKEN,
    FORGEJO_REPO, FORGEJO_API_URL (defaults target the project's Forgejo).

    Raises:
        requests.HTTPError: If the issue-creation request fails.
    """
    token = os.environ.get("FORGEJO_API_TOKEN", "")
    repo = os.environ.get("FORGEJO_REPO", "pyr0ball/peregrine")
    base = os.environ.get("FORGEJO_API_URL", "https://git.opensourcesolarpunk.com/api/v1")
    headers = {"Authorization": f"token {token}", "Content-Type": "application/json"}
    # The issues endpoint takes numeric label IDs, not names — resolve first.
    label_ids = _ensure_labels(labels, base, headers, repo)
    resp = requests.post(
        f"{base}/repos/{repo}/issues",
        headers=headers,
        json={"title": title, "body": body, "labels": label_ids},
        timeout=15,
    )
    resp.raise_for_status()
    data = resp.json()
    return {"number": data["number"], "url": data["html_url"]}
def upload_attachment(
    issue_number: int, image_bytes: bytes, filename: str = "screenshot.png"
) -> str:
    """Upload a screenshot to an existing Forgejo issue. Returns attachment URL.

    Args:
        issue_number: Number of the issue created by create_forgejo_issue.
        image_bytes: Raw PNG bytes to attach.
        filename: Display name for the attachment.

    Returns:
        The attachment's browser download URL, or "" if the response
        carries no such field.

    Raises:
        requests.HTTPError: If the upload request fails.
    """
    token = os.environ.get("FORGEJO_API_TOKEN", "")
    repo = os.environ.get("FORGEJO_REPO", "pyr0ball/peregrine")
    base = os.environ.get("FORGEJO_API_URL", "https://git.opensourcesolarpunk.com/api/v1")
    headers = {"Authorization": f"token {token}"}
    # multipart/form-data upload; requests sets the Content-Type boundary.
    resp = requests.post(
        f"{base}/repos/{repo}/issues/{issue_number}/assets",
        headers=headers,
        files={"attachment": (filename, image_bytes, "image/png")},
        timeout=15,
    )
    resp.raise_for_status()
    return resp.json().get("browser_download_url", "")
def screenshot_page(port: int | None = None) -> bytes | None:
    """
    Capture a screenshot of the running Peregrine UI using Playwright.

    Returns PNG bytes, or None if Playwright is not installed or capture fails.

    Args:
        port: Streamlit port to hit; defaults to $STREAMLIT_PORT, then
            $STREAMLIT_SERVER_PORT, then 8502.
    """
    try:
        from playwright.sync_api import sync_playwright
    except ImportError:
        # Playwright is an optional dependency — screenshots are best-effort.
        return None
    if port is None:
        port = int(os.environ.get("STREAMLIT_PORT", os.environ.get("STREAMLIT_SERVER_PORT", "8502")))
    try:
        with sync_playwright() as p:
            browser = p.chromium.launch()
            page = browser.new_page(viewport={"width": 1280, "height": 800})
            page.goto(f"http://localhost:{port}", timeout=10_000)
            # Let Streamlit's network traffic settle before capturing.
            page.wait_for_load_state("networkidle", timeout=10_000)
            png = page.screenshot(full_page=False)
            browser.close()
            return png
    except Exception:
        # Any capture failure (app down, timeout, missing headless deps)
        # degrades to "no screenshot" rather than breaking feedback submit.
        return None

160
scripts/suggest_helpers.py Normal file
View file

@ -0,0 +1,160 @@
"""
LLM-powered suggestion helpers for Settings UI.
Two functions, each makes one LLMRouter call:
- suggest_search_terms: enhanced title + three-angle exclude suggestions
- suggest_resume_keywords: skills/domains/keywords gap analysis
"""
import json
import re
from pathlib import Path
from typing import Any
from scripts.llm_router import LLMRouter
def _load_resume_context(resume_path: Path) -> str:
    """Extract 3 most recent positions from plain_text_resume.yaml as a short summary."""
    import yaml
    if not resume_path.exists():
        return ""
    resume = yaml.safe_load(resume_path.read_text()) or {}
    # One bullet per position: "- <title> at <company>: <up to 5 skills>".
    summary_lines: list[str] = []
    for entry in (resume.get("experience_details") or [])[:3]:
        position = entry.get("position", "")
        company = entry.get("company", "")
        skills = ", ".join((entry.get("skills_acquired") or [])[:5])
        summary_lines.append(f"- {position} at {company}: {skills}")
    return "\n".join(summary_lines)
def _parse_json(text: str) -> dict[str, Any]:
    """Extract the first JSON object from LLM output. Returns {} on failure."""
    # Greedy brace-to-brace match across newlines; tolerant of prose around it.
    candidate = re.search(r"\{.*\}", text, re.DOTALL)
    if candidate is None:
        return {}
    try:
        return json.loads(candidate.group())
    except Exception:
        return {}
def suggest_search_terms(
    current_titles: list[str],
    resume_path: Path,
    blocklist: dict[str, Any],
    user_profile: dict[str, Any],
) -> dict:
    """
    Suggest additional job titles and exclude keywords.

    Three-angle exclude analysis:
        A: Blocklist alias expansion (blocked companies/industries keyword variants)
        B: Values misalignment (mission preferences industries/culture to avoid)
        C: Role-type filter (career summary role types that don't fit)

    Makes a single LLMRouter.complete() call; malformed LLM output
    degrades to empty suggestion lists.

    Args:
        current_titles: Titles already configured in the search profile.
        resume_path: Path to plain_text_resume.yaml, summarized into the prompt.
        blocklist: Parsed blocklist config ("companies" / "industries" lists).
        user_profile: Parsed user profile (career_summary, nda_companies,
            mission_preferences).

    Returns: {"suggested_titles": [...], "suggested_excludes": [...]}
    """
    # Assemble prompt inputs; each degrades to a readable placeholder.
    resume_context = _load_resume_context(resume_path)
    titles_str = "\n".join(f"- {t}" for t in current_titles) or "(none yet)"
    bl_companies = ", ".join(blocklist.get("companies", [])) or "none"
    bl_industries = ", ".join(blocklist.get("industries", [])) or "none"
    nda = ", ".join(user_profile.get("nda_companies", [])) or "none"
    career_summary = user_profile.get("career_summary", "") or "Not provided"
    mission_raw = user_profile.get("mission_preferences", {}) or {}
    # Three exclude angles are intentionally collapsed into one flat suggested_excludes list
    mission_str = "\n".join(
        f" - {k}: {v}" for k, v in mission_raw.items() if v and isinstance(v, str) and v.strip()
    ) or " (none specified)"
    prompt = f"""You are helping a job seeker optimise their search configuration.
--- RESUME BACKGROUND ---
{resume_context or "Not provided"}
--- CAREER SUMMARY ---
{career_summary}
--- CURRENT TITLES BEING SEARCHED ---
{titles_str}
--- BLOCKED ENTITIES ---
Companies blocked: {bl_companies}
Industries blocked: {bl_industries}
NDA / confidential employers: {nda}
--- MISSION & VALUES ---
{mission_str}
Provide all four of the following:
1. TITLE SUGGESTIONS
5-8 additional job titles they may be missing: alternative names, adjacent roles, or senior variants of their current titles.
2. EXCLUDE KEYWORDS BLOCKLIST ALIASES
The user has blocked the companies/industries above. Suggest keyword variants that would also catch their aliases, subsidiaries, or related brands.
Example: blocking "Meta" also exclude "facebook", "instagram", "metaverse", "oculus".
3. EXCLUDE KEYWORDS VALUES MISALIGNMENT
Based on the user's mission and values above, suggest industry or culture keywords to exclude.
Examples: "tobacco", "gambling", "fossil fuel", "defense contractor", "MLM", "commission-only", "pyramid".
4. EXCLUDE KEYWORDS ROLE TYPE FILTER
Based on the user's career background, suggest role-type terms that don't match their trajectory.
Examples for a CS/TAM leader: "cold calling", "door to door", "quota-driven", "SDR", "sales development rep".
Return ONLY valid JSON in exactly this format (no extra text):
{{"suggested_titles": ["Title 1", "Title 2"],
"suggested_excludes": ["keyword 1", "keyword 2", "keyword 3"]}}"""
    raw = LLMRouter().complete(prompt).strip()
    parsed = _parse_json(raw)
    # Missing keys (malformed LLM reply) fall back to empty lists.
    return {
        "suggested_titles": parsed.get("suggested_titles", []),
        "suggested_excludes": parsed.get("suggested_excludes", []),
    }
def suggest_resume_keywords(
    resume_path: Path,
    current_kw: dict[str, list[str]],
) -> dict:
    """
    Suggest skills, domains, and keywords not already in the user's resume_keywords.yaml.

    Makes a single LLMRouter.complete() call; malformed LLM output
    degrades to empty suggestion lists.

    Args:
        resume_path: Path to plain_text_resume.yaml, summarized into the prompt.
        current_kw: Existing keyword profile ("skills" / "domains" /
            "keywords" lists) — fed to the model as a do-not-repeat list.

    Returns: {"skills": [...], "domains": [...], "keywords": [...]}
    """
    resume_context = _load_resume_context(resume_path)
    # Existing selections become the model's do-not-repeat lists.
    already_skills = ", ".join(current_kw.get("skills", [])) or "none"
    already_domains = ", ".join(current_kw.get("domains", [])) or "none"
    already_keywords = ", ".join(current_kw.get("keywords", [])) or "none"
    prompt = f"""You are helping a job seeker build a keyword profile used to score job description matches.
--- RESUME BACKGROUND ---
{resume_context or "Not provided"}
--- ALREADY SELECTED (do not repeat these) ---
Skills: {already_skills}
Domains: {already_domains}
Keywords: {already_keywords}
Suggest additional tags in each of the three categories below. Only suggest tags NOT already in the lists above.
SKILLS specific technical or soft skills (e.g. "Salesforce", "Executive Communication", "SQL", "Stakeholder Management")
DOMAINS industry verticals, company types, or functional areas (e.g. "B2B SaaS", "EdTech", "Non-profit", "Series A-C")
KEYWORDS specific terms, methodologies, metrics, or JD phrases (e.g. "NPS", "churn prevention", "QBR", "cross-functional")
Return ONLY valid JSON in exactly this format (no extra text):
{{"skills": ["Skill A", "Skill B"],
"domains": ["Domain A"],
"keywords": ["Keyword A", "Keyword B"]}}"""
    raw = LLMRouter().complete(prompt).strip()
    parsed = _parse_json(raw)
    # Missing keys (malformed LLM reply) fall back to empty lists.
    return {
        "skills": parsed.get("skills", []),
        "domains": parsed.get("domains", []),
        "keywords": parsed.get("keywords", []),
    }

231
tests/test_backup.py Normal file
View file

@ -0,0 +1,231 @@
"""Tests for scripts/backup.py — create, list, restore, and multi-instance support."""
from __future__ import annotations
import json
import zipfile
from pathlib import Path
import pytest
from scripts.backup import (
_detect_source_label,
create_backup,
list_backup_contents,
restore_backup,
)
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
def _make_instance(tmp_path: Path, name: str, *, root_db: bool = False) -> Path:
    """Build a minimal fake instance directory for testing.

    Args:
        tmp_path: pytest-provided temp directory to build under.
        name: Instance directory name — doubles as the expected source label.
        root_db: If True, place staging.db at the repo root (legacy layout);
            otherwise under data/ (Peregrine layout).

    Returns:
        Path to the fake instance root.
    """
    base = tmp_path / name
    base.mkdir()
    # Secret configs
    (base / "config").mkdir()
    (base / "config" / "notion.yaml").write_text("token: secret")
    (base / "config" / "email.yaml").write_text("user: test@example.com")
    # Extra config
    (base / "config" / "llm.yaml").write_text("backend: ollama")
    (base / "config" / "resume_keywords.yaml").write_text("keywords: [python]")
    (base / "config" / "server.yaml").write_text("port: 8502")
    # DB — either at data/staging.db (Peregrine) or staging.db root (legacy)
    if root_db:
        (base / "staging.db").write_bytes(b"SQLite legacy")
    else:
        (base / "data").mkdir()
        (base / "data" / "staging.db").write_bytes(b"SQLite peregrine")
    return base
# ---------------------------------------------------------------------------
# create_backup
# ---------------------------------------------------------------------------
class TestCreateBackup:
    """create_backup: zip validity, file selection, DB flag, manifest fields."""

    def test_returns_valid_zip(self, tmp_path):
        base = _make_instance(tmp_path, "peregrine")
        data = create_backup(base)
        # Inline import keeps the module's top-level imports minimal.
        assert zipfile.is_zipfile(__import__("io").BytesIO(data))

    def test_includes_secret_configs(self, tmp_path):
        base = _make_instance(tmp_path, "peregrine")
        data = create_backup(base)
        info = list_backup_contents(data)
        assert "config/notion.yaml" in info["files"]
        assert "config/email.yaml" in info["files"]

    def test_includes_extra_configs(self, tmp_path):
        base = _make_instance(tmp_path, "peregrine")
        data = create_backup(base)
        info = list_backup_contents(data)
        assert "config/llm.yaml" in info["files"]
        assert "config/resume_keywords.yaml" in info["files"]
        assert "config/server.yaml" in info["files"]

    def test_includes_db_by_default(self, tmp_path):
        base = _make_instance(tmp_path, "peregrine")
        data = create_backup(base)
        info = list_backup_contents(data)
        assert info["manifest"]["includes_db"] is True
        assert any(f.endswith(".db") for f in info["files"])

    def test_excludes_db_when_flag_false(self, tmp_path):
        base = _make_instance(tmp_path, "peregrine")
        data = create_backup(base, include_db=False)
        info = list_backup_contents(data)
        assert info["manifest"]["includes_db"] is False
        assert not any(f.endswith(".db") for f in info["files"])

    def test_silently_skips_missing_files(self, tmp_path):
        base = _make_instance(tmp_path, "peregrine")
        # tokens.yaml not created in fixture — should not raise
        data = create_backup(base)
        info = list_backup_contents(data)
        assert "config/tokens.yaml" not in info["files"]

    def test_manifest_contains_source_label(self, tmp_path):
        base = _make_instance(tmp_path, "peregrine")
        data = create_backup(base)
        info = list_backup_contents(data)
        assert info["manifest"]["source"] == "peregrine"

    def test_source_label_override(self, tmp_path):
        # Explicit source_label wins over directory-name auto-detection.
        base = _make_instance(tmp_path, "peregrine")
        data = create_backup(base, source_label="custom-label")
        info = list_backup_contents(data)
        assert info["manifest"]["source"] == "custom-label"
# ---------------------------------------------------------------------------
# Legacy instance (staging.db at repo root)
# ---------------------------------------------------------------------------
class TestLegacyInstance:
    """Legacy /devl/job-seeker layout: staging.db at repo root, fewer configs."""

    def test_picks_up_root_db(self, tmp_path):
        base = _make_instance(tmp_path, "job-seeker", root_db=True)
        data = create_backup(base)
        info = list_backup_contents(data)
        # First DB candidate that exists wins; only one path is archived.
        assert "staging.db" in info["files"]
        assert "data/staging.db" not in info["files"]

    def test_source_label_is_job_seeker(self, tmp_path):
        base = _make_instance(tmp_path, "job-seeker", root_db=True)
        data = create_backup(base)
        info = list_backup_contents(data)
        assert info["manifest"]["source"] == "job-seeker"

    def test_missing_peregrine_only_configs_skipped(self, tmp_path):
        """Legacy doesn't have server.yaml, user.yaml, etc. — should not error."""
        base = _make_instance(tmp_path, "job-seeker", root_db=True)
        # Remove server.yaml to simulate legacy (it won't exist there)
        (base / "config" / "server.yaml").unlink()
        data = create_backup(base)
        info = list_backup_contents(data)
        assert "config/server.yaml" not in info["files"]
        assert "config/notion.yaml" in info["files"]
# ---------------------------------------------------------------------------
# list_backup_contents
# ---------------------------------------------------------------------------
class TestListBackupContents:
    """list_backup_contents: result shape, size accounting, manifest exclusion."""

    def test_returns_manifest_and_files(self, tmp_path):
        base = _make_instance(tmp_path, "peregrine")
        data = create_backup(base)
        info = list_backup_contents(data)
        assert "manifest" in info
        assert "files" in info
        assert "sizes" in info
        assert "total_bytes" in info

    def test_total_bytes_is_sum_of_file_sizes(self, tmp_path):
        base = _make_instance(tmp_path, "peregrine")
        data = create_backup(base)
        info = list_backup_contents(data)
        expected = sum(info["sizes"][f] for f in info["files"] if f in info["sizes"])
        assert info["total_bytes"] == expected

    def test_manifest_not_in_files_list(self, tmp_path):
        # The manifest is metadata, not a restorable payload file.
        base = _make_instance(tmp_path, "peregrine")
        data = create_backup(base)
        info = list_backup_contents(data)
        assert "backup-manifest.json" not in info["files"]
# ---------------------------------------------------------------------------
# restore_backup
# ---------------------------------------------------------------------------
class TestRestoreBackup:
    """restore_backup: extraction, DB skipping, overwrite semantics, roundtrip."""

    def test_restores_all_files(self, tmp_path):
        src = _make_instance(tmp_path, "peregrine")
        dst = tmp_path / "restored"
        dst.mkdir()
        data = create_backup(src)
        result = restore_backup(data, dst)
        assert len(result["restored"]) > 0
        assert (dst / "config" / "notion.yaml").exists()

    def test_skips_db_when_flag_false(self, tmp_path):
        src = _make_instance(tmp_path, "peregrine")
        dst = tmp_path / "restored"
        dst.mkdir()
        data = create_backup(src)
        result = restore_backup(data, dst, include_db=False)
        # DB files are reported as skipped, never written.
        assert not any(f.endswith(".db") for f in result["restored"])
        assert any(f.endswith(".db") for f in result["skipped"])

    def test_no_overwrite_skips_existing(self, tmp_path):
        src = _make_instance(tmp_path, "peregrine")
        dst = tmp_path / "restored"
        dst.mkdir()
        (dst / "config").mkdir()
        existing = dst / "config" / "notion.yaml"
        existing.write_text("original content")
        data = create_backup(src)
        result = restore_backup(data, dst, overwrite=False)
        # Pre-existing file is untouched and reported as skipped.
        assert "config/notion.yaml" in result["skipped"]
        assert existing.read_text() == "original content"

    def test_overwrite_replaces_existing(self, tmp_path):
        src = _make_instance(tmp_path, "peregrine")
        dst = tmp_path / "restored"
        dst.mkdir()
        (dst / "config").mkdir()
        (dst / "config" / "notion.yaml").write_text("stale content")
        data = create_backup(src)
        restore_backup(data, dst, overwrite=True)
        assert (dst / "config" / "notion.yaml").read_text() == "token: secret"

    def test_roundtrip_preserves_content(self, tmp_path):
        # create → restore must reproduce file contents byte-for-byte.
        src = _make_instance(tmp_path, "peregrine")
        original = (src / "config" / "notion.yaml").read_text()
        dst = tmp_path / "restored"
        dst.mkdir()
        data = create_backup(src)
        restore_backup(data, dst)
        assert (dst / "config" / "notion.yaml").read_text() == original
# ---------------------------------------------------------------------------
# _detect_source_label
# ---------------------------------------------------------------------------
class TestDetectSourceLabel:
    """_detect_source_label should echo the instance directory's own name."""

    def test_returns_directory_name(self, tmp_path):
        instance = tmp_path / "peregrine"
        instance.mkdir()
        assert _detect_source_label(instance) == "peregrine"

    def test_legacy_label(self, tmp_path):
        # Pre-rename installs used the "job-seeker" directory name.
        instance = tmp_path / "job-seeker"
        instance.mkdir()
        assert _detect_source_label(instance) == "job-seeker"

101
tests/test_byok_guard.py Normal file
View file

@ -0,0 +1,101 @@
"""Tests for BYOK cloud backend detection."""
import pytest
from scripts.byok_guard import is_cloud_backend, cloud_backends
class TestIsCloudBackend:
    """Classification rules of is_cloud_backend(name, backend_config)."""

    def test_anthropic_type_is_always_cloud(self):
        config = {"type": "anthropic", "enabled": True}
        assert is_cloud_backend("anthropic", config) is True

    def test_claude_code_type_is_cloud(self):
        config = {"type": "claude_code", "enabled": True}
        assert is_cloud_backend("claude_code", config) is True

    def test_vision_service_is_always_local(self):
        assert is_cloud_backend("vision", {"type": "vision_service"}) is False

    def test_openai_compat_localhost_is_local(self):
        config = {"type": "openai_compat", "base_url": "http://localhost:11434/v1"}
        assert is_cloud_backend("ollama", config) is False

    def test_openai_compat_127_is_local(self):
        config = {"type": "openai_compat", "base_url": "http://127.0.0.1:8000/v1"}
        assert is_cloud_backend("vllm", config) is False

    def test_openai_compat_0000_is_local(self):
        config = {"type": "openai_compat", "base_url": "http://0.0.0.0:8000/v1"}
        assert is_cloud_backend("vllm", config) is False

    def test_openai_compat_remote_url_is_cloud(self):
        config = {"type": "openai_compat", "base_url": "https://api.openai.com/v1"}
        assert is_cloud_backend("openai", config) is True

    def test_openai_compat_together_is_cloud(self):
        config = {"type": "openai_compat", "base_url": "https://api.together.xyz/v1"}
        assert is_cloud_backend("together", config) is True

    def test_local_override_suppresses_cloud_detection(self):
        # An explicit "local": True flag wins over a remote-looking LAN URL.
        config = {"type": "openai_compat", "base_url": "http://192.168.1.100:11434/v1", "local": True}
        assert is_cloud_backend("nas_ollama", config) is False

    def test_local_override_on_anthropic_suppresses_detection(self):
        assert is_cloud_backend("anthropic", {"type": "anthropic", "local": True}) is False

    def test_openai_compat_missing_base_url_treated_as_cloud(self):
        # No base_url → unknown destination → defensively treated as cloud
        assert is_cloud_backend("unknown", {"type": "openai_compat"}) is True

    def test_unknown_type_without_url_is_local(self):
        assert is_cloud_backend("mystery", {"type": "unknown_type"}) is False
class TestCloudBackends:
    """cloud_backends() should list only the enabled cloud-bound backend names."""

    def test_empty_config_returns_empty(self):
        assert cloud_backends({}) == []

    def test_fully_local_config_returns_empty(self):
        config = {
            "backends": {
                "ollama": {"type": "openai_compat", "base_url": "http://localhost:11434/v1", "enabled": True},
                "vision": {"type": "vision_service", "enabled": True},
            }
        }
        assert cloud_backends(config) == []

    def test_cloud_backend_returned(self):
        config = {"backends": {"anthropic": {"type": "anthropic", "enabled": True}}}
        assert cloud_backends(config) == ["anthropic"]

    def test_disabled_cloud_backend_excluded(self):
        config = {"backends": {"anthropic": {"type": "anthropic", "enabled": False}}}
        assert cloud_backends(config) == []

    def test_mix_returns_only_enabled_cloud(self):
        # Local + enabled cloud + disabled cloud → only the enabled cloud one.
        config = {
            "backends": {
                "ollama": {"type": "openai_compat", "base_url": "http://localhost:11434/v1", "enabled": True},
                "anthropic": {"type": "anthropic", "enabled": True},
                "openai": {"type": "openai_compat", "base_url": "https://api.openai.com/v1", "enabled": False},
            }
        }
        assert cloud_backends(config) == ["anthropic"]

    def test_multiple_cloud_backends_all_returned(self):
        config = {
            "backends": {
                "anthropic": {"type": "anthropic", "enabled": True},
                "openai": {"type": "openai_compat", "base_url": "https://api.openai.com/v1", "enabled": True},
            }
        }
        assert set(cloud_backends(config)) == {"anthropic", "openai"}

View file

@ -21,7 +21,7 @@ class TestGenerateRefinement:
"""Call generate() with a mock router and return the captured prompt."""
captured = {}
mock_router = MagicMock()
mock_router.complete.side_effect = lambda p: (captured.update({"prompt": p}), "result")[1]
mock_router.complete.side_effect = lambda p, **kwargs: (captured.update({"prompt": p}), "result")[1]
with patch("scripts.generate_cover_letter.load_corpus", return_value=[]), \
patch("scripts.generate_cover_letter.find_similar_letters", return_value=[]):
from scripts.generate_cover_letter import generate

273
tests/test_feedback_api.py Normal file
View file

@ -0,0 +1,273 @@
"""Tests for the feedback API backend."""
import pytest
from unittest.mock import patch, MagicMock
from pathlib import Path
# ── mask_pii ──────────────────────────────────────────────────────────────────
def test_mask_pii_email():
    """A single email address is replaced with the redaction marker."""
    from scripts.feedback_api import mask_pii
    masked = mask_pii("contact foo@bar.com please")
    assert masked == "contact [email redacted] please"


def test_mask_pii_phone_dashes():
    """Dashed phone numbers are redacted."""
    from scripts.feedback_api import mask_pii
    masked = mask_pii("call 555-123-4567 now")
    assert masked == "call [phone redacted] now"


def test_mask_pii_phone_parens():
    """Parenthesized area-code format is redacted too."""
    from scripts.feedback_api import mask_pii
    assert mask_pii("(555) 867-5309") == "[phone redacted]"


def test_mask_pii_clean_text():
    """Text with no PII passes through unchanged."""
    from scripts.feedback_api import mask_pii
    assert mask_pii("no sensitive data here") == "no sensitive data here"


def test_mask_pii_multiple_emails():
    """Every email occurrence is redacted, not just the first."""
    from scripts.feedback_api import mask_pii
    assert mask_pii("a@b.com and c@d.com") == "[email redacted] and [email redacted]"
# ── collect_context ───────────────────────────────────────────────────────────
def test_collect_context_required_keys():
    """The context dict always carries the full set of diagnostic keys."""
    from scripts.feedback_api import collect_context
    ctx = collect_context("Home")
    for field in ("page", "version", "tier", "llm_backend", "os", "timestamp"):
        assert field in ctx, f"missing key: {field}"


def test_collect_context_page_value():
    """The page argument is passed through verbatim."""
    from scripts.feedback_api import collect_context
    assert collect_context("MyPage")["page"] == "MyPage"


def test_collect_context_timestamp_is_utc():
    """Timestamps carry the Zulu (UTC) suffix."""
    from scripts.feedback_api import collect_context
    assert collect_context("X")["timestamp"].endswith("Z")
# ── collect_logs ──────────────────────────────────────────────────────────────
def test_collect_logs_returns_string(tmp_path):
    """collect_logs yields a plain string containing recent lines."""
    from scripts.feedback_api import collect_logs
    log_file = tmp_path / ".streamlit.log"
    log_file.write_text("line1\nline2\nline3\n")
    tail = collect_logs(log_path=log_file, n=10)
    assert isinstance(tail, str)
    assert "line3" in tail


def test_collect_logs_tails_n_lines(tmp_path):
    """Only the last n lines survive the tail."""
    from scripts.feedback_api import collect_logs
    log_file = tmp_path / ".streamlit.log"
    log_file.write_text("\n".join(f"line{i}" for i in range(200)))
    tail = collect_logs(log_path=log_file, n=10)
    assert "line199" in tail
    assert "line0" not in tail


def test_collect_logs_masks_pii(tmp_path):
    """PII scrubbing is applied to log content before it is returned."""
    from scripts.feedback_api import collect_logs
    log_file = tmp_path / "test.log"
    log_file.write_text("user foo@bar.com connected\n")
    tail = collect_logs(log_path=log_file)
    assert "foo@bar.com" not in tail
    assert "[email redacted]" in tail


def test_collect_logs_missing_file(tmp_path):
    """A missing log file produces an explanatory placeholder, not an error."""
    from scripts.feedback_api import collect_logs
    tail = collect_logs(log_path=tmp_path / "nonexistent.log")
    assert "no log file" in tail.lower()
# ── collect_listings ──────────────────────────────────────────────────────────
def test_collect_listings_safe_fields_only(tmp_path):
    """Only title, company, url — no cover letters, notes, or emails."""
    from scripts.db import init_db, insert_job
    from scripts.feedback_api import collect_listings
    db = tmp_path / "test.db"
    init_db(db)
    insert_job(db, {
        "title": "CSM", "company": "Acme", "url": "https://example.com/1",
        "source": "linkedin", "location": "Remote", "is_remote": True,
        "salary": "", "description": "great role", "date_found": "2026-03-01",
    })
    rows = collect_listings(db_path=db, n=5)
    assert len(rows) == 1
    first = rows[0]
    assert set(first.keys()) == {"title", "company", "url"}
    assert first["title"] == "CSM"


def test_collect_listings_respects_n(tmp_path):
    """The n parameter caps the number of listings returned."""
    from scripts.db import init_db, insert_job
    from scripts.feedback_api import collect_listings
    db = tmp_path / "test.db"
    init_db(db)
    for i in range(10):
        insert_job(db, {
            "title": f"Job {i}", "company": "Acme", "url": f"https://example.com/{i}",
            "source": "linkedin", "location": "Remote", "is_remote": False,
            "salary": "", "description": "", "date_found": "2026-03-01",
        })
    assert len(collect_listings(db_path=db, n=3)) == 3
# ── build_issue_body ──────────────────────────────────────────────────────────
def test_build_issue_body_contains_description():
    """Description and context fields all land in the rendered body."""
    from scripts.feedback_api import build_issue_body
    form = {"type": "bug", "title": "Test", "description": "it broke", "repro": ""}
    ctx = {"page": "Home", "version": "v1.0", "tier": "free",
           "llm_backend": "ollama", "os": "Linux", "timestamp": "2026-03-03T00:00:00Z"}
    body = build_issue_body(form, ctx, {})
    for fragment in ("it broke", "Home", "v1.0"):
        assert fragment in body


def test_build_issue_body_bug_includes_repro():
    """Bug reports get a Reproduction section with the user's steps."""
    from scripts.feedback_api import build_issue_body
    form = {"type": "bug", "title": "X", "description": "desc", "repro": "step 1\nstep 2"}
    body = build_issue_body(form, {}, {})
    assert "step 1" in body
    assert "Reproduction" in body


def test_build_issue_body_no_repro_for_feature():
    """Feature requests never render a Reproduction section."""
    from scripts.feedback_api import build_issue_body
    form = {"type": "feature", "title": "X", "description": "add dark mode", "repro": "ignored"}
    assert "Reproduction" not in build_issue_body(form, {}, {})


def test_build_issue_body_logs_in_collapsible():
    """Logs are wrapped in a collapsible <details> element."""
    from scripts.feedback_api import build_issue_body
    form = {"type": "other", "title": "X", "description": "Y", "repro": ""}
    body = build_issue_body(form, {}, {"logs": "log line 1\nlog line 2"})
    assert "<details>" in body
    assert "log line 1" in body


def test_build_issue_body_omits_logs_when_not_provided():
    """No logs supplied means no <details> block at all."""
    from scripts.feedback_api import build_issue_body
    form = {"type": "bug", "title": "X", "description": "Y", "repro": ""}
    assert "<details>" not in build_issue_body(form, {}, {})


def test_build_issue_body_submitter_attribution():
    """An optional submitter string is echoed into the body."""
    from scripts.feedback_api import build_issue_body
    form = {"type": "bug", "title": "X", "description": "Y", "repro": ""}
    body = build_issue_body(form, {}, {"submitter": "Jane Doe <jane@example.com>"})
    assert "Jane Doe" in body


def test_build_issue_body_listings_shown():
    """Recent listings (title/company) are rendered when supplied."""
    from scripts.feedback_api import build_issue_body
    form = {"type": "bug", "title": "X", "description": "Y", "repro": ""}
    listings = [{"title": "CSM", "company": "Acme", "url": "https://example.com/1"}]
    body = build_issue_body(form, {}, {"listings": listings})
    assert "CSM" in body
    assert "Acme" in body
# ── Forgejo API ───────────────────────────────────────────────────────────────
@patch("scripts.feedback_api.requests.get")
@patch("scripts.feedback_api.requests.post")
def test_ensure_labels_uses_existing(mock_post, mock_get):
    """Labels already present on the repo are reused; nothing is created."""
    from scripts.feedback_api import _ensure_labels
    listing = mock_get.return_value
    listing.ok = True
    listing.json.return_value = [
        {"name": "beta-feedback", "id": 1},
        {"name": "bug", "id": 2},
    ]
    ids = _ensure_labels(
        ["beta-feedback", "bug"],
        "https://example.com/api/v1", {"Authorization": "token x"}, "owner/repo"
    )
    assert ids == [1, 2]
    mock_post.assert_not_called()


@patch("scripts.feedback_api.requests.get")
@patch("scripts.feedback_api.requests.post")
def test_ensure_labels_creates_missing(mock_post, mock_get):
    """A label absent from the repo is created via POST and its id returned."""
    from scripts.feedback_api import _ensure_labels
    mock_get.return_value.ok = True
    mock_get.return_value.json.return_value = []
    creation = mock_post.return_value
    creation.ok = True
    creation.json.return_value = {"id": 99}
    ids = _ensure_labels(
        ["needs-triage"],
        "https://example.com/api/v1", {"Authorization": "token x"}, "owner/repo"
    )
    assert 99 in ids


@patch("scripts.feedback_api._ensure_labels", return_value=[1, 2])
@patch("scripts.feedback_api.requests.post")
def test_create_forgejo_issue_success(mock_post, mock_labels, monkeypatch):
    """A successful POST yields the issue number and URL from the response."""
    from scripts.feedback_api import create_forgejo_issue
    for var, value in (
        ("FORGEJO_API_TOKEN", "testtoken"),
        ("FORGEJO_REPO", "owner/repo"),
        ("FORGEJO_API_URL", "https://example.com/api/v1"),
    ):
        monkeypatch.setenv(var, value)
    response = mock_post.return_value
    response.status_code = 201
    response.raise_for_status = lambda: None
    response.json.return_value = {"number": 42, "html_url": "https://example.com/issues/42"}
    created = create_forgejo_issue("Test issue", "body text", ["beta-feedback", "bug"])
    assert created["number"] == 42
    assert "42" in created["url"]


@patch("scripts.feedback_api.requests.post")
def test_upload_attachment_returns_url(mock_post, monkeypatch):
    """upload_attachment returns the browser download URL of the new asset."""
    from scripts.feedback_api import upload_attachment
    for var, value in (
        ("FORGEJO_API_TOKEN", "testtoken"),
        ("FORGEJO_REPO", "owner/repo"),
        ("FORGEJO_API_URL", "https://example.com/api/v1"),
    ):
        monkeypatch.setenv(var, value)
    response = mock_post.return_value
    response.status_code = 201
    response.raise_for_status = lambda: None
    response.json.return_value = {
        "uuid": "abc", "browser_download_url": "https://example.com/assets/abc"
    }
    assert upload_attachment(42, b"\x89PNG", "screenshot.png") == "https://example.com/assets/abc"
# ── screenshot_page ───────────────────────────────────────────────────────────
def test_screenshot_page_returns_none_on_failure(monkeypatch):
    """screenshot_page returns None gracefully when capture fails."""
    from scripts.feedback_api import screenshot_page
    import playwright.sync_api as pw_api

    def bad_playwright():
        raise RuntimeError("browser unavailable")

    # Fix: removed the unused `original = pw_api.sync_playwright` binding —
    # monkeypatch restores the attribute automatically after the test.
    # NOTE(review): patching the module attribute assumes feedback_api resolves
    # sync_playwright through playwright.sync_api at call time; confirm it does
    # not bind the name at import via `from playwright.sync_api import ...`.
    monkeypatch.setattr(pw_api, "sync_playwright", bad_playwright)
    result = screenshot_page(port=9999)
    assert result is None


@patch("playwright.sync_api.sync_playwright")
def test_screenshot_page_returns_bytes(mock_pw):
    """screenshot_page returns PNG bytes when playwright is available."""
    from scripts.feedback_api import screenshot_page
    fake_png = b"\x89PNG\r\n\x1a\n"
    mock_context = MagicMock()
    # Emulate `with sync_playwright() as p:` — __enter__ yields the context.
    mock_pw.return_value.__enter__ = lambda s: mock_context
    mock_pw.return_value.__exit__ = MagicMock(return_value=False)
    mock_browser = mock_context.chromium.launch.return_value
    mock_page = mock_browser.new_page.return_value
    mock_page.screenshot.return_value = fake_png
    result = screenshot_page(port=8502)
    assert result == fake_png

View file

@ -391,7 +391,7 @@ def test_rejection_uppercase_lowercased():
def test_rejection_phrase_in_quoted_thread_beyond_limit_not_blocked():
"""Rejection phrase beyond 1500-char body window does not block the email."""
from scripts.imap_sync import _has_rejection_or_ats_signal
clean_intro = "Hi Alex, we'd love to schedule a call with you. " * 30 # ~1500 chars
clean_intro = "Hi Alex, we'd love to schedule a call with you. " * 32 # ~1500 chars
quoted_footer = "\n\nOn Mon, Jan 1 wrote:\n> Unfortunately we went with another candidate."
body = clean_intro + quoted_footer
# The phrase lands after the 1500-char cutoff — should NOT be blocked

View file

@ -0,0 +1,148 @@
"""Tests for scripts/suggest_helpers.py."""
import json
import pytest
from pathlib import Path
from unittest.mock import patch, MagicMock
RESUME_PATH = Path(__file__).parent.parent / "config" / "plain_text_resume.yaml"
# ── _parse_json ───────────────────────────────────────────────────────────────
def test_parse_json_extracts_valid_object():
    """_parse_json pulls the embedded JSON object out of surrounding chatter."""
    from scripts.suggest_helpers import _parse_json
    raw = 'Here is the result: {"a": [1, 2], "b": "hello"} done.'
    assert _parse_json(raw) == {"a": [1, 2], "b": "hello"}


def test_parse_json_returns_empty_on_invalid():
    """Unparseable input degrades to an empty dict rather than raising."""
    from scripts.suggest_helpers import _parse_json
    for garbage in ("no json here", '{"broken": '):
        assert _parse_json(garbage) == {}
# ── suggest_search_terms ──────────────────────────────────────────────────────
BLOCKLIST = {
"companies": ["Meta", "Amazon"],
"industries": ["gambling"],
"locations": [],
}
USER_PROFILE = {
"career_summary": "Customer success leader with 10 years in B2B SaaS.",
"mission_preferences": {
"animal_welfare": "I volunteer at my local shelter.",
"education": "",
},
"nda_companies": ["Acme Corp"],
}
def _mock_llm(response_dict: dict):
"""Return a patcher that makes LLMRouter().complete() return a JSON string."""
mock_router = MagicMock()
mock_router.complete.return_value = json.dumps(response_dict)
return patch("scripts.suggest_helpers.LLMRouter", return_value=mock_router)
def test_suggest_search_terms_returns_titles_and_excludes():
    """The parsed LLM payload is surfaced unchanged to the caller."""
    from scripts.suggest_helpers import suggest_search_terms
    payload = {"suggested_titles": ["VP Customer Success"], "suggested_excludes": ["cold calling"]}
    with _mock_llm(payload):
        result = suggest_search_terms(["Customer Success Manager"], RESUME_PATH, BLOCKLIST, USER_PROFILE)
    assert result["suggested_titles"] == ["VP Customer Success"]
    assert result["suggested_excludes"] == ["cold calling"]


def test_suggest_search_terms_prompt_contains_blocklist_companies():
    """Blocklisted companies must be injected into the prompt."""
    from scripts.suggest_helpers import suggest_search_terms
    with _mock_llm({"suggested_titles": [], "suggested_excludes": []}) as mock_cls:
        suggest_search_terms(["CSM"], RESUME_PATH, BLOCKLIST, USER_PROFILE)
        prompt = mock_cls.return_value.complete.call_args[0][0]
    for company in ("Meta", "Amazon"):
        assert company in prompt


def test_suggest_search_terms_prompt_contains_mission():
    """Mission preferences from the profile surface in the prompt."""
    from scripts.suggest_helpers import suggest_search_terms
    with _mock_llm({"suggested_titles": [], "suggested_excludes": []}) as mock_cls:
        suggest_search_terms(["CSM"], RESUME_PATH, BLOCKLIST, USER_PROFILE)
        prompt = mock_cls.return_value.complete.call_args[0][0]
    assert "animal_welfare" in prompt or "animal welfare" in prompt.lower()


def test_suggest_search_terms_prompt_contains_career_summary():
    """The career summary is included so the LLM has context."""
    from scripts.suggest_helpers import suggest_search_terms
    with _mock_llm({"suggested_titles": [], "suggested_excludes": []}) as mock_cls:
        suggest_search_terms(["CSM"], RESUME_PATH, BLOCKLIST, USER_PROFILE)
        prompt = mock_cls.return_value.complete.call_args[0][0]
    assert "Customer success leader" in prompt


def test_suggest_search_terms_returns_empty_on_bad_json():
    """Non-JSON LLM output maps to empty suggestion lists."""
    from scripts.suggest_helpers import suggest_search_terms
    router = MagicMock()
    router.complete.return_value = "sorry, I cannot help with that"
    with patch("scripts.suggest_helpers.LLMRouter", return_value=router):
        result = suggest_search_terms(["CSM"], RESUME_PATH, BLOCKLIST, USER_PROFILE)
    assert result == {"suggested_titles": [], "suggested_excludes": []}


def test_suggest_search_terms_raises_on_llm_exhausted():
    """Backend exhaustion propagates as RuntimeError instead of being masked."""
    from scripts.suggest_helpers import suggest_search_terms
    router = MagicMock()
    router.complete.side_effect = RuntimeError("All LLM backends exhausted")
    with patch("scripts.suggest_helpers.LLMRouter", return_value=router), \
            pytest.raises(RuntimeError, match="All LLM backends exhausted"):
        suggest_search_terms(["CSM"], RESUME_PATH, BLOCKLIST, USER_PROFILE)
# ── suggest_resume_keywords ───────────────────────────────────────────────────
# Tags the user has already selected; the LLM should suggest new ones only.
CURRENT_KW = {
    "skills": ["Customer Success", "SQL"],
    "domains": ["B2B SaaS"],
    "keywords": ["NPS"],
}


def test_suggest_resume_keywords_returns_all_three_categories():
    """The result always exposes skills, domains, and keywords keys."""
    from scripts.suggest_helpers import suggest_resume_keywords
    payload = {
        "skills": ["Project Management"],
        "domains": ["EdTech"],
        "keywords": ["churn prevention"],
    }
    with _mock_llm(payload):
        result = suggest_resume_keywords(RESUME_PATH, CURRENT_KW)
    for category in ("skills", "domains", "keywords"):
        assert category in result


def test_suggest_resume_keywords_excludes_already_selected():
    """Already-selected tags appear in the prompt so the LLM knows to skip them."""
    from scripts.suggest_helpers import suggest_resume_keywords
    with _mock_llm({"skills": [], "domains": [], "keywords": []}) as mock_cls:
        suggest_resume_keywords(RESUME_PATH, CURRENT_KW)
        prompt = mock_cls.return_value.complete.call_args[0][0]
    assert "Customer Success" in prompt
    assert "NPS" in prompt


def test_suggest_resume_keywords_returns_empty_on_bad_json():
    """Non-JSON LLM output maps to empty category lists."""
    from scripts.suggest_helpers import suggest_resume_keywords
    router = MagicMock()
    router.complete.return_value = "I cannot assist."
    with patch("scripts.suggest_helpers.LLMRouter", return_value=router):
        result = suggest_resume_keywords(RESUME_PATH, CURRENT_KW)
    assert result == {"skills": [], "domains": [], "keywords": []}


def test_suggest_resume_keywords_raises_on_llm_exhausted():
    """Backend exhaustion propagates as RuntimeError instead of being masked."""
    from scripts.suggest_helpers import suggest_resume_keywords
    router = MagicMock()
    router.complete.side_effect = RuntimeError("All LLM backends exhausted")
    with patch("scripts.suggest_helpers.LLMRouter", return_value=router), \
            pytest.raises(RuntimeError, match="All LLM backends exhausted"):
        suggest_resume_keywords(RESUME_PATH, CURRENT_KW)