From 04d0a66f21e4ec9cb091a7f4534c220d7953093d Mon Sep 17 00:00:00 2001
From: pyr0ball
Date: Fri, 13 Mar 2026 06:02:03 -0700
Subject: [PATCH] fix(linkedin): improve scraper error handling, current-job
 date range, add missing tests

---
 scripts/linkedin_scraper.py    | 122 +++++++++++++++++----------------
 tests/test_linkedin_scraper.py |  48 +++++++++++++
 2 files changed, 110 insertions(+), 60 deletions(-)

diff --git a/scripts/linkedin_scraper.py b/scripts/linkedin_scraper.py
index 5bf9b6a..ec836e1 100644
--- a/scripts/linkedin_scraper.py
+++ b/scripts/linkedin_scraper.py
@@ -65,6 +65,8 @@ def scrape_profile(url: str, stage_path: Path) -> dict:
             "LinkedIn did not load in time — the request may have been blocked. "
             "Try the data export option instead."
         )
+    except Exception as e:
+        raise RuntimeError(f"LinkedIn scrape failed: {e}") from e
 
     extracted = parse_html(raw_html)
     extracted["linkedin"] = url
@@ -94,69 +96,69 @@ def parse_export_zip(zip_bytes: bytes, stage_path: Path) -> dict:
     }
 
     try:
-        zf_handle = zipfile.ZipFile(io.BytesIO(zip_bytes))
+        with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf:
+            names_in_zip = {n.lower(): n for n in zf.namelist()}
+
+            def _read_csv(filename: str) -> list[dict]:
+                key = filename.lower()
+                if key not in names_in_zip:
+                    return []
+                text = zf.read(names_in_zip[key]).decode("utf-8-sig", errors="replace")
+                return list(csv.DictReader(io.StringIO(text)))
+
+            for row in _read_csv("Profile.csv"):
+                first = row.get("First Name", "").strip()
+                last = row.get("Last Name", "").strip()
+                extracted["name"] = f"{first} {last}".strip()
+                extracted["email"] = row.get("Email Address", "").strip()
+                extracted["career_summary"] = row.get("Summary", "").strip()
+                break
+
+            for row in _read_csv("Position.csv"):
+                company = row.get("Company Name", "").strip()
+                title = row.get("Title", "").strip()
+                desc = row.get("Description", "").strip()
+                start = row.get("Started On", "").strip()
+                end = row.get("Finished On", "").strip()
+                end_label = end if end else ("Present" if start else "")
+                date_range = f"{start} – {end_label}".strip(" –") if (start or end) else ""
+                bullets = [d.strip() for d in re.split(r"[.•\n]+", desc) if d.strip() and len(d.strip()) > 3]
+                if company or title:
+                    extracted["experience"].append({
+                        "company": company,
+                        "title": title,
+                        "date_range": date_range,
+                        "bullets": bullets,
+                    })
+
+            for row in _read_csv("Education.csv"):
+                school = row.get("School Name", "").strip()
+                degree = row.get("Degree Name", "").strip()
+                field = row.get("Field Of Study", "").strip()
+                start = row.get("Start Date", "").strip()
+                end = row.get("End Date", "").strip()
+                dates = f"{start} – {end}".strip(" –") if start or end else ""
+                if school or degree:
+                    extracted["education"].append({
+                        "school": school,
+                        "degree": degree,
+                        "field": field,
+                        "dates": dates,
+                    })
+
+            for row in _read_csv("Skills.csv"):
+                skill = row.get("Name", "").strip()
+                if skill:
+                    extracted["skills"].append(skill)
+
+            for row in _read_csv("Certifications.csv"):
+                name = row.get("Name", "").strip()
+                if name:
+                    extracted["achievements"].append(name)
+
     except zipfile.BadZipFile as e:
         raise ValueError(f"Not a valid zip file: {e}")
 
-    with zf_handle as zf:
-        names_in_zip = {n.lower(): n for n in zf.namelist()}
-
-        def _read_csv(filename: str) -> list[dict]:
-            key = filename.lower()
-            if key not in names_in_zip:
-                return []
-            text = zf.read(names_in_zip[key]).decode("utf-8-sig", errors="replace")
-            return list(csv.DictReader(io.StringIO(text)))
-
-        for row in _read_csv("Profile.csv"):
-            first = row.get("First Name", "").strip()
-            last = row.get("Last Name", "").strip()
-            extracted["name"] = f"{first} {last}".strip()
-            extracted["email"] = row.get("Email Address", "").strip()
-            extracted["career_summary"] = row.get("Summary", "").strip()
-            break
-
-        for row in _read_csv("Position.csv"):
-            company = row.get("Company Name", "").strip()
-            title = row.get("Title", "").strip()
-            desc = row.get("Description", "").strip()
-            start = row.get("Started On", "").strip()
-            end = row.get("Finished On", "").strip()
-            date_range = f"{start} – {end}".strip(" –") if start or end else ""
-            bullets = [d.strip() for d in re.split(r"[.•\n]+", desc) if d.strip() and len(d.strip()) > 3]
-            if company or title:
-                extracted["experience"].append({
-                    "company": company,
-                    "title": title,
-                    "date_range": date_range,
-                    "bullets": bullets,
-                })
-
-        for row in _read_csv("Education.csv"):
-            school = row.get("School Name", "").strip()
-            degree = row.get("Degree Name", "").strip()
-            field = row.get("Field Of Study", "").strip()
-            start = row.get("Start Date", "").strip()
-            end = row.get("End Date", "").strip()
-            dates = f"{start} – {end}".strip(" –") if start or end else ""
-            if school or degree:
-                extracted["education"].append({
-                    "school": school,
-                    "degree": degree,
-                    "field": field,
-                    "dates": dates,
-                })
-
-        for row in _read_csv("Skills.csv"):
-            skill = row.get("Name", "").strip()
-            if skill:
-                extracted["skills"].append(skill)
-
-        for row in _read_csv("Certifications.csv"):
-            name = row.get("Name", "").strip()
-            if name:
-                extracted["achievements"].append(name)
-
     _write_stage(stage_path, {
         "url": None,
         "scraped_at": datetime.now(timezone.utc).isoformat(),
diff --git a/tests/test_linkedin_scraper.py b/tests/test_linkedin_scraper.py
index 9d53042..8fb5e96 100644
--- a/tests/test_linkedin_scraper.py
+++ b/tests/test_linkedin_scraper.py
@@ -163,3 +163,51 @@ def test_parse_export_zip_writes_staging_file():
     data = json.loads(stage.read_text())
     assert data["source"] == "export_zip"
     assert data["raw_html"] is None
+
+
+def test_scrape_profile_sets_linkedin_url():
+    from scripts.linkedin_scraper import scrape_profile
+    with tempfile.TemporaryDirectory() as tmp:
+        stage = Path(tmp) / "stage.json"
+        fixture_html = (Path(__file__).parent / "fixtures" / "linkedin_profile.html").read_text()
+        mock_page = MagicMock()
+        mock_page.content.return_value = fixture_html
+        mock_browser = MagicMock()
+        mock_browser.new_page.return_value = mock_page
+        mock_playwright = MagicMock()
+        mock_playwright.chromium.launch.return_value = mock_browser
+        with patch("scripts.linkedin_scraper.sync_playwright") as mock_sync_pw:
+            mock_sync_pw.return_value.__enter__ = MagicMock(return_value=mock_playwright)
+            mock_sync_pw.return_value.__exit__ = MagicMock(return_value=False)
+            result = scrape_profile("https://linkedin.com/in/alanw", stage)
+        assert result["linkedin"] == "https://linkedin.com/in/alanw"
+
+
+def test_parse_export_zip_bad_zip_raises():
+    from scripts.linkedin_scraper import parse_export_zip
+    with tempfile.TemporaryDirectory() as tmp:
+        stage = Path(tmp) / "stage.json"
+        try:
+            parse_export_zip(b"not a zip file at all", stage)
+            assert False, "should have raised"
+        except ValueError as e:
+            assert "zip" in str(e).lower()
+
+
+def test_parse_export_zip_current_job_shows_present():
+    """Empty Finished On renders as '– Present', not truncated."""
+    from scripts.linkedin_scraper import parse_export_zip
+    buf = io.BytesIO()
+    with zipfile.ZipFile(buf, "w") as zf:
+        zf.writestr("Position.csv",
+            "Company Name,Title,Description,Started On,Finished On\n"
+            "Acme Corp,Staff Engineer,,Jan 2022,\n"
+        )
+        zf.writestr("Profile.csv",
+            "First Name,Last Name,Headline,Summary,Email Address\n"
+            "Alan,Weinstock,Engineer,,\n"
+        )
+    with tempfile.TemporaryDirectory() as tmp:
+        stage = Path(tmp) / "stage.json"
+        result = parse_export_zip(buf.getvalue(), stage)
+        assert result["experience"][0]["date_range"] == "Jan 2022 – Present"