#!/usr/bin/env python3
"""Real-time HLS chunk uploader to R2.

Watches a directory for HLS playlists (.m3u8) and media segments (.ts) and
uploads them to an R2 bucket through the S3 client exposed by ``R2Uploader``.
"""

import sys
import time
import logging
from pathlib import Path
from typing import Dict, Optional, Set
from collections import defaultdict

from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler, FileCreatedEvent, FileModifiedEvent

from .config import Config
from .upload import R2Uploader

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger(__name__)


class HLSChunkHandler(FileSystemEventHandler):
    """Handler for HLS chunk files (.ts and .m3u8).

    File system events only queue paths in ``pending_files``; the actual
    uploads happen when ``process_pending`` is called by the polling loop.
    """

    PLAYLIST_SUFFIX = '.m3u8'
    SEGMENT_SUFFIX = '.ts'

    def __init__(self, uploader: R2Uploader, config: Config, stream_prefix: str = "live"):
        """Initialize the HLS chunk handler.

        Args:
            uploader: R2Uploader instance (its ``s3_client`` is used for uploads)
            config: Configuration object (``bucket_name`` is read on upload)
            stream_prefix: Prefix for R2 object names (e.g., "live"); leading
                and trailing slashes are ignored when the object key is built
        """
        super().__init__()
        self.uploader = uploader
        self.config = config
        self.stream_prefix = stream_prefix
        # Segments that were uploaded successfully and must not be sent again.
        self.uploaded_files: Set[str] = set()
        # Paths queued by file system events, waiting for process_pending().
        self.pending_files: Set[str] = set()
        # Segment size seen on the previous poll, used to detect when the
        # encoder has finished writing a segment.
        self._last_sizes: Dict[str, int] = {}

    def is_hls_file(self, path: str) -> bool:
        """Check if file is an HLS file (.m3u8 or .ts).

        Args:
            path: File path to check

        Returns:
            True if it's an HLS file
        """
        return Path(path).suffix.lower() in {self.PLAYLIST_SUFFIX, self.SEGMENT_SUFFIX}

    def _is_playlist(self, path) -> bool:
        """Return True if *path* has a .m3u8 suffix (case-insensitive)."""
        return Path(path).suffix.lower() == self.PLAYLIST_SUFFIX

    def _forget(self, path: str) -> None:
        """Drop *path* from the pending queue and the size-tracking table."""
        self.pending_files.discard(path)
        self._last_sizes.pop(path, None)

    def on_created(self, event):
        """Handle file creation event.

        Args:
            event: File system event
        """
        if not isinstance(event, FileCreatedEvent) or event.is_directory:
            return
        if self.is_hls_file(event.src_path):
            logger.info(f"New HLS chunk detected: {Path(event.src_path).name}")
            self.pending_files.add(event.src_path)

    def on_modified(self, event):
        """Handle file modification event.

        Args:
            event: File system event
        """
        if not isinstance(event, FileModifiedEvent) or event.is_directory:
            return
        if not self.is_hls_file(event.src_path):
            return
        # Always queue playlists (they update frequently); queue segments
        # only if they have not been uploaded yet.
        if self._is_playlist(event.src_path) or event.src_path not in self.uploaded_files:
            self.pending_files.add(event.src_path)

    def on_deleted(self, event):
        """Handle file deletion event.

        Clears all bookkeeping for the removed file so the tracking sets do
        not grow without bound during a long-running stream, and so that a
        segment name reused later is uploaded again.

        Args:
            event: File system event
        """
        if event.is_directory:
            return
        self.uploaded_files.discard(event.src_path)
        self._forget(event.src_path)

    def upload_chunk(self, file_path: Path) -> bool:
        """Upload HLS chunk to R2.

        The object key is ``{stream_prefix}/{stream_name}/{filename}``, where
        the stream name is derived from the file name:
        "my-stream-0.ts" -> "my-stream", "my-stream.m3u8" -> "my-stream".

        Args:
            file_path: Path to the chunk file

        Returns:
            True if the upload succeeded, False if it failed (the error is
            logged, not raised)
        """
        filename = file_path.name
        suffix = file_path.suffix.lower()
        is_playlist = suffix == self.PLAYLIST_SUFFIX

        if is_playlist:
            # stem strips only the final extension, unlike str.replace.
            stream_name = file_path.stem
        elif '-' in filename and suffix == self.SEGMENT_SUFFIX:
            # Split on last hyphen to drop the segment number.
            stream_name = filename.rsplit('-', 1)[0]
        else:
            stream_name = filename.split('.')[0]

        # Skip empty parts so a prefix such as "live/" or "" does not
        # produce "//" or a leading "/" in the object key.
        prefix = self.stream_prefix.strip('/')
        object_name = "/".join(part for part in (prefix, stream_name, filename) if part)

        if is_playlist:
            content_type = 'application/vnd.apple.mpegurl'
            cache_control = 'no-cache'
        else:
            content_type = 'video/MP2T'
            cache_control = 'public, max-age=31536000'

        try:
            data = file_path.read_bytes()
            self.uploader.s3_client.put_object(
                Bucket=self.config.bucket_name,
                Key=object_name,
                Body=data,
                ContentType=content_type,
                CacheControl=cache_control,
            )
        except Exception as e:
            # Boundary handler: a failed upload must not kill the watcher.
            # The caller decides whether to retry based on the return value.
            logger.error(f"Failed to upload {file_path.name}: {e}")
            return False

        logger.info(f"✓ Uploaded: {object_name} ({len(data)} bytes)")

        # Only track segments as uploaded (playlists need to be re-uploaded
        # on every change).
        if not is_playlist:
            self.uploaded_files.add(str(file_path))
        return True

    def _upload_or_requeue(self, path_str: str, file_path: Path) -> None:
        """Upload one pending file, putting it back in the queue on failure."""
        # Remove from the queue *before* uploading so that an event arriving
        # during the upload re-queues the path instead of being lost.
        self.pending_files.discard(path_str)
        if self.upload_chunk(file_path):
            self._last_sizes.pop(path_str, None)
        else:
            self.pending_files.add(path_str)

    def process_pending(self):
        """Process all pending files for upload.

        Playlists are uploaded immediately. Segments are uploaded only once
        their size is non-zero and unchanged since the previous call, so a
        segment the encoder is still writing is not uploaded truncated.
        Files whose upload fails stay queued and are retried on the next call.
        """
        for file_path_str in list(self.pending_files):
            file_path = Path(file_path_str)

            # Skip if already uploaded
            if file_path_str in self.uploaded_files:
                self._forget(file_path_str)
                continue

            if self._is_playlist(file_path):
                if file_path.exists():
                    self._upload_or_requeue(file_path_str, file_path)
                else:
                    self._forget(file_path_str)
                continue

            try:
                size = file_path.stat().st_size
            except FileNotFoundError:
                # Segment disappeared before it could be uploaded.
                self._forget(file_path_str)
                continue
            except OSError as e:
                logger.warning(f"Cannot stat {file_path.name}: {e}")
                continue

            previous_size = self._last_sizes.get(file_path_str)
            self._last_sizes[file_path_str] = size
            if size == 0 or size != previous_size:
                # Empty or still growing: check again on the next poll.
                continue

            self._upload_or_requeue(file_path_str, file_path)


class HLSWatcher:
    """Watch HLS directory and upload chunks to R2 in real-time."""

    def __init__(self, hls_dir: Path, config: Config, stream_prefix: str = "live"):
        """Initialize the HLS watcher.

        Creates the directory if it does not exist and schedules a recursive
        observer on it; the observer is not started until ``start`` is called.

        Args:
            hls_dir: Directory where HLS files are written
            config: Configuration object
            stream_prefix: Prefix for R2 object names
        """
        self.hls_dir = hls_dir
        self.config = config

        hls_dir.mkdir(parents=True, exist_ok=True)

        # Initialize uploader
        self.uploader = R2Uploader(config)

        # Create event handler and observer
        self.event_handler = HLSChunkHandler(self.uploader, config, stream_prefix)
        self.observer = Observer()
        self.observer.schedule(self.event_handler, str(hls_dir), recursive=True)

        logger.info(f"Initialized HLS watcher for: {hls_dir}")

    def upload_playlists(self):
        """Upload all m3u8 playlist files found directly in the directory."""
        for m3u8_file in self.hls_dir.glob("*.m3u8"):
            if m3u8_file.exists():
                self.event_handler.upload_chunk(m3u8_file)

    def start(self):
        """Start watching the HLS directory.

        Blocks until interrupted with Ctrl+C. The observer is stopped on the
        way out, whether the loop ends by interrupt or by an unexpected error.
        """
        logger.info("Starting HLS watcher...")
        separator = '=' * 60
        print(f"\n{separator}")
        print("HLS Real-Time Uploader - Active")
        print(separator)
        print(f"Watching: {self.hls_dir}")
        print(f"Bucket: {self.config.bucket_name}")
        print(f"Public Domain: {self.config.public_domain}")
        print(separator)
        print("Press Ctrl+C to stop\n")

        self.observer.start()
        playlist_upload_counter = 0

        try:
            while True:
                # Process pending uploads every second
                self.event_handler.process_pending()

                # Upload playlists every 2 seconds
                playlist_upload_counter += 1
                if playlist_upload_counter >= 2:
                    self.upload_playlists()
                    playlist_upload_counter = 0

                time.sleep(1)
        except KeyboardInterrupt:
            logger.info("Stopping HLS watcher...")
        finally:
            # Always shut the observer thread down, not only on Ctrl+C.
            self.stop()

    def stop(self):
        """Stop watching the directory."""
        self.observer.stop()
        self.observer.join()
        logger.info("HLS watcher stopped")


def main():
    """Main entry point for HLS uploader CLI.

    Expects the HLS directory as the first command-line argument. Exits with
    status 1 on a usage or configuration error and 0 when stopped by the user.
    """
    if len(sys.argv) < 2:
        print("Usage: python -m obs_uploader.hls_uploader <hls_directory>")
        print("\nExample:")
        print("  python -m obs_uploader.hls_uploader /home/user/obs-r2-uploader/streaming/hls")
        sys.exit(1)

    hls_dir = Path(sys.argv[1])

    # Load configuration
    try:
        config = Config()
    except Exception as e:
        logger.error(f"Failed to load configuration: {e}")
        sys.exit(1)

    # Start HLS watcher
    try:
        watcher = HLSWatcher(hls_dir, config)
        watcher.start()
    except KeyboardInterrupt:
        logger.info("Stopped by user")
        sys.exit(0)
    except Exception as e:
        logger.error(f"Error: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()