
[WIP] Spark: Make Spark readers function asynchronously for many small files [issue #15287]#15341

Open
varun-lakhyani wants to merge 7 commits into apache:main from varun-lakhyani:spark-readers

Conversation

@varun-lakhyani
Contributor

@varun-lakhyani varun-lakhyani commented Feb 16, 2026

GitHub issue #15287

Adds AsyncTaskOpener, which opens tasks asynchronously and stores the resulting iterators in a queue; the base reader can then take iterators directly from the queue instead of opening each task on demand.

For workloads with many small files, where I/O and open overhead are comparable to processing time, opening files in parallel can make reads faster.

Keeping this specific test in mind to start working on this (compaction of 10 small files).

[WIP] High-level design and a rough implementation are done, along with benchmarking and a flag in table properties.

Pending: deciding when to take the async path; the exact implementation, including edge cases such as guaranteeing the ALL_TASKS_COMPLETE marker cannot be reproduced by user data; testing; and any flow or design changes.
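The queue-plus-sentinel idea described above can be sketched with plain JDK types. This is a minimal illustration, not the PR's actual code: the class name echoes AsyncTaskOpener, but every field and method here (`start`, `take`, `allTasksComplete`) is an assumption.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Function;

// Standalone sketch of the producer/consumer flow: worker threads open tasks
// and enqueue the opened results; the reader blocks on take() until the next
// result (or the completion sentinel) arrives.
class AsyncTaskOpenerSketch<T, R> {
  // Private sentinel compared by reference identity: no user-supplied value
  // can ever be reference-equal to it, so it is "not reproducible by user".
  private final Object allTasksComplete = new Object();
  private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();

  // Open every task on the pool; enqueue each opened result, then the sentinel
  // once all opens have finished.
  void start(List<T> tasks, Function<T, R> open, ExecutorService pool) {
    CompletableFuture
        .allOf(
            tasks.stream()
                .map(t -> CompletableFuture.runAsync(() -> queue.add(open.apply(t)), pool))
                .toArray(CompletableFuture[]::new))
        .thenRun(() -> queue.add(allTasksComplete));
  }

  // Blocks for the next opened result; returns null once all tasks are done.
  @SuppressWarnings("unchecked")
  R take() {
    try {
      Object item = queue.take();
      return item == allTasksComplete ? null : (R) item;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new RuntimeException(e);
    }
  }
}
```

A private sentinel object checked by identity sidesteps the "reproducible by user" concern entirely, since equality on user data never comes into play.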

@github-actions github-actions bot added the spark label Feb 16, 2026
@varun-lakhyani varun-lakhyani changed the title [WIP] Spark: Make Spark readers function asynchronously for many small files [Issue - 15287] [WIP] Spark: Make Spark readers function asynchronously for many small files [#15287] Feb 16, 2026
@varun-lakhyani varun-lakhyani changed the title [WIP] Spark: Make Spark readers function asynchronously for many small files [#15287] [WIP] Spark: Make Spark readers function asynchronously for many small files [issue #15287] Feb 16, 2026
@varun-lakhyani varun-lakhyani marked this pull request as draft February 16, 2026 22:16
@varun-lakhyani
Contributor Author

A high-level rough implementation of AsyncTaskOpener is done, along with a separate test-enabling marker in table properties and benchmarking of the new async path against the current sync path.

@varun-lakhyani
Contributor Author

Benchmarking details:

A small artificial latency is injected with LockSupport.parkNanos(1_000_000) in the open() function of org/apache/iceberg/spark/source/BatchDataReader.java to simulate the real I/O overhead of cloud storage. (Benchmarked with @Warmup(iterations = 5) and @Measurement(iterations = 15).)

Results for compaction (rewrite_data_files) of 1000 files, 15-20 KB each, for various overhead values:

| Overhead (ms) | Async (s) | Sync (s) (existing) | % Improvement |
| --- | --- | --- | --- |
| No manual overhead | 0.765 | 0.932 | 17.9% |
| 1 | 0.772 | 2.881 | 73.2% |
| 5 | 1.778 | 8.512 | 79.1% |
| 10 | 3.284 | 15.159 | 78.3% |
| 15 | 4.709 | 21.260 | 77.8% |

@RussellSpitzer
Member

This is really great initial testing. I've only taken a small look, but I would recommend trying ParallelIterable for parallelism rather than the current implementation. You could use it in conjunction with ThreadPools.getWorkerPool() as a default (which auto-scales with the reported CPU stats), although I think leaving it configurable is also interesting and good for testing.

Iterable<Iterable<T>> taskIterables = tasks.stream()
    .map(task -> (Iterable<T>) () -> open(task))
    .collect(Collectors.toList());
ParallelIterable<T> parallel = new ParallelIterable<>(taskIterables, ThreadPools.getWorkerPool());

Could you try that out with your same benchmark? I know you are using only 10 files, but I'd really be interested in the scale of improvement all the way up to 10 threads (one per file). My hunch is we can get at least an order of magnitude.

@RussellSpitzer
Member

Another note: we probably want to make sure we don't enable this except in cases where we know the tasks won't be reading a huge amount of data, but we can leave that for later.

@varun-lakhyani
Contributor Author

Updated the code and ran the benchmark.

> This is really great initial testing. I've only taken a small look, but I would recommend trying ParallelIterable for parallelism rather than the current implementation. You could use it in conjunction with ThreadPools.getWorkerPool() as a default (which auto-scales with the reported CPU stats), although I think leaving it configurable is also interesting and good for testing.
>
>     Iterable<Iterable<T>> taskIterables = tasks.stream()
>         .map(task -> (Iterable<T>) () -> open(task))
>         .collect(Collectors.toList());
>     ParallelIterable<T> parallel = new ParallelIterable<>(taskIterables, ThreadPools.getWorkerPool());
>
> Could you try that out with your same benchmark? I know you are using only 10 files, but I'd really be interested in the scale of improvement all the way up to 10 threads (one per file). My hunch is we can get at least an order of magnitude.

For 1000 files, 15-20 KB each:

| Overhead (ms) | Async (s) | Sync (s) (existing) | % Improvement |
| --- | --- | --- | --- |
| No manual overhead | 0.855 | 0.842 | -1.5% |
| 1 | 0.863 | 2.674 | 67.7% |
| 5 | 1.172 | 9.270 | 87.4% |
| 10 | 1.670 | 15.629 | 89.3% |
| 15 | 2.145 | 21.100 | 89.8% |

One concern: with no manual overhead, the async path actually takes a bit of extra time to manage threads in this implementation. But as overhead increases (which is common for cloud connections), the improvement is nearly 90%.

@varun-lakhyani
Contributor Author

> Another note, we probably want to make sure we don't enable this except in cases where we know the tasks won't be reading a huge amount of data but we can leave that for later.

Yes, it's on my checklist. Currently the behavior is controlled by a marker; later we can add conditions to decide which path to take, sync vs. async.
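Such a condition could eventually look something like the guard below. This is purely a hypothetical sketch: the names (`useAsyncPath`, `SMALL_FILE_THRESHOLD_BYTES`, `MIN_TASK_COUNT`) and the thresholds do not appear in the PR and are assumptions about how "many small files" might be detected before enabling the async path.

```java
// Hypothetical guard for choosing between the sync and async read paths.
// Async opening only pays off when there are many tasks and each is small,
// so both conditions are checked alongside the table-property flag.
class AsyncPathGuard {
  static final long SMALL_FILE_THRESHOLD_BYTES = 32L * 1024 * 1024; // assumed cutoff
  static final int MIN_TASK_COUNT = 8;                              // assumed cutoff

  static boolean useAsyncPath(boolean asyncEnabled, int taskCount, long totalBytes) {
    if (!asyncEnabled || taskCount < MIN_TASK_COUNT) {
      return false;
    }
    // Average task size is a cheap proxy for "tasks won't read a huge amount of data".
    long avgBytesPerTask = totalBytes / taskCount;
    return avgBytesPerTask < SMALL_FILE_THRESHOLD_BYTES;
  }
}
```

A per-task average keeps the check O(1) on metadata the planner already has, though a max-task-size check would be stricter.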


@mukund-thakur mukund-thakur left a comment


I am new to this code, but I have added some questions based on my current understanding.

this.table = table;
this.taskGroup = taskGroup;
this.tasks = taskGroup.tasks().iterator();
this.currentIterator = CloseableIterator.empty();


currentIterator shouldn't be initialized in the async case, I think.

Contributor Author


This is actually needed to separate the async vs. sync paths in next(), which is much cleaner.
Updated in the latest commit.

this.current = currentIterator.next();
return true;
} else if (isAsyncEnabled) {
this.currentIterator.close();


In the async case you don't need to close this, as it was never opened. I think the flow will be:

    if (isAsyncEnabled)
        use parallelIterator
    else
        use currentIterator  (maybe we can rename this to sequentialIterator)
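The flow suggested above (pick one iterator up front instead of closing a never-opened one) might be sketched like this; the names parallelIterator and sequentialIterator come from the comment, while the class itself is an illustrative assumption, not the PR's code:

```java
import java.util.Iterator;
import java.util.NoSuchElementException;

// Sketch of the suggested dispatch: choose the delegate once in the
// constructor, so hasNext()/next() never branch on the async flag again and
// the sync path never touches an async iterator (or vice versa).
class ReaderDispatchSketch<T> implements Iterator<T> {
  private final Iterator<T> delegate;

  ReaderDispatchSketch(
      boolean asyncEnabled, Iterator<T> parallelIterator, Iterator<T> sequentialIterator) {
    this.delegate = asyncEnabled ? parallelIterator : sequentialIterator;
  }

  @Override
  public boolean hasNext() {
    return delegate.hasNext();
  }

  @Override
  public T next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    return delegate.next();
  }
}
```

Selecting the delegate once also means close() only ever has one iterator to clean up, which addresses the close() question raised later in this review.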

Contributor Author

@varun-lakhyani varun-lakhyani Mar 20, 2026


Updated in latest commit.

As for renaming currentIterator to sequentialIterator, I'm not sure renaming is a good idea right now for POC and benchmarking purposes, since it has had this name for a while. We can think about it afterwards.

@@ -166,9 +196,13 @@ public void close() throws IOException {
// close the current iterator
this.currentIterator.close();


close either the parallel or serial iterator?

Contributor Author


Updated

@varun-lakhyani
Contributor Author

varun-lakhyani commented Mar 20, 2026

Changed the benchmarking!
Instead of simulating cloud storage I/O overhead, I benchmarked the sync and async flows against AWS S3 (1000 files, 14.6 KB each):

  • Sync time = 219.694 s
  • Async time = 51.853 s
  • % Improvement = 76.4%

Since cloud storage has high I/O overhead, the async flow can be very beneficial for many small files.
