[core] Support mutable plasma objects #41515

Merged
merged 34 commits into from
Dec 9, 2023


@stephanie-wang stephanie-wang commented Nov 30, 2023

Why are these changes needed?

This is a first pass on introducing an experimental "channel" concept that can be used for direct worker-worker communication, bypassing the usual Ray Core components like the driver and raylet.

Channels are implemented as mutable plasma objects. A client can write the object multiple times. The writer must specify the number of reads that can be made before the written object value is no longer valid. Reads block until the specified version or a later one is available. Writes block until all readers of the previous version have released the object. Synchronization between a single writer and multiple readers is performed through a new header for plasma objects that is stored in shared memory.

API:

  1. channel: Channel = ray.experimental.channel.Channel(buf_size): Client uses the normal ray.put path to create a mutable plasma object. Once created and sealed for the first time, the plasma store synchronously reads and releases the object. At this point, the object may be written by the original client and read by others.
  2. channel.write(val): Use the handle returned by the above to send a value through the channel. The caller waits until all readers of the previous version have released the object, then writes a new version.
  3. val = channel.begin_read(): Blocks until a value is available. Equivalent to ray.get. This is the beginning of the client's read.
  4. channel.end_read(): End the client's read, marking the channel as available to write again.
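The blocking rules in the API list above can be modelled in a few lines of plain Python. This is only a sketch under stated assumptions: `ToyChannel` and `run_demo` are hypothetical names, the state lives in an ordinary Python object guarded by a `threading.Condition` rather than in a plasma object header in shared memory, and `begin_read` takes an explicit `last_seen_version` argument that the real API does not have.

```python
import threading


class ToyChannel:
    """In-process model of a single-writer, multi-reader versioned slot.

    Hypothetical illustration only; this is not the Ray implementation.
    """

    def __init__(self, num_readers):
        self._cond = threading.Condition()
        self._num_readers = num_readers
        self._version = 0          # incremented on every write
        self._reads_remaining = 0  # readers that have not yet released
        self._value = None

    def write(self, value):
        with self._cond:
            # Block until every reader of the previous version has released.
            self._cond.wait_for(lambda: self._reads_remaining == 0)
            self._value = value
            self._version += 1
            self._reads_remaining = self._num_readers
            self._cond.notify_all()

    def begin_read(self, last_seen_version):
        with self._cond:
            # Block until a version newer than the last one read is available.
            self._cond.wait_for(lambda: self._version > last_seen_version)
            return self._version, self._value

    def end_read(self):
        with self._cond:
            # Release this reader's hold so the writer can proceed.
            self._reads_remaining -= 1
            self._cond.notify_all()


def run_demo(num_readers=2, values=("a", "b", "c")):
    """One writer sends each value; every reader must see every value."""
    chan = ToyChannel(num_readers)
    seen = [[] for _ in range(num_readers)]

    def reader(idx):
        version = 0
        for _ in values:
            version, value = chan.begin_read(version)
            seen[idx].append(value)
            chan.end_read()

    threads = [
        threading.Thread(target=reader, args=(i,)) for i in range(num_readers)
    ]
    for t in threads:
        t.start()
    for v in values:
        chan.write(v)
    for t in threads:
        t.join()
    return seen
```

In this model the writer cannot overwrite a value until each of the `num_readers` readers has called `end_read`, so no reader can miss a version, and a reader cannot see the same version twice because it waits for a strictly newer one.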


Signed-off-by: Stephanie Wang <swang@cs.berkeley.edu>

@ericl ericl left a comment


One question is about readability: since these new code paths are pretty different from the rest of the plasma handling, perhaps we should by convention prefix them with MutableObj or use some other naming convention? Experimental is another option.

@@ -3465,11 +3474,40 @@ cdef class CoreWorker:
generator_id=CObjectID.Nil(),
owner_address=c_owner_address))

def put_serialized_object_to_mutable_plasma_object(self, serialized_object,
ObjectRef object_ref,
num_readers,
Contributor

Document this arg as experimental?


@stephanie-wang stephanie-wang Dec 1, 2023


I think this one's okay because this method is only called by the new experimental path (renamed though).

@ericl added the @author-action-required label Dec 1, 2023

rkooo567 commented Dec 1, 2023

btw do we not use eventfd anymore? Cannot find code.


@rkooo567 rkooo567 left a comment


Re: API.

Maybe it is better to hide the object ref behind the channel, since the returned object ref may have different semantics from a regular object ref (it is mutable)?

channel = ray._create_channel(byte_size=1000)
channel._read()
channel._write()
channel._end()

With the current semantic, if you just ray.get()

@PublicAPI
def _write_channel(value: Any, object_ref: ObjectRef, num_readers: int):
worker = global_worker
worker.check_connected()
Contributor

Btw this call is pretty expensive. Maybe we should skip it.

On my MacBook it takes 10us.

Contributor Author

I moved it to a constructor but that seems off, it's just a bool check...
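One way to settle how much a check like this costs per call is a micro-benchmark with `timeit`. The sketch below is only an illustration: `FakeWorker` is a hypothetical stand-in, not Ray's worker class, and its `check_connected` is assumed to be a plain boolean test as the reply above describes. The number it prints depends on the machine and says nothing about the real method.

```python
import timeit


class FakeWorker:
    """Hypothetical stand-in for a worker object; not Ray's Worker class."""

    def __init__(self):
        self.connected = True

    def check_connected(self):
        # A plain boolean check, mirroring the "it's just a bool check" remark.
        if not self.connected:
            raise RuntimeError("worker is not connected")


def per_call_microseconds(fn, number=100_000):
    """Return the mean wall-clock cost of one call to fn, in microseconds."""
    total_seconds = timeit.timeit(fn, number=number)
    return total_seconds / number * 1e6


worker = FakeWorker()
cost_us = per_call_microseconds(worker.check_connected)
print(f"check_connected: {cost_us:.3f} us per call")
```

Running the same helper against the real call, before and after moving the check, would show whether the per-call cost is large enough to matter on the write path.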

// Increment the count of the number of instances of this object that this
// client is using. Cache the reference to the object.
IncrementObjectCount(received_object_ids[i], object, true);
auto &object_entry = objects_in_use_[received_object_ids[i]];
Contributor

objects_in_use_[received_object_ids[i]] will create a new empty entry if received_object_ids[i] doesn't exist. Should we RAY_CHECK that received_object_ids[i] exists here?

Contributor Author

IncrementObjectCount will create the object first.

Contributor Author

Added some code cleanup here to make it a bit nicer. Previously IncrementObjectCount both added and incremented the count, which is not very clear.
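The hazard raised in this thread (subscripting a map silently inserts a missing key) has a close Python analogue in `collections.defaultdict`. The sketch below is an analogy only, not the C++ client code: `objects_in_use`, `subscript_inserts`, and `get_existing` are hypothetical names chosen for illustration.

```python
from collections import defaultdict


def subscript_inserts(table, object_id):
    """Subscript a defaultdict and report whether that created the key."""
    existed_before = object_id in table
    _ = table[object_id]  # like operator[] on a C++ map: inserts if missing
    return (not existed_before) and (object_id in table)


def get_existing(table, object_id):
    """Look up an entry without inserting; fail loudly if it is missing."""
    if object_id not in table:
        raise KeyError(f"object {object_id!r} is not in use")
    return table[object_id]


objects_in_use = defaultdict(dict)

# The plain subscript quietly creates an empty entry for an unknown ID.
created = subscript_inserts(objects_in_use, "unknown-id")

# The checked lookup plays the role of the assertion suggested above.
objects_in_use["known-id"]["count"] = 1
entry = get_existing(objects_in_use, "known-id")
```

Separating "add the entry" from "look up and increment the entry", as the cleanup above describes, makes it explicit which call sites are allowed to create entries.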

std::unique_lock<std::recursive_mutex> guard(client_mutex_);

auto object_entry = objects_in_use_.find(object_id);
if (object_entry == objects_in_use_.end()) {
Contributor

can you add tests for these cases?

Contributor

looks like client.cc doesn't have unit tests... we should probably add one in the future. Maybe we can do that in the python test?

Contributor Author

I don't think there is a meaningful python test here.


// The data and metadata size may have changed, so update here before we
// create the Get buffer to return.
object_entry->object.data_size = plasma_header->data_size;
Contributor

we don't handle different data size in this PR yet right?

Contributor Author

It's handled.

Contributor

Hmm, looking at our implementation, I am not sure we can handle a write in the driver and a read in the worker when the sizes differ.

Can you try adding a test like this? (it is what I wrote in the dag branch)

"""
Verify that puts from 2 different processes work.
"""
import time

import ray

ray.init(num_cpus=1)

print("Test Write input from driver -> Read & Write from worker -> Read output from driver")
expected_input = b"000000000000"
ref = ray.put(expected_input, max_readers=1)
print(ref)

@ray.remote
class A:
    def f(self, refs, expected_input, output_val):
        ref = refs[0]
        val = ray.get(refs[0])
        assert val == expected_input, val
        ray.release(ref)
        ray.worker.global_worker.put_object(output_val, object_ref=ref, max_readers=1)


a = A.remote()
time.sleep(1)
output_val = b"0"
b = a.f.remote([ref], expected_input, output_val)
ray.get(b)
val = ray.get(ref)
assert output_val == val
ray.release(ref)

print("Test Write input from driver twice -> Read & Write from worker -> Read output from driver")
# Test write twice.
ref = ray.put(b"000000000000", max_readers=1)
assert b"000000000000" == ray.get(ref)
ray.release(ref)
print(ref)
expected_input = b"1"
ray.worker.global_worker.put_object(b"1", object_ref=ref, max_readers=1)

a = A.remote()
time.sleep(1)
expected_output = b"23"
b = a.f.remote([ref], expected_input, expected_output)
ray.get(b)
val = ray.get(ref)
assert expected_output == val
ray.release(ref)

Contributor Author

It does work, but the test is a good idea. Added.
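The idea behind reading a different-sized value from the same buffer can be sketched with a fixed-capacity buffer whose header records the current payload size. This is a minimal model under stated assumptions: the two-field header layout, the `HEADER` constant, and the `write_value` and `read_value` helpers are hypothetical, and a `bytearray` stands in for shared memory. The only detail taken from the thread is that the reader refreshes `data_size` from the header before building the buffer it returns.

```python
import struct

# Hypothetical header layout: (version, data_size), both unsigned 64-bit.
HEADER = struct.Struct("<QQ")


def write_value(buf, payload):
    """Write a payload of any size up to the buffer's fixed capacity."""
    capacity = len(buf) - HEADER.size
    if len(payload) > capacity:
        raise ValueError(f"payload of {len(payload)} bytes exceeds {capacity}")
    version, _ = HEADER.unpack_from(buf, 0)
    start = HEADER.size
    buf[start:start + len(payload)] = payload
    # Publish the new version together with the new payload size.
    HEADER.pack_into(buf, 0, version + 1, len(payload))


def read_value(buf):
    """Read the size from the header first, then slice exactly that much."""
    version, data_size = HEADER.unpack_from(buf, 0)
    start = HEADER.size
    return version, bytes(buf[start:start + data_size])


buf = bytearray(HEADER.size + 16)
write_value(buf, b"000000000000")
first = read_value(buf)
write_value(buf, b"0")  # a shorter value reuses the same buffer
second = read_value(buf)
```

Because the reader trusts the header rather than a cached size, stale bytes left over from the longer first value are never returned.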

@stephanie-wang

> btw do we not use eventfd anymore? Cannot find code.

Yup I got rid of it!

@stephanie-wang removed the @author-action-required label Dec 1, 2023
)

def begin_read(self) -> Any:
"""
Contributor

Shall we add some sanity checks on the channel? For example, raising errors if trying to operate on the channel again before end_read is called after a begin_read.

Or other unsupported scenarios (such as deserializing the channel to a different node id). These might help development / documenting the current limitations.
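The first kind of check suggested here could look roughly like the wrapper below. It is a sketch, not the PR's code: `ReadStateGuard` is a hypothetical name, and `StubChannel` is a placeholder that only mimics the `begin_read` and `end_read` method names from the PR description so the example can run on its own.

```python
class StubChannel:
    """Placeholder with the same method names as the channel in this PR."""

    def begin_read(self):
        return "value"

    def end_read(self):
        pass


class ReadStateGuard:
    """Hypothetical wrapper that rejects out-of-order read calls."""

    def __init__(self, channel):
        self._channel = channel
        self._reading = False  # True between begin_read and end_read

    def begin_read(self):
        if self._reading:
            raise RuntimeError("begin_read called again before end_read")
        value = self._channel.begin_read()
        self._reading = True
        return value

    def end_read(self):
        if not self._reading:
            raise RuntimeError("end_read called without a matching begin_read")
        self._channel.end_read()
        self._reading = False


def misuse_raises(call):
    """Return True if the given zero-argument call raises RuntimeError."""
    try:
        call()
    except RuntimeError:
        return True
    return False
```

A per-handle flag like this only catches misuse within one process; checks such as detecting deserialization on a different node would need information the sketch does not model.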

@ericl added the @author-action-required label Dec 1, 2023
@stephanie-wang removed the @author-action-required label Dec 2, 2023

@rkooo567 rkooo567 left a comment


One last question for different data size


ericl commented Dec 4, 2023

LGTM

@stephanie-wang stephanie-wang merged commit cb5bb4e into ray-project:master Dec 9, 2023
14 of 15 checks passed
stephanie-wang added a commit to stephanie-wang/ray that referenced this pull request Dec 10, 2023
jjyao added a commit to jjyao/ray that referenced this pull request Dec 10, 2023
jjyao added a commit that referenced this pull request Dec 11, 2023
stephanie-wang added a commit to stephanie-wang/ray that referenced this pull request Dec 11, 2023
stephanie-wang added a commit that referenced this pull request Dec 12, 2023
See #41515.

This update compiles the new code only on Linux. macOS does not support shared-memory (unnamed) semaphores, only named semaphores.

---------

Signed-off-by: Stephanie Wang <swang@cs.berkeley.edu>