Skip to content
Open
5 changes: 5 additions & 0 deletions .changeset/fix-batch-trigger-task-identifier.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@trigger.dev/core": patch
---

Fix run.taskIdentifier being reported as "unknown" for batch triggers
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,15 @@ export interface EnhancedExecutionSnapshot extends TaskRunExecutionSnapshot {
type ExecutionSnapshotWithCheckAndWaitpoints = Prisma.TaskRunExecutionSnapshotGetPayload<{
include: {
checkpoint: true;
completedWaitpoints: true;
completedWaitpoints: {
include: {
completedByTaskRun: {
select: {
taskIdentifier: true;
};
};
};
};
};
}>;

Expand All @@ -57,7 +65,9 @@ function enhanceExecutionSnapshot(
*/
function enhanceExecutionSnapshotWithWaitpoints(
snapshot: ExecutionSnapshotWithCheckpoint,
waitpoints: Waitpoint[],
waitpoints: (Waitpoint & {
completedByTaskRun: { taskIdentifier: string | null } | null;
})[],
completedWaitpointOrder: string[]
): EnhancedExecutionSnapshot {
return {
Expand Down Expand Up @@ -89,22 +99,23 @@ function enhanceExecutionSnapshotWithWaitpoints(
w.userProvidedIdempotencyKey && !w.inactiveIdempotencyKey ? w.idempotencyKey : undefined,
completedByTaskRun: w.completedByTaskRunId
? {
id: w.completedByTaskRunId,
friendlyId: RunId.toFriendlyId(w.completedByTaskRunId),
batch: snapshot.batchId
? {
id: snapshot.batchId,
friendlyId: BatchId.toFriendlyId(snapshot.batchId),
}
: undefined,
}
id: w.completedByTaskRunId,
friendlyId: RunId.toFriendlyId(w.completedByTaskRunId),
batch: snapshot.batchId
? {
id: snapshot.batchId,
friendlyId: BatchId.toFriendlyId(snapshot.batchId),
}
: undefined,
taskIdentifier: w.completedByTaskRun?.taskIdentifier ?? undefined,
}
: undefined,
completedAfter: w.completedAfter ?? undefined,
completedByBatch: w.completedByBatchId
? {
id: w.completedByBatchId,
friendlyId: BatchId.toFriendlyId(w.completedByBatchId),
}
id: w.completedByBatchId,
friendlyId: BatchId.toFriendlyId(w.completedByBatchId),
}
: undefined,
output: w.output ?? undefined,
outputType: w.outputType,
Expand Down Expand Up @@ -137,14 +148,23 @@ async function getSnapshotWaitpointIds(
async function fetchWaitpointsInChunks(
prisma: PrismaClientOrTransaction,
waitpointIds: string[]
): Promise<Waitpoint[]> {
): Promise<(Waitpoint & { completedByTaskRun: { taskIdentifier: string | null } | null })[]> {
if (waitpointIds.length === 0) return [];

const allWaitpoints: Waitpoint[] = [];
const allWaitpoints: (Waitpoint & {
completedByTaskRun: { taskIdentifier: string | null } | null;
})[] = [];
for (let i = 0; i < waitpointIds.length; i += WAITPOINT_CHUNK_SIZE) {
const chunk = waitpointIds.slice(i, i + WAITPOINT_CHUNK_SIZE);
const waitpoints = await prisma.waitpoint.findMany({
where: { id: { in: chunk } },
include: {
completedByTaskRun: {
select: {
taskIdentifier: true,
},
},
},
Comment on lines +161 to +167

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚩 Additional Prisma JOINs added to multiple hot query paths

This PR adds include: { completedByTaskRun: { select: { taskIdentifier: true } } } to four separate query paths: getLatestExecutionSnapshot (line 182-190), getExecutionSnapshotCompletedWaitpoints (line 210-218), fetchWaitpointsInChunks (line 161-167), and the ExecutionSnapshotWithCheckAndWaitpoints type (line 34-42). Each of these now performs an extra JOIN to the TaskRun table for every waitpoint. For runs with many waitpoints (e.g., large batches of 1000), this adds 1000 JOINs. The completedByTaskRunId column has a @unique constraint, so the JOIN should be efficient (index lookup), but it's worth monitoring query performance for large batch scenarios.

Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

});
allWaitpoints.push(...waitpoints);
}
Expand All @@ -159,7 +179,15 @@ export async function getLatestExecutionSnapshot(
const snapshot = await prisma.taskRunExecutionSnapshot.findFirst({
where: { runId, isValid: true },
include: {
completedWaitpoints: true,
completedWaitpoints: {
include: {
completedByTaskRun: {
select: {
taskIdentifier: true,
},
},
},
},
checkpoint: true,
},
orderBy: { createdAt: "desc" },
Expand All @@ -179,7 +207,15 @@ export async function getExecutionSnapshotCompletedWaitpoints(
const waitpoints = await prisma.taskRunExecutionSnapshot.findFirst({
where: { id: snapshotId },
include: {
completedWaitpoints: true,
completedWaitpoints: {
include: {
completedByTaskRun: {
select: {
taskIdentifier: true,
},
},
},
},
},
});

Expand Down Expand Up @@ -233,19 +269,19 @@ export function executionDataFromSnapshot(snapshot: EnhancedExecutionSnapshot):
},
batch: snapshot.batchId
? {
id: snapshot.batchId,
friendlyId: BatchId.toFriendlyId(snapshot.batchId),
}
id: snapshot.batchId,
friendlyId: BatchId.toFriendlyId(snapshot.batchId),
}
: undefined,
checkpoint: snapshot.checkpoint
? {
id: snapshot.checkpoint.id,
friendlyId: snapshot.checkpoint.friendlyId,
type: snapshot.checkpoint.type,
location: snapshot.checkpoint.location,
imageRef: snapshot.checkpoint.imageRef,
reason: snapshot.checkpoint.reason ?? undefined,
}
id: snapshot.checkpoint.id,
friendlyId: snapshot.checkpoint.friendlyId,
type: snapshot.checkpoint.type,
location: snapshot.checkpoint.location,
imageRef: snapshot.checkpoint.imageRef,
reason: snapshot.checkpoint.reason ?? undefined,
}
: undefined,
completedWaitpoints: snapshot.completedWaitpoints,
};
Expand Down
121 changes: 121 additions & 0 deletions packages/core/src/v3/runtime/sharedRuntimeManager.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
import { describe, expect, it } from "vitest";
import { SharedRuntimeManager } from "./sharedRuntimeManager.js";
import { CompletedWaitpoint } from "../schemas/index.js";

describe("SharedRuntimeManager", () => {
const mockIpc = {
send: () => { },
} as any;

const manager = new SharedRuntimeManager(mockIpc, false);

// Access private method
const waitpointToResult = (manager as any).waitpointToTaskRunExecutionResult.bind(manager);

describe("waitpointToTaskRunExecutionResult", () => {
it("should use the taskIdentifier from the waitpoint if present (success)", () => {
const waitpoint: CompletedWaitpoint = {
id: "wp_1",
friendlyId: "wp_1",
type: "RUN",
completedAt: new Date(),
outputIsError: false,
output: JSON.stringify({ foo: "bar" }),
outputType: "application/json",
completedByTaskRun: {
id: "run_1",
friendlyId: "run_1",
taskIdentifier: "my-task",
},
};

const result = waitpointToResult(waitpoint);

expect(result).toEqual({
ok: true,
id: "run_1",
taskIdentifier: "my-task",
output: JSON.stringify({ foo: "bar" }),
outputType: "application/json",
});
});

it("should default taskIdentifier to 'unknown' if missing (success)", () => {
const waitpoint: CompletedWaitpoint = {
id: "wp_2",
friendlyId: "wp_2",
type: "RUN",
completedAt: new Date(),
outputIsError: false,
output: JSON.stringify({ foo: "bar" }),
outputType: "application/json",
completedByTaskRun: {
id: "run_2",
friendlyId: "run_2",
// database/legacy object missing taskIdentifier
} as any,
};

const result = waitpointToResult(waitpoint);

expect(result).toEqual({
ok: true,
id: "run_2",
taskIdentifier: "unknown",
output: JSON.stringify({ foo: "bar" }),
outputType: "application/json",
});
});

it("should use the taskIdentifier from the waitpoint if present (failure)", () => {
const waitpoint: CompletedWaitpoint = {
id: "wp_3",
friendlyId: "wp_3",
type: "RUN",
completedAt: new Date(),
outputIsError: true,
output: JSON.stringify({ message: "Boom" }),
outputType: "application/json",
completedByTaskRun: {
id: "run_3",
friendlyId: "run_3",
taskIdentifier: "my-failed-task",
},
};

const result = waitpointToResult(waitpoint);

expect(result).toEqual({
ok: false,
id: "run_3",
taskIdentifier: "my-failed-task",
error: { message: "Boom" },
});
});

it("should default taskIdentifier to 'unknown' if missing (failure)", () => {
const waitpoint: CompletedWaitpoint = {
id: "wp_4",
friendlyId: "wp_4",
type: "RUN",
completedAt: new Date(),
outputIsError: true,
output: JSON.stringify({ message: "Boom" }),
outputType: "application/json",
completedByTaskRun: {
id: "run_4",
friendlyId: "run_4",
} as any,
};

const result = waitpointToResult(waitpoint);

expect(result).toEqual({
ok: false,
id: "run_4",
taskIdentifier: "unknown",
error: { message: "Boom" },
});
});
});
});
10 changes: 6 additions & 4 deletions packages/core/src/v3/runtime/sharedRuntimeManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ export class SharedRuntimeManager implements RuntimeManager {

return {
id: params.id,
items: waitpoints.map(this.waitpointToTaskRunExecutionResult),
items: waitpoints.map((wp) => this.waitpointToTaskRunExecutionResult(wp)),
};
});
}
Expand Down Expand Up @@ -293,17 +293,19 @@ export class SharedRuntimeManager implements RuntimeManager {
return {
ok: false,
id: waitpoint.completedByTaskRun.friendlyId,
taskIdentifier: waitpoint.completedByTaskRun.taskIdentifier ?? "unknown",
error: waitpoint.output
? JSON.parse(waitpoint.output)
: {
type: "STRING_ERROR",
message: "Missing error output",
},
type: "STRING_ERROR",
message: "Missing error output",
},
} satisfies TaskRunFailedExecutionResult;
} else {
return {
ok: true,
id: waitpoint.completedByTaskRun.friendlyId,
taskIdentifier: waitpoint.completedByTaskRun.taskIdentifier ?? "unknown",
output: waitpoint.output,
outputType: waitpoint.outputType ?? "application/json",
} satisfies TaskRunSuccessfulExecutionResult;
Expand Down
1 change: 1 addition & 0 deletions packages/core/src/v3/schemas/runEngine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ export const CompletedWaitpoint = z.object({
.object({
id: z.string(),
friendlyId: z.string(),
taskIdentifier: z.string().optional(),
/** If the run has an associated batch */
batch: z
.object({
Expand Down