[flink] Fix NPE when Flink reads in batch mode and "scan.startup.mode" is not "FULL". #3055

@gyang94

Description

Search before asking

  • I searched in the issues and found nothing similar.

Fluss version

0.9.0 (latest release)

Please describe the bug 🐞

An NPE is thrown when a Flink job reads a Fluss table in batch mode with the lake enabled and 'scan.startup.mode' set to anything other than the default "FULL" (e.g. 'scan.startup.mode' = 'earliest'/'latest'/'timestamp').

-- Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to generate hybrid lake fluss splits
-- Caused by: java.lang.NullPointerException
	-- at org.apache.fluss.flink.lake.LakeSplitGenerator.generateHybridLakeFlussSplits(LakeSplitGenerator.java:105)
	-- at org.apache.fluss.flink.source.enumerator.FlinkSourceEnumerator.generateHybridLakeFlussSplits(FlinkSourceEnumerator.java:753)

The reason is:

  1. In FlinkTableSource.getScanRuntimeProvider(), the enableLakeSource flag can be true only in FULL mode. In every other mode it stays false.
boolean enableLakeSource = false;
switch (startupOptions.startupMode) {
    case EARLIEST:
        offsetsInitializer = OffsetsInitializer.earliest();
        break;
    case LATEST:
        offsetsInitializer = OffsetsInitializer.latest();
        break;
    case FULL:
        offsetsInitializer = OffsetsInitializer.full();
        enableLakeSource = lakeSource != null;   // can be 'true' only in FULL mode
        break;
    case TIMESTAMP:
        offsetsInitializer = OffsetsInitializer.timestamp(startupOptions.startupTimestampMs);
        break;
}
  2. In the FlinkSource constructor call, the enableLakeSource flag determines the lakeSource passed to FlinkSource. It is null when enableLakeSource is false.
FlinkSource<RowData> source = new FlinkSource<>(..., offsetsInitializer, ...,
        enableLakeSource ? lakeSource : null,  // lakeSource is null when the mode is not FULL
        ...);
  3. FlinkSourceEnumerator.startInBatchMode() checks only lakeEnabled and never checks whether lakeSource is null. Generating the splits then throws an NPE because lakeSource is null.

private void startInBatchMode() {
    if (lakeEnabled) {  // only lakeEnabled is checked; lakeSource may still be null
        context.callAsync(() -> {
            // uses lakeSource, which is null here, so split generation throws an NPE
            List<SourceSplitBase> splits = generateHybridLakeFlussSplits();
            ...
        }, this::handleSplitsAdd);
    } else {
        throw new UnsupportedOperationException(
                String.format(
                        "Batch only supports when table option '%s' is set to true.",
                        ConfigOptions.TABLE_DATALAKE_ENABLED));
    }
}
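
The three steps above can be modelled in a small self-contained sketch. It does not use the real Fluss or Flink classes: BatchStartupSketch, its nested LakeSource, resolveLakeSource() and the error message text are all hypothetical stand-ins. The guard on a null lakeSource is only one possible way to fail fast instead of hitting the NPE. Whether batch reads with a non-FULL startup mode should be rejected like this or supported in some other way is not decided in this issue.

```java
import java.util.Collections;
import java.util.List;

public class BatchStartupSketch {

    enum StartupMode { FULL, EARLIEST, LATEST, TIMESTAMP }

    /** Hypothetical stand-in for the lake source; not the real Fluss type. */
    static final class LakeSource {
        List<String> planSplits() {
            return Collections.singletonList("lake-split-0");
        }
    }

    /** Models steps 1 and 2: the lake source survives only in FULL mode. */
    static LakeSource resolveLakeSource(StartupMode mode, LakeSource lakeSource) {
        boolean enableLakeSource = mode == StartupMode.FULL && lakeSource != null;
        return enableLakeSource ? lakeSource : null;
    }

    /** Models step 3, with an extra (assumed) null check before split generation. */
    static List<String> startInBatchMode(boolean lakeEnabled, LakeSource lakeSource) {
        if (!lakeEnabled) {
            throw new UnsupportedOperationException(
                    "Batch mode requires the datalake table option to be enabled.");
        }
        if (lakeSource == null) {
            // Assumed guard: report a clear error instead of a NullPointerException.
            throw new UnsupportedOperationException(
                    "Batch mode requires a lake source, which is only set in FULL startup mode.");
        }
        return lakeSource.planSplits();
    }

    public static void main(String[] args) {
        LakeSource lake = new LakeSource();

        // FULL mode keeps the lake source, so splits can be generated.
        System.out.println(
                startInBatchMode(true, resolveLakeSource(StartupMode.FULL, lake)));

        // EARLIEST mode drops the lake source; the guard turns the NPE into a clear error.
        try {
            startInBatchMode(true, resolveLakeSource(StartupMode.EARLIEST, lake));
        } catch (UnsupportedOperationException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```

Without the second check, the EARLIEST call would dereference a null lakeSource, which is the failure shown in the stack trace above.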

Solution

No response

Are you willing to submit a PR?

  • I'm willing to submit a PR!
