Browse Source

feat: add scheduler service layer with task/execution CRUD

main
npmrun 2 weeks ago
parent
commit
9bbe84ae03
  1. 239
      server/service/scheduler/index.ts

239
server/service/scheduler/index.ts

@ -0,0 +1,239 @@
import { dbGlobal } from "drizzle-pkg/lib/db";
import { scheduledTasks, taskExecutionLogs } from "drizzle-pkg/lib/schema/scheduler";
import { eq, desc, and, sql } from "drizzle-orm";
import log4js from "logger";
const logger = log4js.getLogger("SCHEDULER");
export interface CreateTaskInput {
name: string;
cronExpression: string;
type: "function" | "http";
functionName?: string;
functionPayload?: string;
httpMethod?: string;
httpUrl?: string;
httpHeaders?: string;
httpBody?: string;
catchUp?: number;
enabled?: number;
maxRetries?: number;
retryDelaySeconds?: number;
timeoutSeconds?: number;
}
export interface UpdateTaskInput extends Partial<CreateTaskInput> {}
function uuid() {
return crypto.randomUUID();
}
function now() {
return new Date();
}
// ---- Task CRUD ----
export async function listTasks(opts: {
page?: number;
pageSize?: number;
type?: string;
enabled?: number;
}) {
const page = opts.page ?? 1;
const pageSize = opts.pageSize ?? 20;
const conditions = [];
if (opts.type) conditions.push(eq(scheduledTasks.type, opts.type));
if (opts.enabled !== undefined) conditions.push(eq(scheduledTasks.enabled, opts.enabled));
const where = conditions.length > 0 ? and(...conditions) : undefined;
const [rows, countResult] = await Promise.all([
dbGlobal
.select()
.from(scheduledTasks)
.where(where)
.orderBy(desc(scheduledTasks.createdAt))
.limit(pageSize)
.offset((page - 1) * pageSize),
dbGlobal
.select({ count: sql<number>`count(*)` })
.from(scheduledTasks)
.where(where),
]);
return {
list: rows,
total: countResult[0]?.count ?? 0,
page,
pageSize,
};
}
export async function getTaskById(id: string) {
const rows = await dbGlobal
.select()
.from(scheduledTasks)
.where(eq(scheduledTasks.id, id))
.limit(1);
return rows[0] ?? null;
}
export async function createTask(input: CreateTaskInput) {
const id = uuid();
const ts = now();
await dbGlobal.insert(scheduledTasks).values({
id,
...input,
createdAt: ts,
updatedAt: ts,
});
return getTaskById(id);
}
export async function updateTask(id: string, input: UpdateTaskInput) {
await dbGlobal
.update(scheduledTasks)
.set({ ...input, updatedAt: now() })
.where(eq(scheduledTasks.id, id));
return getTaskById(id);
}
export async function deleteTask(id: string) {
await dbGlobal.delete(scheduledTasks).where(eq(scheduledTasks.id, id));
}
export async function toggleTask(id: string, enabled: boolean) {
await dbGlobal
.update(scheduledTasks)
.set({ enabled: enabled ? 1 : 0, updatedAt: now() })
.where(eq(scheduledTasks.id, id));
return getTaskById(id);
}
// ---- Execution logs ----
export async function listExecutions(opts: {
taskId?: string;
status?: string;
page?: number;
pageSize?: number;
}) {
const page = opts.page ?? 1;
const pageSize = opts.pageSize ?? 20;
const conditions = [];
if (opts.taskId) conditions.push(eq(taskExecutionLogs.taskId, opts.taskId));
if (opts.status) conditions.push(eq(taskExecutionLogs.status, opts.status));
const where = conditions.length > 0 ? and(...conditions) : undefined;
const [rows, countResult] = await Promise.all([
dbGlobal
.select()
.from(taskExecutionLogs)
.where(where)
.orderBy(desc(taskExecutionLogs.startedAt))
.limit(pageSize)
.offset((page - 1) * pageSize),
dbGlobal
.select({ count: sql<number>`count(*)` })
.from(taskExecutionLogs)
.where(where),
]);
return {
list: rows,
total: countResult[0]?.count ?? 0,
page,
pageSize,
};
}
export async function getRecentExecutions(taskId: string, limit = 10) {
return dbGlobal
.select()
.from(taskExecutionLogs)
.where(eq(taskExecutionLogs.taskId, taskId))
.orderBy(desc(taskExecutionLogs.startedAt))
.limit(limit);
}
export async function createExecutionLog(input: {
taskId: string;
status: "running" | "success" | "failed";
errorMessage?: string;
resultSummary?: string;
}) {
const id = uuid();
await dbGlobal.insert(taskExecutionLogs).values({
id,
...input,
startedAt: now(),
});
return id;
}
export async function updateExecutionLog(
id: string,
input: { status: "success" | "failed"; errorMessage?: string; resultSummary?: string }
) {
await dbGlobal
.update(taskExecutionLogs)
.set({ ...input, finishedAt: now() })
.where(eq(taskExecutionLogs.id, id));
}
export async function markStaleRunningTasks() {
const result = await dbGlobal
.update(taskExecutionLogs)
.set({
status: "failed",
errorMessage: "Service restarted — task was interrupted",
finishedAt: now(),
})
.where(eq(taskExecutionLogs.status, "running"));
if (result.rowsAffected > 0) {
logger.info("Marked %d stale running execution(s) as failed", result.rowsAffected);
}
}
export async function cleanupOldLogs(retentionDays: number) {
const cutoff = new Date(Date.now() - retentionDays * 24 * 60 * 60 * 1000);
const result = await dbGlobal
.delete(taskExecutionLogs)
.where(
and(
eq(taskExecutionLogs.status, "success"),
sql`${taskExecutionLogs.startedAt} < ${cutoff.getTime()}`
)
);
if (result.rowsAffected > 0) {
logger.info("Cleaned up %d old execution log(s)", result.rowsAffected);
}
}
export async function getStats() {
const [totalTasks, enabledTasks, recentExecutions] = await Promise.all([
dbGlobal.select({ count: sql<number>`count(*)` }).from(scheduledTasks),
dbGlobal
.select({ count: sql<number>`count(*)` })
.from(scheduledTasks)
.where(eq(scheduledTasks.enabled, 1)),
dbGlobal
.select({ count: sql<number>`count(*)` })
.from(taskExecutionLogs)
.where(
sql`${taskExecutionLogs.startedAt} > ${Date.now() - 24 * 60 * 60 * 1000}`
),
]);
return {
totalTasks: totalTasks[0]?.count ?? 0,
enabledTasks: enabledTasks[0]?.count ?? 0,
last24hExecutions: recentExecutions[0]?.count ?? 0,
};
}
Loading…
Cancel
Save