2025-12-04 10:09:04 +08:00

486 lines
19 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# services/ecs_service.py
import uuid
import time
import json
from datetime import datetime
from typing import List, Dict, Any
from decimal import Decimal, InvalidOperation
import traceback
from aliyunsdkcore.client import AcsClient
from aliyunsdkecs.request.v20140526.CreateInstanceRequest import CreateInstanceRequest
from aliyunsdkecs.request.v20140526.RunInstancesRequest import RunInstancesRequest
from aliyunsdkecs.request.v20140526.AllocatePublicIpAddressRequest import AllocatePublicIpAddressRequest
from aliyunsdkecs.request.v20140526.StartInstanceRequest import StartInstanceRequest
from aliyunsdkecs.request.v20140526.DescribeInstancesRequest import DescribeInstancesRequest
from aliyunsdkecs.request.v20140526.DescribeSecurityGroupsRequest import DescribeSecurityGroupsRequest
from aliyunsdkecs.request.v20140526.DescribeImagesRequest import DescribeImagesRequest
from aliyunsdkecs.request.v20140526.DescribeAvailableResourceRequest import DescribeAvailableResourceRequest
from aliyunsdkecs.request.v20140526.DescribeVSwitchesRequest import DescribeVSwitchesRequest
from aliyunsdkecs.request.v20140526.DescribeVpcsRequest import DescribeVpcsRequest
from fastapi import HTTPException
from services.ram_service import create_ram_user
from services.resource_group_service import (
create_resource_group,
create_custom_ecs_policy_for_group,
attach_policy_to_user
)
from services.resource_transfer_service import transfer_instance_to_group
from utils.generators import generate_instance_name, generate_password
from utils.logger import log_action
from aliyunsdkcore.request import CommonRequest
from decimal import Decimal
from aliyunsdkcore.acs_exception.exceptions import ServerException, ClientException
# Image used when the caller does not choose one; also always listed first
# by list_custom_and_shared_images.
DEFAULT_IMAGE = "ubuntu_22_04_x64_20G_alibase_20250415.vhd"
# Instance type used for every RunInstances request and for the PrePaid zone lookup.
DEFAULT_INSTANCE_TYPE = "ecs.u1-c1m2.xlarge"
# System disk size sent with RunInstances (presumably GiB - confirm against the ECS API).
DEFAULT_SYSTEM_DISK = 500
# Outbound public bandwidth sent with RunInstances (presumably Mbit/s - confirm).
DEFAULT_BANDWIDTH = 1
# Name of the security group that is resolved to an ID before instances are created.
DEFAULT_SECURITY_GROUP = "or_chain"
# Login user reported in the "user" field of each created-instance record.
DEFAULT_USERNAME = "root"
# Alibaba Cloud CreateInstance subscription orders only accept PeriodUnit=Month;
# 12 months is the equivalent of a one-year subscription.
DEFAULT_PERIOD = 12
# Display name used for DEFAULT_IMAGE in the image list returned to the front end.
DEFAULT_IMAGE_NAME = "Ubuntu 22.04 x64 20G 官方"
def list_custom_and_shared_images(
    access_key: str,
    secret_key: str,
    region: str = "ap-southeast-1",
    page_size: int = 100
) -> List[Dict[str, str]]:
    """
    List the account's own images and the images shared with it, with the
    default official image injected at the top of the list.

    Args:
        access_key: Alibaba Cloud AccessKey ID used to build the client.
        secret_key: Alibaba Cloud AccessKey secret used to build the client.
        region: Region the images are queried in.
        page_size: Number of images requested per DescribeImages page.

    Returns:
        A list of ``{"ImageId": ..., "ImageName": ...}`` dicts. The first entry
        is always DEFAULT_IMAGE, labelled ``"<DEFAULT_IMAGE_NAME> (胜总项目)"``;
        every other distinct image follows, labelled ``"<name> (OR项目)"``.

    The listing is best-effort: if a request for one owner alias fails, the
    error is logged and the images collected so far are still returned.
    """
    client = AcsClient(access_key, secret_key, region)
    all_images = []
    # Hard stop so a misbehaving endpoint can never keep us paging forever.
    max_pages = 50

    # "self" = images created by this account, "others" = images shared with it.
    for alias in ("self", "others"):
        for page_number in range(1, max_pages + 1):
            req = DescribeImagesRequest()
            req.add_query_param("ImageOwnerAlias", alias)
            req.add_query_param("PageSize", page_size)
            req.add_query_param("PageNumber", page_number)
            try:
                resp = client.do_action_with_exception(req)
            except (ServerException, ClientException) as e:
                # Client-side failures (network, signing) are as non-fatal
                # here as server-side ones: keep whatever was collected.
                log_action(f"[ERROR] alias={alias} 拉取失败:{e}")
                break
            page_items = json.loads(resp).get("Images", {}).get("Image", [])
            all_images.extend(page_items)
            # A short (or empty) page means there is nothing left to fetch.
            if len(page_items) < page_size:
                break

    # De-duplicate by ImageId, keeping the first occurrence.
    unique: Dict[str, Dict[str, str]] = {}
    for img in all_images:
        iid = img.get("ImageId")
        if iid and iid not in unique:
            unique[iid] = {
                "ImageId": iid,
                "ImageName": img.get("ImageName", iid),
            }

    # The default official image always comes first ...
    annotated: List[Dict[str, str]] = [{
        "ImageId": DEFAULT_IMAGE,
        "ImageName": f"{DEFAULT_IMAGE_NAME} (胜总项目)",
    }]
    # ... followed by every other image, skipping the default if the API
    # returned it as well so it is not listed twice.
    annotated.extend(
        {"ImageId": iid, "ImageName": f"{info['ImageName']} (OR项目)"}
        for iid, info in unique.items()
        if iid != DEFAULT_IMAGE
    )
    return annotated
def get_account_credit(
    access_key: str,
    secret_key: str,
    region: str = "ap-southeast-1"
) -> Decimal | str:
    """
    Query the available account balance through the BSS OpenAPI action
    ``QueryAccountBalance`` using the caller-supplied credentials.

    Args:
        access_key: Alibaba Cloud AccessKey ID.
        secret_key: Alibaba Cloud AccessKey secret.
        region: Region used to build the client (the request itself is always
            sent to ``business.ap-southeast-1.aliyuncs.com``).

    Returns:
        The ``Data.AvailableAmount`` value as a ``Decimal`` (thousands
        separators removed; ``0.00`` when the field is absent).
        For backward compatibility two degraded paths return a ``str``:
        the cleaned raw value when it cannot be parsed as a number, and
        ``"Invalid available amount"`` when the response body is not valid
        JSON.

    Raises:
        HTTPException: 400 when the SDK reports a server/client error,
            500 for any other unexpected failure.
    """
    try:
        client = AcsClient(access_key, secret_key, region)

        req = CommonRequest()
        req.set_protocol_type("https")
        req.set_accept_format("json")
        req.set_method("POST")
        req.set_domain("business.ap-southeast-1.aliyuncs.com")
        req.set_version("2017-12-14")
        req.set_action_name("QueryAccountBalance")

        resp_bytes = client.do_action_with_exception(req)
        resp = json.loads(resp_bytes)
        # Printed to ease debugging.
        print(f"API Response: {resp}")

        # "Data" may be present but null; treat that like a missing section.
        data = resp.get("Data") or {}
        available_amount = data.get("AvailableAmount")
        if available_amount is None:
            available_amount = "0.00"
        # The amount may arrive as a number rather than a string, so
        # normalise to str before stripping thousands separators.
        available_amount_clean = str(available_amount).replace(',', '')

        try:
            return Decimal(available_amount_clean)
        except InvalidOperation:
            # Not a number: hand the raw value back unchanged (legacy contract).
            print(f"无效的余额值:{available_amount_clean}")
            return available_amount_clean
    except (ServerException, ClientException) as e:
        # Fall back to the exception text when it has no error_message attribute.
        error_message = getattr(e, 'error_message', str(e))
        print(f"API请求失败{error_message}")
        raise HTTPException(status_code=400, detail=f"API请求失败{error_message}") from e
    except ValueError as ve:
        # Reached when json.loads cannot decode the response body
        # (JSONDecodeError is a ValueError); legacy contract returns a marker string.
        print(f"无效的余额值:{ve}")
        return "Invalid available amount"
    except Exception as e:
        print(f"发生未知错误:{e}")
        raise HTTPException(status_code=500, detail=f"发生未知错误:{e}") from e
def wait_instance_stopped(client, region, instance_id, timeout=60, poll_interval=3):
    """
    Poll DescribeInstances until the instance reports status ``Stopped``.

    Args:
        client: SDK client used to send the DescribeInstances request.
        region: Unused; kept so existing callers keep working.
        instance_id: ID of the instance to watch.
        timeout: Approximate total wait in seconds; the number of polls is
            ``timeout // poll_interval`` (at least one).
        poll_interval: Seconds to sleep between polls; must be positive.

    Raises:
        ValueError: If ``poll_interval`` is not positive.
        RuntimeError: If the instance is not ``Stopped`` after the last poll.
    """
    if poll_interval <= 0:
        raise ValueError("poll_interval must be positive")

    req = DescribeInstancesRequest()
    req.set_InstanceIds(json.dumps([instance_id]))
    print("⏳ 等待实例进入 Stopped 状态...")

    # Always poll at least once, even for very small timeouts.
    attempts = max(1, int(timeout // poll_interval))
    for attempt in range(attempts):
        res = client.do_action_with_exception(req)
        instances = json.loads(res).get("Instances", {}).get("Instance", [])
        # An empty list means the instance is not visible yet; keep polling
        # instead of failing with an IndexError.
        status = instances[0].get("Status") if instances else None
        print(f" 当前状态: {status}")
        if status == "Stopped":
            print("✅ 实例已进入 Stopped 状态")
            return
        # No point sleeping after the final poll; we are about to give up.
        if attempt < attempts - 1:
            time.sleep(poll_interval)
    raise RuntimeError("❌ 等待实例进入 Stopped 状态超时")
def get_security_group_id_by_name(group_name, region, client):
    """
    Resolve a security group name to its ID via DescribeSecurityGroups.

    Args:
        group_name: Exact security group name to look for.
        region: Unused; the request goes to whatever region ``client`` targets.
        client: SDK client used to send the request.

    Returns:
        The ``SecurityGroupId`` of the first group whose name matches.

    Raises:
        ValueError: If no group with that name exists on any page.
    """
    print("🔍 获取安全组 ID 中...")
    page_size = 50
    # Hard stop so a misbehaving endpoint can never keep us paging forever.
    max_pages = 100

    # Walk every page: the wanted group is not guaranteed to be among the
    # first 50 groups returned.
    for page_number in range(1, max_pages + 1):
        req = DescribeSecurityGroupsRequest()
        req.set_PageSize(page_size)
        req.add_query_param("PageNumber", page_number)
        res = client.do_action_with_exception(req)
        groups = json.loads(res)['SecurityGroups']['SecurityGroup']
        for g in groups:
            if g.get('SecurityGroupName') == group_name:
                print(f"✅ 找到安全组 {group_name}: {g['SecurityGroupId']}")
                return g['SecurityGroupId']
        # A short (or empty) page is the last one.
        if len(groups) < page_size:
            break
    raise ValueError(f"❌ 未找到安全组名称: {group_name}")
def pick_prepaid_zone(
    client,
    region: str,
    instance_type: str,
    network_category: str = "classic",
    preferred_zone: str | None = None
) -> str | None:
    """
    Pick a zone in which ``instance_type`` can be bought as PrePaid.

    DescribeAvailableResource is queried for the region, instance type and
    network category. A zone qualifies when both the zone and a matching
    ``instance-type`` entry report status ``Available``.

    Returns:
        - ``preferred_zone`` if it is given and qualifies;
        - the first qualifying zone if no preferred zone is given;
        - ``preferred_zone`` anyway (with a warning logged) if it is given
          but did not qualify, so the order can still be attempted;
        - ``None`` (with a warning logged) if nothing qualifies and no
          preferred zone was given; the caller decides what to do then.
    """
    request = DescribeAvailableResourceRequest()
    # Some SDK versions have no set_RegionId, so RegionId goes in as a raw query parameter.
    request.add_query_param("RegionId", region)
    request.set_DestinationResource("InstanceType")
    request.set_InstanceChargeType("PrePaid")
    request.set_InstanceType(instance_type)
    request.add_query_param("NetworkCategory", network_category)
    payload = json.loads(client.do_action_with_exception(request))

    def offers_instance_type(zone_entry) -> bool:
        """Tell whether an available zone lists the instance type as available."""
        if zone_entry.get("Status") != "Available":
            return False
        resource_groups = zone_entry.get("AvailableResources", {}).get("AvailableResource", [])
        return any(
            item.get("Status") == "Available" and item.get("Value") == instance_type
            for group in resource_groups
            if group.get("Type") == "instance-type"
            for item in group.get("SupportedResources", {}).get("SupportedResource", [])
        )

    for zone_entry in payload.get("AvailableZones", {}).get("AvailableZone", []):
        if not offers_instance_type(zone_entry):
            continue
        candidate = zone_entry["ZoneId"]
        if not preferred_zone:
            # No preference: the first qualifying zone wins.
            return candidate
        if preferred_zone == candidate:
            return preferred_zone

    if not preferred_zone:
        # Nothing qualified and nothing was requested: let the caller decide.
        log_action("⚠️ 资源查询未返回任何支持包年包月的可用区")
        return None

    # The requested zone did not qualify; fall back to it anyway so the order
    # can still be attempted.
    log_action(f"⚠️ 资源查询未返回可用区,回退使用用户指定的可用区 {preferred_zone}")
    return preferred_zone
def get_vswitch_zone_id(vswitch_id: str, client) -> str:
    """
    Look up the zone a VSwitch belongs to.

    Sends DescribeVSwitches filtered by ``vswitch_id`` and returns the
    ``ZoneId`` field of the first match (whatever ``.get`` yields for it).

    Raises:
        RuntimeError: If the response contains no VSwitch.
    """
    request = DescribeVSwitchesRequest()
    request.add_query_param("VSwitchId", vswitch_id)
    payload = json.loads(client.do_action_with_exception(request))
    matches = payload.get("VSwitches", {}).get("VSwitch", [])
    if matches:
        return matches[0].get("ZoneId")
    raise RuntimeError(f"❌ 未找到交换机 {vswitch_id}")
def list_vpcs_and_vswitches(access_key: str, secret_key: str, region: str) -> Dict[str, Any]:
    """
    Fetch the VPCs of a region and their VSwitches, for front-end drop-downs.

    Args:
        access_key: Alibaba Cloud AccessKey ID.
        secret_key: Alibaba Cloud AccessKey secret.
        region: Region to query.

    Returns:
        ``{"vpcs": [...], "vswitches": [...]}`` holding the raw ``Vpc`` and
        ``VSwitch`` items from the API. Every step is best-effort: a failure
        is logged and leaves the corresponding list empty (or as filled by
        the fallback step) instead of raising.
    """
    client = AcsClient(access_key, secret_key, region)
    result: Dict[str, Any] = {"vpcs": [], "vswitches": []}

    # 50 is used as the page size because larger values may be rejected;
    # paging keeps the listing complete regardless of the exact limit.
    page_size = 50
    # Hard stop so a misbehaving endpoint can never keep us paging forever.
    max_pages = 100

    def fetch_all(request_cls, container_key, item_key, extra_params=None):
        """Collect the items of every page of one paginated Describe* call."""
        collected = []
        for page_number in range(1, max_pages + 1):
            req = request_cls()
            # Not every SDK version exposes set_RegionId on these requests.
            if hasattr(req, "set_RegionId"):
                req.set_RegionId(region)
            else:
                req.add_query_param("RegionId", region)
            for key, value in (extra_params or {}).items():
                req.add_query_param(key, value)
            req.add_query_param("PageSize", page_size)
            req.add_query_param("PageNumber", page_number)
            data = json.loads(client.do_action_with_exception(req))
            page_items = data.get(container_key, {}).get(item_key, [])
            collected.extend(page_items)
            # A short (or empty) page is the last one.
            if len(page_items) < page_size:
                break
        return collected

    # VPC list.
    try:
        result["vpcs"] = fetch_all(DescribeVpcsRequest, "Vpcs", "Vpc")
    except Exception as e:
        log_action(f"⚠️ 拉取 VPC 列表失败: {e}")

    # VSwitch list: query per VPC when VPCs are known, otherwise run one
    # region-wide query (vpc_id None). All-or-nothing on purpose: if any query
    # fails, the list stays empty so the per-ID fallback below covers everything.
    try:
        vs_list = []
        vpc_ids = [v["VpcId"] for v in result["vpcs"]] or [None]
        for vpc_id in vpc_ids:
            params = {"VpcId": vpc_id} if vpc_id else None
            vs_list.extend(
                fetch_all(DescribeVSwitchesRequest, "VSwitches", "VSwitch", params)
            )
        result["vswitches"] = vs_list
    except Exception as e:
        log_action(f"⚠️ 拉取 VSwitch 列表失败: {e}")

    # Fallback: if no VSwitch was found, look up one by one the VSwitch IDs
    # that the VPC records themselves list.
    if not result["vswitches"] and result["vpcs"]:
        try:
            vsw_ids = []
            for vpc in result["vpcs"]:
                vsw_ids.extend(vpc.get("VSwitchIds", {}).get("VSwitchId", []))
            seen = set()
            vsw_details = []
            for vsw_id in vsw_ids:
                if vsw_id in seen:
                    continue
                seen.add(vsw_id)
                # Uses the list endpoint filtered by a single VSwitchId.
                attr_req = DescribeVSwitchesRequest()
                attr_req.add_query_param("RegionId", region)
                attr_req.add_query_param("VSwitchId", vsw_id)
                attr_req.add_query_param("PageSize", 10)
                attr_res = client.do_action_with_exception(attr_req)
                attr_data = json.loads(attr_res)
                vsw_details.extend(attr_data.get("VSwitches", {}).get("VSwitch", []))
            if vsw_details:
                result["vswitches"] = vsw_details
        except Exception as e:
            log_action(f"⚠️ 兜底查询 VSwitch 属性失败: {e}")
    return result
def _build_prepaid_run_request(image_id, sg_id, zone_id, vswitch_id, instance_name, password):
    """Build the RunInstances request for exactly one auto-paid PrePaid instance."""
    req = RunInstancesRequest()
    req.set_ImageId(image_id)
    req.set_InstanceType(DEFAULT_INSTANCE_TYPE)
    req.set_SecurityGroupId(sg_id)
    if zone_id:
        req.set_ZoneId(zone_id)
    if vswitch_id:
        req.set_VSwitchId(vswitch_id)
        req.add_query_param("NetworkType", "vpc")
    else:
        req.add_query_param("NetworkType", "classic")
    req.set_SystemDiskCategory("cloud_essd_entry")
    req.set_SystemDiskSize(DEFAULT_SYSTEM_DISK)
    req.set_InternetMaxBandwidthOut(DEFAULT_BANDWIDTH)
    req.set_InstanceChargeType("PrePaid")
    req.set_Period(DEFAULT_PERIOD)
    req.set_PeriodUnit("Month")
    # Only AutoPay=true is used; if payment is refused the API error propagates.
    req.add_query_param("AutoPay", "true")
    req.set_InternetChargeType("PayByBandwidth")
    req.set_InstanceName(instance_name)
    req.set_Password(password)
    # One instance per request; the caller loops to create several.
    req.add_query_param("MinAmount", 1)
    req.add_query_param("MaxAmount", 1)
    return req


def _poll_public_ip(client, instance_id, attempts=10, interval=5):
    """Poll DescribeInstances for the first public IP; return "" if none shows up."""
    for _ in range(attempts):
        time.sleep(interval)
        desc = DescribeInstancesRequest()
        desc.set_InstanceIds(json.dumps([instance_id]))
        payload = json.loads(client.do_action_with_exception(desc))
        instances = payload.get("Instances", {}).get("Instance", [])
        if not instances:
            # Instance not visible yet; try again on the next round.
            continue
        ips = instances[0].get("PublicIpAddress", {}).get("IpAddress", [])
        if ips:
            return ips[0]
    return ""


def create_ecs_instances(
    customer: str,
    resource_name: str,
    region: str,
    access_key: str,
    secret_key: str,
    count: int = 1,
    account_uid: str | None = None,
    image_id: str = DEFAULT_IMAGE,
    zone_id: str | None = None,
    vswitch_id: str | None = None,
) -> List[Dict[str, Any]]:
    """
    Create ``count`` ECS instances, one RunInstances order at a time.

    Flow: create the RAM user, create the resource group and attach its
    policy, resolve the default security group, pick a PrePaid-capable zone,
    then for each instance: order it, allocate a public IP, wait for it to
    stop, start it, read the public IP and move it into the resource group.

    Args:
        customer: Customer name, used to generate instance names.
        resource_name: Name used for both the RAM user and the resource group.
        region: ECS region.
        access_key: Alibaba Cloud AccessKey ID.
        secret_key: Alibaba Cloud AccessKey secret.
        count: Number of instances to create.
        account_uid: If given, used to build the RAM user's e-mail address.
        image_id: Image to install (selected in the front-end drop-down).
        zone_id: Preferred zone, if any.
        vswitch_id: VSwitch to attach to; switches the order to VPC networking
            and, when ``zone_id`` is not given, supplies the zone.

    Returns:
        One dict per created instance with the keys ``ip``, ``name``,
        ``region``, ``instance_id``, ``image_id``, ``user``, ``password``,
        ``created``, ``email``, ``ram_user`` and ``ram_password``.

    Raises:
        RuntimeError: On any failure. The original exception is chained as
            ``__cause__`` and the records of instances that were fully
            created before the failure are attached as ``partial_results``,
            so their credentials are not lost.
    """
    # Function-scope import: only `datetime` is imported at module level.
    from datetime import timezone

    results = []
    try:
        client = AcsClient(access_key, secret_key, region)
        log_action("🚀 开始创建 ECS 资源流程")

        # 1. Create the RAM user.
        ram_info = create_ram_user(resource_name, region, client)
        log_action("✅ 创建 RAM 用户完成")

        # 2. Create the resource group and bind the policy to the user.
        rg_id = create_resource_group(resource_name, client)
        policy_name = create_custom_ecs_policy_for_group(rg_id, client)
        attach_policy_to_user(resource_name, policy_name, client)
        log_action("✅ 资源组 & 策略绑定完成")

        # 3. Resolve the security group ID.
        sg_id = get_security_group_id_by_name(
            DEFAULT_SECURITY_GROUP, region, client)
        log_action(f"✅ 获取安全组成功: {sg_id}")

        # 3.1 Pick a zone that supports subscription billing, preferring the
        #     zone / VSwitch information supplied by the caller.
        picked_zone_id = zone_id
        network_category = "classic"
        if vswitch_id:
            network_category = "vpc"
            if not picked_zone_id:
                picked_zone_id = get_vswitch_zone_id(vswitch_id, client)
                log_action(f"🔍 通过交换机获取可用区: {picked_zone_id}")
        picked_zone_id = pick_prepaid_zone(
            client,
            region,
            DEFAULT_INSTANCE_TYPE,
            network_category=network_category,
            preferred_zone=picked_zone_id
        )
        if picked_zone_id:
            log_action(f"✅ 选定支持包年的可用区: {picked_zone_id}")
        elif network_category == "vpc":
            raise RuntimeError("❌ VPC 场景需指定可用区/交换机,请传入 zone_id 或 vswitch_id可从控制台复制")
        else:
            log_action("⚠️ 未找到可用区,继续尝试经典网络下单(可能继续报错,请优先提供可用区/交换机)")

        for _ in range(count):
            instance_name = generate_instance_name(customer)
            password = generate_password()

            # 4. Order the instance through RunInstances.
            req = _build_prepaid_run_request(
                image_id, sg_id, picked_zone_id, vswitch_id, instance_name, password)
            res_data = json.loads(client.do_action_with_exception(req))
            instance_ids = res_data.get("InstanceIdSets", {}).get("InstanceIdSet", [])
            order_id = res_data.get("OrderId")
            if not instance_ids:
                raise RuntimeError(
                    f"下单未返回实例 ID订单号: {order_id or '未知'}),请在控制台检查订单/支付"
                )
            instance_id = instance_ids[0]
            log_action(f"✅ 创建 ECS 实例成功: {instance_id}")

            # NOTE(review): steps 5-6 look inherited from a CreateInstance
            # flow. RunInstances is documented to start instances itself and
            # to assign a public IP when InternetMaxBandwidthOut > 0 - confirm
            # these calls are still needed and cannot fail or time out here.

            # 5. Allocate a public IP.
            ip_req = AllocatePublicIpAddressRequest()
            ip_req.set_InstanceId(instance_id)
            client.do_action_with_exception(ip_req)
            log_action("✅ 分配公网 IP")

            # 6. Wait until the instance is stopped, then start it.
            wait_instance_stopped(client, region, instance_id)
            start_req = StartInstanceRequest()
            start_req.set_InstanceId(instance_id)
            client.do_action_with_exception(start_req)
            log_action("✅ 启动实例")

            # 7. Read back the public IP.
            ip_address = _poll_public_ip(client, instance_id)
            if not ip_address:
                raise RuntimeError("获取公网 IP 失败")
            log_action(f"✅ 公网 IP: {ip_address}")

            # 8. Move the instance into the resource group.
            transfer_instance_to_group(
                instance_id, rg_id, region, access_key, secret_key)
            log_action("✅ 绑定资源组成功")

            # 9. Collect the record for this instance.
            email = f"{ram_info['user']}@{account_uid}.onaliyun.com" if account_uid else None
            results.append({
                "ip": ip_address,
                "name": instance_name,
                "region": region,
                "instance_id": instance_id,
                "image_id": image_id,
                "user": DEFAULT_USERNAME,
                "password": password,
                # Same UTC wall-clock string as before, without the deprecated utcnow().
                "created": datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S"),
                "email": email,
                "ram_user": ram_info["user"],
                "ram_password": ram_info["password"],
            })
        return results
    except Exception as e:
        traceback.print_exc()
        err = RuntimeError(f"批量创建 ECS 实例失败: {e}")
        # Instances created before the failure are already paid for; expose
        # their records so the caller can still recover the credentials.
        err.partial_results = results
        raise err from e