Skip to content

feat(history-sync): emit messaging-history.set event on sync completion and fix race condition#2442

Merged
DavidsonGomes merged 3 commits intoEvolutionAPI:developfrom
alexandrereyes:feat/history-sync-event
Feb 24, 2026
Merged

feat(history-sync): emit messaging-history.set event on sync completion and fix race condition#2442
DavidsonGomes merged 3 commits intoEvolutionAPI:developfrom
alexandrereyes:feat/history-sync-event

Conversation

@alexandrereyes
Copy link

@alexandrereyes alexandrereyes commented Feb 24, 2026

Problem

When an external application subscribes to messages.set events and tries to query messages from the API upon receiving the event, it hits a race condition: the webhook is emitted before the data is persisted to the database via createMany. This means consumers receive the notification but can't find the messages yet.

Additionally, there is no dedicated event signaling that history sync is complete. Consumers receive N partial messages.set events with incremental progress and must guess when it's safe to start querying.

Solution

1. Fix race condition: persist before emitting

Reorder CHATS_SET and MESSAGES_SET webhook emissions to fire after prismaRepository.*.createMany(), ensuring data is queryable when the event reaches consumers.

2. Emit messaging-history.set when sync completes

When progress === 100, emit a new MESSAGING_HISTORY_SET event with cumulative counts across the entire sync:

{
  "event": "messaging-history.set",
  "instance": "my-instance",
  "data": {
    "messageCount": 1420,
    "chatCount": 350,
    "contactCount": 500
  }
}

Counts are accumulated via instance-level counters (historySyncMessageCount, historySyncChatCount, historySyncContactCount) that increment on each batch and reset after the completion event is emitted, so they represent the full sync totals — not just the last batch.

3. Register event across all transports

Added MESSAGING_HISTORY_SET to:

  • EventController.events (subscribable event list)
  • Type definitions: EventsRabbitmq, EventsWebhook, EventsPusher, Sqs.EVENTS
  • Environment config: all 6 transports (RabbitMQ, NATS, SQS, Kafka, Pusher, Webhook)
  • Validation schemas: instance.schema.ts (webhook, rabbitmq, nats, sqs enums)

🔗 Related Issue

Builds on top of #2260 which added isLatest and progress to the messages.set payload.

🧪 Type of Change

  • 🐛 Bug fix (non-breaking change which fixes an issue)
  • ✨ New feature (non-breaking change which adds functionality)

🧪 Testing

  • Manual testing completed
  • Functionality verified in development environment
  • No breaking changes introduced
  • TypeScript build passes (tsc --noEmit && tsup)
  • Lint passes

✅ Checklist

  • My code follows the project's style guidelines
  • I have performed a self-review of my code
  • My changes generate no new warnings
  • Fully backward compatible — the new event is opt-in via event subscription

📝 Files Changed

File Change
whatsapp.baileys.service.ts Reorder webhooks after createMany; add cumulative sync counters; emit MESSAGING_HISTORY_SET on progress === 100
event.controller.ts Add MESSAGING_HISTORY_SET to subscribable events list
env.config.ts Add event to all transport type definitions and env config blocks
instance.schema.ts Add event to all 4 inline validation enums

Summary by Sourcery

Add a completion event for WhatsApp history sync and ensure data is persisted before emitting related events.

New Features:

  • Emit a new messaging-history.set event on WhatsApp history sync completion with cumulative message, chat, and contact counts.
  • Track cumulative history sync counts for messages, chats, and contacts across batches and reset them after sync completion.

Bug Fixes:

  • Ensure CHATS_SET and MESSAGES_SET events are emitted only after corresponding records are persisted to the database to avoid race conditions for consumers querying data.

Enhancements:

  • Reuse filtered contact lists when upserting contacts during history sync to avoid duplicate filtering.

Build:

  • Expose the MESSAGING_HISTORY_SET event flag across all configured transports and environment-driven event toggles.

Chores:

  • Allow configuration and validation of the new MESSAGING_HISTORY_SET event across webhook, RabbitMQ, NATS, SQS, Kafka, and Pusher in env config and instance schema.

…on and fix race condition

Reorder webhook emissions (CHATS_SET, MESSAGES_SET) to fire after database
persistence, fixing a race condition where consumers received the event
before data was queryable.

Emit a new MESSAGING_HISTORY_SET event when progress reaches 100%,
allowing consumers to know exactly when history sync is complete and
messages are available in the database.

Register the new event across all transport types (Webhook, WebSocket,
RabbitMQ, NATS, SQS, Kafka, Pusher) and validation schemas.
Track message, chat and contact counts across all history sync batches
using instance-level counters, so the final event reports accurate
totals instead of only the last batch counts.

Addresses Sourcery review feedback on PR EvolutionAPI#2440.
@sourcery-ai
Copy link
Contributor

sourcery-ai bot commented Feb 24, 2026

Reviewer's Guide

Implements a reliable history sync completion signal by reordering WhatsApp Baileys history sync webhooks to fire after database persistence and introducing a new messaging-history.set event with cumulative sync counters, wired through all event transports, config, and validation schemas.

Sequence diagram for WhatsApp history sync with messaging-history.set and race-condition fix

sequenceDiagram
  participant WhatsAppBaileys as WhatsApp_Baileys_SDK
  participant BaileysService as BaileysStartupService
  participant PrismaChat as PrismaRepositoryChat
  participant PrismaMessage as PrismaRepositoryMessage
  participant ContactHandler as ContactHandle
  participant EventController
  participant ExternalApp as External_Application

  WhatsAppBaileys->>BaileysService: historySyncBatch(progress, isLatest, contacts, chats, messages)
  BaileysService->>BaileysService: build chatsRaw from chats
  BaileysService->>BaileysService: historySyncChatCount += chatsRaw.length

  alt SAVE_DATA.HISTORIC enabled for chats
    BaileysService->>PrismaChat: createMany(chatsRaw, skipDuplicates)
    PrismaChat-->>BaileysService: chats persisted
  end

  BaileysService->>EventController: sendDataWebhook(CHATS_SET, chatsRaw)
  EventController-->>ExternalApp: CHATS_SET webhook

  BaileysService->>BaileysService: build messagesRaw from messages
  BaileysService->>BaileysService: historySyncMessageCount += messagesRaw.length

  alt SAVE_DATA.HISTORIC enabled for messages
    BaileysService->>PrismaMessage: createMany(messagesRaw, skipDuplicates)
    PrismaMessage-->>BaileysService: messages persisted
  end

  BaileysService->>EventController: sendDataWebhook(MESSAGES_SET, messagesRaw, isStatus, instanceId, metadata(progress, isLatest))
  EventController-->>ExternalApp: messages.set webhook

  BaileysService->>BaileysService: filteredContacts = contacts with name or notify
  BaileysService->>BaileysService: historySyncContactCount += filteredContacts.length
  BaileysService->>ContactHandler: contacts.upsert(filteredContacts)
  ContactHandler-->>BaileysService: contacts upserted

  alt progress == 100
    BaileysService->>EventController: sendDataWebhook(MESSAGING_HISTORY_SET, totals(messageCount, chatCount, contactCount))
    EventController-->>ExternalApp: messaging-history.set webhook
    BaileysService->>BaileysService: reset historySyncMessageCount, historySyncChatCount, historySyncContactCount to 0
  end

  ExternalApp->>ExternalApp: query API for chats, messages, contacts (data already persisted)
Loading

Class diagram for BaileysStartupService and event config types with MESSAGING_HISTORY_SET

classDiagram
  class BaileysStartupService {
    - historySyncMessageCount int
    - historySyncChatCount int
    - historySyncContactCount int
    + handleHistorySyncBatch(progress int, isLatest boolean, contacts any, chats any, messages any) void
    + sendDataWebhook(event Events, data any, isStatus boolean, instanceId string, metadata any) void
  }

  class EventsRabbitmq {
    + MESSAGE boolean
    + ACK boolean
    + CALL boolean
    + TYPEBOT_START boolean
    + TYPEBOT_CHANGE_STATUS boolean
    + MESSAGING_HISTORY_SET boolean
  }

  class EventsWebhook {
    + MESSAGE boolean
    + ACK boolean
    + CALL boolean
    + TYPEBOT_START boolean
    + TYPEBOT_CHANGE_STATUS boolean
    + MESSAGING_HISTORY_SET boolean
    + ERRORS boolean
    + ERRORS_WEBHOOK string
  }

  class EventsPusher {
    + MESSAGE boolean
    + ACK boolean
    + CALL boolean
    + TYPEBOT_START boolean
    + TYPEBOT_CHANGE_STATUS boolean
    + MESSAGING_HISTORY_SET boolean
  }

  class SqsEvents {
    + SEND_MESSAGE boolean
    + TYPEBOT_CHANGE_STATUS boolean
    + TYPEBOT_START boolean
    + MESSAGING_HISTORY_SET boolean
  }

  class Sqs {
    + EVENTS SqsEvents
  }

  class ConfigService {
    + getDatabaseConfig() Database
    + getEventsRabbitmq() EventsRabbitmq
    + getEventsWebhook() EventsWebhook
    + getEventsPusher() EventsPusher
    + getSqs() Sqs
  }

  class EventController {
    + events string[]
  }

  BaileysStartupService --> ConfigService : uses
  BaileysStartupService --> EventController : emits_events
  ConfigService --> EventsRabbitmq : provides
  ConfigService --> EventsWebhook : provides
  ConfigService --> EventsPusher : provides
  ConfigService --> Sqs : provides
  Sqs --> SqsEvents : aggregates
  EventController --> EventsRabbitmq : references
  EventController --> EventsWebhook : references
  EventController --> EventsPusher : references
  EventController --> SqsEvents : references
Loading

File-Level Changes

Change Details Files
Ensure CHATS_SET and MESSAGES_SET webhooks are emitted only after history data is persisted and track cumulative sync counts to emit a final MESSAGING_HISTORY_SET event.
  • Move CHATS_SET webhook emission to occur after chat createMany completes.
  • Move MESSAGES_SET webhook emission to occur after message createMany completes, preserving existing payload including isLatest and progress metadata.
  • Introduce instance-level counters for messages, chats, and contacts that accumulate counts per batch during history sync.
  • Filter contacts once into a reusable list, increment the contact counter based on this filtered list, and reuse it for contacts.upsert.
  • On progress === 100, emit a MESSAGING_HISTORY_SET webhook containing cumulative message, chat, and contact counts, then reset all counters to 0.
src/api/integrations/channel/whatsapp/whatsapp.baileys.service.ts
Expose the new MESSAGING_HISTORY_SET event as a subscribable event across all transports and configuration types.
  • Add MESSAGING_HISTORY_SET to the EventController.events list so clients can subscribe to the messaging-history.set event.
  • Extend EventsRabbitmq, EventsWebhook, EventsPusher, and Sqs.EVENTS type definitions with the MESSAGING_HISTORY_SET flag.
  • Wire MESSAGING_HISTORY_SET environment-driven flags for RabbitMQ, NATS, SQS, Kafka, Pusher, and Webhook in ConfigService, reading from corresponding process.env variables.
  • Update instance JSON schema enums for webhook, rabbitmq, nats, and sqs event configuration to include MESSAGING_HISTORY_SET so validation permits the new event.
src/api/integrations/event/event.controller.ts
src/config/env.config.ts
src/validate/instance.schema.ts

Tips and commands

Interacting with Sourcery

  • Trigger a new review: Comment @sourcery-ai review on the pull request.
  • Continue discussions: Reply directly to Sourcery's review comments.
  • Generate a GitHub issue from a review comment: Ask Sourcery to create an
    issue from a review comment by replying to it. You can also reply to a
    review comment with @sourcery-ai issue to create an issue from it.
  • Generate a pull request title: Write @sourcery-ai anywhere in the pull
    request title to generate a title at any time. You can also comment
    @sourcery-ai title on the pull request to (re-)generate the title at any time.
  • Generate a pull request summary: Write @sourcery-ai summary anywhere in
    the pull request body to generate a PR summary at any time exactly where you
    want it. You can also comment @sourcery-ai summary on the pull request to
    (re-)generate the summary at any time.
  • Generate reviewer's guide: Comment @sourcery-ai guide on the pull
    request to (re-)generate the reviewer's guide at any time.
  • Resolve all Sourcery comments: Comment @sourcery-ai resolve on the
    pull request to resolve all Sourcery comments. Useful if you've already
    addressed all the comments and don't want to see them anymore.
  • Dismiss all Sourcery reviews: Comment @sourcery-ai dismiss on the pull
    request to dismiss all existing Sourcery reviews. Especially useful if you
    want to start fresh with a new review - don't forget to comment
    @sourcery-ai review to trigger a new review!

Customizing Your Experience

Access your dashboard to:

  • Enable or disable review features such as the Sourcery-generated pull request
    summary, the reviewer's guide, and others.
  • Change the review language.
  • Add, remove or edit custom review instructions.
  • Adjust other review settings.

Getting Help

Copy link
Contributor

@sourcery-ai sourcery-ai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey - I've found 1 issue, and left some high level feedback:

  • The cumulative historySync*Count fields are stored as mutable state on the service instance; if multiple history syncs can run concurrently for the same service (or if batches interleave), these counters could mix data between syncs—consider scoping them to a specific sync session or deriving totals from per-sync context instead.
  • Counters are only reset when progress === 100; if a sync aborts, errors, or never reaches 100%, the next sync will emit incorrect cumulative totals—consider adding a timeout/error-path reset or tying reset logic to the lifecycle of a sync rather than just progress.
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- The cumulative `historySync*Count` fields are stored as mutable state on the service instance; if multiple history syncs can run concurrently for the same service (or if batches interleave), these counters could mix data between syncs—consider scoping them to a specific sync session or deriving totals from per-sync context instead.
- Counters are only reset when `progress === 100`; if a sync aborts, errors, or never reaches 100%, the next sync will emit incorrect cumulative totals—consider adding a timeout/error-path reset or tying reset logic to the lifecycle of a sync rather than just `progress`.

## Individual Comments

### Comment 1
<location path="src/api/integrations/channel/whatsapp/whatsapp.baileys.service.ts" line_range="255-258" />
<code_context>
   private logBaileys = this.configService.get<Log>('LOG').BAILEYS;
   private eventProcessingQueue: Promise<void> = Promise.resolve();

+  // Cumulative history sync counters (reset on sync completion)
+  private historySyncMessageCount = 0;
+  private historySyncChatCount = 0;
+  private historySyncContactCount = 0;
+
   // Cache TTL constants (in seconds)
</code_context>
<issue_to_address>
**issue (bug_risk):** History sync counters are global to the instance and may leak across multiple or aborted sync runs.

Because these counters live on the instance and are only reset when `progress === 100`, an aborted history sync (error, disconnect, restart, etc.) will leave them non-zero and the next run will report combined counts from multiple runs. Overlapping syncs on the same instance would also interleave their counts. Please scope these counters to a single sync run (e.g., locals or keyed by sync ID), or reset them at sync start as well as on completion.
</issue_to_address>

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

Detect new sync runs by tracking lastProgress — when progress resets or
decreases, counters are zeroed before accumulating. This prevents stale
counts from aborted syncs leaking into subsequent runs.

Addresses Sourcery review feedback on PR EvolutionAPI#2442.
alexandrereyes added a commit to alexandrereyes/evolution-api that referenced this pull request Feb 24, 2026
Detect new sync runs by tracking lastProgress — when progress resets or
decreases, counters are zeroed before accumulating. This prevents stale
counts from aborted syncs leaking into subsequent runs.

Addresses Sourcery review feedback on PR EvolutionAPI#2442.
@DavidsonGomes DavidsonGomes changed the base branch from main to develop February 24, 2026 15:16
@DavidsonGomes DavidsonGomes merged commit 6bb1637 into EvolutionAPI:develop Feb 24, 2026
5 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants