rswag-online/backend/app/services/auth_service.py

91 lines
2.7 KiB
Python

"""Authentication service for admin users."""
from datetime import datetime, timedelta
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jose import JWTError, jwt
from passlib.hash import bcrypt
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.config import get_settings
from app.database import get_db
from app.models.admin import AdminUser
settings = get_settings()
security = HTTPBearer()
class AuthService:
"""Service for admin authentication."""
def __init__(self, db: AsyncSession):
self.db = db
async def authenticate(self, email: str, password: str) -> str | None:
"""Authenticate admin user and return JWT token."""
result = await self.db.execute(
select(AdminUser).where(AdminUser.email == email)
)
admin = result.scalar_one_or_none()
if not admin or not admin.is_active:
return None
if not bcrypt.verify(password, admin.password_hash):
return None
# Create JWT token
expire = datetime.utcnow() + timedelta(hours=settings.jwt_expire_hours)
payload = {
"sub": str(admin.id),
"email": admin.email,
"exp": expire,
}
token = jwt.encode(payload, settings.jwt_secret, algorithm=settings.jwt_algorithm)
return token
async def verify_token(self, token: str) -> AdminUser | None:
"""Verify JWT token and return admin user."""
try:
payload = jwt.decode(
token,
settings.jwt_secret,
algorithms=[settings.jwt_algorithm],
)
admin_id = payload.get("sub")
if not admin_id:
return None
result = await self.db.execute(
select(AdminUser).where(AdminUser.id == admin_id)
)
admin = result.scalar_one_or_none()
if not admin or not admin.is_active:
return None
return admin
except JWTError:
return None
@staticmethod
def hash_password(password: str) -> str:
"""Hash a password."""
return bcrypt.hash(password)
async def get_current_admin(
credentials: HTTPAuthorizationCredentials = Depends(security),
db: AsyncSession = Depends(get_db),
) -> AdminUser:
"""Dependency to get current authenticated admin."""
auth_service = AuthService(db)
admin = await auth_service.verify_token(credentials.credentials)
if not admin:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid or expired token",
)
return admin