2025-12-04 10:09:04 +08:00

104 lines
3.9 KiB
Python

# resource_group_service.py
import time
import json
from aliyunsdkresourcemanager.request.v20200331.CreateResourceGroupRequest import CreateResourceGroupRequest
from aliyunsdkresourcemanager.request.v20200331.ListResourceGroupsRequest import ListResourceGroupsRequest
from aliyunsdkram.request.v20150501.CreatePolicyRequest import CreatePolicyRequest
from aliyunsdkram.request.v20150501.AttachPolicyToUserRequest import AttachPolicyToUserRequest
from aliyunsdkram.request.v20150501.ListPoliciesRequest import ListPoliciesRequest
from utils.logger import log_action
from services.ram_service import get_account_uid_by_user_name
def _find_resource_group_id(resource_name, client, page_size):
    """Page through ListResourceGroups and return the matching group's Id.

    Returns None when every page has been read and no group whose ``Name``
    equals ``resource_name`` was found. A page shorter than ``page_size`` is
    treated as the last page.
    """
    page = 1
    while True:
        req = ListResourceGroupsRequest()
        req.set_PageNumber(page)
        req.set_PageSize(page_size)
        res = client.do_action_with_exception(req)
        groups = json.loads(res)['ResourceGroups']['ResourceGroup']
        for g in groups:
            if g['Name'] == resource_name:
                return g['Id']
        if len(groups) < page_size:
            return None
        page += 1


def create_resource_group(resource_name, client) -> str:
    """Create a resource group if needed and return its Id.

    The create call is treated as idempotent: an error whose text contains
    ``EntityAlreadyExists.ResourceGroup`` is logged and ignored. The Id is
    then resolved by listing resource groups and matching on ``Name``. The
    lookup is retried with a fixed delay between attempts, both when the
    list call fails and when it succeeds but the group is not listed yet.

    Args:
        resource_name: Used as both the Name and DisplayName of the group.
        client: Object exposing ``do_action_with_exception(request)``.

    Returns:
        The ``Id`` of the resource group named ``resource_name``.

    Raises:
        RuntimeError: If creation fails for a reason other than the group
            already existing, or if the Id cannot be resolved after all
            lookup attempts (chained to the last lookup error, if any).
    """
    try:
        req = CreateResourceGroupRequest()
        req.set_DisplayName(resource_name)
        req.set_Name(resource_name)
        client.do_action_with_exception(req)
        log_action(f"✅ 创建资源组: {resource_name}")
    except Exception as e:
        if "EntityAlreadyExists.ResourceGroup" not in str(e):
            raise RuntimeError(f"❌ 创建资源组失败: {e}") from e
        log_action(f"⚠️ 资源组已存在: {resource_name},跳过创建")

    page_size = 50
    max_attempts = 15
    retry_delay_seconds = 3

    last_error = None
    for attempt in range(max_attempts):
        # Wait before every retry, not only after a failed call: a successful
        # listing that does not contain the group yet must not burn through
        # all attempts instantly.
        if attempt:
            time.sleep(retry_delay_seconds)
        try:
            group_id = _find_resource_group_id(resource_name, client, page_size)
        except Exception as e:
            # Best-effort polling: remember the failure and try again.
            last_error = e
            continue
        if group_id is not None:
            return group_id
    raise RuntimeError("❌ 无法获取资源组 ID") from last_error
def _custom_policy_exists(policy_name, client) -> bool:
    """Return True if a Custom policy named ``policy_name`` is listed.

    Follows the ``IsTruncated`` / ``Marker`` fields of the ListPolicies
    response so that policies beyond the first page are also found.
    """
    marker = None
    while True:
        list_req = ListPoliciesRequest()
        list_req.set_PolicyType("Custom")
        if marker:
            list_req.set_Marker(marker)
        body = json.loads(client.do_action_with_exception(list_req))
        if any(p['PolicyName'] == policy_name for p in body['Policies']['Policy']):
            return True
        marker = body.get('Marker')
        # Stop when the service reports no further pages (or gives no marker
        # to continue from, which would otherwise loop forever).
        if not body.get('IsTruncated') or not marker:
            return False


def create_custom_ecs_policy_for_group(resource_group_id: str, client) -> str:
    """Create a custom ECS read-only policy scoped to one resource group.

    The policy allows ``ecs:DescribeInstances`` and
    ``ecs:DescribeInstanceAttribute`` under a ``StringEquals`` condition on
    ``acs:ResourceGroupId``. An error whose text contains
    ``EntityAlreadyExists.Policy`` is logged and ignored. Afterwards the
    custom policy list is polled (10 attempts, 2 seconds apart) until the
    policy name shows up.

    Args:
        resource_group_id: Resource group Id used in the policy condition;
            its last six characters form the policy name suffix.
        client: Object exposing ``do_action_with_exception(request)``.

    Returns:
        The name of the policy.

    Raises:
        RuntimeError: If creation fails for a reason other than the policy
            already existing, or if the policy never appears in the list
            (chained to the last listing error, if any).
    """
    # NOTE(review): only the last six characters of the group Id are used, so
    # two groups sharing that suffix would map to the same policy name.
    policy_name = f"ECSReadOnlyAccess-{resource_group_id[-6:]}"
    doc = {
        "Version": "1",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": ["ecs:DescribeInstances", "ecs:DescribeInstanceAttribute"],
                "Resource": "*",
                "Condition": {
                    "StringEquals": {"acs:ResourceGroupId": resource_group_id}
                },
            }
        ],
    }
    try:
        req = CreatePolicyRequest()
        req.set_PolicyName(policy_name)
        req.set_Description("限制 ECS 只读策略")
        req.set_PolicyDocument(json.dumps(doc))
        client.do_action_with_exception(req)
        log_action(f"✅ 创建策略 {policy_name}")
    except Exception as e:
        if "EntityAlreadyExists.Policy" not in str(e):
            raise RuntimeError(f"❌ 创建策略失败: {e}") from e
        log_action(f"⚠️ 策略 {policy_name} 已存在")

    last_error = None
    for _ in range(10):
        time.sleep(2)
        try:
            if _custom_policy_exists(policy_name, client):
                return policy_name
        except Exception as e:
            # Best-effort polling: remember the failure and try again.
            last_error = e
    raise RuntimeError("❌ 策略创建后未生效") from last_error
def attach_policy_to_user(username, policy_name, client):
    """Attach a Custom policy to a user, tolerating an existing attachment.

    Args:
        username: User name passed to the attach request.
        policy_name: Name of the Custom policy to attach.
        client: Object exposing ``do_action_with_exception(request)``.

    Raises:
        RuntimeError: If the attach call fails with an error whose text does
            not contain ``EntityAlreadyExists.User.Policy``.
    """
    log_action(f"🔐 正在绑定策略给用户: {username}")
    try:
        attach_request = AttachPolicyToUserRequest()
        attach_request.set_PolicyType("Custom")
        attach_request.set_PolicyName(policy_name)
        # Bind directly by user name.
        attach_request.set_UserName(username)
        client.do_action_with_exception(attach_request)
        log_action(f"✅ 用户 {username} 成功绑定策略: {policy_name}")
    except Exception as exc:
        already_attached = "EntityAlreadyExists.User.Policy" in str(exc)
        if not already_attached:
            raise RuntimeError(f"❌ 策略绑定失败: {exc}")
        log_action(f"⚠️ 用户 {username} 已绑定策略: {policy_name}")