-
Notifications
You must be signed in to change notification settings - Fork 6
fix(dataset-raw): do not unwrap range when closing the file #2046
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -180,6 +180,28 @@ impl ParquetFileWriter { | |
| }) | ||
| } | ||
|
|
||
| /// Aborts the writer without finalizing the Parquet file. | ||
| pub async fn abort(self) -> Result<(), ParquetError> { | ||
| let meta = async { self.writer.close().await } | ||
| .instrument(tracing::info_span!("abort_parquet_writer")) | ||
| .await?; | ||
|
|
||
| if meta.file_metadata().num_rows() > 0 { | ||
| tracing::warn!( | ||
| "Aborting ParquetFileWriter for {} with non-empty file ({} rows). The file may be incomplete or corrupted.", | ||
| self.filename, | ||
| meta.file_metadata().num_rows(), | ||
| ); | ||
| } else { | ||
| tracing::info!( | ||
| "Aborting ParquetFileWriter for {} with empty file. No data should have been written.", | ||
| self.filename, | ||
| ); | ||
| } | ||
|
|
||
| Ok(()) | ||
| } | ||
|
Comment on lines
+183
to
+203
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The doc comment says "Aborts the writer without finalizing the Parquet file", but the implementation calls `self.writer.close()`, which finalizes and uploads the Parquet file. This means every abort leaves an orphaned parquet file in the object store that will never be registered in the metadata DB and won't be cleaned up. Over time this could accumulate storage waste. Consider either: (1) deleting the uploaded object after closing it, or (2) dropping the writer without calling `close()` so nothing is uploaded.
Option 2 would make the semantics match the name |
||
|
|
||
| /// Total bytes written: flushed row groups plus the in-progress row group's encoded size. | ||
| pub fn bytes_written(&self) -> usize { | ||
| self.writer.bytes_written() + self.writer.in_progress_size() | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -277,11 +277,10 @@ impl RawTableWriter { | |
| .last() | ||
| .is_some_and(|r| *r.end() < block_num) | ||
| { | ||
| parquet_meta = Some( | ||
| self.close_current_file() | ||
| .await | ||
| .map_err(RawTableWriterError::CloseCurrentFile)?, | ||
| ); | ||
| parquet_meta = self | ||
| .close_current_file() | ||
| .await | ||
| .map_err(RawTableWriterError::CloseCurrentFile)?; | ||
| assert!(self.current_file.is_none()); | ||
| self.ranges_to_write.pop(); | ||
| let new_file = self | ||
|
|
@@ -336,15 +335,28 @@ impl RawTableWriter { | |
| let time_exceeded = self.segment_opened_at.elapsed() >= self.opts.segment_flush_interval | ||
| && self.current_file.as_ref().unwrap().rows_written() > 0; | ||
| if reorg || partition_size_exceeded || time_exceeded { | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If this is happening randomly, we could suspect the |
||
| // FIXME: Before the introduction of `ParquetFileWriter::abort`, this scenario | ||
| // (closing the current file + `current_range.is_none()`) would result in a panic. | ||
| // | ||
| // Aborting the current file is a best effort attempt to avoid that panic, but is | ||
| // not a good long term solution. These logs are intended to help us understand the | ||
| // circumstances under which this scenario occurs, so that we can eventually remove | ||
| // the `abort` logic and handle this scenario properly. | ||
| self.warn_if_no_current_range( | ||
| &table_rows, | ||
| block_num, | ||
| partition_size_exceeded, | ||
| time_exceeded, | ||
| ); | ||
|
|
||
| // `parquet_meta` would be `Some` if we had just created a new file above, so no | ||
| // bytes would have been written yet. | ||
| assert!(parquet_meta.is_none()); | ||
|
|
||
| parquet_meta = Some( | ||
| self.close_current_file() | ||
| .await | ||
| .map_err(RawTableWriterError::CloseCurrentFile)?, | ||
| ); | ||
| parquet_meta = self | ||
| .close_current_file() | ||
| .await | ||
| .map_err(RawTableWriterError::CloseCurrentFile)?; | ||
|
Comment on lines
+356
to
+359
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When But we did just abort a file and open a new one (lines 364-382), and we're about to write rows to the new file (line 386-391). The caller doesn't learn that an abort happened. Is that acceptable? If the caller uses |
||
|
|
||
| // The current range was partially written, so we need to split it. | ||
| let range = self.ranges_to_write.pop().unwrap(); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When In the normal flow this invariant holds because we enter this block only if |
||
|
|
@@ -397,39 +409,90 @@ impl RawTableWriter { | |
| Ok(parquet_meta) | ||
| } | ||
|
|
||
| fn warn_if_no_current_range( | ||
| &self, | ||
| table_rows: &TableRows, | ||
| block_num: u64, | ||
| partition_size_exceeded: bool, | ||
| time_exceeded: bool, | ||
| ) { | ||
| if self.current_range.is_some() { | ||
| // Not the scenario we are trying to debug, no need to log. | ||
| return; | ||
| } | ||
|
|
||
| if partition_size_exceeded { | ||
| let bytes_written = self.current_file.as_ref().unwrap().bytes_written(); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This The caller (
|
||
| let byte_limit = self.opts.partition.0.bytes; | ||
| tracing::warn!( | ||
| block = block_num, | ||
| bytes_written, | ||
| byte_limit, | ||
| "Aborting current segment due to partition size exceeded", | ||
| ); | ||
|
Comment on lines
+427
to
+432
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Logging guideline violations (
These apply to lines 427-432, 436-441, and 447-452. |
||
| } else if time_exceeded { | ||
| let elapsed = self.segment_opened_at.elapsed().as_secs(); | ||
| let time_limit = self.opts.segment_flush_interval.as_secs(); | ||
| tracing::warn!( | ||
| block = block_num, | ||
| elapsed, | ||
| time_limit, | ||
| "Aborting current segment due to time exceeded", | ||
| ); | ||
| } else { | ||
| // We shouldn't reach this branch since `reorg` should only be true | ||
| // if `current_range` is `Some`. Log in case this assumption is violated. | ||
| let current_range_hash = self.current_range.as_ref().map(|r| r.hash.to_string()); | ||
| let incoming_prev_hash = table_rows.range.prev_hash.to_string(); | ||
| tracing::warn!( | ||
| block = block_num, | ||
| ?current_range_hash, | ||
| %incoming_prev_hash, | ||
| "Aborting current segment due to reorg", | ||
| ); | ||
| } | ||
|
Comment on lines
+442
to
+453
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Since the comment on lines 443-444 already explains that reorg should only be true when |
||
| } | ||
|
|
||
| async fn close(mut self) -> Result<Option<ParquetFileWriterOutput>, CloseCurrentFileError> { | ||
| if self.current_file.is_none() { | ||
| assert!(self.ranges_to_write.is_empty()); | ||
| return Ok(None); | ||
| } | ||
| // We should be closing the last range. | ||
| assert_eq!(self.ranges_to_write.len(), 1); | ||
| self.close_current_file().await.map(Some) | ||
| self.close_current_file().await | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: The Is that the intended behavior for the final |
||
| } | ||
|
|
||
| async fn close_current_file( | ||
| &mut self, | ||
| ) -> Result<ParquetFileWriterOutput, CloseCurrentFileError> { | ||
| ) -> Result<Option<ParquetFileWriterOutput>, CloseCurrentFileError> { | ||
| assert!(self.current_file.is_some()); | ||
| let file = self.current_file.take().unwrap(); | ||
| let range = self.current_range.take().unwrap(); | ||
|
|
||
| let metadata = file | ||
| .close(range, vec![], Generation::default()) | ||
| .await | ||
| .map_err(CloseCurrentFileError::Close)?; | ||
| match self.current_range.take() { | ||
| Some(range) => { | ||
| let metadata = file | ||
| .close(range, vec![], Generation::default()) | ||
| .await | ||
| .map_err(CloseCurrentFileError::Close)?; | ||
|
|
||
| if let Some(ref metrics) = self.metrics { | ||
| let table_name = self.table.table_name().to_string(); | ||
| let location_id = self.table.location_id(); | ||
| metrics.record_file_written(table_name, *location_id); | ||
| } | ||
| if let Some(ref metrics) = self.metrics { | ||
| let table_name = self.table.table_name().to_string(); | ||
| let location_id = self.table.location_id(); | ||
| metrics.record_file_written(table_name, *location_id); | ||
| } | ||
|
|
||
| self.compactor | ||
| .try_run() | ||
| .map_err(CloseCurrentFileError::Compactor)?; | ||
| self.compactor | ||
| .try_run() | ||
| .map_err(CloseCurrentFileError::Compactor)?; | ||
|
|
||
| Ok(metadata) | ||
| Ok(Some(metadata)) | ||
| } | ||
| None => { | ||
| file.abort().await.map_err(CloseCurrentFileError::Abort)?; | ||
| Ok(None) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| fn post_write_metrics(&self, rows: &RecordBatch) { | ||
|
|
@@ -519,13 +582,21 @@ pub enum CloseCurrentFileError { | |
| /// - Resource exhaustion during compaction | ||
| #[error("Failed to run compactor")] | ||
| Compactor(#[source] AmpCompactorTaskError), | ||
|
|
||
| /// Failed to abort the parquet file writer when closing a file with no rows written. | ||
| /// | ||
| /// This occurs when the writer encounters an error while trying to abort the current file, | ||
| /// most likely due to an I/O error. | ||
| #[error("Failed to abort file writer")] | ||
| Abort(#[source] ParquetError), | ||
| } | ||
|
|
||
| impl RetryableErrorExt for CloseCurrentFileError { | ||
| fn is_retryable(&self) -> bool { | ||
| match self { | ||
| Self::Close(err) => err.is_retryable(), | ||
| Self::Compactor(err) => err.is_retryable(), | ||
| Self::Abort(_) => true, | ||
| } | ||
| } | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Logging guideline violations (
`docs/code/logging.md`): `self.filename` and `meta.file_metadata().num_rows()` are interpolated into the message string. These should be structured fields instead:
.).Action-oriented past tense: Messages use present progressive "Aborting..." — should use past tense like "parquet file writer aborted".
Brief and clear: The editorial comment "The file may be incomplete or corrupted" / "No data should have been written" should either be structured fields or omitted.
Same issues apply to the
infolog on line 196-199.