diff --git a/configs/moe_cfg.py b/configs/moe_cfg.py
deleted file mode 100644
index 89e1a96..0000000
--- a/configs/moe_cfg.py
+++ /dev/null
@@ -1,152 +0,0 @@
-JOB_NAME = "7b_train"
-
-SEQ_LEN = 2048
-HIDDEN_SIZE = 4096
-NUM_ATTENTION_HEAD = 32
-MLP_RATIO = 8 / 3
-NUM_LAYER = 16
-VOCAB_SIZE = 103168
-
-MODEL_ONLY_FOLDER = "local:llm_ckpts/xxxx"
-# Ckpt folder format:
-# fs: 'local:/mnt/nfs/XXX'
-SAVE_CKPT_FOLDER = "local:llm_ckpts"
-LOAD_CKPT_FOLDER = "local:llm_ckpts/49"
-
-# boto3 Ckpt folder format:
-# import os
-# BOTO3_IP = os.environ["BOTO3_IP"] # boto3 bucket endpoint
-# SAVE_CKPT_FOLDER = f"boto3:s3://model_weights.{BOTO3_IP}/internlm"
-# LOAD_CKPT_FOLDER = f"boto3:s3://model_weights.{BOTO3_IP}/internlm/snapshot/1/"
-CHECKPOINT_EVERY = 50
-ckpt = dict(
-    enable_save_ckpt=False,  # enable ckpt save.
-    save_ckpt_folder=SAVE_CKPT_FOLDER,  # Path to save training ckpt.
-    # load_ckpt_folder=LOAD_CKPT_FOLDER, # Ckpt path to resume training(load weights and scheduler/context states).
-    # load_model_only_folder=MODEL_ONLY_FOLDER, # Path to initialize with given model weights.
-    load_optimizer=True,  # Wheter to load optimizer states when continuing training.
-    checkpoint_every=CHECKPOINT_EVERY,
-    async_upload=True,  # async ckpt upload. (only work for boto3 ckpt)
-    async_upload_tmp_folder="/dev/shm/internlm_tmp_ckpt/",  # path for temporarily files during asynchronous upload.
-    snapshot_ckpt_folder="/".join([SAVE_CKPT_FOLDER, "snapshot"]),  # directory for snapshot ckpt storage path.
-    oss_snapshot_freq=int(CHECKPOINT_EVERY / 2),  # snapshot ckpt save frequency.
-)
-
-TRAIN_FOLDER = "/mnt/petrelfs/share_data/llm_data/0623_scratch_tokenized_filtered/train/en/enwiki"
-VALID_FOLDER = "/mnt/petrelfs/share_data/llm_data/0623_scratch_tokenized_filtered/train/en/enwiki"
-data = dict(
-    seq_len=SEQ_LEN,
-    # micro_num means the number of micro_batch contained in one gradient update
-    micro_num=4,
-    packed_length = 2 * SEQ_LEN,
-    micro_bsz=2,
-    # defaults to the value of micro_num
-    valid_micro_num=4,
-    # defaults to 0, means disable evaluate
-    valid_every=50000,
-    pack_sample_into_one=False,
-    total_steps=50000,
-    skip_batches="",
-    rampup_batch_size="",
-    # Datasets with less than 50 rows will be discarded
-    min_length=50,
-    train_folder=TRAIN_FOLDER,
-    valid_folder=VALID_FOLDER,
-)
-
-grad_scaler = dict(
-    fp16=dict(
-        # the initial loss scale, defaults to 2**16
-        initial_scale=2**16,
-        # the minimum loss scale, defaults to None
-        min_scale=1,
-        # the number of steps to increase loss scale when no overflow occurs
-        growth_interval=1000,
-    ),
-    # the multiplication factor for increasing loss scale, defaults to 2
-    growth_factor=2,
-    # the multiplication factor for decreasing loss scale, defaults to 0.5
-    backoff_factor=0.5,
-    # the maximum loss scale, defaults to None
-    max_scale=2**24,
-    # the number of overflows before decreasing loss scale, defaults to 2
-    hysteresis=2,
-)
-
-hybrid_zero_optimizer = dict(
-    # Enable low_level_optimzer overlap_communication
-    zero_overlap_communication=True,
-    # bucket size for nccl communication params
-    reduce_bucket_size=512 * 1024 * 1024,
-    # grad clipping
-    clip_grad_norm=1.0,
-)
-
-loss = dict(
-    label_smoothing=0,
-    moe_loss_coeff=0.1,
-)
-
-adam = dict(
-    lr=1e-4,
-    adam_beta1=0.9,
-    adam_beta2=0.95,
-    adam_beta2_c=0,
-    adam_eps=1e-8,
-    weight_decay=0.01,
-)
-
-lr_scheduler = dict(
-    total_steps=data["total_steps"],
-    init_steps=0,  # optimizer_warmup_step
-    warmup_ratio=0.01,
-    eta_min=1e-5,
-    last_epoch=-1,
-)
-
-beta2_scheduler = dict(
-    init_beta2=adam["adam_beta2"],
-    c=adam["adam_beta2_c"],
-    cur_iter=-1,
-)
-
-model = dict(
-    checkpoint=False,
-    num_attention_heads=NUM_ATTENTION_HEAD,
-    embed_split_hidden=True,
-    vocab_size=VOCAB_SIZE,
-    embed_grad_scale=1,
-    parallel_output=True,
-    hidden_size=HIDDEN_SIZE,
-    num_layers=NUM_LAYER,
-    mlp_ratio=MLP_RATIO,
-    apply_post_layer_norm=False,
-    dtype="torch.bfloat16",
-    norm_type="rmsnorm",
-    layer_norm_epsilon=1e-5,
-    use_flash_attn=True,
-    num_chunks=1,  # if num_chunks > 1, interleaved pipeline scheduler is used.
-    sequence_parallel=False,
-    num_experts=4,
-    moe_use_residual=False,
-)
-"""
-zero1 parallel:
-    1. if zero1 <= 0, The size of the zero process group is equal to the size of the dp process group,
-        so parameters will be divided within the range of dp.
-    2. if zero1 == 1, zero is not used, and all dp groups retain the full amount of model parameters.
-    3. zero1 > 1 and zero1 <= dp world size, the world size of zero is a subset of dp world size.
-    For smaller models, it is usually a better choice to split the parameters within nodes with a setting <= 8.
-pipeline parallel (dict):
-    1. size: int, the size of pipeline parallel.
-    2. interleaved_overlap: bool, enable/disable communication overlap when using interleaved pipeline scheduler.
-tensor parallel: tensor parallel size, usually the number of GPUs per node.
-"""
-parallel = dict(
-    # zero1=4,
-    pipeline=dict(size=4, interleaved_overlap=False),
-    # tensor=dict(size=4),
-)
-
-cudnn_deterministic = False
-cudnn_benchmark = False
diff --git a/internlm/core/context/process_group_initializer.py b/internlm/core/context/process_group_initializer.py
index 2653ad4..fbed8bc 100644
--- a/internlm/core/context/process_group_initializer.py
+++ b/internlm/core/context/process_group_initializer.py
@@ -422,8 +422,6 @@ class Initializer_Expert_Data(ProcessGroupInitializer):
         super().__init__(*args, **kwargs)
         self.num_expert_parallel_group = self.world_size // self.expert_parallel_size
 
-        assert self.world_size % self.rank_num_per_expert_group == 0
-
     def _get_expert_parallel_ranks(self):
         """
         Create expert and data parallel groups
@@ -434,17 +432,18 @@ class Initializer_Expert_Data(ProcessGroupInitializer):
         expert_data_parallel_group = [0,4], [2,6], [1,5], [3,7]
         """
         data_parallel_groups = []
-        for i in range(self.model_parallel_size):
-            data_parallel_groups.append(list(range(i, self.world_size, self.model_parallel_size)))
+        model_parallel_size = self.pipeline_parallel_size * self.tensor_parallel_size
+        for i in range(model_parallel_size):
+            data_parallel_groups.append(list(range(i, self.world_size, model_parallel_size)))
 
         expert_parallel_groups = []
         expert_data_parallel_groups = []
-        for dp_ranks in range(self.num_expert_parallel_group):
+        for dp_ranks in data_parallel_groups:
             # partition of expert parallel group, e.g. [0,2], [4,6]
             part_ep_group = []
             for i in range(0, self.data_parallel_size, self.expert_parallel_size):
                 part_ep_group.append(dp_ranks[i : i + self.expert_parallel_size])
-            expert_data_parallel_groups.extend(part_ep_group)
+            expert_parallel_groups.extend(part_ep_group)
 
             for expert_dp_ranks in zip(*part_ep_group):
                 expert_data_parallel_groups.append(list(expert_dp_ranks))
@@ -458,6 +457,11 @@ class Initializer_Expert_Data(ProcessGroupInitializer):
             list: [(local_rank, group_world_size, process_group, ranks_in_group, mode), ...]:
                 A length 2 list consists of expert parallelism's and expert data parallelism's information tuple.
         """
+        local_rank = None
+        ranks_in_group = None
+        process_group = None
+        cpu_group = None
+        group_world_size = None
 
         expert_parallel_groups, expert_data_parallel_groups = self._get_expert_parallel_ranks()
         groups = []
@@ -473,7 +477,9 @@ class Initializer_Expert_Data(ProcessGroupInitializer):
             process_group = group
             cpu_group = group_cpu
             ranks_in_group = ranks
-            groups.append((local_rank, group_world_size, process_group, cpu_group, ranks_in_group, ParallelMode.EXPERT))
+            groups.append(
+                (local_rank, group_world_size, process_group, cpu_group, ranks_in_group, ParallelMode.EXPERT)
+            )
 
         for ranks in expert_data_parallel_groups:
             group = dist.new_group(ranks)
@@ -487,8 +493,8 @@ class Initializer_Expert_Data(ProcessGroupInitializer):
             process_group = group
             cpu_group = group_cpu
             ranks_in_group = ranks
-        groups.append(
-            (local_rank, group_world_size, process_group, cpu_group, ranks_in_group, ParallelMode.EXPERT_DATA)
-        )
+            groups.append(
+                (local_rank, group_world_size, process_group, cpu_group, ranks_in_group, ParallelMode.EXPERT_DATA)
+            )
 
         return groups
diff --git a/internlm/initialize/launch.py b/internlm/initialize/launch.py
index a69a506..2527801 100644
--- a/internlm/initialize/launch.py
+++ b/internlm/initialize/launch.py
@@ -268,6 +268,8 @@ and 'load_given_ckpt' is True, so internlm will load from 'load_ckpt_folder'"
     # process the model config
     if "use_flash_attn" not in gpc.config.model:
         gpc.config.model._add_item("use_flash_attn", True)
+    if "num_experts" not in model:
+        model._add_item("num_experts", 0)
 
     # process the parallel config
     if "sequence_parallel" not in gpc.config.parallel:
diff --git a/internlm/model/modeling_internlm.py b/internlm/model/modeling_internlm.py
index 4cb64c3..d816618 100644
--- a/internlm/model/modeling_internlm.py
+++ b/internlm/model/modeling_internlm.py
@@ -133,7 +133,7 @@ class PackedFlashBaseLayer1D(nn.Module):
         self.moe_use_rts = moe_use_rts
         self.moe_use_residual = moe_use_residual
         ep_size = gpc.get_world_size(ParallelMode.EXPERT)
-        if num_experts <= 1:  # dense, not MoE
+        if num_experts == 0:  # dense, not MoE
             if use_swiglu:
                 self.mlp = FeedForward(
                     hidden_size,
@@ -173,6 +173,8 @@ class PackedFlashBaseLayer1D(nn.Module):
                 drop_tokens=moe_drop_tokens,
                 use_rts=moe_use_rts,
                 use_residual=moe_use_residual,
+                device=device,
+                dtype=dtype,
             )
 
         self.dropout2 = nn.Dropout(drop_rate)
diff --git a/internlm/model/moe.py b/internlm/model/moe.py
index 8ddbc48..a978d31 100644
--- a/internlm/model/moe.py
+++ b/internlm/model/moe.py
@@ -2,10 +2,10 @@ import typing
 from typing import Dict, Tuple
 
 import torch
+from flash_attn.modules.mlp import ParallelFusedMLP
 
 from internlm.core.context import ParallelMode
 from internlm.core.context import global_context as gpc
-from internlm.model.linear import FeedForward
 from internlm.moe.experts import Experts
 from internlm.moe.sharded_moe import MOELayer, TopKGate
 from internlm.utils.logger import get_logger
@@ -33,7 +33,7 @@ def has_moe_layers(m):
 
 
 def is_moe_param(param: torch.Tensor) -> bool:
-    if hasattr(param, "all_reduce") and not param.all_reduce:
+    if hasattr(param, "is_expert") and param.is_expert:
         return True
     return False
 
@@ -75,6 +75,8 @@ class MoE(torch.nn.Module):
         use_rts: bool = True,
         using_default_moe: bool = True,
         use_residual=False,
+        device=None,
+        dtype=None,
     ):
         super().__init__()
 
@@ -86,10 +88,11 @@ class MoE(torch.nn.Module):
 
         self.num_experts = num_experts
         self.num_local_experts = num_experts // self.ep_size
-        logger.info(  # pylint: disable=W1203
-            f"Creating MoE layer with num_experts: {num_experts} | num_local_experts:"
-            f"{self.num_local_experts} | expert_parallel_size: {self.ep_size}"
-        )
+        if gpc.is_rank_for_log():
+            logger.info(  # pylint: disable=W1203
+                f"Creating MoE layer with num_experts: {num_experts} | num_local_experts:"
+                f"{self.num_local_experts} | expert_parallel_size: {self.ep_size}"
+            )
         assert noisy_gate_policy is None or noisy_gate_policy in ["None", "Jitter", "RSample"], (
             "Unsupported noisy_gate_policy: " + noisy_gate_policy
         )
@@ -98,14 +101,20 @@ class MoE(torch.nn.Module):
         expert_group_name = f"ep_size_{self.ep_size}"
         experts = torch.nn.ModuleList(
             [
-                FeedForward(
+                # TODO have trouble when use internlm.model.linear.FeedForward
+                ParallelFusedMLP(
                     hidden_size,
                     int(hidden_size * gpc.config.model.mlp_ratio),
                     out_features=hidden_size,
+                    activation="gelu_approx",
                     process_group=gpc.get_group(ParallelMode.TENSOR),
-                    bias=False,
-                    device=torch.device("cuda"),
-                    dtype=torch.float,
+                    bias1=False,
+                    bias2=False,
+                    sequence_parallel=gpc.config.model.sequence_parallel,
+                    checkpoint_lvl=0,
+                    heuristic="auto",
+                    device=device,
+                    dtype=dtype,
                 )
                 for _ in range(self.num_local_experts)
             ]
@@ -134,14 +143,19 @@ class MoE(torch.nn.Module):
         # residual network, see https://arxiv.org/pdf/2201.05596.pdf, seems useful for convergence
         self.use_residual = use_residual
         if use_residual:
-            self.residual_mlp = FeedForward(
+            self.residual_mlp = ParallelFusedMLP(
                 hidden_size,
                 int(hidden_size * gpc.config.model.mlp_ratio),
                 out_features=hidden_size,
+                activation="gelu_approx",
                 process_group=gpc.get_group(ParallelMode.TENSOR),
-                bias=False,
-                device=torch.device("cuda"),
-                dtype=torch.float,
+                bias1=False,
+                bias2=False,
+                sequence_parallel=gpc.config.model.sequence_parallel,
+                checkpoint_lvl=0,
+                heuristic="auto",
+                device=device,
+                dtype=dtype,
             )
             # coefficient is used for weighted sum of the output of expert and residual mlp
             self.coefficient = torch.nn.Linear(hidden_size, 2)
diff --git a/internlm/moe/experts.py b/internlm/moe/experts.py
index 15e5289..d57714e 100644
--- a/internlm/moe/experts.py
+++ b/internlm/moe/experts.py
@@ -37,7 +37,7 @@ class Experts(torch.nn.Module):
         for expert in self.experts:
             # TODO: Create param groups to handle expert + data case (e.g. param.group = moe_group)
             for _, param in expert.named_parameters():
-                param.all_reduce = False
+                param.is_expert = True
                 param.group_name = expert_group_name
 
     def forward(self, inputs):
diff --git a/internlm/moe/sharded_moe.py b/internlm/moe/sharded_moe.py
index c450365..66b41d0 100644
--- a/internlm/moe/sharded_moe.py
+++ b/internlm/moe/sharded_moe.py
@@ -375,7 +375,7 @@ class TopKGate(Module):
         if self.wall_clock_breakdown:
             timer("TopKGate").start()
 
-        if self.wg.weight.dtype != torch.float32:
+        if self.wg.weight.dtype != torch.float32:  # TODO can we change it to fp16
             self.wg = self.wg.float()
         inputs_fp32 = inputs.float()
         # input jittering
diff --git a/internlm/train/training_internlm.py b/internlm/train/training_internlm.py
index 7d22ba1..ec23155 100644
--- a/internlm/train/training_internlm.py
+++ b/internlm/train/training_internlm.py
@@ -100,7 +100,7 @@ def initialize_optimizer(model: Union[nn.Module, nn.ModuleList]):
     adam_cfg = gpc.config.adam
 
     # split the moe parameters into different groups
-    if gpc.config.model.num_experts > 1:
+    if gpc.config.model.num_experts != 0:
         params = create_moe_param_groups(model, adam_cfg.weight_decay)
     else:
        params = [{"params": model.parameters(), "weight_decay": adam_cfg.weight_decay}]
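
For reference, below is a minimal standalone sketch of the rank-grouping scheme that the revised _get_expert_parallel_ranks implements. It reproduces the expert_data_parallel_group example visible in the docstring context ([0,4], [2,6], [1,5], [3,7]); the concrete sizes used here (world size 8, tensor parallel 2, pipeline parallel 1, expert parallel 2) and the helper name expert_groups are illustrative assumptions, not part of the patch.

def expert_groups(world_size, tensor_parallel_size, pipeline_parallel_size, expert_parallel_size):
    # Ranks separated by the model-parallel stride hold the same model slice,
    # so together they form one data parallel group.
    model_parallel_size = pipeline_parallel_size * tensor_parallel_size
    data_parallel_size = world_size // model_parallel_size
    data_parallel_groups = [list(range(i, world_size, model_parallel_size)) for i in range(model_parallel_size)]

    expert_parallel_groups, expert_data_parallel_groups = [], []
    for dp_ranks in data_parallel_groups:
        # Partition each data parallel group into expert parallel groups of expert_parallel_size ranks.
        part_ep_group = [
            dp_ranks[i : i + expert_parallel_size] for i in range(0, data_parallel_size, expert_parallel_size)
        ]
        expert_parallel_groups.extend(part_ep_group)
        # Ranks at the same position across those partitions replicate the same expert shard,
        # so they form an expert data parallel group.
        expert_data_parallel_groups.extend(list(ranks) for ranks in zip(*part_ep_group))
    return expert_parallel_groups, expert_data_parallel_groups


ep, edp = expert_groups(world_size=8, tensor_parallel_size=2, pipeline_parallel_size=1, expert_parallel_size=2)
assert ep == [[0, 2], [4, 6], [1, 3], [5, 7]]
assert edp == [[0, 4], [2, 6], [1, 5], [3, 7]]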
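
In the same spirit, here is a hedged sketch of how the is_expert tag set in Experts.__init__ can drive the optimizer split performed in initialize_optimizer. create_moe_param_groups itself is not part of this diff, so the helper below (split_param_groups_by_expert_flag) only illustrates the idea under that assumption and is not the actual implementation.

import torch


def split_param_groups_by_expert_flag(model: torch.nn.Module, weight_decay: float):
    # Illustrative only: separate parameters tagged with is_expert from dense ones, so that
    # expert weights can later be reduced over the expert data parallel group instead of
    # the full data parallel group.
    dense_params, expert_params = [], []
    for param in model.parameters():
        if getattr(param, "is_expert", False):
            expert_params.append(param)
        else:
            dense_params.append(param)
    return [
        {"params": dense_params, "weight_decay": weight_decay},
        {"params": expert_params, "weight_decay": weight_decay, "name": "moe"},
    ]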