# services/ecs_service.py
"""ECS provisioning helpers built on the Alibaba Cloud (aliyun) Python SDK.

The module exposes read-only listing helpers (images, VPCs/VSwitches,
account balance) and ``create_ecs_instances``, which orders subscription
(PrePaid) ECS instances and binds them to a dedicated RAM user and
resource group.
"""
import json
import time
import traceback
import uuid
from datetime import datetime, timezone
from decimal import Decimal, InvalidOperation
from typing import Any, Callable, Dict, Iterator, List

from aliyunsdkcore.acs_exception.exceptions import ClientException, ServerException
from aliyunsdkcore.client import AcsClient
from aliyunsdkcore.request import CommonRequest
from aliyunsdkecs.request.v20140526.AllocatePublicIpAddressRequest import (
    AllocatePublicIpAddressRequest,
)
from aliyunsdkecs.request.v20140526.CreateInstanceRequest import CreateInstanceRequest
from aliyunsdkecs.request.v20140526.DescribeAvailableResourceRequest import (
    DescribeAvailableResourceRequest,
)
from aliyunsdkecs.request.v20140526.DescribeImagesRequest import DescribeImagesRequest
from aliyunsdkecs.request.v20140526.DescribeInstancesRequest import (
    DescribeInstancesRequest,
)
from aliyunsdkecs.request.v20140526.DescribeSecurityGroupsRequest import (
    DescribeSecurityGroupsRequest,
)
from aliyunsdkecs.request.v20140526.DescribeVpcsRequest import DescribeVpcsRequest
from aliyunsdkecs.request.v20140526.DescribeVSwitchesRequest import (
    DescribeVSwitchesRequest,
)
from aliyunsdkecs.request.v20140526.RunInstancesRequest import RunInstancesRequest
from aliyunsdkecs.request.v20140526.StartInstanceRequest import StartInstanceRequest
from fastapi import HTTPException

from services.ram_service import create_ram_user
from services.resource_group_service import (
    attach_policy_to_user,
    create_custom_ecs_policy_for_group,
    create_resource_group,
)
from services.resource_transfer_service import transfer_instance_to_group
from utils.generators import generate_instance_name, generate_password
from utils.logger import log_action

DEFAULT_IMAGE = "ubuntu_22_04_x64_20G_alibase_20250415.vhd"
DEFAULT_INSTANCE_TYPE = "ecs.u1-c1m2.xlarge"
DEFAULT_SYSTEM_DISK = 500
DEFAULT_BANDWIDTH = 1
DEFAULT_SECURITY_GROUP = "or_chain"
DEFAULT_USERNAME = "root"
# Original author's note: subscription orders only accept PeriodUnit=Month,
# so 12 months is used as the equivalent of a yearly subscription.
DEFAULT_PERIOD = 12
DEFAULT_IMAGE_NAME = "Ubuntu 22.04 x64 20G 官方"

# Page size used for paged VPC / VSwitch / security-group listings.
# NOTE(review): 50 is believed to be the documented maximum for these APIs;
# confirm against the API reference.
_LIST_PAGE_SIZE = 50
# Hard stop for paged listings so an inconsistent TotalCount cannot loop forever.
_MAX_PAGES = 100
# Seconds to wait for a freshly ordered instance to reach Running/Stopped.
_INSTANCE_READY_TIMEOUT = 180


def _iter_pages(
    client,
    build_request: Callable[[], Any],
    container_key: str,
    item_key: str,
    page_size: int,
) -> Iterator[List[Dict[str, Any]]]:
    """Yield the items of a paged ``Describe*`` call, one page at a time.

    ``build_request`` must return a fresh request object on every call; this
    helper adds ``PageSize`` / ``PageNumber`` to it. Items are read from
    ``response[container_key][item_key]``. Iteration stops on an empty page,
    when the response carries no integer ``TotalCount``, or when that many
    items have been yielded. SDK exceptions propagate to the caller, who
    keeps whatever pages were already consumed.
    """
    fetched = 0
    for page_number in range(1, _MAX_PAGES + 1):
        req = build_request()
        req.add_query_param("PageSize", page_size)
        req.add_query_param("PageNumber", page_number)
        data = json.loads(client.do_action_with_exception(req))
        items = data.get(container_key, {}).get(item_key, [])
        if not items:
            return
        yield items
        fetched += len(items)
        total = data.get("TotalCount")
        if not isinstance(total, int) or fetched >= total:
            return


def _region_request(request_cls, region: str):
    """Instantiate ``request_cls`` and attach ``RegionId``.

    Uses ``set_RegionId`` when the request class provides it and falls back
    to a raw query parameter otherwise (some SDK versions lack the setter).
    """
    req = request_cls()
    if hasattr(req, "set_RegionId"):
        req.set_RegionId(region)
    else:
        req.add_query_param("RegionId", region)
    return req


def list_custom_and_shared_images(
    access_key: str,
    secret_key: str,
    region: str = "ap-southeast-1",
    page_size: int = 100
) -> List[Dict[str, str]]:
    """List the account's own and shared images, plus the default image.

    Images are fetched for the owner aliases ``self`` and ``others`` (all
    pages), de-duplicated by ``ImageId``, and returned as dicts with the keys
    ``ImageId`` and ``ImageName``:

    - the first entry is always ``DEFAULT_IMAGE``, labelled with
      ``DEFAULT_IMAGE_NAME`` and the "(胜总项目)" suffix;
    - every other image keeps its name with the "(OR项目)" suffix.

    A ``ServerException`` while listing one alias is logged and that alias is
    skipped (pages fetched before the failure are kept); other exceptions
    propagate.
    """
    client = AcsClient(access_key, secret_key, region)
    all_images: List[Dict[str, Any]] = []

    # Fetch self-owned and shared images.
    for alias in ("self", "others"):
        def build_request(owner_alias: str = alias):
            req = DescribeImagesRequest()
            req.add_query_param("ImageOwnerAlias", owner_alias)
            return req

        try:
            for page in _iter_pages(client, build_request, "Images", "Image", page_size):
                all_images.extend(page)
        except ServerException as e:
            log_action(f"[ERROR] alias={alias} 拉取失败:{e}")
            continue

    # De-duplicate by ImageId; the first occurrence wins.
    unique: Dict[str, Dict[str, str]] = {}
    for img in all_images:
        iid = img.get("ImageId")
        if not iid or iid in unique:
            continue
        unique[iid] = {"ImageId": iid, "ImageName": img.get("ImageName", iid)}

    # Default official image first, everything else after it.
    annotated: List[Dict[str, str]] = [
        {"ImageId": DEFAULT_IMAGE, "ImageName": f"{DEFAULT_IMAGE_NAME} (胜总项目)"}
    ]
    annotated.extend(
        {"ImageId": iid, "ImageName": f"{info['ImageName']} (OR项目)"}
        for iid, info in unique.items()
        if iid != DEFAULT_IMAGE
    )
    return annotated


def get_account_credit(
    access_key: str,
    secret_key: str,
    region: str = "ap-southeast-1"
) -> Decimal:
    """Return the account's available balance via ``QueryAccountBalance``.

    The request is sent to the fixed domain
    ``business.ap-southeast-1.aliyuncs.com``; ``region`` is only used to
    build the client.

    Returns:
        ``Decimal`` parsed from ``Data.AvailableAmount`` (thousands separators
        removed; a missing or null amount is treated as ``"0.00"``).

        NOTE(review): for backward compatibility two fallbacks still return a
        ``str`` despite the annotation: the cleaned raw value when it is not
        a valid decimal, and ``"Invalid available amount"`` when a
        ``ValueError`` (including a JSON decode error) is raised. Callers
        doing arithmetic should check the type.

    Raises:
        HTTPException: 400 when the SDK raises ``ServerException`` or
            ``ClientException``; 500 for any other unexpected error.
    """
    try:
        client = AcsClient(access_key, secret_key, region)

        req = CommonRequest()
        req.set_protocol_type("https")
        req.set_accept_format("json")
        req.set_method("POST")
        req.set_domain("business.ap-southeast-1.aliyuncs.com")
        req.set_version("2017-12-14")
        req.set_action_name("QueryAccountBalance")

        resp = json.loads(client.do_action_with_exception(req))
        # Debug output of the raw response.
        print(f"API Response: {resp}")

        data = resp.get("Data") or {}
        raw_amount = data.get("AvailableAmount")
        if raw_amount is None:
            raw_amount = "0.00"
        # str() so that a numeric JSON value does not break .replace();
        # commas are thousands separators.
        available_amount_clean = str(raw_amount).replace(',', '')

        try:
            return Decimal(available_amount_clean)
        except InvalidOperation:
            print(f"无效的余额值:{available_amount_clean}")
            return available_amount_clean  # legacy fallback, see docstring

    except (ServerException, ClientException) as e:
        # Prefer the SDK's error_message attribute when it exists.
        error_message = getattr(e, 'error_message', str(e))
        print(f"API请求失败:{error_message}")
        raise HTTPException(status_code=400, detail=f"API请求失败:{error_message}") from e
    except ValueError as ve:
        print(f"无效的余额值:{ve}")
        return "Invalid available amount"  # legacy fallback, see docstring
    except Exception as e:
        print(f"发生未知错误:{e}")
        raise HTTPException(status_code=500, detail=f"发生未知错误:{e}") from e


def _get_instance_info(client, instance_id: str) -> Dict[str, Any] | None:
    """Return the ``DescribeInstances`` record for ``instance_id``.

    Returns ``None`` when the response lists no instance, which is treated by
    callers as "not visible yet".
    """
    req = DescribeInstancesRequest()
    req.set_InstanceIds(json.dumps([instance_id]))
    data = json.loads(client.do_action_with_exception(req))
    instances = data.get("Instances", {}).get("Instance", [])
    return instances[0] if instances else None


def _public_ip_of(info: Dict[str, Any] | None) -> str:
    """Return the first public IP in an instance record, or ``""`` if none."""
    if not info:
        return ""
    ips = info.get("PublicIpAddress", {}).get("IpAddress", [])
    return ips[0] if ips else ""


def _wait_instance_status(
    client,
    instance_id: str,
    targets,
    timeout: int,
    interval: int = 3,
) -> Dict[str, Any] | None:
    """Poll until the instance's ``Status`` is one of ``targets``.

    Polls every ``interval`` seconds, at least once and roughly
    ``timeout // interval`` times. Returns the instance record once a target
    status is seen, or ``None`` on timeout. An instance missing from the
    response is reported as status ``None`` and polling continues.
    """
    for _ in range(max(1, timeout // interval)):
        info = _get_instance_info(client, instance_id)
        status = info.get("Status") if info else None
        print(f" 当前状态: {status}")
        if status in targets:
            return info
        time.sleep(interval)
    return None


def wait_instance_stopped(client, region, instance_id, timeout=60):
    """Block until the instance reports status ``Stopped``.

    Args:
        client: ``AcsClient`` used for the ``DescribeInstances`` calls.
        region: Unused; kept so existing callers keep working.
        instance_id: ID of the instance to watch.
        timeout: Approximate maximum wait in seconds (polled every 3 s).

    Raises:
        RuntimeError: if ``Stopped`` is not reached within ``timeout``.
    """
    print("⏳ 等待实例进入 Stopped 状态...")
    if _wait_instance_status(client, instance_id, ("Stopped",), timeout) is None:
        raise RuntimeError("❌ 等待实例进入 Stopped 状态超时")
    print("✅ 实例已进入 Stopped 状态")


def get_security_group_id_by_name(group_name, region, client):
    """Return the ``SecurityGroupId`` of the group named ``group_name``.

    All pages of ``DescribeSecurityGroups`` are scanned (50 groups per page)
    and the first exact name match wins. ``region`` is unused; the client's
    own region applies.

    Raises:
        ValueError: if no security group has that name.
    """
    print("🔍 获取安全组 ID 中...")
    pages = _iter_pages(
        client, DescribeSecurityGroupsRequest, "SecurityGroups", "SecurityGroup",
        _LIST_PAGE_SIZE,
    )
    for groups in pages:
        for g in groups:
            if g.get('SecurityGroupName') == group_name:
                print(f"✅ 找到安全组 {group_name}: {g['SecurityGroupId']}")
                return g['SecurityGroupId']
    raise ValueError(f"❌ 未找到安全组名称: {group_name}")


def pick_prepaid_zone(
    client,
    region: str,
    instance_type: str,
    network_category: str = "classic",
    preferred_zone: str | None = None
) -> str | None:
    """Pick a zone in which ``instance_type`` can be ordered as PrePaid.

    Queries ``DescribeAvailableResource`` and considers a zone usable when
    the zone itself and the requested instance type inside it both report
    ``Status == "Available"``.

    Returns:
        - ``preferred_zone`` when it is given. If the API does not list it as
          usable, a warning is logged and it is still returned so the order
          can be attempted in the zone the user asked for.
        - otherwise the first usable zone;
        - ``None`` (after logging a warning) when no zone is usable and no
          preference was given. The caller decides how to proceed.
    """
    req = DescribeAvailableResourceRequest()
    # Some SDK versions lack set_RegionId, so RegionId goes in as a raw param.
    req.add_query_param("RegionId", region)
    req.set_DestinationResource("InstanceType")
    req.set_InstanceChargeType("PrePaid")
    req.set_InstanceType(instance_type)
    req.add_query_param("NetworkCategory", network_category)

    data = json.loads(client.do_action_with_exception(req))
    zones = data.get("AvailableZones", {}).get("AvailableZone", [])

    def supports_instance_type(zone: Dict[str, Any]) -> bool:
        if zone.get("Status") != "Available":
            return False
        resources = zone.get("AvailableResources", {}).get("AvailableResource", [])
        return any(
            supported.get("Status") == "Available"
            and supported.get("Value") == instance_type
            for resource in resources
            if resource.get("Type") == "instance-type"
            for supported in resource.get("SupportedResources", {}).get(
                "SupportedResource", []
            )
        )

    if not preferred_zone:
        for zone in zones:
            if supports_instance_type(zone):
                return zone["ZoneId"]
        log_action("⚠️ 资源查询未返回任何支持包年包月的可用区")
        return None

    confirmed = any(
        zone.get("ZoneId") == preferred_zone and supports_instance_type(zone)
        for zone in zones
    )
    if not confirmed:
        # Fall back to the zone the user explicitly asked for.
        log_action(f"⚠️ 资源查询未返回可用区,回退使用用户指定的可用区 {preferred_zone}")
    return preferred_zone


def get_vswitch_zone_id(vswitch_id: str, client) -> str:
    """Return the ``ZoneId`` of the VSwitch ``vswitch_id``.

    Raises:
        RuntimeError: if ``DescribeVSwitches`` returns no matching VSwitch.
    """
    req = DescribeVSwitchesRequest()
    req.add_query_param("VSwitchId", vswitch_id)
    data = json.loads(client.do_action_with_exception(req))
    vswitches = data.get("VSwitches", {}).get("VSwitch", [])
    if not vswitches:
        raise RuntimeError(f"❌ 未找到交换机 {vswitch_id}")
    return vswitches[0].get("ZoneId")


def list_vpcs_and_vswitches(access_key: str, secret_key: str, region: str) -> Dict[str, Any]:
    """List the VPCs and VSwitches of ``region`` for front-end dropdowns.

    Returns ``{"vpcs": [...], "vswitches": [...]}`` holding the raw API
    records. The function is best-effort: each stage logs its failure and
    leaves the corresponding list empty instead of raising.

    Stages:
        1. all pages of ``DescribeVpcs``;
        2. all pages of ``DescribeVSwitches`` per VPC (or one unfiltered
           query when no VPC was found);
        3. if no VSwitch was found but VPCs exist, look up every VSwitch ID
           referenced by the VPC records individually.
    """
    client = AcsClient(access_key, secret_key, region)
    result: Dict[str, Any] = {"vpcs": [], "vswitches": []}

    # Stage 1: VPC list.
    try:
        vpcs: List[Dict[str, Any]] = []
        for page in _iter_pages(
            client, lambda: _region_request(DescribeVpcsRequest, region),
            "Vpcs", "Vpc", _LIST_PAGE_SIZE,
        ):
            vpcs.extend(page)
        result["vpcs"] = vpcs
    except Exception as e:
        log_action(f"⚠️ 拉取 VPC 列表失败: {e}")

    # Stage 2: VSwitch list, queried per VPC when VPCs are known.
    try:
        vs_list: List[Dict[str, Any]] = []
        vpc_ids = [v["VpcId"] for v in result["vpcs"]] or [None]
        for vpc_id in vpc_ids:
            def build_vsw_request(vpc: str | None = vpc_id):
                req = _region_request(DescribeVSwitchesRequest, region)
                if vpc:
                    req.add_query_param("VpcId", vpc)
                return req

            for page in _iter_pages(
                client, build_vsw_request, "VSwitches", "VSwitch", _LIST_PAGE_SIZE
            ):
                vs_list.extend(page)
        # Only published when every VPC was listed successfully, so a partial
        # failure still triggers the stage-3 fallback.
        result["vswitches"] = vs_list
    except Exception as e:
        log_action(f"⚠️ 拉取 VSwitch 列表失败: {e}")

    # Stage 3: fall back to the VSwitch IDs embedded in the VPC records.
    if not result["vswitches"] and result["vpcs"]:
        try:
            vsw_ids: List[str] = []
            for vpc in result["vpcs"]:
                vsw_ids.extend(vpc.get("VSwitchIds", {}).get("VSwitchId", []))

            vsw_details: List[Dict[str, Any]] = []
            # dict.fromkeys de-duplicates while keeping first-seen order.
            for vsw_id in dict.fromkeys(vsw_ids):
                attr_req = DescribeVSwitchesRequest()
                attr_req.add_query_param("RegionId", region)
                attr_req.add_query_param("VSwitchId", vsw_id)
                attr_req.add_query_param("PageSize", 10)
                attr_data = json.loads(client.do_action_with_exception(attr_req))
                vsw_details.extend(attr_data.get("VSwitches", {}).get("VSwitch", []))
            if vsw_details:
                result["vswitches"] = vsw_details
        except Exception as e:
            log_action(f"⚠️ 兜底查询 VSwitch 属性失败: {e}")

    return result


def _run_prepaid_instance(
    client,
    *,
    image_id: str,
    sg_id: str,
    zone_id: str | None,
    vswitch_id: str | None,
    instance_name: str,
    password: str,
) -> str:
    """Order one PrePaid instance with ``RunInstances`` and return its ID.

    The order uses ``AutoPay=true`` and requests exactly one instance
    (``MinAmount`` = ``MaxAmount`` = 1).

    Raises:
        RuntimeError: if the response contains no instance ID.
    """
    req = RunInstancesRequest()
    req.set_ImageId(image_id)
    req.set_InstanceType(DEFAULT_INSTANCE_TYPE)
    req.set_SecurityGroupId(sg_id)
    if zone_id:
        req.set_ZoneId(zone_id)
    if vswitch_id:
        req.set_VSwitchId(vswitch_id)
        req.add_query_param("NetworkType", "vpc")
    else:
        req.add_query_param("NetworkType", "classic")
    req.set_SystemDiskCategory("cloud_essd_entry")
    req.set_SystemDiskSize(DEFAULT_SYSTEM_DISK)
    req.set_InternetMaxBandwidthOut(DEFAULT_BANDWIDTH)
    req.set_InstanceChargeType("PrePaid")
    req.set_Period(DEFAULT_PERIOD)
    req.set_PeriodUnit("Month")
    req.add_query_param("AutoPay", "true")
    req.set_InternetChargeType("PayByBandwidth")
    req.set_InstanceName(instance_name)
    req.set_Password(password)
    req.add_query_param("MinAmount", 1)
    req.add_query_param("MaxAmount", 1)

    res_data = json.loads(client.do_action_with_exception(req))
    instance_ids = res_data.get("InstanceIdSets", {}).get("InstanceIdSet", [])
    if not instance_ids:
        order_id = res_data.get("OrderId")
        raise RuntimeError(
            f"下单未返回实例 ID(订单号: {order_id or '未知'}),请在控制台检查订单/支付"
        )
    return instance_ids[0]


def _ensure_started_with_public_ip(client, instance_id: str) -> str:
    """Make sure the instance is started and return its public IP.

    NOTE(review): per the RunInstances API documentation (confirm), new
    instances start automatically and receive a public IP when the outbound
    bandwidth is > 0, so they may never pass through ``Stopped``. This helper
    therefore accepts either state:

    1. wait until the instance is ``Running`` or ``Stopped``;
    2. if it has no public IP yet, try ``AllocatePublicIpAddress`` (a
       ``ServerException`` is logged, not raised, because the IP may be
       assigned automatically);
    3. if it is ``Stopped``, start it;
    4. poll up to 10 times, 5 s apart, until a public IP is reported.

    Raises:
        RuntimeError: if the instance never becomes ready or no public IP
            appears; the allocation error, if any, is attached as the cause.
    """
    print("⏳ 等待实例进入 Running/Stopped 状态...")
    info = _wait_instance_status(
        client, instance_id, ("Running", "Stopped"), _INSTANCE_READY_TIMEOUT
    )
    if info is None:
        raise RuntimeError("❌ 等待实例进入 Running/Stopped 状态超时")

    ip_address = _public_ip_of(info)
    alloc_error: Exception | None = None
    if not ip_address:
        ip_req = AllocatePublicIpAddressRequest()
        ip_req.set_InstanceId(instance_id)
        try:
            client.do_action_with_exception(ip_req)
            log_action("✅ 分配公网 IP")
        except ServerException as e:
            alloc_error = e
            log_action(f"⚠️ 分配公网 IP 失败,继续轮询实例公网 IP: {e}")

    if info.get("Status") == "Stopped":
        start_req = StartInstanceRequest()
        start_req.set_InstanceId(instance_id)
        client.do_action_with_exception(start_req)
        log_action("✅ 启动实例")

    for _ in range(10):
        if ip_address:
            break
        time.sleep(5)
        ip_address = _public_ip_of(_get_instance_info(client, instance_id))

    if not ip_address:
        raise RuntimeError("获取公网 IP 失败") from alloc_error
    return ip_address


def create_ecs_instances(
    customer: str,
    resource_name: str,
    region: str,
    access_key: str,
    secret_key: str,
    count: int = 1,
    account_uid: str | None = None,
    image_id: str = DEFAULT_IMAGE,
    zone_id: str | None = None,
    vswitch_id: str | None = None,
) -> List[Dict[str, Any]]:
    """Create ``count`` subscription ECS instances for one customer.

    Flow: create the RAM user, create the resource group and attach the
    custom ECS policy, resolve the ``DEFAULT_SECURITY_GROUP`` ID, choose the
    zone, then for each instance: order it, make sure it is started and has
    a public IP, and move it into the resource group.

    Args:
        customer: Customer name, used to generate instance names.
        resource_name: Name used for both the RAM user and the resource group.
        region: ECS region.
        access_key: Alibaba Cloud access key.
        secret_key: Alibaba Cloud secret key.
        count: Number of instances to create.
        account_uid: When given, used to build the RAM user's login e-mail.
        image_id: Image to boot from; an empty value falls back to
            ``DEFAULT_IMAGE``.
        zone_id: Preferred zone. When omitted and ``vswitch_id`` is given,
            the VSwitch's zone is used.
        vswitch_id: VSwitch to attach; its presence selects VPC networking,
            otherwise classic networking is requested.

    Returns:
        One dict per instance with the keys ``ip``, ``name``, ``region``,
        ``instance_id``, ``image_id``, ``user``, ``password``, ``created``
        (UTC, ``%Y-%m-%d %H:%M:%S``), ``email``, ``ram_user`` and
        ``ram_password``.

    Raises:
        RuntimeError: on any failure. The original exception is chained as
            ``__cause__`` and the records of instances that were fully
            created before the failure are available on the exception as
            ``partial_results`` so their credentials are not lost.
    """
    results: List[Dict[str, Any]] = []
    image_id = image_id or DEFAULT_IMAGE
    try:
        client = AcsClient(access_key, secret_key, region)
        log_action("🚀 开始创建 ECS 资源流程")

        # 1. RAM user.
        ram_info = create_ram_user(resource_name, region, client)
        log_action("✅ 创建 RAM 用户完成")

        # 2. Resource group and policy binding.
        rg_id = create_resource_group(resource_name, client)
        policy_name = create_custom_ecs_policy_for_group(rg_id, client)
        attach_policy_to_user(resource_name, policy_name, client)
        log_action("✅ 资源组 & 策略绑定完成")

        # 3. Security group.
        sg_id = get_security_group_id_by_name(DEFAULT_SECURITY_GROUP, region, client)
        log_action(f"✅ 获取安全组成功: {sg_id}")

        # 3.1 Zone selection; an explicit zone / VSwitch takes priority.
        picked_zone_id = zone_id
        network_category = "vpc" if vswitch_id else "classic"
        if vswitch_id and not picked_zone_id:
            picked_zone_id = get_vswitch_zone_id(vswitch_id, client)
            log_action(f"🔍 通过交换机获取可用区: {picked_zone_id}")

        picked_zone_id = pick_prepaid_zone(
            client,
            region,
            DEFAULT_INSTANCE_TYPE,
            network_category=network_category,
            preferred_zone=picked_zone_id
        )
        if picked_zone_id:
            log_action(f"✅ 选定支持包年的可用区: {picked_zone_id}")
        elif network_category == "vpc":
            raise RuntimeError("❌ VPC 场景需指定可用区/交换机,请传入 zone_id 或 vswitch_id(可从控制台复制)")
        else:
            log_action("⚠️ 未找到可用区,继续尝试经典网络下单(可能继续报错,请优先提供可用区/交换机)")

        for _ in range(count):
            instance_name = generate_instance_name(customer)
            password = generate_password()

            # 4. Order the instance.
            instance_id = _run_prepaid_instance(
                client,
                image_id=image_id,
                sg_id=sg_id,
                zone_id=picked_zone_id,
                vswitch_id=vswitch_id,
                instance_name=instance_name,
                password=password,
            )
            log_action(f"✅ 创建 ECS 实例成功: {instance_id}")

            # 5-7. Make sure it is started and has a public IP.
            ip_address = _ensure_started_with_public_ip(client, instance_id)
            log_action(f"✅ 公网 IP: {ip_address}")

            # 8. Move the instance into the resource group.
            transfer_instance_to_group(
                instance_id, rg_id, region, access_key, secret_key)
            log_action("✅ 绑定资源组成功")

            # 9. Collect the result record.
            email = f"{ram_info['user']}@{account_uid}.onaliyun.com" if account_uid else None
            results.append({
                "ip": ip_address,
                "name": instance_name,
                "region": region,
                "instance_id": instance_id,
                "image_id": image_id,
                "user": DEFAULT_USERNAME,
                "password": password,
                "created": datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S"),
                "email": email,
                "ram_user": ram_info["user"],
                "ram_password": ram_info["password"],
            })

        return results

    except Exception as e:
        traceback.print_exc()
        if results:
            # Log IDs only; credentials stay out of the log.
            created_ids = ", ".join(r["instance_id"] for r in results)
            log_action(f"⚠️ 失败前已成功创建 {len(results)} 台实例: {created_ids}")
        error = RuntimeError(f"批量创建 ECS 实例失败: {e}")
        # Expose already-created (and already-paid) instances to the caller.
        error.partial_results = results
        raise error from e