From 741d152e14357bf266446181e6604ad2836d81c4 Mon Sep 17 00:00:00 2001 From: npmrun <1549469775@qq.com> Date: Thu, 14 May 2026 14:14:09 +0800 Subject: [PATCH] add scheduled tasks implementation plan (18 tasks) Co-Authored-By: Claude Opus 4.7 --- .../plans/2026-05-14-scheduled-tasks.md | 1766 ++++++++++++++++++++ 1 file changed, 1766 insertions(+) create mode 100644 docs/superpowers/plans/2026-05-14-scheduled-tasks.md diff --git a/docs/superpowers/plans/2026-05-14-scheduled-tasks.md b/docs/superpowers/plans/2026-05-14-scheduled-tasks.md new file mode 100644 index 0000000..04abe47 --- /dev/null +++ b/docs/superpowers/plans/2026-05-14-scheduled-tasks.md @@ -0,0 +1,1766 @@ +# Scheduled Tasks Feature Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Add cron-based scheduled task functionality with REST API control and Nuxt UI admin page, supporting both system-registered functions and HTTP requests. + +**Architecture:** A Nitro plugin (`03.scheduler.ts`) initializes the Scheduler Engine on server start. The Engine uses `croner` for cron parsing/triggering, an Executor Pool for concurrency control, and a Task Registry for named function handlers. Tasks are persisted via Drizzle ORM to SQLite. API endpoints handle CRUD and trigger the Engine for hot-reload. Frontend admin pages use Nuxt UI for task/execution management. + +**Tech Stack:** croner, Drizzle ORM + SQLite, Nitro plugins, Nuxt UI, Zod validation, log4js + +--- + +### Task 1: Install croner dependency + +**Files:** +- Modify: `package.json` + +- [ ] **Step 1: Add croner to project** + +```bash +bun add croner +``` + +- [ ] **Step 2: Verify installation** + +Check `package.json` now includes `"croner"` in dependencies. Check `bun.lock` is updated. + +- [ ] **Step 3: Commit** + +```bash +git add package.json bun.lock +git commit -m "chore: add croner dependency for scheduled tasks" +``` + +--- + +### Task 2: Create Drizzle schema for scheduler tables + +**Files:** +- Create: `packages/drizzle-pkg/lib/schema/scheduler.ts` + +- [ ] **Step 1: Write the scheduler schema** + +```typescript +// packages/drizzle-pkg/lib/schema/scheduler.ts +import { sqliteTable, text, integer } from "drizzle-orm/sqlite-core"; + +export const scheduledTasks = sqliteTable("scheduled_tasks", { + id: text("id").primaryKey(), + name: text("name").notNull(), + cronExpression: text("cron_expression").notNull(), + type: text("type").notNull(), // "function" | "http" + + // function type fields + functionName: text("function_name"), + functionPayload: text("function_payload"), + + // http type fields + httpMethod: text("http_method"), + httpUrl: text("http_url"), + httpHeaders: text("http_headers"), + httpBody: text("http_body"), + + catchUp: integer("catch_up").default(0).notNull(), + enabled: integer("enabled").default(1).notNull(), + maxRetries: integer("max_retries").default(0).notNull(), + retryDelaySeconds: integer("retry_delay_seconds").default(60).notNull(), + timeoutSeconds: integer("timeout_seconds").default(300).notNull(), + + createdAt: integer("created_at", { mode: "timestamp_ms" }).defaultNow().notNull(), + updatedAt: integer("updated_at", { mode: "timestamp_ms" }) + .defaultNow() + .$onUpdate(() => new Date()) + .notNull(), +}); + +export const taskExecutionLogs = sqliteTable("task_execution_logs", { + id: text("id").primaryKey(), + taskId: text("task_id") + .notNull() + .references(() => scheduledTasks.id), + status: text("status").notNull(), // "running" | "success" | "failed" + startedAt: integer("started_at", { mode: "timestamp_ms" }).defaultNow().notNull(), + finishedAt: integer("finished_at", { mode: "timestamp_ms" }), + errorMessage: text("error_message"), + resultSummary: text("result_summary"), +}); +``` + +- [ ] **Step 2: Commit** + +```bash +git add packages/drizzle-pkg/lib/schema/scheduler.ts +git commit -m "feat: add scheduler Drizzle schema (scheduled_tasks, task_execution_logs)" +``` + +--- + +### Task 3: Generate and run DB migration + +**Files:** +- Create: migration SQL file (auto-generated by drizzle-kit) + +- [ ] **Step 1: Generate migration** + +```bash +bun run db:generate +``` + +- [ ] **Step 2: Verify the generated migration SQL** + +Check the new migration file in `packages/drizzle-pkg/migrations/` contains `CREATE TABLE scheduled_tasks` and `CREATE TABLE task_execution_logs`. + +- [ ] **Step 3: Run migration** + +```bash +bun run db:migrate +``` + +- [ ] **Step 4: Commit** + +```bash +git add packages/drizzle-pkg/migrations/ +git commit -m "feat: add scheduler tables migration" +``` + +--- + +### Task 4: Create scheduler service layer + +**Files:** +- Create: `server/service/scheduler/index.ts` + +- [ ] **Step 1: Write the service layer** + +```typescript +// server/service/scheduler/index.ts +import { dbGlobal } from "drizzle-pkg/lib/db"; +import { scheduledTasks, taskExecutionLogs } from "drizzle-pkg/lib/schema/scheduler"; +import { eq, desc, and, sql } from "drizzle-orm"; +import log4js from "logger"; + +const logger = log4js.getLogger("SCHEDULER"); + +export interface CreateTaskInput { + name: string; + cronExpression: string; + type: "function" | "http"; + functionName?: string; + functionPayload?: string; + httpMethod?: string; + httpUrl?: string; + httpHeaders?: string; + httpBody?: string; + catchUp?: number; + enabled?: number; + maxRetries?: number; + retryDelaySeconds?: number; + timeoutSeconds?: number; +} + +export interface UpdateTaskInput extends Partial {} + +function uuid() { + return crypto.randomUUID(); +} + +function now() { + return new Date(); +} + +// ---- Task CRUD ---- + +export async function listTasks(opts: { + page?: number; + pageSize?: number; + type?: string; + enabled?: number; +}) { + const page = opts.page ?? 1; + const pageSize = opts.pageSize ?? 20; + const conditions = []; + + if (opts.type) conditions.push(eq(scheduledTasks.type, opts.type)); + if (opts.enabled !== undefined) conditions.push(eq(scheduledTasks.enabled, opts.enabled)); + + const where = conditions.length > 0 ? and(...conditions) : undefined; + + const [rows, countResult] = await Promise.all([ + dbGlobal + .select() + .from(scheduledTasks) + .where(where) + .orderBy(desc(scheduledTasks.createdAt)) + .limit(pageSize) + .offset((page - 1) * pageSize), + dbGlobal + .select({ count: sql`count(*)` }) + .from(scheduledTasks) + .where(where), + ]); + + return { + list: rows, + total: countResult[0]?.count ?? 0, + page, + pageSize, + }; +} + +export async function getTaskById(id: string) { + const rows = await dbGlobal + .select() + .from(scheduledTasks) + .where(eq(scheduledTasks.id, id)) + .limit(1); + return rows[0] ?? null; +} + +export async function createTask(input: CreateTaskInput) { + const id = uuid(); + const ts = now(); + await dbGlobal.insert(scheduledTasks).values({ + id, + ...input, + createdAt: ts, + updatedAt: ts, + }); + return getTaskById(id); +} + +export async function updateTask(id: string, input: UpdateTaskInput) { + await dbGlobal + .update(scheduledTasks) + .set({ ...input, updatedAt: now() }) + .where(eq(scheduledTasks.id, id)); + return getTaskById(id); +} + +export async function deleteTask(id: string) { + await dbGlobal.delete(scheduledTasks).where(eq(scheduledTasks.id, id)); +} + +export async function toggleTask(id: string, enabled: boolean) { + await dbGlobal + .update(scheduledTasks) + .set({ enabled: enabled ? 1 : 0, updatedAt: now() }) + .where(eq(scheduledTasks.id, id)); + return getTaskById(id); +} + +// ---- Execution logs ---- + +export async function listExecutions(opts: { + taskId?: string; + status?: string; + page?: number; + pageSize?: number; +}) { + const page = opts.page ?? 1; + const pageSize = opts.pageSize ?? 20; + const conditions = []; + + if (opts.taskId) conditions.push(eq(taskExecutionLogs.taskId, opts.taskId)); + if (opts.status) conditions.push(eq(taskExecutionLogs.status, opts.status)); + + const where = conditions.length > 0 ? and(...conditions) : undefined; + + const [rows, countResult] = await Promise.all([ + dbGlobal + .select() + .from(taskExecutionLogs) + .where(where) + .orderBy(desc(taskExecutionLogs.startedAt)) + .limit(pageSize) + .offset((page - 1) * pageSize), + dbGlobal + .select({ count: sql`count(*)` }) + .from(taskExecutionLogs) + .where(where), + ]); + + return { + list: rows, + total: countResult[0]?.count ?? 0, + page, + pageSize, + }; +} + +export async function getRecentExecutions(taskId: string, limit = 10) { + return dbGlobal + .select() + .from(taskExecutionLogs) + .where(eq(taskExecutionLogs.taskId, taskId)) + .orderBy(desc(taskExecutionLogs.startedAt)) + .limit(limit); +} + +export async function createExecutionLog(input: { + taskId: string; + status: "running" | "success" | "failed"; + errorMessage?: string; + resultSummary?: string; +}) { + const id = uuid(); + await dbGlobal.insert(taskExecutionLogs).values({ + id, + ...input, + startedAt: now(), + }); + return id; +} + +export async function updateExecutionLog( + id: string, + input: { status: "success" | "failed"; errorMessage?: string; resultSummary?: string } +) { + await dbGlobal + .update(taskExecutionLogs) + .set({ ...input, finishedAt: now() }) + .where(eq(taskExecutionLogs.id, id)); +} + +export async function markStaleRunningTasks() { + const result = await dbGlobal + .update(taskExecutionLogs) + .set({ + status: "failed", + errorMessage: "Service restarted — task was interrupted", + finishedAt: now(), + }) + .where(eq(taskExecutionLogs.status, "running")); + + if (result.rowsAffected > 0) { + logger.info("Marked %d stale running execution(s) as failed", result.rowsAffected); + } +} + +export async function cleanupOldLogs(retentionDays: number) { + const cutoff = new Date(Date.now() - retentionDays * 24 * 60 * 60 * 1000); + const result = await dbGlobal + .delete(taskExecutionLogs) + .where( + and( + eq(taskExecutionLogs.status, "success"), + // Only clean up successful logs older than retention + sql`${taskExecutionLogs.startedAt} < ${cutoff.getTime()}` + ) + ); + + if (result.rowsAffected > 0) { + logger.info("Cleaned up %d old execution log(s)", result.rowsAffected); + } +} + +export async function getStats() { + const [totalTasks, enabledTasks, recentExecutions] = await Promise.all([ + dbGlobal.select({ count: sql`count(*)` }).from(scheduledTasks), + dbGlobal + .select({ count: sql`count(*)` }) + .from(scheduledTasks) + .where(eq(scheduledTasks.enabled, 1)), + dbGlobal + .select({ count: sql`count(*)` }) + .from(taskExecutionLogs) + .where( + sql`${taskExecutionLogs.startedAt} > ${Date.now() - 24 * 60 * 60 * 1000}` + ), + ]); + + return { + totalTasks: totalTasks[0]?.count ?? 0, + enabledTasks: enabledTasks[0]?.count ?? 0, + last24hExecutions: recentExecutions[0]?.count ?? 0, + }; +} +``` + +- [ ] **Step 2: Commit** + +```bash +git add server/service/scheduler/index.ts +git commit -m "feat: add scheduler service layer with task/execution CRUD" +``` + +--- + +### Task 5: Create Task Registry + +**Files:** +- Create: `server/scheduler/registry.ts` + +- [ ] **Step 1: Write the Task Registry** + +```typescript +// server/scheduler/registry.ts +import log4js from "logger"; + +const logger = log4js.getLogger("SCHEDULER"); + +export type TaskHandler = ( + payload?: Record +) => Promise<{ success: boolean; message: string }>; + +const registry = new Map(); + +export function registerTask(name: string, handler: TaskHandler): void { + if (registry.has(name)) { + logger.warn("Task '%s' is already registered, overwriting", name); + } + registry.set(name, handler); + logger.info("Registered task: %s", name); +} + +export function hasTask(name: string): boolean { + return registry.has(name); +} + +export function listRegisteredTasks(): string[] { + return Array.from(registry.keys()); +} + +export function getTaskHandler(name: string): TaskHandler | undefined { + return registry.get(name); +} +``` + +- [ ] **Step 2: Commit** + +```bash +git add server/scheduler/registry.ts +git commit -m "feat: add Task Registry for named function handlers" +``` + +--- + +### Task 6: Create Executor Pool + +**Files:** +- Create: `server/scheduler/executor-pool.ts` + +- [ ] **Step 1: Write the Executor Pool** + +```typescript +// server/scheduler/executor-pool.ts +import log4js from "logger"; + +const logger = log4js.getLogger("SCHEDULER"); + +type QueuedTask = { + run: () => void; +}; + +export class ExecutorPool { + private running = 0; + private queue: QueuedTask[] = []; + private maxConcurrency: number; + + constructor(maxConcurrency = 5) { + this.maxConcurrency = maxConcurrency; + } + + get activeCount(): number { + return this.running; + } + + get queuedCount(): number { + return this.queue.length; + } + + async execute(fn: () => Promise): Promise { + if (this.running >= this.maxConcurrency) { + logger.info( + "Pool full (%d/%d), queuing task (%d in queue)", + this.running, + this.maxConcurrency, + this.queue.length + 1 + ); + await new Promise((resolve) => { + this.queue.push({ run: resolve }); + }); + } + + this.running++; + try { + await fn(); + } finally { + this.running--; + const next = this.queue.shift(); + if (next) { + next.run(); + } + } + } +} +``` + +- [ ] **Step 2: Commit** + +```bash +git add server/scheduler/executor-pool.ts +git commit -m "feat: add Executor Pool with concurrency control" +``` + +--- + +### Task 7: Create Scheduler Engine + +**Files:** +- Create: `server/scheduler/engine.ts` + +- [ ] **Step 1: Write the Scheduler Engine** + +```typescript +// server/scheduler/engine.ts +import { Cron } from "croner"; +import log4js from "logger"; +import { ExecutorPool } from "./executor-pool"; +import { getTaskHandler, hasTask } from "./registry"; +import { + listTasks, + getTaskById, + createExecutionLog, + updateExecutionLog, +} from "../service/scheduler"; + +const logger = log4js.getLogger("SCHEDULER"); + +interface CronJobEntry { + taskId: string; + cron: Cron; +} + +interface TaskRow { + id: string; + name: string; + cronExpression: string; + type: string; + functionName?: string | null; + functionPayload?: string | null; + httpMethod?: string | null; + httpUrl?: string | null; + httpHeaders?: string | null; + httpBody?: string | null; + catchUp: number; + enabled: number; + maxRetries: number; + retryDelaySeconds: number; + timeoutSeconds: number; +} + +let pool: ExecutorPool; + +export function getExecutorPool(): ExecutorPool { + return pool; +} + +const jobs = new Map(); + +function parsePayload(payload?: string | null): Record | undefined { + if (!payload) return undefined; + try { + return JSON.parse(payload); + } catch { + return undefined; + } +} + +async function executeFunctionTask(task: TaskRow): Promise { + if (!task.functionName) { + logger.error("[task:%s] function type but no functionName", task.id); + return; + } + + if (!hasTask(task.functionName)) { + logger.error("[task:%s] function '%s' not registered", task.id, task.functionName); + return; + } + + const handler = getTaskHandler(task.functionName)!; + const payload = parsePayload(task.functionPayload); + + const executeWithRetries = async (): Promise<{ success: boolean; message: string }> => { + let lastError: Error | undefined; + for (let attempt = 0; attempt <= task.maxRetries; attempt++) { + try { + if (attempt > 0) { + logger.info( + "[task:%s] retry %d/%d after %ds", + task.id, + attempt, + task.maxRetries, + task.retryDelaySeconds + ); + await new Promise((resolve) => { + const signal = AbortSignal.timeout(task.retryDelaySeconds * 1000); + signal.addEventListener("abort", resolve, { once: true }); + }); + } + return await handler(payload); + } catch (err) { + lastError = err instanceof Error ? err : new Error(String(err)); + logger.error("[task:%s] attempt %d failed: %s", task.id, attempt, lastError.message); + } + } + throw lastError ?? new Error("max retries exceeded"); + }; + + const logId = await createExecutionLog({ taskId: task.id, status: "running" }); + + try { + const abortController = new AbortController(); + const timeoutSignal = AbortSignal.timeout(task.timeoutSeconds * 1000); + + const result = await new Promise<{ success: boolean; message: string }>( + (resolve, reject) => { + timeoutSignal.addEventListener("abort", () => { + reject(new Error(`Task timed out after ${task.timeoutSeconds}s`)); + }, { once: true }); + + executeWithRetries().then(resolve).catch(reject); + } + ); + + await updateExecutionLog(logId, { + status: "success", + resultSummary: result.message, + }); + logger.info("[task:%s] completed: %s", task.id, result.message); + } catch (err) { + const message = err instanceof Error ? err.message : String(err); + await updateExecutionLog(logId, { status: "failed", errorMessage: message }); + logger.error("[task:%s] failed: %s", task.id, message); + } +} + +async function executeHttpTask(task: TaskRow): Promise { + if (!task.httpUrl) { + logger.error("[task:%s] http type but no httpUrl", task.id); + return; + } + + const logId = await createExecutionLog({ taskId: task.id, status: "running" }); + + try { + const headers: Record = task.httpHeaders + ? JSON.parse(task.httpHeaders) + : {}; + + const abortController = new AbortController(); + const timeoutSignal = AbortSignal.timeout(task.timeoutSeconds * 1000); + + const response = await fetch(task.httpUrl, { + method: (task.httpMethod ?? "GET").toUpperCase(), + headers: { "Content-Type": "application/json", ...headers }, + body: task.httpBody ?? undefined, + signal: AbortSignal.any([abortController.signal, timeoutSignal]), + }); + + const summary = `HTTP ${response.status} ${response.statusText}`; + if (!response.ok) { + await updateExecutionLog(logId, { status: "failed", errorMessage: summary, resultSummary: summary }); + logger.error("[task:%s] %s", task.id, summary); + } else { + await updateExecutionLog(logId, { status: "success", resultSummary: summary }); + logger.info("[task:%s] %s", task.id, summary); + } + } catch (err) { + const message = err instanceof Error ? err.message : String(err); + await updateExecutionLog(logId, { status: "failed", errorMessage: message }); + logger.error("[task:%s] HTTP request failed: %s", task.id, message); + } +} + +async function executeTask(task: TaskRow): Promise { + logger.info("[task:%s] executing '%s'", task.id, task.name); + if (task.type === "function") { + await executeFunctionTask(task); + } else if (task.type === "http") { + await executeHttpTask(task); + } else { + logger.error("[task:%s] unknown type: %s", task.id, task.type); + } +} + +function scheduleTask(task: TaskRow): void { + const cron = new Cron(task.cronExpression, async () => { + await pool.execute(() => executeTask(task)); + }); + jobs.set(task.id, { taskId: task.id, cron }); + logger.info( + "[task:%s] scheduled '%s' cron='%s' next=%s", + task.id, + task.name, + task.cronExpression, + cron.nextRun()?.toISOString() ?? "none" + ); +} + +function unscheduleTask(taskId: string): void { + const entry = jobs.get(taskId); + if (entry) { + entry.cron.stop(); + jobs.delete(taskId); + logger.info("[task:%s] unscheduled", taskId); + } +} + +// ---- Public API ---- + +export async function start(maxConcurrency: number, logRetentionDays: number): Promise { + pool = new ExecutorPool(maxConcurrency); + logger.info("Scheduler starting (max concurrency: %d)", maxConcurrency); + + // Mark stale running tasks and clean old logs + const { markStaleRunningTasks, cleanupOldLogs } = await import("../service/scheduler"); + await markStaleRunningTasks(); + await cleanupOldLogs(logRetentionDays); + + // Load all enabled tasks + const { list: allTasks } = await listTasks({ pageSize: 9999 }); + + for (const task of allTasks) { + if (!task.enabled) continue; + try { + scheduleTask(task as TaskRow); + + // Catch-up: if catchUp=1 and the task missed its last scheduled run, execute once + if (task.catchUp) { + try { + const checkCron = new Cron(task.cronExpression); + const previous = checkCron.previousRun()?.getTime(); + if (previous && previous > 0 && previous < Date.now()) { + logger.info( + "[task:%s] catch-up: missed run at %s, executing now", + task.id, + new Date(previous).toISOString() + ); + pool.execute(() => executeTask(task as TaskRow)); + } + } catch (err) { + logger.warn("[task:%s] catch-up check failed: %s", task.id, String(err)); + } + } + } catch (err) { + const message = err instanceof Error ? err.message : String(err); + logger.error("[task:%s] failed to schedule: %s", task.id, message); + } + } + + logger.info("Scheduler started with %d active job(s)", jobs.size); +} + +export async function stop(): Promise { + for (const [id] of jobs) { + unscheduleTask(id); + } + logger.info("Scheduler stopped"); +} + +export function addTask(taskId: string): void { + unscheduleTask(taskId); + getTaskById(taskId).then((task) => { + if (task && task.enabled) { + scheduleTask(task as TaskRow); + } + }); +} + +export function removeTask(taskId: string): void { + unscheduleTask(taskId); +} + +export function reloadTask(taskId: string): void { + addTask(taskId); +} + +export async function triggerTask(taskId: string): Promise { + const task = await getTaskById(taskId); + if (!task) { + throw new Error(`Task ${taskId} not found`); + } + await pool.execute(() => executeTask(task as TaskRow)); +} + +export function getJobCount(): number { + return jobs.size; +} +``` + +- [ ] **Step 2: Commit** + +```bash +git add server/scheduler/engine.ts +git commit -m "feat: add Scheduler Engine with croner integration and hot-reload support" +``` + +--- + +### Task 8: Create Nitro plugin to wire scheduler lifecycle + +**Files:** +- Create: `server/plugins/03.scheduler.ts` + +- [ ] **Step 1: Write the Nitro plugin** + +```typescript +// server/plugins/03.scheduler.ts +import { start, stop } from "../scheduler/engine"; + +const MAX_CONCURRENCY = Number(process.env.SCHEDULER_MAX_CONCURRENCY) || 5; +const LOG_RETENTION_DAYS = Number(process.env.SCHEDULER_LOG_RETENTION_DAYS) || 30; + +if (import.meta.dev) { + console.log("plugin: 03.scheduler"); +} + +export default defineNitroPlugin(async (nitroApp) => { + await start(MAX_CONCURRENCY, LOG_RETENTION_DAYS); + + nitroApp.hooks.hook("close", async () => { + await stop(); + }); +}); +``` + +- [ ] **Step 2: Add env vars to .env.example** + +```bash +echo ' +SCHEDULER_MAX_CONCURRENCY=5 +SCHEDULER_LOG_RETENTION_DAYS=30' >> .env.example +``` + +- [ ] **Step 3: Commit** + +```bash +git add server/plugins/03.scheduler.ts .env.example +git commit -m "feat: wire scheduler lifecycle into Nitro plugin" +``` + +--- + +### Task 9: API — List and Create tasks + +**Files:** +- Create: `server/api/scheduler/tasks/index.get.ts` +- Create: `server/api/scheduler/tasks/index.post.ts` + +- [ ] **Step 1: Write GET /api/scheduler/tasks (list)** + +```typescript +// server/api/scheduler/tasks/index.get.ts +import { listTasks } from "../../../service/scheduler"; +import { listRegisteredTasks } from "../../../scheduler/registry"; + +export default defineWrappedResponseHandler(async (event) => { + const query = getQuery(event); + const page = query.page ? Number(query.page) : 1; + const pageSize = query.pageSize ? Number(query.pageSize) : 20; + + const result = await listTasks({ + page, + pageSize, + type: query.type as string | undefined, + enabled: query.enabled !== undefined ? Number(query.enabled) : undefined, + }); + + return R.success({ + ...result, + registeredFunctions: listRegisteredTasks(), + }); +}); +``` + +- [ ] **Step 2: Write zod schema for task creation and POST /api/scheduler/tasks** + +```typescript +// server/api/scheduler/tasks/index.post.ts +import { z } from "zod"; +import { createTask } from "../../../service/scheduler"; +import { addTask } from "../../../scheduler/engine"; +import { Cron } from "croner"; + +const createTaskSchema = z.object({ + name: z.string().min(1), + cronExpression: z.string().min(1), + type: z.enum(["function", "http"]), + functionName: z.string().optional(), + functionPayload: z.string().optional(), + httpMethod: z.enum(["GET", "POST", "PUT", "DELETE"]).optional(), + httpUrl: z.string().url().optional(), + httpHeaders: z.string().optional(), + httpBody: z.string().optional(), + catchUp: z.union([z.boolean(), z.number()]).optional(), + enabled: z.union([z.boolean(), z.number()]).optional(), + maxRetries: z.number().int().min(0).optional(), + retryDelaySeconds: z.number().int().min(1).optional(), + timeoutSeconds: z.number().int().min(1).optional(), +}); + +export default defineWrappedResponseHandler(async (event) => { + const body = await readBody(event); + const parsed = createTaskSchema.safeParse(body); + + if (!parsed.success) { + return R.throwError(422, "Validation failed", parsed.error.issues); + } + + // Validate cron expression + try { + new Cron(parsed.data.cronExpression); + } catch { + return R.throwError(422, "Invalid cron expression", null); + } + + // function type must specify functionName + if (parsed.data.type === "function" && !parsed.data.functionName) { + return R.throwError(422, "functionName required for function type", null); + } + + // http type must specify httpUrl + if (parsed.data.type === "http" && !parsed.data.httpUrl) { + return R.throwError(422, "httpUrl required for http type", null); + } + + const task = await createTask({ + ...parsed.data, + catchUp: parsed.data.catchUp ? 1 : 0, + enabled: parsed.data.enabled !== undefined ? (parsed.data.enabled ? 1 : 0) : 1, + }); + + if (task) { + addTask(task.id); + } + + return R.success(task); +}); +``` + +- [ ] **Step 3: Commit** + +```bash +git add server/api/scheduler/tasks/index.get.ts server/api/scheduler/tasks/index.post.ts +git commit -m "feat: add API endpoints to list and create scheduled tasks" +``` + +--- + +### Task 10: API — Get, Update, Delete task + +**Files:** +- Create: `server/api/scheduler/tasks/[id].get.ts` +- Create: `server/api/scheduler/tasks/[id].put.ts` +- Create: `server/api/scheduler/tasks/[id].delete.ts` + +- [ ] **Step 1: Write GET /api/scheduler/tasks/:id** + +```typescript +// server/api/scheduler/tasks/[id].get.ts +import { getTaskById, getRecentExecutions } from "../../../service/scheduler"; + +export default defineWrappedResponseHandler(async (event) => { + const id = getRouterParam(event, "id"); + if (!id) return R.throwError(400, "Missing id", null); + + const task = await getTaskById(id); + if (!task) return R.throwError(404, "Task not found", null); + + const recentExecutions = await getRecentExecutions(id, 20); + + return R.success({ task, recentExecutions }); +}); +``` + +- [ ] **Step 2: Write PUT /api/scheduler/tasks/:id** + +```typescript +// server/api/scheduler/tasks/[id].put.ts +import { z } from "zod"; +import { updateTask, getTaskById } from "../../../service/scheduler"; +import { reloadTask } from "../../../scheduler/engine"; +import { Cron } from "croner"; + +const updateTaskSchema = z.object({ + name: z.string().min(1).optional(), + cronExpression: z.string().min(1).optional(), + type: z.enum(["function", "http"]).optional(), + functionName: z.string().optional().nullable(), + functionPayload: z.string().optional().nullable(), + httpMethod: z.enum(["GET", "POST", "PUT", "DELETE"]).optional().nullable(), + httpUrl: z.string().optional().nullable(), + httpHeaders: z.string().optional().nullable(), + httpBody: z.string().optional().nullable(), + catchUp: z.union([z.boolean(), z.number()]).optional(), + enabled: z.union([z.boolean(), z.number()]).optional(), + maxRetries: z.number().int().min(0).optional(), + retryDelaySeconds: z.number().int().min(1).optional(), + timeoutSeconds: z.number().int().min(1).optional(), +}); + +export default defineWrappedResponseHandler(async (event) => { + const id = getRouterParam(event, "id"); + if (!id) return R.throwError(400, "Missing id", null); + + const body = await readBody(event); + const parsed = updateTaskSchema.safeParse(body); + if (!parsed.success) { + return R.throwError(422, "Validation failed", parsed.error.issues); + } + + const existing = await getTaskById(id); + if (!existing) return R.throwError(404, "Task not found", null); + + // Validate cron if provided + const cronExpr = parsed.data.cronExpression ?? existing.cronExpression; + try { + new Cron(cronExpr); + } catch { + return R.throwError(422, "Invalid cron expression", null); + } + + const updateData: Record = { ...parsed.data }; + if (parsed.data.catchUp !== undefined) { + updateData.catchUp = parsed.data.catchUp ? 1 : 0; + } + if (parsed.data.enabled !== undefined) { + updateData.enabled = parsed.data.enabled ? 1 : 0; + } + + const task = await updateTask(id, updateData as Parameters[1]); + if (task) { + reloadTask(task.id); + } + + return R.success(task); +}); +``` + +- [ ] **Step 3: Write DELETE /api/scheduler/tasks/:id** + +```typescript +// server/api/scheduler/tasks/[id].delete.ts +import { deleteTask } from "../../../service/scheduler"; +import { removeTask } from "../../../scheduler/engine"; + +export default defineWrappedResponseHandler(async (event) => { + const id = getRouterParam(event, "id"); + if (!id) return R.throwError(400, "Missing id", null); + + removeTask(id); + await deleteTask(id); + + return R.success(null); +}); +``` + +- [ ] **Step 4: Commit** + +```bash +git add server/api/scheduler/tasks/\[id\].get.ts server/api/scheduler/tasks/\[id\].put.ts server/api/scheduler/tasks/\[id\].delete.ts +git commit -m "feat: add API endpoints to get, update, and delete tasks" +``` + +--- + +### Task 11: API — Trigger and Toggle + +**Files:** +- Create: `server/api/scheduler/tasks/[id]/trigger.post.ts` +- Create: `server/api/scheduler/tasks/[id]/toggle.post.ts` + +- [ ] **Step 1: Write POST /api/scheduler/tasks/:id/trigger** + +```typescript +// server/api/scheduler/tasks/[id]/trigger.post.ts +import { triggerTask } from "../../../../scheduler/engine"; + +export default defineWrappedResponseHandler(async (event) => { + const id = getRouterParam(event, "id"); + if (!id) return R.throwError(400, "Missing id", null); + + // Fire and forget — trigger returns immediately, execution is async + triggerTask(id).catch(() => {}); + + return R.success({ triggered: true }); +}); +``` + +- [ ] **Step 2: Write POST /api/scheduler/tasks/:id/toggle** + +```typescript +// server/api/scheduler/tasks/[id]/toggle.post.ts +import { toggleTask, getTaskById } from "../../../../service/scheduler"; +import { reloadTask, removeTask } from "../../../../scheduler/engine"; + +export default defineWrappedResponseHandler(async (event) => { + const id = getRouterParam(event, "id"); + if (!id) return R.throwError(400, "Missing id", null); + + const body = await readBody<{ enabled: boolean }>(event); + + const task = await toggleTask(id, body.enabled); + if (!task) return R.throwError(404, "Task not found", null); + + if (task.enabled) { + reloadTask(id); + } else { + removeTask(id); + } + + return R.success(task); +}); +``` + +- [ ] **Step 3: Commit** + +```bash +git add server/api/scheduler/tasks/\[id\]/trigger.post.ts server/api/scheduler/tasks/\[id\]/toggle.post.ts +git commit -m "feat: add API endpoints to trigger and toggle tasks" +``` + +--- + +### Task 12: API — Execution logs and Stats + +**Files:** +- Create: `server/api/scheduler/executions.get.ts` +- Create: `server/api/scheduler/stats.get.ts` + +- [ ] **Step 1: Write GET /api/scheduler/executions** + +```typescript +// server/api/scheduler/executions.get.ts +import { listExecutions } from "../../service/scheduler"; + +export default defineWrappedResponseHandler(async (event) => { + const query = getQuery(event); + const page = query.page ? Number(query.page) : 1; + const pageSize = query.pageSize ? Number(query.pageSize) : 20; + + const result = await listExecutions({ + page, + pageSize, + taskId: query.taskId as string | undefined, + status: query.status as string | undefined, + }); + + return R.success(result); +}); +``` + +- [ ] **Step 2: Write GET /api/scheduler/stats** + +```typescript +// server/api/scheduler/stats.get.ts +import { getStats } from "../../service/scheduler"; +import { getJobCount } from "../../scheduler/engine"; + +export default defineWrappedResponseHandler(async () => { + const stats = await getStats(); + return R.success({ + ...stats, + activeJobs: getJobCount(), + }); +}); +``` + +- [ ] **Step 3: Commit** + +```bash +git add server/api/scheduler/executions.get.ts server/api/scheduler/stats.get.ts +git commit -m "feat: add API endpoints for execution logs and stats" +``` + +--- + +### Task 13: Frontend — Scheduler admin list page + +**Files:** +- Create: `app/pages/admin/scheduler/index.vue` + +- [ ] **Step 1: Write the task list page** + +```vue + + + + +``` + +- [ ] **Step 2: Commit** + +```bash +git add app/pages/admin/scheduler/index.vue +git commit -m "feat: add scheduler admin list page with task table" +``` + +--- + +### Task 14: Frontend — Create/Edit task modal component + +**Files:** +- Create: `app/components/SchedulerTaskModal.vue` + +- [ ] **Step 1: Write the task modal component** + +```vue + + + + +``` + +- [ ] **Step 2: Commit** + +```bash +git add app/components/SchedulerTaskModal.vue +git commit -m "feat: add scheduler task create/edit modal component" +``` + +--- + +### Task 15: Frontend — Task detail page + +**Files:** +- Create: `app/pages/admin/scheduler/[id].vue` + +- [ ] **Step 1: Write the task detail page** + +```vue + + + + +``` + +- [ ] **Step 2: Commit** + +```bash +git add app/pages/admin/scheduler/\[id\].vue +git commit -m "feat: add scheduler task detail page with execution history" +``` + +--- + +### Task 16: Smoke test — verify the feature end-to-end + +- [ ] **Step 1: Start dev server** + +```bash +bun run dev +``` + +- [ ] **Step 2: Verify API endpoints** + +```bash +# List tasks (should be empty initially) +curl -s http://localhost:3399/api/scheduler/tasks | head -c 200 + +# Create a task +curl -s -X POST http://localhost:3399/api/scheduler/tasks \ + -H "Content-Type: application/json" \ + -d '{"name":"test","cronExpression":"0 9 * * *","type":"http","httpUrl":"https://httpbin.org/get","httpMethod":"GET"}' | head -c 200 + +# List tasks (should have 1) +curl -s http://localhost:3399/api/scheduler/tasks | head -c 200 + +# Get stats +curl -s http://localhost:3399/api/scheduler/stats | head -c 200 +``` + +- [ ] **Step 3: Verify frontend** + +Open `http://localhost:3399/admin/scheduler` in the browser and confirm: +- Stats bar shows correct numbers +- Task table shows the created task +- Create button opens modal +- Edit/Delete/Trigger buttons work +- Task detail page shows configuration and execution history + +--- + +### Task 17: Example system task — daily log cleanup + +**Files:** +- Create: `server/scheduler/tasks/cleanup-logs.ts` + +- [ ] **Step 1: Write an example system task** + +```typescript +// server/scheduler/tasks/cleanup-logs.ts +import { registerTask } from "../registry"; +import log4js from "logger"; + +const logger = log4js.getLogger("SCHEDULER"); + +registerTask("cleanup-logs", async (payload) => { + const days = (payload?.days as number) ?? 30; + logger.info("Log cleanup placeholder: would remove logs older than %d days", days); + return { success: true, message: `Cleaned logs older than ${days} days` }; +}); +``` + +- [ ] **Step 2: Import in the scheduler plugin** + +In `server/plugins/03.scheduler.ts`, add at the top: + +```typescript +import "../scheduler/tasks/cleanup-logs"; +``` + +- [ ] **Step 3: Commit** + +```bash +git add server/scheduler/tasks/cleanup-logs.ts server/plugins/03.scheduler.ts +git commit -m "feat: add example cleanup-logs system task" +``` + +--- + +### Task 18: Final verification and cleanup + +- [ ] **Step 1: Verify all API endpoints return correct responses** + +- [ ] **Step 2: Verify the scheduler plugin loads without errors in dev console** + +- [ ] **Step 3: Verify the admin page renders correctly**