""" Docling Service - Document extraction and processing for the AI stack Integrates with: - AI Orchestrator (Ollama) for summarization - RunPod Whisper for audio transcription - Semantic Search for indexing extracted content """ import os import asyncio import tempfile import base64 import hashlib from pathlib import Path from datetime import datetime from typing import Optional, List, Dict, Any, Literal from enum import Enum import httpx from fastapi import FastAPI, HTTPException, UploadFile, File, Form, BackgroundTasks from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import JSONResponse, HTMLResponse from pydantic import BaseModel, HttpUrl # Docling imports from docling.document_converter import DocumentConverter from docling.datamodel.base_models import InputFormat from docling.datamodel.pipeline_options import PdfPipelineOptions from docling.document_converter import PdfFormatOption # Config from environment AI_ORCHESTRATOR_URL = os.getenv("AI_ORCHESTRATOR_URL", "http://ai-orchestrator:8080") SEMANTIC_SEARCH_URL = os.getenv("SEMANTIC_SEARCH_URL", "http://semantic-search:8000") RUNPOD_API_KEY = os.getenv("RUNPOD_API_KEY", "") RUNPOD_WHISPER_ENDPOINT = os.getenv("RUNPOD_WHISPER_ENDPOINT", "lrtisuv8ixbtub") # Supported formats DOCUMENT_FORMATS = {".pdf", ".docx", ".pptx", ".xlsx", ".html", ".md", ".txt", ".epub"} IMAGE_FORMATS = {".png", ".jpg", ".jpeg", ".tiff", ".bmp", ".gif"} AUDIO_FORMATS = {".mp3", ".wav", ".m4a", ".ogg", ".flac", ".webm"} app = FastAPI( title="Docling Service", description="Document extraction and processing service using Docling", version="1.0.0", ) app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # Initialize document converter with optimized settings pipeline_options = PdfPipelineOptions() pipeline_options.do_ocr = True pipeline_options.do_table_structure = True converter = DocumentConverter( format_options={ InputFormat.PDF: PdfFormatOption(pipeline_options=pipeline_options) } ) # Track processing stats stats = { "documents_processed": 0, "pages_extracted": 0, "audio_transcribed": 0, "urls_fetched": 0, "errors": 0, } class OutputFormat(str, Enum): MARKDOWN = "markdown" JSON = "json" TEXT = "text" HTML = "html" class SummarizeStyle(str, Enum): CONCISE = "concise" DETAILED = "detailed" BULLET_POINTS = "bullet_points" TECHNICAL = "technical" ELI5 = "eli5" # Explain like I'm 5 class ExtractRequest(BaseModel): """Request to extract content from a URL or base64-encoded file""" url: Optional[HttpUrl] = None base64_content: Optional[str] = None filename: Optional[str] = None output_format: OutputFormat = OutputFormat.MARKDOWN summarize: bool = False summarize_style: SummarizeStyle = SummarizeStyle.CONCISE index_to_search: bool = False metadata: Optional[Dict[str, Any]] = None class TranscribeRequest(BaseModel): """Request to transcribe audio""" url: Optional[HttpUrl] = None base64_content: Optional[str] = None language: Optional[str] = None # Auto-detect if not specified summarize: bool = False summarize_style: SummarizeStyle = SummarizeStyle.CONCISE class BatchExtractRequest(BaseModel): """Batch extraction request""" items: List[ExtractRequest] class ExtractionResult(BaseModel): """Result of document extraction""" success: bool source: str content: Optional[str] = None format: OutputFormat metadata: Dict[str, Any] = {} summary: Optional[str] = None indexed: bool = False error: Optional[str] = None # ============== Helper Functions ============== def 
def get_file_extension(filename: str) -> str:
    """Get lowercase file extension"""
    return Path(filename).suffix.lower()


def generate_doc_id(source: str, content: str) -> str:
    """Generate a unique document ID"""
    hash_input = f"{source}:{content[:1000]}"
    return hashlib.sha256(hash_input.encode()).hexdigest()[:16]


async def fetch_url_content(url: str) -> tuple[bytes, str]:
    """Fetch content from URL, return bytes and detected filename"""
    async with httpx.AsyncClient(follow_redirects=True, timeout=60) as client:
        resp = await client.get(url)
        resp.raise_for_status()

        # Try to get filename from headers or URL
        content_disposition = resp.headers.get("content-disposition", "")
        if "filename=" in content_disposition:
            filename = content_disposition.split("filename=")[1].strip('"\'')
        else:
            filename = url.split("/")[-1].split("?")[0] or "document"

        return resp.content, filename


async def transcribe_audio_runpod(audio_data: bytes, language: Optional[str] = None) -> dict:
    """Transcribe audio using RunPod Whisper endpoint"""
    if not RUNPOD_API_KEY:
        raise HTTPException(status_code=500, detail="RunPod API key not configured")

    # Convert audio to base64
    audio_base64 = base64.b64encode(audio_data).decode()

    payload = {
        "input": {
            "audio_base64": audio_base64,
        }
    }
    if language:
        payload["input"]["language"] = language

    async with httpx.AsyncClient(timeout=300) as client:
        # Submit job
        resp = await client.post(
            f"https://api.runpod.ai/v2/{RUNPOD_WHISPER_ENDPOINT}/run",
            headers={
                "Authorization": f"Bearer {RUNPOD_API_KEY}",
                "Content-Type": "application/json",
            },
            json=payload,
        )
        result = resp.json()

        if "error" in result:
            raise HTTPException(status_code=500, detail=f"RunPod error: {result['error']}")

        job_id = result.get("id")
        if not job_id:
            raise HTTPException(status_code=500, detail="No job ID returned from RunPod")

        # Poll for completion
        for _ in range(120):  # Max 10 minutes
            await asyncio.sleep(5)
            status_resp = await client.get(
                f"https://api.runpod.ai/v2/{RUNPOD_WHISPER_ENDPOINT}/status/{job_id}",
                headers={"Authorization": f"Bearer {RUNPOD_API_KEY}"},
            )
            status_data = status_resp.json()

            if status_data.get("status") == "COMPLETED":
                return status_data.get("output", {})
            elif status_data.get("status") in ["FAILED", "CANCELLED"]:
                raise HTTPException(
                    status_code=500,
                    detail=f"Transcription failed: {status_data.get('error', 'Unknown error')}",
                )

        raise HTTPException(status_code=504, detail="Transcription timed out")


async def summarize_with_ollama(content: str, style: SummarizeStyle) -> str:
    """Summarize content using AI Orchestrator (Ollama)"""
    style_prompts = {
        SummarizeStyle.CONCISE: "Provide a concise 2-3 sentence summary of the following content:",
        SummarizeStyle.DETAILED: "Provide a detailed summary of the following content, covering all main points:",
        SummarizeStyle.BULLET_POINTS: "Summarize the following content as bullet points:",
        SummarizeStyle.TECHNICAL: "Provide a technical summary of the following content, focusing on key technical details:",
        SummarizeStyle.ELI5: "Explain the following content in simple terms that a child could understand:",
    }

    prompt = f"{style_prompts[style]}\n\n{content[:8000]}"  # Limit content for context window

    async with httpx.AsyncClient(timeout=120) as client:
        try:
            resp = await client.post(
                f"{AI_ORCHESTRATOR_URL}/api/generate/text",
                json={
                    "prompt": prompt,
                    "model": "llama3.2",
                    "max_tokens": 1024,
                    "priority": "low",  # Use free Ollama
                },
            )
            result = resp.json()
            return result.get("response", "")
        except Exception as e:
            return f"[Summarization failed: {str(e)}]"
async def index_to_semantic_search(
    doc_id: str,
    content: str,
    source: str,
    metadata: Dict[str, Any],
) -> bool:
    """Index document to semantic search service"""
    async with httpx.AsyncClient(timeout=30) as client:
        try:
            resp = await client.post(
                f"{SEMANTIC_SEARCH_URL}/index",
                json={
                    "id": doc_id,
                    "content": content,
                    "metadata": {
                        "source": source,
                        "indexed_at": datetime.now().isoformat(),
                        **metadata,
                    },
                },
            )
            return resp.status_code == 200
        except Exception:
            return False


def extract_with_docling(file_path: Path, output_format: OutputFormat) -> tuple[str, dict]:
    """Extract content from document using Docling"""
    result = converter.convert(str(file_path))
    doc = result.document

    # Get metadata
    metadata = {
        "pages": len(doc.pages) if hasattr(doc, "pages") else 0,
        "tables": len(doc.tables) if hasattr(doc, "tables") else 0,
        "figures": len(doc.pictures) if hasattr(doc, "pictures") else 0,
    }

    # Export in requested format
    if output_format == OutputFormat.MARKDOWN:
        content = doc.export_to_markdown()
    elif output_format == OutputFormat.JSON:
        content = doc.export_to_dict()
    elif output_format == OutputFormat.HTML:
        content = doc.export_to_html()
    else:  # TEXT
        content = doc.export_to_markdown()  # Markdown is readable as plain text

    return content if isinstance(content, str) else str(content), metadata


# ============== API Endpoints ==============

@app.get("/", response_class=HTMLResponse)
async def dashboard():
    """Web interface for document processing"""
    html = """
Extract, summarize, and index documents into your knowledge base
Drop a file here or click to browse
PDF, DOCX, PPTX, XLSX, HTML, MD, TXT, EPUB, images, audio