From 9f00f379edbefb7ce6102ec74a032126b7976775 Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Wed, 18 Mar 2026 17:17:16 +0100 Subject: [PATCH 1/4] Iceberg type 'time' support --- src/Processors/Formats/Impl/AvroRowInputFormat.cpp | 14 +++++++++++++- .../DataLakes/Iceberg/IcebergWrites.cpp | 4 ++++ .../DataLakes/Iceberg/SchemaProcessor.cpp | 2 +- .../ObjectStorage/DataLakes/Iceberg/Utils.cpp | 1 + 4 files changed, 19 insertions(+), 2 deletions(-) diff --git a/src/Processors/Formats/Impl/AvroRowInputFormat.cpp b/src/Processors/Formats/Impl/AvroRowInputFormat.cpp index f3b98e73e60f..91c4e73e31ca 100644 --- a/src/Processors/Formats/Impl/AvroRowInputFormat.cpp +++ b/src/Processors/Formats/Impl/AvroRowInputFormat.cpp @@ -156,6 +156,9 @@ static void insertNumber(IColumn & column, WhichDataType type, T value) case TypeIndex::DateTime64: assert_cast &>(column).insertValue(static_cast(value)); break; + case TypeIndex::Time64: + assert_cast &>(column).insertValue(static_cast(value)); + break; case TypeIndex::IPv4: assert_cast(column).insertValue(IPv4(static_cast(value))); break; @@ -304,6 +307,8 @@ AvroDeserializer::DeserializeFn AvroDeserializer::createDeserializeFn(const avro return createDecimalDeserializeFn(root_node, target_type, false); if (target.isDateTime64()) return createDecimalDeserializeFn(root_node, target_type, false); + if (target.isTime64()) + return createDecimalDeserializeFn(root_node, target_type, false); break; case avro::AVRO_INT: if (target_type->isValueRepresentedByNumber()) @@ -1283,8 +1288,11 @@ DataTypePtr AvroSchemaReader::avroNodeToDataType(avro::NodePtr node) { case avro::Type::AVRO_INT: { - if (node->logicalType().type() == avro::LogicalType::DATE) + auto logical_type = node->logicalType(); + if (logical_type.type() == avro::LogicalType::DATE) return {std::make_shared()}; + if (logical_type.type() == avro::LogicalType::TIME_MILLIS) + return {std::make_shared(3)}; return {std::make_shared()}; } @@ -1295,6 +1303,10 @@ DataTypePtr AvroSchemaReader::avroNodeToDataType(avro::NodePtr node) return {std::make_shared(3)}; if (logical_type.type() == avro::LogicalType::TIMESTAMP_MICROS) return {std::make_shared(6)}; + if (logical_type.type() == avro::LogicalType::TIME_MILLIS) + return {std::make_shared(3)}; + if (logical_type.type() == avro::LogicalType::TIME_MICROS) + return {std::make_shared(6)}; return std::make_shared(); } diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergWrites.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergWrites.cpp index c2d5a0c0c5bc..070a96d2b6c6 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergWrites.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergWrites.cpp @@ -129,6 +129,8 @@ bool canDumpIcebergStats(const Field & field, DataTypePtr type) case TypeIndex::Date32: case TypeIndex::Int64: case TypeIndex::DateTime64: + case TypeIndex::Time: + case TypeIndex::Time64: case TypeIndex::String: return true; default: @@ -157,6 +159,8 @@ std::vector dumpFieldToBytes(const Field & field, DataTypePtr type) case TypeIndex::Int64: return dumpValue(field.safeGet()); case TypeIndex::DateTime64: + case TypeIndex::Time: + case TypeIndex::Time64: return dumpValue(field.safeGet().getValue().value); case TypeIndex::String: { diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/SchemaProcessor.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/SchemaProcessor.cpp index 4efa9b46615a..dda2050c9024 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/SchemaProcessor.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/SchemaProcessor.cpp @@ -243,7 +243,7 @@ DataTypePtr IcebergSchemaProcessor::getSimpleType(const String & type_name, Cont if (type_name == f_date) return std::make_shared(); if (type_name == f_time) - return std::make_shared(); + return std::make_shared(6); if (type_name == f_timestamp) return std::make_shared(6); if (type_name == f_timestamptz) diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp index 050d4f70b7c1..f785e1ca40af 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp @@ -402,6 +402,7 @@ std::pair getIcebergType(DataTypePtr type, Int32 & ite case TypeIndex::DateTime64: return {"timestamp", true}; case TypeIndex::Time: + case TypeIndex::Time64: return {"time", true}; case TypeIndex::String: return {"string", true}; From bb896c3fca79f564c20850b526ed60f9024f3e46 Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Thu, 19 Mar 2026 17:03:56 +0100 Subject: [PATCH 2/4] Fix tests --- .../integration/test_database_iceberg/test.py | 93 +++++++++++++++++++ 1 file changed, 93 insertions(+) diff --git a/tests/integration/test_database_iceberg/test.py b/tests/integration/test_database_iceberg/test.py index 701b3bc6e8d8..3fcfc22d43d5 100644 --- a/tests/integration/test_database_iceberg/test.py +++ b/tests/integration/test_database_iceberg/test.py @@ -25,6 +25,7 @@ NestedField, StringType, StructType, + TimeType, TimestampType, TimestamptzType ) @@ -1015,3 +1016,95 @@ def _test_cluster_joins(started_cluster): ) assert res == "Jack\tBlack\nJack\tSilver\nJohn\tBlack\nJohn\tSilver\n" + + +@pytest.mark.parametrize("storage_type", ["s3"]) +def test_partitioning_by_time(started_cluster, storage_type): + node = started_cluster.instances["node1"] + + test_ref = f"test_partitioning_by_time_{uuid.uuid4()}" + table_name = f"{test_ref}_table" + root_namespace = f"{test_ref}_namespace" + + namespace = f"{root_namespace}.A" + catalog = load_catalog_impl(started_cluster) + catalog.create_namespace(namespace) + + schema = Schema( + NestedField( + field_id=1, + name="key", + field_type=TimeType(), + required=False + ), + NestedField( + field_id=2, + name="value", + field_type=StringType(), + required=False, + ), + ) + + partition_spec = PartitionSpec( + PartitionField( + source_id=1, field_id=1000, transform=IdentityTransform(), name="partition_key" + ) + ) + + table = create_table(catalog, namespace, table_name, schema=schema, partition_spec=partition_spec) + data = [{"key": dtime(12,0,0), "value": "test"}] + df = pa.Table.from_pylist(data) + table.append(df) + + create_clickhouse_iceberg_database(started_cluster, node, CATALOG_NAME) + + assert node.query(f"SELECT * FROM {CATALOG_NAME}.`{namespace}.{table_name}`") == "12:00:00.000000\ttest\n" + + +@pytest.mark.parametrize("storage_type", ["s3"]) +def test_partitioning_by_string(started_cluster, storage_type): + node = started_cluster.instances["node1"] + + test_ref = f"test_partitioning_by_string_{uuid.uuid4()}" + table_name = f"{test_ref}_table" + root_namespace = f"{test_ref}_namespace" + + namespace = f"{root_namespace}.A" + catalog = load_catalog_impl(started_cluster) + catalog.create_namespace(namespace) + + schema = Schema( + NestedField( + field_id=1, + name="key", + field_type=StringType(), + required=False + ), + NestedField( + field_id=2, + name="value", + field_type=StringType(), + required=False, + ), + NestedField( + field_id=3, + name="time_value", + field_type=TimeType(), + required=False, + ), + ) + + partition_spec = PartitionSpec( + PartitionField( + source_id=1, field_id=1000, transform=IdentityTransform(), name="partition_key" + ) + ) + + table = create_table(catalog, namespace, table_name, schema=schema, partition_spec=partition_spec) + data = [{"key": "a:b,c[d=e/f%g?h", "value": "test", "time_value": dtime(12,0,0)}] + df = pa.Table.from_pylist(data) + table.append(df) + + create_clickhouse_iceberg_database(started_cluster, node, CATALOG_NAME) + + assert node.query(f"SELECT * FROM {CATALOG_NAME}.`{namespace}.{table_name}`") == "a:b,c[d=e/f%g?h\ttest\t12:00:00.000000\n" From 28ba40b84d145f9264afd3bbc2f4b5478cfb4306 Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Thu, 19 Mar 2026 17:26:54 +0100 Subject: [PATCH 3/4] Fix --- .../ObjectStorage/DataLakes/Iceberg/IcebergWrites.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergWrites.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergWrites.cpp index 070a96d2b6c6..f92216bc9ca7 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergWrites.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergWrites.cpp @@ -157,10 +157,10 @@ std::vector dumpFieldToBytes(const Field & field, DataTypePtr type) case TypeIndex::Date32: return dumpValue(field.safeGet()); case TypeIndex::Int64: - return dumpValue(field.safeGet()); - case TypeIndex::DateTime64: case TypeIndex::Time: + return dumpValue(field.safeGet()); case TypeIndex::Time64: + case TypeIndex::DateTime64: return dumpValue(field.safeGet().getValue().value); case TypeIndex::String: { From f0e1038e8324ad6a6b915138a3e6c697431f972b Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Thu, 2 Apr 2026 13:28:07 +0200 Subject: [PATCH 4/4] Fix test --- tests/integration/test_database_iceberg/test.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/integration/test_database_iceberg/test.py b/tests/integration/test_database_iceberg/test.py index 3fcfc22d43d5..9e5451215510 100644 --- a/tests/integration/test_database_iceberg/test.py +++ b/tests/integration/test_database_iceberg/test.py @@ -5,7 +5,7 @@ import random import time import uuid -from datetime import datetime, timedelta +from datetime import datetime, timedelta, time as dtime import pyarrow as pa import pytest @@ -25,9 +25,9 @@ NestedField, StringType, StructType, - TimeType, TimestampType, - TimestamptzType + TimestamptzType, + TimeType, ) from pyiceberg.table.sorting import UNSORTED_SORT_ORDER