71 lines
2.5 KiB
Python
71 lines
2.5 KiB
Python
from fastapi import Depends, HTTPException, status
|
|
from loguru import logger
|
|
|
|
from app.api.dependencies.authentication import get_current_user_authorizer
|
|
from app.api.dependencies.database import get_repository
|
|
from app.db.repositories.roles import RolesRepository
|
|
from app.models.domain.users import User
|
|
from app.core.config import get_app_settings
|
|
|
|
# Role name that get_admin_user passes to RolesRepository.user_has_role
# when checking whether the current user may access admin-only endpoints.
ADMIN_ROLE_NAME = "admin"
|
|
|
|
|
|
# Substrings that mark a user field as credential-like; matching values are
# masked before the user object is written to the logs.
_SENSITIVE_FIELD_MARKERS = ("password", "salt", "token", "secret")

# Container types accepted for ``current_user.roles``. ``str`` is deliberately
# excluded: ``"admin" in "administrator"`` would be a substring match.
_ROLE_CONTAINER_TYPES = (list, tuple, set, frozenset)


def _redacted_user_dump(user: object) -> dict:
    """Return the user's top-level fields with credential-like values masked.

    Uses ``model_dump()`` when the object provides it, otherwise ``vars()``.
    Redaction is shallow: only top-level keys are inspected.
    """
    raw = user.model_dump() if hasattr(user, "model_dump") else vars(user)
    return {
        key: (
            "***"
            if any(marker in str(key).lower() for marker in _SENSITIVE_FIELD_MARKERS)
            else value
        )
        for key, value in raw.items()
    }


def _trusted_admin_emails(settings: object) -> set:
    """Return the normalized (stripped, lower-cased) ``admin_emails`` setting.

    Accepts either an iterable of addresses or a single comma-separated
    string; a missing or empty setting yields an empty set.
    """
    configured = getattr(settings, "admin_emails", None) or []
    if isinstance(configured, str):
        # Iterating a bare string would yield single characters, so split it.
        configured = configured.split(",")
    return {entry.strip().lower() for entry in configured if entry}


async def get_admin_user(
    current_user: User = Depends(get_current_user_authorizer()),
    roles_repo: RolesRepository = Depends(get_repository(RolesRepository)),
) -> User:
    """Return the current user if they are allowed admin access.

    Access is granted by the first check that succeeds, in this order:

    1. ``current_user.roles`` contains ``ADMIN_ROLE_NAME``;
    2. the user's email is listed in the ``admin_emails`` application setting;
    3. ``roles_repo.user_has_role`` reports the admin role for the user.

    Args:
        current_user: The user resolved by ``get_current_user_authorizer()``.
        roles_repo: Repository used for the role lookup in step 3.

    Returns:
        ``current_user`` unchanged when any check passes.

    Raises:
        HTTPException: 403 when the user has no usable id or no check passes.
    """
    # The id may be exposed as ``id`` or ``id_``; a falsy value (None, 0)
    # is treated as "no id" and denied.
    user_id = getattr(current_user, "id", None) or getattr(current_user, "id_", None)
    if not user_id:
        logger.warning("[AdminAccess] missing user_id, deny")
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Insufficient permissions",
        )

    email = getattr(current_user, "email", None)
    roles_from_user = getattr(current_user, "roles", None)

    # Diagnostic logging to observe the authorization context.
    logger.info(
        "[AdminAccess] current_user id={id} email={email} roles={roles}",
        id=user_id,
        email=email,
        roles=roles_from_user,
    )
    # The full dump is DEBUG-only and redacted so that password hashes, salts
    # and tokens carried on the user object never reach the logs.
    # Best-effort: a failure to dump must not block the access decision.
    try:
        logger.debug(
            "[AdminAccess] current_user dump={}",
            _redacted_user_dump(current_user),
        )
    except Exception as exc:
        logger.warning("[AdminAccess] dump error: {}", exc)

    # First check whether the user's own roles already include the admin role.
    roles_from_user = roles_from_user or []
    if isinstance(roles_from_user, _ROLE_CONTAINER_TYPES) and ADMIN_ROLE_NAME in roles_from_user:
        logger.info("[AdminAccess] allow via user.roles contains admin")
        return current_user

    # Then check the trusted email list, so an admin is not locked out
    # before the role has actually been granted.
    if email and email.strip().lower() in _trusted_admin_emails(get_app_settings()):
        logger.info("[AdminAccess] allow via trusted email list")
        return current_user

    # Finally fall back to the role lookup through the repository.
    has_role = await roles_repo.user_has_role(
        user_id=user_id,
        role_name=ADMIN_ROLE_NAME,
    )
    if has_role:
        logger.info("[AdminAccess] allow via DB role check")
        return current_user

    logger.warning("[AdminAccess] deny id={id} email={email}", id=user_id, email=email)
    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="Insufficient permissions",
    )
|