Description
Describe the problem
When Spark tasks or stages are retried (due to failures, speculation, or task resubmission), write operations may be re-executed. This can cause:
- Duplicate data files - Multiple copies of the same logical write are created with different file names
- Corrupted/partial Parquet files - When speculative tasks are killed mid-write, they leave behind incomplete files (often just headers)
- Downstream job failures - Subsequent read, ingestion, or compaction jobs fail when encountering corrupt or duplicate files
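To make the duplicate-file failure mode concrete, the sketch below mimics Hudi's base-file naming convention (fileId_writeToken_instantTime.parquet), where the write token encodes the partition, stage, and attempt IDs. The helper names here are illustrative, not Hudi's actual API; the point is that a retried or speculative attempt gets a new attempt ID and therefore writes a second physical file for the same logical file slice.

```java
// Sketch: why a retried Spark task produces a duplicate base file.
// Base-file names follow <fileId>_<writeToken>_<instantTime>.parquet, and the
// write token varies per task attempt, so each attempt writes a distinct file.
public class DuplicateFileSketch {
    // Hypothetical helper mirroring a "partitionId-stageId-attemptId" token.
    static String writeToken(int partitionId, int stageId, long attemptId) {
        return partitionId + "-" + stageId + "-" + attemptId;
    }

    static String baseFileName(String fileId, String writeToken, String instantTime) {
        return fileId + "_" + writeToken + "_" + instantTime + ".parquet";
    }

    public static void main(String[] args) {
        String fileId = "f3a1", instant = "20240101120000";
        // First attempt writes the file; the retried/speculative attempt writes again.
        String first = baseFileName(fileId, writeToken(0, 5, 0), instant);
        String retry = baseFileName(fileId, writeToken(0, 5, 1), instant);
        // Same logical file slice, two physical files: duplicate data on storage.
        assert !first.equals(retry);
        System.out.println(first);
        System.out.println(retry);
    }
}
```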
Related issues
- [SUPPORT] After enable speculation execution of spark compaction job, some broken parquet files might be generated #9615 - After enabling speculative execution, broken parquet files are generated
- Hi, about spark retry problem, still exists in 0.4.6 with consistency on #697 - Spark retry problem causing duplicate files
- [SUPPORT] Commits stays INFLIGHT forever after S3 consistency check fails when Hudi tries to delete duplicate datafiles #1764 - Commits stay INFLIGHT due to duplicate file cleanup failures
Describe the solution
When DIRECT markers are configured:
- Store WriteStatus in completion markers - After a file is successfully written, serialize and store the WriteStatus in the completed marker file (with a checksum for integrity)
- Recover WriteStatus on retry - When a task retry attempts to create an in-progress marker, check whether a completed marker already exists. If it does:
  - Deserialize and recover the WriteStatus from the existing marker
  - Skip the write operation entirely
  - Reuse the original data file
- Support file splitting - Handle scenarios where a single executor splits records into multiple files by recovering the WriteStatus for all files with a matching fileId prefix
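The recover-on-retry flow above can be sketched as follows. This is a minimal in-memory model, not the proposed implementation: the marker store is a plain map standing in for marker files on storage, and the method names (createInProgressMarker, completeMarker) and the recoveredWriteStatuses list are illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of the proposed recover-on-retry flow using an in-memory marker store.
public class MarkerRecoverySketch {
    // Stand-in for completed marker files, keyed by marker name, holding the
    // serialized WriteStatus bytes.
    static final Map<String, byte[]> completedMarkers = new ConcurrentHashMap<>();

    // Returns true if the caller should proceed with the write, false if a
    // completed marker already exists and its WriteStatus was recovered.
    static boolean createInProgressMarker(String markerName, List<byte[]> recoveredWriteStatuses) {
        byte[] existing = completedMarkers.get(markerName);
        if (existing != null) {
            recoveredWriteStatuses.add(existing); // reuse the original file's status
            return false;                         // skip the write entirely
        }
        return true; // no completed marker: safe to write
    }

    static void completeMarker(String markerName, byte[] serializedWriteStatus) {
        completedMarkers.put(markerName, serializedWriteStatus);
    }

    public static void main(String[] args) {
        List<byte[]> recovered = new ArrayList<>();
        // First attempt: no completed marker yet, so the write proceeds and completes.
        assert createInProgressMarker("p1/f3a1.marker", recovered);
        completeMarker("p1/f3a1.marker", "status-bytes".getBytes());
        // Retried attempt: completed marker found, so the write is skipped.
        assert !createInProgressMarker("p1/f3a1.marker", recovered);
        assert recovered.size() == 1;
    }
}
```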
Key changes required
- HoodieWriteHandle: Add a hasRecoveredWriteStatus flag and a recoveredWriteStatuses list; modify createInProgressMarkerFile to return a boolean and recover write statuses from existing completed markers
- DirectWriteMarkers: Add methods to write serialized data with a checksum to completion markers and to read it back
- Various write handles (HoodieAppendHandle, HoodieCreateHandle, HoodieMergeHandle): Store the serialized WriteStatus in completed markers
- Table classes: Check for recovered write statuses and skip re-writing if the file is already completed
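For the DirectWriteMarkers change, the checksum protection could look like the sketch below: a simple [8-byte CRC32][payload] layout, where a failed checksum means the marker itself was corrupted or only partially written and must not be trusted for recovery. The layout and method names are assumptions for illustration, not the actual encoding.

```java
import java.nio.ByteBuffer;
import java.util.zip.CRC32;

// Sketch of checksum-protected marker content: [8-byte CRC32][payload].
public class ChecksumMarkerSketch {
    static byte[] encode(byte[] payload) {
        CRC32 crc = new CRC32();
        crc.update(payload);
        return ByteBuffer.allocate(8 + payload.length)
                .putLong(crc.getValue())
                .put(payload)
                .array();
    }

    // Returns the payload, or null if the stored checksum does not match
    // (e.g. the marker itself was only partially written).
    static byte[] decode(byte[] stored) {
        ByteBuffer buf = ByteBuffer.wrap(stored);
        long expected = buf.getLong();
        byte[] payload = new byte[buf.remaining()];
        buf.get(payload);
        CRC32 crc = new CRC32();
        crc.update(payload);
        return crc.getValue() == expected ? payload : null;
    }

    public static void main(String[] args) {
        byte[] status = "serialized WriteStatus".getBytes();
        byte[] marker = encode(status);
        assert java.util.Arrays.equals(decode(marker), status);
        marker[marker.length - 1] ^= 0x1; // simulate corruption of the stored marker
        assert decode(marker) == null;    // recovery rejects the corrupted marker
    }
}
```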
Benefits
- Prevents duplicate data files during task/stage retries
- Avoids corrupted partial files from killed speculative tasks
- Improves job reliability when speculation is enabled
- No data loss since the original successfully written file is preserved
Additional context
This enhancement works with DIRECT markers only. Timeline-server-based markers do not support storing content in markers.