From 2430ba5e7ba6d5a831ef5b074fbf1b1dc9664fcc Mon Sep 17 00:00:00 2001 From: Vasily Nemkov Date: Fri, 10 Oct 2025 23:19:12 +0200 Subject: [PATCH 1/5] 26.1 Antalya port: fixes for s3Cluster distributed calls --- src/Core/Settings.cpp | 16 + src/Core/Settings.h | 1 + src/Core/SettingsChangesHistory.cpp | 5 + src/Core/SettingsEnums.cpp | 5 + src/Core/SettingsEnums.h | 10 + .../ClusterProxy/executeQuery.cpp | 1 + src/Interpreters/Context.cpp | 5 +- src/Interpreters/InterpreterSelectQuery.cpp | 19 ++ src/Planner/Planner.cpp | 28 ++ src/Planner/PlannerJoinTree.cpp | 2 + src/Processors/QueryPlan/ObjectFilterStep.cpp | 63 ++++ src/Processors/QueryPlan/ObjectFilterStep.h | 35 ++ .../optimizePrimaryKeyConditionAndLimit.cpp | 5 + .../QueryPlan/QueryPlanStepRegistry.cpp | 2 + src/Processors/QueryPlan/ReadFromRemote.cpp | 9 +- src/Processors/QueryPlan/ReadFromRemote.h | 2 + src/QueryPipeline/RemoteQueryExecutor.cpp | 11 +- src/QueryPipeline/RemoteQueryExecutor.h | 7 + src/Storages/IStorageCluster.cpp | 269 ++++++++++++--- src/Storages/IStorageCluster.h | 50 +++ .../StorageObjectStorageCluster.cpp | 2 +- .../StorageObjectStorageCluster.h | 1 - .../extractTableFunctionFromSelectQuery.cpp | 26 +- .../extractTableFunctionFromSelectQuery.h | 3 + tests/integration/test_s3_cluster/test.py | 307 +++++++++++++++++- .../test_cluster_table_function.py | 26 ++ .../03550_analyzer_remote_view_columns.sql | 2 +- 27 files changed, 852 insertions(+), 60 deletions(-) create mode 100644 src/Processors/QueryPlan/ObjectFilterStep.cpp create mode 100644 src/Processors/QueryPlan/ObjectFilterStep.h diff --git a/src/Core/Settings.cpp b/src/Core/Settings.cpp index e72653747d54..93c4708379f3 100644 --- a/src/Core/Settings.cpp +++ b/src/Core/Settings.cpp @@ -1914,6 +1914,22 @@ Possible values: - `global` — Replaces the `IN`/`JOIN` query with `GLOBAL IN`/`GLOBAL JOIN.` - `allow` — Allows the use of these types of subqueries. )", IMPORTANT) \ + DECLARE(ObjectStorageClusterJoinMode, object_storage_cluster_join_mode, ObjectStorageClusterJoinMode::ALLOW, R"( +Changes the behaviour of object storage cluster function or table. + +ClickHouse applies this setting when the query contains the product of object storage cluster function ot table, i.e. when the query for a object storage cluster function ot table contains a non-GLOBAL subquery for the object storage cluster function ot table. + +Restrictions: + +- Only applied for JOIN subqueries. +- Only if the FROM section uses a object storage cluster function ot table. + +Possible values: + +- `local` — Replaces the database and table in the subquery with local ones for the destination server (shard), leaving the normal `IN`/`JOIN.` +- `global` — Unsupported for now. Replaces the `IN`/`JOIN` query with `GLOBAL IN`/`GLOBAL JOIN.` +- `allow` — Default value. Allows the use of these types of subqueries. +)", 0) \ \ DECLARE(UInt64, max_concurrent_queries_for_all_users, 0, R"( Throw exception if the value of this setting is less or equal than the current number of simultaneously processed queries. 
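A minimal sketch of what the `local` mode enables (the `dims` table below is illustrative and not part of this patch; `cluster_simple` and the MinIO path mirror the test setup): the object storage scan is distributed over the cluster nodes, while the join against a table that exists only on the initiator runs locally:

    SELECT s3.name, d.region
    FROM s3Cluster('cluster_simple',
                   'http://minio1:9001/root/data/*', 'CSV',
                   'name String, key UInt32') AS s3
    JOIN dims AS d ON s3.key = d.key
    SETTINGS object_storage_cluster_join_mode = 'local';

With the default `allow` mode the whole query is shipped to each node, which can fail when `dims` does not exist there.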
diff --git a/src/Core/Settings.h b/src/Core/Settings.h index 4a80702f3a87..cc70cafa817c 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -60,6 +60,7 @@ class WriteBuffer; M(CLASS_NAME, DistributedCachePoolBehaviourOnLimit) /* Cloud only */ \ M(CLASS_NAME, DistributedDDLOutputMode) \ M(CLASS_NAME, DistributedProductMode) \ + M(CLASS_NAME, ObjectStorageClusterJoinMode) \ M(CLASS_NAME, Double) \ M(CLASS_NAME, EscapingRule) \ M(CLASS_NAME, Float) \ diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp index e8686e2a43ea..def54c7b272f 100644 --- a/src/Core/SettingsChangesHistory.cpp +++ b/src/Core/SettingsChangesHistory.cpp @@ -220,6 +220,11 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory() {"os_threads_nice_value_query", 0, 0, "New setting."}, {"os_threads_nice_value_materialized_view", 0, 0, "New setting."}, {"os_thread_priority", 0, 0, "Alias for os_threads_nice_value_query."}, + {"object_storage_cluster_join_mode", "allow", "allow", "New setting"}, + {"lock_object_storage_task_distribution_ms", 500, 500, "Raised the value to 500 to avoid hoping tasks between executors."}, + {"object_storage_cluster", "", "", "Antalya: New setting"}, + {"object_storage_max_nodes", 0, 0, "Antalya: New setting"}, + {"allow_retries_in_cluster_requests", false, false, "Antalya: New setting"}, }); addSettingsChanges(settings_changes_history, "25.8", { diff --git a/src/Core/SettingsEnums.cpp b/src/Core/SettingsEnums.cpp index c3fea7d48a48..127b2587b5ea 100644 --- a/src/Core/SettingsEnums.cpp +++ b/src/Core/SettingsEnums.cpp @@ -96,6 +96,11 @@ IMPLEMENT_SETTING_ENUM(DistributedProductMode, ErrorCodes::UNKNOWN_DISTRIBUTED_P {"global", DistributedProductMode::GLOBAL}, {"allow", DistributedProductMode::ALLOW}}) +IMPLEMENT_SETTING_ENUM(ObjectStorageClusterJoinMode, ErrorCodes::BAD_ARGUMENTS, + {{"local", ObjectStorageClusterJoinMode::LOCAL}, + {"global", ObjectStorageClusterJoinMode::GLOBAL}, + {"allow", ObjectStorageClusterJoinMode::ALLOW}}) + IMPLEMENT_SETTING_ENUM(QueryResultCacheNondeterministicFunctionHandling, ErrorCodes::BAD_ARGUMENTS, {{"throw", QueryResultCacheNondeterministicFunctionHandling::Throw}, diff --git a/src/Core/SettingsEnums.h b/src/Core/SettingsEnums.h index aab55857d2a6..c62048d1f0b4 100644 --- a/src/Core/SettingsEnums.h +++ b/src/Core/SettingsEnums.h @@ -164,6 +164,16 @@ enum class DistributedProductMode : uint8_t DECLARE_SETTING_ENUM(DistributedProductMode) +/// The setting for executing object storage cluster function ot table JOIN sections. +enum class ObjectStorageClusterJoinMode : uint8_t +{ + LOCAL, /// Convert to local query + GLOBAL, /// Convert to global query + ALLOW /// Enable +}; + +DECLARE_SETTING_ENUM(ObjectStorageClusterJoinMode) + /// How the query result cache handles queries with non-deterministic functions, e.g. 
now() enum class QueryResultCacheNondeterministicFunctionHandling : uint8_t { diff --git a/src/Interpreters/ClusterProxy/executeQuery.cpp b/src/Interpreters/ClusterProxy/executeQuery.cpp index cc3a1e2955b5..afc6d49ef028 100644 --- a/src/Interpreters/ClusterProxy/executeQuery.cpp +++ b/src/Interpreters/ClusterProxy/executeQuery.cpp @@ -463,6 +463,7 @@ void executeQuery( not_optimized_cluster->getName()); read_from_remote->setStepDescription("Read from remote replica"); + read_from_remote->setIsRemoteFunction(is_remote_function); plan->addStep(std::move(read_from_remote)); plan->addInterpreterContext(new_context); plans.emplace_back(std::move(plan)); diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index 82756929cce9..6ff32deaddcd 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -3151,8 +3151,11 @@ void Context::setCurrentQueryId(const String & query_id) client_info.current_query_id = query_id_to_set; - if (client_info.query_kind == ClientInfo::QueryKind::INITIAL_QUERY) + if (client_info.query_kind == ClientInfo::QueryKind::INITIAL_QUERY + && (getApplicationType() != ApplicationType::SERVER || client_info.initial_query_id.empty())) + { client_info.initial_query_id = client_info.current_query_id; + } } void Context::setBackgroundOperationTypeForContext(ClientInfo::BackgroundOperationType background_operation) diff --git a/src/Interpreters/InterpreterSelectQuery.cpp b/src/Interpreters/InterpreterSelectQuery.cpp index a298a3357311..23982cc61e7d 100644 --- a/src/Interpreters/InterpreterSelectQuery.cpp +++ b/src/Interpreters/InterpreterSelectQuery.cpp @@ -79,6 +79,7 @@ #include #include #include +#include #include #include #include @@ -91,6 +92,7 @@ #include #include #include +#include #include #include @@ -208,6 +210,7 @@ namespace Setting extern const SettingsBool enable_producing_buckets_out_of_order_in_aggregation; extern const SettingsBool enable_lazy_columns_replication; extern const SettingsBool serialize_string_in_memory_with_zero_byte; + extern const SettingsBool use_hive_partitioning; } namespace ServerSetting @@ -2046,6 +2049,22 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional

(query_plan.getRootNode()->step.get())) + { + auto object_filter_step = std::make_unique( + query_plan.getCurrentHeader(), + expressions.before_where->dag.clone(), + getSelectQuery().where()->getColumnName()); + + object_filter_step->setStepDescription("WHERE"); + query_plan.addStep(std::move(object_filter_step)); + } + } + if (from_aggregation_stage) { /// No need to aggregate anything, since this was done on remote shards. diff --git a/src/Planner/Planner.cpp b/src/Planner/Planner.cpp index b34780c4b5d9..7c47ace1d42b 100644 --- a/src/Planner/Planner.cpp +++ b/src/Planner/Planner.cpp @@ -41,6 +41,7 @@ #include #include #include +#include #include #include @@ -54,6 +55,7 @@ #include #include #include +#include #include @@ -145,6 +147,7 @@ namespace Setting extern const SettingsBool serialize_string_in_memory_with_zero_byte; extern const SettingsString temporary_files_codec; extern const SettingsNonZeroUInt64 temporary_files_buffer_size; + extern const SettingsBool use_hive_partitioning; } namespace ServerSetting @@ -547,6 +550,21 @@ ALWAYS_INLINE void addFilterStep( query_plan.addStep(std::move(where_step)); } +template +ALWAYS_INLINE void addObjectFilterStep( + QueryPlan & query_plan, + FilterAnalysisResult & filter_analysis_result, + const char (&step_description)[size]) +{ + auto actions = std::move(filter_analysis_result.filter_actions->dag); + + auto where_step = std::make_unique(query_plan.getCurrentHeader(), + std::move(actions), + filter_analysis_result.filter_column_name); + where_step->setStepDescription(step_description); + query_plan.addStep(std::move(where_step)); +} + Aggregator::Params getAggregatorParams(const PlannerContextPtr & planner_context, const AggregationAnalysisResult & aggregation_analysis_result, const QueryAnalysisResult & query_analysis_result, @@ -2016,6 +2034,16 @@ void Planner::buildPlanForQueryNode() if (query_processing_info.isSecondStage() || query_processing_info.isFromAggregationState()) { + if (settings[Setting::use_hive_partitioning] + && !query_processing_info.isFirstStage() + && expression_analysis_result.hasWhere()) + { + if (typeid_cast(query_plan.getRootNode()->step.get())) + { + addObjectFilterStep(query_plan, expression_analysis_result.getWhere(), "WHERE"); + } + } + if (query_processing_info.isFromAggregationState()) { /// Aggregation was performed on remote shards diff --git a/src/Planner/PlannerJoinTree.cpp b/src/Planner/PlannerJoinTree.cpp index dd7c0c72fc9f..7410ea1f1522 100644 --- a/src/Planner/PlannerJoinTree.cpp +++ b/src/Planner/PlannerJoinTree.cpp @@ -1450,6 +1450,8 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres /// Hopefully there is no other case when we read from Distributed up to FetchColumns. 
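    /// Remote table functions (s3Cluster and the like) behave the same way here:
    /// the subquery shipped to the cluster nodes resolves columns by their
    /// original names, so their outputs are kept unaliased as well.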
if (table_node && table_node->getStorage()->isRemote() && select_query_options.to_stage == QueryProcessingStage::FetchColumns) updated_actions_dag_outputs.push_back(output_node); + else if (table_function_node && table_function_node->getStorage()->isRemote()) + updated_actions_dag_outputs.push_back(output_node); } else updated_actions_dag_outputs.push_back(&rename_actions_dag.addAlias(*output_node, *column_identifier)); diff --git a/src/Processors/QueryPlan/ObjectFilterStep.cpp b/src/Processors/QueryPlan/ObjectFilterStep.cpp new file mode 100644 index 000000000000..a635aee729c7 --- /dev/null +++ b/src/Processors/QueryPlan/ObjectFilterStep.cpp @@ -0,0 +1,63 @@ +#include +#include +#include +#include +#include + +#include + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int INCORRECT_DATA; +} + +ObjectFilterStep::ObjectFilterStep( + SharedHeader input_header_, + ActionsDAG actions_dag_, + String filter_column_name_) + : actions_dag(std::move(actions_dag_)) + , filter_column_name(std::move(filter_column_name_)) +{ + input_headers.emplace_back(input_header_); + output_header = input_headers.front(); +} + +QueryPipelineBuilderPtr ObjectFilterStep::updatePipeline(QueryPipelineBuilders pipelines, const BuildQueryPipelineSettings & /* settings */) +{ + return std::move(pipelines.front()); +} + +void ObjectFilterStep::updateOutputHeader() +{ + output_header = input_headers.front(); +} + +void ObjectFilterStep::serialize(Serialization & ctx) const +{ + writeStringBinary(filter_column_name, ctx.out); + + actions_dag.serialize(ctx.out, ctx.registry); +} + +std::unique_ptr ObjectFilterStep::deserialize(Deserialization & ctx) +{ + if (ctx.input_headers.size() != 1) + throw Exception(ErrorCodes::INCORRECT_DATA, "ObjectFilterStep must have one input stream"); + + String filter_column_name; + readStringBinary(filter_column_name, ctx.in); + + ActionsDAG actions_dag = ActionsDAG::deserialize(ctx.in, ctx.registry, ctx.context); + + return std::make_unique(ctx.input_headers.front(), std::move(actions_dag), std::move(filter_column_name)); +} + +void registerObjectFilterStep(QueryPlanStepRegistry & registry) +{ + registry.registerStep("ObjectFilter", ObjectFilterStep::deserialize); +} + +} diff --git a/src/Processors/QueryPlan/ObjectFilterStep.h b/src/Processors/QueryPlan/ObjectFilterStep.h new file mode 100644 index 000000000000..ef35d20068ba --- /dev/null +++ b/src/Processors/QueryPlan/ObjectFilterStep.h @@ -0,0 +1,35 @@ +#pragma once +#include +#include + +namespace DB +{ + +/// Implements WHERE operation. 
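/// Unlike FilterStep, this step performs no work at execution time:
/// updatePipeline() forwards the input pipeline untouched and the output
/// header equals the input header. It only carries the WHERE DAG through a
/// distributed plan so that plan-level optimizations (see
/// optimizePrimaryKeyConditionAndLimit) can push the predicate down into a
/// SourceStepWithFilter, e.g. to prune hive-partitioned object storage paths.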
+class ObjectFilterStep : public IQueryPlanStep +{ +public: + ObjectFilterStep( + SharedHeader input_header_, + ActionsDAG actions_dag_, + String filter_column_name_); + + String getName() const override { return "ObjectFilter"; } + QueryPipelineBuilderPtr updatePipeline(QueryPipelineBuilders pipelines, const BuildQueryPipelineSettings & settings) override; + + const ActionsDAG & getExpression() const { return actions_dag; } + ActionsDAG & getExpression() { return actions_dag; } + const String & getFilterColumnName() const { return filter_column_name; } + + void serialize(Serialization & ctx) const override; + + static std::unique_ptr deserialize(Deserialization & ctx); + +private: + void updateOutputHeader() override; + + ActionsDAG actions_dag; + String filter_column_name; +}; + +} diff --git a/src/Processors/QueryPlan/Optimizations/optimizePrimaryKeyConditionAndLimit.cpp b/src/Processors/QueryPlan/Optimizations/optimizePrimaryKeyConditionAndLimit.cpp index f8c8c83171f5..850451c39a0a 100644 --- a/src/Processors/QueryPlan/Optimizations/optimizePrimaryKeyConditionAndLimit.cpp +++ b/src/Processors/QueryPlan/Optimizations/optimizePrimaryKeyConditionAndLimit.cpp @@ -3,6 +3,7 @@ #include #include #include +#include namespace DB::QueryPlanOptimizations { @@ -40,6 +41,10 @@ void optimizePrimaryKeyConditionAndLimit(const Stack & stack) /// So this is likely not needed. continue; } + else if (auto * object_filter_step = typeid_cast(iter->node->step.get())) + { + source_step_with_filter->addFilter(object_filter_step->getExpression().clone(), object_filter_step->getFilterColumnName()); + } else { break; diff --git a/src/Processors/QueryPlan/QueryPlanStepRegistry.cpp b/src/Processors/QueryPlan/QueryPlanStepRegistry.cpp index 97868db191d2..b600c80147dd 100644 --- a/src/Processors/QueryPlan/QueryPlanStepRegistry.cpp +++ b/src/Processors/QueryPlan/QueryPlanStepRegistry.cpp @@ -54,6 +54,7 @@ void registerFilterStep(QueryPlanStepRegistry & registry); void registerTotalsHavingStep(QueryPlanStepRegistry & registry); void registerExtremesStep(QueryPlanStepRegistry & registry); void registerJoinStep(QueryPlanStepRegistry & registry); +void registerObjectFilterStep(QueryPlanStepRegistry & registry); void registerReadFromTableStep(QueryPlanStepRegistry & registry); void registerReadFromTableFunctionStep(QueryPlanStepRegistry & registry); @@ -85,6 +86,7 @@ void QueryPlanStepRegistry::registerPlanSteps() registerReadFromTableStep(registry); registerReadFromTableFunctionStep(registry); registerBuildRuntimeFilterStep(registry); + registerObjectFilterStep(registry); } } diff --git a/src/Processors/QueryPlan/ReadFromRemote.cpp b/src/Processors/QueryPlan/ReadFromRemote.cpp index b2e6eea1068c..0c923fc3117e 100644 --- a/src/Processors/QueryPlan/ReadFromRemote.cpp +++ b/src/Processors/QueryPlan/ReadFromRemote.cpp @@ -534,7 +534,8 @@ void ReadFromRemote::addLazyPipe( my_stage = stage, my_storage = storage, add_agg_info, add_totals, add_extremes, async_read, async_query_sending, query_tree = shard.query_tree, planner_context = shard.planner_context, - pushed_down_filters, parallel_marshalling_threads]() mutable + pushed_down_filters, parallel_marshalling_threads, + my_is_remote_function = is_remote_function]() mutable -> QueryPipelineBuilder { auto current_settings = my_context->getSettingsRef(); @@ -630,6 +631,8 @@ void ReadFromRemote::addLazyPipe( {DataTypeUInt32().createColumnConst(1, my_shard.shard_info.shard_num), std::make_shared(), "_shard_num"}}; auto remote_query_executor = std::make_shared( 
std::move(connections), query_string, header, my_context, my_throttler, my_scalars, my_external_tables, stage_to_use, my_shard.query_plan); + remote_query_executor->setRemoteFunction(my_is_remote_function); + remote_query_executor->setShardCount(my_shard_count); auto pipe = createRemoteSourcePipe( remote_query_executor, add_agg_info, add_totals, add_extremes, async_read, async_query_sending, parallel_marshalling_threads); @@ -722,6 +725,8 @@ void ReadFromRemote::addPipe( priority_func); remote_query_executor->setLogger(log); remote_query_executor->setPoolMode(PoolMode::GET_ONE); + remote_query_executor->setRemoteFunction(is_remote_function); + remote_query_executor->setShardCount(shard_count); if (!table_func_ptr) remote_query_executor->setMainTable(shard.main_table ? shard.main_table : main_table); @@ -742,6 +747,8 @@ void ReadFromRemote::addPipe( auto remote_query_executor = std::make_shared( shard.shard_info.pool, query_string, shard.header, context, throttler, scalars, external_tables, stage_to_use, shard.query_plan); remote_query_executor->setLogger(log); + remote_query_executor->setRemoteFunction(is_remote_function); + remote_query_executor->setShardCount(shard_count); if (context->canUseTaskBasedParallelReplicas() || parallel_replicas_disabled) { diff --git a/src/Processors/QueryPlan/ReadFromRemote.h b/src/Processors/QueryPlan/ReadFromRemote.h index d9bc56a5d9f1..bed0e55c52ac 100644 --- a/src/Processors/QueryPlan/ReadFromRemote.h +++ b/src/Processors/QueryPlan/ReadFromRemote.h @@ -45,6 +45,7 @@ class ReadFromRemote final : public SourceStepWithFilterBase void enableMemoryBoundMerging(); void enforceAggregationInOrder(const SortDescription & sort_description); + void setIsRemoteFunction(bool is_remote_function_ = true) { is_remote_function = is_remote_function_; } bool hasSerializedPlan() const; @@ -62,6 +63,7 @@ class ReadFromRemote final : public SourceStepWithFilterBase UInt32 shard_count; const String cluster_name; std::optional priority_func_factory; + bool is_remote_function = false; Pipes addPipes(const ClusterProxy::SelectStreamFactory::Shards & used_shards, const SharedHeader & out_header); diff --git a/src/QueryPipeline/RemoteQueryExecutor.cpp b/src/QueryPipeline/RemoteQueryExecutor.cpp index 8e2e0f0d3079..53b4f80cd0f4 100644 --- a/src/QueryPipeline/RemoteQueryExecutor.cpp +++ b/src/QueryPipeline/RemoteQueryExecutor.cpp @@ -406,7 +406,16 @@ void RemoteQueryExecutor::sendQueryUnlocked(ClientInfo::QueryKind query_kind, As auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(settings); ClientInfo modified_client_info = context->getClientInfo(); - modified_client_info.query_kind = query_kind; + + /// Doesn't support now "remote('1.1.1.{1,2}')"" + if (is_remote_function && (shard_count == 1)) + { + modified_client_info.setInitialQuery(); + modified_client_info.client_name = "ClickHouse server"; + modified_client_info.interface = ClientInfo::Interface::TCP; + } + else + modified_client_info.query_kind = query_kind; if (extension) modified_client_info.collaborate_with_initiator = true; diff --git a/src/QueryPipeline/RemoteQueryExecutor.h b/src/QueryPipeline/RemoteQueryExecutor.h index 790a406b542d..b1678109902f 100644 --- a/src/QueryPipeline/RemoteQueryExecutor.h +++ b/src/QueryPipeline/RemoteQueryExecutor.h @@ -209,6 +209,10 @@ class RemoteQueryExecutor void setLogger(LoggerPtr logger) { log = logger; } + void setRemoteFunction(bool is_remote_function_ = true) { is_remote_function = is_remote_function_; } + + void setShardCount(UInt32 shard_count_) { shard_count = 
shard_count_; } + const Block & getHeader() const { return *header; } const SharedHeader & getSharedHeader() const { return header; } @@ -305,6 +309,9 @@ class RemoteQueryExecutor bool packet_in_progress = false; #endif + bool is_remote_function = false; + UInt32 shard_count = 0; + /// Parts uuids, collected from remote replicas std::vector duplicated_part_uuids; diff --git a/src/Storages/IStorageCluster.cpp b/src/Storages/IStorageCluster.cpp index c6c69c0f21bc..607831f13c5a 100644 --- a/src/Storages/IStorageCluster.cpp +++ b/src/Storages/IStorageCluster.cpp @@ -13,13 +13,18 @@ #include #include #include -#include #include #include #include #include #include #include +#include +#include +#include +#include +#include +#include #include #include @@ -37,6 +42,12 @@ namespace Setting extern const SettingsBool parallel_replicas_local_plan; extern const SettingsString cluster_for_parallel_replicas; extern const SettingsNonZeroUInt64 max_parallel_replicas; + extern const SettingsObjectStorageClusterJoinMode object_storage_cluster_join_mode; +} + +namespace ErrorCodes +{ + extern const int LOGICAL_ERROR; } namespace ErrorCodes @@ -54,51 +65,6 @@ IStorageCluster::IStorageCluster( { } -class ReadFromCluster : public SourceStepWithFilter -{ -public: - std::string getName() const override { return "ReadFromCluster"; } - void initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) override; - void applyFilters(ActionDAGNodes added_filter_nodes) override; - - ReadFromCluster( - const Names & column_names_, - const SelectQueryInfo & query_info_, - const StorageSnapshotPtr & storage_snapshot_, - const ContextPtr & context_, - SharedHeader sample_block, - std::shared_ptr storage_, - ASTPtr query_to_send_, - QueryProcessingStage::Enum processed_stage_, - ClusterPtr cluster_, - LoggerPtr log_) - : SourceStepWithFilter( - std::move(sample_block), - column_names_, - query_info_, - storage_snapshot_, - context_) - , storage(std::move(storage_)) - , query_to_send(std::move(query_to_send_)) - , processed_stage(processed_stage_) - , cluster(std::move(cluster_)) - , log(log_) - { - } - -private: - std::shared_ptr storage; - ASTPtr query_to_send; - QueryProcessingStage::Enum processed_stage; - ClusterPtr cluster; - LoggerPtr log; - - std::optional extension; - - void createExtension(const ActionsDAG::Node * predicate); - ContextPtr updateSettings(const Settings & settings); -}; - void ReadFromCluster::applyFilters(ActionDAGNodes added_filter_nodes) { SourceStepWithFilter::applyFilters(std::move(added_filter_nodes)); @@ -123,6 +89,175 @@ void ReadFromCluster::createExtension(const ActionsDAG::Node * predicate) getStorageSnapshot()->metadata); } +namespace +{ + +/* +Helping class to find in query tree first node of required type +*/ +class SearcherVisitor : public InDepthQueryTreeVisitorWithContext +{ +public: + using Base = InDepthQueryTreeVisitorWithContext; + using Base::Base; + + explicit SearcherVisitor(QueryTreeNodeType type_, ContextPtr context) : Base(context), type(type_) {} + + bool needChildVisit(QueryTreeNodePtr &, QueryTreeNodePtr & /*child*/) + { + return !passed_node; + } + + void enterImpl(QueryTreeNodePtr & node) + { + if (passed_node) + return; + + auto node_type = node->getNodeType(); + + if (node_type == type) + passed_node = node; + } + + QueryTreeNodePtr getNode() const { return passed_node; } + +private: + QueryTreeNodeType type; + QueryTreeNodePtr passed_node; +}; + +/* +Helping class to find all used columns with specific source +*/ +class 
CollectUsedColumnsForSourceVisitor : public InDepthQueryTreeVisitorWithContext +{ +public: + using Base = InDepthQueryTreeVisitorWithContext; + using Base::Base; + + explicit CollectUsedColumnsForSourceVisitor( + QueryTreeNodePtr source_, + ContextPtr context, + bool collect_columns_from_other_sources_ = false) + : Base(context) + , source(source_) + , collect_columns_from_other_sources(collect_columns_from_other_sources_) + {} + + void enterImpl(QueryTreeNodePtr & node) + { + auto node_type = node->getNodeType(); + + if (node_type != QueryTreeNodeType::COLUMN) + return; + + auto & column_node = node->as(); + auto column_source = column_node.getColumnSourceOrNull(); + if (!column_source) + return; + + if ((column_source == source) != collect_columns_from_other_sources) + { + const auto & name = column_node.getColumnName(); + if (!names.count(name)) + { + columns.emplace_back(column_node.getColumn()); + names.insert(name); + } + } + } + + const NamesAndTypes & getColumns() const { return columns; } + +private: + std::unordered_set names; + QueryTreeNodePtr source; + NamesAndTypes columns; + bool collect_columns_from_other_sources; +}; + +}; + +/* +Try to make subquery to send on nodes +Converts + + SELECT s3.c1, s3.c2, t.c3 + FROM + s3Cluster(...) AS s3 + JOIN + localtable as t + ON s3.key == t.key + +to + + SELECT s3.c1, s3.c2, s3.key + FROM + s3Cluster(...) AS s3 +*/ +void IStorageCluster::updateQueryWithJoinToSendIfNeeded( + ASTPtr & query_to_send, + QueryTreeNodePtr query_tree, + const ContextPtr & context) +{ + auto object_storage_cluster_join_mode = context->getSettingsRef()[Setting::object_storage_cluster_join_mode]; + switch (object_storage_cluster_join_mode) + { + case ObjectStorageClusterJoinMode::LOCAL: + { + auto modified_query_tree = query_tree->clone(); + bool need_modify = false; + + SearcherVisitor table_function_searcher(QueryTreeNodeType::TABLE_FUNCTION, context); + table_function_searcher.visit(query_tree); + auto table_function_node = table_function_searcher.getNode(); + if (!table_function_node) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find table function node"); + + if (has_join) + { + auto table_function = extractTableFunctionASTPtrFromSelectQuery(query_to_send); + auto query_tree_distributed = buildTableFunctionQueryTree(table_function, context); + auto & table_function_ast = table_function->as(); + query_tree_distributed->setAlias(table_function_ast.alias); + + // Find add used columns from table function to make proper projection list + CollectUsedColumnsForSourceVisitor collector(table_function_node, context); + collector.visit(query_tree); + const auto & columns = collector.getColumns(); + + auto & query_node = modified_query_tree->as(); + query_node.resolveProjectionColumns(columns); + auto column_nodes_to_select = std::make_shared(); + column_nodes_to_select->getNodes().reserve(columns.size()); + for (auto & column : columns) + column_nodes_to_select->getNodes().emplace_back(std::make_shared(column, table_function_node)); + query_node.getProjectionNode() = column_nodes_to_select; + + // Left only table function to send on cluster nodes + modified_query_tree = modified_query_tree->cloneAndReplace(query_node.getJoinTree(), query_tree_distributed); + + need_modify = true; + } + + if (has_local_columns_in_where) + { + auto & query_node = modified_query_tree->as(); + query_node.getWhere() = {}; + } + + if (need_modify) + query_to_send = queryNodeToDistributedSelectQuery(modified_query_tree); + return; + } + case ObjectStorageClusterJoinMode::GLOBAL: + // 
TODO + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "`Global` mode for `object_storage_cluster_join_mode` setting is unimplemented for now"); + case ObjectStorageClusterJoinMode::ALLOW: // Do nothing special + return; + } +} + /// The code executes on initiator void IStorageCluster::read( QueryPlan & query_plan, @@ -146,13 +281,15 @@ void IStorageCluster::read( SharedHeader sample_block; ASTPtr query_to_send = query_info.query; + updateQueryWithJoinToSendIfNeeded(query_to_send, query_info.query_tree, context); + if (context->getSettingsRef()[Setting::allow_experimental_analyzer]) { - sample_block = InterpreterSelectQueryAnalyzer::getSampleBlock(query_info.query, context, SelectQueryOptions(processed_stage)); + sample_block = InterpreterSelectQueryAnalyzer::getSampleBlock(query_to_send, context, SelectQueryOptions(processed_stage)); } else { - auto interpreter = InterpreterSelectQuery(query_info.query, context, SelectQueryOptions(processed_stage).analyze()); + auto interpreter = InterpreterSelectQuery(query_to_send, context, SelectQueryOptions(processed_stage).analyze()); sample_block = interpreter.getSampleBlock(); query_to_send = interpreter.getQueryInfo().query->clone(); } @@ -160,7 +297,7 @@ void IStorageCluster::read( updateQueryToSendIfNeeded(query_to_send, storage_snapshot, context); RestoreQualifiedNamesVisitor::Data data; - data.distributed_table = DatabaseAndTableWithAlias(*getTableExpression(query_info.query->as(), 0)); + data.distributed_table = DatabaseAndTableWithAlias(*getTableExpression(query_to_send->as(), 0)); data.remote_table.database = context->getCurrentDatabase(); data.remote_table.table = getName(); RestoreQualifiedNamesVisitor(data).visit(query_to_send); @@ -255,8 +392,40 @@ void ReadFromCluster::initializePipeline(QueryPipelineBuilder & pipeline, const } QueryProcessingStage::Enum IStorageCluster::getQueryProcessingStage( - ContextPtr context, QueryProcessingStage::Enum to_stage, const StorageSnapshotPtr &, SelectQueryInfo &) const + ContextPtr context, QueryProcessingStage::Enum to_stage, const StorageSnapshotPtr &, SelectQueryInfo & query_info) const { + auto object_storage_cluster_join_mode = context->getSettingsRef()[Setting::object_storage_cluster_join_mode]; + + if (object_storage_cluster_join_mode != ObjectStorageClusterJoinMode::ALLOW) + { + if (!context->getSettingsRef()[Setting::allow_experimental_analyzer]) + throw Exception(ErrorCodes::NOT_IMPLEMENTED, + "object_storage_cluster_join_mode!='allow' is not supported without allow_experimental_analyzer=true"); + + SearcherVisitor join_searcher(QueryTreeNodeType::JOIN, context); + join_searcher.visit(query_info.query_tree); + if (join_searcher.getNode()) + has_join = true; + + SearcherVisitor table_function_searcher(QueryTreeNodeType::TABLE_FUNCTION, context); + table_function_searcher.visit(query_info.query_tree); + auto table_function_node = table_function_searcher.getNode(); + if (!table_function_node) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find table function node"); + + CollectUsedColumnsForSourceVisitor collector_where(table_function_node, context, true); + auto & query_node = query_info.query_tree->as(); + if (query_node.hasWhere()) + collector_where.visit(query_node.getWhere()); + + // Can't use 'WHERE' on remote node if it contains columns from other sources + if (!collector_where.getColumns().empty()) + has_local_columns_in_where = true; + + if (has_join || has_local_columns_in_where) + return QueryProcessingStage::Enum::FetchColumns; + } + /// Initiator executes query on remote node. 
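    /// For an initial query the remote nodes process it up to WithMergeableState;
    /// a secondary (non-initial) query arriving at a cluster node only fetches
    /// columns, since the merge is performed on the initiator.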
if (context->getClientInfo().query_kind == ClientInfo::QueryKind::INITIAL_QUERY) if (to_stage >= QueryProcessingStage::Enum::WithMergeableState) diff --git a/src/Storages/IStorageCluster.h b/src/Storages/IStorageCluster.h index 3248b26b8c5e..3df691ffc4c3 100644 --- a/src/Storages/IStorageCluster.h +++ b/src/Storages/IStorageCluster.h @@ -3,6 +3,7 @@ #include #include #include +#include namespace DB { @@ -54,13 +55,62 @@ class IStorageCluster : public IStorage protected: virtual void updateBeforeRead(const ContextPtr &) {} virtual void updateQueryToSendIfNeeded(ASTPtr & /*query*/, const StorageSnapshotPtr & /*storage_snapshot*/, const ContextPtr & /*context*/) {} + void updateQueryWithJoinToSendIfNeeded(ASTPtr & query_to_send, QueryTreeNodePtr query_tree, const ContextPtr & context); virtual void updateConfigurationIfNeeded(ContextPtr /* context */) {} private: LoggerPtr log; String cluster_name; + + mutable bool has_join = false; + mutable bool has_local_columns_in_where = false; }; +class ReadFromCluster : public SourceStepWithFilter +{ +public: + std::string getName() const override { return "ReadFromCluster"; } + void initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) override; + void applyFilters(ActionDAGNodes added_filter_nodes) override; + + ReadFromCluster( + const Names & column_names_, + const SelectQueryInfo & query_info_, + const StorageSnapshotPtr & storage_snapshot_, + const ContextPtr & context_, + SharedHeader sample_block, + std::shared_ptr storage_, + ASTPtr query_to_send_, + QueryProcessingStage::Enum processed_stage_, + ClusterPtr cluster_, + LoggerPtr log_) + : SourceStepWithFilter( + std::move(sample_block), + column_names_, + query_info_, + storage_snapshot_, + context_) + , storage(std::move(storage_)) + , query_to_send(std::move(query_to_send_)) + , processed_stage(processed_stage_) + , cluster(std::move(cluster_)) + , log(log_) + { + } + +private: + std::shared_ptr storage; + ASTPtr query_to_send; + QueryProcessingStage::Enum processed_stage; + ClusterPtr cluster; + LoggerPtr log; + + std::optional extension; + + void createExtension(const ActionsDAG::Node * predicate); + ContextPtr updateSettings(const Settings & settings); +}; + } diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp index 4611aa7cb271..1ea0d9a164e8 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp @@ -234,7 +234,7 @@ RemoteQueryExecutor::Extension StorageObjectStorageCluster::getTaskIteratorExten local_context, predicate, filter, - virtual_columns, + getVirtualsList(), hive_partition_columns_to_read_from_file_path, nullptr, local_context->getFileProgressCallback(), diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h index 45509cbf09a3..9d8afa668d9b 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h @@ -48,7 +48,6 @@ class StorageObjectStorageCluster : public IStorageCluster const String engine_name; const StorageObjectStorageConfigurationPtr configuration; const ObjectStoragePtr object_storage; - NamesAndTypesList virtual_columns; NamesAndTypesList hive_partition_columns_to_read_from_file_path; bool update_configuration_on_read_write; }; diff --git a/src/Storages/extractTableFunctionFromSelectQuery.cpp 
b/src/Storages/extractTableFunctionFromSelectQuery.cpp index 57302036c889..8477798b62b1 100644 --- a/src/Storages/extractTableFunctionFromSelectQuery.cpp +++ b/src/Storages/extractTableFunctionFromSelectQuery.cpp @@ -9,7 +9,7 @@ namespace DB { -ASTFunction * extractTableFunctionFromSelectQuery(ASTPtr & query) +ASTTableExpression * extractTableExpressionASTPtrFromSelectQuery(ASTPtr & query) { auto * select_query = query->as(); if (!select_query || !select_query->tables()) @@ -17,10 +17,30 @@ ASTFunction * extractTableFunctionFromSelectQuery(ASTPtr & query) auto * tables = select_query->tables()->as(); auto * table_expression = tables->children[0]->as()->table_expression->as(); - if (!table_expression->table_function) + return table_expression; +} + +ASTPtr extractTableFunctionASTPtrFromSelectQuery(ASTPtr & query) +{ + auto table_expression = extractTableExpressionASTPtrFromSelectQuery(query); + return table_expression ? table_expression->table_function : nullptr; +} + +ASTFunction * extractTableFunctionFromSelectQuery(ASTPtr & query) +{ + auto table_function_ast = extractTableFunctionASTPtrFromSelectQuery(query); + if (!table_function_ast) return nullptr; - return table_expression->table_function->as(); + return table_function_ast->as(); +} + +ASTExpressionList * extractTableFunctionArgumentsFromSelectQuery(ASTPtr & query) +{ + auto * table_function = extractTableFunctionFromSelectQuery(query); + if (!table_function) + return nullptr; + return table_function->arguments->as(); } } diff --git a/src/Storages/extractTableFunctionFromSelectQuery.h b/src/Storages/extractTableFunctionFromSelectQuery.h index c69cc7ce6c52..609be0e4f985 100644 --- a/src/Storages/extractTableFunctionFromSelectQuery.h +++ b/src/Storages/extractTableFunctionFromSelectQuery.h @@ -6,7 +6,10 @@ namespace DB { +struct ASTTableExpression; +ASTTableExpression * extractTableExpressionASTPtrFromSelectQuery(ASTPtr & query); +ASTPtr extractTableFunctionASTPtrFromSelectQuery(ASTPtr & query); ASTFunction * extractTableFunctionFromSelectQuery(ASTPtr & query); } diff --git a/tests/integration/test_s3_cluster/test.py b/tests/integration/test_s3_cluster/test.py index 76b8f0df2881..54351b894a0c 100644 --- a/tests/integration/test_s3_cluster/test.py +++ b/tests/integration/test_s3_cluster/test.py @@ -2,7 +2,7 @@ import logging import os import shutil -import time +import uuid from email.errors import HeaderParseError import pytest @@ -509,3 +509,308 @@ def test_cluster_default_expression(started_cluster): ) assert result == expected_result + + +def test_remote_hedged(started_cluster): + node = started_cluster.instances["s0_0_0"] + pure_s3 = node.query( + f""" + SELECT * from s3( + 'http://minio1:9001/root/data/{{clickhouse,database}}/*', + 'minio', '{minio_secret_key}', 'CSV', + 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') + ORDER BY (name, value, polygon) + LIMIT 1 + """ + ) + s3_distributed = node.query( + f""" + SELECT * from remote('s0_0_1', s3Cluster( + 'cluster_simple', + 'http://minio1:9001/root/data/{{clickhouse,database}}/*', 'minio', '{minio_secret_key}', 'CSV', + 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')) + ORDER BY (name, value, polygon) + LIMIT 1 + SETTINGS use_hedged_requests=True + """ + ) + + assert TSV(pure_s3) == TSV(s3_distributed) + + +def test_remote_no_hedged(started_cluster): + node = started_cluster.instances["s0_0_0"] + pure_s3 = node.query( + f""" + SELECT * from s3( + 'http://minio1:9001/root/data/{{clickhouse,database}}/*', + 'minio', 
'{minio_secret_key}', 'CSV', + 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') + ORDER BY (name, value, polygon) + LIMIT 1 + """ + ) + s3_distributed = node.query( + f""" + SELECT * from remote('s0_0_1', s3Cluster( + 'cluster_simple', + 'http://minio1:9001/root/data/{{clickhouse,database}}/*', 'minio', '{minio_secret_key}', 'CSV', + 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')) + ORDER BY (name, value, polygon) + LIMIT 1 + SETTINGS use_hedged_requests=False + """ + ) + + assert TSV(pure_s3) == TSV(s3_distributed) + + +@pytest.mark.parametrize("allow_experimental_analyzer", [0, 1]) +def test_hive_partitioning(started_cluster, allow_experimental_analyzer): + node = started_cluster.instances["s0_0_0"] + + node.query(f"SET allow_experimental_analyzer = {allow_experimental_analyzer}") + + for i in range(1, 5): + exists = node.query( + f""" + SELECT + count() + FROM s3('http://minio1:9001/root/data/hive/key={i}/*', 'minio', '{minio_secret_key}', 'Parquet', 'key Int32, value Int32') + GROUP BY ALL + FORMAT TSV + """ + ) + if int(exists) == 0: + node.query( + f""" + INSERT + INTO FUNCTION s3('http://minio1:9001/root/data/hive/key={i}/data.parquet', 'minio', '{minio_secret_key}', 'Parquet', 'key Int32, value Int32') + SELECT {i}, {i} + SETTINGS use_hive_partitioning = 0 + """ + ) + + query_id_full = str(uuid.uuid4()) + result = node.query( + f""" + SELECT count() + FROM s3('http://minio1:9001/root/data/hive/key=**.parquet', 'minio', '{minio_secret_key}', 'Parquet', 'key Int32, value Int32') + WHERE key <= 2 + FORMAT TSV + SETTINGS enable_filesystem_cache = 0, use_query_cache = 0, use_cache_for_count_from_files = 0, use_hive_partitioning = 0 + """, + query_id=query_id_full, + ) + result = int(result) + assert result == 2 + + query_id_optimized = str(uuid.uuid4()) + result = node.query( + f""" + SELECT count() + FROM s3('http://minio1:9001/root/data/hive/key=**.parquet', 'minio', '{minio_secret_key}', 'Parquet', 'key Int32, value Int32') + WHERE key <= 2 + FORMAT TSV + SETTINGS enable_filesystem_cache = 0, use_query_cache = 0, use_cache_for_count_from_files = 0, use_hive_partitioning = 1 + """, + query_id=query_id_optimized, + ) + result = int(result) + assert result == 2 + + query_id_cluster_full = str(uuid.uuid4()) + result = node.query( + f""" + SELECT count() + FROM s3Cluster(cluster_simple, 'http://minio1:9001/root/data/hive/key=**.parquet', 'minio', '{minio_secret_key}', 'Parquet', 'key Int32, value Int32') + WHERE key <= 2 + FORMAT TSV + SETTINGS enable_filesystem_cache = 0, use_query_cache = 0, use_cache_for_count_from_files = 0, use_hive_partitioning = 0 + """, + query_id=query_id_cluster_full, + ) + result = int(result) + assert result == 2 + + query_id_cluster_optimized = str(uuid.uuid4()) + result = node.query( + f""" + SELECT count() + FROM s3Cluster(cluster_simple, 'http://minio1:9001/root/data/hive/key=**.parquet', 'minio', '{minio_secret_key}', 'Parquet', 'key Int32, value Int32') + WHERE key <= 2 + FORMAT TSV + SETTINGS enable_filesystem_cache = 0, use_query_cache = 0, use_cache_for_count_from_files = 0, use_hive_partitioning = 1 + """, + query_id=query_id_cluster_optimized, + ) + result = int(result) + assert result == 2 + + node.query("SYSTEM FLUSH LOGS ON CLUSTER 'cluster_simple'") + + full_traffic = node.query( + f""" + SELECT sum(ProfileEvents['ReadBufferFromS3Bytes']) + FROM clusterAllReplicas(cluster_simple, system.query_log) + WHERE type='QueryFinish' AND initial_query_id='{query_id_full}' + FORMAT TSV + """ + ) + 
full_traffic = int(full_traffic) + assert full_traffic > 0 # 612*4 + + optimized_traffic = node.query( + f""" + SELECT sum(ProfileEvents['ReadBufferFromS3Bytes']) + FROM clusterAllReplicas(cluster_simple, system.query_log) + WHERE type='QueryFinish' AND initial_query_id='{query_id_optimized}' + FORMAT TSV + """ + ) + optimized_traffic = int(optimized_traffic) + assert optimized_traffic > 0 # 612*2 + assert full_traffic > optimized_traffic + + cluster_full_traffic = node.query( + f""" + SELECT sum(ProfileEvents['ReadBufferFromS3Bytes']) + FROM clusterAllReplicas(cluster_simple, system.query_log) + WHERE type='QueryFinish' AND initial_query_id='{query_id_cluster_full}' + FORMAT TSV + """ + ) + cluster_full_traffic = int(cluster_full_traffic) + assert cluster_full_traffic == full_traffic + + cluster_optimized_traffic = node.query( + f""" + SELECT sum(ProfileEvents['ReadBufferFromS3Bytes']) + FROM clusterAllReplicas(cluster_simple, system.query_log) + WHERE type='QueryFinish' AND initial_query_id='{query_id_cluster_optimized}' + FORMAT TSV + """ + ) + cluster_optimized_traffic = int(cluster_optimized_traffic) + assert cluster_optimized_traffic == optimized_traffic + + node.query("SET allow_experimental_analyzer = DEFAULT") + + +def test_joins(started_cluster): + node = started_cluster.instances["s0_0_0"] + + # Table join_table only exists on the node 's0_0_0'. + node.query( + """ + CREATE TABLE IF NOT EXISTS join_table ( + id UInt32, + name String + ) ENGINE=MergeTree() + ORDER BY id; + """ + ) + + node.query( + f""" + INSERT INTO join_table + SELECT value, concat(name, '_jt') FROM s3Cluster('cluster_simple', + 'http://minio1:9001/root/data/{{clickhouse,database}}/*', 'minio', '{minio_secret_key}', 'CSV', + 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))'); + """ + ) + + result1 = node.query( + f""" + SELECT t1.name, t2.name FROM + s3Cluster('cluster_simple', + 'http://minio1:9001/root/data/{{clickhouse,database}}/*', 'minio', '{minio_secret_key}', 'CSV', + 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') AS t1 + JOIN + join_table AS t2 + ON t1.value = t2.id + ORDER BY t1.name + SETTINGS object_storage_cluster_join_mode='local'; + """ + ) + + res = list(map(str.split, result1.splitlines())) + assert len(res) == 25 + + for line in res: + if len(line) == 2: + assert line[1] == f"{line[0]}_jt" + else: + assert line == ["_jt"] # for empty name + + result2 = node.query( + f""" + SELECT t1.name, t2.name FROM + join_table AS t2 + JOIN + s3Cluster('cluster_simple', + 'http://minio1:9001/root/data/{{clickhouse,database}}/*', 'minio', '{minio_secret_key}', 'CSV', + 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') AS t1 + ON t1.value = t2.id + ORDER BY t1.name + SETTINGS object_storage_cluster_join_mode='local'; + """ + ) + + assert result1 == result2 + + # With WHERE clause with remote column only + result3 = node.query( + f""" + SELECT t1.name, t2.name FROM + s3Cluster('cluster_simple', + 'http://minio1:9001/root/data/{{clickhouse,database}}/*', 'minio', '{minio_secret_key}', 'CSV', + 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') AS t1 + JOIN + join_table AS t2 + ON t1.value = t2.id + WHERE (t1.value % 2) + ORDER BY t1.name + SETTINGS object_storage_cluster_join_mode='local'; + """ + ) + + res = list(map(str.split, result3.splitlines())) + assert len(res) == 8 + + # With WHERE clause with local column only + result4 = node.query( + f""" + SELECT t1.name, t2.name FROM + 
s3Cluster('cluster_simple', + 'http://minio1:9001/root/data/{{clickhouse,database}}/*', 'minio', '{minio_secret_key}', 'CSV', + 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') AS t1 + JOIN + join_table AS t2 + ON t1.value = t2.id + WHERE (t2.id % 2) + ORDER BY t1.name + SETTINGS object_storage_cluster_join_mode='local'; + """ + ) + + assert result3 == result4 + + # With WHERE clause with local and remote columns + result5 = node.query( + f""" + SELECT t1.name, t2.name FROM + s3Cluster('cluster_simple', + 'http://minio1:9001/root/data/{{clickhouse,database}}/*', 'minio', '{minio_secret_key}', 'CSV', + 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') AS t1 + JOIN + join_table AS t2 + ON t1.value = t2.id + WHERE (t1.value % 2) AND ((t2.id % 3) == 2) + ORDER BY t1.name + SETTINGS object_storage_cluster_join_mode='local'; + """ + ) + + res = list(map(str.split, result5.splitlines())) + assert len(res) == 6 diff --git a/tests/integration/test_storage_iceberg_with_spark/test_cluster_table_function.py b/tests/integration/test_storage_iceberg_with_spark/test_cluster_table_function.py index 56fc76859d5e..d1bfa0963f8e 100644 --- a/tests/integration/test_storage_iceberg_with_spark/test_cluster_table_function.py +++ b/tests/integration/test_storage_iceberg_with_spark/test_cluster_table_function.py @@ -132,6 +132,32 @@ def add_df(mode): # write 3 times assert int(instance.query(f"SELECT count() FROM {table_function_expr_cluster}")) == 100 * 3 + + # Cluster Query with node1 as coordinator + table_function_expr_cluster = get_creation_expression( + storage_type, + TABLE_NAME, + started_cluster_iceberg_with_spark, + table_function=True, + run_on_cluster=True, + ) + select_remote_cluster = ( + instance.query(f"SELECT * FROM remote('node2',{table_function_expr_cluster})") + .strip() + .split() + ) + assert len(select_remote_cluster) == 600 + assert select_remote_cluster == select_regular + + select_remote_cluster = ( + instance.query(f"SELECT * FROM remote('node2',{table_function_expr_cluster})") + .strip() + .split() + ) + assert len(select_remote_cluster) == 600 + assert select_remote_cluster == select_regular + + @pytest.mark.parametrize("format_version", ["1", "2"]) @pytest.mark.parametrize("storage_type", ["s3", "azure"]) def test_writes_cluster_table_function(started_cluster_iceberg_with_spark, format_version, storage_type): diff --git a/tests/queries/0_stateless/03550_analyzer_remote_view_columns.sql b/tests/queries/0_stateless/03550_analyzer_remote_view_columns.sql index 8489551a5ae4..e40939554e9b 100644 --- a/tests/queries/0_stateless/03550_analyzer_remote_view_columns.sql +++ b/tests/queries/0_stateless/03550_analyzer_remote_view_columns.sql @@ -39,4 +39,4 @@ WHERE AND log_comment = 'THIS IS A COMMENT TO MARK THE INITIAL QUERY' LIMIT 1) AND type = 'QueryFinish' - AND NOT is_initial_query; + AND query_id != initial_query_id; From 643c59f3e36035c991e057231e5dc9185b1d90d7 Mon Sep 17 00:00:00 2001 From: Vasily Nemkov Date: Mon, 17 Nov 2025 22:17:12 +0100 Subject: [PATCH 2/5] Merge pull request #1082 from Altinity/feature/antalya-25.8/s3cluster_global_join_fixes 25.8 Antalya: Fix joins with Iceberg tables --- src/Core/Settings.cpp | 4 +- src/Core/SettingsChangesHistory.cpp | 6 + src/Core/SettingsEnums.h | 2 +- src/Planner/PlannerJoinTree.cpp | 2 +- src/Storages/IStorageCluster.cpp | 148 +++++++++++------ src/Storages/IStorageCluster.h | 10 +- .../extractTableFunctionFromSelectQuery.cpp | 6 + .../extractTableFunctionFromSelectQuery.h | 1 + 
.../integration/test_database_iceberg/test.py | 142 +++++++++++++++- tests/integration/test_s3_cluster/test.py | 14 ++ .../test_cluster_joins.py | 151 ++++++++++++++++++ 11 files changed, 430 insertions(+), 56 deletions(-) create mode 100644 tests/integration/test_storage_iceberg_with_spark/test_cluster_joins.py diff --git a/src/Core/Settings.cpp b/src/Core/Settings.cpp index 93c4708379f3..6797ebaeda6a 100644 --- a/src/Core/Settings.cpp +++ b/src/Core/Settings.cpp @@ -1917,12 +1917,12 @@ Possible values: DECLARE(ObjectStorageClusterJoinMode, object_storage_cluster_join_mode, ObjectStorageClusterJoinMode::ALLOW, R"( Changes the behaviour of object storage cluster function or table. -ClickHouse applies this setting when the query contains the product of object storage cluster function ot table, i.e. when the query for a object storage cluster function ot table contains a non-GLOBAL subquery for the object storage cluster function ot table. +ClickHouse applies this setting when the query contains the product of object storage cluster function or table, i.e. when the query for a object storage cluster function or table contains a non-GLOBAL subquery for the object storage cluster function or table. Restrictions: - Only applied for JOIN subqueries. -- Only if the FROM section uses a object storage cluster function ot table. +- Only if the FROM section uses a object storage cluster function or table. Possible values: diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp index def54c7b272f..54317c8327f7 100644 --- a/src/Core/SettingsChangesHistory.cpp +++ b/src/Core/SettingsChangesHistory.cpp @@ -225,6 +225,12 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory() {"object_storage_cluster", "", "", "Antalya: New setting"}, {"object_storage_max_nodes", 0, 0, "Antalya: New setting"}, {"allow_retries_in_cluster_requests", false, false, "Antalya: New setting"}, + {"allow_retries_in_cluster_requests", false, false, "New setting"}, + {"object_storage_remote_initiator", false, false, "New setting."}, + {"allow_experimental_export_merge_tree_part", false, false, "New setting."}, + {"export_merge_tree_part_overwrite_file_if_exists", false, false, "New setting."}, + {"allow_experimental_export_merge_tree_part", false, true, "Turned ON by default for Antalya."}, + {"iceberg_timezone_for_timestamptz", "UTC", "UTC", "New setting."} }); addSettingsChanges(settings_changes_history, "25.8", { diff --git a/src/Core/SettingsEnums.h b/src/Core/SettingsEnums.h index c62048d1f0b4..5186237241d8 100644 --- a/src/Core/SettingsEnums.h +++ b/src/Core/SettingsEnums.h @@ -164,7 +164,7 @@ enum class DistributedProductMode : uint8_t DECLARE_SETTING_ENUM(DistributedProductMode) -/// The setting for executing object storage cluster function ot table JOIN sections. +/// The setting for executing object storage cluster function or table JOIN sections. enum class ObjectStorageClusterJoinMode : uint8_t { LOCAL, /// Convert to local query diff --git a/src/Planner/PlannerJoinTree.cpp b/src/Planner/PlannerJoinTree.cpp index 7410ea1f1522..dd16786ca995 100644 --- a/src/Planner/PlannerJoinTree.cpp +++ b/src/Planner/PlannerJoinTree.cpp @@ -1448,7 +1448,7 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres /// Overall, IStorage::read -> FetchColumns returns normal column names (except Distributed, which is inconsistent) /// Interpreter::getQueryPlan -> FetchColumns returns identifiers (why?) 
and this the reason for the bug ^ in Distributed /// Hopefully there is no other case when we read from Distributed up to FetchColumns. - if (table_node && table_node->getStorage()->isRemote() && select_query_options.to_stage == QueryProcessingStage::FetchColumns) + if (table_node && table_node->getStorage()->isRemote()) updated_actions_dag_outputs.push_back(output_node); else if (table_function_node && table_function_node->getStorage()->isRemote()) updated_actions_dag_outputs.push_back(output_node); diff --git a/src/Storages/IStorageCluster.cpp b/src/Storages/IStorageCluster.cpp index 607831f13c5a..984076c75f4a 100644 --- a/src/Storages/IStorageCluster.cpp +++ b/src/Storages/IStorageCluster.cpp @@ -24,7 +24,9 @@ #include #include #include +#include #include +#include #include #include @@ -101,7 +103,7 @@ class SearcherVisitor : public InDepthQueryTreeVisitorWithContext; using Base::Base; - explicit SearcherVisitor(QueryTreeNodeType type_, ContextPtr context) : Base(context), type(type_) {} + explicit SearcherVisitor(std::unordered_set types_, ContextPtr context) : Base(context), types(types_) {} bool needChildVisit(QueryTreeNodePtr &, QueryTreeNodePtr & /*child*/) { @@ -115,15 +117,20 @@ class SearcherVisitor : public InDepthQueryTreeVisitorWithContextgetNodeType(); - if (node_type == type) + if (types.contains(node_type)) + { passed_node = node; + passed_type = node_type; + } } QueryTreeNodePtr getNode() const { return passed_node; } + std::optional getType() const { return passed_type; } private: - QueryTreeNodeType type; + std::unordered_set types; QueryTreeNodePtr passed_node; + std::optional passed_type; }; /* @@ -205,28 +212,44 @@ void IStorageCluster::updateQueryWithJoinToSendIfNeeded( { case ObjectStorageClusterJoinMode::LOCAL: { - auto modified_query_tree = query_tree->clone(); - bool need_modify = false; - - SearcherVisitor table_function_searcher(QueryTreeNodeType::TABLE_FUNCTION, context); - table_function_searcher.visit(query_tree); - auto table_function_node = table_function_searcher.getNode(); - if (!table_function_node) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find table function node"); + auto info = getQueryTreeInfo(query_tree, context); - if (has_join) + if (info.has_join || info.has_cross_join || info.has_local_columns_in_where) { - auto table_function = extractTableFunctionASTPtrFromSelectQuery(query_to_send); - auto query_tree_distributed = buildTableFunctionQueryTree(table_function, context); - auto & table_function_ast = table_function->as(); - query_tree_distributed->setAlias(table_function_ast.alias); + auto modified_query_tree = query_tree->clone(); + + SearcherVisitor left_table_expression_searcher({QueryTreeNodeType::TABLE, QueryTreeNodeType::TABLE_FUNCTION}, context); + left_table_expression_searcher.visit(modified_query_tree); + auto table_function_node = left_table_expression_searcher.getNode(); + if (!table_function_node) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find table function node"); + + QueryTreeNodePtr query_tree_distributed; + + auto & query_node = modified_query_tree->as(); + + if (info.has_join) + { + auto join_node = query_node.getJoinTree(); + query_tree_distributed = join_node->as()->getLeftTableExpression()->clone(); + } + else if (info.has_cross_join) + { + SearcherVisitor join_searcher({QueryTreeNodeType::CROSS_JOIN}, context); + join_searcher.visit(modified_query_tree); + auto cross_join_node = join_searcher.getNode(); + if (!cross_join_node) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find CROSS JOIN node"); 
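+                // A comma join like "FROM s3Cluster(...) AS s3, local_table AS t"
+                // ends up here: only the leftmost (object storage) expression is
+                // kept in the shipped subquery, and the join itself runs on the
+                // initiator, the only node guaranteed to have local_table.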
+ // CrossJoinNode contains vector of nodes. 0 is left expression, always exists. + query_tree_distributed = cross_join_node->as()->getTableExpressions()[0]->clone(); + } // Find add used columns from table function to make proper projection list + // Need to do before changing WHERE condition CollectUsedColumnsForSourceVisitor collector(table_function_node, context); - collector.visit(query_tree); + collector.visit(modified_query_tree); const auto & columns = collector.getColumns(); - auto & query_node = modified_query_tree->as(); query_node.resolveProjectionColumns(columns); auto column_nodes_to_select = std::make_shared(); column_nodes_to_select->getNodes().reserve(columns.size()); @@ -234,20 +257,26 @@ void IStorageCluster::updateQueryWithJoinToSendIfNeeded( column_nodes_to_select->getNodes().emplace_back(std::make_shared(column, table_function_node)); query_node.getProjectionNode() = column_nodes_to_select; - // Left only table function to send on cluster nodes - modified_query_tree = modified_query_tree->cloneAndReplace(query_node.getJoinTree(), query_tree_distributed); + if (info.has_local_columns_in_where) + { + if (query_node.getPrewhere()) + removeExpressionsThatDoNotDependOnTableIdentifiers(query_node.getPrewhere(), table_function_node, context); + if (query_node.getWhere()) + removeExpressionsThatDoNotDependOnTableIdentifiers(query_node.getWhere(), table_function_node, context); + } - need_modify = true; - } + query_node.getOrderByNode() = std::make_shared(); + query_node.getGroupByNode() = std::make_shared(); - if (has_local_columns_in_where) - { - auto & query_node = modified_query_tree->as(); - query_node.getWhere() = {}; - } + if (query_tree_distributed) + { + // Left only table function to send on cluster nodes + modified_query_tree = modified_query_tree->cloneAndReplace(query_node.getJoinTree(), query_tree_distributed); + } - if (need_modify) query_to_send = queryNodeToDistributedSelectQuery(modified_query_tree); + } + return; } case ObjectStorageClusterJoinMode::GLOBAL: @@ -391,38 +420,59 @@ void ReadFromCluster::initializePipeline(QueryPipelineBuilder & pipeline, const pipeline.init(std::move(pipe)); } -QueryProcessingStage::Enum IStorageCluster::getQueryProcessingStage( - ContextPtr context, QueryProcessingStage::Enum to_stage, const StorageSnapshotPtr &, SelectQueryInfo & query_info) const +IStorageCluster::QueryTreeInfo IStorageCluster::getQueryTreeInfo(QueryTreeNodePtr query_tree, ContextPtr context) { - auto object_storage_cluster_join_mode = context->getSettingsRef()[Setting::object_storage_cluster_join_mode]; + QueryTreeInfo info; - if (object_storage_cluster_join_mode != ObjectStorageClusterJoinMode::ALLOW) - { - if (!context->getSettingsRef()[Setting::allow_experimental_analyzer]) - throw Exception(ErrorCodes::NOT_IMPLEMENTED, - "object_storage_cluster_join_mode!='allow' is not supported without allow_experimental_analyzer=true"); + SearcherVisitor join_searcher({QueryTreeNodeType::JOIN, QueryTreeNodeType::CROSS_JOIN}, context); + join_searcher.visit(query_tree); - SearcherVisitor join_searcher(QueryTreeNodeType::JOIN, context); - join_searcher.visit(query_info.query_tree); - if (join_searcher.getNode()) - has_join = true; + if (join_searcher.getNode()) + { + if (join_searcher.getType() == QueryTreeNodeType::JOIN) + info.has_join = true; + else + info.has_cross_join = true; + } - SearcherVisitor table_function_searcher(QueryTreeNodeType::TABLE_FUNCTION, context); - table_function_searcher.visit(query_info.query_tree); - auto table_function_node = 
-        auto table_function_node = table_function_searcher.getNode();
-        if (!table_function_node)
-            throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find table function node");
+    SearcherVisitor left_table_expression_searcher({QueryTreeNodeType::TABLE, QueryTreeNodeType::TABLE_FUNCTION}, context);
+    left_table_expression_searcher.visit(query_tree);
+    auto table_function_node = left_table_expression_searcher.getNode();
+    if (!table_function_node)
+        throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find table or table function node");

+    auto & query_node = query_tree->as<QueryNode &>();
+    if (query_node.hasWhere() || query_node.hasPrewhere())
+    {
         CollectUsedColumnsForSourceVisitor collector_where(table_function_node, context, true);
-        auto & query_node = query_info.query_tree->as<QueryNode &>();
+        if (query_node.hasPrewhere())
+            collector_where.visit(query_node.getPrewhere());
         if (query_node.hasWhere())
             collector_where.visit(query_node.getWhere());

-        // Can't use 'WHERE' on remote node if it contains columns from other sources
+        // Example: SELECT x FROM datalake.table WHERE x IN local.table.
+        // The 'WHERE' has to be modified on the remote node if it references columns from other sources,
+        // because the remote node might not have those sources.
         if (!collector_where.getColumns().empty())
-            has_local_columns_in_where = true;
+            info.has_local_columns_in_where = true;
+    }
+
+    return info;
+}
+
+QueryProcessingStage::Enum IStorageCluster::getQueryProcessingStage(
+    ContextPtr context, QueryProcessingStage::Enum to_stage, const StorageSnapshotPtr &, SelectQueryInfo & query_info) const
+{
+    auto object_storage_cluster_join_mode = context->getSettingsRef()[Setting::object_storage_cluster_join_mode];
+
+    if (object_storage_cluster_join_mode != ObjectStorageClusterJoinMode::ALLOW)
+    {
+        if (!context->getSettingsRef()[Setting::allow_experimental_analyzer])
+            throw Exception(ErrorCodes::NOT_IMPLEMENTED,
+                "object_storage_cluster_join_mode!='allow' is not supported without allow_experimental_analyzer=true");

-        if (has_join || has_local_columns_in_where)
+        auto info = getQueryTreeInfo(query_info.query_tree, context);
+        if (info.has_join || info.has_cross_join || info.has_local_columns_in_where)
             return QueryProcessingStage::Enum::FetchColumns;
     }

diff --git a/src/Storages/IStorageCluster.h b/src/Storages/IStorageCluster.h
index 3df691ffc4c3..96964a02dd35 100644
--- a/src/Storages/IStorageCluster.h
+++ b/src/Storages/IStorageCluster.h
@@ -63,8 +63,14 @@ class IStorageCluster : public IStorage
     LoggerPtr log;
     String cluster_name;

-    mutable bool has_join = false;
-    mutable bool has_local_columns_in_where = false;
+    struct QueryTreeInfo
+    {
+        bool has_join = false;
+        bool has_cross_join = false;
+        bool has_local_columns_in_where = false;
+    };
+
+    static QueryTreeInfo getQueryTreeInfo(QueryTreeNodePtr query_tree, ContextPtr context);
 };

diff --git a/src/Storages/extractTableFunctionFromSelectQuery.cpp b/src/Storages/extractTableFunctionFromSelectQuery.cpp
index 8477798b62b1..064f538eeae7 100644
--- a/src/Storages/extractTableFunctionFromSelectQuery.cpp
+++ b/src/Storages/extractTableFunctionFromSelectQuery.cpp
@@ -26,6 +26,12 @@ ASTPtr extractTableFunctionASTPtrFromSelectQuery(ASTPtr & query)
     return table_expression ? table_expression->table_function : nullptr;
 }

+ASTPtr extractTableASTPtrFromSelectQuery(ASTPtr & query)
+{
+    auto table_expression = extractTableExpressionASTPtrFromSelectQuery(query);
+    return table_expression ?
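+        // (database_and_table_name is set for plain `db.table` references; table functions
+        // populate table_function instead, as in extractTableFunctionASTPtrFromSelectQuery above)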
table_expression->database_and_table_name : nullptr; +} + ASTFunction * extractTableFunctionFromSelectQuery(ASTPtr & query) { auto table_function_ast = extractTableFunctionASTPtrFromSelectQuery(query); diff --git a/src/Storages/extractTableFunctionFromSelectQuery.h b/src/Storages/extractTableFunctionFromSelectQuery.h index 609be0e4f985..20cc1ae93896 100644 --- a/src/Storages/extractTableFunctionFromSelectQuery.h +++ b/src/Storages/extractTableFunctionFromSelectQuery.h @@ -10,6 +10,7 @@ struct ASTTableExpression; ASTTableExpression * extractTableExpressionASTPtrFromSelectQuery(ASTPtr & query); ASTPtr extractTableFunctionASTPtrFromSelectQuery(ASTPtr & query); +ASTPtr extractTableASTPtrFromSelectQuery(ASTPtr & query); ASTFunction * extractTableFunctionFromSelectQuery(ASTPtr & query); } diff --git a/tests/integration/test_database_iceberg/test.py b/tests/integration/test_database_iceberg/test.py index 327a8f299bdb..a4c9c8116304 100644 --- a/tests/integration/test_database_iceberg/test.py +++ b/tests/integration/test_database_iceberg/test.py @@ -14,12 +14,13 @@ import pytz from minio import Minio from pyiceberg.catalog import load_catalog -from pyiceberg.partitioning import PartitionField, PartitionSpec +from pyiceberg.partitioning import PartitionField, PartitionSpec, UNPARTITIONED_PARTITION_SPEC from pyiceberg.schema import Schema from pyiceberg.table.sorting import SortField, SortOrder from pyiceberg.transforms import DayTransform, IdentityTransform from pyiceberg.types import ( DoubleType, + LongType, FloatType, NestedField, StringType, @@ -27,6 +28,7 @@ TimestampType, TimestamptzType ) +from pyiceberg.table.sorting import UNSORTED_SORT_ORDER from helpers.cluster import ClickHouseCluster, ClickHouseInstance, is_arm from helpers.config_cluster import minio_secret_key, minio_access_key @@ -205,6 +207,7 @@ def started_cluster(): user_configs=[], stay_alive=True, with_iceberg_catalog=True, + with_zookeeper=True, ) logging.info("Starting cluster...") @@ -704,3 +707,140 @@ def test_gcs(started_cluster): """ ) assert "Google cloud storage converts to S3" in str(err.value) + + +def test_cluster_joins(started_cluster): + node = started_cluster.instances["node1"] + + test_ref = f"test_join_tables_{uuid.uuid4()}" + table_name = f"{test_ref}_table" + table_name_2 = f"{test_ref}_table_2" + table_name_local = f"{test_ref}_table_local" + + root_namespace = f"{test_ref}_namespace" + + catalog = load_catalog_impl(started_cluster) + catalog.create_namespace(root_namespace) + + schema = Schema( + NestedField( + field_id=1, + name="tag", + field_type=LongType(), + required=False + ), + NestedField( + field_id=2, + name="name", + field_type=StringType(), + required=False, + ), + ) + table = create_table(catalog, root_namespace, table_name, schema, + partition_spec=UNPARTITIONED_PARTITION_SPEC, sort_order=UNSORTED_SORT_ORDER) + data = [{"tag": 1, "name": "John"}, {"tag": 2, "name": "Jack"}] + df = pa.Table.from_pylist(data) + table.append(df) + + schema2 = Schema( + NestedField( + field_id=1, + name="id", + field_type=LongType(), + required=False + ), + NestedField( + field_id=2, + name="second_name", + field_type=StringType(), + required=False, + ), + ) + table2 = create_table(catalog, root_namespace, table_name_2, schema2, + partition_spec=UNPARTITIONED_PARTITION_SPEC, sort_order=UNSORTED_SORT_ORDER) + data = [{"id": 1, "second_name": "Dow"}, {"id": 2, "second_name": "Sparrow"}] + df = pa.Table.from_pylist(data) + table2.append(df) + + node.query(f"CREATE TABLE `{table_name_local}` (id Int64, second_name 
String) ENGINE = Memory()") + node.query(f"INSERT INTO `{table_name_local}` VALUES (1, 'Silver'), (2, 'Black')") + + create_clickhouse_iceberg_database(started_cluster, node, CATALOG_NAME) + + res = node.query( + f""" + SELECT t1.name,t2.second_name + FROM {CATALOG_NAME}.`{root_namespace}.{table_name}` AS t1 + JOIN {CATALOG_NAME}.`{root_namespace}.{table_name_2}` AS t2 + ON t1.tag=t2.id + ORDER BY ALL + SETTINGS + object_storage_cluster='cluster_simple', + object_storage_cluster_join_mode='local' + """ + ) + + assert res == "Jack\tSparrow\nJohn\tDow\n" + + res = node.query( + f""" + SELECT name + FROM {CATALOG_NAME}.`{root_namespace}.{table_name}` + WHERE tag in ( + SELECT id + FROM {CATALOG_NAME}.`{root_namespace}.{table_name_2}` + ) + ORDER BY ALL + SETTINGS + object_storage_cluster='cluster_simple', + object_storage_cluster_join_mode='local' + """ + ) + + assert res == "Jack\nJohn\n" + + res = node.query( + f""" + SELECT t1.name,t2.second_name + FROM {CATALOG_NAME}.`{root_namespace}.{table_name}` AS t1 + JOIN `{table_name_local}` AS t2 + ON t1.tag=t2.id + ORDER BY ALL + SETTINGS + object_storage_cluster='cluster_simple', + object_storage_cluster_join_mode='local' + """ + ) + + assert res == "Jack\tBlack\nJohn\tSilver\n" + + res = node.query( + f""" + SELECT name + FROM {CATALOG_NAME}.`{root_namespace}.{table_name}` + WHERE tag in ( + SELECT id + FROM `{table_name_local}` + ) + ORDER BY ALL + SETTINGS + object_storage_cluster='cluster_simple', + object_storage_cluster_join_mode='local' + """ + ) + + assert res == "Jack\nJohn\n" + + res = node.query( + f""" + SELECT t1.name,t2.second_name + FROM {CATALOG_NAME}.`{root_namespace}.{table_name}` AS t1 + CROSS JOIN `{table_name_local}` AS t2 + ORDER BY ALL + SETTINGS + object_storage_cluster='cluster_simple', + object_storage_cluster_join_mode='local' + """ + ) + + assert res == "Jack\tBlack\nJack\tSilver\nJohn\tBlack\nJohn\tSilver\n" diff --git a/tests/integration/test_s3_cluster/test.py b/tests/integration/test_s3_cluster/test.py index 54351b894a0c..b61cc8eeb37c 100644 --- a/tests/integration/test_s3_cluster/test.py +++ b/tests/integration/test_s3_cluster/test.py @@ -814,3 +814,17 @@ def test_joins(started_cluster): res = list(map(str.split, result5.splitlines())) assert len(res) == 6 + + result6 = node.query( + f""" + SELECT name FROM + s3Cluster('cluster_simple', + 'http://minio1:9001/root/data/{{clickhouse,database}}/*', 'minio', '{minio_secret_key}', 'CSV', + 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') + WHERE value IN (SELECT id FROM join_table) + ORDER BY name + SETTINGS object_storage_cluster_join_mode='local'; + """ + ) + res = list(map(str.split, result6.splitlines())) + assert len(res) == 25 diff --git a/tests/integration/test_storage_iceberg_with_spark/test_cluster_joins.py b/tests/integration/test_storage_iceberg_with_spark/test_cluster_joins.py new file mode 100644 index 000000000000..2079c07ef9d2 --- /dev/null +++ b/tests/integration/test_storage_iceberg_with_spark/test_cluster_joins.py @@ -0,0 +1,151 @@ +import pytest + +from helpers.iceberg_utils import ( + get_uuid_str, + get_creation_expression, + execute_spark_query_general, +) + +@pytest.mark.parametrize("storage_type", ["s3", "azure"]) +def test_cluster_joins(started_cluster_iceberg_with_spark, storage_type): + instance = started_cluster_iceberg_with_spark.instances["node1"] + spark = started_cluster_iceberg_with_spark.spark_session + TABLE_NAME = "test_cluster_joins_" + storage_type + "_" + get_uuid_str() + TABLE_NAME_2 = 
"test_cluster_joins_2_" + storage_type + "_" + get_uuid_str() + TABLE_NAME_LOCAL = "test_cluster_joins_local_" + storage_type + "_" + get_uuid_str() + + def execute_spark_query(query: str, table_name): + return execute_spark_query_general( + spark, + started_cluster_iceberg_with_spark, + storage_type, + table_name, + query, + ) + + execute_spark_query( + f""" + CREATE TABLE {TABLE_NAME} ( + tag INT, + name VARCHAR(50) + ) + USING iceberg + OPTIONS('format-version'='2') + """, TABLE_NAME + ) + + execute_spark_query( + f""" + INSERT INTO {TABLE_NAME} VALUES + (1, 'john'), + (2, 'jack') + """, TABLE_NAME + ) + + execute_spark_query( + f""" + CREATE TABLE {TABLE_NAME_2} ( + id INT, + second_name VARCHAR(50) + ) + USING iceberg + OPTIONS('format-version'='2') + """, TABLE_NAME_2 + ) + + execute_spark_query( + f""" + INSERT INTO {TABLE_NAME_2} VALUES + (1, 'dow'), + (2, 'sparrow') + """, TABLE_NAME_2 + ) + + creation_expression = get_creation_expression( + storage_type, TABLE_NAME, started_cluster_iceberg_with_spark, table_function=True, run_on_cluster=True + ) + + creation_expression_2 = get_creation_expression( + storage_type, TABLE_NAME_2, started_cluster_iceberg_with_spark, table_function=True, run_on_cluster=True + ) + + instance.query(f"CREATE TABLE `{TABLE_NAME_LOCAL}` (id Int64, second_name String) ENGINE = Memory()") + instance.query(f"INSERT INTO `{TABLE_NAME_LOCAL}` VALUES (1, 'silver'), (2, 'black')") + + res = instance.query( + f""" + SELECT t1.name,t2.second_name + FROM {creation_expression} AS t1 + JOIN {creation_expression_2} AS t2 + ON t1.tag=t2.id + ORDER BY ALL + SETTINGS + object_storage_cluster='cluster_simple', + object_storage_cluster_join_mode='local' + """ + ) + + assert res == "jack\tsparrow\njohn\tdow\n" + + res = instance.query( + f""" + SELECT name + FROM {creation_expression} + WHERE tag in ( + SELECT id + FROM {creation_expression_2} + ) + ORDER BY ALL + SETTINGS + object_storage_cluster='cluster_simple', + object_storage_cluster_join_mode='local' + """ + ) + + assert res == "jack\njohn\n" + + res = instance.query( + f""" + SELECT t1.name,t2.second_name + FROM {creation_expression} AS t1 + JOIN `{TABLE_NAME_LOCAL}` AS t2 + ON t1.tag=t2.id + ORDER BY ALL + SETTINGS + object_storage_cluster='cluster_simple', + object_storage_cluster_join_mode='local' + """ + ) + + assert res == "jack\tblack\njohn\tsilver\n" + + res = instance.query( + f""" + SELECT name + FROM {creation_expression} + WHERE tag in ( + SELECT id + FROM `{TABLE_NAME_LOCAL}` + ) + ORDER BY ALL + SETTINGS + object_storage_cluster='cluster_simple', + object_storage_cluster_join_mode='local' + """ + ) + + assert res == "jack\njohn\n" + + res = instance.query( + f""" + SELECT t1.name,t2.second_name + FROM {creation_expression} AS t1 + CROSS JOIN `{TABLE_NAME_LOCAL}` AS t2 + ORDER BY ALL + SETTINGS + object_storage_cluster='cluster_simple', + object_storage_cluster_join_mode='local' + """ + ) + + assert res == "jack\tblack\njack\tsilver\njohn\tblack\njohn\tsilver\n" From c100cf6c7331b324a554d385fd5460fe4ff15504 Mon Sep 17 00:00:00 2001 From: Vasily Nemkov Date: Mon, 24 Nov 2025 15:17:11 +0100 Subject: [PATCH 3/5] Merge pull request #1160 from Altinity/bugfix/antalya-25.8/revert_in_for_cluster_requests Revert support for 'IN' in cluster requests --- src/Storages/IStorageCluster.cpp | 2 +- .../integration/test_database_iceberg/test.py | 67 ++++++++++--------- tests/integration/test_s3_cluster/test.py | 26 +++---- .../test_cluster_joins.py | 66 +++++++++--------- 4 files changed, 83 insertions(+), 78 
deletions(-) diff --git a/src/Storages/IStorageCluster.cpp b/src/Storages/IStorageCluster.cpp index 984076c75f4a..7413e624559f 100644 --- a/src/Storages/IStorageCluster.cpp +++ b/src/Storages/IStorageCluster.cpp @@ -472,7 +472,7 @@ QueryProcessingStage::Enum IStorageCluster::getQueryProcessingStage( "object_storage_cluster_join_mode!='allow' is not supported without allow_experimental_analyzer=true"); auto info = getQueryTreeInfo(query_info.query_tree, context); - if (info.has_join || info.has_cross_join || info.has_local_columns_in_where) + if (info.has_join || info.has_cross_join /*|| info.has_local_columns_in_where*/) return QueryProcessingStage::Enum::FetchColumns; } diff --git a/tests/integration/test_database_iceberg/test.py b/tests/integration/test_database_iceberg/test.py index a4c9c8116304..6b30d590c6b6 100644 --- a/tests/integration/test_database_iceberg/test.py +++ b/tests/integration/test_database_iceberg/test.py @@ -773,6 +773,7 @@ def test_cluster_joins(started_cluster): FROM {CATALOG_NAME}.`{root_namespace}.{table_name}` AS t1 JOIN {CATALOG_NAME}.`{root_namespace}.{table_name_2}` AS t2 ON t1.tag=t2.id + WHERE t1.tag < 10 AND t2.id < 20 ORDER BY ALL SETTINGS object_storage_cluster='cluster_simple', @@ -782,22 +783,22 @@ def test_cluster_joins(started_cluster): assert res == "Jack\tSparrow\nJohn\tDow\n" - res = node.query( - f""" - SELECT name - FROM {CATALOG_NAME}.`{root_namespace}.{table_name}` - WHERE tag in ( - SELECT id - FROM {CATALOG_NAME}.`{root_namespace}.{table_name_2}` - ) - ORDER BY ALL - SETTINGS - object_storage_cluster='cluster_simple', - object_storage_cluster_join_mode='local' - """ - ) - - assert res == "Jack\nJohn\n" + #res = node.query( + # f""" + # SELECT name + # FROM {CATALOG_NAME}.`{root_namespace}.{table_name}` + # WHERE tag in ( + # SELECT id + # FROM {CATALOG_NAME}.`{root_namespace}.{table_name_2}` + # ) + # ORDER BY ALL + # SETTINGS + # object_storage_cluster='cluster_simple', + # object_storage_cluster_join_mode='local' + # """ + #) + + #assert res == "Jack\nJohn\n" res = node.query( f""" @@ -805,6 +806,7 @@ def test_cluster_joins(started_cluster): FROM {CATALOG_NAME}.`{root_namespace}.{table_name}` AS t1 JOIN `{table_name_local}` AS t2 ON t1.tag=t2.id + WHERE t1.tag < 10 AND t2.id < 20 ORDER BY ALL SETTINGS object_storage_cluster='cluster_simple', @@ -814,28 +816,29 @@ def test_cluster_joins(started_cluster): assert res == "Jack\tBlack\nJohn\tSilver\n" - res = node.query( - f""" - SELECT name - FROM {CATALOG_NAME}.`{root_namespace}.{table_name}` - WHERE tag in ( - SELECT id - FROM `{table_name_local}` - ) - ORDER BY ALL - SETTINGS - object_storage_cluster='cluster_simple', - object_storage_cluster_join_mode='local' - """ - ) - - assert res == "Jack\nJohn\n" + #res = node.query( + # f""" + # SELECT name + # FROM {CATALOG_NAME}.`{root_namespace}.{table_name}` + # WHERE tag in ( + # SELECT id + # FROM `{table_name_local}` + # ) + # ORDER BY ALL + # SETTINGS + # object_storage_cluster='cluster_simple', + # object_storage_cluster_join_mode='local' + # """ + #) + + #assert res == "Jack\nJohn\n" res = node.query( f""" SELECT t1.name,t2.second_name FROM {CATALOG_NAME}.`{root_namespace}.{table_name}` AS t1 CROSS JOIN `{table_name_local}` AS t2 + WHERE t1.tag < 10 AND t2.id < 20 ORDER BY ALL SETTINGS object_storage_cluster='cluster_simple', diff --git a/tests/integration/test_s3_cluster/test.py b/tests/integration/test_s3_cluster/test.py index b61cc8eeb37c..98ed08eb30a1 100644 --- a/tests/integration/test_s3_cluster/test.py +++ 
b/tests/integration/test_s3_cluster/test.py @@ -815,16 +815,16 @@ def test_joins(started_cluster): res = list(map(str.split, result5.splitlines())) assert len(res) == 6 - result6 = node.query( - f""" - SELECT name FROM - s3Cluster('cluster_simple', - 'http://minio1:9001/root/data/{{clickhouse,database}}/*', 'minio', '{minio_secret_key}', 'CSV', - 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') - WHERE value IN (SELECT id FROM join_table) - ORDER BY name - SETTINGS object_storage_cluster_join_mode='local'; - """ - ) - res = list(map(str.split, result6.splitlines())) - assert len(res) == 25 + #result6 = node.query( + # f""" + # SELECT name FROM + # s3Cluster('cluster_simple', + # 'http://minio1:9001/root/data/{{clickhouse,database}}/*', 'minio', '{minio_secret_key}', 'CSV', + # 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') + # WHERE value IN (SELECT id FROM join_table) + # ORDER BY name + # SETTINGS object_storage_cluster_join_mode='local'; + # """ + #) + #res = list(map(str.split, result6.splitlines())) + #assert len(res) == 25 diff --git a/tests/integration/test_storage_iceberg_with_spark/test_cluster_joins.py b/tests/integration/test_storage_iceberg_with_spark/test_cluster_joins.py index 2079c07ef9d2..5352de860281 100644 --- a/tests/integration/test_storage_iceberg_with_spark/test_cluster_joins.py +++ b/tests/integration/test_storage_iceberg_with_spark/test_cluster_joins.py @@ -87,22 +87,22 @@ def execute_spark_query(query: str, table_name): assert res == "jack\tsparrow\njohn\tdow\n" - res = instance.query( - f""" - SELECT name - FROM {creation_expression} - WHERE tag in ( - SELECT id - FROM {creation_expression_2} - ) - ORDER BY ALL - SETTINGS - object_storage_cluster='cluster_simple', - object_storage_cluster_join_mode='local' - """ - ) - - assert res == "jack\njohn\n" + #res = instance.query( + # f""" + # SELECT name + # FROM {creation_expression} + # WHERE tag in ( + # SELECT id + # FROM {creation_expression_2} + # ) + # ORDER BY ALL + # SETTINGS + # object_storage_cluster='cluster_simple', + # object_storage_cluster_join_mode='local' + # """ + #) + + #assert res == "jack\njohn\n" res = instance.query( f""" @@ -110,6 +110,7 @@ def execute_spark_query(query: str, table_name): FROM {creation_expression} AS t1 JOIN `{TABLE_NAME_LOCAL}` AS t2 ON t1.tag=t2.id + WHERE t1.tag < 10 AND t2.id < 20 ORDER BY ALL SETTINGS object_storage_cluster='cluster_simple', @@ -119,28 +120,29 @@ def execute_spark_query(query: str, table_name): assert res == "jack\tblack\njohn\tsilver\n" - res = instance.query( - f""" - SELECT name - FROM {creation_expression} - WHERE tag in ( - SELECT id - FROM `{TABLE_NAME_LOCAL}` - ) - ORDER BY ALL - SETTINGS - object_storage_cluster='cluster_simple', - object_storage_cluster_join_mode='local' - """ - ) - - assert res == "jack\njohn\n" + #res = instance.query( + # f""" + # SELECT name + # FROM {creation_expression} + # WHERE tag in ( + # SELECT id + # FROM `{TABLE_NAME_LOCAL}` + # ) + # ORDER BY ALL + # SETTINGS + # object_storage_cluster='cluster_simple', + # object_storage_cluster_join_mode='local' + # """ + #) + + #assert res == "jack\njohn\n" res = instance.query( f""" SELECT t1.name,t2.second_name FROM {creation_expression} AS t1 CROSS JOIN `{TABLE_NAME_LOCAL}` AS t2 + WHERE t1.tag < 10 AND t2.id < 20 ORDER BY ALL SETTINGS object_storage_cluster='cluster_simple', From eb56b477a46bb4f6a0f22005a0c16cfb0893971c Mon Sep 17 00:00:00 2001 From: Andrey Zvonov <32552679+zvonand@users.noreply.github.com> Date: Tue, 9 
Dec 2025 15:59:17 +0100
Subject: [PATCH 4/5] Merge pull request #1168 from Altinity/bugfix/antalya-25.8/fix_in_for_cluster_request

Fix IN with Iceberg table
---
 src/Interpreters/PreparedSets.cpp                  | 28 +++++++-
 src/Interpreters/PreparedSets.h                    |  7 ++
 src/Storages/IStorageCluster.cpp                   |  2 +-
 .../integration/test_database_iceberg/test.py      | 64 +++++++++----------
 tests/integration/test_s3_cluster/test.py          | 27 ++++----
 .../test_cluster_joins.py                          | 64 +++++++++----------
 6 files changed, 111 insertions(+), 81 deletions(-)

diff --git a/src/Interpreters/PreparedSets.cpp b/src/Interpreters/PreparedSets.cpp
index c5da7e33a252..1810206c1207 100644
--- a/src/Interpreters/PreparedSets.cpp
+++ b/src/Interpreters/PreparedSets.cpp
@@ -180,6 +180,12 @@ FutureSetFromSubquery::FutureSetFromSubquery(
 FutureSetFromSubquery::~FutureSetFromSubquery() = default;

 SetPtr FutureSetFromSubquery::get() const
+{
+    std::lock_guard lock(mutex);
+    return get_unsafe();
+}
+
+SetPtr FutureSetFromSubquery::get_unsafe() const
 {
     if (set_and_key->set != nullptr && set_and_key->set->isCreated())
         return set_and_key->set;
@@ -189,12 +195,15 @@ SetPtr FutureSetFromSubquery::get() const

 void FutureSetFromSubquery::setQueryPlan(std::unique_ptr<QueryPlan> source_)
 {
+    std::lock_guard lock(mutex);
     source = std::move(source_);
     set_and_key->set->setHeader(source->getCurrentHeader()->getColumnsWithTypeAndName());
 }

 void FutureSetFromSubquery::buildExternalTableFromInplaceSet(StoragePtr external_table_)
 {
+    std::lock_guard lock(mutex);
+
     const auto & set = *set_and_key->set;
     LOG_TRACE(getLogger("FutureSetFromSubquery"), "Building external table from set of {} elements", set.getTotalRowCount());
@@ -240,6 +249,8 @@ void FutureSetFromSubquery::buildExternalTableFromInplaceSet(StoragePtr external

 void FutureSetFromSubquery::setExternalTable(StoragePtr external_table_)
 {
+    std::lock_guard lock(mutex);
+
     if (set_and_key->set->isCreated())
     {
         if (!set_and_key->set->hasExplicitSetElements())
@@ -253,12 +264,19 @@ void FutureSetFromSubquery::setExternalTable(StoragePtr external_table_)

 DataTypes FutureSetFromSubquery::getTypes() const
 {
+    std::lock_guard lock(mutex);
     return set_and_key->set->getElementsTypes();
 }

 FutureSet::Hash FutureSetFromSubquery::getHash() const { return hash; }

 std::unique_ptr<QueryPlan> FutureSetFromSubquery::build(const SizeLimits & network_transfer_limits, const PreparedSetsCachePtr & prepared_sets_cache)
+{
+    std::lock_guard lock(mutex);
+    return build_unsafe(network_transfer_limits, prepared_sets_cache);
+}
+
+std::unique_ptr<QueryPlan> FutureSetFromSubquery::build_unsafe(const SizeLimits & network_transfer_limits, const PreparedSetsCachePtr & prepared_sets_cache)
 {
     if (set_and_key->set->isCreated())
         return nullptr;
@@ -280,6 +298,8 @@ std::unique_ptr<QueryPlan> FutureSetFromSubquery::build(const SizeLimits & netwo

 void FutureSetFromSubquery::buildSetInplace(const ContextPtr & context)
 {
+    std::lock_guard lock(mutex);
+
     if (external_table_set)
         external_table_set->buildSetInplace(context);

@@ -287,7 +307,7 @@ void FutureSetFromSubquery::buildSetInplace(const ContextPtr & context)
     SizeLimits network_transfer_limits(settings[Setting::max_rows_to_transfer], settings[Setting::max_bytes_to_transfer], settings[Setting::transfer_overflow_mode]);
     auto prepared_sets_cache = context->getPreparedSetsCache();

-    auto plan = build(network_transfer_limits, prepared_sets_cache);
+    auto plan = build_unsafe(network_transfer_limits, prepared_sets_cache);
     if (!plan)
         return;

@@ -305,7 +325,9 @@ SetPtr FutureSetFromSubquery::buildOrderedSetInplace(const ContextPtr & context)
     if (!context->getSettingsRef()[Setting::use_index_for_in_with_subqueries])
         return nullptr;

-    if (auto set = get())
+    std::lock_guard lock(mutex);
+
+    if (auto set = get_unsafe())
     {
         if (set->hasExplicitSetElements())
             return set;
@@ -327,7 +349,7 @@ SetPtr FutureSetFromSubquery::buildOrderedSetInplace(const ContextPtr & context)
     SizeLimits network_transfer_limits(settings[Setting::max_rows_to_transfer], settings[Setting::max_bytes_to_transfer], settings[Setting::transfer_overflow_mode]);
     auto prepared_sets_cache = context->getPreparedSetsCache();

-    auto plan = build(network_transfer_limits, prepared_sets_cache);
+    auto plan = build_unsafe(network_transfer_limits, prepared_sets_cache);
     if (!plan)
         return nullptr;

diff --git a/src/Interpreters/PreparedSets.h b/src/Interpreters/PreparedSets.h
index 8e84880cd605..7303df105679 100644
--- a/src/Interpreters/PreparedSets.h
+++ b/src/Interpreters/PreparedSets.h
@@ -170,6 +170,11 @@ class FutureSetFromSubquery final : public FutureSet
     QueryPlan * getQueryPlan() { return source.get(); }

 private:
+    SetPtr get_unsafe() const;
+    std::unique_ptr<QueryPlan> build_unsafe(
+        const SizeLimits & network_transfer_limits,
+        const PreparedSetsCachePtr & prepared_sets_cache);
+
     Hash hash;
     ASTPtr ast;
     SetAndKeyPtr set_and_key;
@@ -177,6 +182,8 @@ class FutureSetFromSubquery final : public FutureSet
     std::unique_ptr<QueryPlan> source;
     QueryTreeNodePtr query_tree;
+
+    mutable std::mutex mutex;
 };

 using FutureSetFromSubqueryPtr = std::shared_ptr<FutureSetFromSubquery>;

diff --git a/src/Storages/IStorageCluster.cpp b/src/Storages/IStorageCluster.cpp
index 7413e624559f..984076c75f4a 100644
--- a/src/Storages/IStorageCluster.cpp
+++ b/src/Storages/IStorageCluster.cpp
@@ -472,7 +472,7 @@ QueryProcessingStage::Enum IStorageCluster::getQueryProcessingStage(
             "object_storage_cluster_join_mode!='allow' is not supported without allow_experimental_analyzer=true");

         auto info = getQueryTreeInfo(query_info.query_tree, context);
-        if (info.has_join || info.has_cross_join /*|| info.has_local_columns_in_where*/)
+        if (info.has_join || info.has_cross_join || info.has_local_columns_in_where)
             return QueryProcessingStage::Enum::FetchColumns;
     }

diff --git a/tests/integration/test_database_iceberg/test.py b/tests/integration/test_database_iceberg/test.py
index 6b30d590c6b6..50e547d4fb76 100644
--- a/tests/integration/test_database_iceberg/test.py
+++ b/tests/integration/test_database_iceberg/test.py
@@ -783,22 +783,22 @@ def test_cluster_joins(started_cluster):

     assert res == "Jack\tSparrow\nJohn\tDow\n"

-    #res = node.query(
-    #    f"""
-    #    SELECT name
-    #    FROM {CATALOG_NAME}.`{root_namespace}.{table_name}`
-    #    WHERE tag in (
-    #        SELECT id
-    #        FROM {CATALOG_NAME}.`{root_namespace}.{table_name_2}`
-    #    )
-    #    ORDER BY ALL
-    #    SETTINGS
-    #        object_storage_cluster='cluster_simple',
-    #        object_storage_cluster_join_mode='local'
-    #    """
-    #)
-
-    #assert res == "Jack\nJohn\n"
+    res = node.query(
+        f"""
+        SELECT name
+        FROM {CATALOG_NAME}.`{root_namespace}.{table_name}`
+        WHERE tag in (
+            SELECT id
+            FROM {CATALOG_NAME}.`{root_namespace}.{table_name_2}`
+        )
+        ORDER BY ALL
+        SETTINGS
+            object_storage_cluster='cluster_simple',
+            object_storage_cluster_join_mode='local'
+        """
+    )
+
+    assert res == "Jack\nJohn\n"

     res = node.query(
         f"""
@@ -816,22 +816,22 @@ def test_cluster_joins(started_cluster):

     assert res == "Jack\tBlack\nJohn\tSilver\n"

-    #res = node.query(
-    #    f"""
-    #    SELECT name
-    #    FROM {CATALOG_NAME}.`{root_namespace}.{table_name}`
-    #    WHERE tag in (
-    #        SELECT id
-    #        FROM `{table_name_local}`
-    #    )
-    #    ORDER BY ALL
-    #    SETTINGS
-    #
object_storage_cluster='cluster_simple', - # object_storage_cluster_join_mode='local' - # """ - #) - - #assert res == "Jack\nJohn\n" + res = node.query( + f""" + SELECT name + FROM {CATALOG_NAME}.`{root_namespace}.{table_name}` + WHERE tag in ( + SELECT id + FROM `{table_name_local}` + ) + ORDER BY ALL + SETTINGS + object_storage_cluster='cluster_simple', + object_storage_cluster_join_mode='local' + """ + ) + + assert res == "Jack\nJohn\n" res = node.query( f""" diff --git a/tests/integration/test_s3_cluster/test.py b/tests/integration/test_s3_cluster/test.py index 98ed08eb30a1..31db9f76e52b 100644 --- a/tests/integration/test_s3_cluster/test.py +++ b/tests/integration/test_s3_cluster/test.py @@ -701,6 +701,7 @@ def test_joins(started_cluster): node = started_cluster.instances["s0_0_0"] # Table join_table only exists on the node 's0_0_0'. + node.query("DROP TABLE IF EXISTS join_table SYNC") node.query( """ CREATE TABLE IF NOT EXISTS join_table ( @@ -815,16 +816,16 @@ def test_joins(started_cluster): res = list(map(str.split, result5.splitlines())) assert len(res) == 6 - #result6 = node.query( - # f""" - # SELECT name FROM - # s3Cluster('cluster_simple', - # 'http://minio1:9001/root/data/{{clickhouse,database}}/*', 'minio', '{minio_secret_key}', 'CSV', - # 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') - # WHERE value IN (SELECT id FROM join_table) - # ORDER BY name - # SETTINGS object_storage_cluster_join_mode='local'; - # """ - #) - #res = list(map(str.split, result6.splitlines())) - #assert len(res) == 25 + result6 = node.query( + f""" + SELECT name FROM + s3Cluster('cluster_simple', + 'http://minio1:9001/root/data/{{clickhouse,database}}/*', 'minio', '{minio_secret_key}', 'CSV', + 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') + WHERE value IN (SELECT id FROM join_table) + ORDER BY name + SETTINGS object_storage_cluster_join_mode='local'; + """ + ) + res = list(map(str.split, result6.splitlines())) + assert len(res) == 25 diff --git a/tests/integration/test_storage_iceberg_with_spark/test_cluster_joins.py b/tests/integration/test_storage_iceberg_with_spark/test_cluster_joins.py index 5352de860281..c04940c1eeb8 100644 --- a/tests/integration/test_storage_iceberg_with_spark/test_cluster_joins.py +++ b/tests/integration/test_storage_iceberg_with_spark/test_cluster_joins.py @@ -87,22 +87,22 @@ def execute_spark_query(query: str, table_name): assert res == "jack\tsparrow\njohn\tdow\n" - #res = instance.query( - # f""" - # SELECT name - # FROM {creation_expression} - # WHERE tag in ( - # SELECT id - # FROM {creation_expression_2} - # ) - # ORDER BY ALL - # SETTINGS - # object_storage_cluster='cluster_simple', - # object_storage_cluster_join_mode='local' - # """ - #) - - #assert res == "jack\njohn\n" + res = instance.query( + f""" + SELECT name + FROM {creation_expression} + WHERE tag in ( + SELECT id + FROM {creation_expression_2} + ) + ORDER BY ALL + SETTINGS + object_storage_cluster='cluster_simple', + object_storage_cluster_join_mode='local' + """ + ) + + assert res == "jack\njohn\n" res = instance.query( f""" @@ -120,22 +120,22 @@ def execute_spark_query(query: str, table_name): assert res == "jack\tblack\njohn\tsilver\n" - #res = instance.query( - # f""" - # SELECT name - # FROM {creation_expression} - # WHERE tag in ( - # SELECT id - # FROM `{TABLE_NAME_LOCAL}` - # ) - # ORDER BY ALL - # SETTINGS - # object_storage_cluster='cluster_simple', - # object_storage_cluster_join_mode='local' - # """ - #) - - #assert res == 
"jack\njohn\n" + res = instance.query( + f""" + SELECT name + FROM {creation_expression} + WHERE tag in ( + SELECT id + FROM `{TABLE_NAME_LOCAL}` + ) + ORDER BY ALL + SETTINGS + object_storage_cluster='cluster_simple', + object_storage_cluster_join_mode='local' + """ + ) + + assert res == "jack\njohn\n" res = instance.query( f""" From 1a502b620ece13631854017e2b0928091980cda8 Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Wed, 11 Feb 2026 18:44:41 +0100 Subject: [PATCH 5/5] Temporary turn off tests depends on object_storage_cluster --- tests/integration/test_database_iceberg/test.py | 3 ++- .../test_storage_iceberg_with_spark/test_cluster_joins.py | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/tests/integration/test_database_iceberg/test.py b/tests/integration/test_database_iceberg/test.py index 50e547d4fb76..fa1ff5b45359 100644 --- a/tests/integration/test_database_iceberg/test.py +++ b/tests/integration/test_database_iceberg/test.py @@ -709,7 +709,8 @@ def test_gcs(started_cluster): assert "Google cloud storage converts to S3" in str(err.value) -def test_cluster_joins(started_cluster): +# TODO - turn on after merge alternative syntax +def _test_cluster_joins(started_cluster): node = started_cluster.instances["node1"] test_ref = f"test_join_tables_{uuid.uuid4()}" diff --git a/tests/integration/test_storage_iceberg_with_spark/test_cluster_joins.py b/tests/integration/test_storage_iceberg_with_spark/test_cluster_joins.py index c04940c1eeb8..bb637f8e8cc2 100644 --- a/tests/integration/test_storage_iceberg_with_spark/test_cluster_joins.py +++ b/tests/integration/test_storage_iceberg_with_spark/test_cluster_joins.py @@ -6,8 +6,9 @@ execute_spark_query_general, ) +# TODO - turn on after merge alternative syntax @pytest.mark.parametrize("storage_type", ["s3", "azure"]) -def test_cluster_joins(started_cluster_iceberg_with_spark, storage_type): +def _test_cluster_joins(started_cluster_iceberg_with_spark, storage_type): instance = started_cluster_iceberg_with_spark.instances["node1"] spark = started_cluster_iceberg_with_spark.spark_session TABLE_NAME = "test_cluster_joins_" + storage_type + "_" + get_uuid_str()