You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
271 lines
7.9 KiB
271 lines
7.9 KiB
import { Cron } from "croner";
|
|
import log4js from "logger";
|
|
import { ExecutorPool } from "./executor-pool";
|
|
import { getTaskHandler, hasTask } from "./registry";
|
|
import {
|
|
listTasks,
|
|
getTaskById,
|
|
createExecutionLog,
|
|
updateExecutionLog,
|
|
} from "../service/scheduler";
|
|
|
|
const logger = log4js.getLogger("SCHEDULER");
|
|
|
|
interface CronJobEntry {
|
|
taskId: string;
|
|
cron: Cron;
|
|
}
|
|
|
|
interface TaskRow {
|
|
id: string;
|
|
name: string;
|
|
cronExpression: string;
|
|
type: string;
|
|
functionName?: string | null;
|
|
functionPayload?: string | null;
|
|
httpMethod?: string | null;
|
|
httpUrl?: string | null;
|
|
httpHeaders?: string | null;
|
|
httpBody?: string | null;
|
|
catchUp: number;
|
|
enabled: number;
|
|
maxRetries: number;
|
|
retryDelaySeconds: number;
|
|
timeoutSeconds: number;
|
|
}
|
|
|
|
let pool: ExecutorPool;
|
|
|
|
export function getExecutorPool(): ExecutorPool {
|
|
return pool;
|
|
}
|
|
|
|
const jobs = new Map<string, CronJobEntry>();
|
|
|
|
function parsePayload(payload?: string | null): Record<string, unknown> | undefined {
|
|
if (!payload) return undefined;
|
|
try {
|
|
return JSON.parse(payload);
|
|
} catch {
|
|
return undefined;
|
|
}
|
|
}
|
|
|
|
async function executeFunctionTask(task: TaskRow): Promise<void> {
|
|
if (!task.functionName) {
|
|
logger.error("[task:%s] function type but no functionName", task.id);
|
|
return;
|
|
}
|
|
|
|
if (!hasTask(task.functionName)) {
|
|
logger.error("[task:%s] function '%s' not registered", task.id, task.functionName);
|
|
return;
|
|
}
|
|
|
|
const handler = getTaskHandler(task.functionName)!;
|
|
const payload = parsePayload(task.functionPayload);
|
|
|
|
const executeWithRetries = async (): Promise<{ success: boolean; message: string }> => {
|
|
let lastError: Error | undefined;
|
|
for (let attempt = 0; attempt <= task.maxRetries; attempt++) {
|
|
try {
|
|
if (attempt > 0) {
|
|
logger.info(
|
|
"[task:%s] retry %d/%d after %ds",
|
|
task.id,
|
|
attempt,
|
|
task.maxRetries,
|
|
task.retryDelaySeconds
|
|
);
|
|
await new Promise((resolve) => {
|
|
const signal = AbortSignal.timeout(task.retryDelaySeconds * 1000);
|
|
signal.addEventListener("abort", resolve, { once: true });
|
|
});
|
|
}
|
|
return await handler(payload);
|
|
} catch (err) {
|
|
lastError = err instanceof Error ? err : new Error(String(err));
|
|
logger.error("[task:%s] attempt %d failed: %s", task.id, attempt, lastError.message);
|
|
}
|
|
}
|
|
throw lastError ?? new Error("max retries exceeded");
|
|
};
|
|
|
|
const logId = await createExecutionLog({ taskId: task.id, status: "running" });
|
|
|
|
try {
|
|
const timeoutSignal = AbortSignal.timeout(task.timeoutSeconds * 1000);
|
|
|
|
const result = await new Promise<{ success: boolean; message: string }>(
|
|
(resolve, reject) => {
|
|
timeoutSignal.addEventListener("abort", () => {
|
|
reject(new Error(`Task timed out after ${task.timeoutSeconds}s`));
|
|
}, { once: true });
|
|
|
|
executeWithRetries().then(resolve).catch(reject);
|
|
}
|
|
);
|
|
|
|
await updateExecutionLog(logId, {
|
|
status: "success",
|
|
resultSummary: result.message,
|
|
});
|
|
logger.info("[task:%s] completed: %s", task.id, result.message);
|
|
} catch (err) {
|
|
const message = err instanceof Error ? err.message : String(err);
|
|
await updateExecutionLog(logId, { status: "failed", errorMessage: message });
|
|
logger.error("[task:%s] failed: %s", task.id, message);
|
|
}
|
|
}
|
|
|
|
async function executeHttpTask(task: TaskRow): Promise<void> {
|
|
if (!task.httpUrl) {
|
|
logger.error("[task:%s] http type but no httpUrl", task.id);
|
|
return;
|
|
}
|
|
|
|
const logId = await createExecutionLog({ taskId: task.id, status: "running" });
|
|
|
|
try {
|
|
const headers: Record<string, string> = task.httpHeaders
|
|
? JSON.parse(task.httpHeaders)
|
|
: {};
|
|
|
|
const timeoutSignal = AbortSignal.timeout(task.timeoutSeconds * 1000);
|
|
|
|
const response = await fetch(task.httpUrl, {
|
|
method: (task.httpMethod ?? "GET").toUpperCase(),
|
|
headers: { "Content-Type": "application/json", ...headers },
|
|
body: task.httpBody ?? undefined,
|
|
signal: timeoutSignal,
|
|
});
|
|
|
|
const summary = `HTTP ${response.status} ${response.statusText}`;
|
|
if (!response.ok) {
|
|
await updateExecutionLog(logId, { status: "failed", errorMessage: summary, resultSummary: summary });
|
|
logger.error("[task:%s] %s", task.id, summary);
|
|
} else {
|
|
await updateExecutionLog(logId, { status: "success", resultSummary: summary });
|
|
logger.info("[task:%s] %s", task.id, summary);
|
|
}
|
|
} catch (err) {
|
|
const message = err instanceof Error ? err.message : String(err);
|
|
await updateExecutionLog(logId, { status: "failed", errorMessage: message });
|
|
logger.error("[task:%s] HTTP request failed: %s", task.id, message);
|
|
}
|
|
}
|
|
|
|
async function executeTask(task: TaskRow): Promise<void> {
|
|
logger.info("[task:%s] executing '%s'", task.id, task.name);
|
|
if (task.type === "function") {
|
|
await executeFunctionTask(task);
|
|
} else if (task.type === "http") {
|
|
await executeHttpTask(task);
|
|
} else {
|
|
logger.error("[task:%s] unknown type: %s", task.id, task.type);
|
|
}
|
|
}
|
|
|
|
function scheduleTask(task: TaskRow): void {
|
|
const cron = new Cron(task.cronExpression, async () => {
|
|
await pool.execute(() => executeTask(task));
|
|
});
|
|
jobs.set(task.id, { taskId: task.id, cron });
|
|
logger.info(
|
|
"[task:%s] scheduled '%s' cron='%s' next=%s",
|
|
task.id,
|
|
task.name,
|
|
task.cronExpression,
|
|
cron.nextRun()?.toISOString() ?? "none"
|
|
);
|
|
}
|
|
|
|
function unscheduleTask(taskId: string): void {
|
|
const entry = jobs.get(taskId);
|
|
if (entry) {
|
|
entry.cron.stop();
|
|
jobs.delete(taskId);
|
|
logger.info("[task:%s] unscheduled", taskId);
|
|
}
|
|
}
|
|
|
|
// ---- Public API ----
|
|
|
|
export async function start(maxConcurrency: number, logRetentionDays: number): Promise<void> {
|
|
pool = new ExecutorPool(maxConcurrency);
|
|
logger.info("Scheduler starting (max concurrency: %d)", maxConcurrency);
|
|
|
|
// Mark stale running tasks and clean old logs
|
|
const { markStaleRunningTasks, cleanupOldLogs } = await import("../service/scheduler");
|
|
await markStaleRunningTasks();
|
|
await cleanupOldLogs(logRetentionDays);
|
|
|
|
// Load all enabled tasks
|
|
const { list: allTasks } = await listTasks({ pageSize: 9999 });
|
|
|
|
for (const task of allTasks) {
|
|
if (!task.enabled) continue;
|
|
try {
|
|
scheduleTask(task as TaskRow);
|
|
|
|
// Catch-up: if catchUp=1 and the task missed its last scheduled run, execute once
|
|
if (task.catchUp) {
|
|
try {
|
|
const checkCron = new Cron(task.cronExpression);
|
|
const previous = checkCron.previousRun()?.getTime();
|
|
if (previous && previous > 0 && previous < Date.now()) {
|
|
logger.info(
|
|
"[task:%s] catch-up: missed run at %s, executing now",
|
|
task.id,
|
|
new Date(previous).toISOString()
|
|
);
|
|
pool.execute(() => executeTask(task as TaskRow));
|
|
}
|
|
} catch (err) {
|
|
logger.warn("[task:%s] catch-up check failed: %s", task.id, String(err));
|
|
}
|
|
}
|
|
} catch (err) {
|
|
const message = err instanceof Error ? err.message : String(err);
|
|
logger.error("[task:%s] failed to schedule: %s", task.id, message);
|
|
}
|
|
}
|
|
|
|
logger.info("Scheduler started with %d active job(s)", jobs.size);
|
|
}
|
|
|
|
export async function stop(): Promise<void> {
|
|
for (const [id] of jobs) {
|
|
unscheduleTask(id);
|
|
}
|
|
logger.info("Scheduler stopped");
|
|
}
|
|
|
|
export function addTask(taskId: string): void {
|
|
unscheduleTask(taskId);
|
|
getTaskById(taskId).then((task) => {
|
|
if (task && task.enabled) {
|
|
scheduleTask(task as TaskRow);
|
|
}
|
|
});
|
|
}
|
|
|
|
export function removeTask(taskId: string): void {
|
|
unscheduleTask(taskId);
|
|
}
|
|
|
|
export function reloadTask(taskId: string): void {
|
|
addTask(taskId);
|
|
}
|
|
|
|
export async function triggerTask(taskId: string): Promise<void> {
|
|
const task = await getTaskById(taskId);
|
|
if (!task) {
|
|
throw new Error(`Task ${taskId} not found`);
|
|
}
|
|
await pool.execute(() => executeTask(task as TaskRow));
|
|
}
|
|
|
|
export function getJobCount(): number {
|
|
return jobs.size;
|
|
}
|
|
|