Commit 3addd5f

Merge pull request #1579 from Altinity/backports/antalya-26.1/96620
Antalya 26.1 Backport of ClickHouse#96620 - Iceberg partitioning fix
2 parents (84f4a41 + dd35d60) · commit 3addd5f

2 files changed

Lines changed: 39 additions & 7 deletions


src/Storages/ObjectStorage/DataLakes/Iceberg/ChunkPartitioner.cpp

Lines changed: 5 additions & 7 deletions
@@ -93,6 +93,8 @@ ChunkPartitioner::partitionChunk(const Chunk & chunk)
     }
 
     std::vector<ChunkPartitioner::PartitionKey> transform_results(chunk.getNumRows());
+    ColumnRawPtrs raw_columns;
+    Columns functions_columns;
     for (size_t transform_ind = 0; transform_ind < functions.size(); ++transform_ind)
     {
         ColumnsWithTypeAndName arguments;
@@ -115,6 +117,8 @@ ChunkPartitioner::partitionChunk(const Chunk & chunk)
         }
         auto result
             = functions[transform_ind]->build(arguments)->execute(arguments, std::make_shared<DataTypeString>(), chunk.getNumRows(), false);
+        functions_columns.push_back(result);
+        raw_columns.push_back(result.get());
         for (size_t i = 0; i < chunk.getNumRows(); ++i)
         {
             Field field;
@@ -127,12 +131,9 @@ ChunkPartitioner::partitionChunk(const Chunk & chunk)
     {
         return transform_results[row_num];
     };
-
+    std::vector<std::pair<ChunkPartitioner::PartitionKey, Chunk>> result;
     PODArray<size_t> partition_num_to_first_row;
     IColumn::Selector selector;
-    ColumnRawPtrs raw_columns;
-    for (const auto & column : chunk.getColumns())
-        raw_columns.push_back(column.get());
 
     buildScatterSelector(raw_columns, partition_num_to_first_row, selector, 0, Context::getGlobalContextInstance());
 
@@ -164,9 +165,6 @@ ChunkPartitioner::partitionChunk(const Chunk & chunk)
             result_columns[0].second[col] = chunk.getColumns()[col]->cloneFinalized();
         }
     }
-
-    std::vector<std::pair<ChunkPartitioner::PartitionKey, Chunk>> result;
-    result.reserve(result_columns.size());
     for (auto && [key, partition_columns] : result_columns)
    {
         size_t column_size = partition_columns[0]->size();
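
A note on the change above: the scatter selector is now built from the columns produced by the partition transforms (collected into raw_columns and kept alive via functions_columns) instead of from the chunk's raw data columns, so rows are grouped by their transformed partition value. A minimal Python sketch of the difference, assuming a hypothetical scatter_by helper and a hash-based stand-in for a bucket transform (neither is ClickHouse or Iceberg code):

# Hypothetical helper: group row indices by the values of the selector columns.
def scatter_by(columns, num_rows):
    groups = {}
    for row in range(num_rows):
        key = tuple(col[row] for col in columns)
        groups.setdefault(key, []).append(row)
    return groups

raw_y = [str(i) for i in range(1000)]        # raw partition column: 1000 distinct values
bucketed = [hash(v) % 16 for v in raw_y]     # stand-in for a bucket(16, y) transform result

# Selector over the raw columns (old behaviour): one partition per distinct value.
print(len(scatter_by([raw_y], 1000)))        # 1000
# Selector over the transform result columns (new behaviour): at most 16 partitions.
print(len(scatter_by([bucketed], 1000)))     # <= 16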

tests/integration/test_storage_iceberg_with_spark/test_writes_with_partitioned_table.py

Lines changed: 34 additions & 0 deletions
@@ -83,3 +83,37 @@ def execute_spark_query(query: str):
 
     df = spark.read.format("iceberg").load(f"/var/lib/clickhouse/user_files/iceberg_data/default/{TABLE_NAME}").collect()
     assert len(df) == 10
+
+
+@pytest.mark.parametrize("format_version", ["2"])
+@pytest.mark.parametrize("storage_type", ["s3"])
+def test_writes_with_partitioned_table_count_partitions(started_cluster_iceberg_with_spark, format_version, storage_type):
+    instance = started_cluster_iceberg_with_spark.instances["node1"]
+
+    TABLE_NAME = "test_writes_with_partitioned_table_count_partitions_" + storage_type + "_" + get_uuid_str()
+    create_iceberg_table(storage_type, instance, TABLE_NAME, started_cluster_iceberg_with_spark, "(y String)", partition_by="(icebergBucket(16, y))", format_version=format_version)
+    values = ""
+    num_rows = 1000
+    target = []
+    for i in range(num_rows):
+        values += f"({i})"
+        target.append(str(i))
+        if i != num_rows - 1:
+            values += ","
+    target = '\n'.join(sorted(target))
+    instance.query(f"INSERT INTO {TABLE_NAME} VALUES {values};", settings={"allow_experimental_insert_into_iceberg": 1})
+    assert instance.query(f"SELECT * FROM {TABLE_NAME} ORDER BY ALL") == target + "\n"
+
+    files = default_download_directory(
+        started_cluster_iceberg_with_spark,
+        storage_type,
+        f"/var/lib/clickhouse/user_files/iceberg_data/default/{TABLE_NAME}/",
+        f"/var/lib/clickhouse/user_files/iceberg_data/default/{TABLE_NAME}/",
+    )
+
+    num_pq_files = 0
+    for file in files:
+        if file[-7:] == 'parquet':
+            num_pq_files += 1
+
+    assert num_pq_files == 16
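
The new test expects exactly 16 parquet data files: icebergBucket(16, y) can produce at most 16 distinct partition values, and with 1000 distinct strings every bucket is hit in practice, so a single INSERT yields one data file per bucket. A rough sketch of the bucket transform as the Iceberg spec describes it (murmur3_x86_32 over the UTF-8 bytes, masked to a non-negative 32-bit value, then taken modulo N); the third-party mmh3 package and the helper name are assumptions made only for illustration:

import mmh3  # assumed third-party package providing murmur3_x86_32

def iceberg_bucket(value: str, num_buckets: int) -> int:
    # Iceberg spec: bucket(N, v) = (murmur3_x86_32(v) & Integer.MAX_VALUE) % N.
    return (mmh3.hash(value.encode("utf-8")) & 0x7FFFFFFF) % num_buckets

distinct_buckets = {iceberg_bucket(str(i), 16) for i in range(1000)}
print(len(distinct_buckets))  # at most 16; expected to cover all 16 buckets for 1000 distinct values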
