Skip to content

Commit f906546

Browse files
committed
DPL: Use X9 to make AsyncQueue atomic
1 parent c5855d8 commit f906546

3 files changed

Lines changed: 44 additions & 1 deletion

File tree

Framework/Core/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,7 @@ o2_add_library(Framework
163163
O2::Headers
164164
O2::MemoryResources
165165
O2::PCG
166+
O2::X9
166167
RapidJSON::RapidJSON
167168
Arrow::arrow_shared
168169
Microsoft.GSL::GSL

Framework/Core/include/Framework/AsyncQueue.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@
1515
#include <string>
1616
#include <vector>
1717

18+
typedef struct x9_inbox_internal x9_inbox;
19+
typedef struct x9_node_internal x9_node;
20+
1821
namespace o2::framework
1922
{
2023

@@ -89,6 +92,12 @@ struct AsyncQueue {
8992
std::vector<AsyncTaskSpec> prototypes;
9093
std::vector<AsyncTask> tasks;
9194
size_t iteration = 0;
95+
96+
std::atomic<bool> first = true;
97+
98+
// Inbox for the message queue used to append
99+
// tasks to this queue.
100+
x9_inbox* inbox = nullptr;
92101
AsyncQueue();
93102
};
94103

@@ -104,6 +113,8 @@ struct AsyncQueueHelpers {
104113
/// 3. only execute the highest (timeslice, debounce) value
105114
static void run(AsyncQueue& queue, TimesliceId oldestPossibleTimeslice);
106115

116+
// Flush tasks which were posted but not yet committed to the queue
117+
static void flushPending(AsyncQueue& queue);
107118
/// Reset the queue to its initial state
108119
static void reset(AsyncQueue& queue);
109120
};

Framework/Core/src/AsyncQueue.cxx

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,17 @@
1111

1212
#include "Framework/AsyncQueue.h"
1313
#include "Framework/Signpost.h"
14+
#include "x9.h"
1415
#include <numeric>
1516

1617
O2_DECLARE_DYNAMIC_LOG(async_queue);
1718

1819
namespace o2::framework
1920
{
2021
AsyncQueue::AsyncQueue()
22+
: inbox(x9_create_inbox(16, "async_queue", sizeof(AsyncTask)))
2123
{
24+
this->inbox = x9_create_inbox(16, "async_queue", sizeof(AsyncTask));
2225
}
2326

2427
auto AsyncQueueHelpers::create(AsyncQueue& queue, AsyncTaskSpec spec) -> AsyncTaskId
@@ -31,11 +34,39 @@ auto AsyncQueueHelpers::create(AsyncQueue& queue, AsyncTaskSpec spec) -> AsyncTa
3134

3235
auto AsyncQueueHelpers::post(AsyncQueue& queue, AsyncTask const& task) -> void
3336
{
34-
queue.tasks.push_back(task);
37+
// Until we do not manage to write to the inbox, keep removing
38+
// items from the queue if you are the first one which fails to
39+
// write.
40+
while (!x9_write_to_inbox(queue.inbox, sizeof(AsyncTask), &task)) {
41+
AsyncQueueHelpers::flushPending(queue);
42+
}
43+
}
44+
45+
auto AsyncQueueHelpers::flushPending(AsyncQueue& queue) -> void
46+
{
47+
bool isFirst = true;
48+
if (!std::atomic_compare_exchange_strong(&queue.first, &isFirst, false)) {
49+
// Not the first, try again.
50+
return;
51+
}
52+
// First thread which does not manage to write to the queue.
53+
// Flush it a bit before we try again.
54+
AsyncTask toFlush;
55+
// This potentially stalls if the inserting tasks are faster to insert
56+
// than we are to retrieve. We should probably have a cut-off
57+
while (x9_read_from_inbox(queue.inbox, sizeof(AsyncTask), &toFlush)) {
58+
queue.tasks.push_back(toFlush);
59+
}
60+
queue.first = true;
3561
}
3662

3763
auto AsyncQueueHelpers::run(AsyncQueue& queue, TimesliceId oldestPossible) -> void
3864
{
65+
// We synchronize right before we run to get as many
66+
// tasks as possible. Notice we might still miss some
67+
// which will have to handled on a subsequent iteration.
68+
AsyncQueueHelpers::flushPending(queue);
69+
3970
if (queue.tasks.empty()) {
4071
return;
4172
}

0 commit comments

Comments
 (0)