From f58a70f84adb42f45fe2c3bdc8cef64d0e504d44 Mon Sep 17 00:00:00 2001 From: huzhengrong Date: Tue, 28 Oct 2025 20:22:50 +0800 Subject: [PATCH] =?UTF-8?q?=F0=9F=90=9B=20fix(msgManager):=20=E4=BF=AE?= =?UTF-8?q?=E6=94=B9=E6=B6=88=E6=81=AF=E5=A4=84=E7=90=86=E9=80=BB=E8=BE=91?= =?UTF-8?q?=E4=BB=A5=E6=94=AF=E6=8C=81=E6=B6=88=E6=81=AF=E9=95=BF=E5=BA=A6?= =?UTF-8?q?=E9=99=90=E5=88=B6=EF=BC=8C=E6=B7=BB=E5=8A=A0=E5=88=86=E5=89=B2?= =?UTF-8?q?=E5=A4=84=E7=90=86=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- service/msgManager.js | 14 +++--- sqlite.js | 107 +++++++++++++++++++++++++++++++++++------- 2 files changed, 97 insertions(+), 24 deletions(-) diff --git a/service/msgManager.js b/service/msgManager.js index 8c82ce8..7b97d2d 100644 --- a/service/msgManager.js +++ b/service/msgManager.js @@ -96,14 +96,14 @@ class MessageQueue extends EventEmitter { for (const message of pendingMessages) { try { - console.log(`📧 处理消息: ${message.spider_name}`); + console.log(`📧 处理消息: ${message.id}-${message.spider_name}`); // console.log(typeof message.data); // let formdata = JSON.parse(message.data); - if (!msgMap[message.spider_name]) { - msgMap[message.spider_name] = message.data; - isRetryMap[message.spider_name] = message.status === 'failed'; + if (!msgMap[message.id]) { + msgMap[message.id] = message.data; + isRetryMap[message.id] = message.status === 'failed'; } else { - msgMap[message.spider_name].push(...message.data); + msgMap[message.id].push(...message.data); } // 只有当消息不是重试消息时,才更新状态为sent @@ -142,9 +142,9 @@ class MessageQueue extends EventEmitter { } // 消息发送部分添加延迟,确保每分钟不超过20条消息(每条间隔至少3秒) - for (const spiderName in msgMap) { + for (const spiderId in msgMap) { try { - let sendResult = await sendQYWechatMessage(this.generateTable(spiderName, msgMap[spiderName])) + let sendResult = await sendQYWechatMessage(this.generateTable(pendingMessages.find(item=>item.id===spiderId).spider_name, msgMap[spiderId])) console.log(`✅ 通知发送成功: ${JSON.stringify(sendResult)}`); // 添加3秒延迟以满足企微文档的频率限制(每分钟不超过20条消息) diff --git a/sqlite.js b/sqlite.js index a86d0f2..5880bd4 100644 --- a/sqlite.js +++ b/sqlite.js @@ -168,23 +168,96 @@ class SQLiteMessageQueue { // ============= addMessage(spiderName, data) { - const message = { - id: Date.now() + "-" + Math.random().toString(36).substr(2, 9), - spider_name: spiderName, - data: JSON.stringify(data), - timestamp: new Date().toISOString(), - status: "pending", - }; - this.insertStmt.run( - message.id, - message.spider_name, - message.data, - message.timestamp, - message.status - ); - // wechatPush(spiderName, data); - console.log(`📤 添加消息到队列: ${spiderName} - ${data.length} 条数据`); - return message.id; + // 生成完整的消息内容 + const fullData = JSON.stringify(data); + + // 企业微信消息的最大长度是4096字符 + const MAX_LENGTH = 4096; + + // 如果消息长度在限制内,直接添加 + if (fullData.length <= MAX_LENGTH) { + const message = { + id: Date.now() + "-" + Math.random().toString(36).substr(2, 9), + spider_name: spiderName, + data: fullData, + timestamp: new Date().toISOString(), + status: "pending", + }; + this.insertStmt.run( + message.id, + message.spider_name, + message.data, + message.timestamp, + message.status + ); + console.log(`📤 添加消息到队列: ${spiderName} - ${data.length} 条数据`); + return message.id; + } + + // 如果消息太长,需要分割处理 + console.log(`📝 消息过长 (${fullData.length} 字符),正在进行分割处理...`); + + // 分割数据数组以确保每条消息不超过限制 + const messageIds = []; + let currentChunk = []; + let currentChunkStr = "[]"; // 空数组的JSON表示 + + for (let i = 0; i < data.length; i++) { + const item = data[i]; + const itemStr = JSON.stringify([item]); // 包含方括号 + // 计算添加当前项后的新长度(减去2是因为移除了空数组的方括号) + const newLength = currentChunkStr.length + itemStr.length - 2; + + // 如果添加当前项会超出限制,先保存当前块 + if (newLength > MAX_LENGTH && currentChunk.length > 0) { + // 保存当前块 + const message = { + id: Date.now() + "-" + messageIds.length + "-" + Math.random().toString(36).substr(2, 9), + spider_name: spiderName, + data: currentChunkStr, + timestamp: new Date().toISOString(), + status: "pending", + }; + this.insertStmt.run( + message.id, + message.spider_name, + message.data, + message.timestamp, + message.status + ); + messageIds.push(message.id); + + // 开始新的块 + currentChunk = [item]; + currentChunkStr = JSON.stringify(currentChunk); + } else { + // 添加当前项到块中 + currentChunk.push(item); + currentChunkStr = JSON.stringify(currentChunk); + } + } + + // 保存最后一个块(如果有数据) + if (currentChunk.length > 0) { + const message = { + id: Date.now() + "-" + messageIds.length + "-" + Math.random().toString(36).substr(2, 9), + spider_name: spiderName, + data: currentChunkStr, + timestamp: new Date().toISOString(), + status: "pending", + }; + this.insertStmt.run( + message.id, + message.spider_name, + message.data, + message.timestamp, + message.status + ); + messageIds.push(message.id); + } + + console.log(`📤 添加消息到队列: ${spiderName} - ${data.length} 条数据,分割为 ${messageIds.length} 条消息`); + return messageIds; } getPendingMessages() {