28 changes: 15 additions & 13 deletions crates/iceberg/src/arrow/caching_delete_file_loader.rs
@@ -601,7 +601,7 @@ mod tests {

use super::*;
use crate::arrow::delete_filter::tests::setup;
use crate::scan::FileScanTaskDeleteFile;
use crate::scan::{BaseFileScanTask, FileScanTaskDeleteFile};
use crate::spec::{DataContentType, Schema};

#[tokio::test]
@@ -927,19 +927,21 @@ mod tests {
};

let file_scan_task = FileScanTask {
start: 0,
length: 0,
record_count: None,
data_file_path: format!("{}/data-1.parquet", table_location.to_str().unwrap()),
data_file_format: DataFileFormat::Parquet,
schema: data_file_schema.clone(),
project_field_ids: vec![2, 3],
predicate: None,
base: BaseFileScanTask {
start: 0,
length: 0,
record_count: None,
data_file_path: format!("{}/data-1.parquet", table_location.to_str().unwrap()),
data_file_format: DataFileFormat::Parquet,
schema: data_file_schema.clone(),
project_field_ids: vec![2, 3],
predicate: None,
partition: None,
partition_spec: None,
name_mapping: None,
case_sensitive: false,
},
deletes: vec![pos_del, eq_del],
partition: None,
partition_spec: None,
name_mapping: None,
case_sensitive: false,
};

// Load the deletes - should handle both types without error
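For reference, the pattern these test fixtures now follow is a nested base struct: the file-level fields (path, byte range, schema, projected field ids, predicate, partition info, name mapping, case sensitivity) move into BaseFileScanTask, and FileScanTask embeds it under a `base` field while task-specific fields such as `deletes` stay at the top level. A minimal, self-contained sketch of that shape, using hypothetical names rather than the actual iceberg-rust definitions:

// Minimal sketch of the nesting pattern; `BaseTask`/`ScanTask` are hypothetical
// stand-ins, not the real BaseFileScanTask/FileScanTask definitions.
#[derive(Debug, Clone)]
struct BaseTask {
    data_file_path: String,
    start: u64,
    length: u64,
    case_sensitive: bool,
}

#[derive(Debug, Clone)]
struct ScanTask {
    base: BaseTask,       // shared, file-level fields live here
    deletes: Vec<String>, // task-specific fields stay at the top level
}

fn main() {
    let task = ScanTask {
        base: BaseTask {
            data_file_path: "data-1.parquet".to_string(),
            start: 0,
            length: 0,
            case_sensitive: false,
        },
        deletes: vec!["pos-del.parquet".to_string()],
    };
    // Callers reach file-level fields through `base`, e.g. task.base.schema in the real code.
    println!("{} ({} bytes)", task.base.data_file_path, task.base.length);
}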
85 changes: 47 additions & 38 deletions crates/iceberg/src/arrow/delete_filter.rs
@@ -240,8 +240,10 @@ impl DeleteFilter {
return Ok(None);
}

let bound_predicate = combined_predicate
.bind(file_scan_task.schema.clone(), file_scan_task.case_sensitive)?;
let bound_predicate = combined_predicate.bind(
file_scan_task.base.schema.clone(),
file_scan_task.base.case_sensitive,
)?;
Ok(Some(bound_predicate))
}

@@ -312,6 +314,7 @@ pub(crate) mod tests {
use crate::arrow::caching_delete_file_loader::CachingDeleteFileLoader;
use crate::expr::Reference;
use crate::io::FileIO;
use crate::scan::task::BaseFileScanTask;
use crate::spec::{DataFileFormat, Datum, NestedField, PrimitiveType, Schema, Type};

type ArrowSchemaRef = Arc<ArrowSchema>;
@@ -432,34 +435,38 @@ pub(crate) mod tests {

let file_scan_tasks = vec![
FileScanTask {
start: 0,
length: 0,
record_count: None,
data_file_path: format!("{}/1.parquet", table_location.to_str().unwrap()),
data_file_format: DataFileFormat::Parquet,
schema: data_file_schema.clone(),
project_field_ids: vec![],
predicate: None,
base: BaseFileScanTask {
start: 0,
length: 0,
record_count: None,
data_file_path: format!("{}/1.parquet", table_location.to_str().unwrap()),
data_file_format: DataFileFormat::Parquet,
schema: data_file_schema.clone(),
project_field_ids: vec![],
predicate: None,
partition: None,
partition_spec: None,
name_mapping: None,
case_sensitive: false,
},
deletes: vec![pos_del_1, pos_del_2.clone()],
partition: None,
partition_spec: None,
name_mapping: None,
case_sensitive: false,
},
FileScanTask {
start: 0,
length: 0,
record_count: None,
data_file_path: format!("{}/2.parquet", table_location.to_str().unwrap()),
data_file_format: DataFileFormat::Parquet,
schema: data_file_schema.clone(),
project_field_ids: vec![],
predicate: None,
base: BaseFileScanTask {
start: 0,
length: 0,
record_count: None,
data_file_path: format!("{}/2.parquet", table_location.to_str().unwrap()),
data_file_format: DataFileFormat::Parquet,
schema: data_file_schema.clone(),
project_field_ids: vec![],
predicate: None,
partition: None,
partition_spec: None,
name_mapping: None,
case_sensitive: false,
},
deletes: vec![pos_del_3],
partition: None,
partition_spec: None,
name_mapping: None,
case_sensitive: false,
},
];

@@ -497,24 +504,26 @@ pub(crate) mod tests {

// ---------- fake FileScanTask ----------
let task = FileScanTask {
start: 0,
length: 0,
record_count: None,
data_file_path: "data.parquet".to_string(),
data_file_format: crate::spec::DataFileFormat::Parquet,
schema: schema.clone(),
project_field_ids: vec![],
predicate: None,
base: BaseFileScanTask {
start: 0,
length: 0,
record_count: None,
data_file_path: "data.parquet".to_string(),
data_file_format: crate::spec::DataFileFormat::Parquet,
schema: schema.clone(),
project_field_ids: vec![],
predicate: None,
partition: None,
partition_spec: None,
name_mapping: None,
case_sensitive: true,
},
deletes: vec![FileScanTaskDeleteFile {
file_path: "eq-del.parquet".to_string(),
file_type: DataContentType::EqualityDeletes,
partition_spec_id: 0,
equality_ids: None,
}],
partition: None,
partition_spec: None,
name_mapping: None,
case_sensitive: true,
};

let filter = DeleteFilter::default();
94 changes: 11 additions & 83 deletions crates/iceberg/src/arrow/incremental.rs
@@ -23,20 +23,16 @@ use arrow_schema::Schema as ArrowSchema;
use futures::channel::mpsc::channel;
use futures::stream::select;
use futures::{Stream, StreamExt, TryStreamExt};
use parquet::arrow::arrow_reader::ArrowReaderOptions;

use crate::arrow::reader::process_record_batch_stream;
use crate::arrow::record_batch_transformer::RecordBatchTransformerBuilder;
use crate::arrow::{ArrowReader, StreamsInto};
use crate::delete_vector::DeleteVector;
use crate::io::FileIO;
use crate::metadata_columns::{RESERVED_FIELD_ID_POS, row_pos_field};
use crate::runtime::spawn;
use crate::scan::ArrowRecordBatchStream;
use crate::scan::incremental::{
AppendedFileScanTask, DeleteScanTask, IncrementalFileScanTaskStreams,
};
use crate::spec::{Datum, PrimitiveType};
use crate::{Error, ErrorKind, Result};

/// Default batch size for incremental delete operations.
@@ -63,87 +59,19 @@ async fn process_incremental_append_task(
batch_size: Option<usize>,
file_io: FileIO,
) -> Result<ArrowRecordBatchStream> {
let mut virtual_columns = Vec::new();

// Check if _pos column is requested and add it as a virtual column
let has_pos_column = task.base.project_field_ids.contains(&RESERVED_FIELD_ID_POS);
if has_pos_column {
// Add _pos as a virtual column to be produced by the Parquet reader
virtual_columns.push(Arc::clone(row_pos_field()));
}

let arrow_reader_options = if !virtual_columns.is_empty() {
Some(ArrowReaderOptions::new().with_virtual_columns(virtual_columns.clone())?)
} else {
None
};

let mut record_batch_stream_builder = ArrowReader::create_parquet_record_batch_stream_builder(
&task.base.data_file_path,
// Call the unified file scanning method with pre-loaded positional deletes
// Incremental scans don't use predicates or row group filtering, only positional deletes
ArrowReader::process_base_file_scan_task(
task.base,
batch_size,
file_io,
true,
arrow_reader_options,
task.positional_deletes,
None, // No byte range filtering
None, // No pre-loaded row selection
false, // No row group filtering for incremental scans
false, // No row selection filtering beyond positional deletes
)
.await?;

// Create a projection mask for the batch stream to select which columns in the
// Parquet file that we want in the response
let projection_mask = ArrowReader::get_arrow_projection_mask(
&task.base.project_field_ids,
&task.schema_ref(),
record_batch_stream_builder.parquet_schema(),
record_batch_stream_builder.schema(),
false, // use_fallback
)?;
record_batch_stream_builder = record_batch_stream_builder.with_projection(projection_mask);

// RecordBatchTransformer performs any transformations required on the RecordBatches
// that come back from the file, such as type promotion, default column insertion,
// column re-ordering, and virtual field addition (like _file)
let datum = Datum::new(
PrimitiveType::String,
crate::spec::PrimitiveLiteral::String(task.base.data_file_path.clone()),
);
let mut record_batch_transformer_builder =
RecordBatchTransformerBuilder::new(task.schema_ref(), &task.base.project_field_ids)
.with_constant(crate::metadata_columns::RESERVED_FIELD_ID_FILE, datum);

if has_pos_column {
record_batch_transformer_builder =
record_batch_transformer_builder.with_virtual_field(Arc::clone(row_pos_field()))?;
}

let mut record_batch_transformer = record_batch_transformer_builder.build();

if let Some(batch_size) = batch_size {
record_batch_stream_builder = record_batch_stream_builder.with_batch_size(batch_size);
}

// Apply positional deletes as row selections.
let row_selection = if let Some(positional_delete_indexes) = task.positional_deletes {
Some(ArrowReader::build_deletes_row_selection(
record_batch_stream_builder.metadata().row_groups(),
&None,
&positional_delete_indexes.lock().unwrap(),
)?)
} else {
None
};

if let Some(row_selection) = row_selection {
record_batch_stream_builder = record_batch_stream_builder.with_row_selection(row_selection);
}

// Build the batch stream and send all the RecordBatches that it generates
// to the requester.
let record_batch_stream = record_batch_stream_builder
.build()?
.map(move |batch| match batch {
Ok(batch) => record_batch_transformer.process_record_batch(batch),
Err(err) => Err(err.into()),
});

Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream)
.await
}

/// Helper function to create a RecordBatch from a chunk of position values.
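The net effect of this hunk: the hand-rolled setup (virtual _pos column, projection mask, RecordBatchTransformer, delete-based row selection) collapses into a single call to ArrowReader::process_base_file_scan_task, which the diff only shows at the call site. As a rough sketch of that consolidation pattern, with toy types and hypothetical names rather than the real reader API:

// Toy sketch: one shared helper builds the whole record-batch pipeline from the
// base task plus a few flags; callers like the incremental append path just forward
// their inputs. Types and names here are hypothetical, not the iceberg-rust API.
struct BaseTask { data_file_path: String }
struct RecordBatchStream { source: String }

fn process_base_task(
    base: BaseTask,
    batch_size: Option<usize>,
    positional_deletes: Option<Vec<u64>>,
    row_group_filtering: bool,
    row_selection_filtering: bool,
) -> RecordBatchStream {
    // In the real helper this is where the reader, projection, transformer and
    // row selection would be assembled once, instead of in every caller.
    let _ = (batch_size, positional_deletes, row_group_filtering, row_selection_filtering);
    RecordBatchStream { source: base.data_file_path }
}

// Incremental append scans: no predicates or row-group filtering, only positional deletes.
fn process_incremental_append(base: BaseTask, deletes: Option<Vec<u64>>) -> RecordBatchStream {
    process_base_task(base, None, deletes, false, false)
}

fn main() {
    let stream = process_incremental_append(
        BaseTask { data_file_path: "data-1.parquet".to_string() },
        Some(vec![3, 7]),
    );
    println!("streaming from {}", stream.source);
}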