insurance-spider/msgManager.js

213 lines
7.2 KiB
JavaScript

// msgManager.js - event-based message queue
import { EventEmitter } from "events";
import fs from "fs";
import path from "path";
import { EmailSender } from "./mailer.js";
import { SQLiteMessageQueue } from "./sqlite.js";
import { md5 } from "./utils.js";
import axios from "axios";
/**
 * Periodically drains pending spider messages from the SQLite-backed queue and
 * mails them to the configured recipients as a single HTML digest, one table
 * per spider.
 *
 * The class extends EventEmitter and starts its polling timer as soon as it is
 * constructed.
 */
class MessageQueue extends EventEmitter {
  /**
   * Creates the queue, the mail sender and the recipient list, then starts the
   * periodic processor.
   */
  constructor() {
    super();
    this.queue = new SQLiteMessageQueue();
    // True while a batch is being processed; the poller skips a tick when set.
    this.processing = false;
    // Handle of the polling interval, kept so stopProcessor() can clear it.
    this.timer = null;
    // NOTE(security): the literal credentials below are kept only as a
    // backward-compatible fallback. Prefer supplying SPIDER_SMTP_USER /
    // SPIDER_SMTP_PASS through the environment, then rotate the password and
    // delete the literals from source control.
    this.emailSender = new EmailSender({
      host: "smtp.exmail.qq.com",
      port: 465,
      secure: true,
      auth: {
        user: process.env.SPIDER_SMTP_USER || "jiqiren@axbbaoxian.com",
        pass: process.env.SPIDER_SMTP_PASS || "Am13579q",
      },
    });
    this.recipients = ["huzhengrong@axbbaoxian.com"];
    // Start polling immediately. The returned promise resolves right away and
    // cannot reject, so it is intentionally not awaited.
    this.startProcessor();
  }

  /**
   * Starts the polling timer. Each tick reads the pending messages and, when
   * there are any and no batch is in flight, hands them to processQueue().
   * Calling this while a timer is already running is a no-op.
   *
   * @param {number} [intervalMs] - Polling period in milliseconds (default 1 hour).
   * @returns {Promise<void>}
   */
  async startProcessor(intervalMs = 60 * 60 * 1000) {
    if (this.timer) {
      return;
    }
    this.timer = setInterval(async () => {
      console.log("开始处理队列");
      if (this.processing) {
        return;
      }
      try {
        const pendingMessages = this.queue.getPendingMessages();
        if (pendingMessages.length > 0) {
          await this.processQueue(pendingMessages);
        }
      } catch (error) {
        console.error(`❌ 获取待处理消息失败:`, error);
      }
    }, intervalMs);
  }

  /**
   * Stops the polling timer started by startProcessor(), if any.
   */
  stopProcessor() {
    if (this.timer) {
      clearInterval(this.timer);
      this.timer = null;
    }
  }

  /**
   * Groups the given messages by spider, sends one digest e-mail and records
   * the outcome of every message in the queue store.
   *
   * - A message whose data cannot be read as an array is marked "failed".
   * - Messages are marked "sent" only after the e-mail call has completed;
   *   if sending throws they are left untouched so that a later run can
   *   retry them.
   * - Nothing is sent when no message yields usable data.
   *
   * @param {Array<{id: *, spider_name: string, data: (Array|string)}>} pendingMessages
   * @returns {Promise<void>}
   */
  async processQueue(pendingMessages) {
    this.processing = true;
    try {
      // A Map avoids prototype-key collisions with spider names such as "__proto__".
      const rowsBySpider = new Map();
      const accepted = [];

      for (const message of pendingMessages) {
        try {
          console.log(`📧 处理消息: ${message.spider_name}`);
          const rows = this.parseMessageData(message.data);
          let bucket = rowsBySpider.get(message.spider_name);
          if (!bucket) {
            // Fresh array: never append into an array owned by a message.
            bucket = [];
            rowsBySpider.set(message.spider_name, bucket);
          }
          for (const row of rows) {
            bucket.push(row);
          }
          accepted.push(message);
        } catch (error) {
          console.error(`❌ 消息处理失败: ${message.id}`, error);
          this.markFailed(message, error);
        }
      }

      if (accepted.length === 0) {
        return;
      }

      let html = "";
      for (const [spiderName, rows] of rowsBySpider) {
        html += this.generateTable(spiderName, rows);
      }

      try {
        // Awaited so that an asynchronous failure is actually caught here.
        await this.emailSender.sendBulkEmail(
          this.recipients,
          "招标项目最新公告",
          html
        );
      } catch (error) {
        console.error(`❌ 通知发送失败: ${error}`);
        return;
      }

      for (const message of accepted) {
        message.status = "sent";
        message.sent_at = new Date().toISOString();
        try {
          this.queue.updateMessageStatus(
            message.id,
            message.status,
            message.sent_at
          );
        } catch (error) {
          // The mail has already gone out; only the bookkeeping failed.
          console.error(`❌ 消息处理失败: ${message.id}`, error);
        }
      }
    } finally {
      // Always release the flag, otherwise one failure would block every later tick.
      this.processing = false;
    }
  }

  /**
   * Returns the item list carried by a message. Accepts either an array or a
   * JSON string encoding an array.
   *
   * @param {Array|string} data
   * @returns {Array}
   * @throws {SyntaxError} When a string payload is not valid JSON.
   * @throws {TypeError} When the payload is not an array.
   */
  parseMessageData(data) {
    const rows = typeof data === "string" ? JSON.parse(data) : data;
    if (!Array.isArray(rows)) {
      throw new TypeError("message.data must be an array of items");
    }
    return rows;
  }

  /**
   * Marks a message as failed in memory and in the queue store. A failure of
   * the store update itself is logged and swallowed so the batch can go on.
   *
   * @param {object} message
   * @param {Error} error - The reason the message could not be processed.
   */
  markFailed(message, error) {
    message.status = "failed";
    message.error_message = error.message;
    try {
      this.queue.updateMessageStatus(
        message.id,
        message.status,
        null,
        message.error_message
      );
    } catch (updateError) {
      console.error(`❌ 消息处理失败: ${message.id}`, updateError);
    }
  }

  /**
   * Escapes a value for safe inclusion in HTML text or a double-quoted
   * attribute. null and undefined become an empty string.
   *
   * @param {*} value
   * @returns {string}
   */
  escapeHtml(value) {
    if (value === null || value === undefined) {
      return "";
    }
    return String(value)
      .replace(/&/g, "&amp;")
      .replace(/</g, "&lt;")
      .replace(/>/g, "&gt;")
      .replace(/"/g, "&quot;")
      .replace(/'/g, "&#39;");
  }

  /**
   * Renders one spider's items as a titled HTML table with the columns
   * index, name, publish time, end time and links. All item values are
   * HTML-escaped.
   *
   * @param {string} spiderName - Shown in the section heading.
   * @param {Array<{name: *, publishTime: *, endTime: *, urls: *}>} data
   * @returns {string} HTML fragment.
   */
  generateTable(spiderName, data) {
    const items = Array.isArray(data) ? data : [];
    const headerCellStyle =
      "border: 1px solid #ddd; padding: 12px 8px; text-align: left; font-weight: bold;";

    const rowsHtml = items
      .map((item, index) => {
        const row = item || {};
        const rowColor = index % 2 === 0 ? "#f8f9fa" : "white";
        const urls = this.formatUrls(row.urls);
        return `
<tr style="background-color: ${rowColor}; border-bottom: 1px solid #eee;">
<td style="border: 1px solid #ddd; padding: 10px 8px; text-align: center; font-weight: bold; color: #666;">
${index + 1}
</td>
<td style="border: 1px solid #ddd; padding: 10px 8px; line-height: 1.4;">
<div style="font-weight: 500; color: #2c3e50; margin-bottom: 4px;">
${this.escapeHtml(row.name)}
</div>
</td>
<td style="border: 1px solid #ddd; padding: 10px 8px; color: #495057;">
${this.escapeHtml(row.publishTime)}
</td>
<td style="border: 1px solid #ddd; padding: 10px 8px; color: #495057;">
<div>${this.escapeHtml(row.endTime)}</div>
</td>
<td style="border: 1px solid #ddd; padding: 10px 8px; text-align: center;">
${urls}
</td>
</tr>
`;
      })
      .join("");

    return `
<div style="margin: 30px 0; font-family: Arial, sans-serif;">
<h2 style="color: #2c3e50; border-bottom: 3px solid #3498db; padding-bottom: 10px; margin-bottom: 20px;">
🕷️ ${this.escapeHtml(spiderName)} (${items.length} 条新增)
</h2>
<div style="overflow-x: auto; box-shadow: 0 2px 8px rgba(0,0,0,0.1); border-radius: 8px; margin-bottom: 20px;">
<table style="width: 100%; border-collapse: collapse; background: white; min-width: 800px;">
<thead>
<tr style="background: linear-gradient(135deg, #3498db 0%, #2980b9 100%); color: white;">
<th style="${headerCellStyle} width: 50px;">序号</th>
<th style="${headerCellStyle} min-width: 300px;">项目名称</th>
<th style="${headerCellStyle} width: 140px;">发布时间</th>
<th style="${headerCellStyle} width: 140px;">截止时间</th>
<th style="${headerCellStyle} width: 100px;">查看详情</th>
</tr>
</thead>
<tbody>
${rowsHtml}
</tbody>
</table>
</div>
</div>
`;
  }

  /**
   * Computes the signature for the "/common/message/push" endpoint: the MD5
   * of uri + timestamp + secret, from which the character ranges [5,13),
   * [29,31) and [18,27) are concatenated and upper-cased.
   *
   * @param {string|number} timestamp
   * @returns {string} Upper-case signature.
   */
  getSign(timestamp) {
    // NOTE(security): shared secret is hard-coded in source.
    const secret = "cpwyyds";
    const uri = "/common/message/push";
    const digest = md5(`${uri}${timestamp}${secret}`);
    const sign =
      digest.substring(5, 13) +
      digest.substring(29, 31) +
      digest.substring(18, 27);
    return sign.toUpperCase();
  }

  /**
   * Renders the "view details" cell: a placeholder when there is no link, a
   * single button for one URL, or a numbered list of buttons for several.
   * URLs are escaped before being placed in the href attribute.
   *
   * @param {string|string[]|null|undefined} urls
   * @returns {string} HTML fragment.
   */
  formatUrls(urls) {
    const noLink = '<span style="color: #6c757d;">无链接</span>';
    const singleStyle =
      "color: #007bff; text-decoration: none; padding: 6px 12px; background-color: #e3f2fd; border-radius: 4px; font-size: 12px; border: 1px solid #90caf9; display: inline-block;";
    const multiStyle =
      "color: #007bff; text-decoration: none; padding: 4px 8px; background-color: #e3f2fd; border-radius: 3px; font-size: 11px; margin: 2px; display: inline-block; border: 1px solid #90caf9;";
    const link = (url, style, label) =>
      `<a href="${this.escapeHtml(url)}" target="_blank" style="${style}">${label}</a>`;

    if (!urls) {
      return noLink;
    }
    if (Array.isArray(urls)) {
      if (urls.length === 0) {
        return noLink;
      }
      if (urls.length === 1) {
        return link(urls[0], singleStyle, "📄 查看");
      }
      const links = urls
        .map((url, index) => link(url, multiStyle, `📄 链接${index + 1}`))
        .join("");
      return `<div style="line-height: 1.6;">${links}</div>`;
    }
    if (typeof urls === "string") {
      return link(urls, singleStyle, "📄 查看");
    }
    return '<span style="color: #6c757d;">链接格式错误</span>';
  }
}
// Module-wide singleton: importing this module constructs the queue (and
// thereby starts its processor) exactly once. Only the instance is exported,
// not the class.
export const messageQueue = new MessageQueue();