[core][compiled-graphs] Support wait-and-get to round-robin the acquisition of mutable objects allowing for fast failure #49444
Signed-off-by: kaihsun <kaihsun@anyscale.com>
Signed-off-by: Kai-Hsun Chen <kaihsun@anyscale.com>
Initial pass, partial review
(
    waitables_to_num_consumers,
    skip_deserialization_waitables_to_num_consumers,
) = self._get_all_waitables_to_num_consumers()
normal_waitables = list(waitables_to_num_consumers.keys())
skip_deserialization_waitables = list(
    skip_deserialization_waitables_to_num_consumers.keys()
)
looks like these are static? should we do it at init time?
move to ReaderInterface constructor.
After giving it a second thought, I realized it is not static. For example, the get_ray_waitables method of BufferedSharedMemoryChannel should return the buffer that will be read in the current read operation. Therefore, the return value of get_ray_waitables is not always the same.
def get_ray_waitables(self) -> List[Tuple[ObjectRef, bool]]:
    self.ensure_registered_as_reader()
    return self._buffers[self._next_read_index].get_ray_waitables()
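To make the point concrete, here is a minimal toy model of why the waitables cannot be cached at init time. It is not Ray's implementation: ToyBuffer and ToyBufferedChannel are hypothetical names, plain strings stand in for ObjectRef, and reading the bool in each tuple as a "skip deserialization" flag is an assumption based on the variable names in this PR.

```python
from typing import List, Tuple


class ToyBuffer:
    """Hypothetical stand-in for one underlying channel buffer."""

    def __init__(self, ref: str):
        self._ref = ref  # a string stands in for an ObjectRef

    def get_ray_waitables(self) -> List[Tuple[str, bool]]:
        # (reference, skip_deserialization) pairs; the flag meaning is assumed.
        return [(self._ref, False)]

    def read(self) -> str:
        return f"value-from-{self._ref}"


class ToyBufferedChannel:
    """Hypothetical channel that rotates through a fixed set of buffers."""

    def __init__(self, num_buffers: int):
        self._buffers = [ToyBuffer(f"ref-{i}") for i in range(num_buffers)]
        self._next_read_index = 0

    def get_ray_waitables(self) -> List[Tuple[str, bool]]:
        # Only the buffer that the next read will touch is reported.
        return self._buffers[self._next_read_index].get_ray_waitables()

    def read(self) -> str:
        value = self._buffers[self._next_read_index].read()
        # Advance to the next buffer, wrapping around at the end.
        self._next_read_index = (self._next_read_index + 1) % len(self._buffers)
        return value


channel = ToyBufferedChannel(num_buffers=2)
seen = []
for _ in range(3):
    seen.append(channel.get_ray_waitables()[0][0])
    channel.read()
print(seen)
```

In this toy, three consecutive reads report a different reference each time the buffer index advances, so a list computed once in a constructor would go stale after the first read.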
@@ -193,12 +198,31 @@ def _send_cpu_and_gpu_data(self, value: Any, timeout: Optional[float]):
        # normally.
        self.serialization_ctx.set_use_external_transport(False)

        # First send the extracted tensors through a GPU-specific channel.
NCCL write -> NCCL read -> all mutable objects are ready -> _cpu_data_channel.write -> NCCL write
int64_t remaining_timeout = timeout_ms == -1 ? 1e9 : timeout_ms;
auto timeout_point = ToTimeoutPoint(remaining_timeout);
int64_t iteration_timeout =
    std::min(remaining_timeout, RayConfig::instance().get_timeout_milliseconds());
I think this env variable should be renamed; the current name is kind of misleading for both core and cgraph. Maybe in a separate PR, but it should be something like get_iteration_timeout_milliseconds.
Agree, the name is misleading.
// Try to acquire the object.
Status s = experimental_mutable_object_provider_->ReadAcquire(
    ids[i], results[i], iteration_timeout);
Also now that timeout is always guaranteed to be there, ReadAcquire shouldn't be taking an int that could be -1, just pass it a non-optional timeout_point from here
Good point. I will address it in a separate PR to avoid making this PR bigger.
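For readers following this thread, the loop under discussion can be sketched in Python roughly as follows. This is a simplified model under stated assumptions, not the C++ in this PR: wait_and_get and try_acquire are hypothetical names, try_acquire plays the role of ReadAcquire with a short per-iteration timeout, and the overall deadline plays the role of timeout_point.

```python
import time
from typing import Callable, Dict, List, Optional


def wait_and_get(
    ids: List[str],
    try_acquire: Callable[[str, float], Optional[object]],
    num_objects: int,
    timeout_s: float,
    iteration_timeout_s: float,
) -> Dict[str, object]:
    """Round-robin over ids until num_objects are acquired or the deadline passes."""
    if num_objects > len(ids):
        raise ValueError("num_objects cannot exceed the number of ids")
    deadline = time.monotonic() + timeout_s
    results: Dict[str, object] = {}
    while len(results) < num_objects:
        for obj_id in ids:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                raise TimeoutError(f"acquired {len(results)} of {num_objects}")
            if obj_id in results:
                continue  # already acquired in an earlier round
            # Never wait longer on one object than the time left overall.
            value = try_acquire(obj_id, min(iteration_timeout_s, remaining))
            if value is not None:
                results[obj_id] = value
                if len(results) == num_objects:
                    break
    return results


# Hypothetical readiness model: "slow" needs 3 polls, "fast" needs 1.
polls_needed = {"slow": 3, "fast": 1}
polls = {"slow": 0, "fast": 0}


def fake_try_acquire(obj_id: str, timeout_s: float) -> Optional[object]:
    polls[obj_id] += 1
    if polls[obj_id] >= polls_needed[obj_id]:
        return f"data-{obj_id}"
    return None


first = wait_and_get(
    ["slow", "fast"],
    fake_try_acquire,
    num_objects=1,
    timeout_s=1.0,
    iteration_timeout_s=0.01,
)
print(first)
```

Because each attempt is bounded by the smaller of the iteration timeout and the remaining time, a slow object earlier in the list cannot starve a ready object later in the list. Passing a deadline rather than a possibly negative integer, as suggested above, would remove the need for a sentinel value.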
Will this approach work? Modify
NcclCommunicator does not seem to support If a
I synced with @stephanie-wang offline. I will try using the channel
For this question, we just relied on the e2e timeout to handle it.
We decided to proceed with #49711 instead of this PR.
Why are these changes needed?
Issue statement
https://gist.github.com/kevin85421/a7f14ea38d64420b105fbd79fd31fb8a
Without this PR, both SynchronousReader._read_list and AwaitableBackgroundReader._run call the read function of each input channel sequentially. If the first input channel involves a long-running task and the second one fails immediately, the reader still has to wait until all channels have been read before failing.
Based on #46337,
Implementation details
experimental_wait_and_get_mutable_objects / CoreWorker::WaitAndGetExperimentalMutableObjects
  WaitAndGetExperimentalMutableObjects iterates through a list of mutable object references and retrieves the data when the objects are ready. The function returns when either num_objects mutable objects are acquired or the operation times out.
SynchronousReader._read_list / AwaitableBackgroundReader._run
  _get_all_waitables_to_num_consumers: Iterate through self._input_channels and call get_ray_waitables to retrieve all underlying mutable object references.
  worker.experimental_wait_and_get_mutable_objects: Attempt to retrieve a single mutable object from a list of mutable object references. If the return value is a RayTaskError, immediately return and raise an exception. Otherwise, write the return value into ChannelContext.
  Iterate through self._input_channels and call channel.read().
Channel.read (shared_memory_channel.py)
  When called from _read_list, Channel.read retrieves the data from ChannelContext instead of the object store.
get_ray_waitables
  Retrieve the underlying mutable object references that the "next" read operation plans to access. Therefore, get_ray_waitables may return different results for different read function calls on the same channel. For example, see BufferedSharedMemoryChannel's get_ray_waitables.
Special case: TorchTensorNcclChannel: See the comments in the file for more details.
  _read_list and _run will skip deserialization for the TorchTensorNcclChannel's mutable object because TorchTensorNcclChannel relies on a custom serializer, which replaces placeholders in the CPU data with tensors read from the NCCL channel during deserialization.
  If deserialization happens in _read_list or _run before the reader has retrieved the GPU tensors via the NCCL channel and placed the out-of-band tensors into the serialization context, issues may arise.
  read operation.
Related issue number
Closes #46337
Checks
git commit -s) in this PR.
scripts/format.sh to lint the changes in this PR.
method in Tune, I've added it in doc/source/tune/api/ under the corresponding .rst file.
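The three reader steps listed under Implementation details can be sketched with a toy model. This is an illustration only, under loud assumptions: ToyChannel, ToyTaskError, read_list, and poll are hypothetical names, a plain dict stands in for ChannelContext, and poll stands in for a non-blocking attempt to fetch one mutable object.

```python
from typing import Callable, Dict, List, Optional


class ToyTaskError(Exception):
    """Hypothetical stand-in for RayTaskError."""


class ToyChannel:
    def __init__(self, ref: str, context: Dict[str, object]):
        self._ref = ref
        self._context = context  # a dict stands in for ChannelContext

    def get_ray_waitables(self) -> List[str]:
        return [self._ref]

    def read(self) -> object:
        # Step 3: the value was fetched earlier, so read it from the context.
        return self._context.pop(self._ref)


def read_list(
    channels: List[ToyChannel],
    context: Dict[str, object],
    poll: Callable[[str], Optional[object]],
    max_rounds: int = 100,
) -> List[object]:
    # Step 1: collect the references that the next read of each channel uses.
    pending = [ref for ch in channels for ref in ch.get_ray_waitables()]
    # Step 2: poll the references round-robin and fail fast on the first error.
    for _ in range(max_rounds):
        if not pending:
            break
        for ref in list(pending):
            value = poll(ref)
            if value is None:
                continue  # not ready yet; move on to the next reference
            if isinstance(value, ToyTaskError):
                raise value
            context[ref] = value
            pending.remove(ref)
    if pending:
        raise TimeoutError(f"still waiting on {pending}")
    # Step 3: every value is already in the context, so read() does not block.
    return [ch.read() for ch in channels]


calls: List[str] = []


def poll(ref: str) -> Optional[object]:
    calls.append(ref)
    # "slow" is never ready; "failed" reports an error immediately.
    return ToyTaskError("boom") if ref == "failed" else None


context: Dict[str, object] = {}
channels = [ToyChannel("slow", context), ToyChannel("failed", context)]
try:
    read_list(channels, context, poll)
    outcome = "ok"
except ToyTaskError as e:
    outcome = f"failed fast: {e}"
print(outcome, calls)
```

In this toy, the error from the second channel surfaces after a single polling round, even though the first channel never becomes ready. That is the fast-failure behavior the issue statement asks for, in contrast to reading each channel sequentially.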