import json from aliyunsdkram.request.v20150501.CreateUserRequest import CreateUserRequest from aliyunsdkram.request.v20150501.CreateLoginProfileRequest import CreateLoginProfileRequest from aliyunsdkram.request.v20150501.GetUserRequest import GetUserRequest from utils.generators import generate_password from utils.logger import log_action CREDENTIALS_FILE = "ram_credentials.json" def create_ram_user(resource_name: str, region: str, client): log_action(f"🧪 创建 RAM 用户流程启动: {resource_name}") print(f"🧪 [RAM] 创建用户流程启动: {resource_name}") try: with open(CREDENTIALS_FILE, "r", encoding="utf-8") as f: credentials = json.load(f) print("📖 成功读取本地凭证") except: credentials = {} print("⚠️ 未读取到本地凭证文件,初始化为空") if resource_name in credentials: log_action(f"⚠️ RAM 用户已存在: {resource_name},从本地读取密码") print(f"⚠️ [RAM] 用户已存在于本地缓存: {resource_name}") return {"user": resource_name, "password": credentials[resource_name]} password = generate_password() print(f"🔐 生成密码成功: {password}") try: print("📨 创建 RAM 用户请求发送中...") req = CreateUserRequest() req.set_UserName(resource_name) req.set_DisplayName(resource_name) client.do_action_with_exception(req) log_action(f"✅ 创建 RAM 用户成功: {resource_name}") print(f"✅ [RAM] 创建成功: {resource_name}") except Exception as e: print("❌ 创建 RAM 用户失败") import traceback traceback.print_exc() if "EntityAlreadyExists.User" in str(e): raise RuntimeError(f"用户已存在但本地无密码记录:{resource_name}") raise try: print("🔧 设置 RAM 登录配置中...") login_req = CreateLoginProfileRequest() login_req.set_UserName(resource_name) login_req.set_Password(password) login_req.set_PasswordResetRequired(False) login_req.set_MFABindRequired(False) client.do_action_with_exception(login_req) log_action("✅ 设置 RAM 登录密码成功") print("✅ [RAM] 登录密码配置成功") except Exception as e: print("❌ 设置 RAM 密码失败") import traceback traceback.print_exc() raise RuntimeError(f"❌ 设置 RAM 密码失败: {e}") credentials[resource_name] = password with open(CREDENTIALS_FILE, "w", encoding="utf-8") as f: json.dump(credentials, f, indent=2, ensure_ascii=False) print("💾 已将密码写入本地凭证文件") return {"user": resource_name, "password": password} def get_account_uid_by_user_name(client, user_name: str): from utils.logger import log_action try: req = GetUserRequest() req.set_UserName(user_name) res = json.loads(client.do_action_with_exception(req)) log_action(f"📎 GetUser 返回: {res}") # 优先尝试 Arn arn = res.get("User", {}).get("Arn") if arn and ":" in arn: uid = arn.split(":")[4] log_action(f"✅ UID 来自 ARN: {uid}") return uid # 使用 UserId 作为替代 user_id = res.get("User", {}).get("UserId") if user_id: log_action(f"✅ UID 来自 UserId: {user_id}") return user_id raise ValueError("❌ 无法从 GetUser 返回中提取 UID") except Exception as e: import traceback traceback.print_exc() raise RuntimeError(f"❌ 获取账号 UID 失败: {e}")