apiScheduler/app/services/api_executor.py
2025-11-28 18:43:11 +08:00

109 lines
3.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import logging
import time
from datetime import datetime
from typing import Any, Dict, Optional
import requests
from requests import Response
from app.extensions import db
from flask import current_app
from app.models import ApiCallLog, ApiConfig
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
def _parse_json_field(raw: Optional[str]) -> Optional[Dict[str, Any]]:
if not raw:
return None
try:
return json.loads(raw)
except json.JSONDecodeError:
return None
def execute_api(api_config: ApiConfig) -> None:
    """
    Call the configured API with retries, persisting one ApiCallLog per attempt.

    The request is built from ``api_config``: HTTP method, URL, JSON-encoded
    headers and query params, and body. When the body decodes as JSON to a
    non-null value it is sent through ``json=``; otherwise the raw body is
    sent through ``data=``.

    A 2xx status counts as success and ends the loop. Any other status, or a
    ``requests.RequestException``, counts as a failure; failures are retried
    until ``retry_times + 1`` attempts have been made, sleeping between
    attempts. Every attempt is written to the database as its own ApiCallLog
    row so the full history is kept.

    Args:
        api_config: Configuration describing the request and the retry policy
            (``retry_times``, ``retry_interval_seconds``, ``timeout_seconds``).

    Returns:
        None. Results are recorded as ApiCallLog rows; if the last attempt
        failed, a warning is also logged.
    """
    # NOTE(review): request_time is captured once, so every retry's log row
    # carries the time of the first attempt — confirm this is intended.
    request_time = datetime.utcnow()
    headers = _parse_json_field(api_config.headers) or {}
    params = _parse_json_field(api_config.query_params) or {}
    data = api_config.body
    try:
        json_body = json.loads(api_config.body) if api_config.body else None
    except json.JSONDecodeError:
        json_body = None

    # Clamp the policy values: never a negative retry count, and at least
    # one second for both the retry delay and the request timeout.
    retries = max(api_config.retry_times, 0)
    delay = max(api_config.retry_interval_seconds, 1)
    timeout = max(api_config.timeout_seconds, 1)
    total_attempts = retries + 1
    last_error: Optional[str] = None

    for attempt in range(1, total_attempts + 1):
        last_response: Optional[Response] = None
        start_ts = time.time()
        try:
            last_response = requests.request(
                method=api_config.http_method,
                url=api_config.url,
                headers=headers,
                params=params,
                # Send the raw body only when it could not be used as JSON.
                data=None if json_body is not None else data,
                json=json_body,
                timeout=timeout,
            )
            if 200 <= last_response.status_code < 300:
                last_error = None
            else:
                body_snippet = (last_response.text or "")[:200]
                last_error = f"Non-2xx status: {last_response.status_code}, body: {body_snippet}"
        except requests.RequestException as exc:
            last_error = str(exc)

        duration_ms = int((time.time() - start_ts) * 1000)
        success = last_error is None

        # A requests.Response is falsy for 4xx/5xx statuses (its truth value
        # is Response.ok), so test identity against None. A plain truthiness
        # test would drop the status code and body of failed calls.
        has_response = last_response is not None
        log_entry = ApiCallLog(
            api_id=api_config.id,
            request_time=request_time,
            response_time=datetime.utcnow(),
            success=success,
            http_status_code=last_response.status_code if has_response else None,
            error_message=(f"[第{attempt}次尝试/{total_attempts}] {last_error}" if last_error else None),
            response_body=(last_response.text[:2000] if has_response and last_response.text else None),
            duration_ms=duration_ms,
        )
        db.session.add(log_entry)
        try:
            db.session.commit()
        except Exception:
            # Best effort: a failed log write must not abort the retry loop.
            logger.exception("Failed to persist ApiCallLog for api_id=%s", api_config.id)
            db.session.rollback()

        if success:
            break
        if attempt < total_attempts:
            time.sleep(delay)

    if last_error:
        logger.warning("API call failed for api_id=%s error=%s", api_config.id, last_error)
def execute_api_by_id(api_id: int, app=None) -> None:
    """
    Look up an ApiConfig by primary key and run it inside an app context.

    Intended for scheduler threads, which run without an active application
    context.

    Args:
        api_id: Primary key of the ApiConfig to execute.
        app: Application object to push a context for. When omitted (or
            falsy), the object behind ``current_app`` is used instead.

    Returns:
        None. If no ApiConfig matches, an error is logged and nothing runs.
    """
    if app:
        target_app = app
    else:
        # Unwrap the proxy so the real application object is used.
        target_app = current_app._get_current_object()

    with target_app.app_context():
        api_config = ApiConfig.query.get(api_id)
        if api_config:
            execute_api(api_config)
        else:
            logger.error("ApiConfig not found for id=%s", api_id)