Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 137 additions & 14 deletions rust/lance-encoding/src/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ use tokio::sync::mpsc::{self, unbounded_channel};

use lance_core::error::LanceOptionExt;
use lance_core::{ArrowResult, Error, Result};
use std::fmt;
use tracing::instrument;

use crate::compression::{DecompressionStrategy, DefaultDecompressionStrategy};
Expand All @@ -257,6 +258,49 @@ use crate::repdef::{CompositeRepDefUnraveler, RepDefUnraveler};
use crate::version::LanceFileVersion;
use crate::{BufferScheduler, EncodingsIo};

/// An error wrapper that adds field context (name and id) to decoding errors.
///
/// When decoding fails, the underlying error often lacks information about which field
/// was being decoded. This wrapper captures that context so error messages look like:
///
/// ```text
/// failed to decode field 'age' (id=10)
///
/// Caused by:
/// number out of range
/// ```
#[derive(Debug)]
pub struct DecodeFieldError {
/// The name of the field being decoded
pub field_name: String,
/// The Lance field id
pub field_id: i32,
/// The underlying error that occurred during decoding
pub source: Error,
}

impl fmt::Display for DecodeFieldError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"failed to decode field '{}' (id={})",
self.field_name, self.field_id
)
}
}

impl std::error::Error for DecodeFieldError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
Some(&self.source)
}
}

impl From<DecodeFieldError> for Error {
fn from(err: DecodeFieldError) -> Self {
Self::wrapped(Box::new(err))
}
}

// If users are getting batches over 10MiB large then it's time to reduce the batch size
const BATCH_SIZE_BYTES_WARNING: u64 = 10 * 1024 * 1024;
const ENV_LANCE_STRUCTURAL_BATCH_DECODE_SPAWN_MODE: &str =
Expand Down Expand Up @@ -630,8 +674,14 @@ impl CoreFieldDecoderStrategy {
file_buffers: buffers,
positions_and_sizes: &offsets_column.buffer_offsets_and_sizes,
};
let items_scheduler =
self.create_legacy_field_scheduler(&list_field.children[0], column_infos, buffers)?;
let child = &list_field.children[0];
let items_scheduler = self
.create_legacy_field_scheduler(child, column_infos, buffers)
.map_err(|source| DecodeFieldError {
field_name: child.name.clone(),
field_id: child.id,
source,
})?;

let (inner_infos, null_offset_adjustments): (Vec<_>, Vec<_>) = offsets_column
.page_infos
Expand Down Expand Up @@ -762,9 +812,14 @@ impl CoreFieldDecoderStrategy {
}

let mut child_schedulers = Vec::with_capacity(field.children.len());
for field in field.children.iter() {
let field_scheduler =
self.create_structural_field_scheduler(field, column_infos)?;
for child_field in field.children.iter() {
let field_scheduler = self
.create_structural_field_scheduler(child_field, column_infos)
.map_err(|source| DecodeFieldError {
field_name: child_field.name.clone(),
field_id: child_field.id,
source,
})?;
child_schedulers.push(field_scheduler);
}

Expand All @@ -776,17 +831,27 @@ impl CoreFieldDecoderStrategy {
}
DataType::List(_) | DataType::LargeList(_) => {
let child = field.children.first().expect_ok()?;
let child_scheduler =
self.create_structural_field_scheduler(child, column_infos)?;
let child_scheduler = self
.create_structural_field_scheduler(child, column_infos)
.map_err(|source| DecodeFieldError {
field_name: child.name.clone(),
field_id: child.id,
source,
})?;
Ok(Box::new(StructuralListScheduler::new(child_scheduler))
as Box<dyn StructuralFieldScheduler>)
}
DataType::FixedSizeList(inner, dimension)
if matches!(inner.data_type(), DataType::Struct(_)) =>
{
let child = field.children.first().expect_ok()?;
let child_scheduler =
self.create_structural_field_scheduler(child, column_infos)?;
let child_scheduler = self
.create_structural_field_scheduler(child, column_infos)
.map_err(|source| DecodeFieldError {
field_name: child.name.clone(),
field_id: child.id,
source,
})?;
Ok(Box::new(StructuralFixedSizeListScheduler::new(
child_scheduler,
*dimension,
Expand All @@ -800,8 +865,13 @@ impl CoreFieldDecoderStrategy {
return Err(Error::not_supported_source(format!("Map data type is not supported with keys_sorted=true now, current value is {}", *keys_sorted).into()));
}
let entries_child = field.children.first().expect_ok()?;
let child_scheduler =
self.create_structural_field_scheduler(entries_child, column_infos)?;
let child_scheduler = self
.create_structural_field_scheduler(entries_child, column_infos)
.map_err(|source| DecodeFieldError {
field_name: entries_child.name.clone(),
field_id: entries_child.id,
source,
})?;
Ok(Box::new(StructuralMapScheduler::new(child_scheduler))
as Box<dyn StructuralFieldScheduler>)
}
Expand Down Expand Up @@ -926,10 +996,15 @@ impl CoreFieldDecoderStrategy {
.map(|page| page.num_rows)
.sum();
let mut child_schedulers = Vec::with_capacity(field.children.len());
for field in &field.children {
for child_field in &field.children {
column_infos.next_top_level();
let field_scheduler =
self.create_legacy_field_scheduler(field, column_infos, buffers)?;
let field_scheduler = self
.create_legacy_field_scheduler(child_field, column_infos, buffers)
.map_err(|source| DecodeFieldError {
field_name: child_field.name.clone(),
field_id: child_field.id,
source,
})?;
child_schedulers.push(Arc::from(field_scheduler));
}

Expand Down Expand Up @@ -2743,4 +2818,52 @@ mod tests {
let ranges = DecodeBatchScheduler::indices_to_ranges(&indices);
assert_eq!(ranges, vec![1..4, 5..8, 9..10]);
}

#[test]
fn test_decode_field_error_display() {
let source = Error::invalid_input("number out of range");
let err = DecodeFieldError {
field_name: "age".to_string(),
field_id: 10,
source,
};
assert_eq!(err.to_string(), "failed to decode field 'age' (id=10)");
}

#[test]
fn test_decode_field_error_source_chain() {
let inner = Error::invalid_input("value exceeds maximum");
let err = DecodeFieldError {
field_name: "score".to_string(),
field_id: 5,
source: inner,
};

// Verify the source chain is preserved
let source = std::error::Error::source(&err);
assert!(source.is_some());
let source_msg = source.unwrap().to_string();
assert!(
source_msg.contains("value exceeds maximum"),
"expected source to contain 'value exceeds maximum', got: {}",
source_msg
);
}

#[test]
fn test_decode_field_error_converts_to_lance_error() {
let inner = Error::invalid_input("bad data");
let field_err = DecodeFieldError {
field_name: "name".to_string(),
field_id: 3,
source: inner,
};
let lance_err: Error = field_err.into();
let msg = lance_err.to_string();
assert!(
msg.contains("failed to decode field 'name' (id=3)"),
"expected error to contain field context, got: {}",
msg
);
}
}
Loading