optimize engine and trainer test (#448)

pull/451/head
Frank Lee 2022-03-17 15:44:17 +08:00 committed by GitHub
parent 237d08e7ee
commit bb2790cf0b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 111 additions and 197 deletions

View File

@ -37,21 +37,12 @@ def run_trainer(rank, world_size, port):
# build dataloaders
transform_train = transforms.Compose([
transforms.RandomCrop(32, padding=4),
transforms.AutoAugment(policy=transforms.AutoAugmentPolicy.CIFAR10),
transforms.ToTensor(),
transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
])
transform_test = transforms.Compose([
transforms.Resize(32),
transforms.ToTensor(),
transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
])
train_dataset = CIFAR10(root=Path(os.environ['DATA']), train=True, download=True, transform=transform_train)
test_dataset = CIFAR10(root=Path(os.environ['DATA']), train=False, transform=transform_test)
train_dataloader = get_dataloader(dataset=train_dataset, shuffle=True, batch_size=BATCH_SIZE, pin_memory=True)
test_dataloader = get_dataloader(dataset=test_dataset, batch_size=BATCH_SIZE, pin_memory=True)
# build criterion
criterion = CrossEntropyLoss()
@ -65,33 +56,29 @@ def run_trainer(rank, world_size, port):
warmup_steps = steps_per_epoch * WARMUP_EPOCHS
lr_scheduler = LinearWarmupLR(optimizer, total_steps=total_steps, warmup_steps=warmup_steps)
engine, train_dataloader, test_dataloader, lr_scheduler = colossalai.initialize(pipe_model, optimizer, criterion,
train_dataloader, test_dataloader,
lr_scheduler)
engine, train_dataloader, _, lr_scheduler = colossalai.initialize(pipe_model,
optimizer,
criterion,
train_dataloader,
lr_scheduler=lr_scheduler)
timer = MultiTimer()
schedule = PipelineSchedule(num_microbatches=2)
logger = get_dist_logger()
schedule = PipelineSchedule(num_microbatches=4)
trainer = Trainer(engine=engine, timer=timer, logger=logger, schedule=schedule)
trainer = Trainer(engine=engine, logger=logger, schedule=schedule)
hook_list = [
hooks.LossHook(),
hooks.LRSchedulerHook(lr_scheduler=lr_scheduler, by_epoch=False),
hooks.LogMetricByEpochHook(logger),
]
trainer.fit(train_dataloader=train_dataloader,
epochs=NUM_EPOCHS,
max_steps=5,
test_dataloader=test_dataloader,
test_interval=1,
max_steps=2,
hooks=hook_list,
display_progress=True)
@pytest.mark.dist
# @pytest.mark.skip("This test requires more than 8 GPUs, you should invoke this test script using test.sh provided manually")
def test_hybrid_parallel():
world_size = 8
run_func = partial(run_trainer, world_size=world_size, port=free_port())

View File

@ -8,78 +8,52 @@ from colossalai.context import Config
from colossalai.core import global_context as gpc
from colossalai.utils import free_port
from tests.components_to_test.registry import non_distributed_component_funcs
from colossalai.testing import parameterize
CONFIG = dict(parallel=dict(pipeline=dict(size=1), tensor=dict(size=1, mode=None)),
fp16=dict(mode=None),
clip_grad_norm=1.0)
def run_train():
test_models = ['repeated_computed_layers', 'resnet18', 'repeated_computed_layers']
@parameterize('model_name', ['repeated_computed_layers', 'resnet18', 'repeated_computed_layers'])
@parameterize('amp_mode', [AMP_TYPE.APEX, AMP_TYPE.TORCH, AMP_TYPE.NAIVE, None])
def run_train(model_name, amp_mode):
# FIXME: test bert
for model_name in test_models:
get_components_func = non_distributed_component_funcs.get_callable(model_name)
model_builder, train_dataloader, _, optimizer_class, criterion = get_components_func()
get_components_func = non_distributed_component_funcs.get_callable(model_name)
gpc.config.fp16['mode'] = amp_mode
model_builder, train_dataloader, _, optimizer_class, criterion = get_components_func()
model = model_builder(checkpoint=False)
engine, train_dataloader, *args = colossalai.initialize(model=model,
optimizer=optimizer_class(model.parameters(), lr=1e-3),
criterion=criterion,
train_dataloader=train_dataloader)
model = model_builder(checkpoint=False)
engine, train_dataloader, *args = colossalai.initialize(model=model,
optimizer=optimizer_class(model.parameters(), lr=1e-3),
criterion=criterion,
train_dataloader=train_dataloader)
try:
engine.train()
for data, label in train_dataloader:
engine.zero_grad()
data = data.cuda()
label = label.cuda()
if criterion:
output = engine(data)
loss = engine.criterion(output, label)
else:
loss = engine(data, label)
engine.backward(loss)
engine.step()
break
except IndexError:
# if using apex amp, NetWithRepeatedlyComputedLayers will raise an index out of range issue
# the following check fails in apex
# if cached_x.grad_fn.next_functions[1][0].variable is not x:
continue
def run_with_no_amp():
run_train()
def run_with_torch_amp():
# hack config
CONFIG['fp16']['mode'] = AMP_TYPE.TORCH
gpc._config = Config(CONFIG)
run_train()
def run_with_apex_amp():
# hack config
CONFIG['fp16']['mode'] = AMP_TYPE.APEX
gpc._config = Config(CONFIG)
run_train()
def run_with_naive_amp():
# hack config
CONFIG['fp16']['mode'] = AMP_TYPE.NAIVE
gpc._config = Config(CONFIG)
run_train()
try:
engine.train()
for data, label in train_dataloader:
engine.zero_grad()
data = data.cuda()
label = label.cuda()
if criterion:
output = engine(data)
loss = engine.criterion(output, label)
else:
loss = engine(data, label)
engine.backward(loss)
engine.step()
break
except IndexError:
# if using apex amp, NetWithRepeatedlyComputedLayers will raise an index out of range issue
# the following check fails in apex
# if cached_x.grad_fn.next_functions[1][0].variable is not x:
pass
def run_engine(rank, world_size, port):
# init dist env
colossalai.launch(config=dict(), rank=rank, world_size=world_size, host='localhost', port=port, backend='nccl')
run_with_no_amp()
run_with_torch_amp()
run_with_apex_amp()
run_with_naive_amp()
colossalai.launch(config=CONFIG, rank=rank, world_size=world_size, host='localhost', port=port, backend='nccl')
run_train()
@pytest.mark.dist

View File

@ -7,10 +7,8 @@ import pytest
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
from colossalai.communication import (recv_backward, recv_forward,
recv_tensor_meta, send_backward,
send_backward_recv_forward, send_forward,
send_forward_recv_backward,
from colossalai.communication import (recv_backward, recv_forward, recv_tensor_meta, send_backward,
send_backward_recv_forward, send_forward, send_forward_recv_backward,
send_tensor_meta)
from colossalai.context.parallel_mode import ParallelMode
from colossalai.core import global_context as gpc
@ -18,17 +16,11 @@ from colossalai.initialize import launch
from colossalai.logging import get_dist_logger
from colossalai.utils import free_port, get_current_device
BATCH_SIZE = 16
SEQ_LENGTH = 64
HIDDEN_SIZE = 128
BATCH_SIZE = 4
SEQ_LENGTH = 2
HIDDEN_SIZE = 16
CONFIG = dict(
parallel=dict(
pipeline=dict(size=4),
tensor=dict(size=1, mode=None)
),
seed=1024
)
CONFIG = dict(parallel=dict(pipeline=dict(size=4), tensor=dict(size=1, mode=None)), seed=1024)
def check_equal(A, B):
@ -41,8 +33,7 @@ def check_forward(output_tensor, rank, logger):
tensor = output_tensor.clone()
else:
tensor = recv_forward(output_tensor.shape)
logger.info('Rank {} received forward. Correct tensor: {}'.format(
rank, check_equal(tensor, output_tensor)))
logger.info('Rank {} received forward. Correct tensor: {}'.format(rank, check_equal(tensor, output_tensor)))
if not gpc.is_last_rank(ParallelMode.PIPELINE):
send_forward(tensor)
logger.info('Rank {} sent forward.'.format(rank))
@ -54,8 +45,7 @@ def check_backward(output_grad, rank, logger):
grad = output_grad.clone()
else:
grad = recv_backward(output_grad.shape)
logger.info('Rank {} received backward. Correct grad: {}'.format(
rank, check_equal(grad, output_grad)))
logger.info('Rank {} received backward. Correct grad: {}'.format(rank, check_equal(grad, output_grad)))
if not gpc.is_first_rank(ParallelMode.PIPELINE):
send_backward(grad)
logger.info('Rank {} sent backward.'.format(rank))
@ -65,17 +55,15 @@ def check_forward_backward(output_tensor, output_grad, rank, logger):
dist.barrier()
if not gpc.is_first_rank(ParallelMode.PIPELINE):
tensor = send_backward_recv_forward(output_grad, output_tensor.shape)
logger.info(
'Rank {} sent backward received forward. Correct tensor: {}'.
format(rank, check_equal(tensor, output_tensor)))
logger.info('Rank {} sent backward received forward. Correct tensor: {}'.format(
rank, check_equal(tensor, output_tensor)))
if not gpc.is_last_rank(ParallelMode.PIPELINE):
grad = send_forward_recv_backward(output_tensor, output_grad.shape)
logger.info(
'Rank {} sent forward received backward. Correct grad: {}'.format(
rank, check_equal(grad, output_grad)))
logger.info('Rank {} sent forward received backward. Correct grad: {}'.format(
rank, check_equal(grad, output_grad)))
def check_comm(size, rank, prev_rank, next_rank, logger):
def check_comm(size, rank, prev_rank, next_rank, logger):
dtype = torch.float32
device = get_current_device()
tensor_shape = (BATCH_SIZE, SEQ_LENGTH, HIDDEN_SIZE)
@ -90,21 +78,12 @@ def check_comm(size, rank, prev_rank, next_rank, logger):
def run_check(rank, world_size, port):
launch(
config=CONFIG,
rank=rank,
world_size=world_size,
host='localhost',
port=port,
backend='nccl'
)
launch(config=CONFIG, rank=rank, world_size=world_size, host='localhost', port=port, backend='nccl')
logger = get_dist_logger()
rank = gpc.get_global_rank()
prev_rank = gpc.get_prev_global_rank(ParallelMode.PIPELINE)
next_rank = gpc.get_next_global_rank(ParallelMode.PIPELINE)
logger.info(
'Rank {0}: prev rank {1}, next rank {2}'.format(
rank, prev_rank, next_rank))
logger.info('Rank {0}: prev rank {1}, next rank {2}'.format(rank, prev_rank, next_rank))
logger.info('Distributed environment is initialzied.')
check_comm(world_size, rank, prev_rank, next_rank, logger)

View File

@ -17,48 +17,34 @@ from colossalai.utils import free_port, get_dataloader, print_rank_0
from torchvision import transforms
from torchvision.datasets import CIFAR10
import model
BATCH_SIZE = 32
NUM_MICRO = 8
BATCH_SIZE = 4
NUM_MICRO = 2
DIR_PATH = osp.dirname(osp.realpath(__file__))
CONFIG_PATH = osp.join(DIR_PATH, './resnet_config.py')
def run_schedule(rank, world_size, port):
launch(config=CONFIG_PATH,
rank=rank,
world_size=world_size,
host='localhost',
port=port,
backend='nccl')
launch(config=CONFIG_PATH, rank=rank, world_size=world_size, host='localhost', port=port, backend='nccl')
# build model
model = build_pipeline_model_from_cfg(gpc.config.model, 1)
print_rank_0('model is created')
train_dataset = CIFAR10(
root=Path(os.environ['DATA']),
download=True,
transform=transforms.Compose(
[
transforms.RandomCrop(size=32, padding=4),
transforms.RandomHorizontalFlip(),
transforms.ToTensor(),
transforms.Normalize(mean=[0.4914, 0.4822, 0.4465], std=[
0.2023, 0.1994, 0.2010]),
]
)
)
train_dataset = CIFAR10(root=Path(os.environ['DATA']),
download=True,
transform=transforms.Compose([
transforms.ToTensor(),
transforms.Normalize(mean=[0.4914, 0.4822, 0.4465], std=[0.2023, 0.1994, 0.2010]),
]))
train_dataloader = get_dataloader(dataset=train_dataset,
shuffle=True,
add_sampler=True,
batch_size=BATCH_SIZE,
pin_memory=True,
)
train_dataloader = get_dataloader(
dataset=train_dataset,
shuffle=True,
add_sampler=True,
batch_size=BATCH_SIZE,
pin_memory=True,
)
# build criterion
criterion = torch.nn.CrossEntropyLoss()

View File

@ -9,51 +9,51 @@ from colossalai.logging import get_dist_logger
from colossalai.trainer import Trainer
from colossalai.utils import MultiTimer, free_port
from tests.components_to_test.registry import non_distributed_component_funcs
from colossalai.testing import parameterize
BATCH_SIZE = 16
BATCH_SIZE = 4
IMG_SIZE = 32
NUM_EPOCHS = 200
CONFIG = dict(
# Config
fp16=dict(mode=AMP_TYPE.TORCH))
CONFIG = dict(fp16=dict(mode=AMP_TYPE.TORCH))
def run_trainer_no_pipeline(rank, world_size, port):
@parameterize('model_name', ['repeated_computed_layers', 'resnet18', 'nested_model'])
def run_trainer(model_name):
get_components_func = non_distributed_component_funcs.get_callable(model_name)
model_builder, train_dataloader, test_dataloader, optimizer_class, criterion = get_components_func()
model = model_builder()
optimizer = optimizer_class(model.parameters(), lr=1e-3)
engine, train_dataloader, *_ = colossalai.initialize(model=model,
optimizer=optimizer,
criterion=criterion,
train_dataloader=train_dataloader)
logger = get_dist_logger()
logger.info("engine is built", ranks=[0])
timer = MultiTimer()
trainer = Trainer(engine=engine, logger=logger, timer=timer)
logger.info("trainer is built", ranks=[0])
logger.info("start training", ranks=[0])
trainer.fit(train_dataloader=train_dataloader,
test_dataloader=test_dataloader,
epochs=NUM_EPOCHS,
max_steps=3,
display_progress=True,
test_interval=5)
torch.cuda.empty_cache()
def run_dist(rank, world_size, port):
colossalai.launch(config=CONFIG, rank=rank, world_size=world_size, host='localhost', port=port, backend='nccl')
test_models = ['repeated_computed_layers', 'resnet18', 'nested_model']
for name in test_models:
get_components_func = non_distributed_component_funcs.get_callable(name)
model_builder, train_dataloader, test_dataloader, optimizer_class, criterion = get_components_func()
model = model_builder()
optimizer = optimizer_class(model.parameters(), lr=1e-3)
engine, train_dataloader, *_ = colossalai.initialize(model=model,
optimizer=optimizer,
criterion=criterion,
train_dataloader=train_dataloader)
logger = get_dist_logger()
logger.info("engine is built", ranks=[0])
timer = MultiTimer()
trainer = Trainer(engine=engine, logger=logger, timer=timer)
logger.info("trainer is built", ranks=[0])
logger.info("start training", ranks=[0])
trainer.fit(train_dataloader=train_dataloader,
test_dataloader=test_dataloader,
epochs=NUM_EPOCHS,
max_steps=5,
display_progress=True,
test_interval=5)
torch.cuda.empty_cache()
@pytest.mark.dist
def test_trainer_no_pipeline():
world_size = 4
run_func = partial(run_trainer_no_pipeline, world_size=world_size, port=free_port())
run_func = partial(run_dist, world_size=world_size, port=free_port())
mp.spawn(run_func, nprocs=world_size)

View File

@ -18,11 +18,11 @@ from torchvision import transforms
from torchvision.datasets import CIFAR10
from torchvision.models import resnet18
BATCH_SIZE = 16
BATCH_SIZE = 4
IMG_SIZE = 32
NUM_EPOCHS = 200
CONFIG = dict(parallel=dict(pipeline=2, ), )
CONFIG = dict(parallel=dict(pipeline=2),)
def run_trainer_with_pipeline(rank, world_size, port):
@ -34,9 +34,9 @@ def run_trainer_with_pipeline(rank, world_size, port):
if gpc.get_local_rank(ParallelMode.PIPELINE) == 0:
model = nn.Sequential(model.conv1, model.bn1, model.relu, model.maxpool, model.layer1, model.layer2)
elif gpc.get_local_rank(ParallelMode.PIPELINE) == 1:
from functools import partial
class Flatten(nn.Module):
def forward(self, x):
return torch.flatten(x, 1)
@ -51,23 +51,12 @@ def run_trainer_with_pipeline(rank, world_size, port):
transforms.Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5))
]))
test_dataset = CIFAR10(root=Path(os.environ['DATA']),
train=False,
download=True,
transform=transforms.Compose([
transforms.Resize(size=(IMG_SIZE, IMG_SIZE)),
transforms.ToTensor(),
transforms.Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5))
]))
train_dataloader = get_dataloader(dataset=train_dataset,
shuffle=True,
batch_size=BATCH_SIZE,
pin_memory=True,
drop_last=True)
test_dataloader = get_dataloader(dataset=test_dataset, batch_size=BATCH_SIZE, pin_memory=True, drop_last=True)
# build optimizer
optimizer = Adam(model.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss()
@ -79,7 +68,7 @@ def run_trainer_with_pipeline(rank, world_size, port):
logger = get_dist_logger()
logger.info("engine is built", ranks=[0])
pipe_schedule = PipelineSchedule(num_microbatches=4)
pipe_schedule = PipelineSchedule(num_microbatches=2)
timer = MultiTimer()
trainer = Trainer(engine=engine, schedule=pipe_schedule, logger=logger, timer=timer)
logger.info("trainer is built", ranks=[0])
@ -87,9 +76,8 @@ def run_trainer_with_pipeline(rank, world_size, port):
logger.info("start training", ranks=[0])
trainer.fit(train_dataloader=train_dataloader,
test_dataloader=test_dataloader,
epochs=NUM_EPOCHS,
max_steps=100,
max_steps=3,
display_progress=True,
test_interval=5)
gpc.destroy()