[log] local throughput metrics (hpcaitech#811)
* Revert "[zero] add ZeroTensorShardStrategy (hpcaitech#793)"

This reverts commit 88759e2.

* [gemini] set cpu memory capacity

* [log] local throughput collecting

* polish

* polish

* polish

* polish code

* polish
feifeibear authored Apr 20, 2022
1 parent dd92b90 commit 61c20b4
Showing 3 changed files with 21 additions and 11 deletions.
25 changes: 17 additions & 8 deletions colossalai/trainer/hooks/_metric_hook.py
@@ -124,7 +124,7 @@ def get_accumulated_value(self):
     def get_last_step_value(self) -> str:
         """Returns :attr:`last_step_loss`.
         """
-        return str(self.last_step_loss)
+        return str(self.last_step_loss.cpu().item())

     @staticmethod
     def is_better(a, b):
@@ -207,7 +207,7 @@ def update(self, logits, targets, batch_size) -> None:
     def get_last_step_value(self) -> str:
         self.last_step_sum = all_reduce(self.last_step_sum, ParallelMode.DATA)
         self.last_step_correct = all_reduce(self.last_step_correct, ParallelMode.DATA)
-        return str(_format_number((self.last_step_correct / self.last_step_sum).item()))
+        return str(_format_number((self.last_step_correct / self.last_step_sum).cpu().item()))

     def get_accumulated_value(self):
         self.accumulated_sum = all_reduce(self.accumulated_sum, ParallelMode.DATA)
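
The two hunks above share one fix: a one-element CUDA tensor is now copied to the host explicitly before being turned into a string. A minimal standalone illustration of the difference (plain PyTorch, not ColossalAI code):

```python
import torch

# .item() extracts a Python scalar from a one-element tensor; calling .cpu()
# first makes the device-to-host transfer explicit. Without it, str() on the
# tensor yields the full repr, e.g. "tensor([2.5000], device='cuda:0')".
device = 'cuda' if torch.cuda.is_available() else 'cpu'
loss = torch.tensor([2.5], device=device)
print(str(loss))               # tensor repr, not a plain number
print(str(loss.cpu().item()))  # "2.5"
```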
@@ -324,7 +324,7 @@ class ThroughputMetric(Metric):
         epoch_only (bool): Whether the metric only read for the full epoch.
     """

-    def __init__(self, epoch_only: bool, ignored_steps: int = 0, tflop_per_step: int = 0):
+    def __init__(self, epoch_only: bool, ignored_steps: int = 0, tflop_per_step: int = 0, use_local: bool = False):
         super().__init__(epoch_only=epoch_only)
         self.ignored_steps = ignored_steps
         self.cur_steps = 0
@@ -333,6 +333,7 @@ def __init__(self, epoch_only: bool, ignored_steps: int = 0, tflop_per_step: int
         self.last_step_num_samples = torch.zeros(1, device=get_current_device())
         self.last_step_used_time = torch.zeros(1, device=get_current_device())
         self._tflop_per_step = tflop_per_step
+        self._use_local = use_local

     def reset(self) -> None:
         # self.cur_steps = 0
@@ -350,9 +351,13 @@ def update(self, num_samples, time) -> None:
         self.accumulated_used_time += self.last_step_used_time

     def get_last_step_value(self) -> str:
-        self.last_step_used_time = all_reduce(self.last_step_used_time, ParallelMode.DATA) / \
-            gpc.get_world_size(ParallelMode.DATA)
-        self.last_step_num_samples = all_reduce(self.last_step_num_samples, ParallelMode.DATA)
+        if self._use_local:
+            self.last_step_num_samples *= gpc.get_world_size(ParallelMode.DATA)
+        else:
+            self.last_step_used_time = all_reduce(self.last_step_used_time, ParallelMode.DATA) / \
+                gpc.get_world_size(ParallelMode.DATA)
+            self.last_step_num_samples = all_reduce(self.last_step_num_samples, ParallelMode.DATA)
+
         sample_per_sec = _format_number(self.last_step_num_samples / (self.last_step_used_time + 1e-12).item())
         if self._tflop_per_step > 0:
             tflops = _format_number(self._tflop_per_step / (self.last_step_used_time.item() + 1e-12))
@@ -380,19 +385,23 @@ class ThroughputHook(MetricHook):
         priority (int, optional): Priority in the printing, hooks with small priority will be printed in front
             defaults to 10. If different hooks share same priority, the order of printing would
             depend on the hooks order in the hook list.
+        tflop_per_step(int, optional): tera floating point operations per step.
+        use_local (bool, optional): Whether to use local time for throughput calculation.
     """

-    def __init__(self, ignored_steps: int = 0, priority: int = 10, tflop_per_step: int = 0):
+    def __init__(self, ignored_steps: int = 0, priority: int = 10, tflop_per_step: int = 0, use_local=False):
         super().__init__(priority)
         self.ignored_steps = ignored_steps
         self._tflop_per_step = tflop_per_step
+        self._use_local = use_local

     def after_hook_is_attached(self, trainer):
         self._check_metric_states_initialization(trainer)
         if self._is_stage_to_compute:
             self.metric = ThroughputMetric(epoch_only=True,
                                            ignored_steps=self.ignored_steps,
-                                           tflop_per_step=self._tflop_per_step)
+                                           tflop_per_step=self._tflop_per_step,
+                                           use_local=self._use_local)

             # register the metric
             trainer.states['metrics']['train']['Throughput'] = self.metric
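For context, a minimal sketch of the two modes the new `use_local` flag selects, assuming an initialized `torch.distributed` process group; `last_step_throughput` is a hypothetical helper written for illustration, not part of the ColossalAI API:

```python
import torch
import torch.distributed as dist

def last_step_throughput(num_samples: torch.Tensor,
                         used_time: torch.Tensor,
                         use_local: bool = False) -> float:
    """Samples/sec for the last step, measured locally or all-reduced."""
    world_size = dist.get_world_size()
    if use_local:
        # Local mode: scale this rank's sample count by the world size and
        # divide by the locally measured time; no collective call is made.
        num_samples = num_samples * world_size
    else:
        # Global mode: sum sample counts and average step times across the
        # data-parallel group (all_reduce defaults to SUM and is in-place).
        dist.all_reduce(used_time)
        used_time = used_time / world_size
        dist.all_reduce(num_samples)
    return (num_samples / (used_time + 1e-12)).item()
```

Local mode extrapolates one rank's measurement to the whole data-parallel group, so reporting throughput no longer forces a synchronization point; the all-reduced path, which is more accurate under load imbalance, remains the default.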
4 changes: 2 additions & 2 deletions colossalai/zero/__init__.py
@@ -23,10 +23,10 @@ def convert_to_zero_v2(model: nn.Module, optimizer: torch.optim.Optimizer, model

     logger = get_dist_logger('convert_to_zero_v2')

-    logger.info(f'optimizer_config is {optimizer_config}')
+    logger.info(f'optimizer_config is {optimizer_config}', ranks=[0])
     if optimizer_config is None:
         optimizer_config = dict()
-    logger.info(f'model_config is {model_config}')
+    logger.info(f'model_config is {model_config}', ranks=[0])
     if model_config is None:
         model_config = dict()

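This change and the next one follow the same pattern: passing `ranks=[0]` makes the distributed logger emit a record once per job instead of once per process. A rough sketch of such a rank filter, assuming `torch.distributed`; `log_on_ranks` is a hypothetical stand-in, not the actual distributed-logger implementation:

```python
import torch.distributed as dist

def log_on_ranks(message: str, ranks=None) -> None:
    # Only the listed ranks print; ranks=None keeps the old
    # log-on-every-process behavior. With ranks=[0], a config dump or
    # warning appears once rather than world_size times.
    rank = dist.get_rank() if dist.is_initialized() else 0
    if ranks is None or rank in ranks:
        print(f'[rank {rank}] {message}')

# e.g. log_on_ranks(f'optimizer_config is {optimizer_config}', ranks=[0])
```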
3 changes: 2 additions & 1 deletion colossalai/zero/sharded_optim/sharded_optim_v2.py
@@ -122,7 +122,8 @@ def __init__(self,
         self._register_master_weight()
         if self.gpu_margin_mem_ratio != 0.0 and not isinstance(sharded_model._tensor_placement_policy,
                                                                AutoTensorPlacementPolicy):
-            self._logger.warning(f'gpu_margin_mem_ratio is meaningless when tensor_placement_policy is not "auto"')
+            self._logger.warning(f'gpu_margin_mem_ratio is meaningless when tensor_placement_policy is not "auto"',
+                                 ranks=[0])

         if self._verbose:
             self._logger.debug(
