add function: reduce grads

pull/429/head
JiaoPL 2023-10-23 15:59:37 +08:00
parent 45fd0ec86b
commit 1e263988cf
1 changed file with 45 additions and 56 deletions

@@ -209,6 +209,49 @@ def calc_lp(grads, norm_type):
     return norm
 
 
+def reduce_grads(gradients, parameters, fine_grained=False):
+    parallel_grads = []
+    if fine_grained:
+        parallel_grads = {}
+
+    def append_grad(g, p):
+        if fine_grained:
+            param_name = p.param_name if hasattr(p, "param_name") else "unknown-padding"
+            if param_name not in parallel_grads:
+                parallel_grads[param_name] = []
+            parallel_grads[param_name].append(g.data.float())
+        else:
+            parallel_grads.append(g.data.float())
+
+    for g, p in zip(gradients, parameters):
+        # TODO: consider the pipeline shared parameter
+        if (
+            gpc.is_initialized(ParallelMode.PIPELINE)
+            and hasattr(p, "pipeline_shared_module_pg")
+            and dist.get_rank(p.pipeline_shared_module_pg) == 0
+        ):  # if shared between different pipe, only count once
+            append_grad(g, p)
+        elif (
+            gpc.is_initialized(ParallelMode.PIPELINE)
+            and hasattr(p, "pipeline_shared_module_pg")
+            and dist.get_rank(p.pipeline_shared_module_pg) != 0
+        ):
+            continue
+        elif (
+            gpc.is_initialized(ParallelMode.TENSOR)
+            and not is_model_parallel_parameter(p)
+            and gpc.get_local_rank(ParallelMode.TENSOR) == 0
+        ):  # if not used in each chunk, such as layernorm
+            append_grad(g, p)
+        elif is_model_parallel_parameter(p):
+            append_grad(g, p)
+        elif gpc.get_local_rank(ParallelMode.TENSOR) != 0:
+            continue
+        else:
+            raise RuntimeError("Should not arrive here")
+    return parallel_grads
+
+
 def compute_norm(
     gradients, parameters, last_stage=False, previous_norm=None, norm_type=2, zero_mode=ParallelMode.ZERO1
 ):
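The fine_grained=True path buckets gradients by param_name instead of returning a flat list. A minimal, dependency-light sketch of just that bucketing (illustrative only: bucket_by_param_name is not a repo function, and the pipeline/tensor-parallel rank filtering above is omitted):

import torch

def bucket_by_param_name(gradients, parameters):
    # Mirror append_grad's fine-grained branch: group each grad under its
    # parameter's param_name, with the same "unknown-padding" fallback.
    buckets = {}
    for g, p in zip(gradients, parameters):
        name = getattr(p, "param_name", "unknown-padding")
        buckets.setdefault(name, []).append(g.float())
    return buckets

w = torch.nn.Parameter(torch.randn(3))
w.param_name = "ffn.w1"  # the attribute reduce_grads looks for
print(bucket_by_param_name([torch.randn(3)], [w]))
# -> {'ffn.w1': [tensor([...])]}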
@@ -247,33 +290,7 @@ def compute_norm(
             )
         total_norm = total_norm_cuda[0].item()
     else:
-        tensor_parallel_grads = []
-        for g, p in zip(gradients, parameters):
-            # TODO: consider the pipeline shared parameter
-            if (
-                gpc.is_initialized(ParallelMode.PIPELINE)
-                and hasattr(p, "pipeline_shared_module_pg")
-                and dist.get_rank(p.pipeline_shared_module_pg) == 0
-            ):  # if shared between different pipe, only count o
-                tensor_parallel_grads.append(g.data.float())
-            elif (
-                gpc.is_initialized(ParallelMode.PIPELINE)
-                and hasattr(p, "pipeline_shared_module_pg")
-                and dist.get_rank(p.pipeline_shared_module_pg) != 0
-            ):
-                continue
-            elif (
-                gpc.is_initialized(ParallelMode.TENSOR)
-                and not is_model_parallel_parameter(p)
-                and gpc.get_local_rank(ParallelMode.TENSOR) == 0
-            ):  # if not used in each chunk, such as layernorm
-                tensor_parallel_grads.append(g.data.float())
-            elif is_model_parallel_parameter(p):
-                tensor_parallel_grads.append(g.data.float())
-            elif gpc.get_local_rank(ParallelMode.TENSOR) != 0:
-                continue
-            else:
-                raise RuntimeError("Should not arrive here")
+        tensor_parallel_grads = reduce_grads(gradients, parameters)
 
         if norm_type == 2.0 and enable_cuda_kernels:
             tensor_parallel_norm = calc_l2_norm(tensor_parallel_grads) ** norm_type
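calc_l2_norm above is the fused-kernel fast path (guarded by enable_cuda_kernels). As a rough plain-torch stand-in for the same quantity over a list of gradients (l2_norm_of_list is an illustrative name; the CUDA kernel and any cross-rank reduction are out of scope here):

import torch

def l2_norm_of_list(grads, norm_type=2.0):
    # 2-norm of the per-tensor 2-norms, raised to norm_type to match the
    # calc_l2_norm(...) ** norm_type expression above.
    if len(grads) == 0:
        return torch.tensor(0.0)
    per_tensor = torch.stack([g.norm(2.0) for g in grads])
    return per_tensor.norm(2.0) ** norm_type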
@@ -343,35 +360,7 @@ def compute_param_norm(
     norm_type = float(norm_type)
     total_param_norms = {}
-    param_grads = {}
-    for g, p in zip(gradients, parameters):
-        param_name = p.param_name if hasattr(p, "param_name") else "unknown-padding"
-        if param_name not in param_grads:
-            param_grads[param_name] = []
-        if (
-            gpc.is_initialized(ParallelMode.PIPELINE)
-            and hasattr(p, "pipeline_shared_module_pg")
-            and dist.get_rank(p.pipeline_shared_module_pg) == 0
-        ):  # if shared between different pipe, only count o
-            param_grads[param_name].append(g.data.float())
-        elif (
-            gpc.is_initialized(ParallelMode.PIPELINE)
-            and hasattr(p, "pipeline_shared_module_pg")
-            and dist.get_rank(p.pipeline_shared_module_pg) != 0
-        ):
-            continue
-        elif (
-            gpc.is_initialized(ParallelMode.TENSOR)
-            and not is_model_parallel_parameter(p)
-            and gpc.get_local_rank(ParallelMode.TENSOR) == 0
-        ):  # if not used in each chunk, such as layernorm
-            param_grads[param_name].append(g.data.float())
-        elif is_model_parallel_parameter(p):
-            param_grads[param_name].append(g.data.float())
-        elif gpc.get_local_rank(ParallelMode.TENSOR) != 0:
-            continue
-        else:
-            raise RuntimeError("Should not arrive here")
+    param_grads = reduce_grads(gradients, parameters, fine_grained=True)
 
     param_norms = {}
     for param_name, grads in param_grads.items():
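The loop starting here turns each bucket from the fine-grained dict into one scalar per parameter. A plain-torch sketch of that shape for an Lp accumulation (illustrative only; the repo's calc_l2_norm / calc_lp kernels and their cross-rank reductions are not reproduced):

import torch

def per_param_norm(param_grads, norm_type=2.0):
    # One scalar per param_name: sum over the bucket of |g|^p, i.e. the
    # p-th power of the Lp norm of that parameter's gradients.
    return {
        name: sum(g.abs().pow(norm_type).sum() for g in grads)
        for name, grads in param_grads.items()
    }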