198 lines
6.8 KiB
Python
198 lines
6.8 KiB
Python
"""
|
||
EIP客户端模块
|
||
|
||
提供与EIP(Edge IP)服务交互的客户端功能,包括认证、设备管理、网关配置等操作。
|
||
支持自动重试、token自动刷新等机制。
|
||
"""
|
||
import time
|
||
from typing import Any, Dict, List, Optional
|
||
|
||
import httpx
|
||
from pydantic import BaseModel
|
||
from tenacity import retry, stop_after_attempt, wait_exponential
|
||
|
||
from .config import settings
|
||
|
||
|
||
class AuthResponse(BaseModel):
|
||
"""认证响应模型"""
|
||
token: str # 认证令牌
|
||
exp: Optional[int] = None # 过期时间(epoch秒)
|
||
|
||
|
||
class EipClient:
|
||
"""EIP客户端类,用于与EIP服务进行交互"""
|
||
|
||
def __init__(self, base_url: str, username: str, password: str) -> None:
|
||
"""
|
||
初始化EIP客户端
|
||
|
||
Args:
|
||
base_url: EIP服务的基础URL
|
||
username: 用户名
|
||
password: 密码
|
||
"""
|
||
self.base_url = base_url.rstrip("/") # 移除末尾的斜杠
|
||
self.username = username
|
||
self.password = password
|
||
self._token: Optional[str] = None # 当前认证令牌
|
||
self._token_exp: Optional[int] = None # 令牌过期时间
|
||
self._client = httpx.Client(timeout=15.0) # HTTP客户端,设置15秒超时
|
||
|
||
def _is_token_valid(self) -> bool:
|
||
"""
|
||
检查当前令牌是否有效
|
||
|
||
Returns:
|
||
bool: 令牌是否有效(提前60秒刷新)
|
||
"""
|
||
if not self._token or not self._token_exp:
|
||
return False
|
||
# 提前60秒刷新令牌,避免在请求时过期
|
||
return time.time() < (self._token_exp - 60)
|
||
|
||
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=0.5, min=0.5, max=2))
|
||
def authenticate(self) -> AuthResponse:
|
||
"""
|
||
进行用户认证,获取访问令牌
|
||
|
||
支持多种令牌字段格式的兼容性处理,包括:
|
||
- token
|
||
- X-Token
|
||
- access_token
|
||
- 直接返回字符串token的情况
|
||
|
||
Returns:
|
||
AuthResponse: 包含令牌和过期时间的认证响应
|
||
|
||
Raises:
|
||
ValueError: 当响应中缺少令牌时
|
||
"""
|
||
url = f"{self.base_url}/client/auth"
|
||
resp = self._client.post(url, json={"username": self.username, "password": self.password})
|
||
resp.raise_for_status()
|
||
data = resp.json()
|
||
data_token = data.get("data")
|
||
|
||
# 尝试从响应中提取过期时间;如果没有,设置1小时后过期
|
||
exp = data.get("exp") or int(time.time()) + 3600
|
||
|
||
# 尝试多种可能的令牌字段名
|
||
token = data.get("token") or data.get("X-Token") or data.get("access_token")
|
||
if not token:
|
||
# 兼容后端直接返回字符串token的情况
|
||
if isinstance(data_token, str) and data:
|
||
token = data_token
|
||
else:
|
||
raise ValueError("Auth response missing token")
|
||
|
||
# 保存令牌和过期时间
|
||
self._token = token
|
||
self._token_exp = exp
|
||
return AuthResponse(token=token, exp=exp)
|
||
|
||
def _get_headers(self) -> Dict[str, str]:
|
||
"""
|
||
获取包含认证令牌的请求头
|
||
|
||
如果令牌无效或即将过期,会自动重新认证
|
||
|
||
Returns:
|
||
Dict[str, str]: 包含认证令牌和Accept头的字典
|
||
"""
|
||
if not self._is_token_valid():
|
||
self.authenticate()
|
||
assert self._token
|
||
return {"X-Token": self._token, "Accept": "application/json"}
|
||
|
||
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=0.5, min=0.5, max=2))
|
||
def list_cities(self) -> List[Dict[str, Any]]:
|
||
"""
|
||
获取可用城市列表
|
||
|
||
Returns:
|
||
List[Dict[str, Any]]: 城市信息列表
|
||
"""
|
||
url = f"{self.base_url}/edge/city"
|
||
resp = self._client.get(url, headers=self._get_headers())
|
||
resp.raise_for_status()
|
||
data = resp.json()
|
||
# 兼容不同的响应格式:直接返回列表或包含data字段的对象
|
||
return data if isinstance(data, list) else data.get("data", [])
|
||
|
||
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=0.5, min=0.5, max=2))
|
||
def list_devices(self, geo: str, offset: int, num: int) -> List[Dict[str, Any]]:
|
||
"""
|
||
获取指定地理位置的设备列表
|
||
|
||
Args:
|
||
geo: 地理位置标识
|
||
offset: 偏移量(分页)
|
||
num: 返回数量
|
||
|
||
Returns:
|
||
List[Dict[str, Any]]: 设备信息列表
|
||
"""
|
||
url = f"{self.base_url}/edge/device"
|
||
payload = {"geo": geo, "offset": offset, "num": num}
|
||
resp = self._client.post(url, headers=self._get_headers(), json=payload)
|
||
resp.raise_for_status()
|
||
data = resp.json()
|
||
# 兼容不同的响应格式:直接返回列表或包含data字段的对象
|
||
return data if isinstance(data, list) else data.get("data", [])
|
||
|
||
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=0.5, min=0.5, max=2))
|
||
def gateway_config_get(self, macaddr: str) -> Dict[str, Any]:
|
||
"""
|
||
获取网关配置信息
|
||
|
||
Args:
|
||
macaddr: 网关MAC地址
|
||
|
||
Returns:
|
||
Dict[str, Any]: 网关配置信息
|
||
"""
|
||
url = f"{self.base_url}/gateway/config/get"
|
||
resp = self._client.post(url, headers=self._get_headers(), json={"macaddr": macaddr})
|
||
resp.raise_for_status()
|
||
return resp.json()
|
||
|
||
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=0.5, min=0.5, max=2))
|
||
def gateway_config_set(self, macaddr: str, config: Dict[str, Any]) -> Dict[str, Any]:
|
||
"""
|
||
设置网关配置信息
|
||
|
||
Args:
|
||
macaddr: 网关MAC地址
|
||
config: 配置信息字典
|
||
|
||
Returns:
|
||
Dict[str, Any]: 设置结果
|
||
"""
|
||
url = f"{self.base_url}/gateway/config/set"
|
||
payload = {"macaddr": macaddr, "config": config}
|
||
resp = self._client.post(url, headers=self._get_headers(), json=payload)
|
||
resp.raise_for_status()
|
||
return resp.json()
|
||
|
||
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=0.5, min=0.5, max=2))
|
||
def gateway_status(self, macaddr: str) -> Dict[str, Any]:
|
||
"""
|
||
获取网关状态信息
|
||
|
||
Args:
|
||
macaddr: 网关MAC地址
|
||
|
||
Returns:
|
||
Dict[str, Any]: 网关状态信息
|
||
"""
|
||
url = f"{self.base_url}/gateway/status"
|
||
resp = self._client.post(url, headers=self._get_headers(), json={"macaddr": macaddr})
|
||
resp.raise_for_status()
|
||
return resp.json()
|
||
|
||
|
||
# 创建全局单例客户端实例
|
||
client_singleton = EipClient(settings.eip_base_url, settings.eip_username, settings.eip_password)
|
||
|