# -*- coding: utf-8 -*- import os import json import csv from datetime import datetime from alibabacloud_ecs20140526.client import Client as EcsClient from alibabacloud_swas_open20200601.client import Client as SwasClient from alibabacloud_tea_openapi import models as open_api_models from alibabacloud_ecs20140526 import models as ecs_models from alibabacloud_swas_open20200601 import models as swas_models from alibabacloud_tea_util.client import Client as UtilClient def create_ecs_client(access_key_id, access_key_secret, region_id): """ Create ECS client. """ config = open_api_models.Config( access_key_id=access_key_id, access_key_secret=access_key_secret, region_id=region_id ) return EcsClient(config) def create_swas_client(access_key_id, access_key_secret, region_id): """ Create SWAS client. """ config = open_api_models.Config( access_key_id=access_key_id, access_key_secret=access_key_secret, region_id=region_id ) # Explicitly set the endpoint to avoid DNS resolution issues with the default global endpoint. config.endpoint = f'swas.{region_id}.aliyuncs.com' return SwasClient(config) def get_all_regions(client): """ Get all available regions. """ try: request = ecs_models.DescribeRegionsRequest() response = client.describe_regions(request) return [region.region_id for region in response.body.regions.region] except Exception as e: print(f"Error fetching regions: {e}") return [] def list_ecs_instances(client, region_id): """ List all ECS instances in a specific region. """ all_instances = [] try: request = ecs_models.DescribeInstancesRequest(region_id=region_id, page_size=100) while True: response = client.describe_instances(request) instances = response.body.instances.instance if not instances: break all_instances.extend(instances) if not response.body.next_token: break request.next_token = response.body.next_token except Exception as e: print(f"Error fetching ECS instances in {region_id}: {e}") return all_instances def list_lightweight_servers(client, region_id): """ List all Lightweight Application Servers in a specific region. """ all_servers = [] try: request = swas_models.ListInstancesRequest(region_id=region_id, page_size=100) while True: response = client.list_instances(request) servers = response.body.instances if not servers: break all_servers.extend(servers) if response.body.total_count == len(all_servers): break request.page_number = request.page_number + 1 if request.page_number else 2 except Exception as e: print(f"Error fetching Lightweight Application Servers in {region_id}: {e}") return all_servers def load_accounts_from_config(config_file='config.json'): """ Load account credentials from a JSON config file. """ if not os.path.exists(config_file): print(f"错误:配置文件 '{config_file}' 不存在。") print(f"请将 'config.json.example' 复制为 '{config_file}' 并填入您的AccessKey信息。") return None try: with open(config_file, 'r', encoding='utf-8') as f: accounts = json.load(f) if not accounts: print(f"错误:配置文件 '{config_file}' 为空或格式不正确。") return None return accounts except json.JSONDecodeError: print(f"错误:解析配置文件 '{config_file}' 失败,请检查JSON格式是否正确。") return None except Exception as e: print(f"读取配置文件时发生未知错误: {e}") return None def main(): accounts = load_accounts_from_config() if not accounts: return timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") csv_filename = f'aliyun_servers_{timestamp}.csv' with open(csv_filename, 'w', newline='', encoding='utf-8-sig') as csvfile: csv_writer = csv.writer(csvfile) csv_writer.writerow([ '账号', '类型', '区域', '实例ID', '实例名称', '公网IP', '内网IP', '配置', '创建/开机时间', '到期时间' ]) for account in accounts: account_name = account['name'] print("=" * 80) print(f"正在查询账号: {account_name}") print("=" * 80) default_region = 'cn-hangzhou' try: ecs_client_for_regions = create_ecs_client(account['access_key_id'], account['access_key_secret'], default_region) regions = get_all_regions(ecs_client_for_regions) except Exception as e: print(f"无法连接到阿里云或获取区域列表,请检查AccessKey和网络连接: {e}") continue print("正在查询 ECS 实例...") for region in regions: ecs_client = create_ecs_client(account['access_key_id'], account['access_key_secret'], region) instances = list_ecs_instances(ecs_client, region) if instances: for inst in instances: public_ip = ", ".join(inst.public_ip_address.ip_address) if inst.public_ip_address.ip_address else "" private_ip = ", ".join(inst.vpc_attributes.private_ip_address.ip_address) if inst.vpc_attributes and inst.vpc_attributes.private_ip_address.ip_address else "" config_str = f"{inst.instance_type} ({inst.cpu}核 CPU, {inst.memory / 1024} GB 内存)" csv_writer.writerow([ account_name, 'ECS', region, inst.instance_id, inst.instance_name, public_ip, private_ip, config_str, inst.creation_time, inst.expired_time ]) print("正在查询轻量应用服务器...") for region in regions: try: swas_client = create_swas_client(account['access_key_id'], account['access_key_secret'], region) servers = list_lightweight_servers(swas_client, region) if servers: for server in servers: csv_writer.writerow([ account_name, '轻量应用服务器', region, server.instance_id, server.instance_name, server.public_ip_address, '', server.plan_id, server.creation_time, server.expired_time ]) except Exception as e: if "not supported" in str(e) or "Unreachable" in str(e) or "Failed to resolve" in str(e): continue print(f"查询轻量服务器时发生错误 (区域: {region}): {e}") print(f"\n查询完成!结果已保存到文件: {csv_filename}") if __name__ == '__main__': main()