fix: race condition in SpillPool caused by buffered stream#20067
fix: race condition in SpillPool caused by buffered stream#20067dekuu5 wants to merge 8 commits intoapache:mainfrom
Conversation
Signed-off-by: Ahmed hossam <ahmed.hossambahig@gmail.com>
There was a problem hiding this comment.
Pull request overview
This PR fixes an indeterministic test failure in SpillPool caused by a race condition between the writer and reader coordination logic when using a buffered stream.
Changes:
- Removed the
spawn_bufferedwrapper fromread_spill_as_streamto return an unbuffered stream - Removed the unused import of
spawn_bufferedfromspill_manager.rs
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
martin-g
left a comment
There was a problem hiding this comment.
If it is not too complex maybe add a test case verifying that there is no racing any more
| max_record_batch_memory, | ||
| ))); | ||
|
|
||
| Ok(spawn_buffered(stream, self.batch_read_buffer_capacity)) |
There was a problem hiding this comment.
It seems batch_read_buffer_capacity is no more used.
It could be deprecated or maybe even removed.
https://github.com/dekuu5/datafusion/blob/1b8ef43fdd1424a3e4fe2db213fec4e7228788b0/datafusion/physical-plan/src/sorts/multi_level_merge.rs#L276 is a no-op
There was a problem hiding this comment.
okay sure i will look into that and the tests also
Signed-off-by: Ahmed hossam <ahmed.hossambahig@gmail.com>
Signed-off-by: Ahmed hossam <ahmed.hossambahig@gmail.com>
|
Hello can i get a review on this pr |
|
I have yet to review this but I wonder how it relates to #20159 |
|
@martin-g are you able to triage this since you started review already? |
|
@dekuu5 Please fix the failing CI checks! |
|
|
||
| #[tokio::test(flavor = "multi_thread", worker_threads = 1)] | ||
| async fn test_concurrent_writer_reader_race_condition() -> Result<()> { | ||
| // stress testing the concurncy in the reader and the reader to make sure there is now race condtion |
There was a problem hiding this comment.
| // stress testing the concurncy in the reader and the reader to make sure there is now race condtion | |
| // stress testing the concurrency in the reader and the writer to make sure there is now race condition |
| /// Reads a spill file as a stream. The file must be created by the current `SpillManager`. | ||
| /// This method will generate output in FIFO order: the batch appended first | ||
| /// will be read first. | ||
| pub fn read_spill_as_stream( |
There was a problem hiding this comment.
Now, read_spill_as_stream() is exactly the same as read_spill_as_stream_unbuffered() below.
Maybe the buffering impl should have been preserved but the caller with the issue should have been changed to use read_spill_as_stream_unbuffered() ?!
There was a problem hiding this comment.
The more I dig into this, the more I think there should be a better solution.
The PR solves the issue by removing the pre-fetching of spilled data.
IMO we should focus on finding the reason why the pre-fetching gets the wrong EOF (and drops the reader) and fix it.
#20027 (comment)
There was a problem hiding this comment.
Now,
read_spill_as_stream()is exactly the same asread_spill_as_stream_unbuffered()below. Maybe the buffering impl should have been preserved but the caller with the issue should have been changed to useread_spill_as_stream_unbuffered()?!
yes indeed but i wrote this before #20159 was merged i was thinking about doing an unbuffered stream but i thought no other functions use it so why make another one
There was a problem hiding this comment.
The more I dig into this, the more I think there should be a better solution. The PR solves the issue by removing the pre-fetching of spilled data. IMO we should focus on finding the reason why the pre-fetching gets the wrong EOF (and drops the reader) and fix it. #20027 (comment)
Yes, I thought of that. I think a better approach is to make the buffered stream somehow aware of the synchronization between the reader and the writer. Maybe spawn_buffered should know the writer's status? That was what came to mind at the time, but I am not yet sure how to implement this.
|
Thank you for your contribution. Unfortunately, this pull request is stale because it has been open 60 days with no activity. Please remove the stale label or comment or this will be closed in 7 days. |
Which issue does this PR close?
SpillPool#20027Rationale for this change
The SpillPool test spill::spill_pool::channel was failing indeterministically due to a race condition between SpillFile's coordination logic and the spawn_buffered background task used in the stream reader.
What changes are included in this PR?
read_spill_as_stream now returns a normal stream not a buffered stream
Are these changes tested?
No, the fix is for an existing test mentioned in the issue. I wasn't able to find the bug initially, but I found it by stress-testing the original test 100 times in parallel.
Are there any user-facing changes?
no
i still think we should add a buffer stream that is aware of the coordination of the writer and reader but i can't warp my head around it yet.