
Commit 4f078c2 (v1.0)
yifan92 committed Mar 30, 2021
1 parent a2bae33
Showing 10 changed files with 169 additions and 58 deletions.
2 changes: 0 additions & 2 deletions README.md

This file was deleted.

27 changes: 27 additions & 0 deletions jsub_juno.egg-info/PKG-INFO
@@ -0,0 +1,27 @@
Metadata-Version: 1.1
Name: jsub-juno
Version: 0.1.0.dev1
Summary: JSUB extension for JUNO experiment
Home-page: https://jsubpy.github.io/exts/
Author: Yifan Yang
Author-email: yangyf@ihep.ac.cn
License: MIT
Description: jsub-juno
#########

JSUB extension for JUNO experiment

Keywords: jsub extension juno
Platform: UNKNOWN
Classifier: Development Status :: 2 - Pre-Alpha
Classifier: Intended Audience :: Science/Research
Classifier: Topic :: System :: Distributed Computing
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python :: 2
Classifier: Programming Language :: Python :: 2.6
Classifier: Programming Language :: Python :: 2.7
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.3
Classifier: Programming Language :: Python :: 3.4
Classifier: Programming Language :: Python :: 3.5
Classifier: Programming Language :: Python :: 3.6
9 changes: 9 additions & 0 deletions jsub_juno.egg-info/SOURCES.txt
@@ -0,0 +1,9 @@
README.rst
setup.cfg
setup.py
jsub_juno/__init__.py
jsub_juno.egg-info/PKG-INFO
jsub_juno.egg-info/SOURCES.txt
jsub_juno.egg-info/dependency_links.txt
jsub_juno.egg-info/requires.txt
jsub_juno.egg-info/top_level.txt
1 change: 1 addition & 0 deletions jsub_juno.egg-info/dependency_links.txt
@@ -0,0 +1 @@

1 change: 1 addition & 0 deletions jsub_juno.egg-info/requires.txt
@@ -0,0 +1 @@
jsub
1 change: 1 addition & 0 deletions jsub_juno.egg-info/top_level.txt
@@ -0,0 +1 @@
jsub_juno
10 changes: 8 additions & 2 deletions jsub_juno/action/juno_prod/juno_prod.sh
@@ -17,6 +17,7 @@ eval positions='$JSUB_'$JSUB_step_type'_positions_jobvar'
eval volume='$JSUB_'$JSUB_step_type'_volume_jobvar'
eval material='$JSUB_'$JSUB_step_type'_material_jobvar'


# for condor backend, the output data should be under certain folder
data_folder=$JSUB_data_folder

@@ -104,24 +105,29 @@ elif [ "$JSUB_step_type" == "calib" ]; then
cmd_arg=$cmd_arg' --user-output '$user_output
elif [ "$JSUB_step_type" == "rec" ]; then
if [ -z "$gdml_file" ]; then
gdml_file=`find *gdml`
gdml_file=`find |grep gdml`
fi

cmd=$cmd'tut_calib2rec.py'
cmd_arg=$cmd_arg' --evtmax '$evtmax
cmd_arg=$cmd_arg' --input '$input
cmd_arg=$cmd_arg' --output '$output
cmd_arg=$cmd_arg' --gdml-file '$gdml_file
cmd_arg=$cmd_arg' --user-output '$user_output
if [ -n "$gdml_file" ]; then
cmd_arg=$cmd_arg' --gdml-file '$gdml_file
fi
elif [ "$JSUB_step_type" == "calib_woelec" ]; then
cmd=$cmd'tut_det2calib.py'
cmd_arg=$cmd_arg' --evtmax '$evtmax
cmd_arg=$cmd_arg' --input '$input
cmd_arg=$cmd_arg' --output '$output
# cmd_arg=$cmd_arg' --user-output '$user_output
elif [ "$JSUB_step_type" == "rec_woelec" ]; then
cmd=$cmd'tut_calib2rec.py'
cmd_arg=$cmd_arg' --evtmax '$evtmax
cmd_arg=$cmd_arg' --input '$input
cmd_arg=$cmd_arg' --output '$output
# cmd_arg=$cmd_arg' --user-output '$user_output
if [ -n "$gdml_file" ]; then
cmd_arg=$cmd_arg' --gdml-file '$gdml_file
fi
Binary file modified jsub_juno/scenario/__pycache__/juno.cpython-37.opt-1.pyc
Binary file modified jsub_juno/scenario/__pycache__/juno.cpython-37.pyc
176 changes: 122 additions & 54 deletions jsub_juno/scenario/juno.py
@@ -25,23 +25,33 @@ def check_step(step, setting, attributes):
# JUNO environment
if 'softVersion' not in self.scenario_input:
# use default JUNO environment
junoTop = '/cvmfs/juno.ihep.ac.cn/sl6_amd64_gcc494/Pre-Release/J19v1r0-Pre3'
junoTop = '/cvmfs/juno.ihep.ac.cn/centos7_amd64_gcc830/Pre-Release/J20v2r0-Pre1'
else:
softVersion = self.scenario_input.get('softVersion')
junoTop_list = []
junoTop_list.extend(glob.glob('/%s/'%softVersion))
junoTop_list.extend(glob.glob('/cvmfs/%s/'%softVersion))
junoTop_list.extend(glob.glob('/cvmfs/juno.ihep.ac.cn/%s/'%softVersion))
junoTop_list.extend(glob.glob('/cvmfs/juno.ihep.ac.cn/*/%s/'%softVersion))
junoTop_list.extend(glob.glob('/cvmfs/juno.ihep.ac.cn/*/*/%s/'%softVersion))
if type(softVersion)==str:
junoTop_list.extend(glob.glob('/%s/'%softVersion))
junoTop_list.extend(glob.glob('/cvmfs/%s/'%softVersion))
junoTop_list.extend(glob.glob('/cvmfs/juno.ihep.ac.cn/%s/'%softVersion))
junoTop_list.extend(glob.glob('/cvmfs/juno.ihep.ac.cn/*/%s/'%softVersion))
junoTop_list.extend(glob.glob('/cvmfs/juno.ihep.ac.cn/*/*/%s/'%softVersion))
elif type(softVersion)==dict:
arch=softVersion.get('arch','*')
release=softVersion.get('release','*')
JUNOpath=os.path.join('/cvmfs/juno.ihep.ac.cn/',arch,'*/',release)
junoTop_list.extend(glob.glob(JUNOpath))

if not junoTop_list:
raise JunoScenarioError('No suitable environment on /cvmfs/ for JUNO version:%s'%softVersion)
raise JunoScenarioError('No suitable environment on /cvmfs/ for JUNO version setting.')
else:
junoTop = junoTop_list[0]


# build input sandbox
yaml_input=self.scenario_input.get('input')
input_sandbox={'common':{}}
if type(yaml_input) is dict:
input_sandbox['common'].update(yaml_input)

# deal with splitter
splitter=self.scenario_input.get('splitter')
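
A minimal standalone sketch of how the two accepted forms of softVersion (a plain string or a dict with 'arch'/'release' keys) could resolve to a JUNO installation directory under /cvmfs/, mirroring the glob patterns in the hunk above; the arch and release values in the usage comments are illustrative only:

import glob
import os

def resolve_juno_top(soft_version):
    # Candidate JUNO installation directories under /cvmfs/, searched with
    # the same glob patterns as the scenario code above.
    candidates = []
    if isinstance(soft_version, str):
        for pattern in ('/%s/', '/cvmfs/%s/', '/cvmfs/juno.ihep.ac.cn/%s/',
                        '/cvmfs/juno.ihep.ac.cn/*/%s/', '/cvmfs/juno.ihep.ac.cn/*/*/%s/'):
            candidates.extend(glob.glob(pattern % soft_version))
    elif isinstance(soft_version, dict):
        # Dict form: optional 'arch' and 'release' keys, wildcards otherwise.
        arch = soft_version.get('arch', '*')
        release = soft_version.get('release', '*')
        candidates.extend(glob.glob(os.path.join('/cvmfs/juno.ihep.ac.cn/', arch, '*/', release)))
    return candidates[0] if candidates else None

# Both calls below would resolve to the same release (values are illustrative):
# resolve_juno_top('J20v2r0-Pre1')
# resolve_juno_top({'arch': 'centos7_amd64_gcc830', 'release': 'J20v2r0-Pre1'})
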
@@ -73,6 +83,7 @@ def check_step(step, setting, attributes):
splitterMode='splitByEvent'
jobvarsToSeq={}
jobvars={}
index0=0
elif splitterMode in ['splitByJobvars','splitByJobvar','spliteByJobvar','spliteByJobvars']:
splitterMode='splitByJobvars'
if 'jobvarLists' in splitter:
@@ -87,11 +98,10 @@
# build workflow
workflow={}
previous_steps=[]
input_map={'elecsim':'detsim','calib':'elecsim','rec':'calib','rec_woelec':'calib','calib_woelec':'detsim'}
dirac_upload_file_list=[]
input_map={'elecsim':'detsim','calib':'elecsim','rec':'calib','rec_woelec':'calib_woelec','calib_woelec':'detsim'}
dirac_upload_dict={} # {files_to_upload:dir, files_to_upload can be wildcard}
alg_counter=0 # idx of jobvar in algs




for step in job_steps:
@@ -106,6 +116,7 @@
if step in ['detsim','elecsim','calib','rec','calib_woelec','rec_woelec']:

workflow[step]={'type':'juno_prod','actvar':{'step_type':step},'depend_on':p_steps}
step_type = step



@@ -130,61 +141,107 @@
step_setting['full_args']=step_setting['fullArgs']

# resolve input from previous steps
rand = randint(0,200000000) #use rand to assure unique seed
rand = randint(0,20000000) #use rand to assure unique seed

if splitterMode in ['splitByEvent']:
# seed
seed = step_setting.get('seed',rand)
jobvarsToSeq.update({step+'_seed_jobvar':seed})
if step in input_map:
jobvarsToSeq.update({step+'_input_jobvar':input_map[step]})
else: # use the seed of first step, if detsim involved
index0=seed
jobvarsToSeq.update({step+'_output_jobvar':step})
jobvarsToSeq.update({step+'_user_output_jobvar':step+'_user'})
for attribute in ['additional_args','full_args','positions','particles','momentums','material','volume']:
if attribute in step_setting:
workflow[step]['actvar'][step+'_'+attribute+'_jobvar']=step_setting.get(attribute)

# seed
seed = step_setting.get('seed',rand)
jobvarsToSeq.update({step+'_seed_jobvar':seed})
dirac_upload_dict.update({'*'+step+'_*':'DIRACTOPDIR/'+step})

elif splitterMode in ['splitByJobvars']:
# seed setting
if 'seed' not in step_setting:
splitter['jobvar_lists'][step+'_seed_jobvar']={'type':'range','param':{'first':rand,'step':1}}
else:
splitter['jobvar_lists'][step+'_seed_jobvar']={'type':'composite_string','param':{'value': step_setting.get('seed')}}
splitter['jobvar_lists'][step+'_seed_jobvar']={'type':'composite_string','param':{'value': step_setting.get('seed')},'priority':0}




splitter['jobvar_lists']['univ_index']={'type':'range','param':{'first':0}}

# use seed for output/user-output names if not specified
if 'output' not in step_setting:
step_setting['output']='%s_$(univ_index)'%(step)
if 'user_output' not in step_setting:
step_setting['user_output']='%s_$(univ_index).user'%(step)
# Input and Output ----------------------------------
# get input from previous steps
if (step in input_map) and (input_map[step] in previous_steps):
step_setting['input']=workflow_input.get(input_map[step])['output']

output_name_from_input=False
# zhangxm file name scheme: detsim-$(seed).root, detsim_user-$(seed).root ...
if 'input' in step_setting:
input_step=input_map[step]
if input_step in step_setting['input']: #keyword replacement if step name in filenames
step_setting['output']=step_setting['input'].replace(input_step,step)
step_setting['user_output']=step_setting['input'].replace(input_step,step+'_user')
else: # add suffixes to indicate step name
step_setting['output']=step_setting['input']+'.'+step
step_setting['user_output']=step_setting['input']+'.'+step+'_user'

output_name_from_input=True

# handling outputLFN/userOutputLFN
LFN_count=0
if 'outputLFN' in step_setting:
LFN_count+=1
splitter['jobvar_lists'][step+'_outputLFN_jobvar']={'type':'composite_string','param':{'value':step_setting.get('outputLFN')},'priority':0}
dirac_upload_dict.update({step+'_outputLFN_jobvar':'COMPSTR'}) # comp str handled by DIRAC_UPLOAD module
# should override output
step_setting['output']=step_setting['outputLFN']
else:
dirac_upload_dict.update({'*'+step_setting['output']+'*':'DIRACTOPDIR/'+step})

if 'userOutputLFN' in step_setting:
LFN_count+=1
splitter['jobvar_lists'][step+'_userOutputLFN_jobvar']={'type':'composite_string','param':{'value':step_setting.get('userOutputLFN')},'priority':0}
dirac_upload_dict.update({step+'_userOutputLFN_jobvar':'COMPSTR'}) # comp str handled by DIRAC_UPLOAD module
# should override user_output
step_setting['user_output']=step_setting['userOutputLFN']
else:
dirac_upload_dict.update({'*'+step_setting['user_output']+'*':'DIRACTOPDIR/'+step})

if LFN_count==0:
dirac_upload_dict.update({'*'+step_type+'*':'DIRACTOPDIR/'+step})


# use seed for output/user-output names if not specified
if not output_name_from_input:
index_jobvar='univ_index'
if step+'_seed_jobvar' in splitter['jobvar_lists']:
index_jobvar=step+'_seed_jobvar'
if 'output' not in step_setting:
step_setting['output']='%s-$(%s)'%(step,index_jobvar) # use seed instead of univ-index
if 'user_output' not in step_setting:
step_setting['user_output']='%s_user-$(%s)'%(step,index_jobvar)



# validate step setting;
if step=='detsim':
check_step(step,step_setting,['output','user_output'])
elif step=='elecsim':
check_step(step,step_setting,['input','output','user_output'])
elif step=='calib':
check_step(step,step_setting,['input','output','user_output'])
elif step=='rec':
check_step(step,step_setting,['input','output'])
elif step=='calib_woelec':
check_step(step,step_setting,['input','output'])
elif step=='rec_woelec':
check_step(step,step_setting,['input','output'])
# if step=='detsim':
# check_step(step,step_setting,['output','user_output'])
# elif step=='elecsim':
# check_step(step,step_setting,['input','output','user_output'])
# elif step=='calib':
# check_step(step,step_setting,['input','output','user_output'])
# elif step=='rec':
# check_step(step,step_setting,['input','output','user_output'])
# elif step=='calib_woelec':
# check_step(step,step_setting,['input','output','user_output'])
# elif step=='rec_woelec':
# check_step(step,step_setting,['input','output','user_output'])

# if with jobvar splitter, translate some settings to composite_string jobvars
for attribute in ['input','output','user_output','additional_args','full_args','positions','particles','momentums','volume','material']:
if attribute in step_setting:
splitter['jobvar_lists'][step+'_'+attribute+'_jobvar']={'type':'composite_string','param':{'value':step_setting.get(attribute)}}
splitter['jobvar_lists'][step+'_'+attribute+'_jobvar']={'type':'composite_string','param':{'value':step_setting.get(attribute)},'priority':0}
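
A short sketch of the file-name derivation used earlier in this hunk for the splitByJobvars case: when the previous step's name appears in the input file name, the output and user-output names are derived by keyword replacement, otherwise a step suffix is appended. The function name and the example file names below are illustrative, not part of the module:

def derive_output_names(step, input_step, input_name):
    # Keyword replacement when the previous step's name appears in the input
    # file name; otherwise fall back to appending a step suffix.
    if input_step in input_name:
        output = input_name.replace(input_step, step)
        user_output = input_name.replace(input_step, step + '_user')
    else:
        output = input_name + '.' + step
        user_output = input_name + '.' + step + '_user'
    return output, user_output

# Illustrative file names only:
# derive_output_names('elecsim', 'detsim', 'detsim-0.root')
# -> ('elecsim-0.root', 'elecsim_user-0.root')
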


# other necessary args for certain steps
Expand All @@ -208,14 +265,14 @@ def check_step(step, setting, attributes):
if script_file:
input_sandbox['common'].update({os.path.basename(script_file):os.path.abspath(script_file)})
if splitterMode in ['splitByJobvars']:
splitter['jobvar_lists'][step+'_argument_jobvar']={'type':'composite_string','param':{'value':script_args}}
splitter['jobvar_lists'][step+'_argument_jobvar']={'type':'composite_string','param':{'value':script_args},'priority':0}
workflow[step]={'type':'exe','actvar':{'exe':os.path.basename(script_file),'argument_jobvar':step+'_argument_jobvar','location':'common'},'depend_on':p_steps}
else:
workflow[step]={'type':'exe','actvar':{'exe':os.path.basename(script_file),'argument':script_args,'location':'common'},'depend_on':p_steps}

elif script_code:
if splitterMode in ['splitByJobvars']:
splitter['jobvar_lists'][step+'_code_jobvar']={'type':'composite_string','param':{'value':script_args}}
splitter['jobvar_lists'][step+'_code_jobvar']={'type':'composite_string','param':{'value':script_args},'priority':0}
workflow[step]={'type':'run_code','actvar':{'code_jobvar':step+'_code_jobvar'},'depend_on':p_steps}
else:
workflow[step]={'type':'run_code','actvar':{'code':script_code},'depend_on':p_steps}
@@ -243,11 +300,11 @@
elif type(outputUpload) is not list:
raise JunoScenarioError('The value of outputUpload should be a string or a list. (in step %s)'%step)

dirac_upload_file_list.extend(outputUpload)
dirac_upload_dict.update({outputUpload:'DIRACTOPDIR/'+step})
if splitterMode in ['splitByJobvars']:
for k,v in textReplace.items():
splitter['jobvar_lists']['alg_rtext_'+str(alg_counter)]={'type':'composite_string','param':{'value':k}}
splitter['jobvar_lists']['alg_textr_'+str(alg_counter)]={'type':'composite_string','param':{'value':v}}
splitter['jobvar_lists']['alg_rtext_'+str(alg_counter)]={'type':'composite_string','param':{'value':k},'priority':0}
splitter['jobvar_lists']['alg_textr_'+str(alg_counter)]={'type':'composite_string','param':{'value':v},'priority':0}
alg_counter+=1

for f in soFile:
Expand All @@ -271,7 +328,7 @@ def check_step(step, setting, attributes):


if splitterMode in ['splitByJobvars']:
splitter['jobvar_lists'][step+'_argument_jobvar']={'type':'composite_string','param':{'value':software_args}}
splitter['jobvar_lists'][step+'_argument_jobvar']={'type':'composite_string','param':{'value':software_args},'priority':0}
workflow[step]={'type':'juno_soft','actvar':{'JUNO_top':junoTop,'software':software,'argument_jobvar':step+'_argument_jobvar'},'depend_on':p_steps}
else:
workflow[step]={'type':'juno_soft','actvar':{'JUNO_top':junoTop,'software':software,'argument':software_args},'depend_on':p_steps}
@@ -281,6 +338,8 @@
cmd=step_setting.get('cmd')
workflow[step]={'type':'cmd','actvar':{'cmd':cmd},'depend_on':p_steps}

elif step_type.lower() in ['dirac_upload']:
workflow[step]={'type':'dirac_upload','actvar':step_setting,'depend_on':p_steps}

else: # invalid step type
raise JunoScenarioError('Invalid step type (%s) for step %s in workflow'%(step_type,step))
@@ -293,6 +352,8 @@
if splitterMode in ['splitByEvent']:
splitter['jobvars']=jobvars
splitter['jobvarsToSeq']=jobvarsToSeq
if 'index0' not in splitter: # top priority if index0 specified by user
splitter['index0']=index0


# Condor backend
@@ -318,26 +379,33 @@
# a dirac-upload action in the end, to upload everything:
workflow['dirac_upload']={'type':'dirac_upload','actvar':{},'depend_on':job_steps}
workflow['dirac_upload']['actvar']['overwrite']=dirac_setting.get('overwrite','True')
workflow['dirac_upload']['actvar']['SE']=dirac_setting.get('SE')

# outputDir for dir of full path; outputSubDir for subdir in user home.
dirac_outputSubDir=dirac_setting.get('outputSubDir',outputSubDir)
dirac_outputDir=dirac_setting.get('outputDir',outputDir)
dirac_upload_dict.update({'*xml':'DIRACTOPDIR/'})
if dirac_outputDir:
workflow['dirac_upload']['actvar']['user_home']='False'
if splitterMode in ['splitByJobvars']:
splitter['jobvar_lists']['dirac_upload_destination_dir_jobvar']={'type':'composite_string','param':{'value':dirac_outputDir}}
else:
workflow['dirac_upload']['actvar']['destination_dir']=dirac_outputDir
workflow['dirac_upload']['actvar']['files_to_upload']=os.path.join('*root,*user*,*xml')
# workflow['dirac_upload']['actvar']['user_home']='False'
# if splitterMode in ['splitByJobvars']:
# splitter['jobvar_lists']['dirac_upload_destination_dir_jobvar']={'type':'composite_string','param':{'value':dirac_outputDir},'priority':0}
# else:
# workflow['dirac_upload']['actvar']['destination_dir']=dirac_outputDir
dirac_topdir=dirac_outputDir
elif dirac_outputSubDir:
workflow['dirac_upload']['actvar']['user_home']='True'
if splitterMode in ['splitByJobvars']:
splitter['jobvar_lists']['dirac_upload_destination_dir_jobvar']={'type':'composite_string','param':{'value':os.path.join(dirac_outputSubDir,taskName)}}
else:
workflow['dirac_upload']['actvar']['destination_dir']=os.path.join(dirac_outputSubDir,taskName)
# workflow['dirac_upload']['actvar']['user_home']='True'
# if splitterMode in ['splitByJobvars']:
# splitter['jobvar_lists']['dirac_upload_destination_dir_jobvar']={'type':'composite_string','param':{'value':os.path.join(dirac_outputSubDir,taskName)},'priority':0}
# else:
# workflow['dirac_upload']['actvar']['destination_dir']=os.path.join(dirac_outputSubDir,taskName)
dirac_topdir=os.path.join(dirac_outputSubDir,taskName)

dirac_upload_file_list.extend(['*root','*user*','*xml'])
workflow['dirac_upload']['actvar']['files_to_upload']=','.join(dirac_upload_file_list)
keys=dirac_upload_dict.keys()
for key in keys:
if 'DIRACTOPDIR' in dirac_upload_dict[key]:
dirac_upload_dict[key]=dirac_upload_dict[key].replace('DIRACTOPDIR',dirac_topdir)

workflow['dirac_upload']['actvar']['upload_dict']=dirac_upload_dict
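
A minimal sketch of the DIRACTOPDIR placeholder expansion performed just above, where each upload pattern is mapped to a concrete destination under the chosen DIRAC output directory; the function name and example paths are illustrative only:

def expand_upload_dict(upload_dict, dirac_topdir):
    # Substitute the DIRACTOPDIR placeholder in every destination path.
    expanded = {}
    for pattern, destination in upload_dict.items():
        if 'DIRACTOPDIR' in destination:
            destination = destination.replace('DIRACTOPDIR', dirac_topdir)
        expanded[pattern] = destination
    return expanded

# Illustrative paths only:
# expand_upload_dict({'*detsim_*': 'DIRACTOPDIR/detsim', '*xml': 'DIRACTOPDIR/'},
#                    '/juno/user/mytask')
# -> {'*detsim_*': '/juno/user/mytask/detsim', '*xml': '/juno/user/mytask/'}
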


#when input is not from previous steps, use a dirac-download action to get input
