91 lines
2.7 KiB
Python
91 lines
2.7 KiB
Python
"""Authentication service for admin users."""
|
|
|
|
from datetime import datetime, timedelta
|
|
|
|
from fastapi import Depends, HTTPException, status
|
|
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
|
|
from jose import JWTError, jwt
|
|
from passlib.hash import bcrypt
|
|
from sqlalchemy import select
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from app.config import get_settings
|
|
from app.database import get_db
|
|
from app.models.admin import AdminUser
|
|
|
|
settings = get_settings()
|
|
security = HTTPBearer()
|
|
|
|
|
|
class AuthService:
|
|
"""Service for admin authentication."""
|
|
|
|
def __init__(self, db: AsyncSession):
|
|
self.db = db
|
|
|
|
async def authenticate(self, email: str, password: str) -> str | None:
|
|
"""Authenticate admin user and return JWT token."""
|
|
result = await self.db.execute(
|
|
select(AdminUser).where(AdminUser.email == email)
|
|
)
|
|
admin = result.scalar_one_or_none()
|
|
|
|
if not admin or not admin.is_active:
|
|
return None
|
|
|
|
if not bcrypt.verify(password, admin.password_hash):
|
|
return None
|
|
|
|
# Create JWT token
|
|
expire = datetime.utcnow() + timedelta(hours=settings.jwt_expire_hours)
|
|
payload = {
|
|
"sub": str(admin.id),
|
|
"email": admin.email,
|
|
"exp": expire,
|
|
}
|
|
token = jwt.encode(payload, settings.jwt_secret, algorithm=settings.jwt_algorithm)
|
|
return token
|
|
|
|
async def verify_token(self, token: str) -> AdminUser | None:
|
|
"""Verify JWT token and return admin user."""
|
|
try:
|
|
payload = jwt.decode(
|
|
token,
|
|
settings.jwt_secret,
|
|
algorithms=[settings.jwt_algorithm],
|
|
)
|
|
admin_id = payload.get("sub")
|
|
if not admin_id:
|
|
return None
|
|
|
|
result = await self.db.execute(
|
|
select(AdminUser).where(AdminUser.id == admin_id)
|
|
)
|
|
admin = result.scalar_one_or_none()
|
|
if not admin or not admin.is_active:
|
|
return None
|
|
|
|
return admin
|
|
except JWTError:
|
|
return None
|
|
|
|
@staticmethod
|
|
def hash_password(password: str) -> str:
|
|
"""Hash a password."""
|
|
return bcrypt.hash(password)
|
|
|
|
|
|
async def get_current_admin(
|
|
credentials: HTTPAuthorizationCredentials = Depends(security),
|
|
db: AsyncSession = Depends(get_db),
|
|
) -> AdminUser:
|
|
"""Dependency to get current authenticated admin."""
|
|
auth_service = AuthService(db)
|
|
admin = await auth_service.verify_token(credentials.credentials)
|
|
if not admin:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Invalid or expired token",
|
|
)
|
|
return admin
|