diff --git a/config/encryptid_auth.py b/config/encryptid_auth.py new file mode 100644 index 0000000..ba60c88 --- /dev/null +++ b/config/encryptid_auth.py @@ -0,0 +1,221 @@ +""" +EncryptID Django / DRF Authentication Backend + +Usage with Django REST Framework: + + # settings.py + REST_FRAMEWORK = { + 'DEFAULT_AUTHENTICATION_CLASSES': [ + 'encryptid_middleware.EncryptIDAuthentication', + 'rest_framework.authentication.SessionAuthentication', + 'rest_framework.authentication.TokenAuthentication', + ] + } + + ENCRYPTID_SERVER_URL = 'https://encryptid.jeffemmett.com' + ENCRYPTID_JWT_SECRET = None # Set for local verification; None = remote verify + +Usage standalone (Django middleware): + + # settings.py + MIDDLEWARE = [ + ... + 'encryptid_middleware.EncryptIDMiddleware', + ] +""" + +import json +import hmac +import hashlib +import base64 +import time +from typing import Optional +from urllib.request import Request, urlopen +from urllib.error import URLError + +from django.conf import settings +from django.contrib.auth import get_user_model + +User = get_user_model() + +# --------------------------------------------------------------------------- +# JWT helpers (no PyJWT dependency for basic HMAC-SHA256) +# --------------------------------------------------------------------------- + +def _b64decode(s: str) -> bytes: + """Base64url decode with padding.""" + s += '=' * (-len(s) % 4) + return base64.urlsafe_b64decode(s) + + +def _b64encode(data: bytes) -> str: + """Base64url encode without padding.""" + return base64.urlsafe_b64encode(data).rstrip(b'=').decode() + + +def verify_jwt_local(token: str, secret: str) -> Optional[dict]: + """Verify a HS256 JWT locally and return claims, or None.""" + try: + parts = token.split('.') + if len(parts) != 3: + return None + + header = json.loads(_b64decode(parts[0])) + if header.get('alg') != 'HS256': + return None + + signing_input = f'{parts[0]}.{parts[1]}'.encode() + expected_sig = hmac.new( + secret.encode(), signing_input, hashlib.sha256 + ).digest() + actual_sig = _b64decode(parts[2]) + + if not hmac.compare_digest(expected_sig, actual_sig): + return None + + claims = json.loads(_b64decode(parts[1])) + + # Check expiration + if claims.get('exp') and claims['exp'] < time.time(): + return None + + return claims + except Exception: + return None + + +def verify_jwt_remote(token: str, server_url: str) -> Optional[dict]: + """Verify a JWT by calling the EncryptID server.""" + try: + req = Request( + f'{server_url.rstrip("/")}/api/session/verify', + data=json.dumps({'token': token}).encode(), + headers={'Content-Type': 'application/json'}, + method='POST', + ) + with urlopen(req, timeout=5) as resp: + data = json.loads(resp.read()) + if data.get('valid'): + return data.get('claims', {}) + return None + except (URLError, json.JSONDecodeError, Exception): + return None + + +def verify_encryptid_token(token: str) -> Optional[dict]: + """Verify an EncryptID JWT using configured method.""" + secret = getattr(settings, 'ENCRYPTID_JWT_SECRET', None) + if secret: + return verify_jwt_local(token, secret) + + server_url = getattr( + settings, 'ENCRYPTID_SERVER_URL', 'https://encryptid.jeffemmett.com' + ) + return verify_jwt_remote(token, server_url) + + +# --------------------------------------------------------------------------- +# User resolution +# --------------------------------------------------------------------------- + +def get_or_create_user(claims: dict): + """Find or create a Django User from EncryptID claims.""" + did = claims.get('did') or claims.get('sub', '') + username = claims.get('username', '') + + # Try to find by DID first (stored in email field as convention) + try: + return User.objects.get(email=did) + except User.DoesNotExist: + pass + + # Try by username + if username: + try: + return User.objects.get(username=username) + except User.DoesNotExist: + pass + + # Create new user + safe_username = username or did[:30] + # Ensure unique username + base = safe_username + counter = 1 + while User.objects.filter(username=safe_username).exists(): + safe_username = f'{base}_{counter}' + counter += 1 + + return User.objects.create_user( + username=safe_username, + email=did, # Store DID in email field + password=None, # No password — passkey-only + ) + + +# --------------------------------------------------------------------------- +# Django REST Framework Authentication Backend +# --------------------------------------------------------------------------- + +try: + from rest_framework.authentication import BaseAuthentication + from rest_framework.exceptions import AuthenticationFailed + + class EncryptIDAuthentication(BaseAuthentication): + """ + DRF authentication backend for EncryptID JWT tokens. + + Accepts: Authorization: Bearer + """ + + def authenticate(self, request): + auth_header = request.META.get('HTTP_AUTHORIZATION', '') + if not auth_header.startswith('Bearer '): + return None # Not our token — let other backends try + + token = auth_header[7:] + claims = verify_encryptid_token(token) + if not claims: + raise AuthenticationFailed('Invalid or expired EncryptID token') + + user = get_or_create_user(claims) + + # Attach claims to request for downstream use + request.encryptid_claims = claims + return (user, token) + + def authenticate_header(self, request): + return 'Bearer realm="encryptid"' + +except ImportError: + # DRF not installed — skip + pass + + +# --------------------------------------------------------------------------- +# Django Middleware (for non-DRF views) +# --------------------------------------------------------------------------- + +class EncryptIDMiddleware: + """ + Django middleware that attaches EncryptID claims to request. + + Does NOT enforce authentication — just populates request.encryptid_claims + and request.encryptid_user if a valid Bearer token is present. + """ + + def __init__(self, get_response): + self.get_response = get_response + + def __call__(self, request): + request.encryptid_claims = None + request.encryptid_user = None + + auth_header = request.META.get('HTTP_AUTHORIZATION', '') + if auth_header.startswith('Bearer '): + token = auth_header[7:] + claims = verify_encryptid_token(token) + if claims: + request.encryptid_claims = claims + request.encryptid_user = get_or_create_user(claims) + + return self.get_response(request) diff --git a/config/settings.py b/config/settings.py index 9ff07ca..1ae1145 100644 --- a/config/settings.py +++ b/config/settings.py @@ -111,6 +111,7 @@ DEFAULT_AUTO_FIELD = 'django.db.models.BigAutoField' # REST Framework REST_FRAMEWORK = { 'DEFAULT_AUTHENTICATION_CLASSES': [ + 'config.encryptid_auth.EncryptIDAuthentication', 'rest_framework.authentication.SessionAuthentication', 'rest_framework.authentication.TokenAuthentication', ], @@ -148,6 +149,10 @@ CELERY_BEAT_SCHEDULE = { # rfiles-specific settings SHARE_BASE_URL = os.environ.get('SHARE_BASE_URL', 'https://rfiles.online') +# EncryptID (passkey auth) +ENCRYPTID_SERVER_URL = os.environ.get('ENCRYPTID_SERVER_URL', 'https://encryptid.jeffemmett.com') +ENCRYPTID_JWT_SECRET = os.environ.get('ENCRYPTID_JWT_SECRET', None) # Set for local JWT verification + # Security (production) if not DEBUG: SECURE_PROXY_SSL_HEADER = ('HTTP_X_FORWARDED_PROTO', 'https')