diff --git a/service/msgManager.js b/service/msgManager.js index 4c8b583..45b1eb9 100644 --- a/service/msgManager.js +++ b/service/msgManager.js @@ -29,6 +29,9 @@ class MessageQueue extends EventEmitter { // 启动处理器 this.startProcessor(); + + // 启动失败消息重试机制 + this.startFailedMessageRetry(); } // 添加消息到队列 @@ -88,6 +91,9 @@ class MessageQueue extends EventEmitter { this.processing = true; let msgMap = {}; + // 用于跟踪消息是否是重试的 + let isRetryMap = {}; + for (const message of pendingMessages) { try { console.log(`📧 处理消息: ${message.spider_name}`); @@ -95,17 +101,32 @@ class MessageQueue extends EventEmitter { // let formdata = JSON.parse(message.data); if (!msgMap[message.spider_name]) { msgMap[message.spider_name] = message.data; + isRetryMap[message.spider_name] = message.status === 'failed'; } else { msgMap[message.spider_name].push(...message.data); } - message.status = "sent"; - message.sent_at = new Date().toISOString(); - this.queue.updateMessageStatus( - message.id, - message.status, - message.sent_at - ); + // 只有当消息不是重试消息时,才更新状态为sent + // 如果是重试消息且成功发送,则删除错误信息 + if (message.status !== 'failed') { + message.status = "sent"; + message.sent_at = new Date().toISOString(); + this.queue.updateMessageStatus( + message.id, + message.status, + message.sent_at + ); + } else { + // 重试成功,清除错误信息 + message.status = "sent"; + message.sent_at = new Date().toISOString(); + this.queue.updateMessageStatus( + message.id, + message.status, + message.sent_at, + null // 清除错误信息 + ); + } } catch (error) { console.error(`❌ 消息处理失败: ${message.id}`, error); message.status = "failed"; @@ -162,6 +183,24 @@ class MessageQueue extends EventEmitter { sign = sign.toUpperCase(); return sign; } + + // 启动失败消息重试机制 + startFailedMessageRetry() { + // 每5分钟检查一次失败的消息并重试 + setInterval(async () => { + try { + console.log("检查失败消息并重试..."); + const failedMessages = this.queue.getFailedMessages(); + if (failedMessages.length > 0) { + console.log(`发现 ${failedMessages.length} 条失败消息,尝试重试发送...`); + // 重新处理失败的消息 + await this.processQueue(failedMessages); + } + } catch (error) { + console.error(`❌ 失败消息重试失败:`, error); + } + }, 5 * 60 * 1000); // 每5分钟检查一次 + } } const messageQueue = new MessageQueue();