
Commit

Fixed docstring in colossalai (hpcaitech#171)
1SAA authored Jan 21, 2022
1 parent e2089c5 commit 0f8c7f9
Showing 77 changed files with 985 additions and 605 deletions.
16 changes: 9 additions & 7 deletions colossalai/amp/apex_amp/apex_amp.py
@@ -13,23 +13,25 @@


class ApexAMPOptimizer(ColossalaiOptimizer):
''' A wrapper class for APEX optimizer and it implements apex-specific backward and clip_grad_norm
""" A wrapper class for APEX optimizer and it implements apex-specific backward and clip_grad_norm
methods
'''
"""

def backward(self, loss: Tensor):
"""
:param loss: loss computed by a loss function
"""Backward pass to get all gradients
:param loss: Loss computed by a loss function
:type loss: torch.Tensor
"""
with apex_amp.scale_loss(loss, self.optim) as scaled_loss:
scaled_loss.backward()

def clip_grad_norm(self, model: nn.Module, max_norm: float):
"""
:param model: your model object
"""Clip gradients' norm
:param model: Your model object
:type model: torch.nn.Module
:param max_norm: the max norm value for gradient clipping
:param max_norm: The max norm value for gradient clipping
:type max_norm: float
"""
if max_norm > 0:
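To put the reworked docstrings in context, here is a minimal usage sketch; it is not part of the commit. The wrapper's constructor is not visible in this hunk, so treating ApexAMPOptimizer(optimizer) as a plain wrapper around an apex-initialized optimizer is an assumption, as is step() forwarding to the wrapped optimizer; the import path simply mirrors the file location.

import torch
import torch.nn as nn
from apex import amp as apex_amp
from colossalai.amp.apex_amp.apex_amp import ApexAMPOptimizer

model = nn.Linear(16, 4).cuda()
optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)

# Register the pair with apex first; the wrapper then routes backward() and
# clip_grad_norm() through the apex-specific paths shown in the diff.
model, optimizer = apex_amp.initialize(model, optimizer, opt_level='O1')
optimizer = ApexAMPOptimizer(optimizer)         # assumed: wraps the apex optimizer directly

loss = model(torch.randn(8, 16).cuda()).sum()
optimizer.backward(loss)                        # apex_amp.scale_loss(...) then backward()
optimizer.clip_grad_norm(model, max_norm=1.0)   # skipped entirely when max_norm <= 0
optimizer.step()                                # assumed to forward to the wrapped optimizer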
8 changes: 6 additions & 2 deletions colossalai/amp/naive_amp/naive_amp.py
@@ -15,7 +15,10 @@
class NaiveAMPOptimizer(ColossalaiOptimizer):
"""A wrapper class for optimizer to cast all parameters to fp16
:param optim: a normal optimizer like Adam or SGD
:param optim: A normal optimizer like Adam or SGD
:param args: Args used to initialize FP16 optimizer
:param kwargs: Kwargs used to initialize FP16 optimizer
:type optim: torch.optim.Optimizer
"""

@@ -24,7 +27,8 @@ def __init__(self, optim: Optimizer, *args, **kwargs):
super().__init__(optim)

def backward(self, loss: Tensor):
"""backward with gradient scaler
"""Backward with gradient scaler
:param loss: loss computed by a loss function
:type loss: torch.Tensor
"""
33 changes: 14 additions & 19 deletions colossalai/amp/torch_amp/torch_amp.py
@@ -16,43 +16,37 @@
class TorchAMPOptimizer(ColossalaiOptimizer):
"""A wrapper class which integrate pytorch amp with an optimizer
:param optim: a normal optimizer like Adam or SGD
:type optim: torch.optim.Optimizer
:param init_scale: Initial scale factor
:type init_scale: float, optional, default=2.**16
:param growth_factor: Factor by which the scale is multiplied during :meth:`update` if no inf/NaN gradients occur for ``growth_interval`` consecutive iterations.
:type growth_factor: float, optional, default=2.0
:param backoff_factor: Factor by which the scale is multiplied during :meth:`update` if inf/NaN gradients occur in an iteration.
:type backoff_factor: float, optional, default=0.5
:param growth_interval: Number of consecutive iterations without inf/NaN gradients that must occur for the scale to be multiplied by ``growth_factor``.
:type growth_interval: int, optional, default=2000
:param enabled: If ``False``, disables gradient scaling. :meth:`step` simply invokes the underlying ``optimizer.step()``, and other methods become no-ops.
:type enabled: bool, optional, default=True
:param optim: A normal optimizer like Adam or SGD
:param args: Args used to initialize gradient scaler
:param kwargs: Kwargs used to initialize gradient scaler
:type optim: torch.optim.Optimizer
"""

def __init__(self, optim: Optimizer, *args, **kwargs):
super().__init__(optim)
self.scaler = GradScaler(*args, **kwargs)

def backward(self, loss: Tensor):
"""backward with torch amp gradient scaler
:param loss: loss computed by a loss function
"""Backward with torch amp gradient scaler
:param loss: Loss computed by a loss function
:type loss: torch.Tensor
"""
self.scaler.scale(loss).backward()

def step(self):
"""update the parameters of the model
"""Update the parameters of the model
"""
self.scaler.step(self.optim)
self.scaler.update()

def clip_grad_norm(self, model: nn.Module, max_norm: float):
"""apply gradient clipping to the model parameters
:param model: your model object
"""Apply gradient clipping to the model parameters
:param model: Your model object
:type model: torch.nn.Module
:param max_norm: max norm value for gradient clipping
:param max_norm: Max norm value for gradient clipping
:type max_norm: float
"""
if max_norm > 0.0:
@@ -76,7 +70,8 @@ def forward(self, *args, **kwargs):

class TorchAMPLoss(nn.Module):
"""A wrapper class for a criterion object which computes the loss in mixed-precision context
:param loss: a loss function object
:param loss: A loss function object
:type loss: torch.nn.modules.loss._Loss
"""

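The hunks above replace the per-argument GradScaler documentation with a generic args/kwargs note. A short sketch of how the wrapper, the scaler keywords, and TorchAMPLoss fit together follows; it is not part of the commit. The keywords init_scale and growth_interval come from torch.cuda.amp.GradScaler (and from the old docstring), and the import path mirrors the file location.

import torch
import torch.nn as nn
from colossalai.amp.torch_amp.torch_amp import TorchAMPLoss, TorchAMPOptimizer

model = nn.Linear(16, 4).cuda()
# Extra positional/keyword arguments are forwarded to torch.cuda.amp.GradScaler.
optimizer = TorchAMPOptimizer(torch.optim.SGD(model.parameters(), lr=1e-3),
                              init_scale=2.**16, growth_interval=2000)
criterion = TorchAMPLoss(nn.MSELoss())          # computes the loss in mixed precision

data = torch.randn(8, 16).cuda()
target = torch.randn(8, 4).cuda()

with torch.cuda.amp.autocast():                 # run the forward pass under autocast
    loss = criterion(model(data), target)

optimizer.backward(loss)                        # scaler.scale(loss).backward()
optimizer.clip_grad_norm(model, max_norm=1.0)   # applied only when max_norm > 0
optimizer.step()                                # scaler.step(optim) + scaler.update()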
26 changes: 13 additions & 13 deletions colossalai/builder/pipeline.py
@@ -176,16 +176,16 @@ def build_pipeline_model_from_cfg(config, num_chunks: int = 1, partition_method:
...
)
:param config: configuration of the model
:param config: Configuration of the model
:type config: dict
:param num_chunks: the number of chunks you want to have on the current stage. This value should be 1
:param num_chunks: The number of chunks you want to have on the current stage. This value should be 1
in most cases unless you are using virtual pipeline parallelism.
:type num_chunks: int
:param partition_method: this parameter determines how you want to split your model layers into stages,
:type num_chunks: int, optional
:param partition_method: This parameter determines how you want to split your model layers into stages,
you can set it as 'layer' or 'parameter'
:type partition_method: str
:param verbose: whether to print the logs
:type verbose: bool
:type partition_method: str, optional
:param verbose: Whether to print the logs
:type verbose: bool, optional
"""
ori_model = build_model(config)
layers = ori_model.layers_cfg
@@ -240,13 +240,13 @@ def build_pipeline_model(layers: nn.Sequential, num_chunks: int = 1, verbose: bo
"""An intializer to split the model into different stages for pipeline parallelism.
Note that `layer` must be `torch.nn.Sequential`.
:param layers: layers of model
:type config: `torch.nn.Sequential`
:param num_chunks: the number of chunks you want to have on the current stage. This value should be 1
:param layers: Layers of model
:type layers: `torch.nn.Sequential`
:param num_chunks: The number of chunks you want to have on the current stage. This value should be 1
in most cases unless you are using virtual pipeline parallelism.
:type num_chunks: int
:param verbose: whether to print the logs
:type verbose: bool
:type num_chunks: int, optional
:param verbose: Whether to print the logs
:type verbose: bool, optional
"""
pipeline_parallel_size = gpc.get_world_size(ParallelMode.PIPELINE)
pipeline_rank = gpc.get_local_rank(ParallelMode.PIPELINE)
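Because the reworded docstrings describe the pipeline builders only abstractly, a minimal sketch of build_pipeline_model follows; it is not part of the commit. It assumes the distributed environment and the PIPELINE parallel group have already been initialized, since the builder reads the pipeline rank and world size from the global parallel context.

import torch.nn as nn
from colossalai.builder.pipeline import build_pipeline_model

# Precondition (not shown here): the PIPELINE parallel group must already be
# initialized, because the builder queries gpc for the pipeline rank/world size.
layers = nn.Sequential(
    nn.Linear(256, 256),
    nn.ReLU(),
    nn.Linear(256, 256),
    nn.ReLU(),
    nn.Linear(256, 10),
)

# Each pipeline stage keeps only its own slice of `layers`; num_chunks stays 1
# unless virtual (interleaved) pipeline parallelism is used.
stage = build_pipeline_model(layers, num_chunks=1, verbose=True)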
10 changes: 10 additions & 0 deletions colossalai/communication/collective.py
@@ -18,9 +18,13 @@ def all_gather(tensor: Tensor, dim: int, parallel_mode: ParallelMode, async_op:
:param tensor: Tensor to be gathered
:param dim: The dimension concatenating in
:param parallel_mode: Parallel group mode used in this communication
:param async_op: Whether operations are asynchronous
:type tensor: :class:`torch.Tensor`
:type dim: int
:type parallel_mode: :class:`colossalai.context.ParallelMode`
:type async_op: bool, optional
:return: The tensor generated by all-gather
:rtype: :class:`torch.Tensor`
"""
@@ -56,9 +60,15 @@ def reduce_scatter(tensor: Tensor,
:param tensor: Tensor to be reduced and scattered
:param dim: The dimension scattering in
:param parallel_mode: Parallel group mode used in this communication
:param op: The type of reduce operation
:param async_op: Whether operations are asynchronous
:type tensor: :class:`torch.Tensor`
:type dim: int
:type parallel_mode: :class:`colossalai.context.ParallelMode`
:type op: ReduceOp, optional
:type async_op: bool, optional
:return: The tensor generated by reduce-scatter
:rtype: :class:`Tensor`
"""
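The two hunks above only add docstring entries for op and async_op; the sketch below shows the call shapes they describe and is not part of the commit. It assumes the 1D tensor-parallel group is already initialized; ParallelMode.PARALLEL_1D is the mode used by the communication utilities further down.

import torch
import torch.distributed as dist
from colossalai.communication.collective import all_gather, reduce_scatter
from colossalai.context import ParallelMode

x = torch.randn(4, 8).cuda()

# Concatenate every rank's tensor along dim 0 within the tensor-parallel group.
gathered = all_gather(x, dim=0, parallel_mode=ParallelMode.PARALLEL_1D)

# Sum-reduce across the group, then hand each rank its own slice along dim 0.
shard = reduce_scatter(x, dim=0, parallel_mode=ParallelMode.PARALLEL_1D,
                       op=dist.ReduceOp.SUM, async_op=False)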
21 changes: 19 additions & 2 deletions colossalai/communication/utils.py
@@ -65,7 +65,17 @@ def recv_tensor_meta(tensor_shape, prev_rank=None):


def split_tensor_into_1d_equal_chunks(tensor, new_buffer=False):
"""Break a tensor into equal 1D chunks."""
"""Break a tensor into equal 1D chunks.
:param tensor: Tensor to be splitted before communication
:param new_buffer: Whether uses a new buffer to store sliced tensor
:type tensor: torch.Tensor
:type new_buffer: bool, optional
:return splitted_tensor: The splitted tensor
:rtype splitted_tensor: torch.Tensor
"""
partition_size = torch.numel(tensor) // gpc.get_world_size(ParallelMode.PARALLEL_1D)
start_index = partition_size * gpc.get_local_rank(ParallelMode.PARALLEL_1D)
end_index = start_index + partition_size
@@ -80,7 +90,14 @@ def split_tensor_into_1d_equal_chunks(tensor, new_buffer=False):


def gather_split_1d_tensor(tensor):
"""Opposite of above function, gather values from model parallel ranks."""
"""Opposite of above function, gather values from model parallel ranks.
:param tensor: Tensor to be gathered after communication
:type tensor: torch.Tensor
:return gathered: The gathered tensor
:rtype gathered: torch.Tensor
"""
world_size = gpc.get_world_size(ParallelMode.PARALLEL_1D)
numel = torch.numel(tensor)
numel_gathered = world_size * numel
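A round-trip sketch for the two helpers documented above, not part of the commit; it assumes the PARALLEL_1D group is initialized and the tensor's element count divides evenly by that group's world size.

import torch
from colossalai.communication.utils import (gather_split_1d_tensor,
                                            split_tensor_into_1d_equal_chunks)

# The flat tensor must be evenly divisible by the 1D tensor-parallel world size.
full = torch.randn(1024).cuda()

chunk = split_tensor_into_1d_equal_chunks(full, new_buffer=True)   # this rank's slice
rebuilt = gather_split_1d_tensor(chunk)                            # re-assembled 1D tensor

assert rebuilt.numel() == full.numel()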
3 changes: 2 additions & 1 deletion colossalai/context/parallel_context.py
@@ -307,6 +307,7 @@ def init_global_dist(self,
port: int
):
"""Initializes the global distributed environment
:param rank: rank for the default process group
:type rank: int
:param world_size: world size of the default process group
@@ -462,7 +463,7 @@ def set_device(self, device_ordinal: int = None):
"""Sets distributed processes to be bound to devices.
:param device_ordinal: the device id to be bound to
:type device_ordinal: int
:type device_ordinal: int, optional
"""
global_rank = self.get_global_rank()
if device_ordinal is None:
13 changes: 8 additions & 5 deletions colossalai/context/process_group_initializer/initializer_1d.py
@@ -12,19 +12,22 @@

@DIST_GROUP_INITIALIZER.register_module
class Initializer_1D(ProcessGroupInitializer):
'''A ProcessGroupInitializer for 1d tensor parallelism.
'''
"""A ProcessGroupInitializer for 1d tensor parallelism.
:param args: Args used to initialize ProcessGroupInitializer
:param kwargs: Kwargs used to initialize ProcessGroupInitializer
"""

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.num_group = self.world_size // self.tensor_parallel_size

def init_dist_group(self):
'''Initialize 1D tensor parallel groups, and assign local_ranks and groups to each gpu.
"""Initialize 1D tensor parallel groups, and assign local_ranks and groups to each gpu.
:return: (local_rank, group_world_size, process_group, ranks_in_group, mode)
:rtype: tuple
'''
:rtype: Tuple
"""
local_rank = None
ranks_in_group = None
process_group = None
57 changes: 37 additions & 20 deletions colossalai/context/process_group_initializer/initializer_2d.py
@@ -22,20 +22,28 @@ def _check_summa_env_var(summa_dim):


class Initializer_2D_Row(ProcessGroupInitializer):
'''2d tensor parallel initialization among rows.
'''
"""2d tensor parallel initialization among rows.
:param num_group: The number of all tensor groups
:param summa_dim: The dimension of SUMMA
:param args: Args used to initialize base class
:param kwargs: Kwargs used to initialize base class
:type num_group: int
:type summa_dim: int
"""

def __init__(self, num_group, summa_dim, *args, **kwargs):
super(Initializer_2D_Row, self).__init__(*args, **kwargs)
self.num_group = num_group
self.summa_dim = summa_dim

def init_dist_group(self):
'''Initialize 2D tensor row parallel groups, and assign local_ranks and groups to each gpu.
"""Initialize 2D tensor row parallel groups, and assign local_ranks and groups to each gpu.
:return: 2D tensor row parallelism's information
:rtype: tuple(local_rank, group_world_size, process_group, ranks_in_group, mode)
'''
:return: 2D tensor row parallelism's information
:rtype: Tuple(local_rank, group_world_size, process_group, ranks_in_group, mode)
"""
local_rank = None
ranks_in_group = None
process_group = None
@@ -58,20 +66,28 @@ def init_dist_group(self):


class Initializer_2D_Col(ProcessGroupInitializer):
'''2d tensor parallel initialization among cols.
'''
"""2d tensor parallel initialization among cols.
:param num_group: The number of all tensor groups
:param summa_dim: The dimension of SUMMA
:param args: Args used to initialize base class
:param kwargs: Kwargs used to initialize base class
:type num_group: int
:type summa_dim: int
"""

def __init__(self, num_group, summa_dim, *args, **kwargs):
super(Initializer_2D_Col, self).__init__(*args, **kwargs)
self.num_group = num_group
self.summa_dim = summa_dim

def init_dist_group(self):
'''Initialize 2D tensor row parallel groups, and assign local_ranks and groups to each gpu.
"""Initialize 2D tensor row parallel groups, and assign local_ranks and groups to each gpu.
:return: 2D tensor col parallelism's information
:rtype: tuple(local_rank, group_world_size, process_group, ranks_in_group, mode)
'''
:return: 2D tensor col parallelism's information
:rtype: Tuple(local_rank, group_world_size, process_group, ranks_in_group, mode)
"""
local_rank = None
ranks_in_group = None
process_group = None
@@ -97,6 +113,9 @@ def init_dist_group(self):
class Initializer_2D(ProcessGroupInitializer):
"""
Serve as the single entry point to 2D parallel initialization.
:param args: Args used to initialize ProcessGroupInitializer
:param kwargs: Kwargs used to initialize ProcessGroupInitializer
"""

def __init__(self, *args, **kwargs):
Expand All @@ -112,12 +131,10 @@ def __init__(self, *args, **kwargs):
self.row_initializer = Initializer_2D_Row(self.num_group, self.summa_dim, *args, **kwargs)

def init_dist_group(self):
'''Initialize 2D tensor row and col parallel groups, and assign local_ranks and groups to each gpu.
:return: 2D tensor parallelism's information
:rtype: list of tuples (local_rank, group_world_size, process_group, ranks_in_group, mode)
'''
parallel_setting = []
parallel_setting.append(self.row_initializer.init_dist_group())
parallel_setting.append(self.col_initializer.init_dist_group())
"""Initialize 2D tensor row and col parallel groups, and assign local_ranks and groups to each gpu.
:return: 2D tensor parallelism's information
:rtype: list of Tuples (local_rank, group_world_size, process_group, ranks_in_group, mode)
"""
parallel_setting = [self.row_initializer.init_dist_group(), self.col_initializer.init_dist_group()]
return parallel_setting
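The bodies of init_dist_group are truncated above, so the rank layout the 2D initializers build is not visible here. Purely as an illustration rather than the actual implementation, the snippet below shows how a summa_dim x summa_dim grid is conventionally split into row and column groups; the real initializers additionally create the torch.distributed process groups and return the tuple described in the docstrings.

# Illustrative only: conventional SUMMA-style grouping of tensor-parallel ranks.
tensor_parallel_size = 4                       # as validated by _check_summa_env_var
summa_dim = int(tensor_parallel_size ** 0.5)   # the process grid is summa_dim x summa_dim

# Row group i holds consecutive ranks; column group j holds strided ranks.
row_groups = [[i * summa_dim + j for j in range(summa_dim)]
              for i in range(summa_dim)]
col_groups = [[i * summa_dim + j for i in range(summa_dim)]
              for j in range(summa_dim)]

print(row_groups)  # [[0, 1], [2, 3]]
print(col_groups)  # [[0, 2], [1, 3]]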
