From 6a21f96a87948971e7c2e96f2cf2e563304e0c7a Mon Sep 17 00:00:00 2001 From: flybird11111 <1829166702@qq.com> Date: Tue, 10 Oct 2023 16:18:55 +0800 Subject: [PATCH] [doc] update advanced tutorials, training gpt with hybrid parallelism (#4866) * [doc]update advanced tutorials, training gpt with hybrid parallelism * [doc]update advanced tutorials, training gpt with hybrid parallelism * update vit tutorials * update vit tutorials * update vit tutorials * update vit tutorials * update en/train_vit_with_hybrid_parallel.py * fix * resolve comments * fix --- docs/sidebars.json | 1 - .../train_gpt_using_hybrid_parallelism.md | 408 ++++----- .../train_vit_using_pipeline_parallelism.md | 248 ------ .../train_vit_with_hybrid_parallelism.md | 787 +++++------------- .../train_gpt_using_hybrid_parallelism.md | 416 ++++----- .../train_vit_using_pipeline_parallelism.md | 247 ------ .../train_vit_with_hybrid_parallelism.md | 729 +++++----------- 7 files changed, 766 insertions(+), 2070 deletions(-) delete mode 100644 docs/source/en/advanced_tutorials/train_vit_using_pipeline_parallelism.md delete mode 100644 docs/source/zh-Hans/advanced_tutorials/train_vit_using_pipeline_parallelism.md diff --git a/docs/sidebars.json b/docs/sidebars.json index 45e86afc1..123211db5 100644 --- a/docs/sidebars.json +++ b/docs/sidebars.json @@ -64,7 +64,6 @@ "label": "Advanced Tutorials", "collapsed": true, "items": [ - "advanced_tutorials/train_vit_using_pipeline_parallelism", "advanced_tutorials/train_vit_with_hybrid_parallelism", "advanced_tutorials/train_gpt_using_hybrid_parallelism", "advanced_tutorials/meet_gemini", diff --git a/docs/source/en/advanced_tutorials/train_gpt_using_hybrid_parallelism.md b/docs/source/en/advanced_tutorials/train_gpt_using_hybrid_parallelism.md index 0218264cc..7a0e3b1a0 100644 --- a/docs/source/en/advanced_tutorials/train_gpt_using_hybrid_parallelism.md +++ b/docs/source/en/advanced_tutorials/train_gpt_using_hybrid_parallelism.md @@ -1,10 +1,14 @@ -# Train GPT Using Hybrid Parallelism +# Fine-tune GPT-2 Using Hybrid Parallelism -Author: Hongxin Liu, Yongbin Li +Author: Hongxin Liu, Yongbin Li, Mingyan Jiang + +**Prerequisite:** +- [parallelism plugin](../basics/booster_plugins.md) +- [booster API](../basics/booster_api.md) **Example Code** -- [ColossalAI-Examples GPT2](https://github.com/hpcaitech/ColossalAI-Examples/tree/main/language/gpt_2) -- [ColossalAI-Examples GPT3](https://github.com/hpcaitech/ColossalAI-Examples/tree/main/language/gpt_3) +- [ColossalAI-Examples GPT](https://github.com/hpcaitech/ColossalAI/blob/main/examples/language/gpt/hybridparallelism/finetune.py) + **Related Paper** - [Colossal-AI: A Unified Deep Learning System For Large-Scale Parallel Training](https://arxiv.org/abs/2110.14883) @@ -12,260 +16,192 @@ Author: Hongxin Liu, Yongbin Li ## Introduction -In the previous tutorial, we introduce how to train ViT with pipeline. In this tutorial, you will learn a more complex scenario -- train GPT with hybrid parallelism. In this case, GPT-3 is so large that CPU memory cannot fit it as well. Therefore, you must split the model by yourself. +In the previous tutorial, we introduce how to train ViT with pipeline. In this tutorial, you will learn a more complex scenario -- fine-tune GPT-2 with hybrid parallelism. In this case, GPT-2 is so large that CPU memory cannot fit it as well. Therefore, you must split the model. ## Table of content In this tutorial we will cover: -1. The definition of GPT model, based on colossalai/model_zoo -2. Processing the dataset -3. Training GPT using hybrid parallelism +1. Initialize the hybrid parallelism plugin. +2. Defining the Training Components of the GPT-2 Model +3. Boost the GPT-2 Model with [`HybridParallelPlugin`](../basics/booster_plugins.md) +4. Training GPT-2 using hybrid parallelism ## Import libraries ```python -import json -import os -from typing import Callable - -import colossalai -import colossalai.utils as utils -import model_zoo.gpt.gpt as col_gpt +from typing import Callable, List, Union import torch +import torch.distributed as dist import torch.nn as nn -from colossalai import nn as col_nn -from colossalai.amp import AMP_TYPE -from colossalai.legacy.builder.pipeline import partition_uniform -from colossalai.legacy.context.parallel_mode import ParallelMode -from colossalai.core import global_context as gpc -from colossalai.legacy.engine.schedule import (InterleavedPipelineSchedule, - PipelineSchedule) -from colossalai.logging import disable_existing_loggers, get_dist_logger -from colossalai.legacy.nn.layer.wrapper import PipelineSharedModuleWrapper -from colossalai.legacy.trainer import Trainer, hooks -from colossalai.utils.timer import MultiTimer -from model_zoo.gpt import GPTLMLoss -from torch.nn import functional as F -from torch.utils.data import Dataset -from transformers import GPT2Tokenizer -``` - - - -## Define GPT model - -In the previous tutorial, we introduced 3 ways to build a pipelined model. But for huge models like GPT-3, you can't even build the model in CPU. In this case, you must split the model by yourself. - -GPT dataloader returns `input_ids` and `attention_mask`, so we use two keyword arguments in `forward()` to get them. Note that for stages except the first stage, the first positional argument of `forward()` is the output tensor from the previous stage. So the `hidden_states` is from the previous stage, and for the first stage it's `None`. - -For GPT, the *word embedding layer* shares the weights with the *output head*. We provide `PipelineSharedModuleWrapper` to share parameters among pipeline stages. It takes a `list` of `int` as argument, which means those ranks share the parameters. You can use `register_module()` or `register_parameter()` to register a module or a parameter as the shared module or parameter. If you have multiple sets of shared modules / parameters, you should have multiple `PipelineSharedModuleWrapper` instance. If the parameter is shared within **one** stage, you should not use `PipelineSharedModuleWrapper`, and just use the same module / parameter instance. In this example, the *word embedding layer* is at the first stage, and the *output head* is at the last stage. Thus, they are shared among ranks `[0, pipeline_size - 1]`. - -For the first stage, it maintains the embedding layer and some transformer blocks. For the last stage, it maintains some transformer blocks and the output head layer. For other stages, they just maintain some transformer blocks. `partition_uniform(num_layers, pipeline_size, num_chunks)` returns the parts of all ranks, and the part is a `tuple` of `(start, end)` (exclude end). `start == 0` means that it's the first stage, and `end == num_layers` means it's the last stage. +from torch.optim import Optimizer +from torch.optim.lr_scheduler import _LRScheduler as LRScheduler +from tqdm import tqdm +from transformers import AutoConfig, GPT2ForSequenceClassification, get_linear_schedule_with_warmup +from transformers import AutoTokenizer +import colossalai +from colossalai.booster import Booster +from colossalai.booster.plugin import GeminiPlugin, HybridParallelPlugin, LowLevelZeroPlugin, TorchDDPPlugin +from colossalai.cluster import DistCoordinator +from colossalai.nn.optimizer import HybridAdam +from colossalai.utils import get_current_device +``` +## Define Plugin +Create a `HybridParallelPlugin` object and specify the desired parallelism strategies to be used. In this example, both pipeline parallelism and ZeRO-1 are used simultaneously. ```python -class PipelineGPTHybrid(nn.Module): - def __init__(self, - num_layers: int = 12, - hidden_size: int = 768, - num_attention_heads: int = 12, - vocab_size: int = 50304, - embed_drop_rate: float = 0., - act_func: Callable = F.gelu, - mlp_ratio: int = 4, - attn_drop_rate: float = 0., - drop_rate: float = 0., - dtype: torch.dtype = torch.float, - checkpoint: bool = False, - max_position_embeddings: int = 1024, - layer_norm_epsilon: float = 1e-5, - first: bool = False, - last: bool = False): - super().__init__() - self.embedding = None - self.norm = None - self.head = None - if first: - self.embedding = col_gpt.GPTEmbedding( - hidden_size, vocab_size, max_position_embeddings, dropout=embed_drop_rate, dtype=dtype) - self.blocks = nn.ModuleList([ - col_gpt.GPTBlock(hidden_size, num_attention_heads, mlp_ratio=mlp_ratio, attention_dropout=attn_drop_rate, - dropout=drop_rate, dtype=dtype, checkpoint=checkpoint, activation=act_func) - for _ in range(num_layers) - ]) - if last: - self.norm = col_nn.LayerNorm(hidden_size, eps=layer_norm_epsilon) - self.head = col_gpt.GPTLMHead(vocab_size=vocab_size, - dim=hidden_size, - dtype=dtype, - bias=False) - - def forward(self, hidden_states=None, input_ids=None, attention_mask=None): - if self.embedding is not None: - hidden_states = self.embedding(input_ids=input_ids) - batch_size = hidden_states.shape[0] - attention_mask = attention_mask.view(batch_size, -1) - attention_mask = attention_mask[:, None, None, :] - attention_mask = attention_mask.to(dtype=hidden_states.dtype) # fp16 compatibility - attention_mask = (1.0 - attention_mask) * -10000.0 - for block in self.blocks: - hidden_states, attention_mask = block(hidden_states, attention_mask) - if self.norm is not None: - hidden_states = self.head(self.norm(hidden_states)) - return hidden_states - - -def build_gpt_pipeline(num_layers, num_chunks, device=torch.device('cuda'), **kwargs): - logger = get_dist_logger() - pipeline_size = gpc.get_world_size(ParallelMode.PIPELINE) - pipeline_rank = gpc.get_local_rank(ParallelMode.PIPELINE) - rank = gpc.get_global_rank() - wrapper = PipelineSharedModuleWrapper([0, pipeline_size - 1]) - parts = partition_uniform(num_layers, pipeline_size, num_chunks)[pipeline_rank] - models = [] - for start, end in parts: - kwargs['num_layers'] = end - start - kwargs['first'] = start == 0 - kwargs['last'] = end == num_layers - logger.info(f'Rank{rank} build layer {start}-{end}, {end-start}/{num_layers} layers') - chunk = PipelineGPTHybrid(**kwargs).to(device) - if start == 0: - wrapper.register_module(chunk.embedding.word_embeddings) - elif end == num_layers: - wrapper.register_module(chunk.head) - models.append(chunk) - if len(models) == 1: - model = models[0] - else: - model = nn.ModuleList(models) - return model - - -def GPT2_exlarge_pipeline_hybrid(num_chunks=1, checkpoint=False, dtype=torch.float): - cfg = dict(hidden_size=1600, num_attention_heads=32, checkpoint=checkpoint, dtype=dtype) - return build_gpt_pipeline(48, num_chunks, **cfg) - - -def GPT3_pipeline_hybrid(num_chunks=1, checkpoint=False, dtype=torch.float): - cfg = dict(hidden_size=12288, num_attention_heads=96, - checkpoint=checkpoint, max_position_embeddings=2048, dtype=dtype) - return build_gpt_pipeline(96, num_chunks, **cfg) +plugin = HybridParallelPlugin( + tp_size=1, + pp_size=2, + num_microbatches=None, + microbatch_size=1, + enable_all_optimization=True, + zero_stage=1, + precision="fp16", + initial_scale=1, +) ``` +## Define GPT-2's Training Components -## Process the dataset - -We provide a small GPT web-text dataset here. The original format is loose JSON, and we will save the processed dataset. +Before using hybrid parallelism, you need to define the components used for training. +Define hyperparameters ```python -class WebtextDataset(Dataset): - def __init__(self, path, seq_len=1024) -> None: - super().__init__() - root = os.path.dirname(path) - encoded_data_cache_path = os.path.join(root, f'gpt_webtext_{seq_len}.pt') - if os.path.isfile(encoded_data_cache_path): - seq_len_, data, attention_mask = torch.load( - encoded_data_cache_path) - if seq_len_ == seq_len: - self.data = data - self.attention_mask = attention_mask - return - raw_data = [] - with open(path) as f: - for line in f.readlines(): - raw_data.append(json.loads(line)['text']) - tokenizer = GPT2Tokenizer.from_pretrained('gpt2') - tokenizer.pad_token = tokenizer.unk_token - encoded_data = tokenizer( - raw_data, padding=True, truncation=True, max_length=seq_len, return_tensors='pt') - self.data = encoded_data['input_ids'] - self.attention_mask = encoded_data['attention_mask'] - torch.save((seq_len, self.data, self.attention_mask), - encoded_data_cache_path) - - def __len__(self): - return len(self.data) - - def __getitem__(self, index): - return { - 'input_ids': self.data[index], - 'attention_mask': self.attention_mask[index] - }, self.data[index] +NUM_EPOCHS = 3 +BATCH_SIZE = 32 +LEARNING_RATE = 2.4e-5 +WEIGHT_DECAY = 0.01 +WARMUP_FRACTION = 0.1 ``` - -## Training GPT using hybrid parallelism - -In the previous tutorial, we explained the meanings of some pipeline arguments. In this case, we can determine the shape of each output tensor which is exchanged among pipeline stages. For GPT, the shape is `(MICRO BATCH SIZE, SEQUENCE LEN, HIDDEN SIZE)`. By setting this, we can avoid exchanging the tensor shape of each stage. When you are not sure of the tensor shape, you can just leave it `None`, and the shape is inferred automatically. Make sure that the `dtype` of your model is correct. When you use `fp16`, the `dtype` of your model must be `torch.half`. Otherwise, the `dtype` must be `torch.float`. For pipeline parallelism, only `AMP_TYPE.NAIVE` is supported. - -You can easily use tensor parallel by setting `parallel` in `CONFIG`. The data parallelism size is automatically set based on the number of GPUs. - +we create a distributed environment. ```python -NUM_EPOCHS = 60 -SEQ_LEN = 1024 -BATCH_SIZE = 192 -NUM_CHUNKS = None -TENSOR_SHAPE = (1, 1024, 1600) -# only pipeline parallel -# CONFIG = dict(parallel=dict(pipeline=2), fp16=dict(mode=AMP_TYPE.NAIVE)) -# pipeline + 1D model parallel -CONFIG = dict(NUM_MICRO_BATCHES = 192, parallel=dict(pipeline=2, tensor=dict(mode='1d', size=2)), fp16=dict(mode=AMP_TYPE.NAIVE)) - - -def train(): - disable_existing_loggers() - parser = colossalai.get_default_parser() - args = parser.parse_args() - colossalai.launch_from_torch(config=CONFIG, backend=args.backend) - logger = get_dist_logger() - - train_ds = WebtextDataset(os.environ['DATA'], seq_len=SEQ_LEN) - train_dataloader = utils.get_dataloader(train_ds, - seed=42, - batch_size=BATCH_SIZE, - pin_memory=True, - shuffle=True, - drop_last=True) - - use_interleaved = NUM_CHUNKS is not None - num_chunks = 1 if not use_interleaved else NUM_CHUNKS - model = GPT2_exlarge_pipeline_hybrid(num_chunks=num_chunks, checkpoint=True, dtype=torch.half) - # model = GPT3_pipeline_hybrid(num_chunks=num_chunks, checkpoint=True, dtype=torch.half) - if use_interleaved and not isinstance(model, nn.ModuleList): - model = nn.ModuleList([model]) - - criterion = GPTLMLoss() - - optimizer = torch.optim.Adam(model.parameters(), lr=0.00015, weight_decay=1e-2,) +# Launch ColossalAI +colossalai.launch_from_torch(config={}, seed=42) +coordinator = DistCoordinator() +``` +prepare the dataset. You can use `plugin.prepare_dataloader` to generate a dataloader or customize your own dataloader. +```python +def tokenize_batch(batch, tokenizer: Optional[AutoTokenizer] = None, max_length: int = 2048): + texts = [sample["sentence1"] + sample["sentence2"] for sample in batch] + data = tokenizer(texts, return_tensors="pt", padding="max_length", truncation=True, max_length=max_length) + data = {k: v.cuda() for k, v in data.items()} + data["labels"] = data["input_ids"].clone() + return data + +tokenizer = AutoTokenizer.from_pretrained("gpt2") +dataset = datasets.load_dataset("glue", "mrpc") +train_dataloader = plugin.prepare_dataloader( + dataset["train"], + batch_size=BATCH_SIZE, + shuffle=True, + drop_last=True, + collate_fn=partial(tokenize_batch, tokenizer=tokenizer, max_length=512), +) +``` +Prepare gpt-2 model +```python +cfg = AutoConfig.from_pretrained("gpt2", num_labels=2) +model = GPT2ForSequenceClassification.from_pretrained("gpt2", config=cfg).cuda() - engine, train_dataloader, _, _ = colossalai.initialize(model, - optimizer, - criterion, - train_dataloader=train_dataloader) - global_batch_size = BATCH_SIZE * \ - gpc.get_world_size(ParallelMode.DATA) * getattr(gpc.config, "gradient_accumulation", 1) - logger.info(f'Init done, global batch size = {global_batch_size}', ranks=[0]) +``` +prepare optimizer +```python +lr = LEARNING_RATE * coordinator.world_size +no_decay = ["bias", "LayerNorm.weight"] +optimizer_grouped_parameters = [ + { + "params": [p for n, p in model.named_parameters() if not any(nd in n for nd in no_decay)], + "weight_decay": WEIGHT_DECAY, + }, + { + "params": [p for n, p in model.named_parameters() if any(nd in n for nd in no_decay)], + "weight_decay": 0.0, + }, +] +optimizer = HybridAdam(optimizer_grouped_parameters, lr=lr, eps=1e-8) +``` +Prepare the lr_scheduler and criterion, and it's important to note that when hybrid parallelism with pipeline parallelism is used, a criterion function should also be defined. This function should take the input and output of the model's forward pass as parameters and return the loss. +```python +# lr scheduler +total_steps = len(train_dataloader) * NUM_EPOCHS +num_warmup_steps = int(WARMUP_FRACTION * total_steps) +lr_scheduler = get_linear_schedule_with_warmup( + optimizer, + num_warmup_steps=num_warmup_steps, + num_training_steps=total_steps, +) + +def _criterion(outputs, inputs): + return outputs.loss +``` +## Boost the GPT-2 Model +Define a booster with `HybridParallelPlugin`. Based on the configured plugin parameters, the booster will inject one or more parallel strategies into the model. In this example, pipeline parallelism, zero1, and mixed-precision training optimizations are utilized. +```python +booster = Booster(plugin=plugin) +``` +Boost these components with the defined booster. +```python +model, optimizer, _criterion, _, lr_scheduler = booster.boost( + model, optimizer, criterion=_criterion, lr_scheduler=lr_scheduler +) +``` - timer = MultiTimer() - trainer = Trainer( - engine=engine, - logger=logger, - timer=timer - ) +## Training GPT-2 using hybrid parallelism - hook_list = [ - hooks.LossHook(), - hooks.LogMetricByEpochHook(logger), - hooks.ThroughputHook(), - hooks.LogMetricByStepHook(), - ] +In the previous tutorial, We've explained how to inject various parallelism features into the model and its training components using the Booster and `HybridParallelPlugin`. Now we can start model training. +Define a training function. When pipeline parallelism is used, you need to call `booster.execute_pipeline` to schedule the stages of model training. +```python +def train_epoch( + epoch: int, + model: nn.Module, + optimizer: Optimizer, + _criterion: Callable, + lr_scheduler: LRScheduler, + train_dataloader: DataLoader, + booster: Booster, + coordinator: DistCoordinator, +): + use_pipeline = isinstance(booster.plugin, HybridParallelPlugin) and booster.plugin.pp_size > 1 + is_pp_last_stage = use_pipeline and booster.plugin.stage_manager.is_last_stage() + print_flag = (not use_pipeline and coordinator.is_master()) or (use_pipeline and is_pp_last_stage) + total_step = len(train_dataloader) + + model.train() + optimizer.zero_grad() + train_dataloader_iter = iter(train_dataloader) + with tqdm( + range(total_step), + desc=f"Epoch [{epoch + 1}/{NUM_EPOCHS}]", + disable=not print_flag, + ) as pbar: + # Forward pass + for _ in pbar: + if use_pipeline: + outputs = booster.execute_pipeline( + train_dataloader_iter, model, _criterion, optimizer, return_loss=True, return_outputs=True + ) + # Backward and optimize + if is_pp_last_stage: + loss = outputs["loss"] + pbar.set_postfix({"loss": loss.item()}) + else: + data = next(train_dataloader_iter) + data = move_to_cuda(data) + outputs = model(**data) + loss = _criterion(outputs, None) + # Backward + booster.backward(loss, optimizer) + pbar.set_postfix({"loss": loss.item()}) + + optimizer.step() + optimizer.zero_grad() + lr_scheduler.step() - trainer.fit( - train_dataloader=train_dataloader, - epochs=NUM_EPOCHS, - test_interval=1, - hooks=hook_list, - display_progress=True, - return_output_label=False, - ) ``` - +Training the gpt-2 model +```python +for epoch in range(NUM_EPOCHS): + train_epoch(epoch, model, optimizer, _criterion, lr_scheduler, train_dataloader, booster, coordinator) +``` + \ No newline at end of file diff --git a/docs/source/en/advanced_tutorials/train_vit_using_pipeline_parallelism.md b/docs/source/en/advanced_tutorials/train_vit_using_pipeline_parallelism.md deleted file mode 100644 index 6dbe33800..000000000 --- a/docs/source/en/advanced_tutorials/train_vit_using_pipeline_parallelism.md +++ /dev/null @@ -1,248 +0,0 @@ -# Train ViT Using Pipeline Parallelism - -Author: Hongxin Liu, Yongbin Li - -**Example Code** -- [ColossalAI-Examples Pipeline Parallel ViT](https://github.com/hpcaitech/ColossalAI-Examples/tree/main/image/vision_transformer/pipeline_parallel) - -**Related Paper** -- [Efficient Large-Scale Language Model Training on GPU Clusters Using Megatron-LM](https://arxiv.org/abs/2104.04473) - -## Introduction - -In this tutorial, you will learn how to train Vision Transformer for image classification from scratch, using pipeline. -Pipeline parallelism is a kind of model parallelism, which is useful when your GPU memory cannot fit your model. -By using it, we split the original model into multi stages, and each stage maintains a part of the original model. -We assume that your GPU memory cannot fit ViT/L-16, and your memory can fit this model. - -## Table of contents - -In this tutorial we will cover: - -1. The definition of ViT model, based on [TIMM](https://github.com/rwightman/pytorch-image-models/blob/master/timm/models/vision_transformer.py) -2. Processing the dataset -3. Training ViT using pipeline - -## Import libraries - -```python -import os -from collections import OrderedDict -from functools import partial - -import colossalai -import colossalai.nn as col_nn -import torch -import torch.nn as nn -from colossalai.legacy.builder import build_pipeline_model -from colossalai.legacy.engine.schedule import (InterleavedPipelineSchedule, - PipelineSchedule) -from colossalai.logging import disable_existing_loggers, get_dist_logger -from colossalai.legacy.trainer import Trainer, hooks -from colossalai.utils import MultiTimer, get_dataloader -from timm.models import vision_transformer as vit -from torchvision import transforms -from torchvision.datasets import CIFAR10 -``` - - - -## Define Vision Transformer model - -Generally, we provide 3 ways to build a pipelined model: - -1. `colossalai.legacy.builder.build_pipeline_model_from_cfg` -2. `colossalai.legacy.builder.build_pipeline_model` -3. Split the model by stages by yourself - -When your memory can fit the model, you can use the first two methods to build your model, otherwise you must split the model by yourself. The first two methods first build the whole model on CPU, then split the model, and finally you can just move the corresponding part of model to GPU. - -`colossalai.legacy.builder.build_pipeline_model_from_cfg()` receives a config file of model, and it can split the model uniformly (by layer) or balanced (by parameter size). - -If you are familiar with `PyTorch`, you can use `colossalai.legacy.builder.build_pipeline_model()` which receives a `torch.nn.Sequential` model and split it by layer uniformly. - -In this tutorial, we will modify [TIMM/ViT](https://github.com/rwightman/pytorch-image-models/blob/master/timm/models/vision_transformer.py) to `torch.nn.Sequential` and then use `colossalai.legacy.builder.build_pipeline_model()` to build the pipelined model. - -When the data is **one** `Tensor`, you can use the positional argument in `forward()` of your model to get the data tensor. For the first stage of pipeline, the first positional argument of `forward()` is the data tensor loaded from data loader. For other stages, the first positional argument of `forward()` is the output tensor from the previous stage. Note that if the stage is not the last stage, the return of `forward()` must be a `Tensor`. - -When the data is a `dict` of `Tensor`, you can use named keyword arguments in `forward()` of your model to get the data `dict`. - -```python -class ViTEmbedding(nn.Module): - def __init__(self, img_size=224, patch_size=16, in_chans=3, embed_dim=768, embed_layer=vit.PatchEmbed, drop_rate=0., distilled=False): - super().__init__() - self.embed_dim = embed_dim # num_features for consistency with other models - self.num_tokens = 2 if distilled else 1 - self.patch_embed = embed_layer( - img_size=img_size, patch_size=patch_size, in_chans=in_chans, embed_dim=embed_dim) - num_patches = self.patch_embed.num_patches - - self.cls_token = nn.Parameter(torch.zeros(1, 1, embed_dim)) - self.dist_token = nn.Parameter(torch.zeros(1, 1, embed_dim)) if distilled else None - self.pos_embed = nn.Parameter(torch.zeros(1, num_patches + self.num_tokens, embed_dim)) - self.pos_drop = nn.Dropout(p=drop_rate) - self.init_weights() - - def forward(self, x): - x = self.patch_embed(x) - cls_token = self.cls_token.expand(x.shape[0], -1, -1) # stole cls_tokens impl from Phil Wang, thanks - if self.dist_token is None: - x = torch.cat((cls_token, x), dim=1) - else: - x = torch.cat((cls_token, self.dist_token.expand(x.shape[0], -1, -1), x), dim=1) - x = self.pos_drop(x + self.pos_embed) - return x - - def init_weights(self): - vit.trunc_normal_(self.pos_embed, std=.02) - if self.dist_token is not None: - vit.trunc_normal_(self.dist_token, std=.02) - vit.trunc_normal_(self.cls_token, std=.02) - self.apply(vit._init_vit_weights) - - -class ViTHead(nn.Module): - def __init__(self, embed_dim=768, num_classes=1000, norm_layer=None, distilled=False, representation_size=None): - super().__init__() - norm_layer = norm_layer or partial(nn.LayerNorm, eps=1e-6) - self.norm = norm_layer(embed_dim) - self.num_classes = num_classes - self.distilled = distilled - self.num_features = embed_dim - # Representation layer - if representation_size and not distilled: - self.num_features = representation_size - self.pre_logits = nn.Sequential(OrderedDict([ - ('fc', nn.Linear(embed_dim, representation_size)), - ('act', nn.Tanh()) - ])) - else: - self.pre_logits = nn.Identity() - # Classifier head(s) - self.head = nn.Linear(self.num_features, num_classes) if num_classes > 0 else nn.Identity() - self.head_dist = None - if distilled: - self.head_dist = nn.Linear(embed_dim, num_classes) if num_classes > 0 else nn.Identity() - self.init_weights() - - def forward(self, x): - x = self.norm(x) - if self.distilled: - x, x_dist = self.head(x[:, 0]), self.head_dist(x[:, 1]) - if self.training and not torch.jit.is_scripting(): - # during inference, return the average of both classifier predictions - return x, x_dist - else: - return (x + x_dist) / 2 - else: - x = self.pre_logits(x[:, 0]) - x = self.head(x) - return x - - def init_weights(self): - self.apply(vit._init_vit_weights) - - -def sequential_vit(img_size=224, patch_size=16, in_chans=3, num_classes=1000, embed_dim=768, depth=12, - num_heads=12, mlp_ratio=4., qkv_bias=True, representation_size=None, distilled=False, - drop_rate=0., attn_drop_rate=0., drop_path_rate=0., embed_layer=vit.PatchEmbed, norm_layer=None, - act_layer=None): - norm_layer = norm_layer or partial(nn.LayerNorm, eps=1e-6) - act_layer = act_layer or nn.GELU - embedding = ViTEmbedding(img_size=img_size, patch_size=patch_size, in_chans=in_chans, - embed_dim=embed_dim, embed_layer=embed_layer, drop_rate=drop_rate, distilled=distilled) - dpr = [x.item() for x in torch.linspace(0, drop_path_rate, depth)] # stochastic depth decay rule - blocks = [vit.Block( - dim=embed_dim, num_heads=num_heads, mlp_ratio=mlp_ratio, qkv_bias=qkv_bias, drop=drop_rate, - attn_drop=attn_drop_rate, drop_path=dpr[i], norm_layer=norm_layer, act_layer=act_layer) - for i in range(depth)] - for block in blocks: - block.apply(vit._init_vit_weights) - head = ViTHead(embed_dim=embed_dim, num_classes=num_classes, norm_layer=norm_layer, - distilled=distilled, representation_size=representation_size) - return nn.Sequential(embedding, *blocks, head) - - -def vit_large_patch16_224(**kwargs): - model_kwargs = dict(embed_dim=1024, depth=24, num_heads=16, **kwargs) - return sequential_vit(**model_kwargs) -``` - -## Process the dataset - -Generally, we train ViT on large dataset like Imagenet. For simplicity, we just use CIFAR-10 here, since this tutorial is just for pipeline training. - -```python -def build_cifar(batch_size): - transform_train = transforms.Compose([ - transforms.RandomCrop(224, pad_if_needed=True), - transforms.AutoAugment(policy=transforms.AutoAugmentPolicy.CIFAR10), - transforms.ToTensor(), - transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)), - ]) - transform_test = transforms.Compose([ - transforms.Resize(224), - transforms.ToTensor(), - transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)), - ]) - - train_dataset = CIFAR10(root=os.environ['DATA'], train=True, download=True, transform=transform_train) - test_dataset = CIFAR10(root=os.environ['DATA'], train=False, transform=transform_test) - train_dataloader = get_dataloader(dataset=train_dataset, shuffle=True, batch_size=batch_size, pin_memory=True) - test_dataloader = get_dataloader(dataset=test_dataset, batch_size=batch_size, pin_memory=True) - return train_dataloader, test_dataloader -``` - -## Training ViT using pipeline - -You can set the size of pipeline parallel and number of microbatches in config. `NUM_CHUNKS` is useful when using interleaved-pipeline (for more details see [Efficient Large-Scale Language Model Training on GPU Clusters Using Megatron-LM](https://arxiv.org/abs/2104.04473) ). The original batch will be split into `num_microbatches`, and each stage will load a micro batch each time. Then we will generate an appropriate schedule for you to execute the pipeline training. If you don't need the output and label of model, you can set `return_output_label` to `False` when calling `trainer.fit()` which can further reduce GPU memory usage. - -You should `export DATA=/path/to/cifar`. - -```python -BATCH_SIZE = 16 -NUM_EPOCHS = 60 -NUM_CHUNKS = 1 -CONFIG = dict(NUM_MICRO_BATCHES=4, parallel=dict(pipeline=2)) - - -def train(): - disable_existing_loggers() - parser = colossalai.get_default_parser() - args = parser.parse_args() - colossalai.launch_from_torch(backend=args.backend, config=CONFIG) - logger = get_dist_logger() - - # build model - model = vit_large_patch16_224() - model = build_pipeline_model(model, num_chunks=NUM_CHUNKS, verbose=True) - - # build criterion - criterion = nn.CrossEntropyLoss() - - # optimizer - optimizer = torch.optim.Adam(model.parameters(), lr=0.001, weight_decay=0) - - # build dataloader - train_dataloader, test_dataloader = build_cifar(BATCH_SIZE) - - engine, train_dataloader, test_dataloader, _ = colossalai.initialize(model, optimizer, criterion, - train_dataloader, test_dataloader) - timer = MultiTimer() - - trainer = Trainer(engine=engine, timer=timer, logger=logger) - - hook_list = [ - hooks.LossHook(), - hooks.AccuracyHook(col_nn.metric.Accuracy()), - hooks.LogMetricByEpochHook(logger), - ] - - trainer.fit(train_dataloader=train_dataloader, - epochs=NUM_EPOCHS, - test_dataloader=test_dataloader, - test_interval=1, - hooks=hook_list, - display_progress=True) -``` - diff --git a/docs/source/en/advanced_tutorials/train_vit_with_hybrid_parallelism.md b/docs/source/en/advanced_tutorials/train_vit_with_hybrid_parallelism.md index 0ec9d5c3c..93fed61c3 100644 --- a/docs/source/en/advanced_tutorials/train_vit_with_hybrid_parallelism.md +++ b/docs/source/en/advanced_tutorials/train_vit_with_hybrid_parallelism.md @@ -1,10 +1,14 @@ # Step By Step: Accelerate ViT Training With Colossal-AI (From Data Parallel to Hybrid Parallel) -Author: Yuxuan Lou +Author: Yuxuan Lou, Mingyan Jiang + +**Prerequisite:** +- [parallelism plugin](../basics/booster_plugins.md) +- [booster API](../basics/booster_api.md) **Example Code** -- [Colossal-AI Examples ViT on Cifar10](https://github.com/hpcaitech/ColossalAI-Examples/tree/main/image/vision_transformer) +- [Colossal-AI Examples ViT on `beans`](https://github.com/hpcaitech/ColossalAI/blob/main/examples/images/vit/vit_train_demo.py) **Related Paper** - [An Image is Worth 16x16 Words: Transformers for Image Recognition at Scale](https://arxiv.org/pdf/2010.11929.pdf) @@ -13,14 +17,14 @@ Author: Yuxuan Lou ## Introduction In this example for ViT model, Colossal-AI provides three different parallelism techniques which accelerate model training: data parallelism, pipeline parallelism and tensor parallelism. -We will show you how to train ViT on CIFAR-10 dataset with these parallelism techniques. To run this example, you will need 2-4 GPUs. +We will show you how to train ViT on `beans` dataset with these parallelism techniques. To run this example, you will need 2-4 GPUs. ## Table of Contents 1. Colossal-AI installation -2. Steps to train ViT with data parallelism -3. Steps to train ViT with pipeline parallelism -4. Steps to train ViT with tensor parallelism or hybrid parallelism +2. Define the ViT model and related training components. +3. Boost the VIT Model with [`HybridParallelPlugin`](../basics/booster_plugins.md) +4. Train the VIT model using data parallelism, pipeline parallelism, and tensor parallelism. ## Colossal-AI Installation You can install Colossal-AI package and its dependencies with PyPI. @@ -29,619 +33,250 @@ pip install colossalai ``` - -## Data Parallelism -Data parallelism is one basic way to accelerate model training process. You can apply data parallelism to training by only two steps: -1. Define a configuration file -2. Change a few lines of code in train script - -### Define your configuration file (`data_parallel/config.py`) -To use Colossal-AI, the first step is to define a configuration file. And there are two kinds of variables here: - -1. **Colossal-AI feature specification** - -There is an array of features Colossal-AI provides to speed up training (parallel mode, mixed precision, ZeRO, etc.). Each feature is defined by a corresponding field in the config file. If we apply data parallel only, we do not need to specify the parallel mode. In this example, we use mixed precision training natively provided by PyTorch by define the mixed precision configuration `fp16 = dict(mode=AMP_TYPE.TORCH)`. - -2. **Global hyper-parameters** - -Global hyper-parameters include model-specific hyper-parameters, training settings, dataset information, etc. - +## Import libraries ```python -from colossalai.amp import AMP_TYPE - -# ViT Base -BATCH_SIZE = 256 -DROP_RATE = 0.1 -NUM_EPOCHS = 300 - -# mix precision -fp16 = dict( - mode=AMP_TYPE.TORCH, -) - -gradient_accumulation = 16 -clip_grad_norm = 1.0 - -dali = dict( - gpu_aug=True, - mixup_alpha=0.2 -) -``` +from typing import Any, Callable, Iterator -### Modify train script (`/data_parallel/train_with_cifar10.py`) +import torch +import torch.distributed as dist +import torch.nn as nn +import transformers +from data import BeansDataset, beans_collator +from torch.optim import Optimizer +from torch.optim.lr_scheduler import _LRScheduler as LRScheduler +from torch.utils.data import DataLoader +from tqdm import tqdm +from transformers import ViTConfig, ViTForImageClassification, ViTImageProcessor -#### Import modules -- Colossal-AI related modules -```python import colossalai -from colossalai.context import ParallelMode -from colossalai.core import global_context as gpc +from colossalai.booster import Booster +from colossalai.booster.plugin import GeminiPlugin, HybridParallelPlugin, LowLevelZeroPlugin, TorchDDPPlugin +from colossalai.cluster import DistCoordinator from colossalai.logging import disable_existing_loggers, get_dist_logger -from colossalai.nn.lr_scheduler import LinearWarmupLR -from colossalai.legacy.nn.metric import Accuracy -from colossalai.legacy.trainer import Trainer, hooks -``` - -- Other modules -```python -import os - -import torch -from timm.models import vit_base_patch16_224 - - -from torchvision import transforms -from torchvision.datasets import CIFAR10 +from colossalai.nn.lr_scheduler import CosineAnnealingWarmupLR +from colossalai.nn.optimizer import HybridAdam ``` - -#### Launch Colossal-AI - -In train script, you need to initialize the distributed environment for Colossal-AI after your config file is prepared. We call this process `launch`. In Colossal-AI, we provided several launch methods to initialize the distributed backend. In most cases, you can use `colossalai.launch` and `colossalai.get_default_parser` to pass the parameters via command line. Besides, Colossal-AI can utilize the existing launch tool provided by PyTorch as many users are familiar with by using `colossalai.launch_from_torch`. For more details, you can view the related [documents](https://www.colossalai.org/docs/basics/launch_colossalai). - +## Define the Vision Transformer (VIT) model. +Define hyperparameters. ```python -# initialize distributed setting -parser = colossalai.get_default_parser() -args = parser.parse_args() -colossalai.launch_from_torch(config=args.config) - -disable_existing_loggers() -logger = get_dist_logger() +SEED = 42 +MODEL_PATH = "google/vit-base-patch16-224" +LEARNING_RATE = 5e-5 +WEIGHT_DECAY = 0.0 +NUM_EPOCH = 3 +WARMUP_RATIO = 0.3 +TP_SIZE = 2 +PP_SIZE = 2 ``` - -After initialization, you can access the variables in the config file by using `colossalai.core.global_context`. - +Create a distributed environment. ```python -#access parameters -print(gpc.config.BATCH_SIZE) +# Launch ColossalAI +colossalai.launch_from_torch(config={}, seed=SEEDå) +coordinator = DistCoordinator() +world_size = coordinator.world_size ``` +Before training, you can define the relevant components of the model training process as usual, such as defining the model, data loaders, optimizer, and so on. It's important to note that when using pipeline parallelism, you also need to define a criterion function. This function takes the input and output of the model forward pass as inputs and returns the loss. +Prepare the dataset. BeansDataset is defined in [data.py](https://github.com/hpcaitech/ColossalAI/blob/main/examples/images/vit/data.py). -#### Build Model - -If only data parallelism is required, you do not need to make any changes to your model. Here, we use `vit_base_patch16_224` from `timm`. ```python -# build model -model = vit_base_patch16_224(drop_rate=0.1, num_classes=gpc.config.NUM_CLASSES) +image_processor = ViTImageProcessor.from_pretrained(MODEL_PATH) +train_dataset = BeansDataset(image_processor, TP_SIZE, split="train") +eval_dataset = BeansDataset(image_processor, RP_SIZE, split="validation") +num_labels = train_dataset.num_labels ``` - -#### Build CIFAR-10 Dataloader -`colossalai.utils.get_dataloader` can help you build dataloader easily. - +Define the VIT model: ```python -def build_cifar(batch_size): - transform_train = transforms.Compose([ - transforms.RandomCrop(224, pad_if_needed=True), - transforms.AutoAugment(policy=transforms.AutoAugmentPolicy.CIFAR10), - transforms.ToTensor(), - transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)), - ]) - transform_test = transforms.Compose([ - transforms.Resize(224), - transforms.ToTensor(), - transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)), - ]) - - train_dataset = CIFAR10(root=os.environ['DATA'], train=True, download=True, transform=transform_train) - test_dataset = CIFAR10(root=os.environ['DATA'], train=False, transform=transform_test) - train_dataloader = get_dataloader(dataset=train_dataset, shuffle=True, batch_size=batch_size, pin_memory=True) - test_dataloader = get_dataloader(dataset=test_dataset, batch_size=batch_size, pin_memory=True) - return train_dataloader, test_dataloader - - -# build dataloader -train_dataloader, test_dataloader = build_cifar(gpc.config.BATCH_SIZE) +config = ViTConfig.from_pretrained(MODEL_PATH) +config.num_labels = num_labels +config.id2label = {str(i): c for i, c in enumerate(train_dataset.label_names)} +config.label2id = {c: str(i) for i, c in enumerate(train_dataset.label_names)} +model = ViTForImageClassification.from_pretrained( + MODEL_PATH, config=config, ignore_mismatched_sizes=True +) ``` - -#### Define optimizer, loss function and LR scheduler - -Colossal-AI provides its own optimizer, loss function and LR scheduler. Those from PyTorch are also compatible. - +Define the optimizer: ```python -# build optimizer -optimizer = colossalai.nn.Lamb(model.parameters(), lr=1.8e-2, weight_decay=0.1) - -# build loss -criterion = torch.nn.CrossEntropyLoss() - -# lr_scheduler -lr_scheduler = LinearWarmupLR(optimizer, warmup_steps=50, total_steps=gpc.config.NUM_EPOCHS) +optimizer = HybridAdam(model.parameters(), lr=(LEARNING_RATE * world_size), weight_decay=WEIGHT_DECAY) ``` - -#### Start Colossal-AI engine - -Engine is essentially a wrapper class for model, optimizer and loss function. When we call `colossalai.initialize`, an engine object will be returned, and it has already been equipped with functionalities such as gradient clipping, gradient accumulation and zero optimizer as specified in your configuration file. Further model training is based on Colossal-AI engine. - +Define the learning rate scheduler: ```python -engine, train_dataloader, test_dataloader, _ = colossalai.initialize( - model, optimizer, criterion, train_dataloader, test_dataloader +total_steps = len(train_dataloader) * NUM_EPOCH +num_warmup_steps = int(WARMUP_RATIO * total_steps) +lr_scheduler = CosineAnnealingWarmupLR( + optimizer=optimizer, total_steps=(len(train_dataloader) * NUM_EPOCH), warmup_steps=num_warmup_steps ) ``` - -#### Train: Trainer API -Trainer is a more high-level wrapper for the user to execute training with fewer lines of code. It is easy to create a trainer object by passing the engine object. - -Besides, In trainer, the user can customize some hooks and attach these hooks to the trainer object. A hook object will execute life-cycle methods periodically based on the training scheme. For example, The `LRSchedulerHook` will execute `lr_scheduler.step()` to update the learning rate of the model during either `after_train_iter` or `after_train_epoch` stages. - -```python -# build trainer -trainer = Trainer(engine=engine, logger=logger) - -# build hooks -hook_list = [ - hooks.LossHook(), - hooks.AccuracyHook(accuracy_func=MixupAccuracy()), - hooks.LogMetricByEpochHook(logger), - hooks.LRSchedulerHook(lr_scheduler, by_epoch=True), - - # comment if you do not need to use the hooks below - hooks.SaveCheckpointHook(interval=1, checkpoint_dir='./ckpt'), - hooks.TensorboardHook(log_dir='./tb_logs', ranks=[0]), -] -``` - -Use `trainer.fit` for training: - -```python -# start training -trainer.fit( - train_dataloader=train_dataloader, - test_dataloader=test_dataloader, - epochs=gpc.config.NUM_EPOCHS, - hooks=hook_list, - display_progress=True, - test_interval=1 -) -``` - -### Start training -`DATA` is the filepath where CIFAR-10 dataset will be automatically downloaded and stored. - -`` is the number of GPUs you want to use to train ViT on CIFAR-10 with data parallelism. - -```bash -export DATA= -# If your torch >= 1.10.0 -torchrun --standalone --nproc_per_node train_dp.py --config ./configs/config_data_parallel.py -# If your torch >= 1.9.0 -# python -m torch.distributed.run --standalone --nproc_per_node= train_dp.py --config ./configs/config_data_parallel.py -# Otherwise -# python -m torch.distributed.launch --nproc_per_node --master_addr --master_port 29500 train_dp.py --config ./configs/config.py -``` - - - -## Pipeline Parallelism -Aside from data parallelism, Colossal-AI also support pipeline parallelism. In specific, Colossal-AI uses 1F1B pipeline introduced by NVIDIA. For more details, you can view the related [documents](https://www.colossalai.org/tutorials/features/pipeline_parallel). - -### Define your configuration file(`hybrid_parallel/configs/vit_pipeline.py`) -To apply pipeline parallel on the data parallel basis, you only need to add a **parallel dict** +Define the criterion function: ```python -from colossalai.amp import AMP_TYPE - -parallel = dict( - pipeline=2 -) -# pipeline config -NUM_MICRO_BATCHES = parallel['pipeline'] -TENSOR_SHAPE = (BATCH_SIZE // NUM_MICRO_BATCHES, SEQ_LENGTH, HIDDEN_SIZE) - -fp16 = dict(mode=AMP_TYPE.NAIVE) -clip_grad_norm = 1.0 +def _criterion(outputs, inputs): + return outputs.loss ``` - -Other configs: +## Boost the VIT Model +We begin using ColossalAI's hybrid parallelism strategy to enhance the model. First, let's define an object of `HybridParallelPlugin`. `HybridParallelPlugin` encapsulates various parallelism strategies in ColossalAI. Afterward, we use the `HybridParallelPlugin` object to initialize the booster and boost the VIT model. +### Training with AMP +In the HybridParallelPlugin plugin, you can determine the training precision by setting the precision parameter, which supports three types: 'fp16', 'bf16', and 'fp32'. 'fp16' and 'bf16' are half-precision types. Half-precision is used in two scenarios in the HybridParallelPlugin: +1. When using zero-data parallelism, you should set it to half-precision. +2. When specifying the use of AMP (Automatic Mixed Precision) for training. +You can set related parameters when using half-precision. +`initial_scale` (float, optional): Initial loss scaling factor for AMP. Default value is 2**16. +`min_scale` (float, optional): Minimum loss scaling factor for AMP. Default value is 1. +`growth_factor` (float, optional): Multiplicative factor used to increase the loss scaling factor when using AMP. Default value is 2. +`backoff_factor` (float, optional): Multiplicative factor used to decrease the loss scaling factor when using AMP. Default value is 0.5. +`growth_interval` (integer, optional): Number of steps to increase the loss scaling factor when using AMP, in cases where there is no overflow. Default value is 1000. +`hysteresis` (integer, optional): Number of overflows required before reducing the loss scaling factor when using AMP. Default value is 2. +`max_scale` (float, optional): Maximum loss scaling factor for AMP. Default value is 2**32. +Plugin example when using amp: ```python -# hyper parameters -# BATCH_SIZE is as per GPU -# global batch size = BATCH_SIZE x data parallel size -BATCH_SIZE = 256 -LEARNING_RATE = 3e-3 -WEIGHT_DECAY = 0.3 -NUM_EPOCHS = 300 -WARMUP_EPOCHS = 32 - -# model config -IMG_SIZE = 224 -PATCH_SIZE = 16 -HIDDEN_SIZE = 768 -DEPTH = 12 -NUM_HEADS = 12 -MLP_RATIO = 4 -NUM_CLASSES = 10 -CHECKPOINT = True -SEQ_LENGTH = (IMG_SIZE // PATCH_SIZE) ** 2 + 1 # add 1 for cls token +plugin = HybridParallelPlugin( + precision="fp16", + initial_scale=1, + ) ``` - -### Build pipeline model (`/hybrid_parallel/model/vit.py`) -Colossal-AI provides two methods to build a pipeline model from the existing model. -- `colossalai.legacy.builder.build_pipeline_model_from_cfg` -- `colossalai.legacy.builder.build_pipeline_model` - -Besides, you can also build a pipeline model from scratch with Colossal-AI. +### Tensor parallelism +`HybridParallelPlugin` achieves tensor parallelism through Shardformer. In this plugin, you can set the `tp_size` to determine the size of tensor parallel groups. Additionally, there are multiple parameters that can be configured to optimize tensor parallelism features when using this plugin: +`enable_all_optimization` (boolean, optional): Whether to enable all optimization methods supported by Shardformer. Currently, all optimization methods include fused normalization, flash attention, and JIT. Default is False. +`enable_fused_normalization` (boolean, optional): Whether to enable fused normalization in Shardformer. Default is False. +`enable_flash_attention` (boolean, optional): Whether to enable flash attention in Shardformer. Default is False. +`enable_jit_fused` (boolean, optional): Whether to enable JIT (Just-In-Time) fusion in Shardformer. Default is False. +`enable_sequence_parallelism` (boolean): Whether to enable sequence parallelism in Shardformer. Default is False. +`enable_sequence_overlap` (boolean): Whether to enable sequence overlap in Shardformer. Default is False. +Example of a tensor parallelism plugin: ```python -import math -from typing import Callable - -import inspect -import torch -from colossalai import nn as col_nn -from colossalai.legacy.registry import LAYERS, MODELS -from colossalai.logging import get_dist_logger -from colossalai.core import global_context as gpc -from colossalai.context import ParallelMode -from colossalai.legacy.builder.pipeline import partition_uniform -from torch import dtype, nn -from model_zoo.vit.vit import ViTBlock, ViTEmbedding, ViTHead - - -@MODELS.register_module -class PipelineVisionTransformer(nn.Module): - def __init__(self, - img_size: int = 224, - patch_size: int = 16, - in_chans: int = 3, - num_classes: int = 1000, - depth: int = 12, - num_heads: int = 12, - dim: int = 768, - mlp_ratio: int = 4, - attention_dropout: float = 0., - dropout: float = 0.1, - drop_path: float = 0., - layernorm_epsilon: float = 1e-6, - activation: Callable = nn.functional.gelu, - representation_size: int = None, - dtype: dtype = None, - bias: bool = True, - checkpoint: bool = False, - init_method: str = 'torch', - first_stage=True, - last_stage=True, - start_idx=None, - end_idx=None,): - super().__init__() - - layers = [] - - if first_stage: - embed = ViTEmbedding(img_size=img_size, - patch_size=patch_size, - in_chans=in_chans, - embedding_dim=dim, - dropout=dropout, - dtype=dtype, - init_method=init_method) - layers.append(embed) - - # stochastic depth decay rule - dpr = [x.item() for x in torch.linspace(0, drop_path, depth)] - - if start_idx is None and end_idx is None: - start_idx = 0 - end_idx = depth - - blocks = [ - ViTBlock( - dim=dim, - num_heads=num_heads, - mlp_ratio=mlp_ratio, - attention_dropout=attention_dropout, - dropout=dropout, - drop_path=dpr[i], - activation=activation, - dtype=dtype, - bias=bias, - checkpoint=checkpoint, - init_method=init_method, - ) for i in range(start_idx, end_idx) - ] - layers.extend(blocks) - - if last_stage: - norm = col_nn.LayerNorm(normalized_shape=dim, eps=layernorm_epsilon, dtype=dtype) - head = ViTHead(dim=dim, - num_classes=num_classes, - representation_size=representation_size, - dtype=dtype, - bias=bias, - init_method=init_method) - layers.extend([norm, head]) - - self.layers = nn.Sequential( - *layers +plugin = HybridParallelPlugin( + tp_size=4, + enable_all_optimization=True ) - - def forward(self, x): - x = self.layers(x) - return x - - -def _filter_kwargs(func, kwargs): - sig = inspect.signature(func) - return {k: v for k, v in kwargs.items() if k in sig.parameters} - - -def _build_pipeline_vit(module_cls, num_layers, num_chunks, device=torch.device('cuda'), **kwargs): - logger = get_dist_logger() - if gpc.is_initialized(ParallelMode.PIPELINE): - pipeline_size = gpc.get_world_size(ParallelMode.PIPELINE) - pipeline_rank = gpc.get_local_rank(ParallelMode.PIPELINE) - else: - pipeline_size = 1 - pipeline_rank = 0 - rank = gpc.get_global_rank() - parts = partition_uniform(num_layers, pipeline_size, num_chunks)[pipeline_rank] - models = [] - - for start, end in parts: - kwargs['first_stage'] = start == 0 - kwargs['last_stage'] = end == num_layers - kwargs['start_idx'] = start - kwargs['end_idx'] = end - logger.info(f'Rank{rank} build layer {start}-{end}, {end-start}/{num_layers} layers') - chunk = module_cls(**_filter_kwargs(module_cls.__init__, kwargs)).to(device) - models.append(chunk) - if len(models) == 1: - model = models[0] - else: - model = nn.ModuleList(models) - return model - - -def build_pipeline_vit(num_layers, num_chunks, device=torch.device('cuda'), **kwargs): - return _build_pipeline_vit(PipelineVisionTransformer, num_layers, num_chunks, device, **kwargs) ``` +### Pipeline Parallelism -### Modify train script (`/hybrid_parallel/train_with_cifar10.py`) - -#### Import modules +`HybridParallelPlugin` determines the size of pipeline parallelism groups by setting `pp_size`. `num_microbatches` is used to specify the number of microbatches into which the entire batch is divided during pipeline parallelism, and `microbatch_size` can be set to define the size of these microbatches. The plugin will prioritize using `num_microbatches` to determine the microbatch configuration. +Example of a plugin for pipeline parallelism: ```python -from colossalai.legacy.engine.schedule import (InterleavedPipelineSchedule, - PipelineSchedule) -from colossalai.utils import MultiTimer -import os - -import colossalai - -import torch -from colossalai.context import ParallelMode -from colossalai.core import global_context as gpc -from colossalai.logging import get_dist_logger -from colossalai.nn import CrossEntropyLoss -from colossalai.nn.lr_scheduler import CosineAnnealingWarmupLR -from colossalai.utils import is_using_pp, get_dataloader -from model.vit import build_pipeline_vit -from model_zoo.vit.vit import _create_vit_model -from tqdm import tqdm - -from torchvision import transforms -from torchvision.datasets import CIFAR10 +plugin = HybridParallelPlugin( + pp_size=4, + num_microbatches=None, + microbatch_size=1 + ) ``` - -#### Launch Colossal-AI -`colossalai.utils.is_using_pp` can help check whether pipeline parallelism is required in config file. - +### Data Parallelism +The `HybridParallelPlugin`'s data parallelism includes both the zero-dp series and Torch DDP. When `zero_stage` is set to 0 (the default), it means using Torch DDP. Please note that Torch DDP conflicts with pipeline parallelism and cannot be used together. When `zero_stage` is set to 1, it indicates the use of the zero1 strategy. When `zero_stage` is set to 2, it implies the use of the zero2 strategy. The zero2 strategy also cannot be used together with pipeline parallelism. If you want to use zero3, please use the [`GeminiPlugin`](../basics/booster_plugins.md). +When using data parallelism with the zero series, please set the training precision to half-precision. If you haven't specified the use of zero or pipeline parallelism, and if `world_size//(tp_size*pp_size)` is greater than 1, the HybridParallelPlugin will automatically enable the Torch DDP parallel strategy for you. +Here are some related parameters for configuring Torch DDP: +`broadcast_buffers` (boolean, optional): Whether to broadcast buffers at the beginning of training when using DDP. Default is True. +`ddp_bucket_cap_mb` (integer, optional): Size of the bucket (in MB) when using DDP. Default is 25. +`find_unused_parameters` (boolean, optional): Whether to search for unused parameters when using DDP. Default is False. +`check_reduction` (boolean, optional): Whether to check the reduction operation when using DDP. Default is False. +`gradient_as_bucket_view` (boolean, optional): Whether to use gradients as bucket views when using DDP. Default is False. +`static_graph` (boolean, optional): Whether to use a static graph when using DDP. Default is False. +Example of a plugin for Torch DDP. ```python -# initialize distributed setting -parser = colossalai.get_default_parser() -args = parser.parse_args() - -# launch from torch -colossalai.launch_from_torch(config=args.config) - -# get logger -logger = get_dist_logger() -logger.info("initialized distributed environment", ranks=[0]) - -if hasattr(gpc.config, 'LOG_PATH'): - if gpc.get_global_rank() == 0: - log_path = gpc.config.LOG_PATH - if not os.path.exists(log_path): - os.mkdir(log_path) - logger.log_to_file(log_path) - -use_pipeline = is_using_pp() +plugin = HybridParallelPlugin( + tp_size=2, + pp_size=1, + zero_stage=0, + precision="fp16", + initial_scale=1, + ) ``` - -#### Define model - +If there are 4 parallel processes, the parallel group size for Torch DDP is 2. +ZeRO-related parameters: +`zero_bucket_size_in_m` (integer, optional): The bucket size for gradient reduction in megabytes when using ZeRO. Default is 12. +`cpu_offload` (boolean, optional): Whether to enable cpu_offload when using ZeRO. Default is False. +`communication_dtype` (torch data type, optional): The data type for communication when using ZeRO. If not specified, the data type of the parameters will be used. Default is None. +`overlap_communication` (boolean, optional): Whether to overlap communication and computation when using ZeRO. Default is True. +Example of a plugin for ZERO1. ```python -# create model -model_kwargs = dict(img_size=gpc.config.IMG_SIZE, - patch_size=gpc.config.PATCH_SIZE, - dim=gpc.config.HIDDEN_SIZE, - depth=gpc.config.DEPTH, - num_heads=gpc.config.NUM_HEADS, - mlp_ratio=gpc.config.MLP_RATIO, - num_classes=gpc.config.NUM_CLASSES, - init_method='jax', - checkpoint=gpc.config.CHECKPOINT) - -if use_pipeline: - model = build_pipeline_vit(num_layers=model_kwargs['depth'], num_chunks=1, **model_kwargs) -else: - model = _create_vit_model(**model_kwargs) -``` - -#### Count number of parameters - -You can count model parameters on different pipeline stages easily. - -``` -# count number of parameters -total_numel = 0 -for p in model.parameters(): - total_numel += p.numel() -if not gpc.is_initialized(ParallelMode.PIPELINE): - pipeline_stage = 0 -else: - pipeline_stage = gpc.get_local_rank(ParallelMode.PIPELINE) -logger.info(f"number of parameters: {total_numel} on pipeline stage {pipeline_stage}") +plugin = HybridParallelPlugin( + tp_size=1, + pp_size=1, + zero_stage=1, + cpu_offload=True, + precision="fp16", + initial_scale=1, + ) ``` -#### Build dataloader, optimizer, etc. - +### Hybrid Parallelism +You can refer to the above-mentioned strategies to customize an appropriate hybrid parallelism strategy. And use this plugin to define a booster. ```python -def build_cifar(batch_size): - transform_train = transforms.Compose([ - transforms.RandomCrop(224, pad_if_needed=True), - transforms.AutoAugment(policy=transforms.AutoAugmentPolicy.CIFAR10), - transforms.ToTensor(), - transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)), - ]) - transform_test = transforms.Compose([ - transforms.Resize(224), - transforms.ToTensor(), - transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)), - ]) - - train_dataset = CIFAR10(root=os.environ['DATA'], train=True, download=True, transform=transform_train) - test_dataset = CIFAR10(root=os.environ['DATA'], train=False, transform=transform_test) - train_dataloader = get_dataloader(dataset=train_dataset, shuffle=True, batch_size=batch_size, pin_memory=True) - test_dataloader = get_dataloader(dataset=test_dataset, batch_size=batch_size, pin_memory=True) - return train_dataloader, test_dataloader - - -# create dataloaders -train_dataloader , test_dataloader = build_cifar() - -# create loss function -criterion = CrossEntropyLoss(label_smoothing=0.1) - -# create optimizer -optimizer = torch.optim.AdamW(model.parameters(), lr=gpc.config.LEARNING_RATE, weight_decay=gpc.config.WEIGHT_DECAY) - -# create lr scheduler -lr_scheduler = CosineAnnealingWarmupLR(optimizer=optimizer, - total_steps=gpc.config.NUM_EPOCHS, - warmup_steps=gpc.config.WARMUP_EPOCHS) +plugin = HybridParallelPlugin( + tp_size=TP_SIZE, + pp_size=PP_SIZE, + num_microbatches=None, + microbatch_size=1, + enable_all_optimization=True, + precision="fp16", + initial_scale=1, + ) +booster = Booster(plugin=plugin) ``` - -#### Start Colossal-AI engine - +Next, we use `booster.boost` to inject the features encapsulated by the plugin into the model training components. ```python -# initialize -engine, train_dataloader, test_dataloader, _ = colossalai.initialize(model=model, - optimizer=optimizer, - criterion=criterion, - train_dataloader=train_dataloader, - test_dataloader=test_dataloader) - -logger.info("Engine is built", ranks=[0]) +model, optimizer, _criterion, train_dataloader, lr_scheduler = booster.boost( + model=model, optimizer=optimizer, criterion=criterion, dataloader=train_dataloader, lr_scheduler=lr_scheduler + ) ``` - -#### Train: based on engine - -In the data parallelism example, we show how to train a model with Trainer API. We can also directly train a model based on engine. In this way, you can customize your training with more features. - +## Train ViT using hybrid parallelism. +Finally, we can use the hybrid parallelism strategy to train the model. Let's first define a training function that describes the training process. It's important to note that if the pipeline parallelism strategy is used, you should call `booster.execute_pipeline` to perform the model training. This function will invoke the `scheduler` to manage the model's forward and backward operations. ```python -data_iter = iter(train_dataloader) - -for epoch in range(gpc.config.NUM_EPOCHS): - # training - engine.train() - - if gpc.get_global_rank() == 0: - description = 'Epoch {} / {}'.format( - epoch, - gpc.config.NUM_EPOCHS +def run_forward_backward( + model: nn.Module, + optimizer: Optimizer, + criterion: Callable[[Any, Any], torch.Tensor], + data_iter: Iterator, + booster: Booster, +): + if optimizer is not None: + optimizer.zero_grad() + if isinstance(booster.plugin, HybridParallelPlugin) and booster.plugin.pp_size > 1: + # run pipeline forward backward when enabling pp in hybrid parallel plugin + output_dict = booster.execute_pipeline( + data_iter, model, criterion, optimizer, return_loss=True, return_outputs=True ) - progress = tqdm(range(len(train_dataloader)), desc=description) + loss, outputs = output_dict["loss"], output_dict["outputs"] else: - progress = range(len(train_dataloader)) - for _ in progress: - engine.zero_grad() - engine.execute_schedule(data_iter, return_output_label=False) - engine.step() - lr_scheduler.step() -``` - -### Start training -```bash -export DATA= -# If your torch >= 1.10.0 -torchrun --standalone --nproc_per_node train_hybrid.py --config ./configs/config_pipeline_parallel.py -# If your torch >= 1.9.0 -# python -m torch.distributed.run --standalone --nproc_per_node= train_hybrid.py --config ./configs/config_pipeline_parallel.py -``` - - - - -## Tensor Parallelism and Hybrid Parallelism -Tensor parallelism partitions each weight parameter across multiple devices in order to reduce memory load. Colossal-AI support 1D, 2D, 2.5D and 3D tensor parallelism. Besides, you can combine tensor parallelism with pipeline parallelism and data parallelism to reach hybrid parallelism. Colossal-AI also provides an easy way to apply tensor parallelism and hybrid parallelism. On the basis of pipeline parallelism, a few lines of code changing in config file is all you need. - -### Define your configuration file(`/hybrid_parallel/configs/vit_1d_tp2_pp2.py`) -To use tensor parallelism, you only need to add related information to the **parallel dict**. To be specific, `TENSOR_PARALLEL_MODE` can be '1d', '2d', '2.5d', '3d'. And the size of different parallelism should satisfy: `#GPUs = pipeline parallel size x tensor parallel size x data parallel size`. `data parallel size` will automatically computed after you specify the number of GPUs, pipeline parallel size and tensor parallel size. - -```python -from colossalai.amp import AMP_TYPE -# parallel setting -TENSOR_PARALLEL_SIZE = 2 -TENSOR_PARALLEL_MODE = '1d' - -parallel = dict( - pipeline=2, - tensor=dict(mode=TENSOR_PARALLEL_MODE, size=TENSOR_PARALLEL_SIZE) -) - -fp16 = dict(mode=AMP_TYPE.NAIVE) -clip_grad_norm = 1.0 - - -# pipeline config -NUM_MICRO_BATCHES = parallel['pipeline'] -TENSOR_SHAPE = (BATCH_SIZE // NUM_MICRO_BATCHES, SEQ_LENGTH, HIDDEN_SIZE) + batch = next(data_iter) + batch = move_to_cuda(batch, torch.cuda.current_device()) + outputs = model(**batch) + loss = criterion(outputs, None) + if optimizer is not None: + booster.backward(loss, optimizer) + +def train_epoch( + epoch: int, + model: nn.Module, + optimizer: Optimizer, + criterion: Callable[[Any, Any], torch.Tensor], + lr_scheduler: LRScheduler, + dataloader: DataLoader, + booster: Booster, + coordinator: DistCoordinator, +): + torch.cuda.synchronize() + + num_steps = len(dataloader) + data_iter = iter(dataloader) + enable_pbar = coordinator.is_master() + if isinstance(booster.plugin, HybridParallelPlugin) and booster.plugin.pp_size > 1: + # when using pp, only the last stage of master pipeline (dp_rank and tp_rank are both zero) shows pbar + tp_rank = dist.get_rank(booster.plugin.tp_group) + dp_rank = dist.get_rank(booster.plugin.dp_group) + enable_pbar = tp_rank == 0 and dp_rank == 0 and booster.plugin.stage_manager.is_last_stage() + model.train() + + with tqdm(range(num_steps), desc=f"Epoch [{epoch + 1}]", disable=not enable_pbar) as pbar: + for _ in pbar: + loss, _ = run_forward_backward(model, optimizer, criterion, data_iter, booster) + optimizer.step() + lr_scheduler.step() + + # Print batch loss + if enable_pbar: + pbar.set_postfix({"loss": loss.item()}) ``` - -Other configs: +Start training the model. ```python -# hyper parameters -# BATCH_SIZE is as per GPU -# global batch size = BATCH_SIZE x data parallel size -BATCH_SIZE = 256 -LEARNING_RATE = 3e-3 -WEIGHT_DECAY = 0.3 -NUM_EPOCHS = 300 -WARMUP_EPOCHS = 32 - -# model config -IMG_SIZE = 224 -PATCH_SIZE = 16 -HIDDEN_SIZE = 768 -DEPTH = 12 -NUM_HEADS = 12 -MLP_RATIO = 4 -NUM_CLASSES = 10 -CHECKPOINT = True -SEQ_LENGTH = (IMG_SIZE // PATCH_SIZE) ** 2 + 1 # add 1 for cls token -``` - -### Start training -```bash -export DATA= -# If your torch >= 1.10.0 -torchrun --standalone --nproc_per_node train_hybrid.py --config ./configs/config_hybrid_parallel.py -# If your torch >= 1.9.0 -# python -m torch.distributed.run --standalone --nproc_per_node= train_hybrid.py --config ./configs/config_hybrid_parallel.py +for epoch in range(NUM_EPOCH): + train_epoch(epoch, model, optimizer, criterion, lr_scheduler, train_dataloader, booster, coordinator) ``` diff --git a/docs/source/zh-Hans/advanced_tutorials/train_gpt_using_hybrid_parallelism.md b/docs/source/zh-Hans/advanced_tutorials/train_gpt_using_hybrid_parallelism.md index a1d58e9fd..117406980 100644 --- a/docs/source/zh-Hans/advanced_tutorials/train_gpt_using_hybrid_parallelism.md +++ b/docs/source/zh-Hans/advanced_tutorials/train_gpt_using_hybrid_parallelism.md @@ -1,10 +1,13 @@ -# 使用混合并行训练 GPT +# 使用混合并行训练 GPT-2 -作者: Hongxin Liu, Yongbin Li +作者: Hongxin Liu, Yongbin Li, Mingyan Jiang + +**前置教程** +- [并行插件](../basics/booster_plugins.md) +- [booster API](../basics/booster_api.md) **示例代码** -- [ColossalAI-Examples GPT2](https://github.com/hpcaitech/ColossalAI-Examples/tree/main/language/gpt_2) -- [ColossalAI-Examples GPT3](https://github.com/hpcaitech/ColossalAI-Examples/tree/main/language/gpt_3) +- [ColossalAI-Examples GPT2](https://github.com/hpcaitech/ColossalAI/blob/main/examples/language/gpt/hybridparallelism/finetune.py) **相关论文** - [Colossal-AI: A Unified Deep Learning System For Large-Scale Parallel Training](https://arxiv.org/abs/2110.14883) @@ -12,265 +15,190 @@ ## 引言 -在上一篇教程中,我们介绍了如何用流水并行训练 ViT。在本教程中,你将学习一个更复杂的场景--用混合并行方式训练GPT。在这种情况下,由于GPT-3过大,即使CPU内存也无法容纳它。因此,你必须自己分割模型。 +在上一篇教程中,我们介绍了如何用流水并行训练 ViT。在本教程中,你将学习一个更复杂的场景--用混合并行方式训练GPT-2。在这种情况下,由于GPT-2过大,即使CPU内存也无法容纳它。因此,该模型必须被分割。 ## 目录 在本教程中,我们将介绍: - -1. 基于 colossalai/model_zoo 定义 GPT 模型 -2. 处理数据集 -3. 使用混合并行训练 GPT +1. 初始化混合并行插件 +2. 定义 GPT-2 模型的训练组件 +3. 使用 [HybridParallelPlugin](../basics/booster_plugins.md) 增强GPT-2模型 +4. 使用混合并行训练 GPT-2 ## 导入依赖库 ```python -import json -import os -from typing import Callable - -import colossalai -import colossalai.utils as utils -import model_zoo.gpt.gpt as col_gpt +from typing import Callable, List, Union import torch +import torch.distributed as dist import torch.nn as nn -from colossalai import nn as col_nn -from colossalai.amp import AMP_TYPE -from colossalai.legacy.builder.pipeline import partition_uniform -from colossalai.legacy.context.parallel_mode import ParallelMode -from colossalai.core import global_context as gpc -from colossalai.legacy.engine.schedule import (InterleavedPipelineSchedule, - PipelineSchedule) -from colossalai.logging import disable_existing_loggers, get_dist_logger -from colossalai.legacy.nn.layer.wrapper import PipelineSharedModuleWrapper -from colossalai.legacy.trainer import Trainer, hooks -from colossalai.utils.timer import MultiTimer -from model_zoo.gpt import GPTLMLoss -from torch.nn import functional as F -from torch.utils.data import Dataset -from transformers import GPT2Tokenizer -``` - - - -## 定义 GPT 模型 - -在前面的教程中,我们介绍了3种建立流水并行模型的方法,但对于像 GPT-3 这样的巨大模型,你甚至不能在 CPU 中建立模型。在这种情况下,你必须自己分割模型。 - -GPT 数据加载器返回 `input_ids` 和 `attention_mask`, 因此我们在 `forward()` 中使用两个关键字参数来获得它们。请注意,对于除第一阶段以外的其他阶段, `forward()` 的第一个位置参数是上一阶段的输出张量。所以 `hidden_states` 来自前一阶段,并且对于第一阶段来说,它是 `None`。 - -对于 GPT, *word embedding layer* 与 *output head* 共享权重。我们提供 `PipelineSharedModuleWrapper` 在流水阶段间共享参数。它需要一个 `int` 型的 `list` 作为参数, 这意味着 rank 们共享这些参数。你可以使用 `register_module()` -或 `register_parameter()` 来注册一个模块或一个参数作为共享模块或参数。如果你有多组共享模块/参数,你应该有多个 `PipelineSharedModuleWrapper` 实例。 如果参数在**一个**阶段内共享, 你不应该使用 -`PipelineSharedModuleWrapper`, 而只是使用同一个模块/参数实例。在这个例子中,*word embedding layer* 在第一阶段, 而 *output head* 在最后一个阶段。因此,他们在 rank `[0, pipeline_size - 1]` 之间共享参数。 - -对于第一阶段,它维护 embedding layer 和一些 transformer blocks。对于最后一个阶段,它维护一些 transformer blocks 和 output head layer。对于其他阶段,他们只维护一些 transformer blocks。 -`partition_uniform(num_layers, pipeline_size, num_chunks)` 返回所有 rank 的 parts, part 是一个 `(start, end)` (不包括end) 的 `tuple`。`start == 0` 表示这是第一阶段, 而 `end == num_layers` 表示这是最后一个阶段。 +from torch.optim import Optimizer +from torch.optim.lr_scheduler import _LRScheduler as LRScheduler +from tqdm import tqdm +from transformers import AutoConfig, GPT2ForSequenceClassification, get_linear_schedule_with_warmup +from transformers import AutoTokenizer +import colossalai +from colossalai.booster import Booster +from colossalai.booster.plugin import GeminiPlugin, HybridParallelPlugin, LowLevelZeroPlugin, TorchDDPPlugin +from colossalai.cluster import DistCoordinator +from colossalai.nn.optimizer import HybridAdam +from colossalai.utils import get_current_device +``` +### 定义plugin +定义一个[`HybridParallelPlugin`](../basics/booster_plugins.md)对象,指定所需要使用的并行策略,在该例子中,同时使用了流水线并行和zero1. ```python -class PipelineGPTHybrid(nn.Module): - def __init__(self, - num_layers: int = 12, - hidden_size: int = 768, - num_attention_heads: int = 12, - vocab_size: int = 50304, - embed_drop_rate: float = 0., - act_func: Callable = F.gelu, - mlp_ratio: int = 4, - attn_drop_rate: float = 0., - drop_rate: float = 0., - dtype: torch.dtype = torch.float, - checkpoint: bool = False, - max_position_embeddings: int = 1024, - layer_norm_epsilon: float = 1e-5, - first: bool = False, - last: bool = False): - super().__init__() - self.embedding = None - self.norm = None - self.head = None - if first: - self.embedding = col_gpt.GPTEmbedding( - hidden_size, vocab_size, max_position_embeddings, dropout=embed_drop_rate, dtype=dtype) - self.blocks = nn.ModuleList([ - col_gpt.GPTBlock(hidden_size, num_attention_heads, mlp_ratio=mlp_ratio, attention_dropout=attn_drop_rate, - dropout=drop_rate, dtype=dtype, checkpoint=checkpoint, activation=act_func) - for _ in range(num_layers) - ]) - if last: - self.norm = col_nn.LayerNorm(hidden_size, eps=layer_norm_epsilon) - self.head = col_gpt.GPTLMHead(vocab_size=vocab_size, - dim=hidden_size, - dtype=dtype, - bias=False) - - def forward(self, hidden_states=None, input_ids=None, attention_mask=None): - if self.embedding is not None: - hidden_states = self.embedding(input_ids=input_ids) - batch_size = hidden_states.shape[0] - attention_mask = attention_mask.view(batch_size, -1) - attention_mask = attention_mask[:, None, None, :] - attention_mask = attention_mask.to(dtype=hidden_states.dtype) # fp16 compatibility - attention_mask = (1.0 - attention_mask) * -10000.0 - for block in self.blocks: - hidden_states, attention_mask = block(hidden_states, attention_mask) - if self.norm is not None: - hidden_states = self.head(self.norm(hidden_states)) - return hidden_states - - -def build_gpt_pipeline(num_layers, num_chunks, device=torch.device('cuda'), **kwargs): - logger = get_dist_logger() - pipeline_size = gpc.get_world_size(ParallelMode.PIPELINE) - pipeline_rank = gpc.get_local_rank(ParallelMode.PIPELINE) - rank = gpc.get_global_rank() - wrapper = PipelineSharedModuleWrapper([0, pipeline_size - 1]) - parts = partition_uniform(num_layers, pipeline_size, num_chunks)[pipeline_rank] - models = [] - for start, end in parts: - kwargs['num_layers'] = end - start - kwargs['first'] = start == 0 - kwargs['last'] = end == num_layers - logger.info(f'Rank{rank} build layer {start}-{end}, {end-start}/{num_layers} layers') - chunk = PipelineGPTHybrid(**kwargs).to(device) - if start == 0: - wrapper.register_module(chunk.embedding.word_embeddings) - elif end == num_layers: - wrapper.register_module(chunk.head) - models.append(chunk) - if len(models) == 1: - model = models[0] - else: - model = nn.ModuleList(models) - return model - - -def GPT2_exlarge_pipeline_hybrid(num_chunks=1, checkpoint=False, dtype=torch.float): - cfg = dict(hidden_size=1600, num_attention_heads=32, checkpoint=checkpoint, dtype=dtype) - return build_gpt_pipeline(48, num_chunks, **cfg) - - -def GPT3_pipeline_hybrid(num_chunks=1, checkpoint=False, dtype=torch.float): - cfg = dict(hidden_size=12288, num_attention_heads=96, - checkpoint=checkpoint, max_position_embeddings=2048, dtype=dtype) - return build_gpt_pipeline(96, num_chunks, **cfg) +plugin = HybridParallelPlugin( + tp_size=1, + pp_size=2, + num_microbatches=None, + microbatch_size=1, + enable_all_optimization=True, + zero_stage=1, + precision="fp16", + initial_scale=1, +) ``` -## 处理数据集 - -我们在这里提供了一个小型 GPT web-text 数据集。 原始格式是 loose JSON, 我们将保存处理后的数据集。 - +## 创建分布式环境. ```python -class WebtextDataset(Dataset): - def __init__(self, path, seq_len=1024) -> None: - super().__init__() - root = os.path.dirname(path) - encoded_data_cache_path = os.path.join(root, f'gpt_webtext_{seq_len}.pt') - if os.path.isfile(encoded_data_cache_path): - seq_len_, data, attention_mask = torch.load( - encoded_data_cache_path) - if seq_len_ == seq_len: - self.data = data - self.attention_mask = attention_mask - return - raw_data = [] - with open(path) as f: - for line in f.readlines(): - raw_data.append(json.loads(line)['text']) - tokenizer = GPT2Tokenizer.from_pretrained('gpt2') - tokenizer.pad_token = tokenizer.unk_token - encoded_data = tokenizer( - raw_data, padding=True, truncation=True, max_length=seq_len, return_tensors='pt') - self.data = encoded_data['input_ids'] - self.attention_mask = encoded_data['attention_mask'] - torch.save((seq_len, self.data, self.attention_mask), - encoded_data_cache_path) - - def __len__(self): - return len(self.data) - - def __getitem__(self, index): - return { - 'input_ids': self.data[index], - 'attention_mask': self.attention_mask[index] - }, self.data[index] +# Launch ColossalAI +colossalai.launch_from_torch(config={}, seed=42) +coordinator = DistCoordinator() ``` - -## 使用混合并行训练 GPT - -在上一个教程中,我们解释了一些流水并行的参数含义。在本例中,我们可以确定在流水阶段之间交换的每个输出张量的形状。对于 GPT,该形状为 -`(MICRO BATCH SIZE, SEQUENCE LEN, HIDDEN SIZE)`。通过设置该参数,我们可以避免交换每个阶段的张量形状。当你不确定张量的形状时,你可以把它保留为 -`None`, 形状会被自动推测。请确保你的模型的 `dtype` 是正确的:当你使用 `fp16`,模型的 `dtype` 必须是 `torch.half`;否则,`dtype` 必须是 `torch.float`。对于流水并行,仅支持 `AMP_TYPE.NAIVE`。 - -你可以通过在 `CONFIG` 里使用 `parallel` 来轻松使用张量并行。数据并行的大小是根据 GPU 的数量自动设置的。 - +## 定义GPT-2模型的训练组件 +在使用混合并行之前,您需要定义训练所使用的组件。 +定义超参数。 ```python -NUM_EPOCHS = 60 -SEQ_LEN = 1024 -BATCH_SIZE = 192 -NUM_CHUNKS = None -TENSOR_SHAPE = (1, 1024, 1600) -# only pipeline parallel -# CONFIG = dict(NUM_MICRO_BATCHES = 192, parallel=dict(pipeline=2), fp16=dict(mode=AMP_TYPE.NAIVE)) -# pipeline + 1D model parallel -CONFIG = dict(NUM_MICRO_BATCHES = 192, parallel=dict(pipeline=2, tensor=dict(mode='1d', size=2)), fp16=dict(mode=AMP_TYPE.NAIVE)) - - -def train(): - disable_existing_loggers() - parser = colossalai.get_default_parser() - args = parser.parse_args() - colossalai.launch_from_torch(config=CONFIG, backend=args.backend) - logger = get_dist_logger() - - train_ds = WebtextDataset(os.environ['DATA'], seq_len=SEQ_LEN) - train_dataloader = utils.get_dataloader(train_ds, - seed=42, - batch_size=BATCH_SIZE, - pin_memory=True, - shuffle=True, - drop_last=True) - - use_interleaved = NUM_CHUNKS is not None - num_chunks = 1 if not use_interleaved else NUM_CHUNKS - model = GPT2_exlarge_pipeline_hybrid(num_chunks=num_chunks, checkpoint=True, dtype=torch.half) - # model = GPT3_pipeline_hybrid(num_chunks=num_chunks, checkpoint=True, dtype=torch.half) - if use_interleaved and not isinstance(model, nn.ModuleList): - model = nn.ModuleList([model]) - - criterion = GPTLMLoss() - - optimizer = torch.optim.Adam(model.parameters(), lr=0.00015, weight_decay=1e-2,) - - engine, train_dataloader, _, _ = colossalai.initialize(model, - optimizer, - criterion, - train_dataloader=train_dataloader) - global_batch_size = BATCH_SIZE * \ - gpc.get_world_size(ParallelMode.DATA) * getattr(gpc.config, "gradient_accumulation", 1) - logger.info(f'Init done, global batch size = {global_batch_size}', ranks=[0]) +NUM_EPOCHS = 3 +BATCH_SIZE = 32 +LEARNING_RATE = 2.4e-5 +WEIGHT_DECAY = 0.01 +WARMUP_FRACTION = 0.1 +``` +获取数据集。您可以使用`plugin.prepare_dataloader`生成dataloader,也可以自定义您的dataloader。 +```python +def tokenize_batch(batch, tokenizer: Optional[AutoTokenizer] = None, max_length: int = 2048): + texts = [sample["sentence1"] + sample["sentence2"] for sample in batch] + data = tokenizer(texts, return_tensors="pt", padding="max_length", truncation=True, max_length=max_length) + data = {k: v.cuda() for k, v in data.items()} + data["labels"] = data["input_ids"].clone() + return data + +tokenizer = AutoTokenizer.from_pretrained("gpt2") +dataset = datasets.load_dataset("glue", "mrpc") +train_dataloader = plugin.prepare_dataloader( + dataset["train"], + batch_size=BATCH_SIZE, + shuffle=True, + drop_last=True, + collate_fn=partial(tokenize_batch, tokenizer=tokenizer, max_length=512), +) +``` +定义GPT-2模型。 +```python +cfg = AutoConfig.from_pretrained("gpt2", num_labels=2) +model = GPT2ForSequenceClassification.from_pretrained("gpt2", config=cfg).cuda() +``` +准备优化器 +```python +lr = LEARNING_RATE * coordinator.world_size +no_decay = ["bias", "LayerNorm.weight"] +optimizer_grouped_parameters = [ + { + "params": [p for n, p in model.named_parameters() if not any(nd in n for nd in no_decay)], + "weight_decay": WEIGHT_DECAY, + }, + { + "params": [p for n, p in model.named_parameters() if any(nd in n for nd in no_decay)], + "weight_decay": 0.0, + }, +] + +optimizer = HybridAdam(optimizer_grouped_parameters, lr=lr, eps=1e-8) +``` +准备 `lr_scheduler` 和 `criterion`,需要注意的是,当混合并行使用了管道并行时,还需定义`criterion`函数。这个函数应该以模型前后向的输入和输出作为参数,并返回loss。 +```python +# lr scheduler +total_steps = len(train_dataloader) * NUM_EPOCHS +num_warmup_steps = int(WARMUP_FRACTION * total_steps) +lr_scheduler = get_linear_schedule_with_warmup( + optimizer, + num_warmup_steps=num_warmup_steps, + num_training_steps=total_steps, +) + +def _criterion(outputs, inputs): + return outputs.loss +``` +## 增强GPT-2模型 +使用 HybridParallelPlugin 定义一个 booster(增强器)。根据设置的插件参数,booster会将一种或者多种并行策略注入到模型中。该例子中使用了管道并行,zero1,及半精度训练等优化。 +```python +booster = Booster(plugin=plugin) +``` +使用定义的 booster 来增强这些组件。 +```python +model, optimizer, _criterion, _, lr_scheduler = booster.boost( + model, optimizer, criterion=_criterion, lr_scheduler=lr_scheduler +) +``` - timer = MultiTimer() - trainer = Trainer( - engine=engine, - logger=logger, - timer=timer - ) +## 使用混合并行训练 GPT-2 - hook_list = [ - hooks.LossHook(), - hooks.LogMetricByEpochHook(logger), - hooks.ThroughputHook(), - hooks.LogMetricByStepHook(), - ] +在前面的教程中,我们已经解释了如何使用 Booster 和 HybridParallelPlugin 将各种并行特性注入到模型及其训练组件中。现在我们可以开始模型训练。 +定义一个训练函数。当使用了管道并行时,需要调用`booster.execute_pipeline`进行模型训练的阶段调度。 +```python +def train_epoch( + epoch: int, + model: nn.Module, + optimizer: Optimizer, + _criterion: Callable, + lr_scheduler: LRScheduler, + train_dataloader: DataLoader, + booster: Booster, + coordinator: DistCoordinator, +): + use_pipeline = isinstance(booster.plugin, HybridParallelPlugin) and booster.plugin.pp_size > 1 + is_pp_last_stage = use_pipeline and booster.plugin.stage_manager.is_last_stage() + print_flag = (not use_pipeline and coordinator.is_master()) or (use_pipeline and is_pp_last_stage) + total_step = len(train_dataloader) + + model.train() + optimizer.zero_grad() + train_dataloader_iter = iter(train_dataloader) + with tqdm( + range(total_step), + desc=f"Epoch [{epoch + 1}/{NUM_EPOCHS}]", + disable=not print_flag, + ) as pbar: + # Forward pass + for _ in pbar: + if use_pipeline: + outputs = booster.execute_pipeline( + train_dataloader_iter, model, _criterion, optimizer, return_loss=True, return_outputs=True + ) + # Backward and optimize + if is_pp_last_stage: + loss = outputs["loss"] + pbar.set_postfix({"loss": loss.item()}) + else: + data = next(train_dataloader_iter) + data = move_to_cuda(data) + outputs = model(**data) + loss = _criterion(outputs, None) + # Backward + booster.backward(loss, optimizer) + pbar.set_postfix({"loss": loss.item()}) + + optimizer.step() + optimizer.zero_grad() + lr_scheduler.step() - trainer.fit( - train_dataloader=train_dataloader, - epochs=NUM_EPOCHS, - test_interval=1, - hooks=hook_list, - display_progress=True, - return_output_label=False, - ) ``` - +训练 GPT-2 模型。 +```python +for epoch in range(NUM_EPOCHS): + train_epoch(epoch, model, optimizer, _criterion, lr_scheduler, train_dataloader, booster, coordinator) +``` + \ No newline at end of file diff --git a/docs/source/zh-Hans/advanced_tutorials/train_vit_using_pipeline_parallelism.md b/docs/source/zh-Hans/advanced_tutorials/train_vit_using_pipeline_parallelism.md deleted file mode 100644 index 5ef863dcd..000000000 --- a/docs/source/zh-Hans/advanced_tutorials/train_vit_using_pipeline_parallelism.md +++ /dev/null @@ -1,247 +0,0 @@ -# 使用流水并行训练 ViT - -作者: Hongxin Liu, Yongbin Li - -**示例代码** -- [ColossalAI-Examples Pipeline Parallel ViT](https://github.com/hpcaitech/ColossalAI-Examples/tree/main/image/vision_transformer/pipeline_parallel) - -**相关论文** -- [Efficient Large-Scale Language Model Training on GPU Clusters Using Megatron-LM](https://arxiv.org/abs/2104.04473) - -## 引言 - -在本教程中,你将学习如何使用流水并行从头开始训练用于图像分类的 Vision Transformer (ViT)。流水并行是一种模型并行,主要针对 GPU 内存不能满足模型容量的情况。 -通过使用流水并行,我们将原始模型分割成多个阶段,每个阶段保留原始模型的一部分。我们假设你的 GPU 内存不能容纳 ViT/L-16,而你的内存可以容纳这个模型。 - -## 目录 - -在本教程中,我们将介绍: - -1. 基于 [TIMM](https://github.com/rwightman/pytorch-image-models/blob/master/timm/models/vision_transformer.py) 定义 ViT 模型 -2. 处理数据集 -3. 使用流水并行训练 ViT - -## 导入依赖库 - -```python -import os -from collections import OrderedDict -from functools import partial - -import colossalai -import colossalai.nn as col_nn -import torch -import torch.nn as nn -from colossalai.legacy.builder import build_pipeline_model -from colossalai.legacy.engine.schedule import (InterleavedPipelineSchedule, - PipelineSchedule) -from colossalai.logging import disable_existing_loggers, get_dist_logger -from colossalai.legacy.trainer import Trainer, hooks -from colossalai.utils import MultiTimer, get_dataloader -from timm.models import vision_transformer as vit -from torchvision import transforms -from torchvision.datasets import CIFAR10 -``` - - -## 定义 Vision Transformer 模型 - -总的来说, 我们提供3种方法来建立一个流水并行的模型: - -1. `colossalai.legacy.builder.build_pipeline_model_from_cfg` -2. `colossalai.legacy.builder.build_pipeline_model` -3. 自己按阶段拆分模型 - -当你的内存能够容纳模型时,你可以使用前两种方法来建立你的模型,否则你必须自己分割模型。前两种方法首先在 CPU 上建立整个模型,然后分割模型,最后你可以直接把模型的相应部分移到 GPU 上。 - -`colossalai.legacy.builder.build_pipeline_model_from_cfg()` 接收一个模型的配置文件,它可以均匀地(按层)或平衡地(按参数大小)分割模型。 - -如果你熟悉 `PyTorch`, 你可以使用 `colossalai.legacy.builder.build_pipeline_model()` 它接收一个 `torch.nn.Sequential` 模型并按层均匀分割。 - -在本教程中,我们将修改 [TIMM/ViT](https://github.com/rwightman/pytorch-image-models/blob/master/timm/models/vision_transformer.py) to `torch.nn.Sequential`,然后使用 `colossalai.legacy.builder.build_pipeline_model()` 来建立流水线模型。 - -当数据是 **一个** `Tensor`, 你可以使用你的模型 `forward()` 中的位置参数来获得数据张量。对于流水线的第一阶段,`forward()` 的第一个位置参数是从数据加载器加载的数据张量。对于其他阶段,`forward()` 的第一个位置参数是上一阶段的输出张量。注意,如果该阶段不是最后一个阶段,则 `forward()` 的返回必须是一个 `Tensor`。 - -当数据是一个 `Tensor` 的 `dict`, 你可以使用你模型 `forward()` 的命名关键字参数来获得数据的 `dict`。 - -```python -class ViTEmbedding(nn.Module): - def __init__(self, img_size=224, patch_size=16, in_chans=3, embed_dim=768, embed_layer=vit.PatchEmbed, drop_rate=0., distilled=False): - super().__init__() - self.embed_dim = embed_dim # num_features for consistency with other models - self.num_tokens = 2 if distilled else 1 - self.patch_embed = embed_layer( - img_size=img_size, patch_size=patch_size, in_chans=in_chans, embed_dim=embed_dim) - num_patches = self.patch_embed.num_patches - - self.cls_token = nn.Parameter(torch.zeros(1, 1, embed_dim)) - self.dist_token = nn.Parameter(torch.zeros(1, 1, embed_dim)) if distilled else None - self.pos_embed = nn.Parameter(torch.zeros(1, num_patches + self.num_tokens, embed_dim)) - self.pos_drop = nn.Dropout(p=drop_rate) - self.init_weights() - - def forward(self, x): - x = self.patch_embed(x) - cls_token = self.cls_token.expand(x.shape[0], -1, -1) # stole cls_tokens impl from Phil Wang, thanks - if self.dist_token is None: - x = torch.cat((cls_token, x), dim=1) - else: - x = torch.cat((cls_token, self.dist_token.expand(x.shape[0], -1, -1), x), dim=1) - x = self.pos_drop(x + self.pos_embed) - return x - - def init_weights(self): - vit.trunc_normal_(self.pos_embed, std=.02) - if self.dist_token is not None: - vit.trunc_normal_(self.dist_token, std=.02) - vit.trunc_normal_(self.cls_token, std=.02) - self.apply(vit._init_vit_weights) - - -class ViTHead(nn.Module): - def __init__(self, embed_dim=768, num_classes=1000, norm_layer=None, distilled=False, representation_size=None): - super().__init__() - norm_layer = norm_layer or partial(nn.LayerNorm, eps=1e-6) - self.norm = norm_layer(embed_dim) - self.num_classes = num_classes - self.distilled = distilled - self.num_features = embed_dim - # Representation layer - if representation_size and not distilled: - self.num_features = representation_size - self.pre_logits = nn.Sequential(OrderedDict([ - ('fc', nn.Linear(embed_dim, representation_size)), - ('act', nn.Tanh()) - ])) - else: - self.pre_logits = nn.Identity() - # Classifier head(s) - self.head = nn.Linear(self.num_features, num_classes) if num_classes > 0 else nn.Identity() - self.head_dist = None - if distilled: - self.head_dist = nn.Linear(embed_dim, num_classes) if num_classes > 0 else nn.Identity() - self.init_weights() - - def forward(self, x): - x = self.norm(x) - if self.distilled: - x, x_dist = self.head(x[:, 0]), self.head_dist(x[:, 1]) - if self.training and not torch.jit.is_scripting(): - # during inference, return the average of both classifier predictions - return x, x_dist - else: - return (x + x_dist) / 2 - else: - x = self.pre_logits(x[:, 0]) - x = self.head(x) - return x - - def init_weights(self): - self.apply(vit._init_vit_weights) - - -def sequential_vit(img_size=224, patch_size=16, in_chans=3, num_classes=1000, embed_dim=768, depth=12, - num_heads=12, mlp_ratio=4., qkv_bias=True, representation_size=None, distilled=False, - drop_rate=0., attn_drop_rate=0., drop_path_rate=0., embed_layer=vit.PatchEmbed, norm_layer=None, - act_layer=None): - norm_layer = norm_layer or partial(nn.LayerNorm, eps=1e-6) - act_layer = act_layer or nn.GELU - embedding = ViTEmbedding(img_size=img_size, patch_size=patch_size, in_chans=in_chans, - embed_dim=embed_dim, embed_layer=embed_layer, drop_rate=drop_rate, distilled=distilled) - dpr = [x.item() for x in torch.linspace(0, drop_path_rate, depth)] # stochastic depth decay rule - blocks = [vit.Block( - dim=embed_dim, num_heads=num_heads, mlp_ratio=mlp_ratio, qkv_bias=qkv_bias, drop=drop_rate, - attn_drop=attn_drop_rate, drop_path=dpr[i], norm_layer=norm_layer, act_layer=act_layer) - for i in range(depth)] - for block in blocks: - block.apply(vit._init_vit_weights) - head = ViTHead(embed_dim=embed_dim, num_classes=num_classes, norm_layer=norm_layer, - distilled=distilled, representation_size=representation_size) - return nn.Sequential(embedding, *blocks, head) - - -def vit_large_patch16_224(**kwargs): - model_kwargs = dict(embed_dim=1024, depth=24, num_heads=16, **kwargs) - return sequential_vit(**model_kwargs) -``` - -## 处理数据集 - -一般来说, 我们在大型数据集如 ImageNet 上训练 ViT。为了简单期间,我们在这里只使用 CIFAR-10, 因为本教程只是用于流水并行训练。 - -```python -def build_cifar(batch_size): - transform_train = transforms.Compose([ - transforms.RandomCrop(224, pad_if_needed=True), - transforms.AutoAugment(policy=transforms.AutoAugmentPolicy.CIFAR10), - transforms.ToTensor(), - transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)), - ]) - transform_test = transforms.Compose([ - transforms.Resize(224), - transforms.ToTensor(), - transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)), - ]) - - train_dataset = CIFAR10(root=os.environ['DATA'], train=True, download=True, transform=transform_train) - test_dataset = CIFAR10(root=os.environ['DATA'], train=False, transform=transform_test) - train_dataloader = get_dataloader(dataset=train_dataset, shuffle=True, batch_size=batch_size, pin_memory=True) - test_dataloader = get_dataloader(dataset=test_dataset, batch_size=batch_size, pin_memory=True) - return train_dataloader, test_dataloader -``` - -## 使用流水并行训练 ViT - -你可以在配置文件中设置流水并行的大小。`NUM_CHUNKS` 在使用交错流水线时很有用 (更多细节见 [Efficient Large-Scale Language Model Training on GPU Clusters Using Megatron-LM](https://arxiv.org/abs/2104.04473) )。 -原始 batch 将会被分割为 `num_microbatches`, 每个阶段每次将加载一个 micro batch。如果你确定性地知道每个阶段输出张量的形状,你可以在配置文件中设置 `tensor_shape` 来减少通信。 -我们的仓库会自动为用户生成合适的schedule来支持流水并行训练。如果你不需要模型的输出和标签,你可以在调用 `trainer.fit()` 时,将 `return_output_label` 设置为 `False`,这样能进一步减少 GPU 显存使用。 - -你应当使用 `export DATA=/path/to/cifar`。 - -```python -BATCH_SIZE = 16 -NUM_EPOCHS = 60 -NUM_CHUNKS = 1 -CONFIG = dict(NUM_MICRO_BATCHES=4, parallel=dict(pipeline=2)) - - -def train(): - disable_existing_loggers() - parser = colossalai.get_default_parser() - args = parser.parse_args() - colossalai.launch_from_torch(backend=args.backend, config=CONFIG) - logger = get_dist_logger() - - # build model - model = vit_large_patch16_224() - model = build_pipeline_model(model, num_chunks=NUM_CHUNKS, verbose=True) - - # build criterion - criterion = nn.CrossEntropyLoss() - - # optimizer - optimizer = torch.optim.Adam(model.parameters(), lr=0.001, weight_decay=0) - - # build dataloader - train_dataloader, test_dataloader = build_cifar(BATCH_SIZE) - - engine, train_dataloader, test_dataloader, _ = colossalai.initialize(model, optimizer, criterion, - train_dataloader, test_dataloader) - timer = MultiTimer() - - trainer = Trainer(engine=engine, timer=timer, logger=logger) - - hook_list = [ - hooks.LossHook(), - hooks.AccuracyHook(col_nn.metric.Accuracy()), - hooks.LogMetricByEpochHook(logger), - ] - - trainer.fit(train_dataloader=train_dataloader, - epochs=NUM_EPOCHS, - test_dataloader=test_dataloader, - test_interval=1, - hooks=hook_list, - display_progress=True) -``` - diff --git a/docs/source/zh-Hans/advanced_tutorials/train_vit_with_hybrid_parallelism.md b/docs/source/zh-Hans/advanced_tutorials/train_vit_with_hybrid_parallelism.md index f7dd8d477..3de41601a 100644 --- a/docs/source/zh-Hans/advanced_tutorials/train_vit_with_hybrid_parallelism.md +++ b/docs/source/zh-Hans/advanced_tutorials/train_vit_with_hybrid_parallelism.md @@ -1,10 +1,14 @@ # 使用 Colossal-AI (从数据并行到异构并行)加速 ViT 训练详解 -作者:Yuxuan Lou +作者:Yuxuan Lou, Mingyan Jiang + +**前置教程** +- [并行插件](../basics/booster_plugins.md) +- [booster API](../basics/booster_api.md) **示例代码** -- [Colossal-AI Examples ViT on Cifar10](https://github.com/hpcaitech/ColossalAI-Examples/tree/main/image/vision_transformer) +- [Colossal-AI Examples ViT on `beans`](https://github.com/hpcaitech/ColossalAI/blob/main/examples/images/vit/vit_train_demo.py) **相关文献** - [An Image is Worth 16x16 Words: Transformers for Image Recognition at Scale](https://arxiv.org/pdf/2010.11929.pdf) @@ -12,14 +16,14 @@ ## 引言 -在这个ViT模型的样例中,Colossal-AI 提供了三种不同的并行技术来加速模型训练:数据并行,流水线并行和张量并行。我们将展示如何使用这三种并行技术在 CIFAR-10 数据集上训练 ViT。为了运行项目,需要2-4个 GPU。 +在这个ViT模型的样例中,Colossal-AI 提供了三种不同的并行技术来加速模型训练:数据并行,流水线并行和张量并行。我们将展示如何使用这三种并行技术在 `beans` 数据集上训练 ViT。为了运行项目,需要2-4个 GPU。 ## 目录 1. Colossal-AI 安装方法 -2. 使用数据并行训练 ViT 步骤 -3. 使用数据流水线并行训练 ViT 步骤 -4. 使用张量并行或异构并行训练 ViT 步骤 +2. 定义VIT模型及相关训练组件 +3. 使用使用 [HybridParallelPlugin](../basics/booster_plugins.md) 增强VIT模型 +4. 使用数据并行、流水线并行及张量并行训练VIT模型 ## Colossal-AI 安装 可以通过 Python 的官方索引来安装 Colossal-AI 软件包。 @@ -27,566 +31,255 @@ pip install colossalai ``` - - -## 数据并行 -数据并行是实现加速模型训练的基本方法。通过两步可以实现训练的数据并行: -1. 构建一个配置文件 -2. 在训练脚本中修改很少的几行代码 - -### 构建配置文件 (`data_parallel/config.py`) -为了使用 Colossal-AI,第一步是构建配置文件。并且,在这里有两种变量: - -1. **Colossal-AI 功能配置** - -Colossal-AI 提供了一系列的功能来加快训练速度(包括模型并行,混合精度,零冗余优化器等)。每个功能都是由配置文件中的相应字段定义的。如果我们只用到数据并行,那么我们只需要具体说明并行模式。在本例中,我们使用 PyTorch 最初提出的混合精度训练,只需要定义混合精度配置 `fp16 = dict(mode=AMP_TYPE.TORCH)` 。 - -2. **全局超参数** - -全局超参数包括特定于模型的超参数、训练设置、数据集信息等。 +## 导入依赖库 ```python -from colossalai.amp import AMP_TYPE -# ViT Base -BATCH_SIZE = 256 -DROP_RATE = 0.1 -NUM_EPOCHS = 300 -# mix precision -fp16 = dict( - mode=AMP_TYPE.TORCH, -) -gradient_accumulation = 16 -clip_grad_norm = 1.0 -dali = dict( - gpu_aug=True, - mixup_alpha=0.2 -) -``` +from typing import Any, Callable, Iterator -### 修改训练脚本 (`/data_parallel/train_with_cifar10.py`) +import torch +import torch.distributed as dist +import torch.nn as nn +import transformers +from data import BeansDataset, beans_collator +from torch.optim import Optimizer +from torch.optim.lr_scheduler import _LRScheduler as LRScheduler +from torch.utils.data import DataLoader +from tqdm import tqdm +from transformers import ViTConfig, ViTForImageClassification, ViTImageProcessor -#### 导入模块 -- Colossal-AI 相关模块 -```python import colossalai -from colossalai.context import ParallelMode -from colossalai.core import global_context as gpc +from colossalai.booster import Booster +from colossalai.booster.plugin import GeminiPlugin, HybridParallelPlugin, LowLevelZeroPlugin, TorchDDPPlugin +from colossalai.cluster import DistCoordinator from colossalai.logging import disable_existing_loggers, get_dist_logger -from colossalai.nn.lr_scheduler import LinearWarmupLR -from colossalai.legacy.nn.metric import Accuracy -from colossalai.legacy.trainer import Trainer, hooks +from colossalai.nn.lr_scheduler import CosineAnnealingWarmupLR +from colossalai.nn.optimizer import HybridAdam ``` - -- 其他模块 +## 定义 Vision Transformer 模型 +定义超参数 ```python -import os -import torch -from timm.models import vit_base_patch16_224 -from torchvision import transforms -from torchvision.datasets import CIFAR10 +SEED = 42 +MODEL_PATH = "google/vit-base-patch16-224" +LEARNING_RATE = 5e-5 +WEIGHT_DECAY = 0.0 +NUM_EPOCH = 3 +WARMUP_RATIO = 0.3 +TP_SIZE = 2 +PP_SIZE = 2 ``` - -#### 启动 Colossal-AI - -在训练脚本中,在构建好配置文件后,需要为 Colossal-AI 初始化分布式环境。我们将此过程称为 `launch` 。在 Colossal-AI 中,我们提供了几种启动方法来初始化分布式后端。在大多数情况下,您可以使用 `colossalai.launch` 和 `colossalai.get_default_parser ` 来实现使用命令行传递参数。此外,Colossal-AI 可以利用 PyTorch 提供的现有启动工具,正如许多用户通过使用熟知的 `colossalai.launch_from_torch` 那样。更多详细信息,您可以查看相关[文档](https://www.colossalai.org/docs/basics/launch_colossalai)。 - - +首先我们创建一个分布式环境 ```python -# initialize distributed setting -parser = colossalai.get_default_parser() -args = parser.parse_args() -colossalai.launch_from_torch(config=args.config) -disable_existing_loggers() -logger = get_dist_logger() +# Launch ColossalAI +colossalai.launch_from_torch(config={}, seed=SEEDå) +coordinator = DistCoordinator() +world_size = coordinator.world_size ``` - -初始化后,您可以使用 `colossalai.core.global_context` 访问配置文件中的变量。 - +在训练之前您可以按照正常流程定义模型训练的相关组,如定义模型,数据加载器,优化器等。需要注意的是,当使用管道并行时,还需定义一个criterion函数,该函数的输入是模型前向的输入和输出,返回的是loss。 +获取数据集, `BeansDataset`定义在[data.py](https://github.com/hpcaitech/ColossalAI/blob/main/examples/images/vit/data.py) ```python -#access parameters -print(gpc.config.BATCH_SIZE) +image_processor = ViTImageProcessor.from_pretrained(MODEL_PATH) +train_dataset = BeansDataset(image_processor, TP_SIZE, split="train") +eval_dataset = BeansDataset(image_processor, RP_SIZE, split="validation") +num_labels = train_dataset.num_labels ``` - -#### 构建模型 - -如果只需要数据并行性,则无需对模型代码进行任何更改。这里,我们使用 `timm` 中的 `vit_base_patch16_224`。 - +定义VIT模型: ```python -# build model -model = vit_base_patch16_224(drop_rate=0.1, num_classes=gpc.config.NUM_CLASSES) +config = ViTConfig.from_pretrained(MODEL_PATH) +config.num_labels = num_labels +config.id2label = {str(i): c for i, c in enumerate(train_dataset.label_names)} +config.label2id = {c: str(i) for i, c in enumerate(train_dataset.label_names)} +model = ViTForImageClassification.from_pretrained( + MODEL_PATH, config=config, ignore_mismatched_sizes=True +) ``` - -#### 构建 CIFAR-10 数据加载器 -`colossalai.utils.get_dataloader` 可以帮助您轻松构建数据加载器。 - +定义optimizer: ```python -def build_cifar(batch_size): - transform_train = transforms.Compose([ - transforms.RandomCrop(224, pad_if_needed=True), - transforms.AutoAugment(policy=transforms.AutoAugmentPolicy.CIFAR10), - transforms.ToTensor(), - transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)), - ]) - transform_test = transforms.Compose([ - transforms.Resize(224), - transforms.ToTensor(), - transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)), - ]) - train_dataset = CIFAR10(root=os.environ['DATA'], train=True, download=True, transform=transform_train) - test_dataset = CIFAR10(root=os.environ['DATA'], train=False, transform=transform_test) - train_dataloader = get_dataloader(dataset=train_dataset, shuffle=True, batch_size=batch_size, pin_memory=True) - test_dataloader = get_dataloader(dataset=test_dataset, batch_size=batch_size, pin_memory=True) - return train_dataloader, test_dataloader -# build dataloader -train_dataloader, test_dataloader = build_cifar(gpc.config.BATCH_SIZE) +optimizer = HybridAdam(model.parameters(), lr=(LEARNING_RATE * world_size), weight_decay=WEIGHT_DECAY) ``` - -#### 定义优化器,损失函数和学习率调度器 - -Colossal-AI 提供了自己的优化器、损失函数和学习率调度器。PyTorch 的这些组件与Colossal-AI也兼容。 - +定义lr scheduler: ```python -# build optimizer -optimizer = colossalai.nn.Lamb(model.parameters(), lr=1.8e-2, weight_decay=0.1) -# build loss -criterion = torch.nn.CrossEntropyLoss() -# lr_scheduler -lr_scheduler = LinearWarmupLR(optimizer, warmup_steps=50, total_steps=gpc.config.NUM_EPOCHS) -``` - -#### 启动用于训练的 Colossal-AI 引擎 - -Engine 本质上是对模型、优化器和损失函数的封装类。当我们使用 `colossalai.initialize` ,将返回一个 engine 对象,并且它已经按照配置文件中的指定内容,配置了梯度剪裁、梯度累积和零冗余优化器等功能。之后,基于 Colossal-AI 的 engine 我们可以进行模型训练。 - -```python -engine, train_dataloader, test_dataloader, _ = colossalai.initialize( - model, optimizer, criterion, train_dataloader, test_dataloader +total_steps = len(train_dataloader) * NUM_EPOCH +num_warmup_steps = int(WARMUP_RATIO * total_steps) +lr_scheduler = CosineAnnealingWarmupLR( + optimizer=optimizer, total_steps=(len(train_dataloader) * NUM_EPOCH), warmup_steps=num_warmup_steps ) ``` - -#### 训练:Trainer 应用程序编程接口 -Trainer 是一个更高级的封装类,用户可以用更少的代码就可以实现训练。通过传递 engine 对象很容易创建 trainer 对象。 - -此外,在 trainer 中,用户可以自定义一些挂钩,并将这些挂钩连接到 trainer 对象。钩子对象将根据训练方案定期执行生命周期方法。例如,`LRSchedulerHook` 将执行`lr_scheduler.step()` 在 `after_train_iter` 或 `after_train_epoch` 阶段更新模型的学习速率。 - -```python -# build trainer -trainer = Trainer(engine=engine, logger=logger) -# build hooks -hook_list = [ - hooks.LossHook(), - hooks.AccuracyHook(accuracy_func=MixupAccuracy()), - hooks.LogMetricByEpochHook(logger), - hooks.LRSchedulerHook(lr_scheduler, by_epoch=True), - # comment if you do not need to use the hooks below - hooks.SaveCheckpointHook(interval=1, checkpoint_dir='./ckpt'), - hooks.TensorboardHook(log_dir='./tb_logs', ranks=[0]), -] -``` - -使用 `trainer.fit` 进行训练: - +定义criterion函数: ```python -# start training -trainer.fit( - train_dataloader=train_dataloader, - test_dataloader=test_dataloader, - epochs=gpc.config.NUM_EPOCHS, - hooks=hook_list, - display_progress=True, - test_interval=1 -) +def _criterion(outputs, inputs): + return outputs.loss ``` - -### 开始训练 -`DATA` 是自动下载和存储 CIFAR-10 数据集的文件路径。 - -`` 是要用于使用 CIFAR-10 数据集,以数据并行方式训练 ViT 的 GPU 数。 - -```bash -export DATA= -# If your torch >= 1.10.0 -torchrun --standalone --nproc_per_node train_dp.py --config ./configs/config_data_parallel.py -# If your torch >= 1.9.0 -# python -m torch.distributed.run --standalone --nproc_per_node= train_dp.py --config ./configs/config_data_parallel.py -# Otherwise -# python -m torch.distributed.launch --nproc_per_node --master_addr --master_port 29500 train_dp.py --config ./configs/config.py -``` - - - -## 流水线并行 -除了数据并行性,Colossal-AI 还支持流水线并行。具体而言,Colossal-AI 使用 NVIDIA 引入的 1F1B 流水线。更多详细信息,您可以查看相关[文档](https://www.colossalai.org/tutorials/features/pipeline_parallel)。 - -### 构建配置文件(`hybrid_parallel/configs/vit_pipeline.py`) -要在数据并行的基础上应用流水线并行,只需添加一个 **parallel dict** +## 增强VIT模型 +我们开始使用colossalai的混合并行策略来增强模型,首先我们先定义一个`HybridParallelPlugin`的对象,[`HybridParallelPlugin`](../basics/booster_plugins.md)封装了colossalai的多种并行策略,之后我们使用`HybridParallelPlugin`对象来初始化booster并调用`booster.boost`来增强模型。 +### 半精度训练 +在`HybridParallelPlugin`插件中,通过设置`precision`确定训练精度,可支持'fp16','bf16','fp32'三种类型。'fp16','bf16'为半精度类型,半精度在`HybridParallelPlugin`中有两种应用场景,一是使用zero数据并行时,需设置为半精度;二是指定使用amp半精度进行训练。 + +使用amp半精度时,可设置相关参数。 +`initial_scale`(浮点数,可选项):AMP的初始损失缩放比例。默认值为2**16。 +`min_scale`(浮点数,可选项):AMP的最小损失缩放比例。默认值为1。 +`growth_factor`(浮点数,可选项):在使用AMP时,用于增加损失缩放比例的乘法因子。默认值为2。 +`backoff_factor`(浮点数,可选项):在使用AMP时,用于减少损失缩放比例的乘法因子。默认值为0.5。 +`growth_interval`(整数,可选项):在使用AMP时,当没有溢出时增加损失缩放比例的步数。默认值为1000。 +`hysteresis`(整数,可选项):在使用AMP时,减少损失缩放比例之前的溢出次数。默认值为2。 +`max_scale`(浮点数,可选项):AMP的最大损失缩放比例。默认值为2**32。 + +使用AMP的plugin示例: ```python -from colossalai.amp import AMP_TYPE -parallel = dict( - pipeline=2 -) -# pipeline config -NUM_MICRO_BATCHES = parallel['pipeline'] -TENSOR_SHAPE = (BATCH_SIZE // NUM_MICRO_BATCHES, SEQ_LENGTH, HIDDEN_SIZE) -fp16 = dict(mode=AMP_TYPE.NAIVE) -clip_grad_norm = 1.0 +plugin = HybridParallelPlugin( + precision="fp16", + initial_scale=1, + ) ``` -其他配置: -```python -# hyperparameters -# BATCH_SIZE is as per GPU -# global batch size = BATCH_SIZE x data parallel size -BATCH_SIZE = 256 -LEARNING_RATE = 3e-3 -WEIGHT_DECAY = 0.3 -NUM_EPOCHS = 300 -WARMUP_EPOCHS = 32 -# model config -IMG_SIZE = 224 -PATCH_SIZE = 16 -HIDDEN_SIZE = 768 -DEPTH = 12 -NUM_HEADS = 12 -MLP_RATIO = 4 -NUM_CLASSES = 10 -CHECKPOINT = True -SEQ_LENGTH = (IMG_SIZE // PATCH_SIZE) ** 2 + 1 # add 1 for cls token -``` +### 张量并行 +`HybridParallelPlugin`是通过shardformer实现张量并行,在该插件中,可设置`tp_size`确定张量并行组的大小,此外,还有多个参数可设置张量并行时的优化特性: -### 构建流水线模型 (`/hybrid_parallel/model/vit.py`) -Colossal-AI 提供了两种从现有模型构建流水线模型的方法。 -- `colossalai.legacy.builder.build_pipeline_model_from_cfg` -- `colossalai.legacy.builder.build_pipeline_model` +`enable_all_optimization`(布尔类型,可选项):是否启用Shardformer支持的所有优化方法,目前所有优化方法包括融合归一化、flash attention和JIT。默认为False。 +`enable_fused_normalization`(布尔类型,可选项):是否在Shardformer中启用融合归一化。默认为False。 +`enable_flash_attention`(布尔类型,可选项):是否在Shardformer中启用flash attention。默认为False。 +`enable_jit_fused`(布尔类型,可选项):是否在Shardformer中启用JIT。默认为False。 +`enable_sequence_parallelism`(布尔类型):是否在Shardformer中启用序列并行性。默认为False。 +`enable_sequence_overlap`(布尔类型):是否在Shardformer中启用序列重叠性。默认为False。 -此外,您还可以使用 Colossal-AI 从头开始构建流水线模型。 +张量并行的plugin示例 ```python -import math -from typing import Callable -import inspect -import torch -from colossalai import nn as col_nn -from colossalai.legacy.registry import LAYERS, MODELS -from colossalai.logging import get_dist_logger -from colossalai.core import global_context as gpc -from colossalai.context import ParallelMode -from colossalai.legacy.builder.pipeline import partition_uniform -from torch import dtype, nn -from model_zoo.vit.vit import ViTBlock, ViTEmbedding, ViTHead -@MODELS.register_module -class PipelineVisionTransformer(nn.Module): - def __init__(self, - img_size: int = 224, - patch_size: int = 16, - in_chans: int = 3, - num_classes: int = 1000, - depth: int = 12, - num_heads: int = 12, - dim: int = 768, - mlp_ratio: int = 4, - attention_dropout: float = 0., - dropout: float = 0.1, - drop_path: float = 0., - layernorm_epsilon: float = 1e-6, - activation: Callable = nn.functional.gelu, - representation_size: int = None, - dtype: dtype = None, - bias: bool = True, - checkpoint: bool = False, - init_method: str = 'torch', - first_stage=True, - last_stage=True, - start_idx=None, - end_idx=None,): - super().__init__() - layers = [] - if first_stage: - embed = ViTEmbedding(img_size=img_size, - patch_size=patch_size, - in_chans=in_chans, - embedding_dim=dim, - dropout=dropout, - dtype=dtype, - init_method=init_method) - layers.append(embed) - # stochastic depth decay rule - dpr = [x.item() for x in torch.linspace(0, drop_path, depth)] - if start_idx is None and end_idx is None: - start_idx = 0 - end_idx = depth - blocks = [ - ViTBlock( - dim=dim, - num_heads=num_heads, - mlp_ratio=mlp_ratio, - attention_dropout=attention_dropout, - dropout=dropout, - drop_path=dpr[i], - activation=activation, - dtype=dtype, - bias=bias, - checkpoint=checkpoint, - init_method=init_method, - ) for i in range(start_idx, end_idx) - ] - layers.extend(blocks) - if last_stage: - norm = col_nn.LayerNorm(normalized_shape=dim, eps=layernorm_epsilon, dtype=dtype) - head = ViTHead(dim=dim, - num_classes=num_classes, - representation_size=representation_size, - dtype=dtype, - bias=bias, - init_method=init_method) - layers.extend([norm, head]) - self.layers = nn.Sequential( - *layers +plugin = HybridParallelPlugin( + tp_size=4, + enable_all_optimization=True ) - def forward(self, x): - x = self.layers(x) - return x -def _filter_kwargs(func, kwargs): - sig = inspect.signature(func) - return {k: v for k, v in kwargs.items() if k in sig.parameters} -def _build_pipeline_vit(module_cls, num_layers, num_chunks, device=torch.device('cuda'), **kwargs): - logger = get_dist_logger() - if gpc.is_initialized(ParallelMode.PIPELINE): - pipeline_size = gpc.get_world_size(ParallelMode.PIPELINE) - pipeline_rank = gpc.get_local_rank(ParallelMode.PIPELINE) - else: - pipeline_size = 1 - pipeline_rank = 0 - rank = gpc.get_global_rank() - parts = partition_uniform(num_layers, pipeline_size, num_chunks)[pipeline_rank] - models = [] - for start, end in parts: - kwargs['first_stage'] = start == 0 - kwargs['last_stage'] = end == num_layers - kwargs['start_idx'] = start - kwargs['end_idx'] = end - logger.info(f'Rank{rank} build layer {start}-{end}, {end-start}/{num_layers} layers') - chunk = module_cls(**_filter_kwargs(module_cls.__init__, kwargs)).to(device) - models.append(chunk) - if len(models) == 1: - model = models[0] - else: - model = nn.ModuleList(models) - return model -def build_pipeline_vit(num_layers, num_chunks, device=torch.device('cuda'), **kwargs): - return _build_pipeline_vit(PipelineVisionTransformer, num_layers, num_chunks, device, **kwargs) ``` - -### 修改训练脚本 (`/hybrid_parallel/train_with_cifar10.py`) - -#### 导入模块 +### 流水线并行 +`HybridParallelPlugin`通过设置`pp_size`确定流水线并行组的大小,`num_microbatches`设置流水线并行时将整个batch划分为小batch的数量,`microbatch_size`可设置小batch的大小,插件会优先使用`num_microbatches`来确定micro batch的配置。 +流水线并行的plugin示例 ```python -from colossalai.legacy.engine.schedule import (InterleavedPipelineSchedule, - PipelineSchedule) -from colossalai.utils import MultiTimer -import os -import colossalai -import torch -from colossalai.context import ParallelMode -from colossalai.core import global_context as gpc -from colossalai.logging import get_dist_logger -from colossalai.nn import CrossEntropyLoss -from colossalai.nn.lr_scheduler import CosineAnnealingWarmupLR -from colossalai.utils import is_using_pp, get_dataloader -from model.vit import build_pipeline_vit -from model_zoo.vit.vit import _create_vit_model -from tqdm import tqdm -from torchvision import transforms -from torchvision.datasets import CIFAR10 +plugin = HybridParallelPlugin( + pp_size=4, + num_microbatches=None, + microbatch_size=1 + ) ``` - -#### 启动 Colossal-AI -`colossalai.utils.is_using_pp` 可以帮您检查配置文件是否满足流水线并行的要求。 - +### 数据并行 +`HybridParallelPlugin`插件的数据并行包括zero-dp系列及torch DDP。当`zero_stage`为0(默认值)时表示使用torch DDP,注意torch DDP与流水线并行有冲突,不能一起使用。`zero_stage`为1时表示使用zero1策略。`zero_stage`为2使用zero2,zero2策略也无法与流水线并行一起使用。如果想使用zero3,请使用[`GeminiPlugin`](../basics/booster_plugins.md)。使用zero系列的数据并行,请设置训练精度为半精度。当未指定使用zero及流水线并行,且world_size//(tp_size*pp_size)大于1时,`HybridParallelPlugin`会为您打开torch DDP并行策略。 +torch DDP相关参数设置: +`broadcast_buffers`(布尔值,可选项):在使用DDP时,在训练开始时是否广播缓冲区。默认为True。 +`ddp_bucket_cap_mb`(整数,可选项):在使用DDP时的桶大小(以MB为单位)。默认为25。 +`find_unused_parameters`(布尔值,可选项):在使用DDP时是否查找未使用的参数。默认为False。 +`check_reduction(布尔值,可选项):在使用DDP时是否检查减少。默认为False。 +`gradient_as_bucket_view`(布尔值,可选项):在使用DDP时是否将梯度作为桶视图使用。默认为False。 +`static_graph`(布尔值,可选项):在使用DDP时是否使用静态图。默认为False。 + +Torch DDP的plugin示例 ```python -# initialize distributed setting -parser = colossalai.get_default_parser() -args = parser.parse_args() -# launch from torch -colossalai.launch_from_torch(config=args.config) -# get logger -logger = get_dist_logger() -logger.info("initialized distributed environment", ranks=[0]) -if hasattr(gpc.config, 'LOG_PATH'): - if gpc.get_global_rank() == 0: - log_path = gpc.config.LOG_PATH - if not os.path.exists(log_path): - os.mkdir(log_path) - logger.log_to_file(log_path) -use_pipeline = is_using_pp() +plugin = HybridParallelPlugin( + tp_size=2, + pp_size=1, + zero_stage=0, + precision="fp16", + initial_scale=1, + ) ``` +若并行进程为4,则torch DDP的并行组大小为2. +zero相关参数设置: +`zero_bucket_size_in_m`(整数,可选项):在使用ZeRO时,以百万元素为单位的梯度减小桶大小。默认为12。 +`cpu_offload`(布尔值,可选项):在使用ZeRO时是否打开`cpu_offload`。默认为False。 +`communication_dtype`(torch数据类型,可选项):在使用ZeRO时的通信数据类型。如果未指定,则将使用参数的数据类型。默认为None。 +`overlap_communication`(布尔值,可选项):在使用ZeRO时是否重叠通信和计算。默认为True。 -#### 定义模型 +zero1的plugin示例 ```python -# create model -model_kwargs = dict(img_size=gpc.config.IMG_SIZE, - patch_size=gpc.config.PATCH_SIZE, - dim=gpc.config.HIDDEN_SIZE, - depth=gpc.config.DEPTH, - num_heads=gpc.config.NUM_HEADS, - mlp_ratio=gpc.config.MLP_RATIO, - num_classes=gpc.config.NUM_CLASSES, - init_method='jax', - checkpoint=gpc.config.CHECKPOINT) -if use_pipeline: - model = build_pipeline_vit(num_layers=model_kwargs['depth'], num_chunks=1, **model_kwargs) -else: - model = _create_vit_model(**model_kwargs) -``` - -#### 计算参数个数 - -您可以轻松计算不同流水线阶段上的模型参数个数。 - -``` -# count number of parameters -total_numel = 0 -for p in model.parameters(): - total_numel += p.numel() -if not gpc.is_initialized(ParallelMode.PIPELINE): - pipeline_stage = 0 -else: - pipeline_stage = gpc.get_local_rank(ParallelMode.PIPELINE) -logger.info(f"number of parameters: {total_numel} on pipeline stage {pipeline_stage}") +plugin = HybridParallelPlugin( + tp_size=1, + pp_size=1, + zero_stage=1, + cpu_offload=True, + precision="fp16", + initial_scale=1, + ) ``` -#### 构建数据加载器,优化器等组件 +### 混合并行 +可参考上述的策略自定义合适的混合并行策略。定义混合并行的插件,并使用该插件定义一个booster: ```python -def build_cifar(batch_size): - transform_train = transforms.Compose([ - transforms.RandomCrop(224, pad_if_needed=True), - transforms.AutoAugment(policy=transforms.AutoAugmentPolicy.CIFAR10), - transforms.ToTensor(), - transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)), - ]) - transform_test = transforms.Compose([ - transforms.Resize(224), - transforms.ToTensor(), - transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)), - ]) - train_dataset = CIFAR10(root=os.environ['DATA'], train=True, download=True, transform=transform_train) - test_dataset = CIFAR10(root=os.environ['DATA'], train=False, transform=transform_test) - train_dataloader = get_dataloader(dataset=train_dataset, shuffle=True, batch_size=batch_size, pin_memory=True) - test_dataloader = get_dataloader(dataset=test_dataset, batch_size=batch_size, pin_memory=True) - return train_dataloader, test_dataloader - - -# create dataloaders -train_dataloader , test_dataloader = build_cifar() -# create loss function -criterion = CrossEntropyLoss(label_smoothing=0.1) -# create optimizer -optimizer = torch.optim.AdamW(model.parameters(), lr=gpc.config.LEARNING_RATE, weight_decay=gpc.config.WEIGHT_DECAY) -# create lr scheduler -lr_scheduler = CosineAnnealingWarmupLR(optimizer=optimizer, - total_steps=gpc.config.NUM_EPOCHS, - warmup_steps=gpc.config.WARMUP_EPOCHS) +plugin = HybridParallelPlugin( + tp_size=TP_SIZE, + pp_size=PP_SIZE, + num_microbatches=None, + microbatch_size=1, + enable_all_optimization=True, + precision="fp16", + initial_scale=1, + ) +booster = Booster(plugin=plugin) ``` - -#### 启动 Colossal-AI 引擎 - +接着我们使用`booster.boost`来将plugin所封装的特性注入到模型训练组件中。 ```python -# initialize -engine, train_dataloader, test_dataloader, _ = colossalai.initialize(model=model, - optimizer=optimizer, - criterion=criterion, - train_dataloader=train_dataloader, - test_dataloader=test_dataloader) -logger.info("Engine is built", ranks=[0]) +model, optimizer, _criterion, train_dataloader, lr_scheduler = booster.boost( + model=model, optimizer=optimizer, criterion=criterion, dataloader=train_dataloader, lr_scheduler=lr_scheduler + ) ``` - -#### 训练:基于engine - -在数据并行示例中,我们展示了如何使用 Trainer API 训练模型。我们还可以直接训练基于 engine 的模型。通过这种方式,您可以使用更多功能自定义训练方法。 - +## 使用混合并行训练 ViT +最后就可以使用混合并行策略来训练模型了。我们先定义一个训练函数,描述训练过程。需要注意的是,如果使用了管道并行策略,需要调用`booster.execute_pipeline`来执行模型的训练,它会调用`scheduler`管理模型的前后向操作。 ```python -data_iter = iter(train_dataloader) -for epoch in range(gpc.config.NUM_EPOCHS): - # training - engine.train() - if gpc.get_global_rank() == 0: - description = 'Epoch {} / {}'.format( - epoch, - gpc.config.NUM_EPOCHS +def run_forward_backward( + model: nn.Module, + optimizer: Optimizer, + criterion: Callable[[Any, Any], torch.Tensor], + data_iter: Iterator, + booster: Booster, +): + if optimizer is not None: + optimizer.zero_grad() + if isinstance(booster.plugin, HybridParallelPlugin) and booster.plugin.pp_size > 1: + # run pipeline forward backward when enabling pp in hybrid parallel plugin + output_dict = booster.execute_pipeline( + data_iter, model, criterion, optimizer, return_loss=True, return_outputs=True ) - progress = tqdm(range(len(train_dataloader)), desc=description) + loss, outputs = output_dict["loss"], output_dict["outputs"] else: - progress = range(len(train_dataloader)) - for _ in progress: - engine.zero_grad() - engine.execute_schedule(data_iter, return_output_label=False) - engine.step() - lr_scheduler.step() -``` - -### 开始训练 -```bash -export DATA= -# If your torch >= 1.10.0 -torchrun --standalone --nproc_per_node train_hybrid.py --config ./configs/config_pipeline_parallel.py -# If your torch >= 1.9.0 -# python -m torch.distributed.run --standalone --nproc_per_node= train_hybrid.py --config ./configs/config_pipeline_parallel.py + batch = next(data_iter) + batch = move_to_cuda(batch, torch.cuda.current_device()) + outputs = model(**batch) + loss = criterion(outputs, None) + if optimizer is not None: + booster.backward(loss, optimizer) + +def train_epoch( + epoch: int, + model: nn.Module, + optimizer: Optimizer, + criterion: Callable[[Any, Any], torch.Tensor], + lr_scheduler: LRScheduler, + dataloader: DataLoader, + booster: Booster, + coordinator: DistCoordinator, +): + torch.cuda.synchronize() + + num_steps = len(dataloader) + data_iter = iter(dataloader) + enable_pbar = coordinator.is_master() + if isinstance(booster.plugin, HybridParallelPlugin) and booster.plugin.pp_size > 1: + # when using pp, only the last stage of master pipeline (dp_rank and tp_rank are both zero) shows pbar + tp_rank = dist.get_rank(booster.plugin.tp_group) + dp_rank = dist.get_rank(booster.plugin.dp_group) + enable_pbar = tp_rank == 0 and dp_rank == 0 and booster.plugin.stage_manager.is_last_stage() + model.train() + + with tqdm(range(num_steps), desc=f"Epoch [{epoch + 1}]", disable=not enable_pbar) as pbar: + for _ in pbar: + loss, _ = run_forward_backward(model, optimizer, criterion, data_iter, booster) + optimizer.step() + lr_scheduler.step() + + # Print batch loss + if enable_pbar: + pbar.set_postfix({"loss": loss.item()}) ``` - - - - -## 张量并行和异构并行 -张量并行将每个权重参数跨多个设备进行分区,以减少内存负载。Colossal-AI 支持 1D、2D、2.5D 和 3D 张量并行。此外,还可以将张量并行、流水线并行和数据并行结合起来,实现混合并行。Colossal-AI 还提供了一种简单的方法来应用张量并行和混合并行。只需在配置文件中更改几行代码即可实现流水线并行。 - -### 构造您的配置文件 (`/hybrid_parallel/configs/vit_1d_tp2_pp2.py`) -使用张量并行,只需将相关信息添加到 **parallel dict**。具体而言,`TENSOR_PARALLEL_MODE` 可以是“1d”、“2d”、“2.5d”、“3d”。不同并行度的大小应满足:`#GPUs = pipeline parallel size x tensor parallel size x data parallel size`。在指定 GPU 数量、流水线并行大小和张量并行大小后 `data parallel size` 会自动计算。 - -```python -from colossalai.amp import AMP_TYPE -# parallel setting -TENSOR_PARALLEL_SIZE = 2 -TENSOR_PARALLEL_MODE = '1d' -parallel = dict( - pipeline=2, - tensor=dict(mode=TENSOR_PARALLEL_MODE, size=TENSOR_PARALLEL_SIZE) -) -fp16 = dict(mode=AMP_TYPE.NAIVE) -clip_grad_norm = 1.0 -# pipeline config -NUM_MICRO_BATCHES = parallel['pipeline'] -TENSOR_SHAPE = (BATCH_SIZE // NUM_MICRO_BATCHES, SEQ_LENGTH, HIDDEN_SIZE) -``` - -其他配置: +开始训练模型 ```python -# hyperparameters -# BATCH_SIZE is as per GPU -# global batch size = BATCH_SIZE x data parallel size -BATCH_SIZE = 256 -LEARNING_RATE = 3e-3 -WEIGHT_DECAY = 0.3 -NUM_EPOCHS = 300 -WARMUP_EPOCHS = 32 -# model config -IMG_SIZE = 224 -PATCH_SIZE = 16 -HIDDEN_SIZE = 768 -DEPTH = 12 -NUM_HEADS = 12 -MLP_RATIO = 4 -NUM_CLASSES = 10 -CHECKPOINT = True -SEQ_LENGTH = (IMG_SIZE // PATCH_SIZE) ** 2 + 1 # add 1 for cls token -``` - -### 开始训练 -```bash -export DATA= -# If your torch >= 1.10.0 -torchrun --standalone --nproc_per_node train_hybrid.py --config ./configs/config_hybrid_parallel.py -# If your torch >= 1.9.0 -# python -m torch.distributed.run --standalone --nproc_per_node= train_hybrid.py --config ./configs/config_hybrid_parallel.py +for epoch in range(NUM_EPOCH): + train_epoch(epoch, model, optimizer, criterion, lr_scheduler, train_dataloader, booster, coordinator) ```