forked from hpcaitech/ColossalAI
Optimize pipeline schedule (hpcaitech#94)
* add pipeline shared module wrapper and update load batch
* added model parallel process group for amp and clip grad (hpcaitech#86)
* added model parallel process group for amp and clip grad
* update amp and clip with model parallel process group
* remove pipeline_prev/next group (hpcaitech#88)
* micro batch offload
* optimize pipeline gpu memory usage
* pipeline can receive tensor shape (hpcaitech#93)
* optimize pipeline gpu memory usage
* fix grad accumulation step counter
* rename classes and functions

Co-authored-by: Frank Lee <somerlee.9@gmail.com>
1 parent e5b9f9a · commit 96780e6
Showing 29 changed files with 423 additions and 290 deletions.
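Among the changes, "pipeline can receive tensor shape" (hpcaitech#93) presumably lets a stage skip exchanging shape metadata before each activation transfer: when the incoming shape is known, the receive buffer can be allocated up front. The sketch below only illustrates that idea; it is not the schedule code from this commit, and the helper name, the shape protocol, and the assumption of a point-to-point-capable backend (e.g. NCCL) are all illustrative.

import torch
import torch.distributed as dist


def recv_activation(prev_rank, tensor_shape=None, dtype=torch.float, device='cuda'):
    # Known shape: allocate the buffer directly and receive in a single P2P call.
    if tensor_shape is not None:
        buffer = torch.empty(tensor_shape, dtype=dtype, device=device)
        dist.recv(buffer, src=prev_rank)
        return buffer
    # Unknown shape: receive the number of dimensions, then the shape, then the
    # payload. These are the extra rounds the known-shape path avoids.
    ndim = torch.empty(1, dtype=torch.long, device=device)
    dist.recv(ndim, src=prev_rank)
    shape = torch.empty(int(ndim.item()), dtype=torch.long, device=device)
    dist.recv(shape, src=prev_rank)
    buffer = torch.empty(*shape.tolist(), dtype=dtype, device=device)
    dist.recv(buffer, src=prev_rank)
    return buffer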
colossalai/context/process_group_initializer/initializer_model.py (43 additions & 0 deletions)
@@ -0,0 +1,43 @@
#!/usr/bin/env python
# -*- encoding: utf-8 -*-

import torch.distributed as dist

from colossalai.context import Config
from colossalai.registry import DIST_GROUP_INITIALIZER
from .process_group_initializer import ProcessGroupInitializer
from ..parallel_mode import ParallelMode


@DIST_GROUP_INITIALIZER.register_module
class Initializer_Model(ProcessGroupInitializer):
    '''A ProcessGroupInitializer for model parallelism (the model parallel group
    spans both the tensor and the pipeline parallel groups).
    '''

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.model_parallel_size = self.tensor_parallel_size * self.pipeline_parallel_size
        self.num_group = self.world_size // self.model_parallel_size

    def init_dist_group(self):
        '''Initialize model parallel groups and assign a local rank and process group to each GPU.

        :return: (local_rank, group_world_size, process_group, ranks_in_group, mode)
        :rtype: tuple
        '''
        local_rank = None
        ranks_in_group = None
        process_group = None
        group_world_size = None
        mode = ParallelMode.MODEL

        for i in range(self.num_group):
            ranks = [i * self.model_parallel_size + j for j in range(self.model_parallel_size)]
            group = dist.new_group(ranks)

            if self.rank in ranks:
                local_rank = ranks.index(self.rank)
                group_world_size = len(ranks)
                process_group = group
                ranks_in_group = ranks

        return local_rank, group_world_size, process_group, ranks_in_group, mode
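For intuition, the loop above partitions the world into contiguous blocks of model_parallel_size ranks, so every rank lands in exactly one model parallel group. A small standalone sketch of that rank arithmetic (plain Python, no distributed setup; the sizes are made up for illustration):

world_size = 8
tensor_parallel_size = 2
pipeline_parallel_size = 2

model_parallel_size = tensor_parallel_size * pipeline_parallel_size  # 4 ranks per model replica
num_group = world_size // model_parallel_size                        # 2 model parallel groups

groups = [
    [i * model_parallel_size + j for j in range(model_parallel_size)]
    for i in range(num_group)
]
print(groups)  # [[0, 1, 2, 3], [4, 5, 6, 7]]

According to the commit message, this is the process group that amp and gradient clipping now reduce over.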
colossalai/engine/gradient_handler/__init__.py
@@ -1,5 +1,7 @@
 from ._base_gradient_handler import BaseGradientHandler
 from ._data_parallel_gradient_handler import DataParallelGradientHandler
 from ._zero_gradient_handler import ZeROGradientHandler
+from ._pipeline_parallel_gradient_handler import PipelineSharedModuleGradientHandler
 
-__all__ = ['BaseGradientHandler', 'DataParallelGradientHandler', 'ZeROGradientHandler']
+__all__ = ['BaseGradientHandler', 'DataParallelGradientHandler',
+           'ZeROGradientHandler', 'PipelineSharedModuleGradientHandler']
colossalai/engine/gradient_handler/_pipeline_parallel_gradient_handler.py (41 additions & 0 deletions)
@@ -0,0 +1,41 @@
#!/usr/bin/env python

import torch.distributed as dist
from torch._utils import _flatten_dense_tensors, _unflatten_dense_tensors

from colossalai.core import global_context as gpc
from colossalai.registry import GRADIENT_HANDLER
from ._base_gradient_handler import BaseGradientHandler
from collections import defaultdict


@GRADIENT_HANDLER.register_module
class PipelineSharedModuleGradientHandler(BaseGradientHandler):
    """A helper class to handle all-reduce operations in sub parallel groups.
    An all-reduce collective communication is performed in
    :func:`handle_gradient` among all sub pipeline parallel groups.
    For better performance, it bucketizes the gradients of all parameters of
    the same type to improve the efficiency of communication.
    """

    def handle_gradient(self):
        """Runs an all-reduce operation in the sub pipeline parallel groups.
        """
        if gpc.pipeline_parallel_size > 1:
            # bucketize and all-reduce
            buckets = defaultdict(lambda: defaultdict(list))
            # Pack the buckets.
            for param in self._model.parameters():
                group = getattr(param, 'pipeline_shared_module_pg', None)
                if param.requires_grad and param.grad is not None and group is not None:
                    tp = param.data.type()
                    buckets[group][tp].append(param)

            # For each bucket, all-reduce and copy all-reduced grads.
            for group, group_buckets in buckets.items():
                for tp, bucket in group_buckets.items():
                    grads = [param.grad.data for param in bucket]
                    coalesced = _flatten_dense_tensors(grads)
                    dist.all_reduce(coalesced, op=dist.ReduceOp.SUM, group=group)
                    for buf, synced in zip(grads, _unflatten_dense_tensors(coalesced, grads)):
                        buf.copy_(synced)
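The handler only touches parameters that carry a pipeline_shared_module_pg attribute; attaching that attribute is the job of the pipeline shared module wrapper added in this commit. A minimal hand-rolled illustration of the contract, not the wrapper itself; the tied-embedding scenario, the ranks, and the sizes are assumptions, and an initialized default process group is required:

import torch
import torch.distributed as dist

# Assume dist.init_process_group(...) has already been called on every rank.
# Suppose an embedding weight is tied between the first and the last pipeline
# stage (ranks 0 and 3 here). Every rank must call new_group, even ranks that
# are not members of it.
shared_pg = dist.new_group(ranks=[0, 3])

embedding = torch.nn.Embedding(32000, 1024)
# Tag the shared parameter. PipelineSharedModuleGradientHandler buckets it by
# this group and its dtype, then all-reduces the gradient after backward so the
# tied copies on ranks 0 and 3 stay in sync.
embedding.weight.pipeline_shared_module_pg = shared_pg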