From bf2d0f81c77e99ce149b96eb9e48dec4f0fbdba7 Mon Sep 17 00:00:00 2001 From: pyr0ball Date: Wed, 25 Feb 2026 22:53:11 -0800 Subject: [PATCH] =?UTF-8?q?feat:=20license.py=20client=20=E2=80=94=20verif?= =?UTF-8?q?y=5Flocal,=20effective=5Ftier,=20activate,=20refresh,=20report?= =?UTF-8?q?=5Fusage?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- scripts/license.py | 275 +++++++++++++++++++++++++++++++++ scripts/license_public_key.pem | 9 ++ tests/test_license.py | 121 +++++++++++++++ 3 files changed, 405 insertions(+) create mode 100644 scripts/license.py create mode 100644 scripts/license_public_key.pem create mode 100644 tests/test_license.py diff --git a/scripts/license.py b/scripts/license.py new file mode 100644 index 0000000..e702d79 --- /dev/null +++ b/scripts/license.py @@ -0,0 +1,275 @@ +""" +CircuitForge license client for Peregrine. + +Activates against the license server, caches a signed JWT locally, +and verifies tier offline using the embedded RS256 public key. + +All functions accept override paths for testing; production code uses +the module-level defaults. +""" +from __future__ import annotations + +import hashlib +import json +import socket +import threading +import uuid +from datetime import datetime, timedelta, timezone +from pathlib import Path + +import jwt as pyjwt + +_HERE = Path(__file__).parent +_DEFAULT_LICENSE_PATH = _HERE.parent / "config" / "license.json" +_DEFAULT_PUBLIC_KEY_PATH = _HERE / "license_public_key.pem" +_LICENSE_SERVER = "https://license.circuitforge.tech" +_PRODUCT = "peregrine" +_REFRESH_THRESHOLD_DAYS = 5 +_GRACE_PERIOD_DAYS = 7 + + +# ── Machine fingerprint ──────────────────────────────────────────────────────── + +def _machine_id() -> str: + raw = f"{socket.gethostname()}-{uuid.getnode()}" + return hashlib.sha256(raw.encode()).hexdigest()[:32] + + +# ── License file helpers ─────────────────────────────────────────────────────── + +def _read_license(license_path: Path) -> dict | None: + try: + return json.loads(license_path.read_text()) + except (FileNotFoundError, json.JSONDecodeError, OSError): + return None + + +def _write_license(data: dict, license_path: Path) -> None: + license_path.parent.mkdir(parents=True, exist_ok=True) + license_path.write_text(json.dumps(data, indent=2)) + + +# ── Core verify ─────────────────────────────────────────────────────────────── + +def verify_local( + license_path: Path = _DEFAULT_LICENSE_PATH, + public_key_path: Path = _DEFAULT_PUBLIC_KEY_PATH, +) -> dict | None: + """Verify the cached JWT offline. Returns payload dict or None (= free tier). + + Returned dict has keys: tier, in_grace (bool), sub, product, notice (optional). + """ + stored = _read_license(license_path) + if not stored or not stored.get("jwt"): + return None + + if not public_key_path.exists(): + return None + + public_key = public_key_path.read_bytes() + + try: + payload = pyjwt.decode(stored["jwt"], public_key, algorithms=["RS256"]) + if payload.get("product") != _PRODUCT: + return None + return {**payload, "in_grace": False} + + except pyjwt.exceptions.ExpiredSignatureError: + # JWT expired — check local grace period before requiring a refresh + grace_until_str = stored.get("grace_until") + if not grace_until_str: + return None + try: + grace_until = datetime.fromisoformat(grace_until_str) + if grace_until.tzinfo is None: + grace_until = grace_until.replace(tzinfo=timezone.utc) + except ValueError: + return None + if datetime.now(timezone.utc) > grace_until: + return None + # Decode without expiry check to recover the payload + try: + payload = pyjwt.decode( + stored["jwt"], public_key, + algorithms=["RS256"], + options={"verify_exp": False}, + ) + if payload.get("product") != _PRODUCT: + return None + return {**payload, "in_grace": True} + except pyjwt.exceptions.PyJWTError: + return None + + except pyjwt.exceptions.PyJWTError: + return None + + +def effective_tier( + license_path: Path = _DEFAULT_LICENSE_PATH, + public_key_path: Path = _DEFAULT_PUBLIC_KEY_PATH, +) -> str: + """Return the effective tier string. Falls back to 'free' on any problem.""" + result = verify_local(license_path=license_path, public_key_path=public_key_path) + if result is None: + return "free" + return result.get("tier", "free") + + +# ── Network operations (all fire-and-forget or explicit) ────────────────────── + +def activate( + key: str, + license_path: Path = _DEFAULT_LICENSE_PATH, + public_key_path: Path = _DEFAULT_PUBLIC_KEY_PATH, + app_version: str | None = None, +) -> dict: + """Activate a license key. Returns response dict. Raises on failure.""" + import httpx + mid = _machine_id() + resp = httpx.post( + f"{_LICENSE_SERVER}/activate", + json={ + "key": key, + "machine_id": mid, + "product": _PRODUCT, + "app_version": app_version, + "platform": _detect_platform(), + }, + timeout=10, + ) + resp.raise_for_status() + data = resp.json() + stored = { + "jwt": data["jwt"], + "key_display": key, + "tier": data["tier"], + "valid_until": data.get("valid_until"), + "machine_id": mid, + "last_refresh": datetime.now(timezone.utc).isoformat(), + "grace_until": None, + } + _write_license(stored, license_path) + return data + + +def deactivate( + license_path: Path = _DEFAULT_LICENSE_PATH, +) -> None: + """Deactivate this machine. Deletes license.json.""" + import httpx + stored = _read_license(license_path) + if not stored: + return + try: + httpx.post( + f"{_LICENSE_SERVER}/deactivate", + json={"jwt": stored["jwt"], "machine_id": stored.get("machine_id", _machine_id())}, + timeout=10, + ) + except Exception: + pass # best-effort + license_path.unlink(missing_ok=True) + + +def refresh_if_needed( + license_path: Path = _DEFAULT_LICENSE_PATH, + public_key_path: Path = _DEFAULT_PUBLIC_KEY_PATH, +) -> None: + """Silently refresh JWT if it expires within threshold. No-op on network failure.""" + stored = _read_license(license_path) + if not stored or not stored.get("jwt"): + return + try: + payload = pyjwt.decode( + stored["jwt"], public_key_path.read_bytes(), algorithms=["RS256"] + ) + exp = datetime.fromtimestamp(payload["exp"], tz=timezone.utc) + if exp - datetime.now(timezone.utc) > timedelta(days=_REFRESH_THRESHOLD_DAYS): + return + except pyjwt.exceptions.ExpiredSignatureError: + # Already expired — try to refresh anyway, set grace if unreachable + pass + except Exception: + return + + try: + import httpx + resp = httpx.post( + f"{_LICENSE_SERVER}/refresh", + json={"jwt": stored["jwt"], "machine_id": stored.get("machine_id", _machine_id())}, + timeout=10, + ) + resp.raise_for_status() + data = resp.json() + stored["jwt"] = data["jwt"] + stored["tier"] = data["tier"] + stored["last_refresh"] = datetime.now(timezone.utc).isoformat() + stored["grace_until"] = None + _write_license(stored, license_path) + except Exception: + # Server unreachable — set grace period if not already set + if not stored.get("grace_until"): + grace = datetime.now(timezone.utc) + timedelta(days=_GRACE_PERIOD_DAYS) + stored["grace_until"] = grace.isoformat() + _write_license(stored, license_path) + + +def report_usage( + event_type: str, + metadata: dict | None = None, + license_path: Path = _DEFAULT_LICENSE_PATH, +) -> None: + """Fire-and-forget usage telemetry. Never blocks, never raises.""" + stored = _read_license(license_path) + if not stored or not stored.get("jwt"): + return + + def _send(): + try: + import httpx + httpx.post( + f"{_LICENSE_SERVER}/usage", + json={"event_type": event_type, "product": _PRODUCT, "metadata": metadata or {}}, + headers={"Authorization": f"Bearer {stored['jwt']}"}, + timeout=5, + ) + except Exception: + pass + + threading.Thread(target=_send, daemon=True).start() + + +def report_flag( + flag_type: str, + details: dict | None = None, + license_path: Path = _DEFAULT_LICENSE_PATH, +) -> None: + """Fire-and-forget violation report. Never blocks, never raises.""" + stored = _read_license(license_path) + if not stored or not stored.get("jwt"): + return + + def _send(): + try: + import httpx + httpx.post( + f"{_LICENSE_SERVER}/flag", + json={"flag_type": flag_type, "product": _PRODUCT, "details": details or {}}, + headers={"Authorization": f"Bearer {stored['jwt']}"}, + timeout=5, + ) + except Exception: + pass + + threading.Thread(target=_send, daemon=True).start() + + +def _detect_platform() -> str: + import sys + if sys.platform.startswith("linux"): + return "linux" + if sys.platform == "darwin": + return "macos" + if sys.platform == "win32": + return "windows" + return "unknown" diff --git a/scripts/license_public_key.pem b/scripts/license_public_key.pem new file mode 100644 index 0000000..92fc3e6 --- /dev/null +++ b/scripts/license_public_key.pem @@ -0,0 +1,9 @@ +-----BEGIN PUBLIC KEY----- +MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAr9kLOyfJbm1QMFGdsC8b +LR9xm4bCZ9L63o8doejfMHNliQrUxmmKPKYF4o3dE73Y9og7MrmQRN1pvFgvcVAj +o7GB6os5hSf8DDLYSFa2uGwoWOTs9uhDHKcB32T7nI3PCq0hqIoLfwfc9noi+MWh +UP8APzgQe7iKjbr+l7wXFM7UhybZ30CYZ10jgdLyP/PMVqVpgWSBm/I84FT+krUS +pvx+9KEwzdwoHdZltTwFHr29RISsk4161R0+1pJmXBpa4EsKhlHvrXEpHDssG68h +nDeqdFN20EJhf6L0Gab6UYGJqkaMecrdYrij+6Xu5jx3awn7mIsxCkj0jXtmNPZJ +LQIDAQAB +-----END PUBLIC KEY----- diff --git a/tests/test_license.py b/tests/test_license.py new file mode 100644 index 0000000..b72a868 --- /dev/null +++ b/tests/test_license.py @@ -0,0 +1,121 @@ +import json +import pytest +from pathlib import Path +from unittest.mock import patch, MagicMock +from cryptography.hazmat.primitives.asymmetric import rsa +from cryptography.hazmat.primitives import serialization +import jwt as pyjwt +from datetime import datetime, timedelta, timezone + + +@pytest.fixture() +def test_keys(tmp_path): + """Generate test RSA keypair and return (private_pem, public_pem, public_path).""" + private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048) + private_pem = private_key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.TraditionalOpenSSL, + encryption_algorithm=serialization.NoEncryption(), + ) + public_pem = private_key.public_key().public_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PublicFormat.SubjectPublicKeyInfo, + ) + public_path = tmp_path / "test_public.pem" + public_path.write_bytes(public_pem) + return private_pem, public_pem, public_path + + +def _make_jwt(private_pem: bytes, tier: str = "paid", + product: str = "peregrine", + exp_delta_days: int = 30, + machine: str = "test-machine") -> str: + now = datetime.now(timezone.utc) + payload = { + "sub": "CFG-PRNG-TEST-TEST-TEST", + "product": product, + "tier": tier, + "seats": 1, + "machine": machine, + "iat": now, + "exp": now + timedelta(days=exp_delta_days), + } + return pyjwt.encode(payload, private_pem, algorithm="RS256") + + +def _write_license(tmp_path, jwt_token: str, grace_until: str | None = None) -> Path: + data = { + "jwt": jwt_token, + "key_display": "CFG-PRNG-TEST-TEST-TEST", + "tier": "paid", + "valid_until": None, + "machine_id": "test-machine", + "last_refresh": datetime.now(timezone.utc).isoformat(), + "grace_until": grace_until, + } + p = tmp_path / "license.json" + p.write_text(json.dumps(data)) + return p + + +class TestVerifyLocal: + def test_valid_jwt_returns_tier(self, test_keys, tmp_path): + private_pem, _, public_path = test_keys + token = _make_jwt(private_pem) + license_path = _write_license(tmp_path, token) + from scripts.license import verify_local + result = verify_local(license_path=license_path, public_key_path=public_path) + assert result is not None + assert result["tier"] == "paid" + + def test_missing_file_returns_none(self, tmp_path): + from scripts.license import verify_local + result = verify_local(license_path=tmp_path / "missing.json", + public_key_path=tmp_path / "key.pem") + assert result is None + + def test_wrong_product_returns_none(self, test_keys, tmp_path): + private_pem, _, public_path = test_keys + token = _make_jwt(private_pem, product="falcon") + license_path = _write_license(tmp_path, token) + from scripts.license import verify_local + result = verify_local(license_path=license_path, public_key_path=public_path) + assert result is None + + def test_expired_within_grace_returns_tier(self, test_keys, tmp_path): + private_pem, _, public_path = test_keys + token = _make_jwt(private_pem, exp_delta_days=-1) + grace_until = (datetime.now(timezone.utc) + timedelta(days=3)).isoformat() + license_path = _write_license(tmp_path, token, grace_until=grace_until) + from scripts.license import verify_local + result = verify_local(license_path=license_path, public_key_path=public_path) + assert result is not None + assert result["tier"] == "paid" + assert result["in_grace"] is True + + def test_expired_past_grace_returns_none(self, test_keys, tmp_path): + private_pem, _, public_path = test_keys + token = _make_jwt(private_pem, exp_delta_days=-10) + grace_until = (datetime.now(timezone.utc) - timedelta(days=1)).isoformat() + license_path = _write_license(tmp_path, token, grace_until=grace_until) + from scripts.license import verify_local + result = verify_local(license_path=license_path, public_key_path=public_path) + assert result is None + + +class TestEffectiveTier: + def test_returns_free_when_no_license(self, tmp_path): + from scripts.license import effective_tier + result = effective_tier( + license_path=tmp_path / "missing.json", + public_key_path=tmp_path / "key.pem", + ) + assert result == "free" + + def test_returns_tier_from_valid_jwt(self, test_keys, tmp_path): + private_pem, _, public_path = test_keys + token = _make_jwt(private_pem, tier="premium") + license_path = _write_license(tmp_path, token) + from scripts.license import effective_tier + result = effective_tier(license_path=license_path, public_key_path=public_path) + assert result == "premium"