rtube-online/nginx-rtmp/scripts/archive-worker.py

89 lines
2.2 KiB
Python

#!/usr/bin/env python3
"""
Archive worker - converts completed streams to MP4 and uploads to R2.
Ported from streaming-server archive-worker.py, using rtube-videos bucket.
"""
import os
import subprocess
import time
import threading
from flask import Flask
app = Flask(__name__)
RECORDINGS_DIR = "/recordings"
R2_BUCKET = os.environ.get("R2_BUCKET", "rtube-videos")
ARCHIVE_PREFIX = "streams"
def upload_to_r2(filepath):
"""Convert FLV to MP4 and upload to R2."""
filename = os.path.basename(filepath)
mp4_path = filepath.replace(".flv", ".mp4")
print(f"Converting {filename} to MP4...")
try:
subprocess.run(
[
"ffmpeg", "-i", filepath,
"-c", "copy",
"-movflags", "+faststart",
mp4_path,
],
check=True,
capture_output=True,
)
dest = f"r2:{R2_BUCKET}/{ARCHIVE_PREFIX}/"
print(f"Uploading {os.path.basename(mp4_path)} to {dest}...")
subprocess.run(
["rclone", "copy", mp4_path, dest],
check=True,
)
print(f"Uploaded: {mp4_path}")
# Cleanup local files
os.remove(filepath)
os.remove(mp4_path)
print("Cleaned up local files")
except Exception as e:
print(f"Error processing {filename}: {e}")
def process_recordings():
"""Process any pending FLV recordings."""
time.sleep(5) # Wait for file to finish writing
for f in os.listdir(RECORDINGS_DIR):
if f.endswith(".flv"):
filepath = os.path.join(RECORDINGS_DIR, f)
# Check if file is still being written
size1 = os.path.getsize(filepath)
time.sleep(2)
size2 = os.path.getsize(filepath)
if size1 == size2: # File is complete
upload_to_r2(filepath)
@app.route("/archive", methods=["POST", "GET"])
def archive():
"""Called when stream ends."""
print("Stream ended, processing recordings...")
thread = threading.Thread(target=process_recordings)
thread.start()
return "OK", 200
@app.route("/health")
def health():
return "OK", 200
if __name__ == "__main__":
app.run(host="0.0.0.0", port=8081)