-
Notifications
You must be signed in to change notification settings - Fork 3.2k
Add ParquetFileMerger for efficient row-group level file merging #14435
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Closed
Closed
Changes from all commits
Commits
Show all changes
25 commits
Select commit
Hold shift + click to select a range
7f2d5b0
Add ParquetFileMerger for efficient row-group level file merging
shangxinli fa1d073
Address feedbacks
shangxinli 7a34353
Address feedbacks for second round
shangxinli c150887
Address comments round 3
shangxinli c593e9e
Trigger CI
shangxinli c71b419
Add row lineage preservation to ParquetFileMerger with binpack-compat…
shangxinli 4ddb5b4
Address feedback of adding row_id support
shangxinli 4130a79
Address feedback for another round
shangxinli 4e4874e
Address comments for another round
shangxinli eabaa0d
Address another round of feedbacks
shangxinli 55aa295
Merge branch 'main' into rewrite_data_files2
shangxinli 047f9b6
Address feedback for another round
shangxinli 0709582
Simplify ParquetFileMerger API to accept DataFile objects
shangxinli cdc322d
Initialize columnIndexTruncateLength internally in ParquetFileMerger
shangxinli 853fd19
Address review feedback: refactor ParquetFileMerger API and validation
shangxinli 4c4f2cb
Refactor ParquetFileMerger API to return MessageType
shangxinli aa5fc36
Address review feedback: optimize validation and file I/O
shangxinli 5962e74
Address review feedback
shangxinli 2eca995
Refactor SparkParquetFileMergeRunner to pass RewriteFileGroup to exec…
shangxinli 45b0197
Inline ParquetFileReader in try-with-resources block
shangxinli 3194f1e
Address pvary's review comments on ParquetFileMerger PR
shangxinli 66532a3
Address reviewer comments on ParquetFileMerger PR
417e0fa
Trigger CI re-run
2404008
Add partition spec ID validation for binary merge
a471220
Address pvary review comments on test structure
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe qualify with 'may provide'?
btw, how about case of small files? It will make very small row groups? Should we have some threshold when not to run this optimization?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How would you make a decision around this? Any normal files could have a small rowgroup at the end.
Shall we make a threshold, like the average rowgroup size should be greater than that?
I would suggest that add this later in a different PR if we decide that we need this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sorry i just see @lintingbin comment above, its the same one. So currently user opt in and its all or nothing.
Yea, maybe when stitching two files, a threshold percent of how many rows will be in small row groups, and another threshold for what is a small row group
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The current scenario involves Flink writing to a large wide table in Iceberg, with a checkpoint being committed every minute.
This is the problem we aim to solve. Ideally, each row of data would only be compressed once, but this is often unachievable in high-frequency write scenarios. However, if data compressed once using traditional methods can later be merged and compressed using rawgroup, it would still be highly desirable.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@lintingbin the compression is at page level. If your streaming checkpoint interval can make the typical page size (default 1MB), we can consider later to add a PR to merge at page level. In Parquet, we have made changes to rewrite parquet files without decompression-and-then-compression. For example, we do encryption (also at page level) rewrite in that way. We walk though each page, without decoding-decompression, and immediately do encryption on that page and send it to disk. It gains a few times faster than record by record rewrite. But that is a more complex change. We can do that later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure I follow the problem. I thought it was generally bad practice to have so many row groups in a single file, i'm also not a fan of how careful we have to be on schema and field id matching.
@lintingbin Why would we have to do 3) ? Why not just not mark the 150mb file for compaction if that's an issue? You could always just have compaction only compacts files smaller than 10 mb or what not?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If I understand correctly from the comment @lintingbin wrote, it sounds like this is an attempt to decrease the cost of compaction when it is unstable -- that is, when files that have already been compacted (the 150 MB file) are compacted a second time. It's a little unclear, but I think the assertion in the last item (5) is that this is useful if you first rewrite small files to a larger file and then compact the larger files without rewriting row groups. This would mean a 2-pass approach: first rewrite the content into medium-sized files (and whole row groups) and then rewrite into large files with multiple row groups.
I don't understand the value of that approach. Once you've solved the small files problem (~100x file count) by rewriting into larger row groups, the additional benefit of a second compaction is very low (~2x file count). I don't see why you would perform the second compaction at all if it is just concatenating the row groups from other files. As long as you're rewriting the data a second time, it makes much more sense to prepare the data for long-term storage and query by clustering and ordering the rows. That would significantly decrease overall size and speed up queries at the same time, which is worth the cost of the rewrite.
And while you're clustering and sorting data, I doubt it makes sense to do the initial rewrite as well. Why incur the cost of rewriting and then not reorganize the data in the first pass as long as you're already rewriting to avoid tiny row groups?
I don't see much value in exposing this -- is it really something that is worth supporting when it is extremely limited and has a very narrow use case (if any)?