# obs-r2-uploader/obs_uploader/upload.py
# (page metadata from extraction: 311 lines, 9.5 KiB, Python)
#!/usr/bin/env python3
"""Upload video files to Cloudflare R2 storage."""
import logging
import mimetypes
import os
import sys
from pathlib import Path
from typing import Optional

import boto3
import pyperclip
from boto3.s3.transfer import TransferConfig
from botocore.config import Config as BotoConfig
from botocore.exceptions import ClientError, NoCredentialsError
from tqdm import tqdm

from .config import Config
# Module-wide logging: timestamped INFO-level messages.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
class R2Uploader:
    """Upload videos to Cloudflare R2 with progress tracking."""

    # Supported video formats, matched case-insensitively against the file suffix.
    SUPPORTED_FORMATS = {'.mp4', '.mkv', '.mov', '.avi', '.webm', '.flv', '.wmv'}

    # Files larger than this threshold use multipart upload (100MB).
    MULTIPART_THRESHOLD = 100 * 1024 * 1024
    MULTIPART_CHUNKSIZE = 10 * 1024 * 1024  # 10MB chunks

    def __init__(self, config: Config):
        """Initialize the R2 uploader.

        Args:
            config: Configuration object with R2 credentials

        Raises:
            ValueError: If required configuration values are missing.
        """
        self.config = config
        # Fail fast on incomplete credentials rather than at upload time.
        is_valid, missing = config.validate()
        if not is_valid:
            raise ValueError(f"Missing required configuration: {', '.join(missing)}")
        # R2 is S3-compatible, so a standard boto3 S3 client pointed at the
        # account endpoint works.
        self.s3_client = boto3.client(
            's3',
            endpoint_url=config.endpoint,
            aws_access_key_id=config.access_key_id,
            aws_secret_access_key=config.secret_access_key,
            config=BotoConfig(
                signature_version='s3v4',
                retries={'max_attempts': 3, 'mode': 'adaptive'},
                # Disable payload signing for R2 compatibility
                s3={'payload_signing_enabled': False}
            )
        )
        logger.info(f"Initialized R2 uploader for bucket: {config.bucket_name}")

    def validate_file(self, file_path: Path) -> bool:
        """Validate that the file exists and is a supported video format.

        Args:
            file_path: Path to the video file

        Returns:
            True if valid, False otherwise
        """
        if not file_path.exists():
            logger.error(f"File does not exist: {file_path}")
            return False
        if not file_path.is_file():
            logger.error(f"Path is not a file: {file_path}")
            return False
        if file_path.suffix.lower() not in self.SUPPORTED_FORMATS:
            logger.error(
                f"Unsupported file format: {file_path.suffix}. "
                f"Supported formats: {', '.join(self.SUPPORTED_FORMATS)}"
            )
            return False
        return True

    def get_content_type(self, file_path: Path) -> str:
        """Determine the MIME type for the file.

        Args:
            file_path: Path to the file

        Returns:
            MIME type string; 'application/octet-stream' if it cannot be guessed.
        """
        mime_type, _ = mimetypes.guess_type(str(file_path))
        return mime_type or 'application/octet-stream'

    def upload_file(
        self,
        file_path: Path,
        object_name: Optional[str] = None,
        public: bool = True
    ) -> Optional[str]:
        """Upload a file to R2.

        Args:
            file_path: Path to the file to upload
            object_name: S3 object name. If not specified, file_path.name is used
            public: Whether to make the file publicly readable (currently
                unused here; R2 public access is configured at bucket level)

        Returns:
            Public URL of the uploaded file, or None if upload failed
        """
        if not self.validate_file(file_path):
            return None
        # Use filename as object name if not specified
        if object_name is None:
            object_name = file_path.name
        file_size = file_path.stat().st_size
        content_type = self.get_content_type(file_path)
        logger.info(f"Uploading {file_path.name} ({file_size / (1024*1024):.2f} MB)")
        try:
            # Prepare extra arguments
            extra_args = {
                'ContentType': content_type,
            }
            # Add public-read ACL if requested (R2 handles this differently)
            # For R2, we'll rely on bucket-level public access configuration
            # Use multipart upload for large files
            if file_size > self.MULTIPART_THRESHOLD:
                logger.info("Using multipart upload for large file")
                self._upload_multipart(file_path, object_name, extra_args)
            else:
                self._upload_simple(file_path, object_name, extra_args)
            logger.info(f"✓ Upload successful: {object_name}")
            # Generate public URL
            public_url = self.config.get_public_url(object_name)
            return public_url
        except NoCredentialsError:
            logger.error("Invalid R2 credentials")
            return None
        except ClientError as e:
            logger.error(f"Upload failed: {e}")
            return None
        except Exception as e:
            logger.error(f"Unexpected error during upload: {e}")
            return None

    def _upload_simple(self, file_path: Path, object_name: str, extra_args: dict) -> None:
        """Upload file using simple put_object.

        Args:
            file_path: Path to file
            object_name: Object name in R2
            extra_args: Extra arguments for upload
        """
        file_size = file_path.stat().st_size
        with open(file_path, 'rb') as f:
            with tqdm(
                total=file_size,
                unit='B',
                unit_scale=True,
                desc=f"Uploading {file_path.name}"
            ) as pbar:
                # Read file into memory for small files to avoid checksum issues.
                # NOTE: the bar reflects reading the file, not network progress —
                # put_object sends the whole body in one request.
                data = f.read()
                pbar.update(len(data))
                self.s3_client.put_object(
                    Bucket=self.config.bucket_name,
                    Key=object_name,
                    Body=data,
                    **extra_args
                )

    def _upload_multipart(self, file_path: Path, object_name: str, extra_args: dict) -> None:
        """Upload file using multipart upload with progress tracking.

        Args:
            file_path: Path to file
            object_name: Object name in R2
            extra_args: Extra arguments for upload
        """
        progress = ProgressCallback(file_path.name, file_path.stat().st_size)
        try:
            # Use boto3's managed transfer with automatic multipart handling.
            # TransferConfig is imported explicitly from boto3.s3.transfer:
            # reaching it as `boto3.s3.transfer.TransferConfig` only works via
            # an undocumented import side effect of creating an S3 client.
            self.s3_client.upload_file(
                str(file_path),
                self.config.bucket_name,
                object_name,
                ExtraArgs=extra_args,
                Config=TransferConfig(
                    multipart_threshold=self.MULTIPART_THRESHOLD,
                    multipart_chunksize=self.MULTIPART_CHUNKSIZE,
                    use_threads=True
                ),
                Callback=progress
            )
        finally:
            # Close the tqdm bar deterministically instead of relying on
            # ProgressCallback.__del__ running at some later GC point.
            progress.pbar.close()
class ProgressCallback:
    """Callback for tracking upload progress with tqdm.

    Instances are passed as boto3 transfer callbacks: boto3 calls the
    instance with the number of bytes transferred in each chunk.
    """

    def __init__(self, filename: str, size: int):
        """Initialize progress callback.

        Args:
            filename: Name of file being uploaded
            size: Total file size in bytes
        """
        self.filename = filename
        self.size = size
        self.pbar = tqdm(
            total=size,
            unit='B',
            unit_scale=True,
            # Bug fix: show the actual filename; the original rendered the
            # literal placeholder "(unknown)" and never used self.filename.
            desc=f"Uploading {filename}"
        )

    def __call__(self, bytes_transferred: int):
        """Update progress bar.

        Args:
            bytes_transferred: Number of bytes transferred in this chunk
        """
        self.pbar.update(bytes_transferred)

    def close(self):
        """Close the progress bar explicitly (preferred over relying on __del__)."""
        self.pbar.close()

    def __del__(self):
        """Best-effort cleanup if close() was never called."""
        # hasattr guard: __init__ may have raised before pbar was assigned.
        if hasattr(self, 'pbar'):
            self.pbar.close()
def main():
    """Main entry point for CLI usage.

    Usage: python -m obs_uploader.upload <video_file> [object_name]

    Exits 0 on success, 1 on any failure.
    """
    if len(sys.argv) < 2:
        print("Usage: python -m obs_uploader.upload <video_file> [object_name]")
        print("\nExample:")
        print("  python -m obs_uploader.upload /path/to/video.mp4")
        print("  python -m obs_uploader.upload /path/to/video.mp4 my-custom-name.mp4")
        sys.exit(1)
    file_path = Path(sys.argv[1])
    object_name = sys.argv[2] if len(sys.argv) > 2 else None
    # Load configuration
    try:
        config = Config()
    except Exception as e:
        logger.error(f"Failed to load configuration: {e}")
        logger.error("Make sure .env file exists and contains required settings.")
        logger.error("See .env.example for required configuration.")
        sys.exit(1)
    # Create uploader and upload file
    try:
        uploader = R2Uploader(config)
        public_url = uploader.upload_file(file_path, object_name)
        if not public_url:
            logger.error("Upload failed")
            sys.exit(1)
        # Bug fix: attempt the clipboard copy BEFORE announcing it, and only
        # claim success when the copy actually worked.
        copied = False
        try:
            pyperclip.copy(public_url)
            copied = True
        except Exception as e:
            logger.warning(f"Could not copy to clipboard: {e}")
        print(f"\n{'='*60}")
        print("✓ Upload successful!")
        print(f"{'='*60}")
        print(f"\nPublic URL: {public_url}")
        if copied:
            print("\nThe URL has been copied to your clipboard.")
        print(f"{'='*60}\n")
        # Auto-delete if configured
        if config.auto_delete:
            try:
                file_path.unlink()
                logger.info(f"Deleted local file: {file_path}")
            except Exception as e:
                logger.error(f"Failed to delete local file: {e}")
        sys.exit(0)
    except Exception as e:
        # SystemExit is not an Exception subclass, so the sys.exit calls
        # above pass through this handler untouched.
        logger.error(f"Error: {e}")
        sys.exit(1)
# Entry point when executed directly or via `python -m obs_uploader.upload`.
if __name__ == "__main__":
    main()