import os
from dataclasses import dataclass
from typing import Dict, List, Optional

import boto3
from botocore.exceptions import BotoCoreError, ClientError
import yaml


class ConfigError(Exception):
    """Raised when the account configuration file is missing or malformed."""


class AWSOperationError(Exception):
    """Raised when an EC2 operation performed by this module fails."""


@dataclass
class AccountConfig:
    """Credentials and launch parameters for one AWS account.

    The fields are populated from one entry of the ``accounts`` list in the
    YAML file read by :func:`load_account_configs`.
    """

    name: str  # key under which the account is stored in the loaded mapping
    region: str  # passed as ``region_name`` when building the EC2 client
    access_key_id: str
    secret_access_key: str
    ami_id: str  # ``ImageId`` used when launching a replacement instance
    instance_type: str
    subnet_id: str
    security_group_ids: List[str]
    key_name: Optional[str] = None  # only sent to ``run_instances`` when set


# Keys that every entry in the ``accounts`` list must provide.
_REQUIRED_ACCOUNT_FIELDS = (
    "name",
    "region",
    "access_key_id",
    "secret_access_key",
    "ami_id",
    "instance_type",
    "subnet_id",
)


def load_account_configs(path: str) -> Dict[str, AccountConfig]:
    """Load account definitions from a YAML file.

    Args:
        path: Location of the YAML file. Its top level must be a mapping
            with an ``accounts`` list; each list entry is a mapping holding
            the :class:`AccountConfig` fields.

    Returns:
        A mapping from account name to its :class:`AccountConfig`. When two
        entries share a name, the later one wins.

    Raises:
        ConfigError: If the file does not exist, cannot be parsed as YAML,
            lacks the ``accounts`` list, or contains an entry that is not a
            mapping or is missing a required field.
    """
    if not os.path.exists(path):
        raise ConfigError(f"Config file not found at {path}")

    with open(path, "r", encoding="utf-8") as f:
        try:
            data = yaml.safe_load(f)
        except yaml.YAMLError as exc:
            raise ConfigError(f"Failed to parse config file {path}: {exc}") from exc

    # An empty file loads as None and a scalar/list document has no
    # "accounts" key; both are reported the same way.
    entries = data.get("accounts") if isinstance(data, dict) else None
    if not isinstance(entries, list):
        raise ConfigError("accounts.yaml missing 'accounts' list")

    accounts: Dict[str, AccountConfig] = {}
    for index, item in enumerate(entries):
        if not isinstance(item, dict):
            raise ConfigError(f"Account entry {index} is not a mapping")

        missing = [key for key in _REQUIRED_ACCOUNT_FIELDS if key not in item]
        if missing:
            raise ConfigError(
                f"Account entry {index} missing required field(s): "
                f"{', '.join(missing)}"
            )

        cfg = AccountConfig(
            name=item["name"],
            region=item["region"],
            access_key_id=item["access_key_id"],
            secret_access_key=item["secret_access_key"],
            ami_id=item["ami_id"],
            instance_type=item["instance_type"],
            subnet_id=item["subnet_id"],
            # ``or []`` also covers an explicit ``security_group_ids: null``.
            security_group_ids=item.get("security_group_ids") or [],
            key_name=item.get("key_name"),
        )
        accounts[cfg.name] = cfg
    return accounts


def ec2_client(account: AccountConfig):
    """Build a boto3 EC2 client from the account's region and static keys."""
    return boto3.client(
        "ec2",
        region_name=account.region,
        aws_access_key_id=account.access_key_id,
        aws_secret_access_key=account.secret_access_key,
    )


def _find_instance_id_by_ip(client, ip: str) -> Optional[str]:
    """Return the ID of the first instance whose public or private IP is *ip*.

    Only instances in the pending, running, stopping or stopped states are
    considered. The public address filter is tried before the private one.

    Returns:
        The matching instance ID, or ``None`` when nothing matches.

    Raises:
        AWSOperationError: If the ``describe_instances`` call fails.
    """
    state_filter = {
        "Name": "instance-state-name",
        "Values": ["pending", "running", "stopping", "stopped"],
    }
    for field in ("ip-address", "private-ip-address"):
        try:
            resp = client.describe_instances(
                Filters=[state_filter, {"Name": field, "Values": [ip]}]
            )
        except (ClientError, BotoCoreError) as exc:
            raise AWSOperationError(f"Failed to describe instances: {exc}") from exc

        for reservation in resp.get("Reservations", []):
            for instance in reservation.get("Instances", []):
                return instance["InstanceId"]
    return None


def _wait_for_state(client, instance_id: str, waiter_name: str) -> None:
    """Block until the named EC2 waiter reports success for *instance_id*."""
    waiter = client.get_waiter(waiter_name)
    waiter.wait(InstanceIds=[instance_id])


def _terminate_instance(client, instance_id: str) -> None:
    """Terminate *instance_id* and wait for the ``instance_terminated`` waiter.

    Raises:
        AWSOperationError: If the terminate call or the wait fails.
    """
    try:
        client.terminate_instances(InstanceIds=[instance_id])
        _wait_for_state(client, instance_id, "instance_terminated")
    except (ClientError, BotoCoreError) as exc:
        raise AWSOperationError(
            f"Failed to terminate instance {instance_id}: {exc}"
        ) from exc


def _provision_instance(client, account: AccountConfig) -> str:
    """Launch one instance from the account's settings and wait until it runs.

    Returns:
        The ID of the newly launched instance.

    Raises:
        AWSOperationError: If the launch call or the wait fails.
    """
    params = {
        "ImageId": account.ami_id,
        "InstanceType": account.instance_type,
        "MinCount": 1,
        "MaxCount": 1,
        "SubnetId": account.subnet_id,
        "SecurityGroupIds": account.security_group_ids,
    }
    # KeyName is optional; it is omitted entirely when not configured.
    if account.key_name:
        params["KeyName"] = account.key_name

    try:
        resp = client.run_instances(**params)
        instance_id = resp["Instances"][0]["InstanceId"]
        _wait_for_state(client, instance_id, "instance_running")
    except (ClientError, BotoCoreError) as exc:
        raise AWSOperationError(f"Failed to create instance: {exc}") from exc
    return instance_id


def _get_public_ip(client, instance_id: str) -> str:
    """Return the instance's current public IP, or ``""`` if it has none.

    Raises:
        AWSOperationError: If the describe call fails or the response
            contains no instance.
    """
    try:
        resp = client.describe_instances(InstanceIds=[instance_id])
    except (ClientError, BotoCoreError) as exc:
        raise AWSOperationError(f"Failed to fetch public IP: {exc}") from exc

    reservations = resp.get("Reservations", [])
    instances = reservations[0].get("Instances", []) if reservations else []
    if not instances:
        # Covers both "no reservations" and "reservation with no instances".
        raise AWSOperationError("Instance not found when reading IP")
    return instances[0].get("PublicIpAddress") or ""


def _recycle_ip_until_free(
    client, instance_id: str, banned_ips: set[str], retry_limit: int
) -> str:
    """Stop/start the instance until its public IP is not in *banned_ips*.

    The current address is checked first; while it is empty or banned the
    instance is stopped and started again, up to *retry_limit* times. The
    address is checked after every cycle, including the last one, so no
    stop/start is performed without its result being examined.

    NOTE(review): this presumes a stop/start hands the instance a different
    auto-assigned public address -- confirm for the target subnet setup.

    Args:
        client: boto3 EC2 client.
        instance_id: Instance to cycle.
        banned_ips: Addresses that must not be accepted.
        retry_limit: Maximum number of stop/start cycles to perform.

    Returns:
        The first acceptable public IP observed.

    Raises:
        AWSOperationError: If a stop/start/wait call fails, or no acceptable
            address was obtained within *retry_limit* cycles.
    """
    cycles_done = 0
    while True:
        current_ip = _get_public_ip(client, instance_id)
        if current_ip and current_ip not in banned_ips:
            return current_ip

        if cycles_done >= retry_limit:
            break

        try:
            client.stop_instances(InstanceIds=[instance_id])
            _wait_for_state(client, instance_id, "instance_stopped")
            client.start_instances(InstanceIds=[instance_id])
            _wait_for_state(client, instance_id, "instance_running")
        except (ClientError, BotoCoreError) as exc:
            raise AWSOperationError(f"Failed while cycling IP: {exc}") from exc
        cycles_done += 1

    raise AWSOperationError("Reached retry limit while attempting to obtain a free IP")


def replace_instance_ip(
    ip: str,
    account: AccountConfig,
    disallowed_ips: set[str],
    retry_limit: int = 5,
) -> Dict[str, str]:
    """Replace the instance that owns *ip* with a fresh one on an allowed IP.

    The instance currently holding *ip* is terminated first; only then is a
    replacement launched from the account's settings and cycled until its
    public IP is not in *disallowed_ips*.

    NOTE(review): *ip* itself is not added to the disallowed set here, so
    callers that must not get the same address back should include it.

    Args:
        ip: Public or private address of the instance to replace.
        account: Account whose credentials and launch settings are used.
        disallowed_ips: Addresses the replacement must not end up with.
        retry_limit: Maximum stop/start cycles for the replacement.

    Returns:
        A dict with ``terminated_instance_id``, ``new_instance_id`` and
        ``new_ip``.

    Raises:
        AWSOperationError: If no instance has *ip*, or any EC2 step fails.
    """
    client = ec2_client(account)

    instance_id = _find_instance_id_by_ip(client, ip)
    if not instance_id:
        raise AWSOperationError(f"No instance found with IP {ip}")

    _terminate_instance(client, instance_id)
    new_instance_id = _provision_instance(client, account)
    new_ip = _recycle_ip_until_free(
        client, new_instance_id, disallowed_ips, retry_limit
    )

    return {
        "terminated_instance_id": instance_id,
        "new_instance_id": new_instance_id,
        "new_ip": new_ip,
    }