magpie/app/services/reddit/discovery.py
Alan Weinstock 35c6e5f7bc feat(discovery): add Lemmy community search, fix dead request, add platform field
- Add app/services/lemmy/discovery.py: searches 5 major Lemmy instances,
  deduplicates by actor_id (AP canonical URL), skips NSFW communities,
  uses community@instance naming convention matching existing Lemmy client
- Update POST /subs/discover: accepts platforms[] param (default both),
  fans out to Reddit + Lemmy search, merges and sorts by subscribers
- Add platform field to all discovery result dicts (Reddit and Lemmy)
- Fix: remove dead _get() call left in search_subs() during earlier refactor
- Frontend: show platform badge on each discovery row, correct hyperlink
  format for Lemmy (https://{instance}/c/{community}), pass r.platform
  to upsertRules on import so Lemmy subs land in the lemmy platform slot
2026-06-13 22:23:31 -07:00

232 lines
8.1 KiB
Python

"""
Subreddit discovery and rule analysis.
Searches Reddit for relevant communities by keyword, fetches each sub's
about page and posting rules, and classifies promo policy automatically.
Results are returned for user review — nothing is stored until the user
explicitly imports a sub via PUT /subs/{sub}.
"""
from __future__ import annotations
import logging
import re
from typing import Any
import httpx
logger = logging.getLogger(__name__)

# Browser-like User-Agent header value sent with requests to Reddit.
_USER_AGENT = "Mozilla/5.0 (X11; Linux x86_64) Chrome/124.0.0.0"
# Root URL that every endpoint path in this module is appended to.
_BASE = "https://www.reddit.com"

# Keyword patterns for promo classification (applied to rule title + rule body text).
#
# Wherever "promo" is directly followed by a closing \b, the pattern allows a
# \w* suffix first.  Without it the word boundary fails inside "promotion" /
# "promoting", so the most common phrasing ("No self-promotion") never matched.

# A match on any of these means the rule bans promotional posts outright.
_BAN_PATTERNS = [
    r"\bno\b.{0,20}\b(self.?promo\w*|advertising|promotional|affiliate|soliciting|spam)\b",
    r"\b(self.?promo\w*|advertising|promotional)\b.{0,20}\bnot allowed\b",
    r"\bdo not\b.{0,20}\b(post|share|submit)\b.{0,20}\b(your|own)\b.{0,20}\b(blog|site|product|service)",
    r"\bno\b.{0,20}\bself.?serving\b",
    r"\bcommercial\b.{0,10}\bcontent\b.{0,10}\bprohibited\b",
]

# A match on any of these means promotion is permitted only under conditions
# (posting ratio, dedicated threads, moderator approval, ...).
_COND_PATTERNS = [
    r"\b(9|10)\s*[:/]\s*1\b",  # 9:1 / 10:1 rule
    r"\blimited\b.{0,20}\bself.?promo\w*\b",
    r"\bself.?promo.{0,40}\ballow(ed)?\b.{0,20}\b(friday|thread|megathread|weekly|monthly)\b",
    r"\bonly.{0,20}\b(friday|weekly|monthly)\b.{0,30}\bpromo\w*\b",
    r"\bself.?promotion\b.{0,40}\bonly\b",
    r"\bpromotion.{0,40}\bmoderat",
]

# Compiled once at import time.  re.S lets the bounded ".{0,N}" gaps span line
# breaks inside multi-line rule descriptions.
_BAN_RE = [re.compile(p, re.I | re.S) for p in _BAN_PATTERNS]
_COND_RE = [re.compile(p, re.I | re.S) for p in _COND_PATTERNS]

# Any mention of the word "flair" in a rule is treated as a flair requirement.
_FLAIR_RE = re.compile(r"\bflair\b", re.I)
def _get(url: str, cookies: dict | None = None, timeout: int = 10) -> httpx.Response:
    """
    Send a GET request to ``url`` using the module-wide User-Agent header.

    A missing (or otherwise falsy) ``cookies`` argument is replaced by an
    empty dict, ``timeout`` is handed to httpx unchanged, and redirects are
    followed.  The httpx response is returned as-is, whatever its status.
    """
    request_options = {
        "cookies": cookies if cookies else {},
        "headers": {"User-Agent": _USER_AGENT},
        "timeout": timeout,
        "follow_redirects": True,
    }
    return httpx.get(url, **request_options)
def _classify_rules(rules: list[dict]) -> tuple[int | None, bool, str | None]:
    """
    Classify a subreddit's posting rules for promo policy and flair needs.

    Each rule's ``short_name`` and ``description`` are joined into one text
    and tested against the ban patterns first, then the conditional patterns.

    Returns (promo_allowed, flair_required, notes).
    promo_allowed: 0 = banned, None = unknown.  1 (allowed) is never set
    because it is hard to detect positively; rules that only allow promotion
    conditionally also stay None and are explained through ``notes``.
    flair_required: True if any rule mentions flair.
    notes: "; "-joined "[ban] ..." / "[cond] ..." entries, or None if no
    rule matched.
    """
    banned = False
    flair_required = False
    notes_parts: list[str] = []
    for rule in rules:
        # ``or ""`` also covers keys that are present but null, which would
        # otherwise break the slice below and put "None" into the text.
        name = rule.get("short_name") or ""
        body = rule.get("description") or ""
        text = f"{name} {body}"
        if any(p.search(text) for p in _BAN_RE):
            # Track the ban with its own flag: deriving it from the rule's
            # title loses the ban whenever that title is empty.
            banned = True
            notes_parts.append(f"[ban] {name}: {body[:120]}")
        elif any(p.search(text) for p in _COND_RE):
            notes_parts.append(f"[cond] {name}: {body[:120]}")
        if _FLAIR_RE.search(text):
            flair_required = True
    # Without a ban the policy stays unknown; "allowed" is never asserted.
    promo_allowed = 0 if banned else None
    notes = "; ".join(notes_parts) if notes_parts else None
    return promo_allowed, flair_required, notes
def _fetch_flairs(sub: str, cookies: dict | None) -> list[str]:
    """
    Fetch available link flairs (requires auth; returns [] if unavailable).

    This is a best-effort lookup: a non-200 status, a request or JSON
    decoding failure, or an unexpected payload shape all yield an empty list
    rather than an exception.  Only flairs with non-empty text are returned.
    """
    try:
        r = _get(f"{_BASE}/r/{sub}/api/link_flair_v2.json", cookies=cookies)
        if r.status_code != 200:
            return []
        payload = r.json()
    except Exception:
        # Stay best-effort, but leave a trace instead of discarding the error.
        logger.debug("Could not fetch link flairs for r/%s", sub, exc_info=True)
        return []
    # The loop below expects a list of flair objects; any other shape (for
    # example an error object) means there is nothing usable.
    if not isinstance(payload, list):
        return []
    return [f["text"] for f in payload if isinstance(f, dict) and f.get("text")]
def analyze_sub(
    sub: str,
    cookies: dict | None = None,
    known_subs: set[str] | None = None,
) -> dict[str, Any] | None:
    """
    Build the analysis dict for one subreddit from its about and rules pages.

    Requests ``about.json`` and ``about/rules.json``, classifies the rules,
    and looks up link flairs only when the rules mention flair.  Returns
    None when the about request is not a 200, when the sub's type is
    private / restricted / employee_only, or when any error occurs (the
    error is logged with its traceback).
    """
    hidden_types = ("private", "restricted", "employee_only")
    try:
        about_resp = _get(f"{_BASE}/r/{sub}/about.json", cookies=cookies)
        if about_resp.status_code != 200:
            return None

        info = about_resp.json().get("data", {})
        if info.get("subreddit_type") in hidden_types:
            return None

        # A failed rules request is not fatal: classify an empty rule list.
        rules_resp = _get(f"{_BASE}/r/{sub}/about/rules.json", cookies=cookies)
        if rules_resp.status_code == 200:
            rule_list = rules_resp.json().get("rules", [])
        else:
            rule_list = []

        promo_allowed, flair_required, notes = _classify_rules(rule_list)

        # Skip the extra flair request unless a rule mentioned flair.
        if flair_required:
            flairs = _fetch_flairs(sub, cookies)
        else:
            flairs = []

        member_count = info.get("subscribers") or 0
        display_title = info.get("title") or sub
        blurb = (info.get("public_description") or info.get("description") or "").strip()
        tracked = False if known_subs is None else (sub.lower() in known_subs)

        return {
            "sub": sub,
            "title": display_title,
            "subscribers": member_count,
            "description": blurb[:280],
            "promo_allowed": promo_allowed,
            "flair_required": flair_required,
            "available_flairs": flairs,
            "rule_warning": False,
            "notes": notes,
            "already_tracked": tracked,
            "platform": "reddit",
        }
    except Exception:
        logger.exception("Error analyzing r/%s", sub)
        return None
def search_subs(
    keyword: str,
    limit: int = 20,
    cookies: dict | None = None,
    known_subs: set[str] | None = None,
) -> list[dict[str, Any]]:
    """
    Search subreddits by keyword and return lightweight result dicts.

    Only the data embedded in the search response is used; no per-sub about
    or rules requests are made, so ``promo_allowed`` is always None,
    ``flair_required`` is always False and ``notes`` is always None here.
    Subs typed private / restricted / employee_only and entries without a
    display name are skipped.

    At most ``min(limit, 50)`` results are requested from Reddit.  Returns a
    list of dicts sorted by subscriber count (desc) and cut to ``limit``, or
    [] when the search request fails or does not return a 200.
    """
    try:
        r = httpx.get(
            f"{_BASE}/subreddits/search.json",
            params={"q": keyword, "limit": min(limit, 50), "sort": "relevance"},
            cookies=cookies or {},
            headers={"User-Agent": _USER_AGENT},
            timeout=10,
            follow_redirects=True,
        )
        if r.status_code != 200:
            logger.warning("Subreddit search returned %d for %r", r.status_code, keyword)
            return []
        children = r.json().get("data", {}).get("children", [])
    except Exception:
        logger.exception("Error searching subreddits for %r", keyword)
        return []

    results: list[dict] = []
    for child in children:
        # ``or {}`` also covers an entry whose "data" is present but null.
        data = child.get("data") or {}
        sub_name = data.get("display_name")
        if not sub_name:
            continue
        if data.get("subreddit_type") in ("private", "restricted", "employee_only"):
            continue
        # Light analysis from search result data (avoids N per-sub about requests).
        subscribers = data.get("subscribers") or 0
        title = data.get("title") or sub_name
        description = (data.get("public_description") or data.get("description") or "").strip()
        results.append({
            "sub": sub_name,
            "title": title,
            "subscribers": subscribers,
            "description": description[:280],
            "promo_allowed": None,  # unknown until rules are fetched
            "flair_required": False,
            "available_flairs": [],
            "rule_warning": False,
            "notes": None,
            "already_tracked": (sub_name.lower() in known_subs) if known_subs is not None else False,
            "platform": "reddit",
        })

    # Sort by subscribers descending
    results.sort(key=lambda x: x["subscribers"], reverse=True)
    return results[:limit]
def search_and_analyze(
    keyword: str,
    limit: int = 15,
    cookies: dict | None = None,
    known_subs: set[str] | None = None,
) -> list[dict[str, Any]]:
    """
    Main entry point for the discovery endpoint.

    Runs a keyword search, then performs the full about + rules analysis on
    every hit, one sub after another (hence the small default ``limit`` of
    15, which keeps latency reasonable).  Subs whose analysis comes back as
    None are dropped without comment.  The surviving analyses are returned
    ordered by subscriber count, largest first.
    """
    hits = search_subs(keyword, limit=limit, cookies=cookies, known_subs=known_subs)
    # Lazily analyze each hit in search order; None marks an inaccessible sub.
    outcomes = (
        analyze_sub(hit["sub"], cookies=cookies, known_subs=known_subs)
        for hit in hits
    )
    accessible = [outcome for outcome in outcomes if outcome is not None]
    return sorted(accessible, key=lambda entry: entry["subscribers"], reverse=True)