[core] Support mutable plasma objects #41515
Conversation
Signed-off-by: Stephanie Wang <swang@cs.berkeley.edu>
One question is around readability: since these new code paths are pretty different from the rest of the plasma handling, perhaps we should by convention prefix them with MutableObj or some other naming convention? Experimental is another option.
python/ray/_raylet.pyx
Outdated
@@ -3465,11 +3474,40 @@ cdef class CoreWorker:
                    generator_id=CObjectID.Nil(),
                    owner_address=c_owner_address))

    def put_serialized_object_to_mutable_plasma_object(self, serialized_object,
                                                       ObjectRef object_ref,
                                                       num_readers,
Document this arg as experimental?
I think this one's okay because this method is only called by the new experimental path (renamed though).
By the way, do we not use eventfd anymore? I cannot find the code.
Re: API.
Maybe it is better to hide the object ref from the channel, since the returned object ref may have different semantics from a regular object ref (it is mutable)?
channel = ray._create_channel(byte_size=1000)
channel._read()
channel._write()
channel._end()
With the current semantics, if you just call ray.get()
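The suggestion above could look roughly like the following. This is a minimal, self-contained sketch in which the Channel class and its backing store are hypothetical placeholders, not the actual Ray implementation; a plain attribute stands in for the hidden mutable ObjectRef so the sketch runs anywhere:

```python
from typing import Any, Optional


class Channel:
    """Hypothetical wrapper that hides the mutable object ref.

    In a real implementation, self._ref would be the ObjectRef of the
    mutable plasma object; callers only ever see _read/_write/_end and
    never touch the ref with its non-standard (mutable) semantics.
    """

    def __init__(self, byte_size: int):
        self._byte_size = byte_size
        self._ref: Optional[Any] = None  # stand-in for the hidden ObjectRef
        self._reading = False

    def _write(self, value: Any) -> None:
        # Real version: wait for all readers to release the previous
        # version, then write a new version of the mutable object.
        self._ref = value

    def _read(self) -> Any:
        # Real version: block until a version is available (like ray.get).
        self._reading = True
        return self._ref

    def _end(self) -> None:
        # Real version: release the read so the writer can proceed.
        self._reading = False


channel = Channel(byte_size=1000)
channel._write(b"payload")
assert channel._read() == b"payload"
channel._end()
```

This keeps ray.get() usable only for regular, immutable objects, while mutable-object semantics stay behind the channel's private methods.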
python/ray/_private/worker.py
Outdated
@PublicAPI
def _write_channel(value: Any, object_ref: ObjectRef, num_readers: int):
    worker = global_worker
    worker.check_connected()
By the way, this call is pretty expensive; maybe we should skip it. On my MacBook it takes 10us.
I moved it to the constructor, but that seems off; it's just a bool check...
// Increment the count of the number of instances of this object that this
// client is using. Cache the reference to the object.
IncrementObjectCount(received_object_ids[i], object, true);
auto &object_entry = objects_in_use_[received_object_ids[i]];
objects_in_use_[received_object_ids[i]] will create a new empty entry if received_object_ids[i] doesn't exist. Should we RAY_CHECK that received_object_ids[i] exists here?
IncrementObjectCount will create the object first.
Added some code cleanup here to make it a bit nicer. Previously, IncrementObjectCount both added the entry and incremented the count, which was not very clear.
std::unique_lock<std::recursive_mutex> guard(client_mutex_);

auto object_entry = objects_in_use_.find(object_id);
if (object_entry == objects_in_use_.end()) {
Can you add tests for these cases?
Looks like client.cc doesn't have unit tests... we should probably add one in the future. Maybe we can cover this in the Python tests?
I don't think there is a meaningful python test here.
// The data and metadata size may have changed, so update here before we
// create the Get buffer to return.
object_entry->object.data_size = plasma_header->data_size;
We don't handle different data sizes in this PR yet, right?
It's handled.
Hmm, looking at our implementation, I am not sure we can handle a write in the driver and a read in the worker with different sizes. Can you try adding a test like this? (It is what I wrote in the dag branch.)
"""
Verify put in 2 different processes work.
"""
ray.init(num_cpus=1)
print("Test Write input from driver -> Read & Write from worker -> Read output from driver")
expected_input = b"000000000000"
ref = ray.put(expected_input, max_readers=1)
print(ref)
@ray.remote
class A:
def f(self, refs, expected_input, output_val):
ref = refs[0]
val = ray.get(refs[0])
assert val == expected_input, val
ray.release(ref)
ray.worker.global_worker.put_object(output_val, object_ref=ref, max_readers=1)
a = A.remote()
time.sleep(1)
output_val = b"0"
b = a.f.remote([ref], expected_input, output_val)
ray.get(b)
val = ray.get(ref)
assert output_val == val
ray.release(ref)
print("Test Write input from driver twice -> Read & Write from worker -> Read output from driver")
# Test write twice.
ref = ray.put(b"000000000000", max_readers=1)
assert b"000000000000" == ray.get(ref)
ray.release(ref)
print(ref)
expected_input = b"1"
ray.worker.global_worker.put_object(b"1", object_ref=ref, max_readers=1)
a = A.remote()
time.sleep(1)
expected_output = b"23"
b = a.f.remote([ref], expected_input, expected_output)
ray.get(b)
val = ray.get(ref)
assert expected_output == val
ray.release(ref)
It does work, but the test is a good idea. Added.
Yup, I got rid of it!
)

def begin_read(self) -> Any:
    """
Shall we add some sanity checks on the channel? For example, raising an error if we try to operate on the channel again before end_read is called after a begin_read, or in other unsupported scenarios (such as deserializing the channel on a different node). These might help development and document the current limitations.
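One way such sanity checks might look, as a hedged sketch: CheckedChannel and ChannelStateError below are hypothetical names for illustration, not part of this PR, and the state tracking is deliberately simplified to the single-outstanding-read case.

```python
class ChannelStateError(RuntimeError):
    """Raised when the channel is used in an unsupported order (hypothetical)."""


class CheckedChannel:
    """Sketch of the suggested sanity checks on channel operations.

    Guards against re-entrant reads: operating on the channel again
    before end_read() is called after a begin_read() raises an error,
    documenting the current single-outstanding-read limitation.
    """

    def __init__(self):
        self._reading = False
        self._value = None

    def write(self, value):
        if self._reading:
            raise ChannelStateError(
                "cannot write while a read is in progress; call end_read() first")
        self._value = value

    def begin_read(self):
        if self._reading:
            raise ChannelStateError("begin_read() called again before end_read()")
        self._reading = True
        return self._value

    def end_read(self):
        if not self._reading:
            raise ChannelStateError("end_read() called without begin_read()")
        self._reading = False
```

The same pattern could reject deserialization on a different node by recording the creating node's ID and comparing it on use.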
One last question for different data size
LGTM
This reverts commit cb5bb4e.
)" (ray-project#41784)" This reverts commit d7926fa.
Why are these changes needed?
This is a first pass on introducing an experimental "channel" concept that can be used for direct worker-worker communication, bypassing the usual Ray Core components like the driver and raylet.
Channels are implemented as mutable plasma objects. The object can be written multiple times by a client. The writer must specify the number of reads that can be made before the written object value is no longer valid. Reads block until the specified version or a later one is available. Writes block until all readers are available. Synchronization between a single writer and multiple readers is performed through a new header for plasma objects that is stored in shared memory.
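To make the blocking semantics above concrete, here is a toy single-process model of the synchronization header, using threads in place of separate workers. All names are illustrative assumptions; the real header lives in shared memory alongside the plasma object and is not this class.

```python
import threading


class MutableSlotHeader:
    """Toy model of the single-writer / multi-reader header described above.

    The writer stamps a version and the number of allowed reads; readers
    block until the requested version exists, then release their read.
    """

    def __init__(self):
        self._cond = threading.Condition()
        self.version = 0
        self.num_readers = 0  # reads remaining for the current version
        self.value = None

    def write(self, value, num_readers):
        with self._cond:
            # Writes block until all readers of the previous version
            # have released the object.
            self._cond.wait_for(lambda: self.num_readers == 0)
            self.value = value
            self.version += 1
            self.num_readers = num_readers
            self._cond.notify_all()

    def read(self, min_version):
        with self._cond:
            # Reads block until the specified version or later is available.
            self._cond.wait_for(lambda: self.version >= min_version)
            return self.value

    def release(self):
        with self._cond:
            self.num_readers -= 1
            self._cond.notify_all()
```

For example, with one reader, write("a", num_readers=1) followed by write("b", num_readers=1) forces the second write to wait until the reader has read version 1 and released it, which is the backpressure behavior the channel relies on.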
API:

channel: Channel = ray.experimental.channel.Channel(buf_size): Client uses the normal ray.put path to create a mutable plasma object. Once created and sealed for the first time, the plasma store synchronously reads and releases the object. At this point, the object may be written by the original client and read by others.

channel.write(val): Use the handle returned by the above to send a value through the channel. The caller waits until all readers of the previous version have released the object, then writes a new version.

val = channel.begin_read(): Blocks until a value is available. Equivalent to ray.get. This is the beginning of the client's read.

channel.end_read(): End the client's read, marking the channel as available to write again.

Related issue number
Checks

I've signed off every commit (git commit -s) in this PR.
I've run scripts/format.sh to lint the changes in this PR.
If I've added a method in Tune, I've added it in doc/source/tune/api/ under the corresponding .rst file.