From cd9c28e05595e800dc14cf9d0cc2f0bcc842129e Mon Sep 17 00:00:00 2001
From: Frank Lee
Date: Thu, 16 Dec 2021 10:32:08 +0800
Subject: [PATCH] added CI for unit testing (#69)

---
 .github/workflows/build.yml | 40 +++
 colossalai/trainer/hooks/_log_hook.py | 2 +-
 tests/test_context/test_2d_init.py | 9 +-
 tests/test_context/test_2p5d_init.py | 4 +-
 tests/test_context/test_3d_init.py | 5 +-
 tests/test_data/test_data_parallel_sampler.py | 5 +-
 .../test_deterministic_dataloader.py | 6 +-
 .../run_cifar10_vit2d_with_pipeline.py | 5 +-
 tests/test_engine/test.sh | 4 -
 .../test_engine/test_engine_apex_amp.py | 26 +-
 .../test_engine/test_engine_naive_amp.py | 25 +-
 .../test_engine/test_engine_no_amp.py | 25 +-
 .../test_engine/test_engine_torch_amp.py | 25 +-
 tests/test_layers/test.sh | 4 -
 .../test_layers/test_1d/checks_1d/__init__.py | 0
 .../check_layer_1d.py} | 3 +-
 .../test_1d/{ => checks_1d}/common.py | 0
 tests/test_layers/test_1d/test_1d.py | 33 +--
 .../test_layers/test_2d/checks_2d/__init__.py | 0
 .../check_layer_2d.py} | 2 +-
 .../check_operation_2d.py} | 2 +-
 .../test_2d/{ => checks_2d}/common.py | 0
 tests/test_layers/test_2d/test_2d.py | 33 ++-
 .../test_2p5d/checks_2p5d/__init__.py | 0
 .../check_layer_2p5d.py} | 2 +-
 .../check_operation_2p5d.py} | 2 +-
 .../test_2p5d/{ => checks_2p5d}/common.py | 0
 tests/test_layers/test_2p5d/test.sh | 3 -
 tests/test_layers/test_2p5d/test_2p5d.py | 37 +--
 .../test_layers/test_3d/checks_3d/__init__.py | 0
 .../{test_conn.py => checks_3d/check_conn.py} | 0
 .../check_layer_3d.py} | 2 +-
 .../check_operation_3d.py} | 2 +-
 .../test_3d/{ => checks_3d}/common.py | 0
 tests/test_layers/test_3d/test.sh | 22 --
 tests/test_layers/test_3d/test_3d.py | 42 ++--
 .../test_sequence/checks_seq/__init__.py | 0
 .../check_layer_seq.py} | 0
 .../test_sequence/test_sequence.py | 33 ++-
 .../configs/test_trainer_resnet.py | 19 --
 .../configs/test_trainer_vit_2d.py | 133 ----------
 tests/test_trainer/test.sh | 4 -
 .../test_pipeline/debug_schedule.py | 232 ------------------
 .../test_pipeline/model/__init__.py | 2 +
 .../test_pipeline/model/layers/__init__.py | 3 +
 .../test_pipeline/model/layers/basic_block.py | 64 +++++
 .../test_pipeline/model/layers/bottleneck.py | 69 ++++++
 .../test_pipeline/model/layers/conv.py | 15 ++
 .../test_pipeline/model/layers/reslayer.py | 63 +++++
 .../test_pipeline/model/resnet.py | 163 ++++++++++++
 .../test_pipeline/resnet_config.py | 19 ++
 tests/test_trainer/test_pipeline/test_p2p.py | 43 ++--
 .../test_pipeline/test_partition.py | 37 ++-
 .../test_pipeline/test_pipeline_schedule.py | 94 +++++++
 .../test_pipeline/test_schedule.py | 51 ----
 .../test_trainer_with_non_pipe_schedule.py | 36 +--
 .../test_trainer_with_pipe_schedule.py | 41 ++--
 .../test_activation_checkpointing.py | 1 +
 .../test_utils/test_gradient_accumluation.py | 7 +-
 tests/test_zero_data_parallel/config.py | 4 -
 tests/test_zero_data_parallel/test_zero.sh | 4 -
 .../test_zero_level_2.py | 102 ++++++++
 .../{test_zero.py => test_zero_level_3.py} | 50 ++--
 .../configs/vit_2d_zero2.py | 12 -
 .../configs/vit_2d_zero3.py | 12 -
 tests/test_zero_tensor_parallel/test.sh | 4 -
 ...{test_vit_2d.py => test_vit_2d_level_2.py} | 49 ++--
 .../test_vit_2d_level_3.py | 119 +++++++++
 68 files changed, 1089 insertions(+), 766 deletions(-)
 create mode 100644 .github/workflows/build.yml
 delete mode 100644 tests/test_engine/test.sh
 delete mode 100644 tests/test_layers/test.sh
 create mode 100644 tests/test_layers/test_1d/checks_1d/__init__.py
 rename tests/test_layers/test_1d/{test_layer.py => checks_1d/check_layer_1d.py} (95%)
 rename tests/test_layers/test_1d/{ => checks_1d}/common.py (100%)
 create mode 100644 tests/test_layers/test_2d/checks_2d/__init__.py
 rename tests/test_layers/test_2d/{test_layer.py => checks_2d/check_layer_2d.py} (99%)
 rename tests/test_layers/test_2d/{test_operation.py => checks_2d/check_operation_2d.py} (99%)
 rename tests/test_layers/test_2d/{ => checks_2d}/common.py (100%)
 create mode 100644 tests/test_layers/test_2p5d/checks_2p5d/__init__.py
 rename tests/test_layers/test_2p5d/{test_layer.py => checks_2p5d/check_layer_2p5d.py} (99%)
 rename tests/test_layers/test_2p5d/{test_operation.py => checks_2p5d/check_operation_2p5d.py} (99%)
 rename tests/test_layers/test_2p5d/{ => checks_2p5d}/common.py (100%)
 delete mode 100644 tests/test_layers/test_2p5d/test.sh
 create mode 100644 tests/test_layers/test_3d/checks_3d/__init__.py
 rename tests/test_layers/test_3d/{test_conn.py => checks_3d/check_conn.py} (100%)
 rename tests/test_layers/test_3d/{test_layer.py => checks_3d/check_layer_3d.py} (99%)
 rename tests/test_layers/test_3d/{test_operation.py => checks_3d/check_operation_3d.py} (99%)
 rename tests/test_layers/test_3d/{ => checks_3d}/common.py (100%)
 delete mode 100644 tests/test_layers/test_3d/test.sh
 create mode 100644 tests/test_layers/test_sequence/checks_seq/__init__.py
 rename tests/test_layers/test_sequence/{test_layer.py => checks_seq/check_layer_seq.py} (100%)
 delete mode 100644 tests/test_trainer/configs/test_trainer_resnet.py
 delete mode 100644 tests/test_trainer/configs/test_trainer_vit_2d.py
 delete mode 100644 tests/test_trainer/test.sh
 delete mode 100644 tests/test_trainer/test_pipeline/debug_schedule.py
 create mode 100644 tests/test_trainer/test_pipeline/model/__init__.py
 create mode 100644 tests/test_trainer/test_pipeline/model/layers/__init__.py
 create mode 100644 tests/test_trainer/test_pipeline/model/layers/basic_block.py
 create mode 100644 tests/test_trainer/test_pipeline/model/layers/bottleneck.py
 create mode 100644 tests/test_trainer/test_pipeline/model/layers/conv.py
 create mode 100644 tests/test_trainer/test_pipeline/model/layers/reslayer.py
 create mode 100644 tests/test_trainer/test_pipeline/model/resnet.py
 create mode 100644 tests/test_trainer/test_pipeline/resnet_config.py
 create mode 100644 tests/test_trainer/test_pipeline/test_pipeline_schedule.py
 delete mode 100644 tests/test_trainer/test_pipeline/test_schedule.py
 delete mode 100644 tests/test_zero_data_parallel/config.py
 delete mode 100644 tests/test_zero_data_parallel/test_zero.sh
 create mode 100644 tests/test_zero_data_parallel/test_zero_level_2.py
 rename tests/test_zero_data_parallel/{test_zero.py => test_zero_level_3.py} (74%)
 delete mode 100644 tests/test_zero_tensor_parallel/configs/vit_2d_zero2.py
 delete mode 100644 tests/test_zero_tensor_parallel/configs/vit_2d_zero3.py
 delete mode 100644 tests/test_zero_tensor_parallel/test.sh
 rename tests/test_zero_tensor_parallel/{test_vit_2d.py => test_vit_2d_level_2.py} (77%)
 create mode 100644 tests/test_zero_tensor_parallel/test_vit_2d_level_3.py

diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
new file mode 100644
index 000000000..a521551c4
--- /dev/null
+++ b/.github/workflows/build.yml
@@ -0,0 +1,40 @@
+name: Build
+on:
+  pull_request:
+    types: [review_requested]
+    branches:
+      - "*"
+
+jobs:
+  build:
+    name: Build and test Colossal-AI
+    runs-on: [self-hosted, gpu]
+    container:
+      image: nvcr.io/nvidia/pytorch:21.07-py3
+      options: --gpus all --rm --ipc=host -v /data/cifar-10:/data/cifar-10
+    timeout-minutes: 1200
+    if:
github.event.pull_request.draft == false && github.base_ref == 'main' && github.event.pull_request.base.repo.full_name == 'hpcaitech/ColossalAI' + steps: + - name: Setup Environment + run: | + export https_proxy=http://172.17.0.1:7890 http_proxy=http://172.17.0.1:7890 all_proxy=socks5://172.17.0.1:7890 + - name: Install dependencies + run: | + python3 -m pip config set global.index-url https://pypi.tuna.tsinghua.edu.cn/simple + python3 -m pip install -U pip setuptools wheel --user + pip install pytest tensorboard deepspeed apex + - uses: actions/checkout@v2 + - name: Install Colossal-AI + run: | + pip install -v --no-cache-dir --global-option="--cuda_ext" . + - name: Unit Testing + run: | + pytest tests + env: + DATA: /data/cifar-10 + + + + + + diff --git a/colossalai/trainer/hooks/_log_hook.py b/colossalai/trainer/hooks/_log_hook.py index 8693dd515..bb82c1e5b 100644 --- a/colossalai/trainer/hooks/_log_hook.py +++ b/colossalai/trainer/hooks/_log_hook.py @@ -5,7 +5,6 @@ import os import os.path as osp import torch -from torch.utils.tensorboard import SummaryWriter from typing import List from decimal import Decimal from colossalai.context import ParallelMode @@ -100,6 +99,7 @@ class TensorboardHook(BaseHook): priority: int = 10, ) -> None: super().__init__(priority=priority) + from torch.utils.tensorboard import SummaryWriter # create log dir if not gpc.is_initialized(ParallelMode.GLOBAL) or gpc.get_global_rank() == 0: diff --git a/tests/test_context/test_2d_init.py b/tests/test_context/test_2d_init.py index d373964f8..3ad376750 100644 --- a/tests/test_context/test_2d_init.py +++ b/tests/test_context/test_2d_init.py @@ -1,15 +1,15 @@ #!/usr/bin/env python # -*- encoding: utf-8 -*- -from functools import partial -from pathlib import Path - import pytest +import torch import torch.multiprocessing as mp from colossalai import launch from colossalai.context.parallel_mode import ParallelMode from colossalai.core import global_context as gpc +from functools import partial +from pathlib import Path CONFIG_PATH = Path(__file__).parent.joinpath('configs/parallel_2d_init.py').absolute() @@ -75,6 +75,7 @@ def init_2d(rank, world_size, backend, port, host): check_2d_parallel_rank(rank) check_pipeline_parallel_rank(rank) gpc.destroy() + torch.cuda.empty_cache() @pytest.mark.cpu @@ -86,7 +87,7 @@ def test_2d_init(): test_fn = partial(init_2d, world_size=world_size, backend='gloo', - port='29500', + port='29900', host='localhost' ) mp.spawn(test_fn, nprocs=world_size) diff --git a/tests/test_context/test_2p5d_init.py b/tests/test_context/test_2p5d_init.py index c071d86e7..1ce5f8ff4 100644 --- a/tests/test_context/test_2p5d_init.py +++ b/tests/test_context/test_2p5d_init.py @@ -5,6 +5,7 @@ from functools import partial from pathlib import Path import pytest +import torch import torch.multiprocessing as mp from colossalai.context.parallel_mode import ParallelMode @@ -98,6 +99,7 @@ def init_2halfd(rank, world_size, backend, port, host): check_tensor_parallel_rank(rank) check_2p5d_parallel_rank(rank) gpc.destroy() + torch.cuda.empty_cache() @pytest.mark.cpu @@ -109,7 +111,7 @@ def test_2halfd_init(): test_fn = partial(init_2halfd, world_size=world_size, backend='gloo', - port='29501', + port='29901', host='localhost' ) mp.spawn(test_fn, nprocs=world_size) diff --git a/tests/test_context/test_3d_init.py b/tests/test_context/test_3d_init.py index a1c48a9b7..5c66ab6a0 100644 --- a/tests/test_context/test_3d_init.py +++ b/tests/test_context/test_3d_init.py @@ -5,8 +5,10 @@ from functools import partial from 
pathlib import Path import pytest +import torch import torch.multiprocessing as mp + from colossalai.context.parallel_mode import ParallelMode from colossalai.core import global_context as gpc from colossalai.initialize import launch @@ -90,6 +92,7 @@ def init_3d(rank, world_size, backend, port, host): check_data_parallel_rank(rank) check_pipeline_parallel_rank(rank) gpc.destroy() + torch.cuda.empty_cache() @pytest.mark.cpu @@ -101,7 +104,7 @@ def test_3d_init(): test_fn = partial(init_3d, world_size=world_size, backend='gloo', - port='29502', + port='29902', host='localhost' ) mp.spawn(test_fn, nprocs=world_size) diff --git a/tests/test_data/test_data_parallel_sampler.py b/tests/test_data/test_data_parallel_sampler.py index 2f2e275c4..18d3e1b35 100644 --- a/tests/test_data/test_data_parallel_sampler.py +++ b/tests/test_data/test_data_parallel_sampler.py @@ -6,7 +6,7 @@ from functools import partial from pathlib import Path import pytest -import torch.cuda +import torch import torch.distributed as dist import torch.multiprocessing as mp from torch.utils.data import DataLoader @@ -49,7 +49,7 @@ def run_data_sampler(rank, world_size): rank=rank, world_size=world_size, backend='gloo', - port='29503', + port='29903', host='localhost' ) colossalai.launch(**dist_args) @@ -73,6 +73,7 @@ def run_data_sampler(rank, world_size): if gpc.get_local_rank(ParallelMode.DATA) != 0: assert not torch.equal(img, img_to_compare), 'Same image was distributed across ranks but expected it to be different' + torch.cuda.empty_cache() @pytest.mark.cpu diff --git a/tests/test_data/test_deterministic_dataloader.py b/tests/test_data/test_deterministic_dataloader.py index 237c92b77..c96a3210f 100644 --- a/tests/test_data/test_deterministic_dataloader.py +++ b/tests/test_data/test_deterministic_dataloader.py @@ -6,7 +6,7 @@ from functools import partial from pathlib import Path import pytest -import torch.cuda +import torch import torch.distributed as dist import torch.multiprocessing as mp from torchvision import transforms @@ -52,11 +52,10 @@ def run_data_sampler(rank, world_size): rank=rank, world_size=world_size, backend='gloo', - port='29499', + port='29904', host='localhost' ) colossalai.launch(**dist_args) - print('finished initialization') dataset_cfg = gpc.config.train_data.dataset dataloader_cfg = gpc.config.train_data.dataloader @@ -88,6 +87,7 @@ def run_data_sampler(rank, world_size): # this should be false if data parallel sampler to given to the dataloader assert torch.equal(img, img_to_compare), 'Same image was distributed across ranks and expected it to be the same' + torch.cuda.empty_cache() @pytest.mark.cpu diff --git a/tests/test_data_pipeline_tensor_parallel/run_cifar10_vit2d_with_pipeline.py b/tests/test_data_pipeline_tensor_parallel/run_cifar10_vit2d_with_pipeline.py index 529fedf5a..2953dcb5b 100644 --- a/tests/test_data_pipeline_tensor_parallel/run_cifar10_vit2d_with_pipeline.py +++ b/tests/test_data_pipeline_tensor_parallel/run_cifar10_vit2d_with_pipeline.py @@ -1,3 +1,4 @@ +import pytest from pathlib import Path from colossalai.amp.amp_type import AMP_TYPE from colossalai.context.parallel_mode import ParallelMode @@ -34,7 +35,9 @@ CONFIG = dict( ) -def main(): +@pytest.mark.dist +@pytest.mark.skip("This test requires more than 8 GPUs, you should invoke this test script using test.sh provided manually") +def test_hybrid_parallel(): parser = colossalai.get_default_parser() args = parser.parse_args() colossalai.launch_from_slurm(config=CONFIG, diff --git a/tests/test_engine/test.sh 
b/tests/test_engine/test.sh deleted file mode 100644 index 0d90c8e55..000000000 --- a/tests/test_engine/test.sh +++ /dev/null @@ -1,4 +0,0 @@ -#!/usr/bin/env sh -test_file=$1 - -python $test_file --world_size $SLURM_NPROCS --host $HOST --port 29500 --rank $SLURM_PROCID \ No newline at end of file diff --git a/tests/test_engine/test_engine/test_engine_apex_amp.py b/tests/test_engine/test_engine/test_engine_apex_amp.py index ff9c9f9bf..c8ee13de1 100644 --- a/tests/test_engine/test_engine/test_engine_apex_amp.py +++ b/tests/test_engine/test_engine/test_engine_apex_amp.py @@ -8,6 +8,7 @@ import torch import os.path as osp from pathlib import Path import torch.nn as nn +import torch.multiprocessing as mp from torchvision import transforms from torch.optim import Adam @@ -15,9 +16,9 @@ from colossalai.core import global_context as gpc from colossalai.amp import AMP_TYPE from colossalai.logging import get_dist_logger from colossalai.utils import report_memory_usage, get_dataloader -from colossalai.initialize import get_default_parser from torchvision.models import resnet18 from torchvision.datasets import CIFAR10 +from functools import partial # Config @@ -37,18 +38,15 @@ CONFIG = dict( ) -def run_no_pipeline(): - parser = get_default_parser() - args = parser.parse_args() - +def run_engine(rank, world_size): # init dist env colossalai.launch( config=CONFIG, - rank=args.rank, - world_size=args.world_size, - host=args.host, - port=args.port, - backend=args.backend + rank=rank, + world_size=world_size, + host='localhost', + port=29910, + backend='nccl' ) # build model @@ -69,8 +67,6 @@ def run_no_pipeline(): train_dataloader = get_dataloader(dataset=train_dataset, shuffle=True, batch_size=BATCH_SIZE, - num_workers=1, - pin_memory=True, drop_last=True) # build optimizer @@ -102,12 +98,14 @@ def run_no_pipeline(): gpc.destroy() logger.info('Test engine finished') report_memory_usage("After testing") + torch.cuda.empty_cache() -@pytest.mark.skip("This test should be invoked using the test.sh provided") @pytest.mark.dist def test_engine(): - run_no_pipeline() + world_size = 4 + run_func = partial(run_engine, world_size=world_size) + mp.spawn(run_func, nprocs=world_size) if __name__ == '__main__': diff --git a/tests/test_engine/test_engine/test_engine_naive_amp.py b/tests/test_engine/test_engine/test_engine_naive_amp.py index dd75b9359..e60b0bbe9 100644 --- a/tests/test_engine/test_engine/test_engine_naive_amp.py +++ b/tests/test_engine/test_engine/test_engine_naive_amp.py @@ -5,6 +5,7 @@ import torch import os.path as osp from pathlib import Path import torch.nn as nn +import torch.multiprocessing as mp from torchvision import transforms from torch.optim import Adam @@ -15,6 +16,7 @@ from colossalai.utils import report_memory_usage, get_dataloader from colossalai.initialize import get_default_parser from torchvision.models import resnet18 from torchvision.datasets import CIFAR10 +from functools import partial # Config @@ -36,18 +38,15 @@ CONFIG = dict( ) -def run_no_pipeline(): - parser = get_default_parser() - args = parser.parse_args() - +def run_engine(rank, world_size): # init dist env colossalai.launch( config=CONFIG, - rank=args.rank, - world_size=args.world_size, - host=args.host, - port=args.port, - backend=args.backend + rank=rank, + world_size=world_size, + host='localhost', + port=29911, + backend='nccl' ) # build model @@ -68,8 +67,6 @@ def run_no_pipeline(): train_dataloader = get_dataloader(dataset=train_dataset, shuffle=True, batch_size=BATCH_SIZE, - num_workers=1, - pin_memory=True, 
drop_last=True) # build optimizer @@ -101,12 +98,14 @@ def run_no_pipeline(): gpc.destroy() logger.info('Test engine finished') report_memory_usage("After testing") + torch.cuda.empty_cache() -@pytest.mark.skip("This test should be invoked using the test.sh provided") @pytest.mark.dist def test_engine(): - run_no_pipeline() + world_size = 4 + run_func = partial(run_engine, world_size=world_size) + mp.spawn(run_func, nprocs=world_size) if __name__ == '__main__': diff --git a/tests/test_engine/test_engine/test_engine_no_amp.py b/tests/test_engine/test_engine/test_engine_no_amp.py index f8392c98a..8bf0baaea 100644 --- a/tests/test_engine/test_engine/test_engine_no_amp.py +++ b/tests/test_engine/test_engine/test_engine_no_amp.py @@ -5,6 +5,7 @@ import torch import os.path as osp from pathlib import Path import torch.nn as nn +import torch.multiprocessing as mp from torchvision import transforms from torch.optim import Adam @@ -15,6 +16,7 @@ from colossalai.utils import report_memory_usage, get_dataloader from colossalai.initialize import get_default_parser from torchvision.models import resnet18 from torchvision.datasets import CIFAR10 +from functools import partial # Config @@ -33,18 +35,15 @@ CONFIG = dict( ) -def run_no_pipeline(): - parser = get_default_parser() - args = parser.parse_args() - +def run_engine(rank, world_size): # init dist env colossalai.launch( config=CONFIG, - rank=args.rank, - world_size=args.world_size, - host=args.host, - port=args.port, - backend=args.backend + rank=rank, + world_size=world_size, + host='localhost', + port=29912, + backend='nccl' ) # build model @@ -65,8 +64,6 @@ def run_no_pipeline(): train_dataloader = get_dataloader(dataset=train_dataset, shuffle=True, batch_size=BATCH_SIZE, - num_workers=1, - pin_memory=True, drop_last=True) # build optimizer @@ -98,12 +95,14 @@ def run_no_pipeline(): gpc.destroy() logger.info('Test engine finished') report_memory_usage("After testing") + torch.cuda.empty_cache() -@pytest.mark.skip("This test should be invoked using the test.sh provided") @pytest.mark.dist def test_engine(): - run_no_pipeline() + world_size = 4 + run_func = partial(run_engine, world_size=world_size) + mp.spawn(run_func, nprocs=world_size) if __name__ == '__main__': diff --git a/tests/test_engine/test_engine/test_engine_torch_amp.py b/tests/test_engine/test_engine/test_engine_torch_amp.py index fdafd494c..289a8f1b6 100644 --- a/tests/test_engine/test_engine/test_engine_torch_amp.py +++ b/tests/test_engine/test_engine/test_engine_torch_amp.py @@ -5,6 +5,7 @@ import torch import os.path as osp from pathlib import Path import torch.nn as nn +import torch.multiprocessing as mp from torchvision import transforms from torch.optim import Adam @@ -15,6 +16,7 @@ from colossalai.utils import report_memory_usage, get_dataloader from colossalai.initialize import get_default_parser from torchvision.models import resnet18 from torchvision.datasets import CIFAR10 +from functools import partial # Config @@ -34,18 +36,15 @@ CONFIG = dict( ) -def run_no_pipeline(): - parser = get_default_parser() - args = parser.parse_args() - +def run_engine(rank, world_size): # init dist env colossalai.launch( config=CONFIG, - rank=args.rank, - world_size=args.world_size, - host=args.host, - port=args.port, - backend=args.backend + rank=rank, + world_size=world_size, + host='localhost', + port=29913, + backend='nccl' ) # build model @@ -66,8 +65,6 @@ def run_no_pipeline(): train_dataloader = get_dataloader(dataset=train_dataset, shuffle=True, batch_size=BATCH_SIZE, - 
num_workers=1, - pin_memory=True, drop_last=True) # build optimizer @@ -99,12 +96,14 @@ def run_no_pipeline(): gpc.destroy() logger.info('Test engine finished') report_memory_usage("After testing") + torch.cuda.empty_cache() -@pytest.mark.skip("This test should be invoked using the test.sh provided") @pytest.mark.dist def test_engine(): - run_no_pipeline() + world_size = 4 + run_func = partial(run_engine, world_size=world_size) + mp.spawn(run_func, nprocs=world_size) if __name__ == '__main__': diff --git a/tests/test_layers/test.sh b/tests/test_layers/test.sh deleted file mode 100644 index da5afd5ae..000000000 --- a/tests/test_layers/test.sh +++ /dev/null @@ -1,4 +0,0 @@ -#!/usr/bin/env sh -test_file=$1 - -python $test_file --rank $SLURM_PROCID --world_size $SLURM_NPROCS --host $HOST --port 29500 \ No newline at end of file diff --git a/tests/test_layers/test_1d/checks_1d/__init__.py b/tests/test_layers/test_1d/checks_1d/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/test_layers/test_1d/test_layer.py b/tests/test_layers/test_1d/checks_1d/check_layer_1d.py similarity index 95% rename from tests/test_layers/test_1d/test_layer.py rename to tests/test_layers/test_1d/checks_1d/check_layer_1d.py index 682a4257a..33b0ed68b 100644 --- a/tests/test_layers/test_1d/test_layer.py +++ b/tests/test_layers/test_1d/checks_1d/check_layer_1d.py @@ -1,4 +1,3 @@ -from tests.test_layers.test_3d.common import IMG_SIZE import torch import torch.distributed as dist from torch.nn import Parameter @@ -7,7 +6,7 @@ from colossalai.context.parallel_mode import ParallelMode from colossalai.core import global_context as gpc from colossalai.nn import Linear1D_Col, Linear1D_Row, TransformerMLP1D, TransformerSelfAttention1D, ViTMLP1D, ViTSelfAttention1D, ViTPatchEmbedding1D, ViTHead1D, ViTTokenFuser1D from colossalai.utils import get_current_device, print_rank_0 -from common import HIDDEN_SIZE, DEPTH, BATCH_SIZE, SEQ_LENGTH, NUM_CLASSES, check_equal, IMG_SIZE +from .common import HIDDEN_SIZE, DEPTH, BATCH_SIZE, SEQ_LENGTH, NUM_CLASSES, check_equal, IMG_SIZE def check_linear_col(): diff --git a/tests/test_layers/test_1d/common.py b/tests/test_layers/test_1d/checks_1d/common.py similarity index 100% rename from tests/test_layers/test_1d/common.py rename to tests/test_layers/test_1d/checks_1d/common.py diff --git a/tests/test_layers/test_1d/test_1d.py b/tests/test_layers/test_1d/test_1d.py index 533376999..00ba3c4eb 100644 --- a/tests/test_layers/test_1d/test_1d.py +++ b/tests/test_layers/test_1d/test_1d.py @@ -2,10 +2,13 @@ # -*- encoding: utf-8 -*- import pytest +import torch +import torch.multiprocessing as mp from colossalai.core import global_context as gpc from colossalai.initialize import launch, get_default_parser -from test_layer import * +from functools import partial +from checks_1d.check_layer_1d import * CONFIG = dict( parallel=dict( @@ -18,8 +21,14 @@ CONFIG = dict( ) -def check_layer(): - # print_rank_0('start check_linear_col') +def check_layer(rank, world_size): + launch(config=CONFIG, + rank=rank, + world_size=world_size, + host='localhost', + port=29920, + backend='nccl') + check_linear_col() check_linear_row() check_attention() @@ -28,21 +37,15 @@ def check_layer(): check_embed() check_head() + gpc.destroy() + torch.cuda.empty_cache() + @pytest.mark.dist -@pytest.mark.skip("This test should be invoked by test.sh in the same folder as it runs on multiple gpus") def test_1d(): - parser = get_default_parser() - args = parser.parse_args() - launch(config=CONFIG, - 
rank=args.rank, - world_size=args.world_size, - host=args.host, - port=args.port, - backend=args.backend) - - check_layer() - gpc.destroy() + world_size = 2 + run_func = partial(check_layer, world_size=world_size) + mp.spawn(run_func, nprocs=world_size) if __name__ == '__main__': diff --git a/tests/test_layers/test_2d/checks_2d/__init__.py b/tests/test_layers/test_2d/checks_2d/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/test_layers/test_2d/test_layer.py b/tests/test_layers/test_2d/checks_2d/check_layer_2d.py similarity index 99% rename from tests/test_layers/test_2d/test_layer.py rename to tests/test_layers/test_2d/checks_2d/check_layer_2d.py index b8404a488..c913ecc6b 100644 --- a/tests/test_layers/test_2d/test_layer.py +++ b/tests/test_layers/test_2d/checks_2d/check_layer_2d.py @@ -5,7 +5,7 @@ from colossalai.context.parallel_mode import ParallelMode from colossalai.core import global_context as gpc from colossalai.nn import Linear2D, LayerNorm2D, TransformerSelfAttention2D, TransformerMLP2D, TransformerLayer2D from colossalai.utils import get_current_device, print_rank_0 -from common import HIDDEN_SIZE, DEPTH, BATCH_SIZE, SEQ_LENGTH, check_equal +from .common import HIDDEN_SIZE, DEPTH, BATCH_SIZE, SEQ_LENGTH, check_equal def check_linear(): diff --git a/tests/test_layers/test_2d/test_operation.py b/tests/test_layers/test_2d/checks_2d/check_operation_2d.py similarity index 99% rename from tests/test_layers/test_2d/test_operation.py rename to tests/test_layers/test_2d/checks_2d/check_operation_2d.py index 74772a837..64abad146 100644 --- a/tests/test_layers/test_2d/test_operation.py +++ b/tests/test_layers/test_2d/checks_2d/check_operation_2d.py @@ -8,7 +8,7 @@ from colossalai.core import global_context as gpc from colossalai.nn.layer.parallel_2d import Matmul_AB_2D, Matmul_ABT_2D, Matmul_ATB_2D from colossalai.utils import get_current_device from colossalai.utils import print_rank_0 -from common import check_equal, BATCH_SIZE, SEQ_LENGTH, HIDDEN_SIZE, DEPTH +from .common import check_equal, BATCH_SIZE, SEQ_LENGTH, HIDDEN_SIZE, DEPTH def check_AB(): diff --git a/tests/test_layers/test_2d/common.py b/tests/test_layers/test_2d/checks_2d/common.py similarity index 100% rename from tests/test_layers/test_2d/common.py rename to tests/test_layers/test_2d/checks_2d/common.py diff --git a/tests/test_layers/test_2d/test_2d.py b/tests/test_layers/test_2d/test_2d.py index f1b683b9f..05b445458 100644 --- a/tests/test_layers/test_2d/test_2d.py +++ b/tests/test_layers/test_2d/test_2d.py @@ -2,11 +2,15 @@ # -*- encoding: utf-8 -*- import pytest +import torch +import torch.multiprocessing as mp from colossalai.core import global_context as gpc from colossalai.initialize import launch, get_default_parser -from test_layer import check_linear, check_layernorm, check_attention, check_mlp, check_transformerlayer -from test_operation import check_AB, check_ABT, check_ATB +from checks_2d.check_layer_2d import check_linear, check_layernorm, check_attention, check_mlp, check_transformerlayer +from checks_2d.check_operation_2d import check_AB, check_ABT, check_ATB +from functools import partial + CONFIG = dict( parallel=dict( @@ -33,20 +37,25 @@ def check_layer(): check_transformerlayer() -@pytest.mark.dist -@pytest.mark.skip("This test should be invoked by test.sh in the same folder as it runs on multiple gpus") -def test_2d(): - parser = get_default_parser() - args = parser.parse_args() +def check_layer_and_operation(rank, world_size): launch(config=CONFIG, - rank=args.rank, - 
world_size=args.world_size, - host=args.host, - port=args.port, - backend=args.backend) + rank=rank, + world_size=world_size, + host='localhost', + port=29921, + backend='nccl') + check_operations() check_layer() gpc.destroy() + torch.cuda.empty_cache() + + +@pytest.mark.dist +def test_2d(): + world_size = 4 + run_func = partial(check_layer_and_operation, world_size=world_size) + mp.spawn(run_func, nprocs=world_size) if __name__ == '__main__': diff --git a/tests/test_layers/test_2p5d/checks_2p5d/__init__.py b/tests/test_layers/test_2p5d/checks_2p5d/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/test_layers/test_2p5d/test_layer.py b/tests/test_layers/test_2p5d/checks_2p5d/check_layer_2p5d.py similarity index 99% rename from tests/test_layers/test_2p5d/test_layer.py rename to tests/test_layers/test_2p5d/checks_2p5d/check_layer_2p5d.py index ffe4678b9..c1e5bfb5a 100644 --- a/tests/test_layers/test_2p5d/test_layer.py +++ b/tests/test_layers/test_2p5d/checks_2p5d/check_layer_2p5d.py @@ -6,7 +6,7 @@ from colossalai.nn import (Linear2p5D, LayerNorm2p5D, TransformerSelfAttention2p TransformerLayer2p5D) from colossalai.utils import get_current_device from colossalai.utils import print_rank_0 -from common import * +from .common import * def check_linear(): diff --git a/tests/test_layers/test_2p5d/test_operation.py b/tests/test_layers/test_2p5d/checks_2p5d/check_operation_2p5d.py similarity index 99% rename from tests/test_layers/test_2p5d/test_operation.py rename to tests/test_layers/test_2p5d/checks_2p5d/check_operation_2p5d.py index 2342db3bb..f2b7ffe17 100644 --- a/tests/test_layers/test_2p5d/test_operation.py +++ b/tests/test_layers/test_2p5d/checks_2p5d/check_operation_2p5d.py @@ -6,7 +6,7 @@ from colossalai.nn.layer.parallel_2p5d._operation import Matmul_AB_2p5D, Matmul_ Matmul_ATB_2p5D from colossalai.utils import get_current_device from colossalai.utils import print_rank_0 -from common import * +from .common import * def check_AB(): diff --git a/tests/test_layers/test_2p5d/common.py b/tests/test_layers/test_2p5d/checks_2p5d/common.py similarity index 100% rename from tests/test_layers/test_2p5d/common.py rename to tests/test_layers/test_2p5d/checks_2p5d/common.py diff --git a/tests/test_layers/test_2p5d/test.sh b/tests/test_layers/test_2p5d/test.sh deleted file mode 100644 index 3eb567435..000000000 --- a/tests/test_layers/test_2p5d/test.sh +++ /dev/null @@ -1,3 +0,0 @@ -#!/bin/bash - -python -m torch.distributed.launch test_2p5d.py --nproc_per_node 8 --host $HOST --port 29516 --world_size 8 diff --git a/tests/test_layers/test_2p5d/test_2p5d.py b/tests/test_layers/test_2p5d/test_2p5d.py index bad2a9a04..ae9f02ac2 100644 --- a/tests/test_layers/test_2p5d/test_2p5d.py +++ b/tests/test_layers/test_2p5d/test_2p5d.py @@ -1,9 +1,13 @@ import pytest +import torch +import torch.multiprocessing as mp from colossalai.core import global_context as gpc -from colossalai.initialize import launch, get_default_parser -from test_layer import check_linear, check_layernorm, check_attention, check_mlp, check_transformerlayer -from test_operation import check_AB, check_ABT, check_ATB +from colossalai.initialize import launch +from checks_2p5d.check_layer_2p5d import check_linear, check_layernorm, check_attention, check_mlp, check_transformerlayer +from checks_2p5d.check_operation_2p5d import check_AB, check_ABT, check_ATB +from functools import partial + CONFIG = dict( parallel=dict( @@ -27,20 +31,25 @@ def check_layer(): check_transformerlayer() -@pytest.mark.dist 
-@pytest.mark.skip("This test should be invoked by test.sh in the same folder as it runs on multiple gpus") -def test_2p5d(): - parser = get_default_parser() - args = parser.parse_args() +def check_layer_and_operation(rank, world_size): launch(config=CONFIG, - rank=args.rank, - world_size=args.world_size, - host=args.host, - port=args.port, - backend=args.backend) - check_layer() + rank=rank, + world_size=world_size, + host='localhost', + port=29922, + backend='nccl') + check_operations() + check_layer() gpc.destroy() + torch.cuda.empty_cache() + + +@pytest.mark.dist +def test_2p5d(): + world_size = 8 + run_func = partial(check_layer_and_operation, world_size=world_size) + mp.spawn(run_func, nprocs=world_size) if __name__ == '__main__': diff --git a/tests/test_layers/test_3d/checks_3d/__init__.py b/tests/test_layers/test_3d/checks_3d/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/test_layers/test_3d/test_conn.py b/tests/test_layers/test_3d/checks_3d/check_conn.py similarity index 100% rename from tests/test_layers/test_3d/test_conn.py rename to tests/test_layers/test_3d/checks_3d/check_conn.py diff --git a/tests/test_layers/test_3d/test_layer.py b/tests/test_layers/test_3d/checks_3d/check_layer_3d.py similarity index 99% rename from tests/test_layers/test_3d/test_layer.py rename to tests/test_layers/test_3d/checks_3d/check_layer_3d.py index 92720e42c..164fbfa92 100644 --- a/tests/test_layers/test_3d/test_layer.py +++ b/tests/test_layers/test_3d/checks_3d/check_layer_3d.py @@ -13,7 +13,7 @@ from colossalai.utils import get_current_device, print_rank_0 from colossalai.nn.layer.parallel_3d._utils import get_parallel_mode_from_env from colossalai.constants import INPUT_GROUP_3D, WEIGHT_GROUP_3D, OUTPUT_GROUP_3D -from common import * +from .common import * def check_linear(): diff --git a/tests/test_layers/test_3d/test_operation.py b/tests/test_layers/test_3d/checks_3d/check_operation_3d.py similarity index 99% rename from tests/test_layers/test_3d/test_operation.py rename to tests/test_layers/test_3d/checks_3d/check_operation_3d.py index a0c34432c..02509fc5f 100644 --- a/tests/test_layers/test_3d/test_operation.py +++ b/tests/test_layers/test_3d/checks_3d/check_operation_3d.py @@ -7,7 +7,7 @@ from colossalai.logging import get_dist_logger from colossalai.nn.layer.parallel_3d._operation import * from colossalai.utils import get_current_device -from common import * +from .common import * def check_AB(): diff --git a/tests/test_layers/test_3d/common.py b/tests/test_layers/test_3d/checks_3d/common.py similarity index 100% rename from tests/test_layers/test_3d/common.py rename to tests/test_layers/test_3d/checks_3d/common.py diff --git a/tests/test_layers/test_3d/test.sh b/tests/test_layers/test_3d/test.sh deleted file mode 100644 index 80edf05bb..000000000 --- a/tests/test_layers/test_3d/test.sh +++ /dev/null @@ -1,22 +0,0 @@ -#!/bin/bash - -python -m torch.distributed.launch test_2d.py --nproc_per_node 8 test_3d.py --host $HOST --port 29516 --world_size 8 - -# expected test output -# distributed environment initialized -# AB forward: pass -# AB backward: pass -# ABT forward: pass -# ABT backward: pass -# ATB forward: pass -# ATB backward: pass -# linear backward: pass -# linear backward: pass -# layer norm forward: pass -# layer norm backward: pass -# self attention forward: pass -# self attention backward: pass -# mlp forward: pass -# mlp backward: pass -# transformerlayer forward: pass -# transformerlayer backward: pass \ No newline at end of file diff --git 
a/tests/test_layers/test_3d/test_3d.py b/tests/test_layers/test_3d/test_3d.py index b05fc672a..277ff22b7 100644 --- a/tests/test_layers/test_3d/test_3d.py +++ b/tests/test_layers/test_3d/test_3d.py @@ -1,11 +1,14 @@ #!/usr/bin/env python # -*- encoding: utf-8 -*- - +import pytest +import torch +import torch.multiprocessing as mp from colossalai.initialize import launch, get_default_parser -from test_layer import * -from test_operation import * +from checks_3d.check_layer_3d import * +from checks_3d.check_operation_3d import * from colossalai.logging import get_dist_logger +from functools import partial CONFIG = dict(parallel=dict(pipeline=1, tensor=dict(mode='3d', size=8)), seed=0) @@ -38,26 +41,25 @@ def check_layer(): ranks=[0]) -def _test_main(): - # init dist - parser = get_default_parser() - args = parser.parse_args() +def check_layer_and_operation(rank, world_size): launch(config=CONFIG, - rank=args.rank, - world_size=args.world_size, - host=args.host, - port=args.port, - backend=args.backend) - logger = get_dist_logger() - logger.info('Distributed environment is initialzied.', ranks=[0]) - torch.backends.cudnn.benchmark = True + rank=rank, + world_size=world_size, + host='localhost', + port=29923, + backend='nccl') - # check operation - # check_operations() - - # check layers check_layer() + gpc.destroy() + torch.cuda.empty_cache() + + +@pytest.mark.dist +def test_3d(): + world_size = 8 + run_func = partial(check_layer_and_operation, world_size=world_size) + mp.spawn(run_func, nprocs=world_size) if __name__ == '__main__': - _test_main() + test_3d() diff --git a/tests/test_layers/test_sequence/checks_seq/__init__.py b/tests/test_layers/test_sequence/checks_seq/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/test_layers/test_sequence/test_layer.py b/tests/test_layers/test_sequence/checks_seq/check_layer_seq.py similarity index 100% rename from tests/test_layers/test_sequence/test_layer.py rename to tests/test_layers/test_sequence/checks_seq/check_layer_seq.py diff --git a/tests/test_layers/test_sequence/test_sequence.py b/tests/test_layers/test_sequence/test_sequence.py index 64a42a653..56148e673 100644 --- a/tests/test_layers/test_sequence/test_sequence.py +++ b/tests/test_layers/test_sequence/test_sequence.py @@ -1,9 +1,14 @@ #!/usr/bin/env python # -*- encoding: utf-8 -*- +import pytest +import torch +import torch.multiprocessing as mp from colossalai.initialize import launch, get_default_parser from colossalai.logging import get_dist_logger -from test_layer import * +from checks_seq.check_layer_seq import * +from functools import partial + CONFIG = dict( parallel=dict( @@ -17,24 +22,28 @@ def check_layer(): check_selfattention() -def _test_main(): +def run_check_sequence(rank, world_size): # init dist - parser = get_default_parser() - args = parser.parse_args() launch(config=CONFIG, - rank=args.rank, - world_size=args.world_size, - host=args.host, - port=args.port, - backend=args.backend) + rank=rank, + world_size=world_size, + host='localhost', + port=29924, + backend='nccl') logger = get_dist_logger() logger.info('Distributed environment is initialzied.', ranks=[0]) - torch.backends.cudnn.benchmark = True - # check layers check_layer() + torch.cuda.empty_cache() + + +@pytest.mark.dist +def test_sequence(): + world_size = 4 + run_func = partial(run_check_sequence, world_size=world_size) + mp.spawn(run_func, nprocs=world_size) if __name__ == '__main__': - _test_main() + test_sequence() diff --git a/tests/test_trainer/configs/test_trainer_resnet.py 
b/tests/test_trainer/configs/test_trainer_resnet.py deleted file mode 100644 index bd69dc475..000000000 --- a/tests/test_trainer/configs/test_trainer_resnet.py +++ /dev/null @@ -1,19 +0,0 @@ -import os -from pathlib import Path - - -hooks = [ - dict(type='LogMetricByEpochHook'), - dict(type='AccuracyHook'), - dict(type='LossHook'), - dict(type='TensorboardHook', log_dir='./tfb_logs'), - dict( - type='LRSchedulerHook', - by_epoch=True, - lr_scheduler_cfg=dict( - type='CosineAnnealingLR', - warmup_steps=5 - ) - ), - dict(type='SaveCheckpointHook', interval=5, checkpoint_dir='./ckpt'), -] diff --git a/tests/test_trainer/configs/test_trainer_vit_2d.py b/tests/test_trainer/configs/test_trainer_vit_2d.py deleted file mode 100644 index 1769f4afe..000000000 --- a/tests/test_trainer/configs/test_trainer_vit_2d.py +++ /dev/null @@ -1,133 +0,0 @@ -import os -from pathlib import Path - -from colossalai.engine import AMP_TYPE - -BATCH_SIZE = 512 -IMG_SIZE = 32 -PATCH_SIZE = 4 -DIM = 512 -NUM_ATTENTION_HEADS = 8 -SUMMA_DIM = 2 -NUM_CLASSES = 10 -DEPTH = 6 -num_epochs = 60 - -train_data = dict( - dataset=dict(type='CIFAR10Dataset', - root=Path(os.environ['DATA']), - transform_pipeline=[ - dict(type='Resize', size=IMG_SIZE), - dict(type='RandomCrop', size=IMG_SIZE, padding=4), - dict(type='RandomHorizontalFlip'), - dict(type='ToTensor'), - dict(type='Normalize', - mean=[0.4914, 0.4822, 0.4465], - std=[0.2023, 0.1994, 0.2010]), - ]), - dataloader=dict( - batch_size=BATCH_SIZE, - pin_memory=True, - # num_workers=1, - shuffle=True, - )) - -test_data = dict( - dataset=dict(type='CIFAR10Dataset', - root=Path(os.environ['DATA']), - train=False, - transform_pipeline=[ - dict(type='Resize', size=IMG_SIZE), - dict(type='ToTensor'), - dict(type='Normalize', - mean=[0.4914, 0.4822, 0.4465], - std=[0.2023, 0.1994, 0.2010]), - ]), - dataloader=dict( - batch_size=400, - pin_memory=True, - # num_workers=1, - )) - -optimizer = dict(type='Adam', lr=0.001, weight_decay=0) - -loss = dict(type='CrossEntropyLoss2D', ) - -model = dict( - type='VisionTransformerFromConfig', - tensor_splitting_cfg=dict(type='ViTInputSplitter2D', ), - embedding_cfg=dict( - type='ViTPatchEmbedding2D', - img_size=IMG_SIZE, - patch_size=PATCH_SIZE, - embed_dim=DIM, - ), - token_fusion_cfg=dict(type='ViTTokenFuser2D', - img_size=IMG_SIZE, - patch_size=PATCH_SIZE, - embed_dim=DIM, - drop_rate=0.1), - norm_cfg=dict( - type='LayerNorm2D', - normalized_shape=DIM, - eps=1e-6, - ), - block_cfg=dict( - type='ViTBlock', - attention_cfg=dict( - type='ViTSelfAttention2D', - hidden_size=DIM, - num_attention_heads=NUM_ATTENTION_HEADS, - attention_dropout_prob=0., - hidden_dropout_prob=0.1, - ), - droppath_cfg=dict(type='VanillaViTDropPath', ), - mlp_cfg=dict(type='ViTMLP2D', - in_features=DIM, - dropout_prob=0.1, - mlp_ratio=1), - norm_cfg=dict( - type='LayerNorm2D', - normalized_shape=DIM, - eps=1e-6, - ), - ), - head_cfg=dict( - type='ViTHead2D', - hidden_size=DIM, - num_classes=NUM_CLASSES, - ), - embed_dim=DIM, - depth=DEPTH, - drop_path_rate=0., -) - -hooks = [ - dict(type='LogMetricByEpochHook'), - dict(type='LogTimingByEpochHook'), - dict(type='Accuracy2DHook'), - dict(type='LossHook'), - dict(type='TensorboardHook', log_dir='./tfb_logs'), - dict( - type='LRSchedulerHook', - by_epoch=True, - lr_scheduler_cfg=dict( - type='LinearWarmupLR', - warmup_steps=5 - ) - ), - dict(type='SaveCheckpointHook', interval=5, checkpoint_dir='./ckpt'), -] - -parallel = dict( - pipeline=dict(size=1), - tensor=dict(size=4, mode='2d'), -) - -fp16 = 
dict(mode=AMP_TYPE.PARALLEL, initial_scale=2 ** 8) - -engine = dict( - schedule=dict(num_microbatches=1) -) - -logging = dict(root_path='./logs') diff --git a/tests/test_trainer/test.sh b/tests/test_trainer/test.sh deleted file mode 100644 index fa0ae78d5..000000000 --- a/tests/test_trainer/test.sh +++ /dev/null @@ -1,4 +0,0 @@ -#!/usr/bin/env sh -test_file=$1 - -python $test_file --rank $SLURM_PROCID --world_size $SLURM_NPROCS --host $HOST --port 29500 diff --git a/tests/test_trainer/test_pipeline/debug_schedule.py b/tests/test_trainer/test_pipeline/debug_schedule.py deleted file mode 100644 index ea3799dfd..000000000 --- a/tests/test_trainer/test_pipeline/debug_schedule.py +++ /dev/null @@ -1,232 +0,0 @@ -# referenced from Megatron and used to testify communication -import os.path as osp - -import pytest -import torch -from torch.utils.data import DataLoader - -from colossalai.builder import ModelInitializer, build_dataset, build_optimizer, build_loss -from colossalai.communication import p2p as p2p_communication -from colossalai.communication.utils import send_tensor_meta, recv_tensor_meta -from colossalai.context.parallel_mode import ParallelMode -from colossalai.core import global_context as gpc -from colossalai.initialize import initialize -from colossalai.utils import print_rank_0, get_current_device - -NUM_BATCH = 128 -NUM_MICRO = 6 - - -def get_num_microbatches(): - return NUM_MICRO - - -def to_cuda(data): - if isinstance(data, (tuple, list)): - data = data[0].to(get_current_device()) - else: - data = data.to(get_current_device()) - return data - - -def step_func(loss): - def _step_func(input_tensor, model): - output = model(input_tensor) - if isinstance(output, (tuple, list)): - if len(output) > 1: - raise NotImplementedError("Multiple output!!!") - else: - output = output[0] - return output, loss - - return _step_func - - -def forward_step(forward_step_func, data_iterator, model, input_tensor, losses_reduced): - """Forward step for passed-in model. - If first stage, input tensor is obtained from data_iterator, otherwise - passed-in input_tensor is used. - Returns output tensor.""" - - if input_tensor is None: - data, label = data_iterator.next() - input_tensor = to_cuda(data) - - output_tensor, loss_func = forward_step_func(input_tensor, model) - if gpc.is_last_rank(ParallelMode.PIPELINE): - data, label = data_iterator.next() - label = to_cuda(label) - output_tensor = loss_func(output_tensor, label) / get_num_microbatches() - losses_reduced.append(output_tensor) - - return output_tensor - - -def backward_step(optimizer, input_tensor, output_tensor, output_tensor_grad): - """Backward step through passed-in output tensor. - If last stage, output_tensor_grad is None, otherwise gradient of loss - with respect to stage's output tensor. - Returns gradient of loss with respect to input tensor (None if first - stage).""" - - # Retain the grad on the input_tensor. - if input_tensor is not None: - input_tensor.retain_grad() - - # Backward pass. - torch.autograd.backward(output_tensor, grad_tensors=output_tensor_grad) - - # Collect the grad of the input_tensor. - input_tensor_grad = None - if input_tensor is not None: - input_tensor_grad = input_tensor.grad - - return input_tensor_grad - - -def forward_backward_pipelining_without_interleaving(forward_step_func, data_iterator, - model, optimizer, forward_only): - """Run non-interleaved 1F1B schedule, with communication between pipeline - stages. 
- Returns dictionary with losses if the last stage, empty dict otherwise.""" - - # Compute number of warmup microbatches. - num_microbatches = get_num_microbatches() - num_warmup_microbatches = \ - (gpc.get_world_size(ParallelMode.PIPELINE) - - gpc.get_local_rank(ParallelMode.PIPELINE) - 1) - num_warmup_microbatches = min( - num_warmup_microbatches, - num_microbatches) - num_microbatches_remaining = \ - num_microbatches - num_warmup_microbatches - - # Input, output tensors only need to be saved when doing backward passes - input_tensors = None - output_tensors = None - if not forward_only: - input_tensors = [] - output_tensors = [] - losses_reduced = [] - - # Used for tensor meta information communication - ft_shape = None - bt_shape = None - fs_checker = True - - # Run warmup forward passes. - for i in range(num_warmup_microbatches): - if not gpc.is_first_rank(ParallelMode.PIPELINE): - ft_shape = recv_tensor_meta(ft_shape) - input_tensor = p2p_communication.recv_forward(ft_shape) - output_tensor = forward_step(forward_step_func, data_iterator, model, - input_tensor, losses_reduced) - if not gpc.is_last_rank(ParallelMode.PIPELINE): - bt_shape = output_tensor.shape - fs_checker = send_tensor_meta(output_tensor, fs_checker) - p2p_communication.send_forward(output_tensor) - - if not forward_only: - input_tensors.append(input_tensor) - output_tensors.append(output_tensor) - - # Before running 1F1B, need to receive first forward tensor. - # If all microbatches are run in warmup / cooldown phase, then no need to - # receive this tensor here. - if num_microbatches_remaining > 0: - if not gpc.is_first_rank(ParallelMode.PIPELINE): - ft_shape = recv_tensor_meta(ft_shape) - input_tensor = p2p_communication.recv_forward(ft_shape) - - # Run 1F1B in steady state. - for i in range(num_microbatches_remaining): - last_iteration = (i == (num_microbatches_remaining - 1)) - - output_tensor = forward_step(forward_step_func, data_iterator, model, - input_tensor, losses_reduced) - if forward_only: - p2p_communication.send_forward(output_tensor) - - if not last_iteration: - input_tensor = p2p_communication.recv_forward(ft_shape) - - else: - output_tensor_grad = \ - p2p_communication.send_forward_recv_backward(output_tensor, bt_shape) - - # Add input_tensor and output_tensor to end of list. - input_tensors.append(input_tensor) - output_tensors.append(output_tensor) - - # Pop input_tensor and output_tensor from the start of the list for - # the backward pass. - input_tensor = input_tensors.pop(0) - output_tensor = output_tensors.pop(0) - - input_tensor_grad = \ - backward_step(optimizer, input_tensor, output_tensor, - output_tensor_grad) - - if last_iteration: - input_tensor = None - p2p_communication.send_backward(input_tensor_grad) - else: - input_tensor = \ - p2p_communication.send_backward_recv_forward(input_tensor_grad, ft_shape) - - # Run cooldown backward passes. 
- if not forward_only: - for i in range(num_warmup_microbatches): - input_tensor = input_tensors.pop(0) - output_tensor = output_tensors.pop(0) - - output_tensor_grad = p2p_communication.recv_backward(bt_shape) - - input_tensor_grad = \ - backward_step(optimizer, input_tensor, output_tensor, - output_tensor_grad) - - p2p_communication.send_backward(input_tensor_grad) - - return losses_reduced - - -DIR_PATH = osp.dirname(osp.realpath(__file__)) -CONFIG_PATH = osp.join(DIR_PATH, '../configs/pipeline_vanilla_vit.py') - - -@pytest.mark.skip(reason="This is only for debugging purpose, please ignore this test") -@pytest.mark.dist -def test_schedule(): - initialize(CONFIG_PATH) - - # build model - model = ModelInitializer(gpc.config.model, 1).model_initialize() - print_rank_0('model is created') - - # keep the same sampler for all process - torch.manual_seed(1331) - - dataset = build_dataset(gpc.config.data.dataset) - dataloader = DataLoader(dataset=dataset, **gpc.config.data.dataloader) - print_rank_0('train data is created') - - # build optimizer and loss - optim = build_optimizer(gpc.config.optimizer, model) - loss = build_loss(gpc.config.loss) - print_rank_0('optim and loss is created') - - forward_backward_pipelining_without_interleaving( - step_func(loss), - iter(dataloader), - model, - optim, - False - ) - - gpc.destroy() - print_rank_0('training finished') - - -if __name__ == '__main__': - test_schedule() diff --git a/tests/test_trainer/test_pipeline/model/__init__.py b/tests/test_trainer/test_pipeline/model/__init__.py new file mode 100644 index 000000000..2bf880f41 --- /dev/null +++ b/tests/test_trainer/test_pipeline/model/__init__.py @@ -0,0 +1,2 @@ +from .layers import * +from .resnet import VanillaResNet diff --git a/tests/test_trainer/test_pipeline/model/layers/__init__.py b/tests/test_trainer/test_pipeline/model/layers/__init__.py new file mode 100644 index 000000000..aa553b737 --- /dev/null +++ b/tests/test_trainer/test_pipeline/model/layers/__init__.py @@ -0,0 +1,3 @@ +from .basic_block import ResNetBasicBlock +from .bottleneck import ResNetBottleneck +from .reslayer import ResLayer \ No newline at end of file diff --git a/tests/test_trainer/test_pipeline/model/layers/basic_block.py b/tests/test_trainer/test_pipeline/model/layers/basic_block.py new file mode 100644 index 000000000..320dac2fd --- /dev/null +++ b/tests/test_trainer/test_pipeline/model/layers/basic_block.py @@ -0,0 +1,64 @@ +#!/usr/bin/env python +# -*- encoding: utf-8 -*- + +from typing import Optional, Callable + +import torch.nn as nn +from torch import Tensor + +from colossalai.registry import LAYERS +from .conv import conv3x3 + + +@LAYERS.register_module +class ResNetBasicBlock(nn.Module): + """Basic ResNet block + """ + expansion: int = 1 + + def __init__( + self, + inplanes: int, + planes: int, + stride: int = 1, + downsample: Optional[nn.Module] = None, + groups: int = 1, + base_width: int = 64, + dilation: int = 1, + norm_layer: Optional[Callable[..., nn.Module]] = None + ) -> None: + super().__init__() + if norm_layer is None: + norm_layer = nn.BatchNorm2d + if groups != 1 or base_width != 64: + raise ValueError( + 'BasicBlock only supports groups=1 and base_width=64') + if dilation > 1: + raise NotImplementedError( + "Dilation > 1 not supported in BasicBlock") + # Both self.conv1 and self.downsample layers downsample the input when stride != 1 + self.conv1 = conv3x3(inplanes, planes, stride) + self.bn1 = norm_layer(planes) + self.relu = nn.ReLU(inplace=True) + self.conv2 = conv3x3(planes, planes) + 
self.bn2 = norm_layer(planes) + self.downsample = downsample + self.stride = stride + + def forward(self, x: Tensor) -> Tensor: + identity = x + + out = self.conv1(x) + out = self.bn1(out) + out = self.relu(out) + + out = self.conv2(out) + out = self.bn2(out) + + if self.downsample is not None: + identity = self.downsample(x) + + out += identity + out = self.relu(out) + + return out diff --git a/tests/test_trainer/test_pipeline/model/layers/bottleneck.py b/tests/test_trainer/test_pipeline/model/layers/bottleneck.py new file mode 100644 index 000000000..d75f9534b --- /dev/null +++ b/tests/test_trainer/test_pipeline/model/layers/bottleneck.py @@ -0,0 +1,69 @@ +#!/usr/bin/env python +# -*- encoding: utf-8 -*- + +from typing import Optional, Callable + +import torch.nn as nn +from torch import Tensor + +from colossalai.registry import LAYERS +from .conv import conv3x3, conv1x1 + + +@LAYERS.register_module +class ResNetBottleneck(nn.Module): + # Bottleneck in torchvision places the stride for downsampling at 3x3 convolution(self.conv2) + # while original implementation places the stride at the first 1x1 convolution(self.conv1) + # according to "Deep residual learning for image recognition"https://arxiv.org/abs/1512.03385. + # This variant is also known as ResNet V1.5 and improves accuracy according to + # https://ngc.nvidia.com/catalog/model-scripts/nvidia:resnet_50_v1_5_for_pytorch. + + expansion: int = 4 + + def __init__( + self, + inplanes: int, + planes: int, + stride: int = 1, + downsample: Optional[nn.Module] = None, + groups: int = 1, + base_width: int = 64, + dilation: int = 1, + norm_layer: Optional[Callable[..., nn.Module]] = None + ) -> None: + super().__init__() + if norm_layer is None: + norm_layer = nn.BatchNorm2d + width = int(planes * (base_width / 64.)) * groups + # Both self.conv2 and self.downsample layers downsample the input when stride != 1 + self.conv1 = conv1x1(inplanes, width) + self.bn1 = norm_layer(width) + self.conv2 = conv3x3(width, width, stride, groups, dilation) + self.bn2 = norm_layer(width) + self.conv3 = conv1x1(width, planes * self.expansion) + self.bn3 = norm_layer(planes * self.expansion) + self.relu = nn.ReLU(inplace=True) + self.downsample = downsample + self.stride = stride + + def forward(self, x: Tensor) -> Tensor: + identity = x + + out = self.conv1(x) + out = self.bn1(out) + out = self.relu(out) + + out = self.conv2(out) + out = self.bn2(out) + out = self.relu(out) + + out = self.conv3(out) + out = self.bn3(out) + + if self.downsample is not None: + identity = self.downsample(x) + + out += identity + out = self.relu(out) + + return out diff --git a/tests/test_trainer/test_pipeline/model/layers/conv.py b/tests/test_trainer/test_pipeline/model/layers/conv.py new file mode 100644 index 000000000..c918d94c4 --- /dev/null +++ b/tests/test_trainer/test_pipeline/model/layers/conv.py @@ -0,0 +1,15 @@ +#!/usr/bin/env python +# -*- encoding: utf-8 -*- + +import torch.nn as nn + + +def conv3x3(in_planes: int, out_planes: int, stride: int = 1, groups: int = 1, dilation: int = 1) -> nn.Conv2d: + """3x3 convolution with padding""" + return nn.Conv2d(in_planes, out_planes, kernel_size=3, stride=stride, + padding=dilation, groups=groups, bias=False, dilation=dilation) + + +def conv1x1(in_planes: int, out_planes: int, stride: int = 1) -> nn.Conv2d: + """1x1 convolution""" + return nn.Conv2d(in_planes, out_planes, kernel_size=1, stride=stride, bias=False) diff --git a/tests/test_trainer/test_pipeline/model/layers/reslayer.py 
b/tests/test_trainer/test_pipeline/model/layers/reslayer.py new file mode 100644 index 000000000..4e1b48c5e --- /dev/null +++ b/tests/test_trainer/test_pipeline/model/layers/reslayer.py @@ -0,0 +1,63 @@ +#!/usr/bin/env python +# -*- encoding: utf-8 -*- + +import torch.nn as nn + +from colossalai.registry import LAYERS +from .conv import conv1x1 + + +@LAYERS.register_module +class ResLayer(nn.Module): + + def __init__(self, + block_type: str, + norm_layer_type: str, + inplanes: int, + planes: int, + blocks: int, + groups: int, + base_width: int, + stride: int = 1, + dilation: int = 1, + dilate: bool = False, + ): + super().__init__() + self.block = LAYERS.get_module(block_type) + self.norm_layer = LAYERS.get_module(norm_layer_type) + self.inplanes = inplanes + self.planes = planes + self.blocks = blocks + self.groups = groups + self.dilation = dilation + self.base_width = base_width + self.dilate = dilate + self.stride = stride + self.layer = self._make_layer() + + def _make_layer(self): + norm_layer = self.norm_layer + downsample = None + previous_dilation = self.dilation + if self.dilate: + self.dilation *= self.stride + self.stride = 1 + if self.stride != 1 or self.inplanes != self.planes * self.block.expansion: + downsample = nn.Sequential( + conv1x1(self.inplanes, self.planes * self.block.expansion, self.stride), + norm_layer(self.planes * self.block.expansion), + ) + + layers = [] + layers.append(self.block(self.inplanes, self.planes, self.stride, downsample, self.groups, + self.base_width, previous_dilation, norm_layer)) + self.inplanes = self.planes * self.block.expansion + for _ in range(1, self.blocks): + layers.append(self.block(self.inplanes, self.planes, groups=self.groups, + base_width=self.base_width, dilation=self.dilation, + norm_layer=norm_layer)) + + return nn.Sequential(*layers) + + def forward(self, x): + return self.layer(x) diff --git a/tests/test_trainer/test_pipeline/model/resnet.py b/tests/test_trainer/test_pipeline/model/resnet.py new file mode 100644 index 000000000..ffb158ecc --- /dev/null +++ b/tests/test_trainer/test_pipeline/model/resnet.py @@ -0,0 +1,163 @@ +#!/usr/bin/env python +# -*- encoding: utf-8 -*- + +from typing import List, Optional + +import torch +import torch.nn as nn +from torch import Tensor + +from colossalai.registry import LAYERS +from colossalai.registry import MODELS +from colossalai.nn.model import ModelFromConfig + + +@MODELS.register_module +class VanillaResNet(ModelFromConfig): + """ResNet from + `"Deep Residual Learning for Image Recognition" `_. 
+ """ + + def __init__( + self, + num_cls: int, + block_type: str, + layers: List[int], + norm_layer_type: str = 'BatchNorm2d', + in_channels: int = 3, + groups: int = 1, + width_per_group: int = 64, + zero_init_residual: bool = False, + replace_stride_with_dilation: Optional[List[bool]] = None, + dilations=(1, 1, 1, 1) + ) -> None: + super().__init__() + + self.inplanes = 64 + self.zero_init_residual = zero_init_residual + self.blocks = layers + self.block_expansion = LAYERS.get_module(block_type).expansion + self.dilations = dilations + self.reslayer_common_cfg = dict( + type='ResLayer', + block_type=block_type, + norm_layer_type=norm_layer_type, + groups=groups, + base_width=width_per_group + ) + + if replace_stride_with_dilation is None: + # each element in the tuple indicates if we should replace + # the 2x2 stride with a dilated convolution instead + replace_stride_with_dilation = [False, False, False] + + if len(replace_stride_with_dilation) != 3: + raise ValueError("replace_stride_with_dilation should be None " + "or a 3-element tuple, got {}".format(replace_stride_with_dilation)) + + self.layers_cfg = [ + # conv1 + dict(type='Conv2d', + in_channels=in_channels, + out_channels=self.inplanes, + kernel_size=7, + stride=2, + padding=3, + bias=False), + # bn1 + dict( + type=norm_layer_type, + num_features=self.inplanes + ), + # relu + dict( + type='ReLU', + inplace=True + ), + # maxpool + dict( + type='MaxPool2d', + kernel_size=3, + stride=2, + padding=1 + ), + # layer 1 + dict( + inplanes=self.inplanes, + planes=64, + blocks=self.blocks[0], + dilation=self.dilations[0], + **self.reslayer_common_cfg + ), + # layer 2 + dict( + inplanes=64 * self.block_expansion, + planes=128, + blocks=self.blocks[1], + stride=2, + dilate=replace_stride_with_dilation[0], + dilation=self.dilations[1], + **self.reslayer_common_cfg + ), + # layer 3 + dict( + inplanes=128 * self.block_expansion, + planes=256, + blocks=layers[2], + stride=2, + dilate=replace_stride_with_dilation[1], + dilation=self.dilations[2], + **self.reslayer_common_cfg + ), + # layer 4 + dict( + inplanes=256 * self.block_expansion, + planes=512, + blocks=layers[3], stride=2, + dilate=replace_stride_with_dilation[2], + dilation=self.dilations[3], + **self.reslayer_common_cfg + ), + # avg pool + dict( + type='AdaptiveAvgPool2d', + output_size=(1, 1) + ), + # flatten + dict( + type='LambdaWrapper', + func=lambda mod, x: torch.flatten(x, 1) + ), + # linear + dict( + type='Linear', + in_features=512 * self.block_expansion, + out_features=num_cls + ) + ] + + def forward(self, x: Tensor): + for layer in self.layers: + x = layer(x) + return x, + + def init_weights(self): + for m in self.modules(): + if isinstance(m, nn.Conv2d): + nn.init.kaiming_normal_( + m.weight, mode='fan_out', nonlinearity='relu') + elif isinstance(m, (nn.BatchNorm2d, nn.GroupNorm)): + nn.init.constant_(m.weight, 1) + nn.init.constant_(m.bias, 0) + + # Zero-initialize the last BN in each residual branch, + # so that the residual branch starts with zeros, and each residual block behaves like an identity. 
+ # This improves the model by 0.2~0.3% according to https://arxiv.org/abs/1706.02677 + if self.zero_init_residual: + for m in self.modules(): + if isinstance(m, LAYERS.get_module('ResNetBottleneck')): + # type: ignore[arg-type] + nn.init.constant_(m.bn3.weight, 0) + elif isinstance(m, LAYERS.get_module('ResNetBasicBlock')): + # type: ignore[arg-type] + nn.init.constant_(m.bn2.weight, 0) diff --git a/tests/test_trainer/test_pipeline/resnet_config.py b/tests/test_trainer/test_pipeline/resnet_config.py new file mode 100644 index 000000000..b0bcc3860 --- /dev/null +++ b/tests/test_trainer/test_pipeline/resnet_config.py @@ -0,0 +1,19 @@ +import os +from pathlib import Path + +BATCH_SIZE = 128 +IMG_SIZE = 224 +DIM = 768 +NUM_CLASSES = 10 +NUM_ATTN_HEADS = 12 + +# resnet 18 +model = dict(type='VanillaResNet', + block_type='ResNetBasicBlock', + layers=[2, 2, 2, 2], + num_cls=10) + +parallel = dict( + pipeline=dict(size=4), + tensor=dict(size=1, mode=None) +) diff --git a/tests/test_trainer/test_pipeline/test_p2p.py b/tests/test_trainer/test_pipeline/test_p2p.py index 39cfa1003..ce60955ae 100644 --- a/tests/test_trainer/test_pipeline/test_p2p.py +++ b/tests/test_trainer/test_pipeline/test_p2p.py @@ -4,6 +4,7 @@ import pytest import torch import torch.distributed as dist +import torch.multiprocessing as mp from colossalai.communication import (recv_backward, recv_forward, recv_tensor_meta, send_backward, @@ -12,13 +13,14 @@ from colossalai.communication import (recv_backward, recv_forward, send_tensor_meta) from colossalai.context.parallel_mode import ParallelMode from colossalai.core import global_context as gpc -from colossalai.initialize import init_dist, parse_args +from colossalai.initialize import launch from colossalai.logging import get_dist_logger from colossalai.utils import get_current_device +from functools import partial -BATCH_SIZE = 32 -SEQ_LENGTH = 128 -HIDDEN_SIZE = 512 +BATCH_SIZE = 16 +SEQ_LENGTH = 64 +HIDDEN_SIZE = 128 CONFIG = dict( parallel=dict( @@ -106,7 +108,7 @@ def check_op(size, rank, prev_rank, next_rank, up_group, down_group, logger): rank, check_equal(tensor, out))) -def test_comm(size, rank, prev_rank, next_rank, up_group, down_group, logger): +def check_comm(size, rank, prev_rank, next_rank, up_group, down_group, logger): dtype = torch.float32 device = get_current_device() tensor_shape = (BATCH_SIZE, SEQ_LENGTH, HIDDEN_SIZE) @@ -121,13 +123,15 @@ def test_comm(size, rank, prev_rank, next_rank, up_group, down_group, logger): check_forward_backward(tensor, grad, rank, logger) -@pytest.mark.skip("This test should be invoked using the test.sh provided") -@pytest.mark.dist -def test_main(): - args = parse_args() - world_size = args.world_size - - init_dist(CONFIG) +def run_check(rank, world_size): + launch( + config=CONFIG, + rank=rank, + world_size=world_size, + host='localhost', + port=29932, + backend='nccl' + ) logger = get_dist_logger() rank = gpc.get_global_rank() prev_rank = gpc.get_prev_global_rank(ParallelMode.PIPELINE) @@ -141,9 +145,18 @@ def test_main(): rank, prev_rank, up_ranks, next_rank, down_ranks)) logger.info('Distributed environment is initialzied.') - test_comm(world_size, rank, prev_rank, next_rank, up_group, down_group, - logger) + check_comm(world_size, rank, prev_rank, next_rank, up_group, down_group, + logger) + gpc.destroy() + torch.cuda.empty_cache() + + +@pytest.mark.dist +def test_p2p(): + world_size = 4 + run_func = partial(run_check, world_size=world_size) + mp.spawn(run_func, nprocs=world_size) if __name__ == '__main__': - test_main() + 
test_p2p() diff --git a/tests/test_trainer/test_pipeline/test_partition.py b/tests/test_trainer/test_pipeline/test_partition.py index d3c811657..9335a197b 100644 --- a/tests/test_trainer/test_pipeline/test_partition.py +++ b/tests/test_trainer/test_pipeline/test_partition.py @@ -2,35 +2,46 @@ import os.path as osp import pytest import torch +import torch.multiprocessing as mp from torch.utils.data import DataLoader -from colossalai.builder import build_dataset, ModelInitializer +from colossalai.builder.pipeline import PipelineModelInitializer from colossalai.core import global_context -from colossalai.initialize import init_dist +from colossalai.initialize import launch from colossalai.logging import get_dist_logger +from functools import partial +import model DIR_PATH = osp.dirname(osp.realpath(__file__)) -CONFIG_PATH = osp.join(DIR_PATH, '../configs/pipeline_vanilla_resnet.py') +CONFIG_PATH = osp.join(DIR_PATH, 'resnet_config.py') -@pytest.mark.skip("This test should be invoked using the test.sh provided") -@pytest.mark.dist -def test_partition(): - init_dist(CONFIG_PATH) +def run_partition(rank, world_size): + launch(config=CONFIG_PATH, + rank=rank, + world_size=world_size, + host='localhost', + port=29933, + backend='nccl' + ) logger = get_dist_logger() logger.info('finished initialization') # build model - model = ModelInitializer(global_context.config.model, 1, verbose=True).model_initialize() + model = PipelineModelInitializer(global_context.config.model, 1, verbose=True).initialize() + assert isinstance(model, torch.nn.Module) logger.info('model is created') - dataset = build_dataset(global_context.config.train_data.dataset) - dataloader = DataLoader(dataset=dataset, **global_context.config.train_data.dataloader) - logger.info('train data is created') - global_context.destroy() - torch.cuda.synchronize() logger.info('training finished') + torch.cuda.empty_cache() + + +@pytest.mark.dist +def test_partition(): + world_size = 4 + run_func = partial(run_partition, world_size=world_size) + mp.spawn(run_func, nprocs=world_size) if __name__ == '__main__': diff --git a/tests/test_trainer/test_pipeline/test_pipeline_schedule.py b/tests/test_trainer/test_pipeline/test_pipeline_schedule.py new file mode 100644 index 000000000..5247fe7f0 --- /dev/null +++ b/tests/test_trainer/test_pipeline/test_pipeline_schedule.py @@ -0,0 +1,94 @@ +# referenced from Megatron and used to testify communication + +import colossalai +import os +import os.path as osp +import pytest +import torch +import torch.multiprocessing as mp +import model + +from colossalai.builder import PipelineModelInitializer +from colossalai.communication import p2p as p2p_communication +from colossalai.communication.utils import send_tensor_meta, recv_tensor_meta +from colossalai.context.parallel_mode import ParallelMode +from colossalai.core import global_context as gpc +from colossalai.initialize import launch +from colossalai.utils import print_rank_0, get_current_device, get_dataloader +from colossalai.engine.schedule import PipelineSchedule +from torchvision.datasets import CIFAR10 +from torchvision import transforms +from pathlib import Path +from functools import partial + + +BATCH_SIZE = 32 +NUM_MICRO = 8 + + +DIR_PATH = osp.dirname(osp.realpath(__file__)) +CONFIG_PATH = osp.join(DIR_PATH, './resnet_config.py') + + +def run_schedule(rank, world_size): + launch(config=CONFIG_PATH, + rank=rank, + world_size=world_size, + host='localhost', + port=29934, + backend='nccl') + + # build model + model = 
PipelineModelInitializer(gpc.config.model, 1).initialize() + print_rank_0('model is created') + + train_dataset = CIFAR10( + root=Path(os.environ['DATA']), + download=True, + transform=transforms.Compose( + [ + transforms.RandomCrop(size=32, padding=4), + transforms.RandomHorizontalFlip(), + transforms.ToTensor(), + transforms.Normalize(mean=[0.4914, 0.4822, 0.4465], std=[ + 0.2023, 0.1994, 0.2010]), + ] + ) + ) + + train_dataloader = get_dataloader(dataset=train_dataset, + shuffle=True, + add_sampler=True, + batch_size=BATCH_SIZE, + pin_memory=True, + ) + + # build criterion + criterion = torch.nn.CrossEntropyLoss() + + # optimizer + optimizer = torch.optim.Adam(model.parameters(), lr=0.001, weight_decay=0) + + # initialize + engine, train_dataloader, _, _ = colossalai.initialize(model, optimizer, criterion, train_dataloader) + + # build pipeline schedule + schedule = PipelineSchedule(num_microbatches=NUM_MICRO) + + # run schedule + data_iter = iter(train_dataloader) + schedule.forward_backward_step(engine, data_iter) + + gpc.destroy() + torch.cuda.empty_cache() + + +@pytest.mark.dist +def test_pipeline_schedule(): + world_size = 4 + run_func = partial(run_schedule, world_size=world_size) + mp.spawn(run_func, nprocs=world_size) + + +if __name__ == '__main__': + test_pipeline_schedule() diff --git a/tests/test_trainer/test_pipeline/test_schedule.py b/tests/test_trainer/test_pipeline/test_schedule.py deleted file mode 100644 index 7e2f32017..000000000 --- a/tests/test_trainer/test_pipeline/test_schedule.py +++ /dev/null @@ -1,51 +0,0 @@ -#!/usr/bin/env python -# -*- encoding: utf-8 -*- - -import os.path as osp - -import pytest - -from colossalai.context import ParallelMode -from colossalai.core import global_context as gpc -from colossalai.initialize import initialize -from colossalai.logging import get_dist_logger - -NUM_BATCH = 128 - -BATCH_SIZE = 32 -SEQ_LENGTH = 128 -HIDDEN_SIZE = 512 - -DIR_PATH = osp.dirname(osp.realpath(__file__)) -CONFIG_PATH = osp.join(DIR_PATH, '../configs/pipeline_vanilla_resnet.py') - - -@pytest.mark.skip("This test should be invoked using the test.sh provided") -@pytest.mark.dist -def test_schedule(): - engine, train_dataloader, test_dataloader = initialize(CONFIG_PATH) - logger = get_dist_logger() - - model = engine.model - optimizer = engine.optimizer - criterion = engine.criterion - schedule = engine._schedule - - output, label, loss = schedule.forward_backward_step( - data_iter=iter(train_dataloader), - model=model, - optimizer=optimizer, - criterion=criterion, - forward_only=False - ) - schedule.optimizer_step(model, optimizer) - - if gpc.is_last_rank(ParallelMode.PIPELINE): - logger.info('losses: {}'.format(loss)) - - gpc.destroy() - logger.info('training finished') - - -if __name__ == '__main__': - test_schedule() diff --git a/tests/test_trainer/test_trainer_with_non_pipe_schedule.py b/tests/test_trainer/test_trainer_with_non_pipe_schedule.py index 170f38087..ff9d334e4 100644 --- a/tests/test_trainer/test_trainer_with_non_pipe_schedule.py +++ b/tests/test_trainer/test_trainer_with_non_pipe_schedule.py @@ -1,20 +1,23 @@ import colossalai import os -from colossalai.amp.amp_type import AMP_TYPE +import pytest +import torch import torch.nn as nn +import torch.multiprocessing as mp from pathlib import Path from torchvision import transforms from torch.optim import Adam -from colossalai.initialize import get_default_parser +from colossalai.amp.amp_type import AMP_TYPE from colossalai.core import global_context as gpc from colossalai.logging import 
get_dist_logger from colossalai.trainer import Trainer from colossalai.utils import get_dataloader from torchvision.models import resnet18 from torchvision.datasets import CIFAR10 +from functools import partial -BATCH_SIZE = 128 +BATCH_SIZE = 16 IMG_SIZE = 32 NUM_EPOCHS = 200 @@ -26,16 +29,14 @@ CONFIG = dict( ) -def test_trainer(): - parser = get_default_parser() - args = parser.parse_args() +def run_trainer_no_pipeline(rank, world_size): colossalai.launch( config=CONFIG, - rank=args.rank, - world_size=args.world_size, - host=args.host, - port=args.port, - backend=args.backend + rank=rank, + world_size=world_size, + host='localhost', + port=29930, + backend='nccl' ) # build model @@ -70,13 +71,11 @@ def test_trainer(): train_dataloader = get_dataloader(dataset=train_dataset, shuffle=True, batch_size=BATCH_SIZE, - num_workers=1, pin_memory=True, drop_last=True) test_dataloader = get_dataloader(dataset=test_dataset, batch_size=BATCH_SIZE, - num_workers=1, pin_memory=True, drop_last=True) @@ -107,7 +106,16 @@ def test_trainer(): display_progress=True, test_interval=5 ) + gpc.destroy() + torch.cuda.empty_cache() + + +@pytest.mark.dist +def test_trainer_no_pipeline(): + world_size = 4 + run_func = partial(run_trainer_no_pipeline, world_size=world_size) + mp.spawn(run_func, nprocs=world_size) if __name__ == '__main__': - test_trainer() + test_trainer_no_pipeline() diff --git a/tests/test_trainer/test_trainer_with_pipe_schedule.py b/tests/test_trainer/test_trainer_with_pipe_schedule.py index 63a22f6ec..b43f14585 100644 --- a/tests/test_trainer/test_trainer_with_pipe_schedule.py +++ b/tests/test_trainer/test_trainer_with_pipe_schedule.py @@ -1,14 +1,14 @@ import colossalai import os +import pytest import torch -from colossalai.amp.amp_type import AMP_TYPE -from colossalai.context.parallel_mode import ParallelMode import torch.nn as nn +import torch.multiprocessing as mp from pathlib import Path from torchvision import transforms from torch.optim import Adam -from colossalai.initialize import get_default_parser +from colossalai.context.parallel_mode import ParallelMode from colossalai.core import global_context as gpc from colossalai.logging import get_dist_logger from colossalai.trainer import Trainer @@ -16,8 +16,10 @@ from colossalai.utils import get_dataloader from colossalai.engine.schedule import PipelineSchedule from torchvision.models import resnet18 from torchvision.datasets import CIFAR10 +from functools import partial -BATCH_SIZE = 32 + +BATCH_SIZE = 16 IMG_SIZE = 32 NUM_EPOCHS = 200 @@ -25,23 +27,17 @@ CONFIG = dict( parallel=dict( pipeline=2, ), - # Config - fp16=dict( - mode=AMP_TYPE.TORCH - ) ) -def test_trainer(): - parser = get_default_parser() - args = parser.parse_args() +def run_trainer_with_pipeline(rank, world_size): colossalai.launch( config=CONFIG, - rank=args.rank, - world_size=args.world_size, - host=args.host, - port=args.port, - backend=args.backend + rank=rank, + world_size=world_size, + host='localhost', + port=29931, + backend='nccl' ) # build model @@ -101,13 +97,11 @@ def test_trainer(): train_dataloader = get_dataloader(dataset=train_dataset, shuffle=True, batch_size=BATCH_SIZE, - num_workers=1, pin_memory=True, drop_last=True) test_dataloader = get_dataloader(dataset=test_dataset, batch_size=BATCH_SIZE, - num_workers=1, pin_memory=True, drop_last=True) @@ -140,7 +134,16 @@ def test_trainer(): display_progress=True, test_interval=5 ) + gpc.destroy() + torch.cuda.empty_cache() + + +@pytest.mark.dist +def test_trainer_with_pipeline(): + world_size = 4 + run_func = 
partial(run_trainer_with_pipeline, world_size=world_size) + mp.spawn(run_func, nprocs=world_size) if __name__ == '__main__': - test_trainer() + test_trainer_with_pipeline() diff --git a/tests/test_utils/test_activation_checkpointing.py b/tests/test_utils/test_activation_checkpointing.py index 667b7c337..1adc548fb 100644 --- a/tests/test_utils/test_activation_checkpointing.py +++ b/tests/test_utils/test_activation_checkpointing.py @@ -54,6 +54,7 @@ def test_activation_checkpointing(): loss.backward() assert torch.all(data.grad == data_.grad), 'Gradient of the input does not match' + torch.cuda.empty_cache() if __name__ == '__main__': diff --git a/tests/test_utils/test_gradient_accumluation.py b/tests/test_utils/test_gradient_accumluation.py index 4f7ccd09b..6a709f7db 100644 --- a/tests/test_utils/test_gradient_accumluation.py +++ b/tests/test_utils/test_gradient_accumluation.py @@ -104,13 +104,14 @@ def run_no_pipeline(rank, world_size): 'param should be the same in the first few iterations and only changed in the last iteration' gpc.destroy() + torch.cuda.empty_cache() -@pytest.mark.skip("This test should be invoked using the test.sh provided") @pytest.mark.dist def test_engine(): - func = partial(run_no_pipeline, world_size=4) - mp.spawn(func, nprocs=4) + world_size = 4 + func = partial(run_no_pipeline, world_size=world_size) + mp.spawn(func, nprocs=world_size) if __name__ == '__main__': diff --git a/tests/test_zero_data_parallel/config.py b/tests/test_zero_data_parallel/config.py deleted file mode 100644 index 8e263505b..000000000 --- a/tests/test_zero_data_parallel/config.py +++ /dev/null @@ -1,4 +0,0 @@ -#!/usr/bin/env python -# -*- encoding: utf-8 -*- -import os -from pathlib import Path diff --git a/tests/test_zero_data_parallel/test_zero.sh b/tests/test_zero_data_parallel/test_zero.sh deleted file mode 100644 index c1effa2d1..000000000 --- a/tests/test_zero_data_parallel/test_zero.sh +++ /dev/null @@ -1,4 +0,0 @@ -#!/bin/bash - -test_file="test_zero.py" -python $test_file --rank $SLURM_PROCID --world_size $SLURM_NPROCS --host $HOST --port 29500 \ No newline at end of file diff --git a/tests/test_zero_data_parallel/test_zero_level_2.py b/tests/test_zero_data_parallel/test_zero_level_2.py new file mode 100644 index 000000000..5da282255 --- /dev/null +++ b/tests/test_zero_data_parallel/test_zero_level_2.py @@ -0,0 +1,102 @@ +#!/usr/bin/env python +# -*- encoding: utf-8 -*- + +import os +import pytest +import torch +import torch.multiprocessing as mp +from pathlib import Path + +import colossalai +from colossalai.core import global_context as gpc +from colossalai.utils import get_dataloader +from torchvision import transforms +from torchvision.models import resnet18 +from torchvision.datasets import CIFAR10 +from functools import partial + +BATCH_SIZE = 16 +IMG_SIZE = 224 + +CONFIG = dict( + fp16=dict( + mode=None, + ), + zero=dict( + level=2, + cpu_offload=True, + verbose=False, + ), + parallel=dict( + pipeline=dict(size=1), + tensor=dict(size=1, mode=None) + ) +) + + +def run_dist(rank, world_size): + colossalai.launch(config=CONFIG, + rank=rank, + world_size=world_size, + host='localhost', + port=29940, + backend='nccl') + + # build model + model = resnet18(num_classes=10) + + # build dataloader# build dataloaders + train_dataset = CIFAR10( + root=Path(os.environ['DATA']), + download=True, + transform=transforms.Compose( + [ + transforms.Resize(size=(IMG_SIZE, IMG_SIZE)), + transforms.ToTensor(), + transforms.Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5)) + ] + ) + ) + 
train_dataloader = get_dataloader(dataset=train_dataset, + shuffle=True, + batch_size=BATCH_SIZE, + pin_memory=True, + drop_last=True) + + # build optimizer and loss + # optimizer = build_optimizer(global_context.config.optimizer, model) + optimizer = torch.optim.Adam(model.parameters(), lr=0.001) + criterion = torch.nn.CrossEntropyLoss() + + engine, train_dataloader, *args = colossalai.initialize(model=model, + optimizer=optimizer, + criterion=criterion, + train_dataloader=train_dataloader) + + # train + model.train() + for idx, (data, label) in enumerate(train_dataloader): + engine.zero_grad() + data = data.cuda() + label = label.cuda() + + output = engine(data) + loss = engine.criterion(output, label) + + engine.backward(loss) + engine.step() + break + + gpc.destroy() + torch.cuda.empty_cache() + + +@pytest.mark.dist +def test_zero_level_2(): + world_size = 4 + run_func = partial(run_dist, world_size=world_size) + mp.spawn(run_func, nprocs=world_size) + + +if __name__ == '__main__': + test_zero_level_2() diff --git a/tests/test_zero_data_parallel/test_zero.py b/tests/test_zero_data_parallel/test_zero_level_3.py similarity index 74% rename from tests/test_zero_data_parallel/test_zero.py rename to tests/test_zero_data_parallel/test_zero_level_3.py index 6331a9a2b..e202b31e4 100644 --- a/tests/test_zero_data_parallel/test_zero.py +++ b/tests/test_zero_data_parallel/test_zero_level_3.py @@ -4,36 +4,25 @@ import os import pytest import torch - +import torch.multiprocessing as mp from pathlib import Path import colossalai -from colossalai.initialize import get_default_parser from colossalai.core import global_context as gpc from colossalai.utils import get_dataloader from torchvision import transforms from torchvision.models import resnet18 from torchvision.datasets import CIFAR10 +from functools import partial -BATCH_SIZE = 128 +BATCH_SIZE = 16 IMG_SIZE = 224 -NUM_CLS = 1000 CONFIG = dict( fp16=dict( mode=None, ), zero=dict( - # ============== - # level 2 config - # ============== - # level=2, - # cpu_offload=True, - # verbose=False, - - # ============== - # level 3 config - # ============== level=3, verbose=False, offload_optimizer_config=dict( @@ -57,16 +46,13 @@ CONFIG = dict( ) -def run_dist(): - parser = get_default_parser() - args = parser.parse_args() - +def run_dist(rank, world_size): colossalai.launch(config=CONFIG, - rank=args.rank, - world_size=args.world_size, - host=args.host, - port=args.port, - backend=args.backend) + rank=rank, + world_size=world_size, + host='localhost', + port=29941, + backend='nccl') # build model model = resnet18(num_classes=10) @@ -86,7 +72,6 @@ def run_dist(): train_dataloader = get_dataloader(dataset=train_dataset, shuffle=True, batch_size=BATCH_SIZE, - num_workers=1, pin_memory=True, drop_last=True) @@ -104,22 +89,27 @@ def run_dist(): model.train() for idx, (data, label) in enumerate(train_dataloader): engine.zero_grad() - data = data.cuda() + data = data.cuda().half() label = label.cuda() - output = engine(data) + output = engine(data).float() loss = engine.criterion(output, label) engine.backward(loss) engine.step() break + gpc.destroy() + torch.cuda.empty_cache() + -@pytest.mark.skip("This test should be invoked manually using the script provided") @pytest.mark.dist -def test_zero(): - run_dist() +@pytest.mark.skip("Level 3 has unknown bug so skip this test for now") +def test_zero_level_3(): + world_size = 4 + run_func = partial(run_dist, world_size=world_size) + mp.spawn(run_func, nprocs=world_size) if __name__ == '__main__': - test_zero() + 
test_zero_level_3() diff --git a/tests/test_zero_tensor_parallel/configs/vit_2d_zero2.py b/tests/test_zero_tensor_parallel/configs/vit_2d_zero2.py deleted file mode 100644 index 80c450a47..000000000 --- a/tests/test_zero_tensor_parallel/configs/vit_2d_zero2.py +++ /dev/null @@ -1,12 +0,0 @@ -parallel = dict( - pipeline=dict(size=1), - tensor=dict(size=4, mode='2d'), -) - -fp16 = dict( - mode=None, -) - -zero = dict( - level=2 -) diff --git a/tests/test_zero_tensor_parallel/configs/vit_2d_zero3.py b/tests/test_zero_tensor_parallel/configs/vit_2d_zero3.py deleted file mode 100644 index 58e026347..000000000 --- a/tests/test_zero_tensor_parallel/configs/vit_2d_zero3.py +++ /dev/null @@ -1,12 +0,0 @@ -parallel = dict( - pipeline=dict(size=1), - tensor=dict(size=4, mode='2d'), -) - -fp16 = dict( - mode=None, -) - -zero = dict( - level=3 -) diff --git a/tests/test_zero_tensor_parallel/test.sh b/tests/test_zero_tensor_parallel/test.sh deleted file mode 100644 index da5afd5ae..000000000 --- a/tests/test_zero_tensor_parallel/test.sh +++ /dev/null @@ -1,4 +0,0 @@ -#!/usr/bin/env sh -test_file=$1 - -python $test_file --rank $SLURM_PROCID --world_size $SLURM_NPROCS --host $HOST --port 29500 \ No newline at end of file diff --git a/tests/test_zero_tensor_parallel/test_vit_2d.py b/tests/test_zero_tensor_parallel/test_vit_2d_level_2.py similarity index 77% rename from tests/test_zero_tensor_parallel/test_vit_2d.py rename to tests/test_zero_tensor_parallel/test_vit_2d_level_2.py index ef77e9f2e..5b27d24e5 100644 --- a/tests/test_zero_tensor_parallel/test_vit_2d.py +++ b/tests/test_zero_tensor_parallel/test_vit_2d_level_2.py @@ -6,12 +6,11 @@ from pathlib import Path import pytest import torch.autograd +import torch.multiprocessing as mp import colossalai import torch -from colossalai.initialize import get_default_parser from colossalai.builder import build_model -from colossalai.context.parallel_mode import ParallelMode from colossalai.core import global_context as gpc from colossalai.logging import get_dist_logger from colossalai.utils import get_dataloader @@ -20,9 +19,20 @@ from colossalai.nn import CrossEntropyLoss2D from torchvision import transforms from torchvision.datasets import CIFAR10 from components import * +from functools import partial -level = os.environ['LEVEL'] -CONFIG_PATH = Path(__file__).parent.parent.joinpath(f'configs/vit_2d_zero{level}.py') +CONFIG = dict( + parallel=dict( + pipeline=dict(size=1), + tensor=dict(size=4, mode='2d'), + ), + fp16=dict( + mode=None, + ), + zero=dict( + level=2 + ) +) def train_epoch(engine, train_dataloader): @@ -37,18 +47,14 @@ def train_epoch(engine, train_dataloader): return avg_loss -@pytest.mark.dist -@pytest.mark.skip("This test should be invoked by test.sh in the same folder as it runs on multiple gpus") -def test_2d_parallel_vision_transformer(): - parser = get_default_parser() - args = parser.parse_args() +def run_2d_parallel_vision_transformer_level_2(rank, world_size): colossalai.launch( - config=CONFIG_PATH, - rank=args.rank, - world_size=args.world_size, - host=args.host, - port=args.port, - backend=args.backend + config=CONFIG, + rank=rank, + world_size=world_size, + host='localhost', + port=29950, + backend='nccl' ) # build model @@ -70,7 +76,6 @@ def test_2d_parallel_vision_transformer(): train_dataloader = get_dataloader(dataset=train_dataset, shuffle=True, batch_size=BATCH_SIZE, - num_workers=1, pin_memory=True, drop_last=True) @@ -97,6 +102,16 @@ def test_2d_parallel_vision_transformer(): engine.step() break + gpc.destroy() + 
torch.cuda.empty_cache()
+
+
+@pytest.mark.dist
+def test_2d_vit_zero_level_2():
+    world_size = 8
+    run_func = partial(run_2d_parallel_vision_transformer_level_2, world_size=world_size)
+    mp.spawn(run_func, nprocs=world_size)
+
if __name__ == '__main__':
-    test_2d_parallel_vision_transformer()
+    test_2d_vit_zero_level_2()
diff --git a/tests/test_zero_tensor_parallel/test_vit_2d_level_3.py b/tests/test_zero_tensor_parallel/test_vit_2d_level_3.py
new file mode 100644
index 000000000..38c9d8a70
--- /dev/null
+++ b/tests/test_zero_tensor_parallel/test_vit_2d_level_3.py
@@ -0,0 +1,119 @@
+#!/usr/bin/env python
+# -*- encoding: utf-8 -*-
+
+import os
+from pathlib import Path
+
+import pytest
+import torch.autograd
+import torch.multiprocessing as mp
+
+import colossalai
+import torch
+from colossalai.core import global_context as gpc
+from colossalai.builder import build_model
+from colossalai.logging import get_dist_logger
+from colossalai.utils import get_dataloader
+from colossalai.nn.layer._parallel_utilities import _gather
+from colossalai.nn import CrossEntropyLoss2D
+from torchvision import transforms
+from torchvision.datasets import CIFAR10
+from functools import partial
+from components import *
+
+
+CONFIG = dict(
+    parallel=dict(
+        pipeline=dict(size=1),
+        tensor=dict(size=4, mode='2d'),
+    ),
+    fp16=dict(
+        mode=None,
+    ),
+    zero=dict(
+        level=3
+    )
+)
+
+
+def train_epoch(engine, train_dataloader):
+    engine.train()
+    accumulated_loss = 0
+    num_steps = len(train_dataloader)
+    data_iter = iter(train_dataloader)
+    for i in range(num_steps):
+        output, label, loss = engine.step(data_iter)
+        accumulated_loss += loss.detach().cpu().numpy()
+    avg_loss = accumulated_loss / num_steps
+    return avg_loss
+
+
+def run_2d_parallel_vision_transformer_level_3(rank, world_size):
+    colossalai.launch(
+        config=CONFIG,
+        rank=rank,
+        world_size=world_size,
+        host='localhost',
+        port=29951,
+        backend='nccl'
+    )
+
+    # build model
+    model = build_model(model_cfg)
+    model.build_from_cfg()
+
+    # build dataloaders
+    train_dataset = CIFAR10(
+        root=Path(os.environ['DATA']),
+        download=True,
+        transform=transforms.Compose(
+            [
+                transforms.Resize(size=(IMG_SIZE, IMG_SIZE)),
+                transforms.ToTensor(),
+                transforms.Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5))
+            ]
+        )
+    )
+    train_dataloader = get_dataloader(dataset=train_dataset,
+                                      shuffle=True,
+                                      batch_size=BATCH_SIZE,
+                                      pin_memory=True,
+                                      drop_last=True)
+
+    # build optimizer and loss
+    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
+    criterion = CrossEntropyLoss2D()
+
+    engine, train_dataloader, *args = colossalai.initialize(model=model,
+                                                            optimizer=optimizer,
+                                                            criterion=criterion,
+                                                            train_dataloader=train_dataloader)
+    logger = get_dist_logger()
+
+    logger.info('start training')
+    engine.train()
+
+    for img, label in train_dataloader:
+        engine.zero_grad()
+        img = img.cuda()
+        label = label.cuda()
+        out = engine(img)
+        loss = engine.criterion(out, label)
+        engine.backward(loss)
+        engine.step()
+        break
+
+    gpc.destroy()
+    torch.cuda.empty_cache()
+
+
+@pytest.mark.dist
+@pytest.mark.skip("Level 3 has unknown bug so skip this test for now")
+def test_2d_vit_zero_level_3():
+    world_size = 8
+    run_func = partial(run_2d_parallel_vision_transformer_level_3, world_size=world_size)
+    mp.spawn(run_func, nprocs=world_size)
+
+
+if __name__ == '__main__':
+    test_2d_vit_zero_level_3()
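
Every distributed test converted in this patch follows the same launcher pattern: a run_xxx(rank, world_size) worker that calls colossalai.launch() against localhost on a per-test port, runs a short step, then tears down with gpc.destroy() and torch.cuda.empty_cache(), while the pytest-visible function spawns one process per rank via functools.partial and torch.multiprocessing.spawn. The snippet below is a minimal illustrative sketch of that pattern only, not part of the patch; CONFIG, the port number, and the run_worker/test_example names are placeholders.

#!/usr/bin/env python
# -*- encoding: utf-8 -*-
# Minimal sketch of the spawn-based launcher pattern shared by the tests above.
# CONFIG, the port, and the run_worker/test_example names are placeholders.

from functools import partial

import pytest
import torch
import torch.multiprocessing as mp

import colossalai
from colossalai.core import global_context as gpc

CONFIG = dict()  # placeholder; real tests supply their own parallel/fp16/zero settings


def run_worker(rank, world_size):
    # each spawned process initializes the distributed environment itself,
    # so the test no longer depends on SLURM variables or a test.sh script
    colossalai.launch(config=CONFIG,
                      rank=rank,
                      world_size=world_size,
                      host='localhost',
                      port=29999,  # any free port; the tests above each pick a unique one
                      backend='nccl')

    # ... build the model/dataloader and run a single training step here ...

    # tear down so that the next test in the session can re-initialize cleanly
    gpc.destroy()
    torch.cuda.empty_cache()


@pytest.mark.dist
def test_example():
    world_size = 4
    run_func = partial(run_worker, world_size=world_size)
    mp.spawn(run_func, nprocs=world_size)  # one process per rank


if __name__ == '__main__':
    test_example()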