feat: add live 360° stream splitting pipeline

Spawn N FFmpeg subprocesses per session, each reading RTMP and outputting
HLS per view. Includes session watchdog, status polling, graceful stop,
and HLS segment serving with CORS.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Jeff Emmett 2026-03-16 14:28:10 -07:00
parent 2c39167ce9
commit 1b4a32da55
1 changed files with 265 additions and 1 deletions

266
app.py
View File

@ -2,20 +2,28 @@ import os
import subprocess import subprocess
import uuid import uuid
import shutil import shutil
import time
import signal
import threading
from pathlib import Path from pathlib import Path
from flask import Flask, render_template, request, jsonify, send_from_directory from flask import Flask, render_template, request, jsonify, send_from_directory
app = Flask(__name__) app = Flask(__name__)
app.config["UPLOAD_FOLDER"] = "/data/uploads" app.config["UPLOAD_FOLDER"] = "/data/uploads"
app.config["OUTPUT_FOLDER"] = "/data/output" app.config["OUTPUT_FOLDER"] = "/data/output"
app.config["LIVE_SPLIT_FOLDER"] = "/data/live-split"
app.config["MAX_CONTENT_LENGTH"] = 10 * 1024 * 1024 * 1024 # 10GB app.config["MAX_CONTENT_LENGTH"] = 10 * 1024 * 1024 * 1024 # 10GB
os.makedirs(app.config["UPLOAD_FOLDER"], exist_ok=True) os.makedirs(app.config["UPLOAD_FOLDER"], exist_ok=True)
os.makedirs(app.config["OUTPUT_FOLDER"], exist_ok=True) os.makedirs(app.config["OUTPUT_FOLDER"], exist_ok=True)
os.makedirs(app.config["LIVE_SPLIT_FOLDER"], exist_ok=True)
# Track job status in memory # Track job status in memory (file-based splits)
jobs = {} jobs = {}
# Track live-split sessions
live_sessions = {}
def get_video_info(filepath): def get_video_info(filepath):
"""Get video resolution and duration using ffprobe.""" """Get video resolution and duration using ffprobe."""
@ -183,5 +191,261 @@ def cleanup(job_id):
return jsonify({"status": "cleaned"}) return jsonify({"status": "cleaned"})
# ── Live-Split Session Management ──
STALE_SESSION_TIMEOUT = 300 # 5 min after all processes die
def start_live_split_session(stream_url, num_views, h_fov, v_fov, overlap, output_res):
"""Spawn N FFmpeg processes, each reading RTMP and outputting HLS for one view."""
session_id = "ls-" + str(uuid.uuid4())[:8]
session_dir = os.path.join(app.config["LIVE_SPLIT_FOLDER"], session_id)
effective_fov = h_fov + overlap
step = 360.0 / num_views
views = []
processes = []
for i in range(num_views):
yaw = i * step
label = get_direction_label(yaw, num_views)
view_dir = os.path.join(session_dir, f"view_{i}")
os.makedirs(view_dir, exist_ok=True)
vf = f"v360=equirect:flat:h_fov={effective_fov}:v_fov={v_fov}:yaw={yaw}"
if output_res:
vf += f",scale={output_res}"
cmd = [
"ffmpeg", "-re", "-i", stream_url,
"-vf", vf,
"-c:v", "libx264", "-preset", "veryfast", "-tune", "zerolatency",
"-g", "30", "-sc_threshold", "0",
"-c:a", "aac", "-b:a", "128k",
"-f", "hls", "-hls_time", "2", "-hls_list_size", "10",
"-hls_flags", "delete_segments+append_list",
os.path.join(view_dir, "stream.m3u8"),
]
proc = subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE)
processes.append(proc)
views.append({
"index": i,
"label": label,
"yaw": yaw,
"hls_path": f"view_{i}/stream.m3u8",
})
live_sessions[session_id] = {
"session_id": session_id,
"status": "running",
"started_at": time.time(),
"stream_url": stream_url,
"num_views": num_views,
"views": views,
"processes": processes,
"stopped_at": None,
"error": None,
}
return session_id
def stop_live_session(session_id):
"""SIGTERM all FFmpeg processes, wait, then SIGKILL survivors."""
session = live_sessions.get(session_id)
if not session:
return False
for proc in session["processes"]:
if proc.poll() is None:
try:
proc.send_signal(signal.SIGTERM)
except OSError:
pass
# Wait up to 5s for graceful shutdown
deadline = time.time() + 5
for proc in session["processes"]:
remaining = max(0, deadline - time.time())
try:
proc.wait(timeout=remaining)
except subprocess.TimeoutExpired:
try:
proc.kill()
except OSError:
pass
session["status"] = "stopped"
session["stopped_at"] = time.time()
return True
def check_session_health(session_id):
"""Check if FFmpeg processes are still alive."""
session = live_sessions.get(session_id)
if not session or session["status"] != "running":
return
all_dead = True
for proc in session["processes"]:
if proc.poll() is None:
all_dead = False
if all_dead:
# Try to read stderr from first dead process for error info
err_msg = ""
for proc in session["processes"]:
if proc.stderr:
try:
stderr_bytes = proc.stderr.read()
if stderr_bytes:
err_msg = stderr_bytes.decode("utf-8", errors="replace")[-500:]
break
except Exception:
pass
session["status"] = "error"
session["error"] = err_msg or "All FFmpeg processes exited unexpectedly"
session["stopped_at"] = time.time()
def watchdog_loop():
"""Background thread: check session health, clean stale dirs."""
while True:
time.sleep(30)
try:
for sid in list(live_sessions.keys()):
session = live_sessions.get(sid)
if not session:
continue
if session["status"] == "running":
check_session_health(sid)
# Clean up stale sessions (stopped/error for > 5 min)
if session["status"] in ("stopped", "error") and session.get("stopped_at"):
if time.time() - session["stopped_at"] > STALE_SESSION_TIMEOUT:
session_dir = os.path.join(app.config["LIVE_SPLIT_FOLDER"], sid)
if os.path.exists(session_dir):
shutil.rmtree(session_dir, ignore_errors=True)
del live_sessions[sid]
except Exception as e:
print(f"[watchdog] error: {e}")
# Start watchdog thread
_watchdog = threading.Thread(target=watchdog_loop, daemon=True)
_watchdog.start()
# ── Live-Split API Endpoints ──
@app.route("/live-split", methods=["POST"])
def live_split_start():
data = request.get_json()
if not data or not data.get("stream_url"):
return jsonify({"error": "stream_url required"}), 400
stream_url = data["stream_url"]
num_views = int(data.get("num_views", 4))
h_fov = float(data.get("h_fov", 360.0 / num_views))
v_fov = float(data.get("v_fov", 90))
overlap = float(data.get("overlap", 0))
output_res = data.get("output_res", "").strip()
try:
session_id = start_live_split_session(
stream_url, num_views, h_fov, v_fov, overlap, output_res
)
except Exception as e:
return jsonify({"error": str(e)}), 500
session = live_sessions[session_id]
hls_base = f"/live-split/hls/{session_id}"
return jsonify({
"session_id": session_id,
"status": session["status"],
"hls_base_url": hls_base,
"views": [
{
"index": v["index"],
"label": v["label"],
"yaw": v["yaw"],
"hls_url": f"{hls_base}/{v['hls_path']}",
}
for v in session["views"]
],
})
@app.route("/live-split/status/<session_id>")
def live_split_status(session_id):
session = live_sessions.get(session_id)
if not session:
return jsonify({"error": "Session not found"}), 404
# Check health on each status call
if session["status"] == "running":
check_session_health(session_id)
uptime = time.time() - session["started_at"]
views_status = []
for i, v in enumerate(session["views"]):
proc = session["processes"][i]
views_status.append({
"index": v["index"],
"label": v["label"],
"yaw": v["yaw"],
"hls_url": f"/live-split/hls/{session_id}/{v['hls_path']}",
"alive": proc.poll() is None,
})
return jsonify({
"session_id": session_id,
"status": session["status"],
"uptime_seconds": round(uptime, 1),
"views": views_status,
"error": session.get("error"),
})
@app.route("/live-split/stop/<session_id>", methods=["POST"])
def live_split_stop(session_id):
session = live_sessions.get(session_id)
if not session:
return jsonify({"error": "Session not found"}), 404
if session["status"] == "running":
stop_live_session(session_id)
duration = (session.get("stopped_at") or time.time()) - session["started_at"]
return jsonify({
"session_id": session_id,
"status": "stopped",
"duration_seconds": round(duration, 1),
})
@app.route("/live-split/hls/<session_id>/<path:subpath>")
def live_split_hls(session_id, subpath):
session_dir = os.path.join(app.config["LIVE_SPLIT_FOLDER"], session_id)
if not os.path.isdir(session_dir):
return jsonify({"error": "Session not found"}), 404
# Determine content type
content_type = "application/vnd.apple.mpegurl"
if subpath.endswith(".ts"):
content_type = "video/mp2t"
response = send_from_directory(session_dir, subpath)
response.headers["Access-Control-Allow-Origin"] = "*"
response.headers["Access-Control-Allow-Methods"] = "GET, OPTIONS"
response.headers["Access-Control-Allow-Headers"] = "Range"
response.headers["Cache-Control"] = "no-cache, no-store"
response.headers["Content-Type"] = content_type
return response
if __name__ == "__main__": if __name__ == "__main__":
app.run(host="0.0.0.0", port=5000, debug=False) app.run(host="0.0.0.0", port=5000, debug=False)