import { Cron } from "croner";
import log4js from "logger";
import { ExecutorPool } from "./executor-pool";
import { getTaskHandler, hasTask } from "./registry";
import {
  listTasks,
  getTaskById,
  createExecutionLog,
  updateExecutionLog,
} from "../service/scheduler";

const logger = log4js.getLogger("SCHEDULER");

/** A live croner job together with the id of the task it fires. */
interface CronJobEntry {
  taskId: string;
  cron: Cron;
}

/** Task fields read by this module. `catchUp` and `enabled` are numeric flags tested for truthiness. */
interface TaskRow {
  id: string;
  name: string;
  cronExpression: string;
  type: string;
  functionName?: string | null;
  functionPayload?: string | null;
  httpMethod?: string | null;
  httpUrl?: string | null;
  httpHeaders?: string | null;
  httpBody?: string | null;
  catchUp: number;
  enabled: number;
  maxRetries: number;
  retryDelaySeconds: number;
  timeoutSeconds: number;
}

/** Outcome returned by a registered function-task handler. */
type TaskResult = { success: boolean; message: string };

/** HTTP methods for which `fetch` rejects a request body. */
const BODYLESS_HTTP_METHODS = new Set(["GET", "HEAD"]);

// Assigned by start(); any use before start() has run sees `undefined`.
let pool: ExecutorPool;

/** Returns the executor pool created by {@link start}. */
export function getExecutorPool(): ExecutorPool {
  return pool;
}

/** Scheduled jobs keyed by task id. */
const jobs = new Map<string, CronJobEntry>();

/** Normalizes any thrown value into a loggable message. */
function toErrorMessage(err: unknown): string {
  return err instanceof Error ? err.message : String(err);
}

/** Resolves after `ms` milliseconds. */
function sleep(ms: number): Promise<void> {
  return new Promise<void>((resolve) => {
    setTimeout(() => resolve(), ms);
  });
}

/**
 * Parses a JSON payload string.
 *
 * @returns the parsed value, or `undefined` when the payload is empty or is not valid JSON.
 */
function parsePayload(payload?: string | null): Record<string, unknown> | undefined {
  if (!payload) return undefined;
  try {
    return JSON.parse(payload);
  } catch (err) {
    // Best effort: the handler still runs, just without a payload. Log so the bad data is visible.
    logger.warn("ignoring payload that is not valid JSON: %s", toErrorMessage(err));
    return undefined;
  }
}

/**
 * Runs a "function" task: looks up the registered handler, invokes it with up to
 * `task.maxRetries` retries under an overall `task.timeoutSeconds` deadline, and
 * records the outcome in the execution log.
 *
 * Returns without creating an execution log when the task has no function name
 * or the function is not registered.
 */
async function executeFunctionTask(task: TaskRow): Promise<void> {
  if (!task.functionName) {
    logger.error("[task:%s] function type but no functionName", task.id);
    return;
  }
  if (!hasTask(task.functionName)) {
    logger.error("[task:%s] function '%s' not registered", task.id, task.functionName);
    return;
  }

  const handler = getTaskHandler(task.functionName)!;
  const payload = parsePayload(task.functionPayload);

  const executeWithRetries = async (deadline: AbortSignal): Promise<TaskResult> => {
    let lastError: Error | undefined;
    for (let attempt = 0; attempt <= task.maxRetries; attempt++) {
      // Once the overall timeout has fired the run is already recorded as failed,
      // so starting another attempt would only do unobserved work.
      if (deadline.aborted) break;
      try {
        if (attempt > 0) {
          logger.info(
            "[task:%s] retry %d/%d after %ds",
            task.id,
            attempt,
            task.maxRetries,
            task.retryDelaySeconds
          );
          await sleep(task.retryDelaySeconds * 1000);
          if (deadline.aborted) break;
        }
        return await handler(payload);
      } catch (err) {
        lastError = err instanceof Error ? err : new Error(String(err));
        logger.error("[task:%s] attempt %d failed: %s", task.id, attempt, lastError.message);
      }
    }
    throw lastError ?? new Error("max retries exceeded");
  };

  const logId = await createExecutionLog({ taskId: task.id, status: "running" });
  try {
    const timeoutSignal = AbortSignal.timeout(task.timeoutSeconds * 1000);
    // Race the retry loop against the timeout; whichever settles first wins and
    // the later resolve/reject call is a no-op.
    const result = await new Promise<TaskResult>((resolve, reject) => {
      timeoutSignal.addEventListener(
        "abort",
        () => {
          reject(new Error(`Task timed out after ${task.timeoutSeconds}s`));
        },
        { once: true }
      );
      executeWithRetries(timeoutSignal).then(resolve).catch(reject);
    });

    if (result.success === false) {
      // The handler finished without throwing but reported failure; do not log it as a success.
      await updateExecutionLog(logId, {
        status: "failed",
        errorMessage: result.message,
        resultSummary: result.message,
      });
      logger.error("[task:%s] failed: %s", task.id, result.message);
      return;
    }

    await updateExecutionLog(logId, {
      status: "success",
      resultSummary: result.message,
    });
    logger.info("[task:%s] completed: %s", task.id, result.message);
  } catch (err) {
    const message = toErrorMessage(err);
    await updateExecutionLog(logId, { status: "failed", errorMessage: message });
    logger.error("[task:%s] failed: %s", task.id, message);
  }
}

/**
 * Runs an "http" task: issues one request to `task.httpUrl`, bounded by
 * `task.timeoutSeconds`, and records the HTTP status in the execution log.
 * A non-2xx response is recorded as a failure.
 *
 * Returns without creating an execution log when the task has no URL.
 */
async function executeHttpTask(task: TaskRow): Promise<void> {
  if (!task.httpUrl) {
    logger.error("[task:%s] http type but no httpUrl", task.id);
    return;
  }

  const logId = await createExecutionLog({ taskId: task.id, status: "running" });
  try {
    const headers: Record<string, string> = task.httpHeaders ? JSON.parse(task.httpHeaders) : {};
    const method = (task.httpMethod ?? "GET").toUpperCase();

    // fetch() throws a TypeError when a GET/HEAD request carries a body (even an
    // empty string), so never send one for those methods.
    let body: string | undefined = task.httpBody ? task.httpBody : undefined;
    if (body !== undefined && BODYLESS_HTTP_METHODS.has(method)) {
      logger.warn("[task:%s] ignoring httpBody for %s request", task.id, method);
      body = undefined;
    }

    const timeoutSignal = AbortSignal.timeout(task.timeoutSeconds * 1000);
    const response = await fetch(task.httpUrl, {
      method,
      headers: { "Content-Type": "application/json", ...headers },
      body,
      signal: timeoutSignal,
    });
    const summary = `HTTP ${response.status} ${response.statusText}`;

    // Only the status line is used; release the unread body instead of leaving it open.
    await response.body?.cancel().catch(() => undefined);

    if (!response.ok) {
      await updateExecutionLog(logId, {
        status: "failed",
        errorMessage: summary,
        resultSummary: summary,
      });
      logger.error("[task:%s] %s", task.id, summary);
    } else {
      await updateExecutionLog(logId, { status: "success", resultSummary: summary });
      logger.info("[task:%s] %s", task.id, summary);
    }
  } catch (err) {
    const message = toErrorMessage(err);
    await updateExecutionLog(logId, { status: "failed", errorMessage: message });
    logger.error("[task:%s] HTTP request failed: %s", task.id, message);
  }
}

/** Dispatches a task to the executor matching its `type`; unknown types are logged and skipped. */
async function executeTask(task: TaskRow): Promise<void> {
  logger.info("[task:%s] executing '%s'", task.id, task.name);
  if (task.type === "function") {
    await executeFunctionTask(task);
  } else if (task.type === "http") {
    await executeHttpTask(task);
  } else {
    logger.error("[task:%s] unknown type: %s", task.id, task.type);
  }
}

/**
 * Runs a task through the pool for a background trigger (cron tick, catch-up).
 * Failures are logged rather than rethrown because nobody awaits these runs.
 */
async function executeInPool(task: TaskRow, trigger: string): Promise<void> {
  try {
    await pool.execute(() => executeTask(task));
  } catch (err) {
    logger.error("[task:%s] %s run failed: %s", task.id, trigger, toErrorMessage(err));
  }
}

/**
 * Creates a cron job for the task and registers it in `jobs`.
 *
 * @throws whatever the `Cron` constructor throws for the given expression.
 */
function scheduleTask(task: TaskRow): void {
  const cron = new Cron(task.cronExpression, () => executeInPool(task, "scheduled"));

  // Stop any job already registered under this id; overwriting the map entry
  // alone would leave the old Cron firing with no way to stop it.
  jobs.get(task.id)?.cron.stop();
  jobs.set(task.id, { taskId: task.id, cron });

  logger.info(
    "[task:%s] scheduled '%s' cron='%s' next=%s",
    task.id,
    task.name,
    task.cronExpression,
    cron.nextRun()?.toISOString() ?? "none"
  );
}

/** Stops and forgets the job for `taskId`; does nothing when none is registered. */
function unscheduleTask(taskId: string): void {
  const entry = jobs.get(taskId);
  if (entry) {
    entry.cron.stop();
    jobs.delete(taskId);
    logger.info("[task:%s] unscheduled", taskId);
  }
}

// ---- Public API ----

/**
 * Creates the executor pool, marks stale running tasks, cleans up old logs,
 * then schedules every enabled task returned by `listTasks`.
 *
 * A task that fails to schedule is logged and skipped; it does not abort startup.
 *
 * @param maxConcurrency passed to the `ExecutorPool` constructor.
 * @param logRetentionDays passed to `cleanupOldLogs`.
 */
export async function start(maxConcurrency: number, logRetentionDays: number): Promise<void> {
  pool = new ExecutorPool(maxConcurrency);
  logger.info("Scheduler starting (max concurrency: %d)", maxConcurrency);

  // Mark stale running tasks and clean old logs
  const { markStaleRunningTasks, cleanupOldLogs } = await import("../service/scheduler");
  await markStaleRunningTasks();
  await cleanupOldLogs(logRetentionDays);

  // Load all enabled tasks
  const { list: allTasks } = await listTasks({ pageSize: 9999 });
  for (const task of allTasks) {
    if (!task.enabled) continue;
    const row = task as TaskRow;
    try {
      scheduleTask(row);

      // Catch-up: if catchUp=1 and the task missed its last scheduled run, execute once
      if (task.catchUp) {
        try {
          // NOTE(review): croner's previousRun() appears to report when *this
          // instance* last fired, which would be null for a Cron built just now —
          // confirm this check can ever detect a missed run.
          const checkCron = new Cron(task.cronExpression);
          const previous = checkCron.previousRun()?.getTime();
          if (previous !== undefined && previous > 0 && previous < Date.now()) {
            logger.info(
              "[task:%s] catch-up: missed run at %s, executing now",
              task.id,
              new Date(previous).toISOString()
            );
            // Fire-and-forget; executeInPool logs any failure.
            void executeInPool(row, "catch-up");
          }
        } catch (err) {
          logger.warn("[task:%s] catch-up check failed: %s", task.id, String(err));
        }
      }
    } catch (err) {
      logger.error("[task:%s] failed to schedule: %s", task.id, toErrorMessage(err));
    }
  }

  logger.info("Scheduler started with %d active job(s)", jobs.size);
}

/** Stops every scheduled job. Executions already handed to the pool are not awaited here. */
export async function stop(): Promise<void> {
  for (const id of [...jobs.keys()]) {
    unscheduleTask(id);
  }
  logger.info("Scheduler stopped");
}

/**
 * Unschedules the task, then reloads it via `getTaskById` and schedules it if
 * it exists and is enabled. The reload happens asynchronously after this returns.
 */
export function addTask(taskId: string): void {
  unscheduleTask(taskId);
  void getTaskById(taskId)
    .then((task) => {
      if (task && task.enabled) {
        scheduleTask(task as TaskRow);
      }
    })
    .catch((err: unknown) => {
      // Covers both a failed lookup and a cron expression rejected by scheduleTask.
      logger.error("[task:%s] failed to schedule: %s", taskId, toErrorMessage(err));
    });
}

/** Unschedules the task if it is currently scheduled. */
export function removeTask(taskId: string): void {
  unscheduleTask(taskId);
}

/** Re-reads the task and reschedules it; same behavior as {@link addTask}. */
export function reloadTask(taskId: string): void {
  addTask(taskId);
}

/**
 * Executes a task once through the pool, regardless of its schedule or enabled flag.
 *
 * @throws Error when no task with `taskId` exists.
 */
export async function triggerTask(taskId: string): Promise<void> {
  const task = await getTaskById(taskId);
  if (!task) {
    throw new Error(`Task ${taskId} not found`);
  }
  await pool.execute(() => executeTask(task as TaskRow));
}

/** Number of tasks currently scheduled. */
export function getJobCount(): number {
  return jobs.size;
}