From a94f429a672d8a18fa09d9e54f3f6e1218437d35 Mon Sep 17 00:00:00 2001 From: JiaoPL Date: Thu, 12 Oct 2023 21:25:30 +0800 Subject: [PATCH] compute layer norms and replace total_norm with it --- .../solver/optimizer/hybrid_zero_optim.py | 29 ++-- internlm/solver/optimizer/utils.py | 132 +++++++++++------- 2 files changed, 92 insertions(+), 69 deletions(-) diff --git a/internlm/solver/optimizer/hybrid_zero_optim.py b/internlm/solver/optimizer/hybrid_zero_optim.py index 97004eb..d60d1a0 100644 --- a/internlm/solver/optimizer/hybrid_zero_optim.py +++ b/internlm/solver/optimizer/hybrid_zero_optim.py @@ -494,11 +494,7 @@ class HybridZeroOptimizer(BaseOptimizer): # Gradients may not be fully synchronized here. def _compute_norm_with_stage( - self, - group_id: int = 0, - last_bucket: bool = False, - last_stage: bool = False, - previous_norm=None, + self, group_id: int = 0, last_bucket: bool = False, last_stage: bool = False, previous_layer_norms=None ): # compute norm for gradients that have been reduced params, grads = self._param_store.get_reduced_param_for_compute_norm(group_id=group_id, last_bucket=last_bucket) @@ -507,18 +503,18 @@ class HybridZeroOptimizer(BaseOptimizer): grads = [self.padding_grad.to(dtype)] params = [self.padding_tensor.to(dtype)] - norm = 0 + layer_norm = None if self._clip_grad_norm > 0: # this norm is before scaling, it will be very large - norm = compute_norm( + layer_norm = compute_norm( gradients=grads, parameters=params, last_stage=last_stage, - previous_norm=previous_norm, + previous_layer_norms=previous_layer_norms, zero_mode=self._broadcast_parallel_mode[group_id], ) - return norm + return layer_norm @llm_timeout(func_name="optim_step") def step(self, closure=None): @@ -546,10 +542,10 @@ class HybridZeroOptimizer(BaseOptimizer): self._reduce_grads_stored_in_bucket(self._bucket_store[group_id], reduce_rank=None, last_bucket=True) # compute norm for gradients in the before bucket - groups_norms = [] + groups_layer_norms = [] for group_id in range(self.num_param_groups): - groups_norms.append(self._compute_norm_with_stage(group_id=group_id)) - + layer_norm = self._compute_norm_with_stage(group_id=group_id) + groups_layer_norms.append(layer_norm) # clear reduced grads # grads in the last bucket is reduced for bucket in self._bucket_in_progress: @@ -561,15 +557,14 @@ class HybridZeroOptimizer(BaseOptimizer): # compute norm for gradients in the last bucket total_norms = {} + total_layernorms = {} for group_id in range(self.num_param_groups): group_name = self.param_groups[group_id]["name"] if "name" in self.param_groups[group_id] else "default" group_name = f"{group_id}_{group_name}" - total_norms[group_name] = self._compute_norm_with_stage( - group_id=group_id, - last_bucket=True, - last_stage=True, - previous_norm=groups_norms[group_id], + total_layernorms[group_name] = self._compute_norm_with_stage( + group_id=group_id, last_bucket=True, last_stage=True, previous_layer_norms=groups_layer_norms[group_id] ) + total_norms[group_name] = sum(total_layernorms[group_name].values()) # Need to allreduce(avg) the norms across different ranks because moe params will not be synced # during allreduce diff --git a/internlm/solver/optimizer/utils.py b/internlm/solver/optimizer/utils.py index f4816a7..8775c5f 100644 --- a/internlm/solver/optimizer/utils.py +++ b/internlm/solver/optimizer/utils.py @@ -1,6 +1,7 @@ #!/usr/bin/env python # -*- encoding: utf-8 -*- +import copy import math from abc import ABC, abstractmethod from collections import OrderedDict @@ -15,7 +16,7 @@ from 
torch._utils import _flatten_dense_tensors, _unflatten_dense_tensors from internlm.core.context import ParallelMode from internlm.core.context import global_context as gpc from internlm.core.naive_amp import NaiveAMPModel -from internlm.utils.common import get_tensor_norm, move_norm_to_cuda +from internlm.utils.common import move_norm_to_cuda from internlm.utils.logger import get_logger from internlm.utils.parallel import is_model_parallel_parameter @@ -31,6 +32,7 @@ except (ModuleNotFoundError, ImportError): APEX_AVAILABLE = False inf = math.inf +global_layer_norms = {"unknown": 0.0, "embedding": 0.0, "norm": 0.0, "head": 0.0} def flatten(input_): @@ -210,7 +212,7 @@ def calc_lp(grads, norm_type): def compute_norm( - gradients, parameters, last_stage=False, previous_norm=None, norm_type=2, zero_mode=ParallelMode.ZERO1 + gradients, parameters, last_stage=False, previous_layer_norms=None, norm_type=2, zero_mode=ParallelMode.ZERO1 ): """Get the norm Arguments: @@ -220,42 +222,60 @@ def compute_norm( infinity norm. Returns: - Total norm of the parameters, need total_norm**(1/norm) before using. + gradient norm for all layers, need total_norm**(1/norm) before using. """ enable_cuda_kernels = gradients[0].device.type == "cuda" # Norm parameters. norm_type = float(norm_type) - + total_layer_norms = copy.deepcopy(global_layer_norms) + layer_grads = {} # Calculate norm. if norm_type == inf: - total_norm = max(g.data.abs().max() for g in gradients) - total_norm_cuda = torch.FloatTensor([float(total_norm)], device=gradients[0].device) + for g, p in zip(gradients, parameters): + layer_name = p.layer_name if hasattr(p, "layer_name") else "unknown" + if layer_name not in layer_grads: + layer_grads[layer_name] = [] + layer_grads[layer_name].append(g) + for layer_name, grads in layer_grads.items(): + layer_norm = max(g.data.abs().max() for g in grads) + total_layer_norms[layer_name] = max(total_layer_norms[layer_name], float(layer_norm)) if last_stage is False: - return total_norm_cuda + return total_layer_norms - if previous_norm is not None: - total_norm_cuda = max(total_norm_cuda, previous_norm) + if previous_layer_norms is not None: + for key, value in previous_layer_norms.items(): + total_layer_norms[key] = max(value, total_layer_norms[key]) + + total_layer_norms_values = move_norm_to_cuda(torch.Tensor(list(total_layer_norms.values()))) + total_layer_norms_keys = list(global_layer_norms.keys()) # Take max across all model-parallel GPUs. 
-        if gpc.get_world_size(ParallelMode.MODEL) > 1:
+        if gpc.is_initialized(ParallelMode.MODEL):
             dist.all_reduce(
-                total_norm_cuda,
+                total_layer_norms_values,
                 op=dist.ReduceOp.MAX,
                 group=gpc.get_group(ParallelMode.MODEL),
             )
-        total_norm = total_norm_cuda[0].item()
+        for idx in range(len(total_layer_norms_keys)):
+            layer_norm = total_layer_norms_values[idx]
+            if torch.is_tensor(layer_norm):
+                layer_norm = layer_norm.item()
+            total_layer_norms[total_layer_norms_keys[idx]] = layer_norm
+
     else:
-        tensor_parallel_grads = []
         for g, p in zip(gradients, parameters):
             # TODO: consider the pipeline shared parameter
+            layer_name = p.layer_name if hasattr(p, "layer_name") else "unknown"
+            if layer_name not in layer_grads:
+                layer_grads[layer_name] = []
             if (
                 gpc.is_initialized(ParallelMode.PIPELINE)
                 and hasattr(p, "pipeline_shared_module_pg")
                 and dist.get_rank(p.pipeline_shared_module_pg) == 0
             ):  # if shared between different pipe, only count o
-                tensor_parallel_grads.append(g.data.float())
+                layer_grads[layer_name].append(g.data.float())
             elif (
                 gpc.is_initialized(ParallelMode.PIPELINE)
                 and hasattr(p, "pipeline_shared_module_pg")
@@ -267,56 +287,50 @@ def compute_norm(
                 and not is_model_parallel_parameter(p)
                 and gpc.get_local_rank(ParallelMode.TENSOR) == 0
             ):  # if not used in each chunk, such as layernorm
-                tensor_parallel_grads.append(g.data.float())
+                layer_grads[layer_name].append(g.data.float())
             elif is_model_parallel_parameter(p):
-                tensor_parallel_grads.append(g.data.float())
+                layer_grads[layer_name].append(g.data.float())
             elif gpc.get_local_rank(ParallelMode.TENSOR) != 0:
                 continue
             else:
                 raise RuntimeError("Should not arrive here")
 
-        if norm_type == 2.0 and enable_cuda_kernels:
-            tensor_parallel_norm = calc_l2_norm(tensor_parallel_grads) ** norm_type
-        else:
-            tensor_parallel_norm = calc_lp(tensor_parallel_grads, norm_type)
-
-        # If norm is type of float, then we convert them into torch.Tensor.
-        tensor_parallel_norm = get_tensor_norm(tensor_parallel_norm, enable_cuda_kernels)
-        # If grads are on CPU, the norms is also on CPU. Cast them to CUDA tensors
-        if not enable_cuda_kernels:
-            tensor_parallel_norm = move_norm_to_cuda(tensor_parallel_norm)
-
-        total_norm = tensor_parallel_norm
+        # calculate layer norm
+        for layer_name, grads in layer_grads.items():
+            if norm_type == 2.0 and enable_cuda_kernels:
+                layer_norm = calc_l2_norm(grads) ** norm_type
+            else:
+                layer_norm = calc_lp(grads, norm_type)
+            total_layer_norms[layer_name] += layer_norm.item() if torch.is_tensor(layer_norm) else layer_norm
 
         if last_stage is False:
-            return total_norm
+            return total_layer_norms
 
-        if previous_norm is not None:
-            total_norm = total_norm + previous_norm
+        if previous_layer_norms is not None:
+            for key, value in previous_layer_norms.items():
+                total_layer_norms[key] += value
 
+        # sync layer norm
         # Sum across all model-parallel GPUs.
+        total_layer_norms_values = move_norm_to_cuda(torch.Tensor(list(total_layer_norms.values())))
+        total_layer_norms_keys = list(total_layer_norms.keys())
+
         if gpc.is_initialized(ParallelMode.MODEL):
-            dist.all_reduce(
-                total_norm,
-                op=dist.ReduceOp.SUM,
-                group=gpc.get_group(ParallelMode.MODEL),
-            )
+            dist.all_reduce(total_layer_norms_values, op=dist.ReduceOp.SUM, group=gpc.get_group(ParallelMode.MODEL))
+        dist.all_reduce(total_layer_norms_values, op=dist.ReduceOp.SUM, group=gpc.get_group(zero_mode))
 
-        # This is because we use zero1, so we need to use this reduction.
-        # TODO: Check zero group to be a subset of dp group.
-        dist.all_reduce(total_norm, op=dist.ReduceOp.SUM, group=gpc.get_group(zero_mode))
+        for idx in range(len(total_layer_norms_keys)):
+            layer_norm = total_layer_norms_values[idx]
+            if torch.is_tensor(layer_norm):
+                layer_norm = layer_norm.item()
+            if layer_norm == float("inf") or layer_norm == -float("inf"):
+                layer_norm = -1
 
-        if torch.is_tensor(total_norm):
-            total_norm = total_norm.item()
+            if math.isnan(layer_norm):
+                layer_norm = -2
+            total_layer_norms[total_layer_norms_keys[idx]] = layer_norm
 
-    # Scale.
-    if total_norm == float("inf") or total_norm == -float("inf"):
-        total_norm = -1
-
-    if math.isnan(total_norm):
-        total_norm = -2
-
-    return total_norm
+    return total_layer_norms
 
 
 class BaseGradScaler(ABC):
@@ -507,18 +521,23 @@ class ParamBcastSyncHandler:
         for _chunk in model:
             if isinstance(_chunk, NaiveAMPModel):
                 _chunk = _chunk.model
-
-            for _, children in _chunk.named_children():
+            for name, children in _chunk.named_children():
                 # should be the transformer block definition in modeling_xxx.py
                 if isinstance(children, nn.ModuleList):
                     # record the block that a parameter belongs to
-                    for _, block in enumerate(children):
+                    for idx, block in enumerate(children):
                         # self._block_to_param[f"{name}.{idx}"] = list(block.parameters())
                         self._block_to_param[block] = list(block.parameters())
+                        for parameter in self._block_to_param[block]:
+                            layer_name = f"{block.__class__.__name__}.{idx}"
+                            global_layer_norms[layer_name] = 0.0
+                            parameter.__setattr__("layer_name", layer_name)
                 else:
                     # record the block that a parameter belongs to
                     # self._block_to_param[name] = list(children.parameters())
                     self._block_to_param[children] = list(children.parameters())
+                    for parameter in self._block_to_param[children]:
+                        parameter.__setattr__("layer_name", name)
 
         alloc_num = 0
         rank_to_go = 0
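
Below is a minimal, single-process sketch of the per-layer norm bookkeeping this patch introduces, for reference alongside the diff. It is illustrative only: the helper name compute_layer_norms, the Block.{idx} tags, and the toy parameters are hypothetical, and the dist.all_reduce calls over the MODEL and ZeRO process groups performed by the patched compute_norm are deliberately omitted, so only the local accumulation step is shown.

# Minimal sketch (assumes torch is available): group gradients by the
# `layer_name` attribute and accumulate a per-layer norm**norm_type sum,
# mirroring the local (non-distributed) part of compute_norm() above.
import math
from collections import defaultdict

import torch


def compute_layer_norms(gradients, parameters, norm_type=2.0):
    """Return {layer_name: sum of grad-norm**norm_type}; apply **(1/norm_type) before use."""
    layer_grads = defaultdict(list)
    for g, p in zip(gradients, parameters):
        # parameters without a tag fall into the "unknown" bucket, as in the patch
        layer_grads[getattr(p, "layer_name", "unknown")].append(g.data.float())

    layer_norms = {}
    for layer_name, grads in layer_grads.items():
        if norm_type == math.inf:
            layer_norms[layer_name] = max(g.abs().max().item() for g in grads)
        else:
            layer_norms[layer_name] = sum(torch.norm(g, p=norm_type).item() ** norm_type for g in grads)
    return layer_norms


# Usage: tag parameters the way ParamBcastSyncHandler does, then inspect per-layer norms.
params = [torch.nn.Parameter(torch.randn(4, 4)) for _ in range(2)]
for idx, p in enumerate(params):
    p.layer_name = f"Block.{idx}"  # hypothetical tag; the patch derives it from the block class name
    p.grad = torch.randn_like(p)

norms = compute_layer_norms([p.grad for p in params], params)
group_total = sum(norms.values())  # same reduction step() applies; still needs **(1/norm_type) before use
print(norms, group_total)

Keeping the result as a {layer_name: norm**norm_type} dict is what lets step() report norms layer by layer while still recovering each group's total via sum(total_layernorms[group_name].values()).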