Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ActorGroup] Add ActorGroup #18960

Merged
merged 18 commits into from
Oct 22, 2021
Prev Previous commit
Next Next commit
address comments
amogkam committed Oct 21, 2021
commit adb0d99aaa15c2b48792d0d42766fad52cdea683
99 changes: 67 additions & 32 deletions python/ray/util/actor_group.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import socket
import weakref
from dataclasses import dataclass
import logging
from typing import List, TypeVar, Optional, Dict, Type, Tuple
@@ -13,14 +13,38 @@


@dataclass
class Worker:
class ActorWrapper:
"""Class containing an actor and its metadata."""
actor: ActorHandle
metadata: ActorMetadata


@dataclass
class ActorConfig:
pass
num_cpus: float
num_gpus: float
resources: Optional[Dict[str, float]]
init_args: Tuple
init_kwargs: Dict


class ActorGroupMethod:
def __init__(self, actor_group: "ActorGroup", method_name: str):
self.actor_group = weakref.ref(actor_group)
self._method_name = method_name

def __call__(self, *args, **kwargs):
raise TypeError("ActorGroup methods cannot be called directly. "
"Instead "
f"of running 'object.{self._method_name}()', try "
f"'object.{self._method_name}.remote()'.")

def remote(self, *args, **kwargs):
return [
getattr(a.actor, self._method_name).remote(*args, **kwargs)
for a in self.actor_group.actors
]


class ActorGroup:
"""Group of Ray Actors that can execute arbitrary functions.
@@ -44,7 +68,7 @@ class ActorGroup:
Dictionary specifying the resources that will be
requested for each actor in addition to ``num_cpus_per_actor``
and ``num_gpus_per_actor``.
remote_cls_args, remote_cls_kwargs: If ``remote_cls`` is provided,
init_args, init_kwargs: If ``remote_cls`` is provided,
these args will be used for the actor initialization.


@@ -55,15 +79,14 @@ class ActorGroup:

"""

def __init__(
self,
actor_cls: Type,
num_actors: int = 1,
num_cpus_per_actor: float = 1,
num_gpus_per_actor: float = 0,
resources_per_actor: Optional[Dict[str, float]] = None,
actor_cls_args: Optional[Tuple] = None,
actor_cls_kwargs: Optional[Dict] = None):
def __init__(self,
actor_cls: Type,
num_actors: int = 1,
num_cpus_per_actor: float = 1,
num_gpus_per_actor: float = 0,
resources_per_actor: Optional[Dict[str, float]] = None,
init_args: Optional[Tuple] = None,
init_kwargs: Optional[Dict] = None):

if num_actors <= 0:
raise ValueError("The provided `num_actors` must be greater "
@@ -78,20 +101,31 @@ def __init__(
self.actors = []

self.num_actors = num_actors
self.num_cpus_per_actor = num_cpus_per_actor
self.num_gpus_per_actor = num_gpus_per_actor
self.additional_resources_per_actor = resources_per_actor

self._actor_cls_args = actor_cls_args or []
self._actor_cls_kwargs = actor_cls_kwargs or {}
self.actor_config = ActorConfig(
num_cpus=num_cpus_per_actor,
num_gpus=num_gpus_per_actor,
resources=resources_per_actor,
init_args=init_args or (),
init_kwargs=init_kwargs or {})

#TODO: make into dataclass.
self._remote_cls = ray.remote(
num_cpus=self.num_cpus_per_actor,
num_gpus=self.num_gpus_per_actor,
resources=self.additional_resources_per_actor)(actor_cls)
num_cpus=self.actor_config.num_cpus,
num_gpus=self.actor_config.num_gpus,
resources=self.actor_config.resources)(actor_cls)

self.start()

def __getattr__(self, item):
# Same implementation as actor.py
return ActorGroupMethod(self, item)

def __len__(self):
return len(self.actors)

def __getitem__(self, item):
return self.actors[item]

def start(self):
"""Starts all the actors in this actor group."""
if self.actors and len(self.actors) > 0:
@@ -154,21 +188,22 @@ def add_actors(self, num_actors: int):
new_actors = []
new_actor_metadata = []
for _ in range(num_actors):
actor = self._remote_cls.remote(*self._actor_cls_args,
**self._actor_cls_kwargs)
actor = self._remote_cls.remote(*self.actor_config.init_args,
**self.actor_config.init_kwargs)
new_actors.append(actor)
new_actor_metadata.append(
actor._BaseactorMixin__execute.remote(construct_metadata))
if hasattr(actor, "get_actor_metadata"):
new_actor_metadata.append(actor.get_actor_metadata.remote())

# Get metadata from all actors.
metadata = ray.get(new_actor_metadata)

if len(metadata) == 0:
metadata = [None] * len(new_actors)

for i in range(len(new_actors)):
self.actors.append(
Actor(actor=new_actors[i], metadata=metadata[i]))
ActorWrapper(actor=new_actors[i], metadata=metadata[i]))

def __len__(self):
return len(self.actors)

def __getitem__(self, item):
return self.actors[item]
@property
def actor_metadata(self):
return [a.metadata for a in self.actors]