# Copyright (c) Microsoft Corporation.
# SPDX-License-Identifier: Apache-2.0

# DeepSpeed Team

import typing

import torch

from internlm.core.context import ParallelMode
from internlm.core.context import global_context as gpc
from internlm.moe.experts import Experts
from internlm.moe.sharded_moe import MOELayer, TopKGate
from internlm.utils.logger import get_logger

# global llm logger
logger = get_logger(__file__)


def has_moe_layers(m):
    """Return whether the module tree contains an MoE layer and, if so, its expert count."""
    has_moe = False
    num_experts = 0
    for _, module in m.named_modules():
        if isinstance(module, MoE):
            has_moe = True
            num_experts = module.num_experts
            break
    return has_moe, num_experts


def is_moe_param(param: torch.Tensor) -> bool:
    """Return True for expert parameters, which are tagged with ``allreduce = False``."""
    if hasattr(param, "allreduce") and not param.allreduce:
        return True
    return False


class MoE(torch.nn.Module):
    """Initialize an MoE layer.

    Arguments:
        hidden_size (int): the hidden dimension of the model, importantly this is also the input
            and output dimension.
        expert (torch.nn.Module): the torch module that defines the expert (e.g., MLP, torch.linear).
        num_experts (int, optional): default=1, the total number of experts per layer.
        ep_size (int, optional): default=1, number of ranks in the expert parallel world or group.
        k (int, optional): default=1, top-k gating value, only supports k=1 or k=2.
        capacity_factor (float, optional): default=1.0, the capacity of the expert at training time.
        eval_capacity_factor (float, optional): default=1.0, the capacity of the expert at eval time.
        min_capacity (int, optional): default=4, the minimum capacity per expert regardless of the
            capacity_factor.
        noisy_gate_policy (str, optional): default=None, noisy gate policy, valid options are
            'Jitter', 'RSample' or 'None'.
        drop_tokens (bool, optional): default=True, whether to drop tokens
            (setting to False is equivalent to infinite capacity).
        use_rts (bool, optional): default=True, whether to use Random Token Selection.
    """

    def __init__(
        self,
        hidden_size,
        expert,
        num_experts=1,
        ep_size=1,
        k=1,
        capacity_factor=1.0,
        eval_capacity_factor=1.0,
        min_capacity=4,
        noisy_gate_policy: typing.Optional[str] = None,
        drop_tokens: bool = True,
        use_rts: bool = True,
        using_default_moe: bool = True,
    ):
        super().__init__()

        assert (
            num_experts % ep_size == 0
        ), f"Number of experts ({num_experts}) should be divisible by expert parallel size ({ep_size})"

        self.ep_size = ep_size
        self.num_experts = num_experts
        self.num_local_experts = num_experts // self.ep_size

        logger.info(
            f"Creating MoE layer with num_experts: {num_experts} | "
            f"num_local_experts: {self.num_local_experts} | expert_parallel_size: {self.ep_size}"
        )

        assert noisy_gate_policy is None or noisy_gate_policy in ["None", "Jitter", "RSample"], (
            "Unsupported noisy_gate_policy: " + noisy_gate_policy
        )

        experts = Experts(expert, self.num_local_experts)

        if using_default_moe:
            self.moe_layer = MOELayer(
                TopKGate(
                    hidden_size,
                    num_experts,
                    k,
                    capacity_factor,
                    eval_capacity_factor,
                    min_capacity,
                    noisy_gate_policy,
                    drop_tokens,
                    use_rts,
                ),
                experts,
                gpc.get_group(ParallelMode.EXPERT),
                self.ep_size,
                self.num_local_experts,
            )

    def forward(self, hidden_states, used_token=None):
        """MoE forward

        Arguments:
            hidden_states (Tensor): input to the layer
            used_token (Tensor, optional): default: None, mask only used tokens

        Returns:
            A tuple including output, gate loss, and expert count.

            * output (Tensor): output of the model
            * l_aux (Tensor): gate loss value
            * exp_counts (int): expert count
        """
        output = self.moe_layer(hidden_states, used_token)
        return output, self.moe_layer.l_aux, self.moe_layer.exp_counts
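

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only, not part of the module's API): the block
# below shows how an MoE layer of this kind is typically constructed and
# called. It assumes the InternLM parallel context (gpc) has already been
# initialized with an EXPERT process group by the usual launch utilities, and
# that a plain torch.nn module is an acceptable expert; the expert choice,
# shapes, and hyperparameters here are assumptions for demonstration.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    hidden_size = 1024

    # A toy expert: any torch.nn.Module mapping hidden_size -> hidden_size;
    # in practice InternLM passes its feed-forward/MLP block here.
    expert = torch.nn.Linear(hidden_size, hidden_size)

    # 4 experts, no expert parallelism (ep_size=1), top-1 gating.
    moe = MoE(
        hidden_size=hidden_size,
        expert=expert,
        num_experts=4,
        ep_size=1,
        k=1,
    )

    # (batch, seq_len, hidden_size) input; the gate routes each token to an expert.
    tokens = torch.randn(2, 16, hidden_size)
    output, l_aux, exp_counts = moe(tokens)

    # l_aux is the load-balancing (gate) loss added to the training loss;
    # exp_counts reports how many tokens each expert received.
    print(output.shape, float(l_aux), exp_counts)

    # The helpers above can be used to inspect a larger model:
    # has_moe_layers(model) reports whether any MoE layer is present, and
    # is_moe_param(p) identifies expert parameters (marked allreduce=False).
    print(has_moe_layers(moe))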