Rearchitect client pipeline: IO thread pool, response threads, lock-free writes [HZ-5387] #1412
ihsandemir wants to merge 40 commits into hazelcast:master
Conversation
Add two new client property constants to control threading:
- IO_THREAD_COUNT (default 3): number of IO threads for networking
- RESPONSE_THREAD_COUNT (default 2): number of response processing threads

Includes static string definitions, constructor initializers, and accessor methods following the existing client_properties pattern.
When no explicit executor_pool_size is set in client_config, the user_executor_ thread pool now reads its size from the RESPONSE_THREAD_COUNT client property (default: 2), matching the Java client behavior. Previously it created an unbounded pool.
Instead of storing a single io_context& and resolver& as member fields, SocketFactory now receives them as parameters to create(). This enables callers to pass different io_context instances per connection, which is needed for multi-threaded IO support.
…ignment
Replace single io_context/io_thread with vectors sized by the IO_THREAD_COUNT property. New connections are assigned to IO contexts via an atomic round-robin index, distributing network load across multiple threads.
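The round-robin assignment above can be sketched with an atomic counter. This is a minimal illustration, not the client's actual API; `io_context_pool` and `next_index` are hypothetical names, and a plain `int` stands in for `boost::asio::io_context`.

```cpp
#include <atomic>
#include <cstddef>
#include <vector>

class io_context_pool {
public:
    explicit io_context_pool(std::size_t thread_count)
      : contexts_(thread_count) {}

    // Each new connection is pinned to one IO context, chosen by an
    // atomic fetch_add so concurrent callers never pick the same slot
    // out of turn and no lock is needed.
    std::size_t next_index() {
        return next_.fetch_add(1, std::memory_order_relaxed) % contexts_.size();
    }

private:
    std::vector<int> contexts_; // stand-in for boost::asio::io_context instances
    std::atomic<std::size_t> next_{0};
};
```

With three contexts, successive connections land on indices 0, 1, 2, 0, 1, … regardless of which thread creates them.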
Required for boost::concurrent_flat_map (global invocation registry) and boost::lockfree::queue (write queue MPSC).
Property: hazelcast.client.response.thread.count
Default: 2 threads
Follows the IO_THREAD_COUNT pattern for configuration.
…rviceImpl
Uses boost::concurrent_flat_map for thread-safe invocation tracking. Registers invocations in the global map before writing to the connection, mirroring the Java client's registerInvocation pattern.
Introduce ClientResponseHandler with configurable thread count (hazelcast.client.response.thread.count, default 2) that processes responses on dedicated threads instead of IO threads. Uses per-thread MPSC queues with condition_variable for efficient distribution by correlation_id. Mirrors Java client's ClientResponseHandlerSupplier pattern.
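The distribution scheme described above (route each response to the queue at `correlation_id % thread_count`, so all responses for one invocation are processed in order on the same thread) can be sketched as follows. This is an illustrative simplification: the real implementation uses MPSC queues holding client messages, while here a mutex-guarded deque of correlation IDs stands in.

```cpp
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <mutex>
#include <vector>

struct response_queue {
    std::mutex mtx;
    std::condition_variable cv;
    std::deque<std::int64_t> items; // would hold whole response messages in real code
};

class response_handler {
public:
    explicit response_handler(std::size_t thread_count)
      : queues_(thread_count) {}

    // Stable mapping: one invocation's responses always hit the same queue,
    // so they are consumed in order by a single response thread.
    std::size_t queue_index(std::int64_t correlation_id) const {
        return static_cast<std::size_t>(correlation_id) % queues_.size();
    }

    // Called from an IO thread; wakes the owning response thread.
    void enqueue(std::int64_t correlation_id) {
        auto& q = queues_[queue_index(correlation_id)];
        {
            std::lock_guard<std::mutex> g(q.mtx);
            q.items.push_back(correlation_id);
        }
        q.cv.notify_one();
    }

    std::size_t queue_size(std::size_t i) {
        std::lock_guard<std::mutex> g(queues_[i].mtx);
        return queues_[i].items.size();
    }

private:
    std::vector<response_queue> queues_;
};
```

In the real handler each queue is drained by a dedicated thread blocked on the condition variable; the sketch only shows the routing side.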
Route regular responses through ClientResponseHandler.enqueue() instead of processing them directly on the IO thread. Backup events and listener events remain on the IO thread as they are lightweight. This offloads response deserialization and invocation completion to dedicated response threads.
Match the socket buffer size (DEFAULT_BUFFER_SIZE_BYTE = 128KB) for the ReadHandler to reduce the number of read syscalls and improve throughput for larger responses.
Replace per-message strand posting and individual async_write calls with a lock-free MPSC queue (boost::lockfree::queue). User threads push write entries without strand involvement. A single flush on the IO strand drains all queued messages into scatter-gather buffers for one batched async_write syscall. Also moves correlation ID generation to send() using an atomic counter on Connection, ensuring the global invocation map is populated with the correct ID before writing.

Key changes:
- BaseSocket: lock-free write_queue_, flush_write_queue() batch drain
- Connection: allocate_call_id() with atomic counter
- ClientInvocationServiceImpl::send(): generate + register correlation ID before write
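The "drain everything, write once" batching can be sketched like this. It is a deliberately simplified stand-in: the real code uses boost::lockfree::queue and Asio scatter-gather buffer sequences, while here a locked deque and plain strings make the coalescing logic visible; `write_batcher` is a hypothetical name.

```cpp
#include <deque>
#include <mutex>
#include <string>
#include <vector>

class write_batcher {
public:
    // User threads append frames; in the real pipeline this is a
    // lock-free push with no strand involvement.
    void enqueue(std::string frame) {
        std::lock_guard<std::mutex> g(mtx_);
        pending_.push_back(std::move(frame));
    }

    // Runs on the IO strand: drains all queued frames into one gather
    // list, which would feed a single batched async_write syscall.
    std::vector<std::string> flush() {
        std::lock_guard<std::mutex> g(mtx_);
        std::vector<std::string> batch(pending_.begin(), pending_.end());
        pending_.clear();
        return batch;
    }

private:
    std::mutex mtx_;
    std::deque<std::string> pending_;
};
```

Ten producers enqueueing between two flushes still cost only one write call, which is the point of the batching.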
Ensure invocations are removed from the global concurrent_flat_map when:
- Connection closes (iterates per-connection map, deregisters each)
- Invocation exception notification (deregister on error)
- Listener event handler removal (deregister on cleanup)

This prevents stale entries accumulating in the global map.
Revert allocate_call_id() on Connection and premature correlation ID generation in send(). Instead, preserve the existing generate_new_call_id + add_invocation_to_map flow which runs on the strand during flush.

Key changes:
- Restore generate_new_call_id and add_invocation_to_map in BaseSocket (make call_id_counter_ atomic for thread safety)
- Add enqueue_write as new virtual method on the socket interface per plan
- Keep old async_write path for backward compatibility
- Add OutboundEntry with correlation_id, message, and invocation fields
- flush_write_queue generates IDs on the strand, registers in both per-connection and global maps, then batch-writes
- Add fail_queued_writes helper for error cleanup
- Fix read buffer to use socket_options::get_buffer_size_in_bytes() instead of a hardcoded value
- Add Connection::register_invocation for global map access
- invoke()/invoke_urgent() now set correlation ID from CallIdSequence (matching Java ClientInvocation lines 154, 168)
- send() registers invocation in global map before writing (matching Java registerInvocation in send())
- Remove old per-socket call ID generation (generate_new_call_id, add_invocation_to_map, call_id_counter_)
- Remove old async_write/do_write/outbox per-message write path
- Remove Connection::register_invocation (dead code)
- Remove async_write from socket virtual interface
- Remove Connection::invocations map and deregister_invocation method
- Event dispatch in handle_client_message uses global invocation map
- notify_backup uses global map instead of connection_id-encoded lookup
- Connection close uses notify_connection_closed to iterate global map
- Backup timeout check uses global map via check_backup_timeouts
- Simplify erase_invocation and remove_event_handler
- Removed the notify_backup method from ClientConnectionManagerImpl.
- Integrated ClientResponseHandler into Connection for handling responses.
- Updated ClientInvocation methods to include an erase parameter for better control over invocation lifecycle.
- Simplified response processing in ClientResponseHandler to manage backup events and listener events more efficiently.
- Adjusted connection initialization to pass the response handler, enhancing the overall response management architecture.
Backup events carry the source invocation's correlation ID in the message payload (after EVENT_HEADER_LEN), not in the message header. The header correlation ID belongs to the backup listener registration. Extract the source correlation ID from the payload to correctly notify the original invocation about backup completion.
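The payload extraction can be sketched as below. This is a hypothetical illustration only: the value 24 for EVENT_HEADER_LEN, the field layout, and the host-byte-order memcpy are assumptions for demonstration, not the actual Hazelcast wire format.

```cpp
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <vector>

constexpr std::size_t EVENT_HEADER_LEN = 24; // assumed header length, illustrative only

// Reads the source invocation's correlation ID from the bytes after the
// event header. Assumes host byte order matches the wire format.
std::int64_t source_correlation_id(const std::vector<std::uint8_t>& frame) {
    std::int64_t id = 0;
    std::memcpy(&id, frame.data() + EVENT_HEADER_LEN, sizeof(id));
    return id;
}

// Test helper (hypothetical): builds a frame with the ID placed where the
// sketch above expects it.
std::vector<std::uint8_t> make_backup_event_frame(std::int64_t id) {
    std::vector<std::uint8_t> frame(EVENT_HEADER_LEN + sizeof(std::int64_t), 0);
    std::memcpy(frame.data() + EVENT_HEADER_LEN, &id, sizeof(id));
    return frame;
}
```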
The flush_scheduled_ atomic flag was intended to reduce strand posts when multiple writes arrive concurrently. However, it created a race condition where writes could get permanently stuck after ~10K ops under concurrent load: the flag could end up in a state where no thread would post a flush even though the queue had entries. Replace with unconditional strand post per enqueue_write call. The strand serializes execution and flush_write_queue is cheap when the queue is empty or a write is already in progress (early return).
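The safety argument above (redundant posts are fine because the flush is an idempotent early return) can be sketched as a small state machine. Names are illustrative; a mutex replaces the strand, whose job is likewise just to serialize these calls.

```cpp
#include <mutex>

class flusher {
public:
    // User-thread side: always "post" a flush; no scheduling flag to race on.
    void enqueue() {
        std::lock_guard<std::mutex> g(mtx_);
        ++queued_;
        // real code: unconditionally post flush_write_queue() to the strand
    }

    // Strand side. Returns true only when it actually starts a write;
    // the early return makes redundant posts cheap and harmless.
    bool flush_write_queue() {
        std::lock_guard<std::mutex> g(mtx_);
        if (write_in_progress_ || queued_ == 0)
            return false;
        write_in_progress_ = true;
        queued_ = 0; // the batch is handed to the in-flight write
        return true;
    }

    // Write-completion handler: the next flush may start a new batch.
    void on_write_complete() {
        std::lock_guard<std::mutex> g(mtx_);
        write_in_progress_ = false;
    }

private:
    std::mutex mtx_;
    bool write_in_progress_{false};
    int queued_{0};
};
```

Entries enqueued while a write is in flight are picked up by the flush posted for them once the write completes, so nothing can get stuck.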
Backup events use the backup listener's correlation ID in the header, which may not exist in the global invocation map (the listener registration invocation is erased after completion). Moving the backup event flag check before the invocation lookup ensures backup events are always processed — extracting the source invocation's correlation ID from the payload and notifying it correctly. This was causing operations to hang under concurrent load because backup-aware PUTs would never complete: the primary response set pending_response_ and waited for notify_backup(), but the backup event was dropped due to the missing listener invocation.
Backup events need special handling: their header correlation ID belongs to the backup listener registration (not in the global map). Process them directly on the IO thread in handle_client_message, matching the original code's pattern, rather than routing through the response handler queue.
Backup event handling with the global invocation map needs further investigation. Disable backup acks for now to allow benchmarking the pipeline rearchitecture changes.
…e usual completion path.
…nts and pending response management. This change enhances thread safety and ensures correct handling of concurrent operations. Removed unnecessary includes and updated related logic for atomic operations in the notify and completion methods.
…ons over to ClientInvocationServiceImpl similar to Java client.
Add double-check of backup_acks_received_ after storing pending_response_ to close the race window where notify_backup() runs between the initial check and the store, sees no pending_response_, and returns without completing the invocation. This mirrors the Java BaseInvocation.notifyResponse pattern and prevents invocations from getting stuck indefinitely when backup acks arrive concurrently with primary responses.
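The double-check can be sketched with atomics as below. This is a minimal illustration of the pattern (mirroring Java's BaseInvocation.notifyResponse); the field names and the single-backup assumption are simplifications, not the client's actual members.

```cpp
#include <atomic>
#include <cstdint>

struct invocation {
    std::atomic<std::int8_t> expected_backups{1};
    std::atomic<std::int8_t> backup_acks_received{0};
    std::atomic<bool> pending_response{false};
    std::atomic<bool> completed{false};

    // CAS guard so the future is completed exactly once, whichever
    // of the two paths below gets there first.
    void complete_once() {
        bool expected = false;
        completed.compare_exchange_strong(expected, true);
    }

    // Response-thread path: primary response arrived.
    void notify_response() {
        if (backup_acks_received.load() >= expected_backups.load()) {
            complete_once();
            return;
        }
        pending_response.store(true);
        // Double-check: a backup ack may have landed between the first
        // check and the store above; without this re-check the ack path
        // saw no pending response and the invocation would hang forever.
        if (backup_acks_received.load() >= expected_backups.load())
            complete_once();
    }

    // IO-thread path: backup ack event arrived.
    void notify_backup() {
        backup_acks_received.fetch_add(1);
        if (pending_response.load() &&
            backup_acks_received.load() >= expected_backups.load())
            complete_once();
    }
};
```

Both interleavings complete: if the ack arrives first, notify_response sees it immediately; if the response arrives first, notify_backup finds pending_response set, or the double-check catches the ack that slipped in between.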
JackPGreen left a comment:
I've done a very limited review to the best of my abilities and added some comments.
asan_symbolize.py
Outdated
Why have we removed this? Guessing it's not used. But is it related to client pipeline changes?
No, it is not used; nothing in the pipeline uses it.
This is the same change as #1410 - to aid reviewing, could we have rebased on top of that or merged first to make this an easier (smaller) changeset to review?
Yes, I will rebase once I merge the other PR. I am working on that PR's Windows test failure; after it merges I will rebase.
hazelcast/include/hazelcast/client/internal/socket/BaseSocket.h
Outdated
Co-authored-by: Jack Green <JackPGreen@Gmail.com>
330e3c9 to 1e980b2
, connect_timeout_(connect_timeout_in_millis)
, resolver_(io_resolver)
, socket_(io, context)
, write_queue_(128)
/// Called by IO thread to enqueue a response for async processing.
/// Distributes across response threads by correlation_id % thread_count.
Suggested change (use // instead of ///):
// Called by IO thread to enqueue a response for async processing.
// Distributes across response threads by correlation_id % thread_count.
(I think!)
    return partition_arg_cache_size_;
}

std::chrono::milliseconds
client_properties::get_positive_millis_or_defult(
Suggested change: get_positive_millis_or_defult → get_positive_millis_or_default
Also I can't see the corresponding usage to update?
bool erase)
{
    // if a regular response comes and there are backups, we need to wait for
    // the backups when the backups complete, the response will be send by the
Suggested change: "the response will be send by the" → "the response will be sent by the"
// we are going to notify the future that a response is available; this can
// happen when:
// - we had a regular operation (so no backups we need to wait for) that
Suggested change: "(so no backups we need to wait for)" → "(so no backups to await for)"
Summary
- Multi-threaded IO (IO_THREAD_COUNT property), distributing connections across threads
- Response processing offloaded to dedicated ClientResponseHandler threads (RESPONSE_THREAD_COUNT property, default 2), preventing IO thread stalls
- Lock-free writes via boost::lockfree::queue and scatter-gather async_write, coalescing multiple outbound messages into single syscalls
- Global invocation registry backed by boost::concurrent_flat_map for O(1) correlation ID lookup, replacing per-connection maps
- Double-check of backup acks in ClientInvocation::notify() after storing pending_response_ to prevent invocations from getting stuck when backup acks arrive concurrently with primary responses (mirrors Java BaseInvocation.notifyResponse pattern)

Design
Design doc at docs/plans/2026-03-05-cpp-pipeline-rearchitecture-design.md
Test plan
- ClientMapTest.testGet with 1000 entries — previously hung due to backup ack race condition
- Full test suites (HazelcastTests1-8)