jdeip/app/eip_client.py
2025-10-21 18:41:07 +08:00

198 lines
6.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
EIP客户端模块
提供与EIPEdge IP服务交互的客户端功能包括认证、设备管理、网关配置等操作。
支持自动重试、token自动刷新等机制。
"""
import time
from typing import Any, Dict, List, Optional
import httpx
from pydantic import BaseModel
from tenacity import retry, stop_after_attempt, wait_exponential
from .config import settings
class AuthResponse(BaseModel):
"""认证响应模型"""
token: str # 认证令牌
exp: Optional[int] = None # 过期时间epoch秒
class EipClient:
"""EIP客户端类用于与EIP服务进行交互"""
def __init__(self, base_url: str, username: str, password: str) -> None:
"""
初始化EIP客户端
Args:
base_url: EIP服务的基础URL
username: 用户名
password: 密码
"""
self.base_url = base_url.rstrip("/") # 移除末尾的斜杠
self.username = username
self.password = password
self._token: Optional[str] = None # 当前认证令牌
self._token_exp: Optional[int] = None # 令牌过期时间
self._client = httpx.Client(timeout=15.0) # HTTP客户端设置15秒超时
def _is_token_valid(self) -> bool:
"""
检查当前令牌是否有效
Returns:
bool: 令牌是否有效提前60秒刷新
"""
if not self._token or not self._token_exp:
return False
# 提前60秒刷新令牌避免在请求时过期
return time.time() < (self._token_exp - 60)
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=0.5, min=0.5, max=2))
def authenticate(self) -> AuthResponse:
"""
进行用户认证,获取访问令牌
支持多种令牌字段格式的兼容性处理,包括:
- token
- X-Token
- access_token
- 直接返回字符串token的情况
Returns:
AuthResponse: 包含令牌和过期时间的认证响应
Raises:
ValueError: 当响应中缺少令牌时
"""
url = f"{self.base_url}/client/auth"
resp = self._client.post(url, json={"username": self.username, "password": self.password})
resp.raise_for_status()
data = resp.json()
data_token = data.get("data")
# 尝试从响应中提取过期时间如果没有设置1小时后过期
exp = data.get("exp") or int(time.time()) + 3600
# 尝试多种可能的令牌字段名
token = data.get("token") or data.get("X-Token") or data.get("access_token")
if not token:
# 兼容后端直接返回字符串token的情况
if isinstance(data_token, str) and data:
token = data_token
else:
raise ValueError("Auth response missing token")
# 保存令牌和过期时间
self._token = token
self._token_exp = exp
return AuthResponse(token=token, exp=exp)
def _get_headers(self) -> Dict[str, str]:
"""
获取包含认证令牌的请求头
如果令牌无效或即将过期,会自动重新认证
Returns:
Dict[str, str]: 包含认证令牌和Accept头的字典
"""
if not self._is_token_valid():
self.authenticate()
assert self._token
return {"X-Token": self._token, "Accept": "application/json"}
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=0.5, min=0.5, max=2))
def list_cities(self) -> List[Dict[str, Any]]:
"""
获取可用城市列表
Returns:
List[Dict[str, Any]]: 城市信息列表
"""
url = f"{self.base_url}/edge/city"
resp = self._client.get(url, headers=self._get_headers())
resp.raise_for_status()
data = resp.json()
# 兼容不同的响应格式直接返回列表或包含data字段的对象
return data if isinstance(data, list) else data.get("data", [])
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=0.5, min=0.5, max=2))
def list_devices(self, geo: str, offset: int, num: int) -> List[Dict[str, Any]]:
"""
获取指定地理位置的设备列表
Args:
geo: 地理位置标识
offset: 偏移量(分页)
num: 返回数量
Returns:
List[Dict[str, Any]]: 设备信息列表
"""
url = f"{self.base_url}/edge/device"
payload = {"geo": geo, "offset": offset, "num": num}
resp = self._client.post(url, headers=self._get_headers(), json=payload)
resp.raise_for_status()
data = resp.json()
# 兼容不同的响应格式直接返回列表或包含data字段的对象
return data if isinstance(data, list) else data.get("data", [])
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=0.5, min=0.5, max=2))
def gateway_config_get(self, macaddr: str) -> Dict[str, Any]:
"""
获取网关配置信息
Args:
macaddr: 网关MAC地址
Returns:
Dict[str, Any]: 网关配置信息
"""
url = f"{self.base_url}/gateway/config/get"
resp = self._client.post(url, headers=self._get_headers(), json={"macaddr": macaddr})
resp.raise_for_status()
return resp.json()
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=0.5, min=0.5, max=2))
def gateway_config_set(self, macaddr: str, config: Dict[str, Any]) -> Dict[str, Any]:
"""
设置网关配置信息
Args:
macaddr: 网关MAC地址
config: 配置信息字典
Returns:
Dict[str, Any]: 设置结果
"""
url = f"{self.base_url}/gateway/config/set"
payload = {"macaddr": macaddr, "config": config}
resp = self._client.post(url, headers=self._get_headers(), json=payload)
resp.raise_for_status()
return resp.json()
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=0.5, min=0.5, max=2))
def gateway_status(self, macaddr: str) -> Dict[str, Any]:
"""
获取网关状态信息
Args:
macaddr: 网关MAC地址
Returns:
Dict[str, Any]: 网关状态信息
"""
url = f"{self.base_url}/gateway/status"
resp = self._client.post(url, headers=self._get_headers(), json={"macaddr": macaddr})
resp.raise_for_status()
return resp.json()
# 创建全局单例客户端实例
client_singleton = EipClient(settings.eip_base_url, settings.eip_username, settings.eip_password)