diff --git a/tests/components_to_test/__init__.py b/tests/components_to_test/__init__.py
index b7f82db83..8fc7ea097 100644
--- a/tests/components_to_test/__init__.py
+++ b/tests/components_to_test/__init__.py
@@ -1,2 +1,11 @@
-from . import bert, gpt, inline_op_model, nested_model, no_leaf_module, repeated_computed_layer, resnet, simple_net
+from . import (
+    bert,
+    gpt2,
+    hanging_param_model,
+    inline_op_model,
+    nested_model,
+    repeated_computed_layer,
+    resnet,
+    simple_net,
+)
 from .utils import run_fwd_bwd
diff --git a/tests/components_to_test/gpt.py b/tests/components_to_test/gpt2.py
similarity index 100%
rename from tests/components_to_test/gpt.py
rename to tests/components_to_test/gpt2.py
diff --git a/tests/components_to_test/no_leaf_module.py b/tests/components_to_test/hanging_param_model.py
similarity index 79%
rename from tests/components_to_test/no_leaf_module.py
rename to tests/components_to_test/hanging_param_model.py
index 47dcecd36..329a08ea2 100644
--- a/tests/components_to_test/no_leaf_module.py
+++ b/tests/components_to_test/hanging_param_model.py
@@ -8,9 +8,10 @@ from .registry import non_distributed_component_funcs
 from .utils.dummy_data_generator import DummyDataGenerator
 
 
-class NoLeafModule(CheckpointModule):
+class HangingParamModule(CheckpointModule):
     """
-    In this no-leaf module, it has subordinate nn.modules and a nn.Parameter.
+    Hanging Parameter: a parameter that does not belong to a leaf Module.
+    It has subordinate nn.modules and a nn.Parameter.
     """
 
     def __init__(self, checkpoint=False) -> None:
@@ -34,11 +35,11 @@ class DummyDataLoader(DummyDataGenerator):
         return data, label
 
 
-@non_distributed_component_funcs.register(name='no_leaf_module')
+@non_distributed_component_funcs.register(name='hanging_param_model')
 def get_training_components():
 
     def model_builder(checkpoint=False):
-        return NoLeafModule(checkpoint)
+        return HangingParamModule(checkpoint)
 
     trainloader = DummyDataLoader()
     testloader = DummyDataLoader()
diff --git a/tests/test_gemini/test_mem_tracer.py b/tests/test_gemini/test_mem_tracer.py
index cb95cc783..c777308c1 100644
--- a/tests/test_gemini/test_mem_tracer.py
+++ b/tests/test_gemini/test_mem_tracer.py
@@ -14,7 +14,7 @@ from tests.components_to_test.registry import non_distributed_component_funcs
 def run_tracer(rank, world_size, port, use_grad_check=True):
     colossalai.launch(config={}, rank=rank, world_size=world_size, host='localhost', port=port, backend='nccl')
 
-    test_models = ['repeated_computed_layers', 'resnet18', 'no_leaf_module', 'bert']
+    test_models = ['repeated_computed_layers', 'resnet18', 'hanging_param_model', 'bert']
     # test_models = ['bert']
     for model_name in test_models:
         get_components_func = non_distributed_component_funcs.get_callable(model_name)
diff --git a/tests/test_gemini/test_param_op.py b/tests/test_gemini/test_param_op.py
index 60a0833cf..daf386d6d 100644
--- a/tests/test_gemini/test_param_op.py
+++ b/tests/test_gemini/test_param_op.py
@@ -50,7 +50,7 @@ def run_model(model, inputs, label, criterion, use_param_hook=False):
 
 
 def test_base_param_hook():
-    test_models = ['repeated_computed_layers', 'resnet18', 'no_leaf_module', 'inline_op_model']
+    test_models = ['repeated_computed_layers', 'resnet18', 'hanging_param_model', 'inline_op_model']
     # test_models = ['bert']
 
     for model_name in test_models:
diff --git a/tests/test_gemini/update/test_optim.py b/tests/test_gemini/update/test_optim.py
index 5789d2991..93164995d 100644
--- a/tests/test_gemini/update/test_optim.py
+++ b/tests/test_gemini/update/test_optim.py
@@ -41,7 +41,7 @@ def check_param(model: ZeroDDP, torch_model: torch.nn.Module):
 
 
 # 'gpt2', 'bert',
-TEST_MODELS = ['no_leaf_module', 'gpt2', 'bert', 'simple_net', 'nested_model', 'repeated_computed_layers']
+TEST_MODELS = ['hanging_param_model', 'gpt2', 'bert', 'simple_net', 'nested_model', 'repeated_computed_layers']
 
 
 @parameterize('placement_policy', ['cuda', 'cpu', 'auto', 'const'])
diff --git a/tests/test_moe/test_moe_zero_model.py b/tests/test_moe/test_moe_zero_model.py
index 37e8a4bab..d608ebf07 100644
--- a/tests/test_moe/test_moe_zero_model.py
+++ b/tests/test_moe/test_moe_zero_model.py
@@ -1,77 +1,75 @@
-from functools import partial
-
-import colossalai
-import pytest
-import torch
-import torch.multiprocessing as mp
-
-from colossalai.nn import MoeLoss
-from colossalai.testing import parameterize, rerun_if_address_is_in_use
-from colossalai.utils import free_port
-from colossalai.zero.init_ctx import ZeroInitContext
-from colossalai.zero.shard_utils import (BucketTensorShardStrategy, TensorShardStrategy)
-from colossalai.zero.sharded_model import ShardedModelV2
-from colossalai.zero.sharded_model._utils import cast_tensor_to_fp16
-from colossalai.zero.sharded_model.utils import col_model_deepcopy
-from tests.components_to_test.registry import non_distributed_component_funcs
-from colossalai.engine.gradient_handler import MoeGradientHandler
-from colossalai.context import MOE_CONTEXT
-from colossalai.testing import assert_equal_in_group
-
-from tests.test_zero.common import CONFIG, check_grads_padding, run_fwd_bwd
-from tests.test_moe.test_moe_zero_init import MoeModel
-
-
-@parameterize("enable_autocast", [False])
-@parameterize("shard_strategy_class", [TensorShardStrategy, BucketTensorShardStrategy])
-def run_model_test(enable_autocast, shard_strategy_class):
-    shard_strategy = shard_strategy_class()
-
-    get_components_func = non_distributed_component_funcs.get_callable('no_leaf_module')
-    _, train_dataloader, _, optimizer_class, _ = get_components_func()
-    criterion = MoeLoss(aux_weight=0.01, loss_fn=torch.nn.CrossEntropyLoss)
-
-    with ZeroInitContext(target_device=torch.device('cuda', torch.cuda.current_device()),
-                         shard_strategy=shard_strategy,
-                         shard_param=True):
-        zero_model = MoeModel(checkpoint=True)
-    zero_model = ShardedModelV2(zero_model, shard_strategy)
-
-    # check whether parameters are identical in ddp
-    for name, p in zero_model.named_parameters():
-        if not p.colo_attr.param_is_sharded and p.colo_attr.is_replicated:
-            assert_equal_in_group(p.colo_attr.data_payload)
-
-    model = MoeModel(checkpoint=True).half()
-    col_model_deepcopy(zero_model, model)
-    model = model.cuda()
-    grad_handler = MoeGradientHandler(model)
-
-    for i, (data, label) in enumerate(train_dataloader):
-        if i > 5:
-            break
-
-        data, label = cast_tensor_to_fp16(data).cuda(), label.cuda()
-        run_fwd_bwd(model, data, label, criterion, enable_autocast)
-        run_fwd_bwd(zero_model, data, label, criterion, enable_autocast)
-        grad_handler.handle_gradient()
-
-    check_grads_padding(model, zero_model, loose=True)
-
-
-def run_dist(rank, world_size, port):
-    colossalai.launch(config=CONFIG, rank=rank, world_size=world_size, host='localhost', port=port, backend='nccl')
-    MOE_CONTEXT.setup(seed=42)
-    run_model_test()
-
-
-@pytest.mark.dist
-@pytest.mark.parametrize("world_size", [2])
-@rerun_if_address_is_in_use()
-def test_moe_zero_model(world_size):
-    run_func = partial(run_dist, world_size=world_size, port=free_port())
-    mp.spawn(run_func, nprocs=world_size)
-
-
-if __name__ == '__main__':
-    test_moe_zero_model(world_size=2)
+from functools import partial
+
+import pytest
+import torch
+import torch.multiprocessing as mp
+
+import colossalai
+from colossalai.context import MOE_CONTEXT
+from colossalai.engine.gradient_handler import MoeGradientHandler
+from colossalai.nn import MoeLoss
+from colossalai.testing import assert_equal_in_group, parameterize, rerun_if_address_is_in_use
+from colossalai.utils import free_port
+from colossalai.zero.init_ctx import ZeroInitContext
+from colossalai.zero.shard_utils import BucketTensorShardStrategy, TensorShardStrategy
+from colossalai.zero.sharded_model import ShardedModelV2
+from colossalai.zero.sharded_model._utils import cast_tensor_to_fp16
+from colossalai.zero.sharded_model.utils import col_model_deepcopy
+from tests.components_to_test.registry import non_distributed_component_funcs
+from tests.test_moe.test_moe_zero_init import MoeModel
+from tests.test_zero.common import CONFIG, check_grads_padding, run_fwd_bwd
+
+
+@parameterize("enable_autocast", [False])
+@parameterize("shard_strategy_class", [TensorShardStrategy, BucketTensorShardStrategy])
+def run_model_test(enable_autocast, shard_strategy_class):
+    shard_strategy = shard_strategy_class()
+
+    get_components_func = non_distributed_component_funcs.get_callable('hanging_param_model')
+    _, train_dataloader, _, optimizer_class, _ = get_components_func()
+    criterion = MoeLoss(aux_weight=0.01, loss_fn=torch.nn.CrossEntropyLoss)
+
+    with ZeroInitContext(target_device=torch.device('cuda', torch.cuda.current_device()),
+                         shard_strategy=shard_strategy,
+                         shard_param=True):
+        zero_model = MoeModel(checkpoint=True)
+    zero_model = ShardedModelV2(zero_model, shard_strategy)
+
+    # check whether parameters are identical in ddp
+    for name, p in zero_model.named_parameters():
+        if not p.colo_attr.param_is_sharded and p.colo_attr.is_replicated:
+            assert_equal_in_group(p.colo_attr.data_payload)
+
+    model = MoeModel(checkpoint=True).half()
+    col_model_deepcopy(zero_model, model)
+    model = model.cuda()
+    grad_handler = MoeGradientHandler(model)
+
+    for i, (data, label) in enumerate(train_dataloader):
+        if i > 5:
+            break
+
+        data, label = cast_tensor_to_fp16(data).cuda(), label.cuda()
+        run_fwd_bwd(model, data, label, criterion, enable_autocast)
+        run_fwd_bwd(zero_model, data, label, criterion, enable_autocast)
+        grad_handler.handle_gradient()
+
+    check_grads_padding(model, zero_model, loose=True)
+
+
+def run_dist(rank, world_size, port):
+    colossalai.launch(config=CONFIG, rank=rank, world_size=world_size, host='localhost', port=port, backend='nccl')
+    MOE_CONTEXT.setup(seed=42)
+    run_model_test()
+
+
+@pytest.mark.dist
+@pytest.mark.parametrize("world_size", [2])
+@rerun_if_address_is_in_use()
+def test_moe_zero_model(world_size):
+    run_func = partial(run_dist, world_size=world_size, port=free_port())
+    mp.spawn(run_func, nprocs=world_size)
+
+
+if __name__ == '__main__':
+    test_moe_zero_model(world_size=2)
diff --git a/tests/test_moe/test_moe_zero_optim.py b/tests/test_moe/test_moe_zero_optim.py
index da67b7610..9d9a7bd17 100644
--- a/tests/test_moe/test_moe_zero_optim.py
+++ b/tests/test_moe/test_moe_zero_optim.py
@@ -1,126 +1,124 @@
-from functools import partial
-
-import colossalai
-import pytest
-import torch
-import torch.multiprocessing as mp
-from colossalai.amp import convert_to_apex_amp
-from colossalai.nn import MoeLoss
-from colossalai.nn.optimizer import CPUAdam
-from colossalai.testing import parameterize, rerun_if_address_is_in_use
-from colossalai.utils import free_port
-from colossalai.zero.init_ctx import ZeroInitContext
-from colossalai.zero.shard_utils import (BucketTensorShardStrategy, TensorShardStrategy)
-from colossalai.zero.sharded_model import ShardedModelV2
-from colossalai.zero.sharded_model.utils import col_model_deepcopy
-from colossalai.zero.sharded_optim import ShardedOptimizerV2
-from colossalai.zero.sharded_optim._utils import has_inf_or_nan
-from colossalai.utils import get_current_device
-from tests.components_to_test.registry import non_distributed_component_funcs
-from colossalai.engine.gradient_handler import MoeGradientHandler
-from colossalai.context import MOE_CONTEXT
-from colossalai.testing import assert_equal_in_group
-
-from tests.test_zero.common import CONFIG, check_sharded_model_params
-from tests.test_moe.test_moe_zero_init import MoeModel
-
-
-def _run_step(model, optimizer, data, label, criterion, grad_handler):
-    model.train()
-    optimizer.zero_grad()
-
-    if criterion:
-        y = model(data)
-        loss = criterion(y, label)
-    else:
-        loss = model(data, label)
-
-    loss = loss.float()
-    if isinstance(model, ShardedModelV2):
-        optimizer.backward(loss)
-    else:
-        loss.backward()
-
-    if grad_handler is not None:
-        grad_handler.handle_gradient()
-
-    optimizer.step()
-
-
-@parameterize("cpu_offload", [True])
-@parameterize("use_cpuadam", [True])    # We do not use Hybrid Adam right now, since it has a little bug
-@parameterize("reuse_fp16_shard", [True, False])
-@parameterize("shard_strategy_class", [TensorShardStrategy, BucketTensorShardStrategy])
-def _run_test_sharded_optim_v2(cpu_offload,
-                               shard_strategy_class,
-                               use_cpuadam,
-                               reuse_fp16_shard,
-                               gpu_margin_mem_ratio=0.0):
-    shard_strategy = shard_strategy_class()
-    if use_cpuadam and cpu_offload is False:
-        return
-    MOE_CONTEXT.reset_loss()
-    get_components_func = non_distributed_component_funcs.get_callable('no_leaf_module')
-    _, train_dataloader, _, optimizer_class, _ = get_components_func()
-    criterion = MoeLoss(aux_weight=0.01, loss_fn=torch.nn.CrossEntropyLoss)
-
-    with ZeroInitContext(target_device=torch.device('cpu') if cpu_offload else get_current_device(),
-                         shard_strategy=shard_strategy,
-                         shard_param=True):
-        zero_model = MoeModel(checkpoint=True)
-
-    zero_model = ShardedModelV2(zero_model,
-                                shard_strategy,
-                                tensor_placement_policy='cpu' if cpu_offload else 'cuda',
-                                reuse_fp16_shard=reuse_fp16_shard)
-
-    # check whether parameters are identical in ddp
-    for name, p in zero_model.named_parameters():
-        if not p.colo_attr.param_is_sharded and p.colo_attr.is_replicated:
-            assert_equal_in_group(p.colo_attr.data_payload.to(get_current_device()))
-
-    model = MoeModel(checkpoint=True).half()
-    col_model_deepcopy(zero_model, model)
-    model = model.cuda().float()
-
-    if use_cpuadam:
-        optimizer_class = CPUAdam
-    optim = optimizer_class(model.parameters(), lr=1e-3)
-    sharded_optim = optimizer_class(zero_model.parameters(), lr=1e-3)
-    sharded_optim = ShardedOptimizerV2(zero_model,
-                                       sharded_optim,
-                                       initial_scale=2**5,
-                                       gpu_margin_mem_ratio=gpu_margin_mem_ratio)
-
-    amp_config = dict(opt_level='O2', keep_batchnorm_fp32=False)
-    apex_model, apex_optimizer = convert_to_apex_amp(model, optim, amp_config)
-    apex_grad_handler = MoeGradientHandler(model)
-
-    for i, (data, label) in enumerate(train_dataloader):
-        if i > 5:
-            break
-        data, label = data.cuda(), label.cuda()
-        _run_step(apex_model, apex_optimizer, data, label, criterion, apex_grad_handler)
-        _run_step(zero_model, sharded_optim, data, label, criterion, None)
-        check_sharded_model_params(model, zero_model, loose=True, reuse_fp16_shard=use_cpuadam)
-        for param in model.parameters():
-            assert not has_inf_or_nan(param)
-
-
-def _run_dist(rank, world_size, port):
-    colossalai.launch(config=CONFIG, rank=rank, world_size=world_size, host='localhost', port=port, backend='nccl')
-    MOE_CONTEXT.setup(seed=42)
-    _run_test_sharded_optim_v2()
-
-
-# use_cpuadam = True can be used with cpu_offload = False
-@pytest.mark.dist
-@pytest.mark.parametrize("world_size", [2])
-@rerun_if_address_is_in_use()
-def test_moe_zero_optim(world_size):
-    run_func = partial(_run_dist, world_size=world_size, port=free_port())
-    mp.spawn(run_func, nprocs=world_size)
-
-
-if __name__ == '__main__':
-    test_moe_zero_optim(world_size=4)
+from functools import partial
+
+import pytest
+import torch
+import torch.multiprocessing as mp
+
+import colossalai
+from colossalai.amp import convert_to_apex_amp
+from colossalai.context import MOE_CONTEXT
+from colossalai.engine.gradient_handler import MoeGradientHandler
+from colossalai.nn import MoeLoss
+from colossalai.nn.optimizer import CPUAdam
+from colossalai.testing import assert_equal_in_group, parameterize, rerun_if_address_is_in_use
+from colossalai.utils import free_port, get_current_device
+from colossalai.zero.init_ctx import ZeroInitContext
+from colossalai.zero.shard_utils import BucketTensorShardStrategy, TensorShardStrategy
+from colossalai.zero.sharded_model import ShardedModelV2
+from colossalai.zero.sharded_model.utils import col_model_deepcopy
+from colossalai.zero.sharded_optim import ShardedOptimizerV2
+from colossalai.zero.sharded_optim._utils import has_inf_or_nan
+from tests.components_to_test.registry import non_distributed_component_funcs
+from tests.test_moe.test_moe_zero_init import MoeModel
+from tests.test_zero.common import CONFIG, check_sharded_model_params
+
+
+def _run_step(model, optimizer, data, label, criterion, grad_handler):
+    model.train()
+    optimizer.zero_grad()
+
+    if criterion:
+        y = model(data)
+        loss = criterion(y, label)
+    else:
+        loss = model(data, label)
+
+    loss = loss.float()
+    if isinstance(model, ShardedModelV2):
+        optimizer.backward(loss)
+    else:
+        loss.backward()
+
+    if grad_handler is not None:
+        grad_handler.handle_gradient()
+
+    optimizer.step()
+
+
+@parameterize("cpu_offload", [True])
+@parameterize("use_cpuadam", [True])    # We do not use Hybrid Adam right now, since it has a little bug
+@parameterize("reuse_fp16_shard", [True, False])
+@parameterize("shard_strategy_class", [TensorShardStrategy, BucketTensorShardStrategy])
+def _run_test_sharded_optim_v2(cpu_offload,
+                               shard_strategy_class,
+                               use_cpuadam,
+                               reuse_fp16_shard,
+                               gpu_margin_mem_ratio=0.0):
+    shard_strategy = shard_strategy_class()
+    if use_cpuadam and cpu_offload is False:
+        return
+    MOE_CONTEXT.reset_loss()
+    get_components_func = non_distributed_component_funcs.get_callable('hanging_param_model')
+    _, train_dataloader, _, optimizer_class, _ = get_components_func()
+    criterion = MoeLoss(aux_weight=0.01, loss_fn=torch.nn.CrossEntropyLoss)
+
+    with ZeroInitContext(target_device=torch.device('cpu') if cpu_offload else get_current_device(),
+                         shard_strategy=shard_strategy,
+                         shard_param=True):
+        zero_model = MoeModel(checkpoint=True)
+
+    zero_model = ShardedModelV2(zero_model,
+                                shard_strategy,
+                                tensor_placement_policy='cpu' if cpu_offload else 'cuda',
+                                reuse_fp16_shard=reuse_fp16_shard)
+
+    # check whether parameters are identical in ddp
+    for name, p in zero_model.named_parameters():
+        if not p.colo_attr.param_is_sharded and p.colo_attr.is_replicated:
+            assert_equal_in_group(p.colo_attr.data_payload.to(get_current_device()))
+
+    model = MoeModel(checkpoint=True).half()
+    col_model_deepcopy(zero_model, model)
+    model = model.cuda().float()
+
+    if use_cpuadam:
+        optimizer_class = CPUAdam
+    optim = optimizer_class(model.parameters(), lr=1e-3)
+    sharded_optim = optimizer_class(zero_model.parameters(), lr=1e-3)
+    sharded_optim = ShardedOptimizerV2(zero_model,
+                                       sharded_optim,
+                                       initial_scale=2**5,
+                                       gpu_margin_mem_ratio=gpu_margin_mem_ratio)
+
+    amp_config = dict(opt_level='O2', keep_batchnorm_fp32=False)
+    apex_model, apex_optimizer = convert_to_apex_amp(model, optim, amp_config)
+    apex_grad_handler = MoeGradientHandler(model)
+
+    for i, (data, label) in enumerate(train_dataloader):
+        if i > 5:
+            break
+        data, label = data.cuda(), label.cuda()
+        _run_step(apex_model, apex_optimizer, data, label, criterion, apex_grad_handler)
+        _run_step(zero_model, sharded_optim, data, label, criterion, None)
+        check_sharded_model_params(model, zero_model, loose=True, reuse_fp16_shard=use_cpuadam)
+        for param in model.parameters():
+            assert not has_inf_or_nan(param)
+
+
+def _run_dist(rank, world_size, port):
+    colossalai.launch(config=CONFIG, rank=rank, world_size=world_size, host='localhost', port=port, backend='nccl')
+    MOE_CONTEXT.setup(seed=42)
+    _run_test_sharded_optim_v2()
+
+
+# use_cpuadam = True can be used with cpu_offload = False
+@pytest.mark.dist
+@pytest.mark.parametrize("world_size", [2])
+@rerun_if_address_is_in_use()
+def test_moe_zero_optim(world_size):
+    run_func = partial(_run_dist, world_size=world_size, port=free_port())
+    mp.spawn(run_func, nprocs=world_size)
+
+
+if __name__ == '__main__':
+    test_moe_zero_optim(world_size=4)
diff --git a/tests/test_zero/test_shard_model_v2.py b/tests/test_zero/test_shard_model_v2.py
index d77a78e8e..95a9dee38 100644
--- a/tests/test_zero/test_shard_model_v2.py
+++ b/tests/test_zero/test_shard_model_v2.py
@@ -23,7 +23,7 @@ from tests.components_to_test.registry import non_distributed_component_funcs
 @parameterize("enable_autocast", [True])
 @parameterize("shard_strategy_class", [BucketTensorShardStrategy])
 def run_model_test(enable_autocast, shard_strategy_class):
-    test_models = ['repeated_computed_layers', 'resnet18', 'bert', 'no_leaf_module']
+    test_models = ['repeated_computed_layers', 'resnet18', 'bert', 'hanging_param_model']
     shard_strategy = shard_strategy_class()
     for model_name in test_models:
         get_components_func = non_distributed_component_funcs.get_callable(model_name)
diff --git a/tests/test_zero/test_sharded_optim_v2.py b/tests/test_zero/test_sharded_optim_v2.py
index 2b42a7128..221915167 100644
--- a/tests/test_zero/test_sharded_optim_v2.py
+++ b/tests/test_zero/test_sharded_optim_v2.py
@@ -1,25 +1,25 @@
 from functools import partial
 
-import colossalai
-from colossalai.utils.cuda import get_current_device
 import pytest
 import torch
 import torch.distributed as dist
 import torch.multiprocessing as mp
+from common import CONFIG, check_sharded_model_params
+from torch.nn.parallel import DistributedDataParallel as DDP
+
+import colossalai
 from colossalai.amp import convert_to_apex_amp
 from colossalai.nn.optimizer import CPUAdam
 from colossalai.testing import parameterize, rerun_if_address_is_in_use
 from colossalai.utils import free_port
+from colossalai.utils.cuda import get_current_device
 from colossalai.zero.init_ctx import ZeroInitContext
-from colossalai.zero.shard_utils import (BucketTensorShardStrategy, TensorShardStrategy)
+from colossalai.zero.shard_utils import BucketTensorShardStrategy, TensorShardStrategy
 from colossalai.zero.sharded_model import ShardedModelV2
 from colossalai.zero.sharded_model.utils import col_model_deepcopy
 from colossalai.zero.sharded_optim import ShardedOptimizerV2
 from colossalai.zero.sharded_optim._utils import has_inf_or_nan
 from tests.components_to_test.registry import non_distributed_component_funcs
-from torch.nn.parallel import DistributedDataParallel as DDP
-
-from common import CONFIG, check_sharded_model_params
 
 
 def _run_step(model, optimizer, data, label, criterion, enable_autocast=False):
@@ -45,7 +45,7 @@ def _run_step(model, optimizer, data, label, criterion, enable_autocast=False):
 @parameterize("shard_strategy_class", [TensorShardStrategy, BucketTensorShardStrategy])
 @parameterize("gpu_margin_mem_ratio", [0.0, 0.7])
 def _run_test_sharded_optim_v2(cpu_offload, shard_strategy_class, use_cpuadam, gpu_margin_mem_ratio):
-    test_models = ['repeated_computed_layers', 'resnet18', 'bert', 'no_leaf_module']
+    test_models = ['repeated_computed_layers', 'resnet18', 'bert', 'hanging_param_model']
     shard_strategy = shard_strategy_class()
     if use_cpuadam and cpu_offload is False: