""" rDesign — Document design & PDF generation service. Headless Scribus automation API, following the blender-automation pattern. """ import asyncio import os import uuid import glob as globmod import time from datetime import datetime, timezone from pathlib import Path from typing import Optional import yaml import aiofiles from fastapi import FastAPI, BackgroundTasks, HTTPException, UploadFile, File, Query from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import FileResponse, JSONResponse from pydantic import BaseModel, Field # --------------------------------------------------------------------------- # Config # --------------------------------------------------------------------------- BASE_URL = os.getenv("BASE_URL", "https://scribus.rspace.online") SCRIBUS_PATH = os.getenv("SCRIBUS_PATH", "/usr/bin/scribus") TEMPLATES_DIR = Path(os.getenv("TEMPLATES_DIR", "/app/templates")) OUTPUT_DIR = Path(os.getenv("OUTPUT_DIR", "/app/output")) JOBS_DIR = Path(os.getenv("JOBS_DIR", "/app/jobs")) RSWAG_DESIGNS_PATH = Path(os.getenv("RSWAG_DESIGNS_PATH", "/app/rswag-designs")) # --------------------------------------------------------------------------- # App # --------------------------------------------------------------------------- app = FastAPI( title="rDesign", description="Document design & PDF generation — headless Scribus automation", version="0.1.0", ) app.add_middleware( CORSMiddleware, allow_origins=[ "https://scribus.rspace.online", "https://rswag.online", ], allow_methods=["*"], allow_headers=["*"], ) # --------------------------------------------------------------------------- # In-memory job store (same pattern as blender-automation) # --------------------------------------------------------------------------- jobs: dict[str, dict] = {} # --------------------------------------------------------------------------- # Models # --------------------------------------------------------------------------- class ExportRequest(BaseModel): """Generate a PDF from a Scribus template.""" template: str = Field(..., description="Template slug (directory name under /templates)") output_format: str = Field("pdf", description="Output format: pdf, png, svg") variables: dict[str, str] = Field(default_factory=dict, description="Template variables to substitute") dpi: int = Field(300, ge=72, le=600) class BatchExportRequest(BaseModel): """Generate multiple PDFs from a template + data rows (ScribusGenerator pattern).""" template: str output_format: str = Field("pdf", description="pdf or png") rows: list[dict[str, str]] = Field(..., description="List of variable dicts, one per output document") dpi: int = Field(300, ge=72, le=600) class RswagExportRequest(BaseModel): """Export an rSwag design to print-ready PDF with bleed/crop marks.""" design_slug: str category: str = Field("stickers", description="rSwag category: stickers, shirts, prints") paper_size: str = Field("A4", description="Paper size: A4, A3, Letter, custom") add_bleed: bool = Field(True, description="Add 3mm bleed") add_crop_marks: bool = Field(True, description="Add crop/trim marks") class ConvertIdmlRequest(BaseModel): """Convert an IDML file (InDesign interchange) to SLA and/or PDF.""" output_sla: bool = Field(True, description="Save as Scribus .sla file") output_pdf: bool = Field(True, description="Export to PDF") dpi: int = Field(300, ge=72, le=600) class TemplateInfo(BaseModel): slug: str name: str description: str category: str variables: list[str] preview_url: Optional[str] = None created: Optional[str] = None class JobStatus(BaseModel): job_id: str status: str # queued, processing, completed, failed progress: int = 0 result_url: Optional[str] = None error: Optional[str] = None created_at: str completed_at: Optional[str] = None # --------------------------------------------------------------------------- # Template management # --------------------------------------------------------------------------- def scan_templates() -> list[TemplateInfo]: """Scan templates directory for available Scribus templates.""" templates = [] for meta_path in sorted(TEMPLATES_DIR.glob("*/metadata.yaml")): try: with open(meta_path) as f: meta = yaml.safe_load(f) slug = meta_path.parent.name preview = None for ext in ("png", "jpg", "webp"): pf = meta_path.parent / f"preview.{ext}" if pf.exists(): preview = f"{BASE_URL}/templates/{slug}/preview" break templates.append(TemplateInfo( slug=slug, name=meta.get("name", slug), description=meta.get("description", ""), category=meta.get("category", "general"), variables=meta.get("variables", []), preview_url=preview, created=meta.get("created"), )) except Exception: continue return templates def find_template_sla(slug: str) -> Path: """Find the .sla file for a template slug.""" tpl_dir = TEMPLATES_DIR / slug if not tpl_dir.is_dir(): raise HTTPException(404, f"Template '{slug}' not found") sla_files = list(tpl_dir.glob("*.sla")) if not sla_files: raise HTTPException(404, f"No .sla file found in template '{slug}'") return sla_files[0] # --------------------------------------------------------------------------- # Scribus headless execution # --------------------------------------------------------------------------- async def run_scribus_script(script_path: str, args: list[str], timeout: int = 120) -> tuple[str, str, int]: """Run a Scribus Python script in headless mode via xvfb-run.""" cmd = [ "xvfb-run", "--auto-servernum", "--server-args=-screen 0 1920x1080x24", SCRIBUS_PATH, "-g", "-ns", "-py", script_path, "--", *args, ] proc = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) try: stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout) except asyncio.TimeoutError: proc.kill() await proc.communicate() return "", "Process timed out", -1 return stdout.decode(), stderr.decode(), proc.returncode # --------------------------------------------------------------------------- # Job processing # --------------------------------------------------------------------------- async def process_export_job(job_id: str, req: ExportRequest): """Process a single template export job.""" jobs[job_id]["status"] = "processing" jobs[job_id]["progress"] = 10 try: sla_path = find_template_sla(req.template) timestamp = int(time.time()) output_filename = f"{req.template}_{timestamp}.{req.output_format}" output_path = OUTPUT_DIR / output_filename # Build script args script_args = [ "--input", str(sla_path), "--output", str(output_path), "--format", req.output_format, "--dpi", str(req.dpi), ] # Pass variables as --var key=value for k, v in req.variables.items(): script_args.extend(["--var", f"{k}={v}"]) jobs[job_id]["progress"] = 30 stdout, stderr, returncode = await run_scribus_script( "/app/scripts/export_document.py", script_args ) if returncode != 0 or not output_path.exists(): jobs[job_id]["status"] = "failed" jobs[job_id]["error"] = stderr[:500] if stderr else "Export failed — no output produced" return jobs[job_id]["progress"] = 100 jobs[job_id]["status"] = "completed" jobs[job_id]["result_url"] = f"{BASE_URL}/output/{output_filename}" jobs[job_id]["completed_at"] = datetime.now(timezone.utc).isoformat() except HTTPException as e: jobs[job_id]["status"] = "failed" jobs[job_id]["error"] = e.detail except Exception as e: jobs[job_id]["status"] = "failed" jobs[job_id]["error"] = str(e)[:500] async def process_batch_job(job_id: str, req: BatchExportRequest): """Process a batch export job (ScribusGenerator pattern).""" jobs[job_id]["status"] = "processing" total = len(req.rows) results = [] try: sla_path = find_template_sla(req.template) for i, row in enumerate(req.rows): jobs[job_id]["progress"] = int((i / total) * 100) timestamp = int(time.time()) row_id = row.get("id", str(i)) output_filename = f"{req.template}_{row_id}_{timestamp}.{req.output_format}" output_path = OUTPUT_DIR / output_filename script_args = [ "--input", str(sla_path), "--output", str(output_path), "--format", req.output_format, "--dpi", str(req.dpi), ] for k, v in row.items(): script_args.extend(["--var", f"{k}={v}"]) stdout, stderr, returncode = await run_scribus_script( "/app/scripts/export_document.py", script_args ) if returncode == 0 and output_path.exists(): results.append({ "row_id": row_id, "url": f"{BASE_URL}/output/{output_filename}", "status": "ok", }) else: results.append({ "row_id": row_id, "status": "failed", "error": (stderr or "unknown error")[:200], }) jobs[job_id]["progress"] = 100 jobs[job_id]["status"] = "completed" jobs[job_id]["results"] = results jobs[job_id]["completed_at"] = datetime.now(timezone.utc).isoformat() except Exception as e: jobs[job_id]["status"] = "failed" jobs[job_id]["error"] = str(e)[:500] async def process_rswag_export(job_id: str, req: RswagExportRequest): """Export an rSwag design to print-ready PDF.""" jobs[job_id]["status"] = "processing" jobs[job_id]["progress"] = 10 try: # Find the rSwag design design_dir = RSWAG_DESIGNS_PATH / req.category / req.design_slug if not design_dir.is_dir(): raise FileNotFoundError(f"rSwag design not found: {req.category}/{req.design_slug}") # Find the design image design_image = None for candidate in [ design_dir / "exports" / "300dpi" / f"{req.design_slug}.png", design_dir / f"{req.design_slug}.png", ]: if candidate.exists(): design_image = candidate break # Fallback: find any PNG if not design_image: pngs = list(design_dir.glob("*.png")) if pngs: design_image = pngs[0] if not design_image: raise FileNotFoundError(f"No image found for design: {req.design_slug}") # Load design metadata if available meta_path = design_dir / "metadata.yaml" meta = {} if meta_path.exists(): with open(meta_path) as f: meta = yaml.safe_load(f) or {} jobs[job_id]["progress"] = 30 timestamp = int(time.time()) output_filename = f"rswag_{req.design_slug}_{timestamp}.pdf" output_path = OUTPUT_DIR / output_filename script_args = [ "--image", str(design_image), "--output", str(output_path), "--paper", req.paper_size, "--dpi", str(300), "--title", meta.get("name", req.design_slug), ] if req.add_bleed: script_args.append("--bleed") if req.add_crop_marks: script_args.append("--crop-marks") stdout, stderr, returncode = await run_scribus_script( "/app/scripts/rswag_export.py", script_args, timeout=180 ) if returncode != 0 or not output_path.exists(): jobs[job_id]["status"] = "failed" jobs[job_id]["error"] = stderr[:500] if stderr else "rSwag export failed" return jobs[job_id]["progress"] = 100 jobs[job_id]["status"] = "completed" jobs[job_id]["result_url"] = f"{BASE_URL}/output/{output_filename}" jobs[job_id]["completed_at"] = datetime.now(timezone.utc).isoformat() except Exception as e: jobs[job_id]["status"] = "failed" jobs[job_id]["error"] = str(e)[:500] async def process_idml_convert(job_id: str, idml_path: Path, req: ConvertIdmlRequest): """Convert an IDML file to SLA and/or PDF.""" jobs[job_id]["status"] = "processing" jobs[job_id]["progress"] = 10 try: timestamp = int(time.time()) stem = idml_path.stem results = {} script_args = ["--input", str(idml_path)] if req.output_sla: sla_filename = f"{stem}_{timestamp}.sla" sla_path = OUTPUT_DIR / sla_filename script_args.extend(["--output-sla", str(sla_path)]) if req.output_pdf: pdf_filename = f"{stem}_{timestamp}.pdf" pdf_path = OUTPUT_DIR / pdf_filename script_args.extend(["--output-pdf", str(pdf_path)]) script_args.extend(["--dpi", str(req.dpi)]) jobs[job_id]["progress"] = 30 stdout, stderr, returncode = await run_scribus_script( "/app/scripts/convert_idml.py", script_args, timeout=300 ) if returncode != 0: jobs[job_id]["status"] = "failed" jobs[job_id]["error"] = stderr[:500] if stderr else "IDML conversion failed" return if req.output_sla and sla_path.exists(): results["sla_url"] = f"{BASE_URL}/output/{sla_filename}" if req.output_pdf and pdf_path.exists(): results["pdf_url"] = f"{BASE_URL}/output/{pdf_filename}" # Save as template too (SLA goes into templates dir) if req.output_sla and sla_path.exists(): tpl_dir = TEMPLATES_DIR / stem tpl_dir.mkdir(parents=True, exist_ok=True) import shutil shutil.copy2(sla_path, tpl_dir / f"{stem}.sla") meta = { "name": stem.replace("-", " ").replace("_", " ").title(), "description": f"Imported from IDML: {idml_path.name}", "category": "imported", "variables": [], "created": datetime.now(timezone.utc).strftime("%Y-%m-%d"), "source": {"format": "idml", "original": idml_path.name}, } with open(tpl_dir / "metadata.yaml", "w") as f: yaml.dump(meta, f, default_flow_style=False) results["template_slug"] = stem jobs[job_id]["progress"] = 100 jobs[job_id]["status"] = "completed" jobs[job_id]["results"] = results jobs[job_id]["completed_at"] = datetime.now(timezone.utc).isoformat() except Exception as e: jobs[job_id]["status"] = "failed" jobs[job_id]["error"] = str(e)[:500] # --------------------------------------------------------------------------- # Routes # --------------------------------------------------------------------------- @app.get("/", tags=["health"]) async def root(): return {"service": "rDesign", "version": "0.1.0", "status": "ok"} @app.get("/health", tags=["health"]) async def health(): scribus_exists = os.path.isfile(SCRIBUS_PATH) return { "status": "ok" if scribus_exists else "degraded", "scribus": "available" if scribus_exists else "missing", "templates": len(scan_templates()), } # --- Templates --- @app.get("/templates", response_model=list[TemplateInfo], tags=["templates"]) async def list_templates(category: Optional[str] = None): """List available Scribus templates.""" templates = scan_templates() if category: templates = [t for t in templates if t.category == category] return templates @app.get("/templates/{slug}", response_model=TemplateInfo, tags=["templates"]) async def get_template(slug: str): """Get template metadata.""" for t in scan_templates(): if t.slug == slug: return t raise HTTPException(404, f"Template '{slug}' not found") @app.get("/templates/{slug}/preview", tags=["templates"]) async def get_template_preview(slug: str): """Get template preview image.""" tpl_dir = TEMPLATES_DIR / slug if not tpl_dir.is_dir(): raise HTTPException(404, "Template not found") for ext in ("png", "jpg", "webp"): preview = tpl_dir / f"preview.{ext}" if preview.exists(): return FileResponse(preview) raise HTTPException(404, "No preview available") @app.post("/templates/upload", tags=["templates"]) async def upload_template( file: UploadFile = File(...), name: str = Query(...), description: str = Query(""), category: str = Query("general"), ): """Upload a new Scribus template (.sla file).""" if not file.filename or not file.filename.endswith(".sla"): raise HTTPException(400, "File must be a .sla Scribus document") slug = name.lower().replace(" ", "-").replace("_", "-") tpl_dir = TEMPLATES_DIR / slug tpl_dir.mkdir(parents=True, exist_ok=True) sla_path = tpl_dir / file.filename async with aiofiles.open(sla_path, "wb") as f: content = await file.read() await f.write(content) # Create metadata meta = { "name": name, "description": description, "category": category, "variables": [], "created": datetime.now(timezone.utc).strftime("%Y-%m-%d"), } meta_path = tpl_dir / "metadata.yaml" with open(meta_path, "w") as f: yaml.dump(meta, f, default_flow_style=False) return {"slug": slug, "message": f"Template '{name}' uploaded", "path": str(sla_path)} # --- IDML Import / Convert --- @app.post("/convert/idml", tags=["convert"]) async def convert_idml( file: UploadFile = File(...), output_sla: bool = Query(True, description="Save as Scribus .sla"), output_pdf: bool = Query(True, description="Export to PDF"), dpi: int = Query(300, ge=72, le=600), background_tasks: BackgroundTasks = None, ): """Upload an IDML file (InDesign interchange format) and convert to SLA/PDF. IDML is Adobe's open interchange format — export from InDesign via File → Save As → InDesign Markup (IDML). Scribus imports IDML natively. Note: Native .indd files cannot be converted directly. Use IDML instead. """ if not file.filename or not file.filename.lower().endswith((".idml", ".idms")): raise HTTPException(400, "File must be .idml or .idms (InDesign Markup). " "Native .indd files are not supported — export as IDML from InDesign first.") # Save uploaded file upload_dir = JOBS_DIR / "uploads" upload_dir.mkdir(parents=True, exist_ok=True) idml_path = upload_dir / f"{uuid.uuid4()}_{file.filename}" async with aiofiles.open(idml_path, "wb") as f: content = await file.read() await f.write(content) req = ConvertIdmlRequest(output_sla=output_sla, output_pdf=output_pdf, dpi=dpi) job_id = str(uuid.uuid4()) jobs[job_id] = { "job_id": job_id, "status": "queued", "progress": 0, "created_at": datetime.now(timezone.utc).isoformat(), "source": file.filename, "type": "idml_convert", } background_tasks.add_task(process_idml_convert, job_id, idml_path, req) return { "job_id": job_id, "status": "queued", "poll_url": f"{BASE_URL}/jobs/{job_id}", "note": "IDML import may take a few minutes for complex documents.", } # --- Export (single document) --- @app.post("/export", tags=["export"]) async def export_document(req: ExportRequest, background_tasks: BackgroundTasks): """Export a Scribus template to PDF/PNG (async job).""" job_id = str(uuid.uuid4()) jobs[job_id] = { "job_id": job_id, "status": "queued", "progress": 0, "created_at": datetime.now(timezone.utc).isoformat(), "template": req.template, } background_tasks.add_task(process_export_job, job_id, req) return {"job_id": job_id, "status": "queued", "poll_url": f"{BASE_URL}/jobs/{job_id}"} @app.post("/export/sync", tags=["export"]) async def export_document_sync(req: ExportRequest): """Export a Scribus template synchronously (blocks until done).""" job_id = str(uuid.uuid4()) jobs[job_id] = { "job_id": job_id, "status": "queued", "progress": 0, "created_at": datetime.now(timezone.utc).isoformat(), "template": req.template, } await process_export_job(job_id, req) return jobs[job_id] # --- Batch export --- @app.post("/export/batch", tags=["export"]) async def export_batch(req: BatchExportRequest, background_tasks: BackgroundTasks): """Batch export: one template + multiple data rows = multiple PDFs.""" job_id = str(uuid.uuid4()) jobs[job_id] = { "job_id": job_id, "status": "queued", "progress": 0, "created_at": datetime.now(timezone.utc).isoformat(), "template": req.template, "total_rows": len(req.rows), } background_tasks.add_task(process_batch_job, job_id, req) return {"job_id": job_id, "status": "queued", "total_rows": len(req.rows), "poll_url": f"{BASE_URL}/jobs/{job_id}"} # --- rSwag integration --- @app.get("/rswag/designs", tags=["rswag"]) async def list_rswag_designs(category: Optional[str] = None): """List available rSwag designs that can be exported to print-ready PDFs.""" designs = [] search_dirs = [RSWAG_DESIGNS_PATH] if category: search_dirs = [RSWAG_DESIGNS_PATH / category] for search_dir in search_dirs: if not search_dir.is_dir(): continue for cat_dir in sorted(search_dir.iterdir()): if not cat_dir.is_dir(): continue # If filtering by category, cat_dir IS the design dir design_dirs = [cat_dir] if category else sorted(cat_dir.iterdir()) for ddir in design_dirs: if not ddir.is_dir(): continue meta_path = ddir / "metadata.yaml" if not meta_path.exists(): continue try: with open(meta_path) as f: meta = yaml.safe_load(f) designs.append({ "slug": ddir.name, "name": meta.get("name", ddir.name), "category": cat_dir.name if not category else category, "description": meta.get("description", ""), "status": meta.get("status", "unknown"), }) except Exception: continue return designs @app.post("/rswag/export", tags=["rswag"]) async def export_rswag_design(req: RswagExportRequest, background_tasks: BackgroundTasks): """Export an rSwag design to a print-ready PDF with bleed and crop marks.""" job_id = str(uuid.uuid4()) jobs[job_id] = { "job_id": job_id, "status": "queued", "progress": 0, "created_at": datetime.now(timezone.utc).isoformat(), "design": req.design_slug, } background_tasks.add_task(process_rswag_export, job_id, req) return {"job_id": job_id, "status": "queued", "poll_url": f"{BASE_URL}/jobs/{job_id}"} # --- Jobs --- @app.get("/jobs/{job_id}", tags=["jobs"]) async def get_job(job_id: str): """Get job status.""" if job_id not in jobs: raise HTTPException(404, "Job not found") return jobs[job_id] @app.get("/jobs", tags=["jobs"]) async def list_jobs(limit: int = Query(20, le=100)): """List recent jobs.""" sorted_jobs = sorted(jobs.values(), key=lambda j: j["created_at"], reverse=True) return sorted_jobs[:limit] @app.delete("/jobs/{job_id}", tags=["jobs"]) async def delete_job(job_id: str): """Delete a job and its output.""" if job_id not in jobs: raise HTTPException(404, "Job not found") job = jobs.pop(job_id) if "result_url" in job: filename = job["result_url"].split("/")[-1] output_file = OUTPUT_DIR / filename if output_file.exists(): output_file.unlink() return {"message": "Job deleted"} # --- Output files --- @app.get("/output/{filename}", tags=["output"]) async def get_output(filename: str): """Serve an output file.""" file_path = OUTPUT_DIR / filename # Path traversal protection if not file_path.resolve().is_relative_to(OUTPUT_DIR.resolve()): raise HTTPException(403, "Access denied") if not file_path.exists(): raise HTTPException(404, "File not found") return FileResponse( file_path, headers={"Cache-Control": "public, max-age=86400"}, )