Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.DefaultIndex;
import org.apache.flink.table.catalog.ImmutableColumnsConstraint;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.UniqueConstraint;
import org.apache.flink.test.util.SQLJobSubmission;
Expand All @@ -46,7 +47,9 @@ public class CreateTableAsITCase extends SqlITCaseBase {
Collections.emptyList(),
UniqueConstraint.primaryKey("pk", Collections.singletonList("user_name")),
Collections.singletonList(
DefaultIndex.newIndex("idx", Collections.singletonList("user_name"))));
DefaultIndex.newIndex("idx", Collections.singletonList("user_name"))),
ImmutableColumnsConstraint.immutableColumns(
"imt", Collections.singletonList("user_name")));

private static final DebeziumJsonDeserializationSchema DESERIALIZATION_SCHEMA =
createDebeziumDeserializationSchema(SINK_TABLE_SCHEMA);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.DefaultIndex;
import org.apache.flink.table.catalog.ImmutableColumnsConstraint;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.UniqueConstraint;
import org.apache.flink.test.util.SQLJobSubmission;
Expand Down Expand Up @@ -53,7 +54,9 @@ public class PlannerScalaFreeITCase extends SqlITCaseBase {
Collections.emptyList(),
UniqueConstraint.primaryKey("pk", Collections.singletonList("user_name")),
Collections.singletonList(
DefaultIndex.newIndex("idx", Collections.singletonList("user_name"))));
DefaultIndex.newIndex("idx", Collections.singletonList("user_name"))),
ImmutableColumnsConstraint.immutableColumns(
"imt", Collections.singletonList("user_name")));

private static final DebeziumJsonDeserializationSchema DESERIALIZATION_SCHEMA =
createDebeziumDeserializationSchema(SINK_TABLE_SCHEMA);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.DefaultIndex;
import org.apache.flink.table.catalog.ImmutableColumnsConstraint;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.UniqueConstraint;

Expand All @@ -47,7 +48,9 @@ public class UsingRemoteJarITCase extends HdfsITCaseBase {
Collections.emptyList(),
UniqueConstraint.primaryKey("pk", Collections.singletonList("user_name")),
Collections.singletonList(
DefaultIndex.newIndex("idx", Collections.singletonList("user_name"))));
DefaultIndex.newIndex("idx", Collections.singletonList("user_name"))),
ImmutableColumnsConstraint.immutableColumns(
"imt", Collections.singletonList("user_name")));

private static final DebeziumJsonDeserializationSchema USER_ORDER_DESERIALIZATION_SCHEMA =
createDebeziumDeserializationSchema(USER_ORDER_SCHEMA);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,8 @@ def java_class(cls):
@classmethod
def excluded_methods(cls):
# getIndexes are not needed in Python API as they are used internally
return {'getIndexes'}
# TODO FLINK-39319 add immutable column constraint in Python API
return {'getIndexes', 'getImmutableColumns', 'getImmutableColumnIndexes'}


if __name__ == '__main__':
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1984,7 +1984,8 @@ public ResolvedCatalogView resolveCatalogView(CatalogView view) {
.collect(Collectors.toList()),
resolvedSchema.getWatermarkSpecs(),
resolvedSchema.getPrimaryKey().orElse(null),
resolvedSchema.getIndexes());
resolvedSchema.getIndexes(),
resolvedSchema.getImmutableColumns().orElse(null));
return new ResolvedCatalogView(
// pass a view that has the query parsed and
// validated already
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Schema.UnresolvedComputedColumn;
import org.apache.flink.table.api.Schema.UnresolvedImmutableColumns;
import org.apache.flink.table.api.Schema.UnresolvedIndex;
import org.apache.flink.table.api.Schema.UnresolvedMetadataColumn;
import org.apache.flink.table.api.Schema.UnresolvedPhysicalColumn;
Expand Down Expand Up @@ -92,7 +93,12 @@ public ResolvedSchema resolve(Schema schema) {

final List<Index> indexes = resolveIndexes(schema.getIndexes(), columnsWithRowtime);

return new ResolvedSchema(columnsWithRowtime, watermarkSpecs, primaryKey, indexes);
final ImmutableColumnsConstraint immutableColumns =
resolveImmutableColumns(
schema.getImmutableColumns().orElse(null), primaryKey, columnsWithRowtime);

return new ResolvedSchema(
columnsWithRowtime, watermarkSpecs, primaryKey, indexes, immutableColumns);
}

// --------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -459,6 +465,74 @@ private void validateSingleIndex(Index index, Map<String, Column> columnsByNameL
}
}

/**
 * Resolves the schema's unresolved immutable-columns declaration into an
 * {@link ImmutableColumnsConstraint}, validating it against the primary key and the resolved
 * columns.
 *
 * @param unresolvedImmutableColumns declaration from the unresolved schema, or {@code null} if
 *     the schema declares none
 * @param primaryKey the table's primary key, or {@code null} if none is defined
 * @param columns the fully resolved columns of the schema
 * @return the resolved constraint, or {@code null} when no immutable columns were declared
 */
private @Nullable ImmutableColumnsConstraint resolveImmutableColumns(
        @Nullable UnresolvedImmutableColumns unresolvedImmutableColumns,
        @Nullable UniqueConstraint primaryKey,
        List<Column> columns) {
    // Nothing declared on the schema -> nothing to resolve.
    if (unresolvedImmutableColumns == null) {
        return null;
    }

    final String constraintName = unresolvedImmutableColumns.getConstraintName();
    final List<String> constraintColumns = unresolvedImmutableColumns.getColumnNames();
    final ImmutableColumnsConstraint resolved =
            ImmutableColumnsConstraint.immutableColumns(constraintName, constraintColumns);

    validateImmutableColumns(resolved, primaryKey, columns);
    return resolved;
}

/**
 * Validates an immutable-columns constraint against the resolved columns.
 *
 * <p>Checks that (1) the table declares a primary key, (2) the constraint does not list the
 * same column twice, and (3) every referenced column exists and is a physical column.
 *
 * @param immutableColumns the resolved constraint to validate
 * @param primaryKey the table's primary key, or {@code null} if none is defined
 * @param columns the fully resolved columns of the schema
 * @throws ValidationException if any of the checks above fails
 */
private void validateImmutableColumns(
        ImmutableColumnsConstraint immutableColumns,
        @Nullable UniqueConstraint primaryKey,
        List<Column> columns) {
    // An immutable constraint is only supported on tables with a primary key.
    if (primaryKey == null) {
        throw new ValidationException(
                String.format(
                        "Invalid immutable constraint '%s'. "
                                + "An immutable constraint must be defined on the table that contains primary key.",
                        immutableColumns.getName()));
    }

    // Count occurrences in a single pass and keep names that appear more than once. The
    // previous Collections.frequency-based scan was accidentally O(n^2).
    final Set<String> duplicateColumns =
            immutableColumns.getColumns().stream()
                    .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()))
                    .entrySet()
                    .stream()
                    .filter(entry -> entry.getValue() > 1)
                    .map(Map.Entry::getKey)
                    .collect(Collectors.toSet());

    if (!duplicateColumns.isEmpty()) {
        throw new ValidationException(
                String.format(
                        "Invalid immutable constraint '%s'. "
                                + "An immutable constraint must not contain duplicate columns. "
                                + "Found: %s",
                        immutableColumns.getName(), duplicateColumns));
    }

    final Map<String, Column> columnsByNameLookup =
            columns.stream().collect(Collectors.toMap(Column::getName, Function.identity()));

    for (String columnName : immutableColumns.getColumns()) {
        final Column column = columnsByNameLookup.get(columnName);
        if (column == null) {
            throw new ValidationException(
                    String.format(
                            "Invalid immutable constraint '%s'. Column '%s' does not exist.",
                            immutableColumns.getName(), columnName));
        }

        // Computed and metadata columns have no stored value to keep immutable.
        if (!column.isPhysical()) {
            throw new ValidationException(
                    String.format(
                            "Invalid immutable constraint '%s'. Column '%s' is not a physical column.",
                            immutableColumns.getName(), columnName));
        }
    }
}

private ResolvedExpression resolveExpression(
List<Column> columns, Expression expression, @Nullable DataType outputDataType) {
final LocalReferenceExpression[] localRefs =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogView;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ImmutableColumnsConstraint;
import org.apache.flink.table.catalog.IntervalFreshness;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
Expand Down Expand Up @@ -73,13 +74,24 @@ class ShowCreateUtilTest {
Column.metadata("mt_column", DataTypes.STRING(), null, true)),
List.of(),
UniqueConstraint.primaryKey("pk", List.of("id")),
List.of());
List.of(),
null);

private static final ResolvedSchema TWO_COLUMNS_SCHEMA =
ResolvedSchema.of(
Column.physical("id", DataTypes.INT()),
Column.physical("name", DataTypes.STRING()));

private static final ResolvedSchema TWO_COLUMNS_SCHEMA_WITH_PRIMARY_KEY_AND_IMMUTABLE_COLS =
new ResolvedSchema(
List.of(
Column.physical("id", DataTypes.INT()),
Column.physical("name", DataTypes.STRING())),
List.of(),
UniqueConstraint.primaryKey("pk", List.of("id")),
List.of(),
ImmutableColumnsConstraint.immutableColumns("imt", List.of("name")));

@ParameterizedTest(name = "{index}: {2}")
@MethodSource("argsForShowCreateTable")
void showCreateTable(
Expand Down Expand Up @@ -213,6 +225,23 @@ private static Collection<Arguments> argsForShowCreateTable() {
+ "COMMENT 'Table comment'\n"
+ "DISTRIBUTED BY RANGE(`1`, `10`) INTO 2 BUCKETS\n");

addTemporaryAndPermanent(
argList,
createResolvedTable(
TWO_COLUMNS_SCHEMA_WITH_PRIMARY_KEY_AND_IMMUTABLE_COLS,
Collections.emptyMap(),
Collections.emptyList(),
TableDistribution.of(
TableDistribution.Kind.RANGE, 2, Arrays.asList("1", "10")),
"Table comment"),
"CREATE %sTABLE `catalogName`.`dbName`.`tableName` (\n"
+ " `id` INT,\n"
+ " `name` VARCHAR(2147483647),\n"
+ " CONSTRAINT `pk` PRIMARY KEY (`id`) NOT ENFORCED\n"
+ ")\n"
+ "COMMENT 'Table comment'\n"
+ "DISTRIBUTED BY RANGE(`1`, `10`) INTO 2 BUCKETS\n");

final Map<String, String> options = new HashMap<>();
options.put("option_key_a", "option_value_a");
options.put("option_key_b", "option_value_b");
Expand Down Expand Up @@ -315,6 +344,26 @@ private static Collection<Arguments> argsForShowCreateMaterializedTable() {
+ "REFRESH_MODE = CONTINUOUS\n"
+ "AS SELECT 1\n"));

argList.add(
Arguments.of(
createResolvedMaterialized(
TWO_COLUMNS_SCHEMA_WITH_PRIMARY_KEY_AND_IMMUTABLE_COLS,
null,
List.of(),
null,
IntervalFreshness.ofMinute("1"),
RefreshMode.CONTINUOUS,
"SELECT 1",
"SELECT 1"),
"CREATE MATERIALIZED TABLE `catalogName`.`dbName`.`materializedTableName` (\n"
+ " `id` INT,\n"
+ " `name` VARCHAR(2147483647),\n"
+ " CONSTRAINT `pk` PRIMARY KEY (`id`) NOT ENFORCED\n"
+ ")\n"
+ "FRESHNESS = INTERVAL '1' MINUTE\n"
+ "REFRESH_MODE = CONTINUOUS\n"
+ "AS SELECT 1\n"));

argList.add(
Arguments.of(
createResolvedMaterialized(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ class CatalogBaseTableResolutionTest {
.watermark("ts", WATERMARK_SQL)
.primaryKeyNamed("primary_constraint", "id")
.indexNamed("idx", Collections.singletonList("id"))
.immutableColumnsNamed("imt", Collections.singletonList("region"))
.build();

private static final Schema MATERIALIZED_TABLE_SCHEMA =
Expand All @@ -101,6 +102,7 @@ class CatalogBaseTableResolutionTest {
.withComment("") // empty column comment
.primaryKeyNamed("primary_constraint", "id")
.indexNamed("idx", Collections.singletonList("id"))
.immutableColumnsNamed("imt", Collections.singletonList("region"))
.build();

private static final TableSchema LEGACY_TABLE_SCHEMA =
Expand Down Expand Up @@ -138,7 +140,9 @@ class CatalogBaseTableResolutionTest {
UniqueConstraint.primaryKey(
"primary_constraint", Collections.singletonList("id")),
Collections.singletonList(
DefaultIndex.newIndex("idx", Collections.singletonList("id"))));
DefaultIndex.newIndex("idx", Collections.singletonList("id"))),
ImmutableColumnsConstraint.immutableColumns(
"imt", Collections.singletonList("region")));

private static final ResolvedSchema RESOLVED_MATERIALIZED_TABLE_SCHEMA =
new ResolvedSchema(
Expand All @@ -152,7 +156,9 @@ class CatalogBaseTableResolutionTest {
UniqueConstraint.primaryKey(
"primary_constraint", Collections.singletonList("id")),
Collections.singletonList(
DefaultIndex.newIndex("idx", Collections.singletonList("id"))));
DefaultIndex.newIndex("idx", Collections.singletonList("id"))),
ImmutableColumnsConstraint.immutableColumns(
"imt", Collections.singletonList("region")));

private static final ContinuousRefreshHandler CONTINUOUS_REFRESH_HANDLER =
new ContinuousRefreshHandler(
Expand All @@ -171,7 +177,8 @@ class CatalogBaseTableResolutionTest {
Column.physical("county", DataTypes.VARCHAR(200))),
Collections.emptyList(),
null,
Collections.emptyList());
Collections.emptyList(),
null);

@Test
void testCatalogTableResolution() {
Expand Down Expand Up @@ -397,6 +404,8 @@ private static Map<String, String> catalogTableAsProperties() {
properties.put("schema.primary-key.columns", "id");
properties.put("schema.index.0.name", "idx");
properties.put("schema.index.0.columns", "id");
properties.put("schema.immutable.name", "imt");
properties.put("schema.immutable.columns", "region");
properties.put("partition.keys.0.name", "region");
properties.put("partition.keys.1.name", "county");
properties.put("version", "12");
Expand Down Expand Up @@ -424,6 +433,8 @@ private static Map<String, String> catalogMaterializedTableAsProperties() throws
properties.put("schema.primary-key.columns", "id");
properties.put("schema.index.0.name", "idx");
properties.put("schema.index.0.columns", "id");
properties.put("schema.immutable.name", "imt");
properties.put("schema.immutable.columns", "region");
properties.put("freshness-interval", "30");
properties.put("freshness-unit", "SECOND");
properties.put("logical-refresh-mode", "CONTINUOUS");
Expand Down
Loading