Skip to content

Commit c039d65

Browse files
committed
Add annotations to task runs
1 parent 35298ac commit c039d65

23 files changed

+126
-36
lines changed

apps/webapp/app/presenters/v3/ApiRetrieveRunPresenter.server.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import {
22
AttemptStatus,
3+
RunAnnotations,
34
RunStatus,
45
SerializedError,
56
TaskRunError,
@@ -56,6 +57,7 @@ const commonRunSelect = {
5657
},
5758
},
5859
runTags: true,
60+
annotations: true,
5961
} satisfies Prisma.TaskRunSelect;
6062

6163
type CommonRelatedRun = Prisma.Result<
@@ -466,6 +468,7 @@ async function createCommonRunStructure(run: CommonRelatedRun, apiVersion: API_V
466468
triggerFunction: resolveTriggerFunction(run),
467469
batchId: run.batch?.friendlyId,
468470
metadata,
471+
annotations: run.annotations ? RunAnnotations.safeParse(run.annotations).data : undefined,
469472
};
470473
}
471474

apps/webapp/app/routes/api.v1.runs.$runParam.replay.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ export async function action({ request, params }: ActionFunctionArgs) {
4242
}
4343

4444
const service = new ReplayTaskRunService();
45-
const newRun = await service.call(taskRun);
45+
const newRun = await service.call(taskRun, { triggerSource: "api" });
4646

4747
if (!newRun) {
4848
return json({ error: "Failed to create new run" }, { status: 400 });

apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ export const HeadersSchema = z.object({
3636
"x-trigger-engine-version": RunEngineVersionSchema.nullish(),
3737
"x-trigger-request-idempotency-key": z.string().nullish(),
3838
"x-trigger-realtime-streams-version": z.string().nullish(),
39+
"x-trigger-source": z.string().nullish(),
3940
traceparent: z.string().optional(),
4041
tracestate: z.string().optional(),
4142
});
@@ -67,6 +68,7 @@ const { action, loader } = createActionApiRoute(
6768
"x-trigger-engine-version": engineVersion,
6869
"x-trigger-request-idempotency-key": requestIdempotencyKey,
6970
"x-trigger-realtime-streams-version": realtimeStreamsVersion,
71+
"x-trigger-source": triggerSourceHeader,
7072
} = headers;
7173

7274
const cachedResponse = await handleRequestIdempotency(requestIdempotencyKey, {
@@ -119,6 +121,10 @@ const { action, loader } = createActionApiRoute(
119121
realtimeStreamsVersion: determineRealtimeStreamsVersion(
120122
realtimeStreamsVersion ?? undefined
121123
),
124+
triggerSource: body.options?.parentRunId
125+
? "sdk"
126+
: triggerSourceHeader ?? "api",
127+
triggerAction: "trigger",
122128
},
123129
engineVersion ?? undefined
124130
);

apps/webapp/app/routes/api.v1.tasks.batch.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ const { action, loader } = createActionApiRoute(
7272
"x-trigger-engine-version": engineVersion,
7373
"batch-processing-strategy": batchProcessingStrategy,
7474
"x-trigger-realtime-streams-version": realtimeStreamsVersion,
75+
"x-trigger-source": triggerSourceHeader,
7576
traceparent,
7677
tracestate,
7778
} = headers;
@@ -113,6 +114,8 @@ const { action, loader } = createActionApiRoute(
113114
realtimeStreamsVersion: determineRealtimeStreamsVersion(
114115
realtimeStreamsVersion ?? undefined
115116
),
117+
triggerSource: triggerSourceHeader ?? undefined,
118+
triggerAction: "trigger",
116119
});
117120

118121
const $responseHeaders = await responseHeaders(

apps/webapp/app/routes/api.v2.tasks.batch.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ const { action, loader } = createActionApiRoute(
6262
"batch-processing-strategy": batchProcessingStrategy,
6363
"x-trigger-request-idempotency-key": requestIdempotencyKey,
6464
"x-trigger-realtime-streams-version": realtimeStreamsVersion,
65+
"x-trigger-source": triggerSourceHeader,
6566
traceparent,
6667
tracestate,
6768
} = headers;
@@ -127,6 +128,8 @@ const { action, loader } = createActionApiRoute(
127128
realtimeStreamsVersion: determineRealtimeStreamsVersion(
128129
realtimeStreamsVersion ?? undefined
129130
),
131+
triggerSource: triggerSourceHeader ?? undefined,
132+
triggerAction: "trigger",
130133
});
131134

132135
const $responseHeaders = await responseHeaders(

apps/webapp/app/routes/resources.taskruns.$runParam.replay.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,7 @@ export const action: ActionFunction = async ({ request, params }) => {
214214
ttlSeconds: submission.value.ttlSeconds,
215215
version: submission.value.version,
216216
prioritySeconds: submission.value.prioritySeconds,
217+
triggerSource: "dashboard",
217218
});
218219

219220
if (!newRun) {

apps/webapp/app/runEngine/services/batchTrigger.server.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@ export type BatchTriggerTaskServiceOptions = {
4848
spanParentAsLink?: boolean;
4949
oneTimeUseToken?: string;
5050
realtimeStreamsVersion?: "v1" | "v2";
51+
triggerSource?: string;
52+
triggerAction?: string;
5153
};
5254

5355
/**
@@ -678,6 +680,8 @@ export class RunEngineBatchTriggerService extends WithRunEngine {
678680
batchId: batch.id,
679681
batchIndex: currentIndex,
680682
realtimeStreamsVersion: options?.realtimeStreamsVersion,
683+
triggerSource: parentRunId ? "sdk" : options?.triggerSource ?? "api",
684+
triggerAction: options?.triggerAction ?? "trigger",
681685
},
682686
"V2"
683687
);

apps/webapp/app/runEngine/services/triggerTask.server.ts

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -289,6 +289,23 @@ export class RunEngineTriggerTaskService {
289289

290290
const workerQueue = await this.queueConcern.getWorkerQueue(environment, body.options?.region);
291291

292+
// Build annotations for this run
293+
const triggerSource = options.triggerSource ?? "api";
294+
const triggerAction = options.triggerAction ?? "trigger";
295+
const parentAnnotations = parentRun?.annotations as
296+
| Record<string, unknown>
297+
| null
298+
| undefined;
299+
const annotations = {
300+
triggerSource,
301+
triggerAction,
302+
rootTriggerSource: parentAnnotations?.rootTriggerSource ?? triggerSource,
303+
rootScheduleId:
304+
(parentAnnotations?.rootScheduleId as string | undefined) ||
305+
options.scheduleId ||
306+
undefined,
307+
};
308+
292309
try {
293310
return await this.traceEventConcern.traceRun(
294311
triggerRequest,
@@ -369,6 +386,7 @@ export class RunEngineTriggerTaskService {
369386
planType,
370387
realtimeStreamsVersion: options.realtimeStreamsVersion,
371388
debounce: body.options?.debounce,
389+
annotations,
372390
// When debouncing with triggerAndWait, create a span for the debounced trigger
373391
onDebounced:
374392
body.options?.debounce && body.options?.resumeParentOnCompletion

apps/webapp/app/v3/runEngineHandlers.server.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -750,6 +750,8 @@ export function setupBatchQueueCallbacks() {
750750
batchIndex: itemIndex,
751751
realtimeStreamsVersion: meta.realtimeStreamsVersion,
752752
planType: meta.planType,
753+
triggerSource: meta.parentRunId ? "sdk" : "api",
754+
triggerAction: "trigger",
753755
},
754756
"V2"
755757
);

apps/webapp/app/v3/scheduleEngine.server.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,8 @@ function createScheduleEngine() {
106106
scheduleInstanceId,
107107
queueTimestamp: exactScheduleTime,
108108
overrideCreatedAt: exactScheduleTime,
109+
triggerSource: "schedule",
110+
triggerAction: "trigger",
109111
}
110112
);
111113

0 commit comments

Comments
 (0)