Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Support mutable plasma objects #41515

Merged
merged 34 commits into from
Dec 9, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
12b977d
initial commit
stephanie-wang Nov 29, 2023
1c935b9
Add special calls for create and put mutable objects
stephanie-wang Nov 29, 2023
c2dbf1f
feature flag for shared mem seal, only acquire once per ray.get
stephanie-wang Nov 30, 2023
6d4aa94
put-get
stephanie-wang Nov 30, 2023
bc4f1e9
rm shared mem seal
stephanie-wang Nov 30, 2023
c4a2378
fix num_readers on first version, unit tests pass now
stephanie-wang Nov 30, 2023
e40d3c8
mutable object -> channel
stephanie-wang Nov 30, 2023
b79b7d1
micro
stephanie-wang Nov 30, 2023
5ea0fe3
support different metadata
stephanie-wang Dec 1, 2023
cbe257f
better error message
stephanie-wang Dec 1, 2023
a68cefd
cleanup
stephanie-wang Dec 1, 2023
ea57894
Test for errors, better error handling when too many readers
stephanie-wang Dec 2, 2023
5bbf379
remove unneeded
stephanie-wang Dec 2, 2023
1e16e09
java build
stephanie-wang Dec 2, 2023
580b3ad
rename
stephanie-wang Dec 2, 2023
fe11cc3
test metadata change in remote reader
stephanie-wang Dec 2, 2023
e11b614
build
stephanie-wang Dec 4, 2023
99a38c2
fix
stephanie-wang Dec 5, 2023
204bb9b
fix
stephanie-wang Dec 6, 2023
4703f34
compile?
stephanie-wang Dec 6, 2023
420bd1c
build
stephanie-wang Dec 6, 2023
4cabbc5
x
stephanie-wang Dec 6, 2023
b44ef8a
fix
stephanie-wang Dec 6, 2023
881d5ff
copyright
stephanie-wang Dec 6, 2023
ef2cfb7
test
stephanie-wang Dec 6, 2023
ca22a63
Merge remote-tracking branch 'upstream/master' into mutable-objects-2
stephanie-wang Dec 6, 2023
dbbb3d6
Only allocate PlasmaObjectHeader if is_mutable=true
stephanie-wang Dec 7, 2023
9078776
Only call Read/Write Acquire/Release if is_mutable=true
stephanie-wang Dec 7, 2023
2e677c3
x
stephanie-wang Dec 7, 2023
f06b543
cpp test
stephanie-wang Dec 7, 2023
4dfa31e
skip tests on windows
stephanie-wang Dec 7, 2023
126296f
Merge remote-tracking branch 'upstream/master' into mutable-objects-2
stephanie-wang Dec 7, 2023
03f4fbd
larger CI machine
stephanie-wang Dec 8, 2023
3e7dfa2
Merge branch 'master' into mutable-objects-2
stephanie-wang Dec 8, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
cleanup
Signed-off-by: Stephanie Wang <swang@cs.berkeley.edu>
  • Loading branch information
stephanie-wang committed Dec 1, 2023
commit a68cefd2e974f1e3b56f79e4bcb36a628e25c155
3 changes: 0 additions & 3 deletions python/ray/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,6 @@ def _configure_system():
WORKER_MODE,
RESTORE_WORKER_MODE,
SPILL_WORKER_MODE,
_create_channel,
_write_channel,
_end_read_channel,
cancel,
get,
get_actor,
Expand Down
22 changes: 12 additions & 10 deletions python/ray/_private/ray_perf.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import multiprocessing
import ray

import ray.experimental.channel as ray_channel

logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -296,11 +298,11 @@ def async_actor_multi():

def put_channel_small(chans, num_readers=1, do_get=False, do_release=False):
for chan in chans:
ray._write_channel(b"0", chan, num_readers=num_readers)
chan.write(b"0", num_readers=num_readers)
if do_get:
ray.get(chan)
chan.begin_read()
if do_release:
ray._end_read_channel(chan)
chan.end_read()

@ray.remote
class ChannelReader:
Expand All @@ -310,10 +312,10 @@ def ready(self):
def read(self, chans):
while True:
for chan in chans:
ray.get(chan)
ray._end_read_channel(chan)
chan.begin_read()
chan.end_read()

chans = [ray._create_channel(1000)]
chans = [ray_channel.Channel(1000)]
results += timeit(
"local put, single channel calls",
lambda: put_channel_small(chans, do_release=True),
Expand All @@ -323,7 +325,7 @@ def read(self, chans):
lambda: put_channel_small(chans, do_get=True, do_release=True),
)

chans = [ray._create_channel(1000)]
chans = [ray_channel.Channel(1000)]
reader = ChannelReader.remote()
ray.get(reader.ready.remote())
reader.read.remote(chans)
Expand All @@ -335,7 +337,7 @@ def read(self, chans):
n_cpu = multiprocessing.cpu_count() // 2
print(f"Testing multiple readers/channels, n={n_cpu}")

chans = [ray._create_channel(1000)]
chans = [ray_channel.Channel(1000)]
readers = [ChannelReader.remote() for _ in range(n_cpu)]
ray.get([reader.ready.remote() for reader in readers])
for reader in readers:
Expand All @@ -347,7 +349,7 @@ def read(self, chans):
for reader in readers:
ray.kill(reader)

chans = [ray._create_channel(1000) for _ in range(n_cpu)]
chans = [ray_channel.Channel(1000) for _ in range(n_cpu)]
reader = ChannelReader.remote()
ray.get(reader.ready.remote())
reader.read.remote(chans)
Expand All @@ -356,7 +358,7 @@ def read(self, chans):
)
ray.kill(reader)

chans = [ray._create_channel(1000) for _ in range(n_cpu)]
chans = [ray_channel.Channel(1000) for _ in range(n_cpu)]
readers = [ChannelReader.remote() for _ in range(n_cpu)]
ray.get([reader.ready.remote() for reader in readers])
for chan, reader in zip(chans, readers):
Expand Down
91 changes: 27 additions & 64 deletions python/ray/_private/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -688,7 +688,13 @@ def set_mode(self, mode):
def set_load_code_from_local(self, load_code_from_local):
self._load_code_from_local = load_code_from_local

def put_object(self, value, object_ref=None, owner_address=None, is_mutable=False):
def put_object(
self,
value: Any,
object_ref: Optional["ray.ObjectRef"] = None,
owner_address: Optional[str] = None,
_is_experimental_mutable_object: bool = False,
):
"""Put value in the local object store with object reference `object_ref`.

This assumes that the value for `object_ref` has not yet been placed in
Expand All @@ -703,6 +709,10 @@ def put_object(self, value, object_ref=None, owner_address=None, is_mutable=Fals
object_ref: The object ref of the value to be
put. If None, one will be generated.
owner_address: The serialized address of object's owner.
_is_experimental_mutable_object: An experimental flag for mutable
objects. If True, then the returned object will not have a
valid value. The object must be written to using the
ray.experimental.channel API before readers can read.

Returns:
ObjectRef: The object ref the object was put under.
Expand Down Expand Up @@ -739,7 +749,7 @@ def put_object(self, value, object_ref=None, owner_address=None, is_mutable=Fals

# If the object is mutable, then the raylet should never read the
# object. Instead, clients will keep the object pinned.
pin_object = not is_mutable
pin_object = not _is_experimental_mutable_object

# This *must* be the first place that we construct this python
# ObjectRef because an entry with 0 local references is created when
Expand All @@ -753,7 +763,7 @@ def put_object(self, value, object_ref=None, owner_address=None, is_mutable=Fals
object_ref=object_ref,
pin_object=pin_object,
owner_address=owner_address,
is_mutable=is_mutable,
_is_experimental_mutable_object=_is_experimental_mutable_object,
),
# The initial local reference is already acquired internally.
skip_adding_local_ref=True,
Expand All @@ -775,7 +785,12 @@ def deserialize_objects(self, data_metadata_pairs, object_refs):
context = self.get_serialization_context()
return context.deserialize_objects(data_metadata_pairs, object_refs)

def get_objects(self, object_refs: list, timeout: Optional[float] = None):
def get_objects(
self,
object_refs: list,
timeout: Optional[float] = None,
_is_experimental_mutable_object: bool = False,
):
"""Get the values in the object store associated with the IDs.

Return the values from the local object store for object_refs. This
Expand All @@ -791,6 +806,10 @@ def get_objects(self, object_refs: list, timeout: Optional[float] = None):
list: List of deserialized objects
bytes: UUID of the debugger breakpoint we should drop
into or b"" if there is no breakpoint.
_is_experimental_mutable_object: An experimental flag for mutable
objects. If True, then wait until there is a value available to
read. The object must also already be local, or else the get
call will hang.
"""
# Make sure that the values are object refs.
for object_ref in object_refs:
Expand All @@ -802,7 +821,10 @@ def get_objects(self, object_refs: list, timeout: Optional[float] = None):

timeout_ms = int(timeout * 1000) if timeout is not None else -1
data_metadata_pairs = self.core_worker.get_objects(
object_refs, self.current_task_id, timeout_ms
object_refs,
self.current_task_id,
timeout_ms,
_is_experimental_mutable_object,
)
debugger_breakpoint = b""
for data, metadata in data_metadata_pairs:
Expand Down Expand Up @@ -2498,20 +2520,6 @@ def show_in_dashboard(message: str, key: str = "", dtype: str = "text"):
blocking_get_inside_async_warned = False


def _end_read_channel(object_refs):
"""
Signal to the writer that the channel is ready to write again. The read
begins when the caller calls ray.get and a written value is available. If
ray.get is not called first, then this call will block until a value is
written, then drop the value.
"""
worker = global_worker
worker.check_connected()
if isinstance(object_refs, ObjectRef):
object_refs = [object_refs]
worker.core_worker.get_release(object_refs)


@overload
def get(
object_refs: "Sequence[ObjectRef[Any]]", *, timeout: Optional[float] = None
Expand Down Expand Up @@ -2643,51 +2651,6 @@ def get(
return values


@PublicAPI
def _write_channel(value: Any, object_ref: ObjectRef, num_readers: int):
worker = global_worker
worker.check_connected()

if num_readers <= 0:
raise ValueError("``num_readers`` must be a positive integer.")

try:
serialized_value = worker.get_serialization_context().serialize(value)
except TypeError as e:
sio = io.StringIO()
ray.util.inspect_serializability(value, print_file=sio)
msg = (
"Could not serialize the put value " f"{repr(value)}:\n" f"{sio.getvalue()}"
)
raise TypeError(msg) from e

worker.core_worker.put_serialized_object_to_mutable_plasma_object(
serialized_value,
object_ref,
num_readers,
)


@PublicAPI
def _create_channel(
buffer_size: int,
) -> "ray.ObjectRef":
worker = global_worker
worker.check_connected()

value = b"0" * buffer_size

try:
object_ref = worker.put_object(value, owner_address=None, is_mutable=True)
except ObjectStoreFullError:
logger.info(
"Put failed since the value was either too large or the "
"store was full of pinned objects."
)
raise
return object_ref


@PublicAPI
stephanie-wang marked this conversation as resolved.
Show resolved Hide resolved
@client_mode_hook
def put(
Expand Down
2 changes: 1 addition & 1 deletion python/ray/_raylet.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ cdef class CoreWorker:
c_bool created_by_worker,
owner_address=*,
c_bool inline_small_object=*,
c_bool is_mutable=*)
c_bool is_experimental_mutable_object=*)
cdef unique_ptr[CAddress] _convert_python_address(self, address=*)
cdef store_task_output(
self, serialized_object,
Expand Down
73 changes: 43 additions & 30 deletions python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -3324,25 +3324,19 @@ cdef class CoreWorker:
return self.plasma_event_handler

def get_objects(self, object_refs, TaskID current_task_id,
int64_t timeout_ms=-1):
int64_t timeout_ms=-1,
c_bool _is_experimental_mutable_object=False):
cdef:
c_vector[shared_ptr[CRayObject]] results
CTaskID c_task_id = current_task_id.native()
c_vector[CObjectID] c_object_ids = ObjectRefsToVector(object_refs)
with nogil:
op_status = CCoreWorkerProcess.GetCoreWorker().Get(
c_object_ids, timeout_ms, &results)
c_object_ids, timeout_ms, _is_experimental_mutable_object, &results)
check_status(op_status)

return RayObjectsToDataMetadataPairs(results)

def get_release(self, object_refs):
cdef:
c_vector[CObjectID] c_object_ids = ObjectRefsToVector(object_refs)
with nogil:
op_status = CCoreWorkerProcess.GetCoreWorker().GetRelease(c_object_ids)
check_status(op_status)

def get_if_local(self, object_refs):
"""Get objects from local plasma store directly
without a fetch request to raylet."""
Expand Down Expand Up @@ -3374,7 +3368,7 @@ cdef class CoreWorker:
c_bool created_by_worker,
owner_address=None,
c_bool inline_small_object=True,
c_bool is_mutable=False,
c_bool is_experimental_mutable_object=False,
):
cdef:
unique_ptr[CAddress] c_owner_address
Expand All @@ -3385,7 +3379,8 @@ cdef class CoreWorker:
with nogil:
check_status(CCoreWorkerProcess.GetCoreWorker()
.CreateOwnedAndIncrementLocalRef(
is_mutable, metadata, data_size, contained_ids,
is_experimental_mutable_object, metadata,
data_size, contained_ids,
c_object_id, data, created_by_worker,
move(c_owner_address),
inline_small_object))
Expand Down Expand Up @@ -3474,24 +3469,25 @@ cdef class CoreWorker:
generator_id=CObjectID.Nil(),
owner_address=c_owner_address))

def put_serialized_object_to_mutable_plasma_object(self, serialized_object,
ObjectRef object_ref,
num_readers,
):
def experimental_mutable_object_put_serialized(self, serialized_object,
ObjectRef object_ref,
num_readers,
):
cdef:
CObjectID c_object_id = object_ref.native()
shared_ptr[CBuffer] data
unique_ptr[CAddress] null_owner_address

metadata = string_to_buffer(serialized_object.metadata)
data_size = serialized_object.total_bytes
check_status(CCoreWorkerProcess.GetCoreWorker().WriteAcquireMutableObject(
c_object_id,
metadata,
data_size,
num_readers,
&data,
))
check_status(CCoreWorkerProcess.GetCoreWorker()
.ExperimentalMutableObjectWriteAcquire(
c_object_id,
metadata,
data_size,
num_readers,
&data,
))
if data_size > 0:
(<SerializedObject>serialized_object).write_to(
Buffer.make(data))
Expand All @@ -3501,13 +3497,30 @@ cdef class CoreWorker:
generator_id=CObjectID.Nil(),
owner_address=null_owner_address))

def put_serialized_object_and_increment_local_ref(self, serialized_object,
ObjectRef object_ref=None,
c_bool pin_object=True,
owner_address=None,
c_bool inline_small_object=True,
c_bool is_mutable=False,
):
def experimental_mutable_object_read_release(self, object_refs):
"""
For experimental.channel.Channel.

Signal to the writer that the channel is ready to write again. The read
began when the caller calls ray.get and a written value is available. If
ray.get is not called first, then this call will block until a value is
written, then drop the value.
"""
cdef:
c_vector[CObjectID] c_object_ids = ObjectRefsToVector(object_refs)
with nogil:
op_status = (CCoreWorkerProcess.GetCoreWorker()
.ExperimentalMutableObjectReadRelease(c_object_ids))
check_status(op_status)

def put_serialized_object_and_increment_local_ref(
self, serialized_object,
ObjectRef object_ref=None,
c_bool pin_object=True,
owner_address=None,
c_bool inline_small_object=True,
c_bool _is_experimental_mutable_object=False,
):
cdef:
CObjectID c_object_id
shared_ptr[CBuffer] data
Expand All @@ -3524,7 +3537,7 @@ cdef class CoreWorker:
metadata, total_bytes, object_ref,
contained_object_ids,
&c_object_id, &data, True, owner_address, inline_small_object,
is_mutable)
_is_experimental_mutable_object)

logger.debug(
f"Serialized object size of {c_object_id.Hex()} is {total_bytes} bytes")
Expand Down
Loading
Loading