22 changes: 22 additions & 0 deletions crates/core/parquet/src/writer.rs
@@ -180,6 +180,28 @@ impl ParquetFileWriter {
})
}

/// Aborts the writer without finalizing the Parquet file.
pub async fn abort(self) -> Result<(), ParquetError> {
let meta = async { self.writer.close().await }
.instrument(tracing::info_span!("abort_parquet_writer"))
.await?;

if meta.file_metadata().num_rows() > 0 {
tracing::warn!(
"Aborting ParquetFileWriter for {} with non-empty file ({} rows). The file may be incomplete or corrupted.",
self.filename,
meta.file_metadata().num_rows(),
);
Comment on lines +190 to +194

Logging guideline violations (docs/code/logging.md):

  1. Structured logging is mandatory: self.filename and meta.file_metadata().num_rows() are interpolated into the message string. These should be structured fields instead:
tracing::warn!(
    filename = %self.filename,
    num_rows = meta.file_metadata().num_rows(),
    "parquet file writer aborted with non-empty file",
);
  2. No punctuation in messages: both messages end with periods (.).

  3. Action-oriented past tense: messages use the present progressive "Aborting...", but should use past tense, e.g. "parquet file writer aborted".

  4. Brief and clear: the editorial asides "The file may be incomplete or corrupted" / "No data should have been written" should either become structured fields or be omitted.

The same issues apply to the info log on lines 196-199.
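A matching fix for the empty-file branch, following the same guideline (the message wording here is a suggestion, not from docs/code/logging.md):

```rust
tracing::info!(
    filename = %self.filename,
    "parquet file writer aborted with empty file",
);
```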

} else {
tracing::info!(
"Aborting ParquetFileWriter for {} with empty file. No data should have been written.",
self.filename,
);
}

Ok(())
}
Comment on lines +183 to +203

The doc comment says "Aborts the writer without finalizing the Parquet file", but the implementation calls self.writer.close().await which does finalize the parquet file — it flushes any buffered data and writes the parquet footer to the object store. The file is fully written to storage, just without Amp-specific key-value metadata.

This means every abort leaves an orphaned parquet file in the object store that will never be registered in the metadata DB and won't be cleaned up. Over time this could accumulate storage waste.

Consider either:

  1. Updating the doc comment to accurately reflect that the file is written but without Amp metadata, and noting the orphaned file implication.
  2. Or deleting the written file after close() (e.g., via self.store.delete(...)) so abort truly discards the file.

Option 2 would make the semantics match the name abort, but may require passing the store/revision info. At minimum the doc comment should be corrected.
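A sketch of option 2, assuming the writer can reach its object store handle and object path (the `store` and `path` fields are hypothetical, and the error mapping is illustrative):

```rust
pub async fn abort(self) -> Result<(), ParquetError> {
    // Finalize the upload first so no multipart state is left dangling.
    self.writer.close().await?;
    // Then delete the object so `abort` truly discards the file.
    // `store` and `path` are hypothetical fields: the real writer may need
    // its caller to pass in the object store handle and location.
    self.store
        .delete(&self.path)
        .await
        .map_err(|e| ParquetError::External(Box::new(e)))?;
    Ok(())
}
```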


/// Total bytes written: flushed row groups plus the in-progress row group's encoded size.
pub fn bytes_written(&self) -> usize {
self.writer.bytes_written() + self.writer.in_progress_size()
123 changes: 97 additions & 26 deletions crates/core/worker-datasets-raw/src/job_impl/writer.rs
@@ -277,11 +277,10 @@ impl RawTableWriter {
.last()
.is_some_and(|r| *r.end() < block_num)
{
parquet_meta = Some(
self.close_current_file()
.await
.map_err(RawTableWriterError::CloseCurrentFile)?,
);
parquet_meta = self
.close_current_file()
.await
.map_err(RawTableWriterError::CloseCurrentFile)?;
assert!(self.current_file.is_none());
self.ranges_to_write.pop();
let new_file = self
@@ -336,15 +335,28 @@ impl RawTableWriter {
let time_exceeded = self.segment_opened_at.elapsed() >= self.opts.segment_flush_interval
&& self.current_file.as_ref().unwrap().rows_written() > 0;
if reorg || partition_size_exceeded || time_exceeded {

If this is happening randomly, we could suspect the time_exceeded condition, though above we do require at least one row written.

// FIXME: Before the introduction of `ParquetFileWriter::abort`, this scenario
// (closing the current file + `current_range.is_none()`) would result in a panic.
//
// Aborting the current file is a best effort attempt to avoid that panic, but is
// not a good long term solution. These logs are intended to help us understand the
// circumstances under which this scenario occurs, so that we can eventually remove
// the `abort` logic and handle this scenario properly.
self.warn_if_no_current_range(
&table_rows,
block_num,
partition_size_exceeded,
time_exceeded,
);

// `parquet_meta` would be `Some` if we had just created a new file above, so no
// bytes would have been written yet.
assert!(parquet_meta.is_none());

parquet_meta = Some(
self.close_current_file()
.await
.map_err(RawTableWriterError::CloseCurrentFile)?,
);
parquet_meta = self
.close_current_file()
.await
.map_err(RawTableWriterError::CloseCurrentFile)?;
Comment on lines +356 to +359

When current_range is None and close_current_file returns Ok(None), parquet_meta becomes None. This value gets returned at line 409, so the caller of write() receives Ok(None) — meaning "no file was finalized this call".

But we did just abort a file and open a new one (lines 364-382), and we're about to write rows to the new file (line 386-391). The caller doesn't learn that an abort happened. Is that acceptable? If the caller uses parquet_meta to track finalized files (e.g., for metadata DB registration), silently returning None means the aborted file is never accounted for — no metadata DB entry, no cleanup.
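One way to make the abort visible to write()'s caller would be a three-state return instead of Option. A minimal self-contained sketch; the names are hypothetical and the real Finalized payload would be ParquetFileWriterOutput:

```rust
// Sketch: a three-state outcome so callers can distinguish "nothing was
// closed this call" from "a file was aborted and may need cleanup".
#[derive(Debug, PartialEq)]
enum CloseOutcome {
    // No flush condition fired; the current file stays open.
    NotClosed,
    // A file was finalized; the payload stands in for ParquetFileWriterOutput.
    Finalized(String),
    // A file was aborted; the caller can account for the orphaned object.
    Aborted,
}

fn describe(outcome: &CloseOutcome) -> &'static str {
    match outcome {
        CloseOutcome::NotClosed => "no file was closed",
        CloseOutcome::Finalized(_) => "file finalized",
        CloseOutcome::Aborted => "file aborted, caller may clean up",
    }
}

fn main() {
    assert_eq!(describe(&CloseOutcome::NotClosed), "no file was closed");
    assert_eq!(describe(&CloseOutcome::Finalized("meta".into())), "file finalized");
    assert_eq!(describe(&CloseOutcome::Aborted), "file aborted, caller may clean up");
    println!("ok");
}
```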


// The current range was partially written, so we need to split it.
let range = self.ranges_to_write.pop().unwrap();

When current_range is None and close_current_file() returns Ok(None), execution continues to line 362 where self.ranges_to_write.pop().unwrap() is called. This unwrap() assumes ranges_to_write is non-empty.

In the normal flow this invariant holds because we enter this block only if reorg || partition_size_exceeded || time_exceeded, and these checks happen after verifying ranges_to_write is not empty (line 311-316). So this should be safe. However, given this is specifically a defensive fix for an edge case that "shouldn't happen", it would be worth adding a brief comment explaining why the unwrap() is safe here, or consider using .expect("ranges_to_write must be non-empty when ...") to make the invariant self-documenting.
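A self-documenting form of the invariant, as suggested (the expect message wording is hypothetical):

```rust
let range = self
    .ranges_to_write
    .pop()
    .expect("ranges_to_write must be non-empty when a flush condition fired");
```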

@@ -397,39 +409,90 @@ impl RawTableWriter {
Ok(parquet_meta)
}

fn warn_if_no_current_range(
&self,
table_rows: &TableRows,
block_num: u64,
partition_size_exceeded: bool,
time_exceeded: bool,
) {
if self.current_range.is_some() {
// Not the scenario we are trying to debug, no need to log.
return;
}

if partition_size_exceeded {
let bytes_written = self.current_file.as_ref().unwrap().bytes_written();

This .unwrap() is in new code and lacks the // SAFETY: comment required by docs/code/errors-handling.md.

The caller (write()) guarantees current_file.is_some() at line 332 (an earlier unwrap on current_file is reached before this function is called), but this isn't locally provable within warn_if_no_current_range. Consider either:

  • Adding a // SAFETY: current_file is guaranteed to be Some because <reason> comment
  • Or passing bytes_written as a parameter to avoid the unwrap entirely
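Either fix is small; a sketch of the SAFETY-comment variant (the comment wording is a suggestion):

```rust
// SAFETY: `write()` dereferences `current_file` before calling
// `warn_if_no_current_range`, so it is guaranteed to be `Some` here.
let bytes_written = self.current_file.as_ref().unwrap().bytes_written();
```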

let byte_limit = self.opts.partition.0.bytes;
tracing::warn!(
block = block_num,
bytes_written,
byte_limit,
"Aborting current segment due to partition size exceeded",
);
Comment on lines +427 to +432

Logging guideline violations (docs/code/logging.md):

  1. Field naming: Per the standard field names table, block numbers should use field name block_number, not block. This applies to all three tracing::warn! calls in this function.

  2. Action-oriented past tense: All three messages use "Aborting current segment due to..." — should use past tense, e.g. "current segment aborted, partition size exceeded".

These apply to lines 427-432, 436-441, and 447-452.
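A sketch of the first call adjusted to both guidelines (the message text is one option, not prescribed by docs/code/logging.md):

```rust
tracing::warn!(
    block_number = block_num,
    bytes_written,
    byte_limit,
    "current segment aborted, partition size exceeded",
);
```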

} else if time_exceeded {
let elapsed = self.segment_opened_at.elapsed().as_secs();
let time_limit = self.opts.segment_flush_interval.as_secs();
tracing::warn!(
block = block_num,
elapsed,
time_limit,
"Aborting current segment due to time exceeded",
);
} else {
// We shouldn't reach this branch since `reorg` should only be true
// if `current_range` is `Some`. Log in case this assumption is violated.
let current_range_hash = self.current_range.as_ref().map(|r| r.hash.to_string());
let incoming_prev_hash = table_rows.range.prev_hash.to_string();
tracing::warn!(
block = block_num,
?current_range_hash,
%incoming_prev_hash,
"Aborting current segment due to reorg",
);
}
Comment on lines +442 to +453

current_range_hash will always be None here. The function returns early at line 419 when self.current_range.is_some(), so by the time we reach this else branch, self.current_range is guaranteed to be None. The .as_ref().map(...) call is dead code and the log field is misleading — it suggests the hash could have a value.

Since the comment on lines 443-444 already explains that reorg should only be true when current_range is Some, you could simplify this to just log without the current_range_hash field, or use a hardcoded current_range_hash = "None (unexpected)" to make the situation clearer.
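A sketch of the simplified branch, dropping the dead field:

```rust
} else {
    // `reorg` should only be true when `current_range` is `Some`, and we
    // returned early above when it was, so reaching here is unexpected.
    let incoming_prev_hash = table_rows.range.prev_hash.to_string();
    tracing::warn!(
        block_number = block_num,
        %incoming_prev_hash,
        "current segment aborted, reorg with no current range (unexpected)",
    );
}
```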

}

async fn close(mut self) -> Result<Option<ParquetFileWriterOutput>, CloseCurrentFileError> {
if self.current_file.is_none() {
assert!(self.ranges_to_write.is_empty());
return Ok(None);
}
// We should be closing the last range.
assert_eq!(self.ranges_to_write.len(), 1);
self.close_current_file().await.map(Some)
self.close_current_file().await

nit: The close() method previously returned Result<Option<...>> with an explicit Some wrapping. Now that close_current_file() returns Option directly, close() passes it through. But this means that if current_range is None at the time close() is called, the file gets silently aborted and close() returns Ok(None).

Is that the intended behavior for the final close() call? The assert on line 462 (ranges_to_write.len() == 1) still passes, but the caller may not expect the final close to abort. It might be worth adding a log or assert here to flag if close() results in an abort rather than a proper finalization — this would help catch the "no current range" scenario at the end of the writer's lifecycle too.
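One way to flag an abort at end-of-life while keeping the current return type (a sketch; the log message is a suggestion):

```rust
let result = self.close_current_file().await?;
if result.is_none() {
    // The final close aborted rather than finalized; surface it loudly so
    // the "no current range" scenario is also visible at the end of the
    // writer's lifecycle.
    tracing::warn!("raw table writer closed with aborted final file");
}
Ok(result)
```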

}

async fn close_current_file(
&mut self,
) -> Result<ParquetFileWriterOutput, CloseCurrentFileError> {
) -> Result<Option<ParquetFileWriterOutput>, CloseCurrentFileError> {
assert!(self.current_file.is_some());
let file = self.current_file.take().unwrap();
let range = self.current_range.take().unwrap();

let metadata = file
.close(range, vec![], Generation::default())
.await
.map_err(CloseCurrentFileError::Close)?;
match self.current_range.take() {
Some(range) => {
let metadata = file
.close(range, vec![], Generation::default())
.await
.map_err(CloseCurrentFileError::Close)?;

if let Some(ref metrics) = self.metrics {
let table_name = self.table.table_name().to_string();
let location_id = self.table.location_id();
metrics.record_file_written(table_name, *location_id);
}
if let Some(ref metrics) = self.metrics {
let table_name = self.table.table_name().to_string();
let location_id = self.table.location_id();
metrics.record_file_written(table_name, *location_id);
}

self.compactor
.try_run()
.map_err(CloseCurrentFileError::Compactor)?;
self.compactor
.try_run()
.map_err(CloseCurrentFileError::Compactor)?;

Ok(metadata)
Ok(Some(metadata))
}
None => {
file.abort().await.map_err(CloseCurrentFileError::Abort)?;
Ok(None)
}
}
}

fn post_write_metrics(&self, rows: &RecordBatch) {
@@ -519,13 +582,21 @@ pub enum CloseCurrentFileError {
/// - Resource exhaustion during compaction
#[error("Failed to run compactor")]
Compactor(#[source] AmpCompactorTaskError),

/// Failed to abort the parquet file writer when closing a file with no rows written.
///
/// This occurs when the writer encounters an error while trying to abort the current file,
/// most likely due to an I/O error.
#[error("Failed to abort file writer")]
Abort(#[source] ParquetError),
}

impl RetryableErrorExt for CloseCurrentFileError {
fn is_retryable(&self) -> bool {
match self {
Self::Close(err) => err.is_retryable(),
Self::Compactor(err) => err.is_retryable(),
Self::Abort(_) => true,
}
}
}