完成or地址自动转换

This commit is contained in:
wangqifan 2025-08-10 11:31:55 +08:00
parent 81f7e205e1
commit eda6d4ad66
7 changed files with 241 additions and 0 deletions

4
.gitignore vendored Normal file
View File

@ -0,0 +1,4 @@
__pycache__/*.pyc
chrome_user_data/*
modules/__pycache__/*.pyc
data/o-address.csv

91
browser_manager.py Normal file
View File

@ -0,0 +1,91 @@
# browser_manager.py
import subprocess
import time
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager
from config import CHROME_DEBUGGER_PORT, USER_DATA_DIR
def get_chrome_driver(use_debugger=False, url=None):
"""
获取 Chrome WebDriver 实例
:param use_debugger: 是否使用 debugger 模式连接到现有浏览器
:param url: 初始加载的 URL
:return: WebDriver 实例
"""
chrome_options = Options()
if use_debugger:
# 连接到已经启动的 Chrome 浏览器
debugger_address = f"127.0.0.1:{CHROME_DEBUGGER_PORT}"
chrome_options.add_experimental_option("debuggerAddress", debugger_address)
print(f"连接到现有的 Chrome 实例: {debugger_address}")
else:
# 启动一个新的 Chrome 浏览器
# 为了后续可以复用,也指定 user-data-dir
chrome_options.add_argument(f"--user-data-dir={USER_DATA_DIR}")
chrome_options.add_argument("--start-maximized")
chrome_options.add_argument("--disable-infobars")
chrome_options.add_argument("--disable-extensions")
chrome_options.add_argument("--disable-popup-blocking")
print("启动一个新的 Chrome 实例。")
try:
service = Service(ChromeDriverManager().install())
driver = webdriver.Chrome(service=service, options=chrome_options)
if url and not use_debugger:
driver.get(url)
return driver
except Exception as e:
print(f"启动浏览器时出错: {e}")
if not use_debugger:
print("尝试以调试模式启动一个新的 Chrome 实例...")
# 如果正常启动失败,可以尝试手动启动一个调试模式的浏览器
start_chrome_in_debug_mode()
time.sleep(5) # 等待浏览器启动
return get_chrome_driver(use_debugger=True)
return None
def start_chrome_in_debug_mode():
"""
在后台启动一个带有远程调试端口的 Chrome 浏览器
"""
try:
command = (
f'start chrome.exe --remote-debugging-port={CHROME_DEBUGGER_PORT} '
f'--user-data-dir="{USER_DATA_DIR}"'
)
subprocess.Popen(command, shell=True)
print(f"已启动 Chrome 调试模式,端口: {CHROME_DEBUGGER_PORT}")
except FileNotFoundError:
print("错误: 'chrome.exe' 未找到。请确保 Chrome 已安装并已添加到系统路径中。")
except Exception as e:
print(f"启动调试模式的 Chrome 时出错: {e}")
if __name__ == '__main__':
# 这是一个简单的使用示例
# 示例1: 启动一个新的浏览器并访问百度
# print("--- 示例1: 启动新浏览器 ---")
# driver1 = get_chrome_driver(url="https://www.baidu.com")
# if driver1:
# time.sleep(5)
# print("页面标题:", driver1.title)
# driver1.quit()
# 示例2: 首先手动启动一个调试模式的浏览器,然后连接它
print("\n--- 示例2: 连接到调试模式的浏览器 ---")
print(f"请先手动执行以下命令启动Chrome: \nchrome.exe --remote-debugging-port={CHROME_DEBUGGER_PORT} --user-data-dir=\"{USER_DATA_DIR}\"")
input("按 Enter 键继续...")
driver2 = get_chrome_driver(use_debugger=True)
if driver2:
print("成功连接到浏览器!")
print("当前页面标题:", driver2.title)
# 注意:通过 debugger 连接时,不要用 quit() 关闭浏览器,否则会关闭整个浏览器
# driver2.close() # close() 只会关闭当前标签页
else:
print("连接浏览器失败。")

7
config.py Normal file
View File

@ -0,0 +1,7 @@
# config.py
# 用于 Chrome debugger 模式的端口
CHROME_DEBUGGER_PORT = 9222
# Chrome 用户数据目录
USER_DATA_DIR = "chrome_user_data"

View File

@ -0,0 +1,3 @@
address,main_address
0x01354beccc5053b36ff0167df51090d4d735a0b2,
0x01354beccc5053b36ff0167df51090d4d735a0b2,
1 address main_address
2 0x01354beccc5053b36ff0167df51090d4d735a0b2
3 0x01354beccc5053b36ff0167df51090d4d735a0b2

68
main.py Normal file
View File

@ -0,0 +1,68 @@
# main.py
import argparse
from browser_manager import get_chrome_driver
from modules.or_address_handler import process_addresses
# 模块映射
MODULES = {
"or-address": process_addresses,
# 在这里可以添加更多的模块
# "another-module": another_handler_function,
}
def main():
"""
主函数用于解析参数并执行相应的自动化任务
"""
# 1. 设置命令行参数解析
parser = argparse.ArgumentParser(description="Selenium 自动化脚本启动器")
parser.add_argument(
"--module",
type=str,
required=True,
choices=MODULES.keys(),
help="要执行的业务模块名称。"
)
parser.add_argument(
"--debug",
action="store_true",
help="使用此参数连接到已存在的 Chrome 调试实例。"
)
args = parser.parse_args()
# 2. 根据参数选择浏览器启动方式
use_debugger = args.debug
driver = get_chrome_driver(use_debugger=use_debugger)
if not driver:
print("无法获取 WebDriver程序退出。")
return
try:
# 3. 根据参数调用相应的业务模块
module_name = args.module
handler_function = MODULES.get(module_name)
if handler_function:
print(f"--- 开始执行模块: {module_name} ---")
handler_function(driver)
print(f"--- 模块: {module_name} 执行完毕 ---")
else:
# 这段代码理论上不会执行,因为 argparse 的 choices 已经做了限制
print(f"错误: 未知的模块 '{module_name}'")
except Exception as e:
print(f"执行自动化任务时发生严重错误: {e}")
finally:
# 4. 任务结束后的清理工作
print("所有任务执行完毕。")
# 如果不是连接的调试模式,则关闭浏览器
if not use_debugger:
print("关闭浏览器...")
driver.quit()
else:
print("任务完成,保持调试浏览器开启。")
if __name__ == "__main__":
main()

View File

@ -0,0 +1,65 @@
# modules/or_address_handler.py
import pandas as pd
import os
import time
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
def process_addresses(driver):
"""
处理 o-address.csv 文件访问 Etherscan 并更新地址信息
:param driver: Selenium WebDriver 实例
"""
# 定义文件路径
csv_file_path = os.path.join('data', 'o-address.csv')
# 1. 读取 CSV 文件
if not os.path.exists(csv_file_path):
print(f"错误: 文件未找到 at {csv_file_path}")
return
df = pd.read_csv(csv_file_path)
print(f"成功读取 {len(df)} 条地址。")
# 2. 循环处理每个地址
for index, row in df.iterrows():
address = row['address']
# 检查 main_address 是否已经有值,如果有则跳过
# 使用 pd.isna() 检查 NaN并检查是否为空字符串
if 'main_address' in df.columns and pd.notna(row['main_address']) and row['main_address']:
print(f"地址 {address} 已有 main_address跳过。")
continue
# 构建 URL 并访问
url = f"https://etherscan.io/address/{address}"
print(f"正在访问: {url}")
driver.get(url)
try:
# 等待页面加载并找到 #mainaddress 元素
wait = WebDriverWait(driver, 20) # 最多等待20秒
main_address_element = wait.until(
EC.presence_of_element_located((By.ID, "mainaddress"))
)
# 获取并保存值
main_address_value = main_address_element.text
df.loc[index, 'main_address'] = main_address_value
print(f" -> 成功获取 main_address: {main_address_value}")
except Exception as e:
print(f" -> 无法为地址 {address} 找到 main_address。")
df.loc[index, 'main_address'] = "Not Found" # 标记为未找到
# 每次处理后都保存一次,防止中途中断
df.to_csv(csv_file_path, index=False)
# 短暂延时,避免请求过于频繁
time.sleep(2)
# 3. 所有处理完成后,再次保存最终结果
df.to_csv(csv_file_path, index=False)
print(f"\n处理完成!结果已保存到 {csv_file_path}")

3
requirements.txt Normal file
View File

@ -0,0 +1,3 @@
selenium==4.34.2
webdriver-manager==4.0.2
pandas==2.3.1