-
Notifications
You must be signed in to change notification settings - Fork 51
feat: add IO_THREAD_COUNT client property [HZ-5386] #1410
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.
Already on GitHub? Sign in to your account
Changes from all commits
6b6b2b9
73ec288
0a915b6
fddde9f
ea1d63b
65ea08f
ac0f16d
2817ccc
db790cb
809bb5d
6024b40
09ec6a5
14f9ca3
fbe7678
bc3a43d
e97a6e0
687ced0
e91a190
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -989,6 +989,10 @@ const std::string client_properties::INTERNAL_EXECUTOR_POOL_SIZE = | |
| "hazelcast.client.internal.executor.pool.size"; | ||
| const std::string client_properties::INTERNAL_EXECUTOR_POOL_SIZE_DEFAULT = "3"; | ||
|
|
||
| const std::string client_properties::IO_THREAD_COUNT = | ||
| "hazelcast.client.io.thread.count"; | ||
| const std::string client_properties::IO_THREAD_COUNT_DEFAULT = "3"; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. It would be nice if there were documentation to explain why
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. There was a study conducted a while ago, reported here, which gives details about the choice of 3.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Can we link to that in the code to help a future maintainer?
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. That is internal.
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Also, doesn't the cpp client verify the public configuration? What do we expect when the count is meaningless?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
I understand that it won't help anyone outside of Hazelcast.
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. @emreyigit good catch. Added the check 14f9ca3 |
||
|
|
||
| const std::string client_properties::SHUFFLE_MEMBER_LIST = | ||
| "hazelcast.client.shuffle.member.list"; | ||
| const std::string client_properties::SHUFFLE_MEMBER_LIST_DEFAULT = "true"; | ||
|
|
@@ -1060,6 +1064,7 @@ client_properties::client_properties( | |
| , event_thread_count_(EVENT_THREAD_COUNT, EVENT_THREAD_COUNT_DEFAULT) | ||
| , internal_executor_pool_size_(INTERNAL_EXECUTOR_POOL_SIZE, | ||
| INTERNAL_EXECUTOR_POOL_SIZE_DEFAULT) | ||
| , io_thread_count_(IO_THREAD_COUNT, IO_THREAD_COUNT_DEFAULT) | ||
| , shuffle_member_list_(SHUFFLE_MEMBER_LIST, SHUFFLE_MEMBER_LIST_DEFAULT) | ||
| , max_concurrent_invocations_(MAX_CONCURRENT_INVOCATIONS, | ||
| MAX_CONCURRENT_INVOCATIONS_DEFAULT) | ||
|
|
@@ -1122,6 +1127,12 @@ client_properties::get_internal_executor_pool_size() const | |
| return internal_executor_pool_size_; | ||
| } | ||
|
|
||
| const client_property& | ||
| client_properties::get_io_thread_count() const | ||
| { | ||
| return io_thread_count_; | ||
| } | ||
|
|
||
| const client_property& | ||
| client_properties::get_shuffle_member_list() const | ||
| { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -114,24 +114,33 @@ ClientConnectionManagerImpl::start() | |
| return false; | ||
| } | ||
|
|
||
| io_context_.reset(new boost::asio::io_context); | ||
| io_resolver_.reset( | ||
| new boost::asio::ip::tcp::resolver(io_context_->get_executor())); | ||
| socket_factory_.reset(new internal::socket::SocketFactory( | ||
| client_, *io_context_, *io_resolver_)); | ||
| auto guard = boost::asio::make_work_guard(*io_context_); | ||
| io_guard_ = std::unique_ptr< | ||
| boost::asio::executor_work_guard<boost::asio::io_context::executor_type>>( | ||
| new boost::asio::executor_work_guard< | ||
| boost::asio::io_context::executor_type>(std::move(guard))); | ||
| auto& props = client_.get_client_properties(); | ||
| int configured_io_thread_count = | ||
| props.get_integer(props.get_io_thread_count()); | ||
| int io_thread_count = find_thread_count(configured_io_thread_count); | ||
|
|
||
| socket_factory_.reset(new internal::socket::SocketFactory(client_)); | ||
|
|
||
| if (!socket_factory_->start()) { | ||
| return false; | ||
| } | ||
|
|
||
| socket_interceptor_ = client_.get_client_config().get_socket_interceptor(); | ||
|
|
||
| io_thread_ = std::thread([=]() { io_context_->run(); }); | ||
| for (int i = 0; i < io_thread_count; ++i) { | ||
| auto ctx = | ||
| std::unique_ptr<boost::asio::io_context>(new boost::asio::io_context); | ||
| io_guards_.push_back(std::unique_ptr<boost::asio::executor_work_guard< | ||
| boost::asio::io_context::executor_type>>( | ||
| new boost::asio::executor_work_guard< | ||
| boost::asio::io_context::executor_type>( | ||
| boost::asio::make_work_guard(*ctx)))); | ||
| io_resolvers_.push_back(std::unique_ptr<boost::asio::ip::tcp::resolver>( | ||
| new boost::asio::ip::tcp::resolver(ctx->get_executor()))); | ||
| auto raw_ctx = ctx.get(); | ||
| io_contexts_.push_back(std::move(ctx)); | ||
| io_threads_.emplace_back([raw_ctx]() { raw_ctx->run(); }); | ||
| } | ||
|
Comment on lines
+130
to
+143
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This is definitely my inexperience, but this looks... intense.
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Well, that is how C++ code looks. Basically, we are initializing the vectors and starting the io threads. The work guards are used to keep the io threads running until shutdown. I will put in more comments. |
||
|
|
||
| executor_.reset( | ||
| new hazelcast::util::hz_thread_pool(EXECUTOR_CORE_POOL_SIZE)); | ||
|
|
@@ -192,13 +201,27 @@ ClientConnectionManagerImpl::shutdown() | |
| spi::impl::ClientExecutionServiceImpl::shutdown_thread_pool( | ||
| executor_.get()); | ||
|
|
||
| // release the guard so that the io thread can stop gracefully | ||
| io_guard_.reset(); | ||
| io_thread_.join(); | ||
| // release the guards so that the io threads can stop gracefully | ||
| for (auto& guard : io_guards_) { | ||
| guard.reset(); | ||
| } | ||
| for (auto& thread : io_threads_) { | ||
| if (thread.joinable()) { | ||
| thread.join(); | ||
| } | ||
| } | ||
|
|
||
| // Release internal bookkeeping references to connections and listeners. | ||
| // io_resolvers_ and io_contexts_ are intentionally NOT cleared here: | ||
| // user code (e.g. transaction_context) may still hold | ||
| // shared_ptr<Connection> objects whose socket/backup_timer destructors | ||
| // reference these io_contexts. Leaving the io_contexts alive until | ||
| // ~ClientConnectionManagerImpl() ensures they outlive any such lingering | ||
| // Connection references. | ||
| connection_listeners_.clear(); | ||
| active_connections_.clear(); | ||
| active_connection_ids_.clear(); | ||
| io_guards_.clear(); | ||
| } | ||
|
|
||
| std::shared_ptr<Connection> | ||
|
|
@@ -902,6 +925,24 @@ ClientConnectionManagerImpl::on_authenticated( | |
| return connection; | ||
| } | ||
|
|
||
| int | ||
| ClientConnectionManagerImpl::find_thread_count( | ||
| int configured_thread_count) const | ||
| { | ||
| if (configured_thread_count != -1) { | ||
| return configured_thread_count; | ||
| } | ||
|
|
||
| // uni-socket client | ||
| if (!smart_routing_enabled_) { | ||
| return 1; | ||
| } | ||
|
|
||
| return (util::get_available_core_count() > SMALL_MACHINE_PROCESSOR_COUNT) | ||
| ? DEFAULT_IO_THREAD_COUNT | ||
| : 1; | ||
| } | ||
|
|
||
| void | ||
| ClientConnectionManagerImpl::fire_life_cycle_event( | ||
| lifecycle_event::lifecycle_state state) | ||
|
|
@@ -1062,12 +1103,15 @@ ClientConnectionManagerImpl::connect(const address& addr) | |
| info, | ||
| boost::str(boost::format("Trying to connect to %1%.") % addr)); | ||
|
|
||
| auto idx = next_io_index_.fetch_add(1) % io_contexts_.size(); | ||
| auto connection = std::make_shared<Connection>(addr, | ||
| client_, | ||
| ++connection_id_gen_, | ||
| *socket_factory_, | ||
| *this, | ||
| connection_timeout_millis_); | ||
| connection_timeout_millis_, | ||
| *io_contexts_[idx], | ||
| *io_resolvers_[idx]); | ||
| connection->connect(); | ||
|
|
||
| // call the interceptor from user thread | ||
|
|
@@ -1204,7 +1248,9 @@ Connection::Connection( | |
| int connection_id, // NOLINT(cppcoreguidelines-pro-type-member-init) | ||
| internal::socket::SocketFactory& socket_factory, | ||
| ClientConnectionManagerImpl& client_connection_manager, | ||
| std::chrono::milliseconds& connect_timeout_in_millis) | ||
| std::chrono::milliseconds& connect_timeout_in_millis, | ||
| boost::asio::io_context& io, | ||
| boost::asio::ip::tcp::resolver& resolver) | ||
| : read_handler(*this, 16 << 10) | ||
| , start_time_(std::chrono::system_clock::now()) | ||
| , closed_time_duration_() | ||
|
|
@@ -1217,7 +1263,8 @@ Connection::Connection( | |
| , last_write_time_(std::chrono::steady_clock::now().time_since_epoch()) | ||
| { | ||
| (void)client_connection_manager; | ||
| socket_ = socket_factory.create(address, connect_timeout_in_millis); | ||
| socket_ = | ||
| socket_factory.create(address, connect_timeout_in_millis, io, resolver); | ||
| } | ||
|
|
||
| Connection::~Connection() = default; | ||
|
|
@@ -1641,6 +1688,14 @@ HeartbeatManager::shutdown() | |
| { | ||
| if (timer_) { | ||
| timer_->cancel(); | ||
| // Release the timer while the execution service's thread pool is still | ||
| // alive. The timer holds an executor reference into that pool; if we | ||
| // defer releasing it to the HeartbeatManager destructor the pool will | ||
| // already have been destroyed (execution_service_ is a member of | ||
| // hazelcast_client_instance_impl declared before connection_manager_, | ||
| // so it is destroyed first), causing a use-after-free in the timer | ||
| // destructor. | ||
| timer_.reset(); | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -1729,12 +1784,8 @@ wait_strategy::sleep() | |
|
|
||
| namespace internal { | ||
| namespace socket { | ||
| SocketFactory::SocketFactory(spi::ClientContext& client_context, | ||
| boost::asio::io_context& io, | ||
| boost::asio::ip::tcp::resolver& resolver) | ||
| SocketFactory::SocketFactory(spi::ClientContext& client_context) | ||
| : client_context_(client_context) | ||
| , io_(io) | ||
| , io_resolver_(resolver) | ||
| { | ||
| } | ||
|
|
||
|
|
@@ -1808,30 +1859,32 @@ SocketFactory::start() | |
|
|
||
| std::unique_ptr<hazelcast::client::socket> | ||
| SocketFactory::create(const address& address, | ||
| std::chrono::milliseconds& connect_timeout_in_millis) | ||
| std::chrono::milliseconds& connect_timeout_in_millis, | ||
| boost::asio::io_context& io, | ||
| boost::asio::ip::tcp::resolver& resolver) | ||
| { | ||
| #ifdef HZ_BUILD_WITH_SSL | ||
| if (ssl_context_.get()) { | ||
| return std::unique_ptr<hazelcast::client::socket>( | ||
| new internal::socket::SSLSocket(io_, | ||
| new internal::socket::SSLSocket(io, | ||
| *ssl_context_, | ||
| address, | ||
| client_context_.get_client_config() | ||
| .get_network_config() | ||
| .get_socket_options(), | ||
| connect_timeout_in_millis, | ||
| io_resolver_)); | ||
| resolver)); | ||
| } | ||
| #endif | ||
|
|
||
| return std::unique_ptr<hazelcast::client::socket>( | ||
| new internal::socket::TcpSocket(io_, | ||
| new internal::socket::TcpSocket(io, | ||
| address, | ||
| client_context_.get_client_config() | ||
| .get_network_config() | ||
| .get_socket_options(), | ||
| connect_timeout_in_millis, | ||
| io_resolver_)); | ||
| resolver)); | ||
| } | ||
|
|
||
| #ifdef HZ_BUILD_WITH_SSL | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This has moved (wasn't updated). Any good reason?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no reason except that socket factory is using io_context and resolver in its methods, they are related but move is a random choice, I can revert it, no problem.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's not a massive problem, just wanted to check my understanding.