Skip to content

Commit

Permalink
Checkpoint in my increasingly frustrated efforts to debug this Broken…
Browse files Browse the repository at this point in the history
…PoolError
  • Loading branch information
andyljones committed Mar 2, 2021
1 parent adcc336 commit 07a0d4a
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 11 deletions.
76 changes: 66 additions & 10 deletions boardlaw/arena/multi.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@
import pandas as pd
import numpy as np
import torch
from rebar import arrdict, dotdict
from rebar import arrdict, dotdict, parallel
from logging import getLogger
from itertools import combinations
import rebar
from multiprocessing import set_start_method

log = getLogger(__name__)

Expand Down Expand Up @@ -184,25 +186,57 @@ def step(self):
results = self.record(transitions, live, start, end)
return results

def evaluate(worldfunc, agentfunc, games, n_envs_per, chunksize=64):
assert games.index == games.columns
def _evaluate(worldfunc, agentfunc, subgames, n_envs_per):
    """Run a single sub-matrix of games to completion and return its results.

    One agent is built per name in `subgames.index` by calling `agentfunc(name)`. A
    `ChunkEvaluator` is then stepped until it reports that it has finished, and everything
    its steps produced is returned as one flat list.
    """
    agents = {}
    for name in subgames.index:
        agents[name] = agentfunc(name)

    evaluator = ChunkEvaluator(worldfunc, agents, subgames, n_envs_per=n_envs_per)

    collected = []
    while True:
        if evaluator.finished():
            return collected
        collected.extend(evaluator.step())

def worldfunc(n_envs):
    """Call `common.worlds` for a hard-coded run, asking for `n_envs` envs on the 'cuda' device."""
    # The import happens at call time rather than at module import time.
    from . import common

    run_name = '2021-02-26 16-57-51 angry-depth'
    return common.worlds(run_name, n_envs, device='cuda')

def agentfunc(name):
    """Call `common.agent` on the 'cuda' device for the `(run, idx)` pair given as `name`."""
    # The import happens at call time rather than at module import time.
    from . import common

    run_name, snapshot_idx = name
    return common.agent(run_name, snapshot_idx, device='cuda')

def evaluate(worldfunc, agentfunc, games, n_envs_per=512, chunksize=64, n_workers=4):
    """Evaluate a square games matrix by splitting it into chunk-sized jobs run in worker processes.

    The names in `games.index` are cut into consecutive chunks of at most `chunksize` names.
    Two kinds of job are generated:

    * "diagonal" pieces: one per chunk, covering `games.loc[chunk, chunk]`;
    * "skew" pieces: one per unordered pair of chunks, covering both chunks together, with the
      two within-chunk sub-blocks overwritten by `n_envs_per`.
      NOTE(review): presumably that marks the within-chunk matchups as already played, since
      the diagonal pieces cover them - confirm against ChunkEvaluator.

    Each job is handed to `_evaluate` through `parallel.parallel` with a process executor.

    Args:
        worldfunc: forwarded to `_evaluate` in the workers.
        agentfunc: forwarded to `_evaluate` in the workers.
            NOTE(review): both callables cross a process boundary, so they presumably need to
            be picklable (i.e. module-level, not closures) - confirm against rebar.parallel.
        games: square DataFrame whose index and columns hold the same names in the same order.
        n_envs_per: forwarded to `_evaluate`, and written into the within-chunk sub-blocks of
            the skew pieces.
        chunksize: maximum number of names per chunk; must be at least 1.
        n_workers: passed to `parallel.parallel` as `N`.

    Returns:
        A flat list holding the results of every job.

    Raises:
        AssertionError: if the index and columns of `games` differ.
        ValueError: if `chunksize` is less than 1.
    """
    assert list(games.index) == list(games.columns), 'games must have identical index and columns'
    if chunksize < 1:
        # A zero step makes range() raise and a negative one silently yields no chunks at all.
        raise ValueError(f'chunksize must be at least 1, got {chunksize}')

    names = list(games.index)
    chunks = [names[i:i+chunksize] for i in range(0, len(names), chunksize)]

    # Sub-matrices to evaluate, keyed by the pair of chunk indices they cover.
    jobs = {}

    # Diagonal pieces
    for i, chunk in enumerate(chunks):
        jobs[i, i] = games.loc[chunk, chunk]

    log.info('Generated diagonal pieces')

    # Skew pieces
    for (i, first), (j, second) in combinations(enumerate(chunks), 2):
        combined = first + second
        # Copy so that the overwrites below don't touch the caller's `games`.
        subgames = games.loc[combined, combined].copy()
        subgames.loc[first, first] = n_envs_per
        subgames.loc[second, second] = n_envs_per
        jobs[i, j] = subgames

    log.info('Generated skew pieces')

    results = []
    # force=True replaces any start method that has already been set for this interpreter.
    # NOTE(review): presumably 'spawn' is needed because the workers use CUDA - confirm.
    set_start_method('spawn', True)
    with parallel.parallel(_evaluate, N=n_workers, executor='process') as pool:
        futures = {k: pool(worldfunc, agentfunc, subgames, n_envs_per) for k, subgames in jobs.items()}
        for k, rs in pool.wait(futures).items():
            results.extend(rs)
            log.info('Collected job %s; %d results so far', k, len(results))

    return results

class MockAgent:

Expand Down Expand Up @@ -302,4 +336,26 @@ def test_chunk_evaluator():
results.extend(evaluator.step())

display.clear_output(wait=True)
evaluator.report()
evaluator.report()

def test_evaluator():
    """Smoke-test `evaluate` on a 12x12 games matrix built from the 'cat/nodes' runs.

    Every (run, snapshot) pair from those runs becomes a name; the first twelve are used to
    build an all-zero games matrix, which is evaluated in chunks of four.
    """
    from functools import partial
    from pavlov import runs, storage
    from boardlaw.arena import common

    df = runs.pandas(description='cat/nodes')

    # `evaluate` ships its callables to worker processes. Functions defined inside this test
    # are local objects that the standard pickler refuses to serialise, so use a partial over
    # `common.worlds` and the module-level `agentfunc` instead.
    # NOTE(review): this assumes `common.worlds` is itself a module-level function - confirm.
    make_worlds = partial(common.worlds, df.index[0], device='cuda')

    names = [(r, i) for r in df.index for i in storage.snapshots(r)]

    games = pd.DataFrame(0, names, names).iloc[:12, :12]
    evaluate(make_worlds, agentfunc, games, chunksize=4)
3 changes: 2 additions & 1 deletion rebar/parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def _device_init(i):
os.environ['CUDA_VISIBLE_DEVICES'] = str(device)

def _adjust_process_count(self):
assert self._init_args == (), 'Device executor doesn\'t currently support custom initializers
assert self._initargs == (), 'Device executor doesn\'t currently support custom initializers'
from concurrent.futures.process import _process_worker
for i in range(len(self._processes), self._max_workers):
p = self._mp_context.Process(
Expand All @@ -55,6 +55,7 @@ def VariableExecutor(N=None, executor='process', **kwargs):
executor = 'serial'

executors = {
'serial': SerialExecutor,
'process': ProcessPoolExecutor,
'thread': ThreadPoolExecutor,
'cuda': CUDAPoolExecutor}
Expand Down

0 comments on commit 07a0d4a

Please sign in to comment.