Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/include/storage/postgres_catalog.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

#pragma once

#include <memory>

#include "duckdb/catalog/catalog.hpp"
#include "duckdb/common/enums/access_mode.hpp"
#include "postgres_connection.hpp"
Expand Down Expand Up @@ -81,6 +83,10 @@ class PostgresCatalog : public Catalog {
return *connection_pool;
}

shared_ptr<PostgresConnectionPool> GetConnectionPoolPtr() {
return connection_pool;
}

void ClearCache();

//! Whether or not this catalog should search a specific type with the standard priority
Expand Down
7 changes: 6 additions & 1 deletion src/include/storage/postgres_connection_pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ using PostgresPoolConnection = dbconnector::pool::PooledConnection<PostgresConne

class PostgresConnectionPool : public dbconnector::pool::ConnectionPool<PostgresConnection> {
public:
PostgresConnectionPool(PostgresCatalog &postgres_catalog);
PostgresConnectionPool(PostgresCatalog &postgres_catalog, ClientContext &context);

public:
bool TryGetConnection(PostgresPoolConnection &connection);
Expand All @@ -52,4 +52,9 @@ class PostgresConnectionPool : public dbconnector::pool::ConnectionPool<Postgres
std::string health_check_query;
};

class PostgresConfigurePoolFunction : public TableFunction {
public:
PostgresConfigurePoolFunction();
};

} // namespace duckdb
87 changes: 81 additions & 6 deletions src/postgres_extension.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include "duckdb/main/database_manager.hpp"
#include "duckdb/main/attached_database.hpp"
#include "storage/postgres_catalog.hpp"
#include "storage/postgres_connection_pool.hpp"
#include "storage/postgres_optimizer.hpp"
#include "duckdb/planner/extension_callback.hpp"
#include "duckdb/main/client_context.hpp"
Expand Down Expand Up @@ -68,13 +69,22 @@ static void SetPostgresConnectionLimit(ClientContext &context, SetScope scope, V
}
auto &config = DBConfig::GetConfig(context);
config.SetOption("pg_connection_limit", parameter);

// propagate the value also to 'pg_pool_max_connections'
optional_ptr<const ConfigurationOption> option;
auto setting_index = config.TryGetSettingIndex("pg_pool_max_connections", option);
if (setting_index.IsValid()) {
context.config.user_settings.SetUserSetting(setting_index.GetIndex(), parameter);
}
}

static void DisablePool(ClientContext &context, SetScope scope, Value &parameter) {
if (scope == SetScope::LOCAL) {
throw InvalidInputException("pg_connection_cache can only be set globally");
}
if (parameter.IsNull() || BooleanValue::Get(parameter)) {
Value def_size = Value::UBIGINT(PostgresConnectionPool::DefaultPoolSize());
SetPostgresConnectionLimit(context, scope, def_size);
return;
}
Value zero = Value::UBIGINT(0);
Expand Down Expand Up @@ -137,6 +147,12 @@ void SetPostgresNullByteReplacement(ClientContext &context, SetScope scope, Valu
}
}

static std::string CreatePoolNote(const std::string &option) {
return std::string() + "This option only applies to newly attached Postgres databases, " +
"to configure a database that is already attached use " +
"\"FROM postgres_configure_pool(catalog_name='my_attached_postgres_db', " + option + ")\"";
}

static void LoadInternal(ExtensionLoader &loader) {
PostgresScanFunction postgres_fun;
loader.RegisterFunction(postgres_fun);
Expand All @@ -162,6 +178,9 @@ static void LoadInternal(ExtensionLoader &loader) {
PostgresReadBinaryFunction read_binary_func;
loader.RegisterFunction(read_binary_func);

PostgresConfigurePoolFunction configure_pool_function;
loader.RegisterFunction(configure_pool_function);

// Register the new type
SecretType secret_type;
secret_type.name = "postgres";
Expand All @@ -183,14 +202,25 @@ static void LoadInternal(ExtensionLoader &loader) {
LogicalType::BOOLEAN, Value::BOOLEAN(true));
config.AddExtensionOption("pg_pages_per_task", "The amount of pages per task", LogicalType::UBIGINT,
Value::UBIGINT(PostgresBindData::DEFAULT_PAGES_PER_TASK));
config.AddExtensionOption("pg_connection_limit", "The maximum amount of concurrent Postgres connections",
LogicalType::UBIGINT, Value::UBIGINT(PostgresConnectionPool::DefaultPoolSize()),
SetPostgresConnectionLimit);
config.AddExtensionOption(
"pg_connection_limit",
"The maximum amount of concurrent Postgres connections."
" This option is deprecated, instead use \"SET pg_pool_max_connections = 42\" for newly attached Postgres "
"databases and \"FROM postgres_configure_pool(catalog_name='my_attached_postgres_db', max_connections=42)\" "
"for "
"already attached Postgres databases.",
LogicalType::UBIGINT, Value::UBIGINT(PostgresConnectionPool::DefaultPoolSize()), SetPostgresConnectionLimit);
config.AddExtensionOption(
"pg_array_as_varchar", "Read Postgres arrays as varchar - enables reading mixed dimensional arrays",
LogicalType::BOOLEAN, Value::BOOLEAN(false), PostgresClearCacheFunction::ClearCacheOnSetting);
config.AddExtensionOption("pg_connection_cache", "Whether or not to use the connection cache", LogicalType::BOOLEAN,
Value::BOOLEAN(true), DisablePool);
config.AddExtensionOption(
"pg_connection_cache",
"Whether or not to use the connection pooling."
" This option is deprecated, instead to disable the connection pooling use \"SET pg_pool_max_connections=0\" "
"for newly attached Postgres databases and \"FROM "
"postgres_configure_pool(catalog_name='my_attached_postgres_db', "
"max_connections=0)\" for already attached Postgres databases.",
LogicalType::BOOLEAN, Value::BOOLEAN(true), DisablePool);
config.AddExtensionOption("pg_experimental_filter_pushdown", "Whether or not to use filter pushdown",
LogicalType::BOOLEAN, Value::BOOLEAN(true));
config.AddExtensionOption("pg_null_byte_replacement",
Expand All @@ -209,7 +239,52 @@ static void LoadInternal(ExtensionLoader &loader) {
"Postgres idle in transaction timeout in milliseconds to set on scan connections",
LogicalType::UINTEGER, Value());
// connection pool options
config.AddExtensionOption("pg_pool_health_check_query", "The query to use to check that the connection is healthy",
config.AddExtensionOption("pg_pool_max_connections",
"Maximum number of connections that are allowed to be cached in a connection pool for "
"each attached Postgres database. "
"This number can be temporary exceeded when parallel scans are used. " +
CreatePoolNote("max_connections=42"),
LogicalType::UBIGINT, Value::UBIGINT(PostgresConnectionPool::DefaultPoolSize()));
config.AddExtensionOption("pg_pool_wait_timeout_millis",
"Maximum number of milliseconds to wait when acquiring a connection from a pool where "
"all available connections are already taken. " +
CreatePoolNote("wait_timeout_millis=60000"),
LogicalType::UBIGINT,
Value::UBIGINT(dbconnector::pool::ConnectionPoolConfig().wait_timeout_millis));
config.AddExtensionOption(
"pg_pool_enable_thread_local_cache",
"Whether to enable the connection caching in thread-local cache. Such connections are getting pinned to the "
"threads and are not made available to other threads, while still taking the place in the pool. " +
CreatePoolNote("enable_thread_local_cache=FALSE"),
LogicalType::BOOLEAN, Value::BOOLEAN(dbconnector::pool::ConnectionPoolConfig().tl_cache_enabled));
config.AddExtensionOption("pg_pool_max_lifetime_millis",
"Maximum number of milliseconds the connection can be kept open. This number is checked "
"when the connection is taken from the pool and returned to the pool. "
"When the connection pool reaper thread is enabled ('pg_pool_enable_reaper_thread' "
"option), then this number is checked in background periodically. " +
CreatePoolNote("max_lifetime_millis=600000"),
LogicalType::UBIGINT,
Value::UBIGINT(dbconnector::pool::ConnectionPoolConfig().max_lifetime_millis));
config.AddExtensionOption("pg_pool_idle_timeout_millis",
"Maximum number of milliseconds the connection can be kept idle in the pool. This number "
"is checked when the connection is taken from the pool. "
"When the connection pool reaper thread is enabled ('pg_pool_enable_reaper_thread' "
"option), then this number is checked in background periodically. " +
CreatePoolNote("idle_timeout_millis=300000"),
LogicalType::UBIGINT,
Value::UBIGINT(dbconnector::pool::ConnectionPoolConfig().idle_timeout_millis));
config.AddExtensionOption(
"pg_pool_enable_reaper_thread",
"Whether to enable the connection pool reaper thread, that periodically scans the pool to check the "
"'max_lifetime_millis' and 'idle_timeout_millis' and closes the connection which exceed the specified values. "
"Either 'max_lifetime_millis' or 'idle_timeout_millis' must be set to a non-zero value for this option to be "
"effective. " +
CreatePoolNote("enable_reaper_thread=TRUE"),
LogicalType::BOOLEAN, Value::BOOLEAN(dbconnector::pool::ConnectionPoolConfig().start_reaper_thread));
config.AddExtensionOption("pg_pool_health_check_query",
"The query that is used to check that the connection is healthy. Setting this option to "
"an empty string disables the health check. " +
CreatePoolNote("health_check_query=SELECT 42"),
LogicalType::VARCHAR, PostgresConnectionPool::DefaultHealthCheckQuery());

OptimizerExtension postgres_optimizer;
Expand Down
1 change: 1 addition & 0 deletions src/storage/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ add_library(
postgres_catalog_set.cpp
postgres_connection_pool.cpp
postgres_clear_cache.cpp
postgres_configure_pool.cpp
postgres_delete.cpp
postgres_index.cpp
postgres_index_entry.cpp
Expand Down
2 changes: 1 addition & 1 deletion src/storage/postgres_catalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ PostgresCatalog::PostgresCatalog(AttachedDatabase &db_p, string connection_strin
ClientContext &context)
: Catalog(db_p), connection_string(std::move(connection_string_p)), attach_path(std::move(attach_path_p)),
access_mode(access_mode), isolation_level(isolation_level), schemas(*this, schema_to_load),
connection_pool(make_shared_ptr<PostgresConnectionPool>(*this)), default_schema(schema_to_load) {
connection_pool(make_shared_ptr<PostgresConnectionPool>(*this, context)), default_schema(schema_to_load) {
if (default_schema.empty()) {
default_schema = "public";
}
Expand Down
Loading
Loading