Skip to content

Commit 2d340fd

Browse files
committed
Use connection pool from common repo
This PR changes the connection pool implementation to use the one from the [database-connector](https://github.com/duckdb/database-connector) repository. The user-facing functionality stays the same with the following changes: - default pool size is changed from `64` to `max(num_cpus, 8)` - `pg_connection_cache` setting now can be used to disable the pool, but not to re-enable it; `pg_connection_limit` setting should be set to positive number to enable back the pool that was disabled. - when the pool is enabled and all connection slots are already used, the request to get the connection from the pool will try to acquire the connection from the pool for 30 seconds before throwing an error. More pool settings are planned to be exposed in subsequent PRs.
1 parent b3e5d65 commit 2d340fd

File tree

10 files changed

+113
-164
lines changed

10 files changed

+113
-164
lines changed

.gitmodules

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,6 @@
44
[submodule "extension-ci-tools"]
55
path = extension-ci-tools
66
url = https://github.com/duckdb/extension-ci-tools.git
7+
[submodule "database-connector"]
8+
path = database-connector
9+
url = https://github.com/duckdb/database-connector.git

CMakeLists.txt

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,12 @@ set(CMAKE_C_FLAGS_RELEASE
2828
set(CMAKE_C_FLAGS_RELWITHDEBINFO
2929
"${CMAKE_C_FLAGS_RELWITHDEBINFO} ${POSTGRES_SCANNER_EXTRA_CFLAGS}")
3030

31-
include_directories(include postgres/src/include postgres/src/backend
32-
postgres/src/interfaces/libpq ${OPENSSL_INCLUDE_DIR})
31+
include_directories(
32+
include
33+
database-connector/src/include
34+
postgres/src/include postgres/src/backend
35+
postgres/src/interfaces/libpq
36+
${OPENSSL_INCLUDE_DIR})
3337

3438
if(WIN32)
3539
include_directories(postgres/src/include/port/win32 postgres/src/port

database-connector

Submodule database-connector added at 431b80b

src/include/postgres_connection.hpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,8 @@ class PostgresConnection {
7373

7474
bool IsOpen();
7575
void Close();
76+
bool PingServer();
77+
void Reset();
7678

7779
shared_ptr<OwnedPostgresConnection> GetConnection() {
7880
return connection;

src/include/storage/postgres_catalog.hpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ class PostgresCatalog : public Catalog {
7878
string GetDBPath() override;
7979

8080
PostgresConnectionPool &GetConnectionPool() {
81-
return connection_pool;
81+
return *connection_pool;
8282
}
8383

8484
void ClearCache();
@@ -103,7 +103,7 @@ class PostgresCatalog : public Catalog {
103103
private:
104104
PostgresVersion version;
105105
PostgresSchemaSet schemas;
106-
PostgresConnectionPool connection_pool;
106+
shared_ptr<PostgresConnectionPool> connection_pool;
107107
string default_schema;
108108
};
109109

src/include/storage/postgres_connection_pool.hpp

Lines changed: 13 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -11,57 +11,38 @@
1111
#include "duckdb/common/common.hpp"
1212
#include "duckdb/common/mutex.hpp"
1313
#include "duckdb/common/optional_ptr.hpp"
14+
15+
#include "dbconnector/pool.hpp"
16+
1417
#include "postgres_connection.hpp"
1518

1619
namespace duckdb {
1720
class PostgresCatalog;
1821
class PostgresConnectionPool;
1922

20-
class PostgresPoolConnection {
21-
public:
22-
PostgresPoolConnection();
23-
PostgresPoolConnection(optional_ptr<PostgresConnectionPool> pool, PostgresConnection connection);
24-
~PostgresPoolConnection();
25-
// disable copy constructors
26-
PostgresPoolConnection(const PostgresPoolConnection &other) = delete;
27-
PostgresPoolConnection &operator=(const PostgresPoolConnection &) = delete;
28-
//! enable move constructors
29-
PostgresPoolConnection(PostgresPoolConnection &&other) noexcept;
30-
PostgresPoolConnection &operator=(PostgresPoolConnection &&) noexcept;
31-
32-
bool HasConnection();
33-
PostgresConnection &GetConnection();
34-
35-
private:
36-
optional_ptr<PostgresConnectionPool> pool;
37-
PostgresConnection connection;
38-
};
23+
using PostgresPoolConnection = dbconnector::pool::PooledConnection<PostgresConnection>;
3924

40-
class PostgresConnectionPool {
25+
class PostgresConnectionPool : public dbconnector::pool::ConnectionPool<PostgresConnection> {
4126
public:
42-
static constexpr const idx_t DEFAULT_MAX_CONNECTIONS = 64;
43-
44-
PostgresConnectionPool(PostgresCatalog &postgres_catalog, idx_t maximum_connections = DEFAULT_MAX_CONNECTIONS);
27+
PostgresConnectionPool(PostgresCatalog &postgres_catalog, idx_t maximum_connections = DefaultPoolSize());
4528

4629
public:
4730
bool TryGetConnection(PostgresPoolConnection &connection);
4831
PostgresPoolConnection GetConnection();
4932
//! Always returns a connection - even if the connection slots are exhausted
5033
PostgresPoolConnection ForceGetConnection();
51-
void ReturnConnection(PostgresConnection connection);
52-
void SetMaximumConnections(idx_t new_max);
5334

54-
static void PostgresSetConnectionCache(ClientContext &context, SetScope scope, Value &parameter);
35+
static idx_t DefaultPoolSize() noexcept;
36+
37+
protected:
38+
std::unique_ptr<PostgresConnection> CreateNewConnection() override;
39+
bool CheckConnectionHealthy(PostgresConnection &conn) override;
40+
void ResetConnection(PostgresConnection &conn) override;
5541

5642
private:
5743
PostgresCatalog &postgres_catalog;
58-
mutex connection_lock;
59-
idx_t active_connections;
60-
idx_t maximum_connections;
61-
vector<PostgresConnection> connection_cache;
6244

63-
private:
64-
PostgresPoolConnection GetConnectionInternal(unique_lock<mutex> &lock);
45+
static dbconnector::pool::ConnectionPoolConfig CreateConfig(idx_t max_connections);
6546
};
6647

6748
} // namespace duckdb

src/postgres_connection.cpp

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,41 @@ void PostgresConnection::Close() {
186186
connection = nullptr;
187187
}
188188

189+
bool PostgresConnection::PingServer() {
190+
if (!IsOpen()) {
191+
return false;
192+
}
193+
PGconn *conn = GetConn();
194+
if (PQstatus(conn) != CONNECTION_OK) {
195+
return false;
196+
}
197+
PGresult *res = PQexec(conn, "SELECT 1");
198+
PostgresResult res_holder(res);
199+
return PQresultStatus(res) == PGRES_TUPLES_OK;
200+
}
201+
202+
void PostgresConnection::Reset() {
203+
if (!IsOpen()) {
204+
throw InternalException("Cannot reset a connection that is not open");
205+
}
206+
PGconn *conn = GetConn();
207+
{
208+
PGresult *res = PQexec(conn, "ROLLBACK");
209+
PostgresResult res_holder(res);
210+
}
211+
{
212+
PGresult *res = PQexec(conn, "DISCARD ALL");
213+
PostgresResult res_holder(res);
214+
if (PQresultStatus(res) == PGRES_COMMAND_OK) {
215+
return;
216+
}
217+
}
218+
PQreset(conn);
219+
if (!PingServer()) {
220+
throw InternalException("Connection reset failure");
221+
}
222+
}
223+
189224
vector<IndexInfo> PostgresConnection::GetIndexInfo(const string &table_name) {
190225
return vector<IndexInfo>();
191226
}

src/postgres_extension.cpp

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -64,12 +64,23 @@ static void SetPostgresConnectionLimit(ClientContext &context, SetScope scope, V
6464
if (catalog.GetCatalogType() != "postgres") {
6565
continue;
6666
}
67-
catalog.Cast<PostgresCatalog>().GetConnectionPool().SetMaximumConnections(UBigIntValue::Get(parameter));
67+
catalog.Cast<PostgresCatalog>().GetConnectionPool().SetMaxConnections(UBigIntValue::Get(parameter));
6868
}
6969
auto &config = DBConfig::GetConfig(context);
7070
config.SetOption("pg_connection_limit", parameter);
7171
}
7272

73+
static void DisablePool(ClientContext &context, SetScope scope, Value &parameter) {
74+
if (scope == SetScope::LOCAL) {
75+
throw InvalidInputException("pg_connection_cache can only be set globally");
76+
}
77+
if (parameter.IsNull() || BooleanValue::Get(parameter)) {
78+
return;
79+
}
80+
Value zero = Value::UBIGINT(0);
81+
SetPostgresConnectionLimit(context, scope, zero);
82+
}
83+
7384
static void SetPostgresDebugQueryPrint(ClientContext &context, SetScope scope, Value &parameter) {
7485
PostgresConnection::DebugSetPrintQueries(BooleanValue::Get(parameter));
7586
}
@@ -173,13 +184,13 @@ static void LoadInternal(ExtensionLoader &loader) {
173184
config.AddExtensionOption("pg_pages_per_task", "The amount of pages per task", LogicalType::UBIGINT,
174185
Value::UBIGINT(PostgresBindData::DEFAULT_PAGES_PER_TASK));
175186
config.AddExtensionOption("pg_connection_limit", "The maximum amount of concurrent Postgres connections",
176-
LogicalType::UBIGINT, Value::UBIGINT(PostgresConnectionPool::DEFAULT_MAX_CONNECTIONS),
187+
LogicalType::UBIGINT, Value::UBIGINT(PostgresConnectionPool::DefaultPoolSize()),
177188
SetPostgresConnectionLimit);
178189
config.AddExtensionOption(
179190
"pg_array_as_varchar", "Read Postgres arrays as varchar - enables reading mixed dimensional arrays",
180191
LogicalType::BOOLEAN, Value::BOOLEAN(false), PostgresClearCacheFunction::ClearCacheOnSetting);
181192
config.AddExtensionOption("pg_connection_cache", "Whether or not to use the connection cache", LogicalType::BOOLEAN,
182-
Value::BOOLEAN(true), PostgresConnectionPool::PostgresSetConnectionCache);
193+
Value::BOOLEAN(true), DisablePool);
183194
config.AddExtensionOption("pg_experimental_filter_pushdown", "Whether or not to use filter pushdown",
184195
LogicalType::BOOLEAN, Value::BOOLEAN(true));
185196
config.AddExtensionOption("pg_null_byte_replacement",

src/storage/postgres_catalog.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,17 +15,17 @@ PostgresCatalog::PostgresCatalog(AttachedDatabase &db_p, string connection_strin
1515
ClientContext &context)
1616
: Catalog(db_p), connection_string(std::move(connection_string_p)), attach_path(std::move(attach_path_p)),
1717
access_mode(access_mode), isolation_level(isolation_level), schemas(*this, schema_to_load),
18-
connection_pool(*this), default_schema(schema_to_load) {
18+
connection_pool(make_shared_ptr<PostgresConnectionPool>(*this)), default_schema(schema_to_load) {
1919
if (default_schema.empty()) {
2020
default_schema = "public";
2121
}
2222
Value connection_limit;
2323
auto &db_instance = db_p.GetDatabase();
2424
if (db_instance.TryGetCurrentSetting("pg_connection_limit", connection_limit)) {
25-
connection_pool.SetMaximumConnections(UBigIntValue::Get(connection_limit));
25+
connection_pool->SetMaxConnections(UBigIntValue::Get(connection_limit));
2626
}
2727

28-
auto connection = connection_pool.GetConnection();
28+
auto connection = connection_pool->GetConnection();
2929
this->version = connection.GetConnection().GetPostgresVersion(context);
3030
}
3131

0 commit comments

Comments
 (0)