74 lines
1.9 KiB
JavaScript
74 lines
1.9 KiB
JavaScript
import { Cron } from "croner";
|
|
import { createError } from "h3";
|
|
import { isTest } from "std-env";
|
|
import { scheduledTasks, tasks } from "#nitro-internal-virtual/tasks";
|
|
export function defineTask(def) {
|
|
if (typeof def.run !== "function") {
|
|
def.run = () => {
|
|
throw new TypeError("Task must implement a `run` method!");
|
|
};
|
|
}
|
|
return def;
|
|
}
|
|
const __runningTasks__ = {};
|
|
export async function runTask(name, {
|
|
payload = {},
|
|
context = {}
|
|
} = {}) {
|
|
if (__runningTasks__[name]) {
|
|
return __runningTasks__[name];
|
|
}
|
|
if (!(name in tasks)) {
|
|
throw createError({
|
|
message: `Task \`${name}\` is not available!`,
|
|
statusCode: 404
|
|
});
|
|
}
|
|
if (!tasks[name].resolve) {
|
|
throw createError({
|
|
message: `Task \`${name}\` is not implemented!`,
|
|
statusCode: 501
|
|
});
|
|
}
|
|
const handler = await tasks[name].resolve();
|
|
const taskEvent = { name, payload, context };
|
|
__runningTasks__[name] = handler.run(taskEvent);
|
|
try {
|
|
const res = await __runningTasks__[name];
|
|
return res;
|
|
} finally {
|
|
delete __runningTasks__[name];
|
|
}
|
|
}
|
|
export function startScheduleRunner() {
|
|
if (!scheduledTasks || scheduledTasks.length === 0 || isTest) {
|
|
return;
|
|
}
|
|
const payload = {
|
|
scheduledTime: Date.now()
|
|
};
|
|
for (const schedule of scheduledTasks) {
|
|
const cron = new Cron(schedule.cron, async () => {
|
|
await Promise.all(
|
|
schedule.tasks.map(
|
|
(name) => runTask(name, {
|
|
payload,
|
|
context: {}
|
|
}).catch((error) => {
|
|
console.error(
|
|
`Error while running scheduled task "${name}"`,
|
|
error
|
|
);
|
|
})
|
|
)
|
|
);
|
|
});
|
|
}
|
|
}
|
|
export function getCronTasks(cron) {
|
|
return (scheduledTasks || []).find((task) => task.cron === cron)?.tasks || [];
|
|
}
|
|
export function runCronTasks(cron, ctx) {
|
|
return Promise.all(getCronTasks(cron).map((name) => runTask(name, ctx)));
|
|
}
|