Skip to content

Commit d72d675

Browse files
committed
tfbuilder: reset run counters
1 parent 4b191d6 commit d72d675

File tree

5 files changed

+41
-14
lines changed

5 files changed

+41
-14
lines changed

src/TfBuilder/TfBuilderDevice.cxx

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -285,6 +285,12 @@ void TfBuilderDevice::PreRun()
285285
// enable monitoring
286286
DataDistMonitor::enable_datadist(DataDistLogger::sRunNumber, mPartitionId);
287287

288+
// reset tf counters and start accepting tfs
289+
// NOTE: TfBuilder accepts TFs in both READY and RUNNING state.
290+
mFlpInputHandler->reset();
291+
mRpc->reset_run_counters();
292+
mRpc->startAcceptingTfs();
293+
288294
IDDLOG("Entering running state. RunNumber: {}", DataDistLogger::sRunNumberStr);
289295

290296
mTfFwdTotalDataSize = 0;
@@ -298,6 +304,12 @@ void TfBuilderDevice::PreRun()
298304
// Get here when ConditionalRun returns false
299305
void TfBuilderDevice::PostRun()
300306
{
307+
// stop accepting tfs until the state is clear in-between runs
308+
mRpc->stopAcceptingTfs();
309+
310+
// send EOS
311+
mShouldSendEos = true;
312+
301313
// Not in Running state
302314
mInRunningState = false;
303315
DDMON("tfbuilder", "running", 0);
@@ -310,6 +322,9 @@ void TfBuilderDevice::PostRun()
310322
// disable monitoring
311323
DataDistMonitor::disable_datadist();
312324

325+
// start accepting tfs for the next run
326+
mRpc->startAcceptingTfs();
327+
313328
IDDLOG("Exiting running state. RunNumber: {}", DataDistLogger::sRunNumberStr);
314329
}
315330

@@ -334,8 +349,6 @@ void TfBuilderDevice::TfForwardThread()
334349
{
335350
using hres_clock = std::chrono::high_resolution_clock;
336351
auto lRateStartTime = hres_clock::now();
337-
std::uint64_t lTfOutCnt = 0;
338-
bool lShouldSendEos = false;
339352

340353
while (mRunning) {
341354
std::optional<std::unique_ptr<SubTimeFrame>> lTfOpt = dequeue_for(eTfFwdIn, 100ms);
@@ -350,16 +363,14 @@ void TfBuilderDevice::TfForwardThread()
350363
WDDLOG_RL(1000, "Dropping a raw TimeFrame because stop of the run is requested.");
351364
}
352365

353-
if (dplEnabled() && lShouldSendEos) {
354-
lShouldSendEos = false;
366+
// send EOS if exiting the running state
367+
if (dplEnabled() && mShouldSendEos) {
355368
mTfDplAdapter->sendEosToDpl();
369+
mShouldSendEos = false;
356370
}
357371
continue;
358372
}
359373

360-
// Make sure to send EoS if in running state
361-
lShouldSendEos = true;
362-
363374
if (lTfOpt == std::nullopt) {
364375
DDMON("tfbuilder", "data_output.rate", 0);
365376
DDMON("tfbuilder", "tf_output.sent_size", mTfFwdTotalDataSize);
@@ -386,9 +397,8 @@ void TfBuilderDevice::TfForwardThread()
386397

387398
if (!mStandalone) {
388399
try {
389-
lTfOutCnt++;
390400
IDDLOG_RL(5000, "Forwarding a new TF to DPL. tf_id={} stf_size={:d} unique_equipments={:d} total={:d}",
391-
lTfId, lTf->getDataSize(), lTf->getEquipmentIdentifiers().size(), lTfOutCnt);
401+
lTfId, lTf->getDataSize(), lTf->getEquipmentIdentifiers().size(), mTfFwdTotalTfCount);
392402

393403
if (dplEnabled()) {
394404
// adapt headers to include DPL processing header on the stack
@@ -413,8 +423,9 @@ void TfBuilderDevice::TfForwardThread()
413423
}
414424

415425
// leaving the output thread, send end of the stream info
416-
if (dplEnabled() && lShouldSendEos) {
426+
if (dplEnabled() && mShouldSendEos) {
417427
mTfDplAdapter->sendEosToDpl();
428+
mShouldSendEos = false;
418429
}
419430

420431
DDDLOG("Exiting TF forwarding thread.");

src/TfBuilder/TfBuilderDevice.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,7 @@ class TfBuilderDevice : public DataDistDevice,
159159
std::atomic_bool mRunning = false; // Task initialized
160160
std::atomic_bool mInRunningState = false; // FMQ in running state
161161
std::atomic_bool mShouldExit = false;
162+
std::atomic_bool mShouldSendEos = false; // toggle in post run
162163
};
163164

164165
} /* namespace o2::DataDistribution */

src/TfBuilder/TfBuilderInput.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,13 @@ class TfBuilderInput
4646

4747
bool start(std::shared_ptr<ConsulTfBuilder> pConfig);
4848
void stop(std::shared_ptr<ConsulTfBuilder> pConfig);
49+
void reset() {
50+
mReceivedData.flush();
51+
mStfsForMerging.flush();
52+
std::unique_lock<std::mutex> lQueueLock(mStfMergerQueueLock);
53+
mStfMergeMap.clear();
54+
mMaxMergedTfId = 0;
55+
}
4956

5057
void DataHandlerThread(const std::uint32_t pFlpIndex, const std::string pStfSenderId);
5158
void StfPacingThread();

src/TfBuilder/TfBuilderRpc.cxx

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -99,9 +99,7 @@ void TfBuilderRpcImpl::stop()
9999
mStfSenderRpcClients.stop();
100100
mTfSchedulerRpcClient.stop();
101101

102-
mCurrentTfBufferSize = 0;
103-
mNumBufferedTfs = 0;
104-
mLastBuiltTfId = 0;
102+
reset_run_counters();
105103
}
106104

107105
// make sure these are sent immediately
@@ -223,7 +221,7 @@ bool TfBuilderRpcImpl::recordTfBuilt(const SubTimeFrame &pTf)
223221
}
224222
mUpdateCondition.notify_one();
225223

226-
{ // record the current TP
224+
{ // record the current TF
227225
std::unique_lock lLock(mStfDurationMapLock);
228226
mStfReqDuration.erase(lTfId);
229227
}

src/TfBuilder/TfBuilderRpc.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,16 @@ class TfBuilderRpcImpl final : public TfBuilderRpc::Service
109109

110110
::grpc::Status TerminatePartition(::grpc::ServerContext* context, const ::o2::DataDistribution::PartitionInfo* request, ::o2::DataDistribution::PartitionResponse* response) override;
111111

112+
/// reset counters on each new run
113+
void reset_run_counters() {
114+
mTfIdSizes.clear();
115+
mCurrentTfBufferSize = mBufferSize;
116+
mLastBuiltTfId = 0;
117+
mNumBufferedTfs = 0;
118+
mNumTfsInBuilding = 0;
119+
mTfBuildRequests->flush();
120+
}
121+
112122
private:
113123
std::atomic_bool mRunning = false;
114124
std::atomic_bool mTerminateRequested = false;

0 commit comments

Comments
 (0)