Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 22 additions & 1 deletion docs/en/sql-reference/table-functions/iceberg.md
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,7 @@ y: 993

### Schema evolution {#iceberg-writes-schema-evolution}

ClickHouse allows you to add, drop, or modify columns with simple types (non-tuple, non-array, non-map).
ClickHouse allows you to add, drop, modify, or rename columns with simple types (non-tuple, non-array, non-map).

### Example {#example-iceberg-writes-evolution}

Expand Down Expand Up @@ -479,6 +479,27 @@ Row 1:
──────
x: Ivanov
y: 993

ALTER TABLE iceberg_writes_example RENAME COLUMN y TO value;
SHOW CREATE TABLE iceberg_writes_example;

┌─statement─────────────────────────────────────────────────┐
1. │ CREATE TABLE default.iceberg_writes_example ↴│
│↳( ↴│
│↳ `x` Nullable(String), ↴│
│↳ `value` Nullable(Int64) ↴│
│↳) ↴│
│↳ENGINE = IcebergLocal('/home/user_name/iceberg_example/') │
└───────────────────────────────────────────────────────────┘

SELECT *
FROM iceberg_writes_example
FORMAT VERTICAL;

Row 1:
──────
x: Ivanov
value: 993
```

### Compaction {#iceberg-writes-compaction}
Expand Down
240 changes: 190 additions & 50 deletions src/Databases/DataLake/RestCatalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include <IO/Operators.h>
#include <Interpreters/Context.h>

#include <Storages/ObjectStorage/DataLakes/Iceberg/Constant.h>
#include <Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h>
#include <Server/HTTP/HTMLForm.h>
#include <Formats/FormatFactory.h>
Expand All @@ -42,6 +43,8 @@
#include <Poco/Net/SSLManager.h>
#include <Poco/StreamCopier.h>

#include <sstream>


namespace DB::ErrorCodes
{
Expand Down Expand Up @@ -116,6 +119,187 @@ String encodeNamespaceForURI(const String & namespace_name)

}

namespace
{
/// Produces a fully independent deep copy of a Poco JSON object by
/// serializing it and parsing the text back. A serialize/reparse round
/// trip is used (rather than copy construction) presumably because a
/// plain copy would share nested sub-object pointers — the caller
/// mutates the clone and must not touch the original.
Poco::JSON::Object::Ptr cloneJsonObject(const Poco::JSON::Object::Ptr & obj)
{
    std::ostringstream serialized; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
    obj->stringify(serialized);

    Poco::JSON::Parser json_parser;
    auto parsed = json_parser.parse(serialized.str());
    return parsed.extract<Poco::JSON::Object::Ptr>();
}
}

/// Builds the JSON body for the Iceberg REST catalog table-update call
/// (`POST .../namespaces/{ns}/tables/{table}`), see RestCatalog::updateMetadata.
///
/// Two kinds of commits are produced depending on `new_snapshot`:
///  * If it carries a "schemas" field, this is a schema-evolution commit:
///    `add-schema` + `set-current-schema` updates, guarded by an
///    `assert-current-schema-id` requirement on the previous schema id.
///  * Otherwise it is a data commit: `add-snapshot` + `set-snapshot-ref`
///    updates, guarded by `assert-ref-snapshot-id` on the parent snapshot.
///
/// Returns Skip when there is nothing to send (null snapshot), Error when the
/// metadata is malformed (missing/unknown schema id, non-array "schemas"),
/// and Ok with a populated `request_body` otherwise.
UpdateMetadataRequestBodyResult buildUpdateMetadataRequestBody(
    const String & namespace_name, const String & table_name, Poco::JSON::Object::Ptr new_snapshot)
{
    UpdateMetadataRequestBodyResult result;

    /// Nothing to commit: the caller treats Skip as a successful no-op.
    if (!new_snapshot)
    {
        result.status = UpdateMetadataRequestBodyResult::Status::Skip;
        return result;
    }

    Poco::JSON::Object::Ptr request_body = new Poco::JSON::Object;
    {
        /// "identifier": { "name": <table>, "namespace": [<ns>] }
        Poco::JSON::Object::Ptr identifier = new Poco::JSON::Object;
        identifier->set("name", table_name);
        Poco::JSON::Array::Ptr namespaces = new Poco::JSON::Array;
        namespaces->add(namespace_name);
        identifier->set("namespace", namespaces);

        request_body->set("identifier", identifier);
    }

    /// If the metadata has a "schemas" field, this is a schema change.
    if (new_snapshot->has(DB::Iceberg::f_schemas))
    {
        if (!new_snapshot->has(DB::Iceberg::f_current_schema_id))
        {
            result.status = UpdateMetadataRequestBodyResult::Status::Error;
            return result;
        }

        const Int32 new_schema_id = new_snapshot->getValue<Int32>(DB::Iceberg::f_current_schema_id);
        /// NOTE(review): assumes schema ids are assigned sequentially, so the
        /// previous schema id is current - 1 — confirm against the writer side.
        const Int32 old_schema_id = new_schema_id - 1;

        /// "schemas" is a JSON array of schema objects, e.g.:
        /// "schemas" : [
        ///   { "schema-id" : 0, "type" : "struct",
        ///     "fields" : [ { "id" : 1, "name" : "id",  "type" : "int" } ] },
        ///   { "schema-id" : 1, "type" : "struct",          // new_schema_id
        ///     "fields" : [ { "id" : 1, "name" : "id2", "type" : "int" } ] }  // column renamed
        /// ]
        /// Find the schema object with the new schema id.
        auto schemas = new_snapshot->getArray(DB::Iceberg::f_schemas);
        if (!schemas)
        {
            /// "schemas" exists but is not an array: malformed metadata.
            result.status = UpdateMetadataRequestBodyResult::Status::Error;
            return result;
        }

        Poco::JSON::Object::Ptr new_schema_obj;
        for (UInt32 i = 0; i < schemas->size(); ++i)
        {
            auto schema = schemas->getObject(i);
            /// Skip non-object elements and schemas without an id instead of
            /// throwing; a missing match is reported as Error below.
            if (schema && schema->has(DB::Iceberg::f_schema_id)
                && schema->getValue<Int32>(DB::Iceberg::f_schema_id) == new_schema_id)
            {
                new_schema_obj = schema;
                break;
            }
        }
        if (!new_schema_obj)
        {
            result.status = UpdateMetadataRequestBodyResult::Status::Error;
            return result;
        }

        /// Clone before mutating: new_schema_obj is still owned by new_snapshot.
        Poco::JSON::Object::Ptr schema_for_rest = cloneJsonObject(new_schema_obj);
        {
            Poco::JSON::Array::Ptr identifier_fields = new Poco::JSON::Array;
            schema_for_rest->set("identifier-field-ids", identifier_fields);
        }

        if (old_schema_id >= 0)
        {
            /// Optimistic-concurrency guard: the catalog rejects the commit
            /// if the current schema is no longer the one we evolved from.
            Poco::JSON::Object::Ptr requirement = new Poco::JSON::Object;
            requirement->set("type", "assert-current-schema-id");
            requirement->set("current-schema-id", old_schema_id);

            Poco::JSON::Array::Ptr requirements = new Poco::JSON::Array;
            requirements->add(requirement);
            request_body->set("requirements", requirements);
        }

        Poco::JSON::Array::Ptr updates = new Poco::JSON::Array;
        {
            Poco::JSON::Object::Ptr add_schema = new Poco::JSON::Object;
            add_schema->set("action", "add-schema");
            add_schema->set("schema", schema_for_rest);
            if (new_snapshot->has(DB::Iceberg::f_last_column_id))
                add_schema->set("last-column-id", new_snapshot->getValue<Int32>(DB::Iceberg::f_last_column_id));
            updates->add(add_schema);
        }
        {
            Poco::JSON::Object::Ptr set_current_schema = new Poco::JSON::Object;
            set_current_schema->set("action", "set-current-schema");
            set_current_schema->set("schema-id", new_schema_id);
            updates->add(set_current_schema);
        }
        request_body->set("updates", updates);
    }
    else
    {
        /// Data commit: append the snapshot and move the "main" branch to it.
        /// Example snapshot:
        /// {
        ///   "parent-snapshot-id" : 1,
        ///   "snapshot-id" : 2,
        ///   "timestamp-ms" : 1717334400000,
        ///   "schema-id" : 1,
        ///   "operation" : "replace",
        ///   "summary" : "replace snapshot 1 with snapshot 2"
        /// }
        if (new_snapshot->has("parent-snapshot-id"))
        {
            auto parent_snapshot_id = new_snapshot->getValue<Int64>("parent-snapshot-id");
            /// -1 means "no parent" (first snapshot): nothing to assert then.
            if (parent_snapshot_id != -1)
            {
                /// Optimistic-concurrency guard: the catalog rejects the
                /// commit if "main" no longer points at the parent snapshot.
                Poco::JSON::Object::Ptr requirement = new Poco::JSON::Object;
                requirement->set("type", "assert-ref-snapshot-id");
                requirement->set("ref", "main");
                requirement->set("snapshot-id", parent_snapshot_id);

                Poco::JSON::Array::Ptr requirements = new Poco::JSON::Array;
                requirements->add(requirement);

                request_body->set("requirements", requirements);
            }
        }

        {
            Poco::JSON::Array::Ptr updates = new Poco::JSON::Array;

            {
                Poco::JSON::Object::Ptr add_snapshot = new Poco::JSON::Object;
                add_snapshot->set("action", "add-snapshot");
                add_snapshot->set("snapshot", new_snapshot);
                updates->add(add_snapshot);
            }

            {
                Poco::JSON::Object::Ptr set_snapshot = new Poco::JSON::Object;
                set_snapshot->set("action", "set-snapshot-ref");
                set_snapshot->set("ref-name", "main");
                set_snapshot->set("type", "branch");
                set_snapshot->set("snapshot-id", new_snapshot->getValue<Int64>("snapshot-id"));

                updates->add(set_snapshot);
            }
            request_body->set("updates", updates);
        }
    }

    result.status = UpdateMetadataRequestBodyResult::Status::Ok;
    result.request_body = request_body;
    return result;
}

std::string RestCatalog::Config::toString() const
{
DB::WriteBufferFromOwnString wb;
Expand Down Expand Up @@ -1294,59 +1478,15 @@ bool RestCatalog::updateMetadata(const String & namespace_name, const String & t
{
const std::string endpoint = fmt::format("{}/namespaces/{}/tables/{}", base_url, namespace_name, table_name);

Poco::JSON::Object::Ptr request_body = new Poco::JSON::Object;
{
Poco::JSON::Object::Ptr identifier = new Poco::JSON::Object;
identifier->set("name", table_name);
Poco::JSON::Array::Ptr namespaces = new Poco::JSON::Array;
namespaces->add(namespace_name);
identifier->set("namespace", namespaces);

request_body->set("identifier", identifier);
}

if (new_snapshot->has("parent-snapshot-id"))
{
auto parent_snapshot_id = new_snapshot->getValue<Int64>("parent-snapshot-id");
if (parent_snapshot_id != -1)
{
Poco::JSON::Object::Ptr requirement = new Poco::JSON::Object;
requirement->set("type", "assert-ref-snapshot-id");
requirement->set("ref", "main");
requirement->set("snapshot-id", parent_snapshot_id);

Poco::JSON::Array::Ptr requirements = new Poco::JSON::Array;
requirements->add(requirement);

request_body->set("requirements", requirements);
}
}

{
Poco::JSON::Array::Ptr updates = new Poco::JSON::Array;

{
Poco::JSON::Object::Ptr add_snapshot = new Poco::JSON::Object;
add_snapshot->set("action", "add-snapshot");
add_snapshot->set("snapshot", new_snapshot);
updates->add(add_snapshot);
}

{
Poco::JSON::Object::Ptr set_snapshot = new Poco::JSON::Object;
set_snapshot->set("action", "set-snapshot-ref");
set_snapshot->set("ref-name", "main");
set_snapshot->set("type", "branch");
set_snapshot->set("snapshot-id", new_snapshot->getValue<Int64>("snapshot-id"));

updates->add(set_snapshot);
}
request_body->set("updates", updates);
}
const auto built = buildUpdateMetadataRequestBody(namespace_name, table_name, new_snapshot);
if (built.status == UpdateMetadataRequestBodyResult::Status::Skip)
return true;
if (built.status == UpdateMetadataRequestBodyResult::Status::Error)
return false;

try
{
sendRequest(endpoint, request_body);
sendRequest(endpoint, built.request_body);
}
catch (const DB::HTTPException &)
{
Expand Down
22 changes: 22 additions & 0 deletions src/Databases/DataLake/RestCatalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,28 @@ struct AccessToken
}
};

/// Result of building the Iceberg REST catalog request body for `RestCatalog::updateMetadata`.
struct UpdateMetadataRequestBodyResult
{
    enum class Status
    {
        /// `new_snapshot` is null; caller should not send HTTP and return true.
        Skip,
        /// `request_body` is valid; caller should `sendRequest`.
        Ok,
        /// Validation failed; caller should return false.
        Error,
    };
    /// Defaults to Error so a partially-filled result is never mistaken for success.
    Status status = Status::Error;
    /// Populated only when `status == Status::Ok`; null otherwise.
    Poco::JSON::Object::Ptr request_body;
};

/// Builds the JSON body for `POST .../namespaces/{ns}/tables/{table}` (Iceberg REST update).
UpdateMetadataRequestBodyResult buildUpdateMetadataRequestBody(
const String & namespace_name,
const String & table_name,
Poco::JSON::Object::Ptr new_snapshot);

class RestCatalog : public ICatalog, public DB::WithContext
{
public:
Expand Down
Loading
Loading