video360-splitter/app.py

452 lines
14 KiB
Python

import os
import subprocess
import uuid
import shutil
import time
import signal
import threading
from pathlib import Path
from flask import Flask, render_template, request, jsonify, send_from_directory
# Flask application object plus the on-disk locations it works with.
app = Flask(__name__)
app.config.update(
    UPLOAD_FOLDER="/data/uploads",
    OUTPUT_FOLDER="/data/output",
    LIVE_SPLIT_FOLDER="/data/live-split",
    MAX_CONTENT_LENGTH=10 * 1024 ** 3,  # 10GB
)

# Create the storage folders at import time so handlers can rely on them.
for _folder_key in ("UPLOAD_FOLDER", "OUTPUT_FOLDER", "LIVE_SPLIT_FOLDER"):
    os.makedirs(app.config[_folder_key], exist_ok=True)

# In-memory registry of file-based split jobs, keyed by job id.
jobs = {}

# In-memory registry of live-split sessions, keyed by session id.
live_sessions = {}
def get_video_info(filepath):
    """Probe a media file with ffprobe and summarise its first video stream.

    Args:
        filepath: Path of the media file to inspect.

    Returns:
        dict with ``width`` and ``height`` (int, 0 when not reported),
        ``duration`` in seconds (float, 0.0 when not reported) and ``codec``
        (codec name, ``"unknown"`` when no video stream is found).

    Raises:
        ValueError: If ffprobe's output is not parseable JSON.
    """
    import json

    result = subprocess.run(
        [
            "ffprobe", "-v", "quiet", "-print_format", "json",
            "-show_format", "-show_streams", filepath,
        ],
        capture_output=True, text=True,
    )
    try:
        info = json.loads(result.stdout)
    except ValueError as exc:
        # Turn the bare JSON decode error into something a job status can show.
        raise ValueError(
            f"ffprobe returned no usable output for {filepath!r} "
            f"(exit code {result.returncode})"
        ) from exc

    # Not every stream entry carries "codec_type", so look it up with .get()
    # instead of indexing, which would raise KeyError on such an entry.
    video_stream = next(
        (s for s in info.get("streams", []) if s.get("codec_type") == "video"),
        {},
    )
    return {
        "width": int(video_stream.get("width") or 0),
        "height": int(video_stream.get("height") or 0),
        "duration": float(info.get("format", {}).get("duration") or 0),
        "codec": video_stream.get("codec_name", "unknown"),
    }
def split_video(job_id, input_path, num_views, h_fov, v_fov, output_res, overlap):
    """Render ``num_views`` flat views from one equirectangular video.

    Views are spaced ``360 / num_views`` degrees apart in yaw and encoded one
    after another with FFmpeg's ``v360`` filter. Progress and results are
    written to the job record in ``jobs`` (``status``, ``progress``,
    ``current_view``, ``input_info``, ``output_files``, ``error``).

    Args:
        job_id: Key of the job record in ``jobs``; also the output folder name.
        input_path: Path of the uploaded source video.
        num_views: Number of views to render.
        h_fov: Horizontal field of view per view, in degrees.
        v_fov: Vertical field of view per view, in degrees.
        output_res: Optional argument for a trailing ``scale`` filter; falsy
            values skip scaling.
        overlap: Extra degrees added to ``h_fov`` for every view.

    Raises:
        KeyError: If ``job_id`` has no record in ``jobs``.
    """
    # Keep a reference to the record itself: /cleanup may delete the registry
    # entry while this thread is still running, and re-indexing ``jobs`` would
    # then raise KeyError, including inside the error handler below.
    job = jobs[job_id]
    try:
        job["status"] = "processing"
        job_output = os.path.join(app.config["OUTPUT_FOLDER"], job_id)
        os.makedirs(job_output, exist_ok=True)

        effective_fov = h_fov + overlap
        step = 360 / num_views

        job["input_info"] = get_video_info(input_path)

        # Build every command first so the total is known before encoding.
        tasks = []
        for i in range(num_views):
            yaw = i * step
            label = get_direction_label(yaw, num_views)
            output_file = os.path.join(job_output, f"{label}_{int(yaw)}deg.mp4")
            vf = f"v360=equirect:flat:h_fov={effective_fov}:v_fov={v_fov}:yaw={yaw}"
            if output_res:
                vf += f",scale={output_res}"
            cmd = [
                "ffmpeg", "-y", "-i", input_path,
                "-vf", vf,
                "-c:v", "libx264", "-crf", "18", "-preset", "medium",
                "-c:a", "aac", "-b:a", "192k",
                output_file,
            ]
            tasks.append((label, yaw, cmd, output_file))

        total = len(tasks)
        for idx, (label, yaw, cmd, _output_file) in enumerate(tasks):
            job["current_view"] = f"{label} ({int(yaw)}°)"
            job["progress"] = int((idx / total) * 100)
            result = subprocess.run(
                cmd,
                # ffmpeg reads interactive commands from stdin by default;
                # do not let it touch the server's stdin.
                stdin=subprocess.DEVNULL,
                capture_output=True,
                text=True,
                # Undecodable bytes in ffmpeg's log must not abort the job.
                errors="replace",
            )
            if result.returncode != 0:
                job["status"] = "error"
                job["error"] = f"FFmpeg error on {label}: {result.stderr[-500:]}"
                return

        job["status"] = "complete"
        job["progress"] = 100
        job["output_files"] = [os.path.basename(path) for _, _, _, path in tasks]
    except Exception as e:
        job["status"] = "error"
        job["error"] = str(e)
def get_direction_label(yaw, num_views):
    """Name a view from its yaw angle.

    Layouts with 3 or 4 views get compass-style names for their canonical
    angles; every other angle or layout yields ``view_<degrees>``, where the
    degrees are ``yaw`` truncated to an int.
    """
    angle = int(yaw)
    generic = f"view_{angle}"
    named_layouts = (
        (4, {0: "front", 90: "right", 180: "back", 270: "left"}),
        (3, {0: "front", 120: "right", 240: "left"}),
    )
    for view_count, names in named_layouts:
        if num_views == view_count:
            return names.get(angle, generic)
    return generic
@app.route("/")
def index():
    """Render the ``index.html`` template for the root URL."""
    page = render_template("index.html")
    return page
@app.route("/upload", methods=["POST"])
def upload():
    """Accept a video upload and start a background split job.

    Expects a multipart form with a ``video`` file and optional fields
    ``num_views`` (int, default 4), ``h_fov`` (float, default
    ``360 / num_views``), ``v_fov`` (float, default 90), ``overlap`` (float,
    default 0) and ``output_res`` (argument for a ``scale`` filter).

    Returns:
        JSON ``{"job_id": ...}`` on success, or ``{"error": ...}`` with
        status 400 when the file or a parameter is missing or invalid.
    """
    import re

    if "video" not in request.files:
        return jsonify({"error": "No video file provided"}), 400
    file = request.files["video"]
    if not file.filename:
        return jsonify({"error": "No file selected"}), 400

    # Validate every option before writing anything to disk, so a bad request
    # gets a 400 instead of a 500 with an orphaned upload left behind.
    try:
        num_views = int(request.form.get("num_views", 4))
        if num_views < 1:
            raise ValueError("num_views must be at least 1")
        h_fov = float(request.form.get("h_fov", 360 / num_views))
        v_fov = float(request.form.get("v_fov", 90))
        overlap = float(request.form.get("overlap", 0))
    except ValueError as exc:
        return jsonify({"error": f"Invalid parameter: {exc}"}), 400

    # output_res is interpolated into the ffmpeg -vf string by split_video.
    # Only allow characters used by scale sizes and expressions, so the value
    # cannot append further filters (no ",", ";", "[", "]", quotes or spaces).
    output_res = request.form.get("output_res", "").strip()
    if output_res and not re.fullmatch(r"[A-Za-z0-9_.:+\-*/()=]+", output_res):
        return jsonify({"error": "Invalid output_res"}), 400

    job_id = str(uuid.uuid4())[:8]
    ext = Path(file.filename).suffix or ".mp4"
    input_path = os.path.join(app.config["UPLOAD_FOLDER"], f"{job_id}{ext}")
    file.save(input_path)

    jobs[job_id] = {
        "status": "queued",
        "progress": 0,
        "input_file": file.filename,
        "num_views": num_views,
        "current_view": "",
        "output_files": [],
    }

    # Encode in a daemon thread so the request returns immediately.
    worker = threading.Thread(
        target=split_video,
        args=(job_id, input_path, num_views, h_fov, v_fov, output_res, overlap),
        daemon=True,
    )
    worker.start()
    return jsonify({"job_id": job_id})
@app.route("/status/<job_id>")
def status(job_id):
    """Return the stored record for ``job_id`` as JSON, or 404 if unknown."""
    job = jobs.get(job_id)
    if job is None:
        return jsonify({"error": "Job not found"}), 404
    return jsonify(job)
@app.route("/download/<job_id>/<filename>")
def download(job_id, filename):
    """Send one rendered view of a job as a file attachment.

    Args:
        job_id: Job identifier as issued by ``/upload`` (8 hex characters).
        filename: Name of a file inside the job's output folder.

    Returns:
        The file as an attachment, or a 404 JSON error when ``job_id`` is not
        a well-formed job identifier.
    """
    # Job ids are the first 8 hex characters of a UUID4. Anything else, for
    # example "..", would move job_dir outside the output folder.
    if len(job_id) != 8 or any(ch not in "0123456789abcdef" for ch in job_id):
        return jsonify({"error": "Job not found"}), 404
    job_dir = os.path.join(app.config["OUTPUT_FOLDER"], job_id)
    return send_from_directory(job_dir, filename, as_attachment=True)
@app.route("/cleanup/<job_id>", methods=["POST"])
def cleanup(job_id):
    """Delete the uploaded file, the output folder and the record of a job.

    Args:
        job_id: Job identifier as issued by ``/upload`` (8 hex characters).

    Returns:
        JSON ``{"status": "cleaned"}``, or a 404 JSON error when ``job_id``
        is not a well-formed job identifier.
    """
    # job_id feeds both rmtree() and a glob pattern below. Without this check
    # ".." would delete the parent of the output folder and "*" would match
    # every uploaded file.
    if len(job_id) != 8 or any(ch not in "0123456789abcdef" for ch in job_id):
        return jsonify({"error": "Job not found"}), 404

    job_dir = os.path.join(app.config["OUTPUT_FOLDER"], job_id)
    if os.path.exists(job_dir):
        shutil.rmtree(job_dir)

    # The upload was saved as "<job_id><ext>"; the extension is not recorded,
    # so match any extension.
    for upload_file in Path(app.config["UPLOAD_FOLDER"]).glob(f"{job_id}.*"):
        upload_file.unlink()

    jobs.pop(job_id, None)
    return jsonify({"status": "cleaned"})
# ── Live-Split Session Management ──
# Seconds a stopped or failed live session is kept before the watchdog
# removes its directory and registry entry (5 minutes).
STALE_SESSION_TIMEOUT = 5 * 60
def start_live_split_session(stream_url, num_views, h_fov, v_fov, overlap, output_res):
    """Spawn one FFmpeg process per view, each writing an HLS stream.

    Every process reads ``stream_url``, applies the ``v360`` filter for its
    yaw angle and writes ``view_<i>/stream.m3u8`` under a new session
    directory. The session is registered in ``live_sessions``.

    Args:
        stream_url: Input URL handed to ``ffmpeg -i``.
        num_views: Number of views (and FFmpeg processes) to create.
        h_fov: Horizontal field of view per view, in degrees.
        v_fov: Vertical field of view per view, in degrees.
        overlap: Extra degrees added to ``h_fov`` for every view.
        output_res: Optional argument for a trailing ``scale`` filter; falsy
            values skip scaling.

    Returns:
        The new session id (``"ls-"`` plus 8 hex characters).

    Raises:
        ZeroDivisionError: If ``num_views`` is 0.
        OSError: If a process cannot be started. Processes that were already
            spawned are killed and the session directory is removed first.
    """
    session_id = "ls-" + str(uuid.uuid4())[:8]
    session_dir = os.path.join(app.config["LIVE_SPLIT_FOLDER"], session_id)
    effective_fov = h_fov + overlap
    step = 360.0 / num_views

    views = []
    processes = []
    try:
        for i in range(num_views):
            yaw = i * step
            label = get_direction_label(yaw, num_views)
            view_dir = os.path.join(session_dir, f"view_{i}")
            os.makedirs(view_dir, exist_ok=True)

            vf = f"v360=equirect:flat:h_fov={effective_fov}:v_fov={v_fov}:yaw={yaw}"
            if output_res:
                vf += f",scale={output_res}"

            cmd = [
                "ffmpeg",
                # stderr is a pipe that is only read after the process exits.
                # Restrict it to error messages so periodic progress output
                # cannot fill the pipe and block ffmpeg mid-stream.
                "-hide_banner", "-loglevel", "error", "-nostats",
                # NOTE(review): ffmpeg's docs advise against -re for live
                # inputs; kept as-is, confirm it is wanted for stream_url.
                "-re", "-i", stream_url,
                "-vf", vf,
                "-c:v", "libx264", "-preset", "veryfast", "-tune", "zerolatency",
                "-g", "30", "-sc_threshold", "0",
                "-c:a", "aac", "-b:a", "128k",
                "-f", "hls", "-hls_time", "2", "-hls_list_size", "10",
                "-hls_flags", "delete_segments+append_list",
                os.path.join(view_dir, "stream.m3u8"),
            ]
            proc = subprocess.Popen(
                cmd,
                # Keep ffmpeg from reading the server's stdin.
                stdin=subprocess.DEVNULL,
                stdout=subprocess.DEVNULL,
                stderr=subprocess.PIPE,
            )
            processes.append(proc)
            views.append({
                "index": i,
                "label": label,
                "yaw": yaw,
                "hls_path": f"view_{i}/stream.m3u8",
            })
    except Exception:
        # The session is not registered yet, so nothing else would ever stop
        # the processes that did start; tear them down before re-raising.
        for proc in processes:
            proc.kill()
            proc.wait()
            if proc.stderr:
                proc.stderr.close()
        shutil.rmtree(session_dir, ignore_errors=True)
        raise

    live_sessions[session_id] = {
        "session_id": session_id,
        "status": "running",
        "started_at": time.time(),
        "stream_url": stream_url,
        "num_views": num_views,
        "views": views,
        "processes": processes,
        "stopped_at": None,
        "error": None,
    }
    return session_id
def stop_live_session(session_id):
    """Stop every FFmpeg process of a live session.

    Sends SIGTERM to all processes that are still running, gives them a
    shared 5 second grace period, then kills and reaps any survivor. The
    session is marked ``"stopped"`` and ``stopped_at`` is set.

    Args:
        session_id: Key of the session in ``live_sessions``.

    Returns:
        True if the session exists (and was stopped), False otherwise.
    """
    session = live_sessions.get(session_id)
    if not session:
        return False

    procs = session["processes"]
    for proc in procs:
        if proc.poll() is None:
            try:
                proc.send_signal(signal.SIGTERM)
            except OSError:
                # Best effort: the process may exit between poll() and here.
                pass

    # One deadline for all processes, measured on the monotonic clock so a
    # wall-clock adjustment cannot stretch or shrink the grace period.
    deadline = time.monotonic() + 5
    for proc in procs:
        remaining = max(0, deadline - time.monotonic())
        try:
            proc.wait(timeout=remaining)
        except subprocess.TimeoutExpired:
            try:
                proc.kill()
            except OSError:
                pass
            # Reap the killed process so it does not linger as a zombie.
            proc.wait()

    session["status"] = "stopped"
    session["stopped_at"] = time.time()
    return True
def check_session_health(session_id):
    """Mark a running session as failed once all of its processes have exited.

    Does nothing for unknown sessions, sessions that are not ``"running"``,
    or sessions with at least one live process. Otherwise the status becomes
    ``"error"``, ``stopped_at`` is set, and ``error`` holds the last 500
    characters of the first non-empty stderr (or a generic message).
    """
    session = live_sessions.get(session_id)
    if not session or session["status"] != "running":
        return

    procs = session["processes"]
    # Poll every process (no short-circuit) before deciding.
    exit_codes = [proc.poll() for proc in procs]
    if any(code is None for code in exit_codes):
        return

    def _stderr_tail(proc):
        """Return the tail of ``proc``'s stderr, or "" if nothing is readable."""
        if not proc.stderr:
            return ""
        try:
            raw = proc.stderr.read()
            return raw.decode("utf-8", errors="replace")[-500:] if raw else ""
        except Exception:
            return ""

    # Lazily walk the processes and stop at the first one that left output.
    err_msg = next((tail for tail in map(_stderr_tail, procs) if tail), "")

    session["status"] = "error"
    session["error"] = err_msg or "All FFmpeg processes exited unexpectedly"
    session["stopped_at"] = time.time()
def watchdog_loop():
    """Run forever: every 30 seconds health-check and expire live sessions.

    Running sessions are passed to ``check_session_health``. Sessions that
    have been ``"stopped"`` or ``"error"`` for longer than
    ``STALE_SESSION_TIMEOUT`` get their directory removed and are dropped
    from ``live_sessions``. Any exception during a sweep is printed and the
    loop carries on with the next sweep.
    """
    while True:
        time.sleep(30)
        try:
            # Iterate over a snapshot of the keys; entries are deleted below.
            for sid in tuple(live_sessions):
                entry = live_sessions.get(sid)
                if not entry:
                    continue

                if entry["status"] == "running":
                    check_session_health(sid)

                finished = entry["status"] in ("stopped", "error")
                if not (finished and entry.get("stopped_at")):
                    continue

                idle_for = time.time() - entry["stopped_at"]
                if idle_for <= STALE_SESSION_TIMEOUT:
                    continue

                # Stale: remove the session's files, then forget the session.
                stale_dir = os.path.join(app.config["LIVE_SPLIT_FOLDER"], sid)
                if os.path.exists(stale_dir):
                    shutil.rmtree(stale_dir, ignore_errors=True)
                del live_sessions[sid]
        except Exception as exc:
            print(f"[watchdog] error: {exc}")
# Launch the watchdog when the module is imported. It is a daemon thread, so
# it does not keep the interpreter alive at shutdown.
_watchdog = threading.Thread(target=watchdog_loop)
_watchdog.daemon = True
_watchdog.start()
# ── Live-Split API Endpoints ──
@app.route("/live-split", methods=["POST"])
def live_split_start():
    """Start a live-split session from a JSON request body.

    Body fields: ``stream_url`` (required string), ``num_views`` (int,
    default 4), ``h_fov`` (float, default ``360 / num_views``), ``v_fov``
    (float, default 90), ``overlap`` (float, default 0) and ``output_res``
    (argument for a ``scale`` filter).

    Returns:
        JSON with the session id, status, HLS base URL and one entry per
        view; ``{"error": ...}`` with status 400 for invalid input, or with
        status 500 if the session could not be started.
    """
    import re

    data = request.get_json()
    if not isinstance(data, dict) or not data.get("stream_url"):
        return jsonify({"error": "stream_url required"}), 400
    stream_url = data["stream_url"]
    if not isinstance(stream_url, str):
        return jsonify({"error": "stream_url must be a string"}), 400

    # JSON values may be null or of the wrong type, so catch TypeError too.
    # Answer 400 instead of letting the conversion escape as a 500.
    try:
        num_views = int(data.get("num_views", 4))
        if num_views < 1:
            raise ValueError("num_views must be at least 1")
        h_fov = float(data.get("h_fov", 360.0 / num_views))
        v_fov = float(data.get("v_fov", 90))
        overlap = float(data.get("overlap", 0))
    except (TypeError, ValueError) as exc:
        return jsonify({"error": f"Invalid parameter: {exc}"}), 400

    # output_res ends up inside the ffmpeg -vf string. Accept only characters
    # used by scale sizes and expressions, so it cannot append more filters.
    output_res = data.get("output_res")
    if output_res is None:
        output_res = ""
    if not isinstance(output_res, str):
        return jsonify({"error": "Invalid output_res"}), 400
    output_res = output_res.strip()
    if output_res and not re.fullmatch(r"[A-Za-z0-9_.:+\-*/()=]+", output_res):
        return jsonify({"error": "Invalid output_res"}), 400

    try:
        session_id = start_live_split_session(
            stream_url, num_views, h_fov, v_fov, overlap, output_res
        )
    except Exception as e:
        return jsonify({"error": str(e)}), 500

    session = live_sessions[session_id]
    hls_base = f"/live-split/hls/{session_id}"
    return jsonify({
        "session_id": session_id,
        "status": session["status"],
        "hls_base_url": hls_base,
        "views": [
            {
                "index": v["index"],
                "label": v["label"],
                "yaw": v["yaw"],
                "hls_url": f"{hls_base}/{v['hls_path']}",
            }
            for v in session["views"]
        ],
    })
@app.route("/live-split/status/<session_id>")
def live_split_status(session_id):
    """Report status, uptime and per-view liveness of a live-split session.

    A running session is health-checked first, so the reported status already
    reflects processes that have died. Unknown sessions yield a 404.
    """
    session = live_sessions.get(session_id)
    if not session:
        return jsonify({"error": "Session not found"}), 404

    # Refresh the status before reporting it.
    if session["status"] == "running":
        check_session_health(session_id)

    uptime = time.time() - session["started_at"]
    hls_root = f"/live-split/hls/{session_id}"
    views_status = [
        {
            "index": view["index"],
            "label": view["label"],
            "yaw": view["yaw"],
            "hls_url": f"{hls_root}/{view['hls_path']}",
            # Processes are stored in the same order as the views.
            "alive": session["processes"][pos].poll() is None,
        }
        for pos, view in enumerate(session["views"])
    ]

    payload = {
        "session_id": session_id,
        "status": session["status"],
        "uptime_seconds": round(uptime, 1),
        "views": views_status,
        "error": session.get("error"),
    }
    return jsonify(payload)
@app.route("/live-split/stop/<session_id>", methods=["POST"])
def live_split_stop(session_id):
    """Stop a live-split session and report how long it ran.

    Unknown sessions yield a 404. Sessions that are not running are left
    untouched and only reported.
    """
    session = live_sessions.get(session_id)
    if not session:
        return jsonify({"error": "Session not found"}), 404

    if session["status"] == "running":
        stop_live_session(session_id)

    # Fall back to "now" when no stop time has been recorded.
    ended_at = session.get("stopped_at") or time.time()
    duration = ended_at - session["started_at"]

    # NOTE(review): the response always says "stopped", even for a session
    # whose stored status is "error" - confirm this is intended.
    payload = {
        "session_id": session_id,
        "status": "stopped",
        "duration_seconds": round(duration, 1),
    }
    return jsonify(payload)
@app.route("/live-split/hls/<session_id>/<path:subpath>")
def live_split_hls(session_id, subpath):
    """Serve an HLS playlist or segment from a live-split session directory.

    Args:
        session_id: Session identifier (``"ls-"`` plus 8 hex characters).
        subpath: Path of the file relative to the session directory.

    Returns:
        The file with CORS and no-cache headers, or a 404 JSON error when the
        session id is malformed or its directory does not exist.
    """
    # Session ids are "ls-" plus the first 8 hex characters of a UUID4.
    # Rejecting anything else stops values such as ".." from pointing
    # session_dir at the parent folder, where the multi-segment subpath could
    # then reach other data directories.
    token = session_id[3:]
    well_formed = (
        session_id.startswith("ls-")
        and len(token) == 8
        and all(ch in "0123456789abcdef" for ch in token)
    )
    if not well_formed:
        return jsonify({"error": "Session not found"}), 404

    session_dir = os.path.join(app.config["LIVE_SPLIT_FOLDER"], session_id)
    if not os.path.isdir(session_dir):
        return jsonify({"error": "Session not found"}), 404

    # Segments are MPEG-TS; everything else is served as an HLS playlist.
    if subpath.endswith(".ts"):
        content_type = "video/mp2t"
    else:
        content_type = "application/vnd.apple.mpegurl"

    response = send_from_directory(session_dir, subpath)
    response.headers["Access-Control-Allow-Origin"] = "*"
    response.headers["Access-Control-Allow-Methods"] = "GET, OPTIONS"
    response.headers["Access-Control-Allow-Headers"] = "Range"
    response.headers["Cache-Control"] = "no-cache, no-store"
    response.headers["Content-Type"] = content_type
    return response
if __name__ == "__main__":
    # Direct execution: listen on all interfaces, port 5000, debug mode off.
    app.run(debug=False, port=5000, host="0.0.0.0")