From f759f5fbc063ffeafda2fd2c6113ea8d31fb10c9 Mon Sep 17 00:00:00 2001
From: pyr0ball
Date: Fri, 13 Mar 2026 01:06:39 -0700
Subject: [PATCH] feat(linkedin): add scraper (Playwright + export zip) with URL validation

---
Review note: the file contents below were revised during review, so the blob
ids on the "index" lines refer to the pre-review versions.

 scripts/linkedin_scraper.py    | 196 +++++++++++++++++++++++++++++++++
 tests/test_linkedin_scraper.py | 180 ++++++++++++++++++++++++++++++
 2 files changed, 376 insertions(+)
 create mode 100644 scripts/linkedin_scraper.py
 create mode 100644 tests/test_linkedin_scraper.py

diff --git a/scripts/linkedin_scraper.py b/scripts/linkedin_scraper.py
new file mode 100644
index 0000000..5bf9b6a
--- /dev/null
+++ b/scripts/linkedin_scraper.py
@@ -0,0 +1,196 @@
+# scripts/linkedin_scraper.py
+"""
+LinkedIn profile scraper.
+
+Two entry points:
+  scrape_profile(url, stage_path)          — Playwright headless fetch
+  parse_export_zip(zip_bytes, stage_path)  — LinkedIn data archive CSV parse
+
+Both write a staging file at stage_path and return the extracted dict.
+"""
+from __future__ import annotations
+
+import csv
+import io
+import json
+import re
+import zipfile
+from datetime import datetime, timezone
+from pathlib import Path
+
+from playwright.sync_api import sync_playwright, TimeoutError as PWTimeout
+from scripts.linkedin_utils import parse_html
+
+_LINKEDIN_PROFILE_RE = re.compile(r"https?://(www\.)?linkedin\.com/in/", re.I)
+
+_CHROME_UA = (
+    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
+    "(KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36"
+)
+
+
+def _write_stage(stage_path: Path, payload: dict) -> None:
+    """Atomic write: write to .tmp then move into place to avoid partial reads."""
+    tmp = stage_path.with_suffix(".tmp")
+    # ensure_ascii=False leaves non-ASCII characters unescaped, so pin UTF-8
+    # instead of relying on the platform's default encoding.
+    tmp.write_text(
+        json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8"
+    )
+    # replace() overwrites an existing stage file on every platform; rename()
+    # raises FileExistsError on Windows when the target already exists.
+    tmp.replace(stage_path)
+
+
+def scrape_profile(url: str, stage_path: Path) -> dict:
+    """
+    Fetch a public LinkedIn profile via Playwright headless Chromium.
+
+    Raises ValueError if url is not a linkedin.com/in/ URL.
+    Raises RuntimeError if Playwright times out loading the page.
+    Returns the extracted dict and writes the staging file.
+    """
+    if not _LINKEDIN_PROFILE_RE.match(url):
+        raise ValueError(
+            f"Expected a LinkedIn profile URL (linkedin.com/in/…), got: {url}"
+        )
+
+    try:
+        with sync_playwright() as pw:
+            browser = pw.chromium.launch(headless=True)
+            try:
+                page = browser.new_page(user_agent=_CHROME_UA)
+                page.goto(url, timeout=30_000)
+                # Proceed once an element matching any of these selectors appears.
+                page.wait_for_selector(
+                    "h1, section[data-section], #experience, #about",
+                    timeout=20_000,
+                )
+                raw_html = page.content()
+            finally:
+                # Close the browser on failure too, not only on success.
+                browser.close()
+    except PWTimeout as exc:
+        raise RuntimeError(
+            "LinkedIn did not load in time — the request may have been blocked. "
+            "Try the data export option instead."
+        ) from exc
+
+    extracted = parse_html(raw_html)
+    extracted["linkedin"] = url
+
+    _write_stage(stage_path, {
+        "url": url,
+        "scraped_at": datetime.now(timezone.utc).isoformat(),
+        "source": "url_scrape",
+        "raw_html": raw_html,
+        "extracted": extracted,
+    })
+    return extracted
+
+
+def _cell(row: dict, column: str) -> str:
+    """Return row[column] stripped, or "" when the cell is missing or None."""
+    # csv.DictReader fills the missing cells of a short row with None, so
+    # row.get(column, "") alone can still hand back None.
+    return (row.get(column) or "").strip()
+
+
+def _date_range(start: str, end: str) -> str:
+    """Format "start – end", keeping only the side that is present."""
+    if not (start or end):
+        return ""
+    return f"{start} – {end}".strip(" –")
+
+
+def parse_export_zip(zip_bytes: bytes, stage_path: Path) -> dict:
+    """
+    Parse a LinkedIn data export archive.
+
+    zip_bytes: raw zip bytes — callers do: zip_bytes = uploaded_file.read()
+    Returns the extracted dict and writes the staging file.
+    Missing CSV files are skipped silently.
+    Raises ValueError if zip_bytes is not a valid zip archive.
+    """
+    extracted: dict = {
+        "name": "", "email": "", "phone": "", "linkedin": "",
+        "career_summary": "",
+        "experience": [], "education": [], "skills": [], "achievements": [],
+    }
+
+    try:
+        zf_handle = zipfile.ZipFile(io.BytesIO(zip_bytes))
+    except zipfile.BadZipFile as e:
+        raise ValueError(f"Not a valid zip file: {e}") from e
+
+    with zf_handle as zf:
+        # Lower-cased member name -> real name, for case-insensitive lookup.
+        names_in_zip = {n.lower(): n for n in zf.namelist()}
+
+        def _read_csv(filename: str) -> list[dict]:
+            key = filename.lower()
+            if key not in names_in_zip:
+                return []
+            # utf-8-sig drops a leading BOM if the file has one.
+            text = zf.read(names_in_zip[key]).decode("utf-8-sig", errors="replace")
+            return list(csv.DictReader(io.StringIO(text)))
+
+        # Only the first Profile.csv row is used.
+        for row in _read_csv("Profile.csv")[:1]:
+            first = _cell(row, "First Name")
+            last = _cell(row, "Last Name")
+            extracted["name"] = f"{first} {last}".strip()
+            extracted["email"] = _cell(row, "Email Address")
+            extracted["career_summary"] = _cell(row, "Summary")
+
+        for row in _read_csv("Position.csv"):
+            company = _cell(row, "Company Name")
+            title = _cell(row, "Title")
+            if not (company or title):
+                continue
+            # Split the description into bullets on periods, bullet characters
+            # and newlines; fragments of three characters or fewer are dropped.
+            fragments = (
+                d.strip() for d in re.split(r"[.•\n]+", _cell(row, "Description"))
+            )
+            extracted["experience"].append({
+                "company": company,
+                "title": title,
+                "date_range": _date_range(
+                    _cell(row, "Started On"), _cell(row, "Finished On")
+                ),
+                "bullets": [frag for frag in fragments if len(frag) > 3],
+            })
+
+        for row in _read_csv("Education.csv"):
+            school = _cell(row, "School Name")
+            degree = _cell(row, "Degree Name")
+            if not (school or degree):
+                continue
+            extracted["education"].append({
+                "school": school,
+                "degree": degree,
+                "field": _cell(row, "Field Of Study"),
+                "dates": _date_range(
+                    _cell(row, "Start Date"), _cell(row, "End Date")
+                ),
+            })
+
+        for row in _read_csv("Skills.csv"):
+            skill = _cell(row, "Name")
+            if skill:
+                extracted["skills"].append(skill)
+
+        for row in _read_csv("Certifications.csv"):
+            name = _cell(row, "Name")
+            if name:
+                extracted["achievements"].append(name)
+
+    _write_stage(stage_path, {
+        "url": None,
+        "scraped_at": datetime.now(timezone.utc).isoformat(),
+        "source": "export_zip",
+        "raw_html": None,
+        "extracted": extracted,
+    })
+    return extracted
diff --git a/tests/test_linkedin_scraper.py b/tests/test_linkedin_scraper.py
new file mode 100644
index 0000000..9d53042
--- /dev/null
+++ b/tests/test_linkedin_scraper.py
@@ -0,0 +1,180 @@
+# tests/test_linkedin_scraper.py
+import io
+import json
+import sys
+import zipfile
+from pathlib import Path
+from unittest.mock import MagicMock, patch
+import tempfile
+
+sys.path.insert(0, str(Path(__file__).parent.parent))
+
+_FIXTURE_HTML = Path(__file__).parent / "fixtures" / "linkedin_profile.html"
+
+
+def _patch_playwright(html: str):
+    """Patch sync_playwright so page.content() returns html (no real browser)."""
+    mock_page = MagicMock()
+    mock_page.content.return_value = html
+    mock_browser = MagicMock()
+    mock_browser.new_page.return_value = mock_page
+    mock_playwright = MagicMock()
+    mock_playwright.chromium.launch.return_value = mock_browser
+
+    mock_sync_pw = MagicMock()
+    mock_sync_pw.return_value.__enter__ = MagicMock(return_value=mock_playwright)
+    mock_sync_pw.return_value.__exit__ = MagicMock(return_value=False)
+    return patch("scripts.linkedin_scraper.sync_playwright", mock_sync_pw)
+
+
+def test_invalid_url_raises():
+    from scripts.linkedin_scraper import scrape_profile
+    with tempfile.TemporaryDirectory() as tmp:
+        stage = Path(tmp) / "stage.json"
+        try:
+            scrape_profile("https://linkedin.com/company/acme", stage)
+        except ValueError as e:
+            assert "linkedin.com/in/" in str(e)
+        else:
+            raise AssertionError("should have raised")
+
+
+def test_non_linkedin_url_raises():
+    from scripts.linkedin_scraper import scrape_profile
+    with tempfile.TemporaryDirectory() as tmp:
+        stage = Path(tmp) / "stage.json"
+        try:
+            scrape_profile("https://example.com/profile", stage)
+        except ValueError:
+            pass
+        else:
+            raise AssertionError("should have raised")
+
+
+def test_valid_linkedin_url_accepted():
+    from scripts.linkedin_scraper import scrape_profile
+    with tempfile.TemporaryDirectory() as tmp:
+        stage = Path(tmp) / "stage.json"
+        with _patch_playwright(_FIXTURE_HTML.read_text(encoding="utf-8")):
+            result = scrape_profile("https://linkedin.com/in/alanw", stage)
+
+        assert result["name"] == "Alan Weinstock"
+        assert stage.exists()
+
+
+def test_scrape_profile_writes_staging_file():
+    from scripts.linkedin_scraper import scrape_profile
+    with tempfile.TemporaryDirectory() as tmp:
+        stage = Path(tmp) / "stage.json"
+        with _patch_playwright(_FIXTURE_HTML.read_text(encoding="utf-8")):
+            scrape_profile("https://linkedin.com/in/alanw", stage)
+
+        data = json.loads(stage.read_text(encoding="utf-8"))
+        assert data["source"] == "url_scrape"
+        assert data["url"] == "https://linkedin.com/in/alanw"
+        assert "raw_html" in data
+        assert "extracted" in data
+        assert data["extracted"]["name"] == "Alan Weinstock"
+
+
+def _make_export_zip() -> bytes:
+    buf = io.BytesIO()
+    with zipfile.ZipFile(buf, "w") as zf:
+        zf.writestr("Position.csv",
+            "Company Name,Title,Description,Started On,Finished On\n"
+            "Acme Corp,Staff Engineer,Led migration. Built CI/CD.,Jan 2022,\n"
+            "Beta Industries,Senior Engineer,Maintained clusters.,Mar 2019,Dec 2021\n"
+        )
+        zf.writestr("Education.csv",
+            "School Name,Degree Name,Field Of Study,Start Date,End Date\n"
+            "State University,Bachelor of Science,Computer Science,2010,2014\n"
+        )
+        zf.writestr("Skills.csv",
+            "Name,Description\n"
+            "Python,\n"
+            "Kubernetes,\n"
+        )
+        zf.writestr("Profile.csv",
+            "First Name,Last Name,Headline,Summary,Email Address\n"
+            "Alan,Weinstock,Staff Engineer,Experienced engineer.,alan@example.com\n"
+        )
+    return buf.getvalue()
+
+
+def test_parse_export_zip_experience():
+    from scripts.linkedin_scraper import parse_export_zip
+    with tempfile.TemporaryDirectory() as tmp:
+        stage = Path(tmp) / "stage.json"
+        result = parse_export_zip(_make_export_zip(), stage)
+    assert len(result["experience"]) == 2
+    assert result["experience"][0]["company"] == "Acme Corp"
+    assert result["experience"][0]["title"] == "Staff Engineer"
+
+
+def test_parse_export_zip_education():
+    from scripts.linkedin_scraper import parse_export_zip
+    with tempfile.TemporaryDirectory() as tmp:
+        stage = Path(tmp) / "stage.json"
+        result = parse_export_zip(_make_export_zip(), stage)
+    assert result["education"][0]["school"] == "State University"
+    assert result["education"][0]["field"] == "Computer Science"
+
+
+def test_parse_export_zip_skills():
+    from scripts.linkedin_scraper import parse_export_zip
+    with tempfile.TemporaryDirectory() as tmp:
+        stage = Path(tmp) / "stage.json"
+        result = parse_export_zip(_make_export_zip(), stage)
+    assert "Python" in result["skills"]
+
+
+def test_parse_export_zip_name_and_email():
+    from scripts.linkedin_scraper import parse_export_zip
+    with tempfile.TemporaryDirectory() as tmp:
+        stage = Path(tmp) / "stage.json"
+        result = parse_export_zip(_make_export_zip(), stage)
+    assert result["name"] == "Alan Weinstock"
+    assert result["email"] == "alan@example.com"
+
+
+def test_parse_export_zip_missing_csv_does_not_raise():
+    from scripts.linkedin_scraper import parse_export_zip
+    buf = io.BytesIO()
+    with zipfile.ZipFile(buf, "w") as zf:
+        zf.writestr("Profile.csv",
+            "First Name,Last Name,Headline,Summary,Email Address\n"
+            "Alan,Weinstock,Engineer,Summary here.,alan@example.com\n"
+        )
+    with tempfile.TemporaryDirectory() as tmp:
+        stage = Path(tmp) / "stage.json"
+        result = parse_export_zip(buf.getvalue(), stage)
+    assert result["name"] == "Alan Weinstock"
+    assert result["experience"] == []
+
+
+def test_parse_export_zip_short_row_does_not_raise():
+    from scripts.linkedin_scraper import parse_export_zip
+    buf = io.BytesIO()
+    with zipfile.ZipFile(buf, "w") as zf:
+        # The data row has fewer cells than the header, so csv.DictReader
+        # reports the missing "Summary" and "Email Address" cells as None.
+        zf.writestr("Profile.csv",
+            "First Name,Last Name,Headline,Summary,Email Address\n"
+            "Alan,Weinstock,Engineer\n"
+        )
+    with tempfile.TemporaryDirectory() as tmp:
+        stage = Path(tmp) / "stage.json"
+        result = parse_export_zip(buf.getvalue(), stage)
+    assert result["name"] == "Alan Weinstock"
+    assert result["email"] == ""
+    assert result["career_summary"] == ""
+
+
+def test_parse_export_zip_writes_staging_file():
+    from scripts.linkedin_scraper import parse_export_zip
+    with tempfile.TemporaryDirectory() as tmp:
+        stage = Path(tmp) / "stage.json"
+        parse_export_zip(_make_export_zip(), stage)
+        data = json.loads(stage.read_text(encoding="utf-8"))
+    assert data["source"] == "export_zip"
+    assert data["raw_html"] is None