diff --git a/examples/tutorial/hybrid_parallel/README.md b/examples/tutorial/hybrid_parallel/README.md
index dcbdc1e00..dab69ce5d 100644
--- a/examples/tutorial/hybrid_parallel/README.md
+++ b/examples/tutorial/hybrid_parallel/README.md
@@ -1,16 +1,17 @@
 # Handson 1: Multi-dimensional Parallelism with Colossal-AI
 
-## Install Colossal-AI and other dependencies
+## Install Titans Model Zoo
 
 ```bash
-sh install.sh
+pip install titans
 ```
 
 
 ## Prepare Dataset
 
-We use CIFAR10 dataset in this example. The dataset will be downloaded to `../data` by default.
+We use the CIFAR10 dataset in this example. You should invoke `download_cifar10.py` in the tutorial root directory or directly run `auto_parallel_with_resnet.py`.
+The dataset will be downloaded to `colossalai/examples/tutorials/data` by default.
 
 If you wish to use a customized directory for the dataset, you can set the environment variable `DATA` via the following command.
 
 ```bash
@@ -23,5 +24,9 @@ export DATA=/path/to/data
 Current configuration setting on `config.py` is TP=2, PP=2.
 
 ```bash
+# train with cifar10
 colossalai run --nproc_per_node 4 train.py --config config.py
-```
\ No newline at end of file
+
+# train with synthetic data
+colossalai run --nproc_per_node 4 train.py --config config.py -s
+```
diff --git a/examples/tutorial/hybrid_parallel/install.sh b/examples/tutorial/hybrid_parallel/install.sh
deleted file mode 100644
index 252f6bcca..000000000
--- a/examples/tutorial/hybrid_parallel/install.sh
+++ /dev/null
@@ -1,4 +0,0 @@
-pip install torch==1.11.0+cu113 torchvision==0.12.0+cu113 torchaudio==0.11.0 --extra-index-url https://download.pytorch.org/whl/cu113
-pip install colossalai==0.1.10+torch1.12cu11.3 -f https://release.colossalai.org
-pip install titans
-colossalai check -i
\ No newline at end of file
diff --git a/examples/tutorial/hybrid_parallel/train.py b/examples/tutorial/hybrid_parallel/train.py
index 1fb34d806..0f2a207cb 100644
--- a/examples/tutorial/hybrid_parallel/train.py
+++ b/examples/tutorial/hybrid_parallel/train.py
@@ -1,116 +1,145 @@
-import os
-import colossalai
-import torch
-
-from tqdm import tqdm
-from colossalai.context import ParallelMode
-from colossalai.core import global_context as gpc
-from colossalai.logging import get_dist_logger
-from colossalai.nn import CrossEntropyLoss
-from colossalai.nn.lr_scheduler import CosineAnnealingWarmupLR
-from colossalai.utils import is_using_pp, get_dataloader
-from colossalai.pipeline.pipelinable import PipelinableContext
-from titans.model.vit.vit import _create_vit_model
-from titans.dataloader.cifar10 import build_cifar
-
-
-def main():
-    # initialize distributed setting
-    parser = colossalai.get_default_parser()
-    args = parser.parse_args()
-
-    # launch from torch
-    colossalai.launch_from_torch(config=args.config)
-
-    # get logger
-    logger = get_dist_logger()
-    logger.info("initialized distributed environment", ranks=[0])
-
-    if hasattr(gpc.config, 'LOG_PATH'):
-        if gpc.get_global_rank() == 0:
-            log_path = gpc.config.LOG_PATH
-            if not os.path.exists(log_path):
-                os.mkdir(log_path)
-            logger.log_to_file(log_path)
-
-    use_pipeline = is_using_pp()
-
-    # create model
-    model_kwargs = dict(img_size=gpc.config.IMG_SIZE,
-                        patch_size=gpc.config.PATCH_SIZE,
-                        hidden_size=gpc.config.HIDDEN_SIZE,
-                        depth=gpc.config.DEPTH,
-                        num_heads=gpc.config.NUM_HEADS,
-                        mlp_ratio=gpc.config.MLP_RATIO,
-                        num_classes=10,
-                        init_method='jax',
-                        checkpoint=gpc.config.CHECKPOINT)
-
-    if use_pipeline:
-        pipelinable = PipelinableContext()
-        with pipelinable:
-            model = _create_vit_model(**model_kwargs)
-        pipelinable.to_layer_list()
-        pipelinable.policy = "uniform"
-        model = pipelinable.partition(
-            1, gpc.pipeline_parallel_size, gpc.get_local_rank(ParallelMode.PIPELINE))
-    else:
-        model = _create_vit_model(**model_kwargs)
-
-    # count number of parameters
-    total_numel = 0
-    for p in model.parameters():
-        total_numel += p.numel()
-    if not gpc.is_initialized(ParallelMode.PIPELINE):
-        pipeline_stage = 0
-    else:
-        pipeline_stage = gpc.get_local_rank(ParallelMode.PIPELINE)
-    logger.info(
-        f"number of parameters: {total_numel} on pipeline stage {pipeline_stage}")
-
-    # create dataloaders
-    root = os.environ.get('DATA', '../data/cifar10')
-    train_dataloader, test_dataloader = build_cifar(
-        gpc.config.BATCH_SIZE, root, pad_if_needed=True)
-
-    # create loss function
-    criterion = CrossEntropyLoss(label_smoothing=0.1)
-
-    # create optimizer
-    optimizer = torch.optim.AdamW(model.parameters(
-    ), lr=gpc.config.LEARNING_RATE, weight_decay=gpc.config.WEIGHT_DECAY)
-
-    # create lr scheduler
-    lr_scheduler = CosineAnnealingWarmupLR(optimizer=optimizer,
-                                           total_steps=gpc.config.NUM_EPOCHS,
-                                           warmup_steps=gpc.config.WARMUP_EPOCHS)
-
-    # initialize
-    engine, train_dataloader, test_dataloader, _ = colossalai.initialize(model=model,
-                                                                         optimizer=optimizer,
-                                                                         criterion=criterion,
-                                                                         train_dataloader=train_dataloader,
-                                                                         test_dataloader=test_dataloader)
-
-    logger.info("Engine is built", ranks=[0])
-
-    data_iter = iter(train_dataloader)
-
-    for epoch in range(gpc.config.NUM_EPOCHS):
-        # training
-        engine.train()
-
-        if gpc.get_global_rank() == 0:
-            description = 'Epoch {} / {}'.format(epoch, gpc.config.NUM_EPOCHS)
-            progress = tqdm(range(len(train_dataloader)), desc=description)
-        else:
-            progress = range(len(train_dataloader))
-        for _ in progress:
-            engine.zero_grad()
-            engine.execute_schedule(data_iter, return_output_label=False)
-            engine.step()
-        lr_scheduler.step()
-
-
-if __name__ == '__main__':
-    main()
+import os
+
+import torch
+from titans.dataloader.cifar10 import build_cifar
+from titans.model.vit.vit import _create_vit_model
+from tqdm import tqdm
+
+import colossalai
+from colossalai.context import ParallelMode
+from colossalai.core import global_context as gpc
+from colossalai.logging import get_dist_logger
+from colossalai.nn import CrossEntropyLoss
+from colossalai.nn.lr_scheduler import CosineAnnealingWarmupLR
+from colossalai.pipeline.pipelinable import PipelinableContext
+from colossalai.utils import get_dataloader, is_using_pp
+
+
+class DummyDataloader():
+
+    def __init__(self, length, batch_size):
+        self.length = length
+        self.batch_size = batch_size
+
+    def generate(self):
+        data = torch.rand(self.batch_size, 3, 224, 224)
+        label = torch.randint(low=0, high=10, size=(self.batch_size,))
+        return data, label
+
+    def __iter__(self):
+        self.step = 0
+        return self
+
+    def __next__(self):
+        if self.step < self.length:
+            self.step += 1
+            return self.generate()
+        else:
+            raise StopIteration
+
+    def __len__(self):
+        return self.length
+
+
+def main():
+    # initialize distributed setting
+    parser = colossalai.get_default_parser()
+    parser.add_argument('-s', '--synthetic', action="store_true", help="whether use synthetic data")
+    args = parser.parse_args()
+
+    # launch from torch
+    colossalai.launch_from_torch(config=args.config)
+
+    # get logger
+    logger = get_dist_logger()
+    logger.info("initialized distributed environment", ranks=[0])
+
+    if hasattr(gpc.config, 'LOG_PATH'):
+        if gpc.get_global_rank() == 0:
+            log_path = gpc.config.LOG_PATH
+            if not os.path.exists(log_path):
+                os.mkdir(log_path)
+            logger.log_to_file(log_path)
+
+    use_pipeline = is_using_pp()
+
+    # create model
+    model_kwargs = dict(img_size=gpc.config.IMG_SIZE,
+                        patch_size=gpc.config.PATCH_SIZE,
+                        hidden_size=gpc.config.HIDDEN_SIZE,
+                        depth=gpc.config.DEPTH,
+                        num_heads=gpc.config.NUM_HEADS,
+                        mlp_ratio=gpc.config.MLP_RATIO,
+                        num_classes=10,
+                        init_method='jax',
+                        checkpoint=gpc.config.CHECKPOINT)
+
+    if use_pipeline:
+        pipelinable = PipelinableContext()
+        with pipelinable:
+            model = _create_vit_model(**model_kwargs)
+        pipelinable.to_layer_list()
+        pipelinable.policy = "uniform"
+        model = pipelinable.partition(1, gpc.pipeline_parallel_size, gpc.get_local_rank(ParallelMode.PIPELINE))
+    else:
+        model = _create_vit_model(**model_kwargs)
+
+    # count number of parameters
+    total_numel = 0
+    for p in model.parameters():
+        total_numel += p.numel()
+    if not gpc.is_initialized(ParallelMode.PIPELINE):
+        pipeline_stage = 0
+    else:
+        pipeline_stage = gpc.get_local_rank(ParallelMode.PIPELINE)
+    logger.info(f"number of parameters: {total_numel} on pipeline stage {pipeline_stage}")
+
+    # create dataloaders
+    root = os.environ.get('DATA', '../data')
+    if args.synthetic:
+        # if we use synthetic dataset
+        # we train for 30 steps and eval for 10 steps per epoch
+        train_dataloader = DummyDataloader(length=30, batch_size=gpc.config.BATCH_SIZE)
+        test_dataloader = DummyDataloader(length=10, batch_size=gpc.config.BATCH_SIZE)
+    else:
+        train_dataloader, test_dataloader = build_cifar(gpc.config.BATCH_SIZE, root, pad_if_needed=True)
+
+    # create loss function
+    criterion = CrossEntropyLoss(label_smoothing=0.1)
+
+    # create optimizer
+    optimizer = torch.optim.AdamW(model.parameters(), lr=gpc.config.LEARNING_RATE, weight_decay=gpc.config.WEIGHT_DECAY)
+
+    # create lr scheduler
+    lr_scheduler = CosineAnnealingWarmupLR(optimizer=optimizer,
+                                           total_steps=gpc.config.NUM_EPOCHS,
+                                           warmup_steps=gpc.config.WARMUP_EPOCHS)
+
+    # initialize
+    engine, train_dataloader, test_dataloader, _ = colossalai.initialize(model=model,
+                                                                         optimizer=optimizer,
+                                                                         criterion=criterion,
+                                                                         train_dataloader=train_dataloader,
+                                                                         test_dataloader=test_dataloader)
+
+    logger.info("Engine is built", ranks=[0])
+
+    for epoch in range(gpc.config.NUM_EPOCHS):
+        # training
+        engine.train()
+        data_iter = iter(train_dataloader)
+
+        if gpc.get_global_rank() == 0:
+            description = 'Epoch {} / {}'.format(epoch, gpc.config.NUM_EPOCHS)
+            progress = tqdm(range(len(train_dataloader)), desc=description)
+        else:
+            progress = range(len(train_dataloader))
+        for _ in progress:
+            engine.zero_grad()
+            engine.execute_schedule(data_iter, return_output_label=False)
+            engine.step()
+        lr_scheduler.step()
+
+
+if __name__ == '__main__':
+    main()
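
The README above states that `config.py` is set to TP=2, PP=2, but the config file itself is not part of this diff. The following is only a minimal sketch of what such a config could look like: the field names are the ones `train.py` reads through `gpc.config`, while every value, including `NUM_MICRO_BATCHES`, is an illustrative assumption; the `parallel` dict follows Colossal-AI's legacy config convention for 2-way 1D tensor parallelism combined with a 2-stage pipeline.

```python
# config.py (sketch only; all values below are assumptions, not the tutorial's real settings)

# training hyperparameters read by train.py via gpc.config
BATCH_SIZE = 64
LEARNING_RATE = 3e-3
WEIGHT_DECAY = 0.3
NUM_EPOCHS = 10
WARMUP_EPOCHS = 3

# ViT model shape read by train.py via gpc.config
IMG_SIZE = 224
PATCH_SIZE = 16
HIDDEN_SIZE = 384
DEPTH = 6
NUM_HEADS = 6
MLP_RATIO = 4
CHECKPOINT = False    # activation checkpointing off

# number of micro-batches consumed by the pipeline schedule per step (assumed value)
NUM_MICRO_BATCHES = 4

# TP=2 (1D tensor parallel) x PP=2 -> 4 GPUs, matching --nproc_per_node 4 in the README
parallel = dict(
    pipeline=2,
    tensor=dict(size=2, mode='1d'),
)
```

With this layout, changing the parallel topology only requires editing `pipeline` and `tensor.size`, as long as their product still equals the number of processes launched.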
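The README also asks users to run `download_cifar10.py` from the tutorial root before training on real data; that helper is not included in this diff either. A hypothetical sketch, assuming the script simply pre-downloads CIFAR10 through `torchvision` into the same `DATA` directory that `train.py` later reads, might look like this:

```python
# download_cifar10.py (hypothetical sketch; the real helper may differ)
import os

from torchvision.datasets import CIFAR10


def main():
    # use the same DATA environment variable that train.py consults,
    # so the later distributed run finds the files already on disk
    root = os.environ.get('DATA', './data')
    CIFAR10(root=root, train=True, download=True)
    CIFAR10(root=root, train=False, download=True)


if __name__ == '__main__':
    main()
```

Downloading once up front avoids having the four training ranks race to fetch and extract the same archive.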