diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index ce9887ccf..98ecd0314 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -4,6 +4,10 @@ repos: hooks: - id: yapf args: ['--style=.style.yapf', '--parallel', '--in-place'] + - repo: https://github.com/pycqa/flake8 + rev: '4.0.1' + hooks: + - id: flake8 - repo: https://github.com/pre-commit/mirrors-clang-format rev: v13.0.1 hooks: diff --git a/colossalai/utils/commons/__init__.py b/colossalai/utils/commons/__init__.py deleted file mode 100644 index e48fad25c..000000000 --- a/colossalai/utils/commons/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -from .bucket_tensor_copy import BucketizedTensorCopy - -__all__ = ['BucketizedTensorCopy'] diff --git a/colossalai/utils/commons/bucket_tensor_copy.py b/colossalai/utils/commons/bucket_tensor_copy.py deleted file mode 100644 index 6febb9705..000000000 --- a/colossalai/utils/commons/bucket_tensor_copy.py +++ /dev/null @@ -1,61 +0,0 @@ -import torch -from colossalai.zero.sharded_param import ShardedParamV2 -from colossalai.utils import get_current_device -from typing import List - - -class BucketizedTensorCopy(object): - - def __init__( - self, - chunk_size: int, - ): - r""" - torch.nn.Parameter CPU (fp32) -> ShardedParam GPU (fp16) - TODO(jiaruifang) The class is a little bit hardcoded - I will make it more general later. - """ - - self.chunk_size = chunk_size - self._offset = 0 - self._cpu_buffer = torch.empty(chunk_size, dtype=torch.float, device=torch.device("cpu:0"), pin_memory=True) - self._cuda_buffer = torch.empty(chunk_size, - dtype=torch.half, - device=torch.device(f"cuda:{get_current_device()}")) - - self._buffered_param_list: List[ShardedParamV2] = [] - self._numel_list = [] - - def copy(self, src_param: torch.nn.Parameter, target_param: ShardedParamV2): - assert isinstance(target_param, ShardedParamV2) - assert isinstance(src_param, torch.nn.Parameter) - - numel = src_param.numel() - - if self._offset + numel > self.chunk_size: - self.flush() - - assert src_param.data.device.type == 'cpu' - self._cpu_buffer.narrow(0, self._offset, numel).copy_(src_param.data.view(-1)) - - self._buffered_param_list.append(target_param) - self._numel_list.append(numel) - - self._offset += numel - - def flush(self): - """ - flush to cuda memory - """ - self._cuda_buffer.copy_(self._cpu_buffer) - flush_offset = 0 - for sparam, numel in zip(self._buffered_param_list, self._numel_list): - sparam.data.copy_payload(self._cpu_buffer.narrow(0, flush_offset, numel)) - flush_offset += numel - - self.reset() - - def reset(self): - self._buffered_param_list = [] - self._numel_list = [] - self._offset = 0 diff --git a/colossalai/zero/sharded_optim/sharded_optim_v2.py b/colossalai/zero/sharded_optim/sharded_optim_v2.py index d5fb44648..36330c5f6 100644 --- a/colossalai/zero/sharded_optim/sharded_optim_v2.py +++ b/colossalai/zero/sharded_optim/sharded_optim_v2.py @@ -88,19 +88,14 @@ class ShardedOptimizerV2(ColossalaiOptimizer): self.zero_grad() return - # assign master param pointers to p.data. - # We will not trigger data copy here. + # Write master param to p.data for group in self.optim.param_groups: for p in group['params']: p.data = self.master_params[p] # Now p.data is sharded # So optimizer states are sharded naturally - ret = self.optim.step(*args, **kwargs) - - # Copy master param data (fp32) to payload of col_attr (fp16) - # TODO() improve efficiency by gathering tensors into a chunk and transfering - # a chunk. + # Write master param to payload for group in self.optim.param_groups: for p in group['params']: is_param_sharded = p.col_attr.data.is_sharded @@ -113,10 +108,7 @@ class ShardedOptimizerV2(ColossalaiOptimizer): self.shard_strategy.shard([p.col_attr.data]) # We have to use `copy_payload` instead of `reset_payload` # Since p.data is fp32 and p.col_attr.data is fp16 - - # TODO() optimize this line p.col_attr.data.copy_payload(p.data) - if not is_param_sharded: # We gather full fp16 param here self.shard_strategy.gather([p.col_attr.data]) diff --git a/colossalai/zero/sharded_param/sharded_tensor.py b/colossalai/zero/sharded_param/sharded_tensor.py index cde257d77..eaaa8ab99 100644 --- a/colossalai/zero/sharded_param/sharded_tensor.py +++ b/colossalai/zero/sharded_param/sharded_tensor.py @@ -14,6 +14,7 @@ class ShardedTensor(object): self.world_size = dist.get_world_size(self.process_group) self.local_rank = dist.get_rank(self.process_group) self._is_sharded = False + self._payload = tensor self._origin_shape = tensor.shape self._origin_numel = tensor.numel() @@ -40,7 +41,7 @@ class ShardedTensor(object): return self._payload def copy_payload(self, tensor): - self._payload.view(-1).copy_(tensor.view(-1)) + self._payload.copy_(tensor) def reset_payload(self, tensor): del self._payload diff --git a/tests/test_utils/test_bucket_tensor_copy.py b/tests/test_utils/test_bucket_tensor_copy.py deleted file mode 100644 index 198d7b691..000000000 --- a/tests/test_utils/test_bucket_tensor_copy.py +++ /dev/null @@ -1,39 +0,0 @@ -from colossalai.utils.commons import BucketizedTensorCopy -from colossalai.zero.sharded_param import ShardedParamV2 -from colossalai.utils import free_port -import torch -import colossalai - - -def test_bucket_copy(): - # init dist env - colossalai.launch(config={}, rank=0, world_size=1, host='localhost', port=free_port(), backend='nccl') - - copyer = BucketizedTensorCopy(20) - - shape_list = [(2, 3), (5), (8), (12)] - src_param_list = [] - tgt_param_list = [] - for shape in shape_list: - # on CPU - src_param = torch.nn.Parameter(torch.randn(shape, dtype=torch.float, device=torch.device('cpu'))) - print(src_param) - # on GPU - tgt_param = ShardedParamV2(torch.nn.Parameter(torch.ones(shape, dtype=torch.half, device=torch.device('cuda')))) - - src_param_list.append(src_param) - tgt_param_list.append(tgt_param) - - copyer.copy(src_param, tgt_param) - - copyer.flush() - - for src_param, tgt_param in zip(src_param_list, tgt_param_list): - print(tgt_param.data.payload) - diff = src_param.cpu().float() - tgt_param.data.payload.cpu().float() - assert torch.allclose(src_param.cpu().float(), tgt_param.data.payload.cpu().float(), rtol=1e-03, - atol=1e-03), f"diff {diff}" - - -if __name__ == '__main__': - test_bucket_copy()