Skip to content

Commit

Permalink
pathos fixed
Browse files Browse the repository at this point in the history
  • Loading branch information
davidbuniat committed Nov 14, 2020
1 parent 999ec85 commit 623efff
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 10 deletions.
7 changes: 4 additions & 3 deletions hub/compute/pathos.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,16 @@
from hub.compute.transform import Transform

try:
from pathos.pools import ProcessPool
from pathos.pools import ThreadPool
from pathos.pools import ProcessPool, ThreadPool
except Exception:
pass


class PathosTransform(Transform):
def __init__(self, func, schema, ds):
Transform.__init__(self, func, schema, ds)
self.map = ThreadPool(nodes=8).map
Pool = ProcessPool or ThreadPool
self.map = ThreadPool(nodes=2).map

def store(self, url, token=None):
"""
Expand All @@ -30,6 +30,7 @@ def batchify(ds):

def batched_func(i_xs):
i, xs = i_xs
print(i)
xs = [self._func(x) for x in xs]
self._transfer_batch(ds, i, xs)

Expand Down
16 changes: 13 additions & 3 deletions hub/compute/ray.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,23 @@

try:
import ray
remote = ray.remote
except Exception:
ray = None
def remote(template, **kwargs):
"""
remote template
"""
def wrapper(func):
def inner(**kwargs):
return func
return inner

return wrapper


class RayTransform(Transform):

@ray.remote
@remote
def _transfer_batch(self, ds, i, results):
for j, result in enumerate(results[0]):
for key in result:
Expand All @@ -29,7 +39,7 @@ def store_chunkwise(self, url, token=None):
# Chunkwise compute
batch_size = ds.chunksize

@ray.remote(num_returns=int(len(ds) / batch_size))
@remote(num_returns=int(len(ds) / batch_size))
def batchify(results):
return tuple(batch(results, batch_size))

Expand Down
34 changes: 30 additions & 4 deletions hub/compute/tests/test_pipeline.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import pytest
import numpy as np
import pytest
import zarr

import hub
from hub.features import Tensor
from hub.utils import ray_loaded, Timer
from hub.utils import ray_loaded, pathos_loaded, Timer

my_schema = {
"image": Tensor((28, 28, 4), "int32", (28, 28, 4)),
Expand Down Expand Up @@ -44,6 +44,31 @@ def test_pipeline_ray():
pass


@pytest.mark.skipif(
not pathos_loaded(),
reason="requires pathos to be loaded",
)
def test_pathos(sample_size=100, width=100, channels=4, dtype="uint8"):

my_schema = {
"image": Tensor((width, width, channels), dtype, (width, width, channels), chunks=(sample_size // 20, width, width, channels)),
}

with Timer("pathos"):
@hub.transform(schema=my_schema, scheduler="pathos", processes=1)
def my_transform(x):
return {
"image": (np.ones((width, width, channels), dtype=dtype) * 255),
}

ds = hub.Dataset(
"./data/test/test_pipeline_basic_3", mode="w", shape=(sample_size,), schema=my_schema, cache=0
)

ds_t = my_transform(ds).store("./data/test/test_pipeline_basic_4")

assert (ds_t["image", :].numpy() == 255).all()

def benchmark(sample_size=100, width=1000, channels=4, dtype="int8"):
numpy_arr = np.zeros((sample_size, width, width, channels), dtype=dtype)
zarr_fs = zarr.zeros((sample_size, width, width, channels), dtype=dtype, store=zarr.storage.FSStore("./data/test/array"), overwrite=True)
Expand All @@ -70,7 +95,7 @@ def benchmark(sample_size=100, width=1000, channels=4, dtype="int8"):
arr[i] = (np.random.rand(width, width, channels) * 255).astype(dtype)

print(f"~~~ Pipeline {sample_size}x{width}x{width}x{channels} random arrays ~~~")
for name, processes in [("single", 1), ("pathos", 10)]: # , ("ray", 10), ("green", 10), ("dask", 10)]:
for name, processes in [("single", 1), ("pathos", 10)]: # , ("ray", 10), ("green", 10), ("dask", 10)]:
@hub.transform(schema=my_schema, scheduler=name, processes=processes)
def my_transform(sample):
return {
Expand All @@ -83,4 +108,5 @@ def my_transform(sample):

if __name__ == "__main__":
# test_pipeline_basic()
benchmark()
test_pathos()
# benchmark()
13 changes: 13 additions & 0 deletions hub/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ def pytorch_loaded():
return False
return True


def ray_loaded():
try:
import ray
Expand All @@ -55,6 +56,7 @@ def ray_loaded():
return False
return True


def dask_loaded():
try:
import ray
Expand All @@ -64,6 +66,7 @@ def dask_loaded():
return False
return True


def tensorflow_loaded():
try:
import tensorflow
Expand All @@ -84,6 +87,16 @@ def tfds_loaded():
return True


def pathos_loaded():
try:
import pathos

pathos.__version__
except ImportError:
return False
return True


def compute_lcm(a):
"""
Lowest Common Multiple of a list a
Expand Down

0 comments on commit 623efff

Please sign in to comment.