From 2fc5573dc1614c25bde9e1f8ecbc31d164be3bd0 Mon Sep 17 00:00:00 2001 From: npmrun <1549469775@qq.com> Date: Thu, 14 May 2026 14:51:49 +0800 Subject: [PATCH] feat: add Scheduler Engine with croner integration and hot-reload support Co-Authored-By: Claude Opus 4.7 --- server/scheduler/engine.ts | 271 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 271 insertions(+) create mode 100644 server/scheduler/engine.ts diff --git a/server/scheduler/engine.ts b/server/scheduler/engine.ts new file mode 100644 index 0000000..512719b --- /dev/null +++ b/server/scheduler/engine.ts @@ -0,0 +1,271 @@ +import { Cron } from "croner"; +import log4js from "logger"; +import { ExecutorPool } from "./executor-pool"; +import { getTaskHandler, hasTask } from "./registry"; +import { + listTasks, + getTaskById, + createExecutionLog, + updateExecutionLog, +} from "../service/scheduler"; + +const logger = log4js.getLogger("SCHEDULER"); + +interface CronJobEntry { + taskId: string; + cron: Cron; +} + +interface TaskRow { + id: string; + name: string; + cronExpression: string; + type: string; + functionName?: string | null; + functionPayload?: string | null; + httpMethod?: string | null; + httpUrl?: string | null; + httpHeaders?: string | null; + httpBody?: string | null; + catchUp: number; + enabled: number; + maxRetries: number; + retryDelaySeconds: number; + timeoutSeconds: number; +} + +let pool: ExecutorPool; + +export function getExecutorPool(): ExecutorPool { + return pool; +} + +const jobs = new Map(); + +function parsePayload(payload?: string | null): Record | undefined { + if (!payload) return undefined; + try { + return JSON.parse(payload); + } catch { + return undefined; + } +} + +async function executeFunctionTask(task: TaskRow): Promise { + if (!task.functionName) { + logger.error("[task:%s] function type but no functionName", task.id); + return; + } + + if (!hasTask(task.functionName)) { + logger.error("[task:%s] function '%s' not registered", task.id, task.functionName); + return; + } + + const handler = getTaskHandler(task.functionName)!; + const payload = parsePayload(task.functionPayload); + + const executeWithRetries = async (): Promise<{ success: boolean; message: string }> => { + let lastError: Error | undefined; + for (let attempt = 0; attempt <= task.maxRetries; attempt++) { + try { + if (attempt > 0) { + logger.info( + "[task:%s] retry %d/%d after %ds", + task.id, + attempt, + task.maxRetries, + task.retryDelaySeconds + ); + await new Promise((resolve) => { + const signal = AbortSignal.timeout(task.retryDelaySeconds * 1000); + signal.addEventListener("abort", resolve, { once: true }); + }); + } + return await handler(payload); + } catch (err) { + lastError = err instanceof Error ? err : new Error(String(err)); + logger.error("[task:%s] attempt %d failed: %s", task.id, attempt, lastError.message); + } + } + throw lastError ?? new Error("max retries exceeded"); + }; + + const logId = await createExecutionLog({ taskId: task.id, status: "running" }); + + try { + const timeoutSignal = AbortSignal.timeout(task.timeoutSeconds * 1000); + + const result = await new Promise<{ success: boolean; message: string }>( + (resolve, reject) => { + timeoutSignal.addEventListener("abort", () => { + reject(new Error(`Task timed out after ${task.timeoutSeconds}s`)); + }, { once: true }); + + executeWithRetries().then(resolve).catch(reject); + } + ); + + await updateExecutionLog(logId, { + status: "success", + resultSummary: result.message, + }); + logger.info("[task:%s] completed: %s", task.id, result.message); + } catch (err) { + const message = err instanceof Error ? err.message : String(err); + await updateExecutionLog(logId, { status: "failed", errorMessage: message }); + logger.error("[task:%s] failed: %s", task.id, message); + } +} + +async function executeHttpTask(task: TaskRow): Promise { + if (!task.httpUrl) { + logger.error("[task:%s] http type but no httpUrl", task.id); + return; + } + + const logId = await createExecutionLog({ taskId: task.id, status: "running" }); + + try { + const headers: Record = task.httpHeaders + ? JSON.parse(task.httpHeaders) + : {}; + + const timeoutSignal = AbortSignal.timeout(task.timeoutSeconds * 1000); + + const response = await fetch(task.httpUrl, { + method: (task.httpMethod ?? "GET").toUpperCase(), + headers: { "Content-Type": "application/json", ...headers }, + body: task.httpBody ?? undefined, + signal: timeoutSignal, + }); + + const summary = `HTTP ${response.status} ${response.statusText}`; + if (!response.ok) { + await updateExecutionLog(logId, { status: "failed", errorMessage: summary, resultSummary: summary }); + logger.error("[task:%s] %s", task.id, summary); + } else { + await updateExecutionLog(logId, { status: "success", resultSummary: summary }); + logger.info("[task:%s] %s", task.id, summary); + } + } catch (err) { + const message = err instanceof Error ? err.message : String(err); + await updateExecutionLog(logId, { status: "failed", errorMessage: message }); + logger.error("[task:%s] HTTP request failed: %s", task.id, message); + } +} + +async function executeTask(task: TaskRow): Promise { + logger.info("[task:%s] executing '%s'", task.id, task.name); + if (task.type === "function") { + await executeFunctionTask(task); + } else if (task.type === "http") { + await executeHttpTask(task); + } else { + logger.error("[task:%s] unknown type: %s", task.id, task.type); + } +} + +function scheduleTask(task: TaskRow): void { + const cron = new Cron(task.cronExpression, async () => { + await pool.execute(() => executeTask(task)); + }); + jobs.set(task.id, { taskId: task.id, cron }); + logger.info( + "[task:%s] scheduled '%s' cron='%s' next=%s", + task.id, + task.name, + task.cronExpression, + cron.nextRun()?.toISOString() ?? "none" + ); +} + +function unscheduleTask(taskId: string): void { + const entry = jobs.get(taskId); + if (entry) { + entry.cron.stop(); + jobs.delete(taskId); + logger.info("[task:%s] unscheduled", taskId); + } +} + +// ---- Public API ---- + +export async function start(maxConcurrency: number, logRetentionDays: number): Promise { + pool = new ExecutorPool(maxConcurrency); + logger.info("Scheduler starting (max concurrency: %d)", maxConcurrency); + + // Mark stale running tasks and clean old logs + const { markStaleRunningTasks, cleanupOldLogs } = await import("../service/scheduler"); + await markStaleRunningTasks(); + await cleanupOldLogs(logRetentionDays); + + // Load all enabled tasks + const { list: allTasks } = await listTasks({ pageSize: 9999 }); + + for (const task of allTasks) { + if (!task.enabled) continue; + try { + scheduleTask(task as TaskRow); + + // Catch-up: if catchUp=1 and the task missed its last scheduled run, execute once + if (task.catchUp) { + try { + const checkCron = new Cron(task.cronExpression); + const previous = checkCron.previousRun()?.getTime(); + if (previous && previous > 0 && previous < Date.now()) { + logger.info( + "[task:%s] catch-up: missed run at %s, executing now", + task.id, + new Date(previous).toISOString() + ); + pool.execute(() => executeTask(task as TaskRow)); + } + } catch (err) { + logger.warn("[task:%s] catch-up check failed: %s", task.id, String(err)); + } + } + } catch (err) { + const message = err instanceof Error ? err.message : String(err); + logger.error("[task:%s] failed to schedule: %s", task.id, message); + } + } + + logger.info("Scheduler started with %d active job(s)", jobs.size); +} + +export async function stop(): Promise { + for (const [id] of jobs) { + unscheduleTask(id); + } + logger.info("Scheduler stopped"); +} + +export function addTask(taskId: string): void { + unscheduleTask(taskId); + getTaskById(taskId).then((task) => { + if (task && task.enabled) { + scheduleTask(task as TaskRow); + } + }); +} + +export function removeTask(taskId: string): void { + unscheduleTask(taskId); +} + +export function reloadTask(taskId: string): void { + addTask(taskId); +} + +export async function triggerTask(taskId: string): Promise { + const task = await getTaskById(taskId); + if (!task) { + throw new Error(`Task ${taskId} not found`); + } + await pool.execute(() => executeTask(task as TaskRow)); +} + +export function getJobCount(): number { + return jobs.size; +}