Skip to content

Commit

Permalink
[client][test] Client multiprocessing tests + client api minor fix (r…
Browse files Browse the repository at this point in the history
  • Loading branch information
DmitriGekhtman authored Jul 7, 2021
1 parent 34422ef commit c6497c6
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 9 deletions.
1 change: 1 addition & 0 deletions ci/travis/ci.sh
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ test_python() {
-python/ray/tests:test_multi_node_2
-python/ray/tests:test_multi_node_3
-python/ray/tests:test_multiprocessing # test_connect_to_ray() fails to connect to raylet
-python/ray/tests:test_multiprocessing_client_mode # timeout
-python/ray/tests:test_node_manager
-python/ray/tests:test_object_manager
-python/ray/tests:test_placement_group # timeout and OOM
Expand Down
1 change: 1 addition & 0 deletions python/ray/tests/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,7 @@ py_test_module_list(
"test_basic_2.py",
"test_basic_3.py",
"test_asyncio.py",
"test_multiprocessing.py",
],
size = "medium",
extra_srcs = SRCS,
Expand Down
14 changes: 7 additions & 7 deletions python/ray/tests/test_multiprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def check_pool_size(pool, size):
# Check that starting a pool starts ray if not initialized.
pool = Pool(processes=2)
assert ray.is_initialized()
assert int(ray.state.cluster_resources()["CPU"]) == 2
assert int(ray.cluster_resources()["CPU"]) == 2
check_pool_size(pool, 2)
pool.terminate()
pool.join()
Expand All @@ -58,7 +58,7 @@ def check_pool_size(pool, size):
ray.init(num_cpus=3)
assert ray.is_initialized()
pool = Pool(processes=2)
assert int(ray.state.cluster_resources()["CPU"]) == 3
assert int(ray.cluster_resources()["CPU"]) == 3
check_pool_size(pool, 2)
pool.terminate()
pool.join()
Expand All @@ -70,7 +70,7 @@ def check_pool_size(pool, size):
assert ray.is_initialized()
with pytest.raises(ValueError):
Pool(processes=2)
assert int(ray.state.cluster_resources()["CPU"]) == 1
assert int(ray.cluster_resources()["CPU"]) == 1
ray.shutdown()


Expand Down Expand Up @@ -98,7 +98,7 @@ def check_pool_size(pool, size):
# Check that starting a pool still starts ray if RAY_ADDRESS not set.
pool = Pool(processes=init_cpus)
assert ray.is_initialized()
assert int(ray.state.cluster_resources()["CPU"]) == init_cpus
assert int(ray.cluster_resources()["CPU"]) == init_cpus
check_pool_size(pool, init_cpus)
pool.terminate()
pool.join()
Expand All @@ -108,7 +108,7 @@ def check_pool_size(pool, size):
# ray_address is passed in.
pool = Pool(ray_address=address)
assert ray.is_initialized()
assert int(ray.state.cluster_resources()["CPU"]) == start_cpus
assert int(ray.cluster_resources()["CPU"]) == start_cpus
check_pool_size(pool, start_cpus)
pool.terminate()
pool.join()
Expand All @@ -121,7 +121,7 @@ def check_pool_size(pool, size):
# RAY_ADDRESS is set.
pool = Pool()
assert ray.is_initialized()
assert int(ray.state.cluster_resources()["CPU"]) == start_cpus
assert int(ray.cluster_resources()["CPU"]) == start_cpus
check_pool_size(pool, start_cpus)
pool.terminate()
pool.join()
Expand All @@ -131,7 +131,7 @@ def check_pool_size(pool, size):
# error if there aren't enough CPUs for the number of processes.
with pytest.raises(Exception):
Pool(processes=start_cpus + 1)
assert int(ray.state.cluster_resources()["CPU"]) == start_cpus
assert int(ray.cluster_resources()["CPU"]) == start_cpus
ray.shutdown()


Expand Down
8 changes: 6 additions & 2 deletions python/ray/util/client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,10 +143,14 @@ def remote(self, *args, **kwargs):
return self.api.remote(*args, **kwargs)

def __getattr__(self, key: str):
if not self.is_connected():
if self.is_connected():
return getattr(self.api, key)
elif key in ["is_initialized", "_internal_kv_initialized"]:
# Client is not connected, thus Ray is not considered initialized.
return lambda: False
else:
raise Exception("Ray Client is not connected. "
"Please connect by calling `ray.connect`.")
return getattr(self.api, key)

def is_connected(self) -> bool:
if self.client_worker is None:
Expand Down

0 comments on commit c6497c6

Please sign in to comment.