Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -202,8 +202,10 @@ public StreamPhysicalRel visit(
} else if (rel instanceof StreamPhysicalMLPredictTableFunction) {
return visitMLPredictTableFunction(
(StreamPhysicalMLPredictTableFunction) rel, requireDeterminism);
} else if (rel instanceof StreamPhysicalChangelogNormalize
|| rel instanceof StreamPhysicalDropUpdateBefore
} else if (rel instanceof StreamPhysicalChangelogNormalize) {
return visitChangelogNormalize(
(StreamPhysicalChangelogNormalize) rel, requireDeterminism);
} else if (rel instanceof StreamPhysicalDropUpdateBefore
|| rel instanceof StreamPhysicalMiniBatchAssigner
|| rel instanceof StreamPhysicalUnion
|| rel instanceof StreamPhysicalSort
Expand Down Expand Up @@ -342,6 +344,21 @@ private StreamPhysicalRel visitMLPredictTableFunction(
return transmitDeterminismRequirement(predictTableFunction, NO_REQUIRED_DETERMINISM);
}

/**
 * Validates a ChangelogNormalize node whose filter condition was pushed down by the
 * optimizer. A non-deterministic predicate (e.g. one calling NOW()) may evaluate
 * differently on the -U and +U rows of the same update, asymmetrically dropping one of
 * them and corrupting the retract contract relied upon by downstream operators, so such
 * conditions are rejected here before the requirement is transmitted to the input.
 */
private StreamPhysicalRel visitChangelogNormalize(
        final StreamPhysicalChangelogNormalize changelogNormalize,
        final ImmutableBitSet requireDeterminism) {
    // The pushed-down predicate is optional; only validate it when present.
    final RexNode pushedFilter = changelogNormalize.filterCondition();
    if (pushedFilter != null) {
        checkNonDeterministicCondition(pushedFilter, changelogNormalize);
    }
    return transmitDeterminismRequirement(changelogNormalize, requireDeterminism);
}

private StreamPhysicalRel visitCorrelate(
final StreamPhysicalCorrelateBase correlate, final ImmutableBitSet requireDeterminism) {
if (inputInsertOnly(correlate) || requireDeterminism.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@

import scala.Enumeration;

import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static scala.runtime.BoxedUnit.UNIT;
Expand Down Expand Up @@ -424,4 +425,86 @@ void testOverAggregateWithNonDeterminismInProjection() {

assertEquals(expectedOverAggNduErrorMsg, tableException.getMessage());
}

@Test
void testNowFilterPushedIntoChangelogNormalizeOnUpsertSource() {
    // Enable TRY_RESOLVE so the planner actively rejects non-deterministic updates.
    tEnv.getConfig()
            .set(
                    OptimizerConfigOptions.TABLE_OPTIMIZER_NONDETERMINISTIC_UPDATE_STRATEGY,
                    NonDeterministicUpdateStrategy.TRY_RESOLVE);

    // Upsert source: the NOW() filter below will be pushed into ChangelogNormalize.
    final String ddl =
            String.join(
                    "\n",
                    "CREATE TEMPORARY TABLE upsert_src (",
                    " a INT,",
                    " b BIGINT,",
                    " c STRING,",
                    " ts TIMESTAMP(3),",
                    " PRIMARY KEY (a) NOT ENFORCED",
                    ") WITH (",
                    " 'connector' = 'values',",
                    " 'changelog-mode' = 'I,UA,D'",
                    ")");
    tEnv.executeSql(ddl);

    final String sql =
            "INSERT INTO sink_with_pk SELECT a, b, c FROM upsert_src WHERE ts >= NOW() - INTERVAL '90' DAY";

    // The non-deterministic NOW() predicate must be rejected at plan time.
    assertThatThrownBy(() -> util.verifyJsonPlan(sql))
            .isInstanceOf(TableException.class)
            .hasMessageContaining("NOW")
            .hasMessageContaining("non deterministic function");
}

@Test
void testCurrentTimestampFilterPushedIntoChangelogNormalizeOnUpsertSource() {
    // Enable TRY_RESOLVE so the planner actively rejects non-deterministic updates.
    tEnv.getConfig()
            .set(
                    OptimizerConfigOptions.TABLE_OPTIMIZER_NONDETERMINISTIC_UPDATE_STRATEGY,
                    NonDeterministicUpdateStrategy.TRY_RESOLVE);

    // Upsert source: the CURRENT_TIMESTAMP filter will be pushed into ChangelogNormalize.
    final String ddl =
            String.join(
                    "\n",
                    "CREATE TEMPORARY TABLE upsert_src2 (",
                    " a INT,",
                    " b BIGINT,",
                    " c STRING,",
                    " ts TIMESTAMP(3),",
                    " PRIMARY KEY (a) NOT ENFORCED",
                    ") WITH (",
                    " 'connector' = 'values',",
                    " 'changelog-mode' = 'I,UA,D'",
                    ")");
    tEnv.executeSql(ddl);

    final String sql =
            "INSERT INTO sink_with_pk SELECT a, b, c FROM upsert_src2 WHERE ts >= CURRENT_TIMESTAMP - INTERVAL '90' DAY";

    // The non-deterministic CURRENT_TIMESTAMP predicate must be rejected at plan time.
    assertThatThrownBy(() -> util.verifyJsonPlan(sql))
            .isInstanceOf(TableException.class)
            .hasMessageContaining("CURRENT_TIMESTAMP")
            .hasMessageContaining("non deterministic function");
}

@Test
void testDeterministicFilterOnUpsertSourceIsAllowed() {
    // Enable TRY_RESOLVE: even under the strict strategy, a deterministic filter
    // pushed into ChangelogNormalize must still be accepted.
    tEnv.getConfig()
            .set(
                    OptimizerConfigOptions.TABLE_OPTIMIZER_NONDETERMINISTIC_UPDATE_STRATEGY,
                    NonDeterministicUpdateStrategy.TRY_RESOLVE);

    final String ddl =
            String.join(
                    "\n",
                    "CREATE TEMPORARY TABLE upsert_src3 (",
                    " a INT,",
                    " b BIGINT,",
                    " c STRING,",
                    " PRIMARY KEY (a) NOT ENFORCED",
                    ") WITH (",
                    " 'connector' = 'values',",
                    " 'changelog-mode' = 'I,UA,D'",
                    ")");
    tEnv.executeSql(ddl);

    // Deterministic filter should compile without error
    util.verifyJsonPlan(
            "INSERT INTO sink_with_pk SELECT a, b, c FROM upsert_src3 WHERE b > 100");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
{
"flinkVersion" : "",
"nodes" : [ {
"id" : 1,
"type" : "stream-exec-table-source-scan_2",
"scanTableSource" : {
"table" : {
"identifier" : "`default_catalog`.`default_database`.`upsert_src3`"
},
"abilities" : [ {
"type" : "FilterPushDown",
"predicates" : [ ]
} ]
},
"outputType" : "ROW<`a` INT NOT NULL, `b` BIGINT, `c` VARCHAR(2147483647)>",
"description" : "TableSourceScan(table=[[default_catalog, default_database, upsert_src3, filter=[]]], fields=[a, b, c])"
}, {
"id" : 2,
"type" : "stream-exec-exchange_1",
"inputProperties" : [ {
"requiredDistribution" : {
"type" : "HASH",
"keys" : [ 0 ]
},
"damBehavior" : "PIPELINED",
"priority" : 0
} ],
"outputType" : "ROW<`a` INT NOT NULL, `b` BIGINT, `c` VARCHAR(2147483647)>",
"description" : "Exchange(distribution=[hash[a]])"
}, {
"id" : 3,
"type" : "stream-exec-changelog-normalize_1",
"configuration" : {
"table.exec.mini-batch.enabled" : "false",
"table.exec.mini-batch.size" : "-1"
},
"uniqueKeys" : [ 0 ],
"generateUpdateBefore" : false,
"state" : [ {
"index" : 0,
"ttl" : "0 ms",
"name" : "changelogNormalizeState"
} ],
"inputProperties" : [ {
"requiredDistribution" : {
"type" : "UNKNOWN"
},
"damBehavior" : "PIPELINED",
"priority" : 0
} ],
"outputType" : "ROW<`a` INT NOT NULL, `b` BIGINT, `c` VARCHAR(2147483647)>",
"description" : "ChangelogNormalize(key=[a], condition=[(b > 100)])",
"filterCondition" : {
"kind" : "CALL",
"syntax" : "BINARY",
"internalName" : "$>$1",
"operands" : [ {
"kind" : "INPUT_REF",
"inputIndex" : 1,
"type" : "BIGINT"
}, {
"kind" : "LITERAL",
"value" : 100,
"type" : "INT NOT NULL"
} ],
"type" : "BOOLEAN"
}
}, {
"id" : 4,
"type" : "stream-exec-sink_2",
"configuration" : {
"table.exec.sink.keyed-shuffle" : "AUTO",
"table.exec.sink.not-null-enforcer" : "ERROR",
"table.exec.sink.rowtime-inserter" : "ENABLED",
"table.exec.sink.type-length-enforcer" : "IGNORE",
"table.exec.sink.upsert-materialize" : "AUTO"
},
"dynamicTableSink" : {
"table" : {
"identifier" : "`default_catalog`.`default_database`.`sink_with_pk`"
}
},
"inputChangelogMode" : [ "INSERT", "UPDATE_AFTER", "DELETE" ],
"inputUpsertKey" : [ 0 ],
"inputProperties" : [ {
"requiredDistribution" : {
"type" : "UNKNOWN"
},
"damBehavior" : "PIPELINED",
"priority" : 0
} ],
"outputType" : "ROW<`a` INT NOT NULL, `b` BIGINT, `c` VARCHAR(2147483647)>",
"description" : "Sink(table=[default_catalog.default_database.sink_with_pk], fields=[a, b, c])"
} ],
"edges" : [ {
"source" : 1,
"target" : 2,
"shuffle" : {
"type" : "FORWARD"
},
"shuffleMode" : "PIPELINED"
}, {
"source" : 2,
"target" : 3,
"shuffle" : {
"type" : "FORWARD"
},
"shuffleMode" : "PIPELINED"
}, {
"source" : 3,
"target" : 4,
"shuffle" : {
"type" : "FORWARD"
},
"shuffleMode" : "PIPELINED"
} ]
}