insurance-spider/service/msgManager.js

206 lines
6.2 KiB
JavaScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// msgManager.js - event-based message queue
import { EventEmitter } from "events";
import fs from "fs";
import path from "path";
// import { EmailSender } from "./mailer.js";
import { sendQYWechatMessage } from "../utils.js"
import { SQLiteMessageQueue } from "../sqlite.js";
import { md5 } from "../utils.js";
import axios from "axios";
/**
 * Daily digest queue for newly scraped tender announcements.
 *
 * Pending rows are read from a SQLiteMessageQueue, grouped by spider name,
 * rendered as a text message with markdown-style links and pushed through
 * sendQYWechatMessage. The first run happens at the next local 09:00 and
 * subsequent runs follow every 24 hours.
 *
 * NOTE(review): a commented-out SMTP configuration containing a plaintext
 * password used to live in the constructor. It has been removed from the
 * source, but it is still in version-control history - rotate that credential.
 */
class MessageQueue extends EventEmitter {
  /**
   * Creates the backing SQLite queue and immediately arms the daily timer.
   */
  constructor() {
    super();
    this.queue = new SQLiteMessageQueue();
    // Re-entrancy guard: true while processQueue() is running.
    this.processing = false;
    // Not read anywhere in this class; kept because external code may use it.
    this.recipients = ["huzhengrong@axbbaoxian.com"];
    // Start the scheduler.
    this.startProcessor();
  }

  /**
   * Arms a one-shot timer for the next local 09:00. When it fires, pending
   * messages are processed once and the 24-hour interval is installed.
   *
   * @returns {Promise<void>} Resolves as soon as the timer has been armed.
   */
  async startProcessor() {
    const now = new Date();
    // Derive the target from the same instant instead of reading the clock twice.
    const nextRun = new Date(now);
    nextRun.setHours(9, 0, 0, 0);
    // Already past today's 09:00 -> aim for tomorrow's.
    if (now > nextRun) {
      nextRun.setDate(nextRun.getDate() + 1);
    }
    const timeUntilNextRun = nextRun - now;
    console.log(`消息队列处理器将在 ${nextRun.toString()} 开始执行`);

    setTimeout(async () => {
      await this.processPendingMessages();
      // From now on repeat once a day.
      this.scheduleDailyProcessing();
    }, timeUntilNextRun);
  }

  /**
   * Installs the repeating 24-hour timer. The interval is a fixed duration,
   * so the wall-clock time of later runs follows whenever this was called.
   */
  scheduleDailyProcessing() {
    setInterval(() => this.processPendingMessages(), 24 * 60 * 60 * 1000);
    console.log("已设置每天上午9点执行消息处理");
  }

  /**
   * One scheduler tick: loads the pending messages and hands them to
   * processQueue() unless a run is already in progress. Never rejects;
   * failures are logged.
   *
   * @returns {Promise<void>}
   */
  async processPendingMessages() {
    try {
      console.log("开始处理队列");
      if (this.processing) {
        return;
      }
      // `await` accepts both a synchronous and a Promise-returning queue.
      const pendingMessages = await this.queue.getPendingMessages();
      if (pendingMessages.length > 0) {
        await this.processQueue(pendingMessages);
      }
    } catch (error) {
      console.error(`❌ 获取待处理消息失败:`, error);
    }
  }

  /**
   * Groups the given messages by spider name, sends a single notification and
   * only then marks the delivered messages as "sent".
   *
   * - A message whose payload cannot be used is marked "failed" right away.
   * - If sending throws, the remaining messages are left untouched so that the
   *   next run picks them up again instead of losing them.
   * - `this.processing` is always reset, even when something throws.
   *
   * @param {Array<{id: *, spider_name: string, data: (Array|string)}>} pendingMessages
   *   Messages to deliver; `data` is an array of items (or its JSON encoding).
   * @returns {Promise<void>}
   */
  async processQueue(pendingMessages) {
    this.processing = true;
    try {
      // A Map keeps arbitrary spider names (e.g. "constructor") safe as keys.
      const itemsBySpider = new Map();
      const deliverable = [];

      for (const message of pendingMessages) {
        try {
          console.log(`📧 处理消息: ${message.spider_name}`);
          const items = MessageQueue.normalizeItems(message.data);
          let bucket = itemsBySpider.get(message.spider_name);
          if (!bucket) {
            // Fresh array: never append into a message's own data array.
            bucket = [];
            itemsBySpider.set(message.spider_name, bucket);
          }
          for (const item of items) {
            bucket.push(item);
          }
          deliverable.push(message);
        } catch (error) {
          console.error(`❌ 消息处理失败: ${message.id}`, error);
          message.status = "failed";
          message.error_message = error.message;
          this.queue.updateMessageStatus(
            message.id,
            message.status,
            null,
            message.error_message
          );
        }
      }

      // Nothing usable -> do not push an empty notification.
      if (deliverable.length === 0) {
        return;
      }

      let html = "";
      for (const [spiderName, items] of itemsBySpider) {
        html += this.generateTable(spiderName, items);
      }

      try {
        await sendQYWechatMessage(html);
      } catch (error) {
        console.error(`❌ 通知发送失败: ${error}`);
        // Keep the messages pending so the next run retries them.
        return;
      }

      const sentAt = new Date().toISOString();
      for (const message of deliverable) {
        message.status = "sent";
        message.sent_at = sentAt;
        try {
          this.queue.updateMessageStatus(
            message.id,
            message.status,
            message.sent_at
          );
        } catch (error) {
          // The notification already went out; just report the bookkeeping error.
          console.error(`❌ 消息处理失败: ${message.id}`, error);
        }
      }
    } finally {
      this.processing = false;
    }
  }

  /**
   * Renders the announcements of one spider as a text block.
   *
   * @param {string} spiderName Name printed on every entry.
   * @param {Array<{name: string, urls: (string|string[])}>} data Announcements.
   * @returns {string} Greeting/summary header followed by one entry per item.
   */
  generateTable(spiderName, data) {
    const header = `\n各位好,\n本次检测发现了${data.length}条新增招标信息,详情如下:\n`;
    const entries = data.map(
      (item, index) =>
        `\n${index + 1}\n保司:${spiderName}\n招标标题:${item.name}\n详情链接:${MessageQueue.toMarkdownLinks(item.urls)}\n`
    );
    return header + entries.join("");
  }

  /**
   * Builds the request signature for "/common/message/push".
   *
   * The digest of `uri + timestamp + secret` is cut into three slices
   * ([5,13), [29,31), [18,27)) which are concatenated and upper-cased.
   * Assumes md5() returns a hex digest string - TODO confirm in ../utils.js.
   *
   * NOTE(review): the secret is hard-coded; consider moving it to configuration.
   *
   * @param {(string|number)} timestamp Value mixed into the signed string.
   * @returns {string} Upper-cased signature.
   */
  getSign(timestamp) {
    const secret = "cpwyyds";
    const uri = "/common/message/push";
    const digest = md5(uri + timestamp + secret);
    const sign =
      digest.substring(5, 13) +
      digest.substring(29, 31) +
      digest.substring(18, 27);
    return sign.toUpperCase();
  }

  /**
   * Renders one URL or a list of URLs as styled HTML anchors.
   * URLs are HTML-escaped because they originate from scraped pages.
   *
   * @param {(string|string[]|null|undefined)} urls URL(s) to render.
   * @returns {string} HTML snippet; a grey placeholder when there is no link
   *   or the value has an unsupported type.
   */
  formatUrls(urls) {
    const placeholder = (text) =>
      `<span style="color: #6c757d;">${text}</span>`;
    if (!urls) {
      return placeholder("无链接");
    }

    const singleStyle =
      "color: #007bff; text-decoration: none; padding: 6px 12px; background-color: #e3f2fd; border-radius: 4px; font-size: 12px; border: 1px solid #90caf9; display: inline-block;";
    const compactStyle =
      "color: #007bff; text-decoration: none; padding: 4px 8px; background-color: #e3f2fd; border-radius: 3px; font-size: 11px; margin: 2px; display: inline-block; border: 1px solid #90caf9;";
    const anchor = (url, style, label) =>
      `<a href="${MessageQueue.escapeHtml(url)}" target="_blank" style="${style}">${label}</a>`;

    if (Array.isArray(urls)) {
      if (urls.length === 0) {
        return placeholder("无链接");
      }
      if (urls.length === 1) {
        return anchor(urls[0], singleStyle, "📄 查看");
      }
      // Several links: compact numbered buttons inside one wrapper.
      const links = urls.map((url, index) =>
        anchor(url, compactStyle, `📄 链接${index + 1}`)
      );
      return `<div style="line-height: 1.6;">${links.join("")}</div>`;
    }

    if (typeof urls === "string") {
      return anchor(urls, singleStyle, "📄 查看");
    }
    return placeholder("链接格式错误");
  }

  /**
   * Returns the message payload as an array, decoding a JSON string if needed.
   *
   * @param {(Array|string)} data Raw `message.data`.
   * @returns {Array} The announcement items.
   * @throws {SyntaxError} When a string payload is not valid JSON.
   * @throws {TypeError} When the payload is not an array.
   */
  static normalizeItems(data) {
    const items = typeof data === "string" ? JSON.parse(data) : data;
    if (!Array.isArray(items)) {
      throw new TypeError("message.data must be an array of announcement items");
    }
    return items;
  }

  /**
   * Renders URL(s) as markdown links: one "[链接](url)" for a single URL,
   * numbered links separated by spaces for several, "无链接" for none.
   *
   * @param {(string|string[]|null|undefined)} urls URL(s) to render.
   * @returns {string} Markdown link text.
   */
  static toMarkdownLinks(urls) {
    if (urls === null || urls === undefined || urls === "") {
      return "无链接";
    }
    if (!Array.isArray(urls)) {
      return `[链接](${urls})`;
    }
    if (urls.length === 0) {
      return "无链接";
    }
    if (urls.length === 1) {
      return `[链接](${urls[0]})`;
    }
    return urls.map((url, index) => `[链接${index + 1}](${url})`).join(" ");
  }

  /**
   * Escapes the characters that are significant inside HTML text/attributes.
   *
   * @param {*} value Value to escape; converted with String().
   * @returns {string} Escaped text.
   */
  static escapeHtml(value) {
    const entities = {
      "&": "&amp;",
      "<": "&lt;",
      ">": "&gt;",
      '"': "&quot;",
      "'": "&#39;",
    };
    return String(value).replace(/[&<>"']/g, (ch) => entities[ch]);
  }
}
// Module-wide singleton; constructing it calls startProcessor() as a side effect.
export const messageQueue = new MessageQueue();
// export default MessageQueue;