From cb5bb4e7763994db4f4e765ce9dc74376d7eedca Mon Sep 17 00:00:00 2001 From: Stephanie Wang Date: Sat, 9 Dec 2023 11:15:48 -0800 Subject: [PATCH] [core] Support mutable plasma objects (#41515) This is a first pass on introducing an experimental "channel" concept that can be used for direct worker-worker communication, bypassing the usual Ray Core components like the driver and raylet. Channels are implemented as mutable plasma objects. The object can be written multiple times by a client. The writer must specify the number of reads that can be made before the written object value is no longer valid. Reads block until the specified version or a later one is available. Writes block until all readers of the previous version have released the object. Synchronization between a single writer and multiple readers is performed through a new header for plasma objects that is stored in shared memory. API: channel: Channel = ray.experimental.channel.Channel(buf_size): Client uses the normal ray.put path to create a mutable plasma object. Once created and sealed for the first time, the plasma store synchronously reads and releases the object. At this point, the object may be written by the original client and read by others. channel.write(val): Use the handle returned by the above to send a value through the channel. The caller waits until all readers of the previous version have released the object, then writes a new version. val = channel.begin_read(): Blocks until a value is available. Equivalent to ray.get. This is the beginning of the client's read. channel.end_read(): End the client's read, marking the channel as available to write again. 
--------- Signed-off-by: Stephanie Wang --- .buildkite/core.rayci.yml | 2 +- BUILD.bazel | 1 + .../ray/runtime/object/native_object_store.cc | 3 +- python/ray/_private/ray_perf.py | 87 ++++ python/ray/_private/worker.py | 43 +- python/ray/_raylet.pxd | 3 +- python/ray/_raylet.pyx | 71 +++- python/ray/experimental/channel.py | 149 +++++++ python/ray/includes/libcoreworker.pxd | 12 + python/ray/tests/BUILD | 1 + python/ray/tests/test_channel.py | 159 +++++++ python/ray/tests/test_object_store_metrics.py | 18 +- src/ray/core_worker/core_worker.cc | 49 ++- src/ray/core_worker/core_worker.h | 39 ++ ...io_ray_runtime_object_NativeObjectStore.cc | 6 +- .../store_provider/plasma_store_provider.cc | 69 +++- .../store_provider/plasma_store_provider.h | 38 +- src/ray/object_manager/common.cc | 181 ++++++++ src/ray/object_manager/common.h | 107 ++++- src/ray/object_manager/object_buffer_pool.cc | 1 + src/ray/object_manager/plasma/client.cc | 391 +++++++++++++++--- src/ray/object_manager/plasma/client.h | 49 +++ src/ray/object_manager/plasma/common.h | 16 + src/ray/object_manager/plasma/object_store.cc | 11 + src/ray/object_manager/plasma/plasma.fbs | 11 + src/ray/object_manager/plasma/plasma.h | 7 + .../object_manager/plasma/plasma_allocator.cc | 2 +- src/ray/object_manager/plasma/protocol.cc | 21 +- src/ray/object_manager/plasma/protocol.h | 1 + .../test/object_buffer_pool_test.cc | 15 + 30 files changed, 1445 insertions(+), 118 deletions(-) create mode 100644 python/ray/experimental/channel.py create mode 100644 python/ray/tests/test_channel.py create mode 100644 src/ray/object_manager/common.cc diff --git a/.buildkite/core.rayci.yml b/.buildkite/core.rayci.yml index 0cc3c37327d4..0e89ad785d28 100644 --- a/.buildkite/core.rayci.yml +++ b/.buildkite/core.rayci.yml @@ -222,7 +222,7 @@ steps: - label: ":ray: core: cpp ubsan tests" tags: core_cpp - instance_type: medium + instance_type: large commands: - bazel run //ci/ray_ci:test_in_docker -- //:all //src/... 
core --build-type ubsan --except-tags no_ubsan diff --git a/BUILD.bazel b/BUILD.bazel index d281edd0656b..953c1a524f5f 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -321,6 +321,7 @@ PLASMA_LINKOPTS = [] + select({ ray_cc_library( name = "plasma_client", srcs = [ + "src/ray/object_manager/common.cc", "src/ray/object_manager/plasma/client.cc", "src/ray/object_manager/plasma/connection.cc", "src/ray/object_manager/plasma/malloc.cc", diff --git a/cpp/src/ray/runtime/object/native_object_store.cc b/cpp/src/ray/runtime/object/native_object_store.cc index b65159a9c83f..6d1a14ae120f 100644 --- a/cpp/src/ray/runtime/object/native_object_store.cc +++ b/cpp/src/ray/runtime/object/native_object_store.cc @@ -91,7 +91,8 @@ std::vector> NativeObjectStore::GetRaw( const std::vector &ids, int timeout_ms) { auto &core_worker = CoreWorkerProcess::GetCoreWorker(); std::vector> results; - ::ray::Status status = core_worker.Get(ids, timeout_ms, &results); + ::ray::Status status = core_worker.Get( + ids, timeout_ms, /*is_experimental_mutable_object=*/false, &results); if (!status.ok()) { if (status.IsTimedOut()) { throw RayTimeoutException("Get object error:" + status.message()); diff --git a/python/ray/_private/ray_perf.py b/python/ray/_private/ray_perf.py index 316f3baeca84..330527957d67 100644 --- a/python/ray/_private/ray_perf.py +++ b/python/ray/_private/ray_perf.py @@ -8,6 +8,8 @@ import multiprocessing import ray +import ray.experimental.channel as ray_channel + logger = logging.getLogger(__name__) @@ -288,6 +290,91 @@ def async_actor_multi(): results += timeit("n:n async-actor calls async", async_actor_multi, m * n) ray.shutdown() + ################################################# + # Perf tests for channels, used in compiled DAGs. 
+ ################################################# + + ray.init() + + def put_channel_small(chans, do_get=False, do_release=False): + for chan in chans: + chan.write(b"0") + if do_get: + chan.begin_read() + if do_release: + chan.end_read() + + @ray.remote + class ChannelReader: + def ready(self): + return + + def read(self, chans): + while True: + for chan in chans: + chan.begin_read() + chan.end_read() + + chans = [ray_channel.Channel(1000)] + results += timeit( + "local put, single channel calls", + lambda: put_channel_small(chans, do_release=True), + ) + results += timeit( + "local put:local get, single channel calls", + lambda: put_channel_small(chans, do_get=True, do_release=True), + ) + + chans = [ray_channel.Channel(1000)] + reader = ChannelReader.remote() + ray.get(reader.ready.remote()) + reader.read.remote(chans) + results += timeit( + "local put:1 remote get, single channel calls", lambda: put_channel_small(chans) + ) + ray.kill(reader) + + n_cpu = multiprocessing.cpu_count() // 2 + print(f"Testing multiple readers/channels, n={n_cpu}") + + chans = [ray_channel.Channel(1000, num_readers=n_cpu)] + readers = [ChannelReader.remote() for _ in range(n_cpu)] + ray.get([reader.ready.remote() for reader in readers]) + for reader in readers: + reader.read.remote(chans) + results += timeit( + "local put:n remote get, single channel calls", + lambda: put_channel_small(chans), + ) + for reader in readers: + ray.kill(reader) + + chans = [ray_channel.Channel(1000) for _ in range(n_cpu)] + reader = ChannelReader.remote() + ray.get(reader.ready.remote()) + reader.read.remote(chans) + results += timeit( + "local put:1 remote get, n channels calls", lambda: put_channel_small(chans) + ) + ray.kill(reader) + + chans = [ray_channel.Channel(1000) for _ in range(n_cpu)] + readers = [ChannelReader.remote() for _ in range(n_cpu)] + ray.get([reader.ready.remote() for reader in readers]) + for chan, reader in zip(chans, readers): + reader.read.remote([chan]) + results += timeit( 
+ "local put:n remote get, n channels calls", lambda: put_channel_small(chans) + ) + for reader in readers: + ray.kill(reader) + + ray.shutdown() + + ############################ + # End of channel perf tests. + ############################ + NUM_PGS = 100 NUM_BUNDLES = 1 ray.init(resources={"custom": 100}) diff --git a/python/ray/_private/worker.py b/python/ray/_private/worker.py index d4210c06ac77..c5047bfb22de 100644 --- a/python/ray/_private/worker.py +++ b/python/ray/_private/worker.py @@ -712,7 +712,13 @@ def set_mode(self, mode): def set_load_code_from_local(self, load_code_from_local): self._load_code_from_local = load_code_from_local - def put_object(self, value, object_ref=None, owner_address=None): + def put_object( + self, + value: Any, + object_ref: Optional["ray.ObjectRef"] = None, + owner_address: Optional[str] = None, + _is_experimental_mutable_object: bool = False, + ): """Put value in the local object store with object reference `object_ref`. This assumes that the value for `object_ref` has not yet been placed in @@ -727,6 +733,10 @@ def put_object(self, value, object_ref=None, owner_address=None): object_ref: The object ref of the value to be put. If None, one will be generated. owner_address: The serialized address of object's owner. + _is_experimental_mutable_object: An experimental flag for mutable + objects. If True, then the returned object will not have a + valid value. The object must be written to using the + ray.experimental.channel API before readers can read. Returns: ObjectRef: The object ref the object was put under. @@ -760,6 +770,11 @@ def put_object(self, value, object_ref=None, owner_address=None): f"{sio.getvalue()}" ) raise TypeError(msg) from e + + # If the object is mutable, then the raylet should never read the + # object. Instead, clients will keep the object pinned. 
+ pin_object = not _is_experimental_mutable_object + # This *must* be the first place that we construct this python # ObjectRef because an entry with 0 local references is created when # the object is Put() in the core worker, expecting that this python @@ -768,7 +783,11 @@ def put_object(self, value, object_ref=None, owner_address=None): # reference counter. return ray.ObjectRef( self.core_worker.put_serialized_object_and_increment_local_ref( - serialized_value, object_ref=object_ref, owner_address=owner_address + serialized_value, + object_ref=object_ref, + pin_object=pin_object, + owner_address=owner_address, + _is_experimental_mutable_object=_is_experimental_mutable_object, ), # The initial local reference is already acquired internally. skip_adding_local_ref=True, @@ -790,7 +809,12 @@ def deserialize_objects(self, data_metadata_pairs, object_refs): context = self.get_serialization_context() return context.deserialize_objects(data_metadata_pairs, object_refs) - def get_objects(self, object_refs: list, timeout: Optional[float] = None): + def get_objects( + self, + object_refs: list, + timeout: Optional[float] = None, + _is_experimental_mutable_object: bool = False, + ): """Get the values in the object store associated with the IDs. Return the values from the local object store for object_refs. This @@ -806,6 +830,10 @@ def get_objects(self, object_refs: list, timeout: Optional[float] = None): list: List of deserialized objects bytes: UUID of the debugger breakpoint we should drop into or b"" if there is no breakpoint. + _is_experimental_mutable_object: An experimental flag for mutable + objects. If True, then wait until there is a value available to + read. The object must also already be local, or else the get + call will hang. """ # Make sure that the values are object refs. 
for object_ref in object_refs: @@ -817,7 +845,10 @@ def get_objects(self, object_refs: list, timeout: Optional[float] = None): timeout_ms = int(timeout * 1000) if timeout is not None else -1 data_metadata_pairs = self.core_worker.get_objects( - object_refs, self.current_task_id, timeout_ms + object_refs, + self.current_task_id, + timeout_ms, + _is_experimental_mutable_object, ) debugger_breakpoint = b"" for data, metadata in data_metadata_pairs: @@ -2648,7 +2679,9 @@ def get( @PublicAPI @client_mode_hook def put( - value: Any, *, _owner: Optional["ray.actor.ActorHandle"] = None + value: Any, + *, + _owner: Optional["ray.actor.ActorHandle"] = None, ) -> "ray.ObjectRef": """Store an object in the object store. diff --git a/python/ray/_raylet.pxd b/python/ray/_raylet.pxd index 015c636454df..5d47073b74e8 100644 --- a/python/ray/_raylet.pxd +++ b/python/ray/_raylet.pxd @@ -134,7 +134,8 @@ cdef class CoreWorker: CObjectID *c_object_id, shared_ptr[CBuffer] *data, c_bool created_by_worker, owner_address=*, - c_bool inline_small_object=*) + c_bool inline_small_object=*, + c_bool is_experimental_mutable_object=*) cdef unique_ptr[CAddress] _convert_python_address(self, address=*) cdef store_task_output( self, serialized_object, diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index e5c4549dd1bd..7b32ce1a3363 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -3370,14 +3370,15 @@ cdef class CoreWorker: return self.plasma_event_handler def get_objects(self, object_refs, TaskID current_task_id, - int64_t timeout_ms=-1): + int64_t timeout_ms=-1, + c_bool _is_experimental_mutable_object=False): cdef: c_vector[shared_ptr[CRayObject]] results CTaskID c_task_id = current_task_id.native() c_vector[CObjectID] c_object_ids = ObjectRefsToVector(object_refs) with nogil: op_status = CCoreWorkerProcess.GetCoreWorker().Get( - c_object_ids, timeout_ms, &results) + c_object_ids, timeout_ms, _is_experimental_mutable_object, &results) check_status(op_status) return 
RayObjectsToDataMetadataPairs(results) @@ -3412,7 +3413,9 @@ cdef class CoreWorker: CObjectID *c_object_id, shared_ptr[CBuffer] *data, c_bool created_by_worker, owner_address=None, - c_bool inline_small_object=True): + c_bool inline_small_object=True, + c_bool is_experimental_mutable_object=False, + ): cdef: unique_ptr[CAddress] c_owner_address @@ -3422,7 +3425,8 @@ cdef class CoreWorker: with nogil: check_status(CCoreWorkerProcess.GetCoreWorker() .CreateOwnedAndIncrementLocalRef( - metadata, data_size, contained_ids, + is_experimental_mutable_object, metadata, + data_size, contained_ids, c_object_id, data, created_by_worker, move(c_owner_address), inline_small_object)) @@ -3511,11 +3515,57 @@ cdef class CoreWorker: generator_id=CObjectID.Nil(), owner_address=c_owner_address)) - def put_serialized_object_and_increment_local_ref(self, serialized_object, - ObjectRef object_ref=None, - c_bool pin_object=True, - owner_address=None, - c_bool inline_small_object=True): + def experimental_mutable_object_put_serialized(self, serialized_object, + ObjectRef object_ref, + num_readers, + ): + cdef: + CObjectID c_object_id = object_ref.native() + shared_ptr[CBuffer] data + unique_ptr[CAddress] null_owner_address + + metadata = string_to_buffer(serialized_object.metadata) + data_size = serialized_object.total_bytes + check_status(CCoreWorkerProcess.GetCoreWorker() + .ExperimentalMutableObjectWriteAcquire( + c_object_id, + metadata, + data_size, + num_readers, + &data, + )) + if data_size > 0: + (serialized_object).write_to( + Buffer.make(data)) + check_status(CCoreWorkerProcess.GetCoreWorker() + .ExperimentalMutableObjectWriteRelease( + c_object_id, + )) + + def experimental_mutable_object_read_release(self, object_refs): + """ + For experimental.channel.Channel. + + Signal to the writer that the channel is ready to write again. The read + began when the caller calls ray.get and a written value is available. 
If + ray.get is not called first, then this call will block until a value is + written, then drop the value. + """ + cdef: + c_vector[CObjectID] c_object_ids = ObjectRefsToVector(object_refs) + with nogil: + op_status = (CCoreWorkerProcess.GetCoreWorker() + .ExperimentalMutableObjectReadRelease(c_object_ids)) + check_status(op_status) + + def put_serialized_object_and_increment_local_ref( + self, serialized_object, + ObjectRef object_ref=None, + c_bool pin_object=True, + owner_address=None, + c_bool inline_small_object=True, + c_bool _is_experimental_mutable_object=False, + ): cdef: CObjectID c_object_id shared_ptr[CBuffer] data @@ -3531,7 +3581,8 @@ cdef class CoreWorker: object_already_exists = self._create_put_buffer( metadata, total_bytes, object_ref, contained_object_ids, - &c_object_id, &data, True, owner_address, inline_small_object) + &c_object_id, &data, True, owner_address, inline_small_object, + _is_experimental_mutable_object) logger.debug( f"Serialized object size of {c_object_id.Hex()} is {total_bytes} bytes") diff --git a/python/ray/experimental/channel.py b/python/ray/experimental/channel.py new file mode 100644 index 000000000000..e8ef9ad085f7 --- /dev/null +++ b/python/ray/experimental/channel.py @@ -0,0 +1,149 @@ +import io +import logging +from typing import Any, Optional + +import ray +from ray.util.annotations import PublicAPI + +# Logger for this module. It should be configured at the entry point +# into the program using Ray. Ray provides a default configuration at +# entry/init points. +logger = logging.getLogger(__name__) + + +def _create_channel_ref( + buffer_size: int, +) -> "ray.ObjectRef": + """ + Create a channel that can be read and written by co-located Ray processes. + + The channel has no buffer, so the writer will block until reader(s) have + read the previous value. Only the channel creator may write to the channel. + + Args: + buffer_size: The number of bytes to allocate for the object data and + metadata. 
Writes to the channel must produce serialized data and + metadata less than or equal to this value. + Returns: + ray.ObjectRef: A reference to the mutable object backing the channel. + """ + worker = ray._private.worker.global_worker + worker.check_connected() + + value = b"0" * buffer_size + + try: + object_ref = worker.put_object( + value, owner_address=None, _is_experimental_mutable_object=True + ) + except ray.exceptions.ObjectStoreFullError: + logger.info( + "Put failed since the value was either too large or the " + "store was full of pinned objects." + ) + raise + return object_ref + + +@PublicAPI(stability="alpha") +class Channel: + """ + A wrapper type for ray.ObjectRef. Currently supports ray.get but not + ray.wait. + """ + + def __init__(self, buffer_size: Optional[int] = None, num_readers: int = 1): + """ + Create a channel that can be read and written by co-located Ray processes. + + Only the caller may write to the channel. The channel has no buffer, + so the writer will block until reader(s) have read the previous value. + + Args: + buffer_size: The number of bytes to allocate for the object data and + metadata. Writes to the channel must produce serialized data and + metadata less than or equal to this value. + num_readers: The default number of readers that must read and release + each written value; used when write() is not given num_readers. + """ + if buffer_size is None: + self._base_ref = None + else: + self._base_ref = _create_channel_ref(buffer_size) + + self._num_readers = num_readers + self._worker = ray._private.worker.global_worker + self._worker.check_connected() + + @staticmethod + def _from_base_ref(base_ref: "ray.ObjectRef", num_readers: int) -> "Channel": + chan = Channel(num_readers=num_readers) + chan._base_ref = base_ref + return chan + + def __reduce__(self): + return self._from_base_ref, (self._base_ref, self._num_readers) + + def write(self, value: Any, num_readers: Optional[int] = None): + """ + Write a value to the channel. + + Blocks if there are still pending readers for the previous value. 
The + writer may not write again until the specified number of readers have + called ``end_read``. + + Args: + value: The value to write. + num_readers: The number of readers that must read and release the value + before we can write again. + """ + if num_readers is None: + num_readers = self._num_readers + if num_readers <= 0: + raise ValueError("``num_readers`` must be a positive integer.") + + try: + serialized_value = self._worker.get_serialization_context().serialize(value) + except TypeError as e: + sio = io.StringIO() + ray.util.inspect_serializability(value, print_file=sio) + msg = ( + "Could not serialize the put value " + f"{repr(value)}:\n" + f"{sio.getvalue()}" + ) + raise TypeError(msg) from e + + self._worker.core_worker.experimental_mutable_object_put_serialized( + serialized_value, + self._base_ref, + num_readers, + ) + + def begin_read(self) -> Any: + """ + Read the latest value from the channel. This call will block until a + value is available to read. + + Subsequent calls to begin_read() will return the same value, until + end_read() is called. Then, the client must begin_read() again to get + the next value. + + Returns: + Any: The deserialized value. + """ + values, _ = self._worker.get_objects( + [self._base_ref], _is_experimental_mutable_object=True + ) + return values[0] + + def end_read(self): + """ + Signal to the writer that the channel is ready to write again. + + If begin_read is not called first, then this call will block until a + value is written, then drop the value. 
+ """ + self._worker.core_worker.experimental_mutable_object_read_release( + [self._base_ref] + ) diff --git a/python/ray/includes/libcoreworker.pxd b/python/ray/includes/libcoreworker.pxd index 97035f419be7..de5dd651d557 100644 --- a/python/ray/includes/libcoreworker.pxd +++ b/python/ray/includes/libcoreworker.pxd @@ -230,6 +230,7 @@ cdef extern from "ray/core_worker/core_worker.h" nogil: const c_vector[CObjectID] &contained_object_ids, const CObjectID &object_id) CRayStatus CreateOwnedAndIncrementLocalRef( + c_bool is_mutable, const shared_ptr[CBuffer] &metadata, const size_t data_size, const c_vector[CObjectID] &contained_object_ids, @@ -243,12 +244,23 @@ cdef extern from "ray/core_worker/core_worker.h" nogil: const CAddress &owner_address, shared_ptr[CBuffer] *data, c_bool created_by_worker) + CRayStatus ExperimentalMutableObjectWriteAcquire( + const CObjectID &object_id, + const shared_ptr[CBuffer] &metadata, + uint64_t data_size, + int64_t num_readers, + shared_ptr[CBuffer] *data) + CRayStatus ExperimentalMutableObjectWriteRelease( + const CObjectID &object_id) CRayStatus SealOwned(const CObjectID &object_id, c_bool pin_object, const unique_ptr[CAddress] &owner_address) CRayStatus SealExisting(const CObjectID &object_id, c_bool pin_object, const CObjectID &generator_id, const unique_ptr[CAddress] &owner_address) + CRayStatus ExperimentalMutableObjectReadRelease( + const c_vector[CObjectID] &object_ids) CRayStatus Get(const c_vector[CObjectID] &ids, int64_t timeout_ms, + c_bool is_experimental_mutable_object, c_vector[shared_ptr[CRayObject]] *results) CRayStatus GetIfLocal( const c_vector[CObjectID] &ids, diff --git a/python/ray/tests/BUILD b/python/ray/tests/BUILD index 181428450ad1..21753059f0f6 100644 --- a/python/ray/tests/BUILD +++ b/python/ray/tests/BUILD @@ -250,6 +250,7 @@ py_test_module_list( "test_annotations.py", "test_args.py", "test_asyncio_cluster.py", + "test_channel.py", "test_concurrency_group.py", "test_component_failures.py", 
"test_cross_language.py", diff --git a/python/ray/tests/test_channel.py b/python/ray/tests/test_channel.py new file mode 100644 index 000000000000..3ff33df76bc5 --- /dev/null +++ b/python/ray/tests/test_channel.py @@ -0,0 +1,159 @@ +# coding: utf-8 +import logging +import os +import sys + +import numpy as np +import pytest + +import ray +import ray.cluster_utils +import ray.experimental.channel as ray_channel + +logger = logging.getLogger(__name__) + + +@pytest.mark.skipif(sys.platform == "win32", reason="Requires POSIX.") +def test_put_local_get(ray_start_regular): + chan = ray_channel.Channel(1000) + + num_writes = 1000 + for i in range(num_writes): + val = i.to_bytes(8, "little") + chan.write(val, num_readers=1) + assert chan.begin_read() == val + + # Begin read multiple times will return the same value. + assert chan.begin_read() == val + + chan.end_read() + + +@pytest.mark.skipif(sys.platform == "win32", reason="Requires POSIX.") +def test_errors(ray_start_regular): + @ray.remote + class Actor: + def make_chan(self, do_write=True): + self.chan = ray_channel.Channel(1000) + if do_write: + self.chan.write(b"hello", num_readers=1) + return self.chan + + a = Actor.remote() + # Only original creator can write. + chan = ray.get(a.make_chan.remote(do_write=False)) + with pytest.raises(ray.exceptions.RaySystemError): + chan.write(b"hi") + + # Only original creator can write. + chan = ray.get(a.make_chan.remote(do_write=True)) + assert chan.begin_read() == b"hello" + with pytest.raises(ray.exceptions.RaySystemError): + chan.write(b"hi") + + # Multiple consecutive reads from the same process are fine. + chan = ray.get(a.make_chan.remote(do_write=True)) + assert chan.begin_read() == b"hello" + assert chan.begin_read() == b"hello" + chan.end_read() + + @ray.remote + class Reader: + def __init__(self): + pass + + def read(self, chan): + return chan.begin_read() + + # Multiple reads from n different processes, where n > num_readers, errors. 
+ chan = ray.get(a.make_chan.remote(do_write=True)) + readers = [Reader.remote(), Reader.remote()] + # At least 1 reader + with pytest.raises(ray.exceptions.RayTaskError) as exc_info: + ray.get([reader.read.remote(chan) for reader in readers]) + assert "ray.exceptions.RaySystemError" in str(exc_info.value) + + +@pytest.mark.skipif(sys.platform == "win32", reason="Requires POSIX.") +def test_put_different_meta(ray_start_regular): + chan = ray_channel.Channel(1000) + + def _test(val): + chan.write(val, num_readers=1) + + read_val = chan.begin_read() + if isinstance(val, np.ndarray): + assert np.array_equal(read_val, val) + else: + assert read_val == val + chan.end_read() + + _test(b"hello") + _test("hello") + _test(1000) + _test(np.random.rand(10)) + + # Cannot put a serialized value larger than the allocated buffer. + with pytest.raises(ValueError): + _test(np.random.rand(100)) + + _test(np.random.rand(1)) + + +@pytest.mark.skipif(sys.platform == "win32", reason="Requires POSIX.") +@pytest.mark.parametrize("num_readers", [1, 4]) +def test_put_remote_get(ray_start_regular, num_readers): + chan = ray_channel.Channel(1000) + + @ray.remote(num_cpus=0) + class Reader: + def __init__(self): + pass + + def read(self, chan, num_writes): + for i in range(num_writes): + val = i.to_bytes(8, "little") + assert chan.begin_read() == val + chan.end_read() + + for i in range(num_writes): + val = i.to_bytes(100, "little") + assert chan.begin_read() == val + chan.end_read() + + for val in [ + b"hello world", + "hello again", + 1000, + ]: + assert chan.begin_read() == val + chan.end_read() + + num_writes = 1000 + readers = [Reader.remote() for _ in range(num_readers)] + done = [reader.read.remote(chan, num_writes) for reader in readers] + for i in range(num_writes): + val = i.to_bytes(8, "little") + chan.write(val, num_readers=num_readers) + + # Test different data size. 
+ for i in range(num_writes): + val = i.to_bytes(100, "little") + chan.write(val, num_readers=num_readers) + + # Test different metadata. + for val in [ + b"hello world", + "hello again", + 1000, + ]: + chan.write(val, num_readers=num_readers) + + ray.get(done) + + +if __name__ == "__main__": + if os.environ.get("PARALLEL_CI"): + sys.exit(pytest.main(["-n", "auto", "--boxed", "-vs", __file__])) + else: + sys.exit(pytest.main(["-sv", __file__])) diff --git a/python/ray/tests/test_object_store_metrics.py b/python/ray/tests/test_object_store_metrics.py index 60922a888dc4..7e72919e761b 100644 --- a/python/ray/tests/test_object_store_metrics.py +++ b/python/ray/tests/test_object_store_metrics.py @@ -92,7 +92,7 @@ def test_shared_memory_and_inline_worker_heap(shutdown_only): wait_for_condition( # 1KiB for metadata difference - lambda: approx_eq_dict_in(objects_by_loc(info), expected, 1 * KiB), + lambda: approx_eq_dict_in(objects_by_loc(info), expected, 2 * KiB), timeout=20, retry_interval_ms=500, ) @@ -134,7 +134,7 @@ def func(): wait_for_condition( # 1KiB for metadata difference - lambda: approx_eq_dict_in(objects_by_loc(info), expected, 1 * KiB), + lambda: approx_eq_dict_in(objects_by_loc(info), expected, 2 * KiB), timeout=20, retry_interval_ms=500, ) @@ -255,7 +255,7 @@ def test_fallback_memory(shutdown_only): wait_for_condition( # 2KiB for metadata difference - lambda: approx_eq_dict_in(objects_by_loc(info), expected, 2 * KiB), + lambda: approx_eq_dict_in(objects_by_loc(info), expected, 3 * KiB), timeout=20, retry_interval_ms=500, ) @@ -282,8 +282,8 @@ def test_fallback_memory(shutdown_only): } wait_for_condition( - # 1KiB for metadata difference - lambda: approx_eq_dict_in(objects_by_loc(info), expected, 2 * KiB), + # 3KiB for metadata difference + lambda: approx_eq_dict_in(objects_by_loc(info), expected, 3 * KiB), timeout=20, retry_interval_ms=500, ) @@ -302,8 +302,8 @@ def test_fallback_memory(shutdown_only): } wait_for_condition( - # 1KiB for metadata difference 
- lambda: approx_eq_dict_in(objects_by_loc(info), expected, 2 * KiB), + # 3KiB for metadata difference + lambda: approx_eq_dict_in(objects_by_loc(info), expected, 3 * KiB), timeout=20, retry_interval_ms=500, ) @@ -333,7 +333,7 @@ def test_seal_memory(shutdown_only): wait_for_condition( # 1KiB for metadata difference - lambda: approx_eq_dict_in(objects_by_seal_state(info), expected, 1 * KiB), + lambda: approx_eq_dict_in(objects_by_seal_state(info), expected, 2 * KiB), timeout=20, retry_interval_ms=500, ) @@ -347,7 +347,7 @@ def test_seal_memory(shutdown_only): wait_for_condition( # 1KiB for metadata difference - lambda: approx_eq_dict_in(objects_by_seal_state(info), expected, 1 * KiB), + lambda: approx_eq_dict_in(objects_by_seal_state(info), expected, 2 * KiB), timeout=20, retry_interval_ms=500, ) diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index b59908b5c982..afd11e484031 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -1229,6 +1229,7 @@ Status CoreWorker::Put(const RayObject &object, } Status CoreWorker::CreateOwnedAndIncrementLocalRef( + bool is_experimental_mutable_object, const std::shared_ptr &metadata, const size_t data_size, const std::vector &contained_object_ids, @@ -1303,7 +1304,8 @@ Status CoreWorker::CreateOwnedAndIncrementLocalRef( *object_id, /* owner_address = */ real_owner_address, data, - created_by_worker); + created_by_worker, + is_experimental_mutable_object); } if (!status.ok()) { RemoveLocalReference(*object_id); @@ -1334,6 +1336,20 @@ Status CoreWorker::CreateExisting(const std::shared_ptr &metadata, } } +Status CoreWorker::ExperimentalMutableObjectWriteAcquire( + const ObjectID &object_id, + const std::shared_ptr &metadata, + uint64_t data_size, + int64_t num_readers, + std::shared_ptr *data) { + return plasma_store_provider_->ExperimentalMutableObjectWriteAcquire( + object_id, metadata, data_size, num_readers, data); +} + +Status 
CoreWorker::ExperimentalMutableObjectWriteRelease(const ObjectID &object_id) { + return plasma_store_provider_->ExperimentalMutableObjectWriteRelease(object_id); +} + Status CoreWorker::SealOwned(const ObjectID &object_id, bool pin_object, const std::unique_ptr &owner_address) { @@ -1377,8 +1393,15 @@ Status CoreWorker::SealExisting(const ObjectID &object_id, return Status::OK(); } +Status CoreWorker::ExperimentalMutableObjectReadRelease( + const std::vector &object_ids) { + RAY_CHECK(object_ids.size() == 1); + return plasma_store_provider_->ExperimentalMutableObjectReadRelease(object_ids[0]); +} + Status CoreWorker::Get(const std::vector &ids, const int64_t timeout_ms, + bool is_experimental_mutable_object, std::vector> *results) { std::unique_ptr state = nullptr; if (options_.worker_type == WorkerType::WORKER) { @@ -1446,6 +1469,7 @@ Status CoreWorker::Get(const std::vector &ids, RAY_LOG(DEBUG) << "Plasma GET timeout " << local_timeout_ms; RAY_RETURN_NOT_OK(plasma_store_provider_->Get(plasma_object_ids, local_timeout_ms, + is_experimental_mutable_object, worker_context_, &result_map, &got_exception)); @@ -2906,8 +2930,12 @@ bool CoreWorker::PinExistingReturnObject(const ObjectID &return_id, reference_counter_->AddLocalReference(return_id, ""); reference_counter_->AddBorrowedObject(return_id, ObjectID::Nil(), owner_address); - auto status = plasma_store_provider_->Get( - {return_id}, 0, worker_context_, &result_map, &got_exception); + auto status = plasma_store_provider_->Get({return_id}, + 0, + /*is_experimental_mutable_object=*/false, + worker_context_, + &result_map, + &got_exception); // Remove the temporary ref. 
RemoveLocalReference(return_id); @@ -3171,8 +3199,13 @@ Status CoreWorker::GetAndPinArgsForExecutor(const TaskSpecification &task, RAY_RETURN_NOT_OK( memory_store_->Get(by_ref_ids, -1, worker_context_, &result_map, &got_exception)); } else { - RAY_RETURN_NOT_OK(plasma_store_provider_->Get( - by_ref_ids, -1, worker_context_, &result_map, &got_exception)); + RAY_RETURN_NOT_OK( + plasma_store_provider_->Get(by_ref_ids, + -1, + /*is_experimental_mutable_object=*/false, + worker_context_, + &result_map, + &got_exception)); } for (const auto &it : result_map) { for (size_t idx : by_ref_indices[it.first]) { @@ -4166,7 +4199,11 @@ void CoreWorker::PlasmaCallback(SetResultCallback success, bool object_is_local = false; if (Contains(object_id, &object_is_local).ok() && object_is_local) { std::vector> vec; - if (Get(std::vector{object_id}, 0, &vec).ok()) { + if (Get(std::vector{object_id}, + 0, + /*is_experimental_mutable_object=*/false, + &vec) + .ok()) { RAY_CHECK(vec.size() > 0) << "Failed to get local object but Raylet notified object is local."; return success(vec.front(), object_id, py_future); diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index a8901345f179..e945555e550a 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -605,6 +605,10 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { /// ensure that they decrement the ref count once the returned ObjectRef has /// gone out of scope. /// + /// \param[in] is_experimental_mutable_object Whether this object is an + /// experimental mutable object. If true, then the returned object buffer + /// will not be available to read until the caller Seals and then writes + /// again. /// \param[in] metadata Metadata of the object to be written. /// \param[in] data_size Size of the object to be written. /// \param[in] contained_object_ids The IDs serialized in this object. 
@@ -617,6 +621,7 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { /// small. /// \return Status. Status CreateOwnedAndIncrementLocalRef( + bool is_experimental_mutable_object, const std::shared_ptr &metadata, const size_t data_size, const std::vector &contained_object_ids, @@ -678,6 +683,39 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { const ObjectID &generator_id = ObjectID::Nil(), const std::unique_ptr &owner_address = nullptr); + /// Experimental method for mutable objects. Acquires a write lock on the + /// object that prevents readers from reading until we are done writing. Does + /// not protect against concurrent writers. + /// + /// \param[in] object_id The ID of the object. + /// \param[in] metadata The metadata of the object. This overwrites the + /// current metadata. + /// \param[in] data_size The size of the object to write. This overwrites the + /// current data size. + /// \param[in] num_readers The number of readers that must read and release + /// the object before the caller can write again. + /// \param[out] data The mutable object buffer in plasma that can be written to. + Status ExperimentalMutableObjectWriteAcquire(const ObjectID &object_id, + const std::shared_ptr &metadata, + uint64_t data_size, + int64_t num_readers, + std::shared_ptr *data); + + /// Experimental method for mutable objects. Releases a write lock on the + /// object, allowing readers to read. This is the equivalent of "Seal" for + /// normal objects. + /// + /// \param[in] object_id The ID of the object. + Status ExperimentalMutableObjectWriteRelease(const ObjectID &object_id); + + /// Experimental method for mutable objects. Releases the objects, allowing them + /// to be written again. If the caller did not previously Get the objects, + /// then this first blocks until the latest value is available to read, then + /// releases the value. + /// + /// \param[in] object_ids The IDs of the objects. 
+ Status ExperimentalMutableObjectReadRelease(const std::vector &object_ids); + /// Get a list of objects from the object store. Objects that failed to be retrieved /// will be returned as nullptrs. /// @@ -687,6 +725,7 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { /// \return Status. Status Get(const std::vector &ids, const int64_t timeout_ms, + bool is_experimental_mutable_object, std::vector> *results); /// Get objects directly from the local plasma store, without waiting for the diff --git a/src/ray/core_worker/lib/java/io_ray_runtime_object_NativeObjectStore.cc b/src/ray/core_worker/lib/java/io_ray_runtime_object_NativeObjectStore.cc index 955b46f746e9..72027ff27af5 100644 --- a/src/ray/core_worker/lib/java/io_ray_runtime_object_NativeObjectStore.cc +++ b/src/ray/core_worker/lib/java/io_ray_runtime_object_NativeObjectStore.cc @@ -42,6 +42,7 @@ Status PutSerializedObject(JNIEnv *env, nested_ids.push_back(ObjectID::FromBinary(ref.object_id())); } status = CoreWorkerProcess::GetCoreWorker().CreateOwnedAndIncrementLocalRef( + /*is_experimental_mutable_object=*/false, native_ray_object->GetMetadata(), data_size, nested_ids, @@ -128,7 +129,10 @@ JNIEXPORT jobject JNICALL Java_io_ray_runtime_object_NativeObjectStore_nativeGet }); std::vector> results; auto status = - CoreWorkerProcess::GetCoreWorker().Get(object_ids, (int64_t)timeoutMs, &results); + CoreWorkerProcess::GetCoreWorker().Get(object_ids, + (int64_t)timeoutMs, + /*is_experimental_mutable_object=*/false, + &results); THROW_EXCEPTION_AND_RETURN_IF_NOT_OK(env, status, nullptr); return NativeVectorToJavaList>( env, results, NativeRayObjectToJavaNativeRayObject); diff --git a/src/ray/core_worker/store_provider/plasma_store_provider.cc b/src/ray/core_worker/store_provider/plasma_store_provider.cc index bcc63089b9a5..d4c97b9e0ef0 100644 --- a/src/ray/core_worker/store_provider/plasma_store_provider.cc +++ b/src/ray/core_worker/store_provider/plasma_store_provider.cc @@ -108,12 +108,33 @@ Status 
CoreWorkerPlasmaStoreProvider::Put(const RayObject &object, return Status::OK(); } +Status CoreWorkerPlasmaStoreProvider::ExperimentalMutableObjectWriteAcquire( + const ObjectID &object_id, + const std::shared_ptr &metadata, + uint64_t data_size, + int64_t num_readers, + std::shared_ptr *data) { + return store_client_.ExperimentalMutableObjectWriteAcquire( + object_id, + data_size, + metadata ? metadata->Data() : nullptr, + metadata ? metadata->Size() : 0, + num_readers, + data); +} + +Status CoreWorkerPlasmaStoreProvider::ExperimentalMutableObjectWriteRelease( + const ObjectID &object_id) { + return store_client_.ExperimentalMutableObjectWriteRelease(object_id); +} + Status CoreWorkerPlasmaStoreProvider::Create(const std::shared_ptr &metadata, const size_t data_size, const ObjectID &object_id, const rpc::Address &owner_address, std::shared_ptr *data, - bool created_by_worker) { + bool created_by_worker, + bool is_mutable) { auto source = plasma::flatbuf::ObjectSource::CreatedByWorker; if (!created_by_worker) { source = plasma::flatbuf::ObjectSource::RestoredFromStorage; @@ -121,6 +142,7 @@ Status CoreWorkerPlasmaStoreProvider::Create(const std::shared_ptr &meta Status status = store_client_.CreateAndSpillIfNeeded(object_id, owner_address, + is_mutable, data_size, metadata ? metadata->Data() : nullptr, metadata ? 
metadata->Size() : 0, @@ -165,18 +187,21 @@ Status CoreWorkerPlasmaStoreProvider::FetchAndGetFromPlasmaStore( absl::flat_hash_set &remaining, const std::vector &batch_ids, int64_t timeout_ms, + bool send_fetch_or_reconstruct_ipc, bool fetch_only, bool in_direct_call, const TaskID &task_id, absl::flat_hash_map> *results, bool *got_exception) { - const auto owner_addresses = reference_counter_->GetOwnerAddresses(batch_ids); - RAY_RETURN_NOT_OK( - raylet_client_->FetchOrReconstruct(batch_ids, - owner_addresses, - fetch_only, - /*mark_worker_blocked*/ !in_direct_call, - task_id)); + if (send_fetch_or_reconstruct_ipc) { + const auto owner_addresses = reference_counter_->GetOwnerAddresses(batch_ids); + RAY_RETURN_NOT_OK( + raylet_client_->FetchOrReconstruct(batch_ids, + owner_addresses, + fetch_only, + /*mark_worker_blocked*/ !in_direct_call, + task_id)); + } std::vector plasma_results; RAY_RETURN_NOT_OK(store_client_.Get(batch_ids, @@ -215,6 +240,11 @@ Status CoreWorkerPlasmaStoreProvider::FetchAndGetFromPlasmaStore( return Status::OK(); } +Status CoreWorkerPlasmaStoreProvider::ExperimentalMutableObjectReadRelease( + const ObjectID &object_id) { + return store_client_.ExperimentalMutableObjectReadRelease(object_id); +} + Status CoreWorkerPlasmaStoreProvider::GetIfLocal( const std::vector &object_ids, absl::flat_hash_map> *results) { @@ -266,6 +296,7 @@ Status UnblockIfNeeded(const std::shared_ptr &client, Status CoreWorkerPlasmaStoreProvider::Get( const absl::flat_hash_set &object_ids, int64_t timeout_ms, + bool is_experimental_mutable_object, const WorkerContext &ctx, absl::flat_hash_map> *results, bool *got_exception) { @@ -281,14 +312,17 @@ Status CoreWorkerPlasmaStoreProvider::Get( for (int64_t i = start; i < batch_size && i < total_size; i++) { batch_ids.push_back(id_vector[start + i]); } - RAY_RETURN_NOT_OK(FetchAndGetFromPlasmaStore(remaining, - batch_ids, - /*timeout_ms=*/0, - /*fetch_only=*/true, - ctx.CurrentTaskIsDirectCall(), - ctx.GetCurrentTaskID(), - 
results, - got_exception)); + RAY_RETURN_NOT_OK(FetchAndGetFromPlasmaStore( + remaining, + batch_ids, + /*timeout_ms=*/0, + // Mutable objects must be local before ray.get. + /*send_fetch_or_reconstruct_ipc=*/!is_experimental_mutable_object, + /*fetch_only=*/true, + ctx.CurrentTaskIsDirectCall(), + ctx.GetCurrentTaskID(), + results, + got_exception)); } // If all objects were fetched already, return. Note that we always need to @@ -297,6 +331,8 @@ Status CoreWorkerPlasmaStoreProvider::Get( return UnblockIfNeeded(raylet_client_, ctx); } + RAY_CHECK(!is_experimental_mutable_object) << "Mutable objects must always be local"; + // If not all objects were successfully fetched, repeatedly call FetchOrReconstruct // and Get from the local object store in batches. This loop will run indefinitely // until the objects are all fetched if timeout is -1. @@ -330,6 +366,7 @@ Status CoreWorkerPlasmaStoreProvider::Get( RAY_RETURN_NOT_OK(FetchAndGetFromPlasmaStore(remaining, batch_ids, batch_timeout, + /*send_fetch_or_reconstruct_ipc=*/true, /*fetch_only=*/false, ctx.CurrentTaskIsDirectCall(), ctx.GetCurrentTaskID(), diff --git a/src/ray/core_worker/store_provider/plasma_store_provider.h b/src/ray/core_worker/store_provider/plasma_store_provider.h index 2e08309c6cc8..365665562382 100644 --- a/src/ray/core_worker/store_provider/plasma_store_provider.h +++ b/src/ray/core_worker/store_provider/plasma_store_provider.h @@ -126,7 +126,8 @@ class CoreWorkerPlasmaStoreProvider { const ObjectID &object_id, const rpc::Address &owner_address, std::shared_ptr *data, - bool created_by_worker); + bool created_by_worker, + bool is_mutable = false); /// Seal an object buffer created with Create(). 
/// @@ -147,6 +148,7 @@ class CoreWorkerPlasmaStoreProvider { Status Get(const absl::flat_hash_set &object_ids, int64_t timeout_ms, + bool is_experimental_mutable_object, const WorkerContext &ctx, absl::flat_hash_map> *results, bool *got_exception); @@ -180,6 +182,39 @@ class CoreWorkerPlasmaStoreProvider { std::string MemoryUsageString(); + /// Experimental method for mutable objects. Acquires a write lock on the + /// object that prevents readers from reading until we are done writing. Does + /// not protect against concurrent writers. + /// + /// \param[in] object_id The ID of the object. + /// \param[in] metadata The metadata of the object. This overwrites the + /// current metadata. + /// \param[in] data_size The size of the object to write. This overwrites the + /// current data size. + /// \param[in] num_readers The number of readers that must read and release + /// the object before the caller can write again. + /// \param[out] data The mutable object buffer in plasma that can be written to. + Status ExperimentalMutableObjectWriteAcquire(const ObjectID &object_id, + const std::shared_ptr &metadata, + uint64_t data_size, + int64_t num_readers, + std::shared_ptr *data); + + /// Experimental method for mutable objects. Releases a write lock on the + /// object, allowing readers to read. This is the equivalent of "Seal" for + /// normal objects. + /// + /// \param[in] object_id The ID of the object. + Status ExperimentalMutableObjectWriteRelease(const ObjectID &object_id); + + /// Experimental method for mutable objects. Releases the objects, allowing them + /// to be written again. If the caller did not previously Get the objects, + /// then this first blocks until the latest value is available to read, then + /// releases the value. + /// + /// \param[in] object_id The ID of the object. 
+ Status ExperimentalMutableObjectReadRelease(const ObjectID &object_id); + private: /// Ask the raylet to fetch a set of objects and then attempt to get them /// from the local plasma store. Successfully fetched objects will be removed @@ -202,6 +237,7 @@ class CoreWorkerPlasmaStoreProvider { absl::flat_hash_set &remaining, const std::vector &batch_ids, int64_t timeout_ms, + bool send_fetch_or_reconstruct_ipc, bool fetch_only, bool in_direct_call_task, const TaskID &task_id, diff --git a/src/ray/object_manager/common.cc b/src/ray/object_manager/common.cc new file mode 100644 index 000000000000..cb3335f9cf8c --- /dev/null +++ b/src/ray/object_manager/common.cc @@ -0,0 +1,181 @@ +// Copyright 2020-2021 The Ray Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "ray/object_manager/common.h" + +namespace ray { + +void PlasmaObjectHeader::Init() { +#ifndef _WIN32 + // wr_mut is shared between writer and readers. + pthread_mutexattr_t mutex_attr; + pthread_mutexattr_init(&mutex_attr); + pthread_mutexattr_setpshared(&mutex_attr, PTHREAD_PROCESS_SHARED); + pthread_mutexattr_settype(&mutex_attr, PTHREAD_MUTEX_ERRORCHECK); + pthread_mutex_init(&wr_mut, &mutex_attr); + + sem_init(&rw_semaphore, PTHREAD_PROCESS_SHARED, 1); + + // Condition is shared between writer and readers. 
+ pthread_condattr_t cond_attr; + pthread_condattr_init(&cond_attr); + pthread_condattr_setpshared(&cond_attr, PTHREAD_PROCESS_SHARED); + pthread_cond_init(&cond, &cond_attr); +#endif +} + +void PlasmaObjectHeader::Destroy() { +#ifndef _WIN32 + RAY_CHECK(pthread_mutex_destroy(&wr_mut) == 0); + RAY_CHECK(pthread_cond_destroy(&cond) == 0); + RAY_CHECK(sem_destroy(&rw_semaphore) == 0); +#endif +} + +#ifndef _WIN32 + +void PrintPlasmaObjectHeader(const PlasmaObjectHeader *header) { + RAY_LOG(DEBUG) << "PlasmaObjectHeader: \n" + << "version: " << header->version << "\n" + << "num_readers: " << header->num_readers << "\n" + << "num_read_acquires_remaining: " << header->num_read_acquires_remaining + << "\n" + << "num_read_releases_remaining: " << header->num_read_releases_remaining + << "\n" + << "data_size: " << header->data_size << "\n" + << "metadata_size: " << header->metadata_size << "\n"; +} + +void PlasmaObjectHeader::WriteAcquire(int64_t write_version, + uint64_t write_data_size, + uint64_t write_metadata_size, + int64_t write_num_readers) { + RAY_LOG(DEBUG) << "WriteAcquire. version: " << write_version << ", data size " + << write_data_size << ", metadata size " << write_metadata_size + << ", num readers: " << write_num_readers; + sem_wait(&rw_semaphore); + RAY_CHECK(pthread_mutex_lock(&wr_mut) == 0); + PrintPlasmaObjectHeader(this); + + RAY_CHECK(num_read_acquires_remaining == 0); + RAY_CHECK(num_read_releases_remaining == 0); + RAY_CHECK(write_version == version + 1) + << "Write version " << write_version + << " is more than 1 greater than current version " << version + << ". 
Are you sure this is the only writer?"; + + version = write_version; + is_sealed = false; + data_size = write_data_size; + metadata_size = write_metadata_size; + num_readers = write_num_readers; + + RAY_LOG(DEBUG) << "WriteAcquire done"; + PrintPlasmaObjectHeader(this); + RAY_CHECK(pthread_mutex_unlock(&wr_mut) == 0); +} + +void PlasmaObjectHeader::WriteRelease(int64_t write_version) { + RAY_LOG(DEBUG) << "WriteRelease Waiting. version: " << write_version; + RAY_CHECK(pthread_mutex_lock(&wr_mut) == 0); + RAY_LOG(DEBUG) << "WriteRelease " << write_version; + PrintPlasmaObjectHeader(this); + + RAY_CHECK(version == write_version) + << "Write version " << write_version << " no longer matches current version " + << version << ". Are you sure this is the only writer?"; + + version = write_version; + is_sealed = true; + RAY_CHECK(num_readers != 0) << num_readers; + num_read_acquires_remaining = num_readers; + num_read_releases_remaining = num_readers; + + RAY_LOG(DEBUG) << "WriteRelease done, num_readers: " << num_readers; + PrintPlasmaObjectHeader(this); + RAY_CHECK(pthread_mutex_unlock(&wr_mut) == 0); + // Signal to all readers. + RAY_CHECK(pthread_cond_broadcast(&cond) == 0); +} + +bool PlasmaObjectHeader::ReadAcquire(int64_t version_to_read, int64_t *version_read) { + RAY_LOG(DEBUG) << "ReadAcquire waiting version " << version_to_read; + RAY_CHECK(pthread_mutex_lock(&wr_mut) == 0); + RAY_LOG(DEBUG) << "ReadAcquire " << version_to_read; + PrintPlasmaObjectHeader(this); + + // Wait for the requested version (or a more recent one) to be sealed. + while (version < version_to_read || !is_sealed) { + RAY_CHECK(pthread_cond_wait(&cond, &wr_mut) == 0); + } + + bool success = false; + if (num_readers == -1) { + // Object is a normal immutable object. Read succeeds. 
+ *version_read = 0; + success = true; + } else { + *version_read = version; + if (version == version_to_read && num_read_acquires_remaining > 0) { + // This object is at the right version and still has reads remaining. Read + // succeeds. + num_read_acquires_remaining--; + success = true; + } else if (version > version_to_read) { + RAY_LOG(WARNING) << "Version " << version << " already exceeds version to read " + << version_to_read; + } else { + RAY_LOG(WARNING) << "Version " << version << " already has " << num_readers + << "readers"; + } + } + + RAY_LOG(DEBUG) << "ReadAcquire done"; + PrintPlasmaObjectHeader(this); + + RAY_CHECK(pthread_mutex_unlock(&wr_mut) == 0); + // Signal to other readers that they may read. + RAY_CHECK(pthread_cond_signal(&cond) == 0); + return success; +} + +void PlasmaObjectHeader::ReadRelease(int64_t read_version) { + bool all_readers_done = false; + RAY_LOG(DEBUG) << "ReadRelease Waiting" << read_version; + RAY_CHECK(pthread_mutex_lock(&wr_mut) == 0); + PrintPlasmaObjectHeader(this); + + RAY_LOG(DEBUG) << "ReadRelease " << read_version << " version is currently " << version; + RAY_CHECK(version == read_version) << "Version " << version << " modified from version " + << read_version << " at read start"; + + if (num_readers != -1) { + num_read_releases_remaining--; + RAY_CHECK(num_read_releases_remaining >= 0); + if (num_read_releases_remaining == 0) { + all_readers_done = true; + } + } + + PrintPlasmaObjectHeader(this); + RAY_LOG(DEBUG) << "ReadRelease done"; + RAY_CHECK(pthread_mutex_unlock(&wr_mut) == 0); + if (all_readers_done) { + sem_post(&rw_semaphore); + } +} + +#endif + +} // namespace ray diff --git a/src/ray/object_manager/common.h b/src/ray/object_manager/common.h index 66829d2511eb..e3e8381de933 100644 --- a/src/ray/object_manager/common.h +++ b/src/ray/object_manager/common.h @@ -14,6 +14,11 @@ #pragma once +#ifndef _WIN32 +#include +#endif + +#include #include #include @@ -36,9 +41,107 @@ using RestoreSpilledObjectCallback 
= const std::string &, std::function)>; +/// A header for all plasma objects that is allocated and stored in shared +/// memory. Therefore, it can be accessed across processes. +/// +/// For normal immutable objects, no synchronization between processes is +/// needed once the object has been Sealed. For experimental mutable objects, +/// we use the header to synchronize between writer and readers. +struct PlasmaObjectHeader { +// TODO(swang): PlasmaObjectHeader uses pthreads, POSIX mutex and semaphore. +#ifndef _WIN32 + // Used to signal to the writer when all readers are done. + sem_t rw_semaphore; + + // Protects all following state, used to signal from writer to readers. + pthread_mutex_t wr_mut; + // Used to signal to readers when the writer is done writing a new version. + pthread_cond_t cond; + // The object version. For immutable objects, this gets incremented to 1 on + // the first write and then should never be modified. For mutable objects, + // each new write must increment the version before releasing to readers. + int64_t version = 0; + // Indicates whether the current version has been written. is_sealed=false + // means that there is a writer who has WriteAcquire'd but not yet + // WriteRelease'd the current version. is_sealed=true means that `version` + // has been WriteRelease'd. A reader may read the actual object value if + // is_sealed=true and num_read_acquires_remaining != 0. + bool is_sealed = false; + // The total number of reads allowed before the writer can write again. This + // value should be set by the writer before releasing to readers. + // For immutable objects, this is set to -1 and infinite reads are allowed. + // Otherwise, readers must acquire/release before/after reading. + int64_t num_readers = 0; + // The number of readers who can acquire the current version. For mutable + // objects, readers must ensure this is > 0 and decrement before they read. 
+ // Once this value reaches 0, no more readers are allowed until the writer + // writes a new version. + // NOTE(swang): Technically we do not need this because + // num_read_releases_remaining protects against too many readers. However, + // this allows us to throw an error as soon as the n+1-th reader begins, + // instead of waiting to error until the n+1-th reader is done reading. + int64_t num_read_acquires_remaining = 0; + // The number of readers who must release the current version before a new + // version can be written. For mutable objects, readers must decrement this + // when they are done reading the current version. Once this value reaches 0, + // the reader should signal to the writer that they can write again. + int64_t num_read_releases_remaining = 0; + // The valid data and metadata size of the Ray object. + // Not used for immutable objects. + // For mutable objects, this should be modified when the new object has a + // different data/metadata size. + uint64_t data_size = 0; + uint64_t metadata_size = 0; + + /// Blocks until all readers for the previous write have ReadRelease'd the + /// value. Protects against concurrent writers. Caller must pass consecutive + /// versions on each new write, starting with write_version=1. + /// + /// \param write_version The new version for write. + /// \param data_size The new data size of the object. + /// \param metadata_size The new metadata size of the object. + /// \param num_readers The number of readers for the object. + void WriteAcquire(int64_t write_version, + uint64_t data_size, + uint64_t metadata_size, + int64_t num_readers); + + /// Call after completing a write to signal that readers may read. + /// num_readers should be set before calling this. + /// + /// \param write_version The new version for write. This must match the + /// version previously passed to WriteAcquire. + void WriteRelease(int64_t write_version); + + // Blocks until the given version is ready to read. 
Returns false if the + // maximum number of readers have already read the requested version. + // + // \param[in] read_version The version to read. + // \param[out] version_read For normal immutable objects, this will be set to + // 0. Otherwise, the current version. + // \return success Whether the correct version was read and there were still + // reads remaining. + bool ReadAcquire(int64_t version_to_read, int64_t *version_read); + + // Finishes the read. If all reads are done, signals to the writer. This is + // not necessary to call for objects that have num_readers=-1. + /// + /// \param read_version This must match the version previously passed in + /// ReadAcquire. + void ReadRelease(int64_t read_version); +#endif + + /// Setup synchronization primitives. + void Init(); + + /// Destroy synchronization primitives. + void Destroy(); +}; + /// A struct that includes info about the object. struct ObjectInfo { ObjectID object_id; + bool is_mutable = false; int64_t data_size = 0; int64_t metadata_size = 0; /// Owner's raylet ID. @@ -50,7 +153,9 @@ struct ObjectInfo { /// Owner's worker ID. WorkerID owner_worker_id; - int64_t GetObjectSize() const { return data_size + metadata_size; } + int64_t GetObjectSize() const { + return data_size + metadata_size + (is_mutable ? 
sizeof(PlasmaObjectHeader) : 0); + } bool operator==(const ObjectInfo &other) const { return ((object_id == other.object_id) && (data_size == other.data_size) && diff --git a/src/ray/object_manager/object_buffer_pool.cc b/src/ray/object_manager/object_buffer_pool.cc index 8004fb588811..a42a921fc50a 100644 --- a/src/ray/object_manager/object_buffer_pool.cc +++ b/src/ray/object_manager/object_buffer_pool.cc @@ -241,6 +241,7 @@ ray::Status ObjectBufferPool::EnsureBufferExists(const ObjectID &object_id, Status s = store_client_->CreateAndSpillIfNeeded( object_id, owner_address, + /*is_mutable=*/false, static_cast(object_size), nullptr, static_cast(metadata_size), diff --git a/src/ray/object_manager/plasma/client.cc b/src/ray/object_manager/plasma/client.cc index e3274a058df1..36ea5ba8bd3d 100644 --- a/src/ray/object_manager/plasma/client.cc +++ b/src/ray/object_manager/plasma/client.cc @@ -32,6 +32,7 @@ #include "absl/container/flat_hash_map.h" #include "ray/common/asio/instrumented_io_context.h" #include "ray/common/ray_config.h" +#include "ray/object_manager/common.h" #include "ray/object_manager/plasma/connection.h" #include "ray/object_manager/plasma/plasma.h" #include "ray/object_manager/plasma/protocol.h" @@ -94,6 +95,24 @@ struct ObjectInUseEntry { PlasmaObject object; /// A flag representing whether the object has been sealed. bool is_sealed; + + /// The below fields are experimental and used to implement + /// ray.experimental.channel. + /// Whether we are the writer. For now, only the original creator of the + /// mutable object may write to it. + bool is_writer = false; + /// The last version that we read. To read again, we must pass a newer + /// version than this. + int64_t next_version_to_read = 1; + /// Whether we currently have a read lock on the object. If this is true, + /// then it is safe to read the value of the object. For immutable objects, + /// this will always be true once the object has been sealed. 
For mutable + /// objects, ReadRelease resets this to false, and ReadAcquire resets to + /// true. + bool read_acquired = false; + /// The last version that we wrote. To write again, we must pass a newer + /// version than this. + int64_t next_version_to_write = 1; }; class PlasmaClient::Impl : public std::enable_shared_from_this { @@ -112,6 +131,7 @@ class PlasmaClient::Impl : public std::enable_shared_from_this *data); @@ -134,6 +155,15 @@ class PlasmaClient::Impl : public std::enable_shared_from_this *data); + + Status ExperimentalMutableObjectWriteRelease(const ObjectID &object_id); + Status Get(const std::vector &object_ids, int64_t timeout_ms, std::vector *object_buffers, @@ -145,6 +175,10 @@ class PlasmaClient::Impl : public std::enable_shared_from_this &object_entry); + + Status ExperimentalMutableObjectReadRelease(const ObjectID &object_id); + Status Release(const ObjectID &object_id); Status Contains(const ObjectID &object_id, bool *has_object); @@ -168,6 +202,7 @@ class PlasmaClient::Impl : public std::enable_shared_from_this *data); @@ -195,11 +230,19 @@ class PlasmaClient::Impl : public std::enable_shared_from_this(header_ptr); + } + + void InsertObjectInUse(const ObjectID &object_id, + std::unique_ptr object, + bool is_sealed); - void IncrementObjectCount(const ObjectID &object_id, - PlasmaObject *object, - bool is_sealed); + void IncrementObjectCount(const ObjectID &object_id); /// The boost::asio IO context for the client. instrumented_io_context main_service_; @@ -257,7 +300,7 @@ uint8_t *PlasmaClient::Impl::GetStoreFdAndMmap(MEMFD_TYPE store_fd_val, // Get a pointer to a file that we know has been memory mapped in this client // process before. 
-uint8_t *PlasmaClient::Impl::LookupMmappedFile(MEMFD_TYPE store_fd_val) { +uint8_t *PlasmaClient::Impl::LookupMmappedFile(MEMFD_TYPE store_fd_val) const { auto entry = mmap_table_.find(store_fd_val); RAY_CHECK(entry != mmap_table_.end()); return entry->second->pointer(); @@ -270,39 +313,39 @@ bool PlasmaClient::Impl::IsInUse(const ObjectID &object_id) { return (elem != objects_in_use_.end()); } -void PlasmaClient::Impl::IncrementObjectCount(const ObjectID &object_id, - PlasmaObject *object, - bool is_sealed) { +void PlasmaClient::Impl::InsertObjectInUse(const ObjectID &object_id, + std::unique_ptr object, + bool is_sealed) { + auto inserted = + objects_in_use_.insert({object_id, std::make_unique()}); + RAY_CHECK(inserted.second) << "Object already in use"; + auto it = inserted.first; + + // Add this object ID to the hash table of object IDs in use. The + // corresponding call to free happens in PlasmaClient::Release. + it->second->object = *object.release(); + // Count starts at 1 to pin the object. + it->second->count = 1; + it->second->is_sealed = is_sealed; +} + +void PlasmaClient::Impl::IncrementObjectCount(const ObjectID &object_id) { // Increment the count of the object to track the fact that it is being used. // The corresponding decrement should happen in PlasmaClient::Release. - auto elem = objects_in_use_.find(object_id); - ObjectInUseEntry *object_entry; - if (elem == objects_in_use_.end()) { - // Add this object ID to the hash table of object IDs in use. The - // corresponding call to free happens in PlasmaClient::Release. 
- objects_in_use_[object_id] = std::make_unique(); - objects_in_use_[object_id]->object = *object; - objects_in_use_[object_id]->count = 0; - objects_in_use_[object_id]->is_sealed = is_sealed; - object_entry = objects_in_use_[object_id].get(); - } else { - object_entry = elem->second.get(); - RAY_CHECK(object_entry->count > 0); - } - // Increment the count of the number of instances of this object that are - // being used by this client. The corresponding decrement should happen in - // PlasmaClient::Release. - object_entry->count += 1; + auto object_entry = objects_in_use_.find(object_id); + RAY_CHECK(object_entry != objects_in_use_.end()); + object_entry->second->count += 1; } Status PlasmaClient::Impl::HandleCreateReply(const ObjectID &object_id, + bool is_experimental_mutable_object, const uint8_t *metadata, uint64_t *retry_with_request_id, std::shared_ptr *data) { std::vector buffer; RAY_RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType::PlasmaCreateReply, &buffer)); ObjectID id; - PlasmaObject object; + auto object = std::make_unique(); MEMFD_TYPE store_fd; int64_t mmap_size; @@ -311,7 +354,7 @@ Status PlasmaClient::Impl::HandleCreateReply(const ObjectID &object_id, buffer.size(), &id, retry_with_request_id, - &object, + object.get(), &store_fd, &mmap_size)); if (*retry_with_request_id > 0) { @@ -321,46 +364,140 @@ Status PlasmaClient::Impl::HandleCreateReply(const ObjectID &object_id, } else { uint64_t unused = 0; RAY_RETURN_NOT_OK(ReadCreateReply( - buffer.data(), buffer.size(), &id, &unused, &object, &store_fd, &mmap_size)); + buffer.data(), buffer.size(), &id, &unused, object.get(), &store_fd, &mmap_size)); RAY_CHECK(unused == 0); } // If the CreateReply included an error, then the store will not send a file // descriptor. - if (object.device_num == 0) { + if (object->device_num == 0) { // The metadata should come right after the data. 
- RAY_CHECK(object.metadata_offset == object.data_offset + object.data_size); + RAY_CHECK(object->metadata_offset == object->data_offset + object->data_size); RAY_LOG(DEBUG) << "GetStoreFdAndMmap " << store_fd.first << ", " << store_fd.second << ", size " << mmap_size << " for object id " << id; *data = std::make_shared( shared_from_this(), - GetStoreFdAndMmap(store_fd, mmap_size) + object.data_offset, - object.data_size); + GetStoreFdAndMmap(store_fd, mmap_size) + object->data_offset, + object->data_size); // If plasma_create is being called from a transfer, then we will not copy the // metadata here. The metadata will be written along with the data streamed // from the transfer. if (metadata != NULL) { // Copy the metadata to the buffer. - memcpy((*data)->Data() + object.data_size, metadata, object.metadata_size); + memcpy((*data)->Data() + object->data_size, metadata, object->metadata_size); } } else { RAY_LOG(FATAL) << "GPU is not enabled."; } - // Increment the count of the number of instances of this object that this - // client is using. A call to PlasmaClient::Release is required to decrement - // this count. Cache the reference to the object. - IncrementObjectCount(object_id, &object, false); + // Add the object as in use. A call to PlasmaClient::Release is required to + // decrement the initial ref count of 1. Cache the reference to the object. + InsertObjectInUse(object_id, std::move(object), /*is_sealed=*/false); // We increment the count a second time (and the corresponding decrement will // happen in a PlasmaClient::Release call in plasma_seal) so even if the // buffer returned by PlasmaClient::Create goes out of scope, the object does // not get released before the call to PlasmaClient::Seal happens. - IncrementObjectCount(object_id, &object, false); + IncrementObjectCount(object_id); + + // Create IPC was successful. 
+ auto object_entry = objects_in_use_.find(object_id); + RAY_CHECK(object_entry != objects_in_use_.end()); + auto &entry = object_entry->second; + RAY_CHECK(!entry->is_sealed); + entry->is_writer = true; + + return Status::OK(); +} + +Status PlasmaClient::Impl::ExperimentalMutableObjectWriteAcquire( + const ObjectID &object_id, + int64_t data_size, + const uint8_t *metadata, + int64_t metadata_size, + int64_t num_readers, + std::shared_ptr *data) { +#ifndef _WIN32 + std::unique_lock guard(client_mutex_); + auto object_entry = objects_in_use_.find(object_id); + if (object_entry == objects_in_use_.end()) { + return Status::Invalid( + "Plasma buffer for mutable object not in scope. Are you sure you're the writer?"); + } + if (!object_entry->second->is_writer) { + return Status::Invalid( + "Mutable objects can only be written by the original creator process."); + } + RAY_CHECK(object_entry != objects_in_use_.end()); + + auto &entry = object_entry->second; + RAY_CHECK(entry->object.is_experimental_mutable_object); + RAY_CHECK(entry->is_sealed) << "Must Seal before writing again to a mutable object"; + + RAY_LOG(DEBUG) << "Write mutable object " << object_id; + + // Wait for no readers. + auto plasma_header = GetPlasmaObjectHeader(entry->object); + // TODO(swang): Support data + metadata size larger than allocated buffer. + if (data_size + metadata_size > entry->object.allocated_size) { + return Status::InvalidArgument("Serialized size of mutable data (" + + std::to_string(data_size) + ") + metadata size (" + + std::to_string(metadata_size) + + ") is larger than allocated buffer size " + + std::to_string(entry->object.allocated_size)); + } + plasma_header->WriteAcquire( + entry->next_version_to_write, data_size, metadata_size, num_readers); + + // Prepare the data buffer and return to the client instead of sending + // the IPC to object store. 
+ *data = std::make_shared( + shared_from_this(), + GetStoreFdAndMmap(entry->object.store_fd, entry->object.mmap_size) + + entry->object.data_offset, + data_size); + if (metadata != NULL) { + // Copy the metadata to the buffer. + memcpy((*data)->Data() + data_size, metadata, metadata_size); + } + + entry->is_sealed = false; +#endif + return Status::OK(); +} + +Status PlasmaClient::Impl::ExperimentalMutableObjectWriteRelease( + const ObjectID &object_id) { +#ifndef _WIN32 + std::unique_lock guard(client_mutex_); + auto object_entry = objects_in_use_.find(object_id); + if (object_entry == objects_in_use_.end()) { + return Status::Invalid( + "Plasma buffer for mutable object not in scope. Are you sure you're the writer?"); + } + if (!object_entry->second->is_writer) { + return Status::Invalid( + "Mutable objects can only be written by the original creator process."); + } + RAY_CHECK(object_entry != objects_in_use_.end()); + + auto &entry = object_entry->second; + RAY_CHECK(entry->object.is_experimental_mutable_object); + RAY_CHECK(!entry->is_sealed) + << "Must WriteAcquire before WriteRelease on a mutable object"; + + entry->is_sealed = true; + auto plasma_header = GetPlasmaObjectHeader(entry->object); + plasma_header->WriteRelease( + /*write_version=*/entry->next_version_to_write); + // The next Write must pass a higher version. 
+ entry->next_version_to_write++; +#endif return Status::OK(); } Status PlasmaClient::Impl::CreateAndSpillIfNeeded(const ObjectID &object_id, const ray::rpc::Address &owner_address, + bool is_experimental_mutable_object, int64_t data_size, const uint8_t *metadata, int64_t metadata_size, @@ -375,12 +512,14 @@ Status PlasmaClient::Impl::CreateAndSpillIfNeeded(const ObjectID &object_id, RAY_RETURN_NOT_OK(SendCreateRequest(store_conn_, object_id, owner_address, + is_experimental_mutable_object, data_size, metadata_size, source, device_num, /*try_immediately=*/false)); - Status status = HandleCreateReply(object_id, metadata, &retry_with_request_id, data); + Status status = HandleCreateReply( + object_id, is_experimental_mutable_object, metadata, &retry_with_request_id, data); while (retry_with_request_id > 0) { guard.unlock(); @@ -390,8 +529,12 @@ Status PlasmaClient::Impl::CreateAndSpillIfNeeded(const ObjectID &object_id, guard.lock(); RAY_LOG(DEBUG) << "Retrying request for object " << object_id << " with request ID " << retry_with_request_id; - status = RetryCreate( - object_id, retry_with_request_id, metadata, &retry_with_request_id, data); + status = RetryCreate(object_id, + retry_with_request_id, + is_experimental_mutable_object, + metadata, + &retry_with_request_id, + data); } return status; @@ -399,12 +542,14 @@ Status PlasmaClient::Impl::CreateAndSpillIfNeeded(const ObjectID &object_id, Status PlasmaClient::Impl::RetryCreate(const ObjectID &object_id, uint64_t request_id, + bool is_experimental_mutable_object, const uint8_t *metadata, uint64_t *retry_with_request_id, std::shared_ptr *data) { std::lock_guard guard(client_mutex_); RAY_RETURN_NOT_OK(SendCreateRetryRequest(store_conn_, object_id, request_id)); - return HandleCreateReply(object_id, metadata, retry_with_request_id, data); + return HandleCreateReply( + object_id, is_experimental_mutable_object, metadata, retry_with_request_id, data); } Status PlasmaClient::Impl::TryCreateImmediately(const ObjectID 
&object_id, @@ -422,12 +567,14 @@ Status PlasmaClient::Impl::TryCreateImmediately(const ObjectID &object_id, RAY_RETURN_NOT_OK(SendCreateRequest(store_conn_, object_id, owner_address, + /*is_experimental_mutable_object=*/false, data_size, metadata_size, source, device_num, /*try_immediately=*/true)); - return HandleCreateReply(object_id, metadata, nullptr, data); + return HandleCreateReply( + object_id, /*is_experimental_mutable_object=*/false, metadata, nullptr, data); } Status PlasmaClient::Impl::GetBuffers( @@ -456,9 +603,17 @@ Status PlasmaClient::Impl::GetBuffers( << "Attempting to get an object that this client created but hasn't sealed."; all_present = false; } else { + if (object_entry->second->object.is_experimental_mutable_object) { + // Wait for the object to become ready to read. + RAY_RETURN_NOT_OK(EnsureGetAcquired(object_entry->second)); + } + PlasmaObject *object = &object_entry->second->object; - std::shared_ptr physical_buf; + std::shared_ptr physical_buf; + RAY_LOG(DEBUG) << "Plasma Get " << object_ids[i] + << ", data size: " << object->data_size + << ", metadata size: " << object->metadata_size; if (object->device_num == 0) { uint8_t *data = LookupMmappedFile(object->store_fd); physical_buf = std::make_shared( @@ -474,7 +629,7 @@ Status PlasmaClient::Impl::GetBuffers( object_buffers[i].device_num = object->device_num; // Increment the count of the number of instances of this object that this // client is using. Cache the reference to the object. 
- IncrementObjectCount(object_ids[i], object, true); + IncrementObjectCount(object_ids[i]); } } @@ -490,7 +645,6 @@ Status PlasmaClient::Impl::GetBuffers( RAY_RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType::PlasmaGetReply, &buffer)); std::vector received_object_ids(num_objects); std::vector object_data(num_objects); - PlasmaObject *object; std::vector store_fds; std::vector mmap_sizes; RAY_RETURN_NOT_OK(ReadGetReply(buffer.data(), @@ -511,9 +665,10 @@ Status PlasmaClient::Impl::GetBuffers( GetStoreFdAndMmap(store_fds[i], mmap_sizes[i]); } + std::unique_ptr object; for (int64_t i = 0; i < num_objects; ++i) { RAY_DCHECK(received_object_ids[i] == object_ids[i]); - object = &object_data[i]; + object = std::make_unique(object_data[i]); if (object_buffers[i].data) { // If the object was already in use by the client, then the store should // have returned it. @@ -525,24 +680,40 @@ Status PlasmaClient::Impl::GetBuffers( // If we are here, the object was not currently in use, so we need to // process the reply from the object store. if (object->data_size != -1) { + if (objects_in_use_.find(received_object_ids[i]) == objects_in_use_.end()) { + // Increment the count of the number of instances of this object that this + // client is using. Cache the reference to the object. + InsertObjectInUse(received_object_ids[i], std::move(object), /*is_sealed=*/true); + } else { + IncrementObjectCount(received_object_ids[i]); + } + auto &object_entry = objects_in_use_[received_object_ids[i]]; + + // Wait for the object to become ready to read. 
+ if (object_entry->object.is_experimental_mutable_object) { + RAY_RETURN_NOT_OK(EnsureGetAcquired(object_entry)); + } std::shared_ptr physical_buf; - if (object->device_num == 0) { - uint8_t *data = LookupMmappedFile(object->store_fd); + RAY_LOG(DEBUG) << "Plasma Get " << received_object_ids[i] + << ", data size: " << object_entry->object.data_size + << ", metadata size: " << object_entry->object.metadata_size; + if (object_entry->object.device_num == 0) { + uint8_t *data = LookupMmappedFile(object_entry->object.store_fd); physical_buf = std::make_shared( - data + object->data_offset, object->data_size + object->metadata_size); + data + object_entry->object.data_offset, + object_entry->object.data_size + object_entry->object.metadata_size); } else { RAY_LOG(FATAL) << "Arrow GPU library is not enabled."; } // Finish filling out the return values. physical_buf = wrap_buffer(object_ids[i], physical_buf); object_buffers[i].data = - SharedMemoryBuffer::Slice(physical_buf, 0, object->data_size); - object_buffers[i].metadata = SharedMemoryBuffer::Slice( - physical_buf, object->data_size, object->metadata_size); - object_buffers[i].device_num = object->device_num; - // Increment the count of the number of instances of this object that this - // client is using. Cache the reference to the object. - IncrementObjectCount(received_object_ids[i], object, true); + SharedMemoryBuffer::Slice(physical_buf, 0, object_entry->object.data_size); + object_buffers[i].metadata = + SharedMemoryBuffer::Slice(physical_buf, + object_entry->object.data_size, + object_entry->object.metadata_size); + object_buffers[i].device_num = object_entry->object.device_num; } else { // The object was not retrieved. The caller can detect this condition // by checking the boolean value of the metadata/data buffers. 
@@ -569,6 +740,72 @@ Status PlasmaClient::Impl::Get(const std::vector &object_ids, &object_ids[0], num_objects, timeout_ms, wrap_buffer, &(*out)[0], is_from_worker); } +Status PlasmaClient::Impl::EnsureGetAcquired( + std::unique_ptr &object_entry) { +#ifndef _WIN32 + PlasmaObject *object = &object_entry->object; + RAY_CHECK(object->is_experimental_mutable_object); + auto plasma_header = GetPlasmaObjectHeader(*object); + if (object_entry->read_acquired) { + return Status::OK(); + } + + int64_t version_read = 0; + bool success = + plasma_header->ReadAcquire(object_entry->next_version_to_read, &version_read); + if (!success) { + return Status::Invalid( + "Reader missed a value. Are you sure there are num_readers many readers?"); + } + + object_entry->read_acquired = true; + RAY_CHECK(version_read > 0); + object_entry->next_version_to_read = version_read; + + // The data and metadata size may have changed, so update here before we + // create the Get buffer to return. + object_entry->object.data_size = plasma_header->data_size; + object_entry->object.metadata_size = plasma_header->metadata_size; + object_entry->object.metadata_offset = + object_entry->object.data_offset + object_entry->object.data_size; + RAY_CHECK(object_entry->object.data_size + object_entry->object.metadata_size <= + object_entry->object.allocated_size); +#endif + return Status::OK(); +} + +Status PlasmaClient::Impl::ExperimentalMutableObjectReadRelease( + const ObjectID &object_id) { +#ifndef _WIN32 + RAY_LOG(DEBUG) << "Try to release Get for object " << object_id; + std::unique_lock guard(client_mutex_); + + auto object_entry = objects_in_use_.find(object_id); + if (object_entry == objects_in_use_.end()) { + return Status::ObjectNotFound( + "ray.release() called on an object that is not in scope"); + } + + auto &entry = object_entry->second; + if (!entry->is_sealed) { + return Status::ObjectNotFound("ray.release() called on an object that is not sealed"); + } + if 
(!entry->object.is_experimental_mutable_object) { + return Status::ObjectNotFound( + "ray.release() called on an object that is not mutable"); + } + + RAY_RETURN_NOT_OK(EnsureGetAcquired(entry)); + RAY_LOG(DEBUG) << "Release shared object " << object_id; + auto plasma_header = GetPlasmaObjectHeader(entry->object); + plasma_header->ReadRelease(entry->next_version_to_read); + // The next read needs to read at least this version. + entry->next_version_to_read++; + entry->read_acquired = false; +#endif + return Status::OK(); +} + Status PlasmaClient::Impl::MarkObjectUnused(const ObjectID &object_id) { auto object_entry = objects_in_use_.find(object_id); RAY_CHECK(object_entry != objects_in_use_.end()); @@ -589,9 +826,13 @@ Status PlasmaClient::Impl::Release(const ObjectID &object_id) { const auto object_entry = objects_in_use_.find(object_id); RAY_CHECK(object_entry != objects_in_use_.end()); - object_entry->second->count -= 1; - RAY_CHECK(object_entry->second->count >= 0); - // Check if the client is no longer using this object. + if (!object_entry->second->object.is_experimental_mutable_object) { + // Release only applies to immutable objects. + // TODO(swang): Add a delete call to properly clean up mutable objects. + object_entry->second->count -= 1; + RAY_CHECK(object_entry->second->count >= 0); + } + if (object_entry->second->count == 0) { // object_entry is invalidated in MarkObjectUnused, need to read the fd beforehand. MEMFD_TYPE fd = object_entry->second->object.store_fd; @@ -663,7 +904,8 @@ Status PlasmaClient::Impl::Seal(const ObjectID &object_id) { } object_entry->second->is_sealed = true; - /// Send the seal request to Plasma. + // Send the seal request to Plasma. This is the normal Seal path, used for + // immutable objects and the initial Create call for mutable objects. 
RAY_RETURN_NOT_OK(SendSealRequest(store_conn_, object_id)); std::vector buffer; RAY_RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType::PlasmaSealReply, &buffer)); @@ -675,7 +917,9 @@ Status PlasmaClient::Impl::Seal(const ObjectID &object_id) { // that are currently being used by this client. The corresponding increment // happened in plasma_create and was used to ensure that the object was not // released before the call to PlasmaClient::Seal. - return Release(object_id); + RAY_RETURN_NOT_OK(Release(object_id)); + + return Status::OK(); } Status PlasmaClient::Impl::Abort(const ObjectID &object_id) { @@ -804,8 +1048,24 @@ Status PlasmaClient::Connect(const std::string &store_socket_name, store_socket_name, manager_socket_name, release_delay, num_retries); } +Status PlasmaClient::ExperimentalMutableObjectWriteAcquire( + const ObjectID &object_id, + int64_t data_size, + const uint8_t *metadata, + int64_t metadata_size, + int64_t num_readers, + std::shared_ptr *data) { + return impl_->ExperimentalMutableObjectWriteAcquire( + object_id, data_size, metadata, metadata_size, num_readers, data); +} + +Status PlasmaClient::ExperimentalMutableObjectWriteRelease(const ObjectID &object_id) { + return impl_->ExperimentalMutableObjectWriteRelease(object_id); +} + Status PlasmaClient::CreateAndSpillIfNeeded(const ObjectID &object_id, const ray::rpc::Address &owner_address, + bool is_experimental_mutable_object, int64_t data_size, const uint8_t *metadata, int64_t metadata_size, @@ -814,6 +1074,7 @@ Status PlasmaClient::CreateAndSpillIfNeeded(const ObjectID &object_id, int device_num) { return impl_->CreateAndSpillIfNeeded(object_id, owner_address, + is_experimental_mutable_object, data_size, metadata, metadata_size, @@ -847,6 +1108,10 @@ Status PlasmaClient::Get(const std::vector &object_ids, return impl_->Get(object_ids, timeout_ms, object_buffers, is_from_worker); } +Status PlasmaClient::ExperimentalMutableObjectReadRelease(const ObjectID &object_id) { + return 
impl_->ExperimentalMutableObjectReadRelease(object_id); +} + Status PlasmaClient::Release(const ObjectID &object_id) { return impl_->Release(object_id); } diff --git a/src/ray/object_manager/plasma/client.h b/src/ray/object_manager/plasma/client.h index d466528ecd27..d50d1b8c5de9 100644 --- a/src/ray/object_manager/plasma/client.h +++ b/src/ray/object_manager/plasma/client.h @@ -82,6 +82,42 @@ class PlasmaClientInterface { std::vector *object_buffers, bool is_from_worker) = 0; + /// Experimental method for mutable objects. Acquires a write lock on the + /// object that prevents readers from reading until we are done writing. Does + /// not protect against concurrent writers. + /// + /// \param[in] object_id The ID of the object. + /// \param[in] data_size The size of the object to write. This overwrites the + /// current data size. + /// \param[in] metadata A pointer to the object metadata buffer to copy. This + /// will overwrite the current metadata. + /// \param[in] metadata_size The number of bytes to copy from the metadata + /// pointer. + /// \param[in] num_readers The number of readers that must read and release + /// the object before the caller can write again. + /// \param[out] data The mutable object buffer in plasma that can be written to. + virtual Status ExperimentalMutableObjectWriteAcquire(const ObjectID &object_id, + int64_t data_size, + const uint8_t *metadata, + int64_t metadata_size, + int64_t num_readers, + std::shared_ptr *data) = 0; + + /// Experimental method for mutable objects. Releases a write lock on the + /// object, allowing readers to read. This is the equivalent of "Seal" for + /// normal objects. + /// + /// \param[in] object_id The ID of the object. + virtual Status ExperimentalMutableObjectWriteRelease(const ObjectID &object_id) = 0; + + /// Experimental method for mutable objects. Releases the objects, allowing them + /// to be written again. 
If the caller did not previously Get the objects, + /// then this first blocks until the latest value is available to read, then + /// releases the value. + /// + /// \param[in] object_id The ID of the object. + virtual Status ExperimentalMutableObjectReadRelease(const ObjectID &object_id) = 0; + /// Seal an object in the object store. The object will be immutable after /// this /// call. @@ -127,6 +163,7 @@ class PlasmaClientInterface { /// be either sealed or aborted. virtual Status CreateAndSpillIfNeeded(const ObjectID &object_id, const ray::rpc::Address &owner_address, + bool is_mutable, int64_t data_size, const uint8_t *metadata, int64_t metadata_size, @@ -193,6 +230,7 @@ class PlasmaClient : public PlasmaClientInterface { /// be either sealed or aborted. Status CreateAndSpillIfNeeded(const ObjectID &object_id, const ray::rpc::Address &owner_address, + bool is_mutable, int64_t data_size, const uint8_t *metadata, int64_t metadata_size, @@ -200,6 +238,17 @@ class PlasmaClient : public PlasmaClientInterface { plasma::flatbuf::ObjectSource source, int device_num = 0); + Status ExperimentalMutableObjectWriteAcquire(const ObjectID &object_id, + int64_t data_size, + const uint8_t *metadata, + int64_t metadata_size, + int64_t num_readers, + std::shared_ptr *data); + + Status ExperimentalMutableObjectWriteRelease(const ObjectID &object_id); + + Status ExperimentalMutableObjectReadRelease(const ObjectID &object_id); + /// Create an object in the Plasma Store. Any metadata for this object must be /// be passed in when the object is created. 
/// diff --git a/src/ray/object_manager/plasma/common.h b/src/ray/object_manager/plasma/common.h index a4e8f8337372..6ffb86fdeb76 100644 --- a/src/ray/object_manager/plasma/common.h +++ b/src/ray/object_manager/plasma/common.h @@ -123,18 +123,34 @@ class LocalObject { const plasma::flatbuf::ObjectSource &GetSource() const { return source; } + ray::PlasmaObjectHeader *GetPlasmaObjectHeader() const { + RAY_CHECK(object_info.is_mutable) << "Object is not mutable"; + auto header_ptr = static_cast(allocation.address); + return reinterpret_cast(header_ptr); + } + void ToPlasmaObject(PlasmaObject *object, bool check_sealed) const { RAY_DCHECK(object != nullptr); if (check_sealed) { RAY_DCHECK(Sealed()); } object->store_fd = GetAllocation().fd; + object->header_offset = GetAllocation().offset; object->data_offset = GetAllocation().offset; object->metadata_offset = GetAllocation().offset + GetObjectInfo().data_size; + if (object_info.is_mutable) { + object->data_offset += sizeof(ray::PlasmaObjectHeader); + object->metadata_offset += sizeof(ray::PlasmaObjectHeader); + }; object->data_size = GetObjectInfo().data_size; object->metadata_size = GetObjectInfo().metadata_size; + // Senders and receivers of a channel may store different data and metadata + // sizes locally depending on what data is written to the channel, but the + // plasma store keeps the original data and metadata size. 
+ object->allocated_size = object->data_size + object->metadata_size; object->device_num = GetAllocation().device_num; object->mmap_size = GetAllocation().mmap_size; + object->is_experimental_mutable_object = object_info.is_mutable; } private: diff --git a/src/ray/object_manager/plasma/object_store.cc b/src/ray/object_manager/plasma/object_store.cc index a36ad1d54906..4262a282f3fa 100644 --- a/src/ray/object_manager/plasma/object_store.cc +++ b/src/ray/object_manager/plasma/object_store.cc @@ -47,6 +47,12 @@ const LocalObject *ObjectStore::CreateObject(const ray::ObjectInfo &object_info, entry->construct_duration = -1; entry->source = source; + if (object_info.is_mutable) { + auto plasma_header = entry->GetPlasmaObjectHeader(); + *plasma_header = ray::PlasmaObjectHeader{}; + plasma_header->Init(); + } + RAY_LOG(DEBUG) << "create object " << object_info.object_id << " succeeded"; return entry; } @@ -74,6 +80,11 @@ bool ObjectStore::DeleteObject(const ObjectID &object_id) { if (entry == nullptr) { return false; } + if (entry->object_info.is_mutable) { + auto plasma_header = entry->GetPlasmaObjectHeader(); + plasma_header->Destroy(); + } + allocator_.Free(std::move(entry->allocation)); object_table_.erase(object_id); return true; diff --git a/src/ray/object_manager/plasma/plasma.fbs b/src/ray/object_manager/plasma/plasma.fbs index e5e7714aebc2..317e0aad4846 100644 --- a/src/ray/object_manager/plasma/plasma.fbs +++ b/src/ray/object_manager/plasma/plasma.fbs @@ -96,6 +96,8 @@ struct PlasmaObjectSpec { segment_index: int; // The unique id of the segment fd in case of fd reuse. unique_fd_id: long; + // The offset in bytes in the memory mapped file of the plasma object header. + header_offset: ulong; // The offset in bytes in the memory mapped file of the data. data_offset: ulong; // The size in bytes of the data. @@ -104,8 +106,15 @@ struct PlasmaObjectSpec { metadata_offset: ulong; // The size in bytes of the metadata. metadata_size: ulong; + // The allocated size. 
This is just data_size + metadata_size + // for immutable objects, but for mutable objects, the data size + // and metadata size may change. + allocated_size: ulong; // Device to create buffer on. device_num: int; + // Whether this is an experimental mutable object that can be written + // multiple times by a client. + is_experimental_mutable_object: bool; } table PlasmaGetDebugStringRequest { @@ -127,6 +136,8 @@ table PlasmaCreateRequest { owner_port: int; // Unique id for the owner worker. owner_worker_id: string; + // Whether the object will be mutable. + is_mutable: bool; // The size of the object's data in bytes. data_size: ulong; // The size of the object's metadata in bytes. diff --git a/src/ray/object_manager/plasma/plasma.h b/src/ray/object_manager/plasma/plasma.h index 0f8a00b06142..7b1367181ad3 100644 --- a/src/ray/object_manager/plasma/plasma.h +++ b/src/ray/object_manager/plasma/plasma.h @@ -37,6 +37,9 @@ struct PlasmaObject { /// a unique identifier of the file in the client to look up the corresponding /// file descriptor on the client's side. MEMFD_TYPE store_fd; + /// The offset in bytes in the memory mapped file of the plasma object + /// header. + ptrdiff_t header_offset; /// The offset in bytes in the memory mapped file of the data. ptrdiff_t data_offset; /// The offset in bytes in the memory mapped file of the metadata. @@ -45,10 +48,14 @@ struct PlasmaObject { int64_t data_size; /// The size in bytes of the metadata. int64_t metadata_size; + /// The size in bytes that was allocated. data_size + metadata_size must fit + /// within this. + int64_t allocated_size; /// Device number object is on. int device_num; /// Set if device_num is equal to 0. 
int64_t mmap_size; + bool is_experimental_mutable_object = false; bool operator==(const PlasmaObject &other) const { return ((store_fd == other.store_fd) && (data_offset == other.data_offset) && diff --git a/src/ray/object_manager/plasma/plasma_allocator.cc b/src/ray/object_manager/plasma/plasma_allocator.cc index 3737024ab416..06cdb20bf3d5 100644 --- a/src/ray/object_manager/plasma/plasma_allocator.cc +++ b/src/ray/object_manager/plasma/plasma_allocator.cc @@ -75,7 +75,7 @@ PlasmaAllocator::PlasmaAllocator(const std::string &plasma_directory, auto allocation = Allocate(kFootprintLimit - kDlMallocReserved); RAY_CHECK(allocation.has_value()) << "PlasmaAllocator initialization failed." - << " It's likely we don't have enought space in " << plasma_directory; + << " It's likely we don't have enough space in " << plasma_directory; // This will unmap the file, but the next one created will be as large // as this one (this is an implementation detail of dlmalloc). Free(std::move(allocation.value())); diff --git a/src/ray/object_manager/plasma/protocol.cc b/src/ray/object_manager/plasma/protocol.cc index 50595cde5370..771cb7087cd3 100644 --- a/src/ray/object_manager/plasma/protocol.cc +++ b/src/ray/object_manager/plasma/protocol.cc @@ -200,6 +200,7 @@ Status SendCreateRetryRequest(const std::shared_ptr &store_conn, Status SendCreateRequest(const std::shared_ptr &store_conn, ObjectID object_id, const ray::rpc::Address &owner_address, + bool is_experimental_mutable_object, int64_t data_size, int64_t metadata_size, flatbuf::ObjectSource source, @@ -213,6 +214,7 @@ Status SendCreateRequest(const std::shared_ptr &store_conn, fbb.CreateString(owner_address.ip_address()), owner_address.port(), fbb.CreateString(owner_address.worker_id()), + is_experimental_mutable_object, data_size, metadata_size, source, @@ -229,6 +231,7 @@ void ReadCreateRequest(uint8_t *data, RAY_DCHECK(data); auto message = flatbuffers::GetRoot(data); RAY_DCHECK(VerifyFlatbuffer(message, data, size)); + 
object_info->is_mutable = message->is_mutable(); object_info->data_size = message->data_size(); object_info->metadata_size = message->metadata_size(); object_info->object_id = ObjectID::FromBinary(message->object_id()->str()); @@ -260,11 +263,14 @@ Status SendCreateReply(const std::shared_ptr &client, flatbuffers::FlatBufferBuilder fbb; PlasmaObjectSpec plasma_object(FD2INT(object.store_fd.first), object.store_fd.second, + object.header_offset, object.data_offset, object.data_size, object.metadata_offset, object.metadata_size, - object.device_num); + object.allocated_size, + object.device_num, + object.is_experimental_mutable_object); auto object_string = fbb.CreateString(object_id.Binary()); fb::PlasmaCreateReplyBuilder crb(fbb); crb.add_error(static_cast(error_code)); @@ -300,10 +306,14 @@ Status ReadCreateReply(uint8_t *data, object->store_fd.first = INT2FD(message->plasma_object()->segment_index()); object->store_fd.second = message->plasma_object()->unique_fd_id(); + object->header_offset = message->plasma_object()->header_offset(); object->data_offset = message->plasma_object()->data_offset(); object->data_size = message->plasma_object()->data_size(); object->metadata_offset = message->plasma_object()->metadata_offset(); object->metadata_size = message->plasma_object()->metadata_size(); + object->allocated_size = message->plasma_object()->allocated_size(); + object->is_experimental_mutable_object = + message->plasma_object()->is_experimental_mutable_object(); store_fd->first = INT2FD(message->store_fd()); store_fd->second = message->unique_fd_id(); @@ -614,11 +624,14 @@ Status SendGetReply(const std::shared_ptr &client, << " metadata_size: " << object.metadata_size; objects.push_back(PlasmaObjectSpec(FD2INT(object.store_fd.first), object.store_fd.second, + object.header_offset, object.data_offset, object.data_size, object.metadata_offset, object.metadata_size, - object.device_num)); + object.allocated_size, + object.device_num, + 
object.is_experimental_mutable_object)); } std::vector store_fds_as_int; std::vector unique_fd_ids; @@ -654,11 +667,15 @@ Status ReadGetReply(uint8_t *data, const PlasmaObjectSpec *object = message->plasma_objects()->Get(i); plasma_objects[i].store_fd.first = INT2FD(object->segment_index()); plasma_objects[i].store_fd.second = object->unique_fd_id(); + plasma_objects[i].header_offset = object->header_offset(); plasma_objects[i].data_offset = object->data_offset(); plasma_objects[i].data_size = object->data_size(); plasma_objects[i].metadata_offset = object->metadata_offset(); plasma_objects[i].metadata_size = object->metadata_size(); + plasma_objects[i].allocated_size = object->allocated_size(); plasma_objects[i].device_num = object->device_num(); + plasma_objects[i].is_experimental_mutable_object = + object->is_experimental_mutable_object(); } RAY_CHECK(message->store_fds()->size() == message->mmap_sizes()->size()); for (uoffset_t i = 0; i < message->store_fds()->size(); i++) { diff --git a/src/ray/object_manager/plasma/protocol.h b/src/ray/object_manager/plasma/protocol.h index 7f4fcdd3ac58..23a120ac2ca0 100644 --- a/src/ray/object_manager/plasma/protocol.h +++ b/src/ray/object_manager/plasma/protocol.h @@ -85,6 +85,7 @@ Status SendCreateRetryRequest(const std::shared_ptr &store_conn, Status SendCreateRequest(const std::shared_ptr &store_conn, ObjectID object_id, const ray::rpc::Address &owner_address, + bool is_mutable, int64_t data_size, int64_t metadata_size, flatbuf::ObjectSource source, diff --git a/src/ray/object_manager/test/object_buffer_pool_test.cc b/src/ray/object_manager/test/object_buffer_pool_test.cc index 1ae4602f06ac..249f8bda3b3a 100644 --- a/src/ray/object_manager/test/object_buffer_pool_test.cc +++ b/src/ray/object_manager/test/object_buffer_pool_test.cc @@ -51,8 +51,23 @@ class MockPlasmaClient : public plasma::PlasmaClientInterface { MOCK_METHOD1(Abort, ray::Status(const ObjectID &object_id)); + 
MOCK_METHOD6(ExperimentalMutableObjectWriteAcquire, + ray::Status(const ObjectID &object_id, + int64_t data_size, + const uint8_t *metadata, + int64_t metadata_size, + int64_t num_readers, + std::shared_ptr *data)); + + MOCK_METHOD1(ExperimentalMutableObjectWriteRelease, + ray::Status(const ObjectID &object_id)); + + MOCK_METHOD1(ExperimentalMutableObjectReadRelease, + ray::Status(const ObjectID &object_id)); + ray::Status CreateAndSpillIfNeeded(const ObjectID &object_id, const ray::rpc::Address &owner_address, + bool is_experimental_mutable_object, int64_t data_size, const uint8_t *metadata, int64_t metadata_size,