325 lines
11 KiB
Python
325 lines
11 KiB
Python
"""
|
|
API views for file upload and public sharing.
|
|
"""
|
|
|
|
from django.http import FileResponse, JsonResponse
|
|
from django.shortcuts import get_object_or_404
|
|
from django.views import View
|
|
from django.utils.decorators import method_decorator
|
|
from django.views.decorators.csrf import csrf_exempt
|
|
|
|
from rest_framework import viewsets, status, permissions
|
|
from rest_framework.decorators import action
|
|
from rest_framework.response import Response
|
|
from rest_framework.parsers import MultiPartParser, FormParser, JSONParser
|
|
|
|
from .models import MediaFile, PublicShare, FileAccessLog, SharedSpace, MemoryCard
|
|
from config.encryptid_auth import SpacePermission
|
|
from .serializers import (
|
|
MediaFileSerializer,
|
|
MediaFileUploadSerializer,
|
|
PublicShareSerializer,
|
|
PublicShareCreateSerializer,
|
|
PublicShareInfoSerializer,
|
|
FileAccessLogSerializer,
|
|
MemoryCardSerializer,
|
|
)
|
|
|
|
|
|
class MediaFileViewSet(viewsets.ModelViewSet):
|
|
"""ViewSet for managing uploaded media files."""
|
|
|
|
queryset = MediaFile.objects.all()
|
|
serializer_class = MediaFileSerializer
|
|
parser_classes = [MultiPartParser, FormParser, JSONParser]
|
|
permission_classes = [permissions.IsAuthenticatedOrReadOnly, SpacePermission]
|
|
|
|
def get_space_config(self, request):
|
|
"""Provide space config for SpacePermission check."""
|
|
space_slug = request.query_params.get('space')
|
|
if not space_slug:
|
|
return None
|
|
try:
|
|
space = SharedSpace.objects.get(slug=space_slug, is_active=True)
|
|
return {'visibility': space.visibility, 'owner_did': space.owner_did}
|
|
except SharedSpace.DoesNotExist:
|
|
return None
|
|
|
|
def get_serializer_class(self):
|
|
if self.action == 'create':
|
|
return MediaFileUploadSerializer
|
|
return MediaFileSerializer
|
|
|
|
def get_queryset(self):
|
|
queryset = MediaFile.objects.all()
|
|
|
|
mime_type = self.request.query_params.get('mime_type')
|
|
if mime_type:
|
|
queryset = queryset.filter(mime_type__startswith=mime_type)
|
|
|
|
tags = self.request.query_params.getlist('tag')
|
|
if tags:
|
|
for tag in tags:
|
|
queryset = queryset.filter(tags__contains=tag)
|
|
|
|
space = self.request.query_params.get('space')
|
|
if space:
|
|
queryset = queryset.filter(shared_space__slug=space)
|
|
|
|
return queryset
|
|
|
|
def perform_create(self, serializer):
|
|
serializer.save()
|
|
|
|
@action(detail=True, methods=['post'])
|
|
def share(self, request, pk=None):
|
|
"""Create a public share link for this file."""
|
|
media_file = self.get_object()
|
|
|
|
data = request.data.copy()
|
|
data['media_file'] = media_file.id
|
|
|
|
serializer = PublicShareCreateSerializer(
|
|
data=data,
|
|
context={'request': request}
|
|
)
|
|
serializer.is_valid(raise_exception=True)
|
|
share = serializer.save()
|
|
|
|
FileAccessLog.objects.create(
|
|
media_file=media_file,
|
|
share=share,
|
|
ip_address=self._get_client_ip(request),
|
|
user_agent=request.META.get('HTTP_USER_AGENT', '')[:500],
|
|
access_type='share_created',
|
|
user=request.user if request.user.is_authenticated else None
|
|
)
|
|
|
|
return Response(
|
|
PublicShareSerializer(share, context={'request': request}).data,
|
|
status=status.HTTP_201_CREATED
|
|
)
|
|
|
|
@action(detail=True, methods=['get'])
|
|
def shares(self, request, pk=None):
|
|
"""List all shares for this file."""
|
|
media_file = self.get_object()
|
|
shares = media_file.public_shares.all()
|
|
|
|
if request.query_params.get('active') == 'true':
|
|
shares = media_file.get_public_shares()
|
|
|
|
serializer = PublicShareSerializer(
|
|
shares,
|
|
many=True,
|
|
context={'request': request}
|
|
)
|
|
return Response(serializer.data)
|
|
|
|
@action(detail=True, methods=['get'])
|
|
def access_logs(self, request, pk=None):
|
|
"""Get access logs for this file."""
|
|
media_file = self.get_object()
|
|
logs = media_file.access_logs.all()[:100]
|
|
|
|
serializer = FileAccessLogSerializer(logs, many=True)
|
|
return Response(serializer.data)
|
|
|
|
def _get_client_ip(self, request):
|
|
x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
|
|
if x_forwarded_for:
|
|
return x_forwarded_for.split(',')[0].strip()
|
|
return request.META.get('REMOTE_ADDR', '')
|
|
|
|
|
|
class PublicShareViewSet(viewsets.ModelViewSet):
|
|
"""ViewSet for managing public shares."""
|
|
|
|
queryset = PublicShare.objects.all()
|
|
serializer_class = PublicShareSerializer
|
|
permission_classes = [permissions.IsAuthenticatedOrReadOnly]
|
|
|
|
def get_serializer_class(self):
|
|
if self.action == 'create':
|
|
return PublicShareCreateSerializer
|
|
return PublicShareSerializer
|
|
|
|
def get_queryset(self):
|
|
queryset = PublicShare.objects.all()
|
|
|
|
file_id = self.request.query_params.get('file')
|
|
if file_id:
|
|
queryset = queryset.filter(media_file_id=file_id)
|
|
|
|
active = self.request.query_params.get('active')
|
|
if active == 'true':
|
|
from django.db.models import Q
|
|
from django.utils import timezone
|
|
queryset = queryset.filter(
|
|
is_active=True
|
|
).filter(
|
|
Q(expires_at__isnull=True) | Q(expires_at__gt=timezone.now())
|
|
)
|
|
|
|
return queryset
|
|
|
|
@action(detail=True, methods=['post'])
|
|
def revoke(self, request, pk=None):
|
|
"""Revoke a share link."""
|
|
share = self.get_object()
|
|
share.is_active = False
|
|
share.save(update_fields=['is_active'])
|
|
|
|
FileAccessLog.objects.create(
|
|
media_file=share.media_file,
|
|
share=share,
|
|
ip_address=self._get_client_ip(request),
|
|
user_agent=request.META.get('HTTP_USER_AGENT', '')[:500],
|
|
access_type='share_revoked',
|
|
user=request.user if request.user.is_authenticated else None
|
|
)
|
|
|
|
return Response({'status': 'revoked'})
|
|
|
|
@action(detail=True, methods=['post'])
|
|
def set_password(self, request, pk=None):
|
|
"""Set or update password for a share."""
|
|
share = self.get_object()
|
|
password = request.data.get('password')
|
|
|
|
if not password:
|
|
return Response(
|
|
{'error': 'Password is required'},
|
|
status=status.HTTP_400_BAD_REQUEST
|
|
)
|
|
|
|
share.set_password(password)
|
|
return Response({'status': 'password_set'})
|
|
|
|
@action(detail=True, methods=['post'])
|
|
def remove_password(self, request, pk=None):
|
|
"""Remove password protection from a share."""
|
|
share = self.get_object()
|
|
share.is_password_protected = False
|
|
share.password_hash = ''
|
|
share.save(update_fields=['is_password_protected', 'password_hash'])
|
|
return Response({'status': 'password_removed'})
|
|
|
|
def _get_client_ip(self, request):
|
|
x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
|
|
if x_forwarded_for:
|
|
return x_forwarded_for.split(',')[0].strip()
|
|
return request.META.get('REMOTE_ADDR', '')
|
|
|
|
|
|
class MemoryCardViewSet(viewsets.ModelViewSet):
|
|
"""ViewSet for managing memory cards within shared spaces."""
|
|
|
|
queryset = MemoryCard.objects.all()
|
|
serializer_class = MemoryCardSerializer
|
|
permission_classes = [permissions.IsAuthenticatedOrReadOnly, SpacePermission]
|
|
|
|
def get_space_config(self, request):
|
|
"""Provide space config for SpacePermission check."""
|
|
space_slug = request.query_params.get('space')
|
|
if not space_slug:
|
|
return None
|
|
try:
|
|
space = SharedSpace.objects.get(slug=space_slug, is_active=True)
|
|
return {'visibility': space.visibility, 'owner_did': space.owner_did}
|
|
except SharedSpace.DoesNotExist:
|
|
return None
|
|
|
|
def get_queryset(self):
|
|
queryset = MemoryCard.objects.all()
|
|
|
|
space = self.request.query_params.get('space')
|
|
if space:
|
|
queryset = queryset.filter(shared_space__slug=space)
|
|
|
|
card_type = self.request.query_params.get('type')
|
|
if card_type:
|
|
queryset = queryset.filter(card_type=card_type)
|
|
|
|
tags = self.request.query_params.getlist('tag')
|
|
if tags:
|
|
for tag in tags:
|
|
queryset = queryset.filter(tags__contains=tag)
|
|
|
|
return queryset
|
|
|
|
|
|
@method_decorator(csrf_exempt, name='dispatch')
|
|
class PublicDownloadView(View):
|
|
"""Public view for downloading shared files."""
|
|
|
|
def get(self, request, token):
|
|
share = get_object_or_404(PublicShare, token=token)
|
|
|
|
if not share.is_valid:
|
|
if share.is_expired:
|
|
return JsonResponse({'error': 'This share link has expired'}, status=410)
|
|
if share.is_download_limit_reached:
|
|
return JsonResponse({'error': 'Download limit reached'}, status=410)
|
|
if not share.is_active:
|
|
return JsonResponse({'error': 'This share link has been revoked'}, status=410)
|
|
return JsonResponse({'error': 'Share link is not valid'}, status=403)
|
|
|
|
if share.is_password_protected:
|
|
password = request.GET.get('password') or request.POST.get('password')
|
|
if not password:
|
|
return JsonResponse(
|
|
{'error': 'Password required', 'is_password_protected': True},
|
|
status=401
|
|
)
|
|
if not share.check_password(password):
|
|
return JsonResponse({'error': 'Invalid password'}, status=401)
|
|
|
|
share.record_download(request)
|
|
|
|
media_file = share.media_file
|
|
try:
|
|
response = FileResponse(
|
|
media_file.file.open('rb'),
|
|
content_type=media_file.mime_type or 'application/octet-stream'
|
|
)
|
|
response['Content-Disposition'] = f'attachment; filename="{media_file.original_filename}"'
|
|
response['Content-Length'] = media_file.file_size
|
|
return response
|
|
except FileNotFoundError:
|
|
return JsonResponse({'error': 'File not found'}, status=404)
|
|
|
|
|
|
@method_decorator(csrf_exempt, name='dispatch')
|
|
class PublicShareInfoView(View):
|
|
"""Public view for getting share info without downloading."""
|
|
|
|
def get(self, request, token):
|
|
share = get_object_or_404(PublicShare, token=token)
|
|
serializer = PublicShareInfoSerializer(share)
|
|
return JsonResponse(serializer.data)
|
|
|
|
|
|
@method_decorator(csrf_exempt, name='dispatch')
|
|
class PublicShareVerifyPasswordView(View):
|
|
"""Public view for verifying share password."""
|
|
|
|
def post(self, request, token):
|
|
import json
|
|
|
|
share = get_object_or_404(PublicShare, token=token)
|
|
|
|
if not share.is_password_protected:
|
|
return JsonResponse({'valid': True, 'message': 'No password required'})
|
|
|
|
try:
|
|
data = json.loads(request.body)
|
|
password = data.get('password', '')
|
|
except json.JSONDecodeError:
|
|
password = request.POST.get('password', '')
|
|
|
|
if share.check_password(password):
|
|
return JsonResponse({'valid': True})
|
|
else:
|
|
return JsonResponse({'valid': False, 'error': 'Invalid password'}, status=401)
|