diff --git a/docs/static/rest-catalog-open-api.yaml b/docs/static/rest-catalog-open-api.yaml index 237d9a987e23..a4bda071f921 100644 --- a/docs/static/rest-catalog-open-api.yaml +++ b/docs/static/rest-catalog-open-api.yaml @@ -669,6 +669,42 @@ paths: $ref: '#/components/examples/TagNotExistError' "500": $ref: '#/components/responses/ServerErrorResponse' + /v1/{prefix}/databases/{database}/tables/{table}/rollback-schema: + post: + tags: + - table + summary: Rollback table schema + operationId: rollbackSchema + parameters: + - name: prefix + in: path + required: true + schema: + type: string + - name: database + in: path + required: true + schema: + type: string + - name: table + in: path + required: true + schema: + type: string + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/RollbackSchemaRequest' + responses: + "200": + description: Success, no content + "401": + $ref: '#/components/responses/UnauthorizedErrorResponse' + "404": + $ref: '#/components/responses/TableNotExistErrorResponse' + "500": + $ref: '#/components/responses/ServerErrorResponse' /v1/{prefix}/databases/{database}/tables/{table}/token: get: tags: @@ -2936,6 +2972,14 @@ components: type: integer format: int64 nullable: true + RollbackSchemaRequest: + type: object + required: + - schemaId + properties: + schemaId: + type: integer + format: int64 Instant: anyOf: - $ref: '#/components/schemas/SnapshotInstant' diff --git a/paimon-api/src/main/java/org/apache/paimon/rest/RESTApi.java b/paimon-api/src/main/java/org/apache/paimon/rest/RESTApi.java index 1e549a116c32..a939c5ce169a 100644 --- a/paimon-api/src/main/java/org/apache/paimon/rest/RESTApi.java +++ b/paimon-api/src/main/java/org/apache/paimon/rest/RESTApi.java @@ -51,6 +51,7 @@ import org.apache.paimon.rest.requests.RegisterTableRequest; import org.apache.paimon.rest.requests.RenameTableRequest; import org.apache.paimon.rest.requests.ResetConsumerRequest; +import org.apache.paimon.rest.requests.RollbackSchemaRequest; import org.apache.paimon.rest.requests.RollbackTableRequest; import org.apache.paimon.rest.responses.AlterDatabaseResponse; import org.apache.paimon.rest.responses.AuthTableQueryResponse; @@ -711,6 +712,24 @@ public void rollbackTo(Identifier identifier, Instant instant, @Nullable Long fr restAuthFunction); } + /** + * Rollback schema for table. + * + * @param identifier database name and table name. + * @param schemaId the target schema version to rollback to + * @throws NoSuchResourceException Exception thrown on HTTP 404 means the table not exists + * @throws ForbiddenException Exception thrown on HTTP 403 means don't have the permission for + * this table + */ + public void rollbackSchema(Identifier identifier, long schemaId) { + RollbackSchemaRequest request = new RollbackSchemaRequest(schemaId); + client.post( + resourcePaths.rollbackSchemaTable( + identifier.getDatabaseName(), identifier.getObjectName()), + request, + restAuthFunction); + } + /** * Create table. * diff --git a/paimon-api/src/main/java/org/apache/paimon/rest/ResourcePaths.java b/paimon-api/src/main/java/org/apache/paimon/rest/ResourcePaths.java index baf0b8a4af2f..466ac975b464 100644 --- a/paimon-api/src/main/java/org/apache/paimon/rest/ResourcePaths.java +++ b/paimon-api/src/main/java/org/apache/paimon/rest/ResourcePaths.java @@ -120,6 +120,17 @@ public String rollbackTable(String databaseName, String objectName) { ROLLBACK); } + public String rollbackSchemaTable(String databaseName, String objectName) { + return SLASH.join( + V1, + prefix, + DATABASES, + encodeString(databaseName), + TABLES, + encodeString(objectName), + "rollback-schema"); + } + public String registerTable(String databaseName) { return SLASH.join(V1, prefix, DATABASES, encodeString(databaseName), REGISTER); } diff --git a/paimon-api/src/main/java/org/apache/paimon/rest/requests/RollbackSchemaRequest.java b/paimon-api/src/main/java/org/apache/paimon/rest/requests/RollbackSchemaRequest.java new file mode 100644 index 000000000000..f35d22ca80cf --- /dev/null +++ b/paimon-api/src/main/java/org/apache/paimon/rest/requests/RollbackSchemaRequest.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.rest.requests; + +import org.apache.paimon.rest.RESTRequest; + +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +/** Request for rollback table schema. */ +@JsonIgnoreProperties(ignoreUnknown = true) +public class RollbackSchemaRequest implements RESTRequest { + + private static final String FIELD_SCHEMA_ID = "schemaId"; + + @JsonProperty(FIELD_SCHEMA_ID) + private final long schemaId; + + @JsonCreator + public RollbackSchemaRequest(@JsonProperty(FIELD_SCHEMA_ID) long schemaId) { + this.schemaId = schemaId; + } + + @JsonGetter(FIELD_SCHEMA_ID) + public long getSchemaId() { + return schemaId; + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java index 3947a950e59a..8924dc6ae166 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java @@ -809,6 +809,22 @@ default void rollbackTo(Identifier identifier, Instant instant) void rollbackTo(Identifier identifier, Instant instant, @Nullable Long fromSnapshot) throws Catalog.TableNotExistException; + /** + * Rollback table schema to a specific schema version. All schema versions greater than the + * target will be deleted. This operation will fail if any snapshot, tag, or changelog + * references a schema version greater than the target. + * + * @param identifier path of the table + * @param schemaId the target schema version to rollback to + * @throws Catalog.TableNotExistException if the table does not exist + * @throws UnsupportedOperationException if the catalog does not {@link + * #supportsVersionManagement()} + */ + default void rollbackSchema(Identifier identifier, long schemaId) + throws Catalog.TableNotExistException { + throw new UnsupportedOperationException(); + } + /** * Create a new branch for this table. By default, an empty branch will be created using the * latest schema. If you provide {@code #fromTag}, a branch will be created from the tag and the diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java index 81558ac68a48..4a8eeabe50ed 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java @@ -222,6 +222,12 @@ public void rollbackTo(Identifier identifier, Instant instant, @Nullable Long fr wrapped.rollbackTo(identifier, instant, fromSnapshot); } + @Override + public void rollbackSchema(Identifier identifier, long schemaId) + throws Catalog.TableNotExistException { + wrapped.rollbackSchema(identifier, schemaId); + } + @Override public void createBranch(Identifier identifier, String branch, @Nullable String fromTag) throws TableNotExistException, BranchAlreadyExistException, TagNotExistException { diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java index 7b053d120c90..dbb2a8fd48d3 100644 --- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java @@ -442,6 +442,18 @@ public void rollbackTo(Identifier identifier, Instant instant, @Nullable Long fr } } + @Override + public void rollbackSchema(Identifier identifier, long schemaId) + throws Catalog.TableNotExistException { + try { + api.rollbackSchema(identifier, schemaId); + } catch (NoSuchResourceException e) { + throw new TableNotExistException(identifier); + } catch (ForbiddenException e) { + throw new TableNoPermissionException(identifier, e); + } + } + private TableMetadata loadTableMetadata(Identifier identifier) throws TableNotExistException { // if the table is system table, we need to load table metadata from the system table's data // table diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java index a896d3f63f24..dbc605b96c84 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java @@ -45,10 +45,12 @@ import org.apache.paimon.types.ReassignFieldId; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.BranchManager; +import org.apache.paimon.utils.ChangelogManager; import org.apache.paimon.utils.LazyField; import org.apache.paimon.utils.Preconditions; import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.StringUtils; +import org.apache.paimon.utils.TagManager; import org.apache.paimon.shade.guava30.com.google.common.collect.FluentIterable; import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList; @@ -66,10 +68,12 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiFunction; import java.util.function.Function; @@ -1099,6 +1103,56 @@ public void deleteSchema(long schemaId) { fileIO.deleteQuietly(toSchemaPath(schemaId)); } + /** + * Rollback to a specific schema version. All schema versions greater than the target will be + * deleted. This operation will fail if any snapshot, tag, or changelog references a schema + * version greater than the target. + * + * @param targetSchemaId the schema version to rollback to. + * @param snapshotManager the snapshot manager to check snapshot references. + * @param tagManager the tag manager to check tag references. + * @param changelogManager the changelog manager to check changelog references. + */ + public void rollbackTo( + long targetSchemaId, + SnapshotManager snapshotManager, + TagManager tagManager, + ChangelogManager changelogManager) + throws IOException { + checkArgument(schemaExists(targetSchemaId), "Schema %s does not exist.", targetSchemaId); + + // Collect all schemaIds referenced by snapshots, tags, and changelogs + Set usedSchemaIds = new HashSet<>(); + + snapshotManager.pickOrLatest( + snapshot -> { + usedSchemaIds.add(snapshot.schemaId()); + return false; + }); + tagManager.taggedSnapshots().forEach(s -> usedSchemaIds.add(s.schemaId())); + changelogManager.changelogs().forEachRemaining(c -> usedSchemaIds.add(c.schemaId())); + + // Check if any referenced schema is newer than the target + Optional conflict = + usedSchemaIds.stream().filter(id -> id > targetSchemaId).min(Long::compareTo); + if (conflict.isPresent()) { + throw new RuntimeException( + String.format( + "Cannot rollback to schema %d, schema %d is still referenced by snapshots/tags/changelogs.", + targetSchemaId, conflict.get())); + } + + // Delete all schemas newer than the target + List toBeDeleted = + listAllIds().stream() + .filter(id -> id > targetSchemaId) + .collect(Collectors.toList()); + toBeDeleted.sort((o1, o2) -> Long.compare(o2, o1)); + for (Long id : toBeDeleted) { + fileIO.delete(toSchemaPath(id), false); + } + } + public static void checkAlterTableOption( Map options, String key, @Nullable String oldValue, String newValue) { if (CoreOptions.IMMUTABLE_OPTIONS.contains(key)) { diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java index 014df441ac58..9e18714ab2fc 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java @@ -72,6 +72,8 @@ import javax.annotation.Nullable; import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.UncheckedIOException; import java.time.Duration; import java.util.HashMap; import java.util.List; @@ -558,6 +560,20 @@ public void rollbackTo(String tagName) { } } + @Override + public void rollbackSchema(long schemaId) { + try { + schemaManager() + .rollbackTo( + schemaId, + snapshotManager(), + tagManager(), + new ChangelogManager(fileIO(), location(), null)); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + public Snapshot findSnapshot(long fromSnapshotId) throws SnapshotNotExistException { SnapshotManager snapshotManager = snapshotManager(); Snapshot snapshot = null; diff --git a/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java index b92c4c163066..9eaa4f8d2e9f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java @@ -255,6 +255,11 @@ public void rollbackTo(String tagName) { wrapped.rollbackTo(tagName); } + @Override + public void rollbackSchema(long schemaId) { + wrapped.rollbackSchema(schemaId); + } + @Override public void createBranch(String branchName) { wrapped.createBranch(branchName); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java b/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java index f1fbc74c1231..a2b01fb7f55b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java @@ -374,6 +374,11 @@ default void rollbackTo(String tagName) { throw new UnsupportedOperationException(); } + @Override + default void rollbackSchema(long schemaId) { + throw new UnsupportedOperationException(); + } + @Override default void createBranch(String branchName) { throw new UnsupportedOperationException(); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java index c5b8deeca91c..55f14e7025f1 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java @@ -217,6 +217,14 @@ default void rollbackTo(String tagName) { this.getClass().getSimpleName())); } + @Override + default void rollbackSchema(long schemaId) { + throw new UnsupportedOperationException( + String.format( + "Readonly Table %s does not support rollbackSchema.", + this.getClass().getSimpleName())); + } + @Override default void createBranch(String branchName) { throw new UnsupportedOperationException( diff --git a/paimon-core/src/main/java/org/apache/paimon/table/Table.java b/paimon-core/src/main/java/org/apache/paimon/table/Table.java index 8d49d7206e7e..53e99ea45f80 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/Table.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/Table.java @@ -157,6 +157,10 @@ default void deleteTags(String tagStr) { @Experimental void rollbackTo(String tagName); + /** Rollback table's schema to a specific schema version. */ + @Experimental + void rollbackSchema(long schemaId); + /** Create an empty branch. */ @Experimental void createBranch(String branchName); diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java index 2fb624c71e8e..414ef3f5b738 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java @@ -272,11 +272,12 @@ public boolean earliestFileNotExists() { } for (long snapshotId = latestId; snapshotId >= earliestId; snapshotId--) { - if (snapshotExists(snapshotId)) { - Snapshot snapshot = snapshot(snapshotId); + try { + Snapshot snapshot = tryGetSnapshot(snapshotId); if (predicate.test(snapshot)) { return snapshot.id(); } + } catch (FileNotFoundException ignored) { } } diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java index 81cb097f5adb..bd9036a4b01f 100644 --- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java +++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java @@ -63,6 +63,7 @@ import org.apache.paimon.rest.requests.MarkDonePartitionsRequest; import org.apache.paimon.rest.requests.RenameTableRequest; import org.apache.paimon.rest.requests.ResetConsumerRequest; +import org.apache.paimon.rest.requests.RollbackSchemaRequest; import org.apache.paimon.rest.requests.RollbackTableRequest; import org.apache.paimon.rest.responses.AlterDatabaseResponse; import org.apache.paimon.rest.responses.AuthTableQueryResponse; @@ -104,6 +105,7 @@ import org.apache.paimon.table.object.ObjectTable; import org.apache.paimon.tag.Tag; import org.apache.paimon.utils.BranchManager; +import org.apache.paimon.utils.ChangelogManager; import org.apache.paimon.utils.JsonSerdeUtil; import org.apache.paimon.utils.LazyField; import org.apache.paimon.utils.Pair; @@ -417,6 +419,10 @@ && isTableByIdRequest(request.getPath())) { resources.length == 4 && ResourcePaths.TABLES.equals(resources[1]) && ResourcePaths.ROLLBACK.equals(resources[3]); + boolean isRollbackSchema = + resources.length == 4 + && ResourcePaths.TABLES.equals(resources[1]) + && "rollback-schema".equals(resources[3]); boolean isPartitions = resources.length == 4 && ResourcePaths.TABLES.equals(resources[1]) @@ -538,6 +544,8 @@ && isTableByIdRequest(request.getPath())) { .getTagName(); return rollbackTableByTagNameHandle(identifier, tagName); } + } else if (isRollbackSchema) { + return rollbackSchemaHandle(identifier, restAuthParameter.data()); } else if (isTable) { return tableHandle( restAuthParameter.method(), @@ -1009,6 +1017,30 @@ private MockResponse rollbackTableByTagNameHandle(Identifier identifier, String new ErrorResponse(ErrorResponse.RESOURCE_TYPE_TAG, "" + tagName, "", 404), 404); } + private MockResponse rollbackSchemaHandle(Identifier identifier, String data) throws Exception { + RollbackSchemaRequest requestBody = RESTApi.fromJson(data, RollbackSchemaRequest.class); + if (noPermissionTables.contains(identifier.getFullName())) { + throw new Catalog.TableNoPermissionException(identifier); + } + if (!tableMetadataStore.containsKey(identifier.getFullName())) { + throw new Catalog.TableNotExistException(identifier); + } + FileStoreTable table = getFileTable(identifier); + long schemaId = requestBody.getSchemaId(); + SchemaManager schemaManager = new SchemaManager(table.fileIO(), table.location()); + try { + schemaManager.rollbackTo( + schemaId, + table.snapshotManager(), + table.tagManager(), + new ChangelogManager(table.fileIO(), table.location(), null)); + } catch (Exception e) { + return mockResponse(new ErrorResponse(null, null, e.getMessage(), 500), 500); + } + + return new MockResponse().setResponseCode(200); + } + private void cleanSnapshot(Identifier identifier, Long snapshotId, Long latestSnapshotId) throws IOException { if (latestSnapshotId > snapshotId) { diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java index 35f2792f7bdb..d71406a799b9 100644 --- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java @@ -1933,6 +1933,71 @@ public void testTableRollback() throws Exception { assertThat(table.latestSnapshot().get().id()).isEqualTo(2); } + @Test + public void testRollbackSchema() throws Exception { + Identifier identifier = Identifier.create("test_rollback_schema", "table_for_schema"); + createTable(identifier, Maps.newHashMap(), Lists.newArrayList("col1")); + + // get initial schema id + FileStoreTable table = (FileStoreTable) catalog.getTable(identifier); + SchemaManager schemaManager = new SchemaManager(table.fileIO(), table.location()); + long firstSchemaId = schemaManager.latest().get().id(); + + // evolve schema + catalog.alterTable(identifier, SchemaChange.setOption("aa", "bb"), false); + long secondSchemaId = schemaManager.latest().get().id(); + assertThat(secondSchemaId).isEqualTo(firstSchemaId + 1); + + // rollback schema to first version + catalog.rollbackSchema(identifier, firstSchemaId); + assertThat(schemaManager.latest().get().id()).isEqualTo(firstSchemaId); + assertThat(schemaManager.schemaExists(secondSchemaId)).isFalse(); + + // rollback to non-existent schema should fail + assertThatThrownBy(() -> catalog.rollbackSchema(identifier, 999)) + .isInstanceOf(Exception.class); + } + + @Test + public void testRollbackSchemaFailedWithSnapshotReference() throws Exception { + Identifier identifier = + Identifier.create("test_rollback_schema_fail", "table_for_schema_fail"); + createTable(identifier, Maps.newHashMap(), Lists.newArrayList("col1")); + + FileStoreTable table = (FileStoreTable) catalog.getTable(identifier); + SchemaManager schemaManager = new SchemaManager(table.fileIO(), table.location()); + long firstSchemaId = schemaManager.latest().get().id(); + + // write data to create a snapshot referencing firstSchemaId + StreamTableWrite write = table.newWrite("commitUser"); + StreamTableCommit commit = table.newCommit("commitUser"); + write.write(GenericRow.of(1)); + commit.commit(0, write.prepareCommit(false, 0)); + write.close(); + commit.close(); + + // evolve schema + catalog.alterTable(identifier, SchemaChange.setOption("aa", "bb"), false); + long secondSchemaId = schemaManager.latest().get().id(); + + // write data to create a snapshot referencing secondSchemaId + table = (FileStoreTable) catalog.getTable(identifier); + write = table.newWrite("commitUser"); + commit = table.newCommit("commitUser"); + write.write(GenericRow.of(2)); + commit.commit(1, write.prepareCommit(false, 1)); + write.close(); + commit.close(); + + // rollback should fail because snapshot references secondSchemaId + assertThatThrownBy(() -> catalog.rollbackSchema(identifier, firstSchemaId)) + .hasMessageContaining("Cannot rollback to schema " + firstSchemaId) + .hasMessageContaining( + "schema " + + secondSchemaId + + " is still referenced by snapshots/tags/changelogs"); + } + @Test public void testDataTokenExpired() throws Exception { this.catalog = newRestCatalogWithDataToken(); diff --git a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java index bc433ff18ea0..31496f8bb8f9 100644 --- a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java @@ -40,7 +40,10 @@ import org.apache.paimon.types.MapType; import org.apache.paimon.types.RowType; import org.apache.paimon.types.VarCharType; +import org.apache.paimon.utils.ChangelogManager; import org.apache.paimon.utils.FailingFileIO; +import org.apache.paimon.utils.SnapshotManager; +import org.apache.paimon.utils.TagManager; import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList; @@ -791,4 +794,104 @@ public void testAlterDeletionVectorsMode() throws Exception { table = FileStoreTableFactory.create(LocalFileIO.create(), tableRoot); assertThat(table.options().get(DELETION_VECTORS_ENABLED.key())).isEqualTo("true"); } + + @Test + public void testRollbackSchemaSuccess() throws Exception { + SchemaManager manager = new SchemaManager(LocalFileIO.create(), path); + manager.createTable(schema); + long firstSchemaId = manager.latest().get().id(); + + manager.commitChanges(SchemaChange.setOption("aa", "bb")); + long secondSchemaId = manager.latest().get().id(); + assertThat(secondSchemaId).isEqualTo(firstSchemaId + 1); + + manager.commitChanges(SchemaChange.setOption("cc", "dd")); + long thirdSchemaId = manager.latest().get().id(); + assertThat(thirdSchemaId).isEqualTo(firstSchemaId + 2); + + // rollback to first schema + SnapshotManager snapshotManager = + new SnapshotManager(LocalFileIO.create(), path, null, null, null); + TagManager tagManager = new TagManager(LocalFileIO.create(), path); + ChangelogManager changelogManager = new ChangelogManager(LocalFileIO.create(), path, null); + manager.rollbackTo(firstSchemaId, snapshotManager, tagManager, changelogManager); + + assertThat(manager.latest().get().id()).isEqualTo(firstSchemaId); + assertThat(manager.schemaExists(secondSchemaId)).isFalse(); + assertThat(manager.schemaExists(thirdSchemaId)).isFalse(); + } + + @Test + public void testRollbackSchemaFailedDueToSnapshotReference() throws Exception { + Schema appendOnlySchema = + new Schema( + rowType.getFields(), + Collections.emptyList(), + Collections.emptyList(), + options, + ""); + Path tableRoot = new Path(tempDir.toString(), "table"); + SchemaManager manager = new SchemaManager(LocalFileIO.create(), tableRoot); + manager.createTable(appendOnlySchema); + long firstSchemaId = manager.latest().get().id(); + + // write data to create a snapshot referencing firstSchemaId + FileStoreTable table = FileStoreTableFactory.create(LocalFileIO.create(), tableRoot); + String commitUser = UUID.randomUUID().toString(); + TableWriteImpl write = + table.newWrite(commitUser).withIOManager(IOManager.create(tempDir + "/io")); + TableCommitImpl commit = table.newCommit(commitUser); + write.write(GenericRow.of(1, 10L, BinaryString.fromString("apple"))); + commit.commit(1, write.prepareCommit(false, 1)); + write.close(); + commit.close(); + + // evolve schema + manager.commitChanges(SchemaChange.setOption("aa", "bb")); + long secondSchemaId = manager.latest().get().id(); + + // write data to create a snapshot referencing secondSchemaId + table = FileStoreTableFactory.create(LocalFileIO.create(), tableRoot); + write = table.newWrite(commitUser).withIOManager(IOManager.create(tempDir + "/io")); + commit = table.newCommit(commitUser); + write.write(GenericRow.of(2, 20L, BinaryString.fromString("banana"))); + commit.commit(2, write.prepareCommit(false, 2)); + write.close(); + commit.close(); + + // rollback to first schema should fail because snapshot references secondSchemaId + SnapshotManager snapshotManager = + new SnapshotManager(LocalFileIO.create(), tableRoot, null, null, null); + TagManager tagManager = new TagManager(LocalFileIO.create(), tableRoot); + ChangelogManager changelogManager = + new ChangelogManager(LocalFileIO.create(), tableRoot, null); + assertThatThrownBy( + () -> + manager.rollbackTo( + firstSchemaId, + snapshotManager, + tagManager, + changelogManager)) + .hasMessageContaining("Cannot rollback to schema " + firstSchemaId) + .hasMessageContaining( + "schema " + + secondSchemaId + + " is still referenced by snapshots/tags/changelogs"); + } + + @Test + public void testRollbackSchemaNotExist() throws Exception { + SchemaManager manager = new SchemaManager(LocalFileIO.create(), path); + manager.createTable(schema); + + assertThatThrownBy( + () -> + manager.rollbackTo( + 999, + new SnapshotManager( + LocalFileIO.create(), path, null, null, null), + new TagManager(LocalFileIO.create(), path), + new ChangelogManager(LocalFileIO.create(), path, null))) + .hasMessageContaining("Schema 999 does not exist"); + } } diff --git a/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableTestBase.java index 1b58b012cd6a..f50e9922f562 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableTestBase.java @@ -1844,4 +1844,55 @@ protected void assertRollbackTo( assertThat(table.tagManager().allTagNames()) .containsExactlyInAnyOrderElementsOf(expectedTags); } + + @Test + public void testRollbackSchemaSuccess() throws Exception { + FileStoreTable table = createFileStoreTable(); + SchemaManager schemaManager = table.schemaManager(); + long firstSchemaId = schemaManager.latest().get().id(); + + // evolve schema twice + schemaManager.commitChanges(SchemaChange.setOption("aa", "bb")); + long secondSchemaId = schemaManager.latest().get().id(); + schemaManager.commitChanges(SchemaChange.setOption("cc", "dd")); + long thirdSchemaId = schemaManager.latest().get().id(); + + // rollback to first schema + table.rollbackSchema(firstSchemaId); + assertThat(schemaManager.latest().get().id()).isEqualTo(firstSchemaId); + assertThat(schemaManager.schemaExists(secondSchemaId)).isFalse(); + assertThat(schemaManager.schemaExists(thirdSchemaId)).isFalse(); + } + + @Test + public void testRollbackSchemaFailedWithSnapshotReference() throws Exception { + FileStoreTable table = createFileStoreTable(); + SchemaManager schemaManager = table.schemaManager(); + long firstSchemaId = schemaManager.latest().get().id(); + + // write data to create a snapshot referencing firstSchemaId + StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser); + write.write(rowData(0, 0, 0L)); + commit.commit(0, write.prepareCommit(false, 0)); + write.close(); + commit.close(); + + // evolve schema + schemaManager.commitChanges(SchemaChange.setOption("aa", "bb")); + + // write data to create a snapshot referencing secondSchemaId + table = table.copyWithLatestSchema(); + write = table.newWrite(commitUser); + commit = table.newCommit(commitUser); + write.write(rowData(1, 10, 100L)); + commit.commit(1, write.prepareCommit(false, 1)); + write.close(); + commit.close(); + + // rollback should fail because snapshot references secondSchemaId + FileStoreTable finalTable = table; + assertThatThrownBy(() -> finalTable.rollbackSchema(firstSchemaId)) + .hasMessageContaining("Cannot rollback to schema"); + } }