rswag-online/backend/app/services/design_service.py

301 lines
9.6 KiB
Python

"""Design service for reading designs from the designs directory."""
from pathlib import Path
from functools import lru_cache
import yaml
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.config import get_settings
from app.schemas.design import Design, DesignSource, DesignProduct
from app.schemas.product import Product, ProductVariant
from app.models.product import ProductOverride
# Application settings are resolved once, at import time; DesignService reads
# `designs_dir` from this object when it is instantiated.
settings = get_settings()
class DesignService:
    """Service for reading and managing designs.

    Designs live on disk as ``<designs_path>/<category>/<slug>/metadata.yaml``.
    A design directory without a readable, non-empty ``metadata.yaml`` is
    silently ignored.

    NOTE: all filesystem access here is synchronous even though the public
    methods are coroutines.
    """

    def __init__(self):
        # Root of the designs tree, taken from the module-level settings.
        self.designs_path = settings.designs_dir
        # Designs already resolved by get_design(), keyed by the requested slug.
        self._cache: dict[str, Design] = {}

    def clear_cache(self):
        """Clear the design cache."""
        self._cache.clear()

    def _category_dirs(self) -> list[Path]:
        """Return the category directories under the designs root, sorted.

        Returns an empty list when the root is missing or is not a directory,
        so callers never have to guard ``iterdir()`` themselves. Sorting makes
        every directory walk deterministic (``iterdir()`` order is arbitrary).
        """
        if not self.designs_path.is_dir():
            return []
        return sorted(p for p in self.designs_path.iterdir() if p.is_dir())

    @staticmethod
    def _is_safe_slug(slug: str) -> bool:
        """Return True when *slug* is a single, plain path component.

        Rejects empty values, ``.``/``..`` and anything containing a path
        separator, so a slug can never address a location outside its
        category directory.
        """
        if not isinstance(slug, str) or slug in ("", ".", ".."):
            return False
        if "/" in slug or "\\" in slug:
            return False
        return Path(slug).name == slug

    async def list_designs(
        self,
        status: str = "active",
        category: str | None = None,
        space: str | None = None,
    ) -> list[Design]:
        """List designs from the designs directory.

        Args:
            status: Only designs whose ``status`` equals this value are returned.
            category: When given, only the category directory with this name
                is scanned.
            space: When given and not ``"all"``, only designs whose ``space``
                equals this value, or whose ``space`` is ``"all"``, are kept.

        Returns:
            Matching designs, ordered by category directory and then by design
            directory. Empty when the designs directory does not exist.
        """
        designs: list[Design] = []
        restrict_space = bool(space) and space != "all"

        for category_dir in self._category_dirs():
            if category and category_dir.name != category:
                continue

            for design_dir in sorted(category_dir.iterdir()):
                if not design_dir.is_dir():
                    continue

                design = await self._load_design(design_dir, category_dir.name)
                if not design or design.status != status:
                    continue

                # A design published to "all" is visible in every space.
                if restrict_space and design.space not in (space, "all"):
                    continue

                designs.append(design)

        return designs

    async def get_design(self, slug: str) -> Design | None:
        """Get a single design by slug.

        The first category (in sorted order) that contains a loadable
        ``<slug>/metadata.yaml`` wins, and the result is cached under *slug*
        until ``clear_cache()`` is called.

        Returns:
            The design, or ``None`` when the slug is not a plain directory
            name, the designs directory is missing, or no category holds a
            loadable design with that slug.
        """
        # The slug is joined into a filesystem path below; never let it
        # traverse out of the category directory.
        if not self._is_safe_slug(slug):
            return None

        if slug in self._cache:
            return self._cache[slug]

        for category_dir in self._category_dirs():
            design_dir = category_dir / slug
            if not design_dir.exists():
                continue
            design = await self._load_design(design_dir, category_dir.name)
            if design:
                self._cache[slug] = design
                return design

        return None

    async def get_design_image_path(self, slug: str) -> str | None:
        """Get the path to the design image file.

        For each category directory containing ``<slug>/``, candidates are
        tried in this order:

        1. ``exports/300dpi/<slug>.png``
        2. the source file named in the metadata, if it is a PNG
        3. the first ``*.png`` (by sorted name) directly in the design directory

        Returns:
            The image path as a string, or ``None`` when the design is unknown
            or no PNG could be found.
        """
        design = await self.get_design(slug)
        if not design:
            return None

        for category_dir in self._category_dirs():
            design_dir = category_dir / slug
            if not design_dir.exists():
                continue

            # Prefer the print-resolution export.
            export_path = design_dir / "exports" / "300dpi" / f"{slug}.png"
            if export_path.exists():
                return str(export_path)

            # Fall back to the source file when it is itself a PNG.
            source_path = design_dir / design.source.file
            if source_path.suffix.lower() == ".png" and source_path.exists():
                return str(source_path)

            # Last resort: any PNG in the directory. Pick the smallest path so
            # the choice does not depend on OS directory-listing order.
            fallback = min(design_dir.glob("*.png"), default=None)
            if fallback is not None:
                return str(fallback)

        return None

    async def _load_design(self, design_dir: Path, category: str) -> Design | None:
        """Load a design from its directory.

        Args:
            design_dir: Directory expected to contain ``metadata.yaml``.
            category: Category name recorded on the returned design.

        Returns:
            The parsed design, or ``None`` when ``metadata.yaml`` is missing,
            unreadable, empty, or not a YAML mapping.
        """
        metadata_path = design_dir / "metadata.yaml"
        if not metadata_path.exists():
            return None

        try:
            # Read as UTF-8 explicitly instead of the locale's default encoding.
            with open(metadata_path, encoding="utf-8") as f:
                metadata = yaml.safe_load(f)
        except Exception:
            # Deliberately best-effort: one unreadable or malformed metadata
            # file must not break listing of every other design.
            return None

        # Empty documents and non-mapping documents (a bare list or scalar)
        # cannot describe a design.
        if not metadata or not isinstance(metadata, dict):
            return None

        slug = metadata.get("slug", design_dir.name)

        # `or` fallbacks below also cover keys that are present but null.
        source_data = metadata.get("source") or {}
        source = DesignSource(
            file=source_data.get("file", f"{slug}.svg"),
            format=source_data.get("format", "svg"),
            dimensions=source_data.get("dimensions") or {"width": 0, "height": 0},
            dpi=source_data.get("dpi", 300),
            color_profile=source_data.get("color_profile", "sRGB"),
        )

        products = [
            DesignProduct(
                type=entry.get("type", ""),
                provider=entry.get("provider", ""),
                # Convert to string (some SKUs are integers)
                sku=str(entry.get("sku", "")),
                variants=entry.get("variants") or [],
                retail_price=float(entry.get("retail_price") or 0),
            )
            for entry in (metadata.get("products") or [])
            if isinstance(entry, dict)
        ]

        return Design(
            slug=slug,
            name=metadata.get("name", slug),
            description=metadata.get("description", ""),
            tags=metadata.get("tags") or [],
            category=category,
            author=metadata.get("author", ""),
            created=str(metadata.get("created", "")),
            source=source,
            products=products,
            space=metadata.get("space", "default"),
            status=metadata.get("status", "draft"),
            image_url=f"/api/designs/{slug}/image",
        )

    async def list_products(
        self,
        category: str | None = None,
        product_type: str | None = None,
        space: str | None = None,
    ) -> list[Product]:
        """List all products (active designs formatted for the storefront).

        Args:
            category: Passed through to ``list_designs``.
            product_type: When given, only product configurations of this type
                are considered; designs with none are skipped.
            space: Passed through to ``list_designs``.

        Returns:
            One ``Product`` per design that has at least one matching product
            configuration. Type and base price come from the first matching
            configuration; variants are combined across all matching ones.
        """
        designs = await self.list_designs(status="active", category=category, space=space)
        products: list[Product] = []

        for design in designs:
            # Designs without any (matching) product configuration are skipped.
            matching = [
                candidate
                for candidate in design.products
                if not product_type or candidate.type == product_type
            ]
            if not matching:
                continue

            primary = matching[0]
            variants: list[ProductVariant] = []
            for config in matching:
                if config.variants:
                    variants.extend(
                        ProductVariant(
                            name=f"{v} ({config.provider})",
                            sku=f"{config.sku}-{v}",
                            provider=config.provider,
                            price=config.retail_price,
                        )
                        for v in config.variants
                    )
                else:
                    # No explicit variants: expose one default variant that
                    # uses the bare SKU.
                    variants.append(
                        ProductVariant(
                            name=f"default ({config.provider})",
                            sku=config.sku,
                            provider=config.provider,
                            price=config.retail_price,
                        )
                    )

            products.append(
                Product(
                    slug=design.slug,
                    name=design.name,
                    description=design.description,
                    category=design.category,
                    product_type=primary.type,
                    tags=design.tags,
                    image_url=design.image_url,
                    base_price=primary.retail_price,
                    variants=variants,
                    is_active=True,
                )
            )

        return products

    async def get_product(self, slug: str) -> Product | None:
        """Get a single product by slug.

        Only the design's first product configuration is used.

        Returns:
            The product, or ``None`` when the design does not exist or has no
            product configurations.
        """
        design = await self.get_design(slug)
        if not design or not design.products:
            return None

        dp = design.products[0]

        if dp.variants:
            variants = [
                ProductVariant(
                    name=v,
                    sku=f"{dp.sku}-{v}",
                    provider=dp.provider,
                    price=dp.retail_price,
                )
                for v in dp.variants
            ]
        else:
            # No explicit variants: expose one default variant with the bare SKU.
            variants = [
                ProductVariant(
                    name="default",
                    sku=dp.sku,
                    provider=dp.provider,
                    price=dp.retail_price,
                )
            ]

        return Product(
            slug=design.slug,
            name=design.name,
            description=design.description,
            category=design.category,
            product_type=dp.type,
            tags=design.tags,
            image_url=design.image_url,
            base_price=dp.retail_price,
            variants=variants,
            is_active=True,
        )

    async def set_product_override(
        self,
        db: AsyncSession,
        slug: str,
        is_active: bool | None = None,
        price_override: float | None = None,
    ) -> None:
        """Create or update the product override row for *slug* and commit.

        Args:
            db: Session used to query, add and commit the override.
            slug: Product slug the override applies to.
            is_active: New active flag. ``None`` leaves an existing row
                unchanged; a newly created row defaults to ``True``.
            price_override: New price override. ``None`` leaves an existing
                row unchanged and is stored as-is on a newly created row.
        """
        result = await db.execute(
            select(ProductOverride).where(ProductOverride.slug == slug)
        )
        override = result.scalar_one_or_none()

        if override:
            # Update only the fields the caller actually supplied.
            if is_active is not None:
                override.is_active = is_active
            if price_override is not None:
                override.price_override = price_override
        else:
            override = ProductOverride(
                slug=slug,
                is_active=is_active if is_active is not None else True,
                price_override=price_override,
            )
            db.add(override)

        await db.commit()