[refactor] moving memtracer to gemini (hpcaitech#801)
feifeibear authored Apr 19, 2022
1 parent 8711c70 commit 4d9332b
Showing 24 changed files with 103 additions and 88 deletions.
3 changes: 1 addition & 2 deletions colossalai/engine/ophooks/_memtracer_ophook.py
@@ -8,8 +8,6 @@
from colossalai.logging import get_dist_logger
from colossalai.core import global_context as gpc
from typing import Union
from colossalai.utils.memory_tracer import AsyncMemoryMonitor
import os
import math


@@ -25,6 +23,7 @@ class MemTracerOpHook(BaseOpHook):
"""

def __init__(self, warmup: int = 50, refreshrate: int = 10, data_prefix: str = "memstats"):
from colossalai.gemini.memory_tracer import AsyncMemoryMonitor
super().__init__()
self.async_mem_monitor = AsyncMemoryMonitor()
self._curiter = 0
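Note: the AsyncMemoryMonitor import is deferred into __init__ so that importing colossalai.engine.ophooks no longer pulls in colossalai.gemini at module load time, avoiding a circular import between the two packages. A minimal sketch of the same deferred-import pattern, assuming the monitor keeps its start()/finish() interface (the hook class below is illustrative, not part of this commit):

    class LazyMemTracerHook:
        """Sketch only: touch colossalai.gemini when the hook is built, not when this module is imported."""

        def __init__(self):
            # Deferred import: by the time a hook is constructed, both packages are
            # fully initialized, so an engine -> gemini -> engine cycle cannot bite.
            from colossalai.gemini.memory_tracer import AsyncMemoryMonitor
            self.async_mem_monitor = AsyncMemoryMonitor()

        def pre_iter(self):
            self.async_mem_monitor.start()

        def post_iter(self):
            # finish() is assumed to return the peak memory observed since start().
            return self.async_mem_monitor.finish()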
16 changes: 9 additions & 7 deletions colossalai/engine/schedule/_pipeline_schedule.py
@@ -12,18 +12,19 @@
from colossalai.logging import get_dist_logger
from colossalai.utils import switch_virtual_pipeline_parallel_rank
from colossalai.utils.cuda import get_current_device
from colossalai.zero.sharded_model import ShardedModelV2

from ._base_schedule import BaseSchedule


def get_tensor_shape():
if hasattr(gpc.config, 'TENSOR_SHAPE'):
return gpc.config.TENSOR_SHAPE

if not gpc.is_initialized(ParallelMode.PIPELINE):
return None

if hasattr(gpc.config, 'SEQ_LENGTH') and hasattr(gpc.config, 'GLOBAL_BATCH_SIZE') and hasattr(gpc.config, 'GLOBAL_BATCH_SIZE') and hasattr(gpc.config, 'HIDDEN_SIZE'):
if hasattr(gpc.config, 'SEQ_LENGTH') and hasattr(gpc.config, 'GLOBAL_BATCH_SIZE') and hasattr(
gpc.config, 'GLOBAL_BATCH_SIZE') and hasattr(gpc.config, 'HIDDEN_SIZE'):
if gpc.is_initialized(ParallelMode.DATA):
dp_size = gpc.get_world_size(ParallelMode.DATA)
else:
@@ -34,12 +35,12 @@ def get_tensor_shape():
seq_size = 1

tensor_shape = (gpc.config.SEQ_LENGTH // seq_size,
gpc.config.GLOBAL_BATCH_SIZE // dp_size // gpc.config.NUM_MICRO_BATCHES,
gpc.config.HIDDEN_SIZE)
gpc.config.GLOBAL_BATCH_SIZE // dp_size // gpc.config.NUM_MICRO_BATCHES, gpc.config.HIDDEN_SIZE)
return tensor_shape
else:
return None


def pack_return_tensors(return_tensors):
output, label = tuple(zip(*return_tensors))
if isinstance(output[0], torch.Tensor):
@@ -114,7 +115,7 @@ def load_micro_batch(self):
def pre_processing(self, engine):
# TODO: remove this after testing new zero with pipeline parallelism
model = engine.model
if isinstance(model, (NaiveAMPModel, ShardedModelV2)):
if isinstance(model, (NaiveAMPModel)) or hasattr(model, 'colo_attr'):
self.dtype = torch.half
model = model.model
sig = inspect.signature(model.forward)
@@ -125,7 +126,7 @@ def _call_engine(model, input_tensor, batch_data):
def _call_engine(model, input_tensor, batch_data):
if isinstance(model, NaiveAMPModel):
sig = inspect.signature(model.model.forward)
elif isinstance(model, ShardedModelV2):
elif hasattr(model, 'colo_attr'):
sig = inspect.signature(model.module.forward)
else:
sig = inspect.signature(model.forward)
@@ -385,7 +386,8 @@ def __init__(self,
self.num_model_chunks = num_model_chunks

def pre_processing(self, engine):
if isinstance(engine.model, ShardedModelV2):
# FIXME(jiaruifang) we shall not use ShardedModelV2 in pipeline mode, due to circular dependency.
if hasattr(engine.model, 'colo_attr'):
self.dtype = torch.half
elif isinstance(engine.model[0], NaiveAMPModel):
self.dtype = torch.half
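Note: both schedules now detect a sharded model by the presence of a colo_attr attribute instead of isinstance(model, ShardedModelV2), so the schedule no longer needs to import colossalai.zero (the circular dependency mentioned in the FIXME above). A small sketch of the same duck-typing idea, with illustrative helper names:

    import torch

    def is_sharded(model: torch.nn.Module) -> bool:
        # Duck typing: this commit assumes a sharded/zero-wrapped model exposes a
        # `colo_attr` marker attribute, so no isinstance check (and no import of
        # ShardedModelV2) is required here.
        return hasattr(model, 'colo_attr')

    def resolve_dtype(model: torch.nn.Module) -> torch.dtype:
        # Mirrors the diff above: sharded (and NaiveAMP-wrapped) models run in fp16.
        return torch.half if is_sharded(model) else torch.float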
@@ -1,4 +1,5 @@
from .model_data_memtracer import GLOBAL_MODEL_DATA_TRACER
from .memory_monitor import AsyncMemoryMonitor, SyncCudaMemoryMonitor
from .memstats_collector import MemStatsCollector

__all__ = ['AsyncMemoryMonitor', 'SyncCudaMemoryMonitor', 'MemStatsCollector']
__all__ = ['AsyncMemoryMonitor', 'SyncCudaMemoryMonitor', 'MemStatsCollector', 'GLOBAL_MODEL_DATA_TRACER']
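Callers that previously imported the tracer utilities from colossalai.utils.memory_tracer would switch to the new package path; a before/after sketch, assuming no compatibility shim remains at the old location:

    # Before this commit:
    # from colossalai.utils.memory_tracer import AsyncMemoryMonitor, MemStatsCollector

    # After this commit:
    from colossalai.gemini.memory_tracer import (
        AsyncMemoryMonitor,
        SyncCudaMemoryMonitor,
        MemStatsCollector,
        GLOBAL_MODEL_DATA_TRACER,
    )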
@@ -5,7 +5,7 @@

import torch

from colossalai.utils.memory import colo_device_memory_used
from colossalai.utils import colo_device_memory_used
from colossalai.utils import get_current_device


@@ -1,6 +1,7 @@
from colossalai.utils.memory_tracer.model_data_memtracer import GLOBAL_MODEL_DATA_TRACER
from colossalai.gemini.memory_tracer import GLOBAL_MODEL_DATA_TRACER
from colossalai.gemini.memory_tracer import SyncCudaMemoryMonitor
from colossalai.utils.memory import colo_device_memory_used
from colossalai.utils.memory_tracer import SyncCudaMemoryMonitor

import torch
import time
from typing import List
@@ -138,6 +139,9 @@ def clear(self) -> None:
self._model_data_cpu_list = []
self._overall_cpu_list = []

self._non_model_data_cpu_list = []
self._non_model_data_cuda_list = []

self._start_flag = False
self._step_idx = 0
self._step_total = 0
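clear() now also resets the non-model-data series added alongside the model-data and overall lists. A rough sketch of the bookkeeping, assuming non-model data is recorded as overall usage minus model data (field names are illustrative, not the collector's real API):

    class TinyMemStats:
        """Sketch: track model data, overall usage, and their difference."""

        def __init__(self):
            self.clear()

        def sample(self, overall_cuda: int, model_data_cuda: int) -> None:
            self._overall_cuda_list.append(overall_cuda)
            self._model_data_cuda_list.append(model_data_cuda)
            # Everything that is not parameter/gradient/optimizer-state memory:
            # activations, buffers, allocator fragmentation, and so on.
            self._non_model_data_cuda_list.append(overall_cuda - model_data_cuda)

        def clear(self) -> None:
            self._overall_cuda_list = []
            self._model_data_cuda_list = []
            self._non_model_data_cuda_list = []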
4 changes: 2 additions & 2 deletions colossalai/gemini/tensor_placement_policy.py
@@ -5,8 +5,8 @@
from colossalai.zero.sharded_param.tensor_utils import colo_model_data_tensor_move_inline, colo_tensor_mem_usage
from colossalai.utils.memory import colo_device_memory_capacity
from colossalai.zero.sharded_param.tensorful_state import StatefulTensor
from colossalai.utils.memory_tracer import MemStatsCollector
from colossalai.utils.memory_tracer.model_data_memtracer import GLOBAL_MODEL_DATA_TRACER
from colossalai.gemini.memory_tracer import MemStatsCollector
from colossalai.gemini.memory_tracer import GLOBAL_MODEL_DATA_TRACER
from typing import Type


9 changes: 9 additions & 0 deletions colossalai/trainer/hooks/_commons_.py
@@ -0,0 +1,9 @@
import torch


def _format_number(val, prec=5):
if isinstance(val, float):
return f'{val:.{prec}g}'
elif torch.is_tensor(val) and torch.is_floating_point(val):
return f'{val.item():.{prec}g}'
return val
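For reference, the helper keeps five significant digits for floats and floating-point tensors and passes everything else through unchanged:

    import torch
    from colossalai.trainer.hooks._commons_ import _format_number

    print(_format_number(3.14159265))                 # '3.1416'
    print(_format_number(torch.tensor(0.123456789)))  # '0.12346'
    print(_format_number(42))                         # 42 (non-floats are returned as-is)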
56 changes: 21 additions & 35 deletions colossalai/trainer/hooks/_log_hook.py
@@ -14,14 +14,7 @@
from colossalai.utils import report_memory_usage, is_dp_rank_0, \
is_tp_rank_0, is_no_pp_or_last_stage, MultiTimer
from ._base_hook import BaseHook


def _format_number(val, prec=5):
if isinstance(val, float):
return f'{val:.{prec}g}'
elif torch.is_tensor(val) and torch.is_floating_point(val):
return f'{val.item():.{prec}g}'
return val
from ._commons_ import _format_number


class LogByEpochHook(BaseHook):
@@ -35,10 +28,7 @@ class LogByEpochHook(BaseHook):
depend on the hooks order in the hook list.
"""

def __init__(self,
logger,
interval: int = 1,
priority: int = 1):
def __init__(self, logger, interval: int = 1, priority: int = 1):
super().__init__(priority)
self.logger = logger
self._interval = interval
@@ -63,14 +53,12 @@ def __init__(self, priority: int = 10):
def after_train_iter(self, trainer, *args):
trainer.states['step_metrics'] = dict()
for metric_name, metric_calculator in trainer.states['metrics']['train'].items():
trainer.states['step_metrics'][metric_name.lower()] = \
f'{_format_number(metric_calculator.get_last_step_value())}'
trainer.states['step_metrics'][metric_name.lower()] = metric_calculator.get_last_step_value()

def after_test_iter(self, trainer, *args):
trainer.states['step_metrics'] = dict()
for metric_name, metric_calculator in trainer.states['metrics']['test'].items():
trainer.states['step_metrics'][metric_name.lower()] = \
f'{_format_number(metric_calculator.get_last_step_value())}'
trainer.states['step_metrics'][metric_name.lower()] = metric_calculator.get_last_step_value()


@HOOKS.register_module
@@ -85,18 +73,14 @@ class LogMetricByEpochHook(LogByEpochHook):
depend on the hooks order in the hook list.
"""

def __init__(self,
logger,
interval: int = 1,
priority: int = 10) -> None:
def __init__(self, logger, interval: int = 1, priority: int = 10) -> None:
super().__init__(logger, interval, priority)
self._is_rank_to_log = is_dp_rank_0() and is_tp_rank_0() and is_no_pp_or_last_stage()

def _get_str(self, trainer, mode):
msg = []
for metric_name, metric_calculator in trainer.states['metrics'][mode].items():
msg.append(
f'{metric_name} = {_format_number(metric_calculator.get_accumulated_value())}')
msg.append(f'{metric_name} = {_format_number(metric_calculator.get_accumulated_value())}')
msg = ' | '.join(msg)
return msg

@@ -130,12 +114,13 @@ class TensorboardHook(BaseHook):
depend on the hooks order in the hook list.
"""

def __init__(self,
log_dir: str,
ranks: List = None,
parallel_mode: ParallelMode = ParallelMode.GLOBAL,
priority: int = 10,
) -> None:
def __init__(
self,
log_dir: str,
ranks: List = None,
parallel_mode: ParallelMode = ParallelMode.GLOBAL,
priority: int = 10,
) -> None:
super().__init__(priority=priority)
from torch.utils.tensorboard import SummaryWriter

@@ -280,13 +265,14 @@ class LogMemoryByEpochHook(LogByEpochHook):
log_eval (bool, optional): Whether writes in evaluation, defaults to True.
"""

def __init__(self,
logger: DistributedLogger,
interval: int = 1,
priority: int = 10,
log_eval: bool = True,
report_cpu: bool = False, # no reference
) -> None:
def __init__(
self,
logger: DistributedLogger,
interval: int = 1,
priority: int = 10,
log_eval: bool = True,
report_cpu: bool = False, # no reference
) -> None:
super().__init__(logger=logger, interval=interval, priority=priority)
self._log_eval = log_eval
self._is_rank_to_log = is_dp_rank_0() and is_tp_rank_0()
2 changes: 1 addition & 1 deletion colossalai/trainer/hooks/_mem_tracer_hook.py
@@ -1,7 +1,7 @@
from colossalai.registry import HOOKS
from torch import Tensor
from colossalai.trainer.hooks import BaseHook
from colossalai.utils.memory_tracer import AsyncMemoryMonitor
from colossalai.gemini.memory_tracer import AsyncMemoryMonitor


@HOOKS.register_module
46 changes: 30 additions & 16 deletions colossalai/trainer/hooks/_metric_hook.py
@@ -13,6 +13,7 @@
from colossalai.utils import get_current_device, is_no_pp_or_last_stage

from ._base_hook import BaseHook
from ._commons_ import _format_number


class Metric(ABC):
@@ -51,7 +52,7 @@ def update(self, *args, **kwargs) -> None:
pass

@abstractmethod
def get_last_step_value(self):
def get_last_step_value(self) -> str:
"""Returns the metric value in the last iteration.
"""
pass
@@ -120,10 +121,10 @@ def get_accumulated_value(self):
self.accum_loss.div_(self.count)
return self.accum_loss.item()

def get_last_step_value(self):
def get_last_step_value(self) -> str:
"""Returns :attr:`last_step_loss`.
"""
return self.last_step_loss
return str(self.last_step_loss)

@staticmethod
def is_better(a, b):
@@ -148,8 +149,8 @@ def reset(self) -> None:
def update(self, lr) -> None:
self.lr = lr

def get_last_step_value(self):
return self.lr
def get_last_step_value(self) -> str:
return str(self.lr)

def get_accumulated_value(self):
return self.lr
@@ -203,10 +204,10 @@ def update(self, logits, targets, batch_size) -> None:
self.accumulated_sum += self.last_step_sum
self.accumulated_correct += self.last_step_correct

def get_last_step_value(self):
def get_last_step_value(self) -> str:
self.last_step_sum = all_reduce(self.last_step_sum, ParallelMode.DATA)
self.last_step_correct = all_reduce(self.last_step_correct, ParallelMode.DATA)
return (self.last_step_correct / self.last_step_sum).item()
return str(_format_number((self.last_step_correct / self.last_step_sum).item()))

def get_accumulated_value(self):
self.accumulated_sum = all_reduce(self.accumulated_sum, ParallelMode.DATA)
@@ -322,14 +323,16 @@ class ThroughputMetric(Metric):
Args:
epoch_only (bool): Whether the metric only read for the full epoch.
"""
def __init__(self, epoch_only: bool, ignored_steps: int = 0):

def __init__(self, epoch_only: bool, ignored_steps: int = 0, tflop_per_step: int = 0):
super().__init__(epoch_only=epoch_only)
self.ignored_steps = ignored_steps
self.cur_steps = 0
self.accumulated_num_samples = torch.zeros(1, device=get_current_device())
self.accumulated_used_time = torch.zeros(1, device=get_current_device())
self.last_step_num_samples = torch.zeros(1, device=get_current_device())
self.last_step_used_time = torch.zeros(1, device=get_current_device())
self._tflop_per_step = tflop_per_step

def reset(self) -> None:
# self.cur_steps = 0
@@ -346,13 +349,18 @@ def update(self, num_samples, time) -> None:
self.accumulated_num_samples += self.last_step_num_samples
self.accumulated_used_time += self.last_step_used_time

def get_last_step_value(self):
def get_last_step_value(self) -> str:
self.last_step_used_time = all_reduce(self.last_step_used_time, ParallelMode.DATA) / \
gpc.get_world_size(ParallelMode.DATA)
self.last_step_num_samples = all_reduce(self.last_step_num_samples, ParallelMode.DATA)
return (self.last_step_num_samples / (self.last_step_used_time + 1e-12)).item()

def get_accumulated_value(self):
sample_per_sec = _format_number(self.last_step_num_samples / (self.last_step_used_time + 1e-12).item())
if self._tflop_per_step > 0:
tflops = _format_number(self._tflop_per_step / (self.last_step_used_time.item() + 1e-12))
return f"{sample_per_sec} sample_per_sec, {tflops} Tflops"
else:
return f"{sample_per_sec} sample_per_sec"

def get_accumulated_value(self) -> float:
self.accumulated_used_time = all_reduce(self.accumulated_used_time, ParallelMode.DATA) / \
gpc.get_world_size(ParallelMode.DATA)
self.accumulated_num_samples = all_reduce(self.accumulated_num_samples, ParallelMode.DATA)
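get_last_step_value now folds the optional Tflops figure into its string: samples per second is the all-reduced sample count over the all-reduced step time, and Tflops is the user-supplied per-step TFLOP count over the same time. A single-process sketch of the arithmetic (the function name and the 30-TFLOP figure are illustrative):

    def throughput_string(num_samples: float, step_time: float, tflop_per_step: float = 0.0) -> str:
        # The small epsilon guards against division by zero, as in the metric above.
        samples_per_sec = num_samples / (step_time + 1e-12)
        if tflop_per_step > 0:
            tflops = tflop_per_step / (step_time + 1e-12)
            return f"{samples_per_sec:.5g} sample_per_sec, {tflops:.5g} Tflops"
        return f"{samples_per_sec:.5g} sample_per_sec"

    print(throughput_string(256, 0.125, tflop_per_step=30))
    # -> '2048 sample_per_sec, 240 Tflops'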
@@ -373,14 +381,18 @@ class ThroughputHook(MetricHook):
defaults to 10. If different hooks share same priority, the order of printing would
depend on the hooks order in the hook list.
"""
def __init__(self, ignored_steps: int = 0, priority: int = 10):

def __init__(self, ignored_steps: int = 0, priority: int = 10, tflop_per_step: int = 0):
super().__init__(priority)
self.ignored_steps = ignored_steps
self._tflop_per_step = tflop_per_step

def after_hook_is_attached(self, trainer):
self._check_metric_states_initialization(trainer)
if self._is_stage_to_compute:
self.metric = ThroughputMetric(epoch_only=True, ignored_steps=self.ignored_steps)
self.metric = ThroughputMetric(epoch_only=True,
ignored_steps=self.ignored_steps,
tflop_per_step=self._tflop_per_step)

# register the metric
trainer.states['metrics']['train']['Throughput'] = self.metric
@@ -392,12 +404,14 @@ def before_train_epoch(self, trainer):

def after_train_iter(self, trainer, *args):
if self._is_stage_to_compute:
self.metric.update(trainer.engine.schedule.batch_size, trainer._timer.get_timer('Train-step').get_elapsed_time())
self.metric.update(trainer.engine.schedule.batch_size,
trainer._timer.get_timer('Train-step').get_elapsed_time())

def before_test(self, trainer):
if self._is_stage_to_compute:
self.metric.reset()

def after_test_iter(self, trainer, *args):
if self._is_stage_to_compute:
self.metric.update(trainer.engine.schedule.batch_size, trainer._timer.get_timer('Test-step').get_elapsed_time())
self.metric.update(trainer.engine.schedule.batch_size,
trainer._timer.get_timer('Test-step').get_elapsed_time())
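With the new keyword, the hook can report achieved Tflops next to samples/sec. A usage sketch, assuming ThroughputHook is re-exported from colossalai.trainer.hooks as in earlier releases; the 30-TFLOP value is a made-up per-step cost supplied by the user, not something the hook computes:

    from colossalai.trainer.hooks import ThroughputHook

    hook_list = [
        # tflop_per_step: estimated TFLOP executed in one training step; the hook
        # only divides this number by the measured step time.
        ThroughputHook(ignored_steps=5, tflop_per_step=30),
    ]
    # trainer.fit(train_dataloader, epochs=..., hooks=hook_list)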
4 changes: 2 additions & 2 deletions colossalai/zero/sharded_model/sharded_model_v2.py
@@ -12,8 +12,8 @@
from colossalai.engine.paramhooks import BaseParamHookMgr
from colossalai.logging import get_dist_logger
from colossalai.utils import get_current_device, disposable
from colossalai.utils.memory_tracer.memstats_collector import MemStatsCollector
from colossalai.utils.memory_tracer.model_data_memtracer import \
from colossalai.gemini.memory_tracer.memstats_collector import MemStatsCollector
from colossalai.gemini.memory_tracer.model_data_memtracer import \
GLOBAL_MODEL_DATA_TRACER
from colossalai.utils.memory import colo_device_memory_capacity
from colossalai.zero.shard_utils import BaseShardStrategy