from typing import Optional from loguru import logger from app.db.errors import EntityDoesNotExist from app.db.queries.queries import queries from app.db.repositories.base import BaseRepository from app.db.repositories.roles import RolesRepository from app.models.domain.users import User, UserInDB from app.core.config import get_app_settings class UsersRepository(BaseRepository): """ User repository with helpers for both public auth flows and admin features. """ def __init__(self, conn) -> None: super().__init__(conn) self._roles_repo = RolesRepository(conn) async def _attach_roles(self, user: Optional[UserInDB]) -> Optional[UserInDB]: if user and getattr(user, "id", None): if not user.roles: # 兜底从 user_roles/roles 联查,确保 roles 填充 rows = await self.connection.fetch( """ SELECT r.name FROM user_roles ur JOIN roles r ON r.id = ur.role_id WHERE ur.user_id = $1 ORDER BY r.name """, user.id, ) user.roles = [row["name"] for row in rows] return user async def get_user_by_email_optional(self, *, email: str) -> Optional[UserInDB]: user_row = await queries.get_user_by_email(self.connection, email=email) if not user_row: return None return await self._attach_roles(UserInDB(**user_row)) async def get_user_id_by_email(self, *, email: str) -> Optional[int]: user_id = await self.connection.fetchval( "SELECT id FROM users WHERE email = $1", email, ) return int(user_id) if user_id is not None else None async def get_user_by_id(self, *, id_: int) -> UserInDB: user_row = await queries.get_user_by_id(self.connection, id=id_) if not user_row: raise EntityDoesNotExist(f"user with id={id_} does not exist") return await self._attach_roles(UserInDB(**user_row)) async def get_user_by_email(self, *, email: str) -> UserInDB: user_row = await queries.get_user_by_email(self.connection, email=email) if not user_row: raise EntityDoesNotExist(f"user with email {email} does not exist") return await self._attach_roles(UserInDB(**user_row)) async def get_user_by_username(self, *, username: str) -> UserInDB: user_row = await queries.get_user_by_username( self.connection, username=username, ) if not user_row: raise EntityDoesNotExist(f"user with username {username} does not exist") return await self._attach_roles(UserInDB(**user_row)) async def create_user( self, *, username: str, email: str, password: str, ) -> UserInDB: user = UserInDB(username=username, email=email) user.change_password(password) async with self.connection.transaction(): user_row = await queries.create_new_user( self.connection, username=user.username, email=user.email, salt=user.salt, hashed_password=user.hashed_password, ) created = user.copy(update=dict(user_row)) created.roles = [] return created async def update_user( # noqa: WPS211 self, *, user: User, username: Optional[str] = None, email: Optional[str] = None, password: Optional[str] = None, bio: Optional[str] = None, image: Optional[str] = None, phone: Optional[str] = None, user_type: Optional[str] = None, company_name: Optional[str] = None, ) -> UserInDB: user_in_db = await self.get_user_by_username(username=user.username) user_in_db.username = username or user_in_db.username user_in_db.email = email or user_in_db.email user_in_db.bio = bio if bio is not None else user_in_db.bio user_in_db.image = image if image is not None else user_in_db.image user_in_db.phone = phone if phone is not None else user_in_db.phone user_in_db.user_type = user_type if user_type is not None else user_in_db.user_type user_in_db.company_name = company_name if company_name is not None else user_in_db.company_name if password: user_in_db.change_password(password) async with self.connection.transaction(): user_in_db.updated_at = await queries.update_user_by_username( self.connection, username=user.username, new_username=user_in_db.username, new_email=user_in_db.email, new_salt=user_in_db.salt, new_password=user_in_db.hashed_password, new_bio=user_in_db.bio, new_image=user_in_db.image, new_phone=user_in_db.phone, new_user_type=user_in_db.user_type, new_company_name=user_in_db.company_name, ) return await self._attach_roles(user_in_db) async def set_email_verified(self, *, email: str, verified: bool = True) -> None: await queries.set_user_email_verified( self.connection, email=email, verified=verified, ) async def update_user_by_id( # noqa: WPS211 self, *, user_id: int, username: Optional[str] = None, email: Optional[str] = None, password: Optional[str] = None, bio: Optional[str] = None, image: Optional[str] = None, phone: Optional[str] = None, user_type: Optional[str] = None, company_name: Optional[str] = None, ) -> UserInDB: user_in_db = await self.get_user_by_id(id_=user_id) user_in_db.username = username or user_in_db.username user_in_db.email = email or user_in_db.email user_in_db.bio = bio if bio is not None else user_in_db.bio user_in_db.image = image if image is not None else user_in_db.image user_in_db.phone = phone if phone is not None else user_in_db.phone user_in_db.user_type = user_type if user_type is not None else user_in_db.user_type user_in_db.company_name = company_name if company_name is not None else user_in_db.company_name if password: user_in_db.change_password(password) updated_row = await queries.admin_update_user_by_id( self.connection, id=user_id, new_username=user_in_db.username, new_email=user_in_db.email, new_salt=user_in_db.salt, new_password=user_in_db.hashed_password, new_bio=user_in_db.bio, new_image=user_in_db.image, new_phone=user_in_db.phone, new_user_type=user_in_db.user_type, new_company_name=user_in_db.company_name, ) return await self._attach_roles(UserInDB(**updated_row)) async def delete_user_by_id(self, *, user_id: int) -> None: await queries.admin_delete_user(self.connection, id=user_id)