[core] Ensure open stream flush is await-able in pendingOps #1446
VaguelySerious wants to merge 6 commits into main
Conversation
WorkflowServerWritableStream buffers writes and flushes via a 10ms
setTimeout for batching. The write() callback returned immediately
after buffering, causing flushablePipe's pendingOps counter to reach 0
before data actually reached the server via the deferred HTTP flush.
Fix: write() now returns a promise that resolves only after the
scheduled flush completes. Multiple writes within the 10ms window
still share a single batched flush — the batching optimization is
preserved. Each write's promise resolves when the batch's HTTP
round-trip finishes, so pendingOps accurately reflects server state.
This is implemented via a flushWaiters array: each write() pushes
a {resolve, reject} pair. When the setTimeout fires and flush()
completes, all waiters are resolved (or rejected on error).
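The mechanism described above can be sketched roughly as follows. This is a hypothetical, simplified model for illustration; `BatchedWriter`, `sendBatch`, and the field names are stand-ins, not the actual WorkflowServerWritableStream code:

```typescript
// Simplified sketch of the batched-flush pattern: writes buffer for 10ms,
// then one flush sends the whole batch and settles every pending write().
type Waiter = { resolve: () => void; reject: (err: unknown) => void };

class BatchedWriter {
  private buffer: string[] = [];
  private flushWaiters: Waiter[] = [];
  private timer: ReturnType<typeof setTimeout> | null = null;

  constructor(private sendBatch: (chunks: string[]) => Promise<void>) {}

  // write() resolves only after the batch containing this chunk is flushed.
  write(chunk: string): Promise<void> {
    this.buffer.push(chunk);
    if (this.timer === null) {
      this.timer = setTimeout(() => void this.flush(), 10);
    }
    return new Promise<void>((resolve, reject) => {
      this.flushWaiters.push({ resolve, reject });
    });
  }

  private async flush(): Promise<void> {
    this.timer = null;
    const chunks = this.buffer;
    this.buffer = [];
    const waiters = this.flushWaiters;
    this.flushWaiters = [];
    try {
      await this.sendBatch(chunks); // one round-trip for the whole batch
      waiters.forEach((w) => w.resolve());
    } catch (err) {
      waiters.forEach((w) => w.reject(err));
    }
  }
}
```

Multiple writes inside the 10ms window still produce a single `sendBatch` call, but each caller's promise settles only when that shared flush completes.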
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Peter Wielander <mittgfu@gmail.com>
🦋 Changeset detected. Latest commit: 3b5d85b. The changes in this PR will be included in the next version bump. This PR includes changesets to release 16 packages.
🧪 E2E Test Results: ❌ Some tests failed
❌ Failed Tests: 🌍 Community Worlds (56 failed)
- mongodb: 3 failed
- redis: 2 failed
- turso: 51 failed
Details by Category✅ ▲ Vercel Production
✅ 💻 Local Development
✅ 📦 Local Production
✅ 🐘 Local Postgres
✅ 🪟 Windows
❌ 🌍 Community Worlds
✅ 📋 Other
📊 Benchmark Results
Benchmarks cover workflows with 0, 1, 10, 25, and 50 sequential steps, Promise.all and Promise.race with 10, 25, and 50 concurrent steps, and stream benchmarks (including TTFB metrics), each measured in 💻 Local Development and ▲ Production (Vercel) with 🔍 Observability traces for Nitro, Express, and Next.js (Turbopack). The summary reports the fastest framework by world and the fastest world by framework, with the winner determined by most benchmark wins.
Restore and update test coverage that was reduced in the initial change. The test suite now covers 15 scenarios (up from 9):

Flush-on-write behavior:
- write() resolves only after data reaches server
- Single chunk uses writeToStream (not writeToStreamMulti)
- Falls back to sequential writes when writeToStreamMulti unavailable
- Multiple sequential writes trigger separate flush cycles
- Concurrent writes wait for in-progress flush before buffering

Close behavior:
- closeStream called on close
- Remaining buffer flushed on close
- Empty buffer on close skips write methods

Abort & error handling:
- Abort discards buffer and skips closeStream
- Write errors propagate to caller
- Close errors propagate to caller
- Flush errors during write propagate to caller

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
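One of the error-handling scenarios ("flush errors during write propagate to caller") can be sketched like this. The inline writer and `failingSend` are hypothetical stand-ins for the real test fixtures:

```typescript
// Sketch: a deferred flush fails, and the error surfaces at the write() call
// site because write() awaits a waiter promise settled by the flush.
async function flushErrorScenario(): Promise<string> {
  const failingSend = async (): Promise<void> => {
    throw new Error("HTTP 500 from server"); // simulated flush failure
  };

  const waiters: Array<{ resolve: () => void; reject: (e: unknown) => void }> = [];

  const write = (): Promise<void> => {
    // Schedule the batched flush; on failure, reject every pending write.
    setTimeout(() => {
      failingSend().then(
        () => waiters.forEach((w) => w.resolve()),
        (e) => waiters.forEach((w) => w.reject(e)),
      );
    }, 10);
    return new Promise((resolve, reject) => waiters.push({ resolve, reject }));
  };

  try {
    await write();
    return "no error";
  } catch (e) {
    return (e as Error).message; // the flush error reached the caller
  }
}
```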
@@ -0,0 +1,15 @@
---
title: Changelog
This is a stub so we can add changelog entries for bigger changes later
* @vercel/workflow
packages/next/ @ijjk @vercel/workflow
packages/next/src @ijjk @vercel/workflow
This ensures JJ doesn't get accidentally tagged on every release PR when only changesets change
`abort()` clears the flush timer but never settles the `flushWaiters` promises, causing the internal `write()` async function to hang forever on an unsettled promise.
This commit fixes the issue reported at packages/core/src/serialization.ts:560
**Bug explanation:**
In `BufferedWritableStream`, the `write()` method (line 530) pushes chunks to a buffer, schedules a flush via a timer, and then awaits a promise that is added to `flushWaiters` (line 545):
```ts
await new Promise<void>((resolve, reject) => {
  flushWaiters.push({ resolve, reject });
});
```
Normally, when the flush timer fires (line 514), it captures `flushWaiters`, replaces it with a fresh array, calls `flush()`, and then resolves or rejects the captured waiters based on the flush result.
When `abort()` is called (line 560), it:
1. Clears the flush timer (preventing the timer callback from ever running)
2. Discards the buffer
But it does NOT settle the promises in `flushWaiters`. Since the timer that would have resolved/rejected those waiters has been cleared, and nothing else references them, those promises will **never settle**. The internal async function of `write()` is suspended at the `await` on line 545 and will remain suspended forever - it becomes a memory leak. The closures captured by those async functions (referencing `buffer`, `flushWaiters`, the world object, etc.) will also be retained in memory.
While the WritableStream infrastructure will reject the external write promise returned to consumers, the *internal* async function body doesn't get cancelled - JavaScript has no mechanism to cancel a suspended async function. It will hang indefinitely on the unsettled promise.
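The non-cancellable suspension is easy to see in isolation. A minimal standalone sketch, unrelated to the actual stream code:

```typescript
// A promise that nobody ever settles: the async function below suspends at
// the await and can never complete. JavaScript offers no way to cancel a
// suspended async function body; only external callers can give up on it.
let finished = false;

async function hangs(): Promise<void> {
  await new Promise<void>(() => {
    // neither resolve nor reject is ever called
  });
  finished = true; // unreachable
}

void hangs();
```

Everything the suspended function closes over stays reachable from the pending await, which is exactly why the unsettled `flushWaiters` promises retain the buffer and world object.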
**Fix explanation:**
The fix adds code to the `abort()` handler to reject all pending `flushWaiters` before clearing the array. It:
1. Captures the current `flushWaiters` into a local variable
2. Replaces `flushWaiters` with an empty array (same pattern used in the timer callback)
3. Rejects each waiter with the abort reason (or a default "Stream aborted" error)
This ensures the `write()` async functions' awaited promises settle (with rejection), allowing those async functions to complete and their closures to be garbage collected. The rejection from `w.reject()` will propagate up through the `write()` function, but since the stream is already being aborted, this is the correct behavior.
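The three steps above could look roughly like this. A hypothetical sketch with module-level state; the identifiers are illustrative stand-ins for the real BufferedWritableStream internals:

```typescript
// Sketch of the fixed abort(): settle every pending write() promise so the
// suspended async functions can complete and be garbage collected.
type Waiter = { resolve: () => void; reject: (err: unknown) => void };

let flushTimer: ReturnType<typeof setTimeout> | null = null;
let buffer: Uint8Array[] = [];
let flushWaiters: Waiter[] = [];

function abort(reason?: unknown): void {
  // 1. Stop the pending flush so its callback never runs.
  if (flushTimer !== null) {
    clearTimeout(flushTimer);
    flushTimer = null;
  }
  // 2. Discard buffered chunks; they will never reach the server.
  buffer = [];
  // 3. Capture and reset the waiters (same pattern as the timer callback),
  //    then reject each with the abort reason -- this is the fix.
  const waiters = flushWaiters;
  flushWaiters = [];
  const err = reason ?? new Error("Stream aborted");
  for (const w of waiters) {
    w.reject(err);
  }
}
```

Capturing the array before resetting it mirrors the timer callback, so a waiter registered during rejection handling cannot be lost or double-settled.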
Co-authored-by: Vercel <vercel[bot]@users.noreply.github.com>
Co-authored-by: VaguelySerious <mittgfu@gmail.com>
Stream tests seemed slightly flaky, and I suspect it might be partially because the 10ms buffer on streams wasn't await-able.