diff --git a/database-connector b/database-connector index 431b80bd9..746b56c40 160000 --- a/database-connector +++ b/database-connector @@ -1 +1 @@ -Subproject commit 431b80bd9105d37cf8557ddfff4e18a7fd2ec153 +Subproject commit 746b56c4063f3682f4eb4facdc49408ed1885555 diff --git a/src/include/storage/postgres_catalog.hpp b/src/include/storage/postgres_catalog.hpp index a0693c968..74fa3577e 100644 --- a/src/include/storage/postgres_catalog.hpp +++ b/src/include/storage/postgres_catalog.hpp @@ -8,6 +8,8 @@ #pragma once +#include + #include "duckdb/catalog/catalog.hpp" #include "duckdb/common/enums/access_mode.hpp" #include "postgres_connection.hpp" @@ -81,6 +83,10 @@ class PostgresCatalog : public Catalog { return *connection_pool; } + shared_ptr GetConnectionPoolPtr() { + return connection_pool; + } + void ClearCache(); //! Whether or not this catalog should search a specific type with the standard priority diff --git a/src/include/storage/postgres_connection_pool.hpp b/src/include/storage/postgres_connection_pool.hpp index 2ab44893d..0ee4fda63 100644 --- a/src/include/storage/postgres_connection_pool.hpp +++ b/src/include/storage/postgres_connection_pool.hpp @@ -26,7 +26,7 @@ using PostgresPoolConnection = dbconnector::pool::PooledConnection { public: - PostgresConnectionPool(PostgresCatalog &postgres_catalog); + PostgresConnectionPool(PostgresCatalog &postgres_catalog, ClientContext &context); public: bool TryGetConnection(PostgresPoolConnection &connection); @@ -52,4 +52,9 @@ class PostgresConnectionPool : public dbconnector::pool::ConnectionPool option; + auto setting_index = config.TryGetSettingIndex("pg_pool_max_connections", option); + if (setting_index.IsValid()) { + context.config.user_settings.SetUserSetting(setting_index.GetIndex(), parameter); + } } static void DisablePool(ClientContext &context, SetScope scope, Value ¶meter) { @@ -75,6 +83,8 @@ static void DisablePool(ClientContext &context, SetScope scope, Value ¶meter throw InvalidInputException("pg_connection_cache can only be set globally"); } if (parameter.IsNull() || BooleanValue::Get(parameter)) { + Value def_size = Value::UBIGINT(PostgresConnectionPool::DefaultPoolSize()); + SetPostgresConnectionLimit(context, scope, def_size); return; } Value zero = Value::UBIGINT(0); @@ -137,6 +147,12 @@ void SetPostgresNullByteReplacement(ClientContext &context, SetScope scope, Valu } } +static std::string CreatePoolNote(const std::string &option) { + return std::string() + "This option only applies to newly attached Postgres databases, " + + "to configure a database that is already attached use " + + "\"FROM postgres_configure_pool(catalog_name='my_attached_postgres_db', " + option + ")\""; +} + static void LoadInternal(ExtensionLoader &loader) { PostgresScanFunction postgres_fun; loader.RegisterFunction(postgres_fun); @@ -162,6 +178,9 @@ static void LoadInternal(ExtensionLoader &loader) { PostgresReadBinaryFunction read_binary_func; loader.RegisterFunction(read_binary_func); + PostgresConfigurePoolFunction configure_pool_function; + loader.RegisterFunction(configure_pool_function); + // Register the new type SecretType secret_type; secret_type.name = "postgres"; @@ -183,14 +202,25 @@ static void LoadInternal(ExtensionLoader &loader) { LogicalType::BOOLEAN, Value::BOOLEAN(true)); config.AddExtensionOption("pg_pages_per_task", "The amount of pages per task", LogicalType::UBIGINT, Value::UBIGINT(PostgresBindData::DEFAULT_PAGES_PER_TASK)); - config.AddExtensionOption("pg_connection_limit", "The maximum amount of concurrent Postgres connections", - LogicalType::UBIGINT, Value::UBIGINT(PostgresConnectionPool::DefaultPoolSize()), - SetPostgresConnectionLimit); + config.AddExtensionOption( + "pg_connection_limit", + "The maximum amount of concurrent Postgres connections." + " This option is deprecated, instead use \"SET pg_pool_max_connections = 42\" for newly attached Postgres " + "databases and \"FROM postgres_configure_pool(catalog_name='my_attached_postgres_db', max_connections=42)\" " + "for " + "already attached Postgres databases.", + LogicalType::UBIGINT, Value::UBIGINT(PostgresConnectionPool::DefaultPoolSize()), SetPostgresConnectionLimit); config.AddExtensionOption( "pg_array_as_varchar", "Read Postgres arrays as varchar - enables reading mixed dimensional arrays", LogicalType::BOOLEAN, Value::BOOLEAN(false), PostgresClearCacheFunction::ClearCacheOnSetting); - config.AddExtensionOption("pg_connection_cache", "Whether or not to use the connection cache", LogicalType::BOOLEAN, - Value::BOOLEAN(true), DisablePool); + config.AddExtensionOption( + "pg_connection_cache", + "Whether or not to use the connection pooling." + " This option is deprecated, instead to disable the connection pooling use \"SET pg_pool_max_connections=0\" " + "for newly attached Postgres databases and \"FROM " + "postgres_configure_pool(catalog_name='my_attached_postgres_db', " + "max_connections=0)\" for already attached Postgres databases.", + LogicalType::BOOLEAN, Value::BOOLEAN(true), DisablePool); config.AddExtensionOption("pg_experimental_filter_pushdown", "Whether or not to use filter pushdown", LogicalType::BOOLEAN, Value::BOOLEAN(true)); config.AddExtensionOption("pg_null_byte_replacement", @@ -209,7 +239,52 @@ static void LoadInternal(ExtensionLoader &loader) { "Postgres idle in transaction timeout in milliseconds to set on scan connections", LogicalType::UINTEGER, Value()); // connection pool options - config.AddExtensionOption("pg_pool_health_check_query", "The query to use to check that the connection is healthy", + config.AddExtensionOption("pg_pool_max_connections", + "Maximum number of connections that are allowed to be cached in a connection pool for " + "each attached Postgres database. " + "This number can be temporary exceeded when parallel scans are used. " + + CreatePoolNote("max_connections=42"), + LogicalType::UBIGINT, Value::UBIGINT(PostgresConnectionPool::DefaultPoolSize())); + config.AddExtensionOption("pg_pool_wait_timeout_millis", + "Maximum number of milliseconds to wait when acquiring a connection from a pool where " + "all available connections are already taken. " + + CreatePoolNote("wait_timeout_millis=60000"), + LogicalType::UBIGINT, + Value::UBIGINT(dbconnector::pool::ConnectionPoolConfig().wait_timeout_millis)); + config.AddExtensionOption( + "pg_pool_enable_thread_local_cache", + "Whether to enable the connection caching in thread-local cache. Such connections are getting pinned to the " + "threads and are not made available to other threads, while still taking the place in the pool. " + + CreatePoolNote("enable_thread_local_cache=FALSE"), + LogicalType::BOOLEAN, Value::BOOLEAN(dbconnector::pool::ConnectionPoolConfig().tl_cache_enabled)); + config.AddExtensionOption("pg_pool_max_lifetime_millis", + "Maximum number of milliseconds the connection can be kept open. This number is checked " + "when the connection is taken from the pool and returned to the pool. " + "When the connection pool reaper thread is enabled ('pg_pool_enable_reaper_thread' " + "option), then this number is checked in background periodically. " + + CreatePoolNote("max_lifetime_millis=600000"), + LogicalType::UBIGINT, + Value::UBIGINT(dbconnector::pool::ConnectionPoolConfig().max_lifetime_millis)); + config.AddExtensionOption("pg_pool_idle_timeout_millis", + "Maximum number of milliseconds the connection can be kept idle in the pool. This number " + "is checked when the connection is taken from the pool. " + "When the connection pool reaper thread is enabled ('pg_pool_enable_reaper_thread' " + "option), then this number is checked in background periodically. " + + CreatePoolNote("idle_timeout_millis=300000"), + LogicalType::UBIGINT, + Value::UBIGINT(dbconnector::pool::ConnectionPoolConfig().idle_timeout_millis)); + config.AddExtensionOption( + "pg_pool_enable_reaper_thread", + "Whether to enable the connection pool reaper thread, that periodically scans the pool to check the " + "'max_lifetime_millis' and 'idle_timeout_millis' and closes the connection which exceed the specified values. " + "Either 'max_lifetime_millis' or 'idle_timeout_millis' must be set to a non-zero value for this option to be " + "effective. " + + CreatePoolNote("enable_reaper_thread=TRUE"), + LogicalType::BOOLEAN, Value::BOOLEAN(dbconnector::pool::ConnectionPoolConfig().start_reaper_thread)); + config.AddExtensionOption("pg_pool_health_check_query", + "The query that is used to check that the connection is healthy. Setting this option to " + "an empty string disables the health check. " + + CreatePoolNote("health_check_query=SELECT 42"), LogicalType::VARCHAR, PostgresConnectionPool::DefaultHealthCheckQuery()); OptimizerExtension postgres_optimizer; diff --git a/src/storage/CMakeLists.txt b/src/storage/CMakeLists.txt index f4a66f302..5153a9d96 100644 --- a/src/storage/CMakeLists.txt +++ b/src/storage/CMakeLists.txt @@ -4,6 +4,7 @@ add_library( postgres_catalog_set.cpp postgres_connection_pool.cpp postgres_clear_cache.cpp + postgres_configure_pool.cpp postgres_delete.cpp postgres_index.cpp postgres_index_entry.cpp diff --git a/src/storage/postgres_catalog.cpp b/src/storage/postgres_catalog.cpp index 0f7fc0c10..868c3291f 100644 --- a/src/storage/postgres_catalog.cpp +++ b/src/storage/postgres_catalog.cpp @@ -15,7 +15,7 @@ PostgresCatalog::PostgresCatalog(AttachedDatabase &db_p, string connection_strin ClientContext &context) : Catalog(db_p), connection_string(std::move(connection_string_p)), attach_path(std::move(attach_path_p)), access_mode(access_mode), isolation_level(isolation_level), schemas(*this, schema_to_load), - connection_pool(make_shared_ptr(*this)), default_schema(schema_to_load) { + connection_pool(make_shared_ptr(*this, context)), default_schema(schema_to_load) { if (default_schema.empty()) { default_schema = "public"; } diff --git a/src/storage/postgres_configure_pool.cpp b/src/storage/postgres_configure_pool.cpp new file mode 100644 index 000000000..3280e9854 --- /dev/null +++ b/src/storage/postgres_configure_pool.cpp @@ -0,0 +1,211 @@ +#include "storage/postgres_connection_pool.hpp" + +#include + +#include "duckdb/function/table_function.hpp" +#include "duckdb/main/database_manager.hpp" + +#include "storage/postgres_catalog.hpp" + +namespace duckdb { + +namespace { + +enum class ExecState { UNINITIALIZED, EXHAUSTED }; + +struct ConfigurePoolBindData : public TableFunctionData { + std::pair catalog_name; + std::pair max_connections; + std::pair wait_timeout_millis; + std::pair enable_thread_local_cache; + std::pair max_lifetime_millis; + std::pair idle_timeout_millis; + std::pair enable_reaper_thread; + std::pair health_check_query; + + static Value Lookup(const named_parameter_map_t &map, const std::string &key) { + auto it = map.find(key); + if (it == map.end()) { + return Value(); + } + return it->second; + } + + static std::pair LookupString(const named_parameter_map_t &map, const std::string &key) { + Value val = Lookup(map, key); + if (val.IsNull()) { + return std::make_pair("", true); + } + std::string str = StringValue::Get(val); + return std::make_pair(std::move(str), false); + } + + static std::pair LookupUBigInt(const named_parameter_map_t &map, const std::string &key) { + Value val = Lookup(map, key); + if (val.IsNull()) { + return std::make_pair(0, true); + } + uint64_t num = UBigIntValue::Get(val); + return std::make_pair(num, false); + } + + static std::pair LookupBool(const named_parameter_map_t &map, const std::string &key) { + Value val = Lookup(map, key); + if (val.IsNull()) { + return std::make_pair(false, true); + } + bool flag = BooleanValue::Get(val); + return std::make_pair(flag, false); + } + + ConfigurePoolBindData(const named_parameter_map_t &map) + : catalog_name(LookupString(map, "catalog_name")), max_connections(LookupUBigInt(map, "max_connections")), + wait_timeout_millis(LookupUBigInt(map, "wait_timeout_millis")), + enable_thread_local_cache(LookupBool(map, "enable_thread_local_cache")), + max_lifetime_millis(LookupUBigInt(map, "max_lifetime_millis")), + idle_timeout_millis(LookupUBigInt(map, "idle_timeout_millis")), + enable_reaper_thread(LookupBool(map, "enable_reaper_thread")), + health_check_query(LookupString(map, "health_check_query")) { + if (catalog_name.second && + !(max_connections.second && wait_timeout_millis.second && enable_thread_local_cache.second && + max_lifetime_millis.second && idle_timeout_millis.second && enable_reaper_thread.second && + health_check_query.second)) { + throw BinderException("'catalog_name' argument must be specified to change any option value on the " + "connection pool of this catalog"); + } + } +}; + +struct GlobalState : public GlobalTableFunctionState {}; + +struct LocalState : public LocalTableFunctionState { + ExecState exec_state = ExecState::UNINITIALIZED; +}; + +} // namespace + +static void AddColumn(vector &return_types, vector &names, const std::string &col_name, + LogicalType col_type) { + names.emplace_back(col_name); + return_types.emplace_back(col_type); +} + +static unique_ptr ConfigurePoolBind(ClientContext &context, TableFunctionBindInput &input, + vector &return_types, vector &names) { + AddColumn(return_types, names, "catalog_name", LogicalType::VARCHAR); + AddColumn(return_types, names, "available_connections", LogicalType::UBIGINT); + AddColumn(return_types, names, "max_connections", LogicalType::UBIGINT); + AddColumn(return_types, names, "wait_timeout_millis", LogicalType::UBIGINT); + AddColumn(return_types, names, "thread_local_cache_enabled", LogicalType::BOOLEAN); + AddColumn(return_types, names, "thread_local_cache_hits", LogicalType::UBIGINT); + AddColumn(return_types, names, "thread_local_cache_misses", LogicalType::UBIGINT); + AddColumn(return_types, names, "max_lifetime_millis", LogicalType::UBIGINT); + AddColumn(return_types, names, "idle_timeout_millis", LogicalType::UBIGINT); + AddColumn(return_types, names, "reaper_thread_running", LogicalType::BOOLEAN); + AddColumn(return_types, names, "health_check_query", LogicalType::VARCHAR); + + return make_uniq(input.named_parameters); +} + +static unique_ptr ConfigurePoolInitGlobalState(ClientContext &, TableFunctionInitInput &) { + return make_uniq(); +} + +static unique_ptr ConfigurePoolInitLocalState(ExecutionContext &, TableFunctionInitInput &, + GlobalTableFunctionState *) { + return make_uniq(); +} + +static void ConfigurePoolFunction(ClientContext &context, TableFunctionInput &input, DataChunk &output) { + auto &bdata = input.bind_data->Cast(); + auto &lstate = input.local_state->Cast(); + + if (lstate.exec_state == ExecState::EXHAUSTED) { + output.SetCardinality(0); + return; + } + + // collect pools + std::vector cat_names; + std::vector> pools; + auto databases = DatabaseManager::Get(context).GetDatabases(context); + for (auto &db_ref : databases) { + auto &db = *db_ref; + auto &catalog = db.GetCatalog(); + if (catalog.GetCatalogType() != "postgres") { + continue; + } + if (!bdata.catalog_name.second && catalog.GetName() != bdata.catalog_name.first) { + continue; + } + cat_names.push_back(catalog.GetName()); + shared_ptr pool = catalog.Cast().GetConnectionPoolPtr(); + pools.emplace_back(std::move(pool)); + } + + // configure the single pool if specified + if (!bdata.catalog_name.second && pools.size() > 0) { + auto &pool = pools.at(0); + if (!bdata.max_connections.second) { + pool->SetMaxConnections(bdata.max_connections.first); + } + if (!bdata.wait_timeout_millis.second) { + pool->SetWaitTimeoutMillis(bdata.wait_timeout_millis.first); + } + if (!bdata.enable_thread_local_cache.second) { + pool->SetThreadLocalCacheEnabled(bdata.enable_thread_local_cache.first); + } + if (!bdata.max_lifetime_millis.second) { + pool->SetMaxLifetimeMillis(bdata.max_lifetime_millis.first); + } + if (!bdata.idle_timeout_millis.second) { + pool->SetIdleTimeoutMillis(bdata.idle_timeout_millis.first); + } + if (!bdata.enable_reaper_thread.second) { + if (bdata.enable_reaper_thread.first) { + pool->EnsureReaperRunning(); + } else { + pool->ShutdownReaper(); + } + } + if (!bdata.health_check_query.second) { + pool->SetHealthCheckQuery(bdata.health_check_query.first); + } + } + + // set results + idx_t row_idx = 0; + for (auto &pool : pools) { + idx_t col_idx = 0; + output.SetValue(col_idx++, row_idx, Value(cat_names.at(row_idx))); + output.SetValue(col_idx++, row_idx, Value::UBIGINT(pool->GetAvailableConnections())); + output.SetValue(col_idx++, row_idx, Value::UBIGINT(pool->GetMaxConnections())); + output.SetValue(col_idx++, row_idx, Value::UBIGINT(pool->GetWaitTimeoutMillis())); + output.SetValue(col_idx++, row_idx, Value::BOOLEAN(pool->IsThreadLocalCacheEnabled())); + output.SetValue(col_idx++, row_idx, Value::UBIGINT(pool->GetThreadLocalCacheHits())); + output.SetValue(col_idx++, row_idx, Value::UBIGINT(pool->GetThreadLocalCacheMisses())); + output.SetValue(col_idx++, row_idx, Value::UBIGINT(pool->GetMaxLifetimeMillis())); + output.SetValue(col_idx++, row_idx, Value::UBIGINT(pool->GetIdleTimeoutMillis())); + output.SetValue(col_idx++, row_idx, Value::BOOLEAN(pool->IsReaperRunning())); + output.SetValue(col_idx++, row_idx, Value(pool->GetHealthCheckQuery())); + row_idx++; + } + + output.SetCardinality(row_idx); + lstate.exec_state = ExecState::EXHAUSTED; +} + +PostgresConfigurePoolFunction::PostgresConfigurePoolFunction() + : TableFunction("postgres_configure_pool", std::vector(), ConfigurePoolFunction, ConfigurePoolBind, + ConfigurePoolInitGlobalState, ConfigurePoolInitLocalState) { + named_parameters["catalog_name"] = LogicalType::VARCHAR; + named_parameters["max_connections"] = LogicalType::UBIGINT; + named_parameters["wait_timeout_millis"] = LogicalType::UBIGINT; + named_parameters["enable_thread_local_cache"] = LogicalType::BOOLEAN; + named_parameters["max_lifetime_millis"] = LogicalType::UBIGINT; + named_parameters["idle_timeout_millis"] = LogicalType::UBIGINT; + named_parameters["enable_reaper_thread"] = LogicalType::BOOLEAN; + named_parameters["health_check_query"] = LogicalType::VARCHAR; +} + +} // namespace duckdb diff --git a/src/storage/postgres_connection_pool.cpp b/src/storage/postgres_connection_pool.cpp index c82d7da26..085fe3ad3 100644 --- a/src/storage/postgres_connection_pool.cpp +++ b/src/storage/postgres_connection_pool.cpp @@ -6,11 +6,11 @@ namespace duckdb { -static dbconnector::pool::ConnectionPoolConfig CreateConfig(PostgresCatalog &postgres_catalog); +static dbconnector::pool::ConnectionPoolConfig CreateConfig(ClientContext &context); -static std::string GetHealthCheckQueryFromConfig(PostgresCatalog &postgres_catalog) { +static std::string GetHealthCheckQueryFromConfig(ClientContext &context) { Value val; - if (postgres_catalog.GetDatabase().TryGetCurrentSetting("pg_pool_health_check_query", val)) { + if (context.TryGetCurrentSetting("pg_pool_health_check_query", val)) { if (val.IsNull()) { return std::string(); } @@ -19,9 +19,9 @@ static std::string GetHealthCheckQueryFromConfig(PostgresCatalog &postgres_catal return PostgresConnectionPool::DefaultHealthCheckQuery(); } -PostgresConnectionPool::PostgresConnectionPool(PostgresCatalog &postgres_catalog) - : dbconnector::pool::ConnectionPool(CreateConfig(postgres_catalog)), - postgres_catalog(postgres_catalog), health_check_query(GetHealthCheckQueryFromConfig(postgres_catalog)) { +PostgresConnectionPool::PostgresConnectionPool(PostgresCatalog &postgres_catalog, ClientContext &context) + : dbconnector::pool::ConnectionPool(CreateConfig(context)), postgres_catalog(postgres_catalog), + health_check_query(GetHealthCheckQueryFromConfig(context)) { } PostgresPoolConnection PostgresConnectionPool::ForceGetConnection() { @@ -81,17 +81,46 @@ std::string PostgresConnectionPool::DefaultHealthCheckQuery() { return "SELECT 1"; } -static dbconnector::pool::ConnectionPoolConfig CreateConfig(PostgresCatalog &postgres_catalog) { - DatabaseInstance &db = postgres_catalog.GetDatabase(); +static dbconnector::pool::ConnectionPoolConfig CreateConfig(ClientContext &ctx) { + dbconnector::pool::ConnectionPoolConfig config; - Value connection_limit; - uint64_t max_connections = PostgresConnectionPool::DefaultPoolSize(); - if (db.TryGetCurrentSetting("pg_connection_limit", connection_limit) && !connection_limit.IsNull()) { - max_connections = UBigIntValue::Get(connection_limit); + { + Value val; + if (ctx.TryGetCurrentSetting("pg_pool_max_connections", val) && !val.IsNull()) { + config.max_connections = UBigIntValue::Get(val); + } + } + { + Value val; + if (ctx.TryGetCurrentSetting("pg_pool_wait_timeout_millis", val) && !val.IsNull()) { + config.wait_timeout_millis = UBigIntValue::Get(val); + } + } + { + Value val; + if (ctx.TryGetCurrentSetting("pg_pool_enable_thread_local_cache", val) && !val.IsNull()) { + config.tl_cache_enabled = BooleanValue::Get(val); + } + } + { + Value val; + if (ctx.TryGetCurrentSetting("pg_pool_max_lifetime_millis", val) && !val.IsNull()) { + config.max_lifetime_millis = UBigIntValue::Get(val); + } + } + { + Value val; + if (ctx.TryGetCurrentSetting("pg_pool_idle_timeout_millis", val) && !val.IsNull()) { + config.idle_timeout_millis = UBigIntValue::Get(val); + } + } + { + Value val; + if (ctx.TryGetCurrentSetting("pg_pool_enable_reaper_thread", val) && !val.IsNull()) { + config.start_reaper_thread = BooleanValue::Get(val); + } } - dbconnector::pool::ConnectionPoolConfig config; - config.max_connections = max_connections; return config; } diff --git a/test/sql/storage/attach_connection_pool.test b/test/sql/storage/attach_connection_pool.test index 639fde915..6b39caaa8 100644 --- a/test/sql/storage/attach_connection_pool.test +++ b/test/sql/storage/attach_connection_pool.test @@ -60,9 +60,74 @@ SELECT COUNT(*) FROM connection_pool ---- 1000000 -# todo: pool introspection is required to check the effect statement ok -SET pg_pool_health_check_query = 'SELECT FAIL' +RESET threads statement ok -RESET pg_pool_health_check_query +USE memory + +statement ok +DETACH s + +statement ok +ATTACH 'dbname=postgresscanner' AS s (TYPE POSTGRES); + +statement ok +USE s; + +statement ok +FROM postgres_configure_pool(catalog_name='s', enable_thread_local_cache=FALSE) + +query III +SELECT catalog_name, available_connections, thread_local_cache_enabled +FROM postgres_configure_pool(catalog_name='s') +---- +s 0 FALSE + +statement ok +SELECT catalog_name, health_check_query +FROM postgres_configure_pool(catalog_name='s', health_check_query='SELECT FAIL') + +query II +SELECT catalog_name, health_check_query +FROM postgres_configure_pool(catalog_name='s') +---- +s SELECT FAIL + +statement ok +CREATE OR REPLACE TABLE duckdb_connection_pool_test1(col1 INT) + +statement ok +SELECT catalog_name, available_connections +FROM postgres_configure_pool(catalog_name='s', enable_thread_local_cache=FALSE) + +query II +SELECT catalog_name, available_connections +FROM postgres_configure_pool(catalog_name='s') +---- +s 0 + +statement ok +SELECT catalog_name, health_check_query +FROM postgres_configure_pool(catalog_name='s', health_check_query='SELECT 42') + +query II +SELECT catalog_name, health_check_query +FROM postgres_configure_pool(catalog_name='s') +---- +s SELECT 42 + +statement ok +DROP TABLE duckdb_connection_pool_test1 + +query II +SELECT catalog_name, available_connections +FROM postgres_configure_pool(catalog_name='s') +---- +s 1 + +statement ok +USE memory + +statement ok +DETACH s diff --git a/test/sql/storage/attach_connection_pool_configure.test b/test/sql/storage/attach_connection_pool_configure.test new file mode 100644 index 000000000..c6b0a7447 --- /dev/null +++ b/test/sql/storage/attach_connection_pool_configure.test @@ -0,0 +1,130 @@ +# name: test/sql/storage/attach_connection_pool_configure.test +# description: Test connection pool live configuration +# group: [storage] + +require postgres_scanner + +require-env POSTGRES_TEST_DATABASE_AVAILABLE + +statement ok +ATTACH 'dbname=postgresscanner' AS s (TYPE POSTGRES); + +statement ok +ATTACH 'dbname=postgresscanner' AS s1 (TYPE POSTGRES); + +query I +SELECT catalog_name +FROM postgres_configure_pool() +ORDER BY catalog_name +---- +s +s1 + +query I +SELECT catalog_name +FROM postgres_configure_pool(catalog_name='s') +---- +s + +query I +SELECT catalog_name +FROM postgres_configure_pool(catalog_name='s', max_connections=42) +---- +s + +query II +SELECT catalog_name, max_connections +FROM postgres_configure_pool(catalog_name='s') +---- +s 42 + +query II +SELECT catalog_name, max_connections +FROM postgres_configure_pool() +ORDER BY catalog_name +---- +s 42 +s1 8 + +query I +SELECT catalog_name +FROM postgres_configure_pool(catalog_name='s', wait_timeout_millis=42000) +---- +s + +query II +SELECT catalog_name, wait_timeout_millis +FROM postgres_configure_pool(catalog_name='s') +---- +s 42000 + +query I +SELECT catalog_name +FROM postgres_configure_pool(catalog_name='s', enable_thread_local_cache=FALSE) +---- +s + +query II +SELECT catalog_name, thread_local_cache_enabled +FROM postgres_configure_pool(catalog_name='s') +---- +s FALSE + +query I +SELECT catalog_name +FROM postgres_configure_pool(catalog_name='s', max_lifetime_millis=42000) +---- +s + +query II +SELECT catalog_name, max_lifetime_millis +FROM postgres_configure_pool(catalog_name='s') +---- +s 42000 + +query I +SELECT catalog_name +FROM postgres_configure_pool(catalog_name='s', idle_timeout_millis=42000) +---- +s + +query II +SELECT catalog_name, idle_timeout_millis +FROM postgres_configure_pool(catalog_name='s') +---- +s 42000 + +query I +SELECT catalog_name +FROM postgres_configure_pool(catalog_name='s', enable_reaper_thread=TRUE) +---- +s + +query II +SELECT catalog_name, reaper_thread_running +FROM postgres_configure_pool(catalog_name='s') +---- +s TRUE + +query I +SELECT catalog_name +FROM postgres_configure_pool(catalog_name='s', health_check_query='') +---- +s + +query II +SELECT catalog_name, health_check_query +FROM postgres_configure_pool(catalog_name='s') +---- +s (empty) + +statement error +FROM postgres_configure_pool(max_connections=42) +---- +'catalog_name' argument must be specified + +statement ok +DETACH s1 + +statement ok +DETACH s diff --git a/test/sql/storage/attach_connection_pool_options.test b/test/sql/storage/attach_connection_pool_options.test new file mode 100644 index 000000000..1d0e10086 --- /dev/null +++ b/test/sql/storage/attach_connection_pool_options.test @@ -0,0 +1,185 @@ +# name: test/sql/storage/attach_connection_pool_options.test +# description: Test connection pool options +# group: [storage] + +require postgres_scanner + +require-env POSTGRES_TEST_DATABASE_AVAILABLE + +statement ok +ATTACH 'dbname=postgresscanner' AS s (TYPE POSTGRES); + +statement ok +DETACH s + +statement ok +SET pg_pool_max_connections = 42 + +statement ok +ATTACH 'dbname=postgresscanner' AS s (TYPE POSTGRES); + +query II +SELECT catalog_name, max_connections +FROM postgres_configure_pool(catalog_name='s') +---- +s 42 + +statement ok +RESET pg_pool_max_connections + +statement ok +DETACH s + +statement ok +SET pg_pool_wait_timeout_millis = 42000 + +statement ok +ATTACH 'dbname=postgresscanner' AS s (TYPE POSTGRES); + +query II +SELECT catalog_name, wait_timeout_millis +FROM postgres_configure_pool(catalog_name='s') +---- +s 42000 + +statement ok +RESET pg_pool_wait_timeout_millis + +statement ok +DETACH s + +statement ok +SET pg_pool_enable_thread_local_cache = FALSE + +statement ok +ATTACH 'dbname=postgresscanner' AS s (TYPE POSTGRES); + +query II +SELECT catalog_name, thread_local_cache_enabled +FROM postgres_configure_pool(catalog_name='s') +---- +s FALSE + +statement ok +RESET pg_pool_enable_thread_local_cache + +statement ok +DETACH s + +statement ok +SET pg_pool_max_lifetime_millis = 42000 + +statement ok +ATTACH 'dbname=postgresscanner' AS s (TYPE POSTGRES); + +query II +SELECT catalog_name, max_lifetime_millis +FROM postgres_configure_pool(catalog_name='s') +---- +s 42000 + +statement ok +RESET pg_pool_max_lifetime_millis + +statement ok +DETACH s + +statement ok +SET pg_pool_idle_timeout_millis = 42000 + +statement ok +ATTACH 'dbname=postgresscanner' AS s (TYPE POSTGRES); + +query II +SELECT catalog_name, idle_timeout_millis +FROM postgres_configure_pool(catalog_name='s') +---- +s 42000 + +statement ok +RESET pg_pool_idle_timeout_millis + +statement ok +DETACH s + +statement ok +SET pg_pool_enable_reaper_thread = TRUE + +statement ok +SET pg_pool_idle_timeout_millis = 42000 + +statement ok +ATTACH 'dbname=postgresscanner' AS s (TYPE POSTGRES); + +query II +SELECT catalog_name, reaper_thread_running +FROM postgres_configure_pool(catalog_name='s') +---- +s TRUE + +statement ok +RESET pg_pool_enable_reaper_thread + +statement ok +RESET pg_pool_idle_timeout_millis + +statement ok +DETACH s + +# legacy options + +statement ok +SET pg_connection_limit = 42 + +statement ok +ATTACH 'dbname=postgresscanner' AS s (TYPE POSTGRES); + +query II +SELECT catalog_name, max_connections +FROM postgres_configure_pool(catalog_name='s') +---- +s 42 + +query I +SELECT current_setting('pg_pool_max_connections') +---- +42 + +statement ok +RESET pg_connection_limit + +query I +SELECT current_setting('pg_pool_max_connections') +---- +8 + +statement ok +DETACH s + +statement ok +SET pg_connection_cache = FALSE + +statement ok +ATTACH 'dbname=postgresscanner' AS s (TYPE POSTGRES); + +query II +SELECT catalog_name, max_connections +FROM postgres_configure_pool(catalog_name='s') +---- +s 0 + +query I +SELECT current_setting('pg_pool_max_connections') +---- +0 + +statement ok +RESET pg_connection_cache + +query I +SELECT current_setting('pg_pool_max_connections') +---- +8 + +statement ok +DETACH s