Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,8 @@ private static LogRecordReadContext createArrowReadContext(
SchemaGetter schemaGetter) {
// TODO: use a more reasonable memory limit
BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
FieldGetter[] fieldGetters = buildProjectedFieldGetters(dataRowType, selectedFields);
FieldGetter[] fieldGetters =
buildProjectedFieldGetters(dataRowType, selectedFields, LogFormat.ARROW);
return new LogRecordReadContext(
LogFormat.ARROW,
dataRowType,
Expand Down Expand Up @@ -191,7 +192,8 @@ public static LogRecordReadContext createCompactedRowReadContext(
*/
public static LogRecordReadContext createIndexedReadContext(
RowType rowType, int schemaId, int[] selectedFields, SchemaGetter schemaGetter) {
FieldGetter[] fieldGetters = buildProjectedFieldGetters(rowType, selectedFields);
FieldGetter[] fieldGetters =
buildProjectedFieldGetters(rowType, selectedFields, LogFormat.INDEXED);
// for INDEXED log format, the projection is NEVER pushed down to the server side
return new LogRecordReadContext(
LogFormat.INDEXED, rowType, schemaId, null, fieldGetters, false, schemaGetter);
Expand Down Expand Up @@ -222,7 +224,8 @@ public static LogRecordReadContext createCompactedRowReadContext(
int schemaId,
int[] selectedFields,
@Nullable SchemaGetter schemaGetter) {
FieldGetter[] fieldGetters = buildProjectedFieldGetters(rowType, selectedFields);
FieldGetter[] fieldGetters =
buildProjectedFieldGetters(rowType, selectedFields, LogFormat.COMPACTED);
// for COMPACTED log format, the projection is NEVER pushed down to the server side
return new LogRecordReadContext(
LogFormat.COMPACTED, rowType, schemaId, null, fieldGetters, false, schemaGetter);
Expand Down Expand Up @@ -325,14 +328,17 @@ private boolean isSameRowType(int schemaId) {
return targetSchemaId == schemaId || isProjectionPushDowned();
}

private static FieldGetter[] buildProjectedFieldGetters(RowType rowType, int[] selectedFields) {
private static FieldGetter[] buildProjectedFieldGetters(
RowType rowType, int[] selectedFields, LogFormat logFormat) {
// ARROW already copies strings during deserialization;
// other formats reference pooled network buffers and need explicit copying.
boolean copyStrings = logFormat != LogFormat.ARROW;
List<DataType> dataTypeList = rowType.getChildren();
FieldGetter[] fieldGetters = new FieldGetter[selectedFields.length];
for (int i = 0; i < fieldGetters.length; i++) {
// build deep field getter to support nested types
fieldGetters[i] =
InternalRow.createDeepFieldGetter(
dataTypeList.get(selectedFields[i]), selectedFields[i]);
dataTypeList.get(selectedFields[i]), selectedFields[i], copyStrings);
}
return fieldGetters;
}
Expand Down
38 changes: 20 additions & 18 deletions fluss-common/src/main/java/org/apache/fluss/row/InternalArray.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@
package org.apache.fluss.row;

import org.apache.fluss.annotation.PublicEvolving;
import org.apache.fluss.row.columnar.ColumnarRow;
import org.apache.fluss.row.columnar.VectorizedColumnBatch;
import org.apache.fluss.types.ArrayType;
import org.apache.fluss.types.DataType;
import org.apache.fluss.types.MapType;
Expand Down Expand Up @@ -161,23 +159,26 @@ static ElementGetter createElementGetter(DataType fieldType) {
};
}

/**
* Creates a deep accessor for getting elements in an internal array data structure at the given
* position. It returns new objects (GenericArray/GenericMap/GenericMap) for nested
* array/map/row types.
*
* <p>NOTE: Currently, it is only used for deep copying {@link ColumnarRow} for Arrow which
* avoid the arrow buffer is released before accessing elements. It doesn't deep copy STRING and
* BYTES types, because {@link ColumnarRow} already deep copies the bytes, see {@link
* VectorizedColumnBatch#getString(int, int)}. This can be removed once we supports object reuse
* for Arrow {@link ColumnarRow}, see {@code CompletedFetch#toScanRecord(LogRecord)}.
*/
static ElementGetter createDeepElementGetter(DataType fieldType) {
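/**
 * Same as {@link InternalRow#createDeepFieldGetter(DataType, int, boolean)}, but for array
 * elements: when {@code copyStrings} is true, CHAR and STRING elements are copied as well.
 */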
static ElementGetter createDeepElementGetter(DataType fieldType, boolean copyStrings) {
final ElementGetter elementGetter;
switch (fieldType.getTypeRoot()) {
case CHAR:
final int charLen = getLength(fieldType);
elementGetter =
copyStrings
? (array, pos) -> array.getChar(pos, charLen).copy()
: (array, pos) -> array.getChar(pos, charLen);
break;
case STRING:
elementGetter =
copyStrings
? (array, pos) -> array.getString(pos).copy()
: InternalArray::getString;
break;
case ARRAY:
DataType nestedType = ((ArrayType) fieldType).getElementType();
ElementGetter nestedGetter = createDeepElementGetter(nestedType);
ElementGetter nestedGetter = createDeepElementGetter(nestedType, copyStrings);
elementGetter =
(array, pos) -> {
InternalArray inner = array.getArray(pos);
Expand All @@ -191,8 +192,8 @@ static ElementGetter createDeepElementGetter(DataType fieldType) {
case MAP:
DataType keyType = ((MapType) fieldType).getKeyType();
DataType valueType = ((MapType) fieldType).getValueType();
ElementGetter keyGetter = createDeepElementGetter(keyType);
ElementGetter valueGetter = createDeepElementGetter(valueType);
ElementGetter keyGetter = createDeepElementGetter(keyType, copyStrings);
ElementGetter valueGetter = createDeepElementGetter(valueType, copyStrings);
elementGetter =
(array, pos) -> {
InternalMap inner = array.getMap(pos);
Expand All @@ -212,7 +213,8 @@ static ElementGetter createDeepElementGetter(DataType fieldType) {
int numFields = rowType.getFieldCount();
InternalRow.FieldGetter[] fieldGetters = new InternalRow.FieldGetter[numFields];
for (int i = 0; i < numFields; i++) {
fieldGetters[i] = InternalRow.createDeepFieldGetter(rowType.getTypeAt(i), i);
fieldGetters[i] =
InternalRow.createDeepFieldGetter(rowType.getTypeAt(i), i, copyStrings);
}
elementGetter =
(array, pos) -> {
Expand Down
43 changes: 27 additions & 16 deletions fluss-common/src/main/java/org/apache/fluss/row/InternalRow.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@

import org.apache.fluss.annotation.PublicEvolving;
import org.apache.fluss.record.ChangeType;
import org.apache.fluss.row.columnar.ColumnarRow;
import org.apache.fluss.row.columnar.VectorizedColumnBatch;
import org.apache.fluss.types.ArrayType;
import org.apache.fluss.types.DataType;
import org.apache.fluss.types.MapType;
Expand All @@ -32,7 +30,6 @@
import java.util.HashMap;
import java.util.Map;

import static org.apache.fluss.row.InternalArray.createDeepElementGetter;
import static org.apache.fluss.types.DataTypeChecks.getLength;
import static org.apache.fluss.types.DataTypeChecks.getPrecision;
import static org.apache.fluss.types.DataTypeChecks.getScale;
Expand Down Expand Up @@ -239,22 +236,35 @@ static FieldGetter createFieldGetter(DataType fieldType, int fieldPos) {
}

/**
* Creates a deep accessor for getting elements in an internal array data structure at the given
* position. It returns new objects (GenericArray/GenericMap/GenericMap) for nested
* array/map/row types.
* Creates a deep accessor for getting elements in an internal row data structure at the given
* position. Returns new objects (GenericArray/GenericMap/GenericRow) for nested array/map/row
* types to prevent use-after-free when the underlying buffer is released.
*
* <p>NOTE: Currently, it is only used for deep copying {@link ColumnarRow} for Arrow which
* avoid the arrow buffer is released before accessing elements. It doesn't deep copy STRING and
* BYTES types, because {@link ColumnarRow} already deep copies the bytes, see {@link
* VectorizedColumnBatch#getString(int, int)}. This can be removed once we supports object reuse
* for Arrow {@link ColumnarRow}, see {@code CompletedFetch#toScanRecord(LogRecord)}.
* <p>ARROW already deep copies strings in VectorizedColumnBatch, so copyStrings should be
* false. INDEXED and COMPACTED rows reference pooled network buffers, so copyStrings should be
* true to copy STRING/CHAR via BinaryString.copy().
*/
static FieldGetter createDeepFieldGetter(DataType fieldType, int fieldPos) {
static FieldGetter createDeepFieldGetter(
DataType fieldType, int fieldPos, boolean copyStrings) {
final FieldGetter fieldGetter;
switch (fieldType.getTypeRoot()) {
case CHAR:
final int charLen = getLength(fieldType);
fieldGetter =
copyStrings
? row -> row.getChar(fieldPos, charLen).copy()
: row -> row.getChar(fieldPos, charLen);
break;
case STRING:
fieldGetter =
copyStrings
? row -> row.getString(fieldPos).copy()
: row -> row.getString(fieldPos);
break;
case ARRAY:
DataType elementType = ((ArrayType) fieldType).getElementType();
InternalArray.ElementGetter nestedGetter = createDeepElementGetter(elementType);
InternalArray.ElementGetter nestedGetter =
InternalArray.createDeepElementGetter(elementType, copyStrings);
fieldGetter =
row -> {
InternalArray array = row.getArray(fieldPos);
Expand All @@ -268,9 +278,9 @@ static FieldGetter createDeepFieldGetter(DataType fieldType, int fieldPos) {
case MAP:
MapType mapType = (MapType) fieldType;
InternalArray.ElementGetter keyGetter =
createDeepElementGetter(mapType.getKeyType());
InternalArray.createDeepElementGetter(mapType.getKeyType(), copyStrings);
InternalArray.ElementGetter valueGetter =
createDeepElementGetter(mapType.getValueType());
InternalArray.createDeepElementGetter(mapType.getValueType(), copyStrings);
fieldGetter =
row -> {
InternalMap map = row.getMap(fieldPos);
Expand All @@ -288,7 +298,8 @@ static FieldGetter createDeepFieldGetter(DataType fieldType, int fieldPos) {
int numFields = rowType.getFieldCount();
FieldGetter[] nestedFieldGetters = new FieldGetter[numFields];
for (int i = 0; i < numFields; i++) {
nestedFieldGetters[i] = createDeepFieldGetter(rowType.getTypeAt(i), i);
nestedFieldGetters[i] =
createDeepFieldGetter(rowType.getTypeAt(i), i, copyStrings);
}
fieldGetter =
row -> {
Expand Down