pull/182/head
Wenwen Qu 2023-08-30 16:14:33 +08:00
parent f3da80a7ca
commit b021995199
8 changed files with 52 additions and 180 deletions

View File

@@ -1,152 +0,0 @@
JOB_NAME = "7b_train"
SEQ_LEN = 2048
HIDDEN_SIZE = 4096
NUM_ATTENTION_HEAD = 32
MLP_RATIO = 8 / 3
NUM_LAYER = 16
VOCAB_SIZE = 103168
MODEL_ONLY_FOLDER = "local:llm_ckpts/xxxx"
# Ckpt folder format:
# fs: 'local:/mnt/nfs/XXX'
SAVE_CKPT_FOLDER = "local:llm_ckpts"
LOAD_CKPT_FOLDER = "local:llm_ckpts/49"
# boto3 Ckpt folder format:
# import os
# BOTO3_IP = os.environ["BOTO3_IP"] # boto3 bucket endpoint
# SAVE_CKPT_FOLDER = f"boto3:s3://model_weights.{BOTO3_IP}/internlm"
# LOAD_CKPT_FOLDER = f"boto3:s3://model_weights.{BOTO3_IP}/internlm/snapshot/1/"
CHECKPOINT_EVERY = 50
ckpt = dict(
    enable_save_ckpt=False,  # enable ckpt save.
    save_ckpt_folder=SAVE_CKPT_FOLDER,  # Path to save training ckpt.
    # load_ckpt_folder=LOAD_CKPT_FOLDER,  # Ckpt path to resume training (load weights and scheduler/context states).
    # load_model_only_folder=MODEL_ONLY_FOLDER,  # Path to initialize with given model weights.
    load_optimizer=True,  # Whether to load optimizer states when continuing training.
    checkpoint_every=CHECKPOINT_EVERY,
    async_upload=True,  # async ckpt upload (only works for boto3 ckpt).
    async_upload_tmp_folder="/dev/shm/internlm_tmp_ckpt/",  # path for temporary files during asynchronous upload.
    snapshot_ckpt_folder="/".join([SAVE_CKPT_FOLDER, "snapshot"]),  # directory for snapshot ckpt storage.
    oss_snapshot_freq=int(CHECKPOINT_EVERY / 2),  # snapshot ckpt save frequency.
)
TRAIN_FOLDER = "/mnt/petrelfs/share_data/llm_data/0623_scratch_tokenized_filtered/train/en/enwiki"
VALID_FOLDER = "/mnt/petrelfs/share_data/llm_data/0623_scratch_tokenized_filtered/train/en/enwiki"
data = dict(
    seq_len=SEQ_LEN,
    # micro_num is the number of micro-batches in one gradient update
    micro_num=4,
    packed_length=2 * SEQ_LEN,
    micro_bsz=2,
    # defaults to the value of micro_num
    valid_micro_num=4,
    # defaults to 0, which disables evaluation
    valid_every=50000,
    pack_sample_into_one=False,
    total_steps=50000,
    skip_batches="",
    rampup_batch_size="",
    # Datasets with fewer than 50 rows will be discarded
    min_length=50,
    train_folder=TRAIN_FOLDER,
    valid_folder=VALID_FOLDER,
)
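# Quick arithmetic check (illustrative note, not part of the original config): with the values
# above, packed_length = 2 * SEQ_LEN equals micro_bsz * SEQ_LEN = 4096 tokens per packed
# micro-batch, so each data-parallel rank processes micro_num * packed_length
# = 4 * 4096 = 16384 tokens per gradient update.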
grad_scaler = dict(
    fp16=dict(
        # the initial loss scale, defaults to 2**16
        initial_scale=2**16,
        # the minimum loss scale, defaults to None
        min_scale=1,
        # the number of steps to increase loss scale when no overflow occurs
        growth_interval=1000,
    ),
    # the multiplication factor for increasing loss scale, defaults to 2
    growth_factor=2,
    # the multiplication factor for decreasing loss scale, defaults to 0.5
    backoff_factor=0.5,
    # the maximum loss scale, defaults to None
    max_scale=2**24,
    # the number of overflows before decreasing loss scale, defaults to 2
    hysteresis=2,
)
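# Worked example of how these fields interact (illustrative note, not part of the original
# config): starting from initial_scale = 2**16, a run of growth_interval = 1000 overflow-free
# steps multiplies the scale by growth_factor, giving 2**17; after hysteresis = 2 overflows it
# is multiplied by backoff_factor back to 2**16, always clamped to [min_scale, max_scale].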
hybrid_zero_optimizer = dict(
    # Enable low_level_optimizer overlap_communication
    zero_overlap_communication=True,
    # bucket size for nccl communication params
    reduce_bucket_size=512 * 1024 * 1024,
    # grad clipping
    clip_grad_norm=1.0,
)
loss = dict(
    label_smoothing=0,
    moe_loss_coeff=0.1,
)
adam = dict(
    lr=1e-4,
    adam_beta1=0.9,
    adam_beta2=0.95,
    adam_beta2_c=0,
    adam_eps=1e-8,
    weight_decay=0.01,
)
lr_scheduler = dict(
    total_steps=data["total_steps"],
    init_steps=0,  # optimizer_warmup_step
    warmup_ratio=0.01,
    eta_min=1e-5,
    last_epoch=-1,
)
beta2_scheduler = dict(
    init_beta2=adam["adam_beta2"],
    c=adam["adam_beta2_c"],
    cur_iter=-1,
)
model = dict(
    checkpoint=False,
    num_attention_heads=NUM_ATTENTION_HEAD,
    embed_split_hidden=True,
    vocab_size=VOCAB_SIZE,
    embed_grad_scale=1,
    parallel_output=True,
    hidden_size=HIDDEN_SIZE,
    num_layers=NUM_LAYER,
    mlp_ratio=MLP_RATIO,
    apply_post_layer_norm=False,
    dtype="torch.bfloat16",
    norm_type="rmsnorm",
    layer_norm_epsilon=1e-5,
    use_flash_attn=True,
    num_chunks=1,  # if num_chunks > 1, interleaved pipeline scheduler is used.
    sequence_parallel=False,
    num_experts=4,
    moe_use_residual=False,
)
"""
zero1 parallel:
1. if zero1 <= 0, The size of the zero process group is equal to the size of the dp process group,
so parameters will be divided within the range of dp.
2. if zero1 == 1, zero is not used, and all dp groups retain the full amount of model parameters.
3. zero1 > 1 and zero1 <= dp world size, the world size of zero is a subset of dp world size.
For smaller models, it is usually a better choice to split the parameters within nodes with a setting <= 8.
pipeline parallel (dict):
1. size: int, the size of pipeline parallel.
2. interleaved_overlap: bool, enable/disable communication overlap when using interleaved pipeline scheduler.
tensor parallel: tensor parallel size, usually the number of GPUs per node.
"""
parallel = dict(
# zero1=4,
pipeline=dict(size=4, interleaved_overlap=False),
# tensor=dict(size=4),
)
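# Illustrative example (not part of the original config): one way the commented-out fields
# above could be combined according to the docstring; the sizes here are assumed values.
# parallel = dict(
#     zero1=8,  # shard optimizer states across 8 ranks, i.e. within a node
#     pipeline=dict(size=4, interleaved_overlap=True),  # 4 pipeline stages with comm overlap
#     tensor=dict(size=2),  # 2-way tensor parallelism
# )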
cudnn_deterministic = False
cudnn_benchmark = False

View File

@@ -422,8 +422,6 @@ class Initializer_Expert_Data(ProcessGroupInitializer):
         super().__init__(*args, **kwargs)
         self.num_expert_parallel_group = self.world_size // self.expert_parallel_size
-        assert self.world_size % self.rank_num_per_expert_group == 0
 
     def _get_expert_parallel_ranks(self):
         """
         Create expert and data parallel groups
@@ -434,17 +432,18 @@ class Initializer_Expert_Data(ProcessGroupInitializer):
         expert_data_parallel_group = [0,4], [2,6], [1,5], [3,7]
         """
         data_parallel_groups = []
-        for i in range(self.model_parallel_size):
-            data_parallel_groups.append(list(range(i, self.world_size, self.model_parallel_size)))
+        model_parallel_size = self.pipeline_parallel_size * self.tensor_parallel_size
+        for i in range(model_parallel_size):
+            data_parallel_groups.append(list(range(i, self.world_size, model_parallel_size)))
 
         expert_parallel_groups = []
         expert_data_parallel_groups = []
-        for dp_ranks in range(self.num_expert_parallel_group):
+        for dp_ranks in data_parallel_groups:
             # partition of expert parallel group, e.g. [0,2], [4,6]
             part_ep_group = []
             for i in range(0, self.data_parallel_size, self.expert_parallel_size):
                 part_ep_group.append(dp_ranks[i : i + self.expert_parallel_size])
-            expert_data_parallel_groups.extend(part_ep_group)
+            expert_parallel_groups.extend(part_ep_group)
 
             for expert_dp_ranks in zip(*part_ep_group):
                 expert_data_parallel_groups.append(list(expert_dp_ranks))
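To make the regrouping concrete, here is a minimal standalone sketch of the partitioning performed by the rewritten loop, using assumed sizes (world_size=8, pipeline=1, tensor=2, expert_parallel_size=2); the names mirror the diff, the numbers are illustrative:

# Standalone sketch of the rank grouping above (assumed sizes, not taken from this PR).
world_size, pipeline_parallel_size, tensor_parallel_size, expert_parallel_size = 8, 1, 2, 2
model_parallel_size = pipeline_parallel_size * tensor_parallel_size  # 2
data_parallel_size = world_size // model_parallel_size  # 4

data_parallel_groups = [
    list(range(i, world_size, model_parallel_size)) for i in range(model_parallel_size)
]  # [[0, 2, 4, 6], [1, 3, 5, 7]]

expert_parallel_groups, expert_data_parallel_groups = [], []
for dp_ranks in data_parallel_groups:
    part_ep_group = [
        dp_ranks[i : i + expert_parallel_size]
        for i in range(0, data_parallel_size, expert_parallel_size)
    ]
    expert_parallel_groups.extend(part_ep_group)  # [0, 2], [4, 6], [1, 3], [5, 7]
    for expert_dp_ranks in zip(*part_ep_group):
        expert_data_parallel_groups.append(list(expert_dp_ranks))  # [0, 4], [2, 6], [1, 5], [3, 7]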
@@ -458,6 +457,11 @@ class Initializer_Expert_Data(ProcessGroupInitializer):
             list: [(local_rank, group_world_size, process_group, ranks_in_group, mode), ...]:
                 A length 2 list consists of expert parallelism's and expert data parallelism's information tuple.
         """
+        local_rank = None
+        ranks_in_group = None
+        process_group = None
+        cpu_group = None
+        group_world_size = None
         expert_parallel_groups, expert_data_parallel_groups = self._get_expert_parallel_ranks()
 
         groups = []
@@ -473,7 +477,9 @@ class Initializer_Expert_Data(ProcessGroupInitializer):
                 process_group = group
                 cpu_group = group_cpu
                 ranks_in_group = ranks
-            groups.append((local_rank, group_world_size, process_group, cpu_group, ranks_in_group, ParallelMode.EXPERT))
+            groups.append(
+                (local_rank, group_world_size, process_group, cpu_group, ranks_in_group, ParallelMode.EXPERT)
+            )
 
         for ranks in expert_data_parallel_groups:
             group = dist.new_group(ranks)
@@ -487,8 +493,8 @@ class Initializer_Expert_Data(ProcessGroupInitializer):
                 process_group = group
                 cpu_group = group_cpu
                 ranks_in_group = ranks
             groups.append(
                 (local_rank, group_world_size, process_group, cpu_group, ranks_in_group, ParallelMode.EXPERT_DATA)
             )
 
         return groups

View File

@@ -268,6 +268,8 @@ and 'load_given_ckpt' is True, so internlm will load from 'load_ckpt_folder'"
     # process the model config
     if "use_flash_attn" not in gpc.config.model:
         gpc.config.model._add_item("use_flash_attn", True)
+    if "num_experts" not in model:
+        model._add_item("num_experts", 0)
 
     # process the parallel config
     if "sequence_parallel" not in gpc.config.parallel:

View File

@@ -133,7 +133,7 @@ class PackedFlashBaseLayer1D(nn.Module):
         self.moe_use_rts = moe_use_rts
         self.moe_use_residual = moe_use_residual
         ep_size = gpc.get_world_size(ParallelMode.EXPERT)
-        if num_experts <= 1:  # dense, not MoE
+        if num_experts == 0:  # dense, not MoE
             if use_swiglu:
                 self.mlp = FeedForward(
                     hidden_size,
@@ -173,6 +173,8 @@ class PackedFlashBaseLayer1D(nn.Module):
                 drop_tokens=moe_drop_tokens,
                 use_rts=moe_use_rts,
                 use_residual=moe_use_residual,
+                device=device,
+                dtype=dtype,
             )
 
         self.dropout2 = nn.Dropout(drop_rate)

View File

@@ -2,10 +2,10 @@ import typing
 from typing import Dict, Tuple
 
 import torch
+from flash_attn.modules.mlp import ParallelFusedMLP
 
 from internlm.core.context import ParallelMode
 from internlm.core.context import global_context as gpc
-from internlm.model.linear import FeedForward
 from internlm.moe.experts import Experts
 from internlm.moe.sharded_moe import MOELayer, TopKGate
 from internlm.utils.logger import get_logger
@@ -33,7 +33,7 @@ def has_moe_layers(m):
 
 
 def is_moe_param(param: torch.Tensor) -> bool:
-    if hasattr(param, "all_reduce") and not param.all_reduce:
+    if hasattr(param, "is_expert") and param.is_expert:
         return True
     return False
 
@@ -75,6 +75,8 @@ class MoE(torch.nn.Module):
         use_rts: bool = True,
         using_default_moe: bool = True,
         use_residual=False,
+        device=None,
+        dtype=None,
     ):
         super().__init__()
 
@@ -86,10 +88,11 @@ class MoE(torch.nn.Module):
         self.num_experts = num_experts
         self.num_local_experts = num_experts // self.ep_size
 
-        logger.info(  # pylint: disable=W1203
-            f"Creating MoE layer with num_experts: {num_experts} | num_local_experts:"
-            f"{self.num_local_experts} | expert_parallel_size: {self.ep_size}"
-        )
+        if gpc.is_rank_for_log():
+            logger.info(  # pylint: disable=W1203
+                f"Creating MoE layer with num_experts: {num_experts} | num_local_experts:"
+                f"{self.num_local_experts} | expert_parallel_size: {self.ep_size}"
+            )
 
         assert noisy_gate_policy is None or noisy_gate_policy in ["None", "Jitter", "RSample"], (
             "Unsupported noisy_gate_policy: " + noisy_gate_policy
         )
@@ -98,14 +101,20 @@ class MoE(torch.nn.Module):
         expert_group_name = f"ep_size_{self.ep_size}"
         experts = torch.nn.ModuleList(
             [
-                FeedForward(
+                # TODO have trouble when use internlm.model.linear.FeedForward
+                ParallelFusedMLP(
                     hidden_size,
                     int(hidden_size * gpc.config.model.mlp_ratio),
                     out_features=hidden_size,
+                    activation="gelu_approx",
                     process_group=gpc.get_group(ParallelMode.TENSOR),
-                    bias=False,
-                    device=torch.device("cuda"),
-                    dtype=torch.float,
+                    bias1=False,
+                    bias2=False,
+                    sequence_parallel=gpc.config.model.sequence_parallel,
+                    checkpoint_lvl=0,
+                    heuristic="auto",
+                    device=device,
+                    dtype=dtype,
                 )
                 for _ in range(self.num_local_experts)
             ]
@@ -134,14 +143,19 @@ class MoE(torch.nn.Module):
         # residual network, see https://arxiv.org/pdf/2201.05596.pdf, seems useful for convergence
         self.use_residual = use_residual
         if use_residual:
-            self.residual_mlp = FeedForward(
+            self.residual_mlp = ParallelFusedMLP(
                 hidden_size,
                 int(hidden_size * gpc.config.model.mlp_ratio),
                 out_features=hidden_size,
+                activation="gelu_approx",
                 process_group=gpc.get_group(ParallelMode.TENSOR),
-                bias=False,
-                device=torch.device("cuda"),
-                dtype=torch.float,
+                bias1=False,
+                bias2=False,
+                sequence_parallel=gpc.config.model.sequence_parallel,
+                checkpoint_lvl=0,
+                heuristic="auto",
+                device=device,
+                dtype=dtype,
             )
             # coefficient is used for weighted sum of the output of expert and residual mlp
             self.coefficient = torch.nn.Linear(hidden_size, 2)
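The `coefficient` head above weights the expert output against the residual MLP. A hedged sketch of how such a residual-MoE combination is typically computed (following the PR-MoE idea from the linked paper; the forward pass itself is not part of this diff, so `combine_residual` is a hypothetical helper):

import torch


def combine_residual(moe_out, residual_out, coefficient, hidden_states):
    # Softmax over the 2-way coefficient head, then a weighted sum of the expert
    # output and the residual MLP output (illustrative, assumed implementation).
    coef = torch.nn.functional.softmax(coefficient(hidden_states), dim=-1)
    return moe_out * coef[..., 0:1] + residual_out * coef[..., 1:2]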

View File

@@ -37,7 +37,7 @@ class Experts(torch.nn.Module):
         for expert in self.experts:
             # TODO: Create param groups to handle expert + data case (e.g. param.group = moe_group)
             for _, param in expert.named_parameters():
-                param.all_reduce = False
+                param.belong_expert = True
                 param.group_name = expert_group_name
 
     def forward(self, inputs):

View File

@@ -375,7 +375,7 @@ class TopKGate(Module):
         if self.wall_clock_breakdown:
             timer("TopKGate").start()
 
-        if self.wg.weight.dtype != torch.float32:
+        if self.wg.weight.dtype != torch.float32:  # TODO can we change it to fp16
             self.wg = self.wg.float()
         inputs_fp32 = inputs.float()
         # input jittering

View File

@@ -100,7 +100,7 @@ def initialize_optimizer(model: Union[nn.Module, nn.ModuleList]):
     adam_cfg = gpc.config.adam
 
     # split the moe parameters into different groups
-    if gpc.config.model.num_experts > 1:
+    if gpc.config.model.num_experts != 0:
        params = create_moe_param_groups(model, adam_cfg.weight_decay)
     else:
        params = [{"params": model.parameters(), "weight_decay": adam_cfg.weight_decay}]
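For context, `create_moe_param_groups` is expected to separate expert parameters (identified via `is_moe_param`) from the dense ones so the optimizer can treat them differently; a minimal hypothetical sketch of that split, under the assumption that a simple marker key on the expert group suffices (the real helper lives elsewhere in the repo and may differ):

def create_moe_param_groups(model, weight_decay):
    # Hypothetical sketch: put expert parameters into their own optimizer group,
    # keeping the same weight decay, so they can be handled within expert groups.
    moe_params, dense_params = [], []
    for param in model.parameters():
        (moe_params if is_moe_param(param) else dense_params).append(param)
    return [
        {"params": dense_params, "weight_decay": weight_decay},
        {"params": moe_params, "weight_decay": weight_decay, "moe": True},
    ]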