fix(linkedin): improve scraper error handling, current-job date range, add missing tests
This commit is contained in:
parent
f64ecf81e0
commit
e937094884
2 changed files with 110 additions and 60 deletions
|
|
@ -65,6 +65,8 @@ def scrape_profile(url: str, stage_path: Path) -> dict:
|
||||||
"LinkedIn did not load in time — the request may have been blocked. "
|
"LinkedIn did not load in time — the request may have been blocked. "
|
||||||
"Try the data export option instead."
|
"Try the data export option instead."
|
||||||
)
|
)
|
||||||
|
except Exception as e:
|
||||||
|
raise RuntimeError(f"LinkedIn scrape failed: {e}") from e
|
||||||
|
|
||||||
extracted = parse_html(raw_html)
|
extracted = parse_html(raw_html)
|
||||||
extracted["linkedin"] = url
|
extracted["linkedin"] = url
|
||||||
|
|
@ -94,69 +96,69 @@ def parse_export_zip(zip_bytes: bytes, stage_path: Path) -> dict:
|
||||||
}
|
}
|
||||||
|
|
||||||
try:
|
try:
|
||||||
zf_handle = zipfile.ZipFile(io.BytesIO(zip_bytes))
|
with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf:
|
||||||
|
names_in_zip = {n.lower(): n for n in zf.namelist()}
|
||||||
|
|
||||||
|
def _read_csv(filename: str) -> list[dict]:
|
||||||
|
key = filename.lower()
|
||||||
|
if key not in names_in_zip:
|
||||||
|
return []
|
||||||
|
text = zf.read(names_in_zip[key]).decode("utf-8-sig", errors="replace")
|
||||||
|
return list(csv.DictReader(io.StringIO(text)))
|
||||||
|
|
||||||
|
for row in _read_csv("Profile.csv"):
|
||||||
|
first = row.get("First Name", "").strip()
|
||||||
|
last = row.get("Last Name", "").strip()
|
||||||
|
extracted["name"] = f"{first} {last}".strip()
|
||||||
|
extracted["email"] = row.get("Email Address", "").strip()
|
||||||
|
extracted["career_summary"] = row.get("Summary", "").strip()
|
||||||
|
break
|
||||||
|
|
||||||
|
for row in _read_csv("Position.csv"):
|
||||||
|
company = row.get("Company Name", "").strip()
|
||||||
|
title = row.get("Title", "").strip()
|
||||||
|
desc = row.get("Description", "").strip()
|
||||||
|
start = row.get("Started On", "").strip()
|
||||||
|
end = row.get("Finished On", "").strip()
|
||||||
|
end_label = end if end else ("Present" if start else "")
|
||||||
|
date_range = f"{start} – {end_label}".strip(" –") if (start or end) else ""
|
||||||
|
bullets = [d.strip() for d in re.split(r"[.•\n]+", desc) if d.strip() and len(d.strip()) > 3]
|
||||||
|
if company or title:
|
||||||
|
extracted["experience"].append({
|
||||||
|
"company": company,
|
||||||
|
"title": title,
|
||||||
|
"date_range": date_range,
|
||||||
|
"bullets": bullets,
|
||||||
|
})
|
||||||
|
|
||||||
|
for row in _read_csv("Education.csv"):
|
||||||
|
school = row.get("School Name", "").strip()
|
||||||
|
degree = row.get("Degree Name", "").strip()
|
||||||
|
field = row.get("Field Of Study", "").strip()
|
||||||
|
start = row.get("Start Date", "").strip()
|
||||||
|
end = row.get("End Date", "").strip()
|
||||||
|
dates = f"{start} – {end}".strip(" –") if start or end else ""
|
||||||
|
if school or degree:
|
||||||
|
extracted["education"].append({
|
||||||
|
"school": school,
|
||||||
|
"degree": degree,
|
||||||
|
"field": field,
|
||||||
|
"dates": dates,
|
||||||
|
})
|
||||||
|
|
||||||
|
for row in _read_csv("Skills.csv"):
|
||||||
|
skill = row.get("Name", "").strip()
|
||||||
|
if skill:
|
||||||
|
extracted["skills"].append(skill)
|
||||||
|
|
||||||
|
for row in _read_csv("Certifications.csv"):
|
||||||
|
name = row.get("Name", "").strip()
|
||||||
|
if name:
|
||||||
|
extracted["achievements"].append(name)
|
||||||
|
|
||||||
except zipfile.BadZipFile as e:
|
except zipfile.BadZipFile as e:
|
||||||
raise ValueError(f"Not a valid zip file: {e}")
|
raise ValueError(f"Not a valid zip file: {e}")
|
||||||
|
|
||||||
with zf_handle as zf:
|
|
||||||
names_in_zip = {n.lower(): n for n in zf.namelist()}
|
|
||||||
|
|
||||||
def _read_csv(filename: str) -> list[dict]:
|
|
||||||
key = filename.lower()
|
|
||||||
if key not in names_in_zip:
|
|
||||||
return []
|
|
||||||
text = zf.read(names_in_zip[key]).decode("utf-8-sig", errors="replace")
|
|
||||||
return list(csv.DictReader(io.StringIO(text)))
|
|
||||||
|
|
||||||
for row in _read_csv("Profile.csv"):
|
|
||||||
first = row.get("First Name", "").strip()
|
|
||||||
last = row.get("Last Name", "").strip()
|
|
||||||
extracted["name"] = f"{first} {last}".strip()
|
|
||||||
extracted["email"] = row.get("Email Address", "").strip()
|
|
||||||
extracted["career_summary"] = row.get("Summary", "").strip()
|
|
||||||
break
|
|
||||||
|
|
||||||
for row in _read_csv("Position.csv"):
|
|
||||||
company = row.get("Company Name", "").strip()
|
|
||||||
title = row.get("Title", "").strip()
|
|
||||||
desc = row.get("Description", "").strip()
|
|
||||||
start = row.get("Started On", "").strip()
|
|
||||||
end = row.get("Finished On", "").strip()
|
|
||||||
date_range = f"{start} – {end}".strip(" –") if start or end else ""
|
|
||||||
bullets = [d.strip() for d in re.split(r"[.•\n]+", desc) if d.strip() and len(d.strip()) > 3]
|
|
||||||
if company or title:
|
|
||||||
extracted["experience"].append({
|
|
||||||
"company": company,
|
|
||||||
"title": title,
|
|
||||||
"date_range": date_range,
|
|
||||||
"bullets": bullets,
|
|
||||||
})
|
|
||||||
|
|
||||||
for row in _read_csv("Education.csv"):
|
|
||||||
school = row.get("School Name", "").strip()
|
|
||||||
degree = row.get("Degree Name", "").strip()
|
|
||||||
field = row.get("Field Of Study", "").strip()
|
|
||||||
start = row.get("Start Date", "").strip()
|
|
||||||
end = row.get("End Date", "").strip()
|
|
||||||
dates = f"{start} – {end}".strip(" –") if start or end else ""
|
|
||||||
if school or degree:
|
|
||||||
extracted["education"].append({
|
|
||||||
"school": school,
|
|
||||||
"degree": degree,
|
|
||||||
"field": field,
|
|
||||||
"dates": dates,
|
|
||||||
})
|
|
||||||
|
|
||||||
for row in _read_csv("Skills.csv"):
|
|
||||||
skill = row.get("Name", "").strip()
|
|
||||||
if skill:
|
|
||||||
extracted["skills"].append(skill)
|
|
||||||
|
|
||||||
for row in _read_csv("Certifications.csv"):
|
|
||||||
name = row.get("Name", "").strip()
|
|
||||||
if name:
|
|
||||||
extracted["achievements"].append(name)
|
|
||||||
|
|
||||||
_write_stage(stage_path, {
|
_write_stage(stage_path, {
|
||||||
"url": None,
|
"url": None,
|
||||||
"scraped_at": datetime.now(timezone.utc).isoformat(),
|
"scraped_at": datetime.now(timezone.utc).isoformat(),
|
||||||
|
|
|
||||||
|
|
@ -163,3 +163,51 @@ def test_parse_export_zip_writes_staging_file():
|
||||||
data = json.loads(stage.read_text())
|
data = json.loads(stage.read_text())
|
||||||
assert data["source"] == "export_zip"
|
assert data["source"] == "export_zip"
|
||||||
assert data["raw_html"] is None
|
assert data["raw_html"] is None
|
||||||
|
|
||||||
|
|
||||||
|
def test_scrape_profile_sets_linkedin_url():
|
||||||
|
from scripts.linkedin_scraper import scrape_profile
|
||||||
|
with tempfile.TemporaryDirectory() as tmp:
|
||||||
|
stage = Path(tmp) / "stage.json"
|
||||||
|
fixture_html = (Path(__file__).parent / "fixtures" / "linkedin_profile.html").read_text()
|
||||||
|
mock_page = MagicMock()
|
||||||
|
mock_page.content.return_value = fixture_html
|
||||||
|
mock_browser = MagicMock()
|
||||||
|
mock_browser.new_page.return_value = mock_page
|
||||||
|
mock_playwright = MagicMock()
|
||||||
|
mock_playwright.chromium.launch.return_value = mock_browser
|
||||||
|
with patch("scripts.linkedin_scraper.sync_playwright") as mock_sync_pw:
|
||||||
|
mock_sync_pw.return_value.__enter__ = MagicMock(return_value=mock_playwright)
|
||||||
|
mock_sync_pw.return_value.__exit__ = MagicMock(return_value=False)
|
||||||
|
result = scrape_profile("https://linkedin.com/in/alanw", stage)
|
||||||
|
assert result["linkedin"] == "https://linkedin.com/in/alanw"
|
||||||
|
|
||||||
|
|
||||||
|
def test_parse_export_zip_bad_zip_raises():
|
||||||
|
from scripts.linkedin_scraper import parse_export_zip
|
||||||
|
with tempfile.TemporaryDirectory() as tmp:
|
||||||
|
stage = Path(tmp) / "stage.json"
|
||||||
|
try:
|
||||||
|
parse_export_zip(b"not a zip file at all", stage)
|
||||||
|
assert False, "should have raised"
|
||||||
|
except ValueError as e:
|
||||||
|
assert "zip" in str(e).lower()
|
||||||
|
|
||||||
|
|
||||||
|
def test_parse_export_zip_current_job_shows_present():
|
||||||
|
"""Empty Finished On renders as '– Present', not truncated."""
|
||||||
|
from scripts.linkedin_scraper import parse_export_zip
|
||||||
|
buf = io.BytesIO()
|
||||||
|
with zipfile.ZipFile(buf, "w") as zf:
|
||||||
|
zf.writestr("Position.csv",
|
||||||
|
"Company Name,Title,Description,Started On,Finished On\n"
|
||||||
|
"Acme Corp,Staff Engineer,,Jan 2022,\n"
|
||||||
|
)
|
||||||
|
zf.writestr("Profile.csv",
|
||||||
|
"First Name,Last Name,Headline,Summary,Email Address\n"
|
||||||
|
"Alan,Weinstock,Engineer,,\n"
|
||||||
|
)
|
||||||
|
with tempfile.TemporaryDirectory() as tmp:
|
||||||
|
stage = Path(tmp) / "stage.json"
|
||||||
|
result = parse_export_zip(buf.getvalue(), stage)
|
||||||
|
assert result["experience"][0]["date_range"] == "Jan 2022 – Present"
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue