69 KiB
CircuitForge License Server — Implementation Plan
For Claude: REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.
Goal: Build a self-hosted RS256 JWT licensing server for Circuit Forge LLC and wire Peregrine to validate licenses offline.
Architecture: Two work streams — (A) a new FastAPI + SQLite service (circuitforge-license) deployed on Heimdall via Docker + Caddy, and (B) a scripts/license.py client in Peregrine that activates against the server and verifies JWTs offline using an embedded public key. The server issues 30-day signed tokens; the client verifies signatures locally on every tier check with zero network calls during normal operation.
Tech Stack: FastAPI, PyJWT[crypto], Pydantic v2, SQLite, pytest, httpx (test client), cryptography (RSA key gen in tests), Docker Compose V2, Caddy.
Repos:
- License server dev:
/Library/Development/CircuitForge/circuitforge-license/→git.opensourcesolarpunk.com/pyr0ball/circuitforge-license - License server live (on Heimdall): cloned to
/devl/circuitforge-license/ - Peregrine client:
/Library/Development/devl/peregrine/ - Run tests:
/devl/miniconda3/envs/job-seeker/bin/pytest tests/ -v - Python env for local dev/test:
conda run -n job-seeker
PART A — License Server (new repo)
Task 1: Repo scaffold + DB schema
Files:
- Create:
/Library/Development/CircuitForge/circuitforge-license/(new directory) - Create:
requirements.txt - Create:
app/__init__.py - Create:
app/db.py - Create:
tests/__init__.py - Create:
tests/test_db.py - Create:
.gitignore
Step 1: Create the directory and git repo
mkdir -p /Library/Development/devl/circuitforge-license
cd /Library/Development/devl/circuitforge-license
git init
Step 2: Create .gitignore
# Secrets — never commit these
.env
keys/private.pem
data/
# Python
__pycache__/
*.pyc
.pytest_cache/
*.egg-info/
dist/
.coverage
htmlcov/
Step 3: Create requirements.txt
fastapi>=0.110
uvicorn[standard]>=0.27
pyjwt[crypto]>=2.8
pydantic>=2.0
python-dotenv>=1.0
pytest>=9.0
pytest-cov
httpx
cryptography>=42
Step 4: Create app/__init__.py (empty file)
Step 5: Write the failing test
# tests/test_db.py
import pytest
from pathlib import Path
from app.db import init_db, get_db
def test_init_db_creates_all_tables(tmp_path):
db = tmp_path / "test.db"
init_db(db)
with get_db(db) as conn:
tables = {row[0] for row in conn.execute(
"SELECT name FROM sqlite_master WHERE type='table'"
).fetchall()}
expected = {"license_keys", "activations", "usage_events", "flags", "audit_log"}
assert expected.issubset(tables)
def test_init_db_idempotent(tmp_path):
db = tmp_path / "test.db"
init_db(db)
init_db(db) # second call must not raise or corrupt
with get_db(db) as conn:
count = conn.execute("SELECT COUNT(*) FROM license_keys").fetchone()[0]
assert count == 0
Step 6: Run test to verify it fails
cd /Library/Development/devl/circuitforge-license
conda run -n job-seeker python -m pytest tests/test_db.py -v
Expected: FAILED — ModuleNotFoundError: No module named 'app'
Step 7: Write app/db.py
# app/db.py
import sqlite3
from contextlib import contextmanager
from pathlib import Path
DB_PATH = Path(__file__).parent.parent / "data" / "license.db"
_SCHEMA = """
CREATE TABLE IF NOT EXISTS license_keys (
id TEXT PRIMARY KEY,
key_display TEXT UNIQUE NOT NULL,
product TEXT NOT NULL,
tier TEXT NOT NULL,
seats INTEGER DEFAULT 1,
valid_until TEXT,
revoked INTEGER DEFAULT 0,
customer_email TEXT,
source TEXT DEFAULT 'manual',
trial INTEGER DEFAULT 0,
notes TEXT,
created_at TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS activations (
id TEXT PRIMARY KEY,
key_id TEXT NOT NULL REFERENCES license_keys(id),
machine_id TEXT NOT NULL,
app_version TEXT,
platform TEXT,
activated_at TEXT NOT NULL,
last_refresh TEXT NOT NULL,
deactivated_at TEXT
);
CREATE TABLE IF NOT EXISTS usage_events (
id TEXT PRIMARY KEY,
key_id TEXT NOT NULL REFERENCES license_keys(id),
machine_id TEXT NOT NULL,
product TEXT NOT NULL,
event_type TEXT NOT NULL,
metadata TEXT,
created_at TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS flags (
id TEXT PRIMARY KEY,
key_id TEXT NOT NULL REFERENCES license_keys(id),
machine_id TEXT,
product TEXT NOT NULL,
flag_type TEXT NOT NULL,
details TEXT,
status TEXT DEFAULT 'open',
created_at TEXT NOT NULL,
reviewed_at TEXT,
action_taken TEXT
);
CREATE TABLE IF NOT EXISTS audit_log (
id TEXT PRIMARY KEY,
entity_type TEXT NOT NULL,
entity_id TEXT NOT NULL,
action TEXT NOT NULL,
actor TEXT,
details TEXT,
created_at TEXT NOT NULL
);
"""
@contextmanager
def get_db(db_path: Path = DB_PATH):
db_path.parent.mkdir(parents=True, exist_ok=True)
conn = sqlite3.connect(db_path)
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA foreign_keys=ON")
try:
yield conn
conn.commit()
except Exception:
conn.rollback()
raise
finally:
conn.close()
def init_db(db_path: Path = DB_PATH) -> None:
with get_db(db_path) as conn:
conn.executescript(_SCHEMA)
Step 8: Run test to verify it passes
conda run -n job-seeker python -m pytest tests/test_db.py -v
Expected: 2 passed
Step 9: Commit
cd /Library/Development/devl/circuitforge-license
git add -A
git commit -m "feat: repo scaffold, DB schema, init_db"
Task 2: Crypto module + test keypair fixture
Files:
- Create:
app/crypto.py - Create:
tests/conftest.py - Create:
tests/test_crypto.py - Create:
keys/(directory;public.pemcommitted later)
Step 1: Write the failing tests
# tests/test_crypto.py
import pytest
import jwt as pyjwt
from app.crypto import sign_jwt, verify_jwt
def test_sign_and_verify_roundtrip(test_keypair):
private_pem, public_pem = test_keypair
payload = {"sub": "CFG-PRNG-TEST", "product": "peregrine", "tier": "paid"}
token = sign_jwt(payload, private_pem=private_pem, expiry_days=30)
decoded = verify_jwt(token, public_pem=public_pem)
assert decoded["sub"] == "CFG-PRNG-TEST"
assert decoded["tier"] == "paid"
assert "exp" in decoded
assert "iat" in decoded
def test_verify_rejects_wrong_key(test_keypair):
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives import serialization
private_pem, _ = test_keypair
other_private = rsa.generate_private_key(public_exponent=65537, key_size=2048)
other_public_pem = other_private.public_key().public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo,
)
token = sign_jwt({"sub": "test"}, private_pem=private_pem, expiry_days=30)
with pytest.raises(pyjwt.exceptions.InvalidSignatureError):
verify_jwt(token, public_pem=other_public_pem)
def test_verify_rejects_expired_token(test_keypair):
private_pem, public_pem = test_keypair
token = sign_jwt({"sub": "test"}, private_pem=private_pem, expiry_days=-1)
with pytest.raises(pyjwt.exceptions.ExpiredSignatureError):
verify_jwt(token, public_pem=public_pem)
Step 2: Write tests/conftest.py
# tests/conftest.py
import pytest
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives import serialization
@pytest.fixture(scope="session")
def test_keypair():
"""Generate a fresh RSA-2048 keypair for the test session."""
private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
private_pem = private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption(),
)
public_pem = private_key.public_key().public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo,
)
return private_pem, public_pem
Step 3: Run test to verify it fails
conda run -n job-seeker python -m pytest tests/test_crypto.py -v
Expected: FAILED — ModuleNotFoundError: No module named 'app.crypto'
Step 4: Write app/crypto.py
# app/crypto.py
import os
from datetime import datetime, timedelta, timezone
from pathlib import Path
import jwt as pyjwt
def _load_key(env_var: str, override: bytes | None) -> bytes:
if override is not None:
return override
path = Path(os.environ[env_var])
return path.read_bytes()
def sign_jwt(
payload: dict,
expiry_days: int | None = None,
private_pem: bytes | None = None,
) -> str:
if expiry_days is None:
expiry_days = int(os.environ.get("JWT_EXPIRY_DAYS", "30"))
now = datetime.now(timezone.utc)
full_payload = {
**payload,
"iat": now,
"exp": now + timedelta(days=expiry_days),
}
key = _load_key("JWT_PRIVATE_KEY_PATH", private_pem)
return pyjwt.encode(full_payload, key, algorithm="RS256")
def verify_jwt(token: str, public_pem: bytes | None = None) -> dict:
"""Verify RS256 JWT and return decoded payload. Raises on invalid/expired."""
key = _load_key("JWT_PUBLIC_KEY_PATH", public_pem)
return pyjwt.decode(token, key, algorithms=["RS256"])
Step 5: Run test to verify it passes
conda run -n job-seeker python -m pytest tests/test_crypto.py -v
Expected: 3 passed
Step 6: Commit
git add -A
git commit -m "feat: crypto module — RS256 sign/verify with test keypair fixture"
Task 3: Pydantic models
Files:
- Create:
app/models.py - Create:
tests/test_models.py
Step 1: Write the failing test
# tests/test_models.py
from app.models import (
ActivateRequest, ActivateResponse,
RefreshRequest, DeactivateRequest,
UsageRequest, FlagRequest,
CreateKeyRequest,
)
def test_activate_request_requires_key_machine_product():
req = ActivateRequest(key="CFG-PRNG-A1B2-C3D4-E5F6",
machine_id="abc123", product="peregrine")
assert req.key == "CFG-PRNG-A1B2-C3D4-E5F6"
assert req.app_version is None
assert req.platform is None
def test_create_key_request_defaults():
req = CreateKeyRequest(product="peregrine", tier="paid")
assert req.seats == 1
assert req.source == "manual"
assert req.trial is False
assert req.valid_until is None
Step 2: Run to verify failure
conda run -n job-seeker python -m pytest tests/test_models.py -v
Expected: FAILED — ModuleNotFoundError: No module named 'app.models'
Step 3: Write app/models.py
# app/models.py
from __future__ import annotations
from typing import Optional
from pydantic import BaseModel
class ActivateRequest(BaseModel):
key: str
machine_id: str
product: str
app_version: Optional[str] = None
platform: Optional[str] = None
class ActivateResponse(BaseModel):
jwt: str
tier: str
valid_until: Optional[str] = None
notice: Optional[str] = None
class RefreshRequest(BaseModel):
jwt: str
machine_id: str
app_version: Optional[str] = None
platform: Optional[str] = None
class DeactivateRequest(BaseModel):
jwt: str
machine_id: str
class UsageRequest(BaseModel):
event_type: str
product: str
metadata: Optional[dict] = None
class FlagRequest(BaseModel):
flag_type: str
product: str
details: Optional[dict] = None
class CreateKeyRequest(BaseModel):
product: str
tier: str
seats: int = 1
valid_until: Optional[str] = None
customer_email: Optional[str] = None
source: str = "manual"
trial: bool = False
notes: Optional[str] = None
class KeyResponse(BaseModel):
id: str
key_display: str
product: str
tier: str
seats: int
valid_until: Optional[str]
revoked: bool
customer_email: Optional[str]
source: str
trial: bool
notes: Optional[str]
created_at: str
active_seat_count: int = 0
class FlagUpdateRequest(BaseModel):
status: str # reviewed | dismissed | actioned
action_taken: Optional[str] = None # none | warned | revoked
Step 4: Run to verify it passes
conda run -n job-seeker python -m pytest tests/test_models.py -v
Expected: 2 passed
Step 5: Commit
git add -A
git commit -m "feat: Pydantic v2 request/response models"
Task 4: Public routes — activate, refresh, deactivate
Files:
- Create:
app/routes/__init__.py(empty) - Create:
app/routes/public.py - Create:
tests/test_public_routes.py
Step 1: Write failing tests
# tests/test_public_routes.py
import json
import pytest
from fastapi.testclient import TestClient
from app.main import create_app
from app.db import init_db
@pytest.fixture()
def client(tmp_path, test_keypair, monkeypatch):
db = tmp_path / "test.db"
private_pem, public_pem = test_keypair
# Write keys to tmp files
(tmp_path / "private.pem").write_bytes(private_pem)
(tmp_path / "public.pem").write_bytes(public_pem)
monkeypatch.setenv("JWT_PRIVATE_KEY_PATH", str(tmp_path / "private.pem"))
monkeypatch.setenv("JWT_PUBLIC_KEY_PATH", str(tmp_path / "public.pem"))
monkeypatch.setenv("JWT_EXPIRY_DAYS", "30")
monkeypatch.setenv("GRACE_PERIOD_DAYS", "7")
monkeypatch.setenv("ADMIN_TOKEN", "test-admin-token")
monkeypatch.setenv("SERVER_NOTICE", "")
init_db(db)
app = create_app(db_path=db)
return TestClient(app)
@pytest.fixture()
def active_key(client):
"""Create a paid key via admin API, return key_display."""
resp = client.post("/admin/keys", json={
"product": "peregrine", "tier": "paid", "seats": 2,
"customer_email": "test@example.com",
}, headers={"Authorization": "Bearer test-admin-token"})
assert resp.status_code == 200
return resp.json()["key_display"]
def test_activate_returns_jwt(client, active_key):
resp = client.post("/v1/activate", json={
"key": active_key, "machine_id": "machine-1", "product": "peregrine",
"platform": "linux", "app_version": "1.0.0",
})
assert resp.status_code == 200
data = resp.json()
assert "jwt" in data
assert data["tier"] == "paid"
def test_activate_same_machine_twice_ok(client, active_key):
payload = {"key": active_key, "machine_id": "machine-1", "product": "peregrine"}
resp1 = client.post("/v1/activate", json=payload)
resp2 = client.post("/v1/activate", json=payload)
assert resp1.status_code == 200
assert resp2.status_code == 200
def test_activate_seat_limit_enforced(client, active_key):
# seats=2, so machine-1 and machine-2 OK, machine-3 rejected
for mid in ["machine-1", "machine-2"]:
r = client.post("/v1/activate", json={
"key": active_key, "machine_id": mid, "product": "peregrine"
})
assert r.status_code == 200
r3 = client.post("/v1/activate", json={
"key": active_key, "machine_id": "machine-3", "product": "peregrine"
})
assert r3.status_code == 409
def test_activate_invalid_key_rejected(client):
resp = client.post("/v1/activate", json={
"key": "CFG-PRNG-FAKE-FAKE-FAKE", "machine_id": "m1", "product": "peregrine"
})
assert resp.status_code == 403
def test_activate_wrong_product_rejected(client, active_key):
resp = client.post("/v1/activate", json={
"key": active_key, "machine_id": "m1", "product": "falcon"
})
assert resp.status_code == 403
def test_refresh_returns_new_jwt(client, active_key):
act = client.post("/v1/activate", json={
"key": active_key, "machine_id": "m1", "product": "peregrine"
})
old_jwt = act.json()["jwt"]
resp = client.post("/v1/refresh", json={"jwt": old_jwt, "machine_id": "m1"})
assert resp.status_code == 200
assert "jwt" in resp.json()
def test_deactivate_frees_seat(client, active_key):
# Fill both seats
for mid in ["machine-1", "machine-2"]:
client.post("/v1/activate", json={
"key": active_key, "machine_id": mid, "product": "peregrine"
})
# Deactivate machine-1
act = client.post("/v1/activate", json={
"key": active_key, "machine_id": "machine-1", "product": "peregrine"
})
token = act.json()["jwt"]
deact = client.post("/v1/deactivate", json={"jwt": token, "machine_id": "machine-1"})
assert deact.status_code == 200
# Now machine-3 can activate
r3 = client.post("/v1/activate", json={
"key": active_key, "machine_id": "machine-3", "product": "peregrine"
})
assert r3.status_code == 200
Step 2: Run to verify failure
conda run -n job-seeker python -m pytest tests/test_public_routes.py -v
Expected: FAILED — ModuleNotFoundError: No module named 'app.main'
Step 3: Write app/routes/__init__.py (empty)
Step 4: Write app/routes/public.py
# app/routes/public.py
import json
import os
import uuid
from datetime import datetime, timezone
import jwt as pyjwt
from fastapi import APIRouter, Depends, HTTPException
from app.crypto import sign_jwt, verify_jwt
from app.db import get_db
from app.models import (
ActivateRequest, ActivateResponse,
RefreshRequest, DeactivateRequest,
UsageRequest, FlagRequest,
)
router = APIRouter()
def _now() -> str:
return datetime.now(timezone.utc).isoformat()
def _get_key_row(conn, key_display: str, product: str):
row = conn.execute(
"SELECT * FROM license_keys WHERE key_display=? AND product=?",
(key_display, product),
).fetchone()
if not row or row["revoked"]:
raise HTTPException(status_code=403, detail="Invalid or revoked license key")
if row["valid_until"] and row["valid_until"] < datetime.now(timezone.utc).date().isoformat():
raise HTTPException(status_code=403, detail="License key expired")
return row
def _build_jwt(key_row, machine_id: str) -> str:
notice = os.environ.get("SERVER_NOTICE", "")
payload = {
"sub": key_row["key_display"],
"product": key_row["product"],
"tier": key_row["tier"],
"seats": key_row["seats"],
"machine": machine_id,
}
if notice:
payload["notice"] = notice
return sign_jwt(payload)
def _audit(conn, entity_type: str, entity_id: str, action: str, details: dict | None = None):
conn.execute(
"INSERT INTO audit_log (id, entity_type, entity_id, action, details, created_at) "
"VALUES (?,?,?,?,?,?)",
(str(uuid.uuid4()), entity_type, entity_id, action,
json.dumps(details) if details else None, _now()),
)
@router.post("/activate", response_model=ActivateResponse)
def activate(req: ActivateRequest, db_path=Depends(lambda: None)):
from app.routes._db_dep import get_db_path
with get_db(get_db_path()) as conn:
key_row = _get_key_row(conn, req.key, req.product)
# Count active seats, excluding this machine
active_seats = conn.execute(
"SELECT COUNT(*) FROM activations "
"WHERE key_id=? AND deactivated_at IS NULL AND machine_id!=?",
(key_row["id"], req.machine_id),
).fetchone()[0]
existing = conn.execute(
"SELECT * FROM activations WHERE key_id=? AND machine_id=?",
(key_row["id"], req.machine_id),
).fetchone()
if not existing and active_seats >= key_row["seats"]:
raise HTTPException(status_code=409, detail=f"Seat limit reached ({key_row['seats']} seats)")
now = _now()
if existing:
conn.execute(
"UPDATE activations SET last_refresh=?, app_version=?, platform=?, "
"deactivated_at=NULL WHERE id=?",
(now, req.app_version, req.platform, existing["id"]),
)
activation_id = existing["id"]
else:
activation_id = str(uuid.uuid4())
conn.execute(
"INSERT INTO activations (id, key_id, machine_id, app_version, platform, "
"activated_at, last_refresh) VALUES (?,?,?,?,?,?,?)",
(activation_id, key_row["id"], req.machine_id,
req.app_version, req.platform, now, now),
)
_audit(conn, "activation", activation_id, "activated", {"machine_id": req.machine_id})
token = _build_jwt(key_row, req.machine_id)
notice = os.environ.get("SERVER_NOTICE") or None
return ActivateResponse(jwt=token, tier=key_row["tier"],
valid_until=key_row["valid_until"], notice=notice)
@router.post("/refresh", response_model=ActivateResponse)
def refresh(req: RefreshRequest, db_path=Depends(lambda: None)):
from app.routes._db_dep import get_db_path
# Decode without expiry check so we can refresh near-expired tokens
try:
payload = verify_jwt(req.jwt)
except pyjwt.exceptions.ExpiredSignatureError:
# Allow refresh of just-expired tokens
payload = pyjwt.decode(req.jwt, options={"verify_exp": False,
"verify_signature": False})
except pyjwt.exceptions.InvalidTokenError as e:
raise HTTPException(status_code=403, detail=str(e))
with get_db(get_db_path()) as conn:
key_row = _get_key_row(conn, payload.get("sub", ""), payload.get("product", ""))
existing = conn.execute(
"SELECT * FROM activations WHERE key_id=? AND machine_id=? AND deactivated_at IS NULL",
(key_row["id"], req.machine_id),
).fetchone()
if not existing:
raise HTTPException(status_code=403, detail="Machine not registered for this key")
now = _now()
conn.execute(
"UPDATE activations SET last_refresh=?, app_version=? WHERE id=?",
(now, req.app_version or existing["app_version"], existing["id"]),
)
_audit(conn, "activation", existing["id"], "refreshed", {"machine_id": req.machine_id})
token = _build_jwt(key_row, req.machine_id)
notice = os.environ.get("SERVER_NOTICE") or None
return ActivateResponse(jwt=token, tier=key_row["tier"],
valid_until=key_row["valid_until"], notice=notice)
@router.post("/deactivate")
def deactivate(req: DeactivateRequest):
from app.routes._db_dep import get_db_path
try:
payload = verify_jwt(req.jwt)
except pyjwt.exceptions.PyJWTError as e:
raise HTTPException(status_code=403, detail=str(e))
with get_db(get_db_path()) as conn:
existing = conn.execute(
"SELECT a.id FROM activations a "
"JOIN license_keys k ON k.id=a.key_id "
"WHERE k.key_display=? AND a.machine_id=? AND a.deactivated_at IS NULL",
(payload.get("sub", ""), req.machine_id),
).fetchone()
if not existing:
raise HTTPException(status_code=404, detail="No active seat found")
now = _now()
conn.execute("UPDATE activations SET deactivated_at=? WHERE id=?",
(now, existing["id"]))
_audit(conn, "activation", existing["id"], "deactivated", {"machine_id": req.machine_id})
return {"status": "deactivated"}
Step 5: Write app/routes/_db_dep.py (module-level DB path holder, allows test injection)
# app/routes/_db_dep.py
from pathlib import Path
from app.db import DB_PATH
_db_path: Path = DB_PATH
def set_db_path(p: Path) -> None:
global _db_path
_db_path = p
def get_db_path() -> Path:
return _db_path
Step 6: Write app/main.py (minimal, enough for tests)
# app/main.py
from pathlib import Path
from fastapi import FastAPI
from app.db import init_db, DB_PATH
from app.routes import public, admin
from app.routes._db_dep import set_db_path
def create_app(db_path: Path = DB_PATH) -> FastAPI:
set_db_path(db_path)
init_db(db_path)
app = FastAPI(title="CircuitForge License Server", version="1.0.0")
app.include_router(public.router, prefix="/v1")
app.include_router(admin.router, prefix="/admin")
return app
app = create_app()
Step 7: Write minimal app/routes/admin.py (enough for active_key fixture to work)
# app/routes/admin.py — skeleton; full implementation in Task 5
import os
import uuid
import secrets
import string
from datetime import datetime, timezone
from fastapi import APIRouter, HTTPException, Header
from app.db import get_db
from app.models import CreateKeyRequest, KeyResponse
from app.routes._db_dep import get_db_path
router = APIRouter()
def _require_admin(authorization: str = Header(...)):
expected = f"Bearer {os.environ.get('ADMIN_TOKEN', '')}"
if authorization != expected:
raise HTTPException(status_code=401, detail="Unauthorized")
def _gen_key_display(product: str) -> str:
codes = {"peregrine": "PRNG", "falcon": "FLCN", "osprey": "OSPY",
"kestrel": "KSTR", "harrier": "HARR", "merlin": "MRLN",
"ibis": "IBIS", "tern": "TERN", "wren": "WREN", "martin": "MRTN"}
code = codes.get(product, product[:4].upper())
chars = string.ascii_uppercase + string.digits
segs = [secrets.choice(chars) + secrets.choice(chars) +
secrets.choice(chars) + secrets.choice(chars) for _ in range(3)]
return f"CFG-{code}-{segs[0]}-{segs[1]}-{segs[2]}"
@router.post("/keys", response_model=KeyResponse)
def create_key(req: CreateKeyRequest, authorization: str = Header(...)):
_require_admin(authorization)
with get_db(get_db_path()) as conn:
key_id = str(uuid.uuid4())
key_display = _gen_key_display(req.product)
now = datetime.now(timezone.utc).isoformat()
conn.execute(
"INSERT INTO license_keys (id, key_display, product, tier, seats, valid_until, "
"customer_email, source, trial, notes, created_at) VALUES (?,?,?,?,?,?,?,?,?,?,?)",
(key_id, key_display, req.product, req.tier, req.seats, req.valid_until,
req.customer_email, req.source, 1 if req.trial else 0, req.notes, now),
)
return KeyResponse(id=key_id, key_display=key_display, product=req.product,
tier=req.tier, seats=req.seats, valid_until=req.valid_until,
revoked=False, customer_email=req.customer_email,
source=req.source, trial=req.trial, notes=req.notes,
created_at=now, active_seat_count=0)
Step 8: Fix test client fixture — remove the broken Depends in activate and use _db_dep properly. Update tests/test_public_routes.py fixture to call set_db_path:
# Update the client fixture in tests/test_public_routes.py
@pytest.fixture()
def client(tmp_path, test_keypair, monkeypatch):
db = tmp_path / "test.db"
private_pem, public_pem = test_keypair
(tmp_path / "private.pem").write_bytes(private_pem)
(tmp_path / "public.pem").write_bytes(public_pem)
monkeypatch.setenv("JWT_PRIVATE_KEY_PATH", str(tmp_path / "private.pem"))
monkeypatch.setenv("JWT_PUBLIC_KEY_PATH", str(tmp_path / "public.pem"))
monkeypatch.setenv("JWT_EXPIRY_DAYS", "30")
monkeypatch.setenv("GRACE_PERIOD_DAYS", "7")
monkeypatch.setenv("ADMIN_TOKEN", "test-admin-token")
monkeypatch.setenv("SERVER_NOTICE", "")
from app.routes._db_dep import set_db_path
set_db_path(db)
from app.main import create_app
init_db(db)
app = create_app(db_path=db)
return TestClient(app)
Also remove the broken db_path=Depends(lambda: None) from route functions — they should call get_db_path() directly (already done in the implementation above).
Step 9: Run tests to verify they pass
conda run -n job-seeker python -m pytest tests/test_public_routes.py -v
Expected: 7 passed
Step 10: Commit
git add -A
git commit -m "feat: public routes — activate, refresh, deactivate with seat enforcement"
Task 5: Public routes — usage + flag; Admin routes
Files:
- Modify:
app/routes/public.py(add/usage,/flag) - Modify:
app/routes/admin.py(add list, delete, activations, usage, flags endpoints) - Modify:
tests/test_public_routes.py(add usage/flag tests) - Create:
tests/test_admin_routes.py
Step 1: Add usage/flag tests to tests/test_public_routes.py
def test_usage_event_recorded(client, active_key):
act = client.post("/v1/activate", json={
"key": active_key, "machine_id": "m1", "product": "peregrine"
})
token = act.json()["jwt"]
resp = client.post("/v1/usage", json={
"event_type": "cover_letter_generated",
"product": "peregrine",
"metadata": {"job_id": 42},
}, headers={"Authorization": f"Bearer {token}"})
assert resp.status_code == 200
def test_flag_recorded(client, active_key):
act = client.post("/v1/activate", json={
"key": active_key, "machine_id": "m1", "product": "peregrine"
})
token = act.json()["jwt"]
resp = client.post("/v1/flag", json={
"flag_type": "content_violation",
"product": "peregrine",
"details": {"prompt_snippet": "test"},
}, headers={"Authorization": f"Bearer {token}"})
assert resp.status_code == 200
def test_usage_with_invalid_jwt_rejected(client):
resp = client.post("/v1/usage", json={
"event_type": "test", "product": "peregrine"
}, headers={"Authorization": "Bearer not-a-jwt"})
assert resp.status_code == 403
Step 2: Write tests/test_admin_routes.py
# tests/test_admin_routes.py
import pytest
from fastapi.testclient import TestClient
from app.main import create_app
from app.db import init_db
from app.routes._db_dep import set_db_path
ADMIN_HDR = {"Authorization": "Bearer test-admin-token"}
@pytest.fixture()
def client(tmp_path, test_keypair, monkeypatch):
db = tmp_path / "test.db"
private_pem, public_pem = test_keypair
(tmp_path / "private.pem").write_bytes(private_pem)
(tmp_path / "public.pem").write_bytes(public_pem)
monkeypatch.setenv("JWT_PRIVATE_KEY_PATH", str(tmp_path / "private.pem"))
monkeypatch.setenv("JWT_PUBLIC_KEY_PATH", str(tmp_path / "public.pem"))
monkeypatch.setenv("JWT_EXPIRY_DAYS", "30")
monkeypatch.setenv("ADMIN_TOKEN", "test-admin-token")
monkeypatch.setenv("SERVER_NOTICE", "")
set_db_path(db)
init_db(db)
return TestClient(create_app(db_path=db))
def test_create_key_returns_display(client):
resp = client.post("/admin/keys", json={
"product": "peregrine", "tier": "paid"
}, headers=ADMIN_HDR)
assert resp.status_code == 200
assert resp.json()["key_display"].startswith("CFG-PRNG-")
def test_list_keys(client):
client.post("/admin/keys", json={"product": "peregrine", "tier": "paid"},
headers=ADMIN_HDR)
resp = client.get("/admin/keys", headers=ADMIN_HDR)
assert resp.status_code == 200
assert len(resp.json()) == 1
def test_revoke_key(client):
create = client.post("/admin/keys", json={"product": "peregrine", "tier": "paid"},
headers=ADMIN_HDR)
key_id = create.json()["id"]
resp = client.delete(f"/admin/keys/{key_id}", headers=ADMIN_HDR)
assert resp.status_code == 200
# Activation should now fail
key_display = create.json()["key_display"]
act = client.post("/v1/activate", json={
"key": key_display, "machine_id": "m1", "product": "peregrine"
})
assert act.status_code == 403
def test_admin_requires_token(client):
resp = client.get("/admin/keys", headers={"Authorization": "Bearer wrong"})
assert resp.status_code == 401
def test_admin_usage_returns_events(client):
# Create key, activate, report usage
create = client.post("/admin/keys", json={"product": "peregrine", "tier": "paid"},
headers=ADMIN_HDR)
key_display = create.json()["key_display"]
act = client.post("/v1/activate", json={
"key": key_display, "machine_id": "m1", "product": "peregrine"
})
token = act.json()["jwt"]
client.post("/v1/usage", json={"event_type": "cover_letter_generated",
"product": "peregrine"},
headers={"Authorization": f"Bearer {token}"})
resp = client.get("/admin/usage", headers=ADMIN_HDR)
assert resp.status_code == 200
assert len(resp.json()) >= 1
def test_admin_flags_returns_list(client):
create = client.post("/admin/keys", json={"product": "peregrine", "tier": "paid"},
headers=ADMIN_HDR)
key_display = create.json()["key_display"]
act = client.post("/v1/activate", json={
"key": key_display, "machine_id": "m1", "product": "peregrine"
})
token = act.json()["jwt"]
client.post("/v1/flag", json={"flag_type": "content_violation", "product": "peregrine"},
headers={"Authorization": f"Bearer {token}"})
resp = client.get("/admin/flags", headers=ADMIN_HDR)
assert resp.status_code == 200
flags = resp.json()
assert len(flags) == 1
assert flags[0]["status"] == "open"
Step 3: Run to verify failure
conda run -n job-seeker python -m pytest tests/test_public_routes.py tests/test_admin_routes.py -v
Expected: failures on new tests
Step 4: Add /usage and /flag to app/routes/public.py
# Add these imports at top of public.py
import json as _json
from fastapi import Header
# Add to router (append after deactivate):
def _jwt_bearer(authorization: str = Header(...)) -> dict:
try:
token = authorization.removeprefix("Bearer ")
return verify_jwt(token)
except pyjwt.exceptions.PyJWTError as e:
raise HTTPException(status_code=403, detail=str(e))
@router.post("/usage")
def record_usage(req: UsageRequest, payload: dict = Depends(_jwt_bearer)):
from app.routes._db_dep import get_db_path
with get_db(get_db_path()) as conn:
key_row = conn.execute(
"SELECT id FROM license_keys WHERE key_display=?",
(payload.get("sub", ""),),
).fetchone()
if not key_row:
raise HTTPException(status_code=403, detail="Key not found")
conn.execute(
"INSERT INTO usage_events (id, key_id, machine_id, product, event_type, metadata, created_at) "
"VALUES (?,?,?,?,?,?,?)",
(str(uuid.uuid4()), key_row["id"], payload.get("machine", ""),
req.product, req.event_type,
_json.dumps(req.metadata) if req.metadata else None, _now()),
)
return {"status": "recorded"}
@router.post("/flag")
def record_flag(req: FlagRequest, payload: dict = Depends(_jwt_bearer)):
from app.routes._db_dep import get_db_path
with get_db(get_db_path()) as conn:
key_row = conn.execute(
"SELECT id FROM license_keys WHERE key_display=?",
(payload.get("sub", ""),),
).fetchone()
if not key_row:
raise HTTPException(status_code=403, detail="Key not found")
conn.execute(
"INSERT INTO flags (id, key_id, machine_id, product, flag_type, details, created_at) "
"VALUES (?,?,?,?,?,?,?)",
(str(uuid.uuid4()), key_row["id"], payload.get("machine", ""),
req.product, req.flag_type,
_json.dumps(req.details) if req.details else None, _now()),
)
return {"status": "flagged"}
Step 5: Complete app/routes/admin.py — add GET keys, DELETE, activations, usage, flags, PATCH flag:
# Append to app/routes/admin.py
@router.get("/keys")
def list_keys(authorization: str = Header(...)):
_require_admin(authorization)
with get_db(get_db_path()) as conn:
rows = conn.execute("SELECT * FROM license_keys ORDER BY created_at DESC").fetchall()
result = []
for row in rows:
seat_count = conn.execute(
"SELECT COUNT(*) FROM activations WHERE key_id=? AND deactivated_at IS NULL",
(row["id"],),
).fetchone()[0]
result.append({**dict(row), "active_seat_count": seat_count, "revoked": bool(row["revoked"])})
return result
@router.delete("/keys/{key_id}")
def revoke_key(key_id: str, authorization: str = Header(...)):
_require_admin(authorization)
with get_db(get_db_path()) as conn:
row = conn.execute("SELECT id FROM license_keys WHERE id=?", (key_id,)).fetchone()
if not row:
raise HTTPException(status_code=404, detail="Key not found")
now = datetime.now(timezone.utc).isoformat()
conn.execute("UPDATE license_keys SET revoked=1 WHERE id=?", (key_id,))
conn.execute(
"INSERT INTO audit_log (id, entity_type, entity_id, action, created_at) "
"VALUES (?,?,?,?,?)",
(str(uuid.uuid4()), "key", key_id, "revoked", now),
)
return {"status": "revoked"}
@router.get("/activations")
def list_activations(authorization: str = Header(...)):
_require_admin(authorization)
with get_db(get_db_path()) as conn:
rows = conn.execute(
"SELECT a.*, k.key_display, k.product FROM activations a "
"JOIN license_keys k ON k.id=a.key_id ORDER BY a.activated_at DESC"
).fetchall()
return [dict(r) for r in rows]
@router.get("/usage")
def list_usage(key_id: str | None = None, authorization: str = Header(...)):
_require_admin(authorization)
with get_db(get_db_path()) as conn:
if key_id:
rows = conn.execute(
"SELECT * FROM usage_events WHERE key_id=? ORDER BY created_at DESC",
(key_id,),
).fetchall()
else:
rows = conn.execute(
"SELECT * FROM usage_events ORDER BY created_at DESC LIMIT 500"
).fetchall()
return [dict(r) for r in rows]
@router.get("/flags")
def list_flags(status: str = "open", authorization: str = Header(...)):
_require_admin(authorization)
with get_db(get_db_path()) as conn:
rows = conn.execute(
"SELECT * FROM flags WHERE status=? ORDER BY created_at DESC", (status,)
).fetchall()
return [dict(r) for r in rows]
@router.patch("/flags/{flag_id}")
def update_flag(flag_id: str, req: "FlagUpdateRequest", authorization: str = Header(...)):
from app.models import FlagUpdateRequest as FUR
_require_admin(authorization)
with get_db(get_db_path()) as conn:
row = conn.execute("SELECT id FROM flags WHERE id=?", (flag_id,)).fetchone()
if not row:
raise HTTPException(status_code=404, detail="Flag not found")
now = datetime.now(timezone.utc).isoformat()
conn.execute(
"UPDATE flags SET status=?, action_taken=?, reviewed_at=? WHERE id=?",
(req.status, req.action_taken, now, flag_id),
)
conn.execute(
"INSERT INTO audit_log (id, entity_type, entity_id, action, created_at) "
"VALUES (?,?,?,?,?)",
(str(uuid.uuid4()), "flag", flag_id, f"flag_{req.status}", now),
)
return {"status": "updated"}
Add from app.models import FlagUpdateRequest to the imports at top of admin.py.
Step 6: Run all server tests
conda run -n job-seeker python -m pytest tests/ -v
Expected: all tests pass
Step 7: Commit
git add -A
git commit -m "feat: usage/flag endpoints + complete admin CRUD"
Task 6: Docker + infrastructure files
Files:
- Create:
Dockerfile - Create:
docker-compose.yml - Create:
.env.example - Create:
keys/README.md
Step 1: Write Dockerfile
FROM python:3.12-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY app/ ./app/
EXPOSE 8600
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8600", "--workers", "1"]
Step 2: Write docker-compose.yml
services:
license:
build: .
restart: unless-stopped
ports:
- "127.0.0.1:8600:8600"
volumes:
- license_data:/app/data
- ./keys:/app/keys:ro
env_file: .env
volumes:
license_data:
Step 3: Write .env.example
# Copy to .env and fill in values — never commit .env
ADMIN_TOKEN=replace-with-long-random-string
JWT_PRIVATE_KEY_PATH=/app/keys/private.pem
JWT_PUBLIC_KEY_PATH=/app/keys/public.pem
JWT_EXPIRY_DAYS=30
GRACE_PERIOD_DAYS=7
# Optional: shown to users as a banner on next JWT refresh
SERVER_NOTICE=
Step 4: Write keys/README.md
# Keys
Generate the RSA keypair once on the server, then copy `public.pem` into the Peregrine repo.
```bash
openssl genrsa -out private.pem 2048
openssl rsa -in private.pem -pubout -out public.pem
private.pem— NEVER commit. Stays on Heimdall only.public.pem— committed to this repo AND toperegrine/scripts/license_public_key.pem.
**Step 5: Write `scripts/issue-key.sh`**
```bash
#!/usr/bin/env bash
# scripts/issue-key.sh — Issue a CircuitForge license key
# Usage: ./scripts/issue-key.sh [--product peregrine] [--tier paid] [--seats 2]
# [--email user@example.com] [--notes "Beta user"]
# [--trial] [--valid-until 2027-01-01]
set -euo pipefail
SERVER="${LICENSE_SERVER:-https://license.circuitforge.com}"
TOKEN="${ADMIN_TOKEN:-}"
if [[ -z "$TOKEN" ]]; then
echo "Error: set ADMIN_TOKEN env var" >&2
exit 1
fi
PRODUCT="peregrine"
TIER="paid"
SEATS=1
EMAIL=""
NOTES=""
TRIAL="false"
VALID_UNTIL="null"
while [[ $# -gt 0 ]]; do
case "$1" in
--product) PRODUCT="$2"; shift 2 ;;
--tier) TIER="$2"; shift 2 ;;
--seats) SEATS="$2"; shift 2 ;;
--email) EMAIL="$2"; shift 2 ;;
--notes) NOTES="$2"; shift 2 ;;
--trial) TRIAL="true"; shift 1 ;;
--valid-until) VALID_UNTIL="\"$2\""; shift 2 ;;
*) echo "Unknown arg: $1" >&2; exit 1 ;;
esac
done
EMAIL_JSON=$([ -n "$EMAIL" ] && echo "\"$EMAIL\"" || echo "null")
NOTES_JSON=$([ -n "$NOTES" ] && echo "\"$NOTES\"" || echo "null")
curl -s -X POST "$SERVER/admin/keys" \
-H "Authorization: Bearer $TOKEN" \
-H "Content-Type: application/json" \
-d "{
\"product\": \"$PRODUCT\",
\"tier\": \"$TIER\",
\"seats\": $SEATS,
\"valid_until\": $VALID_UNTIL,
\"customer_email\": $EMAIL_JSON,
\"source\": \"manual\",
\"trial\": $TRIAL,
\"notes\": $NOTES_JSON
}" | python3 -c "
import json, sys
data = json.load(sys.stdin)
if 'key_display' in data:
print(f'Key: {data[\"key_display\"]}')
print(f'ID: {data[\"id\"]}')
print(f'Tier: {data[\"tier\"]} ({data[\"seats\"]} seat(s))')
else:
print('Error:', json.dumps(data, indent=2))
"
chmod +x scripts/issue-key.sh
Step 6: Commit
git add -A
git commit -m "feat: Dockerfile, docker-compose.yml, .env.example, issue-key.sh"
Task 7: Init Forgejo repo + push
Step 1: Create repo on Forgejo
Using gh CLI configured for your Forgejo instance, or via the web UI at https://git.opensourcesolarpunk.com. Create a private repo named circuitforge-license under the pyr0ball user.
# If gh is configured for Forgejo:
gh repo create pyr0ball/circuitforge-license --private \
--gitea-url https://git.opensourcesolarpunk.com
# Or create manually at https://git.opensourcesolarpunk.com and add remote:
cd /Library/Development/devl/circuitforge-license
git remote add origin https://git.opensourcesolarpunk.com/pyr0ball/circuitforge-license.git
Step 2: Push
git push -u origin main
Step 3: Generate real keypair on Heimdall (do once, after deployment)
# SSH to Heimdall or run locally — keys go in circuitforge-license/keys/
mkdir -p /Library/Development/CircuitForge/circuitforge-license/keys
cd /Library/Development/CircuitForge/circuitforge-license/keys
openssl genrsa -out private.pem 2048
openssl rsa -in private.pem -pubout -out public.pem
git add public.pem
git commit -m "chore: add RSA public key"
git push
PART B — Peregrine Client Integration
Working directory for all Part B tasks: /Library/Development/devl/peregrine/
Run tests: /devl/miniconda3/envs/job-seeker/bin/pytest tests/ -v
Task 8: scripts/license.py + public key
Files:
- Create:
scripts/license_public_key.pem(copy from license serverkeys/public.pem) - Create:
scripts/license.py - Create:
tests/test_license.py
Step 1: Copy the public key
cp /Library/Development/CircuitForge/circuitforge-license/keys/public.pem \
/Library/Development/devl/peregrine/scripts/license_public_key.pem
Step 2: Write failing tests
# tests/test_license.py
import json
import pytest
from pathlib import Path
from unittest.mock import patch, MagicMock
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives import serialization
import jwt as pyjwt
from datetime import datetime, timedelta, timezone
@pytest.fixture()
def test_keys(tmp_path):
"""Generate test RSA keypair and return (private_pem, public_pem, public_path)."""
private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
private_pem = private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption(),
)
public_pem = private_key.public_key().public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo,
)
public_path = tmp_path / "test_public.pem"
public_path.write_bytes(public_pem)
return private_pem, public_pem, public_path
def _make_jwt(private_pem: bytes, tier: str = "paid",
product: str = "peregrine",
exp_delta_days: int = 30,
machine: str = "test-machine") -> str:
now = datetime.now(timezone.utc)
payload = {
"sub": "CFG-PRNG-TEST-TEST-TEST",
"product": product,
"tier": tier,
"seats": 1,
"machine": machine,
"iat": now,
"exp": now + timedelta(days=exp_delta_days),
}
return pyjwt.encode(payload, private_pem, algorithm="RS256")
def _write_license(tmp_path, jwt_token: str, grace_until: str | None = None) -> Path:
data = {
"jwt": jwt_token,
"key_display": "CFG-PRNG-TEST-TEST-TEST",
"tier": "paid",
"valid_until": None,
"machine_id": "test-machine",
"last_refresh": datetime.now(timezone.utc).isoformat(),
"grace_until": grace_until,
}
p = tmp_path / "license.json"
p.write_text(json.dumps(data))
return p
class TestVerifyLocal:
def test_valid_jwt_returns_tier(self, test_keys, tmp_path):
private_pem, _, public_path = test_keys
token = _make_jwt(private_pem)
license_path = _write_license(tmp_path, token)
from scripts.license import verify_local
result = verify_local(license_path=license_path, public_key_path=public_path)
assert result is not None
assert result["tier"] == "paid"
def test_missing_file_returns_none(self, tmp_path):
from scripts.license import verify_local
result = verify_local(license_path=tmp_path / "missing.json",
public_key_path=tmp_path / "key.pem")
assert result is None
def test_wrong_product_returns_none(self, test_keys, tmp_path):
private_pem, _, public_path = test_keys
token = _make_jwt(private_pem, product="falcon")
license_path = _write_license(tmp_path, token)
from scripts.license import verify_local
result = verify_local(license_path=license_path, public_key_path=public_path)
assert result is None
def test_expired_within_grace_returns_tier(self, test_keys, tmp_path):
private_pem, _, public_path = test_keys
token = _make_jwt(private_pem, exp_delta_days=-1)
grace_until = (datetime.now(timezone.utc) + timedelta(days=3)).isoformat()
license_path = _write_license(tmp_path, token, grace_until=grace_until)
from scripts.license import verify_local
result = verify_local(license_path=license_path, public_key_path=public_path)
assert result is not None
assert result["tier"] == "paid"
assert result["in_grace"] is True
def test_expired_past_grace_returns_none(self, test_keys, tmp_path):
private_pem, _, public_path = test_keys
token = _make_jwt(private_pem, exp_delta_days=-10)
grace_until = (datetime.now(timezone.utc) - timedelta(days=1)).isoformat()
license_path = _write_license(tmp_path, token, grace_until=grace_until)
from scripts.license import verify_local
result = verify_local(license_path=license_path, public_key_path=public_path)
assert result is None
class TestEffectiveTier:
def test_returns_free_when_no_license(self, tmp_path):
from scripts.license import effective_tier
result = effective_tier(
license_path=tmp_path / "missing.json",
public_key_path=tmp_path / "key.pem",
)
assert result == "free"
def test_returns_tier_from_valid_jwt(self, test_keys, tmp_path):
private_pem, _, public_path = test_keys
token = _make_jwt(private_pem, tier="premium")
license_path = _write_license(tmp_path, token)
from scripts.license import effective_tier
result = effective_tier(license_path=license_path, public_key_path=public_path)
assert result == "premium"
Step 3: Run to verify failure
/devl/miniconda3/envs/job-seeker/bin/pytest tests/test_license.py -v
Expected: FAILED — ModuleNotFoundError: No module named 'scripts.license'
Step 4: Write scripts/license.py
# scripts/license.py
"""
CircuitForge license client for Peregrine.
Activates against the license server, caches a signed JWT locally,
and verifies tier offline using the embedded RS256 public key.
All functions accept override paths for testing; production code uses
the module-level defaults.
"""
from __future__ import annotations
import hashlib
import json
import socket
import threading
import uuid
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any
import jwt as pyjwt
_HERE = Path(__file__).parent
_DEFAULT_LICENSE_PATH = _HERE.parent / "config" / "license.json"
_DEFAULT_PUBLIC_KEY_PATH = _HERE / "license_public_key.pem"
_LICENSE_SERVER = "https://license.circuitforge.com"
_PRODUCT = "peregrine"
_REFRESH_THRESHOLD_DAYS = 5
_GRACE_PERIOD_DAYS = 7
# ── Machine fingerprint ────────────────────────────────────────────────────────
def _machine_id() -> str:
raw = f"{socket.gethostname()}-{uuid.getnode()}"
return hashlib.sha256(raw.encode()).hexdigest()[:32]
# ── License file helpers ───────────────────────────────────────────────────────
def _read_license(license_path: Path) -> dict | None:
try:
return json.loads(license_path.read_text())
except (FileNotFoundError, json.JSONDecodeError, OSError):
return None
def _write_license(data: dict, license_path: Path) -> None:
license_path.parent.mkdir(parents=True, exist_ok=True)
license_path.write_text(json.dumps(data, indent=2))
# ── Core verify ───────────────────────────────────────────────────────────────
def verify_local(
license_path: Path = _DEFAULT_LICENSE_PATH,
public_key_path: Path = _DEFAULT_PUBLIC_KEY_PATH,
) -> dict | None:
"""Verify the cached JWT offline. Returns payload dict or None (= free tier).
Returns dict has keys: tier, in_grace (bool), sub, product, notice (optional).
"""
stored = _read_license(license_path)
if not stored or not stored.get("jwt"):
return None
if not public_key_path.exists():
return None
public_key = public_key_path.read_bytes()
try:
payload = pyjwt.decode(stored["jwt"], public_key, algorithms=["RS256"])
# Valid and not expired
if payload.get("product") != _PRODUCT:
return None
return {**payload, "in_grace": False}
except pyjwt.exceptions.ExpiredSignatureError:
# JWT expired — check grace period
grace_until_str = stored.get("grace_until")
if not grace_until_str:
return None
try:
grace_until = datetime.fromisoformat(grace_until_str)
if grace_until.tzinfo is None:
grace_until = grace_until.replace(tzinfo=timezone.utc)
except ValueError:
return None
if datetime.now(timezone.utc) > grace_until:
return None
# Decode without verification to get payload
try:
payload = pyjwt.decode(stored["jwt"], public_key,
algorithms=["RS256"],
options={"verify_exp": False})
if payload.get("product") != _PRODUCT:
return None
return {**payload, "in_grace": True}
except pyjwt.exceptions.PyJWTError:
return None
except pyjwt.exceptions.PyJWTError:
return None
def effective_tier(
license_path: Path = _DEFAULT_LICENSE_PATH,
public_key_path: Path = _DEFAULT_PUBLIC_KEY_PATH,
) -> str:
"""Return the effective tier string. Falls back to 'free' on any problem."""
result = verify_local(license_path=license_path, public_key_path=public_key_path)
if result is None:
return "free"
return result.get("tier", "free")
# ── Network operations (all fire-and-forget or explicit) ──────────────────────
def activate(
key: str,
license_path: Path = _DEFAULT_LICENSE_PATH,
public_key_path: Path = _DEFAULT_PUBLIC_KEY_PATH,
app_version: str | None = None,
) -> dict:
"""Activate a license key. Returns response dict. Raises on failure."""
import httpx
mid = _machine_id()
resp = httpx.post(
f"{_LICENSE_SERVER}/v1/activate",
json={"key": key, "machine_id": mid, "product": _PRODUCT,
"app_version": app_version, "platform": _detect_platform()},
timeout=10,
)
resp.raise_for_status()
data = resp.json()
stored = {
"jwt": data["jwt"],
"key_display": key,
"tier": data["tier"],
"valid_until": data.get("valid_until"),
"machine_id": mid,
"last_refresh": datetime.now(timezone.utc).isoformat(),
"grace_until": None,
}
_write_license(stored, license_path)
return data
def deactivate(
license_path: Path = _DEFAULT_LICENSE_PATH,
) -> None:
"""Deactivate this machine. Deletes license.json."""
import httpx
stored = _read_license(license_path)
if not stored:
return
try:
httpx.post(
f"{_LICENSE_SERVER}/v1/deactivate",
json={"jwt": stored["jwt"], "machine_id": stored.get("machine_id", _machine_id())},
timeout=10,
)
except Exception:
pass # best-effort
license_path.unlink(missing_ok=True)
def refresh_if_needed(
license_path: Path = _DEFAULT_LICENSE_PATH,
public_key_path: Path = _DEFAULT_PUBLIC_KEY_PATH,
) -> None:
"""Silently refresh JWT if it expires within threshold. No-op on network failure."""
stored = _read_license(license_path)
if not stored or not stored.get("jwt"):
return
try:
payload = pyjwt.decode(stored["jwt"], public_key_path.read_bytes(),
algorithms=["RS256"])
exp = datetime.fromtimestamp(payload["exp"], tz=timezone.utc)
if exp - datetime.now(timezone.utc) > timedelta(days=_REFRESH_THRESHOLD_DAYS):
return
except pyjwt.exceptions.ExpiredSignatureError:
# Already expired — try to refresh anyway, set grace if unreachable
pass
except Exception:
return
try:
import httpx
resp = httpx.post(
f"{_LICENSE_SERVER}/v1/refresh",
json={"jwt": stored["jwt"],
"machine_id": stored.get("machine_id", _machine_id())},
timeout=10,
)
resp.raise_for_status()
data = resp.json()
stored["jwt"] = data["jwt"]
stored["tier"] = data["tier"]
stored["last_refresh"] = datetime.now(timezone.utc).isoformat()
stored["grace_until"] = None
_write_license(stored, license_path)
except Exception:
# Unreachable — set grace period if not already set
if not stored.get("grace_until"):
grace = datetime.now(timezone.utc) + timedelta(days=_GRACE_PERIOD_DAYS)
stored["grace_until"] = grace.isoformat()
_write_license(stored, license_path)
def report_usage(
event_type: str,
metadata: dict | None = None,
license_path: Path = _DEFAULT_LICENSE_PATH,
) -> None:
"""Fire-and-forget usage telemetry. Never blocks, never raises."""
stored = _read_license(license_path)
if not stored or not stored.get("jwt"):
return
def _send():
try:
import httpx
httpx.post(
f"{_LICENSE_SERVER}/v1/usage",
json={"event_type": event_type, "product": _PRODUCT,
"metadata": metadata or {}},
headers={"Authorization": f"Bearer {stored['jwt']}"},
timeout=5,
)
except Exception:
pass
threading.Thread(target=_send, daemon=True).start()
def report_flag(
flag_type: str,
details: dict | None = None,
license_path: Path = _DEFAULT_LICENSE_PATH,
) -> None:
"""Fire-and-forget violation report. Never blocks, never raises."""
stored = _read_license(license_path)
if not stored or not stored.get("jwt"):
return
def _send():
try:
import httpx
httpx.post(
f"{_LICENSE_SERVER}/v1/flag",
json={"flag_type": flag_type, "product": _PRODUCT,
"details": details or {}},
headers={"Authorization": f"Bearer {stored['jwt']}"},
timeout=5,
)
except Exception:
pass
threading.Thread(target=_send, daemon=True).start()
def _detect_platform() -> str:
import sys
if sys.platform.startswith("linux"):
return "linux"
if sys.platform == "darwin":
return "macos"
if sys.platform == "win32":
return "windows"
return "unknown"
Step 5: Run tests to verify they pass
/devl/miniconda3/envs/job-seeker/bin/pytest tests/test_license.py -v
Expected: all tests pass
Step 6: Commit
cd /Library/Development/devl/peregrine
git add scripts/license.py scripts/license_public_key.pem tests/test_license.py
git commit -m "feat: license.py client — verify_local, effective_tier, activate, refresh, report_usage"
Task 9: Wire tiers.py + update .gitignore
Files:
- Modify:
app/wizard/tiers.py - Modify:
.gitignore - Create:
tests/test_license_tier_integration.py
Step 1: Write failing test
# tests/test_license_tier_integration.py
import json
import pytest
from pathlib import Path
from datetime import datetime, timedelta, timezone
from unittest.mock import patch
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives import serialization
import jwt as pyjwt
@pytest.fixture()
def license_env(tmp_path):
"""Returns (private_pem, public_path, license_path) for tier integration tests."""
private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
private_pem = private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption(),
)
public_pem = private_key.public_key().public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo,
)
public_path = tmp_path / "public.pem"
public_path.write_bytes(public_pem)
license_path = tmp_path / "license.json"
return private_pem, public_path, license_path
def _write_jwt_license(license_path, private_pem, tier="paid", days=30):
now = datetime.now(timezone.utc)
token = pyjwt.encode({
"sub": "CFG-PRNG-TEST", "product": "peregrine", "tier": tier,
"iat": now, "exp": now + timedelta(days=days),
}, private_pem, algorithm="RS256")
license_path.write_text(json.dumps({"jwt": token, "grace_until": None}))
def test_effective_tier_free_without_license(tmp_path):
from app.wizard.tiers import effective_tier
tier = effective_tier(
profile=None,
license_path=tmp_path / "missing.json",
public_key_path=tmp_path / "key.pem",
)
assert tier == "free"
def test_effective_tier_paid_with_valid_license(license_env):
private_pem, public_path, license_path = license_env
_write_jwt_license(license_path, private_pem, tier="paid")
from app.wizard.tiers import effective_tier
tier = effective_tier(profile=None, license_path=license_path,
public_key_path=public_path)
assert tier == "paid"
def test_effective_tier_dev_override_takes_precedence(license_env):
"""dev_tier_override wins even when a valid license is present."""
private_pem, public_path, license_path = license_env
_write_jwt_license(license_path, private_pem, tier="paid")
class FakeProfile:
dev_tier_override = "premium"
from app.wizard.tiers import effective_tier
tier = effective_tier(profile=FakeProfile(), license_path=license_path,
public_key_path=public_path)
assert tier == "premium"
Step 2: Run to verify failure
/devl/miniconda3/envs/job-seeker/bin/pytest tests/test_license_tier_integration.py -v
Expected: FAILED — effective_tier() got unexpected keyword argument 'license_path'
Step 3: Update app/wizard/tiers.py — add effective_tier() function
# Add at bottom of app/wizard/tiers.py (after existing functions):
def effective_tier(
profile=None,
license_path=None,
public_key_path=None,
) -> str:
"""Return the effective tier for this installation.
Priority:
1. profile.dev_tier_override (developer mode override)
2. License JWT verification (offline RS256 check)
3. "free" (fallback)
license_path and public_key_path default to production paths when None.
Pass explicit paths in tests to avoid touching real files.
"""
if profile and getattr(profile, "dev_tier_override", None):
return profile.dev_tier_override
from scripts.license import effective_tier as _license_tier
from pathlib import Path as _Path
kwargs = {}
if license_path is not None:
kwargs["license_path"] = _Path(license_path)
if public_key_path is not None:
kwargs["public_key_path"] = _Path(public_key_path)
return _license_tier(**kwargs)
Step 4: Add config/license.json to .gitignore
Open /Library/Development/devl/peregrine/.gitignore and add:
config/license.json
Step 5: Run tests
/devl/miniconda3/envs/job-seeker/bin/pytest tests/test_license_tier_integration.py -v
Expected: 3 passed
Step 6: Run full suite to check for regressions
/devl/miniconda3/envs/job-seeker/bin/pytest tests/ -v
Expected: all existing tests still pass
Step 7: Commit
git add app/wizard/tiers.py .gitignore tests/test_license_tier_integration.py
git commit -m "feat: wire license.effective_tier into tiers.py; add dev_override priority"
Task 10: Settings License tab + app.py startup refresh
Files:
- Modify:
app/pages/2_Settings.py(add License tab) - Modify:
app/app.py(callrefresh_if_neededon startup)
Step 1: Add License tab to app/pages/2_Settings.py
Find the _tab_names list and insert "🔑 License" after "🛠️ Developer" (or at the end of the list before Developer). Then find the corresponding tab variable assignment block and add:
# In the tab variables section:
tab_license = _all_tabs[<correct index>]
Then add the license tab content block:
# ── License tab ──────────────────────────────────────────────────────────────
with tab_license:
st.subheader("🔑 License")
from scripts.license import (
verify_local as _verify_local,
activate as _activate,
deactivate as _deactivate,
_DEFAULT_LICENSE_PATH,
_DEFAULT_PUBLIC_KEY_PATH,
)
_lic = _verify_local()
if _lic:
# Active license
_grace_note = " _(grace period active)_" if _lic.get("in_grace") else ""
st.success(f"**{_lic['tier'].title()} tier** active{_grace_note}")
st.caption(f"Key: `{_DEFAULT_LICENSE_PATH.exists() and __import__('json').loads(_DEFAULT_LICENSE_PATH.read_text()).get('key_display', '—') or '—'}`")
if _lic.get("notice"):
st.info(_lic["notice"])
if st.button("Deactivate this machine", type="secondary"):
_deactivate()
st.success("Deactivated. Restart the app to apply.")
st.rerun()
else:
st.info("No active license — running on **free tier**.")
st.caption("Enter a license key to unlock paid features.")
_key_input = st.text_input(
"License key",
placeholder="CFG-PRNG-XXXX-XXXX-XXXX",
label_visibility="collapsed",
)
if st.button("Activate", disabled=not (_key_input or "").strip()):
with st.spinner("Activating…"):
try:
result = _activate(_key_input.strip())
st.success(f"Activated! Tier: **{result['tier']}**")
st.rerun()
except Exception as _e:
st.error(f"Activation failed: {_e}")
Step 2: Add startup refresh to app/app.py
Find the startup block (near where init_db is called, before st.navigation). Add:
# Silent license refresh on startup — no-op if unreachable
try:
from scripts.license import refresh_if_needed as _refresh_license
_refresh_license()
except Exception:
pass
Step 3: Run full test suite
/devl/miniconda3/envs/job-seeker/bin/pytest tests/ -v
Expected: all tests pass (License tab is UI-only, no new unit tests needed — covered by existing Settings tests for tab structure)
Step 4: Commit
git add app/pages/2_Settings.py app/app.py
git commit -m "feat: License tab in Settings (activate/deactivate UI) + startup refresh"
Task 11: Final check + Forgejo push
Step 1: Run full suite one last time
/devl/miniconda3/envs/job-seeker/bin/pytest tests/ -v --tb=short
Expected: all tests pass
Step 2: Push Peregrine to Forgejo
cd /Library/Development/devl/peregrine
git push origin main
Step 3: Verify Caddy route is ready
Add to /opt/containers/caddy/Caddyfile on Heimdall (SSH in and edit):
license.circuitforge.com {
reverse_proxy localhost:8600
}
Reload Caddy:
docker exec caddy-proxy caddy reload --config /etc/caddy/Caddyfile
Step 4: Deploy license server on Heimdall
# SSH to Heimdall
cd /devl/circuitforge-license # live clone lives here
cp .env.example .env
# Edit .env: set ADMIN_TOKEN to a long random string
# keys/ already has private.pem + public.pem from Task 7 step 3
docker compose up -d
Step 5: Smoke test
# Create a test key
export ADMIN_TOKEN=<your token>
./scripts/issue-key.sh --product peregrine --tier paid --email test@example.com
# → Key: CFG-PRNG-XXXX-XXXX-XXXX
# Test activation from Peregrine machine
curl -X POST https://license.circuitforge.com/v1/activate \
-H "Content-Type: application/json" \
-d '{"key":"CFG-PRNG-XXXX-XXXX-XXXX","machine_id":"test","product":"peregrine"}'
# → {"jwt":"eyJ...","tier":"paid",...}
Summary
| Task | Repo | Deliverable |
|---|---|---|
| 1 | license-server | Repo scaffold + DB schema |
| 2 | license-server | crypto.py + test keypair fixture |
| 3 | license-server | Pydantic models |
| 4 | license-server | /v1/activate, /v1/refresh, /v1/deactivate |
| 5 | license-server | /v1/usage, /v1/flag, full admin CRUD |
| 6 | license-server | Docker + Caddy + issue-key.sh |
| 7 | license-server | Forgejo push + real keypair |
| 8 | peregrine | scripts/license.py + public key |
| 9 | peregrine | tiers.py wired + .gitignore updated |
| 10 | peregrine | License tab in Settings + startup refresh |
| 11 | both | Deploy to Heimdall + smoke test |