forked from yoheinakajima/babyagi
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathray_tasks.py
69 lines (51 loc) · 1.94 KB
/
ray_tasks.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
import sys
import logging
import ray
from collections import deque
from typing import Dict, List
from pathlib import Path
sys.path.append(str(Path(__file__).resolve().parent.parent))
from extensions.ray_objectives import CooperativeObjectivesListStorage
try:
ray.init(address="auto", namespace="babyagi", logging_level=logging.FATAL, ignore_reinit_error=True)
except:
ray.init(namespace="babyagi", logging_level=logging.FATAL, ignore_reinit_error=True)
@ray.remote
class CooperativeTaskListStorageActor:
def __init__(self):
self.tasks = deque([])
self.task_id_counter = 0
def append(self, task: Dict):
self.tasks.append(task)
def replace(self, tasks: List[Dict]):
self.tasks = deque(tasks)
def popleft(self):
return self.tasks.popleft()
def is_empty(self):
return False if self.tasks else True
def next_task_id(self):
self.task_id_counter += 1
return self.task_id_counter
def get_task_names(self):
return [t["task_name"] for t in self.tasks]
class CooperativeTaskListStorage:
def __init__(self, name: str):
self.name = name
try:
self.actor = ray.get_actor(name=self.name, namespace="babyagi")
except ValueError:
self.actor = CooperativeTaskListStorageActor.options(name=self.name, namespace="babyagi", lifetime="detached").remote()
objectives = CooperativeObjectivesListStorage()
objectives.append(self.name)
def append(self, task: Dict):
self.actor.append.remote(task)
def replace(self, tasks: List[Dict]):
self.actor.replace.remote(tasks)
def popleft(self):
return ray.get(self.actor.popleft.remote())
def is_empty(self):
return ray.get(self.actor.is_empty.remote())
def next_task_id(self):
return ray.get(self.actor.next_task_id.remote())
def get_task_names(self):
return ray.get(self.actor.get_task_names.remote())