import os import subprocess import uuid import shutil import time import signal import threading from pathlib import Path from flask import Flask, render_template, request, jsonify, send_from_directory app = Flask(__name__) app.config["UPLOAD_FOLDER"] = "/data/uploads" app.config["OUTPUT_FOLDER"] = "/data/output" app.config["LIVE_SPLIT_FOLDER"] = "/data/live-split" app.config["MAX_CONTENT_LENGTH"] = 10 * 1024 * 1024 * 1024 # 10GB os.makedirs(app.config["UPLOAD_FOLDER"], exist_ok=True) os.makedirs(app.config["OUTPUT_FOLDER"], exist_ok=True) os.makedirs(app.config["LIVE_SPLIT_FOLDER"], exist_ok=True) # Track job status in memory (file-based splits) jobs = {} # Track live-split sessions live_sessions = {} def get_video_info(filepath): """Get video resolution and duration using ffprobe.""" result = subprocess.run( [ "ffprobe", "-v", "quiet", "-print_format", "json", "-show_format", "-show_streams", filepath ], capture_output=True, text=True ) import json info = json.loads(result.stdout) video_stream = next( (s for s in info.get("streams", []) if s["codec_type"] == "video"), {} ) return { "width": int(video_stream.get("width", 0)), "height": int(video_stream.get("height", 0)), "duration": float(info.get("format", {}).get("duration", 0)), "codec": video_stream.get("codec_name", "unknown"), } def split_video(job_id, input_path, num_views, h_fov, v_fov, output_res, overlap): """Run the actual FFmpeg splitting in a subprocess-friendly way.""" try: jobs[job_id]["status"] = "processing" job_output = os.path.join(app.config["OUTPUT_FOLDER"], job_id) os.makedirs(job_output, exist_ok=True) effective_fov = h_fov + overlap step = 360 / num_views input_info = get_video_info(input_path) jobs[job_id]["input_info"] = input_info processes = [] output_files = [] for i in range(num_views): yaw = i * step label = get_direction_label(yaw, num_views) output_file = os.path.join(job_output, f"{label}_{int(yaw)}deg.mp4") output_files.append(output_file) vf = f"v360=equirect:flat:h_fov={effective_fov}:v_fov={v_fov}:yaw={yaw}" if output_res: vf += f",scale={output_res}" cmd = [ "ffmpeg", "-y", "-i", input_path, "-vf", vf, "-c:v", "libx264", "-crf", "18", "-preset", "medium", "-c:a", "aac", "-b:a", "192k", output_file ] processes.append((label, yaw, cmd, output_file)) total = len(processes) for idx, (label, yaw, cmd, output_file) in enumerate(processes): jobs[job_id]["current_view"] = f"{label} ({int(yaw)}°)" jobs[job_id]["progress"] = int((idx / total) * 100) result = subprocess.run(cmd, capture_output=True, text=True) if result.returncode != 0: jobs[job_id]["status"] = "error" jobs[job_id]["error"] = f"FFmpeg error on {label}: {result.stderr[-500:]}" return jobs[job_id]["status"] = "complete" jobs[job_id]["progress"] = 100 jobs[job_id]["output_files"] = [ os.path.basename(f) for f in output_files ] except Exception as e: jobs[job_id]["status"] = "error" jobs[job_id]["error"] = str(e) def get_direction_label(yaw, num_views): """Get a human-readable label for a yaw angle.""" if num_views == 4: labels = {0: "front", 90: "right", 180: "back", 270: "left"} return labels.get(int(yaw), f"view_{int(yaw)}") elif num_views == 3: labels = {0: "front", 120: "right", 240: "left"} return labels.get(int(yaw), f"view_{int(yaw)}") else: return f"view_{int(yaw)}" @app.route("/") def index(): return render_template("index.html") @app.route("/upload", methods=["POST"]) def upload(): if "video" not in request.files: return jsonify({"error": "No video file provided"}), 400 file = request.files["video"] if file.filename == "": return jsonify({"error": "No file selected"}), 400 job_id = str(uuid.uuid4())[:8] ext = Path(file.filename).suffix or ".mp4" input_path = os.path.join(app.config["UPLOAD_FOLDER"], f"{job_id}{ext}") file.save(input_path) num_views = int(request.form.get("num_views", 4)) h_fov = float(request.form.get("h_fov", 360 / num_views)) v_fov = float(request.form.get("v_fov", 90)) overlap = float(request.form.get("overlap", 0)) output_res = request.form.get("output_res", "").strip() jobs[job_id] = { "status": "queued", "progress": 0, "input_file": file.filename, "num_views": num_views, "current_view": "", "output_files": [], } # Run in a background thread import threading t = threading.Thread( target=split_video, args=(job_id, input_path, num_views, h_fov, v_fov, output_res, overlap) ) t.daemon = True t.start() return jsonify({"job_id": job_id}) @app.route("/status/") def status(job_id): if job_id not in jobs: return jsonify({"error": "Job not found"}), 404 return jsonify(jobs[job_id]) @app.route("/download//") def download(job_id, filename): job_dir = os.path.join(app.config["OUTPUT_FOLDER"], job_id) return send_from_directory(job_dir, filename, as_attachment=True) @app.route("/cleanup/", methods=["POST"]) def cleanup(job_id): """Delete uploaded and output files for a job.""" job_dir = os.path.join(app.config["OUTPUT_FOLDER"], job_id) if os.path.exists(job_dir): shutil.rmtree(job_dir) # Remove upload file for f in Path(app.config["UPLOAD_FOLDER"]).glob(f"{job_id}.*"): f.unlink() if job_id in jobs: del jobs[job_id] return jsonify({"status": "cleaned"}) # ── Live-Split Session Management ── STALE_SESSION_TIMEOUT = 300 # 5 min after all processes die def start_live_split_session(stream_url, num_views, h_fov, v_fov, overlap, output_res): """Spawn N FFmpeg processes, each reading RTMP and outputting HLS for one view.""" session_id = "ls-" + str(uuid.uuid4())[:8] session_dir = os.path.join(app.config["LIVE_SPLIT_FOLDER"], session_id) effective_fov = h_fov + overlap step = 360.0 / num_views views = [] processes = [] for i in range(num_views): yaw = i * step label = get_direction_label(yaw, num_views) view_dir = os.path.join(session_dir, f"view_{i}") os.makedirs(view_dir, exist_ok=True) vf = f"v360=equirect:flat:h_fov={effective_fov}:v_fov={v_fov}:yaw={yaw}" if output_res: vf += f",scale={output_res}" cmd = [ "ffmpeg", "-re", "-i", stream_url, "-vf", vf, "-c:v", "libx264", "-preset", "veryfast", "-tune", "zerolatency", "-g", "30", "-sc_threshold", "0", "-c:a", "aac", "-b:a", "128k", "-f", "hls", "-hls_time", "2", "-hls_list_size", "10", "-hls_flags", "delete_segments+append_list", os.path.join(view_dir, "stream.m3u8"), ] proc = subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE) processes.append(proc) views.append({ "index": i, "label": label, "yaw": yaw, "hls_path": f"view_{i}/stream.m3u8", }) live_sessions[session_id] = { "session_id": session_id, "status": "running", "started_at": time.time(), "stream_url": stream_url, "num_views": num_views, "views": views, "processes": processes, "stopped_at": None, "error": None, } return session_id def stop_live_session(session_id): """SIGTERM all FFmpeg processes, wait, then SIGKILL survivors.""" session = live_sessions.get(session_id) if not session: return False for proc in session["processes"]: if proc.poll() is None: try: proc.send_signal(signal.SIGTERM) except OSError: pass # Wait up to 5s for graceful shutdown deadline = time.time() + 5 for proc in session["processes"]: remaining = max(0, deadline - time.time()) try: proc.wait(timeout=remaining) except subprocess.TimeoutExpired: try: proc.kill() except OSError: pass session["status"] = "stopped" session["stopped_at"] = time.time() return True def check_session_health(session_id): """Check if FFmpeg processes are still alive.""" session = live_sessions.get(session_id) if not session or session["status"] != "running": return all_dead = True for proc in session["processes"]: if proc.poll() is None: all_dead = False if all_dead: # Try to read stderr from first dead process for error info err_msg = "" for proc in session["processes"]: if proc.stderr: try: stderr_bytes = proc.stderr.read() if stderr_bytes: err_msg = stderr_bytes.decode("utf-8", errors="replace")[-500:] break except Exception: pass session["status"] = "error" session["error"] = err_msg or "All FFmpeg processes exited unexpectedly" session["stopped_at"] = time.time() def watchdog_loop(): """Background thread: check session health, clean stale dirs.""" while True: time.sleep(30) try: for sid in list(live_sessions.keys()): session = live_sessions.get(sid) if not session: continue if session["status"] == "running": check_session_health(sid) # Clean up stale sessions (stopped/error for > 5 min) if session["status"] in ("stopped", "error") and session.get("stopped_at"): if time.time() - session["stopped_at"] > STALE_SESSION_TIMEOUT: session_dir = os.path.join(app.config["LIVE_SPLIT_FOLDER"], sid) if os.path.exists(session_dir): shutil.rmtree(session_dir, ignore_errors=True) del live_sessions[sid] except Exception as e: print(f"[watchdog] error: {e}") # Start watchdog thread _watchdog = threading.Thread(target=watchdog_loop, daemon=True) _watchdog.start() # ── Live-Split API Endpoints ── @app.route("/live-split", methods=["POST"]) def live_split_start(): data = request.get_json() if not data or not data.get("stream_url"): return jsonify({"error": "stream_url required"}), 400 stream_url = data["stream_url"] num_views = int(data.get("num_views", 4)) h_fov = float(data.get("h_fov", 360.0 / num_views)) v_fov = float(data.get("v_fov", 90)) overlap = float(data.get("overlap", 0)) output_res = data.get("output_res", "").strip() try: session_id = start_live_split_session( stream_url, num_views, h_fov, v_fov, overlap, output_res ) except Exception as e: return jsonify({"error": str(e)}), 500 session = live_sessions[session_id] hls_base = f"/live-split/hls/{session_id}" return jsonify({ "session_id": session_id, "status": session["status"], "hls_base_url": hls_base, "views": [ { "index": v["index"], "label": v["label"], "yaw": v["yaw"], "hls_url": f"{hls_base}/{v['hls_path']}", } for v in session["views"] ], }) @app.route("/live-split/status/") def live_split_status(session_id): session = live_sessions.get(session_id) if not session: return jsonify({"error": "Session not found"}), 404 # Check health on each status call if session["status"] == "running": check_session_health(session_id) uptime = time.time() - session["started_at"] views_status = [] for i, v in enumerate(session["views"]): proc = session["processes"][i] views_status.append({ "index": v["index"], "label": v["label"], "yaw": v["yaw"], "hls_url": f"/live-split/hls/{session_id}/{v['hls_path']}", "alive": proc.poll() is None, }) return jsonify({ "session_id": session_id, "status": session["status"], "uptime_seconds": round(uptime, 1), "views": views_status, "error": session.get("error"), }) @app.route("/live-split/stop/", methods=["POST"]) def live_split_stop(session_id): session = live_sessions.get(session_id) if not session: return jsonify({"error": "Session not found"}), 404 if session["status"] == "running": stop_live_session(session_id) duration = (session.get("stopped_at") or time.time()) - session["started_at"] return jsonify({ "session_id": session_id, "status": "stopped", "duration_seconds": round(duration, 1), }) @app.route("/live-split/hls//") def live_split_hls(session_id, subpath): session_dir = os.path.join(app.config["LIVE_SPLIT_FOLDER"], session_id) if not os.path.isdir(session_dir): return jsonify({"error": "Session not found"}), 404 # Determine content type content_type = "application/vnd.apple.mpegurl" if subpath.endswith(".ts"): content_type = "video/mp2t" response = send_from_directory(session_dir, subpath) response.headers["Access-Control-Allow-Origin"] = "*" response.headers["Access-Control-Allow-Methods"] = "GET, OPTIONS" response.headers["Access-Control-Allow-Headers"] = "Range" response.headers["Cache-Control"] = "no-cache, no-store" response.headers["Content-Type"] = content_type return response if __name__ == "__main__": app.run(host="0.0.0.0", port=5000, debug=False)