Skip to content

Commit

Permalink
Improving wdt throttler api for stand-alone use
Browse files Browse the repository at this point in the history
Summary:
WDT throttler takes a thread-ctx object in the limit method. Creating another
limit api without the thread-ctx object.
Also, in stargate ensuring that we register the transfer with throttler before
calling fallbackDownload

Reviewed By: ldemailly

Differential Revision: D3483351

fbshipit-source-id: 958c533b1b3b597aa192e267e37bb9725bec1c47
  • Loading branch information
uddipta authored and Facebook Github Bot committed Jun 24, 2016
1 parent beb9c91 commit a4c5586
Show file tree
Hide file tree
Showing 8 changed files with 70 additions and 19 deletions.
4 changes: 2 additions & 2 deletions Receiver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ void Receiver::startNewGlobalSession(const std::string &peerIp) {
// in the throttler. This is guranteed to work in either of the
// modes long running or not. We will de register from the throttler
// when the current session ends
throttler_->registerTransfer();
throttler_->startTransfer();
}
startTime_ = Clock::now();
if (options_.enable_download_resumption) {
Expand All @@ -109,7 +109,7 @@ void Receiver::endCurGlobalSession() {
}
WLOG(INFO) << "Ending the transfer " << getTransferId();
if (throttler_) {
throttler_->deRegisterTransfer();
throttler_->endTransfer();
}
checkpoints_.clear();
if (fileCreator_) {
Expand Down
4 changes: 2 additions & 2 deletions Sender.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,13 @@ void Sender::endCurTransfer() {
<< getTransferId();
setTransferStatus(FINISHED);
if (throttler_) {
throttler_->deRegisterTransfer();
throttler_->endTransfer();
}
}

void Sender::startNewTransfer() {
if (throttler_) {
throttler_->registerTransfer();
throttler_->startTransfer();
}
WLOG(INFO) << "Starting a new transfer " << getTransferId() << " to "
<< destHost_;
Expand Down
31 changes: 23 additions & 8 deletions Throttler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,18 +118,33 @@ void Throttler::setThrottlerRates(const WdtOptions& options) {
}

void Throttler::limit(ThreadCtx& threadCtx, double deltaProgress) {
// now should be before taking the lock
limitInternal(&threadCtx, deltaProgress);
}

void Throttler::limit(double deltaProgress) {
limitInternal(nullptr, deltaProgress);
}

void Throttler::limitInternal(ThreadCtx* threadCtx, double deltaProgress) {
std::chrono::time_point<Clock> now = Clock::now();
double sleepTimeSeconds = calculateSleep(deltaProgress, now);
if (throttlerLogTimeMillis_ > 0) {
printPeriodicLogs(now, deltaProgress);
}
if (sleepTimeSeconds > 0) {
/* sleep override */
PerfStatCollector statCollector(threadCtx, PerfStatReport::THROTTLER_SLEEP);
std::this_thread::sleep_for(
std::chrono::duration<double>(sleepTimeSeconds));
if (sleepTimeSeconds <= 0) {
return;
}
if (threadCtx == nullptr) {
sleep(sleepTimeSeconds);
return;
}
PerfStatCollector statCollector(*threadCtx, PerfStatReport::THROTTLER_SLEEP);
sleep(sleepTimeSeconds);
}

void Throttler::sleep(double sleepTimeSecs) const {
/* sleep override */
std::this_thread::sleep_for(std::chrono::duration<double>(sleepTimeSecs));
}

double Throttler::calculateSleep(double deltaProgress,
Expand Down Expand Up @@ -218,7 +233,7 @@ double Throttler::averageThrottler(const Clock::time_point& now) {
return -1;
}

void Throttler::registerTransfer() {
void Throttler::startTransfer() {
folly::SpinLockGuard lock(throttlerMutex_);
if (refCount_ == 0) {
startTime_ = Clock::now();
Expand All @@ -231,7 +246,7 @@ void Throttler::registerTransfer() {
refCount_++;
}

void Throttler::deRegisterTransfer() {
void Throttler::endTransfer() {
folly::SpinLockGuard lock(throttlerMutex_);
WDT_CHECK(refCount_ > 0);
refCount_--;
Expand Down
13 changes: 11 additions & 2 deletions Throttler.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@ class Throttler {
*/
virtual void limit(ThreadCtx& threadCtx, double deltaProgress);

/**
* Same as the other limit, but without reporting for sleep duration
*/
virtual void limit(double deltaProgress);

/**
* This is thread safe implementation of token bucket
* algorithm. Bucket is filled at the rate of bucketRateBytesPerSec_
Expand All @@ -95,10 +100,10 @@ class Throttler {

/// Anyone who is using the throttler should call this method to maintain
/// the refCount_ and startTime_ correctly
void registerTransfer();
void startTransfer();

/// Method to de-register the transfer and decrement the refCount_
void deRegisterTransfer();
void endTransfer();

/// Get the average rate in bytes per sec
double getAvgRateBytesPerSec();
Expand Down Expand Up @@ -131,6 +136,10 @@ class Throttler {
*/
double averageThrottler(const Clock::time_point& now);

void limitInternal(ThreadCtx* threadCtx, double deltaProgress);

void sleep(double sleepTimeSecs) const;

/**
* This method periodically prints logs.
* The period is defined by FLAGS_peak_log_time_ms
Expand Down
2 changes: 1 addition & 1 deletion Wdt.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ ErrorCode Wdt::initializeWdtInternal(const std::string &appName) {
}
appName_ = appName;
initDone_ = true;
resourceController_.getThrottler()->setThrottlerRates(options_);
resourceController_.getWdtThrottler()->setThrottlerRates(options_);
return OK;
}

Expand Down
6 changes: 3 additions & 3 deletions WdtResourceController.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ ErrorCode WdtNamespaceController::createReceiver(
return QUOTA_EXCEEDED;
}
receiver = make_shared<Receiver>(request);
receiver->setThrottler(parent_->getThrottler());
receiver->setThrottler(parent_->getWdtThrottler());
receiver->setWdtOptions(parent_->getOptions());
receiversMap_[identifier] = receiver;
++numReceivers_;
Expand Down Expand Up @@ -108,7 +108,7 @@ ErrorCode WdtNamespaceController::createSender(
return QUOTA_EXCEEDED;
}
sender = make_shared<Sender>(request);
sender->setThrottler(parent_->getThrottler());
sender->setThrottler(parent_->getWdtThrottler());
sender->setWdtOptions(parent_->getOptions());
sendersMap_[identifier] = sender;
++numSenders_;
Expand Down Expand Up @@ -663,7 +663,7 @@ void WdtResourceController::updateMaxSendersLimit(
}
}

std::shared_ptr<Throttler> WdtResourceController::getThrottler() const {
std::shared_ptr<Throttler> WdtResourceController::getWdtThrottler() const {
return throttler_;
}

Expand Down
9 changes: 8 additions & 1 deletion WdtResourceController.h
Original file line number Diff line number Diff line change
Expand Up @@ -249,8 +249,15 @@ class WdtResourceController : public WdtControllerBase {
* getter for throttler.
* setThrottlerRates to this throttler may not take effect. Instead, update
* WdtOptions accordingly.
* Applications have to register transfers with the throttler, and at the end
* de-register it. For example-
* throttler->startTransfer();
* ...
* throttler->limit(numBytes);
* ...
* throttler->endTransfer();
*/
std::shared_ptr<Throttler> getThrottler() const;
std::shared_ptr<Throttler> getWdtThrottler() const;

const WdtOptions &getOptions() const;

Expand Down
20 changes: 20 additions & 0 deletions test/WdtMiscTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,26 @@ TEST(BasicTest, MultiWdtSender) {
files::FileUtil::removeAll(srcDir);
files::FileUtil::removeAll(targetDir);
}

TEST(BasicTest, THROTTLER_WITHOUT_REPORTING) {
const double avgThrottlerRate = 1 * kMbToB;
shared_ptr<Throttler> throttler =
Throttler::makeThrottler(avgThrottlerRate, 0, 0, 0);
const int toWrite = 2 * kMbToB;
const int blockSize = 1024;
int written = 0;
throttler->startTransfer();
const auto startTime = Clock::now();
while (written < toWrite) {
throttler->limit(blockSize);
written += blockSize;
}
const auto endTime = Clock::now();
throttler->endTransfer();
int durationMs = durationMillis(endTime - startTime);
EXPECT_GT(durationMs, 1900);
EXPECT_LT(durationMs, 2200);
}
}
} // namespace end

Expand Down

0 comments on commit a4c5586

Please sign in to comment.