kiwi/app/services/ap/mastodon.py
pyr0ball 6e954c5c6e
Some checks failed
CI / Backend (Python) (push) Has been cancelled
CI / Frontend (Vue) (push) Has been cancelled
Mirror / mirror (push) Has been cancelled
feat(ap): issue #113 — ActivityPub federation + Mastodon OAuth
Full ActivityPub implementation wired to cf-core.activitypub module:

Endpoints (root-level, not under /api/v1):
  GET  /.well-known/webfinger  — WebFinger JRD (AP_ENABLED only)
  GET  /ap/actor               — Instance actor document
  POST /ap/actor/inbox         — Incoming Follow/Undo (dedup + Accept dispatch)
  GET  /ap/outbox              — OrderedCollection of community posts
  GET  /ap/posts/{slug}        — Individual AP Note
  GET  /ap/followers           — Follower count collection
  GET  /ap/following           — Empty following collection

Mastodon OAuth (under /api/v1/social/mastodon/):
  POST   /connect      — Dynamic app registration + OAuth flow start
  GET    /callback     — Code exchange + token storage (Fernet-encrypted)
  DELETE /disconnect   — Token revocation
  GET    /status       — Connection status

Config: AP_ENABLED, AP_HOST, AP_KEY_PATH, AP_TOKEN_ENCRYPTION_KEY
Migration 042: ap_followers, ap_deliveries, ap_received, mastodon_tokens tables
Key manager: auto-generates RSA-2048 keypair on first boot if AP_ENABLED
Delivery service: deliver_to_followers() with 3-retry exponential backoff + DB log
Post publish: background fan-out to AP followers + Mastodon when opted-in

All AP endpoints gracefully degrade (404) when AP_ENABLED=false.
2026-05-11 17:55:51 -07:00

194 lines
5.6 KiB
Python

# app/services/ap/mastodon.py
# MIT License
from __future__ import annotations
import logging
from pathlib import Path
import httpx
logger = logging.getLogger(__name__)
_APP_SCOPES = "write:statuses"
_APP_NAME = "Kiwi Pantry"
_APP_WEBSITE = "https://circuitforge.tech/kiwi"
def register_app(instance_url: str, redirect_uri: str) -> dict:
"""Dynamically register Kiwi as an OAuth app on the user's Mastodon instance.
Returns the app credentials dict (client_id, client_secret, etc.).
Raises httpx.HTTPError on failure.
"""
url = instance_url.rstrip("/") + "/api/v1/apps"
resp = httpx.post(
url,
data={
"client_name": _APP_NAME,
"redirect_uris": redirect_uri,
"scopes": _APP_SCOPES,
"website": _APP_WEBSITE,
},
timeout=10.0,
)
resp.raise_for_status()
return resp.json()
def build_authorize_url(instance_url: str, client_id: str, redirect_uri: str) -> str:
"""Return the OAuth authorize URL to redirect the user to."""
return (
f"{instance_url.rstrip('/')}/oauth/authorize"
f"?response_type=code"
f"&client_id={client_id}"
f"&redirect_uri={redirect_uri}"
f"&scope={_APP_SCOPES}"
)
def exchange_code(
instance_url: str,
client_id: str,
client_secret: str,
code: str,
redirect_uri: str,
) -> str:
"""Exchange an authorization code for an access token. Returns the token string."""
url = instance_url.rstrip("/") + "/oauth/token"
resp = httpx.post(
url,
data={
"grant_type": "authorization_code",
"client_id": client_id,
"client_secret": client_secret,
"redirect_uri": redirect_uri,
"code": code,
"scope": _APP_SCOPES,
},
timeout=10.0,
)
resp.raise_for_status()
return resp.json()["access_token"]
def post_status(instance_url: str, access_token: str, content: str) -> dict:
"""Post a status to the user's Mastodon account. Returns the status response dict."""
url = instance_url.rstrip("/") + "/api/v1/statuses"
resp = httpx.post(
url,
headers={"Authorization": f"Bearer {access_token}"},
json={"status": content, "visibility": "public"},
timeout=15.0,
)
resp.raise_for_status()
return resp.json()
def build_post_content(post: dict) -> str:
"""Format a community post dict as Mastodon-ready plain text."""
title = post.get("title") or "Untitled"
recipe = post.get("recipe_name")
notes = post.get("outcome_notes") or post.get("description")
tags_raw: list[str] = post.get("dietary_tags") or []
lines = []
if recipe and recipe != title:
lines.append(f"🍽 {title}{recipe}")
else:
lines.append(f"🍽 {title}")
if notes:
snippet = notes[:200].strip()
if len(notes) > 200:
snippet += ""
lines.append(f"\n{snippet}")
hashtags = ["#Kiwi", "#Cooking"]
for tag in tags_raw[:3]:
ht = "#" + "".join(w.capitalize() for w in tag.replace("-", " ").split())
hashtags.append(ht)
lines.append("\n" + " ".join(hashtags))
return "\n".join(lines)
def store_token(
db_path: Path,
directus_user_id: str,
instance_url: str,
access_token: str,
encryption_key: str | None,
) -> None:
"""Persist a Mastodon access token in the user's local kiwi.db."""
token_to_store = _encrypt(access_token, encryption_key)
import sqlite3
conn = sqlite3.connect(str(db_path))
try:
conn.execute(
"""INSERT INTO mastodon_tokens (directus_user_id, instance_url, access_token)
VALUES (?, ?, ?)
ON CONFLICT(directus_user_id) DO UPDATE SET
instance_url=excluded.instance_url,
access_token=excluded.access_token,
updated_at=datetime('now')""",
(directus_user_id, instance_url.rstrip("/"), token_to_store),
)
conn.commit()
finally:
conn.close()
def get_token(
db_path: Path,
directus_user_id: str,
encryption_key: str | None,
) -> tuple[str, str] | None:
"""Return (instance_url, plaintext_access_token) or None if not connected."""
import sqlite3
conn = sqlite3.connect(str(db_path))
try:
row = conn.execute(
"SELECT instance_url, access_token FROM mastodon_tokens WHERE directus_user_id = ?",
(directus_user_id,),
).fetchone()
finally:
conn.close()
if row is None:
return None
return row[0], _decrypt(row[1], encryption_key)
def delete_token(db_path: Path, directus_user_id: str) -> None:
"""Remove the user's stored Mastodon token."""
import sqlite3
conn = sqlite3.connect(str(db_path))
try:
conn.execute(
"DELETE FROM mastodon_tokens WHERE directus_user_id = ?", (directus_user_id,)
)
conn.commit()
finally:
conn.close()
def _encrypt(plaintext: str, key: str | None) -> str:
if key is None:
return plaintext
try:
from cryptography.fernet import Fernet
return Fernet(key.encode()).encrypt(plaintext.encode()).decode()
except Exception:
logger.warning("Mastodon token encryption failed — storing plaintext")
return plaintext
def _decrypt(ciphertext: str, key: str | None) -> str:
if key is None:
return ciphertext
try:
from cryptography.fernet import Fernet
return Fernet(key.encode()).decrypt(ciphertext.encode()).decode()
except Exception:
logger.warning("Mastodon token decryption failed — returning as-is")
return ciphertext