From db86467c277cc6861bb13dadadd37eda0be12a0a Mon Sep 17 00:00:00 2001
From: Vasily Nemkov
Date: Mon, 13 Oct 2025 00:16:17 +0200
Subject: [PATCH 1/3] Merge pull request #1039 from
 Altinity/fp_antaya_25_8_parquet_metadata_caching

Antalya 25.8 - Forward port of #938 - Parquet metadata caching
---
 programs/server/Server.cpp                    | 12 +++
 src/Access/Common/AccessType.h                |  2 +-
 src/Common/ProfileEvents.cpp                  |  3 +-
 src/Core/FormatFactorySettings.h              |  3 +-
 src/Core/ServerSettings.cpp                   |  3 +-
 src/Core/SettingsChangesHistory.cpp           |  5 ++
 src/Interpreters/InterpreterSystemQuery.cpp   | 24 +++++-
 src/Parsers/ASTSystemQuery.cpp                |  1 +
 src/Parsers/ASTSystemQuery.h                  |  1 +
 src/Processors/Formats/IInputFormat.h         |  4 +
 .../Formats/Impl/ParquetBlockInputFormat.cpp  | 76 ++++++++++++++++++-
 .../Formats/Impl/ParquetBlockInputFormat.h    | 15 ++++
 .../Formats/Impl/ParquetFileMetaDataCache.cpp | 20 +++++
 .../Formats/Impl/ParquetFileMetaDataCache.h   | 30 ++++++++
 .../Impl/ParquetMetadataInputFormat.cpp       | 53 ++++++++++++-
 .../Formats/Impl/ParquetMetadataInputFormat.h | 10 +++
 .../StorageObjectStorageSource.cpp            |  3 +
 .../disable_parquet_metadata_caching.xml      |  7 ++
 tests/integration/test_storage_delta/test.py  |  7 +-
 .../01271_show_privileges.reference           |  1 +
 .../0_stateless/02995_settings_25_11_1.tsv    |  1 +
 ...et_object_storage_metadata_cache.reference |  8 ++
 ..._parquet_object_storage_metadata_cache.sql | 61 +++++++++++++++
 23 files changed, 334 insertions(+), 16 deletions(-)
 create mode 100644 src/Processors/Formats/Impl/ParquetFileMetaDataCache.cpp
 create mode 100644 src/Processors/Formats/Impl/ParquetFileMetaDataCache.h
 create mode 100644 tests/integration/test_storage_delta/configs/users.d/disable_parquet_metadata_caching.xml
 create mode 100644 tests/queries/0_stateless/03299_parquet_object_storage_metadata_cache.reference
 create mode 100644 tests/queries/0_stateless/03299_parquet_object_storage_metadata_cache.sql

diff --git a/programs/server/Server.cpp b/programs/server/Server.cpp
index f919d4c407b7..9dec8d198062 100644
--- a/programs/server/Server.cpp
+++ b/programs/server/Server.cpp
@@ -163,6 +163,10 @@
 # include
 #endif

+#if USE_PARQUET
+# include <Processors/Formats/Impl/ParquetFileMetaDataCache.h>
+#endif
+

 /// A minimal file used when the server is run without installation
 constexpr unsigned char resource_embedded_xml[] =
@@ -369,6 +373,7 @@ namespace ServerSetting
     extern const ServerSettingsBool abort_on_logical_error;
     extern const ServerSettingsUInt64 jemalloc_flush_profile_interval_bytes;
     extern const ServerSettingsBool jemalloc_flush_profile_on_memory_exceeded;
+<<<<<<< HEAD
     extern const ServerSettingsString allowed_disks_for_table_engines;
     extern const ServerSettingsUInt64 s3_credentials_provider_max_cache_size;
     extern const ServerSettingsUInt64 max_open_files;
@@ -415,6 +420,9 @@ namespace ServerSetting
     extern const ServerSettingsUInt64 keeper_server_socket_send_timeout_sec;
     extern const ServerSettingsString hdfs_libhdfs3_conf;
     extern const ServerSettingsString config_file;
+=======
+    extern const ServerSettingsUInt64 input_format_parquet_metadata_cache_max_size;
+>>>>>>> 67a38d1181c (Merge pull request #1039 from Altinity/fp_antaya_25_8_parquet_metadata_caching)
 }

 namespace ErrorCodes
@@ -2739,6 +2747,10 @@ try

     auto replicas_reconnector = ReplicasReconnector::init(global_context);

+#if USE_PARQUET
+    ParquetFileMetaDataCache::instance()->setMaxSizeInBytes(server_settings[ServerSetting::input_format_parquet_metadata_cache_max_size]);
+#endif
+
     /// Set current database name before loading tables and databases because
     /// system logs may copy global context.
     std::string default_database = server_settings[ServerSetting::default_database];

diff --git a/src/Access/Common/AccessType.h b/src/Access/Common/AccessType.h
index 0eaa21082a5c..99c70ce1d662 100644
--- a/src/Access/Common/AccessType.h
+++ b/src/Access/Common/AccessType.h
@@ -334,10 +334,10 @@ enum class AccessType : uint8_t
     M(SYSTEM_DROP_SCHEMA_CACHE, "SYSTEM CLEAR SCHEMA CACHE, SYSTEM DROP SCHEMA CACHE, DROP SCHEMA CACHE", GLOBAL, SYSTEM_DROP_CACHE) \
     M(SYSTEM_DROP_FORMAT_SCHEMA_CACHE, "SYSTEM CLEAR FORMAT SCHEMA CACHE, SYSTEM DROP FORMAT SCHEMA CACHE, DROP FORMAT SCHEMA CACHE", GLOBAL, SYSTEM_DROP_CACHE) \
     M(SYSTEM_DROP_S3_CLIENT_CACHE, "SYSTEM CLEAR S3 CLIENT CACHE, SYSTEM DROP S3 CLIENT, DROP S3 CLIENT CACHE", GLOBAL, SYSTEM_DROP_CACHE) \
+    M(SYSTEM_DROP_PARQUET_METADATA_CACHE, "SYSTEM DROP PARQUET METADATA CACHE", GLOBAL, SYSTEM_DROP_CACHE) \
     M(SYSTEM_DROP_CACHE, "DROP CACHE", GROUP, SYSTEM) \
     M(SYSTEM_RELOAD_CONFIG, "RELOAD CONFIG", GLOBAL, SYSTEM_RELOAD) \
     M(SYSTEM_RELOAD_USERS, "RELOAD USERS", GLOBAL, SYSTEM_RELOAD) \
-    M(SYSTEM_RELOAD_DICTIONARY, "SYSTEM RELOAD DICTIONARIES, RELOAD DICTIONARY, RELOAD DICTIONARIES", GLOBAL, SYSTEM_RELOAD) \
     M(SYSTEM_RELOAD_MODEL, "SYSTEM RELOAD MODELS, RELOAD MODEL, RELOAD MODELS", GLOBAL, SYSTEM_RELOAD) \
     M(SYSTEM_RELOAD_FUNCTION, "SYSTEM RELOAD FUNCTIONS, RELOAD FUNCTION, RELOAD FUNCTIONS", GLOBAL, SYSTEM_RELOAD) \
     M(SYSTEM_RELOAD_EMBEDDED_DICTIONARIES, "RELOAD EMBEDDED DICTIONARIES", GLOBAL, SYSTEM_RELOAD) /* implicitly enabled by the grant SYSTEM_RELOAD_DICTIONARY ON *.* */\

diff --git a/src/Common/ProfileEvents.cpp b/src/Common/ProfileEvents.cpp
index 3f9bd174afeb..63ed4c7cd978 100644
--- a/src/Common/ProfileEvents.cpp
+++ b/src/Common/ProfileEvents.cpp
@@ -1298,7 +1298,8 @@ The server successfully detected this situation and will download merged part fr
     M(RuntimeFilterRowsChecked, "Number of rows checked by JOIN Runtime Filters", ValueType::Number) \
     M(RuntimeFilterRowsPassed, "Number of rows that passed (not filtered out by) JOIN Runtime Filters", ValueType::Number) \
     M(RuntimeFilterRowsSkipped, "Number of rows in blocks that were skipped by JOIN Runtime Filters", ValueType::Number) \
-
+    M(ParquetMetaDataCacheHits, "Number of times a Parquet file metadata read hit the metadata cache.", ValueType::Number) \
+    M(ParquetMetaDataCacheMisses, "Number of times a Parquet file metadata read missed the metadata cache.", ValueType::Number) \

 #ifdef APPLY_FOR_EXTERNAL_EVENTS
 #define APPLY_FOR_EVENTS(M) APPLY_FOR_BUILTIN_EVENTS(M) APPLY_FOR_EXTERNAL_EVENTS(M)

diff --git a/src/Core/FormatFactorySettings.h b/src/Core/FormatFactorySettings.h
index 742ed6d3748e..3ee1da38c3d2 100644
--- a/src/Core/FormatFactorySettings.h
+++ b/src/Core/FormatFactorySettings.h
@@ -1527,8 +1527,7 @@ Allow to write information about geo columns in parquet metadata and encode colu
     DECLARE(Bool, into_outfile_create_parent_directories, false, R"(
Automatically create parent directories when using INTO OUTFILE if they do not already exists.
)", 0) \ - - + DECLARE(Bool, input_format_parquet_use_metadata_cache, true, R"(Enable parquet file metadata caching)", 0) \ // End of FORMAT_FACTORY_SETTINGS #define OBSOLETE_FORMAT_SETTINGS(M, ALIAS) \ diff --git a/src/Core/ServerSettings.cpp b/src/Core/ServerSettings.cpp index 9df634a0a8b1..77fb5160ad52 100644 --- a/src/Core/ServerSettings.cpp +++ b/src/Core/ServerSettings.cpp @@ -1542,7 +1542,8 @@ The policy on how to perform a scheduling of CPU slots specified by `concurrent_ DECLARE(UInt64, keeper_server_socket_receive_timeout_sec, DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC, R"(Keeper socket receive timeout.)", 0, "keeper_server.socket_receive_timeout_sec") \ DECLARE(UInt64, keeper_server_socket_send_timeout_sec, DBMS_DEFAULT_SEND_TIMEOUT_SEC, R"(Keeper socket send timeout.)", 0, "keeper_server.socket_send_timeout_sec") \ DECLARE(String, hdfs_libhdfs3_conf, "", R"(Points libhdfs3 to the right location for its config.)", 0, "hdfs.libhdfs3_conf") \ - DECLARE(String, config_file, "config.xml", R"(Points to the server config file.)", 0, "config-file") + DECLARE(String, config_file, "config.xml", R"(Points to the server config file.)", 0, "config-file") \ + DECLARE(UInt64, input_format_parquet_metadata_cache_max_size, 500000000, "Maximum size of parquet file metadata cache", 0) // clang-format on diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp index e8686e2a43ea..d7cf495769f7 100644 --- a/src/Core/SettingsChangesHistory.cpp +++ b/src/Core/SettingsChangesHistory.cpp @@ -434,6 +434,11 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory() {"parallel_hash_join_threshold", 0, 0, "New setting"}, /// Release closed. Please use 25.4 }); + addSettingsChanges(settings_changes_history, "24.12.2.20000", + { + // Altinity Antalya modifications atop of 24.12 + {"input_format_parquet_use_metadata_cache", true, true, "New setting, turned ON by default"}, // https://github.com/Altinity/ClickHouse/pull/586 + }); addSettingsChanges(settings_changes_history, "25.2", { /// Release closed. 
         /// Release closed. Please use 25.3

diff --git a/src/Interpreters/InterpreterSystemQuery.cpp b/src/Interpreters/InterpreterSystemQuery.cpp
index c94ebe324fa4..f3e30f24af16 100644
--- a/src/Interpreters/InterpreterSystemQuery.cpp
+++ b/src/Interpreters/InterpreterSystemQuery.cpp
@@ -77,6 +77,10 @@
 #include
 #endif

+#if USE_PARQUET
+#include <Processors/Formats/Impl/ParquetFileMetaDataCache.h>
+#endif
+
 #if USE_AWS_S3
 #include
 #endif
@@ -453,6 +457,16 @@ BlockIO InterpreterSystemQuery::execute()
             getContext()->clearQueryResultCache(query.query_result_cache_tag);
             break;
         }
+        case Type::DROP_PARQUET_METADATA_CACHE:
+        {
+#if USE_PARQUET
+            getContext()->checkAccess(AccessType::SYSTEM_DROP_PARQUET_METADATA_CACHE);
+            ParquetFileMetaDataCache::instance()->clear();
+            break;
+#else
+            throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "The server was compiled without the support for Parquet");
+#endif
+        }
         case Type::CLEAR_COMPILED_EXPRESSION_CACHE:
 #if USE_EMBEDDED_COMPILER
             getContext()->checkAccess(AccessType::SYSTEM_DROP_COMPILED_EXPRESSION_CACHE);
@@ -1994,10 +2008,12 @@ AccessRightsElements InterpreterSystemQuery::getRequiredAccessForDDLOnCluster()
         case Type::CLEAR_FILESYSTEM_CACHE:
         case Type::CLEAR_DISTRIBUTED_CACHE:
         case Type::SYNC_FILESYSTEM_CACHE:
-        case Type::CLEAR_PAGE_CACHE:
-        case Type::CLEAR_SCHEMA_CACHE:
-        case Type::CLEAR_FORMAT_SCHEMA_CACHE:
-        case Type::CLEAR_S3_CLIENT_CACHE:
+        case Type::DROP_PAGE_CACHE:
+        case Type::DROP_SCHEMA_CACHE:
+        case Type::DROP_FORMAT_SCHEMA_CACHE:
+        case Type::DROP_PARQUET_METADATA_CACHE:
+        case Type::DROP_S3_CLIENT_CACHE:
+        case Type::DROP_PARQUET_METADATA_CACHE:
         {
             required_access.emplace_back(AccessType::SYSTEM_DROP_CACHE);
             break;

diff --git a/src/Parsers/ASTSystemQuery.cpp b/src/Parsers/ASTSystemQuery.cpp
index a7e0e7c2d57d..7e9540d1658b 100644
--- a/src/Parsers/ASTSystemQuery.cpp
+++ b/src/Parsers/ASTSystemQuery.cpp
@@ -575,6 +575,7 @@ void ASTSystemQuery::formatImpl(WriteBuffer & ostr, const FormatSettings & setti
         case Type::CLEAR_TEXT_INDEX_CACHES:
         case Type::CLEAR_COMPILED_EXPRESSION_CACHE:
         case Type::CLEAR_S3_CLIENT_CACHE:
+        case Type::DROP_PARQUET_METADATA_CACHE:
         case Type::CLEAR_ICEBERG_METADATA_CACHE:
         case Type::RESET_COVERAGE:
         case Type::RESTART_REPLICAS:

diff --git a/src/Parsers/ASTSystemQuery.h b/src/Parsers/ASTSystemQuery.h
index 14dcb8e04f73..1f3c733aa865 100644
--- a/src/Parsers/ASTSystemQuery.h
+++ b/src/Parsers/ASTSystemQuery.h
@@ -43,6 +43,7 @@ class ASTSystemQuery : public IAST, public ASTQueryWithOnCluster
         CLEAR_QUERY_CONDITION_CACHE,
         CLEAR_QUERY_CACHE,
         CLEAR_COMPILED_EXPRESSION_CACHE,
+        DROP_PARQUET_METADATA_CACHE,
         CLEAR_ICEBERG_METADATA_CACHE,
         CLEAR_FILESYSTEM_CACHE,
         CLEAR_DISTRIBUTED_CACHE,

diff --git a/src/Processors/Formats/IInputFormat.h b/src/Processors/Formats/IInputFormat.h
index 1b6aa14bc2ec..bf0eccf2fa88 100644
--- a/src/Processors/Formats/IInputFormat.h
+++ b/src/Processors/Formats/IInputFormat.h
@@ -8,6 +8,7 @@
 #include
 #include
 #include
+#include

 namespace DB
@@ -128,6 +129,9 @@ class IInputFormat : public ISource

     void needOnlyCount() { need_only_count = true; }

+    /// Set additional info/key/id related to underlying storage of the ReadBuffer
+    virtual void setStorageRelatedUniqueKey(const Settings & /*settings*/, const String & /*key*/) {}
+
 protected:
     ReadBuffer & getReadBuffer() const { chassert(in); return *in; }

diff --git a/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp b/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp
index d80af0ab162e..3a60e4666ce0 100644
--- a/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp
+++ b/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp
@@ -8,6 +8,9 @@

 #if USE_PARQUET

+#include
+#include
+#include
 #include
 #include
 #include
@@ -34,6 +37,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
 #include
@@ -45,6 +49,8 @@ namespace ProfileEvents
     extern const Event ParquetFetchWaitTimeMicroseconds;
     extern const Event ParquetReadRowGroups;
     extern const Event ParquetPrunedRowGroups;
+    extern const Event ParquetMetaDataCacheHits;
+    extern const Event ParquetMetaDataCacheMisses;
 }

 namespace CurrentMetrics
@@ -61,6 +67,16 @@ namespace CurrentMetrics
 namespace DB
 {

+namespace Setting
+{
+    extern const SettingsBool input_format_parquet_use_metadata_cache;
+}
+
+namespace ServerSetting
+{
+    extern const ServerSettingsUInt64 input_format_parquet_metadata_cache_max_size;
+}
+
 namespace ErrorCodes
 {
     extern const int INCORRECT_DATA;
@@ -545,6 +561,49 @@ static std::vector<Range> getHyperrectangleForRowGroup(const parquet::FileMetaDa
     return hyperrectangle;
 }

+std::shared_ptr<parquet::FileMetaData> ParquetBlockInputFormat::readMetadataFromFile()
+{
+    createArrowFileIfNotCreated();
+    return parquet::ReadMetaData(arrow_file);
+}
+
+std::shared_ptr<parquet::FileMetaData> ParquetBlockInputFormat::getFileMetaData()
+{
+    // in-memory cache is not implemented for local file operations, only for remote files
+    // there is a chance the user sets `input_format_parquet_use_metadata_cache=1` for a local file operation
+    // and the cache_key won't be set. Therefore, we also need to check for metadata_cache.key
+    if (!metadata_cache.use_cache || metadata_cache.key.empty())
+    {
+        return readMetadataFromFile();
+    }
+
+    auto [parquet_file_metadata, loaded] = ParquetFileMetaDataCache::instance()->getOrSet(
+        metadata_cache.key,
+        [&]()
+        {
+            return readMetadataFromFile();
+        }
+    );
+    if (loaded)
+        ProfileEvents::increment(ProfileEvents::ParquetMetaDataCacheMisses);
+    else
+        ProfileEvents::increment(ProfileEvents::ParquetMetaDataCacheHits);
+    return parquet_file_metadata;
+}
+
+void ParquetBlockInputFormat::createArrowFileIfNotCreated()
+{
+    if (arrow_file)
+    {
+        return;
+    }
+
+    // Create arrow file adapter.
+    // TODO: Make the adapter do prefetching on IO threads, based on the full set of ranges that
+    // we'll need to read (which we know in advance). Use max_download_threads for that.
+    arrow_file = asArrowFile(*in, format_settings, is_stopped, "Parquet", PARQUET_MAGIC_BYTES, /* avoid_buffering */ true);
+}
+
 std::unordered_set<std::size_t> getBloomFilterFilteringColumnKeys(const KeyCondition::RPN & rpn)
 {
     std::unordered_set<std::size_t> column_keys;
@@ -691,7 +750,7 @@ void ParquetBlockInputFormat::initializeIfNeeded()
     if (is_stopped)
         return;

-    metadata = parquet::ReadMetaData(arrow_file);
+    metadata = getFileMetaData();
     if (buckets_to_read)
     {
         std::unordered_set set_to_read(buckets_to_read->row_group_ids.begin(), buckets_to_read->row_group_ids.end());
@@ -807,6 +866,8 @@ void ParquetBlockInputFormat::initializeIfNeeded()
         }
     }

+    bool has_row_groups_to_read = false;
+
     auto skip_row_group_based_on_filters = [&](int row_group)
     {
         if (!format_settings.parquet.filter_push_down && !format_settings.parquet.bloom_filter_push_down)
@@ -865,7 +926,20 @@
         row_group_batches.back().total_bytes_compressed += row_group_size;
         auto rows = adaptive_chunk_size(row_group);
         row_group_batches.back().adaptive_chunk_size = rows ? rows : format_settings.parquet.max_block_size;
+
+        has_row_groups_to_read = true;
     }
+
+    if (has_row_groups_to_read)
+    {
+        createArrowFileIfNotCreated();
+    }
+}
+
+void ParquetBlockInputFormat::setStorageRelatedUniqueKey(const Settings & settings, const String & key_)
+{
+    metadata_cache.key = key_;
+    metadata_cache.use_cache = settings[Setting::input_format_parquet_use_metadata_cache];
 }

 void ParquetBlockInputFormat::initializeRowGroupBatchReader(size_t row_group_batch_idx)

diff --git a/src/Processors/Formats/Impl/ParquetBlockInputFormat.h b/src/Processors/Formats/Impl/ParquetBlockInputFormat.h
index dda5721e2c4a..29e34171e432 100644
--- a/src/Processors/Formats/Impl/ParquetBlockInputFormat.h
+++ b/src/Processors/Formats/Impl/ParquetBlockInputFormat.h
@@ -94,6 +94,7 @@ class ParquetBlockInputFormat : public IInputFormat
     size_t getApproxBytesReadForChunk() const override { return previous_approx_bytes_read_for_chunk; }

     void setBucketsToRead(const FileBucketInfoPtr & buckets_to_read_) override;
+    void setStorageRelatedUniqueKey(const Settings & settings, const String & key_) override;

 private:
     Chunk read() override;
@@ -113,6 +114,13 @@ class ParquetBlockInputFormat : public IInputFormat

     void threadFunction(size_t row_group_batch_idx);

+    void createArrowFileIfNotCreated();
+    std::shared_ptr<parquet::FileMetaData> readMetadataFromFile();
+
+    std::shared_ptr<parquet::FileMetaData> getFileMetaData();
+
+    inline bool supportPrefetch() const;
+
     // Data layout in the file:
     //
     //   row group 0
@@ -361,6 +369,13 @@ class ParquetBlockInputFormat : public IInputFormat
     bool is_initialized = false;
     std::optional> parquet_names_to_clickhouse;
     std::optional> clickhouse_names_to_parquet;
+    struct Cache
+    {
+        String key;
+        bool use_cache = false;
+    };
+
+    Cache metadata_cache;
 };

 class ArrowParquetSchemaReader : public ISchemaReader

diff --git a/src/Processors/Formats/Impl/ParquetFileMetaDataCache.cpp b/src/Processors/Formats/Impl/ParquetFileMetaDataCache.cpp
new file mode 100644
index 000000000000..da8ad825f505
--- /dev/null
+++ b/src/Processors/Formats/Impl/ParquetFileMetaDataCache.cpp
@@ -0,0 +1,20 @@
+#include <Processors/Formats/Impl/ParquetFileMetaDataCache.h>
+
+#if USE_PARQUET
+
+namespace DB
+{
+
+ParquetFileMetaDataCache::ParquetFileMetaDataCache()
+    : CacheBase(CurrentMetrics::end(), CurrentMetrics::end(), 0)
+{}
+
+ParquetFileMetaDataCache * ParquetFileMetaDataCache::instance()
+{
+    static ParquetFileMetaDataCache instance;
+    return &instance;
+}
+
+}
+
+#endif

diff --git a/src/Processors/Formats/Impl/ParquetFileMetaDataCache.h b/src/Processors/Formats/Impl/ParquetFileMetaDataCache.h
new file mode 100644
index 000000000000..fb5fc1bb0217
--- /dev/null
+++ b/src/Processors/Formats/Impl/ParquetFileMetaDataCache.h
@@ -0,0 +1,30 @@
+#pragma once
+
+#include "config.h"
+
+#if USE_PARQUET
+
+namespace parquet
+{
+
+class FileMetaData;
+
+}
+
+#include <Common/CacheBase.h>
+
+namespace DB
+{
+
+class ParquetFileMetaDataCache : public CacheBase<String, parquet::FileMetaData>
+{
+public:
+    static ParquetFileMetaDataCache * instance();
+
+private:
+    ParquetFileMetaDataCache();
+};
+
+}
+
+#endif

diff --git a/src/Processors/Formats/Impl/ParquetMetadataInputFormat.cpp b/src/Processors/Formats/Impl/ParquetMetadataInputFormat.cpp
index 02e5f79ee3f4..f7c2dab49db1 100644
--- a/src/Processors/Formats/Impl/ParquetMetadataInputFormat.cpp
+++ b/src/Processors/Formats/Impl/ParquetMetadataInputFormat.cpp
@@ -22,8 +22,17 @@
 #include
 #include
 #include
+#include
+#include
+#include
+namespace ProfileEvents
+{
+extern const Event ParquetMetaDataCacheHits;
+extern const Event ParquetMetaDataCacheMisses;
+}
+
 namespace DB
 {
@@ -32,6 +41,11 @@ namespace ErrorCodes
 {
 extern const int BAD_ARGUMENTS;
 }

+namespace Setting
+{
+extern const SettingsBool input_format_parquet_use_metadata_cache;
+}
+
 static NamesAndTypesList getHeaderForParquetMetadata()
 {
     NamesAndTypesList names_and_types{
@@ -130,10 +144,35 @@ void checkHeader(const Block & header)
 static std::shared_ptr<parquet::FileMetaData> getFileMetadata(
     ReadBuffer & in,
     const FormatSettings & format_settings,
-    std::atomic<int> & is_stopped)
+    std::atomic<int> & is_stopped,
+    ParquetMetadataInputFormat::Cache metadata_cache)
 {
-    auto arrow_file = asArrowFile(in, format_settings, is_stopped, "Parquet", PARQUET_MAGIC_BYTES, /* avoid_buffering */ true);
-    return parquet::ReadMetaData(arrow_file);
+    // in-memory cache is not implemented for local file operations, only for remote files
+    // there is a chance the user sets `input_format_parquet_use_metadata_cache=1` for a local file operation
+    // and the cache_key won't be set. Therefore, we also need to check for metadata_cache.key
+    if (!metadata_cache.use_cache || metadata_cache.key.empty())
+    {
+        auto arrow_file = asArrowFile(in, format_settings, is_stopped, "Parquet", PARQUET_MAGIC_BYTES, /* avoid_buffering */ true);
+        return parquet::ReadMetaData(arrow_file);
+    }
+
+    auto [parquet_file_metadata, loaded] = ParquetFileMetaDataCache::instance()->getOrSet(
+        metadata_cache.key,
+        [&]()
+        {
+            auto arrow_file = asArrowFile(in, format_settings, is_stopped, "Parquet", PARQUET_MAGIC_BYTES, /* avoid_buffering */ true);
+            return parquet::ReadMetaData(arrow_file);
+        }
+    );
+
+    if (loaded)
+        ProfileEvents::increment(ProfileEvents::ParquetMetaDataCacheMisses);
+    else
+        ProfileEvents::increment(ProfileEvents::ParquetMetaDataCacheHits);
+
+    return parquet_file_metadata;
+
 }

 ParquetMetadataInputFormat::ParquetMetadataInputFormat(ReadBuffer & in_, SharedHeader header_, const FormatSettings & format_settings_)
@@ -148,7 +187,7 @@ Chunk ParquetMetadataInputFormat::read()
     if (done)
         return res;

-    auto metadata = getFileMetadata(*in, format_settings, is_stopped);
+    auto metadata = getFileMetadata(*in, format_settings, is_stopped, metadata_cache);

     const auto & header = getPort().getHeader();
     auto names_and_types = getHeaderForParquetMetadata();
@@ -489,6 +528,12 @@ void ParquetMetadataInputFormat::resetParser()
     done = false;
 }

+void ParquetMetadataInputFormat::setStorageRelatedUniqueKey(const Settings & settings, const String & key_)
+{
+    metadata_cache.key = key_;
+    metadata_cache.use_cache = settings[Setting::input_format_parquet_use_metadata_cache];
+}
+
 ParquetMetadataSchemaReader::ParquetMetadataSchemaReader(ReadBuffer & in_)
     : ISchemaReader(in_)
 {

diff --git a/src/Processors/Formats/Impl/ParquetMetadataInputFormat.h b/src/Processors/Formats/Impl/ParquetMetadataInputFormat.h
index 81cf7890ee7e..6b667dcc5b1e 100644
--- a/src/Processors/Formats/Impl/ParquetMetadataInputFormat.h
+++ b/src/Processors/Formats/Impl/ParquetMetadataInputFormat.h
@@ -62,6 +62,14 @@ class ParquetMetadataInputFormat : public IInputFormat

     void resetParser() override;

+    void setStorageRelatedUniqueKey(const Settings & settings, const String & key_) override;
+
+    struct Cache
+    {
+        String key;
+        bool use_cache = false;
+    };
+
 private:
     Chunk read() override;

@@ -78,6 +86,8 @@ class ParquetMetadataInputFormat : public IInputFormat
     const FormatSettings format_settings;
     bool done = false;
     std::atomic<int> is_stopped{0};
+
+    Cache metadata_cache;
 };

 class ParquetMetadataSchemaReader : public ISchemaReader

diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp
index 580e102544e1..c8baf388c89e 100644
--- a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp
+++ b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp
@@ -643,6 +643,9 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade
         if (need_only_count)
             input_format->needOnlyCount();

+        if (!object_info->getPath().empty())
+            input_format->setStorageRelatedUniqueKey(context_->getSettingsRef(), object_info->getPath() + ":" + object_info->metadata->etag);
+
         builder.init(Pipe(input_format));

         configuration->addDeleteTransformers(object_info, builder, format_settings, parser_shared_resources, context_);

diff --git a/tests/integration/test_storage_delta/configs/users.d/disable_parquet_metadata_caching.xml b/tests/integration/test_storage_delta/configs/users.d/disable_parquet_metadata_caching.xml
new file mode 100644
index 000000000000..bc34464e30da
--- /dev/null
+++ b/tests/integration/test_storage_delta/configs/users.d/disable_parquet_metadata_caching.xml
@@ -0,0 +1,7 @@
+<clickhouse>
+    <profiles>
+        <default>
+            <input_format_parquet_use_metadata_cache>0</input_format_parquet_use_metadata_cache>
+        </default>
+    </profiles>
+</clickhouse>

diff --git a/tests/integration/test_storage_delta/test.py b/tests/integration/test_storage_delta/test.py
index 26b138c191be..708d51cf93d9 100644
--- a/tests/integration/test_storage_delta/test.py
+++ b/tests/integration/test_storage_delta/test.py
@@ -110,6 +110,7 @@ def started_cluster():
             user_configs=[
                 "configs/users.d/users.xml",
                 "configs/users.d/enable_writes.xml",
+                "configs/users.d/disable_parquet_metadata_caching.xml",
             ],
             env_variables={
                 "RUST_BACKTRACE": "1",
@@ -131,6 +132,7 @@ def started_cluster():
             user_configs=[
                 "configs/users.d/users.xml",
                 "configs/users.d/enable_writes.xml",
+                "configs/users.d/disable_parquet_metadata_caching.xml",
             ],
             with_minio=True,
             stay_alive=True,
@@ -181,6 +183,7 @@ def started_cluster():
             user_configs=[
                 "configs/users.d/users.xml",
                 "configs/users.d/disabled_delta_kernel.xml",
+                "configs/users.d/disable_parquet_metadata_caching.xml",
             ],
             with_minio=True,
             with_azurite=True,
@@ -1383,7 +1386,7 @@ def test_session_token(started_cluster):
     parquet_data_path = create_initial_data_file(
         started_cluster,
         instance,
-        "SELECT toUInt64(number), toString(number) FROM numbers(100)",
+        "SELECT toUInt64(number), toString(number) FROM numbers(100) SETTINGS input_format_parquet_use_metadata_cache=0",
         TABLE_NAME,
         node_name=node_name,
     )
@@ -1396,7 +1399,7 @@ def test_session_token(started_cluster):
         f"""
             SELECT count() FROM deltaLake(
             'http://{started_cluster.minio_host}:{started_cluster.minio_port}/{started_cluster.minio_bucket}/{TABLE_NAME}/',
-            SETTINGS allow_experimental_delta_kernel_rs=1)
+            SETTINGS allow_experimental_delta_kernel_rs=1, input_format_parquet_use_metadata_cache=0)
         """
     )

diff --git a/tests/queries/0_stateless/01271_show_privileges.reference b/tests/queries/0_stateless/01271_show_privileges.reference
index b63f357d776a..3cff6afcbea1 100644
--- a/tests/queries/0_stateless/01271_show_privileges.reference
+++ b/tests/queries/0_stateless/01271_show_privileges.reference
@@ -144,6 +144,7 @@ SYSTEM DROP PAGE CACHE ['SYSTEM CLEAR PAGE CACHE','SYSTEM DROP PAGE CACHE','DROP
 SYSTEM DROP SCHEMA CACHE ['SYSTEM CLEAR SCHEMA CACHE','SYSTEM DROP SCHEMA CACHE','DROP SCHEMA CACHE'] GLOBAL SYSTEM DROP CACHE
 SYSTEM DROP FORMAT SCHEMA CACHE ['SYSTEM CLEAR FORMAT SCHEMA CACHE','SYSTEM DROP FORMAT SCHEMA CACHE','DROP FORMAT SCHEMA CACHE'] GLOBAL SYSTEM DROP CACHE
 SYSTEM DROP S3 CLIENT CACHE ['SYSTEM CLEAR S3 CLIENT CACHE','SYSTEM DROP S3 CLIENT','DROP S3 CLIENT CACHE'] GLOBAL SYSTEM DROP CACHE
+SYSTEM DROP PARQUET METADATA CACHE ['SYSTEM DROP PARQUET METADATA CACHE'] GLOBAL SYSTEM DROP CACHE
 SYSTEM DROP CACHE ['DROP CACHE'] \N SYSTEM
 SYSTEM RELOAD CONFIG ['RELOAD CONFIG'] GLOBAL SYSTEM RELOAD
 SYSTEM RELOAD USERS ['RELOAD USERS'] GLOBAL SYSTEM RELOAD

diff --git a/tests/queries/0_stateless/02995_settings_25_11_1.tsv b/tests/queries/0_stateless/02995_settings_25_11_1.tsv
index 1fc5f4b2f8b1..0969de0f86de 100644
--- a/tests/queries/0_stateless/02995_settings_25_11_1.tsv
+++ b/tests/queries/0_stateless/02995_settings_25_11_1.tsv
@@ -646,6 +646,7 @@ input_format_parquet_case_insensitive_column_matching 0
 input_format_parquet_enable_json_parsing 1
 input_format_parquet_enable_row_group_prefetch 1
 input_format_parquet_filter_push_down 1
+input_format_parquet_use_metadata_cache 1
 input_format_parquet_import_nested 0
 input_format_parquet_local_file_min_bytes_for_seek 8192
 input_format_parquet_local_time_as_utc 1

diff --git a/tests/queries/0_stateless/03299_parquet_object_storage_metadata_cache.reference b/tests/queries/0_stateless/03299_parquet_object_storage_metadata_cache.reference
new file mode 100644
index 000000000000..c87ad9008b60
--- /dev/null
+++ b/tests/queries/0_stateless/03299_parquet_object_storage_metadata_cache.reference
@@ -0,0 +1,8 @@
+10
+10
+10
+10
+10
+10
+0
+10

diff --git a/tests/queries/0_stateless/03299_parquet_object_storage_metadata_cache.sql b/tests/queries/0_stateless/03299_parquet_object_storage_metadata_cache.sql
new file mode 100644
index 000000000000..b7b0501e4875
--- /dev/null
+++ b/tests/queries/0_stateless/03299_parquet_object_storage_metadata_cache.sql
@@ -0,0 +1,61 @@
+-- Tags: no-parallel, no-fasttest, no-parallel-replicas
+
+DROP TABLE IF EXISTS t_parquet_03262;
+
+CREATE TABLE t_parquet_03262 (a UInt64)
+ENGINE = S3(s3_conn, filename = 'test_03262_{_partition_id}', format = Parquet)
+PARTITION BY a;
+
+INSERT INTO t_parquet_03262 SELECT number FROM numbers(10) SETTINGS s3_truncate_on_insert=1;
+
+SELECT COUNT(*)
+FROM s3(s3_conn, filename = 'test_03262_*', format = Parquet)
+SETTINGS input_format_parquet_use_metadata_cache=1, use_query_condition_cache=0, optimize_count_from_files=0;
+
+SELECT COUNT(*)
+FROM s3(s3_conn, filename = 'test_03262_*', format = Parquet)
+SETTINGS input_format_parquet_use_metadata_cache=1, use_query_condition_cache=0, optimize_count_from_files=0, log_comment='test_03262_parquet_metadata_cache';
+
+SELECT COUNT(*)
+FROM s3(s3_conn, filename = 'test_03262_*', format = ParquetMetadata)
+SETTINGS input_format_parquet_use_metadata_cache=1, use_query_condition_cache=0, optimize_count_from_files=0, log_comment='test_03262_parquet_metadata_format_metadata_cache';
+
+SYSTEM FLUSH LOGS;
+
+SELECT ProfileEvents['ParquetMetaDataCacheHits']
+FROM system.query_log
+where log_comment = 'test_03262_parquet_metadata_cache'
+AND type = 'QueryFinish'
+ORDER BY event_time desc
+LIMIT 1 SETTINGS use_query_condition_cache=0;
+
+SELECT ProfileEvents['ParquetMetaDataCacheHits']
+FROM system.query_log
+where log_comment = 'test_03262_parquet_metadata_format_metadata_cache'
+AND type = 'QueryFinish'
+ORDER BY event_time desc
+LIMIT 1 SETTINGS use_query_condition_cache=0;
+
+SYSTEM DROP PARQUET METADATA CACHE;
+
+SELECT COUNT(*)
+FROM s3(s3_conn, filename = 'test_03262_*', format = Parquet)
+SETTINGS input_format_parquet_use_metadata_cache=1, use_query_condition_cache=0, optimize_count_from_files=0, log_comment='test_03262_parquet_metadata_cache_cache_empty';
+
+SYSTEM FLUSH LOGS;
+
+SELECT ProfileEvents['ParquetMetaDataCacheHits']
+FROM system.query_log
+where log_comment = 'test_03262_parquet_metadata_cache_cache_empty'
+AND type = 'QueryFinish'
+ORDER BY event_time desc
+LIMIT 1 SETTINGS use_query_condition_cache=0;
+
+SELECT ProfileEvents['ParquetMetaDataCacheMisses']
+FROM system.query_log
+where log_comment = 'test_03262_parquet_metadata_cache_cache_empty'
+AND type = 'QueryFinish'
+ORDER BY event_time desc
+LIMIT 1 SETTINGS use_query_condition_cache=0;
+
+DROP TABLE t_parquet_03262;

From b8094db6e5f7dc64f65237a7ebe4a705e9558741 Mon Sep 17 00:00:00 2001
From: Arthur Passos
Date: Mon, 9 Feb 2026 14:34:07 -0300
Subject: [PATCH 2/3] merge glitches

---
 programs/server/Server.cpp                        | 3 ---
 src/Access/Common/AccessType.h                    | 1 +
 src/Core/ServerSettings.cpp                       | 6 +++---
 src/Interpreters/InterpreterSystemQuery.cpp       | 9 ++++-----
 .../ObjectStorage/StorageObjectStorageSource.cpp  | 7 ++++++-
 5 files changed, 14 insertions(+), 12 deletions(-)

diff --git a/programs/server/Server.cpp b/programs/server/Server.cpp
index 9dec8d198062..ce33066847be 100644
--- a/programs/server/Server.cpp
+++ b/programs/server/Server.cpp
@@ -373,7 +373,6 @@ namespace ServerSetting
     extern const ServerSettingsBool abort_on_logical_error;
     extern const ServerSettingsUInt64 jemalloc_flush_profile_interval_bytes;
     extern const ServerSettingsBool jemalloc_flush_profile_on_memory_exceeded;
-<<<<<<< HEAD
     extern const ServerSettingsString allowed_disks_for_table_engines;
     extern const ServerSettingsUInt64 s3_credentials_provider_max_cache_size;
     extern const ServerSettingsUInt64 max_open_files;
@@ -420,9 +419,7 @@ namespace ServerSetting
     extern const ServerSettingsUInt64 keeper_server_socket_send_timeout_sec;
     extern const ServerSettingsString hdfs_libhdfs3_conf;
     extern const ServerSettingsString config_file;
-=======
     extern const ServerSettingsUInt64 input_format_parquet_metadata_cache_max_size;
->>>>>>> 67a38d1181c (Merge pull request #1039 from Altinity/fp_antaya_25_8_parquet_metadata_caching)
 }

 namespace ErrorCodes

diff --git a/src/Access/Common/AccessType.h b/src/Access/Common/AccessType.h
index 99c70ce1d662..90cf43050585 100644
--- a/src/Access/Common/AccessType.h
+++ b/src/Access/Common/AccessType.h
@@ -338,6 +338,7 @@ enum class AccessType : uint8_t
     M(SYSTEM_DROP_CACHE, "DROP CACHE", GROUP, SYSTEM) \
     M(SYSTEM_RELOAD_CONFIG, "RELOAD CONFIG", GLOBAL, SYSTEM_RELOAD) \
     M(SYSTEM_RELOAD_USERS, "RELOAD USERS", GLOBAL, SYSTEM_RELOAD) \
+    M(SYSTEM_RELOAD_DICTIONARY, "SYSTEM RELOAD DICTIONARIES, RELOAD DICTIONARY, RELOAD DICTIONARIES", GLOBAL, SYSTEM_RELOAD) \
     M(SYSTEM_RELOAD_MODEL, "SYSTEM RELOAD MODELS, RELOAD MODEL, RELOAD MODELS", GLOBAL, SYSTEM_RELOAD) \
     M(SYSTEM_RELOAD_FUNCTION, "SYSTEM RELOAD FUNCTIONS, RELOAD FUNCTION, RELOAD FUNCTIONS", GLOBAL, SYSTEM_RELOAD) \
     M(SYSTEM_RELOAD_EMBEDDED_DICTIONARIES, "RELOAD EMBEDDED DICTIONARIES", GLOBAL, SYSTEM_RELOAD) /* implicitly enabled by the grant SYSTEM_RELOAD_DICTIONARY ON *.* */\

diff --git a/src/Core/ServerSettings.cpp b/src/Core/ServerSettings.cpp
index 77fb5160ad52..f1c59b91269e 100644
--- a/src/Core/ServerSettings.cpp
+++ b/src/Core/ServerSettings.cpp
@@ -1469,7 +1469,8 @@ The policy on how to perform a scheduling of CPU slots specified by `concurrent_
     ```xml
     1
     ```
-    )", 0)
+    )", 0) \
+    DECLARE(UInt64, input_format_parquet_metadata_cache_max_size, 500000000, "Maximum size of parquet file metadata cache", 0)

 /// Settings with a path are server settings with at least one layer of nesting that have a fixed structure (no lists, enumerations, repetitions, ...).
 #define LIST_OF_SERVER_SETTINGS_WITH_PATH(DECLARE, ALIAS) \
@@ -1542,8 +1543,7 @@ The policy on how to perform a scheduling of CPU slots specified by `concurrent_
     DECLARE(UInt64, keeper_server_socket_receive_timeout_sec, DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC, R"(Keeper socket receive timeout.)", 0, "keeper_server.socket_receive_timeout_sec") \
     DECLARE(UInt64, keeper_server_socket_send_timeout_sec, DBMS_DEFAULT_SEND_TIMEOUT_SEC, R"(Keeper socket send timeout.)", 0, "keeper_server.socket_send_timeout_sec") \
     DECLARE(String, hdfs_libhdfs3_conf, "", R"(Points libhdfs3 to the right location for its config.)", 0, "hdfs.libhdfs3_conf") \
-    DECLARE(String, config_file, "config.xml", R"(Points to the server config file.)", 0, "config-file") \
-    DECLARE(UInt64, input_format_parquet_metadata_cache_max_size, 500000000, "Maximum size of parquet file metadata cache", 0)
+    DECLARE(String, config_file, "config.xml", R"(Points to the server config file.)", 0, "config-file")

 // clang-format on

diff --git a/src/Interpreters/InterpreterSystemQuery.cpp b/src/Interpreters/InterpreterSystemQuery.cpp
index f3e30f24af16..1dce8ba11369 100644
--- a/src/Interpreters/InterpreterSystemQuery.cpp
+++ b/src/Interpreters/InterpreterSystemQuery.cpp
@@ -2008,12 +2008,11 @@ AccessRightsElements InterpreterSystemQuery::getRequiredAccessForDDLOnCluster()
         case Type::CLEAR_FILESYSTEM_CACHE:
         case Type::CLEAR_DISTRIBUTED_CACHE:
         case Type::SYNC_FILESYSTEM_CACHE:
-        case Type::DROP_PAGE_CACHE:
-        case Type::DROP_SCHEMA_CACHE:
-        case Type::DROP_FORMAT_SCHEMA_CACHE:
-        case Type::DROP_PARQUET_METADATA_CACHE:
-        case Type::DROP_S3_CLIENT_CACHE:
+        case Type::CLEAR_PAGE_CACHE:
+        case Type::CLEAR_SCHEMA_CACHE:
+        case Type::CLEAR_FORMAT_SCHEMA_CACHE:
         case Type::DROP_PARQUET_METADATA_CACHE:
+        case Type::CLEAR_S3_CLIENT_CACHE:
         {
             required_access.emplace_back(AccessType::SYSTEM_DROP_CACHE);
             break;

diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp
index c8baf388c89e..bf243eceaff2 100644
--- a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp
+++ b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp
@@ -644,7 +644,12 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade
         input_format->needOnlyCount();

     if (!object_info->getPath().empty())
-        input_format->setStorageRelatedUniqueKey(context_->getSettingsRef(), object_info->getPath() + ":" + object_info->metadata->etag);
+    {
+        if (const auto & metadata = object_info->relative_path_with_metadata.metadata)
+        {
+            input_format->setStorageRelatedUniqueKey(context_->getSettingsRef(), object_info->getPath() + ":" + metadata->etag);
+        }
+    }

     builder.init(Pipe(input_format));

From ade2d44e24a788776a67debf9c26c7dbffb21c5e Mon Sep 17 00:00:00 2001
From: Arthur Passos
Date: Tue, 10 Feb 2026 07:53:40 -0300
Subject: [PATCH 3/3] fix settingshistory

---
 src/Core/SettingsChangesHistory.cpp | 1 +
 1 file changed, 1 insertion(+)

diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp
index d7cf495769f7..15640a09a17b 100644
--- a/src/Core/SettingsChangesHistory.cpp
+++ b/src/Core/SettingsChangesHistory.cpp
@@ -78,6 +78,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
         {"throw_if_deduplication_in_dependent_materialized_views_enabled_with_async_insert", true, false, "It becomes obsolete."},
         {"database_datalake_require_metadata_access", true, true, "New setting."},
testing results"}, + {"input_format_parquet_use_metadata_cache", true, true, "New setting, turned ON by default"}, }); addSettingsChanges(settings_changes_history, "25.12", {