16 changes: 16 additions & 0 deletions src/Core/Settings.cpp
@@ -1914,6 +1914,22 @@ Possible values:
- `global` — Replaces the `IN`/`JOIN` query with `GLOBAL IN`/`GLOBAL JOIN.`
- `allow` — Allows the use of these types of subqueries.
)", IMPORTANT) \
DECLARE(ObjectStorageClusterJoinMode, object_storage_cluster_join_mode, ObjectStorageClusterJoinMode::ALLOW, R"(
Changes the behaviour of JOIN subqueries over object storage cluster functions and tables.

ClickHouse applies this setting when the query contains a product of object storage cluster functions or tables, i.e. when a query over an object storage cluster function or table contains a non-GLOBAL subquery over an object storage cluster function or table.

Restrictions:

- Only applied to JOIN subqueries.
- Only if the FROM section uses an object storage cluster function or table.

Possible values:

- `local` — Replaces the database and table in the subquery with local ones for the destination server (shard), leaving the normal `IN`/`JOIN.`
- `global` — Unsupported for now. Replaces the `IN`/`JOIN` query with `GLOBAL IN`/`GLOBAL JOIN.`
- `allow` — Default value. Allows the use of these types of subqueries.
)", 0) \
\
DECLARE(UInt64, max_concurrent_queries_for_all_users, 0, R"(
Throw exception if the value of this setting is less or equal than the current number of simultaneously processed queries.
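As a quick orientation (not part of the PR), here is a minimal sketch of how a rewrite pass might branch on the three values of `object_storage_cluster_join_mode`. The enum mirrors the one added to `SettingsEnums.h` below; `rewriteJoinSubquery` is a hypothetical helper, not ClickHouse's actual implementation.

```cpp
#include <cstdint>
#include <stdexcept>
#include <string>

// Mirrors the enum added in src/Core/SettingsEnums.h (illustration only).
enum class ObjectStorageClusterJoinMode : uint8_t { LOCAL, GLOBAL, ALLOW };

// Hypothetical rewrite hook: decides what happens to a JOIN subquery that itself
// reads from an object storage cluster function or table.
std::string rewriteJoinSubquery(const std::string & subquery, ObjectStorageClusterJoinMode mode)
{
    switch (mode)
    {
        case ObjectStorageClusterJoinMode::ALLOW:
            return subquery;                             // default: leave the subquery as written
        case ObjectStorageClusterJoinMode::LOCAL:
            return "/* local tables */ " + subquery;     // stand-in for "replace database/table with local ones"
        case ObjectStorageClusterJoinMode::GLOBAL:
            throw std::runtime_error("object_storage_cluster_join_mode = 'global' is not supported yet");
    }
    throw std::logic_error("unreachable");
}
```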
1 change: 1 addition & 0 deletions src/Core/Settings.h
@@ -60,6 +60,7 @@ class WriteBuffer;
M(CLASS_NAME, DistributedCachePoolBehaviourOnLimit) /* Cloud only */ \
M(CLASS_NAME, DistributedDDLOutputMode) \
M(CLASS_NAME, DistributedProductMode) \
M(CLASS_NAME, ObjectStorageClusterJoinMode) \
M(CLASS_NAME, Double) \
M(CLASS_NAME, EscapingRule) \
M(CLASS_NAME, Float) \
11 changes: 11 additions & 0 deletions src/Core/SettingsChangesHistory.cpp
@@ -220,6 +220,17 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
{"os_threads_nice_value_query", 0, 0, "New setting."},
{"os_threads_nice_value_materialized_view", 0, 0, "New setting."},
{"os_thread_priority", 0, 0, "Alias for os_threads_nice_value_query."},
{"object_storage_cluster_join_mode", "allow", "allow", "New setting"},
{"lock_object_storage_task_distribution_ms", 500, 500, "Raised the value to 500 to avoid hoping tasks between executors."},
{"object_storage_cluster", "", "", "Antalya: New setting"},
{"object_storage_max_nodes", 0, 0, "Antalya: New setting"},
{"allow_retries_in_cluster_requests", false, false, "Antalya: New setting"},
{"allow_retries_in_cluster_requests", false, false, "New setting"},
{"object_storage_remote_initiator", false, false, "New setting."},
{"allow_experimental_export_merge_tree_part", false, false, "New setting."},
{"export_merge_tree_part_overwrite_file_if_exists", false, false, "New setting."},
{"allow_experimental_export_merge_tree_part", false, true, "Turned ON by default for Antalya."},
{"iceberg_timezone_for_timestamptz", "UTC", "UTC", "New setting."}
});
addSettingsChanges(settings_changes_history, "25.8",
{
5 changes: 5 additions & 0 deletions src/Core/SettingsEnums.cpp
@@ -96,6 +96,11 @@ IMPLEMENT_SETTING_ENUM(DistributedProductMode, ErrorCodes::UNKNOWN_DISTRIBUTED_P
{"global", DistributedProductMode::GLOBAL},
{"allow", DistributedProductMode::ALLOW}})

IMPLEMENT_SETTING_ENUM(ObjectStorageClusterJoinMode, ErrorCodes::BAD_ARGUMENTS,
{{"local", ObjectStorageClusterJoinMode::LOCAL},
{"global", ObjectStorageClusterJoinMode::GLOBAL},
{"allow", ObjectStorageClusterJoinMode::ALLOW}})


IMPLEMENT_SETTING_ENUM(QueryResultCacheNondeterministicFunctionHandling, ErrorCodes::BAD_ARGUMENTS,
{{"throw", QueryResultCacheNondeterministicFunctionHandling::Throw},
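For readers unfamiliar with the macro, `IMPLEMENT_SETTING_ENUM` generates the string-to-enum conversion used when the setting is parsed (e.g. `SET object_storage_cluster_join_mode = 'local'`), rejecting unknown values with `ErrorCodes::BAD_ARGUMENTS`. A simplified, standalone approximation of the parsing side (the real macro also generates the reverse mapping and uses ClickHouse's exception machinery rather than `std::invalid_argument`):

```cpp
#include <cstdint>
#include <stdexcept>
#include <string>
#include <unordered_map>

enum class ObjectStorageClusterJoinMode : uint8_t { LOCAL, GLOBAL, ALLOW };

// Simplified stand-in for what IMPLEMENT_SETTING_ENUM(ObjectStorageClusterJoinMode, ...) provides.
ObjectStorageClusterJoinMode parseObjectStorageClusterJoinMode(const std::string & value)
{
    static const std::unordered_map<std::string, ObjectStorageClusterJoinMode> mapping = {
        {"local", ObjectStorageClusterJoinMode::LOCAL},
        {"global", ObjectStorageClusterJoinMode::GLOBAL},
        {"allow", ObjectStorageClusterJoinMode::ALLOW},
    };
    auto it = mapping.find(value);
    if (it == mapping.end())
        throw std::invalid_argument("Unknown object_storage_cluster_join_mode: " + value);
    return it->second;
}
```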
10 changes: 10 additions & 0 deletions src/Core/SettingsEnums.h
@@ -164,6 +164,16 @@ enum class DistributedProductMode : uint8_t

DECLARE_SETTING_ENUM(DistributedProductMode)

/// Controls how JOIN sections are executed for object storage cluster functions and tables.
enum class ObjectStorageClusterJoinMode : uint8_t
{
LOCAL, /// Convert to local query
GLOBAL, /// Convert to global query
ALLOW /// Enable
};

DECLARE_SETTING_ENUM(ObjectStorageClusterJoinMode)

/// How the query result cache handles queries with non-deterministic functions, e.g. now()
enum class QueryResultCacheNondeterministicFunctionHandling : uint8_t
{
1 change: 1 addition & 0 deletions src/Interpreters/ClusterProxy/executeQuery.cpp
@@ -463,6 +463,7 @@ void executeQuery(
not_optimized_cluster->getName());

read_from_remote->setStepDescription("Read from remote replica");
read_from_remote->setIsRemoteFunction(is_remote_function);
plan->addStep(std::move(read_from_remote));
plan->addInterpreterContext(new_context);
plans.emplace_back(std::move(plan));
5 changes: 4 additions & 1 deletion src/Interpreters/Context.cpp
@@ -3151,8 +3151,11 @@ void Context::setCurrentQueryId(const String & query_id)

client_info.current_query_id = query_id_to_set;

if (client_info.query_kind == ClientInfo::QueryKind::INITIAL_QUERY)
if (client_info.query_kind == ClientInfo::QueryKind::INITIAL_QUERY
&& (getApplicationType() != ApplicationType::SERVER || client_info.initial_query_id.empty()))
{
client_info.initial_query_id = client_info.current_query_id;
}
}

void Context::setBackgroundOperationTypeForContext(ClientInfo::BackgroundOperationType background_operation)
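The guard added above means that, on a server, an `initial_query_id` already supplied by the initiator is preserved instead of being overwritten with the local `current_query_id`; non-server application types keep the previous behaviour of always resetting it for an INITIAL_QUERY. A condensed, self-contained illustration of the same condition (the types are stand-ins for the real `ClientInfo`/`Context`):

```cpp
#include <string>

enum class QueryKind { INITIAL_QUERY, SECONDARY_QUERY };
enum class ApplicationType { SERVER, CLIENT, LOCAL };

struct ClientInfo
{
    QueryKind query_kind = QueryKind::INITIAL_QUERY;
    std::string current_query_id;
    std::string initial_query_id;
};

// Mimics the updated Context::setCurrentQueryId logic: on a SERVER, an initial_query_id
// already forwarded by the initiator is kept; otherwise it falls back to current_query_id.
void setCurrentQueryId(ClientInfo & client_info, ApplicationType app_type, const std::string & query_id)
{
    client_info.current_query_id = query_id;

    if (client_info.query_kind == QueryKind::INITIAL_QUERY
        && (app_type != ApplicationType::SERVER || client_info.initial_query_id.empty()))
        client_info.initial_query_id = client_info.current_query_id;
}
```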
19 changes: 19 additions & 0 deletions src/Interpreters/InterpreterSelectQuery.cpp
@@ -79,6 +79,7 @@
#include <Processors/QueryPlan/TotalsHavingStep.h>
#include <Processors/QueryPlan/WindowStep.h>
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
#include <Processors/QueryPlan/ObjectFilterStep.h>
#include <Processors/Sources/NullSource.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
#include <Processors/Transforms/AggregatingTransform.h>
@@ -91,6 +92,7 @@
#include <Storages/StorageValues.h>
#include <Storages/StorageView.h>
#include <Storages/ReadInOrderOptimizer.h>
#include <Storages/IStorageCluster.h>

#include <Columns/Collator.h>
#include <Columns/ColumnAggregateFunction.h>
@@ -208,6 +210,7 @@ namespace Setting
extern const SettingsBool enable_producing_buckets_out_of_order_in_aggregation;
extern const SettingsBool enable_lazy_columns_replication;
extern const SettingsBool serialize_string_in_memory_with_zero_byte;
extern const SettingsBool use_hive_partitioning;
}

namespace ServerSetting
@@ -2046,6 +2049,22 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional<P

if (expressions.second_stage || from_aggregation_stage)
{
if (settings[Setting::use_hive_partitioning]
&& !expressions.first_stage
&& expressions.hasWhere())
{
if (typeid_cast<ReadFromCluster *>(query_plan.getRootNode()->step.get()))
{
auto object_filter_step = std::make_unique<ObjectFilterStep>(
query_plan.getCurrentHeader(),
expressions.before_where->dag.clone(),
getSelectQuery().where()->getColumnName());

object_filter_step->setStepDescription("WHERE");
query_plan.addStep(std::move(object_filter_step));
}
}

if (from_aggregation_stage)
{
/// No need to aggregate anything, since this was done on remote shards.
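This new block pushes the WHERE expression into an `ObjectFilterStep` on top of a `ReadFromCluster` step when `use_hive_partitioning` is enabled, presumably so listed objects can be pruned by partition values encoded in their paths before any data is read. As background, a standalone sketch of that idea (hypothetical helpers, not the actual `ObjectFilterStep`):

```cpp
#include <cstddef>
#include <map>
#include <string>

// Parse hive-style partition segments such as ".../year=2024/country=DE/file.parquet"
// into a key -> value map (illustration only).
std::map<std::string, std::string> parseHivePartitions(const std::string & path)
{
    std::map<std::string, std::string> partitions;
    std::size_t start = 0;
    while (start <= path.size())
    {
        std::size_t end = path.find('/', start);
        std::string segment = path.substr(start, end == std::string::npos ? std::string::npos : end - start);
        std::size_t eq = segment.find('=');
        if (eq != std::string::npos)
            partitions[segment.substr(0, eq)] = segment.substr(eq + 1);
        if (end == std::string::npos)
            break;
        start = end + 1;
    }
    return partitions;
}

// A predicate like country = 'DE' can then discard whole objects without reading them.
bool matchesPartition(const std::string & path, const std::string & key, const std::string & value)
{
    auto partitions = parseHivePartitions(path);
    auto it = partitions.find(key);
    return it != partitions.end() && it->second == value;
}
```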
28 changes: 25 additions & 3 deletions src/Interpreters/PreparedSets.cpp
@@ -180,6 +180,12 @@ FutureSetFromSubquery::FutureSetFromSubquery(
FutureSetFromSubquery::~FutureSetFromSubquery() = default;

SetPtr FutureSetFromSubquery::get() const
{
std::lock_guard lock(mutex);
return get_unsafe();
}

SetPtr FutureSetFromSubquery::get_unsafe() const
{
if (set_and_key->set != nullptr && set_and_key->set->isCreated())
return set_and_key->set;
@@ -189,12 +195,15 @@ SetPtr FutureSetFromSubquery::get() const

void FutureSetFromSubquery::setQueryPlan(std::unique_ptr<QueryPlan> source_)
{
std::lock_guard lock(mutex);
source = std::move(source_);
set_and_key->set->setHeader(source->getCurrentHeader()->getColumnsWithTypeAndName());
}

void FutureSetFromSubquery::buildExternalTableFromInplaceSet(StoragePtr external_table_)
{
std::lock_guard lock(mutex);

const auto & set = *set_and_key->set;

LOG_TRACE(getLogger("FutureSetFromSubquery"), "Building external table from set of {} elements", set.getTotalRowCount());
@@ -240,6 +249,8 @@ void FutureSetFromSubquery::buildExternalTableFromInplaceSet(StoragePtr external

void FutureSetFromSubquery::setExternalTable(StoragePtr external_table_)
{
std::lock_guard lock(mutex);

if (set_and_key->set->isCreated())
{
if (!set_and_key->set->hasExplicitSetElements())
@@ -253,12 +264,19 @@ void FutureSetFromSubquery::setExternalTable(StoragePtr external_table_)

DataTypes FutureSetFromSubquery::getTypes() const
{
std::lock_guard lock(mutex);
return set_and_key->set->getElementsTypes();
}

FutureSet::Hash FutureSetFromSubquery::getHash() const { return hash; }

std::unique_ptr<QueryPlan> FutureSetFromSubquery::build(const SizeLimits & network_transfer_limits, const PreparedSetsCachePtr & prepared_sets_cache)
{
std::lock_guard lock(mutex);
return build_unsafe(network_transfer_limits, prepared_sets_cache);
}

std::unique_ptr<QueryPlan> FutureSetFromSubquery::build_unsafe(const SizeLimits & network_transfer_limits, const PreparedSetsCachePtr & prepared_sets_cache)
{
if (set_and_key->set->isCreated())
return nullptr;
@@ -280,14 +298,16 @@

void FutureSetFromSubquery::buildSetInplace(const ContextPtr & context)
{
std::lock_guard lock(mutex);

if (external_table_set)
external_table_set->buildSetInplace(context);

const auto & settings = context->getSettingsRef();
SizeLimits network_transfer_limits(settings[Setting::max_rows_to_transfer], settings[Setting::max_bytes_to_transfer], settings[Setting::transfer_overflow_mode]);
auto prepared_sets_cache = context->getPreparedSetsCache();

auto plan = build(network_transfer_limits, prepared_sets_cache);
auto plan = build_unsafe(network_transfer_limits, prepared_sets_cache);

if (!plan)
return;
@@ -305,7 +325,9 @@ SetPtr FutureSetFromSubquery::buildOrderedSetInplace(const ContextPtr & context)
if (!context->getSettingsRef()[Setting::use_index_for_in_with_subqueries])
return nullptr;

if (auto set = get())
std::lock_guard lock(mutex);

if (auto set = get_unsafe())
{
if (set->hasExplicitSetElements())
return set;
@@ -327,7 +349,7 @@ SetPtr FutureSetFromSubquery::buildOrderedSetInplace(const ContextPtr & context)
SizeLimits network_transfer_limits(settings[Setting::max_rows_to_transfer], settings[Setting::max_bytes_to_transfer], settings[Setting::transfer_overflow_mode]);
auto prepared_sets_cache = context->getPreparedSetsCache();

auto plan = build(network_transfer_limits, prepared_sets_cache);
auto plan = build_unsafe(network_transfer_limits, prepared_sets_cache);
if (!plan)
return nullptr;

7 changes: 7 additions & 0 deletions src/Interpreters/PreparedSets.h
@@ -170,13 +170,20 @@ class FutureSetFromSubquery final : public FutureSet
QueryPlan * getQueryPlan() { return source.get(); }

private:
SetPtr get_unsafe() const;
std::unique_ptr<QueryPlan> build_unsafe(
const SizeLimits & network_transfer_limits,
const PreparedSetsCachePtr & prepared_sets_cache);

Hash hash;
ASTPtr ast;
SetAndKeyPtr set_and_key;
std::shared_ptr<FutureSetFromSubquery> external_table_set;

std::unique_ptr<QueryPlan> source;
QueryTreeNodePtr query_tree;

mutable std::mutex mutex;
};

using FutureSetFromSubqueryPtr = std::shared_ptr<FutureSetFromSubquery>;
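The locking scheme introduced in `FutureSetFromSubquery` follows the common public-locks / private-unsafe split: every public method takes `mutex`, while `buildSetInplace` and `buildOrderedSetInplace`, which already hold it, call the `*_unsafe` variants to avoid self-deadlock on the non-recursive `std::mutex`. A minimal generic sketch of the idiom:

```cpp
#include <memory>
#include <mutex>

class LazyResult
{
public:
    // Public API always locks.
    std::shared_ptr<int> get() const
    {
        std::lock_guard lock(mutex);
        return getUnsafe();
    }

    std::shared_ptr<int> buildIfNeeded()
    {
        std::lock_guard lock(mutex);     // taken once here...
        if (auto existing = getUnsafe()) // ...so we must call the unlocked variant
            return existing;
        result = std::make_shared<int>(42);
        return result;
    }

private:
    // Private helpers assume the caller already holds the mutex.
    std::shared_ptr<int> getUnsafe() const { return result; }

    mutable std::mutex mutex;
    std::shared_ptr<int> result;
};
```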
28 changes: 28 additions & 0 deletions src/Planner/Planner.cpp
@@ -41,6 +41,7 @@
#include <Processors/QueryPlan/LimitByStep.h>
#include <Processors/QueryPlan/WindowStep.h>
#include <Processors/QueryPlan/ReadFromRecursiveCTEStep.h>
#include <Processors/QueryPlan/ObjectFilterStep.h>
#include <QueryPipeline/QueryPipelineBuilder.h>

#include <Interpreters/Context.h>
@@ -54,6 +55,7 @@
#include <Storages/StorageDistributed.h>
#include <Storages/StorageDummy.h>
#include <Storages/ObjectStorage/StorageObjectStorageCluster.h>
#include <Storages/IStorageCluster.h>

#include <AggregateFunctions/IAggregateFunction.h>

@@ -145,6 +147,7 @@ namespace Setting
extern const SettingsBool serialize_string_in_memory_with_zero_byte;
extern const SettingsString temporary_files_codec;
extern const SettingsNonZeroUInt64 temporary_files_buffer_size;
extern const SettingsBool use_hive_partitioning;
}

namespace ServerSetting
@@ -547,6 +550,21 @@ ALWAYS_INLINE void addFilterStep(
query_plan.addStep(std::move(where_step));
}

template <size_t size>
ALWAYS_INLINE void addObjectFilterStep(
QueryPlan & query_plan,
FilterAnalysisResult & filter_analysis_result,
const char (&step_description)[size])
{
auto actions = std::move(filter_analysis_result.filter_actions->dag);

auto where_step = std::make_unique<ObjectFilterStep>(query_plan.getCurrentHeader(),
std::move(actions),
filter_analysis_result.filter_column_name);
where_step->setStepDescription(step_description);
query_plan.addStep(std::move(where_step));
}

Aggregator::Params getAggregatorParams(const PlannerContextPtr & planner_context,
const AggregationAnalysisResult & aggregation_analysis_result,
const QueryAnalysisResult & query_analysis_result,
@@ -2016,6 +2034,16 @@ void Planner::buildPlanForQueryNode()

if (query_processing_info.isSecondStage() || query_processing_info.isFromAggregationState())
{
if (settings[Setting::use_hive_partitioning]
&& !query_processing_info.isFirstStage()
&& expression_analysis_result.hasWhere())
{
if (typeid_cast<ReadFromCluster *>(query_plan.getRootNode()->step.get()))
{
addObjectFilterStep(query_plan, expression_analysis_result.getWhere(), "WHERE");
}
}

if (query_processing_info.isFromAggregationState())
{
/// Aggregation was performed on remote shards
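A side note on the signature of `addObjectFilterStep`, which appears to mirror the neighbouring `addFilterStep`: taking the description as `const char (&)[size]` binds to string literals and lets the template deduce their length at compile time. A tiny standalone illustration:

```cpp
#include <cstddef>
#include <iostream>

// The `const char (&description)[size]` parameter binds to char arrays (string literals),
// so the template deduces their length at compile time.
template <std::size_t size>
void setDescription(const char (&description)[size])
{
    std::cout << "step description: " << description << " (array length " << size << ", including '\\0')\n";
}

int main()
{
    setDescription("WHERE"); // size deduced as 6
}
```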
4 changes: 3 additions & 1 deletion src/Planner/PlannerJoinTree.cpp
@@ -1448,7 +1448,9 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres
/// Overall, IStorage::read -> FetchColumns returns normal column names (except Distributed, which is inconsistent)
/// Interpreter::getQueryPlan -> FetchColumns returns identifiers (why?) and this the reason for the bug ^ in Distributed
/// Hopefully there is no other case when we read from Distributed up to FetchColumns.
if (table_node && table_node->getStorage()->isRemote() && select_query_options.to_stage == QueryProcessingStage::FetchColumns)
if (table_node && table_node->getStorage()->isRemote())
updated_actions_dag_outputs.push_back(output_node);
else if (table_function_node && table_function_node->getStorage()->isRemote())
updated_actions_dag_outputs.push_back(output_node);
}
else