wow
This commit is contained in:
commit
6786cb6148
31 changed files with 2352 additions and 0 deletions
397
apps/backend/src/index.ts
Normal file
397
apps/backend/src/index.ts
Normal file
|
|
@ -0,0 +1,397 @@
|
|||
import { Elysia, t } from "elysia";
|
||||
import config from "../config.toml";
|
||||
import { S3Client } from "bun";
|
||||
import {
|
||||
HeadObjectCommand,
|
||||
PutObjectCommand,
|
||||
S3Client as AwsS3Client,
|
||||
} from "@aws-sdk/client-s3";
|
||||
import * as mongoose from "mongoose";
|
||||
import openapi from "@elysiajs/openapi";
|
||||
|
||||
// Connect to MongoDB using the URI from config.toml. This is a top-level await,
// so the rest of the module only runs once the connection attempt has resolved.
await mongoose.connect(config.mongodb.uri);
|
||||
|
||||
/**
 * One stored media item (a single photo of a tweet) plus the tweet it came from.
 * Documents are created by POST /upload, one per uploaded photo.
 */
const mediaUploadSchema = new mongoose.Schema({
  // Source platform. POST /upload passes `type: "twitter"`; without this path
  // Mongoose's strict mode silently discarded the value. A property literally
  // named `type` has to be declared in the nested `{ type: ... }` form.
  type: { type: String },
  // Numeric tweet id, stored as a string.
  tweetId: { type: String, required: true },
  // Tweet payload from the fxtwitter API with its `media` property removed.
  tweet: { type: Object, required: true },
  // Position of this photo within the tweet's photo list (0-based).
  mediaIndex: { type: Number, required: true },
  // URL of the stored copy: `<endpoint>/<bucket>/<s3Key>`.
  mediaUrl: { type: String, required: true },
  // Object key inside the bucket.
  s3Key: { type: String, required: true },
  // Normalized tag names attached at upload time.
  tags: { type: [String], default: [] },
  // Author label: the request's `author` if given, else the tweet author's name.
  author: { type: String, required: true },
}, {
  // Adds createdAt / updatedAt; GET /list sorts on createdAt.
  timestamps: true,
});
|
||||
|
||||
// Catalogue of known tags together with usage statistics (updated by saveTags).
const tagSchema = new mongoose.Schema(
  {
    // Tag text; declared unique so each tag has a single counter document.
    name: { type: String, unique: true, required: true },
    // Incremented by one every time the tag is saved.
    usageCount: { default: 0, type: Number },
    // Timestamp of the most recent save of this tag.
    lastUsedAt: { default: Date.now, type: Date },
  },
  { timestamps: true },
);
|
||||
|
||||
// Reuse the MediaUpload model if it is already registered on mongoose.models;
// otherwise compile it from mediaUploadSchema.
const MediaUpload = mongoose.models.MediaUpload || mongoose.model("MediaUpload", mediaUploadSchema);
|
||||
// Reuse the Tag model if it is already registered on mongoose.models;
// otherwise compile it from tagSchema.
const Tag = mongoose.models.Tag || mongoose.model("Tag", tagSchema);
|
||||
// Upload keys (see buildUploadKey) of POST /upload requests currently being
// processed; used to reject a concurrent duplicate request. This is a plain
// in-memory Set, so it is only visible inside this process.
const inFlightUploads = new Set<string>();
|
||||
|
||||
// Bun's built-in S3 client, bound to the configured bucket. This is the
// primary path for existence checks and object writes.
const client = new S3Client({
  endpoint: config.s3.endpoint,
  bucket: config.s3.bucket,
  accessKeyId: config.s3.access_key,
  secretAccessKey: config.s3.secret_key,
});
|
||||
|
||||
// AWS SDK client pointed at the same endpoint with the same credentials.
// uploadToS3WithRetry falls back to it when a call on the Bun client throws.
const awsClient = new AwsS3Client({
  endpoint: config.s3.endpoint,
  region: "auto",
  credentials: {
    accessKeyId: config.s3.access_key,
    secretAccessKey: config.s3.secret_key,
  },
  forcePathStyle: true,
});
|
||||
|
||||
/**
 * Checks whether any of the selected media of a tweet has already been stored.
 *
 * @param url Tweet URL; must contain `/status/<numeric id>`.
 * @param selected Per-media selection flags, indexed by media position. An
 *   empty array means "every media item", matching how POST /upload treats it.
 * @returns `true` when at least one matching record exists, otherwise `null`.
 * @throws Error "Invalid tweet URL" when no tweet id can be extracted from `url`.
 */
async function checkTweetData(url: string, selected: Array<boolean>) {
  const tweetId = url.match(/\/status\/(\d+)/)?.[1];
  if (tweetId === undefined) {
    throw new Error("Invalid tweet URL");
  }

  // With an explicit selection only the chosen indices count as duplicates.
  // With an empty selection the upload targets every media item, so any stored
  // record of the tweet is a duplicate. `$in: []` would match nothing and let
  // the same tweet be stored again.
  const filter = selected.length === 0
    ? { tweetId }
    : {
        tweetId,
        mediaIndex: {
          $in: selected.flatMap((isSelected, index) => (isSelected ? [index] : [])),
        },
      };

  const existing = await MediaUpload.findOne(filter);
  return existing ? true : null;
}
|
||||
|
||||
/**
 * Fetches tweet metadata from the fxtwitter API.
 *
 * The scheme and host of a known tweet domain (x.com, twitter.com,
 * fxtwitter.com, fixupx.com, vxwitter.com, optionally with `www.`) are stripped
 * and the remainder is appended to `https://api.fxtwitter.com/`. Any other
 * input is appended unchanged.
 *
 * @param url Tweet URL.
 * @returns The parsed JSON body of the API response.
 * @throws Error when the API answers with a non-2xx status.
 */
async function fetchTweetData(url: string) {
  const knownHostPrefix = /^https?:\/\/(www\.)?(x\.com|twitter\.com|fxtwitter\.com|fixupx\.com|vxwitter\.com)\//;
  const tweetPath = url.replace(knownHostPrefix, "");
  const response = await fetch(`https://api.fxtwitter.com/${tweetPath}`);

  if (!response.ok) {
    throw new Error(`Failed to fetch tweet data: ${response.status} ${response.statusText}`);
  }

  return await response.json();
}
|
||||
|
||||
/**
 * Builds the object key for one media item: `twitter/<authorId>/<tweetId>/<name>`.
 *
 * `<name>` is the last path segment of `mediaUrl` with any query string and
 * fragment removed and every character outside `[a-zA-Z0-9._-]` replaced by
 * `_`. When that leaves nothing usable, a generated `media_<timestamp>_<index>`
 * name is used instead.
 *
 * @param authorId Tweet author id (used verbatim).
 * @param tweetId Tweet id (used verbatim).
 * @param mediaUrl Source URL of the media file.
 * @param index Position of the media item; only used in the generated fallback name.
 * @returns The object key.
 */
function makeS3FileName(authorId: string, tweetId: string, mediaUrl: string, index: number) {
  // Computed once so every fallback path yields the same name.
  const fallbackName = `media_${Date.now()}_${index}`;

  const lastSegment = mediaUrl.split("/").pop() ?? "";
  const baseName = lastSegment.split("?")[0]?.split("#")[0] ?? "";
  const safeName = baseName.replace(/[^a-zA-Z0-9._-]/g, "_");

  // "." and ".." survive the sanitizer but are path dot-segments: URL handling
  // collapses them, so the path-style URL built from the key would point elsewhere.
  const isUsable = safeName !== "" && safeName !== "." && safeName !== "..";

  return `twitter/${authorId}/${tweetId}/${isUsable ? safeName : fallbackName}`;
}
|
||||
|
||||
/**
 * Downloads `mediaUrl` and stores it under the key `fileName`, retrying the
 * whole download + write cycle up to `maxRetry` times with a growing delay.
 *
 * Writes go through Bun's S3 client first and fall back to the AWS SDK. When an
 * attempt fails with code `UnknownError`, and once more after the last attempt,
 * the function polls for the object and treats its presence as success.
 *
 * @param fileName Object key to write.
 * @param mediaUrl URL to download the media from.
 * @param maxRetry Maximum number of download + write attempts (default 3).
 * @returns Resolves once the object is written or found to exist.
 * @throws The error of the last failed attempt, or a generic Error when no
 *   attempt was made (`maxRetry <= 0`) and the object does not exist.
 */
async function uploadToS3WithRetry(fileName: string, mediaUrl: string, maxRetry = 3) {
  /** Reports whether `key` exists: Bun client first, then a HEAD request via the AWS SDK. */
  async function existsInS3(key: string) {
    try {
      return await client.exists(key);
    } catch {
      try {
        await awsClient.send(new HeadObjectCommand({
          Bucket: config.s3.bucket,
          Key: key,
        }));
        return true;
      } catch {
        // Best effort: any failure of the HEAD request counts as "not there".
        return false;
      }
    }
  }

  /** Writes `body` under `key`: Bun client first, AWS SDK PutObject as fallback. */
  async function writeToS3(key: string, body: Uint8Array, mediaType?: string | null) {
    try {
      // Pass the Content-Type on this path too. Previously only the fallback
      // set it, so objects written by Bun lost their media type.
      await client.write(key, body, mediaType ? { type: mediaType } : undefined);
      return;
    } catch (bunWriteError) {
      console.warn(`[S3 bun write failed, fallback to aws-sdk] key=${key}`, bunWriteError);
      await awsClient.send(new PutObjectCommand({
        Bucket: config.s3.bucket,
        Key: key,
        Body: body,
        ContentType: mediaType ?? undefined,
      }));
    }
  }

  /** Polls up to four times (600 ms, 1200 ms, ... apart) for `fileName` to appear. */
  async function recoverByPollingExists(reason: string) {
    for (let probe = 1; probe <= 4; probe++) {
      await Bun.sleep(probe * 600);
      try {
        if (await existsInS3(fileName)) {
          console.warn(`[S3 upload recovered-${reason}] key=${fileName} probe=${probe}`);
          return true;
        }
      } catch (existsError) {
        console.error(`[S3 exists probe failed] key=${fileName} probe=${probe}`, existsError);
      }
    }

    return false;
  }

  let lastError: unknown;

  for (let attempt = 1; attempt <= maxRetry; attempt++) {
    try {
      const response = await fetch(mediaUrl);
      if (!response.ok) {
        throw new Error(`Failed to fetch media from ${mediaUrl}: ${response.status} ${response.statusText}`);
      }

      const arrayBuffer = await response.arrayBuffer();
      const buffer = Buffer.from(arrayBuffer);
      await writeToS3(fileName, buffer, response.headers.get("content-type"));
      return;
    } catch (error) {
      lastError = error;
      console.error(`[S3 upload attempt ${attempt}/${maxRetry}] key=${fileName} url=${mediaUrl}`, error);

      const errorCode =
        typeof error === "object" && error !== null && "code" in error
          ? String((error as { code?: unknown }).code)
          : "";

      // Some S3 providers return UnknownError even when the object is eventually persisted.
      if (errorCode === "UnknownError") {
        if (await recoverByPollingExists("unknown")) {
          return;
        }
      }

      // Linear back-off between attempts; no wait after the last one.
      if (attempt < maxRetry) {
        await Bun.sleep(attempt * 800);
      }
    }
  }

  // Final guard: do one last exists check before surfacing failure.
  if (await recoverByPollingExists("final")) {
    return;
  }

  // With maxRetry <= 0 no attempt ran and lastError is still undefined;
  // never throw `undefined` at the caller.
  throw lastError ?? new Error(`S3 upload failed without any attempt: key=${fileName} maxRetry=${maxRetry}`);
}
|
||||
|
||||
/**
 * Cleans a list of tag names: trims each entry, drops blank ones and removes
 * duplicates while keeping first-seen order.
 *
 * @param tags Raw tag names.
 * @returns The cleaned tags, or `["미분류"]` when nothing usable remains.
 */
function normalizeTags(tags: string[]) {
  const seen = new Set<string>();

  for (const rawTag of tags) {
    const trimmed = rawTag.trim();
    if (trimmed !== "") {
      seen.add(trimmed);
    }
  }

  return seen.size > 0 ? [...seen] : ["미분류"];
}
|
||||
|
||||
/**
 * Normalizes the `tags` query parameter into a list of tag names to filter by.
 *
 * Entries are trimmed, blank entries are dropped and duplicates removed
 * (first-seen order). Unlike upload-time normalization this never substitutes
 * the default tag: a missing, empty or blank-only parameter means "no filter"
 * and yields `[]`. Previously a blank-only value was turned into `["미분류"]`.
 *
 * @param tags A single tag, a list of tags, or nothing.
 * @returns The tags to filter by; empty when no filter was requested.
 */
function normalizeQueryTags(tags?: string | string[]): string[] {
  if (!tags) {
    return [];
  }

  const rawTags = Array.isArray(tags) ? tags : [tags];
  const cleaned = new Set(
    rawTags
      .map((tag) => tag.trim())
      .filter((tag) => tag.length > 0),
  );

  return Array.from(cleaned);
}
|
||||
|
||||
/**
 * Builds the key that identifies an upload request for in-flight de-duplication:
 * `<tweet id>:<comma-separated selected indices>`.
 *
 * @param url Tweet URL; when it contains no `/status/<digits>` part the whole
 *   URL is used in place of the tweet id.
 * @param selected Per-media selection flags, indexed by media position.
 * @returns The upload key, e.g. `12345:0,2`.
 */
function buildUploadKey(url: string, selected: boolean[]) {
  const idMatch = /\/status\/(\d+)/.exec(url);
  const keyPrefix = idMatch?.[1] ?? url;

  const pickedPositions: number[] = [];
  selected.forEach((flag, position) => {
    if (flag) {
      pickedPositions.push(position);
    }
  });

  return `${keyPrefix}:${pickedPositions.join(",")}`;
}
|
||||
|
||||
/**
 * Records one use of each tag: increments its `usageCount`, refreshes
 * `lastUsedAt`, and creates the tag document when it does not exist yet.
 * The updates for all tags are issued together and awaited as a group.
 *
 * @param tags Tag names to record.
 */
async function saveTags(tags: string[]) {
  const upsertTag = (tagName: string) =>
    Tag.updateOne(
      { name: tagName },
      {
        $inc: { usageCount: 1 },
        $set: { lastUsedAt: new Date() },
        $setOnInsert: { name: tagName },
      },
      { upsert: true },
    );

  await Promise.all(tags.map((tagName) => upsertTag(tagName)));
}
|
||||
|
||||
/** URL prefixes that POST /upload accepts as tweet links. */
const TWEET_URL_PREFIXES = [
  "https://x.com/",
  "https://twitter.com/",
  "https://fxtwitter.com/",
  "https://fixupx.com/",
  "https://vxwitter.com/",
];

/**
 * HTTP API.
 *
 * - GET  /             liveness string
 * - GET  /total        number of stored media records, optionally filtered by tags
 * - GET  /list         one page (20 items, newest first) of stored media records
 * - GET  /tags         all known tags, most used / most recently used first
 * - POST /upload       download the selected photos of a tweet and store them
 * - GET  /fetch/tweet  proxy for the fxtwitter tweet lookup
 */
const app = new Elysia()
  .use(openapi())
  .get("/", () => "어...")

  .get("/total", async ({ query }) => {
    const filterTags = normalizeQueryTags(query.tags);
    // A record matches when it carries at least one of the requested tags.
    const filter = filterTags.length > 0
      ? { tags: { $in: filterTags } }
      : {};

    const count = await MediaUpload.countDocuments(filter);
    return count;
  }, {
    query: t.Object({
      tags: t.Optional(t.Union([t.String(), t.Array(t.String())])),
    })
  })

  .get("/list", async ({ query }) => {
    const page = query.page;
    const pageSize = 20;
    const filterTags = normalizeQueryTags(query.tags);

    const filter = filterTags.length > 0
      ? { tags: { $in: filterTags } }
      : {};

    const uploads = await MediaUpload.find(filter)
      .sort({ createdAt: -1 })
      .skip((page - 1) * pageSize)
      .limit(pageSize);
    return uploads;
  }, {
    query: t.Object({
      page: t.Number({default: 1, minimum: 1}),
      tags: t.Optional(t.Union([t.String(), t.Array(t.String())])),
    })
  })

  .get("/tags", async () => {
    const tags = await Tag.find().sort({ usageCount: -1, lastUsedAt: -1 });
    return tags;
  })

  .post("/upload", async ({ body, status }) => {
    if (body.url.startsWith("https://www.pixiv.net/")) {
      // Pixiv links are recognised but nothing is uploaded for them.
      return "저는 저능아입니다";
    }

    if (!TWEET_URL_PREFIXES.some((prefix) => body.url.startsWith(prefix))) {
      return status(400, "어...");
    }

    const requestId = crypto.randomUUID();
    const uploadKey = buildUploadKey(body.url, body.selected);

    if (inFlightUploads.has(uploadKey)) {
      console.warn(`[Upload skipped-duplicate] requestId=${requestId} key=${uploadKey}`);
      return status(202, "이미 처리 중인 업로드입니다.");
    }

    inFlightUploads.add(uploadKey);
    console.log(`[Upload started] requestId=${requestId} key=${uploadKey}`);

    // Everything after the key is registered runs inside try/finally. The
    // duplicate check can throw (URL without /status/<id>, database error), and
    // an escaped exception would leave the key registered forever, turning
    // every later identical request into a 202.
    try {
      if (await checkTweetData(body.url, body.selected)) {
        console.log(`[Upload skipped-existing] requestId=${requestId} key=${uploadKey}`);
        return "이미 저장된 트윗입니다.";
      }

      const tweetData = await fetchTweetData(body.url);
      if (tweetData.tweet) {
        // Tweets without attachments may carry no `media` object at all.
        const media = tweetData.tweet.media?.photos ?? [];
        if (media.length > 0) {
          const mediaUrls: string[] = media.map((photo: { url: string }) => photo.url);

          // Identical for every photo of the tweet, so prepared once.
          const { media: _media, ...tweetWithoutMedia } = tweetData.tweet;
          const normalizedTags = normalizeTags(body.tag ?? ["미분류"]);

          // Upload to S3
          let savedCount = 0;
          let failedCount = 0;
          // An empty `selected` array means "upload every photo".
          const hasExplicitSelection = body.selected.length > 0;

          for (const [index, url] of mediaUrls.entries()) {
            const isSelected = hasExplicitSelection
              ? body.selected[index] === true
              : true;

            if (!isSelected) {
              continue;
            }

            const fileName = makeS3FileName(tweetData.tweet.author.id, tweetData.tweet.id, url, index);

            // One failing photo must not abort the remaining ones.
            try {
              if (await client.exists(fileName)) {
                console.log(`File ${fileName} already exists in S3, skipping upload.`);
              } else {
                await uploadToS3WithRetry(fileName, url);
                console.log(`Uploaded ${fileName} to S3`);
              }

              await MediaUpload.create({
                type: "twitter",
                tweetId: tweetData.tweet.id,
                tweet: tweetWithoutMedia,
                mediaIndex: index,
                mediaUrl: `${config.s3.endpoint}/${config.s3.bucket}/${fileName}`,
                s3Key: fileName,
                tags: normalizedTags,
                author: body.author ? body.author : tweetData.tweet.author.name,
              });

              await saveTags(normalizedTags);
              savedCount += 1;
            } catch (error) {
              failedCount += 1;
              console.error(`[Upload failed] index=${index} url=${url} key=${fileName}`, error);
            }
          }

          if (savedCount === 0 && failedCount === 0) {
            console.warn("No media uploaded: selected[] did not include any upload target.");
          }

          console.log(`Saved ${savedCount} media records to MongoDB. Failed: ${failedCount}`);
        } else {
          console.log("No media found in the tweet.");
        }
      }
      console.log(`[Upload finished] requestId=${requestId} key=${uploadKey}`);
      console.log(tweetData);
    } catch (error) {
      console.error(`[Upload aborted] requestId=${requestId} key=${uploadKey}`, error);
      console.error(error);
      return status(500, "Failed to fetch tweet data");
    } finally {
      // Also runs on the early "already stored" return above.
      inFlightUploads.delete(uploadKey);
    }

    return "아...";
  }, {
    body: t.Object({
      url: t.String(),
      tag: t.Optional(t.Array(t.String({default: "미분류"}))),
      author: t.Optional(t.String()),
      selected: t.Array(t.Boolean()),
    })
  })

  .get("/fetch/tweet", async ({ query, status }) => {
    try {
      const tweetData = await fetchTweetData(query.url);
      return tweetData;
    } catch (error) {
      console.error(error);
      return status(500, "Failed to fetch tweet data");
    }
  }, {
    query: t.Object({
      url: t.String(),
    })
  })

  .listen(config.server.port)
;
|
||||
|
||||
// Startup banner with the address the server reports it is listening on.
const listenAddress = `http://${app.server?.hostname}:${app.server?.port}`;
console.log(`🎀 Elysia is running at ${listenAddress}`);
|
||||
Loading…
Add table
Add a link
Reference in a new issue