Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,6 @@
[submodule "extension-ci-tools"]
path = extension-ci-tools
url = https://github.com/duckdb/extension-ci-tools.git
[submodule "database-connector"]
path = database-connector
url = https://github.com/duckdb/database-connector.git
8 changes: 6 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,12 @@ set(CMAKE_C_FLAGS_RELEASE
set(CMAKE_C_FLAGS_RELWITHDEBINFO
"${CMAKE_C_FLAGS_RELWITHDEBINFO} ${POSTGRES_SCANNER_EXTRA_CFLAGS}")

include_directories(include postgres/src/include postgres/src/backend
postgres/src/interfaces/libpq ${OPENSSL_INCLUDE_DIR})
include_directories(
include
database-connector/src/include
postgres/src/include postgres/src/backend
postgres/src/interfaces/libpq
${OPENSSL_INCLUDE_DIR})

if(WIN32)
include_directories(postgres/src/include/port/win32 postgres/src/port
Expand Down
1 change: 1 addition & 0 deletions database-connector
Submodule database-connector added at 431b80
2 changes: 2 additions & 0 deletions src/include/postgres_connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ class PostgresConnection {

bool IsOpen();
void Close();
bool PingServer();
void Reset();

shared_ptr<OwnedPostgresConnection> GetConnection() {
return connection;
Expand Down
4 changes: 2 additions & 2 deletions src/include/storage/postgres_catalog.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ class PostgresCatalog : public Catalog {
string GetDBPath() override;

PostgresConnectionPool &GetConnectionPool() {
return connection_pool;
return *connection_pool;
}

void ClearCache();
Expand All @@ -103,7 +103,7 @@ class PostgresCatalog : public Catalog {
private:
PostgresVersion version;
PostgresSchemaSet schemas;
PostgresConnectionPool connection_pool;
shared_ptr<PostgresConnectionPool> connection_pool;
string default_schema;
};

Expand Down
45 changes: 13 additions & 32 deletions src/include/storage/postgres_connection_pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,57 +11,38 @@
#include "duckdb/common/common.hpp"
#include "duckdb/common/mutex.hpp"
#include "duckdb/common/optional_ptr.hpp"

#include "dbconnector/pool.hpp"

#include "postgres_connection.hpp"

namespace duckdb {
class PostgresCatalog;
class PostgresConnectionPool;

class PostgresPoolConnection {
public:
PostgresPoolConnection();
PostgresPoolConnection(optional_ptr<PostgresConnectionPool> pool, PostgresConnection connection);
~PostgresPoolConnection();
// disable copy constructors
PostgresPoolConnection(const PostgresPoolConnection &other) = delete;
PostgresPoolConnection &operator=(const PostgresPoolConnection &) = delete;
//! enable move constructors
PostgresPoolConnection(PostgresPoolConnection &&other) noexcept;
PostgresPoolConnection &operator=(PostgresPoolConnection &&) noexcept;

bool HasConnection();
PostgresConnection &GetConnection();

private:
optional_ptr<PostgresConnectionPool> pool;
PostgresConnection connection;
};
using PostgresPoolConnection = dbconnector::pool::PooledConnection<PostgresConnection>;

class PostgresConnectionPool {
class PostgresConnectionPool : public dbconnector::pool::ConnectionPool<PostgresConnection> {
public:
static constexpr const idx_t DEFAULT_MAX_CONNECTIONS = 64;

PostgresConnectionPool(PostgresCatalog &postgres_catalog, idx_t maximum_connections = DEFAULT_MAX_CONNECTIONS);
PostgresConnectionPool(PostgresCatalog &postgres_catalog, idx_t maximum_connections = DefaultPoolSize());

public:
bool TryGetConnection(PostgresPoolConnection &connection);
PostgresPoolConnection GetConnection();
//! Always returns a connection - even if the connection slots are exhausted
PostgresPoolConnection ForceGetConnection();
void ReturnConnection(PostgresConnection connection);
void SetMaximumConnections(idx_t new_max);

static void PostgresSetConnectionCache(ClientContext &context, SetScope scope, Value &parameter);
static idx_t DefaultPoolSize() noexcept;

protected:
std::unique_ptr<PostgresConnection> CreateNewConnection() override;
bool CheckConnectionHealthy(PostgresConnection &conn) override;
void ResetConnection(PostgresConnection &conn) override;

private:
PostgresCatalog &postgres_catalog;
mutex connection_lock;
idx_t active_connections;
idx_t maximum_connections;
vector<PostgresConnection> connection_cache;

private:
PostgresPoolConnection GetConnectionInternal(unique_lock<mutex> &lock);
static dbconnector::pool::ConnectionPoolConfig CreateConfig(idx_t max_connections);
};

} // namespace duckdb
35 changes: 35 additions & 0 deletions src/postgres_connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,41 @@ void PostgresConnection::Close() {
connection = nullptr;
}

bool PostgresConnection::PingServer() {
if (!IsOpen()) {
return false;
}
PGconn *conn = GetConn();
if (PQstatus(conn) != CONNECTION_OK) {
return false;
}
PGresult *res = PQexec(conn, "SELECT 1");
PostgresResult res_holder(res);
return PQresultStatus(res) == PGRES_TUPLES_OK;
}

void PostgresConnection::Reset() {
if (!IsOpen()) {
throw InternalException("Cannot reset a connection that is not open");
}
PGconn *conn = GetConn();
{
PGresult *res = PQexec(conn, "ROLLBACK");
PostgresResult res_holder(res);
}
{
PGresult *res = PQexec(conn, "DISCARD ALL");
PostgresResult res_holder(res);
if (PQresultStatus(res) == PGRES_COMMAND_OK) {
return;
}
}
PQreset(conn);
if (!PingServer()) {
throw InternalException("Connection reset failure");
}
}

vector<IndexInfo> PostgresConnection::GetIndexInfo(const string &table_name) {
return vector<IndexInfo>();
}
Expand Down
17 changes: 14 additions & 3 deletions src/postgres_extension.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,23 @@ static void SetPostgresConnectionLimit(ClientContext &context, SetScope scope, V
if (catalog.GetCatalogType() != "postgres") {
continue;
}
catalog.Cast<PostgresCatalog>().GetConnectionPool().SetMaximumConnections(UBigIntValue::Get(parameter));
catalog.Cast<PostgresCatalog>().GetConnectionPool().SetMaxConnections(UBigIntValue::Get(parameter));
}
auto &config = DBConfig::GetConfig(context);
config.SetOption("pg_connection_limit", parameter);
}

static void DisablePool(ClientContext &context, SetScope scope, Value &parameter) {
if (scope == SetScope::LOCAL) {
throw InvalidInputException("pg_connection_cache can only be set globally");
}
if (parameter.IsNull() || BooleanValue::Get(parameter)) {
return;
}
Value zero = Value::UBIGINT(0);
SetPostgresConnectionLimit(context, scope, zero);
}

static void SetPostgresDebugQueryPrint(ClientContext &context, SetScope scope, Value &parameter) {
PostgresConnection::DebugSetPrintQueries(BooleanValue::Get(parameter));
}
Expand Down Expand Up @@ -173,13 +184,13 @@ static void LoadInternal(ExtensionLoader &loader) {
config.AddExtensionOption("pg_pages_per_task", "The amount of pages per task", LogicalType::UBIGINT,
Value::UBIGINT(PostgresBindData::DEFAULT_PAGES_PER_TASK));
config.AddExtensionOption("pg_connection_limit", "The maximum amount of concurrent Postgres connections",
LogicalType::UBIGINT, Value::UBIGINT(PostgresConnectionPool::DEFAULT_MAX_CONNECTIONS),
LogicalType::UBIGINT, Value::UBIGINT(PostgresConnectionPool::DefaultPoolSize()),
SetPostgresConnectionLimit);
config.AddExtensionOption(
"pg_array_as_varchar", "Read Postgres arrays as varchar - enables reading mixed dimensional arrays",
LogicalType::BOOLEAN, Value::BOOLEAN(false), PostgresClearCacheFunction::ClearCacheOnSetting);
config.AddExtensionOption("pg_connection_cache", "Whether or not to use the connection cache", LogicalType::BOOLEAN,
Value::BOOLEAN(true), PostgresConnectionPool::PostgresSetConnectionCache);
Value::BOOLEAN(true), DisablePool);
config.AddExtensionOption("pg_experimental_filter_pushdown", "Whether or not to use filter pushdown",
LogicalType::BOOLEAN, Value::BOOLEAN(true));
config.AddExtensionOption("pg_null_byte_replacement",
Expand Down
6 changes: 3 additions & 3 deletions src/storage/postgres_catalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,17 @@ PostgresCatalog::PostgresCatalog(AttachedDatabase &db_p, string connection_strin
ClientContext &context)
: Catalog(db_p), connection_string(std::move(connection_string_p)), attach_path(std::move(attach_path_p)),
access_mode(access_mode), isolation_level(isolation_level), schemas(*this, schema_to_load),
connection_pool(*this), default_schema(schema_to_load) {
connection_pool(make_shared_ptr<PostgresConnectionPool>(*this)), default_schema(schema_to_load) {
if (default_schema.empty()) {
default_schema = "public";
}
Value connection_limit;
auto &db_instance = db_p.GetDatabase();
if (db_instance.TryGetCurrentSetting("pg_connection_limit", connection_limit)) {
connection_pool.SetMaximumConnections(UBigIntValue::Get(connection_limit));
connection_pool->SetMaxConnections(UBigIntValue::Get(connection_limit));
}

auto connection = connection_pool.GetConnection();
auto connection = connection_pool->GetConnection();
this->version = connection.GetConnection().GetPostgresVersion(context);
}

Expand Down
Loading
Loading