Skip to content

Commit

Permalink
Improving fairness of the throttler
Browse files Browse the repository at this point in the history
Summary:
WDT throttler is now used in stargate to throttle hdfs backup/download, and
application specific downloads. Rocksdb backup engine throttles 5M at a time.
However, WDT sender/receiver throttles 256K at a time. If a thread requests to
throttle more data, it gets higher percentage of the bandwidth available. For
example, Backup engine threads will get 20 time more bandwidth than WDT threads.
This sometimes causes WDT transfers to fail with NO_PROGRESS.
To solve this, any throttle request for more than 256K is broken down internally
to bunch of 256K requests. This ensures that one single thread is not able to
take too much of the bandwidth.

Reviewed By: ldemailly

Differential Revision: D4028660

fbshipit-source-id: ab65e1a9ac4861364a801083e56bf30efbb4e153
  • Loading branch information
uddipta authored and Facebook Github Bot committed Oct 19, 2016
1 parent 0e0a5cd commit 65b96ce
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 46 deletions.
49 changes: 33 additions & 16 deletions Throttler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
#include <wdt/Throttler.h>
#include <wdt/ErrorCodes.h>
#include <wdt/WdtOptions.h>
#include <cmath>

namespace facebook {
namespace wdt {
Expand All @@ -24,17 +23,13 @@ std::shared_ptr<Throttler> Throttler::makeThrottler(const WdtOptions& options) {
double avgRateBytesPerSec = options.avg_mbytes_per_sec * kMbToB;
double peakRateBytesPerSec = options.max_mbytes_per_sec * kMbToB;
double bucketLimitBytes = options.throttler_bucket_limit * kMbToB;
return Throttler::makeThrottler(avgRateBytesPerSec, peakRateBytesPerSec,
bucketLimitBytes,
options.throttler_log_time_millis);
}

std::shared_ptr<Throttler> Throttler::makeThrottler(
double avgRateBytesPerSec, double peakRateBytesPerSec,
double bucketLimitBytes, int64_t throttlerLogTimeMillis) {
configureOptions(avgRateBytesPerSec, peakRateBytesPerSec, bucketLimitBytes);
return std::make_shared<Throttler>(avgRateBytesPerSec, peakRateBytesPerSec,
bucketLimitBytes, throttlerLogTimeMillis);
int64_t singleRequestLimit =
options.buffer_size; // Throttler limit is generally called with
// buffer_size amount of data
Throttler* throttler =
new Throttler(avgRateBytesPerSec, peakRateBytesPerSec, bucketLimitBytes,
singleRequestLimit, options.throttler_log_time_millis);
return std::shared_ptr<Throttler>(throttler);
}

void Throttler::configureOptions(double& avgRateBytesPerSec,
Expand All @@ -56,7 +51,8 @@ void Throttler::configureOptions(double& avgRateBytesPerSec,
}

Throttler::Throttler(double avgRateBytesPerSec, double peakRateBytesPerSec,
double bucketLimitBytes, int64_t throttlerLogTimeMillis)
double bucketLimitBytes, int64_t singleRequestLimit,
int64_t throttlerLogTimeMillis)
: avgRateBytesPerSec_(avgRateBytesPerSec) {
bucketRateBytesPerSec_ = peakRateBytesPerSec;
bytesTokenBucketLimit_ =
Expand All @@ -83,6 +79,8 @@ Throttler::Throttler(double avgRateBytesPerSec, double peakRateBytesPerSec,
} else {
WLOG(INFO) << "No peak rate specified";
}
WDT_CHECK_GT(singleRequestLimit, 0);
singleRequestLimit_ = singleRequestLimit;
throttlerLogTimeMillis_ = throttlerLogTimeMillis;
}

Expand Down Expand Up @@ -112,15 +110,34 @@ void Throttler::setThrottlerRates(const WdtOptions& options) {
setThrottlerRates(avgRateBytesPerSec, peakRateBytesPerSec, bucketLimitBytes);
}

void Throttler::limit(ThreadCtx& threadCtx, double deltaProgress) {
void Throttler::limit(ThreadCtx& threadCtx, int64_t deltaProgress) {
limitInternal(&threadCtx, deltaProgress);
}

void Throttler::limit(double deltaProgress) {
void Throttler::limit(int64_t deltaProgress) {
limitInternal(nullptr, deltaProgress);
}

void Throttler::limitInternal(ThreadCtx* threadCtx, double deltaProgress) {
void Throttler::limitInternal(ThreadCtx* threadCtx, int64_t deltaProgress) {
const int kLogInterval = 100;
int64_t numThrottled = 0;
int64_t count = 0;
while (numThrottled < deltaProgress) {
const int64_t toThrottle =
std::min(singleRequestLimit_, deltaProgress - numThrottled);
limitSingleRequest(threadCtx, toThrottle);
numThrottled += toThrottle;
count++;
if (count % kLogInterval == 0) {
WLOG(INFO) << "Throttling large amount data, to-throttle: "
<< deltaProgress << ", num-throttled: " << numThrottled;
}
}
}

void Throttler::limitSingleRequest(ThreadCtx* threadCtx,
int64_t deltaProgress) {
WDT_CHECK_LE(deltaProgress, singleRequestLimit_);
std::chrono::time_point<Clock> now = Clock::now();
double sleepTimeSeconds = calculateSleep(deltaProgress, now);
if (throttlerLogTimeMillis_ > 0) {
Expand Down
56 changes: 29 additions & 27 deletions Throttler.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,8 @@ namespace wdt {
*/
class Throttler {
public:
/**
* Utility method that configures the avg rate, peak rate and bucket limit
* based on the values passed to this method and returns a shared ptr to an
* instance of this throttler. It can return nullptr if throttling is off.
*/
static std::shared_ptr<Throttler> makeThrottler(
double avgRateBytesPerSec, double peakRateBytesPerSec,
double bucketLimitBytes, int64_t throttlerLogTimeMillis);

/// Utility method that makes throttler using the wdt options
/// Utility method that makes throttler using the wdt options. It can return
/// nullptr if throttling is off
static std::shared_ptr<Throttler> makeThrottler(const WdtOptions& options);

/**
Expand All @@ -48,31 +40,17 @@ class Throttler {
double& peakRateBytesPerSec,
double& bucketLimitBytes);

/**
* @param averageRateBytesPerSec Average rate in progress/second
* at which data should be transmitted
*
* @param peakRateBytesPerSec Max burst rate allowed by the
* token bucket
* @param bucketLimitBytes Max size of bucket, specify 0 for auto
* configure. In auto mode, it will be twice
* the data you send in 1/4th of a second
* at the peak rate
*/
Throttler(double avgRateBytesPerSec, double peakRateBytesPerSec,
double bucketLimitBytes, int64_t throttlerLogTimeMillis = 0);

/**
* Calls calculateSleep which is a thread safe method. Finds out the
* time thread has to sleep and makes it sleep.
* Also calls the throttler logger to log the stats
*/
virtual void limit(ThreadCtx& threadCtx, double deltaProgress);
virtual void limit(ThreadCtx& threadCtx, int64_t deltaProgress);

/**
* Same as the other limit, but without reporting for sleep duration
*/
virtual void limit(double deltaProgress);
virtual void limit(int64_t deltaProgress);

/**
* This is thread safe implementation of token bucket
Expand Down Expand Up @@ -127,6 +105,26 @@ class Throttler {
const Throttler& throttler);

private:
/**
* @param averageRateBytesPerSec Average rate in progress/second
* at which data should be transmitted
* @param peakRateBytesPerSec Max burst rate allowed by the
* token bucket
* @param bucketLimitBytes Max size of bucket, specify 0 for auto
* configure. In auto mode, it will be twice
* the data you send in 1/4th of a second
* at the peak rate
* @param singleRequestLimit Internal limit to the maximum number of
* bytes that can be throttled in one call.
* If more bytes are requested to be
* throttled, that requested gets broken down
* and it is treated as multiple throttle
* calls.
*/
Throttler(double avgRateBytesPerSec, double peakRateBytesPerSec,
double bucketLimitBytes, int64_t singleRequestLimit,
int64_t throttlerLogTimeMillis = 0);

/**
* This method is invoked repeatedly with the amount of progress made
* (e.g. number of bytes written) till now. If the total progress
Expand All @@ -136,7 +134,9 @@ class Throttler {
*/
double averageThrottler(const Clock::time_point& now);

void limitInternal(ThreadCtx* threadCtx, double deltaProgress);
void limitInternal(ThreadCtx* threadCtx, int64_t deltaProgress);

void limitSingleRequest(ThreadCtx* threadCtx, int64_t deltaProgress);

void sleep(double sleepTimeSecs) const;

Expand Down Expand Up @@ -179,6 +179,8 @@ class Throttler {
double bytesTokenBucketLimit_;
/// Rate at which bucket is filled
double bucketRateBytesPerSec_;
/// Max number of bytes that can be requested in a single call
int64_t singleRequestLimit_;
/// Interval between every print of throttler logs
int64_t throttlerLogTimeMillis_;
};
Expand Down
48 changes: 48 additions & 0 deletions test/ThrottlerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,53 @@ TEST(ThrottlerTest, RATE_CHANGE) {

throttler->endTransfer();
}

TEST(ThrottlerTest, FAIRNESS) {
WdtOptions options;
options.avg_mbytes_per_sec = 60;
options.buffer_size = 256 * 1024;
std::shared_ptr<Throttler> throttler = Throttler::makeThrottler(options);

const int numThread = 40;
const int testDurationSec = 5;
double throughputs[numThread];

auto transfer = [&](int index, int buffer) {
throttler->startTransfer();
int64_t numTransferred = 0;
auto startTime = Clock::now();
while (durationSeconds(Clock::now() - startTime) <= testDurationSec) {
throttler->limit(buffer);
numTransferred += buffer;
}
throttler->endTransfer();
throughputs[index] = numTransferred / testDurationSec / kMbToB;
};

auto getBuffer = [&](int index) -> int {
return options.buffer_size * (index + 1);
};

std::vector<std::thread> threads;
for (int i = 0; i < numThread; i++) {
std::thread t(transfer, i, getBuffer(i));
threads.emplace_back(std::move(t));
}
for (auto &t : threads) {
t.join();
}
for (int i = 0; i < numThread; i++) {
LOG(INFO) << "Buffer: " << getBuffer(i) / kMbToB
<< "MB, Throughput: " << throughputs[i];
}
const double minThroughput =
*std::min_element(throughputs, throughputs + numThread);
const double maxThroughput =
*std::max_element(throughputs, throughputs + numThread);
const double fairness = maxThroughput / minThroughput;
LOG(INFO) << "Min throughput " << minThroughput << ", max throughput "
<< maxThroughput << ", fairness " << fairness;
EXPECT_LT(fairness, 2.2);
}
}
}
6 changes: 3 additions & 3 deletions test/WdtMiscTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,9 @@ TEST(BasicTest, MultiWdtSender) {
}

TEST(BasicTest, ThrottlerWithoutReporting) {
const double avgThrottlerRate = 1 * kMbToB;
shared_ptr<Throttler> throttler =
Throttler::makeThrottler(avgThrottlerRate, 0, 0, 0);
WdtOptions options;
options.avg_mbytes_per_sec = 1;
shared_ptr<Throttler> throttler = Throttler::makeThrottler(options);
const int toWrite = 2 * kMbToB;
const int blockSize = 1024;
int written = 0;
Expand Down

0 comments on commit 65b96ce

Please sign in to comment.