Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 89 additions & 38 deletions arrow-avro/src/reader/async_reader/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,18 @@ impl<R> ReaderBuilder<R> {
}
}

impl<R: AsyncFileReader> ReaderBuilder<R> {
async fn read_header(&mut self) -> Result<(Header, u64), AvroError> {
impl<R> ReaderBuilder<R>
where
R: AsyncFileReader,
{
/// Reads the Avro file header asynchronously to extract metadata.
///
/// The returned builder contains the parsed header information.
pub async fn read_header(mut self) -> Result<ReaderBuilderWithHeaderInfo<R>, AvroError> {
if self.file_size == 0 {
return Err(AvroError::InvalidArgument("File size cannot be 0".into()));
}

let mut decoder = HeaderDecoder::default();
let mut position = 0;
loop {
Expand Down Expand Up @@ -147,42 +157,83 @@ impl<R: AsyncFileReader> ReaderBuilder<R> {
position += read as u64;
}

decoder
.flush()
.map(|header| (header, position))
.ok_or_else(|| AvroError::EOF("Unexpected EOF while reading Avro header".into()))
let header = decoder.flush().ok_or_else(|| {
AvroError::ParseError("Unexpected EOF while reading Avro header".into())
})?;
Ok(ReaderBuilderWithHeaderInfo::new(self, header, position))
}

/// Build the asynchronous Avro reader with the provided parameters.
/// This reads the header first to initialize the reader state.
pub async fn try_build(mut self) -> Result<AsyncAvroFileReader<R>, AvroError> {
if self.file_size == 0 {
return Err(AvroError::InvalidArgument("File size cannot be 0".into()));
}

pub async fn try_build(self) -> Result<AsyncAvroFileReader<R>, AvroError> {
// Start by reading the header from the beginning of the avro file
// take the writer schema from the header
let (header, header_len) = self.read_header().await?;
let writer_schema = {
let raw = header.get(SCHEMA_METADATA_KEY).ok_or_else(|| {
AvroError::ParseError("No Avro schema present in file header".to_string())
})?;
let json_string = std::str::from_utf8(raw)
.map_err(|e| {
AvroError::ParseError(format!("Invalid UTF-8 in Avro schema header: {e}"))
})?
.to_string();
AvroSchema::new(json_string)
};
let builder_with_header = self.read_header().await?;
builder_with_header.try_build().await
}
}

/// Intermediate builder struct that holds the writer schema and header length
/// parsed from the file.
///
/// Produced by `ReaderBuilder::read_header`. It wraps the original builder
/// together with the decoded file header, so the writer schema can be inspected
/// (see `writer_schema`) before the reader is built.
pub struct ReaderBuilderWithHeaderInfo<R> {
// The builder this value was created from; it carries the reader and all
// options configured before the header was read.
inner: ReaderBuilder<R>,
// The decoded file header. The writer schema is looked up in it on demand
// rather than being stored separately.
header: Header,
// Byte position reached once header decoding finished, as passed in by
// `ReaderBuilder::read_header`.
header_len: u64,
}

impl<R> ReaderBuilderWithHeaderInfo<R> {
/// Bundles an already-configured [`ReaderBuilder`] with the header decoded from
/// its file and the header length that was passed alongside it.
fn new(inner: ReaderBuilder<R>, header: Header, header_len: u64) -> Self {
    Self { inner, header, header_len }
}

/// Extracts the writer schema stored in the Avro file header.
///
/// # Errors
///
/// Returns [`AvroError::ParseError`] when the header has no entry under
/// `SCHEMA_METADATA_KEY`, or when that entry is not valid UTF-8.
pub fn writer_schema(&self) -> Result<AvroSchema, AvroError> {
    // The schema is stored as raw bytes in the header metadata.
    let schema_bytes = match self.header.get(SCHEMA_METADATA_KEY) {
        Some(bytes) => bytes,
        None => {
            return Err(AvroError::ParseError(
                "No Avro schema present in file header".to_string(),
            ));
        }
    };

    // The bytes must decode as UTF-8 before they can be handed to `AvroSchema`.
    match std::str::from_utf8(schema_bytes) {
        Ok(schema_json) => Ok(AvroSchema::new(schema_json.to_string())),
        Err(e) => Err(AvroError::ParseError(format!(
            "Invalid UTF-8 in Avro schema header: {e}"
        ))),
    }
}

/// Sets the reader schema used during decoding.
///
/// If not provided, the writer schema from the OCF header is used directly.
///
/// A reader schema can be used for schema evolution or projection.
pub fn with_reader_schema(mut self, reader_schema: AvroSchema) -> Self {
self.inner = self.inner.with_reader_schema(reader_schema);
self
}

/// Specify a projection of column indices to read from the Avro file.
/// This can help optimize reading by only fetching the necessary columns.
pub fn with_projection(mut self, projection: Vec<usize>) -> Self {
self.inner = self.inner.with_projection(projection);
self
}

/// Build the asynchronous Avro reader with the provided parameters.
pub async fn try_build(self) -> Result<AsyncAvroFileReader<R>, AvroError> {
// Take the writer schema from the header
let writer_schema = self.writer_schema()?;

// If projection exists, project the reader schema,
// if no reader schema is provided, parse it from the header(get the raw writer schema), and project that
// if no reader schema is provided, project the writer schema.
// this projected schema will be the schema used for reading.
let projected_reader_schema = self
.inner
.projection
.as_deref()
.map(|projection| {
let base_schema = if let Some(reader_schema) = &self.reader_schema {
let base_schema = if let Some(reader_schema) = &self.inner.reader_schema {
reader_schema
} else {
&writer_schema
Expand All @@ -195,7 +246,7 @@ impl<R: AsyncFileReader> ReaderBuilder<R> {
// (both optional, at worst no reader schema is provided, in which case we read with the writer schema)
let effective_reader_schema = projected_reader_schema
.as_ref()
.or(self.reader_schema.as_ref())
.or(self.inner.reader_schema.as_ref())
.map(|s| s.schema())
.transpose()?;

Expand All @@ -206,47 +257,47 @@ impl<R: AsyncFileReader> ReaderBuilder<R> {
builder = builder.with_reader_schema(reader_schema);
}
builder
.with_utf8view(self.utf8_view)
.with_strict_mode(self.strict_mode)
.with_utf8view(self.inner.utf8_view)
.with_strict_mode(self.inner.strict_mode)
.build()
}?;

let record_decoder = RecordDecoder::try_new_with_options(root.data_type())?;
let decoder = Decoder::from_parts(
self.batch_size,
self.inner.batch_size,
record_decoder,
None,
IndexMap::new(),
FingerprintAlgorithm::Rabin,
);
let range = match self.range {
let range = match self.inner.range {
Some(r) => {
// If this PartitionedFile's range starts at 0, we need to skip the header bytes.
// But then we need to seek back 16 bytes to include the sync marker for the first block,
// as the logic in this reader searches the data for the first sync marker(after which a block starts),
// then reads blocks from the count, size etc.
let start = r.start.max(header_len.checked_sub(16).ok_or(AvroError::ParseError("Avro header length overflow, header was not long enough to contain avro bytes".to_string()))?);
let end = r.end.max(start).min(self.file_size); // Ensure end is not less than start, worst case range is empty
let start = r.start.max(self.header_len.checked_sub(16).ok_or(AvroError::ParseError("Avro header length overflow, header was not long enough to contain avro bytes".to_string()))?);
let end = r.end.max(start).min(self.inner.file_size); // Ensure end is not less than start, worst case range is empty
start..end
}
None => 0..self.file_size,
None => 0..self.inner.file_size,
};

// Determine if there is actually data to fetch, note that we subtract the header len from range.start,
// so we need to check if range.end == header_len to see if there's no data after the header
let reader_state = if range.start == range.end || header_len == range.end {
let reader_state = if range.start == range.end || self.header_len == range.end {
ReaderState::Finished
} else {
ReaderState::Idle {
reader: self.reader,
reader: self.inner.reader,
}
};
let codec = header.compression()?;
let sync_marker = header.sync();
let codec = self.header.compression()?;
let sync_marker = self.header.sync();

Ok(AsyncAvroFileReader::new(
range,
self.file_size,
self.inner.file_size,
decoder,
codec,
sync_marker,
Expand Down
Loading