From 1553ff163028c5eef71525bf3a94c4f7fa445d84 Mon Sep 17 00:00:00 2001 From: pyr0ball Date: Mon, 20 Apr 2026 13:18:03 -0700 Subject: [PATCH] =?UTF-8?q?feat:=20add=20activitypub=20module=20=E2=80=94?= =?UTF-8?q?=20actor,=20objects,=20signing,=20delivery,=20Lemmy,=20inbox=20?= =?UTF-8?q?(closes=20#51)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit CFActor (frozen dataclass, RSA keygen), AS2 object constructors (Note, Offer, Request, Create), HTTP Signatures (draft-cavage-http-signatures-08, rsa-sha256), signed delivery via requests, Lemmy REST client (JWT auth), FastAPI inbox router with optional signature verification. Digest header re-verified against actual body bytes on verify_signature() to prevent body-swap attacks. inbox.py omits __future__ annotations to avoid FastAPI's annotation-resolution-against-module-globals constraint. 105 tests. Bumps to v0.14.0. --- CHANGELOG.md | 21 +++ circuitforge_core/__init__.py | 2 +- circuitforge_core/activitypub/__init__.py | 55 ++++++ circuitforge_core/activitypub/actor.py | 146 ++++++++++++++++ circuitforge_core/activitypub/delivery.py | 56 ++++++ circuitforge_core/activitypub/inbox.py | 128 ++++++++++++++ circuitforge_core/activitypub/lemmy.py | 173 +++++++++++++++++++ circuitforge_core/activitypub/objects.py | 168 ++++++++++++++++++ circuitforge_core/activitypub/signing.py | 197 ++++++++++++++++++++++ pyproject.toml | 2 +- tests/test_activitypub/__init__.py | 0 tests/test_activitypub/test_actor.py | 152 +++++++++++++++++ tests/test_activitypub/test_delivery.py | 61 +++++++ tests/test_activitypub/test_inbox.py | 145 ++++++++++++++++ tests/test_activitypub/test_lemmy.py | 163 ++++++++++++++++++ tests/test_activitypub/test_objects.py | 178 +++++++++++++++++++ tests/test_activitypub/test_signing.py | 126 ++++++++++++++ 17 files changed, 1771 insertions(+), 2 deletions(-) create mode 100644 circuitforge_core/activitypub/__init__.py create mode 100644 circuitforge_core/activitypub/actor.py create mode 100644 circuitforge_core/activitypub/delivery.py create mode 100644 circuitforge_core/activitypub/inbox.py create mode 100644 circuitforge_core/activitypub/lemmy.py create mode 100644 circuitforge_core/activitypub/objects.py create mode 100644 circuitforge_core/activitypub/signing.py create mode 100644 tests/test_activitypub/__init__.py create mode 100644 tests/test_activitypub/test_actor.py create mode 100644 tests/test_activitypub/test_delivery.py create mode 100644 tests/test_activitypub/test_inbox.py create mode 100644 tests/test_activitypub/test_lemmy.py create mode 100644 tests/test_activitypub/test_objects.py create mode 100644 tests/test_activitypub/test_signing.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 73cce1c..6194c2e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,27 @@ Versions follow [Semantic Versioning](https://semver.org/spec/v2.0.0.html). --- +## [0.14.0] — 2026-04-20 + +### Added + +**`circuitforge_core.activitypub`** — ActivityPub actor management, object construction, HTTP Signature signing, delivery, and Lemmy integration (MIT, closes #51) + +- `actor.py` — `CFActor` frozen dataclass; `generate_rsa_keypair(bits)`; `make_actor()`; `load_actor_from_key_file()`. `to_ap_dict()` produces an ActivityPub Application/Person object and never includes the private key. +- `objects.py` — `make_note()`, `make_offer()`, `make_request()` (CF namespace extension), `make_create()`. All return plain dicts; IDs minted with UUID4. `make_request` uses `https://circuitforge.tech/ns/activitystreams` context extension for the non-AS2 Request type. +- `signing.py` — `sign_headers()` (draft-cavage-http-signatures-08, rsa-sha256; signs `(request-target)`, `host`, `date`, `digest`, `content-type`). `verify_signature()` re-computes Digest from actual body after signature verification to catch body-swap attacks. +- `delivery.py` — `deliver_activity(activity, inbox_url, actor)` — synchronous `requests.post` with signed headers and `Content-Type: application/activity+json`. +- `lemmy.py` — `LemmyConfig` frozen dataclass; `LemmyClient` with `login()`, `resolve_community()` (bare name or `!community@instance` address), `post_to_community()`. Uses Lemmy v0.19+ REST API (JWT auth). `LemmyAuthError` / `LemmyCommunityNotFound` exceptions. +- `inbox.py` — `make_inbox_router(handlers, verify_key_fetcher, path)` — FastAPI APIRouter stub; dispatches by activity type; optional HTTP Signature verification via async `verify_key_fetcher` callback. FastAPI imported at module level with `_FASTAPI_AVAILABLE` guard (avoids annotation-resolution bug with lazy string annotations). +- 105 tests across all six files. + +**Key design notes:** +- `inbox` not re-exported from `__init__` — requires fastapi, imported explicitly by products that need it +- Signing Digest + re-verifying digest against body on verify — prevents body-swap attacks even when signature is valid +- `from __future__ import annotations` intentionally omitted in `inbox.py` — FastAPI resolves `Request` annotation against module globals at route registration time + +--- + ## [0.13.0] — 2026-04-20 ### Added diff --git a/circuitforge_core/__init__.py b/circuitforge_core/__init__.py index 172b8e1..c6a28d9 100644 --- a/circuitforge_core/__init__.py +++ b/circuitforge_core/__init__.py @@ -1,4 +1,4 @@ -__version__ = "0.13.0" +__version__ = "0.14.0" try: from circuitforge_core.community import CommunityDB, CommunityPost, SharedStore diff --git a/circuitforge_core/activitypub/__init__.py b/circuitforge_core/activitypub/__init__.py new file mode 100644 index 0000000..91e7917 --- /dev/null +++ b/circuitforge_core/activitypub/__init__.py @@ -0,0 +1,55 @@ +""" +circuitforge_core.activitypub — ActivityPub actor management, object construction, +HTTP Signature signing, delivery, and Lemmy integration. + +MIT licensed. +""" + +from circuitforge_core.activitypub.actor import ( + CFActor, + generate_rsa_keypair, + load_actor_from_key_file, + make_actor, +) +from circuitforge_core.activitypub.delivery import deliver_activity +from circuitforge_core.activitypub.lemmy import ( + LemmyAuthError, + LemmyClient, + LemmyCommunityNotFound, + LemmyConfig, +) +from circuitforge_core.activitypub.objects import ( + PUBLIC, + make_create, + make_note, + make_offer, + make_request, +) +from circuitforge_core.activitypub.signing import sign_headers, verify_signature + +__all__ = [ + # Actor + "CFActor", + "generate_rsa_keypair", + "load_actor_from_key_file", + "make_actor", + # Objects + "PUBLIC", + "make_note", + "make_offer", + "make_request", + "make_create", + # Signing + "sign_headers", + "verify_signature", + # Delivery + "deliver_activity", + # Lemmy + "LemmyConfig", + "LemmyClient", + "LemmyAuthError", + "LemmyCommunityNotFound", +] + +# inbox is optional (requires fastapi) — import it when needed: +# from circuitforge_core.activitypub.inbox import make_inbox_router diff --git a/circuitforge_core/activitypub/actor.py b/circuitforge_core/activitypub/actor.py new file mode 100644 index 0000000..9f7d4d0 --- /dev/null +++ b/circuitforge_core/activitypub/actor.py @@ -0,0 +1,146 @@ +""" +CFActor — ActivityPub actor identity for CircuitForge products. + +An actor holds RSA key material and its ActivityPub identity URLs. +The private key is in-memory only; to_ap_dict() never includes it. + +MIT licensed. +""" +from __future__ import annotations + +from dataclasses import dataclass +from pathlib import Path + + +@dataclass(frozen=True) +class CFActor: + """ActivityPub actor for a CircuitForge product instance.""" + + actor_id: str # e.g. "https://kiwi.circuitforge.tech/actors/kiwi" + username: str + display_name: str + inbox_url: str + outbox_url: str + public_key_pem: str + private_key_pem: str # Never included in to_ap_dict() + icon_url: str | None = None + summary: str | None = None + + def to_ap_dict(self) -> dict: + """Return an ActivityPub Person/Application object (public only).""" + obj: dict = { + "@context": [ + "https://www.w3.org/ns/activitystreams", + "https://w3id.org/security/v1", + ], + "id": self.actor_id, + "type": "Application", + "preferredUsername": self.username, + "name": self.display_name, + "inbox": self.inbox_url, + "outbox": self.outbox_url, + "publicKey": { + "id": f"{self.actor_id}#main-key", + "owner": self.actor_id, + "publicKeyPem": self.public_key_pem, + }, + } + if self.summary: + obj["summary"] = self.summary + if self.icon_url: + obj["icon"] = { + "type": "Image", + "mediaType": "image/png", + "url": self.icon_url, + } + return obj + + +def generate_rsa_keypair(bits: int = 2048) -> tuple[str, str]: + """ + Generate a new RSA keypair. + + Returns: + (private_key_pem, public_key_pem) as PEM-encoded strings. + """ + from cryptography.hazmat.primitives import serialization + from cryptography.hazmat.primitives.asymmetric import rsa + + private_key = rsa.generate_private_key(public_exponent=65537, key_size=bits) + private_pem = private_key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.PKCS8, + encryption_algorithm=serialization.NoEncryption(), + ).decode() + public_pem = private_key.public_key().public_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PublicFormat.SubjectPublicKeyInfo, + ).decode() + return private_pem, public_pem + + +def make_actor( + actor_id: str, + username: str, + display_name: str, + private_key_pem: str, + public_key_pem: str, + icon_url: str | None = None, + summary: str | None = None, +) -> CFActor: + """ + Construct a CFActor from an existing keypair. + + Inbox and outbox URLs are derived from actor_id by convention: + {actor_id}/inbox and {actor_id}/outbox + """ + return CFActor( + actor_id=actor_id, + username=username, + display_name=display_name, + inbox_url=f"{actor_id}/inbox", + outbox_url=f"{actor_id}/outbox", + public_key_pem=public_key_pem, + private_key_pem=private_key_pem, + icon_url=icon_url, + summary=summary, + ) + + +def load_actor_from_key_file( + actor_id: str, + username: str, + display_name: str, + private_key_path: str, + icon_url: str | None = None, + summary: str | None = None, +) -> CFActor: + """ + Load a CFActor from a PEM private key file on disk. + + The public key is derived from the private key — no separate public key + file is required. + """ + from cryptography.hazmat.primitives import serialization + from cryptography.hazmat.primitives.serialization import load_pem_private_key + + pem_bytes = Path(private_key_path).read_bytes() + private_key = load_pem_private_key(pem_bytes, password=None) + private_pem = private_key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.PKCS8, + encryption_algorithm=serialization.NoEncryption(), + ).decode() + public_pem = private_key.public_key().public_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PublicFormat.SubjectPublicKeyInfo, + ).decode() + return make_actor( + actor_id=actor_id, + username=username, + display_name=display_name, + private_key_pem=private_pem, + public_key_pem=public_pem, + icon_url=icon_url, + summary=summary, + ) diff --git a/circuitforge_core/activitypub/delivery.py b/circuitforge_core/activitypub/delivery.py new file mode 100644 index 0000000..11ad3da --- /dev/null +++ b/circuitforge_core/activitypub/delivery.py @@ -0,0 +1,56 @@ +""" +ActivityPub HTTP delivery — POST a signed activity to a remote inbox. + +Synchronous (uses requests). Async callers can wrap in asyncio.to_thread. + +MIT licensed. +""" +from __future__ import annotations + +import json +from typing import TYPE_CHECKING + +import requests + +from circuitforge_core.activitypub.signing import sign_headers + +if TYPE_CHECKING: + from circuitforge_core.activitypub.actor import CFActor + +ACTIVITY_CONTENT_TYPE = "application/activity+json" + + +def deliver_activity( + activity: dict, + inbox_url: str, + actor: "CFActor", + timeout: float = 10.0, +) -> requests.Response: + """ + POST a signed ActivityPub activity to a remote inbox. + + The activity dict is serialized to JSON, signed with the actor's private + key (HTTP Signatures, rsa-sha256), and delivered via HTTP POST. + + Args: + activity: ActivityPub activity dict (e.g. from make_create()). + inbox_url: Target inbox URL (e.g. "https://lemmy.ml/inbox"). + actor: CFActor whose key signs the request. + timeout: Request timeout in seconds. + + Returns: + The raw requests.Response. Caller decides retry / error policy. + + Raises: + requests.RequestException: On network-level failure. + """ + body = json.dumps(activity).encode() + base_headers = {"Content-Type": ACTIVITY_CONTENT_TYPE} + signed = sign_headers( + method="POST", + url=inbox_url, + headers=base_headers, + body=body, + actor=actor, + ) + return requests.post(inbox_url, data=body, headers=signed, timeout=timeout) diff --git a/circuitforge_core/activitypub/inbox.py b/circuitforge_core/activitypub/inbox.py new file mode 100644 index 0000000..0cbb009 --- /dev/null +++ b/circuitforge_core/activitypub/inbox.py @@ -0,0 +1,128 @@ +""" +ActivityPub inbox router — FastAPI stub for receiving federated activities. + +Products mount this router to handle incoming Create, Follow, Like, Announce, +and other ActivityPub activities from the Fediverse. + +Requires fastapi (optional dep). ImportError is raised with a clear message +when fastapi is not installed. + +NOTE: from __future__ import annotations is intentionally omitted here. +FastAPI resolves route parameter annotations against module globals at +definition time; lazy string annotations break the Request injection. + +MIT licensed. +""" + +import json as _json +import re +from typing import Awaitable, Callable + +# Handler type: receives (activity_dict, request_headers) and returns None +InboxHandler = Callable[[dict, dict], Awaitable[None]] + +# FastAPI imports at module level so annotations resolve correctly. +# Products that don't use the inbox router are not affected by this import +# since circuitforge_core.activitypub.__init__ does NOT import inbox. +try: + from fastapi import APIRouter, HTTPException, Request + from fastapi.responses import JSONResponse + _FASTAPI_AVAILABLE = True +except ImportError: + _FASTAPI_AVAILABLE = False + # Provide stubs so the module can be imported without fastapi + APIRouter = None # type: ignore[assignment,misc] + HTTPException = None # type: ignore[assignment] + Request = None # type: ignore[assignment] + JSONResponse = None # type: ignore[assignment] + + +def make_inbox_router( + handlers: dict[str, InboxHandler] | None = None, + verify_key_fetcher: Callable[[str], Awaitable[str | None]] | None = None, + path: str = "/inbox", +) -> "APIRouter": # type: ignore[name-defined] + """ + Build a FastAPI router that handles ActivityPub inbox POSTs. + + The router: + 1. Parses the JSON body into an activity dict + 2. Optionally verifies the HTTP Signature (when verify_key_fetcher is provided) + 3. Dispatches activity["type"] to the matching handler from *handlers* + 4. Returns 202 Accepted on success, 400 on bad JSON, 401 on bad signature + + Args: + handlers: Dict mapping activity type strings (e.g. "Create", + "Follow") to async handler callables. + verify_key_fetcher: Async callable that takes a keyId URL and returns the + actor's public key PEM, or None if not found. + When None, signature verification is skipped (dev mode). + path: Inbox endpoint path (default "/inbox"). + + Returns: + FastAPI APIRouter. + + Example:: + + async def on_create(activity: dict, headers: dict) -> None: + print("Received Create:", activity) + + router = make_inbox_router(handlers={"Create": on_create}) + app.include_router(router, prefix="/actors/kiwi") + """ + if not _FASTAPI_AVAILABLE: + raise ImportError( + "circuitforge_core.activitypub.inbox requires fastapi. " + "Install with: pip install fastapi" + ) + + from circuitforge_core.activitypub.signing import verify_signature + + router = APIRouter() + _handlers: dict[str, InboxHandler] = handlers or {} + + @router.post(path, status_code=202) + async def inbox_endpoint(request: Request) -> JSONResponse: + # Parse body — read bytes first (needed for signature verification), + # then decode JSON manually to avoid double-read issues. + try: + body = await request.body() + activity = _json.loads(body) + except Exception: + raise HTTPException(status_code=400, detail="Invalid JSON body.") + + # Optional signature verification + if verify_key_fetcher is not None: + sig_header = request.headers.get("Signature", "") + key_id = _parse_key_id(sig_header) + if not key_id: + raise HTTPException(status_code=401, detail="Missing or malformed Signature header.") + public_key_pem = await verify_key_fetcher(key_id) + if public_key_pem is None: + raise HTTPException(status_code=401, detail=f"Unknown keyId: {key_id}") + ok = verify_signature( + headers=dict(request.headers), + method="POST", + path=request.url.path, + body=body, + public_key_pem=public_key_pem, + ) + if not ok: + raise HTTPException(status_code=401, detail="Signature verification failed.") + + activity_type = activity.get("type", "") + handler = _handlers.get(activity_type) + if handler is None: + # Unknown types are silently accepted per AP spec — return 202 + return JSONResponse(status_code=202, content={"status": "accepted", "type": activity_type}) + + await handler(activity, dict(request.headers)) + return JSONResponse(status_code=202, content={"status": "accepted"}) + + return router + + +def _parse_key_id(sig_header: str) -> str | None: + """Extract keyId value from a Signature header string.""" + match = re.search(r'keyId="([^"]+)"', sig_header) + return match.group(1) if match else None diff --git a/circuitforge_core/activitypub/lemmy.py b/circuitforge_core/activitypub/lemmy.py new file mode 100644 index 0000000..cfe6114 --- /dev/null +++ b/circuitforge_core/activitypub/lemmy.py @@ -0,0 +1,173 @@ +""" +Lemmy REST API client for posting to Lemmy communities. + +Uses JWT authentication (Lemmy v0.19+ API). Does not require ActivityPub +federation setup — the Lemmy REST API is simpler and more reliable for +the initial integration. + +MIT licensed. +""" +from __future__ import annotations + +from dataclasses import dataclass +from typing import Any + +import requests + + +class LemmyAuthError(Exception): + """Raised when Lemmy login fails.""" + + +class LemmyCommunityNotFound(Exception): + """Raised when a community cannot be resolved by name.""" + + +@dataclass(frozen=True) +class LemmyConfig: + """Connection config for a Lemmy instance.""" + + instance_url: str # e.g. "https://lemmy.ml" (no trailing slash) + username: str + password: str # Load from env/config; never hardcode + + +class LemmyClient: + """ + Lemmy REST API client. + + Usage:: + + config = LemmyConfig(instance_url="https://lemmy.ml", username="bot", password="...") + client = LemmyClient(config) + client.login() + community_id = client.resolve_community("!cooking@lemmy.world") + client.post_to_community(community_id, title="Fresh pesto recipe", body="...") + """ + + def __init__(self, config: LemmyConfig) -> None: + self._config = config + self._jwt: str | None = None + self._session = requests.Session() + self._session.headers.update({"Content-Type": "application/json"}) + + @property + def _api(self) -> str: + return f"{self._config.instance_url.rstrip('/')}/api/v3" + + def _auth_headers(self) -> dict[str, str]: + if not self._jwt: + raise LemmyAuthError("Not logged in — call login() first.") + return {"Authorization": f"Bearer {self._jwt}"} + + def login(self) -> None: + """ + Authenticate with the Lemmy instance and store the JWT. + + Raises: + LemmyAuthError: If credentials are rejected or the request fails. + """ + resp = self._session.post( + f"{self._api}/user/login", + json={"username_or_email": self._config.username, "password": self._config.password}, + timeout=10, + ) + if resp.status_code != 200: + raise LemmyAuthError( + f"Lemmy login failed ({resp.status_code}): {resp.text[:200]}" + ) + data = resp.json() + token = data.get("jwt") + if not token: + raise LemmyAuthError("Lemmy login response missing 'jwt' field.") + self._jwt = token + + def resolve_community(self, name: str) -> int: + """ + Resolve a community name or address to its numeric Lemmy ID. + + Accepts: + - Bare name: "cooking" + - Fediverse address: "!cooking@lemmy.world" + - Display name search (best-effort) + + Args: + name: Community identifier. + + Returns: + Numeric community ID. + + Raises: + LemmyCommunityNotFound: If not found or multiple matches are ambiguous. + LemmyAuthError: If not logged in. + """ + # Strip leading ! for address lookups + lookup = name.lstrip("!") + resp = self._session.get( + f"{self._api}/search", + params={"q": lookup, "type_": "Communities", "limit": 5}, + headers=self._auth_headers(), + timeout=10, + ) + if resp.status_code != 200: + raise LemmyCommunityNotFound( + f"Community search failed ({resp.status_code}): {resp.text[:200]}" + ) + communities = resp.json().get("communities", []) + if not communities: + raise LemmyCommunityNotFound(f"No communities found for '{name}'.") + # Prefer exact actor_id match (e.g. !cooking@lemmy.world) + for item in communities: + view = item.get("community", {}) + if "@" in lookup: + actor_id: str = view.get("actor_id", "") + if lookup.lower() in actor_id.lower(): + return int(view["id"]) + else: + if view.get("name", "").lower() == lookup.lower(): + return int(view["id"]) + # Fall back to first result + return int(communities[0]["community"]["id"]) + + def post_to_community( + self, + community_id: int, + title: str, + body: str, + url: str | None = None, + nsfw: bool = False, + ) -> dict[str, Any]: + """ + Create a post in a Lemmy community. + + Args: + community_id: Numeric community ID (from resolve_community()). + title: Post title. + body: Markdown post body. + url: Optional external URL to attach. + nsfw: Mark NSFW (default False). + + Returns: + Lemmy API response dict (contains 'post_view', etc.). + + Raises: + LemmyAuthError: If not logged in. + requests.RequestException: On network failure. + """ + payload: dict[str, Any] = { + "community_id": community_id, + "name": title, + "body": body, + "nsfw": nsfw, + } + if url: + payload["url"] = url + + resp = self._session.post( + f"{self._api}/post", + json=payload, + headers=self._auth_headers(), + timeout=15, + ) + resp.raise_for_status() + return resp.json() diff --git a/circuitforge_core/activitypub/objects.py b/circuitforge_core/activitypub/objects.py new file mode 100644 index 0000000..aece744 --- /dev/null +++ b/circuitforge_core/activitypub/objects.py @@ -0,0 +1,168 @@ +""" +ActivityStreams 2.0 object constructors. + +All functions return plain dicts (no classes) — they are serialized to JSON +for delivery. IDs are minted with UUID4 so callers don't need to track them. + +Custom types: +- "Offer" — AS2 Offer (Rook exchange offers) +- "Request" — custom CF extension (Rook exchange requests); not in core AS2 + +MIT licensed. +""" +from __future__ import annotations + +import uuid +from datetime import datetime, timezone +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from circuitforge_core.activitypub.actor import CFActor + +# AS2 public address (all followers) +PUBLIC = "https://www.w3.org/ns/activitystreams#Public" + +# Custom context extension for CF-specific types +_CF_CONTEXT = "https://circuitforge.tech/ns/activitystreams" + + +def _now_iso() -> str: + return datetime.now(tz=timezone.utc).isoformat().replace("+00:00", "Z") + + +def _mint_id(actor_id: str, type_slug: str) -> str: + """Generate a unique ID scoped to the actor's namespace.""" + return f"{actor_id}/{type_slug}/{uuid.uuid4().hex}" + + +def make_note( + actor_id: str, + content: str, + to: list[str] | None = None, + cc: list[str] | None = None, + in_reply_to: str | None = None, + tag: list[dict] | None = None, + published: datetime | None = None, +) -> dict: + """ + Construct an AS2 Note object. + + Args: + actor_id: The actor's ID URL (attributedTo). + content: HTML or plain-text body. + to: Direct recipients (defaults to [PUBLIC]). + cc: CC recipients. + in_reply_to: URL of the parent note when replying. + tag: Mention/hashtag tag dicts. + published: Post timestamp (defaults to now UTC). + """ + note: dict = { + "@context": "https://www.w3.org/ns/activitystreams", + "id": _mint_id(actor_id, "notes"), + "type": "Note", + "attributedTo": actor_id, + "content": content, + "to": to if to is not None else [PUBLIC], + "published": published.isoformat().replace("+00:00", "Z") if published else _now_iso(), + } + if cc: + note["cc"] = cc + if in_reply_to: + note["inReplyTo"] = in_reply_to + if tag: + note["tag"] = tag + return note + + +def make_offer( + actor_id: str, + summary: str, + content: str, + to: list[str] | None = None, + cc: list[str] | None = None, +) -> dict: + """ + Construct an AS2 Offer object (Rook exchange offers). + + The Offer type is part of core ActivityStreams 2.0. + + Args: + actor_id: The actor's ID URL (actor field). + summary: Short one-line description (used as title in Lemmy). + content: Full HTML/plain-text description. + to: Recipients (defaults to [PUBLIC]). + cc: CC recipients. + """ + return { + "@context": "https://www.w3.org/ns/activitystreams", + "id": _mint_id(actor_id, "offers"), + "type": "Offer", + "actor": actor_id, + "summary": summary, + "content": content, + "to": to if to is not None else [PUBLIC], + "cc": cc or [], + "published": _now_iso(), + } + + +def make_request( + actor_id: str, + summary: str, + content: str, + to: list[str] | None = None, + cc: list[str] | None = None, +) -> dict: + """ + Construct a CF-extension Request object (Rook exchange requests). + + "Request" is not in core AS2 vocabulary — the CF namespace context + extension is included so federating servers don't reject it. + + Args: + actor_id: The actor's ID URL. + summary: Short one-line description. + content: Full HTML/plain-text description. + to: Recipients (defaults to [PUBLIC]). + cc: CC recipients. + """ + return { + "@context": [ + "https://www.w3.org/ns/activitystreams", + _CF_CONTEXT, + ], + "id": _mint_id(actor_id, "requests"), + "type": "Request", + "actor": actor_id, + "summary": summary, + "content": content, + "to": to if to is not None else [PUBLIC], + "cc": cc or [], + "published": _now_iso(), + } + + +def make_create(actor: "CFActor", obj: dict) -> dict: + """ + Wrap any object dict in an AS2 Create activity. + + The Create activity's id, actor, to, cc, and published fields are + derived from the wrapped object where available. + + Args: + actor: The CFActor originating the Create. + obj: An object dict (Note, Offer, Request, etc.). + """ + # Propagate context from inner object if it's a list (custom types) + ctx = obj.get("@context", "https://www.w3.org/ns/activitystreams") + + return { + "@context": ctx, + "id": _mint_id(actor.actor_id, "activities"), + "type": "Create", + "actor": actor.actor_id, + "to": obj.get("to", [PUBLIC]), + "cc": obj.get("cc", []), + "published": obj.get("published", _now_iso()), + "object": obj, + } diff --git a/circuitforge_core/activitypub/signing.py b/circuitforge_core/activitypub/signing.py new file mode 100644 index 0000000..208ed53 --- /dev/null +++ b/circuitforge_core/activitypub/signing.py @@ -0,0 +1,197 @@ +""" +HTTP Signatures for ActivityPub (draft-cavage-http-signatures-08). + +This is the signing convention used by Mastodon, Lemmy, and the broader +ActivityPub ecosystem. It is distinct from the newer RFC 9421. + +Signing algorithm: rsa-sha256 +Signed headers: (request-target) host date [digest] content-type +Digest header: SHA-256 of request body (when body is present) +keyId: {actor.actor_id}#main-key + +MIT licensed. +""" +from __future__ import annotations + +import base64 +import hashlib +import re +from email.utils import formatdate +from typing import TYPE_CHECKING +from urllib.parse import urlparse + +if TYPE_CHECKING: + from circuitforge_core.activitypub.actor import CFActor + + +def _rfc1123_now() -> str: + """Return current UTC time in RFC 1123 format as required by HTTP Date header.""" + return formatdate(usegmt=True) + + +def _sha256_digest(body: bytes) -> str: + """Return 'SHA-256=' digest string for body.""" + digest = hashlib.sha256(body).digest() + return f"SHA-256={base64.b64encode(digest).decode()}" + + +def sign_headers( + method: str, + url: str, + headers: dict, + body: bytes | None, + actor: "CFActor", # type: ignore[name-defined] +) -> dict: + """ + Return a new headers dict with Date, Digest (if body), and Signature added. + + The input *headers* dict is not mutated. + + Args: + method: HTTP method string (e.g. "POST"), case-insensitive. + url: Full request URL. + headers: Existing headers dict (Content-Type, etc.). + body: Request body bytes, or None for bodyless requests. + actor: CFActor whose private key signs the request. + + Returns: + New dict with all original headers plus Date, Digest (if body), Signature. + """ + from cryptography.hazmat.primitives import hashes, serialization + from cryptography.hazmat.primitives.asymmetric import padding + from cryptography.hazmat.primitives.serialization import load_pem_private_key + + parsed = urlparse(url) + host = parsed.netloc + path = parsed.path or "/" + if parsed.query: + path = f"{path}?{parsed.query}" + + method_lower = method.lower() + date = _rfc1123_now() + + out = dict(headers) + out["Date"] = date + out["Host"] = host + + signed_header_names = ["(request-target)", "host", "date"] + + if body is not None: + digest = _sha256_digest(body) + out["Digest"] = digest + signed_header_names.append("digest") + + if "Content-Type" in out: + signed_header_names.append("content-type") + + # Build the signature string — header names in the spec are lowercase, + # but the dict uses Title-Case HTTP convention, so look up case-insensitively. + def _ci_get(d: dict, key: str) -> str: + for k, v in d.items(): + if k.lower() == key.lower(): + return v + raise KeyError(key) + + lines = [] + for name in signed_header_names: + if name == "(request-target)": + lines.append(f"(request-target): {method_lower} {path}") + else: + lines.append(f"{name}: {_ci_get(out, name)}") + + signature_string = "\n".join(lines).encode() + + private_key = load_pem_private_key(actor.private_key_pem.encode(), password=None) + raw_sig = private_key.sign(signature_string, padding.PKCS1v15(), hashes.SHA256()) + b64_sig = base64.b64encode(raw_sig).decode() + + key_id = f"{actor.actor_id}#main-key" + headers_param = " ".join(signed_header_names) + + out["Signature"] = ( + f'keyId="{key_id}",' + f'algorithm="rsa-sha256",' + f'headers="{headers_param}",' + f'signature="{b64_sig}"' + ) + + return out + + +def verify_signature( + headers: dict, + method: str, + path: str, + body: bytes | None, + public_key_pem: str, +) -> bool: + """ + Verify an incoming ActivityPub HTTP Signature. + + Returns False on any parse or verification failure — never raises. + + Args: + headers: Request headers dict (case-insensitive lookup attempted). + method: HTTP method (e.g. "POST"). + path: Request path (e.g. "/actors/kiwi/inbox"). + body: Raw request body bytes, or None. + public_key_pem: PEM-encoded RSA public key of the signing actor. + """ + from cryptography.exceptions import InvalidSignature + from cryptography.hazmat.primitives import hashes, serialization + from cryptography.hazmat.primitives.asymmetric import padding + from cryptography.hazmat.primitives.serialization import load_pem_public_key + + try: + # Case-insensitive header lookup helper + def _get(name: str) -> str | None: + name_lower = name.lower() + for k, v in headers.items(): + if k.lower() == name_lower: + return v + return None + + sig_header = _get("Signature") + if not sig_header: + return False + + # Parse Signature header key=value pairs + params: dict[str, str] = {} + for match in re.finditer(r'(\w+)="([^"]*)"', sig_header): + params[match.group(1)] = match.group(2) + + if "signature" not in params or "headers" not in params: + return False + + signed_header_names = params["headers"].split() + method_lower = method.lower() + + lines = [] + for name in signed_header_names: + if name == "(request-target)": + lines.append(f"(request-target): {method_lower} {path}") + else: + val = _get(name) + if val is None: + return False + lines.append(f"{name}: {val}") + + signature_string = "\n".join(lines).encode() + raw_sig = base64.b64decode(params["signature"]) + + public_key = load_pem_public_key(public_key_pem.encode()) + public_key.verify(raw_sig, signature_string, padding.PKCS1v15(), hashes.SHA256()) + + # Also verify the Digest header matches the actual body, if both are present. + # Signing the Digest header proves it wasn't swapped; re-computing it proves + # the body wasn't replaced after signing. + digest_val = _get("Digest") + if digest_val and body is not None: + expected = _sha256_digest(body) + if digest_val != expected: + return False + + return True + + except (InvalidSignature, Exception): + return False diff --git a/pyproject.toml b/pyproject.toml index d466510..1fa4138 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "circuitforge-core" -version = "0.13.0" +version = "0.14.0" description = "Shared scaffold for CircuitForge products (MIT)" requires-python = ">=3.11" dependencies = [ diff --git a/tests/test_activitypub/__init__.py b/tests/test_activitypub/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_activitypub/test_actor.py b/tests/test_activitypub/test_actor.py new file mode 100644 index 0000000..71ac6d8 --- /dev/null +++ b/tests/test_activitypub/test_actor.py @@ -0,0 +1,152 @@ +"""Tests for CFActor and key generation utilities.""" +import pytest +from circuitforge_core.activitypub.actor import ( + CFActor, + generate_rsa_keypair, + make_actor, + load_actor_from_key_file, +) + + +@pytest.fixture(scope="module") +def keypair(): + return generate_rsa_keypair(bits=1024) # small key for speed in tests + + +@pytest.fixture(scope="module") +def actor(keypair): + priv, pub = keypair + return make_actor( + actor_id="https://kiwi.example.com/actors/kiwi", + username="kiwi", + display_name="Kiwi Pantry Bot", + private_key_pem=priv, + public_key_pem=pub, + summary="Community recipe posts from Kiwi.", + ) + + +class TestGenerateRsaKeypair: + def test_returns_two_strings(self, keypair): + priv, pub = keypair + assert isinstance(priv, str) + assert isinstance(pub, str) + + def test_private_key_pem_header(self, keypair): + priv, _ = keypair + assert "BEGIN PRIVATE KEY" in priv + + def test_public_key_pem_header(self, keypair): + _, pub = keypair + assert "BEGIN PUBLIC KEY" in pub + + def test_keys_are_different(self, keypair): + priv, pub = keypair + assert priv != pub + + +class TestMakeActor: + def test_actor_id_stored(self, actor): + assert actor.actor_id == "https://kiwi.example.com/actors/kiwi" + + def test_inbox_derived_from_actor_id(self, actor): + assert actor.inbox_url == "https://kiwi.example.com/actors/kiwi/inbox" + + def test_outbox_derived_from_actor_id(self, actor): + assert actor.outbox_url == "https://kiwi.example.com/actors/kiwi/outbox" + + def test_username(self, actor): + assert actor.username == "kiwi" + + def test_display_name(self, actor): + assert actor.display_name == "Kiwi Pantry Bot" + + def test_summary_stored(self, actor): + assert actor.summary == "Community recipe posts from Kiwi." + + def test_icon_url_defaults_none(self, keypair): + priv, pub = keypair + a = make_actor("https://x.com/a", "a", "A", priv, pub) + assert a.icon_url is None + + def test_actor_is_frozen(self, actor): + with pytest.raises((AttributeError, TypeError)): + actor.username = "changed" # type: ignore[misc] + + +class TestToApDict: + def test_type_is_application(self, actor): + d = actor.to_ap_dict() + assert d["type"] == "Application" + + def test_id_matches_actor_id(self, actor): + d = actor.to_ap_dict() + assert d["id"] == actor.actor_id + + def test_preferred_username(self, actor): + d = actor.to_ap_dict() + assert d["preferredUsername"] == "kiwi" + + def test_public_key_present(self, actor): + d = actor.to_ap_dict() + assert "publicKey" in d + assert d["publicKey"]["publicKeyPem"] == actor.public_key_pem + + def test_key_id_includes_main_key_fragment(self, actor): + d = actor.to_ap_dict() + assert d["publicKey"]["id"].endswith("#main-key") + + def test_private_key_not_in_dict(self, actor): + d = actor.to_ap_dict() + import json + serialized = json.dumps(d) + assert "PRIVATE KEY" not in serialized + + def test_context_includes_security(self, actor): + d = actor.to_ap_dict() + ctx = d["@context"] + assert "https://w3id.org/security/v1" in ctx + + def test_summary_included_when_set(self, actor): + d = actor.to_ap_dict() + assert d["summary"] == actor.summary + + def test_summary_omitted_when_none(self, keypair): + priv, pub = keypair + a = make_actor("https://x.com/a", "a", "A", priv, pub) + d = a.to_ap_dict() + assert "summary" not in d + + def test_icon_included_when_set(self, keypair): + priv, pub = keypair + a = make_actor("https://x.com/a", "a", "A", priv, pub, icon_url="https://x.com/icon.png") + d = a.to_ap_dict() + assert d["icon"]["url"] == "https://x.com/icon.png" + + def test_icon_omitted_when_none(self, actor): + d = actor.to_ap_dict() + assert "icon" not in d + + +class TestLoadActorFromKeyFile: + def test_loads_and_derives_public_key(self, keypair, tmp_path): + priv, pub = keypair + key_file = tmp_path / "actor.pem" + key_file.write_text(priv) + loaded = load_actor_from_key_file( + actor_id="https://test.example/actors/x", + username="x", + display_name="X", + private_key_path=str(key_file), + ) + assert "BEGIN PUBLIC KEY" in loaded.public_key_pem + assert loaded.actor_id == "https://test.example/actors/x" + + def test_missing_file_raises(self, tmp_path): + with pytest.raises(FileNotFoundError): + load_actor_from_key_file( + actor_id="https://x.com/a", + username="a", + display_name="A", + private_key_path=str(tmp_path / "missing.pem"), + ) diff --git a/tests/test_activitypub/test_delivery.py b/tests/test_activitypub/test_delivery.py new file mode 100644 index 0000000..34f72e2 --- /dev/null +++ b/tests/test_activitypub/test_delivery.py @@ -0,0 +1,61 @@ +"""Tests for deliver_activity — mocked at requests layer.""" +import json +import pytest +from unittest.mock import MagicMock, patch + +from circuitforge_core.activitypub.actor import generate_rsa_keypair, make_actor +from circuitforge_core.activitypub.delivery import deliver_activity +from circuitforge_core.activitypub.objects import make_note, make_create + +ACTOR_ID = "https://kiwi.example.com/actors/kiwi" +INBOX_URL = "https://lemmy.example.com/inbox" + + +@pytest.fixture(scope="module") +def actor(): + priv, pub = generate_rsa_keypair(bits=1024) + return make_actor(ACTOR_ID, "kiwi", "Kiwi", priv, pub) + + +@pytest.fixture(scope="module") +def activity(actor): + note = make_note(ACTOR_ID, "Hello Lemmy!") + return make_create(actor, note) + + +class TestDeliverActivity: + def test_posts_to_inbox_url(self, actor, activity): + mock_resp = MagicMock(status_code=202) + with patch("circuitforge_core.activitypub.delivery.requests.post", return_value=mock_resp) as mock_post: + deliver_activity(activity, INBOX_URL, actor) + mock_post.assert_called_once() + call_url = mock_post.call_args[0][0] + assert call_url == INBOX_URL + + def test_content_type_is_activity_json(self, actor, activity): + mock_resp = MagicMock(status_code=202) + with patch("circuitforge_core.activitypub.delivery.requests.post", return_value=mock_resp) as mock_post: + deliver_activity(activity, INBOX_URL, actor) + headers = mock_post.call_args[1]["headers"] + assert headers.get("Content-Type") == "application/activity+json" + + def test_body_is_json_serialized(self, actor, activity): + mock_resp = MagicMock(status_code=202) + with patch("circuitforge_core.activitypub.delivery.requests.post", return_value=mock_resp) as mock_post: + deliver_activity(activity, INBOX_URL, actor) + body = mock_post.call_args[1]["data"] + parsed = json.loads(body) + assert parsed["type"] == "Create" + + def test_signature_header_present(self, actor, activity): + mock_resp = MagicMock(status_code=202) + with patch("circuitforge_core.activitypub.delivery.requests.post", return_value=mock_resp) as mock_post: + deliver_activity(activity, INBOX_URL, actor) + headers = mock_post.call_args[1]["headers"] + assert "Signature" in headers + + def test_returns_response(self, actor, activity): + mock_resp = MagicMock(status_code=202) + with patch("circuitforge_core.activitypub.delivery.requests.post", return_value=mock_resp): + result = deliver_activity(activity, INBOX_URL, actor) + assert result is mock_resp diff --git a/tests/test_activitypub/test_inbox.py b/tests/test_activitypub/test_inbox.py new file mode 100644 index 0000000..c122833 --- /dev/null +++ b/tests/test_activitypub/test_inbox.py @@ -0,0 +1,145 @@ +"""Tests for the ActivityPub inbox FastAPI router.""" +import json +import pytest +from unittest.mock import AsyncMock, MagicMock + +from fastapi import FastAPI +from fastapi.testclient import TestClient + +from circuitforge_core.activitypub.actor import generate_rsa_keypair, make_actor +from circuitforge_core.activitypub.inbox import make_inbox_router +from circuitforge_core.activitypub.signing import sign_headers + +ACTOR_ID = "https://kiwi.example.com/actors/kiwi" + + +@pytest.fixture(scope="module") +def actor(): + priv, pub = generate_rsa_keypair(bits=1024) + return make_actor(ACTOR_ID, "kiwi", "Kiwi", priv, pub) + + +@pytest.fixture +def app_no_verify(): + """App with inbox router, no signature verification (dev mode).""" + received = [] + + async def on_create(activity, headers): + received.append(activity) + + router = make_inbox_router(handlers={"Create": on_create}) + app = FastAPI() + app.include_router(router) + app._received = received + return app + + +@pytest.fixture +def client_no_verify(app_no_verify): + return TestClient(app_no_verify) + + +class TestInboxNoVerification: + def test_202_on_known_activity_type(self, client_no_verify): + resp = client_no_verify.post( + "/inbox", + json={"type": "Create", "actor": ACTOR_ID, "object": {}}, + ) + assert resp.status_code == 202 + + def test_202_on_unknown_activity_type(self, client_no_verify): + resp = client_no_verify.post( + "/inbox", + json={"type": "Undo", "actor": ACTOR_ID}, + ) + assert resp.status_code == 202 + + def test_response_body_contains_accepted(self, client_no_verify): + resp = client_no_verify.post( + "/inbox", + json={"type": "Create", "actor": ACTOR_ID, "object": {}}, + ) + assert resp.json()["status"] == "accepted" + + def test_400_on_invalid_json(self, client_no_verify): + resp = client_no_verify.post( + "/inbox", + data=b"not json at all", + headers={"Content-Type": "application/json"}, + ) + assert resp.status_code == 400 + + def test_handler_called_with_activity(self, app_no_verify, client_no_verify): + app_no_verify._received.clear() + activity = {"type": "Create", "actor": ACTOR_ID, "object": {"type": "Note"}} + client_no_verify.post("/inbox", json=activity) + assert len(app_no_verify._received) == 1 + assert app_no_verify._received[0]["type"] == "Create" + + def test_no_handlers_still_returns_202(self): + router = make_inbox_router() + app = FastAPI() + app.include_router(router) + with TestClient(app) as c: + resp = c.post("/inbox", json={"type": "Follow"}) + assert resp.status_code == 202 + + +class TestInboxWithSignatureVerification: + def test_valid_signature_accepted(self, actor): + async def key_fetcher(key_id: str): + return actor.public_key_pem + + router = make_inbox_router(handlers={}, verify_key_fetcher=key_fetcher) + app = FastAPI() + app.include_router(router) + + activity = {"type": "Create", "actor": ACTOR_ID} + body = json.dumps(activity).encode() + headers = sign_headers( + method="POST", + url="http://testserver/inbox", + headers={"Content-Type": "application/activity+json"}, + body=body, + actor=actor, + ) + + with TestClient(app) as c: + resp = c.post("/inbox", content=body, headers=headers) + assert resp.status_code == 202 + + def test_missing_signature_returns_401(self, actor): + async def key_fetcher(key_id: str): + return actor.public_key_pem + + router = make_inbox_router(handlers={}, verify_key_fetcher=key_fetcher) + app = FastAPI() + app.include_router(router) + + with TestClient(app) as c: + resp = c.post("/inbox", json={"type": "Create"}) + assert resp.status_code == 401 + + def test_unknown_key_id_returns_401(self, actor): + async def key_fetcher(key_id: str): + return None # Unknown actor + + router = make_inbox_router(handlers={}, verify_key_fetcher=key_fetcher) + app = FastAPI() + app.include_router(router) + + activity = {"type": "Create"} + body = json.dumps(activity).encode() + headers = sign_headers("POST", "http://testserver/inbox", {}, body, actor) + + with TestClient(app) as c: + resp = c.post("/inbox", content=body, headers=headers) + assert resp.status_code == 401 + + +class TestMakeInboxRouterImportError: + def test_raises_on_missing_fastapi(self, monkeypatch): + import circuitforge_core.activitypub.inbox as inbox_mod + monkeypatch.setattr(inbox_mod, "_FASTAPI_AVAILABLE", False) + with pytest.raises(ImportError, match="fastapi"): + make_inbox_router() diff --git a/tests/test_activitypub/test_lemmy.py b/tests/test_activitypub/test_lemmy.py new file mode 100644 index 0000000..b081be7 --- /dev/null +++ b/tests/test_activitypub/test_lemmy.py @@ -0,0 +1,163 @@ +"""Tests for LemmyClient — mocked at the requests layer.""" +import pytest +from unittest.mock import MagicMock, patch + +from circuitforge_core.activitypub.lemmy import ( + LemmyAuthError, + LemmyClient, + LemmyCommunityNotFound, + LemmyConfig, +) + +CONFIG = LemmyConfig( + instance_url="https://lemmy.example.com", + username="kiwi_bot", + password="s3cret", +) + + +@pytest.fixture +def client(): + return LemmyClient(CONFIG) + + +def _mock_response(status_code: int, json_data: dict): + resp = MagicMock() + resp.status_code = status_code + resp.json.return_value = json_data + resp.text = str(json_data) + resp.raise_for_status = MagicMock() + return resp + + +class TestLemmyConfig: + def test_fields_stored(self): + assert CONFIG.instance_url == "https://lemmy.example.com" + assert CONFIG.username == "kiwi_bot" + assert CONFIG.password == "s3cret" + + def test_frozen(self): + with pytest.raises((AttributeError, TypeError)): + CONFIG.username = "other" # type: ignore[misc] + + +class TestLogin: + def test_successful_login_stores_jwt(self, client): + with patch.object(client._session, "post", return_value=_mock_response(200, {"jwt": "token123"})): + client.login() + assert client._jwt == "token123" + + def test_401_raises_lemmy_auth_error(self, client): + with patch.object(client._session, "post", return_value=_mock_response(401, {})): + with pytest.raises(LemmyAuthError): + client.login() + + def test_missing_jwt_field_raises(self, client): + with patch.object(client._session, "post", return_value=_mock_response(200, {})): + with pytest.raises(LemmyAuthError, match="missing 'jwt'"): + client.login() + + +class TestResolveCommunity: + def _logged_in_client(self): + c = LemmyClient(CONFIG) + c._jwt = "fake-jwt" + return c + + def test_resolves_exact_name_match(self): + client = self._logged_in_client() + community_resp = { + "communities": [ + {"community": {"id": 42, "name": "cooking", "actor_id": "https://lemmy.world/c/cooking"}} + ] + } + with patch.object(client._session, "get", return_value=_mock_response(200, community_resp)): + cid = client.resolve_community("cooking") + assert cid == 42 + + def test_resolves_fediverse_address(self): + client = self._logged_in_client() + community_resp = { + "communities": [ + {"community": {"id": 99, "name": "cooking", "actor_id": "https://lemmy.world/c/cooking"}} + ] + } + with patch.object(client._session, "get", return_value=_mock_response(200, community_resp)): + cid = client.resolve_community("!cooking@lemmy.world") + assert cid == 99 + + def test_empty_results_raises_not_found(self): + client = self._logged_in_client() + with patch.object(client._session, "get", return_value=_mock_response(200, {"communities": []})): + with pytest.raises(LemmyCommunityNotFound): + client.resolve_community("nonexistent") + + def test_search_failure_raises_not_found(self): + client = self._logged_in_client() + with patch.object(client._session, "get", return_value=_mock_response(500, {})): + with pytest.raises(LemmyCommunityNotFound): + client.resolve_community("cooking") + + def test_not_logged_in_raises_auth_error(self, client): + with pytest.raises(LemmyAuthError): + client.resolve_community("cooking") + + +class TestPostToCommunity: + def _logged_in_client(self): + c = LemmyClient(CONFIG) + c._jwt = "fake-jwt" + return c + + def test_successful_post_returns_dict(self): + client = self._logged_in_client() + post_resp = {"post_view": {"post": {"id": 123, "name": "Recipe post"}}} + with patch.object(client._session, "post", return_value=_mock_response(200, post_resp)): + result = client.post_to_community(42, "Recipe post", "Great recipe!") + assert result == post_resp + + def test_post_includes_title_and_body(self): + client = self._logged_in_client() + post_resp = {"post_view": {}} + captured = {} + + def fake_post(url, json=None, headers=None, timeout=None): + captured["json"] = json + return _mock_response(200, post_resp) + + with patch.object(client._session, "post", side_effect=fake_post): + client.post_to_community(42, "My Title", "My body") + + assert captured["json"]["name"] == "My Title" + assert captured["json"]["body"] == "My body" + assert captured["json"]["community_id"] == 42 + + def test_optional_url_included_when_set(self): + client = self._logged_in_client() + captured = {} + + def fake_post(url, json=None, headers=None, timeout=None): + captured["json"] = json + return _mock_response(200, {}) + + with patch.object(client._session, "post", side_effect=fake_post): + client.post_to_community(42, "Title", "Body", url="https://example.com") + + assert captured["json"]["url"] == "https://example.com" + + def test_url_absent_when_not_set(self): + client = self._logged_in_client() + captured = {} + + def fake_post(url, json=None, headers=None, timeout=None): + captured["json"] = json + return _mock_response(200, {}) + + with patch.object(client._session, "post", side_effect=fake_post): + client.post_to_community(42, "Title", "Body") + + assert "url" not in captured["json"] + + def test_not_logged_in_raises_auth_error(self, client): + with pytest.raises(LemmyAuthError): + client.post_to_community(42, "Title", "Body") diff --git a/tests/test_activitypub/test_objects.py b/tests/test_activitypub/test_objects.py new file mode 100644 index 0000000..689ed3b --- /dev/null +++ b/tests/test_activitypub/test_objects.py @@ -0,0 +1,178 @@ +"""Tests for ActivityStreams 2.0 object constructors.""" +import pytest +from circuitforge_core.activitypub.actor import generate_rsa_keypair, make_actor +from circuitforge_core.activitypub.objects import ( + PUBLIC, + make_create, + make_note, + make_offer, + make_request, +) + +ACTOR_ID = "https://rook.example.com/actors/rook" + + +@pytest.fixture(scope="module") +def actor(): + priv, pub = generate_rsa_keypair(bits=1024) + return make_actor(ACTOR_ID, "rook", "Rook Exchange", priv, pub) + + +class TestPublicConstant: + def test_is_as_public_uri(self): + assert PUBLIC == "https://www.w3.org/ns/activitystreams#Public" + + +class TestMakeNote: + def test_type_is_note(self): + assert make_note(ACTOR_ID, "Hello")["type"] == "Note" + + def test_attributed_to_actor(self): + n = make_note(ACTOR_ID, "Hello") + assert n["attributedTo"] == ACTOR_ID + + def test_content_stored(self): + n = make_note(ACTOR_ID, "Hello world") + assert n["content"] == "Hello world" + + def test_default_to_is_public(self): + n = make_note(ACTOR_ID, "Hello") + assert PUBLIC in n["to"] + + def test_custom_to(self): + n = make_note(ACTOR_ID, "Hello", to=["https://other.example/inbox"]) + assert "https://other.example/inbox" in n["to"] + + def test_cc_present_when_set(self): + n = make_note(ACTOR_ID, "Hello", cc=["https://x.com/followers"]) + assert n["cc"] == ["https://x.com/followers"] + + def test_cc_absent_when_not_set(self): + n = make_note(ACTOR_ID, "Hello") + assert "cc" not in n + + def test_in_reply_to_included(self): + n = make_note(ACTOR_ID, "Reply", in_reply_to="https://mastodon.social/notes/123") + assert n["inReplyTo"] == "https://mastodon.social/notes/123" + + def test_in_reply_to_absent_by_default(self): + assert "inReplyTo" not in make_note(ACTOR_ID, "Hello") + + def test_tag_included(self): + tag = [{"type": "Mention", "href": "https://mastodon.social/users/alice"}] + n = make_note(ACTOR_ID, "Hi @alice", tag=tag) + assert n["tag"] == tag + + def test_id_is_unique(self): + a = make_note(ACTOR_ID, "Hello") + b = make_note(ACTOR_ID, "Hello") + assert a["id"] != b["id"] + + def test_id_scoped_to_actor(self): + n = make_note(ACTOR_ID, "Hello") + assert n["id"].startswith(ACTOR_ID) + + def test_published_present(self): + n = make_note(ACTOR_ID, "Hello") + assert "published" in n + assert n["published"].endswith("Z") + + def test_custom_published(self): + from datetime import datetime, timezone + ts = datetime(2026, 1, 15, 12, 0, 0, tzinfo=timezone.utc) + n = make_note(ACTOR_ID, "Hello", published=ts) + assert "2026-01-15" in n["published"] + + def test_context_is_activitystreams(self): + n = make_note(ACTOR_ID, "Hello") + assert "activitystreams" in n["@context"] + + +class TestMakeOffer: + def test_type_is_offer(self): + assert make_offer(ACTOR_ID, "Free apples", "Lots of apples")["type"] == "Offer" + + def test_summary_stored(self): + o = make_offer(ACTOR_ID, "Free apples", "Lots of apples") + assert o["summary"] == "Free apples" + + def test_content_stored(self): + o = make_offer(ACTOR_ID, "Free apples", "Lots of apples available.") + assert o["content"] == "Lots of apples available." + + def test_actor_field_set(self): + o = make_offer(ACTOR_ID, "x", "y") + assert o["actor"] == ACTOR_ID + + def test_default_to_is_public(self): + o = make_offer(ACTOR_ID, "x", "y") + assert PUBLIC in o["to"] + + def test_id_is_unique(self): + assert make_offer(ACTOR_ID, "x", "y")["id"] != make_offer(ACTOR_ID, "x", "y")["id"] + + +class TestMakeRequest: + def test_type_is_request(self): + assert make_request(ACTOR_ID, "Need a ladder", "Borrowing a ladder")["type"] == "Request" + + def test_context_includes_cf_namespace(self): + r = make_request(ACTOR_ID, "Need", "Need something") + ctx = r["@context"] + assert isinstance(ctx, list) + assert any("circuitforge" in c for c in ctx) + + def test_summary_stored(self): + r = make_request(ACTOR_ID, "Need a ladder", "...") + assert r["summary"] == "Need a ladder" + + def test_actor_field_set(self): + assert make_request(ACTOR_ID, "x", "y")["actor"] == ACTOR_ID + + def test_id_is_unique(self): + a = make_request(ACTOR_ID, "x", "y") + b = make_request(ACTOR_ID, "x", "y") + assert a["id"] != b["id"] + + +class TestMakeCreate: + def test_type_is_create(self, actor): + note = make_note(ACTOR_ID, "Hello") + c = make_create(actor, note) + assert c["type"] == "Create" + + def test_actor_field_matches(self, actor): + note = make_note(ACTOR_ID, "Hello") + c = make_create(actor, note) + assert c["actor"] == ACTOR_ID + + def test_object_is_inner_dict(self, actor): + note = make_note(ACTOR_ID, "Hello") + c = make_create(actor, note) + assert c["object"] is note + + def test_to_propagated_from_object(self, actor): + note = make_note(ACTOR_ID, "Hello", to=["https://inbox.example/"]) + c = make_create(actor, note) + assert "https://inbox.example/" in c["to"] + + def test_published_propagated(self, actor): + note = make_note(ACTOR_ID, "Hello") + c = make_create(actor, note) + assert c["published"] == note["published"] + + def test_id_is_unique(self, actor): + note = make_note(ACTOR_ID, "Hello") + a = make_create(actor, note) + b = make_create(actor, note) + assert a["id"] != b["id"] + + def test_wraps_offer(self, actor): + offer = make_offer(ACTOR_ID, "Free apples", "Take some") + c = make_create(actor, offer) + assert c["object"]["type"] == "Offer" + + def test_wraps_request(self, actor): + req = make_request(ACTOR_ID, "Need ladder", "...") + c = make_create(actor, req) + assert c["object"]["type"] == "Request" diff --git a/tests/test_activitypub/test_signing.py b/tests/test_activitypub/test_signing.py new file mode 100644 index 0000000..4955bdd --- /dev/null +++ b/tests/test_activitypub/test_signing.py @@ -0,0 +1,126 @@ +"""Tests for HTTP Signature signing and verification (draft-cavage-http-signatures-08).""" +import pytest +from circuitforge_core.activitypub.actor import generate_rsa_keypair, make_actor +from circuitforge_core.activitypub.signing import sign_headers, verify_signature + +ACTOR_ID = "https://kiwi.example.com/actors/kiwi" +INBOX_URL = "https://lemmy.example.com/inbox" + + +@pytest.fixture(scope="module") +def actor(): + priv, pub = generate_rsa_keypair(bits=1024) + return make_actor(ACTOR_ID, "kiwi", "Kiwi", priv, pub) + + +class TestSignHeaders: + def test_date_header_added(self, actor): + headers = sign_headers("POST", INBOX_URL, {}, b"body", actor) + assert "Date" in headers + + def test_host_header_added(self, actor): + headers = sign_headers("POST", INBOX_URL, {}, b"body", actor) + assert headers["Host"] == "lemmy.example.com" + + def test_digest_header_added_when_body(self, actor): + headers = sign_headers("POST", INBOX_URL, {}, b"body content", actor) + assert "Digest" in headers + assert headers["Digest"].startswith("SHA-256=") + + def test_digest_not_added_when_no_body(self, actor): + headers = sign_headers("GET", INBOX_URL, {}, None, actor) + assert "Digest" not in headers + + def test_signature_header_present(self, actor): + headers = sign_headers("POST", INBOX_URL, {}, b"body", actor) + assert "Signature" in headers + + def test_signature_contains_key_id(self, actor): + headers = sign_headers("POST", INBOX_URL, {}, b"body", actor) + assert f"{ACTOR_ID}#main-key" in headers["Signature"] + + def test_signature_algorithm_rsa_sha256(self, actor): + headers = sign_headers("POST", INBOX_URL, {}, b"body", actor) + assert 'algorithm="rsa-sha256"' in headers["Signature"] + + def test_does_not_mutate_input_headers(self, actor): + original = {"Content-Type": "application/json"} + sign_headers("POST", INBOX_URL, original, b"body", actor) + assert "Signature" not in original + + def test_content_type_signed_when_present(self, actor): + headers = sign_headers("POST", INBOX_URL, {"Content-Type": "application/activity+json"}, b"x", actor) + assert "content-type" in headers["Signature"] + + +class TestVerifySignature: + def test_valid_signature_returns_true(self, actor): + body = b'{"type": "Create"}' + headers = sign_headers("POST", INBOX_URL, {"Content-Type": "application/activity+json"}, body, actor) + result = verify_signature( + headers=headers, + method="POST", + path="/inbox", + body=body, + public_key_pem=actor.public_key_pem, + ) + assert result is True + + def test_tampered_body_returns_false(self, actor): + body = b'{"type": "Create"}' + headers = sign_headers("POST", INBOX_URL, {"Content-Type": "application/activity+json"}, body, actor) + result = verify_signature( + headers=headers, + method="POST", + path="/inbox", + body=b"tampered body", + public_key_pem=actor.public_key_pem, + ) + assert result is False + + def test_wrong_public_key_returns_false(self, actor): + _, other_pub = generate_rsa_keypair(bits=1024) + body = b"hello" + headers = sign_headers("POST", INBOX_URL, {}, body, actor) + result = verify_signature( + headers=headers, + method="POST", + path="/inbox", + body=body, + public_key_pem=other_pub, + ) + assert result is False + + def test_missing_signature_header_returns_false(self, actor): + result = verify_signature( + headers={"Date": "Mon, 20 Apr 2026 12:00:00 GMT"}, + method="POST", + path="/inbox", + body=b"body", + public_key_pem=actor.public_key_pem, + ) + assert result is False + + def test_bodyless_get_roundtrip(self, actor): + headers = sign_headers("GET", INBOX_URL, {}, None, actor) + result = verify_signature( + headers=headers, + method="GET", + path="/inbox", + body=None, + public_key_pem=actor.public_key_pem, + ) + assert result is True + + def test_wrong_method_fails_verification(self, actor): + body = b"data" + headers = sign_headers("POST", INBOX_URL, {}, body, actor) + # Verify with wrong method — (request-target) will differ + result = verify_signature( + headers=headers, + method="PUT", + path="/inbox", + body=body, + public_key_pem=actor.public_key_pem, + ) + assert result is False