Browse Source

[autoparallel] Patch meta information of `torch.nn.LayerNorm` (#2647)

* [autoparallel] layernorm metainfo patch

* [autoparallel] polish test
pull/2665/head
Boyuan Yao 2 years ago committed by GitHub
parent
commit
0385b26ebf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 55
      colossalai/auto_parallel/meta_profiler/meta_registry/norm.py
  2. 4
      colossalai/auto_parallel/tensor_shard/node_handler/layer_norm_handler.py
  3. 60
      tests/test_auto_parallel/test_tensor_shard/test_metainfo/test_batchnorm_metainfo.py
  4. 41
      tests/test_auto_parallel/test_tensor_shard/test_metainfo/test_matmul_metainfo.py
  5. 131
      tests/test_auto_parallel/test_tensor_shard/test_metainfo/test_norm_metainfo.py
  6. 55
      tests/test_auto_parallel/test_tensor_shard/test_metainfo/utils.py

55
colossalai/auto_parallel/meta_profiler/meta_registry/norm.py

@ -16,7 +16,7 @@ from colossalai.tensor.sharding_spec import ShardingSpec
from ..registry import meta_register
__all__ = ['batchnormnd_meta_info']
__all__ = ['batchnormnd_meta_info', 'layernorm_meta_info']
@meta_register.register(torch.nn.BatchNorm1d)
@ -101,3 +101,56 @@ def batchnormnd_meta_info(*args, **kwargs) -> Tuple[TrainCycleItem, TrainCycleIt
fwd_out = [torch.zeros_like(output_tensor, device='meta')]
return compute_cost, memory_cost, fwd_in, fwd_buffer, fwd_out
@meta_register.register(torch.nn.LayerNorm)
def layernorm_meta_info(*args, **kwargs) -> Tuple[TrainCycleItem, TrainCycleItem, List[torch.Tensor]]:
"""LayerNorm meta information
Returns:
Tuple[TrainCycleItem, TrainCycleItem, List[torch.Tensor]]: compute cost, memory cost and forward inputs
"""
# construct needed tensors
input_tensor = next(filter(lambda x: x.type == OperationDataType.ARG, args)).data
output_tensor = next(filter(lambda x: x.type == OperationDataType.OUTPUT, args)).data
weight_tensor = next(filter(lambda x: x.name == "weight", args)).data
bias_tensor = next(filter(lambda x: x.name == "bias", args)).data
running_mean = torch.rand(input_tensor.shape[0], 1, device='meta')
running_var = torch.rand(input_tensor.shape[0], 1, device='meta')
# construct args
fwd_in_args = [input_tensor, [input_tensor.shape[0]], weight_tensor]
fwd_out_args = [output_tensor]
bwd_in_args = [input_tensor, output_tensor, [input_tensor.shape[0]]]
bwd_out_args = [weight_tensor, bias_tensor]
# compute cost
fwd_compute_cost = flop_mapping[torch.ops.aten.native_layer_norm.default](fwd_in_args, fwd_out_args)
bwd_compute_cost = flop_mapping[torch.ops.aten.native_layer_norm_backward.default](bwd_in_args, bwd_out_args)
compute_cost = TrainCycleItem(fwd=fwd_compute_cost, bwd=bwd_compute_cost, total=fwd_compute_cost + bwd_compute_cost)
# memory cost
# NOTE: currently in SPMD solver we always believe that there will be a new tensor created in forward
fwd_memory_cost = MemoryCost(activation=activation_size([input_tensor, output_tensor, weight_tensor, bias_tensor]),
parameter=activation_size([weight_tensor, bias_tensor]),
temp=0,
buffer=activation_size([running_mean, running_var]))
bwd_memory_cost = MemoryCost(activation=activation_size([input_tensor, weight_tensor, bias_tensor]),
parameter=activation_size([weight_tensor, bias_tensor]),
temp=activation_size([running_mean, running_var]),
buffer=activation_size([running_mean, running_var]))
total_cost = MemoryCost(activation=fwd_memory_cost.activation + bwd_memory_cost.activation,
parameter=fwd_memory_cost.parameter + bwd_memory_cost.parameter,
temp=fwd_memory_cost.temp + bwd_memory_cost.temp,
buffer=fwd_memory_cost.buffer + bwd_memory_cost.buffer)
memory_cost = TrainCycleItem(fwd=fwd_memory_cost, bwd=bwd_memory_cost, total=total_cost)
# store fwd_in, fwd_buffer, fwd_out
fwd_in = [torch.zeros_like(input_tensor, device='meta')]
fwd_buffer = [torch.zeros_like(running_mean, device='meta'), torch.zeros_like(running_var, device='meta')]
fwd_out = [torch.zeros_like(output_tensor, device='meta')]
return compute_cost, memory_cost, fwd_in, fwd_buffer, fwd_out

4
colossalai/auto_parallel/tensor_shard/node_handler/layer_norm_handler.py

@ -3,7 +3,7 @@ from typing import Dict, List
import torch
from ..sharding_strategy import OperationData, OperationDataType
from .node_handler import ModuleHandler
from .node_handler import MetaInfoModuleHandler, ModuleHandler
from .registry import operator_registry
from .strategy import LayerNormGenerator, StrategyGenerator
@ -11,7 +11,7 @@ __all__ = ['LayerNormModuleHandler']
@operator_registry.register(torch.nn.LayerNorm)
class LayerNormModuleHandler(ModuleHandler):
class LayerNormModuleHandler(MetaInfoModuleHandler):
"""
A LayerNormModuleHandler which deals with the sharding strategies for nn.LayerNorm module.
"""

60
tests/test_auto_parallel/test_tensor_shard/test_metainfo/test_batchnorm_metainfo.py

@ -1,60 +0,0 @@
from functools import partial
import pytest
import torch
import torch.multiprocessing as mp
import torch.nn as nn
from colossalai.device.device_mesh import DeviceMesh
from colossalai.fx import ColoGraphModule, ColoTracer
from colossalai.initialize import launch
from colossalai.logging import disable_existing_loggers
from colossalai.testing.pytest_wrapper import run_on_environment_flag
from colossalai.testing.utils import parameterize, rerun_if_address_is_in_use
from colossalai.utils import free_port
from tests.test_auto_parallel.test_tensor_shard.test_metainfo.utils import mem_test_for_node_strategy
def _batchnorm_module_mem_test(rank, world_size, port):
"""This function is for batchnorm memory test
Test and print real memory cost and estimated, this test will not be executed except with the tag AUTO_PARALLEL
Args:
rank: device rank
bias: indicate whether conv module need bias
world_size: number of devices
port: port for initializing process group
"""
disable_existing_loggers()
launch(config={}, rank=rank, world_size=world_size, host='localhost', port=port, backend='nccl')
model = nn.Sequential(nn.BatchNorm2d(128)).cuda()
input = torch.rand(4, 128, 64, 64).cuda()
input.requires_grad = True
physical_mesh_id = torch.arange(0, 4)
mesh_shape = (2, 2)
device_mesh = DeviceMesh(physical_mesh_id, mesh_shape, init_process_group=True)
# index of target node in computation graph
node_index = 1
# total number of target node strategies
strategy_number = 9
mem_test_for_node_strategy(rank=rank,
model=model,
device_mesh=device_mesh,
node_index=node_index,
strategy_number=strategy_number,
input_args=[input],
meta_arg_names=['input'])
@run_on_environment_flag(name='AUTO_PARALLEL')
@pytest.mark.dist
@rerun_if_address_is_in_use()
def test_batchnorm_meta_concrete_info_match():
world_size = 4
run_func_module = partial(_batchnorm_module_mem_test, world_size=world_size, port=free_port())
mp.spawn(run_func_module, nprocs=world_size)
if __name__ == '__main__':
test_batchnorm_meta_concrete_info_match()

41
tests/test_auto_parallel/test_tensor_shard/test_metainfo/test_matmul_metainfo.py

@ -21,7 +21,7 @@ from colossalai.logging import disable_existing_loggers
from colossalai.testing.pytest_wrapper import run_on_environment_flag
from colossalai.testing.utils import parameterize, rerun_if_address_is_in_use
from colossalai.utils import free_port
from tests.test_auto_parallel.test_tensor_shard.test_metainfo.utils import mem_test_for_node_strategy
from tests.test_auto_parallel.test_tensor_shard.test_metainfo.utils import print_results
if torch.__version__ >= '1.12.0':
from colossalai.auto_parallel.meta_profiler import MetaInfo, meta_register
@ -102,43 +102,8 @@ def test_matmul_function_meta_info(tensor_shapes):
compute_cost: TrainCycleItem
memory_cost: TrainCycleItem
print("=====================")
print(f"input shapes: {tensor_shapes[0]}, {tensor_shapes[1]}")
print(f"output shapes: {output_tensor.shape}")
# estimated results
print("Estimated Results")
# compute cost
print("compute_cost:")
print(f" fwd: {compute_cost.fwd}")
print(f" bwd: {compute_cost.bwd}")
# memory cost
print("memory_cost:")
# fwd
print(f" fwd activation: {memory_cost.fwd.activation / 1024} KB")
print(f" fwd buffer: {memory_cost.fwd.buffer / 1024} KB")
print(f" fwd temp: {memory_cost.fwd.temp / 1024} KB")
print(f" fwd parameter: {memory_cost.fwd.parameter / 1024} KB")
# bwd
print(f" bwd activation: {memory_cost.bwd.activation / 1024} KB")
print(f" bwd buffer: {memory_cost.bwd.buffer / 1024} KB")
print(f" bwd temp: {memory_cost.bwd.temp / 1024} KB")
print(f" bwd parameter: {memory_cost.bwd.parameter / 1024} KB")
# actual results
print("Actual Results")
print("memory_cost:")
# fwd
print(f" fwd allocated: {fwd_allocated / 1024} KB")
print(f" fwd peak: {fwd_peak / 1024} KB")
# bwd
print(f" bwd allocated: {bwd_allocated / 1024} KB")
print(f" bwd peak: {bwd_peak / 1024} KB")
print_results([input_real_tensor, other_real_tensor], [output_real_tensor], compute_cost, memory_cost,
fwd_allocated, fwd_peak, bwd_allocated, bwd_peak)
if __name__ == '__main__':

131
tests/test_auto_parallel/test_tensor_shard/test_metainfo/test_norm_metainfo.py

@ -0,0 +1,131 @@
from functools import partial
import pytest
import torch
import torch.multiprocessing as mp
import torch.nn as nn
from colossalai.auto_parallel.tensor_shard.sharding_strategy import (
MemoryCost,
OperationData,
OperationDataType,
ShardingStrategy,
StrategiesVector,
TrainCycleItem,
)
from colossalai.device.device_mesh import DeviceMesh
from colossalai.fx import ColoGraphModule, ColoTracer
from colossalai.initialize import launch
from colossalai.logging import disable_existing_loggers
from colossalai.testing.pytest_wrapper import run_on_environment_flag
from colossalai.testing.utils import parameterize, rerun_if_address_is_in_use
from colossalai.utils import free_port
from tests.test_auto_parallel.test_tensor_shard.test_metainfo.utils import mem_test_for_node_strategy, print_results
if torch.__version__ >= '1.12.0':
from colossalai.auto_parallel.meta_profiler import MetaInfo, meta_register
def _batchnorm_module_mem_test(rank, world_size, port):
"""This function is for batchnorm memory test
Test and print real memory cost and estimated, this test will not be executed except with the tag AUTO_PARALLEL
Args:
rank: device rank
bias: indicate whether conv module need bias
world_size: number of devices
port: port for initializing process group
"""
disable_existing_loggers()
launch(config={}, rank=rank, world_size=world_size, host='localhost', port=port, backend='nccl')
model = nn.Sequential(nn.BatchNorm2d(128)).cuda()
input = torch.rand(4, 128, 64, 64).cuda()
input.requires_grad = True
physical_mesh_id = torch.arange(0, 4)
mesh_shape = (2, 2)
device_mesh = DeviceMesh(physical_mesh_id, mesh_shape, init_process_group=True)
# index of target node in computation graph
node_index = 1
# total number of target node strategies
strategy_number = 9
mem_test_for_node_strategy(rank=rank,
model=model,
device_mesh=device_mesh,
node_index=node_index,
strategy_number=strategy_number,
input_args=[input],
meta_arg_names=['input'])
@run_on_environment_flag(name='AUTO_PARALLEL')
@pytest.mark.dist
@rerun_if_address_is_in_use()
def test_batchnorm_meta_concrete_info_match():
world_size = 4
run_func_module = partial(_batchnorm_module_mem_test, world_size=world_size, port=free_port())
mp.spawn(run_func_module, nprocs=world_size)
@pytest.mark.skipif(torch.__version__ < '1.12.0', reason='need pytorch 1.12.0 or higher for aten level operations')
@parameterize('tensor_shape', [
[256, 1024],
[1024, 256],
])
def test_layernorm_meta_info(tensor_shape):
meta_func = meta_register.get(torch.nn.LayerNorm)
# construct input
input_tensor = torch.rand(*tensor_shape, device="meta")
output_tensor = torch.rand(*tensor_shape, device="meta")
weight_tensor = torch.rand(tensor_shape[1], device="meta")
bias_tensor = torch.rand(tensor_shape[1], device="meta")
# construct operation data
input_data = OperationData(name="input", type=OperationDataType.ARG, data=input_tensor)
output_data = OperationData(name="output", type=OperationDataType.OUTPUT, data=output_tensor)
weight_data = OperationData(name="weight", type=OperationDataType.PARAM, data=weight_tensor)
bias_data = OperationData(name="bias", type=OperationDataType.PARAM, data=bias_tensor)
# construct args and kwargs
args = [input_data, output_data, weight_data, bias_data]
kwargs = {'inplace': False}
# estimated results
compute_cost, memory_cost, fwd_in, fwd_buffer, fwd_out = meta_func(*args, **kwargs)
# actual results
input_real_tensor = torch.rand(*tensor_shape, device="cuda:0")
input_real_tensor.requires_grad = True
ln_module = torch.nn.LayerNorm(tensor_shape[1]).cuda()
# fwd
torch.cuda.reset_peak_memory_stats()
mem_stamp0 = torch.cuda.memory_allocated()
output_real_tensor = ln_module(input_real_tensor)
fwd_allocated = torch.cuda.memory_allocated() - mem_stamp0
fwd_peak = torch.cuda.max_memory_allocated() - mem_stamp0
# bwd
upstream_grad = torch.rand_like(output_real_tensor)
torch.cuda.reset_peak_memory_stats()
mem_stamp0 = torch.cuda.memory_allocated()
torch.autograd.backward(output_real_tensor, upstream_grad)
bwd_allocated = torch.cuda.memory_allocated() - mem_stamp0
bwd_peak = torch.cuda.max_memory_allocated() - mem_stamp0
compute_cost: TrainCycleItem
memory_cost: TrainCycleItem
print_results([input_real_tensor], [output_real_tensor], compute_cost, memory_cost, fwd_allocated, fwd_peak,
bwd_allocated, bwd_peak)
if __name__ == '__main__':
test_batchnorm_meta_concrete_info_match()
test_layernorm_meta_info()

55
tests/test_auto_parallel/test_tensor_shard/test_metainfo/utils.py

@ -7,7 +7,7 @@ from torch.fx import GraphModule
from colossalai.auto_parallel.passes.runtime_apply_pass import runtime_apply_pass
from colossalai.auto_parallel.passes.runtime_preparation_pass import runtime_preparation_pass
from colossalai.auto_parallel.tensor_shard.sharding_strategy import OperationDataType
from colossalai.auto_parallel.tensor_shard.sharding_strategy import OperationDataType, TrainCycleItem
from colossalai.auto_parallel.tensor_shard.solver import SolverOptions, StrategiesConstructor
from colossalai.device.device_mesh import DeviceMesh
from colossalai.fx.tracer.tracer import ColoTracer
@ -126,3 +126,56 @@ def mem_test_for_node_strategy(rank: int,
f"backward temp: {metainfo.memory_cost.bwd.temp / 1024} kb, backward buffer: {metainfo.memory_cost.bwd.buffer / 1024} kb"
)
print("=======================")
def print_results(input: List[torch.Tensor], output: List[torch.Tensor], compute_cost: TrainCycleItem,
memory_cost: TrainCycleItem, fwd_allocated, fwd_peak, bwd_allocated, bwd_peak):
"""Print the results of the meta information test.
Args:
input (List[torch.Tensor]): input tensors
output (List[torch.Tensor]): output tensors
compute_cost (TrainCycleItem): compute cost estimated by meta_func
memory_cost (TrainCycleItem): memory cost estimated by meta_func
fwd_allocated: real forward memory allocated
fwd_peak: real forward peak memory stats
bwd_allocated: real backward memory allocated
bwd_peak: real backward peak memory stats
"""
print("=====================")
print(f"input shapes: {[tensor.shape for tensor in input]}")
print(f"output shapes: {[tensor.shape for tensor in output]}")
# estimated results
print("Estimated Results")
# compute cost
print("compute_cost:")
print(f" fwd: {compute_cost.fwd}")
print(f" bwd: {compute_cost.bwd}")
# memory cost
print("memory_cost:")
# fwd
print(f" fwd activation: {memory_cost.fwd.activation / 1024} KB")
print(f" fwd buffer: {memory_cost.fwd.buffer / 1024} KB")
print(f" fwd temp: {memory_cost.fwd.temp / 1024} KB")
print(f" fwd parameter: {memory_cost.fwd.parameter / 1024} KB")
# bwd
print(f" bwd activation: {memory_cost.bwd.activation / 1024} KB")
print(f" bwd buffer: {memory_cost.bwd.buffer / 1024} KB")
print(f" bwd temp: {memory_cost.bwd.temp / 1024} KB")
print(f" bwd parameter: {memory_cost.bwd.parameter / 1024} KB")
# actual results
print("Actual Results")
print("memory_cost:")
# fwd
print(f" fwd allocated: {fwd_allocated / 1024} KB")
print(f" fwd peak: {fwd_peak / 1024} KB")
# bwd
print(f" bwd allocated: {bwd_allocated / 1024} KB")
print(f" bwd peak: {bwd_peak / 1024} KB")

Loading…
Cancel
Save