From 1e263988cf33ad70f061c9927df29173c1e2e18c Mon Sep 17 00:00:00 2001
From: JiaoPL
Date: Mon, 23 Oct 2023 15:59:37 +0800
Subject: [PATCH] add function: reduce grads

---
 internlm/solver/optimizer/utils.py | 101 +++++++++++++----------------
 1 file changed, 45 insertions(+), 56 deletions(-)

diff --git a/internlm/solver/optimizer/utils.py b/internlm/solver/optimizer/utils.py
index 9833309..982a246 100644
--- a/internlm/solver/optimizer/utils.py
+++ b/internlm/solver/optimizer/utils.py
@@ -209,6 +209,49 @@ def calc_lp(grads, norm_type):
     return norm
 
 
+def reduce_grads(gradients, parameters, fine_grained=False):
+    parallel_grads = []
+    if fine_grained:
+        parallel_grads = {}
+
+    def append_grad(g, p):
+        if fine_grained:
+            param_name = p.param_name if hasattr(p, "param_name") else "unknown-padding"
+            if param_name not in parallel_grads:
+                parallel_grads[param_name] = []
+            parallel_grads[param_name].append(g.data.float())
+        else:
+            parallel_grads.append(g.data.float())
+
+    for g, p in zip(gradients, parameters):
+        # TODO: consider the pipeline shared parameter
+        if (
+            gpc.is_initialized(ParallelMode.PIPELINE)
+            and hasattr(p, "pipeline_shared_module_pg")
+            and dist.get_rank(p.pipeline_shared_module_pg) == 0
+        ):  # if shared between different pipe, only count once
+            append_grad(g, p)
+        elif (
+            gpc.is_initialized(ParallelMode.PIPELINE)
+            and hasattr(p, "pipeline_shared_module_pg")
+            and dist.get_rank(p.pipeline_shared_module_pg) != 0
+        ):
+            continue
+        elif (
+            gpc.is_initialized(ParallelMode.TENSOR)
+            and not is_model_parallel_parameter(p)
+            and gpc.get_local_rank(ParallelMode.TENSOR) == 0
+        ):  # if not used in each chunk, such as layernorm
+            append_grad(g, p)
+        elif is_model_parallel_parameter(p):
+            append_grad(g, p)
+        elif gpc.get_local_rank(ParallelMode.TENSOR) != 0:
+            continue
+        else:
+            raise RuntimeError("Should not arrive here")
+    return parallel_grads
+
+
 def compute_norm(
     gradients, parameters, last_stage=False, previous_norm=None, norm_type=2, zero_mode=ParallelMode.ZERO1
 ):
@@ -247,33 +290,7 @@ def compute_norm(
             )
         total_norm = total_norm_cuda[0].item()
     else:
-        tensor_parallel_grads = []
-        for g, p in zip(gradients, parameters):
-            # TODO: consider the pipeline shared parameter
-            if (
-                gpc.is_initialized(ParallelMode.PIPELINE)
-                and hasattr(p, "pipeline_shared_module_pg")
-                and dist.get_rank(p.pipeline_shared_module_pg) == 0
-            ):  # if shared between different pipe, only count o
-                tensor_parallel_grads.append(g.data.float())
-            elif (
-                gpc.is_initialized(ParallelMode.PIPELINE)
-                and hasattr(p, "pipeline_shared_module_pg")
-                and dist.get_rank(p.pipeline_shared_module_pg) != 0
-            ):
-                continue
-            elif (
-                gpc.is_initialized(ParallelMode.TENSOR)
-                and not is_model_parallel_parameter(p)
-                and gpc.get_local_rank(ParallelMode.TENSOR) == 0
-            ):  # if not used in each chunk, such as layernorm
-                tensor_parallel_grads.append(g.data.float())
-            elif is_model_parallel_parameter(p):
-                tensor_parallel_grads.append(g.data.float())
-            elif gpc.get_local_rank(ParallelMode.TENSOR) != 0:
-                continue
-            else:
-                raise RuntimeError("Should not arrive here")
+        tensor_parallel_grads = reduce_grads(gradients, parameters)
 
         if norm_type == 2.0 and enable_cuda_kernels:
             tensor_parallel_norm = calc_l2_norm(tensor_parallel_grads) ** norm_type
@@ -343,35 +360,7 @@ def compute_param_norm(
     norm_type = float(norm_type)
     total_param_norms = {}
 
-    param_grads = {}
-    for g, p in zip(gradients, parameters):
-        param_name = p.param_name if hasattr(p, "param_name") else "unknown-padding"
-        if param_name not in param_grads:
-            param_grads[param_name] = []
-        if (
-            gpc.is_initialized(ParallelMode.PIPELINE)
-            and hasattr(p, "pipeline_shared_module_pg")
-            and dist.get_rank(p.pipeline_shared_module_pg) == 0
-        ):  # if shared between different pipe, only count o
-            param_grads[param_name].append(g.data.float())
-        elif (
-            gpc.is_initialized(ParallelMode.PIPELINE)
-            and hasattr(p, "pipeline_shared_module_pg")
-            and dist.get_rank(p.pipeline_shared_module_pg) != 0
-        ):
-            continue
-        elif (
-            gpc.is_initialized(ParallelMode.TENSOR)
-            and not is_model_parallel_parameter(p)
-            and gpc.get_local_rank(ParallelMode.TENSOR) == 0
-        ):  # if not used in each chunk, such as layernorm
-            param_grads[param_name].append(g.data.float())
-        elif is_model_parallel_parameter(p):
-            param_grads[param_name].append(g.data.float())
-        elif gpc.get_local_rank(ParallelMode.TENSOR) != 0:
-            continue
-        else:
-            raise RuntimeError("Should not arrive here")
+    param_grads = reduce_grads(gradients, parameters, fine_grained=True)
 
     param_norms = {}
     for param_name, grads in param_grads.items():
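
Note (not part of the patch): below is a minimal standalone sketch of the two return shapes that reduce_grads produces, a flat list of fp32 gradients by default and a dict keyed by param_name when fine_grained=True. The pipeline/tensor-parallel filtering that the real function performs through gpc, dist and is_model_parallel_parameter is deliberately omitted here, and reduce_grads_sketch as well as the layerN.weight names are hypothetical, introduced only for this illustration.

# Minimal sketch, assuming plain PyTorch with no parallelism: it keeps every gradient
# instead of filtering by pipeline/tensor-parallel rank as the patched function does.
import torch


def reduce_grads_sketch(gradients, parameters, fine_grained=False):
    parallel_grads = {} if fine_grained else []
    for g, p in zip(gradients, parameters):
        if fine_grained:
            # parameters without a param_name attribute share one bucket, as in the patch
            param_name = getattr(p, "param_name", "unknown-padding")
            parallel_grads.setdefault(param_name, []).append(g.data.float())
        else:
            parallel_grads.append(g.data.float())
    return parallel_grads


# toy usage: two fp16 parameters with gradients attached and hypothetical param_name tags
params = [torch.nn.Parameter(torch.randn(4, dtype=torch.float16)) for _ in range(2)]
for i, p in enumerate(params):
    p.grad = torch.randn_like(p)
    p.param_name = f"layer{i}.weight"
grads = [p.grad for p in params]

print(type(reduce_grads_sketch(grads, params)))                      # <class 'list'>
print(reduce_grads_sketch(grads, params, fine_grained=True).keys())  # dict_keys(['layer0.weight', 'layer1.weight'])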