1 Feb 2012 16:54
[RFC PATCH 3/6] common/Throttle: throttle in FIFO order
Jim Schutt <jaschut <at> sandia.gov>
2012-02-01 15:54:25 GMT
2012-02-01 15:54:25 GMT
Under heavy write load from many clients, many reader threads will
be waiting in the policy throttler, all on a single condition variable.
When a wakeup is signalled, any of those threads may receive the
signal. This increases the variance in the message processing
latency, and in extreme cases can significantly delay a message.
This patch causes threads to exit a throttler in the same order
they entered.
Signed-off-by: Jim Schutt <jaschut <at> sandia.gov>
---
src/common/Throttle.h | 42 ++++++++++++++++++++++++++++--------------
1 files changed, 28 insertions(+), 14 deletions(-)
diff --git a/src/common/Throttle.h b/src/common/Throttle.h
index 10560bf..ca72060 100644
--- a/src/common/Throttle.h
+++ b/src/common/Throttle.h
@@ -3,23 +3,31 @@
#include "Mutex.h"
#include "Cond.h"
+#include <list>
class Throttle {
- int64_t count, max, waiting;
+ int64_t count, max;
uint64_t sseq, wseq;
Mutex lock;
- Cond cond;
+ list<Cond*> cond;
public:
- Throttle(int64_t m = 0) : count(0), max(m), waiting(0), sseq(0), wseq(0),
+ Throttle(int64_t m = 0) : count(0), max(m), sseq(0), wseq(0),
lock("Throttle::lock") {
assert(m >= 0);
}
+ ~Throttle() {
+ while (!cond.empty()) {
+ Cond *cv = cond.front();
+ delete cv;
+ cond.pop_front();
+ }
+ }
private:
void _reset_max(int64_t m) {
- if (m < max)
- cond.SignalOne();
+ if (m < max && !cond.empty())
+ cond.front()->SignalOne();
max = m;
}
bool _should_wait(int64_t c) {
@@ -28,19 +36,24 @@ private:
((c < max && count + c > max) || // normally stay under max
(c >= max && count > max)); // except for large c
}
+
bool _wait(int64_t c) {
bool waited = false;
- if (_should_wait(c)) {
- waiting += c;
+ if (_should_wait(c) || !cond.empty()) { // always wait behind other waiters.
+ Cond *cv = new Cond;
+ cond.push_back(cv);
do {
+ if (cv != cond.front())
+ cond.front()->SignalOne(); // wake up the oldest.
waited = true;
- cond.Wait(lock);
- } while (_should_wait(c));
- waiting -= c;
+ cv->Wait(lock);
+ } while (_should_wait(c) || cv != cond.front());
+ delete cv;
+ cond.pop_front();
// wake up the next guy
- if (waiting)
- cond.SignalOne();
+ if (!cond.empty())
+ cond.front()->SignalOne();
}
return waited;
}
@@ -101,7 +114,7 @@ public:
bool get_or_fail(int64_t c = 1) {
assert (c >= 0);
Mutex::Locker l(lock);
- if (_should_wait(c)) return false;
+ if (_should_wait(c) || !cond.empty()) return false;
count += c;
return true;
}
@@ -110,7 +123,8 @@ public:
assert(c >= 0);
Mutex::Locker l(lock);
if (c) {
- cond.SignalOne();
+ if (!cond.empty())
+ cond.front()->SignalOne();
count -= c;
assert(count >= 0); //if count goes negative, we failed somewhere!
}
--
--
1.7.1
--
To unsubscribe from this list: send the line "unsubscribe ceph-devel" in
the body of a message to majordomo <at> vger.kernel.org
More majordomo info at http://vger.kernel.org/majordomo-info.html
RSS Feed