diff --git a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/terminal/utils.ts b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/terminal/utils.ts index 0c285a7b96..f1bb7b3dc8 100644 --- a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/terminal/utils.ts +++ b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/terminal/utils.ts @@ -261,6 +261,9 @@ function buildEntryTree(entries: ConsoleEntry[]): EntryNode[] { ...allBlocks.map((b) => new Date(b.endedAt || b.timestamp).getTime()) ) const totalDuration = allBlocks.reduce((sum, b) => sum + (b.durationMs || 0), 0) + // Parallel branches run concurrently — use wall-clock time. Loop iterations run serially — use sum. + const subflowDuration = + iterationType === 'parallel' ? subflowEndMs - subflowStartMs : totalDuration // Create synthetic subflow parent entry // Use the minimum executionOrder from all child blocks for proper ordering @@ -276,7 +279,7 @@ function buildEntryTree(entries: ConsoleEntry[]): EntryNode[] { startedAt: new Date(subflowStartMs).toISOString(), executionOrder: subflowExecutionOrder, endedAt: new Date(subflowEndMs).toISOString(), - durationMs: totalDuration, + durationMs: subflowDuration, success: !allBlocks.some((b) => b.error), } @@ -291,6 +294,9 @@ function buildEntryTree(entries: ConsoleEntry[]): EntryNode[] { ...iterBlocks.map((b) => new Date(b.endedAt || b.timestamp).getTime()) ) const iterDuration = iterBlocks.reduce((sum, b) => sum + (b.durationMs || 0), 0) + // Parallel branches run concurrently — use wall-clock time. Loop iterations run serially — use sum. + const iterDisplayDuration = + iterationType === 'parallel' ? iterEndMs - iterStartMs : iterDuration // Use the minimum executionOrder from blocks in this iteration const iterExecutionOrder = Math.min(...iterBlocks.map((b) => b.executionOrder)) @@ -305,7 +311,7 @@ function buildEntryTree(entries: ConsoleEntry[]): EntryNode[] { startedAt: new Date(iterStartMs).toISOString(), executionOrder: iterExecutionOrder, endedAt: new Date(iterEndMs).toISOString(), - durationMs: iterDuration, + durationMs: iterDisplayDuration, success: !iterBlocks.some((b) => b.error), iterationCurrent: iterGroup.iterationCurrent, iterationTotal: iterGroup.iterationTotal, diff --git a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-workflow-execution.ts b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-workflow-execution.ts index 1088f8c87f..10186d6876 100644 --- a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-workflow-execution.ts +++ b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-workflow-execution.ts @@ -20,6 +20,7 @@ import { TriggerUtils, } from '@/lib/workflows/triggers/triggers' import { useCurrentWorkflow } from '@/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-current-workflow' +import { updateActiveBlockRefCount } from '@/app/workspace/[workspaceId]/w/[workflowId]/utils/workflow-execution-utils' import { getBlock } from '@/blocks' import type { SerializableExecutionState } from '@/executor/execution/types' import type { @@ -63,6 +64,7 @@ interface BlockEventHandlerConfig { executionIdRef: { current: string } workflowEdges: Array<{ id: string; target: string; sourceHandle?: string | null }> activeBlocksSet: Set + activeBlockRefCounts: Map accumulatedBlockLogs: BlockLog[] accumulatedBlockStates: Map executedBlockIds: Set @@ -309,6 +311,7 @@ export function useWorkflowExecution() { executionIdRef, workflowEdges, activeBlocksSet, + activeBlockRefCounts, accumulatedBlockLogs, accumulatedBlockStates, executedBlockIds, @@ -327,11 +330,7 @@ export function useWorkflowExecution() { const updateActiveBlocks = (blockId: string, isActive: boolean) => { if (!workflowId) return - if (isActive) { - activeBlocksSet.add(blockId) - } else { - activeBlocksSet.delete(blockId) - } + updateActiveBlockRefCount(activeBlockRefCounts, activeBlocksSet, blockId, isActive) setActiveBlocks(workflowId, new Set(activeBlocksSet)) } @@ -1280,6 +1279,7 @@ export function useWorkflowExecution() { } const activeBlocksSet = new Set() + const activeBlockRefCounts = new Map() const streamedContent = new Map() const accumulatedBlockLogs: BlockLog[] = [] const accumulatedBlockStates = new Map() @@ -1292,6 +1292,7 @@ export function useWorkflowExecution() { executionIdRef, workflowEdges, activeBlocksSet, + activeBlockRefCounts, accumulatedBlockLogs, accumulatedBlockStates, executedBlockIds, @@ -1902,6 +1903,7 @@ export function useWorkflowExecution() { const accumulatedBlockStates = new Map() const executedBlockIds = new Set() const activeBlocksSet = new Set() + const activeBlockRefCounts = new Map() try { const blockHandlers = buildBlockEventHandlers({ @@ -1909,6 +1911,7 @@ export function useWorkflowExecution() { executionIdRef, workflowEdges, activeBlocksSet, + activeBlockRefCounts, accumulatedBlockLogs, accumulatedBlockStates, executedBlockIds, @@ -2104,6 +2107,7 @@ export function useWorkflowExecution() { const workflowEdges = useWorkflowStore.getState().edges const activeBlocksSet = new Set() + const activeBlockRefCounts = new Map() const accumulatedBlockLogs: BlockLog[] = [] const accumulatedBlockStates = new Map() const executedBlockIds = new Set() @@ -2115,6 +2119,7 @@ export function useWorkflowExecution() { executionIdRef, workflowEdges, activeBlocksSet, + activeBlockRefCounts, accumulatedBlockLogs, accumulatedBlockStates, executedBlockIds, diff --git a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/utils/workflow-execution-utils.ts b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/utils/workflow-execution-utils.ts index c0e54ea437..ff1baf222a 100644 --- a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/utils/workflow-execution-utils.ts +++ b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/utils/workflow-execution-utils.ts @@ -5,6 +5,30 @@ import { useTerminalConsoleStore } from '@/stores/terminal' import { useWorkflowRegistry } from '@/stores/workflows/registry/store' import { useWorkflowStore } from '@/stores/workflows/workflow/store' +/** + * Updates the active blocks set and ref counts for a single block. + * Ref counting ensures a block stays active until all parallel branches for it complete. + */ +export function updateActiveBlockRefCount( + refCounts: Map, + activeSet: Set, + blockId: string, + isActive: boolean +): void { + if (isActive) { + refCounts.set(blockId, (refCounts.get(blockId) ?? 0) + 1) + activeSet.add(blockId) + } else { + const next = (refCounts.get(blockId) ?? 1) - 1 + if (next <= 0) { + refCounts.delete(blockId) + activeSet.delete(blockId) + } else { + refCounts.set(blockId, next) + } + } +} + export interface WorkflowExecutionOptions { workflowInput?: any onStream?: (se: StreamingExecution) => Promise @@ -39,6 +63,7 @@ export async function executeWorkflowWithFullLogging( const workflowEdges = useWorkflowStore.getState().edges const activeBlocksSet = new Set() + const activeBlockRefCounts = new Map() const payload: any = { input: options.workflowInput, @@ -103,7 +128,12 @@ export async function executeWorkflowWithFullLogging( switch (event.type) { case 'block:started': { - activeBlocksSet.add(event.data.blockId) + updateActiveBlockRefCount( + activeBlockRefCounts, + activeBlocksSet, + event.data.blockId, + true + ) setActiveBlocks(wfId, new Set(activeBlocksSet)) const incomingEdges = workflowEdges.filter( @@ -115,8 +145,13 @@ export async function executeWorkflowWithFullLogging( break } - case 'block:completed': - activeBlocksSet.delete(event.data.blockId) + case 'block:completed': { + updateActiveBlockRefCount( + activeBlockRefCounts, + activeBlocksSet, + event.data.blockId, + false + ) setActiveBlocks(wfId, new Set(activeBlocksSet)) setBlockRunStatus(wfId, event.data.blockId, 'success') @@ -144,9 +179,15 @@ export async function executeWorkflowWithFullLogging( options.onBlockComplete(event.data.blockId, event.data.output).catch(() => {}) } break + } - case 'block:error': - activeBlocksSet.delete(event.data.blockId) + case 'block:error': { + updateActiveBlockRefCount( + activeBlockRefCounts, + activeBlocksSet, + event.data.blockId, + false + ) setActiveBlocks(wfId, new Set(activeBlocksSet)) setBlockRunStatus(wfId, event.data.blockId, 'error') @@ -171,6 +212,7 @@ export async function executeWorkflowWithFullLogging( iterationContainerId: event.data.iterationContainerId, }) break + } case 'execution:completed': executionResult = { diff --git a/apps/sim/executor/execution/block-executor.ts b/apps/sim/executor/execution/block-executor.ts index 58143e5832..56b7c6a915 100644 --- a/apps/sim/executor/execution/block-executor.ts +++ b/apps/sim/executor/execution/block-executor.ts @@ -428,7 +428,7 @@ export class BlockExecutor { block: SerializedBlock, executionOrder: number ): void { - const blockId = node.id + const blockId = node.metadata?.originalBlockId ?? node.id const blockName = block.metadata?.name ?? blockId const blockType = block.metadata?.id ?? DEFAULTS.BLOCK_TYPE @@ -456,7 +456,7 @@ export class BlockExecutor { executionOrder: number, endedAt: string ): void { - const blockId = node.id + const blockId = node.metadata?.originalBlockId ?? node.id const blockName = block.metadata?.name ?? blockId const blockType = block.metadata?.id ?? DEFAULTS.BLOCK_TYPE