Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,14 @@ import {
TaskRunExecutionSnapshot,
TaskRunExecutionStatus,
TaskRunStatus,
Waitpoint,
} from "@trigger.dev/database";
import { HeartbeatTimeouts } from "../types.js";
import { SystemResources } from "./systems.js";

/**
* Maximum number of waitpoint IDs requested per query when fetching waitpoints in chunks.
* Waitpoints can carry large outputs (100KB+), so bounding each query keeps a single
* result set below the NAPI string conversion limits.
*/
const WAITPOINT_CHUNK_SIZE = 100;

export type ExecutionSnapshotSystemOptions = {
resources: SystemResources;
heartbeatTimeouts: HeartbeatTimeouts;
Expand All @@ -31,19 +35,41 @@ type ExecutionSnapshotWithCheckAndWaitpoints = Prisma.TaskRunExecutionSnapshotGe
};
}>;

/**
* A TaskRunExecutionSnapshot row loaded together with its `checkpoint` relation.
* The only relation included here is `checkpoint`; completed waitpoints are not part of this payload.
*/
type ExecutionSnapshotWithCheckpoint = Prisma.TaskRunExecutionSnapshotGetPayload<{
include: {
checkpoint: true;
};
}>;

/**
 * Converts a snapshot that was loaded with its completed waitpoints into an
 * EnhancedExecutionSnapshot.
 *
 * Delegates to enhanceExecutionSnapshotWithWaitpoints, supplying the waitpoints
 * and the waitpoint ordering carried by the snapshot itself.
 *
 * @param snapshot - Snapshot including its checkpoint and completed waitpoints.
 * @returns The enhanced snapshot produced by enhanceExecutionSnapshotWithWaitpoints.
 */
function enhanceExecutionSnapshot(
  snapshot: ExecutionSnapshotWithCheckAndWaitpoints
): EnhancedExecutionSnapshot {
  const { completedWaitpoints, completedWaitpointOrder } = snapshot;

  return enhanceExecutionSnapshotWithWaitpoints(
    snapshot,
    completedWaitpoints,
    completedWaitpointOrder
  );
}

/**
* Transforms a snapshot (with checkpoint but without waitpoints) into an EnhancedExecutionSnapshot
* by combining it with pre-fetched waitpoints.
*/
function enhanceExecutionSnapshotWithWaitpoints(
snapshot: ExecutionSnapshotWithCheckpoint,
waitpoints: Waitpoint[],
completedWaitpointOrder: string[]
): EnhancedExecutionSnapshot {
return {
...snapshot,
friendlyId: SnapshotId.toFriendlyId(snapshot.id),
runFriendlyId: RunId.toFriendlyId(snapshot.runId),
completedWaitpoints: snapshot.completedWaitpoints.flatMap((w) => {
//get all indexes of the waitpoint in the completedWaitpointOrder
//we do this because the same run can be in a batch multiple times (i.e. same idempotencyKey)
completedWaitpoints: waitpoints.flatMap((w) => {
// Get all indexes of the waitpoint in the completedWaitpointOrder
// We do this because the same run can be in a batch multiple times (i.e. same idempotencyKey)
let indexes: (number | undefined)[] = [];
for (let i = 0; i < snapshot.completedWaitpointOrder.length; i++) {
if (snapshot.completedWaitpointOrder[i] === w.id) {
for (let i = 0; i < completedWaitpointOrder.length; i++) {
if (completedWaitpointOrder[i] === w.id) {
indexes.push(i);
}
}
Expand All @@ -60,9 +86,7 @@ function enhanceExecutionSnapshot(
type: w.type,
completedAt: w.completedAt ?? new Date(),
idempotencyKey:
w.userProvidedIdempotencyKey && !w.inactiveIdempotencyKey
? w.idempotencyKey
: undefined,
w.userProvidedIdempotencyKey && !w.inactiveIdempotencyKey ? w.idempotencyKey : undefined,
completedByTaskRun: w.completedByTaskRunId
? {
id: w.completedByTaskRunId,
Expand Down Expand Up @@ -91,6 +115,42 @@ function enhanceExecutionSnapshot(
};
}

/**
 * Looks up the IDs of the waitpoints linked to one execution snapshot.
 *
 * Queries the `_completedWaitpoints` join table directly with raw SQL so that
 * only IDs are returned rather than full waitpoint rows.
 *
 * @param prisma - Prisma client or transaction used to run the query.
 * @param snapshotId - Snapshot ID, matched against join-table column "A".
 * @returns The column "B" value of every matching join-table row (the waitpoint IDs).
 */
async function getSnapshotWaitpointIds(
  prisma: PrismaClientOrTransaction,
  snapshotId: string
): Promise<string[]> {
  const rows = await prisma.$queryRaw<{ B: string }[]>`
SELECT "B" FROM "_completedWaitpoints" WHERE "A" = ${snapshotId}
`;

  const waitpointIds: string[] = [];
  for (const row of rows) {
    waitpointIds.push(row.B);
  }
  return waitpointIds;
}

/**
 * Loads the waitpoints with the given IDs, querying at most
 * WAITPOINT_CHUNK_SIZE IDs at a time.
 *
 * Waitpoints can have large outputs (100KB+); fetching many in a single query
 * can exceed the NAPI / Node.js string limits, so the IDs are split into
 * chunks that are queried one after another.
 *
 * @param prisma - Prisma client or transaction used to run the queries.
 * @param waitpointIds - IDs of the waitpoints to load.
 * @returns All waitpoints found, concatenated chunk by chunk; an empty array
 *          when no IDs are given.
 */
async function fetchWaitpointsInChunks(
  prisma: PrismaClientOrTransaction,
  waitpointIds: string[]
): Promise<Waitpoint[]> {
  const collected: Waitpoint[] = [];

  // Chunks are awaited sequentially so only one bounded result set is in flight at a time.
  let offset = 0;
  while (offset < waitpointIds.length) {
    const batchIds = waitpointIds.slice(offset, offset + WAITPOINT_CHUNK_SIZE);
    const batch = await prisma.waitpoint.findMany({
      where: { id: { in: batchIds } },
    });
    for (const waitpoint of batch) {
      collected.push(waitpoint);
    }
    offset += WAITPOINT_CHUNK_SIZE;
  }

  return collected;
}

/* Gets the most recent valid snapshot for a run */
export async function getLatestExecutionSnapshot(
prisma: PrismaClientOrTransaction,
Expand Down Expand Up @@ -191,12 +251,27 @@ export function executionDataFromSnapshot(snapshot: EnhancedExecutionSnapshot):
};
}

/**
* Gets execution snapshots created after the specified snapshot.
*
* IMPORTANT: This function is optimized to avoid an N×M data explosion when runs have many
* completed waitpoints. Because of the many-to-many relation, once waitpoints complete,
* all subsequent snapshots have the same waitpoints linked. For a run with 24 snapshots
* and 236 waitpoints with 100KB outputs each, fetching all waitpoints for all snapshots
* would result in ~570MB of data, causing "Failed to convert rust String into napi string" errors.
*
* Solution: Only the LATEST snapshot's waitpoints are fetched and included; every other
* returned snapshot has an empty completedWaitpoints array. The runner's SnapshotManager
* only processes completedWaitpoints from the latest snapshot anyway - intermediate
* snapshots' waitpoints are ignored. This reduces data from N×M to just M.
*
* Waitpoints are fetched in chunks (100 at a time) to handle batches up to 1000 items.
*
* At most the 50 most recent matching snapshots are returned, ordered oldest to newest.
*/
export async function getExecutionSnapshotsSince(
prisma: PrismaClientOrTransaction,
runId: string,
sinceSnapshotId: string
): Promise<EnhancedExecutionSnapshot[]> {
// Find the createdAt of the sinceSnapshotId
// Step 1: Find the createdAt of the sinceSnapshotId
const sinceSnapshot = await prisma.taskRunExecutionSnapshot.findFirst({
where: { id: sinceSnapshotId },
select: { createdAt: true },
Expand All @@ -206,21 +281,40 @@ export async function getExecutionSnapshotsSince(
throw new Error(`No execution snapshot found for id ${sinceSnapshotId}`);
}

// Step 2: Fetch snapshots WITHOUT waitpoints to avoid N×M data explosion
const snapshots = await prisma.taskRunExecutionSnapshot.findMany({
where: {
runId,
isValid: true,
createdAt: { gt: sinceSnapshot.createdAt },
},
include: {
completedWaitpoints: true,
checkpoint: true,
// DO NOT include completedWaitpoints here - this causes the N×M explosion
},
orderBy: { createdAt: "desc" },
take: 50,
});

return snapshots.reverse().map(enhanceExecutionSnapshot);
if (snapshots.length === 0) return [];

// Step 3: Get waitpoint IDs for the LATEST snapshot only (first in desc order)
const latestSnapshot = snapshots[0];
const waitpointIds = await getSnapshotWaitpointIds(prisma, latestSnapshot.id);

// Step 4: Fetch waitpoints in chunks to avoid NAPI string conversion limits
const waitpoints = await fetchWaitpointsInChunks(prisma, waitpointIds);

// Step 5: Build enhanced snapshots - only latest gets waitpoints, others get empty arrays
// The runner only uses completedWaitpoints from the latest snapshot anyway
return snapshots.reverse().map((snapshot) => {
const isLatest = snapshot.id === latestSnapshot.id;
return enhanceExecutionSnapshotWithWaitpoints(
snapshot,
isLatest ? waitpoints : [],
latestSnapshot.completedWaitpointOrder
);
});
}

export class ExecutionSnapshotSystem {
Expand Down
Loading