import Database from "better-sqlite3";
import fs from "fs";

/**
 * Decodes the JSON text stored in each message row's `data` column.
 *
 * @param {object[]} rows - Rows read from the `messages` table.
 * @returns {object[]} Copies of the rows with `data` replaced by its parsed value.
 * @throws {SyntaxError} If a row's `data` column does not contain valid JSON.
 */
function parseMessageRows(rows) {
  return rows.map((row) => ({
    ...row,
    data: JSON.parse(row.data),
  }));
}

/**
 * SQLite-backed store for crawled announcements plus a simple message queue.
 *
 * Two tables are managed: `announcements` (one row per announcement id) and
 * `messages` (queued payloads with a `status` column). All calls are
 * synchronous because better-sqlite3 exposes a synchronous API.
 */
class SQLiteMessageQueue {
  /**
   * Opens the database, creates the schema if needed and registers
   * process-level shutdown handlers.
   *
   * @param {string} [dbPath="spider_data.db"] - Path of the SQLite database file.
   */
  constructor(dbPath = "spider_data.db") {
    this.db = new Database(dbPath);
    this.init();
    this.setupGracefulShutdown();
  }

  /**
   * Creates tables and indexes when they do not exist yet and precompiles
   * every statement used by the other methods.
   */
  init() {
    this.db.exec(`
      CREATE TABLE IF NOT EXISTS announcements (
        id TEXT PRIMARY KEY,
        spider_name TEXT NOT NULL,
        name TEXT NOT NULL,
        publish_time TEXT,
        end_time TEXT,
        urls TEXT,
        created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
        updated_at TEXT
      )
    `);

    this.db.exec(`
      CREATE TABLE IF NOT EXISTS messages (
        id TEXT PRIMARY KEY,
        spider_name TEXT NOT NULL,
        data TEXT NOT NULL,
        timestamp TEXT NOT NULL,
        status TEXT DEFAULT 'pending',
        sent_at TEXT,
        error_message TEXT
      )
    `);

    this.db.exec(`
      CREATE INDEX IF NOT EXISTS idx_announcements_spider ON announcements(spider_name);
      CREATE INDEX IF NOT EXISTS idx_announcements_time ON announcements(publish_time);
      CREATE INDEX IF NOT EXISTS idx_announcements_created ON announcements(created_at);
      CREATE INDEX IF NOT EXISTS idx_status ON messages(status);
      CREATE INDEX IF NOT EXISTS idx_spider_status ON messages(spider_name, status);
      CREATE INDEX IF NOT EXISTS idx_timestamp ON messages(timestamp);
    `);

    // Announcement statements.
    this.insertAnnouncementStmt = this.db.prepare(`
      INSERT OR REPLACE INTO announcements
      (id, spider_name, name, publish_time, end_time, urls, created_at, updated_at)
      VALUES (?, ?, ?, ?, ?, ?, ?, ?)
    `);

    this.getAnnouncementStmt = this.db.prepare(`
      SELECT * FROM announcements WHERE id = ?
    `);

    this.getAnnouncementsBySpiderStmt = this.db.prepare(`
      SELECT * FROM announcements WHERE spider_name = ? ORDER BY created_at DESC
    `);

    this.checkAnnouncementExistsStmt = this.db.prepare(`
      SELECT COUNT(*) as count FROM announcements WHERE id = ?
    `);

    this.deleteAnnouncementsBySpiderStmt = this.db.prepare(
      `DELETE FROM announcements WHERE spider_name = ?`
    );

    this.announcementStatsStmt = this.db.prepare(`
      SELECT spider_name, COUNT(*) as count
      FROM announcements
      GROUP BY spider_name
    `);

    // Message-queue statements (precompiled for performance).
    this.insertStmt = this.db.prepare(`
      INSERT INTO messages (id, spider_name, data, timestamp, status)
      VALUES (?, ?, ?, ?, ?)
    `);

    this.getPendingStmt = this.db.prepare(`
      SELECT * FROM messages WHERE status = 'pending' ORDER BY timestamp ASC
    `);

    this.getFailedStmt = this.db.prepare(`
      SELECT * FROM messages WHERE status = 'failed' ORDER BY timestamp ASC
    `);

    this.updateStatusStmt = this.db.prepare(`
      UPDATE messages
      SET status = ?, sent_at = ?, error_message = ?
      WHERE id = ?
    `);

    this.cleanOldMessagesStmt = this.db.prepare(`
      DELETE FROM messages
      WHERE status = 'sent' AND sent_at < ?
    `);

    this.pendingMessageStatsStmt = this.db.prepare(`
      SELECT status, data, sent_at
      FROM messages WHERE status = 'pending'
    `);
  }

  // =============
  // Announcements
  // =============

  /**
   * Inserts or replaces one announcement row.
   *
   * When a row with the same id already exists its `created_at` value is
   * carried over; `updated_at` is always set to the current time.
   *
   * @param {string} spiderName - Name of the spider that produced the announcement.
   * @param {object} announcement - Object providing `id`, `name`, `publishTime`,
   *   `endTime` and `urls`.
   * @returns {boolean} `true` when no row with this id existed before the call.
   */
  saveAnnouncement(spiderName, announcement) {
    const now = new Date().toISOString();
    // A single lookup answers both "is it new?" and "what was created_at?".
    const existing = this.getAnnouncement(announcement.id);
    const isNew = existing === undefined;
    const createdAt = existing?.created_at || now;

    this.insertAnnouncementStmt.run(
      announcement.id,
      spiderName,
      announcement.name,
      announcement.publishTime,
      announcement.endTime,
      announcement.urls,
      createdAt,
      now
    );

    return isNew;
  }

  /**
   * Saves a batch of announcements inside one transaction.
   *
   * @param {string} spiderName - Name of the spider that produced the announcements.
   * @param {object[]} announcements - Announcements to save.
   * @returns {object[]} The announcements that were not in the table before.
   */
  saveAnnouncements(spiderName, announcements) {
    const newAnnouncements = [];

    // One transaction for the whole batch, for performance.
    const saveMany = this.db.transaction((items) => {
      for (const announcement of items) {
        if (this.saveAnnouncement(spiderName, announcement)) {
          newAnnouncements.push(announcement);
        }
      }
    });

    saveMany(announcements);
    console.log(`💾 ${spiderName}: 保存 ${announcements.length} 条公告`);

    return newAnnouncements;
  }

  /**
   * Checks whether an announcement with the given id is stored.
   *
   * @param {string} announcementId - Announcement id to look up.
   * @returns {boolean} `true` when at least one matching row exists.
   */
  isAnnouncementExists(announcementId) {
    const result = this.checkAnnouncementExistsStmt.get(announcementId);
    return result.count > 0;
  }

  /**
   * Fetches a single announcement row.
   *
   * @param {string} id - Announcement id.
   * @returns {object|undefined} The row, or `undefined` when there is none.
   */
  getAnnouncement(id) {
    return this.getAnnouncementStmt.get(id);
  }

  /**
   * Fetches every announcement of one spider, newest `created_at` first.
   *
   * @param {string} spiderName - Spider name to filter by.
   * @returns {object[]} Matching rows.
   */
  getAnnouncementsBySpider(spiderName) {
    return this.getAnnouncementsBySpiderStmt.all(spiderName);
  }

  /**
   * Deletes every announcement belonging to one spider.
   *
   * @param {string} spiderName - Spider whose announcements are removed.
   * @returns {number} Number of deleted rows.
   */
  deleteAnnouncementsBySpider(spiderName) {
    const info = this.deleteAnnouncementsBySpiderStmt.run(spiderName);
    console.log(`🗑️ 删除 ${spiderName} 的公告,共删除 ${info.changes} 条`);
    return info.changes;
  }

  /**
   * Returns the announcements whose id is not stored yet.
   *
   * @param {string} spiderName - Currently unused; the lookup is by id only.
   * @param {object[]} announcements - Candidate announcements.
   * @returns {object[]} The candidates that are not in the table.
   */
  filterNewAnnouncements(spiderName, announcements) {
    return announcements.filter(
      (announcement) => !this.isAnnouncementExists(announcement.id)
    );
  }

  // =============
  // Message queue
  // =============

  /**
   * Queues a payload as a new `pending` message.
   *
   * @param {string} spiderName - Name of the spider that produced the payload.
   * @param {*} data - Payload; it is stored as JSON text.
   * @returns {string} The generated message id.
   */
  addMessage(spiderName, data) {
    const message = {
      id: `${Date.now()}-${Math.random().toString(36).slice(2, 11)}`,
      spider_name: spiderName,
      data: JSON.stringify(data),
      timestamp: new Date().toISOString(),
      status: "pending",
    };

    this.insertStmt.run(
      message.id,
      message.spider_name,
      message.data,
      message.timestamp,
      message.status
    );

    // Reading `.length` on a non-array payload used to throw (null) or log
    // "undefined" after the row was already inserted; count such a payload as 1.
    const itemCount = Array.isArray(data) ? data.length : 1;
    console.log(`📤 添加消息到队列: ${spiderName} - ${itemCount} 条数据`);
    return message.id;
  }

  /**
   * Fetches all `pending` messages, oldest first.
   *
   * @returns {object[]} Rows with `data` parsed from JSON.
   */
  getPendingMessages() {
    return parseMessageRows(this.getPendingStmt.all());
  }

  /**
   * Fetches all `failed` messages, oldest first.
   *
   * @returns {object[]} Rows with `data` parsed from JSON.
   */
  getFailedMessages() {
    return parseMessageRows(this.getFailedStmt.all());
  }

  /**
   * Updates the status columns of one message.
   *
   * @param {string} id - Message id.
   * @param {string} status - New status value.
   * @param {string|null} [sentAt=null] - Value written to `sent_at`.
   * @param {string|null} [errorMessage=null] - Value written to `error_message`.
   */
  updateMessageStatus(id, status, sentAt = null, errorMessage = null) {
    this.updateStatusStmt.run(status, sentAt, errorMessage, id);
  }

  /**
   * Imports announcements from a JSON file containing an array.
   *
   * Failures are logged and reported as `0` instead of being thrown.
   *
   * @param {string} spiderName - Spider name assigned to the imported rows.
   * @param {string} jsonFilePath - Path of the JSON file to read.
   * @returns {number} Number of entries in the file that were processed, or
   *   `0` when the file is missing, empty, not an array, or the import failed.
   */
  migrateFromJsonFile(spiderName, jsonFilePath) {
    try {
      if (!fs.existsSync(jsonFilePath)) {
        console.log(`📁 ${jsonFilePath} 不存在,跳过迁移`);
        return 0;
      }

      const data = JSON.parse(fs.readFileSync(jsonFilePath, "utf-8"));
      if (!Array.isArray(data) || data.length === 0) {
        console.log(`📁 ${jsonFilePath} 数据为空,跳过迁移`);
        return 0;
      }

      const migrateMany = this.db.transaction((announcements) => {
        for (const announcement of announcements) {
          this.saveAnnouncement(spiderName, announcement);
        }
      });

      migrateMany(data);
      console.log(`🔄 成功迁移 ${data.length} 条 ${spiderName} 数据到数据库`);
      return data.length;
    } catch (error) {
      console.error(`❌ 迁移 ${jsonFilePath} 失败:`, error);
      return 0;
    }
  }

  /**
   * Deletes `sent` messages whose `sent_at` is older than the cutoff.
   *
   * @param {number} [daysBefore=30] - Age threshold in days.
   * @returns {number} Number of deleted rows.
   */
  cleanOldMessages(daysBefore = 30) {
    const cutoffDate = new Date();
    cutoffDate.setDate(cutoffDate.getDate() - daysBefore);

    const result = this.cleanOldMessagesStmt.run(cutoffDate.toISOString());
    console.log(`🧹 清理了 ${result.changes} 条旧消息`);
    return result.changes;
  }

  /**
   * Collects summary information.
   *
   * @returns {{announcements: object[], messages: object[]}} `announcements`
   *   holds one `{spider_name, count}` row per spider; `messages` holds the
   *   `status`, `data` (raw JSON text) and `sent_at` of every pending message.
   */
  getStats() {
    return {
      // Announcement count per spider.
      announcements: this.announcementStatsStmt.all(),
      // Pending messages only.
      messages: this.pendingMessageStatsStmt.all(),
    };
  }

  /**
   * Registers process handlers that close the database and then exit:
   * exit code 0 for SIGINT/SIGTERM, 1 for uncaught exceptions and unhandled
   * promise rejections.
   */
  setupGracefulShutdown() {
    // `finally` guarantees the intended exit code even if closing throws.
    const shutdown = (exitCode) => {
      try {
        this.close();
      } catch (error) {
        console.error("关闭数据库失败:", error);
      } finally {
        process.exit(exitCode);
      }
    };

    // Normal interrupt signal.
    process.on("SIGINT", () => {
      console.log("收到 SIGINT 信号,正在关闭数据库...");
      shutdown(0);
    });

    // Termination signal.
    process.on("SIGTERM", () => {
      console.log("收到 SIGTERM 信号,正在关闭数据库...");
      shutdown(0);
    });

    // Uncaught exception.
    process.on("uncaughtException", (error) => {
      console.error("未捕获异常:", error);
      shutdown(1);
    });

    // Unhandled promise rejection.
    process.on("unhandledRejection", (reason) => {
      console.error("未处理的Promise拒绝:", reason);
      shutdown(1);
    });
  }

  /**
   * Closes the database connection. Calling it again after the connection is
   * closed does nothing.
   */
  close() {
    if (this.db.open) {
      this.db.close();
    }
  }
}

export { SQLiteMessageQueue };