[core][compiled-graphs] Support wait-and-get to round-robin the acquisition of mutable objects allowing for fast failure #49444
Closed
Changes from 1 commit (30 commits in this pull request)
e631160  pass tests (kevin85421)
bf585fa  pass tests, refactor (kevin85421)
bf7a26e  pass tests, waitables (kevin85421)
93bff72  update (kevin85421)
d68f8db  pass tests (kevin85421)
aa7b831  pass tests, retrieve obj one by one in sync reader (kevin85421)
a22bbaa  update (kevin85421)
d1aac6c  update comment and move import to top-level (kevin85421)
a84e561  remove logs and update comments for WaitAndGetExperimentalMutableObjects (kevin85421)
9e44591  update comments (kevin85421)
3ddad3a  add some utils (kevin85421)
4ca28e9  fix test_channel tests (kevin85421)
d8dadb4  update (kevin85421)
5d18be7  Merge remote-tracking branch 'upstream/master' into 20241224-2 (kevin85421)
cddaef7  fix test_channel (kevin85421)
abd4d28  update type hint (kevin85421)
a684329  remove c++ log (kevin85421)
2a1b3fc  update _read_list (kevin85421)
9a3ae27  refactor (kevin85421)
9f2f1d1  remove comment for visualize tests (kevin85421)
ba489c3  add tests (kevin85421)
ebbee6f  address comments (kevin85421)
8ebe78c  move retrieve_obj_refs to util (kevin85421)
d958b24  fix lint error (kevin85421)
551bacb  fix nccl channel tests (kevin85421)
1ae976b  fix test (kevin85421)
1cc8210  fix typo (kevin85421)
8c4e61d  refactor (kevin85421)
6d22187  Merge remote-tracking branch 'upstream/master' into 20241224-2 (kevin85421)
38c8652  fix lint (kevin85421)
Viewing changes from commit d8dadb47e95d518e2b62fce3327b611d1303c275 ("update")
Signed-off-by: Kai-Hsun Chen <kaihsun@anyscale.com>
```diff
@@ -164,6 +164,10 @@ def ensure_registered_as_writer(self):
         self._cpu_data_channel.ensure_registered_as_writer()
 
     def ensure_registered_as_reader(self):
+        reader = utils.get_self_actor()
+        if reader == self._writer:
+            self._local_channel.ensure_registered_as_reader()
+            return
         self._gpu_data_channel.ensure_registered_as_reader()
         if self._cpu_data_channel is not None:
             self._cpu_data_channel.ensure_registered_as_reader()
```
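For orientation, here is a tiny, self-contained toy of the fast path this hunk adds: when the reader is the same actor as the writer, only the local channel needs to be registered. All class and stub names below are invented for illustration; the real channel resolves the reader via `utils.get_self_actor()` and Ray actor handles.

```python
# Illustrative stand-in for the composite CPU/GPU channel; not Ray code.
class StubChannel:
    def __init__(self, name: str):
        self.name = name
        self.registered_as_reader = False

    def ensure_registered_as_reader(self) -> None:
        self.registered_as_reader = True


class ToyCompositeChannel:
    def __init__(self, writer: str, local, gpu, cpu=None):
        self._writer = writer
        self._local_channel = local
        self._gpu_data_channel = gpu
        self._cpu_data_channel = cpu

    def ensure_registered_as_reader(self, reader: str) -> None:
        # Writer reading its own output only touches the local channel.
        if reader == self._writer:
            self._local_channel.ensure_registered_as_reader()
            return
        # Remote readers register on the GPU (NCCL) channel and, if present,
        # the CPU shared-memory channel.
        self._gpu_data_channel.ensure_registered_as_reader()
        if self._cpu_data_channel is not None:
            self._cpu_data_channel.ensure_registered_as_reader()


chan = ToyCompositeChannel("writer", StubChannel("local"), StubChannel("gpu"), StubChannel("cpu"))
chan.ensure_registered_as_reader(reader="writer")
assert chan._local_channel.registered_as_reader
assert not chan._gpu_data_channel.registered_as_reader
```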
```diff
@@ -194,12 +198,31 @@ def _send_cpu_and_gpu_data(self, value: Any, timeout: Optional[float]):
         # normally.
         self.serialization_ctx.set_use_external_transport(False)
 
-        # First send the extracted tensors through a GPU-specific channel.
-        self._gpu_data_channel.write(gpu_tensors)
-        # Next send the non-tensor data through a CPU-specific channel. The
+        # The `write` operation of the shared memory channel must be called
+        # before the `write` operation of the GPU channel. This is because in
+        # `_read_list`, the channel's `read` operation waits for all underlying
+        # mutable objects for all input channels to be consumed.
+        #
+        # Step 1: `_cpu_data_channel.write` is called to write data into the
+        # mutable object.
+        # Step 2: `_read_list` consumes the mutable object.
+        # Step 3: After all underlying mutable objects of all input channels are
+        # consumed, `read` is called in the receiver of the NCCL channel.
+        #
+        # If we call NCCL write before the CPU channel write, then the shared
+        # memory channel's `write` operation will block because the NCCL write
+        # operation blocks forever until the NCCL read operation is called. However,
+        # the `read` operation of the NCCL channel will never be called because
+        # `_read_list` will never consume the mutable object that hasn't been
+        # written yet.
+
+        # First send the non-tensor data through a CPU-specific channel. The
         # data contains placeholders for the extracted tensors.
         self._cpu_data_channel.write(cpu_data)
 
+        # Next send the extracted tensors through a GPU-specific channel.
+        self._gpu_data_channel.write(gpu_tensors)
+
     def write(self, value: Any, timeout: Optional[float] = None) -> None:
         """
         Send a value that may contain torch.Tensors that should be sent via
```

Inline review comment on the write-ordering change: NCCL write -> NCCL read -> all mutable objects are ready -> `_cpu_data_channel.write` -> NCCL write
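The deadlock argument in the new comment can be reproduced outside Ray with a toy sender/receiver pair. In this sketch, every name and mechanism is a stand-in rather than the actual channel code: a `queue.Queue` plays the shared-memory mutable object and a `threading.Barrier` plays the blocking NCCL rendezvous. Swapping the two lines in `sender` reproduces the hang described above.

```python
import queue
import threading

cpu_channel = queue.Queue(maxsize=1)    # buffered write, like the shared-memory channel
nccl_rendezvous = threading.Barrier(2)  # both sides must arrive, like a blocking NCCL op


def sender():
    # Correct order (what the diff does): CPU write first, then the blocking
    # GPU write. Swapping these two lines deadlocks, because the receiver
    # below never reaches the rendezvous until the CPU data is available.
    cpu_channel.put({"meta": "placeholders for tensors"})
    nccl_rendezvous.wait(timeout=5)  # "NCCL write"


def receiver():
    # Mirrors _read_list: wait until every mutable object is readable...
    cpu_data = cpu_channel.get(timeout=5)
    # ...and only then perform the blocking GPU read.
    nccl_rendezvous.wait(timeout=5)  # "NCCL read"
    print("received:", cpu_data)


threads = [threading.Thread(target=sender), threading.Thread(target=receiver)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```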
```diff
@@ -275,17 +298,29 @@ def _recv_cpu_and_gpu_data(
         # Next, read and deserialize the non-tensor data. The registered custom
         # deserializer will replace the found tensor placeholders with
         # `tensors`.
-        data = self._cpu_data_channel.read(
+        #
+        # We need to deserialize the CPU data channel first in `read` instead of
+        # `_read_list` because the deserialization of the CPU data channel relies
+        # on the out-of-band tensors in the serialization context. Therefore, the
+        # `read` method of the NCCL channel must be called first to ensure that
+        # the out-of-band tensors are ready.
+        serialized_data, metadata = self._cpu_data_channel.read(
             timeout=timeout,
         )
+        rets = self._worker.deserialize_objects(
+            [(serialized_data, metadata)], self._cpu_data_channel.get_ray_waitables()
+        )
+        assert len(rets) == 1
+        ret = rets[0]
 
         # Check that all placeholders had a corresponding tensor.
         (
             _,
             deserialized_tensor_placeholders,
         ) = self.serialization_ctx.reset_out_of_band_tensors([])
         assert deserialized_tensor_placeholders == set(range(len(tensors)))
 
-        return data
+        return ret
 
     def read(self, timeout: Optional[float] = None) -> Any:
         """
```
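The reason for deferring deserialization can be shown with a minimal model: the serialized CPU payload only carries placeholder indices, and resolving them requires the out-of-band tensors that the GPU read has deposited in the serialization context. None of the names below are the real Ray serialization APIs; this is only an illustration of the dependency.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class SerializationCtx:
    # Filled by the "NCCL read" before the CPU payload is deserialized.
    out_of_band_tensors: List[Any] = field(default_factory=list)


def deserialize_cpu_payload(payload: Dict[str, Any], ctx: SerializationCtx) -> Dict[str, Any]:
    # Placeholder i is replaced by the i-th out-of-band tensor. If the GPU
    # read had not happened yet, ctx.out_of_band_tensors would be empty and
    # this substitution would fail -- hence the ordering enforced in the diff.
    return {
        key: ctx.out_of_band_tensors[value["placeholder"]]
        if isinstance(value, dict) and "placeholder" in value
        else value
        for key, value in payload.items()
    }


ctx = SerializationCtx()
ctx.out_of_band_tensors = ["<tensor-0>", "<tensor-1>"]
payload = {"x": {"placeholder": 0}, "y": {"placeholder": 1}, "label": "cpu data"}
print(deserialize_cpu_payload(payload, ctx))
# {'x': '<tensor-0>', 'y': '<tensor-1>', 'label': 'cpu data'}
```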
```diff
@@ -327,10 +362,19 @@ def read(self, timeout: Optional[float] = None) -> Any:
 
     def get_ray_waitables(self) -> List[ObjectRef]:
         self.ensure_registered_as_reader()
+        reader = utils.get_self_actor()
+        if reader == self._writer:
+            return self._local_channel.get_ray_waitables()
         waitables = []
         waitables.extend(self._gpu_data_channel.get_ray_waitables())
         if self._cpu_data_channel is not None:
-            waitables.extend(self._cpu_data_channel.get_ray_waitables())
+            cpu_waitables = self._cpu_data_channel.get_ray_waitables()
+            assert len(cpu_waitables) == 1
+            # Skip deserialization of the CPU data in `_read_list` and
+            # handle the deserialization in the channel's `read()` method
+            # after the out-of-band tensors are ready in the serialization
+            # context instead.
+            waitables.append((cpu_waitables[0][0], True))
         return waitables
 
     def close(self) -> None:
```
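Tying this back to the PR title, the `(waitable, skip_deserialization)` tuples let the reader poll its inputs in turn rather than blocking on one of them, so an upstream failure surfaces quickly. The sketch below is a loose, Ray-free illustration of that round-robin wait-and-get idea; the helper name and the Future-based stand-ins are invented and are not the actual `_read_list` implementation.

```python
import concurrent.futures
import time
from typing import Any, Callable, List, Tuple

Waitable = Tuple[concurrent.futures.Future, bool]  # (future, skip_deserialization)


def wait_and_get_round_robin(waitables: List[Waitable],
                             deserialize: Callable[[Any], Any],
                             poll_s: float = 0.01) -> List[Any]:
    results: dict = {}
    pending = list(enumerate(waitables))
    while pending:
        still_pending = []
        for idx, (fut, skip_deser) in pending:
            try:
                value = fut.result(timeout=poll_s)  # short poll, then move on
            except concurrent.futures.TimeoutError:
                still_pending.append((idx, (fut, skip_deser)))
                continue
            # A producer exception propagates here as soon as its future is
            # visited, instead of hiding behind a blocking wait on an earlier
            # object: that is the "fast failure" part.
            results[idx] = value if skip_deser else deserialize(value)
        pending = still_pending
    return [results[i] for i in range(len(waitables))]


with concurrent.futures.ThreadPoolExecutor() as pool:
    slow = pool.submit(lambda: (time.sleep(0.2), b"raw-bytes")[1])
    fast = pool.submit(lambda: b"cpu-payload")
    out = wait_and_get_round_robin([(slow, True), (fast, False)],
                                   deserialize=lambda b: b.decode())
    print(out)  # [b'raw-bytes', 'cpu-payload']
```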
Review comment: looks like these are static? should we do it at init time?

Review comment: move to ReaderInterface constructor.

Reply: After giving it a second thought, I realized it is not static. For example, the `get_ray_waitables` method of `BufferedSharedMemoryChannel` should return the buffer that will be read in the current read operation. Therefore, the return value of `get_ray_waitables` is not always the same.
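A tiny model of that point (not the real `BufferedSharedMemoryChannel` implementation): with multiple underlying buffers, the waitable for the next `read()` rotates as reads happen, so it cannot be captured once in the constructor.

```python
from typing import Any, List


class ToyBufferedChannel:
    def __init__(self, num_buffers: int):
        self._buffers: List[Any] = [None] * num_buffers  # stand-ins for mutable objects
        self._next_read_index = 0

    def get_ray_waitables(self) -> List[Any]:
        # Returns only the buffer that the *next* read() will consume, so the
        # result rotates as reads happen -- it is not static.
        return [("buffer", self._next_read_index)]

    def read(self) -> Any:
        value = self._buffers[self._next_read_index]
        self._next_read_index = (self._next_read_index + 1) % len(self._buffers)
        return value


chan = ToyBufferedChannel(num_buffers=3)
print(chan.get_ray_waitables())  # [('buffer', 0)]
chan.read()
print(chan.get_ray_waitables())  # [('buffer', 1)]
```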