Skip to content

Commit

Permalink
download_resume sender/receiver handshake
Browse files Browse the repository at this point in the history
Summary:
the diff ensures that wdt receiver imposes its download_resume
(if = true) on the sender.  it also makes sure that download_resume with
delete_extra_files=true, the set of files are kept identical on the sender
and receiver.

Reviewed By: ldemailly

Differential Revision: D4251809

fbshipit-source-id: 725c3ef02f2a10339223aca9958703ad49ae3e3c
  • Loading branch information
nedelchev authored and Facebook Github Bot committed Dec 3, 2016
1 parent c4c8fc6 commit 143fcf5
Show file tree
Hide file tree
Showing 9 changed files with 175 additions and 9 deletions.
4 changes: 4 additions & 0 deletions Receiver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,10 @@ const WdtTransferRequest &Receiver::init() {
// This creates the destination directory (which is needed for transferLogMgr)
fileCreator_.reset(new FileCreator(
getDirectory(), numThreads, *transferLogManager_, options_.skip_writes));

transferRequest_.downloadResumptionEnabled =
options_.enable_download_resumption;

// Make sure we can get the lock on the transfer log manager early
// so if we can't we don't generate a valid but useless url and end up
// starting a sender doomed to fail
Expand Down
14 changes: 11 additions & 3 deletions ReceiverThread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -457,10 +457,18 @@ ReceiverState ReceiverThread::processFileCmd() {
threadStats_.addEffectiveBytes(headerBytes, writer.getTotalWritten());
}
});
if (writer.open() != OK) {
threadStats_.setLocalErrorCode(FILE_WRITE_ERROR);
return SEND_ABORT_CMD;

// writer.open() deletes files if status == TO_BE_DELETED
// therefore if !(!delete_extra_files && status == TO_BE_DELETED)
// we should skip writer.open() call altogether
if (options_.delete_extra_files ||
blockDetails.allocationStatus != TO_BE_DELETED) {
if (writer.open() != OK) {
threadStats_.setLocalErrorCode(FILE_WRITE_ERROR);
return SEND_ABORT_CMD;
}
}

int32_t checksum = 0;
int64_t remainingData = numRead_ + oldOffset_ - off_;
int64_t toWrite = remainingData;
Expand Down
7 changes: 5 additions & 2 deletions Sender.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,10 @@ ErrorCode Sender::start() {
WLOG(INFO) << "Client (sending) to " << getDestination() << ", Using ports [ "
<< transferRequest_.ports << "]";
startTime_ = Clock::now();
downloadResumptionEnabled_ = options_.enable_download_resumption;
downloadResumptionEnabled_ = (transferRequest_.downloadResumptionEnabled ||
options_.enable_download_resumption);
bool deleteExtraFiles = (transferRequest_.downloadResumptionEnabled ||
options_.delete_extra_files);
if (!progressReporter_) {
WVLOG(1) << "No progress reporter provided, making a default one";
progressReporter_ = std::make_unique<ProgressReporter>(transferRequest_);
Expand All @@ -331,7 +334,7 @@ ErrorCode Sender::start() {
// TODO: fix this ! use transferRequest! (and dup from Receiver)
senderThreads_ = threadsController_->makeThreads<Sender, SenderThread>(
this, transferRequest_.ports.size(), transferRequest_.ports);
if (downloadResumptionEnabled_ && options_.delete_extra_files) {
if (downloadResumptionEnabled_ && deleteExtraFiles) {
if (getProtocolVersion() >= Protocol::DELETE_CMD_VERSION) {
dirQueue_->enableFileDeletion();
} else {
Expand Down
16 changes: 16 additions & 0 deletions WdtTransferRequest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,7 @@ const string WdtTransferRequest::NUM_PORTS_PARAM{"num_ports"};
const string WdtTransferRequest::ENCRYPTION_PARAM{"enc"};
const string WdtTransferRequest::NAMESPACE_PARAM{"ns"};
const string WdtTransferRequest::DEST_IDENTIFIER_PARAM{"dstid"};
const string WdtTransferRequest::DOWNLOAD_RESUMPTION_PARAM{"dr"};

WdtTransferRequest::WdtTransferRequest(int startPort, int numPorts,
const string& directory) {
Expand Down Expand Up @@ -312,6 +313,17 @@ WdtTransferRequest::WdtTransferRequest(const string& uriString) {
errorCode = getMoreInterestingError(code, errorCode);
}
}
string downloadResume = wdtUri.getQueryParam(DOWNLOAD_RESUMPTION_PARAM);
try {
if (!downloadResume.empty()) {
downloadResumptionEnabled = folly::to<bool>(downloadResume);
}
} catch (std::exception& e) {
WLOG(ERROR) << "Error parsing download resume " << downloadResume << " "
<< e.what();
errorCode = URI_PARSE_ERROR;
}

const string recpv = wdtUri.getQueryParam(RECEIVER_PROTOCOL_VERSION_PARAM);
if (recpv.empty()) {
WLOG(WARNING) << RECEIVER_PROTOCOL_VERSION_PARAM << " not specified in URI";
Expand Down Expand Up @@ -402,6 +414,10 @@ string WdtTransferRequest::generateUrlInternal(bool genFull,
wdtUri.setQueryParam(DEST_IDENTIFIER_PARAM, destIdentifier);
wdtUri.setQueryParam(RECEIVER_PROTOCOL_VERSION_PARAM,
folly::to<string>(protocolVersion));
if (downloadResumptionEnabled) {
wdtUri.setQueryParam(DOWNLOAD_RESUMPTION_PARAM,
folly::to<string>(downloadResumptionEnabled));
}
serializePorts(wdtUri);
if (genFull) {
wdtUri.setQueryParam(DIRECTORY_PARAM, directory);
Expand Down
5 changes: 5 additions & 0 deletions WdtTransferRequest.h
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,10 @@ struct WdtTransferRequest {
/// Use fileInfo even if empty (don't use the directory exploring)
bool disableDirectoryTraversal{false};

// download resumption is enabled on the receiver side
// and is requested from the sender
bool downloadResumptionEnabled{false};

/// Any error associated with this transfer request upon processing
ErrorCode errorCode{OK};

Expand Down Expand Up @@ -230,6 +234,7 @@ struct WdtTransferRequest {
const static std::string ENCRYPTION_PARAM;
const static std::string NAMESPACE_PARAM;
const static std::string DEST_IDENTIFIER_PARAM;
const static std::string DOWNLOAD_RESUMPTION_PARAM;

/// Get ports vector from startPort and numPorts
static std::vector<int32_t> genPortsVector(int32_t startPort,
Expand Down
30 changes: 30 additions & 0 deletions test/WdtUrlTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,36 @@ TEST(RequestSerializationTest, UrlTests) {
auto retTransferRequest = receiver.init();
EXPECT_EQ(retTransferRequest.errorCode, ERROR);
}
{
string uri = "wdt://localhost?dr=1";
WdtTransferRequest transferRequest(uri);
ASSERT_TRUE(transferRequest.downloadResumptionEnabled);
}
{
string uri = "wdt://localhost?dr=0";
WdtTransferRequest transferRequest(uri);
ASSERT_TRUE(!transferRequest.downloadResumptionEnabled);
}
{
string uri = "wdt://localhost?jimmy=1";
WdtTransferRequest transferRequest(uri);
ASSERT_TRUE(!transferRequest.downloadResumptionEnabled);
}
{
for (int i = 0; i < 2; i++) {
WdtTransferRequest transferRequest1(55000, 1, "");
transferRequest1.hostName = "localhost";
transferRequest1.downloadResumptionEnabled = (bool)i;
WLOG(INFO) << "Download Resumption="
<< transferRequest1.downloadResumptionEnabled;
string serializedString = transferRequest1.genWdtUrlWithSecret();
WLOG(INFO) << "serialized string=" << serializedString;
WdtTransferRequest transferRequest(serializedString);
Receiver receiver(transferRequest);
transferRequest = receiver.init();
EXPECT_EQ(transferRequest.downloadResumptionEnabled, (bool)i);
}
}
}

TEST(RequestSerializationTest, TransferIdGenerationTest) {
Expand Down
14 changes: 10 additions & 4 deletions test/common_utils.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,3 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
import os
import hashlib
import difflib
Expand Down Expand Up @@ -252,6 +248,14 @@ def get_source_dir():
return root_dir + "/src/"


def get_dest_dir():
return os.path.join(root_dir, "dst{0}".format(test_count))


def get_test_count():
return test_count


def generate_random_files(total_size):
src_dir = get_source_dir()
print(
Expand Down Expand Up @@ -288,6 +292,8 @@ def create_md5_for_directory(src_dir, md5_file_name):
lines = []
for root, dirs, files in os.walk(src_dir):
for file in files:
if file == ".wdt.log":
continue
full_path = os.path.join(root, file)
md5 = get_md5_for_file(full_path)
lines.append("{0} {1}".format(md5, file))
Expand Down
33 changes: 33 additions & 0 deletions test/wdt_dl_resume_test1.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
#! /usr/bin/env python

# the test is similar to wdt_dl_resume_test1.py
# we initiate a transfer and cancel it 2/3+ in flight
# we make sure the second transfer finishes the job
# but takes less than 50% of the first one
# 1/3 ~= 50% * 2/3
# in the end the test ensures that receiver imposes
# -enable_download_resumption on the sender

from time import time
from common_utils import *

root_dir = create_test_directory("/tmp")
generate_random_files(100 * 1024 * 1024)

start_test("receiver -enable_download_resumption imposes it on sender")
test_count = get_test_count()
start_time = time.time()
start_receiver("-num_ports=1 -avg_mbytes_per_sec=10 -enable_download_resumption -abort_after_seconds=7 -delete_extra_files=true")
run_sender("-avg_mbytes_per_sec=10 -block_size_mbytes=1")
check_transfer_status(True, True)
dur1 = time.time() - start_time
start_time = time.time()
start_receiver("-num_ports=1 -avg_mbytes_per_sec=10 -enable_download_resumption -delete_extra_files=true")
run_sender("-avg_mbytes_per_sec=10 -block_size_mbytes=1")
check_transfer_status()
dur2 = time.time() - start_time
# 1.9 = 2 with 5% tolerance
if dur1 < 1.9 * dur2:
print("Aborted previously, this run should take < 50% of prev cycle")
exit(1)
exit(verify_transfer_success())
61 changes: 61 additions & 0 deletions test/wdt_dl_resume_test2.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
#! /usr/bin/env python

# the test ensures that when the receiver is with enable_download_resumption
# this will also impose it on the sender as well as set of files is kept
# the same as delete_extra_files is true on the receiver.
# there was a bug where we did not delete these files when
# sender/receiver mismatch on enable_download_resumption

from time import time
from common_utils import *
import os


def delete_one_dst_file():
dst_dir = get_dest_dir()
file_name = os.path.join(dst_dir, "file0")
os.remove(file_name)


def rename_one_src_file():
src_dir = get_source_dir()
new_file_name = os.path.join(src_dir, "file55")
src_file_name = os.path.join(src_dir, "file0")
os.rename(src_file_name, new_file_name)


root_dir = create_test_directory("/tmp")
generate_random_files(100 * 1024 * 1024)

start_test("receiver -enable_download_resumption imposes it on sender")
# first we measure how long it takes for a full transfer
test_count = get_test_count()
start_time = time.time()
start_receiver("-num_ports=1 -avg_mbytes_per_sec=10 -enable_download_resumption -delete_extra_files=true")
run_sender("-avg_mbytes_per_sec=10 -block_size_mbytes=1")
check_transfer_status()
dur1 = time.time() - start_time
# we delete one file from the dst directory and make sure
# the transfer takes a small portion of the previous cycle runtime
delete_one_dst_file()
start_time = time.time()
start_receiver("-num_ports=1 -avg_mbytes_per_sec=10 -enable_download_resumption -delete_extra_files=true")
run_sender("-avg_mbytes_per_sec=10 -block_size_mbytes=1")
check_transfer_status()
dur2 = time.time() - start_time
if dur1 < 12 * dur2:
print("Single deletion of a file < 8% wall clock of overall cycle")
exit(1)
# next we rename one of the src file and check if
# the transfer takes less than 8% of the prev cycle
rename_one_src_file()
start_time = time.time()
start_receiver("-num_ports=1 -avg_mbytes_per_sec=10 -enable_download_resumption -delete_extra_files=true")
run_sender("-avg_mbytes_per_sec=10 -block_size_mbytes=1")
check_transfer_status()
dur3 = time.time() - start_time
if dur1 < 12 * dur3:
print("Single addition of a file < 8% wall clock of overall cycle")
exit(1)
# the set of files should be identical
exit(verify_transfer_success())

0 comments on commit 143fcf5

Please sign in to comment.