import os from typing import Dict from functools import wraps from dotenv import load_dotenv from flask import Flask, jsonify, render_template, request, redirect, url_for, session from aws_service import ( AWSOperationError, AccountConfig, replace_instance_ip, ) from db import ( add_replacement_history, delete_aws_account, get_account_by_ip, get_replacement_history, get_history_by_ip_or_group, get_history_chains, get_server_spec, init_db, load_disallowed_ips, list_aws_accounts, list_account_mappings, update_ip_account_mapping, upsert_server_spec, upsert_account_mapping, delete_account_mapping, upsert_aws_account, ) load_dotenv() app = Flask(__name__) app.secret_key = os.getenv("SECRET_KEY", "please-change-me") APP_USER = os.getenv("APP_USER", "") APP_PASSWORD = os.getenv("APP_PASSWORD", "") ZHIYUN_PASS = os.getenv("ZHIYUN_PASS", "") def login_required(fn): @wraps(fn) def wrapper(*args, **kwargs): if not session.get("authed"): return redirect(url_for("login", next=request.path)) return fn(*args, **kwargs) return wrapper def zhiyun_required(fn): @wraps(fn) def wrapper(*args, **kwargs): if not session.get("zhiyun_authed"): return jsonify({"error": "未通过校验密码"}), 403 return fn(*args, **kwargs) return wrapper def admin_required(fn): @wraps(fn) def wrapper(*args, **kwargs): if not session.get("admin_authed"): return jsonify({"error": "未通过管理员校验"}), 403 return fn(*args, **kwargs) return wrapper def load_account_configs_from_db() -> tuple[Dict[str, AccountConfig], str]: try: accounts = list_aws_accounts() except Exception as exc: # noqa: BLE001 - surface DB errors to UI return {}, f"读取 AWS 账户配置失败: {exc}" if not accounts: return {}, "数据库中未找到任何 AWS 账户配置" configs: Dict[str, AccountConfig] = {} for item in accounts: configs[item["name"]] = AccountConfig( name=item["name"], region=item["region"], access_key_id=item["access_key_id"], secret_access_key=item["secret_access_key"], ami_id=item["ami_id"], subnet_id=item.get("subnet_id"), security_group_ids=item.get("security_group_ids", []), key_name=item.get("key_name"), ) return configs, "" retry_limit = int(os.getenv("IP_RETRY_LIMIT", "5")) try: init_db() db_error = "" except Exception as exc: # noqa: BLE001 - surface DB connection issues to UI db_error = f"数据库初始化失败: {exc}" @app.route("/", methods=["GET"]) @login_required def index(): if db_error: return render_template("index.html", accounts=[], init_error=db_error) accounts, config_error = load_account_configs_from_db() error_msg = db_error or config_error return render_template("index.html", accounts=accounts.values(), init_error=error_msg) @app.route("/login", methods=["GET", "POST"]) def login(): if session.get("authed"): return redirect(url_for("index")) error = "" next_url = request.args.get("next", "/") if request.method == "POST": username = request.form.get("username", "").strip() password = request.form.get("password", "").strip() if username == APP_USER and password == APP_PASSWORD: session["authed"] = True return redirect(next_url or url_for("index")) error = "用户名或密码错误" return render_template("login.html", error=error, next_url=next_url) @app.route("/logout", methods=["POST"]) def logout(): session.clear() return redirect(url_for("login")) @app.route("/mapping/auth", methods=["POST"]) @login_required def mapping_auth(): pwd = request.form.get("password", "") if pwd and pwd == ZHIYUN_PASS: session["zhiyun_authed"] = True session["admin_authed"] = True # 复用同一口令 return jsonify({"ok": True}) session.pop("zhiyun_authed", None) session.pop("admin_authed", None) return jsonify({"error": "密码错误"}), 403 @app.route("/mapping_page", methods=["GET"]) @login_required def mapping_page(): return render_template("mapping.html") @app.route("/mapping/list", methods=["GET"]) @login_required @zhiyun_required def mapping_list(): try: data = list_account_mappings() except Exception as exc: # noqa: BLE001 return jsonify({"error": f"读取失败: {exc}"}), 500 return jsonify({"items": data}) @app.route("/mapping/upsert", methods=["POST"]) @login_required @zhiyun_required def mapping_upsert(): ip = request.form.get("ip", "").strip() account = request.form.get("account", "").strip() if not ip or not account: return jsonify({"error": "IP 和账户名不能为空"}), 400 try: upsert_account_mapping(ip, account) except Exception as exc: # noqa: BLE001 return jsonify({"error": f"保存失败: {exc}"}), 500 return jsonify({"ok": True}) @app.route("/mapping/delete", methods=["POST"]) @login_required @zhiyun_required def mapping_delete(): ip = request.form.get("ip", "").strip() if not ip: return jsonify({"error": "IP 不能为空"}), 400 try: delete_account_mapping(ip) except Exception as exc: # noqa: BLE001 return jsonify({"error": f"删除失败: {exc}"}), 500 return jsonify({"ok": True}) @app.route("/replace_ip", methods=["POST"]) @login_required def replace_ip(): if db_error: return jsonify({"error": db_error}), 500 account_configs, config_error = load_account_configs_from_db() if config_error: return jsonify({"error": config_error}), 500 ip_to_replace = request.form.get("ip_to_replace", "").strip() if not ip_to_replace: return jsonify({"error": "请输入要替换的IP"}), 400 account_name = get_account_by_ip(ip_to_replace) if not account_name: return jsonify({"error": "数据库中未找到该IP对应的账户映射"}), 400 if account_name not in account_configs: return jsonify({"error": f"账户 {account_name} 未在数据库中配置"}), 400 disallowed = load_disallowed_ips() fallback_spec = get_server_spec(ip_to_replace) account = account_configs[account_name] try: result = replace_instance_ip( ip_to_replace, account, disallowed, retry_limit, fallback_spec=fallback_spec ) spec_used = result.get("spec_used", {}) if isinstance(result, dict) else {} # 记录当前 IP 的规格(输入 IP、数据库规格、或从 AWS 读到的规格) upsert_server_spec( ip_address=ip_to_replace, account_name=account_name, instance_type=spec_used.get("instance_type"), instance_name=spec_used.get("instance_name"), volume_type=spec_used.get("root_volume_type"), security_group_names=spec_used.get("security_group_names", []), security_group_ids=spec_used.get("security_group_ids", []), region=spec_used.get("region"), subnet_id=spec_used.get("subnet_id"), availability_zone=spec_used.get("availability_zone"), ) # 新 IP 同步规格 upsert_server_spec( ip_address=result["new_ip"], account_name=account_name, instance_type=spec_used.get("instance_type"), instance_name=spec_used.get("instance_name"), volume_type=spec_used.get("root_volume_type"), security_group_names=spec_used.get("security_group_names", []), security_group_ids=spec_used.get("security_group_ids", []), region=spec_used.get("region"), subnet_id=spec_used.get("subnet_id"), availability_zone=spec_used.get("availability_zone"), ) update_ip_account_mapping(ip_to_replace, result["new_ip"], account_name) add_replacement_history( ip_to_replace, result["new_ip"], account_name, None, terminated_network_out_mb=result.get("terminated_network_out_mb"), ) except (AWSOperationError, ValueError) as exc: return jsonify({"error": str(exc)}), 400 return jsonify(result), 200 @app.route("/history", methods=["GET"]) @login_required def history(): try: records = get_replacement_history(limit=100) except Exception as exc: # noqa: BLE001 return jsonify({"error": f"读取历史失败: {exc}"}), 500 return jsonify({"items": records}) @app.route("/history/search", methods=["GET"]) @login_required def history_search(): ip = request.args.get("ip", "").strip() or None group_id = request.args.get("group", "").strip() or None try: records = get_history_by_ip_or_group(ip, group_id, limit=200) except Exception as exc: # noqa: BLE001 return jsonify({"error": f"读取历史失败: {exc}"}), 500 return jsonify({"items": records}) @app.route("/history_page", methods=["GET"]) @login_required def history_page(): return render_template("history.html") @app.route("/history/chains", methods=["GET"]) @login_required def history_chains(): ip = request.args.get("ip", "").strip() or None group_id = request.args.get("group", "").strip() or None try: records = get_history_chains(ip=ip, group_id=group_id, limit=500) except Exception as exc: # noqa: BLE001 return jsonify({"error": f"读取历史失败: {exc}"}), 500 return jsonify({"items": records}) @app.route("/admin", methods=["GET"]) @login_required def admin_page(): return render_template("admin.html") @app.route("/admin/auth", methods=["POST"]) @login_required def admin_auth(): pwd = request.form.get("password", "") if pwd and pwd == ZHIYUN_PASS: session["admin_authed"] = True session["zhiyun_authed"] = True # 复用映射接口的校验 return jsonify({"ok": True}) session.pop("admin_authed", None) session.pop("zhiyun_authed", None) return jsonify({"error": "密码错误"}), 403 @app.route("/admin/aws/list", methods=["GET"]) @login_required @admin_required def admin_aws_list(): try: items = list_aws_accounts() except Exception as exc: # noqa: BLE001 return jsonify({"error": f"读取失败: {exc}"}), 500 # 避免前端泄露敏感字段,可自行删减;这里仍返回全部以便编辑 return jsonify({"items": items}) @app.route("/admin/aws/upsert", methods=["POST"]) @login_required @admin_required def admin_aws_upsert(): name = request.form.get("name", "").strip() region = request.form.get("region", "").strip() access_key_id = request.form.get("access_key_id", "").strip() secret_access_key = request.form.get("secret_access_key", "").strip() ami_id = request.form.get("ami_id", "").strip() subnet_id = request.form.get("subnet_id", "").strip() or None sg_raw = request.form.get("security_group_ids", "").strip() key_name = request.form.get("key_name", "").strip() or None if not (name and region and access_key_id and secret_access_key and ami_id): return jsonify({"error": "name/region/access_key_id/secret_access_key/ami_id 不能为空"}), 400 security_group_ids = [s.strip() for s in sg_raw.split(",") if s.strip()] if sg_raw else [] try: upsert_aws_account( name=name, region=region, access_key_id=access_key_id, secret_access_key=secret_access_key, ami_id=ami_id, subnet_id=subnet_id, security_group_ids=security_group_ids, key_name=key_name, ) except Exception as exc: # noqa: BLE001 return jsonify({"error": f"保存失败: {exc}"}), 500 return jsonify({"ok": True}) @app.route("/admin/aws/delete", methods=["POST"]) @login_required @admin_required def admin_aws_delete(): name = request.form.get("name", "").strip() if not name: return jsonify({"error": "name 不能为空"}), 400 try: delete_aws_account(name) except Exception as exc: # noqa: BLE001 return jsonify({"error": f"删除失败: {exc}"}), 500 return jsonify({"ok": True}) if __name__ == "__main__": app.run(host="0.0.0.0", port=5000, debug=True)