167 lines
5.9 KiB
Python
167 lines
5.9 KiB
Python
# scripts/linkedin_scraper.py
|
||
"""
|
||
LinkedIn profile scraper.
|
||
|
||
Two entry points:
|
||
scrape_profile(url, stage_path) — Playwright headless fetch
|
||
parse_export_zip(zip_bytes, stage_path) — LinkedIn data archive CSV parse
|
||
|
||
Both write a staging file at stage_path and return the extracted dict.
|
||
"""
|
||
from __future__ import annotations
|
||
|
||
import csv
|
||
import io
|
||
import json
|
||
import re
|
||
import zipfile
|
||
from datetime import datetime, timezone
|
||
from pathlib import Path
|
||
|
||
from playwright.sync_api import sync_playwright, TimeoutError as PWTimeout
|
||
from scripts.linkedin_utils import parse_html
|
||
|
||
_LINKEDIN_PROFILE_RE = re.compile(r"https?://(www\.)?linkedin\.com/in/", re.I)
|
||
|
||
_CHROME_UA = (
|
||
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
|
||
"(KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36"
|
||
)
|
||
|
||
|
||
def _write_stage(stage_path: Path, payload: dict) -> None:
|
||
"""Atomic write: write to .tmp then rename to avoid partial reads."""
|
||
tmp = stage_path.with_suffix(".tmp")
|
||
tmp.write_text(json.dumps(payload, ensure_ascii=False, indent=2))
|
||
tmp.rename(stage_path)
|
||
|
||
|
||
def scrape_profile(url: str, stage_path: Path) -> dict:
|
||
"""
|
||
Fetch a public LinkedIn profile via Playwright headless Chrome.
|
||
|
||
Raises ValueError if url is not a linkedin.com/in/ URL.
|
||
Raises RuntimeError on scrape failure (timeout, blocked, etc.).
|
||
Returns the extracted dict and writes the staging file.
|
||
"""
|
||
if not _LINKEDIN_PROFILE_RE.match(url):
|
||
raise ValueError(
|
||
f"Expected a LinkedIn profile URL (linkedin.com/in/…), got: {url}"
|
||
)
|
||
|
||
try:
|
||
with sync_playwright() as pw:
|
||
browser = pw.chromium.launch(headless=True)
|
||
page = browser.new_page(user_agent=_CHROME_UA)
|
||
page.goto(url, timeout=30_000)
|
||
page.wait_for_selector(
|
||
"h1, section[data-section], #experience, #about",
|
||
timeout=20_000,
|
||
)
|
||
raw_html = page.content()
|
||
browser.close()
|
||
except PWTimeout:
|
||
raise RuntimeError(
|
||
"LinkedIn did not load in time — the request may have been blocked. "
|
||
"Try the data export option instead."
|
||
)
|
||
|
||
extracted = parse_html(raw_html)
|
||
extracted["linkedin"] = url
|
||
|
||
_write_stage(stage_path, {
|
||
"url": url,
|
||
"scraped_at": datetime.now(timezone.utc).isoformat(),
|
||
"source": "url_scrape",
|
||
"raw_html": raw_html,
|
||
"extracted": extracted,
|
||
})
|
||
return extracted
|
||
|
||
|
||
def parse_export_zip(zip_bytes: bytes, stage_path: Path) -> dict:
|
||
"""
|
||
Parse a LinkedIn data export archive.
|
||
|
||
zip_bytes: raw zip bytes — callers do: zip_bytes = uploaded_file.read()
|
||
Returns the extracted dict and writes the staging file.
|
||
Missing CSV files are skipped silently.
|
||
"""
|
||
extracted: dict = {
|
||
"name": "", "email": "", "phone": "", "linkedin": "",
|
||
"career_summary": "",
|
||
"experience": [], "education": [], "skills": [], "achievements": [],
|
||
}
|
||
|
||
try:
|
||
zf_handle = zipfile.ZipFile(io.BytesIO(zip_bytes))
|
||
except zipfile.BadZipFile as e:
|
||
raise ValueError(f"Not a valid zip file: {e}")
|
||
|
||
with zf_handle as zf:
|
||
names_in_zip = {n.lower(): n for n in zf.namelist()}
|
||
|
||
def _read_csv(filename: str) -> list[dict]:
|
||
key = filename.lower()
|
||
if key not in names_in_zip:
|
||
return []
|
||
text = zf.read(names_in_zip[key]).decode("utf-8-sig", errors="replace")
|
||
return list(csv.DictReader(io.StringIO(text)))
|
||
|
||
for row in _read_csv("Profile.csv"):
|
||
first = row.get("First Name", "").strip()
|
||
last = row.get("Last Name", "").strip()
|
||
extracted["name"] = f"{first} {last}".strip()
|
||
extracted["email"] = row.get("Email Address", "").strip()
|
||
extracted["career_summary"] = row.get("Summary", "").strip()
|
||
break
|
||
|
||
for row in _read_csv("Position.csv"):
|
||
company = row.get("Company Name", "").strip()
|
||
title = row.get("Title", "").strip()
|
||
desc = row.get("Description", "").strip()
|
||
start = row.get("Started On", "").strip()
|
||
end = row.get("Finished On", "").strip()
|
||
date_range = f"{start} – {end}".strip(" –") if start or end else ""
|
||
bullets = [d.strip() for d in re.split(r"[.•\n]+", desc) if d.strip() and len(d.strip()) > 3]
|
||
if company or title:
|
||
extracted["experience"].append({
|
||
"company": company,
|
||
"title": title,
|
||
"date_range": date_range,
|
||
"bullets": bullets,
|
||
})
|
||
|
||
for row in _read_csv("Education.csv"):
|
||
school = row.get("School Name", "").strip()
|
||
degree = row.get("Degree Name", "").strip()
|
||
field = row.get("Field Of Study", "").strip()
|
||
start = row.get("Start Date", "").strip()
|
||
end = row.get("End Date", "").strip()
|
||
dates = f"{start} – {end}".strip(" –") if start or end else ""
|
||
if school or degree:
|
||
extracted["education"].append({
|
||
"school": school,
|
||
"degree": degree,
|
||
"field": field,
|
||
"dates": dates,
|
||
})
|
||
|
||
for row in _read_csv("Skills.csv"):
|
||
skill = row.get("Name", "").strip()
|
||
if skill:
|
||
extracted["skills"].append(skill)
|
||
|
||
for row in _read_csv("Certifications.csv"):
|
||
name = row.get("Name", "").strip()
|
||
if name:
|
||
extracted["achievements"].append(name)
|
||
|
||
_write_stage(stage_path, {
|
||
"url": None,
|
||
"scraped_at": datetime.now(timezone.utc).isoformat(),
|
||
"source": "export_zip",
|
||
"raw_html": None,
|
||
"extracted": extracted,
|
||
})
|
||
return extracted
|