avocet/scripts/gather_corpus.py

355 lines
12 KiB
Python

#!/usr/bin/env python3
"""
Corpus gatherer for the voice benchmark fine-tune pipeline.
Pulls writing samples from multiple sources and drops .txt files into
data/voice_corpus/ in the format expected by benchmark_voice.py.
Sources:
- Reddit: u/pyr0ball post history + comment history (public JSON API)
- Campaign copy: claude-bridge/reddit-poster/campaigns/*.py (BODY strings)
- Documents: brainmap, homeprojects notes, selected personal writing
- Discord: requires manual export (see instructions below)
Usage:
# Full gather (Reddit + local sources)
conda run -n cf python scripts/gather_corpus.py
# Reddit only
conda run -n cf python scripts/gather_corpus.py --source reddit
# Local files only (no network)
conda run -n cf python scripts/gather_corpus.py --source local
# Process a Discord data export zip
conda run -n cf python scripts/gather_corpus.py --discord /path/to/discord-export.zip
Discord export instructions:
Discord Settings → Privacy & Safety → Request all my data
Wait for email, download zip, then run with --discord flag.
"""
from __future__ import annotations
import argparse
import ast
import json
import re
import time
import zipfile
from pathlib import Path
from typing import Any
import httpx
# ------------------------------------------------------------------ #
# Paths
# ------------------------------------------------------------------ #
_ROOT = Path(__file__).parent.parent
_CORPUS_DIR = _ROOT / "data" / "style_corpus"
_CLAUDE_BRIDGE = Path("/Library/Development/CircuitForge/claude-bridge")
_DOCUMENTS = Path("/Library/Documents")
_REDDIT_USER = "pyr0ball"
_USER_AGENT = "Avocet/0.1 corpus-gatherer (CircuitForge; personal research)"
_REDDIT_BASE = "https://www.reddit.com"
# Minimum character length to include a sample (filters out one-liners)
_MIN_LENGTH = 80
# Phrases that suggest AI-generated content — skip these
_AI_TELLS = [
"certainly!", "absolutely!", "great question", "i'd be happy to",
"i apologize for", "it's worth noting", "in conclusion,",
"feel free to reach out",
]
# ------------------------------------------------------------------ #
# Helpers
# ------------------------------------------------------------------ #
def _is_ai_generated(text: str) -> bool:
lower = text.lower()
return any(phrase in lower for phrase in _AI_TELLS)
def _clean(text: str) -> str:
"""Strip Reddit formatting artifacts and normalize whitespace."""
text = re.sub(r"\[deleted\]|\[removed\]", "", text)
text = re.sub(r"\s+", " ", text).strip()
return text
def _write_corpus_file(filename: str, samples: list[str], source_label: str) -> None:
"""Write samples to a corpus .txt file with minimal separators."""
path = _CORPUS_DIR / filename
kept = [s for s in samples if len(s) >= _MIN_LENGTH and not _is_ai_generated(s)]
if not kept:
print(f" [skip] {filename} — no samples passed filters")
return
separator = "\n\n---\n\n"
path.write_text(separator.join(kept), encoding="utf-8")
print(f" [ok] {filename}{len(kept)} samples ({path.stat().st_size // 1024}KB)")
# ------------------------------------------------------------------ #
# Reddit source
# ------------------------------------------------------------------ #
def _reddit_fetch_page(
client: httpx.Client,
listing_type: str,
after: str | None,
) -> tuple[list[dict[str, Any]], str | None]:
"""Fetch one page of a user's submitted posts or comments."""
params: dict[str, Any] = {"limit": 100, "raw_json": 1}
if after:
params["after"] = after
url = f"{_REDDIT_BASE}/user/{_REDDIT_USER}/{listing_type}.json"
resp = client.get(url, params=params)
resp.raise_for_status()
data = resp.json()
children = data["data"]["children"]
new_after = data["data"].get("after")
return [c["data"] for c in children], new_after
def _reddit_fetch_all(listing_type: str, max_items: int = 1000) -> list[dict[str, Any]]:
"""Paginate through a user listing until exhausted or max_items reached."""
items: list[dict[str, Any]] = []
after: str | None = None
with httpx.Client(
headers={"User-Agent": _USER_AGENT},
follow_redirects=True,
timeout=20.0,
) as client:
while len(items) < max_items:
try:
page, after = _reddit_fetch_page(client, listing_type, after)
except httpx.HTTPStatusError as exc:
# Reddit blocks unauthenticated pagination after the first page;
# save what we have rather than crashing.
print(f" stopped at {len(items)} {listing_type} (HTTP {exc.response.status_code})")
break
if not page:
break
items.extend(page)
print(f" fetched {len(items)} {listing_type}...")
if not after:
break
time.sleep(1.0) # respect rate limit
return items
def gather_reddit() -> None:
print("Fetching Reddit history for u/pyr0ball...")
# Posts (submitted)
print(" Posts:")
posts = _reddit_fetch_all("submitted")
post_texts: list[str] = []
for p in posts:
body = _clean(p.get("selftext", "") or "")
title = _clean(p.get("title", ""))
if len(body) >= _MIN_LENGTH:
post_texts.append(f"{title}\n\n{body}")
elif len(title) >= 20:
# Title-only posts (link posts) — include title as micro-sample
post_texts.append(title)
_write_corpus_file("social_post_reddit.txt", post_texts, "reddit/submitted")
# Comments
print(" Comments:")
comments = _reddit_fetch_all("comments")
comment_texts: list[str] = []
for c in comments:
body = _clean(c.get("body", "") or "")
if body and body not in ("[deleted]", "[removed]"):
comment_texts.append(body)
_write_corpus_file("social_reply_reddit_comments.txt", comment_texts, "reddit/comments")
print(f" Done. {len(posts)} posts, {len(comments)} comments fetched.")
# ------------------------------------------------------------------ #
# Campaign copy source (claude-bridge)
# ------------------------------------------------------------------ #
def _extract_body_from_campaign(py_file: Path) -> str | None:
"""
Parse a campaign Python file and extract the BODY string literal.
Uses AST to handle multi-line strings safely.
"""
try:
tree = ast.parse(py_file.read_text(encoding="utf-8"))
for node in ast.walk(tree):
if isinstance(node, ast.Assign):
for target in node.targets:
if isinstance(target, ast.Name) and target.id == "BODY":
if isinstance(node.value, ast.Constant):
return str(node.value.value)
except (SyntaxError, UnicodeDecodeError):
pass
return None
def gather_campaigns() -> None:
campaigns_dir = _CLAUDE_BRIDGE / "reddit-poster" / "campaigns"
if not campaigns_dir.exists():
print(f" [skip] campaigns dir not found: {campaigns_dir}")
return
print("Gathering campaign copy from claude-bridge...")
samples: list[str] = []
for py_file in sorted(campaigns_dir.glob("*.py")):
body = _extract_body_from_campaign(py_file)
if body:
samples.append(body.strip())
print(f" {py_file.name}{len(body)} chars")
_write_corpus_file("narrative_campaign_copy.txt", samples, "claude-bridge/campaigns")
# ------------------------------------------------------------------ #
# Documents source
# ------------------------------------------------------------------ #
def gather_documents() -> None:
print("Gathering local Documents...")
samples: list[str] = []
# brainmap — personal planning/thinking notes
brainmap = _DOCUMENTS / "brainmap_v1.md"
if brainmap.exists():
text = _clean(brainmap.read_text(encoding="utf-8"))
if len(text) >= _MIN_LENGTH:
samples.append(text)
print(f" brainmap_v1.md — {len(text)} chars")
# HomeProjects handoff notes — casual technical prose
for handoff in sorted((_DOCUMENTS / "HomeProjects").glob("handoff*.md")):
text = _clean(handoff.read_text(encoding="utf-8", errors="replace"))
if len(text) >= _MIN_LENGTH:
samples.append(text)
print(f" {handoff.name}{len(text)} chars")
# Personal letters (Closet folder) — intimate prose voice
closet = _DOCUMENTS / "Closet"
if closet.exists():
for letter in closet.glob("*.md"):
text = _clean(letter.read_text(encoding="utf-8", errors="replace"))
if len(text) >= _MIN_LENGTH and not _is_ai_generated(text):
samples.append(text)
print(f" {letter.name}{len(text)} chars")
_write_corpus_file("narrative_personal_docs.txt", samples, "documents")
# ------------------------------------------------------------------ #
# Discord export source
# ------------------------------------------------------------------ #
def gather_discord(export_zip: Path) -> None:
"""
Process a Discord data export zip (from Settings → Privacy & Safety → Request all my data).
Expected zip structure:
messages/
c{channel_id}/
messages.json -- list of {ID, Timestamp, Contents, Attachments}
account/
user.json -- {username, ...}
"""
print(f"Processing Discord export: {export_zip}")
samples: list[str] = []
message_count = 0
with zipfile.ZipFile(export_zip) as zf:
# Find all messages.json files
message_files = [n for n in zf.namelist() if n.endswith("/messages.json")]
print(f" Found {len(message_files)} channel(s)")
for mf in message_files:
try:
data = json.loads(zf.read(mf))
except (json.JSONDecodeError, KeyError):
continue
for msg in data:
content = _clean(msg.get("Contents", "") or "")
# Skip system messages, bot commands, very short messages
if (
len(content) < _MIN_LENGTH
or content.startswith("/")
or content.startswith("!")
or _is_ai_generated(content)
):
continue
# Skip messages that are just URLs or attachments
if re.match(r"^https?://\S+$", content):
continue
samples.append(content)
message_count += 1
print(f" {message_count} messages → {len(samples)} passed filters")
_write_corpus_file("social_reply_discord.txt", samples, "discord")
# ------------------------------------------------------------------ #
# Entrypoint
# ------------------------------------------------------------------ #
def main() -> None:
parser = argparse.ArgumentParser(description="Gather writing corpus for voice benchmark")
parser.add_argument(
"--source",
choices=["reddit", "local", "all"],
default="all",
help="Which sources to gather (default: all)",
)
parser.add_argument(
"--discord",
type=Path,
metavar="ZIP",
help="Path to Discord data export zip",
)
args = parser.parse_args()
_CORPUS_DIR.mkdir(parents=True, exist_ok=True)
print(f"Output: {_CORPUS_DIR}\n")
if args.source in ("reddit", "all"):
gather_reddit()
print()
if args.source in ("local", "all"):
gather_campaigns()
print()
gather_documents()
print()
if args.discord:
if not args.discord.exists():
print(f"Error: Discord export not found: {args.discord}")
else:
gather_discord(args.discord)
print()
if not args.discord and args.source in ("local", "all"):
print("Discord: manual step required")
print(" 1. Discord Settings → Privacy & Safety → Request all my data")
print(" 2. Download the zip from the email link")
print(" 3. Run: python scripts/gather_corpus.py --discord /path/to/package.zip")
print()
# Summary
corpus_files = sorted(_CORPUS_DIR.glob("*.txt"))
total_chars = sum(f.stat().st_size for f in corpus_files)
print(f"Corpus: {len(corpus_files)} file(s), {total_chars // 1024}KB total")
for f in corpus_files:
print(f" {f.name}")
if __name__ == "__main__":
main()