Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.fluss.annotation.VisibleForTesting;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.exception.InvalidAlterTableException;
import org.apache.fluss.exception.InvalidTableException;
import org.apache.fluss.exception.TableAlreadyExistException;
import org.apache.fluss.exception.TableNotExistException;
Expand All @@ -36,6 +37,7 @@
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.UpdateProperties;
import org.apache.iceberg.UpdateSchema;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SupportsNamespaces;
Expand All @@ -44,8 +46,11 @@
import org.apache.iceberg.exceptions.NoSuchNamespaceException;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;

import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -133,35 +138,157 @@ public void alterTable(TablePath tablePath, List<TableChange> tableChanges, Cont
throws TableNotExistException {
try {
Table table = icebergCatalog.loadTable(toIcebergTableIdentifier(tablePath));
UpdateProperties updateProperties = table.updateProperties();
for (TableChange tableChange : tableChanges) {
if (tableChange instanceof TableChange.SetOption) {
TableChange.SetOption option = (TableChange.SetOption) tableChange;
checkArgument(
!RESERVED_PROPERTIES.contains(option.getKey()),
"Cannot set table property '%s'",
option.getKey());
updateProperties.set(
convertFlussPropertyKeyToIceberg(option.getKey()), option.getValue());
} else if (tableChange instanceof TableChange.ResetOption) {
TableChange.ResetOption option = (TableChange.ResetOption) tableChange;
checkArgument(
!RESERVED_PROPERTIES.contains(option.getKey()),
"Cannot reset table property '%s'",
option.getKey());
updateProperties.remove(convertFlussPropertyKeyToIceberg(option.getKey()));

List<TableChange> schemaChanges = new ArrayList<>();
List<TableChange> propertyChanges = new ArrayList<>();
for (TableChange change : tableChanges) {
if (change instanceof TableChange.SchemaChange) {
schemaChanges.add(change);
} else {
throw new UnsupportedOperationException(
"Unsupported table change: " + tableChange.getClass());
propertyChanges.add(change);
}
}

updateProperties.commit();
if (!schemaChanges.isEmpty()) {
applySchemaChanges(table, schemaChanges, context);
}

if (!propertyChanges.isEmpty()) {
applyPropertyChanges(table, propertyChanges);
}
} catch (NoSuchTableException e) {
throw new TableNotExistException("Table " + tablePath + " does not exist.");
}
}

private void applyPropertyChanges(Table table, List<TableChange> propertyChanges) {
UpdateProperties updateProperties = table.updateProperties();
for (TableChange tableChange : propertyChanges) {
if (tableChange instanceof TableChange.SetOption) {
TableChange.SetOption option = (TableChange.SetOption) tableChange;
checkArgument(
!RESERVED_PROPERTIES.contains(option.getKey()),
"Cannot set table property '%s'",
option.getKey());
updateProperties.set(
convertFlussPropertyKeyToIceberg(option.getKey()), option.getValue());
} else if (tableChange instanceof TableChange.ResetOption) {
TableChange.ResetOption option = (TableChange.ResetOption) tableChange;
checkArgument(
!RESERVED_PROPERTIES.contains(option.getKey()),
"Cannot reset table property '%s'",
option.getKey());
updateProperties.remove(convertFlussPropertyKeyToIceberg(option.getKey()));
} else {
throw new UnsupportedOperationException(
"Unsupported table change: " + tableChange.getClass());
}
}
updateProperties.commit();
}

private void applySchemaChanges(Table table, List<TableChange> schemaChanges, Context context) {
Schema currentIcebergSchema = table.schema();

// Check schema compatibility to handle crash recovery idempotency.
boolean skipAddColumns;
if (isIcebergSchemaCompatible(currentIcebergSchema, context.getCurrentTable())) {
// Iceberg schema matches current Fluss schema, apply all changes.
skipAddColumns = false;
} else if (isIcebergSchemaCompatible(currentIcebergSchema, context.getExpectedTable())) {
// Iceberg schema already matches expected (post-alter) schema,
// skip AddColumn changes since they were already applied.
skipAddColumns = true;
} else {
throw new InvalidAlterTableException(
String.format(
"Iceberg schema is not compatible with Fluss schema: "
+ "Iceberg schema: %s, Fluss schema: %s. "
+ "therefore you need to add the diff columns all at once, "
+ "rather than applying other table changes: %s.",
currentIcebergSchema,
context.getCurrentTable().getSchema(),
schemaChanges));
}

UpdateSchema updateSchema = table.updateSchema();
String firstSystemColumnName = SYSTEM_COLUMNS.keySet().iterator().next();
boolean hasChanges = false;

for (TableChange tableChange : schemaChanges) {
if (tableChange instanceof TableChange.AddColumn) {
if (skipAddColumns) {
continue;
}
TableChange.AddColumn addColumn = (TableChange.AddColumn) tableChange;

if (!(addColumn.getPosition() instanceof TableChange.Last)) {
throw new UnsupportedOperationException(
"Only support to add column at last for iceberg table.");
}

org.apache.fluss.types.DataType flussDataType = addColumn.getDataType();
if (!flussDataType.isNullable()) {
throw new UnsupportedOperationException(
"Only support to add nullable column for iceberg table.");
}

Type icebergType = flussDataType.accept(new FlussDataTypeToIcebergDataType());
updateSchema.addColumn(addColumn.getName(), icebergType, addColumn.getComment());
updateSchema.moveBefore(addColumn.getName(), firstSystemColumnName);
hasChanges = true;
} else {
throw new UnsupportedOperationException(
"Unsupported table change: " + tableChange.getClass());
}
}

if (hasChanges) {
updateSchema.commit();
}
}

/**
* Checks whether the current Iceberg schema is compatible with the given Fluss table
* descriptor. Compatibility means the user columns and system columns match in name, type, and
* nullability (ignoring Iceberg-assigned field IDs).
*
* <p>Iceberg reassigns field IDs during table creation, so the IDs in the stored schema differ
* from those we generate via {@link #convertToIcebergSchema}. We normalize both schemas to the
* same fresh ID space using {@link TypeUtil#assignIncreasingFreshIds} before comparing, so that
* {@link Type#equals} works correctly for all types including complex ones (Map, List, Struct).
*/
@VisibleForTesting
boolean isIcebergSchemaCompatible(
Schema icebergSchema, @Nullable TableDescriptor flussTableDescriptor) {
if (flussTableDescriptor == null) {
return false;
}
// isPkTable=false: identifier fields don't affect the comparison
Schema expectedSchema = convertToIcebergSchema(flussTableDescriptor, false);

Schema normalizedIceberg = TypeUtil.assignIncreasingFreshIds(icebergSchema);
Schema normalizedExpected = TypeUtil.assignIncreasingFreshIds(expectedSchema);

List<Types.NestedField> currentFields = normalizedIceberg.columns();
List<Types.NestedField> expectedFields = normalizedExpected.columns();

if (currentFields.size() != expectedFields.size()) {
return false;
}

for (int i = 0; i < currentFields.size(); i++) {
Types.NestedField current = currentFields.get(i);
Types.NestedField expected = expectedFields.get(i);
if (!current.name().equals(expected.name())
|| !current.type().equals(expected.type())
|| current.isOptional() != expected.isOptional()) {
return false;
}
}
return true;
}

private TableIdentifier toIcebergTableIdentifier(TablePath tablePath) {
return TableIdentifier.of(tablePath.getDatabaseName(), tablePath.getTableName());
}
Expand Down
Loading