diff --git a/.changeset/fix-batch-queue-processing.md b/.changeset/fix-batch-queue-processing.md
new file mode 100644
index 00000000000..39e63581458
--- /dev/null
+++ b/.changeset/fix-batch-queue-processing.md
@@ -0,0 +1,5 @@
+---
+"@trigger.dev/redis-worker": patch
+---
+
+Fix slow batch queue processing by removing spurious cooloff on concurrency blocks and fixing a race condition where retry attempt counts were not atomically updated during message re-queue.
diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts
index 6d3e6fbe3d1..c9eaca627c3 100644
--- a/apps/webapp/app/env.server.ts
+++ b/apps/webapp/app/env.server.ts
@@ -549,7 +549,7 @@ const EnvironmentSchema = z
   BATCH_RATE_LIMIT_REFILL_RATE: z.coerce.number().int().default(100),
   BATCH_RATE_LIMIT_MAX: z.coerce.number().int().default(1200),
   BATCH_RATE_LIMIT_REFILL_INTERVAL: z.string().default("10s"),
-  BATCH_CONCURRENCY_LIMIT_DEFAULT: z.coerce.number().int().default(1),
+  BATCH_CONCURRENCY_LIMIT_DEFAULT: z.coerce.number().int().default(5),
 
   REALTIME_STREAM_VERSION: z.enum(["v1", "v2"]).default("v1"),
   REALTIME_STREAM_MAX_LENGTH: z.coerce.number().int().default(1000),
diff --git a/internal-packages/run-engine/src/batch-queue/index.ts b/internal-packages/run-engine/src/batch-queue/index.ts
index 6ceac2ac6b1..deadf6ecca7 100644
--- a/internal-packages/run-engine/src/batch-queue/index.ts
+++ b/internal-packages/run-engine/src/batch-queue/index.ts
@@ -150,9 +150,7 @@ export class BatchQueue {
       visibilityTimeoutMs: 60_000, // 1 minute for batch item processing
       startConsumers: false, // We control when to start
       cooloff: {
-        enabled: true,
-        threshold: 5,
-        periodMs: 5_000,
+        enabled: false,
       },
       // Worker queue configuration - FairQueue routes all messages to our single worker queue
       workerQueue: {
diff --git a/packages/redis-worker/src/fair-queue/index.ts b/packages/redis-worker/src/fair-queue/index.ts
index 541ccf55137..59177d11673 100644
--- a/packages/redis-worker/src/fair-queue/index.ts
+++ b/packages/redis-worker/src/fair-queue/index.ts
@@ -925,8 +925,11 @@ export class FairQueue {
     if (this.concurrencyManager) {
       const availableCapacity = await this.concurrencyManager.getAvailableCapacity(descriptor);
       if (availableCapacity === 0) {
-        // Queue at max concurrency, back off to avoid repeated attempts
-        this.#incrementCooloff(queueId);
+        // Queue at max concurrency - don't increment cooloff here.
+        // The outer loop already handles this case (concurrency blocked)
+        // and explicitly avoids cooloff for it. Cooloff here causes
+        // spurious 5s stalls when capacity races between the tenant
+        // pre-check and this per-queue check.
         return 0;
       }
       maxClaimCount = Math.min(maxClaimCount, availableCapacity);
@@ -1228,19 +1231,18 @@ export class FairQueue {
         attempt: storedMessage.attempt + 1,
       };
 
-      // Release with delay (and ensure queue is in master queue)
+      // Release with delay, passing the updated message data so the Lua script
+      // atomically writes the incremented attempt count when re-queuing.
       await this.visibilityManager.release(
         storedMessage.id,
         queueId,
         queueKey,
         queueItemsKey,
         masterQueueKey,
-        Date.now() + nextDelay
+        Date.now() + nextDelay,
+        JSON.stringify(updatedMessage)
       );
 
-      // Update message in items hash with new attempt count
-      await this.redis.hset(queueItemsKey, storedMessage.id, JSON.stringify(updatedMessage));
-
       // Release concurrency
       if (this.concurrencyManager) {
         await this.concurrencyManager.release(descriptor, storedMessage.id);
diff --git a/packages/redis-worker/src/fair-queue/tests/fairQueue.test.ts b/packages/redis-worker/src/fair-queue/tests/fairQueue.test.ts
index c7357f2fc39..345d97c04d4 100644
--- a/packages/redis-worker/src/fair-queue/tests/fairQueue.test.ts
+++ b/packages/redis-worker/src/fair-queue/tests/fairQueue.test.ts
@@ -1182,4 +1182,104 @@ describe("FairQueue", () => {
       }
     );
   });
+
+  describe("concurrency block should not trigger cooloff", () => {
+    redisTest(
+      "should not enter cooloff when queue hits concurrency limit",
+      { timeout: 15000 },
+      async ({ redisOptions }) => {
+        const processed: string[] = [];
+        keys = new DefaultFairQueueKeyProducer({ prefix: "test" });
+
+        const scheduler = new DRRScheduler({
+          redis: redisOptions,
+          keys,
+          quantum: 10,
+          maxDeficit: 100,
+        });
+
+        const queue = new TestFairQueueHelper(redisOptions, keys, {
+          scheduler,
+          payloadSchema: TestPayloadSchema,
+          shardCount: 1,
+          consumerCount: 1,
+          consumerIntervalMs: 20,
+          visibilityTimeoutMs: 5000,
+          cooloff: {
+            periodMs: 5000, // Long cooloff - if triggered, messages would stall
+            threshold: 1, // Enter cooloff after just 1 increment
+          },
+          concurrencyGroups: [
+            {
+              name: "tenant",
+              extractGroupId: (q) => q.tenantId,
+              getLimit: async () => 1, // Only 1 concurrent per tenant
+              defaultLimit: 1,
+            },
+          ],
+          startConsumers: false,
+        });
+
+        // Hold first message to keep concurrency slot occupied
+        let releaseFirst: (() => void) | undefined;
+        const firstBlocking = new Promise((resolve) => {
+          releaseFirst = resolve;
+        });
+        let firstStarted = false;
+
+        queue.onMessage(async (ctx) => {
+          if (ctx.message.payload.value === "msg-0") {
+            firstStarted = true;
+            // Block this message to saturate concurrency
+            await firstBlocking;
+          }
+          processed.push(ctx.message.payload.value);
+          await ctx.complete();
+        });
+
+        // Enqueue 3 messages to same tenant
+        for (let i = 0; i < 3; i++) {
+          await queue.enqueue({
+            queueId: "tenant:t1:queue:q1",
+            tenantId: "t1",
+            payload: { value: `msg-${i}` },
+          });
+        }
+
+        queue.start();
+
+        // Wait for first message to start processing (blocking the concurrency slot)
+        await vi.waitFor(
+          () => {
+            expect(firstStarted).toBe(true);
+          },
+          { timeout: 5000 }
+        );
+
+        // Release the first message so others can proceed
+        releaseFirst!();
+
+        // All 3 messages should process within a reasonable time.
+        // If cooloff was incorrectly triggered, this would take 5+ seconds.
+        const startTime = Date.now();
+        await vi.waitFor(
+          () => {
+            expect(processed).toHaveLength(3);
+          },
+          { timeout: 5000 }
+        );
+        const elapsed = Date.now() - startTime;
+
+        // Should complete well under the 5s cooloff period
+        expect(elapsed).toBeLessThan(3000);
+
+        // Cooloff states should be empty (no spurious cooloffs)
+        const cacheSizes = queue.fairQueue.getCacheSizes();
+        expect(cacheSizes.cooloffStatesSize).toBe(0);
+
+        await queue.close();
+      }
+    );
+  });
+
 });
diff --git a/packages/redis-worker/src/fair-queue/visibility.ts b/packages/redis-worker/src/fair-queue/visibility.ts
index 849c4cffb20..80fbf2ef004 100644
--- a/packages/redis-worker/src/fair-queue/visibility.ts
+++ b/packages/redis-worker/src/fair-queue/visibility.ts
@@ -284,7 +284,8 @@ export class VisibilityManager {
     queueKey: string,
     queueItemsKey: string,
     masterQueueKey: string,
-    score?: number
+    score?: number,
+    updatedData?: string
   ): Promise {
     const shardId = this.#getShardForQueue(queueId);
     const inflightKey = this.keys.inflightKey(shardId);
@@ -293,7 +294,7 @@
     const messageScore = score ?? Date.now();
 
     // Use Lua script to atomically:
-    // 1. Get message data from in-flight
+    // 1. Get message data from in-flight (or use updatedData if provided)
     // 2. Remove from in-flight
     // 3. Add back to queue
     // 4. Update master queue to ensure queue is picked up
@@ -306,7 +307,8 @@
       member,
       messageId,
       messageScore.toString(),
-      queueId
+      queueId,
+      updatedData ?? ""
     );
 
     this.logger.debug("Message released", {
@@ -434,7 +436,8 @@
       member,
       messageId,
       score.toString(),
-      queueId
+      queueId,
+      ""
    );
 
     // Track reclaimed message for concurrency release
@@ -680,6 +683,7 @@
 local member = ARGV[1]
 local messageId = ARGV[2]
 local score = tonumber(ARGV[3])
 local queueId = ARGV[4]
+local updatedData = ARGV[5]
 -- Get message data from in-flight
 local payload = redis.call('HGET', inflightDataKey, messageId)
@@ -688,6 +692,12 @@
 if not payload then
   return 0
 end
+-- Use updatedData if provided (e.g. incremented attempt count for retries),
+-- otherwise use the original in-flight data
+if updatedData and updatedData ~= "" then
+  payload = updatedData
+end
+
 -- Remove from in-flight
 redis.call('ZREM', inflightKey, member)
 redis.call('HDEL', inflightDataKey, messageId)
@@ -816,7 +826,8 @@ declare module "@internal/redis" {
       member: string,
      messageId: string,
       score: string,
-      queueId: string
+      queueId: string,
+      updatedData: string
     ): Promise;
 
     releaseMessageBatch(
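For the first fix, the idea is that hitting a tenant concurrency limit is a temporary, external condition rather than a sign the queue is empty or unhealthy, so it should not feed the backoff counter. The sketch below illustrates that distinction with hypothetical helper names (getAvailableCapacity, claimMessages, incrementCooloff); it is a simplified model of the claim decision, not the actual FairQueue internals.

// Simplified model of the claim decision, assuming hypothetical helpers
// (getAvailableCapacity, claimMessages, incrementCooloff) rather than the
// real FairQueue claim path. A concurrency block means "this tenant already
// has work in flight", not "this queue is empty", so it should not count
// toward the cooloff that backs a queue off for several seconds.
type ClaimDeps = {
  getAvailableCapacity: (queueId: string) => Promise<number>;
  claimMessages: (queueId: string, count: number) => Promise<number>;
  incrementCooloff: (queueId: string) => void;
};

async function claimForQueue(queueId: string, requested: number, deps: ClaimDeps) {
  const capacity = await deps.getAvailableCapacity(queueId);
  if (capacity === 0) {
    // Blocked by concurrency: skip this tick and retry soon, without penalty.
    return { claimed: 0, reason: "concurrency-blocked" as const };
  }

  const claimed = await deps.claimMessages(queueId, Math.min(requested, capacity));
  if (claimed === 0) {
    // Genuinely nothing to claim: this is the case cooloff is meant to dampen.
    deps.incrementCooloff(queueId);
    return { claimed: 0, reason: "empty" as const };
  }

  return { claimed, reason: undefined };
}

With that split, only a genuinely empty claim feeds the backoff, and a capacity race between the tenant-level pre-check and the per-queue check costs at most one scheduler tick instead of a multi-second stall.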
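For the second fix, the retry path previously released the message back to the queue and then wrote the incremented attempt count with a separate HSET, leaving a window in which another consumer could claim the message and read the stale attempt. The sketch below shows the single-script pattern the patch moves to, using a plain ioredis client and made-up key names (inflight, inflight:data, queue, queue:data); it is a minimal illustration, not the library's actual keys or script.

// Minimal sketch of an atomic re-queue with an updated payload, assuming a
// plain ioredis client and simplified key names (not the real FairQueue keys).
import Redis from "ioredis";

const redis = new Redis();

// KEYS[1] = in-flight zset, KEYS[2] = in-flight payload hash,
// KEYS[3] = queue zset, KEYS[4] = queue payload hash
// ARGV[1] = message id, ARGV[2] = visible-at score, ARGV[3] = updated payload JSON
const RELEASE_WITH_UPDATE = `
local payload = redis.call('HGET', KEYS[2], ARGV[1])
if not payload then
  return 0
end
if ARGV[3] ~= '' then
  payload = ARGV[3]
end
redis.call('ZREM', KEYS[1], ARGV[1])
redis.call('HDEL', KEYS[2], ARGV[1])
redis.call('ZADD', KEYS[3], ARGV[2], ARGV[1])
redis.call('HSET', KEYS[4], ARGV[1], payload)
return 1
`;

async function requeueWithAttempt(messageId: string, attempt: number, delayMs: number) {
  const updated = JSON.stringify({ id: messageId, attempt });
  // One atomic round trip: remove from in-flight, re-queue with delay, persist new attempt.
  return redis.eval(
    RELEASE_WITH_UPDATE,
    4,
    "inflight",
    "inflight:data",
    "queue",
    "queue:data",
    messageId,
    (Date.now() + delayMs).toString(),
    updated
  );
}

Because the read-modify-write happens inside one EVAL, a competing consumer either sees the message before the release (still in flight) or after it with the new attempt count, never in between.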