2025-12-10 12:02:17 +08:00

121 lines
4.7 KiB
Python

from typing import List
from fastapi import APIRouter, Depends, HTTPException, Request, status
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from ..auth.jwt_utils import get_password_hash
from ..db import get_session
from ..dependencies import AuthUser, require_roles
from ..models import AuditAction, AuditResourceType, Role, RoleName, User
from ..schemas import UserCreate, UserOut, UserUpdate
from ..utils.audit import create_audit_log
# Router for the user-management endpoints defined in this module; every route
# registered on it is served under the "/api/v1/users" prefix and tagged "users".
router = APIRouter(prefix="/api/v1/users", tags=["users"])
def _validate_role_for_user(actor: AuthUser, target_role: Role) -> None:
    """Ensure ``actor`` is allowed to assign ``target_role`` to a user.

    Args:
        actor: The authenticated caller performing the assignment.
        target_role: The role that is about to be assigned.

    Raises:
        HTTPException: 403 when ``target_role`` is the ADMIN role and the
            caller's own role is not ADMIN.
    """
    admin_name = RoleName.ADMIN.value

    # ADMIN callers may assign any role.
    if actor.role_name == admin_name:
        return

    # Non-ADMIN callers may assign anything except ADMIN.
    if target_role.name == admin_name:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Only ADMIN can assign ADMIN role",
        )
@router.get("", response_model=List[UserOut])
async def list_users(
    session: AsyncSession = Depends(get_session),
    auth_user: AuthUser = Depends(require_roles([RoleName.ADMIN, RoleName.CUSTOMER_ADMIN])),
) -> List[UserOut]:
    """List the users visible to the caller.

    ADMIN callers receive every user. Any other permitted caller only
    receives users whose ``customer_id`` equals the caller's own. The
    ``role`` and ``customer`` relationships are loaded eagerly with
    ``selectinload`` before the rows are converted to ``UserOut``.

    Returns:
        One ``UserOut`` per matching user.
    """
    eager_loads = (selectinload(User.role), selectinload(User.customer))
    stmt = select(User).options(*eager_loads)

    caller_is_admin = auth_user.role_name == RoleName.ADMIN.value
    if not caller_is_admin:
        # Restrict non-ADMIN callers to their own customer.
        stmt = stmt.where(User.customer_id == auth_user.customer_id)

    result = await session.scalars(stmt)
    rows = result.all()
    return [UserOut.model_validate(row) for row in rows]
@router.post("", response_model=UserOut, status_code=status.HTTP_201_CREATED)
async def create_user(
    payload: UserCreate,
    request: Request,
    session: AsyncSession = Depends(get_session),
    auth_user: AuthUser = Depends(require_roles([RoleName.ADMIN, RoleName.CUSTOMER_ADMIN])),
) -> UserOut:
    """Create a user and its audit entry in a single transaction.

    Args:
        payload: Requested username, email, password, role and (optionally)
            customer for the new user.
        request: Incoming request, forwarded to the audit logger.
        session: Database session for this request.
        auth_user: The authenticated caller (ADMIN or CUSTOMER_ADMIN).

    Returns:
        The newly created user as ``UserOut``.

    Raises:
        HTTPException: 400 if the role does not exist, or if no customer can
            be determined for a non-ADMIN role; 403 if a non-ADMIN caller
            tries to assign the ADMIN role.
    """
    role = await session.scalar(select(Role).where(Role.id == payload.role_id))
    if not role:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Role not found")
    _validate_role_for_user(auth_user, role)

    if auth_user.role_name != RoleName.ADMIN.value:
        # Non-ADMIN callers can only create users inside their own customer,
        # whatever customer_id the payload carries.
        customer_id = auth_user.customer_id
    else:
        customer_id = payload.customer_id or auth_user.customer_id

    # Only ADMIN-role users may exist without a customer.
    if not customer_id and role.name != RoleName.ADMIN.value:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Customer is required")

    user = User(
        username=payload.username,
        email=payload.email,
        role_id=role.id,
        customer_id=customer_id,
        password_hash=get_password_hash(payload.password),
    )
    session.add(user)

    # Flush instead of committing: this assigns user.id for the audit entry
    # while keeping the user row and the audit row in the same transaction,
    # so a failure while writing the audit entry does not leave a committed
    # user behind with no audit record.
    await session.flush()

    await create_audit_log(
        session,
        user_id=auth_user.user.id,
        customer_id=customer_id,
        action=AuditAction.USER_CREATE,
        resource_type=AuditResourceType.USER,
        resource_id=user.id,
        description=f"Create user {user.username}",
        request=request,
    )

    await session.commit()
    # Reload the instance after the commit before serializing it.
    await session.refresh(user)
    return UserOut.model_validate(user)
@router.put("/{user_id}", response_model=UserOut)
async def update_user(
    user_id: int,
    payload: UserUpdate,
    request: Request,
    session: AsyncSession = Depends(get_session),
    auth_user: AuthUser = Depends(require_roles([RoleName.ADMIN, RoleName.CUSTOMER_ADMIN])),
) -> UserOut:
    """Update a user and record an audit entry in a single transaction.

    Only fields explicitly present in the request body are applied. A
    non-empty ``password`` is hashed into ``password_hash``; the plaintext
    value is never set on the model nor forwarded to the audit log.

    Args:
        user_id: Primary key of the user to update.
        payload: Partial set of fields to change.
        request: Incoming request, forwarded to the audit logger.
        session: Database session for this request.
        auth_user: The authenticated caller (ADMIN or CUSTOMER_ADMIN).

    Returns:
        The updated user as ``UserOut``.

    Raises:
        HTTPException: 404 if the user does not exist; 403 if a non-ADMIN
            caller targets a user of another customer or tries to assign the
            ADMIN role; 400 if the requested role does not exist.
    """
    user = await session.get(User, user_id)
    if not user:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found")

    caller_is_admin = auth_user.role_name == RoleName.ADMIN.value
    if not caller_is_admin and user.customer_id != auth_user.customer_id:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Cannot manage other customers")
    # NOTE(review): only the *new* role is validated below. A non-ADMIN caller
    # can still edit an existing ADMIN user that shares their customer_id;
    # confirm whether that is intended.

    update_data = payload.model_dump(exclude_unset=True)

    if "role_id" in update_data:
        role = await session.get(Role, update_data["role_id"])
        if not role:
            raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Role not found")
        _validate_role_for_user(auth_user, role)

    if "customer_id" in update_data and not caller_is_admin:
        # Non-ADMIN callers cannot move a user to another customer.
        update_data["customer_id"] = auth_user.customer_id

    # Always take the password out of update_data, even when it is empty or
    # None, so it is never setattr'd onto the model as a stray attribute and
    # never reaches the audit payload. Only a non-empty value changes the hash.
    new_password = update_data.pop("password", None)
    if new_password:
        user.password_hash = get_password_hash(new_password)

    for field, value in update_data.items():
        setattr(user, field, value)

    # Flush instead of committing so the change and its audit entry are
    # persisted together by the single commit below.
    await session.flush()

    await create_audit_log(
        session,
        user_id=auth_user.user.id,
        customer_id=user.customer_id,
        action=AuditAction.USER_UPDATE,
        resource_type=AuditResourceType.USER,
        resource_id=user.id,
        description=f"Update user {user.username}",
        payload=update_data,
        request=request,
    )

    await session.commit()
    # Reload the instance after the commit before serializing it.
    await session.refresh(user)
    return UserOut.model_validate(user)