""" EncryptID Django / DRF Authentication Backend Usage with Django REST Framework: # settings.py REST_FRAMEWORK = { 'DEFAULT_AUTHENTICATION_CLASSES': [ 'encryptid_middleware.EncryptIDAuthentication', 'rest_framework.authentication.SessionAuthentication', 'rest_framework.authentication.TokenAuthentication', ] } ENCRYPTID_SERVER_URL = 'https://encryptid.jeffemmett.com' ENCRYPTID_JWT_SECRET = None # Set for local verification; None = remote verify Usage standalone (Django middleware): # settings.py MIDDLEWARE = [ ... 'encryptid_middleware.EncryptIDMiddleware', ] """ import json import hmac import hashlib import base64 import time from typing import Optional from urllib.request import Request, urlopen from urllib.error import URLError from django.conf import settings from django.contrib.auth import get_user_model User = get_user_model() # --------------------------------------------------------------------------- # JWT helpers (no PyJWT dependency for basic HMAC-SHA256) # --------------------------------------------------------------------------- def _b64decode(s: str) -> bytes: """Base64url decode with padding.""" s += '=' * (-len(s) % 4) return base64.urlsafe_b64decode(s) def _b64encode(data: bytes) -> str: """Base64url encode without padding.""" return base64.urlsafe_b64encode(data).rstrip(b'=').decode() def verify_jwt_local(token: str, secret: str) -> Optional[dict]: """Verify a HS256 JWT locally and return claims, or None.""" try: parts = token.split('.') if len(parts) != 3: return None header = json.loads(_b64decode(parts[0])) if header.get('alg') != 'HS256': return None signing_input = f'{parts[0]}.{parts[1]}'.encode() expected_sig = hmac.new( secret.encode(), signing_input, hashlib.sha256 ).digest() actual_sig = _b64decode(parts[2]) if not hmac.compare_digest(expected_sig, actual_sig): return None claims = json.loads(_b64decode(parts[1])) # Check expiration if claims.get('exp') and claims['exp'] < time.time(): return None return claims except Exception: return None def verify_jwt_remote(token: str, server_url: str) -> Optional[dict]: """Verify a JWT by calling the EncryptID server.""" try: req = Request( f'{server_url.rstrip("/")}/api/session/verify', data=json.dumps({'token': token}).encode(), headers={'Content-Type': 'application/json'}, method='POST', ) with urlopen(req, timeout=5) as resp: data = json.loads(resp.read()) if data.get('valid'): return data.get('claims', {}) return None except (URLError, json.JSONDecodeError, Exception): return None def verify_encryptid_token(token: str) -> Optional[dict]: """Verify an EncryptID JWT using configured method.""" secret = getattr(settings, 'ENCRYPTID_JWT_SECRET', None) if secret: return verify_jwt_local(token, secret) server_url = getattr( settings, 'ENCRYPTID_SERVER_URL', 'https://encryptid.jeffemmett.com' ) return verify_jwt_remote(token, server_url) # --------------------------------------------------------------------------- # User resolution # --------------------------------------------------------------------------- def get_or_create_user(claims: dict): """Find or create a Django User from EncryptID claims.""" did = claims.get('did') or claims.get('sub', '') username = claims.get('username', '') # Try to find by DID first (stored in email field as convention) try: return User.objects.get(email=did) except User.DoesNotExist: pass # Try by username if username: try: return User.objects.get(username=username) except User.DoesNotExist: pass # Create new user safe_username = username or did[:30] # Ensure unique username base = safe_username counter = 1 while User.objects.filter(username=safe_username).exists(): safe_username = f'{base}_{counter}' counter += 1 return User.objects.create_user( username=safe_username, email=did, # Store DID in email field password=None, # No password — passkey-only ) # --------------------------------------------------------------------------- # Django REST Framework Authentication Backend # --------------------------------------------------------------------------- try: from rest_framework.authentication import BaseAuthentication from rest_framework.exceptions import AuthenticationFailed from rest_framework import permissions class EncryptIDAuthentication(BaseAuthentication): """ DRF authentication backend for EncryptID JWT tokens. Accepts: Authorization: Bearer """ def authenticate(self, request): auth_header = request.META.get('HTTP_AUTHORIZATION', '') if not auth_header.startswith('Bearer '): return None # Not our token — let other backends try token = auth_header[7:] claims = verify_encryptid_token(token) if not claims: raise AuthenticationFailed('Invalid or expired EncryptID token') user = get_or_create_user(claims) # Attach claims to request for downstream use request.encryptid_claims = claims return (user, token) def authenticate_header(self, request): return 'Bearer realm="encryptid"' class SpacePermission(permissions.BasePermission): """ DRF permission that enforces SharedSpace visibility rules. Expects the view to implement get_space_config(request) returning {'visibility': str, 'owner_did': str} or None (no space context). """ def has_permission(self, request, view): get_config = getattr(view, 'get_space_config', None) if not get_config: return True config = get_config(request) if not config: return True # No space context — allow visibility = config.get('visibility', 'public') # Public spaces: anyone can read and write if visibility == 'public': return True # Public-read spaces: anyone can read, auth required to write if visibility == 'public_read': if request.method in permissions.SAFE_METHODS: return True return request.user and request.user.is_authenticated # Authenticated spaces: any logged-in user if visibility == 'authenticated': return request.user and request.user.is_authenticated # Members-only: must be the space owner if visibility == 'members_only': if not (request.user and request.user.is_authenticated): return False owner_did = config.get('owner_did', '') if not owner_did: return False return getattr(request.user, 'email', '') == owner_did return False except ImportError: # DRF not installed — skip pass # --------------------------------------------------------------------------- # Django Middleware (for non-DRF views) # --------------------------------------------------------------------------- class EncryptIDMiddleware: """ Django middleware that attaches EncryptID claims to request. Does NOT enforce authentication — just populates request.encryptid_claims and request.encryptid_user if a valid Bearer token is present. """ def __init__(self, get_response): self.get_response = get_response def __call__(self, request): request.encryptid_claims = None request.encryptid_user = None auth_header = request.META.get('HTTP_AUTHORIZATION', '') if auth_header.startswith('Bearer '): token = auth_header[7:] claims = verify_encryptid_token(token) if claims: request.encryptid_claims = claims request.encryptid_user = get_or_create_user(claims) return self.get_response(request) # --------------------------------------------------------------------------- # Space access helper (used by non-DRF views) # --------------------------------------------------------------------------- def check_space_access(request, space_config): """ Check if a request has write access to a shared space. Args: request: Django HttpRequest space_config: dict with 'visibility' and 'owner_did' Returns: dict with 'allowed' (bool) and 'reason' (str) """ visibility = space_config.get('visibility', 'public') if visibility == 'public': return {'allowed': True, 'reason': ''} if visibility == 'public_read': if request.user and request.user.is_authenticated: return {'allowed': True, 'reason': ''} return {'allowed': False, 'reason': 'Authentication required to upload'} if visibility == 'authenticated': if request.user and request.user.is_authenticated: return {'allowed': True, 'reason': ''} return {'allowed': False, 'reason': 'Authentication required'} if visibility == 'members_only': if not (request.user and request.user.is_authenticated): return {'allowed': False, 'reason': 'Authentication required'} owner_did = space_config.get('owner_did', '') if not owner_did: return {'allowed': False, 'reason': 'Space has no owner configured'} if getattr(request.user, 'email', '') == owner_did: return {'allowed': True, 'reason': ''} return {'allowed': False, 'reason': 'Only the space owner can upload'} return {'allowed': False, 'reason': 'Unknown visibility setting'}