obs-r2-uploader/obs_uploader/file_watcher.py

287 lines
9.1 KiB
Python

#!/usr/bin/env python3
"""File watcher for automatic upload of new OBS recordings."""
import sys
import time
import logging
import threading
from pathlib import Path
from typing import Optional
from collections import defaultdict
from datetime import datetime
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler, FileCreatedEvent, FileModifiedEvent
from .config import Config
from .upload import R2Uploader
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger(__name__)
class VideoFileHandler(FileSystemEventHandler):
"""Handler for video file system events."""
# Supported video extensions
VIDEO_EXTENSIONS = {'.mp4', '.mkv', '.mov', '.avi', '.webm', '.flv', '.wmv'}
# Time to wait before considering a file "stable" (not being written to)
STABILITY_TIMEOUT = 5 # seconds
def __init__(self, uploader: R2Uploader, config: Config):
"""Initialize the video file handler.
Args:
uploader: R2Uploader instance
config: Configuration object
"""
super().__init__()
self.uploader = uploader
self.config = config
# Track file sizes to detect when writing is complete
self.file_sizes = defaultdict(lambda: (0, time.time()))
# Track files that have been uploaded
self.uploaded_files = set()
def check_pending_files(self):
"""Check all pending files for stability and upload if ready."""
for file_path_str in list(self.file_sizes.keys()):
if file_path_str in self.uploaded_files:
continue
file_path = Path(file_path_str)
if not file_path.exists():
# File was deleted, remove from tracking
del self.file_sizes[file_path_str]
continue
if self.is_file_stable(file_path):
logger.info(f"File appears stable, preparing to upload: {file_path.name}")
self.upload_file(file_path)
def is_video_file(self, path: str) -> bool:
"""Check if the file is a video file.
Args:
path: File path to check
Returns:
True if it's a video file
"""
return Path(path).suffix.lower() in self.VIDEO_EXTENSIONS
def is_file_stable(self, file_path: Path) -> bool:
"""Check if file has stopped being written to.
Args:
file_path: Path to the file
Returns:
True if file size hasn't changed for STABILITY_TIMEOUT seconds
"""
try:
current_size = file_path.stat().st_size
last_size, last_check = self.file_sizes[str(file_path)]
# If size changed, update and reset timer
if current_size != last_size:
self.file_sizes[str(file_path)] = (current_size, time.time())
return False
# If size hasn't changed and enough time has passed
if time.time() - last_check >= self.STABILITY_TIMEOUT:
return True
return False
except FileNotFoundError:
return False
except Exception as e:
logger.error(f"Error checking file stability: {e}")
return False
def on_created(self, event):
"""Handle file creation event.
Args:
event: File system event
"""
if isinstance(event, FileCreatedEvent) and not event.is_directory:
if self.is_video_file(event.src_path):
logger.info(f"New video file detected: {event.src_path}")
file_path = Path(event.src_path)
# Initialize size tracking
try:
current_size = file_path.stat().st_size
self.file_sizes[event.src_path] = (current_size, time.time())
except Exception:
self.file_sizes[event.src_path] = (0, time.time())
def on_modified(self, event):
"""Handle file modification event.
Args:
event: File system event
"""
if isinstance(event, FileModifiedEvent) and not event.is_directory:
if self.is_video_file(event.src_path):
file_path = Path(event.src_path)
# Skip if already uploaded
if str(file_path) in self.uploaded_files:
return
# Check if file is stable (done being written)
if self.is_file_stable(file_path):
logger.info(f"File appears stable, preparing to upload: {file_path.name}")
self.upload_file(file_path)
def upload_file(self, file_path: Path):
"""Upload a video file to R2.
Args:
file_path: Path to the video file
"""
try:
logger.info(f"Starting upload: {file_path.name}")
public_url = self.uploader.upload_file(file_path)
if public_url:
# Mark as uploaded
self.uploaded_files.add(str(file_path))
print(f"\n{'='*60}")
print(f"✓ Auto-upload successful!")
print(f"{'='*60}")
print(f"File: {file_path.name}")
print(f"URL: {public_url}")
print(f"{'='*60}\n")
# Auto-delete if configured
if self.config.auto_delete:
try:
file_path.unlink()
logger.info(f"Deleted local file: {file_path}")
except Exception as e:
logger.error(f"Failed to delete local file: {e}")
else:
logger.error(f"Upload failed: {file_path.name}")
except Exception as e:
logger.error(f"Error uploading file: {e}")
finally:
# Clean up tracking
if str(file_path) in self.file_sizes:
del self.file_sizes[str(file_path)]
class FileWatcher:
"""Watch a directory for new video files and auto-upload them."""
def __init__(self, watch_dir: Path, config: Config):
"""Initialize the file watcher.
Args:
watch_dir: Directory to watch for new videos
config: Configuration object
"""
self.watch_dir = watch_dir
self.config = config
if not watch_dir.exists():
raise ValueError(f"Watch directory does not exist: {watch_dir}")
if not watch_dir.is_dir():
raise ValueError(f"Watch path is not a directory: {watch_dir}")
# Initialize uploader
self.uploader = R2Uploader(config)
# Create event handler and observer
self.event_handler = VideoFileHandler(self.uploader, config)
self.observer = Observer()
self.observer.schedule(self.event_handler, str(watch_dir), recursive=False)
logger.info(f"Initialized file watcher for: {watch_dir}")
def start(self):
"""Start watching the directory."""
logger.info("Starting file watcher...")
print(f"\n{'='*60}")
print(f"OBS R2 Auto-Uploader - File Watcher Active")
print(f"{'='*60}")
print(f"Watching: {self.watch_dir}")
print(f"Bucket: {self.config.bucket_name}")
print(f"Public Domain: {self.config.public_domain}")
print(f"Auto-delete: {'Enabled' if self.config.auto_delete else 'Disabled'}")
print(f"{'='*60}")
print(f"Press Ctrl+C to stop\n")
self.observer.start()
try:
while True:
# Periodically check for stable files
self.event_handler.check_pending_files()
time.sleep(1)
except KeyboardInterrupt:
logger.info("Stopping file watcher...")
self.stop()
def stop(self):
"""Stop watching the directory."""
self.observer.stop()
self.observer.join()
logger.info("File watcher stopped")
def main():
"""Main entry point for file watcher CLI."""
# Load configuration first
try:
config = Config()
except Exception as e:
logger.error(f"Failed to load configuration: {e}")
sys.exit(1)
# Determine watch directory
if len(sys.argv) >= 2:
watch_dir = Path(sys.argv[1])
elif config.obs_recording_dir:
watch_dir = Path(config.obs_recording_dir)
else:
print("Usage: python -m obs_uploader.file_watcher <watch_directory>")
print("\nExample:")
print(" python -m obs_uploader.file_watcher /path/to/obs/recordings")
print("\nOr set OBS_RECORDING_DIR in .env and run without arguments:")
print(" python -m obs_uploader.file_watcher")
logger.error("\nNo watch directory specified.")
logger.error("Either provide a directory as an argument or set OBS_RECORDING_DIR in .env")
sys.exit(1)
# Start file watcher
try:
watcher = FileWatcher(watch_dir, config)
watcher.start()
except KeyboardInterrupt:
logger.info("Stopped by user")
sys.exit(0)
except Exception as e:
logger.error(f"Error: {e}")
sys.exit(1)
if __name__ == "__main__":
main()