You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
404 lines
13 KiB
404 lines
13 KiB
import { dbGlobal } from "drizzle-pkg/lib/db";
|
|
import { rssFeeds, rssItems } from "drizzle-pkg/lib/schema/rss";
|
|
import { users } from "drizzle-pkg/lib/schema/auth";
|
|
import { PUBLIC_LIST_PAGE_SIZE, PUBLIC_PREVIEW_LIMIT } from "#server/constants/public-profile-lists";
|
|
import { normalizePublicListPage } from "#server/utils/public-pagination";
|
|
import { and, count, desc, eq } from "drizzle-orm";
|
|
import { XMLParser } from "fast-xml-parser";
|
|
import { assertSafeRssUrl } from "#server/utils/rss-url";
|
|
import { RssUrlUnsafeError } from "#server/utils/rss-url";
|
|
import { nextIntegerId } from "#server/utils/sqlite-id";
|
|
import { visibilityShareToken } from "#server/utils/share-token";
|
|
|
|
/**
 * Reads a numeric setting from the environment.
 *
 * Returns `fallback` when the variable is unset, blank, not a finite number,
 * or negative. Without this guard a malformed value becomes `NaN`, and any
 * `x > NaN` comparison is false, so a limit built on it would never trigger.
 *
 * @param name - Environment variable name.
 * @param fallback - Value used when the variable is missing or invalid.
 * @returns The parsed number, or `fallback`.
 */
function numberFromEnv(name: string, fallback: number): number {
  const raw = process.env[name];
  if (raw === undefined || raw.trim() === "") {
    return fallback;
  }
  const parsed = Number(raw);
  return Number.isFinite(parsed) && parsed >= 0 ? parsed : fallback;
}

/** Minutes between syncs for feeds that have no `pollIntervalMinutes` of their own. */
const DEFAULT_POLL_MINUTES = numberFromEnv("RSS_SYNC_INTERVAL_MINUTES", 60);

/** Milliseconds after which an in-flight feed request is aborted. */
const FETCH_TIMEOUT_MS = numberFromEnv("RSS_FETCH_TIMEOUT_MS", 15_000);

/** Largest feed response body, in bytes, that will be accepted. */
const MAX_BODY_BYTES = numberFromEnv("RSS_MAX_RESPONSE_BYTES", 2_000_000);
|
|
|
|
/**
 * Loads every feed subscription that belongs to a user, highest id first.
 *
 * @param userId - Owner of the feeds.
 * @returns All matching `rssFeeds` rows.
 */
export async function listFeedsForUser(userId: number) {
  const ownedByUser = eq(rssFeeds.userId, userId);
  const feeds = await dbGlobal.select().from(rssFeeds).where(ownedByUser).orderBy(desc(rssFeeds.id));
  return feeds;
}
|
|
|
|
/**
 * Subscribes a user to a feed URL.
 *
 * The URL is trimmed and passed through `assertSafeRssUrl`; the value that
 * helper returns is what gets stored and compared against.
 *
 * @param userId - Owner of the new subscription.
 * @param feedUrl - Feed address as entered by the user.
 * @returns The freshly inserted `rssFeeds` row.
 * @throws A 409 error (via `createError`) when the user already subscribes to this URL.
 */
export async function addFeed(userId: number, feedUrl: string) {
  const safeUrl = assertSafeRssUrl(feedUrl.trim());

  // Reject a second subscription to the same URL for the same user.
  const sameUserAndUrl = and(eq(rssFeeds.userId, userId), eq(rssFeeds.feedUrl, safeUrl));
  const duplicates = await dbGlobal.select({ id: rssFeeds.id }).from(rssFeeds).where(sameUserAndUrl).limit(1);
  if (duplicates.length > 0) {
    throw createError({ statusCode: 409, statusMessage: "该订阅地址已存在" });
  }

  const newId = await nextIntegerId(rssFeeds, rssFeeds.id);
  await dbGlobal.insert(rssFeeds).values({ id: newId, userId, feedUrl: safeUrl });

  // Read the row back so the caller receives the stored column values.
  const created = await dbGlobal.select().from(rssFeeds).where(eq(rssFeeds.id, newId)).limit(1);
  return created[0];
}
|
|
|
|
/**
 * Removes one of a user's feed subscriptions.
 *
 * @param userId - User that must own the feed.
 * @param feedId - Feed to delete.
 * @returns `false` when no feed with that id belongs to the user, otherwise `true`.
 */
export async function deleteFeed(userId: number, feedId: number) {
  const ownedFeed = and(eq(rssFeeds.id, feedId), eq(rssFeeds.userId, userId));

  const matches = await dbGlobal.select({ id: rssFeeds.id }).from(rssFeeds).where(ownedFeed).limit(1);
  if (matches.length === 0) {
    return false;
  }

  await dbGlobal.delete(rssFeeds).where(ownedFeed);
  return true;
}
|
|
|
|
/**
 * Lists a user's feed items, most recently published first (ties broken by id),
 * capped at 500 rows.
 *
 * @param userId - Owner of the items.
 * @param feedId - When given, only items from this feed are returned.
 * @returns Up to 500 `rssItems` rows.
 */
export async function listItemsForUser(userId: number, feedId?: number) {
  const byUser = eq(rssItems.userId, userId);
  const filter = feedId === undefined ? byUser : and(byUser, eq(rssItems.feedId, feedId));

  const items = await dbGlobal
    .select()
    .from(rssItems)
    .where(filter)
    .orderBy(desc(rssItems.publishedAt), desc(rssItems.id))
    .limit(500);
  return items;
}
|
|
|
|
/**
 * Changes the visibility of one of a user's items and returns the updated row.
 *
 * The share token to store is decided by `visibilityShareToken`, which is
 * given the new visibility and the item's current token.
 *
 * @param userId - User that must own the item.
 * @param itemId - Item to update.
 * @param visibility - New visibility level.
 * @returns The item after the update, or `null` when the user owns no such item.
 */
export async function updateItemVisibility(userId: number, itemId: number, visibility: "private" | "unlisted" | "public") {
  const ownedItem = and(eq(rssItems.id, itemId), eq(rssItems.userId, userId));

  // Shared lookup used both before and after the write.
  const loadItem = async () => {
    const found = await dbGlobal.select().from(rssItems).where(ownedItem).limit(1);
    return found[0] ?? null;
  };

  const current = await loadItem();
  if (current === null) {
    return null;
  }

  const nextToken = visibilityShareToken(visibility, current.shareToken);
  await dbGlobal.update(rssItems).set({ visibility, shareToken: nextToken }).where(ownedItem);

  return loadItem();
}
|
|
|
|
/**
 * Builds the filter shared by the public listing queries: the profile slug
 * must match, the user must be "active", and the item must be "public".
 * Intended for queries that join `rssItems` with `users`.
 *
 * @param publicSlug - Public profile slug of the owning user.
 */
function publicRssListWhere(publicSlug: string) {
  const slugMatches = eq(users.publicSlug, publicSlug);
  const userIsActive = eq(users.status, "active");
  const itemIsPublic = eq(rssItems.visibility, "public");
  return and(slugMatches, userIsActive, itemIsPublic);
}
|
|
|
|
/**
 * Returns the newest public items of a profile together with the total count
 * of public items. At most `PUBLIC_PREVIEW_LIMIT` items are returned.
 *
 * @param publicSlug - Public profile slug of the owning user.
 * @returns `{ items, total }`; `total` is 0 when the count query yields no row.
 */
export async function getPublicRssPreviewBySlug(publicSlug: string) {
  const visibleToPublic = publicRssListWhere(publicSlug);

  const totalQuery = dbGlobal
    .select({ total: count() })
    .from(rssItems)
    .innerJoin(users, eq(rssItems.userId, users.id))
    .where(visibleToPublic);

  const previewQuery = dbGlobal
    .select({ it: rssItems })
    .from(rssItems)
    .innerJoin(users, eq(rssItems.userId, users.id))
    .where(visibleToPublic)
    .orderBy(desc(rssItems.publishedAt), desc(rssItems.id))
    .limit(PUBLIC_PREVIEW_LIMIT);

  // Count and preview are independent, so run them concurrently.
  const [totals, previewRows] = await Promise.all([totalQuery, previewQuery]);

  const items = previewRows.map(({ it }) => it);
  return { items, total: totals[0]?.total ?? 0 };
}
|
|
|
|
/**
 * Returns one page of a profile's public items plus the total count.
 *
 * @param publicSlug - Public profile slug of the owning user.
 * @param pageRaw - Untrusted page value; normalised by `normalizePublicListPage`.
 * @returns `{ items, total, page, pageSize }` where `pageSize` is `PUBLIC_LIST_PAGE_SIZE`.
 */
export async function getPublicRssPageBySlug(publicSlug: string, pageRaw: unknown) {
  const page = normalizePublicListPage(pageRaw);
  const pageSize = PUBLIC_LIST_PAGE_SIZE;
  const rowsToSkip = (page - 1) * pageSize;
  const visibleToPublic = publicRssListWhere(publicSlug);

  const totalQuery = dbGlobal
    .select({ total: count() })
    .from(rssItems)
    .innerJoin(users, eq(rssItems.userId, users.id))
    .where(visibleToPublic);

  const pageQuery = dbGlobal
    .select({ it: rssItems })
    .from(rssItems)
    .innerJoin(users, eq(rssItems.userId, users.id))
    .where(visibleToPublic)
    .orderBy(desc(rssItems.publishedAt), desc(rssItems.id))
    .limit(pageSize)
    .offset(rowsToSkip);

  // Count and page are independent, so run them concurrently.
  const [totals, pageRows] = await Promise.all([totalQuery, pageQuery]);

  const items = pageRows.map(({ it }) => it);
  return { items, total: totals[0]?.total ?? 0, page, pageSize };
}
|
|
|
|
/**
 * Looks up an "unlisted" item by its share token within a public profile.
 *
 * All four conditions must hold: the slug matches, the user is "active",
 * the item's visibility is "unlisted", and the share token matches.
 *
 * @param publicSlug - Public profile slug of the owning user.
 * @param shareToken - Token from the share link.
 * @returns The matching item, or `null` when nothing matches.
 */
export async function getUnlistedRssItem(publicSlug: string, shareToken: string) {
  const matchesShareLink = and(
    eq(users.publicSlug, publicSlug),
    eq(users.status, "active"),
    eq(rssItems.visibility, "unlisted"),
    eq(rssItems.shareToken, shareToken),
  );

  const found = await dbGlobal
    .select({ it: rssItems })
    .from(rssItems)
    .innerJoin(users, eq(rssItems.userId, users.id))
    .where(matchesShareLink)
    .limit(1);

  const [first] = found;
  return first?.it ?? null;
}
|
|
|
|
/** One feed entry as extracted from RSS or Atom XML, before it is stored. */
type ParsedItem = {
  /** Link used for the entry; also the de-duplication key when `guid` is absent. */
  canonicalUrl: string;
  /** Feed-supplied identifier (RSS `guid` / Atom `id`), when present. */
  guid?: string;
  /** Entry title. */
  title?: string;
  /** Longer description text (the parser truncates it to 4000 characters). */
  summary?: string;
  /** Short excerpt (the parser truncates it to 500 characters). */
  contentSnippet?: string;
  /** Author name as given by the feed. */
  author?: string;
  /** Publication time, when the feed supplied a parseable date. */
  publishedAt?: Date;
};
|
|
|
|
/**
 * Extracts a URL string from the value the XML parser produced for a
 * link-like element.
 *
 * Accepted shapes:
 * - a plain string (element text); blank strings count as "no link" so that
 *   callers' fallbacks are used,
 * - an object with an `@_href` attribute (Atom `<link href="…"/>`),
 * - an object with `#text` (an element that has both text and attributes,
 *   e.g. RSS `<guid isPermaLink="true">…</guid>`),
 * - an array of the above (several `<link>` elements on one entry); links
 *   with no `rel` or `rel="alternate"` are preferred over others such as
 *   `rel="self"`.
 *
 * @param link - Parsed value of a link/guid/id element.
 * @returns The URL text, or `undefined` when none can be found.
 */
function normalizeLink(link: unknown): string | undefined {
  if (typeof link === "string") {
    const trimmed = link.trim();
    return trimmed === "" ? undefined : trimmed;
  }

  if (Array.isArray(link)) {
    const candidates: unknown[] = link;
    const isAlternate = (candidate: unknown): boolean => {
      if (candidate === null || typeof candidate !== "object") {
        return true;
      }
      const rel = (candidate as Record<string, unknown>)["@_rel"];
      return rel === undefined || rel === "alternate";
    };
    // Try preferred links first, then whatever else is available.
    const ordered = [...candidates.filter(isAlternate), ...candidates.filter((c) => !isAlternate(c))];
    for (const candidate of ordered) {
      const url = normalizeLink(candidate);
      if (url !== undefined) {
        return url;
      }
    }
    return undefined;
  }

  if (link !== null && typeof link === "object") {
    const node = link as Record<string, unknown>;
    const href = node["@_href"];
    if (typeof href === "string" || typeof href === "number") {
      return normalizeLink(String(href));
    }
    const text = node["#text"];
    if (typeof text === "string" || typeof text === "number") {
      return normalizeLink(String(text));
    }
  }

  return undefined;
}
|
|
|
|
/**
 * Returns the text of a parsed XML node: the node itself when it is a
 * primitive, or its `#text` member when the element also carried attributes
 * (e.g. `<title type="html">…</title>`).
 */
function feedNodeText(node: unknown): string | undefined {
  if (typeof node === "string") {
    return node;
  }
  if (typeof node === "number" || typeof node === "boolean") {
    return String(node);
  }
  if (node !== null && typeof node === "object" && !Array.isArray(node)) {
    const record = node as Record<string, unknown>;
    return "#text" in record ? feedNodeText(record["#text"]) : undefined;
  }
  return undefined;
}

/** Returns the first candidate node whose text parses to a valid date. */
function feedDate(...candidates: unknown[]): Date | undefined {
  for (const candidate of candidates) {
    const text = feedNodeText(candidate);
    if (text === undefined) {
      continue;
    }
    const parsed = new Date(text);
    if (!Number.isNaN(parsed.getTime())) {
      return parsed;
    }
  }
  return undefined;
}

/** Normalises "one node, many nodes or nothing" into an array of element objects. */
function feedNodeList(raw: unknown): Record<string, unknown>[] {
  const list: unknown[] = Array.isArray(raw) ? raw : raw ? [raw] : [];
  return list.filter((node): node is Record<string, unknown> => node !== null && typeof node === "object");
}

/**
 * Parses an RSS 2.0 (`<rss><channel>`) or Atom (`<feed>`) document.
 *
 * Entries for which no link can be determined (neither a link element nor a
 * guid/id) are skipped. `summary` is truncated to 4000 characters and
 * `contentSnippet` to 500.
 *
 * @param xml - Raw feed document.
 * @returns Feed title and site URL when present, plus the parsed entries.
 *   A document that is neither RSS nor Atom yields `{ items: [] }`.
 */
function parseFeedXml(xml: string): { title?: string; siteUrl?: string; items: ParsedItem[] } {
  const parser = new XMLParser({
    ignoreAttributes: false,
    attributeNamePrefix: "@_",
    // Keep element text as strings. With the parser's default value coercion,
    // an all-digit guid or title becomes a number and would be discarded.
    // NOTE(review): items from feeds with all-digit guids were previously
    // stored without a guid; they are now de-duplicated by guid instead.
    parseTagValue: false,
    isArray: (name) => name === "item" || name === "entry",
  });
  const doc = parser.parse(xml) as Record<string, unknown>;

  // --- RSS 2.0 ---
  if (doc.rss && typeof doc.rss === "object") {
    const rawChannel = (doc.rss as { channel?: unknown }).channel;
    if (!rawChannel || typeof rawChannel !== "object") {
      return { items: [] };
    }
    const channel = rawChannel as Record<string, unknown>;

    const items: ParsedItem[] = [];
    for (const it of feedNodeList(channel.item)) {
      const guid = feedNodeText(it.guid);
      // Prefer <link>; fall back to the guid text when the link is missing or blank.
      const link = normalizeLink(it.link) || normalizeLink(guid);
      if (!link) {
        continue;
      }
      const description = feedNodeText(it.description);
      const encoded = feedNodeText(it["content:encoded"]);
      items.push({
        guid,
        canonicalUrl: link,
        title: feedNodeText(it.title),
        summary: (description ?? encoded)?.slice(0, 4000),
        contentSnippet: description?.slice(0, 500),
        author: feedNodeText(it.author) ?? feedNodeText(it["dc:creator"]),
        publishedAt: feedDate(it.pubDate, it["dc:date"]),
      });
    }

    return {
      title: feedNodeText(channel.title),
      siteUrl: feedNodeText(channel.link),
      items,
    };
  }

  // --- Atom ---
  if (doc.feed && typeof doc.feed === "object") {
    const feed = doc.feed as Record<string, unknown>;

    const items: ParsedItem[] = [];
    for (const en of feedNodeList(feed.entry)) {
      const guid = feedNodeText(en.id);
      // Prefer <link href>; fall back to the entry id when no link is usable.
      const link = normalizeLink(en.link) || normalizeLink(guid);
      if (!link) {
        continue;
      }
      const summary = feedNodeText(en.summary);
      const content = feedNodeText(en.content);

      // Several <author> elements parse to an array; use the first one.
      const firstAuthor: unknown = Array.isArray(en.author) ? en.author[0] : en.author;
      const authorName =
        firstAuthor !== null && typeof firstAuthor === "object"
          ? feedNodeText((firstAuthor as Record<string, unknown>).name)
          : feedNodeText(firstAuthor);

      items.push({
        guid,
        canonicalUrl: link,
        title: feedNodeText(en.title),
        summary: (summary ?? content)?.slice(0, 4000),
        contentSnippet: summary?.slice(0, 500),
        author: authorName,
        // `updated` keeps priority; `published` is used only when it is absent or invalid.
        publishedAt: feedDate(en.updated, en.published),
      });
    }

    return {
      title: feedNodeText(feed.title),
      siteUrl: feedNodeText(feed.id),
      items,
    };
  }

  return { items: [] };
}
|
|
|
|
/**
 * Reads a response body as UTF-8 text while enforcing a size limit.
 *
 * The limit is checked three ways: against a declared `Content-Length`
 * before reading, against the running byte count while streaming, and
 * against the encoded size when the body can only be read via `res.text()`.
 *
 * @param res - Response whose body has not been consumed yet.
 * @param maxBytes - Largest accepted body size in bytes; defaults to `MAX_BODY_BYTES`.
 * @returns The decoded body.
 * @throws Error("响应过大") when the body exceeds `maxBytes`.
 */
async function readResponseBody(res: Response, maxBytes: number = MAX_BODY_BYTES): Promise<string> {
  const tooLarge = () => new Error("响应过大");

  // Reject early when the server already declares an oversized body.
  const declaredLength = res.headers.get("content-length");
  if (declaredLength !== null && Number(declaredLength) > maxBytes) {
    await res.body?.cancel().catch(() => undefined);
    throw tooLarge();
  }

  const reader = res.body?.getReader();
  if (!reader) {
    // No stream available: the body cannot be bounded while reading, so
    // enforce the limit on the result instead of skipping it.
    const text = await res.text();
    if (new TextEncoder().encode(text).length > maxBytes) {
      throw tooLarge();
    }
    return text;
  }

  const chunks: Uint8Array[] = [];
  let total = 0;
  for (;;) {
    const { done, value } = await reader.read();
    if (done) {
      break;
    }
    if (!value) {
      continue;
    }
    total += value.length;
    if (total > maxBytes) {
      // Await the cancellation so a failure cannot surface as an unhandled rejection.
      await reader.cancel().catch(() => undefined);
      throw tooLarge();
    }
    chunks.push(value);
  }

  // Join the chunks into one buffer so multi-byte characters that were split
  // across chunk boundaries decode correctly.
  const all = new Uint8Array(total);
  let offset = 0;
  for (const chunk of chunks) {
    all.set(chunk, offset);
    offset += chunk.length;
  }
  return new TextDecoder().decode(all);
}
|
|
|
|
/** Upper bound on redirect hops followed for a single feed fetch. */
const MAX_FEED_REDIRECTS = 5;

/** Status codes that are treated as redirects when a `Location` header is present. */
const FEED_REDIRECT_STATUSES = new Set([301, 302, 303, 307, 308]);

/**
 * Fetches a feed, following redirects manually so every hop is validated
 * with `assertSafeRssUrl`. With automatic following, a URL that passed
 * validation could redirect the server to an address that would not.
 *
 * NOTE(review): relies on the runtime exposing the 3xx status and `Location`
 * header for `redirect: "manual"` (Node's fetch does) — confirm for other runtimes.
 */
async function fetchFeedWithCheckedRedirects(feedUrl: string, signal: AbortSignal): Promise<Response> {
  let currentUrl = feedUrl;
  for (let hop = 0; hop <= MAX_FEED_REDIRECTS; hop += 1) {
    const res = await fetch(currentUrl, {
      signal,
      headers: { "User-Agent": "PersonPanelRSS/1.0" },
      redirect: "manual",
    });
    const location = FEED_REDIRECT_STATUSES.has(res.status) ? res.headers.get("location") : null;
    if (location === null) {
      return res;
    }
    // Release the redirect response before issuing the next request.
    await res.body?.cancel().catch(() => undefined);
    const nextUrl = new URL(location, currentUrl).toString();
    assertSafeRssUrl(nextUrl);
    currentUrl = nextUrl;
  }
  throw new Error("too many redirects");
}

/**
 * Fetches one feed, updates its metadata and inserts entries not seen before.
 *
 * Steps:
 * 1. Load the feed row; re-validate its URL with `assertSafeRssUrl`.
 * 2. Fetch it (aborted after `FETCH_TIMEOUT_MS`), read the size-limited body, parse it.
 * 3. Store title / site URL (keeping the old values when the feed gives none),
 *    stamp `lastFetchedAt` and clear `lastError`.
 * 4. Insert each entry that is not already stored for this feed. Entries are
 *    matched by guid when they have one, otherwise by canonical URL. New
 *    entries start as "private" with no share token.
 *
 * Failures in steps 1–4 are recorded on the feed (`lastError`, `lastFetchedAt`)
 * and reported in the result rather than thrown.
 *
 * @param feedId - Id of the feed to synchronise.
 * @returns `{ ok: true }` on success, otherwise `{ ok: false, error }`.
 */
export async function syncFeed(feedId: number): Promise<{ ok: boolean; error?: string }> {
  const [feed] = await dbGlobal.select().from(rssFeeds).where(eq(rssFeeds.id, feedId)).limit(1);
  if (!feed) {
    return { ok: false, error: "feed not found" };
  }

  // Persists the failure on the feed row and builds the result for the caller.
  const recordFailure = async (message: string) => {
    await dbGlobal
      .update(rssFeeds)
      .set({ lastError: message, lastFetchedAt: new Date() })
      .where(eq(rssFeeds.id, feedId));
    return { ok: false, error: message };
  };

  try {
    assertSafeRssUrl(feed.feedUrl);
  } catch (e) {
    return recordFailure(e instanceof RssUrlUnsafeError ? e.message : "unsafe url");
  }

  const ac = new AbortController();
  // The timer stays armed while the body is read, so it bounds the whole download.
  const timer = setTimeout(() => ac.abort(), FETCH_TIMEOUT_MS);
  try {
    const res = await fetchFeedWithCheckedRedirects(feed.feedUrl, ac.signal);
    if (!res.ok) {
      // Discard the unread body so the connection can be released.
      await res.body?.cancel().catch(() => undefined);
      throw new Error(`HTTP ${res.status}`);
    }
    const xml = await readResponseBody(res);
    const parsed = parseFeedXml(xml);

    await dbGlobal
      .update(rssFeeds)
      .set({
        title: parsed.title ?? feed.title,
        siteUrl: parsed.siteUrl ?? feed.siteUrl,
        lastFetchedAt: new Date(),
        lastError: null,
      })
      .where(eq(rssFeeds.id, feedId));

    for (const it of parsed.items) {
      const dedupeCond = it.guid
        ? and(eq(rssItems.feedId, feedId), eq(rssItems.guid, it.guid))
        : and(eq(rssItems.feedId, feedId), eq(rssItems.canonicalUrl, it.canonicalUrl));
      const existing = await dbGlobal.select({ id: rssItems.id }).from(rssItems).where(dedupeCond).limit(1);
      if (existing.length > 0) {
        continue;
      }

      const id = await nextIntegerId(rssItems, rssItems.id);
      await dbGlobal.insert(rssItems).values({
        id,
        userId: feed.userId,
        feedId,
        guid: it.guid ?? null,
        canonicalUrl: it.canonicalUrl,
        title: it.title ?? null,
        summary: it.summary ?? null,
        contentSnippet: it.contentSnippet ?? null,
        author: it.author ?? null,
        publishedAt: it.publishedAt ?? null,
        visibility: "private",
        shareToken: null,
      });
    }
    return { ok: true };
  } catch (e) {
    return await recordFailure(e instanceof Error ? e.message : String(e));
  } finally {
    clearTimeout(timer);
  }
}
|
|
|
|
/**
 * Synchronises those feeds of a user whose poll interval has elapsed.
 *
 * A feed is due when the time since `lastFetchedAt` (treated as the epoch
 * when it has never been fetched) is at least its `pollIntervalMinutes`, or
 * `DEFAULT_POLL_MINUTES` when the feed has none. Feeds are synced one at a time.
 *
 * @param userId - Owner of the feeds to check.
 */
export async function syncDueFeedsForUser(userId: number) {
  const feeds = await listFeedsForUser(userId);
  const now = Date.now();

  const dueFeeds = feeds.filter((feed) => {
    const pollMinutes = feed.pollIntervalMinutes ?? DEFAULT_POLL_MINUTES;
    const lastFetchedMs = feed.lastFetchedAt ? feed.lastFetchedAt.getTime() : 0;
    return now - lastFetchedMs >= pollMinutes * 60_000;
  });

  for (const feed of dueFeeds) {
    await syncFeed(feed.id);
  }
}
|
|
|
|
/**
 * Synchronises every feed, across all users, whose poll interval has elapsed.
 *
 * A feed is due when the time since `lastFetchedAt` (treated as the epoch
 * when it has never been fetched) is at least its `pollIntervalMinutes`, or
 * `DEFAULT_POLL_MINUTES` when the feed has none. Feeds are synced one at a time.
 */
export async function syncAllDueFeeds() {
  const everyFeed = await dbGlobal.select().from(rssFeeds);
  const now = Date.now();

  const dueFeeds = everyFeed.filter((feed) => {
    const pollMinutes = feed.pollIntervalMinutes ?? DEFAULT_POLL_MINUTES;
    const lastFetchedMs = feed.lastFetchedAt ? feed.lastFetchedAt.getTime() : 0;
    return now - lastFetchedMs >= pollMinutes * 60_000;
  });

  for (const feed of dueFeeds) {
    await syncFeed(feed.id);
  }
}
|
|
|