Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions docs/static/rest-catalog-open-api.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -669,6 +669,42 @@ paths:
$ref: '#/components/examples/TagNotExistError'
"500":
$ref: '#/components/responses/ServerErrorResponse'
/v1/{prefix}/databases/{database}/tables/{table}/rollback-schema:
post:
tags:
- table
summary: Rollback table schema
operationId: rollbackSchema
parameters:
- name: prefix
in: path
required: true
schema:
type: string
- name: database
in: path
required: true
schema:
type: string
- name: table
in: path
required: true
schema:
type: string
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/RollbackSchemaRequest'
responses:
"200":
description: Success, no content
"401":
$ref: '#/components/responses/UnauthorizedErrorResponse'
"404":
$ref: '#/components/responses/TableNotExistErrorResponse'
"500":
$ref: '#/components/responses/ServerErrorResponse'
/v1/{prefix}/databases/{database}/tables/{table}/token:
get:
tags:
Expand Down Expand Up @@ -2936,6 +2972,14 @@ components:
type: integer
format: int64
nullable: true
RollbackSchemaRequest:
type: object
required:
- schemaId
properties:
schemaId:
type: integer
format: int64
Instant:
anyOf:
- $ref: '#/components/schemas/SnapshotInstant'
Expand Down
19 changes: 19 additions & 0 deletions paimon-api/src/main/java/org/apache/paimon/rest/RESTApi.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.paimon.rest.requests.RegisterTableRequest;
import org.apache.paimon.rest.requests.RenameTableRequest;
import org.apache.paimon.rest.requests.ResetConsumerRequest;
import org.apache.paimon.rest.requests.RollbackSchemaRequest;
import org.apache.paimon.rest.requests.RollbackTableRequest;
import org.apache.paimon.rest.responses.AlterDatabaseResponse;
import org.apache.paimon.rest.responses.AuthTableQueryResponse;
Expand Down Expand Up @@ -711,6 +712,24 @@ public void rollbackTo(Identifier identifier, Instant instant, @Nullable Long fr
restAuthFunction);
}

/**
* Rollback schema for table.
*
* @param identifier database name and table name.
* @param schemaId the target schema version to rollback to
* @throws NoSuchResourceException Exception thrown on HTTP 404 means the table not exists
* @throws ForbiddenException Exception thrown on HTTP 403 means don't have the permission for
* this table
*/
public void rollbackSchema(Identifier identifier, long schemaId) {
RollbackSchemaRequest request = new RollbackSchemaRequest(schemaId);
client.post(
resourcePaths.rollbackSchemaTable(
identifier.getDatabaseName(), identifier.getObjectName()),
request,
restAuthFunction);
}

/**
* Create table.
*
Expand Down
11 changes: 11 additions & 0 deletions paimon-api/src/main/java/org/apache/paimon/rest/ResourcePaths.java
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,17 @@ public String rollbackTable(String databaseName, String objectName) {
ROLLBACK);
}

public String rollbackSchemaTable(String databaseName, String objectName) {
return SLASH.join(
V1,
prefix,
DATABASES,
encodeString(databaseName),
TABLES,
encodeString(objectName),
"rollback-schema");
}

public String registerTable(String databaseName) {
return SLASH.join(V1, prefix, DATABASES, encodeString(databaseName), REGISTER);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.rest.requests;

import org.apache.paimon.rest.RESTRequest;

import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;

/** Request for rollback table schema. */
@JsonIgnoreProperties(ignoreUnknown = true)
public class RollbackSchemaRequest implements RESTRequest {

private static final String FIELD_SCHEMA_ID = "schemaId";

@JsonProperty(FIELD_SCHEMA_ID)
private final long schemaId;

@JsonCreator
public RollbackSchemaRequest(@JsonProperty(FIELD_SCHEMA_ID) long schemaId) {
this.schemaId = schemaId;
}

@JsonGetter(FIELD_SCHEMA_ID)
public long getSchemaId() {
return schemaId;
}
}
16 changes: 16 additions & 0 deletions paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -809,6 +809,22 @@ default void rollbackTo(Identifier identifier, Instant instant)
void rollbackTo(Identifier identifier, Instant instant, @Nullable Long fromSnapshot)
throws Catalog.TableNotExistException;

/**
* Rollback table schema to a specific schema version. All schema versions greater than the
* target will be deleted. This operation will fail if any snapshot, tag, or changelog
* references a schema version greater than the target.
*
* @param identifier path of the table
* @param schemaId the target schema version to rollback to
* @throws Catalog.TableNotExistException if the table does not exist
* @throws UnsupportedOperationException if the catalog does not {@link
* #supportsVersionManagement()}
*/
default void rollbackSchema(Identifier identifier, long schemaId)
throws Catalog.TableNotExistException {
throw new UnsupportedOperationException();
}

/**
* Create a new branch for this table. By default, an empty branch will be created using the
* latest schema. If you provide {@code #fromTag}, a branch will be created from the tag and the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,12 @@ public void rollbackTo(Identifier identifier, Instant instant, @Nullable Long fr
wrapped.rollbackTo(identifier, instant, fromSnapshot);
}

@Override
public void rollbackSchema(Identifier identifier, long schemaId)
throws Catalog.TableNotExistException {
wrapped.rollbackSchema(identifier, schemaId);
}

@Override
public void createBranch(Identifier identifier, String branch, @Nullable String fromTag)
throws TableNotExistException, BranchAlreadyExistException, TagNotExistException {
Expand Down
12 changes: 12 additions & 0 deletions paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,18 @@ public void rollbackTo(Identifier identifier, Instant instant, @Nullable Long fr
}
}

@Override
public void rollbackSchema(Identifier identifier, long schemaId)
throws Catalog.TableNotExistException {
try {
api.rollbackSchema(identifier, schemaId);
} catch (NoSuchResourceException e) {
throw new TableNotExistException(identifier);
} catch (ForbiddenException e) {
throw new TableNoPermissionException(identifier, e);
}
}

private TableMetadata loadTableMetadata(Identifier identifier) throws TableNotExistException {
// if the table is system table, we need to load table metadata from the system table's data
// table
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,12 @@
import org.apache.paimon.types.ReassignFieldId;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.BranchManager;
import org.apache.paimon.utils.ChangelogManager;
import org.apache.paimon.utils.LazyField;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.StringUtils;
import org.apache.paimon.utils.TagManager;

import org.apache.paimon.shade.guava30.com.google.common.collect.FluentIterable;
import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
Expand All @@ -66,10 +68,12 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.function.Function;
Expand Down Expand Up @@ -1099,6 +1103,56 @@ public void deleteSchema(long schemaId) {
fileIO.deleteQuietly(toSchemaPath(schemaId));
}

/**
* Rollback to a specific schema version. All schema versions greater than the target will be
* deleted. This operation will fail if any snapshot, tag, or changelog references a schema
* version greater than the target.
*
* @param targetSchemaId the schema version to rollback to.
* @param snapshotManager the snapshot manager to check snapshot references.
* @param tagManager the tag manager to check tag references.
* @param changelogManager the changelog manager to check changelog references.
*/
public void rollbackTo(
long targetSchemaId,
SnapshotManager snapshotManager,
TagManager tagManager,
ChangelogManager changelogManager)
throws IOException {
checkArgument(schemaExists(targetSchemaId), "Schema %s does not exist.", targetSchemaId);

// Collect all schemaIds referenced by snapshots, tags, and changelogs
Set<Long> usedSchemaIds = new HashSet<>();

snapshotManager.pickOrLatest(
snapshot -> {
usedSchemaIds.add(snapshot.schemaId());
return false;
});
tagManager.taggedSnapshots().forEach(s -> usedSchemaIds.add(s.schemaId()));
changelogManager.changelogs().forEachRemaining(c -> usedSchemaIds.add(c.schemaId()));

// Check if any referenced schema is newer than the target
Optional<Long> conflict =
usedSchemaIds.stream().filter(id -> id > targetSchemaId).min(Long::compareTo);
if (conflict.isPresent()) {
throw new RuntimeException(
String.format(
"Cannot rollback to schema %d, schema %d is still referenced by snapshots/tags/changelogs.",
targetSchemaId, conflict.get()));
}

// Delete all schemas newer than the target
List<Long> toBeDeleted =
listAllIds().stream()
.filter(id -> id > targetSchemaId)
.collect(Collectors.toList());
toBeDeleted.sort((o1, o2) -> Long.compare(o2, o1));
for (Long id : toBeDeleted) {
fileIO.delete(toSchemaPath(id), false);
}
}

public static void checkAlterTableOption(
Map<String, String> options, String key, @Nullable String oldValue, String newValue) {
if (CoreOptions.IMMUTABLE_OPTIONS.contains(key)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@
import javax.annotation.Nullable;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -558,6 +560,20 @@ public void rollbackTo(String tagName) {
}
}

@Override
public void rollbackSchema(long schemaId) {
try {
schemaManager()
.rollbackTo(
schemaId,
snapshotManager(),
tagManager(),
new ChangelogManager(fileIO(), location(), null));
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

public Snapshot findSnapshot(long fromSnapshotId) throws SnapshotNotExistException {
SnapshotManager snapshotManager = snapshotManager();
Snapshot snapshot = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,11 @@ public void rollbackTo(String tagName) {
wrapped.rollbackTo(tagName);
}

@Override
public void rollbackSchema(long schemaId) {
wrapped.rollbackSchema(schemaId);
}

@Override
public void createBranch(String branchName) {
wrapped.createBranch(branchName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,11 @@ default void rollbackTo(String tagName) {
throw new UnsupportedOperationException();
}

@Override
default void rollbackSchema(long schemaId) {
throw new UnsupportedOperationException();
}

@Override
default void createBranch(String branchName) {
throw new UnsupportedOperationException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,14 @@ default void rollbackTo(String tagName) {
this.getClass().getSimpleName()));
}

@Override
default void rollbackSchema(long schemaId) {
throw new UnsupportedOperationException(
String.format(
"Readonly Table %s does not support rollbackSchema.",
this.getClass().getSimpleName()));
}

@Override
default void createBranch(String branchName) {
throw new UnsupportedOperationException(
Expand Down
4 changes: 4 additions & 0 deletions paimon-core/src/main/java/org/apache/paimon/table/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,10 @@ default void deleteTags(String tagStr) {
@Experimental
void rollbackTo(String tagName);

/** Rollback table's schema to a specific schema version. */
@Experimental
void rollbackSchema(long schemaId);

/** Create an empty branch. */
@Experimental
void createBranch(String branchName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,11 +272,12 @@ public boolean earliestFileNotExists() {
}

for (long snapshotId = latestId; snapshotId >= earliestId; snapshotId--) {
if (snapshotExists(snapshotId)) {
Snapshot snapshot = snapshot(snapshotId);
try {
Snapshot snapshot = tryGetSnapshot(snapshotId);
if (predicate.test(snapshot)) {
return snapshot.id();
}
} catch (FileNotFoundException ignored) {
}
}

Expand Down
Loading
Loading