From 6268446b81f7263f2baa246698f4d32ab9db5a95 Mon Sep 17 00:00:00 2001
From: Frank Lee
Date: Tue, 8 Mar 2022 10:19:18 +0800
Subject: [PATCH] [test] refactored testing components (#324)

---
 tests/components_to_test/__init__.py          |   1 +
 tests/components_to_test/nested_model.py      |  49 ++++++++
 tests/components_to_test/registry.py          |  40 +++++++
 .../repeated_computed_layer.py                |  44 +++++++
 tests/components_to_test/resnet.py            |  30 +++++
 tests/components_to_test/utils/__init__.py    |   1 +
 .../utils/dummy_data_generator.py             |  14 +++
 tests/test_engine/test_engine.py              |  86 ++++++++++++++
 .../test_engine/test_engine_apex_amp.py       | 110 ------------------
 .../test_engine/test_engine_naive_amp.py      | 109 -----------------
 .../test_engine/test_engine_no_amp.py         | 105 -----------------
 .../test_engine/test_engine_torch_amp.py      | 107 -----------------
 .../{test_engine => }/test_param_hook.py      |   0
 13 files changed, 265 insertions(+), 431 deletions(-)
 create mode 100644 tests/components_to_test/__init__.py
 create mode 100644 tests/components_to_test/nested_model.py
 create mode 100644 tests/components_to_test/registry.py
 create mode 100644 tests/components_to_test/repeated_computed_layer.py
 create mode 100644 tests/components_to_test/resnet.py
 create mode 100644 tests/components_to_test/utils/__init__.py
 create mode 100644 tests/components_to_test/utils/dummy_data_generator.py
 create mode 100644 tests/test_engine/test_engine.py
 delete mode 100644 tests/test_engine/test_engine/test_engine_apex_amp.py
 delete mode 100644 tests/test_engine/test_engine/test_engine_naive_amp.py
 delete mode 100644 tests/test_engine/test_engine/test_engine_no_amp.py
 delete mode 100644 tests/test_engine/test_engine/test_engine_torch_amp.py
 rename tests/test_engine/{test_engine => }/test_param_hook.py (100%)

diff --git a/tests/components_to_test/__init__.py b/tests/components_to_test/__init__.py
new file mode 100644
index 000000000..889bd1349
--- /dev/null
+++ b/tests/components_to_test/__init__.py
@@ -0,0 +1 @@
+from . import repeated_computed_layer, resnet, nested_model
diff --git a/tests/components_to_test/nested_model.py b/tests/components_to_test/nested_model.py
new file mode 100644
index 000000000..316c1e18c
--- /dev/null
+++ b/tests/components_to_test/nested_model.py
@@ -0,0 +1,49 @@
+import torch
+import torch.nn as nn
+import torch.nn.functional as F
+from .utils import DummyDataGenerator
+from .registry import non_distributed_component_funcs
+
+
+class SubNet(nn.Module):
+
+    def __init__(self, out_features) -> None:
+        super().__init__()
+        self.bias = nn.Parameter(torch.zeros(out_features))
+
+    def forward(self, x, weight):
+        return F.linear(x, weight, self.bias)
+
+
+class NestedNet(nn.Module):
+
+    def __init__(self) -> None:
+        super().__init__()
+        self.fc1 = nn.Linear(5, 5)
+        self.sub_fc = SubNet(5)
+        self.fc2 = nn.Linear(5, 2)
+
+    def forward(self, x):
+        x = self.fc1(x)
+        x = self.sub_fc(x, self.fc1.weight)
+        x = self.fc1(x)
+        x = self.fc2(x)
+        return x
+
+
+class DummyDataLoader(DummyDataGenerator):
+
+    def generate(self):
+        data = torch.rand(16, 5)
+        label = torch.randint(low=0, high=2, size=(16,))
+        return data, label
+
+
+@non_distributed_component_funcs.register(name='nested_model')
+def get_training_components():
+    model = NestedNet()
+    trainloader = DummyDataLoader()
+    testloader = DummyDataLoader()
+    optim = torch.optim.Adam(model.parameters(), lr=0.001)
+    criterion = torch.nn.CrossEntropyLoss()
+    return model, trainloader, testloader, optim, criterion
diff --git a/tests/components_to_test/registry.py b/tests/components_to_test/registry.py
new file mode 100644
index 000000000..728ed9eba
--- /dev/null
+++ b/tests/components_to_test/registry.py
@@ -0,0 +1,40 @@
+#!/usr/bin/env python
+
+
+class Registry:
+
+    def __init__(self):
+        self._registry = dict()
+
+    def register(self, name):
+        assert name not in self._registry
+
+        def _register(callable_):
+            self._registry[name] = callable_
+            return callable_
+
+        return _register
+
+    def get_callable(self, name: str):
+        return self._registry[name]
+
+    def __iter__(self):
+        self._idx = 0
+        self._len = len(self._registry)
+        self._names = list(self._registry.keys())
+        return self
+
+    def __next__(self):
+        if self._idx < self._len:
+            key = self._names[self._idx]
+            callable_ = self._registry[key]
+            self._idx += 1
+            return callable_
+        else:
+            raise StopIteration
+
+
+non_distributed_component_funcs = Registry()
+model_parallel_component_funcs = Registry()
+
+__all__ = ['non_distributed_component_funcs', 'model_parallel_component_funcs']
diff --git a/tests/components_to_test/repeated_computed_layer.py b/tests/components_to_test/repeated_computed_layer.py
new file mode 100644
index 000000000..a0f742041
--- /dev/null
+++ b/tests/components_to_test/repeated_computed_layer.py
@@ -0,0 +1,44 @@
+#!/usr/bin/env python
+
+import torch
+import torch.nn as nn
+from colossalai.nn import CheckpointModule
+from .utils.dummy_data_generator import DummyDataGenerator
+from .registry import non_distributed_component_funcs
+
+
+class NetWithRepeatedlyComputedLayers(CheckpointModule):
+    """
+    This model is used to test layers whose forward pass is executed multiple times.
+    In this model, fc1 and fc2 are each called twice in a single forward pass.
+    """
+
+    def __init__(self, checkpoint=False) -> None:
+        super().__init__(checkpoint=checkpoint)
+        self.fc1 = nn.Linear(5, 5)
+        self.fc2 = nn.Linear(5, 5)
+        self.fc3 = nn.Linear(5, 2)
+        self.layers = [self.fc1, self.fc2, self.fc1, self.fc2, self.fc3]
+
+    def forward(self, x):
+        for layer in self.layers:
+            x = layer(x)
+        return x
+
+
+class DummyDataLoader(DummyDataGenerator):
+
+    def generate(self):
+        data = torch.rand(16, 5)
+        label = torch.randint(low=0, high=2, size=(16,))
+        return data, label
+
+
+@non_distributed_component_funcs.register(name='repeated_computed_layers')
+def get_training_components():
+    model = NetWithRepeatedlyComputedLayers(checkpoint=True)
+    trainloader = DummyDataLoader()
+    testloader = DummyDataLoader()
+    optim = torch.optim.Adam(model.parameters(), lr=0.001)
+    criterion = torch.nn.CrossEntropyLoss()
+    return model, trainloader, testloader, optim, criterion
diff --git a/tests/components_to_test/resnet.py b/tests/components_to_test/resnet.py
new file mode 100644
index 000000000..f1448fa62
--- /dev/null
+++ b/tests/components_to_test/resnet.py
@@ -0,0 +1,30 @@
+from torchvision.models import resnet18
+from .registry import non_distributed_component_funcs
+from pathlib import Path
+import os
+import torch
+from torchvision.transforms import transforms
+from torchvision.datasets import CIFAR10
+from colossalai.utils import get_dataloader
+
+
+def get_cifar10_dataloader(train):
+    # build dataloaders
+    dataset = CIFAR10(root=Path(os.environ['DATA']),
+                      download=True,
+                      train=train,
+                      transform=transforms.Compose(
+                          [transforms.ToTensor(),
+                           transforms.Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5))]))
+    dataloader = get_dataloader(dataset=dataset, shuffle=True, batch_size=16, drop_last=True)
+    return dataloader
+
+
+@non_distributed_component_funcs.register(name='resnet18')
+def get_resnet_training_components():
+    model = resnet18(num_classes=10)
+    trainloader = get_cifar10_dataloader(train=True)
+    testloader = get_cifar10_dataloader(train=False)
+    optim = torch.optim.Adam(model.parameters(), lr=0.001)
+    criterion = torch.nn.CrossEntropyLoss()
+    return model, trainloader, testloader, optim, criterion
diff --git a/tests/components_to_test/utils/__init__.py b/tests/components_to_test/utils/__init__.py
new file mode 100644
index 000000000..fc6321214
--- /dev/null
+++ b/tests/components_to_test/utils/__init__.py
@@ -0,0 +1 @@
+from .dummy_data_generator import DummyDataGenerator
diff --git a/tests/components_to_test/utils/dummy_data_generator.py b/tests/components_to_test/utils/dummy_data_generator.py
new file mode 100644
index 000000000..aabcd30e4
--- /dev/null
+++ b/tests/components_to_test/utils/dummy_data_generator.py
@@ -0,0 +1,14 @@
+from abc import ABC, abstractmethod
+
+
+class DummyDataGenerator(ABC):
+
+    @abstractmethod
+    def generate(self):
+        pass
+
+    def __iter__(self):
+        return self
+
+    def __next__(self):
+        return self.generate()
diff --git a/tests/test_engine/test_engine.py b/tests/test_engine/test_engine.py
new file mode 100644
index 000000000..f6aa0a6e3
--- /dev/null
+++ b/tests/test_engine/test_engine.py
@@ -0,0 +1,86 @@
+from functools import partial
+
+import colossalai
+import pytest
+import torch.multiprocessing as mp
+from colossalai.amp import AMP_TYPE
+from colossalai.core import global_context as gpc
+from colossalai.utils import free_port
+from colossalai.context import Config
+from tests.components_to_test.registry import non_distributed_component_funcs
+
+CONFIG = dict(parallel=dict(pipeline=dict(size=1), tensor=dict(size=1, mode=None)),
+              fp16=dict(mode=None),
+              clip_grad_norm=1.0)
+
+
+def run_train():
+    for get_components_func in non_distributed_component_funcs:
+        model, train_dataloader, _, optimizer, criterion = get_components_func()
+
+        engine, train_dataloader, *args = colossalai.initialize(model=model,
+                                                                optimizer=optimizer,
+                                                                criterion=criterion,
+                                                                train_dataloader=train_dataloader)
+
+        try:
+            engine.train()
+            for img, label in train_dataloader:
+                engine.zero_grad()
+                img = img.cuda()
+                label = label.cuda()
+                output = engine(img)
+                loss = engine.criterion(output, label)
+                engine.backward(loss)
+                engine.step()
+                break
+        except IndexError:
+            # if using apex amp, NetWithRepeatedlyComputedLayers will raise an index out of range issue
+            # the following check fails in apex
+            # if cached_x.grad_fn.next_functions[1][0].variable is not x:
+            continue
+
+
+def run_with_no_amp():
+    run_train()
+
+
+def run_with_torch_amp():
+    # hack config
+    CONFIG['fp16']['mode'] = AMP_TYPE.TORCH
+    gpc._config = Config(CONFIG)
+    run_train()
+
+
+def run_with_apex_amp():
+    # hack config
+    CONFIG['fp16']['mode'] = AMP_TYPE.APEX
+    gpc._config = Config(CONFIG)
+    run_train()
+
+
+def run_with_naive_amp():
+    # hack config
+    CONFIG['fp16']['mode'] = AMP_TYPE.NAIVE
+    gpc._config = Config(CONFIG)
+    run_train()
+
+
+def run_engine(rank, world_size, port):
+    # init dist env
+    colossalai.launch(config=dict(), rank=rank, world_size=world_size, host='localhost', port=port, backend='nccl')
+    run_with_no_amp()
+    run_with_torch_amp()
+    run_with_apex_amp()
+    run_with_naive_amp()
+
+
+@pytest.mark.dist
+def test_engine():
+    world_size = 4
+    run_func = partial(run_engine, world_size=world_size, port=free_port())
+    mp.spawn(run_func, nprocs=world_size)
+
+
+if __name__ == '__main__':
+    test_engine()
diff --git a/tests/test_engine/test_engine/test_engine_apex_amp.py b/tests/test_engine/test_engine/test_engine_apex_amp.py
deleted file mode 100644
index 164ae54bb..000000000
--- a/tests/test_engine/test_engine/test_engine_apex_amp.py
+++ /dev/null
@@ -1,110 +0,0 @@
-# !/usr/bin/env python
-# -*- encoding: utf-8 -*-
-
-import os
-from functools import partial
-from pathlib import Path
-
-import colossalai
-import pytest
-import torch
-import torch.multiprocessing as mp
-import torch.nn as nn
-from colossalai.amp import AMP_TYPE
-from colossalai.core import global_context as gpc
-from colossalai.logging import get_dist_logger
-from colossalai.utils import free_port, get_dataloader, report_memory_usage
-from torch.optim import Adam
-from torchvision import transforms
-from torchvision.datasets import CIFAR10
-from torchvision.models import resnet18
-
-# Config
-BATCH_SIZE = 128
-IMG_SIZE = 224
-DIM = 768
-NUM_CLASSES = 10
-NUM_ATTN_HEADS = 12
-
-CONFIG = dict(
-    parallel=dict(
-        pipeline=dict(size=1),
-        tensor=dict(size=1, mode=None)
-    ),
-    fp16=dict(mode=AMP_TYPE.APEX),
-    clip_grad_norm=1.0
-)
-
-
-def run_engine(rank, world_size, port):
-    # init dist env
-    colossalai.launch(
-        config=CONFIG,
-        rank=rank,
-        world_size=world_size,
-        host='localhost',
-        port=port,
-        backend='nccl'
-    )
-
-    # build model
-    model = resnet18(num_classes=10)
-
-    # build dataloaders
-    train_dataset = CIFAR10(
-        root=Path(os.environ['DATA']),
-        download=True,
-        transform=transforms.Compose(
-            [
-                transforms.Resize(size=(IMG_SIZE, IMG_SIZE)),
-                transforms.ToTensor(),
-                transforms.Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5))
-            ]
-        )
-    )
-    train_dataloader = get_dataloader(dataset=train_dataset,
-                                      shuffle=True,
-                                      batch_size=BATCH_SIZE,
-                                      drop_last=True)
-
-    # build optimizer
-    optimizer = Adam(model.parameters(), lr=0.001)
-    criterion = nn.CrossEntropyLoss()
-
-    engine, train_dataloader, *args = colossalai.initialize(
-        model=model,
-        optimizer=optimizer,
-        criterion=criterion,
-        train_dataloader=train_dataloader
-    )
-    logger = get_dist_logger()
-    rank = torch.distributed.get_rank()
-
-    engine.train()
-    for img, label in train_dataloader:
-        engine.zero_grad()
-        img = img.cuda()
-        label = label.cuda()
-        output = engine(img)
-        loss = engine.criterion(output, label)
-        engine.backward(loss)
-        engine.step()
-        break
-
-    logger.info('Rank {} returns: {}'.format(rank, loss.item()))
-
-    gpc.destroy()
-    logger.info('Test engine finished')
-    report_memory_usage("After testing")
-    torch.cuda.empty_cache()
-
-
-@pytest.mark.dist
-def test_engine():
-    world_size = 4
-    run_func = partial(run_engine, world_size=world_size, port=free_port())
-    mp.spawn(run_func, nprocs=world_size)
-
-
-if __name__ == '__main__':
-    test_engine()
diff --git a/tests/test_engine/test_engine/test_engine_naive_amp.py b/tests/test_engine/test_engine/test_engine_naive_amp.py
deleted file mode 100644
index 95c620368..000000000
--- a/tests/test_engine/test_engine/test_engine_naive_amp.py
+++ /dev/null
@@ -1,109 +0,0 @@
-import os
-from functools import partial
-from pathlib import Path
-
-import colossalai
-import pytest
-import torch
-import torch.multiprocessing as mp
-import torch.nn as nn
-from colossalai.amp import AMP_TYPE
-from colossalai.core import global_context as gpc
-from colossalai.logging import get_dist_logger
-from colossalai.utils import free_port, get_dataloader, report_memory_usage
-from torch.optim import Adam
-from torchvision import transforms
-from torchvision.datasets import CIFAR10
-from torchvision.models import resnet18
-
-# Config
-BATCH_SIZE = 128
-IMG_SIZE = 224
-DIM = 768
-NUM_CLASSES = 10
-NUM_ATTN_HEADS = 12
-
-CONFIG = dict(
-    parallel=dict(
-        pipeline=dict(size=1),
-        tensor=dict(size=1, mode=None)
-    ),
-    fp16=dict(
-        mode=AMP_TYPE.NAIVE,
-        clip_grad=1.0
-    )
-)
-
-
-def run_engine(rank, world_size, port):
-    # init dist env
-    colossalai.launch(
-        config=CONFIG,
-        rank=rank,
-        world_size=world_size,
-        host='localhost',
-        port=port,
-        backend='nccl'
-    )
-
-    # build model
-    model = resnet18(num_classes=10)
-
-    # build dataloaders
-    train_dataset = CIFAR10(
-        root=Path(os.environ['DATA']),
-        download=True,
-        transform=transforms.Compose(
-            [
-                transforms.Resize(size=(IMG_SIZE, IMG_SIZE)),
-                transforms.ToTensor(),
-                transforms.Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5))
-            ]
-        )
-    )
-    train_dataloader = get_dataloader(dataset=train_dataset,
-                                      shuffle=True,
-                                      batch_size=BATCH_SIZE,
-                                      drop_last=True)
-
-    # build optimizer
-    optimizer = Adam(model.parameters(), lr=0.001)
-    criterion = nn.CrossEntropyLoss()
-
-    engine, train_dataloader, *args = colossalai.initialize(
-        model=model,
-        optimizer=optimizer,
-        criterion=criterion,
-        train_dataloader=train_dataloader
-    )
-    logger = get_dist_logger()
-    rank = torch.distributed.get_rank()
-
-    engine.train()
-    for img, label in train_dataloader:
-        engine.zero_grad()
-        img = img.cuda()
-        label = label.cuda()
-        output = engine(img)
-        loss = engine.criterion(output, label)
-        engine.backward(loss)
-        engine.step()
-        break
-
-    logger.info('Rank {} returns: {}'.format(rank, loss.item()))
-
-    gpc.destroy()
-    logger.info('Test engine finished')
-    report_memory_usage("After testing")
-    torch.cuda.empty_cache()
-
-
-@pytest.mark.dist
-def test_engine():
-    world_size = 4
-    run_func = partial(run_engine, world_size=world_size, port=free_port())
-    mp.spawn(run_func, nprocs=world_size)
-
-
-if __name__ == '__main__':
-    test_engine()
diff --git a/tests/test_engine/test_engine/test_engine_no_amp.py b/tests/test_engine/test_engine/test_engine_no_amp.py
deleted file mode 100644
index 13668e251..000000000
--- a/tests/test_engine/test_engine/test_engine_no_amp.py
+++ /dev/null
@@ -1,105 +0,0 @@
-import os
-from functools import partial
-from pathlib import Path
-
-import colossalai
-import pytest
-import torch
-import torch.multiprocessing as mp
-import torch.nn as nn
-from colossalai.core import global_context as gpc
-from colossalai.logging import get_dist_logger
-from colossalai.utils import free_port, get_dataloader, report_memory_usage
-from torch.optim import Adam
-from torchvision import transforms
-from torchvision.datasets import CIFAR10
-from torchvision.models import resnet18
-
-# Config
-BATCH_SIZE = 128
-IMG_SIZE = 224
-DIM = 768
-NUM_CLASSES = 10
-NUM_ATTN_HEADS = 12
-
-CONFIG = dict(
-    parallel=dict(
-        pipeline=dict(size=1),
-        tensor=dict(size=1, mode=None)
-    ),
-    clip_grad_norm=1.0
-)
-
-
-def run_engine(rank, world_size, port):
-    # init dist env
-    colossalai.launch(
-        config=CONFIG,
-        rank=rank,
-        world_size=world_size,
-        host='localhost',
-        port=port,
-        backend='nccl'
-    )
-
-    # build model
-    model = resnet18(num_classes=10)
-
-    # build dataloaders
-    train_dataset = CIFAR10(
-        root=Path(os.environ['DATA']),
-        download=True,
-        transform=transforms.Compose(
-            [
-                transforms.Resize(size=(IMG_SIZE, IMG_SIZE)),
-                transforms.ToTensor(),
-                transforms.Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5))
-            ]
-        )
-    )
-    train_dataloader = get_dataloader(dataset=train_dataset,
-                                      shuffle=True,
-                                      batch_size=BATCH_SIZE,
-                                      drop_last=True)
-
-    # build optimizer
-    optimizer = Adam(model.parameters(), lr=0.001)
-    criterion = nn.CrossEntropyLoss()
-
-    engine, train_dataloader, *args = colossalai.initialize(
-        model=model,
-        optimizer=optimizer,
-        criterion=criterion,
-        train_dataloader=train_dataloader
-    )
-    logger = get_dist_logger()
-    rank = torch.distributed.get_rank()
-
-    engine.train()
-    for img, label in train_dataloader:
-        engine.zero_grad()
-        img = img.cuda()
-        label = label.cuda()
-        output = engine(img)
-        loss = engine.criterion(output, label)
-        engine.backward(loss)
-        engine.step()
-        break
-
-    logger.info('Rank {} returns: {}'.format(rank, loss.item()))
-
-    gpc.destroy()
-    logger.info('Test engine finished')
-    report_memory_usage("After testing")
-    torch.cuda.empty_cache()
-
-
-@pytest.mark.dist
-def test_engine():
-    world_size = 4
-    run_func = partial(run_engine, world_size=world_size, port=free_port())
-    mp.spawn(run_func, nprocs=world_size)
-
-
-if __name__ == '__main__':
-    test_engine()
diff --git a/tests/test_engine/test_engine/test_engine_torch_amp.py b/tests/test_engine/test_engine/test_engine_torch_amp.py
deleted file mode 100644
index 435df81dc..000000000
--- a/tests/test_engine/test_engine/test_engine_torch_amp.py
+++ /dev/null
@@ -1,107 +0,0 @@
-import os
-from functools import partial
-from pathlib import Path
-
-import colossalai
-import pytest
-import torch
-import torch.multiprocessing as mp
-import torch.nn as nn
-from colossalai.amp import AMP_TYPE
-from colossalai.core import global_context as gpc
-from colossalai.logging import get_dist_logger
-from colossalai.utils import free_port, get_dataloader, report_memory_usage
-from torch.optim import Adam
-from torchvision import transforms
-from torchvision.datasets import CIFAR10
-from torchvision.models import resnet18
-
-# Config
-BATCH_SIZE = 128
-IMG_SIZE = 224
-DIM = 768
-NUM_CLASSES = 10
-NUM_ATTN_HEADS = 12
-
-CONFIG = dict(
-    parallel=dict(
-        pipeline=dict(size=1),
-        tensor=dict(size=1, mode=None)
-    ),
-    fp16=dict(mode=AMP_TYPE.TORCH),
-    clip_grad_norm=1.0
-)
-
-
-def run_engine(rank, world_size, port):
-    # init dist env
-    colossalai.launch(
-        config=CONFIG,
-        rank=rank,
-        world_size=world_size,
-        host='localhost',
-        port=port,
-        backend='nccl'
-    )
-
-    # build model
-    model = resnet18(num_classes=10)
-
-    # build dataloaders
-    train_dataset = CIFAR10(
-        root=Path(os.environ['DATA']),
-        download=True,
-        transform=transforms.Compose(
-            [
-                transforms.Resize(size=(IMG_SIZE, IMG_SIZE)),
-                transforms.ToTensor(),
-                transforms.Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5))
-            ]
-        )
-    )
-    train_dataloader = get_dataloader(dataset=train_dataset,
-                                      shuffle=True,
-                                      batch_size=BATCH_SIZE,
-                                      drop_last=True)
-
-    # build optimizer
-    optimizer = Adam(model.parameters(), lr=0.001)
-    criterion = nn.CrossEntropyLoss()
-
-    engine, train_dataloader, *args = colossalai.initialize(
-        model=model,
-        optimizer=optimizer,
-        criterion=criterion,
-        train_dataloader=train_dataloader
-    )
-    logger = get_dist_logger()
-    rank = torch.distributed.get_rank()
-
-    engine.train()
-    for img, label in train_dataloader:
-        engine.zero_grad()
-        img = img.cuda()
-        label = label.cuda()
-        output = engine(img)
-        loss = engine.criterion(output, label)
-        engine.backward(loss)
-        engine.step()
-        break
-
-    logger.info('Rank {} returns: {}'.format(rank, loss.item()))
-
-    gpc.destroy()
-    logger.info('Test engine finished')
-    report_memory_usage("After testing")
-    torch.cuda.empty_cache()
-
-
-@pytest.mark.dist
-def test_engine():
-    world_size = 4
-    run_func = partial(run_engine, world_size=world_size, port=free_port())
-    mp.spawn(run_func, nprocs=world_size)
-
-
-if __name__ == '__main__':
-    test_engine()
diff --git a/tests/test_engine/test_engine/test_param_hook.py b/tests/test_engine/test_param_hook.py
similarity index 100%
rename from tests/test_engine/test_engine/test_param_hook.py
rename to tests/test_engine/test_param_hook.py
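
--
Usage note: a minimal sketch of how other tests can register and consume
components through the new registry, mirroring the pattern used by
run_train() in test_engine.py above. The component name 'dummy_linear' and
the builder below are hypothetical illustrations, not part of this patch,
and the sketch assumes the repository root is on PYTHONPATH:

    import torch
    import torch.nn as nn

    from tests.components_to_test.registry import non_distributed_component_funcs
    from tests.components_to_test.utils import DummyDataGenerator


    class DummyDataLoader(DummyDataGenerator):

        def generate(self):
            # one synthetic batch: 16 samples with 5 features, binary labels
            return torch.rand(16, 5), torch.randint(low=0, high=2, size=(16,))


    # 'dummy_linear' is a hypothetical name used only for this sketch
    @non_distributed_component_funcs.register(name='dummy_linear')
    def get_dummy_components():
        # every registered builder returns the same 5-tuple:
        # (model, trainloader, testloader, optimizer, criterion)
        model = nn.Linear(5, 2)
        optim = torch.optim.Adam(model.parameters(), lr=0.001)
        criterion = torch.nn.CrossEntropyLoss()
        return model, DummyDataLoader(), DummyDataLoader(), optim, criterion


    # consumers iterate over the registry to exercise every registered component
    for get_components_func in non_distributed_component_funcs:
        model, trainloader, testloader, optim, criterion = get_components_func()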