
[core] Support mutable plasma objects #41515

Merged on Dec 9, 2023 (34 commits)

Changes from 15 commits

Commits
12b977d
initial commit
stephanie-wang Nov 29, 2023
1c935b9
Add special calls for create and put mutable objects
stephanie-wang Nov 29, 2023
c2dbf1f
feature flag for shared mem seal, only acquire once per ray.get
stephanie-wang Nov 30, 2023
6d4aa94
put-get
stephanie-wang Nov 30, 2023
bc4f1e9
rm shared mem seal
stephanie-wang Nov 30, 2023
c4a2378
fix num_readers on first version, unit tests pass now
stephanie-wang Nov 30, 2023
e40d3c8
mutable object -> channel
stephanie-wang Nov 30, 2023
b79b7d1
micro
stephanie-wang Nov 30, 2023
5ea0fe3
support different metadata
stephanie-wang Dec 1, 2023
cbe257f
better error message
stephanie-wang Dec 1, 2023
a68cefd
cleanup
stephanie-wang Dec 1, 2023
ea57894
Test for errors, better error handling when too many readers
stephanie-wang Dec 2, 2023
5bbf379
remove unneeded
stephanie-wang Dec 2, 2023
1e16e09
java build
stephanie-wang Dec 2, 2023
580b3ad
rename
stephanie-wang Dec 2, 2023
fe11cc3
test metadata change in remote reader
stephanie-wang Dec 2, 2023
e11b614
build
stephanie-wang Dec 4, 2023
99a38c2
fix
stephanie-wang Dec 5, 2023
204bb9b
fix
stephanie-wang Dec 6, 2023
4703f34
compile?
stephanie-wang Dec 6, 2023
420bd1c
build
stephanie-wang Dec 6, 2023
4cabbc5
x
stephanie-wang Dec 6, 2023
b44ef8a
fix
stephanie-wang Dec 6, 2023
881d5ff
copyright
stephanie-wang Dec 6, 2023
ef2cfb7
test
stephanie-wang Dec 6, 2023
ca22a63
Merge remote-tracking branch 'upstream/master' into mutable-objects-2
stephanie-wang Dec 6, 2023
dbbb3d6
Only allocate PlasmaObjectHeader if is_mutable=true
stephanie-wang Dec 7, 2023
9078776
Only call Read/Write Acquire/Release if is_mutable=true
stephanie-wang Dec 7, 2023
2e677c3
x
stephanie-wang Dec 7, 2023
f06b543
cpp test
stephanie-wang Dec 7, 2023
4dfa31e
skip tests on windows
stephanie-wang Dec 7, 2023
126296f
Merge remote-tracking branch 'upstream/master' into mutable-objects-2
stephanie-wang Dec 7, 2023
03f4fbd
larger CI machine
stephanie-wang Dec 8, 2023
3e7dfa2
Merge branch 'master' into mutable-objects-2
stephanie-wang Dec 8, 2023
1 change: 1 addition & 0 deletions BUILD.bazel
@@ -321,6 +321,7 @@ PLASMA_LINKOPTS = [] + select({
ray_cc_library(
    name = "plasma_client",
    srcs = [
        "src/ray/object_manager/common.cc",
        "src/ray/object_manager/plasma/client.cc",
        "src/ray/object_manager/plasma/connection.cc",
        "src/ray/object_manager/plasma/malloc.cc",
87 changes: 87 additions & 0 deletions python/ray/_private/ray_perf.py
@@ -8,6 +8,8 @@
import multiprocessing
import ray

import ray.experimental.channel as ray_channel

logger = logging.getLogger(__name__)


@@ -288,6 +290,91 @@ def async_actor_multi():
results += timeit("n:n async-actor calls async", async_actor_multi, m * n)
ray.shutdown()

#################################################
# Perf tests for channels, used in compiled DAGs.
#################################################

ray.init()

def put_channel_small(chans, do_get=False, do_release=False):
    for chan in chans:
        chan.write(b"0")
        if do_get:
            chan.begin_read()
        if do_release:
            chan.end_read()

@ray.remote
class ChannelReader:
    def ready(self):
        return

    def read(self, chans):
        while True:
            for chan in chans:
                chan.begin_read()
                chan.end_read()

chans = [ray_channel.Channel(1000)]
results += timeit(
    "local put, single channel calls",
    lambda: put_channel_small(chans, do_release=True),
)
results += timeit(
    "local put:local get, single channel calls",
    lambda: put_channel_small(chans, do_get=True, do_release=True),
)

chans = [ray_channel.Channel(1000)]
reader = ChannelReader.remote()
ray.get(reader.ready.remote())
reader.read.remote(chans)
results += timeit(
    "local put:1 remote get, single channel calls", lambda: put_channel_small(chans)
)
ray.kill(reader)

n_cpu = multiprocessing.cpu_count() // 2
print(f"Testing multiple readers/channels, n={n_cpu}")

chans = [ray_channel.Channel(1000, num_readers=n_cpu)]
readers = [ChannelReader.remote() for _ in range(n_cpu)]
ray.get([reader.ready.remote() for reader in readers])
for reader in readers:
    reader.read.remote(chans)
results += timeit(
    "local put:n remote get, single channel calls",
    lambda: put_channel_small(chans),
)
for reader in readers:
    ray.kill(reader)

chans = [ray_channel.Channel(1000) for _ in range(n_cpu)]
reader = ChannelReader.remote()
ray.get(reader.ready.remote())
reader.read.remote(chans)
results += timeit(
    "local put:1 remote get, n channels calls", lambda: put_channel_small(chans)
)
ray.kill(reader)

chans = [ray_channel.Channel(1000) for _ in range(n_cpu)]
readers = [ChannelReader.remote() for _ in range(n_cpu)]
ray.get([reader.ready.remote() for reader in readers])
for chan, reader in zip(chans, readers):
    reader.read.remote([chan])
results += timeit(
    "local put:n remote get, n channels calls", lambda: put_channel_small(chans)
)
for reader in readers:
    ray.kill(reader)

ray.shutdown()

############################
# End of channel perf tests.
############################

NUM_PGS = 100
NUM_BUNDLES = 1
ray.init(resources={"custom": 100})
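For context, the pattern these microbenchmarks exercise is a single writer repeatedly publishing versions of one mutable plasma object to one or more readers. A minimal standalone sketch of the channel API as used above; Channel, write, begin_read, and end_read all appear in this diff, while the assumption that begin_read returns the deserialized value is mine:

import ray
import ray.experimental.channel as ray_channel

ray.init()

# Allocate a mutable plasma object backed by a 1000-byte buffer. The
# underlying ref has no valid value until the first write.
chan = ray_channel.Channel(1000)

# Writer side: seal a new version of the object for readers.
chan.write(b"hello")

# Reader side: begin_read() blocks until a written version is available
# (assumed here to return the value), and end_read() releases it so the
# writer may write again.
value = chan.begin_read()
chan.end_read()

ray.shutdown()

Because the buffer is allocated once up front, every write reuses the same plasma allocation instead of creating a new object per message, which is exactly the overhead these benchmarks measure.
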
43 changes: 38 additions & 5 deletions python/ray/_private/worker.py
@@ -688,7 +688,13 @@ def set_mode(self, mode):
def set_load_code_from_local(self, load_code_from_local):
self._load_code_from_local = load_code_from_local

def put_object(self, value, object_ref=None, owner_address=None):
def put_object(
    self,
    value: Any,
    object_ref: Optional["ray.ObjectRef"] = None,
    owner_address: Optional[str] = None,
    _is_experimental_mutable_object: bool = False,
):
"""Put value in the local object store with object reference `object_ref`.

This assumes that the value for `object_ref` has not yet been placed in
@@ -703,6 +709,10 @@ def put_object(self, value, object_ref=None, owner_address=None):
object_ref: The object ref of the value to be
put. If None, one will be generated.
owner_address: The serialized address of object's owner.
_is_experimental_mutable_object: An experimental flag for mutable
objects. If True, then the returned object will not have a
valid value. The object must be written to using the
ray.experimental.channel API before readers can read.

Returns:
ObjectRef: The object ref the object was put under.
@@ -736,6 +746,11 @@ def put_object(self, value, object_ref=None, owner_address=None):
f"{sio.getvalue()}"
)
raise TypeError(msg) from e

# If the object is mutable, then the raylet should never read the
# object. Instead, clients will keep the object pinned.
pin_object = not _is_experimental_mutable_object

# This *must* be the first place that we construct this python
# ObjectRef because an entry with 0 local references is created when
# the object is Put() in the core worker, expecting that this python
@@ -744,7 +759,11 @@
# reference counter.
return ray.ObjectRef(
self.core_worker.put_serialized_object_and_increment_local_ref(
serialized_value, object_ref=object_ref, owner_address=owner_address
serialized_value,
object_ref=object_ref,
pin_object=pin_object,
owner_address=owner_address,
_is_experimental_mutable_object=_is_experimental_mutable_object,
),
# The initial local reference is already acquired internally.
skip_adding_local_ref=True,
@@ -766,7 +785,12 @@ def deserialize_objects(self, data_metadata_pairs, object_refs):
context = self.get_serialization_context()
return context.deserialize_objects(data_metadata_pairs, object_refs)

def get_objects(self, object_refs: list, timeout: Optional[float] = None):
def get_objects(
    self,
    object_refs: list,
    timeout: Optional[float] = None,
    _is_experimental_mutable_object: bool = False,
):
"""Get the values in the object store associated with the IDs.

Return the values from the local object store for object_refs. This
@@ -782,6 +806,10 @@ def get_objects(self, object_refs: list, timeout: Optional[float] = None):
list: List of deserialized objects
bytes: UUID of the debugger breakpoint we should drop
into or b"" if there is no breakpoint.
_is_experimental_mutable_object: An experimental flag for mutable
objects. If True, then wait until there is a value available to
read. The object must also already be local, or else the get
call will hang.
"""
# Make sure that the values are object refs.
for object_ref in object_refs:
@@ -793,7 +821,10 @@

timeout_ms = int(timeout * 1000) if timeout is not None else -1
data_metadata_pairs = self.core_worker.get_objects(
object_refs, self.current_task_id, timeout_ms
object_refs,
self.current_task_id,
timeout_ms,
_is_experimental_mutable_object,
)
debugger_breakpoint = b""
for data, metadata in data_metadata_pairs:
@@ -2623,7 +2654,9 @@ def get(
@PublicAPI
@client_mode_hook
def put(
value: Any, *, _owner: Optional["ray.actor.ActorHandle"] = None
value: Any,
*,
_owner: Optional["ray.actor.ActorHandle"] = None,
) -> "ray.ObjectRef":
"""Store an object in the object store.

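Taken together, the two private flags documented above trace out the worker-level lifecycle of a mutable object. A hypothetical sketch of that flow using the internal Worker methods from this file (internal API, subject to change; ordinary code should go through ray.experimental.channel instead):

import ray
from ray._private.worker import global_worker

ray.init()

# With the experimental flag, the returned ref has no valid value yet,
# and pin_object=False means the raylet never reads the object; the
# client keeps the plasma buffer pinned instead.
ref = global_worker.put_object(
    b"\x00" * 1000, _is_experimental_mutable_object=True)

# Readers pass the same flag to get_objects, which blocks until a
# version has been written; the object must already be local or the
# call hangs. Nothing has written a version yet, so the call is shown
# commented out (it would block forever here):
# values, debugger_breakpoint = global_worker.get_objects(
#     [ref], _is_experimental_mutable_object=True)
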
3 changes: 2 additions & 1 deletion python/ray/_raylet.pxd
@@ -134,7 +134,8 @@ cdef class CoreWorker:
CObjectID *c_object_id, shared_ptr[CBuffer] *data,
c_bool created_by_worker,
owner_address=*,
c_bool inline_small_object=*)
c_bool inline_small_object=*,
c_bool is_experimental_mutable_object=*)
cdef unique_ptr[CAddress] _convert_python_address(self, address=*)
cdef store_task_output(
self, serialized_object,
72 changes: 62 additions & 10 deletions python/ray/_raylet.pyx
@@ -3324,14 +3324,15 @@ cdef class CoreWorker:
return self.plasma_event_handler

def get_objects(self, object_refs, TaskID current_task_id,
int64_t timeout_ms=-1):
int64_t timeout_ms=-1,
c_bool _is_experimental_mutable_object=False):
cdef:
c_vector[shared_ptr[CRayObject]] results
CTaskID c_task_id = current_task_id.native()
c_vector[CObjectID] c_object_ids = ObjectRefsToVector(object_refs)
with nogil:
op_status = CCoreWorkerProcess.GetCoreWorker().Get(
c_object_ids, timeout_ms, &results)
c_object_ids, timeout_ms, _is_experimental_mutable_object, &results)
check_status(op_status)

return RayObjectsToDataMetadataPairs(results)
@@ -3366,7 +3367,9 @@
CObjectID *c_object_id, shared_ptr[CBuffer] *data,
c_bool created_by_worker,
owner_address=None,
c_bool inline_small_object=True):
c_bool inline_small_object=True,
c_bool is_experimental_mutable_object=False,
):
cdef:
unique_ptr[CAddress] c_owner_address

@@ -3376,7 +3379,8 @@
with nogil:
check_status(CCoreWorkerProcess.GetCoreWorker()
.CreateOwnedAndIncrementLocalRef(
metadata, data_size, contained_ids,
is_experimental_mutable_object, metadata,
data_size, contained_ids,
c_object_id, data, created_by_worker,
move(c_owner_address),
inline_small_object))
@@ -3465,11 +3469,58 @@
generator_id=CObjectID.Nil(),
owner_address=c_owner_address))

def put_serialized_object_and_increment_local_ref(self, serialized_object,
ObjectRef object_ref=None,
c_bool pin_object=True,
owner_address=None,
c_bool inline_small_object=True):
def experimental_mutable_object_put_serialized(self, serialized_object,
                                               ObjectRef object_ref,
                                               num_readers,
                                               ):
    cdef:
        CObjectID c_object_id = object_ref.native()
        shared_ptr[CBuffer] data
        unique_ptr[CAddress] null_owner_address

    metadata = string_to_buffer(serialized_object.metadata)
    data_size = serialized_object.total_bytes
    check_status(CCoreWorkerProcess.GetCoreWorker()
                 .ExperimentalMutableObjectWriteAcquire(
                     c_object_id,
                     metadata,
                     data_size,
                     num_readers,
                     &data,
                 ))
    if data_size > 0:
        (<SerializedObject>serialized_object).write_to(
            Buffer.make(data))
    check_status(
        CCoreWorkerProcess.GetCoreWorker().SealExisting(
            c_object_id, pin_object=False,
            generator_id=CObjectID.Nil(),
            owner_address=null_owner_address))

def experimental_mutable_object_read_release(self, object_refs):
    """
    For experimental.channel.Channel.

    Signals to the writer that the channel is ready to write again. The
    read begins when the caller calls ray.get and a written value is
    available. If ray.get is not called first, then this call will block
    until a value is written, then drop the value.
    """
    cdef:
        c_vector[CObjectID] c_object_ids = ObjectRefsToVector(object_refs)
    with nogil:
        op_status = (CCoreWorkerProcess.GetCoreWorker()
                     .ExperimentalMutableObjectReadRelease(c_object_ids))
    check_status(op_status)

def put_serialized_object_and_increment_local_ref(
        self, serialized_object,
        ObjectRef object_ref=None,
        c_bool pin_object=True,
        owner_address=None,
        c_bool inline_small_object=True,
        c_bool _is_experimental_mutable_object=False,
):
    cdef:
        CObjectID c_object_id
        shared_ptr[CBuffer] data
@@ -3485,7 +3536,8 @@
    object_already_exists = self._create_put_buffer(
        metadata, total_bytes, object_ref,
        contained_object_ids,
        &c_object_id, &data, True, owner_address, inline_small_object,
        _is_experimental_mutable_object)

    logger.debug(
        f"Serialized object size of {c_object_id.Hex()} is {total_bytes} bytes")
f"Serialized object size of {c_object_id.Hex()} is {total_bytes} bytes")
Expand Down
Loading
Loading
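Putting the pieces together: experimental_mutable_object_put_serialized implements the write path (ExperimentalMutableObjectWriteAcquire, copy the serialized payload, SealExisting with pin_object=False), and experimental_mutable_object_read_release lets each reader hand the buffer back to the writer. A sketch of one multi-reader round trip through the public channel wrapper, under the num_readers semantics described in the docstrings (begin_read returning the value is an assumption):

import ray
import ray.experimental.channel as ray_channel

ray.init()

@ray.remote
class Reader:
    def read_one(self, chan):
        value = chan.begin_read()  # blocks until a version is sealed
        chan.end_read()            # -> experimental_mutable_object_read_release
        return value

n = 2
chan = ray_channel.Channel(1000, num_readers=n)
readers = [Reader.remote() for _ in range(n)]

chan.write(b"v1")  # WriteAcquire -> copy payload -> SealExisting
print(ray.get([r.read_one.remote(chan) for r in readers]))

# A subsequent write blocks until all num_readers readers have released
# the previous version; both have by now, so this returns promptly.
chan.write(b"v2")

ray.shutdown()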