import { dbGlobal } from "drizzle-pkg/lib/db"; import { rssFeeds, rssItems } from "drizzle-pkg/lib/schema/rss"; import { users } from "drizzle-pkg/lib/schema/auth"; import { and, desc, eq } from "drizzle-orm"; import { XMLParser } from "fast-xml-parser"; import { assertSafeRssUrl } from "#server/utils/rss-url"; import { RssUrlUnsafeError } from "#server/utils/rss-url"; import { nextIntegerId } from "#server/utils/sqlite-id"; import { visibilityShareToken } from "#server/utils/share-token"; const DEFAULT_POLL_MINUTES = Number(process.env.RSS_SYNC_INTERVAL_MINUTES ?? 60); const FETCH_TIMEOUT_MS = Number(process.env.RSS_FETCH_TIMEOUT_MS ?? 15_000); const MAX_BODY_BYTES = Number(process.env.RSS_MAX_RESPONSE_BYTES ?? 2_000_000); export async function listFeedsForUser(userId: number) { return dbGlobal.select().from(rssFeeds).where(eq(rssFeeds.userId, userId)).orderBy(desc(rssFeeds.id)); } export async function addFeed(userId: number, feedUrl: string) { const url = assertSafeRssUrl(feedUrl.trim()); const [dup] = await dbGlobal .select({ id: rssFeeds.id }) .from(rssFeeds) .where(and(eq(rssFeeds.userId, userId), eq(rssFeeds.feedUrl, url))) .limit(1); if (dup) { throw createError({ statusCode: 409, statusMessage: "该订阅地址已存在" }); } const id = await nextIntegerId(rssFeeds, rssFeeds.id); await dbGlobal.insert(rssFeeds).values({ id, userId, feedUrl: url, }); const [row] = await dbGlobal.select().from(rssFeeds).where(eq(rssFeeds.id, id)).limit(1); return row; } export async function deleteFeed(userId: number, feedId: number) { const [row] = await dbGlobal .select({ id: rssFeeds.id }) .from(rssFeeds) .where(and(eq(rssFeeds.id, feedId), eq(rssFeeds.userId, userId))) .limit(1); if (!row) { return false; } await dbGlobal.delete(rssFeeds).where(and(eq(rssFeeds.id, feedId), eq(rssFeeds.userId, userId))); return true; } export async function listItemsForUser(userId: number, feedId?: number) { const cond = feedId !== undefined ? and(eq(rssItems.userId, userId), eq(rssItems.feedId, feedId)) : eq(rssItems.userId, userId); return dbGlobal.select().from(rssItems).where(cond).orderBy(desc(rssItems.publishedAt), desc(rssItems.id)).limit(500); } export async function updateItemVisibility(userId: number, itemId: number, visibility: "private" | "unlisted" | "public") { const [existing] = await dbGlobal .select() .from(rssItems) .where(and(eq(rssItems.id, itemId), eq(rssItems.userId, userId))) .limit(1); if (!existing) { return null; } const shareToken = visibilityShareToken(visibility, existing.shareToken); await dbGlobal .update(rssItems) .set({ visibility, shareToken }) .where(and(eq(rssItems.id, itemId), eq(rssItems.userId, userId))); const [row] = await dbGlobal .select() .from(rssItems) .where(and(eq(rssItems.id, itemId), eq(rssItems.userId, userId))) .limit(1); return row ?? null; } export async function listPublicRssItemsBySlug(publicSlug: string) { const rows = await dbGlobal .select({ it: rssItems }) .from(rssItems) .innerJoin(users, eq(rssItems.userId, users.id)) .where( and( eq(users.publicSlug, publicSlug), eq(users.status, "active"), eq(rssItems.visibility, "public"), ), ) .orderBy(desc(rssItems.publishedAt), desc(rssItems.id)) .limit(200); return rows.map((r) => r.it); } export async function getUnlistedRssItem(publicSlug: string, shareToken: string) { const [row] = await dbGlobal .select({ it: rssItems }) .from(rssItems) .innerJoin(users, eq(rssItems.userId, users.id)) .where( and( eq(users.publicSlug, publicSlug), eq(users.status, "active"), eq(rssItems.visibility, "unlisted"), eq(rssItems.shareToken, shareToken), ), ) .limit(1); return row?.it ?? null; } type ParsedItem = { guid?: string; canonicalUrl: string; title?: string; summary?: string; contentSnippet?: string; author?: string; publishedAt?: Date; }; function normalizeLink(link: unknown): string | undefined { if (typeof link === "string") { return link; } if (link && typeof link === "object" && "@_href" in link) { return String((link as { "@_href": string })["@_href"]); } return undefined; } function parseFeedXml(xml: string): { title?: string; siteUrl?: string; items: ParsedItem[] } { const parser = new XMLParser({ ignoreAttributes: false, attributeNamePrefix: "@_", isArray: (name) => name === "item" || name === "entry", }); const doc = parser.parse(xml) as Record; if (doc.rss && typeof doc.rss === "object") { const channel = (doc.rss as { channel?: Record }).channel; if (!channel) { return { items: [] }; } const rawItems = channel.item; const itemsArr = Array.isArray(rawItems) ? rawItems : rawItems ? [rawItems] : []; const items: ParsedItem[] = []; for (const it of itemsArr as Record[]) { const link = normalizeLink(it.link) ?? normalizeLink(it.guid); if (!link) { continue; } const guid = typeof it.guid === "string" ? it.guid : typeof it.guid === "object" && it.guid && "#text" in (it.guid as object) ? String((it.guid as { "#text": string })["#text"]) : undefined; let pub: Date | undefined; if (typeof it.pubDate === "string") { const d = new Date(it.pubDate); if (!Number.isNaN(d.getTime())) { pub = d; } } items.push({ guid, canonicalUrl: link, title: typeof it.title === "string" ? it.title : undefined, summary: typeof it.description === "string" ? it.description.slice(0, 4000) : typeof it["content:encoded"] === "string" ? String(it["content:encoded"]).slice(0, 4000) : undefined, contentSnippet: typeof it.description === "string" ? it.description.slice(0, 500) : undefined, author: typeof it.author === "string" ? it.author : undefined, publishedAt: pub, }); } return { title: typeof channel.title === "string" ? channel.title : undefined, siteUrl: typeof channel.link === "string" ? channel.link : undefined, items, }; } if (doc.feed && typeof doc.feed === "object") { const feed = doc.feed as Record; const rawEntries = feed.entry; const entries = Array.isArray(rawEntries) ? rawEntries : rawEntries ? [rawEntries] : []; const items: ParsedItem[] = []; for (const en of entries as Record[]) { const link = normalizeLink(en.link) ?? normalizeLink(en.id); if (!link) { continue; } const guid = typeof en.id === "string" ? en.id : undefined; let pub: Date | undefined; if (typeof en.updated === "string") { const d = new Date(en.updated); if (!Number.isNaN(d.getTime())) { pub = d; } } items.push({ guid, canonicalUrl: link, title: typeof en.title === "string" ? en.title : typeof en.title === "object" && en.title && "#text" in (en.title as object) ? String((en.title as { "#text": string })["#text"]) : undefined, summary: typeof en.summary === "string" ? en.summary.slice(0, 4000) : undefined, contentSnippet: typeof en.summary === "string" ? en.summary.slice(0, 500) : undefined, author: typeof en.author === "string" ? en.author : typeof en.author === "object" && en.author && "name" in (en.author as object) ? String((en.author as { name?: string }).name) : undefined, publishedAt: pub, }); } return { title: typeof feed.title === "string" ? feed.title : typeof feed.title === "object" && feed.title && "#text" in (feed.title as object) ? String((feed.title as { "#text": string })["#text"]) : undefined, siteUrl: typeof feed.id === "string" ? feed.id : undefined, items, }; } return { items: [] }; } async function readResponseBody(res: Response): Promise { const reader = res.body?.getReader(); if (!reader) { return await res.text(); } const chunks: Uint8Array[] = []; let total = 0; while (true) { const { done, value } = await reader.read(); if (done) { break; } if (value) { total += value.length; if (total > MAX_BODY_BYTES) { reader.cancel(); throw new Error("响应过大"); } chunks.push(value); } } const all = new Uint8Array(total); let off = 0; for (const c of chunks) { all.set(c, off); off += c.length; } return new TextDecoder().decode(all); } export async function syncFeed(feedId: number): Promise<{ ok: boolean; error?: string }> { const [feed] = await dbGlobal.select().from(rssFeeds).where(eq(rssFeeds.id, feedId)).limit(1); if (!feed) { return { ok: false, error: "feed not found" }; } try { assertSafeRssUrl(feed.feedUrl); } catch (e) { const msg = e instanceof RssUrlUnsafeError ? e.message : "unsafe url"; await dbGlobal .update(rssFeeds) .set({ lastError: msg, lastFetchedAt: new Date() }) .where(eq(rssFeeds.id, feedId)); return { ok: false, error: msg }; } const ac = new AbortController(); const t = setTimeout(() => ac.abort(), FETCH_TIMEOUT_MS); try { const res = await fetch(feed.feedUrl, { signal: ac.signal, headers: { "User-Agent": "PersonPanelRSS/1.0" }, redirect: "follow", }); if (!res.ok) { throw new Error(`HTTP ${res.status}`); } const xml = await readResponseBody(res); const parsed = parseFeedXml(xml); await dbGlobal .update(rssFeeds) .set({ title: parsed.title ?? feed.title, siteUrl: parsed.siteUrl ?? feed.siteUrl, lastFetchedAt: new Date(), lastError: null, }) .where(eq(rssFeeds.id, feedId)); for (const it of parsed.items) { const dedupeCond = it.guid ? and(eq(rssItems.feedId, feedId), eq(rssItems.guid, it.guid)) : and(eq(rssItems.feedId, feedId), eq(rssItems.canonicalUrl, it.canonicalUrl)); const existing = await dbGlobal .select({ id: rssItems.id }) .from(rssItems) .where(dedupeCond) .limit(1); if (existing.length > 0) { continue; } const id = await nextIntegerId(rssItems, rssItems.id); await dbGlobal.insert(rssItems).values({ id, userId: feed.userId, feedId, guid: it.guid ?? null, canonicalUrl: it.canonicalUrl, title: it.title ?? null, summary: it.summary ?? null, contentSnippet: it.contentSnippet ?? null, author: it.author ?? null, publishedAt: it.publishedAt ?? null, visibility: "private", shareToken: null, }); } return { ok: true }; } catch (e) { const msg = e instanceof Error ? e.message : String(e); await dbGlobal .update(rssFeeds) .set({ lastError: msg, lastFetchedAt: new Date() }) .where(eq(rssFeeds.id, feedId)); return { ok: false, error: msg }; } finally { clearTimeout(t); } } export async function syncDueFeedsForUser(userId: number) { const feeds = await listFeedsForUser(userId); const now = Date.now(); for (const f of feeds) { const intervalMs = (f.pollIntervalMinutes ?? DEFAULT_POLL_MINUTES) * 60_000; const last = f.lastFetchedAt ? f.lastFetchedAt.getTime() : 0; if (now - last >= intervalMs) { await syncFeed(f.id); } } } export async function syncAllDueFeeds() { const allFeeds = await dbGlobal.select().from(rssFeeds); const now = Date.now(); for (const f of allFeeds) { const intervalMs = (f.pollIntervalMinutes ?? DEFAULT_POLL_MINUTES) * 60_000; const last = f.lastFetchedAt ? f.lastFetchedAt.getTime() : 0; if (now - last >= intervalMs) { await syncFeed(f.id); } } }