feat(optimizer): add layer norm to tensorboard (#429)

* add layer norm to tensorboard

* test moe layer norm

* add function: reduce grads
pull/450/head
jiaopenglong 2023-10-23 17:07:04 +08:00 committed by GitHub
parent 140be20511
commit 949a0a1d55
4 changed files with 263 additions and 31 deletions
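The new per-layer and per-parameter gradient-norm logging is gated by the `grad_norm_profiling` flag, which the code reads with `gpc.config.get("grad_norm_profiling", False)`. A minimal sketch of enabling it, assuming the flag sits at the top level of the training config (the exact config file and surrounding keys are not shown in this diff):

# Hypothetical training-config snippet; only the key name is taken from the diff.
grad_norm_profiling = True  # log per-layer / per-parameter gradient norms to TensorBoard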

View File

@@ -34,7 +34,7 @@ from internlm.utils.megatron_timers import megatron_timer as timer
from internlm.utils.timeout import llm_timeout
from .base_optimizer import BaseOptimizer
from .utils import compute_norm
from .utils import compute_layer_norm, compute_norm, compute_param_norm
inf = math.inf
logger = get_logger(__file__)
@@ -520,6 +520,29 @@ class HybridZeroOptimizer(BaseOptimizer):
        return norm

    def _compute_param_norm_stage(
        self, group_id: int = 0, last_bucket: bool = False, last_stage: bool = False, previous_param_norms=None
    ):
        # compute norm for gradients that have been reduced
        params, grads = self._param_store.get_reduced_param_for_compute_norm(group_id=group_id, last_bucket=last_bucket)

        total_param_norms = {}
        if len(params) == 0:
            dtype = self.param_groups[group_id]["dtype"]
            grads = [self.padding_grad.to(dtype)]
            params = [self.padding_tensor.to(dtype)]

        if self._clip_grad_norm > 0:
            total_param_norms = compute_param_norm(
                grads,
                params,
                last_stage=last_stage,
                previous_param_norms=previous_param_norms,
                zero_mode=self._broadcast_parallel_mode[group_id],
                is_moe_group=self._is_moe_group(self.optim.param_groups[group_id]),
            )
        return total_param_norms
@llm_timeout(func_name="optim_step")
def step(self, closure=None):
"""Performs a single optimization step.
@@ -547,8 +570,11 @@ class HybridZeroOptimizer(BaseOptimizer):
        # compute norm for gradients in the before bucket
        groups_norms = []
        groups_param_norms = []
        for group_id in range(self.num_param_groups):
            groups_norms.append(self._compute_norm_with_stage(group_id=group_id))
            if gpc.config.get("grad_norm_profiling", False):
                groups_param_norms.append(self._compute_param_norm_stage(group_id=group_id))

        # clear reduced grads
        # grads in the last bucket are reduced
@@ -561,6 +587,8 @@ class HybridZeroOptimizer(BaseOptimizer):
        # compute norm for gradients in the last bucket
        total_norms = {}
        total_param_norms = {}
        total_layer_norms = {}
        for group_id in range(self.num_param_groups):
            group_name = self.param_groups[group_id]["name"] if "name" in self.param_groups[group_id] else "default"
            group_name = f"{group_id}_{group_name}"
@@ -570,6 +598,16 @@ class HybridZeroOptimizer(BaseOptimizer):
                last_stage=True,
                previous_norm=groups_norms[group_id],
            )
            if gpc.config.get("grad_norm_profiling", False):
                param_norms = self._compute_param_norm_stage(
                    group_id=group_id,
                    last_bucket=True,
                    last_stage=True,
                    previous_param_norms=groups_param_norms[group_id],
                )
                total_layer_norms[group_name], total_param_norms[group_name] = compute_layer_norm(
                    param_norms=param_norms, loss_scale=self.loss_scale.item()
                )

            # Need to allreduce(avg) the norms across different ranks because moe params will not be synced
            # during allreduce
@@ -585,7 +623,12 @@ class HybridZeroOptimizer(BaseOptimizer):
        self._sync_grad()
        timer("sync_grad").stop()

        return self._step(closure=closure, norms=total_norms)
        state, global_norms = self._step(closure=closure, norms=total_norms)
        if gpc.config.get("grad_norm_profiling", False):
            global_norms["layer_norms"] = total_layer_norms
            global_norms["param_norms"] = total_param_norms

        return state, global_norms

    def _step(self, closure=None, norms=None):
        assert closure is None, "closure is not supported by step()"

View File

@@ -15,7 +15,7 @@ from torch._utils import _flatten_dense_tensors, _unflatten_dense_tensors
from internlm.core.context import ParallelMode
from internlm.core.context import global_context as gpc
from internlm.core.naive_amp import NaiveAMPModel
from internlm.utils.common import get_tensor_norm, move_norm_to_cuda
from internlm.utils.common import get_current_device, get_tensor_norm, move_norm_to_cuda
from internlm.utils.logger import get_logger
from internlm.utils.parallel import is_model_parallel_parameter
@@ -209,6 +209,49 @@ def calc_lp(grads, norm_type):
    return norm


def reduce_grads(gradients, parameters, fine_grained=False):
    parallel_grads = []
    if fine_grained:
        parallel_grads = {}

    def append_grad(g, p):
        if fine_grained:
            param_name = p.param_name if hasattr(p, "param_name") else "unknown-padding"
            if param_name not in parallel_grads:
                parallel_grads[param_name] = []
            parallel_grads[param_name].append(g.data.float())
        else:
            parallel_grads.append(g.data.float())

    for g, p in zip(gradients, parameters):
        # TODO: consider the pipeline shared parameter
        if (
            gpc.is_initialized(ParallelMode.PIPELINE)
            and hasattr(p, "pipeline_shared_module_pg")
            and dist.get_rank(p.pipeline_shared_module_pg) == 0
        ):  # if shared between different pipe, only count once
            append_grad(g, p)
        elif (
            gpc.is_initialized(ParallelMode.PIPELINE)
            and hasattr(p, "pipeline_shared_module_pg")
            and dist.get_rank(p.pipeline_shared_module_pg) != 0
        ):
            continue
        elif (
            gpc.is_initialized(ParallelMode.TENSOR)
            and not is_model_parallel_parameter(p)
            and gpc.get_local_rank(ParallelMode.TENSOR) == 0
        ):  # if not used in each chunk, such as layernorm
            append_grad(g, p)
        elif is_model_parallel_parameter(p):
            append_grad(g, p)
        elif gpc.get_local_rank(ParallelMode.TENSOR) != 0:
            continue
        else:
            raise RuntimeError("Should not arrive here")
    return parallel_grads


def compute_norm(
    gradients, parameters, last_stage=False, previous_norm=None, norm_type=2, zero_mode=ParallelMode.ZERO1
):
@@ -247,33 +290,7 @@ compute_norm
        )
        total_norm = total_norm_cuda[0].item()
    else:
        tensor_parallel_grads = []
        for g, p in zip(gradients, parameters):
            # TODO: consider the pipeline shared parameter
            if (
                gpc.is_initialized(ParallelMode.PIPELINE)
                and hasattr(p, "pipeline_shared_module_pg")
                and dist.get_rank(p.pipeline_shared_module_pg) == 0
            ):  # if shared between different pipe, only count once
                tensor_parallel_grads.append(g.data.float())
            elif (
                gpc.is_initialized(ParallelMode.PIPELINE)
                and hasattr(p, "pipeline_shared_module_pg")
                and dist.get_rank(p.pipeline_shared_module_pg) != 0
            ):
                continue
            elif (
                gpc.is_initialized(ParallelMode.TENSOR)
                and not is_model_parallel_parameter(p)
                and gpc.get_local_rank(ParallelMode.TENSOR) == 0
            ):  # if not used in each chunk, such as layernorm
                tensor_parallel_grads.append(g.data.float())
            elif is_model_parallel_parameter(p):
                tensor_parallel_grads.append(g.data.float())
            elif gpc.get_local_rank(ParallelMode.TENSOR) != 0:
                continue
            else:
                raise RuntimeError("Should not arrive here")
        tensor_parallel_grads = reduce_grads(gradients, parameters)

        if norm_type == 2.0 and enable_cuda_kernels:
            tensor_parallel_norm = calc_l2_norm(tensor_parallel_grads) ** norm_type
@@ -319,6 +336,124 @@ compute_norm
    return total_norm


def compute_param_norm(
    gradients,
    parameters,
    last_stage=False,
    previous_param_norms=None,
    norm_type=2,
    zero_mode=ParallelMode.ZERO1,
    is_moe_group=False,
):
    """Get the norm of the gradients of each parameter.

    Arguments:
        gradients (Iterable[Tensor]): The gradient value.
        parameters (Iterable[Tensor]): The parameter each gradient corresponds to.
        norm_type (float or int): type of the used p-norm. Can be ``'inf'`` for
            infinity norm.

    Returns:
        The norm of each parameter's gradients, keyed by parameter name.
    """
    enable_cuda_kernels = gradients[0].device.type == "cuda"
    # Norm parameters.
    norm_type = float(norm_type)
    total_param_norms = {}

    param_grads = reduce_grads(gradients, parameters, fine_grained=True)

    param_norms = {}
    for param_name, grads in param_grads.items():
        if norm_type == inf:
            param_norm = max(g.data.abs().max() for g in grads)
        elif norm_type == 2.0 and enable_cuda_kernels:
            param_norm = calc_l2_norm(grads) ** norm_type
        else:
            param_norm = calc_lp(grads, norm_type)
        param_norms[param_name] = param_norm.item() if torch.is_tensor(param_norm) else param_norm

    if last_stage is False:
        return param_norms

    if previous_param_norms is not None:
        for key, value in previous_param_norms.items():
            if key not in param_norms:
                param_norms[key] = value
                continue

            if norm_type == inf:
                param_norms[key] = max(param_norms[key], value)
            else:
                param_norms[key] += value

    # model parallel
    model_parallel_param_norms = {}
    if gpc.is_initialized(ParallelMode.MODEL):
        parallel_param_norms = [None for _ in range(gpc.get_world_size(ParallelMode.MODEL))]
        dist.all_gather_object(parallel_param_norms, param_norms, group=gpc.get_group(ParallelMode.MODEL))
        for local_param_norm in parallel_param_norms:
            for param_name, param_norm in local_param_norm.items():
                if param_name not in model_parallel_param_norms:
                    model_parallel_param_norms[param_name] = 0.0
                if norm_type == inf:
                    model_parallel_param_norms[param_name] = max(model_parallel_param_norms[param_name], param_norm)
                else:
                    model_parallel_param_norms[param_name] += param_norm

    # zero parallel
    zero_param_norms = [None for _ in range(gpc.get_world_size(zero_mode))]
    dist.all_gather_object(zero_param_norms, model_parallel_param_norms, group=gpc.get_group(zero_mode))
    for local_param_norm in zero_param_norms:
        for param_name, param_norm in local_param_norm.items():
            if param_name not in total_param_norms:
                total_param_norms[param_name] = 0.0
            if norm_type == inf:
                total_param_norms[param_name] = max(total_param_norms[param_name], param_norm)
            else:
                total_param_norms[param_name] += param_norm

    # moe
    if is_moe_group:
        pg = gpc.get_group(ParallelMode.EXPERT)
        scaled_param_norm = torch.cuda.FloatTensor(list(total_param_norms.values()), device=get_current_device())
        scaled_param_norm = scaled_param_norm / float(gpc.get_world_size(ParallelMode.EXPERT))
        dist.all_reduce(scaled_param_norm, group=pg)
        for i, param_name in enumerate(total_param_norms.keys()):
            total_param_norms[param_name] = scaled_param_norm[i].item()

    # scale
    for param_name, param_norm in total_param_norms.items():
        if param_norm in (inf, -inf):
            total_param_norms[param_name] = -1
        elif math.isnan(param_norm):
            total_param_norms[param_name] = -2

    return total_param_norms


def compute_layer_norm(param_norms, loss_scale):
    """
    Compute per-layer norms from the per-parameter norms.
    """
    param_norms_groupby_layer = {}
    layer_norms = {}

    for param_name, param_norm in param_norms.items():
        layer_name, param_key = param_name.split("-")
        if layer_name not in param_norms_groupby_layer:
            param_norms_groupby_layer[layer_name] = {}
        if layer_name not in layer_norms:
            layer_norms[layer_name] = 0.0

        if param_norm not in (-1, -2):
            param_norm = param_norm**0.5 / loss_scale
        param_norms_groupby_layer[layer_name][param_key] = param_norm
        layer_norms[layer_name] += param_norm

    return layer_norms, param_norms_groupby_layer


class BaseGradScaler(ABC):
    """A base class for the gradient scaler.

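For orientation, here is a minimal self-contained sketch (plain Python, toy values) of the grouping that `compute_layer_norm` performs: parameter names follow the `"{layer_name}-{param_name}"` convention set by `set_model_params_layer_name` below, are split on `-`, rescaled by `** 0.5 / loss_scale`, and summed per layer. The block and parameter names here are assumptions for illustration, not taken from this diff.

# Standalone mirror of the compute_layer_norm logic above (toy numbers, assumed parameter names).
param_norms = {
    "TransformerBlock0-mlp.w1.weight": 4.0,  # squared L2 norms, as produced by compute_param_norm
    "TransformerBlock0-norm1.weight": 1.0,
}
loss_scale = 2.0

layer_norms, by_layer = {}, {}
for name, norm in param_norms.items():
    layer, key = name.split("-")
    by_layer.setdefault(layer, {})
    layer_norms.setdefault(layer, 0.0)
    if norm not in (-1, -2):  # -1 / -2 are the inf / nan sentinels set by compute_param_norm
        norm = norm**0.5 / loss_scale
    by_layer[layer][key] = norm
    layer_norms[layer] += norm

print(layer_norms)  # {'TransformerBlock0': 1.5}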
View File

@@ -1,6 +1,7 @@
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
import copy
import functools
import time
from functools import partial
@@ -52,7 +53,11 @@ from internlm.train.utils import create_param_groups
from internlm.utils.common import DummyProfile
from internlm.utils.logger import get_logger
from internlm.utils.megatron_timers import megatron_timer as timer
from internlm.utils.parallel import sync_model_param, sync_model_param_within_tp
from internlm.utils.parallel import (
    set_model_params_layer_name,
    sync_model_param,
    sync_model_param_within_tp,
)
from internlm.utils.registry import MODEL_INITIALIZER
from internlm.utils.timeout import llm_timeout
@@ -154,6 +159,10 @@ def initialize_optimizer(model: Union[nn.Module, nn.ModuleList]):
    Returns:
        A tuple of (optimizer, beta2_scheduler, lr_scheduler).
    """
    if gpc.config.get("grad_norm_profiling", False):
        # set the layer name as an attribute of the model parameters
        set_model_params_layer_name(model)

    if gpc.config.hybrid_zero_optimizer.overlap_sync_param:
        param_bcast_sync_handler = ParamBcastSyncHandler(model)
    else:
@@ -518,6 +527,21 @@ def record_current_batch_training_metrics(
        for key, value in acc_perplex.items():
            infos[key] = value

        if gpc.config.get("grad_norm_profiling", False):
            layer_norms = copy.deepcopy(grad_norm["layer_norms"])
            param_norms = copy.deepcopy(grad_norm["param_norms"])
            for group_name, value in layer_norms.items():
                if value:
                    title = f"layer_norm_group_{group_name}"
                    writer.add_scalars(key=title, value=value, step=train_state.step_count)
            for group_name, layer_group in param_norms.items():
                if layer_group:
                    for layer_name, param_group in layer_group.items():
                        title = f"param_norm_{layer_name}_{group_name}"
                        writer.add_scalars(key=title, value=param_group, step=train_state.step_count)
            del grad_norm["layer_norms"]
            del grad_norm["param_norms"]

        line = ""
        for key, value in infos.items():
            line += f"{key}={value} "

View File

@@ -2,9 +2,11 @@
# -*- encoding: utf-8 -*-
import torch.distributed as dist
from torch import nn
from internlm.core.context import IS_TENSOR_PARALLEL, ParallelMode
from internlm.core.context import global_context as gpc
from internlm.core.naive_amp import NaiveAMPModel
def is_model_parallel_parameter(p):
@@ -61,3 +63,31 @@ def get_parallel_log_file_name():
        f"tp={gpc.get_local_rank(ParallelMode.TENSOR)}_pp={gpc.get_local_rank(ParallelMode.PIPELINE)}"
    )
    return log_file_name


def set_model_params_layer_name(model):
    r"""Set the layer name as an attribute of the model parameters.

    Args:
        model (:class:`torch.nn.Module`): A PyTorch model whose parameters will be tagged with layer names.
    """
    if not isinstance(model, nn.ModuleList):
        model = [model]

    for _chunk in model:
        if isinstance(_chunk, NaiveAMPModel):
            _chunk = _chunk.model

        # Create a unique layer name based on the block's class name and index
        for _, children in _chunk.named_children():
            if isinstance(children, nn.ModuleList):
                for idx, block in enumerate(children):
                    for param_name, param in block.named_parameters():
                        layer_name = f"{block.__class__.__name__}Block{idx}"
                        layer_param_name = f"{layer_name}-{param_name}"
                        param.__setattr__("layer_name", layer_name)
                        param.__setattr__("param_name", layer_param_name)
            else:
                for param_name, param in children.named_parameters():
                    layer_name = f"{children.__class__.__name__}"
                    layer_param_name = f"{layer_name}-{param_name}"
                    param.__setattr__("layer_name", layer_name)
                    param.__setattr__("param_name", layer_param_name)