"""Flask service that splits an equirectangular video into flat views.

Uploads are stored under ``UPLOAD_FOLDER``; each job gets its own directory
under ``OUTPUT_FOLDER``. Splitting runs FFmpeg's ``v360`` filter once per view
in a background thread, and job state is kept in the in-memory ``jobs`` dict.
"""

import json
import math
import os
import re
import shutil
import subprocess
import threading
import uuid
from pathlib import Path

from flask import Flask, render_template, request, jsonify, send_from_directory

app = Flask(__name__)
app.config["UPLOAD_FOLDER"] = "/data/uploads"
app.config["OUTPUT_FOLDER"] = "/data/output"
app.config["MAX_CONTENT_LENGTH"] = 10 * 1024 * 1024 * 1024  # 10GB

os.makedirs(app.config["UPLOAD_FOLDER"], exist_ok=True)
os.makedirs(app.config["OUTPUT_FOLDER"], exist_ok=True)

# Track job status in memory
jobs = {}

# Job ids are the first 8 characters of a UUID4 string, i.e. lowercase hex.
# Anything else coming from a URL is rejected before it touches the filesystem.
_JOB_ID_RE = re.compile(r"[0-9a-f]{8}")

# Characters allowed in the user-supplied ``scale`` argument. Filter-graph
# separators (``,`` ``;`` ``[`` ``]``) and quoting characters are excluded so
# the value cannot append extra filters to the chain.
_OUTPUT_RES_RE = re.compile(r"[A-Za-z0-9_:*/+\-.()=]+")

# Upload extensions are taken from the client filename; keep only simple ones.
_EXT_RE = re.compile(r"\.[A-Za-z0-9]{1,10}")

# Above 360 views the step drops below 1 degree and the integer-degree
# output file names would collide.
_MAX_VIEWS = 360


def _is_valid_job_id(job_id):
    """Return True if *job_id* has the shape of an id generated by upload()."""
    return _JOB_ID_RE.fullmatch(job_id) is not None


def get_video_info(filepath):
    """Get video resolution, duration and codec using ffprobe.

    Args:
        filepath: Path of the media file to inspect.

    Returns:
        A dict with ``width``, ``height``, ``duration`` and ``codec``. Missing
        values fall back to ``0`` / ``"unknown"``.

    Raises:
        RuntimeError: If ffprobe exits with an error or prints invalid JSON.
    """
    result = subprocess.run(
        [
            "ffprobe",
            "-v", "quiet",
            "-print_format", "json",
            "-show_format",
            "-show_streams",
            filepath,
        ],
        capture_output=True,
        text=True,
    )
    if result.returncode != 0:
        raise RuntimeError(
            f"ffprobe failed with exit code {result.returncode}"
        )
    try:
        info = json.loads(result.stdout)
    except json.JSONDecodeError as exc:
        raise RuntimeError("ffprobe returned unparseable output") from exc

    # First video stream, or an empty dict when the file has none.
    video_stream = next(
        (s for s in info.get("streams", []) if s.get("codec_type") == "video"),
        {},
    )
    return {
        "width": int(video_stream.get("width", 0)),
        "height": int(video_stream.get("height", 0)),
        "duration": float(info.get("format", {}).get("duration", 0)),
        "codec": video_stream.get("codec_name", "unknown"),
    }


def _build_filter(effective_fov, v_fov, yaw, output_res):
    """Build the ``-vf`` filter string for one view.

    Raises:
        ValueError: If *output_res* contains characters outside the allow-list.
    """
    vf = f"v360=equirect:flat:h_fov={effective_fov}:v_fov={v_fov}:yaw={yaw}"
    if output_res:
        if not _OUTPUT_RES_RE.fullmatch(output_res):
            raise ValueError(f"Invalid output resolution: {output_res!r}")
        vf += f",scale={output_res}"
    return vf


def split_video(job_id, input_path, num_views, h_fov, v_fov, output_res, overlap):
    """Render *num_views* flat views of *input_path* with FFmpeg.

    Views are spaced ``360 / num_views`` degrees apart in yaw and rendered one
    after another. Progress, the current view, and the final result or error
    are written to ``jobs[job_id]``; nothing is returned.

    Args:
        job_id: Key of the job entry in ``jobs``.
        input_path: Path of the uploaded source video.
        num_views: Number of views to render.
        h_fov: Horizontal field of view per view, in degrees.
        v_fov: Vertical field of view per view, in degrees.
        output_res: Optional argument for FFmpeg's ``scale`` filter; an empty
            string leaves the output unscaled.
        overlap: Extra degrees added to *h_fov* for each view.
    """
    try:
        jobs[job_id]["status"] = "processing"

        job_output = os.path.join(app.config["OUTPUT_FOLDER"], job_id)
        os.makedirs(job_output, exist_ok=True)

        effective_fov = h_fov + overlap
        step = 360 / num_views

        input_info = get_video_info(input_path)
        jobs[job_id]["input_info"] = input_info

        # Build every command up front so an invalid parameter fails the job
        # before any encoding time is spent.
        tasks = []
        for i in range(num_views):
            yaw = i * step
            label = get_direction_label(yaw, num_views)
            output_file = os.path.join(job_output, f"{label}_{int(yaw)}deg.mp4")
            cmd = [
                "ffmpeg", "-y",
                "-i", input_path,
                "-vf", _build_filter(effective_fov, v_fov, yaw, output_res),
                "-c:v", "libx264",
                "-crf", "18",
                "-preset", "medium",
                "-c:a", "aac",
                "-b:a", "192k",
                output_file,
            ]
            tasks.append((label, yaw, cmd, output_file))

        total = len(tasks)
        for idx, (label, yaw, cmd, _output_file) in enumerate(tasks):
            jobs[job_id]["current_view"] = f"{label} ({int(yaw)}°)"
            jobs[job_id]["progress"] = int((idx / total) * 100)

            result = subprocess.run(cmd, capture_output=True, text=True)
            if result.returncode != 0:
                jobs[job_id]["status"] = "error"
                # Only the tail of stderr: FFmpeg prints the cause last.
                jobs[job_id]["error"] = (
                    f"FFmpeg error on {label}: {result.stderr[-500:]}"
                )
                return

        jobs[job_id]["status"] = "complete"
        jobs[job_id]["progress"] = 100
        jobs[job_id]["output_files"] = [
            os.path.basename(output_file) for _, _, _, output_file in tasks
        ]
    except Exception as e:
        # Top-level boundary of the worker thread: report the failure through
        # the job record instead of letting the thread die silently.
        jobs[job_id]["status"] = "error"
        jobs[job_id]["error"] = str(e)


def get_direction_label(yaw, num_views):
    """Get a human-readable label for a yaw angle.

    Named directions exist only for the 3-view and 4-view layouts; every other
    case yields ``view_<integer yaw>``.
    """
    if num_views == 4:
        labels = {0: "front", 90: "right", 180: "back", 270: "left"}
    elif num_views == 3:
        labels = {0: "front", 120: "right", 240: "left"}
    else:
        labels = {}
    return labels.get(int(yaw), f"view_{int(yaw)}")


@app.route("/")
def index():
    """Serve the upload page."""
    return render_template("index.html")


def _parse_split_params(form):
    """Parse and validate the splitting parameters from a submitted form.

    Returns:
        Tuple ``(num_views, h_fov, v_fov, overlap, output_res)``.

    Raises:
        ValueError: If a value is not numeric, not finite, or out of range.
    """
    num_views = int(form.get("num_views", 4))
    if not 1 <= num_views <= _MAX_VIEWS:
        raise ValueError(f"num_views must be between 1 and {_MAX_VIEWS}")

    h_fov = float(form.get("h_fov", 360 / num_views))
    v_fov = float(form.get("v_fov", 90))
    overlap = float(form.get("overlap", 0))
    if not all(math.isfinite(value) for value in (h_fov, v_fov, overlap)):
        raise ValueError("h_fov, v_fov and overlap must be finite numbers")

    output_res = form.get("output_res", "").strip()
    if output_res and not _OUTPUT_RES_RE.fullmatch(output_res):
        raise ValueError("output_res contains unsupported characters")

    return num_views, h_fov, v_fov, overlap, output_res


@app.route("/upload", methods=["POST"])
def upload():
    """Accept a video upload and start a background splitting job.

    Returns JSON ``{"job_id": ...}`` on success, or a 400 response with an
    ``error`` message when the file or any parameter is missing or invalid.
    """
    if "video" not in request.files:
        return jsonify({"error": "No video file provided"}), 400

    file = request.files["video"]
    if file.filename == "":
        return jsonify({"error": "No file selected"}), 400

    # Validate before saving so a bad request never leaves an orphaned upload.
    try:
        num_views, h_fov, v_fov, overlap, output_res = _parse_split_params(
            request.form
        )
    except ValueError as exc:
        return jsonify({"error": f"Invalid parameter: {exc}"}), 400

    job_id = str(uuid.uuid4())[:8]
    ext = Path(file.filename).suffix
    if not _EXT_RE.fullmatch(ext):
        ext = ".mp4"
    input_path = os.path.join(app.config["UPLOAD_FOLDER"], f"{job_id}{ext}")
    file.save(input_path)

    jobs[job_id] = {
        "status": "queued",
        "progress": 0,
        "input_file": file.filename,
        "num_views": num_views,
        "current_view": "",
        "output_files": [],
    }

    # Run in a background thread
    worker = threading.Thread(
        target=split_video,
        args=(job_id, input_path, num_views, h_fov, v_fov, output_res, overlap),
        daemon=True,
    )
    worker.start()

    return jsonify({"job_id": job_id})


@app.route("/status/<job_id>")
def status(job_id):
    """Return the in-memory job record as JSON, or 404 if it is unknown."""
    if job_id not in jobs:
        return jsonify({"error": "Job not found"}), 404
    return jsonify(jobs[job_id])


@app.route("/download/<job_id>/<filename>")
def download(job_id, filename):
    """Send one output file of a job as an attachment."""
    # job_id becomes a directory name, so reject anything that is not a
    # generated id (e.g. "..").
    if not _is_valid_job_id(job_id):
        return jsonify({"error": "Job not found"}), 404
    job_dir = os.path.join(app.config["OUTPUT_FOLDER"], job_id)
    return send_from_directory(job_dir, filename, as_attachment=True)


@app.route("/cleanup/<job_id>", methods=["POST"])
def cleanup(job_id):
    """Delete uploaded and output files for a job."""
    # job_id is used in rmtree() and in a glob pattern; an unvalidated value
    # such as ".." or "*" would delete data belonging to other jobs.
    if not _is_valid_job_id(job_id):
        return jsonify({"error": "Job not found"}), 404

    job_dir = os.path.join(app.config["OUTPUT_FOLDER"], job_id)
    if os.path.exists(job_dir):
        shutil.rmtree(job_dir)

    # Remove upload file
    for upload_file in Path(app.config["UPLOAD_FOLDER"]).glob(f"{job_id}.*"):
        upload_file.unlink()

    jobs.pop(job_id, None)

    return jsonify({"status": "cleaned"})


if __name__ == "__main__":
    app.run(host="0.0.0.0", port=5000, debug=False)