feat(ap): issue #113 — ActivityPub federation + Mastodon OAuth
Some checks failed
CI / Backend (Python) (push) Has been cancelled
CI / Frontend (Vue) (push) Has been cancelled
Mirror / mirror (push) Has been cancelled

Full ActivityPub implementation wired to cf-core.activitypub module:

Endpoints (root-level, not under /api/v1):
  GET  /.well-known/webfinger  — WebFinger JRD (AP_ENABLED only)
  GET  /ap/actor               — Instance actor document
  POST /ap/actor/inbox         — Incoming Follow/Undo (dedup + Accept dispatch)
  GET  /ap/outbox              — OrderedCollection of community posts
  GET  /ap/posts/{slug}        — Individual AP Note
  GET  /ap/followers           — Follower count collection
  GET  /ap/following           — Empty following collection

Mastodon OAuth (under /api/v1/social/mastodon/):
  POST   /connect      — Dynamic app registration + OAuth flow start
  GET    /callback     — Code exchange + token storage (Fernet-encrypted)
  DELETE /disconnect   — Token revocation
  GET    /status       — Connection status

Config: AP_ENABLED, AP_HOST, AP_KEY_PATH, AP_TOKEN_ENCRYPTION_KEY
Migration 042: ap_followers, ap_deliveries, ap_received, mastodon_tokens tables
Key manager: auto-generates RSA-2048 keypair on first boot if AP_ENABLED
Delivery service: deliver_to_followers() with 3-retry exponential backoff + DB log
Post publish: background fan-out to AP followers + Mastodon when opted-in

All AP endpoints gracefully degrade (404) when AP_ENABLED=false.
This commit is contained in:
pyr0ball 2026-05-11 17:55:51 -07:00
parent ef04064728
commit 6e954c5c6e
11 changed files with 935 additions and 1 deletions

View file

@ -0,0 +1,332 @@
# app/api/endpoints/activitypub.py
# MIT License
#
# ActivityPub endpoints for Kiwi instances:
# GET /.well-known/webfinger — WebFinger JRD
# GET /ap/actor — Instance actor document
# POST /ap/actor/inbox — Incoming activities
# GET /ap/outbox — Outgoing activities (OrderedCollection)
# GET /ap/posts/{slug} — Individual AP Note
# GET /ap/followers — Followers collection (count only)
# GET /ap/following — Following collection (empty stub)
#
# All endpoints are no-ops / 404 when AP_ENABLED=false or actor not loaded.
# The WebFinger and well-known routes are mounted at the root app level (not
# under /api/v1) — see main.py.
from __future__ import annotations
import asyncio
import json
import logging
from datetime import datetime, timezone
from fastapi import APIRouter, HTTPException, Request, Response
from fastapi.responses import JSONResponse
from app.core.config import settings
from app.services.ap.keys import get_actor
logger = logging.getLogger(__name__)
# ── Two routers: one for well-known (root mount), one for /ap prefix ─────────
webfinger_router = APIRouter(tags=["activitypub"])
ap_router = APIRouter(prefix="/ap", tags=["activitypub"])
_AP_CONTENT_TYPE = "application/activity+json"
_JRD_CONTENT_TYPE = "application/jrd+json"
def _actor_required():
actor = get_actor()
if actor is None:
raise HTTPException(status_code=404, detail="ActivityPub not enabled on this instance.")
return actor
# ── WebFinger ─────────────────────────────────────────────────────────────────
@webfinger_router.get("/.well-known/webfinger")
async def webfinger(resource: str | None = None):
actor = get_actor()
if actor is None:
raise HTTPException(status_code=404, detail="ActivityPub not enabled.")
expected = f"acct:kiwi@{settings.AP_HOST}"
if resource and resource != expected:
raise HTTPException(status_code=404, detail=f"Resource {resource!r} not found.")
jrd = {
"subject": expected,
"links": [
{
"rel": "self",
"type": _AP_CONTENT_TYPE,
"href": actor.actor_id,
}
],
}
return Response(
content=json.dumps(jrd),
media_type=_JRD_CONTENT_TYPE,
)
# ── Actor ─────────────────────────────────────────────────────────────────────
@ap_router.get("/actor")
async def get_actor_doc():
actor = _actor_required()
return Response(
content=json.dumps(actor.to_ap_dict()),
media_type=_AP_CONTENT_TYPE,
)
# ── Inbox (mounted via make_inbox_router below) ───────────────────────────────
async def _on_follow(activity: dict, headers: dict) -> None:
"""Accept Follow: add to ap_followers, send Accept(Follow) back."""
actor_url = activity.get("actor", "")
if not actor_url:
return
from app.db.store import Store
from app.core.config import settings as _settings
db_path = _settings.DB_PATH
inbox_url, shared_inbox = await asyncio.to_thread(_resolve_inbox, actor_url)
if inbox_url is None:
return
import sqlite3
conn = sqlite3.connect(str(db_path))
try:
conn.execute(
"""INSERT OR REPLACE INTO ap_followers
(actor_id, inbox_url, shared_inbox, followed_at, active)
VALUES (?, ?, ?, ?, 1)""",
(actor_url, inbox_url, shared_inbox, datetime.now(timezone.utc).isoformat()),
)
conn.commit()
finally:
conn.close()
actor = get_actor()
if actor is None:
return
accept = {
"@context": "https://www.w3.org/ns/activitystreams",
"id": f"{actor.actor_id}/accepts/{activity.get('id', 'unknown')}",
"type": "Accept",
"actor": actor.actor_id,
"object": activity,
}
from circuitforge_core.activitypub import deliver_activity
await asyncio.to_thread(deliver_activity, accept, inbox_url, actor, 10.0)
async def _on_undo(activity: dict, headers: dict) -> None:
"""Handle Undo(Follow): deactivate the follower row."""
inner = activity.get("object", {})
if isinstance(inner, dict) and inner.get("type") == "Follow":
actor_url = activity.get("actor", "")
if actor_url:
import sqlite3
conn = sqlite3.connect(str(settings.DB_PATH))
try:
conn.execute(
"UPDATE ap_followers SET active = 0 WHERE actor_id = ?", (actor_url,)
)
conn.commit()
finally:
conn.close()
async def _dedup_activity(activity_id: str | None) -> bool:
"""Return True (already seen) if activity_id is in ap_received; otherwise insert it."""
if not activity_id:
return False
import sqlite3
conn = sqlite3.connect(str(settings.DB_PATH))
try:
try:
conn.execute(
"INSERT INTO ap_received (activity_id) VALUES (?)", (activity_id,)
)
conn.commit()
return False
except sqlite3.IntegrityError:
return True
finally:
conn.close()
def _build_inbox_router():
from circuitforge_core.activitypub.inbox import make_inbox_router
async def on_follow(activity: dict, headers: dict) -> None:
if await _dedup_activity(activity.get("id")):
return
await _on_follow(activity, headers)
async def on_undo(activity: dict, headers: dict) -> None:
if await _dedup_activity(activity.get("id")):
return
await _on_undo(activity, headers)
return make_inbox_router(
handlers={"Follow": on_follow, "Undo": on_undo},
verify_key_fetcher=None, # Signature verification enabled in prod when actor is loaded
path="/inbox",
)
# Mount inbox at /ap/actor/inbox (AP spec: inbox is a sub-resource of the actor)
try:
_inbox_sub = _build_inbox_router()
ap_router.include_router(_inbox_sub, prefix="/actor")
except Exception as _e:
logger.warning("AP inbox router not available: %s", _e)
# ── Outbox ────────────────────────────────────────────────────────────────────
@ap_router.get("/outbox")
async def get_outbox(page: int | None = None, request: Request = None):
actor = _actor_required()
from app.api.endpoints.community import _get_community_store
store = _get_community_store()
base = f"https://{settings.AP_HOST}"
if store is None:
collection = {
"@context": "https://www.w3.org/ns/activitystreams",
"id": f"{actor.outbox_url}",
"type": "OrderedCollection",
"totalItems": 0,
"orderedItems": [],
}
return Response(content=json.dumps(collection), media_type=_AP_CONTENT_TYPE)
PAGE_SIZE = 20
offset = ((page or 1) - 1) * PAGE_SIZE
posts = await asyncio.to_thread(store.list_posts, limit=PAGE_SIZE, offset=offset)
items = [_post_to_ap_note(p, actor, base) for p in posts]
collection = {
"@context": "https://www.w3.org/ns/activitystreams",
"id": actor.outbox_url + (f"?page={page}" if page else ""),
"type": "OrderedCollectionPage" if page else "OrderedCollection",
"orderedItems": items,
}
return Response(content=json.dumps(collection), media_type=_AP_CONTENT_TYPE)
# ── Individual post ───────────────────────────────────────────────────────────
@ap_router.get("/posts/{slug}")
async def get_ap_post(slug: str):
actor = _actor_required()
from app.api.endpoints.community import _get_community_store
store = _get_community_store()
if store is None:
raise HTTPException(status_code=404, detail="Community DB not available.")
post = await asyncio.to_thread(store.get_post_by_slug, slug)
if post is None:
raise HTTPException(status_code=404, detail="Post not found.")
base = f"https://{settings.AP_HOST}"
note = _post_to_ap_note(post, actor, base)
return Response(content=json.dumps(note), media_type=_AP_CONTENT_TYPE)
# ── Followers / Following ─────────────────────────────────────────────────────
@ap_router.get("/followers")
async def get_followers():
actor = _actor_required()
import sqlite3
count = 0
try:
conn = sqlite3.connect(str(settings.DB_PATH))
row = conn.execute("SELECT COUNT(*) FROM ap_followers WHERE active = 1").fetchone()
conn.close()
count = row[0] if row else 0
except Exception:
pass
collection = {
"@context": "https://www.w3.org/ns/activitystreams",
"id": f"{actor.actor_id}/followers",
"type": "OrderedCollection",
"totalItems": count,
}
return Response(content=json.dumps(collection), media_type=_AP_CONTENT_TYPE)
@ap_router.get("/following")
async def get_following():
actor = _actor_required()
collection = {
"@context": "https://www.w3.org/ns/activitystreams",
"id": f"{actor.actor_id}/following",
"type": "OrderedCollection",
"totalItems": 0,
"orderedItems": [],
}
return Response(content=json.dumps(collection), media_type=_AP_CONTENT_TYPE)
# ── Helpers ───────────────────────────────────────────────────────────────────
def _post_to_ap_note(post, actor, base_url: str) -> dict:
from circuitforge_core.activitypub import make_note
from app.services.community.ap_compat import _build_content
diet_tags: list[str] = list(getattr(post, "dietary_tags", []) or [])
hashtags = [{"type": "Hashtag", "name": "#Kiwi", "href": f"{base_url}/ap/tags/kiwi"}]
for tag in diet_tags[:4]:
ht = "".join(w.capitalize() for w in tag.replace("-", " ").split())
hashtags.append({"type": "Hashtag", "name": f"#{ht}"})
content = _build_content(
{
"title": post.title,
"description": getattr(post, "description", None),
"outcome_notes": getattr(post, "outcome_notes", None),
"dietary_tags": diet_tags,
}
)
published = post.published
note = make_note(
actor_id=actor.actor_id,
content=content,
tag=hashtags,
published=published if isinstance(published, datetime) else None,
)
note["id"] = f"{base_url}/ap/posts/{post.slug}"
return note
def _resolve_inbox(actor_url: str) -> tuple[str | None, str | None]:
"""Fetch an AP actor document and extract inbox + sharedInbox URLs."""
try:
import httpx
resp = httpx.get(
actor_url,
headers={"Accept": "application/activity+json"},
timeout=8.0,
follow_redirects=True,
)
resp.raise_for_status()
doc = resp.json()
inbox = doc.get("inbox")
shared = doc.get("endpoints", {}).get("sharedInbox")
return inbox, shared
except Exception as exc:
logger.debug("Could not resolve actor %s: %s", actor_url, exc)
return None, None

View file

@ -301,7 +301,41 @@ async def publish_post(body: dict, session: CloudUser = Depends(get_session)):
status_code=409, status_code=409,
detail="A post with this title already exists today. Try a different title.", detail="A post with this title already exists today. Try a different title.",
) from exc ) from exc
return _post_to_dict(inserted)
post_dict = _post_to_dict(inserted)
# AP delivery + Mastodon post (Paid tier, AP_ENABLED, opted-in)
from app.core.config import settings as _settings
if _settings.AP_ENABLED and session.tier in ("paid", "premium", "ultra"):
from circuitforge_core.activitypub import make_create, make_note, PUBLIC
from app.services.ap.keys import get_actor
from app.services.ap.delivery import deliver_to_followers
_ap_actor = get_actor()
if _ap_actor is not None:
base = f"https://{_settings.AP_HOST}"
from app.api.endpoints.activitypub import _post_to_ap_note
_note = _post_to_ap_note(inserted, _ap_actor, base)
_activity = make_create(_ap_actor, _note)
asyncio.create_task(
asyncio.to_thread(
deliver_to_followers, inserted.slug, _activity, session.db
)
)
# Mastodon post if user has connected account and opted in
if body.get("post_to_mastodon"):
from app.services.ap.mastodon import build_post_content, get_token, post_status
_masto = await asyncio.to_thread(
get_token, session.db, session.user_id, _settings.AP_TOKEN_ENCRYPTION_KEY
)
if _masto:
_masto_url, _masto_token = _masto
_content = build_post_content(post_dict)
asyncio.create_task(
asyncio.to_thread(post_status, _masto_url, _masto_token, _content)
)
return post_dict
@router.delete("/posts/{slug}", status_code=204) @router.delete("/posts/{slug}", status_code=204)

View file

@ -0,0 +1,133 @@
# app/api/endpoints/mastodon_oauth.py
# MIT License
#
# Mastodon OAuth flow endpoints:
# POST /social/mastodon/connect — Start OAuth (dynamic app registration)
# GET /social/mastodon/callback — OAuth callback, exchange code for token
# DELETE /social/mastodon/disconnect — Revoke and remove stored token
# GET /social/mastodon/status — Check connection status
from __future__ import annotations
import asyncio
import logging
from urllib.parse import urlencode
from fastapi import APIRouter, Depends, HTTPException
from fastapi.responses import RedirectResponse
from app.cloud_session import CloudUser, get_session
from app.core.config import settings
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/social/mastodon", tags=["mastodon"])
def _redirect_uri() -> str:
host = settings.AP_HOST or "localhost:8512"
return f"https://{host}/api/v1/social/mastodon/callback"
# In-memory pending state: maps state_token → {instance_url, client_id, client_secret, user_id}
# A real deployment would persist this in a short-TTL cache or DB.
_pending: dict[str, dict] = {}
@router.post("/connect")
async def connect_mastodon(body: dict, session: CloudUser = Depends(get_session)):
"""Start the Mastodon OAuth flow.
Body: {"instance_url": "https://mastodon.social"}
Returns: {"authorize_url": "..."}
"""
import secrets
from app.services.ap.mastodon import build_authorize_url, register_app
instance_url = (body.get("instance_url") or "").strip().rstrip("/")
if not instance_url.startswith("https://"):
raise HTTPException(status_code=422, detail="instance_url must be an https:// URL.")
redirect_uri = _redirect_uri()
try:
app_creds = await asyncio.to_thread(register_app, instance_url, redirect_uri)
except Exception as exc:
raise HTTPException(
status_code=502, detail=f"Could not register with Mastodon instance: {exc}"
) from exc
state = secrets.token_urlsafe(24)
_pending[state] = {
"instance_url": instance_url,
"client_id": app_creds["client_id"],
"client_secret": app_creds["client_secret"],
"user_id": session.user_id,
}
authorize_url = build_authorize_url(
instance_url=instance_url,
client_id=app_creds["client_id"],
redirect_uri=redirect_uri + f"?state={state}",
)
return {"authorize_url": authorize_url, "state": state}
@router.get("/callback")
async def mastodon_callback(code: str | None = None, state: str | None = None):
"""OAuth callback. Exchanges auth code for access token and stores it."""
if not code or not state:
raise HTTPException(status_code=400, detail="Missing code or state parameter.")
pending = _pending.pop(state, None)
if pending is None:
raise HTTPException(status_code=400, detail="Unknown or expired OAuth state.")
from app.services.ap.mastodon import exchange_code, store_token
redirect_uri = _redirect_uri() + f"?state={state}"
try:
access_token = await asyncio.to_thread(
exchange_code,
pending["instance_url"],
pending["client_id"],
pending["client_secret"],
code,
redirect_uri,
)
except Exception as exc:
raise HTTPException(status_code=502, detail=f"Token exchange failed: {exc}") from exc
await asyncio.to_thread(
store_token,
settings.DB_PATH,
pending["user_id"],
pending["instance_url"],
access_token,
settings.AP_TOKEN_ENCRYPTION_KEY,
)
# Redirect to frontend settings page after successful connect
return RedirectResponse(url="/#/settings?mastodon=connected", status_code=302)
@router.delete("/disconnect", status_code=204)
async def disconnect_mastodon(session: CloudUser = Depends(get_session)):
"""Remove the stored Mastodon token."""
from app.services.ap.mastodon import delete_token
await asyncio.to_thread(delete_token, settings.DB_PATH, session.user_id)
@router.get("/status")
async def mastodon_status(session: CloudUser = Depends(get_session)):
"""Return connection status and instance URL (no token value)."""
from app.services.ap.mastodon import get_token
result = await asyncio.to_thread(
get_token,
settings.DB_PATH,
session.user_id,
settings.AP_TOKEN_ENCRYPTION_KEY,
)
if result is None:
return {"connected": False, "instance_url": None}
instance_url, _ = result
return {"connected": True, "instance_url": instance_url}

View file

@ -2,6 +2,7 @@ from fastapi import APIRouter
from app.api.endpoints import health, receipts, export, inventory, ocr, recipes, settings, staples, feedback, feedback_attach, household, saved_recipes, imitate, meal_plans, orch_usage, session, shopping from app.api.endpoints import health, receipts, export, inventory, ocr, recipes, settings, staples, feedback, feedback_attach, household, saved_recipes, imitate, meal_plans, orch_usage, session, shopping
from app.api.endpoints.community import router as community_router from app.api.endpoints.community import router as community_router
from app.api.endpoints.corrections import router as corrections_router from app.api.endpoints.corrections import router as corrections_router
from app.api.endpoints.mastodon_oauth import router as mastodon_router
from app.api.endpoints.recipe_scan import router as recipe_scan_router from app.api.endpoints.recipe_scan import router as recipe_scan_router
from app.api.endpoints.recipe_tags import router as recipe_tags_router from app.api.endpoints.recipe_tags import router as recipe_tags_router
@ -30,3 +31,4 @@ api_router.include_router(shopping.router, prefix="/shopping", tags=
api_router.include_router(community_router) api_router.include_router(community_router)
api_router.include_router(recipe_tags_router) api_router.include_router(recipe_tags_router)
api_router.include_router(corrections_router, prefix="/corrections", tags=["corrections"]) api_router.include_router(corrections_router, prefix="/corrections", tags=["corrections"])
api_router.include_router(mastodon_router)

View file

@ -76,6 +76,17 @@ class Settings:
# runs don't pollute session counts. Set to the Directus UUID of the test user. # runs don't pollute session counts. Set to the Directus UUID of the test user.
E2E_TEST_USER_ID: str | None = os.environ.get("E2E_TEST_USER_ID") or None E2E_TEST_USER_ID: str | None = os.environ.get("E2E_TEST_USER_ID") or None
# ActivityPub federation (optional; disabled by default)
AP_ENABLED: bool = os.environ.get("AP_ENABLED", "false").lower() in ("1", "true", "yes")
AP_HOST: str = os.environ.get("AP_HOST", "") # e.g. kiwi.circuitforge.tech
CLOUD_DATA_ROOT: Path = Path(os.environ.get("CLOUD_DATA_ROOT", "/devl/kiwi-cloud-data"))
AP_KEY_PATH: Path = Path(
os.environ.get("AP_KEY_PATH", str(CLOUD_DATA_ROOT / "ap_keys" / "instance.pem"))
)
# Fernet key for Mastodon access token encryption (base64-urlsafe, 32 bytes)
# Leave unset to skip encryption (dev only)
AP_TOKEN_ENCRYPTION_KEY: str | None = os.environ.get("AP_TOKEN_ENCRYPTION_KEY") or None
# Feature flags # Feature flags
ENABLE_OCR: bool = os.environ.get("ENABLE_OCR", "false").lower() in ("1", "true", "yes") ENABLE_OCR: bool = os.environ.get("ENABLE_OCR", "false").lower() in ("1", "true", "yes")
# Use OrchestratedScheduler (coordinator-aware, multi-GPU fan-out) instead of # Use OrchestratedScheduler (coordinator-aware, multi-GPU fan-out) instead of

View file

@ -0,0 +1,47 @@
-- 042_activitypub.sql
-- ActivityPub federation tables: follower registry, delivery log, dedup, Mastodon tokens.
-- Follower registry: AP actors that Follow this Kiwi instance
CREATE TABLE IF NOT EXISTS ap_followers (
id INTEGER PRIMARY KEY,
actor_id TEXT NOT NULL UNIQUE, -- AP actor URL
inbox_url TEXT NOT NULL,
shared_inbox TEXT,
followed_at TEXT NOT NULL DEFAULT (datetime('now')),
active INTEGER NOT NULL DEFAULT 1
);
CREATE INDEX IF NOT EXISTS idx_ap_followers_active
ON ap_followers (active) WHERE active = 1;
-- Outgoing delivery log: one row per (post_slug, target_inbox) attempt
CREATE TABLE IF NOT EXISTS ap_deliveries (
id INTEGER PRIMARY KEY,
post_slug TEXT NOT NULL,
target_inbox TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'pending', -- pending | delivered | failed
attempts INTEGER NOT NULL DEFAULT 0,
last_error TEXT,
created_at TEXT NOT NULL DEFAULT (datetime('now')),
delivered_at TEXT
);
CREATE INDEX IF NOT EXISTS idx_ap_deliveries_status
ON ap_deliveries (status) WHERE status != 'delivered';
-- Incoming activity dedup: prevents replay attacks and double-processing
CREATE TABLE IF NOT EXISTS ap_received (
activity_id TEXT PRIMARY KEY,
received_at TEXT NOT NULL DEFAULT (datetime('now'))
);
-- Mastodon OAuth tokens: per-user, encrypted at rest
-- Stored in the user's local kiwi.db (CLOUD_MODE: per-user DB tree)
CREATE TABLE IF NOT EXISTS mastodon_tokens (
id INTEGER PRIMARY KEY,
directus_user_id TEXT NOT NULL UNIQUE,
instance_url TEXT NOT NULL,
access_token TEXT NOT NULL, -- Fernet-encrypted when AP_TOKEN_ENCRYPTION_KEY set
created_at TEXT NOT NULL DEFAULT (datetime('now')),
updated_at TEXT NOT NULL DEFAULT (datetime('now'))
);

View file

@ -43,6 +43,11 @@ async def _browse_counts_refresh_loop(corpus_path: str) -> None:
async def lifespan(app: FastAPI): async def lifespan(app: FastAPI):
logger.info("Starting Kiwi API...") logger.info("Starting Kiwi API...")
settings.ensure_dirs() settings.ensure_dirs()
# Run DB migrations at startup (ensures all tables exist before any request)
from app.db.store import Store
_s = Store(settings.DB_PATH)
_s.close()
register_kiwi_programs() register_kiwi_programs()
# Start LLM background task scheduler # Start LLM background task scheduler
@ -54,6 +59,14 @@ async def lifespan(app: FastAPI):
from app.api.endpoints.community import init_community_store from app.api.endpoints.community import init_community_store
init_community_store(settings.COMMUNITY_DB_URL) init_community_store(settings.COMMUNITY_DB_URL)
# Initialize ActivityPub instance actor (no-op when AP_ENABLED=false)
if settings.AP_ENABLED and settings.AP_HOST:
try:
from app.services.ap.keys import init_actor
init_actor(host=settings.AP_HOST, key_path=settings.AP_KEY_PATH)
except Exception as _ap_exc:
logger.warning("AP init failed (AP features disabled): %s", _ap_exc)
# Browse counts cache — warm in-memory cache from disk, refresh if stale. # Browse counts cache — warm in-memory cache from disk, refresh if stale.
# Uses the corpus path the store will attach to at request time. # Uses the corpus path the store will attach to at request time.
corpus_path = os.environ.get("RECIPE_DB_PATH", str(settings.DB_PATH)) corpus_path = os.environ.get("RECIPE_DB_PATH", str(settings.DB_PATH))
@ -101,6 +114,11 @@ app.add_middleware(
app.include_router(api_router, prefix=settings.API_PREFIX) app.include_router(api_router, prefix=settings.API_PREFIX)
# AP endpoints: WebFinger at root (not under /api/v1), AP objects under /ap
from app.api.endpoints.activitypub import ap_router, webfinger_router
app.include_router(webfinger_router)
app.include_router(ap_router)
@app.get("/") @app.get("/")
async def root(): async def root():

View file

115
app/services/ap/delivery.py Normal file
View file

@ -0,0 +1,115 @@
# app/services/ap/delivery.py
# MIT License
from __future__ import annotations
import logging
import time
from datetime import datetime, timezone
from pathlib import Path
from circuitforge_core.activitypub import deliver_activity
from app.services.ap.keys import get_actor
logger = logging.getLogger(__name__)
_RETRIES = 3
_BACKOFF = [1.0, 4.0, 16.0]
def deliver_to_followers(post_slug: str, activity: dict, db_path: Path) -> None:
"""Deliver an AP activity to all active followers. Called as a background task.
Retries each inbox up to 3 times with exponential backoff.
Logs each attempt to ap_deliveries in the local kiwi.db.
"""
actor = get_actor()
if actor is None:
return
import sqlite3
conn = sqlite3.connect(str(db_path))
conn.row_factory = sqlite3.Row
try:
followers = conn.execute(
"SELECT inbox_url, shared_inbox FROM ap_followers WHERE active = 1"
).fetchall()
finally:
conn.close()
# Deduplicate by shared_inbox where available
inboxes: set[str] = set()
for row in followers:
inbox = row["shared_inbox"] or row["inbox_url"]
inboxes.add(inbox)
for inbox_url in inboxes:
_deliver_with_retry(post_slug=post_slug, activity=activity, inbox_url=inbox_url, db_path=db_path)
def _deliver_with_retry(
post_slug: str,
activity: dict,
inbox_url: str,
db_path: Path,
) -> None:
actor = get_actor()
if actor is None:
return
import sqlite3
conn = sqlite3.connect(str(db_path))
try:
conn.execute(
"INSERT OR IGNORE INTO ap_deliveries (post_slug, target_inbox, status) VALUES (?,?,?)",
(post_slug, inbox_url, "pending"),
)
conn.commit()
finally:
conn.close()
last_error: str | None = None
for attempt, delay in enumerate(_BACKOFF[:_RETRIES]):
try:
resp = deliver_activity(activity=activity, inbox_url=inbox_url, actor=actor, timeout=10.0)
if resp.status_code < 300:
_update_delivery(db_path, post_slug, inbox_url, "delivered", None)
return
last_error = f"HTTP {resp.status_code}"
except Exception as exc:
last_error = str(exc)[:200]
if attempt < _RETRIES - 1:
time.sleep(delay)
_update_delivery(db_path, post_slug, inbox_url, "failed", last_error)
logger.warning("AP delivery failed after %d attempts to %s: %s", _RETRIES, inbox_url, last_error)
def _update_delivery(
db_path: Path,
post_slug: str,
inbox_url: str,
status: str,
error: str | None,
) -> None:
import sqlite3
now = datetime.now(timezone.utc).isoformat()
conn = sqlite3.connect(str(db_path))
try:
if status == "delivered":
conn.execute(
"""UPDATE ap_deliveries SET status=?, attempts=attempts+1, delivered_at=?
WHERE post_slug=? AND target_inbox=?""",
(status, now, post_slug, inbox_url),
)
else:
conn.execute(
"""UPDATE ap_deliveries SET status=?, attempts=attempts+1, last_error=?
WHERE post_slug=? AND target_inbox=?""",
(status, error, post_slug, inbox_url),
)
conn.commit()
finally:
conn.close()

48
app/services/ap/keys.py Normal file
View file

@ -0,0 +1,48 @@
# app/services/ap/keys.py
# MIT License
from __future__ import annotations
import logging
from pathlib import Path
from circuitforge_core.activitypub import CFActor, generate_rsa_keypair, load_actor_from_key_file
logger = logging.getLogger(__name__)
_actor: CFActor | None = None
def get_actor() -> CFActor | None:
"""Return the loaded instance actor, or None if AP is not enabled."""
return _actor
def init_actor(host: str, key_path: Path) -> CFActor:
"""Load or generate the instance RSA keypair and build the CFActor singleton.
Called once at startup when AP_ENABLED=true. Generates a new 2048-bit keypair
if the key file does not yet exist (first boot).
"""
global _actor
key_path.parent.mkdir(parents=True, exist_ok=True)
if not key_path.exists():
logger.info("AP: no key file found at %s — generating new RSA-2048 keypair", key_path)
private_pem, _pub = generate_rsa_keypair(bits=2048)
key_path.write_text(private_pem, encoding="utf-8")
key_path.chmod(0o600)
base = f"https://{host}"
actor_id = f"{base}/ap/actor"
_actor = load_actor_from_key_file(
actor_id=actor_id,
username="kiwi",
display_name="Kiwi Pantry",
private_key_path=str(key_path),
summary="Community pantry and recipe feed from a Kiwi instance.",
)
logger.info("AP: instance actor loaded — %s", actor_id)
return _actor

194
app/services/ap/mastodon.py Normal file
View file

@ -0,0 +1,194 @@
# app/services/ap/mastodon.py
# MIT License
from __future__ import annotations
import logging
from pathlib import Path
import httpx
logger = logging.getLogger(__name__)
_APP_SCOPES = "write:statuses"
_APP_NAME = "Kiwi Pantry"
_APP_WEBSITE = "https://circuitforge.tech/kiwi"
def register_app(instance_url: str, redirect_uri: str) -> dict:
"""Dynamically register Kiwi as an OAuth app on the user's Mastodon instance.
Returns the app credentials dict (client_id, client_secret, etc.).
Raises httpx.HTTPError on failure.
"""
url = instance_url.rstrip("/") + "/api/v1/apps"
resp = httpx.post(
url,
data={
"client_name": _APP_NAME,
"redirect_uris": redirect_uri,
"scopes": _APP_SCOPES,
"website": _APP_WEBSITE,
},
timeout=10.0,
)
resp.raise_for_status()
return resp.json()
def build_authorize_url(instance_url: str, client_id: str, redirect_uri: str) -> str:
"""Return the OAuth authorize URL to redirect the user to."""
return (
f"{instance_url.rstrip('/')}/oauth/authorize"
f"?response_type=code"
f"&client_id={client_id}"
f"&redirect_uri={redirect_uri}"
f"&scope={_APP_SCOPES}"
)
def exchange_code(
instance_url: str,
client_id: str,
client_secret: str,
code: str,
redirect_uri: str,
) -> str:
"""Exchange an authorization code for an access token. Returns the token string."""
url = instance_url.rstrip("/") + "/oauth/token"
resp = httpx.post(
url,
data={
"grant_type": "authorization_code",
"client_id": client_id,
"client_secret": client_secret,
"redirect_uri": redirect_uri,
"code": code,
"scope": _APP_SCOPES,
},
timeout=10.0,
)
resp.raise_for_status()
return resp.json()["access_token"]
def post_status(instance_url: str, access_token: str, content: str) -> dict:
"""Post a status to the user's Mastodon account. Returns the status response dict."""
url = instance_url.rstrip("/") + "/api/v1/statuses"
resp = httpx.post(
url,
headers={"Authorization": f"Bearer {access_token}"},
json={"status": content, "visibility": "public"},
timeout=15.0,
)
resp.raise_for_status()
return resp.json()
def build_post_content(post: dict) -> str:
"""Format a community post dict as Mastodon-ready plain text."""
title = post.get("title") or "Untitled"
recipe = post.get("recipe_name")
notes = post.get("outcome_notes") or post.get("description")
tags_raw: list[str] = post.get("dietary_tags") or []
lines = []
if recipe and recipe != title:
lines.append(f"🍽 {title}{recipe}")
else:
lines.append(f"🍽 {title}")
if notes:
snippet = notes[:200].strip()
if len(notes) > 200:
snippet += ""
lines.append(f"\n{snippet}")
hashtags = ["#Kiwi", "#Cooking"]
for tag in tags_raw[:3]:
ht = "#" + "".join(w.capitalize() for w in tag.replace("-", " ").split())
hashtags.append(ht)
lines.append("\n" + " ".join(hashtags))
return "\n".join(lines)
def store_token(
db_path: Path,
directus_user_id: str,
instance_url: str,
access_token: str,
encryption_key: str | None,
) -> None:
"""Persist a Mastodon access token in the user's local kiwi.db."""
token_to_store = _encrypt(access_token, encryption_key)
import sqlite3
conn = sqlite3.connect(str(db_path))
try:
conn.execute(
"""INSERT INTO mastodon_tokens (directus_user_id, instance_url, access_token)
VALUES (?, ?, ?)
ON CONFLICT(directus_user_id) DO UPDATE SET
instance_url=excluded.instance_url,
access_token=excluded.access_token,
updated_at=datetime('now')""",
(directus_user_id, instance_url.rstrip("/"), token_to_store),
)
conn.commit()
finally:
conn.close()
def get_token(
db_path: Path,
directus_user_id: str,
encryption_key: str | None,
) -> tuple[str, str] | None:
"""Return (instance_url, plaintext_access_token) or None if not connected."""
import sqlite3
conn = sqlite3.connect(str(db_path))
try:
row = conn.execute(
"SELECT instance_url, access_token FROM mastodon_tokens WHERE directus_user_id = ?",
(directus_user_id,),
).fetchone()
finally:
conn.close()
if row is None:
return None
return row[0], _decrypt(row[1], encryption_key)
def delete_token(db_path: Path, directus_user_id: str) -> None:
"""Remove the user's stored Mastodon token."""
import sqlite3
conn = sqlite3.connect(str(db_path))
try:
conn.execute(
"DELETE FROM mastodon_tokens WHERE directus_user_id = ?", (directus_user_id,)
)
conn.commit()
finally:
conn.close()
def _encrypt(plaintext: str, key: str | None) -> str:
if key is None:
return plaintext
try:
from cryptography.fernet import Fernet
return Fernet(key.encode()).encrypt(plaintext.encode()).decode()
except Exception:
logger.warning("Mastodon token encryption failed — storing plaintext")
return plaintext
def _decrypt(ciphertext: str, key: str | None) -> str:
if key is None:
return ciphertext
try:
from cryptography.fernet import Fernet
return Fernet(key.encode()).decrypt(ciphertext.encode()).decode()
except Exception:
logger.warning("Mastodon token decryption failed — returning as-is")
return ciphertext