feat(linkedin): add scraper (Playwright + export zip) with URL validation

pyr0ball 2026-03-13 01:06:39 -07:00
parent a43e29e50d
commit f64ecf81e0
2 changed files with 332 additions and 0 deletions

scripts/linkedin_scraper.py Normal file

@@ -0,0 +1,167 @@
# scripts/linkedin_scraper.py
"""
LinkedIn profile scraper.
Two entry points:
    scrape_profile(url, stage_path)          - Playwright headless fetch
    parse_export_zip(zip_bytes, stage_path)  - LinkedIn data-archive CSV parse
Both write a staging file at stage_path and return the extracted dict.
"""
from __future__ import annotations
import csv
import io
import json
import re
import zipfile
from datetime import datetime, timezone
from pathlib import Path
from playwright.sync_api import sync_playwright, TimeoutError as PWTimeout
from scripts.linkedin_utils import parse_html
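# Accepts only personal profile URLs (https://[www.]linkedin.com/in/...);
# company, school, and jobs pages fail validation.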
_LINKEDIN_PROFILE_RE = re.compile(r"https?://(www\.)?linkedin\.com/in/", re.I)
_CHROME_UA = (
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
"(KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36"
)
def _write_stage(stage_path: Path, payload: dict) -> None:
"""Atomic write: write to .tmp then rename to avoid partial reads."""
tmp = stage_path.with_suffix(".tmp")
tmp.write_text(json.dumps(payload, ensure_ascii=False, indent=2))
tmp.rename(stage_path)
def scrape_profile(url: str, stage_path: Path) -> dict:
"""
Fetch a public LinkedIn profile via Playwright headless Chrome.
Raises ValueError if url is not a linkedin.com/in/ URL.
Raises RuntimeError on scrape failure (timeout, blocked, etc.).
Returns the extracted dict and writes the staging file.
"""
if not _LINKEDIN_PROFILE_RE.match(url):
raise ValueError(
f"Expected a LinkedIn profile URL (linkedin.com/in/…), got: {url}"
)
try:
with sync_playwright() as pw:
browser = pw.chromium.launch(headless=True)
page = browser.new_page(user_agent=_CHROME_UA)
page.goto(url, timeout=30_000)
page.wait_for_selector(
"h1, section[data-section], #experience, #about",
timeout=20_000,
)
raw_html = page.content()
browser.close()
    except PWTimeout as e:
        raise RuntimeError(
            "LinkedIn did not load in time; the request may have been blocked. "
            "Try the data export option instead."
        ) from e
extracted = parse_html(raw_html)
extracted["linkedin"] = url
_write_stage(stage_path, {
"url": url,
"scraped_at": datetime.now(timezone.utc).isoformat(),
"source": "url_scrape",
"raw_html": raw_html,
"extracted": extracted,
})
return extracted
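# Caller-side sketch (the fallback flow is an assumption about how callers
# combine the two entry points; nothing in this module enforces it):
#
#     try:
#         profile = scrape_profile(url, stage)
#     except RuntimeError:
#         # blocked or timed out: ask the user for their LinkedIn export zip
#         profile = parse_export_zip(uploaded_zip_bytes, stage)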
def parse_export_zip(zip_bytes: bytes, stage_path: Path) -> dict:
"""
Parse a LinkedIn data export archive.
    zip_bytes: raw zip bytes; callers typically do: zip_bytes = uploaded_file.read()
Returns the extracted dict and writes the staging file.
Missing CSV files are skipped silently.
"""
extracted: dict = {
"name": "", "email": "", "phone": "", "linkedin": "",
"career_summary": "",
"experience": [], "education": [], "skills": [], "achievements": [],
}
try:
zf_handle = zipfile.ZipFile(io.BytesIO(zip_bytes))
    except zipfile.BadZipFile as e:
        raise ValueError(f"Not a valid zip file: {e}") from e
with zf_handle as zf:
names_in_zip = {n.lower(): n for n in zf.namelist()}
def _read_csv(filename: str) -> list[dict]:
key = filename.lower()
if key not in names_in_zip:
return []
text = zf.read(names_in_zip[key]).decode("utf-8-sig", errors="replace")
return list(csv.DictReader(io.StringIO(text)))
for row in _read_csv("Profile.csv"):
first = row.get("First Name", "").strip()
last = row.get("Last Name", "").strip()
extracted["name"] = f"{first} {last}".strip()
extracted["email"] = row.get("Email Address", "").strip()
extracted["career_summary"] = row.get("Summary", "").strip()
break
for row in _read_csv("Position.csv"):
company = row.get("Company Name", "").strip()
title = row.get("Title", "").strip()
desc = row.get("Description", "").strip()
start = row.get("Started On", "").strip()
end = row.get("Finished On", "").strip()
            # Join with an en dash so two dates read as a range; empty parts
            # drop out, and the join of nothing yields "".
            date_range = " – ".join(p for p in (start, end) if p)
bullets = [d.strip() for d in re.split(r"[.•\n]+", desc) if d.strip() and len(d.strip()) > 3]
if company or title:
extracted["experience"].append({
"company": company,
"title": title,
"date_range": date_range,
"bullets": bullets,
})
for row in _read_csv("Education.csv"):
school = row.get("School Name", "").strip()
degree = row.get("Degree Name", "").strip()
field = row.get("Field Of Study", "").strip()
start = row.get("Start Date", "").strip()
end = row.get("End Date", "").strip()
            dates = " – ".join(p for p in (start, end) if p)
if school or degree:
extracted["education"].append({
"school": school,
"degree": degree,
"field": field,
"dates": dates,
})
for row in _read_csv("Skills.csv"):
skill = row.get("Name", "").strip()
if skill:
extracted["skills"].append(skill)
for row in _read_csv("Certifications.csv"):
name = row.get("Name", "").strip()
if name:
extracted["achievements"].append(name)
_write_stage(stage_path, {
"url": None,
"scraped_at": datetime.now(timezone.utc).isoformat(),
"source": "export_zip",
"raw_html": None,
"extracted": extracted,
})
return extracted
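# Upload-handler sketch (the "uploaded_file" object is an assumption about
# the caller, e.g. a web framework's file-upload wrapper; any source of raw
# zip bytes works):
#
#     zip_bytes = uploaded_file.read()
#     extracted = parse_export_zip(zip_bytes, Path("stage.json"))
#     print(extracted["name"], len(extracted["experience"]))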

tests/test_linkedin_scraper.py Normal file

@@ -0,0 +1,165 @@
# tests/test_linkedin_scraper.py
import io
import json
import sys
import tempfile
import zipfile
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
# Make the repo root importable so "scripts.linkedin_scraper" resolves
# regardless of where pytest is invoked from.
sys.path.insert(0, str(Path(__file__).parent.parent))
def test_invalid_url_raises():
    from scripts.linkedin_scraper import scrape_profile
    with tempfile.TemporaryDirectory() as tmp:
        stage = Path(tmp) / "stage.json"
        # Company pages are valid linkedin.com URLs but not /in/ profiles.
        with pytest.raises(ValueError, match="linkedin.com/in/"):
            scrape_profile("https://linkedin.com/company/acme", stage)
def test_non_linkedin_url_raises():
    from scripts.linkedin_scraper import scrape_profile
    with tempfile.TemporaryDirectory() as tmp:
        stage = Path(tmp) / "stage.json"
        with pytest.raises(ValueError):
            scrape_profile("https://example.com/profile", stage)
def test_valid_linkedin_url_accepted():
from scripts.linkedin_scraper import scrape_profile
with tempfile.TemporaryDirectory() as tmp:
stage = Path(tmp) / "stage.json"
fixture_html = (Path(__file__).parent / "fixtures" / "linkedin_profile.html").read_text()
mock_page = MagicMock()
mock_page.content.return_value = fixture_html
mock_browser = MagicMock()
mock_browser.new_page.return_value = mock_page
mock_playwright = MagicMock()
mock_playwright.chromium.launch.return_value = mock_browser
with patch("scripts.linkedin_scraper.sync_playwright") as mock_sync_pw:
mock_sync_pw.return_value.__enter__ = MagicMock(return_value=mock_playwright)
mock_sync_pw.return_value.__exit__ = MagicMock(return_value=False)
result = scrape_profile("https://linkedin.com/in/alanw", stage)
assert result["name"] == "Alan Weinstock"
assert stage.exists()
def test_scrape_profile_writes_staging_file():
from scripts.linkedin_scraper import scrape_profile
with tempfile.TemporaryDirectory() as tmp:
stage = Path(tmp) / "stage.json"
fixture_html = (Path(__file__).parent / "fixtures" / "linkedin_profile.html").read_text()
mock_page = MagicMock()
mock_page.content.return_value = fixture_html
mock_browser = MagicMock()
mock_browser.new_page.return_value = mock_page
mock_playwright = MagicMock()
mock_playwright.chromium.launch.return_value = mock_browser
with patch("scripts.linkedin_scraper.sync_playwright") as mock_sync_pw:
mock_sync_pw.return_value.__enter__ = MagicMock(return_value=mock_playwright)
mock_sync_pw.return_value.__exit__ = MagicMock(return_value=False)
scrape_profile("https://linkedin.com/in/alanw", stage)
data = json.loads(stage.read_text())
assert data["source"] == "url_scrape"
assert data["url"] == "https://linkedin.com/in/alanw"
assert "raw_html" in data
assert "extracted" in data
assert data["extracted"]["name"] == "Alan Weinstock"
def _make_export_zip() -> bytes:
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w") as zf:
zf.writestr("Position.csv",
"Company Name,Title,Description,Started On,Finished On\n"
"Acme Corp,Staff Engineer,Led migration. Built CI/CD.,Jan 2022,\n"
"Beta Industries,Senior Engineer,Maintained clusters.,Mar 2019,Dec 2021\n"
)
zf.writestr("Education.csv",
"School Name,Degree Name,Field Of Study,Start Date,End Date\n"
"State University,Bachelor of Science,Computer Science,2010,2014\n"
)
zf.writestr("Skills.csv",
"Name,Description\n"
"Python,\n"
"Kubernetes,\n"
)
zf.writestr("Profile.csv",
"First Name,Last Name,Headline,Summary,Email Address\n"
"Alan,Weinstock,Staff Engineer,Experienced engineer.,alan@example.com\n"
)
return buf.getvalue()
def test_parse_export_zip_experience():
from scripts.linkedin_scraper import parse_export_zip
with tempfile.TemporaryDirectory() as tmp:
stage = Path(tmp) / "stage.json"
result = parse_export_zip(_make_export_zip(), stage)
assert len(result["experience"]) == 2
assert result["experience"][0]["company"] == "Acme Corp"
assert result["experience"][0]["title"] == "Staff Engineer"
def test_parse_export_zip_education():
from scripts.linkedin_scraper import parse_export_zip
with tempfile.TemporaryDirectory() as tmp:
stage = Path(tmp) / "stage.json"
result = parse_export_zip(_make_export_zip(), stage)
assert result["education"][0]["school"] == "State University"
assert result["education"][0]["field"] == "Computer Science"
def test_parse_export_zip_skills():
from scripts.linkedin_scraper import parse_export_zip
with tempfile.TemporaryDirectory() as tmp:
stage = Path(tmp) / "stage.json"
result = parse_export_zip(_make_export_zip(), stage)
assert "Python" in result["skills"]
def test_parse_export_zip_name_and_email():
from scripts.linkedin_scraper import parse_export_zip
with tempfile.TemporaryDirectory() as tmp:
stage = Path(tmp) / "stage.json"
result = parse_export_zip(_make_export_zip(), stage)
assert result["name"] == "Alan Weinstock"
assert result["email"] == "alan@example.com"
def test_parse_export_zip_missing_csv_does_not_raise():
from scripts.linkedin_scraper import parse_export_zip
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w") as zf:
zf.writestr("Profile.csv",
"First Name,Last Name,Headline,Summary,Email Address\n"
"Alan,Weinstock,Engineer,Summary here.,alan@example.com\n"
)
with tempfile.TemporaryDirectory() as tmp:
stage = Path(tmp) / "stage.json"
result = parse_export_zip(buf.getvalue(), stage)
assert result["name"] == "Alan Weinstock"
assert result["experience"] == []
def test_parse_export_zip_writes_staging_file():
from scripts.linkedin_scraper import parse_export_zip
with tempfile.TemporaryDirectory() as tmp:
stage = Path(tmp) / "stage.json"
parse_export_zip(_make_export_zip(), stage)
data = json.loads(stage.read_text())
assert data["source"] == "export_zip"
assert data["raw_html"] is None