import { dbGlobal } from "drizzle-pkg/lib/db";
import { scheduledTasks, taskExecutionLogs } from "drizzle-pkg/lib/schema/scheduler";
import { eq, desc, and, gt, lt, sql } from "drizzle-orm";
import log4js from "logger";

const logger = log4js.getLogger("SCHEDULER");

/** Page returned when the caller supplies no usable `page`. */
const DEFAULT_PAGE = 1;
/** Page size used when the caller supplies no usable `pageSize`. */
const DEFAULT_PAGE_SIZE = 20;
/** Number of milliseconds in 24 hours. */
const DAY_MS = 24 * 60 * 60 * 1000;

/** Fields accepted when creating a scheduled task. */
export interface CreateTaskInput {
  name: string;
  cronExpression: string;
  type: "function" | "http";
  functionName?: string;
  functionPayload?: string;
  httpMethod?: string;
  httpUrl?: string;
  httpHeaders?: string;
  httpBody?: string;
  catchUp?: number;
  enabled?: number;
  maxRetries?: number;
  retryDelaySeconds?: number;
  timeoutSeconds?: number;
}

/** Fields accepted when updating a task: any subset of {@link CreateTaskInput}. */
export interface UpdateTaskInput extends Partial<CreateTaskInput> {}

/** Generates a new random UUID used as a primary key. */
function uuid() {
  return crypto.randomUUID();
}

/** Returns the current time as a `Date`. */
function now() {
  return new Date();
}

/**
 * Truncates `value` to an integer and returns it when it is at least 1;
 * otherwise (undefined, NaN, infinite, zero or negative) returns `fallback`.
 */
function positiveIntOr(value: number | undefined, fallback: number): number {
  if (value === undefined || !Number.isFinite(value)) return fallback;
  const whole = Math.trunc(value);
  return whole >= 1 ? whole : fallback;
}

/**
 * Normalizes paging options into a 1-based page, a page size and the row
 * offset, so that a negative LIMIT/OFFSET is never sent to the database.
 */
function resolvePagination(opts: { page?: number; pageSize?: number }) {
  const page = positiveIntOr(opts.page, DEFAULT_PAGE);
  const pageSize = positiveIntOr(opts.pageSize, DEFAULT_PAGE_SIZE);
  return { page, pageSize, offset: (page - 1) * pageSize };
}

/** Builds a `count(*)` selection whose result is typed and coerced to `number`. */
function countAll() {
  return sql<number>`count(*)`.mapWith(Number);
}

// ---- Task CRUD ----

/**
 * Lists scheduled tasks, newest first (by `createdAt`), with optional filters.
 *
 * @param opts.page 1-based page number; defaults to 1 when missing or invalid.
 * @param opts.pageSize Rows per page; defaults to 20 when missing or invalid.
 * @param opts.type Only return tasks of this type; an empty string is ignored.
 * @param opts.enabled Only return tasks whose `enabled` column equals this value.
 * @returns The page of rows plus the total number of rows matching the filters.
 */
export async function listTasks(opts: {
  page?: number;
  pageSize?: number;
  type?: string;
  enabled?: number;
}) {
  const { page, pageSize, offset } = resolvePagination(opts);

  const conditions = [];
  if (opts.type) conditions.push(eq(scheduledTasks.type, opts.type));
  // `enabled` is compared against undefined so that 0 remains a usable filter.
  if (opts.enabled !== undefined) conditions.push(eq(scheduledTasks.enabled, opts.enabled));
  const where = conditions.length > 0 ? and(...conditions) : undefined;

  // The page query and the count query are independent, so run them together.
  const [rows, countResult] = await Promise.all([
    dbGlobal
      .select()
      .from(scheduledTasks)
      .where(where)
      .orderBy(desc(scheduledTasks.createdAt))
      .limit(pageSize)
      .offset(offset),
    dbGlobal.select({ count: countAll() }).from(scheduledTasks).where(where),
  ]);

  return {
    list: rows,
    total: countResult[0]?.count ?? 0,
    page,
    pageSize,
  };
}

/**
 * Fetches a single task by primary key.
 *
 * @returns The task row, or `null` when no row has that id.
 */
export async function getTaskById(id: string) {
  const rows = await dbGlobal
    .select()
    .from(scheduledTasks)
    .where(eq(scheduledTasks.id, id))
    .limit(1);
  return rows[0] ?? null;
}

/**
 * Inserts a new task with a generated id and matching `createdAt`/`updatedAt`.
 *
 * @returns The task as re-read from the database after the insert.
 */
export async function createTask(input: CreateTaskInput) {
  const id = uuid();
  const ts = now();
  await dbGlobal.insert(scheduledTasks).values({
    id,
    ...input,
    createdAt: ts,
    updatedAt: ts,
  });
  return getTaskById(id);
}

/**
 * Applies the given fields to a task and bumps `updatedAt`.
 *
 * @returns The task as re-read after the update, or `null` if the id does not exist.
 */
export async function updateTask(id: string, input: UpdateTaskInput) {
  await dbGlobal
    .update(scheduledTasks)
    .set({ ...input, updatedAt: now() })
    .where(eq(scheduledTasks.id, id));
  return getTaskById(id);
}

/** Deletes the task with the given id. */
export async function deleteTask(id: string) {
  await dbGlobal.delete(scheduledTasks).where(eq(scheduledTasks.id, id));
}

/**
 * Enables or disables a task and bumps `updatedAt`.
 *
 * @param enabled Stored as 1 when true and 0 when false.
 * @returns The task as re-read after the update, or `null` if the id does not exist.
 */
export async function toggleTask(id: string, enabled: boolean) {
  await dbGlobal
    .update(scheduledTasks)
    .set({ enabled: enabled ? 1 : 0, updatedAt: now() })
    .where(eq(scheduledTasks.id, id));
  return getTaskById(id);
}

// ---- Execution logs ----

/**
 * Lists execution logs, most recently started first, with optional filters.
 *
 * @param opts.taskId Only return logs of this task; an empty string is ignored.
 * @param opts.status Only return logs with this status; an empty string is ignored.
 * @param opts.page 1-based page number; defaults to 1 when missing or invalid.
 * @param opts.pageSize Rows per page; defaults to 20 when missing or invalid.
 * @returns The page of rows plus the total number of rows matching the filters.
 */
export async function listExecutions(opts: {
  taskId?: string;
  status?: string;
  page?: number;
  pageSize?: number;
}) {
  const { page, pageSize, offset } = resolvePagination(opts);

  const conditions = [];
  if (opts.taskId) conditions.push(eq(taskExecutionLogs.taskId, opts.taskId));
  if (opts.status) conditions.push(eq(taskExecutionLogs.status, opts.status));
  const where = conditions.length > 0 ? and(...conditions) : undefined;

  // The page query and the count query are independent, so run them together.
  const [rows, countResult] = await Promise.all([
    dbGlobal
      .select()
      .from(taskExecutionLogs)
      .where(where)
      .orderBy(desc(taskExecutionLogs.startedAt))
      .limit(pageSize)
      .offset(offset),
    dbGlobal.select({ count: countAll() }).from(taskExecutionLogs).where(where),
  ]);

  return {
    list: rows,
    total: countResult[0]?.count ?? 0,
    page,
    pageSize,
  };
}

/**
 * Returns the most recently started execution logs of one task.
 *
 * @param limit Maximum number of rows to return (default 10).
 */
export async function getRecentExecutions(taskId: string, limit = 10) {
  return dbGlobal
    .select()
    .from(taskExecutionLogs)
    .where(eq(taskExecutionLogs.taskId, taskId))
    .orderBy(desc(taskExecutionLogs.startedAt))
    .limit(limit);
}

/**
 * Inserts an execution log with a generated id and `startedAt` set to now.
 *
 * @returns The id of the new log row.
 */
export async function createExecutionLog(input: {
  taskId: string;
  status: "running" | "success" | "failed";
  errorMessage?: string;
  resultSummary?: string;
}) {
  const id = uuid();
  await dbGlobal.insert(taskExecutionLogs).values({
    id,
    ...input,
    startedAt: now(),
  });
  return id;
}

/** Records the final outcome of an execution and sets `finishedAt` to now. */
export async function updateExecutionLog(
  id: string,
  input: { status: "success" | "failed"; errorMessage?: string; resultSummary?: string }
) {
  await dbGlobal
    .update(taskExecutionLogs)
    .set({ ...input, finishedAt: now() })
    .where(eq(taskExecutionLogs.id, id));
}

/**
 * Marks every execution log still in status "running" as failed, with an
 * error message saying the service restarted, and logs how many were changed.
 */
export async function markStaleRunningTasks() {
  const result = await dbGlobal
    .update(taskExecutionLogs)
    .set({
      status: "failed",
      errorMessage: "Service restarted — task was interrupted",
      finishedAt: now(),
    })
    .where(eq(taskExecutionLogs.status, "running"));

  if (result.rowsAffected > 0) {
    logger.info("Marked %d stale running execution(s) as failed", result.rowsAffected);
  }
}

/**
 * Deletes successful execution logs that started more than `retentionDays`
 * days ago. Logs with any other status are kept.
 */
export async function cleanupOldLogs(retentionDays: number) {
  const cutoff = new Date(Date.now() - retentionDays * DAY_MS);

  const result = await dbGlobal
    .delete(taskExecutionLogs)
    .where(
      and(
        eq(taskExecutionLogs.status, "success"),
        // Compare through the column operator with a Date (the same kind of
        // value this module writes to startedAt) so the bound parameter is
        // encoded like the stored values, whatever unit the schema uses.
        lt(taskExecutionLogs.startedAt, cutoff)
      )
    );

  if (result.rowsAffected > 0) {
    logger.info("Cleaned up %d old execution log(s)", result.rowsAffected);
  }
}

/**
 * Deletes one execution log.
 *
 * @returns `true` when a row was deleted, `false` when the id did not exist.
 */
export async function deleteExecution(id: string) {
  const result = await dbGlobal
    .delete(taskExecutionLogs)
    .where(eq(taskExecutionLogs.id, id));
  return result.rowsAffected > 0;
}

/**
 * Deletes the execution logs of one task, or ALL execution logs when `taskId`
 * is omitted. An empty string counts as omitted.
 *
 * @returns The number of rows deleted.
 */
export async function deleteAllExecutions(taskId?: string) {
  const condition = taskId ? eq(taskExecutionLogs.taskId, taskId) : undefined;
  const result = await dbGlobal.delete(taskExecutionLogs).where(condition);
  return result.rowsAffected;
}

/**
 * Collects summary counters: total tasks, tasks with `enabled = 1`, and
 * executions started within the last 24 hours.
 */
export async function getStats() {
  const since = new Date(Date.now() - DAY_MS);

  const [totalTasks, enabledTasks, recentExecutions] = await Promise.all([
    dbGlobal.select({ count: countAll() }).from(scheduledTasks),
    dbGlobal
      .select({ count: countAll() })
      .from(scheduledTasks)
      .where(eq(scheduledTasks.enabled, 1)),
    dbGlobal
      .select({ count: countAll() })
      .from(taskExecutionLogs)
      // Date compared through the column operator; see cleanupOldLogs.
      .where(gt(taskExecutionLogs.startedAt, since)),
  ]);

  return {
    totalTasks: totalTasks[0]?.count ?? 0,
    enabledTasks: enabledTasks[0]?.count ?? 0,
    last24hExecutions: recentExecutions[0]?.count ?? 0,
  };
}