feat(engagement): poll Reddit post metrics after posting (#6)
- Add RedditClient.fetch_stats() — fetches score/upvotes/comments/awards via by_id API - Add Store.list_posts_needing_poll() — selects successful Reddit posts not checked within recheck window - Add Store.list_posts() LEFT JOIN latest engagement snapshot (avoids N+1 on frontend) - Add app/services/engagement.py — poll_recent_posts() async service with unauthenticated fallback - Register hourly engagement poll job in APScheduler at startup - Add POST /posts/poll-engagement for manual triggers - Update Post interface with engagement fields (score, comment_count, awards, engagement_checked_at) - Add Score/Comments columns and poll button to PostsView Closes: #6
This commit is contained in:
parent
f90124dabe
commit
dfdde692b8
8 changed files with 260 additions and 8 deletions
|
|
@ -47,3 +47,12 @@ async def get_engagement(post_id: int):
|
|||
if result is None:
|
||||
raise HTTPException(404, "No engagement data for this post")
|
||||
return result
|
||||
|
||||
|
||||
@router.post("/poll-engagement")
|
||||
async def poll_engagement():
|
||||
"""Manually trigger an engagement poll for all recent posts."""
|
||||
from app.services.engagement import poll_recent_posts
|
||||
settings = get_settings()
|
||||
result = await poll_recent_posts(settings.db_path)
|
||||
return result
|
||||
|
|
|
|||
|
|
@ -282,15 +282,48 @@ class Store:
|
|||
limit: int = 50) -> list[dict]:
|
||||
clauses, params = [], []
|
||||
if campaign_id is not None:
|
||||
clauses.append("campaign_id = ?")
|
||||
clauses.append("p.campaign_id = ?")
|
||||
params.append(campaign_id)
|
||||
if target is not None:
|
||||
clauses.append("target = ?")
|
||||
clauses.append("p.target = ?")
|
||||
params.append(target)
|
||||
where = f"WHERE {' AND '.join(clauses)}" if clauses else ""
|
||||
params.append(limit)
|
||||
# LEFT JOIN latest engagement snapshot so the frontend avoids N+1 calls.
|
||||
return self._fetchall(
|
||||
f"SELECT * FROM posts {where} ORDER BY posted_at DESC LIMIT ?", tuple(params)
|
||||
f"""
|
||||
SELECT p.*,
|
||||
e.score, e.upvotes, e.comments AS comment_count, e.awards,
|
||||
e.checked_at AS engagement_checked_at
|
||||
FROM posts p
|
||||
LEFT JOIN engagement e ON e.id = (
|
||||
SELECT id FROM engagement WHERE post_id = p.id ORDER BY checked_at DESC LIMIT 1
|
||||
)
|
||||
{where}
|
||||
ORDER BY p.posted_at DESC LIMIT ?
|
||||
""",
|
||||
tuple(params),
|
||||
)
|
||||
|
||||
def list_posts_needing_poll(
|
||||
self, max_age_hours: int = 72, recheck_hours: int = 1
|
||||
) -> list[dict]:
|
||||
"""Return recent successful Reddit posts that are due for an engagement check."""
|
||||
return self._fetchall(
|
||||
"""
|
||||
SELECT p.* FROM posts p
|
||||
WHERE p.status = 'success'
|
||||
AND p.platform = 'reddit'
|
||||
AND p.url IS NOT NULL
|
||||
AND p.posted_at >= datetime('now', ?)
|
||||
AND (
|
||||
NOT EXISTS (SELECT 1 FROM engagement e WHERE e.post_id = p.id)
|
||||
OR (SELECT MAX(checked_at) FROM engagement e WHERE e.post_id = p.id)
|
||||
<= datetime('now', ?)
|
||||
)
|
||||
ORDER BY p.posted_at ASC
|
||||
""",
|
||||
(f"-{max_age_hours} hours", f"-{recheck_hours} hours"),
|
||||
)
|
||||
|
||||
def create_post(self, campaign_id: int, target: str, variant_id: int | None = None,
|
||||
|
|
|
|||
|
|
@ -13,7 +13,7 @@ from app.core.logging_config import configure_logging
|
|||
from app.db.store import Store
|
||||
from app.services.scheduler import (
|
||||
start_scheduler, stop_scheduler, sync_all_campaigns,
|
||||
start_scraper_job,
|
||||
start_scraper_job, start_engagement_job,
|
||||
)
|
||||
|
||||
configure_logging()
|
||||
|
|
@ -47,6 +47,12 @@ async def lifespan(app: FastAPI):
|
|||
start_scraper_job(interval_mins=settings.scraper_interval_mins)
|
||||
logger.info("Signal scraper scheduled every %d min", settings.scraper_interval_mins)
|
||||
|
||||
# Start engagement polling job (always on; runs hourly)
|
||||
if not settings.scheduler_enabled and not settings.scraper_enabled:
|
||||
start_scheduler()
|
||||
start_engagement_job(settings.db_path)
|
||||
logger.info("Engagement poll scheduled hourly")
|
||||
|
||||
store.close()
|
||||
yield
|
||||
|
||||
|
|
|
|||
92
app/services/engagement.py
Normal file
92
app/services/engagement.py
Normal file
|
|
@ -0,0 +1,92 @@
|
|||
"""
|
||||
Engagement polling service.
|
||||
|
||||
Periodically fetches score, comments, and awards for recent successful Reddit
|
||||
posts and records snapshots in the engagement table.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
|
||||
from app.db.store import Store
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _poll_sync(db_path: str, max_age_hours: int = 72, recheck_hours: int = 1) -> dict:
|
||||
store = Store(db_path)
|
||||
try:
|
||||
posts = store.list_posts_needing_poll(max_age_hours, recheck_hours)
|
||||
if not posts:
|
||||
logger.debug("Engagement poll: no posts due for a check")
|
||||
return {"polled": 0, "errors": 0}
|
||||
|
||||
# Import here to avoid circular deps; session validation deferred to first use.
|
||||
from app.services.reddit.client import RedditClient
|
||||
|
||||
try:
|
||||
client = RedditClient()
|
||||
except Exception:
|
||||
logger.warning(
|
||||
"Reddit session unavailable — engagement poll will run without auth"
|
||||
)
|
||||
client = None
|
||||
|
||||
polled = errors = 0
|
||||
for post in posts:
|
||||
try:
|
||||
if client is not None:
|
||||
stats = client.fetch_stats(post["url"])
|
||||
else:
|
||||
# Unauthenticated fallback: still works for public posts.
|
||||
import re
|
||||
import httpx
|
||||
|
||||
match = re.search(r"/comments/([a-z0-9]+)/", post["url"])
|
||||
if not match:
|
||||
continue
|
||||
resp = httpx.get(
|
||||
f"https://www.reddit.com/by_id/t3_{match.group(1)}.json",
|
||||
headers={"User-Agent": "Mozilla/5.0 (X11; Linux x86_64) Chrome/124.0.0.0"},
|
||||
timeout=15,
|
||||
)
|
||||
children = resp.json().get("data", {}).get("children", []) if resp.status_code == 200 else []
|
||||
if not children:
|
||||
continue
|
||||
d = children[0].get("data", {})
|
||||
stats = {
|
||||
"score": d.get("score"),
|
||||
"upvotes": d.get("ups"),
|
||||
"comments": d.get("num_comments"),
|
||||
"awards": d.get("total_awards_received", 0),
|
||||
}
|
||||
|
||||
if stats:
|
||||
store.record_engagement(
|
||||
post["id"],
|
||||
score=stats.get("score"),
|
||||
upvotes=stats.get("upvotes"),
|
||||
comments=stats.get("comments"),
|
||||
awards=stats.get("awards", 0),
|
||||
)
|
||||
logger.debug(
|
||||
"Post %d engagement: score=%s comments=%s",
|
||||
post["id"], stats.get("score"), stats.get("comments"),
|
||||
)
|
||||
polled += 1
|
||||
except Exception:
|
||||
logger.exception("Failed to poll engagement for post %d", post["id"])
|
||||
errors += 1
|
||||
|
||||
logger.info("Engagement poll done: %d polled, %d errors", polled, errors)
|
||||
return {"polled": polled, "errors": errors}
|
||||
finally:
|
||||
store.close()
|
||||
|
||||
|
||||
async def poll_recent_posts(
|
||||
db_path: str, max_age_hours: int = 72, recheck_hours: int = 1
|
||||
) -> dict:
|
||||
"""Async wrapper for the scheduler and API trigger."""
|
||||
return await asyncio.to_thread(_poll_sync, db_path, max_age_hours, recheck_hours)
|
||||
|
|
@ -124,6 +124,33 @@ class RedditClient:
|
|||
)
|
||||
return permalink
|
||||
|
||||
def fetch_stats(self, url: str) -> dict | None:
|
||||
"""Fetch current score, upvotes, comments, and awards for a Reddit post URL."""
|
||||
import re
|
||||
match = re.search(r"/comments/([a-z0-9]+)/", url)
|
||||
if not match:
|
||||
return None
|
||||
post_id = match.group(1)
|
||||
resp = httpx.get(
|
||||
f"https://www.reddit.com/by_id/t3_{post_id}.json",
|
||||
cookies=self.cookies,
|
||||
headers=self.headers,
|
||||
timeout=15,
|
||||
)
|
||||
if resp.status_code != 200:
|
||||
return None
|
||||
data = resp.json()
|
||||
children = data.get("data", {}).get("children", [])
|
||||
if not children:
|
||||
return None
|
||||
post_data = children[0].get("data", {})
|
||||
return {
|
||||
"score": post_data.get("score"),
|
||||
"upvotes": post_data.get("ups"),
|
||||
"comments": post_data.get("num_comments"),
|
||||
"awards": post_data.get("total_awards_received", 0),
|
||||
}
|
||||
|
||||
def delete(self, post_url: str) -> None:
|
||||
"""Delete a post by URL."""
|
||||
import re
|
||||
|
|
|
|||
|
|
@ -171,3 +171,38 @@ def stop_scraper_job() -> None:
|
|||
if existing:
|
||||
existing.remove()
|
||||
logger.info("Signal scraper job removed")
|
||||
|
||||
|
||||
# ------------------------------------------------------------------ #
|
||||
# Engagement polling job
|
||||
# ------------------------------------------------------------------ #
|
||||
|
||||
_ENGAGEMENT_JOB_ID = "engagement_poll"
|
||||
|
||||
|
||||
async def _run_engagement_job(db_path: str) -> None:
|
||||
from app.services.engagement import poll_recent_posts
|
||||
logger.info("Engagement poll job starting")
|
||||
try:
|
||||
result = await poll_recent_posts(db_path)
|
||||
logger.info("Engagement poll done: %s", result)
|
||||
except Exception:
|
||||
logger.exception("Unhandled error in engagement poll job")
|
||||
|
||||
|
||||
def start_engagement_job(db_path: str, interval_hours: int = 1) -> None:
|
||||
"""Register (or replace) the hourly engagement polling job."""
|
||||
sched = get_scheduler()
|
||||
existing = sched.get_job(_ENGAGEMENT_JOB_ID)
|
||||
if existing:
|
||||
existing.remove()
|
||||
|
||||
sched.add_job(
|
||||
_run_engagement_job,
|
||||
trigger=IntervalTrigger(hours=interval_hours, timezone="UTC"),
|
||||
id=_ENGAGEMENT_JOB_ID,
|
||||
args=[db_path],
|
||||
replace_existing=True,
|
||||
misfire_grace_time=600,
|
||||
)
|
||||
logger.info("Engagement poll scheduled every %d hour(s)", interval_hours)
|
||||
|
|
|
|||
|
|
@ -2,6 +2,9 @@
|
|||
<div>
|
||||
<div class="page-header">
|
||||
<h1 class="page-title">Post History</h1>
|
||||
<button class="btn btn-secondary" :disabled="polling" @click="doPoll">
|
||||
{{ polling ? 'Polling…' : '↻ Poll Engagement' }}
|
||||
</button>
|
||||
</div>
|
||||
|
||||
<div class="card" style="padding: 0; overflow: hidden;">
|
||||
|
|
@ -12,6 +15,8 @@
|
|||
<th>Target</th>
|
||||
<th>Status</th>
|
||||
<th>Triggered by</th>
|
||||
<th>Score</th>
|
||||
<th>Comments</th>
|
||||
<th>When</th>
|
||||
<th>Link</th>
|
||||
</tr>
|
||||
|
|
@ -25,14 +30,22 @@
|
|||
<span v-if="p.error_msg" :title="p.error_msg" style="color: var(--color-danger); cursor: help;"> ⚠</span>
|
||||
</td>
|
||||
<td data-label="Triggered by"><span class="badge badge-muted">{{ p.triggered_by }}</span></td>
|
||||
<td data-label="When" style="color: var(--color-text-muted); font-size: 12px;">{{ formatDate(p.posted_at) }}</td>
|
||||
<td data-label="Score" class="engagement-cell">
|
||||
<span v-if="p.score != null">{{ p.score }}</span>
|
||||
<span v-else class="muted">—</span>
|
||||
</td>
|
||||
<td data-label="Comments" class="engagement-cell">
|
||||
<span v-if="p.comment_count != null">{{ p.comment_count }}</span>
|
||||
<span v-else class="muted">—</span>
|
||||
</td>
|
||||
<td data-label="When" class="muted" style="font-size: 12px;">{{ formatDate(p.posted_at) }}</td>
|
||||
<td data-label="Link">
|
||||
<a v-if="p.url" :href="p.url" target="_blank" style="color: var(--color-primary); font-size: 12px;">view →</a>
|
||||
<span v-else style="color: var(--color-text-muted);">—</span>
|
||||
<span v-else class="muted">—</span>
|
||||
</td>
|
||||
</tr>
|
||||
<tr v-if="posts.length === 0">
|
||||
<td colspan="6" class="empty-state">No posts yet.</td>
|
||||
<td colspan="8" class="empty-state">No posts yet.</td>
|
||||
</tr>
|
||||
</tbody>
|
||||
</table>
|
||||
|
|
@ -41,12 +54,16 @@
|
|||
</template>
|
||||
|
||||
<script setup lang="ts">
|
||||
import { onMounted } from 'vue'
|
||||
import { ref, onMounted } from 'vue'
|
||||
import { useToast } from '@/composables/useToast'
|
||||
import { usePostStore, useCampaignStore } from '@/stores/campaigns'
|
||||
import { api } from '@/services/api'
|
||||
|
||||
const postStore = usePostStore()
|
||||
const campaignStore = useCampaignStore()
|
||||
const posts = postStore.posts
|
||||
const toast = useToast()
|
||||
const polling = ref(false)
|
||||
|
||||
onMounted(async () => {
|
||||
await Promise.all([
|
||||
|
|
@ -64,4 +81,28 @@ function formatDate(iso: string) {
|
|||
const d = new Date(iso + 'Z')
|
||||
return d.toLocaleDateString(undefined, { month: 'short', day: 'numeric', hour: '2-digit', minute: '2-digit' })
|
||||
}
|
||||
|
||||
async function doPoll() {
|
||||
polling.value = true
|
||||
try {
|
||||
const result = await api.posts.pollEngagement()
|
||||
toast.success(`Polled ${result.polled} post(s)${result.errors ? ` — ${result.errors} error(s)` : ''}`)
|
||||
await postStore.fetchPosts(undefined, undefined, 100)
|
||||
} catch {
|
||||
toast.error('Engagement poll failed')
|
||||
} finally {
|
||||
polling.value = false
|
||||
}
|
||||
}
|
||||
</script>
|
||||
|
||||
<style scoped>
|
||||
.engagement-cell {
|
||||
text-align: right;
|
||||
font-variant-numeric: tabular-nums;
|
||||
font-size: 13px;
|
||||
}
|
||||
.muted {
|
||||
color: var(--color-text-muted);
|
||||
}
|
||||
</style>
|
||||
|
|
|
|||
|
|
@ -76,6 +76,12 @@ export interface Post {
|
|||
screenshot_path: string | null
|
||||
triggered_by: string
|
||||
posted_at: string
|
||||
// Engagement snapshot (null when no data has been collected yet)
|
||||
score: number | null
|
||||
upvotes: number | null
|
||||
comment_count: number | null
|
||||
awards: number | null
|
||||
engagement_checked_at: string | null
|
||||
}
|
||||
|
||||
export interface SubRules {
|
||||
|
|
@ -274,6 +280,9 @@ export const api = {
|
|||
|
||||
triggerSingle: (campaignId: number, sub: string) =>
|
||||
http.post<Post>('/posts/trigger', { campaign_id: campaignId, sub }).then(r => r.data),
|
||||
|
||||
pollEngagement: () =>
|
||||
http.post<{ polled: number; errors: number }>('/posts/poll-engagement').then(r => r.data),
|
||||
},
|
||||
|
||||
opportunities: {
|
||||
|
|
|
|||
Loading…
Reference in a new issue