[NFC] polish code format
binmakeswell authored Feb 15, 2023
2 parents 1dc003c + 93b788b commit 30aee9c
Showing 8 changed files with 220 additions and 213 deletions.
5 changes: 3 additions & 2 deletions colossalai/cli/cli.py
@@ -1,7 +1,8 @@
 import click
-from .launcher import run
-from .check import check
+
 from .benchmark import benchmark
+from .check import check
+from .launcher import run
 
 
 class Arguments():
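The new ordering in this file, and in the files below, follows the conventional import grouping that tools such as isort produce (the exact tool and configuration are an assumption; the diff only shows the resulting order): standard-library imports first, then third-party packages, then absolute colossalai imports, then local relative imports, with a blank line between groups and alphabetical order inside each group. A minimal sketch of that layout, built from imports that appear in this commit (not meant to run standalone, since the relative import assumes a package context):

# 1. standard library
from typing import Iterable

# 2. third-party packages
import torch
import torch.distributed as dist

# 3. first-party absolute imports
from colossalai.context import Config

# 4. local relative imports
from .run import launch_multi_processes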
4 changes: 3 additions & 1 deletion colossalai/cli/launcher/__init__.py
@@ -1,7 +1,9 @@
 import click
-from .run import launch_multi_processes
+
 from colossalai.context import Config
+
+from .run import launch_multi_processes
 
 
 @click.command(help="Launch distributed training on a single node or multiple nodes",
                context_settings=dict(ignore_unknown_options=True))
258 changes: 129 additions & 129 deletions colossalai/context/moe_context.py
@@ -1,129 +1,129 @@
import torch
import torch.distributed as dist

from colossalai.context.parallel_mode import ParallelMode
from colossalai.context.singleton_meta import SingletonMeta
from colossalai.tensor import ProcessGroup

from typing import Tuple


def _check_sanity():
from colossalai.core import global_context as gpc
if gpc.tensor_parallel_size > 1 or gpc.pipeline_parallel_size > 1:
        raise NotImplementedError("MoE is not compatible with tensor or "
                                  "pipeline parallelism at present.")


class MoeParallelInfo:
"""Moe parallelism information, storing parallel sizes and groups.
"""

def __init__(self, ep_size: int, dp_size: int):
_check_sanity()
self.ep_size = ep_size
self.dp_size = dp_size
self.pg = ProcessGroup(tp_degree=ep_size, dp_degree=dp_size)
self.ep_group = self.pg.tp_process_group()
self.dp_group = self.pg.dp_process_group()


class MoeContext(metaclass=SingletonMeta):
"""MoE parallel context manager. This class manages different
parallel groups in MoE context and MoE loss in training.
"""

def __init__(self):
self.world_size = 1
# Users may want to set maximum expert parallel size smaller than the world size
# since very low bandwidth across nodes may constrain the performance of MoE
# When we have a maximum expert parallel size, we have a minimum data parallel size naturally
self.max_ep_size = 1
self.min_dp_size = 1
self.aux_loss = None
self.use_kernel_optim = True

self.has_setup = False
self._parallel_info_dict = dict()

@property
def parallel_info_dict(self):
return self._parallel_info_dict

@property
def is_initialized(self):
return self.has_setup

def setup(self, seed: int, use_kernel_optim: bool = True):
assert not self.is_initialized, "MoE distributed context shouldn't be set up again"
_check_sanity()
        assert torch.cuda.is_available(), "MoE requires CUDA to be enabled first"

self.world_size = dist.get_world_size()

from colossalai.core import global_context as gpc
self.max_ep_size = gpc.config.get('max_ep_size', self.world_size)
        assert self.world_size % self.max_ep_size == 0, \
            "Maximum expert parallel size must be a factor of the number of GPUs"
self.min_dp_size = self.world_size // self.max_ep_size

        # Enabling kernel optimization may raise errors in some cases
        # Users can disable kernel optimization manually
self.use_kernel_optim = use_kernel_optim

from .random import moe_set_seed
moe_set_seed(seed)
self.has_setup = True

def get_info(self, num_experts: int) -> Tuple[int, MoeParallelInfo]:
"""Calculate the Data Parallel Group and Expert Parallel Group.
Parameters
----------
num_experts : int
            The number of experts
Returns
-------
int, MoeParallelInfo
number of local experts, the MoeParallelInfo of the current ep_size
"""

gt_flag = num_experts % self.max_ep_size == 0 # check whether num_experts is greater
lt_flag = self.max_ep_size % num_experts == 0 # check whether num_experts is less

        assert gt_flag or lt_flag, "Automatic expert placement does not support an expert count" \
                                   " that is not a multiple of the ep size, or vice versa."

        # If the number of experts is greater than the maximum expert parallel size (a.k.a. ep_size),
        # there are multiple experts on each GPU and each GPU holds different experts,
        # so its data parallel size is 1.
        # Otherwise, there is only one expert on each GPU
        # and the data parallel size should be calculated.
dp_size = 1 if gt_flag else self.max_ep_size // num_experts
ep_size = self.max_ep_size // dp_size

# Calculate the number of experts for each GPU
num_local_experts = 1 if lt_flag else num_experts // self.max_ep_size

# Don't forget to multiply minimum data parallel size
dp_size *= self.min_dp_size
if not (ep_size in self.parallel_info_dict):
self.parallel_info_dict[ep_size] = MoeParallelInfo(ep_size, dp_size)

return num_local_experts, self.parallel_info_dict[ep_size]

def set_kernel_not_use(self):
self.use_kernel_optim = False

def reset_loss(self):
self.aux_loss = 0

def add_loss(self, loss):
self.aux_loss += loss

def get_loss(self):
return self.aux_loss


MOE_CONTEXT = MoeContext()
from typing import Tuple

import torch
import torch.distributed as dist

from colossalai.context.parallel_mode import ParallelMode
from colossalai.context.singleton_meta import SingletonMeta
from colossalai.tensor import ProcessGroup


def _check_sanity():
from colossalai.core import global_context as gpc
if gpc.tensor_parallel_size > 1 or gpc.pipeline_parallel_size > 1:
        raise NotImplementedError("MoE is not compatible with tensor or "
                                  "pipeline parallelism at present.")


class MoeParallelInfo:
"""Moe parallelism information, storing parallel sizes and groups.
"""

def __init__(self, ep_size: int, dp_size: int):
_check_sanity()
self.ep_size = ep_size
self.dp_size = dp_size
self.pg = ProcessGroup(tp_degree=ep_size, dp_degree=dp_size)
self.ep_group = self.pg.tp_process_group()
self.dp_group = self.pg.dp_process_group()


class MoeContext(metaclass=SingletonMeta):
"""MoE parallel context manager. This class manages different
parallel groups in MoE context and MoE loss in training.
"""

def __init__(self):
self.world_size = 1
# Users may want to set maximum expert parallel size smaller than the world size
# since very low bandwidth across nodes may constrain the performance of MoE
# When we have a maximum expert parallel size, we have a minimum data parallel size naturally
self.max_ep_size = 1
self.min_dp_size = 1
self.aux_loss = None
self.use_kernel_optim = True

self.has_setup = False
self._parallel_info_dict = dict()

@property
def parallel_info_dict(self):
return self._parallel_info_dict

@property
def is_initialized(self):
return self.has_setup

def setup(self, seed: int, use_kernel_optim: bool = True):
assert not self.is_initialized, "MoE distributed context shouldn't be set up again"
_check_sanity()
        assert torch.cuda.is_available(), "MoE requires CUDA to be enabled first"

self.world_size = dist.get_world_size()

from colossalai.core import global_context as gpc
self.max_ep_size = gpc.config.get('max_ep_size', self.world_size)
        assert self.world_size % self.max_ep_size == 0, \
            "Maximum expert parallel size must be a factor of the number of GPUs"
self.min_dp_size = self.world_size // self.max_ep_size

        # Enabling kernel optimization may raise errors in some cases
        # Users can disable kernel optimization manually
self.use_kernel_optim = use_kernel_optim

from .random import moe_set_seed
moe_set_seed(seed)
self.has_setup = True

def get_info(self, num_experts: int) -> Tuple[int, MoeParallelInfo]:
"""Calculate the Data Parallel Group and Expert Parallel Group.
Parameters
----------
num_experts : int
            The number of experts
Returns
-------
int, MoeParallelInfo
number of local experts, the MoeParallelInfo of the current ep_size
"""

gt_flag = num_experts % self.max_ep_size == 0 # check whether num_experts is greater
lt_flag = self.max_ep_size % num_experts == 0 # check whether num_experts is less

        assert gt_flag or lt_flag, "Automatic expert placement does not support an expert count" \
                                   " that is not a multiple of the ep size, or vice versa."

        # If the number of experts is greater than the maximum expert parallel size (a.k.a. ep_size),
        # there are multiple experts on each GPU and each GPU holds different experts,
        # so its data parallel size is 1.
        # Otherwise, there is only one expert on each GPU
        # and the data parallel size should be calculated.
dp_size = 1 if gt_flag else self.max_ep_size // num_experts
ep_size = self.max_ep_size // dp_size

# Calculate the number of experts for each GPU
num_local_experts = 1 if lt_flag else num_experts // self.max_ep_size

# Don't forget to multiply minimum data parallel size
dp_size *= self.min_dp_size
if not (ep_size in self.parallel_info_dict):
self.parallel_info_dict[ep_size] = MoeParallelInfo(ep_size, dp_size)

return num_local_experts, self.parallel_info_dict[ep_size]

def set_kernel_not_use(self):
self.use_kernel_optim = False

def reset_loss(self):
self.aux_loss = 0

def add_loss(self, loss):
self.aux_loss += loss

def get_loss(self):
return self.aux_loss


MOE_CONTEXT = MoeContext()
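For a concrete sense of what MoeContext.get_info computes, the following standalone sketch mirrors its arithmetic without touching any distributed state. The function name split_experts and the example values for max_ep_size and min_dp_size are illustrative only (in the real class they come from the gpc config and the world size):

from typing import Tuple


def split_experts(num_experts: int, max_ep_size: int, min_dp_size: int) -> Tuple[int, int, int]:
    # Mirrors the gt_flag / lt_flag logic of MoeContext.get_info.
    gt_flag = num_experts % max_ep_size == 0    # num_experts is a multiple of max_ep_size (>=)
    lt_flag = max_ep_size % num_experts == 0    # max_ep_size is a multiple of num_experts (<=)
    assert gt_flag or lt_flag

    dp_size = 1 if gt_flag else max_ep_size // num_experts
    ep_size = max_ep_size // dp_size
    num_local_experts = 1 if lt_flag else num_experts // max_ep_size
    dp_size *= min_dp_size    # fold in the data parallelism implied by world_size // max_ep_size
    return num_local_experts, ep_size, dp_size


# 16 GPUs with max_ep_size = 8, hence min_dp_size = 2:
print(split_experts(num_experts=32, max_ep_size=8, min_dp_size=2))    # (4, 8, 2): 4 experts per GPU
print(split_experts(num_experts=2, max_ep_size=8, min_dp_size=2))     # (1, 2, 8): each expert replicated 8 ways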
@@ -2,10 +2,11 @@
 
 import torch.distributed as dist
 
+from colossalai.global_variables import tensor_parallel_env as env
 from colossalai.registry import DIST_GROUP_INITIALIZER
-from .process_group_initializer import ProcessGroupInitializer
+
 from ..parallel_mode import ParallelMode
-from colossalai.global_variables import tensor_parallel_env as env
+from .process_group_initializer import ProcessGroupInitializer
 
 
 def _check_summa_env_var(summa_dim):
@@ -4,8 +4,9 @@
 from torch import distributed as dist
 
 from colossalai.registry import DIST_GROUP_INITIALIZER
-from .process_group_initializer import ProcessGroupInitializer
+
 from ..parallel_mode import ParallelMode
+from .process_group_initializer import ProcessGroupInitializer
 
 
 @DIST_GROUP_INITIALIZER.register_module
@@ -3,9 +3,10 @@
 import torch.distributed as dist
 
 from colossalai.registry import DIST_GROUP_INITIALIZER
+
+from ..parallel_mode import ParallelMode
 from .initializer_tensor import Initializer_Tensor
 from .process_group_initializer import ProcessGroupInitializer
-from ..parallel_mode import ParallelMode
 
 
 @DIST_GROUP_INITIALIZER.register_module
59 changes: 30 additions & 29 deletions colossalai/engine/gradient_handler/utils.py
@@ -1,29 +1,30 @@
import torch.distributed as dist
import torch.nn as nn
from torch._utils import _flatten_dense_tensors, _unflatten_dense_tensors
from typing import Iterable


def bucket_allreduce(param_list: Iterable[nn.Parameter], group=None):
# get communication world size
comm_size = dist.get_world_size(group)
# bucketize and all-reduce
buckets = {}
# Pack the buckets.
for param in param_list:
if param.requires_grad and param.grad is not None:
tp = param.data.type()
if tp not in buckets:
buckets[tp] = []
buckets[tp].append(param)

# For each bucket, all-reduce and copy all-reduced grads.
for tp in buckets:
bucket = buckets[tp]
grads = [param.grad.data for param in bucket]
coalesced = _flatten_dense_tensors(grads)
coalesced /= comm_size

dist.all_reduce(coalesced, group=group)
for buf, synced in zip(grads, _unflatten_dense_tensors(coalesced, grads)):
buf.copy_(synced)
from typing import Iterable

import torch.distributed as dist
import torch.nn as nn
from torch._utils import _flatten_dense_tensors, _unflatten_dense_tensors


def bucket_allreduce(param_list: Iterable[nn.Parameter], group=None):
# get communication world size
comm_size = dist.get_world_size(group)
# bucketize and all-reduce
buckets = {}
# Pack the buckets.
for param in param_list:
if param.requires_grad and param.grad is not None:
tp = param.data.type()
if tp not in buckets:
buckets[tp] = []
buckets[tp].append(param)

# For each bucket, all-reduce and copy all-reduced grads.
for tp in buckets:
bucket = buckets[tp]
grads = [param.grad.data for param in bucket]
coalesced = _flatten_dense_tensors(grads)
coalesced /= comm_size

dist.all_reduce(coalesced, group=group)
for buf, synced in zip(grads, _unflatten_dense_tensors(coalesced, grads)):
buf.copy_(synced)
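bucket_allreduce is a helper for averaging gradients across a process group: it groups param.grad tensors by dtype, flattens each group, divides by the communication world size, issues one all_reduce per bucket, and copies the synced values back into the original grads. A hedged usage sketch (it assumes the default process group has already been initialized, e.g. via colossalai.launch or torch.distributed.init_process_group, and is meant to run under a distributed launcher):

import torch
import torch.distributed as dist
import torch.nn as nn

from colossalai.engine.gradient_handler.utils import bucket_allreduce

# A toy model and backward pass, just to populate param.grad on every rank.
model = nn.Linear(8, 8)
loss = model(torch.randn(4, 8)).sum()
loss.backward()

# Average the gradients over the default process group; afterwards every rank
# holds identical, world-size-averaged values in param.grad.
bucket_allreduce(model.parameters(), group=dist.group.WORLD)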