magpie/app/services/poster.py
pyr0ball 81a63ab0ec refactor: dispatch poster by campaign.type via platform strategy registry
Replace hardcoded platform dispatch with get_client(campaign["type"]) so
any future campaign type (blog_post, email, etc.) routes automatically
through the strategy registry. Adds dupe-guard opt-out per strategy,
sub_row pre-fetch for extra metadata, and 5 new TDD tests (14 total).
2026-04-27 08:38:12 -07:00

105 lines
3.9 KiB
Python

"""
Posting service: orchestrates variant resolution, dupe guard, and post execution.
Called by the scheduler and by the manual-trigger API endpoint.
"""
from __future__ import annotations
import asyncio
import logging
from pathlib import Path
from app.core.config import get_settings
from app.db.store import Store
from app.services.platforms import get_client
logger = logging.getLogger(__name__)
def _run_post(db_path: str, campaign_id: int, target: str,
triggered_by: str = "scheduler") -> dict:
"""Execute a single post attempt (blocking, runs in a thread)."""
store = Store(db_path)
try:
campaign = store.get_campaign(campaign_id)
if campaign is None:
return {"skipped": True, "reason": f"campaign {campaign_id} not found"}
campaign_type = campaign.get("type", "reddit_post")
# Resolve strategy — skip if type is unknown
try:
strategy = get_client(campaign_type)
except ValueError as exc:
return {"skipped": True, "reason": str(exc)}
# Fetch the campaign_subs row for this target (used for extra + occurrence)
all_subs = store.list_campaign_subs(campaign_id)
sub_row = next((s for s in all_subs if s["sub"] == target), {})
# Dupe guard (opt-out allowed per strategy)
if strategy.supports_dupe_guard() and store.already_posted_this_week(campaign_id, target):
return {"skipped": True, "reason": f"already posted to {target!r} this week"}
# Resolve best content variant for this target
variant = store.resolve_variant(campaign_id, target)
if variant is None:
return {"skipped": True, "reason": "no variant found for campaign"}
# Check platform rules (Reddit-specific; harmless for other platforms)
rules = store.get_sub_rules(target)
if rules and rules.get("promo_allowed") == 0:
return {"skipped": True, "reason": f"{target!r} hard-bans promotion"}
# Create pending post record
post = store.create_post(
campaign_id=campaign_id,
target=target,
variant_id=variant["id"],
platform=campaign.get("platform", "reddit"),
triggered_by=triggered_by,
)
post_id = post["id"]
# Build extra dict from sub_row
extra = dict(sub_row)
# Execute strategy
flair = variant.get("flair") or (rules.get("flair_to_use") if rules else None)
try:
result = strategy.execute(
target=target,
title=variant.get("title", ""),
body=variant.get("body", ""),
flair=flair,
extra=extra,
)
return store.update_post_status(post_id, "success", url=result.url)
except Exception as exc:
logger.exception("Strategy %s failed for target %r", campaign_type, target)
return store.update_post_status(post_id, "failed", error_msg=str(exc))
finally:
store.close()
async def post_campaign_to_sub(campaign_id: int, target: str,
triggered_by: str = "scheduler") -> dict:
"""Async wrapper for API and scheduler use."""
db_path = get_settings().db_path
return await asyncio.to_thread(_run_post, db_path, campaign_id, target, triggered_by)
async def run_campaign(campaign_id: int, triggered_by: str = "scheduler") -> list[dict]:
"""Post a campaign to all of its configured targets, sequentially."""
db_path = get_settings().db_path
store = Store(db_path)
try:
subs = store.list_campaign_subs(campaign_id)
active_targets = [s["sub"] for s in subs if s.get("active", 1)]
finally:
store.close()
results = []
for target in active_targets:
result = await post_campaign_to_sub(campaign_id, target, triggered_by)
results.append(result)
return results