This is a rough draft of an epic to handle large state objects (dataclips) in the runtime
The Problem
Our infrastructure can struggle when processing large state objects - either within the runtime itself or between the runtime and worker.
This can lead to:
- Worker instances running slowly or even blowing up when processing large state objects
- Runs getting killed because they blow memory limits
- higher memory requirements per worker process resulting in a lower capacity on worker servers
- Database timeouts in lightning from processing large state objects
Probably related: we can also struggle within a step when downloading large datasets and trying to load them into memory all at once. Some adaptors provide streaming APIs (although most don't) - but if you're processing 1M items in batches of 1k, it's hard in the current runtime architecture to process those batches. You probably want to run step A to get a batch, then step B to process it, then loop back to step A to get the next batch.
Sub Issues
Here are some things to look at:
Set up benchmarks and instrumentation
We shouldn't make any changes in this area unless we can quantify the benefit. I don't want to introduce a bleeding edge node feature to production unless I'm absolutely sure we get a benefit out of it.
So, first steps, get some instrumentation set up and a couple of benchmark workflows established.
Don't stringify in runtime
The runtime runs JSON.stringify() on state at the end of each step.
Don't do that. For large state objects it's very expensive in terms of compute and memory.
We do it to remove unserializable properties (functions, streams, classes) and circular properties before they're returned to the worker or CLI or whoever. The runtime guarantees that returned state is JSON serializable.
We can probably approximate this though. If we assume that 95% of state is data, rather than anything logical, we can do something like:
- Iterate over the top n keys (just 2 or 3 deep) of the state object
- remove anything we don't think will serialize safely, or call toString on it
User functions will likely get written to the top of state.
We should also remove any keys starting with _. These are private state keys, documented elsewhere, and stripping them could be super useful.
Note that if we do this optimisation, we might have to handle cases where the stringify call fails (because at some point we DO have to call stringify) if there's a circular reference or something. But so long as we're alive to it and can return a good error, I don't see a problem.
At the end of the day, writing unsupported structures to state is a user error, and we should catch and report it.
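A minimal sketch of the idea, assuming state is mostly plain data (the depth, the key rules, and the error message are all illustrative):

```javascript
// Sketch: shallow sanitiser that only walks the top couple of levels.
// Drops functions, streams and class instances, plus any "_" keys,
// then lets JSON.stringify handle everything below that depth.
const isPlainValue = (v) =>
  v === null ||
  ['string', 'number', 'boolean'].includes(typeof v) ||
  Array.isArray(v) ||
  (typeof v === 'object' && (v.constructor === Object || v.constructor === undefined));

function shallowSanitise(obj, depth = 2) {
  const out = {};
  for (const [key, value] of Object.entries(obj)) {
    if (key.startsWith('_')) continue; // private state keys
    if (!isPlainValue(value)) continue; // functions, streams, class instances
    out[key] =
      depth > 1 && value && typeof value === 'object' && !Array.isArray(value)
        ? shallowSanitise(value, depth - 1)
        : value;
  }
  return out;
}

function serialiseState(state) {
  try {
    return JSON.stringify(shallowSanitise(state));
  } catch (e) {
    // Circular refs below the sanitised depth still throw here -
    // surface it as a clear user error rather than crashing
    throw new Error(`State is not serialisable: ${e.message}`);
  }
}
```

Note the try/catch: this is exactly the "stringify can still fail" case above, caught and reported as a user error.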
Support Streams
Streams are cool: they allow us to process large sets of data in small tasty chunks.
But you can't pass a stream between steps in the runtime, and you can't send streams back to lightning.
To support streams, we have to do something like:
- Don't serialize streams at the end of a step
- Replace streams with a string like [stream] when emitting state out of the runtime to the worker
- Give Lightning (and the CLI?) some awareness that some state objects have a stream attached and can't be used as inputs. If you retry a step that relies on a stream, it'll fail. Which is good and right! But we must explain it.
Use shared array buffers in the engine
A worker thread can share an array buffer with its hosting process. Rather than stringifying objects between the worker thread and the child process, can we use this shared buffer?
I don't think this works with objects, so you probably have to stringify first before you can use the shared buffer.
I also wonder if we can do something clever like use a TCP server to send events to the main worker thread and out to the phoenix socket. Like pipe a stream through more efficiently, rather than serializing whole objects to strings.
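To make the idea concrete, here's a minimal sketch (fixed buffer size and out-of-band length are assumptions; a real implementation would also need Atomics for synchronisation between threads):

```javascript
// Sketch: write a JSON string into a SharedArrayBuffer so a worker
// thread and its host can read it without a structured-clone copy.
// As noted above, objects still have to be stringified first.
const sab = new SharedArrayBuffer(1024);
const bytes = new Uint8Array(sab);

function writeState(state) {
  const encoded = new TextEncoder().encode(JSON.stringify(state));
  if (encoded.length > bytes.length) throw new Error('state too large for buffer');
  bytes.set(encoded);
  return encoded.length; // the length must be shared alongside the buffer
}

function readState(length) {
  // Copy out of the shared buffer before decoding
  const copy = new Uint8Array(bytes.subarray(0, length));
  return JSON.parse(new TextDecoder().decode(copy));
}
```

The win, if there is one, is that the bytes are written once and read in place, instead of being cloned across the thread boundary - which is exactly what the benchmarks above should confirm or refute.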
Reduce serialization in engine and worker
I think that when we emit a state object from the runtime, we send the object straight to process.send(), maybe stringifying it first, then it gets parsed back into an object in the child process, then it's serialized out of the child process into the main thread, and then serialized back out of the main thread through the phoenix socket.
This would work a lot better if we did something like:
- Convert the object to a JSON string or array buffer inside the runtime
- Emit the encoded object out of the runtime and then forward it through the layers
- The worker receives a flat string or array before sending it out through the websocket
You're basically treating the state object more like a binary and reducing the amount of processing you do with it.
Where this gets a bit tricky is that the event changes a bit through the layers before it hits the socket. But maybe you can do { meta, payload } and stick the big state stuff in the payload, then assemble it into a big JSON string right at the end.
This probably applies to log events too (although they might just be flat strings so will have less JSON overhead)
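The { meta, payload } idea could be sketched like this (event shapes and function names are illustrative):

```javascript
// Sketch: serialise state once inside the runtime, pass it through
// the layers as an opaque string, and only splice it into the
// outgoing message at the very end.
function makeEvent(meta, state) {
  return { meta, payload: JSON.stringify(state) }; // serialised once, at the source
}

// Intermediate layers rewrite meta without ever re-parsing the payload
function addRunId(event, runId) {
  return { meta: { ...event.meta, runId }, payload: event.payload };
}

// Final hop: assemble the socket message by string concatenation,
// never parsing the (potentially huge) payload again
function toSocketMessage(event) {
  return `{"meta":${JSON.stringify(event.meta)},"payload":${event.payload}}`;
}
```

Each layer only ever touches the small meta object; the big state string is forwarded untouched, which is the "treat it like a binary" idea above.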
Use node 25 pointer optimisation
There is an experimental Node 25 build which uses pointer optimisation and claims to reduce Node.js memory overhead by 50%.
I've read up on it and it seems legit and useful to use.
The catch is that it's a compile-time Node.js flag. We can really only support it in the Docker image. So we can't enable it conditionally for some workflows. It's even hard to enable conditionally for some deployments.
I would be confident that this Just Works for all our use-cases and we don't need to worry - we just need to do it. However I would want to run a really good test in staging before going to prod.
See https://blog.platformatic.dev/we-cut-nodejs-memory-in-half
Batch Processing support features
Processing batches doesn't work in the current runtime design.
If you want to batch 1 million items into pages, and you need more than 1 step to process each page, you have to call a webhook to run another workflow, which is kinda mad.
What I really want to do is either:
a) Call a step from another step and await it, like await call('step-2', state). (Do we return immediately after that step, or do we let the workflow run to the leaf before returning?)
b) Allow looping - so step A calls step B with a page of data, and then step B conditionally calls step A again to get a new page. This may not actually work with streaming (it doesn't work if I'm using an onPage callback)
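A hypothetical sketch of option (a) - everything here (the step registry, call, the step bodies) is illustrative, not an existing runtime API:

```javascript
// Hypothetical: a call() helper that lets one step invoke another and
// await its resulting state, so a driver can page through a dataset.
const steps = {
  'get-page': async (state) => {
    const page = state.items.slice(state.offset, state.offset + 2);
    return { ...state, page, offset: state.offset + 2 };
  },
  'process-page': async (state) => ({
    ...state,
    processed: [...state.processed, ...state.page.map((x) => x * 10)],
  }),
};

const call = (name, state) => steps[name](state);

// A driver that loops A -> B until the data is exhausted (option b
// is the same loop, expressed as steps calling each other instead)
async function run(state) {
  while (state.offset < state.items.length) {
    state = await call('get-page', state);
    state = await call('process-page', state);
  }
  return state;
}
```

The point of the sketch is that the batch loop lives inside one workflow run, with no webhook round-trip per page.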
Both features would need robust UI support in Lightning, which is probably the hardest bit.