132 changes: 111 additions & 21 deletions packages/next/src/client/components/segment-cache/cache.ts
@@ -14,6 +14,7 @@ import {
import {
NEXT_DID_POSTPONE_HEADER,
NEXT_INSTANT_PREFETCH_HEADER,
NEXT_IS_PRERENDER_HEADER,
NEXT_ROUTER_PREFETCH_HEADER,
NEXT_ROUTER_SEGMENT_PREFETCH_HEADER,
NEXT_ROUTER_STALE_TIME_HEADER,
@@ -2012,13 +2013,28 @@ export async function fetchSegmentPrefetchesUsingDynamicRequest(
// Track when the network connection closes.
const closed = createPromiseWithResolvers<void>()

let isResponsePartial = false
let responseBody = response.body

// For dynamic runtime prefetches, strip the leading isPartial byte before
// passing the stream to Flight. Static responses (served from cache) don't
// have the byte. We detect them via NEXT_IS_PRERENDER_HEADER.
if (
fetchStrategy === FetchStrategy.PPRRuntime &&
!response.headers.get(NEXT_IS_PRERENDER_HEADER)
) {
const stripped = await stripIsPartialByte(responseBody)
isResponsePartial = stripped.isPartial
responseBody = stripped.stream
}

let fulfilledEntries: Array<FulfilledSegmentCacheEntry> | null = null
const prefetchStream = createPrefetchResponseStream(
response.body,
responseBody,
closed.resolve,
function onResponseSizeUpdate(totalBytesReceivedSoFar) {
// When processing a dynamic response, we don't know how large each
// individual segment is, so approximate by assiging each segment
// individual segment is, so approximate by assigning each segment
// the average of the total response size.
if (fulfilledEntries === null) {
// Haven't received enough data yet to know which segments
@@ -2038,13 +2054,6 @@ export async function fetchSegmentPrefetchesUsingDynamicRequest(
{ allowPartialStream: true }
)

const isResponsePartial =
fetchStrategy === FetchStrategy.PPRRuntime
? // A runtime prefetch may have holes.
serverData.rp?.[0] === true
: // Full and LoadingBoundary prefetches cannot have holes.
// (even if we did set the prefetch header, we only use this codepath for non-PPR-enabled routes)
false
// Read head vary params synchronously. Individual segments carry their
// own thenables in CacheNodeSeedData.
const headVaryParamsThenable = serverData.h
Expand All @@ -2053,17 +2062,21 @@ export async function fetchSegmentPrefetchesUsingDynamicRequest(
? readVaryParams(headVaryParamsThenable)
: null

const now = Date.now()
const staleAt = await getStaleAt(now, serverData, response)

// Aside from writing the data into the cache, this function also returns
// the entries that were fulfilled, so we can streamingly update their sizes
// in the LRU as more data comes in.
fulfilledEntries = writeDynamicRenderResponseIntoCache(
Date.now(),
now,
task,
fetchStrategy,
response as RSCResponse<NavigationFlightResponse>,
serverData,
isResponsePartial,
headVaryParams,
staleAt,
route,
spawnedEntries
)
@@ -2166,6 +2179,7 @@ function writeDynamicTreeResponseIntoCache(
serverData,
isResponsePartial,
headVaryParams,
getStaleAtFromHeader(now, response),
fulfilledEntry,
null
)
@@ -2197,6 +2211,7 @@ function writeDynamicRenderResponseIntoCache(
serverData: NavigationFlightResponse,
isResponsePartial: boolean,
headVaryParams: VaryParams | null,
staleAt: number,
route: FulfilledRouteCacheEntry,
spawnedEntries: Map<SegmentRequestKey, PendingSegmentCacheEntry> | null
): Array<FulfilledSegmentCacheEntry> | null {
@@ -2219,17 +2234,6 @@ function writeDynamicRenderResponseIntoCache(
return null
}

// For runtime prefetches, stale time is in the payload at rp[1].
// For other responses, fall back to the header.
const staleTimeSeconds =
typeof serverData.rp?.[1] === 'number'
? serverData.rp[1]
: parseInt(response.headers.get(NEXT_ROUTER_STALE_TIME_HEADER) ?? '', 10)
const staleTimeMs = !isNaN(staleTimeSeconds)
? getStaleTimeMs(staleTimeSeconds)
: STATIC_STALETIME_MS
const staleAt = now + staleTimeMs

for (const flightData of flightDatas) {
const seedData = flightData.seedData
if (seedData !== null) {
@@ -2590,3 +2594,89 @@ function addInstantPrefetchHeaderIfLocked(
}
}
}

function getStaleAtFromHeader(
now: number,
response: RSCResponse<unknown>
): number {
const staleTimeSeconds = parseInt(
response.headers.get(NEXT_ROUTER_STALE_TIME_HEADER) ?? '',
10
)

const staleTimeMs = !isNaN(staleTimeSeconds)
? getStaleTimeMs(staleTimeSeconds)
: STATIC_STALETIME_MS

return now + staleTimeMs
}
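
Aside (not part of the diff): the header fallback above is small enough to show end to end. The sketch below mirrors it with stand-in names; the header string, the 5-minute fallback, and the seconds-to-milliseconds conversion are assumptions for illustration, not the actual `NEXT_ROUTER_STALE_TIME_HEADER`, `STATIC_STALETIME_MS`, or `getStaleTimeMs` values.

```ts
// Illustrative sketch only. The header name and fallback duration are
// assumptions; the real constants live in the Next.js source above.
const STALE_TIME_HEADER = 'x-nextjs-stale-time' // assumed header string
const STATIC_FALLBACK_MS = 5 * 60 * 1000 // assumed static stale time

function staleAtFromHeaders(now: number, headers: Headers): number {
  const staleTimeSeconds = parseInt(headers.get(STALE_TIME_HEADER) ?? '', 10)
  // A missing or non-numeric header parses to NaN and falls back to the
  // static stale time, matching the isNaN check in getStaleAtFromHeader.
  const staleTimeMs = Number.isNaN(staleTimeSeconds)
    ? STATIC_FALLBACK_MS
    : staleTimeSeconds * 1000 // assumes getStaleTimeMs is roughly seconds → ms
  return now + staleTimeMs
}

const now = Date.now()
staleAtFromHeaders(now, new Headers({ 'x-nextjs-stale-time': '30' })) // now + 30_000
staleAtFromHeaders(now, new Headers()) // now + STATIC_FALLBACK_MS
```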

async function getStaleAt(
now: number,
serverData: NavigationFlightResponse,
response: RSCResponse<unknown>
): Promise<number> {
if (serverData.s !== undefined) {
// Iterate the async iterable and take the last yielded value. The server
// yields updated staleTime values during the render; the last one is the
// final staleTime.
let staleTimeSeconds: number | undefined

// TODO: Buffer the response and then read the iterable values
// synchronously, similar to readVaryParams. This would avoid the need to
// make getStaleAt async, and we could also use it in
// writeDynamicTreeResponseIntoCache. This will also be needed when React
// starts leaving async iterables hanging when the outer RSC stream is
// aborted e.g. due to sync I/O (with unstable_allowPartialStream).
for await (const value of serverData.s) {
staleTimeSeconds = value
}

if (staleTimeSeconds !== undefined) {
const staleTimeMs = isNaN(staleTimeSeconds)
? STATIC_STALETIME_MS
: getStaleTimeMs(staleTimeSeconds)

return now + staleTimeMs
}
}

return getStaleAtFromHeader(now, response)
}
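
A quick sketch (outside the diff) of the pattern `getStaleAt` relies on: drain an async iterable and keep only the last yielded value. `makeStaleTimeUpdates` is a hypothetical stand-in for `serverData.s`.

```ts
// Hypothetical stand-in for serverData.s: the server may yield a provisional
// staleTime early in the render and a final one once the render settles.
async function* makeStaleTimeUpdates(): AsyncGenerator<number> {
  yield 30
  yield 300
}

// Drain the iterable; the last yielded value wins, as in getStaleAt.
async function lastValue<T>(iterable: AsyncIterable<T>): Promise<T | undefined> {
  let last: T | undefined
  for await (const value of iterable) {
    last = value
  }
  return last
}

// lastValue(makeStaleTimeUpdates()) resolves to 300.
```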

/**
* Strips the leading isPartial byte from a runtime prefetch response stream.
* Returns the remaining stream and whether the response is partial.
*/
async function stripIsPartialByte(
stream: ReadableStream<Uint8Array>
): Promise<{ stream: ReadableStream<Uint8Array>; isPartial: boolean }> {
const reader = stream.getReader()
const { done, value } = await reader.read()
if (done || !value || value.byteLength === 0) {
return {
stream: new ReadableStream({ start: (c) => c.close() }),
isPartial: false,
}
}
const isPartial = value[0] === 0x7e // ASCII '~'
const remainder = value.byteLength > 1 ? value.subarray(1) : null
return {
isPartial,
stream: new ReadableStream<Uint8Array>({
start(controller) {
if (remainder) {
controller.enqueue(remainder)
}
},
async pull(controller) {
const result = await reader.read()
if (result.done) {
controller.close()
} else {
controller.enqueue(result.value)
}
},
}),
}
}
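
To make the new helper's contract concrete, here is a self-contained sketch (not part of the diff) that runs the same checks `stripIsPartialByte` performs: the first byte of a runtime-prefetch body is a flag, `0x7e` (ASCII `'~'`) means partial, and everything after it is the Flight payload. The stream contents are made up for the demo.

```ts
// Demo input: a flag byte ('~' = partial) followed by a made-up payload.
async function demoStripIsPartialByte() {
  const encoder = new TextEncoder()
  const body = new ReadableStream<Uint8Array>({
    start(controller) {
      controller.enqueue(encoder.encode('~{"flight":'))
      controller.enqueue(encoder.encode('"..."}'))
      controller.close()
    },
  })

  const reader = body.getReader()
  const first = await reader.read()
  // Same check as in stripIsPartialByte: 0x7e marks a partial response.
  const isPartial = !first.done && first.value[0] === 0x7e
  const decoder = new TextDecoder()
  let payload = first.done
    ? ''
    : decoder.decode(first.value.subarray(1), { stream: true })
  for (let chunk = await reader.read(); !chunk.done; chunk = await reader.read()) {
    payload += decoder.decode(chunk.value, { stream: true })
  }
  payload += decoder.decode()
  console.log(isPartial, payload) // true {"flight":"..."}
}
```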