Search before asking
Fluss version
0.9.0 (latest release)
Please describe the bug 🐞
There is an NPE when a Flink job reads a Fluss table in batch mode with the lake enabled and 'scan.startup.mode' set to anything other than the default 'full' (e.g. 'earliest', 'latest' or 'timestamp').
```
-- Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to generate hybrid lake fluss splits
-- Caused by: java.lang.NullPointerException
-- at org.apache.fluss.flink.lake.LakeSplitGenerator.generateHybridLakeFlussSplits(LakeSplitGenerator.java:105)
-- at org.apache.fluss.flink.source.enumerator.FlinkSourceEnumerator.generateHybridLakeFlussSplits(FlinkSourceEnumerator.java:753)
```
The reason is:
- In `FlinkTableSource.getScanRuntimeProvider()`, the `enableLakeSource` flag can be `true` only in FULL mode; for every other startup mode it stays `false`.
```java
boolean enableLakeSource = false;
switch (startupOptions.startupMode) {
    case EARLIEST:
        offsetsInitializer = OffsetsInitializer.earliest();
        break;
    case LATEST:
        offsetsInitializer = OffsetsInitializer.latest();
        break;
    case FULL:
        offsetsInitializer = OffsetsInitializer.full();
        enableLakeSource = lakeSource != null; // can be true only in FULL mode
        break;
    case TIMESTAMP:
        offsetsInitializer = OffsetsInitializer.timestamp(startupOptions.startupTimestampMs);
        break;
}
```
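To make the flag logic easy to check in isolation, here is a stripped-down model of the switch above. `LakeFlagModel`, its nested `StartupMode` enum and the `enableLakeSource(mode, hasLakeSource)` helper are hypothetical stand-ins, not Fluss classes; the model only captures the fact that FULL is the single mode able to turn the flag on.

```java
final class LakeFlagModel {
    // Hypothetical stand-in for the connector's startup modes (not the Fluss enum).
    enum StartupMode { FULL, EARLIEST, LATEST, TIMESTAMP }

    private LakeFlagModel() {}

    // Mirrors the switch in getScanRuntimeProvider(): only FULL can enable the
    // lake source, and only when a lake source actually exists.
    static boolean enableLakeSource(StartupMode mode, boolean hasLakeSource) {
        switch (mode) {
            case FULL:
                return hasLakeSource;
            case EARLIEST:
            case LATEST:
            case TIMESTAMP:
            default:
                return false;
        }
    }

    public static void main(String[] args) {
        // Even with a lake source available, every non-FULL mode yields false.
        for (StartupMode mode : StartupMode.values()) {
            System.out.println(mode + " -> enableLakeSource=" + enableLakeSource(mode, true));
        }
    }
}
```

Running `main` prints `true` only for FULL, which is exactly the situation in which the lake source later reaches `FlinkSource`.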
- In the `FlinkSource` constructor call, the `enableLakeSource` flag decides which `lakeSource` is passed to `FlinkSource`: it is `null` whenever `enableLakeSource` is `false`.
```java
FlinkSource<RowData> source = new FlinkSource<>(..., offsetsInitializer, ...,
        enableLakeSource ? lakeSource : null, // lakeSource is null unless the mode is FULL
        ...);
```
- `FlinkSourceEnumerator.startInBatchMode()` checks only `lakeEnabled` and never checks whether `lakeSource` is null. When it generates the splits, an NPE is thrown because `lakeSource` is null:
```java
private void startInBatchMode() {
    if (lakeEnabled) { // only lakeEnabled is checked
        context.callAsync(() -> {
            // uses lakeSource, which is null here, so generating the splits throws an NPE
            List<SourceSplitBase> splits = generateHybridLakeFlussSplits();
            ...
        }, this::handleSplitsAdd);
    } else {
        throw new UnsupportedOperationException(
                String.format(
                        "Batch only supports when table option '%s' is set to true.",
                        ConfigOptions.TABLE_DATALAKE_ENABLED));
    }
}
```
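The failure can be reproduced without Flink by modelling the enumerator's batch start. In this sketch `BatchStartModel`, its `LakeSource` interface, `planSplits()`, `startUnguarded()` and `startGuarded()` are all hypothetical names, not Fluss APIs. `startUnguarded()` mirrors the current code (only `lakeEnabled` is checked); `startGuarded()` shows one possible guard that fails fast with a descriptive error instead of an NPE. Whether the real fix should be such a guard, or passing the lake source in batch mode regardless of startup mode, is left open here.

```java
import java.util.List;

final class BatchStartModel {
    // Hypothetical stand-in for the lake source: it only knows how to plan splits.
    interface LakeSource {
        List<String> planSplits();
    }

    private final boolean lakeEnabled;   // models the table's datalake option
    private final LakeSource lakeSource; // null unless the startup mode is FULL

    BatchStartModel(boolean lakeEnabled, LakeSource lakeSource) {
        this.lakeEnabled = lakeEnabled;
        this.lakeSource = lakeSource;
    }

    // Current behaviour: only lakeEnabled is checked, so a null lakeSource
    // surfaces as a NullPointerException while planning splits.
    List<String> startUnguarded() {
        if (!lakeEnabled) {
            throw new UnsupportedOperationException("Batch requires the datalake option to be true.");
        }
        return lakeSource.planSplits();
    }

    // One possible guard: reject the missing lake source with a clear message.
    List<String> startGuarded() {
        if (!lakeEnabled) {
            throw new UnsupportedOperationException("Batch requires the datalake option to be true.");
        }
        if (lakeSource == null) {
            throw new IllegalStateException(
                    "Batch mode needs the lake source, which is only created for startup mode FULL.");
        }
        return lakeSource.planSplits();
    }

    public static void main(String[] args) {
        BatchStartModel nonFull = new BatchStartModel(true, null);
        try {
            nonFull.startUnguarded();
        } catch (NullPointerException e) {
            System.out.println("unguarded: NullPointerException");
        }
        try {
            nonFull.startGuarded();
        } catch (IllegalStateException e) {
            System.out.println("guarded: " + e.getMessage());
        }
        BatchStartModel full = new BatchStartModel(true, () -> List.of("lake-split-0"));
        System.out.println("full: " + full.startGuarded());
    }
}
```

A fail-fast guard keeps the behaviour of FULL mode unchanged while turning the opaque NPE into an error that names the actual cause.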
Solution
No response
Are you willing to submit a PR?