From bedf36658dd0d733554c264a8b7f78620db2fd11 Mon Sep 17 00:00:00 2001 From: Hendrik Liebau Date: Wed, 18 Feb 2026 17:25:26 -0800 Subject: [PATCH] [refactor] Replace runtime prefetch sentinel transform stream (#90160) Runtime prefetch responses previously encoded `isPartial` and `staleTime` using a sentinel-and-replace pattern: a random number was embedded in the RSC payload during prerendering, then a TransformStream scanned the prelude stream to find and replace it with the actual values. This was fragile because it relied on matching a specific byte sequence in a chunked stream that could be split at arbitrary boundaries. This replaces that mechanism with two simpler approaches: - **isPartial**: A single byte prepended to the response stream (`#` for complete, `~` for partial). - **staleTime**: An `AsyncIterable` in the Flight payload that yields updated stale time values as the prerender store changes. This ensures the latest value is always serialized in the stream, even when sync IO aborts the prerender before the abort callback runs. The stale time tracking logic is extracted into a new `stale-time.ts` module with `StaleTimeIterable`, `trackStaleTime`, and `finishStaleTimeTracking`. On the client, dynamic runtime prefetch responses (detected via the absence of `NEXT_IS_PRERENDER_HEADER`) have the byte stripped before Flight decoding, and the stale time is read by iterating the async iterable. Static responses served from cache are handled unchanged. --- .../client/components/segment-cache/cache.ts | 132 +++++++++-- .../next/src/server/app-render/app-render.tsx | 213 +++++------------- .../next/src/server/app-render/stale-time.ts | 104 +++++++++ .../next/src/shared/lib/app-router-types.ts | 4 +- 4 files changed, 270 insertions(+), 183 deletions(-) create mode 100644 packages/next/src/server/app-render/stale-time.ts diff --git a/packages/next/src/client/components/segment-cache/cache.ts b/packages/next/src/client/components/segment-cache/cache.ts index 384e006bd6129..85ed71855c431 100644 --- a/packages/next/src/client/components/segment-cache/cache.ts +++ b/packages/next/src/client/components/segment-cache/cache.ts @@ -14,6 +14,7 @@ import { import { NEXT_DID_POSTPONE_HEADER, NEXT_INSTANT_PREFETCH_HEADER, + NEXT_IS_PRERENDER_HEADER, NEXT_ROUTER_PREFETCH_HEADER, NEXT_ROUTER_SEGMENT_PREFETCH_HEADER, NEXT_ROUTER_STALE_TIME_HEADER, @@ -2012,13 +2013,28 @@ export async function fetchSegmentPrefetchesUsingDynamicRequest( // Track when the network connection closes. const closed = createPromiseWithResolvers() + let isResponsePartial = false + let responseBody = response.body + + // For dynamic runtime prefetches, strip the leading isPartial byte before + // passing the stream to Flight. Static responses (served from cache) don't + // have the byte. We detect them via NEXT_IS_PRERENDER_HEADER. + if ( + fetchStrategy === FetchStrategy.PPRRuntime && + !response.headers.get(NEXT_IS_PRERENDER_HEADER) + ) { + const stripped = await stripIsPartialByte(responseBody) + isResponsePartial = stripped.isPartial + responseBody = stripped.stream + } + let fulfilledEntries: Array | null = null const prefetchStream = createPrefetchResponseStream( - response.body, + responseBody, closed.resolve, function onResponseSizeUpdate(totalBytesReceivedSoFar) { // When processing a dynamic response, we don't know how large each - // individual segment is, so approximate by assiging each segment + // individual segment is, so approximate by assigning each segment // the average of the total response size. 
if (fulfilledEntries === null) { // Haven't received enough data yet to know which segments @@ -2038,13 +2054,6 @@ export async function fetchSegmentPrefetchesUsingDynamicRequest( { allowPartialStream: true } ) - const isResponsePartial = - fetchStrategy === FetchStrategy.PPRRuntime - ? // A runtime prefetch may have holes. - serverData.rp?.[0] === true - : // Full and LoadingBoundary prefetches cannot have holes. - // (even if we did set the prefetch header, we only use this codepath for non-PPR-enabled routes) - false // Read head vary params synchronously. Individual segments carry their // own thenables in CacheNodeSeedData. const headVaryParamsThenable = serverData.h @@ -2053,17 +2062,21 @@ export async function fetchSegmentPrefetchesUsingDynamicRequest( ? readVaryParams(headVaryParamsThenable) : null + const now = Date.now() + const staleAt = await getStaleAt(now, serverData, response) + // Aside from writing the data into the cache, this function also returns // the entries that were fulfilled, so we can streamingly update their sizes // in the LRU as more data comes in. fulfilledEntries = writeDynamicRenderResponseIntoCache( - Date.now(), + now, task, fetchStrategy, response as RSCResponse, serverData, isResponsePartial, headVaryParams, + staleAt, route, spawnedEntries ) @@ -2166,6 +2179,7 @@ function writeDynamicTreeResponseIntoCache( serverData, isResponsePartial, headVaryParams, + getStaleAtFromHeader(now, response), fulfilledEntry, null ) @@ -2197,6 +2211,7 @@ function writeDynamicRenderResponseIntoCache( serverData: NavigationFlightResponse, isResponsePartial: boolean, headVaryParams: VaryParams | null, + staleAt: number, route: FulfilledRouteCacheEntry, spawnedEntries: Map | null ): Array | null { @@ -2219,17 +2234,6 @@ function writeDynamicRenderResponseIntoCache( return null } - // For runtime prefetches, stale time is in the payload at rp[1]. - // For other responses, fall back to the header. - const staleTimeSeconds = - typeof serverData.rp?.[1] === 'number' - ? serverData.rp[1] - : parseInt(response.headers.get(NEXT_ROUTER_STALE_TIME_HEADER) ?? '', 10) - const staleTimeMs = !isNaN(staleTimeSeconds) - ? getStaleTimeMs(staleTimeSeconds) - : STATIC_STALETIME_MS - const staleAt = now + staleTimeMs - for (const flightData of flightDatas) { const seedData = flightData.seedData if (seedData !== null) { @@ -2590,3 +2594,89 @@ function addInstantPrefetchHeaderIfLocked( } } } + +function getStaleAtFromHeader( + now: number, + response: RSCResponse +): number { + const staleTimeSeconds = parseInt( + response.headers.get(NEXT_ROUTER_STALE_TIME_HEADER) ?? '', + 10 + ) + + const staleTimeMs = !isNaN(staleTimeSeconds) + ? getStaleTimeMs(staleTimeSeconds) + : STATIC_STALETIME_MS + + return now + staleTimeMs +} + +async function getStaleAt( + now: number, + serverData: NavigationFlightResponse, + response: RSCResponse +): Promise { + if (serverData.s !== undefined) { + // Iterate the async iterable and take the last yielded value. The server + // yields updated staleTime values during the render; the last one is the + // final staleTime. + let staleTimeSeconds: number | undefined + + // TODO: Buffer the response and then read the iterable values + // synchronously, similar to readVaryParams. This would avoid the need to + // make getStaleAt async, and we could also use it in + // writeDynamicTreeResponseIntoCache. This will also be needed when React + // starts leaving async iterables hanging when the outer RSC stream is + // aborted e.g. 
due to sync I/O (with unstable_allowPartialStream). + for await (const value of serverData.s) { + staleTimeSeconds = value + } + + if (staleTimeSeconds !== undefined) { + const staleTimeMs = isNaN(staleTimeSeconds) + ? STATIC_STALETIME_MS + : getStaleTimeMs(staleTimeSeconds) + + return now + staleTimeMs + } + } + + return getStaleAtFromHeader(now, response) +} + +/** + * Strips the leading isPartial byte from a runtime prefetch response stream. + * Returns the remaining stream and whether the response is partial. + */ +async function stripIsPartialByte( + stream: ReadableStream +): Promise<{ stream: ReadableStream; isPartial: boolean }> { + const reader = stream.getReader() + const { done, value } = await reader.read() + if (done || !value || value.byteLength === 0) { + return { + stream: new ReadableStream({ start: (c) => c.close() }), + isPartial: false, + } + } + const isPartial = value[0] === 0x7e // ASCII '~' + const remainder = value.byteLength > 1 ? value.subarray(1) : null + return { + isPartial, + stream: new ReadableStream({ + start(controller) { + if (remainder) { + controller.enqueue(remainder) + } + }, + async pull(controller) { + const result = await reader.read() + if (result.done) { + controller.close() + } else { + controller.enqueue(result.value) + } + }, + }), + } +} diff --git a/packages/next/src/server/app-render/app-render.tsx b/packages/next/src/server/app-render/app-render.tsx index e6967bda4bc1d..b758ad5df1a60 100644 --- a/packages/next/src/server/app-render/app-render.tsx +++ b/packages/next/src/server/app-render/app-render.tsx @@ -189,6 +189,12 @@ import { } from './vary-params' import { getTracedMetadata } from '../lib/trace/utils' import { InvariantError } from '../../shared/lib/invariant-error' +import { + StaleTimeIterable, + createSelectStaleTime, + trackStaleTime, + finishStaleTimeTracking, +} from './stale-time' import { HTML_CONTENT_TYPE_HEADER, INFINITE_CACHE } from '../../lib/constants' import { createComponentStylesAndScripts } from './create-component-styles-and-scripts' @@ -217,7 +223,6 @@ import { getDynamicParam, interpolateParallelRouteParams, } from '../../shared/lib/router/utils/get-dynamic-param' -import type { ExperimentalConfig } from '../config-shared' import type { Params } from '../request/params' import { ImageConfigContext } from '../../shared/lib/image-config-context.shared-runtime' import { imageConfigDefault } from '../../shared/lib/image-config' @@ -497,7 +502,7 @@ async function generateDynamicRSCPayload( options?: { actionResult?: ActionResult skipPageRendering?: boolean - runtimePrefetchSentinel?: number + staleTimeIterable?: AsyncIterable } ): Promise { // Flight data that is going to be passed to the browser. @@ -621,13 +626,10 @@ async function generateDynamicRSCPayload( h: getMetadataVaryParamsThenable(), }) - // For runtime prefetches, we encode the stale time and isPartial flag in the response body - // rather than relying on response headers. Both of these values will be transformed - // by a transform stream before being sent to the client. 
- if (options?.runtimePrefetchSentinel !== undefined) { + if (options?.staleTimeIterable !== undefined) { return { ...baseResponse, - rp: [options.runtimePrefetchSentinel] as any, + s: options.staleTimeIterable, } } @@ -1005,15 +1007,7 @@ async function generateRuntimePrefetchResult( ) const metadata: AppPageRenderResultMetadata = {} - - // Generate a random sentinel that will be used as a placeholder in the payload - // and later replaced by the transform stream - const runtimePrefetchSentinel = Math.floor( - Math.random() * Number.MAX_SAFE_INTEGER - ) - - const generatePayload = () => - generateDynamicRSCPayload(ctx, { runtimePrefetchSentinel }) + const staleTimeIterable = new StaleTimeIterable() const { componentMod: { @@ -1033,7 +1027,7 @@ async function generateRuntimePrefetchResult( await prospectiveRuntimeServerPrerender( ctx, - generatePayload, + generateDynamicRSCPayload.bind(null, ctx), prerenderResumeDataCache, renderResumeDataCache, rootParams, @@ -1044,7 +1038,7 @@ async function generateRuntimePrefetchResult( const response = await finalRuntimeServerPrerender( ctx, - generatePayload, + generateDynamicRSCPayload.bind(null, ctx, { staleTimeIterable }), prerenderResumeDataCache, renderResumeDataCache, rootParams, @@ -1052,7 +1046,7 @@ async function generateRuntimePrefetchResult( requestStore.cookies, requestStore.draftMode, onError, - runtimePrefetchSentinel + staleTimeIterable ) applyMetadataFromPrerenderResult(response, metadata, workStore) @@ -1202,116 +1196,24 @@ async function prospectiveRuntimeServerPrerender( return null } } + /** - * Updates the runtime prefetch metadata in the RSC payload as it streams: - * "rp":[] -> "rp":[,] - * - * We use a transform stream to do this to avoid needing to trigger an additional render. - * A random sentinel number guarantees no collision with user data. + * Prepends a single ASCII byte to the stream indicating whether the response + * is partial (contains dynamic holes): '~' (0x7e) for partial, '#' (0x23) + * for complete. */ -function createRuntimePrefetchTransformStream( - sentinel: number, - isPartial: boolean, - staleTime: number -): TransformStream { - const encoder = new TextEncoder() - - // Search for: [] - // Replace with: [,] - const search = encoder.encode(`[${sentinel}]`) - const first = search[0] - const replace = encoder.encode(`[${isPartial},${staleTime}]`) - const searchLen = search.length - - let currentChunk: Uint8Array | null = null - let found = false - - function processChunk( - controller: TransformStreamDefaultController, - nextChunk: null | Uint8Array - ) { - if (found) { - if (nextChunk) { - controller.enqueue(nextChunk) - } - return - } - - if (currentChunk) { - // We can't search past the index that can contain a full match - let exclusiveUpperBound = currentChunk.length - (searchLen - 1) - if (nextChunk) { - // If we have any overflow bytes we can search up to the chunk's final byte - exclusiveUpperBound += Math.min(nextChunk.length, searchLen - 1) - } - if (exclusiveUpperBound < 1) { - // we can't match the current chunk. 
- controller.enqueue(currentChunk) - currentChunk = nextChunk // advance so we don't process this chunk again - return - } - - let currentIndex = currentChunk.indexOf(first) - - // check the current candidate match if it is within the bounds of our search space for the currentChunk - candidateLoop: while ( - -1 < currentIndex && - currentIndex < exclusiveUpperBound - ) { - // We already know index 0 matches because we used indexOf to find the candidateIndex so we start at index 1 - let matchIndex = 1 - while (matchIndex < searchLen) { - const candidateIndex = currentIndex + matchIndex - const candidateValue = - candidateIndex < currentChunk.length - ? currentChunk[candidateIndex] - : // if we ever hit this condition it is because there is a nextChunk we can read from - nextChunk![candidateIndex - currentChunk.length] - if (candidateValue !== search[matchIndex]) { - // No match, reset and continue the search from the next position - currentIndex = currentChunk.indexOf(first, currentIndex + 1) - continue candidateLoop - } - matchIndex++ - } - // We found a complete match. currentIndex is our starting point to replace the value. - found = true - // enqueue everything up to the match - controller.enqueue(currentChunk.subarray(0, currentIndex)) - // enqueue the replacement value - controller.enqueue(replace) - // If there are bytes in the currentChunk after the match enqueue them - if (currentIndex + searchLen < currentChunk.length) { - controller.enqueue(currentChunk.slice(currentIndex + searchLen)) - } - // If we have a next chunk we enqueue it now - if (nextChunk) { - // if replacement spills over to the next chunk we first exclude the replaced bytes - const overflowBytes = currentIndex + searchLen - currentChunk.length - const truncatedChunk = - overflowBytes > 0 ? nextChunk!.subarray(overflowBytes) : nextChunk - controller.enqueue(truncatedChunk) - } - // We are now in found mode and don't need to track currentChunk anymore - currentChunk = null - return - } - // No match found in this chunk, emit it and wait for the next one - controller.enqueue(currentChunk) - } - - // Advance to the next chunk - currentChunk = nextChunk - } - - return new TransformStream({ - transform(chunk, controller) { - processChunk(controller, chunk) - }, - flush(controller) { - processChunk(controller, null) - }, - }) +function prependIsPartialByte( + stream: ReadableStream, + isPartial: boolean +): ReadableStream { + const byte = new Uint8Array([isPartial ? 0x7e : 0x23]) + return stream.pipeThrough( + new TransformStream({ + start(controller) { + controller.enqueue(byte) + }, + }) + ) } async function finalRuntimeServerPrerender( @@ -1324,7 +1226,7 @@ async function finalRuntimeServerPrerender( cookies: PrerenderStoreModernRuntime['cookies'], draftMode: PrerenderStoreModernRuntime['draftMode'], onError: (err: unknown) => string | undefined, - runtimePrefetchSentinel: number + staleTimeIterable: StaleTimeIterable ) { const { implicitTags, renderOpts } = ctx const { ComponentMod, experimental, isDebugDynamicAccesses } = renderOpts @@ -1370,6 +1272,8 @@ async function finalRuntimeServerPrerender( draftMode, } + trackStaleTime(finalServerPrerenderStore, staleTimeIterable, selectStaleTime) + const { clientModules } = getClientReferenceManifest() const finalRSCPayload = await workUnitAsyncStorage.run( @@ -1414,33 +1318,30 @@ async function finalRuntimeServerPrerender( finalStageController.advanceStage(RenderStage.Runtime) }, () => { - // Abort. 
- if (finalServerController.signal.aborted) { - // If the server controller is already aborted we must have called something - // that required aborting the prerender synchronously such as with new Date() - serverIsDynamic = true - return - } + finishStaleTimeTracking(staleTimeIterable).then(() => { + // Abort. This runs as a microtask after Flight has flushed the + // staleTime closing chunk, but before the next macrotask resolves the + // overall result. + if (finalServerController.signal.aborted) { + // If the server controller is already aborted we must have called + // something that required aborting the prerender synchronously such + // as with new Date() + serverIsDynamic = true + return + } - if (prerenderIsPending) { - // If prerenderIsPending then we have blocked for longer than a Task and we assume - // there is something unfinished. - serverIsDynamic = true - } - finalServerController.abort() + if (prerenderIsPending) { + // If prerenderIsPending then we have blocked for longer than a Task + // and we assume there is something unfinished. + serverIsDynamic = true + } + finalServerController.abort() + }) } ) - // Update the RSC payload stream to replace the sentinel with actual values. - // React has already serialized the payload with the sentinel, so we need to transform the stream. - const collectedStale = selectStaleTime(finalServerPrerenderStore.stale) - result.prelude = result.prelude.pipeThrough( - createRuntimePrefetchTransformStream( - runtimePrefetchSentinel, - serverIsDynamic, - collectedStale - ) - ) + // Prepend a byte indicating whether the response contains dynamic holes. + result.prelude = prependIsPartialByte(result.prelude, serverIsDynamic) return { result, @@ -1450,7 +1351,7 @@ async function finalRuntimeServerPrerender( isPartial: serverIsDynamic, collectedRevalidate: finalServerPrerenderStore.revalidate, collectedExpire: finalServerPrerenderStore.expire, - collectedStale, + collectedStale: staleTimeIterable.currentValue, collectedTags: finalServerPrerenderStore.tags, } } @@ -6048,14 +5949,6 @@ const getGlobalErrorStyles = async ( } } -function createSelectStaleTime(experimental: ExperimentalConfig) { - return (stale: number) => - stale === INFINITE_CACHE && - typeof experimental.staleTimes?.static === 'number' - ? experimental.staleTimes.static - : stale -} - async function collectSegmentData( fullPageDataBuffer: Buffer, prerenderStore: PrerenderStore, diff --git a/packages/next/src/server/app-render/stale-time.ts b/packages/next/src/server/app-render/stale-time.ts new file mode 100644 index 0000000000000..5f429d2bdbea7 --- /dev/null +++ b/packages/next/src/server/app-render/stale-time.ts @@ -0,0 +1,104 @@ +import type { ExperimentalConfig } from '../config-shared' +import { INFINITE_CACHE } from '../../lib/constants' + +/** + * An AsyncIterable that yields staleTime values. Each call to + * `update()` yields the new value. When `close()` is called, the iteration + * ends. + * + * This is included in the RSC payload so Flight serializes each yielded value + * into the stream immediately. If the prerender is aborted by sync IO, the last + * yielded value is already in the stream, allowing the prerender to be aborted + * synchronously. + */ +export class StaleTimeIterable { + private _resolve: ((result: IteratorResult) => void) | null = null + private _done = false + + /** The last value passed to `update()`. 
*/
+  public currentValue: number = 0
+
+  update(value: number): void {
+    if (this._done) return
+    this.currentValue = value
+    if (this._resolve) {
+      this._resolve({ value, done: false })
+      this._resolve = null
+    }
+  }
+
+  close(): void {
+    if (this._done) return
+    this._done = true
+    if (this._resolve) {
+      this._resolve({ value: undefined, done: true })
+      this._resolve = null
+    }
+  }
+
+  [Symbol.asyncIterator](): AsyncIterator<number> {
+    return {
+      next: () => {
+        if (this._done) {
+          return Promise.resolve({ value: undefined, done: true })
+        }
+        return new Promise<IteratorResult<number>>((resolve) => {
+          this._resolve = resolve
+        })
+      },
+    }
+  }
+}
+
+export function createSelectStaleTime(experimental: ExperimentalConfig) {
+  return (stale: number) =>
+    stale === INFINITE_CACHE &&
+    typeof experimental.staleTimes?.static === 'number'
+      ? experimental.staleTimes.static
+      : stale
+}
+
+/**
+ * Intercepts writes to the `stale` field on the prerender store and yields
+ * each update (after applying selectStaleTime) through the iterable. This
+ * ensures the latest stale time is always serialized in the Flight stream,
+ * even if the prerender is aborted by sync IO.
+ */
+export function trackStaleTime(
+  store: { stale: number },
+  iterable: StaleTimeIterable,
+  selectStaleTime: (stale: number) => number
+): void {
+  let _stale = store.stale
+  iterable.update(selectStaleTime(_stale))
+  Object.defineProperty(store, 'stale', {
+    get: () => _stale,
+    set: (value: number) => {
+      _stale = value
+      iterable.update(selectStaleTime(value))
+    },
+    configurable: true,
+    enumerable: true,
+  })
+}
+
+/**
+ * Closes the stale time iterable and waits for React to flush the closing
+ * chunk into the Flight stream. This also allows the prerender to complete if
+ * no other work is pending.
+ *
+ * Flight's internal work gets scheduled as a microtask when we close the
+ * iterable. We need to ensure Flight's pending queues are emptied before this
+ * function returns, because the caller will abort the prerender immediately
+ * after. We can't use a macrotask (that would allow dynamic IO to sneak into
+ * the response), so we use microtasks instead. The exact number of awaits
+ * isn't important as long as we wait enough ticks for Flight to finish writing.
+ */
+export async function finishStaleTimeTracking(
+  iterable: StaleTimeIterable
+): Promise<void> {
+  iterable.close()
+  await Promise.resolve()
+  await Promise.resolve()
+  await Promise.resolve()
+}
diff --git a/packages/next/src/shared/lib/app-router-types.ts b/packages/next/src/shared/lib/app-router-types.ts
index d46fee58a72a3..d6930c784459e 100644
--- a/packages/next/src/shared/lib/app-router-types.ts
+++ b/packages/next/src/shared/lib/app-router-types.ts
@@ -276,8 +276,8 @@ export type NavigationFlightResponse = {
   q: string
   /** couldBeIntercepted */
   i: boolean
-  /** runtimePrefetch - [isPartial, staleTime]. Only present in runtime prefetch responses. */
-  rp?: [boolean, number]
+  /** staleTime - Only present in dynamic runtime prefetch responses. */
+  s?: AsyncIterable<number>
   /** headVaryParams */
   h: VaryParamsThenable | null
 }
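
Note (not part of the patch): the following is a minimal, self-contained TypeScript sketch of the wire protocol this change introduces, for readers who want to poke at it outside of Next.js. The helper names here (prependIsPartialByteSketch, readPrefetchResponseSketch, lastYieldedValue, staleTimes) are made up for illustration; only the protocol itself comes from the change above — a single leading byte ('#' 0x23 for complete, '~' 0x7e for partial) prepended to the response stream, plus an async iterable whose final yielded value is the staleTime the client should use (mirroring getStaleAt taking the last value of serverData.s).

// sketch.ts — assumes a runtime with web streams globals (Node 18+ or a browser)

// Producer side: prepend the isPartial byte to an existing byte stream,
// analogous to prependIsPartialByte in the patch.
function prependIsPartialByteSketch(
  stream: ReadableStream<Uint8Array>,
  isPartial: boolean
): ReadableStream<Uint8Array> {
  const byte = new Uint8Array([isPartial ? 0x7e /* '~' */ : 0x23 /* '#' */])
  return stream.pipeThrough(
    new TransformStream<Uint8Array, Uint8Array>({
      start(controller) {
        // Enqueue the marker byte first; chunks then pass through unchanged.
        controller.enqueue(byte)
      },
    })
  )
}

// Consumer side: read the first byte to learn isPartial, then consume the
// rest of the stream (here decoded to a string instead of handing it to a
// Flight decoder).
async function readPrefetchResponseSketch(
  stream: ReadableStream<Uint8Array>
): Promise<{ isPartial: boolean; rest: string }> {
  const reader = stream.getReader()
  const decoder = new TextDecoder()
  const first = await reader.read()
  const isPartial = !first.done && first.value[0] === 0x7e
  let rest = first.done
    ? ''
    : decoder.decode(first.value.subarray(1), { stream: true })
  while (true) {
    const { done, value } = await reader.read()
    if (done) break
    rest += decoder.decode(value, { stream: true })
  }
  rest += decoder.decode()
  return { isPartial, rest }
}

// staleTime side: iterate the async iterable and keep the last yielded value,
// which is what the segment cache does with serverData.s.
async function lastYieldedValue(
  values: AsyncIterable<number>
): Promise<number | undefined> {
  let last: number | undefined
  for await (const value of values) {
    last = value
  }
  return last
}

// Tiny usage example.
async function demo() {
  const payload = new ReadableStream<Uint8Array>({
    start(controller) {
      controller.enqueue(new TextEncoder().encode('…flight payload…'))
      controller.close()
    },
  })
  const tagged = prependIsPartialByteSketch(payload, true)
  const { isPartial, rest } = await readPrefetchResponseSketch(tagged)

  // Stand-in for the serialized StaleTimeIterable: the last value wins.
  async function* staleTimes(): AsyncGenerator<number> {
    yield 30
    yield 300
  }
  console.log(isPartial, rest, await lastYieldedValue(staleTimes()))
}

demo().catch(console.error)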