From 623efffb3453d70a14a3fa58dbed51f84aa8263f Mon Sep 17 00:00:00 2001 From: Davit Buniatyan Date: Fri, 13 Nov 2020 22:40:39 -0800 Subject: [PATCH] pathos fixed --- hub/compute/pathos.py | 7 +++--- hub/compute/ray.py | 16 +++++++++++--- hub/compute/tests/test_pipeline.py | 34 ++++++++++++++++++++++++++---- hub/utils.py | 13 ++++++++++++ 4 files changed, 60 insertions(+), 10 deletions(-) diff --git a/hub/compute/pathos.py b/hub/compute/pathos.py index 1f5f09beb0..4fb8719d4f 100644 --- a/hub/compute/pathos.py +++ b/hub/compute/pathos.py @@ -3,8 +3,7 @@ from hub.compute.transform import Transform try: - from pathos.pools import ProcessPool - from pathos.pools import ThreadPool + from pathos.pools import ProcessPool, ThreadPool except Exception: pass @@ -12,7 +11,8 @@ class PathosTransform(Transform): def __init__(self, func, schema, ds): Transform.__init__(self, func, schema, ds) - self.map = ThreadPool(nodes=8).map + Pool = ProcessPool or ThreadPool + self.map = ThreadPool(nodes=2).map def store(self, url, token=None): """ @@ -30,6 +30,7 @@ def batchify(ds): def batched_func(i_xs): i, xs = i_xs + print(i) xs = [self._func(x) for x in xs] self._transfer_batch(ds, i, xs) diff --git a/hub/compute/ray.py b/hub/compute/ray.py index 962cbdd8b7..684c32aad6 100644 --- a/hub/compute/ray.py +++ b/hub/compute/ray.py @@ -4,13 +4,23 @@ try: import ray + remote = ray.remote except Exception: - ray = None + def remote(template, **kwargs): + """ + remote template + """ + def wrapper(func): + def inner(**kwargs): + return func + return inner + + return wrapper class RayTransform(Transform): - @ray.remote + @remote def _transfer_batch(self, ds, i, results): for j, result in enumerate(results[0]): for key in result: @@ -29,7 +39,7 @@ def store_chunkwise(self, url, token=None): # Chunkwise compute batch_size = ds.chunksize - @ray.remote(num_returns=int(len(ds) / batch_size)) + @remote(num_returns=int(len(ds) / batch_size)) def batchify(results): return tuple(batch(results, batch_size)) diff --git a/hub/compute/tests/test_pipeline.py b/hub/compute/tests/test_pipeline.py index 5c7d09c571..f67dd755a2 100644 --- a/hub/compute/tests/test_pipeline.py +++ b/hub/compute/tests/test_pipeline.py @@ -1,10 +1,10 @@ -import pytest import numpy as np +import pytest import zarr import hub from hub.features import Tensor -from hub.utils import ray_loaded, Timer +from hub.utils import ray_loaded, pathos_loaded, Timer my_schema = { "image": Tensor((28, 28, 4), "int32", (28, 28, 4)), @@ -44,6 +44,31 @@ def test_pipeline_ray(): pass +@pytest.mark.skipif( + not pathos_loaded(), + reason="requires pathos to be loaded", +) +def test_pathos(sample_size=100, width=100, channels=4, dtype="uint8"): + + my_schema = { + "image": Tensor((width, width, channels), dtype, (width, width, channels), chunks=(sample_size // 20, width, width, channels)), + } + + with Timer("pathos"): + @hub.transform(schema=my_schema, scheduler="pathos", processes=1) + def my_transform(x): + return { + "image": (np.ones((width, width, channels), dtype=dtype) * 255), + } + + ds = hub.Dataset( + "./data/test/test_pipeline_basic_3", mode="w", shape=(sample_size,), schema=my_schema, cache=0 + ) + + ds_t = my_transform(ds).store("./data/test/test_pipeline_basic_4") + + assert (ds_t["image", :].numpy() == 255).all() + def benchmark(sample_size=100, width=1000, channels=4, dtype="int8"): numpy_arr = np.zeros((sample_size, width, width, channels), dtype=dtype) zarr_fs = zarr.zeros((sample_size, width, width, channels), dtype=dtype, store=zarr.storage.FSStore("./data/test/array"), overwrite=True) @@ -70,7 +95,7 @@ def benchmark(sample_size=100, width=1000, channels=4, dtype="int8"): arr[i] = (np.random.rand(width, width, channels) * 255).astype(dtype) print(f"~~~ Pipeline {sample_size}x{width}x{width}x{channels} random arrays ~~~") - for name, processes in [("single", 1), ("pathos", 10)]: # , ("ray", 10), ("green", 10), ("dask", 10)]: + for name, processes in [("single", 1), ("pathos", 10)]: # , ("ray", 10), ("green", 10), ("dask", 10)]: @hub.transform(schema=my_schema, scheduler=name, processes=processes) def my_transform(sample): return { @@ -83,4 +108,5 @@ def my_transform(sample): if __name__ == "__main__": # test_pipeline_basic() - benchmark() \ No newline at end of file + test_pathos() + # benchmark() \ No newline at end of file diff --git a/hub/utils.py b/hub/utils.py index b4032e80b9..a7b03c4f76 100644 --- a/hub/utils.py +++ b/hub/utils.py @@ -46,6 +46,7 @@ def pytorch_loaded(): return False return True + def ray_loaded(): try: import ray @@ -55,6 +56,7 @@ def ray_loaded(): return False return True + def dask_loaded(): try: import ray @@ -64,6 +66,7 @@ def dask_loaded(): return False return True + def tensorflow_loaded(): try: import tensorflow @@ -84,6 +87,16 @@ def tfds_loaded(): return True +def pathos_loaded(): + try: + import pathos + + pathos.__version__ + except ImportError: + return False + return True + + def compute_lcm(a): """ Lowest Common Multiple of a list a