From 43d016fa77291b9476edcd84bac18fa670761ec5 Mon Sep 17 00:00:00 2001 From: sistemd Date: Mon, 30 Mar 2026 14:44:28 +0200 Subject: [PATCH 1/2] fix(dataset-raw): do not unwrap range when closing the file When a parquet file is being closed (due to exceeding size limit etc.), some of the metadata related to that segment is extracted from the writer for logging/saving in the metadata DB. One such piece of metadata is the block range currently being written. This range is optional in the writer and is initialized to `None`. It only gets assigned a `Some` value whenever rows associated with a block range are written to the current file. However, if the file gets closed before any rows have been written to it, the current range is still `None`. Due to this, it is incorrect to unwrap the current range when closing the file. This patch changes the writer to handle the above described case gracefully, aborting (closing without flush) the current file. --- crates/core/parquet/src/writer.rs | 22 +++++++ .../src/job_impl/writer.rs | 65 +++++++++++-------- 2 files changed, 61 insertions(+), 26 deletions(-) diff --git a/crates/core/parquet/src/writer.rs b/crates/core/parquet/src/writer.rs index 11f3442a5..02a48f79d 100644 --- a/crates/core/parquet/src/writer.rs +++ b/crates/core/parquet/src/writer.rs @@ -180,6 +180,28 @@ impl ParquetFileWriter { }) } + /// Aborts the writer without finalizing the Parquet file. + pub async fn abort(self) -> Result<(), ParquetError> { + let meta = async { self.writer.close().await } + .instrument(tracing::info_span!("abort_parquet_writer")) + .await?; + + if meta.file_metadata().num_rows() > 0 { + tracing::warn!( + "Aborting ParquetFileWriter for {} with non-empty file ({} rows). The file may be incomplete or corrupted.", + self.filename, + meta.file_metadata().num_rows(), + ); + } else { + tracing::info!( + "Aborting ParquetFileWriter for {} with empty file. 
No data should have been written.", + self.filename, + ); + } + + Ok(()) + } + /// Total bytes written: flushed row groups plus the in-progress row group's encoded size. pub fn bytes_written(&self) -> usize { self.writer.bytes_written() + self.writer.in_progress_size() diff --git a/crates/core/worker-datasets-raw/src/job_impl/writer.rs b/crates/core/worker-datasets-raw/src/job_impl/writer.rs index 286b31206..e8fb60d1b 100644 --- a/crates/core/worker-datasets-raw/src/job_impl/writer.rs +++ b/crates/core/worker-datasets-raw/src/job_impl/writer.rs @@ -277,11 +277,10 @@ impl RawTableWriter { .last() .is_some_and(|r| *r.end() < block_num) { - parquet_meta = Some( - self.close_current_file() - .await - .map_err(RawTableWriterError::CloseCurrentFile)?, - ); + parquet_meta = self + .close_current_file() + .await + .map_err(RawTableWriterError::CloseCurrentFile)?; assert!(self.current_file.is_none()); self.ranges_to_write.pop(); let new_file = self @@ -340,11 +339,10 @@ impl RawTableWriter { // bytes would have been written yet. assert!(parquet_meta.is_none()); - parquet_meta = Some( - self.close_current_file() - .await - .map_err(RawTableWriterError::CloseCurrentFile)?, - ); + parquet_meta = self + .close_current_file() + .await + .map_err(RawTableWriterError::CloseCurrentFile)?; // The current range was partially written, so we need to split it. let range = self.ranges_to_write.pop().unwrap(); @@ -404,32 +402,39 @@ impl RawTableWriter { } // We should be closing the last range. 
assert_eq!(self.ranges_to_write.len(), 1); - self.close_current_file().await.map(Some) + self.close_current_file().await } async fn close_current_file( &mut self, - ) -> Result { + ) -> Result, CloseCurrentFileError> { assert!(self.current_file.is_some()); let file = self.current_file.take().unwrap(); - let range = self.current_range.take().unwrap(); - let metadata = file - .close(range, vec![], Generation::default()) - .await - .map_err(CloseCurrentFileError::Close)?; + match self.current_range.take() { + Some(range) => { + let metadata = file + .close(range, vec![], Generation::default()) + .await + .map_err(CloseCurrentFileError::Close)?; - if let Some(ref metrics) = self.metrics { - let table_name = self.table.table_name().to_string(); - let location_id = self.table.location_id(); - metrics.record_file_written(table_name, *location_id); - } + if let Some(ref metrics) = self.metrics { + let table_name = self.table.table_name().to_string(); + let location_id = self.table.location_id(); + metrics.record_file_written(table_name, *location_id); + } - self.compactor - .try_run() - .map_err(CloseCurrentFileError::Compactor)?; + self.compactor + .try_run() + .map_err(CloseCurrentFileError::Compactor)?; - Ok(metadata) + Ok(Some(metadata)) + } + None => { + file.abort().await.map_err(CloseCurrentFileError::Abort)?; + Ok(None) + } + } } fn post_write_metrics(&self, rows: &RecordBatch) { @@ -519,6 +524,13 @@ pub enum CloseCurrentFileError { /// - Resource exhaustion during compaction #[error("Failed to run compactor")] Compactor(#[source] AmpCompactorTaskError), + + /// Failed to abort the parquet file writer when closing a file with no rows written. + /// + /// This occurs when the writer encounters an error while trying to abort the current file, + /// most likely due to an I/O error. 
+ #[error("Failed to abort file writer")] + Abort(#[source] ParquetError), } impl RetryableErrorExt for CloseCurrentFileError { @@ -526,6 +538,7 @@ impl RetryableErrorExt for CloseCurrentFileError { match self { Self::Close(err) => err.is_retryable(), Self::Compactor(err) => err.is_retryable(), + Self::Abort(_) => true, } } } From d056446316936e0ce77aa00f55c0eab42d3e43ff Mon Sep 17 00:00:00 2001 From: sistemd Date: Mon, 30 Mar 2026 14:44:29 +0200 Subject: [PATCH 2/2] dbg(dataset-raw): warn when closing parquet file without block range Add logs in the scenario where we are closing the current parquet file but the current range is `None`, which led to runtime panics in the past. At the moment these panics should be temporarily suppressed because of `ParquetFileWriter::abort` but this isn't a good long term solution. These logs are intended to help us understand the circumstances under which this scenario occurs, so that we can eventually remove the `abort` logic and handle this scenario properly. --- .../src/job_impl/writer.rs | 58 +++++++++++++++++++ 1 file changed, 58 insertions(+) diff --git a/crates/core/worker-datasets-raw/src/job_impl/writer.rs b/crates/core/worker-datasets-raw/src/job_impl/writer.rs index e8fb60d1b..c6aaec711 100644 --- a/crates/core/worker-datasets-raw/src/job_impl/writer.rs +++ b/crates/core/worker-datasets-raw/src/job_impl/writer.rs @@ -335,6 +335,20 @@ impl RawTableWriter { let time_exceeded = self.segment_opened_at.elapsed() >= self.opts.segment_flush_interval && self.current_file.as_ref().unwrap().rows_written() > 0; if reorg || partition_size_exceeded || time_exceeded { + // FIXME: Before the introduction of `ParquetFileWriter::abort`, this scenario + // (closing the current file + `current_range.is_none()`) would result in a panic. + // + // Aborting the current file is a best effort attempt to avoid that panic, but is + // not a good long term solution. 
These logs are intended to help us understand the + // circumstances under which this scenario occurs, so that we can eventually remove + // the `abort` logic and handle this scenario properly. + self.warn_if_no_current_range( + &table_rows, + block_num, + partition_size_exceeded, + time_exceeded, + ); + // `parquet_meta` would be `Some` if we have had just created a new a file above, so no // bytes would have been written yet. assert!(parquet_meta.is_none()); @@ -395,6 +409,50 @@ impl RawTableWriter { Ok(parquet_meta) } + fn warn_if_no_current_range( + &self, + table_rows: &TableRows, + block_num: u64, + partition_size_exceeded: bool, + time_exceeded: bool, + ) { + if self.current_range.is_some() { + // Not the scenario we are trying to debug, no need to log. + return; + } + + if partition_size_exceeded { + let bytes_written = self.current_file.as_ref().unwrap().bytes_written(); + let byte_limit = self.opts.partition.0.bytes; + tracing::warn!( + block = block_num, + bytes_written, + byte_limit, + "Aborting current segment due to partition size exceeded", + ); + } else if time_exceeded { + let elapsed = self.segment_opened_at.elapsed().as_secs(); + let time_limit = self.opts.segment_flush_interval.as_secs(); + tracing::warn!( + block = block_num, + elapsed, + time_limit, + "Aborting current segment due to time exceeded", + ); + } else { + // We shouldn't reach this branch since `reorg` should only be true + // if `current_range` is `Some`. Log in case this assumption is violated. + let current_range_hash = self.current_range.as_ref().map(|r| r.hash.to_string()); + let incoming_prev_hash = table_rows.range.prev_hash.to_string(); + tracing::warn!( + block = block_num, + ?current_range_hash, + %incoming_prev_hash, + "Aborting current segment due to reorg", + ); + } + } + async fn close(mut self) -> Result, CloseCurrentFileError> { if self.current_file.is_none() { assert!(self.ranges_to_write.is_empty());