Skip to content

Commit 92ec38f

Browse files
committed
Improve queue comparison benchmark
Refactors consumer loops to check for both stop signal and queue emptiness, improving correctness and shutdown behavior. Renames DefaultQueueCapacity to QueueCapacity for consistency. Enhances batch read/write logic for clarity and correctness, and adds more robust verification and error handling.
1 parent a914743 commit 92ec38f

1 file changed

Lines changed: 93 additions & 39 deletions

File tree

benchmarks/queue_comparison_benchmark.cpp

Lines changed: 93 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,9 @@
88
#include <random>
99

1010
using DataType = int64_t;
11-
constexpr size_t RandomDataSize = 4001; // The prime to make patterns less obvious
12-
constexpr size_t ItemsPerIteration = 100'025;
13-
constexpr size_t DefaultQueueCapacity = 65536; // 2^16
11+
constexpr size_t RandomDataSize = 4001; // The prime to make patterns less obvious
12+
constexpr size_t ItemsPerIteration = 100'025;
13+
constexpr size_t QueueCapacity = 65536; // 2^16
1414

1515
// Shared Test Data Generation
1616
const std::vector<DataType>& get_random_data() {
@@ -28,14 +28,16 @@ const std::vector<DataType>& get_random_data() {
2828

2929
static void BM_ThisQueue_SingleItem(benchmark::State& state) {
3030
const auto& random_data = get_random_data();
31-
std::vector<DataType> shared_buffer(DefaultQueueCapacity);
31+
std::vector<DataType> shared_buffer(QueueCapacity);
3232
LockFreeSpscQueue<DataType> queue(shared_buffer);
3333

3434
std::atomic<bool> verification_failed = false;
3535
std::atomic<bool> consumer_should_stop = false;
3636
std::jthread consumer([&] {
3737
size_t i = 0;
38-
while (!consumer_should_stop.load(std::memory_order_relaxed)) {
38+
while (!( consumer_should_stop.load(std::memory_order_relaxed)
39+
&& queue.get_num_items_ready() == 0))
40+
{
3941
auto scope = queue.prepare_read(1);
4042
if (scope.get_items_read() == 1) {
4143
if (scope.get_block1()[0] != random_data[i % RandomDataSize]) verification_failed.store(true);
@@ -48,10 +50,10 @@ static void BM_ThisQueue_SingleItem(benchmark::State& state) {
4850

4951
size_t total_written = 0;
5052
for (auto _ : state) {
53+
if (verification_failed.load(std::memory_order_relaxed)) {
54+
state.SkipWithError("Verification failed!"); return;
55+
}
5156
for (size_t n = 0; n < ItemsPerIteration; ++n) {
52-
if (verification_failed.load(std::memory_order_relaxed)) {
53-
state.SkipWithError("Verification failed!"); return;
54-
}
5557
const auto& item_to_write = random_data[total_written % RandomDataSize];
5658
while (true) {
5759
auto scope = queue.prepare_write(1);
@@ -63,6 +65,7 @@ static void BM_ThisQueue_SingleItem(benchmark::State& state) {
6365
total_written++;
6466
}
6567
}
68+
6669
consumer_should_stop.store(true, std::memory_order_relaxed);
6770
state.SetItemsProcessed(total_written);
6871
}
@@ -73,19 +76,31 @@ BENCHMARK(BM_ThisQueue_SingleItem)->Unit(benchmark::kMillisecond)->UseRealTime()
7376

7477
static void BM_ThisQueue_SingleItem_Write256(benchmark::State& state) {
7578
const auto& random_data = get_random_data();
76-
std::vector<DataType> shared_buffer(DefaultQueueCapacity);
79+
std::vector<DataType> shared_buffer(QueueCapacity);
7780
LockFreeSpscQueue<DataType> queue(shared_buffer);
7881

7982
std::atomic<bool> verification_failed = false;
8083
std::atomic<bool> consumer_should_stop = false;
81-
std::jthread consumer([&] {
82-
size_t i = 0;
83-
while (!consumer_should_stop.load(std::memory_order_relaxed)) {
84-
auto scope = queue.prepare_read(1);
85-
if (scope.get_items_read() == 1) {
86-
if (scope.get_block1()[0] != random_data[i % RandomDataSize]) verification_failed.store(true);
87-
i++;
88-
} else {
84+
85+
std::jthread consumer([&]{
86+
size_t received_count = 0;
87+
while (!( consumer_should_stop.load(std::memory_order_relaxed)
88+
&& queue.get_num_items_ready() == 0))
89+
{
90+
// Always read much items from the queue as they are ready
91+
const size_t items_read = queue.try_read(QueueCapacity, [&](auto b1, auto b2) {
92+
for (const auto& item : b1) {
93+
if (item != random_data[(received_count++) % RandomDataSize]) {
94+
verification_failed.store(true);
95+
}
96+
}
97+
for (const auto& item : b2) {
98+
if (item != random_data[(received_count++) % RandomDataSize]) {
99+
verification_failed.store(true);
100+
}
101+
}
102+
});
103+
if (items_read == 0) {
89104
std::this_thread::yield();
90105
}
91106
}
@@ -97,7 +112,10 @@ static void BM_ThisQueue_SingleItem_Write256(benchmark::State& state) {
97112
// Start a transaction for up to 256 items at a time
98113
auto transaction = queue.try_start_write(256);
99114
if (transaction) {
100-
while(n < ItemsPerIteration && transaction->try_push(random_data[total_written % RandomDataSize])) {
115+
// Push a single item into transaction in a loop, until the transaction is full
116+
while( n < ItemsPerIteration
117+
&& transaction->try_push(random_data[total_written % RandomDataSize]))
118+
{
101119
total_written++;
102120
n++;
103121
}
@@ -113,13 +131,15 @@ BENCHMARK(BM_ThisQueue_SingleItem_Write256)->Unit(benchmark::kMillisecond)->UseR
113131

114132
static void BM_Moodycamel_SingleItem(benchmark::State& state) {
115133
const auto& random_data = get_random_data();
116-
moodycamel::ReaderWriterQueue<DataType> queue(DefaultQueueCapacity);
134+
moodycamel::ReaderWriterQueue<DataType> queue(QueueCapacity);
117135
std::atomic<bool> verification_failed = false;
118136
std::atomic<bool> consumer_should_stop = false;
119137
std::jthread consumer([&] {
120138
size_t i = 0;
121139
DataType item;
122-
while (!consumer_should_stop.load(std::memory_order_relaxed)) {
140+
while (!( consumer_should_stop.load(std::memory_order_relaxed)
141+
&& queue.size_approx() == 0))
142+
{
123143
if (queue.try_dequeue(item)) {
124144
if (item != random_data[i % RandomDataSize]) verification_failed.store(true);
125145
i++;
@@ -151,41 +171,67 @@ BENCHMARK(BM_Moodycamel_SingleItem)->Unit(benchmark::kMillisecond)->UseRealTime(
151171
static void BM_ThisQueue_Batch(benchmark::State& state) {
152172
const size_t batch_size = state.range(0);
153173
const auto& random_data = get_random_data();
154-
std::vector<DataType> shared_buffer(DefaultQueueCapacity);
174+
std::vector<DataType> shared_buffer(QueueCapacity);
155175
LockFreeSpscQueue<DataType> queue(shared_buffer);
156176

157-
std::atomic<bool> verification_failed = false;
177+
std::atomic<bool> verification_failed = false;
158178
std::atomic<bool> consumer_should_stop = false;
159179
std::jthread consumer([&]{
160180
size_t received_count = 0;
161-
while (!consumer_should_stop.load(std::memory_order_relaxed)) {
162-
const size_t items_read = queue.try_read(batch_size, [&](auto b1, auto b2){
163-
for (const auto& item : b1) if (item != random_data[(received_count++) % RandomDataSize]) verification_failed.store(true);
164-
for (const auto& item : b2) if (item != random_data[(received_count++) % RandomDataSize]) verification_failed.store(true);
181+
while (!( consumer_should_stop.load(std::memory_order_relaxed)
182+
&& queue.get_num_items_ready() == 0))
183+
{
184+
const size_t items_read = queue.try_read(batch_size, [&](auto b1, auto b2) {
185+
// Read and verify block1
186+
for (const auto& item : b1) {
187+
if (item != random_data[(received_count++) % RandomDataSize]) {
188+
verification_failed.store(true);
189+
}
190+
}
191+
// Read and verify block2
192+
for (const auto& item : b2) {
193+
if (item != random_data[(received_count++) % RandomDataSize]) {
194+
verification_failed.store(true);
195+
}
196+
}
165197
});
166-
if (items_read == 0) std::this_thread::yield();
198+
if (items_read == 0) {
199+
std::this_thread::yield();
200+
}
167201
}
168202
});
169203

170204
size_t total_written = 0;
171205
for (auto _ : state) {
172206
for (size_t n = 0; n < ItemsPerIteration; ) {
173207
if (verification_failed.load(std::memory_order_relaxed)) {
174-
state.SkipWithError("Verification failed!"); consumer_should_stop.store(true); return;
208+
state.SkipWithError("Verification failed!");
209+
consumer_should_stop.store(true);
210+
return;
175211
}
176212

177-
const size_t current_rand_idx = total_written % RandomDataSize;
213+
const size_t current_rand_idx = total_written % RandomDataSize;
178214
const size_t items_to_rand_end = RandomDataSize - current_rand_idx;
179-
size_t remaining_in_iter = ItemsPerIteration - n;
180-
size_t batch_to_send_size = std::min({batch_size, remaining_in_iter, items_to_rand_end});
215+
size_t remaining_in_iter = ItemsPerIteration - n;
216+
size_t batch_to_send_size = std::min({batch_size,
217+
remaining_in_iter,
218+
items_to_rand_end});
181219

182220
std::span<const DataType> sub_batch(&random_data[current_rand_idx], batch_to_send_size);
183221

184222
size_t written_this_batch = 0;
185-
while(written_this_batch < sub_batch.size()) {
186-
written_this_batch += queue.try_write(sub_batch.size() - written_this_batch, [&](auto b1, auto b2){
223+
while (written_this_batch < sub_batch.size()) {
224+
written_this_batch += queue.try_write(sub_batch.size() - written_this_batch,
225+
[&](auto b1, auto b2)
226+
{
227+
// Copy block1
187228
std::copy_n(sub_batch.begin() + written_this_batch, b1.size(), b1.begin());
188-
if (!b2.empty()) std::copy_n(sub_batch.begin() + written_this_batch + b1.size(), b2.size(), b2.begin());
229+
if (!b2.empty()) {
230+
// Copy block2 (if needed)
231+
std::copy_n(sub_batch.begin() + written_this_batch + b1.size(),
232+
b2.size(),
233+
b2.begin());
234+
}
189235
});
190236
}
191237
n += written_this_batch;
@@ -201,32 +247,40 @@ BENCHMARK(BM_ThisQueue_Batch)->Arg(4)->Arg(8)->Arg(16)->Arg(64)->Arg(256)->Unit(
201247
static void BM_Moodycamel_Batch(benchmark::State& state) {
202248
const size_t batch_size = state.range(0);
203249
const auto& random_data = get_random_data();
204-
moodycamel::ReaderWriterQueue<DataType> queue(DefaultQueueCapacity);
250+
moodycamel::ReaderWriterQueue<DataType> queue(QueueCapacity);
205251

206252
std::atomic<bool> verification_failed = false;
207253
std::atomic<bool> consumer_should_stop = false;
208254
std::jthread consumer([&]{
209255
size_t received_count = 0;
210256
DataType item;
211-
while (!consumer_should_stop.load(std::memory_order_relaxed)) {
257+
while (!( consumer_should_stop.load(std::memory_order_relaxed)
258+
&& queue.size_approx() == 0))
259+
{
212260
bool dequeued_something = false;
213261
for (size_t i = 0; i < batch_size; ++i) {
214262
if (queue.try_dequeue(item)) {
215-
if (item != random_data[(received_count++) % RandomDataSize]) verification_failed.store(true);
263+
if (item != random_data[(received_count++) % RandomDataSize]) {
264+
verification_failed.store(true);
265+
}
216266
dequeued_something = true;
217267
} else {
218268
break;
219269
}
220270
}
221-
if (!dequeued_something) std::this_thread::yield();
271+
if (!dequeued_something) {
272+
std::this_thread::yield();
273+
}
222274
}
223275
});
224276

225277
size_t total_written = 0;
226278
for (auto _ : state) {
227279
for (size_t n = 0; n < ItemsPerIteration; n += batch_size) {
228280
if (verification_failed.load(std::memory_order_relaxed)) {
229-
state.SkipWithError("Verification failed!"); consumer_should_stop.store(true); return;
281+
state.SkipWithError("Verification failed!");
282+
consumer_should_stop.store(true);
283+
return;
230284
}
231285
for (size_t i = 0; i < batch_size; ++i) {
232286
const auto& item_to_write = random_data[(total_written + i) % RandomDataSize];

0 commit comments

Comments
 (0)