Skip to content

Commit 80e6505

Browse files
committed
[common] Support variant type
1 parent 9a299db commit 80e6505

81 files changed

Lines changed: 9506 additions & 50 deletions

File tree

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

docs/variant-shredding-design.md

Lines changed: 810 additions & 0 deletions
Large diffs are not rendered by default.

fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -528,9 +528,10 @@ Map<Integer, FetchLogRequest> prepareFetchLogRequests() {
528528
new PbFetchLogReqForTable().setTableId(finalTableId);
529529
if (readContext.isProjectionPushDowned()) {
530530
assert projection != null;
531+
int[] projectedFields = projection.getProjectionInOrder();
531532
reqForTable
532533
.setProjectionPushdownEnabled(true)
533-
.setProjectedFields(projection.getProjectionInOrder());
534+
.setProjectedFields(projectedFields);
534535
} else {
535536
reqForTable.setProjectionPushdownEnabled(false);
536537
}

fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScannerImpl.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,8 @@ public LogScannerImpl(
112112
*/
113113
@Nullable
114114
private Projection sanityProjection(@Nullable int[] projectedFields, TableInfo tableInfo) {
115+
// Validate against the user-visible row type (excludes internal shredded columns like $v.x)
116+
// so that projection indices from callers (e.g. Flink) stay within user-visible bounds.
115117
RowType tableRowType = tableInfo.getRowType();
116118
if (projectedFields != null) {
117119
for (int projectedField : projectedFields) {

fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java

Lines changed: 98 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,13 @@
3333
import org.apache.fluss.metadata.TableInfo;
3434
import org.apache.fluss.metrics.MetricNames;
3535
import org.apache.fluss.record.LogRecordBatchStatisticsCollector;
36+
import org.apache.fluss.row.InternalRow;
3637
import org.apache.fluss.row.arrow.ArrowWriter;
3738
import org.apache.fluss.row.arrow.ArrowWriterPool;
3839
import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BufferAllocator;
3940
import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.RootAllocator;
41+
import org.apache.fluss.types.variant.ShreddingSchema;
42+
import org.apache.fluss.types.variant.ShreddingSchemaInferrer;
4043
import org.apache.fluss.utils.CopyOnWriteMap;
4144
import org.apache.fluss.utils.MathUtils;
4245
import org.apache.fluss.utils.clock.Clock;
@@ -113,6 +116,13 @@ public final class RecordAccumulator {
113116
private final Clock clock;
114117
private final DynamicWriteBatchSizeEstimator batchSizeEstimator;
115118

119+
/**
120+
* Per-table {@link VariantShreddingManager}s. Created lazily on first append to an ARROW_LOG
121+
* table with Variant shredding enabled and at least one Variant column.
122+
*/
123+
private final ConcurrentMap<PhysicalTablePath, VariantShreddingManager> shreddingManagers =
124+
new CopyOnWriteMap<>();
125+
116126
// TODO add retryBackoffMs to retry the produce request upon receiving an error.
117127
// TODO add deliveryTimeoutMs to report success or failure on record delivery.
118128
// TODO add nextBatchExpiryTimeMs
@@ -158,6 +168,71 @@ private void registerMetrics(WriterMetricGroup writerMetricGroup) {
158168
MetricNames.WRITER_BUFFER_WAITING_THREADS, writerBufferPool::queued);
159169
}
160170

171+
/**
172+
* Collects Variant statistics for the row being appended, and — once enough samples have been
173+
* observed — stores the inferred shredding schema locally in the per-table manager.
174+
*
175+
* <p>This method is a no-op when:
176+
*
177+
* <ul>
178+
* <li>the write format is not {@link WriteFormat#ARROW_LOG}
179+
* <li>the table has no Variant columns
180+
* <li>Variant shredding is disabled in the table's configuration
181+
* </ul>
182+
*/
183+
private void maybeCollectVariantStats(
184+
PhysicalTablePath physicalTablePath,
185+
TableInfo tableInfo,
186+
WriteFormat writeFormat,
187+
InternalRow row) {
188+
if (writeFormat != WriteFormat.ARROW_LOG) {
189+
return;
190+
}
191+
if (!tableInfo.isVariantShreddingEnabled()) {
192+
return;
193+
}
194+
int[] variantIndices = tableInfo.getVariantColumnIndices();
195+
if (variantIndices.length == 0) {
196+
return;
197+
}
198+
199+
VariantShreddingManager manager =
200+
shreddingManagers.computeIfAbsent(
201+
physicalTablePath,
202+
path -> {
203+
String[] colNames = new String[variantIndices.length];
204+
for (int i = 0; i < variantIndices.length; i++) {
205+
colNames[i] =
206+
tableInfo
207+
.getRowType()
208+
.getFields()
209+
.get(variantIndices[i])
210+
.getName();
211+
}
212+
ShreddingSchemaInferrer inferrer =
213+
new ShreddingSchemaInferrer()
214+
.setPresenceThreshold(
215+
tableInfo
216+
.getTableConfig()
217+
.getVariantShreddingPresenceThreshold())
218+
.setTypeConsistencyThreshold(
219+
tableInfo
220+
.getTableConfig()
221+
.getVariantShreddingTypeConsistencyThreshold())
222+
.setMaxShreddedFields(
223+
tableInfo
224+
.getTableConfig()
225+
.getVariantShreddingMaxFields())
226+
.setMinSampleSize(
227+
tableInfo
228+
.getTableConfig()
229+
.getVariantShreddingMinSampleSize());
230+
return new VariantShreddingManager(
231+
path.getTablePath(), variantIndices, colNames, inferrer);
232+
});
233+
manager.collectRow(row);
234+
}
235+
161236
/**
162237
* Add a record to the accumulator, return to append result.
163238
*
@@ -195,6 +270,12 @@ public RecordAppendResult append(
195270
synchronized (dq) {
196271
RecordAppendResult appendResult = tryAppend(writeRecord, callback, dq);
197272
if (appendResult != null) {
273+
// Row was appended to an existing batch; collect Variant statistics.
274+
maybeCollectVariantStats(
275+
physicalTablePath,
276+
tableInfo,
277+
writeRecord.getWriteFormat(),
278+
writeRecord.getRow());
198279
return appendResult;
199280
}
200281
}
@@ -212,6 +293,12 @@ public RecordAppendResult append(
212293
writeRecord, callback, bucketId, tableInfo, dq, memorySegments);
213294
if (appendResult.newBatchCreated) {
214295
memorySegments = Collections.emptyList();
296+
// Row was appended to the new batch; collect Variant statistics.
297+
maybeCollectVariantStats(
298+
physicalTablePath,
299+
tableInfo,
300+
writeRecord.getWriteFormat(),
301+
writeRecord.getRow());
215302
}
216303
return appendResult;
217304
}
@@ -622,13 +709,23 @@ private WriteBatch createWriteBatch(
622709
clock.milliseconds());
623710

624711
case ARROW_LOG:
712+
// Get shredding schemas from the local manager (Writer-independent decision)
713+
Map<String, ShreddingSchema> shreddingSchemas = null;
714+
VariantShreddingManager mgr = shreddingManagers.get(physicalTablePath);
715+
if (mgr != null) {
716+
Map<String, ShreddingSchema> inferred = mgr.getShreddingSchemas();
717+
if (!inferred.isEmpty()) {
718+
shreddingSchemas = inferred;
719+
}
720+
}
625721
ArrowWriter arrowWriter =
626722
arrowWriterPool.getOrCreateWriter(
627723
tableInfo.getTableId(),
628724
schemaId,
629725
outputView.getPreAllocatedSize(),
630726
tableInfo.getRowType(),
631-
tableInfo.getTableConfig().getArrowCompressionInfo());
727+
tableInfo.getTableConfig().getArrowCompressionInfo(),
728+
shreddingSchemas);
632729
LogRecordBatchStatisticsCollector statisticsCollector = null;
633730
if (tableInfo.isStatisticsEnabled()) {
634731
statisticsCollector =
Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.client.write;
19+
20+
import org.apache.fluss.annotation.Internal;
21+
import org.apache.fluss.metadata.TablePath;
22+
import org.apache.fluss.row.InternalRow;
23+
import org.apache.fluss.types.variant.ShreddingSchema;
24+
import org.apache.fluss.types.variant.ShreddingSchemaInferrer;
25+
import org.apache.fluss.types.variant.Variant;
26+
import org.apache.fluss.types.variant.VariantStatisticsCollector;
27+
28+
import org.slf4j.Logger;
29+
import org.slf4j.LoggerFactory;
30+
31+
import java.util.Collections;
32+
import java.util.HashMap;
33+
import java.util.Map;
34+
35+
/**
36+
* Manages automatic Variant shredding inference for a single table on the client write path.
37+
*
38+
* <p>Uses the Writer-independent-decision approach (Plan A): each Writer independently samples,
39+
* infers, and decides the typed_value layout for its own batches. No server-side coordination or
40+
* RPC is involved.
41+
*
42+
* <p>Each time a row is appended to a write batch ({@link #collectRow(InternalRow)}), this manager
43+
* extracts the Variant values from the row's Variant-typed columns and feeds them into per-column
44+
* {@link VariantStatisticsCollector}s. Once the minimum sample threshold is met and a non-empty
45+
* {@link ShreddingSchema} is inferred, the result is stored locally and made available via {@link
46+
* #getShreddingSchemas()}.
47+
*
48+
* <p>Inference is triggered <em>at most once</em> per manager instance. Once schemas are inferred,
49+
* subsequent calls to {@link #collectRow} are no-ops.
50+
*
51+
* <p>Thread safety: {@link #collectRow} is called from the writer thread and is guarded by the
52+
* deque lock in {@link RecordAccumulator}.
53+
*/
54+
@Internal
55+
public class VariantShreddingManager {
56+
57+
private static final Logger LOG = LoggerFactory.getLogger(VariantShreddingManager.class);
58+
59+
private final TablePath tablePath;
60+
61+
/**
62+
* Column indices (into the row's schema) of all Variant-typed columns. Each index maps to the
63+
* corresponding {@link VariantStatisticsCollector} in {@link #collectors} at the same array
64+
* position.
65+
*/
66+
private final int[] variantColumnIndices;
67+
68+
/**
69+
* Names of the Variant columns, used to construct the column-name-based {@link
70+
* ShreddingSchema}.
71+
*/
72+
private final String[] variantColumnNames;
73+
74+
/** One statistics collector per Variant column. */
75+
private final VariantStatisticsCollector[] collectors;
76+
77+
/** Inferrer, configured from the table's shredding options. */
78+
private final ShreddingSchemaInferrer inferrer;
79+
80+
/** Whether inference has already been completed. */
81+
private volatile boolean inferenceCompleted = false;
82+
83+
/**
84+
* Locally inferred shredding schemas. Key is the variant column name. Empty until inference
85+
* completes with non-empty results.
86+
*/
87+
private volatile Map<String, ShreddingSchema> inferredSchemas = Collections.emptyMap();
88+
89+
public VariantShreddingManager(
90+
TablePath tablePath,
91+
int[] variantColumnIndices,
92+
String[] variantColumnNames,
93+
ShreddingSchemaInferrer inferrer) {
94+
this.tablePath = tablePath;
95+
this.variantColumnIndices = variantColumnIndices;
96+
this.variantColumnNames = variantColumnNames;
97+
this.inferrer = inferrer;
98+
99+
this.collectors = new VariantStatisticsCollector[variantColumnIndices.length];
100+
for (int i = 0; i < variantColumnIndices.length; i++) {
101+
this.collectors[i] = new VariantStatisticsCollector();
102+
}
103+
}
104+
105+
/**
106+
* Collects statistics from one row that is about to be (or has just been) written.
107+
*
108+
* <p>This method extracts the Variant value at each variant-column index from {@code row} and
109+
* feeds it into the corresponding {@link VariantStatisticsCollector}. Once enough samples are
110+
* collected and the inferrer produces a non-empty schema, the result is stored locally.
111+
*
112+
* @param row the row being written
113+
*/
114+
public void collectRow(InternalRow row) {
115+
if (inferenceCompleted) {
116+
return;
117+
}
118+
119+
for (int c = 0; c < variantColumnIndices.length; c++) {
120+
int colIdx = variantColumnIndices[c];
121+
Variant variant = row.isNullAt(colIdx) ? null : row.getVariant(colIdx);
122+
collectors[c].collect(variant);
123+
}
124+
125+
maybeInfer();
126+
}
127+
128+
/**
129+
* Returns the locally inferred shredding schemas. Returns an empty map if inference has not yet
130+
* completed or produced no results.
131+
*
132+
* @return unmodifiable map of variant column name to its ShreddingSchema
133+
*/
134+
public Map<String, ShreddingSchema> getShreddingSchemas() {
135+
return inferredSchemas;
136+
}
137+
138+
// --------------------------------------------------------------------------------------------
139+
// Internal helpers
140+
// --------------------------------------------------------------------------------------------
141+
142+
private void maybeInfer() {
143+
Map<String, ShreddingSchema> schemas = new HashMap<>();
144+
145+
for (int c = 0; c < variantColumnIndices.length; c++) {
146+
VariantStatisticsCollector collector = collectors[c];
147+
long totalRecords = collector.getTotalRecords();
148+
149+
// Skip inference until we have enough samples to be statistically meaningful.
150+
if (totalRecords < inferrer.getMinSampleSize()) {
151+
return;
152+
}
153+
154+
ShreddingSchema schema =
155+
inferrer.infer(variantColumnNames[c], collector.getStatistics(), totalRecords);
156+
if (!schema.getFields().isEmpty()) {
157+
schemas.put(variantColumnNames[c], schema);
158+
}
159+
}
160+
161+
if (!schemas.isEmpty()) {
162+
inferredSchemas = Collections.unmodifiableMap(schemas);
163+
inferenceCompleted = true;
164+
LOG.info(
165+
"Inferred Variant shredding schemas for table {} (Writer-local): {}",
166+
tablePath,
167+
inferredSchemas);
168+
}
169+
}
170+
}

0 commit comments

Comments
 (0)