
Rearchitect client pipeline: IO thread pool, response threads, lock-free writes [HZ-5387]#1412

Open
ihsandemir wants to merge 40 commits into hazelcast:master from ihsandemir:rearchitecture

Conversation


@ihsandemir ihsandemir commented Mar 12, 2026

Summary

  • IO thread pool with round-robin connection assignment — replaces single-IO-thread model with configurable pool (IO_THREAD_COUNT property), distributing connections across threads
  • Dedicated response thread pool — offloads invocation completion from IO threads to dedicated ClientResponseHandler threads (RESPONSE_THREAD_COUNT property, default 2), preventing IO thread stalls
  • Lock-free write queue with batch flushing — replaces per-write dispatch with boost::lockfree::queue and scatter-gather async_write, coalescing multiple outbound messages into single syscalls
  • Global concurrent invocation registry — uses boost::concurrent_flat_map for O(1) correlation ID lookup, replacing per-connection maps
  • CallIdSequence for correlation IDs — mirrors Java client pattern with optional backpressure support
  • Fix race condition in backup ack handling — adds double-check in ClientInvocation::notify() after storing pending_response_ to prevent invocations from getting stuck when backup acks arrive concurrently with primary responses (mirrors Java BaseInvocation.notifyResponse pattern)

Design

Design doc at docs/plans/2026-03-05-cpp-pipeline-rearchitecture-design.md

Test plan

  • [x] Run ClientMapTest.testGet with 1000 entries — previously hung due to backup ack race condition
  • [ ] Run unit tests against 2-node cluster to validate backup-aware operations
  • [ ] Verify no regression in existing tests (all HazelcastTests1-8)
  • [ ] Benchmark sequential put/get throughput to confirm pipeline improvements

Add two new client property constants to control threading:
- IO_THREAD_COUNT (default 3): number of IO threads for networking
- RESPONSE_THREAD_COUNT (default 2): number of response processing threads

Includes static string definitions, constructor initializers, and
accessor methods following the existing client_properties pattern.
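The two properties described above could be sketched as follows. This is an illustrative stand-alone sketch of the client_properties pattern, not the client's actual code; the IO_THREAD_COUNT property name string is assumed by analogy with the documented hazelcast.client.response.thread.count.

```cpp
#include <cstdlib>
#include <string>
#include <unordered_map>

// Hypothetical sketch of the client_properties pattern for the two new
// constants. Real member/accessor names in the client may differ; the
// IO property name string is an assumption.
struct client_properties_sketch {
    static constexpr const char* IO_THREAD_COUNT =
        "hazelcast.client.io.thread.count"; // assumed name
    static constexpr int IO_THREAD_COUNT_DEFAULT = 3;
    static constexpr const char* RESPONSE_THREAD_COUNT =
        "hazelcast.client.response.thread.count";
    static constexpr int RESPONSE_THREAD_COUNT_DEFAULT = 2;

    std::unordered_map<std::string, std::string> user_props;

    // Accessor following the "integer property with default" pattern.
    int get_integer(const char* name, int default_value) const {
        auto it = user_props.find(name);
        return it == user_props.end() ? default_value
                                      : std::atoi(it->second.c_str());
    }

    int io_thread_count() const {
        return get_integer(IO_THREAD_COUNT, IO_THREAD_COUNT_DEFAULT);
    }
    int response_thread_count() const {
        return get_integer(RESPONSE_THREAD_COUNT, RESPONSE_THREAD_COUNT_DEFAULT);
    }
};
```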
When no explicit executor_pool_size is set in client_config, the
user_executor_ thread pool now reads its size from the
RESPONSE_THREAD_COUNT client property (default: 2), matching the
Java client behavior. Previously it created an unbounded pool.
Instead of storing a single io_context& and resolver& as member fields,
SocketFactory now receives them as parameters to create(). This enables
callers to pass different io_context instances per connection, which is
needed for multi-threaded IO support.
…ignment

Replace single io_context/io_thread with vectors sized by IO_THREAD_COUNT
property. New connections are assigned to IO contexts via atomic round-robin
index, distributing network load across multiple threads.
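The round-robin assignment above amounts to a single atomic counter. A minimal sketch (plain ints standing in for the boost::asio::io_context vector, so only the selection logic is shown):

```cpp
#include <atomic>
#include <cstddef>
#include <vector>

// Sketch of atomic round-robin io_context selection. fetch_add is one
// atomic RMW, so concurrent callers each get a distinct ticket; the
// modulo spreads new connections evenly across the pool.
class io_context_pool_sketch {
public:
    explicit io_context_pool_sketch(std::size_t io_thread_count)
      : contexts_(io_thread_count) {}

    std::size_t next_index() {
        return next_.fetch_add(1, std::memory_order_relaxed) % contexts_.size();
    }

private:
    std::vector<int> contexts_; // stand-in for per-thread io_contexts
    std::atomic<std::size_t> next_{0};
};
```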
Required for boost::concurrent_flat_map (global invocation registry)
and boost::lockfree::queue (write queue MPSC).
Property: hazelcast.client.response.thread.count
Default: 2 threads
Follows IO_THREAD_COUNT pattern for configuration.
…rviceImpl

Uses boost::concurrent_flat_map for thread-safe invocation tracking.
Registers invocations in global map before writing to connection,
mirroring Java client's registerInvocation pattern.
Introduce ClientResponseHandler with configurable thread count
(hazelcast.client.response.thread.count, default 2) that processes
responses on dedicated threads instead of IO threads. Uses per-thread
MPSC queues with condition_variable for efficient distribution by
correlation_id. Mirrors Java client's ClientResponseHandlerSupplier
pattern.
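The routing scheme above — one queue per response thread, selected by correlation_id % thread_count so every response for a given invocation lands on the same thread — can be sketched as below. The real handler uses MPSC queues; a mutex-guarded std::queue stands in here, and the completion callback is reduced to a user-supplied function.

```cpp
#include <atomic>
#include <condition_variable>
#include <cstdint>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Illustrative sketch of per-thread response queues with condition_variable
// wakeups, distributed by correlation_id % thread_count.
class response_handler_sketch {
public:
    using task = std::function<void(std::int64_t)>;

    response_handler_sketch(std::size_t thread_count, task on_response)
      : lanes_(thread_count) {
        for (std::size_t i = 0; i < thread_count; ++i) {
            workers_.emplace_back([this, i, on_response] {
                auto& lane = lanes_[i];
                for (;;) {
                    std::unique_lock<std::mutex> lk(lane.mtx);
                    lane.cv.wait(lk, [&] { return stop_.load() || !lane.q.empty(); });
                    if (lane.q.empty()) return; // stopped and fully drained
                    auto id = lane.q.front();
                    lane.q.pop();
                    lk.unlock();
                    on_response(id); // deserialization/completion happens here
                }
            });
        }
    }

    // Called by the IO thread: pick the lane by correlation id and wake it.
    void enqueue(std::int64_t correlation_id) {
        auto& lane = lanes_[static_cast<std::size_t>(correlation_id) % lanes_.size()];
        {
            std::lock_guard<std::mutex> lk(lane.mtx);
            lane.q.push(correlation_id);
        }
        lane.cv.notify_one();
    }

    ~response_handler_sketch() {
        stop_ = true;
        for (auto& lane : lanes_) {
            std::lock_guard<std::mutex> lk(lane.mtx); // avoid lost wakeup
            lane.cv.notify_all();
        }
        for (auto& t : workers_) t.join();
    }

private:
    struct lane {
        std::mutex mtx;
        std::condition_variable cv;
        std::queue<std::int64_t> q;
    };
    std::vector<lane> lanes_;
    std::vector<std::thread> workers_;
    std::atomic<bool> stop_{false};
};
```

Because the lane index is a pure function of the correlation id, no locking is needed to decide ownership, and responses for one invocation are never processed concurrently on two threads.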
Route regular responses through ClientResponseHandler.enqueue() instead
of processing them directly on the IO thread. Backup events and listener
events remain on the IO thread as they are lightweight. This offloads
response deserialization and invocation completion to dedicated response
threads.
Match the socket buffer size (DEFAULT_BUFFER_SIZE_BYTE = 128KB) for
the ReadHandler to reduce the number of read syscalls and improve
throughput for larger responses.
Replace per-message strand posting and individual async_write calls with
a lock-free MPSC queue (boost::lockfree::queue). User threads push
write entries without strand involvement. A single flush on the IO
strand drains all queued messages into scatter-gather buffers for one
batched async_write syscall.

Also moves correlation ID generation to send() using an atomic counter
on Connection, ensuring the global invocation map is populated with the
correct ID before writing.

Key changes:
- BaseSocket: lock-free write_queue_, flush_write_queue() batch drain
- Connection: allocate_call_id() with atomic counter
- ClientInvocationServiceImpl::send(): generate + register correlation
  ID before write
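The batch-flush idea above can be reduced to a small sketch: producers enqueue whole messages, and one flush drains everything queued so far into a single scatter-gather buffer list, which the real code hands to one async_write (one syscall) instead of issuing one write per message. boost::lockfree::queue is replaced by a mutex-guarded deque so the sketch stays dependency-free.

```cpp
#include <deque>
#include <mutex>
#include <string>
#include <vector>

// Illustrative write-queue sketch; not the client's BaseSocket code.
class write_queue_sketch {
public:
    // User threads enqueue without touching the IO strand.
    void enqueue_write(std::string message) {
        std::lock_guard<std::mutex> lk(mtx_);
        pending_.push_back(std::move(message));
    }

    // Runs on the IO strand in the real design, so only one flush is active
    // at a time. Returns the buffer list for a single gathered write.
    std::vector<std::string> flush_write_queue() {
        std::deque<std::string> drained;
        {
            std::lock_guard<std::mutex> lk(mtx_);
            drained.swap(pending_); // cheap no-op when the queue is empty
        }
        return {drained.begin(), drained.end()};
    }

private:
    std::mutex mtx_;
    std::deque<std::string> pending_;
};
```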
Ensure invocations are removed from the global concurrent_flat_map when:
- Connection closes (iterates per-connection map, deregisters each)
- Invocation exception notification (deregister on error)
- Listener event handler removal (deregister on cleanup)

This prevents stale entries accumulating in the global map.
Revert allocate_call_id() on Connection and premature correlation ID
generation in send(). Instead, preserve the existing generate_new_call_id
+ add_invocation_to_map flow which runs on the strand during flush.

Key changes:
- Restore generate_new_call_id and add_invocation_to_map in BaseSocket
  (make call_id_counter_ atomic for thread safety)
- Add enqueue_write as new virtual method on socket interface per plan
- Keep old async_write path for backward compatibility
- Add OutboundEntry with correlation_id, message, and invocation fields
- flush_write_queue generates IDs on strand, registers in both
  per-connection and global maps, then batch-writes
- Add fail_queued_writes helper for error cleanup
- Fix read buffer to use socket_options::get_buffer_size_in_bytes()
  instead of hardcoded value
- Add Connection::register_invocation for global map access
- invoke()/invoke_urgent() now set correlation ID from CallIdSequence
  (matching Java ClientInvocation lines 154, 168)
- send() registers invocation in global map before writing
  (matching Java registerInvocation in send())
- Remove old per-socket call ID generation (generate_new_call_id,
  add_invocation_to_map, call_id_counter_)
- Remove old async_write/do_write/outbox per-message write path
- Remove Connection::register_invocation (dead code)
- Remove async_write from socket virtual interface
- Remove Connection::invocations map and deregister_invocation method
- Event dispatch in handle_client_message uses global invocation map
- notify_backup uses global map instead of connection_id-encoded lookup
- Connection close uses notify_connection_closed to iterate global map
- Backup timeout check uses global map via check_backup_timeouts
- Simplify erase_invocation and remove_event_handler
- Removed the notify_backup method from ClientConnectionManagerImpl.
- Integrated ClientResponseHandler into Connection for handling responses.
- Updated ClientInvocation methods to include an erase parameter for better control over invocation lifecycle.
- Simplified response processing in ClientResponseHandler to manage backup events and listener events more efficiently.
- Adjusted connection initialization to pass the response handler, enhancing the overall response management architecture.
Backup events carry the source invocation's correlation ID in the
message payload (after EVENT_HEADER_LEN), not in the message header.
The header correlation ID belongs to the backup listener registration.
Extract the source correlation ID from the payload to correctly notify
the original invocation about backup completion.
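The payload extraction described above could look like the sketch below. The EVENT_HEADER_LEN value here is a stand-in (not the protocol's real constant), and a little-endian host is assumed for the raw memcpy read.

```cpp
#include <cstdint>
#include <cstring>
#include <vector>

constexpr std::size_t EVENT_HEADER_LEN = 22; // assumed value for this sketch

// Reads the source invocation's correlation id from the backup-event
// payload, immediately after the event header — not from the message
// header, whose correlation id belongs to the backup listener registration.
std::int64_t source_correlation_id(const std::vector<std::uint8_t>& frame) {
    std::int64_t id = 0;
    std::memcpy(&id, frame.data() + EVENT_HEADER_LEN, sizeof(id));
    return id; // little-endian host assumed
}
```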
The flush_scheduled_ atomic flag was intended to reduce strand posts
when multiple writes arrive concurrently. However, it created a race
condition where writes could get permanently stuck after ~10K ops
under concurrent load: the flag could end up in a state where no
thread would post a flush even though the queue had entries.

Replace with unconditional strand post per enqueue_write call. The
strand serializes execution and flush_write_queue is cheap when the
queue is empty or a write is already in progress (early return).
Backup events use the backup listener's correlation ID in the header,
which may not exist in the global invocation map (the listener
registration invocation is erased after completion). Moving the
backup event flag check before the invocation lookup ensures backup
events are always processed — extracting the source invocation's
correlation ID from the payload and notifying it correctly.

This was causing operations to hang under concurrent load because
backup-aware PUTs would never complete: the primary response set
pending_response_ and waited for notify_backup(), but the backup
event was dropped due to the missing listener invocation.
Backup events need special handling: their header correlation ID
belongs to the backup listener registration (not in the global map).
Process them directly on the IO thread in handle_client_message,
matching the original code's pattern, rather than routing through
the response handler queue.
Backup event handling with the global invocation map needs further
investigation. Disable backup acks for now to allow benchmarking
the pipeline rearchitecture changes.
…nts and pending response management. This change enhances thread safety and ensures correct handling of concurrent operations. Removed unnecessary includes and updated related logic for atomic operations in the notify and completion methods.
…ons over to ClientInvocationServiceImpl similar to Java client.
Add double-check of backup_acks_received_ after storing pending_response_
to close the race window where notify_backup() runs between the initial
check and the store, sees no pending_response_, and returns without
completing the invocation. This mirrors the Java BaseInvocation.notifyResponse
pattern and prevents invocations from getting stuck indefinitely when
backup acks arrive concurrently with primary responses.
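The double-check described above can be sketched as follows. This mirrors the structure of Java's BaseInvocation.notifyResponse but is not the client's real ClientInvocation; completion is reduced to a flag, and a compare-exchange guard ensures only one path completes the invocation when both race to do so.

```cpp
#include <atomic>

// Illustrative sketch of the primary-response / backup-ack race fix.
class invocation_sketch {
public:
    // Primary response path (ClientInvocation::notify in the PR).
    void notify(int expected_backups) {
        expected_backups_.store(expected_backups);
        if (backup_acks_received_.load() >= expected_backups) {
            complete_once();
            return;
        }
        pending_response_.store(true);
        // Double-check: notify_backup() may have run between the check
        // above and the store, seen no pending response, and returned.
        // Without this re-check the invocation would hang forever.
        if (backup_acks_received_.load() >= expected_backups) {
            complete_once();
        }
    }

    // Backup ack path.
    void notify_backup() {
        int acks = backup_acks_received_.fetch_add(1) + 1;
        if (pending_response_.load() && acks >= expected_backups_.load()) {
            complete_once();
        }
    }

    bool completed() const { return completed_.load(); }

private:
    // Both paths may decide to complete; exactly one must win.
    void complete_once() {
        bool expected = false;
        if (done_guard_.compare_exchange_strong(expected, true)) {
            completed_.store(true); // stands in for setting the future's value
        }
    }

    std::atomic<int> backup_acks_received_{0};
    std::atomic<int> expected_backups_{0};
    std::atomic<bool> pending_response_{false};
    std::atomic<bool> completed_{false};
    std::atomic<bool> done_guard_{false};
};
```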
@ihsandemir ihsandemir self-assigned this Mar 12, 2026
@ihsandemir ihsandemir added this to the 5.7.0 milestone Mar 12, 2026
@ihsandemir ihsandemir requested a review from JackPGreen March 12, 2026 21:03
@ihsandemir ihsandemir changed the title Rearchitect client pipeline: IO thread pool, response threads, lock-free writes Rearchitect client pipeline: IO thread pool, response threads, lock-free writes [HZ-5387] Mar 12, 2026
Contributor

@JackPGreen JackPGreen left a comment


I've done a very limited review to the best of my abilities and added some comments.

Contributor


Why have we removed this? Guessing it's not used. But is it related to client pipeline changes?

Collaborator Author


No, it is not used; no pipeline uses it.

Contributor


This is the same change as #1410 - to aid reviewing, could we have rebased on top of that or merged first to make this an easier (smaller) changeset to review?

Collaborator Author


Yes, I will rebase once I merge the other PR. I am working on that PR's Windows test failure; after it merges I will rebase.

ihsandemir and others added 4 commits March 13, 2026 15:32
Co-authored-by: Jack Green <JackPGreen@Gmail.com>
Co-authored-by: Jack Green <JackPGreen@Gmail.com>
Co-authored-by: Jack Green <JackPGreen@Gmail.com>
Co-authored-by: Jack Green <JackPGreen@Gmail.com>
@ihsandemir ihsandemir requested a review from JackPGreen March 17, 2026 07:01
, connect_timeout_(connect_timeout_in_millis)
, resolver_(io_resolver)
, socket_(io, context)
, write_queue_(128)
Contributor


Another magic number.

Comment on lines +56 to +57
/// Called by IO thread to enqueue a response for async processing.
/// Distributes across response threads by correlation_id % thread_count.
Contributor


Suggested change
/// Called by IO thread to enqueue a response for async processing.
/// Distributes across response threads by correlation_id % thread_count.
// Called by IO thread to enqueue a response for async processing.
// Distributes across response threads by correlation_id % thread_count.

(I think!)

return partition_arg_cache_size_;
}
std::chrono::milliseconds
client_properties::get_positive_millis_or_defult(
Contributor


Suggested change
client_properties::get_positive_millis_or_defult(
client_properties::get_positive_millis_or_default(

Contributor


Also I can't see the corresponding usage to update?

bool erase)
{
// if a regular response comes and there are backups, we need to wait for
// the backups when the backups complete, the response will be send by the
Contributor


Suggested change
// the backups when the backups complete, the response will be send by the
// the backups when the backups complete, the response will be sent by the


// we are going to notify the future that a response is available; this can
// happen when:
// - we had a regular operation (so no backups we need to wait for) that
Contributor


Suggested change
// - we had a regular operation (so no backups we need to wait for) that
// - we had a regular operation (so no backups to await for) that

