diff --git a/packages/next/src/client/components/segment-cache/cache.ts b/packages/next/src/client/components/segment-cache/cache.ts index 384e006bd6129..85ed71855c431 100644 --- a/packages/next/src/client/components/segment-cache/cache.ts +++ b/packages/next/src/client/components/segment-cache/cache.ts @@ -14,6 +14,7 @@ import { import { NEXT_DID_POSTPONE_HEADER, NEXT_INSTANT_PREFETCH_HEADER, + NEXT_IS_PRERENDER_HEADER, NEXT_ROUTER_PREFETCH_HEADER, NEXT_ROUTER_SEGMENT_PREFETCH_HEADER, NEXT_ROUTER_STALE_TIME_HEADER, @@ -2012,13 +2013,28 @@ export async function fetchSegmentPrefetchesUsingDynamicRequest( // Track when the network connection closes. const closed = createPromiseWithResolvers() + let isResponsePartial = false + let responseBody = response.body + + // For dynamic runtime prefetches, strip the leading isPartial byte before + // passing the stream to Flight. Static responses (served from cache) don't + // have the byte. We detect them via NEXT_IS_PRERENDER_HEADER. + if ( + fetchStrategy === FetchStrategy.PPRRuntime && + !response.headers.get(NEXT_IS_PRERENDER_HEADER) + ) { + const stripped = await stripIsPartialByte(responseBody) + isResponsePartial = stripped.isPartial + responseBody = stripped.stream + } + let fulfilledEntries: Array | null = null const prefetchStream = createPrefetchResponseStream( - response.body, + responseBody, closed.resolve, function onResponseSizeUpdate(totalBytesReceivedSoFar) { // When processing a dynamic response, we don't know how large each - // individual segment is, so approximate by assiging each segment + // individual segment is, so approximate by assigning each segment // the average of the total response size. if (fulfilledEntries === null) { // Haven't received enough data yet to know which segments @@ -2038,13 +2054,6 @@ export async function fetchSegmentPrefetchesUsingDynamicRequest( { allowPartialStream: true } ) - const isResponsePartial = - fetchStrategy === FetchStrategy.PPRRuntime - ? // A runtime prefetch may have holes. - serverData.rp?.[0] === true - : // Full and LoadingBoundary prefetches cannot have holes. - // (even if we did set the prefetch header, we only use this codepath for non-PPR-enabled routes) - false // Read head vary params synchronously. Individual segments carry their // own thenables in CacheNodeSeedData. const headVaryParamsThenable = serverData.h @@ -2053,17 +2062,21 @@ export async function fetchSegmentPrefetchesUsingDynamicRequest( ? readVaryParams(headVaryParamsThenable) : null + const now = Date.now() + const staleAt = await getStaleAt(now, serverData, response) + // Aside from writing the data into the cache, this function also returns // the entries that were fulfilled, so we can streamingly update their sizes // in the LRU as more data comes in. fulfilledEntries = writeDynamicRenderResponseIntoCache( - Date.now(), + now, task, fetchStrategy, response as RSCResponse, serverData, isResponsePartial, headVaryParams, + staleAt, route, spawnedEntries ) @@ -2166,6 +2179,7 @@ function writeDynamicTreeResponseIntoCache( serverData, isResponsePartial, headVaryParams, + getStaleAtFromHeader(now, response), fulfilledEntry, null ) @@ -2197,6 +2211,7 @@ function writeDynamicRenderResponseIntoCache( serverData: NavigationFlightResponse, isResponsePartial: boolean, headVaryParams: VaryParams | null, + staleAt: number, route: FulfilledRouteCacheEntry, spawnedEntries: Map | null ): Array | null { @@ -2219,17 +2234,6 @@ function writeDynamicRenderResponseIntoCache( return null } - // For runtime prefetches, stale time is in the payload at rp[1]. - // For other responses, fall back to the header. - const staleTimeSeconds = - typeof serverData.rp?.[1] === 'number' - ? serverData.rp[1] - : parseInt(response.headers.get(NEXT_ROUTER_STALE_TIME_HEADER) ?? '', 10) - const staleTimeMs = !isNaN(staleTimeSeconds) - ? getStaleTimeMs(staleTimeSeconds) - : STATIC_STALETIME_MS - const staleAt = now + staleTimeMs - for (const flightData of flightDatas) { const seedData = flightData.seedData if (seedData !== null) { @@ -2590,3 +2594,89 @@ function addInstantPrefetchHeaderIfLocked( } } } + +function getStaleAtFromHeader( + now: number, + response: RSCResponse +): number { + const staleTimeSeconds = parseInt( + response.headers.get(NEXT_ROUTER_STALE_TIME_HEADER) ?? '', + 10 + ) + + const staleTimeMs = !isNaN(staleTimeSeconds) + ? getStaleTimeMs(staleTimeSeconds) + : STATIC_STALETIME_MS + + return now + staleTimeMs +} + +async function getStaleAt( + now: number, + serverData: NavigationFlightResponse, + response: RSCResponse +): Promise { + if (serverData.s !== undefined) { + // Iterate the async iterable and take the last yielded value. The server + // yields updated staleTime values during the render; the last one is the + // final staleTime. + let staleTimeSeconds: number | undefined + + // TODO: Buffer the response and then read the iterable values + // synchronously, similar to readVaryParams. This would avoid the need to + // make getStaleAt async, and we could also use it in + // writeDynamicTreeResponseIntoCache. This will also be needed when React + // starts leaving async iterables hanging when the outer RSC stream is + // aborted e.g. due to sync I/O (with unstable_allowPartialStream). + for await (const value of serverData.s) { + staleTimeSeconds = value + } + + if (staleTimeSeconds !== undefined) { + const staleTimeMs = isNaN(staleTimeSeconds) + ? STATIC_STALETIME_MS + : getStaleTimeMs(staleTimeSeconds) + + return now + staleTimeMs + } + } + + return getStaleAtFromHeader(now, response) +} + +/** + * Strips the leading isPartial byte from a runtime prefetch response stream. + * Returns the remaining stream and whether the response is partial. + */ +async function stripIsPartialByte( + stream: ReadableStream +): Promise<{ stream: ReadableStream; isPartial: boolean }> { + const reader = stream.getReader() + const { done, value } = await reader.read() + if (done || !value || value.byteLength === 0) { + return { + stream: new ReadableStream({ start: (c) => c.close() }), + isPartial: false, + } + } + const isPartial = value[0] === 0x7e // ASCII '~' + const remainder = value.byteLength > 1 ? value.subarray(1) : null + return { + isPartial, + stream: new ReadableStream({ + start(controller) { + if (remainder) { + controller.enqueue(remainder) + } + }, + async pull(controller) { + const result = await reader.read() + if (result.done) { + controller.close() + } else { + controller.enqueue(result.value) + } + }, + }), + } +} diff --git a/packages/next/src/server/app-render/app-render.tsx b/packages/next/src/server/app-render/app-render.tsx index e6967bda4bc1d..b758ad5df1a60 100644 --- a/packages/next/src/server/app-render/app-render.tsx +++ b/packages/next/src/server/app-render/app-render.tsx @@ -189,6 +189,12 @@ import { } from './vary-params' import { getTracedMetadata } from '../lib/trace/utils' import { InvariantError } from '../../shared/lib/invariant-error' +import { + StaleTimeIterable, + createSelectStaleTime, + trackStaleTime, + finishStaleTimeTracking, +} from './stale-time' import { HTML_CONTENT_TYPE_HEADER, INFINITE_CACHE } from '../../lib/constants' import { createComponentStylesAndScripts } from './create-component-styles-and-scripts' @@ -217,7 +223,6 @@ import { getDynamicParam, interpolateParallelRouteParams, } from '../../shared/lib/router/utils/get-dynamic-param' -import type { ExperimentalConfig } from '../config-shared' import type { Params } from '../request/params' import { ImageConfigContext } from '../../shared/lib/image-config-context.shared-runtime' import { imageConfigDefault } from '../../shared/lib/image-config' @@ -497,7 +502,7 @@ async function generateDynamicRSCPayload( options?: { actionResult?: ActionResult skipPageRendering?: boolean - runtimePrefetchSentinel?: number + staleTimeIterable?: AsyncIterable } ): Promise { // Flight data that is going to be passed to the browser. @@ -621,13 +626,10 @@ async function generateDynamicRSCPayload( h: getMetadataVaryParamsThenable(), }) - // For runtime prefetches, we encode the stale time and isPartial flag in the response body - // rather than relying on response headers. Both of these values will be transformed - // by a transform stream before being sent to the client. - if (options?.runtimePrefetchSentinel !== undefined) { + if (options?.staleTimeIterable !== undefined) { return { ...baseResponse, - rp: [options.runtimePrefetchSentinel] as any, + s: options.staleTimeIterable, } } @@ -1005,15 +1007,7 @@ async function generateRuntimePrefetchResult( ) const metadata: AppPageRenderResultMetadata = {} - - // Generate a random sentinel that will be used as a placeholder in the payload - // and later replaced by the transform stream - const runtimePrefetchSentinel = Math.floor( - Math.random() * Number.MAX_SAFE_INTEGER - ) - - const generatePayload = () => - generateDynamicRSCPayload(ctx, { runtimePrefetchSentinel }) + const staleTimeIterable = new StaleTimeIterable() const { componentMod: { @@ -1033,7 +1027,7 @@ async function generateRuntimePrefetchResult( await prospectiveRuntimeServerPrerender( ctx, - generatePayload, + generateDynamicRSCPayload.bind(null, ctx), prerenderResumeDataCache, renderResumeDataCache, rootParams, @@ -1044,7 +1038,7 @@ async function generateRuntimePrefetchResult( const response = await finalRuntimeServerPrerender( ctx, - generatePayload, + generateDynamicRSCPayload.bind(null, ctx, { staleTimeIterable }), prerenderResumeDataCache, renderResumeDataCache, rootParams, @@ -1052,7 +1046,7 @@ async function generateRuntimePrefetchResult( requestStore.cookies, requestStore.draftMode, onError, - runtimePrefetchSentinel + staleTimeIterable ) applyMetadataFromPrerenderResult(response, metadata, workStore) @@ -1202,116 +1196,24 @@ async function prospectiveRuntimeServerPrerender( return null } } + /** - * Updates the runtime prefetch metadata in the RSC payload as it streams: - * "rp":[] -> "rp":[,] - * - * We use a transform stream to do this to avoid needing to trigger an additional render. - * A random sentinel number guarantees no collision with user data. + * Prepends a single ASCII byte to the stream indicating whether the response + * is partial (contains dynamic holes): '~' (0x7e) for partial, '#' (0x23) + * for complete. */ -function createRuntimePrefetchTransformStream( - sentinel: number, - isPartial: boolean, - staleTime: number -): TransformStream { - const encoder = new TextEncoder() - - // Search for: [] - // Replace with: [,] - const search = encoder.encode(`[${sentinel}]`) - const first = search[0] - const replace = encoder.encode(`[${isPartial},${staleTime}]`) - const searchLen = search.length - - let currentChunk: Uint8Array | null = null - let found = false - - function processChunk( - controller: TransformStreamDefaultController, - nextChunk: null | Uint8Array - ) { - if (found) { - if (nextChunk) { - controller.enqueue(nextChunk) - } - return - } - - if (currentChunk) { - // We can't search past the index that can contain a full match - let exclusiveUpperBound = currentChunk.length - (searchLen - 1) - if (nextChunk) { - // If we have any overflow bytes we can search up to the chunk's final byte - exclusiveUpperBound += Math.min(nextChunk.length, searchLen - 1) - } - if (exclusiveUpperBound < 1) { - // we can't match the current chunk. - controller.enqueue(currentChunk) - currentChunk = nextChunk // advance so we don't process this chunk again - return - } - - let currentIndex = currentChunk.indexOf(first) - - // check the current candidate match if it is within the bounds of our search space for the currentChunk - candidateLoop: while ( - -1 < currentIndex && - currentIndex < exclusiveUpperBound - ) { - // We already know index 0 matches because we used indexOf to find the candidateIndex so we start at index 1 - let matchIndex = 1 - while (matchIndex < searchLen) { - const candidateIndex = currentIndex + matchIndex - const candidateValue = - candidateIndex < currentChunk.length - ? currentChunk[candidateIndex] - : // if we ever hit this condition it is because there is a nextChunk we can read from - nextChunk![candidateIndex - currentChunk.length] - if (candidateValue !== search[matchIndex]) { - // No match, reset and continue the search from the next position - currentIndex = currentChunk.indexOf(first, currentIndex + 1) - continue candidateLoop - } - matchIndex++ - } - // We found a complete match. currentIndex is our starting point to replace the value. - found = true - // enqueue everything up to the match - controller.enqueue(currentChunk.subarray(0, currentIndex)) - // enqueue the replacement value - controller.enqueue(replace) - // If there are bytes in the currentChunk after the match enqueue them - if (currentIndex + searchLen < currentChunk.length) { - controller.enqueue(currentChunk.slice(currentIndex + searchLen)) - } - // If we have a next chunk we enqueue it now - if (nextChunk) { - // if replacement spills over to the next chunk we first exclude the replaced bytes - const overflowBytes = currentIndex + searchLen - currentChunk.length - const truncatedChunk = - overflowBytes > 0 ? nextChunk!.subarray(overflowBytes) : nextChunk - controller.enqueue(truncatedChunk) - } - // We are now in found mode and don't need to track currentChunk anymore - currentChunk = null - return - } - // No match found in this chunk, emit it and wait for the next one - controller.enqueue(currentChunk) - } - - // Advance to the next chunk - currentChunk = nextChunk - } - - return new TransformStream({ - transform(chunk, controller) { - processChunk(controller, chunk) - }, - flush(controller) { - processChunk(controller, null) - }, - }) +function prependIsPartialByte( + stream: ReadableStream, + isPartial: boolean +): ReadableStream { + const byte = new Uint8Array([isPartial ? 0x7e : 0x23]) + return stream.pipeThrough( + new TransformStream({ + start(controller) { + controller.enqueue(byte) + }, + }) + ) } async function finalRuntimeServerPrerender( @@ -1324,7 +1226,7 @@ async function finalRuntimeServerPrerender( cookies: PrerenderStoreModernRuntime['cookies'], draftMode: PrerenderStoreModernRuntime['draftMode'], onError: (err: unknown) => string | undefined, - runtimePrefetchSentinel: number + staleTimeIterable: StaleTimeIterable ) { const { implicitTags, renderOpts } = ctx const { ComponentMod, experimental, isDebugDynamicAccesses } = renderOpts @@ -1370,6 +1272,8 @@ async function finalRuntimeServerPrerender( draftMode, } + trackStaleTime(finalServerPrerenderStore, staleTimeIterable, selectStaleTime) + const { clientModules } = getClientReferenceManifest() const finalRSCPayload = await workUnitAsyncStorage.run( @@ -1414,33 +1318,30 @@ async function finalRuntimeServerPrerender( finalStageController.advanceStage(RenderStage.Runtime) }, () => { - // Abort. - if (finalServerController.signal.aborted) { - // If the server controller is already aborted we must have called something - // that required aborting the prerender synchronously such as with new Date() - serverIsDynamic = true - return - } + finishStaleTimeTracking(staleTimeIterable).then(() => { + // Abort. This runs as a microtask after Flight has flushed the + // staleTime closing chunk, but before the next macrotask resolves the + // overall result. + if (finalServerController.signal.aborted) { + // If the server controller is already aborted we must have called + // something that required aborting the prerender synchronously such + // as with new Date() + serverIsDynamic = true + return + } - if (prerenderIsPending) { - // If prerenderIsPending then we have blocked for longer than a Task and we assume - // there is something unfinished. - serverIsDynamic = true - } - finalServerController.abort() + if (prerenderIsPending) { + // If prerenderIsPending then we have blocked for longer than a Task + // and we assume there is something unfinished. + serverIsDynamic = true + } + finalServerController.abort() + }) } ) - // Update the RSC payload stream to replace the sentinel with actual values. - // React has already serialized the payload with the sentinel, so we need to transform the stream. - const collectedStale = selectStaleTime(finalServerPrerenderStore.stale) - result.prelude = result.prelude.pipeThrough( - createRuntimePrefetchTransformStream( - runtimePrefetchSentinel, - serverIsDynamic, - collectedStale - ) - ) + // Prepend a byte indicating whether the response contains dynamic holes. + result.prelude = prependIsPartialByte(result.prelude, serverIsDynamic) return { result, @@ -1450,7 +1351,7 @@ async function finalRuntimeServerPrerender( isPartial: serverIsDynamic, collectedRevalidate: finalServerPrerenderStore.revalidate, collectedExpire: finalServerPrerenderStore.expire, - collectedStale, + collectedStale: staleTimeIterable.currentValue, collectedTags: finalServerPrerenderStore.tags, } } @@ -6048,14 +5949,6 @@ const getGlobalErrorStyles = async ( } } -function createSelectStaleTime(experimental: ExperimentalConfig) { - return (stale: number) => - stale === INFINITE_CACHE && - typeof experimental.staleTimes?.static === 'number' - ? experimental.staleTimes.static - : stale -} - async function collectSegmentData( fullPageDataBuffer: Buffer, prerenderStore: PrerenderStore, diff --git a/packages/next/src/server/app-render/stale-time.ts b/packages/next/src/server/app-render/stale-time.ts new file mode 100644 index 0000000000000..5f429d2bdbea7 --- /dev/null +++ b/packages/next/src/server/app-render/stale-time.ts @@ -0,0 +1,104 @@ +import type { ExperimentalConfig } from '../config-shared' +import { INFINITE_CACHE } from '../../lib/constants' + +/** + * An AsyncIterable that yields staleTime values. Each call to + * `update()` yields the new value. When `close()` is called, the iteration + * ends. + * + * This is included in the RSC payload so Flight serializes each yielded value + * into the stream immediately. If the prerender is aborted by sync IO, the last + * yielded value is already in the stream, allowing the prerender to be aborted + * synchronously. + */ +export class StaleTimeIterable { + private _resolve: ((result: IteratorResult) => void) | null = null + private _done = false + + /** The last value passed to `update()`. */ + public currentValue: number = 0 + + update(value: number): void { + if (this._done) return + this.currentValue = value + if (this._resolve) { + this._resolve({ value, done: false }) + this._resolve = null + } + } + + close(): void { + if (this._done) return + this._done = true + if (this._resolve) { + this._resolve({ value: undefined, done: true }) + this._resolve = null + } + } + + [Symbol.asyncIterator](): AsyncIterator { + return { + next: () => { + if (this._done) { + return Promise.resolve({ value: undefined, done: true }) + } + return new Promise>((resolve) => { + this._resolve = resolve + }) + }, + } + } +} + +export function createSelectStaleTime(experimental: ExperimentalConfig) { + return (stale: number) => + stale === INFINITE_CACHE && + typeof experimental.staleTimes?.static === 'number' + ? experimental.staleTimes.static + : stale +} + +/** + * Intercepts writes to the `stale` field on the prerender store and yields + * each update (after applying selectStaleTime) through the iterable. This + * ensures the latest stale time is always serialized in the Flight stream, + * even if the prerender is aborted by sync IO. + */ +export function trackStaleTime( + store: { stale: number }, + iterable: StaleTimeIterable, + selectStaleTime: (stale: number) => number +): void { + let _stale = store.stale + iterable.update(selectStaleTime(_stale)) + Object.defineProperty(store, 'stale', { + get: () => _stale, + set: (value: number) => { + _stale = value + iterable.update(selectStaleTime(value)) + }, + configurable: true, + enumerable: true, + }) +} + +/** + * Closes the stale time iterable and waits for React to flush the closing + * chunk into the Flight stream. This also allows the prerender to complete if + * no other work is pending. + * + * Flight's internal work gets scheduled as a microtask when we close the + * iterable. We need to ensure Flight's pending queues are emptied before this + * function returns, because the caller will abort the prerender immediately + * after. We can't use a macrotask (that would allow dynamic IO to sneak into + * the response), so we use microtasks instead. The exact number of awaits + * isn't important as long as we wait enough ticks for Flight to finish writing. + */ +export async function finishStaleTimeTracking( + iterable: StaleTimeIterable +): Promise { + iterable.close() + await Promise.resolve() + await Promise.resolve() + await Promise.resolve() +} diff --git a/packages/next/src/shared/lib/app-router-types.ts b/packages/next/src/shared/lib/app-router-types.ts index d46fee58a72a3..d6930c784459e 100644 --- a/packages/next/src/shared/lib/app-router-types.ts +++ b/packages/next/src/shared/lib/app-router-types.ts @@ -276,8 +276,8 @@ export type NavigationFlightResponse = { q: string /** couldBeIntercepted */ i: boolean - /** runtimePrefetch - [isPartial, staleTime]. Only present in runtime prefetch responses. */ - rp?: [boolean, number] + /** staleTime - Only present in dynamic runtime prefetch responses. */ + s?: AsyncIterable /** headVaryParams */ h: VaryParamsThenable | null }