You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

271 lines
7.9 KiB

import { Cron } from "croner";
import log4js from "logger";
import { ExecutorPool } from "./executor-pool";
import { getTaskHandler, hasTask } from "./registry";
import {
listTasks,
getTaskById,
createExecutionLog,
updateExecutionLog,
} from "../service/scheduler";
const logger = log4js.getLogger("SCHEDULER");
interface CronJobEntry {
taskId: string;
cron: Cron;
}
interface TaskRow {
id: string;
name: string;
cronExpression: string;
type: string;
functionName?: string | null;
functionPayload?: string | null;
httpMethod?: string | null;
httpUrl?: string | null;
httpHeaders?: string | null;
httpBody?: string | null;
catchUp: number;
enabled: number;
maxRetries: number;
retryDelaySeconds: number;
timeoutSeconds: number;
}
let pool: ExecutorPool;
export function getExecutorPool(): ExecutorPool {
return pool;
}
const jobs = new Map<string, CronJobEntry>();
function parsePayload(payload?: string | null): Record<string, unknown> | undefined {
if (!payload) return undefined;
try {
return JSON.parse(payload);
} catch {
return undefined;
}
}
async function executeFunctionTask(task: TaskRow): Promise<void> {
if (!task.functionName) {
logger.error("[task:%s] function type but no functionName", task.id);
return;
}
if (!hasTask(task.functionName)) {
logger.error("[task:%s] function '%s' not registered", task.id, task.functionName);
return;
}
const handler = getTaskHandler(task.functionName)!;
const payload = parsePayload(task.functionPayload);
const executeWithRetries = async (): Promise<{ success: boolean; message: string }> => {
let lastError: Error | undefined;
for (let attempt = 0; attempt <= task.maxRetries; attempt++) {
try {
if (attempt > 0) {
logger.info(
"[task:%s] retry %d/%d after %ds",
task.id,
attempt,
task.maxRetries,
task.retryDelaySeconds
);
await new Promise((resolve) => {
const signal = AbortSignal.timeout(task.retryDelaySeconds * 1000);
signal.addEventListener("abort", resolve, { once: true });
});
}
return await handler(payload);
} catch (err) {
lastError = err instanceof Error ? err : new Error(String(err));
logger.error("[task:%s] attempt %d failed: %s", task.id, attempt, lastError.message);
}
}
throw lastError ?? new Error("max retries exceeded");
};
const logId = await createExecutionLog({ taskId: task.id, status: "running" });
try {
const timeoutSignal = AbortSignal.timeout(task.timeoutSeconds * 1000);
const result = await new Promise<{ success: boolean; message: string }>(
(resolve, reject) => {
timeoutSignal.addEventListener("abort", () => {
reject(new Error(`Task timed out after ${task.timeoutSeconds}s`));
}, { once: true });
executeWithRetries().then(resolve).catch(reject);
}
);
await updateExecutionLog(logId, {
status: "success",
resultSummary: result.message,
});
logger.info("[task:%s] completed: %s", task.id, result.message);
} catch (err) {
const message = err instanceof Error ? err.message : String(err);
await updateExecutionLog(logId, { status: "failed", errorMessage: message });
logger.error("[task:%s] failed: %s", task.id, message);
}
}
async function executeHttpTask(task: TaskRow): Promise<void> {
if (!task.httpUrl) {
logger.error("[task:%s] http type but no httpUrl", task.id);
return;
}
const logId = await createExecutionLog({ taskId: task.id, status: "running" });
try {
const headers: Record<string, string> = task.httpHeaders
? JSON.parse(task.httpHeaders)
: {};
const timeoutSignal = AbortSignal.timeout(task.timeoutSeconds * 1000);
const response = await fetch(task.httpUrl, {
method: (task.httpMethod ?? "GET").toUpperCase(),
headers: { "Content-Type": "application/json", ...headers },
body: task.httpBody ?? undefined,
signal: timeoutSignal,
});
const summary = `HTTP ${response.status} ${response.statusText}`;
if (!response.ok) {
await updateExecutionLog(logId, { status: "failed", errorMessage: summary, resultSummary: summary });
logger.error("[task:%s] %s", task.id, summary);
} else {
await updateExecutionLog(logId, { status: "success", resultSummary: summary });
logger.info("[task:%s] %s", task.id, summary);
}
} catch (err) {
const message = err instanceof Error ? err.message : String(err);
await updateExecutionLog(logId, { status: "failed", errorMessage: message });
logger.error("[task:%s] HTTP request failed: %s", task.id, message);
}
}
async function executeTask(task: TaskRow): Promise<void> {
logger.info("[task:%s] executing '%s'", task.id, task.name);
if (task.type === "function") {
await executeFunctionTask(task);
} else if (task.type === "http") {
await executeHttpTask(task);
} else {
logger.error("[task:%s] unknown type: %s", task.id, task.type);
}
}
function scheduleTask(task: TaskRow): void {
const cron = new Cron(task.cronExpression, async () => {
await pool.execute(() => executeTask(task));
});
jobs.set(task.id, { taskId: task.id, cron });
logger.info(
"[task:%s] scheduled '%s' cron='%s' next=%s",
task.id,
task.name,
task.cronExpression,
cron.nextRun()?.toISOString() ?? "none"
);
}
function unscheduleTask(taskId: string): void {
const entry = jobs.get(taskId);
if (entry) {
entry.cron.stop();
jobs.delete(taskId);
logger.info("[task:%s] unscheduled", taskId);
}
}
// ---- Public API ----
export async function start(maxConcurrency: number, logRetentionDays: number): Promise<void> {
pool = new ExecutorPool(maxConcurrency);
logger.info("Scheduler starting (max concurrency: %d)", maxConcurrency);
// Mark stale running tasks and clean old logs
const { markStaleRunningTasks, cleanupOldLogs } = await import("../service/scheduler");
await markStaleRunningTasks();
await cleanupOldLogs(logRetentionDays);
// Load all enabled tasks
const { list: allTasks } = await listTasks({ pageSize: 9999 });
for (const task of allTasks) {
if (!task.enabled) continue;
try {
scheduleTask(task as TaskRow);
// Catch-up: if catchUp=1 and the task missed its last scheduled run, execute once
if (task.catchUp) {
try {
const checkCron = new Cron(task.cronExpression);
const previous = checkCron.previousRun()?.getTime();
if (previous && previous > 0 && previous < Date.now()) {
logger.info(
"[task:%s] catch-up: missed run at %s, executing now",
task.id,
new Date(previous).toISOString()
);
pool.execute(() => executeTask(task as TaskRow));
}
} catch (err) {
logger.warn("[task:%s] catch-up check failed: %s", task.id, String(err));
}
}
} catch (err) {
const message = err instanceof Error ? err.message : String(err);
logger.error("[task:%s] failed to schedule: %s", task.id, message);
}
}
logger.info("Scheduler started with %d active job(s)", jobs.size);
}
export async function stop(): Promise<void> {
for (const [id] of jobs) {
unscheduleTask(id);
}
logger.info("Scheduler stopped");
}
export function addTask(taskId: string): void {
unscheduleTask(taskId);
getTaskById(taskId).then((task) => {
if (task && task.enabled) {
scheduleTask(task as TaskRow);
}
});
}
export function removeTask(taskId: string): void {
unscheduleTask(taskId);
}
export function reloadTask(taskId: string): void {
addTask(taskId);
}
export async function triggerTask(taskId: string): Promise<void> {
const task = await getTaskById(taskId);
if (!task) {
throw new Error(`Task ${taskId} not found`);
}
await pool.execute(() => executeTask(task as TaskRow));
}
export function getJobCount(): number {
return jobs.size;
}