From c26f21d3651822b14f1e131e5d88696c8d250ead Mon Sep 17 00:00:00 2001 From: Boyuan Yao <70263930+Cypher30@users.noreply.github.com> Date: Fri, 18 Nov 2022 15:13:03 +0800 Subject: [PATCH] [autoparallel] add pooling metainfo (#1968) * [fx] metainfo class for auto parallel * [fx] add unit test for linear metainfo * [fx] fix bwd param for linear * [fx] modify unit test * [fx] modify unit test * [fx] modify import * [fx] modify import * [fx] modify import * [fx] move meta profiler to auto parallel * [fx] add conv metainfo class * [fx] restore profiler * [fx] restore meta profiler * [autoparallel] modify unit test * [fx] modify unit test * [autoparallel] add batchnorm metainfo class * [autoparallel] fix batchnorm unit test function declaration * [fx] restore profiler * [fx] add relu metainfo class * [fx] restore profiler * [autoparallel] modify metainfo input * [autoparallel] add pooling metainfo --- .../meta_profiler/meta_registry/__init__.py | 1 + .../meta_profiler/meta_registry/pooling.py | 127 ++++++++++++++++++ .../test_metainfo/test_activation_metainfo.py | 2 +- .../test_metainfo/test_batchnorm_metainfo.py | 3 +- .../test_metainfo/test_pooling_metainfo.py | 102 ++++++++++++++ 5 files changed, 232 insertions(+), 3 deletions(-) create mode 100644 colossalai/auto_parallel/meta_profiler/meta_registry/pooling.py create mode 100644 tests/test_auto_parallel/test_tensor_shard/test_metainfo/test_pooling_metainfo.py diff --git a/colossalai/auto_parallel/meta_profiler/meta_registry/__init__.py b/colossalai/auto_parallel/meta_profiler/meta_registry/__init__.py index e753e968b..6fca1a2c1 100644 --- a/colossalai/auto_parallel/meta_profiler/meta_registry/__init__.py +++ b/colossalai/auto_parallel/meta_profiler/meta_registry/__init__.py @@ -2,3 +2,4 @@ from .activation import * from .conv import * from .linear import * from .norm import * +from .pooling import * diff --git a/colossalai/auto_parallel/meta_profiler/meta_registry/pooling.py b/colossalai/auto_parallel/meta_profiler/meta_registry/pooling.py new file mode 100644 index 000000000..a77b9c75f --- /dev/null +++ b/colossalai/auto_parallel/meta_profiler/meta_registry/pooling.py @@ -0,0 +1,127 @@ +from typing import List, Tuple + +import torch + +from colossalai.auto_parallel.tensor_shard.sharding_strategy import MemoryCost, OperationDataType, TrainCycleItem +from colossalai.fx.profiler.memory_utils import activation_size +from colossalai.fx.profiler.opcount import flop_mapping + +from ..registry import meta_register + +__all__ = ["avgpool_meta_info", "maxpool_meta_info"] + + +@meta_register.register(torch.nn.AdaptiveAvgPool1d) +@meta_register.register(torch.nn.AdaptiveAvgPool2d) +@meta_register.register(torch.nn.AdaptiveAvgPool3d) +def avgpool_meta_info(*args, **kwargs) -> Tuple[TrainCycleItem, TrainCycleItem, List[torch.Tensor]]: + """Meta info for AdaptiveAvgPool + The aten graph of AdaptiveAvgPool is + graph(): + %input_2 : [#users=2] = placeholder[target=placeholder](default=) + %_adaptive_avg_pool2d_default : [#users=1] = call_function[target=torch.ops.aten._adaptive_avg_pool2d.default](args = (%input_2, [None, None]), kwargs = {}) + %zeros_like_default : [#users=1] = call_function[target=torch.ops.aten.zeros_like.default](args = (%_adaptive_avg_pool2d_default,), kwargs = {dtype: None, layout: None, device: None, pin_memory: None}) + %detach_default : [#users=1] = call_function[target=torch.ops.aten.detach.default](args = (%input_2,), kwargs = {}) + %_adaptive_avg_pool2d_backward_default : [#users=1] = call_function[target=torch.ops.aten._adaptive_avg_pool2d_backward.default](args = (%zeros_like_default, %detach_default), kwargs = {}) + %detach_default_1 : [#users=1] = call_function[target=torch.ops.aten.detach.default](args = (%_adaptive_avg_pool2d_backward_default,), kwargs = {}) + %detach_default_2 : [#users=0] = call_function[target=torch.ops.aten.detach.default](args = (%detach_default_1,), kwargs = {}) + + Returns: + Tuple[TrainCycleItem, TrainCycleItem, List[torch.Tensor]]: compute cost, memory cost and forward inputs + """ + + input_tensor = next(filter(lambda x: x.type == OperationDataType.ARG, args)).data + output_tensor = next(filter(lambda x: x.type == OperationDataType.OUTPUT, args)).data + + # construct forward args for flop mapping + fwd_in_args = [input_tensor] + fwd_out_args = [output_tensor] + + # construct backward args for flop mapping + bwd_in_args = [output_tensor] + bwd_out_args = [input_tensor] + + # calculate cost + # the fwd op with compute cost is _adaptive_avg_pool2d.default + # the bwd op with compute cost is _adaptive_avg_pool2d_backward.default + + # calculate compute cost + fwd_compute_cost = flop_mapping[torch.ops.aten._adaptive_avg_pool2d.default](fwd_in_args, fwd_out_args) + bwd_compute_cost = flop_mapping[torch.ops.aten._adaptive_avg_pool2d_backward.default](bwd_in_args, bwd_out_args) + compute_cost = TrainCycleItem(fwd=fwd_compute_cost, bwd=bwd_compute_cost, total=fwd_compute_cost + bwd_compute_cost) + + # calculate memory cost + fwd_mem_cost = MemoryCost(activation=activation_size(output_tensor)) + bwd_mem_cost = MemoryCost(activation=activation_size(input_tensor)) + + # total cost + total_mem_cost = MemoryCost(activation=fwd_mem_cost.activation + bwd_mem_cost.activation) + + mem_cost = TrainCycleItem(fwd=fwd_mem_cost, bwd=bwd_mem_cost, total=total_mem_cost) + + # store_fwd_in + fwd_in = [input_tensor] + + return compute_cost, mem_cost, fwd_in + + +@meta_register.register(torch.nn.MaxPool1d) +@meta_register.register(torch.nn.MaxPool2d) +@meta_register.register(torch.nn.MaxPool3d) +def maxpool_meta_info(*args, **kwargs) -> Tuple[TrainCycleItem, TrainCycleItem, List[torch.Tensor]]: + """Meta info for MaxPool + The aten graph of MaxPool is + graph(): + %input_2 : [#users=2] = placeholder[target=placeholder](default=) + %max_pool2d_with_indices_default : [#users=2] = call_function[target=torch.ops.aten.max_pool2d_with_indices.default](args = (%input_2, [None, None], [None, None]), kwargs = {}) + %zeros_like_default : [#users=1] = call_function[target=torch.ops.aten.zeros_like.default](args = (%max_pool2d_with_indices_default,), kwargs = {dtype: None, layout: None, device: None, pin_memory: None}) + %detach_default : [#users=1] = call_function[target=torch.ops.aten.detach.default](args = (%input_2,), kwargs = {}) + %detach_default_1 : [#users=1] = call_function[target=torch.ops.aten.detach.default](args = (%max_pool2d_with_indices_default,), kwargs = {}) + %max_pool2d_with_indices_backward_default : [#users=1] = call_function[target=torch.ops.aten.max_pool2d_with_indices_backward.default](args = (%zeros_like_default, %detach_default, [None, None], [None, None], [None, None], [None, None], None, %detach_default_1), kwargs = {}) + %detach_default_2 : [#users=1] = call_function[target=torch.ops.aten.detach.default](args = (%max_pool2d_with_indices_backward_default,), kwargs = {}) + %detach_default_3 : [#users=0] = call_function[target=torch.ops.aten.detach.default](args = (%detach_default_2,), kwargs = {}) + + Returns: + Tuple[TrainCycleItem, TrainCycleItem, List[torch.Tensor]]: compute cost, memory cost and forward inputs + """ + + input_tensor = next(filter(lambda x: x.type == OperationDataType.ARG, args)).data + output_tensor = next(filter(lambda x: x.type == OperationDataType.OUTPUT, args)).data + + # construct forward args for flop mapping + fwd_in_args = [input_tensor] + fwd_out_args = [output_tensor] + + # construct backward args for flop mapping + bwd_in_args = [output_tensor] + bwd_out_args = [input_tensor] + + # construct index matrix + index_matrix = torch.zeros_like(output_tensor, device="meta", dtype=torch.int64) + + # calculate cost + # the fwd op with compute cost is max_pool2d_with_indices.default + # the bwd op with compute cost is max_pool2d_with_indices_backward.default + + # calculate compute cost + fwd_compute_cost = flop_mapping[torch.ops.aten.max_pool2d_with_indices.default](fwd_in_args, fwd_out_args) + bwd_compute_cost = flop_mapping[torch.ops.aten.max_pool2d_with_indices_backward.default](bwd_in_args, bwd_out_args) + compute_cost = TrainCycleItem(fwd=fwd_compute_cost, bwd=bwd_compute_cost, total=fwd_compute_cost + bwd_compute_cost) + + # calculate memory cost + # NOTE: the index matrix will be discarded in backward phase + fwd_mem_cost = MemoryCost(activation=activation_size(output_tensor) + activation_size(index_matrix)) + + # temp memory for backward is the index matrix to be discarded + bwd_mem_cost = MemoryCost(activation=activation_size(input_tensor) - activation_size(index_matrix), + temp=activation_size(index_matrix)) + + # total cost + total_mem_cost = MemoryCost(activation=fwd_mem_cost.activation + bwd_mem_cost.activation, temp=bwd_mem_cost.temp) + + mem_cost = TrainCycleItem(fwd=fwd_mem_cost, bwd=bwd_mem_cost, total=total_mem_cost) + + # store_fwd_in + fwd_in = [input_tensor] + + return compute_cost, mem_cost, fwd_in diff --git a/tests/test_auto_parallel/test_tensor_shard/test_metainfo/test_activation_metainfo.py b/tests/test_auto_parallel/test_tensor_shard/test_metainfo/test_activation_metainfo.py index ff64927b8..57dddc518 100644 --- a/tests/test_auto_parallel/test_tensor_shard/test_metainfo/test_activation_metainfo.py +++ b/tests/test_auto_parallel/test_tensor_shard/test_metainfo/test_activation_metainfo.py @@ -16,7 +16,7 @@ from tests.test_auto_parallel.test_tensor_shard.test_metainfo.utils import mem_t def _ReLU_module_mem_test(rank, world_size, port): - """This function is for conv memory test + """This function is for ReLU memory test Test and print real memory cost and estimated, this test will not be executed except with the tag AUTO_PARALLEL Args: diff --git a/tests/test_auto_parallel/test_tensor_shard/test_metainfo/test_batchnorm_metainfo.py b/tests/test_auto_parallel/test_tensor_shard/test_metainfo/test_batchnorm_metainfo.py index b63d333ba..9cc3d9b6a 100644 --- a/tests/test_auto_parallel/test_tensor_shard/test_metainfo/test_batchnorm_metainfo.py +++ b/tests/test_auto_parallel/test_tensor_shard/test_metainfo/test_batchnorm_metainfo.py @@ -16,10 +16,9 @@ from tests.test_auto_parallel.test_tensor_shard.test_metainfo.utils import mem_t def _batchnorm_module_mem_test(rank, world_size, port): - """This function is for conv memory test + """This function is for batchnorm memory test Test and print real memory cost and estimated, this test will not be executed except with the tag AUTO_PARALLEL - Args: Args: rank: device rank bias: indicate whether conv module need bias diff --git a/tests/test_auto_parallel/test_tensor_shard/test_metainfo/test_pooling_metainfo.py b/tests/test_auto_parallel/test_tensor_shard/test_metainfo/test_pooling_metainfo.py new file mode 100644 index 000000000..33f158569 --- /dev/null +++ b/tests/test_auto_parallel/test_tensor_shard/test_metainfo/test_pooling_metainfo.py @@ -0,0 +1,102 @@ +from functools import partial + +import pytest +import torch +import torch.multiprocessing as mp +import torch.nn as nn + +from colossalai.device.device_mesh import DeviceMesh +from colossalai.fx import ColoGraphModule, ColoTracer +from colossalai.initialize import launch +from colossalai.logging import disable_existing_loggers +from colossalai.testing.pytest_wrapper import run_on_environment_flag +from colossalai.testing.utils import parameterize, rerun_if_address_is_in_use +from colossalai.utils import free_port +from tests.test_auto_parallel.test_tensor_shard.test_metainfo.utils import mem_test_for_node_strategy + + +def _adaptiveavgpool_module_mem_test(rank, world_size, port): + """This function is for AdaptiveAvgPool memory test + Test and print real memory cost and estimated, this test will not be executed except with the tag AUTO_PARALLEL + + Args: + rank: device rank + bias: indicate whether conv module need bias + world_size: number of devices + port: port for initializing process group + """ + disable_existing_loggers() + launch(config={}, rank=rank, world_size=world_size, host='localhost', port=port, backend='nccl') + model = nn.Sequential(nn.AdaptiveAvgPool2d((16, 16))).cuda() + input = torch.rand(4, 128, 64, 64).cuda() + input.requires_grad = True + physical_mesh_id = torch.arange(0, 4) + mesh_shape = (2, 2) + device_mesh = DeviceMesh(physical_mesh_id, mesh_shape, init_process_group=True) + + # index of conv node in computation graph + node_index = 1 + # total number of conv strategies + strategy_number = 1 + mem_test_for_node_strategy(rank=rank, + model=model, + device_mesh=device_mesh, + node_index=node_index, + strategy_number=strategy_number, + input_args=[input], + meta_arg_names=['input']) + + +@run_on_environment_flag(name='AUTO_PARALLEL') +@pytest.mark.dist +@rerun_if_address_is_in_use() +def test_adaptiveavgpool_meta_concrete_info_match(): + world_size = 4 + run_func_module = partial(_adaptiveavgpool_module_mem_test, world_size=world_size, port=free_port()) + mp.spawn(run_func_module, nprocs=world_size) + + +def _maxpool_module_mem_test(rank, world_size, port): + """This function is for MaxPool memory test + Test and print real memory cost and estimated, this test will not be executed except with the tag AUTO_PARALLEL + + Args: + rank: device rank + bias: indicate whether conv module need bias + world_size: number of devices + port: port for initializing process group + """ + disable_existing_loggers() + launch(config={}, rank=rank, world_size=world_size, host='localhost', port=port, backend='nccl') + model = nn.Sequential(nn.MaxPool2d((16, 16))).cuda() + input = torch.rand(4, 128, 64, 64).cuda() + input.requires_grad = True + physical_mesh_id = torch.arange(0, 4) + mesh_shape = (2, 2) + device_mesh = DeviceMesh(physical_mesh_id, mesh_shape, init_process_group=True) + + # index of conv node in computation graph + node_index = 1 + # total number of conv strategies + strategy_number = 9 + mem_test_for_node_strategy(rank=rank, + model=model, + device_mesh=device_mesh, + node_index=node_index, + strategy_number=strategy_number, + input_args=[input], + meta_arg_names=['input']) + + +@run_on_environment_flag(name='AUTO_PARALLEL') +@pytest.mark.dist +@rerun_if_address_is_in_use() +def test_maxpool_meta_concrete_info_match(): + world_size = 4 + run_func_module = partial(_maxpool_module_mem_test, world_size=world_size, port=free_port()) + mp.spawn(run_func_module, nprocs=world_size) + + +if __name__ == '__main__': + test_adaptiveavgpool_meta_concrete_info_match() + test_maxpool_meta_concrete_info_match()