From bf0204604f61314316fdd43795ead7abf8b8c356 Mon Sep 17 00:00:00 2001
From: Jiarui Fang
Date: Mon, 20 Feb 2023 10:35:55 +0800
Subject: [PATCH] [example] add bert and albert (#2824)

---
 examples/language/bert/run_gemini.sh      |  22 ++
 examples/language/bert/test_ci.sh         |   2 +
 examples/language/bert/train_bert_demo.py | 332 ++++++++++++++++++++++
 3 files changed, 356 insertions(+)
 create mode 100644 examples/language/bert/run_gemini.sh
 create mode 100644 examples/language/bert/test_ci.sh
 create mode 100644 examples/language/bert/train_bert_demo.py

diff --git a/examples/language/bert/run_gemini.sh b/examples/language/bert/run_gemini.sh
new file mode 100644
index 000000000..d791334e8
--- /dev/null
+++ b/examples/language/bert/run_gemini.sh
@@ -0,0 +1,22 @@
+set -x
+# distplan in ["CAI_ZeRO1", "CAI_ZeRO2", "CAI_Gemini", "Pytorch_DDP", "Pytorch_ZeRO"]
+export DISTPLAN=${DISTPLAN:-"CAI_Gemini"}
+
+# The following options are only valid when DISTPLAN starts with "CAI"
+export GPUNUM=${GPUNUM:-1}
+export PLACEMENT=${PLACEMENT:-"cpu"}
+export BATCH_SIZE=${BATCH_SIZE:-16}
+
+# bert | albert
+export MODEL_TYPE=${MODEL_TYPE:-"bert"}
+export TRAIN_STEP=${TRAIN_STEP:-10}
+
+mkdir -p gemini_logs
+
+env CUDA_LAUNCH_BLOCKING=1 torchrun --standalone --nproc_per_node=${GPUNUM} ./train_bert_demo.py \
+--model_type=${MODEL_TYPE} \
+--batch_size=${BATCH_SIZE} \
+--placement=${PLACEMENT} \
+--distplan=${DISTPLAN} \
+--train_step=${TRAIN_STEP} \
+2>&1 | tee ./gemini_logs/${MODEL_TYPE}_${DISTPLAN}_gpu_${GPUNUM}_bs_${BATCH_SIZE}_${PLACEMENT}.log
diff --git a/examples/language/bert/test_ci.sh b/examples/language/bert/test_ci.sh
new file mode 100644
index 000000000..42c63fec5
--- /dev/null
+++ b/examples/language/bert/test_ci.sh
@@ -0,0 +1,2 @@
+set -x
+env GPUNUM=1 bash run_gemini.sh
diff --git a/examples/language/bert/train_bert_demo.py b/examples/language/bert/train_bert_demo.py
new file mode 100644
index 000000000..b690ff787
--- /dev/null
+++ b/examples/language/bert/train_bert_demo.py
@@ -0,0 +1,332 @@
+import os
+from functools import partial
+from time import time
+
+import psutil
+import torch
+from packaging import version
+from torch import nn
+from torch.nn.parallel import DistributedDataParallel as DDP
+from transformers import AlbertConfig, AlbertForSequenceClassification, BertConfig, BertForSequenceClassification
+
+import colossalai
+from colossalai.logging import disable_existing_loggers, get_dist_logger
+from colossalai.nn.optimizer import HybridAdam
+from colossalai.nn.parallel import zero_model_wrapper, zero_optim_wrapper
+from colossalai.tensor import ColoParameter, ComputePattern, ComputeSpec, ProcessGroup, ReplicaSpec, ShardSpec
+from colossalai.utils import get_current_device
+from colossalai.utils.model.colo_init_context import ColoInitContext
+
+CAI_VERSION = colossalai.__version__
+
+
+def get_tflops(model_numel, batch_size, seq_len, step_time):
+    return model_numel * batch_size * seq_len * 8 / 1e12 / (step_time + 1e-12)
+
+
+def get_profile_context(enable_flag, warmup_steps, active_steps, save_dir):
+    from contextlib import nullcontext
+
+    from torch.profiler import ProfilerActivity, profile, schedule, tensorboard_trace_handler
+    if enable_flag:
+        return profile(activities=[ProfilerActivity.CPU, ProfilerActivity.CUDA],
+                       schedule=schedule(wait=0, warmup=warmup_steps, active=active_steps),
+                       on_trace_ready=tensorboard_trace_handler(save_dir),
+                       record_shapes=True,
+                       profile_memory=True)
+    else:
+
+        class DummyProfiler:
+
+            def __init__(self):
+                self.step_number = 0
+
+            def step(self):
+                self.step_number += 1
+
+        return nullcontext(DummyProfiler())
+
+
+def get_time_stamp():
+    import time
+    cur_time = time.strftime("%d-%H:%M", time.localtime())
+    return cur_time
+
+
+def get_bert_data(batch_size: int, sequence_length: int, vocab_size: int, n_class: int, device: torch.device):
+    input = torch.randint(
+        low=0,
+        high=vocab_size,
+        size=(batch_size, sequence_length),
+        device=device,
+        dtype=torch.long,
+    )
+    label = torch.randint(low=0, high=n_class, size=(batch_size,), device=device, dtype=torch.long)
+    return input, label
+
+
+def parse_args():
+    parser = colossalai.get_default_parser()
+    parser.add_argument(
+        "--distplan",
+        type=str,
+        default='CAI_Gemini',
+        help="The distributed plan [CAI_ZeRO1, CAI_ZeRO2, CAI_Gemini, Pytorch_DDP, Pytorch_ZeRO].",
+    )
+    parser.add_argument(
+        "--placement",
+        type=str,
+        default='cpu',
+        help="Placement policy for Gemini. Only valid when using the CAI_Gemini dist plan.",
+    )
+    parser.add_argument(
+        "--batch_size",
+        type=int,
+        default=8,
+        help="batch size per DP group for training.",
+    )
+    parser.add_argument(
+        "--model_type",
+        type=str,
+        default="bert",
+        help="bert or albert",
+    )
+    parser.add_argument(
+        "--train_step",
+        type=int,
+        default=10,
+        help="number of training iterations for the test",
+    )
+
+    args = parser.parse_args()
+    return args
+
+
+SEQ_LEN = 512
+VOCAB_SIZE = 1000
+NUM_LABELS = 10
+
+
+# Parameter Sharding Strategies for Tensor Parallelism
+def split_param_single_dim_tp1d(dim: int, param: ColoParameter, pg: ProcessGroup):
+    spec = (ShardSpec([dim], [pg.tp_world_size()]), ComputeSpec(ComputePattern.TP1D))
+    param.set_tensor_spec(*spec)
+
+
+def split_param_row_tp1d(param: ColoParameter, pg: ProcessGroup):
+    split_param_single_dim_tp1d(0, param, pg)
+
+
+def split_param_col_tp1d(param: ColoParameter, pg: ProcessGroup):
+    split_param_single_dim_tp1d(-1, param, pg)
+
+
+def get_cpu_mem():
+    return psutil.Process().memory_info().rss / 1024**2
+
+
+def get_gpu_mem():
+    return torch.cuda.memory_allocated() / 1024**2
+
+
+def get_mem_info(prefix=''):
+    return f'{prefix}GPU memory usage: {get_gpu_mem():.2f} MB, CPU memory usage: {get_cpu_mem():.2f} MB'
+
+
+def get_model_size(model: nn.Module):
+    total_numel = 0
+    for module in model.modules():
+        for p in module.parameters(recurse=False):
+            total_numel += p.numel()
+    return total_numel
+
+
+def model_builder(args):
+    if args.model_type == "bert":
+        cfg = BertConfig(vocab_size=VOCAB_SIZE, num_labels=NUM_LABELS)
+        return BertForSequenceClassification(cfg)
+    elif args.model_type == "albert":
+        cfg = AlbertConfig(vocab_size=VOCAB_SIZE, num_labels=NUM_LABELS)
+        return AlbertForSequenceClassification(cfg)
+    else:
+        raise RuntimeError(f"Unknown model type: {args.model_type}")
+
+
+def model_size_formatter(numel: int) -> str:
+    GB_SIZE = 10**9
+    MB_SIZE = 10**6
+    KB_SIZE = 10**3
+    if numel >= GB_SIZE:
+        return f'{numel / GB_SIZE:.1f}B'
+    elif numel >= MB_SIZE:
+        return f'{numel / MB_SIZE:.1f}M'
+    elif numel >= KB_SIZE:
+        return f'{numel / KB_SIZE:.1f}K'
+    else:
+        return str(numel)
+
+
+def set_cpu_maximum_parallelism():
+    conf_str = torch.__config__.parallel_info()
+    inter_str = conf_str.split("hardware_concurrency() : ")[1]
+    max_concurrency = inter_str.split('\n')[0]
+    os.environ["OMP_NUM_THREADS"] = max_concurrency
+    print(f"environment variable OMP_NUM_THREADS is set to {max_concurrency}.")
+
+
+def main():
+    # version check
+    # this example is supposed to work for versions 0.2.0 and above
+    assert version.parse(CAI_VERSION) >= version.parse("0.2.0")
+
+    set_cpu_maximum_parallelism()
+    args = parse_args()
+
"torch_ddp", "torch_zero", "zero1", "zero2"]: + if args.distplan not in ["CAI_ZeRO1", "CAI_ZeRO2", "CAI_Gemini", "Pytorch_DDP", "Pytorch_ZeRO"]: + raise TypeError(f"{args.distplan} is error") + + # batch size per DP degree + BATCH_SIZE = args.batch_size + + NUM_STEPS = args.train_step + + WARMUP_STEPS = 1 + assert WARMUP_STEPS < NUM_STEPS, "warmup steps should smaller than the total steps" + assert (NUM_STEPS - WARMUP_STEPS) % 2 == 1, "the number of valid steps should be odd to take the median" + PROF_FLAG = False # The flag of profiling, False by default + + disable_existing_loggers() + colossalai.launch_from_torch(config={}) + + logger = get_dist_logger() + logger.info(f" {args.distplan}, batch size {BATCH_SIZE}", ranks=[0]) + + torch.manual_seed(123) + if args.distplan.startswith("CAI"): + # all param must use the same process group. + world_size = torch.distributed.get_world_size() + + # build a base-bert model + with ColoInitContext(device=get_current_device(), dtype=torch.half): + model = model_builder(args) + # model = BertForSequenceClassification(BertConfig(vocal_size = VOCAB_SIZE)) + + # asign running configurations + gemini_config = None + if args.distplan.startswith("CAI_ZeRO"): + optim_config = dict(reduce_bucket_size=12 * 1024 * 1024, overlap_communication=True, verbose=True) + elif args.distplan == "CAI_Gemini": + gemini_config = dict(strict_ddp_mode=True, + device=get_current_device(), + placement_policy=args.placement, + pin_memory=True, + hidden_dim=model.config.hidden_size, + search_range_mb=128) + optim_config = dict(gpu_margin_mem_ratio=0.) + else: + raise RuntimeError + + # build a highly optimized gpu/cpu optimizer + optimizer = HybridAdam(model.parameters(), lr=1e-3) + + if args.distplan == "CAI_ZeRO1": + zero_stage = 1 + elif args.distplan == "CAI_ZeRO2": + zero_stage = 2 + elif args.distplan == "CAI_Gemini": + zero_stage = 3 + else: + raise RuntimeError + + # wrap your model and optimizer + model = zero_model_wrapper(model, zero_stage, gemini_config) + optimizer = zero_optim_wrapper(model, optimizer, optim_config=optim_config) + + logger.info(get_mem_info(prefix='After init optim, '), ranks=[0]) + elif args.distplan.startswith("Pytorch"): + model = model_builder(args).cuda() + model = DDP(model) + if args.distplan.endswith("DDP"): + optimizer = torch.optim.Adam(model.parameters(), lr=1e-3) + elif args.distplan.endswith("ZeRO"): + from torch.distributed.optim import ZeroRedundancyOptimizer + optimizer = ZeroRedundancyOptimizer(model.parameters(), optimizer_class=torch.optim.Adam, lr=1e-3) + else: + raise RuntimeError + + # model is shared after TP + numel = get_model_size(model) + logger.info(f"the size of testing model size is {model_size_formatter(numel)}.") + logger.info(get_mem_info(prefix='After init model, '), ranks=[0]) + + # Tflops_per_GPU = global_batch * global_numel * seq_len * 8 / #gpu + # = (batch_per_DP_group * dp_degree) * (numel * tp_degree) * seq_len * 8 / (tp_degree * dp_degree) + # = batch_per_DP_group * numel * seq_len * 8 + get_tflops_func = partial(get_tflops, numel, BATCH_SIZE, SEQ_LEN) + + torch.cuda.synchronize() + model.train() + tflops_list = [] + + def train_step(): + # we just use randomly generated data here + input_ids, labels = get_bert_data(BATCH_SIZE, + SEQ_LEN, + VOCAB_SIZE, + NUM_LABELS, + device=torch.cuda.current_device()) + optimizer.zero_grad() + + start = time() + outputs = model(input_ids, labels=labels) + loss, logits = outputs[:2] + torch.cuda.synchronize() + fwd_end = time() + fwd_time = fwd_end - start + 
+        logger.info(get_mem_info(prefix=f'[{n + 1}/{NUM_STEPS}] Forward '), ranks=[0])
+
+        if args.distplan.startswith("CAI"):
+            optimizer.backward(loss)
+        elif args.distplan.startswith("Pytorch"):
+            loss.backward()
+        else:
+            raise RuntimeError(f"Unsupported distplan: {args.distplan}")
+
+        torch.cuda.synchronize()
+        bwd_end = time()
+        bwd_time = bwd_end - fwd_end
+        logger.info(get_mem_info(prefix=f'[{n + 1}/{NUM_STEPS}] Backward '), ranks=[0])
+
+        optimizer.step()
+        torch.cuda.synchronize()
+        optim_time = time() - bwd_end
+        step_time = time() - start
+        logger.info(get_mem_info(prefix=f'[{n + 1}/{NUM_STEPS}] Optimizer step '), ranks=[0])
+
+        step_tflops = get_tflops_func(step_time)
+        logger.info(
+            f"[{n + 1}/{NUM_STEPS}] Loss:{loss.item():.3f}, Step time: {step_time:.3f}s, TFLOPS: {step_tflops:.3f}, FWD time: {fwd_time:.3f}s, BWD time: {bwd_time:.3f}s, OPTIM time: {optim_time:.3f}s",
+            ranks=[0],
+        )
+        if n >= WARMUP_STEPS:
+            tflops_list.append(step_tflops)
+
+    demo_profiler = get_profile_context(PROF_FLAG,
+                                        WARMUP_STEPS,
+                                        NUM_STEPS - WARMUP_STEPS,
+                                        save_dir=f"profile/{get_time_stamp()}-demo")
+
+    with demo_profiler as prof:
+        for n in range(NUM_STEPS):
+            train_step()
+            prof.step()
+
+    tflops_list.sort()
+    median_index = (NUM_STEPS - WARMUP_STEPS) >> 1
+    logger.info(f"Median TFLOPS is {tflops_list[median_index]:.3f}")
+    torch.cuda.synchronize()
+
+
+if __name__ == '__main__':
+    main()
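For reference, a minimal sketch of how the new example can be exercised once the patch is applied. The environment variables and command-line flags below all come from run_gemini.sh and train_bert_demo.py above; the concrete values (2 GPUs, the albert model, the CAI_ZeRO2 plan) are only illustrative. Note that --train_step must leave an odd number of post-warmup steps for the median assertion in main() to pass.

    cd examples/language/bert
    # drive the helper script through its own environment variables
    env GPUNUM=2 MODEL_TYPE="albert" DISTPLAN="CAI_ZeRO2" BATCH_SIZE=8 bash run_gemini.sh
    # or call the training script directly with torchrun
    torchrun --standalone --nproc_per_node=2 ./train_bert_demo.py \
        --model_type=albert --batch_size=8 --placement=cpu \
        --distplan=CAI_ZeRO2 --train_step=10

Throughput and memory logs land in ./gemini_logs/, named after the model type, dist plan, GPU count, batch size, and placement policy, as set up by run_gemini.sh.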