diff --git a/server/api/me/export/request.post.ts b/server/api/me/export/request.post.ts new file mode 100644 index 0000000..8501581 --- /dev/null +++ b/server/api/me/export/request.post.ts @@ -0,0 +1,18 @@ +import { createExportTask } from "#server/service/export/jobs"; +import { runExportTask } from "#server/service/export/run"; +import { parseMeExportRequestBody } from "#server/utils/me-export-request-body"; +import { R } from "#server/utils/response"; + +export default defineWrappedResponseHandler(async (event) => { + const user = await event.context.auth.requireUser(); + const body = await readBody(event); + const { maskPolicy } = parseMeExportRequestBody(body); + const task = await createExportTask({ userId: user.id, maskPolicy }); + void runExportTask(task.id).catch(() => { + // 后台执行失败会在任务状态中可见,此处不影响请求返回。 + }); + return R.success({ + taskId: task.id, + status: task.status, + }); +}); diff --git a/server/api/me/export/tasks.get.ts b/server/api/me/export/tasks.get.ts new file mode 100644 index 0000000..74e422b --- /dev/null +++ b/server/api/me/export/tasks.get.ts @@ -0,0 +1,21 @@ +import { listExportTasksByUser } from "#server/service/export/jobs"; +import { R } from "#server/utils/response"; + +export default defineWrappedResponseHandler(async (event) => { + const user = await event.context.auth.requireUser(); + const tasks = await listExportTasksByUser(user.id); + return R.success({ + items: tasks.map((task) => ({ + id: task.id, + status: task.status, + maskPolicy: task.maskPolicy, + outputName: task.outputName, + totalBytes: task.totalBytes, + errorCode: task.errorCode, + errorMessage: task.errorMessage, + createdAt: task.createdAt.toISOString(), + updatedAt: task.updatedAt.toISOString(), + expiresAt: task.expiresAt ? 
task.expiresAt.toISOString() : null, + })), + }); +}); diff --git a/server/api/me/export/tasks/[id]/download.get.ts b/server/api/me/export/tasks/[id]/download.get.ts new file mode 100644 index 0000000..3937a15 --- /dev/null +++ b/server/api/me/export/tasks/[id]/download.get.ts @@ -0,0 +1,36 @@ +import fs from "node:fs"; +import path from "node:path"; +import { sendStream, setHeader } from "h3"; +import { getExportTaskForUser } from "#server/service/export/jobs"; + +export default defineWrappedResponseHandler(async (event) => { + const user = await event.context.auth.requireUser(); + const idRaw = getRouterParam(event, "id"); + const taskId = Number(idRaw); + if (!Number.isInteger(taskId) || taskId < 1) { + throw createError({ statusCode: 400, statusMessage: "无效的任务 id" }); + } + + const task = await getExportTaskForUser(taskId, user.id); + if (!task) { + throw createError({ statusCode: 404, statusMessage: "导出任务不存在" }); + } + if (task.status !== "succeeded") { + throw createError({ statusCode: 409, statusMessage: "导出任务尚未完成" }); + } + if (!task.expiresAt || task.expiresAt.getTime() <= Date.now()) { + throw createError({ statusCode: 410, statusMessage: "导出结果已过期" }); + } + if (!task.outputDir || !task.outputName) { + throw createError({ statusCode: 500, statusMessage: "导出结果缺失" }); + } + + const manifestPath = path.resolve(task.outputDir, "manifest.json"); + if (!fs.existsSync(manifestPath)) { + throw createError({ statusCode: 404, statusMessage: "导出文件不存在" }); + } + + setHeader(event, "Content-Type", "application/json; charset=utf-8"); + setHeader(event, "Content-Disposition", `attachment; filename="${task.outputName}-manifest.json"`); + return sendStream(event, fs.createReadStream(manifestPath)); +}); diff --git a/server/constants/export.ts b/server/constants/export.ts new file mode 100644 index 0000000..6322906 --- /dev/null +++ b/server/constants/export.ts @@ -0,0 +1,25 @@ +export const EXPORT_MASK_POLICIES = ["masked", "raw"] as const; + +export type ExportMaskPolicy 
= (typeof EXPORT_MASK_POLICIES)[number]; + +/** + * 标准化后(小写并移除 `_` / `-`)的永不导出字段精确匹配名单。 + */ +export const NEVER_EXPORT_FIELD_EXACT_NAMES = [ + "password", + "passwordhash", + "resettoken", + "resetpasswordtoken", + "sessionid", + "sessiontoken", + "accesstoken", + "refreshtoken", + "apikey", + "secretkey", + "credential", +] as const; + +/** + * masked 策略下优先脱敏的字段关键词(可扩展)。 + */ +export const DEFAULT_MASK_FIELD_KEYWORDS = ["email", "phone", "mobile"] as const; diff --git a/server/service/export/build-data.ts b/server/service/export/build-data.ts new file mode 100644 index 0000000..7165a56 --- /dev/null +++ b/server/service/export/build-data.ts @@ -0,0 +1,113 @@ +import fs from "node:fs/promises"; +import path from "node:path"; +import { dbGlobal } from "drizzle-pkg/lib/db"; +import { users } from "drizzle-pkg/lib/schema/auth"; +import { userConfigs } from "drizzle-pkg/lib/schema/config"; +import { mediaAssets, mediaRefs, postComments, posts, timelineEvents } from "drizzle-pkg/lib/schema/content"; +import { and, eq, inArray, lte, or } from "drizzle-orm"; +import { MEDIA_REF_OWNER_POST } from "../../constants/media-refs"; +import type { ExportMaskPolicy } from "../../constants/export"; +import { sanitizeUserForExport } from "../../utils/export-mask"; + +export type BuildExportDataResult = { + dataFiles: Array<{ file: string; rowCount: number }>; + totalRows: number; +}; + +async function writeDataFile(baseDir: string, fileName: string, payload: unknown): Promise<{ file: string; rowCount: number }> { + const file = `data/${fileName}`; + const fullPath = path.resolve(baseDir, fileName); + await fs.writeFile(fullPath, `${JSON.stringify(payload, null, 2)}\n`, "utf8"); + const rowCount = Array.isArray(payload) ? payload.length : payload ? 
1 : 0; +  return { file, rowCount }; +} + +export async function buildExportDataJson(params: { +  userId: number; +  maskPolicy: ExportMaskPolicy; +  cutoffAt: Date; +  outputDataDir: string; +}): Promise<BuildExportDataResult> { +  await fs.mkdir(params.outputDataDir, { recursive: true }); + +  const [userRow] = await dbGlobal.select().from(users).where(eq(users.id, params.userId)).limit(1); +  if (!userRow) { +    throw new Error(`user not found: ${params.userId}`); +  } + +  const sanitizedUser = sanitizeUserForExport(userRow as unknown as Record<string, unknown>, params.maskPolicy); + +  const [postRows, timelineRows, mediaRows, configRows] = await Promise.all([ +    dbGlobal +      .select() +      .from(posts) +      .where(and(eq(posts.userId, params.userId), lte(posts.createdAt, params.cutoffAt))), +    dbGlobal +      .select() +      .from(timelineEvents) +      .where(and(eq(timelineEvents.userId, params.userId), lte(timelineEvents.createdAt, params.cutoffAt))), +    dbGlobal +      .select() +      .from(mediaAssets) +      .where(and(eq(mediaAssets.userId, params.userId), lte(mediaAssets.createdAt, params.cutoffAt))), +    dbGlobal +      .select() +      .from(userConfigs) +      .where(and(eq(userConfigs.userId, params.userId), lte(userConfigs.updatedAt, params.cutoffAt))), +  ]); + +  const postIds = postRows.map((p) => p.id); +  const commentCondition = +    postIds.length === 0 +      ? and(lte(postComments.createdAt, params.cutoffAt), eq(postComments.authorUserId, params.userId)) +      : and( +          lte(postComments.createdAt, params.cutoffAt), +          or(eq(postComments.authorUserId, params.userId), inArray(postComments.postId, postIds)), +        ); +  const commentRowsRaw = await dbGlobal.select().from(postComments).where(commentCondition); + +  const mediaRefRows = +    postIds.length === 0 +      ? [] +      : await dbGlobal +          .select() +          .from(mediaRefs) +          .where(and(eq(mediaRefs.ownerType, MEDIA_REF_OWNER_POST), inArray(mediaRefs.ownerId, postIds))); +  const commentRows = commentRowsRaw.map((row) => +    params.maskPolicy === "masked" +      ?
sanitizeUserForExport(row as unknown as Record<string, unknown>, params.maskPolicy) +      : row, +  ); +  const configRowsMasked = configRows.map((row) => +    params.maskPolicy === "masked" +      ? sanitizeUserForExport(row as unknown as Record<string, unknown>, params.maskPolicy) +      : row, +  ); + +  const dataFiles = await Promise.all([ +    writeDataFile(params.outputDataDir, "user.json", sanitizedUser), +    writeDataFile(params.outputDataDir, "posts.json", postRows), +    writeDataFile(params.outputDataDir, "timeline.json", timelineRows), +    writeDataFile(params.outputDataDir, "comments.json", commentRows), +    writeDataFile(params.outputDataDir, "media-assets.json", mediaRows), +    writeDataFile(params.outputDataDir, "media-refs.json", mediaRefRows), +    writeDataFile(params.outputDataDir, "user-configs.json", configRowsMasked), +  ]); + +  const totalRows = dataFiles.reduce((sum, item) => sum + item.rowCount, 0); + +  return { dataFiles, totalRows }; +} + +export async function listExportUserMediaAssets(params: { +  userId: number; +  cutoffAt: Date; +}): Promise<Array<{ storageKey: string }>> { +  const rows = await dbGlobal +    .select({ +      storageKey: mediaAssets.storageKey, +    }) +    .from(mediaAssets) +    .where(and(eq(mediaAssets.userId, params.userId), lte(mediaAssets.createdAt, params.cutoffAt))); +  return rows; +} diff --git a/server/service/export/build-files.ts b/server/service/export/build-files.ts new file mode 100644 index 0000000..4b01be3 --- /dev/null +++ b/server/service/export/build-files.ts @@ -0,0 +1,63 @@ +import fs from "node:fs/promises"; +import path from "node:path"; +import { RELATIVE_ASSETS_DIR } from "../../constants/media"; +import { listExportUserMediaAssets } from "./build-data"; + +type ExportFileAsset = { +  storageKey: string; +}; + +export type BuildExportFilesResult = { +  files: Array<{ file: string; bytes: number }>; +  missingCount: number; +}; + +function resolveAssetsBaseDir(): string { +  return path.resolve(process.cwd(), RELATIVE_ASSETS_DIR); +} + +export async function buildExportMediaFiles(params: { +  userId: number; + 
cutoffAt: Date; +  outputFilesDir: string; +}): Promise<BuildExportFilesResult> { +  await fs.mkdir(params.outputFilesDir, { recursive: true }); + +  const mediaAssets: ExportFileAsset[] = await listExportUserMediaAssets({ +    userId: params.userId, +    cutoffAt: params.cutoffAt, +  }); +  const assetsBaseDir = resolveAssetsBaseDir(); +  let missingCount = 0; +  const files: Array<{ file: string; bytes: number }> = []; + +  for (const asset of mediaAssets) { +    const key = asset.storageKey; +    if (!key || key.includes("..") || path.isAbsolute(key)) { +      missingCount += 1; +      continue; +    } + +    const fromPath = path.resolve(assetsBaseDir, key); +    const toPath = path.resolve(params.outputFilesDir, key); +    const relativeOutputPath = `files/${key}`; +    await fs.mkdir(path.dirname(toPath), { recursive: true }); + +    try { +      await fs.copyFile(fromPath, toPath); +      const stat = await fs.stat(toPath); +      files.push({ file: relativeOutputPath, bytes: stat.size }); +    } catch (error) { +      if ((error as NodeJS.ErrnoException).code === "ENOENT") { +        missingCount += 1; +        continue; +      } +      throw error; +    } +  } + +  return { +    files, +    missingCount, +  }; +} diff --git a/server/service/export/build-manifest.test.ts b/server/service/export/build-manifest.test.ts new file mode 100644 index 0000000..29cadf1 --- /dev/null +++ b/server/service/export/build-manifest.test.ts @@ -0,0 +1,47 @@ +import { describe, expect, test } from "bun:test"; +import { buildExportManifest } from "./build-manifest"; + +describe("buildExportManifest", () => { +  test("builds manifest with required fields and checksums", () => { +    const exportedAt = new Date("2026-04-24T10:00:00.000Z"); +    const cutoffAt = new Date("2026-04-24T09:00:00.000Z"); + +    const manifest = buildExportManifest({ +      schemaVersion: 1, +      userId: 123, +      maskPolicy: "masked", +      exportedAt, +      exportCutoffAt: cutoffAt, +      stats: { +        dataRows: 10, +        files: 1, +        bytes: 1024, +      }, +      dataChecksums: [ +        { file: "data/user.json", sha256: "sha256-user" }, +        { file: "data/media-assets.json", sha256: 
"sha256-media" }, + ], + fileChecksums: [{ file: "files/a.webp", sha256: "sha256-file-a" }], + }); + + expect(manifest).toEqual({ + schemaVersion: 1, + exportedAt: exportedAt.toISOString(), + exportCutoffAt: cutoffAt.toISOString(), + userId: 123, + maskPolicy: "masked", + stats: { + dataRows: 10, + files: 1, + bytes: 1024, + }, + checksums: { + data: [ + { file: "data/user.json", sha256: "sha256-user" }, + { file: "data/media-assets.json", sha256: "sha256-media" }, + ], + files: [{ file: "files/a.webp", sha256: "sha256-file-a" }], + }, + }); + }); +}); diff --git a/server/service/export/build-manifest.ts b/server/service/export/build-manifest.ts new file mode 100644 index 0000000..3effb2c --- /dev/null +++ b/server/service/export/build-manifest.ts @@ -0,0 +1,44 @@ +import type { ExportMaskPolicy } from "../../constants/export"; + +type ExportManifestStats = { + dataRows: number; + files: number; + bytes: number; +}; + +export type ExportManifest = { + schemaVersion: number; + exportedAt: string; + exportCutoffAt: string; + userId: number; + maskPolicy: ExportMaskPolicy; + stats: ExportManifestStats; + checksums: { + data: Array<{ file: string; sha256: string }>; + files: Array<{ file: string; sha256: string }>; + }; +}; + +export function buildExportManifest(params: { + schemaVersion: number; + userId: number; + maskPolicy: ExportMaskPolicy; + exportedAt: Date; + exportCutoffAt: Date; + stats: ExportManifestStats; + dataChecksums: Array<{ file: string; sha256: string }>; + fileChecksums: Array<{ file: string; sha256: string }>; +}): ExportManifest { + return { + schemaVersion: params.schemaVersion, + exportedAt: params.exportedAt.toISOString(), + exportCutoffAt: params.exportCutoffAt.toISOString(), + userId: params.userId, + maskPolicy: params.maskPolicy, + stats: params.stats, + checksums: { + data: params.dataChecksums, + files: params.fileChecksums, + }, + }; +} diff --git a/server/service/export/jobs.test.ts b/server/service/export/jobs.test.ts new file mode 
100644 index 0000000..347ddd2 --- /dev/null +++ b/server/service/export/jobs.test.ts @@ -0,0 +1,153 @@ +import { beforeAll, beforeEach, describe, expect, mock, test } from "bun:test"; +import { eq, inArray } from "drizzle-orm"; + +process.env.DATABASE_URL ??= "file:./packages/drizzle-pkg/db.sqlite"; + +const { dbGlobal } = await import("drizzle-pkg/database/sqlite/db-bun"); +mock.module("drizzle-pkg/lib/db", () => ({ dbGlobal })); + +const { users } = await import("drizzle-pkg/lib/schema/auth"); +const { userExportTasks } = await import("drizzle-pkg/lib/schema/export"); +const { + claimNextQueuedTask, + createExportTask, + getExportTaskForUser, + listExportTasksByUser, + markExportTaskFailed, + markExportTaskRunning, + markExportTaskSucceeded, +} = await import("./jobs"); + +const USER_1 = { id: 910001, username: "export_jobs_u1", password: "pw1" }; +const USER_2 = { id: 910002, username: "export_jobs_u2", password: "pw2" }; +const TEST_USERS: Array<{ id: number; username: string; password: string }> = [USER_1, USER_2]; + +async function resetRows() { + const ids = [USER_1.id, USER_2.id]; + await dbGlobal.delete(userExportTasks).where(inArray(userExportTasks.userId, ids)); + await dbGlobal.delete(users).where(inArray(users.id, ids)); +} + +describe("export jobs service", () => { + beforeAll(async () => { + await resetRows(); + }); + + beforeEach(async () => { + await resetRows(); + await dbGlobal.insert(users).values(TEST_USERS); + }); + + test("createExportTask creates queued task with given policy", async () => { + const task = await createExportTask({ userId: USER_1.id, maskPolicy: "masked" }); + + expect(task).toBeTruthy(); + expect(task.userId).toBe(USER_1.id); + expect(task.status).toBe("queued"); + expect(task.maskPolicy).toBe("masked"); + expect(task.exportCutoffAt).toBeNull(); + }); + + test("markExportTaskRunning writes exportCutoffAt timestamp", async () => { + const task = await createExportTask({ userId: USER_1.id, maskPolicy: "raw" }); + + const 
running = await markExportTaskRunning(task.id); + expect(running).toBeTruthy(); + expect(running?.status).toBe("running"); + expect(running?.exportCutoffAt).toBeInstanceOf(Date); + + const [fromDb] = await dbGlobal + .select() + .from(userExportTasks) + .where(eq(userExportTasks.id, task.id)) + .limit(1); + expect(fromDb?.exportCutoffAt).toBeInstanceOf(Date); + }); + + test("listExportTasksByUser only returns the requested user tasks", async () => { + const firstU1 = await createExportTask({ userId: USER_1.id, maskPolicy: "masked" }); + await markExportTaskRunning(firstU1.id); + await markExportTaskSucceeded(firstU1.id, { + outputDir: "/tmp/export-u1-1", + outputName: "export-u1-1.zip", + totalBytes: 128, + expiresAt: new Date("2026-04-24T00:00:00.000Z"), + }); + await createExportTask({ userId: USER_2.id, maskPolicy: "raw" }); + await createExportTask({ userId: USER_1.id, maskPolicy: "raw" }); + + const rows = await listExportTasksByUser(USER_1.id); + expect(rows.length).toBe(2); + expect(rows.every((row) => row.userId === USER_1.id)).toBe(true); + expect(rows[0]!.id).toBeGreaterThan(rows[1]!.id); + }); + + test("markExportTaskSucceeded and markExportTaskFailed persist payload fields", async () => { + const task1 = await createExportTask({ userId: USER_1.id, maskPolicy: "masked" }); + await markExportTaskRunning(task1.id); + + await markExportTaskSucceeded(task1.id, { + outputDir: "/tmp/export-1", + outputName: "export-1.zip", + totalBytes: 1234, + expiresAt: new Date("2026-04-24T00:00:00.000Z"), + }); + const task2 = await createExportTask({ userId: USER_1.id, maskPolicy: "raw" }); + await markExportTaskRunning(task2.id); + await markExportTaskFailed(task2.id, { + errorCode: "EXPORT_IO", + errorMessage: "disk full", + }); + + const row1 = await getExportTaskForUser(task1.id, USER_1.id); + const row2 = await getExportTaskForUser(task2.id, USER_1.id); + expect(row1?.status).toBe("succeeded"); + expect(row1?.outputName).toBe("export-1.zip"); + 
expect(row2?.status).toBe("failed"); + expect(row2?.errorCode).toBe("EXPORT_IO"); + }); + + test("invalid status transition is rejected", async () => { + const task = await createExportTask({ userId: USER_1.id, maskPolicy: "masked" }); + await expect( + markExportTaskSucceeded(task.id, { + outputDir: "/tmp/export-invalid", + outputName: "export-invalid.zip", + totalBytes: 1, + expiresAt: new Date("2026-04-24T00:00:00.000Z"), + }), + ).rejects.toThrow("invalid export task transition"); + }); + + test("getExportTaskForUser enforces ownership", async () => { + const task = await createExportTask({ userId: USER_1.id, maskPolicy: "masked" }); + + const own = await getExportTaskForUser(task.id, USER_1.id); + const other = await getExportTaskForUser(task.id, USER_2.id); + expect(own?.id).toBe(task.id); + expect(other).toBeNull(); + }); + + test("claimNextQueuedTask returns null when no queued task", async () => { + const task = await claimNextQueuedTask(); + expect(task).toBeNull(); + }); + + test("claimNextQueuedTask claims the earliest queued task once", async () => { + const first = await createExportTask({ userId: USER_1.id, maskPolicy: "masked" }); + await createExportTask({ userId: USER_2.id, maskPolicy: "raw" }); + + const claimed = await claimNextQueuedTask(); + const nextClaimed = await claimNextQueuedTask(); + expect(claimed?.id).toBe(first.id); + expect(claimed?.status).toBe("running"); + expect(nextClaimed?.id).not.toBe(first.id); + }); + + test("createExportTask rejects when user already has active task", async () => { + await createExportTask({ userId: USER_1.id, maskPolicy: "masked" }); + await expect(createExportTask({ userId: USER_1.id, maskPolicy: "raw" })).rejects.toMatchObject({ + statusCode: 409, + }); + }); +}); diff --git a/server/service/export/jobs.ts b/server/service/export/jobs.ts new file mode 100644 index 0000000..f7cbceb --- /dev/null +++ b/server/service/export/jobs.ts @@ -0,0 +1,155 @@ +import { dbGlobal } from "drizzle-pkg/lib/db"; +import 
{ userExportTasks } from "drizzle-pkg/lib/schema/export"; +import { and, desc, eq, or } from "drizzle-orm"; +import { createError } from "h3"; +import { nextIntegerId } from "../../utils/sqlite-id"; + +type ExportMaskPolicy = "masked" | "raw"; + +async function getExportTaskById(taskId: number) { + const [row] = await dbGlobal + .select() + .from(userExportTasks) + .where(eq(userExportTasks.id, taskId)) + .limit(1); + return row ?? null; +} + +async function getRequiredExportTaskById(taskId: number) { + const row = await getExportTaskById(taskId); + if (!row) { + throw new Error(`export task not found: ${taskId}`); + } + return row; +} + +export async function createExportTask(params: { userId: number; maskPolicy: ExportMaskPolicy }) { + const [activeTask] = await dbGlobal + .select({ id: userExportTasks.id }) + .from(userExportTasks) + .where( + and( + eq(userExportTasks.userId, params.userId), + or(eq(userExportTasks.status, "queued"), eq(userExportTasks.status, "running")), + ), + ) + .limit(1); + if (activeTask) { + throw createError({ statusCode: 409, statusMessage: "已有导出任务在处理中,请稍后再试" }); + } + const id = await nextIntegerId(userExportTasks, userExportTasks.id); + await dbGlobal.insert(userExportTasks).values({ + id, + userId: params.userId, + maskPolicy: params.maskPolicy, + status: "queued", + }); + return getRequiredExportTaskById(id); +} + +export async function listExportTasksByUser(userId: number) { + return dbGlobal + .select() + .from(userExportTasks) + .where(eq(userExportTasks.userId, userId)) + .orderBy(desc(userExportTasks.id)); +} + +export async function markExportTaskRunning(taskId: number) { + const cutoffAt = new Date(); + await dbGlobal + .update(userExportTasks) + .set({ + status: "running", + exportCutoffAt: cutoffAt, + }) + .where(and(eq(userExportTasks.id, taskId), eq(userExportTasks.status, "queued"))); + const row = await getRequiredExportTaskById(taskId); + if (row.status !== "running" || row.exportCutoffAt?.getTime() !== 
cutoffAt.getTime()) { + throw new Error(`invalid export task transition for ${taskId}: expected queued -> running`); + } + return row; +} + +export async function claimNextQueuedTask() { + for (let i = 0; i < 5; i += 1) { + const [queued] = await dbGlobal + .select({ id: userExportTasks.id }) + .from(userExportTasks) + .where(eq(userExportTasks.status, "queued")) + .orderBy(userExportTasks.id) + .limit(1); + if (!queued) { + return null; + } + + try { + return await markExportTaskRunning(queued.id); + } catch (error) { + const message = error instanceof Error ? error.message : ""; + if (!message.includes("invalid export task transition")) { + throw error; + } + } + } + return null; +} + +export async function markExportTaskSucceeded( + taskId: number, + payload: { + outputDir: string; + outputName: string; + totalBytes: number; + expiresAt: Date; + }, +) { + await dbGlobal + .update(userExportTasks) + .set({ + status: "succeeded", + outputDir: payload.outputDir, + outputName: payload.outputName, + totalBytes: payload.totalBytes, + expiresAt: payload.expiresAt, + errorCode: null, + errorMessage: null, + }) + .where(and(eq(userExportTasks.id, taskId), eq(userExportTasks.status, "running"))); + const row = await getRequiredExportTaskById(taskId); + if (row.status !== "succeeded") { + throw new Error(`invalid export task transition for ${taskId}: expected running -> succeeded`); + } + return row; +} + +export async function markExportTaskFailed( + taskId: number, + payload: { + errorCode: string; + errorMessage: string; + }, +) { + await dbGlobal + .update(userExportTasks) + .set({ + status: "failed", + errorCode: payload.errorCode, + errorMessage: payload.errorMessage, + }) + .where(and(eq(userExportTasks.id, taskId), eq(userExportTasks.status, "running"))); + const row = await getRequiredExportTaskById(taskId); + if (row.status !== "failed") { + throw new Error(`invalid export task transition for ${taskId}: expected running -> failed`); + } + return row; +} + +export 
async function getExportTaskForUser(taskId: number, userId: number) { +  const [row] = await dbGlobal +    .select() +    .from(userExportTasks) +    .where(and(eq(userExportTasks.id, taskId), eq(userExportTasks.userId, userId))) +    .limit(1); +  return row ?? null; +} diff --git a/server/service/export/run.ts b/server/service/export/run.ts new file mode 100644 index 0000000..fba9fa7 --- /dev/null +++ b/server/service/export/run.ts @@ -0,0 +1,103 @@ +import fs from "node:fs/promises"; +import path from "node:path"; +import { buildExportDataJson } from "#server/service/export/build-data"; +import { buildExportMediaFiles } from "#server/service/export/build-files"; +import { buildExportManifest } from "#server/service/export/build-manifest"; +import { +  markExportTaskFailed, +  markExportTaskRunning, +  markExportTaskSucceeded, +} from "#server/service/export/jobs"; +import { sha256File } from "#server/utils/export-hash"; + +const EXPORT_RESULT_TTL_MS = 24 * 60 * 60 * 1000; + +function resolveExportRootDir() { +  return path.resolve(process.cwd(), ".tmp", "exports"); +} + +async function calcChecksums(baseDir: string, files: Array<{ file: string }>) { +  return Promise.all( +    files.map(async (item) => ({ +      file: item.file, +      sha256: await sha256File(path.resolve(baseDir, item.file)), +    })), +  ); +} + +export async function runExportTask(taskId: number) { +  const runningTask = await markExportTaskRunning(taskId); +  return runExportTaskWithRunning(taskId, runningTask); +} + +export async function runExportTaskWithRunning( +  taskId: number, +  runningTask: Awaited<ReturnType<typeof markExportTaskRunning>>, +) { +  const cutoffAt = runningTask.exportCutoffAt ?? 
new Date(); + const exportRootDir = resolveExportRootDir(); + const outputName = `export-task-${taskId}`; + const outputDir = path.resolve(exportRootDir, outputName); + const outputDataDir = path.resolve(outputDir, "data"); + const outputFilesDir = path.resolve(outputDir, "files"); + + try { + await fs.mkdir(outputDir, { recursive: true }); + + const dataResult = await buildExportDataJson({ + userId: runningTask.userId, + maskPolicy: runningTask.maskPolicy as "masked" | "raw", + cutoffAt, + outputDataDir, + }); + const filesResult = await buildExportMediaFiles({ + userId: runningTask.userId, + cutoffAt, + outputFilesDir, + }); + + const dataChecksums = await calcChecksums(outputDir, dataResult.dataFiles); + const fileChecksums = await calcChecksums(outputDir, filesResult.files); + const filesBytes = filesResult.files.reduce((sum, item) => sum + item.bytes, 0); + + const manifest = buildExportManifest({ + schemaVersion: 1, + userId: runningTask.userId, + maskPolicy: runningTask.maskPolicy as "masked" | "raw", + exportedAt: new Date(), + exportCutoffAt: cutoffAt, + stats: { + dataRows: dataResult.totalRows, + files: filesResult.files.length, + bytes: filesBytes, + }, + dataChecksums, + fileChecksums, + }); + + const manifestPath = path.resolve(outputDir, "manifest.json"); + await fs.writeFile(manifestPath, `${JSON.stringify(manifest, null, 2)}\n`, "utf8"); + + const dataBytes = await Promise.all( + dataResult.dataFiles.map(async (item) => { + const stat = await fs.stat(path.resolve(outputDir, item.file)); + return stat.size; + }), + ); + const manifestBytes = (await fs.stat(manifestPath)).size; + const totalBytes = dataBytes.reduce((sum, size) => sum + size, 0) + filesBytes + manifestBytes; + + await markExportTaskSucceeded(taskId, { + outputDir, + outputName, + totalBytes, + expiresAt: new Date(Date.now() + EXPORT_RESULT_TTL_MS), + }); + } catch (error) { + await markExportTaskFailed(taskId, { + errorCode: "EXPORT_BUILD_FAILED", + errorMessage: error instanceof 
Error ? error.message : "unknown export error", +    }); +    throw error; +  } +} diff --git a/server/tasks/export/process.ts b/server/tasks/export/process.ts new file mode 100644 index 0000000..628b2ce --- /dev/null +++ b/server/tasks/export/process.ts @@ -0,0 +1,17 @@ +import { claimNextQueuedTask } from "#server/service/export/jobs"; +import { runExportTaskWithRunning } from "#server/service/export/run"; + +export default defineTask({ +  meta: { +    name: "export:process", +    description: "Claim and process one queued export task", +  }, +  async run() { +    const task = await claimNextQueuedTask(); +    if (!task) { +      return { result: "skipped: no queued task" }; +    } +    await runExportTaskWithRunning(task.id, task); +    return { result: "ok", taskId: task.id }; +  }, +}); diff --git a/server/utils/export-hash.ts b/server/utils/export-hash.ts new file mode 100644 index 0000000..bb1dbe1 --- /dev/null +++ b/server/utils/export-hash.ts @@ -0,0 +1,13 @@ +import fs from "node:fs"; +import { createHash } from "node:crypto"; + +export async function sha256File(filePath: string): Promise<string> { +  return new Promise((resolve, reject) => { +    const hash = createHash("sha256"); +    const stream = fs.createReadStream(filePath); + +    stream.on("error", reject); +    stream.on("data", (chunk) => hash.update(chunk)); +    stream.on("end", () => resolve(hash.digest("hex"))); +  }); +} diff --git a/server/utils/export-mask.ts b/server/utils/export-mask.ts new file mode 100644 index 0000000..985a111 --- /dev/null +++ b/server/utils/export-mask.ts @@ -0,0 +1,62 @@ +import { +  DEFAULT_MASK_FIELD_KEYWORDS, +  NEVER_EXPORT_FIELD_EXACT_NAMES, +  type ExportMaskPolicy, +} from "../constants/export"; + +function hitKeyword(field: string, keywords: readonly string[]): boolean { +  const name = field.toLowerCase(); +  return keywords.some((k) => name.includes(k.toLowerCase())); +} + +function normalizeFieldName(field: string): string { +  return field.toLowerCase().replace(/[_-]/g, ""); +} + +function 
shouldNeverExportField(field: string): boolean { +  const normalizedField = normalizeFieldName(field); +  return NEVER_EXPORT_FIELD_EXACT_NAMES.includes( +    normalizedField as (typeof NEVER_EXPORT_FIELD_EXACT_NAMES)[number], +  ); +} + +function shouldMaskField(field: string): boolean { +  return hitKeyword(field, DEFAULT_MASK_FIELD_KEYWORDS); +} + +function maskValue(value: unknown): unknown { +  if (value === null || value === undefined) { +    return value; +  } +  if (typeof value === "string") { +    return "***"; +  } +  if (typeof value === "number") { +    return 0; +  } +  if (typeof value === "boolean") { +    return false; +  } +  return "[masked]"; +} + +export function sanitizeUserForExport( +  row: Record<string, unknown>, +  maskPolicy: ExportMaskPolicy, +): Record<string, unknown> { +  const out: Record<string, unknown> = {}; +  for (const [field, value] of Object.entries(row)) { +    if (shouldNeverExportField(field)) { +      continue; +    } +    if (maskPolicy === "masked" && shouldMaskField(field)) { +      out[field] = maskValue(value); +      continue; +    } +    out[field] = value; +  } +  return out; +} + +// Backward-compatible alias used by current tests/callers. 
+export const applyExportMask = sanitizeUserForExport; diff --git a/server/utils/me-export-request-body.test.ts b/server/utils/me-export-request-body.test.ts new file mode 100644 index 0000000..47f31d1 --- /dev/null +++ b/server/utils/me-export-request-body.test.ts @@ -0,0 +1,82 @@ +import { describe, expect, test } from "bun:test"; + +import { applyExportMask } from "./export-mask"; +import { parseMeExportRequestBody } from "./me-export-request-body"; + +describe("parseMeExportRequestBody", () => { + test("maskPolicy defaults to masked", () => { + expect(parseMeExportRequestBody({})).toEqual({ maskPolicy: "masked" }); + expect(parseMeExportRequestBody({ maskPolicy: undefined })).toEqual({ maskPolicy: "masked" }); + }); + + test("invalid maskPolicy throws 400", () => { + try { + parseMeExportRequestBody({ maskPolicy: "invalid-policy" }); + expect.unreachable(); + } catch (e: unknown) { + expect(e).toMatchObject({ statusCode: 400 }); + } + }); +}); + +describe("applyExportMask", () => { + test("never-export fields are excluded", () => { + const input = { + nickname: "dash", + passwordHash: "hash", + resetToken: "rst", + sessionId: "sid", + apiKey: "k", + accessToken: "token", + tokenCount: 3, + }; + + expect(applyExportMask(input, "raw")).toEqual({ + nickname: "dash", + tokenCount: 3, + }); + }); + + test("never-export fields match case and underscore variants", () => { + const input = { + nickname: "dash", + Access_Token: "token", + SESSION_ID: "sid", + }; + + expect(applyExportMask(input, "raw")).toEqual({ + nickname: "dash", + }); + }); + + test("raw policy still filters resetToken/apiKey and naming variants", () => { + const input = { + nickname: "dash", + resetToken: "rst-1", + reset_token: "rst-2", + resetPasswordToken: "rst-3", + apiKey: "key-1", + API_KEY: "key-2", + email: "dash@example.com", + }; + + expect(applyExportMask(input, "raw")).toEqual({ + nickname: "dash", + email: "dash@example.com", + }); + }); + + test("masked policy masks email and phone 
fields", () => { + const input = { + nickname: "dash", + email: "dash@example.com", + phone: "13800138000", + }; + + expect(applyExportMask(input, "masked")).toEqual({ + nickname: "dash", + email: "***", + phone: "***", + }); + }); +}); diff --git a/server/utils/me-export-request-body.ts b/server/utils/me-export-request-body.ts new file mode 100644 index 0000000..d12ded3 --- /dev/null +++ b/server/utils/me-export-request-body.ts @@ -0,0 +1,20 @@ +import { EXPORT_MASK_POLICIES, type ExportMaskPolicy } from "../constants/export"; +import { createError } from "h3"; + +type MeExportRequestBody = { + maskPolicy?: unknown; +}; + +export function parseMeExportRequestBody(body: unknown): { maskPolicy: ExportMaskPolicy } { + const payload = (body && typeof body === "object" ? body : {}) as MeExportRequestBody; + const raw = payload.maskPolicy; + if (raw === undefined || raw === null || raw === "") { + return { maskPolicy: "masked" }; + } + + if (typeof raw !== "string" || !EXPORT_MASK_POLICIES.includes(raw as ExportMaskPolicy)) { + throw createError({ statusCode: 400, statusMessage: "maskPolicy 非法,仅支持 masked|raw" }); + } + + return { maskPolicy: raw as ExportMaskPolicy }; +}