"""Video download service using yt-dlp."""

import asyncio
import logging
import os
import re
import shutil
import tempfile
from dataclasses import dataclass
from typing import Optional

import yt_dlp

from app.config import settings

logger = logging.getLogger(__name__)

COOKIES_FILE = settings.ytdlp_cookies_file

# Compiled once at import time; each pattern captures the 11-character video id.
_VIDEO_ID_PATTERNS = tuple(
    re.compile(pattern)
    for pattern in (
        r"(?:youtube\.com/watch\?v=|youtu\.be/|youtube\.com/embed/)([a-zA-Z0-9_-]{11})",
        r"youtube\.com/shorts/([a-zA-Z0-9_-]{11})",
    )
)

# Suffixes of yt-dlp work-in-progress files that are never the finished video.
_PARTIAL_SUFFIXES = (".part", ".ytdl")


@dataclass
class VideoInfo:
    """Result of a completed download, as returned by ``download_video``."""

    # Title reported by yt-dlp, or "Unknown" when the extractor gave none.
    title: str
    # Duration reported by yt-dlp (0 when missing); compared against
    # settings.max_video_duration.
    duration: float
    # Path of the downloaded media file inside the requested output directory.
    video_path: str
    # Id parsed from the URL, or the literal "video" when none could be parsed.
    video_id: str


def extract_video_id(url: str) -> Optional[str]:
    """Return the 11-character YouTube video id found in *url*, or None.

    Recognises ``watch?v=``, ``youtu.be/``, ``embed/`` and ``shorts/`` URLs.
    """
    for pattern in _VIDEO_ID_PATTERNS:
        match = pattern.search(url)
        if match:
            return match.group(1)
    return None


def _base_opts() -> dict:
    """Build the yt-dlp options shared by every request.

    When a cookies file is configured and exists, a private temporary copy is
    created and passed as ``cookiefile`` so yt-dlp never rewrites the original.
    The caller owns that copy and should hand the returned dict to
    ``_discard_cookie_copy`` once the ``YoutubeDL`` instance has been closed.
    """
    opts = {"quiet": True, "no_warnings": True}
    if COOKIES_FILE and os.path.exists(COOKIES_FILE):
        # Copy cookies to a temp file so yt-dlp doesn't overwrite the original.
        # mkstemp + close avoids copying onto a still-open handle.
        fd, cookie_copy = tempfile.mkstemp(suffix=".txt")
        os.close(fd)
        try:
            shutil.copy2(COOKIES_FILE, cookie_copy)
        except OSError:
            # Do not leave an empty temp file behind when the copy fails.
            os.remove(cookie_copy)
            raise
        opts["cookiefile"] = cookie_copy
    # Select which YouTube player clients yt-dlp should use.
    # NOTE(review): the previous comment said this enables the remote EJS
    # challenge solver; this option only picks player clients -- confirm intent.
    opts["extractor_args"] = {"youtube": {"player_client": ["default", "web_creator"]}}
    return opts


def _discard_cookie_copy(opts: dict) -> None:
    """Delete the temporary cookie copy created by ``_base_opts``, if any."""
    cookie_copy = opts.get("cookiefile")
    # Never touch the configured original, only the throw-away copy.
    if not cookie_copy or cookie_copy == COOKIES_FILE:
        return
    try:
        os.remove(cookie_copy)
    except FileNotFoundError:
        pass
    except OSError as exc:
        logger.warning("Could not remove temporary cookie file %s: %s", cookie_copy, exc)


def _checked_duration(info: dict) -> float:
    """Return the duration from *info* (0 when missing or None).

    Raises:
        ValueError: if the duration exceeds ``settings.max_video_duration``.
    """
    # The key may be present with a None value, so ``.get(key, 0)`` is not enough.
    duration = info.get("duration") or 0
    if duration > settings.max_video_duration:
        raise ValueError(
            f"Video is {duration}s, max is {settings.max_video_duration}s"
        )
    return duration


def _fetch_metadata_blocking(url: str) -> dict:
    """Run a metadata-only yt-dlp extraction (blocking; call from a worker thread)."""
    opts = _base_opts()
    opts["extract_flat"] = False
    try:
        with yt_dlp.YoutubeDL(opts) as ydl:
            return ydl.extract_info(url, download=False)
    finally:
        # Runs after YoutubeDL has closed, so nothing still needs the copy.
        _discard_cookie_copy(opts)


def _download_blocking(url: str, output_template: str) -> dict:
    """Download *url* with yt-dlp (blocking; call from a worker thread).

    The duration limit is checked on a metadata-only probe first, so an
    over-long video is rejected before any media is transferred.
    """
    opts = _base_opts()
    opts.update({
        # Download best video+audio merged to mp4
        "format": "bestvideo[height<=720][ext=mp4]+bestaudio[ext=m4a]/best[height<=720][ext=mp4]/best",
        "merge_output_format": "mp4",
        "outtmpl": output_template,
    })
    try:
        with yt_dlp.YoutubeDL(opts) as ydl:
            _checked_duration(ydl.extract_info(url, download=False))
            return ydl.extract_info(url, download=True)
    finally:
        _discard_cookie_copy(opts)


async def get_video_metadata(url: str) -> dict:
    """Get video metadata without downloading.

    The blocking yt-dlp call runs in the default executor so the event loop
    stays responsive.

    Returns:
        A dict with the keys ``title``, ``duration`` and ``video_id``.
    """
    loop = asyncio.get_running_loop()
    info = await loop.run_in_executor(None, _fetch_metadata_blocking, url)
    return {
        "title": info.get("title", "Unknown"),
        "duration": info.get("duration", 0),
        "video_id": info.get("id", ""),
    }


async def download_video(url: str, output_dir: str) -> VideoInfo:
    """Download video from YouTube URL.

    Downloads video+audio for clip extraction. The file is written to
    *output_dir* (created if needed) and named after the video id parsed from
    the URL, or ``video`` when no id can be parsed. The blocking yt-dlp work
    runs in the default executor.

    Raises:
        ValueError: if the video is longer than ``settings.max_video_duration``.
        FileNotFoundError: if yt-dlp finished but no output file can be found.
    """
    os.makedirs(output_dir, exist_ok=True)

    video_id = extract_video_id(url) or "video"
    output_template = os.path.join(output_dir, f"{video_id}.%(ext)s")

    loop = asyncio.get_running_loop()
    info = await loop.run_in_executor(None, _download_blocking, url, output_template)
    duration = _checked_duration(info)

    video_path = os.path.join(output_dir, f"{video_id}.mp4")
    if not os.path.exists(video_path):
        # The merge may have produced another container: take whatever finished
        # file carries the id prefix (sorted so the choice is deterministic).
        candidates = [
            name
            for name in sorted(os.listdir(output_dir))
            if name.startswith(video_id) and not name.endswith(_PARTIAL_SUFFIXES)
        ]
        if not candidates:
            raise FileNotFoundError(
                f"yt-dlp produced no output file for {video_id} in {output_dir}"
            )
        video_path = os.path.join(output_dir, candidates[0])

    logger.info(f"Downloaded: {info.get('title')} ({duration}s) -> {video_path}")

    return VideoInfo(
        title=info.get("title", "Unknown"),
        duration=duration,
        video_path=video_path,
        video_id=video_id,
    )


async def extract_audio(video_path: str, output_path: str) -> str:
    """Extract audio from video file for transcription.

    Runs ``ffmpeg`` to write the audio track of *video_path* as MP3
    (libmp3lame, ``-q:a 4``) to *output_path*, overwriting an existing file.

    Returns:
        *output_path*, once ffmpeg has exited successfully.

    Raises:
        RuntimeError: if ffmpeg exits with a non-zero status.
    """
    proc = await asyncio.create_subprocess_exec(
        "ffmpeg", "-i", video_path,
        "-vn", "-acodec", "libmp3lame", "-q:a", "4",
        "-y", output_path,
        # Keep ffmpeg from reading the parent's stdin.
        stdin=asyncio.subprocess.DEVNULL,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    _, stderr = await proc.communicate()
    if proc.returncode != 0:
        # ffmpeg may echo non-UTF-8 bytes (e.g. file names); never let decoding
        # mask the real failure.
        raise RuntimeError(
            f"FFmpeg audio extraction failed: {stderr.decode(errors='replace')}"
        )

    logger.info(f"Extracted audio: {output_path}")
    return output_path