"""Auth service: password hashing, JWT creation/verification.""" from __future__ import annotations from datetime import datetime, timedelta, timezone import bcrypt from jose import JWTError, jwt from sqlalchemy.ext.asyncio import AsyncSession from rehearsalhub.config import get_settings from rehearsalhub.db.models import Member from rehearsalhub.repositories.member import MemberRepository from rehearsalhub.schemas.auth import RegisterRequest, TokenResponse from rehearsalhub.services.avatar import AvatarService def hash_password(plain: str) -> str: return bcrypt.hashpw(plain.encode(), bcrypt.gensalt()).decode() def verify_password(plain: str, hashed: str) -> bool: return bcrypt.checkpw(plain.encode(), hashed.encode()) def create_access_token(member_id: str, email: str) -> str: settings = get_settings() expire = datetime.now(timezone.utc) + timedelta(minutes=settings.access_token_expire_minutes) payload = { "sub": member_id, "email": email, "exp": expire, "iat": datetime.now(timezone.utc), } return jwt.encode(payload, settings.secret_key, algorithm=settings.jwt_algorithm) def decode_token(token: str) -> dict: settings = get_settings() return jwt.decode(token, settings.secret_key, algorithms=[settings.jwt_algorithm]) class AuthService: def __init__(self, session: AsyncSession) -> None: self._repo = MemberRepository(session) self._session = session async def register(self, req: RegisterRequest) -> Member: if await self._repo.email_exists(req.email): raise ValueError(f"Email already registered: {req.email}") # Create member without avatar first member = await self._repo.create( email=req.email.lower(), display_name=req.display_name, password_hash=hash_password(req.password), ) # Generate default avatar for new member avatar_service = AvatarService() avatar_url = await avatar_service.generate_default_avatar(member) # Update member with avatar URL member.avatar_url = avatar_url await self._session.flush() return member async def login(self, email: str, password: str) -> TokenResponse | None: member = await self._repo.get_by_email(email) if member is None or not verify_password(password, member.password_hash): return None token = create_access_token(str(member.id), member.email) return TokenResponse(access_token=token) async def get_member_from_token(self, token: str) -> Member | None: try: payload = decode_token(token) member_id = payload.get("sub") if member_id is None: return None return await self._repo.get_by_id(__import__("uuid").UUID(member_id)) except (JWTError, ValueError): return None