import { dbGlobal } from "drizzle-pkg/lib/db"; import { userExportTasks } from "drizzle-pkg/lib/schema/export"; import { and, desc, eq, or } from "drizzle-orm"; import { createError } from "h3"; import { nextIntegerId } from "../../utils/sqlite-id"; type ExportMaskPolicy = "masked" | "raw"; async function getExportTaskById(taskId: number) { const [row] = await dbGlobal .select() .from(userExportTasks) .where(eq(userExportTasks.id, taskId)) .limit(1); return row ?? null; } async function getRequiredExportTaskById(taskId: number) { const row = await getExportTaskById(taskId); if (!row) { throw new Error(`export task not found: ${taskId}`); } return row; } export async function createExportTask(params: { userId: number; maskPolicy: ExportMaskPolicy }) { const [activeTask] = await dbGlobal .select({ id: userExportTasks.id }) .from(userExportTasks) .where( and( eq(userExportTasks.userId, params.userId), or(eq(userExportTasks.status, "queued"), eq(userExportTasks.status, "running")), ), ) .limit(1); if (activeTask) { throw createError({ statusCode: 409, statusMessage: "已有导出任务在处理中,请稍后再试" }); } const id = await nextIntegerId(userExportTasks, userExportTasks.id); await dbGlobal.insert(userExportTasks).values({ id, userId: params.userId, maskPolicy: params.maskPolicy, status: "queued", }); return getRequiredExportTaskById(id); } export async function listExportTasksByUser(userId: number) { return dbGlobal .select() .from(userExportTasks) .where(eq(userExportTasks.userId, userId)) .orderBy(desc(userExportTasks.id)); } export async function markExportTaskRunning(taskId: number) { const cutoffAt = new Date(); await dbGlobal .update(userExportTasks) .set({ status: "running", exportCutoffAt: cutoffAt, }) .where(and(eq(userExportTasks.id, taskId), eq(userExportTasks.status, "queued"))); const row = await getRequiredExportTaskById(taskId); if (row.status !== "running" || row.exportCutoffAt?.getTime() !== cutoffAt.getTime()) { throw new Error(`invalid export task transition for ${taskId}: expected queued -> running`); } return row; } export async function claimNextQueuedTask() { for (let i = 0; i < 5; i += 1) { const [queued] = await dbGlobal .select({ id: userExportTasks.id }) .from(userExportTasks) .where(eq(userExportTasks.status, "queued")) .orderBy(userExportTasks.id) .limit(1); if (!queued) { return null; } try { return await markExportTaskRunning(queued.id); } catch (error) { const message = error instanceof Error ? error.message : ""; if (!message.includes("invalid export task transition")) { throw error; } } } return null; } export async function markExportTaskSucceeded( taskId: number, payload: { outputDir: string; outputName: string; totalBytes: number; expiresAt: Date; }, ) { await dbGlobal .update(userExportTasks) .set({ status: "succeeded", outputDir: payload.outputDir, outputName: payload.outputName, totalBytes: payload.totalBytes, expiresAt: payload.expiresAt, errorCode: null, errorMessage: null, }) .where(and(eq(userExportTasks.id, taskId), eq(userExportTasks.status, "running"))); const row = await getRequiredExportTaskById(taskId); if (row.status !== "succeeded") { throw new Error(`invalid export task transition for ${taskId}: expected running -> succeeded`); } return row; } export async function markExportTaskFailed( taskId: number, payload: { errorCode: string; errorMessage: string; }, ) { await dbGlobal .update(userExportTasks) .set({ status: "failed", errorCode: payload.errorCode, errorMessage: payload.errorMessage, }) .where(and(eq(userExportTasks.id, taskId), eq(userExportTasks.status, "running"))); const row = await getRequiredExportTaskById(taskId); if (row.status !== "failed") { throw new Error(`invalid export task transition for ${taskId}: expected running -> failed`); } return row; } export async function getExportTaskForUser(taskId: number, userId: number) { const [row] = await dbGlobal .select() .from(userExportTasks) .where(and(eq(userExportTasks.id, taskId), eq(userExportTasks.userId, userId))) .limit(1); return row ?? null; }