From 76424cc8c32a490f4557eff6c90003be1d0dd2ce Mon Sep 17 00:00:00 2001 From: wangqifan Date: Mon, 5 Jan 2026 11:07:55 +0800 Subject: [PATCH] =?UTF-8?q?aws=E8=83=BD=E7=94=A8=E4=BA=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .env | 5 +- .gitignore | 1 + .vscode/settings.json | 5 + README.md | 40 ++++--- app.py | 143 ++++++++++++++++++++++- aws_service.py | 243 +++++++++++++++++++++++++++++++++----- config/accounts.yaml | 17 ++- db.py | 258 ++++++++++++++++++++++++++++++++++++++++- requirements.txt | 1 + templates/history.html | 131 +++++++++++++++++++++ templates/index.html | 35 +++--- templates/login.html | 65 +++++++++++ 12 files changed, 864 insertions(+), 80 deletions(-) create mode 100644 .gitignore create mode 100644 .vscode/settings.json create mode 100644 templates/history.html create mode 100644 templates/login.html diff --git a/.env b/.env index 4f74b82..23384dc 100644 --- a/.env +++ b/.env @@ -1,4 +1,7 @@ FLASK_ENV=development -DATABASE_URL=mysql+pymysql://username:password@localhost:3306/ip_ops +DATABASE_URL=mysql+pymysql://ec2_mt5:8FmzXj4xcz3AiH2R@163.123.183.106:3306/ec2_mt5 AWS_CONFIG_PATH=config/accounts.yaml IP_RETRY_LIMIT=5 +APP_USER=admin +APP_PASSWORD=Pc9mVTm3kKo0pO +SECRET_KEY=51aiapi diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..75ec24a --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +__pycache__/*.pyc diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..a8c2003 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,5 @@ +{ + "python-envs.defaultEnvManager": "ms-python.python:conda", + "python-envs.defaultPackageManager": "ms-python.python:conda", + "python-envs.pythonProjects": [] +} \ No newline at end of file diff --git a/README.md b/README.md index fedc6f8..0d12a3c 100644 --- a/README.md +++ b/README.md @@ -1,10 +1,10 @@ -# AWS IP 替换网站(Flask) +# AWS IP 替换工具(Flask) -基于 Flask + boto3 + MySQL 的小工具,用于: -- 根据输入的 IP 查找对应的 EC2 实例并终止 -- 使用配置好的 AMI 创建新实例 -- 如果新实例的公网 IP 存在于运维表(黑名单),通过停止/启动循环获取新 IP -- 支持多 AWS 账户,通过配置文件切换 +Flask + boto3 + MySQL 的小工具,用于: +- 根据输入 IP 查找对应 EC2 实例并终止,使用预设 AMI 创建新实例 +- 通过数据库中的 IP-账户映射自动选择 AWS 账户,前端不暴露账户列表 +- 如果新公网 IP 落入运维黑名单,自动停机/开机循环更换 IP(受 `IP_RETRY_LIMIT` 控制) +- MySQL 存储黑名单 (`ip_operations`)、IP-账户映射 (`ip_account_mapping`)、服务器规格 (`server_specs`,含实例类型/Name/磁盘/安全组/区域/子网/AZ)、IP 替换历史 (`ip_replacement_history`,含 group_id 链路标识,默认取旧 IP) ## 快速开始 1) 安装依赖 @@ -17,19 +17,22 @@ pip install -r requirements.txt 2) 配置环境变量 复制 `.env.example` 为 `.env`,按需修改: - `DATABASE_URL`:MySQL 连接串,例如 `mysql+pymysql://user:pass@localhost:3306/ip_ops` -- `AWS_CONFIG_PATH`:AWS 账户配置文件(默认 `config/accounts.yaml`) -- `IP_RETRY_LIMIT`:新 IP 与运维表冲突时的关机/开机重试次数 +- `AWS_CONFIG_PATH`:AWS 账户配置文件,默认 `config/accounts.yaml` +- `IP_RETRY_LIMIT`:新 IP 与黑名单冲突时的停机/开机重试次数 3) 准备数据库 -创建数据库并授权,然后首次运行时会自动创建表 `ip_operations`: +创建数据库并授权,首次运行会自动建表 `ip_operations`、`ip_account_mapping`、`server_specs`、`ip_replacement_history`: ```sql CREATE DATABASE ip_ops DEFAULT CHARACTER SET utf8mb4; GRANT ALL ON ip_ops.* TO 'user'@'%' IDENTIFIED BY 'pass'; ``` -表中记录的 IP 被视为不可使用的 IP 黑名单。 +`ip_account_mapping` 记录 IP 与账户名映射(运行前请先写入),例如: +```sql +INSERT INTO ip_account_mapping (ip_address, account_name) VALUES ('54.12.34.56', 'account_a'); +``` 4) 配置 AWS 账户 -编辑 `config/accounts.yaml`,为每个账户填写:访问密钥、区域、AMI ID、实例类型、子网、安全组等。 +编辑 `config/accounts.yaml`,为每个账户填写:访问密钥、区域、AMI ID、可选子网/安全组/密钥名等(实例类型无需配置,后端按源实例类型创建;若能读取到源实例的子网与安全组,将复用它们,否则回落到配置文件;密钥名若不存在会自动忽略重试)。 5) 启动 ```bash @@ -38,13 +41,12 @@ flask --app app run --host 
0.0.0.0 --port 5000 ``` ## 运行流程 -1) 页面输入需要替换的 IP,并选择 AWS 账户 -2) 后端在该账户中查找实例(先查公网 IP,再查私网 IP) -3) 终止旧实例 -4) 使用配置的 AMI/实例规格创建新实例并等待 `running` -5) 如果新公网 IP 在 `ip_operations` 表中,自动执行停止+启动直到拿到未被列入黑名单的 IP(最多 `IP_RETRY_LIMIT` 次) +1) 页面输入需要替换的 IP,后端用 `ip_account_mapping` 定位账户并读取对应 AWS 配置 +2) 在该账户中查找公/私网 IP 匹配的实例,读取实例类型、Name、根盘大小/类型、安全组(ID/名称)、区域/子网/AZ,并记录到 `server_specs`;若实例未找到则回退使用数据库中已存的规格 +3) 按记录的规格创建新实例(实例类型、磁盘类型/大小、安全组、子网/AZ),如新公网 IP 在 `ip_operations` 黑名单中,则停机/开机循环直至获得可用 IP(或达到重试上限);旧实例的终止异步触发,不会阻塞新实例创建 +4) 记录 IP 替换历史到 `ip_replacement_history`,group_id 默认用旧 IP;前端主页可跳转到历史页按 IP/group 查询链路;同时更新 `server_specs` 中的 IP 规格为最新 IP ## 注意事项 -- 真实环境会产生终止/创建实例等成本操作,请先在测试账户验证流程 -- 如果 AWS 或数据库配置加载失败,页面会显示错误提示 -- 根据需要可在 `ip_operations` 表中维护不可用 IP 列表,避免重复分配 +- 真实环境会产生终止/创建实例等成本操作,先在测试账户验证流程 +- 若 AWS 或数据库配置加载失败,页面会直接显示错误提示 +- 需定期维护 `ip_operations` 黑名单、`ip_account_mapping` 映射,以及 `server_specs` 中的规格数据 diff --git a/app.py b/app.py index 756e07a..a503535 100644 --- a/app.py +++ b/app.py @@ -1,8 +1,9 @@ import os from typing import Dict +from functools import wraps from dotenv import load_dotenv -from flask import Flask, jsonify, render_template, request +from flask import Flask, jsonify, render_template, request, redirect, url_for, session from aws_service import ( AWSOperationError, @@ -11,12 +12,37 @@ from aws_service import ( load_account_configs, replace_instance_ip, ) -from db import init_db, load_disallowed_ips +from db import ( + add_replacement_history, + get_account_by_ip, + get_replacement_history, + get_history_by_ip_or_group, + get_history_chains, + get_server_spec, + init_db, + load_disallowed_ips, + update_ip_account_mapping, + upsert_server_spec, +) load_dotenv() app = Flask(__name__) +app.secret_key = os.getenv("SECRET_KEY", "please-change-me") + +APP_USER = os.getenv("APP_USER", "") +APP_PASSWORD = os.getenv("APP_PASSWORD", "") + + +def login_required(fn): + @wraps(fn) + def wrapper(*args, **kwargs): + if not session.get("authed"): + return redirect(url_for("login", next=request.path)) + return fn(*args, **kwargs) + + return wrapper def load_configs() -> Dict[str, AccountConfig]: @@ -41,34 +67,139 @@ except Exception as exc: # noqa: BLE001 - surface DB connection issues to UI @app.route("/", methods=["GET"]) +@login_required def index(): if init_error or db_error: return render_template("index.html", accounts=[], init_error=init_error or db_error) return render_template("index.html", accounts=account_configs.values(), init_error="") +@app.route("/login", methods=["GET", "POST"]) +def login(): + if session.get("authed"): + return redirect(url_for("index")) + error = "" + next_url = request.args.get("next", "/") + if request.method == "POST": + username = request.form.get("username", "").strip() + password = request.form.get("password", "").strip() + if username == APP_USER and password == APP_PASSWORD: + session["authed"] = True + return redirect(next_url or url_for("index")) + error = "用户名或密码错误" + return render_template("login.html", error=error, next_url=next_url) + + +@app.route("/logout", methods=["POST"]) +def logout(): + session.clear() + return redirect(url_for("login")) + + @app.route("/replace_ip", methods=["POST"]) +@login_required def replace_ip(): if init_error or db_error: return jsonify({"error": init_error or db_error}), 500 ip_to_replace = request.form.get("ip_to_replace", "").strip() - account_name = request.form.get("account_name", "").strip() if not ip_to_replace: return jsonify({"error": "请输入要替换的IP"}), 400 + + account_name = get_account_by_ip(ip_to_replace) + if not account_name: 
+ return jsonify({"error": "数据库中未找到该IP对应的账户映射"}), 400 if account_name not in account_configs: - return jsonify({"error": "无效的账户选择"}), 400 + return jsonify({"error": f"账户 {account_name} 未在配置文件中定义"}), 400 disallowed = load_disallowed_ips() + fallback_spec = get_server_spec(ip_to_replace) account = account_configs[account_name] try: - result = replace_instance_ip(ip_to_replace, account, disallowed, retry_limit) - except AWSOperationError as exc: + result = replace_instance_ip( + ip_to_replace, account, disallowed, retry_limit, fallback_spec=fallback_spec + ) + spec_used = result.get("spec_used", {}) if isinstance(result, dict) else {} + # 记录当前 IP 的规格(输入 IP、数据库规格、或从 AWS 读到的规格) + upsert_server_spec( + ip_address=ip_to_replace, + account_name=account_name, + instance_type=spec_used.get("instance_type"), + instance_name=spec_used.get("instance_name"), + volume_type=spec_used.get("root_volume_type"), + security_group_names=spec_used.get("security_group_names", []), + security_group_ids=spec_used.get("security_group_ids", []), + region=spec_used.get("region"), + subnet_id=spec_used.get("subnet_id"), + availability_zone=spec_used.get("availability_zone"), + ) + # 新 IP 同步规格 + upsert_server_spec( + ip_address=result["new_ip"], + account_name=account_name, + instance_type=spec_used.get("instance_type"), + instance_name=spec_used.get("instance_name"), + volume_type=spec_used.get("root_volume_type"), + security_group_names=spec_used.get("security_group_names", []), + security_group_ids=spec_used.get("security_group_ids", []), + region=spec_used.get("region"), + subnet_id=spec_used.get("subnet_id"), + availability_zone=spec_used.get("availability_zone"), + ) + update_ip_account_mapping(ip_to_replace, result["new_ip"], account_name) + add_replacement_history( + ip_to_replace, + result["new_ip"], + account_name, + None, + terminated_network_out_mb=result.get("terminated_network_out_mb"), + ) + except (AWSOperationError, ValueError) as exc: return jsonify({"error": str(exc)}), 400 return jsonify(result), 200 +@app.route("/history", methods=["GET"]) +@login_required +def history(): + try: + records = get_replacement_history(limit=100) + except Exception as exc: # noqa: BLE001 + return jsonify({"error": f"读取历史失败: {exc}"}), 500 + return jsonify({"items": records}) + + +@app.route("/history/search", methods=["GET"]) +@login_required +def history_search(): + ip = request.args.get("ip", "").strip() or None + group_id = request.args.get("group", "").strip() or None + try: + records = get_history_by_ip_or_group(ip, group_id, limit=200) + except Exception as exc: # noqa: BLE001 + return jsonify({"error": f"读取历史失败: {exc}"}), 500 + return jsonify({"items": records}) + + +@app.route("/history_page", methods=["GET"]) +@login_required +def history_page(): + return render_template("history.html") + + +@app.route("/history/chains", methods=["GET"]) +@login_required +def history_chains(): + ip = request.args.get("ip", "").strip() or None + group_id = request.args.get("group", "").strip() or None + try: + records = get_history_chains(ip=ip, group_id=group_id, limit=500) + except Exception as exc: # noqa: BLE001 + return jsonify({"error": f"读取历史失败: {exc}"}), 500 + return jsonify({"items": records}) + + if __name__ == "__main__": app.run(host="0.0.0.0", port=5000, debug=True) diff --git a/aws_service.py b/aws_service.py index 648d5f2..bfebc99 100644 --- a/aws_service.py +++ b/aws_service.py @@ -1,6 +1,7 @@ import os -from dataclasses import dataclass -from typing import Dict, List, Optional +from dataclasses import dataclass, 
field +from datetime import datetime, timedelta, timezone +from typing import Dict, List, Optional, TypedDict import boto3 from botocore.exceptions import BotoCoreError, ClientError @@ -15,6 +16,19 @@ class AWSOperationError(Exception): pass +class InstanceSpec(TypedDict, total=False): + instance_type: Optional[str] + instance_name: Optional[str] + root_device: Optional[str] + root_size: Optional[int] + root_volume_type: Optional[str] + security_group_ids: List[str] + security_group_names: List[str] + subnet_id: Optional[str] + availability_zone: Optional[str] + region: Optional[str] + + @dataclass class AccountConfig: name: str @@ -22,9 +36,8 @@ class AccountConfig: access_key_id: str secret_access_key: str ami_id: str - instance_type: str - subnet_id: str - security_group_ids: List[str] + subnet_id: Optional[str] = None + security_group_ids: List[str] = field(default_factory=list) key_name: Optional[str] = None @@ -43,8 +56,7 @@ def load_account_configs(path: str) -> Dict[str, AccountConfig]: access_key_id=item["access_key_id"], secret_access_key=item["secret_access_key"], ami_id=item["ami_id"], - instance_type=item["instance_type"], - subnet_id=item["subnet_id"], + subnet_id=item.get("subnet_id"), security_group_ids=item.get("security_group_ids", []), key_name=item.get("key_name"), ) @@ -61,7 +73,16 @@ def ec2_client(account: AccountConfig): ) -def _find_instance_id_by_ip(client, ip: str) -> Optional[str]: +def cloudwatch_client(account: AccountConfig): + return boto3.client( + "cloudwatch", + region_name=account.region, + aws_access_key_id=account.access_key_id, + aws_secret_access_key=account.secret_access_key, + ) + + +def _get_instance_by_ip(client, ip: str) -> Optional[dict]: filters = [ {"Name": "instance-state-name", "Values": ["pending", "running", "stopping", "stopped"]}, ] @@ -73,7 +94,7 @@ def _find_instance_id_by_ip(client, ip: str) -> Optional[str]: for reservation in resp.get("Reservations", []): for instance in reservation.get("Instances", []): - return instance["InstanceId"] + return instance return None @@ -82,31 +103,133 @@ def _wait_for_state(client, instance_id: str, waiter_name: str) -> None: waiter.wait(InstanceIds=[instance_id]) -def _terminate_instance(client, instance_id: str) -> None: +def _get_root_volume_spec(client, instance: dict) -> tuple[Optional[str], Optional[int], Optional[str]]: + """Return (device_name, size_gb, volume_type) for root volume if available.""" + root_device_name = instance.get("RootDeviceName") + if not root_device_name: + return None, None, None + + for mapping in instance.get("BlockDeviceMappings", []): + if mapping.get("DeviceName") != root_device_name: + continue + ebs = mapping.get("Ebs") + if not ebs: + return root_device_name, None, None + volume_id = ebs.get("VolumeId") + if not volume_id: + return root_device_name, None, None + try: + vol_resp = client.describe_volumes(VolumeIds=[volume_id]) + volumes = vol_resp.get("Volumes", []) + if volumes: + volume = volumes[0] + return root_device_name, volume.get("Size"), volume.get("VolumeType") + except (ClientError, BotoCoreError) as exc: + raise AWSOperationError(f"Failed to read volume info for {volume_id}: {exc}") from exc + return root_device_name, None, None + + +def _extract_security_group_ids(instance: dict) -> List[str]: + groups = [] + for g in instance.get("SecurityGroups", []): + gid = g.get("GroupId") + if gid: + groups.append(gid) + return groups + + +def _extract_security_group_names(instance: dict) -> List[str]: + groups = [] + for g in instance.get("SecurityGroups", 
[]): + name = g.get("GroupName") + if name: + groups.append(name) + return groups + + +def _extract_name_tag(instance: dict) -> Optional[str]: + for tag in instance.get("Tags", []) or []: + if tag.get("Key") == "Name": + return tag.get("Value") + return None + + +def _terminate_instance(client, instance_id: str, wait_for_completion: bool = True) -> None: try: client.terminate_instances(InstanceIds=[instance_id]) - _wait_for_state(client, instance_id, "instance_terminated") + if wait_for_completion: + _wait_for_state(client, instance_id, "instance_terminated") except (ClientError, BotoCoreError) as exc: raise AWSOperationError(f"Failed to terminate instance {instance_id}: {exc}") from exc -def _provision_instance(client, account: AccountConfig) -> str: - try: +def _build_block_device_mappings( + device_name: Optional[str], volume_size: Optional[int], volume_type: Optional[str] +) -> Optional[list]: + if not device_name: + return None + ebs = {"DeleteOnTermination": True} + if volume_type: + ebs["VolumeType"] = volume_type + if volume_size: + ebs["VolumeSize"] = volume_size + return [{"DeviceName": device_name, "Ebs": ebs}] + + +def _provision_instance( + client, + account: AccountConfig, + spec: InstanceSpec, +) -> str: + def _build_params(include_key: bool = True) -> dict: params = { "ImageId": account.ami_id, - "InstanceType": account.instance_type, + "InstanceType": spec.get("instance_type"), "MinCount": 1, "MaxCount": 1, - "SubnetId": account.subnet_id, - "SecurityGroupIds": account.security_group_ids, } - if account.key_name: + if spec.get("instance_name"): + params["TagSpecifications"] = [ + { + "ResourceType": "instance", + "Tags": [{"Key": "Name", "Value": spec["instance_name"]}], + } + ] + subnet_id = spec.get("subnet_id") + if subnet_id: + params["SubnetId"] = subnet_id + security_group_ids = spec.get("security_group_ids") + if security_group_ids: + params["SecurityGroupIds"] = security_group_ids + block_mapping = _build_block_device_mappings( + spec.get("root_device"), spec.get("root_size"), spec.get("root_volume_type") + ) + if block_mapping: + params["BlockDeviceMappings"] = block_mapping + if include_key and account.key_name: params["KeyName"] = account.key_name + return params + + def _run(params: dict) -> str: resp = client.run_instances(**params) instance_id = resp["Instances"][0]["InstanceId"] _wait_for_state(client, instance_id, "instance_running") return instance_id - except (ClientError, BotoCoreError) as exc: + + try: + return _run(_build_params()) + except ClientError as exc: + code = exc.response.get("Error", {}).get("Code") if hasattr(exc, "response") else None + if code == "InvalidKeyPair.NotFound" and account.key_name: + # fallback: retry without key pair + try: + return _run(_build_params(include_key=False)) + except (ClientError, BotoCoreError) as exc2: + raise AWSOperationError( + f"Failed to create instance after removing missing key pair {account.key_name}: {exc2}" + ) from exc + raise AWSOperationError(f"Failed to create instance: {exc}") from exc + except BotoCoreError as exc: raise AWSOperationError(f"Failed to create instance: {exc}") from exc @@ -139,20 +262,86 @@ def _recycle_ip_until_free(client, instance_id: str, banned_ips: set[str], retry raise AWSOperationError("Reached retry limit while attempting to obtain a free IP") -def replace_instance_ip( - ip: str, account: AccountConfig, disallowed_ips: set[str], retry_limit: int = 5 -) -> Dict[str, str]: - client = ec2_client(account) - instance_id = _find_instance_id_by_ip(client, ip) - if not 
instance_id: - raise AWSOperationError(f"No instance found with IP {ip}") +def _get_network_out_mb(cw_client, instance_id: str, days: int = 30) -> float: + """Fetch total NetworkOut over the past window (MB).""" + end = datetime.now(timezone.utc) + start = end - timedelta(days=days) + try: + resp = cw_client.get_metric_statistics( + Namespace="AWS/EC2", + MetricName="NetworkOut", + Dimensions=[{"Name": "InstanceId", "Value": instance_id}], + StartTime=start, + EndTime=end, + Period=3600 * 6, # 6 小时粒度,覆盖 30 天 + Statistics=["Sum"], + ) + datapoints = resp.get("Datapoints", []) + if not datapoints: + return 0.0 + total_bytes = sum(dp.get("Sum", 0.0) for dp in datapoints) + return round(total_bytes / (1024 * 1024), 2) + except (ClientError, BotoCoreError) as exc: + raise AWSOperationError(f"Failed to fetch NetworkOut metrics: {exc}") from exc - _terminate_instance(client, instance_id) - new_instance_id = _provision_instance(client, account) + +def _build_spec_from_instance(client, instance: dict, account: AccountConfig) -> InstanceSpec: + instance_type = instance.get("InstanceType") + if not instance_type: + raise AWSOperationError("Failed to detect instance type from source instance") + root_device, root_size, root_volume_type = _get_root_volume_spec(client, instance) + return { + "instance_type": instance_type, + "instance_name": _extract_name_tag(instance), + "root_device": root_device, + "root_size": root_size, + "root_volume_type": root_volume_type, + "security_group_ids": _extract_security_group_ids(instance), + "security_group_names": _extract_security_group_names(instance), + "subnet_id": instance.get("SubnetId") or account.subnet_id, + "availability_zone": instance.get("Placement", {}).get("AvailabilityZone"), + "region": account.region, + } + + +def replace_instance_ip( + ip: str, + account: AccountConfig, + disallowed_ips: set[str], + retry_limit: int = 5, + fallback_spec: Optional[InstanceSpec] = None, +) -> Dict[str, object]: + client = ec2_client(account) + cw = cloudwatch_client(account) + instance = _get_instance_by_ip(client, ip) + + spec: Optional[InstanceSpec] = None + instance_id: Optional[str] = None + network_out_mb: Optional[float] = None + if instance: + instance_id = instance["InstanceId"] + spec = _build_spec_from_instance(client, instance, account) + try: + network_out_mb = _get_network_out_mb(cw, instance_id) + except AWSOperationError: + network_out_mb = None + elif fallback_spec: + spec = fallback_spec + + if not spec: + raise AWSOperationError(f"No instance found with IP {ip} 且数据库无该IP规格信息") + + new_instance_id = _provision_instance(client, account, spec) new_ip = _recycle_ip_until_free(client, new_instance_id, disallowed_ips, retry_limit) + if instance_id: + # 不阻塞新实例创建,终止旧实例但不等待完成 + _terminate_instance(client, instance_id, wait_for_completion=False) + return { "terminated_instance_id": instance_id, "new_instance_id": new_instance_id, "new_ip": new_ip, + "spec_used": spec, + "terminated_network_out_mb": network_out_mb, } diff --git a/config/accounts.yaml b/config/accounts.yaml index b97b9e4..11b4bc9 100644 --- a/config/accounts.yaml +++ b/config/accounts.yaml @@ -1,20 +1,17 @@ accounts: - name: 91-500f - region: us-east-1 + region: eu-west-2 access_key_id: AKIASAZ4PZBSBRYB7WFJ secret_access_key: 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 - ami_id: ami-xxxxxxxx - instance_type: t3.micro + ami_id: ami-0674ba52a729cde32 subnet_id: subnet-xxxxxxxx security_group_ids: - sg-xxxxxxxx - key_name: optional-keypair-name - - name: account-two - region: us-west-2 - 
access_key_id: YOUR_ACCESS_KEY_ID - secret_access_key: YOUR_SECRET_ACCESS_KEY - ami_id: ami-yyyyyyyy - instance_type: t3.micro + - name: 18f + region: us-east-1 + access_key_id: AKIAU6GD3AFMGKOMBAPS + secret_access_key: hOZNN7+mZDPAKQDd+ptSqN686Pv+57/Cu4JNubN4 + ami_id: ami-0c398cb65a93047f2 subnet_id: subnet-yyyyyyyy security_group_ids: - sg-yyyyyyyy diff --git a/db.py b/db.py index 89d8ae2..337f173 100644 --- a/db.py +++ b/db.py @@ -1,12 +1,13 @@ import os from contextlib import contextmanager -from typing import Iterable +from datetime import datetime, timedelta, timezone +from typing import Iterable, Optional, List, Dict -from sqlalchemy import Column, Integer, String, create_engine, select +from sqlalchemy import Column, DateTime, Integer, String, Float, create_engine, select from sqlalchemy.exc import SQLAlchemyError from sqlalchemy.orm import declarative_base, sessionmaker -DATABASE_URL = os.getenv("DATABASE_URL", "mysql+pymysql://ec2_mt5:8FmzXj4xcz3AiH2R@163.123.183.106:3306/ip_ops") +DATABASE_URL = os.getenv("DATABASE_URL", "mysql+pymysql://username:password@localhost:3306/ip_ops") engine = create_engine(DATABASE_URL, pool_pre_ping=True) SessionLocal = sessionmaker(bind=engine, autoflush=False, autocommit=False) @@ -21,6 +22,54 @@ class IPOperation(Base): note = Column(String(255), nullable=True) +class IPAccountMapping(Base): + __tablename__ = "ip_account_mapping" + + id = Column(Integer, primary_key=True, autoincrement=True) + ip_address = Column(String(64), unique=True, nullable=False, index=True) + account_name = Column(String(128), nullable=False) + + +class ServerSpec(Base): + __tablename__ = "server_specs" + + id = Column(Integer, primary_key=True, autoincrement=True) + ip_address = Column(String(64), unique=True, nullable=False, index=True) + account_name = Column(String(128), nullable=False) + instance_type = Column(String(64), nullable=True) + instance_name = Column(String(255), nullable=True) + volume_type = Column(String(64), nullable=True) + security_group_names = Column(String(512), nullable=True) + security_group_ids = Column(String(512), nullable=True) + region = Column(String(64), nullable=True) + subnet_id = Column(String(128), nullable=True) + availability_zone = Column(String(64), nullable=True) + created_at = Column(DateTime(timezone=True), nullable=False) + + +class IPReplacementHistory(Base): + __tablename__ = "ip_replacement_history" + + id = Column(Integer, primary_key=True, autoincrement=True) + old_ip = Column(String(64), nullable=False, index=True) + new_ip = Column(String(64), nullable=False, index=True) + account_name = Column(String(128), nullable=False) + group_id = Column(String(128), nullable=True, index=True) + terminated_network_out_mb = Column(Float, nullable=True) + created_at = Column(DateTime(timezone=True), nullable=False) + + +def resolve_group_id(old_ip: str) -> str: + """Group id继承上一条 new_ip=old_ip 的记录,否则用 old_ip 作为新的组标识。""" + with db_session() as session: + prev = session.scalar( + select(IPReplacementHistory.group_id) + .where(IPReplacementHistory.new_ip == old_ip) + .order_by(IPReplacementHistory.id.desc()) + ) + return prev or old_ip + + def init_db() -> None: Base.metadata.create_all(bind=engine) @@ -42,3 +91,206 @@ def load_disallowed_ips() -> set[str]: with db_session() as session: rows: Iterable[IPOperation] = session.scalars(select(IPOperation.ip_address)) return {row for row in rows} + + +def get_account_by_ip(ip: str) -> Optional[str]: + with db_session() as session: + return session.scalar( + 
select(IPAccountMapping.account_name).where(IPAccountMapping.ip_address == ip) + ) + + +def update_ip_account_mapping(old_ip: str, new_ip: str, account_name: str) -> None: + with db_session() as session: + existing_mapping = session.scalar( + select(IPAccountMapping).where(IPAccountMapping.ip_address == old_ip) + ) + conflict_mapping = session.scalar( + select(IPAccountMapping).where(IPAccountMapping.ip_address == new_ip) + ) + if conflict_mapping and (not existing_mapping or conflict_mapping.id != existing_mapping.id): + raise ValueError(f"IP {new_ip} 已经映射到账户 {conflict_mapping.account_name}") + + if existing_mapping: + existing_mapping.ip_address = new_ip + existing_mapping.account_name = account_name + else: + session.add(IPAccountMapping(ip_address=new_ip, account_name=account_name)) + + +def _now_cn() -> datetime: + return datetime.now(timezone(timedelta(hours=8))) + + +def upsert_server_spec( + *, + ip_address: str, + account_name: str, + instance_type: Optional[str], + instance_name: Optional[str], + volume_type: Optional[str], + security_group_names: List[str], + security_group_ids: List[str], + region: Optional[str], + subnet_id: Optional[str], + availability_zone: Optional[str], + created_at: Optional[datetime] = None, +) -> None: + with db_session() as session: + spec = session.scalar(select(ServerSpec).where(ServerSpec.ip_address == ip_address)) + payload = { + "account_name": account_name, + "instance_type": instance_type, + "instance_name": instance_name, + "volume_type": volume_type, + "security_group_names": ",".join(security_group_names), + "security_group_ids": ",".join(security_group_ids), + "region": region, + "subnet_id": subnet_id, + "availability_zone": availability_zone, + "created_at": created_at or _now_cn(), + } + if spec: + for key, val in payload.items(): + setattr(spec, key, val) + else: + session.add(ServerSpec(ip_address=ip_address, **payload)) + + +def get_server_spec(ip_address: str) -> Optional[Dict[str, Optional[str]]]: + with db_session() as session: + spec = session.scalar(select(ServerSpec).where(ServerSpec.ip_address == ip_address)) + if not spec: + return None + return { + "ip_address": spec.ip_address, + "account_name": spec.account_name, + "instance_type": spec.instance_type, + "instance_name": spec.instance_name, + "volume_type": spec.volume_type, + "security_group_names": spec.security_group_names.split(",") if spec.security_group_names else [], + "security_group_ids": spec.security_group_ids.split(",") if spec.security_group_ids else [], + "region": spec.region, + "subnet_id": spec.subnet_id, + "availability_zone": spec.availability_zone, + "created_at": spec.created_at, + } + + +def add_replacement_history( + old_ip: str, + new_ip: str, + account_name: str, + group_id: Optional[str], + terminated_network_out_mb: Optional[float] = None, +) -> None: + resolved_group = group_id or resolve_group_id(old_ip) + with db_session() as session: + session.add( + IPReplacementHistory( + old_ip=old_ip, + new_ip=new_ip, + account_name=account_name, + group_id=resolved_group, + terminated_network_out_mb=terminated_network_out_mb, + created_at=_now_cn(), + ) + ) + + +def get_replacement_history(limit: int = 50) -> List[Dict[str, str]]: + with db_session() as session: + rows: Iterable[IPReplacementHistory] = session.scalars( + select(IPReplacementHistory).order_by(IPReplacementHistory.id.desc()).limit(limit) + ) + return [ + { + "old_ip": row.old_ip, + "new_ip": row.new_ip, + "account_name": row.account_name, + "group_id": row.group_id, + 
"terminated_network_out_mb": row.terminated_network_out_mb, + "created_at": row.created_at.isoformat(), + } + for row in rows + ] + + +def get_history_by_ip_or_group(ip: Optional[str], group_id: Optional[str], limit: int = 200) -> List[Dict[str, str]]: + with db_session() as session: + stmt = select(IPReplacementHistory).order_by(IPReplacementHistory.id.desc()).limit(limit) + if group_id: + stmt = stmt.where(IPReplacementHistory.group_id == group_id) + elif ip: + stmt = stmt.where( + (IPReplacementHistory.old_ip == ip) | (IPReplacementHistory.new_ip == ip) + ) + rows: Iterable[IPReplacementHistory] = session.scalars(stmt) + return [ + { + "old_ip": row.old_ip, + "new_ip": row.new_ip, + "account_name": row.account_name, + "group_id": row.group_id, + "terminated_network_out_mb": row.terminated_network_out_mb, + "created_at": row.created_at.isoformat(), + } + for row in rows + ] + + +def get_history_chains(ip: Optional[str] = None, group_id: Optional[str] = None, limit: int = 500) -> List[Dict[str, object]]: + """返回按 group_id 聚合的链路信息(按创建时间升序构建链)。""" + with db_session() as session: + stmt = select(IPReplacementHistory).order_by(IPReplacementHistory.created_at.asc()) + if group_id: + stmt = stmt.where(IPReplacementHistory.group_id == group_id) + elif ip: + stmt = stmt.where( + (IPReplacementHistory.old_ip == ip) | (IPReplacementHistory.new_ip == ip) + ) + stmt = stmt.limit(limit) + rows: Iterable[IPReplacementHistory] = session.scalars(stmt) + + groups: Dict[str, Dict[str, object]] = {} + for row in rows: + gid = row.group_id or row.old_ip + if gid not in groups: + groups[gid] = {"group_id": gid, "items": [], "chain": [], "first_ip_start": None} + entry = { + "old_ip": row.old_ip, + "new_ip": row.new_ip, + "account_name": row.account_name, + "terminated_network_out_mb": row.terminated_network_out_mb, + "created_at": row.created_at.isoformat(), + } + groups[gid]["items"].append(entry) + + # 构建链路 + for gid, data in groups.items(): + items = data["items"] + items.sort(key=lambda x: x["created_at"]) + chain: List[str] = [] + for it in items: + if not chain: + chain.append(it["old_ip"]) + if chain[-1] != it["old_ip"] and it["old_ip"] not in chain: + chain.append(it["old_ip"]) + if chain[-1] != it["new_ip"]: + chain.append(it["new_ip"]) + data["chain"] = chain + # 读取链首 IP 的创建时间(server_specs.created_at) + if chain: + first_ip = chain[0] + spec_time = session.scalar( + select(ServerSpec.created_at).where(ServerSpec.ip_address == first_ip) + ) + if spec_time: + data["first_ip_start"] = spec_time.isoformat() + + # 返回按最早时间排序的组 + ordered = sorted( + groups.values(), + key=lambda g: g["items"][0]["created_at"] if g["items"] else "", + ) + return ordered diff --git a/requirements.txt b/requirements.txt index cf74cc4..026962f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,3 +3,4 @@ boto3==1.34.14 PyMySQL==1.1.0 SQLAlchemy==2.0.25 python-dotenv==1.0.1 +PyYAML==6.0.3 diff --git a/templates/history.html b/templates/history.html new file mode 100644 index 0000000..4f865a2 --- /dev/null +++ b/templates/history.html @@ -0,0 +1,131 @@ + + + + + IP 替换历史 + + + + +
[templates/history.html (new file, 131 lines): the HTML/JS markup was stripped during extraction and cannot be recovered here. Visible text: page title "IP 替换历史" (IP replacement history); the hint "按 IP 或 group_id 查看完整链路(a→b→c)。group_id 默认继承上一跳,否则用旧 IP。" (view the full a→b→c chain by IP or group_id; group_id is inherited from the previous hop, otherwise the old IP is used as the group id); and search controls for IP / group_id with a query button.]
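Since the page itself cannot be reproduced above, here is a minimal sketch of querying the same data through the endpoints this patch adds (`/login`, `/history/chains`). This is an illustration only: the base URL and credentials are placeholders, and the `requests` dependency is an assumption (it is not in requirements.txt); the endpoint paths, form fields, and the `items` / `group_id` / `chain` / `first_ip_start` response fields come from app.py and db.py above.

```python
# Sketch, not part of the patch. Assumes the app from this patch is running at
# http://localhost:5000 and that APP_USER/APP_PASSWORD match the values in .env.
import requests

BASE = "http://localhost:5000"  # assumed local dev server

with requests.Session() as s:
    # /login takes form fields "username" and "password" and sets a session cookie.
    s.post(f"{BASE}/login", data={"username": "admin", "password": "change-me"})

    # /history/chains groups history rows by group_id and returns each a->b->c chain.
    resp = s.get(f"{BASE}/history/chains", params={"ip": "54.12.34.56"})
    for group in resp.json().get("items", []):
        print(group["group_id"], " -> ".join(group["chain"]), group.get("first_ip_start"))
```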
diff --git a/templates/index.html b/templates/index.html
index 5249288..74677fd 100644
--- a/templates/index.html
+++ b/templates/index.html
@@ -49,7 +49,7 @@
       margin-bottom: 6px;
       color: #cbd5e1;
     }
-    input, select, button {
+    input, button {
       width: 100%;
       padding: 12px 14px;
       border-radius: 10px;
@@ -60,7 +60,7 @@
       outline: none;
       transition: border-color 0.2s ease, transform 0.1s ease;
     }
-    input:focus, select:focus {
+    input:focus {
       border-color: var(--accent);
       transform: translateY(-1px);
     }
@@ -94,6 +94,10 @@
     }
     .muted { color: var(--muted); }
     .grid { display: grid; gap: 12px; }
+    .history { margin-top: 18px; }
+    .history-item { padding: 8px 10px; border-bottom: 1px solid rgba(255,255,255,0.06); }
+    .history-item:last-child { border-bottom: none; }
+    .history-head { display: flex; justify-content: space-between; align-items: center; }
     @media (max-width: 600px) {
       .shell { padding: 20px; }
     }
[templates/index.html body hunks @@ -102,7 +106,8 @@ and @@ -112,20 +117,13 @@: the HTML markup was stripped and cannot be recovered here. Visible text changes: the lead paragraph "通过输入现有服务器 IP,自动销毁实例并用指定 AMI 创建新实例,确保新 IP 不在运维表。" (enter an existing server IP to terminate the instance and recreate it from the specified AMI, ensuring the new IP is not in the ops table) is replaced by "输入当前服务器 IP,系统会根据数据库中的 IP-账户映射自动确定 AWS 账户并替换实例。" (enter the current server IP; the AWS account is determined from the IP-account mapping in the database and the instance is replaced); a link "历史查看:IP 替换链路" (History: IP replacement chains) is added; the account selection control and the hint "账户、AMI、实例类型等从 config/accounts.yaml 读取。" (account, AMI, instance type, etc. are read from config/accounts.yaml) are removed in favour of "账户选择已隐藏,依赖数据库中的 IP-账户映射,请提前维护。" (account selection is hidden and relies on the IP-account mapping in the database; maintain it in advance). Unchanged context includes the error flash "配置加载失败:{{ init_error }}".]
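Finally, a hedged end-to-end sketch of driving the replacement flow added above. Assumptions: the app runs locally, the credentials match .env, ip_account_mapping already maps the target IP to an account defined in config/accounts.yaml, and `requests` is installed (it is not in requirements.txt). The `ip_to_replace` form field and the response keys (`terminated_instance_id`, `new_instance_id`, `new_ip`, `terminated_network_out_mb`) are taken from app.py and aws_service.py in this patch.

```python
# Sketch, not part of the patch. Real runs terminate and create EC2 instances,
# so point this at a test account first (see the README notes above).
import requests

BASE = "http://localhost:5000"  # assumed local dev server

with requests.Session() as s:
    s.post(f"{BASE}/login", data={"username": "admin", "password": "change-me"})

    # /replace_ip resolves the account from ip_account_mapping, clones the source
    # instance's spec, launches a replacement and recycles its public IP until it
    # is no longer in the ip_operations blacklist (up to IP_RETRY_LIMIT times).
    resp = s.post(f"{BASE}/replace_ip", data={"ip_to_replace": "54.12.34.56"})
    result = resp.json()
    if resp.ok:
        print("terminated:", result.get("terminated_instance_id"))
        print("new instance:", result["new_instance_id"], "new ip:", result["new_ip"])
        print("old instance NetworkOut (MB, ~30 days):", result.get("terminated_network_out_mb"))
    else:
        print("error:", result.get("error"))
```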