Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions hazelcast/include/hazelcast/client/client_properties.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ class HAZELCAST_API client_properties

const client_property& get_internal_executor_pool_size() const;

const client_property& get_io_thread_count() const;

const client_property& get_shuffle_member_list() const;

const client_property& get_max_concurrent_invocations() const;
Expand Down Expand Up @@ -181,6 +183,17 @@ class HAZELCAST_API client_properties
static const std::string INTERNAL_EXECUTOR_POOL_SIZE;
static const std::string INTERNAL_EXECUTOR_POOL_SIZE_DEFAULT;

/**
* Number of IO threads for the networking layer.
* Each IO thread runs its own Boost.Asio io_context event loop.
* Connections are distributed across IO threads in round-robin fashion.
* This is the C++ equivalent of Java's IO_INPUT_THREAD_COUNT +
* IO_OUTPUT_THREAD_COUNT (unified because Asio's proactor pattern handles
* both read and write directions per thread without blocking).
*/
static const std::string IO_THREAD_COUNT;
static const std::string IO_THREAD_COUNT_DEFAULT;

/**
Client shuffles the given member list to prevent all clients from
connecting to the same node when this property is set to true. When it is set to
Expand Down Expand Up @@ -315,6 +328,7 @@ class HAZELCAST_API client_properties
client_property invocation_timeout_seconds_;
client_property event_thread_count_;
client_property internal_executor_pool_size_;
client_property io_thread_count_;
client_property shuffle_member_list_;
client_property max_concurrent_invocations_;
client_property backpressure_backoff_timeout_millis_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,8 @@ class HAZELCAST_API ClientConnectionManagerImpl
static const endpoint_qualifier PUBLIC_ENDPOINT_QUALIFIER;
static constexpr int SQL_CONNECTION_RANDOM_ATTEMPTS = 10;
static constexpr byte ALL_MEMBERS_ROUTING = 1;
static constexpr int DEFAULT_IO_THREAD_COUNT = 3;
static constexpr int SMALL_MACHINE_PROCESSOR_COUNT = 8;

struct auth_response
{
Expand Down Expand Up @@ -277,11 +279,20 @@ class HAZELCAST_API ClientConnectionManagerImpl
const std::shared_ptr<Connection>& connection,
auth_response& response);

int find_thread_count(int configured_thread_count) const;

std::atomic_bool alive_;
logger& logger_;
std::chrono::milliseconds connection_timeout_millis_;
spi::ClientContext& client_;
std::unique_ptr<boost::asio::io_context> io_context_;
std::vector<std::unique_ptr<boost::asio::io_context>> io_contexts_;
std::vector<std::unique_ptr<boost::asio::ip::tcp::resolver>> io_resolvers_;
std::unique_ptr<internal::socket::SocketFactory> socket_factory_;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This has moved (wasn't updated). Any good reason?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no reason except that socket factory is using io_context and resolver in its methods, they are related but move is a random choice, I can revert it, no problem.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not a massive problem, just wanted to check my understanding.

std::vector<std::thread> io_threads_;
std::vector<std::unique_ptr<
boost::asio::executor_work_guard<boost::asio::io_context::executor_type>>>
io_guards_;
std::atomic<size_t> next_io_index_{ 0 };
socket_interceptor socket_interceptor_;
util::SynchronizedMap<member, bool> connecting_members_;
// TODO: change with CopyOnWriteArraySet<ConnectionListener> as in Java
Expand All @@ -291,13 +302,7 @@ class HAZELCAST_API ClientConnectionManagerImpl
bool shuffle_member_list_;
std::unique_ptr<AddressProvider> address_provider_;
std::atomic<int32_t> connection_id_gen_;
std::unique_ptr<boost::asio::ip::tcp::resolver> io_resolver_;
std::unique_ptr<internal::socket::SocketFactory> socket_factory_;
HeartbeatManager heartbeat_;
std::thread io_thread_;
std::unique_ptr<
boost::asio::executor_work_guard<boost::asio::io_context::executor_type>>
io_guard_;
const bool async_start_;
const config::client_connection_strategy_config::reconnect_mode
reconnect_mode_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,9 @@ class HAZELCAST_API Connection
int32_t connection_id,
internal::socket::SocketFactory& socket_factory,
ClientConnectionManagerImpl& client_connection_manager,
std::chrono::milliseconds& connect_timeout_in_millis);
std::chrono::milliseconds& connect_timeout_in_millis,
boost::asio::io_context& io,
boost::asio::ip::tcp::resolver& resolver);

~Connection() override;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,20 +40,18 @@ namespace socket {
class HAZELCAST_API SocketFactory
{
public:
SocketFactory(spi::ClientContext& client_context,
boost::asio::io_context& io,
boost::asio::ip::tcp::resolver& resolver);
SocketFactory(spi::ClientContext& client_context);

bool start();

std::unique_ptr<hazelcast::client::socket> create(
const address& address,
std::chrono::milliseconds& connect_timeout_in_millis);
std::chrono::milliseconds& connect_timeout_in_millis,
boost::asio::io_context& io,
boost::asio::ip::tcp::resolver& resolver);

private:
spi::ClientContext& client_context_;
boost::asio::io_context& io_;
boost::asio::ip::tcp::resolver& io_resolver_;
#ifdef HZ_BUILD_WITH_SSL
std::shared_ptr<boost::asio::ssl::context> ssl_context_;
#endif
Expand Down
11 changes: 11 additions & 0 deletions hazelcast/src/hazelcast/client/client_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -989,6 +989,10 @@ const std::string client_properties::INTERNAL_EXECUTOR_POOL_SIZE =
"hazelcast.client.internal.executor.pool.size";
const std::string client_properties::INTERNAL_EXECUTOR_POOL_SIZE_DEFAULT = "3";

const std::string client_properties::IO_THREAD_COUNT =
"hazelcast.client.io.thread.count";
const std::string client_properties::IO_THREAD_COUNT_DEFAULT = "3";
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice if there was documentation to explain why 3 was chosen. Even if it just links to the config in the Java client or something.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There was a study conducted a while ago reported at here which gives details about the choice of 3.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we link to that in the code to help a future maintainer?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is internal.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, doesn't cpp client verify the public configuration? What do we expect when count is meaningless?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is internal.

I understand that it won't help anyone outside of Hazelcast.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@emreyigit good catch. Added the check 14f9ca3


const std::string client_properties::SHUFFLE_MEMBER_LIST =
"hazelcast.client.shuffle.member.list";
const std::string client_properties::SHUFFLE_MEMBER_LIST_DEFAULT = "true";
Expand Down Expand Up @@ -1060,6 +1064,7 @@ client_properties::client_properties(
, event_thread_count_(EVENT_THREAD_COUNT, EVENT_THREAD_COUNT_DEFAULT)
, internal_executor_pool_size_(INTERNAL_EXECUTOR_POOL_SIZE,
INTERNAL_EXECUTOR_POOL_SIZE_DEFAULT)
, io_thread_count_(IO_THREAD_COUNT, IO_THREAD_COUNT_DEFAULT)
, shuffle_member_list_(SHUFFLE_MEMBER_LIST, SHUFFLE_MEMBER_LIST_DEFAULT)
, max_concurrent_invocations_(MAX_CONCURRENT_INVOCATIONS,
MAX_CONCURRENT_INVOCATIONS_DEFAULT)
Expand Down Expand Up @@ -1122,6 +1127,12 @@ client_properties::get_internal_executor_pool_size() const
return internal_executor_pool_size_;
}

/**
 * Returns the client property that configures the number of IO threads
 * used by the networking layer (see client_properties::IO_THREAD_COUNT).
 *
 * @return the io-thread-count property (name + default value); the
 *         effective integer value is resolved elsewhere via
 *         client_properties::get_integer.
 */
const client_property&
client_properties::get_io_thread_count() const
{
return io_thread_count_;
}

const client_property&
client_properties::get_shuffle_member_list() const
{
Expand Down
107 changes: 80 additions & 27 deletions hazelcast/src/hazelcast/client/network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,24 +114,33 @@ ClientConnectionManagerImpl::start()
return false;
}

io_context_.reset(new boost::asio::io_context);
io_resolver_.reset(
new boost::asio::ip::tcp::resolver(io_context_->get_executor()));
socket_factory_.reset(new internal::socket::SocketFactory(
client_, *io_context_, *io_resolver_));
auto guard = boost::asio::make_work_guard(*io_context_);
io_guard_ = std::unique_ptr<
boost::asio::executor_work_guard<boost::asio::io_context::executor_type>>(
new boost::asio::executor_work_guard<
boost::asio::io_context::executor_type>(std::move(guard)));
auto& props = client_.get_client_properties();
int configured_io_thread_count =
props.get_integer(props.get_io_thread_count());
int io_thread_count = find_thread_count(configured_io_thread_count);

socket_factory_.reset(new internal::socket::SocketFactory(client_));

if (!socket_factory_->start()) {
return false;
}

socket_interceptor_ = client_.get_client_config().get_socket_interceptor();

io_thread_ = std::thread([=]() { io_context_->run(); });
for (int i = 0; i < io_thread_count; ++i) {
auto ctx =
std::unique_ptr<boost::asio::io_context>(new boost::asio::io_context);
io_guards_.push_back(std::unique_ptr<boost::asio::executor_work_guard<
boost::asio::io_context::executor_type>>(
new boost::asio::executor_work_guard<
boost::asio::io_context::executor_type>(
boost::asio::make_work_guard(*ctx))));
io_resolvers_.push_back(std::unique_ptr<boost::asio::ip::tcp::resolver>(
new boost::asio::ip::tcp::resolver(ctx->get_executor())));
auto raw_ctx = ctx.get();
io_contexts_.push_back(std::move(ctx));
io_threads_.emplace_back([raw_ctx]() { raw_ctx->run(); });
}
Comment on lines +130 to +143
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is definitely my inexperience, but this looks... intense.
Could it be more readable / commented?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

well, that is how C++ code looks. basically we are initializing the vectors and starting the io threads. Lock guards are used for finishing the io threads running. I will put more comments.


executor_.reset(
new hazelcast::util::hz_thread_pool(EXECUTOR_CORE_POOL_SIZE));
Expand Down Expand Up @@ -192,13 +201,27 @@ ClientConnectionManagerImpl::shutdown()
spi::impl::ClientExecutionServiceImpl::shutdown_thread_pool(
executor_.get());

// release the guard so that the io thread can stop gracefully
io_guard_.reset();
io_thread_.join();
// release the guards so that the io threads can stop gracefully
for (auto& guard : io_guards_) {
guard.reset();
}
for (auto& thread : io_threads_) {
if (thread.joinable()) {
thread.join();
}
}

// Release internal bookkeeping references to connections and listeners.
// io_resolvers_ and io_contexts_ are intentionally NOT cleared here:
// user code (e.g. transaction_context) may still hold
// shared_ptr<Connection> objects whose socket/backup_timer destructors
// reference these io_contexts. Leaving the io_contexts alive until
// ~ClientConnectionManagerImpl() ensures they outlive any such lingering
// Connection references.
connection_listeners_.clear();
active_connections_.clear();
active_connection_ids_.clear();
io_guards_.clear();
}

std::shared_ptr<Connection>
Expand Down Expand Up @@ -902,6 +925,24 @@ ClientConnectionManagerImpl::on_authenticated(
return connection;
}

/**
 * Determines how many IO threads the connection manager should start.
 *
 * @param configured_thread_count value of the IO_THREAD_COUNT client
 *        property. -1 is the "not configured" sentinel; any other
 *        non-positive value is meaningless and is also treated as
 *        unconfigured instead of being returned verbatim (returning 0
 *        would leave the io_context vector empty and break the
 *        round-robin connection distribution).
 * @return the explicitly configured count when it is positive; otherwise
 *         1 for a uni-socket (non smart-routing) client, and for a
 *         smart-routing client DEFAULT_IO_THREAD_COUNT on machines with
 *         more than SMALL_MACHINE_PROCESSOR_COUNT cores, else 1.
 */
int
ClientConnectionManagerImpl::find_thread_count(
int configured_thread_count) const
{
// Only honor an explicit, sensible configuration. Zero or negative
// values (including the -1 "unset" sentinel) fall through to the
// automatic defaults below.
if (configured_thread_count > 0) {
return configured_thread_count;
}

// uni-socket client: a single member connection, one IO thread suffices
if (!smart_routing_enabled_) {
return 1;
}

// Smart-routing client: use multiple IO threads only on machines with
// enough cores to benefit from the extra parallelism.
return (util::get_available_core_count() > SMALL_MACHINE_PROCESSOR_COUNT)
? DEFAULT_IO_THREAD_COUNT
: 1;
}

void
ClientConnectionManagerImpl::fire_life_cycle_event(
lifecycle_event::lifecycle_state state)
Expand Down Expand Up @@ -1062,12 +1103,15 @@ ClientConnectionManagerImpl::connect(const address& addr)
info,
boost::str(boost::format("Trying to connect to %1%.") % addr));

auto idx = next_io_index_.fetch_add(1) % io_contexts_.size();
auto connection = std::make_shared<Connection>(addr,
client_,
++connection_id_gen_,
*socket_factory_,
*this,
connection_timeout_millis_);
connection_timeout_millis_,
*io_contexts_[idx],
*io_resolvers_[idx]);
connection->connect();

// call the interceptor from user thread
Expand Down Expand Up @@ -1204,7 +1248,9 @@ Connection::Connection(
int connection_id, // NOLINT(cppcoreguidelines-pro-type-member-init)
internal::socket::SocketFactory& socket_factory,
ClientConnectionManagerImpl& client_connection_manager,
std::chrono::milliseconds& connect_timeout_in_millis)
std::chrono::milliseconds& connect_timeout_in_millis,
boost::asio::io_context& io,
boost::asio::ip::tcp::resolver& resolver)
: read_handler(*this, 16 << 10)
, start_time_(std::chrono::system_clock::now())
, closed_time_duration_()
Expand All @@ -1217,7 +1263,8 @@ Connection::Connection(
, last_write_time_(std::chrono::steady_clock::now().time_since_epoch())
{
(void)client_connection_manager;
socket_ = socket_factory.create(address, connect_timeout_in_millis);
socket_ =
socket_factory.create(address, connect_timeout_in_millis, io, resolver);
}

Connection::~Connection() = default;
Expand Down Expand Up @@ -1641,6 +1688,14 @@ HeartbeatManager::shutdown()
{
if (timer_) {
timer_->cancel();
// Release the timer while the execution service's thread pool is still
// alive. The timer holds an executor reference into that pool; if we
// defer releasing it to the HeartbeatManager destructor the pool will
// already have been destroyed (execution_service_ is a member of
// hazelcast_client_instance_impl declared before connection_manager_,
// so it is destroyed first), causing a use-after-free in the timer
// destructor.
timer_.reset();
}
}

Expand Down Expand Up @@ -1729,12 +1784,8 @@ wait_strategy::sleep()

namespace internal {
namespace socket {
SocketFactory::SocketFactory(spi::ClientContext& client_context,
boost::asio::io_context& io,
boost::asio::ip::tcp::resolver& resolver)
SocketFactory::SocketFactory(spi::ClientContext& client_context)
: client_context_(client_context)
, io_(io)
, io_resolver_(resolver)
{
}

Expand Down Expand Up @@ -1808,30 +1859,32 @@ SocketFactory::start()

std::unique_ptr<hazelcast::client::socket>
SocketFactory::create(const address& address,
std::chrono::milliseconds& connect_timeout_in_millis)
std::chrono::milliseconds& connect_timeout_in_millis,
boost::asio::io_context& io,
boost::asio::ip::tcp::resolver& resolver)
{
#ifdef HZ_BUILD_WITH_SSL
if (ssl_context_.get()) {
return std::unique_ptr<hazelcast::client::socket>(
new internal::socket::SSLSocket(io_,
new internal::socket::SSLSocket(io,
*ssl_context_,
address,
client_context_.get_client_config()
.get_network_config()
.get_socket_options(),
connect_timeout_in_millis,
io_resolver_));
resolver));
}
#endif

return std::unique_ptr<hazelcast::client::socket>(
new internal::socket::TcpSocket(io_,
new internal::socket::TcpSocket(io,
address,
client_context_.get_client_config()
.get_network_config()
.get_socket_options(),
connect_timeout_in_millis,
io_resolver_));
resolver));
}

#ifdef HZ_BUILD_WITH_SSL
Expand Down
12 changes: 12 additions & 0 deletions hazelcast/src/hazelcast/client/spi.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1653,6 +1653,18 @@ ClientInvocation::set_exception(const std::exception& e,
"%1%, %2% Exception to be set: %3%") %
se.what() % *this % e.what()));
}
} catch (std::exception& ex) {
HZ_LOG(logger_,
warning,
boost::str(
boost::format("Failed to set the exception for invocation. "
"%1%, %2% Exception to be set: %3%") %
ex.what() % *this % e.what()));
} catch (...) {
HZ_LOG(
logger_,
warning,
"Unhandled exception. Failed to set the exception for invocation.");
}
}

Expand Down
Loading
Loading