From 5a0ba92fc6e2bd664b4b8c9c47d3bda9d2cc5b42 Mon Sep 17 00:00:00 2001 From: pyr0ball Date: Fri, 24 Apr 2026 15:29:26 -0700 Subject: [PATCH] chore: add README + gather_corpus.py script --- README.md | 106 ++++++++++++ scripts/gather_corpus.py | 355 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 461 insertions(+) create mode 100644 README.md create mode 100644 scripts/gather_corpus.py diff --git a/README.md b/README.md new file mode 100644 index 0000000..77f4798 --- /dev/null +++ b/README.md @@ -0,0 +1,106 @@ +# Avocet — Email Classifier Training Tool + +> *Part of the CircuitForge LLC internal infrastructure suite.* + +**Status:** Internal beta — label tool and benchmark harness complete. Used to build training data for Peregrine's email classifier. + +--- + +## What it does + +Avocet is the data pipeline for building and benchmarking email classifiers. It has two layers: + +**No LLM required.** Avocet uses zero-shot HuggingFace classification models — no API key, no cloud inference, no GPU required for the label tool. The benchmark harness can optionally export LLM-labeled emails from a Peregrine staging DB, but human labeling via the card-stack UI is the primary workflow. + +**Layer 1 — Label tool** +Card-stack UI for building ground-truth classifier benchmark data. Fetch emails from one or more IMAP accounts (with targeted date-range and sender/subject filters), review them card-by-card, and label each with a job-search category. Labeled output feeds the benchmark harness. + +**Layer 2 — Benchmark harness** +Scores HuggingFace zero-shot classification models against the labeled dataset. Supports slow/large model inclusion, visual side-by-side comparison on live emails, and export of LLM-labeled emails from a Peregrine staging DB. 
+ +--- + +## Labels + +| Label | Key | +|-------|-----| +| `interview_scheduled` | 1 | +| `offer_received` | 2 | +| `rejected` | 3 | +| `positive_response` | 4 | +| `survey_received` | 5 | +| `neutral` | 6 | +| `event_rescheduled` | 7 | +| `unrelated` | 8 | +| `digest` | 9 | + +--- + +## Stack + +| Layer | Tech | +|-------|------| +| Label UI | Streamlit (port 8503, auto-increments on collision) | +| Benchmark | Python + HuggingFace Transformers | +| Email fetch | IMAP (multi-account, targeted date/sender/subject filter) | +| Data | JSONL (`data/email_label_queue.jsonl`, `data/email_score.jsonl`) | +| Config | `config/label_tool.yaml` (gitignored — see `.example`) | + +Conda environments: +- `job-seeker` — label tool UI +- `job-seeker-classifiers` — benchmark harness (separate env for heavy deps) + +--- + +## Running + +```bash +./manage.sh start # start label tool UI (port collision-safe from 8503) +./manage.sh stop # stop +./manage.sh restart # restart +./manage.sh status # show running state and port +./manage.sh logs # tail label tool log +./manage.sh open # open in browser +``` + +Benchmark: +```bash +./manage.sh benchmark --list-models # list available zero-shot models +./manage.sh score # score models against labeled JSONL +./manage.sh score --include-slow # include large/slow models +./manage.sh compare --limit 30 # visual comparison on live IMAP emails +``` + +Dev: +```bash +./manage.sh test # run pytest suite +``` + +--- + +## Data flow + +``` +IMAP accounts → fetch (targeted or wide) → email_label_queue.jsonl +→ label tool card UI → email_score.jsonl +→ benchmark harness → model rankings +→ best model → Peregrine classifier adapter +``` + +Targeted fetch: date range + sender/subject filter for pulling historical emails on specific senders or topics without flooding the queue. + +Discard: removes an email from the queue without writing to the score file — for emails that don't belong in the training set. 
+ +--- + +## Classifier adapters + +`app/classifier_adapters.py` provides a common interface for swapping classifier backends. `RerankerAdapter` falls back to the label name when no `LABEL_DESCRIPTIONS` entry is configured for a label. + +--- + +## License + +BSL 1.1 — internal tool, not user-facing. + +© 2026 Circuit Forge LLC diff --git a/scripts/gather_corpus.py b/scripts/gather_corpus.py new file mode 100644 index 0000000..20df7a0 --- /dev/null +++ b/scripts/gather_corpus.py @@ -0,0 +1,355 @@ +#!/usr/bin/env python3 +""" +Corpus gatherer for the voice benchmark fine-tune pipeline. + +Pulls writing samples from multiple sources and drops .txt files into +data/voice_corpus/ in the format expected by benchmark_voice.py. + +Sources: + - Reddit: u/pyr0ball post history + comment history (public JSON API) + - Campaign copy: claude-bridge/reddit-poster/campaigns/*.py (BODY strings) + - Documents: brainmap, homeprojects notes, selected personal writing + - Discord: requires manual export (see instructions below) + +Usage: + # Full gather (Reddit + local sources) + conda run -n cf python scripts/gather_corpus.py + + # Reddit only + conda run -n cf python scripts/gather_corpus.py --source reddit + + # Local files only (no network) + conda run -n cf python scripts/gather_corpus.py --source local + + # Process a Discord data export zip + conda run -n cf python scripts/gather_corpus.py --discord /path/to/discord-export.zip + +Discord export instructions: + Discord Settings → Privacy & Safety → Request all my data + Wait for email, download zip, then run with --discord flag.
from __future__ import annotations

import argparse
import ast
import json
import re
import time
import zipfile
from pathlib import Path
from typing import TYPE_CHECKING, Any

if TYPE_CHECKING:
    # httpx is only needed by the Reddit source. It is imported lazily at
    # runtime (see _reddit_fetch_all) so that `--source local` and `--discord`
    # runs keep working on machines where httpx is not installed.
    import httpx

# ------------------------------------------------------------------ #
# Paths
# ------------------------------------------------------------------ #

_ROOT = Path(__file__).parent.parent
# NOTE(review): the module docstring says samples land in data/voice_corpus/,
# but this constant points at data/style_corpus/. The runtime path is left
# untouched here -- confirm which directory benchmark_voice.py actually reads.
_CORPUS_DIR = _ROOT / "data" / "style_corpus"
_CLAUDE_BRIDGE = Path("/Library/Development/CircuitForge/claude-bridge")
_DOCUMENTS = Path("/Library/Documents")

_REDDIT_USER = "pyr0ball"
_USER_AGENT = "Avocet/0.1 corpus-gatherer (CircuitForge; personal research)"
_REDDIT_BASE = "https://www.reddit.com"

# Minimum character length to include a sample (filters out one-liners)
_MIN_LENGTH = 80

# Phrases that suggest AI-generated content — skip these
_AI_TELLS = [
    "certainly!", "absolutely!", "great question", "i'd be happy to",
    "i apologize for", "it's worth noting", "in conclusion,",
    "feel free to reach out",
]

# Separator written between samples inside one corpus file.
_SAMPLE_SEPARATOR = "\n\n---\n\n"

# Patterns are compiled once because they run for every sample/message.
_REDDIT_TOMBSTONE_RE = re.compile(r"\[deleted\]|\[removed\]")
_WHITESPACE_RE = re.compile(r"\s+")
_URL_ONLY_RE = re.compile(r"https?://\S+")


# ------------------------------------------------------------------ #
# Helpers
# ------------------------------------------------------------------ #

def _is_ai_generated(text: str) -> bool:
    """Return True when *text* contains any phrase from ``_AI_TELLS``.

    The comparison is case-insensitive substring matching, so this is a
    heuristic: it can flag human text that happens to use one of the phrases.
    """
    lower = text.lower()
    return any(phrase in lower for phrase in _AI_TELLS)


def _clean(text: str) -> str:
    """Strip Reddit tombstone markers and normalize whitespace.

    Every run of whitespace -- including newlines -- collapses to one space,
    so the returned sample is always a single line with no paragraph breaks.
    """
    without_tombstones = _REDDIT_TOMBSTONE_RE.sub("", text)
    return _WHITESPACE_RE.sub(" ", without_tombstones).strip()


def _write_corpus_file(filename: str, samples: list[str], source_label: str) -> None:
    """Filter *samples* and write the survivors to ``_CORPUS_DIR / filename``.

    Samples shorter than ``_MIN_LENGTH`` or flagged by ``_is_ai_generated``
    are dropped. When nothing survives, no file is written (a file left over
    from an earlier run is not removed).

    Args:
        filename: Name of the .txt file inside the corpus directory.
        samples: Candidate text samples.
        source_label: Human-readable origin of the samples. Currently unused;
            kept so existing call sites stay valid.
    """
    kept = [s for s in samples if len(s) >= _MIN_LENGTH and not _is_ai_generated(s)]
    if not kept:
        print(f" [skip] {filename} — no samples passed filters")
        return
    path = _CORPUS_DIR / filename
    # main() creates the directory too, but the gather_* functions can be
    # called on their own, so make sure it exists here.
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(_SAMPLE_SEPARATOR.join(kept), encoding="utf-8")
    print(f" [ok] {filename} — {len(kept)} samples ({path.stat().st_size // 1024}KB)")


# ------------------------------------------------------------------ #
# Reddit source
# ------------------------------------------------------------------ #

def _reddit_fetch_page(
    client: httpx.Client,
    listing_type: str,
    after: str | None,
) -> tuple[list[dict[str, Any]], str | None]:
    """Fetch one page of the user's listing (e.g. "submitted" or "comments").

    Args:
        client: An open httpx client.
        listing_type: Path segment of the listing under the user's profile.
        after: Pagination cursor from the previous page, or None for page one.

    Returns:
        A ``(items, next_cursor)`` pair; ``next_cursor`` is None when the
        response carries no "after" value.

    Raises:
        httpx.HTTPStatusError: On a 4xx/5xx response.
        httpx.RequestError: On transport failures (timeout, DNS, ...).
        ValueError, KeyError: When the payload is not the expected JSON shape.
    """
    params: dict[str, Any] = {"limit": 100, "raw_json": 1}
    if after:
        params["after"] = after
    url = f"{_REDDIT_BASE}/user/{_REDDIT_USER}/{listing_type}.json"
    resp = client.get(url, params=params)
    resp.raise_for_status()
    listing = resp.json()["data"]
    return [child["data"] for child in listing["children"]], listing.get("after")


def _reddit_fetch_all(listing_type: str, max_items: int = 1000) -> list[dict[str, Any]]:
    """Paginate through a user listing until exhausted or *max_items* reached.

    Any failure while fetching a page stops pagination and returns what has
    been collected so far instead of discarding it.

    Args:
        listing_type: Path segment of the listing under the user's profile.
        max_items: Upper bound on the number of items returned.

    Returns:
        At most *max_items* raw listing items.
    """
    import httpx  # deferred: only the Reddit source needs the dependency

    items: list[dict[str, Any]] = []
    after: str | None = None
    with httpx.Client(
        headers={"User-Agent": _USER_AGENT},
        follow_redirects=True,
        timeout=20.0,
    ) as client:
        while len(items) < max_items:
            try:
                page, after = _reddit_fetch_page(client, listing_type, after)
            except httpx.HTTPStatusError as exc:
                # Reddit blocks unauthenticated pagination after the first page;
                # save what we have rather than crashing.
                print(f" stopped at {len(items)} {listing_type} (HTTP {exc.response.status_code})")
                break
            except httpx.RequestError as exc:
                # Timeouts / connection errors: same policy, keep partial data.
                print(f" stopped at {len(items)} {listing_type} ({type(exc).__name__}: {exc})")
                break
            except (ValueError, KeyError) as exc:
                # Body was not the expected listing JSON (e.g. an HTML page).
                print(f" stopped at {len(items)} {listing_type} (unexpected response: {exc!r})")
                break
            if not page:
                break
            items.extend(page)
            print(f" fetched {len(items)} {listing_type}...")
            if not after or len(items) >= max_items:
                break
            time.sleep(1.0)  # respect rate limit
    # The last page may overshoot the cap; honor the documented limit.
    return items[:max_items]


def gather_reddit() -> None:
    """Fetch the user's Reddit posts and comments and write two corpus files."""
    print(f"Fetching Reddit history for u/{_REDDIT_USER}...")

    # Posts (submitted)
    print(" Posts:")
    posts = _reddit_fetch_all("submitted")
    post_texts: list[str] = []
    for post in posts:
        # `or ""` guards against explicit nulls in the payload.
        body = _clean(post.get("selftext") or "")
        title = _clean(post.get("title") or "")
        if len(body) >= _MIN_LENGTH:
            post_texts.append(f"{title}\n\n{body}")
        elif len(title) >= 20:
            # Title-only posts (link posts) — include title as micro-sample.
            # NOTE(review): _write_corpus_file drops anything shorter than
            # _MIN_LENGTH, so titles of 20-79 characters never reach the file.
            # Confirm whether micro-samples are meant to bypass that filter.
            post_texts.append(title)
    _write_corpus_file("social_post_reddit.txt", post_texts, "reddit/submitted")

    # Comments
    print(" Comments:")
    comments = _reddit_fetch_all("comments")
    comment_texts: list[str] = []
    for comment in comments:
        # _clean already removes "[deleted]"/"[removed]", so a tombstoned
        # comment simply comes back empty.
        body = _clean(comment.get("body") or "")
        if body:
            comment_texts.append(body)
    _write_corpus_file("social_reply_reddit_comments.txt", comment_texts, "reddit/comments")

    print(f" Done. {len(posts)} posts, {len(comments)} comments fetched.")


# ------------------------------------------------------------------ #
# Campaign copy source (claude-bridge)
# ------------------------------------------------------------------ #

def _extract_body_from_campaign(py_file: Path) -> str | None:
    """Return the string assigned to ``BODY`` in a campaign file, if any.

    The file is parsed with ``ast`` (never executed). Both ``BODY = "..."``
    and ``BODY: str = "..."`` are recognized. Only string constants count: a
    ``BODY`` bound to a number, ``None``, bytes or an expression is ignored
    rather than stringified into the corpus.

    Returns:
        The literal string, or None when the file cannot be read or parsed or
        holds no string ``BODY`` assignment.
    """
    try:
        tree = ast.parse(py_file.read_text(encoding="utf-8"))
    except (OSError, SyntaxError, ValueError):
        # ValueError also covers UnicodeDecodeError and null bytes in source.
        return None

    for node in ast.walk(tree):
        if isinstance(node, ast.Assign):
            targets = node.targets
        elif isinstance(node, ast.AnnAssign):
            targets = [node.target]
        else:
            continue
        value = node.value
        if not (isinstance(value, ast.Constant) and isinstance(value.value, str)):
            continue
        if any(isinstance(t, ast.Name) and t.id == "BODY" for t in targets):
            return value.value
    return None


def gather_campaigns() -> None:
    """Collect BODY strings from the claude-bridge campaign scripts."""
    campaigns_dir = _CLAUDE_BRIDGE / "reddit-poster" / "campaigns"
    if not campaigns_dir.exists():
        print(f" [skip] campaigns dir not found: {campaigns_dir}")
        return

    print("Gathering campaign copy from claude-bridge...")
    samples: list[str] = []
    for py_file in sorted(campaigns_dir.glob("*.py")):
        body = _extract_body_from_campaign(py_file)
        if body:
            samples.append(body.strip())
            print(f" {py_file.name} — {len(body)} chars")

    _write_corpus_file("narrative_campaign_copy.txt", samples, "claude-bridge/campaigns")


# ------------------------------------------------------------------ #
# Documents source
# ------------------------------------------------------------------ #

def _read_document(path: Path) -> str | None:
    """Return the cleaned text of *path*, or None if unreadable or too short."""
    try:
        # errors="replace" keeps one stray byte from aborting the whole run.
        text = _clean(path.read_text(encoding="utf-8", errors="replace"))
    except OSError as exc:
        print(f" [skip] {path.name} — {exc}")
        return None
    return text if len(text) >= _MIN_LENGTH else None


def gather_documents() -> None:
    """Collect prose from local notes under ``_DOCUMENTS`` into one corpus file.

    Sources, in order: ``brainmap_v1.md``, ``HomeProjects/handoff*.md`` and
    ``Closet/*.md``. Missing files or directories are skipped silently.
    """
    print("Gathering local Documents...")
    candidates: list[Path] = []

    # brainmap — personal planning/thinking notes
    brainmap = _DOCUMENTS / "brainmap_v1.md"
    if brainmap.is_file():
        candidates.append(brainmap)

    # HomeProjects handoff notes — casual technical prose
    home_projects = _DOCUMENTS / "HomeProjects"
    if home_projects.is_dir():
        candidates.extend(sorted(home_projects.glob("handoff*.md")))

    # Personal letters (Closet folder) — intimate prose voice
    closet = _DOCUMENTS / "Closet"
    if closet.is_dir():
        # Sorted so the output file is identical from run to run.
        candidates.extend(sorted(closet.glob("*.md")))

    samples: list[str] = []
    for doc in candidates:
        text = _read_document(doc)
        # _write_corpus_file would drop AI-flagged text anyway; filtering here
        # keeps the progress lines limited to files that are actually kept.
        if text is None or _is_ai_generated(text):
            continue
        samples.append(text)
        print(f" {doc.name} — {len(text)} chars")

    _write_corpus_file("narrative_personal_docs.txt", samples, "documents")


# ------------------------------------------------------------------ #
# Discord export source
# ------------------------------------------------------------------ #

def _is_usable_discord_message(content: str) -> bool:
    """Return True when a cleaned Discord message is worth keeping."""
    if len(content) < _MIN_LENGTH:
        return False
    # Slash / bang prefixes are bot commands, not prose.
    if content.startswith(("/", "!")):
        return False
    if _is_ai_generated(content):
        return False
    # Skip messages that are just a URL.
    return _URL_ONLY_RE.fullmatch(content) is None


def gather_discord(export_zip: Path) -> None:
    """
    Process a Discord data export zip (from Settings → Privacy & Safety → Request all my data).

    Expected zip structure:
        messages/
            c{channel_id}/
                messages.json -- list of {ID, Timestamp, Contents, Attachments}
        account/
            user.json -- {username, ...}

    Channels whose ``messages.json`` is unreadable or not a JSON list are
    skipped, as are entries that are not objects. An archive that cannot be
    opened is reported and nothing is written.
    """
    print(f"Processing Discord export: {export_zip}")
    samples: list[str] = []
    message_count = 0

    try:
        archive = zipfile.ZipFile(export_zip)
    except (OSError, zipfile.BadZipFile) as exc:
        print(f" [skip] cannot read Discord export — {exc}")
        return

    with archive as zf:
        # Find all messages.json files
        message_files = [n for n in zf.namelist() if n.endswith("/messages.json")]
        print(f" Found {len(message_files)} channel(s)")

        for member in message_files:
            try:
                data = json.loads(zf.read(member))
            except (KeyError, ValueError, zipfile.BadZipFile):
                # ValueError covers JSONDecodeError and UnicodeDecodeError.
                continue
            if not isinstance(data, list):
                continue

            for msg in data:
                if not isinstance(msg, dict):
                    continue
                # Count every message seen so the summary line below reports
                # a real before/after ratio.
                message_count += 1
                raw = msg.get("Contents")
                content = _clean(raw) if isinstance(raw, str) else ""
                if _is_usable_discord_message(content):
                    samples.append(content)

    print(f" {message_count} messages → {len(samples)} passed filters")
    _write_corpus_file("social_reply_discord.txt", samples, "discord")


# ------------------------------------------------------------------ #
# Entrypoint
# ------------------------------------------------------------------ #

def main() -> None:
    """Parse CLI arguments, run the selected gatherers and print a summary."""
    parser = argparse.ArgumentParser(description="Gather writing corpus for voice benchmark")
    parser.add_argument(
        "--source",
        choices=["reddit", "local", "all"],
        default="all",
        help="Which sources to gather (default: all)",
    )
    parser.add_argument(
        "--discord",
        type=Path,
        metavar="ZIP",
        help="Path to Discord data export zip",
    )
    args = parser.parse_args()

    _CORPUS_DIR.mkdir(parents=True, exist_ok=True)
    print(f"Output: {_CORPUS_DIR}\n")

    if args.source in ("reddit", "all"):
        gather_reddit()
        print()

    if args.source in ("local", "all"):
        gather_campaigns()
        print()
        gather_documents()
        print()

    if args.discord:
        if not args.discord.exists():
            print(f"Error: Discord export not found: {args.discord}")
        else:
            gather_discord(args.discord)
        print()

    if not args.discord and args.source in ("local", "all"):
        print("Discord: manual step required")
        print(" 1. Discord Settings → Privacy & Safety → Request all my data")
        print(" 2. Download the zip from the email link")
        print(" 3. Run: python scripts/gather_corpus.py --discord /path/to/package.zip")
        print()

    # Summary
    corpus_files = sorted(_CORPUS_DIR.glob("*.txt"))
    total_bytes = sum(f.stat().st_size for f in corpus_files)  # st_size is bytes
    print(f"Corpus: {len(corpus_files)} file(s), {total_bytes // 1024}KB total")
    for f in corpus_files:
        print(f" {f.name}")


if __name__ == "__main__":
    main()