222 lines
6.7 KiB
Python
222 lines
6.7 KiB
Python
"""
|
|
EncryptID Django / DRF Authentication Backend
|
|
|
|
Usage with Django REST Framework:
|
|
|
|
# settings.py
|
|
REST_FRAMEWORK = {
|
|
'DEFAULT_AUTHENTICATION_CLASSES': [
|
|
'encryptid_middleware.EncryptIDAuthentication',
|
|
'rest_framework.authentication.SessionAuthentication',
|
|
'rest_framework.authentication.TokenAuthentication',
|
|
]
|
|
}
|
|
|
|
ENCRYPTID_SERVER_URL = 'https://encryptid.jeffemmett.com'
|
|
ENCRYPTID_JWT_SECRET = None # Set for local verification; None = remote verify
|
|
|
|
Usage standalone (Django middleware):
|
|
|
|
# settings.py
|
|
MIDDLEWARE = [
|
|
...
|
|
'encryptid_middleware.EncryptIDMiddleware',
|
|
]
|
|
"""
|
|
|
|
import json
|
|
import hmac
|
|
import hashlib
|
|
import base64
|
|
import time
|
|
from typing import Optional
|
|
from urllib.request import Request, urlopen
|
|
from urllib.error import URLError
|
|
|
|
from django.conf import settings
|
|
from django.contrib.auth import get_user_model
|
|
|
|
User = get_user_model()
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# JWT helpers (no PyJWT dependency for basic HMAC-SHA256)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _b64decode(s: str) -> bytes:
|
|
"""Base64url decode with padding."""
|
|
s += '=' * (-len(s) % 4)
|
|
return base64.urlsafe_b64decode(s)
|
|
|
|
|
|
def _b64encode(data: bytes) -> str:
|
|
"""Base64url encode without padding."""
|
|
return base64.urlsafe_b64encode(data).rstrip(b'=').decode()
|
|
|
|
|
|
def verify_jwt_local(token: str, secret: str) -> Optional[dict]:
|
|
"""Verify a HS256 JWT locally and return claims, or None."""
|
|
try:
|
|
parts = token.split('.')
|
|
if len(parts) != 3:
|
|
return None
|
|
|
|
header = json.loads(_b64decode(parts[0]))
|
|
if header.get('alg') != 'HS256':
|
|
return None
|
|
|
|
signing_input = f'{parts[0]}.{parts[1]}'.encode()
|
|
expected_sig = hmac.new(
|
|
secret.encode(), signing_input, hashlib.sha256
|
|
).digest()
|
|
actual_sig = _b64decode(parts[2])
|
|
|
|
if not hmac.compare_digest(expected_sig, actual_sig):
|
|
return None
|
|
|
|
claims = json.loads(_b64decode(parts[1]))
|
|
|
|
# Check expiration
|
|
if claims.get('exp') and claims['exp'] < time.time():
|
|
return None
|
|
|
|
return claims
|
|
except Exception:
|
|
return None
|
|
|
|
|
|
def verify_jwt_remote(token: str, server_url: str) -> Optional[dict]:
|
|
"""Verify a JWT by calling the EncryptID server."""
|
|
try:
|
|
req = Request(
|
|
f'{server_url.rstrip("/")}/api/session/verify',
|
|
data=json.dumps({'token': token}).encode(),
|
|
headers={'Content-Type': 'application/json'},
|
|
method='POST',
|
|
)
|
|
with urlopen(req, timeout=5) as resp:
|
|
data = json.loads(resp.read())
|
|
if data.get('valid'):
|
|
return data.get('claims', {})
|
|
return None
|
|
except (URLError, json.JSONDecodeError, Exception):
|
|
return None
|
|
|
|
|
|
def verify_encryptid_token(token: str) -> Optional[dict]:
|
|
"""Verify an EncryptID JWT using configured method."""
|
|
secret = getattr(settings, 'ENCRYPTID_JWT_SECRET', None)
|
|
if secret:
|
|
return verify_jwt_local(token, secret)
|
|
|
|
server_url = getattr(
|
|
settings, 'ENCRYPTID_SERVER_URL', 'https://encryptid.jeffemmett.com'
|
|
)
|
|
return verify_jwt_remote(token, server_url)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# User resolution
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def get_or_create_user(claims: dict):
|
|
"""Find or create a Django User from EncryptID claims."""
|
|
did = claims.get('did') or claims.get('sub', '')
|
|
username = claims.get('username', '')
|
|
|
|
# Try to find by DID first (stored in email field as convention)
|
|
try:
|
|
return User.objects.get(email=did)
|
|
except User.DoesNotExist:
|
|
pass
|
|
|
|
# Try by username
|
|
if username:
|
|
try:
|
|
return User.objects.get(username=username)
|
|
except User.DoesNotExist:
|
|
pass
|
|
|
|
# Create new user
|
|
safe_username = username or did[:30]
|
|
# Ensure unique username
|
|
base = safe_username
|
|
counter = 1
|
|
while User.objects.filter(username=safe_username).exists():
|
|
safe_username = f'{base}_{counter}'
|
|
counter += 1
|
|
|
|
return User.objects.create_user(
|
|
username=safe_username,
|
|
email=did, # Store DID in email field
|
|
password=None, # No password — passkey-only
|
|
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Django REST Framework Authentication Backend
|
|
# ---------------------------------------------------------------------------
|
|
|
|
try:
|
|
from rest_framework.authentication import BaseAuthentication
|
|
from rest_framework.exceptions import AuthenticationFailed
|
|
|
|
class EncryptIDAuthentication(BaseAuthentication):
|
|
"""
|
|
DRF authentication backend for EncryptID JWT tokens.
|
|
|
|
Accepts: Authorization: Bearer <encryptid-jwt>
|
|
"""
|
|
|
|
def authenticate(self, request):
|
|
auth_header = request.META.get('HTTP_AUTHORIZATION', '')
|
|
if not auth_header.startswith('Bearer '):
|
|
return None # Not our token — let other backends try
|
|
|
|
token = auth_header[7:]
|
|
claims = verify_encryptid_token(token)
|
|
if not claims:
|
|
raise AuthenticationFailed('Invalid or expired EncryptID token')
|
|
|
|
user = get_or_create_user(claims)
|
|
|
|
# Attach claims to request for downstream use
|
|
request.encryptid_claims = claims
|
|
return (user, token)
|
|
|
|
def authenticate_header(self, request):
|
|
return 'Bearer realm="encryptid"'
|
|
|
|
except ImportError:
|
|
# DRF not installed — skip
|
|
pass
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Django Middleware (for non-DRF views)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class EncryptIDMiddleware:
|
|
"""
|
|
Django middleware that attaches EncryptID claims to request.
|
|
|
|
Does NOT enforce authentication — just populates request.encryptid_claims
|
|
and request.encryptid_user if a valid Bearer token is present.
|
|
"""
|
|
|
|
def __init__(self, get_response):
|
|
self.get_response = get_response
|
|
|
|
def __call__(self, request):
|
|
request.encryptid_claims = None
|
|
request.encryptid_user = None
|
|
|
|
auth_header = request.META.get('HTTP_AUTHORIZATION', '')
|
|
if auth_header.startswith('Bearer '):
|
|
token = auth_header[7:]
|
|
claims = verify_encryptid_token(token)
|
|
if claims:
|
|
request.encryptid_claims = claims
|
|
request.encryptid_user = get_or_create_user(claims)
|
|
|
|
return self.get_response(request)
|