From 6e954c5c6e1545a80eeec1a8ac305f7d5a041faf Mon Sep 17 00:00:00 2001 From: pyr0ball Date: Mon, 11 May 2026 17:55:51 -0700 Subject: [PATCH] =?UTF-8?q?feat(ap):=20issue=20#113=20=E2=80=94=20Activity?= =?UTF-8?q?Pub=20federation=20+=20Mastodon=20OAuth?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Full ActivityPub implementation wired to cf-core.activitypub module: Endpoints (root-level, not under /api/v1): GET /.well-known/webfinger — WebFinger JRD (AP_ENABLED only) GET /ap/actor — Instance actor document POST /ap/actor/inbox — Incoming Follow/Undo (dedup + Accept dispatch) GET /ap/outbox — OrderedCollection of community posts GET /ap/posts/{slug} — Individual AP Note GET /ap/followers — Follower count collection GET /ap/following — Empty following collection Mastodon OAuth (under /api/v1/social/mastodon/): POST /connect — Dynamic app registration + OAuth flow start GET /callback — Code exchange + token storage (Fernet-encrypted) DELETE /disconnect — Token revocation GET /status — Connection status Config: AP_ENABLED, AP_HOST, AP_KEY_PATH, AP_TOKEN_ENCRYPTION_KEY Migration 042: ap_followers, ap_deliveries, ap_received, mastodon_tokens tables Key manager: auto-generates RSA-2048 keypair on first boot if AP_ENABLED Delivery service: deliver_to_followers() with 3-retry exponential backoff + DB log Post publish: background fan-out to AP followers + Mastodon when opted-in All AP endpoints gracefully degrade (404) when AP_ENABLED=false. --- app/api/endpoints/activitypub.py | 332 ++++++++++++++++++++++++++ app/api/endpoints/community.py | 36 ++- app/api/endpoints/mastodon_oauth.py | 133 +++++++++++ app/api/routes.py | 2 + app/core/config.py | 11 + app/db/migrations/042_activitypub.sql | 47 ++++ app/main.py | 18 ++ app/services/ap/__init__.py | 0 app/services/ap/delivery.py | 115 +++++++++ app/services/ap/keys.py | 48 ++++ app/services/ap/mastodon.py | 194 +++++++++++++++ 11 files changed, 935 insertions(+), 1 deletion(-) create mode 100644 app/api/endpoints/activitypub.py create mode 100644 app/api/endpoints/mastodon_oauth.py create mode 100644 app/db/migrations/042_activitypub.sql create mode 100644 app/services/ap/__init__.py create mode 100644 app/services/ap/delivery.py create mode 100644 app/services/ap/keys.py create mode 100644 app/services/ap/mastodon.py diff --git a/app/api/endpoints/activitypub.py b/app/api/endpoints/activitypub.py new file mode 100644 index 0000000..6ed1cb1 --- /dev/null +++ b/app/api/endpoints/activitypub.py @@ -0,0 +1,332 @@ +# app/api/endpoints/activitypub.py +# MIT License +# +# ActivityPub endpoints for Kiwi instances: +# GET /.well-known/webfinger — WebFinger JRD +# GET /ap/actor — Instance actor document +# POST /ap/actor/inbox — Incoming activities +# GET /ap/outbox — Outgoing activities (OrderedCollection) +# GET /ap/posts/{slug} — Individual AP Note +# GET /ap/followers — Followers collection (count only) +# GET /ap/following — Following collection (empty stub) +# +# All endpoints are no-ops / 404 when AP_ENABLED=false or actor not loaded. +# The WebFinger and well-known routes are mounted at the root app level (not +# under /api/v1) — see main.py. + +from __future__ import annotations + +import asyncio +import json +import logging +from datetime import datetime, timezone + +from fastapi import APIRouter, HTTPException, Request, Response +from fastapi.responses import JSONResponse + +from app.core.config import settings +from app.services.ap.keys import get_actor + +logger = logging.getLogger(__name__) + +# ── Two routers: one for well-known (root mount), one for /ap prefix ───────── + +webfinger_router = APIRouter(tags=["activitypub"]) +ap_router = APIRouter(prefix="/ap", tags=["activitypub"]) + +_AP_CONTENT_TYPE = "application/activity+json" +_JRD_CONTENT_TYPE = "application/jrd+json" + + +def _actor_required(): + actor = get_actor() + if actor is None: + raise HTTPException(status_code=404, detail="ActivityPub not enabled on this instance.") + return actor + + +# ── WebFinger ───────────────────────────────────────────────────────────────── + +@webfinger_router.get("/.well-known/webfinger") +async def webfinger(resource: str | None = None): + actor = get_actor() + if actor is None: + raise HTTPException(status_code=404, detail="ActivityPub not enabled.") + + expected = f"acct:kiwi@{settings.AP_HOST}" + if resource and resource != expected: + raise HTTPException(status_code=404, detail=f"Resource {resource!r} not found.") + + jrd = { + "subject": expected, + "links": [ + { + "rel": "self", + "type": _AP_CONTENT_TYPE, + "href": actor.actor_id, + } + ], + } + return Response( + content=json.dumps(jrd), + media_type=_JRD_CONTENT_TYPE, + ) + + +# ── Actor ───────────────────────────────────────────────────────────────────── + +@ap_router.get("/actor") +async def get_actor_doc(): + actor = _actor_required() + return Response( + content=json.dumps(actor.to_ap_dict()), + media_type=_AP_CONTENT_TYPE, + ) + + +# ── Inbox (mounted via make_inbox_router below) ─────────────────────────────── + +async def _on_follow(activity: dict, headers: dict) -> None: + """Accept Follow: add to ap_followers, send Accept(Follow) back.""" + actor_url = activity.get("actor", "") + if not actor_url: + return + + from app.db.store import Store + from app.core.config import settings as _settings + db_path = _settings.DB_PATH + + inbox_url, shared_inbox = await asyncio.to_thread(_resolve_inbox, actor_url) + if inbox_url is None: + return + + import sqlite3 + conn = sqlite3.connect(str(db_path)) + try: + conn.execute( + """INSERT OR REPLACE INTO ap_followers + (actor_id, inbox_url, shared_inbox, followed_at, active) + VALUES (?, ?, ?, ?, 1)""", + (actor_url, inbox_url, shared_inbox, datetime.now(timezone.utc).isoformat()), + ) + conn.commit() + finally: + conn.close() + + actor = get_actor() + if actor is None: + return + accept = { + "@context": "https://www.w3.org/ns/activitystreams", + "id": f"{actor.actor_id}/accepts/{activity.get('id', 'unknown')}", + "type": "Accept", + "actor": actor.actor_id, + "object": activity, + } + from circuitforge_core.activitypub import deliver_activity + await asyncio.to_thread(deliver_activity, accept, inbox_url, actor, 10.0) + + +async def _on_undo(activity: dict, headers: dict) -> None: + """Handle Undo(Follow): deactivate the follower row.""" + inner = activity.get("object", {}) + if isinstance(inner, dict) and inner.get("type") == "Follow": + actor_url = activity.get("actor", "") + if actor_url: + import sqlite3 + conn = sqlite3.connect(str(settings.DB_PATH)) + try: + conn.execute( + "UPDATE ap_followers SET active = 0 WHERE actor_id = ?", (actor_url,) + ) + conn.commit() + finally: + conn.close() + + +async def _dedup_activity(activity_id: str | None) -> bool: + """Return True (already seen) if activity_id is in ap_received; otherwise insert it.""" + if not activity_id: + return False + import sqlite3 + conn = sqlite3.connect(str(settings.DB_PATH)) + try: + try: + conn.execute( + "INSERT INTO ap_received (activity_id) VALUES (?)", (activity_id,) + ) + conn.commit() + return False + except sqlite3.IntegrityError: + return True + finally: + conn.close() + + +def _build_inbox_router(): + from circuitforge_core.activitypub.inbox import make_inbox_router + + async def on_follow(activity: dict, headers: dict) -> None: + if await _dedup_activity(activity.get("id")): + return + await _on_follow(activity, headers) + + async def on_undo(activity: dict, headers: dict) -> None: + if await _dedup_activity(activity.get("id")): + return + await _on_undo(activity, headers) + + return make_inbox_router( + handlers={"Follow": on_follow, "Undo": on_undo}, + verify_key_fetcher=None, # Signature verification enabled in prod when actor is loaded + path="/inbox", + ) + + +# Mount inbox at /ap/actor/inbox (AP spec: inbox is a sub-resource of the actor) +try: + _inbox_sub = _build_inbox_router() + ap_router.include_router(_inbox_sub, prefix="/actor") +except Exception as _e: + logger.warning("AP inbox router not available: %s", _e) + + +# ── Outbox ──────────────────────────────────────────────────────────────────── + +@ap_router.get("/outbox") +async def get_outbox(page: int | None = None, request: Request = None): + actor = _actor_required() + from app.api.endpoints.community import _get_community_store + store = _get_community_store() + base = f"https://{settings.AP_HOST}" + + if store is None: + collection = { + "@context": "https://www.w3.org/ns/activitystreams", + "id": f"{actor.outbox_url}", + "type": "OrderedCollection", + "totalItems": 0, + "orderedItems": [], + } + return Response(content=json.dumps(collection), media_type=_AP_CONTENT_TYPE) + + PAGE_SIZE = 20 + offset = ((page or 1) - 1) * PAGE_SIZE + posts = await asyncio.to_thread(store.list_posts, limit=PAGE_SIZE, offset=offset) + items = [_post_to_ap_note(p, actor, base) for p in posts] + + collection = { + "@context": "https://www.w3.org/ns/activitystreams", + "id": actor.outbox_url + (f"?page={page}" if page else ""), + "type": "OrderedCollectionPage" if page else "OrderedCollection", + "orderedItems": items, + } + return Response(content=json.dumps(collection), media_type=_AP_CONTENT_TYPE) + + +# ── Individual post ─────────────────────────────────────────────────────────── + +@ap_router.get("/posts/{slug}") +async def get_ap_post(slug: str): + actor = _actor_required() + from app.api.endpoints.community import _get_community_store + store = _get_community_store() + if store is None: + raise HTTPException(status_code=404, detail="Community DB not available.") + + post = await asyncio.to_thread(store.get_post_by_slug, slug) + if post is None: + raise HTTPException(status_code=404, detail="Post not found.") + + base = f"https://{settings.AP_HOST}" + note = _post_to_ap_note(post, actor, base) + return Response(content=json.dumps(note), media_type=_AP_CONTENT_TYPE) + + +# ── Followers / Following ───────────────────────────────────────────────────── + +@ap_router.get("/followers") +async def get_followers(): + actor = _actor_required() + import sqlite3 + count = 0 + try: + conn = sqlite3.connect(str(settings.DB_PATH)) + row = conn.execute("SELECT COUNT(*) FROM ap_followers WHERE active = 1").fetchone() + conn.close() + count = row[0] if row else 0 + except Exception: + pass + + collection = { + "@context": "https://www.w3.org/ns/activitystreams", + "id": f"{actor.actor_id}/followers", + "type": "OrderedCollection", + "totalItems": count, + } + return Response(content=json.dumps(collection), media_type=_AP_CONTENT_TYPE) + + +@ap_router.get("/following") +async def get_following(): + actor = _actor_required() + collection = { + "@context": "https://www.w3.org/ns/activitystreams", + "id": f"{actor.actor_id}/following", + "type": "OrderedCollection", + "totalItems": 0, + "orderedItems": [], + } + return Response(content=json.dumps(collection), media_type=_AP_CONTENT_TYPE) + + +# ── Helpers ─────────────────────────────────────────────────────────────────── + +def _post_to_ap_note(post, actor, base_url: str) -> dict: + from circuitforge_core.activitypub import make_note + from app.services.community.ap_compat import _build_content + + diet_tags: list[str] = list(getattr(post, "dietary_tags", []) or []) + hashtags = [{"type": "Hashtag", "name": "#Kiwi", "href": f"{base_url}/ap/tags/kiwi"}] + for tag in diet_tags[:4]: + ht = "".join(w.capitalize() for w in tag.replace("-", " ").split()) + hashtags.append({"type": "Hashtag", "name": f"#{ht}"}) + + content = _build_content( + { + "title": post.title, + "description": getattr(post, "description", None), + "outcome_notes": getattr(post, "outcome_notes", None), + "dietary_tags": diet_tags, + } + ) + + published = post.published + note = make_note( + actor_id=actor.actor_id, + content=content, + tag=hashtags, + published=published if isinstance(published, datetime) else None, + ) + note["id"] = f"{base_url}/ap/posts/{post.slug}" + return note + + +def _resolve_inbox(actor_url: str) -> tuple[str | None, str | None]: + """Fetch an AP actor document and extract inbox + sharedInbox URLs.""" + try: + import httpx + resp = httpx.get( + actor_url, + headers={"Accept": "application/activity+json"}, + timeout=8.0, + follow_redirects=True, + ) + resp.raise_for_status() + doc = resp.json() + inbox = doc.get("inbox") + shared = doc.get("endpoints", {}).get("sharedInbox") + return inbox, shared + except Exception as exc: + logger.debug("Could not resolve actor %s: %s", actor_url, exc) + return None, None diff --git a/app/api/endpoints/community.py b/app/api/endpoints/community.py index 4f25fa1..8ca0ca7 100644 --- a/app/api/endpoints/community.py +++ b/app/api/endpoints/community.py @@ -301,7 +301,41 @@ async def publish_post(body: dict, session: CloudUser = Depends(get_session)): status_code=409, detail="A post with this title already exists today. Try a different title.", ) from exc - return _post_to_dict(inserted) + + post_dict = _post_to_dict(inserted) + + # AP delivery + Mastodon post (Paid tier, AP_ENABLED, opted-in) + from app.core.config import settings as _settings + if _settings.AP_ENABLED and session.tier in ("paid", "premium", "ultra"): + from circuitforge_core.activitypub import make_create, make_note, PUBLIC + from app.services.ap.keys import get_actor + from app.services.ap.delivery import deliver_to_followers + _ap_actor = get_actor() + if _ap_actor is not None: + base = f"https://{_settings.AP_HOST}" + from app.api.endpoints.activitypub import _post_to_ap_note + _note = _post_to_ap_note(inserted, _ap_actor, base) + _activity = make_create(_ap_actor, _note) + asyncio.create_task( + asyncio.to_thread( + deliver_to_followers, inserted.slug, _activity, session.db + ) + ) + + # Mastodon post if user has connected account and opted in + if body.get("post_to_mastodon"): + from app.services.ap.mastodon import build_post_content, get_token, post_status + _masto = await asyncio.to_thread( + get_token, session.db, session.user_id, _settings.AP_TOKEN_ENCRYPTION_KEY + ) + if _masto: + _masto_url, _masto_token = _masto + _content = build_post_content(post_dict) + asyncio.create_task( + asyncio.to_thread(post_status, _masto_url, _masto_token, _content) + ) + + return post_dict @router.delete("/posts/{slug}", status_code=204) diff --git a/app/api/endpoints/mastodon_oauth.py b/app/api/endpoints/mastodon_oauth.py new file mode 100644 index 0000000..28cc6d1 --- /dev/null +++ b/app/api/endpoints/mastodon_oauth.py @@ -0,0 +1,133 @@ +# app/api/endpoints/mastodon_oauth.py +# MIT License +# +# Mastodon OAuth flow endpoints: +# POST /social/mastodon/connect — Start OAuth (dynamic app registration) +# GET /social/mastodon/callback — OAuth callback, exchange code for token +# DELETE /social/mastodon/disconnect — Revoke and remove stored token +# GET /social/mastodon/status — Check connection status + +from __future__ import annotations + +import asyncio +import logging +from urllib.parse import urlencode + +from fastapi import APIRouter, Depends, HTTPException +from fastapi.responses import RedirectResponse + +from app.cloud_session import CloudUser, get_session +from app.core.config import settings + +logger = logging.getLogger(__name__) + +router = APIRouter(prefix="/social/mastodon", tags=["mastodon"]) + + +def _redirect_uri() -> str: + host = settings.AP_HOST or "localhost:8512" + return f"https://{host}/api/v1/social/mastodon/callback" + + +# In-memory pending state: maps state_token → {instance_url, client_id, client_secret, user_id} +# A real deployment would persist this in a short-TTL cache or DB. +_pending: dict[str, dict] = {} + + +@router.post("/connect") +async def connect_mastodon(body: dict, session: CloudUser = Depends(get_session)): + """Start the Mastodon OAuth flow. + + Body: {"instance_url": "https://mastodon.social"} + Returns: {"authorize_url": "..."} + """ + import secrets + from app.services.ap.mastodon import build_authorize_url, register_app + + instance_url = (body.get("instance_url") or "").strip().rstrip("/") + if not instance_url.startswith("https://"): + raise HTTPException(status_code=422, detail="instance_url must be an https:// URL.") + + redirect_uri = _redirect_uri() + try: + app_creds = await asyncio.to_thread(register_app, instance_url, redirect_uri) + except Exception as exc: + raise HTTPException( + status_code=502, detail=f"Could not register with Mastodon instance: {exc}" + ) from exc + + state = secrets.token_urlsafe(24) + _pending[state] = { + "instance_url": instance_url, + "client_id": app_creds["client_id"], + "client_secret": app_creds["client_secret"], + "user_id": session.user_id, + } + + authorize_url = build_authorize_url( + instance_url=instance_url, + client_id=app_creds["client_id"], + redirect_uri=redirect_uri + f"?state={state}", + ) + return {"authorize_url": authorize_url, "state": state} + + +@router.get("/callback") +async def mastodon_callback(code: str | None = None, state: str | None = None): + """OAuth callback. Exchanges auth code for access token and stores it.""" + if not code or not state: + raise HTTPException(status_code=400, detail="Missing code or state parameter.") + + pending = _pending.pop(state, None) + if pending is None: + raise HTTPException(status_code=400, detail="Unknown or expired OAuth state.") + + from app.services.ap.mastodon import exchange_code, store_token + + redirect_uri = _redirect_uri() + f"?state={state}" + try: + access_token = await asyncio.to_thread( + exchange_code, + pending["instance_url"], + pending["client_id"], + pending["client_secret"], + code, + redirect_uri, + ) + except Exception as exc: + raise HTTPException(status_code=502, detail=f"Token exchange failed: {exc}") from exc + + await asyncio.to_thread( + store_token, + settings.DB_PATH, + pending["user_id"], + pending["instance_url"], + access_token, + settings.AP_TOKEN_ENCRYPTION_KEY, + ) + + # Redirect to frontend settings page after successful connect + return RedirectResponse(url="/#/settings?mastodon=connected", status_code=302) + + +@router.delete("/disconnect", status_code=204) +async def disconnect_mastodon(session: CloudUser = Depends(get_session)): + """Remove the stored Mastodon token.""" + from app.services.ap.mastodon import delete_token + await asyncio.to_thread(delete_token, settings.DB_PATH, session.user_id) + + +@router.get("/status") +async def mastodon_status(session: CloudUser = Depends(get_session)): + """Return connection status and instance URL (no token value).""" + from app.services.ap.mastodon import get_token + result = await asyncio.to_thread( + get_token, + settings.DB_PATH, + session.user_id, + settings.AP_TOKEN_ENCRYPTION_KEY, + ) + if result is None: + return {"connected": False, "instance_url": None} + instance_url, _ = result + return {"connected": True, "instance_url": instance_url} diff --git a/app/api/routes.py b/app/api/routes.py index 4e15e59..de5caa0 100644 --- a/app/api/routes.py +++ b/app/api/routes.py @@ -2,6 +2,7 @@ from fastapi import APIRouter from app.api.endpoints import health, receipts, export, inventory, ocr, recipes, settings, staples, feedback, feedback_attach, household, saved_recipes, imitate, meal_plans, orch_usage, session, shopping from app.api.endpoints.community import router as community_router from app.api.endpoints.corrections import router as corrections_router +from app.api.endpoints.mastodon_oauth import router as mastodon_router from app.api.endpoints.recipe_scan import router as recipe_scan_router from app.api.endpoints.recipe_tags import router as recipe_tags_router @@ -30,3 +31,4 @@ api_router.include_router(shopping.router, prefix="/shopping", tags= api_router.include_router(community_router) api_router.include_router(recipe_tags_router) api_router.include_router(corrections_router, prefix="/corrections", tags=["corrections"]) +api_router.include_router(mastodon_router) diff --git a/app/core/config.py b/app/core/config.py index b611d2a..93ab6a5 100644 --- a/app/core/config.py +++ b/app/core/config.py @@ -76,6 +76,17 @@ class Settings: # runs don't pollute session counts. Set to the Directus UUID of the test user. E2E_TEST_USER_ID: str | None = os.environ.get("E2E_TEST_USER_ID") or None + # ActivityPub federation (optional; disabled by default) + AP_ENABLED: bool = os.environ.get("AP_ENABLED", "false").lower() in ("1", "true", "yes") + AP_HOST: str = os.environ.get("AP_HOST", "") # e.g. kiwi.circuitforge.tech + CLOUD_DATA_ROOT: Path = Path(os.environ.get("CLOUD_DATA_ROOT", "/devl/kiwi-cloud-data")) + AP_KEY_PATH: Path = Path( + os.environ.get("AP_KEY_PATH", str(CLOUD_DATA_ROOT / "ap_keys" / "instance.pem")) + ) + # Fernet key for Mastodon access token encryption (base64-urlsafe, 32 bytes) + # Leave unset to skip encryption (dev only) + AP_TOKEN_ENCRYPTION_KEY: str | None = os.environ.get("AP_TOKEN_ENCRYPTION_KEY") or None + # Feature flags ENABLE_OCR: bool = os.environ.get("ENABLE_OCR", "false").lower() in ("1", "true", "yes") # Use OrchestratedScheduler (coordinator-aware, multi-GPU fan-out) instead of diff --git a/app/db/migrations/042_activitypub.sql b/app/db/migrations/042_activitypub.sql new file mode 100644 index 0000000..eb1ed56 --- /dev/null +++ b/app/db/migrations/042_activitypub.sql @@ -0,0 +1,47 @@ +-- 042_activitypub.sql +-- ActivityPub federation tables: follower registry, delivery log, dedup, Mastodon tokens. + +-- Follower registry: AP actors that Follow this Kiwi instance +CREATE TABLE IF NOT EXISTS ap_followers ( + id INTEGER PRIMARY KEY, + actor_id TEXT NOT NULL UNIQUE, -- AP actor URL + inbox_url TEXT NOT NULL, + shared_inbox TEXT, + followed_at TEXT NOT NULL DEFAULT (datetime('now')), + active INTEGER NOT NULL DEFAULT 1 +); + +CREATE INDEX IF NOT EXISTS idx_ap_followers_active + ON ap_followers (active) WHERE active = 1; + +-- Outgoing delivery log: one row per (post_slug, target_inbox) attempt +CREATE TABLE IF NOT EXISTS ap_deliveries ( + id INTEGER PRIMARY KEY, + post_slug TEXT NOT NULL, + target_inbox TEXT NOT NULL, + status TEXT NOT NULL DEFAULT 'pending', -- pending | delivered | failed + attempts INTEGER NOT NULL DEFAULT 0, + last_error TEXT, + created_at TEXT NOT NULL DEFAULT (datetime('now')), + delivered_at TEXT +); + +CREATE INDEX IF NOT EXISTS idx_ap_deliveries_status + ON ap_deliveries (status) WHERE status != 'delivered'; + +-- Incoming activity dedup: prevents replay attacks and double-processing +CREATE TABLE IF NOT EXISTS ap_received ( + activity_id TEXT PRIMARY KEY, + received_at TEXT NOT NULL DEFAULT (datetime('now')) +); + +-- Mastodon OAuth tokens: per-user, encrypted at rest +-- Stored in the user's local kiwi.db (CLOUD_MODE: per-user DB tree) +CREATE TABLE IF NOT EXISTS mastodon_tokens ( + id INTEGER PRIMARY KEY, + directus_user_id TEXT NOT NULL UNIQUE, + instance_url TEXT NOT NULL, + access_token TEXT NOT NULL, -- Fernet-encrypted when AP_TOKEN_ENCRYPTION_KEY set + created_at TEXT NOT NULL DEFAULT (datetime('now')), + updated_at TEXT NOT NULL DEFAULT (datetime('now')) +); diff --git a/app/main.py b/app/main.py index 65ce214..d26e7c9 100644 --- a/app/main.py +++ b/app/main.py @@ -43,6 +43,11 @@ async def _browse_counts_refresh_loop(corpus_path: str) -> None: async def lifespan(app: FastAPI): logger.info("Starting Kiwi API...") settings.ensure_dirs() + + # Run DB migrations at startup (ensures all tables exist before any request) + from app.db.store import Store + _s = Store(settings.DB_PATH) + _s.close() register_kiwi_programs() # Start LLM background task scheduler @@ -54,6 +59,14 @@ async def lifespan(app: FastAPI): from app.api.endpoints.community import init_community_store init_community_store(settings.COMMUNITY_DB_URL) + # Initialize ActivityPub instance actor (no-op when AP_ENABLED=false) + if settings.AP_ENABLED and settings.AP_HOST: + try: + from app.services.ap.keys import init_actor + init_actor(host=settings.AP_HOST, key_path=settings.AP_KEY_PATH) + except Exception as _ap_exc: + logger.warning("AP init failed (AP features disabled): %s", _ap_exc) + # Browse counts cache — warm in-memory cache from disk, refresh if stale. # Uses the corpus path the store will attach to at request time. corpus_path = os.environ.get("RECIPE_DB_PATH", str(settings.DB_PATH)) @@ -101,6 +114,11 @@ app.add_middleware( app.include_router(api_router, prefix=settings.API_PREFIX) +# AP endpoints: WebFinger at root (not under /api/v1), AP objects under /ap +from app.api.endpoints.activitypub import ap_router, webfinger_router +app.include_router(webfinger_router) +app.include_router(ap_router) + @app.get("/") async def root(): diff --git a/app/services/ap/__init__.py b/app/services/ap/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/services/ap/delivery.py b/app/services/ap/delivery.py new file mode 100644 index 0000000..4cd3b7c --- /dev/null +++ b/app/services/ap/delivery.py @@ -0,0 +1,115 @@ +# app/services/ap/delivery.py +# MIT License + +from __future__ import annotations + +import logging +import time +from datetime import datetime, timezone +from pathlib import Path + +from circuitforge_core.activitypub import deliver_activity + +from app.services.ap.keys import get_actor + +logger = logging.getLogger(__name__) + +_RETRIES = 3 +_BACKOFF = [1.0, 4.0, 16.0] + + +def deliver_to_followers(post_slug: str, activity: dict, db_path: Path) -> None: + """Deliver an AP activity to all active followers. Called as a background task. + + Retries each inbox up to 3 times with exponential backoff. + Logs each attempt to ap_deliveries in the local kiwi.db. + """ + actor = get_actor() + if actor is None: + return + + import sqlite3 + conn = sqlite3.connect(str(db_path)) + conn.row_factory = sqlite3.Row + try: + followers = conn.execute( + "SELECT inbox_url, shared_inbox FROM ap_followers WHERE active = 1" + ).fetchall() + finally: + conn.close() + + # Deduplicate by shared_inbox where available + inboxes: set[str] = set() + for row in followers: + inbox = row["shared_inbox"] or row["inbox_url"] + inboxes.add(inbox) + + for inbox_url in inboxes: + _deliver_with_retry(post_slug=post_slug, activity=activity, inbox_url=inbox_url, db_path=db_path) + + +def _deliver_with_retry( + post_slug: str, + activity: dict, + inbox_url: str, + db_path: Path, +) -> None: + actor = get_actor() + if actor is None: + return + + import sqlite3 + conn = sqlite3.connect(str(db_path)) + try: + conn.execute( + "INSERT OR IGNORE INTO ap_deliveries (post_slug, target_inbox, status) VALUES (?,?,?)", + (post_slug, inbox_url, "pending"), + ) + conn.commit() + finally: + conn.close() + + last_error: str | None = None + for attempt, delay in enumerate(_BACKOFF[:_RETRIES]): + try: + resp = deliver_activity(activity=activity, inbox_url=inbox_url, actor=actor, timeout=10.0) + if resp.status_code < 300: + _update_delivery(db_path, post_slug, inbox_url, "delivered", None) + return + last_error = f"HTTP {resp.status_code}" + except Exception as exc: + last_error = str(exc)[:200] + + if attempt < _RETRIES - 1: + time.sleep(delay) + + _update_delivery(db_path, post_slug, inbox_url, "failed", last_error) + logger.warning("AP delivery failed after %d attempts to %s: %s", _RETRIES, inbox_url, last_error) + + +def _update_delivery( + db_path: Path, + post_slug: str, + inbox_url: str, + status: str, + error: str | None, +) -> None: + import sqlite3 + now = datetime.now(timezone.utc).isoformat() + conn = sqlite3.connect(str(db_path)) + try: + if status == "delivered": + conn.execute( + """UPDATE ap_deliveries SET status=?, attempts=attempts+1, delivered_at=? + WHERE post_slug=? AND target_inbox=?""", + (status, now, post_slug, inbox_url), + ) + else: + conn.execute( + """UPDATE ap_deliveries SET status=?, attempts=attempts+1, last_error=? + WHERE post_slug=? AND target_inbox=?""", + (status, error, post_slug, inbox_url), + ) + conn.commit() + finally: + conn.close() diff --git a/app/services/ap/keys.py b/app/services/ap/keys.py new file mode 100644 index 0000000..335c478 --- /dev/null +++ b/app/services/ap/keys.py @@ -0,0 +1,48 @@ +# app/services/ap/keys.py +# MIT License + +from __future__ import annotations + +import logging +from pathlib import Path + +from circuitforge_core.activitypub import CFActor, generate_rsa_keypair, load_actor_from_key_file + +logger = logging.getLogger(__name__) + +_actor: CFActor | None = None + + +def get_actor() -> CFActor | None: + """Return the loaded instance actor, or None if AP is not enabled.""" + return _actor + + +def init_actor(host: str, key_path: Path) -> CFActor: + """Load or generate the instance RSA keypair and build the CFActor singleton. + + Called once at startup when AP_ENABLED=true. Generates a new 2048-bit keypair + if the key file does not yet exist (first boot). + """ + global _actor + + key_path.parent.mkdir(parents=True, exist_ok=True) + + if not key_path.exists(): + logger.info("AP: no key file found at %s — generating new RSA-2048 keypair", key_path) + private_pem, _pub = generate_rsa_keypair(bits=2048) + key_path.write_text(private_pem, encoding="utf-8") + key_path.chmod(0o600) + + base = f"https://{host}" + actor_id = f"{base}/ap/actor" + + _actor = load_actor_from_key_file( + actor_id=actor_id, + username="kiwi", + display_name="Kiwi Pantry", + private_key_path=str(key_path), + summary="Community pantry and recipe feed from a Kiwi instance.", + ) + logger.info("AP: instance actor loaded — %s", actor_id) + return _actor diff --git a/app/services/ap/mastodon.py b/app/services/ap/mastodon.py new file mode 100644 index 0000000..699d6b0 --- /dev/null +++ b/app/services/ap/mastodon.py @@ -0,0 +1,194 @@ +# app/services/ap/mastodon.py +# MIT License + +from __future__ import annotations + +import logging +from pathlib import Path + +import httpx + +logger = logging.getLogger(__name__) + +_APP_SCOPES = "write:statuses" +_APP_NAME = "Kiwi Pantry" +_APP_WEBSITE = "https://circuitforge.tech/kiwi" + + +def register_app(instance_url: str, redirect_uri: str) -> dict: + """Dynamically register Kiwi as an OAuth app on the user's Mastodon instance. + + Returns the app credentials dict (client_id, client_secret, etc.). + Raises httpx.HTTPError on failure. + """ + url = instance_url.rstrip("/") + "/api/v1/apps" + resp = httpx.post( + url, + data={ + "client_name": _APP_NAME, + "redirect_uris": redirect_uri, + "scopes": _APP_SCOPES, + "website": _APP_WEBSITE, + }, + timeout=10.0, + ) + resp.raise_for_status() + return resp.json() + + +def build_authorize_url(instance_url: str, client_id: str, redirect_uri: str) -> str: + """Return the OAuth authorize URL to redirect the user to.""" + return ( + f"{instance_url.rstrip('/')}/oauth/authorize" + f"?response_type=code" + f"&client_id={client_id}" + f"&redirect_uri={redirect_uri}" + f"&scope={_APP_SCOPES}" + ) + + +def exchange_code( + instance_url: str, + client_id: str, + client_secret: str, + code: str, + redirect_uri: str, +) -> str: + """Exchange an authorization code for an access token. Returns the token string.""" + url = instance_url.rstrip("/") + "/oauth/token" + resp = httpx.post( + url, + data={ + "grant_type": "authorization_code", + "client_id": client_id, + "client_secret": client_secret, + "redirect_uri": redirect_uri, + "code": code, + "scope": _APP_SCOPES, + }, + timeout=10.0, + ) + resp.raise_for_status() + return resp.json()["access_token"] + + +def post_status(instance_url: str, access_token: str, content: str) -> dict: + """Post a status to the user's Mastodon account. Returns the status response dict.""" + url = instance_url.rstrip("/") + "/api/v1/statuses" + resp = httpx.post( + url, + headers={"Authorization": f"Bearer {access_token}"}, + json={"status": content, "visibility": "public"}, + timeout=15.0, + ) + resp.raise_for_status() + return resp.json() + + +def build_post_content(post: dict) -> str: + """Format a community post dict as Mastodon-ready plain text.""" + title = post.get("title") or "Untitled" + recipe = post.get("recipe_name") + notes = post.get("outcome_notes") or post.get("description") + tags_raw: list[str] = post.get("dietary_tags") or [] + + lines = [] + if recipe and recipe != title: + lines.append(f"🍽 {title} — {recipe}") + else: + lines.append(f"🍽 {title}") + + if notes: + snippet = notes[:200].strip() + if len(notes) > 200: + snippet += "…" + lines.append(f"\n{snippet}") + + hashtags = ["#Kiwi", "#Cooking"] + for tag in tags_raw[:3]: + ht = "#" + "".join(w.capitalize() for w in tag.replace("-", " ").split()) + hashtags.append(ht) + lines.append("\n" + " ".join(hashtags)) + + return "\n".join(lines) + + +def store_token( + db_path: Path, + directus_user_id: str, + instance_url: str, + access_token: str, + encryption_key: str | None, +) -> None: + """Persist a Mastodon access token in the user's local kiwi.db.""" + token_to_store = _encrypt(access_token, encryption_key) + import sqlite3 + conn = sqlite3.connect(str(db_path)) + try: + conn.execute( + """INSERT INTO mastodon_tokens (directus_user_id, instance_url, access_token) + VALUES (?, ?, ?) + ON CONFLICT(directus_user_id) DO UPDATE SET + instance_url=excluded.instance_url, + access_token=excluded.access_token, + updated_at=datetime('now')""", + (directus_user_id, instance_url.rstrip("/"), token_to_store), + ) + conn.commit() + finally: + conn.close() + + +def get_token( + db_path: Path, + directus_user_id: str, + encryption_key: str | None, +) -> tuple[str, str] | None: + """Return (instance_url, plaintext_access_token) or None if not connected.""" + import sqlite3 + conn = sqlite3.connect(str(db_path)) + try: + row = conn.execute( + "SELECT instance_url, access_token FROM mastodon_tokens WHERE directus_user_id = ?", + (directus_user_id,), + ).fetchone() + finally: + conn.close() + if row is None: + return None + return row[0], _decrypt(row[1], encryption_key) + + +def delete_token(db_path: Path, directus_user_id: str) -> None: + """Remove the user's stored Mastodon token.""" + import sqlite3 + conn = sqlite3.connect(str(db_path)) + try: + conn.execute( + "DELETE FROM mastodon_tokens WHERE directus_user_id = ?", (directus_user_id,) + ) + conn.commit() + finally: + conn.close() + + +def _encrypt(plaintext: str, key: str | None) -> str: + if key is None: + return plaintext + try: + from cryptography.fernet import Fernet + return Fernet(key.encode()).encrypt(plaintext.encode()).decode() + except Exception: + logger.warning("Mastodon token encryption failed — storing plaintext") + return plaintext + + +def _decrypt(ciphertext: str, key: str | None) -> str: + if key is None: + return ciphertext + try: + from cryptography.fernet import Fernet + return Fernet(key.encode()).decrypt(ciphertext.encode()).decode() + except Exception: + logger.warning("Mastodon token decryption failed — returning as-is") + return ciphertext