diff --git a/app.py b/app.py index 0ae7af3..bd65730 100644 --- a/app.py +++ b/app.py @@ -2,20 +2,28 @@ import os import subprocess import uuid import shutil +import time +import signal +import threading from pathlib import Path from flask import Flask, render_template, request, jsonify, send_from_directory app = Flask(__name__) app.config["UPLOAD_FOLDER"] = "/data/uploads" app.config["OUTPUT_FOLDER"] = "/data/output" +app.config["LIVE_SPLIT_FOLDER"] = "/data/live-split" app.config["MAX_CONTENT_LENGTH"] = 10 * 1024 * 1024 * 1024 # 10GB os.makedirs(app.config["UPLOAD_FOLDER"], exist_ok=True) os.makedirs(app.config["OUTPUT_FOLDER"], exist_ok=True) +os.makedirs(app.config["LIVE_SPLIT_FOLDER"], exist_ok=True) -# Track job status in memory +# Track job status in memory (file-based splits) jobs = {} +# Track live-split sessions +live_sessions = {} + def get_video_info(filepath): """Get video resolution and duration using ffprobe.""" @@ -183,5 +191,261 @@ def cleanup(job_id): return jsonify({"status": "cleaned"}) +# ── Live-Split Session Management ── + +STALE_SESSION_TIMEOUT = 300 # 5 min after all processes die + + +def start_live_split_session(stream_url, num_views, h_fov, v_fov, overlap, output_res): + """Spawn N FFmpeg processes, each reading RTMP and outputting HLS for one view.""" + session_id = "ls-" + str(uuid.uuid4())[:8] + session_dir = os.path.join(app.config["LIVE_SPLIT_FOLDER"], session_id) + + effective_fov = h_fov + overlap + step = 360.0 / num_views + views = [] + processes = [] + + for i in range(num_views): + yaw = i * step + label = get_direction_label(yaw, num_views) + view_dir = os.path.join(session_dir, f"view_{i}") + os.makedirs(view_dir, exist_ok=True) + + vf = f"v360=equirect:flat:h_fov={effective_fov}:v_fov={v_fov}:yaw={yaw}" + if output_res: + vf += f",scale={output_res}" + + cmd = [ + "ffmpeg", "-re", "-i", stream_url, + "-vf", vf, + "-c:v", "libx264", "-preset", "veryfast", "-tune", "zerolatency", + "-g", "30", "-sc_threshold", "0", + "-c:a", "aac", "-b:a", "128k", + "-f", "hls", "-hls_time", "2", "-hls_list_size", "10", + "-hls_flags", "delete_segments+append_list", + os.path.join(view_dir, "stream.m3u8"), + ] + + proc = subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE) + processes.append(proc) + + views.append({ + "index": i, + "label": label, + "yaw": yaw, + "hls_path": f"view_{i}/stream.m3u8", + }) + + live_sessions[session_id] = { + "session_id": session_id, + "status": "running", + "started_at": time.time(), + "stream_url": stream_url, + "num_views": num_views, + "views": views, + "processes": processes, + "stopped_at": None, + "error": None, + } + + return session_id + + +def stop_live_session(session_id): + """SIGTERM all FFmpeg processes, wait, then SIGKILL survivors.""" + session = live_sessions.get(session_id) + if not session: + return False + + for proc in session["processes"]: + if proc.poll() is None: + try: + proc.send_signal(signal.SIGTERM) + except OSError: + pass + + # Wait up to 5s for graceful shutdown + deadline = time.time() + 5 + for proc in session["processes"]: + remaining = max(0, deadline - time.time()) + try: + proc.wait(timeout=remaining) + except subprocess.TimeoutExpired: + try: + proc.kill() + except OSError: + pass + + session["status"] = "stopped" + session["stopped_at"] = time.time() + return True + + +def check_session_health(session_id): + """Check if FFmpeg processes are still alive.""" + session = live_sessions.get(session_id) + if not session or session["status"] != "running": + return + + all_dead = True + for proc in session["processes"]: + if proc.poll() is None: + all_dead = False + + if all_dead: + # Try to read stderr from first dead process for error info + err_msg = "" + for proc in session["processes"]: + if proc.stderr: + try: + stderr_bytes = proc.stderr.read() + if stderr_bytes: + err_msg = stderr_bytes.decode("utf-8", errors="replace")[-500:] + break + except Exception: + pass + session["status"] = "error" + session["error"] = err_msg or "All FFmpeg processes exited unexpectedly" + session["stopped_at"] = time.time() + + +def watchdog_loop(): + """Background thread: check session health, clean stale dirs.""" + while True: + time.sleep(30) + try: + for sid in list(live_sessions.keys()): + session = live_sessions.get(sid) + if not session: + continue + + if session["status"] == "running": + check_session_health(sid) + + # Clean up stale sessions (stopped/error for > 5 min) + if session["status"] in ("stopped", "error") and session.get("stopped_at"): + if time.time() - session["stopped_at"] > STALE_SESSION_TIMEOUT: + session_dir = os.path.join(app.config["LIVE_SPLIT_FOLDER"], sid) + if os.path.exists(session_dir): + shutil.rmtree(session_dir, ignore_errors=True) + del live_sessions[sid] + except Exception as e: + print(f"[watchdog] error: {e}") + + +# Start watchdog thread +_watchdog = threading.Thread(target=watchdog_loop, daemon=True) +_watchdog.start() + + +# ── Live-Split API Endpoints ── + +@app.route("/live-split", methods=["POST"]) +def live_split_start(): + data = request.get_json() + if not data or not data.get("stream_url"): + return jsonify({"error": "stream_url required"}), 400 + + stream_url = data["stream_url"] + num_views = int(data.get("num_views", 4)) + h_fov = float(data.get("h_fov", 360.0 / num_views)) + v_fov = float(data.get("v_fov", 90)) + overlap = float(data.get("overlap", 0)) + output_res = data.get("output_res", "").strip() + + try: + session_id = start_live_split_session( + stream_url, num_views, h_fov, v_fov, overlap, output_res + ) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + session = live_sessions[session_id] + hls_base = f"/live-split/hls/{session_id}" + return jsonify({ + "session_id": session_id, + "status": session["status"], + "hls_base_url": hls_base, + "views": [ + { + "index": v["index"], + "label": v["label"], + "yaw": v["yaw"], + "hls_url": f"{hls_base}/{v['hls_path']}", + } + for v in session["views"] + ], + }) + + +@app.route("/live-split/status/") +def live_split_status(session_id): + session = live_sessions.get(session_id) + if not session: + return jsonify({"error": "Session not found"}), 404 + + # Check health on each status call + if session["status"] == "running": + check_session_health(session_id) + + uptime = time.time() - session["started_at"] + views_status = [] + for i, v in enumerate(session["views"]): + proc = session["processes"][i] + views_status.append({ + "index": v["index"], + "label": v["label"], + "yaw": v["yaw"], + "hls_url": f"/live-split/hls/{session_id}/{v['hls_path']}", + "alive": proc.poll() is None, + }) + + return jsonify({ + "session_id": session_id, + "status": session["status"], + "uptime_seconds": round(uptime, 1), + "views": views_status, + "error": session.get("error"), + }) + + +@app.route("/live-split/stop/", methods=["POST"]) +def live_split_stop(session_id): + session = live_sessions.get(session_id) + if not session: + return jsonify({"error": "Session not found"}), 404 + + if session["status"] == "running": + stop_live_session(session_id) + + duration = (session.get("stopped_at") or time.time()) - session["started_at"] + return jsonify({ + "session_id": session_id, + "status": "stopped", + "duration_seconds": round(duration, 1), + }) + + +@app.route("/live-split/hls//") +def live_split_hls(session_id, subpath): + session_dir = os.path.join(app.config["LIVE_SPLIT_FOLDER"], session_id) + if not os.path.isdir(session_dir): + return jsonify({"error": "Session not found"}), 404 + + # Determine content type + content_type = "application/vnd.apple.mpegurl" + if subpath.endswith(".ts"): + content_type = "video/mp2t" + + response = send_from_directory(session_dir, subpath) + response.headers["Access-Control-Allow-Origin"] = "*" + response.headers["Access-Control-Allow-Methods"] = "GET, OPTIONS" + response.headers["Access-Control-Allow-Headers"] = "Range" + response.headers["Cache-Control"] = "no-cache, no-store" + response.headers["Content-Type"] = content_type + return response + + if __name__ == "__main__": app.run(host="0.0.0.0", port=5000, debug=False)