fix(dataset-raw): do not unwrap range when closing the file #2046
Conversation
When a parquet file is being closed (due to exceeding the size limit, etc.), some of the metadata related to that segment is extracted from the writer for logging/saving in the metadata DB. One such piece of metadata is the block range currently being written. This range is optional in the writer and is initialized to `None`. It only gets assigned a `Some` value once rows associated with a block range are written to the current file. However, if the file gets closed before any rows have been written to it, the current range is still `None`. Due to this, it is incorrect to unwrap the current range when closing the file. This patch changes the writer to handle the case described above gracefully, aborting (closing without flush) the current file.
Add logs in the scenario where we are closing the current parquet file but the current range is `None`, which led to runtime panics in the past. At the moment these panics should be temporarily suppressed because of `ParquetFileWriter::abort`, but this isn't a good long-term solution. These logs are intended to help us understand the circumstances under which this scenario occurs, so that we can eventually remove the `abort` logic and handle this scenario properly.
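The shape of the fix can be sketched with simplified, hypothetical types (the real writer is async and returns parquet metadata; `BlockRange` and `Writer` here are stand-ins):

```rust
#[derive(Debug, PartialEq)]
struct BlockRange {
    start: u64,
    end: u64,
}

struct Writer {
    // Initialized to `None`; only set once rows for a range are written.
    current_range: Option<BlockRange>,
}

impl Writer {
    /// Closes the current file. Returns `Some(range)` when the file was
    /// finalized normally, or `None` when it was aborted because no rows
    /// had been written yet (instead of panicking on an unwrap).
    fn close_current_file(&mut self) -> Option<BlockRange> {
        match self.current_range.take() {
            Some(range) => {
                // finalize: flush and record metadata for `range`
                Some(range)
            }
            None => {
                // abort: close without flushing, nothing to record
                None
            }
        }
    }
}

fn main() {
    let mut empty = Writer { current_range: None };
    assert!(empty.close_current_file().is_none()); // aborted, no panic

    let mut written = Writer {
        current_range: Some(BlockRange { start: 100, end: 199 }),
    };
    assert_eq!(
        written.close_current_file(),
        Some(BlockRange { start: 100, end: 199 })
    );
}
```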
Code review of the fix for the unwrap panic when closing a parquet file with no current range.
Summary of findings (by priority):
Design / potential bugs:

- `ParquetFileWriter::abort()` calls `self.writer.close()`, which actually finalizes and writes the parquet file to object storage — the doc comment says "without finalizing" but the file is written. This leaves orphaned files in the object store (no metadata DB entry, no cleanup).
- `current_range_hash` in the reorg branch of `warn_if_no_current_range` is always `None` (dead code — the function returns early when `current_range.is_some()`).
- `close()` on `RawTableWriter` can now silently abort the final file and return `Ok(None)` — callers may not expect this.
- When `close_current_file` returns `None` (abort path), the subsequent `ranges_to_write.pop().unwrap()` and range-splitting logic at lines 361-363 proceed as if a valid range was partially written — worth verifying this is correct for the abort case.
Logging guideline violations (docs/code/logging.md):

- String interpolation in `abort()` logs instead of structured fields
- Present progressive tense ("Aborting...") instead of past tense
- Messages end with periods
- Field name `block` should be `block_number` per standard field names
Error handling (docs/code/errors-handling.md):

- `.unwrap()` at line 425 in new code lacks a `// SAFETY:` comment
No test coverage for the new abort path — this is understandable given this is a targeted hotfix with debug logging, but worth noting for follow-up.
```rust
/// Aborts the writer without finalizing the Parquet file.
pub async fn abort(self) -> Result<(), ParquetError> {
    let meta = async { self.writer.close().await }
        .instrument(tracing::info_span!("abort_parquet_writer"))
        .await?;

    if meta.file_metadata().num_rows() > 0 {
        tracing::warn!(
            "Aborting ParquetFileWriter for {} with non-empty file ({} rows). The file may be incomplete or corrupted.",
            self.filename,
            meta.file_metadata().num_rows(),
        );
    } else {
        tracing::info!(
            "Aborting ParquetFileWriter for {} with empty file. No data should have been written.",
            self.filename,
        );
    }

    Ok(())
}
```
The doc comment says "Aborts the writer without finalizing the Parquet file", but the implementation calls self.writer.close().await which does finalize the parquet file — it flushes any buffered data and writes the parquet footer to the object store. The file is fully written to storage, just without Amp-specific key-value metadata.
This means every abort leaves an orphaned parquet file in the object store that will never be registered in the metadata DB and won't be cleaned up. Over time this could accumulate storage waste.
Consider either:

- Updating the doc comment to accurately reflect that the file is written but without Amp metadata, and noting the orphaned-file implication.
- Deleting the written file after `close()` (e.g., via `self.store.delete(...)`) so abort truly discards the file.

Option 2 would make the semantics match the name `abort`, but may require passing the store/revision info. At minimum the doc comment should be corrected.
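Option 2 might look roughly like the following sketch. `InMemoryStore` and its `put`/`delete` methods are hypothetical stand-ins for the writer's actual object-store client (which is async in the real code):

```rust
use std::collections::HashSet;

// Hypothetical stand-in for the writer's object-store handle.
struct InMemoryStore {
    files: HashSet<String>,
}

impl InMemoryStore {
    fn put(&mut self, path: &str) {
        self.files.insert(path.to_string());
    }
    fn delete(&mut self, path: &str) {
        self.files.remove(path);
    }
    fn exists(&self, path: &str) -> bool {
        self.files.contains(path)
    }
}

struct ParquetFileWriter {
    filename: String,
}

impl ParquetFileWriter {
    /// Aborts the writer. Since closing the inner writer still finalizes
    /// and uploads the file, truly discarding it means deleting afterwards.
    fn abort(self, store: &mut InMemoryStore) {
        // stand-in for `self.writer.close().await?`, which writes the file
        store.put(&self.filename);
        // discard the orphaned file so nothing unregistered is left behind
        store.delete(&self.filename);
    }
}

fn main() {
    let mut store = InMemoryStore { files: HashSet::new() };
    let writer = ParquetFileWriter { filename: "segment-0001.parquet".into() };
    writer.abort(&mut store);
    assert!(!store.exists("segment-0001.parquet")); // no orphan left behind
}
```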
```rust
} else {
    // We shouldn't reach this branch since `reorg` should only be true
    // if `current_range` is `Some`. Log in case this assumption is violated.
    let current_range_hash = self.current_range.as_ref().map(|r| r.hash.to_string());
    let incoming_prev_hash = table_rows.range.prev_hash.to_string();
    tracing::warn!(
        block = block_num,
        ?current_range_hash,
        %incoming_prev_hash,
        "Aborting current segment due to reorg",
    );
}
```
`current_range_hash` will always be `None` here. The function returns early at line 419 when `self.current_range.is_some()`, so by the time we reach this else branch, `self.current_range` is guaranteed to be `None`. The `.as_ref().map(...)` call is dead code and the log field is misleading — it suggests the hash could have a value.

Since the comment on lines 443-444 already explains that `reorg` should only be true when `current_range` is `Some`, you could simplify this to just log without the `current_range_hash` field, or use a hardcoded `current_range_hash = "None (unexpected)"` to make the situation clearer.
```diff
  // We should be closing the last range.
  assert_eq!(self.ranges_to_write.len(), 1);
- self.close_current_file().await.map(Some)
+ self.close_current_file().await
```
nit: The `close()` method previously returned `Result<Option<...>>` with an explicit `Some` wrapping. Now that `close_current_file()` returns `Option` directly, `close()` passes it through. But this means that if `current_range` is `None` at the time `close()` is called, the file gets silently aborted and `close()` returns `Ok(None)`.

Is that the intended behavior for the final `close()` call? The assert on line 462 (`ranges_to_write.len() == 1`) still passes, but the caller may not expect the final close to abort. It might be worth adding a log or assert here to flag if `close()` results in an abort rather than a proper finalization — this would help catch the "no current range" scenario at the end of the writer's lifecycle too.
```rust
    .map_err(RawTableWriterError::CloseCurrentFile)?;

// The current range was partially written, so we need to split it.
let range = self.ranges_to_write.pop().unwrap();
```
When `current_range` is `None` and `close_current_file()` returns `Ok(None)`, execution continues to line 362 where `self.ranges_to_write.pop().unwrap()` is called. This `unwrap()` assumes `ranges_to_write` is non-empty.

In the normal flow this invariant holds because we enter this block only if `reorg || partition_size_exceeded || time_exceeded`, and these checks happen after verifying `ranges_to_write` is not empty (lines 311-316). So this should be safe. However, given this is specifically a defensive fix for an edge case that "shouldn't happen", it would be worth adding a brief comment explaining why the `unwrap()` is safe here, or consider using `.expect("ranges_to_write must be non-empty when ...")` to make the invariant self-documenting.
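A self-documenting version of that invariant might look like this minimal sketch (the function name is hypothetical):

```rust
fn split_partially_written_range(ranges_to_write: &mut Vec<(u64, u64)>) -> (u64, u64) {
    // The invariant is stated where it is relied upon: this is only called
    // after `write()` has verified `ranges_to_write` is non-empty.
    ranges_to_write
        .pop()
        .expect("ranges_to_write must be non-empty when a segment close was triggered")
}

fn main() {
    let mut ranges = vec![(0, 99), (100, 199)];
    assert_eq!(split_partially_written_range(&mut ranges), (100, 199));
    assert_eq!(ranges.len(), 1); // earlier ranges are untouched
}
```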
```rust
parquet_meta = self
    .close_current_file()
    .await
    .map_err(RawTableWriterError::CloseCurrentFile)?;
```
When `current_range` is `None` and `close_current_file` returns `Ok(None)`, `parquet_meta` becomes `None`. This value gets returned at line 409, so the caller of `write()` receives `Ok(None)` — meaning "no file was finalized this call".

But we did just abort a file and open a new one (lines 364-382), and we're about to write rows to the new file (lines 386-391). The caller doesn't learn that an abort happened. Is that acceptable? If the caller uses `parquet_meta` to track finalized files (e.g., for metadata DB registration), silently returning `None` means the aborted file is never accounted for — no metadata DB entry, no cleanup.
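One way to surface the abort to the caller would be an explicit outcome type instead of `Option`. A hedged sketch with hypothetical names (the real metadata type is parquet file metadata, not a `String`):

```rust
#[derive(Debug, PartialEq)]
enum CloseOutcome {
    /// File was finalized; carries the metadata to register in the DB.
    Finalized(String), // stand-in for the real parquet metadata type
    /// File was aborted before any rows were written.
    Aborted,
}

fn close_current_file(current_range: Option<(u64, u64)>) -> CloseOutcome {
    match current_range {
        Some((start, end)) => CloseOutcome::Finalized(format!("range {start}-{end}")),
        None => CloseOutcome::Aborted,
    }
}

fn main() {
    // The caller can now distinguish "nothing happened" from "a file was
    // aborted", and e.g. schedule cleanup for the aborted file.
    assert_eq!(close_current_file(None), CloseOutcome::Aborted);
    assert_eq!(
        close_current_file(Some((0, 9))),
        CloseOutcome::Finalized("range 0-9".to_string())
    );
}
```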
```rust
tracing::warn!(
    "Aborting ParquetFileWriter for {} with non-empty file ({} rows). The file may be incomplete or corrupted.",
    self.filename,
    meta.file_metadata().num_rows(),
);
```
Logging guideline violations (docs/code/logging.md):

- **Structured logging is mandatory:** `self.filename` and `meta.file_metadata().num_rows()` are interpolated into the message string. These should be structured fields instead:

```rust
tracing::warn!(
    filename = %self.filename,
    num_rows = meta.file_metadata().num_rows(),
    "parquet file writer aborted with non-empty file",
);
```

- **No punctuation in messages:** Both messages end with periods (`.`).
- **Action-oriented past tense:** Messages use present progressive "Aborting..." and should use past tense like "parquet file writer aborted".
- **Brief and clear:** The editorial comments "The file may be incomplete or corrupted" / "No data should have been written" should either be structured fields or omitted.

Same issues apply to the info log on lines 196-199.
```rust
tracing::warn!(
    block = block_num,
    bytes_written,
    byte_limit,
    "Aborting current segment due to partition size exceeded",
);
```
Logging guideline violations (docs/code/logging.md):

- **Field naming:** Per the standard field names table, block numbers should use the field name `block_number`, not `block`. This applies to all three `tracing::warn!` calls in this function.
- **Action-oriented past tense:** All three messages use "Aborting current segment due to..." and should use past tense, e.g. "current segment aborted, partition size exceeded".

These apply to lines 427-432, 436-441, and 447-452.
```rust
}

if partition_size_exceeded {
    let bytes_written = self.current_file.as_ref().unwrap().bytes_written();
```
This `.unwrap()` is in new code and lacks the `// SAFETY:` comment required by docs/code/errors-handling.md.

The caller (`write()`) guarantees `current_file.is_some()` at line 332 (an earlier unwrap on `current_file` is reached before this function is called), but this isn't locally provable within `warn_if_no_current_range`. Consider either:

- Adding a `// SAFETY: current_file is guaranteed to be Some because <reason>` comment
- Passing `bytes_written` as a parameter to avoid the unwrap entirely
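The parameter-passing alternative avoids the unwrap entirely. A minimal sketch with hypothetical names and signatures (the real helper would emit a `tracing::warn!`, shown here only as a comment):

```rust
// Instead of unwrapping `self.current_file` inside the helper, the caller,
// which already holds a proven-`Some` file, passes the value in directly.
fn warn_if_partition_size_exceeded(block_number: u64, bytes_written: u64, byte_limit: u64) -> bool {
    if bytes_written >= byte_limit {
        // real code would emit: tracing::warn!(block_number, bytes_written,
        //     byte_limit, "current segment aborted, partition size exceeded");
        eprintln!(
            "block_number={block_number} bytes_written={bytes_written} byte_limit={byte_limit}"
        );
        return true;
    }
    false
}

fn main() {
    assert!(warn_if_partition_size_exceeded(42, 2048, 1024));
    assert!(!warn_if_partition_size_exceeded(42, 512, 1024));
}
```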
```diff
@@ -336,15 +335,28 @@ impl RawTableWriter {
     let time_exceeded = self.segment_opened_at.elapsed() >= self.opts.segment_flush_interval
         && self.current_file.as_ref().unwrap().rows_written() > 0;
     if reorg || partition_size_exceeded || time_exceeded {
```
If this is happening randomly, we could suspect the `time_exceeded` condition, though above we do require at least one row written.
leoyvens left a comment:
Actually maybe we shouldn't merge if we don't know what's going on. Are you able to trigger a build from the branch if you need to deploy and test this?