"""Query and creation helpers for ``Job`` and ``JobItem`` records."""

from typing import List, Optional
from uuid import uuid4

from fastapi import HTTPException, status
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload

from backend.modules.jobs.models import (
    Job,
    JobItem,
    JobItemAction,
    JobItemResourceType,
    JobItemStatus,
    JobStatus,
    JobType,
)


async def list_jobs(
    session: AsyncSession,
    customer_id: int | None,
    offset: int,
    limit: int,
    status_filter: Optional[JobStatus] = None,
    job_type: Optional[JobType] = None,
) -> tuple[List[Job], int]:
    """Return one page of jobs, newest first, plus the total match count.

    Args:
        session: Async database session used to run the queries.
        customer_id: Restrict to jobs whose ``created_for_customer`` equals
            this value; ``None`` applies no customer restriction.
        offset: Number of matching rows to skip.
        limit: Maximum number of rows to return.
        status_filter: Restrict to jobs with this status when given.
        job_type: Restrict to jobs of this type when given.

    Returns:
        A ``(jobs, total)`` tuple where ``total`` counts every row matching
        the filters, ignoring ``offset``/``limit``.
    """
    # Filters are compared against None explicitly so that a falsy but valid
    # value (e.g. customer id 0) still narrows the result set.
    filtered = select(Job)
    if customer_id is not None:
        filtered = filtered.where(Job.created_for_customer == customer_id)
    if status_filter is not None:
        filtered = filtered.where(Job.status == status_filter)
    if job_type is not None:
        filtered = filtered.where(Job.job_type == job_type)

    # Count over the un-ordered statement: ordering is irrelevant for a count
    # and an ORDER BY inside a subquery is rejected by some backends.
    total = await session.scalar(
        select(func.count()).select_from(filtered.subquery())
    )

    # Job.id breaks ties between equal created_at values so that consecutive
    # pages neither repeat nor skip rows.
    page_query = (
        filtered.order_by(Job.created_at.desc(), Job.id.desc())
        .offset(offset)
        .limit(limit)
    )
    jobs = list((await session.scalars(page_query)).all())
    return jobs, total or 0


async def get_job_by_uuid(
    session: AsyncSession, job_uuid: str, customer_id: int | None
) -> Job:
    """Fetch a single job by its ``job_uuid``, enforcing customer ownership.

    Args:
        session: Async database session used to run the query.
        job_uuid: Value matched against ``Job.job_uuid``.
        customer_id: When not ``None``, the job's ``created_for_customer``
            must equal this value.

    Returns:
        The matching ``Job``.

    Raises:
        HTTPException: 404 when no job has the given uuid; 403 when
            ``customer_id`` is given and does not match the job.
    """
    job = await session.scalar(select(Job).where(Job.job_uuid == job_uuid))
    if job is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="Job not found"
        )
    # Explicit None check: a truthiness test would skip the ownership check
    # entirely for a customer id of 0.
    if customer_id is not None and job.created_for_customer != customer_id:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN, detail="Forbidden"
        )
    return job


async def list_job_items(session: AsyncSession, job_id: int) -> List[JobItem]:
    """Return every item of the given job, ordered by ascending item id.

    The ``JobItem.instance`` relationship is loaded with ``selectinload`` so
    it is populated on the returned items.
    """
    items_query = (
        select(JobItem)
        .where(JobItem.job_id == job_id)
        .options(selectinload(JobItem.instance))
        .order_by(JobItem.id.asc())
    )
    return list((await session.scalars(items_query)).all())


async def create_job_with_items(
    session: AsyncSession,
    job_type: JobType,
    created_by_user_id: int | None,
    created_for_customer: int | None,
    payload: dict | None,
    items: list[dict],
) -> Job:
    """Create a pending job together with its items and commit them.

    Args:
        session: Async database session; this function commits it.
        job_type: Type stored on the new job.
        created_by_user_id: Stored on the job as ``created_by_user_id``.
        created_for_customer: Stored on the job as ``created_for_customer``.
        payload: Stored on the job as-is. A truthy ``"job_uuid"`` entry is
            used as the job's uuid; otherwise a random hex uuid is generated.
        items: One dict per job item. Recognised keys: ``resource_type``
            (defaults to ``JobItemResourceType.INSTANCE``), ``resource_id``,
            ``account_id``, ``region``, ``instance_id`` and ``action``
            (defaults to ``JobItemAction.SYNC``).

    Returns:
        The newly created ``Job``, refreshed after the commit.
    """
    requested_uuid = (payload or {}).get("job_uuid")
    job = Job(
        job_uuid=requested_uuid or uuid4().hex,
        job_type=job_type,
        status=JobStatus.PENDING,
        progress=0,
        total_count=len(items),
        created_by_user_id=created_by_user_id,
        created_for_customer=created_for_customer,
        payload=payload,
    )
    session.add(job)
    # Flush so the database assigns job.id before the items reference it.
    await session.flush()

    session.add_all(
        JobItem(
            job_id=job.id,
            resource_type=item.get("resource_type", JobItemResourceType.INSTANCE),
            resource_id=item.get("resource_id"),
            account_id=item.get("account_id"),
            region=item.get("region"),
            instance_id=item.get("instance_id"),
            action=item.get("action", JobItemAction.SYNC),
            status=JobItemStatus.PENDING,
        )
        for item in items
    )
    await session.commit()
    await session.refresh(job)
    return job