Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Support mutable plasma objects #41515

Merged
merged 34 commits into from
Dec 9, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
12b977d
initial commit
stephanie-wang Nov 29, 2023
1c935b9
Add special calls for create and put mutable objects
stephanie-wang Nov 29, 2023
c2dbf1f
feature flag for shared mem seal, only acquire once per ray.get
stephanie-wang Nov 30, 2023
6d4aa94
put-get
stephanie-wang Nov 30, 2023
bc4f1e9
rm shared mem seal
stephanie-wang Nov 30, 2023
c4a2378
fix num_readers on first version, unit tests pass now
stephanie-wang Nov 30, 2023
e40d3c8
mutable object -> channel
stephanie-wang Nov 30, 2023
b79b7d1
micro
stephanie-wang Nov 30, 2023
5ea0fe3
support different metadata
stephanie-wang Dec 1, 2023
cbe257f
better error message
stephanie-wang Dec 1, 2023
a68cefd
cleanup
stephanie-wang Dec 1, 2023
ea57894
Test for errors, better error handling when too many readers
stephanie-wang Dec 2, 2023
5bbf379
remove unneeded
stephanie-wang Dec 2, 2023
1e16e09
java build
stephanie-wang Dec 2, 2023
580b3ad
rename
stephanie-wang Dec 2, 2023
fe11cc3
test metadata change in remote reader
stephanie-wang Dec 2, 2023
e11b614
build
stephanie-wang Dec 4, 2023
99a38c2
fix
stephanie-wang Dec 5, 2023
204bb9b
fix
stephanie-wang Dec 6, 2023
4703f34
compile?
stephanie-wang Dec 6, 2023
420bd1c
build
stephanie-wang Dec 6, 2023
4cabbc5
x
stephanie-wang Dec 6, 2023
b44ef8a
fix
stephanie-wang Dec 6, 2023
881d5ff
copyright
stephanie-wang Dec 6, 2023
ef2cfb7
test
stephanie-wang Dec 6, 2023
ca22a63
Merge remote-tracking branch 'upstream/master' into mutable-objects-2
stephanie-wang Dec 6, 2023
dbbb3d6
Only allocate PlasmaObjectHeader if is_mutable=true
stephanie-wang Dec 7, 2023
9078776
Only call Read/Write Acquire/Release if is_mutable=true
stephanie-wang Dec 7, 2023
2e677c3
x
stephanie-wang Dec 7, 2023
f06b543
cpp test
stephanie-wang Dec 7, 2023
4dfa31e
skip tests on windows
stephanie-wang Dec 7, 2023
126296f
Merge remote-tracking branch 'upstream/master' into mutable-objects-2
stephanie-wang Dec 7, 2023
03f4fbd
larger CI machine
stephanie-wang Dec 8, 2023
3e7dfa2
Merge branch 'master' into mutable-objects-2
stephanie-wang Dec 8, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Test for errors, better error handling when too many readers
Signed-off-by: Stephanie Wang <swang@cs.berkeley.edu>
  • Loading branch information
stephanie-wang committed Dec 2, 2023
commit ea57894f405c757d8faa6ed6077aecfc7c9db0cf
10 changes: 6 additions & 4 deletions python/ray/_private/ray_perf.py
Original file line number Diff line number Diff line change
Expand Up @@ -296,9 +296,9 @@ def async_actor_multi():

ray.init()

def put_channel_small(chans, num_readers=1, do_get=False, do_release=False):
def put_channel_small(chans, do_get=False, do_release=False):
for chan in chans:
chan.write(b"0", num_readers=num_readers)
chan.write(b"0")
if do_get:
chan.begin_read()
if do_release:
Expand Down Expand Up @@ -337,14 +337,14 @@ def read(self, chans):
n_cpu = multiprocessing.cpu_count() // 2
print(f"Testing multiple readers/channels, n={n_cpu}")

chans = [ray_channel.Channel(1000)]
chans = [ray_channel.Channel(1000, num_readers=n_cpu)]
readers = [ChannelReader.remote() for _ in range(n_cpu)]
ray.get([reader.ready.remote() for reader in readers])
for reader in readers:
reader.read.remote(chans)
results += timeit(
"local put:n remote get, single channel calls",
lambda: put_channel_small(chans, num_readers=n_cpu),
lambda: put_channel_small(chans),
)
for reader in readers:
ray.kill(reader)
Expand All @@ -369,6 +369,8 @@ def read(self, chans):
for reader in readers:
ray.kill(reader)

ray.shutdown()

############################
# End of channel perf tests.
############################
Expand Down
29 changes: 18 additions & 11 deletions python/ray/experimental/channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class Channel:
ray.wait.
"""

def __init__(self, buffer_size: Optional[int] = None):
def __init__(self, buffer_size: Optional[int] = None, num_readers: int = 1):
"""
Create a channel that can be read and written by co-located Ray processes.

Expand All @@ -71,19 +71,20 @@ def __init__(self, buffer_size: Optional[int] = None):
else:
self._base_ref = _create_channel_ref(buffer_size)
stephanie-wang marked this conversation as resolved.
Show resolved Hide resolved

self.worker = ray._private.worker.global_worker
self.worker.check_connected()
self._num_readers = num_readers
self._worker = ray._private.worker.global_worker
self._worker.check_connected()

@staticmethod
def _from_base_ref(base_ref: "ray.ObjectRef") -> "Channel":
chan = Channel()
def _from_base_ref(base_ref: "ray.ObjectRef", num_readers: int) -> "Channel":
chan = Channel(num_readers=num_readers)
chan._base_ref = base_ref
return chan

def __reduce__(self):
return self._from_base_ref, (self._base_ref,)
return self._from_base_ref, (self._base_ref, self._num_readers)

def write(self, value: Any, num_readers: int):
def write(self, value: Any, num_readers: Optional[int] = None):
"""
Write a value to the channel.

Expand All @@ -96,11 +97,13 @@ def write(self, value: Any, num_readers: int):
num_readers: The number of readers that must read and release the value
before we can write again.
"""
if num_readers is None:
num_readers = self._num_readers
if num_readers <= 0:
raise ValueError("``num_readers`` must be a positive integer.")

try:
serialized_value = self.worker.get_serialization_context().serialize(value)
serialized_value = self._worker.get_serialization_context().serialize(value)
except TypeError as e:
sio = io.StringIO()
ray.util.inspect_serializability(value, print_file=sio)
Expand All @@ -111,7 +114,7 @@ def write(self, value: Any, num_readers: int):
)
raise TypeError(msg) from e

self.worker.core_worker.experimental_mutable_object_put_serialized(
self._worker.core_worker.experimental_mutable_object_put_serialized(
serialized_value,
self._base_ref,
num_readers,
Expand All @@ -122,10 +125,14 @@ def begin_read(self) -> Any:
Read the latest value from the channel. This call will block until a
value is available to read.

Subsequent calls to begin_read() will return the same value, until
end_read() is called. Then, the client must begin_read() again to get
the next value.

Returns:
Any: The deserialized value.
"""
values, _ = self.worker.get_objects(
values, _ = self._worker.get_objects(
[self._base_ref], _is_experimental_mutable_object=True
)
return values[0]
Expand All @@ -137,6 +144,6 @@ def end_read(self):
If begin_read is not called first, then this call will block until a
value is written, then drop the value.
"""
self.worker.core_worker.experimental_mutable_object_read_release(
self._worker.core_worker.experimental_mutable_object_read_release(
[self._base_ref]
)
49 changes: 49 additions & 0 deletions python/ray/tests/test_accelerated_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,57 @@ def test_put_local_get(ray_start_regular):
val = i.to_bytes(8, "little")
chan.write(val, num_readers=1)
assert chan.begin_read() == val

# Begin read multiple times will return the same value.
assert chan.begin_read() == val

chan.end_read()


def test_errors(ray_start_regular):
@ray.remote
class Actor:
def make_chan(self, do_write=True):
self.chan = ray_channel.Channel(1000)
if do_write:
self.chan.write(b"hello", num_readers=1)
return self.chan

a = Actor.remote()
# Only original creator can write.
chan = ray.get(a.make_chan.remote(do_write=False))
with pytest.raises(ray.exceptions.RaySystemError):
chan.write(b"hi")

# Only original creator can write.
chan = ray.get(a.make_chan.remote(do_write=True))
assert chan.begin_read() == b"hello"
with pytest.raises(ray.exceptions.RaySystemError):
chan.write(b"hi")

# Multiple consecutive reads from the same process are fine.
chan = ray.get(a.make_chan.remote(do_write=True))
assert chan.begin_read() == b"hello"
assert chan.begin_read() == b"hello"
chan.end_read()

@ray.remote
class Reader:
def __init__(self):
pass

def read(self, chan):
return chan.begin_read()

# Multiple reads from n different processes, where n > num_readers, errors.
chan = ray.get(a.make_chan.remote(do_write=True))
readers = [Reader.remote(), Reader.remote()]
# At least 1 reader
with pytest.raises(ray.exceptions.RayTaskError) as exc_info:
ray.get([reader.read.remote(chan) for reader in readers])
assert "ray.exceptions.RaySystemError" in str(exc_info.value)


def test_put_different_meta(ray_start_regular):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh did you fix it, or they happen to have the same metadata size?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I fixed it.

chan = ray_channel.Channel(1000)

Expand All @@ -42,6 +90,7 @@ def _test(val):
_test(1000)
_test(np.random.rand(10))

# Cannot put a serialized value larger than the allocated buffer.
with pytest.raises(ValueError):
_test(np.random.rand(100))

Expand Down
44 changes: 26 additions & 18 deletions src/ray/object_manager/common.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ void PlasmaObjectHeader::WriteAcquire(int64_t write_version,
<< ". Are you sure this is the only writer?";

version = write_version;
is_sealed = false;
data_size = write_data_size;
metadata_size = write_metadata_size;
num_readers = write_num_readers;
Expand All @@ -76,6 +77,7 @@ void PlasmaObjectHeader::WriteRelease(int64_t write_version) {
<< version << ". Are you sure this is the only writer?";

version = write_version;
is_sealed = true;
RAY_CHECK(num_readers != 0) << num_readers;
num_read_acquires_remaining = num_readers;
num_read_releases_remaining = num_readers;
Expand All @@ -87,30 +89,36 @@ void PlasmaObjectHeader::WriteRelease(int64_t write_version) {
RAY_CHECK(pthread_cond_broadcast(&cond) == 0);
}

int64_t PlasmaObjectHeader::ReadAcquire(int64_t read_version) {
RAY_LOG(DEBUG) << "ReadAcquire waiting version " << read_version;
bool PlasmaObjectHeader::ReadAcquire(int64_t version_to_read, int64_t *version_read) {
RAY_LOG(DEBUG) << "ReadAcquire waiting version " << version_to_read;
RAY_CHECK(pthread_mutex_lock(&wr_mut) == 0);
RAY_LOG(DEBUG) << "ReadAcquire " << read_version;
RAY_LOG(DEBUG) << "ReadAcquire " << version_to_read;
PrintPlasmaObjectHeader(this);

while (version < read_version || num_read_acquires_remaining == 0) {
// Wait for the requested version (or a more recent one) to be sealed.
while (version < version_to_read || !is_sealed) {
RAY_CHECK(pthread_cond_wait(&cond, &wr_mut) == 0);
}

if (version > read_version) {
RAY_LOG(WARNING) << "Version " << version << " already exceeds version to read "
<< read_version << ". May have missed earlier reads.";
}

if (num_readers != -1) {
num_read_acquires_remaining--;
RAY_CHECK(num_read_acquires_remaining >= 0)
<< "readers acquired exceeds max readers " << num_readers;
// This object can only be read a constant number of times. Tell the caller
// which version was read.
read_version = version;
bool success = false;
if (num_readers == -1) {
// Object is a normal immutable object. Read succeeds.
*version_read = 0;
success = true;
} else {
read_version = 0;
*version_read = version;
if (version == version_to_read && num_read_acquires_remaining > 0) {
// This object is at the right version and still has reads remaining. Read
// succeeds.
num_read_acquires_remaining--;
success = true;
} else if (version > version_to_read) {
RAY_LOG(WARNING) << "Version " << version << " already exceeds version to read "
<< version_to_read;
} else {
RAY_LOG(WARNING) << "Version " << version << " already has " << num_readers
<< "readers";
}
}

RAY_LOG(DEBUG) << "ReadAcquire done";
Expand All @@ -119,7 +127,7 @@ int64_t PlasmaObjectHeader::ReadAcquire(int64_t read_version) {
RAY_CHECK(pthread_mutex_unlock(&wr_mut) == 0);
// Signal to other readers that they may read.
RAY_CHECK(pthread_cond_signal(&cond) == 0);
return read_version;
return success;
}

void PlasmaObjectHeader::ReadRelease(int64_t read_version) {
Expand Down
39 changes: 25 additions & 14 deletions src/ray/object_manager/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,12 @@ struct PlasmaObjectHeader {
// the first write and then should never be modified. For mutable objects,
// each new write must increment the version before releasing to readers.
int64_t version = 0;
// Indicates whether the current version has been written. is_sealed=false
// means that there is a writer who has WriteAcquire'd but not yet
// WriteRelease'd the current version. is_sealed=true means that `version`
// has been WriteRelease'd. A reader may read the actual object value if
// is_sealed=true and num_read_acquires_remaining != 0.
bool is_sealed = false;
// The total number of reads allowed before the writer can write again. This
// value should be set by the writer before releasing to readers.
// For immutable objects, this is set to -1 and infinite reads are allowed.
Expand All @@ -66,6 +72,10 @@ struct PlasmaObjectHeader {
// objects, readers must ensure this is > 0 and decrement before they read.
// Once this value reaches 0, no more readers are allowed until the writer
// writes a new version.
// NOTE(swang): Technically we do not need this because
// num_read_releases_remaining protects against too many readers. However,
// this allows us to throw an error as soon as the n+1-th reader begins,
// instead of waiting to error until the n+1-th reader is done reading.
int64_t num_read_acquires_remaining = 0;
// The number of readers who must release the current version before a new
// version can be written. For mutable objects, readers must decrement this
Expand All @@ -79,13 +89,15 @@ struct PlasmaObjectHeader {
uint64_t data_size = 0;
uint64_t metadata_size = 0;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe it is not used yet. Let's remove it for now?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is used.


/// Setup synchronization primitives.
void Init();

/// Destroy synchronization primitives.
void Destroy();

/// Blocks until all readers for the previous write have ReadRelease'd the value.
/// Caller must ensure there is one writer at a time. Caller must pass
/// consecutive versions on each new write, starting with write_version=1.
/// Blocks until all readers for the previous write have ReadRelease'd the
/// value. Protects against concurrent writers. Caller must pass consecutive
/// versions on each new write, starting with write_version=1.
///
/// \param write_version The new version for write.
/// \param data_size The new data size of the object.
Expand All @@ -96,23 +108,22 @@ struct PlasmaObjectHeader {
uint64_t metadata_size,
int64_t num_readers);

// Call after completing a write to signal that readers may read.
// num_readers should be set before calling this.
/// Call after completing a write to signal that readers may read.
/// num_readers should be set before calling this.
///
/// \param write_version The new version for write. This must match the
/// version previously passed to WriteAcquire.
void WriteRelease(int64_t write_version);

// Blocks until the given version or a more recent version is ready to read.
// If num_readers have already read this version, then this call will hang.
// Blocks until the given version is ready to read. Returns false if the
// maximum number of readers have already read the requested version.
//
// \param read_version The minimum version to wait for.
// \return The version that was read. This should be passed to ReadRelease
// when the reader is done. Returns 0 if the object is a normal immutable
// object, meaning no ReadRelease is needed.
///
/// \param read_version Read at least this version.
int64_t ReadAcquire(int64_t read_version);
// \param[in] read_version The version to read.
// \param[out] version_read For normal immutable objects, this will be set to
// 0. Otherwise, the current version.
// \return success Whether the correct version was read and there were still
// reads remaining.
bool ReadAcquire(int64_t version_to_read, int64_t *version_read);

// Finishes the read. If all reads are done, signals to the writer. This is
// not necessary to call for objects that have num_readers=-1.
Expand Down
Loading
Loading