2025-12-10 12:02:17 +08:00

88 lines
3.0 KiB
Python

from typing import List, Optional
from fastapi import HTTPException, status
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from backend.modules.jobs.models import Job, JobItem, JobItemAction, JobItemResourceType, JobItemStatus, JobStatus, JobType
from uuid import uuid4
async def list_jobs(
session: AsyncSession,
customer_id: int | None,
offset: int,
limit: int,
status_filter: Optional[JobStatus] = None,
job_type: Optional[JobType] = None,
) -> tuple[List[Job], int]:
query = select(Job).order_by(Job.created_at.desc())
if customer_id:
query = query.where(Job.created_for_customer == customer_id)
if status_filter:
query = query.where(Job.status == status_filter)
if job_type:
query = query.where(Job.job_type == job_type)
total = await session.scalar(select(func.count()).select_from(query.subquery()))
jobs = (await session.scalars(query.offset(offset).limit(limit))).all()
return jobs, total or 0
async def get_job_by_uuid(session: AsyncSession, job_uuid: str, customer_id: int | None) -> Job:
job = await session.scalar(select(Job).where(Job.job_uuid == job_uuid))
if not job:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Job not found")
if customer_id and job.created_for_customer != customer_id:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Forbidden")
return job
async def list_job_items(session: AsyncSession, job_id: int) -> List[JobItem]:
return (
await session.scalars(
select(JobItem)
.where(JobItem.job_id == job_id)
.options(selectinload(JobItem.instance))
.order_by(JobItem.id.asc())
)
).all()
async def create_job_with_items(
session: AsyncSession,
job_type: JobType,
created_by_user_id: int | None,
created_for_customer: int | None,
payload: dict | None,
items: list[dict],
) -> Job:
job = Job(
job_uuid=payload.get("job_uuid") if payload and payload.get("job_uuid") else uuid4().hex,
job_type=job_type,
status=JobStatus.PENDING,
progress=0,
total_count=len(items),
created_by_user_id=created_by_user_id,
created_for_customer=created_for_customer,
payload=payload,
)
session.add(job)
await session.flush()
for item in items:
session.add(
JobItem(
job_id=job.id,
resource_type=item.get("resource_type", JobItemResourceType.INSTANCE),
resource_id=item.get("resource_id"),
account_id=item.get("account_id"),
region=item.get("region"),
instance_id=item.get("instance_id"),
action=item.get("action", JobItemAction.SYNC),
status=JobItemStatus.PENDING,
)
)
await session.commit()
await session.refresh(job)
return job