diff --git a/internlm/solver/optimizer/hybrid_zero_optim.py b/internlm/solver/optimizer/hybrid_zero_optim.py
index 97004eb..e3f9608 100644
--- a/internlm/solver/optimizer/hybrid_zero_optim.py
+++ b/internlm/solver/optimizer/hybrid_zero_optim.py
@@ -34,7 +34,7 @@ from internlm.utils.megatron_timers import megatron_timer as timer
 from internlm.utils.timeout import llm_timeout
 
 from .base_optimizer import BaseOptimizer
-from .utils import compute_norm
+from .utils import compute_layer_norm, compute_norm, compute_param_norm
 
 inf = math.inf
 logger = get_logger(__file__)
@@ -520,6 +520,29 @@ class HybridZeroOptimizer(BaseOptimizer):
 
         return norm
 
+    def _compute_param_norm_stage(
+        self, group_id: int = 0, last_bucket: bool = False, last_stage: bool = False, previous_param_norms=None
+    ):
+        # compute norm for gradients that have been reduced
+        params, grads = self._param_store.get_reduced_param_for_compute_norm(group_id=group_id, last_bucket=last_bucket)
+
+        total_param_norms = {}
+        if len(params) == 0:
+            dtype = self.param_groups[group_id]["dtype"]
+            grads = [self.padding_grad.to(dtype)]
+            params = [self.padding_tensor.to(dtype)]
+
+        if self._clip_grad_norm > 0:
+            total_param_norms = compute_param_norm(
+                grads,
+                params,
+                last_stage=last_stage,
+                previous_param_norms=previous_param_norms,
+                zero_mode=self._broadcast_parallel_mode[group_id],
+                is_moe_group=self._is_moe_group(self.optim.param_groups[group_id]),
+            )
+        return total_param_norms
+
     @llm_timeout(func_name="optim_step")
     def step(self, closure=None):
         """Performs a single optimization step.
@@ -547,8 +570,11 @@ class HybridZeroOptimizer(BaseOptimizer):
 
         # compute norm for gradients in the before bucket
         groups_norms = []
+        groups_param_norms = []
         for group_id in range(self.num_param_groups):
             groups_norms.append(self._compute_norm_with_stage(group_id=group_id))
+            if gpc.config.get("grad_norm_profiling", False):
+                groups_param_norms.append(self._compute_param_norm_stage(group_id=group_id))
 
         # clear reduced grads
         # grads in the last bucket is reduced
@@ -561,6 +587,8 @@ class HybridZeroOptimizer(BaseOptimizer):
 
         # compute norm for gradients in the last bucket
         total_norms = {}
+        total_param_norms = {}
+        total_layer_norms = {}
         for group_id in range(self.num_param_groups):
             group_name = self.param_groups[group_id]["name"] if "name" in self.param_groups[group_id] else "default"
             group_name = f"{group_id}_{group_name}"
@@ -570,6 +598,16 @@
                 last_stage=True,
                 previous_norm=groups_norms[group_id],
             )
+            if gpc.config.get("grad_norm_profiling", False):
+                param_norms = self._compute_param_norm_stage(
+                    group_id=group_id,
+                    last_bucket=True,
+                    last_stage=True,
+                    previous_param_norms=groups_param_norms[group_id],
+                )
+                total_layer_norms[group_name], total_param_norms[group_name] = compute_layer_norm(
+                    param_norms=param_norms, loss_scale=self.loss_scale.item()
+                )
 
         # Need to allreduce(avg) the norms across different ranks because moe params will not be synced
         # during allreduce
@@ -585,7 +623,12 @@ class HybridZeroOptimizer(BaseOptimizer):
             self._sync_grad()
             timer("sync_grad").stop()
 
-        return self._step(closure=closure, norms=total_norms)
+        state, global_norms = self._step(closure=closure, norms=total_norms)
+        if gpc.config.get("grad_norm_profiling", False):
+            global_norms["layer_norms"] = total_layer_norms
+            global_norms["param_norms"] = total_param_norms
+
+        return state, global_norms
 
     def _step(self, closure=None, norms=None):
         assert closure is None, "closure is not supported by step()"
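Everything added above is gated on a single `grad_norm_profiling` entry in the global config, read through `gpc.config.get("grad_norm_profiling", False)`. The sketch below shows how the flag might be enabled and what the extra return values look like; the config excerpt is hypothetical and only the key name and the dict layout are taken from the patch.

```python
# Hypothetical excerpt from an InternLM-style training config (a plain Python file).
# The patch reads this flag via gpc.config.get("grad_norm_profiling", False);
# it defaults to off and is not set anywhere by the patch itself.
grad_norm_profiling = True  # enable per-parameter / per-layer grad-norm profiling

# With the flag on, HybridZeroOptimizer.step() still returns (state, norms), but
# the norms dict gains two extra entries:
#   norms["layer_norms"]  -> {group_name: {layer_name: norm}}
#   norms["param_norms"]  -> {group_name: {layer_name: {param_key: norm}}}
```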
diff --git a/internlm/solver/optimizer/utils.py b/internlm/solver/optimizer/utils.py
index f4816a7..982a246 100644
--- a/internlm/solver/optimizer/utils.py
+++ b/internlm/solver/optimizer/utils.py
@@ -15,7 +15,7 @@ from torch._utils import _flatten_dense_tensors, _unflatten_dense_tensors
 from internlm.core.context import ParallelMode
 from internlm.core.context import global_context as gpc
 from internlm.core.naive_amp import NaiveAMPModel
-from internlm.utils.common import get_tensor_norm, move_norm_to_cuda
+from internlm.utils.common import get_current_device, get_tensor_norm, move_norm_to_cuda
 from internlm.utils.logger import get_logger
 from internlm.utils.parallel import is_model_parallel_parameter
 
@@ -209,6 +209,49 @@ def calc_lp(grads, norm_type):
     return norm
 
 
+def reduce_grads(gradients, parameters, fine_grained=False):
+    parallel_grads = []
+    if fine_grained:
+        parallel_grads = {}
+
+    def append_grad(g, p):
+        if fine_grained:
+            param_name = p.param_name if hasattr(p, "param_name") else "unknown-padding"
+            if param_name not in parallel_grads:
+                parallel_grads[param_name] = []
+            parallel_grads[param_name].append(g.data.float())
+        else:
+            parallel_grads.append(g.data.float())
+
+    for g, p in zip(gradients, parameters):
+        # TODO: consider the pipeline shared parameter
+        if (
+            gpc.is_initialized(ParallelMode.PIPELINE)
+            and hasattr(p, "pipeline_shared_module_pg")
+            and dist.get_rank(p.pipeline_shared_module_pg) == 0
+        ):  # if shared between different pipe, only count o
+            append_grad(g, p)
+        elif (
+            gpc.is_initialized(ParallelMode.PIPELINE)
+            and hasattr(p, "pipeline_shared_module_pg")
+            and dist.get_rank(p.pipeline_shared_module_pg) != 0
+        ):
+            continue
+        elif (
+            gpc.is_initialized(ParallelMode.TENSOR)
+            and not is_model_parallel_parameter(p)
+            and gpc.get_local_rank(ParallelMode.TENSOR) == 0
+        ):  # if not used in each chunk, such as layernorm
+            append_grad(g, p)
+        elif is_model_parallel_parameter(p):
+            append_grad(g, p)
+        elif gpc.get_local_rank(ParallelMode.TENSOR) != 0:
+            continue
+        else:
+            raise RuntimeError("Should not arrive here")
+    return parallel_grads
+
+
 def compute_norm(
     gradients, parameters, last_stage=False, previous_norm=None, norm_type=2, zero_mode=ParallelMode.ZERO1
 ):
@@ -247,33 +290,7 @@
             )
         total_norm = total_norm_cuda[0].item()
     else:
-        tensor_parallel_grads = []
-        for g, p in zip(gradients, parameters):
-            # TODO: consider the pipeline shared parameter
-            if (
-                gpc.is_initialized(ParallelMode.PIPELINE)
-                and hasattr(p, "pipeline_shared_module_pg")
-                and dist.get_rank(p.pipeline_shared_module_pg) == 0
-            ):  # if shared between different pipe, only count o
-                tensor_parallel_grads.append(g.data.float())
-            elif (
-                gpc.is_initialized(ParallelMode.PIPELINE)
-                and hasattr(p, "pipeline_shared_module_pg")
-                and dist.get_rank(p.pipeline_shared_module_pg) != 0
-            ):
-                continue
-            elif (
-                gpc.is_initialized(ParallelMode.TENSOR)
-                and not is_model_parallel_parameter(p)
-                and gpc.get_local_rank(ParallelMode.TENSOR) == 0
-            ):  # if not used in each chunk, such as layernorm
-                tensor_parallel_grads.append(g.data.float())
-            elif is_model_parallel_parameter(p):
-                tensor_parallel_grads.append(g.data.float())
-            elif gpc.get_local_rank(ParallelMode.TENSOR) != 0:
-                continue
-            else:
-                raise RuntimeError("Should not arrive here")
+        tensor_parallel_grads = reduce_grads(gradients, parameters)
 
         if norm_type == 2.0 and enable_cuda_kernels:
             tensor_parallel_norm = calc_l2_norm(tensor_parallel_grads) ** norm_type
@@ -319,6 +336,124 @@
     return total_norm
 
 
+def compute_param_norm(
+    gradients,
+    parameters,
+    last_stage=False,
+    previous_param_norms=None,
+    norm_type=2,
+    zero_mode=ParallelMode.ZERO1,
+    is_moe_group=False,
+):
+    """Get the gradient norm of each parameter.
+    Arguments:
+        gradients (Iterable[Tensor]): The gradient value.
+        parameters (Iterable[Tensor]): The parameter each gradient corresponds to.
+        norm_type (float or int): type of the used p-norm. Can be ``'inf'`` for
+            infinity norm.
+
+    Returns:
+        A dict mapping each parameter name to the norm of its gradient.
+    """
+    enable_cuda_kernels = gradients[0].device.type == "cuda"
+    # Norm parameters.
+    norm_type = float(norm_type)
+    total_param_norms = {}
+
+    param_grads = reduce_grads(gradients, parameters, fine_grained=True)
+
+    param_norms = {}
+    for param_name, grads in param_grads.items():
+        if norm_type == inf:
+            param_norm = max(g.data.abs().max() for g in grads)
+        elif norm_type == 2.0 and enable_cuda_kernels:
+            param_norm = calc_l2_norm(grads) ** norm_type
+        else:
+            param_norm = calc_lp(grads, norm_type)
+        param_norms[param_name] = param_norm.item() if torch.is_tensor(param_norm) else param_norm
+
+    if last_stage is False:
+        return param_norms
+
+    if previous_param_norms is not None:
+        for key, value in previous_param_norms.items():
+            if key not in param_norms:
+                param_norms[key] = value
+                continue
+
+            if norm_type == inf:
+                param_norms[key] = max(param_norms[key], value)
+            else:
+                param_norms[key] += value
+
+    # model parallel
+    model_parallel_param_norms = {}
+    if gpc.is_initialized(ParallelMode.MODEL):
+        parallel_param_norms = [None for _ in range(gpc.get_world_size(ParallelMode.MODEL))]
+        dist.all_gather_object(parallel_param_norms, param_norms, group=gpc.get_group(ParallelMode.MODEL))
+        for local_param_norm in parallel_param_norms:
+            for param_name, param_norm in local_param_norm.items():
+                if param_name not in model_parallel_param_norms:
+                    model_parallel_param_norms[param_name] = 0.0
+                if norm_type == inf:
+                    model_parallel_param_norms[param_name] = max(model_parallel_param_norms[param_name], param_norm)
+                else:
+                    model_parallel_param_norms[param_name] += param_norm
+
+    # zero parallel
+    zero_param_norms = [None for _ in range(gpc.get_world_size(zero_mode))]
+    dist.all_gather_object(zero_param_norms, model_parallel_param_norms, group=gpc.get_group(zero_mode))
+    for local_param_norm in zero_param_norms:
+        for param_name, param_norm in local_param_norm.items():
+            if param_name not in total_param_norms:
+                total_param_norms[param_name] = 0.0
+            if norm_type == inf:
+                total_param_norms[param_name] = max(total_param_norms[param_name], param_norm)
+            else:
+                total_param_norms[param_name] += param_norm
+
+    # moe
+    if is_moe_group:
+        pg = gpc.get_group(ParallelMode.EXPERT)
+        scaled_param_norm = torch.cuda.FloatTensor(list(total_param_norms.values()), device=get_current_device())
+        scaled_param_norm = scaled_param_norm / float(gpc.get_world_size(ParallelMode.EXPERT))
+        dist.all_reduce(scaled_param_norm, group=pg)
+        for i, param_name in enumerate(total_param_norms.keys()):
+            total_param_norms[param_name] = scaled_param_norm[i].item()
+
+    # scale
+    for param_name, param_norm in total_param_norms.items():
+        if param_norm in (inf, -inf):
+            total_param_norms[param_name] = -1
+        elif math.isnan(param_norm):
+            total_param_norms[param_name] = -2
+
+    return total_param_norms
+
+
+def compute_layer_norm(param_norms, loss_scale):
+    """
+    Compute per-layer norms from the per-parameter norms.
+    """
+    param_norms_groupby_layer = {}
+    layer_norms = {}
+
+    for param_name, param_norm in param_norms.items():
+        layer_name, param_key = param_name.split("-")
+        if layer_name not in param_norms_groupby_layer:
+            param_norms_groupby_layer[layer_name] = {}
+        if layer_name not in layer_norms:
+            layer_norms[layer_name] = 0.0
+
+        if param_norm not in (-1, -2):
+            param_norm = param_norm**0.5 / loss_scale
+
+        param_norms_groupby_layer[layer_name][param_key] = param_norm
+        layer_norms[layer_name] += param_norm
+
+    return layer_norms, param_norms_groupby_layer
+
+
 class BaseGradScaler(ABC):
     """A base class for the gradient scaler.
 
diff --git a/internlm/train/training_internlm.py b/internlm/train/training_internlm.py
index 7af58dd..ab1746e 100644
--- a/internlm/train/training_internlm.py
+++ b/internlm/train/training_internlm.py
@@ -1,6 +1,7 @@
 #!/usr/bin/env python
 # -*- encoding: utf-8 -*-
 
+import copy
 import functools
 import time
 from functools import partial
@@ -52,7 +53,11 @@ from internlm.train.utils import create_param_groups
 from internlm.utils.common import DummyProfile
 from internlm.utils.logger import get_logger
 from internlm.utils.megatron_timers import megatron_timer as timer
-from internlm.utils.parallel import sync_model_param, sync_model_param_within_tp
+from internlm.utils.parallel import (
+    set_model_params_layer_name,
+    sync_model_param,
+    sync_model_param_within_tp,
+)
 from internlm.utils.registry import MODEL_INITIALIZER
 from internlm.utils.timeout import llm_timeout
 
@@ -154,6 +159,10 @@ def initialize_optimizer(model: Union[nn.Module, nn.ModuleList]):
     Returns:
         A tuple of (optimizer, beta2_scheduler, lr_scheduler).
     """
+    if gpc.config.get("grad_norm_profiling", False):
+        # set the layer name as an attribute of the model parameters
+        set_model_params_layer_name(model)
+
     if gpc.config.hybrid_zero_optimizer.overlap_sync_param:
         param_bcast_sync_handler = ParamBcastSyncHandler(model)
     else:
@@ -518,6 +527,21 @@ def record_current_batch_training_metrics(
         for key, value in acc_perplex.items():
             infos[key] = value
 
+        if gpc.config.get("grad_norm_profiling", False):
+            layer_norms = copy.deepcopy(grad_norm["layer_norms"])
+            param_norms = copy.deepcopy(grad_norm["param_norms"])
+            for group_name, value in layer_norms.items():
+                if value:
+                    title = f"layer_norm_group_{group_name}"
+                    writer.add_scalars(key=title, value=value, step=train_state.step_count)
+            for group_name, layer_group in param_norms.items():
+                if layer_group:
+                    for layer_name, param_group in layer_group.items():
+                        title = f"param_norm_{layer_name}_{group_name}"
+                        writer.add_scalars(key=title, value=param_group, step=train_state.step_count)
+            del grad_norm["layer_norms"]
+            del grad_norm["param_norms"]
+
         line = ""
         for key, value in infos.items():
             line += f"{key}={value} "
diff --git a/internlm/utils/parallel.py b/internlm/utils/parallel.py
index 3029af5..6e5384f 100644
--- a/internlm/utils/parallel.py
+++ b/internlm/utils/parallel.py
@@ -2,9 +2,11 @@
 # -*- encoding: utf-8 -*-
 
 import torch.distributed as dist
+from torch import nn
 
 from internlm.core.context import IS_TENSOR_PARALLEL, ParallelMode
 from internlm.core.context import global_context as gpc
+from internlm.core.naive_amp import NaiveAMPModel
 
 
 def is_model_parallel_parameter(p):
@@ -61,3 +63,31 @@ def get_parallel_log_file_name():
         f"tp={gpc.get_local_rank(ParallelMode.TENSOR)}_pp={gpc.get_local_rank(ParallelMode.PIPELINE)}"
     )
     return log_file_name
+
+
+def set_model_params_layer_name(model):
+    r"""Set the layer name as an attribute of the model parameters.
+    Args:
+        model (:class:`torch.nn.Module`): The model (or list of model chunks) whose parameters will be tagged.
+ """ + if not isinstance(model, nn.ModuleList): + model = [model] + + for _chunk in model: + if isinstance(_chunk, NaiveAMPModel): + _chunk = _chunk.model + # Create a unique layer name based on the block's class name and index + for _, children in _chunk.named_children(): + if isinstance(children, nn.ModuleList): + for idx, block in enumerate(children): + for param_name, param in block.named_parameters(): + layer_name = f"{block.__class__.__name__}Block{idx}" + layer_param_name = f"{layer_name}-{param_name}" + param.__setattr__("layer_name", layer_name) + param.__setattr__("param_name", layer_param_name) + else: + for param_name, param in children.named_parameters(): + layer_name = f"{children.__class__.__name__}" + layer_param_name = f"{layer_name}-{param_name}" + param.__setattr__("layer_name", layer_name) + param.__setattr__("param_name", f"{layer_name}-{param_name}")