From 867c8c2d3a90bbf55a5bedba80a3aeabe0299d0f Mon Sep 17 00:00:00 2001
From: Jiarui Fang
Date: Fri, 13 Jan 2023 10:05:58 +0800
Subject: [PATCH] [zero] low level optim supports ProcessGroup (#2464)

---
 colossalai/zero/sharded_optim/_utils.py       | 25 +++++--
 .../sharded_optim/bookkeeping/base_store.py   | 13 +++-
 .../sharded_optim/bookkeeping/bucket_store.py |  9 +--
 .../bookkeeping/parameter_store.py            |  8 ++-
 .../zero/sharded_optim/low_level_optim.py     | 67 +++++++++++--------
 .../language/gpt/gemini/train_gpt_demo.py     | 17 +++--
 .../test_zero/low_level_zero/test_grad_acc.py | 13 +++-
 .../test_zero/low_level_zero/test_zero1_2.py  |  6 ++
 8 files changed, 106 insertions(+), 52 deletions(-)

diff --git a/colossalai/zero/sharded_optim/_utils.py b/colossalai/zero/sharded_optim/_utils.py
index 9a839a570..7369f8a2e 100644
--- a/colossalai/zero/sharded_optim/_utils.py
+++ b/colossalai/zero/sharded_optim/_utils.py
@@ -1,4 +1,5 @@
 import math
+from typing import Optional
 
 import torch
 import torch.distributed as dist
@@ -7,6 +8,7 @@ from torch._utils import _flatten_dense_tensors, _unflatten_dense_tensors
 
 from colossalai.context import ParallelMode
 from colossalai.core import global_context as gpc
+from colossalai.tensor import ProcessGroup
 from colossalai.utils import is_model_parallel_parameter
 
 
@@ -101,7 +103,7 @@ def split_half_float_double(tensor_list):
     return buckets
 
 
-def reduce_tensor(tensor, dtype=None, dst_rank=None, parallel_mode=ParallelMode.DATA):
+def reduce_tensor_dp_group(tensor, dtype=None, dst_rank=None, pg: Optional[ProcessGroup] = None):
     """
     Reduce the tensor in the data parallel process group
 
@@ -114,7 +116,7 @@ def reduce_tensor(tensor, dtype=None, dst_rank=None, parallel_mode=ParallelMode.
     :type tensor: torch.Tensor
     :type dtype: torch.dtype, optional
     :type dst_rank: int, optional
-    :type parallel_mode: ParallelMode, optional
+    :type pg: ProcessGroup, optional
     """
     # use the original dtype
    if dtype is None:
@@ -126,8 +128,13 @@ def reduce_tensor(tensor, dtype=None, dst_rank=None, parallel_mode=ParallelMode.
     else:
         tensor_to_reduce = tensor
 
-    world_size = gpc.get_world_size(parallel_mode)
-    group = gpc.get_group(parallel_mode)
+    if isinstance(pg, ProcessGroup):
+        group = pg.dp_process_group()
+        world_size = pg.dp_world_size()
+    else:
+        world_size = gpc.get_world_size(ParallelMode.DATA)
+        group = gpc.get_group(ParallelMode.DATA)
+
     tensor_to_reduce.div_(world_size)
 
     # if rank is None, all reduce will be used
     if use_all_reduce:
         dist.all_reduce(tensor_to_reduce, group=group)
     else:
-        ranks_in_group = gpc.get_ranks_in_group(parallel_mode)
+        if pg is not None:
+            ranks_in_group = pg.dp_rank_list()
+        else:
+            ranks_in_group = gpc.get_ranks_in_group(ParallelMode.DATA)
         global_rank = ranks_in_group[dst_rank]
         dist.reduce(tensor=tensor_to_reduce, dst=global_rank, group=group)
 
     # recover the original dtype
     if tensor.dtype != dtype and tensor is not tensor_to_reduce:
-        local_rank = gpc.get_local_rank(parallel_mode)
+        if pg is not None:
+            local_rank = pg.dp_local_rank()
+        else:
+            local_rank = gpc.get_local_rank(ParallelMode.DATA)
         if use_all_reduce or dst_rank == local_rank:
             tensor.copy_(tensor_to_reduce)
 
diff --git a/colossalai/zero/sharded_optim/bookkeeping/base_store.py b/colossalai/zero/sharded_optim/bookkeeping/base_store.py
index d4436acaa..3623ed1f0 100644
--- a/colossalai/zero/sharded_optim/bookkeeping/base_store.py
+++ b/colossalai/zero/sharded_optim/bookkeeping/base_store.py
@@ -1,12 +1,19 @@
+from typing import Optional
+
 from colossalai.context import ParallelMode
 from colossalai.core import global_context as gpc
+from colossalai.tensor import ProcessGroup
 
 
 class BaseStore:
 
-    def __init__(self, dp_parallel_mode=ParallelMode.DATA):
-        self._world_size = gpc.get_world_size(dp_parallel_mode)
-        self._local_rank = gpc.get_local_rank(dp_parallel_mode)
+    def __init__(self, pg: Optional[ProcessGroup] = None):
+        if isinstance(pg, ProcessGroup):
+            self._world_size = pg.dp_world_size()
+            self._local_rank = pg.dp_local_rank()
+        else:
+            self._world_size = gpc.get_world_size(ParallelMode.DATA)
+            self._local_rank = gpc.get_local_rank(ParallelMode.DATA)
 
     @property
     def world_size(self):
diff --git a/colossalai/zero/sharded_optim/bookkeeping/bucket_store.py b/colossalai/zero/sharded_optim/bookkeeping/bucket_store.py
index 0f2b1bb88..aba61624e 100644
--- a/colossalai/zero/sharded_optim/bookkeeping/bucket_store.py
+++ b/colossalai/zero/sharded_optim/bookkeeping/bucket_store.py
@@ -1,13 +1,14 @@
-from colossalai.context import ParallelMode
-from colossalai.core import global_context as gpc
+from typing import Optional
+
+from colossalai.tensor import ProcessGroup
 
 from .base_store import BaseStore
 
 
 class BucketStore(BaseStore):
 
-    def __init__(self, dp_parallel_mode):
-        super().__init__(dp_parallel_mode)
+    def __init__(self, pg: Optional[ProcessGroup] = None):
+        super().__init__(pg)
         self._grads = dict()
         self._params = dict()
         self._num_elements_in_bucket = dict()
diff --git a/colossalai/zero/sharded_optim/bookkeeping/parameter_store.py b/colossalai/zero/sharded_optim/bookkeeping/parameter_store.py
index 09ebaaf99..c22186abe 100644
--- a/colossalai/zero/sharded_optim/bookkeeping/parameter_store.py
+++ b/colossalai/zero/sharded_optim/bookkeeping/parameter_store.py
@@ -1,14 +1,16 @@
-from typing import List
+from typing import List, Optional
 
 from torch import Tensor
 
+from colossalai.tensor import ProcessGroup
+
 from .base_store import BaseStore
 
 
 class ParameterStore(BaseStore):
 
-    def __init__(self, dp_paralle_mode):
-        super().__init__(dp_paralle_mode)
+    def __init__(self, pg: Optional[ProcessGroup] = None):
+        super().__init__(pg)
         # param partitioning data structures
         self._fp16_param_to_rank = dict()
         self._rank_groupid_to_fp16_param_list = dict()
diff --git a/colossalai/zero/sharded_optim/low_level_optim.py b/colossalai/zero/sharded_optim/low_level_optim.py
index c437ac549..e372eaa50 100644
--- a/colossalai/zero/sharded_optim/low_level_optim.py
+++ b/colossalai/zero/sharded_optim/low_level_optim.py
@@ -1,5 +1,5 @@
 from functools import partial
-from itertools import groupby
+from typing import Optional
 
 import torch
 import torch.distributed as dist
@@ -10,6 +10,7 @@ from colossalai.context import ParallelMode
 from colossalai.core import global_context as gpc
 from colossalai.logging import get_dist_logger
 from colossalai.nn.optimizer import ColossalaiOptimizer
+from colossalai.tensor import ProcessGroup
 from colossalai.utils.cuda import get_current_device
 
 from ._utils import (
@@ -18,7 +19,7 @@ from ._utils import (
     flatten,
     get_grad_accumulate_object,
     has_inf_or_nan,
-    reduce_tensor,
+    reduce_tensor_dp_group,
     release_param_grad,
     split_half_float_double,
     sync_param,
@@ -33,7 +34,7 @@ class LowLevelZeroOptimizer(ColossalaiOptimizer):
     def __init__(
             self,
             optimizer: Optimizer,
-
+            pg: Optional[ProcessGroup] = None,
             # grad scaler config
             initial_scale=2**16,
             min_scale=1,
@@ -54,9 +55,6 @@ class LowLevelZeroOptimizer(ColossalaiOptimizer):
 
             # stage 2
             partition_grad=False,
-            dp_parallel_mode=ParallelMode.DATA,
-            mp_parallel_mode=ParallelMode.MODEL,
-
             # cpu offload
             cpu_offload=False,
 
@@ -76,21 +74,33 @@ class LowLevelZeroOptimizer(ColossalaiOptimizer):
 
         # stage 2
         self._partition_grads = partition_grad
-
         # cpu_offload
         self._cpu_offload = cpu_offload
-        # get process groups
-        self._dp_parallel_mode = dp_parallel_mode
-        self._mp_parallel_mode = mp_parallel_mode
-        self._local_rank = gpc.get_local_rank(dp_parallel_mode)
-        self._world_size = gpc.get_world_size(dp_parallel_mode)
+        self._pg = pg
+        if isinstance(pg, ProcessGroup):
+            self._local_rank = pg.dp_local_rank()
+            self._world_size = pg.dp_world_size()
+            self._dp_group = pg.dp_process_group()
+            if pg.tp_world_size() > 1:
+                self._mp_group = pg.tp_process_group()
+            else:
+                self._mp_group = None
+        elif pg is None:
+            dp_parallel_mode = ParallelMode.DATA
+            mp_parallel_mode = ParallelMode.MODEL
 
-        self._dp_group = gpc.get_group(dp_parallel_mode)
-        if gpc.is_initialized(mp_parallel_mode) and gpc.get_world_size(mp_parallel_mode) > 1:
-            self._mp_group = gpc.get_group(mp_parallel_mode)
+            self._dp_parallel_mode = dp_parallel_mode
+            self._mp_parallel_mode = mp_parallel_mode
+            self._local_rank = gpc.get_local_rank(dp_parallel_mode)
+            self._world_size = gpc.get_world_size(dp_parallel_mode)
+
+            self._dp_group = gpc.get_group(dp_parallel_mode)
+            if gpc.is_initialized(mp_parallel_mode) and gpc.get_world_size(mp_parallel_mode) > 1:
+                self._mp_group = gpc.get_group(mp_parallel_mode)
+            else:
+                self._mp_group = None
         else:
-            self._mp_group = None
-
+            raise TypeError(f"pg should be None or a ProcessGroup")
         # fp16 and fp32 params for mixed precision training
         self._fp16_param_groups = dict()
         self._fp32_flat_param_groups_of_current_rank = dict()
@@ -126,9 +136,14 @@ class LowLevelZeroOptimizer(ColossalaiOptimizer):
 
         # ParameterStore will manage the tensor buffers used for zero
        # it will not manage the tensors used by mixed precision training
-        self._param_store = ParameterStore(self._dp_parallel_mode)
-        self._grad_store = GradientStore(self._dp_parallel_mode)
-        self._bucket_store = BucketStore(self._dp_parallel_mode)
+        if self._pg is not None:
+            self._param_store = ParameterStore(self._pg)
+            self._grad_store = GradientStore(self._pg)
+            self._bucket_store = BucketStore(self._pg)
+        else:
+            self._param_store = ParameterStore(self._dp_parallel_mode)
+            self._grad_store = GradientStore(self._dp_parallel_mode)
+            self._bucket_store = BucketStore(self._dp_parallel_mode)
 
         # iterate over the param group in the optimizer
         # partition these param groups for data parallel training
@@ -223,9 +238,7 @@ class LowLevelZeroOptimizer(ColossalaiOptimizer):
             numel_per_rank[rank_to_go] += param.numel()
 
         if self._verbose:
-            self._logger.info(f'Number of elements on ranks: {numel_per_rank}',
-                              ranks=[0],
-                              parallel_mode=self._dp_parallel_mode)
+            self._logger.info(f'Number of elements on ranks: {numel_per_rank}', ranks=[0])
         return params_per_rank
 
     def _sanity_checks(self):
@@ -371,10 +384,10 @@ class LowLevelZeroOptimizer(ColossalaiOptimizer):
 
         with torch.cuda.stream(stream):
             flat = bucket.flatten()
-            reduced_flat = reduce_tensor(tensor=flat,
-                                         dtype=self._communication_dtype,
-                                         dst_rank=reduce_rank,
-                                         parallel_mode=self._dp_parallel_mode)
+            reduced_flat = reduce_tensor_dp_group(tensor=flat,
+                                                  dtype=self._communication_dtype,
+                                                  dst_rank=reduce_rank,
+                                                  pg=self._pg)
 
             # update the reduced tensor
             if reduce_rank is None or reduce_rank == self._local_rank:
diff --git a/examples/language/gpt/gemini/train_gpt_demo.py b/examples/language/gpt/gemini/train_gpt_demo.py
index 92cb7393c..7bec980f9 100644
--- a/examples/language/gpt/gemini/train_gpt_demo.py
+++ b/examples/language/gpt/gemini/train_gpt_demo.py
@@ -290,14 +290,19 @@ def main():
         from torch.distributed.optim import ZeroRedundancyOptimizer
         optimizer = ZeroRedundancyOptimizer(model.parameters(), optimizer_class=torch.optim.Adam, lr=0.01)
     elif args.distplan.startswith("zero"):
+        pg = ProcessGroup()
         model = model.half()
-        partition_flag = args.distplan == "zero2"
+        partition_flag = (args.distplan == "zero2")
         optimizer = torch.optim.Adam(model.parameters(), lr=0.01)
-        optimizer = LowLevelZeroOptimizer(optimizer,
-                                          reduce_bucket_size=12 * 1024 * 1024,
-                                          overlap_communication=True,
-                                          partition_grad=partition_flag,
-                                          verbose=True)
+
+        optimizer = LowLevelZeroOptimizer(
+            optimizer,
+            pg=pg,
+            reduce_bucket_size=12 * 1024 * 1024,
+            overlap_communication=True,
+            partition_grad=partition_flag,
+            verbose=True,
+        )
 
     # model is shared after TP
     numel = get_model_size(model)
diff --git a/tests/test_zero/low_level_zero/test_grad_acc.py b/tests/test_zero/low_level_zero/test_grad_acc.py
index c23b3a3e8..a0d1ac531 100644
--- a/tests/test_zero/low_level_zero/test_grad_acc.py
+++ b/tests/test_zero/low_level_zero/test_grad_acc.py
@@ -9,6 +9,7 @@ from torch.nn.parallel import DistributedDataParallel as DDP
 from torch.testing import assert_close
 
 import colossalai
+from colossalai.tensor import ProcessGroup
 from colossalai.testing.random import seed_all
 from colossalai.utils import free_port
 from colossalai.zero import LowLevelZeroOptimizer
@@ -34,16 +35,18 @@ def exam_zero_1_2_grad_acc():
     # create model
     zero1_model = TestModel().cuda()
     zero2_model = copy.deepcopy(zero1_model)
-
+    pg = ProcessGroup()
     # create optimizer
     zero1_optimizer = torch.optim.Adam(zero1_model.parameters(), lr=1)
     zero2_optimizer = torch.optim.Adam(zero2_model.parameters(), lr=1)
     zero1_optimizer = LowLevelZeroOptimizer(zero1_optimizer,
+                                            pg=pg,
                                             overlap_communication=True,
                                             initial_scale=32,
                                             clip_grad_norm=1.0,
                                             verbose=True)
     zero2_optimizer = LowLevelZeroOptimizer(zero2_optimizer,
+                                            pg=pg,
                                             overlap_communication=True,
                                             partition_grad=True,
                                             initial_scale=32,
@@ -83,7 +86,7 @@ def exam_zero_1_2_grad_acc():
         assert torch.equal(z1p.data, z2p.data)
 
 
-def exam_zero_1_grad_acc():
+def exam_zero_1_grad_acc(use_pg=True):
     local_rank = torch.distributed.get_rank()
     grad_scale = 32
     seed_all(2008)
@@ -92,6 +95,7 @@ def exam_zero_1_grad_acc():
     zero_model = TestModel()
     torch_model = copy.deepcopy(zero_model)
 
+    seed_all(2008)
     zero_model = zero_model.cuda()
     torch_model = DDP(torch_model.cuda(), bucket_cap_mb=0)
 
@@ -101,7 +105,9 @@ def exam_zero_1_grad_acc():
     # we only test stage 1 here
     # in `check_sharded_param_consistency.py`, we will test whether
     # level 1 and 2 will produce exactly the same results
+    pg = ProcessGroup() if use_pg else None    #ProcessGroup()
     zero_optimizer = LowLevelZeroOptimizer(zero_optimizer,
+                                           pg=pg,
                                            overlap_communication=False,
                                            initial_scale=grad_scale,
                                            reduce_bucket_size=262144,
@@ -152,7 +158,8 @@ def exam_zero_1_grad_acc():
 
 
 def run_dist(rank, world_size, port):
     colossalai.launch(config=dict(), rank=rank, world_size=world_size, port=port, host='localhost')
-    exam_zero_1_grad_acc()
+    exam_zero_1_grad_acc(True)
+    exam_zero_1_grad_acc(False)
     # exam_zero_1_2_grad_acc()
 
diff --git a/tests/test_zero/low_level_zero/test_zero1_2.py b/tests/test_zero/low_level_zero/test_zero1_2.py
index b02d3a6a4..6924827fe 100644
--- a/tests/test_zero/low_level_zero/test_zero1_2.py
+++ b/tests/test_zero/low_level_zero/test_zero1_2.py
@@ -9,6 +9,7 @@ from torch.nn.parallel import DistributedDataParallel as DDP
 from torch.testing import assert_close
 
 import colossalai
+from colossalai.tensor import ProcessGroup
 from colossalai.testing.random import seed_all
 from colossalai.utils import free_port
 from colossalai.zero import LowLevelZeroOptimizer
@@ -58,14 +59,17 @@ def exam_zero_1_2():
     zero1_model = TestModel().cuda()
     zero2_model = copy.deepcopy(zero1_model)
 
+    pg = ProcessGroup()
     # create optimizer
     zero1_optimizer = torch.optim.Adam(zero1_model.parameters(), lr=1)
     zero2_optimizer = torch.optim.Adam(zero2_model.parameters(), lr=1)
     zero1_optimizer = LowLevelZeroOptimizer(zero1_optimizer,
+                                            pg=pg,
                                             overlap_communication=True,
                                             initial_scale=128,
                                             verbose=True)
     zero2_optimizer = LowLevelZeroOptimizer(zero2_optimizer,
+                                            pg=pg,
                                             overlap_communication=True,
                                             partition_grad=True,
                                             initial_scale=128)
@@ -127,7 +131,9 @@ def exam_zero_1_torch_ddp():
     # we only test stage 1 here
     # in `check_sharded_param_consistency.py`, we will test whether
     # level 1 and 2 will produce exactly the same results
+    pg = ProcessGroup()
     zero_optimizer = LowLevelZeroOptimizer(zero_optimizer,
+                                           pg=pg,
                                            overlap_communication=True,
                                            initial_scale=1,
                                            reduce_bucket_size=262144)
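
Usage sketch (illustrative, not part of the patch): after this change, LowLevelZeroOptimizer takes a colossalai.tensor.ProcessGroup via the new pg argument instead of the removed dp_parallel_mode/mp_parallel_mode arguments, mirroring the train_gpt_demo.py and test updates above. The toy model and hyperparameters below are placeholders, and the snippet assumes the distributed environment has already been initialized (e.g. via colossalai.launch).

    import torch

    from colossalai.tensor import ProcessGroup
    from colossalai.zero import LowLevelZeroOptimizer

    # default ProcessGroup, constructed with no arguments as in the demo and tests above
    pg = ProcessGroup()

    # placeholder fp16 model and inner optimizer
    model = torch.nn.Linear(1024, 1024).half().cuda()
    optimizer = torch.optim.Adam(model.parameters(), lr=0.01)

    # pg now supplies the data-parallel (and optional tensor-parallel) groups that
    # reduce_tensor_dp_group() and the bookkeeping stores previously read from
    # gpc/ParallelMode; passing pg=None keeps the old gpc-based behaviour.
    optimizer = LowLevelZeroOptimizer(optimizer,
                                      pg=pg,
                                      reduce_bucket_size=12 * 1024 * 1024,
                                      overlap_communication=True,
                                      partition_grad=False)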