[WIP] Spark: Make Spark readers function asynchronously for many small files [issue #15287] #15341
varun-lakhyani wants to merge 7 commits into apache:main
Conversation
High-level rough implementation of asyncTaskOpener, a separate test-enabling marker in table properties, and benchmarking of this new async path against the current sync path.
Benchmarking details: a little latency overhead is injected manually using LockSupport.parkNanos(1_000_000) in the open() function in org/apache/iceberg/spark/source/BatchDataReader.java to simulate the real IO overhead of cloud storage (used @Warmup(iterations = 5) and @Measurement(iterations = 15) for benchmarking). Results for compaction (rewrite_data_files) of 1000 files, 15-20 KB each, for various cases:
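As a rough sketch of how the benchmark injects latency (the class and method names below are illustrative, not the PR's actual code): each simulated open() parks the calling thread for about 1 ms, standing in for a cloud-storage round trip.

```java
import java.util.concurrent.locks.LockSupport;

// Hypothetical sketch of the latency injection described above: a stand-in
// for BatchDataReader.open() that parks for ~1 ms per call to simulate IO.
public class SimulatedIoLatency {
  static final long SIMULATED_OPEN_LATENCY_NANOS = 1_000_000L; // ~1 ms

  // Returns the measured elapsed time so callers can observe the overhead.
  static long openWithSimulatedLatency() {
    long start = System.nanoTime();
    LockSupport.parkNanos(SIMULATED_OPEN_LATENCY_NANOS);
    return System.nanoTime() - start;
  }

  public static void main(String[] args) {
    long elapsed = openWithSimulatedLatency();
    System.out.println("open() took " + elapsed + " ns");
  }
}
```

Note that parkNanos may return spuriously early, so a production-grade simulation would re-park in a loop until the deadline; for a benchmark stub a single call is usually close enough.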
This is really great initial testing. I've only taken a small look, but I would recommend you try out ParallelIterable for parallelism rather than the current implementation. You could use this in conjunction with ThreadPools.getWorkerPool as a default (which auto-scales with the reported CPU stats), although I think leaving it configurable is also interesting and good for testing.

```java
Iterable<Iterable<T>> taskIterables = tasks.stream()
    .map(task -> (Iterable<T>) () -> open(task))
    .collect(Collectors.toList());
ParallelIterable<T> parallel =
    new ParallelIterable<>(taskIterables, ThreadPools.getWorkerPool());
```

Could you try that out with your same benchmark? I know you are using only 10 files, but I'd really be interested in the scale of improvement all the way up to 10 threads (one per file). My hunch is we can get at least an order of magnitude.
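The intuition behind that suggestion can be shown with a generic, self-contained sketch (plain ExecutorService, not Iceberg's ParallelIterable; all names here are illustrative): with N tasks each paying a fixed simulated open latency, a pool of N threads overlaps the waits instead of paying them serially.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.locks.LockSupport;

// Hypothetical demonstration of parallel vs. serial task opening.
public class ParallelOpenSketch {
  // Stands in for open(task) with ~5 ms of simulated IO latency.
  static int open(int task) {
    LockSupport.parkNanos(5_000_000L);
    return task;
  }

  // Opens `tasks` tasks on a pool of `threads` threads; returns elapsed nanos.
  static long timeOpenAll(int tasks, int threads) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    long start = System.nanoTime();
    List<Future<Integer>> futures = new ArrayList<>();
    for (int i = 0; i < tasks; i++) {
      final int t = i;
      futures.add(pool.submit(() -> open(t)));
    }
    for (Future<Integer> f : futures) {
      f.get(); // wait for every open to finish
    }
    pool.shutdown();
    return System.nanoTime() - start;
  }

  public static void main(String[] args) throws Exception {
    long serial = timeOpenAll(10, 1);    // ~10 x 5 ms paid back-to-back
    long parallel = timeOpenAll(10, 10); // waits overlap across threads
    System.out.println("serial=" + serial + "ns parallel=" + parallel + "ns");
  }
}
```

With 10 tasks and one thread per task, the parallel run should approach a single task's latency rather than the sum, which is the order-of-magnitude effect hypothesized above.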
Another note: we probably want to make sure we don't enable this except in cases where we know the tasks won't be reading a huge amount of data, but we can leave that for later.
Updated the code and ran the benchmark for 1000 files, 15-20 KB each.

The concern here is that with no manual overhead, async actually takes a bit of extra time to manage threads via this implementation, but as overhead increases (which is common for cloud connections) the improvement is nearly 90%.

Yes, it's on my checklist. Currently we pass through a marker; later we can set some conditions to decide which path to take, sync vs. async.
mukund-thakur
left a comment
I am new to this code, but I have added some questions based on my current understanding.
```java
this.table = table;
this.taskGroup = taskGroup;
this.tasks = taskGroup.tasks().iterator();
this.currentIterator = CloseableIterator.empty();
```
currentIterator shouldn't be initialized in the async case, I think.
This actually needs to separate the async vs. sync path in next(), which is much cleaner.
Updated in latest commit.
```java
this.current = currentIterator.next();
return true;
} else if (isAsyncEnabled) {
  this.currentIterator.close();
```
In the async case you don't need to close this, as it was never opened. I think the flow will be:

```java
if (isAsyncEnabled) {
  // use parallelIterator
} else {
  // use currentIterator (maybe we can rename this to sequentialIterator)
}
```
Updated in latest commit.
As for renaming currentIterator to sequentialIterator, I am not sure renaming is a good idea right now for POC and benchmarking purposes, since it has been named this for a while. We can think about it afterwards.
```diff
@@ -166,9 +196,13 @@ public void close() throws IOException {
   // close the current iterator
   this.currentIterator.close();
```
Close either the parallel or the serial iterator?
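The branching the review comments describe (next(), hasNext(), and close() all dispatching on the async flag, so only the iterator that was actually opened is touched) can be sketched as follows. All names here are invented for illustration and are not Iceberg's actual BaseReader code.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.Iterator;

// Hypothetical reader with a single branch point per method: each path
// only ever sees, and closes, its own iterator.
public class DualPathReader<T> implements Closeable {
  private final boolean isAsyncEnabled;
  private final Iterator<T> sequentialIterator; // sync path
  private final Iterator<T> parallelIterator;   // async path

  DualPathReader(boolean async, Iterator<T> sequential, Iterator<T> parallel) {
    this.isAsyncEnabled = async;
    this.sequentialIterator = sequential;
    this.parallelIterator = parallel;
  }

  public boolean hasNext() {
    return isAsyncEnabled ? parallelIterator.hasNext() : sequentialIterator.hasNext();
  }

  public T next() {
    return isAsyncEnabled ? parallelIterator.next() : sequentialIterator.next();
  }

  @Override
  public void close() throws IOException {
    // Close only the iterator belonging to the active path; the other one
    // was never opened, so it must not be touched.
    Iterator<T> active = isAsyncEnabled ? parallelIterator : sequentialIterator;
    if (active instanceof Closeable) {
      ((Closeable) active).close();
    }
  }
}
```

This keeps the sync path byte-for-byte identical to today's behavior while the async path stays fully isolated behind the flag.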
Changed benchmarking!
As cloud storage has high IO overhead, the async flow can be beneficial for small files.
GitHub issue #15287
Adding AsyncTaskOpener, which opens tasks asynchronously and stores them in a queue; the base reader can access the queue and take iterators directly from there instead of opening them on demand.
For lots of smaller files, where IO/open overhead is comparable to processing time, parallel opening could make things faster.
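The opener-plus-queue design described above can be sketched as a producer/consumer with a sentinel object marking completion. Everything below is illustrative (AsyncOpenerSketch, openAll, and the sentinel field are invented names, not the PR's code); one way to make the completion marker "not reproducible by user" is to use a private sentinel instance that no user data can ever equal by identity.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of an async task opener: worker threads open tasks
// ahead of time and push results into a queue; a sentinel tells the
// consumer (the base reader) when all tasks are done.
public class AsyncOpenerSketch {
  // Identity-compared sentinel: cannot collide with any user-supplied value.
  static final Object ALL_TASKS_COMPLETE = new Object();

  static BlockingQueue<Object> openAll(List<String> tasks) {
    BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
    ExecutorService pool = Executors.newFixedThreadPool(Math.min(tasks.size(), 10));
    for (String task : tasks) {
      pool.submit(() -> queue.add("opened:" + task)); // stands in for open(task)
    }
    pool.shutdown();
    // Enqueue the sentinel once every open has finished.
    new Thread(() -> {
      try {
        pool.awaitTermination(1, TimeUnit.MINUTES);
        queue.add(ALL_TASKS_COMPLETE);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }).start();
    return queue;
  }

  public static void main(String[] args) throws InterruptedException {
    BlockingQueue<Object> queue = openAll(List.of("a", "b", "c"));
    int opened = 0;
    // Consumer loop: take until the sentinel appears (identity comparison).
    while (queue.take() != ALL_TASKS_COMPLETE) {
      opened++;
    }
    System.out.println("opened " + opened + " tasks");
  }
}
```

In the real reader the queue would hold opened iterators rather than strings, but the shutdown protocol is the same: the consumer drains until it sees the sentinel by identity.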
Keeping this specific test in mind to start working on this (Compaction of 10 small files).
iceberg/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
Line 218 in 852a71b
[WIP] High-level design and rough implementation done, along with benchmarking and a flag in table properties.
Pending: deciding when to take the async path; the exact implementation, including edge cases like making sure the ALL_TASKS_COMPLETE marker is not reproducible by a user; testing; flow or design changes if any.