Skip to content

Commit 98bdcda

Browse files
author
kulikov
committed
Allow to change (reduce) max queue size for common elastic queue
Add same method as in TFastElasticQueue, and test. commit_hash:0a2b618325e57c32fd269254a7dbe912849c3f10
1 parent 46769f2 commit 98bdcda

2 files changed

Lines changed: 9 additions & 1 deletion

File tree

library/cpp/threading/equeue/equeue.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ size_t TElasticQueue::ObjectCount() const {
1010
}
1111

1212
bool TElasticQueue::TryIncCounter() {
13-
if (++GuardCount_ > MaxQueueSize_) {
13+
if (++GuardCount_ > CurrentMaxQueueSize_) {
1414
--GuardCount_;
1515
return false;
1616
}
@@ -68,6 +68,7 @@ bool TElasticQueue::Add(IObjectInQueue* obj) {
6868

6969
void TElasticQueue::Start(size_t threadCount, size_t maxQueueSize) {
7070
MaxQueueSize_ = maxQueueSize;
71+
CurrentMaxQueueSize_ = maxQueueSize;
7172
SlaveQueue_->Start(threadCount, maxQueueSize);
7273
}
7374

library/cpp/threading/equeue/equeue.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,20 @@ class TElasticQueue: public IThreadPool {
1717
void Stop() noexcept override;
1818

1919
size_t ObjectCount() const;
20+
21+
void SetCurrentMaxQueueSize(size_t v) {
22+
Y_ENSURE(v <= MaxQueueSize_);
23+
CurrentMaxQueueSize_ = v;
24+
}
2025
private:
2126
class TDecrementingWrapper;
2227

2328
bool TryIncCounter();
2429
private:
2530
THolder<IThreadPool> SlaveQueue_;
31+
2632
size_t MaxQueueSize_ = 0;
33+
std::atomic<size_t> CurrentMaxQueueSize_ = 0;
2734
std::atomic<size_t> ObjectCount_ = 0;
2835
std::atomic<size_t> GuardCount_ = 0;
2936
};

0 commit comments

Comments
 (0)