insurance-spider/service/msgManager.js

211 lines
6.2 KiB
JavaScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// msgQueue.js - 基于事件的消息队列
import { EventEmitter } from "events";
import fs from "fs";
import path from "path";
// import { EmailSender } from "./mailer.js";
import { sendQYWechatMessage } from "../utils.js"
import { SQLiteMessageQueue } from "../sqlite.js";
import { md5 } from "../utils.js";
import axios from "axios";
class MessageQueue extends EventEmitter {
constructor() {
super();
this.queue = new SQLiteMessageQueue();
this.processing = false;
// this.queueFile = path.resolve("message_queue.json");K
// this.emailSender = new EmailSender({
// host: "smtp.exmail.qq.com",
// port: 465,
// secure: true,
// auth: {
// user: "jiqiren@axbbaoxian.com",
// pass: "Am13579q",
// },
// });
this.recipients = [
"huzhengrong@axbbaoxian.com",
];
// 启动处理器
this.startProcessor();
// 启动失败消息重试机制
this.startFailedMessageRetry();
}
// 添加消息到队列
// 处理队列
async startProcessor() {
// 计算到明天上午9点的时间间隔
const now = new Date();
const nextRun = new Date();
nextRun.setHours(9, 0, 0, 0); // 设置为今天的上午9点
// 如果当前时间已经过了今天的9点则设置为明天的9点
if (now > nextRun) {
nextRun.setDate(nextRun.getDate() + 1);
}
const timeUntilNextRun = nextRun - now;
console.log(`消息队列处理器将在 ${nextRun.toString()} 开始执行`);
// 第一次执行使用setTimeout
setTimeout(async () => {
try {
console.log("开始处理队列");
const pendingMessages = this.queue.getPendingMessages();
if (!this.processing && pendingMessages.length > 0) {
await this.processQueue(pendingMessages);
}
} catch (error) {
console.error(`❌ 获取待处理消息失败:`, error);
}
// 之后每天执行一次
this.scheduleDailyProcessing();
}, timeUntilNextRun);
}
// 每天上午9点执行消息处理
scheduleDailyProcessing() {
// 每天间隔24小时执行一次
setInterval(async () => {
try {
console.log("开始处理队列");
const pendingMessages = this.queue.getPendingMessages();
if (!this.processing && pendingMessages.length > 0) {
await this.processQueue(pendingMessages);
}
} catch (error) {
console.error(`❌ 获取待处理消息失败:`, error);
}
}, 24 * 60 * 60 * 1000); // 24小时
console.log("已设置每天上午9点执行消息处理");
}
async processQueue(pendingMessages) {
this.processing = true;
let msgMap = {};
// 用于跟踪消息是否是重试的
let isRetryMap = {};
for (const message of pendingMessages) {
try {
console.log(`📧 处理消息: ${message.spider_name}`);
// console.log(typeof message.data);
// let formdata = JSON.parse(message.data);
if (!msgMap[message.spider_name]) {
msgMap[message.spider_name] = message.data;
isRetryMap[message.spider_name] = message.status === 'failed';
} else {
msgMap[message.spider_name].push(...message.data);
}
// 只有当消息不是重试消息时才更新状态为sent
// 如果是重试消息且成功发送,则删除错误信息
if (message.status !== 'failed') {
message.status = "sent";
message.sent_at = new Date().toISOString();
this.queue.updateMessageStatus(
message.id,
message.status,
message.sent_at
);
} else {
// 重试成功,清除错误信息
message.status = "sent";
message.sent_at = new Date().toISOString();
this.queue.updateMessageStatus(
message.id,
message.status,
message.sent_at,
null // 清除错误信息
);
}
} catch (error) {
console.error(`❌ 消息处理失败: ${message.id}`, error);
message.status = "failed";
message.error_message = error.message;
this.queue.updateMessageStatus(
message.id,
message.status,
null,
// message.sent_at,
message.error_message
);
}
}
let html = "";
for (const spiderName in msgMap) {
html += this.generateTable(spiderName, msgMap[spiderName]);
}
try {
// this.emailSender.sendBulkEmail(this.recipients, "招标项目最新公告", html);
await sendQYWechatMessage(html)
} catch (error) {
console.error(`❌ 通知发送失败: ${error}`);
}
this.processing = false;
}
generateTable(spiderName, data) {
let tableHtml =`
各位好,
本次检测发现了${data.length}条新增招标信息,详情如下:
`
data.forEach((item, index) => {
tableHtml += `
${index+1}
保司:${spiderName}
招标标题:${item.name}
详情链接:[链接](${item.urls})
`;
});
return tableHtml;
}
getSign(timestamp) {
let secret = "cpwyyds";
let uri = "/common/message/push";
const url = uri + timestamp + secret;
const myCalc = md5(url);
let sign =
myCalc.substring(5, 13) +
myCalc.substring(29, 31) +
myCalc.substring(18, 27);
//sign 转大写
sign = sign.toUpperCase();
return sign;
}
// 启动失败消息重试机制
startFailedMessageRetry() {
// 每5分钟检查一次失败的消息并重试
setInterval(async () => {
try {
console.log("检查失败消息并重试...");
const failedMessages = this.queue.getFailedMessages();
if (failedMessages.length > 0) {
console.log(`发现 ${failedMessages.length} 条失败消息,尝试重试发送...`);
// 重新处理失败的消息
await this.processQueue(failedMessages);
}
} catch (error) {
console.error(`❌ 失败消息重试失败:`, error);
}
}, 5 * 60 * 1000); // 每5分钟检查一次
}
}
const messageQueue = new MessageQueue();
export { messageQueue };
// export default MessageQueue;