import os from typing import Dict from functools import wraps from dotenv import load_dotenv from flask import Flask, jsonify, render_template, request, redirect, url_for, session from aws_service import ( AWSOperationError, ConfigError, AccountConfig, load_account_configs, replace_instance_ip, ) from db import ( add_replacement_history, get_account_by_ip, get_replacement_history, get_history_by_ip_or_group, get_history_chains, get_server_spec, init_db, load_disallowed_ips, update_ip_account_mapping, upsert_server_spec, ) load_dotenv() app = Flask(__name__) app.secret_key = os.getenv("SECRET_KEY", "please-change-me") APP_USER = os.getenv("APP_USER", "") APP_PASSWORD = os.getenv("APP_PASSWORD", "") def login_required(fn): @wraps(fn) def wrapper(*args, **kwargs): if not session.get("authed"): return redirect(url_for("login", next=request.path)) return fn(*args, **kwargs) return wrapper def load_configs() -> Dict[str, AccountConfig]: config_path = os.getenv("AWS_CONFIG_PATH", "config/accounts.yaml") return load_account_configs(config_path) try: account_configs = load_configs() init_error = "" except ConfigError as exc: account_configs = {} init_error = str(exc) retry_limit = int(os.getenv("IP_RETRY_LIMIT", "5")) try: init_db() db_error = "" except Exception as exc: # noqa: BLE001 - surface DB connection issues to UI db_error = f"数据库初始化失败: {exc}" @app.route("/", methods=["GET"]) @login_required def index(): if init_error or db_error: return render_template("index.html", accounts=[], init_error=init_error or db_error) return render_template("index.html", accounts=account_configs.values(), init_error="") @app.route("/login", methods=["GET", "POST"]) def login(): if session.get("authed"): return redirect(url_for("index")) error = "" next_url = request.args.get("next", "/") if request.method == "POST": username = request.form.get("username", "").strip() password = request.form.get("password", "").strip() if username == APP_USER and password == APP_PASSWORD: session["authed"] = True return redirect(next_url or url_for("index")) error = "用户名或密码错误" return render_template("login.html", error=error, next_url=next_url) @app.route("/logout", methods=["POST"]) def logout(): session.clear() return redirect(url_for("login")) @app.route("/replace_ip", methods=["POST"]) @login_required def replace_ip(): if init_error or db_error: return jsonify({"error": init_error or db_error}), 500 ip_to_replace = request.form.get("ip_to_replace", "").strip() if not ip_to_replace: return jsonify({"error": "请输入要替换的IP"}), 400 account_name = get_account_by_ip(ip_to_replace) if not account_name: return jsonify({"error": "数据库中未找到该IP对应的账户映射"}), 400 if account_name not in account_configs: return jsonify({"error": f"账户 {account_name} 未在配置文件中定义"}), 400 disallowed = load_disallowed_ips() fallback_spec = get_server_spec(ip_to_replace) account = account_configs[account_name] try: result = replace_instance_ip( ip_to_replace, account, disallowed, retry_limit, fallback_spec=fallback_spec ) spec_used = result.get("spec_used", {}) if isinstance(result, dict) else {} # 记录当前 IP 的规格(输入 IP、数据库规格、或从 AWS 读到的规格) upsert_server_spec( ip_address=ip_to_replace, account_name=account_name, instance_type=spec_used.get("instance_type"), instance_name=spec_used.get("instance_name"), volume_type=spec_used.get("root_volume_type"), security_group_names=spec_used.get("security_group_names", []), security_group_ids=spec_used.get("security_group_ids", []), region=spec_used.get("region"), subnet_id=spec_used.get("subnet_id"), availability_zone=spec_used.get("availability_zone"), ) # 新 IP 同步规格 upsert_server_spec( ip_address=result["new_ip"], account_name=account_name, instance_type=spec_used.get("instance_type"), instance_name=spec_used.get("instance_name"), volume_type=spec_used.get("root_volume_type"), security_group_names=spec_used.get("security_group_names", []), security_group_ids=spec_used.get("security_group_ids", []), region=spec_used.get("region"), subnet_id=spec_used.get("subnet_id"), availability_zone=spec_used.get("availability_zone"), ) update_ip_account_mapping(ip_to_replace, result["new_ip"], account_name) add_replacement_history( ip_to_replace, result["new_ip"], account_name, None, terminated_network_out_mb=result.get("terminated_network_out_mb"), ) except (AWSOperationError, ValueError) as exc: return jsonify({"error": str(exc)}), 400 return jsonify(result), 200 @app.route("/history", methods=["GET"]) @login_required def history(): try: records = get_replacement_history(limit=100) except Exception as exc: # noqa: BLE001 return jsonify({"error": f"读取历史失败: {exc}"}), 500 return jsonify({"items": records}) @app.route("/history/search", methods=["GET"]) @login_required def history_search(): ip = request.args.get("ip", "").strip() or None group_id = request.args.get("group", "").strip() or None try: records = get_history_by_ip_or_group(ip, group_id, limit=200) except Exception as exc: # noqa: BLE001 return jsonify({"error": f"读取历史失败: {exc}"}), 500 return jsonify({"items": records}) @app.route("/history_page", methods=["GET"]) @login_required def history_page(): return render_template("history.html") @app.route("/history/chains", methods=["GET"]) @login_required def history_chains(): ip = request.args.get("ip", "").strip() or None group_id = request.args.get("group", "").strip() or None try: records = get_history_chains(ip=ip, group_id=group_id, limit=500) except Exception as exc: # noqa: BLE001 return jsonify({"error": f"读取历史失败: {exc}"}), 500 return jsonify({"items": records}) if __name__ == "__main__": app.run(host="0.0.0.0", port=5000, debug=True)