cloudPlat/backend/models.py
2025-07-10 10:02:51 +08:00

81 lines
3.3 KiB
Python

from database import db
from datetime import datetime
from werkzeug.security import generate_password_hash, check_password_hash
import json
class User(db.Model):
__tablename__ = 'users'
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(80), unique=True, nullable=False)
password_hash = db.Column(db.String(255), nullable=False)
real_name = db.Column(db.String(120), nullable=False)
email = db.Column(db.String(120))
role = db.Column(db.String(20), nullable=False, default='viewer')
permissions = db.Column(db.Text)
status = db.Column(db.String(20), default='active')
created_at = db.Column(db.DateTime, default=datetime.utcnow)
def set_password(self, password):
self.password_hash = generate_password_hash(password)
def check_password(self, password):
return check_password_hash(self.password_hash, password)
def set_permissions(self, permissions_list):
self.permissions = json.dumps(permissions_list)
def get_permissions(self):
return json.loads(self.permissions) if self.permissions else []
class Server(db.Model):
__tablename__ = 'servers'
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(100), nullable=False)
ip = db.Column(db.String(15), nullable=False)
port = db.Column(db.Integer, default=22)
username = db.Column(db.String(50), default='root')
password = db.Column(db.String(255)) # SSH密码
status = db.Column(db.String(20), default='offline')
description = db.Column(db.Text)
created_at = db.Column(db.DateTime, default=datetime.utcnow)
updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
class Script(db.Model):
__tablename__ = 'scripts'
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(100), nullable=False)
type = db.Column(db.String(20), nullable=False)
content = db.Column(db.Text, nullable=False)
description = db.Column(db.Text)
created_by = db.Column(db.Integer, db.ForeignKey('users.id'))
created_at = db.Column(db.DateTime, default=datetime.utcnow)
updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
class ExecuteHistory(db.Model):
__tablename__ = 'execute_history'
id = db.Column(db.Integer, primary_key=True)
script_id = db.Column(db.Integer, db.ForeignKey('scripts.id'))
script_name = db.Column(db.String(100))
server_count = db.Column(db.Integer)
success_count = db.Column(db.Integer, default=0)
fail_count = db.Column(db.Integer, default=0)
status = db.Column(db.String(20), default='running')
executed_by = db.Column(db.Integer, db.ForeignKey('users.id'))
execute_time = db.Column(db.DateTime, default=datetime.utcnow)
class ExecuteResult(db.Model):
__tablename__ = 'execute_results'
id = db.Column(db.Integer, primary_key=True)
history_id = db.Column(db.Integer, db.ForeignKey('execute_history.id'))
server_id = db.Column(db.Integer, db.ForeignKey('servers.id'))
server_name = db.Column(db.String(100))
status = db.Column(db.String(20))
output = db.Column(db.Text)
error = db.Column(db.Text)
duration = db.Column(db.Integer)
execute_time = db.Column(db.DateTime, default=datetime.utcnow)