From e65212bc0e2e34a046476a9f013bfd4c5caec56e Mon Sep 17 00:00:00 2001 From: Anton Borisov Date: Tue, 7 Apr 2026 03:29:18 +0100 Subject: [PATCH] [client] Fix corruption in zero-copy lazy parse ByteBuf --- .../fluss/record/LogRecordReadContext.java | 18 +++++--- .../org/apache/fluss/row/InternalArray.java | 38 ++++++++-------- .../org/apache/fluss/row/InternalRow.java | 43 ++++++++++++------- 3 files changed, 59 insertions(+), 40 deletions(-) diff --git a/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java b/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java index 71536c8122..497ff4cb18 100644 --- a/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java +++ b/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java @@ -116,7 +116,8 @@ private static LogRecordReadContext createArrowReadContext( SchemaGetter schemaGetter) { // TODO: use a more reasonable memory limit BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE); - FieldGetter[] fieldGetters = buildProjectedFieldGetters(dataRowType, selectedFields); + FieldGetter[] fieldGetters = + buildProjectedFieldGetters(dataRowType, selectedFields, LogFormat.ARROW); return new LogRecordReadContext( LogFormat.ARROW, dataRowType, @@ -191,7 +192,8 @@ public static LogRecordReadContext createCompactedRowReadContext( */ public static LogRecordReadContext createIndexedReadContext( RowType rowType, int schemaId, int[] selectedFields, SchemaGetter schemaGetter) { - FieldGetter[] fieldGetters = buildProjectedFieldGetters(rowType, selectedFields); + FieldGetter[] fieldGetters = + buildProjectedFieldGetters(rowType, selectedFields, LogFormat.INDEXED); // for INDEXED log format, the projection is NEVER push downed to the server side return new LogRecordReadContext( LogFormat.INDEXED, rowType, schemaId, null, fieldGetters, false, schemaGetter); @@ -222,7 +224,8 @@ public static LogRecordReadContext createCompactedRowReadContext( int schemaId, int[] selectedFields, @Nullable SchemaGetter schemaGetter) { - FieldGetter[] fieldGetters = buildProjectedFieldGetters(rowType, selectedFields); + FieldGetter[] fieldGetters = + buildProjectedFieldGetters(rowType, selectedFields, LogFormat.COMPACTED); // for COMPACTED log format, the projection is NEVER push downed to the server side return new LogRecordReadContext( LogFormat.COMPACTED, rowType, schemaId, null, fieldGetters, false, schemaGetter); @@ -325,14 +328,17 @@ private boolean isSameRowType(int schemaId) { return targetSchemaId == schemaId || isProjectionPushDowned(); } - private static FieldGetter[] buildProjectedFieldGetters(RowType rowType, int[] selectedFields) { + private static FieldGetter[] buildProjectedFieldGetters( + RowType rowType, int[] selectedFields, LogFormat logFormat) { + // ARROW already copies strings during deserialization; + // other formats reference pooled network buffers and need explicit copying. + boolean copyStrings = logFormat != LogFormat.ARROW; List dataTypeList = rowType.getChildren(); FieldGetter[] fieldGetters = new FieldGetter[selectedFields.length]; for (int i = 0; i < fieldGetters.length; i++) { - // build deep field getter to support nested types fieldGetters[i] = InternalRow.createDeepFieldGetter( - dataTypeList.get(selectedFields[i]), selectedFields[i]); + dataTypeList.get(selectedFields[i]), selectedFields[i], copyStrings); } return fieldGetters; } diff --git a/fluss-common/src/main/java/org/apache/fluss/row/InternalArray.java b/fluss-common/src/main/java/org/apache/fluss/row/InternalArray.java index 6575de1b39..c110e9a697 100644 --- a/fluss-common/src/main/java/org/apache/fluss/row/InternalArray.java +++ b/fluss-common/src/main/java/org/apache/fluss/row/InternalArray.java @@ -19,8 +19,6 @@ package org.apache.fluss.row; import org.apache.fluss.annotation.PublicEvolving; -import org.apache.fluss.row.columnar.ColumnarRow; -import org.apache.fluss.row.columnar.VectorizedColumnBatch; import org.apache.fluss.types.ArrayType; import org.apache.fluss.types.DataType; import org.apache.fluss.types.MapType; @@ -161,23 +159,26 @@ static ElementGetter createElementGetter(DataType fieldType) { }; } - /** - * Creates a deep accessor for getting elements in an internal array data structure at the given - * position. It returns new objects (GenericArray/GenericMap/GenericMap) for nested - * array/map/row types. - * - *

NOTE: Currently, it is only used for deep copying {@link ColumnarRow} for Arrow which - * avoid the arrow buffer is released before accessing elements. It doesn't deep copy STRING and - * BYTES types, because {@link ColumnarRow} already deep copies the bytes, see {@link - * VectorizedColumnBatch#getString(int, int)}. This can be removed once we supports object reuse - * for Arrow {@link ColumnarRow}, see {@code CompletedFetch#toScanRecord(LogRecord)}. - */ - static ElementGetter createDeepElementGetter(DataType fieldType) { + /** Same as InternalRow.createDeepFieldGetter but for array elements. */ + static ElementGetter createDeepElementGetter(DataType fieldType, boolean copyStrings) { final ElementGetter elementGetter; switch (fieldType.getTypeRoot()) { + case CHAR: + final int charLen = getLength(fieldType); + elementGetter = + copyStrings + ? (array, pos) -> array.getChar(pos, charLen).copy() + : (array, pos) -> array.getChar(pos, charLen); + break; + case STRING: + elementGetter = + copyStrings + ? (array, pos) -> array.getString(pos).copy() + : InternalArray::getString; + break; case ARRAY: DataType nestedType = ((ArrayType) fieldType).getElementType(); - ElementGetter nestedGetter = createDeepElementGetter(nestedType); + ElementGetter nestedGetter = createDeepElementGetter(nestedType, copyStrings); elementGetter = (array, pos) -> { InternalArray inner = array.getArray(pos); @@ -191,8 +192,8 @@ static ElementGetter createDeepElementGetter(DataType fieldType) { case MAP: DataType keyType = ((MapType) fieldType).getKeyType(); DataType valueType = ((MapType) fieldType).getValueType(); - ElementGetter keyGetter = createDeepElementGetter(keyType); - ElementGetter valueGetter = createDeepElementGetter(valueType); + ElementGetter keyGetter = createDeepElementGetter(keyType, copyStrings); + ElementGetter valueGetter = createDeepElementGetter(valueType, copyStrings); elementGetter = (array, pos) -> { InternalMap inner = array.getMap(pos); @@ -212,7 +213,8 @@ static ElementGetter createDeepElementGetter(DataType fieldType) { int numFields = rowType.getFieldCount(); InternalRow.FieldGetter[] fieldGetters = new InternalRow.FieldGetter[numFields]; for (int i = 0; i < numFields; i++) { - fieldGetters[i] = InternalRow.createDeepFieldGetter(rowType.getTypeAt(i), i); + fieldGetters[i] = + InternalRow.createDeepFieldGetter(rowType.getTypeAt(i), i, copyStrings); } elementGetter = (array, pos) -> { diff --git a/fluss-common/src/main/java/org/apache/fluss/row/InternalRow.java b/fluss-common/src/main/java/org/apache/fluss/row/InternalRow.java index e19f42b2e0..6427266406 100644 --- a/fluss-common/src/main/java/org/apache/fluss/row/InternalRow.java +++ b/fluss-common/src/main/java/org/apache/fluss/row/InternalRow.java @@ -19,8 +19,6 @@ import org.apache.fluss.annotation.PublicEvolving; import org.apache.fluss.record.ChangeType; -import org.apache.fluss.row.columnar.ColumnarRow; -import org.apache.fluss.row.columnar.VectorizedColumnBatch; import org.apache.fluss.types.ArrayType; import org.apache.fluss.types.DataType; import org.apache.fluss.types.MapType; @@ -32,7 +30,6 @@ import java.util.HashMap; import java.util.Map; -import static org.apache.fluss.row.InternalArray.createDeepElementGetter; import static org.apache.fluss.types.DataTypeChecks.getLength; import static org.apache.fluss.types.DataTypeChecks.getPrecision; import static org.apache.fluss.types.DataTypeChecks.getScale; @@ -239,22 +236,35 @@ static FieldGetter createFieldGetter(DataType fieldType, int fieldPos) { } /** - * Creates a deep accessor for getting elements in an internal array data structure at the given - * position. It returns new objects (GenericArray/GenericMap/GenericMap) for nested - * array/map/row types. + * Creates a deep accessor for getting elements in an internal row data structure at the given + * position. Returns new objects (GenericArray/GenericMap/GenericRow) for nested array/map/row + * types to prevent use-after-free when the underlying buffer is released. * - *

NOTE: Currently, it is only used for deep copying {@link ColumnarRow} for Arrow which - * avoid the arrow buffer is released before accessing elements. It doesn't deep copy STRING and - * BYTES types, because {@link ColumnarRow} already deep copies the bytes, see {@link - * VectorizedColumnBatch#getString(int, int)}. This can be removed once we supports object reuse - * for Arrow {@link ColumnarRow}, see {@code CompletedFetch#toScanRecord(LogRecord)}. + *

ARROW already deep copies strings in VectorizedColumnBatch, so copyStrings should be + * false. INDEXED and COMPACTED rows reference pooled network buffers, so copyStrings should be + * true to copy STRING/CHAR via BinaryString.copy(). */ - static FieldGetter createDeepFieldGetter(DataType fieldType, int fieldPos) { + static FieldGetter createDeepFieldGetter( + DataType fieldType, int fieldPos, boolean copyStrings) { final FieldGetter fieldGetter; switch (fieldType.getTypeRoot()) { + case CHAR: + final int charLen = getLength(fieldType); + fieldGetter = + copyStrings + ? row -> row.getChar(fieldPos, charLen).copy() + : row -> row.getChar(fieldPos, charLen); + break; + case STRING: + fieldGetter = + copyStrings + ? row -> row.getString(fieldPos).copy() + : row -> row.getString(fieldPos); + break; case ARRAY: DataType elementType = ((ArrayType) fieldType).getElementType(); - InternalArray.ElementGetter nestedGetter = createDeepElementGetter(elementType); + InternalArray.ElementGetter nestedGetter = + InternalArray.createDeepElementGetter(elementType, copyStrings); fieldGetter = row -> { InternalArray array = row.getArray(fieldPos); @@ -268,9 +278,9 @@ static FieldGetter createDeepFieldGetter(DataType fieldType, int fieldPos) { case MAP: MapType mapType = (MapType) fieldType; InternalArray.ElementGetter keyGetter = - createDeepElementGetter(mapType.getKeyType()); + InternalArray.createDeepElementGetter(mapType.getKeyType(), copyStrings); InternalArray.ElementGetter valueGetter = - createDeepElementGetter(mapType.getValueType()); + InternalArray.createDeepElementGetter(mapType.getValueType(), copyStrings); fieldGetter = row -> { InternalMap map = row.getMap(fieldPos); @@ -288,7 +298,8 @@ static FieldGetter createDeepFieldGetter(DataType fieldType, int fieldPos) { int numFields = rowType.getFieldCount(); FieldGetter[] nestedFieldGetters = new FieldGetter[numFields]; for (int i = 0; i < numFields; i++) { - nestedFieldGetters[i] = createDeepFieldGetter(rowType.getTypeAt(i), i); + nestedFieldGetters[i] = + createDeepFieldGetter(rowType.getTypeAt(i), i, copyStrings); } fieldGetter = row -> {