From 32d77ce04a302a506d5486e77942ee40b2cd2133 Mon Sep 17 00:00:00 2001 From: Saadi Myftija Date: Mon, 23 Mar 2026 11:57:08 +0100 Subject: [PATCH 1/7] Add annotations to task runs --- .../routes/api.v1.runs.$runParam.replay.ts | 2 +- .../routes/api.v1.tasks.$taskId.trigger.ts | 6 +++ apps/webapp/app/routes/api.v1.tasks.batch.ts | 3 ++ apps/webapp/app/routes/api.v2.tasks.batch.ts | 3 ++ .../resources.taskruns.$runParam.replay.ts | 1 + .../runEngine/services/batchTrigger.server.ts | 4 ++ .../runEngine/services/triggerTask.server.ts | 13 +++++ .../webapp/app/v3/runEngineHandlers.server.ts | 2 + apps/webapp/app/v3/scheduleEngine.server.ts | 2 + .../app/v3/services/batchTriggerV3.server.ts | 6 +++ .../v3/services/bulk/BulkActionV2.server.ts | 1 + .../services/bulk/performBulkAction.server.ts | 2 +- .../app/v3/services/replayTaskRun.server.ts | 3 ++ .../webapp/app/v3/services/testTask.server.ts | 51 +++++++++++-------- .../app/v3/services/triggerTask.server.ts | 2 + .../migration.sql | 2 + .../database/prisma/schema.prisma | 3 ++ .../run-engine/src/engine/index.ts | 2 + .../run-engine/src/engine/types.ts | 6 +++ packages/cli-v3/src/apiClient.ts | 6 ++- packages/cli-v3/src/mcp/context.ts | 4 +- packages/core/src/v3/apiClient/core.ts | 5 +- packages/core/src/v3/apiClient/index.ts | 13 ++++- packages/core/src/v3/schemas/api.ts | 19 +++++++ 24 files changed, 133 insertions(+), 28 deletions(-) create mode 100644 internal-packages/database/prisma/migrations/20260323085243_add_task_run_annotations/migration.sql diff --git a/apps/webapp/app/routes/api.v1.runs.$runParam.replay.ts b/apps/webapp/app/routes/api.v1.runs.$runParam.replay.ts index 66f30ca1dff..3c65cc963da 100644 --- a/apps/webapp/app/routes/api.v1.runs.$runParam.replay.ts +++ b/apps/webapp/app/routes/api.v1.runs.$runParam.replay.ts @@ -42,7 +42,7 @@ export async function action({ request, params }: ActionFunctionArgs) { } const service = new ReplayTaskRunService(); - const newRun = await service.call(taskRun); + const newRun = await service.call(taskRun, { triggerSource: "api" }); if (!newRun) { return json({ error: "Failed to create new run" }, { status: 400 }); diff --git a/apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts b/apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts index d55c3659eea..3d1cfd969d0 100644 --- a/apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts +++ b/apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts @@ -36,6 +36,7 @@ export const HeadersSchema = z.object({ "x-trigger-engine-version": RunEngineVersionSchema.nullish(), "x-trigger-request-idempotency-key": z.string().nullish(), "x-trigger-realtime-streams-version": z.string().nullish(), + "x-trigger-source": z.string().nullish(), traceparent: z.string().optional(), tracestate: z.string().optional(), }); @@ -67,6 +68,7 @@ const { action, loader } = createActionApiRoute( "x-trigger-engine-version": engineVersion, "x-trigger-request-idempotency-key": requestIdempotencyKey, "x-trigger-realtime-streams-version": realtimeStreamsVersion, + "x-trigger-source": triggerSourceHeader, } = headers; const cachedResponse = await handleRequestIdempotency(requestIdempotencyKey, { @@ -119,6 +121,10 @@ const { action, loader } = createActionApiRoute( realtimeStreamsVersion: determineRealtimeStreamsVersion( realtimeStreamsVersion ?? undefined ), + triggerSource: body.options?.parentRunId + ? "sdk" + : triggerSourceHeader ?? "api", + triggerAction: "trigger", }, engineVersion ?? undefined ); diff --git a/apps/webapp/app/routes/api.v1.tasks.batch.ts b/apps/webapp/app/routes/api.v1.tasks.batch.ts index 4a10ade0e60..99ab11cab22 100644 --- a/apps/webapp/app/routes/api.v1.tasks.batch.ts +++ b/apps/webapp/app/routes/api.v1.tasks.batch.ts @@ -72,6 +72,7 @@ const { action, loader } = createActionApiRoute( "x-trigger-engine-version": engineVersion, "batch-processing-strategy": batchProcessingStrategy, "x-trigger-realtime-streams-version": realtimeStreamsVersion, + "x-trigger-source": triggerSourceHeader, traceparent, tracestate, } = headers; @@ -113,6 +114,8 @@ const { action, loader } = createActionApiRoute( realtimeStreamsVersion: determineRealtimeStreamsVersion( realtimeStreamsVersion ?? undefined ), + triggerSource: triggerSourceHeader ?? undefined, + triggerAction: "trigger", }); const $responseHeaders = await responseHeaders( diff --git a/apps/webapp/app/routes/api.v2.tasks.batch.ts b/apps/webapp/app/routes/api.v2.tasks.batch.ts index cd351505b50..ed3fd46ed09 100644 --- a/apps/webapp/app/routes/api.v2.tasks.batch.ts +++ b/apps/webapp/app/routes/api.v2.tasks.batch.ts @@ -62,6 +62,7 @@ const { action, loader } = createActionApiRoute( "batch-processing-strategy": batchProcessingStrategy, "x-trigger-request-idempotency-key": requestIdempotencyKey, "x-trigger-realtime-streams-version": realtimeStreamsVersion, + "x-trigger-source": triggerSourceHeader, traceparent, tracestate, } = headers; @@ -127,6 +128,8 @@ const { action, loader } = createActionApiRoute( realtimeStreamsVersion: determineRealtimeStreamsVersion( realtimeStreamsVersion ?? undefined ), + triggerSource: triggerSourceHeader ?? undefined, + triggerAction: "trigger", }); const $responseHeaders = await responseHeaders( diff --git a/apps/webapp/app/routes/resources.taskruns.$runParam.replay.ts b/apps/webapp/app/routes/resources.taskruns.$runParam.replay.ts index eba924f409b..8a22822d06b 100644 --- a/apps/webapp/app/routes/resources.taskruns.$runParam.replay.ts +++ b/apps/webapp/app/routes/resources.taskruns.$runParam.replay.ts @@ -214,6 +214,7 @@ export const action: ActionFunction = async ({ request, params }) => { ttlSeconds: submission.value.ttlSeconds, version: submission.value.version, prioritySeconds: submission.value.prioritySeconds, + triggerSource: "dashboard", }); if (!newRun) { diff --git a/apps/webapp/app/runEngine/services/batchTrigger.server.ts b/apps/webapp/app/runEngine/services/batchTrigger.server.ts index 78427a001e1..c46880497b0 100644 --- a/apps/webapp/app/runEngine/services/batchTrigger.server.ts +++ b/apps/webapp/app/runEngine/services/batchTrigger.server.ts @@ -48,6 +48,8 @@ export type BatchTriggerTaskServiceOptions = { spanParentAsLink?: boolean; oneTimeUseToken?: string; realtimeStreamsVersion?: "v1" | "v2"; + triggerSource?: string; + triggerAction?: string; }; /** @@ -678,6 +680,8 @@ export class RunEngineBatchTriggerService extends WithRunEngine { batchId: batch.id, batchIndex: currentIndex, realtimeStreamsVersion: options?.realtimeStreamsVersion, + triggerSource: parentRunId ? "sdk" : options?.triggerSource ?? "api", + triggerAction: options?.triggerAction ?? "trigger", }, "V2" ); diff --git a/apps/webapp/app/runEngine/services/triggerTask.server.ts b/apps/webapp/app/runEngine/services/triggerTask.server.ts index 2cc849e78de..bbd1331e61a 100644 --- a/apps/webapp/app/runEngine/services/triggerTask.server.ts +++ b/apps/webapp/app/runEngine/services/triggerTask.server.ts @@ -6,6 +6,7 @@ import { import { Tracer } from "@opentelemetry/api"; import { tryCatch } from "@trigger.dev/core/utils"; import { + RunAnnotations, TaskRunError, taskRunErrorEnhancer, taskRunErrorToString, @@ -289,6 +290,17 @@ export class RunEngineTriggerTaskService { const workerQueue = await this.queueConcern.getWorkerQueue(environment, body.options?.region); + // Build annotations for this run + const triggerSource = options.triggerSource ?? "api"; + const triggerAction = options.triggerAction ?? "trigger"; + const parentAnnotations = RunAnnotations.safeParse(parentRun?.annotations).data; + const annotations = { + triggerSource, + triggerAction, + rootTriggerSource: parentAnnotations?.rootTriggerSource ?? triggerSource, + rootScheduleId: parentAnnotations?.rootScheduleId || options.scheduleId || undefined, + }; + try { return await this.traceEventConcern.traceRun( triggerRequest, @@ -369,6 +381,7 @@ export class RunEngineTriggerTaskService { planType, realtimeStreamsVersion: options.realtimeStreamsVersion, debounce: body.options?.debounce, + annotations, // When debouncing with triggerAndWait, create a span for the debounced trigger onDebounced: body.options?.debounce && body.options?.resumeParentOnCompletion diff --git a/apps/webapp/app/v3/runEngineHandlers.server.ts b/apps/webapp/app/v3/runEngineHandlers.server.ts index df0df255de2..19b135614a6 100644 --- a/apps/webapp/app/v3/runEngineHandlers.server.ts +++ b/apps/webapp/app/v3/runEngineHandlers.server.ts @@ -750,6 +750,8 @@ export function setupBatchQueueCallbacks() { batchIndex: itemIndex, realtimeStreamsVersion: meta.realtimeStreamsVersion, planType: meta.planType, + triggerSource: meta.parentRunId ? "sdk" : "api", + triggerAction: "trigger", }, "V2" ); diff --git a/apps/webapp/app/v3/scheduleEngine.server.ts b/apps/webapp/app/v3/scheduleEngine.server.ts index ef3cabe64df..cbec21a3bdb 100644 --- a/apps/webapp/app/v3/scheduleEngine.server.ts +++ b/apps/webapp/app/v3/scheduleEngine.server.ts @@ -106,6 +106,8 @@ function createScheduleEngine() { scheduleInstanceId, queueTimestamp: exactScheduleTime, overrideCreatedAt: exactScheduleTime, + triggerSource: "schedule", + triggerAction: "trigger", } ); diff --git a/apps/webapp/app/v3/services/batchTriggerV3.server.ts b/apps/webapp/app/v3/services/batchTriggerV3.server.ts index e4bc583b7cc..686635406a1 100644 --- a/apps/webapp/app/v3/services/batchTriggerV3.server.ts +++ b/apps/webapp/app/v3/services/batchTriggerV3.server.ts @@ -57,6 +57,8 @@ export type BatchTriggerTaskServiceOptions = { spanParentAsLink?: boolean; oneTimeUseToken?: string; realtimeStreamsVersion?: "v1" | "v2"; + triggerSource?: string; + triggerAction?: string; }; type RunItemData = { @@ -853,6 +855,10 @@ export class BatchTriggerV3Service extends BaseService { skipChecks: true, runFriendlyId: task.runId, realtimeStreamsVersion: options?.realtimeStreamsVersion, + triggerSource: task.item.options?.parentRunId + ? "sdk" + : options?.triggerSource ?? "api", + triggerAction: options?.triggerAction ?? "trigger", } ); diff --git a/apps/webapp/app/v3/services/bulk/BulkActionV2.server.ts b/apps/webapp/app/v3/services/bulk/BulkActionV2.server.ts index 4ca558bc2b7..156b68bff59 100644 --- a/apps/webapp/app/v3/services/bulk/BulkActionV2.server.ts +++ b/apps/webapp/app/v3/services/bulk/BulkActionV2.server.ts @@ -242,6 +242,7 @@ export class BulkActionService extends BaseService { const [error, result] = await tryCatch( replayService.call(run, { bulkActionId: bulkActionId, + triggerSource: "dashboard", }) ); if (error) { diff --git a/apps/webapp/app/v3/services/bulk/performBulkAction.server.ts b/apps/webapp/app/v3/services/bulk/performBulkAction.server.ts index dc2a8d11ded..e535fd7db32 100644 --- a/apps/webapp/app/v3/services/bulk/performBulkAction.server.ts +++ b/apps/webapp/app/v3/services/bulk/performBulkAction.server.ts @@ -27,7 +27,7 @@ export class PerformBulkActionService extends BaseService { switch (item.group.type) { case "REPLAY": { const service = new ReplayTaskRunService(this._prisma); - const result = await service.call(item.sourceRun); + const result = await service.call(item.sourceRun, { triggerSource: "dashboard" }); await this._prisma.bulkActionItem.update({ where: { id: item.id }, diff --git a/apps/webapp/app/v3/services/replayTaskRun.server.ts b/apps/webapp/app/v3/services/replayTaskRun.server.ts index a5018f51c57..836611b3610 100644 --- a/apps/webapp/app/v3/services/replayTaskRun.server.ts +++ b/apps/webapp/app/v3/services/replayTaskRun.server.ts @@ -18,6 +18,7 @@ type OverrideOptions = { payload?: string; metadata?: unknown; bulkActionId?: string; + triggerSource?: string; } & RunOptionsData; export class ReplayTaskRunService extends BaseService { @@ -123,6 +124,8 @@ export class ReplayTaskRunService extends BaseService { realtimeStreamsVersion: determineRealtimeStreamsVersion( existingTaskRun.realtimeStreamsVersion ), + triggerSource: overrideOptions.triggerSource ?? "api", + triggerAction: "replay", } ); diff --git a/apps/webapp/app/v3/services/testTask.server.ts b/apps/webapp/app/v3/services/testTask.server.ts index 0b64367f600..3ad2af22e51 100644 --- a/apps/webapp/app/v3/services/testTask.server.ts +++ b/apps/webapp/app/v3/services/testTask.server.ts @@ -11,28 +11,35 @@ export class TestTaskService extends BaseService { switch (triggerSource) { case "STANDARD": { - const result = await triggerTaskService.call(data.taskIdentifier, environment, { - payload: data.payload, - options: { - test: true, - metadata: data.metadata, - delay: data.delaySeconds ? new Date(Date.now() + data.delaySeconds * 1000) : undefined, - ttl: data.ttlSeconds, - idempotencyKey: data.idempotencyKey, - idempotencyKeyTTL: data.idempotencyKeyTTLSeconds - ? `${data.idempotencyKeyTTLSeconds}s` - : undefined, - queue: data.queue ? { name: data.queue } : undefined, - concurrencyKey: data.concurrencyKey, - maxAttempts: data.maxAttempts, - maxDuration: data.maxDurationSeconds, - tags: data.tags, - machine: data.machine, - region: data.region, - lockToVersion: data.version === "latest" ? undefined : data.version, - priority: data.prioritySeconds, + const result = await triggerTaskService.call( + data.taskIdentifier, + environment, + { + payload: data.payload, + options: { + test: true, + metadata: data.metadata, + delay: data.delaySeconds + ? new Date(Date.now() + data.delaySeconds * 1000) + : undefined, + ttl: data.ttlSeconds, + idempotencyKey: data.idempotencyKey, + idempotencyKeyTTL: data.idempotencyKeyTTLSeconds + ? `${data.idempotencyKeyTTLSeconds}s` + : undefined, + queue: data.queue ? { name: data.queue } : undefined, + concurrencyKey: data.concurrencyKey, + maxAttempts: data.maxAttempts, + maxDuration: data.maxDurationSeconds, + tags: data.tags, + machine: data.machine, + region: data.region, + lockToVersion: data.version === "latest" ? undefined : data.version, + priority: data.prioritySeconds, + }, }, - }); + { triggerSource: "dashboard", triggerAction: "test" } + ); return result?.run; } @@ -72,7 +79,7 @@ export class TestTaskService extends BaseService { priority: data.prioritySeconds, }, }, - { customIcon: "scheduled" } + { customIcon: "scheduled", triggerSource: "dashboard", triggerAction: "test" } ); return result?.run; diff --git a/apps/webapp/app/v3/services/triggerTask.server.ts b/apps/webapp/app/v3/services/triggerTask.server.ts index 2ed34f0342c..000633fb73f 100644 --- a/apps/webapp/app/v3/services/triggerTask.server.ts +++ b/apps/webapp/app/v3/services/triggerTask.server.ts @@ -33,6 +33,8 @@ export type TriggerTaskServiceOptions = { replayedFromTaskRunFriendlyId?: string; planType?: string; realtimeStreamsVersion?: "v1" | "v2"; + triggerSource?: string; + triggerAction?: string; }; export class OutOfEntitlementError extends Error { diff --git a/internal-packages/database/prisma/migrations/20260323085243_add_task_run_annotations/migration.sql b/internal-packages/database/prisma/migrations/20260323085243_add_task_run_annotations/migration.sql new file mode 100644 index 00000000000..4a5abdc464c --- /dev/null +++ b/internal-packages/database/prisma/migrations/20260323085243_add_task_run_annotations/migration.sql @@ -0,0 +1,2 @@ +-- AlterTable +ALTER TABLE "public"."TaskRun" ADD COLUMN "annotations" JSONB; diff --git a/internal-packages/database/prisma/schema.prisma b/internal-packages/database/prisma/schema.prisma index 677a4ab4136..5ebc78508b9 100644 --- a/internal-packages/database/prisma/schema.prisma +++ b/internal-packages/database/prisma/schema.prisma @@ -837,6 +837,9 @@ model TaskRun { metadataType String @default("application/json") metadataVersion Int @default(1) + /// Structured annotations: triggerSource, triggerAction, rootTriggerSource, rootScheduleId + annotations Json? + /// Run output output String? outputType String @default("application/json") diff --git a/internal-packages/run-engine/src/engine/index.ts b/internal-packages/run-engine/src/engine/index.ts index 6cf80bd46ca..e459dd85697 100644 --- a/internal-packages/run-engine/src/engine/index.ts +++ b/internal-packages/run-engine/src/engine/index.ts @@ -495,6 +495,7 @@ export class RunEngine { planType, realtimeStreamsVersion, debounce, + annotations, onDebounced, }: TriggerParams, tx?: PrismaClientOrTransaction @@ -668,6 +669,7 @@ export class RunEngine { createdAt: new Date(), } : undefined, + annotations, executionSnapshots: { create: { engine: "V2", diff --git a/internal-packages/run-engine/src/engine/types.ts b/internal-packages/run-engine/src/engine/types.ts index 9af808754ce..f30a0fb3796 100644 --- a/internal-packages/run-engine/src/engine/types.ts +++ b/internal-packages/run-engine/src/engine/types.ts @@ -222,6 +222,12 @@ export type TriggerParams = { mode?: "leading" | "trailing"; maxDelay?: string; }; + annotations?: { + triggerSource: string; + triggerAction: string; + rootTriggerSource: string; + rootScheduleId?: string; + }; /** * Called when a run is debounced (existing delayed run found with triggerAndWait). * Return spanIdToComplete to enable span closing when the run completes. diff --git a/packages/cli-v3/src/apiClient.ts b/packages/cli-v3/src/apiClient.ts index 693b48d992e..a22d0ca6151 100644 --- a/packages/cli-v3/src/apiClient.ts +++ b/packages/cli-v3/src/apiClient.ts @@ -60,16 +60,19 @@ import { VERSION } from "./version.js"; export class CliApiClient { private engineURL: string; + private source: "cli" | "mcp"; constructor( public readonly apiURL: string, // TODO: consider making this required public readonly accessToken?: string, - public readonly branch?: string + public readonly branch?: string, + options?: { source?: "cli" | "mcp" } ) { this.apiURL = apiURL.replace(/\/$/, ""); this.engineURL = this.apiURL; this.branch = branch; + this.source = options?.source ?? "cli"; } async createAuthorizationCode() { @@ -819,6 +822,7 @@ export class CliApiClient { const headers: Record = { Authorization: `Bearer ${this.accessToken}`, "Content-Type": "application/json", + "x-trigger-source": this.source, }; if (this.branch) { diff --git a/packages/cli-v3/src/mcp/context.ts b/packages/cli-v3/src/mcp/context.ts index 886935db181..30a28857b89 100644 --- a/packages/cli-v3/src/mcp/context.ts +++ b/packages/cli-v3/src/mcp/context.ts @@ -109,7 +109,9 @@ export class McpContext { ); } - return new ApiClient(cliApiClient.apiURL, jwt.data.token); + return new ApiClient(cliApiClient.apiURL, jwt.data.token, undefined, { + additionalHeaders: { "x-trigger-source": "mcp" }, + }); } public async getCwd() { diff --git a/packages/core/src/v3/apiClient/core.ts b/packages/core/src/v3/apiClient/core.ts index 1f3dca03feb..5541bb2a9a1 100644 --- a/packages/core/src/v3/apiClient/core.ts +++ b/packages/core/src/v3/apiClient/core.ts @@ -40,7 +40,9 @@ export type ZodFetchOptions = { export type AnyZodFetchOptions = ZodFetchOptions; -export type ApiRequestOptions = Pick; +export type ApiRequestOptions = Pick & { + additionalHeaders?: Record; +}; type KeysEnum = { [P in keyof Required]: true }; @@ -49,6 +51,7 @@ type KeysEnum = { [P in keyof Required]: true }; // compiler such that any missing / extraneous keys will cause an error. const requestOptionsKeys: KeysEnum = { retry: true, + additionalHeaders: true, }; export const isRequestOptions = (obj: unknown): obj is ApiRequestOptions => { diff --git a/packages/core/src/v3/apiClient/index.ts b/packages/core/src/v3/apiClient/index.ts index bc94ff38fd8..77471907287 100644 --- a/packages/core/src/v3/apiClient/index.ts +++ b/packages/core/src/v3/apiClient/index.ts @@ -189,6 +189,7 @@ export class ApiClient { public readonly accessToken: string; public readonly previewBranch?: string; public readonly futureFlags: ApiClientFutureFlags; + private readonly additionalHeaders?: Record; private readonly defaultRequestOptions: ZodFetchOptions; constructor( @@ -201,7 +202,9 @@ export class ApiClient { this.accessToken = accessToken; this.baseUrl = baseUrl.replace(/\/$/, ""); this.previewBranch = previewBranch; - this.defaultRequestOptions = mergeRequestOptions(DEFAULT_ZOD_FETCH_OPTIONS, requestOptions); + const { additionalHeaders, ...restRequestOptions } = requestOptions; + this.additionalHeaders = additionalHeaders; + this.defaultRequestOptions = mergeRequestOptions(DEFAULT_ZOD_FETCH_OPTIONS, restRequestOptions); this.futureFlags = futureFlags; } @@ -1540,6 +1543,14 @@ export class ApiClient { ), }; + if (this.additionalHeaders) { + for (const [key, value] of Object.entries(this.additionalHeaders)) { + if (!(key in headers)) { + headers[key] = value; + } + } + } + if (this.previewBranch) { headers["x-trigger-branch"] = this.previewBranch; } diff --git a/packages/core/src/v3/schemas/api.ts b/packages/core/src/v3/schemas/api.ts index bb6623b3f3c..152005eb001 100644 --- a/packages/core/src/v3/schemas/api.ts +++ b/packages/core/src/v3/schemas/api.ts @@ -540,6 +540,25 @@ export const DeploymentTriggeredVia = z export type DeploymentTriggeredVia = z.infer; +export const TriggerSource = z + .enum(["sdk", "api", "dashboard", "cli", "mcp", "schedule"]) + .or(anyString); + +export type TriggerSource = z.infer; + +export const TriggerAction = z.enum(["trigger", "replay", "test"]).or(anyString); + +export type TriggerAction = z.infer; + +export const RunAnnotations = z.object({ + triggerSource: TriggerSource, + triggerAction: TriggerAction, + rootTriggerSource: TriggerSource, + rootScheduleId: z.string().optional(), +}); + +export type RunAnnotations = z.infer; + export const UpsertBranchRequestBody = z.object({ git: GitMeta.optional(), env: z.enum(["preview"]), From 7b9d2bb8b8247af89d5294beba9b9dfabecd20bc Mon Sep 17 00:00:00 2001 From: Saadi Myftija Date: Mon, 23 Mar 2026 12:00:18 +0100 Subject: [PATCH 2/7] Add changeset --- .changeset/tame-oranges-change.md | 8 ++++++++ 1 file changed, 8 insertions(+) create mode 100644 .changeset/tame-oranges-change.md diff --git a/.changeset/tame-oranges-change.md b/.changeset/tame-oranges-change.md new file mode 100644 index 00000000000..9755a41a26a --- /dev/null +++ b/.changeset/tame-oranges-change.md @@ -0,0 +1,8 @@ +--- +"@trigger.dev/redis-worker": patch +"@trigger.dev/sdk": patch +"trigger.dev": patch +"@trigger.dev/core": patch +--- + +Adapted the CLI API client to propagate the trigger source via http headers. From 4150bc5a58ea34d4ac858b5e7be09b25789b762a Mon Sep 17 00:00:00 2001 From: Saadi Myftija Date: Mon, 23 Mar 2026 13:03:33 +0100 Subject: [PATCH 3/7] Fix trigger source for batch triggers --- apps/webapp/app/v3/services/batchTriggerV3.server.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/webapp/app/v3/services/batchTriggerV3.server.ts b/apps/webapp/app/v3/services/batchTriggerV3.server.ts index 686635406a1..6ccb7c69b23 100644 --- a/apps/webapp/app/v3/services/batchTriggerV3.server.ts +++ b/apps/webapp/app/v3/services/batchTriggerV3.server.ts @@ -855,7 +855,7 @@ export class BatchTriggerV3Service extends BaseService { skipChecks: true, runFriendlyId: task.runId, realtimeStreamsVersion: options?.realtimeStreamsVersion, - triggerSource: task.item.options?.parentRunId + triggerSource: batch.dependentTaskAttemptId ? "sdk" : options?.triggerSource ?? "api", triggerAction: options?.triggerAction ?? "trigger", From 85d456e71315914cf8fd693e1547a15af0bb8a81 Mon Sep 17 00:00:00 2001 From: Saadi Myftija Date: Mon, 23 Mar 2026 14:12:45 +0100 Subject: [PATCH 4/7] Add a couple tests --- .../src/engine/tests/trigger.test.ts | 199 ++++++++++++++++++ 1 file changed, 199 insertions(+) diff --git a/internal-packages/run-engine/src/engine/tests/trigger.test.ts b/internal-packages/run-engine/src/engine/tests/trigger.test.ts index 11200ab5cd7..a4f500e72aa 100644 --- a/internal-packages/run-engine/src/engine/tests/trigger.test.ts +++ b/internal-packages/run-engine/src/engine/tests/trigger.test.ts @@ -328,4 +328,203 @@ describe("RunEngine trigger()", () => { await engine.quit(); } }); + + containerTest("Annotations are stored on the run", async ({ prisma, redisOptions }) => { + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + + const engine = new RunEngine({ + prisma, + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 100, + }, + queue: { + redis: redisOptions, + masterQueueConsumersDisabled: true, + processWorkerQueueDebounceMs: 50, + }, + runLock: { redis: redisOptions }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { name: "small-1x" as const, cpu: 0.5, memory: 0.5, centsPerMs: 0.0001 }, + }, + baseCostInCents: 0.0001, + }, + tracer: trace.getTracer("test", "0.0.0"), + }); + + try { + const taskIdentifier = "test-task"; + await setupBackgroundWorker(engine, authenticatedEnvironment, taskIdentifier); + + const run = await engine.trigger( + { + number: 1, + friendlyId: "run_ann1234", + environment: authenticatedEnvironment, + taskIdentifier, + payload: "{}", + payloadType: "application/json", + context: {}, + traceContext: {}, + traceId: "t12345", + spanId: "s12345", + workerQueue: "main", + queue: `task/${taskIdentifier}`, + isTest: false, + tags: [], + annotations: { + triggerSource: "schedule", + triggerAction: "trigger", + rootTriggerSource: "schedule", + rootScheduleId: "sched_abc123", + }, + }, + prisma + ); + + const runFromDb = await prisma.taskRun.findUnique({ + where: { id: run.id }, + }); + + expect(runFromDb).toBeDefined(); + expect(runFromDb?.annotations).toEqual({ + triggerSource: "schedule", + triggerAction: "trigger", + rootTriggerSource: "schedule", + rootScheduleId: "sched_abc123", + }); + } finally { + await engine.quit(); + } + }); + + containerTest( + "Annotations propagation pattern (parent → child)", + async ({ prisma, redisOptions }) => { + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + + const engine = new RunEngine({ + prisma, + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 100, + }, + queue: { + redis: redisOptions, + masterQueueConsumersDisabled: true, + processWorkerQueueDebounceMs: 50, + }, + runLock: { redis: redisOptions }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + }, + baseCostInCents: 0.0001, + }, + tracer: trace.getTracer("test", "0.0.0"), + }); + + try { + const parentTask = "parent-task"; + const childTask = "child-task"; + await setupBackgroundWorker(engine, authenticatedEnvironment, [parentTask, childTask]); + + // Trigger parent with schedule annotations + const parentRun = await engine.trigger( + { + number: 1, + friendlyId: "run_p1234ann", + environment: authenticatedEnvironment, + taskIdentifier: parentTask, + payload: "{}", + payloadType: "application/json", + context: {}, + traceContext: {}, + traceId: "t12345", + spanId: "s12345", + workerQueue: "main", + queue: `task/${parentTask}`, + isTest: false, + tags: [], + annotations: { + triggerSource: "schedule", + triggerAction: "trigger", + rootTriggerSource: "schedule", + rootScheduleId: "sched_abc123", + }, + }, + prisma + ); + + // Trigger child — simulating what RunEngineTriggerTaskService builds: + // triggerSource is "sdk" (child triggered from within parent), + // but rootTriggerSource and rootScheduleId are propagated from parent + const childRun = await engine.trigger( + { + number: 2, + friendlyId: "run_c1234ann", + environment: authenticatedEnvironment, + taskIdentifier: childTask, + payload: "{}", + payloadType: "application/json", + context: {}, + traceContext: {}, + traceId: "t12345", + spanId: "s12346", + workerQueue: "main", + queue: `task/${childTask}`, + isTest: false, + tags: [], + parentTaskRunId: parentRun.id, + resumeParentOnCompletion: true, + annotations: { + triggerSource: "sdk", + triggerAction: "trigger", + rootTriggerSource: "schedule", + rootScheduleId: "sched_abc123", + }, + }, + prisma + ); + + const parentFromDb = await prisma.taskRun.findUnique({ + where: { id: parentRun.id }, + }); + const childFromDb = await prisma.taskRun.findUnique({ + where: { id: childRun.id }, + }); + + // Parent: schedule-triggered + expect(parentFromDb?.annotations).toEqual({ + triggerSource: "schedule", + triggerAction: "trigger", + rootTriggerSource: "schedule", + rootScheduleId: "sched_abc123", + }); + + // Child: sdk-triggered but root is still schedule + expect(childFromDb?.annotations).toEqual({ + triggerSource: "sdk", + triggerAction: "trigger", + rootTriggerSource: "schedule", + rootScheduleId: "sched_abc123", + }); + } finally { + await engine.quit(); + } + } + ); + }); From 032bfae916050ea985b125dc42a1f192b95c437b Mon Sep 17 00:00:00 2001 From: Saadi Myftija Date: Mon, 23 Mar 2026 14:27:41 +0100 Subject: [PATCH 5/7] Detect sdk source more reliably --- apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts | 4 +--- apps/webapp/app/routes/api.v1.tasks.batch.ts | 2 +- apps/webapp/app/routes/api.v2.tasks.batch.ts | 2 +- apps/webapp/app/runEngine/services/batchTrigger.server.ts | 2 +- apps/webapp/app/v3/services/batchTriggerV3.server.ts | 4 +--- packages/core/src/v3/apiClient/index.ts | 4 ++++ 6 files changed, 9 insertions(+), 9 deletions(-) diff --git a/apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts b/apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts index 3d1cfd969d0..9bebab8c179 100644 --- a/apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts +++ b/apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts @@ -121,9 +121,7 @@ const { action, loader } = createActionApiRoute( realtimeStreamsVersion: determineRealtimeStreamsVersion( realtimeStreamsVersion ?? undefined ), - triggerSource: body.options?.parentRunId - ? "sdk" - : triggerSourceHeader ?? "api", + triggerSource: isFromWorker ? "sdk" : triggerSourceHeader ?? "api", triggerAction: "trigger", }, engineVersion ?? undefined diff --git a/apps/webapp/app/routes/api.v1.tasks.batch.ts b/apps/webapp/app/routes/api.v1.tasks.batch.ts index 99ab11cab22..77584c7eb8b 100644 --- a/apps/webapp/app/routes/api.v1.tasks.batch.ts +++ b/apps/webapp/app/routes/api.v1.tasks.batch.ts @@ -114,7 +114,7 @@ const { action, loader } = createActionApiRoute( realtimeStreamsVersion: determineRealtimeStreamsVersion( realtimeStreamsVersion ?? undefined ), - triggerSource: triggerSourceHeader ?? undefined, + triggerSource: isFromWorker ? "sdk" : triggerSourceHeader ?? undefined, triggerAction: "trigger", }); diff --git a/apps/webapp/app/routes/api.v2.tasks.batch.ts b/apps/webapp/app/routes/api.v2.tasks.batch.ts index ed3fd46ed09..d64307d605d 100644 --- a/apps/webapp/app/routes/api.v2.tasks.batch.ts +++ b/apps/webapp/app/routes/api.v2.tasks.batch.ts @@ -128,7 +128,7 @@ const { action, loader } = createActionApiRoute( realtimeStreamsVersion: determineRealtimeStreamsVersion( realtimeStreamsVersion ?? undefined ), - triggerSource: triggerSourceHeader ?? undefined, + triggerSource: isFromWorker ? "sdk" : triggerSourceHeader ?? undefined, triggerAction: "trigger", }); diff --git a/apps/webapp/app/runEngine/services/batchTrigger.server.ts b/apps/webapp/app/runEngine/services/batchTrigger.server.ts index c46880497b0..98aaaf60ae1 100644 --- a/apps/webapp/app/runEngine/services/batchTrigger.server.ts +++ b/apps/webapp/app/runEngine/services/batchTrigger.server.ts @@ -680,7 +680,7 @@ export class RunEngineBatchTriggerService extends WithRunEngine { batchId: batch.id, batchIndex: currentIndex, realtimeStreamsVersion: options?.realtimeStreamsVersion, - triggerSource: parentRunId ? "sdk" : options?.triggerSource ?? "api", + triggerSource: options?.triggerSource ?? "api", triggerAction: options?.triggerAction ?? "trigger", }, "V2" diff --git a/apps/webapp/app/v3/services/batchTriggerV3.server.ts b/apps/webapp/app/v3/services/batchTriggerV3.server.ts index 6ccb7c69b23..2ed8a38b3a1 100644 --- a/apps/webapp/app/v3/services/batchTriggerV3.server.ts +++ b/apps/webapp/app/v3/services/batchTriggerV3.server.ts @@ -855,9 +855,7 @@ export class BatchTriggerV3Service extends BaseService { skipChecks: true, runFriendlyId: task.runId, realtimeStreamsVersion: options?.realtimeStreamsVersion, - triggerSource: batch.dependentTaskAttemptId - ? "sdk" - : options?.triggerSource ?? "api", + triggerSource: options?.triggerSource ?? "api", triggerAction: options?.triggerAction ?? "trigger", } ); diff --git a/packages/core/src/v3/apiClient/index.ts b/packages/core/src/v3/apiClient/index.ts index 77471907287..6a0e4d04e05 100644 --- a/packages/core/src/v3/apiClient/index.ts +++ b/packages/core/src/v3/apiClient/index.ts @@ -1551,6 +1551,10 @@ export class ApiClient { } } + if (!headers["x-trigger-source"]) { + headers["x-trigger-source"] = "sdk"; + } + if (this.previewBranch) { headers["x-trigger-branch"] = this.previewBranch; } From 1fdc5e3e25e90bedc9d6af64522b330d75035b24 Mon Sep 17 00:00:00 2001 From: Saadi Myftija Date: Mon, 23 Mar 2026 14:52:30 +0100 Subject: [PATCH 6/7] Sanitize trigger source --- .../webapp/app/routes/api.v1.tasks.$taskId.trigger.ts | 11 ++++++++++- apps/webapp/app/routes/api.v1.tasks.batch.ts | 4 ++-- apps/webapp/app/routes/api.v2.tasks.batch.ts | 4 ++-- 3 files changed, 14 insertions(+), 5 deletions(-) diff --git a/apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts b/apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts index 9bebab8c179..dd39f185cf8 100644 --- a/apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts +++ b/apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts @@ -22,6 +22,15 @@ import { import { ServiceValidationError } from "~/v3/services/baseService.server"; import { OutOfEntitlementError, TriggerTaskService } from "~/v3/services/triggerTask.server"; +const ALLOWED_TRIGGER_SOURCES = new Set(["sdk", "cli", "mcp"]); + +export function sanitizeTriggerSource(value: string | null | undefined): string | undefined { + if (value && ALLOWED_TRIGGER_SOURCES.has(value)) { + return value; + } + return undefined; +} + const ParamsSchema = z.object({ taskId: z.string(), }); @@ -121,7 +130,7 @@ const { action, loader } = createActionApiRoute( realtimeStreamsVersion: determineRealtimeStreamsVersion( realtimeStreamsVersion ?? undefined ), - triggerSource: isFromWorker ? "sdk" : triggerSourceHeader ?? "api", + triggerSource: isFromWorker ? "sdk" : sanitizeTriggerSource(triggerSourceHeader) ?? "api", triggerAction: "trigger", }, engineVersion ?? undefined diff --git a/apps/webapp/app/routes/api.v1.tasks.batch.ts b/apps/webapp/app/routes/api.v1.tasks.batch.ts index 77584c7eb8b..9c21753ce5c 100644 --- a/apps/webapp/app/routes/api.v1.tasks.batch.ts +++ b/apps/webapp/app/routes/api.v1.tasks.batch.ts @@ -15,7 +15,7 @@ import { BatchTriggerV3Service, } from "~/v3/services/batchTriggerV3.server"; import { OutOfEntitlementError } from "~/v3/services/triggerTask.server"; -import { HeadersSchema } from "./api.v1.tasks.$taskId.trigger"; +import { HeadersSchema, sanitizeTriggerSource } from "./api.v1.tasks.$taskId.trigger"; import { determineRealtimeStreamsVersion } from "~/services/realtime/v1StreamsGlobal.server"; import { extractJwtSigningSecretKey } from "~/services/realtime/jwtAuth.server"; @@ -114,7 +114,7 @@ const { action, loader } = createActionApiRoute( realtimeStreamsVersion: determineRealtimeStreamsVersion( realtimeStreamsVersion ?? undefined ), - triggerSource: isFromWorker ? "sdk" : triggerSourceHeader ?? undefined, + triggerSource: isFromWorker ? "sdk" : sanitizeTriggerSource(triggerSourceHeader), triggerAction: "trigger", }); diff --git a/apps/webapp/app/routes/api.v2.tasks.batch.ts b/apps/webapp/app/routes/api.v2.tasks.batch.ts index d64307d605d..0453395e2b0 100644 --- a/apps/webapp/app/routes/api.v2.tasks.batch.ts +++ b/apps/webapp/app/routes/api.v2.tasks.batch.ts @@ -17,7 +17,7 @@ import { import { ServiceValidationError } from "~/v3/services/baseService.server"; import { BatchProcessingStrategy } from "~/v3/services/batchTriggerV3.server"; import { OutOfEntitlementError } from "~/v3/services/triggerTask.server"; -import { HeadersSchema } from "./api.v1.tasks.$taskId.trigger"; +import { HeadersSchema, sanitizeTriggerSource } from "./api.v1.tasks.$taskId.trigger"; import { determineRealtimeStreamsVersion } from "~/services/realtime/v1StreamsGlobal.server"; import { extractJwtSigningSecretKey } from "~/services/realtime/jwtAuth.server"; @@ -128,7 +128,7 @@ const { action, loader } = createActionApiRoute( realtimeStreamsVersion: determineRealtimeStreamsVersion( realtimeStreamsVersion ?? undefined ), - triggerSource: isFromWorker ? "sdk" : triggerSourceHeader ?? undefined, + triggerSource: isFromWorker ? "sdk" : sanitizeTriggerSource(triggerSourceHeader), triggerAction: "trigger", }); From 1020f0af0e5410fef1bf7521793e69e9ed27f0fb Mon Sep 17 00:00:00 2001 From: Saadi Myftija Date: Mon, 23 Mar 2026 15:17:48 +0100 Subject: [PATCH 7/7] Address devin review comments --- apps/webapp/app/routes/api.v1.runs.$runParam.replay.ts | 6 +++++- apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts | 10 +--------- apps/webapp/app/routes/api.v1.tasks.batch.ts | 5 +++-- apps/webapp/app/routes/api.v2.tasks.batch.ts | 5 +++-- apps/webapp/app/routes/api.v3.batches.ts | 3 +++ .../app/runEngine/services/createBatch.server.ts | 2 ++ apps/webapp/app/utils/triggerSource.ts | 9 +++++++++ apps/webapp/app/v3/runEngineHandlers.server.ts | 2 +- internal-packages/run-engine/src/batch-queue/index.ts | 1 + internal-packages/run-engine/src/batch-queue/types.ts | 4 ++++ 10 files changed, 32 insertions(+), 15 deletions(-) create mode 100644 apps/webapp/app/utils/triggerSource.ts diff --git a/apps/webapp/app/routes/api.v1.runs.$runParam.replay.ts b/apps/webapp/app/routes/api.v1.runs.$runParam.replay.ts index 3c65cc963da..72ad202467d 100644 --- a/apps/webapp/app/routes/api.v1.runs.$runParam.replay.ts +++ b/apps/webapp/app/routes/api.v1.runs.$runParam.replay.ts @@ -5,6 +5,7 @@ import { prisma } from "~/db.server"; import { authenticateApiRequest } from "~/services/apiAuth.server"; import { logger } from "~/services/logger.server"; import { ReplayTaskRunService } from "~/v3/services/replayTaskRun.server"; +import { sanitizeTriggerSource } from "~/utils/triggerSource"; const ParamsSchema = z.object({ /* This is the run friendly ID */ @@ -41,8 +42,11 @@ export async function action({ request, params }: ActionFunctionArgs) { return json({ error: "Run not found" }, { status: 404 }); } + const triggerSource = + sanitizeTriggerSource(request.headers.get("x-trigger-source")) ?? "api"; + const service = new ReplayTaskRunService(); - const newRun = await service.call(taskRun, { triggerSource: "api" }); + const newRun = await service.call(taskRun, { triggerSource }); if (!newRun) { return json({ error: "Failed to create new run" }, { status: 400 }); diff --git a/apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts b/apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts index dd39f185cf8..5811fc67709 100644 --- a/apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts +++ b/apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts @@ -19,18 +19,10 @@ import { handleRequestIdempotency, saveRequestIdempotency, } from "~/utils/requestIdempotency.server"; +import { sanitizeTriggerSource } from "~/utils/triggerSource"; import { ServiceValidationError } from "~/v3/services/baseService.server"; import { OutOfEntitlementError, TriggerTaskService } from "~/v3/services/triggerTask.server"; -const ALLOWED_TRIGGER_SOURCES = new Set(["sdk", "cli", "mcp"]); - -export function sanitizeTriggerSource(value: string | null | undefined): string | undefined { - if (value && ALLOWED_TRIGGER_SOURCES.has(value)) { - return value; - } - return undefined; -} - const ParamsSchema = z.object({ taskId: z.string(), }); diff --git a/apps/webapp/app/routes/api.v1.tasks.batch.ts b/apps/webapp/app/routes/api.v1.tasks.batch.ts index 9c21753ce5c..e6ada1a739c 100644 --- a/apps/webapp/app/routes/api.v1.tasks.batch.ts +++ b/apps/webapp/app/routes/api.v1.tasks.batch.ts @@ -15,7 +15,8 @@ import { BatchTriggerV3Service, } from "~/v3/services/batchTriggerV3.server"; import { OutOfEntitlementError } from "~/v3/services/triggerTask.server"; -import { HeadersSchema, sanitizeTriggerSource } from "./api.v1.tasks.$taskId.trigger"; +import { sanitizeTriggerSource } from "~/utils/triggerSource"; +import { HeadersSchema } from "./api.v1.tasks.$taskId.trigger"; import { determineRealtimeStreamsVersion } from "~/services/realtime/v1StreamsGlobal.server"; import { extractJwtSigningSecretKey } from "~/services/realtime/jwtAuth.server"; @@ -114,7 +115,7 @@ const { action, loader } = createActionApiRoute( realtimeStreamsVersion: determineRealtimeStreamsVersion( realtimeStreamsVersion ?? undefined ), - triggerSource: isFromWorker ? "sdk" : sanitizeTriggerSource(triggerSourceHeader), + triggerSource: isFromWorker ? "sdk" : sanitizeTriggerSource(triggerSourceHeader) ?? "api", triggerAction: "trigger", }); diff --git a/apps/webapp/app/routes/api.v2.tasks.batch.ts b/apps/webapp/app/routes/api.v2.tasks.batch.ts index 0453395e2b0..8db98b4d343 100644 --- a/apps/webapp/app/routes/api.v2.tasks.batch.ts +++ b/apps/webapp/app/routes/api.v2.tasks.batch.ts @@ -17,7 +17,8 @@ import { import { ServiceValidationError } from "~/v3/services/baseService.server"; import { BatchProcessingStrategy } from "~/v3/services/batchTriggerV3.server"; import { OutOfEntitlementError } from "~/v3/services/triggerTask.server"; -import { HeadersSchema, sanitizeTriggerSource } from "./api.v1.tasks.$taskId.trigger"; +import { sanitizeTriggerSource } from "~/utils/triggerSource"; +import { HeadersSchema } from "./api.v1.tasks.$taskId.trigger"; import { determineRealtimeStreamsVersion } from "~/services/realtime/v1StreamsGlobal.server"; import { extractJwtSigningSecretKey } from "~/services/realtime/jwtAuth.server"; @@ -128,7 +129,7 @@ const { action, loader } = createActionApiRoute( realtimeStreamsVersion: determineRealtimeStreamsVersion( realtimeStreamsVersion ?? undefined ), - triggerSource: isFromWorker ? "sdk" : sanitizeTriggerSource(triggerSourceHeader), + triggerSource: isFromWorker ? "sdk" : sanitizeTriggerSource(triggerSourceHeader) ?? "api", triggerAction: "trigger", }); diff --git a/apps/webapp/app/routes/api.v3.batches.ts b/apps/webapp/app/routes/api.v3.batches.ts index 070e6d9f80c..5067eaef06e 100644 --- a/apps/webapp/app/routes/api.v3.batches.ts +++ b/apps/webapp/app/routes/api.v3.batches.ts @@ -13,6 +13,7 @@ import { } from "~/utils/requestIdempotency.server"; import { ServiceValidationError } from "~/v3/services/baseService.server"; import { OutOfEntitlementError } from "~/v3/services/triggerTask.server"; +import { sanitizeTriggerSource } from "~/utils/triggerSource"; import { HeadersSchema } from "./api.v1.tasks.$taskId.trigger"; import { determineRealtimeStreamsVersion } from "~/services/realtime/v1StreamsGlobal.server"; import { extractJwtSigningSecretKey } from "~/services/realtime/jwtAuth.server"; @@ -65,6 +66,7 @@ const { action, loader } = createActionApiRoute( "x-trigger-worker": isFromWorker, "x-trigger-client": triggerClient, "x-trigger-realtime-streams-version": realtimeStreamsVersion, + "x-trigger-source": triggerSourceHeader, traceparent, tracestate, } = headers; @@ -132,6 +134,7 @@ const { action, loader } = createActionApiRoute( realtimeStreamsVersion: determineRealtimeStreamsVersion( realtimeStreamsVersion ?? undefined ), + triggerSource: isFromWorker ? "sdk" : sanitizeTriggerSource(triggerSourceHeader) ?? "api", }); const $responseHeaders = await responseHeaders( diff --git a/apps/webapp/app/runEngine/services/createBatch.server.ts b/apps/webapp/app/runEngine/services/createBatch.server.ts index a5d77ef349a..309a7700f1a 100644 --- a/apps/webapp/app/runEngine/services/createBatch.server.ts +++ b/apps/webapp/app/runEngine/services/createBatch.server.ts @@ -17,6 +17,7 @@ export type CreateBatchServiceOptions = { spanParentAsLink?: boolean; oneTimeUseToken?: string; realtimeStreamsVersion?: "v1" | "v2"; + triggerSource?: string; }; /** @@ -143,6 +144,7 @@ export class CreateBatchService extends WithRunEngine { idempotencyKey: body.idempotencyKey, processingConcurrency: config.processingConcurrency, planType, + triggerSource: options.triggerSource, }; await this._engine.initializeBatch(initOptions); diff --git a/apps/webapp/app/utils/triggerSource.ts b/apps/webapp/app/utils/triggerSource.ts new file mode 100644 index 00000000000..720ba4231ac --- /dev/null +++ b/apps/webapp/app/utils/triggerSource.ts @@ -0,0 +1,9 @@ +const ALLOWED_TRIGGER_SOURCES = new Set(["sdk", "cli", "mcp"]); + +/** Validates a client-provided trigger source header against the allowlist. */ +export function sanitizeTriggerSource(value: string | null | undefined): string | undefined { + if (value && ALLOWED_TRIGGER_SOURCES.has(value)) { + return value; + } + return undefined; +} diff --git a/apps/webapp/app/v3/runEngineHandlers.server.ts b/apps/webapp/app/v3/runEngineHandlers.server.ts index 19b135614a6..411f91ff75d 100644 --- a/apps/webapp/app/v3/runEngineHandlers.server.ts +++ b/apps/webapp/app/v3/runEngineHandlers.server.ts @@ -750,7 +750,7 @@ export function setupBatchQueueCallbacks() { batchIndex: itemIndex, realtimeStreamsVersion: meta.realtimeStreamsVersion, planType: meta.planType, - triggerSource: meta.parentRunId ? "sdk" : "api", + triggerSource: meta.parentRunId ? "sdk" : meta.triggerSource ?? "api", triggerAction: "trigger", }, "V2" diff --git a/internal-packages/run-engine/src/batch-queue/index.ts b/internal-packages/run-engine/src/batch-queue/index.ts index 22401f76b6e..96a827b53ce 100644 --- a/internal-packages/run-engine/src/batch-queue/index.ts +++ b/internal-packages/run-engine/src/batch-queue/index.ts @@ -296,6 +296,7 @@ export class BatchQueue { realtimeStreamsVersion: options.realtimeStreamsVersion, idempotencyKey: options.idempotencyKey, processingConcurrency: options.processingConcurrency, + triggerSource: options.triggerSource, }; // Store metadata in completion tracker diff --git a/internal-packages/run-engine/src/batch-queue/types.ts b/internal-packages/run-engine/src/batch-queue/types.ts index aa02d84717a..3695faf120e 100644 --- a/internal-packages/run-engine/src/batch-queue/types.ts +++ b/internal-packages/run-engine/src/batch-queue/types.ts @@ -79,6 +79,8 @@ export const BatchMeta = z.object({ processingConcurrency: z.number().optional(), /** Plan type for billing (e.g., "free", "paid") - used when skipChecks is enabled */ planType: z.string().optional(), + /** Trigger source for run annotations (e.g., "sdk", "cli", "mcp") */ + triggerSource: z.string().optional(), }); export type BatchMeta = z.infer; @@ -168,6 +170,8 @@ export type InitializeBatchOptions = { processingConcurrency?: number; /** Plan type for billing (e.g., "free", "paid") - used when skipChecks is enabled */ planType?: string; + /** Trigger source for run annotations (e.g., "sdk", "cli", "mcp") */ + triggerSource?: string; }; /**