magpie/app/api/endpoints/stats.py
Alan Weinstock a863960266 feat: structured logging, frontend error toasts, stats bar (#14 #15 #16)
#14 — Structured logging
- app/core/logging_config.py: configure_logging() sets stdout handler
  with timestamped format; called at import time in main.py
- Global FastAPI exception_handler logs 500s with full traceback
- opportunities.py: logger added; create/approve/mark-posted/dismiss
  each emit an info line so failures are traceable

#15 — Frontend error handling
- frontend/src/composables/useToast.ts: shared toast composable
  (error/success/info, auto-dismiss, module-level singleton)
- frontend/src/components/ToastList.vue: fixed-position overlay,
  theme-aware, accessible (role=alert, aria-live=polite)
- OpportunitiesView: all 6 async actions have catch + toast.error()
- CampaignDetail: onMounted + all 6 mutation functions wrapped

#16 — Aggregate stats
- app/api/endpoints/stats.py: GET /api/v1/stats — five aggregate
  queries (COUNT / GROUP BY) on one Store; returns posts totals, 7-day
  count, top communities, platform breakdown, and opportunity queue counts
- frontend/src/components/StatsBar.vue: slim header bar above
  router-view; chips for posts ok/failed/week, queue pending/approved/
  posted, top community; hides gracefully on API error
2026-05-25 15:02:15 -07:00

87 lines
2.9 KiB
Python

"""
Aggregate stats endpoint — counts across posts and opportunities.
Returns a single payload with everything the StatsBar needs.
"""
from __future__ import annotations
import asyncio
import logging
from fastapi import APIRouter
from app.core.config import get_settings
from app.db.store import Store
# Router for the stats endpoints: routes declared on it get the "/stats"
# prefix and the "stats" tag.
router = APIRouter(prefix="/stats", tags=["stats"])
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
def _counts_by(rows, key: str) -> dict:
    """Map each row's ``key`` column to its ``n`` count column."""
    return {row[key]: row["n"] for row in rows}


def _fetch_stats(db_path: str) -> dict:
    """Collect aggregate post and opportunity counts from the database.

    Opens a ``Store`` on ``db_path``, runs the aggregate queries below and
    always closes the store afterwards, even if a query raises.

    Args:
        db_path: Path handed to ``Store``.

    Returns:
        A dict with two sections:

        * ``"posts"`` — ``total``, ``success``, ``failed``, ``skipped``,
          ``last_7_days``, ``by_platform`` (platform -> successful post
          count) and ``top_communities`` (up to five
          ``{"community", "count"}`` entries, highest count first, ties
          ordered by community name).
        * ``"opportunities"`` — ``total`` plus one count per status:
          ``pending_review``, ``approved``, ``posted``, ``manual_posted``
          and ``dismissed``.

        A status with no rows is reported as ``0``. Each ``total`` sums
        every status the query returned, including ones not listed above.
    """
    store = Store(db_path)
    try:
        # --- Post counts ---
        # All-time by status.
        posts_by_status: dict[str, int] = _counts_by(
            store._fetchall(
                "SELECT status, COUNT(*) AS n FROM posts GROUP BY status"
            ),
            "status",
        )

        # Past 7 days total (any status).
        # NOTE(review): this is a plain comparison against the value of
        # SQLite's datetime('now', '-7 days'); it assumes posted_at is
        # stored in that same text format and timezone — confirm against
        # the code that writes posts.
        posts_7d = store._fetchone(
            "SELECT COUNT(*) AS n FROM posts WHERE posted_at >= datetime('now', '-7 days')"
        )

        # Top 5 communities by successful post count. The secondary sort
        # key makes the result deterministic: without it, which tied
        # communities make the cut (and in what order) is unspecified and
        # may differ between requests.
        top_communities = store._fetchall(
            """
            SELECT target, COUNT(*) AS n
            FROM posts
            WHERE status = 'success'
            GROUP BY target
            ORDER BY n DESC, target ASC
            LIMIT 5
            """
        )

        # Platform breakdown (success only).
        by_platform = _counts_by(
            store._fetchall(
                "SELECT platform, COUNT(*) AS n FROM posts WHERE status = 'success' GROUP BY platform"
            ),
            "platform",
        )

        # --- Opportunity counts ---
        opps_by_status: dict[str, int] = _counts_by(
            store._fetchall(
                "SELECT status, COUNT(*) AS n FROM opportunities GROUP BY status"
            ),
            "status",
        )

        return {
            "posts": {
                "total": sum(posts_by_status.values()),
                "success": posts_by_status.get("success", 0),
                "failed": posts_by_status.get("failed", 0),
                "skipped": posts_by_status.get("skipped", 0),
                # A falsy _fetchone result (presumably None when no row
                # comes back — verify against Store) counts as zero.
                "last_7_days": (posts_7d or {}).get("n", 0),
                "by_platform": by_platform,
                "top_communities": [
                    {"community": row["target"], "count": row["n"]}
                    for row in top_communities
                ],
            },
            "opportunities": {
                "total": sum(opps_by_status.values()),
                "pending_review": opps_by_status.get("pending_review", 0),
                "approved": opps_by_status.get("approved", 0),
                "posted": opps_by_status.get("posted", 0),
                "manual_posted": opps_by_status.get("manual_posted", 0),
                "dismissed": opps_by_status.get("dismissed", 0),
            },
        }
    finally:
        # Release the store whether or not the queries succeeded.
        store.close()
@router.get("")
async def get_stats() -> dict:
    """Return the aggregate post and opportunity counts.

    The database path is read from the application settings, and the
    queries themselves run in a worker thread via ``asyncio.to_thread``
    rather than on the event loop.
    """
    settings = get_settings()
    database_path = settings.db_path
    logger.debug("Fetching aggregate stats")
    payload = await asyncio.to_thread(_fetch_stats, database_path)
    return payload