diff --git a/crates/core/parquet/src/writer.rs b/crates/core/parquet/src/writer.rs index 11f3442a5..02a48f79d 100644 --- a/crates/core/parquet/src/writer.rs +++ b/crates/core/parquet/src/writer.rs @@ -180,6 +180,28 @@ impl ParquetFileWriter { }) } + /// Aborts the writer without finalizing the Parquet file. + pub async fn abort(self) -> Result<(), ParquetError> { + let meta = async { self.writer.close().await } + .instrument(tracing::info_span!("abort_parquet_writer")) + .await?; + + if meta.file_metadata().num_rows() > 0 { + tracing::warn!( + "Aborting ParquetFileWriter for {} with non-empty file ({} rows). The file may be incomplete or corrupted.", + self.filename, + meta.file_metadata().num_rows(), + ); + } else { + tracing::info!( + "Aborting ParquetFileWriter for {} with empty file. No data should have been written.", + self.filename, + ); + } + + Ok(()) + } + /// Total bytes written: flushed row groups plus the in-progress row group's encoded size. pub fn bytes_written(&self) -> usize { self.writer.bytes_written() + self.writer.in_progress_size() diff --git a/crates/core/worker-datasets-raw/src/job_impl/writer.rs b/crates/core/worker-datasets-raw/src/job_impl/writer.rs index 286b31206..c6aaec711 100644 --- a/crates/core/worker-datasets-raw/src/job_impl/writer.rs +++ b/crates/core/worker-datasets-raw/src/job_impl/writer.rs @@ -277,11 +277,10 @@ impl RawTableWriter { .last() .is_some_and(|r| *r.end() < block_num) { - parquet_meta = Some( - self.close_current_file() - .await - .map_err(RawTableWriterError::CloseCurrentFile)?, - ); + parquet_meta = self + .close_current_file() + .await + .map_err(RawTableWriterError::CloseCurrentFile)?; assert!(self.current_file.is_none()); self.ranges_to_write.pop(); let new_file = self @@ -336,15 +335,28 @@ impl RawTableWriter { let time_exceeded = self.segment_opened_at.elapsed() >= self.opts.segment_flush_interval && self.current_file.as_ref().unwrap().rows_written() > 0; if reorg || partition_size_exceeded || time_exceeded { 
+ // FIXME: Before the introduction of `ParquetFileWriter::abort`, this scenario + // (closing the current file + `current_range.is_none()`) would result in a panic. + // + // Aborting the current file is a best effort attempt to avoid that panic, but is + // not a good long term solution. These logs are intended to help us understand the + // circumstances under which this scenario occurs, so that we can eventually remove + // the `abort` logic and handle this scenario properly. + self.warn_if_no_current_range( + &table_rows, + block_num, + partition_size_exceeded, + time_exceeded, + ); + // `parquet_meta` would be `Some` if we have had just created a new a file above, so no // bytes would have been written yet. assert!(parquet_meta.is_none()); - parquet_meta = Some( - self.close_current_file() - .await - .map_err(RawTableWriterError::CloseCurrentFile)?, - ); + parquet_meta = self + .close_current_file() + .await + .map_err(RawTableWriterError::CloseCurrentFile)?; // The current range was partially written, so we need to split it. let range = self.ranges_to_write.pop().unwrap(); @@ -397,6 +409,50 @@ impl RawTableWriter { Ok(parquet_meta) } + fn warn_if_no_current_range( + &self, + table_rows: &TableRows, + block_num: u64, + partition_size_exceeded: bool, + time_exceeded: bool, + ) { + if self.current_range.is_some() { + // Not the scenario we are trying to debug, no need to log. 
+ return; + } + + if partition_size_exceeded { + let bytes_written = self.current_file.as_ref().unwrap().bytes_written(); + let byte_limit = self.opts.partition.0.bytes; + tracing::warn!( + block = block_num, + bytes_written, + byte_limit, + "Aborting current segment due to partition size exceeded", + ); + } else if time_exceeded { + let elapsed = self.segment_opened_at.elapsed().as_secs(); + let time_limit = self.opts.segment_flush_interval.as_secs(); + tracing::warn!( + block = block_num, + elapsed, + time_limit, + "Aborting current segment due to time exceeded", + ); + } else { + // We shouldn't reach this branch since `reorg` should only be true + // if `current_range` is `Some`. Log in case this assumption is violated. + let current_range_hash = self.current_range.as_ref().map(|r| r.hash.to_string()); + let incoming_prev_hash = table_rows.range.prev_hash.to_string(); + tracing::warn!( + block = block_num, + ?current_range_hash, + %incoming_prev_hash, + "Aborting current segment due to reorg", + ); + } + } + async fn close(mut self) -> Result, CloseCurrentFileError> { if self.current_file.is_none() { assert!(self.ranges_to_write.is_empty()); @@ -404,32 +460,39 @@ impl RawTableWriter { } // We should be closing the last range. 
assert_eq!(self.ranges_to_write.len(), 1); - self.close_current_file().await.map(Some) + self.close_current_file().await } async fn close_current_file( &mut self, - ) -> Result<ParquetFileMetadata, CloseCurrentFileError> { + ) -> Result<Option<ParquetFileMetadata>, CloseCurrentFileError> { assert!(self.current_file.is_some()); let file = self.current_file.take().unwrap(); - let range = self.current_range.take().unwrap(); - let metadata = file - .close(range, vec![], Generation::default()) - .await - .map_err(CloseCurrentFileError::Close)?; + match self.current_range.take() { + Some(range) => { + let metadata = file + .close(range, vec![], Generation::default()) + .await + .map_err(CloseCurrentFileError::Close)?; - if let Some(ref metrics) = self.metrics { - let table_name = self.table.table_name().to_string(); - let location_id = self.table.location_id(); - metrics.record_file_written(table_name, *location_id); - } + if let Some(ref metrics) = self.metrics { + let table_name = self.table.table_name().to_string(); + let location_id = self.table.location_id(); + metrics.record_file_written(table_name, *location_id); + } - self.compactor - .try_run() - .map_err(CloseCurrentFileError::Compactor)?; + self.compactor + .try_run() + .map_err(CloseCurrentFileError::Compactor)?; - Ok(metadata) + Ok(Some(metadata)) + } + None => { + file.abort().await.map_err(CloseCurrentFileError::Abort)?; + Ok(None) + } + } } fn post_write_metrics(&self, rows: &RecordBatch) { @@ -519,6 +582,13 @@ pub enum CloseCurrentFileError { /// - Resource exhaustion during compaction #[error("Failed to run compactor")] Compactor(#[source] AmpCompactorTaskError), + + /// Failed to abort the parquet file writer when closing a file that has no current range. + /// + /// This occurs when the writer encounters an error while trying to abort the current file, + /// most likely due to an I/O error.
+ #[error("Failed to abort file writer")] + Abort(#[source] ParquetError), } impl RetryableErrorExt for CloseCurrentFileError { @@ -526,6 +596,7 @@ impl RetryableErrorExt for CloseCurrentFileError { match self { Self::Close(err) => err.is_retryable(), Self::Compactor(err) => err.is_retryable(), + Self::Abort(_) => true, } } }