Skip to content

Commit e911a1d

Browse files
committed
[common] Support variant type
1 parent 9a299db commit e911a1d

90 files changed

Lines changed: 10257 additions & 61 deletions

File tree

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

docs/variant-shredding-design.md

Lines changed: 878 additions & 0 deletions
Large diffs are not rendered by default.

fluss-client/src/main/java/org/apache/fluss/client/table/scanner/Scan.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727

2828
import java.io.IOException;
2929
import java.util.List;
30+
import java.util.Map;
3031

3132
/**
3233
* Used to configure and create a scanner to scan data for a table.
@@ -60,6 +61,19 @@ public interface Scan {
6061
*/
6162
Scan limit(int rowNumber);
6263

64+
/**
65+
* Returns a new scan from this with variant sub-field projection hints. When a Variant column
66+
* has been shredded, specifying which sub-fields are needed allows the server to send only the
67+
* relevant typed_value children, reducing network transfer.
68+
*
69+
* <p>The map key is the column index (in the projected or full schema), and the value is the
70+
* list of field names to project within that Variant column.
71+
*
72+
* @param columnToFields mapping from Variant column index to desired field names, or null to
73+
* disable sub-field projection
74+
*/
75+
Scan variantFieldProjection(@Nullable Map<Integer, List<String>> columnToFields);
76+
6377
/**
6478
* Creates a {@link LogScanner} to continuously read log data for this scan.
6579
*

fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TableScan.java

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040

4141
import java.io.IOException;
4242
import java.util.List;
43+
import java.util.Map;
4344
import java.util.stream.Collectors;
4445
import java.util.stream.IntStream;
4546

@@ -53,27 +54,32 @@ public class TableScan implements Scan {
5354
@Nullable private final int[] projectedColumns;
5455
/** The maximum number of rows to read. No limit if null. */
5556
@Nullable private final Integer limit;
57+
/** Variant sub-field projection hints. Null means no sub-field projection. */
58+
@Nullable private final Map<Integer, List<String>> variantFieldProjection;
5659

5760
public TableScan(FlussConnection conn, TableInfo tableInfo, SchemaGetter schemaGetter) {
58-
this(conn, tableInfo, schemaGetter, null, null);
61+
this(conn, tableInfo, schemaGetter, null, null, null);
5962
}
6063

6164
private TableScan(
6265
FlussConnection conn,
6366
TableInfo tableInfo,
6467
SchemaGetter schemaGetter,
6568
@Nullable int[] projectedColumns,
66-
@Nullable Integer limit) {
69+
@Nullable Integer limit,
70+
@Nullable Map<Integer, List<String>> variantFieldProjection) {
6771
this.conn = conn;
6872
this.tableInfo = tableInfo;
6973
this.projectedColumns = projectedColumns;
7074
this.limit = limit;
7175
this.schemaGetter = schemaGetter;
76+
this.variantFieldProjection = variantFieldProjection;
7277
}
7378

7479
@Override
7580
public Scan project(@Nullable int[] projectedColumns) {
76-
return new TableScan(conn, tableInfo, schemaGetter, projectedColumns, limit);
81+
return new TableScan(
82+
conn, tableInfo, schemaGetter, projectedColumns, limit, variantFieldProjection);
7783
}
7884

7985
@Override
@@ -92,12 +98,20 @@ public Scan project(List<String> projectedColumnNames) {
9298
}
9399
columnIndexes[i] = index;
94100
}
95-
return new TableScan(conn, tableInfo, schemaGetter, columnIndexes, limit);
101+
return new TableScan(
102+
conn, tableInfo, schemaGetter, columnIndexes, limit, variantFieldProjection);
96103
}
97104

98105
@Override
99106
public Scan limit(int rowNumber) {
100-
return new TableScan(conn, tableInfo, schemaGetter, projectedColumns, rowNumber);
107+
return new TableScan(
108+
conn, tableInfo, schemaGetter, projectedColumns, rowNumber, variantFieldProjection);
109+
}
110+
111+
@Override
112+
public Scan variantFieldProjection(@Nullable Map<Integer, List<String>> columnToFields) {
113+
return new TableScan(
114+
conn, tableInfo, schemaGetter, projectedColumns, limit, columnToFields);
101115
}
102116

103117
@Override
@@ -116,6 +130,7 @@ public LogScanner createLogScanner() {
116130
conn.getClientMetricGroup(),
117131
conn.getOrCreateRemoteFileDownloader(),
118132
projectedColumns,
133+
variantFieldProjection,
119134
schemaGetter);
120135
}
121136

fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import org.apache.fluss.rpc.messages.PbFetchLogReqForTable;
5151
import org.apache.fluss.rpc.messages.PbFetchLogRespForBucket;
5252
import org.apache.fluss.rpc.messages.PbFetchLogRespForTable;
53+
import org.apache.fluss.rpc.messages.PbVariantFieldProjection;
5354
import org.apache.fluss.rpc.protocol.ApiError;
5455
import org.apache.fluss.rpc.protocol.Errors;
5556
import org.apache.fluss.utils.IOUtils;
@@ -93,6 +94,12 @@ public class LogFetcher implements Closeable {
9394
// bytes from remote file.
9495
private final LogRecordReadContext remoteReadContext;
9596
@Nullable private final Projection projection;
97+
/**
98+
* Variant sub-field projection hints. Maps Variant column index to desired sub-field names.
99+
* Null means no sub-field projection (initial version).
100+
*/
101+
@Nullable private final Map<Integer, List<String>> variantFieldProjection;
102+
96103
private final int maxFetchBytes;
97104
private final int maxBucketFetchBytes;
98105
private final int minFetchBytes;
@@ -121,13 +128,36 @@ public LogFetcher(
121128
ScannerMetricGroup scannerMetricGroup,
122129
RemoteFileDownloader remoteFileDownloader,
123130
SchemaGetter schemaGetter) {
131+
this(
132+
tableInfo,
133+
projection,
134+
null,
135+
logScannerStatus,
136+
conf,
137+
metadataUpdater,
138+
scannerMetricGroup,
139+
remoteFileDownloader,
140+
schemaGetter);
141+
}
142+
143+
public LogFetcher(
144+
TableInfo tableInfo,
145+
@Nullable Projection projection,
146+
@Nullable Map<Integer, List<String>> variantFieldProjection,
147+
LogScannerStatus logScannerStatus,
148+
Configuration conf,
149+
MetadataUpdater metadataUpdater,
150+
ScannerMetricGroup scannerMetricGroup,
151+
RemoteFileDownloader remoteFileDownloader,
152+
SchemaGetter schemaGetter) {
124153
this.tablePath = tableInfo.getTablePath();
125154
this.isPartitioned = tableInfo.isPartitioned();
126155
this.readContext =
127156
LogRecordReadContext.createReadContext(tableInfo, false, projection, schemaGetter);
128157
this.remoteReadContext =
129158
LogRecordReadContext.createReadContext(tableInfo, true, projection, schemaGetter);
130159
this.projection = projection;
160+
this.variantFieldProjection = variantFieldProjection;
131161
this.logScannerStatus = logScannerStatus;
132162
this.maxFetchBytes =
133163
(int) conf.get(ConfigOptions.CLIENT_SCANNER_LOG_FETCH_MAX_BYTES).getBytes();
@@ -528,12 +558,23 @@ Map<Integer, FetchLogRequest> prepareFetchLogRequests() {
528558
new PbFetchLogReqForTable().setTableId(finalTableId);
529559
if (readContext.isProjectionPushDowned()) {
530560
assert projection != null;
561+
int[] projectedFields = projection.getProjectionInOrder();
531562
reqForTable
532563
.setProjectionPushdownEnabled(true)
533-
.setProjectedFields(projection.getProjectionInOrder());
564+
.setProjectedFields(projectedFields);
534565
} else {
535566
reqForTable.setProjectionPushdownEnabled(false);
536567
}
568+
// Serialize variant sub-field projection hints if present
569+
if (variantFieldProjection != null && !variantFieldProjection.isEmpty()) {
570+
variantFieldProjection.forEach(
571+
(colIdx, fieldNames) -> {
572+
PbVariantFieldProjection vfp =
573+
reqForTable.addVariantFieldProjection();
574+
vfp.setColumnIndex(colIdx);
575+
vfp.addAllFieldNames(fieldNames);
576+
});
577+
}
537578
reqForTable.addAllBucketsReqs(reqForBuckets);
538579
fetchLogRequest.addAllTablesReqs(Collections.singletonList(reqForTable));
539580
fetchLogRequests.put(leaderId, fetchLogRequest);

fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScan.java

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@
2121

2222
import javax.annotation.Nullable;
2323

24+
import java.util.List;
25+
import java.util.Map;
26+
2427
/**
2528
* Used to describe the operation to scan log data by {@link LogScanner} to a table.
2629
*
@@ -32,12 +35,21 @@ public class LogScan {
3235
/** The fields to project. No projection if null. */
3336
@Nullable private final int[] projectedFields;
3437

38+
/**
39+
* Variant sub-field projection hints. Maps Variant column index to the list of sub-field names
40+
* that should be projected. Null means no sub-field projection.
41+
*/
42+
@Nullable private final Map<Integer, List<String>> variantFieldProjection;
43+
3544
public LogScan() {
36-
this(null);
45+
this(null, null);
3746
}
3847

39-
private LogScan(@Nullable int[] projectedFields) {
48+
private LogScan(
49+
@Nullable int[] projectedFields,
50+
@Nullable Map<Integer, List<String>> variantFieldProjection) {
4051
this.projectedFields = projectedFields;
52+
this.variantFieldProjection = variantFieldProjection;
4153
}
4254

4355
/**
@@ -46,11 +58,26 @@ private LogScan(@Nullable int[] projectedFields) {
4658
* @param projectedFields the projection fields
4759
*/
4860
public LogScan withProjectedFields(int[] projectedFields) {
49-
return new LogScan(projectedFields);
61+
return new LogScan(projectedFields, variantFieldProjection);
62+
}
63+
64+
/**
65+
* Returns a new instance of LogScan description with variant sub-field projection hints.
66+
*
67+
* @param variantFieldProjection mapping from Variant column index to desired field names
68+
*/
69+
public LogScan withVariantFieldProjection(
70+
@Nullable Map<Integer, List<String>> variantFieldProjection) {
71+
return new LogScan(projectedFields, variantFieldProjection);
5072
}
5173

5274
@Nullable
5375
public int[] getProjectedFields() {
5476
return projectedFields;
5577
}
78+
79+
@Nullable
80+
public Map<Integer, List<String>> getVariantFieldProjection() {
81+
return variantFieldProjection;
82+
}
5683
}

fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScannerImpl.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,26 @@ public LogScannerImpl(
8585
RemoteFileDownloader remoteFileDownloader,
8686
@Nullable int[] projectedFields,
8787
SchemaGetter schemaGetter) {
88+
this(
89+
conf,
90+
tableInfo,
91+
metadataUpdater,
92+
clientMetricGroup,
93+
remoteFileDownloader,
94+
projectedFields,
95+
null,
96+
schemaGetter);
97+
}
98+
99+
public LogScannerImpl(
100+
Configuration conf,
101+
TableInfo tableInfo,
102+
MetadataUpdater metadataUpdater,
103+
ClientMetricGroup clientMetricGroup,
104+
RemoteFileDownloader remoteFileDownloader,
105+
@Nullable int[] projectedFields,
106+
@Nullable Map<Integer, List<String>> variantFieldProjection,
107+
SchemaGetter schemaGetter) {
88108
this.tablePath = tableInfo.getTablePath();
89109
this.tableId = tableInfo.getTableId();
90110
this.isPartitionedTable = tableInfo.isPartitioned();
@@ -98,6 +118,7 @@ public LogScannerImpl(
98118
new LogFetcher(
99119
tableInfo,
100120
projection,
121+
variantFieldProjection,
101122
logScannerStatus,
102123
conf,
103124
metadataUpdater,
@@ -112,6 +133,8 @@ public LogScannerImpl(
112133
*/
113134
@Nullable
114135
private Projection sanityProjection(@Nullable int[] projectedFields, TableInfo tableInfo) {
136+
// Validate against the user-visible row type (excludes internal shredded columns like $v.x)
137+
// so that projection indices from callers (e.g. Flink) stay within user-visible bounds.
115138
RowType tableRowType = tableInfo.getRowType();
116139
if (projectedFields != null) {
117140
for (int projectedField : projectedFields) {

0 commit comments

Comments
 (0)