// msgQueue.js - 基于事件的消息队列 import { EventEmitter } from "events"; import fs from "fs"; import path from "path"; // import { EmailSender } from "./mailer.js"; import { sendQYWechatMessage } from "./utils.js" import { SQLiteMessageQueue } from "./sqlite.js"; import { md5 } from "./utils.js"; import axios from "axios"; class MessageQueue extends EventEmitter { constructor() { super(); this.queue = new SQLiteMessageQueue(); this.processing = false; // this.queueFile = path.resolve("message_queue.json");K // this.emailSender = new EmailSender({ // host: "smtp.exmail.qq.com", // port: 465, // secure: true, // auth: { // user: "jiqiren@axbbaoxian.com", // pass: "Am13579q", // }, // }); this.recipients = [ "huzhengrong@axbbaoxian.com", ]; // 启动处理器 this.startProcessor(); } // 添加消息到队列 // 处理队列 async startProcessor() { setInterval(async () => { // 清除状态 不等于 pending的数据 console.log("开始处理队列"); try { const pendingMessages = this.queue.getPendingMessages(); if (!this.processing && pendingMessages.length > 0) { await this.processQueue(pendingMessages); } } catch (error) { console.error(`❌ 获取待处理消息失败:`, error); } }, 60 * 60 * 1000); // 1h处理一次 } async processQueue(pendingMessages) { this.processing = true; let msgMap = {}; for (const message of pendingMessages) { try { console.log(`📧 处理消息: ${message.spider_name}`); // console.log(typeof message.data); // let formdata = JSON.parse(message.data); if (!msgMap[message.spider_name]) { msgMap[message.spider_name] = message.data; } else { msgMap[message.spider_name].push(...message.data); } message.status = "sent"; message.sent_at = new Date().toISOString(); this.queue.updateMessageStatus( message.id, message.status, message.sent_at ); } catch (error) { console.error(`❌ 消息处理失败: ${message.id}`, error); message.status = "failed"; message.error_message = error.message; this.queue.updateMessageStatus( message.id, message.status, null, // message.sent_at, message.error_message ); } } let html = ""; for (const spiderName in msgMap) { html += this.generateTable(spiderName, msgMap[spiderName]); } try { // this.emailSender.sendBulkEmail(this.recipients, "招标项目最新公告", html); await sendQYWechatMessage(html) } catch (error) { console.error(`❌ 通知发送失败: ${error}`); } this.processing = false; } generateTable(spiderName, data) { let tableHtml =` 各位好, 本次检测发现了${data.length}条新增招标信息,详情如下: ` data.forEach((item, index) => { tableHtml += ` 【${index+1}】 保司:${spiderName} 招标标题:${item.name} 详情链接:[链接](${item.urls}) `; }); return tableHtml; } getSign(timestamp) { let secret = "cpwyyds"; let uri = "/common/message/push"; const url = uri + timestamp + secret; const myCalc = md5(url); let sign = myCalc.substring(5, 13) + myCalc.substring(29, 31) + myCalc.substring(18, 27); //sign 转大写 sign = sign.toUpperCase(); return sign; } formatUrls(urls) { if (!urls) { return '无链接'; } // 处理数组形式的URLs if (Array.isArray(urls)) { if (urls.length === 0) { return '无链接'; } if (urls.length === 1) { return `📄 查看`; } // 多个链接的情况 let linksHtml = '