Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
220 changes: 216 additions & 4 deletions apps/sim/lib/logs/execution/snapshot/service.test.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,59 @@
import { describe, expect, it } from 'vitest'
/**
* @vitest-environment node
*/
import { databaseMock, drizzleOrmMock, loggerMock } from '@sim/testing'
import { beforeEach, describe, expect, it, vi } from 'vitest'

const { mockSchemaExports } = vi.hoisted(() => ({
mockSchemaExports: {
workflowExecutionSnapshots: {
id: 'id',
workflowId: 'workflow_id',
stateHash: 'state_hash',
stateData: 'state_data',
createdAt: 'created_at',
},
workflowExecutionLogs: {
id: 'id',
stateSnapshotId: 'state_snapshot_id',
},
},
}))

vi.mock('@sim/db', () => databaseMock)
vi.mock('@sim/db/schema', () => mockSchemaExports)
vi.mock('@sim/logger', () => loggerMock)
vi.mock('drizzle-orm', () => drizzleOrmMock)
vi.mock('uuid', () => ({ v4: vi.fn(() => 'generated-uuid-1') }))

import { SnapshotService } from '@/lib/logs/execution/snapshot/service'
import type { WorkflowState } from '@/lib/logs/types'

const mockState: WorkflowState = {
blocks: {
block1: {
id: 'block1',
name: 'Test Agent',
type: 'agent',
position: { x: 100, y: 200 },
subBlocks: {},
outputs: {},
enabled: true,
horizontalHandles: true,
advancedMode: false,
height: 0,
},
},
edges: [{ id: 'edge1', source: 'block1', target: 'block2' }],
loops: {},
parallels: {},
}

describe('SnapshotService', () => {
beforeEach(() => {
vi.clearAllMocks()
})

describe('computeStateHash', () => {
it.concurrent('should generate consistent hashes for identical states', () => {
const service = new SnapshotService()
Expand Down Expand Up @@ -62,7 +113,7 @@ describe('SnapshotService', () => {
blocks: {
block1: {
...baseState.blocks.block1,
position: { x: 500, y: 600 }, // Different position
position: { x: 500, y: 600 },
},
},
}
Expand Down Expand Up @@ -140,7 +191,7 @@ describe('SnapshotService', () => {
const state2: WorkflowState = {
blocks: {},
edges: [
{ id: 'edge2', source: 'b', target: 'c' }, // Different order
{ id: 'edge2', source: 'b', target: 'c' },
{ id: 'edge1', source: 'a', target: 'b' },
],
loops: {},
Expand Down Expand Up @@ -219,7 +270,6 @@ describe('SnapshotService', () => {
const hash = service.computeStateHash(complexState)
expect(hash).toHaveLength(64)

// Should be consistent
const hash2 = service.computeStateHash(complexState)
expect(hash).toBe(hash2)
})
Expand Down Expand Up @@ -335,4 +385,166 @@ describe('SnapshotService', () => {
expect(hash1).toHaveLength(64)
})
})

describe('createSnapshotWithDeduplication', () => {
it('should use upsert to insert a new snapshot', async () => {
const service = new SnapshotService()
const workflowId = 'wf-123'

const mockReturning = vi.fn().mockResolvedValue([
{
id: 'generated-uuid-1',
workflowId,
stateHash: 'abc123',
stateData: mockState,
createdAt: new Date('2026-02-19T00:00:00Z'),
},
])
const mockOnConflictDoUpdate = vi.fn().mockReturnValue({ returning: mockReturning })
const mockValues = vi.fn().mockReturnValue({ onConflictDoUpdate: mockOnConflictDoUpdate })
const mockInsert = vi.fn().mockReturnValue({ values: mockValues })
databaseMock.db.insert = mockInsert

const result = await service.createSnapshotWithDeduplication(workflowId, mockState)

expect(mockInsert).toHaveBeenCalled()
expect(mockValues).toHaveBeenCalledWith(
expect.objectContaining({
id: 'generated-uuid-1',
workflowId,
stateData: mockState,
})
)
expect(mockOnConflictDoUpdate).toHaveBeenCalledWith(
expect.objectContaining({
set: expect.any(Object),
})
)
expect(result.snapshot.id).toBe('generated-uuid-1')
expect(result.isNew).toBe(true)
})

it('should detect reused snapshot when returned id differs from generated id', async () => {
const service = new SnapshotService()
const workflowId = 'wf-123'

const mockReturning = vi.fn().mockResolvedValue([
{
id: 'existing-snapshot-id',
workflowId,
stateHash: 'abc123',
stateData: mockState,
createdAt: new Date('2026-02-19T00:00:00Z'),
},
])
const mockOnConflictDoUpdate = vi.fn().mockReturnValue({ returning: mockReturning })
const mockValues = vi.fn().mockReturnValue({ onConflictDoUpdate: mockOnConflictDoUpdate })
const mockInsert = vi.fn().mockReturnValue({ values: mockValues })
databaseMock.db.insert = mockInsert

const result = await service.createSnapshotWithDeduplication(workflowId, mockState)

expect(result.snapshot.id).toBe('existing-snapshot-id')
expect(result.isNew).toBe(false)
})

it('should not throw on concurrent inserts with the same hash', async () => {
const service = new SnapshotService()
const workflowId = 'wf-123'

const mockReturningNew = vi.fn().mockResolvedValue([
{
id: 'generated-uuid-1',
workflowId,
stateHash: 'abc123',
stateData: mockState,
createdAt: new Date('2026-02-19T00:00:00Z'),
},
])
const mockReturningExisting = vi.fn().mockResolvedValue([
{
id: 'existing-snapshot-id',
workflowId,
stateHash: 'abc123',
stateData: mockState,
createdAt: new Date('2026-02-19T00:00:00Z'),
},
])

let callCount = 0
databaseMock.db.insert = vi.fn().mockImplementation(() => ({
values: vi.fn().mockImplementation(() => ({
onConflictDoUpdate: vi.fn().mockImplementation(() => ({
returning: callCount++ === 0 ? mockReturningNew : mockReturningExisting,
})),
})),
}))

const [result1, result2] = await Promise.all([
service.createSnapshotWithDeduplication(workflowId, mockState),
service.createSnapshotWithDeduplication(workflowId, mockState),
])

expect(result1.snapshot.id).toBe('generated-uuid-1')
expect(result1.isNew).toBe(true)
expect(result2.snapshot.id).toBe('existing-snapshot-id')
expect(result2.isNew).toBe(false)
})

it('should pass state_data in the ON CONFLICT SET clause', async () => {
const service = new SnapshotService()
const workflowId = 'wf-123'

let capturedConflictConfig: Record<string, unknown> | undefined
const mockReturning = vi.fn().mockResolvedValue([
{
id: 'generated-uuid-1',
workflowId,
stateHash: 'abc123',
stateData: mockState,
createdAt: new Date('2026-02-19T00:00:00Z'),
},
])

databaseMock.db.insert = vi.fn().mockReturnValue({
values: vi.fn().mockReturnValue({
onConflictDoUpdate: vi.fn().mockImplementation((config: Record<string, unknown>) => {
capturedConflictConfig = config
return { returning: mockReturning }
}),
}),
})

await service.createSnapshotWithDeduplication(workflowId, mockState)

expect(capturedConflictConfig).toBeDefined()
expect(capturedConflictConfig!.target).toBeDefined()
expect(capturedConflictConfig!.set).toBeDefined()
expect(capturedConflictConfig!.set).toHaveProperty('stateData')
})

it('should always call insert, never a separate select for deduplication', async () => {
const service = new SnapshotService()
const workflowId = 'wf-123'

const mockReturning = vi.fn().mockResolvedValue([
{
id: 'generated-uuid-1',
workflowId,
stateHash: 'abc123',
stateData: mockState,
createdAt: new Date('2026-02-19T00:00:00Z'),
},
])
const mockOnConflictDoUpdate = vi.fn().mockReturnValue({ returning: mockReturning })
const mockValues = vi.fn().mockReturnValue({ onConflictDoUpdate: mockOnConflictDoUpdate })
databaseMock.db.insert = vi.fn().mockReturnValue({ values: mockValues })
databaseMock.db.select = vi.fn()

await service.createSnapshotWithDeduplication(workflowId, mockState)

expect(databaseMock.db.insert).toHaveBeenCalledTimes(1)
expect(databaseMock.db.select).not.toHaveBeenCalled()
})
})
})
53 changes: 18 additions & 35 deletions apps/sim/lib/logs/execution/snapshot/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { createHash } from 'crypto'
import { db } from '@sim/db'
import { workflowExecutionLogs, workflowExecutionSnapshots } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { and, eq, lt, notExists } from 'drizzle-orm'
import { and, eq, lt, notExists, sql } from 'drizzle-orm'
import { v4 as uuidv4 } from 'uuid'
import type {
SnapshotService as ISnapshotService,
Expand All @@ -28,58 +28,41 @@ export class SnapshotService implements ISnapshotService {
workflowId: string,
state: WorkflowState
): Promise<SnapshotCreationResult> {
// Hash the position-less state for deduplication (functional equivalence)
const stateHash = this.computeStateHash(state)

const existingSnapshot = await this.getSnapshotByHash(workflowId, stateHash)
if (existingSnapshot) {
let refreshedState: WorkflowState = existingSnapshot.stateData
try {
await db
.update(workflowExecutionSnapshots)
.set({ stateData: state })
.where(eq(workflowExecutionSnapshots.id, existingSnapshot.id))
refreshedState = state
} catch (error) {
logger.warn(
`Failed to refresh snapshot stateData for ${existingSnapshot.id}, continuing with existing data`,
error
)
}

logger.info(
`Reusing existing snapshot for workflow ${workflowId} (hash: ${stateHash.slice(0, 12)}...)`
)
return {
snapshot: { ...existingSnapshot, stateData: refreshedState },
isNew: false,
}
}

// Store the FULL state (including positions) so we can recreate the exact workflow
// Even though we hash without positions, we want to preserve the complete state
const snapshotData: WorkflowExecutionSnapshotInsert = {
id: uuidv4(),
workflowId,
stateHash,
stateData: state,
}

const [newSnapshot] = await db
const [upsertedSnapshot] = await db
.insert(workflowExecutionSnapshots)
.values(snapshotData)
.onConflictDoUpdate({
target: [workflowExecutionSnapshots.workflowId, workflowExecutionSnapshots.stateHash],
set: {
stateData: sql`excluded.state_data`,
},
})
.returning()

const isNew = upsertedSnapshot.id === snapshotData.id

logger.info(
`Created new snapshot for workflow ${workflowId} (hash: ${stateHash.slice(0, 12)}..., blocks: ${Object.keys(state.blocks || {}).length})`
isNew
? `Created new snapshot for workflow ${workflowId} (hash: ${stateHash.slice(0, 12)}..., blocks: ${Object.keys(state.blocks || {}).length})`
: `Reusing existing snapshot for workflow ${workflowId} (hash: ${stateHash.slice(0, 12)}...)`
)

return {
snapshot: {
...newSnapshot,
stateData: newSnapshot.stateData as WorkflowState,
createdAt: newSnapshot.createdAt.toISOString(),
...upsertedSnapshot,
stateData: upsertedSnapshot.stateData as WorkflowState,
createdAt: upsertedSnapshot.createdAt.toISOString(),
},
isNew: true,
isNew,
}
}

Expand Down