"""
PDF OCR Service

Converts image-based PDFs to searchable text PDFs using ocrmypdf.
"""

import asyncio
import os
import re
import shutil
import subprocess
import tempfile
import uuid
from pathlib import Path
from typing import Optional

from fastapi import FastAPI, UploadFile, File, HTTPException, BackgroundTasks
from fastapi.responses import FileResponse, JSONResponse
from fastapi.middleware.cors import CORSMiddleware
import aiofiles

app = FastAPI(
    title="PDF OCR Service",
    description="Convert scanned/image-based PDFs to searchable text PDFs",
    version="1.0.0",
)

# NOTE(review): a wildcard origin combined with allow_credentials=True is a
# very permissive CORS policy. It is kept as-is so existing browser clients
# keep working; tighten allow_origins if this service is ever exposed beyond
# a trusted network.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

UPLOAD_DIR = Path("/app/uploads")
PROCESSED_DIR = Path("/app/processed")

# Ensure directories exist (parents=True so a missing /app does not crash
# the import).
UPLOAD_DIR.mkdir(parents=True, exist_ok=True)
PROCESSED_DIR.mkdir(parents=True, exist_ok=True)

# Upper bound for a single ocrmypdf run.
OCR_TIMEOUT_SECONDS = 600
# Upper bound for the "--version" probes used by the health check.
_VERSION_CHECK_TIMEOUT_SECONDS = 10
# Uploads are streamed to disk in chunks of this size instead of being read
# into memory in one piece.
_UPLOAD_CHUNK_SIZE = 1024 * 1024

# Language specs such as "eng", "chi_sim", "eng+deu" or "script/Latin".
# Must start with an alphanumeric character (so it can never look like a
# command-line option) and may not contain dots (so it can never contain "..").
_LANGUAGE_RE = re.compile(r"[A-Za-z0-9][A-Za-z0-9_+/-]*")
# Job IDs are lowercase hex: 32 characters for new jobs, 8 for jobs created
# by earlier versions of this service.
_JOB_ID_RE = re.compile(r"[0-9a-f]{8,32}")


def _safe_pdf_name(filename: Optional[str]) -> str:
    """Validate a client-supplied file name and return its bare base name.

    Raises HTTPException(400) when the name is missing, does not end in
    ".pdf", or contains a NUL byte. Directory components (POSIX or Windows
    style) are stripped so the result can never escape the storage directory.
    """
    if not filename or not filename.lower().endswith(".pdf"):
        raise HTTPException(status_code=400, detail="Only PDF files are supported")
    name = os.path.basename(filename.replace("\\", "/"))
    if "\x00" in name:
        raise HTTPException(status_code=400, detail="Only PDF files are supported")
    return name


def _validate_language(language: str) -> str:
    """Return *language* unchanged, or raise HTTPException(400) if malformed."""
    if not _LANGUAGE_RE.fullmatch(language):
        raise HTTPException(status_code=400, detail="Invalid OCR language")
    return language


def _remove_files(*paths: Path) -> None:
    """Best-effort deletion of temporary files; missing files are ignored."""
    for path in paths:
        try:
            path.unlink()
        except OSError:
            # Cleanup must never mask the real result of a request.
            pass


def _write_status(status_path: Path, text: str) -> None:
    """Atomically replace the contents of a job status file with *text*."""
    tmp_path = status_path.with_name(status_path.name + ".tmp")
    with open(tmp_path, "w", encoding="utf-8") as f:
        f.write(text)
    # os.replace is atomic, so a concurrent reader never sees a partial file.
    os.replace(tmp_path, status_path)


def _tool_version(command: str, first_line_only: bool) -> str:
    """Return the "--version" output of *command*, or "not available"."""
    try:
        result = subprocess.run(
            [command, "--version"],
            capture_output=True,
            text=True,
            timeout=_VERSION_CHECK_TIMEOUT_SECONDS,
        )
    except (OSError, subprocess.SubprocessError):
        return "not available"
    if first_line_only:
        return result.stdout.split("\n")[0]
    return result.stdout.strip()


async def _run_blocking(func, *args):
    """Run a blocking callable in the default executor and await its result."""
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, func, *args)


async def _save_upload(file: UploadFile, destination: Path) -> None:
    """Stream an uploaded file to *destination* in fixed-size chunks."""
    async with aiofiles.open(destination, "wb") as out:
        while True:
            chunk = await file.read(_UPLOAD_CHUNK_SIZE)
            if not chunk:
                break
            await out.write(chunk)


async def _read_status(job_id: str) -> str:
    """Return the raw status text for *job_id*.

    Raises HTTPException(404) when the ID is malformed or no such job exists.
    """
    if not _JOB_ID_RE.fullmatch(job_id):
        raise HTTPException(status_code=404, detail="Job not found")
    status_path = PROCESSED_DIR / f"{job_id}.status"
    try:
        async with aiofiles.open(
            status_path, "r", encoding="utf-8", errors="replace"
        ) as f:
            return await f.read()
    except FileNotFoundError:
        raise HTTPException(status_code=404, detail="Job not found") from None


def run_ocr(input_path: Path, output_path: Path, language: str = "eng") -> dict:
    """Run ocrmypdf on a PDF file.

    Args:
        input_path: PDF to process.
        output_path: Where the searchable PDF is written.
        language: Language spec passed to ocrmypdf's --language option.

    Returns:
        A dict with "success" (bool) and "message" (str); failed runs of
        ocrmypdf additionally carry "returncode". This function never raises:
        every error is reported through the returned dict.
    """
    command = [
        "ocrmypdf",
        "--skip-text",  # Skip pages that already have text
        "--optimize", "1",  # Light optimization
        "--language", language,
        "--output-type", "pdf",
        "--jobs", "2",  # Parallel processing
        str(input_path),
        str(output_path),
    ]
    try:
        result = subprocess.run(
            command,
            capture_output=True,
            text=True,
            timeout=OCR_TIMEOUT_SECONDS,
        )
        if result.returncode == 0:
            return {"success": True, "message": "OCR completed successfully"}
        if result.returncode == 6:
            # Exit code 6 means the PDF already has text: hand back the
            # original file unchanged.
            shutil.copy(input_path, output_path)
            return {
                "success": True,
                "message": "PDF already contains text, copied as-is",
            }
        return {
            "success": False,
            "message": f"OCR failed: {result.stderr}",
            "returncode": result.returncode,
        }
    except subprocess.TimeoutExpired:
        return {
            "success": False,
            "message": f"OCR timed out after {OCR_TIMEOUT_SECONDS // 60} minutes",
        }
    except Exception as e:
        # Deliberate catch-all boundary: callers (including background
        # tasks) rely on always getting a result dict back.
        return {"success": False, "message": str(e)}


@app.get("/")
async def root():
    """Describe the service and its endpoints."""
    return {
        "service": "PDF OCR Service",
        "version": "1.0.0",
        "endpoints": {
            "POST /ocr": "Upload PDF and get OCR'd version",
            "POST /ocr/async": "Upload PDF for background OCR processing",
            "GET /ocr/status/{job_id}": "Check the status of a background job",
            "GET /ocr/download/{job_id}": "Download the result of a background job",
            "GET /health": "Health check",
        },
    }


@app.get("/health")
async def health():
    """Report service health and the versions of the OCR tools."""
    # The probes spawn subprocesses, so they run in the executor rather than
    # blocking the event loop.
    ocrmypdf_version = await _run_blocking(_tool_version, "ocrmypdf", False)
    tesseract_version = await _run_blocking(_tool_version, "tesseract", True)

    return {
        "status": "healthy",
        "ocrmypdf": ocrmypdf_version,
        "tesseract": tesseract_version,
    }


@app.post("/ocr")
async def ocr_pdf(
    file: UploadFile = File(...),
    language: str = "eng"
):
    """
    Upload a PDF and receive an OCR'd searchable version.

    - **file**: PDF file to process
    - **language**: OCR language (default: eng)

    Returns the processed PDF file.
    """
    safe_name = _safe_pdf_name(file.filename)
    _validate_language(language)

    # A random token keeps concurrent uploads of the same file name from
    # overwriting each other on disk; the client still sees "ocr_<name>".
    token = uuid.uuid4().hex
    input_path = UPLOAD_DIR / f"{token}_{safe_name}"
    output_filename = f"ocr_{safe_name}"
    output_path = PROCESSED_DIR / f"ocr_{token}_{safe_name}"

    try:
        await _save_upload(file, input_path)
        # OCR can take minutes; run it off the event loop.
        result = await _run_blocking(run_ocr, input_path, output_path, language)
    finally:
        # The upload is not needed once OCR has finished or failed.
        _remove_files(input_path)

    if not result["success"]:
        _remove_files(output_path)
        raise HTTPException(status_code=500, detail=result["message"])

    # Check if output was created
    if not output_path.exists():
        raise HTTPException(
            status_code=500, detail="OCR completed but output file not found"
        )

    # Delete the processed file once the response has been sent.
    cleanup = BackgroundTasks()
    cleanup.add_task(_remove_files, output_path)

    return FileResponse(
        path=output_path,
        filename=output_filename,
        media_type="application/pdf",
        headers={
            "X-OCR-Status": result["message"]
        },
        background=cleanup,
    )


@app.post("/ocr/async")
async def ocr_pdf_async(
    file: UploadFile = File(...),
    language: str = "eng",
    background_tasks: BackgroundTasks = None
):
    """
    Upload a PDF for async OCR processing.

    Returns a job ID to check status later.
    """
    safe_name = _safe_pdf_name(file.filename)
    _validate_language(language)

    # The job ID is the only thing protecting the result from other clients,
    # so use the full 128 random bits rather than a short prefix.
    job_id = uuid.uuid4().hex

    input_path = UPLOAD_DIR / f"{job_id}_{safe_name}"
    output_path = PROCESSED_DIR / f"ocr_{job_id}_{safe_name}"
    status_path = PROCESSED_DIR / f"{job_id}.status"

    try:
        await _save_upload(file, input_path)
    except BaseException:
        # Do not leave a partial upload behind.
        _remove_files(input_path)
        raise

    # Create status file
    async with aiofiles.open(status_path, "w", encoding="utf-8") as f:
        await f.write("processing")

    def process_ocr() -> None:
        """Run OCR for this job and record the outcome in its status file."""
        result = run_ocr(input_path, output_path, language)
        _remove_files(input_path)
        if result["success"]:
            _write_status(status_path, f"completed:{output_path.name}")
        else:
            _remove_files(output_path)
            _write_status(status_path, f"failed:{result['message']}")

    if background_tasks is not None:
        background_tasks.add_task(process_ocr)
    else:
        # Called directly rather than through FastAPI's dependency
        # injection: fall back to the default executor (fire and forget).
        asyncio.get_running_loop().run_in_executor(None, process_ocr)

    return {
        "job_id": job_id,
        "status": "processing",
        "check_status": f"/ocr/status/{job_id}",
        "download": f"/ocr/download/{job_id}",
    }


@app.get("/ocr/status/{job_id}")
async def get_ocr_status(job_id: str):
    """Check the status of an async OCR job"""
    status = await _read_status(job_id)

    if status == "processing":
        return {"job_id": job_id, "status": "processing"}
    if status.startswith("completed:"):
        return {
            "job_id": job_id,
            "status": "completed",
            "download": f"/ocr/download/{job_id}",
        }
    if status.startswith("failed:"):
        error = status.partition(":")[2]
        return {"job_id": job_id, "status": "failed", "error": error}

    return {"job_id": job_id, "status": "unknown"}


@app.get("/ocr/download/{job_id}")
async def download_ocr_result(job_id: str):
    """Download the OCR'd PDF for a completed job"""
    status = await _read_status(job_id)

    if not status.startswith("completed:"):
        raise HTTPException(status_code=400, detail="Job not completed yet")

    filename = status.partition(":")[2]
    # Defence in depth: only ever serve a file directly inside PROCESSED_DIR,
    # whatever the status file says.
    output_path = PROCESSED_DIR / Path(filename).name

    if not output_path.is_file():
        raise HTTPException(status_code=404, detail="Output file not found")

    return FileResponse(
        path=output_path,
        filename=filename,
        media_type="application/pdf",
    )