Skip to content

Commit e801280

Browse files
committed
Add support for toUIMessageStream() options
1 parent 4f22fdc commit e801280

File tree

3 files changed

+128
-13
lines changed

3 files changed

+128
-13
lines changed

packages/trigger-sdk/src/v3/ai.ts

Lines changed: 103 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import {
1414
type TaskSchema,
1515
type TaskWithSchema,
1616
} from "@trigger.dev/core/v3";
17-
import type { ModelMessage, UIMessage, UIMessageChunk } from "ai";
17+
import type { ModelMessage, UIMessage, UIMessageChunk, UIMessageStreamOptions } from "ai";
1818
import type { StreamWriteResult } from "@trigger.dev/core/v3";
1919
import { convertToModelMessages, dynamicTool, generateId as generateMessageId, jsonSchema, JSONSchema7, Schema, Tool, ToolCallOptions, zodSchema } from "ai";
2020
import { type Attributes, trace } from "@opentelemetry/api";
@@ -399,6 +399,10 @@ const chatDeferKey = locals.create<Set<Promise<unknown>>>("chat.defer");
399399
*/
400400
const chatPipeCountKey = locals.create<number>("chat.pipeCount");
401401
const chatStopControllerKey = locals.create<AbortController>("chat.stopController");
402+
/** Static (task-level) UIMessageStream options, set once during chatTask setup. @internal */
403+
const chatUIStreamStaticKey = locals.create<ChatUIMessageStreamOptions>("chat.uiMessageStreamOptions.static");
404+
/** Per-turn UIMessageStream options, set via chat.setUIMessageStreamOptions(). @internal */
405+
const chatUIStreamPerTurnKey = locals.create<ChatUIMessageStreamOptions>("chat.uiMessageStreamOptions.perTurn");
402406

403407
/**
404408
* Options for `pipeChat`.
@@ -423,6 +427,23 @@ export type PipeChatOptions = {
423427
spanName?: string;
424428
};
425429

430+
/**
431+
* Options for customizing the `toUIMessageStream()` call used when piping
432+
* `streamText` results to the frontend.
433+
*
434+
* Set static defaults via `uiMessageStreamOptions` on `chat.task()`, or
435+
* override per-turn via `chat.setUIMessageStreamOptions()`.
436+
*
437+
* `onFinish`, `originalMessages`, and `generateMessageId` are omitted because
438+
* they are managed internally for response capture and message accumulation.
439+
* Use `streamText`'s `onFinish` for custom finish handling, or drop down to
440+
* raw task mode with `chat.pipe()` for full control.
441+
*/
442+
export type ChatUIMessageStreamOptions = Omit<
443+
UIMessageStreamOptions<UIMessage>,
444+
"onFinish" | "originalMessages" | "generateMessageId"
445+
>;
446+
426447
/**
427448
* An object with a `toUIMessageStream()` method (e.g. `StreamTextResult` from `streamText()`).
428449
*/
@@ -803,6 +824,35 @@ export type ChatTaskOptions<
803824
* @default Same as `turnTimeout`
804825
*/
805826
preloadTimeout?: string;
827+
828+
/**
829+
* Default options for `toUIMessageStream()` when auto-piping or using
830+
* `turn.complete()` / `chat.pipeAndCapture()`.
831+
*
832+
* Controls how the `StreamTextResult` is converted to a `UIMessageChunk`
833+
* stream — error handling, reasoning/source visibility, metadata, etc.
834+
*
835+
* Can be overridden per-turn by calling `chat.setUIMessageStreamOptions()`
836+
* inside `run()` or lifecycle hooks. Per-turn values are merged on top
837+
* of these defaults (per-turn wins on conflicts).
838+
*
839+
* `onFinish`, `originalMessages`, and `generateMessageId` are managed
840+
* internally and cannot be overridden here. Use `streamText`'s `onFinish`
841+
* for custom finish handling, or drop to raw task mode for full control.
842+
*
843+
* @example
844+
* ```ts
845+
* chat.task({
846+
* id: "my-chat",
847+
* uiMessageStreamOptions: {
848+
* sendReasoning: true,
849+
* onError: (error) => error instanceof Error ? error.message : "An error occurred.",
850+
* },
851+
* run: async ({ messages, signal }) => { ... },
852+
* });
853+
* ```
854+
*/
855+
uiMessageStreamOptions?: ChatUIMessageStreamOptions;
806856
};
807857

808858
/**
@@ -851,6 +901,7 @@ function chatTask<
851901
chatAccessTokenTTL = "1h",
852902
preloadWarmTimeoutInSeconds,
853903
preloadTimeout,
904+
uiMessageStreamOptions,
854905
...restOptions
855906
} = options;
856907

@@ -867,6 +918,11 @@ function chatTask<
867918
activeSpan.setAttribute("gen_ai.conversation.id", payload.chatId);
868919
}
869920

921+
// Store static UIMessageStream options in locals so resolveUIMessageStreamOptions() can read them
922+
if (uiMessageStreamOptions) {
923+
locals.set(chatUIStreamStaticKey, uiMessageStreamOptions);
924+
}
925+
870926
let currentWirePayload = payload;
871927
const continuation = payload.continuation ?? false;
872928
const previousRunId = payload.previousRunId;
@@ -1192,6 +1248,7 @@ function chatTask<
11921248
if ((locals.get(chatPipeCountKey) ?? 0) === 0 && isUIMessageStreamable(result)) {
11931249
onFinishAttached = true;
11941250
const uiStream = result.toUIMessageStream({
1251+
...resolveUIMessageStreamOptions(),
11951252
onFinish: ({ responseMessage }: { responseMessage: UIMessage }) => {
11961253
capturedResponseMessage = responseMessage;
11971254
resolveOnFinish!();
@@ -1447,6 +1504,48 @@ function setWarmTimeoutInSeconds(seconds: number): void {
14471504
metadata.set(WARM_TIMEOUT_METADATA_KEY, seconds);
14481505
}
14491506

1507+
/**
1508+
* Override the `toUIMessageStream()` options for the current turn.
1509+
*
1510+
* These options control how the `StreamTextResult` is converted to a
1511+
* `UIMessageChunk` stream — error handling, reasoning/source visibility,
1512+
* message metadata, etc.
1513+
*
1514+
* Per-turn options are merged on top of the static `uiMessageStreamOptions`
1515+
* set on `chat.task()`. Per-turn values win on conflicts.
1516+
*
1517+
* @example
1518+
* ```ts
1519+
* run: async ({ messages, signal }) => {
1520+
* chat.setUIMessageStreamOptions({
1521+
* sendReasoning: true,
1522+
* onError: (error) => error instanceof Error ? error.message : "An error occurred.",
1523+
* });
1524+
* return streamText({ model, messages, abortSignal: signal });
1525+
* }
1526+
* ```
1527+
*/
1528+
function setUIMessageStreamOptions(options: ChatUIMessageStreamOptions): void {
1529+
locals.set(chatUIStreamPerTurnKey, options);
1530+
}
1531+
1532+
/**
1533+
* Resolve the effective UIMessageStream options by merging:
1534+
* 1. Static task-level options (from `chat.task({ uiMessageStreamOptions })`)
1535+
* 2. Per-turn overrides (from `chat.setUIMessageStreamOptions()`)
1536+
*
1537+
* Per-turn values win on conflicts. Clears the per-turn override after reading
1538+
* so it doesn't leak into subsequent turns.
1539+
* @internal
1540+
*/
1541+
function resolveUIMessageStreamOptions(): ChatUIMessageStreamOptions {
1542+
const staticOptions = locals.get(chatUIStreamStaticKey) ?? {};
1543+
const perTurnOptions = locals.get(chatUIStreamPerTurnKey) ?? {};
1544+
// Clear per-turn override so it doesn't leak into subsequent turns
1545+
locals.set(chatUIStreamPerTurnKey, undefined);
1546+
return { ...staticOptions, ...perTurnOptions };
1547+
}
1548+
14501549
// ---------------------------------------------------------------------------
14511550
// Stop detection
14521551
// ---------------------------------------------------------------------------
@@ -1641,6 +1740,7 @@ async function pipeChatAndCapture(
16411740
const onFinishPromise = new Promise<void>((r) => { resolveOnFinish = r; });
16421741

16431742
const uiStream = source.toUIMessageStream({
1743+
...resolveUIMessageStreamOptions(),
16441744
onFinish: ({ responseMessage }: { responseMessage: UIMessage }) => {
16451745
captured = responseMessage;
16461746
resolveOnFinish!();
@@ -2180,6 +2280,8 @@ export const chat = {
21802280
setTurnTimeoutInSeconds,
21812281
/** Override the warm timeout at runtime. See {@link setWarmTimeoutInSeconds}. */
21822282
setWarmTimeoutInSeconds,
2283+
/** Override toUIMessageStream() options for the current turn. See {@link setUIMessageStreamOptions}. */
2284+
setUIMessageStreamOptions,
21832285
/** Check if the current turn was stopped by the user. See {@link isStopped}. */
21842286
isStopped,
21852287
/** Clean up aborted parts from a UIMessage. See {@link cleanupAbortedParts}. */

pnpm-lock.yaml

Lines changed: 12 additions & 11 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

references/ai-chat/src/trigger/chat.ts

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { chat, ai, type ChatTaskWirePayload } from "@trigger.dev/sdk/ai";
2-
import { schemaTask, task } from "@trigger.dev/sdk";
2+
import { logger, schemaTask, task } from "@trigger.dev/sdk";
33
import { streamText, tool, dynamicTool, stepCountIs, generateId } from "ai";
44
import type { LanguageModel, Tool as AITool, UIMessage } from "ai";
55
import { openai } from "@ai-sdk/openai";
@@ -231,6 +231,18 @@ export const aiChat = chat.task({
231231
clientDataSchema: z.object({ model: z.string().optional(), userId: z.string() }),
232232
warmTimeoutInSeconds: 60,
233233
chatAccessTokenTTL: "2h",
234+
uiMessageStreamOptions: {
235+
sendReasoning: true,
236+
onError: (error) => {
237+
// Log the full error server-side for debugging
238+
logger.error("Stream error", { error });
239+
// Return a sanitized message — this is what the frontend sees
240+
if (error instanceof Error && error.message.includes("rate limit")) {
241+
return "Rate limited — please wait a moment and try again.";
242+
}
243+
return "Something went wrong. Please try again.";
244+
},
245+
},
234246
onPreload: async ({ chatId, runId, chatAccessToken, clientData }) => {
235247
if (!clientData) return;
236248
// Eagerly initialize before the user's first message arrives

0 commit comments

Comments
 (0)