From 87b84a79866589effaf6294b4d517060f81efae9 Mon Sep 17 00:00:00 2001 From: Alex Kasko Date: Sun, 5 Apr 2026 23:54:14 +0100 Subject: [PATCH 1/3] Pool: add configuration and introspection function This PR adds `postgres_configure_pool` table function that can be used to configure the connection pool for the specified attached Postgres DB (all for all attached Postgres DBs). This function return the details about the configured pool. Testing: basic test added, more test coverage pending. --- src/include/storage/postgres_catalog.hpp | 6 + .../storage/postgres_connection_pool.hpp | 5 + src/postgres_extension.cpp | 4 + src/storage/CMakeLists.txt | 1 + src/storage/postgres_configure_pool.cpp | 203 ++++++++++++++++++ test/sql/storage/attach_connection_pool.test | 53 ++++- 6 files changed, 269 insertions(+), 3 deletions(-) create mode 100644 src/storage/postgres_configure_pool.cpp diff --git a/src/include/storage/postgres_catalog.hpp b/src/include/storage/postgres_catalog.hpp index a0693c968..74fa3577e 100644 --- a/src/include/storage/postgres_catalog.hpp +++ b/src/include/storage/postgres_catalog.hpp @@ -8,6 +8,8 @@ #pragma once +#include + #include "duckdb/catalog/catalog.hpp" #include "duckdb/common/enums/access_mode.hpp" #include "postgres_connection.hpp" @@ -81,6 +83,10 @@ class PostgresCatalog : public Catalog { return *connection_pool; } + shared_ptr GetConnectionPoolPtr() { + return connection_pool; + } + void ClearCache(); //! Whether or not this catalog should search a specific type with the standard priority diff --git a/src/include/storage/postgres_connection_pool.hpp b/src/include/storage/postgres_connection_pool.hpp index 2ab44893d..0332b1cd3 100644 --- a/src/include/storage/postgres_connection_pool.hpp +++ b/src/include/storage/postgres_connection_pool.hpp @@ -52,4 +52,9 @@ class PostgresConnectionPool : public dbconnector::pool::ConnectionPool + +#include "duckdb/function/table_function.hpp" +#include "duckdb/main/database_manager.hpp" + +#include "storage/postgres_catalog.hpp" + +namespace duckdb { + +namespace { + +enum class ExecState { UNINITIALIZED, EXHAUSTED }; + +struct ConfigurePoolBindData : public TableFunctionData { + std::pair catalog_name; + std::pair max_connections; + std::pair wait_timeout_millis; + std::pair enable_thread_local_cache; + std::pair max_lifetime_millis; + std::pair idle_timeout_millis; + std::pair enable_reaper_thread; + std::pair health_check_query; + + static Value Lookup(const named_parameter_map_t &map, const std::string &key) { + auto it = map.find(key); + if (it == map.end()) { + return Value(); + } + return it->second; + } + + static std::pair LookupString(const named_parameter_map_t &map, const std::string &key) { + Value val = Lookup(map, key); + if (val.IsNull()) { + return std::make_pair("", true); + } + std::string str = StringValue::Get(val); + return std::make_pair(std::move(str), false); + } + + static std::pair LookupUBigInt(const named_parameter_map_t &map, const std::string &key) { + Value val = Lookup(map, key); + if (val.IsNull()) { + return std::make_pair(0, true); + } + uint64_t num = UBigIntValue::Get(val); + return std::make_pair(num, false); + } + + static std::pair LookupBool(const named_parameter_map_t &map, const std::string &key) { + Value val = Lookup(map, key); + if (val.IsNull()) { + return std::make_pair(false, true); + } + bool flag = BooleanValue::Get(val); + return std::make_pair(flag, false); + } + + ConfigurePoolBindData(const named_parameter_map_t &map) + : catalog_name(LookupString(map, "catalog")), max_connections(LookupUBigInt(map, "max_connections")), + wait_timeout_millis(LookupUBigInt(map, "wait_timeout_millis")), + enable_thread_local_cache(LookupBool(map, "enable_thread_local_cache")), + max_lifetime_millis(LookupUBigInt(map, "max_lifetime_millis")), + idle_timeout_millis(LookupUBigInt(map, "idle_timeout_millis")), + enable_reaper_thread(LookupBool(map, "enable_reaper_thread")), + health_check_query(LookupString(map, "health_check_query")) { + } +}; + +struct GlobalState : public GlobalTableFunctionState {}; + +struct LocalState : public LocalTableFunctionState { + ExecState exec_state = ExecState::UNINITIALIZED; +}; + +} // namespace + +static void AddColumn(vector &return_types, vector &names, const std::string &col_name, + LogicalType col_type) { + names.emplace_back(col_name); + return_types.emplace_back(col_type); +} + +static unique_ptr ConfigurePoolBind(ClientContext &context, TableFunctionBindInput &input, + vector &return_types, vector &names) { + AddColumn(return_types, names, "catalog", LogicalType::VARCHAR); + AddColumn(return_types, names, "available_connections", LogicalType::UBIGINT); + AddColumn(return_types, names, "max_connections", LogicalType::UBIGINT); + AddColumn(return_types, names, "wait_timeout_millis", LogicalType::UBIGINT); + AddColumn(return_types, names, "thread_local_cache_enabled", LogicalType::BOOLEAN); + AddColumn(return_types, names, "thread_local_cache_hits", LogicalType::UBIGINT); + AddColumn(return_types, names, "thread_local_cache_misses", LogicalType::UBIGINT); + AddColumn(return_types, names, "max_lifetime_millis", LogicalType::UBIGINT); + AddColumn(return_types, names, "idle_timeout_millis", LogicalType::UBIGINT); + AddColumn(return_types, names, "reaper_thread_running", LogicalType::BOOLEAN); + AddColumn(return_types, names, "health_check_query", LogicalType::VARCHAR); + + return make_uniq(input.named_parameters); +} + +static unique_ptr ConfigurePoolInitGlobalState(ClientContext &, TableFunctionInitInput &) { + return make_uniq(); +} + +static unique_ptr ConfigurePoolInitLocalState(ExecutionContext &, TableFunctionInitInput &, + GlobalTableFunctionState *) { + return make_uniq(); +} + +static void ConfigurePoolFunction(ClientContext &context, TableFunctionInput &input, DataChunk &output) { + auto &bdata = input.bind_data->Cast(); + auto &lstate = input.local_state->Cast(); + + if (lstate.exec_state == ExecState::EXHAUSTED) { + output.SetCardinality(0); + return; + } + + // collect pools + std::vector cat_names; + std::vector> pools; + auto databases = DatabaseManager::Get(context).GetDatabases(context); + for (auto &db_ref : databases) { + auto &db = *db_ref; + auto &catalog = db.GetCatalog(); + if (catalog.GetCatalogType() != "postgres") { + continue; + } + if (!bdata.catalog_name.second && catalog.GetName() != bdata.catalog_name.first) { + continue; + } + cat_names.push_back(catalog.GetName()); + shared_ptr pool = catalog.Cast().GetConnectionPoolPtr(); + pools.emplace_back(std::move(pool)); + } + + // configure pools + for (auto &pool : pools) { + if (!bdata.max_connections.second) { + pool->SetMaxConnections(bdata.max_connections.first); + } + if (!bdata.wait_timeout_millis.second) { + pool->SetWaitTimeoutMillis(bdata.wait_timeout_millis.first); + } + if (!bdata.enable_thread_local_cache.second) { + pool->SetThreadLocalCacheEnabled(bdata.enable_thread_local_cache.first); + } + if (!bdata.max_lifetime_millis.second) { + pool->SetMaxLifetimeMillis(bdata.max_lifetime_millis.first); + } + if (!bdata.idle_timeout_millis.second) { + pool->SetIdleTimeoutMillis(bdata.idle_timeout_millis.first); + } + if (!bdata.enable_reaper_thread.second) { + if (bdata.enable_reaper_thread.first) { + pool->EnsureReaperRunning(); + } else { + pool->ShutdownReaper(); + } + } + if (!bdata.health_check_query.second) { + pool->SetHealthCheckQuery(bdata.health_check_query.first); + } + } + + // setresults + idx_t row_idx = 0; + for (auto &pool : pools) { + idx_t col_idx = 0; + output.SetValue(col_idx++, row_idx, Value(cat_names.at(row_idx))); + output.SetValue(col_idx++, row_idx, Value::UBIGINT(pool->GetAvailableConnections())); + output.SetValue(col_idx++, row_idx, Value::UBIGINT(pool->GetMaxConnections())); + output.SetValue(col_idx++, row_idx, Value::UBIGINT(pool->GetWaitTimeoutMillis())); + output.SetValue(col_idx++, row_idx, Value::BOOLEAN(pool->IsThreadLocalCacheEnabled())); + output.SetValue(col_idx++, row_idx, Value::UBIGINT(pool->GetThreadLocalCacheHits())); + output.SetValue(col_idx++, row_idx, Value::UBIGINT(pool->GetThreadLocalCacheMisses())); + output.SetValue(col_idx++, row_idx, Value::UBIGINT(pool->GetMaxLifetimeMillis())); + output.SetValue(col_idx++, row_idx, Value::UBIGINT(pool->GetIdleTimeoutMillis())); + output.SetValue(col_idx++, row_idx, Value::BOOLEAN(false)); // todo + output.SetValue(col_idx++, row_idx, Value(pool->GetHealthCheckQuery())); + row_idx++; + } + + output.SetCardinality(row_idx); + lstate.exec_state = ExecState::EXHAUSTED; +} + +PostgresConfigurePoolFunction::PostgresConfigurePoolFunction() + : TableFunction("postgres_configure_pool", std::vector(), ConfigurePoolFunction, ConfigurePoolBind, + ConfigurePoolInitGlobalState, ConfigurePoolInitLocalState) { + named_parameters["catalog"] = LogicalType::VARCHAR; + named_parameters["max_connections"] = LogicalType::UBIGINT; + named_parameters["wait_timeout_millis"] = LogicalType::UBIGINT; + named_parameters["enable_thread_local_cache"] = LogicalType::BOOLEAN; + named_parameters["max_lifetime_millis"] = LogicalType::UBIGINT; + named_parameters["idle_timeout_millis"] = LogicalType::UBIGINT; + named_parameters["enable_reaper_thread"] = LogicalType::BOOLEAN; + named_parameters["health_check_query"] = LogicalType::VARCHAR; +} + +} // namespace duckdb diff --git a/test/sql/storage/attach_connection_pool.test b/test/sql/storage/attach_connection_pool.test index 639fde915..390460f8e 100644 --- a/test/sql/storage/attach_connection_pool.test +++ b/test/sql/storage/attach_connection_pool.test @@ -60,9 +60,56 @@ SELECT COUNT(*) FROM connection_pool ---- 1000000 -# todo: pool introspection is required to check the effect statement ok -SET pg_pool_health_check_query = 'SELECT FAIL' +USE memory statement ok -RESET pg_pool_health_check_query +DETACH s + +statement ok +ATTACH 'dbname=postgresscanner' AS s (TYPE POSTGRES); + +statement ok +USE s; + +query III +SELECT catalog, available_connections, thread_local_cache_enabled +FROM postgres_configure_pool(catalog='s', enable_thread_local_cache=FALSE) +---- +s 0 FALSE + +query II +SELECT catalog, health_check_query +FROM postgres_configure_pool(catalog='s', health_check_query='SELECT FAIL') +---- +s SELECT FAIL + +statement ok +CREATE OR REPLACE TABLE duckdb_connection_pool_test1(col1 INT) + +query II +SELECT catalog, available_connections +FROM postgres_configure_pool(catalog='s', enable_thread_local_cache=FALSE) +---- +s 0 + +query II +SELECT catalog, health_check_query +FROM postgres_configure_pool(catalog='s', health_check_query='SELECT 42') +---- +s SELECT 42 + +statement ok +DROP TABLE duckdb_connection_pool_test1 + +query II +SELECT catalog, available_connections +FROM postgres_configure_pool(catalog='s', enable_thread_local_cache=FALSE) +---- +s 1 + +statement ok +USE memory + +statement ok +DETACH s From 4fcf375585ee53be2cbe91e55f856620021e8c72 Mon Sep 17 00:00:00 2001 From: Alex Kasko Date: Mon, 6 Apr 2026 03:14:51 +0100 Subject: [PATCH 2/3] Pool: global options and test coverage --- database-connector | 2 +- .../storage/postgres_connection_pool.hpp | 2 +- src/postgres_extension.cpp | 83 +++++++- src/storage/postgres_catalog.cpp | 2 +- src/storage/postgres_configure_pool.cpp | 22 ++- src/storage/postgres_connection_pool.cpp | 57 ++++-- test/sql/storage/attach_connection_pool.test | 23 ++- .../attach_connection_pool_configure.test | 130 ++++++++++++ .../attach_connection_pool_options.test | 185 ++++++++++++++++++ 9 files changed, 466 insertions(+), 40 deletions(-) create mode 100644 test/sql/storage/attach_connection_pool_configure.test create mode 100644 test/sql/storage/attach_connection_pool_options.test diff --git a/database-connector b/database-connector index 431b80bd9..746b56c40 160000 --- a/database-connector +++ b/database-connector @@ -1 +1 @@ -Subproject commit 431b80bd9105d37cf8557ddfff4e18a7fd2ec153 +Subproject commit 746b56c4063f3682f4eb4facdc49408ed1885555 diff --git a/src/include/storage/postgres_connection_pool.hpp b/src/include/storage/postgres_connection_pool.hpp index 0332b1cd3..0ee4fda63 100644 --- a/src/include/storage/postgres_connection_pool.hpp +++ b/src/include/storage/postgres_connection_pool.hpp @@ -26,7 +26,7 @@ using PostgresPoolConnection = dbconnector::pool::PooledConnection { public: - PostgresConnectionPool(PostgresCatalog &postgres_catalog); + PostgresConnectionPool(PostgresCatalog &postgres_catalog, ClientContext &context); public: bool TryGetConnection(PostgresPoolConnection &connection); diff --git a/src/postgres_extension.cpp b/src/postgres_extension.cpp index e17fe7bc2..4fbe8fc69 100644 --- a/src/postgres_extension.cpp +++ b/src/postgres_extension.cpp @@ -69,6 +69,13 @@ static void SetPostgresConnectionLimit(ClientContext &context, SetScope scope, V } auto &config = DBConfig::GetConfig(context); config.SetOption("pg_connection_limit", parameter); + + // propagate the value also to 'pg_pool_max_connections' + optional_ptr option; + auto setting_index = config.TryGetSettingIndex("pg_pool_max_connections", option); + if (setting_index.IsValid()) { + context.config.user_settings.SetUserSetting(setting_index.GetIndex(), parameter); + } } static void DisablePool(ClientContext &context, SetScope scope, Value ¶meter) { @@ -76,6 +83,8 @@ static void DisablePool(ClientContext &context, SetScope scope, Value ¶meter throw InvalidInputException("pg_connection_cache can only be set globally"); } if (parameter.IsNull() || BooleanValue::Get(parameter)) { + Value def_size = Value::UBIGINT(PostgresConnectionPool::DefaultPoolSize()); + SetPostgresConnectionLimit(context, scope, def_size); return; } Value zero = Value::UBIGINT(0); @@ -138,6 +147,12 @@ void SetPostgresNullByteReplacement(ClientContext &context, SetScope scope, Valu } } +static std::string CreatePoolNote(const std::string &option) { + return std::string() + "This option only applies to newly attached Postgres databases, " + + "to configure a database that is already attached use " + + "\"FROM postgres_configure_pool(catalog_name='my_attached_postgres_db', " + option + ")\""; +} + static void LoadInternal(ExtensionLoader &loader) { PostgresScanFunction postgres_fun; loader.RegisterFunction(postgres_fun); @@ -187,14 +202,25 @@ static void LoadInternal(ExtensionLoader &loader) { LogicalType::BOOLEAN, Value::BOOLEAN(true)); config.AddExtensionOption("pg_pages_per_task", "The amount of pages per task", LogicalType::UBIGINT, Value::UBIGINT(PostgresBindData::DEFAULT_PAGES_PER_TASK)); - config.AddExtensionOption("pg_connection_limit", "The maximum amount of concurrent Postgres connections", - LogicalType::UBIGINT, Value::UBIGINT(PostgresConnectionPool::DefaultPoolSize()), - SetPostgresConnectionLimit); + config.AddExtensionOption( + "pg_connection_limit", + "The maximum amount of concurrent Postgres connections." + " This option is deprecated, instead use \"SET pg_pool_max_connections = 42\" for newly attached Postgres " + "databases and \"FROM postgres_configure_pool(catalog_name='my_attached_postgres_db', max_connections=42)\" " + "for " + "already attached Postgres databases.", + LogicalType::UBIGINT, Value::UBIGINT(PostgresConnectionPool::DefaultPoolSize()), SetPostgresConnectionLimit); config.AddExtensionOption( "pg_array_as_varchar", "Read Postgres arrays as varchar - enables reading mixed dimensional arrays", LogicalType::BOOLEAN, Value::BOOLEAN(false), PostgresClearCacheFunction::ClearCacheOnSetting); - config.AddExtensionOption("pg_connection_cache", "Whether or not to use the connection cache", LogicalType::BOOLEAN, - Value::BOOLEAN(true), DisablePool); + config.AddExtensionOption( + "pg_connection_cache", + "Whether or not to use the connection pooling." + " This option is deprecated, instead to disable the connection pooling use \"SET pg_pool_max_connections=0\" " + "for newly attached Postgres databases and \"FROM " + "postgres_configure_pool(catalog_name='my_attached_postgres_db', " + "max_connections=0)\" for already attached Postgres databases.", + LogicalType::BOOLEAN, Value::BOOLEAN(true), DisablePool); config.AddExtensionOption("pg_experimental_filter_pushdown", "Whether or not to use filter pushdown", LogicalType::BOOLEAN, Value::BOOLEAN(true)); config.AddExtensionOption("pg_null_byte_replacement", @@ -213,7 +239,52 @@ static void LoadInternal(ExtensionLoader &loader) { "Postgres idle in transaction timeout in milliseconds to set on scan connections", LogicalType::UINTEGER, Value()); // connection pool options - config.AddExtensionOption("pg_pool_health_check_query", "The query to use to check that the connection is healthy", + config.AddExtensionOption("pg_pool_max_connections", + "Maximum number of connections that are allowed to be cached in a connection pool for " + "each attached Postgres database. " + "This number can be temporary exceeded when parallel scans are used. " + + CreatePoolNote("max_connections=42"), + LogicalType::UBIGINT, Value::UBIGINT(PostgresConnectionPool::DefaultPoolSize())); + config.AddExtensionOption("pg_pool_wait_timeout_millis", + "Maximum number of milliseconds to wait when acquiring a connection from a pool where " + "all available connections are already taken. " + + CreatePoolNote("wait_timeout_millis=60000"), + LogicalType::UBIGINT, + Value::UBIGINT(dbconnector::pool::ConnectionPoolConfig().wait_timeout_millis)); + config.AddExtensionOption( + "pg_pool_enable_thread_local_cache", + "Whether to enable the connection caching in thread-local cache. Such connections are getting pinned to the " + "threads and are not made available to other threads, while still taking the place in the pool. " + + CreatePoolNote("enable_thread_local_cache=FALSE"), + LogicalType::BOOLEAN, Value::BOOLEAN(dbconnector::pool::ConnectionPoolConfig().tl_cache_enabled)); + config.AddExtensionOption("pg_pool_max_lifetime_millis", + "Maximum number of milliseconds the connection can be kept open. This number is checked " + "when the connection is taken from the pool and returned to the pool. " + "When the connection pool reaper thread is enabled ('pg_pool_enable_reaper_thread' " + "option), then this number is checked in background periodically. " + + CreatePoolNote("max_lifetime_millis=600000"), + LogicalType::UBIGINT, + Value::UBIGINT(dbconnector::pool::ConnectionPoolConfig().max_lifetime_millis)); + config.AddExtensionOption("pg_pool_idle_timeout_millis", + "Maximum number of milliseconds the connection can be kept idle in the pool. This number " + "is checked when the connection is taken from the pool. " + "When the connection pool reaper thread is enabled ('pg_pool_enable_reaper_thread' " + "option), then this number is checked in background periodically. " + + CreatePoolNote("idle_timeout_millis=300000"), + LogicalType::UBIGINT, + Value::UBIGINT(dbconnector::pool::ConnectionPoolConfig().idle_timeout_millis)); + config.AddExtensionOption( + "pg_pool_enable_reaper_thread", + "Whether to enable the connection pool reaper thread, that periodically scans the pool to check the " + "'max_lifetime_millis' and 'idle_timeout_millis' and closes the connection which exceed the specified values. " + "Either 'max_lifetime_millis' or 'idle_timeout_millis' must be set to a non-zero value for this option to be " + "effective. " + + CreatePoolNote("enable_reaper_thread=TRUE"), + LogicalType::BOOLEAN, Value::BOOLEAN(dbconnector::pool::ConnectionPoolConfig().start_reaper_thread)); + config.AddExtensionOption("pg_pool_health_check_query", + "The query that is used to check that the connection is healthy. Setting this option to " + "an empty string disables the health check. " + + CreatePoolNote("health_check_query=SELECT 42"), LogicalType::VARCHAR, PostgresConnectionPool::DefaultHealthCheckQuery()); OptimizerExtension postgres_optimizer; diff --git a/src/storage/postgres_catalog.cpp b/src/storage/postgres_catalog.cpp index 0f7fc0c10..868c3291f 100644 --- a/src/storage/postgres_catalog.cpp +++ b/src/storage/postgres_catalog.cpp @@ -15,7 +15,7 @@ PostgresCatalog::PostgresCatalog(AttachedDatabase &db_p, string connection_strin ClientContext &context) : Catalog(db_p), connection_string(std::move(connection_string_p)), attach_path(std::move(attach_path_p)), access_mode(access_mode), isolation_level(isolation_level), schemas(*this, schema_to_load), - connection_pool(make_shared_ptr(*this)), default_schema(schema_to_load) { + connection_pool(make_shared_ptr(*this, context)), default_schema(schema_to_load) { if (default_schema.empty()) { default_schema = "public"; } diff --git a/src/storage/postgres_configure_pool.cpp b/src/storage/postgres_configure_pool.cpp index d5f1f3e68..3280e9854 100644 --- a/src/storage/postgres_configure_pool.cpp +++ b/src/storage/postgres_configure_pool.cpp @@ -59,13 +59,20 @@ struct ConfigurePoolBindData : public TableFunctionData { } ConfigurePoolBindData(const named_parameter_map_t &map) - : catalog_name(LookupString(map, "catalog")), max_connections(LookupUBigInt(map, "max_connections")), + : catalog_name(LookupString(map, "catalog_name")), max_connections(LookupUBigInt(map, "max_connections")), wait_timeout_millis(LookupUBigInt(map, "wait_timeout_millis")), enable_thread_local_cache(LookupBool(map, "enable_thread_local_cache")), max_lifetime_millis(LookupUBigInt(map, "max_lifetime_millis")), idle_timeout_millis(LookupUBigInt(map, "idle_timeout_millis")), enable_reaper_thread(LookupBool(map, "enable_reaper_thread")), health_check_query(LookupString(map, "health_check_query")) { + if (catalog_name.second && + !(max_connections.second && wait_timeout_millis.second && enable_thread_local_cache.second && + max_lifetime_millis.second && idle_timeout_millis.second && enable_reaper_thread.second && + health_check_query.second)) { + throw BinderException("'catalog_name' argument must be specified to change any option value on the " + "connection pool of this catalog"); + } } }; @@ -85,7 +92,7 @@ static void AddColumn(vector &return_types, vector &names, static unique_ptr ConfigurePoolBind(ClientContext &context, TableFunctionBindInput &input, vector &return_types, vector &names) { - AddColumn(return_types, names, "catalog", LogicalType::VARCHAR); + AddColumn(return_types, names, "catalog_name", LogicalType::VARCHAR); AddColumn(return_types, names, "available_connections", LogicalType::UBIGINT); AddColumn(return_types, names, "max_connections", LogicalType::UBIGINT); AddColumn(return_types, names, "wait_timeout_millis", LogicalType::UBIGINT); @@ -136,8 +143,9 @@ static void ConfigurePoolFunction(ClientContext &context, TableFunctionInput &in pools.emplace_back(std::move(pool)); } - // configure pools - for (auto &pool : pools) { + // configure the single pool if specified + if (!bdata.catalog_name.second && pools.size() > 0) { + auto &pool = pools.at(0); if (!bdata.max_connections.second) { pool->SetMaxConnections(bdata.max_connections.first); } @@ -165,7 +173,7 @@ static void ConfigurePoolFunction(ClientContext &context, TableFunctionInput &in } } - // setresults + // set results idx_t row_idx = 0; for (auto &pool : pools) { idx_t col_idx = 0; @@ -178,7 +186,7 @@ static void ConfigurePoolFunction(ClientContext &context, TableFunctionInput &in output.SetValue(col_idx++, row_idx, Value::UBIGINT(pool->GetThreadLocalCacheMisses())); output.SetValue(col_idx++, row_idx, Value::UBIGINT(pool->GetMaxLifetimeMillis())); output.SetValue(col_idx++, row_idx, Value::UBIGINT(pool->GetIdleTimeoutMillis())); - output.SetValue(col_idx++, row_idx, Value::BOOLEAN(false)); // todo + output.SetValue(col_idx++, row_idx, Value::BOOLEAN(pool->IsReaperRunning())); output.SetValue(col_idx++, row_idx, Value(pool->GetHealthCheckQuery())); row_idx++; } @@ -190,7 +198,7 @@ static void ConfigurePoolFunction(ClientContext &context, TableFunctionInput &in PostgresConfigurePoolFunction::PostgresConfigurePoolFunction() : TableFunction("postgres_configure_pool", std::vector(), ConfigurePoolFunction, ConfigurePoolBind, ConfigurePoolInitGlobalState, ConfigurePoolInitLocalState) { - named_parameters["catalog"] = LogicalType::VARCHAR; + named_parameters["catalog_name"] = LogicalType::VARCHAR; named_parameters["max_connections"] = LogicalType::UBIGINT; named_parameters["wait_timeout_millis"] = LogicalType::UBIGINT; named_parameters["enable_thread_local_cache"] = LogicalType::BOOLEAN; diff --git a/src/storage/postgres_connection_pool.cpp b/src/storage/postgres_connection_pool.cpp index c82d7da26..085fe3ad3 100644 --- a/src/storage/postgres_connection_pool.cpp +++ b/src/storage/postgres_connection_pool.cpp @@ -6,11 +6,11 @@ namespace duckdb { -static dbconnector::pool::ConnectionPoolConfig CreateConfig(PostgresCatalog &postgres_catalog); +static dbconnector::pool::ConnectionPoolConfig CreateConfig(ClientContext &context); -static std::string GetHealthCheckQueryFromConfig(PostgresCatalog &postgres_catalog) { +static std::string GetHealthCheckQueryFromConfig(ClientContext &context) { Value val; - if (postgres_catalog.GetDatabase().TryGetCurrentSetting("pg_pool_health_check_query", val)) { + if (context.TryGetCurrentSetting("pg_pool_health_check_query", val)) { if (val.IsNull()) { return std::string(); } @@ -19,9 +19,9 @@ static std::string GetHealthCheckQueryFromConfig(PostgresCatalog &postgres_catal return PostgresConnectionPool::DefaultHealthCheckQuery(); } -PostgresConnectionPool::PostgresConnectionPool(PostgresCatalog &postgres_catalog) - : dbconnector::pool::ConnectionPool(CreateConfig(postgres_catalog)), - postgres_catalog(postgres_catalog), health_check_query(GetHealthCheckQueryFromConfig(postgres_catalog)) { +PostgresConnectionPool::PostgresConnectionPool(PostgresCatalog &postgres_catalog, ClientContext &context) + : dbconnector::pool::ConnectionPool(CreateConfig(context)), postgres_catalog(postgres_catalog), + health_check_query(GetHealthCheckQueryFromConfig(context)) { } PostgresPoolConnection PostgresConnectionPool::ForceGetConnection() { @@ -81,17 +81,46 @@ std::string PostgresConnectionPool::DefaultHealthCheckQuery() { return "SELECT 1"; } -static dbconnector::pool::ConnectionPoolConfig CreateConfig(PostgresCatalog &postgres_catalog) { - DatabaseInstance &db = postgres_catalog.GetDatabase(); +static dbconnector::pool::ConnectionPoolConfig CreateConfig(ClientContext &ctx) { + dbconnector::pool::ConnectionPoolConfig config; - Value connection_limit; - uint64_t max_connections = PostgresConnectionPool::DefaultPoolSize(); - if (db.TryGetCurrentSetting("pg_connection_limit", connection_limit) && !connection_limit.IsNull()) { - max_connections = UBigIntValue::Get(connection_limit); + { + Value val; + if (ctx.TryGetCurrentSetting("pg_pool_max_connections", val) && !val.IsNull()) { + config.max_connections = UBigIntValue::Get(val); + } + } + { + Value val; + if (ctx.TryGetCurrentSetting("pg_pool_wait_timeout_millis", val) && !val.IsNull()) { + config.wait_timeout_millis = UBigIntValue::Get(val); + } + } + { + Value val; + if (ctx.TryGetCurrentSetting("pg_pool_enable_thread_local_cache", val) && !val.IsNull()) { + config.tl_cache_enabled = BooleanValue::Get(val); + } + } + { + Value val; + if (ctx.TryGetCurrentSetting("pg_pool_max_lifetime_millis", val) && !val.IsNull()) { + config.max_lifetime_millis = UBigIntValue::Get(val); + } + } + { + Value val; + if (ctx.TryGetCurrentSetting("pg_pool_idle_timeout_millis", val) && !val.IsNull()) { + config.idle_timeout_millis = UBigIntValue::Get(val); + } + } + { + Value val; + if (ctx.TryGetCurrentSetting("pg_pool_enable_reaper_thread", val) && !val.IsNull()) { + config.start_reaper_thread = BooleanValue::Get(val); + } } - dbconnector::pool::ConnectionPoolConfig config; - config.max_connections = max_connections; return config; } diff --git a/test/sql/storage/attach_connection_pool.test b/test/sql/storage/attach_connection_pool.test index 390460f8e..1e1f08c37 100644 --- a/test/sql/storage/attach_connection_pool.test +++ b/test/sql/storage/attach_connection_pool.test @@ -72,15 +72,18 @@ ATTACH 'dbname=postgresscanner' AS s (TYPE POSTGRES); statement ok USE s; +statement ok +FROM postgres_configure_pool(catalog_name='s', enable_thread_local_cache=FALSE) + query III -SELECT catalog, available_connections, thread_local_cache_enabled -FROM postgres_configure_pool(catalog='s', enable_thread_local_cache=FALSE) +SELECT catalog_name, available_connections, thread_local_cache_enabled +FROM postgres_configure_pool(catalog_name='s') ---- s 0 FALSE query II -SELECT catalog, health_check_query -FROM postgres_configure_pool(catalog='s', health_check_query='SELECT FAIL') +SELECT catalog_name, health_check_query +FROM postgres_configure_pool(catalog_name='s', health_check_query='SELECT FAIL') ---- s SELECT FAIL @@ -88,14 +91,14 @@ statement ok CREATE OR REPLACE TABLE duckdb_connection_pool_test1(col1 INT) query II -SELECT catalog, available_connections -FROM postgres_configure_pool(catalog='s', enable_thread_local_cache=FALSE) +SELECT catalog_name, available_connections +FROM postgres_configure_pool(catalog_name='s', enable_thread_local_cache=FALSE) ---- s 0 query II -SELECT catalog, health_check_query -FROM postgres_configure_pool(catalog='s', health_check_query='SELECT 42') +SELECT catalog_name, health_check_query +FROM postgres_configure_pool(catalog_name='s', health_check_query='SELECT 42') ---- s SELECT 42 @@ -103,8 +106,8 @@ statement ok DROP TABLE duckdb_connection_pool_test1 query II -SELECT catalog, available_connections -FROM postgres_configure_pool(catalog='s', enable_thread_local_cache=FALSE) +SELECT catalog_name, available_connections +FROM postgres_configure_pool(catalog_name='s', enable_thread_local_cache=FALSE) ---- s 1 diff --git a/test/sql/storage/attach_connection_pool_configure.test b/test/sql/storage/attach_connection_pool_configure.test new file mode 100644 index 000000000..c6b0a7447 --- /dev/null +++ b/test/sql/storage/attach_connection_pool_configure.test @@ -0,0 +1,130 @@ +# name: test/sql/storage/attach_connection_pool_configure.test +# description: Test connection pool live configuration +# group: [storage] + +require postgres_scanner + +require-env POSTGRES_TEST_DATABASE_AVAILABLE + +statement ok +ATTACH 'dbname=postgresscanner' AS s (TYPE POSTGRES); + +statement ok +ATTACH 'dbname=postgresscanner' AS s1 (TYPE POSTGRES); + +query I +SELECT catalog_name +FROM postgres_configure_pool() +ORDER BY catalog_name +---- +s +s1 + +query I +SELECT catalog_name +FROM postgres_configure_pool(catalog_name='s') +---- +s + +query I +SELECT catalog_name +FROM postgres_configure_pool(catalog_name='s', max_connections=42) +---- +s + +query II +SELECT catalog_name, max_connections +FROM postgres_configure_pool(catalog_name='s') +---- +s 42 + +query II +SELECT catalog_name, max_connections +FROM postgres_configure_pool() +ORDER BY catalog_name +---- +s 42 +s1 8 + +query I +SELECT catalog_name +FROM postgres_configure_pool(catalog_name='s', wait_timeout_millis=42000) +---- +s + +query II +SELECT catalog_name, wait_timeout_millis +FROM postgres_configure_pool(catalog_name='s') +---- +s 42000 + +query I +SELECT catalog_name +FROM postgres_configure_pool(catalog_name='s', enable_thread_local_cache=FALSE) +---- +s + +query II +SELECT catalog_name, thread_local_cache_enabled +FROM postgres_configure_pool(catalog_name='s') +---- +s FALSE + +query I +SELECT catalog_name +FROM postgres_configure_pool(catalog_name='s', max_lifetime_millis=42000) +---- +s + +query II +SELECT catalog_name, max_lifetime_millis +FROM postgres_configure_pool(catalog_name='s') +---- +s 42000 + +query I +SELECT catalog_name +FROM postgres_configure_pool(catalog_name='s', idle_timeout_millis=42000) +---- +s + +query II +SELECT catalog_name, idle_timeout_millis +FROM postgres_configure_pool(catalog_name='s') +---- +s 42000 + +query I +SELECT catalog_name +FROM postgres_configure_pool(catalog_name='s', enable_reaper_thread=TRUE) +---- +s + +query II +SELECT catalog_name, reaper_thread_running +FROM postgres_configure_pool(catalog_name='s') +---- +s TRUE + +query I +SELECT catalog_name +FROM postgres_configure_pool(catalog_name='s', health_check_query='') +---- +s + +query II +SELECT catalog_name, health_check_query +FROM postgres_configure_pool(catalog_name='s') +---- +s (empty) + +statement error +FROM postgres_configure_pool(max_connections=42) +---- +'catalog_name' argument must be specified + +statement ok +DETACH s1 + +statement ok +DETACH s diff --git a/test/sql/storage/attach_connection_pool_options.test b/test/sql/storage/attach_connection_pool_options.test new file mode 100644 index 000000000..1d0e10086 --- /dev/null +++ b/test/sql/storage/attach_connection_pool_options.test @@ -0,0 +1,185 @@ +# name: test/sql/storage/attach_connection_pool_options.test +# description: Test connection pool options +# group: [storage] + +require postgres_scanner + +require-env POSTGRES_TEST_DATABASE_AVAILABLE + +statement ok +ATTACH 'dbname=postgresscanner' AS s (TYPE POSTGRES); + +statement ok +DETACH s + +statement ok +SET pg_pool_max_connections = 42 + +statement ok +ATTACH 'dbname=postgresscanner' AS s (TYPE POSTGRES); + +query II +SELECT catalog_name, max_connections +FROM postgres_configure_pool(catalog_name='s') +---- +s 42 + +statement ok +RESET pg_pool_max_connections + +statement ok +DETACH s + +statement ok +SET pg_pool_wait_timeout_millis = 42000 + +statement ok +ATTACH 'dbname=postgresscanner' AS s (TYPE POSTGRES); + +query II +SELECT catalog_name, wait_timeout_millis +FROM postgres_configure_pool(catalog_name='s') +---- +s 42000 + +statement ok +RESET pg_pool_wait_timeout_millis + +statement ok +DETACH s + +statement ok +SET pg_pool_enable_thread_local_cache = FALSE + +statement ok +ATTACH 'dbname=postgresscanner' AS s (TYPE POSTGRES); + +query II +SELECT catalog_name, thread_local_cache_enabled +FROM postgres_configure_pool(catalog_name='s') +---- +s FALSE + +statement ok +RESET pg_pool_enable_thread_local_cache + +statement ok +DETACH s + +statement ok +SET pg_pool_max_lifetime_millis = 42000 + +statement ok +ATTACH 'dbname=postgresscanner' AS s (TYPE POSTGRES); + +query II +SELECT catalog_name, max_lifetime_millis +FROM postgres_configure_pool(catalog_name='s') +---- +s 42000 + +statement ok +RESET pg_pool_max_lifetime_millis + +statement ok +DETACH s + +statement ok +SET pg_pool_idle_timeout_millis = 42000 + +statement ok +ATTACH 'dbname=postgresscanner' AS s (TYPE POSTGRES); + +query II +SELECT catalog_name, idle_timeout_millis +FROM postgres_configure_pool(catalog_name='s') +---- +s 42000 + +statement ok +RESET pg_pool_idle_timeout_millis + +statement ok +DETACH s + +statement ok +SET pg_pool_enable_reaper_thread = TRUE + +statement ok +SET pg_pool_idle_timeout_millis = 42000 + +statement ok +ATTACH 'dbname=postgresscanner' AS s (TYPE POSTGRES); + +query II +SELECT catalog_name, reaper_thread_running +FROM postgres_configure_pool(catalog_name='s') +---- +s TRUE + +statement ok +RESET pg_pool_enable_reaper_thread + +statement ok +RESET pg_pool_idle_timeout_millis + +statement ok +DETACH s + +# legacy options + +statement ok +SET pg_connection_limit = 42 + +statement ok +ATTACH 'dbname=postgresscanner' AS s (TYPE POSTGRES); + +query II +SELECT catalog_name, max_connections +FROM postgres_configure_pool(catalog_name='s') +---- +s 42 + +query I +SELECT current_setting('pg_pool_max_connections') +---- +42 + +statement ok +RESET pg_connection_limit + +query I +SELECT current_setting('pg_pool_max_connections') +---- +8 + +statement ok +DETACH s + +statement ok +SET pg_connection_cache = FALSE + +statement ok +ATTACH 'dbname=postgresscanner' AS s (TYPE POSTGRES); + +query II +SELECT catalog_name, max_connections +FROM postgres_configure_pool(catalog_name='s') +---- +s 0 + +query I +SELECT current_setting('pg_pool_max_connections') +---- +0 + +statement ok +RESET pg_connection_cache + +query I +SELECT current_setting('pg_pool_max_connections') +---- +8 + +statement ok +DETACH s From 97048dae1eea3111cec1db0929ed7605045e28cf Mon Sep 17 00:00:00 2001 From: Alex Kasko Date: Mon, 6 Apr 2026 09:26:47 +0100 Subject: [PATCH 3/3] Fix flaky test --- test/sql/storage/attach_connection_pool.test | 23 ++++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) diff --git a/test/sql/storage/attach_connection_pool.test b/test/sql/storage/attach_connection_pool.test index 1e1f08c37..6b39caaa8 100644 --- a/test/sql/storage/attach_connection_pool.test +++ b/test/sql/storage/attach_connection_pool.test @@ -60,6 +60,9 @@ SELECT COUNT(*) FROM connection_pool ---- 1000000 +statement ok +RESET threads + statement ok USE memory @@ -81,24 +84,36 @@ FROM postgres_configure_pool(catalog_name='s') ---- s 0 FALSE -query II +statement ok SELECT catalog_name, health_check_query FROM postgres_configure_pool(catalog_name='s', health_check_query='SELECT FAIL') + +query II +SELECT catalog_name, health_check_query +FROM postgres_configure_pool(catalog_name='s') ---- s SELECT FAIL statement ok CREATE OR REPLACE TABLE duckdb_connection_pool_test1(col1 INT) -query II +statement ok SELECT catalog_name, available_connections FROM postgres_configure_pool(catalog_name='s', enable_thread_local_cache=FALSE) + +query II +SELECT catalog_name, available_connections +FROM postgres_configure_pool(catalog_name='s') ---- s 0 -query II +statement ok SELECT catalog_name, health_check_query FROM postgres_configure_pool(catalog_name='s', health_check_query='SELECT 42') + +query II +SELECT catalog_name, health_check_query +FROM postgres_configure_pool(catalog_name='s') ---- s SELECT 42 @@ -107,7 +122,7 @@ DROP TABLE duckdb_connection_pool_test1 query II SELECT catalog_name, available_connections -FROM postgres_configure_pool(catalog_name='s', enable_thread_local_cache=FALSE) +FROM postgres_configure_pool(catalog_name='s') ---- s 1