// msgQueue.js - 基于事件的消息队列 import { EventEmitter } from "events"; import fs from "fs"; import path from "path"; // import { EmailSender } from "./mailer.js"; import { sendQYWechatMessage } from "../utils.js" import { SQLiteMessageQueue } from "../sqlite.js"; import { md5 } from "../utils.js"; import axios from "axios"; class MessageQueue extends EventEmitter { constructor() { super(); this.queue = new SQLiteMessageQueue(); this.processing = false; // this.queueFile = path.resolve("message_queue.json");K // this.emailSender = new EmailSender({ // host: "smtp.exmail.qq.com", // port: 465, // secure: true, // auth: { // user: "jiqiren@axbbaoxian.com", // pass: "Am13579q", // }, // }); this.recipients = [ "huzhengrong@axbbaoxian.com", ]; // 启动处理器 this.startProcessor(); } // 添加消息到队列 // 处理队列 async startProcessor() { // 计算到明天上午9点的时间间隔 const now = new Date(); const nextRun = new Date(); nextRun.setHours(9, 0, 0, 0); // 设置为今天的上午9点 // 如果当前时间已经过了今天的9点,则设置为明天的9点 if (now > nextRun) { nextRun.setDate(nextRun.getDate() + 1); } const timeUntilNextRun = nextRun - now; console.log(`消息队列处理器将在 ${nextRun.toString()} 开始执行`); // 第一次执行使用setTimeout setTimeout(async () => { try { console.log("开始处理队列"); const pendingMessages = this.queue.getPendingMessages(); if (!this.processing && pendingMessages.length > 0) { await this.processQueue(pendingMessages); } } catch (error) { console.error(`❌ 获取待处理消息失败:`, error); } // 之后每天执行一次 this.scheduleDailyProcessing(); }, timeUntilNextRun); } // 每天上午9点执行消息处理 scheduleDailyProcessing() { // 每天间隔24小时执行一次 setInterval(async () => { try { console.log("开始处理队列"); const pendingMessages = this.queue.getPendingMessages(); if (!this.processing && pendingMessages.length > 0) { await this.processQueue(pendingMessages); } } catch (error) { console.error(`❌ 获取待处理消息失败:`, error); } }, 24 * 60 * 60 * 1000); // 24小时 console.log("已设置每天上午9点执行消息处理"); } async processQueue(pendingMessages) { this.processing = true; let msgMap = {}; for (const message of pendingMessages) { try { console.log(`📧 处理消息: ${message.spider_name}`); // console.log(typeof message.data); // let formdata = JSON.parse(message.data); if (!msgMap[message.spider_name]) { msgMap[message.spider_name] = message.data; } else { msgMap[message.spider_name].push(...message.data); } message.status = "sent"; message.sent_at = new Date().toISOString(); this.queue.updateMessageStatus( message.id, message.status, message.sent_at ); } catch (error) { console.error(`❌ 消息处理失败: ${message.id}`, error); message.status = "failed"; message.error_message = error.message; this.queue.updateMessageStatus( message.id, message.status, null, // message.sent_at, message.error_message ); } } let html = ""; for (const spiderName in msgMap) { html += this.generateTable(spiderName, msgMap[spiderName]); } try { // this.emailSender.sendBulkEmail(this.recipients, "招标项目最新公告", html); await sendQYWechatMessage(html) } catch (error) { console.error(`❌ 通知发送失败: ${error}`); } this.processing = false; } generateTable(spiderName, data) { let tableHtml =` 各位好, 本次检测发现了${data.length}条新增招标信息,详情如下: ` data.forEach((item, index) => { tableHtml += ` 【${index+1}】 保司:${spiderName} 招标标题:${item.name} 详情链接:[链接](${item.urls}) `; }); return tableHtml; } getSign(timestamp) { let secret = "cpwyyds"; let uri = "/common/message/push"; const url = uri + timestamp + secret; const myCalc = md5(url); let sign = myCalc.substring(5, 13) + myCalc.substring(29, 31) + myCalc.substring(18, 27); //sign 转大写 sign = sign.toUpperCase(); return sign; } formatUrls(urls) { if (!urls) { return '无链接'; } // 处理数组形式的URLs if (Array.isArray(urls)) { if (urls.length === 0) { return '无链接'; } if (urls.length === 1) { return `📄 查看`; } // 多个链接的情况 let linksHtml = '