Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,15 @@ export interface EnhancedExecutionSnapshot extends TaskRunExecutionSnapshot {
type ExecutionSnapshotWithCheckAndWaitpoints = Prisma.TaskRunExecutionSnapshotGetPayload<{
include: {
checkpoint: true;
completedWaitpoints: true;
completedWaitpoints: {
include: {
completedByTaskRun: {
select: {
taskIdentifier: true;
};
};
};
};
};
}>;

Expand All @@ -57,7 +65,9 @@ function enhanceExecutionSnapshot(
*/
function enhanceExecutionSnapshotWithWaitpoints(
snapshot: ExecutionSnapshotWithCheckpoint,
waitpoints: Waitpoint[],
waitpoints: (Waitpoint & {
completedByTaskRun: { taskIdentifier: string | null } | null;
})[],
completedWaitpointOrder: string[]
): EnhancedExecutionSnapshot {
return {
Expand Down Expand Up @@ -89,22 +99,23 @@ function enhanceExecutionSnapshotWithWaitpoints(
w.userProvidedIdempotencyKey && !w.inactiveIdempotencyKey ? w.idempotencyKey : undefined,
completedByTaskRun: w.completedByTaskRunId
? {
id: w.completedByTaskRunId,
friendlyId: RunId.toFriendlyId(w.completedByTaskRunId),
batch: snapshot.batchId
? {
id: snapshot.batchId,
friendlyId: BatchId.toFriendlyId(snapshot.batchId),
}
: undefined,
}
id: w.completedByTaskRunId,
friendlyId: RunId.toFriendlyId(w.completedByTaskRunId),
batch: snapshot.batchId
? {
id: snapshot.batchId,
friendlyId: BatchId.toFriendlyId(snapshot.batchId),
}
: undefined,
taskIdentifier: w.completedByTaskRun?.taskIdentifier ?? undefined,
}
: undefined,
completedAfter: w.completedAfter ?? undefined,
completedByBatch: w.completedByBatchId
? {
id: w.completedByBatchId,
friendlyId: BatchId.toFriendlyId(w.completedByBatchId),
}
id: w.completedByBatchId,
friendlyId: BatchId.toFriendlyId(w.completedByBatchId),
}
: undefined,
output: w.output ?? undefined,
outputType: w.outputType,
Expand Down Expand Up @@ -137,14 +148,23 @@ async function getSnapshotWaitpointIds(
async function fetchWaitpointsInChunks(
prisma: PrismaClientOrTransaction,
waitpointIds: string[]
): Promise<Waitpoint[]> {
): Promise<(Waitpoint & { completedByTaskRun: { taskIdentifier: string | null } | null })[]> {
if (waitpointIds.length === 0) return [];

const allWaitpoints: Waitpoint[] = [];
const allWaitpoints: (Waitpoint & {
completedByTaskRun: { taskIdentifier: string | null } | null;
})[] = [];
for (let i = 0; i < waitpointIds.length; i += WAITPOINT_CHUNK_SIZE) {
const chunk = waitpointIds.slice(i, i + WAITPOINT_CHUNK_SIZE);
const waitpoints = await prisma.waitpoint.findMany({
where: { id: { in: chunk } },
include: {
completedByTaskRun: {
select: {
taskIdentifier: true,
},
},
},
});
allWaitpoints.push(...waitpoints);
}
Expand All @@ -159,7 +179,15 @@ export async function getLatestExecutionSnapshot(
const snapshot = await prisma.taskRunExecutionSnapshot.findFirst({
where: { runId, isValid: true },
include: {
completedWaitpoints: true,
completedWaitpoints: {
include: {
completedByTaskRun: {
select: {
taskIdentifier: true,
},
},
},
},
checkpoint: true,
},
orderBy: { createdAt: "desc" },
Expand All @@ -179,7 +207,15 @@ export async function getExecutionSnapshotCompletedWaitpoints(
const waitpoints = await prisma.taskRunExecutionSnapshot.findFirst({
where: { id: snapshotId },
include: {
completedWaitpoints: true,
completedWaitpoints: {
include: {
completedByTaskRun: {
select: {
taskIdentifier: true,
},
},
},
},
},
});

Expand Down Expand Up @@ -233,19 +269,19 @@ export function executionDataFromSnapshot(snapshot: EnhancedExecutionSnapshot):
},
batch: snapshot.batchId
? {
id: snapshot.batchId,
friendlyId: BatchId.toFriendlyId(snapshot.batchId),
}
id: snapshot.batchId,
friendlyId: BatchId.toFriendlyId(snapshot.batchId),
}
: undefined,
checkpoint: snapshot.checkpoint
? {
id: snapshot.checkpoint.id,
friendlyId: snapshot.checkpoint.friendlyId,
type: snapshot.checkpoint.type,
location: snapshot.checkpoint.location,
imageRef: snapshot.checkpoint.imageRef,
reason: snapshot.checkpoint.reason ?? undefined,
}
id: snapshot.checkpoint.id,
friendlyId: snapshot.checkpoint.friendlyId,
type: snapshot.checkpoint.type,
location: snapshot.checkpoint.location,
imageRef: snapshot.checkpoint.imageRef,
reason: snapshot.checkpoint.reason ?? undefined,
}
: undefined,
completedWaitpoints: snapshot.completedWaitpoints,
};
Expand Down
121 changes: 121 additions & 0 deletions packages/core/src/v3/runtime/sharedRuntimeManager.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
import { describe, expect, it } from "vitest";
import { SharedRuntimeManager } from "./sharedRuntimeManager.js";
import { CompletedWaitpoint } from "../schemas/index.js";

describe("SharedRuntimeManager", () => {
const mockIpc = {
send: () => { },
} as any;

const manager = new SharedRuntimeManager(mockIpc, false);

// Access private method
const waitpointToResult = (manager as any).waitpointToTaskRunExecutionResult.bind(manager);

describe("waitpointToTaskRunExecutionResult", () => {
it("should use the taskIdentifier from the waitpoint if present (success)", () => {
const waitpoint: CompletedWaitpoint = {
id: "wp_1",
friendlyId: "wp_1",
type: "RUN",
completedAt: new Date(),
outputIsError: false,
output: JSON.stringify({ foo: "bar" }),
outputType: "application/json",
completedByTaskRun: {
id: "run_1",
friendlyId: "run_1",
taskIdentifier: "my-task",
},
};

const result = waitpointToResult(waitpoint);

expect(result).toEqual({
ok: true,
id: "run_1",
taskIdentifier: "my-task",
output: JSON.stringify({ foo: "bar" }),
outputType: "application/json",
});
});

it("should default taskIdentifier to 'unknown' if missing (success)", () => {
const waitpoint: CompletedWaitpoint = {
id: "wp_2",
friendlyId: "wp_2",
type: "RUN",
completedAt: new Date(),
outputIsError: false,
output: JSON.stringify({ foo: "bar" }),
outputType: "application/json",
completedByTaskRun: {
id: "run_2",
friendlyId: "run_2",
// database/legacy object missing taskIdentifier
} as any,
};

const result = waitpointToResult(waitpoint);

expect(result).toEqual({
ok: true,
id: "run_2",
taskIdentifier: "unknown",
output: JSON.stringify({ foo: "bar" }),
outputType: "application/json",
});
});

it("should use the taskIdentifier from the waitpoint if present (failure)", () => {
const waitpoint: CompletedWaitpoint = {
id: "wp_3",
friendlyId: "wp_3",
type: "RUN",
completedAt: new Date(),
outputIsError: true,
output: JSON.stringify({ message: "Boom" }),
outputType: "application/json",
completedByTaskRun: {
id: "run_3",
friendlyId: "run_3",
taskIdentifier: "my-failed-task",
},
};

const result = waitpointToResult(waitpoint);

expect(result).toEqual({
ok: false,
id: "run_3",
taskIdentifier: "my-failed-task",
error: { message: "Boom" },
});
});

it("should default taskIdentifier to 'unknown' if missing (failure)", () => {
const waitpoint: CompletedWaitpoint = {
id: "wp_4",
friendlyId: "wp_4",
type: "RUN",
completedAt: new Date(),
outputIsError: true,
output: JSON.stringify({ message: "Boom" }),
outputType: "application/json",
completedByTaskRun: {
id: "run_4",
friendlyId: "run_4",
} as any,
};

const result = waitpointToResult(waitpoint);

expect(result).toEqual({
ok: false,
id: "run_4",
taskIdentifier: "unknown",
error: { message: "Boom" },
});
});
});
});
8 changes: 5 additions & 3 deletions packages/core/src/v3/runtime/sharedRuntimeManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -293,17 +293,19 @@ export class SharedRuntimeManager implements RuntimeManager {
return {
ok: false,
id: waitpoint.completedByTaskRun.friendlyId,
taskIdentifier: waitpoint.completedByTaskRun.taskIdentifier ?? "unknown",
error: waitpoint.output
? JSON.parse(waitpoint.output)
: {
type: "STRING_ERROR",
message: "Missing error output",
},
type: "STRING_ERROR",
message: "Missing error output",
},
} satisfies TaskRunFailedExecutionResult;
} else {
return {
ok: true,
id: waitpoint.completedByTaskRun.friendlyId,
taskIdentifier: waitpoint.completedByTaskRun.taskIdentifier ?? "unknown",
output: waitpoint.output,
outputType: waitpoint.outputType ?? "application/json",
} satisfies TaskRunSuccessfulExecutionResult;
Expand Down
1 change: 1 addition & 0 deletions packages/core/src/v3/schemas/runEngine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ export const CompletedWaitpoint = z.object({
.object({
id: z.string(),
friendlyId: z.string(),
taskIdentifier: z.string().optional(),
/** If the run has an associated batch */
batch: z
.object({
Expand Down