# Scheduled Tasks Feature Implementation Plan

> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.

**Goal:** Add cron-based scheduled task functionality with REST API control and a Nuxt UI admin page, supporting both system-registered functions and HTTP requests.

**Architecture:** A Nitro plugin (`03.scheduler.ts`) initializes the Scheduler Engine on server start. The Engine uses `croner` for cron parsing/triggering, an Executor Pool for concurrency control, and a Task Registry for named function handlers. Tasks are persisted via Drizzle ORM to SQLite. API endpoints handle CRUD and trigger the Engine for hot-reload. Frontend admin pages use Nuxt UI for task/execution management.

**Tech Stack:** croner, Drizzle ORM + SQLite, Nitro plugins, Nuxt UI, Zod validation, log4js

---

### Task 1: Install croner dependency

**Files:**
- Modify: `package.json`

- [ ] **Step 1: Add croner to project**

```bash
bun add croner
```

- [ ] **Step 2: Verify installation**

Check that `package.json` now includes `"croner"` in `dependencies` and that `bun.lock` is updated.

- [ ] **Step 3: Commit**

```bash
git add package.json bun.lock
git commit -m "chore: add croner dependency for scheduled tasks"
```

---

### Task 2: Create Drizzle schema for scheduler tables

**Files:**
- Create: `packages/drizzle-pkg/lib/schema/scheduler.ts`

- [ ] **Step 1: Write the scheduler schema**

```typescript
// packages/drizzle-pkg/lib/schema/scheduler.ts
import { sqliteTable, text, integer } from "drizzle-orm/sqlite-core";

export const scheduledTasks = sqliteTable("scheduled_tasks", {
  id: text("id").primaryKey(),
  name: text("name").notNull(),
  cronExpression: text("cron_expression").notNull(),
  type: text("type").notNull(), // "function" | "http"
  // function type fields
  functionName: text("function_name"),
  functionPayload: text("function_payload"),
  // http type fields
  httpMethod: text("http_method"),
  httpUrl: text("http_url"),
  httpHeaders: text("http_headers"),
  httpBody: text("http_body"),
  // flags are stored as 0/1 integers
  catchUp: integer("catch_up").default(0).notNull(),
  enabled: integer("enabled").default(1).notNull(),
  maxRetries: integer("max_retries").default(0).notNull(),
  retryDelaySeconds: integer("retry_delay_seconds").default(60).notNull(),
  timeoutSeconds: integer("timeout_seconds").default(300).notNull(),
  createdAt: integer("created_at", { mode: "timestamp_ms" }).defaultNow().notNull(),
  updatedAt: integer("updated_at", { mode: "timestamp_ms" })
    .defaultNow()
    .$onUpdate(() => new Date())
    .notNull(),
});

export const taskExecutionLogs = sqliteTable("task_execution_logs", {
  id: text("id").primaryKey(),
  taskId: text("task_id")
    .notNull()
    // cascade so deleting a task that already has logs does not fail the FK check
    .references(() => scheduledTasks.id, { onDelete: "cascade" }),
  status: text("status").notNull(), // "running" | "success" | "failed"
  startedAt: integer("started_at", { mode: "timestamp_ms" }).defaultNow().notNull(),
  finishedAt: integer("finished_at", { mode: "timestamp_ms" }),
  errorMessage: text("error_message"),
  resultSummary: text("result_summary"),
});
```

- [ ] **Step 2: Commit**

```bash
git add packages/drizzle-pkg/lib/schema/scheduler.ts
git commit -m "feat: add scheduler Drizzle schema (scheduled_tasks, task_execution_logs)"
```

---

### Task 3: Generate and run DB migration

**Files:**
- Create: migration SQL file (auto-generated by drizzle-kit)

- [ ] **Step 1: Generate migration**

```bash
bun run db:generate
```

- [ ] **Step 2: Verify the generated migration SQL**

Check that the new migration file in `packages/drizzle-pkg/migrations/` creates both the `scheduled_tasks` and `task_execution_logs` tables.

- [ ] **Step 3: Run migration**

```bash
bun run db:migrate
```

- [ ] **Step 4: Commit**

```bash
git add packages/drizzle-pkg/migrations/
git commit -m "feat: add scheduler tables migration"
```

---

### Task 4: Create scheduler service layer

**Files:**
- Create: `server/service/scheduler/index.ts`

- [ ] **Step 1: Write the service layer**

```typescript
// server/service/scheduler/index.ts
import { dbGlobal } from "drizzle-pkg/lib/db";
import { scheduledTasks, taskExecutionLogs } from "drizzle-pkg/lib/schema/scheduler";
import { eq, desc, and, sql } from "drizzle-orm";
import log4js from "logger";

const logger = log4js.getLogger("SCHEDULER");

export interface CreateTaskInput {
  name: string;
  cronExpression: string;
  type: "function" | "http";
  functionName?: string;
  functionPayload?: string;
  httpMethod?: string;
  httpUrl?: string;
  httpHeaders?: string;
  httpBody?: string;
  catchUp?: number;
  enabled?: number;
  maxRetries?: number;
  retryDelaySeconds?: number;
  timeoutSeconds?: number;
}

export interface UpdateTaskInput extends Partial<CreateTaskInput> {}

function uuid() {
  return crypto.randomUUID();
}

function now() {
  return new Date();
}

// ---- Task CRUD ----

export async function listTasks(opts: {
  page?: number;
  pageSize?: number;
  type?: string;
  enabled?: number;
}) {
  const page = opts.page ?? 1;
  const pageSize = opts.pageSize ?? 20;
  const conditions = [];
  if (opts.type) conditions.push(eq(scheduledTasks.type, opts.type));
  if (opts.enabled !== undefined) conditions.push(eq(scheduledTasks.enabled, opts.enabled));
  const where = conditions.length > 0 ? and(...conditions) : undefined;

  // Fetch the requested page and the total row count in parallel
  const [rows, countResult] = await Promise.all([
    dbGlobal
      .select()
      .from(scheduledTasks)
      .where(where)
      .orderBy(desc(scheduledTasks.createdAt))
      .limit(pageSize)
      .offset((page - 1) * pageSize),
    dbGlobal
      .select({ count: sql<number>`count(*)` })
      .from(scheduledTasks)
      .where(where),
  ]);

  return {
    list: rows,
    total: countResult[0]?.count ?? 0,
    page,
    pageSize,
  };
}

export async function getTaskById(id: string) {
  const rows = await dbGlobal
    .select()
    .from(scheduledTasks)
    .where(eq(scheduledTasks.id, id))
    .limit(1);
  return rows[0] ?? null;
}

export async function createTask(input: CreateTaskInput) {
  const id = uuid();
  const ts = now();
  await dbGlobal.insert(scheduledTasks).values({
    id,
    ...input,
    createdAt: ts,
    updatedAt: ts,
  });
  return getTaskById(id);
}

export async function updateTask(id: string, input: UpdateTaskInput) {
  await dbGlobal
    .update(scheduledTasks)
    .set({ ...input, updatedAt: now() })
    .where(eq(scheduledTasks.id, id));
  return getTaskById(id);
}

export async function deleteTask(id: string) {
  await dbGlobal.delete(scheduledTasks).where(eq(scheduledTasks.id, id));
}

export async function toggleTask(id: string, enabled: boolean) {
  await dbGlobal
    .update(scheduledTasks)
    .set({ enabled: enabled ? 1 : 0, updatedAt: now() })
    .where(eq(scheduledTasks.id, id));
  return getTaskById(id);
}

// ---- Execution logs ----

export async function listExecutions(opts: {
  taskId?: string;
  status?: string;
  page?: number;
  pageSize?: number;
}) {
  const page = opts.page ?? 1;
  const pageSize = opts.pageSize ?? 20;
  const conditions = [];
  if (opts.taskId) conditions.push(eq(taskExecutionLogs.taskId, opts.taskId));
  if (opts.status) conditions.push(eq(taskExecutionLogs.status, opts.status));
  const where = conditions.length > 0 ? and(...conditions) : undefined;

  const [rows, countResult] = await Promise.all([
    dbGlobal
      .select()
      .from(taskExecutionLogs)
      .where(where)
      .orderBy(desc(taskExecutionLogs.startedAt))
      .limit(pageSize)
      .offset((page - 1) * pageSize),
    dbGlobal
      .select({ count: sql<number>`count(*)` })
      .from(taskExecutionLogs)
      .where(where),
  ]);

  return {
    list: rows,
    total: countResult[0]?.count ?? 0,
    page,
    pageSize,
  };
}

export async function getRecentExecutions(taskId: string, limit = 10) {
  return dbGlobal
    .select()
    .from(taskExecutionLogs)
    .where(eq(taskExecutionLogs.taskId, taskId))
    .orderBy(desc(taskExecutionLogs.startedAt))
    .limit(limit);
}

export async function createExecutionLog(input: {
  taskId: string;
  status: "running" | "success" | "failed";
  errorMessage?: string;
  resultSummary?: string;
}) {
  const id = uuid();
  await dbGlobal.insert(taskExecutionLogs).values({
    id,
    ...input,
    startedAt: now(),
  });
  return id;
}

export async function updateExecutionLog(
  id: string,
  input: { status: "success" | "failed"; errorMessage?: string; resultSummary?: string }
) {
  await dbGlobal
    .update(taskExecutionLogs)
    .set({ ...input, finishedAt: now() })
    .where(eq(taskExecutionLogs.id, id));
}

// Any log still marked "running" at startup belongs to a run that was cut off by a restart
export async function markStaleRunningTasks() {
  const result = await dbGlobal
    .update(taskExecutionLogs)
    .set({
      status: "failed",
      errorMessage: "Service restarted: task was interrupted",
      finishedAt: now(),
    })
    .where(eq(taskExecutionLogs.status, "running"));
  if (result.rowsAffected > 0) {
    logger.info("Marked %d stale running execution(s) as failed", result.rowsAffected);
  }
}

export async function cleanupOldLogs(retentionDays: number) {
  const cutoff = new Date(Date.now() - retentionDays * 24 * 60 * 60 * 1000);
  const result = await dbGlobal
    .delete(taskExecutionLogs)
    .where(
      and(
        // Only successful logs older than the retention window are removed; failures are kept
        eq(taskExecutionLogs.status, "success"),
        sql`${taskExecutionLogs.startedAt} < ${cutoff.getTime()}`
      )
    );
  if (result.rowsAffected > 0) {
    logger.info("Cleaned up %d old execution log(s)", result.rowsAffected);
  }
}

export async function getStats() {
  const [totalTasks, enabledTasks, recentExecutions] = await Promise.all([
    dbGlobal.select({ count: sql<number>`count(*)` }).from(scheduledTasks),
    dbGlobal
      .select({ count: sql<number>`count(*)` })
      .from(scheduledTasks)
      .where(eq(scheduledTasks.enabled, 1)),
    dbGlobal
      .select({ count: sql<number>`count(*)` })
      .from(taskExecutionLogs)
      .where(
        sql`${taskExecutionLogs.startedAt} > ${Date.now() - 24 * 60 * 60 * 1000}`
      ),
  ]);

  return {
    totalTasks: totalTasks[0]?.count ?? 0,
    enabledTasks: enabledTasks[0]?.count ?? 0,
    last24hExecutions: recentExecutions[0]?.count ?? 0,
  };
}
```

- [ ] **Step 2: Commit**

```bash
git add server/service/scheduler/index.ts
git commit -m "feat: add scheduler service layer with task/execution CRUD"
```

---

### Task 5: Create Task Registry

**Files:**
- Create: `server/scheduler/registry.ts`

- [ ] **Step 1: Write the Task Registry**

```typescript
// server/scheduler/registry.ts
import log4js from "logger";

const logger = log4js.getLogger("SCHEDULER");

export type TaskHandler = (
  payload?: Record<string, unknown>
) => Promise<{ success: boolean; message: string }>;

// Maps a task name (as stored in scheduled_tasks.function_name) to its handler
const registry = new Map<string, TaskHandler>();

export function registerTask(name: string, handler: TaskHandler): void {
  if (registry.has(name)) {
    logger.warn("Task '%s' is already registered, overwriting", name);
  }
  registry.set(name, handler);
  logger.info("Registered task: %s", name);
}

export function hasTask(name: string): boolean {
  return registry.has(name);
}

export function listRegisteredTasks(): string[] {
  return Array.from(registry.keys());
}

export function getTaskHandler(name: string): TaskHandler | undefined {
  return registry.get(name);
}
```

- [ ] **Step 2: Commit**

```bash
git add server/scheduler/registry.ts
git commit -m "feat: add Task Registry for named function handlers"
```

---

### Task 6: Create Executor Pool

**Files:**
- Create: `server/scheduler/executor-pool.ts`

- [ ] **Step 1: Write the Executor Pool**

```typescript
// server/scheduler/executor-pool.ts
import log4js from "logger";

const logger = log4js.getLogger("SCHEDULER");

type QueuedTask = {
  run: () => void;
};

export class ExecutorPool {
  private running = 0;
  private queue: QueuedTask[] = [];
  private maxConcurrency: number;

  constructor(maxConcurrency = 5) {
    this.maxConcurrency = maxConcurrency;
  }

  get activeCount(): number {
    return this.running;
  }

  get queuedCount(): number {
    return this.queue.length;
  }

  async execute(fn: () => Promise<void>): Promise<void> {
    if (this.running >= this.maxConcurrency) {
      logger.info(
        "Pool full (%d/%d), queuing task (%d in queue)",
        this.running,
        this.maxConcurrency,
        this.queue.length + 1
      );
      // Wait for a finishing task to hand its slot over; `running` already counts that slot
      await new Promise<void>((resolve) => {
        this.queue.push({ run: resolve });
      });
    } else {
      this.running++;
    }

    try {
      await fn();
    } finally {
      const next = this.queue.shift();
      if (next) {
        // Transfer the slot directly so a new caller cannot slip in and exceed the limit
        next.run();
      } else {
        this.running--;
      }
    }
  }
}
```

- [ ] **Step 2: Commit**

```bash
git add server/scheduler/executor-pool.ts
git commit -m "feat: add Executor Pool with concurrency control"
```

---

### Task 7: Create Scheduler Engine

**Files:**
- Create: `server/scheduler/engine.ts`

- [ ] **Step 1: Write the Scheduler Engine**

```typescript
// server/scheduler/engine.ts
import { Cron } from "croner";
import log4js from "logger";
import { ExecutorPool } from "./executor-pool";
import { getTaskHandler, hasTask } from "./registry";
import {
  listTasks,
  getTaskById,
  getRecentExecutions,
  createExecutionLog,
  updateExecutionLog,
  markStaleRunningTasks,
  cleanupOldLogs,
} from "../service/scheduler";

const logger = log4js.getLogger("SCHEDULER");

interface CronJobEntry {
  taskId: string;
  cron: Cron;
}

interface TaskRow {
  id: string;
  name: string;
  cronExpression: string;
  type: string;
  functionName?: string | null;
  functionPayload?: string | null;
  httpMethod?: string | null;
  httpUrl?: string | null;
  httpHeaders?: string | null;
  httpBody?: string | null;
  catchUp: number;
  enabled: number;
  maxRetries: number;
  retryDelaySeconds: number;
  timeoutSeconds: number;
}

let pool: ExecutorPool;

export function getExecutorPool(): ExecutorPool {
  return pool;
}

// Active cron jobs keyed by task id
const jobs = new Map<string, CronJobEntry>();

function parsePayload(payload?: string | null): Record<string, unknown> | undefined {
  if (!payload) return undefined;
  try {
    return JSON.parse(payload);
  } catch {
    return undefined;
  }
}

async function executeFunctionTask(task: TaskRow): Promise<void> {
  if (!task.functionName) {
    logger.error("[task:%s] function type but no functionName", task.id);
    return;
  }
  if (!hasTask(task.functionName)) {
    logger.error("[task:%s] function '%s' not registered", task.id, task.functionName);
    return;
  }

  const handler = getTaskHandler(task.functionName)!;
  const payload = parsePayload(task.functionPayload);

  const logId = await createExecutionLog({ taskId: task.id, status: "running" });
  // One timeout covers all attempts, including the delays between retries
  const timeoutSignal = AbortSignal.timeout(task.timeoutSeconds * 1000);

  const executeWithRetries = async (): Promise<{ success: boolean; message: string }> => {
    let lastError: Error | undefined;
    for (let attempt = 0; attempt <= task.maxRetries; attempt++) {
      // Stop retrying once the overall timeout has fired
      if (timeoutSignal.aborted) break;
      try {
        if (attempt > 0) {
          logger.info(
            "[task:%s] retry %d/%d after %ds",
            task.id,
            attempt,
            task.maxRetries,
            task.retryDelaySeconds
          );
          await new Promise<void>((resolve) => {
            setTimeout(resolve, task.retryDelaySeconds * 1000);
          });
        }
        return await handler(payload);
      } catch (err) {
        lastError = err instanceof Error ? err : new Error(String(err));
        logger.error("[task:%s] attempt %d failed: %s", task.id, attempt, lastError.message);
      }
    }
    throw lastError ?? new Error("max retries exceeded");
  };

  try {
    // The timeout only stops waiting; a handler that is already running is not cancelled
    const result = await new Promise<{ success: boolean; message: string }>(
      (resolve, reject) => {
        timeoutSignal.addEventListener(
          "abort",
          () => {
            reject(new Error(`Task timed out after ${task.timeoutSeconds}s`));
          },
          { once: true }
        );
        executeWithRetries().then(resolve).catch(reject);
      }
    );
    // A handler that resolves with success=false is recorded as a failure (without retrying)
    if (!result.success) {
      throw new Error(result.message);
    }
    await updateExecutionLog(logId, {
      status: "success",
      resultSummary: result.message,
    });
    logger.info("[task:%s] completed: %s", task.id, result.message);
  } catch (err) {
    const message = err instanceof Error ? err.message : String(err);
    await updateExecutionLog(logId, { status: "failed", errorMessage: message });
    logger.error("[task:%s] failed: %s", task.id, message);
  }
}

async function executeHttpTask(task: TaskRow): Promise<void> {
  if (!task.httpUrl) {
    logger.error("[task:%s] http type but no httpUrl", task.id);
    return;
  }

  const logId = await createExecutionLog({ taskId: task.id, status: "running" });

  try {
    const headers: Record<string, string> = task.httpHeaders
      ? JSON.parse(task.httpHeaders)
      : {};
    const method = (task.httpMethod ?? "GET").toUpperCase();
    // fetch rejects GET/HEAD requests that carry a body, so drop it for those methods
    const body =
      method === "GET" || method === "HEAD" ? undefined : task.httpBody ?? undefined;

    const response = await fetch(task.httpUrl, {
      method,
      headers: { "Content-Type": "application/json", ...headers },
      body,
      signal: AbortSignal.timeout(task.timeoutSeconds * 1000),
    });

    const summary = `HTTP ${response.status} ${response.statusText}`;
    if (!response.ok) {
      await updateExecutionLog(logId, {
        status: "failed",
        errorMessage: summary,
        resultSummary: summary,
      });
      logger.error("[task:%s] %s", task.id, summary);
    } else {
      await updateExecutionLog(logId, { status: "success", resultSummary: summary });
      logger.info("[task:%s] %s", task.id, summary);
    }
  } catch (err) {
    const message = err instanceof Error ? err.message : String(err);
    await updateExecutionLog(logId, { status: "failed", errorMessage: message });
    logger.error("[task:%s] HTTP request failed: %s", task.id, message);
  }
}

async function executeTask(task: TaskRow): Promise<void> {
  logger.info("[task:%s] executing '%s'", task.id, task.name);
  if (task.type === "function") {
    await executeFunctionTask(task);
  } else if (task.type === "http") {
    await executeHttpTask(task);
  } else {
    logger.error("[task:%s] unknown type: %s", task.id, task.type);
  }
}

function scheduleTask(task: TaskRow): void {
  const cron = new Cron(task.cronExpression, async () => {
    await pool.execute(() => executeTask(task));
  });
  jobs.set(task.id, { taskId: task.id, cron });
  logger.info(
    "[task:%s] scheduled '%s' cron='%s' next=%s",
    task.id,
    task.name,
    task.cronExpression,
    cron.nextRun()?.toISOString() ?? "none"
  );
}

function unscheduleTask(taskId: string): void {
  const entry = jobs.get(taskId);
  if (entry) {
    entry.cron.stop();
    jobs.delete(taskId);
    logger.info("[task:%s] unscheduled", taskId);
  }
}

// ---- Public API ----

export async function start(maxConcurrency: number, logRetentionDays: number): Promise<void> {
  pool = new ExecutorPool(maxConcurrency);
  logger.info("Scheduler starting (max concurrency: %d)", maxConcurrency);

  // Mark stale running tasks and clean old logs
  await markStaleRunningTasks();
  await cleanupOldLogs(logRetentionDays);

  // Load all enabled tasks
  const { list: allTasks } = await listTasks({ pageSize: 9999 });
  for (const task of allTasks) {
    if (!task.enabled) continue;
    try {
      scheduleTask(task as TaskRow);

      // Catch-up: if catchUp=1 and the task missed a scheduled run, execute once
      if (task.catchUp) {
        try {
          // croner's previousRun() only reports runs triggered by that Cron instance, so a
          // freshly created instance returns null. Use the last logged execution instead:
          // the first scheduled time after it is the run that may have been missed.
          const [lastExecution] = await getRecentExecutions(task.id, 1);
          const missed = lastExecution
            ? new Cron(task.cronExpression).nextRun(lastExecution.startedAt)
            : null;
          if (missed && missed.getTime() < Date.now()) {
            logger.info(
              "[task:%s] catch-up: missed run at %s, executing now",
              task.id,
              missed.toISOString()
            );
            pool.execute(() => executeTask(task as TaskRow)).catch((err) => {
              logger.error("[task:%s] catch-up run failed: %s", task.id, String(err));
            });
          }
        } catch (err) {
          logger.warn("[task:%s] catch-up check failed: %s", task.id, String(err));
        }
      }
    } catch (err) {
      const message = err instanceof Error ? err.message : String(err);
      logger.error("[task:%s] failed to schedule: %s", task.id, message);
    }
  }

  logger.info("Scheduler started with %d active job(s)", jobs.size);
}

export async function stop(): Promise<void> {
  for (const [id] of jobs) {
    unscheduleTask(id);
  }
  logger.info("Scheduler stopped");
}

export function addTask(taskId: string): void {
  unscheduleTask(taskId);
  getTaskById(taskId)
    .then((task) => {
      if (task && task.enabled) {
        scheduleTask(task as TaskRow);
      }
    })
    .catch((err) => {
      // Without this, a DB error or invalid cron expression becomes an unhandled rejection
      logger.error("[task:%s] failed to (re)schedule: %s", taskId, String(err));
    });
}

export function removeTask(taskId: string): void {
  unscheduleTask(taskId);
}

export function reloadTask(taskId: string): void {
  addTask(taskId);
}

export async function triggerTask(taskId: string): Promise<void> {
  const task = await getTaskById(taskId);
  if (!task) {
    throw new Error(`Task ${taskId} not found`);
  }
  await pool.execute(() => executeTask(task as TaskRow));
}

export function getJobCount(): number {
  return jobs.size;
}
```

- [ ] **Step 2: Commit**

```bash
git add server/scheduler/engine.ts
git commit -m "feat: add Scheduler Engine with croner integration and hot-reload support"
```

---

### Task 8: Create Nitro plugin to wire scheduler lifecycle

**Files:**
- Create: `server/plugins/03.scheduler.ts`

- [ ] **Step 1: Write the Nitro plugin**

```typescript
// server/plugins/03.scheduler.ts
import { start, stop } from "../scheduler/engine";

// Fall back to the defaults when the env vars are unset or not numeric
const MAX_CONCURRENCY = Number(process.env.SCHEDULER_MAX_CONCURRENCY) || 5;
const LOG_RETENTION_DAYS = Number(process.env.SCHEDULER_LOG_RETENTION_DAYS) || 30;

if (import.meta.dev) {
  console.log("plugin: 03.scheduler");
}

export default defineNitroPlugin(async (nitroApp) => {
  await start(MAX_CONCURRENCY, LOG_RETENTION_DAYS);

  // Stop all cron jobs when the server shuts down
  nitroApp.hooks.hook("close", async () => {
    await stop();
  });
});
```

- [ ] **Step 2: Add env vars to .env.example**

```bash
echo '
SCHEDULER_MAX_CONCURRENCY=5
SCHEDULER_LOG_RETENTION_DAYS=30' >> .env.example
```

- [ ] **Step 3: Commit**

```bash
git add server/plugins/03.scheduler.ts .env.example
git commit -m "feat: wire scheduler lifecycle into Nitro plugin"
```

---

### Task 9: API - List and Create tasks

**Files:**
- Create: `server/api/scheduler/tasks/index.get.ts`
- Create: `server/api/scheduler/tasks/index.post.ts`

- [ ] **Step 1: Write GET /api/scheduler/tasks (list)**

```typescript
// server/api/scheduler/tasks/index.get.ts
import { listTasks } from "../../../service/scheduler";
import { listRegisteredTasks } from "../../../scheduler/registry";

export default defineWrappedResponseHandler(async (event) => {
  const query = getQuery(event);
  const page = query.page ? Number(query.page) : 1;
  const pageSize = query.pageSize ? Number(query.pageSize) : 20;

  const result = await listTasks({
    page,
    pageSize,
    type: query.type as string | undefined,
    enabled: query.enabled !== undefined ? Number(query.enabled) : undefined,
  });

  // Include the registered function names so the UI can offer them when creating a task
  return R.success({
    ...result,
    registeredFunctions: listRegisteredTasks(),
  });
});
```

- [ ] **Step 2: Write zod schema for task creation and POST /api/scheduler/tasks**

```typescript
// server/api/scheduler/tasks/index.post.ts
import { z } from "zod";
import { createTask } from "../../../service/scheduler";
import { addTask } from "../../../scheduler/engine";
import { Cron } from "croner";

const createTaskSchema = z.object({
  name: z.string().min(1),
  cronExpression: z.string().min(1),
  type: z.enum(["function", "http"]),
  functionName: z.string().optional(),
  functionPayload: z.string().optional(),
  httpMethod: z.enum(["GET", "POST", "PUT", "DELETE"]).optional(),
  httpUrl: z.string().url().optional(),
  httpHeaders: z.string().optional(),
  httpBody: z.string().optional(),
  catchUp: z.union([z.boolean(), z.number()]).optional(),
  enabled: z.union([z.boolean(), z.number()]).optional(),
  maxRetries: z.number().int().min(0).optional(),
  retryDelaySeconds: z.number().int().min(1).optional(),
  timeoutSeconds: z.number().int().min(1).optional(),
});

export default defineWrappedResponseHandler(async (event) => {
  const body = await readBody(event);
  const parsed = createTaskSchema.safeParse(body);
  if (!parsed.success) {
    return R.throwError(422, "Validation failed", parsed.error.issues);
  }

  // Validate cron expression
  try {
    new Cron(parsed.data.cronExpression);
  } catch {
    return R.throwError(422, "Invalid cron expression", null);
  }

  // function type must specify functionName
  if (parsed.data.type === "function" && !parsed.data.functionName) {
    return R.throwError(422, "functionName required for function type", null);
  }

  // http type must specify httpUrl
  if (parsed.data.type === "http" && !parsed.data.httpUrl) {
    return R.throwError(422, "httpUrl required for http type", null);
  }

  // Normalize boolean/number flags to the 0/1 integers stored in SQLite
  const task = await createTask({
    ...parsed.data,
    catchUp: parsed.data.catchUp ? 1 : 0,
    enabled: parsed.data.enabled !== undefined ? (parsed.data.enabled ? 1 : 0) : 1,
  });

  if (task) {
    addTask(task.id);
  }

  return R.success(task);
});
```

- [ ] **Step 3: Commit**

```bash
git add server/api/scheduler/tasks/index.get.ts server/api/scheduler/tasks/index.post.ts
git commit -m "feat: add API endpoints to list and create scheduled tasks"
```

---

### Task 10: API - Get, Update, Delete task

**Files:**
- Create: `server/api/scheduler/tasks/[id].get.ts`
- Create: `server/api/scheduler/tasks/[id].put.ts`
- Create: `server/api/scheduler/tasks/[id].delete.ts`

- [ ] **Step 1: Write GET /api/scheduler/tasks/:id**

```typescript
// server/api/scheduler/tasks/[id].get.ts
import { getTaskById, getRecentExecutions } from "../../../service/scheduler";

export default defineWrappedResponseHandler(async (event) => {
  const id = getRouterParam(event, "id");
  if (!id) return R.throwError(400, "Missing id", null);

  const task = await getTaskById(id);
  if (!task) return R.throwError(404, "Task not found", null);

  const recentExecutions = await getRecentExecutions(id, 20);
  return R.success({ task, recentExecutions });
});
```

- [ ] **Step 2: Write PUT /api/scheduler/tasks/:id**

```typescript
// server/api/scheduler/tasks/[id].put.ts
import { z } from "zod";
import { updateTask, getTaskById } from "../../../service/scheduler";
import { reloadTask } from "../../../scheduler/engine";
import { Cron } from "croner";

const updateTaskSchema = z.object({
  name: z.string().min(1).optional(),
  cronExpression: z.string().min(1).optional(),
  type: z.enum(["function", "http"]).optional(),
  functionName: z.string().optional().nullable(),
  functionPayload: z.string().optional().nullable(),
  httpMethod: z.enum(["GET", "POST", "PUT", "DELETE"]).optional().nullable(),
  // same URL check as the create schema
  httpUrl: z.string().url().optional().nullable(),
  httpHeaders: z.string().optional().nullable(),
  httpBody: z.string().optional().nullable(),
  catchUp: z.union([z.boolean(), z.number()]).optional(),
  enabled: z.union([z.boolean(), z.number()]).optional(),
  maxRetries: z.number().int().min(0).optional(),
  retryDelaySeconds: z.number().int().min(1).optional(),
  timeoutSeconds: z.number().int().min(1).optional(),
});

export default defineWrappedResponseHandler(async (event) => {
  const id = getRouterParam(event, "id");
  if (!id) return R.throwError(400, "Missing id", null);

  const body = await readBody(event);
  const parsed = updateTaskSchema.safeParse(body);
  if (!parsed.success) {
    return R.throwError(422, "Validation failed", parsed.error.issues);
  }

  const existing = await getTaskById(id);
  if (!existing) return R.throwError(404, "Task not found", null);

  // Validate cron if provided
  const cronExpr = parsed.data.cronExpression ?? existing.cronExpression;
  try {
    new Cron(cronExpr);
  } catch {
    return R.throwError(422, "Invalid cron expression", null);
  }

  // Normalize boolean/number flags to the 0/1 integers stored in SQLite
  const updateData: Record<string, unknown> = { ...parsed.data };
  if (parsed.data.catchUp !== undefined) {
    updateData.catchUp = parsed.data.catchUp ? 1 : 0;
  }
  if (parsed.data.enabled !== undefined) {
    updateData.enabled = parsed.data.enabled ? 1 : 0;
  }

  const task = await updateTask(id, updateData as Parameters<typeof updateTask>[1]);

  if (task) {
    reloadTask(task.id);
  }

  return R.success(task);
});
```

- [ ] **Step 3: Write DELETE /api/scheduler/tasks/:id**

```typescript
// server/api/scheduler/tasks/[id].delete.ts
import { deleteTask } from "../../../service/scheduler";
import { removeTask } from "../../../scheduler/engine";

export default defineWrappedResponseHandler(async (event) => {
  const id = getRouterParam(event, "id");
  if (!id) return R.throwError(400, "Missing id", null);

  // Unschedule first so the job cannot fire while the row is being deleted
  removeTask(id);
  await deleteTask(id);
  return R.success(null);
});
```

- [ ] **Step 4: Commit**

```bash
git add server/api/scheduler/tasks/\[id\].get.ts server/api/scheduler/tasks/\[id\].put.ts server/api/scheduler/tasks/\[id\].delete.ts
git commit -m "feat: add API endpoints to get, update, and delete tasks"
```

---

### Task 11: API - Trigger and Toggle

**Files:**
- Create: `server/api/scheduler/tasks/[id]/trigger.post.ts`
- Create: `server/api/scheduler/tasks/[id]/toggle.post.ts`

- [ ] **Step 1: Write POST /api/scheduler/tasks/:id/trigger**

```typescript
// server/api/scheduler/tasks/[id]/trigger.post.ts
import { getTaskById } from "../../../../service/scheduler";
import { triggerTask } from "../../../../scheduler/engine";

export default defineWrappedResponseHandler(async (event) => {
  const id = getRouterParam(event, "id");
  if (!id) return R.throwError(400, "Missing id", null);

  // Check existence up front so an unknown id is not reported as triggered
  const task = await getTaskById(id);
  if (!task) return R.throwError(404, "Task not found", null);

  // Fire and forget: the response returns immediately, execution continues asynchronously
  triggerTask(id).catch(() => {});
  return R.success({ triggered: true });
});
```

- [ ] **Step 2: Write POST /api/scheduler/tasks/:id/toggle**

```typescript
// server/api/scheduler/tasks/[id]/toggle.post.ts
import { toggleTask, getTaskById } from "../../../../service/scheduler";
import { reloadTask, removeTask } from "../../../../scheduler/engine";

export default defineWrappedResponseHandler(async (event) => {
  const id = getRouterParam(event, "id");
  if (!id) return R.throwError(400, "Missing id", null);

  const body = await readBody<{ enabled: boolean }>(event);
  const task = await toggleTask(id, body.enabled);
  if (!task) return R.throwError(404, "Task not found", null);

  // Keep the in-memory schedule in sync with the stored flag
  if (task.enabled) {
    reloadTask(id);
  } else {
    removeTask(id);
  }

  return R.success(task);
});
```

- [ ] **Step 3: Commit**

```bash
git add server/api/scheduler/tasks/\[id\]/trigger.post.ts server/api/scheduler/tasks/\[id\]/toggle.post.ts
git commit -m "feat: add API endpoints to trigger and toggle tasks"
```

---

### Task 12: API - Execution logs and Stats

**Files:**
- Create: `server/api/scheduler/executions.get.ts`
- Create: `server/api/scheduler/stats.get.ts`

- [ ] **Step 1: Write GET /api/scheduler/executions**

```typescript
// server/api/scheduler/executions.get.ts
import { listExecutions } from "../../service/scheduler";

export default defineWrappedResponseHandler(async (event) => {
  const query = getQuery(event);
  const page = query.page ? Number(query.page) : 1;
  const pageSize = query.pageSize ? Number(query.pageSize) : 20;

  const result = await listExecutions({
    page,
    pageSize,
    taskId: query.taskId as string | undefined,
    status: query.status as string | undefined,
  });

  return R.success(result);
});
```

- [ ] **Step 2: Write GET /api/scheduler/stats**

```typescript
// server/api/scheduler/stats.get.ts
import { getStats } from "../../service/scheduler";
import { getJobCount } from "../../scheduler/engine";

export default defineWrappedResponseHandler(async () => {
  const stats = await getStats();
  return R.success({
    ...stats,
    // number of cron jobs currently scheduled in memory
    activeJobs: getJobCount(),
  });
});
```

- [ ] **Step 3: Commit**

```bash
git add server/api/scheduler/executions.get.ts server/api/scheduler/stats.get.ts
git commit -m "feat: add API endpoints for execution logs and stats"
```

---

### Task 13: Frontend - Scheduler admin list page

**Files:**
- Create: `app/pages/admin/scheduler/index.vue`

- [ ] **Step 1: Write the task list page**

```vue
```

- [ ] **Step 2: Commit**

```bash
git add app/pages/admin/scheduler/index.vue
git commit -m "feat: add scheduler admin list page with task table"
```

---

### Task 14: Frontend - Create/Edit task modal component

**Files:**
- Create: `app/components/SchedulerTaskModal.vue`

- [ ] **Step 1: Write the task modal component**

```vue
```

- [ ] **Step 2: Commit**

```bash
git add app/components/SchedulerTaskModal.vue
git commit -m "feat: add scheduler task create/edit modal component"
```

---

### Task 15: Frontend - Task detail page

**Files:**
- Create: `app/pages/admin/scheduler/[id].vue`

- [ ] **Step 1: Write the task detail page**

```vue
```

- [ ] **Step 2: Commit**

```bash
git add app/pages/admin/scheduler/\[id\].vue
git commit -m "feat: add scheduler task detail page with execution history"
```

---

### Task 16: Smoke test - verify the feature end-to-end

- [ ] **Step 1: Start dev server**

```bash
bun run dev
```

- [ ] **Step 2: Verify API endpoints**

```bash
# List tasks (should be empty initially)
curl -s http://localhost:3399/api/scheduler/tasks | head -c 200

# Create a task
curl -s -X POST http://localhost:3399/api/scheduler/tasks \
  -H "Content-Type: application/json" \
  -d '{"name":"test","cronExpression":"0 9 * * *","type":"http","httpUrl":"https://httpbin.org/get","httpMethod":"GET"}' | head -c 200

# List tasks (should have 1)
curl -s http://localhost:3399/api/scheduler/tasks | head -c 200

# Get stats
curl -s http://localhost:3399/api/scheduler/stats | head -c 200
```

- [ ] **Step 3: Verify frontend**

Open `http://localhost:3399/admin/scheduler` in the browser and confirm:
- Stats bar shows correct numbers
- Task table shows the created task
- Create button opens modal
- Edit/Delete/Trigger buttons work
- Task detail page shows configuration and execution history

---

### Task 17: Example system task - daily log cleanup

**Files:**
- Create: `server/scheduler/tasks/cleanup-logs.ts`

- [ ] **Step 1: Write an example system task**

```typescript
// server/scheduler/tasks/cleanup-logs.ts
import { registerTask } from "../registry";
import log4js from "logger";

const logger = log4js.getLogger("SCHEDULER");

// Registers on import; the handler is a placeholder that only logs what it would do
registerTask("cleanup-logs", async (payload) => {
  const days = (payload?.days as number) ?? 30;
  logger.info("Log cleanup placeholder: would remove logs older than %d days", days);
  return { success: true, message: `Cleaned logs older than ${days} days` };
});
```

- [ ] **Step 2: Import in the scheduler plugin**

In `server/plugins/03.scheduler.ts`, add at the top:

```typescript
import "../scheduler/tasks/cleanup-logs";
```

- [ ] **Step 3: Commit**

```bash
git add server/scheduler/tasks/cleanup-logs.ts server/plugins/03.scheduler.ts
git commit -m "feat: add example cleanup-logs system task"
```

---

### Task 18: Final verification and cleanup

- [ ] **Step 1: Verify all API endpoints return correct responses**
- [ ] **Step 2: Verify the scheduler plugin loads without errors in dev console**
- [ ] **Step 3: Verify the admin page renders correctly**
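
The concurrency limit from Task 6 is hard to observe through the API alone, so a standalone check may be useful during final verification. The sketch below is illustrative only: `SketchPool` and `measurePeakConcurrency` are hypothetical names that are not part of the plan's files, and `SketchPool` is a reduced stand-in for the Executor Pool (no logging) that assumes the same idea of handing a finished task's slot to the next queued task.

```typescript
// Hypothetical stand-in for the Executor Pool: at most `maxConcurrency` jobs run at once.
class SketchPool {
  private running = 0;
  private waiters: Array<() => void> = [];
  private maxConcurrency: number;

  constructor(maxConcurrency: number) {
    this.maxConcurrency = maxConcurrency;
  }

  async execute(fn: () => Promise<void>): Promise<void> {
    if (this.running >= this.maxConcurrency) {
      // Wait until a finishing job hands its slot over
      await new Promise<void>((resolve) => {
        this.waiters.push(resolve);
      });
    } else {
      this.running++;
    }
    try {
      await fn();
    } finally {
      const next = this.waiters.shift();
      if (next) {
        next(); // slot is transferred, so `running` stays unchanged
      } else {
        this.running--;
      }
    }
  }
}

// Runs `jobs` short jobs through a pool and reports the highest number seen running at once.
async function measurePeakConcurrency(limit: number, jobs: number): Promise<number> {
  const pool = new SketchPool(limit);
  let active = 0;
  let peak = 0;

  const job = async (): Promise<void> => {
    active++;
    peak = Math.max(peak, active);
    await new Promise<void>((resolve) => setTimeout(resolve, 10));
    active--;
  };

  await Promise.all(Array.from({ length: jobs }, () => pool.execute(job)));
  return peak;
}
```

Calling `measurePeakConcurrency(2, 6)` should resolve to the configured limit; a larger value would indicate that the pool admitted more jobs than allowed. The same measuring approach could be pointed at the real `ExecutorPool` once it exists.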