126 lines
3.8 KiB
Python
126 lines
3.8 KiB
Python
"""Video download service using yt-dlp."""
|
|
|
|
import os
|
|
import re
|
|
import shutil
|
|
import logging
|
|
import tempfile
|
|
from dataclasses import dataclass
|
|
from typing import Optional
|
|
|
|
import yt_dlp
|
|
|
|
from app.config import settings
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
COOKIES_FILE = settings.ytdlp_cookies_file
|
|
|
|
|
|
@dataclass
class VideoInfo:
    """Describe one downloaded video, as returned by ``download_video``."""

    # Title reported by yt-dlp; "Unknown" when the extractor gave none.
    title: str
    # Length of the video in seconds.
    duration: float
    # Filesystem path of the downloaded media file.
    video_path: str
    # ID parsed from the URL, or the literal "video" when parsing failed.
    video_id: str
|
|
|
|
|
|
def extract_video_id(url: str) -> Optional[str]:
    """Extract the 11-character YouTube video ID from a URL.

    Recognised URL shapes:

    * ``youtube.com/watch?...v=<id>`` (``v`` may be any query parameter,
      not only the first one)
    * ``youtu.be/<id>``
    * ``youtube.com/embed/<id>`` and ``youtube-nocookie.com/embed/<id>``
    * ``youtube.com/shorts/<id>`` and ``youtube.com/live/<id>``

    Args:
        url: A string that may contain a YouTube video URL.

    Returns:
        The video ID, or ``None`` when ``url`` is empty or matches no
        known shape.
    """
    if not url:
        return None

    id_group = r"([a-zA-Z0-9_-]{11})"
    patterns = (
        # The lazy optional group skips over query parameters that precede
        # ``v=``; being lazy, it still prefers the first ``v=`` in the URL.
        r"(?:youtube\.com/watch\?(?:[^#\s]*?&)??v="
        r"|youtu\.be/"
        r"|youtube(?:-nocookie)?\.com/embed/)" + id_group,
        r"youtube\.com/(?:shorts|live)/" + id_group,
    )
    for pattern in patterns:
        match = re.search(pattern, url)
        if match:
            return match.group(1)
    return None
|
|
|
|
|
|
def _base_opts() -> dict:
    """Build the yt-dlp options shared by every call in this module.

    When ``COOKIES_FILE`` is configured and exists, its contents are copied
    to a fresh temporary file and that copy is passed as ``cookiefile``, so
    yt-dlp never overwrites the original. The temporary file is NOT removed
    here: whoever receives the options owns ``opts["cookiefile"]`` and
    should delete it when finished.

    Returns:
        A new options dict for ``yt_dlp.YoutubeDL``.

    Raises:
        OSError: If the cookies file exists but cannot be copied.
    """
    opts = {"quiet": True, "no_warnings": True}

    if COOKIES_FILE and os.path.exists(COOKIES_FILE):
        # mkstemp hands back an already-created file; close the descriptor
        # right away so no handle stays open while the copy is made.
        fd, tmp_path = tempfile.mkstemp(suffix=".txt")
        os.close(fd)
        try:
            # copyfile copies content only, so the copy keeps the temp
            # file's own permissions. copy2 would also copy the source's
            # mode bits, making the copy read-only whenever the source is.
            shutil.copyfile(COOKIES_FILE, tmp_path)
        except OSError:
            # Do not leave an empty temp file behind when the copy fails.
            try:
                os.remove(tmp_path)
            except OSError:
                logger.debug("Could not remove temporary cookie file %s", tmp_path)
            raise
        opts["cookiefile"] = tmp_path

    # Pass the YouTube ``player_client`` extractor argument.
    # NOTE(review): the previous comment described this as enabling the
    # remote EJS challenge solver; confirm the intended effect against the
    # yt-dlp version in use.
    opts["extractor_args"] = {"youtube": {"player_client": ["default", "web_creator"]}}
    return opts
|
|
|
|
|
|
async def get_video_metadata(url: str) -> dict:
    """Get video metadata without downloading.

    The blocking yt-dlp extraction runs in the event loop's default
    executor so other coroutines keep running while it works.

    Args:
        url: URL of the video to inspect.

    Returns:
        A dict with ``"title"`` (``"Unknown"`` if absent), ``"duration"``
        (``0`` if absent or null) and ``"video_id"`` (``""`` if absent).
    """
    import asyncio

    opts = _base_opts()
    opts["extract_flat"] = False

    def _probe() -> dict:
        with yt_dlp.YoutubeDL(opts) as ydl:
            return ydl.extract_info(url, download=False)

    try:
        info = await asyncio.get_running_loop().run_in_executor(None, _probe)
    finally:
        # _base_opts() hands out a private copy of the cookies file; remove
        # it here. The inequality guard ensures the configured original is
        # never deleted.
        cookie_copy = opts.get("cookiefile")
        if cookie_copy and cookie_copy != COOKIES_FILE:
            try:
                os.remove(cookie_copy)
            except OSError:
                logger.debug("Could not remove temporary cookie file %s", cookie_copy)

    return {
        "title": info.get("title", "Unknown"),
        # The key can be present with a null value; `or 0` covers that too.
        "duration": info.get("duration") or 0,
        "video_id": info.get("id", ""),
    }
|
|
|
|
|
|
async def download_video(url: str, output_dir: str) -> VideoInfo:
    """Download video from YouTube URL. Downloads video+audio for clip extraction.

    The duration is checked against ``settings.max_video_duration`` from a
    metadata-only probe BEFORE any media is fetched, so an over-long video
    is rejected without being downloaded. The blocking yt-dlp work runs in
    the event loop's default executor.

    Args:
        url: URL of the video to download.
        output_dir: Directory to download into; created if missing.

    Returns:
        A ``VideoInfo`` describing the downloaded file.

    Raises:
        ValueError: If the video is longer than ``settings.max_video_duration``.
    """
    import asyncio

    os.makedirs(output_dir, exist_ok=True)

    video_id = extract_video_id(url) or "video"
    output_template = os.path.join(output_dir, f"{video_id}.%(ext)s")

    opts = _base_opts()
    opts.update({
        # Download best video+audio merged to mp4
        "format": "bestvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]/best",
        "merge_output_format": "mp4",
        "outtmpl": output_template,
    })

    def _check_duration(seconds) -> None:
        if seconds > settings.max_video_duration:
            raise ValueError(
                f"Video is {seconds}s, max is {settings.max_video_duration}s"
            )

    def _fetch() -> dict:
        with yt_dlp.YoutubeDL(opts) as ydl:
            # Metadata-only pass first: costs one extra extraction, but it
            # avoids downloading a video that would be rejected anyway.
            probe = ydl.extract_info(url, download=False)
            _check_duration(probe.get("duration") or 0)
            return ydl.extract_info(url, download=True)

    try:
        info = await asyncio.get_running_loop().run_in_executor(None, _fetch)
    finally:
        # _base_opts() hands out a private copy of the cookies file; remove
        # it here. The inequality guard ensures the configured original is
        # never deleted.
        cookie_copy = opts.get("cookiefile")
        if cookie_copy and cookie_copy != COOKIES_FILE:
            try:
                os.remove(cookie_copy)
            except OSError:
                logger.debug("Could not remove temporary cookie file %s", cookie_copy)

    # The key can be present with a null value; treat that as 0 rather than
    # comparing None with a number.
    duration = info.get("duration") or 0
    # Re-check on the final metadata so the limit still holds if it differs
    # from the probe.
    _check_duration(duration)

    video_path = os.path.join(output_dir, f"{video_id}.mp4")
    if not os.path.exists(video_path):
        # Find whatever file was downloaded
        for name in os.listdir(output_dir):
            if name.startswith(video_id) and not name.endswith(".part"):
                video_path = os.path.join(output_dir, name)
                break
        else:
            logger.warning(
                "No downloaded file found for %s in %s", video_id, output_dir
            )

    logger.info("Downloaded: %s (%ss) -> %s", info.get("title"), duration, video_path)
    return VideoInfo(
        title=info.get("title", "Unknown"),
        duration=duration,
        video_path=video_path,
        video_id=video_id,
    )
|
|
|
|
|
|
async def extract_audio(video_path: str, output_path: str) -> str:
    """Extract audio from video file for transcription.

    Runs ``ffmpeg`` as a subprocess, dropping the video stream and encoding
    the audio with ``libmp3lame``. An existing ``output_path`` is
    overwritten (``-y``).

    Args:
        video_path: Path of the input video file.
        output_path: Path the audio file is written to.

    Returns:
        ``output_path``.

    Raises:
        RuntimeError: If ffmpeg exits with a non-zero status; the message
            carries ffmpeg's stderr output.
    """
    import asyncio

    proc = await asyncio.create_subprocess_exec(
        "ffmpeg", "-i", video_path,
        "-vn", "-acodec", "libmp3lame", "-q:a", "4",
        "-y", output_path,
        # Do not let ffmpeg inherit (and read from) the parent's stdin.
        stdin=asyncio.subprocess.DEVNULL,
        # stdout is never inspected, so discard it instead of buffering it.
        stdout=asyncio.subprocess.DEVNULL,
        stderr=asyncio.subprocess.PIPE,
    )
    _, stderr = await proc.communicate()

    if proc.returncode != 0:
        # stderr may contain bytes that are not valid UTF-8; replace them
        # so a decode error cannot mask the real failure.
        details = stderr.decode(errors="replace")
        raise RuntimeError(f"FFmpeg audio extraction failed: {details}")

    logger.info("Extracted audio: %s", output_path)
    return output_path
|