diff --git a/.env b/.env new file mode 100644 index 0000000..37730a4 --- /dev/null +++ b/.env @@ -0,0 +1,4 @@ +FLASK_ENV=development +SECRET_KEY=change-this-secret +DATABASE_URL=mysql+pymysql://api_schedule:22CKSd22NFekncDc@dify.pinnovatecloud.com:3306/api_schedule +APP_TIMEZONE=UTC diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..68f3d0b --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +app/__pycache__/*.pyc +*.pyc diff --git a/app/__init__.py b/app/__init__.py new file mode 100644 index 0000000..04a1a23 --- /dev/null +++ b/app/__init__.py @@ -0,0 +1,108 @@ +import logging +import os +from flask import Flask, redirect, url_for +from flask_migrate import Migrate + +from app.config import DevelopmentConfig, ProductionConfig +from app.extensions import db, login_manager, scheduler +from app.services.scheduler import SchedulerService + + +def create_app() -> Flask: + """ + Application factory creating the Flask app, loading config, registering blueprints, + initializing extensions, and booting the scheduler. + """ + app = Flask(__name__) + + config_name = os.getenv("FLASK_ENV", "development").lower() + if config_name == "production": + app_config = ProductionConfig() + else: + app_config = DevelopmentConfig() + app.config.from_object(app_config) + + configure_logging(app) + register_extensions(app) + register_blueprints(app) + register_template_filters(app) + enable_scheduler = app.config.get("ENABLE_SCHEDULER", True) and os.getenv("FLASK_SKIP_SCHEDULER") != "1" + if enable_scheduler: + init_scheduler(app) + else: + app.logger.info("Scheduler not started (ENABLE_SCHEDULER=%s, FLASK_SKIP_SCHEDULER=%s)", + app.config.get("ENABLE_SCHEDULER", True), os.getenv("FLASK_SKIP_SCHEDULER")) + + @app.route("/") + def index(): + return redirect(url_for("apis.list_apis")) + + return app + + +def configure_logging(app: Flask) -> None: + log_level = logging.DEBUG if app.debug else logging.INFO + logging.basicConfig(level=log_level, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s") + + +def register_extensions(app: Flask) -> None: + db.init_app(app) + login_manager.init_app(app) + Migrate(app, db) + scheduler.init_app(app) + + +def register_blueprints(app: Flask) -> None: + from app.views.auth import auth_bp + from app.views.apis import apis_bp + from app.views.logs import logs_bp + + app.register_blueprint(auth_bp) + app.register_blueprint(apis_bp) + app.register_blueprint(logs_bp) + + +def register_template_filters(app: Flask) -> None: + @app.template_filter("cron_human") + def cron_human(expr: str) -> str: + """ + 将常见的 5 字段 cron 表达式转换为简单中文描述,不能完全覆盖所有情况。 + """ + parts = expr.strip().split() + if len(parts) != 5: + return expr + minute, hour, day, month, dow = parts + + # 每 N 分钟 + if minute.startswith("*/") and hour == "*" and day == "*" and month == "*" and dow == "*": + return f"每 {minute[2:]} 分钟" + + # 整点或固定时间 + if minute.isdigit() and hour.isdigit() and day == "*" and month == "*" and dow in ("*", "?"): + return f"每天 {hour.zfill(2)}:{minute.zfill(2)}" + + # 每 N 小时的整点 + if minute == "0" and hour.startswith("*/") and day == "*" and month == "*" and dow in ("*", "?"): + return f"每 {hour[2:]} 小时整点" + + # 每月某日 + if minute.isdigit() and hour.isdigit() and day.isdigit() and month == "*" and dow in ("*", "?"): + return f"每月 {day} 日 {hour.zfill(2)}:{minute.zfill(2)}" + + # 每周某天 + weekday_map = {"0": "周日", "1": "周一", "2": "周二", "3": "周三", "4": "周四", "5": "周五", "6": "周六", "7": "周日"} + if minute.isdigit() and hour.isdigit() and day in ("*", "?") and month == "*" and dow not in ("*", "?"): + label = weekday_map.get(dow, f"周{dow}") + return f"每{label} {hour.zfill(2)}:{minute.zfill(2)}" + + return expr + + +def init_scheduler(app: Flask) -> None: + """ + Start APScheduler and load enabled jobs from database. + """ + scheduler.start() + with app.app_context(): + service = SchedulerService(scheduler) + service.load_enabled_jobs() diff --git a/app/config.py b/app/config.py new file mode 100644 index 0000000..47f9e6c --- /dev/null +++ b/app/config.py @@ -0,0 +1,30 @@ +import os +from datetime import timedelta + + +class BaseConfig: + SECRET_KEY = os.getenv("SECRET_KEY", "dev-secret-key-change-me") + SQLALCHEMY_TRACK_MODIFICATIONS = False + SQLALCHEMY_DATABASE_URI = os.getenv( + "DATABASE_URL", + "mysql+pymysql://user:password@localhost:3306/api_scheduler", + ) + SQLALCHEMY_ENGINE_OPTIONS = { + # 防止长时间空闲导致连接断开 + "pool_pre_ping": True, + "pool_recycle": 300, + "pool_timeout": 30, + } + REMEMBER_COOKIE_DURATION = timedelta(days=7) + SCHEDULER_API_ENABLED = False + # 默认采用中国标准时间,可通过环境变量 APP_TIMEZONE 覆盖 + SCHEDULER_TIMEZONE = os.getenv("APP_TIMEZONE", "Asia/Shanghai") + ENABLE_SCHEDULER = True + + +class DevelopmentConfig(BaseConfig): + DEBUG = True + + +class ProductionConfig(BaseConfig): + DEBUG = False diff --git a/app/extensions.py b/app/extensions.py new file mode 100644 index 0000000..32f87be --- /dev/null +++ b/app/extensions.py @@ -0,0 +1,11 @@ +from flask_apscheduler import APScheduler +from flask_login import LoginManager +from flask_sqlalchemy import SQLAlchemy + +db = SQLAlchemy() +login_manager = LoginManager() +login_manager.login_view = "auth.login" +login_manager.login_message = "请先登录后再访问该页面" + +# 通过 Flask-APScheduler 集成 APScheduler,方便在应用上下文中管理后台任务 +scheduler = APScheduler() diff --git a/app/forms.py b/app/forms.py new file mode 100644 index 0000000..c5979c0 --- /dev/null +++ b/app/forms.py @@ -0,0 +1,65 @@ +from datetime import datetime +from typing import Optional + +from flask_wtf import FlaskForm +from wtforms import ( + BooleanField, + IntegerField, + PasswordField, + SelectField, + StringField, + SubmitField, + TextAreaField, +) +from wtforms.validators import DataRequired, Length, NumberRange, Optional as OptionalValidator + + +class LoginForm(FlaskForm): + username = StringField("用户名", validators=[DataRequired(), Length(max=64)]) + password = PasswordField("密码", validators=[DataRequired()]) + submit = SubmitField("登录") + + +class ApiConfigForm(FlaskForm): + name = StringField("名称", validators=[DataRequired(), Length(max=128)]) + description = TextAreaField("描述", validators=[OptionalValidator()]) + url = StringField("URL 地址", validators=[DataRequired(), Length(max=512)]) + http_method = SelectField( + "HTTP 方法", + choices=[("GET", "GET"), ("POST", "POST"), ("PUT", "PUT"), ("DELETE", "DELETE")], + validators=[DataRequired()], + ) + headers = TextAreaField("请求头(JSON)", validators=[OptionalValidator()]) + query_params = TextAreaField("查询参数(JSON)", validators=[OptionalValidator()]) + body = TextAreaField("请求体(JSON 或文本)", validators=[OptionalValidator()]) + schedule_type = SelectField( + "调度类型", + choices=[("cron", "cron"), ("interval", "interval"), ("daily", "daily")], + validators=[DataRequired()], + ) + schedule_expression = StringField("调度表达式", validators=[DataRequired(), Length(max=128)]) + timeout_seconds = IntegerField("超时时间(秒)", validators=[DataRequired(), NumberRange(min=1, max=600)]) + retry_times = IntegerField("重试次数", validators=[DataRequired(), NumberRange(min=0, max=10)]) + retry_interval_seconds = IntegerField( + "重试间隔(秒)", validators=[DataRequired(), NumberRange(min=1, max=300)] + ) + enabled = BooleanField("启用", default=True) + submit = SubmitField("保存") + + +class LogFilterForm(FlaskForm): + api_id = SelectField("API", coerce=int, validators=[OptionalValidator()]) + success = SelectField( + "成功状态", choices=[("", "全部"), ("1", "成功"), ("0", "失败")], validators=[OptionalValidator()] + ) + start_date = StringField("开始日期(YYYY-MM-DD)", validators=[OptionalValidator()]) + end_date = StringField("结束日期(YYYY-MM-DD)", validators=[OptionalValidator()]) + submit = SubmitField("筛选") + + def parse_date(self, value: str) -> Optional[datetime]: + if not value: + return None + try: + return datetime.strptime(value, "%Y-%m-%d") + except ValueError: + return None diff --git a/app/models.py b/app/models.py new file mode 100644 index 0000000..1c3bc9b --- /dev/null +++ b/app/models.py @@ -0,0 +1,82 @@ +from datetime import datetime +from typing import Any + +from flask_login import UserMixin +from werkzeug.security import check_password_hash, generate_password_hash + +from app.extensions import db, login_manager + + +class TimestampMixin: + created_at = db.Column(db.DateTime, default=datetime.utcnow) + updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) + + +class User(UserMixin, TimestampMixin, db.Model): + __tablename__ = "users" + + id = db.Column(db.Integer, primary_key=True) + username = db.Column(db.String(64), unique=True, nullable=False) + password_hash = db.Column(db.String(255), nullable=False) + email = db.Column(db.String(128)) + role = db.Column(db.String(32)) + is_active = db.Column(db.Boolean, default=True) + + def set_password(self, password: str) -> None: + self.password_hash = generate_password_hash(password) + + def check_password(self, password: str) -> bool: + return check_password_hash(self.password_hash, password) + + @property + def is_authenticated(self) -> bool: # type: ignore[override] + return True + + def get_id(self) -> str: # type: ignore[override] + return str(self.id) + + +@login_manager.user_loader +def load_user(user_id: str) -> Any: + return User.query.get(int(user_id)) + + +class ApiConfig(TimestampMixin, db.Model): + __tablename__ = "api_configs" + + id = db.Column(db.Integer, primary_key=True) + name = db.Column(db.String(128), nullable=False) + description = db.Column(db.Text) + url = db.Column(db.String(512), nullable=False) + http_method = db.Column(db.String(10), nullable=False, default="GET") + headers = db.Column(db.Text) + query_params = db.Column(db.Text) + body = db.Column(db.Text) + enabled = db.Column(db.Boolean, default=True) + schedule_type = db.Column(db.String(32), nullable=False, default="cron") + schedule_expression = db.Column(db.String(128), nullable=False, default="0 10 * * *") + timeout_seconds = db.Column(db.Integer, default=30) + retry_times = db.Column(db.Integer, default=0) + retry_interval_seconds = db.Column(db.Integer, default=3) + created_by = db.Column(db.Integer, db.ForeignKey("users.id")) + + creator = db.relationship("User", backref="api_configs") + + def job_id(self) -> str: + return f"api_job_{self.id}" + + +class ApiCallLog(TimestampMixin, db.Model): + __tablename__ = "api_call_logs" + + id = db.Column(db.Integer, primary_key=True) + api_id = db.Column(db.Integer, db.ForeignKey("api_configs.id"), nullable=False) + request_time = db.Column(db.DateTime, default=datetime.utcnow, nullable=False) + response_time = db.Column(db.DateTime) + success = db.Column(db.Boolean, default=False) + http_status_code = db.Column(db.Integer) + error_message = db.Column(db.Text) + response_body = db.Column(db.Text) + duration_ms = db.Column(db.Integer) + + api = db.relationship("ApiConfig", backref="call_logs") diff --git a/app/services/api_executor.py b/app/services/api_executor.py new file mode 100644 index 0000000..9c62b5b --- /dev/null +++ b/app/services/api_executor.py @@ -0,0 +1,105 @@ +import json +import logging +import time +from datetime import datetime +from typing import Any, Dict, Optional + +import requests +from requests import Response + +from app.extensions import db +from flask import current_app + +from app.models import ApiCallLog, ApiConfig + +logger = logging.getLogger(__name__) + + +def _parse_json_field(raw: Optional[str]) -> Optional[Dict[str, Any]]: + if not raw: + return None + try: + return json.loads(raw) + except json.JSONDecodeError: + return None + + +def execute_api(api_config: ApiConfig) -> None: + """ + Execute an API call based on ApiConfig, apply retry logic, and persist ApiCallLog. + Only the final attempt is logged to avoid noise. + """ + request_time = datetime.utcnow() + headers = _parse_json_field(api_config.headers) or {} + params = _parse_json_field(api_config.query_params) or {} + data = api_config.body + try: + json_body = json.loads(api_config.body) if api_config.body else None + except json.JSONDecodeError: + json_body = None + + retries = max(api_config.retry_times, 0) + delay = max(api_config.retry_interval_seconds, 1) + timeout = max(api_config.timeout_seconds, 1) + attempt = 0 + last_response: Optional[Response] = None + last_error: Optional[str] = None + start_ts = time.time() + + while attempt <= retries: + try: + attempt += 1 + last_response = requests.request( + method=api_config.http_method, + url=api_config.url, + headers=headers, + params=params, + data=None if json_body is not None else data, + json=json_body, + timeout=timeout, + ) + if 200 <= last_response.status_code < 300: + last_error = None + break + last_error = f"Non-2xx status: {last_response.status_code}" + except requests.RequestException as exc: + last_response = None + last_error = str(exc) + if attempt <= retries: + time.sleep(delay) + + duration_ms = int((time.time() - start_ts) * 1000) + success = last_error is None + + log_entry = ApiCallLog( + api_id=api_config.id, + request_time=request_time, + response_time=datetime.utcnow(), + success=success, + http_status_code=last_response.status_code if last_response else None, + error_message=last_error, + response_body=(last_response.text[:2000] if last_response and last_response.text else None), + duration_ms=duration_ms, + ) + db.session.add(log_entry) + try: + db.session.commit() + except Exception: + logger.exception("Failed to persist ApiCallLog for api_id=%s", api_config.id) + db.session.rollback() + + if not success: + logger.warning("API call failed for api_id=%s error=%s", api_config.id, last_error) + + +def execute_api_by_id(api_id: int, app=None) -> None: + """ + Load ApiConfig by id and execute within app context (for scheduler threads). + """ + app_obj = app or current_app._get_current_object() + with app_obj.app_context(): + config = ApiConfig.query.get(api_id) + if not config: + logger.error("ApiConfig not found for id=%s", api_id) + return + execute_api(config) diff --git a/app/services/scheduler.py b/app/services/scheduler.py new file mode 100644 index 0000000..1b590d3 --- /dev/null +++ b/app/services/scheduler.py @@ -0,0 +1,94 @@ +import logging +from typing import Optional + +from apscheduler.triggers.cron import CronTrigger +from apscheduler.triggers.interval import IntervalTrigger +from flask import current_app +from flask_apscheduler import APScheduler + +from app.models import ApiConfig +from flask import current_app + +from app.services.api_executor import execute_api_by_id + +logger = logging.getLogger(__name__) + + +class SchedulerService: + """ + Thin wrapper around APScheduler for adding/removing API jobs. + """ + + def __init__(self, scheduler: APScheduler) -> None: + self.scheduler = scheduler + + def load_enabled_jobs(self) -> None: + try: + enabled_configs = ApiConfig.query.filter_by(enabled=True).all() + except Exception: + logger.warning("Could not load jobs; database might not be initialized yet.") + return + for config in enabled_configs: + self.add_job_for_api(config) + logger.info("Scheduler loaded %s enabled jobs", len(enabled_configs)) + + def add_job_for_api(self, api_config: ApiConfig) -> None: + job_id = api_config.job_id() + trigger = self._build_trigger(api_config) + if not trigger: + logger.error("Unsupported schedule type for api_id=%s", api_config.id) + return + self.remove_job_for_api(api_config.id) + app_obj = current_app._get_current_object() + self.scheduler.add_job( + func=execute_api_by_id, + trigger=trigger, + args=[api_config.id, app_obj], + id=job_id, + replace_existing=True, + max_instances=1, + misfire_grace_time=current_app.config.get("SCHEDULER_MISFIRE_GRACE_TIME", 60), + ) + logger.info("Registered job %s for api_id=%s", job_id, api_config.id) + + def remove_job_for_api(self, api_id: int) -> None: + job_id = f"api_job_{api_id}" + try: + self.scheduler.remove_job(job_id) + logger.info("Removed job %s", job_id) + except Exception: + # 如果任务不存在则静默忽略 + pass + + def reschedule_job_for_api(self, api_config: ApiConfig) -> None: + self.add_job_for_api(api_config) + + def _build_trigger(self, api_config: ApiConfig): + schedule_type = api_config.schedule_type + expr = api_config.schedule_expression + if schedule_type == "cron": + try: + fields = expr.split() + if len(fields) == 5: + minute, hour, day, month, day_of_week = fields + return CronTrigger(minute=minute, hour=hour, day=day, month=month, day_of_week=day_of_week) + return CronTrigger.from_crontab(expr) + except Exception as exc: + logger.error("Invalid cron expression for api_id=%s error=%s", api_config.id, exc) + return None + if schedule_type == "interval": + try: + seconds = int(expr) + return IntervalTrigger(seconds=seconds) + except ValueError: + logger.error("Invalid interval seconds for api_id=%s", api_config.id) + return None + if schedule_type == "daily": + # 期待格式为 "HH:MM" + try: + hour, minute = expr.split(":") + return CronTrigger(hour=int(hour), minute=int(minute)) + except Exception as exc: + logger.error("Invalid daily expression for api_id=%s error=%s", api_config.id, exc) + return None + return None diff --git a/app/templates/apis/form.html b/app/templates/apis/form.html new file mode 100644 index 0000000..a353859 --- /dev/null +++ b/app/templates/apis/form.html @@ -0,0 +1,69 @@ +{% extends "base.html" %} +{% block title %}{{ '编辑' if api else '新建' }} API{% endblock %} +{% block content %} +
| 名称 | +URL | +调度 | +启用 | +操作 | +
|---|---|---|---|---|
| {{ api.name }} | +{{ api.url }} | +
+ {{ api.schedule_type }}: {{ api.schedule_expression }}
+ {% if api.schedule_type == 'cron' %}
+ {{ api.schedule_expression|cron_human }}
+ {% endif %}
+ |
+ + {% if api.enabled %} + 已启用 + {% else %} + 已停用 + {% endif %} + | ++ 编辑 + + + + | +
| 暂无 API 配置。 | ||||
请求时间: {{ log.request_time }}
+响应时间: {{ log.response_time }}
+是否成功: {{ log.success }}
+HTTP 状态码: {{ log.http_status_code or '-' }}
+耗时 (ms): {{ log.duration_ms or '-' }}
+错误信息: {{ log.error_message or '-' }}
+响应内容:
+{{ log.response_body or '-' }}
+ | API | +请求时间 | +状态 | +HTTP | +耗时(ms) | +错误 | ++ |
|---|---|---|---|---|---|---|
| {{ log.api.name }} | +{{ log.request_time }} | ++ {% if log.success %} + 成功 + {% else %} + 失败 + {% endif %} + | +{{ log.http_status_code or '-' }} | +{{ log.duration_ms or '-' }} | +{{ log.error_message or '' }} | +详情 | +
| 暂无日志。 | ||||||