feat: add dithering API with 12 algorithms and screen-print color separations

Adds on-demand dithering endpoint (Floyd-Steinberg, Atkinson, Bayer, etc.)
and screen-print color separation workflow with per-channel PNG exports.
Uses hitherdither library with in-memory LRU cache (max 200 entries).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Jeff Emmett 2026-03-16 16:27:05 -07:00
parent 19c00b5ed9
commit 94fd3ad4d3
7 changed files with 477 additions and 1 deletions

View File

@ -7,6 +7,7 @@ WORKDIR /app
RUN apt-get update && apt-get install -y --no-install-recommends \
gcc \
libpq-dev \
git \
&& rm -rf /var/lib/apt/lists/*
# Install Python dependencies

153
backend/app/api/dither.py Normal file
View File

@ -0,0 +1,153 @@
"""Dithering API endpoints for on-demand dithering and screen-print separations."""
import io
import logging
from pathlib import Path
from fastapi import APIRouter, HTTPException, Query
from fastapi.responses import StreamingResponse
from app.schemas.dither import (
DitherAlgorithm,
DitherResponse,
PaletteMode,
ScreenPrintExport,
ScreenPrintRequest,
)
from app.services.design_service import DesignService
from app.services.dither_service import dither_design, generate_color_separations
logger = logging.getLogger(__name__)
router = APIRouter()
design_service = DesignService()
@router.get("/{slug}/dither")
async def get_dithered_design(
slug: str,
algorithm: DitherAlgorithm = DitherAlgorithm.FLOYD_STEINBERG,
palette: PaletteMode = PaletteMode.AUTO,
num_colors: int = Query(default=8, ge=2, le=32),
colors: str | None = Query(default=None, description="Comma-separated hex colors for custom palette"),
threshold: int = Query(default=64, ge=1, le=256),
order: int = Query(default=8, ge=2, le=16),
fresh: bool = False,
format: str = Query(default="image", description="'image' for PNG, 'json' for metadata"),
):
"""Apply dithering to a design image.
Returns a PNG image (default) or JSON metadata with image URL.
"""
image_path = await design_service.get_design_image_path(slug)
if not image_path or not Path(image_path).exists():
raise HTTPException(status_code=404, detail="Design not found")
custom_colors = [c.strip() for c in colors.split(",")] if colors else None
try:
png_bytes, meta = dither_design(
image_path=image_path,
slug=slug,
algorithm=algorithm,
palette_mode=palette,
num_colors=num_colors,
custom_colors=custom_colors,
threshold=threshold,
order=order,
fresh=fresh,
)
except Exception as e:
logger.error(f"Dithering failed for {slug}: {e}")
raise HTTPException(status_code=500, detail=f"Dithering failed: {e}")
if format == "json":
return DitherResponse(
slug=slug,
algorithm=algorithm,
palette_mode=palette,
num_colors=meta["num_colors"],
colors_used=meta["colors_used"],
cached=meta["cached"],
image_url=f"/api/designs/{slug}/dither?algorithm={algorithm.value}&palette={palette.value}&num_colors={num_colors}",
)
return StreamingResponse(
io.BytesIO(png_bytes),
media_type="image/png",
headers={"Cache-Control": "public, max-age=86400"},
)
@router.post("/{slug}/screen-print", response_model=ScreenPrintExport)
async def create_screen_print(slug: str, body: ScreenPrintRequest):
"""Generate color separations for screen printing."""
image_path = await design_service.get_design_image_path(slug)
if not image_path or not Path(image_path).exists():
raise HTTPException(status_code=404, detail="Design not found")
try:
composite_bytes, separations, colors = generate_color_separations(
image_path=image_path,
slug=slug,
num_colors=body.num_colors,
algorithm=body.algorithm,
spot_colors=body.spot_colors,
)
except Exception as e:
logger.error(f"Screen print generation failed for {slug}: {e}")
raise HTTPException(status_code=500, detail=f"Screen print generation failed: {e}")
base_url = f"/api/designs/{slug}/screen-print"
separation_urls = {color: f"{base_url}/{color}" for color in colors}
return ScreenPrintExport(
slug=slug,
num_colors=len(colors),
algorithm=body.algorithm,
colors=colors,
composite_url=f"{base_url}/composite",
separation_urls=separation_urls,
)
@router.get("/{slug}/screen-print/{channel}")
async def get_screen_print_channel(slug: str, channel: str):
"""Serve a screen-print separation channel image.
channel: "composite" or a hex color code (e.g. "FF0000")
"""
image_path = await design_service.get_design_image_path(slug)
if not image_path or not Path(image_path).exists():
raise HTTPException(status_code=404, detail="Design not found")
# Try to find cached separations — generate with defaults if not cached
try:
composite_bytes, separations, colors = generate_color_separations(
image_path=image_path,
slug=slug,
)
except Exception as e:
logger.error(f"Screen print channel failed for {slug}/{channel}: {e}")
raise HTTPException(status_code=500, detail=f"Failed to generate separation: {e}")
if channel == "composite":
png_bytes = composite_bytes
filename = f"{slug}-composite.png"
else:
hex_upper = channel.upper().lstrip("#")
if hex_upper not in separations:
raise HTTPException(
status_code=404,
detail=f"Color channel '{channel}' not found. Available: {', '.join(separations.keys())}",
)
png_bytes = separations[hex_upper]
filename = f"{slug}-{hex_upper}.png"
return StreamingResponse(
io.BytesIO(png_bytes),
media_type="image/png",
headers={
"Cache-Control": "public, max-age=86400",
"Content-Disposition": f"attachment; filename={filename}",
},
)

View File

@ -6,7 +6,7 @@ from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from app.config import get_settings
from app.api import designs, products, cart, checkout, orders, webhooks, health, design_generator, upload, spaces
from app.api import designs, products, cart, checkout, orders, webhooks, health, design_generator, upload, spaces, dither
from app.api.admin import router as admin_router
settings = get_settings()
@ -52,6 +52,7 @@ app.include_router(webhooks.router, prefix="/api/webhooks", tags=["webhooks"])
app.include_router(design_generator.router, prefix="/api/design", tags=["design-generator"])
app.include_router(upload.router, prefix="/api/design", tags=["upload"])
app.include_router(spaces.router, prefix="/api/spaces", tags=["spaces"])
app.include_router(dither.router, prefix="/api/designs", tags=["dithering"])
app.include_router(admin_router, prefix="/api/admin", tags=["admin"])

View File

@ -10,6 +10,7 @@ from app.schemas.cart import (
CartItemResponse,
)
from app.schemas.order import OrderResponse, OrderItemResponse, OrderStatus
from app.schemas.dither import DitherAlgorithm, DitherResponse, PaletteMode, ScreenPrintExport
__all__ = [
"Design",
@ -25,4 +26,8 @@ __all__ = [
"OrderResponse",
"OrderItemResponse",
"OrderStatus",
"DitherAlgorithm",
"DitherResponse",
"PaletteMode",
"ScreenPrintExport",
]

View File

@ -0,0 +1,67 @@
"""Pydantic schemas for dithering API."""
from enum import Enum
from pydantic import BaseModel, Field
class DitherAlgorithm(str, Enum):
"""Supported dithering algorithms."""
FLOYD_STEINBERG = "floyd-steinberg"
ATKINSON = "atkinson"
STUCKI = "stucki"
BURKES = "burkes"
SIERRA = "sierra"
SIERRA_TWO_ROW = "sierra-two-row"
SIERRA_LITE = "sierra-lite"
JARVIS_JUDICE_NINKE = "jarvis-judice-ninke"
BAYER = "bayer"
YLILUOMA = "yliluoma"
ORDERED = "ordered"
CLUSTER_DOT = "cluster-dot"
class PaletteMode(str, Enum):
"""Palette generation modes."""
AUTO = "auto"
GRAYSCALE = "grayscale"
SPOT = "spot"
CUSTOM = "custom"
class DitherResponse(BaseModel):
"""Metadata response for dithered image."""
slug: str
algorithm: DitherAlgorithm
palette_mode: PaletteMode
num_colors: int
colors_used: list[str] = Field(description="Hex color codes used in the palette")
cached: bool
image_url: str
class ScreenPrintRequest(BaseModel):
"""Request body for screen print color separation."""
num_colors: int = Field(default=4, ge=2, le=12)
algorithm: DitherAlgorithm = DitherAlgorithm.FLOYD_STEINBERG
spot_colors: list[str] | None = Field(
default=None,
description="Optional list of hex colors to use as spot colors (e.g. ['FF0000', '00FF00'])",
)
class ScreenPrintExport(BaseModel):
"""Response for screen print color separation."""
slug: str
num_colors: int
algorithm: DitherAlgorithm
colors: list[str] = Field(description="Hex color codes for each separation channel")
composite_url: str
separation_urls: dict[str, str] = Field(
description="Map of hex color to separation image URL",
)

View File

@ -0,0 +1,247 @@
"""Dithering service — palette building, dithering, and color separations."""
import hashlib
import io
import logging
from collections import OrderedDict
import numpy as np
from PIL import Image
import hitherdither
from app.schemas.dither import DitherAlgorithm, PaletteMode
logger = logging.getLogger(__name__)
# In-memory cache: sha256(slug+params) → (png_bytes, metadata)
_MAX_CACHE = 200
_dither_cache: OrderedDict[str, tuple[bytes, dict]] = OrderedDict()
# Screen-print separation cache: sha256(slug+params) → {color_hex: png_bytes, "composite": png_bytes}
_separation_cache: OrderedDict[str, tuple[bytes, dict[str, bytes], list[str]]] = OrderedDict()
def _cache_key(*parts) -> str:
raw = "|".join(str(p) for p in parts)
return hashlib.sha256(raw.encode()).hexdigest()
def _evict(cache: OrderedDict, max_size: int = _MAX_CACHE):
while len(cache) > max_size:
cache.popitem(last=False)
def _hex_to_rgb(hex_str: str) -> tuple[int, int, int]:
h = hex_str.lstrip("#")
return (int(h[0:2], 16), int(h[2:4], 16), int(h[4:6], 16))
def _rgb_to_hex(r: int, g: int, b: int) -> str:
return f"{r:02X}{g:02X}{b:02X}"
def build_palette(
image: Image.Image,
mode: PaletteMode,
num_colors: int = 8,
custom_colors: list[str] | None = None,
) -> hitherdither.palette.Palette:
"""Build a hitherdither Palette from an image and mode."""
if mode == PaletteMode.CUSTOM and custom_colors:
rgb_colors = [_hex_to_rgb(c) for c in custom_colors]
return hitherdither.palette.Palette(rgb_colors)
if mode == PaletteMode.GRAYSCALE:
step = 255 // max(num_colors - 1, 1)
grays = [(i * step, i * step, i * step) for i in range(num_colors)]
return hitherdither.palette.Palette(grays)
if mode == PaletteMode.SPOT and custom_colors:
rgb_colors = [_hex_to_rgb(c) for c in custom_colors[:num_colors]]
return hitherdither.palette.Palette(rgb_colors)
# Auto mode: median-cut quantization
quantized = image.convert("RGB").quantize(colors=num_colors, method=Image.Quantize.MEDIANCUT)
palette_data = quantized.getpalette()
colors = []
for i in range(num_colors):
idx = i * 3
if idx + 2 < len(palette_data):
colors.append((palette_data[idx], palette_data[idx + 1], palette_data[idx + 2]))
if not colors:
colors = [(0, 0, 0), (255, 255, 255)]
return hitherdither.palette.Palette(colors)
def _get_palette_hex(palette: hitherdither.palette.Palette) -> list[str]:
"""Extract hex color strings from a palette."""
colors = []
for color in palette.colours:
r, g, b = int(color[0]), int(color[1]), int(color[2])
colors.append(_rgb_to_hex(r, g, b))
return colors
# Map algorithm enum to hitherdither function calls
_ERROR_DIFFUSION_ALGOS = {
DitherAlgorithm.FLOYD_STEINBERG: "floyd-steinberg",
DitherAlgorithm.ATKINSON: "atkinson",
DitherAlgorithm.STUCKI: "stucki",
DitherAlgorithm.BURKES: "burkes",
DitherAlgorithm.SIERRA: "sierra3",
DitherAlgorithm.SIERRA_TWO_ROW: "sierra2",
DitherAlgorithm.SIERRA_LITE: "sierra-2-4a",
DitherAlgorithm.JARVIS_JUDICE_NINKE: "jarvis-judice-ninke",
}
def apply_dither(
image: Image.Image,
palette: hitherdither.palette.Palette,
algorithm: DitherAlgorithm,
threshold: int = 64,
order: int = 8,
) -> Image.Image:
"""Apply a dithering algorithm to an image with the given palette."""
img_rgb = image.convert("RGB")
if algorithm in _ERROR_DIFFUSION_ALGOS:
method = _ERROR_DIFFUSION_ALGOS[algorithm]
result = hitherdither.diffusion.error_diffusion_dithering(
img_rgb, palette, method=method, order=2,
)
elif algorithm == DitherAlgorithm.ORDERED:
result = hitherdither.ordered.bayer.bayer_dithering(
img_rgb, palette, [threshold, threshold, threshold], order=order,
)
elif algorithm == DitherAlgorithm.BAYER:
result = hitherdither.ordered.bayer.bayer_dithering(
img_rgb, palette, [threshold, threshold, threshold], order=order,
)
elif algorithm == DitherAlgorithm.YLILUOMA:
result = hitherdither.ordered.yliluoma.yliluomas_1_ordered_dithering(
img_rgb, palette, order=order,
)
elif algorithm == DitherAlgorithm.CLUSTER_DOT:
result = hitherdither.ordered.cluster.cluster_dot_dithering(
img_rgb, palette, [threshold, threshold, threshold], order=order,
)
else:
# Fallback to Floyd-Steinberg
result = hitherdither.diffusion.error_diffusion_dithering(
img_rgb, palette, method="floyd-steinberg", order=2,
)
# hitherdither returns a PIL Image (indexed); convert to RGB
if hasattr(result, "convert"):
return result.convert("RGB")
return result
def dither_design(
image_path: str,
slug: str,
algorithm: DitherAlgorithm = DitherAlgorithm.FLOYD_STEINBERG,
palette_mode: PaletteMode = PaletteMode.AUTO,
num_colors: int = 8,
custom_colors: list[str] | None = None,
threshold: int = 64,
order: int = 8,
fresh: bool = False,
) -> tuple[bytes, dict]:
"""Dither a design image and return (png_bytes, metadata).
Results are cached by slug+params combination.
"""
key = _cache_key(slug, algorithm.value, palette_mode.value, num_colors,
",".join(custom_colors or []), threshold, order)
if not fresh and key in _dither_cache:
_dither_cache.move_to_end(key)
png_bytes, meta = _dither_cache[key]
meta["cached"] = True
return png_bytes, meta
image = Image.open(image_path)
palette = build_palette(image, palette_mode, num_colors, custom_colors)
dithered = apply_dither(image, palette, algorithm, threshold, order)
buf = io.BytesIO()
dithered.save(buf, format="PNG", optimize=True)
png_bytes = buf.getvalue()
colors_used = _get_palette_hex(palette)
meta = {
"slug": slug,
"algorithm": algorithm.value,
"palette_mode": palette_mode.value,
"num_colors": len(colors_used),
"colors_used": colors_used,
"cached": False,
}
_dither_cache[key] = (png_bytes, meta)
_evict(_dither_cache)
return png_bytes, meta
def generate_color_separations(
image_path: str,
slug: str,
num_colors: int = 4,
algorithm: DitherAlgorithm = DitherAlgorithm.FLOYD_STEINBERG,
spot_colors: list[str] | None = None,
) -> tuple[bytes, dict[str, bytes], list[str]]:
"""Generate color separations for screen printing.
Returns (composite_png_bytes, {hex_color: separation_png_bytes}, colors_list).
"""
palette_mode = PaletteMode.SPOT if spot_colors else PaletteMode.AUTO
key = _cache_key("sep", slug, num_colors, algorithm.value,
",".join(spot_colors or []))
if key in _separation_cache:
_separation_cache.move_to_end(key)
return _separation_cache[key]
image = Image.open(image_path)
palette = build_palette(image, palette_mode, num_colors, spot_colors)
dithered = apply_dither(image, palette, algorithm)
# Save composite
buf = io.BytesIO()
dithered.save(buf, format="PNG", optimize=True)
composite_bytes = buf.getvalue()
# Generate per-color separations
dithered_rgb = dithered.convert("RGB")
arr = np.array(dithered_rgb)
colors = _get_palette_hex(palette)
separations: dict[str, bytes] = {}
for hex_color in colors:
r, g, b = _hex_to_rgb(hex_color)
# Create mask where pixels match this color (with small tolerance)
mask = (
(np.abs(arr[:, :, 0].astype(int) - r) < 16) &
(np.abs(arr[:, :, 1].astype(int) - g) < 16) &
(np.abs(arr[:, :, 2].astype(int) - b) < 16)
)
# White background, color pixels where mask is true
sep_img = np.full_like(arr, 255)
sep_img[mask] = [r, g, b]
sep_pil = Image.fromarray(sep_img.astype(np.uint8))
buf = io.BytesIO()
sep_pil.save(buf, format="PNG", optimize=True)
separations[hex_color] = buf.getvalue()
result = (composite_bytes, separations, colors)
_separation_cache[key] = result
_evict(_separation_cache)
return result

View File

@ -27,6 +27,8 @@ pyyaml>=6.0.0
pillow>=10.0.0
python-multipart>=0.0.6
aiofiles>=23.0.0
hitherdither @ git+https://github.com/hbldh/hitherdither@master
numpy>=1.24.0
# Email
aiosmtplib>=3.0.0