zhiyunTools/showAliServer/list_aliyun_servers.py

177 lines
7.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
import os
import json
import csv
from datetime import datetime
from alibabacloud_ecs20140526.client import Client as EcsClient
from alibabacloud_swas_open20200601.client import Client as SwasClient
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_ecs20140526 import models as ecs_models
from alibabacloud_swas_open20200601 import models as swas_models
from alibabacloud_tea_util.client import Client as UtilClient
def create_ecs_client(access_key_id, access_key_secret, region_id):
    """
    Build an ECS API client bound to a single region.

    Args:
        access_key_id: AccessKey ID placed into the OpenAPI config.
        access_key_secret: AccessKey secret placed into the OpenAPI config.
        region_id: Region identifier placed into the OpenAPI config.

    Returns:
        An ``EcsClient`` constructed from that config.
    """
    return EcsClient(
        open_api_models.Config(
            access_key_id=access_key_id,
            access_key_secret=access_key_secret,
            region_id=region_id,
        )
    )
def create_swas_client(access_key_id, access_key_secret, region_id):
    """
    Build a SWAS (Lightweight Application Server) client for one region.

    Args:
        access_key_id: AccessKey ID placed into the OpenAPI config.
        access_key_secret: AccessKey secret placed into the OpenAPI config.
        region_id: Region identifier; also used to derive the endpoint host.

    Returns:
        A ``SwasClient`` whose endpoint is ``swas.<region_id>.aliyuncs.com``.
    """
    swas_config = open_api_models.Config(
        region_id=region_id,
        access_key_secret=access_key_secret,
        access_key_id=access_key_id,
    )
    # Pin the per-region endpoint explicitly: relying on the default global
    # endpoint was causing DNS resolution problems.
    swas_config.endpoint = f'swas.{region_id}.aliyuncs.com'
    return SwasClient(swas_config)
def get_all_regions(client):
    """
    Return the IDs of every region reported by ``describe_regions``.

    Args:
        client: ECS client exposing ``describe_regions``.

    Returns:
        A list of region ID values, or an empty list when the call (or
        reading its response) raises; the error is printed, not re-raised.
    """
    try:
        reply = client.describe_regions(ecs_models.DescribeRegionsRequest())
        region_ids = []
        for entry in reply.body.regions.region:
            region_ids.append(entry.region_id)
    except Exception as e:
        print(f"Error fetching regions: {e}")
        return []
    return region_ids
def list_ecs_instances(client, region_id):
    """
    List all ECS instances in a specific region.

    Pages through ``describe_instances`` using token-based pagination
    (``max_results`` + ``next_token``). The previous request asked for
    page-based pagination (``page_size``) while the loop advanced only via
    ``next_token``; per the DescribeInstances API reference the two modes are
    alternatives, so that combination could stop after the first 100
    instances.

    Args:
        client: ECS client exposing ``describe_instances``.
        region_id: Region whose instances are listed.

    Returns:
        A list of instance objects. On any exception the error is printed and
        whatever was collected so far is returned (best effort).
    """
    all_instances = []
    try:
        request = ecs_models.DescribeInstancesRequest(
            region_id=region_id,
            max_results=100,
        )
        while True:
            response = client.describe_instances(request)
            body = response.body
            # Guard against a missing container so an empty region does not
            # surface as an AttributeError.
            instances = body.instances.instance if body.instances else None
            if instances:
                all_instances.extend(instances)
            # An empty/absent token marks the last page.
            next_token = body.next_token
            if not next_token:
                break
            request.next_token = next_token
    except Exception as e:
        print(f"Error fetching ECS instances in {region_id}: {e}")
    return all_instances
def list_lightweight_servers(client, region_id):
    """
    Collect every Lightweight Application Server in one region.

    Requests pages of 100 via ``list_instances`` and advances
    ``page_number`` until a page comes back empty or the number collected
    equals the reported ``total_count``.

    Args:
        client: SWAS client exposing ``list_instances``.
        region_id: Region whose servers are listed.

    Returns:
        A list of server objects. On any exception the error is printed and
        whatever was collected so far is returned.
    """
    collected = []
    try:
        req = swas_models.ListInstancesRequest(region_id=region_id, page_size=100)
        while True:
            body = client.list_instances(req).body
            batch = body.instances
            if batch:
                collected.extend(batch)
            if not batch or body.total_count == len(collected):
                break
            # The first request leaves page_number unset (page 1), so the
            # follow-up page is 2; afterwards simply increment.
            current_page = req.page_number
            req.page_number = current_page + 1 if current_page else 2
    except Exception as e:
        print(f"Error fetching Lightweight Application Servers in {region_id}: {e}")
    return collected
def load_accounts_from_config(config_file='config.json'):
    """
    Load account credentials from a JSON config file.

    The file must contain a non-empty JSON array of objects (one object per
    account). Anything else - an empty array, a bare object, a string, an
    array containing non-objects - is reported as "empty or badly formatted"
    instead of being returned and failing later in the caller.

    The file is decoded as ``utf-8-sig`` so a leading UTF-8 BOM is accepted
    rather than being reported as a JSON syntax error.

    Args:
        config_file: Path of the JSON file to read.

    Returns:
        The list of account dicts, or ``None`` when the file is missing,
        unreadable, not valid JSON, or not a non-empty list of objects. A
        message explaining the failure is printed in each ``None`` case.
    """
    if not os.path.exists(config_file):
        print(f"错误:配置文件 '{config_file}' 不存在。")
        print(f"请将 'config.json.example' 复制为 '{config_file}' 并填入您的AccessKey信息。")
        return None
    try:
        # utf-8-sig transparently strips a BOM and is otherwise plain UTF-8.
        with open(config_file, 'r', encoding='utf-8-sig') as f:
            accounts = json.load(f)
    except json.JSONDecodeError:
        print(f"错误:解析配置文件 '{config_file}' 失败请检查JSON格式是否正确。")
        return None
    except Exception as e:
        print(f"读取配置文件时发生未知错误: {e}")
        return None

    well_formed = (
        isinstance(accounts, list)
        and bool(accounts)
        and all(isinstance(entry, dict) for entry in accounts)
    )
    if not well_formed:
        print(f"错误:配置文件 '{config_file}' 为空或格式不正确。")
        return None
    return accounts
def _ecs_public_ip(inst):
    """Return an ECS instance's public IPs as text, falling back to its EIP."""
    public_ips = getattr(getattr(inst, 'public_ip_address', None), 'ip_address', None)
    if public_ips:
        return ", ".join(public_ips)
    # Instances reached through an Elastic IP report it under eip_address
    # instead of public_ip_address; without this the column would be blank.
    eip = getattr(getattr(inst, 'eip_address', None), 'ip_address', None)
    return eip or ""


def _ecs_private_ip(inst):
    """Return an ECS instance's VPC private IPs as text ('' when absent)."""
    vpc = getattr(inst, 'vpc_attributes', None)
    private_ips = getattr(getattr(vpc, 'private_ip_address', None), 'ip_address', None)
    return ", ".join(private_ips) if private_ips else ""


def _write_ecs_rows(csv_writer, account, regions):
    """Write one CSV row per ECS instance found in each of `regions`."""
    account_name = account['name']
    for region in regions:
        ecs_client = create_ecs_client(account['access_key_id'], account['access_key_secret'], region)
        for inst in list_ecs_instances(ecs_client, region):
            config_str = f"{inst.instance_type} ({inst.cpu}核 CPU, {inst.memory / 1024} GB 内存)"
            csv_writer.writerow([
                account_name, 'ECS', region, inst.instance_id, inst.instance_name,
                _ecs_public_ip(inst), _ecs_private_ip(inst),
                config_str, inst.creation_time, inst.expired_time
            ])


def _write_swas_rows(csv_writer, account, regions):
    """Write one CSV row per Lightweight Application Server in each region."""
    account_name = account['name']
    for region in regions:
        try:
            swas_client = create_swas_client(account['access_key_id'], account['access_key_secret'], region)
            servers = list_lightweight_servers(swas_client, region)
            if servers:
                for server in servers:
                    csv_writer.writerow([
                        account_name, '轻量应用服务器', region, server.instance_id, server.instance_name, server.public_ip_address, '',
                        server.plan_id, server.creation_time, server.expired_time
                    ])
        except Exception as e:
            # Regions where the service is unavailable or its endpoint cannot
            # be reached are skipped silently; anything else is reported.
            if "not supported" in str(e) or "Unreachable" in str(e) or "Failed to resolve" in str(e):
                continue
            print(f"查询轻量服务器时发生错误 (区域: {region}): {e}")


def main():
    """
    Export every account's ECS and lightweight servers to a timestamped CSV.

    Loads accounts via ``load_accounts_from_config`` and returns immediately
    if none are available. Otherwise creates ``aliyun_servers_<timestamp>.csv``
    (UTF-8 with BOM) in the current directory, writes a header row, and for
    each account lists regions through an ECS client bound to 'cn-hangzhou',
    then writes one row per ECS instance and per lightweight server found in
    those regions. An account whose region lookup raises is skipped.
    """
    accounts = load_accounts_from_config()
    if not accounts:
        return

    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    csv_filename = f'aliyun_servers_{timestamp}.csv'
    with open(csv_filename, 'w', newline='', encoding='utf-8-sig') as csvfile:
        csv_writer = csv.writer(csvfile)
        csv_writer.writerow([
            '账号', '类型', '区域', '实例ID', '实例名称', '公网IP', '内网IP',
            '配置', '创建/开机时间', '到期时间'
        ])
        for account in accounts:
            account_name = account['name']
            print("=" * 80)
            print(f"正在查询账号: {account_name}")
            print("=" * 80)
            # Any region works for listing regions; this one is the bootstrap.
            default_region = 'cn-hangzhou'
            try:
                ecs_client_for_regions = create_ecs_client(account['access_key_id'], account['access_key_secret'], default_region)
                regions = get_all_regions(ecs_client_for_regions)
            except Exception as e:
                print(f"无法连接到阿里云或获取区域列表请检查AccessKey和网络连接: {e}")
                continue
            print("正在查询 ECS 实例...")
            _write_ecs_rows(csv_writer, account, regions)
            print("正在查询轻量应用服务器...")
            _write_swas_rows(csv_writer, account, regions)
    print(f"\n查询完成!结果已保存到文件: {csv_filename}")
# Run the export only when executed as a script, not when imported.
if __name__ == "__main__":
    main()