diff --git a/docs/en/sql-reference/table-functions/iceberg.md b/docs/en/sql-reference/table-functions/iceberg.md index 9fd78c5e17d7..3a159cfb7b81 100644 --- a/docs/en/sql-reference/table-functions/iceberg.md +++ b/docs/en/sql-reference/table-functions/iceberg.md @@ -420,7 +420,7 @@ y: 993 ### Schema evolution {#iceberg-writes-schema-evolution} -ClickHouse allows you to add, drop, or modify columns with simple types (non-tuple, non-array, non-map). +ClickHouse allows you to add, drop, modify, or rename columns with simple types (non-tuple, non-array, non-map). ### Example {#example-iceberg-writes-evolution} @@ -479,6 +479,27 @@ Row 1: ────── x: Ivanov y: 993 + +ALTER TABLE iceberg_writes_example RENAME COLUMN y TO value; +SHOW CREATE TABLE iceberg_writes_example; + + ┌─statement─────────────────────────────────────────────────┐ +1. │ CREATE TABLE default.iceberg_writes_example ↴│ + │↳( ↴│ + │↳ `x` Nullable(String), ↴│ + │↳ `value` Nullable(Int64) ↴│ + │↳) ↴│ + │↳ENGINE = IcebergLocal('/home/scanhex12/iceberg_example/') │ + └───────────────────────────────────────────────────────────┘ + +SELECT * +FROM iceberg_writes_example +FORMAT VERTICAL; + +Row 1: +────── +x: Ivanov +value: 993 ``` ### Compaction {#iceberg-writes-compaction} diff --git a/src/Databases/DataLake/RestCatalog.cpp b/src/Databases/DataLake/RestCatalog.cpp index fb7031e3d052..f30d97c599ff 100644 --- a/src/Databases/DataLake/RestCatalog.cpp +++ b/src/Databases/DataLake/RestCatalog.cpp @@ -29,6 +29,7 @@ #include #include +#include #include #include #include @@ -42,6 +43,8 @@ #include #include +#include + namespace DB::ErrorCodes { @@ -116,6 +119,187 @@ String encodeNamespaceForURI(const String & namespace_name) } +namespace +{ +Poco::JSON::Object::Ptr cloneJsonObject(const Poco::JSON::Object::Ptr & obj) +{ + std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM + obj->stringify(oss); + + Poco::JSON::Parser parser; + return parser.parse(oss.str()).extract(); +} +} + +UpdateMetadataRequestBodyResult buildUpdateMetadataRequestBody( + const String & namespace_name, const String & table_name, Poco::JSON::Object::Ptr new_snapshot) +{ + UpdateMetadataRequestBodyResult result; + + if (!new_snapshot) // If new_snapshot is nullptr, return Skip + { + result.status = UpdateMetadataRequestBodyResult::Status::Skip; + return result; + } + + Poco::JSON::Object::Ptr request_body = new Poco::JSON::Object; + { + Poco::JSON::Object::Ptr identifier = new Poco::JSON::Object; + identifier->set("name", table_name); + Poco::JSON::Array::Ptr namespaces = new Poco::JSON::Array; + namespaces->add(namespace_name); + identifier->set("namespace", namespaces); + + request_body->set("identifier", identifier); + } + + // If the metadata has a schemas field, we need to update the schema + if (new_snapshot->has(DB::Iceberg::f_schemas)) + { + if (!new_snapshot->has(DB::Iceberg::f_current_schema_id)) + { + result.status = UpdateMetadataRequestBodyResult::Status::Error; + return result; + } + + // Extract the new schema id and the old schema id + const Int32 new_schema_id = new_snapshot->getValue(DB::Iceberg::f_current_schema_id); + const Int32 old_schema_id = new_schema_id - 1; + + // schemas is a JSON array of schema objects, we need to find the schema object with the new schema id + // "schemas" : [ + // { + // "fields" : [ + // { + // "name" : "id", + // "type" : "int", + // "id" : 1 + // } + // ], + // "schema-id" : 0, + // "type" : "struct" + // }, + // ... 
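+        //   {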
+ // "fields" : [ + // { + // "name" : "id2", // id renamed from id to id2 + // "type" : "int", + // "id" : 1 + // } + // ], + // "schema-id" : 1 // new_schema_id, + // "type" : "struct" + // }, + + // Find the schema object with the new schema id + Poco::JSON::Object::Ptr new_schema_obj; + auto schemas = new_snapshot->getArray(DB::Iceberg::f_schemas); + for (UInt32 i = 0; i < schemas->size(); ++i) + { + auto s = schemas->getObject(i); + if (s->getValue(DB::Iceberg::f_schema_id) == new_schema_id) + { + new_schema_obj = s; + break; + } + } + // if we don't find the schema object with the new schema id, return an error + if (!new_schema_obj) + { + result.status = UpdateMetadataRequestBodyResult::Status::Error; + return result; + } + + Poco::JSON::Object::Ptr schema_for_rest = cloneJsonObject(new_schema_obj); + { + Poco::JSON::Array::Ptr identifier_fields = new Poco::JSON::Array; + schema_for_rest->set("identifier-field-ids", identifier_fields); + } + + if (old_schema_id >= 0) + { + Poco::JSON::Object::Ptr requirement = new Poco::JSON::Object; + requirement->set("type", "assert-current-schema-id"); + requirement->set("current-schema-id", old_schema_id); + + Poco::JSON::Array::Ptr requirements = new Poco::JSON::Array; + requirements->add(requirement); + request_body->set("requirements", requirements); + } + + Poco::JSON::Array::Ptr updates = new Poco::JSON::Array; + { + Poco::JSON::Object::Ptr add_schema = new Poco::JSON::Object; + add_schema->set("action", "add-schema"); + add_schema->set("schema", schema_for_rest); + if (new_snapshot->has(DB::Iceberg::f_last_column_id)) + add_schema->set("last-column-id", new_snapshot->getValue(DB::Iceberg::f_last_column_id)); + updates->add(add_schema); + } + { + Poco::JSON::Object::Ptr set_current_schema = new Poco::JSON::Object; + set_current_schema->set("action", "set-current-schema"); + set_current_schema->set("schema-id", new_schema_id); + updates->add(set_current_schema); + } + request_body->set("updates", updates); + } + else + { + // If the metadata has a parent-snapshot-id field, we need to update the parent-snapshot-id + // "parent-snapshot-id" : 1, + // "snapshot-id" : 2, + // "timestamp-ms" : 1717334400000, + // "schema-id" : 1, + // "operation" : "replace", + // "summary" : "replace snapshot 1 with snapshot 2" + // } + if (new_snapshot->has("parent-snapshot-id")) + { + auto parent_snapshot_id = new_snapshot->getValue("parent-snapshot-id"); + if (parent_snapshot_id != -1) + { + Poco::JSON::Object::Ptr requirement = new Poco::JSON::Object; + requirement->set("type", "assert-ref-snapshot-id"); + requirement->set("ref", "main"); + requirement->set("snapshot-id", parent_snapshot_id); + + Poco::JSON::Array::Ptr requirements = new Poco::JSON::Array; + requirements->add(requirement); + + request_body->set("requirements", requirements); + } + } + + // If the metadata has a snapshot-id field, we need to update the snapshot-id + { + Poco::JSON::Array::Ptr updates = new Poco::JSON::Array; + + { + Poco::JSON::Object::Ptr add_snapshot = new Poco::JSON::Object; + add_snapshot->set("action", "add-snapshot"); + add_snapshot->set("snapshot", new_snapshot); + updates->add(add_snapshot); + } + + { + Poco::JSON::Object::Ptr set_snapshot = new Poco::JSON::Object; + set_snapshot->set("action", "set-snapshot-ref"); + set_snapshot->set("ref-name", "main"); + set_snapshot->set("type", "branch"); + set_snapshot->set("snapshot-id", new_snapshot->getValue("snapshot-id")); + + updates->add(set_snapshot); + } + request_body->set("updates", updates); + } + } + + result.status 
= UpdateMetadataRequestBodyResult::Status::Ok; + result.request_body = request_body; + return result; +} + std::string RestCatalog::Config::toString() const { DB::WriteBufferFromOwnString wb; @@ -1294,59 +1478,15 @@ bool RestCatalog::updateMetadata(const String & namespace_name, const String & t { const std::string endpoint = fmt::format("{}/namespaces/{}/tables/{}", base_url, namespace_name, table_name); - Poco::JSON::Object::Ptr request_body = new Poco::JSON::Object; - { - Poco::JSON::Object::Ptr identifier = new Poco::JSON::Object; - identifier->set("name", table_name); - Poco::JSON::Array::Ptr namespaces = new Poco::JSON::Array; - namespaces->add(namespace_name); - identifier->set("namespace", namespaces); - - request_body->set("identifier", identifier); - } - - if (new_snapshot->has("parent-snapshot-id")) - { - auto parent_snapshot_id = new_snapshot->getValue("parent-snapshot-id"); - if (parent_snapshot_id != -1) - { - Poco::JSON::Object::Ptr requirement = new Poco::JSON::Object; - requirement->set("type", "assert-ref-snapshot-id"); - requirement->set("ref", "main"); - requirement->set("snapshot-id", parent_snapshot_id); - - Poco::JSON::Array::Ptr requirements = new Poco::JSON::Array; - requirements->add(requirement); - - request_body->set("requirements", requirements); - } - } - - { - Poco::JSON::Array::Ptr updates = new Poco::JSON::Array; - - { - Poco::JSON::Object::Ptr add_snapshot = new Poco::JSON::Object; - add_snapshot->set("action", "add-snapshot"); - add_snapshot->set("snapshot", new_snapshot); - updates->add(add_snapshot); - } - - { - Poco::JSON::Object::Ptr set_snapshot = new Poco::JSON::Object; - set_snapshot->set("action", "set-snapshot-ref"); - set_snapshot->set("ref-name", "main"); - set_snapshot->set("type", "branch"); - set_snapshot->set("snapshot-id", new_snapshot->getValue("snapshot-id")); - - updates->add(set_snapshot); - } - request_body->set("updates", updates); - } + const auto built = buildUpdateMetadataRequestBody(namespace_name, table_name, new_snapshot); + if (built.status == UpdateMetadataRequestBodyResult::Status::Skip) + return true; + if (built.status == UpdateMetadataRequestBodyResult::Status::Error) + return false; try { - sendRequest(endpoint, request_body); + sendRequest(endpoint, built.request_body); } catch (const DB::HTTPException &) { diff --git a/src/Databases/DataLake/RestCatalog.h b/src/Databases/DataLake/RestCatalog.h index 17170436898d..11f3e27d6ff7 100644 --- a/src/Databases/DataLake/RestCatalog.h +++ b/src/Databases/DataLake/RestCatalog.h @@ -32,6 +32,28 @@ struct AccessToken } }; +/// Result of building the Iceberg REST catalog request body for `RestCatalog::updateMetadata`. +struct UpdateMetadataRequestBodyResult +{ + enum class Status + { + /// `new_snapshot` is null; caller should not send HTTP and return true. + Skip, + /// `request_body` is valid; caller should `sendRequest`. + Ok, + /// Validation failed; caller should return false. + Error, + }; + Status status = Status::Error; + Poco::JSON::Object::Ptr request_body; +}; + +/// Builds the JSON body for `POST .../namespaces/{ns}/tables/{table}` (Iceberg REST update). 
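+/// If `new_snapshot` contains a "schemas" field, the body carries add-schema/set-current-schema updates;
+/// otherwise it carries add-snapshot/set-snapshot-ref updates.
+/// Exposed in this header so unit tests can exercise the body building without a live catalog.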
+UpdateMetadataRequestBodyResult buildUpdateMetadataRequestBody( + const String & namespace_name, + const String & table_name, + Poco::JSON::Object::Ptr new_snapshot); + class RestCatalog : public ICatalog, public DB::WithContext { public: diff --git a/src/Databases/DataLake/tests/gtest_rest_catalog_update_metadata.cpp b/src/Databases/DataLake/tests/gtest_rest_catalog_update_metadata.cpp new file mode 100644 index 000000000000..f5db29353192 --- /dev/null +++ b/src/Databases/DataLake/tests/gtest_rest_catalog_update_metadata.cpp @@ -0,0 +1,170 @@ +#include "config.h" + +#if USE_AVRO + +#include +#include +#include +#include +#include +#include + +using namespace DB; + +namespace +{ +Poco::JSON::Object::Ptr findUpdateByAction(const Poco::JSON::Array::Ptr & updates, const std::string & action) +{ + for (unsigned int i = 0; i < updates->size(); ++i) + { + auto o = updates->getObject(i); + if (o->getValue("action") == action) + return o; + } + return nullptr; +} +} + +TEST(RestCatalogUpdateMetadataBody, NullSnapshotSkips) +{ + auto r = DataLake::buildUpdateMetadataRequestBody("ns", "t", nullptr); + EXPECT_EQ(r.status, DataLake::UpdateMetadataRequestBodyResult::Status::Skip); + EXPECT_FALSE(r.request_body); +} + +TEST(RestCatalogUpdateMetadataBody, SchemaUpdateValid) +{ + Poco::JSON::Object::Ptr snapshot = new Poco::JSON::Object; + Poco::JSON::Array::Ptr schemas = new Poco::JSON::Array; + Poco::JSON::Object::Ptr schema = new Poco::JSON::Object; + schema->set(Iceberg::f_schema_id, 1); + schema->set(Iceberg::f_type, "struct"); + schema->set(Iceberg::f_fields, Poco::JSON::Array::Ptr(new Poco::JSON::Array)); + schemas->add(schema); + snapshot->set(Iceberg::f_schemas, schemas); + snapshot->set(Iceberg::f_current_schema_id, 1); + snapshot->set(Iceberg::f_last_column_id, 3); + + auto r = DataLake::buildUpdateMetadataRequestBody("my.ns", "tbl", snapshot); + ASSERT_EQ(r.status, DataLake::UpdateMetadataRequestBodyResult::Status::Ok); + ASSERT_TRUE(r.request_body); + + auto id = r.request_body->getObject("identifier"); + EXPECT_EQ(id->getValue("name"), "tbl"); + auto ns = id->getArray("namespace"); + ASSERT_EQ(ns->size(), 1u); + EXPECT_EQ(ns->getElement(0), "my.ns"); + + ASSERT_TRUE(r.request_body->has("requirements")); + auto req = r.request_body->getArray("requirements")->getObject(0); + EXPECT_EQ(req->getValue("type"), "assert-current-schema-id"); + EXPECT_EQ(req->getValue("current-schema-id"), 0); + + auto updates = r.request_body->getArray("updates"); + auto add_schema = findUpdateByAction(updates, "add-schema"); + ASSERT_TRUE(add_schema); + EXPECT_TRUE(add_schema->has("schema")); + EXPECT_EQ(add_schema->getValue("last-column-id"), 3); + + auto set_schema = findUpdateByAction(updates, "set-current-schema"); + ASSERT_TRUE(set_schema); + EXPECT_EQ(set_schema->getValue("schema-id"), 1); +} + +TEST(RestCatalogUpdateMetadataBody, SchemaUpdateCurrentIdZeroNoRequirement) +{ + Poco::JSON::Object::Ptr snapshot = new Poco::JSON::Object; + Poco::JSON::Array::Ptr schemas = new Poco::JSON::Array; + Poco::JSON::Object::Ptr schema = new Poco::JSON::Object; + schema->set(Iceberg::f_schema_id, 0); + schema->set(Iceberg::f_type, "struct"); + schema->set(Iceberg::f_fields, Poco::JSON::Array::Ptr(new Poco::JSON::Array)); + schemas->add(schema); + snapshot->set(Iceberg::f_schemas, schemas); + snapshot->set(Iceberg::f_current_schema_id, 0); + + auto r = DataLake::buildUpdateMetadataRequestBody("ns", "t", snapshot); + ASSERT_EQ(r.status, DataLake::UpdateMetadataRequestBodyResult::Status::Ok); + 
EXPECT_FALSE(r.request_body->has("requirements")); +} + +TEST(RestCatalogUpdateMetadataBody, SchemaUpdateMissingCurrentSchemaId) +{ + Poco::JSON::Object::Ptr snapshot = new Poco::JSON::Object; + snapshot->set(Iceberg::f_schemas, Poco::JSON::Array::Ptr(new Poco::JSON::Array)); + + auto r = DataLake::buildUpdateMetadataRequestBody("ns", "t", snapshot); + EXPECT_EQ(r.status, DataLake::UpdateMetadataRequestBodyResult::Status::Error); + EXPECT_FALSE(r.request_body); +} + +TEST(RestCatalogUpdateMetadataBody, SchemaUpdateNoMatchingSchemaId) +{ + Poco::JSON::Object::Ptr snapshot = new Poco::JSON::Object; + Poco::JSON::Array::Ptr schemas = new Poco::JSON::Array; + Poco::JSON::Object::Ptr schema = new Poco::JSON::Object; + schema->set(Iceberg::f_schema_id, 1); + schema->set(Iceberg::f_type, "struct"); + schema->set(Iceberg::f_fields, Poco::JSON::Array::Ptr(new Poco::JSON::Array)); + schemas->add(schema); + snapshot->set(Iceberg::f_schemas, schemas); + snapshot->set(Iceberg::f_current_schema_id, 99); + + auto r = DataLake::buildUpdateMetadataRequestBody("ns", "t", snapshot); + EXPECT_EQ(r.status, DataLake::UpdateMetadataRequestBodyResult::Status::Error); + EXPECT_FALSE(r.request_body); +} + +TEST(RestCatalogUpdateMetadataBody, SnapshotUpdateWithParent) +{ + Poco::JSON::Object::Ptr snapshot = new Poco::JSON::Object; + snapshot->set("snapshot-id", static_cast(12345)); + snapshot->set("parent-snapshot-id", static_cast(12344)); + snapshot->set(Iceberg::f_timestamp_ms, static_cast(1700000000000LL)); + + auto r = DataLake::buildUpdateMetadataRequestBody("ns", "t", snapshot); + ASSERT_EQ(r.status, DataLake::UpdateMetadataRequestBodyResult::Status::Ok); + ASSERT_TRUE(r.request_body); + + ASSERT_TRUE(r.request_body->has("requirements")); + auto req = r.request_body->getArray("requirements")->getObject(0); + EXPECT_EQ(req->getValue("type"), "assert-ref-snapshot-id"); + EXPECT_EQ(req->getValue("ref"), "main"); + EXPECT_EQ(req->getValue("snapshot-id"), 12344); + + auto updates = r.request_body->getArray("updates"); + auto add_snap = findUpdateByAction(updates, "add-snapshot"); + ASSERT_TRUE(add_snap); + EXPECT_EQ(add_snap->getObject("snapshot")->getValue("snapshot-id"), 12345); + + auto set_ref = findUpdateByAction(updates, "set-snapshot-ref"); + ASSERT_TRUE(set_ref); + EXPECT_EQ(set_ref->getValue("snapshot-id"), 12345); +} + +TEST(RestCatalogUpdateMetadataBody, SnapshotUpdateWithoutParent) +{ + Poco::JSON::Object::Ptr snapshot = new Poco::JSON::Object; + snapshot->set("snapshot-id", static_cast(999)); + + auto r = DataLake::buildUpdateMetadataRequestBody("ns", "t", snapshot); + ASSERT_EQ(r.status, DataLake::UpdateMetadataRequestBodyResult::Status::Ok); + EXPECT_FALSE(r.request_body->has("requirements")); + + auto updates = r.request_body->getArray("updates"); + ASSERT_TRUE(findUpdateByAction(updates, "add-snapshot")); + ASSERT_TRUE(findUpdateByAction(updates, "set-snapshot-ref")); +} + +TEST(RestCatalogUpdateMetadataBody, SnapshotUpdateParentMinusOneNoRequirement) +{ + Poco::JSON::Object::Ptr snapshot = new Poco::JSON::Object; + snapshot->set("snapshot-id", static_cast(1)); + snapshot->set("parent-snapshot-id", static_cast(-1)); + + auto r = DataLake::buildUpdateMetadataRequestBody("ns", "t", snapshot); + ASSERT_EQ(r.status, DataLake::UpdateMetadataRequestBodyResult::Status::Ok); + EXPECT_FALSE(r.request_body->has("requirements")); +} + +#endif diff --git a/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h b/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h index 93ffa305b89f..a7c66b5b7f53 
100644
--- a/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h
+++ b/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h
@@ -168,15 +168,15 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enabl
     void checkAlterIsPossible(const AlterCommands & commands) override
     {
-        assertInitializedDL();
-        current_metadata->checkAlterIsPossible(commands);
+        if (current_metadata)
+            current_metadata->checkAlterIsPossible(commands);
     }

-    void alter(const AlterCommands & params, ContextPtr context) override
+    void alter(const AlterCommands & params, ContextPtr context,
+        const StorageID & storage_id, std::shared_ptr catalog) override
     {
         assertInitializedDL();
-        current_metadata->alter(params, context);
-
+        current_metadata->alter(params, shared_from_this(), context, storage_id, catalog);
     }

     ObjectStoragePtr createObjectStorage(ContextPtr context, bool is_readonly) override
@@ -451,7 +451,6 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enabl
     void assertInitializedDL() const
     {
-        BaseStorageConfiguration::assertInitialized();
         if (!current_metadata)
             throw Exception(ErrorCodes::LOGICAL_ERROR, "Metadata is not initialized");
     }
@@ -675,7 +674,9 @@ class StorageIcebergConfiguration : public StorageObjectStorageConfiguration, pu
     void checkAlterIsPossible(const AlterCommands & commands) override { getImpl().checkAlterIsPossible(commands); }

-    void alter(const AlterCommands & params, ContextPtr context) override { getImpl().alter(params, context); }
+    void alter(const AlterCommands & params, ContextPtr context,
+        const StorageID & storage_id, std::shared_ptr catalog) override
+    { getImpl().alter(params, context, storage_id, catalog); }

     const DataLakeStorageSettings & getDataLakeSettings() const override { return getImpl().getDataLakeSettings(); }
diff --git a/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h b/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h
index f58bf8d04dcd..1a18cc7c2163 100644
--- a/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h
+++ b/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h
@@ -201,7 +201,12 @@ class IDataLakeMetadata : boost::noncopyable
     virtual void addDeleteTransformers(ObjectInfoPtr, QueryPipelineBuilder &, const std::optional &, FormatParserSharedResourcesPtr, ContextPtr) const { }

     virtual void checkAlterIsPossible(const AlterCommands & /*commands*/) { throwNotImplemented("alter"); }
-    virtual void alter(const AlterCommands & /*params*/, ContextPtr /*context*/) { throwNotImplemented("alter"); }
+    virtual void alter(
+        const AlterCommands & /*params*/,
+        StorageObjectStorageConfigurationPtr /*configuration*/,
+        ContextPtr /*context*/,
+        const StorageID & /*storage_id*/,
+        std::shared_ptr /*catalog*/) { throwNotImplemented("alter"); }

     virtual void drop(ContextPtr) { }

     virtual std::optional partitionKey(ContextPtr) const { return {}; }
diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp
index 36ac299adc4b..76aa32ee5a06 100644
--- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp
+++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp
@@ -717,12 +717,17 @@ void IcebergMetadata::checkAlterIsPossible(const AlterCommands & commands)
     for (const auto & command : commands)
    {
         if (command.type != AlterCommand::Type::ADD_COLUMN && command.type != AlterCommand::Type::DROP_COLUMN
-            && command.type != AlterCommand::Type::MODIFY_COLUMN)
+            && command.type != 
AlterCommand::Type::MODIFY_COLUMN && command.type != AlterCommand::Type::RENAME_COLUMN)
             throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Alter of type '{}' is not supported by Iceberg storage", command.type);
     }
 }

-void IcebergMetadata::alter(const AlterCommands & params, ContextPtr context)
+void IcebergMetadata::alter(
+    const AlterCommands & params,
+    StorageObjectStorageConfigurationPtr configuration,
+    ContextPtr context,
+    const StorageID & storage_id,
+    std::shared_ptr catalog)
 {
     if (!context->getSettingsRef()[Setting::allow_experimental_insert_into_iceberg].value)
     {
@@ -732,7 +737,11 @@ void IcebergMetadata::alter(const AlterCommands & params, ContextPtr context)
             "To allow its usage, enable setting allow_experimental_insert_into_iceberg");
     }

-    Iceberg::alter(params, context, object_storage, data_lake_settings, persistent_components, write_format);
+    Iceberg::alter(
+        params, context, object_storage, data_lake_settings, persistent_components, write_format,
+        storage_id, catalog,
+        configuration ? configuration->getTypeName() : "",
+        configuration ? configuration->getNamespace() : "");
 }

 void IcebergMetadata::createInitial(
diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h
index c672f36de014..6fadce65baf5 100644
--- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h
+++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h
@@ -129,7 +129,12 @@ class IcebergMetadata : public IDataLakeMetadata
     void modifyFormatSettings(FormatSettings & format_settings, const Context & local_context) const override;
     void addDeleteTransformers(ObjectInfoPtr object_info, QueryPipelineBuilder & builder, const std::optional & format_settings, FormatParserSharedResourcesPtr parser_shared_resources, ContextPtr local_context) const override;
     void checkAlterIsPossible(const AlterCommands & commands) override;
-    void alter(const AlterCommands & params, ContextPtr context) override;
+    void alter(
+        const AlterCommands & params,
+        StorageObjectStorageConfigurationPtr configuration,
+        ContextPtr context,
+        const StorageID & storage_id,
+        std::shared_ptr catalog) override;

     ObjectIterator iterate(
         const ActionsDAG * filter_dag,
diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/MetadataGenerator.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/MetadataGenerator.cpp
index 184c6a7f9359..6073c0a4bbb1 100644
--- a/src/Storages/ObjectStorage/DataLakes/Iceberg/MetadataGenerator.cpp
+++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/MetadataGenerator.cpp
@@ -351,6 +351,53 @@ void MetadataGenerator::generateModifyColumnMetadata(const String & column_name,
     metadata_object->getArray(Iceberg::f_schemas)->add(current_schema);
 }

+void MetadataGenerator::generateRenameColumnMetadata(const String & column_name, const String & new_column_name)
+{
+    auto current_schema_id = metadata_object->getValue(Iceberg::f_current_schema_id);
+
+    Poco::JSON::Object::Ptr current_schema;
+    auto schemas = metadata_object->getArray(Iceberg::f_schemas);
+    for (UInt32 i = 0; i < schemas->size(); ++i)
+    {
+        if (schemas->getObject(i)->getValue(Iceberg::f_schema_id) == current_schema_id)
+        {
+            current_schema = schemas->getObject(i);
+            break;
+        }
+    }
+
+    if (!current_schema)
+        throw Exception(ErrorCodes::BAD_ARGUMENTS, "Schema with id {} not found", current_schema_id);
+    current_schema = deepCopy(current_schema);
+
+    auto schema_fields = current_schema->getArray(Iceberg::f_fields);
+
+    for (UInt32 i = 0; i < schema_fields->size(); ++i)
+    
{ + if (schema_fields->getObject(i)->getValue(Iceberg::f_name) == new_column_name) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Column {} already exists", new_column_name); + } + + bool found = false; + for (UInt32 i = 0; i < schema_fields->size(); ++i) + { + auto current_field = schema_fields->getObject(i); + if (current_field->getValue(Iceberg::f_name) == column_name) + { + current_field->set(Iceberg::f_name, new_column_name); + found = true; + break; + } + } + + if (!found) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Not found column {}", column_name); + + metadata_object->set(Iceberg::f_current_schema_id, current_schema_id + 1); + current_schema->set(Iceberg::f_schema_id, current_schema_id + 1); + metadata_object->getArray(Iceberg::f_schemas)->add(current_schema); +} + } #endif diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/MetadataGenerator.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/MetadataGenerator.h index 035747dafa14..652d6009a37f 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/MetadataGenerator.h +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/MetadataGenerator.h @@ -36,6 +36,7 @@ class MetadataGenerator void generateAddColumnMetadata(const String & column_name, DataTypePtr type); void generateDropColumnMetadata(const String & column_name); void generateModifyColumnMetadata(const String & column_name, DataTypePtr type); + void generateRenameColumnMetadata(const String & column_name, const String & new_column_name); private: Poco::JSON::Object::Ptr metadata_object; diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/Mutations.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/Mutations.cpp index deeb05a49102..b58574b3d218 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/Mutations.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/Mutations.cpp @@ -677,13 +677,16 @@ void alter( ObjectStoragePtr object_storage, const DataLakeStorageSettings & data_lake_settings, PersistentTableComponents & persistent_table_components, - const String & write_format) + const String & write_format, + StorageID storage_id, + std::shared_ptr catalog, + const String & blob_storage_type_name, + const String & blob_storage_namespace_name) { if (params.size() != 1) - throw Exception(ErrorCodes::BAD_ARGUMENTS, "Params with size 1 is not supported"); + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Iceberg alter supports exactly one command at a time, got {}", params.size()); - size_t i = 0; - while (i++ < MAX_TRANSACTION_RETRIES) + for (size_t i = 0; i < MAX_TRANSACTION_RETRIES; ++i) { FileNamesGenerator filename_generator( persistent_table_components.table_path, persistent_table_components.table_path, false, CompressionMethod::None, write_format); @@ -717,6 +720,9 @@ void alter( case AlterCommand::Type::MODIFY_COLUMN: metadata_json_generator.generateModifyColumnMetadata(params[0].column_name, params[0].data_type); break; + case AlterCommand::Type::RENAME_COLUMN: + metadata_json_generator.generateRenameColumnMetadata(params[0].column_name, params[0].rename_to); + break; default: throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown type of alter {}", params[0].type); } @@ -727,6 +733,8 @@ void alter( auto [metadata_name, storage_metadata_name] = filename_generator.generateMetadataName(); + LOG_INFO(log, "Iceberg alter: writing metadata to '{}', latest version was {}", storage_metadata_name, last_version); + auto hint = filename_generator.generateVersionHint(); if (writeMetadataFileAndVersionHint( storage_metadata_name, @@ -737,11 +745,43 @@ void alter( context, 
compression_method, data_lake_settings[DataLakeStorageSetting::iceberg_use_version_hint])) - break; + { + if (catalog) + { + String catalog_filename = metadata_name; + if (!catalog_filename.starts_with(blob_storage_type_name)) + catalog_filename = blob_storage_type_name + "://" + blob_storage_namespace_name + "/" + metadata_name; + + const auto & [namespace_name, table_name] = DataLake::parseTableName(storage_id.getTableName()); + if (!catalog->updateMetadata(namespace_name, table_name, catalog_filename, metadata)) + { + LOG_WARNING(log, "Iceberg alter: catalog update failed for '{}'", catalog_filename); + continue; + } + } + return; + } + + bool file_exists = object_storage->exists(StoredObject(storage_metadata_name)); + LOG_WARNING(log, "Iceberg alter: failed to write metadata to '{}' (attempt {}, file exists: {})", + storage_metadata_name, i + 1, file_exists); + + if (file_exists && catalog) + { + String catalog_filename = metadata_name; + if (!catalog_filename.starts_with(blob_storage_type_name)) + catalog_filename = blob_storage_type_name + "://" + blob_storage_namespace_name + "/" + metadata_name; + + const auto & [namespace_name, table_name] = DataLake::parseTableName(storage_id.getTableName()); + if (catalog->updateMetadata(namespace_name, table_name, catalog_filename, metadata)) + { + LOG_INFO(log, "Iceberg alter: adopted existing metadata file '{}' and updated catalog", storage_metadata_name); + return; + } + } } - if (i == MAX_TRANSACTION_RETRIES) - throw Exception(ErrorCodes::LIMIT_EXCEEDED, "Too many unsuccessed retries to alter iceberg table"); + throw Exception(ErrorCodes::LIMIT_EXCEEDED, "Too many unsuccessful retries to alter iceberg table"); } #endif diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/Mutations.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/Mutations.h index c19282318319..8a7a0057c7ae 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/Mutations.h +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/Mutations.h @@ -38,7 +38,11 @@ void alter( ObjectStoragePtr object_storage, const DataLakeStorageSettings & data_lake_settings, PersistentTableComponents & persistent_table_components, - const String & write_format); + const String & write_format, + StorageID storage_id, + std::shared_ptr catalog, + const String & blob_storage_type_name, + const String & blob_storage_namespace_name); } #endif diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.cpp b/src/Storages/ObjectStorage/StorageObjectStorage.cpp index b7a3260bc791..02dcb59ad3df 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorage.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorage.cpp @@ -792,14 +792,17 @@ void StorageObjectStorage::checkMutationIsPossible(const MutationCommands & comm void StorageObjectStorage::alter(const AlterCommands & params, ContextPtr context, AlterLockHolder & /*alter_lock_holder*/) { + configuration->update(object_storage, context, /* if_not_updated_before */ true); + StorageInMemoryMetadata new_metadata = getInMemoryMetadata(); params.apply(new_metadata, context); - configuration->alter(params, context); + configuration->alter(params, context, getStorageID(), catalog); + + auto database = DatabaseCatalog::instance().getDatabase(storage_id.database_name); + if (!database->isDatalakeCatalog()) + database->alterTable(context, storage_id, new_metadata, /*validate_new_create_query=*/true); - DatabaseCatalog::instance() - .getDatabase(storage_id.database_name) - ->alterTable(context, storage_id, new_metadata, /*validate_new_create_query=*/true); 
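+    // Datalake-catalog tables keep their definition in the external catalog,
+    // so only the in-memory metadata is refreshed for them here.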
setInMemoryMetadata(new_metadata); } diff --git a/src/Storages/ObjectStorage/StorageObjectStorageConfiguration.h b/src/Storages/ObjectStorage/StorageObjectStorageConfiguration.h index 3f3129a9d3c4..175867021751 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageConfiguration.h +++ b/src/Storages/ObjectStorage/StorageObjectStorageConfiguration.h @@ -244,7 +244,8 @@ class StorageObjectStorageConfiguration } } - virtual void alter(const AlterCommands & /*params*/, ContextPtr /*context*/) {} + virtual void alter(const AlterCommands & /*params*/, ContextPtr /*context*/, + const StorageID & /*storage_id*/, std::shared_ptr /*catalog*/) {} virtual const DataLakeStorageSettings & getDataLakeSettings() const { diff --git a/tests/integration/test_storage_iceberg_no_spark/test_writes_add_column.py b/tests/integration/test_storage_iceberg_no_spark/test_writes_add_column.py new file mode 100644 index 000000000000..f9ce478a5033 --- /dev/null +++ b/tests/integration/test_storage_iceberg_no_spark/test_writes_add_column.py @@ -0,0 +1,72 @@ +import pytest + +from helpers.iceberg_utils import ( + create_iceberg_table, + get_uuid_str, +) + +INSERT_SETTINGS = {"allow_experimental_insert_into_iceberg": 1} + + +@pytest.mark.parametrize("format_version", [1, 2]) +@pytest.mark.parametrize("storage_type", ["local", "s3"]) +def test_add_column_basic(started_cluster_iceberg_no_spark, format_version, storage_type): + """ADD COLUMN (nullable): existing rows read with NULL in the new column; new inserts can set it.""" + instance = started_cluster_iceberg_no_spark.instances["node1"] + TABLE_NAME = "test_add_column_basic_" + storage_type + "_" + get_uuid_str() + + create_iceberg_table( + storage_type, + instance, + TABLE_NAME, + started_cluster_iceberg_no_spark, + "(id Int32, value Nullable(String))", + format_version, + ) + + instance.query(f"INSERT INTO {TABLE_NAME} VALUES (1, 'hello'), (2, 'world');", settings=INSERT_SETTINGS) + assert instance.query(f"SELECT id, value FROM {TABLE_NAME} ORDER BY id") == "1\thello\n2\tworld\n" + + instance.query(f"ALTER TABLE {TABLE_NAME} ADD COLUMN extra Nullable(Int32);", settings=INSERT_SETTINGS) + + assert instance.query(f"SELECT id, value, extra FROM {TABLE_NAME} ORDER BY id") == ( + "1\thello\t\\N\n2\tworld\t\\N\n" + ) + + instance.query(f"INSERT INTO {TABLE_NAME} VALUES (3, 'foo', 7);", settings=INSERT_SETTINGS) + assert instance.query(f"SELECT id, value, extra FROM {TABLE_NAME} ORDER BY id") == ( + "1\thello\t\\N\n2\tworld\t\\N\n3\tfoo\t7\n" + ) + + +@pytest.mark.parametrize("format_version", [1, 2]) +@pytest.mark.parametrize("storage_type", ["local", "s3"]) +def test_add_column_errors(started_cluster_iceberg_no_spark, format_version, storage_type): + """Non-nullable ADD COLUMN and duplicate name must fail; schema unchanged.""" + instance = started_cluster_iceberg_no_spark.instances["node1"] + TABLE_NAME = "test_add_column_errors_" + storage_type + "_" + get_uuid_str() + + create_iceberg_table( + storage_type, + instance, + TABLE_NAME, + started_cluster_iceberg_no_spark, + "(id Int32, value Nullable(String))", + format_version, + ) + + error = instance.query_and_get_error( + f"ALTER TABLE {TABLE_NAME} ADD COLUMN bad Int32;", + settings=INSERT_SETTINGS, + ) + assert "non-nullable" in error.lower() or "doesn't allow" in error.lower() + + error = instance.query_and_get_error( + f"ALTER TABLE {TABLE_NAME} ADD COLUMN value Nullable(Int32);", + settings=INSERT_SETTINGS, + ) + assert "DUPLICATE_COLUMN" in error or "already exists" in error + + assert instance.query( + f"SELECT 
name FROM system.columns WHERE database = currentDatabase() AND table = '{TABLE_NAME}' ORDER BY name" + ) == "id\nvalue\n" diff --git a/tests/integration/test_storage_iceberg_no_spark/test_writes_drop_column.py b/tests/integration/test_storage_iceberg_no_spark/test_writes_drop_column.py new file mode 100644 index 000000000000..2f5120001ef8 --- /dev/null +++ b/tests/integration/test_storage_iceberg_no_spark/test_writes_drop_column.py @@ -0,0 +1,62 @@ +import pytest + +from helpers.iceberg_utils import ( + create_iceberg_table, + get_uuid_str, +) + +INSERT_SETTINGS = {"allow_experimental_insert_into_iceberg": 1} + + +@pytest.mark.parametrize("format_version", [1, 2]) +@pytest.mark.parametrize("storage_type", ["local", "s3"]) +def test_drop_column_basic(started_cluster_iceberg_no_spark, format_version, storage_type): + """DROP COLUMN removes the column from reads and inserts; remaining columns unchanged.""" + instance = started_cluster_iceberg_no_spark.instances["node1"] + TABLE_NAME = "test_drop_column_basic_" + storage_type + "_" + get_uuid_str() + + create_iceberg_table( + storage_type, + instance, + TABLE_NAME, + started_cluster_iceberg_no_spark, + "(id Int32, value Nullable(String))", + format_version, + ) + + instance.query(f"INSERT INTO {TABLE_NAME} VALUES (1, 'hello'), (2, 'world');", settings=INSERT_SETTINGS) + assert instance.query(f"SELECT id, value FROM {TABLE_NAME} ORDER BY id") == "1\thello\n2\tworld\n" + + instance.query(f"ALTER TABLE {TABLE_NAME} DROP COLUMN value;", settings=INSERT_SETTINGS) + + assert instance.query(f"SELECT id FROM {TABLE_NAME} ORDER BY id") == "1\n2\n" + + instance.query(f"INSERT INTO {TABLE_NAME} VALUES (3);", settings=INSERT_SETTINGS) + assert instance.query(f"SELECT id FROM {TABLE_NAME} ORDER BY id") == "1\n2\n3\n" + + +@pytest.mark.parametrize("format_version", [1, 2]) +@pytest.mark.parametrize("storage_type", ["local", "s3"]) +def test_drop_column_errors(started_cluster_iceberg_no_spark, format_version, storage_type): + """Dropping a non-existent column must fail; table structure unchanged.""" + instance = started_cluster_iceberg_no_spark.instances["node1"] + TABLE_NAME = "test_drop_column_errors_" + storage_type + "_" + get_uuid_str() + + create_iceberg_table( + storage_type, + instance, + TABLE_NAME, + started_cluster_iceberg_no_spark, + "(id Int32, value Nullable(String))", + format_version, + ) + + error = instance.query_and_get_error( + f"ALTER TABLE {TABLE_NAME} DROP COLUMN nonexistent;", + settings=INSERT_SETTINGS, + ) + assert "nonexistent" in error + + assert instance.query( + f"SELECT name FROM system.columns WHERE database = currentDatabase() AND table = '{TABLE_NAME}' ORDER BY name" + ) == "id\nvalue\n" diff --git a/tests/integration/test_storage_iceberg_no_spark/test_writes_modify_column.py b/tests/integration/test_storage_iceberg_no_spark/test_writes_modify_column.py new file mode 100644 index 000000000000..51a17b2fae56 --- /dev/null +++ b/tests/integration/test_storage_iceberg_no_spark/test_writes_modify_column.py @@ -0,0 +1,71 @@ +import pytest + +from helpers.iceberg_utils import ( + create_iceberg_table, + get_uuid_str, +) + +INSERT_SETTINGS = {"allow_experimental_insert_into_iceberg": 1} + + +@pytest.mark.parametrize("format_version", [1, 2]) +@pytest.mark.parametrize("storage_type", ["local", "s3"]) +def test_modify_column_basic(started_cluster_iceberg_no_spark, format_version, storage_type): + """Widen Int32 to Int64 (Iceberg int→long); existing and new rows read correctly.""" + instance = 
started_cluster_iceberg_no_spark.instances["node1"] + TABLE_NAME = "test_modify_column_basic_" + storage_type + "_" + get_uuid_str() + + create_iceberg_table( + storage_type, + instance, + TABLE_NAME, + started_cluster_iceberg_no_spark, + "(id Int32, value Nullable(String))", + format_version, + ) + + instance.query(f"INSERT INTO {TABLE_NAME} VALUES (1, 'hello'), (2, 'world');", settings=INSERT_SETTINGS) + assert instance.query(f"SELECT id, value FROM {TABLE_NAME} ORDER BY id") == "1\thello\n2\tworld\n" + + instance.query(f"ALTER TABLE {TABLE_NAME} MODIFY COLUMN id Int64;", settings=INSERT_SETTINGS) + + assert instance.query(f"SELECT id, value FROM {TABLE_NAME} ORDER BY id") == "1\thello\n2\tworld\n" + + instance.query(f"INSERT INTO {TABLE_NAME} VALUES (3, 'foo');", settings=INSERT_SETTINGS) + assert instance.query(f"SELECT id, value FROM {TABLE_NAME} ORDER BY id") == "1\thello\n2\tworld\n3\tfoo\n" + + +@pytest.mark.parametrize("format_version", [1, 2]) +@pytest.mark.parametrize("storage_type", ["local", "s3"]) +def test_modify_column_errors(started_cluster_iceberg_no_spark, format_version, storage_type): + """Invalid schema evolution (e.g. String→Int64) must fail; columns unchanged.""" + instance = started_cluster_iceberg_no_spark.instances["node1"] + TABLE_NAME = "test_modify_column_errors_" + storage_type + "_" + get_uuid_str() + + create_iceberg_table( + storage_type, + instance, + TABLE_NAME, + started_cluster_iceberg_no_spark, + "(id Int32, value Nullable(String))", + format_version, + ) + + error = instance.query_and_get_error( + f"ALTER TABLE {TABLE_NAME} MODIFY COLUMN value Int64;", + settings=INSERT_SETTINGS, + ) + el = error.lower() + # String→integer: mismatched Poco::Var kinds in checkValidSchemaEvolution → BadCastException + # (not always the explicit Iceberg "schema evolution" BAD_ARGUMENTS text). 
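+    # Accept several phrasings so the test stays stable across error-message changes.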
+ assert ( + "bad cast" in el + or "can not convert" in el + or "cannot convert" in el + or "schema evolution" in el + or "doesn't allow" in el + ) + + assert instance.query( + f"SELECT name FROM system.columns WHERE database = currentDatabase() AND table = '{TABLE_NAME}' ORDER BY name" + ) == "id\nvalue\n" diff --git a/tests/integration/test_storage_iceberg_no_spark/test_writes_rename_column.py b/tests/integration/test_storage_iceberg_no_spark/test_writes_rename_column.py new file mode 100644 index 000000000000..9d9f4220660a --- /dev/null +++ b/tests/integration/test_storage_iceberg_no_spark/test_writes_rename_column.py @@ -0,0 +1,74 @@ +import pytest + +from helpers.iceberg_utils import ( + create_iceberg_table, + get_uuid_str, +) + +INSERT_SETTINGS = {"allow_experimental_insert_into_iceberg": 1} + + +@pytest.mark.parametrize("format_version", [1, 2]) +@pytest.mark.parametrize("storage_type", ["local", "s3"]) +def test_rename_column_basic(started_cluster_iceberg_no_spark, format_version, storage_type): + """Rename a column: existing rows are readable under the new name, and new inserts work.""" + instance = started_cluster_iceberg_no_spark.instances["node1"] + TABLE_NAME = "test_rename_column_basic_" + storage_type + "_" + get_uuid_str() + + create_iceberg_table( + storage_type, + instance, + TABLE_NAME, + started_cluster_iceberg_no_spark, + "(id Int32, value Nullable(String))", + format_version, + ) + + instance.query(f"INSERT INTO {TABLE_NAME} VALUES (1, 'hello'), (2, 'world');", settings=INSERT_SETTINGS) + assert instance.query(f"SELECT id, value FROM {TABLE_NAME} ORDER BY id") == "1\thello\n2\tworld\n" + + instance.query(f"ALTER TABLE {TABLE_NAME} RENAME COLUMN value TO label;", settings=INSERT_SETTINGS) + + # existing rows readable under the new name + assert instance.query(f"SELECT id, label FROM {TABLE_NAME} ORDER BY id") == "1\thello\n2\tworld\n" + + # new inserts work under the new name + instance.query(f"INSERT INTO {TABLE_NAME} VALUES (3, 'foo');", settings=INSERT_SETTINGS) + assert instance.query(f"SELECT id, label FROM {TABLE_NAME} ORDER BY id") == "1\thello\n2\tworld\n3\tfoo\n" + + +@pytest.mark.parametrize("format_version", [1, 2]) +@pytest.mark.parametrize("storage_type", ["local", "s3"]) +def test_rename_column_errors(started_cluster_iceberg_no_spark, format_version, storage_type): + """Renaming a non-existent column or renaming to an already-used name must raise BAD_ARGUMENTS.""" + instance = started_cluster_iceberg_no_spark.instances["node1"] + TABLE_NAME = "test_rename_column_errors_" + storage_type + "_" + get_uuid_str() + + create_iceberg_table( + storage_type, + instance, + TABLE_NAME, + started_cluster_iceberg_no_spark, + "(id Int32, value Nullable(String))", + format_version, + ) + + # rename a column that does not exist — rejected by AlterCommands::validate (NOT_FOUND_COLUMN_IN_BLOCK) + error = instance.query_and_get_error( + f"ALTER TABLE {TABLE_NAME} RENAME COLUMN nonexistent TO other;", + settings=INSERT_SETTINGS, + ) + assert "nonexistent" in error + + # rename to a name already used by another column — rejected by AlterCommands::validate (DUPLICATE_COLUMN) + error = instance.query_and_get_error( + f"ALTER TABLE {TABLE_NAME} RENAME COLUMN value TO id;", + settings=INSERT_SETTINGS, + ) + assert "DUPLICATE_COLUMN" in error + assert "id" in error + + # table structure must be unchanged after both failed renames + assert instance.query( + f"SELECT name FROM system.columns WHERE database = currentDatabase() AND table = '{TABLE_NAME}' ORDER BY name" + ) == 
"id\nvalue\n"