Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
878 changes: 878 additions & 0 deletions docs/variant-shredding-design.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import java.io.IOException;
import java.util.List;
import java.util.Map;

/**
* Used to configure and create a scanner to scan data for a table.
Expand Down Expand Up @@ -72,6 +73,19 @@ public interface Scan {
*/
Scan filter(@Nullable Predicate predicate);

/**
 * Returns a new scan from this with variant sub-field projection hints. When a Variant column
 * has been shredded, specifying which sub-fields are needed allows the server to send only the
 * relevant typed_value children, reducing network transfer.
 *
 * <p>The map key is the column index (in the projected or full schema), and the value is the
 * list of field names to project within that Variant column.
 *
 * <p>NOTE(review): the key is described as an index into "the projected or full schema"; which
 * of the two applies when a column projection is also configured is not stated here. Confirm
 * and document the exact index space.
 *
 * @param columnToFields mapping from Variant column index to desired field names, or null to
 *     disable sub-field projection
 * @return a new {@link Scan} carrying the given sub-field projection hints
 */
Scan variantFieldProjection(@Nullable Map<Integer, List<String>> columnToFields);

/**
* Creates a {@link LogScanner} to continuously read log data for this scan.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

Expand All @@ -56,12 +57,14 @@ public class TableScan implements Scan {

/** The maximum number of rows to read. No limit if null. */
@Nullable private final Integer limit;
/** Variant sub-field projection hints. Null means no sub-field projection. */
@Nullable private final Map<Integer, List<String>> variantFieldProjection;

/** The record batch filter to apply. No filter if null. */
@Nullable private final Predicate recordBatchFilter;

public TableScan(FlussConnection conn, TableInfo tableInfo, SchemaGetter schemaGetter) {
this(conn, tableInfo, schemaGetter, null, null, null);
this(conn, tableInfo, schemaGetter, null, null, null, null);
}

private TableScan(
Expand All @@ -70,19 +73,27 @@ private TableScan(
SchemaGetter schemaGetter,
@Nullable int[] projectedColumns,
@Nullable Integer limit,
@Nullable Predicate recordBatchFilter) {
@Nullable Predicate recordBatchFilter,
@Nullable Map<Integer, List<String>> variantFieldProjection) {
this.conn = conn;
this.tableInfo = tableInfo;
this.projectedColumns = projectedColumns;
this.limit = limit;
this.schemaGetter = schemaGetter;
this.recordBatchFilter = recordBatchFilter;
this.variantFieldProjection = variantFieldProjection;
}

@Override
public Scan project(@Nullable int[] projectedColumns) {
return new TableScan(
conn, tableInfo, schemaGetter, projectedColumns, limit, recordBatchFilter);
conn,
tableInfo,
schemaGetter,
projectedColumns,
limit,
recordBatchFilter,
variantFieldProjection);
}

@Override
Expand All @@ -102,18 +113,49 @@ public Scan project(List<String> projectedColumnNames) {
columnIndexes[i] = index;
}
return new TableScan(
conn, tableInfo, schemaGetter, columnIndexes, limit, recordBatchFilter);
conn,
tableInfo,
schemaGetter,
columnIndexes,
limit,
recordBatchFilter,
variantFieldProjection);
}

@Override
public Scan limit(int rowNumber) {
return new TableScan(
conn, tableInfo, schemaGetter, projectedColumns, rowNumber, recordBatchFilter);
conn,
tableInfo,
schemaGetter,
projectedColumns,
rowNumber,
recordBatchFilter,
variantFieldProjection);
}

@Override
public Scan filter(@Nullable Predicate predicate) {
return new TableScan(conn, tableInfo, schemaGetter, projectedColumns, limit, predicate);
return new TableScan(
conn,
tableInfo,
schemaGetter,
projectedColumns,
limit,
predicate,
variantFieldProjection);
}

/**
 * Produces a copy of this scan whose variant sub-field projection hints are replaced by the
 * given mapping. The connection, table info, schema getter, projected columns, limit and
 * record batch filter are handed to the new instance unchanged.
 *
 * @param columnToFields mapping from Variant column index to desired field names, or null
 * @return the newly created scan
 */
@Override
public Scan variantFieldProjection(@Nullable Map<Integer, List<String>> columnToFields) {
    final TableScan withVariantHints =
            new TableScan(
                    this.conn,
                    this.tableInfo,
                    this.schemaGetter,
                    this.projectedColumns,
                    this.limit,
                    this.recordBatchFilter,
                    columnToFields);
    return withVariantHints;
}

@Override
Expand Down Expand Up @@ -142,7 +184,8 @@ public LogScanner createLogScanner() {
conn.getOrCreateRemoteFileDownloader(),
projectedColumns,
schemaGetter,
recordBatchFilter);
recordBatchFilter,
variantFieldProjection);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.fluss.rpc.messages.PbFetchLogReqForTable;
import org.apache.fluss.rpc.messages.PbFetchLogRespForBucket;
import org.apache.fluss.rpc.messages.PbFetchLogRespForTable;
import org.apache.fluss.rpc.messages.PbVariantFieldProjection;
import org.apache.fluss.rpc.protocol.ApiError;
import org.apache.fluss.rpc.protocol.Errors;
import org.apache.fluss.rpc.util.PredicateMessageUtils;
Expand Down Expand Up @@ -97,6 +98,12 @@ public class LogFetcher implements Closeable {
@Nullable private final Projection projection;
@Nullable private final org.apache.fluss.rpc.messages.PbPredicate cachedPbPredicate;
private final int filterSchemaId;
/**
* Variant sub-field projection hints. Maps Variant column index to desired sub-field names.
* Null means no sub-field projection (initial version).
*/
@Nullable private final Map<Integer, List<String>> variantFieldProjection;

private final int maxFetchBytes;
private final int maxBucketFetchBytes;
private final int minFetchBytes;
Expand All @@ -120,6 +127,7 @@ public LogFetcher(
TableInfo tableInfo,
@Nullable Projection projection,
@Nullable Predicate recordBatchFilter,
@Nullable Map<Integer, List<String>> variantFieldProjection,
LogScannerStatus logScannerStatus,
Configuration conf,
MetadataUpdater metadataUpdater,
Expand All @@ -139,6 +147,7 @@ public LogFetcher(
recordBatchFilter, tableInfo.getRowType())
: null;
this.filterSchemaId = tableInfo.getSchemaId();
this.variantFieldProjection = variantFieldProjection;
this.logScannerStatus = logScannerStatus;
this.maxFetchBytes =
(int) conf.get(ConfigOptions.CLIENT_SCANNER_LOG_FETCH_MAX_BYTES).getBytes();
Expand Down Expand Up @@ -555,16 +564,27 @@ Map<Integer, FetchLogRequest> prepareFetchLogRequests() {
new PbFetchLogReqForTable().setTableId(finalTableId);
if (readContext.isProjectionPushDowned()) {
assert projection != null;
int[] projectedFields = projection.getProjectionInOrder();
reqForTable
.setProjectionPushdownEnabled(true)
.setProjectedFields(projection.getProjectionInOrder());
.setProjectedFields(projectedFields);
} else {
reqForTable.setProjectionPushdownEnabled(false);
}
if (cachedPbPredicate != null) {
reqForTable.setFilterPredicate(cachedPbPredicate);
reqForTable.setFilterSchemaId(filterSchemaId);
}
// Serialize variant sub-field projection hints if present
if (variantFieldProjection != null && !variantFieldProjection.isEmpty()) {
variantFieldProjection.forEach(
(colIdx, fieldNames) -> {
PbVariantFieldProjection vfp =
reqForTable.addVariantFieldProjection();
vfp.setColumnIndex(colIdx);
vfp.addAllFieldNames(fieldNames);
});
}
reqForTable.addAllBucketsReqs(reqForBuckets);
fetchLogRequest.addAllTablesReqs(Collections.singletonList(reqForTable));
fetchLogRequests.put(leaderId, fetchLogRequest);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@

import javax.annotation.Nullable;

import java.util.List;
import java.util.Map;

/**
* Used to describe the operation to scan log data by {@link LogScanner} to a table.
*
Expand All @@ -32,12 +35,21 @@ public class LogScan {
/** The fields to project. No projection if null. */
@Nullable private final int[] projectedFields;

/**
* Variant sub-field projection hints. Maps Variant column index to the list of sub-field names
* that should be projected. Null means no sub-field projection.
*/
@Nullable private final Map<Integer, List<String>> variantFieldProjection;

public LogScan() {
this(null);
this(null, null);
}

private LogScan(@Nullable int[] projectedFields) {
private LogScan(
@Nullable int[] projectedFields,
@Nullable Map<Integer, List<String>> variantFieldProjection) {
this.projectedFields = projectedFields;
this.variantFieldProjection = variantFieldProjection;
}

/**
Expand All @@ -46,11 +58,26 @@ private LogScan(@Nullable int[] projectedFields) {
* @param projectedFields the projection fields
*/
public LogScan withProjectedFields(int[] projectedFields) {
return new LogScan(projectedFields);
return new LogScan(projectedFields, variantFieldProjection);
}

/**
 * Creates a LogScan description that keeps the current projected fields and uses the given
 * variant sub-field projection hints.
 *
 * @param variantFieldProjection mapping from Variant column index to desired field names
 * @return the new LogScan description
 */
public LogScan withVariantFieldProjection(
        @Nullable Map<Integer, List<String>> variantFieldProjection) {
    final LogScan updated = new LogScan(this.projectedFields, variantFieldProjection);
    return updated;
}

/** Returns the projected field indexes held by this description, or null if none were set. */
@Nullable
public int[] getProjectedFields() {
    return this.projectedFields;
}

/**
 * Returns the variant sub-field projection hints held by this description, or null if none
 * were set.
 */
@Nullable
public Map<Integer, List<String>> getVariantFieldProjection() {
    return this.variantFieldProjection;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
import java.time.Duration;
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

Expand Down Expand Up @@ -83,7 +85,8 @@ public LogScannerImpl(
RemoteFileDownloader remoteFileDownloader,
@Nullable int[] projectedFields,
SchemaGetter schemaGetter,
@Nullable Predicate recordBatchFilter) {
@Nullable Predicate recordBatchFilter,
@Nullable Map<Integer, List<String>> variantFieldProjection) {
this.tablePath = tableInfo.getTablePath();
this.tableId = tableInfo.getTableId();
this.isPartitionedTable = tableInfo.isPartitioned();
Expand All @@ -98,6 +101,7 @@ public LogScannerImpl(
tableInfo,
projection,
recordBatchFilter,
variantFieldProjection,
logScannerStatus,
conf,
metadataUpdater,
Expand All @@ -112,6 +116,8 @@ public LogScannerImpl(
*/
@Nullable
private Projection sanityProjection(@Nullable int[] projectedFields, TableInfo tableInfo) {
// Validate against the user-visible row type (excludes internal shredded columns like $v.x)
// so that projection indices from callers (e.g. Flink) stay within user-visible bounds.
RowType tableRowType = tableInfo.getRowType();
if (projectedFields != null) {
for (int projectedField : projectedFields) {
Expand Down
Loading
Loading