// msgQueue.js - 基于事件的消息队列 import { EventEmitter } from "events"; import fs from "fs"; import path from "path"; // import { EmailSender } from "./mailer.js"; import { sendQYWechatMessage } from "../utils.js" import { SQLiteMessageQueue } from "../sqlite.js"; import { md5 } from "../utils.js"; import axios from "axios"; class MessageQueue extends EventEmitter { constructor() { super(); this.queue = new SQLiteMessageQueue(); this.processing = false; // this.queueFile = path.resolve("message_queue.json");K // this.emailSender = new EmailSender({ // host: "smtp.exmail.qq.com", // port: 465, // secure: true, // auth: { // user: "jiqiren@axbbaoxian.com", // pass: "Am13579q", // }, // }); this.recipients = [ "huzhengrong@axbbaoxian.com", ]; // 启动处理器 this.startProcessor(); // 启动失败消息重试机制 this.startFailedMessageRetry(); } // 添加消息到队列 // 处理队列 async startProcessor() { // 计算到明天上午9点的时间间隔 const now = new Date(); const nextRun = new Date(); nextRun.setHours(9, 0, 0, 0); // 设置为今天的上午9点 // 如果当前时间已经过了今天的9点,则设置为明天的9点 if (now > nextRun) { nextRun.setDate(nextRun.getDate() + 1); } const timeUntilNextRun = nextRun - now; console.log(`消息队列处理器将在 ${nextRun.toString()} 开始执行`); // 第一次执行使用setTimeout setTimeout(async () => { try { console.log("开始处理队列"); const pendingMessages = this.queue.getPendingMessages(); if (!this.processing && pendingMessages.length > 0) { await this.processQueue(pendingMessages); } } catch (error) { console.error(`❌ 获取待处理消息失败:`, error); } // 之后每天执行一次 this.scheduleDailyProcessing(); }, timeUntilNextRun); } // 每天上午9点执行消息处理 scheduleDailyProcessing() { // 每天间隔24小时执行一次 setInterval(async () => { try { console.log("开始处理队列"); const pendingMessages = this.queue.getPendingMessages(); if (!this.processing && pendingMessages.length > 0) { await this.processQueue(pendingMessages); } } catch (error) { console.error(`❌ 获取待处理消息失败:`, error); } }, 24 * 60 * 60 * 1000); // 24小时 console.log("已设置每天上午9点执行消息处理"); } async processQueue(pendingMessages) { this.processing = true; let msgMap = {}; // 用于跟踪消息是否是重试的 let isRetryMap = {}; for (const message of pendingMessages) { try { console.log(`📧 处理消息: ${message.spider_name}`); // console.log(typeof message.data); // let formdata = JSON.parse(message.data); if (!msgMap[message.spider_name]) { msgMap[message.spider_name] = message.data; isRetryMap[message.spider_name] = message.status === 'failed'; } else { msgMap[message.spider_name].push(...message.data); } // 只有当消息不是重试消息时,才更新状态为sent // 如果是重试消息且成功发送,则删除错误信息 if (message.status !== 'failed') { message.status = "sent"; message.sent_at = new Date().toISOString(); this.queue.updateMessageStatus( message.id, message.status, message.sent_at ); } else { // 重试成功,清除错误信息 message.status = "sent"; message.sent_at = new Date().toISOString(); this.queue.updateMessageStatus( message.id, message.status, message.sent_at, null // 清除错误信息 ); } } catch (error) { console.error(`❌ 消息处理失败: ${message.id}`, error); message.status = "failed"; message.error_message = error.message; this.queue.updateMessageStatus( message.id, message.status, null, // message.sent_at, message.error_message ); } } // 消息发送部分添加延迟,确保每分钟不超过20条消息(每条间隔至少3秒) for (const spiderName in msgMap) { try { let sendResult = await sendQYWechatMessage(this.generateTable(spiderName, msgMap[spiderName])) console.log(`✅ 通知发送成功: ${JSON.stringify(sendResult)}`); // 添加3秒延迟以满足腾讯文档的频率限制(每分钟不超过20条消息) await new Promise(resolve => setTimeout(resolve, 3000)); } catch (error) { console.error(`❌ 通知发送失败: ${error}`); } } this.processing = false; } generateTable(spiderName, data) { let tableHtml =` 各位好, 本次检测发现了${data.length}条新增招标信息,详情如下: ` data.forEach((item, index) => { tableHtml += ` 【${index+1}】 保司:${spiderName} 招标标题:${item.name} 详情链接:[链接](${item.urls}) `; }); return tableHtml; } getSign(timestamp) { let secret = "cpwyyds"; let uri = "/common/message/push"; const url = uri + timestamp + secret; const myCalc = md5(url); let sign = myCalc.substring(5, 13) + myCalc.substring(29, 31) + myCalc.substring(18, 27); //sign 转大写 sign = sign.toUpperCase(); return sign; } // 启动失败消息重试机制 startFailedMessageRetry() { // 每5分钟检查一次失败的消息并重试 setInterval(async () => { try { console.log("检查失败消息并重试..."); const failedMessages = this.queue.getFailedMessages(); if (failedMessages.length > 0) { console.log(`发现 ${failedMessages.length} 条失败消息,尝试重试发送...`); // 重新处理失败的消息 await this.processQueue(failedMessages); } } catch (error) { console.error(`❌ 失败消息重试失败:`, error); } }, 5 * 60 * 1000); // 每5分钟检查一次 } } const messageQueue = new MessageQueue(); export { messageQueue }; // export default MessageQueue;