You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

444 lines
14 KiB

import { dbGlobal } from "drizzle-pkg/lib/db";
import { rssFeeds, rssItems } from "drizzle-pkg/lib/schema/rss";
import { users } from "drizzle-pkg/lib/schema/auth";
import { PUBLIC_LIST_PAGE_SIZE, PUBLIC_PREVIEW_LIMIT } from "#server/constants/public-profile-lists";
import { normalizePublicListPage } from "#server/utils/public-pagination";
import { and, count, desc, eq } from "drizzle-orm";
import { XMLParser } from "fast-xml-parser";
import { assertSafeRssUrl } from "#server/utils/rss-url";
import { RssUrlUnsafeError } from "#server/utils/rss-url";
import { nextIntegerId } from "#server/utils/sqlite-id";
import { visibilityShareToken } from "#server/utils/share-token";
const DEFAULT_POLL_MINUTES = Number(process.env.RSS_SYNC_INTERVAL_MINUTES ?? 60);
const FETCH_TIMEOUT_MS = Number(process.env.RSS_FETCH_TIMEOUT_MS ?? 15_000);
const MAX_BODY_BYTES = Number(process.env.RSS_MAX_RESPONSE_BYTES ?? 2_000_000);
export type MeRssFeedDto = {
id: number;
feedUrl: string;
title: string | null;
siteUrl: string | null;
lastError: string | null;
lastFetchedAt: string | null;
pollIntervalMinutes: number;
nextSyncAt: string | null;
};
export function toMeRssFeedDto(feed: (typeof rssFeeds.$inferSelect)): MeRssFeedDto {
const poll = feed.pollIntervalMinutes ?? DEFAULT_POLL_MINUTES;
const last = feed.lastFetchedAt;
const nextSyncAt = last ? new Date(last.getTime() + poll * 60_000).toISOString() : null;
return {
id: feed.id,
feedUrl: feed.feedUrl,
title: feed.title,
siteUrl: feed.siteUrl,
lastError: feed.lastError,
lastFetchedAt: last?.toISOString() ?? null,
pollIntervalMinutes: poll,
nextSyncAt,
};
}
export function meRssInboxSyncMeta() {
return { serverCheckIntervalMinutes: DEFAULT_POLL_MINUTES };
}
export async function getFeedForUser(userId: number, feedId: number) {
const [row] = await dbGlobal
.select()
.from(rssFeeds)
.where(and(eq(rssFeeds.id, feedId), eq(rssFeeds.userId, userId)))
.limit(1);
return row ?? null;
}
export async function listFeedsForUser(userId: number) {
return dbGlobal.select().from(rssFeeds).where(eq(rssFeeds.userId, userId)).orderBy(desc(rssFeeds.id));
}
export async function addFeed(userId: number, feedUrl: string) {
const url = assertSafeRssUrl(feedUrl.trim());
const [dup] = await dbGlobal
.select({ id: rssFeeds.id })
.from(rssFeeds)
.where(and(eq(rssFeeds.userId, userId), eq(rssFeeds.feedUrl, url)))
.limit(1);
if (dup) {
throw createError({ statusCode: 409, statusMessage: "该订阅地址已存在" });
}
const id = await nextIntegerId(rssFeeds, rssFeeds.id);
await dbGlobal.insert(rssFeeds).values({
id,
userId,
feedUrl: url,
});
const [row] = await dbGlobal.select().from(rssFeeds).where(eq(rssFeeds.id, id)).limit(1);
return row;
}
export async function deleteFeed(userId: number, feedId: number) {
const [row] = await dbGlobal
.select({ id: rssFeeds.id })
.from(rssFeeds)
.where(and(eq(rssFeeds.id, feedId), eq(rssFeeds.userId, userId)))
.limit(1);
if (!row) {
return false;
}
await dbGlobal.delete(rssFeeds).where(and(eq(rssFeeds.id, feedId), eq(rssFeeds.userId, userId)));
return true;
}
export async function listItemsForUser(userId: number, feedId?: number) {
const cond = feedId !== undefined ? and(eq(rssItems.userId, userId), eq(rssItems.feedId, feedId)) : eq(rssItems.userId, userId);
return dbGlobal.select().from(rssItems).where(cond).orderBy(desc(rssItems.publishedAt), desc(rssItems.id)).limit(500);
}
export async function updateItemVisibility(userId: number, itemId: number, visibility: "private" | "unlisted" | "public") {
const [existing] = await dbGlobal
.select()
.from(rssItems)
.where(and(eq(rssItems.id, itemId), eq(rssItems.userId, userId)))
.limit(1);
if (!existing) {
return null;
}
const shareToken = visibilityShareToken(visibility, existing.shareToken);
await dbGlobal
.update(rssItems)
.set({ visibility, shareToken })
.where(and(eq(rssItems.id, itemId), eq(rssItems.userId, userId)));
const [row] = await dbGlobal
.select()
.from(rssItems)
.where(and(eq(rssItems.id, itemId), eq(rssItems.userId, userId)))
.limit(1);
return row ?? null;
}
function publicRssListWhere(publicSlug: string) {
return and(
eq(users.publicSlug, publicSlug),
eq(users.status, "active"),
eq(rssItems.visibility, "public"),
);
}
export async function getPublicRssPreviewBySlug(publicSlug: string) {
const whereClause = publicRssListWhere(publicSlug);
const [countRows, rows] = await Promise.all([
dbGlobal
.select({ total: count() })
.from(rssItems)
.innerJoin(users, eq(rssItems.userId, users.id))
.where(whereClause),
dbGlobal
.select({ it: rssItems })
.from(rssItems)
.innerJoin(users, eq(rssItems.userId, users.id))
.where(whereClause)
.orderBy(desc(rssItems.publishedAt), desc(rssItems.id))
.limit(PUBLIC_PREVIEW_LIMIT),
]);
return { items: rows.map((r) => r.it), total: countRows[0]?.total ?? 0 };
}
export async function getPublicRssPageBySlug(publicSlug: string, pageRaw: unknown) {
const page = normalizePublicListPage(pageRaw);
const pageSize = PUBLIC_LIST_PAGE_SIZE;
const offset = (page - 1) * pageSize;
const whereClause = publicRssListWhere(publicSlug);
const [countRows, rows] = await Promise.all([
dbGlobal
.select({ total: count() })
.from(rssItems)
.innerJoin(users, eq(rssItems.userId, users.id))
.where(whereClause),
dbGlobal
.select({ it: rssItems })
.from(rssItems)
.innerJoin(users, eq(rssItems.userId, users.id))
.where(whereClause)
.orderBy(desc(rssItems.publishedAt), desc(rssItems.id))
.limit(pageSize)
.offset(offset),
]);
return { items: rows.map((r) => r.it), total: countRows[0]?.total ?? 0, page, pageSize };
}
export async function getUnlistedRssItem(publicSlug: string, shareToken: string) {
const [row] = await dbGlobal
.select({ it: rssItems })
.from(rssItems)
.innerJoin(users, eq(rssItems.userId, users.id))
.where(
and(
eq(users.publicSlug, publicSlug),
eq(users.status, "active"),
eq(rssItems.visibility, "unlisted"),
eq(rssItems.shareToken, shareToken),
),
)
.limit(1);
return row?.it ?? null;
}
type ParsedItem = {
guid?: string;
canonicalUrl: string;
title?: string;
summary?: string;
contentSnippet?: string;
author?: string;
publishedAt?: Date;
};
function normalizeLink(link: unknown): string | undefined {
if (typeof link === "string") {
return link;
}
if (link && typeof link === "object" && "@_href" in link) {
return String((link as { "@_href": string })["@_href"]);
}
return undefined;
}
function parseFeedXml(xml: string): { title?: string; siteUrl?: string; items: ParsedItem[] } {
const parser = new XMLParser({
ignoreAttributes: false,
attributeNamePrefix: "@_",
isArray: (name) => name === "item" || name === "entry",
});
const doc = parser.parse(xml) as Record<string, unknown>;
if (doc.rss && typeof doc.rss === "object") {
const channel = (doc.rss as { channel?: Record<string, unknown> }).channel;
if (!channel) {
return { items: [] };
}
const rawItems = channel.item;
const itemsArr = Array.isArray(rawItems) ? rawItems : rawItems ? [rawItems] : [];
const items: ParsedItem[] = [];
for (const it of itemsArr as Record<string, unknown>[]) {
const link = normalizeLink(it.link) ?? normalizeLink(it.guid);
if (!link) {
continue;
}
const guid = typeof it.guid === "string" ? it.guid : typeof it.guid === "object" && it.guid && "#text" in (it.guid as object)
? String((it.guid as { "#text": string })["#text"])
: undefined;
let pub: Date | undefined;
if (typeof it.pubDate === "string") {
const d = new Date(it.pubDate);
if (!Number.isNaN(d.getTime())) {
pub = d;
}
}
items.push({
guid,
canonicalUrl: link,
title: typeof it.title === "string" ? it.title : undefined,
summary:
typeof it.description === "string"
? it.description.slice(0, 4000)
: typeof it["content:encoded"] === "string"
? String(it["content:encoded"]).slice(0, 4000)
: undefined,
contentSnippet: typeof it.description === "string" ? it.description.slice(0, 500) : undefined,
author: typeof it.author === "string" ? it.author : undefined,
publishedAt: pub,
});
}
return {
title: typeof channel.title === "string" ? channel.title : undefined,
siteUrl: typeof channel.link === "string" ? channel.link : undefined,
items,
};
}
if (doc.feed && typeof doc.feed === "object") {
const feed = doc.feed as Record<string, unknown>;
const rawEntries = feed.entry;
const entries = Array.isArray(rawEntries) ? rawEntries : rawEntries ? [rawEntries] : [];
const items: ParsedItem[] = [];
for (const en of entries as Record<string, unknown>[]) {
const link = normalizeLink(en.link) ?? normalizeLink(en.id);
if (!link) {
continue;
}
const guid = typeof en.id === "string" ? en.id : undefined;
let pub: Date | undefined;
if (typeof en.updated === "string") {
const d = new Date(en.updated);
if (!Number.isNaN(d.getTime())) {
pub = d;
}
}
items.push({
guid,
canonicalUrl: link,
title: typeof en.title === "string" ? en.title : typeof en.title === "object" && en.title && "#text" in (en.title as object)
? String((en.title as { "#text": string })["#text"])
: undefined,
summary: typeof en.summary === "string" ? en.summary.slice(0, 4000) : undefined,
contentSnippet: typeof en.summary === "string" ? en.summary.slice(0, 500) : undefined,
author:
typeof en.author === "string"
? en.author
: typeof en.author === "object" && en.author && "name" in (en.author as object)
? String((en.author as { name?: string }).name)
: undefined,
publishedAt: pub,
});
}
return {
title:
typeof feed.title === "string"
? feed.title
: typeof feed.title === "object" && feed.title && "#text" in (feed.title as object)
? String((feed.title as { "#text": string })["#text"])
: undefined,
siteUrl: typeof feed.id === "string" ? feed.id : undefined,
items,
};
}
return { items: [] };
}
async function readResponseBody(res: Response): Promise<string> {
const reader = res.body?.getReader();
if (!reader) {
return await res.text();
}
const chunks: Uint8Array[] = [];
let total = 0;
while (true) {
const { done, value } = await reader.read();
if (done) {
break;
}
if (value) {
total += value.length;
if (total > MAX_BODY_BYTES) {
reader.cancel();
throw new Error("响应过大");
}
chunks.push(value);
}
}
const all = new Uint8Array(total);
let off = 0;
for (const c of chunks) {
all.set(c, off);
off += c.length;
}
return new TextDecoder().decode(all);
}
export async function syncFeed(feedId: number): Promise<{ ok: boolean; error?: string }> {
const [feed] = await dbGlobal.select().from(rssFeeds).where(eq(rssFeeds.id, feedId)).limit(1);
if (!feed) {
return { ok: false, error: "feed not found" };
}
try {
assertSafeRssUrl(feed.feedUrl);
} catch (e) {
const msg = e instanceof RssUrlUnsafeError ? e.message : "unsafe url";
await dbGlobal
.update(rssFeeds)
.set({ lastError: msg, lastFetchedAt: new Date() })
.where(eq(rssFeeds.id, feedId));
return { ok: false, error: msg };
}
const ac = new AbortController();
const t = setTimeout(() => ac.abort(), FETCH_TIMEOUT_MS);
try {
const res = await fetch(feed.feedUrl, {
signal: ac.signal,
headers: { "User-Agent": "PersonPanelRSS/1.0" },
redirect: "follow",
});
if (!res.ok) {
throw new Error(`HTTP ${res.status}`);
}
const xml = await readResponseBody(res);
const parsed = parseFeedXml(xml);
await dbGlobal
.update(rssFeeds)
.set({
title: parsed.title ?? feed.title,
siteUrl: parsed.siteUrl ?? feed.siteUrl,
lastFetchedAt: new Date(),
lastError: null,
})
.where(eq(rssFeeds.id, feedId));
for (const it of parsed.items) {
const dedupeCond = it.guid
? and(eq(rssItems.feedId, feedId), eq(rssItems.guid, it.guid))
: and(eq(rssItems.feedId, feedId), eq(rssItems.canonicalUrl, it.canonicalUrl));
const existing = await dbGlobal
.select({ id: rssItems.id })
.from(rssItems)
.where(dedupeCond)
.limit(1);
if (existing.length > 0) {
continue;
}
const id = await nextIntegerId(rssItems, rssItems.id);
await dbGlobal.insert(rssItems).values({
id,
userId: feed.userId,
feedId,
guid: it.guid ?? null,
canonicalUrl: it.canonicalUrl,
title: it.title ?? null,
summary: it.summary ?? null,
contentSnippet: it.contentSnippet ?? null,
author: it.author ?? null,
publishedAt: it.publishedAt ?? null,
visibility: "private",
shareToken: null,
});
}
return { ok: true };
} catch (e) {
const msg = e instanceof Error ? e.message : String(e);
await dbGlobal
.update(rssFeeds)
.set({ lastError: msg, lastFetchedAt: new Date() })
.where(eq(rssFeeds.id, feedId));
return { ok: false, error: msg };
} finally {
clearTimeout(t);
}
}
export async function syncDueFeedsForUser(userId: number) {
const feeds = await listFeedsForUser(userId);
const now = Date.now();
for (const f of feeds) {
const intervalMs = (f.pollIntervalMinutes ?? DEFAULT_POLL_MINUTES) * 60_000;
const last = f.lastFetchedAt ? f.lastFetchedAt.getTime() : 0;
if (now - last >= intervalMs) {
await syncFeed(f.id);
}
}
}
export async function syncAllDueFeeds() {
const allFeeds = await dbGlobal.select().from(rssFeeds);
const now = Date.now();
for (const f of allFeeds) {
const intervalMs = (f.pollIntervalMinutes ?? DEFAULT_POLL_MINUTES) * 60_000;
const last = f.lastFetchedAt ? f.lastFetchedAt.getTime() : 0;
if (now - last >= intervalMs) {
await syncFeed(f.id);
}
}
}