Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Existing output directory #660

Draft
wants to merge 21 commits into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
fb91f20
Tests to skip module (MCCD PSF) if output file exists
martinkilbinger Sep 5, 2023
ac061c0
Merge remote-tracking branch 'upstream/develop' into output_exists
martinkilbinger Sep 17, 2023
a93546e
n_smp for more jobs; N_EPOCH bugs; ngmix checking for existing output
martinkilbinger Sep 17, 2023
1ed596c
numbering scheme with re pattern: copied, not changed
martinkilbinger Sep 24, 2023
ba4aa19
numbering scheme with re pattern: copied, not changed
martinkilbinger Sep 24, 2023
776943c
removed galsim from job script message
martinkilbinger Sep 24, 2023
3216eb6
Fixed import typo
martinkilbinger Oct 16, 2023
eeadfbf
ngmix template
martinkilbinger Oct 18, 2023
ee90dc1
testing openmpi 5.0.0 on candide
martinkilbinger Oct 28, 2023
22c2f56
Testing MPI on candide; errors with process list
martinkilbinger Oct 31, 2023
27e9bbb
Updated MPI setting and candide job
martinkilbinger Oct 31, 2023
c380cd4
ngmix runner reset to develop
martinkilbinger Oct 31, 2023
13940f5
ngmix script reset to develop
martinkilbinger Oct 31, 2023
0bec318
mccd and pysap dependencies added back in to example
martinkilbinger Oct 31, 2023
691daf3
submit run added missing arg
martinkilbinger Oct 31, 2023
3712734
config mpi
martinkilbinger Oct 31, 2023
ddf4091
removed debug prints
martinkilbinger Nov 6, 2023
3fb5a25
mpi4py upgraded to 3.1.5
martinkilbinger Nov 6, 2023
6a76454
changed warning print output
martinkilbinger Nov 6, 2023
7ac5042
ngmix_runner checked out from develop
martinkilbinger Nov 6, 2023
bfcdb78
Added testing of mmap file existence
martinkilbinger Nov 8, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Testing MPI on candide; errors with process list
  • Loading branch information
martinkilbinger committed Oct 31, 2023
commit 22c2f56606bd0df66b1b530b5d393ba4824e697c
2 changes: 1 addition & 1 deletion environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ channels:
dependencies:
- python=3.9
- pip>=21.2.4
- numpy==1.21.6
- numpy==1.22
- astropy==5.0
- automake==1.16.2
- autoconf==2.69
Expand Down
2 changes: 1 addition & 1 deletion example/cfis/config_tile_Ng_template.ini
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ VERBOSE = True
RUN_NAME = run_sp_tile_ngmix_NgXu

# Add date and time to RUN_NAME, optional, default: False
RUN_DATETIME = True
RUN_DATETIME = False


## ShapePipe execution options
Expand Down
11 changes: 8 additions & 3 deletions example/pbs/candide_mpi.sh
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
#PBS -l walltime=00:05:00

# Request number of cores (e.g. 2 from 2 different machines)
#PBS -l nodes=4:ppn=2
#PBS -l nodes=2:ppn=2

# Full path to environment
export SPENV="$HOME/.conda/envs/shapepipe"
Expand Down Expand Up @@ -50,9 +50,14 @@ fi
# Only version 5.0.0 downloaded from the web recognised the --mca argument
#/home/mkilbing/bin/mpirun -np $NSLOTS --mca pml ob1 --mca btl ^openib --mca orte_base_help_aggregate 0 --mca plm_tm_verbose 1 hostname

/softs/openmpi/5.0.0-torque-CentOS7/bin/mpirun -map-by $NSLOTS --mca pml ob1 --mca btl ^openib --mca orte_base_help_aggregate 0 --mca plm_tm_verbose 1 hostname
/softs/openmpi/5.0.0-torque-CentOS7/bin/mpirun -map-by $NSLOTS $SPENV/bin/shapepipe_run -c $SPDIR/example/pbs/config_mpi.ini
#/softs/openmpi/5.0.0-torque-CentOS7/bin/mpirun -map-by node --mca pml ob1 --mca btl ^openib --mca orte_base_help_aggregate 0 --mca plm_tm_verbose 1 hostname
#/softs/openmpi/5.0.0-torque-CentOS7/bin/mpirun -map-by node $SPENV/bin/shapepipe_run -c $SPDIR/example/pbs/config_mpi.ini
#/home/mkilbing/bin/mpirun -map-by node $SPENV/bin/shapepipe_run -c $SPDIR/example/pbs/config_mpi.ini
#/home/mkilbing/bin/mpirun -n $NSLOTS --mca pml ob1 --mca btl ^openib --mca orte_base_help_aggregate 0 --mca plm_tm_verbose 1 $SPENV/bin/shapepipe_run -c $SPDIR/example/pbs/config_mpi.ini
#$SPENV/bin/mpiexec -n $NSLOTS $SPENV/bin/shapepipe_run -c $SPDIR/example/pbs/config_mpi.ini
#/softs/openmpi/5.0.0-torque-CentOS7/bin/mpirun -np $NSLOTS --mca pml ob1 --mca btl ^openib --mca orte_base_help_aggregate 0 --mca plm_tm_verbose 1 $SPENV/bin/shapepipe_run -c $SPDIR/example/pbs/config_mpi.ini
/softs/openmpi/5.0.0-torque-CentOS7/bin/mpirun -np $NSLOTS hostname
/softs/openmpi/5.0.0-torque-CentOS7/bin/mpirun -np $NSLOTS $SPENV/bin/shapepipe_run -c $SPDIR/example/pbs/config_mpi.ini

#/home/mkilbing/bin/mpirun -np $NSLOTS --mca pml ob1 --mca btl ^openib --mca orte_base_help_aggregate 0 --mca plm_tm_verbose 1 $SPENV/bin/shapepipe_run -c $SPDIR/example/pbs/config_mpi.ini

Expand Down
11 changes: 9 additions & 2 deletions shapepipe/modules/ngmix_package/ngmix.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
"""

import re
import os

import galsim
import ngmix
Expand Down Expand Up @@ -91,8 +92,12 @@ def __init__(
self._w_log = w_log

    # Initialise random generator using image ID number
#seed = int(''.join(re.findall(r'\d+', self._output_path)))
seed = 6121975
basename = os.path.basename(self._output_path)
print("MKDEBUG output_path basename ", basename)
print(''.join(re.findall(r'\d+', basename)))
seed = int(''.join(re.findall(r'\d+', basename)))
print(seed)
#seed = 6121975
np.random.seed(seed)
self._w_log.info(f'Random generator initialisation seed = {seed}')

Expand Down Expand Up @@ -311,6 +316,8 @@ def save_results(self, output_dict):
Dictionary containing the results

"""
if os.path.exists(self._output_path):
raise IOError(f"Output file {self._output_path} already exists")
f = file_io.FITSCatalogue(
self._output_path,
open_mode=file_io.BaseCatalogue.OpenMode.ReadWrite
Expand Down
3 changes: 3 additions & 0 deletions shapepipe/modules/ngmix_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ def ngmix_runner(
)
else:
# Initialise class instance
w_log.info(
f"Processing data for output file {output_path}"
)
ngmix_inst = Ngmix(
input_file_list,
output_path,
Expand Down
29 changes: 28 additions & 1 deletion shapepipe/pipeline/file_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@
from shapepipe.pipeline import shared
from shapepipe.utilities.file_system import mkdir

from mpi4py import MPI
import datetime
import time


class FileHandler(object):
"""File Handler.
Expand Down Expand Up @@ -930,6 +934,7 @@ def _save_num_patterns(
del file_list
break

print("MKDEBUG save_num_pattern: path = ", path)
if not true_file_list:
raise RuntimeError(
f'No files found matching "{pattern}" and "{ext}" in the '
Expand Down Expand Up @@ -995,6 +1000,11 @@ def _save_num_patterns(
)

# Save file list
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
now = datetime.datetime.now()
print(f"MKDEBUG save_num_patterns: save file list {output_file}, rank={rank}, size={size} time={now.time()}")
np.save(output_file, np.array(final_file_list))

del true_file_list, final_file_list
Expand All @@ -1015,7 +1025,21 @@ def _save_match_patterns(output_file, mmap_list):
List of memory maps

"""
num_pattern_list = [np.load(mmap, mmap_mode='r') for mmap in mmap_list]
#num_pattern_list = [np.load(mmap, mmap_mode='r') for mmap in mmap_list]
num_pattern_list = []
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
for mmap in mmap_list:
now = datetime.datetime.now()
print(f"MKDEBUG load mmap {mmap}, rank={rank}, size={size} time={now.time()}")
if not os.path.exists(mmap):
n_sec = 5
print(f"MKDEBUG waiting {n_sec}...")
time.sleep(n_sec)
if not os.path.exists(mmap):
print("MKDEBUG still not found")
num_pattern_list.append(np.load(mmap, mmap_mode="r"))

np.save(
output_file,
Expand Down Expand Up @@ -1223,6 +1247,8 @@ def _get_module_input_files(self, module, run_name):
num_scheme = self._module_dict[module][run_name]['numbering_scheme']
run_method = self._module_dict[module][run_name]['run_method']

print("MKDEBUG call _save_process_list, dir_list =", dir_list)
print("MKDEBUG tmp dir = ", self._tmp_dir, os.path.exists(self._tmp_dir))
self._save_process_list(
dir_list,
pattern_list,
Expand Down Expand Up @@ -1273,6 +1299,7 @@ def set_up_module(self, module):
self._set_module_properties(module, run_name)
self._create_module_run_dirs(module, run_name)
self._set_module_input_dir(module, run_name)
print("MKDEBUG call _get_module_input_files")
self._get_module_input_files(module, run_name)

def get_worker_log_name(self, module, file_number_string):
Expand Down
5 changes: 5 additions & 0 deletions shapepipe/pipeline/job_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

from shapepipe.pipeline.worker_handler import WorkerHandler

from mpi4py import MPI

class JobHandler(object):
"""Job Handler.
Expand Down Expand Up @@ -78,6 +79,10 @@ def __init__(
self._log_job_parameters()

# Set up module in file handler
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
print(f"MKDEBUG set_up_module, rank = {rank}, size = {size}")
self.filehd.set_up_module(self._module)

# Set the total number of processes
Expand Down
11 changes: 11 additions & 0 deletions shapepipe/pipeline/mpi_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ def submit_mpi_jobs(
timeout,
run_dirs,
module_runner,
module_config_sec,
worker_log,
verbose,
):
Expand All @@ -51,12 +52,22 @@ def submit_mpi_jobs(
w_log_name = worker_log(module_runner.__name__, process[0])

wh = WorkerHandler(verbose=verbose)
print("MKDEBUG in submit_mpi_job")
print("MKDEBUG process[1:] = ", process[1:])
print("MKDEBUG process[0] = ", process[0])
print("MKDEBUG w_log_name = ", w_log_name)
print("MKDEBUG run_dirs = ", run_dirs)
print("MKDEBUG config = ", config)
print("MKDEBUG timeout = ", timeout)
print("MKDEBUG module_runner = ", module_runner)
print("MKDEBUG module_config_sec = ", module_config_sec)
result.append(wh.worker(
process[1:],
process[0],
w_log_name,
run_dirs,
config,
module_config_sec,
timeout,
module_runner
))
Expand Down
9 changes: 6 additions & 3 deletions shapepipe/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -405,15 +405,16 @@ def run_mpi(pipe, comm):
# Get file handler objects
run_dirs = jh.filehd.module_run_dirs
module_runner = jh.filehd.module_runners[module]
module_config_sec = jh.filehd.get_module_config_sec(module)
worker_log = jh.filehd.get_worker_log_name
# Define process list
process_list = jh.filehd.process_list
# Define job list
jobs = split_mpi_jobs(process_list, comm.size)
del process_list
else:
job_type = module_runner = worker_log = timeout = \
jobs = run_dirs = None
job_type = module_runner = module_config_sec = worker_log = \
timeout = jobs = run_dirs = None

# Broadcast job type to all nodes
job_type = comm.bcast(job_type, root=0)
Expand All @@ -424,6 +425,7 @@ def run_mpi(pipe, comm):
run_dirs = comm.bcast(run_dirs, root=0)

module_runner = comm.bcast(module_runner, root=0)
module_config_sec = comm.bcast(module_config_sec, root=0)
worker_log = comm.bcast(worker_log, root=0)
timeout = comm.bcast(timeout, root=0)
jobs = comm.scatter(jobs, root=0)
Expand All @@ -436,14 +438,15 @@ def run_mpi(pipe, comm):
timeout,
run_dirs,
module_runner,
module_config_sec,
worker_log,
verbose
),
root=0,
)

# Delete broadcast objects
del module_runner, worker_log, timeout, jobs
del module_runner, module_config_sec, worker_log, timeout, jobs

# Finish up parallel jobs
if master:
Expand Down
Loading