apiScheduler/app/services/api_executor.py
2025-11-28 17:39:54 +08:00

106 lines
3.2 KiB
Python

import json
import logging
import time
from datetime import datetime
from typing import Any, Dict, Optional
import requests
from requests import Response
from app.extensions import db
from flask import current_app
from app.models import ApiCallLog, ApiConfig
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
def _parse_json_field(raw: Optional[str]) -> Optional[Dict[str, Any]]:
    """
    Decode a JSON-encoded object stored as text.

    Args:
        raw: JSON text, or None / empty string when nothing is stored.

    Returns:
        The decoded dict, or None when ``raw`` is empty, is not valid JSON,
        is not something ``json.loads`` accepts, or decodes to a value that
        is not a JSON object (list, string, number, ...).
    """
    if not raw:
        return None
    try:
        parsed = json.loads(raw)
    except (json.JSONDecodeError, TypeError):
        # TypeError: raw is not str/bytes/bytearray; treat like bad JSON.
        return None
    # Callers use the result as a mapping (headers / query params), so a
    # valid-but-non-object JSON document is treated the same as "no value".
    if not isinstance(parsed, dict):
        return None
    return parsed
def execute_api(api_config: ApiConfig) -> None:
    """
    Execute the HTTP call described by ``api_config`` and persist one ApiCallLog.

    The request is attempted up to ``retry_times + 1`` times, sleeping
    ``retry_interval_seconds`` (at least 1 second) between attempts. An attempt
    is successful only when the response status is 2xx; any other status or a
    ``requests.RequestException`` counts as a failure and triggers a retry.
    Only the outcome of the final attempt is logged to avoid noise.

    Args:
        api_config: Provides id, url, http_method, headers and query_params
            (JSON text), body, retry_times, retry_interval_seconds and
            timeout_seconds.

    Returns:
        None. The outcome is written as an ApiCallLog row; if committing that
        row fails, the error is logged and the session is rolled back instead
        of raising.
    """
    request_time = datetime.utcnow()
    headers = _parse_json_field(api_config.headers) or {}
    params = _parse_json_field(api_config.query_params) or {}

    # A body that parses as JSON is sent through ``json=``; anything else is
    # sent verbatim through ``data=``.
    raw_body = api_config.body
    try:
        json_body = json.loads(raw_body) if raw_body else None
    except json.JSONDecodeError:
        json_body = None

    retries = max(api_config.retry_times, 0)
    delay = max(api_config.retry_interval_seconds, 1)
    timeout = max(api_config.timeout_seconds, 1)
    total_attempts = retries + 1

    last_response: Optional[Response] = None
    last_error: Optional[str] = None
    # Monotonic clock: the measured duration is unaffected by wall-clock jumps.
    start_ts = time.monotonic()

    for attempt in range(1, total_attempts + 1):
        try:
            last_response = requests.request(
                method=api_config.http_method,
                url=api_config.url,
                headers=headers,
                params=params,
                data=None if json_body is not None else raw_body,
                json=json_body,
                timeout=timeout,
            )
        except requests.RequestException as exc:
            last_response = None
            last_error = str(exc)
        else:
            if 200 <= last_response.status_code < 300:
                last_error = None
                break
            last_error = f"Non-2xx status: {last_response.status_code}"
        # Wait only when another attempt will follow.
        if attempt < total_attempts:
            time.sleep(delay)

    duration_ms = int((time.monotonic() - start_ts) * 1000)
    success = last_error is None

    # Compare against None explicitly: a requests.Response is falsy for
    # 4xx/5xx statuses, so a plain truthiness test would discard the status
    # code and body of exactly the failed calls we want recorded.
    has_response = last_response is not None
    log_entry = ApiCallLog(
        api_id=api_config.id,
        request_time=request_time,
        response_time=datetime.utcnow(),
        success=success,
        http_status_code=last_response.status_code if has_response else None,
        error_message=last_error,
        # Stored body is capped at 2000 characters.
        response_body=(
            last_response.text[:2000]
            if has_response and last_response.text
            else None
        ),
        duration_ms=duration_ms,
    )
    db.session.add(log_entry)
    try:
        db.session.commit()
    except Exception:
        logger.exception("Failed to persist ApiCallLog for api_id=%s", api_config.id)
        db.session.rollback()

    if not success:
        logger.warning("API call failed for api_id=%s error=%s", api_config.id, last_error)
def execute_api_by_id(api_id: int, app=None) -> None:
    """
    Look up an ApiConfig by primary key and run it inside an app context.

    Intended for scheduler threads: when ``app`` is not supplied, the
    application behind ``current_app`` is used instead.

    Args:
        api_id: Identifier of the ApiConfig to execute.
        app: Optional application object whose context should be pushed.
    """
    if app:
        flask_app = app
    else:
        flask_app = current_app._get_current_object()

    with flask_app.app_context():
        api_config = ApiConfig.query.get(api_id)
        if api_config:
            execute_api(api_config)
            return
        # Nothing to execute; record the missing id and stop.
        logger.error("ApiConfig not found for id=%s", api_id)