From a5d509376084ab73fb93c8f31366a84058474739 Mon Sep 17 00:00:00 2001 From: Matt Topol Date: Fri, 3 Apr 2026 15:49:10 -0400 Subject: [PATCH] feat(arrow/array): add Validate/ValidateFull to binary and string arrays Adds Validate() and ValidateFull() methods to Binary, LargeBinary, String, and LargeString array types, plus top-level Validate/ValidateFull dispatch functions and ValidateRecord/ValidateRecordFull convenience helpers. The existing setData check only verifies the last offset is within the data buffer. Subtly corrupted data (e.g. non-monotonic or negative intermediate offsets) passes construction but causes runtime panics when Value() is called later, after the IPC reader's recover() scope has ended. ValidateFull() catches these cases by checking all offsets are non-negative and monotonically non-decreasing. This allows users receiving data from untrusted sources such as Flight SQL servers to detect and skip corrupted batches rather than crashing. Fixes #691 --- arrow/array/binary.go | 98 ++++++++++++++++ arrow/array/string.go | 98 ++++++++++++++++ arrow/array/validate.go | 82 ++++++++++++++ arrow/array/validate_test.go | 211 +++++++++++++++++++++++++++++++++++ 4 files changed, 489 insertions(+) create mode 100644 arrow/array/validate.go create mode 100644 arrow/array/validate_test.go diff --git a/arrow/array/binary.go b/arrow/array/binary.go index 942fe3071..df1b6108f 100644 --- a/arrow/array/binary.go +++ b/arrow/array/binary.go @@ -169,6 +169,55 @@ func (a *Binary) MarshalJSON() ([]byte, error) { return json.Marshal(vals) } +// Validate performs a basic, O(1) consistency check on the array data. +// It returns an error if: +// - The offset buffer is too small for the array length and offset +// - The last offset exceeds the data buffer length +// +// This is useful for detecting corrupted data from untrusted sources (e.g. +// Arrow Flight / Flight SQL servers) before accessing values, which may +// otherwise cause a runtime panic. 
+func (a *Binary) Validate() error {
+	if a.data.length == 0 {
+		return nil
+	}
+	if a.data.buffers[1] == nil {
+		return fmt.Errorf("arrow/array: non-empty binary array has no offsets buffer")
+	}
+	expNumOffsets := a.data.offset + a.data.length + 1
+	if len(a.valueOffsets) < expNumOffsets {
+		return fmt.Errorf("arrow/array: binary offset buffer must have at least %d values, got %d", expNumOffsets, len(a.valueOffsets))
+	}
+	lastOffset := int(a.valueOffsets[expNumOffsets-1])
+	if lastOffset < 0 || lastOffset > len(a.valueBytes) { // a negative end offset is out of bounds too
+		return fmt.Errorf("arrow/array: binary offset %d out of bounds of data buffer (length %d)", lastOffset, len(a.valueBytes))
+	}
+	return nil
+}
+
+// ValidateFull performs a full O(n) consistency check on the array data.
+// In addition to the checks performed by Validate, it also verifies that
+// all offsets are non-negative and monotonically non-decreasing.
+func (a *Binary) ValidateFull() error {
+	if err := a.Validate(); err != nil {
+		return err
+	}
+	if a.data.length == 0 {
+		return nil
+	}
+	offsets := a.valueOffsets[a.data.offset : a.data.offset+a.data.length+1] // only the offsets this (possibly sliced) array uses
+	if offsets[0] < 0 {
+		return fmt.Errorf("arrow/array: binary offset at index %d is negative: %d", a.data.offset, offsets[0])
+	}
+	for i := 1; i < len(offsets); i++ {
+		if offsets[i] < offsets[i-1] {
+			return fmt.Errorf("arrow/array: binary offsets are not monotonically non-decreasing at index %d: %d < %d",
+				a.data.offset+i, offsets[i], offsets[i-1])
+		}
+	}
+	return nil
+}
+
 func arrayEqualBinary(left, right *Binary) bool {
 	for i := 0; i < left.Len(); i++ {
 		if left.IsNull(i) {
@@ -309,6 +358,55 @@ func (a *LargeBinary) MarshalJSON() ([]byte, error) {
 	return json.Marshal(vals)
 }
 
+// Validate performs a basic, O(1) consistency check on the array data.
+// It returns an error if:
+//   - The offset buffer is too small for the array length and offset
+//   - The last offset exceeds the data buffer length
+//
+// This is useful for detecting corrupted data from untrusted sources (e.g.
+// Arrow Flight / Flight SQL servers) before accessing values, which may
+// otherwise cause a runtime panic.
+func (a *LargeBinary) Validate() error {
+	if a.data.length == 0 {
+		return nil
+	}
+	if a.data.buffers[1] == nil {
+		return fmt.Errorf("arrow/array: non-empty large binary array has no offsets buffer")
+	}
+	expNumOffsets := a.data.offset + a.data.length + 1
+	if len(a.valueOffsets) < expNumOffsets {
+		return fmt.Errorf("arrow/array: large binary offset buffer must have at least %d values, got %d", expNumOffsets, len(a.valueOffsets))
+	}
+	lastOffset := a.valueOffsets[expNumOffsets-1] // stay in int64: int() could truncate where int is 32 bits
+	if lastOffset < 0 || lastOffset > int64(len(a.valueBytes)) { // a negative end offset is out of bounds too
+		return fmt.Errorf("arrow/array: large binary offset %d out of bounds of data buffer (length %d)", lastOffset, len(a.valueBytes))
+	}
+	return nil
+}
+
+// ValidateFull performs a full O(n) consistency check on the array data.
+// In addition to the checks performed by Validate, it also verifies that
+// all offsets are non-negative and monotonically non-decreasing.
+func (a *LargeBinary) ValidateFull() error {
+	if err := a.Validate(); err != nil {
+		return err
+	}
+	if a.data.length == 0 {
+		return nil
+	}
+	start := a.data.offset
+	window := a.valueOffsets[start : start+a.data.length+1] // offsets covered by this (possibly sliced) array
+	if first := window[0]; first < 0 {
+		return fmt.Errorf("arrow/array: large binary offset at index %d is negative: %d", start, first)
+	}
+	for i, cur := range window[1:] { // window[i] is the predecessor of cur
+		if prev := window[i]; cur < prev {
+			return fmt.Errorf("arrow/array: large binary offsets are not monotonically non-decreasing at index %d: %d < %d", start+i+1, cur, prev)
+		}
+	}
+	return nil
+}
+
 func arrayEqualLargeBinary(left, right *LargeBinary) bool {
 	for i := 0; i < left.Len(); i++ {
 		if left.IsNull(i) {
diff --git a/arrow/array/string.go b/arrow/array/string.go
index dd2e2d205..67bb8ba6d 100644
--- a/arrow/array/string.go
+++ b/arrow/array/string.go
@@ -169,6 +169,55 @@ func (a *String) MarshalJSON() ([]byte, error) {
 	return json.Marshal(vals)
 }
 
+// Validate performs a basic, O(1) consistency check on the array data.
+// It returns an error if:
+//   - The offset buffer is too small for the array length and offset
+//   - The last offset exceeds the data buffer length
+//
+// This is useful for detecting corrupted data from untrusted sources (e.g.
+// Arrow Flight / Flight SQL servers) before accessing values, which may
+// otherwise cause a runtime panic.
+func (a *String) Validate() error {
+	if a.data.length == 0 {
+		return nil
+	}
+	if a.data.buffers[1] == nil {
+		return fmt.Errorf("arrow/array: non-empty string array has no offsets buffer")
+	}
+	expNumOffsets := a.data.offset + a.data.length + 1
+	if len(a.offsets) < expNumOffsets {
+		return fmt.Errorf("arrow/array: string offset buffer must have at least %d values, got %d", expNumOffsets, len(a.offsets))
+	}
+	lastOffset := int(a.offsets[expNumOffsets-1])
+	if lastOffset < 0 || lastOffset > len(a.values) { // a negative end offset is out of bounds too
+		return fmt.Errorf("arrow/array: string offset %d out of bounds of data buffer (length %d)", lastOffset, len(a.values))
+	}
+	return nil
+}
+
+// ValidateFull performs a full O(n) consistency check on the array data.
+// In addition to the checks performed by Validate, it also verifies that
+// all offsets are non-negative and monotonically non-decreasing.
+func (a *String) ValidateFull() error {
+	if err := a.Validate(); err != nil {
+		return err
+	}
+	if a.data.length == 0 {
+		return nil
+	}
+	offsets := a.offsets[a.data.offset : a.data.offset+a.data.length+1] // only the offsets this (possibly sliced) array uses
+	if offsets[0] < 0 {
+		return fmt.Errorf("arrow/array: string offset at index %d is negative: %d", a.data.offset, offsets[0])
+	}
+	for i := 1; i < len(offsets); i++ {
+		if offsets[i] < offsets[i-1] {
+			return fmt.Errorf("arrow/array: string offsets are not monotonically non-decreasing at index %d: %d < %d",
+				a.data.offset+i, offsets[i], offsets[i-1])
+		}
+	}
+	return nil
+}
+
 func arrayEqualString(left, right *String) bool {
 	for i := 0; i < left.Len(); i++ {
 		if left.IsNull(i) {
@@ -312,6 +361,55 @@ func (a *LargeString) MarshalJSON() ([]byte, error) {
 	return json.Marshal(vals)
 }
 
+// Validate performs a basic, O(1) consistency check on the array data.
+// It returns an error if:
+//   - The offset buffer is too small for the array length and offset
+//   - The last offset exceeds the data buffer length
+//
+// This is useful for detecting corrupted data from untrusted sources (e.g.
+// Arrow Flight / Flight SQL servers) before accessing values, which may
+// otherwise cause a runtime panic.
+func (a *LargeString) Validate() error {
+	if a.data.length == 0 {
+		return nil
+	}
+	if a.data.buffers[1] == nil {
+		return fmt.Errorf("arrow/array: non-empty large string array has no offsets buffer")
+	}
+	expNumOffsets := a.data.offset + a.data.length + 1
+	if len(a.offsets) < expNumOffsets {
+		return fmt.Errorf("arrow/array: large string offset buffer must have at least %d values, got %d", expNumOffsets, len(a.offsets))
+	}
+	lastOffset := a.offsets[expNumOffsets-1] // stay in int64: int() could truncate where int is 32 bits
+	if lastOffset < 0 || lastOffset > int64(len(a.values)) { // a negative end offset is out of bounds too
+		return fmt.Errorf("arrow/array: large string offset %d out of bounds of data buffer (length %d)", lastOffset, len(a.values))
+	}
+	return nil
+}
+
+// ValidateFull performs a full O(n) consistency check on the array data.
+// In addition to the checks performed by Validate, it also verifies that
+// all offsets are non-negative and monotonically non-decreasing.
+func (a *LargeString) ValidateFull() error {
+	if err := a.Validate(); err != nil {
+		return err
+	}
+	if a.data.length == 0 {
+		return nil
+	}
+	offsets := a.offsets[a.data.offset : a.data.offset+a.data.length+1] // only the offsets this (possibly sliced) array uses
+	if offsets[0] < 0 {
+		return fmt.Errorf("arrow/array: large string offset at index %d is negative: %d", a.data.offset, offsets[0])
+	}
+	for i := 1; i < len(offsets); i++ {
+		if offsets[i] < offsets[i-1] {
+			return fmt.Errorf("arrow/array: large string offsets are not monotonically non-decreasing at index %d: %d < %d",
+				a.data.offset+i, offsets[i], offsets[i-1])
+		}
+	}
+	return nil
+}
+
 func arrayEqualLargeString(left, right *LargeString) bool {
 	for i := 0; i < left.Len(); i++ {
 		if left.IsNull(i) {
diff --git a/arrow/array/validate.go b/arrow/array/validate.go
new file mode 100644
index 000000000..70b7f6690
--- /dev/null
+++ b/arrow/array/validate.go
@@ -0,0 +1,82 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package array
+
+import (
+	"fmt"
+
+	"github.com/apache/arrow-go/v18/arrow"
+)
+
+// Validator is implemented by array types that can validate their internal
+// consistency. See Validate and ValidateFull for top-level dispatch.
+type Validator interface {
+	arrow.Array
+	// Validate performs a basic O(1) consistency check.
+	Validate() error
+	// ValidateFull performs a thorough O(n) consistency check.
+	ValidateFull() error
+}
+
+// Validate performs a basic O(1) consistency check on arr, returning an error
+// if the array's internal buffers are inconsistent. For array types that do not
+// implement Validator, nil is returned.
+//
+// Use this to detect corrupted data from untrusted sources such as Arrow Flight
+// or Flight SQL servers before accessing values, which may otherwise panic.
+func Validate(arr arrow.Array) error {
+	if v, ok := arr.(Validator); ok {
+		return v.Validate()
+	}
+	return nil
+}
+
+// ValidateFull performs a thorough O(n) consistency check on arr, returning an
+// error if the array's internal buffers are inconsistent. For array types that
+// do not implement Validator, nil is returned.
+//
+// Unlike Validate, this checks every element and is therefore O(n). Use this
+// when receiving data from untrusted sources where subtle corruption (e.g.
+// non-monotonic offsets) may not be detected by Validate alone.
+func ValidateFull(arr arrow.Array) error {
+	checker, ok := arr.(Validator)
+	if !ok {
+		return nil // types without a Validator implementation are accepted as-is
+	}
+	return checker.ValidateFull()
+}
+
+// ValidateRecord runs Validate over the columns of rec in order and stops at the
+// first failure, wrapping that error with the column index and field name.
+func ValidateRecord(rec arrow.RecordBatch) error {
+	for idx := 0; int64(idx) < rec.NumCols(); idx++ {
+		if err := Validate(rec.Column(idx)); err != nil {
+			return fmt.Errorf("column %d (%s): %w", idx, rec.Schema().Field(idx).Name, err)
+		}
+	}
+	return nil
+}
+
+// ValidateRecordFull runs ValidateFull over the columns of rec in order and stops
+// at the first failure, wrapping that error with the column index and field name.
+func ValidateRecordFull(rec arrow.RecordBatch) error {
+	for idx := 0; int64(idx) < rec.NumCols(); idx++ {
+		if err := ValidateFull(rec.Column(idx)); err != nil {
+			return fmt.Errorf("column %d (%s): %w", idx, rec.Schema().Field(idx).Name, err)
+		}
+	}
+	return nil
+}
diff --git a/arrow/array/validate_test.go b/arrow/array/validate_test.go
new file mode 100644
index 000000000..5d45ce50c
--- /dev/null
+++ b/arrow/array/validate_test.go
@@ -0,0 +1,211 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package array
+
+import (
+	"testing"
+
+	"github.com/apache/arrow-go/v18/arrow"
+	"github.com/apache/arrow-go/v18/arrow/memory"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+// makeBinaryArrayRaw wraps the given offsets and data directly in a Binary array
+// without going through a builder, so tests can simulate corrupted IPC data.
+func makeBinaryArrayRaw(t *testing.T, offsets []int32, data []byte, length, offset int) *Binary {
+	t.Helper()
+	var noValidity *memory.Buffer // buffer slot 0 is left nil
+	rawOffsets := memory.NewBufferBytes(arrow.Int32Traits.CastToBytes(offsets))
+	rawValues := memory.NewBufferBytes(data)
+	return NewBinaryData(NewData(arrow.BinaryTypes.Binary, length, []*memory.Buffer{noValidity, rawOffsets, rawValues}, nil, 0, offset))
+}
+
+// makeLargeBinaryArrayRaw is the int64-offset (LargeBinary) counterpart of makeBinaryArrayRaw.
+func makeLargeBinaryArrayRaw(t *testing.T, offsets []int64, data []byte, length, offset int) *LargeBinary {
+	t.Helper()
+	var noValidity *memory.Buffer // buffer slot 0 is left nil
+	rawOffsets := memory.NewBufferBytes(arrow.Int64Traits.CastToBytes(offsets))
+	rawValues := memory.NewBufferBytes(data)
+	return NewLargeBinaryData(NewData(arrow.BinaryTypes.LargeBinary, length, []*memory.Buffer{noValidity, rawOffsets, rawValues}, nil, 0, offset))
+}
+
+// makeStringArrayRaw wraps the given offsets and string data directly in a String array.
+func makeStringArrayRaw(t *testing.T, offsets []int32, data string, length, offset int) *String {
+	t.Helper()
+	var noValidity *memory.Buffer // buffer slot 0 is left nil
+	rawOffsets := memory.NewBufferBytes(arrow.Int32Traits.CastToBytes(offsets))
+	rawValues := memory.NewBufferBytes([]byte(data))
+	return NewStringData(NewData(arrow.BinaryTypes.String, length, []*memory.Buffer{noValidity, rawOffsets, rawValues}, nil, 0, offset))
+}
+
+// makeLargeStringArrayRaw creates a LargeString array directly from raw buffers.
+func makeLargeStringArrayRaw(t *testing.T, offsets []int64, data string, length, offset int) *LargeString {
+	t.Helper()
+	offsetBuf := memory.NewBufferBytes(arrow.Int64Traits.CastToBytes(offsets))
+	dataBuf := memory.NewBufferBytes([]byte(data))
+	d := NewData(arrow.BinaryTypes.LargeString, length, []*memory.Buffer{nil, offsetBuf, dataBuf}, nil, 0, offset) // buffer slot 0 is left nil
+	return NewLargeStringData(d)
+}
+
+func TestBinaryValidate(t *testing.T) {
+	t.Run("valid array passes", func(t *testing.T) {
+		// offsets [0,3,6,9] over data "abcdefghi": three values of three bytes each
+		arr := makeBinaryArrayRaw(t, []int32{0, 3, 6, 9}, []byte("abcdefghi"), 3, 0)
+		assert.NoError(t, arr.Validate())
+		assert.NoError(t, arr.ValidateFull())
+	})
+
+	t.Run("valid sliced array passes", func(t *testing.T) {
+		arr := makeBinaryArrayRaw(t, []int32{0, 3, 6, 9}, []byte("abcdefghi"), 1, 1) // length 1 at offset 1
+		assert.NoError(t, arr.Validate())
+		assert.NoError(t, arr.ValidateFull())
+	})
+
+	t.Run("empty array passes", func(t *testing.T) {
+		arr := makeBinaryArrayRaw(t, nil, nil, 0, 0) // zero length: both checks return early
+		assert.NoError(t, arr.Validate())
+		assert.NoError(t, arr.ValidateFull())
+	})
+
+	t.Run("non-monotonic offsets pass Validate but fail ValidateFull", func(t *testing.T) {
+		// The last offset (5) is within the data buffer, so setData/Validate pass,
+		// but offsets[1]=5 followed by offsets[2]=3 decreases; ValidateFull must catch this.
+		arr := makeBinaryArrayRaw(t, []int32{0, 5, 3, 5}, []byte("hello"), 3, 0)
+		assert.NoError(t, arr.Validate())
+		err := arr.ValidateFull()
+		require.Error(t, err)
+		assert.Contains(t, err.Error(), "not monotonically non-decreasing")
+	})
+
+	t.Run("negative first offset passes Validate but fails ValidateFull", func(t *testing.T) {
+		// The last offset (5) is within bounds, but the first offset is negative.
+		arr := makeBinaryArrayRaw(t, []int32{-1, 2, 5}, []byte("hello"), 2, 0)
+		assert.NoError(t, arr.Validate())
+		err := arr.ValidateFull()
+		require.Error(t, err)
+		assert.Contains(t, err.Error(), "negative")
+	})
+}
+
+func TestLargeBinaryValidate(t *testing.T) {
+	t.Run("valid array passes", func(t *testing.T) {
+		arr := makeLargeBinaryArrayRaw(t, []int64{0, 3, 6, 9}, []byte("abcdefghi"), 3, 0)
+		assert.NoError(t, arr.Validate())
+		assert.NoError(t, arr.ValidateFull())
+	})
+
+	t.Run("non-monotonic offsets pass Validate but fail ValidateFull", func(t *testing.T) {
+		arr := makeLargeBinaryArrayRaw(t, []int64{0, 5, 3, 5}, []byte("hello"), 3, 0) // 5 -> 3 decreases
+		assert.NoError(t, arr.Validate())
+		err := arr.ValidateFull()
+		require.Error(t, err)
+		assert.Contains(t, err.Error(), "not monotonically non-decreasing")
+	})
+
+	t.Run("negative first offset passes Validate but fails ValidateFull", func(t *testing.T) {
+		arr := makeLargeBinaryArrayRaw(t, []int64{-1, 2, 5}, []byte("hello"), 2, 0) // last offset 5 is still in bounds
+		assert.NoError(t, arr.Validate())
+		err := arr.ValidateFull()
+		require.Error(t, err)
+		assert.Contains(t, err.Error(), "negative")
+	})
+}
+
+func TestStringValidate(t *testing.T) {
+	t.Run("valid array passes", func(t *testing.T) {
+		arr := makeStringArrayRaw(t, []int32{0, 3, 5, 10}, "abcdeabcde", 3, 0)
+		assert.NoError(t, arr.Validate())
+		assert.NoError(t, arr.ValidateFull())
+	})
+
+	t.Run("non-monotonic offsets pass Validate but fail ValidateFull", func(t *testing.T) {
+		arr := makeStringArrayRaw(t, []int32{0, 5, 3, 5}, "hello", 3, 0) // 5 -> 3 decreases
+		assert.NoError(t, arr.Validate())
+		err := arr.ValidateFull()
+		require.Error(t, err)
+		assert.Contains(t, err.Error(), "not monotonically non-decreasing")
+	})
+
+	t.Run("negative first offset passes Validate but fails ValidateFull", func(t *testing.T) {
+		arr := makeStringArrayRaw(t, []int32{-1, 2, 5}, "hello", 2, 0) // last offset 5 is still in bounds
+		assert.NoError(t, arr.Validate())
+		err := arr.ValidateFull()
+		require.Error(t, err)
+		assert.Contains(t, err.Error(), "negative")
+	})
+}
+
+func TestLargeStringValidate(t *testing.T) {
+	t.Run("valid array passes", func(t *testing.T) {
+		arr := makeLargeStringArrayRaw(t, []int64{0, 3, 5, 10}, "abcdeabcde", 3, 0)
+		assert.NoError(t, arr.Validate())
+		assert.NoError(t, arr.ValidateFull())
+	})
+
+	t.Run("non-monotonic offsets pass Validate but fail ValidateFull", func(t *testing.T) {
+		arr := makeLargeStringArrayRaw(t, []int64{0, 5, 3, 5}, "hello", 3, 0) // 5 -> 3 decreases
+		assert.NoError(t, arr.Validate())
+		err := arr.ValidateFull()
+		require.Error(t, err)
+		assert.Contains(t, err.Error(), "not monotonically non-decreasing")
+	})
+
+	t.Run("negative first offset passes Validate but fails ValidateFull", func(t *testing.T) {
+		arr := makeLargeStringArrayRaw(t, []int64{-1, 2, 5}, "hello", 2, 0) // last offset 5 is still in bounds
+		assert.NoError(t, arr.Validate())
+		err := arr.ValidateFull()
+		require.Error(t, err)
+		assert.Contains(t, err.Error(), "negative")
+	})
+}
+
+func TestTopLevelValidate(t *testing.T) {
+	t.Run("Validate dispatches to Validator", func(t *testing.T) {
+		// Non-monotonic string array: it passes setData, but ValidateFull must fail.
+		arr := makeStringArrayRaw(t, []int32{0, 5, 3, 5}, "hello", 3, 0)
+		assert.NoError(t, Validate(arr))
+		require.Error(t, ValidateFull(arr))
+	})
+
+	t.Run("Validate returns nil for non-Validator types", func(t *testing.T) {
+		// Boolean arrays don't implement Validator, so both helpers should return nil.
+		bldr := NewBooleanBuilder(memory.NewGoAllocator())
+		bldr.AppendValues([]bool{true, false}, nil)
+		arr := bldr.NewBooleanArray()
+		defer arr.Release()
+		assert.NoError(t, Validate(arr))
+		assert.NoError(t, ValidateFull(arr))
+	})
+
+	t.Run("ValidateRecord validates all columns", func(t *testing.T) {
+		validArr := makeStringArrayRaw(t, []int32{0, 3, 6}, "abcdef", 2, 0)
+		corruptArr := makeStringArrayRaw(t, []int32{0, 5, 3, 5}, "hello", 3, 0)
+
+		schema := arrow.NewSchema([]arrow.Field{
+			{Name: "ok", Type: arrow.BinaryTypes.String},
+			{Name: "bad", Type: arrow.BinaryTypes.String},
+		}, nil)
+		rec := NewRecordBatch(schema, []arrow.Array{validArr, corruptArr}, 2) // NOTE(review): corruptArr has length 3 but nrows is 2; confirm NewRecordBatch accepts this
+		defer rec.Release()
+
+		assert.NoError(t, ValidateRecord(rec))
+		err := ValidateRecordFull(rec)
+		require.Error(t, err)
+		assert.Contains(t, err.Error(), "column 1 (bad)")
+	})
+}