From 0a279831aafce8dcfbb6d40d2eca6a2d7cabf8e8 Mon Sep 17 00:00:00 2001
From: Arthur Passos
Date: Mon, 9 Feb 2026 09:33:36 -0300
Subject: [PATCH] add setting to define filename pattern for part exports

---
 src/Core/Settings.cpp                         |   3 +
 ...portReplicatedMergeTreePartitionManifest.h |   3 +
 src/Storages/MergeTree/ExportPartTask.cpp     |  26 ++++-
 .../ExportPartitionTaskScheduler.cpp          |   1 +
 src/Storages/StorageReplicatedMergeTree.cpp   |   2 +
 .../configs/macros_shard1_replica1.xml        |   6 +
 .../configs/macros_shard2_replica1.xml        |   6 +
 .../test.py                                   | 107 ++++++++++++++++++
 ...merge_tree_part_filename_pattern.reference |  16 +++
 ...export_merge_tree_part_filename_pattern.sh |  49 ++++++++
 10 files changed, 217 insertions(+), 2 deletions(-)
 create mode 100644 tests/integration/test_export_replicated_mt_partition_to_object_storage/configs/macros_shard1_replica1.xml
 create mode 100644 tests/integration/test_export_replicated_mt_partition_to_object_storage/configs/macros_shard2_replica1.xml
 create mode 100644 tests/queries/0_stateless/03608_export_merge_tree_part_filename_pattern.reference
 create mode 100755 tests/queries/0_stateless/03608_export_merge_tree_part_filename_pattern.sh

diff --git a/src/Core/Settings.cpp b/src/Core/Settings.cpp
index 9ab038620e0d..8c662b02d046 100644
--- a/src/Core/Settings.cpp
+++ b/src/Core/Settings.cpp
@@ -6931,6 +6931,9 @@ This is not a hard limit, and it highly depends on the output format granularity
 )", 0) \
     DECLARE(Bool, serialize_string_in_memory_with_zero_byte, true, R"(
 Serialize String values during aggregation with zero byte at the end. Enable to keep compatibility when querying cluster of incompatible versions.
+)", 0) \
+    DECLARE(String, export_merge_tree_part_filename_pattern, "{part_name}_{checksum}", R"(
+Filename pattern for exported merge tree parts. The `{part_name}` and `{checksum}` placeholders are computed and substituted on the fly; server-side macros such as `{shard}` and `{replica}` are expanded as well.
 )", 0) \
 \
     /* ####################################################### */ \
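A worked example of how the pattern expands, using the values exercised by the stateless test below (the trailing .1.parquet suffix is appended outside the pattern, apparently by the destination's file numbering):

    '{part_name}_{checksum}'  (default)  ->  2020_1_1_0_<checksum hex>.1.parquet   (part 2020_1_1_0)
    'myprefix_{part_name}'               ->  myprefix_2021_2_2_0.1.parquet         (part 2021_2_2_0)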
)", 0) \ \ /* ####################################################### */ \ diff --git a/src/Storages/ExportReplicatedMergeTreePartitionManifest.h b/src/Storages/ExportReplicatedMergeTreePartitionManifest.h index 7490775cbccc..65bcebc0da3e 100644 --- a/src/Storages/ExportReplicatedMergeTreePartitionManifest.h +++ b/src/Storages/ExportReplicatedMergeTreePartitionManifest.h @@ -115,6 +115,7 @@ struct ExportReplicatedMergeTreePartitionManifest size_t max_bytes_per_file; size_t max_rows_per_file; MergeTreePartExportManifest::FileAlreadyExistsPolicy file_already_exists_policy; + String filename_pattern; bool lock_inside_the_task; /// todo temporary std::string toJsonString() const @@ -137,6 +138,7 @@ struct ExportReplicatedMergeTreePartitionManifest json.set("max_bytes_per_file", max_bytes_per_file); json.set("max_rows_per_file", max_rows_per_file); json.set("file_already_exists_policy", String(magic_enum::enum_name(file_already_exists_policy))); + json.set("filename_pattern", filename_pattern); json.set("create_time", create_time); json.set("max_retries", max_retries); json.set("ttl_seconds", ttl_seconds); @@ -179,6 +181,7 @@ struct ExportReplicatedMergeTreePartitionManifest manifest.max_bytes_per_file = json->getValue("max_bytes_per_file"); manifest.max_rows_per_file = json->getValue("max_rows_per_file"); + manifest.filename_pattern = json->getValue("filename_pattern"); manifest.lock_inside_the_task = json->getValue("lock_inside_the_task"); diff --git a/src/Storages/MergeTree/ExportPartTask.cpp b/src/Storages/MergeTree/ExportPartTask.cpp index 3b8d9c85c5ed..416d28576280 100644 --- a/src/Storages/MergeTree/ExportPartTask.cpp +++ b/src/Storages/MergeTree/ExportPartTask.cpp @@ -3,10 +3,12 @@ #include #include #include -#include #include #include +#include +#include #include +#include #include #include #include @@ -43,6 +45,7 @@ namespace Setting extern const SettingsUInt64 min_bytes_to_use_direct_io; extern const SettingsUInt64 export_merge_tree_part_max_bytes_per_file; extern const SettingsUInt64 export_merge_tree_part_max_rows_per_file; + extern const SettingsString export_merge_tree_part_filename_pattern; } namespace @@ -80,6 +83,23 @@ namespace plan_for_part.addStep(std::move(expression_step)); } } + + String buildDestinationFilename( + const MergeTreePartExportManifest & manifest, + const StorageID & storage_id, + const ContextPtr & local_context) + { + auto filename = manifest.settings[Setting::export_merge_tree_part_filename_pattern].value; + + boost::replace_all(filename, "{part_name}", manifest.data_part->name); + boost::replace_all(filename, "{checksum}", manifest.data_part->checksums.getTotalChecksumHex()); + + Macros::MacroExpansionInfo macro_info; + macro_info.table_id = storage_id; + filename = local_context->getMacros()->expand(filename, macro_info); + + return filename; + } } ExportPartTask::ExportPartTask(MergeTreeData & storage_, const MergeTreePartExportManifest & manifest_) @@ -147,8 +167,10 @@ bool ExportPartTask::executeStep() try { + const auto filename = buildDestinationFilename(manifest, storage.getStorageID(), local_context); + sink = destination_storage->import( - manifest.data_part->name + "_" + manifest.data_part->checksums.getTotalChecksumHex(), + filename, block_with_partition_values, new_file_path_callback, manifest.file_already_exists_policy == MergeTreePartExportManifest::FileAlreadyExistsPolicy::overwrite, diff --git a/src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp b/src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp index 
diff --git a/src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp b/src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp
index 3beb67e1968f..c08aee556579 100644
--- a/src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp
+++ b/src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp
@@ -51,6 +51,7 @@ namespace
         context_copy->setSetting("export_merge_tree_part_file_already_exists_policy", String(magic_enum::enum_name(manifest.file_already_exists_policy)));
         context_copy->setSetting("export_merge_tree_part_max_bytes_per_file", manifest.max_bytes_per_file);
         context_copy->setSetting("export_merge_tree_part_max_rows_per_file", manifest.max_rows_per_file);
+        context_copy->setSetting("export_merge_tree_part_filename_pattern", manifest.filename_pattern);
         return context_copy;
     }
 }
diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp
index 6302d954fa22..7470b4412b9f 100644
--- a/src/Storages/StorageReplicatedMergeTree.cpp
+++ b/src/Storages/StorageReplicatedMergeTree.cpp
@@ -212,6 +212,7 @@ namespace Setting
     extern const SettingsBool export_merge_tree_partition_lock_inside_the_task;
     extern const SettingsUInt64 export_merge_tree_part_max_bytes_per_file;
     extern const SettingsUInt64 export_merge_tree_part_max_rows_per_file;
+    extern const SettingsString export_merge_tree_part_filename_pattern;
 }

 namespace MergeTreeSetting
@@ -8146,6 +8147,7 @@ void StorageReplicatedMergeTree::exportPartitionToTable(const PartitionCommand &

     manifest.max_rows_per_file = query_context->getSettingsRef()[Setting::export_merge_tree_part_max_rows_per_file];
     manifest.file_already_exists_policy = query_context->getSettingsRef()[Setting::export_merge_tree_part_file_already_exists_policy].value;
+    manifest.filename_pattern = query_context->getSettingsRef()[Setting::export_merge_tree_part_filename_pattern].value;

     ops.emplace_back(zkutil::makeCreateRequest(
         fs::path(partition_exports_path) / "metadata.json",
diff --git a/tests/integration/test_export_replicated_mt_partition_to_object_storage/configs/macros_shard1_replica1.xml b/tests/integration/test_export_replicated_mt_partition_to_object_storage/configs/macros_shard1_replica1.xml
new file mode 100644
index 000000000000..bae1ce119255
--- /dev/null
+++ b/tests/integration/test_export_replicated_mt_partition_to_object_storage/configs/macros_shard1_replica1.xml
@@ -0,0 +1,6 @@
+<clickhouse>
+    <macros>
+        <shard>shard1</shard>
+        <replica>replica1</replica>
+    </macros>
+</clickhouse>
diff --git a/tests/integration/test_export_replicated_mt_partition_to_object_storage/configs/macros_shard2_replica1.xml b/tests/integration/test_export_replicated_mt_partition_to_object_storage/configs/macros_shard2_replica1.xml
new file mode 100644
index 000000000000..fb9a587e736d
--- /dev/null
+++ b/tests/integration/test_export_replicated_mt_partition_to_object_storage/configs/macros_shard2_replica1.xml
@@ -0,0 +1,6 @@
+<clickhouse>
+    <macros>
+        <shard>shard2</shard>
+        <replica>replica1</replica>
+    </macros>
+</clickhouse>
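Putting the macros and the pattern together: with the configs above loaded, the integration test below exports with the pattern '{part_name}_{shard}_{replica}_{checksum}', which on the first shard would expand along the lines of

    2020_1_1_0_shard1_replica1_<checksum hex>

and with shard2 on the other shard, so identically named parts from the two shards land under distinct object keys in the shared bucket.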
diff --git a/tests/integration/test_export_replicated_mt_partition_to_object_storage/test.py b/tests/integration/test_export_replicated_mt_partition_to_object_storage/test.py
index 2a35e2d8bb73..57417197644b 100644
--- a/tests/integration/test_export_replicated_mt_partition_to_object_storage/test.py
+++ b/tests/integration/test_export_replicated_mt_partition_to_object_storage/test.py
@@ -104,6 +104,26 @@ def cluster():
         with_zookeeper=True,
         keeper_required_feature_flags=["multi_read"],
     )
+    # Sharded instances for filename pattern tests
+    cluster.add_instance(
+        "shard1_replica1",
+        main_configs=["configs/named_collections.xml", "configs/allow_experimental_export_partition.xml", "configs/macros_shard1_replica1.xml"],
+        user_configs=["configs/users.d/profile.xml"],
+        with_minio=True,
+        stay_alive=True,
+        with_zookeeper=True,
+        keeper_required_feature_flags=["multi_read"],
+    )
+
+    cluster.add_instance(
+        "shard2_replica1",
+        main_configs=["configs/named_collections.xml", "configs/allow_experimental_export_partition.xml", "configs/macros_shard2_replica1.xml"],
+        user_configs=["configs/users.d/profile.xml"],
+        with_minio=True,
+        stay_alive=True,
+        with_zookeeper=True,
+        keeper_required_feature_flags=["multi_read"],
+    )
     logging.info("Starting cluster...")
     cluster.start()
     yield cluster
@@ -122,6 +142,14 @@ def create_tables_and_insert_data(node, mt_table, s3_table, replica_name):
     create_s3_table(node, s3_table)


+def create_sharded_tables_and_insert_data(node, mt_table, s3_table, replica_name):
+    """Create a sharded ReplicatedMergeTree table with the {shard} macro in its ZooKeeper path."""
+    node.query(f"CREATE TABLE {mt_table} (id UInt64, year UInt16) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{{shard}}/{mt_table}', '{replica_name}') PARTITION BY year ORDER BY tuple()")
+    node.query(f"INSERT INTO {mt_table} VALUES (1, 2020), (2, 2020), (3, 2020), (4, 2021)")
+
+    create_s3_table(node, s3_table)
+
+
 def test_restart_nodes_during_export(cluster):
     node = cluster.instances["replica1"]
     node2 = cluster.instances["replica2"]
@@ -817,3 +845,82 @@ def test_export_partition_with_mixed_computed_columns(cluster):
         AND partition_id = '1'
     """)
     assert status.strip() == "COMPLETED", f"Expected COMPLETED status, got: {status}"
+
+
+def test_sharded_export_partition_with_filename_pattern(cluster):
+    """Exporting from two shards with a shard-aware filename pattern must not collide."""
+    shard1_r1 = cluster.instances["shard1_replica1"]
+    shard2_r1 = cluster.instances["shard2_replica1"]
+    watcher_node = cluster.instances["watcher_node"]
+
+    mt_table = "sharded_mt_table"
+    s3_table = "sharded_s3_table"
+
+    # Create tables on both shards with identical partition data (so part names match).
+    # Each shard uses a different ZooKeeper path via the {shard} macro.
+    create_sharded_tables_and_insert_data(shard1_r1, mt_table, s3_table, "replica1")
+    create_sharded_tables_and_insert_data(shard2_r1, mt_table, s3_table, "replica1")
+    create_s3_table(watcher_node, s3_table)
+
+    # Export the same partition from both shards with a filename pattern that
+    # includes the shard and replica macros, which should prevent collisions.
+    shard1_r1.query(
+        f"ALTER TABLE {mt_table} EXPORT PARTITION ID '2020' TO TABLE {s3_table} "
+        f"SETTINGS export_merge_tree_part_filename_pattern = '{{part_name}}_{{shard}}_{{replica}}_{{checksum}}'"
+    )
+    shard2_r1.query(
+        f"ALTER TABLE {mt_table} EXPORT PARTITION ID '2020' TO TABLE {s3_table} "
+        f"SETTINGS export_merge_tree_part_filename_pattern = '{{part_name}}_{{shard}}_{{replica}}_{{checksum}}'"
+    )
+
+    # Wait for both exports to complete
+    wait_for_export_status(shard1_r1, mt_table, s3_table, "2020", "COMPLETED")
+    wait_for_export_status(shard2_r1, mt_table, s3_table, "2020", "COMPLETED")
+
+    total_count = watcher_node.query(f"SELECT count() FROM {s3_table} WHERE year = 2020").strip()
+    assert total_count == "6", f"Expected 6 total rows (3 from each shard), got {total_count}"
+
+    # Verify the filenames carry the shard macros, checking S3 directly.
+    # The bucket is shared, so list the files through watcher_node.
+    files_shard1 = watcher_node.query(
+        f"SELECT _file FROM s3(s3_conn, filename='{s3_table}/**', format='One') WHERE _file LIKE '%shard1%' LIMIT 1"
+    ).strip()
+    files_shard2 = watcher_node.query(
+        f"SELECT _file FROM s3(s3_conn, filename='{s3_table}/**', format='One') WHERE _file LIKE '%shard2%' LIMIT 1"
+    ).strip()
+
+    # Both shards must have produced files carrying their shard name.
+    assert "shard1" in files_shard1, f"Expected shard1 in filenames, got: {files_shard1}"
+    assert "shard2" in files_shard2, f"Expected shard2 in filenames, got: {files_shard2}"
+
+
+def test_sharded_export_partition_default_pattern(cluster):
+    shard1_r1 = cluster.instances["shard1_replica1"]
+    shard2_r1 = cluster.instances["shard2_replica1"]
+    watcher_node = cluster.instances["watcher_node"]
+
+    mt_table = "sharded_mt_table_default"
+    s3_table = "sharded_s3_table_default"
+
+    # Create sharded tables with a different ZooKeeper path per shard
+    create_sharded_tables_and_insert_data(shard1_r1, mt_table, s3_table, "replica1")
+    create_sharded_tables_and_insert_data(shard2_r1, mt_table, s3_table, "replica1")
+    create_s3_table(watcher_node, s3_table)
+
+    # Export with the default pattern ({part_name}_{checksum}): identical parts on both shards share a name and checksum, so the filenames collide
+    shard1_r1.query(
+        f"ALTER TABLE {mt_table} EXPORT PARTITION ID '2020' TO TABLE {s3_table}"
+    )
+    shard2_r1.query(
+        f"ALTER TABLE {mt_table} EXPORT PARTITION ID '2020' TO TABLE {s3_table}"
+    )
+
+    wait_for_export_status(shard1_r1, mt_table, s3_table, "2020", "COMPLETED")
+    wait_for_export_status(shard2_r1, mt_table, s3_table, "2020", "COMPLETED")
+
+    # Both exports should complete; the file_already_exists_policy handles the collision.
+    # The S3 bucket is shared, so query through watcher_node.
+    total_count = watcher_node.query(f"SELECT count() FROM {s3_table} WHERE year = 2020").strip()
+
+    # Only one file with 3 rows should be present
+    assert int(total_count) == 3, f"Expected 3 rows, got {total_count}"
diff --git a/tests/queries/0_stateless/03608_export_merge_tree_part_filename_pattern.reference b/tests/queries/0_stateless/03608_export_merge_tree_part_filename_pattern.reference
new file mode 100644
index 000000000000..8016f5aa113e
--- /dev/null
+++ b/tests/queries/0_stateless/03608_export_merge_tree_part_filename_pattern.reference
@@ -0,0 +1,16 @@
+---- Test: Default pattern {part_name}_{checksum}
+1	2020
+2	2020
+3	2020
+---- Verify filename matches 2020_1_1_0_*.1.parquet
+1
+---- Test: Custom prefix pattern
+4	2021
+---- Verify filename matches myprefix_2021_2_2_0.1.parquet
+1
+---- Test: Pattern with macros
+1	2020
+2	2020
+3	2020
+---- Verify macros expanded (no literal braces in parquet filenames; that's the best we can check in a stateless test)
+1
"$CURDIR"/../shell_config.sh + +R=$RANDOM +mt="mt_${R}" +dest1="fp_dest1_${R}" +dest2="fp_dest2_${R}" +dest3="fp_dest3_${R}" + +query() { + $CLICKHOUSE_CLIENT --query "$1" +} + +query "DROP TABLE IF EXISTS $mt, $dest1, $dest2, $dest3" + +query "CREATE TABLE $mt (id UInt64, year UInt16) ENGINE = MergeTree() PARTITION BY year ORDER BY tuple()" +query "INSERT INTO $mt VALUES (1, 2020), (2, 2020), (3, 2020), (4, 2021)" + +query "CREATE TABLE $dest1 (id UInt64, year UInt16) ENGINE = S3(s3_conn, filename='$dest1', format=Parquet, partition_strategy='hive') PARTITION BY year" +query "CREATE TABLE $dest2 (id UInt64, year UInt16) ENGINE = S3(s3_conn, filename='$dest2', format=Parquet, partition_strategy='hive') PARTITION BY year" +query "CREATE TABLE $dest3 (id UInt64, year UInt16) ENGINE = S3(s3_conn, filename='$dest3', format=Parquet, partition_strategy='hive') PARTITION BY year" + +echo "---- Test: Default pattern {part_name}_{checksum}" +query "ALTER TABLE $mt EXPORT PART '2020_1_1_0' TO TABLE $dest1 SETTINGS allow_experimental_export_merge_tree_part = 1, export_merge_tree_part_filename_pattern = '{part_name}_{checksum}'" +sleep 3 +query "SELECT * FROM $dest1 ORDER BY id" +echo "---- Verify filename matches 2020_1_1_0_*.1.parquet" +query "SELECT count() FROM s3(s3_conn, filename='$dest1/**/2020_1_1_0_*.1.parquet', format='One')" + +echo "---- Test: Custom prefix pattern" +query "ALTER TABLE $mt EXPORT PART '2021_2_2_0' TO TABLE $dest2 SETTINGS allow_experimental_export_merge_tree_part = 1, export_merge_tree_part_filename_pattern = 'myprefix_{part_name}'" +sleep 3 +query "SELECT * FROM $dest2 ORDER BY id" +echo "---- Verify filename matches myprefix_2021_2_2_0.1.parquet" +query "SELECT count() FROM s3(s3_conn, filename='$dest2/**/myprefix_2021_2_2_0.1.parquet', format='One')" + +echo "---- Test: Pattern with macros" +query "ALTER TABLE $mt EXPORT PART '2020_1_1_0' TO TABLE $dest3 SETTINGS allow_experimental_export_merge_tree_part = 1, export_merge_tree_part_filename_pattern = '{database}_{table}_{part_name}'" +sleep 3 +query "SELECT * FROM $dest3 ORDER BY id" +echo "---- Verify macros expanded (no literal braces in parquet filenames, that's the best we can do for stateless tests)" +query "SELECT count() = 0 FROM s3(s3_conn, filename='$dest3/**/*.1.parquet', format='One') WHERE _file LIKE '%{%'" + +query "DROP TABLE IF EXISTS $mt, $dest1, $dest2, $dest3"