Compare commits

..

No commits in common. "8200c154c1f05cae2d8ab5261d98c82ee7e20e33" and "80718e206cd7453e5b564288a39429b94f34c26c" have entirely different histories.

41 changed files with 152 additions and 1800 deletions

View file

@ -1,63 +0,0 @@
from __future__ import annotations
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
from app.services.directus import get_blog_post, publish_blog_post, update_blog_post
router = APIRouter(prefix="/blog", tags=["blog"])
class PublishRequest(BaseModel):
title: str
body: str
slug: str | None = None
tags: list[str] | None = None
author: str | None = None
seo_description: str | None = None
published_at: str | None = None # ISO 8601; defaults to now
class UpdateRequest(BaseModel):
title: str | None = None
body: str | None = None
tags: list[str] | None = None
seo_description: str | None = None
published_at: str | None = None
@router.post("", summary="Publish a blog post to the CircuitForge website via Directus")
def publish(req: PublishRequest) -> dict:
try:
item = publish_blog_post(
title=req.title,
body=req.body,
slug=req.slug,
tags=req.tags,
author=req.author,
seo_description=req.seo_description,
published_at=req.published_at,
)
return {"ok": True, "post": item}
except Exception as e:
raise HTTPException(status_code=502, detail=str(e))
@router.get("/{slug}", summary="Fetch a blog post by slug")
def get_post(slug: str) -> dict:
post = get_blog_post(slug)
if post is None:
raise HTTPException(status_code=404, detail=f"No blog post with slug '{slug}'")
return post
@router.patch("/{post_id}", summary="Update an existing blog post by Directus ID")
def update(post_id: int, req: UpdateRequest) -> dict:
fields = req.model_dump(exclude_none=True)
if not fields:
raise HTTPException(status_code=400, detail="No fields to update")
try:
item = update_blog_post(post_id, fields)
return {"ok": True, "post": item}
except Exception as e:
raise HTTPException(status_code=502, detail=str(e))

View file

@ -57,10 +57,6 @@ class DismissBody(BaseModel):
note: str | None = None
class MarkPostedBody(BaseModel):
url: str | None = None
# ------------------------------------------------------------------ #
# Routes
# ------------------------------------------------------------------ #
@ -133,29 +129,14 @@ async def approve_opportunity(opportunity_id: int):
@router.post("/{opportunity_id}/mark-posted")
async def mark_posted(opportunity_id: int, body: MarkPostedBody = MarkPostedBody(), manual: bool = False):
"""Record that a post was successfully made (auto or manual).
When manual=True, also writes a row to the posts table for history tracking."""
opp = await asyncio.to_thread(_in_thread, lambda s: s.get_opportunity(opportunity_id))
if opp is None:
raise HTTPException(404, "Opportunity not found")
async def mark_posted(opportunity_id: int, manual: bool = False):
"""Record that a post was successfully made (auto or manual)."""
status = "manual_posted" if manual else "posted"
result = await asyncio.to_thread(
_in_thread, lambda s: s.update_opportunity(opportunity_id, status=status)
)
if manual:
await asyncio.to_thread(
_in_thread,
lambda s: s.log_manual_post(
opportunity_id=opportunity_id,
platform=opp["platform"],
target=opp["community"],
url=body.url,
),
)
if result is None:
raise HTTPException(404, "Opportunity not found")
return result

View file

@ -1,93 +0,0 @@
"""Reddit session management endpoints."""
from __future__ import annotations
import subprocess
import sys
import time
from pathlib import Path
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
from app.services.reddit.session import ensure_valid_session, refresh_session, session_is_valid
router = APIRouter(tags=["reddit"])
BRIDGE_SESSION = Path("/Library/Development/CircuitForge/claude-bridge/reddit-poster/session.json")
BRIDGE_POST_SCRIPT = Path("/Library/Development/CircuitForge/claude-bridge/reddit-poster/post.py")
class SessionStatusResponse(BaseModel):
target: str
valid: bool
age_hours: float | None
session_file: str
class RefreshResponse(BaseModel):
target: str
ok: bool
message: str
def _session_age_hours(path: Path) -> float | None:
if not path.exists():
return None
return round((time.time() - path.stat().st_mtime) / 3600, 1)
@router.get("/reddit/session-status")
def session_status(target: str = "magpie") -> SessionStatusResponse:
"""Return session validity and age for magpie or bridge session."""
if target == "bridge":
valid = BRIDGE_SESSION.exists() and _session_age_hours(BRIDGE_SESSION) < 12
return SessionStatusResponse(
target="bridge",
valid=valid,
age_hours=_session_age_hours(BRIDGE_SESSION),
session_file=str(BRIDGE_SESSION),
)
# magpie session
from app.core.config import get_settings
session_file = Path(get_settings().reddit_session_file)
return SessionStatusResponse(
target="magpie",
valid=session_is_valid(session_file),
age_hours=_session_age_hours(session_file),
session_file=str(session_file),
)
@router.post("/reddit/refresh-session")
def refresh_reddit_session(target: str = "magpie") -> RefreshResponse:
"""
Re-establish the Playwright Reddit session.
target: "magpie" (default) refreshes Magpie's session.
"bridge" refreshes claude-bridge/reddit-poster/session.json.
"""
if target == "bridge":
if not BRIDGE_POST_SCRIPT.exists():
raise HTTPException(status_code=404, detail="claude-bridge post.py not found")
result = subprocess.run(
["xvfb-run", "--auto-servernum", sys.executable, str(BRIDGE_POST_SCRIPT), "--login"],
cwd=str(BRIDGE_POST_SCRIPT.parent),
capture_output=True,
text=True,
timeout=120,
)
if result.returncode != 0:
raise HTTPException(
status_code=500,
detail=f"Bridge session refresh failed: {result.stderr.strip()}",
)
return RefreshResponse(target="bridge", ok=True, message="Bridge session refreshed.")
# magpie session
try:
from app.core.config import get_settings
session_file = Path(get_settings().reddit_session_file)
refresh_session(session_file)
return RefreshResponse(target="magpie", ok=True, message="Magpie session refreshed.")
except Exception as exc:
raise HTTPException(status_code=500, detail=str(exc))

View file

@ -25,7 +25,6 @@ class SubRulesUpsert(BaseModel):
promo_allowed: bool | None = None # None = unknown
rule_warning: bool = False
notes: str | None = None
post_url: str | None = None # override link for Copy & Post (e.g. megathread)
@router.get("")

View file

@ -1,6 +1,6 @@
from fastapi import FastAPI
from app.api.endpoints import blog, campaigns, opportunities, posts, reddit, scheduler, signals, subs
from app.api.endpoints import campaigns, opportunities, posts, scheduler, signals, subs
def register_routes(app: FastAPI) -> None:
@ -10,5 +10,3 @@ def register_routes(app: FastAPI) -> None:
app.include_router(scheduler.router, prefix="/api/v1")
app.include_router(opportunities.router, prefix="/api/v1")
app.include_router(signals.router, prefix="/api/v1")
app.include_router(blog.router, prefix="/api/v1")
app.include_router(reddit.router, prefix="/api/v1")

View file

@ -21,13 +21,6 @@ class Settings(BaseSettings):
# Scheduler
scheduler_enabled: bool = True
# Directus (CircuitForge website CMS)
directus_url: str = "http://172.31.0.4:8055"
directus_admin_token: str = ""
directus_admin_email: str = ""
directus_admin_password: str = ""
directus_network: str = "website_cf-internal"
# Signal scraper
scraper_enabled: bool = True
scraper_interval_mins: int = 30 # how often to poll (per full pass of all subs)

View file

@ -1,4 +0,0 @@
-- Add optional post_url to sub_rules.
-- When set, the Copy & Post modal links here instead of /r/{sub}/submit.
-- Use for megathreads, weekly threads, or any pinned destination.
ALTER TABLE sub_rules ADD COLUMN post_url TEXT;

View file

@ -1,4 +0,0 @@
-- Link posts to opportunities for manual/signal-driven posts.
-- NULL = campaign-scheduled post (existing behaviour).
ALTER TABLE posts ADD COLUMN opportunity_id INTEGER REFERENCES opportunities(id) ON DELETE SET NULL;
CREATE INDEX IF NOT EXISTS idx_posts_opportunity ON posts(opportunity_id);

View file

@ -1,2 +0,0 @@
-- app/db/migrations/015_campaign_type.sql
ALTER TABLE campaigns ADD COLUMN type TEXT NOT NULL DEFAULT 'reddit_post';

View file

@ -1,15 +0,0 @@
-- app/db/migrations/016_campaign_subs_comment_config.sql
-- thread_title_pattern: case-insensitive substring to match the sticky thread title.
-- Required for reddit_comment campaigns; NULL for reddit_post campaigns.
ALTER TABLE campaign_subs ADD COLUMN thread_title_pattern TEXT;
-- thread_url_override: skip thread detection and comment directly on this URL.
-- Takes precedence over thread_title_pattern when set.
ALTER TABLE campaign_subs ADD COLUMN thread_url_override TEXT;
-- occurrence: scheduling modifier applied inside the job runner before execution.
-- "every" — fire on every scheduled run (default)
-- "first_sunday" — fire only on the first Sunday of the month
-- NULL — treated as "every"
ALTER TABLE campaign_subs ADD COLUMN occurrence TEXT;

View file

@ -1,28 +0,0 @@
-- Make campaign_id nullable on posts to support manual opportunity posts
-- that don't belong to a campaign. SQLite requires a full table rebuild
-- to drop a NOT NULL constraint.
CREATE TABLE posts_new (
id INTEGER PRIMARY KEY AUTOINCREMENT,
campaign_id INTEGER REFERENCES campaigns(id),
variant_id INTEGER REFERENCES campaign_variants(id),
opportunity_id INTEGER REFERENCES opportunities(id) ON DELETE SET NULL,
platform TEXT NOT NULL DEFAULT 'reddit',
target TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'pending',
url TEXT,
error_msg TEXT,
screenshot_path TEXT,
triggered_by TEXT DEFAULT 'scheduler',
posted_at TEXT NOT NULL DEFAULT (datetime('now'))
);
INSERT INTO posts_new SELECT id, campaign_id, variant_id, opportunity_id, platform, target, status, url, error_msg, screenshot_path, triggered_by, posted_at FROM posts;
DROP TABLE posts;
ALTER TABLE posts_new RENAME TO posts;
CREATE INDEX IF NOT EXISTS idx_posts_campaign ON posts(campaign_id);
CREATE INDEX IF NOT EXISTS idx_posts_opportunity ON posts(opportunity_id);
CREATE INDEX IF NOT EXISTS idx_posts_target ON posts(target);
CREATE INDEX IF NOT EXISTS idx_posts_status ON posts(status);
CREATE INDEX IF NOT EXISTS idx_posts_posted_at ON posts(posted_at);

View file

@ -1,2 +0,0 @@
-- Add per-sub post cap; NULL = unlimited (evergreen). Set max_posts=1 for one-shot intro campaigns.
ALTER TABLE campaign_subs ADD COLUMN max_posts INTEGER;

View file

@ -90,32 +90,10 @@ class Store:
return self._fetchone("SELECT * FROM campaigns WHERE id = ?", (campaign_id,))
def create_campaign(self, name: str, product: str, platform: str = "reddit",
cron_schedule: str | None = None, notes: str | None = None,
type: str = "reddit_post") -> dict:
cron_schedule: str | None = None, notes: str | None = None) -> dict:
return self._insert_returning(
"INSERT INTO campaigns (name, product, platform, cron_schedule, notes, type) VALUES (?,?,?,?,?,?) RETURNING *",
(name, product, platform, cron_schedule, notes, type),
)
def get_or_create_campaign(
self,
name: str,
product: str,
platform: str = "reddit",
type: str = "reddit_post",
cron_schedule: str | None = None,
) -> dict:
existing = self._fetchone(
"SELECT * FROM campaigns WHERE name = ? AND product = ?", (name, product)
)
if existing:
return dict(existing)
return self.create_campaign(
name=name,
product=product,
platform=platform,
type=type,
cron_schedule=cron_schedule,
"INSERT INTO campaigns (name, product, platform, cron_schedule, notes) VALUES (?,?,?,?,?) RETURNING *",
(name, product, platform, cron_schedule, notes),
)
def update_campaign(self, campaign_id: int, **fields) -> dict | None:
@ -173,31 +151,6 @@ class Store:
(campaign_id, sub_pattern, title, body, flair, notes),
)
def upsert_variant(
self,
campaign_id: int,
sub_pattern: str,
title: str,
body: str,
flair: str | None = None,
) -> dict:
existing = self._fetchone(
"SELECT * FROM campaign_variants WHERE campaign_id = ? AND sub_pattern = ?",
(campaign_id, sub_pattern),
)
if existing:
self.conn.execute(
"UPDATE campaign_variants SET title=?, body=?, flair=? WHERE id=?",
(title, body, flair, existing["id"]),
)
self.conn.commit()
return self._fetchone("SELECT * FROM campaign_variants WHERE id=?", (existing["id"],))
return self._insert_returning(
"INSERT INTO campaign_variants (campaign_id, sub_pattern, title, body, flair)"
" VALUES (?,?,?,?,?) RETURNING *",
(campaign_id, sub_pattern, title, body, flair),
)
def update_variant(self, variant_id: int, **fields) -> dict | None:
allowed = {"sub_pattern", "title", "body", "flair", "notes"}
updates = {k: v for k, v in fields.items() if k in allowed}
@ -233,32 +186,6 @@ class Store:
(campaign_id, sub, sort_order),
)
def upsert_campaign_sub(
self,
campaign_id: int,
sub: str,
sort_order: int = 0,
thread_title_pattern: str | None = None,
thread_url_override: str | None = None,
occurrence: str | None = None,
) -> dict:
self.conn.execute(
"""INSERT INTO campaign_subs
(campaign_id, sub, sort_order, thread_title_pattern, thread_url_override, occurrence)
VALUES (?, ?, ?, ?, ?, ?)
ON CONFLICT(campaign_id, sub) DO UPDATE SET
sort_order = excluded.sort_order,
thread_title_pattern = excluded.thread_title_pattern,
thread_url_override = excluded.thread_url_override,
occurrence = excluded.occurrence""",
(campaign_id, sub, sort_order, thread_title_pattern, thread_url_override, occurrence),
)
self.conn.commit()
return self._fetchone(
"SELECT * FROM campaign_subs WHERE campaign_id = ? AND sub = ?",
(campaign_id, sub),
)
def remove_campaign_sub(self, campaign_id: int, sub: str) -> bool:
cur = self.conn.execute(
"DELETE FROM campaign_subs WHERE campaign_id = ? AND sub = ?", (campaign_id, sub)
@ -286,19 +213,10 @@ class Store:
)
def create_post(self, campaign_id: int, target: str, variant_id: int | None = None,
platform: str = "reddit", triggered_by: str = "scheduler",
opportunity_id: int | None = None) -> dict:
platform: str = "reddit", triggered_by: str = "scheduler") -> dict:
return self._insert_returning(
"INSERT INTO posts (campaign_id, variant_id, platform, target, status, triggered_by, opportunity_id) VALUES (?,?,?,?,'pending',?,?) RETURNING *",
(campaign_id, variant_id, platform, target, triggered_by, opportunity_id),
)
def log_manual_post(self, opportunity_id: int, platform: str, target: str,
url: str | None = None) -> dict:
"""Create a success post record for a manually executed opportunity post."""
return self._insert_returning(
"INSERT INTO posts (campaign_id, opportunity_id, platform, target, status, triggered_by, url) VALUES (NULL,?,?,?,'success','manual',?) RETURNING *",
(opportunity_id, platform, target, url),
"INSERT INTO posts (campaign_id, variant_id, platform, target, status, triggered_by) VALUES (?,?,?,?,'pending',?) RETURNING *",
(campaign_id, variant_id, platform, target, triggered_by),
)
def update_post_status(self, post_id: int, status: str, url: str | None = None,
@ -319,14 +237,6 @@ class Store:
)
return row is not None
def successful_post_count(self, campaign_id: int, target: str) -> int:
"""Return lifetime count of successful posts for this campaign+target pair."""
row = self._fetchone(
"SELECT COUNT(*) AS n FROM posts WHERE campaign_id = ? AND target = ? AND status = 'success'",
(campaign_id, target),
)
return row["n"] if row else 0
# ------------------------------------------------------------------ #
# Sub rules
# ------------------------------------------------------------------ #

View file

@ -1,148 +0,0 @@
"""
Directus blog post publisher for the CircuitForge website CMS.
Directus runs in Docker on the website_cf-internal network and is not
directly reachable from host processes. We shell out to a one-shot
curlimages/curl container joined to that network.
Collection: blog_posts
Fields: id, title, slug, body, published_at, tags, author, seo_description
Environment variables (via Magpie .env):
DIRECTUS_URL Base URL inside the cf-internal network
(default: http://172.31.0.4:8055)
DIRECTUS_ADMIN_TOKEN Static admin token
DIRECTUS_ADMIN_EMAIL Admin email (for fresh JWT fallback)
DIRECTUS_ADMIN_PASSWORD
DIRECTUS_NETWORK Docker network name
(default: website_cf-internal)
IP gotcha: 172.31.0.4 is the current cf-directus address on website_cf-internal.
If calls start returning connection errors run:
docker inspect cf-directus --format '{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}'
and update DIRECTUS_URL in Magpie's .env.
"""
from __future__ import annotations
import json
import re
import subprocess
from datetime import datetime, timezone
from typing import Any
from app.core.config import get_settings
_CURL_IMAGE = "curlimages/curl:latest"
def _curl(method: str, path: str, token: str, body: dict[str, Any] | None = None) -> dict:
"""Run a curl request inside a container on the cf-internal network."""
cfg = get_settings()
url = f"{cfg.directus_url}{path}"
cmd = [
"docker", "run", "--rm",
"--network", cfg.directus_network,
_CURL_IMAGE,
"-sf", "-X", method, url,
"-H", f"Authorization: Bearer {token}",
"-H", "Content-Type: application/json",
]
if body is not None:
cmd += ["--data", json.dumps(body)]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
if result.returncode != 0:
raise RuntimeError(f"Directus {method} {path} failed: {result.stderr.strip()}")
if not result.stdout.strip():
return {}
return json.loads(result.stdout)
def _get_token() -> str:
"""Return a usable token: static admin token, or fresh JWT via login."""
cfg = get_settings()
if cfg.directus_admin_token:
return cfg.directus_admin_token
if not (cfg.directus_admin_email and cfg.directus_admin_password):
raise RuntimeError(
"No Directus credentials configured. "
"Set DIRECTUS_ADMIN_TOKEN or DIRECTUS_ADMIN_EMAIL + DIRECTUS_ADMIN_PASSWORD."
)
resp = _curl("POST", "/auth/login", token="", body={
"email": cfg.directus_admin_email,
"password": cfg.directus_admin_password,
})
access_token = resp.get("data", {}).get("access_token")
if not access_token:
raise RuntimeError(f"Directus login failed: {resp}")
return access_token
def slugify(text: str) -> str:
slug = text.lower().strip()
slug = re.sub(r"[^\w\s-]", "", slug)
slug = re.sub(r"[\s_]+", "-", slug)
slug = re.sub(r"-+", "-", slug).strip("-")
return slug
def publish_blog_post(
title: str,
body: str,
slug: str | None = None,
tags: list[str] | None = None,
author: str | None = None,
seo_description: str | None = None,
published_at: str | None = None,
) -> dict:
"""
Create and publish a blog post in Directus.
Returns the created item dict (id, slug, title, ...).
published_at defaults to now (UTC ISO 8601). Pass None or omit to publish
immediately. Pass a future timestamp to schedule.
"""
token = _get_token()
_slug = slug or slugify(title)
_published_at = published_at or datetime.now(timezone.utc).isoformat()
payload: dict[str, Any] = {
"title": title,
"slug": _slug,
"body": body,
"published_at": _published_at,
}
if tags:
payload["tags"] = tags
if author:
payload["author"] = author
if seo_description:
payload["seo_description"] = seo_description
resp = _curl("POST", "/items/blog_posts", token=token, body=payload)
item = resp.get("data", resp)
return item
def get_blog_post(slug: str) -> dict | None:
"""Fetch a blog post by slug. Returns None if not found."""
from urllib.parse import quote
token = _get_token()
# Directus filter syntax uses brackets which must be percent-encoded for curl CLI
filter_param = f"filter%5Bslug%5D%5B_eq%5D={quote(slug, safe='')}"
resp = _curl(
"GET",
f"/items/blog_posts?{filter_param}&limit=1",
token=token,
)
items = resp.get("data", [])
return items[0] if items else None
def update_blog_post(post_id: int, fields: dict[str, Any]) -> dict:
"""Patch an existing blog post by ID."""
token = _get_token()
resp = _curl("PATCH", f"/items/blog_posts/{post_id}", token=token, body=fields)
return resp.get("data", resp)

37
app/services/platforms.py Normal file
View file

@ -0,0 +1,37 @@
"""
Platform registry: maps platform names to their poster implementations.
Adding a new platform:
1. Create app/services/<platform>/client.py implementing PlatformClient
2. Register it here in REGISTRY
This keeps poster.py platform-agnostic it looks up the right client by
the campaign's `platform` field rather than branching on strings everywhere.
"""
from __future__ import annotations
from typing import Protocol
class PlatformClient(Protocol):
def post(self, target: str, title: str, body: str, flair: str | None = None) -> str:
"""Post content to a target (sub, group, channel). Returns a URL."""
...
def get_client(platform: str) -> PlatformClient:
"""Return an initialized client for the given platform name."""
if platform == "reddit":
from app.services.reddit.client import RedditClient
return RedditClient()
raise NotImplementedError(
f"Platform '{platform}' is not yet implemented. "
f"Add a client in app/services/{platform}/ and register it here."
)
# Platforms with posting support implemented
SUPPORTED_PLATFORMS = {"reddit"}
# Platforms planned but not yet implemented
PLANNED_PLATFORMS = {"facebook", "discord", "lemmy", "mastodon"}

View file

@ -1,29 +0,0 @@
from __future__ import annotations
from app.services.platforms.base import PostingStrategy, PostResult
from app.services.platforms.reddit_post import RedditPostStrategy
from app.services.platforms.reddit_comment import RedditCommentStrategy
_REGISTRY: dict[str, PostingStrategy] = {
s.campaign_type: s()
for s in [
RedditPostStrategy,
RedditCommentStrategy,
# BlogPostStrategy — added in Plan C
]
}
SUPPORTED_PLATFORMS: frozenset[str] = frozenset(_REGISTRY)
def get_client(campaign_type: str) -> PostingStrategy:
"""Return the strategy instance for the given campaign type.
Raises ValueError for unknown types.
"""
if campaign_type not in _REGISTRY:
raise ValueError(f"Unknown campaign type: {campaign_type!r}")
return _REGISTRY[campaign_type]
__all__ = ["get_client", "SUPPORTED_PLATFORMS", "PostingStrategy", "PostResult"]

View file

@ -1,37 +0,0 @@
from __future__ import annotations
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
@dataclass
class PostResult:
url: str
metadata: dict | None = field(default=None)
class PostingStrategy(ABC):
"""Base class for all posting output types.
Subclasses must set class attribute `campaign_type` matching the value
stored in campaigns.type.
"""
campaign_type: str # e.g. "reddit_post", "reddit_comment", "blog_post"
@abstractmethod
def execute(
self,
*,
target: str,
title: str,
body: str,
flair: str | None = None,
extra: dict | None = None,
) -> PostResult:
"""Execute the post. Raise on failure. Return PostResult on success."""
...
def supports_dupe_guard(self) -> bool:
"""Return False to skip the weekly-dupe-guard check for this type."""
return True

View file

@ -1,141 +0,0 @@
from __future__ import annotations
import logging
import re
from datetime import date, timedelta
import httpx
from app.services.platforms.base import PostingStrategy, PostResult
from app.services.reddit.client import RedditClient
from app.core.config import get_settings
logger = logging.getLogger(__name__)
# Weekday names → int (0=Mon, 6=Sun)
_WEEKDAY_MAP = {
"monday": 0, "tuesday": 1, "wednesday": 2, "thursday": 3,
"friday": 4, "saturday": 5, "sunday": 6,
}
# Ordinal names → n
_ORDINAL_MAP = {"first": 1, "second": 2, "third": 3}
def is_nth_weekday(dt: date, weekday: int, n: int) -> bool:
"""True if dt is the nth occurrence of weekday (0=Mon, 6=Sun) in its month."""
first_of_month = dt.replace(day=1)
days_until = (weekday - first_of_month.weekday()) % 7
first_occurrence = first_of_month + timedelta(days=days_until)
nth_occurrence = first_occurrence + timedelta(weeks=n - 1)
return dt == nth_occurrence
def parse_occurrence(occurrence: str | None) -> tuple[int, int] | None:
"""Parse an occurrence string into (weekday, n) or None for 'every'.
Supported: "first_sunday", "second_monday", "third_friday", etc.
Returns None for "every" or None input.
Raises ValueError for unrecognised patterns.
"""
if occurrence is None or occurrence == "every":
return None
parts = occurrence.lower().split("_", 1)
if len(parts) != 2:
raise ValueError(f"Unrecognised occurrence format: {occurrence!r}")
ordinal, weekday_name = parts
if ordinal not in _ORDINAL_MAP or weekday_name not in _WEEKDAY_MAP:
raise ValueError(f"Unrecognised occurrence: {occurrence!r}")
return _WEEKDAY_MAP[weekday_name], _ORDINAL_MAP[ordinal]
def _extract_thread_id_from_url(url: str) -> str:
"""Extract a Reddit post ID from a full Reddit URL.
Expects URLs of the form:
https://www.reddit.com/r/<sub>/comments/<id>/<title>/
Raises ValueError if the ID cannot be found.
"""
match = re.search(r"/comments/([a-zA-Z0-9]+)/", url)
if not match:
raise ValueError(f"Cannot extract thread id from {url!r}")
return match.group(1)
def _find_sticky(
sub: str,
title_pattern: str,
session_file: str | None = None,
) -> str | None:
"""Search the hot listing of a subreddit for a post matching title_pattern.
Uses the Reddit public JSON API (no auth required).
Returns the post ID (e.g. "abc123") of the first match, or None.
"""
# TODO: use session_file for authenticated requests on private subs
url = f"https://www.reddit.com/r/{sub}/hot.json?limit=10"
try:
response = httpx.get(url, headers={"User-Agent": "magpie/1.0"}, timeout=10)
response.raise_for_status()
except httpx.HTTPError as exc:
logger.warning("Reddit hot.json request failed for r/%s: %s", sub, exc)
raise RuntimeError(f"Failed to fetch hot listing for r/{sub}") from exc
payload = response.json()
if "data" not in payload:
logger.warning("Unexpected Reddit API response: %r", payload)
return None
children = payload.get("data", {}).get("children", [])
pattern_lower = title_pattern.lower()
for child in children:
post = child.get("data", {})
title = post.get("title", "")
if pattern_lower in title.lower():
return post.get("id")
return None
class RedditCommentStrategy(PostingStrategy):
campaign_type = "reddit_comment"
def supports_dupe_guard(self) -> bool:
return False # comment threads may appear multiple times
def execute(
self,
*,
target: str,
title: str,
body: str,
flair: str | None = None,
extra: dict | None = None,
) -> PostResult:
extra = extra or {}
thread_url_override = extra.get("thread_url_override")
thread_title_pattern = extra.get("thread_title_pattern")
session_file = get_settings().reddit_session_file
# Resolve thread_id
if thread_url_override:
thread_id = _extract_thread_id_from_url(thread_url_override)
elif thread_title_pattern:
thread_id = _find_sticky(
sub=target,
title_pattern=thread_title_pattern,
session_file=session_file,
)
if thread_id is None:
raise ValueError(
f"No thread matching {thread_title_pattern!r} found in r/{target}"
)
else:
raise ValueError(
"RedditCommentStrategy requires thread_url_override or thread_title_pattern in extra"
)
client = RedditClient(session_file=session_file)
comment_url = client.comment(thread_id=thread_id, body=body)
# Reddit comment() may return bare domain or empty; reconstruct from thread_id
if not comment_url or comment_url.rstrip("/") in ("https://reddit.com", "https://www.reddit.com"):
comment_url = f"https://www.reddit.com/r/{target}/comments/{thread_id}/"
return PostResult(url=comment_url, metadata={"thread_id": thread_id})

View file

@ -1,25 +0,0 @@
from __future__ import annotations
from app.core.config import get_settings
from app.services.platforms.base import PostingStrategy, PostResult
from app.services.reddit.client import RedditClient
class RedditPostStrategy(PostingStrategy):
"""Submit a new Reddit text post via RedditClient (Playwright subprocess)."""
campaign_type = "reddit_post"
def execute(
self,
*,
target: str,
title: str,
body: str,
flair: str | None = None,
extra: dict | None = None,
) -> PostResult:
settings = get_settings()
client = RedditClient(session_file=settings.reddit_session_file)
url = client.post(sub=target, title=title, body=body, flair=flair)
return PostResult(url=url)

View file

@ -5,129 +5,83 @@ Called by the scheduler and by the manual-trigger API endpoint.
from __future__ import annotations
import asyncio
import logging
from datetime import date
from pathlib import Path
from app.core.config import get_settings
from app.db.store import Store
from app.services.platforms import get_client
from app.services.platforms.reddit_comment import is_nth_weekday, parse_occurrence
logger = logging.getLogger(__name__)
from app.services.platforms import get_client, SUPPORTED_PLATFORMS
def _run_post(db_path: str, campaign_id: int, target: str,
triggered_by: str = "scheduler") -> dict:
def _run_post(db_path: str, campaign_id: int, sub: str, triggered_by: str = "scheduler") -> dict:
"""Execute a single post attempt (blocking, runs in a thread)."""
store = Store(db_path)
try:
campaign = store.get_campaign(campaign_id)
if campaign is None:
return {"skipped": True, "reason": f"campaign {campaign_id} not found"}
# Dupe guard: skip if already posted to this sub this week
if store.already_posted_this_week(campaign_id, sub):
return {"skipped": True, "reason": f"already posted to r/{sub} this week"}
campaign_type = campaign.get("type", "reddit_post")
# Resolve strategy — skip if type is unknown
try:
strategy = get_client(campaign_type)
except ValueError as exc:
return {"skipped": True, "reason": str(exc)}
# Fetch the campaign_subs row for this target (used for extra + occurrence)
all_subs = store.list_campaign_subs(campaign_id)
sub_row = next((s for s in all_subs if s["sub"] == target), {})
# Occurrence check — skip if not the right week of the month
occurrence_str = sub_row.get("occurrence")
try:
parsed = parse_occurrence(occurrence_str)
except ValueError as exc:
return {"skipped": True, "reason": f"invalid occurrence {occurrence_str!r}: {exc}"}
if parsed is not None:
weekday, n = parsed
if not is_nth_weekday(date.today(), weekday, n):
logger.info(
"Skipping %s / %s — not occurrence %s today",
campaign_id, target, occurrence_str,
)
return {"skipped": True, "reason": f"occurrence {occurrence_str} not today"}
# Per-sub post cap (max_posts=None means unlimited)
max_posts = sub_row.get("max_posts")
if max_posts is not None:
count = store.successful_post_count(campaign_id, target)
if count >= max_posts:
logger.info(
"Skipping %s / %s — reached max_posts=%d (posted %d time(s))",
campaign_id, target, max_posts, count,
)
return {"skipped": True, "reason": f"max_posts={max_posts} reached for {target!r}"}
# Dupe guard (opt-out allowed per strategy)
if strategy.supports_dupe_guard() and store.already_posted_this_week(campaign_id, target):
return {"skipped": True, "reason": f"already posted to {target!r} this week"}
# Resolve best content variant for this target
variant = store.resolve_variant(campaign_id, target)
# Resolve the best variant for this sub
variant = store.resolve_variant(campaign_id, sub)
if variant is None:
return {"skipped": True, "reason": "no variant found for campaign"}
# Check platform rules (Reddit-specific; harmless for other platforms)
rules = store.get_sub_rules(target)
# Check sub rules
rules = store.get_sub_rules(sub)
if rules and rules.get("promo_allowed") == 0:
return {"skipped": True, "reason": f"{target!r} hard-bans promotion"}
return {"skipped": True, "reason": f"r/{sub} hard-bans promotion"}
# Create pending post record
# Check platform support
campaign = store.get_campaign(campaign_id)
platform = campaign["platform"] if campaign else "reddit"
if platform not in SUPPORTED_PLATFORMS:
return {"skipped": True, "reason": f"platform '{platform}' not yet implemented"}
# Create the pending post record
post = store.create_post(
campaign_id=campaign_id,
target=target,
target=sub,
variant_id=variant["id"],
platform=campaign.get("platform", "reddit"),
platform=platform,
triggered_by=triggered_by,
)
post_id = post["id"]
# Build extra dict from sub_row
extra = dict(sub_row)
# Execute strategy
flair = variant.get("flair") or (rules.get("flair_to_use") if rules else None)
# Execute
try:
result = strategy.execute(
target=target,
title=variant.get("title", ""),
body=variant.get("body", ""),
client = get_client(platform)
flair = variant.get("flair") or (rules.get("flair_to_use") if rules else None)
url = client.post(
sub=sub,
title=variant["title"],
body=variant["body"],
flair=flair,
extra=extra,
)
return store.update_post_status(post_id, "success", url=result.url)
return store.update_post_status(post_id, "success", url=url)
except Exception as exc:
logger.exception("Strategy %s failed for target %r", campaign_type, target)
return store.update_post_status(post_id, "failed", error_msg=str(exc))
finally:
store.close()
async def post_campaign_to_sub(campaign_id: int, target: str,
async def post_campaign_to_sub(campaign_id: int, sub: str,
triggered_by: str = "scheduler") -> dict:
"""Async wrapper for API and scheduler use."""
db_path = get_settings().db_path
return await asyncio.to_thread(_run_post, db_path, campaign_id, target, triggered_by)
return await asyncio.to_thread(_run_post, db_path, campaign_id, sub, triggered_by)
async def run_campaign(campaign_id: int, triggered_by: str = "scheduler") -> list[dict]:
"""Post a campaign to all of its configured targets, sequentially."""
"""Post a campaign to all of its configured subs, sequentially."""
db_path = get_settings().db_path
store = Store(db_path)
try:
subs = store.list_campaign_subs(campaign_id)
active_targets = [s["sub"] for s in subs if s.get("active", 1)]
active_subs = [s["sub"] for s in subs if s.get("active", 1)]
finally:
store.close()
results = []
for target in active_targets:
result = await post_campaign_to_sub(campaign_id, target, triggered_by)
for sub in active_subs:
result = await post_campaign_to_sub(campaign_id, sub, triggered_by)
results.append(result)
return results

View file

@ -24,7 +24,7 @@ USER_AGENT = "Mozilla/5.0 (X11; Linux x86_64) Chrome/124.0.0.0"
class RedditClient:
def __init__(self, session_file: Path | None = None) -> None:
settings = get_settings()
self._session_file = Path(session_file) if session_file else Path(settings.reddit_session_file)
self._session_file = session_file or Path(settings.reddit_session_file)
ensure_valid_session(self._session_file)
self.cookies = load_cookies(self._session_file)
self.headers = {"User-Agent": USER_AGENT}
@ -43,36 +43,43 @@ class RedditClient:
return self._modhash
def post(self, sub: str, title: str, body: str, flair: str | None = None) -> str:
"""Submit a text post via Reddit legacy API (httpx). Returns the permalink."""
data = {
"api_type": "json",
"kind": "self",
"sr": sub,
"title": title,
"text": body,
"uh": self.modhash,
"sendreplies": "true",
"nsfw": "false",
"spoiler": "false",
}
resp = httpx.post(
"https://www.reddit.com/api/submit",
cookies=self.cookies,
headers=self.headers,
data=data,
timeout=30,
"""Submit a text post via Playwright (xvfb-run). Returns the permalink."""
settings = get_settings()
cmd = [
"xvfb-run", "--auto-servernum",
sys.executable, str(_POST_SCRIPT),
"--sub", sub,
"--title", title,
"--body", body,
"--yes",
]
if flair:
cmd += ["--flair", flair]
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=180,
env={
**__import__("os").environ,
"REDDIT_SESSION_FILE": str(self._session_file),
},
)
resp.raise_for_status()
result = resp.json()
errors = result.get("json", {}).get("errors", [])
if errors:
raise RuntimeError(f"Post to r/{sub} failed: {errors}")
url = result.get("json", {}).get("data", {}).get("url", "")
if not url:
if result.returncode != 0:
raise RuntimeError(
f"Post to r/{sub} may have failed — no URL in response:\n{result}"
f"Post to r/{sub} failed (exit {result.returncode}):\n"
f"STDOUT:\n{result.stdout}\n"
f"STDERR:\n{result.stderr}"
)
for line in result.stdout.splitlines():
if line.startswith("Posted:"):
url = line.split("Posted:", 1)[-1].strip()
return url
raise RuntimeError(
f"Post to r/{sub} may have failed — no 'Posted:' line in output.\n"
f"Full stdout:\n{result.stdout}"
)
def comment(self, thread_id: str, body: str) -> str:
"""Post a top-level comment to a thread. Returns the permalink."""

View file

@ -24,9 +24,9 @@ from dotenv import load_dotenv
from playwright.sync_api import sync_playwright, TimeoutError as PlaywrightTimeout
from playwright_stealth import Stealth
# Load .env from project root (magpie repo root, 3 levels up from this file)
# Load .env from project root (two levels up from this file)
_HERE = Path(__file__).parent
_PROJECT_ROOT = _HERE.parents[2] # reddit/ → services/ → app/ → magpie/
_PROJECT_ROOT = _HERE.parents[3]
load_dotenv(_PROJECT_ROOT / ".env")
REDDIT_USERNAME = os.getenv("REDDIT_USERNAME", "")
@ -34,10 +34,7 @@ REDDIT_PASSWORD = os.getenv("REDDIT_PASSWORD", "")
CHROME_BIN = os.getenv("CHROME_BIN", "/usr/bin/google-chrome")
# Session file path from env (so the service layer can pass it via env var)
SESSION_FILE = Path(os.getenv(
"REDDIT_SESSION_FILE",
str(Path.home() / ".local" / "share" / "magpie" / "session.json"),
))
SESSION_FILE = Path(os.getenv("REDDIT_SESSION_FILE", str(_HERE / "session.json")))
LOGIN_URL = "https://www.reddit.com/login"
SUBMIT_URL = "https://www.reddit.com/r/{sub}/submit?type=text"
@ -190,24 +187,13 @@ def post(sub: str, title: str, body: str, flair: str | None = None, yes: bool =
time.sleep(0.5)
# Fill body — Reddit shows either a markdown textarea or a Lexical
# rich-text contenteditable depending on user/sub settings.
# Try markdown textarea first (visible in screenshot), fall back to Lexical.
# Fill body (Lexical editor — click to focus, then type)
try:
md_textarea = page.locator('textarea[placeholder="Body text*"]')
if md_textarea.count() > 0:
md_textarea.first.wait_for(state="visible", timeout=5_000)
md_textarea.first.click()
time.sleep(0.3)
md_textarea.first.fill(body)
print(" Body filled via markdown textarea")
else:
body_el = page.locator('div[contenteditable="true"]').first
body_el.wait_for(state="visible", timeout=10_000)
body_el.click()
time.sleep(0.3)
page.keyboard.type(body, delay=2)
print(" Body filled via rich text editor")
except Exception as exc:
print(f" Warning: body fill failed ({exc})")
@ -224,26 +210,11 @@ def post(sub: str, title: str, body: str, flair: str | None = None, yes: bool =
except Exception as exc:
print(f" Warning: flair selection failed ({exc})")
# Submit — try multiple selector strategies; Reddit's form markup varies
# Submit
try:
# Scroll to bottom so button is in viewport
page.keyboard.press("End")
time.sleep(0.3)
for selector in [
'button[type="submit"]',
'button:has-text("Post")',
'[slot="submit-button"] button',
'button.bg-interactive-onbackground',
]:
btn = page.locator(selector).last
if btn.count() > 0:
btn.scroll_into_view_if_needed()
btn.wait_for(state="visible", timeout=5_000)
btn.click()
print(f" Clicked submit via {selector!r}")
break
else:
print(" Warning: no submit button found with any selector")
submit_btn = page.locator('button[type="submit"]').filter(has_text="Post")
submit_btn.wait_for(state="visible", timeout=10_000)
submit_btn.click()
except Exception as exc:
print(f" Warning: submit button click failed ({exc})")

View file

@ -43,13 +43,7 @@
</div>
<div v-for="s in campaignSubs" :key="s.id" style="display: flex; align-items: center; gap: var(--spacing-sm); padding: 6px 0; border-bottom: 1px solid var(--color-border);">
<span>r/{{ s.sub }}</span>
<div style="margin-left: auto; display: flex; gap: 4px;">
<button class="btn btn-ghost btn-sm" @click="openCopyModal(s.sub)" title="Copy & open Reddit">Copy & Post</button>
<button class="btn btn-primary btn-sm" @click="triggerSub(s.sub)" :disabled="triggeringSub === s.sub" title="Auto-post via Playwright">
{{ triggeringSub === s.sub ? '...' : 'Run' }}
</button>
<button class="btn btn-ghost btn-sm" style="color: var(--color-danger);" @click="removeSub(s.sub)"></button>
</div>
<button class="btn btn-ghost btn-sm" style="margin-left: auto; color: var(--color-danger);" @click="removeSub(s.sub)"></button>
</div>
<div v-if="campaignSubs.length === 0" class="empty-state" style="padding: var(--spacing-md);">No subs configured.</div>
</div>
@ -115,55 +109,6 @@
</div>
</div>
<!-- Copy & Open modal -->
<div v-if="copyModal.sub" class="modal-backdrop" @click.self="copyModal.sub = ''">
<div class="modal card" style="width: 620px; max-height: 90vh; overflow-y: auto;">
<div style="display: flex; align-items: center; justify-content: space-between; margin-bottom: var(--spacing-md);">
<h2 style="font-size: 16px; margin: 0;">Post to r/{{ copyModal.sub }}</h2>
<a :href="copyModal.url" target="_blank" class="btn btn-primary btn-sm">Open Reddit </a>
</div>
<div class="form-group">
<div style="display: flex; align-items: center; justify-content: space-between; margin-bottom: 4px;">
<label class="form-label" style="margin: 0;">Title</label>
<button class="btn btn-ghost btn-sm" @click="copy(copyModal.title, 'title')">{{ copied === 'title' ? '✓ Copied' : 'Copy' }}</button>
</div>
<input class="form-input" :value="copyModal.title" readonly @click="($event.target as HTMLInputElement).select()" />
</div>
<div class="form-group">
<div style="display: flex; align-items: center; justify-content: space-between; margin-bottom: 4px;">
<label class="form-label" style="margin: 0;">Body</label>
<button class="btn btn-ghost btn-sm" @click="copy(copyModal.body, 'body')">{{ copied === 'body' ? '✓ Copied' : 'Copy' }}</button>
</div>
<textarea class="form-textarea" :value="copyModal.body" readonly rows="14"
style="font-family: var(--font-mono); font-size: 12px; resize: vertical;"
@click="($event.target as HTMLTextAreaElement).select()" />
</div>
<!-- Sub-specific notes (e.g. AI disclosure requirement) -->
<div v-if="copyModal.notes" class="form-group">
<div style="display: flex; align-items: center; justify-content: space-between; margin-bottom: 4px;">
<label class="form-label" style="margin: 0;">Sub notes</label>
<button class="btn btn-ghost btn-sm" @click="copy(copyModal.notes, 'notes')">{{ copied === 'notes' ? '✓ Copied' : 'Copy' }}</button>
</div>
<textarea class="form-textarea" :value="copyModal.notes" readonly rows="3"
style="font-size: 12px; resize: vertical; color: var(--color-text-muted);"
@click="($event.target as HTMLTextAreaElement).select()" />
</div>
<div style="color: var(--color-text-muted); font-size: 12px; margin-bottom: var(--spacing-md);">
1. Copy title paste into Reddit title field<br>
2. Copy body paste into body<br>
3. Submit on Reddit
</div>
<div style="display: flex; justify-content: flex-end;">
<button class="btn btn-ghost" @click="copyModal.sub = ''">Close</button>
</div>
</div>
</div>
<!-- Add sub modal -->
<div v-if="showAddSub" class="modal-backdrop" @click.self="showAddSub = false">
<div class="modal card" style="width: 360px;">
@ -185,7 +130,7 @@
<script setup lang="ts">
import { onMounted, reactive, ref } from 'vue'
import { useRoute } from 'vue-router'
import { api, type Campaign, type Variant, type CampaignSub, type Post, type SubRules } from '@/services/api'
import { api, type Campaign, type Variant, type CampaignSub, type Post } from '@/services/api'
const route = useRoute()
const campaignId = Number(route.params.id)
@ -194,42 +139,26 @@ const campaign = ref<Campaign | null>(null)
const variants = ref<Variant[]>([])
const campaignSubs = ref<CampaignSub[]>([])
const recentPosts = ref<Post[]>([])
const subRulesMap = ref<Record<string, SubRules>>({})
const triggering = ref(false)
const triggeringSub = ref<string | null>(null)
const showAddVariant = ref(false)
const showAddSub = ref(false)
const copyModal = reactive({ sub: '', title: '', body: '', url: '', notes: '' })
const copied = ref('')
const variantForm = reactive({ sub_pattern: '*', title: '', body: '', flair: '', notes: '' })
const subForm = reactive({ sub: '' })
onMounted(async () => {
const [c, v, s, p, allRules] = await Promise.all([
const [c, v, s, p] = await Promise.all([
api.campaigns.get(campaignId),
api.variants.list(campaignId),
api.subs.listForCampaign(campaignId),
api.posts.list(campaignId, undefined, 20),
api.subs.listRules(),
])
campaign.value = c
variants.value = v
campaignSubs.value = s
recentPosts.value = p
subRulesMap.value = Object.fromEntries(allRules.map(r => [r.sub, r]))
})
async function triggerSub(sub: string) {
triggeringSub.value = sub
try {
await api.posts.trigger(campaignId, sub)
recentPosts.value = await api.posts.list(campaignId, undefined, 20)
} finally {
triggeringSub.value = null
}
}
async function triggerAll() {
triggering.value = true
try {
@ -270,32 +199,6 @@ async function removeSub(sub: string) {
campaignSubs.value = campaignSubs.value.filter(s => s.sub !== sub)
}
function resolveVariant(sub: string): Variant | null {
// Exact sub match first, then wildcard mirrors backend resolve_variant logic
return (
variants.value.find(v => v.sub_pattern === sub) ??
variants.value.find(v => v.sub_pattern === '*') ??
null
)
}
function openCopyModal(sub: string) {
const v = resolveVariant(sub)
const rules = subRulesMap.value[sub]
copyModal.sub = sub
copyModal.title = v?.title ?? ''
copyModal.body = v?.body ?? ''
copyModal.url = rules?.post_url ?? `https://www.reddit.com/r/${sub}/submit?type=TEXT`
copyModal.notes = rules?.notes ?? ''
copied.value = ''
}
async function copy(text: string, which: string) {
await navigator.clipboard.writeText(text)
copied.value = which
setTimeout(() => { copied.value = '' }, 2000)
}
function formatDate(iso: string) {
const d = new Date(iso + 'Z')
return d.toLocaleDateString(undefined, { month: 'short', day: 'numeric', hour: '2-digit', minute: '2-digit' })

View file

@ -30,8 +30,8 @@
</router-link>
</td>
<td data-label="Product"><span class="badge badge-info">{{ c.product }}</span></td>
<td data-label="Schedule" class="text-sm text-muted" :title="c.cron_schedule ?? undefined">
{{ humanizeCron(c.cron_schedule) }}
<td data-label="Schedule" class="text-mono text-sm text-muted">
{{ c.cron_schedule ?? '— manual' }}
</td>
<td data-label="Status">
<span :class="['badge', c.active ? 'badge-success' : 'badge-muted']">
@ -90,7 +90,6 @@
<script setup lang="ts">
import { onMounted, reactive, ref } from 'vue'
import { useCampaignStore } from '@/stores/campaigns'
import { humanizeCron } from '@/utils/cron'
const store = useCampaignStore()
const showCreate = ref(false)

View file

@ -115,48 +115,20 @@
<div class="handoff-actions">
<button class="btn btn-secondary" @click="copyDraft">📋 Copy draft</button>
<a :href="selected.thread_url" target="_blank" class="btn btn-secondary">🔗 Open thread</a>
<button v-if="!confirmingPosted" class="btn btn-success" @click="confirmingPosted = true">
<button class="btn btn-success" :disabled="saving" @click="markManualPosted">
Mark as posted
</button>
</div>
<div v-if="copied" class="copy-confirm">Copied to clipboard</div>
<div v-if="confirmingPosted" class="post-url-confirm">
<input
v-model="postedUrl"
class="input"
placeholder="Post URL (optional — paste link to your comment/post)"
@keydown.enter="markManualPosted"
@keydown.escape="confirmingPosted = false"
/>
<div class="handoff-actions" style="margin-top: var(--spacing-xs);">
<button class="btn btn-success" :disabled="saving" @click="markManualPosted">Confirm</button>
<button class="btn btn-secondary" @click="confirmingPosted = false">Cancel</button>
</div>
</div>
</section>
<!-- Auto-post panel for approved Reddit -->
<section v-if="selected.status === 'approved' && selected.platform === 'reddit'" class="handoff-panel">
<h3 class="section-label">Ready to post</h3>
<p class="handoff-note">Use trigger_sub_post from the Campaigns view, or mark as posted manually if you handled it.</p>
<div v-if="!confirmingPosted">
<button class="btn btn-success" @click="confirmingPosted = true">
<button class="btn btn-success" :disabled="saving" @click="markManualPosted">
Mark as posted
</button>
</div>
<div v-if="confirmingPosted" class="post-url-confirm">
<input
v-model="postedUrl"
class="input"
placeholder="Post URL (optional)"
@keydown.enter="markManualPosted"
@keydown.escape="confirmingPosted = false"
/>
<div class="handoff-actions" style="margin-top: var(--spacing-xs);">
<button class="btn btn-success" :disabled="saving" @click="markManualPosted">Confirm</button>
<button class="btn btn-secondary" @click="confirmingPosted = false">Cancel</button>
</div>
</div>
</section>
</div>
</div>
@ -231,8 +203,6 @@ const showAddModal = ref(false)
const copied = ref(false)
const filterStatus = ref<OpportunityStatus | ''>('')
const loadError = ref<string | null>(null)
const confirmingPosted = ref(false)
const postedUrl = ref('')
const editBody = ref('')
const editTitle = ref('')
@ -270,8 +240,6 @@ function select(opp: Opportunity) {
selected.value = opp
editBody.value = opp.draft_body
editTitle.value = opp.draft_title ?? ''
confirmingPosted.value = false
postedUrl.value = ''
}
watch(selected, opp => {
@ -325,10 +293,8 @@ async function markManualPosted() {
if (!selected.value) return
saving.value = true
try {
const updated = await api.opportunities.markPosted(selected.value.id, true, postedUrl.value || null)
const updated = await api.opportunities.markPosted(selected.value.id, true)
replace(updated)
confirmingPosted.value = false
postedUrl.value = ''
} finally {
saving.value = false
}
@ -518,13 +484,6 @@ onMounted(load)
margin-top: var(--spacing-xs);
}
.post-url-confirm {
margin-top: var(--spacing-sm);
display: flex;
flex-direction: column;
gap: var(--spacing-xs);
}
/* Add form */
.form-grid { display: grid; grid-template-columns: 1fr 1fr; gap: var(--spacing-md); }

View file

@ -55,8 +55,7 @@ onMounted(async () => {
])
})
function campaignName(id: number | null) {
if (id === null) return 'manual'
function campaignName(id: number) {
return campaignStore.campaigns.find(c => c.id === id)?.name ?? `#${id}`
}

View file

@ -86,10 +86,10 @@
<span style="font-weight: 500; font-size: 13px;">{{ r.name }}</span>
<span v-if="r.sub" class="chip chip-sub" style="font-size: 10px;">{{ r.sub }}</span>
<span v-if="r.label" class="chip" :class="`chip-label-${r.label}`" style="font-size: 10px;">{{ r.label }}</span>
<div style="margin-left: auto; display: flex; align-items: center; gap: 4px;">
<span v-if="!r.active" class="badge badge-muted" style="font-size: 10px;">paused</span>
<button class="btn btn-ghost btn-xs" :title="r.active ? 'Pause' : 'Resume'" @click="toggleRule(r)">{{ r.active ? '' : '' }}</button>
<button class="btn btn-ghost btn-xs" style="color: var(--color-danger);" title="Delete" @click="deleteRule(r.id)"></button>
<span v-if="!r.active" class="badge badge-muted" style="margin-left: auto; font-size: 10px;">paused</span>
<div v-else style="margin-left: auto; display: flex; gap: 4px;">
<button class="btn btn-ghost btn-xs" @click="toggleRule(r)"></button>
<button class="btn btn-ghost btn-xs" style="color: var(--color-danger);" @click="deleteRule(r.id)"></button>
</div>
</div>
<div class="rule-keywords">

View file

@ -35,7 +35,7 @@
<span v-if="r.rule_warning" class="badge badge-warning">yes</span>
<span v-else style="color: var(--color-text-muted);"></span>
</td>
<td data-label="Notes" style="color: var(--color-text-muted); max-width: 260px; white-space: normal; word-break: break-word;" :title="r.notes ?? ''">
<td data-label="Notes" style="color: var(--color-text-muted); max-width: 200px; white-space: nowrap; overflow: hidden; text-overflow: ellipsis;">
{{ r.notes ?? '—' }}
</td>
<td data-label="">
@ -94,10 +94,6 @@
<option :value="true">Yes</option>
</select>
</div>
<div class="form-group">
<label class="form-label">Post URL <span style="color: var(--color-text-muted)">(optional overrides /submit, e.g. megathread link)</span></label>
<input class="form-input" v-model="form.post_url" placeholder="https://www.reddit.com/r/selfhosted/comments/..." />
</div>
<div class="form-group">
<label class="form-label">Notes</label>
<textarea class="form-textarea" v-model="form.notes" rows="2" placeholder="Any posting quirks..." />
@ -126,7 +122,6 @@ const form = reactive({
flair_to_use: '',
promo_allowed: null as boolean | null,
rule_warning: false,
post_url: '',
notes: '',
})
@ -143,7 +138,6 @@ function startEdit(r: SubRules) {
flair_to_use: r.flair_to_use ?? '',
promo_allowed: r.promo_allowed === null ? null : !!r.promo_allowed,
rule_warning: !!r.rule_warning,
post_url: r.post_url ?? '',
notes: r.notes ?? '',
})
}
@ -151,7 +145,7 @@ function startEdit(r: SubRules) {
function closeModal() {
showAdd.value = false
editing.value = null
Object.assign(form, { sub: '', platform: 'reddit', flair_required: false, flair_to_use: '', promo_allowed: null, rule_warning: false, post_url: '', notes: '' })
Object.assign(form, { sub: '', platform: 'reddit', flair_required: false, flair_to_use: '', promo_allowed: null, rule_warning: false, notes: '' })
}
async function save() {
@ -162,7 +156,6 @@ async function save() {
flair_to_use: form.flair_to_use || null,
promo_allowed: form.promo_allowed,
rule_warning: form.rule_warning,
post_url: form.post_url || null,
notes: form.notes || null,
}, platform)
const idx = rules.value.findIndex(r => r.sub === sub && r.platform === platform)

View file

@ -64,7 +64,7 @@ export interface CampaignSub {
export interface Post {
id: number
campaign_id: number | null
campaign_id: number
variant_id: number | null
platform: string
target: string
@ -85,7 +85,6 @@ export interface SubRules {
promo_allowed: number | null
rule_warning: number
notes: string | null
post_url: string | null
last_checked: string | null
updated_at: string
}
@ -96,7 +95,6 @@ export interface SubRulesUpsert {
promo_allowed?: boolean | null
rule_warning?: boolean
notes?: string | null
post_url?: string | null
}
export type OpportunityStatus =
@ -277,8 +275,8 @@ export const api = {
approve: (id: number) =>
http.post<ApproveResult>(`/opportunities/${id}/approve`).then(r => r.data),
markPosted: (id: number, manual = false, url?: string | null) =>
http.post<Opportunity>(`/opportunities/${id}/mark-posted`, { url: url ?? null }, { params: { manual } }).then(r => r.data),
markPosted: (id: number, manual = false) =>
http.post<Opportunity>(`/opportunities/${id}/mark-posted`, null, { params: { manual } }).then(r => r.data),
dismiss: (id: number, note?: string) =>
http.post<Opportunity>(`/opportunities/${id}/dismiss`, { note: note ?? null }).then(r => r.data),

View file

@ -1,107 +0,0 @@
/**
* Humanize a 5-field cron expression into plain English.
* Covers the patterns realistically used for social posting schedules.
* Returns the original string for anything it can't parse cleanly.
*
* Field order: minute hour dom month dow
*/
const DAYS = ['Sun', 'Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat']
const MONTHS = ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']
function formatTime(minute: string, hour: string): string {
const h = parseInt(hour)
const m = parseInt(minute)
const suffix = h < 12 ? 'AM' : 'PM'
const h12 = h % 12 === 0 ? 12 : h % 12
return m === 0 ? `${h12} ${suffix}` : `${h12}:${m.toString().padStart(2, '0')} ${suffix}`
}
function parseDow(dow: string): string {
// Single digit
if (/^\d$/.test(dow)) return DAYS[parseInt(dow)] ?? dow
// Comma list: 1,3,5
if (dow.includes(',')) {
return dow.split(',').map(d => DAYS[parseInt(d)] ?? d).join(', ')
}
// Range: 1-5
if (dow.includes('-')) {
const [start, end] = dow.split('-').map(Number)
if (start === 1 && end === 5) return 'Weekdays'
if (start === 0 && end === 6) return 'Every day'
return `${DAYS[start]}-${DAYS[end]}`
}
return dow
}
function parseMonth(month: string): string {
if (/^\d+$/.test(month)) return MONTHS[parseInt(month) - 1] ?? month
return month
}
export function humanizeCron(expr: string | null | undefined): string {
if (!expr) return 'Manual'
const parts = expr.trim().split(/\s+/)
if (parts.length !== 5) return expr
const [minute, hour, dom, month, dow] = parts
const everyMinute = minute === '*'
const everyHour = hour === '*'
const everyDom = dom === '*'
const everyMonth = month === '*'
const everyDow = dow === '*'
// Reject non-trivial step/range combos in time fields — fall back to raw
if ((minute.includes('/') || minute.includes('-')) && minute !== '*') return expr
if ((hour.includes('/') || hour.includes('-')) && hour !== '*') return expr
// Every minute
if (everyMinute && everyHour && everyDom && everyMonth && everyDow) return 'Every minute'
// Every N minutes: */N * * * *
if (minute.startsWith('*/') && everyHour && everyDom && everyMonth && everyDow) {
const n = minute.slice(2)
return `Every ${n} min`
}
// Hourly at :MM — 30 * * * *
if (!everyMinute && everyHour && everyDom && everyMonth && everyDow) {
return `Hourly at :${minute.padStart(2, '0')}`
}
// Every N hours: 0 */N * * *
if (minute === '0' && hour.startsWith('*/') && everyDom && everyMonth && everyDow) {
const n = hour.slice(2)
return `Every ${n}h`
}
// Fixed time, specific dow(s), every month
if (!everyMinute && !everyHour && everyDom && everyMonth && !everyDow) {
const time = formatTime(minute, hour)
const day = parseDow(dow)
// Multiple days (comma)
if (dow.includes(',')) return `${day} at ${time}`
if (dow === '1-5') return `Weekdays at ${time}`
return `${day}s at ${time}`
}
// Fixed time, every day
if (!everyMinute && !everyHour && everyDom && everyMonth && everyDow) {
return `Daily at ${formatTime(minute, hour)}`
}
// Fixed time, specific dom, every month, every dow
if (!everyMinute && !everyHour && !everyDom && everyMonth && everyDow) {
return `Monthly on the ${dom} at ${formatTime(minute, hour)}`
}
// Fixed time, specific month + dom
if (!everyMinute && !everyHour && !everyDom && !everyMonth && everyDow) {
return `${parseMonth(month)} ${dom} at ${formatTime(minute, hour)}`
}
// Fallback: return raw but trimmed
return expr
}

View file

@ -308,62 +308,12 @@ const TOOLS = [
},
},
// Blog (Directus)
{
name: 'publish_blog_post',
description: 'Publish a blog post to the CircuitForge website via Directus. Defaults to published immediately. Returns the created post including its id and slug.',
inputSchema: {
type: 'object',
properties: {
title: { type: 'string', description: 'Post title' },
body: { type: 'string', description: 'Post body (Markdown)' },
slug: { type: 'string', description: 'URL slug — auto-generated from title if omitted' },
tags: { type: 'array', items: { type: 'string' }, description: 'Tag list (e.g. ["sprint-review", "kiwi"])' },
author: { type: 'string', description: 'Author name (optional)' },
seo_description: { type: 'string', description: 'Short SEO/meta description (optional)' },
published_at: { type: 'string', description: 'ISO 8601 publish timestamp — defaults to now' },
},
required: ['title', 'body'],
},
},
{
name: 'get_blog_post',
description: 'Fetch an existing blog post by its URL slug.',
inputSchema: {
type: 'object',
properties: {
slug: { type: 'string', description: 'The post slug (e.g. "2026-04-28-sprint-review")' },
},
required: ['slug'],
},
},
// Scheduler
{
name: 'scheduler_status',
description: 'Check the scheduler status and see next scheduled run times for all campaigns.',
inputSchema: { type: 'object', properties: {} },
},
{
name: 'refresh_reddit_session',
description: 'Re-establish the Playwright Reddit session. Use target="bridge" to refresh the claude-bridge poster session, or "magpie" (default) for Magpie\'s own session. Takes ~30s.',
inputSchema: {
type: 'object',
properties: {
target: { type: 'string', enum: ['magpie', 'bridge'], default: 'magpie', description: 'Which session to refresh: "magpie" or "bridge" (claude-bridge/reddit-poster)' },
},
},
},
{
name: 'reddit_session_status',
description: 'Check whether the Reddit Playwright session is valid and how old it is.',
inputSchema: {
type: 'object',
properties: {
target: { type: 'string', enum: ['magpie', 'bridge'], default: 'magpie', description: 'Which session to check' },
},
},
},
];
// ─── Dispatch ─────────────────────────────────────────────────────────────────
@ -432,13 +382,6 @@ async function callTool(name, args) {
return await api('PUT', `/subs/${sub}`, body);
}
case 'publish_blog_post': {
const { title, body, ...rest } = args;
return await api('POST', '/blog', { title, body, ...rest });
}
case 'get_blog_post':
return await api('GET', `/blog/${encodeURIComponent(args.slug)}`);
case 'scheduler_status':
return await api('GET', '/scheduler/status');
@ -471,15 +414,6 @@ async function callTool(name, args) {
return await api('PATCH', `/opportunities/${opportunity_id}`, fields);
}
case 'refresh_reddit_session': {
const target = args.target || 'magpie';
return await api('POST', `/reddit/refresh-session?target=${target}`);
}
case 'reddit_session_status': {
const target = args.target || 'magpie';
return await api('GET', `/reddit/session-status?target=${target}`);
}
default:
throw new Error(`Unknown tool: ${name}`);
}

View file

@ -342,79 +342,11 @@ def seed(store: Store) -> None:
store.create_variant(campaign_id=cid, **v)
print(f" variant: {v['sub_pattern']!r}")
# --- r/Flipping Sunday self-promo (Snipe) ---
flipping_campaign = store.get_or_create_campaign(
name="Snipe | Sunday self-promo — r/Flipping",
product="snipe",
platform="reddit",
type="reddit_comment",
cron_schedule="0 16 * * 0", # every Sunday 16:00 UTC
)
flipping_status = "skip" if flipping_campaign["name"] in existing_names else "+"
print(f" [{flipping_status}] campaign {flipping_campaign['id']}: {flipping_campaign['name']!r}")
store.upsert_campaign_sub(
campaign_id=flipping_campaign["id"],
sub="Flipping",
sort_order=0,
thread_title_pattern="Weekly Self-Promotion",
occurrence="every",
)
print(" sub: r/Flipping (thread_title_pattern='Weekly Self-Promotion', occurrence='every')")
store.upsert_variant(
campaign_id=flipping_campaign["id"],
sub_pattern="*",
title="",
body=(
"Working on evaluating auction listings? I built **Snipe** — "
"a trust-scoring tool for eBay and estate auction platforms. "
"It checks seller history, flags marketing photos, and scores "
"listings before you bid.\n\n"
"Still in beta, free to try: https://circuitforge.tech\n\n"
"Happy to answer questions about how it works."
),
)
print(" variant: '*'")
# --- r/cscareerquestions first-Sunday megathread (Peregrine) ---
cscq_campaign = store.get_or_create_campaign(
name="Peregrine | First-Sunday megathread — r/cscareerquestions",
product="peregrine",
platform="reddit",
type="reddit_comment",
cron_schedule="0 16 * * 0", # every Sunday 16:00 UTC; occurrence gates to first_sunday
)
cscq_status = "skip" if cscq_campaign["name"] in existing_names else "+"
print(f" [{cscq_status}] campaign {cscq_campaign['id']}: {cscq_campaign['name']!r}")
store.upsert_campaign_sub(
campaign_id=cscq_campaign["id"],
sub="cscareerquestions",
sort_order=0,
thread_title_pattern="Monthly Resume",
occurrence="first_sunday",
)
print(" sub: r/cscareerquestions (thread_title_pattern='Monthly Resume', occurrence='first_sunday')")
store.upsert_variant(
campaign_id=cscq_campaign["id"],
sub_pattern="*",
title="",
body=(
"I'm building **Peregrine** — a local-first job search assistant "
"for neurodivergent and adaptive-needs folks. It helps with "
"cover letters, interview prep, and tracking applications without "
"your data leaving your machine.\n\n"
"Free tier available: https://circuitforge.tech/peregrine\n\n"
"Built by someone who's been through the grind — genuinely trying "
"to make this less awful."
),
)
print(" variant: '*'")
print()
print("Seeding sub rules...")
for rule in SUB_RULES:
sub = rule["sub"]
fields = {k: v for k, v in rule.items() if k != "sub"}
store.upsert_sub_rules(sub=sub, last_checked="2026-04-21", **fields)
sub = rule.pop("sub")
store.upsert_sub_rules(sub=sub, last_checked="2026-04-21", **rule)
allowed = "OK" if rule.get("promo_allowed") else "BANNED"
print(f" r/{sub}: {allowed}")

View file

View file

@ -1,12 +0,0 @@
from app.services.platforms.base import PostResult
def test_post_result_requires_url():
r = PostResult(url="https://reddit.com/r/test/comments/abc")
assert r.url == "https://reddit.com/r/test/comments/abc"
assert r.metadata is None
def test_post_result_accepts_metadata():
r = PostResult(url="https://example.com", metadata={"id": "abc123"})
assert r.metadata == {"id": "abc123"}

View file

@ -1,101 +0,0 @@
from datetime import date
from unittest.mock import MagicMock
import pytest
from app.services.platforms.reddit_comment import (
is_nth_weekday,
parse_occurrence,
_extract_thread_id_from_url,
_find_sticky,
)
# --- is_nth_weekday ---
def test_first_sunday_of_april_2026():
# 2026-04-05 is the first Sunday of April 2026
assert is_nth_weekday(date(2026, 4, 5), weekday=6, n=1) is True
def test_second_sunday_of_april_2026_is_not_first():
assert is_nth_weekday(date(2026, 4, 12), weekday=6, n=1) is False
def test_first_sunday_of_may_2026():
# 2026-05-03 is the first Sunday of May 2026
assert is_nth_weekday(date(2026, 5, 3), weekday=6, n=1) is True
def test_last_friday_not_matched_by_first():
# 2026-04-24 is the last Friday of April — not the first
assert is_nth_weekday(date(2026, 4, 24), weekday=4, n=1) is False
def test_first_friday_of_april_2026():
# 2026-04-03 is the first Friday
assert is_nth_weekday(date(2026, 4, 3), weekday=4, n=1) is True
# --- parse_occurrence ---
def test_parse_occurrence_every_returns_none():
assert parse_occurrence("every") is None
def test_parse_occurrence_none_returns_none():
assert parse_occurrence(None) is None
def test_parse_occurrence_first_sunday():
weekday, n = parse_occurrence("first_sunday")
assert weekday == 6 and n == 1
def test_parse_occurrence_unknown_raises():
with pytest.raises(ValueError):
parse_occurrence("fourth_wednesday")
# --- _extract_thread_id_from_url ---
def test_extract_thread_id_success():
url = "https://www.reddit.com/r/flipping/comments/abc123/weekly_thread/"
assert _extract_thread_id_from_url(url) == "abc123"
def test_extract_thread_id_invalid():
with pytest.raises(ValueError):
_extract_thread_id_from_url("https://www.reddit.com/r/flipping/")
# --- _find_sticky ---
def _make_hot_json(posts: list[dict]) -> dict:
"""Build a fake Reddit hot.json payload."""
return {"data": {"children": [{"data": p} for p in posts]}}
def test_find_sticky_found(monkeypatch):
fake_response = MagicMock()
fake_response.json.return_value = _make_hot_json([
{"id": "abc123", "title": "Weekly Self-Promotion Thread"},
{"id": "xyz999", "title": "General Discussion"},
])
import app.services.platforms.reddit_comment as rc_module
monkeypatch.setattr(rc_module.httpx, "get", lambda *a, **kw: fake_response)
result = _find_sticky("flipping", "Self-Promotion")
assert result == "abc123"
def test_find_sticky_not_found(monkeypatch):
fake_response = MagicMock()
fake_response.json.return_value = _make_hot_json([
{"id": "xyz999", "title": "General Discussion"},
])
import app.services.platforms.reddit_comment as rc_module
monkeypatch.setattr(rc_module.httpx, "get", lambda *a, **kw: fake_response)
result = _find_sticky("flipping", "Self-Promotion")
assert result is None

View file

@ -1,90 +0,0 @@
from unittest.mock import patch, MagicMock
import pytest
from app.services.platforms.reddit_comment import RedditCommentStrategy
from app.services.platforms.base import PostResult
@pytest.fixture
def strategy():
return RedditCommentStrategy()
def test_execute_with_url_override(strategy):
"""Uses thread_url_override to get thread_id, calls client.comment()"""
mock_client = MagicMock()
mock_client.comment.return_value = "https://www.reddit.com/r/flipping/comments/abc123/_/xyz789/"
with patch("app.services.platforms.reddit_comment.RedditClient", return_value=mock_client):
with patch("app.services.platforms.reddit_comment.get_settings") as mock_settings:
mock_settings.return_value.reddit_session_file = "/fake/session.json"
result = strategy.execute(
target="flipping",
title="ignored",
body="Hello thread!",
extra={"thread_url_override": "https://www.reddit.com/r/flipping/comments/abc123/weekly/"},
)
assert result.url == "https://www.reddit.com/r/flipping/comments/abc123/_/xyz789/"
assert result.metadata["thread_id"] == "abc123"
mock_client.comment.assert_called_once_with(thread_id="abc123", body="Hello thread!")
def test_execute_with_title_pattern_found(strategy):
"""Uses _find_sticky to locate thread, posts comment"""
with patch("app.services.platforms.reddit_comment._find_sticky", return_value="def456"):
with patch("app.services.platforms.reddit_comment.RedditClient") as MockClient:
with patch("app.services.platforms.reddit_comment.get_settings") as mock_settings:
mock_settings.return_value.reddit_session_file = "/fake/session.json"
MockClient.return_value.comment.return_value = ""
result = strategy.execute(
target="cscareerquestions",
title="ignored",
body="Job search tool",
extra={"thread_title_pattern": "Monthly Resume"},
)
assert "def456" in result.url
assert result.metadata["thread_id"] == "def456"
def test_execute_thread_not_found(strategy):
"""Raises ValueError when _find_sticky returns None"""
with patch("app.services.platforms.reddit_comment._find_sticky", return_value=None):
with patch("app.services.platforms.reddit_comment.get_settings") as mock_settings:
mock_settings.return_value.reddit_session_file = "/fake/session.json"
with pytest.raises(ValueError, match="No thread matching"):
strategy.execute(
target="cscareerquestions",
title="ignored",
body="body",
extra={"thread_title_pattern": "Monthly Resume"},
)
def test_execute_no_extra_raises(strategy):
"""Raises ValueError when neither thread_url_override nor thread_title_pattern provided"""
with patch("app.services.platforms.reddit_comment.get_settings") as mock_settings:
mock_settings.return_value.reddit_session_file = "/fake/session.json"
with pytest.raises(ValueError, match="requires thread_url_override or thread_title_pattern"):
strategy.execute(target="flipping", title="t", body="b", extra={})
def test_reconstructed_url_on_empty_comment_url(strategy):
"""When client.comment() returns empty string, reconstructs URL from thread_id"""
with patch("app.services.platforms.reddit_comment.RedditClient") as MockClient:
with patch("app.services.platforms.reddit_comment.get_settings") as mock_settings:
mock_settings.return_value.reddit_session_file = "/fake/session.json"
MockClient.return_value.comment.return_value = ""
result = strategy.execute(
target="flipping",
title="t",
body="b",
extra={"thread_url_override": "https://www.reddit.com/r/flipping/comments/abc123/weekly/"},
)
assert result.url == "https://www.reddit.com/r/flipping/comments/abc123/"
def test_supports_dupe_guard_false(strategy):
assert strategy.supports_dupe_guard() is False
def test_registry_contains_reddit_comment():
from app.services.platforms import _REGISTRY
assert "reddit_comment" in _REGISTRY

View file

@ -1,58 +0,0 @@
from unittest.mock import MagicMock, patch
from app.services.platforms.reddit_post import RedditPostStrategy
from app.services.platforms.base import PostResult
def test_execute_delegates_to_reddit_client(tmp_path):
session_file = tmp_path / "session.json"
session_file.write_text('{"cookies": [], "origins": []}')
mock_client = MagicMock()
mock_client.post.return_value = "https://reddit.com/r/test/comments/abc/title/"
with patch("app.services.platforms.reddit_post.RedditClient", return_value=mock_client):
with patch("app.services.platforms.reddit_post.get_settings") as mock_settings:
mock_settings.return_value.reddit_session_file = str(session_file)
strategy = RedditPostStrategy()
result = strategy.execute(
target="selfhosted",
title="Test Title",
body="Test body",
flair="Showcase",
)
mock_client.post.assert_called_once_with(
sub="selfhosted",
title="Test Title",
body="Test body",
flair="Showcase",
)
assert isinstance(result, PostResult)
assert result.url == "https://reddit.com/r/test/comments/abc/title/"
def test_execute_propagates_client_error(tmp_path):
session_file = tmp_path / "session.json"
session_file.write_text('{"cookies": [], "origins": []}')
mock_client = MagicMock()
mock_client.post.side_effect = RuntimeError("Playwright timeout")
with patch("app.services.platforms.reddit_post.RedditClient", return_value=mock_client):
with patch("app.services.platforms.reddit_post.get_settings") as mock_settings:
mock_settings.return_value.reddit_session_file = str(session_file)
strategy = RedditPostStrategy()
try:
strategy.execute(target="selfhosted", title="T", body="B")
assert False, "Expected RuntimeError"
except RuntimeError as e:
assert "Playwright timeout" in str(e)
def test_campaign_type():
assert RedditPostStrategy.campaign_type == "reddit_post"
def test_supports_dupe_guard():
assert RedditPostStrategy().supports_dupe_guard() is True

View file

@ -1,17 +0,0 @@
import pytest
from app.services.platforms import get_client, SUPPORTED_PLATFORMS
from app.services.platforms.reddit_post import RedditPostStrategy
def test_get_client_returns_reddit_post_strategy():
client = get_client("reddit_post")
assert isinstance(client, RedditPostStrategy)
def test_get_client_unknown_type_raises():
with pytest.raises(ValueError, match="Unknown campaign type"):
get_client("nonexistent_type")
def test_supported_platforms_contains_reddit_post():
assert "reddit_post" in SUPPORTED_PLATFORMS

View file

@ -1,239 +0,0 @@
from unittest.mock import MagicMock, patch
from app.services.poster import _run_post
def _make_store(
*,
already_posted=False,
variant=None,
rules=None,
campaign_type="reddit_post",
subs=None,
):
store = MagicMock()
store.already_posted_this_week.return_value = already_posted
store.resolve_variant.return_value = variant or {
"id": 1,
"title": "Test Title",
"body": "Test body",
"flair": None,
}
store.get_sub_rules.return_value = rules
store.get_campaign.return_value = {
"id": 1,
"name": "Test",
"type": campaign_type,
"platform": "reddit",
}
store.create_post.return_value = {"id": 42}
store.update_post_status.return_value = {
"id": 42, "status": "success",
"url": "https://reddit.com/r/test/comments/abc/",
}
store.list_campaign_subs.return_value = subs or []
return store
def test_run_post_dispatches_by_campaign_type(tmp_path):
db = str(tmp_path / "test.db")
mock_store = _make_store()
mock_result = MagicMock()
mock_result.url = "https://reddit.com/r/test/comments/abc/"
mock_strategy = MagicMock()
mock_strategy.supports_dupe_guard.return_value = True
mock_strategy.execute.return_value = mock_result
with patch("app.services.poster.Store", return_value=mock_store):
with patch("app.services.poster.get_client", return_value=mock_strategy) as mock_get_client:
result = _run_post(db, campaign_id=1, target="selfhosted", triggered_by="manual")
mock_get_client.assert_called_once_with("reddit_post")
mock_strategy.execute.assert_called_once()
assert result["status"] == "success"
def test_run_post_skips_when_dupe_guard_fires(tmp_path):
db = str(tmp_path / "test.db")
mock_store = _make_store(already_posted=True)
mock_strategy = MagicMock()
mock_strategy.supports_dupe_guard.return_value = True
with patch("app.services.poster.Store", return_value=mock_store):
with patch("app.services.poster.get_client", return_value=mock_strategy):
result = _run_post(db, campaign_id=1, target="selfhosted", triggered_by="manual")
assert result["skipped"] is True
mock_strategy.execute.assert_not_called()
def test_run_post_skips_dupe_guard_when_strategy_opts_out(tmp_path):
db = str(tmp_path / "test.db")
mock_store = _make_store(already_posted=True, campaign_type="blog_post")
mock_result = MagicMock()
mock_result.url = "https://circuitforge.tech/blog/test-post"
mock_strategy = MagicMock()
mock_strategy.supports_dupe_guard.return_value = False
mock_strategy.execute.return_value = mock_result
with patch("app.services.poster.Store", return_value=mock_store):
with patch("app.services.poster.get_client", return_value=mock_strategy):
result = _run_post(db, campaign_id=1, target="blog", triggered_by="scheduler")
mock_strategy.execute.assert_called_once()
assert result["status"] == "success"
def test_run_post_skips_banned_sub(tmp_path):
db = str(tmp_path / "test.db")
mock_store = _make_store(rules={"promo_allowed": 0})
mock_strategy = MagicMock()
mock_strategy.supports_dupe_guard.return_value = True
with patch("app.services.poster.Store", return_value=mock_store):
with patch("app.services.poster.get_client", return_value=mock_strategy):
result = _run_post(db, campaign_id=1, target="ADHD", triggered_by="scheduler")
assert result["skipped"] is True
mock_strategy.execute.assert_not_called()
def test_run_post_unknown_type_skips(tmp_path):
db = str(tmp_path / "test.db")
mock_store = _make_store(campaign_type="future_platform")
with patch("app.services.poster.Store", return_value=mock_store):
with patch("app.services.poster.get_client", side_effect=ValueError("Unknown campaign type")):
result = _run_post(db, campaign_id=1, target="some_target", triggered_by="scheduler")
assert result["skipped"] is True
assert "Unknown campaign type" in result["reason"]
def test_occurrence_skip(tmp_path):
"""When occurrence is 'first_sunday' and today is NOT the first Sunday, post is skipped."""
db = str(tmp_path / "test.db")
# 2026-04-19 is a Sunday but the 3rd Sunday of April 2026
mock_store = _make_store(
campaign_type="reddit_comment",
subs=[{"sub": "selfhosted", "active": 1, "occurrence": "first_sunday"}],
)
mock_strategy = MagicMock()
mock_strategy.supports_dupe_guard.return_value = False
with patch("app.services.poster.Store", return_value=mock_store):
with patch("app.services.poster.get_client", return_value=mock_strategy):
with patch("app.services.poster.date") as mock_date:
from datetime import date as real_date
mock_date.today.return_value = real_date(2026, 4, 19)
result = _run_post(db, campaign_id=1, target="selfhosted", triggered_by="scheduler")
assert result["skipped"] is True
assert "occurrence" in result["reason"]
mock_strategy.execute.assert_not_called()
def test_occurrence_passes(tmp_path):
"""When occurrence is 'first_sunday' and today IS the first Sunday, post proceeds."""
db = str(tmp_path / "test.db")
# 2026-05-03 is the first Sunday of May 2026
mock_store = _make_store(
campaign_type="reddit_comment",
subs=[{"sub": "selfhosted", "active": 1, "occurrence": "first_sunday"}],
)
mock_result = MagicMock()
mock_result.url = "https://reddit.com/r/selfhosted/comments/abc/"
mock_strategy = MagicMock()
mock_strategy.supports_dupe_guard.return_value = False
mock_strategy.execute.return_value = mock_result
with patch("app.services.poster.Store", return_value=mock_store):
with patch("app.services.poster.get_client", return_value=mock_strategy):
with patch("app.services.poster.date") as mock_date:
from datetime import date as real_date
mock_date.today.return_value = real_date(2026, 5, 3)
result = _run_post(db, campaign_id=1, target="selfhosted", triggered_by="scheduler")
assert result.get("status") == "success"
mock_strategy.execute.assert_called_once()
def test_occurrence_every_passes_through(tmp_path):
"""occurrence='every' means post every time — no filtering, execute is called."""
db = str(tmp_path / "test.db")
# parse_occurrence("every") returns None → no filtering → execute runs
mock_store = _make_store(
campaign_type="reddit_comment",
subs=[{"sub": "selfhosted", "active": 1, "occurrence": "every"}],
)
mock_result = MagicMock()
mock_result.url = "https://reddit.com/r/selfhosted/comments/abc/"
mock_strategy = MagicMock()
mock_strategy.supports_dupe_guard.return_value = False
mock_strategy.execute.return_value = mock_result
with patch("app.services.poster.Store", return_value=mock_store):
with patch("app.services.poster.get_client", return_value=mock_strategy):
with patch("app.services.poster.date") as mock_date:
from datetime import date as real_date
mock_date.today.return_value = real_date(2026, 5, 3)
result = _run_post(db, campaign_id=1, target="selfhosted", triggered_by="scheduler")
assert result.get("status") == "success"
mock_strategy.execute.assert_called_once()
def test_occurrence_none_sub_row_key_passes_through(tmp_path):
"""Sub row exists but has no occurrence key — should post normally."""
db = str(tmp_path / "test.db")
# sub_row has no "occurrence" key → .get() returns None → parse_occurrence(None) returns None
mock_store = _make_store(
campaign_type="reddit_comment",
subs=[{"sub": "selfhosted", "active": 1}],
)
mock_result = MagicMock()
mock_result.url = "https://reddit.com/r/selfhosted/comments/abc/"
mock_strategy = MagicMock()
mock_strategy.supports_dupe_guard.return_value = False
mock_strategy.execute.return_value = mock_result
with patch("app.services.poster.Store", return_value=mock_store):
with patch("app.services.poster.get_client", return_value=mock_strategy):
with patch("app.services.poster.date") as mock_date:
from datetime import date as real_date
mock_date.today.return_value = real_date(2026, 5, 3)
result = _run_post(db, campaign_id=1, target="selfhosted", triggered_by="scheduler")
assert result.get("status") == "success"
mock_strategy.execute.assert_called_once()
def test_occurrence_invalid_string_skips(tmp_path):
"""Malformed occurrence string results in skipped=True, not a crash."""
db = str(tmp_path / "test.db")
# "weekly" is not a valid occurrence string — parse_occurrence raises ValueError
mock_store = _make_store(
campaign_type="reddit_comment",
subs=[{"sub": "selfhosted", "active": 1, "occurrence": "weekly"}],
)
mock_strategy = MagicMock()
mock_strategy.supports_dupe_guard.return_value = False
with patch("app.services.poster.Store", return_value=mock_store):
with patch("app.services.poster.get_client", return_value=mock_strategy):
with patch("app.services.poster.date") as mock_date:
from datetime import date as real_date
mock_date.today.return_value = real_date(2026, 5, 3)
result = _run_post(db, campaign_id=1, target="selfhosted", triggered_by="scheduler")
assert result.get("skipped") is True
assert "invalid occurrence" in result.get("reason", "")
mock_strategy.execute.assert_not_called()