3 changes: 3 additions & 0 deletions src/Core/Settings.cpp
@@ -6931,6 +6931,9 @@ This is not a hard limit, and it highly depends on the output format granularity
)", 0) \
DECLARE(Bool, serialize_string_in_memory_with_zero_byte, true, R"(
Serialize String values with a trailing zero byte during aggregation. Enable to keep compatibility when querying a cluster of incompatible versions.
)", 0) \
DECLARE(String, export_merge_tree_part_filename_pattern, "{part_name}_{checksum}", R"(
Filename pattern for exported merge tree parts. The `{part_name}` and `{checksum}` placeholders are computed and substituted on the fly; additional macros (such as `{shard}` and `{replica}`) are also supported.
)", 0) \
\
/* ####################################################### */ \
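For a concrete picture of the substitution, here is a minimal Python sketch of how such a pattern is rendered; it mirrors the order used by buildDestinationFilename in ExportPartTask.cpp below (built-in placeholders first, then configured macros), with a hypothetical macros dict standing in for the server's macro configuration:

# Minimal sketch, not the server implementation: {part_name} and {checksum}
# are substituted first, then server-configured macros (hypothetical dict here).
def render_filename(pattern: str, part_name: str, checksum: str, macros: dict) -> str:
    filename = pattern.replace("{part_name}", part_name)
    filename = filename.replace("{checksum}", checksum)
    for name, value in macros.items():
        filename = filename.replace("{" + name + "}", value)
    return filename


print(render_filename(
    "{part_name}_{shard}_{checksum}",
    part_name="2020_1_1_0",
    checksum="0a1b2c3d",  # stand-in checksum
    macros={"shard": "shard1", "replica": "replica1"},
))  # -> 2020_1_1_0_shard1_0a1b2c3d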
3 changes: 3 additions & 0 deletions src/Storages/ExportReplicatedMergeTreePartitionManifest.h
@@ -115,6 +115,7 @@ struct ExportReplicatedMergeTreePartitionManifest
    size_t max_bytes_per_file;
    size_t max_rows_per_file;
    MergeTreePartExportManifest::FileAlreadyExistsPolicy file_already_exists_policy;
    String filename_pattern;
    bool lock_inside_the_task; /// todo temporary

    std::string toJsonString() const
@@ -137,6 +138,7 @@ struct ExportReplicatedMergeTreePartitionManifest
json.set("max_bytes_per_file", max_bytes_per_file);
json.set("max_rows_per_file", max_rows_per_file);
json.set("file_already_exists_policy", String(magic_enum::enum_name(file_already_exists_policy)));
json.set("filename_pattern", filename_pattern);
json.set("create_time", create_time);
json.set("max_retries", max_retries);
json.set("ttl_seconds", ttl_seconds);
@@ -179,6 +181,7 @@ struct ExportReplicatedMergeTreePartitionManifest

        manifest.max_bytes_per_file = json->getValue<size_t>("max_bytes_per_file");
        manifest.max_rows_per_file = json->getValue<size_t>("max_rows_per_file");
        manifest.filename_pattern = json->getValue<String>("filename_pattern");

        manifest.lock_inside_the_task = json->getValue<bool>("lock_inside_the_task");

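For orientation, a sketch of the metadata.json payload this manifest serializes to. The field names are the ones visible in toJsonString/fromJsonString above; every value is illustrative:

# Illustrative values only; field names taken from the manifest code above.
import json

manifest = {
    "max_bytes_per_file": 1073741824,
    "max_rows_per_file": 1000000,
    "file_already_exists_policy": "overwrite",
    "filename_pattern": "{part_name}_{shard}_{checksum}",
    "create_time": 1735689600,
    "max_retries": 3,
    "ttl_seconds": 86400,
    "lock_inside_the_task": False,
}
print(json.dumps(manifest, indent=2))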
26 changes: 24 additions & 2 deletions src/Storages/MergeTree/ExportPartTask.cpp
@@ -3,10 +3,12 @@
#include <Storages/MergeTree/MergeTreeSequentialSource.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Interpreters/Context.h>
#include <Interpreters/DatabaseCatalog.h>
#include <Interpreters/inplaceBlockConversions.h>
#include <Core/Settings.h>
#include <Common/Macros.h>
#include <boost/algorithm/string/replace.hpp>
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/DatabaseCatalog.h>
#include <Processors/Executors/CompletedPipelineExecutor.h>
#include <Processors/QueryPlan/BuildQueryPipelineSettings.h>
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
@@ -43,6 +45,7 @@ namespace Setting
    extern const SettingsUInt64 min_bytes_to_use_direct_io;
    extern const SettingsUInt64 export_merge_tree_part_max_bytes_per_file;
    extern const SettingsUInt64 export_merge_tree_part_max_rows_per_file;
    extern const SettingsString export_merge_tree_part_filename_pattern;
}

namespace
@@ -80,6 +83,23 @@ namespace
        plan_for_part.addStep(std::move(expression_step));
    }
}

String buildDestinationFilename(
    const MergeTreePartExportManifest & manifest,
    const StorageID & storage_id,
    const ContextPtr & local_context)
{
    auto filename = manifest.settings[Setting::export_merge_tree_part_filename_pattern].value;

    /// Substitute the built-in placeholders first, then expand server-configured macros.
    boost::replace_all(filename, "{part_name}", manifest.data_part->name);
    boost::replace_all(filename, "{checksum}", manifest.data_part->checksums.getTotalChecksumHex());

    Macros::MacroExpansionInfo macro_info;
    macro_info.table_id = storage_id;
    filename = local_context->getMacros()->expand(filename, macro_info);

    return filename;
}
}

ExportPartTask::ExportPartTask(MergeTreeData & storage_, const MergeTreePartExportManifest & manifest_)
@@ -147,8 +167,10 @@ bool ExportPartTask::executeStep()

    try
    {
        const auto filename = buildDestinationFilename(manifest, storage.getStorageID(), local_context);

        sink = destination_storage->import(
            manifest.data_part->name + "_" + manifest.data_part->checksums.getTotalChecksumHex(),
            filename,
            block_with_partition_values,
            new_file_path_callback,
            manifest.file_already_exists_policy == MergeTreePartExportManifest::FileAlreadyExistsPolicy::overwrite,
1 change: 1 addition & 0 deletions src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp
@@ -51,6 +51,7 @@ namespace
context_copy->setSetting("export_merge_tree_part_file_already_exists_policy", String(magic_enum::enum_name(manifest.file_already_exists_policy)));
context_copy->setSetting("export_merge_tree_part_max_bytes_per_file", manifest.max_bytes_per_file);
context_copy->setSetting("export_merge_tree_part_max_rows_per_file", manifest.max_rows_per_file);
context_copy->setSetting("export_merge_tree_part_filename_pattern", manifest.filename_pattern);
return context_copy;
}
}
2 changes: 2 additions & 0 deletions src/Storages/StorageReplicatedMergeTree.cpp
@@ -212,6 +212,7 @@ namespace Setting
    extern const SettingsBool export_merge_tree_partition_lock_inside_the_task;
    extern const SettingsUInt64 export_merge_tree_part_max_bytes_per_file;
    extern const SettingsUInt64 export_merge_tree_part_max_rows_per_file;
    extern const SettingsString export_merge_tree_part_filename_pattern;
}

namespace MergeTreeSetting
@@ -8146,6 +8147,7 @@ void StorageReplicatedMergeTree::exportPartitionToTable(const PartitionCommand &
    manifest.max_rows_per_file = query_context->getSettingsRef()[Setting::export_merge_tree_part_max_rows_per_file];

    manifest.file_already_exists_policy = query_context->getSettingsRef()[Setting::export_merge_tree_part_file_already_exists_policy].value;
    manifest.filename_pattern = query_context->getSettingsRef()[Setting::export_merge_tree_part_filename_pattern].value;

    ops.emplace_back(zkutil::makeCreateRequest(
        fs::path(partition_exports_path) / "metadata.json",
6 changes: 6 additions & 0 deletions configs/macros_shard1_replica1.xml
@@ -0,0 +1,6 @@
<clickhouse>
<macros>
<shard>shard1</shard>
<replica>replica1</replica>
</macros>
</clickhouse>
6 changes: 6 additions & 0 deletions configs/macros_shard2_replica1.xml
@@ -0,0 +1,6 @@
<clickhouse>
<macros>
<shard>shard2</shard>
<replica>replica1</replica>
</macros>
</clickhouse>
@@ -104,6 +104,26 @@ def cluster():
            with_zookeeper=True,
            keeper_required_feature_flags=["multi_read"],
        )
        # Sharded instances for the filename pattern tests
        cluster.add_instance(
            "shard1_replica1",
            main_configs=["configs/named_collections.xml", "configs/allow_experimental_export_partition.xml", "configs/macros_shard1_replica1.xml"],
            user_configs=["configs/users.d/profile.xml"],
            with_minio=True,
            stay_alive=True,
            with_zookeeper=True,
            keeper_required_feature_flags=["multi_read"],
        )

        cluster.add_instance(
            "shard2_replica1",
            main_configs=["configs/named_collections.xml", "configs/allow_experimental_export_partition.xml", "configs/macros_shard2_replica1.xml"],
            user_configs=["configs/users.d/profile.xml"],
            with_minio=True,
            stay_alive=True,
            with_zookeeper=True,
            keeper_required_feature_flags=["multi_read"],
        )
        logging.info("Starting cluster...")
        cluster.start()
        yield cluster
@@ -122,6 +142,14 @@ def create_tables_and_insert_data(node, mt_table, s3_table, replica_name):
    create_s3_table(node, s3_table)


def create_sharded_tables_and_insert_data(node, mt_table, s3_table, replica_name):
    """Create a sharded ReplicatedMergeTree table with the {shard} macro in its ZooKeeper path."""
    node.query(f"CREATE TABLE {mt_table} (id UInt64, year UInt16) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{{shard}}/{mt_table}', '{replica_name}') PARTITION BY year ORDER BY tuple()")
    node.query(f"INSERT INTO {mt_table} VALUES (1, 2020), (2, 2020), (3, 2020), (4, 2021)")

    create_s3_table(node, s3_table)

def test_restart_nodes_during_export(cluster):
    node = cluster.instances["replica1"]
    node2 = cluster.instances["replica2"]
@@ -817,3 +845,82 @@ def test_export_partition_with_mixed_computed_columns(cluster):
        AND partition_id = '1'
    """)
    assert status.strip() == "COMPLETED", f"Expected COMPLETED status, got: {status}"


def test_sharded_export_partition_with_filename_pattern(cluster):
    """Test that a filename pattern prevents collisions when exporting from a sharded setup."""
    shard1_r1 = cluster.instances["shard1_replica1"]
    shard2_r1 = cluster.instances["shard2_replica1"]
    watcher_node = cluster.instances["watcher_node"]

    mt_table = "sharded_mt_table"
    s3_table = "sharded_s3_table"

    # Create tables on both shards with the same partition data (and hence the same part names).
    # Each shard uses a different ZooKeeper path via the {shard} macro.
    create_sharded_tables_and_insert_data(shard1_r1, mt_table, s3_table, "replica1")
    create_sharded_tables_and_insert_data(shard2_r1, mt_table, s3_table, "replica1")
    create_s3_table(watcher_node, s3_table)

    # Export the partition from both shards with a filename pattern that includes the shard.
    # This should prevent filename collisions.
    shard1_r1.query(
        f"ALTER TABLE {mt_table} EXPORT PARTITION ID '2020' TO TABLE {s3_table} "
        f"SETTINGS export_merge_tree_part_filename_pattern = '{{part_name}}_{{shard}}_{{replica}}_{{checksum}}'"
    )
    shard2_r1.query(
        f"ALTER TABLE {mt_table} EXPORT PARTITION ID '2020' TO TABLE {s3_table} "
        f"SETTINGS export_merge_tree_part_filename_pattern = '{{part_name}}_{{shard}}_{{replica}}_{{checksum}}'"
    )

    # Wait for both exports to complete.
    wait_for_export_status(shard1_r1, mt_table, s3_table, "2020", "COMPLETED")
    wait_for_export_status(shard2_r1, mt_table, s3_table, "2020", "COMPLETED")

    total_count = watcher_node.query(f"SELECT count() FROM {s3_table} WHERE year = 2020").strip()
    assert total_count == "6", f"Expected 6 total rows (3 from each shard), got {total_count}"

    # Verify the filenames contain the shard name. The S3 bucket is shared, so list it from watcher_node.
    files_shard1 = watcher_node.query(
        f"SELECT _file FROM s3(s3_conn, filename='{s3_table}/**', format='One') WHERE _file LIKE '%shard1%' LIMIT 1"
    ).strip()
    files_shard2 = watcher_node.query(
        f"SELECT _file FROM s3(s3_conn, filename='{s3_table}/**', format='One') WHERE _file LIKE '%shard2%' LIMIT 1"
    ).strip()

    # Each shard must have produced at least one file carrying its shard name.
    assert files_shard1 != "", "Expected a file with 'shard1' in its name, found none"
    assert files_shard2 != "", "Expected a file with 'shard2' in its name, found none"


def test_sharded_export_partition_default_pattern(cluster):
    shard1_r1 = cluster.instances["shard1_replica1"]
    shard2_r1 = cluster.instances["shard2_replica1"]
    watcher_node = cluster.instances["watcher_node"]

    mt_table = "sharded_mt_table_default"
    s3_table = "sharded_s3_table_default"

    # Create sharded tables with a different ZooKeeper path per shard.
    create_sharded_tables_and_insert_data(shard1_r1, mt_table, s3_table, "replica1")
    create_sharded_tables_and_insert_data(shard2_r1, mt_table, s3_table, "replica1")
    create_s3_table(watcher_node, s3_table)

    # Export with the default pattern ({part_name}_{checksum}). Identical data on both shards
    # yields identical part names and checksums, so the two exports collide on the same filename.
    shard1_r1.query(
        f"ALTER TABLE {mt_table} EXPORT PARTITION ID '2020' TO TABLE {s3_table}"
    )
    shard2_r1.query(
        f"ALTER TABLE {mt_table} EXPORT PARTITION ID '2020' TO TABLE {s3_table}"
    )

    wait_for_export_status(shard1_r1, mt_table, s3_table, "2020", "COMPLETED")
    wait_for_export_status(shard2_r1, mt_table, s3_table, "2020", "COMPLETED")

    # Both exports should complete: the overwrite policy handles the collision.
    # The S3 bucket is shared, so count from watcher_node.
    total_count = watcher_node.query(f"SELECT count() FROM {s3_table} WHERE year = 2020").strip()

    # Only one file with 3 rows should remain.
    assert int(total_count) == 3, f"Expected 3 rows, got {total_count}"
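
The two tests above pin down one property: with identical data on every shard, the default pattern renders to the same filename everywhere, while any pattern containing {shard} cannot collide. A self-contained sketch of that property (checksums are stand-ins):

# Identical parts on two shards share a part name and checksum (stand-in values).
def render(pattern: str, values: dict) -> str:
    out = pattern
    for name, value in values.items():
        out = out.replace("{" + name + "}", value)
    return out

shard1 = {"part_name": "2020_1_1_0", "checksum": "0a1b2c3d", "shard": "shard1"}
shard2 = {"part_name": "2020_1_1_0", "checksum": "0a1b2c3d", "shard": "shard2"}

# Default pattern: both shards render the same name -> collision, resolved by
# the file_already_exists_policy, so a single file survives.
assert render("{part_name}_{checksum}", shard1) == render("{part_name}_{checksum}", shard2)

# Pattern with {shard}: the names differ, so both shards' files land in the bucket.
assert render("{part_name}_{shard}_{checksum}", shard1) != render("{part_name}_{shard}_{checksum}", shard2)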
@@ -0,0 +1,16 @@
---- Test: Default pattern {part_name}_{checksum}
1 2020
2 2020
3 2020
---- Verify filename matches 2020_1_1_0_*.1.parquet
1
---- Test: Custom prefix pattern
4 2021
---- Verify filename matches myprefix_2021_2_2_0.1.parquet
1
---- Test: Pattern with macros
1 2020
2 2020
3 2020
---- Verify macros expanded (no literal braces in parquet filenames, that's the best we can do for stateless tests)
1
@@ -0,0 +1,49 @@
#!/usr/bin/env bash
# Tags: no-fasttest
# Tag no-fasttest: requires s3 storage

CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh

R=$RANDOM
mt="mt_${R}"
dest1="fp_dest1_${R}"
dest2="fp_dest2_${R}"
dest3="fp_dest3_${R}"

query() {
    $CLICKHOUSE_CLIENT --query "$1"
}

query "DROP TABLE IF EXISTS $mt, $dest1, $dest2, $dest3"

query "CREATE TABLE $mt (id UInt64, year UInt16) ENGINE = MergeTree() PARTITION BY year ORDER BY tuple()"
query "INSERT INTO $mt VALUES (1, 2020), (2, 2020), (3, 2020), (4, 2021)"

query "CREATE TABLE $dest1 (id UInt64, year UInt16) ENGINE = S3(s3_conn, filename='$dest1', format=Parquet, partition_strategy='hive') PARTITION BY year"
query "CREATE TABLE $dest2 (id UInt64, year UInt16) ENGINE = S3(s3_conn, filename='$dest2', format=Parquet, partition_strategy='hive') PARTITION BY year"
query "CREATE TABLE $dest3 (id UInt64, year UInt16) ENGINE = S3(s3_conn, filename='$dest3', format=Parquet, partition_strategy='hive') PARTITION BY year"

echo "---- Test: Default pattern {part_name}_{checksum}"
query "ALTER TABLE $mt EXPORT PART '2020_1_1_0' TO TABLE $dest1 SETTINGS allow_experimental_export_merge_tree_part = 1, export_merge_tree_part_filename_pattern = '{part_name}_{checksum}'"
sleep 3
query "SELECT * FROM $dest1 ORDER BY id"
echo "---- Verify filename matches 2020_1_1_0_*.1.parquet"
query "SELECT count() FROM s3(s3_conn, filename='$dest1/**/2020_1_1_0_*.1.parquet', format='One')"

echo "---- Test: Custom prefix pattern"
query "ALTER TABLE $mt EXPORT PART '2021_2_2_0' TO TABLE $dest2 SETTINGS allow_experimental_export_merge_tree_part = 1, export_merge_tree_part_filename_pattern = 'myprefix_{part_name}'"
sleep 3
query "SELECT * FROM $dest2 ORDER BY id"
echo "---- Verify filename matches myprefix_2021_2_2_0.1.parquet"
query "SELECT count() FROM s3(s3_conn, filename='$dest2/**/myprefix_2021_2_2_0.1.parquet', format='One')"

echo "---- Test: Pattern with macros"
query "ALTER TABLE $mt EXPORT PART '2020_1_1_0' TO TABLE $dest3 SETTINGS allow_experimental_export_merge_tree_part = 1, export_merge_tree_part_filename_pattern = '{database}_{table}_{part_name}'"
sleep 3
query "SELECT * FROM $dest3 ORDER BY id"
echo "---- Verify macros expanded (no literal braces in parquet filenames, that's the best we can do for stateless tests)"
query "SELECT count() = 0 FROM s3(s3_conn, filename='$dest3/**/*.1.parquet', format='One') WHERE _file LIKE '%{%'"

query "DROP TABLE IF EXISTS $mt, $dest1, $dest2, $dest3"