From 4ed909dbc47c48b5d05dcde09e6cb94070306014 Mon Sep 17 00:00:00 2001
From: Jeff Emmett
Date: Wed, 26 Nov 2025 20:47:42 -0800
Subject: [PATCH] Initial docling-service: document extraction for AI stack
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

- FastAPI service using IBM Docling for document extraction
- Supports PDF, DOCX, PPTX, XLSX, HTML, images with OCR
- Integrates with AI Orchestrator (Ollama) for summarization
- Routes audio to RunPod Whisper for transcription
- Optional indexing to Semantic Search service
- Docker + Traefik configuration for RS 8000 deployment
- Python client library included
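
Example usage via the bundled client (illustrative; the hostname comes from
the Traefik labels in docker-compose.yml, the flags from server.py):

    from client import DoclingClient

    client = DoclingClient("http://docs.jeffemmett.com")
    result = await client.extract_url(
        "https://example.com/doc.pdf",
        summarize=True,
        summarize_style="bullet_points",
    )
    print(result["summary"])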

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude
---
 .env.example       |   4 +
 .gitignore         |  12 +
 Dockerfile         |  40 +++
 client.py          | 207 +++++++++++++++
 docker-compose.yml |  58 +++++
 requirements.txt   |  15 ++
 server.py          | 618 +++++++++++++++++++++++++++++++++++++++++++++
 7 files changed, 954 insertions(+)
 create mode 100644 .env.example
 create mode 100644 .gitignore
 create mode 100644 Dockerfile
 create mode 100644 client.py
 create mode 100644 docker-compose.yml
 create mode 100644 requirements.txt
 create mode 100644 server.py

diff --git a/.env.example b/.env.example
new file mode 100644
index 0000000..72209ad
--- /dev/null
+++ b/.env.example
@@ -0,0 +1,4 @@
+RUNPOD_API_KEY=your_runpod_api_key_here
+AI_ORCHESTRATOR_URL=http://ai-orchestrator:8080
+SEMANTIC_SEARCH_URL=http://semantic-search:8000
+RUNPOD_WHISPER_ENDPOINT=lrtisuv8ixbtub
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..1f7ab95
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,12 @@
+__pycache__/
+*.pyc
+.env
+.venv/
+venv/
+*.egg-info/
+dist/
+build/
+.pytest_cache/
+.coverage
+htmlcov/
+*.log
diff --git a/Dockerfile b/Dockerfile
new file mode 100644
index 0000000..3cf9489
--- /dev/null
+++ b/Dockerfile
@@ -0,0 +1,40 @@
+FROM python:3.11-slim
+
+WORKDIR /app
+
+# Install system dependencies for Docling and OCR
+# (libgl1 replaces the transitional libgl1-mesa-glx, which is gone in Debian bookworm)
+RUN apt-get update && apt-get install -y --no-install-recommends \
+    build-essential \
+    libgl1 \
+    libglib2.0-0 \
+    libsm6 \
+    libxext6 \
+    libxrender-dev \
+    libgomp1 \
+    poppler-utils \
+    tesseract-ocr \
+    libtesseract-dev \
+    ffmpeg \
+    && rm -rf /var/lib/apt/lists/*
+
+# Copy requirements first for layer caching
+COPY requirements.txt .
+RUN pip install --no-cache-dir -r requirements.txt
+
+# Copy application code
+COPY server.py .
+
+# Create non-root user
+RUN useradd -m -u 1000 appuser && chown -R appuser:appuser /app
+USER appuser
+
+# Download Docling models at build time (optional, reduces first-run latency).
+# Run as appuser so the cache is baked into /home/appuser/.cache, matching the
+# volume mount in docker-compose.yml.
+RUN python -c "from docling.document_converter import DocumentConverter; DocumentConverter()" || true
+
+EXPOSE 8081
+
+# Health check
+HEALTHCHECK --interval=30s --timeout=10s --start-period=60s --retries=3 \
+    CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8081/health')" || exit 1
+
+CMD ["uvicorn", "server:app", "--host", "0.0.0.0", "--port", "8081"]
diff --git a/client.py b/client.py
new file mode 100644
index 0000000..43f00d1
--- /dev/null
+++ b/client.py
@@ -0,0 +1,207 @@
+"""
+Docling Service Client - Use this to integrate with other services
+
+Example usage:
+    from client import DoclingClient
+
+    client = DoclingClient("http://docs.jeffemmett.com")
+
+    # Extract from URL
+    result = await client.extract_url("https://example.com/doc.pdf")
+
+    # Extract with summarization
+    result = await client.extract_url(
+        "https://example.com/doc.pdf",
+        summarize=True,
+        summarize_style="bullet_points"
+    )
+
+    # Transcribe audio
+    result = await client.transcribe_url("https://example.com/audio.mp3")
+"""
+
+import httpx
+import base64
+from pathlib import Path
+from typing import Optional, Dict, Any, Literal
+
+
+OutputFormat = Literal["markdown", "json", "text", "html"]
+SummarizeStyle = Literal["concise", "detailed", "bullet_points", "technical", "eli5"]
+
+
+class DoclingClient:
+    """Async client for Docling Service"""
+
+    def __init__(self, base_url: str = "http://localhost:8081", timeout: float = 300):
+        self.base_url = base_url.rstrip("/")
+        self.timeout = timeout
+
+    async def health(self) -> dict:
+        """Check service health"""
+        async with httpx.AsyncClient(timeout=10) as client:
+            resp = await client.get(f"{self.base_url}/health")
+            return resp.json()
+
+    async def stats(self) -> dict:
+        """Get processing statistics"""
+        async with httpx.AsyncClient(timeout=10) as client:
+            resp = await client.get(f"{self.base_url}/stats")
+            return resp.json()
+
+    async def extract_url(
+        self,
+        url: str,
+        output_format: OutputFormat = "markdown",
+        summarize: bool = False,
+        summarize_style: SummarizeStyle = "concise",
+        index_to_search: bool = False,
+        metadata: Optional[Dict[str, Any]] = None,
+    ) -> dict:
+        """Extract content from a URL"""
+        async with httpx.AsyncClient(timeout=self.timeout) as client:
+            resp = await client.post(
+                f"{self.base_url}/extract",
+                json={
+                    "url": url,
+                    "output_format": output_format,
+                    "summarize": summarize,
+                    "summarize_style": summarize_style,
+                    "index_to_search": index_to_search,
+                    "metadata": metadata,
+                },
+            )
+            return resp.json()
+
+    async def extract_file(
+        self,
+        file_path: str,
+        output_format: OutputFormat = "markdown",
+        summarize: bool = False,
+        summarize_style: SummarizeStyle = "concise",
+        index_to_search: bool = False,
+        metadata: Optional[Dict[str, Any]] = None,
+    ) -> dict:
+        """Extract content from a local file"""
+        path = Path(file_path)
+        content = base64.b64encode(path.read_bytes()).decode()
+
+        async with httpx.AsyncClient(timeout=self.timeout) as client:
+            resp = await client.post(
+                f"{self.base_url}/extract",
+                json={
+                    "base64_content": content,
+                    "filename": path.name,
+                    "output_format": output_format,
+                    "summarize": summarize,
+                    "summarize_style": summarize_style,
+                    "index_to_search": index_to_search,
+                    "metadata": metadata,
+                },
+            )
+            return resp.json()
+
+    async def extract_bytes(
+        self,
+        content: bytes,
+        filename: str,
+        output_format: OutputFormat = "markdown",
+        summarize: bool = False,
+        summarize_style: SummarizeStyle = "concise",
+        index_to_search: bool = False,
+        metadata: Optional[Dict[str, Any]] = None,
+    ) -> dict:
+        """Extract content from bytes"""
+        b64_content = base64.b64encode(content).decode()
+
+        async with httpx.AsyncClient(timeout=self.timeout) as client:
+            resp = await client.post(
+                f"{self.base_url}/extract",
+                json={
+                    "base64_content": b64_content,
+                    "filename": filename,
+                    "output_format": output_format,
+                    "summarize": summarize,
+                    "summarize_style": summarize_style,
+                    "index_to_search": index_to_search,
+                    "metadata": metadata,
+                },
+            )
+            return resp.json()
+
+    async def transcribe_url(
+        self,
+        url: str,
+        language: Optional[str] = None,
+        summarize: bool = False,
+        summarize_style: SummarizeStyle = "concise",
+    ) -> dict:
+        """Transcribe audio from URL"""
+        async with httpx.AsyncClient(timeout=self.timeout) as client:
+            resp = await client.post(
+                f"{self.base_url}/transcribe",
+                json={
+                    "url": url,
+                    "language": language,
+                    "summarize": summarize,
+                    "summarize_style": summarize_style,
+                },
+            )
+            return resp.json()
+
+    async def transcribe_file(
+        self,
+        file_path: str,
+        language: Optional[str] = None,
+        summarize: bool = False,
+        summarize_style: SummarizeStyle = "concise",
+    ) -> dict:
+        """Transcribe audio from local file"""
+        path = Path(file_path)
+        content = base64.b64encode(path.read_bytes()).decode()
+
+        async with httpx.AsyncClient(timeout=self.timeout) as client:
+            resp = await client.post(
+                f"{self.base_url}/transcribe",
+                json={
+                    "base64_content": content,
+                    "language": language,
+                    "summarize": summarize,
+                    "summarize_style": summarize_style,
+                },
+            )
+            return resp.json()
+
+    async def preview_url(self, url: str) -> dict:
+        """Quick preview of URL content"""
+        async with httpx.AsyncClient(timeout=60) as client:
+            resp = await client.post(
+                f"{self.base_url}/url/preview",
+                json=url,
+            )
+            return resp.json()
+
+
+# Sync wrapper for convenience
+class DoclingClientSync:
+    """Synchronous client wrapper"""
+
+    def __init__(self, base_url: str = "http://localhost:8081", timeout: float = 300):
+        self.base_url = base_url.rstrip("/")
+        self.timeout = timeout
+
+    def extract_url(self, url: str, **kwargs) -> dict:
+        with httpx.Client(timeout=self.timeout) as client:
+            resp = client.post(
+                f"{self.base_url}/extract",
+                json={"url": url, **kwargs},
+            )
+            return resp.json()
+
+    def transcribe_url(self, url: str, **kwargs) -> dict:
+        with httpx.Client(timeout=self.timeout) as client:
+            resp = client.post(
+                f"{self.base_url}/transcribe",
+                json={"url": url, **kwargs},
+            )
+            return resp.json()
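+
+
+if __name__ == "__main__":
+    # Minimal smoke test (illustrative): assumes a running service instance on
+    # localhost:8081 and a reachable document URL; swap in real values.
+    sync_client = DoclingClientSync("http://localhost:8081")
+    result = sync_client.extract_url("https://example.com/doc.pdf", summarize=True)
+    print(result)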
diff --git a/docker-compose.yml b/docker-compose.yml
new file mode 100644
index 0000000..3ae1bf4
--- /dev/null
+++ b/docker-compose.yml
@@ -0,0 +1,58 @@
+services:
+  docling-service:
+    build:
+      context: .
+      dockerfile: Dockerfile
+    image: docling-service:latest
+    container_name: docling-service
+    restart: unless-stopped
+    environment:
+      # Connect to AI orchestrator for summarization (Ollama)
+      - AI_ORCHESTRATOR_URL=http://ai-orchestrator:8080
+      # Connect to semantic search for indexing
+      - SEMANTIC_SEARCH_URL=http://semantic-search:8000
+      # RunPod for Whisper transcription
+      - RUNPOD_API_KEY=${RUNPOD_API_KEY}
+      - RUNPOD_WHISPER_ENDPOINT=lrtisuv8ixbtub
+    labels:
+      # Traefik auto-discovery
+      - "traefik.enable=true"
+      # HTTP router
+      - "traefik.http.routers.docling.rule=Host(`docs.jeffemmett.com`)"
+      - "traefik.http.routers.docling.entrypoints=web"
+      - "traefik.http.services.docling.loadbalancer.server.port=8081"
+      # HTTPS router
+      - "traefik.http.routers.docling-secure.rule=Host(`docs.jeffemmett.com`)"
+      - "traefik.http.routers.docling-secure.entrypoints=websecure"
+      - "traefik.http.routers.docling-secure.tls=true"
+      # Health check for Traefik
+      - "traefik.http.services.docling.loadbalancer.healthcheck.path=/health"
+      - "traefik.http.services.docling.loadbalancer.healthcheck.interval=30s"
+    networks:
+      - traefik-public
+      - ai-internal
+    volumes:
+      # Cache for Docling models (persists across restarts)
+      - docling-cache:/home/appuser/.cache
+    deploy:
+      resources:
+        limits:
+          memory: 8G
+        reservations:
+          memory: 2G
+    healthcheck:
+      test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8081/health')"]
+      interval: 30s
+      timeout: 10s
+      retries: 3
+      start_period: 60s
+
+volumes:
+  docling-cache:
+    driver: local
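+
+# Both networks are declared external, so they must exist before
+# `docker compose up` (typically created by the Traefik / AI stack).
+# If missing, create them once:
+#   docker network create traefik-public
+#   docker network create ai-internal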
+networks:
+  traefik-public:
+    external: true
+  ai-internal:
+    external: true
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..c3489b2
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,15 @@
+fastapi>=0.109.0
+uvicorn[standard]>=0.27.0
+httpx>=0.26.0
+pydantic>=2.0.0
+python-multipart>=0.0.6
+
+# Docling and dependencies
+docling>=2.0.0
+docling-core>=2.0.0
+
+# OCR support (optional, for enhanced PDF/image processing)
+easyocr>=1.7.0
+
+# For audio file handling
+pydub>=0.25.1
diff --git a/server.py b/server.py
new file mode 100644
index 0000000..d7aac35
--- /dev/null
+++ b/server.py
@@ -0,0 +1,618 @@
+"""
+Docling Service - Document extraction and processing for the AI stack
+
+Integrates with:
+- AI Orchestrator (Ollama) for summarization
+- RunPod Whisper for audio transcription
+- Semantic Search for indexing extracted content
+"""
+
+import os
+import asyncio
+import base64
+import hashlib
+import json
+import tempfile
+from pathlib import Path
+from datetime import datetime
+from typing import Optional, List, Dict, Any
+from enum import Enum
+
+import httpx
+from fastapi import FastAPI, HTTPException, UploadFile, File, Form, Body, BackgroundTasks
+from fastapi.middleware.cors import CORSMiddleware
+from pydantic import BaseModel, HttpUrl
+
+# Docling imports
+from docling.document_converter import DocumentConverter, PdfFormatOption
+from docling.datamodel.base_models import InputFormat
+from docling.datamodel.pipeline_options import PdfPipelineOptions
+
+# Config from environment
+AI_ORCHESTRATOR_URL = os.getenv("AI_ORCHESTRATOR_URL", "http://ai-orchestrator:8080")
+SEMANTIC_SEARCH_URL = os.getenv("SEMANTIC_SEARCH_URL", "http://semantic-search:8000")
+RUNPOD_API_KEY = os.getenv("RUNPOD_API_KEY", "")
+RUNPOD_WHISPER_ENDPOINT = os.getenv("RUNPOD_WHISPER_ENDPOINT", "lrtisuv8ixbtub")
+
+# Supported formats
+DOCUMENT_FORMATS = {".pdf", ".docx", ".pptx", ".xlsx", ".html", ".md", ".txt", ".epub"}
+IMAGE_FORMATS = {".png", ".jpg", ".jpeg", ".tiff", ".bmp", ".gif"}
+AUDIO_FORMATS = {".mp3", ".wav", ".m4a", ".ogg", ".flac", ".webm"}
+
+app = FastAPI(
+    title="Docling Service",
+    description="Document extraction and processing service using Docling",
+    version="1.0.0",
+)
+
+app.add_middleware(
+    CORSMiddleware,
+    allow_origins=["*"],
+    allow_credentials=True,
+    allow_methods=["*"],
+    allow_headers=["*"],
+)
+
+# Initialize document converter with optimized settings
+pipeline_options = PdfPipelineOptions()
+pipeline_options.do_ocr = True
+pipeline_options.do_table_structure = True
+
+converter = DocumentConverter(
+    format_options={
+        InputFormat.PDF: PdfFormatOption(pipeline_options=pipeline_options)
+    }
+)
+
+# Track processing stats
+stats = {
+    "documents_processed": 0,
+    "pages_extracted": 0,
+    "audio_transcribed": 0,
+    "urls_fetched": 0,
+    "errors": 0,
+}
+
+
+class OutputFormat(str, Enum):
+    MARKDOWN = "markdown"
+    JSON = "json"
+    TEXT = "text"
+    HTML = "html"
+
+
+class SummarizeStyle(str, Enum):
+    CONCISE = "concise"
+    DETAILED = "detailed"
+    BULLET_POINTS = "bullet_points"
+    TECHNICAL = "technical"
+    ELI5 = "eli5"  # Explain like I'm 5
+
+
+class ExtractRequest(BaseModel):
+    """Request to extract content from a URL or base64-encoded file"""
+    url: Optional[HttpUrl] = None
+    base64_content: Optional[str] = None
+    filename: Optional[str] = None
+    output_format: OutputFormat = OutputFormat.MARKDOWN
+    summarize: bool = False
+    summarize_style: SummarizeStyle = SummarizeStyle.CONCISE
+    index_to_search: bool = False
+    metadata: Optional[Dict[str, Any]] = None
+
+
+class TranscribeRequest(BaseModel):
+    """Request to transcribe audio"""
+    url: Optional[HttpUrl] = None
+    base64_content: Optional[str] = None
+    language: Optional[str] = None  # Auto-detect if not specified
+    summarize: bool = False
+    summarize_style: SummarizeStyle = SummarizeStyle.CONCISE
+
+
+class BatchExtractRequest(BaseModel):
+    """Batch extraction request"""
+    items: List[ExtractRequest]
+
+
+class ExtractionResult(BaseModel):
+    """Result of document extraction"""
+    success: bool
+    source: str
+    content: Optional[str] = None
+    format: OutputFormat
+    metadata: Dict[str, Any] = {}
+    summary: Optional[str] = None
+    indexed: bool = False
+    error: Optional[str] = None
+
+
+# ============== Helper Functions ==============
+
+def get_file_extension(filename: str) -> str:
+    """Get lowercase file extension"""
+    return Path(filename).suffix.lower()
+
+
+def generate_doc_id(source: str, content: str) -> str:
+    """Generate a unique document ID"""
+    hash_input = f"{source}:{content[:1000]}"
+    return hashlib.sha256(hash_input.encode()).hexdigest()[:16]
+
+
+async def fetch_url_content(url: str) -> tuple[bytes, str]:
+    """Fetch content from URL, return bytes and detected filename"""
+    async with httpx.AsyncClient(follow_redirects=True, timeout=60) as client:
+        resp = await client.get(url)
+        resp.raise_for_status()
+
+        # Try to get filename from headers or URL
+        content_disposition = resp.headers.get("content-disposition", "")
+        if "filename=" in content_disposition:
+            filename = content_disposition.split("filename=")[1].strip('"\'')
+        else:
+            filename = url.split("/")[-1].split("?")[0] or "document"
+
+        return resp.content, filename
+
+
+async def transcribe_audio_runpod(audio_data: bytes, language: Optional[str] = None) -> dict:
+    """Transcribe audio using RunPod Whisper endpoint"""
+    if not RUNPOD_API_KEY:
+        raise HTTPException(status_code=500, detail="RunPod API key not configured")
+
+    # Convert audio to base64
+    audio_base64 = base64.b64encode(audio_data).decode()
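+
+    # NOTE: the input/output schema used here ("audio_base64" in,
+    # "transcription"/"text" out) is worker-specific, not part of the generic
+    # RunPod API. It matches the Whisper worker deployed at
+    # RUNPOD_WHISPER_ENDPOINT; a different worker image may expect other fields.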
".docx", ".pptx", ".xlsx", ".html", ".md", ".txt", ".epub"} +IMAGE_FORMATS = {".png", ".jpg", ".jpeg", ".tiff", ".bmp", ".gif"} +AUDIO_FORMATS = {".mp3", ".wav", ".m4a", ".ogg", ".flac", ".webm"} + +app = FastAPI( + title="Docling Service", + description="Document extraction and processing service using Docling", + version="1.0.0", +) + +app.add_middleware( + CORSMiddleware, + allow_origins=["*"], + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], +) + +# Initialize document converter with optimized settings +pipeline_options = PdfPipelineOptions() +pipeline_options.do_ocr = True +pipeline_options.do_table_structure = True + +converter = DocumentConverter( + format_options={ + InputFormat.PDF: PdfFormatOption(pipeline_options=pipeline_options) + } +) + +# Track processing stats +stats = { + "documents_processed": 0, + "pages_extracted": 0, + "audio_transcribed": 0, + "urls_fetched": 0, + "errors": 0, +} + + +class OutputFormat(str, Enum): + MARKDOWN = "markdown" + JSON = "json" + TEXT = "text" + HTML = "html" + + +class SummarizeStyle(str, Enum): + CONCISE = "concise" + DETAILED = "detailed" + BULLET_POINTS = "bullet_points" + TECHNICAL = "technical" + ELI5 = "eli5" # Explain like I'm 5 + + +class ExtractRequest(BaseModel): + """Request to extract content from a URL or base64-encoded file""" + url: Optional[HttpUrl] = None + base64_content: Optional[str] = None + filename: Optional[str] = None + output_format: OutputFormat = OutputFormat.MARKDOWN + summarize: bool = False + summarize_style: SummarizeStyle = SummarizeStyle.CONCISE + index_to_search: bool = False + metadata: Optional[Dict[str, Any]] = None + + +class TranscribeRequest(BaseModel): + """Request to transcribe audio""" + url: Optional[HttpUrl] = None + base64_content: Optional[str] = None + language: Optional[str] = None # Auto-detect if not specified + summarize: bool = False + summarize_style: SummarizeStyle = SummarizeStyle.CONCISE + + +class BatchExtractRequest(BaseModel): + """Batch extraction request""" + items: List[ExtractRequest] + + +class ExtractionResult(BaseModel): + """Result of document extraction""" + success: bool + source: str + content: Optional[str] = None + format: OutputFormat + metadata: Dict[str, Any] = {} + summary: Optional[str] = None + indexed: bool = False + error: Optional[str] = None + + +# ============== Helper Functions ============== + +def get_file_extension(filename: str) -> str: + """Get lowercase file extension""" + return Path(filename).suffix.lower() + + +def generate_doc_id(source: str, content: str) -> str: + """Generate a unique document ID""" + hash_input = f"{source}:{content[:1000]}" + return hashlib.sha256(hash_input.encode()).hexdigest()[:16] + + +async def fetch_url_content(url: str) -> tuple[bytes, str]: + """Fetch content from URL, return bytes and detected filename""" + async with httpx.AsyncClient(follow_redirects=True, timeout=60) as client: + resp = await client.get(url) + resp.raise_for_status() + + # Try to get filename from headers or URL + content_disposition = resp.headers.get("content-disposition", "") + if "filename=" in content_disposition: + filename = content_disposition.split("filename=")[1].strip('"\'') + else: + filename = url.split("/")[-1].split("?")[0] or "document" + + return resp.content, filename + + +async def transcribe_audio_runpod(audio_data: bytes, language: Optional[str] = None) -> dict: + """Transcribe audio using RunPod Whisper endpoint""" + if not RUNPOD_API_KEY: + raise HTTPException(status_code=500, detail="RunPod API key 
not configured") + + # Convert audio to base64 + audio_base64 = base64.b64encode(audio_data).decode() + + payload = { + "input": { + "audio_base64": audio_base64, + } + } + if language: + payload["input"]["language"] = language + + async with httpx.AsyncClient(timeout=300) as client: + # Submit job + resp = await client.post( + f"https://api.runpod.ai/v2/{RUNPOD_WHISPER_ENDPOINT}/run", + headers={ + "Authorization": f"Bearer {RUNPOD_API_KEY}", + "Content-Type": "application/json", + }, + json=payload, + ) + result = resp.json() + + if "error" in result: + raise HTTPException(status_code=500, detail=f"RunPod error: {result['error']}") + + job_id = result.get("id") + if not job_id: + raise HTTPException(status_code=500, detail="No job ID returned from RunPod") + + # Poll for completion + for _ in range(120): # Max 10 minutes + await asyncio.sleep(5) + status_resp = await client.get( + f"https://api.runpod.ai/v2/{RUNPOD_WHISPER_ENDPOINT}/status/{job_id}", + headers={"Authorization": f"Bearer {RUNPOD_API_KEY}"}, + ) + status_data = status_resp.json() + + if status_data.get("status") == "COMPLETED": + return status_data.get("output", {}) + elif status_data.get("status") in ["FAILED", "CANCELLED"]: + raise HTTPException( + status_code=500, + detail=f"Transcription failed: {status_data.get('error', 'Unknown error')}" + ) + + raise HTTPException(status_code=504, detail="Transcription timed out") + + +async def summarize_with_ollama(content: str, style: SummarizeStyle) -> str: + """Summarize content using AI Orchestrator (Ollama)""" + style_prompts = { + SummarizeStyle.CONCISE: "Provide a concise 2-3 sentence summary of the following content:", + SummarizeStyle.DETAILED: "Provide a detailed summary of the following content, covering all main points:", + SummarizeStyle.BULLET_POINTS: "Summarize the following content as bullet points:", + SummarizeStyle.TECHNICAL: "Provide a technical summary of the following content, focusing on key technical details:", + SummarizeStyle.ELI5: "Explain the following content in simple terms that a child could understand:", + } + + prompt = f"{style_prompts[style]}\n\n{content[:8000]}" # Limit content for context window + + async with httpx.AsyncClient(timeout=120) as client: + try: + resp = await client.post( + f"{AI_ORCHESTRATOR_URL}/api/generate/text", + json={ + "prompt": prompt, + "model": "llama3.2", + "max_tokens": 1024, + "priority": "low", # Use free Ollama + }, + ) + result = resp.json() + return result.get("response", "") + except Exception as e: + return f"[Summarization failed: {str(e)}]" + + +async def index_to_semantic_search( + doc_id: str, + content: str, + source: str, + metadata: Dict[str, Any], +) -> bool: + """Index document to semantic search service""" + async with httpx.AsyncClient(timeout=30) as client: + try: + resp = await client.post( + f"{SEMANTIC_SEARCH_URL}/index", + json={ + "id": doc_id, + "content": content, + "metadata": { + "source": source, + "indexed_at": datetime.now().isoformat(), + **metadata, + }, + }, + ) + return resp.status_code == 200 + except Exception: + return False + + +def extract_with_docling(file_path: Path, output_format: OutputFormat) -> tuple[str, dict]: + """Extract content from document using Docling""" + result = converter.convert(str(file_path)) + doc = result.document + + # Get metadata + metadata = { + "pages": len(doc.pages) if hasattr(doc, "pages") else 0, + "tables": len(doc.tables) if hasattr(doc, "tables") else 0, + "figures": len(doc.pictures) if hasattr(doc, "pictures") else 0, + } + + # Export in 
+
+
+@app.post("/extract/upload", response_model=ExtractionResult)
+async def extract_uploaded_file(
+    file: UploadFile = File(...),
+    output_format: OutputFormat = Form(OutputFormat.MARKDOWN),
+    summarize: bool = Form(False),
+    summarize_style: SummarizeStyle = Form(SummarizeStyle.CONCISE),
+    index_to_search: bool = Form(False),
+):
+    """Extract content from an uploaded file"""
+    try:
+        content_bytes = await file.read()
+        ext = get_file_extension(file.filename or "document")
+
+        if ext in AUDIO_FORMATS:
+            raise HTTPException(
+                status_code=400,
+                detail="Use /transcribe/upload endpoint for audio files"
+            )
+
+        with tempfile.NamedTemporaryFile(suffix=ext, delete=False) as tmp:
+            tmp.write(content_bytes)
+            tmp_path = Path(tmp.name)
+
+        try:
+            content, metadata = extract_with_docling(tmp_path, output_format)
+            stats["documents_processed"] += 1
+            stats["pages_extracted"] += metadata.get("pages", 1)
+
+            summary = None
+            if summarize:
+                summary = await summarize_with_ollama(content, summarize_style)
+
+            indexed = False
+            if index_to_search:
+                doc_id = generate_doc_id(file.filename or "upload", content)
+                indexed = await index_to_semantic_search(
+                    doc_id=doc_id,
+                    content=content,
+                    source=f"upload:{file.filename}",
+                    metadata=metadata,
+                )
+
+            return ExtractionResult(
+                success=True,
+                source=f"upload:{file.filename}",
+                content=content,
+                format=output_format,
+                metadata=metadata,
+                summary=summary,
+                indexed=indexed,
+            )
+        finally:
+            tmp_path.unlink(missing_ok=True)
+
+    except HTTPException:
+        raise
+    except Exception as e:
+        stats["errors"] += 1
+        return ExtractionResult(
+            success=False,
+            source=f"upload:{file.filename}",
+            format=output_format,
+            error=str(e),
+        )
+
+
+@app.post("/transcribe")
+async def transcribe_audio(request: TranscribeRequest):
+    """
+    Transcribe audio using RunPod Whisper.
+
+    Supports: MP3, WAV, M4A, OGG, FLAC, WEBM
+    """
+    try:
+        if request.url:
+            content_bytes, filename = await fetch_url_content(str(request.url))
+            source = str(request.url)
+        elif request.base64_content:
+            content_bytes = base64.b64decode(request.base64_content)
+            source = "base64:audio"
+        else:
+            raise HTTPException(status_code=400, detail="Provide either url or base64_content")
+
+        # Transcribe
+        result = await transcribe_audio_runpod(content_bytes, request.language)
+        stats["audio_transcribed"] += 1
+
+        transcript = result.get("transcription", result.get("text", ""))
+
+        # Summarize if requested
+        summary = None
+        if request.summarize and transcript:
+            summary = await summarize_with_ollama(transcript, request.summarize_style)
+
+        return {
+            "success": True,
+            "source": source,
+            "transcript": transcript,
+            "language": result.get("detected_language"),
+            "duration": result.get("duration"),
+            "summary": summary,
+        }
+
+    except HTTPException:
+        raise
+    except Exception as e:
+        stats["errors"] += 1
+        return {
+            "success": False,
+            "source": str(request.url or "base64"),
+            "error": str(e),
+        }
+
+
+@app.post("/transcribe/upload")
+async def transcribe_uploaded_audio(
+    file: UploadFile = File(...),
+    language: Optional[str] = Form(None),
+    summarize: bool = Form(False),
+    summarize_style: SummarizeStyle = Form(SummarizeStyle.CONCISE),
+):
+    """Transcribe uploaded audio file"""
+    try:
+        content_bytes = await file.read()
+
+        result = await transcribe_audio_runpod(content_bytes, language)
+        stats["audio_transcribed"] += 1
+
+        transcript = result.get("transcription", result.get("text", ""))
+
+        summary = None
+        if summarize and transcript:
+            summary = await summarize_with_ollama(transcript, summarize_style)
+
+        return {
+            "success": True,
+            "source": f"upload:{file.filename}",
+            "transcript": transcript,
+            "language": result.get("detected_language"),
+            "duration": result.get("duration"),
+            "summary": summary,
+        }
+
+    except HTTPException:
+        raise
+    except Exception as e:
+        stats["errors"] += 1
+        return {
+            "success": False,
+            "source": f"upload:{file.filename}",
+            "error": str(e),
+        }
+
+
+@app.post("/batch")
+async def batch_extract(request: BatchExtractRequest, background_tasks: BackgroundTasks):
+    """
+    Batch extract multiple documents.
+    Currently processes items sequentially and returns all results with a
+    job ID; true background processing can be added later.
+    """
+    job_id = hashlib.sha256(str(datetime.now()).encode()).hexdigest()[:16]
+
+    # For now, process synchronously (can be enhanced with Redis queue later)
+    results = []
+    for item in request.items:
+        result = await extract_document(item)
+        results.append(result)
+
+    return {
+        "job_id": job_id,
+        "total": len(request.items),
+        "results": results,
+    }
+
+
+@app.post("/url/preview")
+async def preview_url(url: HttpUrl = Body(...)):
+    """Quick preview of URL content (first 500 chars of markdown)"""
+    # Body(...) reads the bare JSON string posted by client.preview_url() from
+    # the request body; as a plain parameter the scalar would be treated as a
+    # query parameter instead.
+    try:
+        content_bytes, filename = await fetch_url_content(str(url))
+        stats["urls_fetched"] += 1
+
+        ext = get_file_extension(filename)
+
+        with tempfile.NamedTemporaryFile(suffix=ext or ".html", delete=False) as tmp:
+            tmp.write(content_bytes)
+            tmp_path = Path(tmp.name)
+
+        try:
+            content, metadata = extract_with_docling(tmp_path, OutputFormat.MARKDOWN)
+            return {
+                "success": True,
+                "url": str(url),
+                "preview": content[:500] + ("..." if len(content) > 500 else ""),
+                "full_length": len(content),
+                "metadata": metadata,
+            }
+        finally:
+            tmp_path.unlink(missing_ok=True)
+
+    except Exception as e:
+        return {
+            "success": False,
+            "url": str(url),
+            "error": str(e),
+        }
+
+
+if __name__ == "__main__":
+    import uvicorn
+    uvicorn.run(app, host="0.0.0.0", port=8081)
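+
+
+# Ad-hoc smoke tests (illustrative; assumes the service is reachable on
+# localhost:8081 as configured above):
+#
+#   curl http://localhost:8081/health
+#   curl -X POST http://localhost:8081/extract \
+#     -H "Content-Type: application/json" \
+#     -d '{"url": "https://example.com/doc.pdf", "summarize": true}'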