261 lines
8.6 KiB
Python
261 lines
8.6 KiB
Python
#!/usr/bin/env python3
|
|
"""Real-time HLS chunk uploader to R2."""
|
|
|
|
import sys
|
|
import time
|
|
import logging
|
|
from pathlib import Path
|
|
from typing import Optional, Set
|
|
from collections import defaultdict
|
|
|
|
from watchdog.observers import Observer
|
|
from watchdog.events import FileSystemEventHandler, FileCreatedEvent, FileModifiedEvent
|
|
|
|
from .config import Config
|
|
from .upload import R2Uploader
|
|
|
|
|
|
logging.basicConfig(
|
|
level=logging.INFO,
|
|
format='%(asctime)s - %(levelname)s - %(message)s',
|
|
datefmt='%Y-%m-%d %H:%M:%S'
|
|
)
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class HLSChunkHandler(FileSystemEventHandler):
|
|
"""Handler for HLS chunk files (.ts and .m3u8)."""
|
|
|
|
def __init__(self, uploader: R2Uploader, config: Config, stream_prefix: str = "live"):
|
|
"""Initialize the HLS chunk handler.
|
|
|
|
Args:
|
|
uploader: R2Uploader instance
|
|
config: Configuration object
|
|
stream_prefix: Prefix for R2 object names (e.g., "live/")
|
|
"""
|
|
super().__init__()
|
|
self.uploader = uploader
|
|
self.config = config
|
|
self.stream_prefix = stream_prefix
|
|
self.uploaded_files: Set[str] = set()
|
|
self.pending_files: Set[str] = set()
|
|
|
|
def is_hls_file(self, path: str) -> bool:
|
|
"""Check if file is an HLS file (.m3u8 or .ts).
|
|
|
|
Args:
|
|
path: File path to check
|
|
|
|
Returns:
|
|
True if it's an HLS file
|
|
"""
|
|
return Path(path).suffix.lower() in {'.m3u8', '.ts'}
|
|
|
|
def on_created(self, event):
|
|
"""Handle file creation event.
|
|
|
|
Args:
|
|
event: File system event
|
|
"""
|
|
if isinstance(event, FileCreatedEvent) and not event.is_directory:
|
|
if self.is_hls_file(event.src_path):
|
|
logger.info(f"New HLS chunk detected: {Path(event.src_path).name}")
|
|
self.pending_files.add(event.src_path)
|
|
|
|
def on_modified(self, event):
|
|
"""Handle file modification event.
|
|
|
|
Args:
|
|
event: File system event
|
|
"""
|
|
if isinstance(event, FileModifiedEvent) and not event.is_directory:
|
|
if self.is_hls_file(event.src_path):
|
|
# Always add .m3u8 files (they update frequently)
|
|
# Only add .ts files if not already uploaded
|
|
if event.src_path.endswith('.m3u8') or event.src_path not in self.uploaded_files:
|
|
self.pending_files.add(event.src_path)
|
|
|
|
def upload_chunk(self, file_path: Path):
|
|
"""Upload HLS chunk to R2.
|
|
|
|
Args:
|
|
file_path: Path to the chunk file
|
|
"""
|
|
try:
|
|
# Build R2 object name: live/{stream_name}/{filename}
|
|
# Extract stream name from filename (e.g., my-stream-0.ts -> my-stream)
|
|
filename = file_path.name
|
|
|
|
# Stream name is the base name before the segment number
|
|
# e.g., "my-stream-0.ts" -> "my-stream"
|
|
# "my-stream.m3u8" -> "my-stream"
|
|
if filename.endswith('.m3u8'):
|
|
stream_name = filename.replace('.m3u8', '')
|
|
elif '-' in filename and filename.endswith('.ts'):
|
|
# Split on last hyphen to get stream name
|
|
stream_name = filename.rsplit('-', 1)[0]
|
|
else:
|
|
stream_name = filename.split('.')[0]
|
|
|
|
object_name = f"{self.stream_prefix}/{stream_name}/{filename}"
|
|
|
|
# Read file content
|
|
with open(file_path, 'rb') as f:
|
|
data = f.read()
|
|
|
|
# Determine content type
|
|
content_type = 'application/vnd.apple.mpegurl' if file_path.suffix == '.m3u8' else 'video/MP2T'
|
|
|
|
# Upload to R2
|
|
self.uploader.s3_client.put_object(
|
|
Bucket=self.config.bucket_name,
|
|
Key=object_name,
|
|
Body=data,
|
|
ContentType=content_type,
|
|
CacheControl='no-cache' if file_path.suffix == '.m3u8' else 'public, max-age=31536000'
|
|
)
|
|
|
|
logger.info(f"✓ Uploaded: {object_name} ({len(data)} bytes)")
|
|
# Only track .ts files as uploaded (m3u8 needs to be re-uploaded on every change)
|
|
if file_path.suffix != '.m3u8':
|
|
self.uploaded_files.add(str(file_path))
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to upload {file_path.name}: {e}")
|
|
|
|
def process_pending(self):
|
|
"""Process all pending files for upload."""
|
|
for file_path_str in list(self.pending_files):
|
|
file_path = Path(file_path_str)
|
|
|
|
# Skip if already uploaded
|
|
if file_path_str in self.uploaded_files:
|
|
self.pending_files.discard(file_path_str)
|
|
continue
|
|
|
|
# Skip if file doesn't exist
|
|
if not file_path.exists():
|
|
self.pending_files.discard(file_path_str)
|
|
continue
|
|
|
|
# For .m3u8 files, upload immediately (playlist updates frequently)
|
|
# For .ts files, wait a moment to ensure they're fully written
|
|
if file_path.suffix == '.m3u8':
|
|
self.upload_chunk(file_path)
|
|
self.pending_files.discard(file_path_str)
|
|
else:
|
|
# Check if file size is stable
|
|
try:
|
|
size = file_path.stat().st_size
|
|
if size > 0: # Only upload if file has content
|
|
self.upload_chunk(file_path)
|
|
self.pending_files.discard(file_path_str)
|
|
except Exception:
|
|
pass
|
|
|
|
|
|
class HLSWatcher:
|
|
"""Watch HLS directory and upload chunks to R2 in real-time."""
|
|
|
|
def __init__(self, hls_dir: Path, config: Config, stream_prefix: str = "live"):
|
|
"""Initialize the HLS watcher.
|
|
|
|
Args:
|
|
hls_dir: Directory where HLS files are written
|
|
config: Configuration object
|
|
stream_prefix: Prefix for R2 object names
|
|
"""
|
|
self.hls_dir = hls_dir
|
|
self.config = config
|
|
|
|
if not hls_dir.exists():
|
|
hls_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
# Initialize uploader
|
|
self.uploader = R2Uploader(config)
|
|
|
|
# Create event handler and observer
|
|
self.event_handler = HLSChunkHandler(self.uploader, config, stream_prefix)
|
|
self.observer = Observer()
|
|
self.observer.schedule(self.event_handler, str(hls_dir), recursive=True)
|
|
|
|
logger.info(f"Initialized HLS watcher for: {hls_dir}")
|
|
|
|
def upload_playlists(self):
|
|
"""Upload all m3u8 playlist files in the directory."""
|
|
for m3u8_file in self.hls_dir.glob("*.m3u8"):
|
|
if m3u8_file.exists():
|
|
self.event_handler.upload_chunk(m3u8_file)
|
|
|
|
def start(self):
|
|
"""Start watching the HLS directory."""
|
|
logger.info("Starting HLS watcher...")
|
|
print(f"\n{'='*60}")
|
|
print(f"HLS Real-Time Uploader - Active")
|
|
print(f"{'='*60}")
|
|
print(f"Watching: {self.hls_dir}")
|
|
print(f"Bucket: {self.config.bucket_name}")
|
|
print(f"Public Domain: {self.config.public_domain}")
|
|
print(f"{'='*60}")
|
|
print(f"Press Ctrl+C to stop\n")
|
|
|
|
self.observer.start()
|
|
|
|
playlist_upload_counter = 0
|
|
|
|
try:
|
|
while True:
|
|
# Process pending uploads every second
|
|
self.event_handler.process_pending()
|
|
|
|
# Upload playlists every 2 seconds
|
|
playlist_upload_counter += 1
|
|
if playlist_upload_counter >= 2:
|
|
self.upload_playlists()
|
|
playlist_upload_counter = 0
|
|
|
|
time.sleep(1)
|
|
except KeyboardInterrupt:
|
|
logger.info("Stopping HLS watcher...")
|
|
self.stop()
|
|
|
|
def stop(self):
|
|
"""Stop watching the directory."""
|
|
self.observer.stop()
|
|
self.observer.join()
|
|
logger.info("HLS watcher stopped")
|
|
|
|
|
|
def main():
|
|
"""Main entry point for HLS uploader CLI."""
|
|
if len(sys.argv) < 2:
|
|
print("Usage: python -m obs_uploader.hls_uploader <hls_directory>")
|
|
print("\nExample:")
|
|
print(" python -m obs_uploader.hls_uploader /home/user/obs-r2-uploader/streaming/hls")
|
|
sys.exit(1)
|
|
|
|
hls_dir = Path(sys.argv[1])
|
|
|
|
# Load configuration
|
|
try:
|
|
config = Config()
|
|
except Exception as e:
|
|
logger.error(f"Failed to load configuration: {e}")
|
|
sys.exit(1)
|
|
|
|
# Start HLS watcher
|
|
try:
|
|
watcher = HLSWatcher(hls_dir, config)
|
|
watcher.start()
|
|
except KeyboardInterrupt:
|
|
logger.info("Stopped by user")
|
|
sys.exit(0)
|
|
except Exception as e:
|
|
logger.error(f"Error: {e}")
|
|
sys.exit(1)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|