This repository has been archived by the owner on Mar 19, 2024. It is now read-only.

Commit

Address comments
iseessel committed Jun 4, 2021
1 parent 3dd9b79 commit 8200338
Showing 7 changed files with 166 additions and 69 deletions.

This file was deleted.

32 changes: 32 additions & 0 deletions dev/benchmark_suite/benchmark_suite_scheduler_template.json
@@ -0,0 +1,32 @@
{
"params": {
"training_checkpoint_dir": "(str) Training checkpoint directory. That is the CHECKPOINT.DIR of the training config",
"benchmarks": [
{
"evaluation_name": "(str) Name of benchmark for convenience",
"config_files": [
"config=path/to/evaluation/config",
"config.OVERRIDES=new_value"
]
}
],
"evaluation_iter_freq": "(int, default=-1) Evaluate the checkpoint every N iterations",
"evaluation_phase_freq": "(int, default=-1) Evaluate the checkpoint every N phases",
"evaluate_final_phase": "(bool, default=True) Evaluate the final phase",
"autoload_slurm_evaluator_checkpoint": "(bool, default=False) Whether or not to automatically load the benchmark checkpoint",
"slurm_evaluator_checkpoint": "(str, default=None) Path to load the benchmark checkpoint",
"auto_retry_evaluations": "(bool, default=False) Whether or not to automatically retry the evaluations",
"retry_evaluation_job_ids": "(array[int], default=[]) Array of job_ids to retry",
"max_retries": "(int, default=3) Maximum number of retries",
},
"slurm_options": {
"PARTITION": "(str) Partition",
"NAME": "(str, default=vissl) Name of slurm job",
"COMMENT": "(str, default=vissl evaluation job) Comment of slurm job",
"CONSTRAINT": "(str, default='') Constraing of slurm job",
"TIMEOUT_MIN": "(int, default=72 * 60) Minimum amount of minutes to timeout",
"CPUS_PER_TASK": "(int, default=8) Numer of cpus per task.",
"MEM_GB": "(int, default=32) Amount of RAM to request from slurm",
"ADDITIONAL_PARAMETERS": "(Dict[[str, Any]], default={}) Any default slurm options to pass to submitit",
}
}
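
For reference, a minimal filled-in config based on the template above might look like the sketch below. All paths, names, and override values are hypothetical placeholders, and it is assumed (per the documented defaults above) that omitted keys fall back to their defaults:

{
    "params": {
        "training_checkpoint_dir": "/path/to/pretraining/checkpoints",
        "benchmarks": [
            {
                "evaluation_name": "linear_eval",
                "config_files": [
                    "config=path/to/evaluation/config",
                    "config.DISTRIBUTED.NUM_NODES=1"
                ]
            }
        ],
        "evaluation_phase_freq": 1,
        "evaluate_final_phase": true,
        "auto_retry_evaluations": true
    },
    "slurm_options": {
        "PARTITION": "my_partition"
    }
}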
13 changes: 9 additions & 4 deletions dev/launch_benchmark_suite_scheduler_slurm.sh
@@ -4,13 +4,18 @@
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

# This benchmark suite script launches a benchmark suite scheduler slurm job.
# The job takes an absolute path to a json config (see benchmark_suite_scheduler_template.json for details).
# The job continuously monitors the training checkpoints, dynamically launches evaluation jobs,
# and amalgamates the results.

######################### EXAMPLE USAGE #################################

# cd into vissl root directory.
#
# bash ./dev/launch_benchmark_suite_scheduler_slurm.sh \
#/private/home/iseessel/vissl/configs/config/benchmark/slurm_evaluations/slurm_evaluation_example.json
#
# See slurm_evaluation_example.json for an example config or slurm_evaluator.py for class structure.
# bash ./dev/launch_benchmark_suite_scheduler_slurm.sh /path/to/benchmark_suite_scheduler.json

# See benchmark_suite_scheduler_template.json for config information, or slurm_evaluator.py for the class structure.
######################### INPUT PARAMS ##################################

FILE=( "$@" )
Expand Up @@ -5,6 +5,7 @@

import collections
import json
import logging
import os
import time
from pathlib import Path
@@ -16,12 +17,21 @@
from vissl.config.attr_dict import AttrDict
from vissl.trainer.trainer_main import build_task
from vissl.utils.distributed_launcher import launch_distributed_on_slurm
from vissl.utils.hydra_config import convert_to_attrdict, is_hydra_available
from vissl.utils.hydra_config import convert_to_attrdict
from vissl.utils.io import load_file
from vissl.utils.misc import flatten_dict, retry
from vissl.utils.slurm import is_submitit_available


"""
This class runs multiple evaluations on a single (pre)training run.
Using the #evaluate method, we continuously monitor training checkpoints, dynamically launch
evaluations as new checkpoints become available, and amalgamate the evaluation results as
they complete.
For SLURM usage, create a JSON configuration file (see benchmark_suite_scheduler_template.json)
and use launch_benchmark_suite_scheduler_slurm.sh for convenience.
"""

# How many times to retry a slurm job submission.
_NUM_SLURM_RETRIES = 5
# How many seconds to sleep between iterations of the main loop.
@@ -43,7 +53,7 @@
"TIMEOUT",
]
# Wait for the training checkpoint folder to be available for 1 hour.
_TRAINING_CHECKPOINT_FILE_WAIT_SECONDS = 60 * 60
_TRAINING_CONFIG_WAIT_SECONDS = 60 * 60


class BenchmarkSuiteScheduler:
@@ -123,7 +133,7 @@ def __init__(
self.auto_retry_evaluations = auto_retry_evaluations
self.max_retries = max_retries

self._validate_class_instance()
self.validate()

# Will be set in #evaluate, once training_checkpoint_dir becomes available.
self.training_config = None
@@ -140,22 +150,34 @@ def evaluate(self):
"""
start_time = time.time()

# Wait for the training checkpoint folder to be available.
# Wait for the training config to be available. This indicates the training has begun.
while True:
if time.time() - start_time > _TRAINING_CHECKPOINT_FILE_WAIT_SECONDS:
if time.time() - start_time > _TRAINING_CONFIG_WAIT_SECONDS:
raise RuntimeError(
f"Training checkpoint folder still doesn't exist after:"
f"{_TRAINING_CHECKPOINT_FILE_WAIT_SECONDS / 60} minutes"
f"Training config still doesn't exist after:"
f"{_TRAINING_CONFIG_WAIT_SECONDS / 60} minutes"
)

if PathManager.exists(self.training_checkpoint_file):
# Load training yaml config.
self._load_training_config()

# Build main training task in order to extract iteration info.
self.training_task = build_task(self.training_config)
self.training_task.dataloaders = self.training_task.build_dataloaders(
False
)

# Generate evaluation results
self.evaluation_results = self._generate_initial_benchmark_results()
self._validate_evaluation_setup()

break

time.sleep(_SLEEP_TIME_SECONDS)

self.save_evaluation_results()
# Save initial evaluation benchmarks, for checkpointing reasons.
self.save_evaluation_benchmarks()

# Checkpoint folder is now available. Continuously monitor the training checkpoints,
# launch evaluation jobs as needed, monitor their progress, and record their results.
@@ -166,19 +188,24 @@ def evaluate(self):

# Break if no more checkpoints to evaluate
if self._finished():
print("Evaluations are finished")
logging.info("Evaluations are finished")
break

time.sleep(_SLEEP_TIME_SECONDS)

def save_evaluation_results(self):
def save_evaluation_benchmarks(self):
"""
Upload json file to the evaluation directory.
Create the /evaluations directory inside the training checkpoints dir.
Upload the json file to the parent evaluation directory, as well as
to each child evaluation directory.
"""
# Upload all checkpoints' evaluations to the parent checkpoint directory.
evaluation_dir = self.evaluation_dir()
parent_metrics_file = os.path.join(evaluation_dir, "evaluation_metrics.json")
Path(evaluation_dir).mkdir(parents=True, exist_ok=True)

if not PathManager.exists(evaluation_dir):
PathManager.mkdirs(evaluation_dir)

self._write_json_file(self.evaluation_results, parent_metrics_file)

# Upload each checkpoint's evaluations to child directories.
@@ -187,10 +214,13 @@ def save_evaluation_results(self):
child_metrics_file = os.path.join(
child_metrics_dir, "evaluation_metrics.json"
)
Path(child_metrics_dir).mkdir(parents=True, exist_ok=True)

if not PathManager.exists(child_metrics_dir):
PathManager.mkdirs(child_metrics_dir)

self._write_json_file(benchmarks, child_metrics_file)

print("Saved evaluation Results")
logging.info("Saved evaluation Results")

def evaluation_dir(self):
return os.path.join(self.training_checkpoint_dir, "evaluations")
@@ -199,26 +229,33 @@ def _load_training_config(self):
# Load training yaml config.
self.training_config = load_file(self.training_checkpoint_file)
self.training_config = AttrDict(self.training_config)
print(
logging.info(
f"Loaded training checkpoint config from: { self.training_checkpoint_file }"
)
# Build main training task in order to extract iteration info.
self.training_task = build_task(self.training_config)
self.training_task.dataloaders = self.training_task.build_dataloaders(False)
self.evaluation_results = self._generate_initial_evaluation_results()
self._validate_training_cfg()

def _validate_class_instance(self):
def validate(self):
"""
Validate that the class instance is correctly configured.
"""
assert is_hydra_available(), "Make sure to install hydra"

assert not (
self.autoload_slurm_evaluator_checkpoint and self.slurm_evaluator_checkpoint
), "Specify only one of autoload_slurm_evaluator_checkpoint and slurm_evaluator_checkpoint"
assert (
type(self.evaluation_iter_freq) is int and self.evaluation_iter_freq >= -1
), "The evaluation_iter_freq must be an int >= 1"
assert (
type(self.evaluation_phase_freq) is int and self.evaluation_phase_freq >= -1
), "The evaluation_phase_freq must be an int >= 1"
assert (
self.evaluation_iter_freq > -1
or self.evaluation_phase_freq > -1
or self.evaluate_final_phase
), "No evaluations will be run with the current config. Specify evaluation_iter_freq, evaluation_phase_freq, or evaluate_final_phase"
assert (
is_submitit_available()
), "Please 'pip install submitit' to schedule jobs on SLURM"
type(self.max_retries) is int and self.max_retries >= -1
), "Max retries must be >= -1."

def _validate_training_cfg(self):
def _validate_evaluation_setup(self):
if self.evaluation_iter_freq > -1:
assert (
self.evaluation_iter_freq
@@ -265,7 +302,7 @@ def _evaluate_checkpoint(self, checkpoint_str, benchmarks):
self.evaluation_jobs_finished.remove(benchmark["job_id"])
# Log the job retry.
job_id, slurm_state = benchmark["job_id"], benchmark["slurm_state"]
print(f"Retrying job: { job_id } in state: { slurm_state }")
logging.info(f"Retrying job: { job_id } in state: { slurm_state }")

args, config = self._generate_config(benchmark["config_files"])
job = self._launch_slurm_job(args, config)
@@ -290,10 +327,10 @@ def _evaluate_checkpoint(self, checkpoint_str, benchmarks):
state_prev: None, state_current: { job.state }
"""

print(log)
logging.info(log)

# Save evaluation results to json file.
self.save_evaluation_results()
self.save_evaluation_benchmarks()

def _retry_job(self, benchmark):
return benchmark["job_id"] in self.retry_evaluation_job_ids or (
@@ -344,10 +381,10 @@ def _monitor_benchmark_job(self, benchmark):
state_prev: { benchmark["slurm_state"] }, state_curr: { job.state }
"""

print(log)
logging.info(log)
# Benchmark Job state has changed. Update the benchmark state.
self._update_benchmark_state(benchmark, job)
self.save_evaluation_results()
self.save_evaluation_benchmarks()

def _update_benchmark_state(self, benchmark, job):
# Job state has changed, record it.
@@ -395,7 +432,7 @@ def _set_largest_metric(self, metrics, final_metrics):
"train_phase_idx"
]

def _generate_initial_evaluation_results(self):
def _generate_initial_benchmark_results(self):
default_checkpoint = os.path.join(
self.evaluation_dir(), "evaluation_metrics.json"
)
@@ -469,7 +506,7 @@ def _load_evaluation_results_checkpoint(self):

evaluation_config = load_file(checkpoint_file)

print(f"Loaded evaluation results checkpoint from: { checkpoint_file }")
logging.info(f"Loaded evaluation results checkpoint from: { checkpoint_file }")

return evaluation_config

