compute layer norms and replace total_norm with them

pull/412/head
JiaoPL 2023-10-12 21:25:30 +08:00
parent b3645b0244
commit a94f429a67
2 changed files with 92 additions and 69 deletions
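This commit switches the gradient-norm bookkeeping from a single scalar per parameter group to a dict keyed by layer name. As a rough sketch of the new shape (keys such as "Block.0" are hypothetical stand-ins for whatever f"{block.__class__.__name__}.{idx}" produces for the real model):

    # what _compute_norm_with_stage / compute_norm now pass around and return:
    # layer_name -> norm value, still raised to norm_type (see the docstring change below)
    layer_norms = {
        "unknown": 0.0,    # parameters that never received a layer_name attribute
        "embedding": 0.0,
        "norm": 0.0,
        "head": 0.0,
        "Block.0": 0.0,    # hypothetical transformer-block key
        "Block.1": 0.0,
    }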


@@ -494,11 +494,7 @@ class HybridZeroOptimizer(BaseOptimizer):
# Gradients may not be fully synchronized here.
def _compute_norm_with_stage(
self,
group_id: int = 0,
last_bucket: bool = False,
last_stage: bool = False,
previous_norm=None,
self, group_id: int = 0, last_bucket: bool = False, last_stage: bool = False, previous_layer_norms=None
):
# compute norm for gradients that have been reduced
params, grads = self._param_store.get_reduced_param_for_compute_norm(group_id=group_id, last_bucket=last_bucket)
@@ -507,18 +503,18 @@ class HybridZeroOptimizer(BaseOptimizer):
grads = [self.padding_grad.to(dtype)]
params = [self.padding_tensor.to(dtype)]
norm = 0
layer_norm = None
if self._clip_grad_norm > 0:
# this norm is before scaling, so it will be very large
norm = compute_norm(
layer_norm = compute_norm(
gradients=grads,
parameters=params,
last_stage=last_stage,
previous_norm=previous_norm,
previous_layer_norms=previous_layer_norms,
zero_mode=self._broadcast_parallel_mode[group_id],
)
return norm
return layer_norm
@llm_timeout(func_name="optim_step")
def step(self, closure=None):
@@ -546,10 +542,10 @@ class HybridZeroOptimizer(BaseOptimizer):
self._reduce_grads_stored_in_bucket(self._bucket_store[group_id], reduce_rank=None, last_bucket=True)
# compute norm for gradients in the earlier buckets
groups_norms = []
groups_layer_norms = []
for group_id in range(self.num_param_groups):
groups_norms.append(self._compute_norm_with_stage(group_id=group_id))
layer_norm = self._compute_norm_with_stage(group_id=group_id)
groups_layer_norms.append(layer_norm)
# clear reduced grads
# grads in the last bucket are reduced
for bucket in self._bucket_in_progress:
@@ -561,15 +557,14 @@ class HybridZeroOptimizer(BaseOptimizer):
# compute norm for gradients in the last bucket
total_norms = {}
total_layernorms = {}
for group_id in range(self.num_param_groups):
group_name = self.param_groups[group_id]["name"] if "name" in self.param_groups[group_id] else "default"
group_name = f"{group_id}_{group_name}"
total_norms[group_name] = self._compute_norm_with_stage(
group_id=group_id,
last_bucket=True,
last_stage=True,
previous_norm=groups_norms[group_id],
total_layernorms[group_name] = self._compute_norm_with_stage(
group_id=group_id, last_bucket=True, last_stage=True, previous_layer_norms=groups_layer_norms[group_id]
)
total_norms[group_name] = sum(total_layernorms[group_name].values())
# Need to allreduce(avg) the norms across different ranks because moe params will not be synced
# during allreduce
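Summing the per-layer values to recover the group total is valid because each entry, like the old scalar, is still the norm raised to norm_type (the squared L2 norm for norm_type=2); the **(1/norm_type) root is applied later by the caller, as the compute_norm docstring notes. A small worked sketch with hypothetical numbers:

    # per-layer values as returned by _compute_norm_with_stage (norm_type=2, so squared norms)
    group_layer_norms = {"embedding": 4.0, "Block.0": 9.0, "head": 1.0}
    total = sum(group_layer_norms.values())   # 14.0, the same quantity the old total_norm held
    grad_norm = total ** (1.0 / 2.0)          # ~3.74, taken later when the norm is actually used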


@@ -1,6 +1,7 @@
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
import copy
import math
from abc import ABC, abstractmethod
from collections import OrderedDict
@@ -15,7 +16,7 @@ from torch._utils import _flatten_dense_tensors, _unflatten_dense_tensors
from internlm.core.context import ParallelMode
from internlm.core.context import global_context as gpc
from internlm.core.naive_amp import NaiveAMPModel
from internlm.utils.common import get_tensor_norm, move_norm_to_cuda
from internlm.utils.common import move_norm_to_cuda
from internlm.utils.logger import get_logger
from internlm.utils.parallel import is_model_parallel_parameter
@@ -31,6 +32,7 @@ except (ModuleNotFoundError, ImportError):
APEX_AVAILABLE = False
inf = math.inf
global_layer_norms = {"unknown": 0.0, "embedding": 0.0, "norm": 0.0, "head": 0.0}
def flatten(input_):
@@ -210,7 +212,7 @@ def calc_lp(grads, norm_type):
def compute_norm(
gradients, parameters, last_stage=False, previous_norm=None, norm_type=2, zero_mode=ParallelMode.ZERO1
gradients, parameters, last_stage=False, previous_layer_norms=None, norm_type=2, zero_mode=ParallelMode.ZERO1
):
"""Get the norm
Arguments:
@@ -220,42 +222,60 @@
infinity norm.
Returns:
Total norm of the parameters, need total_norm**(1/norm) before using.
Per-layer gradient norms; apply **(1/norm_type) before use.
"""
enable_cuda_kernels = gradients[0].device.type == "cuda"
# Norm parameters.
norm_type = float(norm_type)
total_layer_norms = copy.deepcopy(global_layer_norms)
layer_grads = {}
# Calculate norm.
if norm_type == inf:
total_norm = max(g.data.abs().max() for g in gradients)
total_norm_cuda = torch.FloatTensor([float(total_norm)], device=gradients[0].device)
for g, p in zip(gradients, parameters):
layer_name = p.layer_name if hasattr(p, "layer_name") else "unknown"
if layer_name not in layer_grads:
layer_grads[layer_name] = []
layer_grads[layer_name].append(g)
for layer_name, grads in layer_grads.items():
layer_norm = max(g.data.abs().max() for g in grads)
total_layer_norms[layer_name] = max(total_layer_norms[layer_name], float(layer_norm))
if last_stage is False:
return total_norm_cuda
return total_layer_norms
if previous_norm is not None:
total_norm_cuda = max(total_norm_cuda, previous_norm)
if previous_layer_norms is not None:
for key, value in previous_layer_norms.items():
total_layer_norms[key] = max(value, total_layer_norms[key])
total_layer_norms_values = move_norm_to_cuda(torch.Tensor(list(total_layer_norms.values())))
total_layer_norms_keys = list(global_layer_norms.keys())
# Take max across all model-parallel GPUs.
if gpc.get_world_size(ParallelMode.MODEL) > 1:
if gpc.is_initialized(ParallelMode.MODEL):
dist.all_reduce(
total_norm_cuda,
total_layer_norms_values,
op=dist.ReduceOp.MAX,
group=gpc.get_group(ParallelMode.MODEL),
)
total_norm = total_norm_cuda[0].item()
for idx in range(len(total_layer_norms_keys)):
layer_norm = total_layer_norms_values[idx]
if torch.is_tensor(layer_norm):
layer_norm = layer_norm.item()
total_layer_norms[total_layer_norms_keys[idx]] = layer_norm
else:
tensor_parallel_grads = []
for g, p in zip(gradients, parameters):
# TODO: consider the pipeline shared parameter
layer_name = p.layer_name if hasattr(p, "layer_name") else "unknown"
if layer_name not in layer_grads:
layer_grads[layer_name] = []
if (
gpc.is_initialized(ParallelMode.PIPELINE)
and hasattr(p, "pipeline_shared_module_pg")
and dist.get_rank(p.pipeline_shared_module_pg) == 0
): # if shared between different pipe, only count once
tensor_parallel_grads.append(g.data.float())
layer_grads[layer_name].append(g.data.float())
elif (
gpc.is_initialized(ParallelMode.PIPELINE)
and hasattr(p, "pipeline_shared_module_pg")
@@ -267,56 +287,50 @@ def compute_norm(
and not is_model_parallel_parameter(p)
and gpc.get_local_rank(ParallelMode.TENSOR) == 0
): # if not used in each chunk, such as layernorm
tensor_parallel_grads.append(g.data.float())
layer_grads[layer_name].append(g.data.float())
elif is_model_parallel_parameter(p):
tensor_parallel_grads.append(g.data.float())
layer_grads[layer_name].append(g.data.float())
elif gpc.get_local_rank(ParallelMode.TENSOR) != 0:
continue
else:
raise RuntimeError("Should not arrive here")
# calculate layer norm
for layer_name, grads in layer_grads.items():
if norm_type == 2.0 and enable_cuda_kernels:
tensor_parallel_norm = calc_l2_norm(tensor_parallel_grads) ** norm_type
layer_norm = calc_l2_norm(grads) ** norm_type
else:
tensor_parallel_norm = calc_lp(tensor_parallel_grads, norm_type)
# If the norm is a float, convert it into a torch.Tensor.
tensor_parallel_norm = get_tensor_norm(tensor_parallel_norm, enable_cuda_kernels)
# If grads are on CPU, the norms are also on CPU. Cast them to CUDA tensors
if not enable_cuda_kernels:
tensor_parallel_norm = move_norm_to_cuda(tensor_parallel_norm)
total_norm = tensor_parallel_norm
layer_norm = calc_lp(grads, norm_type)
total_layer_norms[layer_name] += layer_norm.item() if torch.is_tensor(layer_norm) else layer_norm
if last_stage is False:
return total_norm
return total_layer_norms
if previous_norm is not None:
total_norm = total_norm + previous_norm
if previous_layer_norms is not None:
for key, value in previous_layer_norms.items():
total_layer_norms[key] += value
# sync layer norm
# Sum across all model-parallel GPUs.
total_layer_norms_values = move_norm_to_cuda(torch.Tensor(list(total_layer_norms.values())))
total_layer_norms_keys = list(total_layer_norms.keys())
if gpc.is_initialized(ParallelMode.MODEL):
dist.all_reduce(
total_norm,
op=dist.ReduceOp.SUM,
group=gpc.get_group(ParallelMode.MODEL),
)
dist.all_reduce(total_layer_norms_values, op=dist.ReduceOp.SUM, group=gpc.get_group(ParallelMode.MODEL))
dist.all_reduce(total_layer_norms_values, op=dist.ReduceOp.SUM, group=gpc.get_group(zero_mode))
# Because we use zero1, we also need this reduction.
# TODO: Check zero group to be a subset of dp group.
dist.all_reduce(total_norm, op=dist.ReduceOp.SUM, group=gpc.get_group(zero_mode))
for idx in range(len(total_layer_norms_keys)):
layer_norm = total_layer_norms_values[idx]
if torch.is_tensor(layer_norm):
layer_norm = layer_norm.item()
if layer_norm == float("inf") or layer_norm == -float("inf"):
layer_norm = -1
if torch.is_tensor(total_norm):
total_norm = total_norm.item()
if math.isnan(layer_norm):
layer_norm = -2
total_layer_norms[total_layer_norms_keys[idx]] = layer_norm
# Scale.
if total_norm == float("inf") or total_norm == -float("inf"):
total_norm = -1
if math.isnan(total_norm):
total_norm = -2
return total_norm
return total_layer_norms
class BaseGradScaler(ABC):
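The reworked compute_norm groups gradients by the layer_name attribute attached to each parameter, accumulates an un-rooted per-layer norm, and all-reduces the stacked values across the model-parallel and zero process groups. A self-contained sketch of just the grouping and accumulation steps, using plain torch (the helper names here are illustrative, not part of the diff):

    from collections import defaultdict

    import torch

    def group_grads_by_layer(gradients, parameters):
        # mirrors the layer_grads bookkeeping above: parameters that were never
        # tagged with layer_name fall into the "unknown" bucket
        layer_grads = defaultdict(list)
        for g, p in zip(gradients, parameters):
            layer_grads[getattr(p, "layer_name", "unknown")].append(g.data.float())
        return layer_grads

    def per_layer_norm(layer_grads, norm_type=2.0):
        # per-layer sum of ||g||_p ** norm_type, i.e. the un-rooted norm the diff accumulates
        return {
            name: sum(float(torch.norm(g, p=norm_type) ** norm_type) for g in grads)
            for name, grads in layer_grads.items()
        }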
@@ -507,18 +521,32 @@ class ParamBcastSyncHandler:
for _chunk in model:
if isinstance(_chunk, NaiveAMPModel):
_chunk = _chunk.model
for _, children in _chunk.named_children():
# if gpc.is_rank_for_log():
# logger.info(_chunk)
# [ name for name , _ in model.model.named_children()]
for name, children in _chunk.named_children():
# should be the transformer block definition in modeling_xxx.py
if isinstance(children, nn.ModuleList):
# record the block that a parameter belongs to
for _, block in enumerate(children):
for idx, block in enumerate(children):
# self._block_to_param[f"{name}.{idx}"] = list(block.parameters())
self._block_to_param[block] = list(block.parameters())
for parameter in self._block_to_param[block]:
layer_name = f"{block.__class__.__name__}.{idx}"
# if gpc.is_rank_for_log():
# logger.info(layer_name)
global_layer_norms[layer_name] = 0.0
parameter.__setattr__("layer_name", layer_name)
else:
# record the block that a parameter belongs to
# self._block_to_param[name] = list(children.parameters())
self._block_to_param[children] = list(children.parameters())
for parameter in self._block_to_param[children]:
layer_name = f"{children.__class__.__name__}"
# if gpc.is_rank_for_log():
# logger.info(layer_name)
# global_layer_norms[layer_name] = 0.0
parameter.__setattr__("layer_name", name)
alloc_num = 0
rank_to_go = 0
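For reference, the tagging convention introduced here can be exercised on a toy model: parameters inside a ModuleList child (the transformer blocks) are tagged "<ClassName>.<idx>", while parameters of every other top-level child keep that child's attribute name, which is what compute_norm later reads via p.layer_name. A standalone sketch with a hypothetical TinyModel:

    import torch.nn as nn

    class TinyModel(nn.Module):  # hypothetical stand-in for a modeling_xxx.py model
        def __init__(self):
            super().__init__()
            self.embedding = nn.Embedding(16, 8)
            self.blocks = nn.ModuleList(nn.Linear(8, 8) for _ in range(2))
            self.head = nn.Linear(8, 16)

    model = TinyModel()
    for name, children in model.named_children():
        if isinstance(children, nn.ModuleList):
            for idx, block in enumerate(children):
                layer_name = f"{block.__class__.__name__}.{idx}"  # e.g. "Linear.0"
                for parameter in block.parameters():
                    parameter.layer_name = layer_name
        else:
            for parameter in children.parameters():
                parameter.layer_name = name  # e.g. "embedding", "head"

    # every parameter now has layer_name set, so compute_norm can group gradients by it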