diff --git a/app/api/endpoints/posts.py b/app/api/endpoints/posts.py index 4c956ab..abaa298 100644 --- a/app/api/endpoints/posts.py +++ b/app/api/endpoints/posts.py @@ -47,3 +47,12 @@ async def get_engagement(post_id: int): if result is None: raise HTTPException(404, "No engagement data for this post") return result + + +@router.post("/poll-engagement") +async def poll_engagement(): + """Manually trigger an engagement poll for all recent posts.""" + from app.services.engagement import poll_recent_posts + settings = get_settings() + result = await poll_recent_posts(settings.db_path) + return result diff --git a/app/db/store.py b/app/db/store.py index 1708009..fd57a48 100644 --- a/app/db/store.py +++ b/app/db/store.py @@ -282,15 +282,48 @@ class Store: limit: int = 50) -> list[dict]: clauses, params = [], [] if campaign_id is not None: - clauses.append("campaign_id = ?") + clauses.append("p.campaign_id = ?") params.append(campaign_id) if target is not None: - clauses.append("target = ?") + clauses.append("p.target = ?") params.append(target) where = f"WHERE {' AND '.join(clauses)}" if clauses else "" params.append(limit) + # LEFT JOIN latest engagement snapshot so the frontend avoids N+1 calls. return self._fetchall( - f"SELECT * FROM posts {where} ORDER BY posted_at DESC LIMIT ?", tuple(params) + f""" + SELECT p.*, + e.score, e.upvotes, e.comments AS comment_count, e.awards, + e.checked_at AS engagement_checked_at + FROM posts p + LEFT JOIN engagement e ON e.id = ( + SELECT id FROM engagement WHERE post_id = p.id ORDER BY checked_at DESC LIMIT 1 + ) + {where} + ORDER BY p.posted_at DESC LIMIT ? + """, + tuple(params), + ) + + def list_posts_needing_poll( + self, max_age_hours: int = 72, recheck_hours: int = 1 + ) -> list[dict]: + """Return recent successful Reddit posts that are due for an engagement check.""" + return self._fetchall( + """ + SELECT p.* FROM posts p + WHERE p.status = 'success' + AND p.platform = 'reddit' + AND p.url IS NOT NULL + AND p.posted_at >= datetime('now', ?) + AND ( + NOT EXISTS (SELECT 1 FROM engagement e WHERE e.post_id = p.id) + OR (SELECT MAX(checked_at) FROM engagement e WHERE e.post_id = p.id) + <= datetime('now', ?) + ) + ORDER BY p.posted_at ASC + """, + (f"-{max_age_hours} hours", f"-{recheck_hours} hours"), ) def create_post(self, campaign_id: int, target: str, variant_id: int | None = None, diff --git a/app/main.py b/app/main.py index b0ab2b8..05c665c 100644 --- a/app/main.py +++ b/app/main.py @@ -13,7 +13,7 @@ from app.core.logging_config import configure_logging from app.db.store import Store from app.services.scheduler import ( start_scheduler, stop_scheduler, sync_all_campaigns, - start_scraper_job, + start_scraper_job, start_engagement_job, ) configure_logging() @@ -47,6 +47,12 @@ async def lifespan(app: FastAPI): start_scraper_job(interval_mins=settings.scraper_interval_mins) logger.info("Signal scraper scheduled every %d min", settings.scraper_interval_mins) + # Start engagement polling job (always on; runs hourly) + if not settings.scheduler_enabled and not settings.scraper_enabled: + start_scheduler() + start_engagement_job(settings.db_path) + logger.info("Engagement poll scheduled hourly") + store.close() yield diff --git a/app/services/engagement.py b/app/services/engagement.py new file mode 100644 index 0000000..111edf9 --- /dev/null +++ b/app/services/engagement.py @@ -0,0 +1,92 @@ +""" +Engagement polling service. + +Periodically fetches score, comments, and awards for recent successful Reddit +posts and records snapshots in the engagement table. +""" +from __future__ import annotations + +import asyncio +import logging + +from app.db.store import Store + +logger = logging.getLogger(__name__) + + +def _poll_sync(db_path: str, max_age_hours: int = 72, recheck_hours: int = 1) -> dict: + store = Store(db_path) + try: + posts = store.list_posts_needing_poll(max_age_hours, recheck_hours) + if not posts: + logger.debug("Engagement poll: no posts due for a check") + return {"polled": 0, "errors": 0} + + # Import here to avoid circular deps; session validation deferred to first use. + from app.services.reddit.client import RedditClient + + try: + client = RedditClient() + except Exception: + logger.warning( + "Reddit session unavailable — engagement poll will run without auth" + ) + client = None + + polled = errors = 0 + for post in posts: + try: + if client is not None: + stats = client.fetch_stats(post["url"]) + else: + # Unauthenticated fallback: still works for public posts. + import re + import httpx + + match = re.search(r"/comments/([a-z0-9]+)/", post["url"]) + if not match: + continue + resp = httpx.get( + f"https://www.reddit.com/by_id/t3_{match.group(1)}.json", + headers={"User-Agent": "Mozilla/5.0 (X11; Linux x86_64) Chrome/124.0.0.0"}, + timeout=15, + ) + children = resp.json().get("data", {}).get("children", []) if resp.status_code == 200 else [] + if not children: + continue + d = children[0].get("data", {}) + stats = { + "score": d.get("score"), + "upvotes": d.get("ups"), + "comments": d.get("num_comments"), + "awards": d.get("total_awards_received", 0), + } + + if stats: + store.record_engagement( + post["id"], + score=stats.get("score"), + upvotes=stats.get("upvotes"), + comments=stats.get("comments"), + awards=stats.get("awards", 0), + ) + logger.debug( + "Post %d engagement: score=%s comments=%s", + post["id"], stats.get("score"), stats.get("comments"), + ) + polled += 1 + except Exception: + logger.exception("Failed to poll engagement for post %d", post["id"]) + errors += 1 + + logger.info("Engagement poll done: %d polled, %d errors", polled, errors) + return {"polled": polled, "errors": errors} + finally: + store.close() + + +async def poll_recent_posts( + db_path: str, max_age_hours: int = 72, recheck_hours: int = 1 +) -> dict: + """Async wrapper for the scheduler and API trigger.""" + return await asyncio.to_thread(_poll_sync, db_path, max_age_hours, recheck_hours) diff --git a/app/services/reddit/client.py b/app/services/reddit/client.py index a41a786..61ad8a4 100644 --- a/app/services/reddit/client.py +++ b/app/services/reddit/client.py @@ -124,6 +124,33 @@ class RedditClient: ) return permalink + def fetch_stats(self, url: str) -> dict | None: + """Fetch current score, upvotes, comments, and awards for a Reddit post URL.""" + import re + match = re.search(r"/comments/([a-z0-9]+)/", url) + if not match: + return None + post_id = match.group(1) + resp = httpx.get( + f"https://www.reddit.com/by_id/t3_{post_id}.json", + cookies=self.cookies, + headers=self.headers, + timeout=15, + ) + if resp.status_code != 200: + return None + data = resp.json() + children = data.get("data", {}).get("children", []) + if not children: + return None + post_data = children[0].get("data", {}) + return { + "score": post_data.get("score"), + "upvotes": post_data.get("ups"), + "comments": post_data.get("num_comments"), + "awards": post_data.get("total_awards_received", 0), + } + def delete(self, post_url: str) -> None: """Delete a post by URL.""" import re diff --git a/app/services/scheduler.py b/app/services/scheduler.py index 9089ece..5885638 100644 --- a/app/services/scheduler.py +++ b/app/services/scheduler.py @@ -171,3 +171,38 @@ def stop_scraper_job() -> None: if existing: existing.remove() logger.info("Signal scraper job removed") + + +# ------------------------------------------------------------------ # +# Engagement polling job +# ------------------------------------------------------------------ # + +_ENGAGEMENT_JOB_ID = "engagement_poll" + + +async def _run_engagement_job(db_path: str) -> None: + from app.services.engagement import poll_recent_posts + logger.info("Engagement poll job starting") + try: + result = await poll_recent_posts(db_path) + logger.info("Engagement poll done: %s", result) + except Exception: + logger.exception("Unhandled error in engagement poll job") + + +def start_engagement_job(db_path: str, interval_hours: int = 1) -> None: + """Register (or replace) the hourly engagement polling job.""" + sched = get_scheduler() + existing = sched.get_job(_ENGAGEMENT_JOB_ID) + if existing: + existing.remove() + + sched.add_job( + _run_engagement_job, + trigger=IntervalTrigger(hours=interval_hours, timezone="UTC"), + id=_ENGAGEMENT_JOB_ID, + args=[db_path], + replace_existing=True, + misfire_grace_time=600, + ) + logger.info("Engagement poll scheduled every %d hour(s)", interval_hours) diff --git a/frontend/src/components/PostsView.vue b/frontend/src/components/PostsView.vue index 9e5b18a..6d4d497 100644 --- a/frontend/src/components/PostsView.vue +++ b/frontend/src/components/PostsView.vue @@ -2,6 +2,9 @@
@@ -12,6 +15,8 @@ Target Status Triggered by + Score + Comments When Link @@ -25,14 +30,22 @@ {{ p.triggered_by }} - {{ formatDate(p.posted_at) }} + + {{ p.score }} + + + + {{ p.comment_count }} + + + {{ formatDate(p.posted_at) }} view → - + - No posts yet. + No posts yet. @@ -41,12 +54,16 @@ + + diff --git a/frontend/src/services/api.ts b/frontend/src/services/api.ts index 1850cb4..04e2aaa 100644 --- a/frontend/src/services/api.ts +++ b/frontend/src/services/api.ts @@ -76,6 +76,12 @@ export interface Post { screenshot_path: string | null triggered_by: string posted_at: string + // Engagement snapshot (null when no data has been collected yet) + score: number | null + upvotes: number | null + comment_count: number | null + awards: number | null + engagement_checked_at: string | null } export interface SubRules { @@ -274,6 +280,9 @@ export const api = { triggerSingle: (campaignId: number, sub: string) => http.post('/posts/trigger', { campaign_id: campaignId, sub }).then(r => r.data), + + pollEngagement: () => + http.post<{ polled: number; errors: number }>('/posts/poll-engagement').then(r => r.data), }, opportunities: {