From fd967b2e7673fb11adc84f959bc7a0ea0b1387f8 Mon Sep 17 00:00:00 2001 From: waleed Date: Thu, 19 Feb 2026 13:06:07 -0800 Subject: [PATCH 1/2] fix(snapshot): changed insert to upsert when concurrent identical child workflows are running --- .../logs/execution/snapshot/service.test.ts | 204 +++++++++++++++++- .../lib/logs/execution/snapshot/service.ts | 53 ++--- 2 files changed, 218 insertions(+), 39 deletions(-) diff --git a/apps/sim/lib/logs/execution/snapshot/service.test.ts b/apps/sim/lib/logs/execution/snapshot/service.test.ts index 09353f7b21..2815872276 100644 --- a/apps/sim/lib/logs/execution/snapshot/service.test.ts +++ b/apps/sim/lib/logs/execution/snapshot/service.test.ts @@ -1,8 +1,43 @@ -import { describe, expect, it } from 'vitest' +/** + * @vitest-environment node + */ +import { databaseMock, drizzleOrmMock, loggerMock } from '@sim/testing' +import { beforeEach, describe, expect, it, vi } from 'vitest' + +vi.mock('@sim/db', () => databaseMock) +vi.mock('@sim/db/schema', () => ({})) +vi.mock('@sim/logger', () => loggerMock) +vi.mock('drizzle-orm', () => drizzleOrmMock) +vi.mock('uuid', () => ({ v4: vi.fn(() => 'generated-uuid-1') })) + import { SnapshotService } from '@/lib/logs/execution/snapshot/service' import type { WorkflowState } from '@/lib/logs/types' +const mockState: WorkflowState = { + blocks: { + block1: { + id: 'block1', + name: 'Test Agent', + type: 'agent', + position: { x: 100, y: 200 }, + subBlocks: {}, + outputs: {}, + enabled: true, + horizontalHandles: true, + advancedMode: false, + height: 0, + }, + }, + edges: [{ id: 'edge1', source: 'block1', target: 'block2' }], + loops: {}, + parallels: {}, +} + describe('SnapshotService', () => { + beforeEach(() => { + vi.clearAllMocks() + }) + describe('computeStateHash', () => { it.concurrent('should generate consistent hashes for identical states', () => { const service = new SnapshotService() @@ -62,7 +97,7 @@ describe('SnapshotService', () => { blocks: { block1: { ...baseState.blocks.block1, - position: { x: 500, y: 600 }, // Different position + position: { x: 500, y: 600 }, }, }, } @@ -140,7 +175,7 @@ describe('SnapshotService', () => { const state2: WorkflowState = { blocks: {}, edges: [ - { id: 'edge2', source: 'b', target: 'c' }, // Different order + { id: 'edge2', source: 'b', target: 'c' }, { id: 'edge1', source: 'a', target: 'b' }, ], loops: {}, @@ -219,7 +254,6 @@ describe('SnapshotService', () => { const hash = service.computeStateHash(complexState) expect(hash).toHaveLength(64) - // Should be consistent const hash2 = service.computeStateHash(complexState) expect(hash).toBe(hash2) }) @@ -335,4 +369,166 @@ describe('SnapshotService', () => { expect(hash1).toHaveLength(64) }) }) + + describe('createSnapshotWithDeduplication', () => { + it('should use upsert to insert a new snapshot', async () => { + const service = new SnapshotService() + const workflowId = 'wf-123' + + const mockReturning = vi.fn().mockResolvedValue([ + { + id: 'generated-uuid-1', + workflowId, + stateHash: 'abc123', + stateData: mockState, + createdAt: new Date('2026-02-19T00:00:00Z'), + }, + ]) + const mockOnConflictDoUpdate = vi.fn().mockReturnValue({ returning: mockReturning }) + const mockValues = vi.fn().mockReturnValue({ onConflictDoUpdate: mockOnConflictDoUpdate }) + const mockInsert = vi.fn().mockReturnValue({ values: mockValues }) + databaseMock.db.insert = mockInsert + + const result = await service.createSnapshotWithDeduplication(workflowId, mockState) + + expect(mockInsert).toHaveBeenCalled() + expect(mockValues).toHaveBeenCalledWith( + expect.objectContaining({ + id: 'generated-uuid-1', + workflowId, + stateData: mockState, + }) + ) + expect(mockOnConflictDoUpdate).toHaveBeenCalledWith( + expect.objectContaining({ + set: expect.any(Object), + }) + ) + expect(result.snapshot.id).toBe('generated-uuid-1') + expect(result.isNew).toBe(true) + }) + + it('should detect reused snapshot when returned id differs from generated id', async () => { + const service = new SnapshotService() + const workflowId = 'wf-123' + + const mockReturning = vi.fn().mockResolvedValue([ + { + id: 'existing-snapshot-id', + workflowId, + stateHash: 'abc123', + stateData: mockState, + createdAt: new Date('2026-02-19T00:00:00Z'), + }, + ]) + const mockOnConflictDoUpdate = vi.fn().mockReturnValue({ returning: mockReturning }) + const mockValues = vi.fn().mockReturnValue({ onConflictDoUpdate: mockOnConflictDoUpdate }) + const mockInsert = vi.fn().mockReturnValue({ values: mockValues }) + databaseMock.db.insert = mockInsert + + const result = await service.createSnapshotWithDeduplication(workflowId, mockState) + + expect(result.snapshot.id).toBe('existing-snapshot-id') + expect(result.isNew).toBe(false) + }) + + it('should not throw on concurrent inserts with the same hash', async () => { + const service = new SnapshotService() + const workflowId = 'wf-123' + + const mockReturningNew = vi.fn().mockResolvedValue([ + { + id: 'generated-uuid-1', + workflowId, + stateHash: 'abc123', + stateData: mockState, + createdAt: new Date('2026-02-19T00:00:00Z'), + }, + ]) + const mockReturningExisting = vi.fn().mockResolvedValue([ + { + id: 'existing-snapshot-id', + workflowId, + stateHash: 'abc123', + stateData: mockState, + createdAt: new Date('2026-02-19T00:00:00Z'), + }, + ]) + + let callCount = 0 + databaseMock.db.insert = vi.fn().mockImplementation(() => ({ + values: vi.fn().mockImplementation(() => ({ + onConflictDoUpdate: vi.fn().mockImplementation(() => ({ + returning: callCount++ === 0 ? mockReturningNew : mockReturningExisting, + })), + })), + })) + + const [result1, result2] = await Promise.all([ + service.createSnapshotWithDeduplication(workflowId, mockState), + service.createSnapshotWithDeduplication(workflowId, mockState), + ]) + + expect(result1.snapshot.id).toBe('generated-uuid-1') + expect(result1.isNew).toBe(true) + expect(result2.snapshot.id).toBe('existing-snapshot-id') + expect(result2.isNew).toBe(false) + }) + + it('should pass state_data in the ON CONFLICT SET clause', async () => { + const service = new SnapshotService() + const workflowId = 'wf-123' + + let capturedConflictConfig: Record | undefined + const mockReturning = vi.fn().mockResolvedValue([ + { + id: 'generated-uuid-1', + workflowId, + stateHash: 'abc123', + stateData: mockState, + createdAt: new Date('2026-02-19T00:00:00Z'), + }, + ]) + + databaseMock.db.insert = vi.fn().mockReturnValue({ + values: vi.fn().mockReturnValue({ + onConflictDoUpdate: vi.fn().mockImplementation((config: Record) => { + capturedConflictConfig = config + return { returning: mockReturning } + }), + }), + }) + + await service.createSnapshotWithDeduplication(workflowId, mockState) + + expect(capturedConflictConfig).toBeDefined() + expect(capturedConflictConfig!.target).toBeDefined() + expect(capturedConflictConfig!.set).toBeDefined() + expect(capturedConflictConfig!.set).toHaveProperty('stateData') + }) + + it('should always call insert, never a separate select for deduplication', async () => { + const service = new SnapshotService() + const workflowId = 'wf-123' + + const mockReturning = vi.fn().mockResolvedValue([ + { + id: 'generated-uuid-1', + workflowId, + stateHash: 'abc123', + stateData: mockState, + createdAt: new Date('2026-02-19T00:00:00Z'), + }, + ]) + const mockOnConflictDoUpdate = vi.fn().mockReturnValue({ returning: mockReturning }) + const mockValues = vi.fn().mockReturnValue({ onConflictDoUpdate: mockOnConflictDoUpdate }) + databaseMock.db.insert = vi.fn().mockReturnValue({ values: mockValues }) + databaseMock.db.select = vi.fn() + + await service.createSnapshotWithDeduplication(workflowId, mockState) + + expect(databaseMock.db.insert).toHaveBeenCalledTimes(1) + expect(databaseMock.db.select).not.toHaveBeenCalled() + }) + }) }) diff --git a/apps/sim/lib/logs/execution/snapshot/service.ts b/apps/sim/lib/logs/execution/snapshot/service.ts index 856c3a1851..a2f4d0fff1 100644 --- a/apps/sim/lib/logs/execution/snapshot/service.ts +++ b/apps/sim/lib/logs/execution/snapshot/service.ts @@ -2,7 +2,7 @@ import { createHash } from 'crypto' import { db } from '@sim/db' import { workflowExecutionLogs, workflowExecutionSnapshots } from '@sim/db/schema' import { createLogger } from '@sim/logger' -import { and, eq, lt, notExists } from 'drizzle-orm' +import { and, eq, lt, notExists, sql } from 'drizzle-orm' import { v4 as uuidv4 } from 'uuid' import type { SnapshotService as ISnapshotService, @@ -28,36 +28,8 @@ export class SnapshotService implements ISnapshotService { workflowId: string, state: WorkflowState ): Promise { - // Hash the position-less state for deduplication (functional equivalence) const stateHash = this.computeStateHash(state) - const existingSnapshot = await this.getSnapshotByHash(workflowId, stateHash) - if (existingSnapshot) { - let refreshedState: WorkflowState = existingSnapshot.stateData - try { - await db - .update(workflowExecutionSnapshots) - .set({ stateData: state }) - .where(eq(workflowExecutionSnapshots.id, existingSnapshot.id)) - refreshedState = state - } catch (error) { - logger.warn( - `Failed to refresh snapshot stateData for ${existingSnapshot.id}, continuing with existing data`, - error - ) - } - - logger.info( - `Reusing existing snapshot for workflow ${workflowId} (hash: ${stateHash.slice(0, 12)}...)` - ) - return { - snapshot: { ...existingSnapshot, stateData: refreshedState }, - isNew: false, - } - } - - // Store the FULL state (including positions) so we can recreate the exact workflow - // Even though we hash without positions, we want to preserve the complete state const snapshotData: WorkflowExecutionSnapshotInsert = { id: uuidv4(), workflowId, @@ -65,21 +37,32 @@ export class SnapshotService implements ISnapshotService { stateData: state, } - const [newSnapshot] = await db + const [upsertedSnapshot] = await db .insert(workflowExecutionSnapshots) .values(snapshotData) + .onConflictDoUpdate({ + target: [workflowExecutionSnapshots.workflowId, workflowExecutionSnapshots.stateHash], + set: { + stateData: sql`excluded.state_data`, + }, + }) .returning() + const isNew = upsertedSnapshot.id === snapshotData.id + logger.info( - `Created new snapshot for workflow ${workflowId} (hash: ${stateHash.slice(0, 12)}..., blocks: ${Object.keys(state.blocks || {}).length})` + isNew + ? `Created new snapshot for workflow ${workflowId} (hash: ${stateHash.slice(0, 12)}..., blocks: ${Object.keys(state.blocks || {}).length})` + : `Reusing existing snapshot for workflow ${workflowId} (hash: ${stateHash.slice(0, 12)}...)` ) + return { snapshot: { - ...newSnapshot, - stateData: newSnapshot.stateData as WorkflowState, - createdAt: newSnapshot.createdAt.toISOString(), + ...upsertedSnapshot, + stateData: upsertedSnapshot.stateData as WorkflowState, + createdAt: upsertedSnapshot.createdAt.toISOString(), }, - isNew: true, + isNew, } } From 43a47e8daed3a1afabb1b8c16ebba669a45af15f Mon Sep 17 00:00:00 2001 From: waleed Date: Thu, 19 Feb 2026 13:13:51 -0800 Subject: [PATCH 2/2] fixed ci tests failing --- .../logs/execution/snapshot/service.test.ts | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/apps/sim/lib/logs/execution/snapshot/service.test.ts b/apps/sim/lib/logs/execution/snapshot/service.test.ts index 2815872276..64a0f4284e 100644 --- a/apps/sim/lib/logs/execution/snapshot/service.test.ts +++ b/apps/sim/lib/logs/execution/snapshot/service.test.ts @@ -4,8 +4,24 @@ import { databaseMock, drizzleOrmMock, loggerMock } from '@sim/testing' import { beforeEach, describe, expect, it, vi } from 'vitest' +const { mockSchemaExports } = vi.hoisted(() => ({ + mockSchemaExports: { + workflowExecutionSnapshots: { + id: 'id', + workflowId: 'workflow_id', + stateHash: 'state_hash', + stateData: 'state_data', + createdAt: 'created_at', + }, + workflowExecutionLogs: { + id: 'id', + stateSnapshotId: 'state_snapshot_id', + }, + }, +})) + vi.mock('@sim/db', () => databaseMock) -vi.mock('@sim/db/schema', () => ({})) +vi.mock('@sim/db/schema', () => mockSchemaExports) vi.mock('@sim/logger', () => loggerMock) vi.mock('drizzle-orm', () => drizzleOrmMock) vi.mock('uuid', () => ({ v4: vi.fn(() => 'generated-uuid-1') }))