From 65b96ce3d7ed33f13d01c0cef02aeb2f428ce6eb Mon Sep 17 00:00:00 2001 From: Uddipta Maity Date: Tue, 18 Oct 2016 17:52:41 -0700 Subject: [PATCH] Improving fairness of the throttler Summary: WDT throttler is now used in stargate to throttle hdfs backup/download, and application specific downloads. Rocksdb backup engine throttles 5M at a time. However, WDT sender/receiver throttles 256K at a time. If a thread requests to throttle more data, it gets higher percentage of the bandwidth available. For example, Backup engine threads will get 20 time more bandwidth than WDT threads. This sometimes causes WDT transfers to fail with NO_PROGRESS. To solve this, any throttle request for more than 256K is broken down internally to bunch of 256K requests. This ensures that one single thread is not able to take too much of the bandwidth. Reviewed By: ldemailly Differential Revision: D4028660 fbshipit-source-id: ab65e1a9ac4861364a801083e56bf30efbb4e153 --- Throttler.cpp | 49 ++++++++++++++++++++++++------------ Throttler.h | 56 ++++++++++++++++++++++-------------------- test/ThrottlerTest.cpp | 48 ++++++++++++++++++++++++++++++++++++ test/WdtMiscTests.cpp | 6 ++--- 4 files changed, 113 insertions(+), 46 deletions(-) diff --git a/Throttler.cpp b/Throttler.cpp index 0a6d11e5..26cccb8d 100644 --- a/Throttler.cpp +++ b/Throttler.cpp @@ -9,7 +9,6 @@ #include #include #include -#include namespace facebook { namespace wdt { @@ -24,17 +23,13 @@ std::shared_ptr Throttler::makeThrottler(const WdtOptions& options) { double avgRateBytesPerSec = options.avg_mbytes_per_sec * kMbToB; double peakRateBytesPerSec = options.max_mbytes_per_sec * kMbToB; double bucketLimitBytes = options.throttler_bucket_limit * kMbToB; - return Throttler::makeThrottler(avgRateBytesPerSec, peakRateBytesPerSec, - bucketLimitBytes, - options.throttler_log_time_millis); -} - -std::shared_ptr Throttler::makeThrottler( - double avgRateBytesPerSec, double peakRateBytesPerSec, - double bucketLimitBytes, int64_t throttlerLogTimeMillis) { - configureOptions(avgRateBytesPerSec, peakRateBytesPerSec, bucketLimitBytes); - return std::make_shared(avgRateBytesPerSec, peakRateBytesPerSec, - bucketLimitBytes, throttlerLogTimeMillis); + int64_t singleRequestLimit = + options.buffer_size; // Throttler limit is generally called with + // buffer_size amount of data + Throttler* throttler = + new Throttler(avgRateBytesPerSec, peakRateBytesPerSec, bucketLimitBytes, + singleRequestLimit, options.throttler_log_time_millis); + return std::shared_ptr(throttler); } void Throttler::configureOptions(double& avgRateBytesPerSec, @@ -56,7 +51,8 @@ void Throttler::configureOptions(double& avgRateBytesPerSec, } Throttler::Throttler(double avgRateBytesPerSec, double peakRateBytesPerSec, - double bucketLimitBytes, int64_t throttlerLogTimeMillis) + double bucketLimitBytes, int64_t singleRequestLimit, + int64_t throttlerLogTimeMillis) : avgRateBytesPerSec_(avgRateBytesPerSec) { bucketRateBytesPerSec_ = peakRateBytesPerSec; bytesTokenBucketLimit_ = @@ -83,6 +79,8 @@ Throttler::Throttler(double avgRateBytesPerSec, double peakRateBytesPerSec, } else { WLOG(INFO) << "No peak rate specified"; } + WDT_CHECK_GT(singleRequestLimit, 0); + singleRequestLimit_ = singleRequestLimit; throttlerLogTimeMillis_ = throttlerLogTimeMillis; } @@ -112,15 +110,34 @@ void Throttler::setThrottlerRates(const WdtOptions& options) { setThrottlerRates(avgRateBytesPerSec, peakRateBytesPerSec, bucketLimitBytes); } -void Throttler::limit(ThreadCtx& threadCtx, double deltaProgress) { +void Throttler::limit(ThreadCtx& threadCtx, int64_t deltaProgress) { limitInternal(&threadCtx, deltaProgress); } -void Throttler::limit(double deltaProgress) { +void Throttler::limit(int64_t deltaProgress) { limitInternal(nullptr, deltaProgress); } -void Throttler::limitInternal(ThreadCtx* threadCtx, double deltaProgress) { +void Throttler::limitInternal(ThreadCtx* threadCtx, int64_t deltaProgress) { + const int kLogInterval = 100; + int64_t numThrottled = 0; + int64_t count = 0; + while (numThrottled < deltaProgress) { + const int64_t toThrottle = + std::min(singleRequestLimit_, deltaProgress - numThrottled); + limitSingleRequest(threadCtx, toThrottle); + numThrottled += toThrottle; + count++; + if (count % kLogInterval == 0) { + WLOG(INFO) << "Throttling large amount data, to-throttle: " + << deltaProgress << ", num-throttled: " << numThrottled; + } + } +} + +void Throttler::limitSingleRequest(ThreadCtx* threadCtx, + int64_t deltaProgress) { + WDT_CHECK_LE(deltaProgress, singleRequestLimit_); std::chrono::time_point now = Clock::now(); double sleepTimeSeconds = calculateSleep(deltaProgress, now); if (throttlerLogTimeMillis_ > 0) { diff --git a/Throttler.h b/Throttler.h index ada14a70..7145da9c 100644 --- a/Throttler.h +++ b/Throttler.h @@ -28,16 +28,8 @@ namespace wdt { */ class Throttler { public: - /** - * Utility method that configures the avg rate, peak rate and bucket limit - * based on the values passed to this method and returns a shared ptr to an - * instance of this throttler. It can return nullptr if throttling is off. - */ - static std::shared_ptr makeThrottler( - double avgRateBytesPerSec, double peakRateBytesPerSec, - double bucketLimitBytes, int64_t throttlerLogTimeMillis); - - /// Utility method that makes throttler using the wdt options + /// Utility method that makes throttler using the wdt options. It can return + /// nullptr if throttling is off static std::shared_ptr makeThrottler(const WdtOptions& options); /** @@ -48,31 +40,17 @@ class Throttler { double& peakRateBytesPerSec, double& bucketLimitBytes); - /** - * @param averageRateBytesPerSec Average rate in progress/second - * at which data should be transmitted - * - * @param peakRateBytesPerSec Max burst rate allowed by the - * token bucket - * @param bucketLimitBytes Max size of bucket, specify 0 for auto - * configure. In auto mode, it will be twice - * the data you send in 1/4th of a second - * at the peak rate - */ - Throttler(double avgRateBytesPerSec, double peakRateBytesPerSec, - double bucketLimitBytes, int64_t throttlerLogTimeMillis = 0); - /** * Calls calculateSleep which is a thread safe method. Finds out the * time thread has to sleep and makes it sleep. * Also calls the throttler logger to log the stats */ - virtual void limit(ThreadCtx& threadCtx, double deltaProgress); + virtual void limit(ThreadCtx& threadCtx, int64_t deltaProgress); /** * Same as the other limit, but without reporting for sleep duration */ - virtual void limit(double deltaProgress); + virtual void limit(int64_t deltaProgress); /** * This is thread safe implementation of token bucket @@ -127,6 +105,26 @@ class Throttler { const Throttler& throttler); private: + /** + * @param averageRateBytesPerSec Average rate in progress/second + * at which data should be transmitted + * @param peakRateBytesPerSec Max burst rate allowed by the + * token bucket + * @param bucketLimitBytes Max size of bucket, specify 0 for auto + * configure. In auto mode, it will be twice + * the data you send in 1/4th of a second + * at the peak rate + * @param singleRequestLimit Internal limit to the maximum number of + * bytes that can be throttled in one call. + * If more bytes are requested to be + * throttled, that requested gets broken down + * and it is treated as multiple throttle + * calls. + */ + Throttler(double avgRateBytesPerSec, double peakRateBytesPerSec, + double bucketLimitBytes, int64_t singleRequestLimit, + int64_t throttlerLogTimeMillis = 0); + /** * This method is invoked repeatedly with the amount of progress made * (e.g. number of bytes written) till now. If the total progress @@ -136,7 +134,9 @@ class Throttler { */ double averageThrottler(const Clock::time_point& now); - void limitInternal(ThreadCtx* threadCtx, double deltaProgress); + void limitInternal(ThreadCtx* threadCtx, int64_t deltaProgress); + + void limitSingleRequest(ThreadCtx* threadCtx, int64_t deltaProgress); void sleep(double sleepTimeSecs) const; @@ -179,6 +179,8 @@ class Throttler { double bytesTokenBucketLimit_; /// Rate at which bucket is filled double bucketRateBytesPerSec_; + /// Max number of bytes that can be requested in a single call + int64_t singleRequestLimit_; /// Interval between every print of throttler logs int64_t throttlerLogTimeMillis_; }; diff --git a/test/ThrottlerTest.cpp b/test/ThrottlerTest.cpp index 6bed3406..8928dcc2 100644 --- a/test/ThrottlerTest.cpp +++ b/test/ThrottlerTest.cpp @@ -41,5 +41,53 @@ TEST(ThrottlerTest, RATE_CHANGE) { throttler->endTransfer(); } + +TEST(ThrottlerTest, FAIRNESS) { + WdtOptions options; + options.avg_mbytes_per_sec = 60; + options.buffer_size = 256 * 1024; + std::shared_ptr throttler = Throttler::makeThrottler(options); + + const int numThread = 40; + const int testDurationSec = 5; + double throughputs[numThread]; + + auto transfer = [&](int index, int buffer) { + throttler->startTransfer(); + int64_t numTransferred = 0; + auto startTime = Clock::now(); + while (durationSeconds(Clock::now() - startTime) <= testDurationSec) { + throttler->limit(buffer); + numTransferred += buffer; + } + throttler->endTransfer(); + throughputs[index] = numTransferred / testDurationSec / kMbToB; + }; + + auto getBuffer = [&](int index) -> int { + return options.buffer_size * (index + 1); + }; + + std::vector threads; + for (int i = 0; i < numThread; i++) { + std::thread t(transfer, i, getBuffer(i)); + threads.emplace_back(std::move(t)); + } + for (auto &t : threads) { + t.join(); + } + for (int i = 0; i < numThread; i++) { + LOG(INFO) << "Buffer: " << getBuffer(i) / kMbToB + << "MB, Throughput: " << throughputs[i]; + } + const double minThroughput = + *std::min_element(throughputs, throughputs + numThread); + const double maxThroughput = + *std::max_element(throughputs, throughputs + numThread); + const double fairness = maxThroughput / minThroughput; + LOG(INFO) << "Min throughput " << minThroughput << ", max throughput " + << maxThroughput << ", fairness " << fairness; + EXPECT_LT(fairness, 2.2); +} } } diff --git a/test/WdtMiscTests.cpp b/test/WdtMiscTests.cpp index 4761a2d0..f3ca6c21 100644 --- a/test/WdtMiscTests.cpp +++ b/test/WdtMiscTests.cpp @@ -89,9 +89,9 @@ TEST(BasicTest, MultiWdtSender) { } TEST(BasicTest, ThrottlerWithoutReporting) { - const double avgThrottlerRate = 1 * kMbToB; - shared_ptr throttler = - Throttler::makeThrottler(avgThrottlerRate, 0, 0, 0); + WdtOptions options; + options.avg_mbytes_per_sec = 1; + shared_ptr throttler = Throttler::makeThrottler(options); const int toWrite = 2 * kMbToB; const int blockSize = 1024; int written = 0;