Compare commits

...

12 commits

Author SHA1 Message Date
7d15980bdd docs: update backlog with LinkedIn import follow-up items
Some checks are pending
CI / test (push) Waiting to run
2026-03-13 11:24:55 -07:00
9603d591a3 fix(cloud): use per-user config dir for wizard gate; redirect on invalid session
- app.py: wizard gate now reads get_config_dir()/user.yaml instead of
  hardcoded repo-level config/ — fixes perpetual onboarding loop in
  cloud mode where per-user wizard_complete was never seen
- app.py: page title corrected to "Peregrine"
- cloud_session.py: add get_config_dir() returning per-user config path
  in cloud mode, repo config/ locally
- cloud_session.py: replace st.error() with JS redirect on missing/invalid
  session token so users land on login page instead of error screen
- Home.py, 4_Apply.py, migrate.py: remove remaining AIHawk UI references
2026-03-13 11:24:42 -07:00
f3617abb6b fix(linkedin): conservative settings merge, mkdir guard, split dockerfile playwright layer 2026-03-13 10:58:58 -07:00
6b59804d35 fix(linkedin): move session state pop before tabs; add rerun after settings merge
- Pop _linkedin_extracted before st.tabs() so tab_builder sees the
  freshly populated _parsed_resume in the same render pass (no extra rerun needed)
- Fix tab label capitalisation: "Build Manually" (capital M) per spec
- Add st.rerun() after LinkedIn merge in Settings so form fields
  refresh immediately to show the newly applied data
2026-03-13 10:55:25 -07:00
7b9e758861 feat(linkedin): install Playwright Chromium in Docker image 2026-03-13 10:44:03 -07:00
070be6c2e9 feat(linkedin): add LinkedIn import expander to Settings Resume Profile tab 2026-03-13 10:44:02 -07:00
083dff2ec8 feat(linkedin): add LinkedIn tab to wizard resume step 2026-03-13 10:43:53 -07:00
ac1db1ea7f feat(linkedin): add shared LinkedIn import Streamlit widget 2026-03-13 10:32:23 -07:00
260d186c86 feat(linkedin): add staging file parser with re-parse support 2026-03-13 10:18:01 -07:00
04d0a66f21 fix(linkedin): improve scraper error handling, current-job date range, add missing tests 2026-03-13 06:02:03 -07:00
32ed451933 feat(linkedin): add scraper (Playwright + export zip) with URL validation 2026-03-13 01:06:39 -07:00
6c61290218 feat(linkedin): add HTML parser utils with fixture tests 2026-03-13 01:01:05 -07:00
18 changed files with 1312 additions and 78 deletions

View file

@ -10,8 +10,13 @@ RUN apt-get update && apt-get install -y --no-install-recommends \
&& rm -rf /var/lib/apt/lists/* && rm -rf /var/lib/apt/lists/*
COPY requirements.txt . COPY requirements.txt .
# Install Python dependencies
RUN pip install --no-cache-dir -r requirements.txt RUN pip install --no-cache-dir -r requirements.txt
# Install Playwright browser (cached separately from Python deps so requirements
# changes don't bust the ~600–900 MB Chromium layer and vice versa)
RUN playwright install chromium && playwright install-deps chromium
# Bundle companyScraper (company research web scraper) # Bundle companyScraper (company research web scraper)
COPY scrapers/ /app/scrapers/ COPY scrapers/ /app/scrapers/

View file

@ -69,7 +69,7 @@ _SETUP_BANNERS = [
{"key": "upload_corpus", "text": "Upload your cover letter corpus for voice fine-tuning", {"key": "upload_corpus", "text": "Upload your cover letter corpus for voice fine-tuning",
"link_label": "Settings → Fine-Tune"}, "link_label": "Settings → Fine-Tune"},
{"key": "configure_linkedin", "text": "Configure LinkedIn Easy Apply automation", {"key": "configure_linkedin", "text": "Configure LinkedIn Easy Apply automation",
"link_label": "Settings → AIHawk"}, "link_label": "Settings → Integrations"},
{"key": "setup_searxng", "text": "Set up company research with SearXNG", {"key": "setup_searxng", "text": "Set up company research with SearXNG",
"link_label": "Settings → Services"}, "link_label": "Settings → Services"},
{"key": "target_companies", "text": "Build a target company list for focused outreach", {"key": "target_companies", "text": "Build a target company list for focused outreach",

View file

@ -22,11 +22,11 @@ IS_DEMO = os.environ.get("DEMO_MODE", "").lower() in ("1", "true", "yes")
import streamlit as st import streamlit as st
from scripts.db import DEFAULT_DB, init_db, get_active_tasks from scripts.db import DEFAULT_DB, init_db, get_active_tasks
from app.feedback import inject_feedback_button from app.feedback import inject_feedback_button
from app.cloud_session import resolve_session, get_db_path from app.cloud_session import resolve_session, get_db_path, get_config_dir
import sqlite3 import sqlite3
st.set_page_config( st.set_page_config(
page_title="Job Seeker", page_title="Peregrine",
page_icon="💼", page_icon="💼",
layout="wide", layout="wide",
) )
@ -80,7 +80,7 @@ except Exception:
# ── First-run wizard gate ─────────────────────────────────────────────────────── # ── First-run wizard gate ───────────────────────────────────────────────────────
from scripts.user_profile import UserProfile as _UserProfile from scripts.user_profile import UserProfile as _UserProfile
_USER_YAML = Path(__file__).parent.parent / "config" / "user.yaml" _USER_YAML = get_config_dir() / "user.yaml"
_show_wizard = not IS_DEMO and ( _show_wizard = not IS_DEMO and (
not _UserProfile.exists(_USER_YAML) not _UserProfile.exists(_USER_YAML)

View file

@ -112,13 +112,19 @@ def resolve_session(app: str = "peregrine") -> None:
cookie_header = st.context.headers.get("x-cf-session", "") cookie_header = st.context.headers.get("x-cf-session", "")
session_jwt = _extract_session_token(cookie_header) session_jwt = _extract_session_token(cookie_header)
if not session_jwt: if not session_jwt:
st.error("Session token missing. Please log in at circuitforge.tech.") st.components.v1.html(
'<script>window.top.location.href = "https://circuitforge.tech/login";</script>',
height=0,
)
st.stop() st.stop()
try: try:
user_id = validate_session_jwt(session_jwt) user_id = validate_session_jwt(session_jwt)
except Exception as exc: except Exception:
st.error(f"Invalid session — please log in again. ({exc})") st.components.v1.html(
'<script>window.top.location.href = "https://circuitforge.tech/login";</script>',
height=0,
)
st.stop() st.stop()
user_path = _user_data_path(user_id, app) user_path = _user_data_path(user_id, app)
@ -141,6 +147,19 @@ def get_db_path() -> Path:
return st.session_state.get("db_path", DEFAULT_DB) return st.session_state.get("db_path", DEFAULT_DB)
def get_config_dir() -> Path:
"""
Return the config directory for this session.
Cloud: per-user path (<data_root>/<user_id>/peregrine/config/) so each
user's YAML files (user.yaml, plain_text_resume.yaml, etc.) are
isolated and never shared across tenants.
Local: repo-level config/ directory.
"""
if CLOUD_MODE and st.session_state.get("db_path"):
return Path(st.session_state["db_path"]).parent / "config"
return Path(__file__).parent.parent.parent / "config"
def get_cloud_tier() -> str: def get_cloud_tier() -> str:
""" """
Return the current user's cloud tier. Return the current user's cloud tier.

View file

@ -0,0 +1 @@
# app/components/__init__.py

View file

@ -0,0 +1,185 @@
# app/components/linkedin_import.py
"""
Shared LinkedIn import widget.
Usage in a page:
from app.components.linkedin_import import render_linkedin_tab
# At top of page render — check for pending import:
_li_data = st.session_state.pop("_linkedin_extracted", None)
if _li_data:
st.session_state["_parsed_resume"] = _li_data
st.rerun()
# Inside the LinkedIn tab:
with tab_linkedin:
render_linkedin_tab(config_dir=CONFIG_DIR, tier=tier)
"""
from __future__ import annotations
import json
import re
from datetime import datetime, timezone
from pathlib import Path
import streamlit as st
_LINKEDIN_PROFILE_RE = re.compile(r"https?://(www\.)?linkedin\.com/in/", re.I)
def _stage_path(config_dir: Path) -> Path:
return config_dir / "linkedin_stage.json"
def _load_stage(config_dir: Path) -> dict | None:
path = _stage_path(config_dir)
if not path.exists():
return None
try:
return json.loads(path.read_text())
except Exception:
return None
def _days_ago(iso_ts: str) -> str:
try:
dt = datetime.fromisoformat(iso_ts)
delta = datetime.now(timezone.utc) - dt
days = delta.days
if days == 0:
return "today"
if days == 1:
return "yesterday"
return f"{days} days ago"
except Exception:
return "unknown"
def _do_scrape(url: str, config_dir: Path) -> None:
    """Validate *url*, run the scrape, and update staging state.

    Shows st.error / st.warning on failure. On success, reruns the script so
    the caller re-reads the freshly written staging file.
    """
    if not _LINKEDIN_PROFILE_RE.match(url):
        st.error("Please enter a LinkedIn profile URL (linkedin.com/in/…)")
        return
    # "10–20" — the en-dash had been mangled to "1020" in the original string.
    with st.spinner("Fetching LinkedIn profile… (10–20 seconds)"):
        try:
            # Lazy import: the Playwright-backed scraper is heavy and only
            # needed when the user actually triggers an import.
            from scripts.linkedin_scraper import scrape_profile

            scrape_profile(url, _stage_path(config_dir))
        except ValueError as e:
            # Presumably raised by the scraper for bad/unsupported URLs —
            # confirm against scripts.linkedin_scraper.
            st.error(str(e))
        except RuntimeError as e:
            # Presumably recoverable scraper failures — warn, don't error.
            st.warning(str(e))
        except Exception as e:
            st.error(f"Unexpected error: {e}")
        else:
            # Success path moved OUT of the try body: st.rerun() raises
            # RerunException (a subclass of Exception), which the broad
            # `except Exception` above would otherwise catch and report as
            # "Unexpected error", cancelling the rerun.
            st.success("Profile imported successfully.")
            st.rerun()
def render_linkedin_tab(config_dir: Path, tier: str) -> None:
    """
    Render the LinkedIn import UI.

    When the user clicks "Use this data", writes the extracted dict to
    st.session_state["_linkedin_extracted"] and calls st.rerun().
    Caller reads: data = st.session_state.pop("_linkedin_extracted", None)

    Args:
        config_dir: Directory holding the staging file (per-user in cloud mode).
        tier: User tier. Currently unused in this body — presumably kept for
            future tier-gated import features; confirm before removing.
    """
    stage = _load_stage(config_dir)

    # ── Staged data status bar ────────────────────────────────────────────────
    if stage:
        scraped_at = stage.get("scraped_at", "")
        source_label = "LinkedIn export" if stage.get("source") == "export_zip" else "LinkedIn profile"
        col_info, col_refresh = st.columns([4, 1])
        col_info.caption(f"Last imported from {source_label}: {_days_ago(scraped_at)}")
        if col_refresh.button("🔄 Refresh", key="li_refresh"):
            url = stage.get("url")
            if url:
                _do_scrape(url, config_dir)
            else:
                # Export-zip stages carry no source URL to re-scrape from.
                st.info("Original URL not available — paste the URL below to re-import.")

    # ── URL import ────────────────────────────────────────────────────────────
    st.markdown("**Import from LinkedIn profile URL**")
    url_input = st.text_input(
        "LinkedIn profile URL",
        placeholder="https://linkedin.com/in/your-name",
        label_visibility="collapsed",
        key="li_url_input",
    )
    if st.button("🔗 Import from LinkedIn", key="li_import_btn", type="primary"):
        if not url_input.strip():
            st.warning("Please enter your LinkedIn profile URL.")
        else:
            _do_scrape(url_input.strip(), config_dir)
    # "10–20" — the en-dash had been mangled to "1020" in the original string.
    st.caption(
        "Imports from your public LinkedIn profile. No login or credentials required. "
        "Scraping typically takes 10–20 seconds."
    )

    # ── Section preview + use button ─────────────────────────────────────────
    if stage:
        from scripts.linkedin_parser import parse_stage
        extracted, err = parse_stage(_stage_path(config_dir))
        if err:
            st.warning(f"Could not read staged data: {err}")
        else:
            st.divider()
            st.markdown("**Preview**")
            col1, col2, col3 = st.columns(3)
            col1.metric("Experience entries", len(extracted.get("experience", [])))
            col2.metric("Skills", len(extracted.get("skills", [])))
            # NOTE(review): certifications appear to live under the
            # "achievements" key in the extracted schema — confirm against
            # linkedin_utils.parse_html.
            col3.metric("Certifications", len(extracted.get("achievements", [])))
            if extracted.get("career_summary"):
                with st.expander("Summary"):
                    st.write(extracted["career_summary"])
            if extracted.get("experience"):
                with st.expander(f"Experience ({len(extracted['experience'])} entries)"):
                    for exp in extracted["experience"]:
                        st.markdown(f"**{exp.get('title')}** @ {exp.get('company')} · {exp.get('date_range', '')}")
            if extracted.get("education"):
                with st.expander("Education"):
                    for edu in extracted["education"]:
                        st.markdown(f"**{edu.get('school')}** — {edu.get('degree')} {edu.get('field', '')}".strip())
            if extracted.get("skills"):
                with st.expander("Skills"):
                    st.write(", ".join(extracted["skills"]))
            st.divider()
            if st.button("✅ Use this data", key="li_use_btn", type="primary"):
                st.session_state["_linkedin_extracted"] = extracted
                st.rerun()

    # ── Advanced: data export ─────────────────────────────────────────────────
    with st.expander("⬇️ Import from LinkedIn data export (advanced)", expanded=False):
        st.caption(
            "Download your LinkedIn data: **Settings & Privacy → Data Privacy → "
            "Get a copy of your data → Request archive → Fast file**. "
            "The Fast file is available immediately and contains your profile, "
            "experience, education, and skills."
        )
        zip_file = st.file_uploader(
            "Upload LinkedIn export zip", type=["zip"], key="li_zip_upload"
        )
        if zip_file is not None:
            if st.button("📦 Parse export", key="li_parse_zip"):
                with st.spinner("Parsing export archive…"):
                    try:
                        from scripts.linkedin_scraper import parse_export_zip
                        extracted = parse_export_zip(
                            zip_file.read(), _stage_path(config_dir)
                        )
                    except Exception as e:
                        st.error(f"Failed to parse export: {e}")
                    else:
                        # Success path moved OUT of the try body: st.rerun()
                        # raises RerunException (an Exception subclass), which
                        # the broad `except Exception` above would otherwise
                        # swallow, cancelling the rerun.
                        st.success(
                            f"Imported {len(extracted.get('experience', []))} experience entries, "
                            f"{len(extracted.get('skills', []))} skills. "
                            "Click 'Use this data' above to apply."
                        )
                        st.rerun()

View file

@ -15,14 +15,14 @@ sys.path.insert(0, str(Path(__file__).parent.parent.parent))
import streamlit as st import streamlit as st
import yaml import yaml
from app.cloud_session import resolve_session, get_db_path from app.cloud_session import resolve_session, get_db_path, get_config_dir
resolve_session("peregrine") resolve_session("peregrine")
_ROOT = Path(__file__).parent.parent.parent _ROOT = Path(__file__).parent.parent.parent
CONFIG_DIR = _ROOT / "config" CONFIG_DIR = get_config_dir() # per-user dir in cloud; repo config/ locally
USER_YAML = CONFIG_DIR / "user.yaml" USER_YAML = CONFIG_DIR / "user.yaml"
STEPS = 6 # mandatory steps STEPS = 6 # mandatory steps
STEP_LABELS = ["Hardware", "Tier", "Identity", "Resume", "Inference", "Search"] STEP_LABELS = ["Hardware", "Tier", "Resume", "Identity", "Inference", "Search"]
# ── Helpers ──────────────────────────────────────────────────────────────────── # ── Helpers ────────────────────────────────────────────────────────────────────
@ -179,6 +179,13 @@ st.divider()
# ── Step 1: Hardware ─────────────────────────────────────────────────────────── # ── Step 1: Hardware ───────────────────────────────────────────────────────────
if step == 1: if step == 1:
from app.cloud_session import CLOUD_MODE as _CLOUD_MODE
if _CLOUD_MODE:
# Cloud deployment: always single-gpu (Heimdall), skip hardware selection
_save_yaml({"inference_profile": "single-gpu", "wizard_step": 1})
st.session_state.wizard_step = 2
st.rerun()
from app.wizard.step_hardware import validate, PROFILES from app.wizard.step_hardware import validate, PROFILES
st.subheader("Step 1 \u2014 Hardware Detection") st.subheader("Step 1 \u2014 Hardware Detection")
@ -212,6 +219,14 @@ if step == 1:
# ── Step 2: Tier ─────────────────────────────────────────────────────────────── # ── Step 2: Tier ───────────────────────────────────────────────────────────────
elif step == 2: elif step == 2:
from app.cloud_session import CLOUD_MODE as _CLOUD_MODE
if _CLOUD_MODE:
# Cloud mode: tier already resolved from Heimdall at session init
cloud_tier = st.session_state.get("cloud_tier", "free")
_save_yaml({"tier": cloud_tier, "wizard_step": 2})
st.session_state.wizard_step = 3
st.rerun()
from app.wizard.step_tier import validate from app.wizard.step_tier import validate
st.subheader("Step 2 \u2014 Choose Your Plan") st.subheader("Step 2 \u2014 Choose Your Plan")
@ -248,63 +263,21 @@ elif step == 2:
st.rerun() st.rerun()
# ── Step 3: Identity ─────────────────────────────────────────────────────────── # ── Step 3: Resume ─────────────────────────────────────────────────────────────
elif step == 3: elif step == 3:
from app.wizard.step_identity import validate
st.subheader("Step 3 \u2014 Your Identity")
st.caption("Used in cover letter PDFs, LLM prompts, and the app header.")
c1, c2 = st.columns(2)
name = c1.text_input("Full Name *", saved_yaml.get("name", ""))
email = c1.text_input("Email *", saved_yaml.get("email", ""))
phone = c2.text_input("Phone", saved_yaml.get("phone", ""))
linkedin = c2.text_input("LinkedIn URL", saved_yaml.get("linkedin", ""))
# Career summary with optional LLM generation
summary_default = st.session_state.get("_gen_result_career_summary") or saved_yaml.get("career_summary", "")
summary = st.text_area(
"Career Summary *", value=summary_default, height=120,
placeholder="Experienced professional with X years in [field]. Specialise in [skills].",
help="Injected into cover letter and research prompts as your professional context.",
)
gen_result = _generation_widget(
section="career_summary",
label="Generate from resume",
tier=_tier,
feature_key="llm_career_summary",
input_data={"resume_text": saved_yaml.get("_raw_resume_text", "")},
)
if gen_result and gen_result != summary:
st.info(f"\u2728 Suggested summary \u2014 paste it above if it looks good:\n\n{gen_result}")
col_back, col_next = st.columns([1, 4])
if col_back.button("\u2190 Back", key="ident_back"):
st.session_state.wizard_step = 2
st.rerun()
if col_next.button("Next \u2192", type="primary", key="ident_next"):
errs = validate({"name": name, "email": email, "career_summary": summary})
if errs:
st.error("\n".join(errs))
else:
_save_yaml({
"name": name, "email": email, "phone": phone,
"linkedin": linkedin, "career_summary": summary,
"wizard_complete": False, "wizard_step": 3,
})
st.session_state.wizard_step = 4
st.rerun()
# ── Step 4: Resume ─────────────────────────────────────────────────────────────
elif step == 4:
from app.wizard.step_resume import validate from app.wizard.step_resume import validate
st.subheader("Step 4 \u2014 Resume") st.subheader("Step 3 \u2014 Resume")
st.caption("Upload your resume for fast parsing, or build it section by section.") st.caption("Upload your resume for fast parsing, or build it section by section.")
tab_upload, tab_builder = st.tabs(["\U0001f4ce Upload", "\U0001f4dd Build manually"]) # Read LinkedIn import result before tabs render (spec: "at step render time")
_li_data = st.session_state.pop("_linkedin_extracted", None)
if _li_data:
st.session_state["_parsed_resume"] = _li_data
tab_upload, tab_builder, tab_linkedin = st.tabs([
"\U0001f4ce Upload", "\U0001f4dd Build Manually", "\U0001f517 LinkedIn"
])
with tab_upload: with tab_upload:
uploaded = st.file_uploader("Upload PDF, DOCX, or ODT", type=["pdf", "docx", "odt"]) uploaded = st.file_uploader("Upload PDF, DOCX, or ODT", type=["pdf", "docx", "odt"])
@ -393,9 +366,13 @@ elif step == 4:
input_data={"bullet_notes": all_bullets}, input_data={"bullet_notes": all_bullets},
) )
with tab_linkedin:
from app.components.linkedin_import import render_linkedin_tab
render_linkedin_tab(config_dir=CONFIG_DIR, tier=_tier)
col_back, col_next = st.columns([1, 4]) col_back, col_next = st.columns([1, 4])
if col_back.button("\u2190 Back", key="resume_back"): if col_back.button("\u2190 Back", key="resume_back"):
st.session_state.wizard_step = 3 st.session_state.wizard_step = 2
st.rerun() st.rerun()
if col_next.button("Next \u2192", type="primary", key="resume_next"): if col_next.button("Next \u2192", type="primary", key="resume_next"):
parsed = st.session_state.get("_parsed_resume", {}) parsed = st.session_state.get("_parsed_resume", {})
@ -407,19 +384,75 @@ elif step == 4:
if errs: if errs:
st.error("\n".join(errs)) st.error("\n".join(errs))
else: else:
resume_yaml_path = _ROOT / "config" / "plain_text_resume.yaml" resume_yaml_path = CONFIG_DIR / "plain_text_resume.yaml"
resume_yaml_path.parent.mkdir(parents=True, exist_ok=True) resume_yaml_path.parent.mkdir(parents=True, exist_ok=True)
resume_data = {**parsed, "experience": experience} if parsed else {"experience": experience} resume_data = {**parsed, "experience": experience} if parsed else {"experience": experience}
resume_yaml_path.write_text( resume_yaml_path.write_text(
yaml.dump(resume_data, default_flow_style=False, allow_unicode=True) yaml.dump(resume_data, default_flow_style=False, allow_unicode=True)
) )
_save_yaml({"wizard_step": 4}) _save_yaml({"wizard_step": 3})
st.session_state.wizard_step = 4
st.rerun()
# ── Step 4: Identity ───────────────────────────────────────────────────────────
elif step == 4:
from app.wizard.step_identity import validate
st.subheader("Step 4 \u2014 Your Identity")
st.caption("Used in cover letter PDFs, LLM prompts, and the app header.")
c1, c2 = st.columns(2)
name = c1.text_input("Full Name *", saved_yaml.get("name", ""))
email = c1.text_input("Email *", saved_yaml.get("email", ""))
phone = c2.text_input("Phone", saved_yaml.get("phone", ""))
linkedin = c2.text_input("LinkedIn URL", saved_yaml.get("linkedin", ""))
# Career summary with optional LLM generation — resume text available now (step 3 ran first)
summary_default = st.session_state.get("_gen_result_career_summary") or saved_yaml.get("career_summary", "")
summary = st.text_area(
"Career Summary *", value=summary_default, height=120,
placeholder="Experienced professional with X years in [field]. Specialise in [skills].",
help="Injected into cover letter and research prompts as your professional context.",
)
gen_result = _generation_widget(
section="career_summary",
label="Generate from resume",
tier=_tier,
feature_key="llm_career_summary",
input_data={"resume_text": saved_yaml.get("_raw_resume_text", "")},
)
if gen_result and gen_result != summary:
st.info(f"\u2728 Suggested summary \u2014 paste it above if it looks good:\n\n{gen_result}")
col_back, col_next = st.columns([1, 4])
if col_back.button("\u2190 Back", key="ident_back"):
st.session_state.wizard_step = 3
st.rerun()
if col_next.button("Next \u2192", type="primary", key="ident_next"):
errs = validate({"name": name, "email": email, "career_summary": summary})
if errs:
st.error("\n".join(errs))
else:
_save_yaml({
"name": name, "email": email, "phone": phone,
"linkedin": linkedin, "career_summary": summary,
"wizard_complete": False, "wizard_step": 4,
})
st.session_state.wizard_step = 5 st.session_state.wizard_step = 5
st.rerun() st.rerun()
# ── Step 5: Inference ────────────────────────────────────────────────────────── # ── Step 5: Inference ──────────────────────────────────────────────────────────
elif step == 5: elif step == 5:
from app.cloud_session import CLOUD_MODE as _CLOUD_MODE
if _CLOUD_MODE:
# Cloud deployment: inference is managed server-side; skip this step
_save_yaml({"wizard_step": 5})
st.session_state.wizard_step = 6
st.rerun()
from app.wizard.step_inference import validate from app.wizard.step_inference import validate
st.subheader("Step 5 \u2014 Inference & API Keys") st.subheader("Step 5 \u2014 Inference & API Keys")

View file

@ -12,23 +12,24 @@ import yaml
import os as _os import os as _os
from scripts.user_profile import UserProfile from scripts.user_profile import UserProfile
from app.cloud_session import resolve_session, get_db_path, CLOUD_MODE from app.cloud_session import resolve_session, get_db_path, get_config_dir, CLOUD_MODE
_USER_YAML = Path(__file__).parent.parent.parent / "config" / "user.yaml"
_profile = UserProfile(_USER_YAML) if UserProfile.exists(_USER_YAML) else None
_name = _profile.name if _profile else "Job Seeker"
resolve_session("peregrine") resolve_session("peregrine")
st.title("⚙️ Settings") st.title("⚙️ Settings")
CONFIG_DIR = Path(__file__).parent.parent.parent / "config" # Config paths — per-user directory in cloud mode, shared repo config/ locally
CONFIG_DIR = get_config_dir()
SEARCH_CFG = CONFIG_DIR / "search_profiles.yaml" SEARCH_CFG = CONFIG_DIR / "search_profiles.yaml"
BLOCKLIST_CFG = CONFIG_DIR / "blocklist.yaml" BLOCKLIST_CFG = CONFIG_DIR / "blocklist.yaml"
LLM_CFG = CONFIG_DIR / "llm.yaml" LLM_CFG = CONFIG_DIR / "llm.yaml"
NOTION_CFG = CONFIG_DIR / "notion.yaml" NOTION_CFG = CONFIG_DIR / "notion.yaml"
RESUME_PATH = Path(__file__).parent.parent.parent / "config" / "plain_text_resume.yaml" RESUME_PATH = CONFIG_DIR / "plain_text_resume.yaml"
KEYWORDS_CFG = CONFIG_DIR / "resume_keywords.yaml" KEYWORDS_CFG = CONFIG_DIR / "resume_keywords.yaml"
_USER_YAML = CONFIG_DIR / "user.yaml"
_profile = UserProfile(_USER_YAML) if UserProfile.exists(_USER_YAML) else None
_name = _profile.name if _profile else "Peregrine User"
def load_yaml(path: Path) -> dict: def load_yaml(path: Path) -> dict:
if path.exists(): if path.exists():
return yaml.safe_load(path.read_text()) or {} return yaml.safe_load(path.read_text()) or {}
@ -54,8 +55,9 @@ def _suggest_search_terms(current_titles, resume_path, blocklist=None, user_prof
_show_finetune = bool(_profile and _profile.inference_profile in ("single-gpu", "dual-gpu")) _show_finetune = bool(_profile and _profile.inference_profile in ("single-gpu", "dual-gpu"))
USER_CFG = CONFIG_DIR / "user.yaml" USER_CFG = CONFIG_DIR / "user.yaml"
SERVER_CFG = CONFIG_DIR / "server.yaml" # Server config is always repo-level — it controls the container, not the user
SERVER_CFG_EXAMPLE = CONFIG_DIR / "server.yaml.example" SERVER_CFG = Path(__file__).parent.parent.parent / "config" / "server.yaml"
SERVER_CFG_EXAMPLE = Path(__file__).parent.parent.parent / "config" / "server.yaml.example"
_dev_mode = _os.getenv("DEV_MODE", "").lower() in ("true", "1", "yes") _dev_mode = _os.getenv("DEV_MODE", "").lower() in ("true", "1", "yes")
_u_for_dev = yaml.safe_load(USER_CFG.read_text()) or {} if USER_CFG.exists() else {} _u_for_dev = yaml.safe_load(USER_CFG.read_text()) or {} if USER_CFG.exists() else {}
@ -587,6 +589,23 @@ def _upload_resume_widget(key_prefix: str) -> None:
) )
with tab_resume: with tab_resume:
# ── LinkedIn import ───────────────────────────────────────────────────────
_li_data = st.session_state.pop("_linkedin_extracted", None)
if _li_data:
# Merge imported data into resume YAML — only bootstrap empty fields,
# never overwrite existing detail with sparse LinkedIn data
existing = load_yaml(RESUME_PATH)
existing.update({k: v for k, v in _li_data.items() if v and not existing.get(k)})
RESUME_PATH.parent.mkdir(parents=True, exist_ok=True)
save_yaml(RESUME_PATH, existing)
st.success("LinkedIn data applied to resume profile.")
st.rerun()
with st.expander("🔗 Import from LinkedIn", expanded=False):
from app.components.linkedin_import import render_linkedin_tab
_tab_tier = _profile.tier if _profile else "free"
render_linkedin_tab(config_dir=CONFIG_DIR, tier=_tab_tier)
st.caption( st.caption(
f"Edit {_name}'s application profile. " f"Edit {_name}'s application profile. "
"Bullets are used as paste-able shortcuts in the Apply Workspace." "Bullets are used as paste-able shortcuts in the Apply Workspace."
@ -867,6 +886,14 @@ with tab_resume:
with tab_system: with tab_system:
st.caption("Infrastructure, LLM backends, integrations, and service connections.") st.caption("Infrastructure, LLM backends, integrations, and service connections.")
if CLOUD_MODE:
st.info(
"**Your instance is managed by CircuitForge.**\n\n"
"Infrastructure, LLM backends, and service settings are configured by the platform. "
"To change your plan or billing, visit your [account page](https://circuitforge.tech/account)."
)
st.stop()
# ── File Paths & Inference ──────────────────────────────────────────────── # ── File Paths & Inference ────────────────────────────────────────────────
with st.expander("📁 File Paths & Inference Profile"): with st.expander("📁 File Paths & Inference Profile"):
_su = _yaml_up.safe_load(USER_CFG.read_text()) or {} if USER_CFG.exists() else {} _su = _yaml_up.safe_load(USER_CFG.read_text()) or {} if USER_CFG.exists() else {}
@ -1464,6 +1491,13 @@ with tab_finetune:
with tab_license: with tab_license:
st.subheader("🔑 License") st.subheader("🔑 License")
if CLOUD_MODE:
_cloud_tier = st.session_state.get("cloud_tier", "free")
st.success(f"**{_cloud_tier.title()} tier** — managed via your CircuitForge account")
st.caption("Your plan is tied to your account and applied automatically.")
st.page_link("https://circuitforge.tech/account", label="Manage plan →", icon="🔗")
st.stop()
from scripts.license import ( from scripts.license import (
verify_local as _verify_local, verify_local as _verify_local,
activate as _activate, activate as _activate,

View file

@ -389,7 +389,7 @@ with col_tools:
st.markdown("---") st.markdown("---")
else: else:
st.warning("Resume YAML not found — check that AIHawk is cloned.") st.warning("Resume profile not found — complete setup or upload a resume in Settings → Resume Profile.")
# ── Application Q&A ─────────────────────────────────────────────────────── # ── Application Q&A ───────────────────────────────────────────────────────
with st.expander("💬 Answer Application Questions"): with st.expander("💬 Answer Application Questions"):

View file

@ -2,6 +2,52 @@
Unscheduled ideas and deferred features. Roughly grouped by area. Unscheduled ideas and deferred features. Roughly grouped by area.
See also: `circuitforge-plans/shared/2026-03-07-launch-checklist.md` for pre-launch blockers
(legal docs, Stripe live keys, website deployment, demo DB ownership fix).
---
## Launch Blockers (tracked in shared launch checklist)
- **ToS + Refund Policy** — required before live Stripe charges. Files go in `website/content/legal/`.
- **Stripe live key rotation** — swap test keys to live in `website/.env` (zero code changes).
- **Website deployment to bastion** — Caddy route for Nuxt frontend at `circuitforge.tech`.
- **Demo DB ownership** — `demo/data/staging.db` is root-owned (Docker artifact); fix with `sudo chown alan:alan` then re-run `demo/seed_demo.py`.
---
## Post-Launch / Infrastructure
- **Accessibility Statement** — WCAG 2.1 conformance doc at `website/content/legal/accessibility.md`. High credibility value for ND audience.
- **Data deletion request process** — published procedure at `website/content/legal/data-deletion.md` (GDPR/CCPA; references `privacy@circuitforge.tech`).
- **Uptime Kuma monitors** — 6 monitors need to be added manually (website, Heimdall, demo, Directus, Forgejo, Peregrine container health).
- **Directus admin password rotation** — change from `changeme-set-via-ui-on-first-run` before website goes public.
---
## Discovery — Community Scraper Plugin System
Design doc: `circuitforge-plans/peregrine/2026-03-07-community-scraper-plugin-design.md`
**Summary:** Add a `scripts/plugins/` directory with auto-discovery and a documented MIT-licensed
plugin API. Separates CF-built custom scrapers (paid, BSL 1.1, in `scripts/custom_boards/`) from
community-contributed and CF-freebie scrapers (free, MIT, in `scripts/plugins/`).
**Implementation tasks:**
- [ ] Add `scripts/plugins/` with `__init__.py`, `README.md`, and `example_plugin.py`
- [ ] Add `config/plugins/` directory with `.gitkeep`; gitignore `config/plugins/*.yaml` (not `.example`)
- [ ] Update `discover.py`: `load_plugins()` auto-discovery + tier gate (`custom_boards` = paid, `plugins` = free)
- [ ] Update `search_profiles.yaml` schema: add `plugins:` list + `plugin_config:` block
- [ ] Migrate `scripts/custom_boards/craigslist.py` → `scripts/plugins/craigslist.py` (CF freebie)
- [ ] Settings UI: render `CONFIG_SCHEMA` fields for installed plugins (Settings → Search)
- [ ] Rewrite `docs/developer-guide/adding-scrapers.md` to document the plugin API
- [ ] Add `scripts/plugins/LICENSE` (MIT) to make the dual-license split explicit
**CF freebie candidates** (future, after plugin system ships):
- Dice.com (tech-focused, no API key)
- We Work Remotely (remote-only, clean HTML)
- Wellfound / AngelList (startup roles)
--- ---
## Settings / Data Management ## Settings / Data Management

View file

@ -0,0 +1,56 @@
# scripts/linkedin_parser.py
"""
LinkedIn staging file reader.
parse_stage(stage_path) reads an existing staging file and returns
a structured dict. For url_scrape sources it re-runs the HTML parser
so improvements to linkedin_utils take effect without a new scrape.
"""
from __future__ import annotations
import json
from pathlib import Path
from scripts.linkedin_utils import parse_html
def parse_stage(stage_path: Path) -> tuple[dict, str]:
    """
    Read and return the extracted profile data from a staging file.

    For url_scrape sources: re-runs parse_html on stored raw_html so
    parser improvements are applied without re-scraping, and writes the
    refreshed result back to the staging file.

    Returns (extracted_dict, error_string).
    On any failure returns ({}, error_message).
    """
    if not stage_path.exists():
        return {}, f"No staged data found at {stage_path}"
    try:
        data = json.loads(stage_path.read_text())
    except Exception as e:
        return {}, f"Could not read staging file: {e}"
    source = data.get("source")
    raw_html = data.get("raw_html")
    if source == "url_scrape" and raw_html:
        # Re-run the parser — picks up any selector improvements
        extracted = parse_html(raw_html)
        # Preserve linkedin URL — parse_html always returns "" for this field
        extracted["linkedin"] = extracted.get("linkedin") or data.get("url") or ""
        # Write updated extracted back to staging file atomically.
        # Path.replace, not rename: rename raises FileExistsError on Windows
        # when the destination exists — and here it always does, because we
        # just read it above.
        data["extracted"] = extracted
        tmp = stage_path.with_suffix(".tmp")
        tmp.write_text(json.dumps(data, ensure_ascii=False, indent=2))
        tmp.replace(stage_path)
        return extracted, ""
    extracted = data.get("extracted")
    if not extracted:
        return {}, "Staging file has no extracted data"
    return extracted, ""

169
scripts/linkedin_scraper.py Normal file
View file

@ -0,0 +1,169 @@
# scripts/linkedin_scraper.py
"""
LinkedIn profile scraper.
Two entry points:
scrape_profile(url, stage_path) Playwright headless fetch
parse_export_zip(zip_bytes, stage_path) LinkedIn data archive CSV parse
Both write a staging file at stage_path and return the extracted dict.
"""
from __future__ import annotations
import csv
import io
import json
import re
import zipfile
from datetime import datetime, timezone
from pathlib import Path
from playwright.sync_api import sync_playwright, TimeoutError as PWTimeout
from scripts.linkedin_utils import parse_html
# Matches public profile URLs only (linkedin.com/in/<slug>); anything else
# (company pages, non-LinkedIn URLs) is rejected up front by scrape_profile.
_LINKEDIN_PROFILE_RE = re.compile(r"https?://(www\.)?linkedin\.com/in/", re.I)
# Desktop Chrome user-agent string sent with the headless fetch —
# presumably to look like a regular browser; TODO confirm the headless
# default UA is actually treated differently by LinkedIn.
_CHROME_UA = (
    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
    "(KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36"
)
def _write_stage(stage_path: Path, payload: dict) -> None:
    """Atomically write *payload* as pretty-printed JSON to *stage_path*.

    Writes to a sibling .tmp file first, then moves it into place so a
    concurrent reader never observes a partially written file.
    """
    tmp = stage_path.with_suffix(".tmp")
    tmp.write_text(json.dumps(payload, ensure_ascii=False, indent=2))
    # Path.replace, not rename: rename raises FileExistsError on Windows
    # when the staging file already exists (e.g. on a re-scrape).
    tmp.replace(stage_path)
def scrape_profile(url: str, stage_path: Path) -> dict:
    """
    Fetch a public LinkedIn profile via Playwright headless Chrome.

    Raises ValueError if url is not a linkedin.com/in/ URL.
    Raises RuntimeError on scrape failure (timeout, blocked, etc.).
    Returns the extracted dict and writes the staging file.
    """
    if not _LINKEDIN_PROFILE_RE.match(url):
        raise ValueError(
            f"Expected a LinkedIn profile URL (linkedin.com/in/…), got: {url}"
        )
    try:
        with sync_playwright() as pw:
            browser = pw.chromium.launch(headless=True)
            try:
                page = browser.new_page(user_agent=_CHROME_UA)
                page.goto(url, timeout=30_000)
                # Wait for any recognisable profile section before grabbing HTML.
                page.wait_for_selector(
                    "h1, section[data-section], #experience, #about",
                    timeout=20_000,
                )
                raw_html = page.content()
            finally:
                # Close even when goto/wait_for_selector raises — otherwise the
                # browser process lingers until sync_playwright() tears it down.
                browser.close()
    except PWTimeout as e:
        raise RuntimeError(
            "LinkedIn did not load in time — the request may have been blocked. "
            "Try the data export option instead."
        ) from e
    except Exception as e:
        raise RuntimeError(f"LinkedIn scrape failed: {e}") from e
    extracted = parse_html(raw_html)
    extracted["linkedin"] = url
    _write_stage(stage_path, {
        "url": url,
        "scraped_at": datetime.now(timezone.utc).isoformat(),
        "source": "url_scrape",
        "raw_html": raw_html,
        "extracted": extracted,
    })
    return extracted
def parse_export_zip(zip_bytes: bytes, stage_path: Path) -> dict:
    """
    Parse a LinkedIn data export archive.

    zip_bytes: raw zip bytes — callers do: zip_bytes = uploaded_file.read()
    Raises ValueError when the bytes are not a valid zip archive.
    Returns the extracted dict and writes the staging file.
    Missing CSV files are skipped silently.
    """
    extracted: dict = {
        "name": "", "email": "", "phone": "", "linkedin": "",
        "career_summary": "",
        "experience": [], "education": [], "skills": [], "achievements": [],
    }
    try:
        with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf:
            # Case-insensitive filename lookup into the archive.
            names_in_zip = {n.lower(): n for n in zf.namelist()}

            def _read_csv(filename: str) -> list[dict]:
                """Return the named CSV's rows as dicts, or [] when absent."""
                key = filename.lower()
                if key not in names_in_zip:
                    return []
                # utf-8-sig drops a leading BOM if present; bad bytes are
                # replaced rather than raising.
                text = zf.read(names_in_zip[key]).decode("utf-8-sig", errors="replace")
                return list(csv.DictReader(io.StringIO(text)))

            # One helper per CSV section keeps each format's quirks isolated.
            _apply_profile(extracted, _read_csv("Profile.csv"))
            _apply_positions(extracted, _read_csv("Position.csv"))
            _apply_education(extracted, _read_csv("Education.csv"))
            _apply_skills(extracted, _read_csv("Skills.csv"))
            _apply_certifications(extracted, _read_csv("Certifications.csv"))
    except zipfile.BadZipFile as e:
        raise ValueError(f"Not a valid zip file: {e}") from e
    _write_stage(stage_path, {
        "url": None,
        "scraped_at": datetime.now(timezone.utc).isoformat(),
        "source": "export_zip",
        "raw_html": None,
        "extracted": extracted,
    })
    return extracted


def _apply_profile(extracted: dict, rows: list[dict]) -> None:
    """Fill name/email/summary from the first Profile.csv row only."""
    for row in rows:
        first = row.get("First Name", "").strip()
        last = row.get("Last Name", "").strip()
        extracted["name"] = f"{first} {last}".strip()
        extracted["email"] = row.get("Email Address", "").strip()
        extracted["career_summary"] = row.get("Summary", "").strip()
        break


def _apply_positions(extracted: dict, rows: list[dict]) -> None:
    """Append one experience entry per Position.csv row with company or title."""
    for row in rows:
        company = row.get("Company Name", "").strip()
        title = row.get("Title", "").strip()
        desc = row.get("Description", "").strip()
        start = row.get("Started On", "").strip()
        end = row.get("Finished On", "").strip()
        # An empty "Finished On" on a started role means the job is current.
        end_label = end if end else ("Present" if start else "")
        date_range = f"{start} {end_label}".strip(" ") if (start or end) else ""
        bullets = [d.strip() for d in re.split(r"[.•\n]+", desc) if d.strip() and len(d.strip()) > 3]
        if company or title:
            extracted["experience"].append({
                "company": company,
                "title": title,
                "date_range": date_range,
                "bullets": bullets,
            })


def _apply_education(extracted: dict, rows: list[dict]) -> None:
    """Append one education entry per Education.csv row with school or degree."""
    for row in rows:
        school = row.get("School Name", "").strip()
        degree = row.get("Degree Name", "").strip()
        field = row.get("Field Of Study", "").strip()
        start = row.get("Start Date", "").strip()
        end = row.get("End Date", "").strip()
        dates = f"{start} {end}".strip(" ") if start or end else ""
        if school or degree:
            extracted["education"].append({
                "school": school,
                "degree": degree,
                "field": field,
                "dates": dates,
            })


def _apply_skills(extracted: dict, rows: list[dict]) -> None:
    """Collect non-empty skill names from Skills.csv."""
    for row in rows:
        skill = row.get("Name", "").strip()
        if skill:
            extracted["skills"].append(skill)


def _apply_certifications(extracted: dict, rows: list[dict]) -> None:
    """Collect certification names from Certifications.csv into achievements."""
    for row in rows:
        name = row.get("Name", "").strip()
        if name:
            extracted["achievements"].append(name)

194
scripts/linkedin_utils.py Normal file
View file

@ -0,0 +1,194 @@
# scripts/linkedin_utils.py
"""
LinkedIn profile HTML parser.
Extracts structured profile data from a raw LinkedIn public profile page.
No Playwright dependency — importable by both linkedin_scraper and linkedin_parser.
Selectors target the 2024-2025 LinkedIn public profile DOM.
When LinkedIn changes their markup, update the selector lists here only.
Each section uses ordered fallbacks — the first matching selector wins.
"""
from __future__ import annotations
import re
from bs4 import BeautifulSoup
# ── Selector fallback lists ────────────────────────────────────────────────────
# Person's display name — specific top-card selector first, bare <h1> as last resort.
_NAME_SELECTORS = [
    "h1.top-card-layout__title",
    "h1[class*='title']",
    ".pv-top-card--list h1",
    "h1",
]
# "About" section summary text.
_SUMMARY_SELECTORS = [
    "section[data-section='about'] .show-more-less-text__text--less",
    "section[data-section='about'] p",
    "#about ~ * p.show-more-less-text__text--less",
    ".pv-about-section p",
]
# One list item per job entry in the experience section.
_EXPERIENCE_ITEM_SELECTORS = [
    "section[data-section='experience'] li.experience-item",
    "section[data-section='experience'] li",
    "#experience-section li",
    "#experience ~ * li",
]
# Fields inside a single experience item.
_EXP_TITLE_SELECTORS = ["span.experience-item__title", "span[class*='title']", "h3"]
_EXP_COMPANY_SELECTORS = ["span.experience-item__subtitle", "span[class*='subtitle']", "p[class*='company']"]
_EXP_DATE_SELECTORS = ["span.date-range", "[class*='date-range']", "span[class*='duration']"]
_EXP_DESC_SELECTORS = [".show-more-less-text__text--less", "p[class*='description']", "p"]
# One list item per school entry in the education section.
_EDUCATION_ITEM_SELECTORS = [
    "section[data-section='education'] li.education__list-item",
    "section[data-section='education'] li",
    "#education ~ * li",
]
# Fields inside a single education item.
_EDU_SCHOOL_SELECTORS = ["h3.education__school-name", "h3[class*='school']", "h3"]
_EDU_DEGREE_SELECTORS = ["span.education__item--degree-name", "span[class*='degree']", "p[class*='degree']"]
_EDU_DATES_SELECTORS = ["span.education__item--duration", "span[class*='duration']", "time"]
# Individual skill labels.
_SKILLS_SELECTORS = [
    "section[data-section='skills'] span.mr1",
    "section[data-section='skills'] li span[class*='bold']",
    "section[data-section='skills'] li span",
    "#skills ~ * li span",
]
# Certification / licence entries (mapped to "achievements" by parse_html).
_CERT_ITEM_SELECTORS = [
    "section[data-section='certifications'] li",
    "#certifications ~ * li",
    "#licenses_and_certifications ~ * li",
]
_CERT_NAME_SELECTORS = ["h3.certifications__name", "h3[class*='name']", "h3", "span[class*='title']"]
# ── Helpers ───────────────────────────────────────────────────────────────────
def _select_first(soup, selectors):
    """Return stripped text for the first selector matching a non-empty element, else ""."""
    for candidate in selectors:
        try:
            node = soup.select_one(candidate)
            if node is None:
                continue
            text = node.get_text(strip=True)
        except Exception:
            # Unsupported/invalid selector for this soup — try the next fallback.
            continue
        if text:
            return text
    return ""
def _select_all(soup, selectors):
    """Return the element list from the first selector that matches anything, else []."""
    for candidate in selectors:
        try:
            found = soup.select(candidate)
        except Exception:
            # Invalid selector — fall through to the next fallback.
            continue
        if found:
            return found
    return []
def _split_bullets(text):
    """Split free text into bullet fragments.

    Splits on bullet characters, runs of extra whitespace, or newlines;
    fragments of 3 characters or fewer are discarded as noise.
    """
    fragments = re.split(r"[•·]\s*|(?<=\s)\s+|\n+", text)
    bullets = []
    for fragment in fragments:
        cleaned = fragment.strip()
        if len(cleaned) > 3:
            bullets.append(cleaned)
    return bullets
def _date_range_text(item):
    """Extract a date-range string from an experience item.

    Prefers the text of <time> children (joined with a space); falls back
    to the matched element's own text. Returns "" when nothing matches.
    """
    for sel in _EXP_DATE_SELECTORS:
        try:
            node = item.select_one(sel)
            if not node:
                continue
            stamps = [t.get_text(strip=True) for t in node.find_all("time")]
            if stamps:
                return " ".join(stamps)
            label = node.get_text(strip=True)
            if label:
                return label
        except Exception:
            # Selector unsupported or element malformed — try next fallback.
            continue
    return ""
# ── Public API ────────────────────────────────────────────────────────────────
def parse_html(raw_html: str) -> dict:
    """
    Extract structured profile data from a raw LinkedIn public profile HTML page.
    Returns a dict with keys: name, email, phone, linkedin, career_summary,
    experience[], education[], skills[], achievements[]
    Never raises — returns empty values for sections that cannot be parsed.
    """
    soup = BeautifulSoup(raw_html, "lxml")

    # ── Experience ──
    experience = []
    for entry in _select_all(soup, _EXPERIENCE_ITEM_SELECTORS):
        role = _select_first(entry, _EXP_TITLE_SELECTORS)
        employer = _select_first(entry, _EXP_COMPANY_SELECTORS)
        if not (role or employer):
            continue
        # Find the first description element among the fallbacks.
        desc_node = None
        for sel in _EXP_DESC_SELECTORS:
            try:
                desc_node = entry.select_one(sel)
                if desc_node:
                    break
            except Exception:
                continue
        experience.append({
            "company": employer,
            "title": role,
            "date_range": _date_range_text(entry),
            "bullets": _split_bullets(desc_node.get_text(" ", strip=True)) if desc_node else [],
        })

    # ── Education ──
    education = []
    for entry in _select_all(soup, _EDUCATION_ITEM_SELECTORS):
        school = _select_first(entry, _EDU_SCHOOL_SELECTORS)
        degree = _select_first(entry, _EDU_DEGREE_SELECTORS)
        if not (school or degree):
            continue
        dates = ""
        for sel in _EDU_DATES_SELECTORS:
            try:
                node = entry.select_one(sel)
                if node:
                    dates = node.get_text(strip=True)
                    break
            except Exception:
                continue
        education.append({
            "school": school,
            "degree": degree,
            "field": "",  # field of study is not separable from the public DOM here
            "dates": dates,
        })

    # ── Skills (deduplicated, first occurrence wins) ──
    unique_skills = {}
    for el in _select_all(soup, _SKILLS_SELECTORS):
        label = el.get_text(strip=True)
        if label:
            unique_skills[label] = None
    skills = list(unique_skills)

    # ── Certifications → achievements ──
    achievements = []
    for entry in _select_all(soup, _CERT_ITEM_SELECTORS):
        label = _select_first(entry, _CERT_NAME_SELECTORS)
        if label:
            achievements.append(label)

    return {
        "name": _select_first(soup, _NAME_SELECTORS),
        "email": "",     # not exposed on public profile pages
        "phone": "",     # not exposed on public profile pages
        "linkedin": "",  # filled in by callers that know the source URL
        "career_summary": _select_first(soup, _SUMMARY_SELECTORS),
        "experience": experience,
        "education": education,
        "skills": skills,
        "achievements": achievements,
    }

View file

@ -83,10 +83,10 @@ def _extract_career_summary(source: Path) -> str:
def _extract_personal_info(source: Path) -> dict: def _extract_personal_info(source: Path) -> dict:
"""Extract personal info from aihawk resume yaml.""" """Extract personal info from resume yaml."""
resume = source / "config" / "plain_text_resume.yaml" resume = source / "config" / "plain_text_resume.yaml"
if not resume.exists(): if not resume.exists():
resume = source / "aihawk" / "data_folder" / "plain_text_resume.yaml" resume = source / "aihawk" / "data_folder" / "plain_text_resume.yaml" # legacy path
if not resume.exists(): if not resume.exists():
return {} return {}
data = _load_yaml(resume) data = _load_yaml(resume)
@ -196,7 +196,7 @@ def _copy_configs(source: Path, dest: Path, apply: bool) -> None:
def _copy_aihawk_resume(source: Path, dest: Path, apply: bool) -> None: def _copy_aihawk_resume(source: Path, dest: Path, apply: bool) -> None:
print("\n── Copying AIHawk resume profile") print("\n── Copying resume profile")
src = source / "config" / "plain_text_resume.yaml" src = source / "config" / "plain_text_resume.yaml"
if not src.exists(): if not src.exists():
src = source / "aihawk" / "data_folder" / "plain_text_resume.yaml" src = source / "aihawk" / "data_folder" / "plain_text_resume.yaml"

110
tests/fixtures/linkedin_profile.html vendored Normal file
View file

@ -0,0 +1,110 @@
<!-- tests/fixtures/linkedin_profile.html -->
<!DOCTYPE html>
<html>
<head><title>Alan Weinstock | LinkedIn</title></head>
<body>
<!-- Name and headline -->
<div class="top-card-layout__entity-info">
<h1 class="top-card-layout__title">Alan Weinstock</h1>
<h2 class="top-card-layout__headline">Staff Engineer · Open to Work</h2>
</div>
<!-- About / Summary -->
<section data-section="about">
<div class="core-section-container__content">
<p class="show-more-less-text__text--less">
Experienced engineer with 10 years in embedded systems and DevOps.
Passionate about open-source and accessibility tooling.
</p>
</div>
</section>
<!-- Experience -->
<section data-section="experience">
<ul>
<li class="experience-item">
<div class="experience-item__info">
<span class="experience-item__title">Staff Engineer</span>
<span class="experience-item__subtitle">Acme Corp</span>
<span class="experience-item__duration">
<span class="date-range">
<time>Jan 2022</time>
<time>Present</time>
</span>
</span>
</div>
<div class="experience-item__description">
<p class="show-more-less-text__text--less">
Led migration of monolith to microservices. &bull;
Reduced p99 latency by 40%. &bull;
Mentored three junior engineers.
</p>
</div>
</li>
<li class="experience-item">
<div class="experience-item__info">
<span class="experience-item__title">Senior Engineer</span>
<span class="experience-item__subtitle">Beta Industries</span>
<span class="experience-item__duration">
<span class="date-range">
<time>Mar 2019</time>
<time>Dec 2021</time>
</span>
</span>
</div>
<div class="experience-item__description">
<p class="show-more-less-text__text--less">
Designed CI/CD pipeline. &bull; Maintained Kubernetes clusters.
</p>
</div>
</li>
</ul>
</section>
<!-- Education -->
<section data-section="education">
<ul>
<li class="education__list-item">
<div class="education__item--degree-info">
<h3 class="education__school-name">State University</h3>
<span class="education__item--degree-name">B.S. Computer Science</span>
<span class="education__item--duration">2010 2014</span>
</div>
</li>
</ul>
</section>
<!-- Skills -->
<section data-section="skills">
<ul>
<li class="skills-section__list-item">
<div class="skills-section__skill">
<span class="mr1 t-bold">Python</span>
</div>
</li>
<li class="skills-section__list-item">
<div class="skills-section__skill">
<span class="mr1 t-bold">Kubernetes</span>
</div>
</li>
<li class="skills-section__list-item">
<div class="skills-section__skill">
<span class="mr1 t-bold">PostgreSQL</span>
</div>
</li>
</ul>
</section>
<!-- Certifications -->
<section data-section="certifications">
<ul>
<li class="certifications__list-item">
<h3 class="certifications__name">AWS Solutions Architect Associate</h3>
</li>
<li class="certifications__list-item">
<h3 class="certifications__name">CKA: Certified Kubernetes Administrator</h3>
</li>
</ul>
</section>
</body>
</html>

View file

@ -0,0 +1,96 @@
# tests/test_linkedin_parser.py
import json
import sys
import tempfile
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent))
FIXTURE_HTML = (Path(__file__).parent / "fixtures" / "linkedin_profile.html").read_text()
def _write_url_stage(path: Path) -> None:
    """Write a minimal url_scrape staging file with intentionally stale extracted data."""
    stale_extracted = {
        "name": "Alan Weinstock (stale)",  # stale — re-parse should update this
        "career_summary": "",
        "experience": [], "education": [], "skills": [], "achievements": [],
        "email": "", "phone": "", "linkedin": "",
    }
    payload = {
        "url": "https://linkedin.com/in/alanw",
        "scraped_at": "2026-03-12T14:30:00+00:00",
        "source": "url_scrape",
        "raw_html": FIXTURE_HTML,
        "extracted": stale_extracted,
    }
    path.write_text(json.dumps(payload))
def _write_zip_stage(path: Path) -> None:
    """Write a minimal export_zip staging file (no raw_html)."""
    extracted = {
        "name": "Alan Weinstock",
        "career_summary": "Engineer",
        "experience": [{"company": "Acme", "title": "SE", "date_range": "", "bullets": []}],
        "education": [], "skills": ["Python"], "achievements": [],
        "email": "alan@example.com", "phone": "", "linkedin": "",
    }
    payload = {
        "url": None,
        "scraped_at": "2026-03-12T14:30:00+00:00",
        "source": "export_zip",
        "raw_html": None,
        "extracted": extracted,
    }
    path.write_text(json.dumps(payload))
def test_parse_stage_reruns_parser_on_url_scrape():
    """parse_stage re-runs parse_html from raw_html, ignoring stale extracted data."""
    from scripts.linkedin_parser import parse_stage
    with tempfile.TemporaryDirectory() as tmp:
        staged = Path(tmp) / "stage.json"
        _write_url_stage(staged)
        extracted, error = parse_stage(staged)
        assert not error
        # Fresh parse of the fixture HTML, not the stale "(stale)" name.
        assert extracted["name"] == "Alan Weinstock"
        assert len(extracted["experience"]) == 2
def test_parse_stage_returns_stored_data_for_zip():
    """parse_stage returns stored extracted dict for export_zip (no raw_html to re-parse)."""
    from scripts.linkedin_parser import parse_stage
    with tempfile.TemporaryDirectory() as tmp:
        staged = Path(tmp) / "stage.json"
        _write_zip_stage(staged)
        extracted, error = parse_stage(staged)
        assert not error
        assert extracted["name"] == "Alan Weinstock"
        assert extracted["email"] == "alan@example.com"
        assert "Python" in extracted["skills"]
def test_parse_stage_missing_file_returns_error():
    """A nonexistent path yields ({}, non-empty error) rather than raising."""
    from scripts.linkedin_parser import parse_stage
    extracted, error = parse_stage(Path("/nonexistent/stage.json"))
    assert extracted == {}
    assert error
def test_parse_stage_corrupted_file_returns_error():
    """Unparseable JSON yields ({}, non-empty error) rather than raising."""
    from scripts.linkedin_parser import parse_stage
    with tempfile.TemporaryDirectory() as tmp:
        staged = Path(tmp) / "stage.json"
        staged.write_text("not valid json {{{{")
        extracted, error = parse_stage(staged)
        assert extracted == {}
        assert error
def test_parse_stage_updates_staging_file_after_reparse():
    """After re-parsing, the staging file's extracted dict is updated."""
    from scripts.linkedin_parser import parse_stage
    with tempfile.TemporaryDirectory() as tmp:
        staged = Path(tmp) / "stage.json"
        _write_url_stage(staged)
        parse_stage(staged)
        on_disk = json.loads(staged.read_text())["extracted"]
        assert on_disk["name"] == "Alan Weinstock"
        assert len(on_disk["experience"]) == 2

View file

@ -0,0 +1,213 @@
# tests/test_linkedin_scraper.py
import io
import json
import sys
import zipfile
from pathlib import Path
from unittest.mock import MagicMock, patch
import tempfile
sys.path.insert(0, str(Path(__file__).parent.parent))
def test_invalid_url_raises():
from scripts.linkedin_scraper import scrape_profile
with tempfile.TemporaryDirectory() as tmp:
stage = Path(tmp) / "stage.json"
try:
scrape_profile("https://linkedin.com/company/acme", stage)
assert False, "should have raised"
except ValueError as e:
assert "linkedin.com/in/" in str(e)
def test_non_linkedin_url_raises():
from scripts.linkedin_scraper import scrape_profile
with tempfile.TemporaryDirectory() as tmp:
stage = Path(tmp) / "stage.json"
try:
scrape_profile("https://example.com/profile", stage)
assert False, "should have raised"
except ValueError:
pass
def test_valid_linkedin_url_accepted():
from scripts.linkedin_scraper import scrape_profile
with tempfile.TemporaryDirectory() as tmp:
stage = Path(tmp) / "stage.json"
fixture_html = (Path(__file__).parent / "fixtures" / "linkedin_profile.html").read_text()
mock_page = MagicMock()
mock_page.content.return_value = fixture_html
mock_browser = MagicMock()
mock_browser.new_page.return_value = mock_page
mock_playwright = MagicMock()
mock_playwright.chromium.launch.return_value = mock_browser
with patch("scripts.linkedin_scraper.sync_playwright") as mock_sync_pw:
mock_sync_pw.return_value.__enter__ = MagicMock(return_value=mock_playwright)
mock_sync_pw.return_value.__exit__ = MagicMock(return_value=False)
result = scrape_profile("https://linkedin.com/in/alanw", stage)
assert result["name"] == "Alan Weinstock"
assert stage.exists()
def test_scrape_profile_writes_staging_file():
from scripts.linkedin_scraper import scrape_profile
with tempfile.TemporaryDirectory() as tmp:
stage = Path(tmp) / "stage.json"
fixture_html = (Path(__file__).parent / "fixtures" / "linkedin_profile.html").read_text()
mock_page = MagicMock()
mock_page.content.return_value = fixture_html
mock_browser = MagicMock()
mock_browser.new_page.return_value = mock_page
mock_playwright = MagicMock()
mock_playwright.chromium.launch.return_value = mock_browser
with patch("scripts.linkedin_scraper.sync_playwright") as mock_sync_pw:
mock_sync_pw.return_value.__enter__ = MagicMock(return_value=mock_playwright)
mock_sync_pw.return_value.__exit__ = MagicMock(return_value=False)
scrape_profile("https://linkedin.com/in/alanw", stage)
data = json.loads(stage.read_text())
assert data["source"] == "url_scrape"
assert data["url"] == "https://linkedin.com/in/alanw"
assert "raw_html" in data
assert "extracted" in data
assert data["extracted"]["name"] == "Alan Weinstock"
def _make_export_zip() -> bytes:
    """Build an in-memory LinkedIn export archive containing the four CSVs the parser reads."""
    csv_contents = {
        "Position.csv": (
            "Company Name,Title,Description,Started On,Finished On\n"
            "Acme Corp,Staff Engineer,Led migration. Built CI/CD.,Jan 2022,\n"
            "Beta Industries,Senior Engineer,Maintained clusters.,Mar 2019,Dec 2021\n"
        ),
        "Education.csv": (
            "School Name,Degree Name,Field Of Study,Start Date,End Date\n"
            "State University,Bachelor of Science,Computer Science,2010,2014\n"
        ),
        "Skills.csv": (
            "Name,Description\n"
            "Python,\n"
            "Kubernetes,\n"
        ),
        "Profile.csv": (
            "First Name,Last Name,Headline,Summary,Email Address\n"
            "Alan,Weinstock,Staff Engineer,Experienced engineer.,alan@example.com\n"
        ),
    }
    buf = io.BytesIO()
    with zipfile.ZipFile(buf, "w") as zf:
        for filename, content in csv_contents.items():
            zf.writestr(filename, content)
    return buf.getvalue()
def test_parse_export_zip_experience():
from scripts.linkedin_scraper import parse_export_zip
with tempfile.TemporaryDirectory() as tmp:
stage = Path(tmp) / "stage.json"
result = parse_export_zip(_make_export_zip(), stage)
assert len(result["experience"]) == 2
assert result["experience"][0]["company"] == "Acme Corp"
assert result["experience"][0]["title"] == "Staff Engineer"
def test_parse_export_zip_education():
from scripts.linkedin_scraper import parse_export_zip
with tempfile.TemporaryDirectory() as tmp:
stage = Path(tmp) / "stage.json"
result = parse_export_zip(_make_export_zip(), stage)
assert result["education"][0]["school"] == "State University"
assert result["education"][0]["field"] == "Computer Science"
def test_parse_export_zip_skills():
from scripts.linkedin_scraper import parse_export_zip
with tempfile.TemporaryDirectory() as tmp:
stage = Path(tmp) / "stage.json"
result = parse_export_zip(_make_export_zip(), stage)
assert "Python" in result["skills"]
def test_parse_export_zip_name_and_email():
from scripts.linkedin_scraper import parse_export_zip
with tempfile.TemporaryDirectory() as tmp:
stage = Path(tmp) / "stage.json"
result = parse_export_zip(_make_export_zip(), stage)
assert result["name"] == "Alan Weinstock"
assert result["email"] == "alan@example.com"
def test_parse_export_zip_missing_csv_does_not_raise():
from scripts.linkedin_scraper import parse_export_zip
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w") as zf:
zf.writestr("Profile.csv",
"First Name,Last Name,Headline,Summary,Email Address\n"
"Alan,Weinstock,Engineer,Summary here.,alan@example.com\n"
)
with tempfile.TemporaryDirectory() as tmp:
stage = Path(tmp) / "stage.json"
result = parse_export_zip(buf.getvalue(), stage)
assert result["name"] == "Alan Weinstock"
assert result["experience"] == []
def test_parse_export_zip_writes_staging_file():
from scripts.linkedin_scraper import parse_export_zip
with tempfile.TemporaryDirectory() as tmp:
stage = Path(tmp) / "stage.json"
parse_export_zip(_make_export_zip(), stage)
data = json.loads(stage.read_text())
assert data["source"] == "export_zip"
assert data["raw_html"] is None
def test_scrape_profile_sets_linkedin_url():
from scripts.linkedin_scraper import scrape_profile
with tempfile.TemporaryDirectory() as tmp:
stage = Path(tmp) / "stage.json"
fixture_html = (Path(__file__).parent / "fixtures" / "linkedin_profile.html").read_text()
mock_page = MagicMock()
mock_page.content.return_value = fixture_html
mock_browser = MagicMock()
mock_browser.new_page.return_value = mock_page
mock_playwright = MagicMock()
mock_playwright.chromium.launch.return_value = mock_browser
with patch("scripts.linkedin_scraper.sync_playwright") as mock_sync_pw:
mock_sync_pw.return_value.__enter__ = MagicMock(return_value=mock_playwright)
mock_sync_pw.return_value.__exit__ = MagicMock(return_value=False)
result = scrape_profile("https://linkedin.com/in/alanw", stage)
assert result["linkedin"] == "https://linkedin.com/in/alanw"
def test_parse_export_zip_bad_zip_raises():
from scripts.linkedin_scraper import parse_export_zip
with tempfile.TemporaryDirectory() as tmp:
stage = Path(tmp) / "stage.json"
try:
parse_export_zip(b"not a zip file at all", stage)
assert False, "should have raised"
except ValueError as e:
assert "zip" in str(e).lower()
def test_parse_export_zip_current_job_shows_present():
"""Empty Finished On renders as ' Present', not truncated."""
from scripts.linkedin_scraper import parse_export_zip
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w") as zf:
zf.writestr("Position.csv",
"Company Name,Title,Description,Started On,Finished On\n"
"Acme Corp,Staff Engineer,,Jan 2022,\n"
)
zf.writestr("Profile.csv",
"First Name,Last Name,Headline,Summary,Email Address\n"
"Alan,Weinstock,Engineer,,\n"
)
with tempfile.TemporaryDirectory() as tmp:
stage = Path(tmp) / "stage.json"
result = parse_export_zip(buf.getvalue(), stage)
assert result["experience"][0]["date_range"] == "Jan 2022 Present"

View file

@ -0,0 +1,73 @@
# tests/test_linkedin_utils.py
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent))
FIXTURE = (Path(__file__).parent / "fixtures" / "linkedin_profile.html").read_text()
def test_parse_html_name():
from scripts.linkedin_utils import parse_html
result = parse_html(FIXTURE)
assert result["name"] == "Alan Weinstock"
def test_parse_html_summary():
from scripts.linkedin_utils import parse_html
result = parse_html(FIXTURE)
assert "embedded systems" in result["career_summary"]
def test_parse_html_experience_count():
from scripts.linkedin_utils import parse_html
result = parse_html(FIXTURE)
assert len(result["experience"]) == 2
def test_parse_html_experience_fields():
from scripts.linkedin_utils import parse_html
result = parse_html(FIXTURE)
first = result["experience"][0]
assert first["company"] == "Acme Corp"
assert first["title"] == "Staff Engineer"
assert "Jan 2022" in first["date_range"]
assert len(first["bullets"]) >= 2
assert any("latency" in b for b in first["bullets"])
def test_parse_html_education():
from scripts.linkedin_utils import parse_html
result = parse_html(FIXTURE)
assert len(result["education"]) == 1
edu = result["education"][0]
assert edu["school"] == "State University"
assert "Computer Science" in edu["degree"]
def test_parse_html_skills():
from scripts.linkedin_utils import parse_html
result = parse_html(FIXTURE)
assert "Python" in result["skills"]
assert "Kubernetes" in result["skills"]
def test_parse_html_achievements():
from scripts.linkedin_utils import parse_html
result = parse_html(FIXTURE)
assert any("AWS" in a for a in result["achievements"])
def test_parse_html_missing_section_returns_empty():
"""A profile with no skills section returns empty skills list, not an error."""
from scripts.linkedin_utils import parse_html
html_no_skills = FIXTURE.replace('data-section="skills"', 'data-section="hidden"')
result = parse_html(html_no_skills)
assert result["skills"] == []
def test_parse_html_returns_all_keys():
from scripts.linkedin_utils import parse_html
result = parse_html(FIXTURE)
for key in ("name", "email", "phone", "linkedin", "career_summary",
"experience", "education", "skills", "achievements"):
assert key in result, f"Missing key: {key}"