394 lines
11 KiB
JavaScript
394 lines
11 KiB
JavaScript
import Database from "better-sqlite3";
|
||
import fs from "fs";
|
||
// import { wechatPush } from "./utils.js";
|
||
|
||
/**
 * SQLite-backed store for scraped announcements plus an outgoing message queue.
 *
 * Two tables are managed:
 *  - `announcements`: one row per announcement, keyed by `id`.
 *  - `messages`: queued payloads (JSON text) with a `status` column that starts
 *    as 'pending'.
 *
 * All database calls are synchronous.
 */
class SQLiteMessageQueue {
  /**
   * Opens `spider_data.db`, makes sure the schema exists, prepares the reusable
   * statements and registers the process-level shutdown handlers.
   */
  constructor() {
    this.db = new Database("spider_data.db");
    this.init();
    this.setupGracefulShutdown();
  }

  /**
   * Creates tables and indexes if they are missing and prepares every
   * statement that the other methods reuse.
   */
  init() {
    this.db.exec(`
      CREATE TABLE IF NOT EXISTS announcements (
        id TEXT PRIMARY KEY,
        spider_name TEXT NOT NULL,
        name TEXT NOT NULL,
        publish_time TEXT,
        end_time TEXT,
        urls TEXT,
        created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
        updated_at TEXT
      )
    `);

    this.db.exec(`
      CREATE TABLE IF NOT EXISTS messages (
        id TEXT PRIMARY KEY,
        spider_name TEXT NOT NULL,
        data TEXT NOT NULL,
        timestamp TEXT NOT NULL,
        status TEXT DEFAULT 'pending',
        sent_at TEXT,
        error_message TEXT
      )
    `);

    this.db.exec(`
      CREATE INDEX IF NOT EXISTS idx_announcements_spider ON announcements(spider_name);
      CREATE INDEX IF NOT EXISTS idx_announcements_time ON announcements(publish_time);
      CREATE INDEX IF NOT EXISTS idx_announcements_created ON announcements(created_at);
      CREATE INDEX IF NOT EXISTS idx_status ON messages(status);
      CREATE INDEX IF NOT EXISTS idx_spider_status ON messages(spider_name, status);
      CREATE INDEX IF NOT EXISTS idx_timestamp ON messages(timestamp);
    `);

    // --- announcement statements ---
    this.insertAnnouncementStmt = this.db.prepare(`
      INSERT OR REPLACE INTO announcements
      (id, spider_name, name, publish_time, end_time, urls, created_at, updated_at)
      VALUES (?, ?, ?, ?, ?, ?, ?, ?)
    `);

    this.getAnnouncementStmt = this.db.prepare(`
      SELECT * FROM announcements WHERE id = ?
    `);

    this.getAnnouncementsBySpiderStmt = this.db.prepare(`
      SELECT * FROM announcements WHERE spider_name = ?
      ORDER BY created_at DESC
    `);

    this.checkAnnouncementExistsStmt = this.db.prepare(`
      SELECT COUNT(*) as count FROM announcements WHERE id = ?
    `);

    // --- message-queue statements (prepared once and reused) ---
    this.insertStmt = this.db.prepare(`
      INSERT INTO messages (id, spider_name, data, timestamp, status)
      VALUES (?, ?, ?, ?, ?)
    `);

    this.getPendingStmt = this.db.prepare(`
      SELECT * FROM messages WHERE status = 'pending'
      ORDER BY timestamp ASC
    `);

    this.getFailedStmt = this.db.prepare(`
      SELECT * FROM messages WHERE status = 'failed'
      ORDER BY timestamp ASC
    `);

    this.updateStatusStmt = this.db.prepare(`
      UPDATE messages
      SET status = ?, sent_at = ?, error_message = ?
      WHERE id = ?
    `);
  }

  /**
   * Inserts or replaces one announcement.
   *
   * When a row with the same id already exists its `created_at` is carried
   * over; `updated_at` is always set to the current time.
   *
   * @param {string} spiderName - Value stored in `spider_name`.
   * @param {object} announcement - Reads `id`, `name`, `publishTime`,
   *   `endTime` and `urls`.
   * @returns {boolean} `true` when no row with this id existed before the call.
   */
  saveAnnouncement(spiderName, announcement) {
    const now = new Date().toISOString();
    // A single lookup answers both "does it exist?" and "what was created_at?".
    const existing = this.getAnnouncement(announcement.id);
    const isNew = !existing;
    const createdAt = isNew ? now : existing.created_at || now;

    this.insertAnnouncementStmt.run(
      announcement.id,
      spiderName,
      announcement.name,
      // Missing optional fields are normalised to null so they are stored as
      // NULL instead of being handed to the driver as `undefined`.
      announcement.publishTime ?? null,
      announcement.endTime ?? null,
      announcement.urls ?? null,
      createdAt,
      now
    );

    return isNew;
  }

  /**
   * Saves a batch of announcements inside one transaction.
   *
   * @param {string} spiderName - Value stored in `spider_name` for every row.
   * @param {object[]} announcements - Announcements to upsert.
   * @returns {object[]} The input objects that did not exist before this call.
   */
  saveAnnouncements(spiderName, announcements) {
    // One transaction for the whole batch instead of one implicit
    // transaction per row.
    const saveMany = this.db.transaction((batch) => {
      const added = [];
      for (const announcement of batch) {
        if (this.saveAnnouncement(spiderName, announcement)) {
          added.push(announcement);
        }
      }
      return added;
    });

    const newAnnouncements = saveMany(announcements);

    console.log(`💾 ${spiderName}: 保存 ${announcements.length} 条公告`);
    return newAnnouncements;
  }

  /**
   * Checks whether an announcement row with the given id exists.
   *
   * @param {string} announcementId
   * @returns {boolean}
   */
  isAnnouncementExists(announcementId) {
    const result = this.checkAnnouncementExistsStmt.get(announcementId);
    return result.count > 0;
  }

  /**
   * Fetches a single announcement row by id.
   *
   * @param {string} id
   * @returns {object|undefined} Whatever the prepared statement's `get` yields.
   */
  getAnnouncement(id) {
    return this.getAnnouncementStmt.get(id);
  }

  /**
   * Fetches every announcement of one spider, newest `created_at` first.
   *
   * @param {string} spiderName
   * @returns {object[]}
   */
  getAnnouncementsBySpider(spiderName) {
    return this.getAnnouncementsBySpiderStmt.all(spiderName);
  }

  /**
   * Deletes every announcement belonging to the given spider.
   *
   * @param {string} spiderName
   * @returns {number} Number of deleted rows.
   */
  deleteAnnouncementsBySpider(spiderName) {
    const stmt = this.db.prepare(`DELETE FROM announcements WHERE spider_name = ?`);
    const info = stmt.run(spiderName);
    console.log(`🗑️ 删除 ${spiderName} 的公告,共删除 ${info.changes} 条`);
    return info.changes;
  }

  /**
   * Returns only the announcements whose id is not stored yet.
   *
   * @param {string} spiderName - Not used by the lookup; kept for the existing
   *   call signature.
   * @param {object[]} announcements
   * @returns {object[]}
   */
  filterNewAnnouncements(spiderName, announcements) {
    return announcements.filter(
      (announcement) => !this.isAnnouncementExists(announcement.id)
    );
  }

  // =============
  // Message queue
  // =============

  /**
   * Builds a message id: `<epoch ms>-<random>` or, for a chunk of a split
   * message, `<epoch ms>-<chunk index>-<random>`.
   *
   * @param {number|null} [chunkIndex=null]
   * @returns {string}
   */
  _newMessageId(chunkIndex = null) {
    const suffix = Math.random().toString(36).slice(2, 11);
    return chunkIndex === null
      ? `${Date.now()}-${suffix}`
      : `${Date.now()}-${chunkIndex}-${suffix}`;
  }

  /**
   * Inserts one 'pending' row into `messages`.
   *
   * @param {string} spiderName
   * @param {string} payload - JSON text stored in the `data` column.
   * @param {string} id
   * @returns {string} The id that was inserted.
   */
  _enqueue(spiderName, payload, id) {
    this.insertStmt.run(id, spiderName, payload, new Date().toISOString(), "pending");
    return id;
  }

  /**
   * Queues `data` for sending. The payload is stored as JSON; when the JSON is
   * longer than 1300 characters and `data` is an array, the array is split
   * into several messages whose JSON each stays within the limit (a single
   * element that is itself over the limit is still stored as its own message).
   *
   * @param {string} spiderName
   * @param {*} data - Normally an array of items; any JSON-serialisable value
   *   is accepted.
   * @returns {string|string[]} The message id when the payload fit in one
   *   message, otherwise the array of ids of the stored messages.
   * @throws {TypeError} When `data` cannot be represented as JSON.
   */
  addMessage(spiderName, data) {
    const fullData = JSON.stringify(data);
    if (fullData === undefined) {
      throw new TypeError("addMessage: data is not JSON-serializable");
    }

    // The WeCom message limit is 4096 bytes and one Chinese character takes
    // three bytes, hence a conservative character budget.
    const MAX_LENGTH = 1300;
    const itemCount = Array.isArray(data) ? data.length : 1;

    if (fullData.length <= MAX_LENGTH) {
      const id = this._enqueue(spiderName, fullData, this._newMessageId());
      console.log(`📤 添加消息到队列: ${spiderName} - ${itemCount} 条数据`);
      return id;
    }

    console.log(`📝 消息过长 (${fullData.length} 字符),正在进行分割处理...`);

    const messageIds = [];

    if (!Array.isArray(data)) {
      // Only arrays can be split element by element. Store anything else whole
      // rather than silently dropping it.
      messageIds.push(this._enqueue(spiderName, fullData, this._newMessageId(0)));
    } else {
      let chunk = [];
      let chunkJson = "[]";
      const flush = () => {
        const id = this._newMessageId(messageIds.length);
        messageIds.push(this._enqueue(spiderName, chunkJson, id));
      };

      for (const item of data) {
        // Serialise inside an array so values such as `undefined` get the same
        // representation they will have in the stored chunk.
        const itemJson = JSON.stringify([item]);
        // Appending to a non-empty chunk drops the item's two brackets but
        // adds one comma separator.
        const candidateLength =
          chunk.length === 0
            ? itemJson.length
            : chunkJson.length + itemJson.length - 1;

        if (candidateLength > MAX_LENGTH && chunk.length > 0) {
          flush();
          chunk = [item];
        } else {
          chunk.push(item);
        }
        chunkJson = JSON.stringify(chunk);
      }

      if (chunk.length > 0) {
        flush();
      }
    }

    console.log(`📤 添加消息到队列: ${spiderName} - ${itemCount} 条数据,分割为 ${messageIds.length} 条消息`);
    return messageIds;
  }

  /**
   * Copies each row and replaces its JSON `data` text with the parsed value.
   *
   * @param {object[]} rows
   * @returns {object[]}
   */
  _withParsedData(rows) {
    return rows.map((row) => ({
      ...row,
      data: JSON.parse(row.data),
    }));
  }

  /**
   * @returns {object[]} All 'pending' messages, oldest timestamp first, with
   *   `data` parsed from JSON.
   */
  getPendingMessages() {
    return this._withParsedData(this.getPendingStmt.all());
  }

  /**
   * @returns {object[]} All 'failed' messages, oldest timestamp first, with
   *   `data` parsed from JSON.
   */
  getFailedMessages() {
    return this._withParsedData(this.getFailedStmt.all());
  }

  /**
   * Updates status, sent time and error text of one message.
   *
   * @param {string} id
   * @param {string} status
   * @param {string|null} [sentAt=null]
   * @param {string|null} [errorMessage=null]
   */
  updateMessageStatus(id, status, sentAt = null, errorMessage = null) {
    this.updateStatusStmt.run(status, sentAt, errorMessage, id);
  }

  /**
   * Imports announcements from a JSON file containing an array.
   *
   * Any failure (unreadable file, invalid JSON, database error) is logged and
   * reported as 0 rather than thrown.
   *
   * @param {string} spiderName
   * @param {string} jsonFilePath
   * @returns {number} Number of entries migrated; 0 when the file is missing,
   *   empty, not an array, or the migration failed.
   */
  migrateFromJsonFile(spiderName, jsonFilePath) {
    try {
      if (!fs.existsSync(jsonFilePath)) {
        console.log(`📁 ${jsonFilePath} 不存在,跳过迁移`);
        return 0;
      }

      const data = JSON.parse(fs.readFileSync(jsonFilePath, "utf-8"));
      if (!Array.isArray(data) || data.length === 0) {
        console.log(`📁 ${jsonFilePath} 数据为空,跳过迁移`);
        return 0;
      }

      const migrateMany = this.db.transaction((announcements) => {
        for (const announcement of announcements) {
          this.saveAnnouncement(spiderName, announcement);
        }
      });

      migrateMany(data);
      console.log(`🔄 成功迁移 ${data.length} 条 ${spiderName} 数据到数据库`);
      return data.length;
    } catch (error) {
      console.error(`❌ 迁移 ${jsonFilePath} 失败:`, error);
      return 0;
    }
  }

  /**
   * Deletes 'sent' messages whose `sent_at` is older than the cutoff.
   *
   * @param {number} [daysBefore=30] - Age threshold in days.
   * @returns {number} Number of deleted rows.
   */
  cleanOldMessages(daysBefore = 30) {
    const cutoffDate = new Date();
    cutoffDate.setDate(cutoffDate.getDate() - daysBefore);

    const stmt = this.db.prepare(`
      DELETE FROM messages
      WHERE status = 'sent' AND sent_at < ?
    `);

    const result = stmt.run(cutoffDate.toISOString());
    console.log(`🧹 清理了 ${result.changes} 条旧消息`);
    return result.changes;
  }

  /**
   * Collects a small report about the stored data.
   *
   * @returns {{announcements: object[], messages: object[]}}
   *   `announcements` holds one `{spider_name, count}` row per spider;
   *   `messages` holds `{status, data, sent_at}` for every pending message
   *   (`data` is the raw JSON text here, not parsed).
   */
  getStats() {
    const stats = {};

    // Announcement count per spider.
    const announcementStats = this.db
      .prepare(
        `
        SELECT spider_name, COUNT(*) as count
        FROM announcements
        GROUP BY spider_name
      `
      )
      .all();

    // Rows of the messages that are still pending.
    const messageStats = this.db
      .prepare(
        `
        SELECT status, data, sent_at
        FROM messages WHERE status = 'pending'
      `
      )
      .all();

    stats.announcements = announcementStats;
    stats.messages = messageStats;

    return stats;
  }

  /**
   * Registers process handlers that close the database and then exit:
   * exit code 0 for SIGINT/SIGTERM, 1 for uncaught exceptions and unhandled
   * promise rejections.
   *
   * NOTE(review): handlers are added on every call, so each constructed
   * instance adds four more listeners to `process`.
   */
  setupGracefulShutdown() {
    // `finally` guarantees the intended exit code even if closing throws.
    const closeAndExit = (exitCode) => {
      try {
        this.close();
      } finally {
        process.exit(exitCode);
      }
    };

    // Interrupt from the terminal.
    process.on("SIGINT", () => {
      console.log("收到 SIGINT 信号,正在关闭数据库...");
      closeAndExit(0);
    });

    // Termination request.
    process.on("SIGTERM", () => {
      console.log("收到 SIGTERM 信号,正在关闭数据库...");
      closeAndExit(0);
    });

    // Uncaught exception.
    process.on("uncaughtException", (error) => {
      console.error("未捕获异常:", error);
      closeAndExit(1);
    });

    // Unhandled promise rejection.
    process.on("unhandledRejection", (reason) => {
      console.error("未处理的Promise拒绝:", reason);
      closeAndExit(1);
    });
  }

  /**
   * Closes the database connection. Does nothing when the connection is
   * already closed, so it is safe to call more than once.
   */
  close() {
    if (this.db.open) {
      this.db.close();
    }
  }
}
|
||
|
||
export { SQLiteMessageQueue };
|