diff --git a/colossalai/auto_parallel/meta_profiler/meta_registry/norm.py b/colossalai/auto_parallel/meta_profiler/meta_registry/norm.py index 9b34332db..3a1db396e 100644 --- a/colossalai/auto_parallel/meta_profiler/meta_registry/norm.py +++ b/colossalai/auto_parallel/meta_profiler/meta_registry/norm.py @@ -16,7 +16,7 @@ from colossalai.tensor.sharding_spec import ShardingSpec from ..registry import meta_register -__all__ = ['batchnormnd_meta_info'] +__all__ = ['batchnormnd_meta_info', 'layernorm_meta_info'] @meta_register.register(torch.nn.BatchNorm1d) @@ -101,3 +101,56 @@ def batchnormnd_meta_info(*args, **kwargs) -> Tuple[TrainCycleItem, TrainCycleIt fwd_out = [torch.zeros_like(output_tensor, device='meta')] return compute_cost, memory_cost, fwd_in, fwd_buffer, fwd_out + + +@meta_register.register(torch.nn.LayerNorm) +def layernorm_meta_info(*args, **kwargs) -> Tuple[TrainCycleItem, TrainCycleItem, List[torch.Tensor]]: + """LayerNorm meta information + + Returns: + Tuple[TrainCycleItem, TrainCycleItem, List[torch.Tensor]]: compute cost, memory cost and forward inputs + """ + # construct needed tensors + input_tensor = next(filter(lambda x: x.type == OperationDataType.ARG, args)).data + output_tensor = next(filter(lambda x: x.type == OperationDataType.OUTPUT, args)).data + weight_tensor = next(filter(lambda x: x.name == "weight", args)).data + bias_tensor = next(filter(lambda x: x.name == "bias", args)).data + running_mean = torch.rand(input_tensor.shape[0], 1, device='meta') + running_var = torch.rand(input_tensor.shape[0], 1, device='meta') + + # construct args + fwd_in_args = [input_tensor, [input_tensor.shape[0]], weight_tensor] + fwd_out_args = [output_tensor] + bwd_in_args = [input_tensor, output_tensor, [input_tensor.shape[0]]] + bwd_out_args = [weight_tensor, bias_tensor] + + # compute cost + fwd_compute_cost = flop_mapping[torch.ops.aten.native_layer_norm.default](fwd_in_args, fwd_out_args) + bwd_compute_cost = flop_mapping[torch.ops.aten.native_layer_norm_backward.default](bwd_in_args, bwd_out_args) + compute_cost = TrainCycleItem(fwd=fwd_compute_cost, bwd=bwd_compute_cost, total=fwd_compute_cost + bwd_compute_cost) + + # memory cost + # NOTE: currently in SPMD solver we always believe that there will be a new tensor created in forward + fwd_memory_cost = MemoryCost(activation=activation_size([input_tensor, output_tensor, weight_tensor, bias_tensor]), + parameter=activation_size([weight_tensor, bias_tensor]), + temp=0, + buffer=activation_size([running_mean, running_var])) + + bwd_memory_cost = MemoryCost(activation=activation_size([input_tensor, weight_tensor, bias_tensor]), + parameter=activation_size([weight_tensor, bias_tensor]), + temp=activation_size([running_mean, running_var]), + buffer=activation_size([running_mean, running_var])) + + total_cost = MemoryCost(activation=fwd_memory_cost.activation + bwd_memory_cost.activation, + parameter=fwd_memory_cost.parameter + bwd_memory_cost.parameter, + temp=fwd_memory_cost.temp + bwd_memory_cost.temp, + buffer=fwd_memory_cost.buffer + bwd_memory_cost.buffer) + + memory_cost = TrainCycleItem(fwd=fwd_memory_cost, bwd=bwd_memory_cost, total=total_cost) + + # store fwd_in, fwd_buffer, fwd_out + fwd_in = [torch.zeros_like(input_tensor, device='meta')] + fwd_buffer = [torch.zeros_like(running_mean, device='meta'), torch.zeros_like(running_var, device='meta')] + fwd_out = [torch.zeros_like(output_tensor, device='meta')] + + return compute_cost, memory_cost, fwd_in, fwd_buffer, fwd_out diff --git a/colossalai/auto_parallel/tensor_shard/node_handler/layer_norm_handler.py b/colossalai/auto_parallel/tensor_shard/node_handler/layer_norm_handler.py index 132ac30da..452381169 100644 --- a/colossalai/auto_parallel/tensor_shard/node_handler/layer_norm_handler.py +++ b/colossalai/auto_parallel/tensor_shard/node_handler/layer_norm_handler.py @@ -3,7 +3,7 @@ from typing import Dict, List import torch from ..sharding_strategy import OperationData, OperationDataType -from .node_handler import ModuleHandler +from .node_handler import MetaInfoModuleHandler, ModuleHandler from .registry import operator_registry from .strategy import LayerNormGenerator, StrategyGenerator @@ -11,7 +11,7 @@ __all__ = ['LayerNormModuleHandler'] @operator_registry.register(torch.nn.LayerNorm) -class LayerNormModuleHandler(ModuleHandler): +class LayerNormModuleHandler(MetaInfoModuleHandler): """ A LayerNormModuleHandler which deals with the sharding strategies for nn.LayerNorm module. """ diff --git a/tests/test_auto_parallel/test_tensor_shard/test_metainfo/test_batchnorm_metainfo.py b/tests/test_auto_parallel/test_tensor_shard/test_metainfo/test_batchnorm_metainfo.py deleted file mode 100644 index 826c74666..000000000 --- a/tests/test_auto_parallel/test_tensor_shard/test_metainfo/test_batchnorm_metainfo.py +++ /dev/null @@ -1,60 +0,0 @@ -from functools import partial - -import pytest -import torch -import torch.multiprocessing as mp -import torch.nn as nn - -from colossalai.device.device_mesh import DeviceMesh -from colossalai.fx import ColoGraphModule, ColoTracer -from colossalai.initialize import launch -from colossalai.logging import disable_existing_loggers -from colossalai.testing.pytest_wrapper import run_on_environment_flag -from colossalai.testing.utils import parameterize, rerun_if_address_is_in_use -from colossalai.utils import free_port -from tests.test_auto_parallel.test_tensor_shard.test_metainfo.utils import mem_test_for_node_strategy - - -def _batchnorm_module_mem_test(rank, world_size, port): - """This function is for batchnorm memory test - Test and print real memory cost and estimated, this test will not be executed except with the tag AUTO_PARALLEL - - Args: - rank: device rank - bias: indicate whether conv module need bias - world_size: number of devices - port: port for initializing process group - """ - disable_existing_loggers() - launch(config={}, rank=rank, world_size=world_size, host='localhost', port=port, backend='nccl') - model = nn.Sequential(nn.BatchNorm2d(128)).cuda() - input = torch.rand(4, 128, 64, 64).cuda() - input.requires_grad = True - physical_mesh_id = torch.arange(0, 4) - mesh_shape = (2, 2) - device_mesh = DeviceMesh(physical_mesh_id, mesh_shape, init_process_group=True) - - # index of target node in computation graph - node_index = 1 - # total number of target node strategies - strategy_number = 9 - mem_test_for_node_strategy(rank=rank, - model=model, - device_mesh=device_mesh, - node_index=node_index, - strategy_number=strategy_number, - input_args=[input], - meta_arg_names=['input']) - - -@run_on_environment_flag(name='AUTO_PARALLEL') -@pytest.mark.dist -@rerun_if_address_is_in_use() -def test_batchnorm_meta_concrete_info_match(): - world_size = 4 - run_func_module = partial(_batchnorm_module_mem_test, world_size=world_size, port=free_port()) - mp.spawn(run_func_module, nprocs=world_size) - - -if __name__ == '__main__': - test_batchnorm_meta_concrete_info_match() diff --git a/tests/test_auto_parallel/test_tensor_shard/test_metainfo/test_matmul_metainfo.py b/tests/test_auto_parallel/test_tensor_shard/test_metainfo/test_matmul_metainfo.py index 3fb9c3d85..fd29c63fb 100644 --- a/tests/test_auto_parallel/test_tensor_shard/test_metainfo/test_matmul_metainfo.py +++ b/tests/test_auto_parallel/test_tensor_shard/test_metainfo/test_matmul_metainfo.py @@ -21,7 +21,7 @@ from colossalai.logging import disable_existing_loggers from colossalai.testing.pytest_wrapper import run_on_environment_flag from colossalai.testing.utils import parameterize, rerun_if_address_is_in_use from colossalai.utils import free_port -from tests.test_auto_parallel.test_tensor_shard.test_metainfo.utils import mem_test_for_node_strategy +from tests.test_auto_parallel.test_tensor_shard.test_metainfo.utils import print_results if torch.__version__ >= '1.12.0': from colossalai.auto_parallel.meta_profiler import MetaInfo, meta_register @@ -102,43 +102,8 @@ def test_matmul_function_meta_info(tensor_shapes): compute_cost: TrainCycleItem memory_cost: TrainCycleItem - print("=====================") - print(f"input shapes: {tensor_shapes[0]}, {tensor_shapes[1]}") - print(f"output shapes: {output_tensor.shape}") - - # estimated results - print("Estimated Results") - - # compute cost - print("compute_cost:") - print(f" fwd: {compute_cost.fwd}") - print(f" bwd: {compute_cost.bwd}") - - # memory cost - print("memory_cost:") - # fwd - print(f" fwd activation: {memory_cost.fwd.activation / 1024} KB") - print(f" fwd buffer: {memory_cost.fwd.buffer / 1024} KB") - print(f" fwd temp: {memory_cost.fwd.temp / 1024} KB") - print(f" fwd parameter: {memory_cost.fwd.parameter / 1024} KB") - - # bwd - print(f" bwd activation: {memory_cost.bwd.activation / 1024} KB") - print(f" bwd buffer: {memory_cost.bwd.buffer / 1024} KB") - print(f" bwd temp: {memory_cost.bwd.temp / 1024} KB") - print(f" bwd parameter: {memory_cost.bwd.parameter / 1024} KB") - - # actual results - print("Actual Results") - - print("memory_cost:") - # fwd - print(f" fwd allocated: {fwd_allocated / 1024} KB") - print(f" fwd peak: {fwd_peak / 1024} KB") - - # bwd - print(f" bwd allocated: {bwd_allocated / 1024} KB") - print(f" bwd peak: {bwd_peak / 1024} KB") + print_results([input_real_tensor, other_real_tensor], [output_real_tensor], compute_cost, memory_cost, + fwd_allocated, fwd_peak, bwd_allocated, bwd_peak) if __name__ == '__main__': diff --git a/tests/test_auto_parallel/test_tensor_shard/test_metainfo/test_norm_metainfo.py b/tests/test_auto_parallel/test_tensor_shard/test_metainfo/test_norm_metainfo.py new file mode 100644 index 000000000..9d3ab9c82 --- /dev/null +++ b/tests/test_auto_parallel/test_tensor_shard/test_metainfo/test_norm_metainfo.py @@ -0,0 +1,131 @@ +from functools import partial + +import pytest +import torch +import torch.multiprocessing as mp +import torch.nn as nn + +from colossalai.auto_parallel.tensor_shard.sharding_strategy import ( + MemoryCost, + OperationData, + OperationDataType, + ShardingStrategy, + StrategiesVector, + TrainCycleItem, +) +from colossalai.device.device_mesh import DeviceMesh +from colossalai.fx import ColoGraphModule, ColoTracer +from colossalai.initialize import launch +from colossalai.logging import disable_existing_loggers +from colossalai.testing.pytest_wrapper import run_on_environment_flag +from colossalai.testing.utils import parameterize, rerun_if_address_is_in_use +from colossalai.utils import free_port +from tests.test_auto_parallel.test_tensor_shard.test_metainfo.utils import mem_test_for_node_strategy, print_results + +if torch.__version__ >= '1.12.0': + from colossalai.auto_parallel.meta_profiler import MetaInfo, meta_register + + +def _batchnorm_module_mem_test(rank, world_size, port): + """This function is for batchnorm memory test + Test and print real memory cost and estimated, this test will not be executed except with the tag AUTO_PARALLEL + + Args: + rank: device rank + bias: indicate whether conv module need bias + world_size: number of devices + port: port for initializing process group + """ + disable_existing_loggers() + launch(config={}, rank=rank, world_size=world_size, host='localhost', port=port, backend='nccl') + model = nn.Sequential(nn.BatchNorm2d(128)).cuda() + input = torch.rand(4, 128, 64, 64).cuda() + input.requires_grad = True + physical_mesh_id = torch.arange(0, 4) + mesh_shape = (2, 2) + device_mesh = DeviceMesh(physical_mesh_id, mesh_shape, init_process_group=True) + + # index of target node in computation graph + node_index = 1 + # total number of target node strategies + strategy_number = 9 + mem_test_for_node_strategy(rank=rank, + model=model, + device_mesh=device_mesh, + node_index=node_index, + strategy_number=strategy_number, + input_args=[input], + meta_arg_names=['input']) + + +@run_on_environment_flag(name='AUTO_PARALLEL') +@pytest.mark.dist +@rerun_if_address_is_in_use() +def test_batchnorm_meta_concrete_info_match(): + world_size = 4 + run_func_module = partial(_batchnorm_module_mem_test, world_size=world_size, port=free_port()) + mp.spawn(run_func_module, nprocs=world_size) + + +@pytest.mark.skipif(torch.__version__ < '1.12.0', reason='need pytorch 1.12.0 or higher for aten level operations') +@parameterize('tensor_shape', [ + [256, 1024], + [1024, 256], +]) +def test_layernorm_meta_info(tensor_shape): + meta_func = meta_register.get(torch.nn.LayerNorm) + + # construct input + input_tensor = torch.rand(*tensor_shape, device="meta") + output_tensor = torch.rand(*tensor_shape, device="meta") + weight_tensor = torch.rand(tensor_shape[1], device="meta") + bias_tensor = torch.rand(tensor_shape[1], device="meta") + + # construct operation data + input_data = OperationData(name="input", type=OperationDataType.ARG, data=input_tensor) + + output_data = OperationData(name="output", type=OperationDataType.OUTPUT, data=output_tensor) + + weight_data = OperationData(name="weight", type=OperationDataType.PARAM, data=weight_tensor) + + bias_data = OperationData(name="bias", type=OperationDataType.PARAM, data=bias_tensor) + + # construct args and kwargs + args = [input_data, output_data, weight_data, bias_data] + kwargs = {'inplace': False} + + # estimated results + compute_cost, memory_cost, fwd_in, fwd_buffer, fwd_out = meta_func(*args, **kwargs) + + # actual results + input_real_tensor = torch.rand(*tensor_shape, device="cuda:0") + + input_real_tensor.requires_grad = True + + ln_module = torch.nn.LayerNorm(tensor_shape[1]).cuda() + + # fwd + torch.cuda.reset_peak_memory_stats() + mem_stamp0 = torch.cuda.memory_allocated() + output_real_tensor = ln_module(input_real_tensor) + fwd_allocated = torch.cuda.memory_allocated() - mem_stamp0 + fwd_peak = torch.cuda.max_memory_allocated() - mem_stamp0 + + # bwd + upstream_grad = torch.rand_like(output_real_tensor) + torch.cuda.reset_peak_memory_stats() + mem_stamp0 = torch.cuda.memory_allocated() + torch.autograd.backward(output_real_tensor, upstream_grad) + bwd_allocated = torch.cuda.memory_allocated() - mem_stamp0 + bwd_peak = torch.cuda.max_memory_allocated() - mem_stamp0 + + compute_cost: TrainCycleItem + memory_cost: TrainCycleItem + + print_results([input_real_tensor], [output_real_tensor], compute_cost, memory_cost, fwd_allocated, fwd_peak, + bwd_allocated, bwd_peak) + + +if __name__ == '__main__': + test_batchnorm_meta_concrete_info_match() + test_layernorm_meta_info() diff --git a/tests/test_auto_parallel/test_tensor_shard/test_metainfo/utils.py b/tests/test_auto_parallel/test_tensor_shard/test_metainfo/utils.py index 17eb75fad..b8c01d358 100644 --- a/tests/test_auto_parallel/test_tensor_shard/test_metainfo/utils.py +++ b/tests/test_auto_parallel/test_tensor_shard/test_metainfo/utils.py @@ -7,7 +7,7 @@ from torch.fx import GraphModule from colossalai.auto_parallel.passes.runtime_apply_pass import runtime_apply_pass from colossalai.auto_parallel.passes.runtime_preparation_pass import runtime_preparation_pass -from colossalai.auto_parallel.tensor_shard.sharding_strategy import OperationDataType +from colossalai.auto_parallel.tensor_shard.sharding_strategy import OperationDataType, TrainCycleItem from colossalai.auto_parallel.tensor_shard.solver import SolverOptions, StrategiesConstructor from colossalai.device.device_mesh import DeviceMesh from colossalai.fx.tracer.tracer import ColoTracer @@ -126,3 +126,56 @@ def mem_test_for_node_strategy(rank: int, f"backward temp: {metainfo.memory_cost.bwd.temp / 1024} kb, backward buffer: {metainfo.memory_cost.bwd.buffer / 1024} kb" ) print("=======================") + + +def print_results(input: List[torch.Tensor], output: List[torch.Tensor], compute_cost: TrainCycleItem, + memory_cost: TrainCycleItem, fwd_allocated, fwd_peak, bwd_allocated, bwd_peak): + """Print the results of the meta information test. + + Args: + input (List[torch.Tensor]): input tensors + output (List[torch.Tensor]): output tensors + compute_cost (TrainCycleItem): compute cost estimated by meta_func + memory_cost (TrainCycleItem): memory cost estimated by meta_func + fwd_allocated: real forward memory allocated + fwd_peak: real forward peak memory stats + bwd_allocated: real backward memory allocated + bwd_peak: real backward peak memory stats + """ + print("=====================") + print(f"input shapes: {[tensor.shape for tensor in input]}") + print(f"output shapes: {[tensor.shape for tensor in output]}") + + # estimated results + print("Estimated Results") + + # compute cost + print("compute_cost:") + print(f" fwd: {compute_cost.fwd}") + print(f" bwd: {compute_cost.bwd}") + + # memory cost + print("memory_cost:") + # fwd + print(f" fwd activation: {memory_cost.fwd.activation / 1024} KB") + print(f" fwd buffer: {memory_cost.fwd.buffer / 1024} KB") + print(f" fwd temp: {memory_cost.fwd.temp / 1024} KB") + print(f" fwd parameter: {memory_cost.fwd.parameter / 1024} KB") + + # bwd + print(f" bwd activation: {memory_cost.bwd.activation / 1024} KB") + print(f" bwd buffer: {memory_cost.bwd.buffer / 1024} KB") + print(f" bwd temp: {memory_cost.bwd.temp / 1024} KB") + print(f" bwd parameter: {memory_cost.bwd.parameter / 1024} KB") + + # actual results + print("Actual Results") + + print("memory_cost:") + # fwd + print(f" fwd allocated: {fwd_allocated / 1024} KB") + print(f" fwd peak: {fwd_peak / 1024} KB") + + # bwd + print(f" bwd allocated: {bwd_allocated / 1024} KB") + print(f" bwd peak: {bwd_peak / 1024} KB")