From ec7999b04f8717cf5dd8bbf42459a1ff161a70f0 Mon Sep 17 00:00:00 2001 From: Alexandre Reyes Martins Date: Mon, 23 Feb 2026 21:31:20 +0000 Subject: [PATCH 1/3] feat(history-sync): emit messaging-history.set event on sync completion and fix race condition Reorder webhook emissions (CHATS_SET, MESSAGES_SET) to fire after database persistence, fixing a race condition where consumers received the event before data was queryable. Emit a new MESSAGING_HISTORY_SET event when progress reaches 100%, allowing consumers to know exactly when history sync is complete and messages are available in the database. Register the new event across all transport types (Webhook, WebSocket, RabbitMQ, NATS, SQS, Kafka, Pusher) and validation schemas. --- .../whatsapp/whatsapp.baileys.service.ts | 20 +++++++++++++------ .../integrations/event/event.controller.ts | 1 + src/config/env.config.ts | 10 ++++++++++ src/validate/instance.schema.ts | 4 ++++ 4 files changed, 29 insertions(+), 6 deletions(-) diff --git a/src/api/integrations/channel/whatsapp/whatsapp.baileys.service.ts b/src/api/integrations/channel/whatsapp/whatsapp.baileys.service.ts index 60e857fcc..9f4a900af 100644 --- a/src/api/integrations/channel/whatsapp/whatsapp.baileys.service.ts +++ b/src/api/integrations/channel/whatsapp/whatsapp.baileys.service.ts @@ -989,12 +989,12 @@ export class BaileysStartupService extends ChannelStartupService { chatsRaw.push({ remoteJid: chat.id, instanceId: this.instanceId, name: chat.name }); } - this.sendDataWebhook(Events.CHATS_SET, chatsRaw); - if (this.configService.get('DATABASE').SAVE_DATA.HISTORIC) { await this.prismaRepository.chat.createMany({ data: chatsRaw, skipDuplicates: true }); } + this.sendDataWebhook(Events.CHATS_SET, chatsRaw); + const messagesRaw: any[] = []; const messagesRepository: Set = new Set( @@ -1046,15 +1046,15 @@ export class BaileysStartupService extends ChannelStartupService { messagesRaw.push(this.prepareMessage(m)); } + if (this.configService.get('DATABASE').SAVE_DATA.HISTORIC) { + await this.prismaRepository.message.createMany({ data: messagesRaw, skipDuplicates: true }); + } + this.sendDataWebhook(Events.MESSAGES_SET, [...messagesRaw], true, undefined, { isLatest, progress, }); - if (this.configService.get('DATABASE').SAVE_DATA.HISTORIC) { - await this.prismaRepository.message.createMany({ data: messagesRaw, skipDuplicates: true }); - } - if ( this.configService.get('CHATWOOT').ENABLED && this.localChatwoot?.enabled && @@ -1071,6 +1071,14 @@ export class BaileysStartupService extends ChannelStartupService { contacts.filter((c) => !!c.notify || !!c.name).map((c) => ({ id: c.id, name: c.name ?? c.notify })), ); + if (progress === 100) { + this.sendDataWebhook(Events.MESSAGING_HISTORY_SET, { + messageCount: messagesRaw.length, + chatCount: chatsRaw.length, + contactCount: contacts?.length ?? 0, + }); + } + contacts = undefined; messages = undefined; chats = undefined; diff --git a/src/api/integrations/event/event.controller.ts b/src/api/integrations/event/event.controller.ts index 39b52184b..63061ea10 100644 --- a/src/api/integrations/event/event.controller.ts +++ b/src/api/integrations/event/event.controller.ts @@ -162,6 +162,7 @@ export class EventController { 'CALL', 'TYPEBOT_START', 'TYPEBOT_CHANGE_STATUS', + 'MESSAGING_HISTORY_SET', 'REMOVE_INSTANCE', 'LOGOUT_INSTANCE', 'INSTANCE_CREATE', diff --git a/src/config/env.config.ts b/src/config/env.config.ts index 7c4e382e7..772ae9279 100644 --- a/src/config/env.config.ts +++ b/src/config/env.config.ts @@ -91,6 +91,7 @@ export type EventsRabbitmq = { CALL: boolean; TYPEBOT_START: boolean; TYPEBOT_CHANGE_STATUS: boolean; + MESSAGING_HISTORY_SET: boolean; }; export type Rabbitmq = { @@ -150,6 +151,7 @@ export type Sqs = { SEND_MESSAGE: boolean; TYPEBOT_CHANGE_STATUS: boolean; TYPEBOT_START: boolean; + MESSAGING_HISTORY_SET: boolean; }; }; @@ -223,6 +225,7 @@ export type EventsWebhook = { CALL: boolean; TYPEBOT_START: boolean; TYPEBOT_CHANGE_STATUS: boolean; + MESSAGING_HISTORY_SET: boolean; ERRORS: boolean; ERRORS_WEBHOOK: string; }; @@ -256,6 +259,7 @@ export type EventsPusher = { CALL: boolean; TYPEBOT_START: boolean; TYPEBOT_CHANGE_STATUS: boolean; + MESSAGING_HISTORY_SET: boolean; }; export type ApiKey = { KEY: string }; @@ -537,6 +541,7 @@ export class ConfigService { CALL: process.env?.RABBITMQ_EVENTS_CALL === 'true', TYPEBOT_START: process.env?.RABBITMQ_EVENTS_TYPEBOT_START === 'true', TYPEBOT_CHANGE_STATUS: process.env?.RABBITMQ_EVENTS_TYPEBOT_CHANGE_STATUS === 'true', + MESSAGING_HISTORY_SET: process.env?.RABBITMQ_EVENTS_MESSAGING_HISTORY_SET === 'true', }, }, NATS: { @@ -574,6 +579,7 @@ export class ConfigService { CALL: process.env?.NATS_EVENTS_CALL === 'true', TYPEBOT_START: process.env?.NATS_EVENTS_TYPEBOT_START === 'true', TYPEBOT_CHANGE_STATUS: process.env?.NATS_EVENTS_TYPEBOT_CHANGE_STATUS === 'true', + MESSAGING_HISTORY_SET: process.env?.NATS_EVENTS_MESSAGING_HISTORY_SET === 'true', }, }, SQS: { @@ -614,6 +620,7 @@ export class ConfigService { SEND_MESSAGE: process.env?.SQS_GLOBAL_SEND_MESSAGE === 'true', TYPEBOT_CHANGE_STATUS: process.env?.SQS_GLOBAL_TYPEBOT_CHANGE_STATUS === 'true', TYPEBOT_START: process.env?.SQS_GLOBAL_TYPEBOT_START === 'true', + MESSAGING_HISTORY_SET: process.env?.SQS_GLOBAL_MESSAGING_HISTORY_SET === 'true', }, }, KAFKA: { @@ -657,6 +664,7 @@ export class ConfigService { CALL: process.env?.KAFKA_EVENTS_CALL === 'true', TYPEBOT_START: process.env?.KAFKA_EVENTS_TYPEBOT_START === 'true', TYPEBOT_CHANGE_STATUS: process.env?.KAFKA_EVENTS_TYPEBOT_CHANGE_STATUS === 'true', + MESSAGING_HISTORY_SET: process.env?.KAFKA_EVENTS_MESSAGING_HISTORY_SET === 'true', }, SASL: process.env?.KAFKA_SASL_ENABLED === 'true' @@ -722,6 +730,7 @@ export class ConfigService { CALL: process.env?.PUSHER_EVENTS_CALL === 'true', TYPEBOT_START: process.env?.PUSHER_EVENTS_TYPEBOT_START === 'true', TYPEBOT_CHANGE_STATUS: process.env?.PUSHER_EVENTS_TYPEBOT_CHANGE_STATUS === 'true', + MESSAGING_HISTORY_SET: process.env?.PUSHER_EVENTS_MESSAGING_HISTORY_SET === 'true', }, }, WA_BUSINESS: { @@ -779,6 +788,7 @@ export class ConfigService { CALL: process.env?.WEBHOOK_EVENTS_CALL === 'true', TYPEBOT_START: process.env?.WEBHOOK_EVENTS_TYPEBOT_START === 'true', TYPEBOT_CHANGE_STATUS: process.env?.WEBHOOK_EVENTS_TYPEBOT_CHANGE_STATUS === 'true', + MESSAGING_HISTORY_SET: process.env?.WEBHOOK_EVENTS_MESSAGING_HISTORY_SET === 'true', ERRORS: process.env?.WEBHOOK_EVENTS_ERRORS === 'true', ERRORS_WEBHOOK: process.env?.WEBHOOK_EVENTS_ERRORS_WEBHOOK || '', }, diff --git a/src/validate/instance.schema.ts b/src/validate/instance.schema.ts index a0553b666..16fd4fe80 100644 --- a/src/validate/instance.schema.ts +++ b/src/validate/instance.schema.ts @@ -86,6 +86,7 @@ export const instanceSchema: JSONSchema7 = { 'CALL', 'TYPEBOT_START', 'TYPEBOT_CHANGE_STATUS', + 'MESSAGING_HISTORY_SET', ], }, }, @@ -123,6 +124,7 @@ export const instanceSchema: JSONSchema7 = { 'CALL', 'TYPEBOT_START', 'TYPEBOT_CHANGE_STATUS', + 'MESSAGING_HISTORY_SET', ], }, }, @@ -160,6 +162,7 @@ export const instanceSchema: JSONSchema7 = { 'CALL', 'TYPEBOT_START', 'TYPEBOT_CHANGE_STATUS', + 'MESSAGING_HISTORY_SET', ], }, }, @@ -197,6 +200,7 @@ export const instanceSchema: JSONSchema7 = { 'CALL', 'TYPEBOT_START', 'TYPEBOT_CHANGE_STATUS', + 'MESSAGING_HISTORY_SET', ], }, }, From 1242baa5a4bec20f2eced8f1d017186c15d68a4d Mon Sep 17 00:00:00 2001 From: Alexandre Reyes Martins Date: Mon, 23 Feb 2026 21:48:30 +0000 Subject: [PATCH 2/3] fix(history-sync): use cumulative counts in MESSAGING_HISTORY_SET event Track message, chat and contact counts across all history sync batches using instance-level counters, so the final event reports accurate totals instead of only the last batch counts. Addresses Sourcery review feedback on PR #2440. --- .../whatsapp/whatsapp.baileys.service.ts | 24 +++++++++++++++---- 1 file changed, 20 insertions(+), 4 deletions(-) diff --git a/src/api/integrations/channel/whatsapp/whatsapp.baileys.service.ts b/src/api/integrations/channel/whatsapp/whatsapp.baileys.service.ts index 9f4a900af..1c9bd2324 100644 --- a/src/api/integrations/channel/whatsapp/whatsapp.baileys.service.ts +++ b/src/api/integrations/channel/whatsapp/whatsapp.baileys.service.ts @@ -252,6 +252,11 @@ export class BaileysStartupService extends ChannelStartupService { private logBaileys = this.configService.get('LOG').BAILEYS; private eventProcessingQueue: Promise = Promise.resolve(); + // Cumulative history sync counters (reset on sync completion) + private historySyncMessageCount = 0; + private historySyncChatCount = 0; + private historySyncContactCount = 0; + // Cache TTL constants (in seconds) private readonly MESSAGE_CACHE_TTL_SECONDS = 5 * 60; // 5 minutes - avoid duplicate message processing private readonly UPDATE_CACHE_TTL_SECONDS = 30 * 60; // 30 minutes - avoid duplicate status updates @@ -993,6 +998,8 @@ export class BaileysStartupService extends ChannelStartupService { await this.prismaRepository.chat.createMany({ data: chatsRaw, skipDuplicates: true }); } + this.historySyncChatCount += chatsRaw.length; + this.sendDataWebhook(Events.CHATS_SET, chatsRaw); const messagesRaw: any[] = []; @@ -1046,6 +1053,8 @@ export class BaileysStartupService extends ChannelStartupService { messagesRaw.push(this.prepareMessage(m)); } + this.historySyncMessageCount += messagesRaw.length; + if (this.configService.get('DATABASE').SAVE_DATA.HISTORIC) { await this.prismaRepository.message.createMany({ data: messagesRaw, skipDuplicates: true }); } @@ -1067,16 +1076,23 @@ export class BaileysStartupService extends ChannelStartupService { ); } + const filteredContacts = contacts.filter((c) => !!c.notify || !!c.name); + this.historySyncContactCount += filteredContacts.length; + await this.contactHandle['contacts.upsert']( - contacts.filter((c) => !!c.notify || !!c.name).map((c) => ({ id: c.id, name: c.name ?? c.notify })), + filteredContacts.map((c) => ({ id: c.id, name: c.name ?? c.notify })), ); if (progress === 100) { this.sendDataWebhook(Events.MESSAGING_HISTORY_SET, { - messageCount: messagesRaw.length, - chatCount: chatsRaw.length, - contactCount: contacts?.length ?? 0, + messageCount: this.historySyncMessageCount, + chatCount: this.historySyncChatCount, + contactCount: this.historySyncContactCount, }); + + this.historySyncMessageCount = 0; + this.historySyncChatCount = 0; + this.historySyncContactCount = 0; } contacts = undefined; From 6f759443b0b6a658ee5144f02790d2ec3d473204 Mon Sep 17 00:00:00 2001 From: Alexandre Reyes Martins Date: Tue, 24 Feb 2026 14:06:14 +0000 Subject: [PATCH 3/3] fix(history-sync): reset cumulative counters on new sync start and abort MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Detect new sync runs by tracking lastProgress — when progress resets or decreases, counters are zeroed before accumulating. This prevents stale counts from aborted syncs leaking into subsequent runs. Addresses Sourcery review feedback on PR #2442. --- .../channel/whatsapp/whatsapp.baileys.service.ts | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/src/api/integrations/channel/whatsapp/whatsapp.baileys.service.ts b/src/api/integrations/channel/whatsapp/whatsapp.baileys.service.ts index 1c9bd2324..4b5a115ba 100644 --- a/src/api/integrations/channel/whatsapp/whatsapp.baileys.service.ts +++ b/src/api/integrations/channel/whatsapp/whatsapp.baileys.service.ts @@ -252,10 +252,11 @@ export class BaileysStartupService extends ChannelStartupService { private logBaileys = this.configService.get('LOG').BAILEYS; private eventProcessingQueue: Promise = Promise.resolve(); - // Cumulative history sync counters (reset on sync completion) + // Cumulative history sync counters (reset on new sync or completion) private historySyncMessageCount = 0; private historySyncChatCount = 0; private historySyncContactCount = 0; + private historySyncLastProgress = -1; // Cache TTL constants (in seconds) private readonly MESSAGE_CACHE_TTL_SECONDS = 5 * 60; // 5 minutes - avoid duplicate message processing @@ -945,6 +946,14 @@ export class BaileysStartupService extends ChannelStartupService { syncType?: proto.HistorySync.HistorySyncType; }) => { try { + // Reset counters when a new sync starts (progress resets or decreases) + if (progress <= this.historySyncLastProgress) { + this.historySyncMessageCount = 0; + this.historySyncChatCount = 0; + this.historySyncContactCount = 0; + } + this.historySyncLastProgress = progress ?? -1; + if (syncType === proto.HistorySync.HistorySyncType.ON_DEMAND) { console.log('received on-demand history sync, messages=', messages); } @@ -1093,6 +1102,7 @@ export class BaileysStartupService extends ChannelStartupService { this.historySyncMessageCount = 0; this.historySyncChatCount = 0; this.historySyncContactCount = 0; + this.historySyncLastProgress = -1; } contacts = undefined;