1 changed files with 271 additions and 0 deletions
@ -0,0 +1,271 @@ |
|||
import { Cron } from "croner"; |
|||
import log4js from "logger"; |
|||
import { ExecutorPool } from "./executor-pool"; |
|||
import { getTaskHandler, hasTask } from "./registry"; |
|||
import { |
|||
listTasks, |
|||
getTaskById, |
|||
createExecutionLog, |
|||
updateExecutionLog, |
|||
} from "../service/scheduler"; |
|||
|
|||
const logger = log4js.getLogger("SCHEDULER"); |
|||
|
|||
interface CronJobEntry { |
|||
taskId: string; |
|||
cron: Cron; |
|||
} |
|||
|
|||
interface TaskRow { |
|||
id: string; |
|||
name: string; |
|||
cronExpression: string; |
|||
type: string; |
|||
functionName?: string | null; |
|||
functionPayload?: string | null; |
|||
httpMethod?: string | null; |
|||
httpUrl?: string | null; |
|||
httpHeaders?: string | null; |
|||
httpBody?: string | null; |
|||
catchUp: number; |
|||
enabled: number; |
|||
maxRetries: number; |
|||
retryDelaySeconds: number; |
|||
timeoutSeconds: number; |
|||
} |
|||
|
|||
let pool: ExecutorPool; |
|||
|
|||
export function getExecutorPool(): ExecutorPool { |
|||
return pool; |
|||
} |
|||
|
|||
const jobs = new Map<string, CronJobEntry>(); |
|||
|
|||
function parsePayload(payload?: string | null): Record<string, unknown> | undefined { |
|||
if (!payload) return undefined; |
|||
try { |
|||
return JSON.parse(payload); |
|||
} catch { |
|||
return undefined; |
|||
} |
|||
} |
|||
|
|||
async function executeFunctionTask(task: TaskRow): Promise<void> { |
|||
if (!task.functionName) { |
|||
logger.error("[task:%s] function type but no functionName", task.id); |
|||
return; |
|||
} |
|||
|
|||
if (!hasTask(task.functionName)) { |
|||
logger.error("[task:%s] function '%s' not registered", task.id, task.functionName); |
|||
return; |
|||
} |
|||
|
|||
const handler = getTaskHandler(task.functionName)!; |
|||
const payload = parsePayload(task.functionPayload); |
|||
|
|||
const executeWithRetries = async (): Promise<{ success: boolean; message: string }> => { |
|||
let lastError: Error | undefined; |
|||
for (let attempt = 0; attempt <= task.maxRetries; attempt++) { |
|||
try { |
|||
if (attempt > 0) { |
|||
logger.info( |
|||
"[task:%s] retry %d/%d after %ds", |
|||
task.id, |
|||
attempt, |
|||
task.maxRetries, |
|||
task.retryDelaySeconds |
|||
); |
|||
await new Promise((resolve) => { |
|||
const signal = AbortSignal.timeout(task.retryDelaySeconds * 1000); |
|||
signal.addEventListener("abort", resolve, { once: true }); |
|||
}); |
|||
} |
|||
return await handler(payload); |
|||
} catch (err) { |
|||
lastError = err instanceof Error ? err : new Error(String(err)); |
|||
logger.error("[task:%s] attempt %d failed: %s", task.id, attempt, lastError.message); |
|||
} |
|||
} |
|||
throw lastError ?? new Error("max retries exceeded"); |
|||
}; |
|||
|
|||
const logId = await createExecutionLog({ taskId: task.id, status: "running" }); |
|||
|
|||
try { |
|||
const timeoutSignal = AbortSignal.timeout(task.timeoutSeconds * 1000); |
|||
|
|||
const result = await new Promise<{ success: boolean; message: string }>( |
|||
(resolve, reject) => { |
|||
timeoutSignal.addEventListener("abort", () => { |
|||
reject(new Error(`Task timed out after ${task.timeoutSeconds}s`)); |
|||
}, { once: true }); |
|||
|
|||
executeWithRetries().then(resolve).catch(reject); |
|||
} |
|||
); |
|||
|
|||
await updateExecutionLog(logId, { |
|||
status: "success", |
|||
resultSummary: result.message, |
|||
}); |
|||
logger.info("[task:%s] completed: %s", task.id, result.message); |
|||
} catch (err) { |
|||
const message = err instanceof Error ? err.message : String(err); |
|||
await updateExecutionLog(logId, { status: "failed", errorMessage: message }); |
|||
logger.error("[task:%s] failed: %s", task.id, message); |
|||
} |
|||
} |
|||
|
|||
async function executeHttpTask(task: TaskRow): Promise<void> { |
|||
if (!task.httpUrl) { |
|||
logger.error("[task:%s] http type but no httpUrl", task.id); |
|||
return; |
|||
} |
|||
|
|||
const logId = await createExecutionLog({ taskId: task.id, status: "running" }); |
|||
|
|||
try { |
|||
const headers: Record<string, string> = task.httpHeaders |
|||
? JSON.parse(task.httpHeaders) |
|||
: {}; |
|||
|
|||
const timeoutSignal = AbortSignal.timeout(task.timeoutSeconds * 1000); |
|||
|
|||
const response = await fetch(task.httpUrl, { |
|||
method: (task.httpMethod ?? "GET").toUpperCase(), |
|||
headers: { "Content-Type": "application/json", ...headers }, |
|||
body: task.httpBody ?? undefined, |
|||
signal: timeoutSignal, |
|||
}); |
|||
|
|||
const summary = `HTTP ${response.status} ${response.statusText}`; |
|||
if (!response.ok) { |
|||
await updateExecutionLog(logId, { status: "failed", errorMessage: summary, resultSummary: summary }); |
|||
logger.error("[task:%s] %s", task.id, summary); |
|||
} else { |
|||
await updateExecutionLog(logId, { status: "success", resultSummary: summary }); |
|||
logger.info("[task:%s] %s", task.id, summary); |
|||
} |
|||
} catch (err) { |
|||
const message = err instanceof Error ? err.message : String(err); |
|||
await updateExecutionLog(logId, { status: "failed", errorMessage: message }); |
|||
logger.error("[task:%s] HTTP request failed: %s", task.id, message); |
|||
} |
|||
} |
|||
|
|||
async function executeTask(task: TaskRow): Promise<void> { |
|||
logger.info("[task:%s] executing '%s'", task.id, task.name); |
|||
if (task.type === "function") { |
|||
await executeFunctionTask(task); |
|||
} else if (task.type === "http") { |
|||
await executeHttpTask(task); |
|||
} else { |
|||
logger.error("[task:%s] unknown type: %s", task.id, task.type); |
|||
} |
|||
} |
|||
|
|||
function scheduleTask(task: TaskRow): void { |
|||
const cron = new Cron(task.cronExpression, async () => { |
|||
await pool.execute(() => executeTask(task)); |
|||
}); |
|||
jobs.set(task.id, { taskId: task.id, cron }); |
|||
logger.info( |
|||
"[task:%s] scheduled '%s' cron='%s' next=%s", |
|||
task.id, |
|||
task.name, |
|||
task.cronExpression, |
|||
cron.nextRun()?.toISOString() ?? "none" |
|||
); |
|||
} |
|||
|
|||
function unscheduleTask(taskId: string): void { |
|||
const entry = jobs.get(taskId); |
|||
if (entry) { |
|||
entry.cron.stop(); |
|||
jobs.delete(taskId); |
|||
logger.info("[task:%s] unscheduled", taskId); |
|||
} |
|||
} |
|||
|
|||
// ---- Public API ----
|
|||
|
|||
export async function start(maxConcurrency: number, logRetentionDays: number): Promise<void> { |
|||
pool = new ExecutorPool(maxConcurrency); |
|||
logger.info("Scheduler starting (max concurrency: %d)", maxConcurrency); |
|||
|
|||
// Mark stale running tasks and clean old logs
|
|||
const { markStaleRunningTasks, cleanupOldLogs } = await import("../service/scheduler"); |
|||
await markStaleRunningTasks(); |
|||
await cleanupOldLogs(logRetentionDays); |
|||
|
|||
// Load all enabled tasks
|
|||
const { list: allTasks } = await listTasks({ pageSize: 9999 }); |
|||
|
|||
for (const task of allTasks) { |
|||
if (!task.enabled) continue; |
|||
try { |
|||
scheduleTask(task as TaskRow); |
|||
|
|||
// Catch-up: if catchUp=1 and the task missed its last scheduled run, execute once
|
|||
if (task.catchUp) { |
|||
try { |
|||
const checkCron = new Cron(task.cronExpression); |
|||
const previous = checkCron.previousRun()?.getTime(); |
|||
if (previous && previous > 0 && previous < Date.now()) { |
|||
logger.info( |
|||
"[task:%s] catch-up: missed run at %s, executing now", |
|||
task.id, |
|||
new Date(previous).toISOString() |
|||
); |
|||
pool.execute(() => executeTask(task as TaskRow)); |
|||
} |
|||
} catch (err) { |
|||
logger.warn("[task:%s] catch-up check failed: %s", task.id, String(err)); |
|||
} |
|||
} |
|||
} catch (err) { |
|||
const message = err instanceof Error ? err.message : String(err); |
|||
logger.error("[task:%s] failed to schedule: %s", task.id, message); |
|||
} |
|||
} |
|||
|
|||
logger.info("Scheduler started with %d active job(s)", jobs.size); |
|||
} |
|||
|
|||
export async function stop(): Promise<void> { |
|||
for (const [id] of jobs) { |
|||
unscheduleTask(id); |
|||
} |
|||
logger.info("Scheduler stopped"); |
|||
} |
|||
|
|||
export function addTask(taskId: string): void { |
|||
unscheduleTask(taskId); |
|||
getTaskById(taskId).then((task) => { |
|||
if (task && task.enabled) { |
|||
scheduleTask(task as TaskRow); |
|||
} |
|||
}); |
|||
} |
|||
|
|||
export function removeTask(taskId: string): void { |
|||
unscheduleTask(taskId); |
|||
} |
|||
|
|||
export function reloadTask(taskId: string): void { |
|||
addTask(taskId); |
|||
} |
|||
|
|||
export async function triggerTask(taskId: string): Promise<void> { |
|||
const task = await getTaskById(taskId); |
|||
if (!task) { |
|||
throw new Error(`Task ${taskId} not found`); |
|||
} |
|||
await pool.execute(() => executeTask(task as TaskRow)); |
|||
} |
|||
|
|||
export function getJobCount(): number { |
|||
return jobs.size; |
|||
} |
|||
Loading…
Reference in new issue