From bb2790cf0bcea11c07e16e077c51b1fd11b545b1 Mon Sep 17 00:00:00 2001
From: Frank Lee
Date: Thu, 17 Mar 2022 15:44:17 +0800
Subject: [PATCH] optimize engine and trainer test (#448)

---
 .../test_cifar_with_data_pipeline_tensor.py   | 31 ++-----
 tests/test_engine/test_engine.py              | 92 +++++++------
 tests/test_trainer/test_pipeline/test_p2p.py  | 51 +++-------
 .../test_pipeline/test_pipeline_schedule.py   | 46 ++++------
 .../test_trainer_with_non_pipe_schedule.py    | 66 ++++++-------
 .../test_trainer_with_pipe_schedule.py        | 22 +----
 6 files changed, 111 insertions(+), 197 deletions(-)

diff --git a/tests/test_data_pipeline_tensor_parallel/test_cifar_with_data_pipeline_tensor.py b/tests/test_data_pipeline_tensor_parallel/test_cifar_with_data_pipeline_tensor.py
index 2ab907072..433ba77ff 100644
--- a/tests/test_data_pipeline_tensor_parallel/test_cifar_with_data_pipeline_tensor.py
+++ b/tests/test_data_pipeline_tensor_parallel/test_cifar_with_data_pipeline_tensor.py
@@ -37,21 +37,12 @@ def run_trainer(rank, world_size, port):
 
     # build dataloaders
     transform_train = transforms.Compose([
-        transforms.RandomCrop(32, padding=4),
-        transforms.AutoAugment(policy=transforms.AutoAugmentPolicy.CIFAR10),
-        transforms.ToTensor(),
-        transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
-    ])
-    transform_test = transforms.Compose([
-        transforms.Resize(32),
         transforms.ToTensor(),
         transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
     ])
 
     train_dataset = CIFAR10(root=Path(os.environ['DATA']), train=True, download=True, transform=transform_train)
-    test_dataset = CIFAR10(root=Path(os.environ['DATA']), train=False, transform=transform_test)
     train_dataloader = get_dataloader(dataset=train_dataset, shuffle=True, batch_size=BATCH_SIZE, pin_memory=True)
-    test_dataloader = get_dataloader(dataset=test_dataset, batch_size=BATCH_SIZE, pin_memory=True)
 
     # build criterion
     criterion = CrossEntropyLoss()
@@ -65,33 +56,29 @@ def run_trainer(rank, world_size, port):
     warmup_steps = steps_per_epoch * WARMUP_EPOCHS
     lr_scheduler = LinearWarmupLR(optimizer, total_steps=total_steps, warmup_steps=warmup_steps)
 
-    engine, train_dataloader, test_dataloader, lr_scheduler = colossalai.initialize(pipe_model, optimizer, criterion,
-                                                                                    train_dataloader, test_dataloader,
-                                                                                    lr_scheduler)
+    engine, train_dataloader, _, lr_scheduler = colossalai.initialize(pipe_model,
+                                                                      optimizer,
+                                                                      criterion,
+                                                                      train_dataloader,
+                                                                      lr_scheduler=lr_scheduler)
 
-    timer = MultiTimer()
+    schedule = PipelineSchedule(num_microbatches=2)
 
+    logger = get_dist_logger()
-    schedule = PipelineSchedule(num_microbatches=4)
-
-    trainer = Trainer(engine=engine, timer=timer, logger=logger, schedule=schedule)
+    trainer = Trainer(engine=engine, logger=logger, schedule=schedule)
 
     hook_list = [
-        hooks.LossHook(),
         hooks.LRSchedulerHook(lr_scheduler=lr_scheduler, by_epoch=False),
-        hooks.LogMetricByEpochHook(logger),
     ]
 
     trainer.fit(train_dataloader=train_dataloader,
                 epochs=NUM_EPOCHS,
-                max_steps=5,
-                test_dataloader=test_dataloader,
-                test_interval=1,
+                max_steps=2,
                 hooks=hook_list,
                 display_progress=True)
 
 
 @pytest.mark.dist
-# @pytest.mark.skip("This test requires more than 8 GPUs, you should invoke this test script using test.sh provided manually")
 def test_hybrid_parallel():
     world_size = 8
     run_func = partial(run_trainer, world_size=world_size, port=free_port())
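
Note on the scheduler kept above: the test builds LinearWarmupLR with total_steps = steps_per_epoch * NUM_EPOCHS and warmup_steps = steps_per_epoch * WARMUP_EPOCHS. A minimal standalone sketch of that warmup arithmetic, assuming a linear ramp followed by a linear decay (an assumption about the schedule's shape, not a quote of LinearWarmupLR's implementation):

    # hypothetical linear-warmup schedule, NOT colossalai's LinearWarmupLR
    def linear_warmup_lr(step, base_lr, total_steps, warmup_steps):
        if step < warmup_steps:
            return base_lr * (step + 1) / (warmup_steps + 1)    # ramp up
        # assumed decay: linearly to zero over the remaining steps
        return base_lr * (total_steps - step) / max(total_steps - warmup_steps, 1)

    steps_per_epoch, NUM_EPOCHS, WARMUP_EPOCHS = 10, 2, 1       # toy values
    total_steps = steps_per_epoch * NUM_EPOCHS
    warmup_steps = steps_per_epoch * WARMUP_EPOCHS
    lrs = [linear_warmup_lr(s, 1e-3, total_steps, warmup_steps) for s in range(total_steps)]
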
diff --git a/tests/test_engine/test_engine.py b/tests/test_engine/test_engine.py
index 1bcba61f3..6d247d677 100644
--- a/tests/test_engine/test_engine.py
+++ b/tests/test_engine/test_engine.py
@@ -8,78 +8,52 @@ from colossalai.context import Config
 from colossalai.core import global_context as gpc
 from colossalai.utils import free_port
 from tests.components_to_test.registry import non_distributed_component_funcs
+from colossalai.testing import parameterize
 
 CONFIG = dict(parallel=dict(pipeline=dict(size=1), tensor=dict(size=1, mode=None)),
               fp16=dict(mode=None),
               clip_grad_norm=1.0)
 
 
-def run_train():
-    test_models = ['repeated_computed_layers', 'resnet18', 'repeated_computed_layers']
+@parameterize('model_name', ['repeated_computed_layers', 'resnet18', 'repeated_computed_layers'])
+@parameterize('amp_mode', [AMP_TYPE.APEX, AMP_TYPE.TORCH, AMP_TYPE.NAIVE, None])
+def run_train(model_name, amp_mode):
     # FIXME: test bert
-    for model_name in test_models:
-        get_components_func = non_distributed_component_funcs.get_callable(model_name)
-        model_builder, train_dataloader, _, optimizer_class, criterion = get_components_func()
+    get_components_func = non_distributed_component_funcs.get_callable(model_name)
+    gpc.config.fp16['mode'] = amp_mode
+    model_builder, train_dataloader, _, optimizer_class, criterion = get_components_func()
 
-        model = model_builder(checkpoint=False)
-        engine, train_dataloader, *args = colossalai.initialize(model=model,
-                                                                optimizer=optimizer_class(model.parameters(), lr=1e-3),
-                                                                criterion=criterion,
-                                                                train_dataloader=train_dataloader)
+    model = model_builder(checkpoint=False)
+    engine, train_dataloader, *args = colossalai.initialize(model=model,
+                                                            optimizer=optimizer_class(model.parameters(), lr=1e-3),
+                                                            criterion=criterion,
+                                                            train_dataloader=train_dataloader)
 
-        try:
-            engine.train()
-            for data, label in train_dataloader:
-                engine.zero_grad()
-                data = data.cuda()
-                label = label.cuda()
-                if criterion:
-                    output = engine(data)
-                    loss = engine.criterion(output, label)
-                else:
-                    loss = engine(data, label)
-                engine.backward(loss)
-                engine.step()
-                break
-        except IndexError:
-            # if using apex amp, NetWithRepeatedlyComputedLayers will raise an index out of range issue
-            # the following check fails in apex
-            # if cached_x.grad_fn.next_functions[1][0].variable is not x:
-            continue
-
-
-def run_with_no_amp():
-    run_train()
-
-
-def run_with_torch_amp():
-    # hack config
-    CONFIG['fp16']['mode'] = AMP_TYPE.TORCH
-    gpc._config = Config(CONFIG)
-    run_train()
-
-
-def run_with_apex_amp():
-    # hack config
-    CONFIG['fp16']['mode'] = AMP_TYPE.APEX
-    gpc._config = Config(CONFIG)
-    run_train()
-
-
-def run_with_naive_amp():
-    # hack config
-    CONFIG['fp16']['mode'] = AMP_TYPE.NAIVE
-    gpc._config = Config(CONFIG)
-    run_train()
+    try:
+        engine.train()
+        for data, label in train_dataloader:
+            engine.zero_grad()
+            data = data.cuda()
+            label = label.cuda()
+            if criterion:
+                output = engine(data)
+                loss = engine.criterion(output, label)
+            else:
+                loss = engine(data, label)
+            engine.backward(loss)
+            engine.step()
+            break
+    except IndexError:
+        # if using apex amp, NetWithRepeatedlyComputedLayers will raise an index out of range issue
+        # the following check fails in apex
+        # if cached_x.grad_fn.next_functions[1][0].variable is not x:
+        pass
 
 
 def run_engine(rank, world_size, port):
     # init dist env
-    colossalai.launch(config=dict(), rank=rank, world_size=world_size, host='localhost', port=port, backend='nccl')
-    run_with_no_amp()
-    run_with_torch_amp()
-    run_with_apex_amp()
-    run_with_naive_amp()
+    colossalai.launch(config=CONFIG, rank=rank, world_size=world_size, host='localhost', port=port, backend='nccl')
+    run_train()
 
 
 @pytest.mark.dist
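
The refactor above replaces the four run_with_*_amp wrappers with a single run_train swept by stacked @parameterize decorators. A minimal sketch of how a parameterize-style decorator can expand such a sweep (an illustration only, not colossalai.testing's actual implementation):

    import functools

    def parameterize_sketch(name, values):
        # run the decorated function once per value bound to `name`
        def decorator(fn):
            @functools.wraps(fn)
            def wrapper(*args, **kwargs):
                for value in values:
                    fn(*args, **{**kwargs, name: value})
            return wrapper
        return decorator

    @parameterize_sketch('model_name', ['repeated_computed_layers', 'resnet18'])
    @parameterize_sketch('amp_mode', ['apex', 'torch', 'naive', None])
    def run_train(model_name, amp_mode):
        print(f'model={model_name}, amp={amp_mode}')

    run_train()    # executes the full 2 x 4 Cartesian product of configurations
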
diff --git a/tests/test_trainer/test_pipeline/test_p2p.py b/tests/test_trainer/test_pipeline/test_p2p.py
index 5258b42a5..04d454267 100644
--- a/tests/test_trainer/test_pipeline/test_p2p.py
+++ b/tests/test_trainer/test_pipeline/test_p2p.py
@@ -7,10 +7,8 @@ import pytest
 import torch
 import torch.distributed as dist
 import torch.multiprocessing as mp
-from colossalai.communication import (recv_backward, recv_forward,
-                                      recv_tensor_meta, send_backward,
-                                      send_backward_recv_forward, send_forward,
-                                      send_forward_recv_backward,
+from colossalai.communication import (recv_backward, recv_forward, recv_tensor_meta, send_backward,
+                                      send_backward_recv_forward, send_forward, send_forward_recv_backward,
                                       send_tensor_meta)
 from colossalai.context.parallel_mode import ParallelMode
 from colossalai.core import global_context as gpc
@@ -18,17 +16,11 @@ from colossalai.initialize import launch
 from colossalai.logging import get_dist_logger
 from colossalai.utils import free_port, get_current_device
 
-BATCH_SIZE = 16
-SEQ_LENGTH = 64
-HIDDEN_SIZE = 128
+BATCH_SIZE = 4
+SEQ_LENGTH = 2
+HIDDEN_SIZE = 16
 
-CONFIG = dict(
-    parallel=dict(
-        pipeline=dict(size=4),
-        tensor=dict(size=1, mode=None)
-    ),
-    seed=1024
-)
+CONFIG = dict(parallel=dict(pipeline=dict(size=4), tensor=dict(size=1, mode=None)), seed=1024)
 
 
 def check_equal(A, B):
@@ -41,8 +33,7 @@ def check_forward(output_tensor, rank, logger):
         tensor = output_tensor.clone()
     else:
         tensor = recv_forward(output_tensor.shape)
-        logger.info('Rank {} received forward. Correct tensor: {}'.format(
-            rank, check_equal(tensor, output_tensor)))
+        logger.info('Rank {} received forward. Correct tensor: {}'.format(rank, check_equal(tensor, output_tensor)))
     if not gpc.is_last_rank(ParallelMode.PIPELINE):
         send_forward(tensor)
         logger.info('Rank {} sent forward.'.format(rank))
@@ -54,8 +45,7 @@ def check_backward(output_grad, rank, logger):
         grad = output_grad.clone()
     else:
         grad = recv_backward(output_grad.shape)
-        logger.info('Rank {} received backward. Correct grad: {}'.format(
-            rank, check_equal(grad, output_grad)))
+        logger.info('Rank {} received backward. Correct grad: {}'.format(rank, check_equal(grad, output_grad)))
     if not gpc.is_first_rank(ParallelMode.PIPELINE):
         send_backward(grad)
         logger.info('Rank {} sent backward.'.format(rank))
@@ -65,17 +55,15 @@ def check_forward_backward(output_tensor, output_grad, rank, logger):
     dist.barrier()
     if not gpc.is_first_rank(ParallelMode.PIPELINE):
         tensor = send_backward_recv_forward(output_grad, output_tensor.shape)
-        logger.info(
-            'Rank {} sent backward received forward. Correct tensor: {}'.
-            format(rank, check_equal(tensor, output_tensor)))
+        logger.info('Rank {} sent backward received forward. Correct tensor: {}'.format(
+            rank, check_equal(tensor, output_tensor)))
     if not gpc.is_last_rank(ParallelMode.PIPELINE):
         grad = send_forward_recv_backward(output_tensor, output_grad.shape)
-        logger.info(
-            'Rank {} sent forward received backward. Correct grad: {}'.format(
-                rank, check_equal(grad, output_grad)))
+        logger.info('Rank {} sent forward received backward. Correct grad: {}'.format(
+            rank, check_equal(grad, output_grad)))
 
 
-def check_comm(size, rank, prev_rank, next_rank, logger):
+def check_comm(size, rank, prev_rank, next_rank, logger):
     dtype = torch.float32
     device = get_current_device()
     tensor_shape = (BATCH_SIZE, SEQ_LENGTH, HIDDEN_SIZE)
@@ -90,21 +78,12 @@ def check_comm(size, rank, prev_rank, next_rank, logger):
 
 
 def run_check(rank, world_size, port):
-    launch(
-        config=CONFIG,
-        rank=rank,
-        world_size=world_size,
-        host='localhost',
-        port=port,
-        backend='nccl'
-    )
+    launch(config=CONFIG, rank=rank, world_size=world_size, host='localhost', port=port, backend='nccl')
     logger = get_dist_logger()
     rank = gpc.get_global_rank()
     prev_rank = gpc.get_prev_global_rank(ParallelMode.PIPELINE)
     next_rank = gpc.get_next_global_rank(ParallelMode.PIPELINE)
-    logger.info(
-        'Rank {0}: prev rank {1}, next rank {2}'.format(
-            rank, prev_rank, next_rank))
+    logger.info('Rank {0}: prev rank {1}, next rank {2}'.format(rank, prev_rank, next_rank))
     logger.info('Distributed environment is initialized.')
 
     check_comm(world_size, rank, prev_rank, next_rank, logger)
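
test_p2p.py exercises colossalai's pipeline send/recv helpers. For reference, the same forward relay can be written with bare torch.distributed primitives; the sketch below is a CPU/gloo stand-in for illustration, reusing the reduced tensor shape from the test with everything else assumed:

    import os
    import torch
    import torch.distributed as dist
    import torch.multiprocessing as mp

    def relay(rank, world_size):
        os.environ.setdefault('MASTER_ADDR', 'localhost')
        os.environ.setdefault('MASTER_PORT', '29501')
        dist.init_process_group('gloo', rank=rank, world_size=world_size)
        expected = torch.ones(4, 2, 16)    # (BATCH_SIZE, SEQ_LENGTH, HIDDEN_SIZE)
        if rank == 0:
            tensor = expected.clone()      # the first stage owns the activation
        else:
            tensor = torch.empty_like(expected)
            dist.recv(tensor, src=rank - 1)          # recv_forward equivalent
            assert torch.equal(tensor, expected)
        if rank != world_size - 1:
            dist.send(tensor, dst=rank + 1)          # send_forward equivalent
        dist.destroy_process_group()

    if __name__ == '__main__':
        mp.spawn(relay, args=(2,), nprocs=2)
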
diff --git a/tests/test_trainer/test_pipeline/test_pipeline_schedule.py b/tests/test_trainer/test_pipeline/test_pipeline_schedule.py
index d3c876c9c..499371e8a 100644
--- a/tests/test_trainer/test_pipeline/test_pipeline_schedule.py
+++ b/tests/test_trainer/test_pipeline/test_pipeline_schedule.py
@@ -17,48 +17,34 @@ from colossalai.utils import free_port, get_dataloader, print_rank_0
 from torchvision import transforms
 from torchvision.datasets import CIFAR10
 
-import model
-
-BATCH_SIZE = 32
-NUM_MICRO = 8
-
+BATCH_SIZE = 4
+NUM_MICRO = 2
 DIR_PATH = osp.dirname(osp.realpath(__file__))
 CONFIG_PATH = osp.join(DIR_PATH, './resnet_config.py')
 
 
 def run_schedule(rank, world_size, port):
-    launch(config=CONFIG_PATH,
-           rank=rank,
-           world_size=world_size,
-           host='localhost',
-           port=port,
-           backend='nccl')
+    launch(config=CONFIG_PATH, rank=rank, world_size=world_size, host='localhost', port=port, backend='nccl')
 
     # build model
     model = build_pipeline_model_from_cfg(gpc.config.model, 1)
     print_rank_0('model is created')
 
-    train_dataset = CIFAR10(
-        root=Path(os.environ['DATA']),
-        download=True,
-        transform=transforms.Compose(
-            [
-                transforms.RandomCrop(size=32, padding=4),
-                transforms.RandomHorizontalFlip(),
-                transforms.ToTensor(),
-                transforms.Normalize(mean=[0.4914, 0.4822, 0.4465], std=[
-                    0.2023, 0.1994, 0.2010]),
-            ]
-        )
-    )
+    train_dataset = CIFAR10(root=Path(os.environ['DATA']),
+                            download=True,
+                            transform=transforms.Compose([
+                                transforms.ToTensor(),
+                                transforms.Normalize(mean=[0.4914, 0.4822, 0.4465], std=[0.2023, 0.1994, 0.2010]),
+                            ]))
 
-    train_dataloader = get_dataloader(dataset=train_dataset,
-                                      shuffle=True,
-                                      add_sampler=True,
-                                      batch_size=BATCH_SIZE,
-                                      pin_memory=True,
-                                      )
+    train_dataloader = get_dataloader(
+        dataset=train_dataset,
+        shuffle=True,
+        add_sampler=True,
+        batch_size=BATCH_SIZE,
+        pin_memory=True,
+    )
 
     # build criterion
     criterion = torch.nn.CrossEntropyLoss()
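
Both pipeline tests shrink the microbatch count (NUM_MICRO, num_microbatches) to 2. In essence, the schedule slices each global batch into that many microbatches before pushing them through the stages; a rough standalone illustration of the slicing, not the PipelineSchedule implementation:

    import torch

    BATCH_SIZE, NUM_MICRO = 4, 2
    batch = torch.randn(BATCH_SIZE, 3, 32, 32)      # one CIFAR10-sized batch
    labels = torch.randint(0, 10, (BATCH_SIZE,))

    for data, target in zip(torch.chunk(batch, NUM_MICRO),
                            torch.chunk(labels, NUM_MICRO)):
        # each microbatch of BATCH_SIZE // NUM_MICRO samples is fed through
        # the pipeline stages in turn, which keeps every stage busy
        assert data.shape[0] == BATCH_SIZE // NUM_MICRO
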
diff --git a/tests/test_trainer/test_trainer_with_non_pipe_schedule.py b/tests/test_trainer/test_trainer_with_non_pipe_schedule.py
index d226916b5..6f7ab3f5e 100644
--- a/tests/test_trainer/test_trainer_with_non_pipe_schedule.py
+++ b/tests/test_trainer/test_trainer_with_non_pipe_schedule.py
@@ -9,51 +9,51 @@ from colossalai.logging import get_dist_logger
 from colossalai.trainer import Trainer
 from colossalai.utils import MultiTimer, free_port
 from tests.components_to_test.registry import non_distributed_component_funcs
+from colossalai.testing import parameterize
 
-BATCH_SIZE = 16
+BATCH_SIZE = 4
 IMG_SIZE = 32
 NUM_EPOCHS = 200
 
-CONFIG = dict(
-    # Config
-    fp16=dict(mode=AMP_TYPE.TORCH))
+CONFIG = dict(fp16=dict(mode=AMP_TYPE.TORCH))
 
 
-def run_trainer_no_pipeline(rank, world_size, port):
+@parameterize('model_name', ['repeated_computed_layers', 'resnet18', 'nested_model'])
+def run_trainer(model_name):
+    get_components_func = non_distributed_component_funcs.get_callable(model_name)
+    model_builder, train_dataloader, test_dataloader, optimizer_class, criterion = get_components_func()
+    model = model_builder()
+    optimizer = optimizer_class(model.parameters(), lr=1e-3)
+    engine, train_dataloader, *_ = colossalai.initialize(model=model,
+                                                         optimizer=optimizer,
+                                                         criterion=criterion,
+                                                         train_dataloader=train_dataloader)
+
+    logger = get_dist_logger()
+    logger.info("engine is built", ranks=[0])
+
+    timer = MultiTimer()
+    trainer = Trainer(engine=engine, logger=logger, timer=timer)
+    logger.info("trainer is built", ranks=[0])
+
+    logger.info("start training", ranks=[0])
+    trainer.fit(train_dataloader=train_dataloader,
+                test_dataloader=test_dataloader,
+                epochs=NUM_EPOCHS,
+                max_steps=3,
+                display_progress=True,
+                test_interval=5)
+    torch.cuda.empty_cache()
+
+
+def run_dist(rank, world_size, port):
     colossalai.launch(config=CONFIG, rank=rank, world_size=world_size, host='localhost', port=port, backend='nccl')
-    test_models = ['repeated_computed_layers', 'resnet18', 'nested_model']
-    for name in test_models:
-        get_components_func = non_distributed_component_funcs.get_callable(name)
-        model_builder, train_dataloader, test_dataloader, optimizer_class, criterion = get_components_func()
-        model = model_builder()
-        optimizer = optimizer_class(model.parameters(), lr=1e-3)
-        engine, train_dataloader, *_ = colossalai.initialize(model=model,
-                                                             optimizer=optimizer,
-                                                             criterion=criterion,
-                                                             train_dataloader=train_dataloader)
-
-        logger = get_dist_logger()
-        logger.info("engine is built", ranks=[0])
-
-        timer = MultiTimer()
-        trainer = Trainer(engine=engine, logger=logger, timer=timer)
-        logger.info("trainer is built", ranks=[0])
-
-        logger.info("start training", ranks=[0])
-        trainer.fit(train_dataloader=train_dataloader,
-                    test_dataloader=test_dataloader,
-                    epochs=NUM_EPOCHS,
-                    max_steps=5,
-                    display_progress=True,
-                    test_interval=5)
-        torch.cuda.empty_cache()
-
 
 @pytest.mark.dist
 def test_trainer_no_pipeline():
     world_size = 4
-    run_func = partial(run_trainer_no_pipeline, world_size=world_size, port=free_port())
+    run_func = partial(run_dist, world_size=world_size, port=free_port())
     mp.spawn(run_func, nprocs=world_size)
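
The trainer tests fetch their fixtures through non_distributed_component_funcs.get_callable(model_name). A stripped-down sketch of that registry pattern, with hypothetical names (toy_linear is invented here; the real module is tests.components_to_test):

    import torch

    _REGISTRY = {}

    def register(name):
        def wrapper(fn):
            _REGISTRY[name] = fn
            return fn
        return wrapper

    def get_callable(name):
        return _REGISTRY[name]

    @register('toy_linear')
    def toy_linear_components():
        model_builder = lambda: torch.nn.Linear(8, 2)
        loader = [(torch.randn(4, 8), torch.randint(0, 2, (4,))) for _ in range(2)]
        # (model_builder, train_dataloader, test_dataloader, optimizer_class, criterion)
        return model_builder, loader, loader, torch.optim.SGD, torch.nn.CrossEntropyLoss()

    model_builder, train_loader, _, optimizer_class, criterion = get_callable('toy_linear')()
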
diff --git a/tests/test_trainer/test_trainer_with_pipe_schedule.py b/tests/test_trainer/test_trainer_with_pipe_schedule.py
index 8dffc3fc7..15a85964d 100644
--- a/tests/test_trainer/test_trainer_with_pipe_schedule.py
+++ b/tests/test_trainer/test_trainer_with_pipe_schedule.py
@@ -18,11 +18,11 @@ from torchvision import transforms
 from torchvision.datasets import CIFAR10
 from torchvision.models import resnet18
 
-BATCH_SIZE = 16
+BATCH_SIZE = 4
 IMG_SIZE = 32
 NUM_EPOCHS = 200
 
-CONFIG = dict(parallel=dict(pipeline=2, ), )
+CONFIG = dict(parallel=dict(pipeline=2),)
 
 
 def run_trainer_with_pipeline(rank, world_size, port):
@@ -34,9 +34,9 @@ def run_trainer_with_pipeline(rank, world_size, port):
     if gpc.get_local_rank(ParallelMode.PIPELINE) == 0:
         model = nn.Sequential(model.conv1, model.bn1, model.relu, model.maxpool, model.layer1, model.layer2)
     elif gpc.get_local_rank(ParallelMode.PIPELINE) == 1:
-        from functools import partial
 
         class Flatten(nn.Module):
+
             def forward(self, x):
                 return torch.flatten(x, 1)
 
@@ -51,23 +51,12 @@ def run_trainer_with_pipeline(rank, world_size, port):
                                 transforms.Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5))
                             ]))
 
-    test_dataset = CIFAR10(root=Path(os.environ['DATA']),
-                           train=False,
-                           download=True,
-                           transform=transforms.Compose([
-                               transforms.Resize(size=(IMG_SIZE, IMG_SIZE)),
-                               transforms.ToTensor(),
-                               transforms.Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5))
-                           ]))
-
     train_dataloader = get_dataloader(dataset=train_dataset,
                                       shuffle=True,
                                       batch_size=BATCH_SIZE,
                                       pin_memory=True,
                                       drop_last=True)
 
-    test_dataloader = get_dataloader(dataset=test_dataset, batch_size=BATCH_SIZE, pin_memory=True, drop_last=True)
-
     # build optimizer
     optimizer = Adam(model.parameters(), lr=0.001)
     criterion = nn.CrossEntropyLoss()
@@ -79,7 +68,7 @@ def run_trainer_with_pipeline(rank, world_size, port):
     logger = get_dist_logger()
     logger.info("engine is built", ranks=[0])
 
-    pipe_schedule = PipelineSchedule(num_microbatches=4)
+    pipe_schedule = PipelineSchedule(num_microbatches=2)
     timer = MultiTimer()
     trainer = Trainer(engine=engine, schedule=pipe_schedule, logger=logger, timer=timer)
     logger.info("trainer is built", ranks=[0])
@@ -87,9 +76,8 @@ def run_trainer_with_pipeline(rank, world_size, port):
 
     logger.info("start training", ranks=[0])
     trainer.fit(train_dataloader=train_dataloader,
-                test_dataloader=test_dataloader,
                 epochs=NUM_EPOCHS,
-                max_steps=100,
+                max_steps=3,
                 display_progress=True,
                 test_interval=5)
     gpc.destroy()
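
For context on the pipeline split in test_trainer_with_pipe_schedule.py: rank 0 keeps the convolutional front of resnet18 and rank 1 the remainder. The patch only shows the rank-0 stage and the Flatten helper, so the rank-1 stage below is a plausible reconstruction; composing the two stages reproduces the full forward pass:

    import torch
    import torch.nn as nn
    from torchvision.models import resnet18

    class Flatten(nn.Module):

        def forward(self, x):
            return torch.flatten(x, 1)

    model = resnet18(num_classes=10).eval()    # eval() freezes batch-norm stats
    stage0 = nn.Sequential(model.conv1, model.bn1, model.relu, model.maxpool,
                           model.layer1, model.layer2)       # pipeline rank 0
    stage1 = nn.Sequential(model.layer3, model.layer4, model.avgpool,
                           Flatten(), model.fc)              # assumed rank-1 stage
    x = torch.randn(4, 3, 32, 32)
    with torch.no_grad():
        assert torch.allclose(stage1(stage0(x)), model(x))
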