# Source: obs-r2-uploader/obs_uploader/hls_uploader.py
# (261 lines, 8.6 KiB, Python)

#!/usr/bin/env python3
"""Real-time HLS chunk uploader to R2."""
import sys
import time
import logging
from pathlib import Path
from typing import Optional, Set
from collections import defaultdict
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler, FileCreatedEvent, FileModifiedEvent
from .config import Config
from .upload import R2Uploader
# Root logging is configured once, when this module is imported:
# INFO and above, rendered as "<timestamp> - <LEVEL> - <message>".
logging.basicConfig(
    datefmt='%Y-%m-%d %H:%M:%S',
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

# Module-scoped logger shared by every class and function in this file.
logger = logging.getLogger(__name__)
class HLSChunkHandler(FileSystemEventHandler):
    """Queue HLS files (.m3u8 playlists and .ts segments) and upload them to R2.

    The ``on_*`` callbacks are invoked by watchdog and only record paths in
    ``pending_files``; the actual uploads happen in ``process_pending``, which
    the owner is expected to call periodically.
    """

    # Consecutive failed uploads tolerated for one queued file before it is
    # dropped from the queue (a later change event will queue it again).
    MAX_UPLOAD_ATTEMPTS = 5

    def __init__(self, uploader: R2Uploader, config: Config, stream_prefix: str = "live"):
        """Initialize the HLS chunk handler.

        Args:
            uploader: R2Uploader instance; its ``s3_client`` performs the uploads
            config: Configuration object; ``bucket_name`` is read from it
            stream_prefix: First component of every R2 object key (e.g., "live").
                It is joined with "/" as-is, so pass it without a trailing slash.
        """
        super().__init__()
        self.uploader = uploader
        self.config = config
        self.stream_prefix = stream_prefix
        # Segment paths that were uploaded successfully. Playlists are never
        # recorded here because they must be re-uploaded on every change.
        self.uploaded_files: Set[str] = set()
        # Paths waiting for the next process_pending() pass.
        self.pending_files: Set[str] = set()
        # path -> (size, mtime_ns) seen on the previous poll of a pending segment.
        self._segment_snapshots: dict = {}
        # path -> number of consecutive failed upload attempts.
        self._failed_attempts: dict = {}

    def is_hls_file(self, path: str) -> bool:
        """Check if file is an HLS file (.m3u8 or .ts), ignoring case.

        Args:
            path: File path to check

        Returns:
            True if it's an HLS file
        """
        return Path(path).suffix.lower() in {'.m3u8', '.ts'}

    @staticmethod
    def _is_playlist(path) -> bool:
        """Return True when *path* has a .m3u8 suffix (case-insensitive)."""
        return Path(path).suffix.lower() == '.m3u8'

    @staticmethod
    def _stream_name(filename: str) -> str:
        """Derive the stream name used as the middle component of the object key.

        "my-stream.m3u8" -> "my-stream"; "my-stream-0.ts" -> "my-stream"
        (split on the last hyphen); anything else -> text before the first dot.
        """
        lowered = filename.lower()
        if lowered.endswith('.m3u8'):
            return filename[:-len('.m3u8')]
        if '-' in filename and lowered.endswith('.ts'):
            return filename.rsplit('-', 1)[0]
        return filename.split('.')[0]

    def _forget(self, path: str) -> None:
        """Drop every piece of bookkeeping kept for *path*."""
        self.pending_files.discard(path)
        self.uploaded_files.discard(path)
        self._segment_snapshots.pop(path, None)
        self._failed_attempts.pop(path, None)

    def on_created(self, event):
        """Handle file creation event.

        Args:
            event: File system event
        """
        if isinstance(event, FileCreatedEvent) and not event.is_directory:
            if self.is_hls_file(event.src_path):
                logger.info(f"New HLS chunk detected: {Path(event.src_path).name}")
                # A freshly created file is new content even when an earlier
                # file with the same path was already uploaded.
                self.uploaded_files.discard(event.src_path)
                self.pending_files.add(event.src_path)

    def on_modified(self, event):
        """Handle file modification event.

        Args:
            event: File system event
        """
        if isinstance(event, FileModifiedEvent) and not event.is_directory:
            path = event.src_path
            if not self.is_hls_file(path):
                return
            # Always queue playlists (they update frequently); queue segments
            # only while they have not been uploaded yet.
            if self._is_playlist(path) or path not in self.uploaded_files:
                self.pending_files.add(path)

    def on_moved(self, event):
        """Handle rename events so files written via temp-file + rename are queued.

        Args:
            event: File system event (its ``dest_path`` is the new location)
        """
        if event.is_directory:
            return
        self._forget(event.src_path)
        destination = getattr(event, 'dest_path', None)
        if destination and self.is_hls_file(destination):
            self.uploaded_files.discard(destination)
            self.pending_files.add(destination)

    def on_deleted(self, event):
        """Forget deleted files so the bookkeeping sets do not grow forever.

        Args:
            event: File system event
        """
        if not event.is_directory:
            self._forget(event.src_path)

    def upload_chunk(self, file_path: Path):
        """Upload HLS chunk to R2.

        The object key is ``{stream_prefix}/{stream_name}/{filename}``.
        Playlists are sent with ``no-cache``; segments with a one-year
        ``max-age``. Errors are logged, not raised.

        Args:
            file_path: Path to the chunk file

        Returns:
            True if the object was uploaded, False if the attempt failed.
        """
        try:
            filename = file_path.name
            playlist = self._is_playlist(file_path)
            object_name = f"{self.stream_prefix}/{self._stream_name(filename)}/{filename}"

            # Read file content
            with open(file_path, 'rb') as f:
                data = f.read()

            if playlist:
                content_type = 'application/vnd.apple.mpegurl'
                cache_control = 'no-cache'
            else:
                content_type = 'video/MP2T'
                cache_control = 'public, max-age=31536000'

            # Upload to R2
            self.uploader.s3_client.put_object(
                Bucket=self.config.bucket_name,
                Key=object_name,
                Body=data,
                ContentType=content_type,
                CacheControl=cache_control,
            )
            logger.info(f"✓ Uploaded: {object_name} ({len(data)} bytes)")

            # Only track segments as uploaded (m3u8 needs to be re-uploaded on
            # every change).
            if not playlist:
                self.uploaded_files.add(str(file_path))
            return True
        except Exception as e:
            logger.error(f"Failed to upload {file_path.name}: {e}")
            return False

    def process_pending(self):
        """Process all pending files for upload.

        Playlists are uploaded immediately. A segment is uploaded only once it
        is non-empty and its size and mtime are unchanged since the previous
        call, so a file that is still being written is not published
        truncated. Failed uploads stay queued for up to
        ``MAX_UPLOAD_ATTEMPTS`` consecutive attempts.
        """
        for file_path_str in list(self.pending_files):
            file_path = Path(file_path_str)

            # Skip if already uploaded (only segments are ever recorded).
            if file_path_str in self.uploaded_files:
                self.pending_files.discard(file_path_str)
                self._segment_snapshots.pop(file_path_str, None)
                continue

            try:
                stats = file_path.stat()
            except FileNotFoundError:
                # File disappeared before we got to it: nothing to upload.
                self.pending_files.discard(file_path_str)
                self._segment_snapshots.pop(file_path_str, None)
                self._failed_attempts.pop(file_path_str, None)
                continue
            except OSError as e:
                # Possibly transient; keep the file queued and look again later.
                logger.debug(f"Cannot stat {file_path.name}: {e}")
                continue

            if not self._is_playlist(file_path):
                snapshot = (stats.st_size, stats.st_mtime_ns)
                previous = self._segment_snapshots.get(file_path_str)
                self._segment_snapshots[file_path_str] = snapshot
                if stats.st_size == 0 or snapshot != previous:
                    # Empty or still changing: wait for the next poll.
                    continue

            # Dequeue BEFORE uploading: a change event delivered by watchdog
            # while the upload is running then re-queues the file instead of
            # being wiped out by a discard after the upload.
            self.pending_files.discard(file_path_str)

            # Only an explicit False counts as failure, so an overriding
            # upload_chunk() that returns None keeps its old semantics.
            if self.upload_chunk(file_path) is not False:
                self._segment_snapshots.pop(file_path_str, None)
                self._failed_attempts.pop(file_path_str, None)
                continue

            attempts = self._failed_attempts.get(file_path_str, 0) + 1
            if attempts >= self.MAX_UPLOAD_ATTEMPTS:
                logger.error(
                    f"Giving up on {file_path.name} after {attempts} failed upload attempts"
                )
                self._segment_snapshots.pop(file_path_str, None)
                self._failed_attempts.pop(file_path_str, None)
            else:
                self._failed_attempts[file_path_str] = attempts
                self.pending_files.add(file_path_str)
class HLSWatcher:
    """Watch HLS directory and upload chunks to R2 in real-time."""

    def __init__(self, hls_dir: Path, config: Config, stream_prefix: str = "live"):
        """Initialize the HLS watcher.

        Creates the directory if needed, builds the uploader and event
        handler, and schedules (but does not start) a recursive watch.

        Args:
            hls_dir: Directory where HLS files are written
            config: Configuration object
            stream_prefix: Prefix for R2 object names
        """
        self.hls_dir = hls_dir
        self.config = config

        # exist_ok makes a separate existence check unnecessary.
        hls_dir.mkdir(parents=True, exist_ok=True)

        # Initialize uploader
        self.uploader = R2Uploader(config)

        # Create event handler and observer
        self.event_handler = HLSChunkHandler(self.uploader, config, stream_prefix)
        self.observer = Observer()
        self.observer.schedule(self.event_handler, str(hls_dir), recursive=True)
        logger.info(f"Initialized HLS watcher for: {hls_dir}")

    def upload_playlists(self):
        """Upload all m3u8 playlist files found directly in the directory."""
        for m3u8_file in self.hls_dir.glob("*.m3u8"):
            # The file may have been removed between the glob and this check.
            if m3u8_file.exists():
                self.event_handler.upload_chunk(m3u8_file)

    def start(self):
        """Start watching the HLS directory and block until interrupted.

        Pending uploads are processed once per second and every playlist is
        re-uploaded on every second pass. Ctrl+C ends the loop and returns
        normally; any other exception propagates to the caller. In both
        cases the observer is stopped first.
        """
        logger.info("Starting HLS watcher...")
        rule = '=' * 60
        print(f"\n{rule}")
        print("HLS Real-Time Uploader - Active")
        print(rule)
        print(f"Watching: {self.hls_dir}")
        print(f"Bucket: {self.config.bucket_name}")
        print(f"Public Domain: {self.config.public_domain}")
        print(rule)
        print("Press Ctrl+C to stop\n")

        self.observer.start()
        passes_since_playlist_sync = 0
        try:
            while True:
                # Process pending uploads every second
                self.event_handler.process_pending()

                # Upload playlists every 2 seconds
                passes_since_playlist_sync += 1
                if passes_since_playlist_sync >= 2:
                    self.upload_playlists()
                    passes_since_playlist_sync = 0

                time.sleep(1)
        except KeyboardInterrupt:
            logger.info("Stopping HLS watcher...")
        finally:
            # Also runs when the loop dies from an unexpected error, so the
            # observer is never left running behind a failed watcher.
            self.stop()

    def stop(self):
        """Stop watching the directory.

        Safe to call when the observer was never started or has already
        stopped: the join is skipped in that case, because joining a thread
        that was never started raises RuntimeError.
        """
        self.observer.stop()
        if self.observer.is_alive():
            self.observer.join()
        logger.info("HLS watcher stopped")
def main(argv: Optional[list] = None) -> None:
    """Main entry point for HLS uploader CLI.

    Args:
        argv: Full argument vector including the program name. Defaults to
            ``sys.argv`` so existing ``main()`` callers behave as before.

    Exits with status 1 on a missing directory argument, a configuration
    error, or a runtime failure, and with status 0 when interrupted by the
    user while the watcher is being set up.
    """
    args = sys.argv if argv is None else list(argv)
    if len(args) < 2:
        print("Usage: python -m obs_uploader.hls_uploader <hls_directory>")
        print("\nExample:")
        print(" python -m obs_uploader.hls_uploader /home/user/obs-r2-uploader/streaming/hls")
        sys.exit(1)

    hls_dir = Path(args[1])

    # Load configuration
    try:
        config = Config()
    except Exception as e:
        logger.error(f"Failed to load configuration: {e}")
        sys.exit(1)

    # Start HLS watcher
    try:
        watcher = HLSWatcher(hls_dir, config)
        watcher.start()
    except KeyboardInterrupt:
        logger.info("Stopped by user")
        sys.exit(0)
    except Exception as e:
        # logger.exception keeps the traceback; the message alone is rarely
        # enough to diagnose an unexpected failure.
        logger.exception(f"Error: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()