[test] align model name with the file name. (#2045)

pull/2047/head
Jiarui Fang 2022-11-30 15:45:26 +08:00 committed by GitHub
parent 31c644027b
commit 1e885329f4
10 changed files with 225 additions and 219 deletions

View File

@@ -1,2 +1,11 @@
-from . import bert, gpt, inline_op_model, nested_model, no_leaf_module, repeated_computed_layer, resnet, simple_net
+from . import (
+    bert,
+    gpt2,
+    hanging_param_model,
+    inline_op_model,
+    nested_model,
+    repeated_computed_layer,
+    resnet,
+    simple_net,
+)
 from .utils import run_fwd_bwd
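The rename only holds together because these test components are fetched by the string they were registered under: the registered name, the module file name, and every get_callable(...) call site have to change in step, which is what the hunks that follow do. As a rough sketch of that registry pattern (a simplified, hypothetical stand-in; the real non_distributed_component_funcs object lives in tests/components_to_test/registry.py and may differ in detail):

# Simplified, hypothetical sketch of a name-based component registry.
class ComponentRegistry:

    def __init__(self):
        self._components = {}

    def register(self, name):
        # Used as a decorator: @registry.register(name='hanging_param_model')
        def wrapper(func):
            self._components[name] = func
            return func

        return wrapper

    def get_callable(self, name):
        # A stale caller still asking for 'no_leaf_module' would fail here.
        return self._components[name]


registry = ComponentRegistry()


@registry.register(name='hanging_param_model')
def get_training_components():
    # The real helper returns (model_builder, train_dataloader, test_dataloader,
    # optimizer_class, criterion); stubbed out for illustration.
    return None, None, None, None, None


model_builder, train_dataloader, _, optimizer_class, criterion = registry.get_callable('hanging_param_model')()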

View File

@@ -8,9 +8,10 @@ from .registry import non_distributed_component_funcs
 from .utils.dummy_data_generator import DummyDataGenerator


-class NoLeafModule(CheckpointModule):
+class HangingParamModule(CheckpointModule):
     """
-    In this no-leaf module, it has subordinate nn.modules and a nn.Parameter.
+    Hanging Parameter: a parameter dose not belong to a leaf Module.
+    It has subordinate nn.modules and a nn.Parameter.
     """

     def __init__(self, checkpoint=False) -> None:

@@ -34,11 +35,11 @@ class DummyDataLoader(DummyDataGenerator):
         return data, label


-@non_distributed_component_funcs.register(name='no_leaf_module')
+@non_distributed_component_funcs.register(name='hanging_param_model')
 def get_training_components():

     def model_builder(checkpoint=False):
-        return NoLeafModule(checkpoint)
+        return HangingParamModule(checkpoint)

     trainloader = DummyDataLoader()
     testloader = DummyDataLoader()
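The new class name also describes the pattern this component exercises: a "hanging" parameter is an nn.Parameter owned directly by a non-leaf module, sitting next to that module's submodules. A minimal illustration of that shape (not the actual HangingParamModule above, whose layers and checkpointing are defined in the real test file):

import torch
import torch.nn as nn


class TinyHangingParamModule(nn.Module):
    """A non-leaf module that owns both a submodule and its own nn.Parameter."""

    def __init__(self) -> None:
        super().__init__()
        self.proj = nn.Linear(4, 4)                       # subordinate leaf module
        self.weight = nn.Parameter(torch.randn(4, 4))     # parameter hanging off the non-leaf module

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        return torch.matmul(self.proj(x), self.weight)


m = TinyHangingParamModule()
# The hanging parameter is registered on the wrapper module itself, not on a leaf child.
print([name for name, _ in m.named_parameters(recurse=False)])    # prints ['weight']

ZeRO-style wrappers have to handle such parameters explicitly, which is presumably why the test suite keeps a dedicated component for them.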

View File

@@ -14,7 +14,7 @@ from tests.components_to_test.registry import non_distributed_component_funcs
 def run_tracer(rank, world_size, port, use_grad_check=True):
     colossalai.launch(config={}, rank=rank, world_size=world_size, host='localhost', port=port, backend='nccl')
-    test_models = ['repeated_computed_layers', 'resnet18', 'no_leaf_module', 'bert']
+    test_models = ['repeated_computed_layers', 'resnet18', 'hanging_param_model', 'bert']
     # test_models = ['bert']
     for model_name in test_models:
         get_components_func = non_distributed_component_funcs.get_callable(model_name)

View File

@@ -50,7 +50,7 @@ def run_model(model, inputs, label, criterion, use_param_hook=False):
 def test_base_param_hook():
-    test_models = ['repeated_computed_layers', 'resnet18', 'no_leaf_module', 'inline_op_model']
+    test_models = ['repeated_computed_layers', 'resnet18', 'hanging_param_model', 'inline_op_model']
     # test_models = ['bert']
     for model_name in test_models:

View File

@@ -41,7 +41,7 @@ def check_param(model: ZeroDDP, torch_model: torch.nn.Module):
 # 'gpt2', 'bert',
-TEST_MODELS = ['no_leaf_module', 'gpt2', 'bert', 'simple_net', 'nested_model', 'repeated_computed_layers']
+TEST_MODELS = ['hanging_param_model', 'gpt2', 'bert', 'simple_net', 'nested_model', 'repeated_computed_layers']


 @parameterize('placement_policy', ['cuda', 'cpu', 'auto', 'const'])

View File

@@ -1,77 +1,75 @@
 from functools import partial

-import colossalai
 import pytest
 import torch
 import torch.multiprocessing as mp

+import colossalai
+from colossalai.context import MOE_CONTEXT
+from colossalai.engine.gradient_handler import MoeGradientHandler
 from colossalai.nn import MoeLoss
-from colossalai.testing import parameterize, rerun_if_address_is_in_use
+from colossalai.testing import assert_equal_in_group, parameterize, rerun_if_address_is_in_use
 from colossalai.utils import free_port
 from colossalai.zero.init_ctx import ZeroInitContext
-from colossalai.zero.shard_utils import (BucketTensorShardStrategy, TensorShardStrategy)
+from colossalai.zero.shard_utils import BucketTensorShardStrategy, TensorShardStrategy
 from colossalai.zero.sharded_model import ShardedModelV2
 from colossalai.zero.sharded_model._utils import cast_tensor_to_fp16
 from colossalai.zero.sharded_model.utils import col_model_deepcopy
 from tests.components_to_test.registry import non_distributed_component_funcs
-from colossalai.engine.gradient_handler import MoeGradientHandler
-from colossalai.context import MOE_CONTEXT
-from colossalai.testing import assert_equal_in_group
-from tests.test_zero.common import CONFIG, check_grads_padding, run_fwd_bwd
 from tests.test_moe.test_moe_zero_init import MoeModel
+from tests.test_zero.common import CONFIG, check_grads_padding, run_fwd_bwd


 @parameterize("enable_autocast", [False])
 @parameterize("shard_strategy_class", [TensorShardStrategy, BucketTensorShardStrategy])
 def run_model_test(enable_autocast, shard_strategy_class):
     shard_strategy = shard_strategy_class()

-    get_components_func = non_distributed_component_funcs.get_callable('no_leaf_module')
+    get_components_func = non_distributed_component_funcs.get_callable('hanging_param_model')
     _, train_dataloader, _, optimizer_class, _ = get_components_func()
     criterion = MoeLoss(aux_weight=0.01, loss_fn=torch.nn.CrossEntropyLoss)

     with ZeroInitContext(target_device=torch.device('cuda', torch.cuda.current_device()),
                          shard_strategy=shard_strategy,
                          shard_param=True):
         zero_model = MoeModel(checkpoint=True)
     zero_model = ShardedModelV2(zero_model, shard_strategy)

     # check whether parameters are identical in ddp
     for name, p in zero_model.named_parameters():
         if not p.colo_attr.param_is_sharded and p.colo_attr.is_replicated:
             assert_equal_in_group(p.colo_attr.data_payload)

     model = MoeModel(checkpoint=True).half()
     col_model_deepcopy(zero_model, model)
     model = model.cuda()
     grad_handler = MoeGradientHandler(model)

     for i, (data, label) in enumerate(train_dataloader):
         if i > 5:
             break

         data, label = cast_tensor_to_fp16(data).cuda(), label.cuda()
         run_fwd_bwd(model, data, label, criterion, enable_autocast)
         run_fwd_bwd(zero_model, data, label, criterion, enable_autocast)
         grad_handler.handle_gradient()

     check_grads_padding(model, zero_model, loose=True)


 def run_dist(rank, world_size, port):
     colossalai.launch(config=CONFIG, rank=rank, world_size=world_size, host='localhost', port=port, backend='nccl')
     MOE_CONTEXT.setup(seed=42)
     run_model_test()


 @pytest.mark.dist
 @pytest.mark.parametrize("world_size", [2])
 @rerun_if_address_is_in_use()
 def test_moe_zero_model(world_size):
     run_func = partial(run_dist, world_size=world_size, port=free_port())
     mp.spawn(run_func, nprocs=world_size)


 if __name__ == '__main__':
     test_moe_zero_model(world_size=2)

View File

@@ -1,126 +1,124 @@
 from functools import partial

-import colossalai
 import pytest
 import torch
 import torch.multiprocessing as mp

-from colossalai.amp import convert_to_apex_amp
+import colossalai
+from colossalai.amp import convert_to_apex_amp
+from colossalai.context import MOE_CONTEXT
+from colossalai.engine.gradient_handler import MoeGradientHandler
 from colossalai.nn import MoeLoss
 from colossalai.nn.optimizer import CPUAdam
-from colossalai.testing import parameterize, rerun_if_address_is_in_use
-from colossalai.utils import free_port
+from colossalai.testing import assert_equal_in_group, parameterize, rerun_if_address_is_in_use
+from colossalai.utils import free_port, get_current_device
 from colossalai.zero.init_ctx import ZeroInitContext
-from colossalai.zero.shard_utils import (BucketTensorShardStrategy, TensorShardStrategy)
+from colossalai.zero.shard_utils import BucketTensorShardStrategy, TensorShardStrategy
 from colossalai.zero.sharded_model import ShardedModelV2
 from colossalai.zero.sharded_model.utils import col_model_deepcopy
 from colossalai.zero.sharded_optim import ShardedOptimizerV2
 from colossalai.zero.sharded_optim._utils import has_inf_or_nan
-from colossalai.utils import get_current_device
 from tests.components_to_test.registry import non_distributed_component_funcs
-from colossalai.engine.gradient_handler import MoeGradientHandler
-from colossalai.context import MOE_CONTEXT
-from colossalai.testing import assert_equal_in_group
-from tests.test_zero.common import CONFIG, check_sharded_model_params
 from tests.test_moe.test_moe_zero_init import MoeModel
+from tests.test_zero.common import CONFIG, check_sharded_model_params


 def _run_step(model, optimizer, data, label, criterion, grad_handler):
     model.train()
     optimizer.zero_grad()

     if criterion:
         y = model(data)
         loss = criterion(y, label)
     else:
         loss = model(data, label)

     loss = loss.float()
     if isinstance(model, ShardedModelV2):
         optimizer.backward(loss)
     else:
         loss.backward()

     if grad_handler is not None:
         grad_handler.handle_gradient()

     optimizer.step()


 @parameterize("cpu_offload", [True])
 @parameterize("use_cpuadam", [True])    # We do not use Hybrid Adam right now, since it has a little bug
 @parameterize("reuse_fp16_shard", [True, False])
 @parameterize("shard_strategy_class", [TensorShardStrategy, BucketTensorShardStrategy])
 def _run_test_sharded_optim_v2(cpu_offload,
                                shard_strategy_class,
                                use_cpuadam,
                                reuse_fp16_shard,
                                gpu_margin_mem_ratio=0.0):
     shard_strategy = shard_strategy_class()
     if use_cpuadam and cpu_offload is False:
         return
     MOE_CONTEXT.reset_loss()
-    get_components_func = non_distributed_component_funcs.get_callable('no_leaf_module')
+    get_components_func = non_distributed_component_funcs.get_callable('hanging_param_model')
     _, train_dataloader, _, optimizer_class, _ = get_components_func()
     criterion = MoeLoss(aux_weight=0.01, loss_fn=torch.nn.CrossEntropyLoss)

     with ZeroInitContext(target_device=torch.device('cpu') if cpu_offload else get_current_device(),
                          shard_strategy=shard_strategy,
                          shard_param=True):
         zero_model = MoeModel(checkpoint=True)

     zero_model = ShardedModelV2(zero_model,
                                 shard_strategy,
                                 tensor_placement_policy='cpu' if cpu_offload else 'cuda',
                                 reuse_fp16_shard=reuse_fp16_shard)

     # check whether parameters are identical in ddp
     for name, p in zero_model.named_parameters():
         if not p.colo_attr.param_is_sharded and p.colo_attr.is_replicated:
             assert_equal_in_group(p.colo_attr.data_payload.to(get_current_device()))

     model = MoeModel(checkpoint=True).half()
     col_model_deepcopy(zero_model, model)
     model = model.cuda().float()

     if use_cpuadam:
         optimizer_class = CPUAdam
     optim = optimizer_class(model.parameters(), lr=1e-3)
     sharded_optim = optimizer_class(zero_model.parameters(), lr=1e-3)
     sharded_optim = ShardedOptimizerV2(zero_model,
                                        sharded_optim,
                                        initial_scale=2**5,
                                        gpu_margin_mem_ratio=gpu_margin_mem_ratio)

     amp_config = dict(opt_level='O2', keep_batchnorm_fp32=False)
     apex_model, apex_optimizer = convert_to_apex_amp(model, optim, amp_config)
     apex_grad_handler = MoeGradientHandler(model)

     for i, (data, label) in enumerate(train_dataloader):
         if i > 5:
             break
         data, label = data.cuda(), label.cuda()
         _run_step(apex_model, apex_optimizer, data, label, criterion, apex_grad_handler)
         _run_step(zero_model, sharded_optim, data, label, criterion, None)
         check_sharded_model_params(model, zero_model, loose=True, reuse_fp16_shard=use_cpuadam)
         for param in model.parameters():
             assert not has_inf_or_nan(param)


 def _run_dist(rank, world_size, port):
     colossalai.launch(config=CONFIG, rank=rank, world_size=world_size, host='localhost', port=port, backend='nccl')
     MOE_CONTEXT.setup(seed=42)
     _run_test_sharded_optim_v2()


 # use_cpuadam = True can be used with cpu_offload = False
 @pytest.mark.dist
 @pytest.mark.parametrize("world_size", [2])
 @rerun_if_address_is_in_use()
 def test_moe_zero_optim(world_size):
     run_func = partial(_run_dist, world_size=world_size, port=free_port())
     mp.spawn(run_func, nprocs=world_size)


 if __name__ == '__main__':
     test_moe_zero_optim(world_size=4)

View File

@@ -23,7 +23,7 @@ from tests.components_to_test.registry import non_distributed_component_funcs
 @parameterize("enable_autocast", [True])
 @parameterize("shard_strategy_class", [BucketTensorShardStrategy])
 def run_model_test(enable_autocast, shard_strategy_class):
-    test_models = ['repeated_computed_layers', 'resnet18', 'bert', 'no_leaf_module']
+    test_models = ['repeated_computed_layers', 'resnet18', 'bert', 'hanging_param_model']
     shard_strategy = shard_strategy_class()
     for model_name in test_models:
         get_components_func = non_distributed_component_funcs.get_callable(model_name)

View File

@@ -1,25 +1,25 @@
 from functools import partial

-import colossalai
-from colossalai.utils.cuda import get_current_device
 import pytest
 import torch
 import torch.distributed as dist
 import torch.multiprocessing as mp
+from common import CONFIG, check_sharded_model_params
+from torch.nn.parallel import DistributedDataParallel as DDP

+import colossalai
 from colossalai.amp import convert_to_apex_amp
 from colossalai.nn.optimizer import CPUAdam
 from colossalai.testing import parameterize, rerun_if_address_is_in_use
 from colossalai.utils import free_port
+from colossalai.utils.cuda import get_current_device
 from colossalai.zero.init_ctx import ZeroInitContext
-from colossalai.zero.shard_utils import (BucketTensorShardStrategy, TensorShardStrategy)
+from colossalai.zero.shard_utils import BucketTensorShardStrategy, TensorShardStrategy
 from colossalai.zero.sharded_model import ShardedModelV2
 from colossalai.zero.sharded_model.utils import col_model_deepcopy
 from colossalai.zero.sharded_optim import ShardedOptimizerV2
 from colossalai.zero.sharded_optim._utils import has_inf_or_nan
 from tests.components_to_test.registry import non_distributed_component_funcs
-from torch.nn.parallel import DistributedDataParallel as DDP
-from common import CONFIG, check_sharded_model_params


 def _run_step(model, optimizer, data, label, criterion, enable_autocast=False):

@@ -45,7 +45,7 @@ def _run_step(model, optimizer, data, label, criterion, enable_autocast=False):
 @parameterize("shard_strategy_class", [TensorShardStrategy, BucketTensorShardStrategy])
 @parameterize("gpu_margin_mem_ratio", [0.0, 0.7])
 def _run_test_sharded_optim_v2(cpu_offload, shard_strategy_class, use_cpuadam, gpu_margin_mem_ratio):
-    test_models = ['repeated_computed_layers', 'resnet18', 'bert', 'no_leaf_module']
+    test_models = ['repeated_computed_layers', 'resnet18', 'bert', 'hanging_param_model']
     shard_strategy = shard_strategy_class()
     if use_cpuadam and cpu_offload is False: