feat: add activitypub module — actor, objects, signing, delivery, Lemmy, inbox (closes #51)
Some checks are pending
CI / test (push) Waiting to run
Mirror / mirror (push) Waiting to run

CFActor (frozen dataclass, RSA keygen), AS2 object constructors (Note, Offer,
Request, Create), HTTP Signatures (draft-cavage-http-signatures-08, rsa-sha256),
signed delivery via requests, Lemmy REST client (JWT auth), FastAPI inbox router
with optional signature verification.

Digest header re-verified against actual body bytes on verify_signature() to
prevent body-swap attacks. inbox.py omits __future__ annotations to avoid
FastAPI's annotation-resolution-against-module-globals constraint.

105 tests. Bumps to v0.14.0.
This commit is contained in:
pyr0ball 2026-04-20 13:18:03 -07:00
parent f9b9fa5283
commit 1553ff1630
17 changed files with 1771 additions and 2 deletions

View file

@ -6,6 +6,27 @@ Versions follow [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
---
## [0.14.0] — 2026-04-20
### Added
**`circuitforge_core.activitypub`** — ActivityPub actor management, object construction, HTTP Signature signing, delivery, and Lemmy integration (MIT, closes #51)
- `actor.py``CFActor` frozen dataclass; `generate_rsa_keypair(bits)`; `make_actor()`; `load_actor_from_key_file()`. `to_ap_dict()` produces an ActivityPub Application/Person object and never includes the private key.
- `objects.py``make_note()`, `make_offer()`, `make_request()` (CF namespace extension), `make_create()`. All return plain dicts; IDs minted with UUID4. `make_request` uses `https://circuitforge.tech/ns/activitystreams` context extension for the non-AS2 Request type.
- `signing.py``sign_headers()` (draft-cavage-http-signatures-08, rsa-sha256; signs `(request-target)`, `host`, `date`, `digest`, `content-type`). `verify_signature()` re-computes Digest from actual body after signature verification to catch body-swap attacks.
- `delivery.py``deliver_activity(activity, inbox_url, actor)` — synchronous `requests.post` with signed headers and `Content-Type: application/activity+json`.
- `lemmy.py``LemmyConfig` frozen dataclass; `LemmyClient` with `login()`, `resolve_community()` (bare name or `!community@instance` address), `post_to_community()`. Uses Lemmy v0.19+ REST API (JWT auth). `LemmyAuthError` / `LemmyCommunityNotFound` exceptions.
- `inbox.py``make_inbox_router(handlers, verify_key_fetcher, path)` — FastAPI APIRouter stub; dispatches by activity type; optional HTTP Signature verification via async `verify_key_fetcher` callback. FastAPI imported at module level with `_FASTAPI_AVAILABLE` guard (avoids annotation-resolution bug with lazy string annotations).
- 105 tests across all six files.
**Key design notes:**
- `inbox` not re-exported from `__init__` — requires fastapi, imported explicitly by products that need it
- Signing Digest + re-verifying digest against body on verify — prevents body-swap attacks even when signature is valid
- `from __future__ import annotations` intentionally omitted in `inbox.py` — FastAPI resolves `Request` annotation against module globals at route registration time
---
## [0.13.0] — 2026-04-20
### Added

View file

@ -1,4 +1,4 @@
__version__ = "0.13.0"
__version__ = "0.14.0"
try:
from circuitforge_core.community import CommunityDB, CommunityPost, SharedStore

View file

@ -0,0 +1,55 @@
"""
circuitforge_core.activitypub ActivityPub actor management, object construction,
HTTP Signature signing, delivery, and Lemmy integration.
MIT licensed.
"""
from circuitforge_core.activitypub.actor import (
CFActor,
generate_rsa_keypair,
load_actor_from_key_file,
make_actor,
)
from circuitforge_core.activitypub.delivery import deliver_activity
from circuitforge_core.activitypub.lemmy import (
LemmyAuthError,
LemmyClient,
LemmyCommunityNotFound,
LemmyConfig,
)
from circuitforge_core.activitypub.objects import (
PUBLIC,
make_create,
make_note,
make_offer,
make_request,
)
from circuitforge_core.activitypub.signing import sign_headers, verify_signature
__all__ = [
# Actor
"CFActor",
"generate_rsa_keypair",
"load_actor_from_key_file",
"make_actor",
# Objects
"PUBLIC",
"make_note",
"make_offer",
"make_request",
"make_create",
# Signing
"sign_headers",
"verify_signature",
# Delivery
"deliver_activity",
# Lemmy
"LemmyConfig",
"LemmyClient",
"LemmyAuthError",
"LemmyCommunityNotFound",
]
# inbox is optional (requires fastapi) — import it when needed:
# from circuitforge_core.activitypub.inbox import make_inbox_router

View file

@ -0,0 +1,146 @@
"""
CFActor ActivityPub actor identity for CircuitForge products.
An actor holds RSA key material and its ActivityPub identity URLs.
The private key is in-memory only; to_ap_dict() never includes it.
MIT licensed.
"""
from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
@dataclass(frozen=True)
class CFActor:
"""ActivityPub actor for a CircuitForge product instance."""
actor_id: str # e.g. "https://kiwi.circuitforge.tech/actors/kiwi"
username: str
display_name: str
inbox_url: str
outbox_url: str
public_key_pem: str
private_key_pem: str # Never included in to_ap_dict()
icon_url: str | None = None
summary: str | None = None
def to_ap_dict(self) -> dict:
"""Return an ActivityPub Person/Application object (public only)."""
obj: dict = {
"@context": [
"https://www.w3.org/ns/activitystreams",
"https://w3id.org/security/v1",
],
"id": self.actor_id,
"type": "Application",
"preferredUsername": self.username,
"name": self.display_name,
"inbox": self.inbox_url,
"outbox": self.outbox_url,
"publicKey": {
"id": f"{self.actor_id}#main-key",
"owner": self.actor_id,
"publicKeyPem": self.public_key_pem,
},
}
if self.summary:
obj["summary"] = self.summary
if self.icon_url:
obj["icon"] = {
"type": "Image",
"mediaType": "image/png",
"url": self.icon_url,
}
return obj
def generate_rsa_keypair(bits: int = 2048) -> tuple[str, str]:
"""
Generate a new RSA keypair.
Returns:
(private_key_pem, public_key_pem) as PEM-encoded strings.
"""
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa
private_key = rsa.generate_private_key(public_exponent=65537, key_size=bits)
private_pem = private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption(),
).decode()
public_pem = private_key.public_key().public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo,
).decode()
return private_pem, public_pem
def make_actor(
actor_id: str,
username: str,
display_name: str,
private_key_pem: str,
public_key_pem: str,
icon_url: str | None = None,
summary: str | None = None,
) -> CFActor:
"""
Construct a CFActor from an existing keypair.
Inbox and outbox URLs are derived from actor_id by convention:
{actor_id}/inbox and {actor_id}/outbox
"""
return CFActor(
actor_id=actor_id,
username=username,
display_name=display_name,
inbox_url=f"{actor_id}/inbox",
outbox_url=f"{actor_id}/outbox",
public_key_pem=public_key_pem,
private_key_pem=private_key_pem,
icon_url=icon_url,
summary=summary,
)
def load_actor_from_key_file(
actor_id: str,
username: str,
display_name: str,
private_key_path: str,
icon_url: str | None = None,
summary: str | None = None,
) -> CFActor:
"""
Load a CFActor from a PEM private key file on disk.
The public key is derived from the private key no separate public key
file is required.
"""
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.serialization import load_pem_private_key
pem_bytes = Path(private_key_path).read_bytes()
private_key = load_pem_private_key(pem_bytes, password=None)
private_pem = private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption(),
).decode()
public_pem = private_key.public_key().public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo,
).decode()
return make_actor(
actor_id=actor_id,
username=username,
display_name=display_name,
private_key_pem=private_pem,
public_key_pem=public_pem,
icon_url=icon_url,
summary=summary,
)

View file

@ -0,0 +1,56 @@
"""
ActivityPub HTTP delivery POST a signed activity to a remote inbox.
Synchronous (uses requests). Async callers can wrap in asyncio.to_thread.
MIT licensed.
"""
from __future__ import annotations
import json
from typing import TYPE_CHECKING
import requests
from circuitforge_core.activitypub.signing import sign_headers
if TYPE_CHECKING:
from circuitforge_core.activitypub.actor import CFActor
ACTIVITY_CONTENT_TYPE = "application/activity+json"
def deliver_activity(
activity: dict,
inbox_url: str,
actor: "CFActor",
timeout: float = 10.0,
) -> requests.Response:
"""
POST a signed ActivityPub activity to a remote inbox.
The activity dict is serialized to JSON, signed with the actor's private
key (HTTP Signatures, rsa-sha256), and delivered via HTTP POST.
Args:
activity: ActivityPub activity dict (e.g. from make_create()).
inbox_url: Target inbox URL (e.g. "https://lemmy.ml/inbox").
actor: CFActor whose key signs the request.
timeout: Request timeout in seconds.
Returns:
The raw requests.Response. Caller decides retry / error policy.
Raises:
requests.RequestException: On network-level failure.
"""
body = json.dumps(activity).encode()
base_headers = {"Content-Type": ACTIVITY_CONTENT_TYPE}
signed = sign_headers(
method="POST",
url=inbox_url,
headers=base_headers,
body=body,
actor=actor,
)
return requests.post(inbox_url, data=body, headers=signed, timeout=timeout)

View file

@ -0,0 +1,128 @@
"""
ActivityPub inbox router FastAPI stub for receiving federated activities.
Products mount this router to handle incoming Create, Follow, Like, Announce,
and other ActivityPub activities from the Fediverse.
Requires fastapi (optional dep). ImportError is raised with a clear message
when fastapi is not installed.
NOTE: from __future__ import annotations is intentionally omitted here.
FastAPI resolves route parameter annotations against module globals at
definition time; lazy string annotations break the Request injection.
MIT licensed.
"""
import json as _json
import re
from typing import Awaitable, Callable
# Handler type: receives (activity_dict, request_headers) and returns None
InboxHandler = Callable[[dict, dict], Awaitable[None]]
# FastAPI imports at module level so annotations resolve correctly.
# Products that don't use the inbox router are not affected by this import
# since circuitforge_core.activitypub.__init__ does NOT import inbox.
try:
from fastapi import APIRouter, HTTPException, Request
from fastapi.responses import JSONResponse
_FASTAPI_AVAILABLE = True
except ImportError:
_FASTAPI_AVAILABLE = False
# Provide stubs so the module can be imported without fastapi
APIRouter = None # type: ignore[assignment,misc]
HTTPException = None # type: ignore[assignment]
Request = None # type: ignore[assignment]
JSONResponse = None # type: ignore[assignment]
def make_inbox_router(
handlers: dict[str, InboxHandler] | None = None,
verify_key_fetcher: Callable[[str], Awaitable[str | None]] | None = None,
path: str = "/inbox",
) -> "APIRouter": # type: ignore[name-defined]
"""
Build a FastAPI router that handles ActivityPub inbox POSTs.
The router:
1. Parses the JSON body into an activity dict
2. Optionally verifies the HTTP Signature (when verify_key_fetcher is provided)
3. Dispatches activity["type"] to the matching handler from *handlers*
4. Returns 202 Accepted on success, 400 on bad JSON, 401 on bad signature
Args:
handlers: Dict mapping activity type strings (e.g. "Create",
"Follow") to async handler callables.
verify_key_fetcher: Async callable that takes a keyId URL and returns the
actor's public key PEM, or None if not found.
When None, signature verification is skipped (dev mode).
path: Inbox endpoint path (default "/inbox").
Returns:
FastAPI APIRouter.
Example::
async def on_create(activity: dict, headers: dict) -> None:
print("Received Create:", activity)
router = make_inbox_router(handlers={"Create": on_create})
app.include_router(router, prefix="/actors/kiwi")
"""
if not _FASTAPI_AVAILABLE:
raise ImportError(
"circuitforge_core.activitypub.inbox requires fastapi. "
"Install with: pip install fastapi"
)
from circuitforge_core.activitypub.signing import verify_signature
router = APIRouter()
_handlers: dict[str, InboxHandler] = handlers or {}
@router.post(path, status_code=202)
async def inbox_endpoint(request: Request) -> JSONResponse:
# Parse body — read bytes first (needed for signature verification),
# then decode JSON manually to avoid double-read issues.
try:
body = await request.body()
activity = _json.loads(body)
except Exception:
raise HTTPException(status_code=400, detail="Invalid JSON body.")
# Optional signature verification
if verify_key_fetcher is not None:
sig_header = request.headers.get("Signature", "")
key_id = _parse_key_id(sig_header)
if not key_id:
raise HTTPException(status_code=401, detail="Missing or malformed Signature header.")
public_key_pem = await verify_key_fetcher(key_id)
if public_key_pem is None:
raise HTTPException(status_code=401, detail=f"Unknown keyId: {key_id}")
ok = verify_signature(
headers=dict(request.headers),
method="POST",
path=request.url.path,
body=body,
public_key_pem=public_key_pem,
)
if not ok:
raise HTTPException(status_code=401, detail="Signature verification failed.")
activity_type = activity.get("type", "")
handler = _handlers.get(activity_type)
if handler is None:
# Unknown types are silently accepted per AP spec — return 202
return JSONResponse(status_code=202, content={"status": "accepted", "type": activity_type})
await handler(activity, dict(request.headers))
return JSONResponse(status_code=202, content={"status": "accepted"})
return router
def _parse_key_id(sig_header: str) -> str | None:
"""Extract keyId value from a Signature header string."""
match = re.search(r'keyId="([^"]+)"', sig_header)
return match.group(1) if match else None

View file

@ -0,0 +1,173 @@
"""
Lemmy REST API client for posting to Lemmy communities.
Uses JWT authentication (Lemmy v0.19+ API). Does not require ActivityPub
federation setup the Lemmy REST API is simpler and more reliable for
the initial integration.
MIT licensed.
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import Any
import requests
class LemmyAuthError(Exception):
"""Raised when Lemmy login fails."""
class LemmyCommunityNotFound(Exception):
"""Raised when a community cannot be resolved by name."""
@dataclass(frozen=True)
class LemmyConfig:
"""Connection config for a Lemmy instance."""
instance_url: str # e.g. "https://lemmy.ml" (no trailing slash)
username: str
password: str # Load from env/config; never hardcode
class LemmyClient:
"""
Lemmy REST API client.
Usage::
config = LemmyConfig(instance_url="https://lemmy.ml", username="bot", password="...")
client = LemmyClient(config)
client.login()
community_id = client.resolve_community("!cooking@lemmy.world")
client.post_to_community(community_id, title="Fresh pesto recipe", body="...")
"""
def __init__(self, config: LemmyConfig) -> None:
self._config = config
self._jwt: str | None = None
self._session = requests.Session()
self._session.headers.update({"Content-Type": "application/json"})
@property
def _api(self) -> str:
return f"{self._config.instance_url.rstrip('/')}/api/v3"
def _auth_headers(self) -> dict[str, str]:
if not self._jwt:
raise LemmyAuthError("Not logged in — call login() first.")
return {"Authorization": f"Bearer {self._jwt}"}
def login(self) -> None:
"""
Authenticate with the Lemmy instance and store the JWT.
Raises:
LemmyAuthError: If credentials are rejected or the request fails.
"""
resp = self._session.post(
f"{self._api}/user/login",
json={"username_or_email": self._config.username, "password": self._config.password},
timeout=10,
)
if resp.status_code != 200:
raise LemmyAuthError(
f"Lemmy login failed ({resp.status_code}): {resp.text[:200]}"
)
data = resp.json()
token = data.get("jwt")
if not token:
raise LemmyAuthError("Lemmy login response missing 'jwt' field.")
self._jwt = token
def resolve_community(self, name: str) -> int:
"""
Resolve a community name or address to its numeric Lemmy ID.
Accepts:
- Bare name: "cooking"
- Fediverse address: "!cooking@lemmy.world"
- Display name search (best-effort)
Args:
name: Community identifier.
Returns:
Numeric community ID.
Raises:
LemmyCommunityNotFound: If not found or multiple matches are ambiguous.
LemmyAuthError: If not logged in.
"""
# Strip leading ! for address lookups
lookup = name.lstrip("!")
resp = self._session.get(
f"{self._api}/search",
params={"q": lookup, "type_": "Communities", "limit": 5},
headers=self._auth_headers(),
timeout=10,
)
if resp.status_code != 200:
raise LemmyCommunityNotFound(
f"Community search failed ({resp.status_code}): {resp.text[:200]}"
)
communities = resp.json().get("communities", [])
if not communities:
raise LemmyCommunityNotFound(f"No communities found for '{name}'.")
# Prefer exact actor_id match (e.g. !cooking@lemmy.world)
for item in communities:
view = item.get("community", {})
if "@" in lookup:
actor_id: str = view.get("actor_id", "")
if lookup.lower() in actor_id.lower():
return int(view["id"])
else:
if view.get("name", "").lower() == lookup.lower():
return int(view["id"])
# Fall back to first result
return int(communities[0]["community"]["id"])
def post_to_community(
self,
community_id: int,
title: str,
body: str,
url: str | None = None,
nsfw: bool = False,
) -> dict[str, Any]:
"""
Create a post in a Lemmy community.
Args:
community_id: Numeric community ID (from resolve_community()).
title: Post title.
body: Markdown post body.
url: Optional external URL to attach.
nsfw: Mark NSFW (default False).
Returns:
Lemmy API response dict (contains 'post_view', etc.).
Raises:
LemmyAuthError: If not logged in.
requests.RequestException: On network failure.
"""
payload: dict[str, Any] = {
"community_id": community_id,
"name": title,
"body": body,
"nsfw": nsfw,
}
if url:
payload["url"] = url
resp = self._session.post(
f"{self._api}/post",
json=payload,
headers=self._auth_headers(),
timeout=15,
)
resp.raise_for_status()
return resp.json()

View file

@ -0,0 +1,168 @@
"""
ActivityStreams 2.0 object constructors.
All functions return plain dicts (no classes) they are serialized to JSON
for delivery. IDs are minted with UUID4 so callers don't need to track them.
Custom types:
- "Offer" AS2 Offer (Rook exchange offers)
- "Request" custom CF extension (Rook exchange requests); not in core AS2
MIT licensed.
"""
from __future__ import annotations
import uuid
from datetime import datetime, timezone
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from circuitforge_core.activitypub.actor import CFActor
# AS2 public address (all followers)
PUBLIC = "https://www.w3.org/ns/activitystreams#Public"
# Custom context extension for CF-specific types
_CF_CONTEXT = "https://circuitforge.tech/ns/activitystreams"
def _now_iso() -> str:
return datetime.now(tz=timezone.utc).isoformat().replace("+00:00", "Z")
def _mint_id(actor_id: str, type_slug: str) -> str:
"""Generate a unique ID scoped to the actor's namespace."""
return f"{actor_id}/{type_slug}/{uuid.uuid4().hex}"
def make_note(
actor_id: str,
content: str,
to: list[str] | None = None,
cc: list[str] | None = None,
in_reply_to: str | None = None,
tag: list[dict] | None = None,
published: datetime | None = None,
) -> dict:
"""
Construct an AS2 Note object.
Args:
actor_id: The actor's ID URL (attributedTo).
content: HTML or plain-text body.
to: Direct recipients (defaults to [PUBLIC]).
cc: CC recipients.
in_reply_to: URL of the parent note when replying.
tag: Mention/hashtag tag dicts.
published: Post timestamp (defaults to now UTC).
"""
note: dict = {
"@context": "https://www.w3.org/ns/activitystreams",
"id": _mint_id(actor_id, "notes"),
"type": "Note",
"attributedTo": actor_id,
"content": content,
"to": to if to is not None else [PUBLIC],
"published": published.isoformat().replace("+00:00", "Z") if published else _now_iso(),
}
if cc:
note["cc"] = cc
if in_reply_to:
note["inReplyTo"] = in_reply_to
if tag:
note["tag"] = tag
return note
def make_offer(
actor_id: str,
summary: str,
content: str,
to: list[str] | None = None,
cc: list[str] | None = None,
) -> dict:
"""
Construct an AS2 Offer object (Rook exchange offers).
The Offer type is part of core ActivityStreams 2.0.
Args:
actor_id: The actor's ID URL (actor field).
summary: Short one-line description (used as title in Lemmy).
content: Full HTML/plain-text description.
to: Recipients (defaults to [PUBLIC]).
cc: CC recipients.
"""
return {
"@context": "https://www.w3.org/ns/activitystreams",
"id": _mint_id(actor_id, "offers"),
"type": "Offer",
"actor": actor_id,
"summary": summary,
"content": content,
"to": to if to is not None else [PUBLIC],
"cc": cc or [],
"published": _now_iso(),
}
def make_request(
actor_id: str,
summary: str,
content: str,
to: list[str] | None = None,
cc: list[str] | None = None,
) -> dict:
"""
Construct a CF-extension Request object (Rook exchange requests).
"Request" is not in core AS2 vocabulary the CF namespace context
extension is included so federating servers don't reject it.
Args:
actor_id: The actor's ID URL.
summary: Short one-line description.
content: Full HTML/plain-text description.
to: Recipients (defaults to [PUBLIC]).
cc: CC recipients.
"""
return {
"@context": [
"https://www.w3.org/ns/activitystreams",
_CF_CONTEXT,
],
"id": _mint_id(actor_id, "requests"),
"type": "Request",
"actor": actor_id,
"summary": summary,
"content": content,
"to": to if to is not None else [PUBLIC],
"cc": cc or [],
"published": _now_iso(),
}
def make_create(actor: "CFActor", obj: dict) -> dict:
"""
Wrap any object dict in an AS2 Create activity.
The Create activity's id, actor, to, cc, and published fields are
derived from the wrapped object where available.
Args:
actor: The CFActor originating the Create.
obj: An object dict (Note, Offer, Request, etc.).
"""
# Propagate context from inner object if it's a list (custom types)
ctx = obj.get("@context", "https://www.w3.org/ns/activitystreams")
return {
"@context": ctx,
"id": _mint_id(actor.actor_id, "activities"),
"type": "Create",
"actor": actor.actor_id,
"to": obj.get("to", [PUBLIC]),
"cc": obj.get("cc", []),
"published": obj.get("published", _now_iso()),
"object": obj,
}

View file

@ -0,0 +1,197 @@
"""
HTTP Signatures for ActivityPub (draft-cavage-http-signatures-08).
This is the signing convention used by Mastodon, Lemmy, and the broader
ActivityPub ecosystem. It is distinct from the newer RFC 9421.
Signing algorithm: rsa-sha256
Signed headers: (request-target) host date [digest] content-type
Digest header: SHA-256 of request body (when body is present)
keyId: {actor.actor_id}#main-key
MIT licensed.
"""
from __future__ import annotations
import base64
import hashlib
import re
from email.utils import formatdate
from typing import TYPE_CHECKING
from urllib.parse import urlparse
if TYPE_CHECKING:
from circuitforge_core.activitypub.actor import CFActor
def _rfc1123_now() -> str:
"""Return current UTC time in RFC 1123 format as required by HTTP Date header."""
return formatdate(usegmt=True)
def _sha256_digest(body: bytes) -> str:
"""Return 'SHA-256=<base64>' digest string for body."""
digest = hashlib.sha256(body).digest()
return f"SHA-256={base64.b64encode(digest).decode()}"
def sign_headers(
method: str,
url: str,
headers: dict,
body: bytes | None,
actor: "CFActor", # type: ignore[name-defined]
) -> dict:
"""
Return a new headers dict with Date, Digest (if body), and Signature added.
The input *headers* dict is not mutated.
Args:
method: HTTP method string (e.g. "POST"), case-insensitive.
url: Full request URL.
headers: Existing headers dict (Content-Type, etc.).
body: Request body bytes, or None for bodyless requests.
actor: CFActor whose private key signs the request.
Returns:
New dict with all original headers plus Date, Digest (if body), Signature.
"""
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.primitives.serialization import load_pem_private_key
parsed = urlparse(url)
host = parsed.netloc
path = parsed.path or "/"
if parsed.query:
path = f"{path}?{parsed.query}"
method_lower = method.lower()
date = _rfc1123_now()
out = dict(headers)
out["Date"] = date
out["Host"] = host
signed_header_names = ["(request-target)", "host", "date"]
if body is not None:
digest = _sha256_digest(body)
out["Digest"] = digest
signed_header_names.append("digest")
if "Content-Type" in out:
signed_header_names.append("content-type")
# Build the signature string — header names in the spec are lowercase,
# but the dict uses Title-Case HTTP convention, so look up case-insensitively.
def _ci_get(d: dict, key: str) -> str:
for k, v in d.items():
if k.lower() == key.lower():
return v
raise KeyError(key)
lines = []
for name in signed_header_names:
if name == "(request-target)":
lines.append(f"(request-target): {method_lower} {path}")
else:
lines.append(f"{name}: {_ci_get(out, name)}")
signature_string = "\n".join(lines).encode()
private_key = load_pem_private_key(actor.private_key_pem.encode(), password=None)
raw_sig = private_key.sign(signature_string, padding.PKCS1v15(), hashes.SHA256())
b64_sig = base64.b64encode(raw_sig).decode()
key_id = f"{actor.actor_id}#main-key"
headers_param = " ".join(signed_header_names)
out["Signature"] = (
f'keyId="{key_id}",'
f'algorithm="rsa-sha256",'
f'headers="{headers_param}",'
f'signature="{b64_sig}"'
)
return out
def verify_signature(
headers: dict,
method: str,
path: str,
body: bytes | None,
public_key_pem: str,
) -> bool:
"""
Verify an incoming ActivityPub HTTP Signature.
Returns False on any parse or verification failure never raises.
Args:
headers: Request headers dict (case-insensitive lookup attempted).
method: HTTP method (e.g. "POST").
path: Request path (e.g. "/actors/kiwi/inbox").
body: Raw request body bytes, or None.
public_key_pem: PEM-encoded RSA public key of the signing actor.
"""
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.primitives.serialization import load_pem_public_key
try:
# Case-insensitive header lookup helper
def _get(name: str) -> str | None:
name_lower = name.lower()
for k, v in headers.items():
if k.lower() == name_lower:
return v
return None
sig_header = _get("Signature")
if not sig_header:
return False
# Parse Signature header key=value pairs
params: dict[str, str] = {}
for match in re.finditer(r'(\w+)="([^"]*)"', sig_header):
params[match.group(1)] = match.group(2)
if "signature" not in params or "headers" not in params:
return False
signed_header_names = params["headers"].split()
method_lower = method.lower()
lines = []
for name in signed_header_names:
if name == "(request-target)":
lines.append(f"(request-target): {method_lower} {path}")
else:
val = _get(name)
if val is None:
return False
lines.append(f"{name}: {val}")
signature_string = "\n".join(lines).encode()
raw_sig = base64.b64decode(params["signature"])
public_key = load_pem_public_key(public_key_pem.encode())
public_key.verify(raw_sig, signature_string, padding.PKCS1v15(), hashes.SHA256())
# Also verify the Digest header matches the actual body, if both are present.
# Signing the Digest header proves it wasn't swapped; re-computing it proves
# the body wasn't replaced after signing.
digest_val = _get("Digest")
if digest_val and body is not None:
expected = _sha256_digest(body)
if digest_val != expected:
return False
return True
except (InvalidSignature, Exception):
return False

View file

@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
[project]
name = "circuitforge-core"
version = "0.13.0"
version = "0.14.0"
description = "Shared scaffold for CircuitForge products (MIT)"
requires-python = ">=3.11"
dependencies = [

View file

View file

@ -0,0 +1,152 @@
"""Tests for CFActor and key generation utilities."""
import pytest
from circuitforge_core.activitypub.actor import (
CFActor,
generate_rsa_keypair,
make_actor,
load_actor_from_key_file,
)
@pytest.fixture(scope="module")
def keypair():
return generate_rsa_keypair(bits=1024) # small key for speed in tests
@pytest.fixture(scope="module")
def actor(keypair):
priv, pub = keypair
return make_actor(
actor_id="https://kiwi.example.com/actors/kiwi",
username="kiwi",
display_name="Kiwi Pantry Bot",
private_key_pem=priv,
public_key_pem=pub,
summary="Community recipe posts from Kiwi.",
)
class TestGenerateRsaKeypair:
def test_returns_two_strings(self, keypair):
priv, pub = keypair
assert isinstance(priv, str)
assert isinstance(pub, str)
def test_private_key_pem_header(self, keypair):
priv, _ = keypair
assert "BEGIN PRIVATE KEY" in priv
def test_public_key_pem_header(self, keypair):
_, pub = keypair
assert "BEGIN PUBLIC KEY" in pub
def test_keys_are_different(self, keypair):
priv, pub = keypair
assert priv != pub
class TestMakeActor:
def test_actor_id_stored(self, actor):
assert actor.actor_id == "https://kiwi.example.com/actors/kiwi"
def test_inbox_derived_from_actor_id(self, actor):
assert actor.inbox_url == "https://kiwi.example.com/actors/kiwi/inbox"
def test_outbox_derived_from_actor_id(self, actor):
assert actor.outbox_url == "https://kiwi.example.com/actors/kiwi/outbox"
def test_username(self, actor):
assert actor.username == "kiwi"
def test_display_name(self, actor):
assert actor.display_name == "Kiwi Pantry Bot"
def test_summary_stored(self, actor):
assert actor.summary == "Community recipe posts from Kiwi."
def test_icon_url_defaults_none(self, keypair):
priv, pub = keypair
a = make_actor("https://x.com/a", "a", "A", priv, pub)
assert a.icon_url is None
def test_actor_is_frozen(self, actor):
with pytest.raises((AttributeError, TypeError)):
actor.username = "changed" # type: ignore[misc]
class TestToApDict:
def test_type_is_application(self, actor):
d = actor.to_ap_dict()
assert d["type"] == "Application"
def test_id_matches_actor_id(self, actor):
d = actor.to_ap_dict()
assert d["id"] == actor.actor_id
def test_preferred_username(self, actor):
d = actor.to_ap_dict()
assert d["preferredUsername"] == "kiwi"
def test_public_key_present(self, actor):
d = actor.to_ap_dict()
assert "publicKey" in d
assert d["publicKey"]["publicKeyPem"] == actor.public_key_pem
def test_key_id_includes_main_key_fragment(self, actor):
d = actor.to_ap_dict()
assert d["publicKey"]["id"].endswith("#main-key")
def test_private_key_not_in_dict(self, actor):
d = actor.to_ap_dict()
import json
serialized = json.dumps(d)
assert "PRIVATE KEY" not in serialized
def test_context_includes_security(self, actor):
d = actor.to_ap_dict()
ctx = d["@context"]
assert "https://w3id.org/security/v1" in ctx
def test_summary_included_when_set(self, actor):
d = actor.to_ap_dict()
assert d["summary"] == actor.summary
def test_summary_omitted_when_none(self, keypair):
priv, pub = keypair
a = make_actor("https://x.com/a", "a", "A", priv, pub)
d = a.to_ap_dict()
assert "summary" not in d
def test_icon_included_when_set(self, keypair):
priv, pub = keypair
a = make_actor("https://x.com/a", "a", "A", priv, pub, icon_url="https://x.com/icon.png")
d = a.to_ap_dict()
assert d["icon"]["url"] == "https://x.com/icon.png"
def test_icon_omitted_when_none(self, actor):
d = actor.to_ap_dict()
assert "icon" not in d
class TestLoadActorFromKeyFile:
def test_loads_and_derives_public_key(self, keypair, tmp_path):
priv, pub = keypair
key_file = tmp_path / "actor.pem"
key_file.write_text(priv)
loaded = load_actor_from_key_file(
actor_id="https://test.example/actors/x",
username="x",
display_name="X",
private_key_path=str(key_file),
)
assert "BEGIN PUBLIC KEY" in loaded.public_key_pem
assert loaded.actor_id == "https://test.example/actors/x"
def test_missing_file_raises(self, tmp_path):
with pytest.raises(FileNotFoundError):
load_actor_from_key_file(
actor_id="https://x.com/a",
username="a",
display_name="A",
private_key_path=str(tmp_path / "missing.pem"),
)

View file

@ -0,0 +1,61 @@
"""Tests for deliver_activity — mocked at requests layer."""
import json
import pytest
from unittest.mock import MagicMock, patch
from circuitforge_core.activitypub.actor import generate_rsa_keypair, make_actor
from circuitforge_core.activitypub.delivery import deliver_activity
from circuitforge_core.activitypub.objects import make_note, make_create
ACTOR_ID = "https://kiwi.example.com/actors/kiwi"
INBOX_URL = "https://lemmy.example.com/inbox"
@pytest.fixture(scope="module")
def actor():
priv, pub = generate_rsa_keypair(bits=1024)
return make_actor(ACTOR_ID, "kiwi", "Kiwi", priv, pub)
@pytest.fixture(scope="module")
def activity(actor):
note = make_note(ACTOR_ID, "Hello Lemmy!")
return make_create(actor, note)
class TestDeliverActivity:
def test_posts_to_inbox_url(self, actor, activity):
mock_resp = MagicMock(status_code=202)
with patch("circuitforge_core.activitypub.delivery.requests.post", return_value=mock_resp) as mock_post:
deliver_activity(activity, INBOX_URL, actor)
mock_post.assert_called_once()
call_url = mock_post.call_args[0][0]
assert call_url == INBOX_URL
def test_content_type_is_activity_json(self, actor, activity):
mock_resp = MagicMock(status_code=202)
with patch("circuitforge_core.activitypub.delivery.requests.post", return_value=mock_resp) as mock_post:
deliver_activity(activity, INBOX_URL, actor)
headers = mock_post.call_args[1]["headers"]
assert headers.get("Content-Type") == "application/activity+json"
def test_body_is_json_serialized(self, actor, activity):
mock_resp = MagicMock(status_code=202)
with patch("circuitforge_core.activitypub.delivery.requests.post", return_value=mock_resp) as mock_post:
deliver_activity(activity, INBOX_URL, actor)
body = mock_post.call_args[1]["data"]
parsed = json.loads(body)
assert parsed["type"] == "Create"
def test_signature_header_present(self, actor, activity):
mock_resp = MagicMock(status_code=202)
with patch("circuitforge_core.activitypub.delivery.requests.post", return_value=mock_resp) as mock_post:
deliver_activity(activity, INBOX_URL, actor)
headers = mock_post.call_args[1]["headers"]
assert "Signature" in headers
def test_returns_response(self, actor, activity):
mock_resp = MagicMock(status_code=202)
with patch("circuitforge_core.activitypub.delivery.requests.post", return_value=mock_resp):
result = deliver_activity(activity, INBOX_URL, actor)
assert result is mock_resp

View file

@ -0,0 +1,145 @@
"""Tests for the ActivityPub inbox FastAPI router."""
import json
import pytest
from unittest.mock import AsyncMock, MagicMock
from fastapi import FastAPI
from fastapi.testclient import TestClient
from circuitforge_core.activitypub.actor import generate_rsa_keypair, make_actor
from circuitforge_core.activitypub.inbox import make_inbox_router
from circuitforge_core.activitypub.signing import sign_headers
ACTOR_ID = "https://kiwi.example.com/actors/kiwi"
@pytest.fixture(scope="module")
def actor():
priv, pub = generate_rsa_keypair(bits=1024)
return make_actor(ACTOR_ID, "kiwi", "Kiwi", priv, pub)
@pytest.fixture
def app_no_verify():
"""App with inbox router, no signature verification (dev mode)."""
received = []
async def on_create(activity, headers):
received.append(activity)
router = make_inbox_router(handlers={"Create": on_create})
app = FastAPI()
app.include_router(router)
app._received = received
return app
@pytest.fixture
def client_no_verify(app_no_verify):
return TestClient(app_no_verify)
class TestInboxNoVerification:
def test_202_on_known_activity_type(self, client_no_verify):
resp = client_no_verify.post(
"/inbox",
json={"type": "Create", "actor": ACTOR_ID, "object": {}},
)
assert resp.status_code == 202
def test_202_on_unknown_activity_type(self, client_no_verify):
resp = client_no_verify.post(
"/inbox",
json={"type": "Undo", "actor": ACTOR_ID},
)
assert resp.status_code == 202
def test_response_body_contains_accepted(self, client_no_verify):
resp = client_no_verify.post(
"/inbox",
json={"type": "Create", "actor": ACTOR_ID, "object": {}},
)
assert resp.json()["status"] == "accepted"
def test_400_on_invalid_json(self, client_no_verify):
resp = client_no_verify.post(
"/inbox",
data=b"not json at all",
headers={"Content-Type": "application/json"},
)
assert resp.status_code == 400
def test_handler_called_with_activity(self, app_no_verify, client_no_verify):
app_no_verify._received.clear()
activity = {"type": "Create", "actor": ACTOR_ID, "object": {"type": "Note"}}
client_no_verify.post("/inbox", json=activity)
assert len(app_no_verify._received) == 1
assert app_no_verify._received[0]["type"] == "Create"
def test_no_handlers_still_returns_202(self):
router = make_inbox_router()
app = FastAPI()
app.include_router(router)
with TestClient(app) as c:
resp = c.post("/inbox", json={"type": "Follow"})
assert resp.status_code == 202
class TestInboxWithSignatureVerification:
def test_valid_signature_accepted(self, actor):
async def key_fetcher(key_id: str):
return actor.public_key_pem
router = make_inbox_router(handlers={}, verify_key_fetcher=key_fetcher)
app = FastAPI()
app.include_router(router)
activity = {"type": "Create", "actor": ACTOR_ID}
body = json.dumps(activity).encode()
headers = sign_headers(
method="POST",
url="http://testserver/inbox",
headers={"Content-Type": "application/activity+json"},
body=body,
actor=actor,
)
with TestClient(app) as c:
resp = c.post("/inbox", content=body, headers=headers)
assert resp.status_code == 202
def test_missing_signature_returns_401(self, actor):
async def key_fetcher(key_id: str):
return actor.public_key_pem
router = make_inbox_router(handlers={}, verify_key_fetcher=key_fetcher)
app = FastAPI()
app.include_router(router)
with TestClient(app) as c:
resp = c.post("/inbox", json={"type": "Create"})
assert resp.status_code == 401
def test_unknown_key_id_returns_401(self, actor):
async def key_fetcher(key_id: str):
return None # Unknown actor
router = make_inbox_router(handlers={}, verify_key_fetcher=key_fetcher)
app = FastAPI()
app.include_router(router)
activity = {"type": "Create"}
body = json.dumps(activity).encode()
headers = sign_headers("POST", "http://testserver/inbox", {}, body, actor)
with TestClient(app) as c:
resp = c.post("/inbox", content=body, headers=headers)
assert resp.status_code == 401
class TestMakeInboxRouterImportError:
def test_raises_on_missing_fastapi(self, monkeypatch):
import circuitforge_core.activitypub.inbox as inbox_mod
monkeypatch.setattr(inbox_mod, "_FASTAPI_AVAILABLE", False)
with pytest.raises(ImportError, match="fastapi"):
make_inbox_router()

View file

@ -0,0 +1,163 @@
"""Tests for LemmyClient — mocked at the requests layer."""
import pytest
from unittest.mock import MagicMock, patch
from circuitforge_core.activitypub.lemmy import (
LemmyAuthError,
LemmyClient,
LemmyCommunityNotFound,
LemmyConfig,
)
CONFIG = LemmyConfig(
instance_url="https://lemmy.example.com",
username="kiwi_bot",
password="s3cret",
)
@pytest.fixture
def client():
return LemmyClient(CONFIG)
def _mock_response(status_code: int, json_data: dict):
resp = MagicMock()
resp.status_code = status_code
resp.json.return_value = json_data
resp.text = str(json_data)
resp.raise_for_status = MagicMock()
return resp
class TestLemmyConfig:
def test_fields_stored(self):
assert CONFIG.instance_url == "https://lemmy.example.com"
assert CONFIG.username == "kiwi_bot"
assert CONFIG.password == "s3cret"
def test_frozen(self):
with pytest.raises((AttributeError, TypeError)):
CONFIG.username = "other" # type: ignore[misc]
class TestLogin:
def test_successful_login_stores_jwt(self, client):
with patch.object(client._session, "post", return_value=_mock_response(200, {"jwt": "token123"})):
client.login()
assert client._jwt == "token123"
def test_401_raises_lemmy_auth_error(self, client):
with patch.object(client._session, "post", return_value=_mock_response(401, {})):
with pytest.raises(LemmyAuthError):
client.login()
def test_missing_jwt_field_raises(self, client):
with patch.object(client._session, "post", return_value=_mock_response(200, {})):
with pytest.raises(LemmyAuthError, match="missing 'jwt'"):
client.login()
class TestResolveCommunity:
def _logged_in_client(self):
c = LemmyClient(CONFIG)
c._jwt = "fake-jwt"
return c
def test_resolves_exact_name_match(self):
client = self._logged_in_client()
community_resp = {
"communities": [
{"community": {"id": 42, "name": "cooking", "actor_id": "https://lemmy.world/c/cooking"}}
]
}
with patch.object(client._session, "get", return_value=_mock_response(200, community_resp)):
cid = client.resolve_community("cooking")
assert cid == 42
def test_resolves_fediverse_address(self):
client = self._logged_in_client()
community_resp = {
"communities": [
{"community": {"id": 99, "name": "cooking", "actor_id": "https://lemmy.world/c/cooking"}}
]
}
with patch.object(client._session, "get", return_value=_mock_response(200, community_resp)):
cid = client.resolve_community("!cooking@lemmy.world")
assert cid == 99
def test_empty_results_raises_not_found(self):
client = self._logged_in_client()
with patch.object(client._session, "get", return_value=_mock_response(200, {"communities": []})):
with pytest.raises(LemmyCommunityNotFound):
client.resolve_community("nonexistent")
def test_search_failure_raises_not_found(self):
client = self._logged_in_client()
with patch.object(client._session, "get", return_value=_mock_response(500, {})):
with pytest.raises(LemmyCommunityNotFound):
client.resolve_community("cooking")
def test_not_logged_in_raises_auth_error(self, client):
with pytest.raises(LemmyAuthError):
client.resolve_community("cooking")
class TestPostToCommunity:
def _logged_in_client(self):
c = LemmyClient(CONFIG)
c._jwt = "fake-jwt"
return c
def test_successful_post_returns_dict(self):
client = self._logged_in_client()
post_resp = {"post_view": {"post": {"id": 123, "name": "Recipe post"}}}
with patch.object(client._session, "post", return_value=_mock_response(200, post_resp)):
result = client.post_to_community(42, "Recipe post", "Great recipe!")
assert result == post_resp
def test_post_includes_title_and_body(self):
client = self._logged_in_client()
post_resp = {"post_view": {}}
captured = {}
def fake_post(url, json=None, headers=None, timeout=None):
captured["json"] = json
return _mock_response(200, post_resp)
with patch.object(client._session, "post", side_effect=fake_post):
client.post_to_community(42, "My Title", "My body")
assert captured["json"]["name"] == "My Title"
assert captured["json"]["body"] == "My body"
assert captured["json"]["community_id"] == 42
def test_optional_url_included_when_set(self):
client = self._logged_in_client()
captured = {}
def fake_post(url, json=None, headers=None, timeout=None):
captured["json"] = json
return _mock_response(200, {})
with patch.object(client._session, "post", side_effect=fake_post):
client.post_to_community(42, "Title", "Body", url="https://example.com")
assert captured["json"]["url"] == "https://example.com"
def test_url_absent_when_not_set(self):
client = self._logged_in_client()
captured = {}
def fake_post(url, json=None, headers=None, timeout=None):
captured["json"] = json
return _mock_response(200, {})
with patch.object(client._session, "post", side_effect=fake_post):
client.post_to_community(42, "Title", "Body")
assert "url" not in captured["json"]
def test_not_logged_in_raises_auth_error(self, client):
with pytest.raises(LemmyAuthError):
client.post_to_community(42, "Title", "Body")

View file

@ -0,0 +1,178 @@
"""Tests for ActivityStreams 2.0 object constructors."""
import pytest
from circuitforge_core.activitypub.actor import generate_rsa_keypair, make_actor
from circuitforge_core.activitypub.objects import (
PUBLIC,
make_create,
make_note,
make_offer,
make_request,
)
ACTOR_ID = "https://rook.example.com/actors/rook"
@pytest.fixture(scope="module")
def actor():
priv, pub = generate_rsa_keypair(bits=1024)
return make_actor(ACTOR_ID, "rook", "Rook Exchange", priv, pub)
class TestPublicConstant:
def test_is_as_public_uri(self):
assert PUBLIC == "https://www.w3.org/ns/activitystreams#Public"
class TestMakeNote:
def test_type_is_note(self):
assert make_note(ACTOR_ID, "Hello")["type"] == "Note"
def test_attributed_to_actor(self):
n = make_note(ACTOR_ID, "Hello")
assert n["attributedTo"] == ACTOR_ID
def test_content_stored(self):
n = make_note(ACTOR_ID, "Hello world")
assert n["content"] == "Hello world"
def test_default_to_is_public(self):
n = make_note(ACTOR_ID, "Hello")
assert PUBLIC in n["to"]
def test_custom_to(self):
n = make_note(ACTOR_ID, "Hello", to=["https://other.example/inbox"])
assert "https://other.example/inbox" in n["to"]
def test_cc_present_when_set(self):
n = make_note(ACTOR_ID, "Hello", cc=["https://x.com/followers"])
assert n["cc"] == ["https://x.com/followers"]
def test_cc_absent_when_not_set(self):
n = make_note(ACTOR_ID, "Hello")
assert "cc" not in n
def test_in_reply_to_included(self):
n = make_note(ACTOR_ID, "Reply", in_reply_to="https://mastodon.social/notes/123")
assert n["inReplyTo"] == "https://mastodon.social/notes/123"
def test_in_reply_to_absent_by_default(self):
assert "inReplyTo" not in make_note(ACTOR_ID, "Hello")
def test_tag_included(self):
tag = [{"type": "Mention", "href": "https://mastodon.social/users/alice"}]
n = make_note(ACTOR_ID, "Hi @alice", tag=tag)
assert n["tag"] == tag
def test_id_is_unique(self):
a = make_note(ACTOR_ID, "Hello")
b = make_note(ACTOR_ID, "Hello")
assert a["id"] != b["id"]
def test_id_scoped_to_actor(self):
n = make_note(ACTOR_ID, "Hello")
assert n["id"].startswith(ACTOR_ID)
def test_published_present(self):
n = make_note(ACTOR_ID, "Hello")
assert "published" in n
assert n["published"].endswith("Z")
def test_custom_published(self):
from datetime import datetime, timezone
ts = datetime(2026, 1, 15, 12, 0, 0, tzinfo=timezone.utc)
n = make_note(ACTOR_ID, "Hello", published=ts)
assert "2026-01-15" in n["published"]
def test_context_is_activitystreams(self):
n = make_note(ACTOR_ID, "Hello")
assert "activitystreams" in n["@context"]
class TestMakeOffer:
def test_type_is_offer(self):
assert make_offer(ACTOR_ID, "Free apples", "Lots of apples")["type"] == "Offer"
def test_summary_stored(self):
o = make_offer(ACTOR_ID, "Free apples", "Lots of apples")
assert o["summary"] == "Free apples"
def test_content_stored(self):
o = make_offer(ACTOR_ID, "Free apples", "Lots of apples available.")
assert o["content"] == "Lots of apples available."
def test_actor_field_set(self):
o = make_offer(ACTOR_ID, "x", "y")
assert o["actor"] == ACTOR_ID
def test_default_to_is_public(self):
o = make_offer(ACTOR_ID, "x", "y")
assert PUBLIC in o["to"]
def test_id_is_unique(self):
assert make_offer(ACTOR_ID, "x", "y")["id"] != make_offer(ACTOR_ID, "x", "y")["id"]
class TestMakeRequest:
def test_type_is_request(self):
assert make_request(ACTOR_ID, "Need a ladder", "Borrowing a ladder")["type"] == "Request"
def test_context_includes_cf_namespace(self):
r = make_request(ACTOR_ID, "Need", "Need something")
ctx = r["@context"]
assert isinstance(ctx, list)
assert any("circuitforge" in c for c in ctx)
def test_summary_stored(self):
r = make_request(ACTOR_ID, "Need a ladder", "...")
assert r["summary"] == "Need a ladder"
def test_actor_field_set(self):
assert make_request(ACTOR_ID, "x", "y")["actor"] == ACTOR_ID
def test_id_is_unique(self):
a = make_request(ACTOR_ID, "x", "y")
b = make_request(ACTOR_ID, "x", "y")
assert a["id"] != b["id"]
class TestMakeCreate:
def test_type_is_create(self, actor):
note = make_note(ACTOR_ID, "Hello")
c = make_create(actor, note)
assert c["type"] == "Create"
def test_actor_field_matches(self, actor):
note = make_note(ACTOR_ID, "Hello")
c = make_create(actor, note)
assert c["actor"] == ACTOR_ID
def test_object_is_inner_dict(self, actor):
note = make_note(ACTOR_ID, "Hello")
c = make_create(actor, note)
assert c["object"] is note
def test_to_propagated_from_object(self, actor):
note = make_note(ACTOR_ID, "Hello", to=["https://inbox.example/"])
c = make_create(actor, note)
assert "https://inbox.example/" in c["to"]
def test_published_propagated(self, actor):
note = make_note(ACTOR_ID, "Hello")
c = make_create(actor, note)
assert c["published"] == note["published"]
def test_id_is_unique(self, actor):
note = make_note(ACTOR_ID, "Hello")
a = make_create(actor, note)
b = make_create(actor, note)
assert a["id"] != b["id"]
def test_wraps_offer(self, actor):
offer = make_offer(ACTOR_ID, "Free apples", "Take some")
c = make_create(actor, offer)
assert c["object"]["type"] == "Offer"
def test_wraps_request(self, actor):
req = make_request(ACTOR_ID, "Need ladder", "...")
c = make_create(actor, req)
assert c["object"]["type"] == "Request"

View file

@ -0,0 +1,126 @@
"""Tests for HTTP Signature signing and verification (draft-cavage-http-signatures-08)."""
import pytest
from circuitforge_core.activitypub.actor import generate_rsa_keypair, make_actor
from circuitforge_core.activitypub.signing import sign_headers, verify_signature
ACTOR_ID = "https://kiwi.example.com/actors/kiwi"
INBOX_URL = "https://lemmy.example.com/inbox"
@pytest.fixture(scope="module")
def actor():
priv, pub = generate_rsa_keypair(bits=1024)
return make_actor(ACTOR_ID, "kiwi", "Kiwi", priv, pub)
class TestSignHeaders:
def test_date_header_added(self, actor):
headers = sign_headers("POST", INBOX_URL, {}, b"body", actor)
assert "Date" in headers
def test_host_header_added(self, actor):
headers = sign_headers("POST", INBOX_URL, {}, b"body", actor)
assert headers["Host"] == "lemmy.example.com"
def test_digest_header_added_when_body(self, actor):
headers = sign_headers("POST", INBOX_URL, {}, b"body content", actor)
assert "Digest" in headers
assert headers["Digest"].startswith("SHA-256=")
def test_digest_not_added_when_no_body(self, actor):
headers = sign_headers("GET", INBOX_URL, {}, None, actor)
assert "Digest" not in headers
def test_signature_header_present(self, actor):
headers = sign_headers("POST", INBOX_URL, {}, b"body", actor)
assert "Signature" in headers
def test_signature_contains_key_id(self, actor):
headers = sign_headers("POST", INBOX_URL, {}, b"body", actor)
assert f"{ACTOR_ID}#main-key" in headers["Signature"]
def test_signature_algorithm_rsa_sha256(self, actor):
headers = sign_headers("POST", INBOX_URL, {}, b"body", actor)
assert 'algorithm="rsa-sha256"' in headers["Signature"]
def test_does_not_mutate_input_headers(self, actor):
original = {"Content-Type": "application/json"}
sign_headers("POST", INBOX_URL, original, b"body", actor)
assert "Signature" not in original
def test_content_type_signed_when_present(self, actor):
headers = sign_headers("POST", INBOX_URL, {"Content-Type": "application/activity+json"}, b"x", actor)
assert "content-type" in headers["Signature"]
class TestVerifySignature:
def test_valid_signature_returns_true(self, actor):
body = b'{"type": "Create"}'
headers = sign_headers("POST", INBOX_URL, {"Content-Type": "application/activity+json"}, body, actor)
result = verify_signature(
headers=headers,
method="POST",
path="/inbox",
body=body,
public_key_pem=actor.public_key_pem,
)
assert result is True
def test_tampered_body_returns_false(self, actor):
body = b'{"type": "Create"}'
headers = sign_headers("POST", INBOX_URL, {"Content-Type": "application/activity+json"}, body, actor)
result = verify_signature(
headers=headers,
method="POST",
path="/inbox",
body=b"tampered body",
public_key_pem=actor.public_key_pem,
)
assert result is False
def test_wrong_public_key_returns_false(self, actor):
_, other_pub = generate_rsa_keypair(bits=1024)
body = b"hello"
headers = sign_headers("POST", INBOX_URL, {}, body, actor)
result = verify_signature(
headers=headers,
method="POST",
path="/inbox",
body=body,
public_key_pem=other_pub,
)
assert result is False
def test_missing_signature_header_returns_false(self, actor):
result = verify_signature(
headers={"Date": "Mon, 20 Apr 2026 12:00:00 GMT"},
method="POST",
path="/inbox",
body=b"body",
public_key_pem=actor.public_key_pem,
)
assert result is False
def test_bodyless_get_roundtrip(self, actor):
headers = sign_headers("GET", INBOX_URL, {}, None, actor)
result = verify_signature(
headers=headers,
method="GET",
path="/inbox",
body=None,
public_key_pem=actor.public_key_pem,
)
assert result is True
def test_wrong_method_fails_verification(self, actor):
body = b"data"
headers = sign_headers("POST", INBOX_URL, {}, body, actor)
# Verify with wrong method — (request-target) will differ
result = verify_signature(
headers=headers,
method="PUT",
path="/inbox",
body=body,
public_key_pem=actor.public_key_pem,
)
assert result is False