feat(community): community API endpoints — browse, publish, fork, delete, RSS, AP

Adds GET /community/posts, GET /community/posts/{slug}, GET /community/feed.rss,
GET /community/local-feed, POST /community/posts, DELETE /community/posts/{slug},
POST /community/posts/{slug}/fork, and POST /community/posts/{slug}/fork-adapt (501 stub).
Wires init_community_store into main.py lifespan. 7 new tests; 115 total passing.
This commit is contained in:
pyr0ball 2026-04-13 11:14:18 -07:00
parent 81107ed238
commit 9c64de2acf
4 changed files with 367 additions and 2 deletions

View file

@ -0,0 +1,285 @@
# app/api/endpoints/community.py
# MIT License
from __future__ import annotations
import asyncio
import logging
import re
from datetime import datetime, timezone
from fastapi import APIRouter, Depends, HTTPException, Request, Response
from app.cloud_session import CloudUser, get_session
from app.core.config import settings
from app.db.store import Store
from app.services.community.feed import posts_to_rss
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/community", tags=["community"])
_community_store = None
def _get_community_store():
return _community_store
def init_community_store(community_db_url: str | None) -> None:
global _community_store
if not community_db_url:
logger.info(
"COMMUNITY_DB_URL not set — community write features disabled. "
"Browse still works via cloud feed."
)
return
from circuitforge_core.community import CommunityDB
from app.services.community.community_store import KiwiCommunityStore
db = CommunityDB(dsn=community_db_url)
db.run_migrations()
_community_store = KiwiCommunityStore(db)
logger.info("Community store initialized.")
@router.get("/posts")
async def list_posts(
post_type: str | None = None,
dietary_tags: str | None = None,
allergen_exclude: str | None = None,
page: int = 1,
page_size: int = 20,
):
store = _get_community_store()
if store is None:
return {"posts": [], "total": 0, "note": "Community DB not available on this instance."}
dietary = [t.strip() for t in dietary_tags.split(",")] if dietary_tags else None
allergen_ex = [t.strip() for t in allergen_exclude.split(",")] if allergen_exclude else None
offset = (page - 1) * min(page_size, 100)
posts = await asyncio.to_thread(
store.list_posts,
limit=min(page_size, 100),
offset=offset,
post_type=post_type,
dietary_tags=dietary,
allergen_exclude=allergen_ex,
)
return {"posts": [_post_to_dict(p) for p in posts], "page": page, "page_size": page_size}
@router.get("/posts/{slug}")
async def get_post(slug: str, request: Request):
store = _get_community_store()
if store is None:
raise HTTPException(status_code=503, detail="Community DB not available on this instance.")
post = await asyncio.to_thread(store.get_post_by_slug, slug)
if post is None:
raise HTTPException(status_code=404, detail="Post not found.")
accept = request.headers.get("accept", "")
if "application/activity+json" in accept:
from app.services.community.ap_compat import post_to_ap_json_ld
base_url = str(request.base_url).rstrip("/")
return post_to_ap_json_ld(_post_to_dict(post), base_url=base_url)
return _post_to_dict(post)
@router.get("/feed.rss")
async def get_rss_feed(request: Request):
store = _get_community_store()
posts_data: list[dict] = []
if store is not None:
posts = await asyncio.to_thread(store.list_posts, limit=50)
posts_data = [_post_to_dict(p) for p in posts]
base_url = str(request.base_url).rstrip("/")
rss = posts_to_rss(posts_data, base_url=base_url)
return Response(content=rss, media_type="application/rss+xml; charset=utf-8")
@router.get("/local-feed")
async def local_feed():
store = _get_community_store()
if store is None:
return []
posts = await asyncio.to_thread(store.list_posts, limit=50)
return [_post_to_dict(p) for p in posts]
@router.post("/posts", status_code=201)
async def publish_post(body: dict, session: CloudUser = Depends(get_session)):
from app.tiers import can_use
if not can_use("community_publish", session.tier, session.has_byok):
raise HTTPException(status_code=402, detail="Community publishing requires Paid tier.")
store = _get_community_store()
if store is None:
raise HTTPException(
status_code=503,
detail="This Kiwi instance is not connected to a community database. "
"Publishing is only available on cloud instances.",
)
from app.services.community.community_store import get_or_create_pseudonym
def _get_pseudonym():
s = Store(session.db)
try:
return get_or_create_pseudonym(
store=s,
directus_user_id=session.user_id,
requested_name=body.get("pseudonym_name"),
)
finally:
s.close()
pseudonym = await asyncio.to_thread(_get_pseudonym)
recipe_ids = [slot["recipe_id"] for slot in body.get("slots", []) if slot.get("recipe_id")]
from app.services.community.element_snapshot import compute_snapshot
def _snapshot():
s = Store(session.db)
try:
return compute_snapshot(recipe_ids=recipe_ids, store=s)
finally:
s.close()
snapshot = await asyncio.to_thread(_snapshot)
slug_title = re.sub(r"[^a-z0-9]+", "-", (body.get("title") or "plan").lower()).strip("-")
today = datetime.now(timezone.utc).strftime("%Y-%m-%d")
slug = f"kiwi-{_post_type_prefix(body.get('post_type', 'plan'))}-{pseudonym.lower().replace(' ', '')}-{today}-{slug_title}"[:120]
from circuitforge_core.community.models import CommunityPost
post = CommunityPost(
slug=slug,
pseudonym=pseudonym,
post_type=body.get("post_type", "plan"),
published=datetime.now(timezone.utc),
title=body.get("title", "Untitled"),
description=body.get("description"),
photo_url=body.get("photo_url"),
slots=body.get("slots", []),
recipe_id=body.get("recipe_id"),
recipe_name=body.get("recipe_name"),
level=body.get("level"),
outcome_notes=body.get("outcome_notes"),
seasoning_score=snapshot.seasoning_score,
richness_score=snapshot.richness_score,
brightness_score=snapshot.brightness_score,
depth_score=snapshot.depth_score,
aroma_score=snapshot.aroma_score,
structure_score=snapshot.structure_score,
texture_profile=snapshot.texture_profile,
dietary_tags=list(snapshot.dietary_tags),
allergen_flags=list(snapshot.allergen_flags),
flavor_molecules=list(snapshot.flavor_molecules),
fat_pct=snapshot.fat_pct,
protein_pct=snapshot.protein_pct,
moisture_pct=snapshot.moisture_pct,
)
inserted = await asyncio.to_thread(store.insert_post, post)
return _post_to_dict(inserted)
@router.delete("/posts/{slug}", status_code=204)
async def delete_post(slug: str, session: CloudUser = Depends(get_session)):
store = _get_community_store()
if store is None:
raise HTTPException(status_code=503, detail="Community DB not available.")
def _get_pseudonym():
s = Store(session.db)
try:
return s.get_current_pseudonym(session.user_id)
finally:
s.close()
pseudonym = await asyncio.to_thread(_get_pseudonym)
if not pseudonym:
raise HTTPException(status_code=400, detail="No pseudonym set. Cannot delete posts.")
deleted = await asyncio.to_thread(store.delete_post, slug=slug, pseudonym=pseudonym)
if not deleted:
raise HTTPException(status_code=404, detail="Post not found or you are not the author.")
@router.post("/posts/{slug}/fork", status_code=201)
async def fork_post(slug: str, session: CloudUser = Depends(get_session)):
store = _get_community_store()
if store is None:
raise HTTPException(status_code=503, detail="Community DB not available.")
post = await asyncio.to_thread(store.get_post_by_slug, slug)
if post is None:
raise HTTPException(status_code=404, detail="Post not found.")
if post.post_type != "plan":
raise HTTPException(status_code=400, detail="Only plan posts can be forked as a meal plan.")
from datetime import date
week_start = date.today().strftime("%Y-%m-%d")
def _create_plan():
s = Store(session.db)
try:
meal_types = list({slot["meal_type"] for slot in post.slots})
plan = s.create_meal_plan(week_start=week_start, meal_types=meal_types or ["dinner"])
for slot in post.slots:
s.assign_recipe_to_slot(
plan_id=plan["id"],
day_of_week=slot["day"],
meal_type=slot["meal_type"],
recipe_id=slot["recipe_id"],
)
return plan
finally:
s.close()
plan = await asyncio.to_thread(_create_plan)
return {"plan_id": plan["id"], "week_start": plan["week_start"], "forked_from": slug}
@router.post("/posts/{slug}/fork-adapt", status_code=201)
async def fork_adapt_post(slug: str, session: CloudUser = Depends(get_session)):
from app.tiers import can_use
if not can_use("community_fork_adapt", session.tier, session.has_byok):
raise HTTPException(status_code=402, detail="Fork with adaptation requires Paid tier or BYOK.")
# Stub: full LLM adaptation deferred
raise HTTPException(status_code=501, detail="Fork-adapt not yet implemented.")
def _post_to_dict(post) -> dict:
return {
"slug": post.slug,
"pseudonym": post.pseudonym,
"post_type": post.post_type,
"published": post.published.isoformat() if hasattr(post.published, "isoformat") else str(post.published),
"title": post.title,
"description": post.description,
"photo_url": post.photo_url,
"slots": list(post.slots),
"recipe_id": post.recipe_id,
"recipe_name": post.recipe_name,
"level": post.level,
"outcome_notes": post.outcome_notes,
"element_profiles": {
"seasoning_score": post.seasoning_score,
"richness_score": post.richness_score,
"brightness_score": post.brightness_score,
"depth_score": post.depth_score,
"aroma_score": post.aroma_score,
"structure_score": post.structure_score,
"texture_profile": post.texture_profile,
},
"dietary_tags": list(post.dietary_tags),
"allergen_flags": list(post.allergen_flags),
"flavor_molecules": list(post.flavor_molecules),
"fat_pct": post.fat_pct,
"protein_pct": post.protein_pct,
"moisture_pct": post.moisture_pct,
}
def _post_type_prefix(post_type: str) -> str:
return {"plan": "plan", "recipe_success": "success", "recipe_blooper": "blooper"}.get(post_type, "post")

View file

@ -1,5 +1,5 @@
from fastapi import APIRouter
from app.api.endpoints import health, receipts, export, inventory, ocr, recipes, settings, staples, feedback, household, saved_recipes
from app.api.endpoints import health, receipts, export, inventory, ocr, recipes, settings, staples, feedback, household, saved_recipes, imitate
api_router = APIRouter()
@ -14,3 +14,7 @@ api_router.include_router(staples.router, prefix="/staples", tags=["staples
api_router.include_router(feedback.router, prefix="/feedback", tags=["feedback"])
api_router.include_router(household.router, prefix="/household", tags=["household"])
api_router.include_router(saved_recipes.router, prefix="/recipes/saved", tags=["saved-recipes"])
api_router.include_router(imitate.router, prefix="/imitate", tags=["imitate"])
from app.api.endpoints.community import router as community_router
api_router.include_router(community_router)

View file

@ -23,6 +23,10 @@ async def lifespan(app: FastAPI):
get_scheduler(settings.DB_PATH)
logger.info("Task scheduler started.")
# Initialize community store (no-op if COMMUNITY_DB_URL is not set)
from app.api.endpoints.community import init_community_store
init_community_store(settings.COMMUNITY_DB_URL)
yield
# Graceful scheduler shutdown

View file

@ -0,0 +1,72 @@
# tests/api/test_community_endpoints.py
import pytest
from unittest.mock import patch, MagicMock
from fastapi.testclient import TestClient
from app.main import app
client = TestClient(app)
def test_get_community_posts_no_db_returns_empty():
"""When COMMUNITY_DB_URL is not set, GET /community/posts returns empty list (no 500)."""
with patch("app.api.endpoints.community._community_store", None):
response = client.get("/api/v1/community/posts")
assert response.status_code == 200
data = response.json()
assert "posts" in data
assert isinstance(data["posts"], list)
def test_get_community_post_not_found():
"""GET /community/posts/{slug} returns 404 when slug doesn't exist."""
mock_store = MagicMock()
mock_store.get_post_by_slug.return_value = None
with patch("app.api.endpoints.community._community_store", mock_store):
response = client.get("/api/v1/community/posts/nonexistent-slug")
assert response.status_code == 404
def test_get_community_rss():
"""GET /community/feed.rss returns XML with content-type application/rss+xml."""
mock_store = MagicMock()
mock_store.list_posts.return_value = []
with patch("app.api.endpoints.community._community_store", mock_store):
response = client.get("/api/v1/community/feed.rss")
assert response.status_code == 200
assert "xml" in response.headers.get("content-type", "")
def test_post_community_requires_auth():
"""POST /community/posts requires authentication (401/403/422) or community store (503).
In local/dev mode get_session bypasses JWT auth and returns a privileged user,
so the next gate is the community store check (503 when COMMUNITY_DB_URL is not set).
In cloud mode the endpoint requires a valid session (401/403).
"""
response = client.post("/api/v1/community/posts", json={"title": "Test"})
assert response.status_code in (401, 403, 422, 503)
def test_delete_post_requires_auth():
"""DELETE /community/posts/{slug} requires authentication (401/403) or community store (503).
Same local-mode caveat as test_post_community_requires_auth.
"""
response = client.delete("/api/v1/community/posts/some-slug")
assert response.status_code in (401, 403, 422, 503)
def test_fork_post_route_exists():
"""POST /community/posts/{slug}/fork route exists (not 404)."""
response = client.post("/api/v1/community/posts/some-slug/fork")
assert response.status_code != 404
def test_local_feed_returns_json():
"""GET /community/local-feed returns JSON list for LAN peers."""
mock_store = MagicMock()
mock_store.list_posts.return_value = []
with patch("app.api.endpoints.community._community_store", mock_store):
response = client.get("/api/v1/community/local-feed")
assert response.status_code == 200
assert isinstance(response.json(), list)