stream: experimental stream/iter implementation#62066
jasnell wants to merge 55 commits into nodejs:main
Conversation
ronag left a comment:
Super impressed! This is amazing.
One note. Since this is supposed to be "web compatible", it looks to me like everything is based on Uint8Array, which is a bit unfortunate for Node. Could the Node implementation use Buffer? It would still be web compatible; it's just that we could access the Buffer prototype methods without hacks like `Buffer.prototype.write.call(...)`.
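The compatibility point can be sketched in plain Node.js (this is illustrative, not the PR's code): Buffer subclasses Uint8Array, so an implementation that yields Buffers still satisfies any consumer expecting Uint8Array values, while Node consumers keep Buffer's prototype methods without workarounds.

```javascript
// Sketch (not the PR's code): Buffer is a subclass of Uint8Array, so
// yielding Buffers remains "web compatible" for Uint8Array consumers.
const buf = Buffer.from('hello');
console.log(buf instanceof Uint8Array); // true
console.log(buf.toString('utf8'));      // "hello"

// With plain Uint8Arrays, reaching Buffer methods needs a workaround,
// e.g. a zero-copy Buffer view over the same memory:
const u8 = new Uint8Array([104, 105]);
const view = Buffer.from(u8.buffer, u8.byteOffset, u8.byteLength);
console.log(view.toString('utf8')); // "hi"
```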
Also, could you do some mitata-based benchmarks so that we can see the GC and memory pressure relative to Node streams?
Another thing: in the async generator case, can we pass an optional AbortSignal?
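A hypothetical sketch of what this could look like (all names here are invented for illustration, not the PR's API): the stream hands the generator an options bag carrying an AbortSignal, so async work inside the generator can observe cancellation from the source side.

```javascript
// Hypothetical sketch: an underlying-source async generator that threads
// an AbortSignal through to its own async calls. `fetchNextChunk` is a
// stand-in for any abortable async operation, not a real API.
async function* source({ signal }) {
  while (!signal.aborted) {
    const chunk = await fetchNextChunk(signal);
    if (chunk === null) return; // source exhausted
    yield chunk;
  }
}

async function fetchNextChunk(signal) {
  signal.throwIfAborted(); // bail out promptly if already aborted
  return null;             // pretend the source is empty
}
```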
This makes me a bit nervous for code portability. If someone starts working with this in Node.js, they would end up writing code that depends on the values being `Buffer` instances.
benjamingr left a comment:
> just to explore implementation feasibility, performance, etc

Sounds fine, as this isn't exposed externally at this time.
```js
// Buffer is full
switch (this._backpressure) {
  case 'strict':
```
I'm not sure `strict` should be the default, nor that it should block here.
That will be a central part of the discussion around this. A big part of the challenge with web streams is that backpressure can be fully ignored. One of the design principles for this new approach is to apply it strictly by default. We'll need to debate this; I recommend opening an issue at https://github.com/jasnell/new-streams
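The distinction can be sketched with a toy queue (this is an illustration of the principle, not the PR's implementation): under "strict" backpressure, a producer's write does not resolve while the buffer is full, so the producer cannot outrun the consumer, whereas Web Streams allow a producer to ignore `desiredSize` and grow the queue without bound.

```javascript
// Illustrative sketch only: "strict" backpressure means a write into a
// full buffer blocks (awaits) until the consumer drains, rather than
// letting the internal queue grow unboundedly.
class StrictQueue {
  constructor(highWaterMark = 2) {
    this.items = [];
    this.highWaterMark = highWaterMark;
    this.waiters = [];
  }
  async push(item) {
    // Buffer is full: park the producer until a consumer takes an item.
    while (this.items.length >= this.highWaterMark) {
      await new Promise((resolve) => this.waiters.push(resolve));
    }
    this.items.push(item);
  }
  shift() {
    const item = this.items.shift();
    const wake = this.waiters.shift();
    if (wake) wake(); // let one blocked producer re-check capacity
    return item;
  }
}
```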
benjamingr left a comment:
Sorry, I meant to approve. Regardless of design changes/suggestions regarding timing and a lot of other stuff, as experimental this is fine.

I would maybe update the docs to emphasize the experimental status even further than normal.
@ronag ... implemented a couple of mitata benchmarks.

Memory Benchmark Results

Environment: Node 25.6.0, Intel Xeon w9-3575X, `--expose-gc`, mitata with `.gc('inner')`

Per-Operation Allocations (New Streams vs Web Streams)

Pipeline scenarios (pull, pipeTo) show the biggest gains: 16-25x less heap, because transforms are inline function calls rather than stream-to-stream pipes with internal queues. Push is faster but uses slightly more heap due to batch iteration (`Uint8Array[]`). Broadcast/tee are comparable at this scale.

Sustained Load (97.7 MB volume)

pipeTo and broadcast show the largest sustained-load heap difference. Web Streams' pipeThrough chain buffers ~50% of total volume in flight, while the new streams' pipeTo pulls synchronously through the transform. Broadcast uses a shared ring buffer (0.5 MB) versus tee's per-branch queues (42.8 MB). Zero retained memory for both APIs after completion -- no leaks.
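The inline-transform point can be illustrated with plain async iterators (this is not the PR's API, just a sketch of why a pull-based pipeline keeps nothing queued between stages): each chunk is produced on demand and flows through the transform as an ordinary function call.

```javascript
// Illustration only: a pull-based pipeline where the transform is an
// inline async-generator call over the source iterator. Chunks are
// produced one at a time as the consumer pulls, so no intermediate
// per-stage queue accumulates data.
async function* source() {
  for (let i = 1; i <= 3; i++) yield Uint8Array.of(i);
}

async function* map(iterable, fn) {
  for await (const chunk of iterable) yield fn(chunk);
}

async function collect() {
  const out = [];
  for await (const chunk of map(source(), (c) => Uint8Array.of(c[0] * 2))) {
    out.push(chunk[0]);
  }
  return out; // [2, 4, 6]
}
```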
@ronag passing a signal to an async generator allows the underlying source to abort it, but we're lacking a builtin way for the consumer iterating the async generator to safely cancel the stream. Barring an improvement at the language level, the consumer can only safely cancel the underlying source if it has a reference to an `AbortController`. WHATWG Streams don't have this problem: when the consumer cancels its reader, the source is notified. Happy to create examples to reproduce this if it's not clear what I'm talking about.
I think you misunderstand. The signal would be for any async calls inside the generator.
Yes, I'm just saying that doesn't allow the consumer to abort calls the async generator is making, and the consumer often decides when streaming should be aborted.

For example, say I'm using a library that handles subscriptions from the frontend. When it gets a subscription, it asks me to build an async iterable of events to stream back. It is then responsible for iterating, and for cancelling once the frontend unsubscribes. If the iterable I pass to that library comes from an async generator, I'll also have to pass an AbortController to that library for it to safely clean up once the client unsubscribes. If all it has is an AsyncIterable interface, it may leak resources after the client unsubscribes.

This is a fundamental weakness in using async generators for transformation, and my longtime frustration with async iteration in general. In contrast, with WHATWG streams, when a consumer cancels its reader, the underlying source and any TransformStreams get notified to clean up right away.
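The pitfall can be sketched as follows (all names are illustrative). Calling `return()` on an async generator does run its `finally` block, but only once the generator is suspended at a `yield`; if it is parked inside a pending `await` (say, waiting for the next network event), `return()` cannot interrupt that await, which is why the consumer needs an out-of-band AbortController that a bare AsyncIterable does not carry.

```javascript
// Sketch of the consumer-side cancellation path that *does* work:
// return() runs the generator's finally block when the generator is
// suspended at a yield. It cannot, however, interrupt an in-flight await.
let cleanedUp = false;

async function* ticker() {
  try {
    let i = 0;
    while (true) yield i++;
  } finally {
    cleanedUp = true; // cleanup triggered by consumer-side return()
  }
}

async function demo() {
  const it = ticker();
  await it.next();   // generator is now parked at the yield
  await it.return(); // finally runs promptly in this case
  return cleanedUp;
}
```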
@benjamingr was actually talking about the same thing I'm trying to resurrect awareness of in this old issue in the async-iteration proposal. Note one of his comments: tc39/proposal-async-iteration#126 (comment). This was eight years ago, but there hasn't been much improvement on this front, unfortunately. I'm really hoping I can get everyone to fully understand this pitfall, and to have a good plan for how to help people avoid it, before getting too far along with this new proposed API.
Force-pushed 9f8af01 to e1e1911
Refactors the cancelation per updates in the design doc
I've updated the implementation to address the remaining outstanding issues, round out tests, add benchmarks, fix bugs, etc. It's also now behind an experimental CLI flag. This is ready for review.
Codecov Report

❌ Patch coverage. Additional details and impacted files:

```diff
@@           Coverage Diff            @@
##             main   #62066      +/-   ##
==========================================
- Coverage   89.68%   89.50%    -0.18%
==========================================
  Files         676      688       +12
  Lines      206575   212433     +5858
  Branches    39549    40584     +1035
==========================================
+ Hits       185262   190138     +4876
- Misses      13446    14398      +952
- Partials     7867     7897       +30
```
Opening this for discussion. Not intending to land this yet. It adds an implementation of the "new streams" to core and adds support to `FileHandle`, with tests and benchmarks, just to explore implementation feasibility, performance, etc.

This is an implementation of the "new streams" API for Node.js along with an example integration with `FileHandle`. This covers the core part of the implementation.

The module is `stream/iter`. It is gated behind the `--experimental-stream-iter` CLI flag.

Benchmark results comparing Node.js streams, Web streams, and `stream/iter` (higher number is better)
It's worth noting that the performance of the `FileHandle` benchmark added, which reads files, converts them to upper case, and then compresses them, is on par with Node.js streams and twice as fast as web streams (though web streams are not perf optimized in any way, so take that 2x with a grain of salt). The majority of the perf cost in the benchmark is due to compression overhead. Without the compression transform, the new stream can be up to 15% faster than reading the file with classic Node.js streams.

The main thing this shows is that the new streams impl can (a) perform reasonably and (b) sit comfortably alongside the existing impls without any backwards-compat concerns.
Benchmark runs:
Opencode/Opus 4.6 were leveraged heavily in the process of creating this PR following a strict iterative jasnell-in-the-loop process.
--
Reviewing Guide
The draft spec this is implementing is located at https://stream-iter.jasnell.me/
The implementation is primarily in `lib/internal/streams/iter`... that's where you should start. The functionality is split between key files by operation, which should make it easier to review.

The tests are in `parallel`, prefixed as `test-stream-iter-*`; they are also organized by functional area.

The benchmarks are in `bench/streams`, prefixed with `iter-*`.