
ByteArrayColumnChunkWriter.writeValues drops values preceding large entries #756

@kli19

Description


Summary

The writeValues method in parquet/file/column_writer_types.gen.go.tmpl (starting in v18.5.2) silently drops small ByteArray values that precede a ≥1MB value within the same write batch. The page header retains the full value count from def/rep levels, but the encoded data stream is short, producing corrupt parquet files.

Impact
We write BSON documents to parquet using ByteArrayColumnChunkWriter.WriteBatch. After upgrading from v18.5.1 to v18.5.2, round-trip validation began failing. Readers (including ours, which is independent of arrow-go) misinterpret the truncated data stream. For example, a 65-byte string was read back as 2,483,541 bytes because the reader consumed bytes from subsequent values as a length prefix. Reverting to v18.5.1 resolved the issue.

Root Cause
Relevant file: parquet/file/column_writer_types.gen.go.tmpl

v18.5.1 had this one-liner:

  func (w *ByteArrayColumnChunkWriter) writeValues(values []parquet.ByteArray, numNulls int64) {
      w.currentEncoder.(encoding.ByteArrayEncoder).Put(values)  // all values, one call
      ...
  }

v18.5.2 replaced it with a loop that has two safety checks: a 1GB overflow check and a 1MB large-value check. The 1GB check is correct. The 1MB check has a bug. Here's the v18.5.2 code with annotations:

  batchStart := 0
  for i := 0; i < len(values); i++ {
      valueSize := int64(len(values[i]))
 
      // 1GB CHECK — correctly flushes pending small values first
      if currentSize+valueSize >= maxSafeBufferSize {
          if i > batchStart {
              encoder.Put(values[batchStart:i])  // correctly encodes pending batch
          }
          w.FlushCurrentPage()
          batchStart = i
          currentSize = 0
      }
 
      currentSize += valueSize + 4
 
      // 1MB CHECK — BUG: does NOT flush pending small values
      if valueSize >= largeValueThreshold {
          // MISSING: encoder.Put(values[batchStart:i])
          encoder.Put(values[i : i+1])  // only encodes the large value
          batchStart = i + 1            // skips everything from batchStart to i-1
          currentSize = w.currentEncoder.EstimatedDataEncodedSize()
      }
  }
 
  // encode whatever's left
  if batchStart < len(values) {
      encoder.Put(values[batchStart:])
  }

Say we have 5 values: [10B, 20B, 30B, 2MB, 50B] and currentSize starts at 0.

Iteration Breakdown:
i = 0: valueSize = 10
    1GB check: 0 + 10 < 1GB --> skip
    Current Size: 14 (10 + 4)
    1MB check: 10 < 1MB --> skip

i = 1: valueSize = 20
    1GB check: 14 + 20 < 1GB --> skip
    Current Size: 38 (14 + 20 + 4)
    1MB check: 20 < 1MB --> skip

i = 2: valueSize = 30
    1GB check: 38 + 30 < 1GB --> skip
    Current Size: 72 (38 + 30 + 4)
    1MB check: 30 < 1MB --> skip

i = 3: valueSize = 2MB
    1GB check: 72 + 2MB < 1GB --> skip
        --> THIS IS WHERE BUG HAPPENS
    Current Size: 2MB + 76
    1MB check: 2MB >= 1MB --> ENTER
        Action: encoder.Put(values[3:4]) --> Encodes only the 2MB value
        Batch Start: 4
        Current Size: encoder.EstimatedDataEncodedSize()

i = 4: valueSize = 50
    1GB check: skip
    Current Size: encoder estimate + 54 (50 + 4)
    1MB check: skip

End Condition:
    Final Check: batchStart = 4 < 5
        --> Action: encoder.Put(values[4:5])
    Key Values Not Encoded: values[0], [1], [2]

Suggested Fix
Flush the pending batch in the largeValueThreshold branch, matching what the maxSafeBufferSize branch already does:

  if valueSize >= largeValueThreshold {
      if i > batchStart {
          encoder.Put(values[batchStart:i])  // flush pending small values first
      }
      encoder.Put(values[i : i+1])
      batchStart = i + 1
      currentSize = w.currentEncoder.EstimatedDataEncodedSize()
  }

Component(s)

Parquet
