rfiles-online/config/encryptid_auth.py

267 lines
8.3 KiB
Python

"""
EncryptID Django / DRF Authentication Backend
Usage with Django REST Framework:
# settings.py
REST_FRAMEWORK = {
'DEFAULT_AUTHENTICATION_CLASSES': [
'encryptid_middleware.EncryptIDAuthentication',
'rest_framework.authentication.SessionAuthentication',
'rest_framework.authentication.TokenAuthentication',
]
}
ENCRYPTID_SERVER_URL = 'https://encryptid.jeffemmett.com'
ENCRYPTID_JWT_SECRET = None # Set for local verification; None = remote verify
Usage standalone (Django middleware):
# settings.py
MIDDLEWARE = [
...
'encryptid_middleware.EncryptIDMiddleware',
]
"""
import json
import hmac
import hashlib
import base64
import time
from typing import Optional
from urllib.request import Request, urlopen
from urllib.error import URLError
from django.conf import settings
from django.contrib.auth import get_user_model
User = get_user_model()
# ---------------------------------------------------------------------------
# JWT helpers (no PyJWT dependency for basic HMAC-SHA256)
# ---------------------------------------------------------------------------
def _b64decode(s: str) -> bytes:
"""Base64url decode with padding."""
s += '=' * (-len(s) % 4)
return base64.urlsafe_b64decode(s)
def _b64encode(data: bytes) -> str:
"""Base64url encode without padding."""
return base64.urlsafe_b64encode(data).rstrip(b'=').decode()
def verify_jwt_local(token: str, secret: str) -> Optional[dict]:
    """Verify an HS256 JWT locally and return its claims, or None.

    Args:
        token: Compact JWT (``header.payload.signature``, base64url segments).
        secret: Shared HMAC secret; it is UTF-8 encoded before use.

    Returns:
        The decoded claims when the token has exactly three segments,
        declares ``alg == 'HS256'``, carries a matching HMAC-SHA256
        signature and is not expired. ``None`` in every other case,
        including any parsing error.
    """
    try:
        parts = token.split('.')
        if len(parts) != 3:
            return None
        header_b64, payload_b64, signature_b64 = parts

        header = json.loads(_b64decode(header_b64))
        # Only HS256 is accepted; this also rejects "alg": "none" tokens.
        if header.get('alg') != 'HS256':
            return None

        signing_input = f'{header_b64}.{payload_b64}'.encode()
        expected_sig = hmac.new(
            secret.encode(), signing_input, hashlib.sha256
        ).digest()
        actual_sig = _b64decode(signature_b64)
        # Constant-time comparison to avoid leaking signature bytes via timing.
        if not hmac.compare_digest(expected_sig, actual_sig):
            return None

        claims = json.loads(_b64decode(payload_b64))

        # Check expiration. Compare against None rather than truthiness so
        # that "exp": 0 counts as expired instead of "no expiry", and reject
        # at the exact expiry instant (the current time must be *before* exp).
        exp = claims.get('exp')
        if exp is not None and time.time() >= exp:
            return None
        return claims
    except Exception:
        # Deliberate best-effort: any malformed token (bad base64, bad JSON,
        # non-object payload, non-numeric exp, ...) is simply "not valid".
        return None
def verify_jwt_remote(token: str, server_url: str) -> Optional[dict]:
    """Verify a JWT by asking the EncryptID server.

    POSTs ``{"token": <token>}`` as JSON to ``<server_url>/api/session/verify``
    with a 5 second timeout.

    Returns:
        The ``claims`` object from the response (``{}`` when the key is
        absent) if the response's ``valid`` field is truthy; ``None`` if it
        is not, or if the request or response parsing fails for any reason.
    """
    try:
        endpoint = server_url.rstrip('/') + '/api/session/verify'
        payload = json.dumps({'token': token}).encode()
        request = Request(
            endpoint,
            data=payload,
            headers={'Content-Type': 'application/json'},
            method='POST',
        )
        with urlopen(request, timeout=5) as response:
            body = json.loads(response.read())
        if not body.get('valid'):
            return None
        return body.get('claims', {})
    except Exception:
        # Network errors, bad JSON and anything else all mean "not verified".
        return None
def verify_encryptid_token(token: str) -> Optional[dict]:
    """Verify an EncryptID JWT using whichever method the settings select.

    A truthy ``settings.ENCRYPTID_JWT_SECRET`` selects local HS256
    verification. Otherwise the token is sent to
    ``settings.ENCRYPTID_SERVER_URL`` (falling back to the public EncryptID
    server) for remote verification.

    Returns:
        The token's claims, or ``None`` if verification fails.
    """
    local_secret = getattr(settings, 'ENCRYPTID_JWT_SECRET', None)
    if not local_secret:
        remote_base = getattr(
            settings, 'ENCRYPTID_SERVER_URL', 'https://encryptid.jeffemmett.com'
        )
        return verify_jwt_remote(token, remote_base)
    return verify_jwt_local(token, local_secret)
# ---------------------------------------------------------------------------
# User resolution
# ---------------------------------------------------------------------------
def get_or_create_user(claims: dict):
    """Find or create a Django User from EncryptID claims.

    Lookup order:
      1. by DID (the ``did`` claim, falling back to ``sub``), which this
         module stores in the user's ``email`` field by convention;
      2. by the ``username`` claim;
      3. otherwise a new user is created without a password, using the
         claimed username (or the first 30 characters of the DID) made
         unique with a numeric suffix.

    Args:
        claims: Verified EncryptID token claims.

    Returns:
        The matching or newly created user.

    Raises:
        ValueError: If the claims carry neither a DID/``sub`` nor a username,
            so no identity can be resolved.
    """
    # Normalise missing / null claims to '' so slicing and truth tests are safe.
    did = claims.get('did') or claims.get('sub') or ''
    username = claims.get('username') or ''

    if not did and not username:
        # Without this guard an empty DID would be looked up as email='' and
        # could resolve to an unrelated account that simply has no email set.
        raise ValueError(
            'EncryptID claims contain neither a DID/sub nor a username'
        )

    # Try to find by DID first (stored in email field as convention).
    # Skipped for an empty DID: a blank email is not an identity.
    if did:
        try:
            return User.objects.get(email=did)
        except User.DoesNotExist:
            pass

    # Try by username
    # NOTE(review): this maps a token onto any pre-existing local account
    # with the same username — confirm that is the intended trust model.
    if username:
        try:
            return User.objects.get(username=username)
        except User.DoesNotExist:
            pass

    # Create new user
    base = username or did[:30]
    candidate = base
    counter = 1
    # Ensure unique username by appending _1, _2, ... until one is free.
    while User.objects.filter(username=candidate).exists():
        candidate = f'{base}_{counter}'
        counter += 1

    return User.objects.create_user(
        username=candidate,
        email=did,  # Store DID in email field
        password=None,  # No password — passkey-only
    )
# ---------------------------------------------------------------------------
# Django REST Framework Authentication Backend
# ---------------------------------------------------------------------------
try:
    from rest_framework.authentication import BaseAuthentication
    from rest_framework.exceptions import AuthenticationFailed
    from rest_framework import permissions

    class EncryptIDAuthentication(BaseAuthentication):
        """
        DRF authentication backend for EncryptID JWT tokens.

        Accepts: Authorization: Bearer <encryptid-jwt>
        """

        def authenticate(self, request):
            """Authenticate the request from its Bearer token.

            Returns:
                ``None`` when there is no ``Bearer`` Authorization header (so
                other authentication classes can try), otherwise a
                ``(user, token)`` tuple.

            Raises:
                AuthenticationFailed: If the token fails verification, no user
                    can be resolved from its claims, or the user is inactive.
            """
            auth_header = request.META.get('HTTP_AUTHORIZATION', '')
            if not auth_header.startswith('Bearer '):
                return None  # Not our token — let other backends try

            token = auth_header[7:]
            claims = verify_encryptid_token(token)
            if not claims:
                raise AuthenticationFailed('Invalid or expired EncryptID token')

            try:
                user = get_or_create_user(claims)
            except ValueError as exc:
                # Claims without a usable identity are a client error (401),
                # not a server error.
                raise AuthenticationFailed(
                    'EncryptID token has no usable identity'
                ) from exc

            # A deactivated account must not be able to authenticate, even
            # with a token that is otherwise valid.
            if not user.is_active:
                raise AuthenticationFailed('User inactive or deleted.')

            # Attach claims to request for downstream use
            request.encryptid_claims = claims
            return (user, token)

        def authenticate_header(self, request):
            """Return the ``WWW-Authenticate`` value sent with 401 responses."""
            return 'Bearer realm="encryptid"'

    class SpacePermission(permissions.BasePermission):
        """
        DRF permission that enforces SharedSpace visibility rules.

        Expects the view to implement get_space_config(request) returning
        {'visibility': str, 'owner_did': str} or None (no space context).
        """

        @staticmethod
        def _is_authenticated(request) -> bool:
            """Return True only for a present, authenticated ``request.user``."""
            user = getattr(request, 'user', None)
            return bool(user and user.is_authenticated)

        def has_permission(self, request, view):
            """Decide access from the space's ``visibility`` setting.

            Views without ``get_space_config``, and requests for which it
            returns a falsy config, are allowed. Unknown visibility values
            are denied.
            """
            get_config = getattr(view, 'get_space_config', None)
            if not get_config:
                return True

            config = get_config(request)
            if not config:
                return True  # No space context — allow

            visibility = config.get('visibility', 'public')

            # Public spaces: anyone can read and write
            if visibility == 'public':
                return True

            # Public-read spaces: anyone can read, auth required to write
            if visibility == 'public_read':
                if request.method in permissions.SAFE_METHODS:
                    return True
                return self._is_authenticated(request)

            # Authenticated spaces: any logged-in user
            if visibility == 'authenticated':
                return self._is_authenticated(request)

            # Members-only: must be the space owner
            if visibility == 'members_only':
                if not self._is_authenticated(request):
                    return False
                owner_did = config.get('owner_did', '')
                if not owner_did:
                    return False
                # The user's DID is stored in the email field by convention.
                return getattr(request.user, 'email', '') == owner_did

            return False

except ImportError:
    # DRF not installed — skip
    pass
# ---------------------------------------------------------------------------
# Django Middleware (for non-DRF views)
# ---------------------------------------------------------------------------
class EncryptIDMiddleware:
    """
    Django middleware that attaches EncryptID claims to the request.

    Does NOT enforce authentication. Every request gets
    ``request.encryptid_claims`` and ``request.encryptid_user`` set to None;
    they are filled in only when a valid Bearer token is present.
    """

    def __init__(self, get_response):
        self.get_response = get_response

    def __call__(self, request):
        # Always define both attributes so downstream code can rely on them.
        request.encryptid_claims = None
        request.encryptid_user = None

        header = request.META.get('HTTP_AUTHORIZATION', '')
        # "Bearer <token>": split at the first space into scheme and credential.
        scheme, separator, credential = header.partition(' ')
        if separator and scheme == 'Bearer':
            verified = verify_encryptid_token(credential)
            if verified:
                request.encryptid_claims = verified
                request.encryptid_user = get_or_create_user(verified)

        return self.get_response(request)