Skip to content

Commit

Permalink
clean up parallel scanner, import codeutil.py directly from ipyparallel
Browse files Browse the repository at this point in the history
  • Loading branch information
jmrohwer committed Jul 8, 2021
1 parent 94e4734 commit 6503861
Show file tree
Hide file tree
Showing 9 changed files with 104 additions and 142 deletions.
58 changes: 27 additions & 31 deletions pysces/PyscesParScan.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,22 +31,24 @@
import numpy as np
from pysces.PyscesUtils import TimerBox
from pysces.PyscesScan import Scanner
import multiprocessing

import sys, os, pickle

flush = sys.stdout.flush
from time import sleep, time
import subprocess

# this is a cooler way of doing this
# acutally don't need the _multicorescan import
# import _multicorescan
# del _multicorescan
# print "__file__ is", __file__
try:
import ipyparallel
except ImportError as ex:
print()
print(ex)
raise ImportError(
'PARSCANNER: Requires ipyparallel version >=4.0 (http://ipython.org).'
)

MULTISCANFILE = __file__.replace(
'PyscesParScan', '_multicorescan'
) # .replace('.pyc','.py')
)
# print 'MULTISCANFILE is', MULTISCANFILE
__psyco_active__ = 0

Expand Down Expand Up @@ -87,13 +89,7 @@ def __init__(self, mod, engine='multiproc'):
print('parallel engine: multiproc')
elif engine == 'ipcluster':
print('parallel engine: ipcluster')
try:
from ipyparallel import Client
except ImportError as ex:
print('\n', ex)
raise ImportError(
'PARSCANNER: Requires IPython and ipyparallel version >=4.0 (http://ipython.org) and 0MQ (http://zero.mq).'
)
from ipyparallel import Client
try:
rc = Client()
self.rc = rc
Expand All @@ -109,6 +105,8 @@ def __init__(self, mod, engine='multiproc'):
dv.execute('from pysces.PyscesParScan import Analyze, setModValue')
else:
raise UserWarning(engine + " is not a valid parallel engine!")

from ipyparallel.serialize import codeutil
self.GenDict = {}
self.GenOrder = []
self.ScanSpace = []
Expand Down Expand Up @@ -179,26 +177,24 @@ def Run(self, ReRun=False):
arl = [] # asynchronous results list
if self.engine == 'multiproc':
fN = str(time()).split('.')[0]
F = open(fN, 'wb')
pickle.dump(
(
self.mod,
self.ScanPartition,
self.SeqPartition,
self.GenOrder,
self.UserOutputList,
),
F,
protocol=-1,
)
F.close()
with open(fN, 'wb') as F:
pickle.dump(
(
self.mod,
self.ScanPartition,
self.SeqPartition,
self.GenOrder,
self.UserOutputList,
),
F,
protocol=-1,
)
fN = os.path.abspath(fN)
print("Preparation completed:", next(self.scanT.PREP))
self.scanT.normal_timer('RUN')
subprocess.call([sys.executable, MULTISCANFILE, self._MODE_, fN])
F = open(fN, 'rb')
res_list = pickle.load(F)
F.close()
with open(fN, 'rb') as F:
res_list = pickle.load(F)
os.remove(fN)
for result in res_list:
self.StoreData(result)
Expand Down
1 change: 0 additions & 1 deletion pysces/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
from . import PyscesConfig
from . import PyscesParse
from . import PyscesLink as link
from . import codeutil
from . import PyscesSED as SED

from .PyscesUtils import str2bool
Expand Down
16 changes: 7 additions & 9 deletions pysces/_multicorescan.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@

import sys, pickle
import multiprocessing
from pysces.PyscesParScan import Analyze, setModValue
from pysces.PyscesUtils import TimerBox
from pysces.PyscesParScan import Analyze
from time import sleep


Expand All @@ -34,9 +33,8 @@
pool = multiprocessing.Pool()

# load stuff from the pickle
F = open(sys.argv[2], 'rb')
mod, scanpartition, seqpartition, genorder, useroutputlist = pickle.load(F)
F.close()
with open(sys.argv[2], 'rb') as F:
mod, scanpartition, seqpartition, genorder, useroutputlist = pickle.load(F)

mod.SetQuiet() # kill verbose output during scan
# append tasks to asynchronous results list
Expand Down Expand Up @@ -70,8 +68,8 @@
res_list = []
for ar in arl:
res_list.append(ar.get())
pool.close()
# pickle results_list
F = open(sys.argv[2], 'wb')
pickle.dump(res_list, F, protocol=-1)
F.flush()
F.close()
with open(sys.argv[2], 'wb') as F:
pickle.dump(res_list, F, protocol=-1)
F.flush()
51 changes: 0 additions & 51 deletions pysces/codeutil.py

This file was deleted.

92 changes: 56 additions & 36 deletions pysces/examples/benchmark.ipy
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import pysces

from timeit import default_timer as clock

from IPython.parallel import Client
from ipyparallel import Client
rc=Client()
dv=rc[:]

Expand All @@ -26,73 +26,93 @@ m.SetQuiet()
%px m.scan_out=['J_R1', 'ccJR1_R4']
%px m.SetQuiet()

scan_range=np.logspace(0.5,1.2,2000)
scan_range=np.logspace(0.5,1.2,5000)

print "PySCeS parallel scanning benchmark\n==================================\n"
print("PySCeS parallel scanning benchmark\n==================================\n")

print "Serial execution with Scan1..."
print("\nSerial execution with Scan1...")
print("------------------------------")
t1=clock()
m.Scan1(scan_range)
ser_res = m.scan_res
t2=clock()
print "Time taken with Scan1:", t2-t1, "s"
print "States per second: ", len(scan_range)/(t2-t1)
print("Time taken with Scan1:", t2-t1, "s")
print("States per second: ", len(scan_range)/(t2-t1))

print "Using serial scanner class with Run..."
print("\nUsing serial scanner class with Run...")
print("--------------------------------------")
serrun=pysces.Scanner(m)
serrun.quietRun=True
serrun.addScanParameter('V4',10**0.5,10**1.2,2000,log=True)
serrun.addScanParameter('V4',10**0.5,10**1.2,5000,log=True)
serrun.addUserOutput('J_R1', 'ccJR1_R4')
t5=clock()
serrun.Run()
t6=clock()
print "Time taken with Scanner class:", t6-t5, "s"
print "States per second: ", len(scan_range)/(t6-t5)
print("Time taken with Scanner class:", t6-t5, "s")
print("States per second: ", len(scan_range)/(t6-t5))


print "Parallel execution direct code with scatter..."
print("\nParallel execution direct code with scatter...")
print("----------------------------------------------")
t3=clock()
dv.scatter('scan_range', np.logspace(0.5,1.2,2000))
dv.scatter('scan_range', np.logspace(0.5,1.2,5000))
%px m.Scan1(scan_range)
%px y=m.scan_res
yloc=dv.gather('y')
par_res = yloc.get()
t4=clock()
print "Time taken for direct parallel execution:", t4-t3, "s"
print "States per second: ", len(scan_range)/(t4-t3)
print "Number of engines:", len(dv)
print "Speedup:", (t2-t1)/(t4-t3)

print "Using parallel scanner class with RunScatter..."
parsct=pysces.ParScanner(m)
print("Time taken for direct parallel execution:", t4-t3, "s")
print("States per second: ", len(scan_range)/(t4-t3))
print("Number of engines:", len(dv))
print("Speedup:", (t2-t1)/(t4-t3))

print("\nUsing parallel scanner class with RunScatter...")
print("-----------------------------------------------")
parsct=pysces.ParScanner(m, engine='ipcluster')
parsct.quietRun = True
parsct.addScanParameter('V4',10**0.5,10**1.2,2000,log=True)
parsct.addScanParameter('V4',10**0.5,10**1.2,5000,log=True)
parsct.addUserOutput('J_R1', 'ccJR1_R4')
t7=clock()
parsct.RunScatter()
t8=clock()
print "Time taken with RunScatter for ParScanner class:", t8-t7, "s"
print "States per second: ", len(scan_range)/(t8-t7)
print "Speedup:", (t2-t1)/(t8-t7)
print("Time taken with RunScatter for ParScanner class:", t8-t7, "s")
print("States per second: ", len(scan_range)/(t8-t7))
print("Speedup:", (t2-t1)/(t8-t7))

print "Using parallel scanner class with Run..."
parrun=pysces.ParScanner(m)
print("\nUsing parallel scanner class with Run...(ipcluster)")
print("---------------------------------------------------")
parrun=pysces.ParScanner(m, engine='ipcluster')
parrun.quietRun = True
parrun.scans_per_run = 100
parrun.addScanParameter('V4',10**0.5,10**1.2,2000,log=True)
parrun.addScanParameter('V4',10**0.5,10**1.2,5000,log=True)
parrun.addUserOutput('J_R1', 'ccJR1_R4')
t9=clock()
parrun.Run()
t10=clock()
print "Time taken with Run for ParScanner class:", t10-t9, "s"
print "States per second: ", len(scan_range)/(t10-t9)
print "Speedup:", (t2-t1)/(t10-t9)


print "Checking results:"
print "serial vs parallel: ", np.allclose(ser_res,par_res)
print "serial Scan1 vs scanner class: ", np.allclose(ser_res,np.hstack((serrun.ScanSpace, serrun.UserOutputResults)))
print "parallel direct code vs. RunScatter: ", np.allclose(ser_res,np.hstack((parsct.ScanSpace, parsct.UserOutputResults)))
print "parallel direct code vs. Run (lv) : ", np.allclose(ser_res,np.hstack((parrun.ScanSpace, parrun.UserOutputResults)))
print("Time taken with Run for ParScanner class:", t10-t9, "s")
print("States per second: ", len(scan_range)/(t10-t9))
print("Speedup:", (t2-t1)/(t10-t9))

print("\nUsing parallel scanner class with Run...(multiprocessing)")
print("---------------------------------------------------------")
parrunmp=pysces.ParScanner(m)
parrunmp.quietRun = True
parrunmp.scans_per_run = 100
parrunmp.addScanParameter('V4',10**0.5,10**1.2,5000,log=True)
parrunmp.addUserOutput('J_R1', 'ccJR1_R4')
t11=clock()
parrunmp.Run()
t12=clock()
print("Time taken with Run for ParScanner class:", t12-t11, "s")
print("States per second: ", len(scan_range)/(t12-t11))
print("Speedup:", (t2-t1)/(t12-t11))

print("\nChecking results:")
print("-----------------")
print("serial vs parallel: ", np.allclose(ser_res,par_res))
print("serial Scan1 vs scanner class: ", np.allclose(ser_res,np.hstack((serrun.ScanSpace, serrun.UserOutputResults))))
print("parallel direct code vs. RunScatter: ", np.allclose(par_res,np.hstack((parsct.ScanSpace, parsct.UserOutputResults))))
print("parallel direct code vs. Run (ipc) : ", np.allclose(par_res,np.hstack((parrun.ScanSpace, parrun.UserOutputResults))))
print("parallel direct code vs. Run (mp) : ", np.allclose(par_res,np.hstack((parrunmp.ScanSpace, parrunmp.UserOutputResults))))

os.chdir(curDir)
20 changes: 10 additions & 10 deletions pysces/examples/parallelscan.ipy
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import pysces

from timeit import default_timer as clock

from IPython.parallel import Client
from ipyparallel import Client
rc=Client()
dv=rc[:]

Expand All @@ -31,27 +31,27 @@ with dv.sync_imports():

scan_range=np.logspace(0.5,1.2,1000)

print "Serial execution with Scan1..."
print("Serial execution with Scan1...")
t1=clock()
m.Scan1(scan_range)
ser_res = m.scan_res
t2=clock()
print "Time taken with Scan1:", t2-t1, "s"
print("Time taken with Scan1:", t2-t1, "s")

print "Parallel execution direct code with scatter..."
print "... is really easy and only requires 4 lines of code :-)"
print("Parallel execution direct code with scatter...")
print("... is really easy and only requires 4 lines of code :-)")
t3=clock()
dv.scatter('scan_range', np.logspace(0.5,1.2,1000))
%px m.Scan1(scan_range)
%px y=m.scan_res
par_res=dv.gather('y')
par_res.get()
t4=clock()
print "Time taken for direct parallel execution:", t4-t3, "s"
print "Number of engines:", len(dv)
print "Speedup:", (t2-t1)/(t4-t3)
print("Time taken for direct parallel execution:", t4-t3, "s")
print("Number of engines:", len(dv))
print("Speedup:", (t2-t1)/(t4-t3))

print "Checking results:"
print "serial vs parallel: ", np.alltrue(np.equal(ser_res,par_res.get()))
print("Checking results:")
print("serial vs parallel: ", np.alltrue(np.equal(ser_res,par_res.get())))

os.chdir(curDir)
2 changes: 1 addition & 1 deletion pysces/examples/testRunScatter.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
m = pysces.model('isola2a')

print("\n\nParallel execution...using RunScatter")
par2 = pysces.ParScanner(m)
par2 = pysces.ParScanner(m, engine='ipcluster')
t5 = time.time()
par2.addScanParameter('V4', 60, 100, 11)
par2.addScanParameter('V1', 100, 130, 16)
Expand Down
2 changes: 1 addition & 1 deletion pysces/examples/testinvalidstate.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
print("\n Speedup: %.2f" % (par.statespersecond/ser.statespersecond))

print("\n\nParallel execution...with scatter and gather")
par2 = pysces.ParScanner(m)
par2 = pysces.ParScanner(m, engine='ipcluster')
t5=time.time()
par2.addScanParameter('V4',0.01,200,2000,log=True)
par2.addUserOutput('J_R1', 'A')
Expand Down
Loading

0 comments on commit 6503861

Please sign in to comment.