From c9e7d9582dc2ff0d4ac9fc33f8772cb3a7a765a4 Mon Sep 17 00:00:00 2001 From: Jiarui Fang Date: Fri, 4 Mar 2022 15:35:07 +0800 Subject: [PATCH] [zero] polish shard strategy (#310) * init shard param from shape tuple * add more unitest for shard param * add set_payload method for ShardedParam * [zero] add shareded tensor class * polish code * add shard stratgy * move shard and gather logic to shard strategy from shard tensor. * polish code --- .../zero/shard_utils/tensor_shard_strategy.py | 37 +++++++++++++-- .../zero/sharded_param/sharded_tensor.py | 46 ++++++++----------- .../test_shard_param.py | 8 ++-- 3 files changed, 56 insertions(+), 35 deletions(-) diff --git a/colossalai/zero/shard_utils/tensor_shard_strategy.py b/colossalai/zero/shard_utils/tensor_shard_strategy.py index 2c8f3c904..e2a964392 100644 --- a/colossalai/zero/shard_utils/tensor_shard_strategy.py +++ b/colossalai/zero/shard_utils/tensor_shard_strategy.py @@ -1,7 +1,11 @@ -from colossalai.zero.shard_utils import BaseShardStrategy +import torch import torch.distributed as dist + from typing import List, Optional + +from colossalai.zero.shard_utils import BaseShardStrategy from colossalai.zero.sharded_param.sharded_tensor import ShardedTensor +from colossalai.zero.sharded_model._zero3_utils import get_shard class TensorShardStrategy(BaseShardStrategy): @@ -11,8 +15,35 @@ class TensorShardStrategy(BaseShardStrategy): def shard(self, tensor_list: List[ShardedTensor]): for t in tensor_list: - t.shard() + self._shard_tensor(t) def gather(self, tensor_list: List[ShardedTensor]): for t in tensor_list: - t.gather() + self._gather_tensor(t) + + def _shard_tensor(self, t: ShardedTensor): + if t.is_sharded: + return + sharded_payload, _ = get_shard(t.payload, self.local_rank, self.world_size) + t.reset_payload(sharded_payload) + t.is_sharded = True + + def _gather_tensor(self, t: ShardedTensor): + if not t.is_sharded: + return + + buffer_list = [] + payload_numel = t.payload.numel() + for i in range(self.world_size): + if i == self.local_rank: + buffer_list.append(t.payload.cuda()) + else: + buffer_list.append(torch.zeros(payload_numel).cuda()) + + torch.distributed.all_gather(buffer_list, + buffer_list[self.local_rank], + group=self.process_group, + async_op=False) + gathered_payload = torch.narrow(torch.cat(buffer_list), 0, 0, t.origin_numel).reshape(t.origin_shape) + t.reset_payload(gathered_payload) + t.is_sharded = False diff --git a/colossalai/zero/sharded_param/sharded_tensor.py b/colossalai/zero/sharded_param/sharded_tensor.py index 19e2715d6..823222725 100644 --- a/colossalai/zero/sharded_param/sharded_tensor.py +++ b/colossalai/zero/sharded_param/sharded_tensor.py @@ -1,6 +1,5 @@ import torch import torch.distributed as dist -from colossalai.zero.sharded_model._zero3_utils import get_shard from typing import Optional @@ -21,47 +20,38 @@ class ShardedTensor(object): self._origin_numel = tensor.numel() self._origin_dtype = tensor.dtype + @property + def origin_numel(self): + return self._origin_numel + + @property + def origin_shape(self): + return self._origin_shape + @property def is_sharded(self): return self._is_sharded + @is_sharded.setter + def is_sharded(self, flag: bool): + self._is_sharded = flag + @property def payload(self): return self._payload - @payload.setter - def payload(self, tensor): + def copy_payload(self, tensor): self._payload.copy_(tensor) + def reset_payload(self, tensor): + del self._payload + self._payload = tensor + @property def dtype(self): + assert self._payload.dtype == self._origin_dtype return self._origin_dtype @property def shape(self): return self._payload.shape - - def shard(self): - if self._is_sharded: - return - self._payload, _ = get_shard(self._payload, self.local_rank, self.world_size) - self._is_sharded = True - - def gather(self): - if not self._is_sharded: - return - - buffer_list = [] - payload_numel = self._payload.numel() - for i in range(self.world_size): - if i == self.local_rank: - buffer_list.append(self._payload.cuda()) - else: - buffer_list.append(torch.zeros(payload_numel).cuda()) - - torch.distributed.all_gather(buffer_list, - buffer_list[self.local_rank], - group=self.process_group, - async_op=False) - self._payload = torch.narrow(torch.cat(buffer_list), 0, 0, self._origin_numel).view(self._origin_shape) - self._is_sharded = False diff --git a/tests/test_zero_data_parallel/test_shard_param.py b/tests/test_zero_data_parallel/test_shard_param.py index 4341cf5ff..876dd4953 100644 --- a/tests/test_zero_data_parallel/test_shard_param.py +++ b/tests/test_zero_data_parallel/test_shard_param.py @@ -7,7 +7,6 @@ import colossalai import pytest import torch import torch.multiprocessing as mp - from colossalai.zero.shard_utils import TensorShardStrategy from colossalai.zero.sharded_param import ShardedTensor, ShardedParam from colossalai.utils import free_port @@ -18,15 +17,16 @@ from tests.test_zero_data_parallel.common import Net, CONFIG def run_shard_tensor(rank, world_size, port): colossalai.launch(config=CONFIG, rank=rank, world_size=world_size, host='localhost', port=port, backend='nccl') t = ShardedTensor(tensor=torch.randn(world_size * 2, 3)) - + assert list(t.origin_shape) == [world_size * 2, 3] assert list(t.shape) == [world_size * 2, 3] + shard_strategy = TensorShardStrategy(process_group=None) # test shard strategy shard_strategy.shard([t]) - assert list(t.shape) == [6] + assert list(t.shape) == [6], f"{list(t.shape)} vs 6" shard_strategy.gather([t]) - assert list(t.shape) == [world_size * 2, 3] + assert list(t.shape) == [world_size * 2, 3], f"{list(t.shape)} vs {[world_size * 2, 3]}" @pytest.mark.dist