From 87a3c5c374c57b8f08928330256864f5e7bda1e4 Mon Sep 17 00:00:00 2001 From: jiaopenglong <44927264+JiaoPL@users.noreply.github.com> Date: Fri, 27 Oct 2023 16:26:55 +0800 Subject: [PATCH] feat(optimizer): zero gradient count (#449) * add zero grad count * fix layer norm with pp * fix layer norm with pp * add zero_grad_profiling option * fix param_metrics is not a tensor --- .../solver/optimizer/hybrid_zero_optim.py | 54 +++- internlm/solver/optimizer/utils.py | 246 +++++++++++++----- internlm/train/training_internlm.py | 39 +-- internlm/utils/parallel.py | 11 +- 4 files changed, 257 insertions(+), 93 deletions(-) diff --git a/internlm/solver/optimizer/hybrid_zero_optim.py b/internlm/solver/optimizer/hybrid_zero_optim.py index c45fb16..2901f81 100644 --- a/internlm/solver/optimizer/hybrid_zero_optim.py +++ b/internlm/solver/optimizer/hybrid_zero_optim.py @@ -34,7 +34,13 @@ from internlm.utils.megatron_timers import megatron_timer as timer from internlm.utils.timeout import llm_timeout from .base_optimizer import BaseOptimizer -from .utils import compute_layer_norm, compute_norm, compute_param_norm +from .utils import ( + compute_layer_norm, + compute_layer_zero_grad_count, + compute_norm, + compute_param_norm, + compute_zero_grad_count, +) inf = math.inf logger = get_logger(__file__) @@ -564,6 +570,29 @@ class HybridZeroOptimizer(BaseOptimizer): ) return total_param_norms + def _count_zero_grads_stage( + self, group_id: int = 0, last_bucket: bool = False, last_stage: bool = False, previous_zero_grad_count=None + ): + params, grads = self._param_store.get_reduced_param_for_compute_norm(group_id=group_id, last_bucket=last_bucket) + + total_zero_grad_count = {} + + if len(params) == 0: + dtype = self.param_groups[group_id]["dtype"] + grads = [self.padding_grad.to(dtype)] + params = [self.padding_tensor.to(dtype)] + + if self._clip_grad_norm > 0: + total_zero_grad_count = compute_zero_grad_count( + grads, + params, + last_stage=last_stage, + previous_zero_grad_count=previous_zero_grad_count, + zero_mode=self._broadcast_parallel_mode[group_id], + is_moe_group=self._is_moe_group(self.optim.param_groups[group_id]), + ) + return total_zero_grad_count + @llm_timeout(func_name="optim_step") def step(self, closure=None): """Performs a single optimization step. 
@@ -592,10 +621,13 @@ class HybridZeroOptimizer(BaseOptimizer): # compute norm for gradients in the before bucket groups_norms = [] groups_param_norms = [] + group_param_zero_grad_count = [] for group_id in range(self.num_param_groups): groups_norms.append(self._compute_norm_with_stage(group_id=group_id)) if gpc.config.get("grad_norm_profiling", False): groups_param_norms.append(self._compute_param_norm_stage(group_id=group_id)) + if gpc.config.get("zero_grad_profiling", False): + group_param_zero_grad_count.append(self._count_zero_grads_stage(group_id=group_id)) # clear reduced grads # grads in the last bucket is reduced @@ -609,6 +641,8 @@ class HybridZeroOptimizer(BaseOptimizer): # compute norm for gradients in the last bucket total_norms = {} total_param_norms = {} + total_param_zero_grad_count = {} + total_layer_zero_grad_count = {} total_layer_norms = {} for group_id in range(self.num_param_groups): group_name = self.param_groups[group_id]["name"] if "name" in self.param_groups[group_id] else "default" @@ -629,6 +663,17 @@ class HybridZeroOptimizer(BaseOptimizer): total_layer_norms[group_name], total_param_norms[group_name] = compute_layer_norm( param_norms=param_norms, loss_scale=self.loss_scale.item() ) + if gpc.config.get("zero_grad_profiling", False): + zero_grad_count = self._count_zero_grads_stage( + group_id=group_id, + last_bucket=True, + last_stage=True, + previous_zero_grad_count=group_param_zero_grad_count[group_id], + ) + ( + total_layer_zero_grad_count[group_name], + total_param_zero_grad_count[group_name], + ) = compute_layer_zero_grad_count(zero_grad_count) # Need to allreduce(avg) the norms across different ranks because moe params will not be synced # during allreduce @@ -646,8 +691,11 @@ class HybridZeroOptimizer(BaseOptimizer): state, global_norms = self._step(closure=closure, norms=total_norms) if gpc.config.get("grad_norm_profiling", False): - global_norms["layer_norms"] = total_layer_norms - global_norms["param_norms"] = total_param_norms + global_norms["layer_norm"] = total_layer_norms + global_norms["param_norm"] = total_param_norms + if gpc.config.get("zero_grad_profiling", False): + global_norms["layer_zero_grad"] = total_layer_zero_grad_count + global_norms["param_zero_grad"] = total_param_zero_grad_count return state, global_norms diff --git a/internlm/solver/optimizer/utils.py b/internlm/solver/optimizer/utils.py index 982a246..2fb8f57 100644 --- a/internlm/solver/optimizer/utils.py +++ b/internlm/solver/optimizer/utils.py @@ -209,6 +209,15 @@ def calc_lp(grads, norm_type): return norm +def calc_zero_grad(grads): + zero_count = 0 + grad_size = 0 + for grad in grads: + zero_count += (grad == 0).sum().item() + grad_size += grad.numel() + return torch.tensor([zero_count, grad_size]) + + def reduce_grads(gradients, parameters, fine_grained=False): parallel_grads = [] if fine_grained: @@ -336,6 +345,117 @@ def compute_norm( return total_norm +def compute_param_metric( + gradients, + parameters, + metric_type: str, + last_stage=False, + previous_param_metrics=None, + norm_type=2, + zero_mode=ParallelMode.ZERO1, + is_moe_group=False, +): + """Get the metrics of params + Argumemts: + metric_type: (norm | zero_grad) + """ + + enable_cuda_kernels = gradients[0].device.type == "cuda" + total_metrics = {} + param_metrics = {} + param_grads = reduce_grads(gradients, parameters, fine_grained=True) + + if metric_type == "norm": + # Norm parameters. 
+ norm_type = float(norm_type) + + for param_name, grads in param_grads.items(): + if metric_type == "norm": + if norm_type == inf: + param_metric = max(g.data.abs().max() for g in grads) + elif norm_type == 2.0 and enable_cuda_kernels: + param_metric = calc_l2_norm(grads) ** norm_type + else: + param_metric = calc_lp(grads, norm_type) + param_metrics[param_name] = param_metric.item() if torch.is_tensor(param_metric) else param_metric + elif metric_type == "zero_grad": + param_zero_grad_count = calc_zero_grad(grads) + param_metrics[param_name] = param_zero_grad_count + + if last_stage is False: + return param_metrics + + if previous_param_metrics is not None: + for key, value in previous_param_metrics.items(): + if key not in param_metrics: + param_metrics[key] = value + continue + if metric_type == "norm" and norm_type == inf: + param_metrics[key] = max(param_metrics[key], value) + else: + param_metrics[key] += value + + # model parallel + model_parallel_param_metrics = {} + if gpc.is_initialized(ParallelMode.MODEL): + parallel_param_metrics = [None for _ in range(gpc.get_world_size(ParallelMode.MODEL))] + dist.all_gather_object(parallel_param_metrics, param_metrics, group=gpc.get_group(ParallelMode.MODEL)) + for local_param_metric in parallel_param_metrics: + for param_name, param_metric in local_param_metric.items(): + if param_name not in model_parallel_param_metrics: + model_parallel_param_metrics[param_name] = 0.0 + if metric_type == "norm" and norm_type == inf: + model_parallel_param_metrics[param_name] = max( + model_parallel_param_metrics[param_name], param_metric + ) + else: + model_parallel_param_metrics[param_name] += param_metric + + # zero parallel + zero_param_metrics = [None for _ in range(gpc.get_world_size(zero_mode))] + dist.all_gather_object(zero_param_metrics, model_parallel_param_metrics, group=gpc.get_group(zero_mode)) + for local_param_metric in zero_param_metrics: + for param_name, param_metric in local_param_metric.items(): + if param_name not in total_metrics: + total_metrics[param_name] = 0.0 + if metric_type == "norm" and norm_type == inf: + total_metrics[param_name] = max(total_metrics[param_name], param_metric) + else: + total_metrics[param_name] += param_metric + + # moe + if is_moe_group: + pg = gpc.get_group(ParallelMode.EXPERT) + total_metric_values = list(total_metrics.values()) + if isinstance(total_metric_values[0], torch.Tensor): + scaled_param_metric = torch.stack(total_metric_values).to(device=get_current_device()) + else: + scaled_param_metric = torch.cuda.FloatTensor(total_metric_values, device=get_current_device()) + scaled_param_metric = scaled_param_metric / float(gpc.get_world_size(ParallelMode.EXPERT)) + dist.all_reduce(scaled_param_metric, group=pg) + for i, param_name in enumerate(total_metrics.keys()): + total_metrics[param_name] = scaled_param_metric[i] + + # calc zero grad percent + if metric_type == "zero_grad": + for param_name, param_metric in total_metrics.items(): + total_metrics[param_name] = (param_metric[0] / param_metric[1]).item() + + # scale norm + if metric_type == "norm": + for param_name, param_metric in total_metrics.items(): + if torch.is_tensor(param_metric): + param_metric = param_metric.item() + if param_metric in (inf, -inf): + total_metrics[param_name] = -1 + elif math.isnan(param_metric): + total_metrics[param_name] = -2 + else: + total_metrics[param_name] = param_metric + + return total_metrics + + def compute_param_norm( gradients, parameters, @@ -355,80 +475,45 @@ def compute_param_norm( Returns: The norm of the 
parameters. """ - enable_cuda_kernels = gradients[0].device.type == "cuda" - # Norm parameters. - norm_type = float(norm_type) - total_param_norms = {} - param_grads = reduce_grads(gradients, parameters, fine_grained=True) + return compute_param_metric( + gradients, + parameters, + metric_type="norm", + last_stage=last_stage, + previous_param_metrics=previous_param_norms, + norm_type=norm_type, + zero_mode=zero_mode, + is_moe_group=is_moe_group, + ) - param_norms = {} - for param_name, grads in param_grads.items(): - if norm_type == inf: - param_norm = max(g.data.abs().max() for g in grads) - elif norm_type == 2.0 and enable_cuda_kernels: - param_norm = calc_l2_norm(grads) ** norm_type - else: - param_norm = calc_lp(grads, norm_type) - param_norms[param_name] = param_norm.item() if torch.is_tensor(param_norm) else param_norm - if last_stage is False: - return param_norms +def compute_zero_grad_count( + gradients, + parameters, + last_stage=False, + previous_zero_grad_count=None, + zero_mode=ParallelMode.ZERO1, + is_moe_group=False, +): + """Get the count of zero gradient for each parameters + Arguments: + gradients (Iterable[Tensor]): The gradient value. + parameters (Iterable[Tensor]): The parameter each gradient corresponds to. - if previous_param_norms is not None: - for key, value in previous_param_norms.items(): - if key not in param_norms: - param_norms[key] = value - continue + Returns: + The count of zero gradient for each parameters + """ - if norm_type == inf: - param_norms[key] = max(param_norms[key], value) - else: - param_norms[key] += value - - # model parallel - model_parallel_param_norms = {} - if gpc.is_initialized(ParallelMode.MODEL): - parallel_param_norms = [None for _ in range(gpc.get_world_size(ParallelMode.MODEL))] - dist.all_gather_object(parallel_param_norms, param_norms, group=gpc.get_group(ParallelMode.MODEL)) - for local_param_norm in parallel_param_norms: - for param_name, param_norm in local_param_norm.items(): - if param_name not in model_parallel_param_norms: - model_parallel_param_norms[param_name] = 0.0 - if norm_type == inf: - model_parallel_param_norms[param_name] = max(model_parallel_param_norms[param_name], param_norm) - else: - model_parallel_param_norms[param_name] += param_norm - - # zero parallel - zero_param_norms = [None for _ in range(gpc.get_world_size(zero_mode))] - dist.all_gather_object(zero_param_norms, model_parallel_param_norms, group=gpc.get_group(zero_mode)) - for local_param_norm in zero_param_norms: - for param_name, param_norm in local_param_norm.items(): - if param_name not in total_param_norms: - total_param_norms[param_name] = 0.0 - if norm_type == inf: - total_param_norms[param_name] = max(total_param_norms[param_name], param_norm) - else: - total_param_norms[param_name] += param_norm - - # moe - if is_moe_group: - pg = gpc.get_group(ParallelMode.EXPERT) - scaled_param_norm = torch.cuda.FloatTensor(list(total_param_norms.values()), device=get_current_device()) - scaled_param_norm = scaled_param_norm / float(gpc.get_world_size(ParallelMode.EXPERT)) - dist.all_reduce(scaled_param_norm, group=pg) - for i, param_name in enumerate(total_param_norms.keys()): - total_param_norms[param_name] = scaled_param_norm[i].item() - - # scale - for param_name, param_norm in total_param_norms.items(): - if param_norm in (inf, -inf): - total_param_norms[param_name] = -1 - elif math.isnan(param_norm): - total_param_norms[param_name] = -2 - - return total_param_norms + return compute_param_metric( + gradients, + parameters, + metric_type="zero_grad", 
+ last_stage=last_stage, + previous_param_metrics=previous_zero_grad_count, + zero_mode=zero_mode, + is_moe_group=is_moe_group, + ) def compute_layer_norm(param_norms, loss_scale): @@ -440,20 +525,37 @@ def compute_layer_norm(param_norms, loss_scale): for param_name, param_norm in param_norms.items(): layer_name, param_key = param_name.split("-") - if layer_name not in param_norms_groupby_layer: - param_norms_groupby_layer[layer_name] = {} + if param_key not in param_norms_groupby_layer: + param_norms_groupby_layer[param_key] = {} if layer_name not in layer_norms: layer_norms[layer_name] = 0.0 if param_norm not in (-1, -2): param_norm = param_norm**0.5 / loss_scale - param_norms_groupby_layer[layer_name][param_key] = param_norm + param_norms_groupby_layer[param_key][layer_name] = param_norm layer_norms[layer_name] += param_norm return layer_norms, param_norms_groupby_layer +def compute_layer_zero_grad_count(param_zero_grad_count): + param_zero_grad_count_groupby_layer = {} + layer_zero_grad_count = {} + + for param_name, zero_grad_count in param_zero_grad_count.items(): + layer_name, param_key = param_name.split("-") + if param_key not in param_zero_grad_count_groupby_layer: + param_zero_grad_count_groupby_layer[param_key] = {} + if layer_name not in layer_zero_grad_count: + layer_zero_grad_count[layer_name] = 0.0 + + param_zero_grad_count_groupby_layer[param_key][layer_name] = zero_grad_count + layer_zero_grad_count[layer_name] += zero_grad_count + + return layer_zero_grad_count, param_zero_grad_count_groupby_layer + + class BaseGradScaler(ABC): """A base class for the gradient scaler. diff --git a/internlm/train/training_internlm.py b/internlm/train/training_internlm.py index ab1746e..4f5f1bb 100644 --- a/internlm/train/training_internlm.py +++ b/internlm/train/training_internlm.py @@ -1,7 +1,6 @@ #!/usr/bin/env python # -*- encoding: utf-8 -*- -import copy import functools import time from functools import partial @@ -159,7 +158,7 @@ def initialize_optimizer(model: Union[nn.Module, nn.ModuleList]): Returns: A tuple of (optimizer, beta2_scheduler, lr_scheduler). 
""" - if gpc.config.get("grad_norm_profiling", False): + if gpc.config.get("grad_norm_profiling", False) or gpc.config.get("zero_grad_profiling", False): # set the layer name as an attribute of the model parameters set_model_params_layer_name(model) @@ -527,20 +526,28 @@ def record_current_batch_training_metrics( for key, value in acc_perplex.items(): infos[key] = value - if gpc.config.get("grad_norm_profiling", False): - layer_norms = copy.deepcopy(grad_norm["layer_norms"]) - param_norms = copy.deepcopy(grad_norm["param_norms"]) - for group_name, value in layer_norms.items(): - if value: - title = f"laye_norm_group_{group_name}" - writer.add_scalars(key=title, value=value, step=train_state.step_count) - for group_name, layer_group in param_norms.items(): - if layer_group: - for layer_name, param_group in layer_group.items(): - title = f"param_norm_{layer_name}_{group_name}" - writer.add_scalars(key=title, value=param_group, step=train_state.step_count) - del grad_norm["layer_norms"] - del grad_norm["param_norms"] + if gpc.config.get("grad_norm_profiling", False) or gpc.config.get("zero_grad_profiling", False): + layer_metrics = ["layer_norm", "layer_zero_grad"] + param_metrics = ["param_norm", "param_zero_grad"] + + for layer_metric_name in layer_metrics: + layer_metric = grad_norm.get(layer_metric_name, {}) + if layer_metric: + for group_name, value in layer_metric.items(): + if value: + title = f"{layer_metric_name}/{group_name}" + writer.add_scalars(key=title, value=value, step=train_state.step_count) + del grad_norm[layer_metric_name] + + for param_metric_name in param_metrics: + param_metric = grad_norm.get(param_metric_name, {}) + if param_metric: + for group_name, layer_group in param_metric.items(): + if layer_group: + for param_name, param_group in layer_group.items(): + title = f"{param_name}/{group_name}_{param_metric_name}" + writer.add_scalars(key=title, value=param_group, step=train_state.step_count) + del grad_norm[param_metric_name] line = "" for key, value in infos.items(): diff --git a/internlm/utils/parallel.py b/internlm/utils/parallel.py index 6e5384f..9b70fc8 100644 --- a/internlm/utils/parallel.py +++ b/internlm/utils/parallel.py @@ -7,6 +7,7 @@ from torch import nn from internlm.core.context import IS_TENSOR_PARALLEL, ParallelMode from internlm.core.context import global_context as gpc from internlm.core.naive_amp import NaiveAMPModel +from internlm.solver.pipeline_utils import partition_uniform def is_model_parallel_parameter(p): @@ -70,18 +71,24 @@ def set_model_params_layer_name(model): Args: model (:class:`torch.nn.Module`): A pyTorch model on whose parameters you check the consistency. 
""" + pipeline_size = gpc.get_world_size(ParallelMode.PIPELINE) + pipeline_rank = gpc.get_local_rank(ParallelMode.PIPELINE) + all_parts = partition_uniform(gpc.config.model.num_layers, pipeline_size, gpc.config.model.num_chunks) + parts = all_parts[pipeline_rank] + if not isinstance(model, nn.ModuleList): model = [model] - for _chunk in model: + for chunk_idx, _chunk in enumerate(model): if isinstance(_chunk, NaiveAMPModel): _chunk = _chunk.model + chunk_start = parts[chunk_idx][0] # Create a unique layer name based on the block's class name and index for _, children in _chunk.named_children(): if isinstance(children, nn.ModuleList): for idx, block in enumerate(children): for param_name, param in block.named_parameters(): - layer_name = f"{block.__class__.__name__}Block{idx}" + layer_name = f"{block.__class__.__name__}Block{idx + chunk_start}" layer_param_name = f"{layer_name}-{param_name}" param.__setattr__("layer_name", layer_name) param.__setattr__("param_name", layer_param_name)