"""
Subreddit discovery and rule analysis.

Searches Reddit for relevant communities by keyword, fetches each sub's about
page and posting rules, and classifies promo policy automatically.

Results are returned for user review — nothing is stored until the user
explicitly imports a sub via PUT /subs/{sub}.
"""
from __future__ import annotations

import logging
import re
from typing import Any

import httpx

logger = logging.getLogger(__name__)

_USER_AGENT = "Mozilla/5.0 (X11; Linux x86_64) Chrome/124.0.0.0"
_BASE = "https://www.reddit.com"

# Subreddit types that are skipped entirely by both search and analysis.
_INACCESSIBLE_TYPES = ("private", "restricted", "employee_only")

# Keyword patterns for promo classification (applied to rule title + rule body text)
_BAN_PATTERNS = [
    r"\bno\b.{0,20}\b(self.?promo|advertising|promotional|affiliate|soliciting|spam)\b",
    r"\b(self.?promo|advertising|promotional)\b.{0,20}\bnot allowed\b",
    r"\bdo not\b.{0,20}\b(post|share|submit)\b.{0,20}\b(your|own)\b.{0,20}\b(blog|site|product|service)",
    r"\bno\b.{0,20}\bself.?serving\b",
    r"\bcommercial\b.{0,10}\bcontent\b.{0,10}\bprohibited\b",
]
_COND_PATTERNS = [
    r"\b(9|10)\s*[:/]\s*1\b",  # 9:1 / 10:1 rule
    r"\blimited\b.{0,20}\bself.?promo\b",
    r"\bself.?promo.{0,40}\ballow(ed)?\b.{0,20}\b(friday|thread|megathread|weekly|monthly)\b",
    r"\bonly.{0,20}\b(friday|weekly|monthly)\b.{0,30}\bpromo\b",
    r"\bself.?promotion\b.{0,40}\bonly\b",
    r"\bpromotion.{0,40}\bmoderat",
]

_BAN_RE = [re.compile(p, re.I | re.S) for p in _BAN_PATTERNS]
_COND_RE = [re.compile(p, re.I | re.S) for p in _COND_PATTERNS]
_FLAIR_RE = re.compile(r"\bflair\b", re.I)


def _get(
    url: str,
    cookies: dict | None = None,
    timeout: int = 10,
    params: dict[str, Any] | None = None,
) -> httpx.Response:
    """
    Issue a GET request with the module's User-Agent, following redirects.

    ``params`` is optional and appended last so existing positional callers
    keep working; it is forwarded to httpx as the query string.
    """
    return httpx.get(
        url,
        params=params,
        cookies=cookies or {},
        headers={"User-Agent": _USER_AGENT},
        timeout=timeout,
        follow_redirects=True,
    )


def _build_result(
    sub: str,
    data: dict,
    known_subs: set[str] | None,
    promo_allowed: int | None = None,
    flair_required: bool = False,
    available_flairs: list[str] | None = None,
    notes: str | None = None,
) -> dict[str, Any]:
    """
    Build the analysis dict shared by search results and full analyses.

    ``data`` is the sub's "about"-style payload; missing or empty fields fall
    back to 0 subscribers, the sub name as title, and an empty description.
    """
    description = (data.get("public_description") or data.get("description") or "").strip()
    return {
        "sub": sub,
        "title": data.get("title") or sub,
        "subscribers": data.get("subscribers") or 0,
        "description": description[:280],
        "promo_allowed": promo_allowed,
        "flair_required": flair_required,
        "available_flairs": available_flairs if available_flairs is not None else [],
        "rule_warning": False,
        "notes": notes,
        "already_tracked": (sub.lower() in known_subs) if known_subs is not None else False,
    }


def _classify_rules(rules: list[dict]) -> tuple[int | None, bool, str | None]:
    """
    Classify a sub's promo policy from its posting rules.

    Each rule's ``short_name`` and ``description`` are joined and matched
    against the ban patterns first, then (only if no ban matched) against the
    conditional patterns. Any rule mentioning the word "flair" marks flair as
    required.

    Returns (promo_allowed, flair_required, notes).
    promo_allowed: 0 = banned, 1 = allowed (never set — hard to detect positively), None = unknown
    notes: "; "-joined "[ban] ..." / "[cond] ..." entries, or None when nothing matched.
    """
    banned = False
    flair_required = False
    notes_parts: list[str] = []

    for rule in rules or []:
        # A key that is present with a null value makes .get(key, "") return
        # None, so normalise explicitly before formatting or slicing.
        name = rule.get("short_name") or ""
        desc = rule.get("description") or ""
        text = f"{name} {desc}"

        if any(p.search(text) for p in _BAN_RE):
            # Track the ban with a flag rather than the rule title: a matching
            # rule with an empty title must still count as a ban.
            banned = True
            notes_parts.append(f"[ban] {name}: {desc[:120]}")
        elif any(p.search(text) for p in _COND_RE):
            # Conditional promo keeps promo_allowed unknown; the note explains.
            notes_parts.append(f"[cond] {name}: {desc[:120]}")

        if _FLAIR_RE.search(text):
            flair_required = True

    # "Allowed" is never asserted positively, so the only outcomes are 0 / None.
    promo_allowed = 0 if banned else None
    notes = "; ".join(notes_parts) if notes_parts else None
    return promo_allowed, flair_required, notes


def _fetch_flairs(sub: str, cookies: dict | None) -> list[str]:
    """Fetch available link flairs (requires auth; returns [] if unavailable)."""
    try:
        r = _get(f"{_BASE}/r/{sub}/api/link_flair_v2.json", cookies=cookies)
        if r.status_code != 200:
            return []
        payload = r.json()
    except Exception:
        # Best-effort lookup: flairs are optional, so never fail the analysis,
        # but leave a trace for debugging instead of swallowing silently.
        logger.debug("Could not fetch link flairs for r/%s", sub, exc_info=True)
        return []

    # Anything other than a JSON list is treated as "no flairs".
    if not isinstance(payload, list):
        return []
    return [f["text"] for f in payload if isinstance(f, dict) and f.get("text")]


def analyze_sub(
    sub: str,
    cookies: dict | None = None,
    known_subs: set[str] | None = None,
) -> dict[str, Any] | None:
    """
    Fetch about + rules for a single sub and return an analysis dict.

    Flairs are only fetched when the rules mention flair. ``known_subs`` is
    expected to hold lower-cased sub names; it drives ``already_tracked``.

    Returns None if the sub doesn't exist or is inaccessible (non-200 about
    page, private/restricted/employee-only type, or any error while fetching).
    """
    try:
        about_r = _get(f"{_BASE}/r/{sub}/about.json", cookies=cookies)
        if about_r.status_code != 200:
            return None
        about = about_r.json().get("data") or {}

        if about.get("subreddit_type") in _INACCESSIBLE_TYPES:
            return None

        # A failed rules request is not fatal: classify with no rules instead.
        rules_r = _get(f"{_BASE}/r/{sub}/about/rules.json", cookies=cookies)
        rules = (rules_r.json().get("rules") or []) if rules_r.status_code == 200 else []

        promo_allowed, flair_required, notes = _classify_rules(rules)
        available_flairs = _fetch_flairs(sub, cookies) if flair_required else []

        return _build_result(
            sub,
            about,
            known_subs,
            promo_allowed=promo_allowed,
            flair_required=flair_required,
            available_flairs=available_flairs,
            notes=notes,
        )
    except Exception:
        logger.exception("Error analyzing r/%s", sub)
        return None


def search_subs(
    keyword: str,
    limit: int = 20,
    cookies: dict | None = None,
    known_subs: set[str] | None = None,
) -> list[dict[str, Any]]:
    """
    Search subreddits by keyword and return a light summary of each result.

    Only the search response is used — no per-sub requests are made — so
    ``promo_allowed`` is None and the flair/notes fields are empty until the
    sub is analyzed. At most 50 results are requested from Reddit.

    Returns a list of analysis dicts sorted by subscriber count (desc),
    truncated to ``limit``; [] on a non-positive limit, a non-200 response,
    or any error.
    """
    if limit <= 0:
        # A negative limit would otherwise slice items off the end of the list.
        return []

    try:
        r = _get(
            f"{_BASE}/subreddits/search.json",
            cookies=cookies,
            params={"q": keyword, "limit": min(limit, 50), "sort": "relevance"},
        )
        if r.status_code != 200:
            logger.warning("Subreddit search returned %d for %r", r.status_code, keyword)
            return []
        children = r.json().get("data", {}).get("children", []) or []
    except Exception:
        logger.exception("Error searching subreddits for %r", keyword)
        return []

    results: list[dict[str, Any]] = []
    for child in children:
        data = child.get("data", {})
        sub_name = data.get("display_name")
        if not sub_name:
            continue
        if data.get("subreddit_type") in _INACCESSIBLE_TYPES:
            continue
        # Light analysis from search result data (avoids N per-sub about requests);
        # promo policy stays unknown until rules are fetched.
        results.append(_build_result(sub_name, data, known_subs))

    # Sort by subscribers descending
    results.sort(key=lambda x: x["subscribers"], reverse=True)
    return results[:limit]


def search_and_analyze(
    keyword: str,
    limit: int = 15,
    cookies: dict | None = None,
    known_subs: set[str] | None = None,
) -> list[dict[str, Any]]:
    """
    Search subreddits by keyword, then fetch full rules for each result.

    This is the main entry point for the discovery endpoint.
    Runs sequentially — limit to 15 to keep latency reasonable.

    Subs that cannot be analyzed are skipped. The result is sorted by
    subscriber count (desc).
    """
    candidates = search_subs(keyword, limit=limit, cookies=cookies, known_subs=known_subs)

    analyzed: list[dict[str, Any]] = []
    for candidate in candidates:
        result = analyze_sub(candidate["sub"], cookies=cookies, known_subs=known_subs)
        if result is not None:  # inaccessible subs are skipped silently
            analyzed.append(result)

    analyzed.sort(key=lambda x: x["subscribers"], reverse=True)
    return analyzed