Revert D26955317: Perform appropriate CUDA stream synchronization in distributed autograd.

Test Plan: revert-hammer

Differential Revision: D26955317 (0b84f45)

Original commit changeset: eace6d4f91d4

fbshipit-source-id: 1f322b4d7cf7d1a7e6caf3194c6f0bf163d45850
ezyang authored and facebook-github-bot committed Mar 11, 2021
1 parent ffac9b2 commit e185ec6
Showing 5 changed files with 2 additions and 85 deletions.
22 changes: 1 addition & 21 deletions torch/csrc/distributed/autograd/engine/dist_engine.cpp
@@ -1,7 +1,6 @@
#include <queue>

#include <ATen/Parallel.h>
#include <c10/core/Event.h>
#include <torch/csrc/autograd/functions/accumulate_grad.h>
#include <torch/csrc/autograd/input_buffer.h>
#include <torch/csrc/distributed/autograd/context/container.h>
@@ -424,27 +423,8 @@ std::shared_ptr<c10::ivalue::Future> DistEngine::

std::shared_ptr<c10::ivalue::Future> DistEngine::executeSendFunctionAsync(
const ContextPtr& autogradContext,
const std::shared_ptr<SendRpcBackward>& sendFunction,
const std::shared_ptr<Node>& sendFunction,
bool retainGraph) {

// Typically the local autograd engine ensures stream synchronizations between
// nodes in the graph. However, for distributed autograd the sendFunction
// inputs might have been retrieved over the wire on a separate stream and the
// sendFunction itself runs on a different stream. As a result, we need to
// manually synchronize those two streams here.
const auto& send_backward_stream = sendFunction->stream(c10::DeviceType::CUDA);
if (send_backward_stream) {
for (const auto& grad : sendFunction->getGrads()) {
const auto guard = c10::impl::VirtualGuardImpl{c10::DeviceType::CUDA};
const auto default_stream = guard.getStream(grad.device());
if (send_backward_stream != default_stream) {
auto event = c10::Event{c10::DeviceType::CUDA};
event.record(default_stream);
send_backward_stream->wait(event);
}
}
}

std::unique_lock<std::mutex> lock(initializedContextIdsLock_);
if (initializedContextIds_.find(autogradContext->contextId()) ==
initializedContextIds_.end()) {
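The comment in the reverted hunk above explains the reasoning: the local autograd engine normally orders work between graph nodes, but the grads feeding SendRpcBackward may have been produced on one CUDA stream while the backward node runs on another, so the two streams have to be ordered manually. Below is a minimal standalone sketch of that event-based ordering pattern using the same c10 primitives the hunk used; the helper name waitForProducer and the free-function framing are illustrative assumptions, not part of this commit or of the PyTorch API.

// Minimal sketch of event-based stream ordering with c10 primitives
// (illustrative only; this helper is not part of the commit above).
#include <c10/core/Event.h>
#include <c10/core/Stream.h>

void waitForProducer(const c10::Stream& producer, const c10::Stream& consumer) {
  if (producer == consumer) {
    return; // Work on the same stream is already ordered; nothing to do.
  }
  // Mark the current point on the producer stream.
  c10::Event event{c10::DeviceType::CUDA};
  event.record(producer);
  // Make the consumer stream wait (on the device) until that point has passed.
  consumer.wait(event);
}

Recording the event on the producer stream and waiting on it from the consumer stream keeps the dependency on the GPU, so the CPU thread issuing the backward work is never blocked.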
2 changes: 1 addition & 1 deletion torch/csrc/distributed/autograd/engine/dist_engine.h
@@ -46,7 +46,7 @@ class TORCH_API DistEngine {
// The gradients are accumulated in the provided autograd context.
std::shared_ptr<c10::ivalue::Future> executeSendFunctionAsync(
const ContextPtr& autogradContext,
const std::shared_ptr<SendRpcBackward>& sendFunction,
const std::shared_ptr<torch::autograd::Node>& sendFunction,
bool retainGraph);

// Number of backward passes currently running for the Distributed Engine.
4 changes: 0 additions & 4 deletions torch/csrc/distributed/autograd/functions/sendrpc_backward.cpp
@@ -23,10 +23,6 @@ void SendRpcBackward::setGrads(const torch::autograd::variable_list& grads) {
grads_ = grads;
}

const torch::autograd::variable_list& SendRpcBackward::getGrads() const {
return grads_;
}

} // namespace autograd
} // namespace distributed
} // namespace torch
3 changes: 0 additions & 3 deletions torch/csrc/distributed/autograd/functions/sendrpc_backward.h
@@ -25,9 +25,6 @@ struct TORCH_API SendRpcBackward : public torch::autograd::Node {
// computation.
void setGrads(const torch::autograd::variable_list& grads);

// Retrieve the grads for the function.
const torch::autograd::variable_list& getGrads() const;

private:
torch::autograd::variable_list grads_;
};
56 changes: 0 additions & 56 deletions torch/testing/_internal/distributed/rpc/dist_autograd_test.py
@@ -3,7 +3,6 @@
import time
import unittest
from enum import Enum
import random
import torch
from datetime import timedelta
import torch.distributed as dist
@@ -2267,58 +2266,3 @@ def test_device_maps_backward_pass(self):
            self.assertEqual(t2.device, grads[t2].device)

        rpc.shutdown()

    class MyRemoteCompute(torch.nn.Module):
        def __init__(self):
            super().__init__()

        def forward(self, input):
            input = input * 2.0
            return input

    class MyLocalCompute(torch.nn.Module):
        def __init__(self, next_stage):
            super().__init__()
            self.next_stage = next_stage

        def forward(self, input):
            return self.next_stage.rpc_sync().forward(input)

    @skip_if_lt_x_gpu(4)
    def test_remote_module(self):

        options = self.rpc_backend_options
        dst = worker_name((self.rank + 1) % self.world_size)

        # The reverse of this device mapping should be used for the backward pass.
        options.set_device_map(dst, {self.rank: (self.rank + 1) % self.world_size})

        rpc.init_rpc(
            name=worker_name(self.rank),
            backend=self.rpc_backend,
            rank=self.rank,
            world_size=self.world_size,
            rpc_backend_options=options,
        )

        remote_compute = rpc.remote(dst, TensorPipeDistAutogradTest.MyRemoteCompute)
        local_compute = TensorPipeDistAutogradTest.MyLocalCompute(remote_compute)
        for _ in range(10):
            input = torch.rand([1000, 10000], device=self.rank, requires_grad=True)
            # Run local autograd
            result = input * 2.0
            r = random.random()
            loss = result.sum() * r
            loss.backward()

            # Run distributed autograd
            with dist_autograd.context() as context_id:
                result = local_compute(input)
                loss = result.sum() * r
                dist_autograd.backward(context_id, [loss])

                # Compare grads.
                grads = dist_autograd.get_gradients(context_id)
                self.assertEqual(input.grad, grads[input])

        rpc.shutdown()
