peregrine/scripts/linkedin_scraper.py

167 lines
5.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# scripts/linkedin_scraper.py
"""
LinkedIn profile scraper.
Two entry points:
scrape_profile(url, stage_path) — Playwright headless fetch
parse_export_zip(zip_bytes, stage_path) — LinkedIn data archive CSV parse
Both write a staging file at stage_path and return the extracted dict.
"""
from __future__ import annotations
import csv
import io
import json
import re
import zipfile
from datetime import datetime, timezone
from pathlib import Path
from playwright.sync_api import sync_playwright, TimeoutError as PWTimeout
from scripts.linkedin_utils import parse_html
_LINKEDIN_PROFILE_RE = re.compile(r"https?://(www\.)?linkedin\.com/in/", re.I)
_CHROME_UA = (
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
"(KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36"
)
def _write_stage(stage_path: Path, payload: dict) -> None:
"""Atomic write: write to .tmp then rename to avoid partial reads."""
tmp = stage_path.with_suffix(".tmp")
tmp.write_text(json.dumps(payload, ensure_ascii=False, indent=2))
tmp.rename(stage_path)
def scrape_profile(url: str, stage_path: Path) -> dict:
"""
Fetch a public LinkedIn profile via Playwright headless Chrome.
Raises ValueError if url is not a linkedin.com/in/ URL.
Raises RuntimeError on scrape failure (timeout, blocked, etc.).
Returns the extracted dict and writes the staging file.
"""
if not _LINKEDIN_PROFILE_RE.match(url):
raise ValueError(
f"Expected a LinkedIn profile URL (linkedin.com/in/…), got: {url}"
)
try:
with sync_playwright() as pw:
browser = pw.chromium.launch(headless=True)
page = browser.new_page(user_agent=_CHROME_UA)
page.goto(url, timeout=30_000)
page.wait_for_selector(
"h1, section[data-section], #experience, #about",
timeout=20_000,
)
raw_html = page.content()
browser.close()
except PWTimeout:
raise RuntimeError(
"LinkedIn did not load in time — the request may have been blocked. "
"Try the data export option instead."
)
extracted = parse_html(raw_html)
extracted["linkedin"] = url
_write_stage(stage_path, {
"url": url,
"scraped_at": datetime.now(timezone.utc).isoformat(),
"source": "url_scrape",
"raw_html": raw_html,
"extracted": extracted,
})
return extracted
def parse_export_zip(zip_bytes: bytes, stage_path: Path) -> dict:
"""
Parse a LinkedIn data export archive.
zip_bytes: raw zip bytes — callers do: zip_bytes = uploaded_file.read()
Returns the extracted dict and writes the staging file.
Missing CSV files are skipped silently.
"""
extracted: dict = {
"name": "", "email": "", "phone": "", "linkedin": "",
"career_summary": "",
"experience": [], "education": [], "skills": [], "achievements": [],
}
try:
zf_handle = zipfile.ZipFile(io.BytesIO(zip_bytes))
except zipfile.BadZipFile as e:
raise ValueError(f"Not a valid zip file: {e}")
with zf_handle as zf:
names_in_zip = {n.lower(): n for n in zf.namelist()}
def _read_csv(filename: str) -> list[dict]:
key = filename.lower()
if key not in names_in_zip:
return []
text = zf.read(names_in_zip[key]).decode("utf-8-sig", errors="replace")
return list(csv.DictReader(io.StringIO(text)))
for row in _read_csv("Profile.csv"):
first = row.get("First Name", "").strip()
last = row.get("Last Name", "").strip()
extracted["name"] = f"{first} {last}".strip()
extracted["email"] = row.get("Email Address", "").strip()
extracted["career_summary"] = row.get("Summary", "").strip()
break
for row in _read_csv("Position.csv"):
company = row.get("Company Name", "").strip()
title = row.get("Title", "").strip()
desc = row.get("Description", "").strip()
start = row.get("Started On", "").strip()
end = row.get("Finished On", "").strip()
date_range = f"{start} {end}".strip(" ") if start or end else ""
bullets = [d.strip() for d in re.split(r"[.•\n]+", desc) if d.strip() and len(d.strip()) > 3]
if company or title:
extracted["experience"].append({
"company": company,
"title": title,
"date_range": date_range,
"bullets": bullets,
})
for row in _read_csv("Education.csv"):
school = row.get("School Name", "").strip()
degree = row.get("Degree Name", "").strip()
field = row.get("Field Of Study", "").strip()
start = row.get("Start Date", "").strip()
end = row.get("End Date", "").strip()
dates = f"{start} {end}".strip(" ") if start or end else ""
if school or degree:
extracted["education"].append({
"school": school,
"degree": degree,
"field": field,
"dates": dates,
})
for row in _read_csv("Skills.csv"):
skill = row.get("Name", "").strip()
if skill:
extracted["skills"].append(skill)
for row in _read_csv("Certifications.csv"):
name = row.get("Name", "").strip()
if name:
extracted["achievements"].append(name)
_write_stage(stage_path, {
"url": None,
"scraped_at": datetime.now(timezone.utc).isoformat(),
"source": "export_zip",
"raw_html": None,
"extracted": extracted,
})
return extracted