🐛 fix(msgManager): 修改消息处理逻辑以支持消息长度限制,添加分割处理功能

This commit is contained in:
huzhengrong 2025-10-28 20:22:50 +08:00
parent 103c023a4f
commit f58a70f84a
2 changed files with 97 additions and 24 deletions

View File

@ -96,14 +96,14 @@ class MessageQueue extends EventEmitter {
for (const message of pendingMessages) {
try {
console.log(`📧 处理消息: ${message.spider_name}`);
console.log(`📧 处理消息: ${message.id}-${message.spider_name}`);
// console.log(typeof message.data);
// let formdata = JSON.parse(message.data);
if (!msgMap[message.spider_name]) {
msgMap[message.spider_name] = message.data;
isRetryMap[message.spider_name] = message.status === 'failed';
if (!msgMap[message.id]) {
msgMap[message.id] = message.data;
isRetryMap[message.id] = message.status === 'failed';
} else {
msgMap[message.spider_name].push(...message.data);
msgMap[message.id].push(...message.data);
}
// 只有当消息不是重试消息时才更新状态为sent
@ -142,9 +142,9 @@ class MessageQueue extends EventEmitter {
}
// 消息发送部分添加延迟确保每分钟不超过20条消息每条间隔至少3秒
for (const spiderName in msgMap) {
for (const spiderId in msgMap) {
try {
let sendResult = await sendQYWechatMessage(this.generateTable(spiderName, msgMap[spiderName]))
let sendResult = await sendQYWechatMessage(this.generateTable(pendingMessages.find(item=>item.id===spiderId).spider_name, msgMap[spiderId]))
console.log(`✅ 通知发送成功: ${JSON.stringify(sendResult)}`);
// 添加3秒延迟以满足企微文档的频率限制每分钟不超过20条消息

107
sqlite.js
View File

@ -168,23 +168,96 @@ class SQLiteMessageQueue {
// =============
addMessage(spiderName, data) {
const message = {
id: Date.now() + "-" + Math.random().toString(36).substr(2, 9),
spider_name: spiderName,
data: JSON.stringify(data),
timestamp: new Date().toISOString(),
status: "pending",
};
this.insertStmt.run(
message.id,
message.spider_name,
message.data,
message.timestamp,
message.status
);
// wechatPush(spiderName, data);
console.log(`📤 添加消息到队列: ${spiderName} - ${data.length} 条数据`);
return message.id;
// 生成完整的消息内容
const fullData = JSON.stringify(data);
// 企业微信消息的最大长度是4096字符
const MAX_LENGTH = 4096;
// 如果消息长度在限制内,直接添加
if (fullData.length <= MAX_LENGTH) {
const message = {
id: Date.now() + "-" + Math.random().toString(36).substr(2, 9),
spider_name: spiderName,
data: fullData,
timestamp: new Date().toISOString(),
status: "pending",
};
this.insertStmt.run(
message.id,
message.spider_name,
message.data,
message.timestamp,
message.status
);
console.log(`📤 添加消息到队列: ${spiderName} - ${data.length} 条数据`);
return message.id;
}
// 如果消息太长,需要分割处理
console.log(`📝 消息过长 (${fullData.length} 字符),正在进行分割处理...`);
// 分割数据数组以确保每条消息不超过限制
const messageIds = [];
let currentChunk = [];
let currentChunkStr = "[]"; // 空数组的JSON表示
for (let i = 0; i < data.length; i++) {
const item = data[i];
const itemStr = JSON.stringify([item]); // 包含方括号
// 计算添加当前项后的新长度减去2是因为移除了空数组的方括号
const newLength = currentChunkStr.length + itemStr.length - 2;
// 如果添加当前项会超出限制,先保存当前块
if (newLength > MAX_LENGTH && currentChunk.length > 0) {
// 保存当前块
const message = {
id: Date.now() + "-" + messageIds.length + "-" + Math.random().toString(36).substr(2, 9),
spider_name: spiderName,
data: currentChunkStr,
timestamp: new Date().toISOString(),
status: "pending",
};
this.insertStmt.run(
message.id,
message.spider_name,
message.data,
message.timestamp,
message.status
);
messageIds.push(message.id);
// 开始新的块
currentChunk = [item];
currentChunkStr = JSON.stringify(currentChunk);
} else {
// 添加当前项到块中
currentChunk.push(item);
currentChunkStr = JSON.stringify(currentChunk);
}
}
// 保存最后一个块(如果有数据)
if (currentChunk.length > 0) {
const message = {
id: Date.now() + "-" + messageIds.length + "-" + Math.random().toString(36).substr(2, 9),
spider_name: spiderName,
data: currentChunkStr,
timestamp: new Date().toISOString(),
status: "pending",
};
this.insertStmt.run(
message.id,
message.spider_name,
message.data,
message.timestamp,
message.status
);
messageIds.push(message.id);
}
console.log(`📤 添加消息到队列: ${spiderName} - ${data.length} 条数据,分割为 ${messageIds.length} 条消息`);
return messageIds;
}
getPendingMessages() {