From 9bbe84ae03c49149e105b31f96f2b5e97d221a66 Mon Sep 17 00:00:00 2001 From: npmrun <1549469775@qq.com> Date: Thu, 14 May 2026 14:41:33 +0800 Subject: [PATCH] feat: add scheduler service layer with task/execution CRUD --- server/service/scheduler/index.ts | 239 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 239 insertions(+) create mode 100644 server/service/scheduler/index.ts diff --git a/server/service/scheduler/index.ts b/server/service/scheduler/index.ts new file mode 100644 index 0000000..a871012 --- /dev/null +++ b/server/service/scheduler/index.ts @@ -0,0 +1,239 @@ +import { dbGlobal } from "drizzle-pkg/lib/db"; +import { scheduledTasks, taskExecutionLogs } from "drizzle-pkg/lib/schema/scheduler"; +import { eq, desc, and, sql } from "drizzle-orm"; +import log4js from "logger"; + +const logger = log4js.getLogger("SCHEDULER"); + +export interface CreateTaskInput { + name: string; + cronExpression: string; + type: "function" | "http"; + functionName?: string; + functionPayload?: string; + httpMethod?: string; + httpUrl?: string; + httpHeaders?: string; + httpBody?: string; + catchUp?: number; + enabled?: number; + maxRetries?: number; + retryDelaySeconds?: number; + timeoutSeconds?: number; +} + +export interface UpdateTaskInput extends Partial {} + +function uuid() { + return crypto.randomUUID(); +} + +function now() { + return new Date(); +} + +// ---- Task CRUD ---- + +export async function listTasks(opts: { + page?: number; + pageSize?: number; + type?: string; + enabled?: number; +}) { + const page = opts.page ?? 1; + const pageSize = opts.pageSize ?? 20; + const conditions = []; + + if (opts.type) conditions.push(eq(scheduledTasks.type, opts.type)); + if (opts.enabled !== undefined) conditions.push(eq(scheduledTasks.enabled, opts.enabled)); + + const where = conditions.length > 0 ? and(...conditions) : undefined; + + const [rows, countResult] = await Promise.all([ + dbGlobal + .select() + .from(scheduledTasks) + .where(where) + .orderBy(desc(scheduledTasks.createdAt)) + .limit(pageSize) + .offset((page - 1) * pageSize), + dbGlobal + .select({ count: sql`count(*)` }) + .from(scheduledTasks) + .where(where), + ]); + + return { + list: rows, + total: countResult[0]?.count ?? 0, + page, + pageSize, + }; +} + +export async function getTaskById(id: string) { + const rows = await dbGlobal + .select() + .from(scheduledTasks) + .where(eq(scheduledTasks.id, id)) + .limit(1); + return rows[0] ?? null; +} + +export async function createTask(input: CreateTaskInput) { + const id = uuid(); + const ts = now(); + await dbGlobal.insert(scheduledTasks).values({ + id, + ...input, + createdAt: ts, + updatedAt: ts, + }); + return getTaskById(id); +} + +export async function updateTask(id: string, input: UpdateTaskInput) { + await dbGlobal + .update(scheduledTasks) + .set({ ...input, updatedAt: now() }) + .where(eq(scheduledTasks.id, id)); + return getTaskById(id); +} + +export async function deleteTask(id: string) { + await dbGlobal.delete(scheduledTasks).where(eq(scheduledTasks.id, id)); +} + +export async function toggleTask(id: string, enabled: boolean) { + await dbGlobal + .update(scheduledTasks) + .set({ enabled: enabled ? 1 : 0, updatedAt: now() }) + .where(eq(scheduledTasks.id, id)); + return getTaskById(id); +} + +// ---- Execution logs ---- + +export async function listExecutions(opts: { + taskId?: string; + status?: string; + page?: number; + pageSize?: number; +}) { + const page = opts.page ?? 1; + const pageSize = opts.pageSize ?? 20; + const conditions = []; + + if (opts.taskId) conditions.push(eq(taskExecutionLogs.taskId, opts.taskId)); + if (opts.status) conditions.push(eq(taskExecutionLogs.status, opts.status)); + + const where = conditions.length > 0 ? and(...conditions) : undefined; + + const [rows, countResult] = await Promise.all([ + dbGlobal + .select() + .from(taskExecutionLogs) + .where(where) + .orderBy(desc(taskExecutionLogs.startedAt)) + .limit(pageSize) + .offset((page - 1) * pageSize), + dbGlobal + .select({ count: sql`count(*)` }) + .from(taskExecutionLogs) + .where(where), + ]); + + return { + list: rows, + total: countResult[0]?.count ?? 0, + page, + pageSize, + }; +} + +export async function getRecentExecutions(taskId: string, limit = 10) { + return dbGlobal + .select() + .from(taskExecutionLogs) + .where(eq(taskExecutionLogs.taskId, taskId)) + .orderBy(desc(taskExecutionLogs.startedAt)) + .limit(limit); +} + +export async function createExecutionLog(input: { + taskId: string; + status: "running" | "success" | "failed"; + errorMessage?: string; + resultSummary?: string; +}) { + const id = uuid(); + await dbGlobal.insert(taskExecutionLogs).values({ + id, + ...input, + startedAt: now(), + }); + return id; +} + +export async function updateExecutionLog( + id: string, + input: { status: "success" | "failed"; errorMessage?: string; resultSummary?: string } +) { + await dbGlobal + .update(taskExecutionLogs) + .set({ ...input, finishedAt: now() }) + .where(eq(taskExecutionLogs.id, id)); +} + +export async function markStaleRunningTasks() { + const result = await dbGlobal + .update(taskExecutionLogs) + .set({ + status: "failed", + errorMessage: "Service restarted — task was interrupted", + finishedAt: now(), + }) + .where(eq(taskExecutionLogs.status, "running")); + + if (result.rowsAffected > 0) { + logger.info("Marked %d stale running execution(s) as failed", result.rowsAffected); + } +} + +export async function cleanupOldLogs(retentionDays: number) { + const cutoff = new Date(Date.now() - retentionDays * 24 * 60 * 60 * 1000); + const result = await dbGlobal + .delete(taskExecutionLogs) + .where( + and( + eq(taskExecutionLogs.status, "success"), + sql`${taskExecutionLogs.startedAt} < ${cutoff.getTime()}` + ) + ); + + if (result.rowsAffected > 0) { + logger.info("Cleaned up %d old execution log(s)", result.rowsAffected); + } +} + +export async function getStats() { + const [totalTasks, enabledTasks, recentExecutions] = await Promise.all([ + dbGlobal.select({ count: sql`count(*)` }).from(scheduledTasks), + dbGlobal + .select({ count: sql`count(*)` }) + .from(scheduledTasks) + .where(eq(scheduledTasks.enabled, 1)), + dbGlobal + .select({ count: sql`count(*)` }) + .from(taskExecutionLogs) + .where( + sql`${taskExecutionLogs.startedAt} > ${Date.now() - 24 * 60 * 60 * 1000}` + ), + ]); + + return { + totalTasks: totalTasks[0]?.count ?? 0, + enabledTasks: enabledTasks[0]?.count ?? 0, + last24hExecutions: recentExecutions[0]?.count ?? 0, + }; +}