merge upstream

pull/456/head
yingtongxiong 2023-10-25 14:46:45 +08:00
commit 985465c96a
34 changed files with 1417 additions and 121 deletions

.github/workflows/unit_tests.yaml vendored Normal file (76 lines added)
View File

@ -0,0 +1,76 @@
name: unit-tests
on:
push:
branches:
- "develop"
- "main"
paths-ignore:
- "cmds/**"
- "**.md"
pull_request:
branches:
- "develop"
- "main"
paths-ignore:
- "cmds/**"
- "**.md"
env:
WORKSPACE_PREFIX: $(echo $GITHUB_WORKSPACE |cut -d '/' -f 1-4)
SLURM_PARTITION: llm_t
jobs:
check-requirements:
runs-on: [t_cluster]
steps:
- name: mask env
run: |
echo "::add-mask::${{env.WORKSPACE_PREFIX}}"
- uses: actions/checkout@v3
with:
fetch-depth: 2
- name: check-requirements
run: |
changed_files=$(git diff --name-only -r HEAD^1 HEAD)
echo $changed_files
if [[ $changed_files =~ "runtime.txt" ]]; then
pip install -r requirements/runtime.txt
fi
if [[ $changed_files =~ "torch.txt" ]]; then
pip install -r requirements/torch.txt
fi
unit_tests_core_pipeline:
if: ${{ always() }}
needs: check-requirements
runs-on: [t_cluster]
timeout-minutes: 20
steps:
- name: mask env
run: |
echo "::add-mask::${{env.WORKSPACE_PREFIX}}"
- uses: actions/checkout@v3
- name: core_pipeline
run: |
source /mnt/petrelfs/share_data/llm_env/env/llm-flash2.0
export PYTHONPATH=$PWD:$PYTHONPATH
srun -p ${SLURM_PARTITION} --job-name=internlm-ut-${GITHUB_RUN_ID}-${GITHUB_JOB} --quotatype=spot -N 1 -n 1 --gres=gpu:8 python -m pytest -s -v ./tests/test_core/test_pipeline.py
unit_tests_utils_storage_manager:
if: ${{ always() }}
needs: check-requirements
runs-on: [t_cluster]
timeout-minutes: 20
steps:
- name: mask env
run: |
echo "::add-mask::${{env.WORKSPACE_PREFIX}}"
- uses: actions/checkout@v3
- name: utils_storage_manager
run: |
source /mnt/petrelfs/share_data/llm_env/env/llm-flash2.0
export PYTHONPATH=$PWD:$PYTHONPATH
srun -p ${SLURM_PARTITION} --job-name=internlm-ut-${GITHUB_RUN_ID}-${GITHUB_JOB} --quotatype=spot -N 1 -n 1 --gres=gpu:8 python -m pytest -s -v ./tests/test_utils/test_storage_manager.py

View File

@ -3,6 +3,7 @@ on:
push:
branches:
- "main"
- "develop"
env:
SLURM_PARTITION: llm_s

View File

@ -4,7 +4,7 @@ DO_ALERT = False
SEQ_LEN = 2048
HIDDEN_SIZE = 4096
NUM_ATTENTION_HEAD = 32
MLP_RATIO = 8 / 3
MLP_RATIO = 4 / 3
NUM_LAYER = 32
VOCAB_SIZE = 103168
@ -30,6 +30,14 @@ ckpt = dict(
# 2. the 'content means what states will be loaded, support: "model", "sampler", "optimizer", "scheduler", "all"
# 3. the ckpt_type means the type of checkpoint to be loaded, now only 'normal' type is supported.
load_ckpt_info=dict(path=MODEL_ONLY_FOLDER, content=("model",), ckpt_type="internlm"),
# 'auto_resume' is designed to automatically load the latest checkpoint from 'save_ckpt_folder' when encountering
# training interruptions/hangs caused by hardware failures, using a scheduling system (such as k8s/slurm)
# with an automatic restart mechanism upon training reboot.
# Please be aware that if `auto_resume` is not set (its default value is True), it will not load the checkpoint
# path specified in `load_ckpt_info` by default.
# If you want to initialize your model weights from another model, you must set `auto_resume` to False.
# If you want to train from scratch, please set `auto_resume` to False and 'load_ckpt_info' to None.
auto_resume=True,
checkpoint_every=CHECKPOINT_EVERY,
async_upload=True, # async ckpt upload. (only work for boto3 ckpt)
async_upload_tmp_folder="/dev/shm/internlm_tmp_ckpt/", # path for temporary files during asynchronous upload.
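
As an aside, a minimal sketch of the two non-resume cases described in the comments above (the path and checkpoint interval below are placeholders, not values from this diff):

# Case 1: initialize weights from another model; auto_resume must be disabled,
# otherwise the latest checkpoint in save_ckpt_folder would be loaded instead.
ckpt = dict(
    auto_resume=False,
    load_ckpt_info=dict(path="local:/path/to/init_model", content=("model",), ckpt_type="internlm"),
    checkpoint_every=50,
)

# Case 2: train from scratch; disable auto_resume and clear load_ckpt_info.
ckpt = dict(
    auto_resume=False,
    load_ckpt_info=None,
    checkpoint_every=50,
)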
@ -43,7 +51,7 @@ data = dict(
# micro_num means the number of micro_batch contained in one gradient update
micro_num=4,
# packed_length = micro_bsz * SEQ_LEN
micro_bsz=1,
micro_bsz=2,
# defaults to the value of micro_num
valid_micro_num=4,
# defaults to 0, means disable evaluate
@ -81,8 +89,8 @@ grad_scaler = dict(
hybrid_zero_optimizer = dict(
# Enable low_level_optimizer overlap_communication
overlap_sync_grad=True,
overlap_sync_param=True,
overlap_sync_grad=False,
overlap_sync_param=False,
# bucket size for nccl communication params
reduce_bucket_size=512 * 1024 * 1024,
# grad clipping
@ -133,7 +141,7 @@ model = dict(
layer_norm_epsilon=1e-5,
use_flash_attn=True,
num_chunks=1, # if num_chunks > 1, interleaved pipeline scheduler is used.
num_experts=4,
num_experts=8,
moe_use_residual=False,
moe_gate_k=2,
)
@ -150,8 +158,8 @@ pipeline parallel (dict):
tensor parallel: tensor parallel size, usually the number of GPUs per node.
"""
parallel = dict(
zero1=-1,
tensor=2,
zero1=dict(size=-1, fsdp=False),
tensor=1,
pipeline=dict(size=1, interleaved_overlap=True),
sequence_parallel=False,
)

View File

@ -7,7 +7,7 @@ msgid ""
msgstr ""
"Project-Id-Version: InternLM \n"
"Report-Msgid-Bugs-To: \n"
"POT-Creation-Date: 2023-09-07 10:56+0800\n"
"POT-Creation-Date: 2023-10-10 17:48+0800\n"
"PO-Revision-Date: YEAR-MO-DA HO:MI+ZONE\n"
"Last-Translator: FULL NAME <EMAIL@ADDRESS>\n"
"Language: en\n"
@ -16,7 +16,7 @@ msgstr ""
"MIME-Version: 1.0\n"
"Content-Type: text/plain; charset=utf-8\n"
"Content-Transfer-Encoding: 8bit\n"
"Generated-By: Babel 2.12.1\n"
"Generated-By: Babel 2.13.0\n"
#: ../../source/index.rst:8 11e029810acf410180311a3c63eb01f4
msgid "InternLM"
@ -46,38 +46,42 @@ msgstr "Parallel Training"
msgid "混合精度"
msgstr "Mixed Precision"
#: ../../source/index.rst:59 9234725f3c464731993d73607608c874
#: ../../source/index.rst:59
msgid "混合专家模型"
msgstr "Mixture-of-Experts"
#: ../../source/index.rst:67 9234725f3c464731993d73607608c874
msgid "模型备份"
msgstr "Model Checkpointing"
#: ../../source/index.rst:67 8e4ce037017f4510b2892a66003877fa
#: ../../source/index.rst:75 8e4ce037017f4510b2892a66003877fa
msgid "性能分析"
msgstr "Profiler"
#: ../../source/index.rst:75 a36e02819ecd4b448a8cb4ebbecb6600
#: ../../source/index.rst:83 a36e02819ecd4b448a8cb4ebbecb6600
msgid "训练监控"
msgstr "Monitor"
#: ../../source/index.rst:83 b912e292486f455c8b5cdd75962e8ac2
#: ../../source/index.rst:91 b912e292486f455c8b5cdd75962e8ac2
msgid "训练样例"
msgstr "Example"
#: ../../source/index.rst:91 ea9e9281720941a1830e5df7a2badf7a
#: ../../source/index.rst:99 ea9e9281720941a1830e5df7a2badf7a
msgid "常见问题"
msgstr "Q&A"
#: ../../source/index.rst:99 e08edc5aa1c74965b10084b393b88fae
#: ../../source/index.rst:107 e08edc5aa1c74965b10084b393b88fae
msgid "索引和表格"
msgstr "Indices and tables"
#: ../../source/index.rst:101 f3fdca059caa49dcad09aa44be7f02d6
#: ../../source/index.rst:109 f3fdca059caa49dcad09aa44be7f02d6
msgid ":ref:`genindex`"
msgstr ""
#: ../../source/index.rst:102 b3791e811315435097bb507edc3f4b9b
#: ../../source/index.rst:110 b3791e811315435097bb507edc3f4b9b
msgid ":ref:`modindex`"
msgstr ""
#: ../../source/index.rst:103 a164b772960f4ab8b18c7e8820f69f55
#: ../../source/index.rst:111 a164b772960f4ab8b18c7e8820f69f55
msgid ":ref:`search`"
msgstr ""

View File

@ -0,0 +1,208 @@
# SOME DESCRIPTIVE TITLE.
# Copyright (C) 2023, InternLM Team
# This file is distributed under the same license as the InternLM package.
# FIRST AUTHOR <EMAIL@ADDRESS>, 2023.
#
#, fuzzy
msgid ""
msgstr ""
"Project-Id-Version: InternLM \n"
"Report-Msgid-Bugs-To: \n"
"POT-Creation-Date: 2023-10-10 19:25+0800\n"
"PO-Revision-Date: YEAR-MO-DA HO:MI+ZONE\n"
"Last-Translator: FULL NAME <EMAIL@ADDRESS>\n"
"Language: en\n"
"Language-Team: en <LL@li.org>\n"
"Plural-Forms: nplurals=2; plural=(n != 1);\n"
"MIME-Version: 1.0\n"
"Content-Type: text/plain; charset=utf-8\n"
"Content-Transfer-Encoding: 8bit\n"
"Generated-By: Babel 2.12.1\n"
#: ../../source/moe.rst:2
msgid "混合专家模型"
msgstr "Mixture-of-Experts"
#: ../../source/moe.rst:3
msgid ""
"混合专家模型Mixture-of-Experts, MoE是一种特殊的模型结构。 "
"混合专家模型将模型拆分为一系列称为“专家”的子模型,每个“专家” 具有唯一的权重。 "
"混合专家模型可以针对每个输入标记仅激活一个或少量的专家参与运算。 例如,图 :ref:`switch_transformer` 是 `Switch"
" Transformer <https://arxiv.org/pdf/2101.03961.pdf>`_ "
"提出的稀疏混合专家模型结构其中的前向神经网络FFN被分解为多个子网络在计算时仅有少部分的模型参数参与计算以实现更有效的计算和资源分配。"
msgstr ""
"Mixture-of-Experts (MoE) is a special model structure. MoE partitions the model into a series of sub-models called \"experts\", "
"each with unique parameters. MoE only activates one or a small number of experts for each input token. For example, the figure :ref:`switch_transformer` "
" shows the sparse MoE architecture proposed by `Switch Transformer <https://arxiv.org/pdf/2101.03961.pdf>`_ . "
"The Forward Neural Network (FFN) is decomposed into multiple sub-networks, and only a small number of model parameters "
"are involved in the calculation to achieve more efficient calculation and resource allocation. "
#: ../../source/moe.rst:8
msgid ""
"稀疏混合专家模型通常还包含一个门控gating机制例如图 :ref:`switch_transformer` "
"中的Router网络。门控网络负责选择激活哪些专家参与计算并组合不同专家的预测结果。"
msgstr ""
"Sparse MoE usually also includes a gating mechanism, such as the Router in Figure :ref:`switch_transformer` . "
"The gating network is responsible for selecting which experts to activate and combining the prediction results of "
"different experts."
#: ../../source/moe.rst:16
msgid "switch transformer"
msgstr "switch transformer"
#: ../../source/moe.rst:19
msgid "参数配置"
msgstr "Parameter Settings"
#: ../../source/moe.rst:20
msgid "如果在启动训练时要使用混合专家模型,可进行如下相关配置:"
msgstr ""
"If MoE is expected to be used in the training, please make the following settings in the configuration file:"
#: ../../source/moe.rst:22
msgid "模型相关配置"
msgstr "Model related settings"
#: ../../source/moe.rst:31
msgid "num_experts专家网络个数。在InternLM中每个专家有着相同的网络结构但维护着不同的训练参数。"
msgstr ""
"num_experts: The number of expert networks. In InternLM, each expert has "
"the same network structure but maintains different training parameters."
#: ../../source/moe.rst:32
msgid ""
"moe_gate_k门控策略。决定如何将输入标记路由到不同的专家进行计算。目前InternLM支持top1gating和top2gating两种门控策略。关于这些门控策略的详细的信息可以参考"
" `GShard <https://arxiv.org/pdf/2006.16668.pdf>`_。"
msgstr ""
"moe_gate_k: Gating strategy. Determines how to route input tokens to "
"different experts for calculation. Currently, InternLM supports top1gating"
" and top2gating strategies. For detailed information about "
"these gating strategies, please refer to `GShard <https://arxiv.org/pdf/2006.16668.pdf>`_."
#: ../../source/moe.rst:34
msgid ""
"注意在目前的InternLM中每个专家都是根据配置文件中HIDDEN_SIZE和MLP_RATIO构造的一个 `SwiGLU网络 <https://arxiv.org/pdf/2002.05202.pdf>`_同时支持张量并行。用户可以根据需要构造自己的专家网络。"
msgstr ""
"Note: In the current version of InternLM, each expert is a `SwiGLU network <https://arxiv.org/pdf/2002.05202.pdf>`_ based on "
"HIDDEN_SIZE and MLP_RATIO in the configuration file, and supports tensor parallelism. Users can construct their own expert networks as needed."
#: ../../source/moe.rst:37
msgid "损失相关配置"
msgstr "Loss related settings"
#: ../../source/moe.rst:46
msgid ""
"在top1gating和top2gating门控策略中不同的专家处理的标记数量存在差异。为了提高模型效果应尽量保证输入标记被均匀地路由到不同的专家上。"
"InternLM采用 `GShard <https://arxiv.org/pdf/2006.16668.pdf>`_ 提出的负载平衡损失优化门控策略。 "
"Moe_loss_coeff项决定着负载平衡损失项将如何添加到最终的损失项中 :math:`l=l_{nll}+k·l_{moe}` )。"
"关于该部分的详细信息可以进一步参考 `GShard <https://arxiv.org/pdf/2006.16668.pdf>`_。"
msgstr ""
"In top1gating and top2gating strategies, the number of tokens to process may be different for different experts. "
"In order to improve the model effect, the input tokens should be evenly routed to different experts. "
"InternLM adopts the balancing loss to optimize the gating network proposed by GShard. "
"The moe_loss_coeff determines how the balancing loss should be added to the final loss ( :math:`l=l_{nll}+k·l_{moe}` ). "
"The details can be found in `GShard <https://arxiv.org/pdf/2006.16668.pdf>`_. "
#: ../../source/moe.rst:49
msgid "注意:这些参数需要和其他参数一起使用,具体请参考 :doc:`/usage` “训练配置”相关章节的内容。"
msgstr "Note: These parameters need to be used together with other parameters, please refer to :doc:`/usage`: Training Configuration"
#: ../../source/moe.rst:52
msgid "模型训练"
msgstr "Model Training"
#: ../../source/moe.rst:54
msgid ""
"internlm.model.modeling_moe提供了一个标准的混合专家模型的实现该模型的网络结构和图 :ref:`switch_transformer` "
"一致其中使用到internlm.model.moe.MoE实现MoE网络。用户在配置文件中指定模型类型"
msgstr ""
"internlm.model.modeling_moe provides an implementation of a standard MoE. "
"The model structure is consistent with Figure :ref:`switch_transformer` ,"
" which uses internlm.model.moe.MoE to implement the MoE network. "
"To use moe model, specify the model type in the configuration file:"
#: ../../source/moe.rst:60
msgid "并配置好稀疏专家网络的相关参数后就可以像正常启动InternLM一样进行混合专家模型的分布式训练具体请参考 :doc:`/usage` “启动训练”相关章节的内容。"
msgstr ""
"After configuring the relevant parameters of the sparse MoE, "
"the distributed training can start as the normal training process. please refer to :doc:`/usage`: Start Training"
#: internlm.model.moe.MoE:1 of
msgid "Initialize an MoE layer."
msgstr ""
#: internlm.model.moe.MoE of
msgid "参数"
msgstr "parameter"
#: internlm.model.moe.MoE:3 of
msgid ""
"the hidden dimension of the model, importantly this is also the input and"
" output dimension."
msgstr ""
#: internlm.model.moe.MoE:5 of
msgid "default=1, the total number of experts per layer."
msgstr ""
#: internlm.model.moe.MoE:7 of
msgid "default=1, number of ranks in the expert parallel world or group."
msgstr ""
#: internlm.model.moe.MoE:9 of
msgid "default=1, top-k gating value, only supports k=1 or k=2."
msgstr ""
#: internlm.model.moe.MoE:11 of
msgid "default=1.0, the capacity of the expert at training time."
msgstr ""
#: internlm.model.moe.MoE:13 of
msgid "default=1.0, the capacity of the expert at eval time."
msgstr ""
#: internlm.model.moe.MoE:15 of
msgid ""
"default=4, the minimum capacity per expert regardless of the "
"capacity_factor."
msgstr ""
#: internlm.model.moe.MoE:17 of
msgid ""
"default=None, noisy gate policy, valid options are 'Jitter', 'RSample' or"
" 'None'."
msgstr ""
#: internlm.model.moe.MoE:20 of
msgid "default=True, whether to use the default MoE layer."
msgstr ""
#: internlm.model.moe.MoE:22 of
msgid ""
"default=True, whether to drop tokens - (setting to False is equivalent to"
" infinite capacity)."
msgstr ""
#: internlm.model.moe.MoE:25 of
msgid "default=True, whether to use Random Token Selection."
msgstr ""
#: internlm.model.moe.MoE:27 of
msgid ""
"default=False, make this MoE layer a Residual MoE "
"(https://arxiv.org/abs/2201.05596) layer."
msgstr ""
#: internlm.model.moe.MoE:30 of
msgid "default=None, the torch module that defines the residual MLP."
msgstr ""
#: ../../source/moe.rst:64
msgid "注意InternLM支持用户定义自己的MoE结构。internlm.model.moe.MoE是定义MoE网络的接口目前使用SwiGLU网络实现了专家模型并支持top1gating和top2gating两种门控策略。用户可以在MoE接口中对专家网络和门控策略进行扩展。"
msgstr ""
"Note: InternLM supports users to define their own MoE structure. "
"internlm.model.moe.MoE is the interface that defines the MoE network. "
"Currently, the SwiGLU network is used to implement the experts and "
"supports two gating strategies: top1gating and top2gating. Users can "
"extend the expert network and gating strategy in the MoE interface as needed."

View File

@ -39,7 +39,7 @@ CheckpointManager
load_ckpt_folder=dict(path="local:/mnt/mfs/ckpt", content=["all",], ckpt_type="internlm"),
auto_resume=False, # disable auto-resume, internlm will load model checkpoint from the path of 'load_ckpt_folder'.
checkpoint_every=CHECKPOINT_EVERY,
async_upload=True, # async ckpt upload. (only work for boto3 ckpt)
async_upload=True, # async ckpt upload. (only work for boto3 and volc ckpt)
async_upload_tmp_folder="/dev/shm/internlm_tmp_ckpt/", # path for temporary files during asynchronous upload.
oss_snapshot_freq=int(CHECKPOINT_EVERY / 2), # snapshot ckpt save frequency.
)
@ -67,7 +67,9 @@ InternLM对config中出现的所有存储路径都遵循以下的路径格式约
1. 如果需要使用boto3的路径需要在运行前提前导入 ``S3_ACCESS_KEY_ID````S3_SECRET_ACCESS_KEY_ID`` 这两个环境变量。
2. bucket的endpoint一般分为Inside IP和Outside IP如果可以尽量使用inside IP会获得更佳的存储速度。
2. 如果需要使用volc的路径需要在运行前提前导入 ``VOLC_ACCESS_KEY_ID````VOLC_SECRET_ACCESS_KEY_ID`` 这两个环境变量。
3. bucket的endpoint一般分为Inside IP和Outside IP如果可以尽量使用inside IP会获得更佳的存储速度。
@ -114,7 +116,7 @@ config.ckpt 中相关的参数:
- ``async_upload_tmp_folder``: 异步上传临时存储路径。参数类型 ``str/None``, 默认值为 ``/dev/shm/{JOB_NAME}_tmp_ckpt/``
需要注意的是异步上传功能仅在backend为boto3时才会有效果backend为local时只支持同步存储。
需要注意的是异步上传功能仅在backend为boto3或volc时才会有效果backend为local时只支持同步存储。
``async_upload_tmp_folder`` 设置的原则为尽量设置为计算节点的local目录这样才可以获得最佳的异步上传速度一般来说建议为 ``/dev/shm````/nvme`` 下的路径如果使用同步上传则该路径可不给。
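
To make the conventions above concrete, a minimal sketch (credentials are placeholders; the boto3/volc example paths are taken from the storage manager docstring later in this commit):

import os

# boto3 paths require S3_ACCESS_KEY_ID / S3_SECRET_ACCESS_KEY_ID;
# volc paths require VOLC_ACCESS_KEY_ID / VOLC_SECRET_ACCESS_KEY_ID (values here are placeholders).
os.environ["VOLC_ACCESS_KEY_ID"] = "<access-key>"
os.environ["VOLC_SECRET_ACCESS_KEY_ID"] = "<secret-key>"

# The prefix before ":" selects the storage backend.
SAVE_CKPT_FOLDER = "local:/mnt/mfs/ckpt"                   # local filesystem, synchronous save only
# SAVE_CKPT_FOLDER = "boto3:s3://model_weights/0331/120bi" # boto3 backend, async upload supported
# SAVE_CKPT_FOLDER = "volc:vc://model_weights/0331/120bi"  # volc backend, async upload supported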

View File

@ -9,6 +9,8 @@
import os
import sys
import torch # noqa # pylint: disable=unused-import
project = "InternLM"
copyright = "2023, InternLM Team"
author = "InternLM Team"
@ -94,6 +96,10 @@ autodoc_mock_imports = [
"apex",
"torch",
"numpy",
"flash_attn",
"rotary_emb",
"einops",
"torch_scatter",
]
# support multi-language docs

View File

@ -55,6 +55,14 @@ InternLM
mixed_precision
混合专家模型
-------------------
.. toctree::
:maxdepth: 2
moe
模型备份
--------------------

View File

@ -1,5 +1,5 @@
混合精度
-----------------
============
混合精度是指在模型训练的过程中同时使用16位和32位浮点数类型是一种在最小化精度损失的前提下加速模型训练的方法。
混合精度通过让模型的某些部分使用32位浮点数以保持数值稳定性并在其余部分利用半精度浮点数加速训练并可以减少内存使用在评估指标如准确率方面仍可以获得同等的训练效果。
@ -22,10 +22,10 @@ InternLM默认将模型转换为16位浮点数类型进行训练在配置文
super().__init__()
self.linear1 = nn.Linear(4, 1, bias=False)
self.linear2 = nn.Linear(1, 4, bias=False)
# set model.linear2 as fp32 module
set_fp32_attr_to_module(model.linear2)
model = MlpModel()
# set model.linear2 as fp32 module
set_fp32_attr_to_module(model.linear2)
# apply mixed precision
model = NaiveAMPModel(
@ -78,4 +78,3 @@ InternLM支持使用TF32训练模型允许用户在config文件中将 ``dtype
torch.backends.cudnn.allow_tf32 = True
torch.backends.cuda.matmul.allow_tf32 = True

View File

@ -0,0 +1,65 @@
混合专家模型
==============
混合专家模型Mixture-of-Experts, MoE是一种特殊的模型结构。
混合专家模型将模型拆分为一系列称为“专家”的子模型,每个“专家” 具有唯一的权重。
混合专家模型可以针对每个输入标记仅激活一个或少量的专家参与运算。
例如,图 :ref:`switch_transformer``Switch Transformer <https://arxiv.org/pdf/2101.03961.pdf>`_ 提出的稀疏混合专家模型结构其中的前向神经网络FFN被分解为多个子网络在计算时仅有少部分的模型参数参与计算以实现更有效的计算和资源分配。
稀疏混合专家模型通常还包含一个门控gating机制例如图 :ref:`switch_transformer` 中的Router网络。门控网络负责选择激活哪些专家参与计算并组合不同专家的预测结果。
.. _switch_transformer:
.. figure:: ../../imgs/switch_transformer.png
:scale: 40%
:class: with-border
:align: center
switch transformer
参数配置
----------------
如果在启动训练时要使用混合专家模型,可进行如下相关配置:
1. 模型相关配置
.. code-block:: python
model = dict(
num_experts=16,
moe_gate_k=1,
)
* num_experts专家网络个数。在InternLM中每个专家有着相同的网络结构但维护着不同的训练参数。
* moe_gate_k门控策略。决定如何将输入标记路由到不同的专家进行计算。目前InternLM支持top1gating和top2gating两种门控策略。关于这些门控策略的详细的信息可以参考 `GShard <https://arxiv.org/pdf/2006.16668.pdf>`_
注意在目前的InternLM中每个专家都是根据配置文件中HIDDEN_SIZE和MLP_RATIO构造的一个 `SwiGLU网络 <https://arxiv.org/pdf/2002.05202.pdf>`_,同时支持张量并行。用户可以根据需要构造自己的专家网络。
2. 损失相关配置
.. code-block:: python
loss = dict(
moe_loss_coeff=0.1,
)
在top1gating和top2gating门控策略中不同的专家处理的标记数量存在差异。为了提高模型效果应尽量保证输入标记被均匀地路由到不同的专家上。InternLM采用 `GShard <https://arxiv.org/pdf/2006.16668.pdf>`_ 提出的负载平衡损失优化门控策略。
Moe_loss_coeff项决定着负载平衡损失项将如何添加到最终的损失项中 :math:`l=l_{nll}+k·l_{moe}` )。关于该部分的详细信息可以进一步参考 `GShard <https://arxiv.org/pdf/2006.16668.pdf>`_
注意:这些参数需要和其他参数一起使用,具体请参考 :doc:`/usage` “训练配置”相关章节的内容。
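
For illustration, a minimal sketch of how moe_loss_coeff enters the total loss as described above; nll_loss and moe_loss are hypothetical tensors standing in for the language-model loss and the aggregated gating balance loss:

import torch

moe_loss_coeff = 0.1            # the k in l = l_nll + k * l_moe
nll_loss = torch.tensor(2.30)   # language-model (cross-entropy) loss, toy value
moe_loss = torch.tensor(0.05)   # load-balancing loss from the gating network, toy value

total_loss = nll_loss + moe_loss_coeff * moe_loss
print(total_loss)  # tensor(2.3050)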
模型训练
----------------
internlm.model.modeling_moe提供了一个标准的混合专家模型的实现该模型的网络结构和图 :ref:`switch_transformer` 一致其中使用到internlm.model.moe.MoE实现MoE网络。用户在配置文件中指定模型类型
.. code-block:: python
model_type = "INTERNLM_MoE"
并配置好稀疏专家网络的相关参数后就可以像正常启动InternLM一样进行混合专家模型的分布式训练具体请参考 :doc:`/usage` “启动训练”相关章节的内容。
.. autoclass:: internlm.model.moe.MoE
注意InternLM支持用户定义自己的MoE结构。internlm.model.moe.MoE是定义MoE网络的接口目前使用SwiGLU网络实现了专家模型并支持top1gating和top2gating两种门控策略。用户可以在MoE接口中对专家网络和门控策略进行扩展。

Binary image file changed (not shown): 153 KiB before, 212 KiB after.

Binary image file added (not shown): 59 KiB.

View File

@ -1,4 +1,5 @@
from .parallel_context import (
IS_SEQUENCE_PARALLEL,
IS_TENSOR_PARALLEL,
Config,
ParallelContext,
@ -29,6 +30,7 @@ from .random import (
__all__ = [
"Config",
"IS_TENSOR_PARALLEL",
"IS_SEQUENCE_PARALLEL",
"global_context",
"ParallelContext",
"ParallelMode",

View File

@ -25,6 +25,7 @@ from .process_group_initializer import ParallelMode
from .random import add_seed, get_seeds, set_mode
IS_TENSOR_PARALLEL = "is_tensor_parallel"
IS_SEQUENCE_PARALLEL = "is_sequence_parallel"
logger = get_logger(__file__)
@ -329,7 +330,8 @@ class ParallelContext(metaclass=SingletonMeta):
return self.is_last_rank(ParallelMode.PIPELINE)
def is_no_pp_or_last_stage(self):
return not self.is_initialized(ParallelMode.PIPELINE) or self.is_pipeline_last_stage()
# NOTICE!!! this will ignore the virtual stage
return not self.is_initialized(ParallelMode.PIPELINE) or self.is_last_rank(ParallelMode.PIPELINE)
def get_world_size(self, parallel_mode: ParallelMode):
"""Returns the world size for `parallel_mode`.

View File

@ -977,11 +977,12 @@ class InterleavedPipelineScheduler(PipelineScheduler):
if gpc.is_pipeline_last_stage():
output_obj = None
assert output_obj is None or output_obj.dtype == self.dtype
# Send and receive tensors as appropriate (send tensors computed
# in this iteration; receive tensors for next iteration).
if k != (num_warmup_microsteps - 1) or not receive_extra_backward:
# Normal warm-up communication process, or no need to prepare backward input for the 1F1B stage
assert output_obj.dtype == self.dtype
input_obj = comm.send_forward_recv_forward(
output_obj,
input_shape,
@ -993,7 +994,6 @@ class InterleavedPipelineScheduler(PipelineScheduler):
if self._communication_overlap:
# In this case, we should handle forward and backward communication separately, consistent with the
# overlap version of the 1F1B stage
assert output_obj.dtype == self.dtype
input_obj = comm.send_forward_recv_forward(
output_obj,
input_shape,
@ -1010,7 +1010,6 @@ class InterleavedPipelineScheduler(PipelineScheduler):
else:
# In this case, we should handle forward and backward communication together, consistent with the
# non-overlap version of the 1F1B stage
assert output_obj.dtype == self.dtype
input_obj, output_obj_grad = comm.send_forward_backward_recv_forward_backward(
output_obj,
None, # no backward grad to send
@ -1082,6 +1081,7 @@ class InterleavedPipelineScheduler(PipelineScheduler):
else:
input_obj_shape = self._input_obj_shapes[next_forward_chunk_id]
assert output_obj is None or output_obj.dtype == self.dtype
forward_async_communicator = comm.AsynCommunicator(
output_obj,
input_obj_shape,
@ -1203,7 +1203,7 @@ class InterleavedPipelineScheduler(PipelineScheduler):
output_shape = self._output_obj_shapes[next_backward_chunk_id] if recv_next else None
# Communicate objs.
assert output_obj.dtype == self.dtype
assert output_obj is None or output_obj.dtype == self.dtype
input_obj, output_obj_grad = comm.send_forward_backward_recv_forward_backward(
output_obj,
input_obj_grad,

View File

@ -365,7 +365,7 @@ def args_sanity_check():
assert (
not optim_ckpt.overlap_sync_grad & optim_ckpt.overlap_sync_param
), "not support overlap and moe at the same time"
assert gpc.config.parallel.zero1 == -1, "moe only support zero1, set zero1=-1 can fix this"
assert gpc.config.parallel.zero1.size == -1, "moe only supports zero1; set zero1=dict(size=-1, ...) to fix this"
def launch(

View File

@ -9,7 +9,7 @@ from flash_attn.modules.embedding import ParallelGPT2Embeddings
from flash_attn.modules.mlp import ParallelFusedMLP
from torch import nn
from internlm.core.context import IS_TENSOR_PARALLEL, ParallelMode
from internlm.core.context import IS_SEQUENCE_PARALLEL, IS_TENSOR_PARALLEL, ParallelMode
from internlm.core.context.parallel_context import global_context as gpc
from internlm.initialize.initialize_tensor import normal_, scaled_init_method_normal
from internlm.model.embedding import Embedding1D
@ -142,6 +142,12 @@ class PackedFlashBaseLayer1D(nn.Module):
for _, param in self.mlp.named_parameters():
if gpc.get_world_size(ParallelMode.TENSOR) > 1:
setattr(param, IS_TENSOR_PARALLEL, True)
for param in self.norm1.parameters():
if gpc.config.parallel.sequence_parallel is True:
setattr(param, IS_SEQUENCE_PARALLEL, True)
for param in self.norm2.parameters():
if gpc.config.parallel.sequence_parallel is True:
setattr(param, IS_SEQUENCE_PARALLEL, True)
self.dropout2 = nn.Dropout(drop_rate)
self.use_swiglu = use_swiglu
@ -374,6 +380,10 @@ class PackedFlashInternLm1D(nn.Module):
normal_(std=0.0052)(param)
if gpc.get_world_size(ParallelMode.TENSOR) > 1:
setattr(param, IS_TENSOR_PARALLEL, True)
for param in self.norm.parameters():
if gpc.config.parallel.sequence_parallel is True:
setattr(param, IS_SEQUENCE_PARALLEL, True)
self.parallel_output = parallel_output
def forward(self, hidden_states=None, cu_seqlens=None, input_ids=None, indexes=None, inference_params=None):

View File

@ -18,7 +18,6 @@ class MoE(torch.nn.Module):
Arguments:
hidden_size (int): the hidden dimension of the model, importantly this is also the input and output dimension.
expert (torch.nn.Module): the torch module that defines the expert (e.g., MLP, torch.linear).
num_experts (int, optional): default=1, the total number of experts per layer.
ep_size (int, optional): default=1, number of ranks in the expert parallel world or group.
k (int, optional): default=1, top-k gating value, only supports k=1 or k=2.
@ -26,10 +25,10 @@ class MoE(torch.nn.Module):
eval_capacity_factor (float, optional): default=1.0, the capacity of the expert at eval time.
min_capacity (int, optional): default=4, the minimum capacity per expert regardless of the capacity_factor.
noisy_gate_policy (str, optional): default=None, noisy gate policy, valid options are 'Jitter', 'RSample'
or 'None'.
or 'None'.
using_default_moe (bool, optional): default=True, whether to use the default MoE layer.
drop_tokens (bool, optional): default=True, whether to drop tokens - (setting to False is equivalent to
infinite capacity).
infinite capacity).
use_rts (bool, optional): default=True, whether to use Random Token Selection.
moe_use_residual (bool, optional): default=False, make this MoE layer a Residual MoE
(https://arxiv.org/abs/2201.05596) layer.
@ -73,7 +72,6 @@ class MoE(torch.nn.Module):
gpc.expert_parallel_group_names.append(expert_group_name)
experts = torch.nn.ModuleList(
[
# TODO have trouble when use internlm.model.linear.FeedForward
FeedForward(
hidden_size,
int(hidden_size * gpc.config.model.mlp_ratio),

View File

@ -9,7 +9,7 @@ from internlm.core.context import global_context as gpc
from internlm.monitor.alert import send_feishu_msg_with_webhook
from internlm.utils.common import SingletonMeta
from .utils import get_job_key, set_env_var
from .utils import get_job_key, set_env_var, try_import_send_exception
def send_alert_message(address: str = None, title: str = None, message: str = None):
@ -132,6 +132,7 @@ class MonitorManager(metaclass=SingletonMeta):
self.monitor_thread = None
self.loss_spike_limit = loss_spike_limit
self.last_step_loss = -1
self.send_exception = try_import_send_exception()
def monitor_loss_spike(self, alert_address: str = None, step_count: int = 0, cur_step_loss: float = 0.0):
"""Check loss value, if loss spike occurs, send alert message to Feishu."""
@ -154,6 +155,8 @@ class MonitorManager(metaclass=SingletonMeta):
format_trace = ""
for line in filtered_trace:
format_trace += "\n" + line
if self.send_exception:
self.send_exception(format_trace, gpc.get_global_rank())
send_alert_message(
address=alert_address,
message=f"Catch Exception from {socket.gethostname()} with rank id {gpc.get_global_rank()}:{format_trace}",
@ -165,9 +168,12 @@ class MonitorManager(metaclass=SingletonMeta):
def sigterm_handler(sys_signal, frame):
print("receive frame: ", frame)
print("receive signal: ", sys_signal)
message = f"Process received signal {signal} and exited."
if self.send_exception:
self.send_exception(message, gpc.get_global_rank())
send_alert_message(
address=alert_address,
message=f"Process received signal {signal} and exited.",
message=message,
)
signal.signal(signal.SIGTERM, sigterm_handler)

View File

@ -32,3 +32,16 @@ def get_job_name():
def get_job_key():
return f"{get_job_id()}_{get_job_name()}"
def try_import_send_exception():
"""
Try to import send_exception from uniscale_monitoring; if the import fails, return None.
"""
try:
from uniscale_monitoring import send_exception_msg as send_exception
return send_exception
except ImportError:
return None
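
A short usage sketch of this helper, mirroring how MonitorManager calls it above (the message and rank below are placeholder values):

send_exception = try_import_send_exception()
if send_exception is not None:
    # signature follows the call sites above: (message, global_rank)
    send_exception("example traceback text", 0)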

View File

@ -79,6 +79,10 @@ class FSDPadaptOptimizer(BaseOptimizer):
def _compute_norm_with_fsdp_flatten(self, group_id):
params = [p for p in self._fp16_param_groups[group_id] if p.untyped_storage().size() != 0]
gradients = [p.grad for p in params if p.untyped_storage().size() != 0]
norm_group = 0
if len(params) <= 0 or len(gradients) <= 0:
return norm_group
norm_group = compute_norm(gradients=gradients, parameters=params, last_stage=True)
return norm_group
@ -126,6 +130,8 @@ class FSDPadaptOptimizer(BaseOptimizer):
# create gradient for fp32 params
for group_idx in range(len(self.param_groups)):
if len(self._fp32_param_tensor_groups[group_idx]) <= 0:
continue
dtype = self._fp32_param_tensor_groups[group_idx][0].dtype
fp16_params = [p for p in self._fp16_param_groups[group_idx] if p.untyped_storage().size() != 0]
grad_fp32 = [p.grad.to(dtype) for p in fp16_params]

View File

@ -9,7 +9,7 @@ import torch
import torch.distributed as dist
from torch.optim import Optimizer
from internlm.core.context import Config, ParallelMode
from internlm.core.context import IS_SEQUENCE_PARALLEL, Config, ParallelMode
from internlm.core.context import global_context as gpc
from internlm.monitor import send_alert_message
from internlm.solver.optimizer.store import (
@ -35,7 +35,7 @@ from internlm.utils.megatron_timers import megatron_timer as timer
from internlm.utils.timeout import llm_timeout
from .base_optimizer import BaseOptimizer
from .utils import compute_norm
from .utils import compute_layer_norm, compute_norm, compute_param_norm
inf = math.inf
logger = get_logger(__file__)
@ -309,6 +309,14 @@ class HybridZeroOptimizer(BaseOptimizer):
param=param,
reduce_rank=reduce_rank,
)
def reduction_sp_func():
handle = reduce_tensor(
param.grad,
dtype=None,
dst_rank=reduce_rank,
parallel_mode=ParallelMode.TENSOR,
)
handle.wait()
# define hook
# NOT IMPORTANT BUT GOOD TO KNOW:
@ -320,6 +328,18 @@ class HybridZeroOptimizer(BaseOptimizer):
if self.skip_grad_reduce is False:
reduction_func()
# define hook for sequence_parallel
def reduce_grad_hook_sp(*args): # pylint: disable=W0613
if self.skip_grad_reduce is False:
reduction_sp_func()
# if sequence_parallel is True,
# the grad of the norm layers should be all-reduced across the tp process group
if gpc.config.parallel.sequence_parallel is True:
if hasattr(param, IS_SEQUENCE_PARALLEL) and getattr(param, IS_SEQUENCE_PARALLEL) is True:
accum_grad_obj_sp = get_grad_accumulate_object(param)
accum_grad_obj_sp.register_hook(reduce_grad_hook_sp)
accum_grad_obj.register_hook(reduce_grad_hook)
_define_and_attach(param, reduce_rank)
@ -569,6 +589,29 @@ class HybridZeroOptimizer(BaseOptimizer):
return norm
def _compute_param_norm_stage(
self, group_id: int = 0, last_bucket: bool = False, last_stage: bool = False, previous_param_norms=None
):
# compute norm for gradients that have been reduced
params, grads = self._param_store.get_reduced_param_for_compute_norm(group_id=group_id, last_bucket=last_bucket)
total_param_norms = {}
if len(params) == 0:
dtype = self.param_groups[group_id]["dtype"]
grads = [self.padding_grad.to(dtype)]
params = [self.padding_tensor.to(dtype)]
if self._clip_grad_norm > 0:
total_param_norms = compute_param_norm(
grads,
params,
last_stage=last_stage,
previous_param_norms=previous_param_norms,
zero_mode=self._broadcast_parallel_mode[group_id],
is_moe_group=self._is_moe_group(self.optim.param_groups[group_id]),
)
return total_param_norms
@llm_timeout(func_name="optim_step")
def step(self, closure=None):
"""Performs a single optimization step.
@ -600,8 +643,11 @@ class HybridZeroOptimizer(BaseOptimizer):
# compute norm for gradients in the before bucket
groups_norms = []
groups_param_norms = []
for group_id in range(self.num_param_groups):
groups_norms.append(self._compute_norm_with_stage(group_id=group_id))
if gpc.config.get("grad_norm_profiling", False):
groups_param_norms.append(self._compute_param_norm_stage(group_id=group_id))
# clear reduced grads
# grads in the last bucket is reduced
@ -613,6 +659,8 @@ class HybridZeroOptimizer(BaseOptimizer):
self._param_store.clear_grads_of_previous_reduced_params()
# compute norm for gradients in the last bucket
total_norms = {}
total_param_norms = {}
total_layer_norms = {}
for group_id in range(self.num_param_groups):
group_name = self.param_groups[group_id]["name"] if "name" in self.param_groups[group_id] else "default"
group_name = f"{group_id}_{group_name}"
@ -622,6 +670,16 @@ class HybridZeroOptimizer(BaseOptimizer):
last_stage=True,
previous_norm=groups_norms[group_id],
)
if gpc.config.get("grad_norm_profiling", False):
param_norms = self._compute_param_norm_stage(
group_id=group_id,
last_bucket=True,
last_stage=True,
previous_param_norms=groups_param_norms[group_id],
)
total_layer_norms[group_name], total_param_norms[group_name] = compute_layer_norm(
param_norms=param_norms, loss_scale=self.loss_scale.item()
)
# Need to allreduce(avg) the norms across different ranks because moe params will not be synced
# during allreduce
@ -636,9 +694,12 @@ class HybridZeroOptimizer(BaseOptimizer):
self._sync_grad()
timer("sync_grad").stop()
res = self._step(closure=closure, norms=total_norms)
state, global_norms = self._step(closure=closure, norms=total_norms)
if gpc.config.get("grad_norm_profiling", False):
global_norms["layer_norms"] = total_layer_norms
global_norms["param_norms"] = total_param_norms
return res
return state, global_norms
def _step(self, closure=None, norms=None):
assert closure is None, "closure is not supported by step()"

View File

@ -15,7 +15,7 @@ from torch._utils import _flatten_dense_tensors, _unflatten_dense_tensors
from internlm.core.context import ParallelMode
from internlm.core.context import global_context as gpc
from internlm.core.naive_amp import NaiveAMPModel
from internlm.utils.common import get_tensor_norm, move_norm_to_cuda
from internlm.utils.common import get_current_device, get_tensor_norm, move_norm_to_cuda
from internlm.utils.logger import get_logger
from internlm.utils.parallel import is_model_parallel_parameter
@ -209,6 +209,49 @@ def calc_lp(grads, norm_type):
return norm
def reduce_grads(gradients, parameters, fine_grained=False):
parallel_grads = []
if fine_grained:
parallel_grads = {}
def append_grad(g, p):
if fine_grained:
param_name = p.param_name if hasattr(p, "param_name") else "unknown-padding"
if param_name not in parallel_grads:
parallel_grads[param_name] = []
parallel_grads[param_name].append(g.data.float())
else:
parallel_grads.append(g.data.float())
for g, p in zip(gradients, parameters):
# TODO: consider the pipeline shared parameter
if (
gpc.is_initialized(ParallelMode.PIPELINE)
and hasattr(p, "pipeline_shared_module_pg")
and dist.get_rank(p.pipeline_shared_module_pg) == 0
): # if shared between different pipe, only count once
append_grad(g, p)
elif (
gpc.is_initialized(ParallelMode.PIPELINE)
and hasattr(p, "pipeline_shared_module_pg")
and dist.get_rank(p.pipeline_shared_module_pg) != 0
):
continue
elif (
gpc.is_initialized(ParallelMode.TENSOR)
and not is_model_parallel_parameter(p)
and gpc.get_local_rank(ParallelMode.TENSOR) == 0
): # if not used in each chunk, such as layernorm
append_grad(g, p)
elif is_model_parallel_parameter(p):
append_grad(g, p)
elif gpc.get_local_rank(ParallelMode.TENSOR) != 0:
continue
else:
raise RuntimeError("Should not arrive here")
return parallel_grads
def compute_norm(
gradients, parameters, last_stage=False, previous_norm=None, norm_type=2, zero_mode=ParallelMode.ZERO1
):
@ -247,33 +290,7 @@ def compute_norm(
)
total_norm = total_norm_cuda[0].item()
else:
tensor_parallel_grads = []
for g, p in zip(gradients, parameters):
# TODO: consider the pipeline shared parameter
if (
gpc.is_initialized(ParallelMode.PIPELINE)
and hasattr(p, "pipeline_shared_module_pg")
and dist.get_rank(p.pipeline_shared_module_pg) == 0
): # if shared between different pipe, only count once
tensor_parallel_grads.append(g.data.float())
elif (
gpc.is_initialized(ParallelMode.PIPELINE)
and hasattr(p, "pipeline_shared_module_pg")
and dist.get_rank(p.pipeline_shared_module_pg) != 0
):
continue
elif (
gpc.is_initialized(ParallelMode.TENSOR)
and not is_model_parallel_parameter(p)
and gpc.get_local_rank(ParallelMode.TENSOR) == 0
): # if not used in each chunk, such as layernorm
tensor_parallel_grads.append(g.data.float())
elif is_model_parallel_parameter(p):
tensor_parallel_grads.append(g.data.float())
elif gpc.get_local_rank(ParallelMode.TENSOR) != 0:
continue
else:
raise RuntimeError("Should not arrive here")
tensor_parallel_grads = reduce_grads(gradients, parameters)
if norm_type == 2.0 and enable_cuda_kernels:
tensor_parallel_norm = calc_l2_norm(tensor_parallel_grads) ** norm_type
@ -319,6 +336,124 @@ def compute_norm(
return total_norm
def compute_param_norm(
gradients,
parameters,
last_stage=False,
previous_param_norms=None,
norm_type=2,
zero_mode=ParallelMode.ZERO1,
is_moe_group=False,
):
"""Get the norm of params
Arguments:
gradients (Iterable[Tensor]): The gradient value.
parameters (Iterable[Tensor]): The parameter each gradient corresponds to.
norm_type (float or int): type of the used p-norm. Can be ``'inf'`` for
infinity norm.
Returns:
The norm of the parameters.
"""
enable_cuda_kernels = gradients[0].device.type == "cuda"
# Norm parameters.
norm_type = float(norm_type)
total_param_norms = {}
param_grads = reduce_grads(gradients, parameters, fine_grained=True)
param_norms = {}
for param_name, grads in param_grads.items():
if norm_type == inf:
param_norm = max(g.data.abs().max() for g in grads)
elif norm_type == 2.0 and enable_cuda_kernels:
param_norm = calc_l2_norm(grads) ** norm_type
else:
param_norm = calc_lp(grads, norm_type)
param_norms[param_name] = param_norm.item() if torch.is_tensor(param_norm) else param_norm
if last_stage is False:
return param_norms
if previous_param_norms is not None:
for key, value in previous_param_norms.items():
if key not in param_norms:
param_norms[key] = value
continue
if norm_type == inf:
param_norms[key] = max(param_norms[key], value)
else:
param_norms[key] += value
# model parallel
model_parallel_param_norms = {}
if gpc.is_initialized(ParallelMode.MODEL):
parallel_param_norms = [None for _ in range(gpc.get_world_size(ParallelMode.MODEL))]
dist.all_gather_object(parallel_param_norms, param_norms, group=gpc.get_group(ParallelMode.MODEL))
for local_param_norm in parallel_param_norms:
for param_name, param_norm in local_param_norm.items():
if param_name not in model_parallel_param_norms:
model_parallel_param_norms[param_name] = 0.0
if norm_type == inf:
model_parallel_param_norms[param_name] = max(model_parallel_param_norms[param_name], param_norm)
else:
model_parallel_param_norms[param_name] += param_norm
# zero parallel
zero_param_norms = [None for _ in range(gpc.get_world_size(zero_mode))]
dist.all_gather_object(zero_param_norms, model_parallel_param_norms, group=gpc.get_group(zero_mode))
for local_param_norm in zero_param_norms:
for param_name, param_norm in local_param_norm.items():
if param_name not in total_param_norms:
total_param_norms[param_name] = 0.0
if norm_type == inf:
total_param_norms[param_name] = max(total_param_norms[param_name], param_norm)
else:
total_param_norms[param_name] += param_norm
# moe
if is_moe_group:
pg = gpc.get_group(ParallelMode.EXPERT)
scaled_param_norm = torch.cuda.FloatTensor(list(total_param_norms.values()), device=get_current_device())
scaled_param_norm = scaled_param_norm / float(gpc.get_world_size(ParallelMode.EXPERT))
dist.all_reduce(scaled_param_norm, group=pg)
for i, param_name in enumerate(total_param_norms.keys()):
total_param_norms[param_name] = scaled_param_norm[i].item()
# scale
for param_name, param_norm in total_param_norms.items():
if param_norm in (inf, -inf):
total_param_norms[param_name] = -1
elif math.isnan(param_norm):
total_param_norms[param_name] = -2
return total_param_norms
def compute_layer_norm(param_norms, loss_scale):
"""
Compute layer norms from parameter norms.
"""
param_norms_groupby_layer = {}
layer_norms = {}
for param_name, param_norm in param_norms.items():
layer_name, param_key = param_name.split("-")
if layer_name not in param_norms_groupby_layer:
param_norms_groupby_layer[layer_name] = {}
if layer_name not in layer_norms:
layer_norms[layer_name] = 0.0
if param_norm not in (-1, -2):
param_norm = param_norm**0.5 / loss_scale
param_norms_groupby_layer[layer_name][param_key] = param_norm
layer_norms[layer_name] += param_norm
return layer_norms, param_norms_groupby_layer
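
A small worked example of this aggregation (toy values; assumes the function is importable from the repository as shown):

from internlm.solver.optimizer.utils import compute_layer_norm

# Toy squared, loss-scaled parameter norms keyed as "<layer_name>-<param_name>".
param_norms = {"LinearBlock0-weight": 4.0, "LinearBlock0-bias": 1.0, "Linear-weight": 9.0}

layer_norms, per_layer = compute_layer_norm(param_norms=param_norms, loss_scale=2.0)
print(layer_norms)  # {'LinearBlock0': 1.5, 'Linear': 1.5}   sqrt(norm) / loss_scale, summed per layer
print(per_layer)    # {'LinearBlock0': {'weight': 1.0, 'bias': 0.5}, 'Linear': {'weight': 1.5}}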
class BaseGradScaler(ABC):
"""A base class for the gradient scaler.

View File

@ -1,6 +1,7 @@
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
import copy
import functools
import time
from functools import partial
@ -53,7 +54,11 @@ from internlm.train.utils import create_param_groups
from internlm.utils.common import DummyProfile
from internlm.utils.logger import get_logger
from internlm.utils.megatron_timers import megatron_timer as timer
from internlm.utils.parallel import sync_model_param, sync_model_param_within_tp
from internlm.utils.parallel import (
set_model_params_layer_name,
sync_model_param,
sync_model_param_within_tp,
)
from internlm.utils.registry import MODEL_INITIALIZER
from internlm.utils.timeout import llm_timeout
@ -159,6 +164,10 @@ def initialize_optimizer(model: Union[nn.Module, nn.ModuleList]):
Returns:
A tuple of (optimizer, beta2_scheduler, lr_scheduler).
"""
if gpc.config.get("grad_norm_profiling", False):
# set the layer name as an attribute of the model parameters
set_model_params_layer_name(model)
if gpc.config.hybrid_zero_optimizer.overlap_sync_param:
param_bcast_sync_handler = ParamBcastSyncHandler(model)
else:
@ -528,6 +537,21 @@ def record_current_batch_training_metrics(
for key, value in acc_perplex.items():
infos[key] = value
if gpc.config.get("grad_norm_profiling", False):
layer_norms = copy.deepcopy(grad_norm["layer_norms"])
param_norms = copy.deepcopy(grad_norm["param_norms"])
for group_name, value in layer_norms.items():
if value:
title = f"laye_norm_group_{group_name}"
writer.add_scalars(key=title, value=value, step=train_state.step_count)
for group_name, layer_group in param_norms.items():
if layer_group:
for layer_name, param_group in layer_group.items():
title = f"param_norm_{layer_name}_{group_name}"
writer.add_scalars(key=title, value=param_group, step=train_state.step_count)
del grad_norm["layer_norms"]
del grad_norm["param_norms"]
line = ""
for key, value in infos.items():
line += f"{key}={value} "
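
These hooks suggest the feature is switched on by a top-level flag in the training config; a hedged sketch (flag name taken from the gpc.config.get calls above):

# Add to the training config to log per-layer and per-parameter gradient norms to TensorBoard.
grad_norm_profiling = True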

View File

@ -556,6 +556,18 @@ def load_optimizer_checkpoint(folder, optim):
f"Please check whether loading ckpts are saved with the HybridZeroOptimizer."
)
# compatible with old code that only had one param group; need to align with both parameter groups
if len(states["base_optim_states"]["param_groups"]) == 1:
for group in optim.param_groups:
# for a newly added empty group, since it has no params, just append it as a placeholder
if len(group["params"]) == 0:
states["base_optim_states"]["param_groups"].append(group)
# for the original group, create the attributes newly added in recent updates
else:
saved_group = states["base_optim_states"]["param_groups"][0]
saved_group["dp_mode"] = group["dp_mode"]
saved_group["dtype"] = group["dtype"]
optim.load_state_dict(states)
del states
torch.cuda.empty_cache()
@ -598,6 +610,10 @@ def load_scheduler(ckpt_path: str, lr_scheduler, optimizer, train_state: TrainSt
lr_scheduler.load_state_dict(scheduler_states)
lr_scheduler.last_epoch = train_state.step_count + 1
# compatible with old code that only had one param group
if len(base_lrs) == 1:
base_lrs = base_lrs * len(optimizer.param_groups)
ratios = [learning_rate / lr for lr in base_lrs]
for idx, param_group in enumerate(optimizer.param_groups):
param_group["lr"] = param_group["lr"] * ratios[idx]

View File

@ -2,9 +2,11 @@
# -*- encoding: utf-8 -*-
import torch.distributed as dist
from torch import nn
from internlm.core.context import IS_TENSOR_PARALLEL, ParallelMode
from internlm.core.context import global_context as gpc
from internlm.core.naive_amp import NaiveAMPModel
def is_model_parallel_parameter(p):
@ -61,3 +63,31 @@ def get_parallel_log_file_name():
f"tp={gpc.get_local_rank(ParallelMode.TENSOR)}_pp={gpc.get_local_rank(ParallelMode.PIPELINE)}"
)
return log_file_name
def set_model_params_layer_name(model):
r"""Set the layer name as an attribute of the model parameters.
Args:
model (:class:`torch.nn.Module`): A PyTorch model whose parameters will be tagged with layer names.
"""
if not isinstance(model, nn.ModuleList):
model = [model]
for _chunk in model:
if isinstance(_chunk, NaiveAMPModel):
_chunk = _chunk.model
# Create a unique layer name based on the block's class name and index
for _, children in _chunk.named_children():
if isinstance(children, nn.ModuleList):
for idx, block in enumerate(children):
for param_name, param in block.named_parameters():
layer_name = f"{block.__class__.__name__}Block{idx}"
layer_param_name = f"{layer_name}-{param_name}"
param.__setattr__("layer_name", layer_name)
param.__setattr__("param_name", layer_param_name)
else:
for param_name, param in children.named_parameters():
layer_name = f"{children.__class__.__name__}"
layer_param_name = f"{layer_name}-{param_name}"
param.__setattr__("layer_name", layer_name)
param.__setattr__("param_name", f"{layer_name}-{param_name}")

View File

@ -424,7 +424,9 @@ class SimpleMemoryProfiler:
layer_name, output.element_size() * output.nelement(), flush=False
)
def _activation_trace_hook_forward(self, chunk_id: int, model: Any, inputs: Any, output: torch.Tensor) -> None:
def _activation_trace_hook_forward(
self, chunk_id: int, model: Any, inputs: Any, output: Any # pylint: disable=W0613
) -> None:
"""
Hook function to trace the activation memory usage for a forward pass.
@ -437,7 +439,6 @@ class SimpleMemoryProfiler:
None
"""
del model, inputs
assert isinstance(output, torch.Tensor), f"invalid output type: {type(output)}"
if self._stoped:
return

View File

@ -25,6 +25,7 @@ from internlm.utils.logger import get_logger
try:
import boto3
import botocore
import tos
except ImportError:
pass
@ -32,6 +33,7 @@ except ImportError:
logger = get_logger(__file__)
boto3_url_re = re.compile(r"([^\.]+)\.([\d\.]+)")
volc_url_re = re.compile(r"^(.*?)\.(.*)$")
MB = 1024**2
@ -122,6 +124,47 @@ local_nvme_path: {self.local_nvme_path}"
return meta.client, meta.bucket_name, meta.file_path
class VolcMetaInfo:
"""Volc meta info for save/load etc."""
def __init__(
self,
is_async,
handler: StorageClient,
bucket_name: str,
endpoint: str,
region: str,
file_path: str,
async_upload_fn: callable,
local_nvme_path=None,
) -> None:
# all need info.
self.client = handler
self.bucket_name = bucket_name
self.file_path = file_path
# only save need info.
self.local_nvme_path = local_nvme_path
self.is_async = is_async
self.endpoint = endpoint
self.region = region
self.async_upload_fn = async_upload_fn
def __str__(self) -> str:
return f"is_async: {self.is_async}, bucket_name:{self.bucket_name}, endpoint:{self.endpoint}, \
region:{self.region}, local_nvme_path: {self.local_nvme_path}"
@staticmethod
def unpack_volc_save_meta(meta):
if meta.is_async:
return meta.client, meta.bucket_name, meta.file_path, meta.local_nvme_path
else:
return meta.client, meta.bucket_name, meta.file_path
@staticmethod
def unpack_volc_nosave_meta(meta):
return meta.client, meta.bucket_name, meta.file_path
class LocalMetaInfo:
"""Local meta info for save/load etc."""
@ -139,18 +182,22 @@ class LocalMetaInfo:
return (meta.file_path,)
def unpack_save_meta(meta: Union[Boto3MetaInfo, LocalMetaInfo]):
def unpack_save_meta(meta: Union[Boto3MetaInfo, VolcMetaInfo, LocalMetaInfo]):
if isinstance(meta, Boto3MetaInfo):
return Boto3MetaInfo.unpack_boto3_save_meta(meta)
elif isinstance(meta, VolcMetaInfo):
return VolcMetaInfo.unpack_volc_save_meta(meta)
elif isinstance(meta, LocalMetaInfo):
return LocalMetaInfo.unpack_local_save_meta(meta)
else:
raise ValueError(f"unkonwn meta info: {type(meta)}")
def unpack_nosave_meta(meta: Union[Boto3MetaInfo, LocalMetaInfo]):
def unpack_nosave_meta(meta: Union[Boto3MetaInfo, VolcMetaInfo, LocalMetaInfo]):
if isinstance(meta, Boto3MetaInfo):
return Boto3MetaInfo.unpack_boto3_nosave_meta(meta)
elif isinstance(meta, VolcMetaInfo):
return VolcMetaInfo.unpack_volc_nosave_meta(meta)
elif isinstance(meta, LocalMetaInfo):
return LocalMetaInfo.unpack_local_nosave_meta(meta)
else:
@ -170,6 +217,10 @@ def try_get_storage_backend(path: str):
if gpc.is_rank_for_log():
logger.warning(f"path: '{path}' not start with backend prefix, guess it is the backend of boto3.")
return "boto3", path
elif path.startswith("vc:"):
if gpc.is_rank_for_log():
logger.warning(f"path: '{path}' not start with backend prefix, guess it is the backend of volc.")
return "volc", path
else:
sre = path.split(":", maxsplit=1)
if len(sre) == 1:
@ -312,6 +363,143 @@ class Boto3Client(StorageClient):
raise NotImplementedError("boto3 not support delete_obj")
class VolcClient(StorageClient):
"""
VolcClient
"""
def __init__(
self,
endpoint: str,
region: str,
) -> None:
"""Volc object/file storage management class
Args:
endpoint (str): Volc tos endpoint.
region (str): Volc tos region.

Note: credentials are read from the 'VOLC_ACCESS_KEY_ID' and 'VOLC_SECRET_ACCESS_KEY_ID' environment variables.
"""
super().__init__(tos)
try:
access_key = os.environ["VOLC_ACCESS_KEY_ID"]
secret_key = os.environ["VOLC_SECRET_ACCESS_KEY_ID"]
except KeyError as exc:
raise RuntimeError(
"Please set 'VOLC_ACCESS_KEY_ID' and 'VOLC_SECRET_ACCESS_KEY_ID'",
"using environment variable!",
) from exc
self.client = self.handler.TosClientV2(access_key, secret_key, endpoint, region)
@staticmethod
def sync_upload_fileobj(handler, bucket_name: str, fp: str, saved_obj=None, **kwargs):
assert saved_obj is not None, "saved_obj is None!"
try:
with io.BytesIO() as f:
torch.save(saved_obj, f, **kwargs)
f.seek(0)
handler.client.put_object(bucket_name, fp, content=f)
except handler.handler.exceptions.TosClientError as exc:
raise RuntimeError(
f"Volc Network Error: fail with client error, message:{exc.message}, cause: {exc.cause}"
) from exc
except handler.handler.exceptions.TosServerError as exc:
raise RuntimeError(
f"Volc Network Error: fail with server error, code: {exec.code}",
f"error with request id: {exec.request_id}",
f"error with message: {exec.message}",
f"error with http code: {exec.status_code}",
) from exc
@staticmethod
def load(handler, bucket_name: str, fp: str, **kwargs) -> Dict:
"""
Args:
fp (str): Path to load, e.g. vc://opennlplab/model_weights/xxx/ddd.pt
"""
try:
object_stream = handler.client.get_object(bucket_name, fp)
buffer = io.BytesIO(object_stream.read())
states = torch.load(buffer, **kwargs)
except handler.handler.exceptions.TosClientError as exc:
raise RuntimeError(
f"Volc Network Error: fail with client error, message:{exc.message}, cause: {exc.cause}"
) from exc
except handler.handler.exceptions.TosServerError as exc:
raise RuntimeError(
f"Volc Network Error: fail with server error, code: {exec.code}",
f"error with request id: {exec.request_id}",
f"error with message: {exec.message}",
f"error with http code: {exec.status_code}",
) from exc
return states
@staticmethod
def assert_fp_exists(handler, bucket_name: str, fp: str): # pylint: disable=W0613
assert len(list(handler.client.list_objects_type2(bucket_name, prefix=fp).contents)) > 0, fp
@staticmethod
def is_fp_exists(handler, bucket_name: str, fp: str): # pylint: disable=W0613
re = handler.client.list_objects_type2(bucket_name, prefix=fp)
if hasattr(re, "contents"):
return len(list(re.contents)) > 0
else:
return False
@staticmethod
def get_fns(handler, bucket_name: str, fp: str):
if VolcClient.is_fp_exists(handler, bucket_name, fp):
folder_name_list = []
result = handler.client.list_objects_type2(bucket_name, prefix=fp)
if hasattr(result, "contents"):
for iterm in result.contents:
pth = iterm.key
folder_name_list.append(pth.split(fp, maxsplit=1)[1].strip("/").split("/", maxsplit=1)[0])
while result.is_truncated:
result = handler.client.list_objects_type2(
bucket_name, prefix=fp, continuation_token=result.next_continuation_token
)
if hasattr(result, "contents"):
for iterm in result.contents:
pth = iterm.key
folder_name_list.append(pth.split(fp, maxsplit=1)[1].strip("/").split("/", maxsplit=1)[0])
return list(set(folder_name_list))
else:
if gpc.is_rank_for_log():
logger.warning(f"'{fp}' not found!")
return None
@staticmethod
def async_upload_fileobj(handler, bucket_name: str, fp: str, local_nvme_path: str):
try:
handler.client.put_object_from_file(bucket_name, fp, local_nvme_path)
except handler.handler.exceptions.TosClientError as exc:
raise RuntimeError(
f"Volc Network Error: fail with client error, message:{exc.message}, cause: {exc.cause}"
) from exc
except handler.handler.exceptions.TosServerError as exc:
raise RuntimeError(
f"Volc Network Error: fail with server error, code: {exec.code}",
f"error with request id: {exec.request_id}",
f"error with message: {exec.message}",
f"error with http code: {exec.status_code}",
) from exc
except Exception as e:
raise e
@staticmethod
def delete_obj(handler, fp: str):
raise NotImplementedError("volc not support delete_obj")
class LocalClient(StorageClient):
"""
Storage Client for local NFS.
@ -388,8 +576,35 @@ def get_boto3_meta(fp: str, tmp_local_folder: str, is_async: bool) -> Boto3MetaI
)
def get_volc_meta(fp: str, tmp_local_folder: str, is_async: bool) -> VolcMetaInfo:
assert fp.startswith("vc://"), f"Path '{fp}' is not a volc url"
parts = fp[len("vc://"):].split(os.path.sep)  # avoid str.lstrip here: it strips a character set, not a prefix
match = volc_url_re.match(parts[0])
assert match is not None, f"url '{fp}' is not a valid volc url"
bucket_name, endpoint = match.group(1), match.group(2)
temp_part = endpoint.split(".")
endpoint = ".".join(temp_part[1:])
region = temp_part[1].split("-")
region = "-".join(region[1:])
if is_async:
tmp_step_file = get_tmp_file_name(tmp_local_folder, fp)
else:
tmp_step_file = None
return VolcMetaInfo(
is_async=is_async,
handler=None,
bucket_name=bucket_name,
endpoint=endpoint,
region=region,
file_path=os.path.sep.join(parts[1:]),
async_upload_fn=VolcClient.async_upload_fileobj,
local_nvme_path=tmp_step_file,
)
def get_local_meta(fp: str) -> LocalMetaInfo:
assert not fp.startswith("s3://"), f"Path '{fp}' is not a local path"
assert not fp.startswith("s3://") and not fp.startswith("vc://"), f"Path '{fp}' is not a local path"
return LocalMetaInfo(fp)
@ -430,10 +645,11 @@ class StorageManager(metaclass=SingletonMeta):
TODO: add a thread to poll the asynchronous storage state.
"""
BACKEND_TYPE = {"boto3", "local"}
BACKEND_TYPE = {"boto3", "local", "volc"}
BACKEND_INIT_METHOD = {
"boto3": Boto3Client,
"local": LocalClient,
"volc": VolcClient,
}
CLI_DICT = {}
@ -476,11 +692,12 @@ class StorageManager(metaclass=SingletonMeta):
logger.error(f'tmp_local_folder only has "{free_size}" GB free space, less than 100 GB!')
raise RuntimeError(f"Insufficient temporary storage space on {socket.gethostname()}")
def _get_client(self, path: str, async_mode: bool = False) -> Union[Boto3MetaInfo, LocalMetaInfo]:
def _get_client(self, path: str, async_mode: bool = False) -> Union[Boto3MetaInfo, VolcMetaInfo, LocalMetaInfo]:
"""
example:
local:/path/to/checkpoint
boto3:s3://model_weights/0331/120bi
volc:vc://model_weights/0331/120bi
Args:
path (str): _description_
@ -507,10 +724,29 @@ class StorageManager(metaclass=SingletonMeta):
the proxy may make boto3 unavailable or affect performance."
)
self.has_warning = True
elif backend == "volc":
meta_info = get_volc_meta(path, self.tmp_local_folder, async_mode)
backend_key = backend + ":" + meta_info.endpoint
init_args = (
meta_info.endpoint,
meta_info.region,
)
if (
"http_proxy" in os.environ
or "https_proxy" in os.environ
or "HTTP_PROXY" in os.environ
or "HTTPS_PROXY" in os.environ
):
if not self.has_warning and gpc.is_rank_for_log():
logger.warning(
"HTTP/HTTPS proxy is detected when using volc, incorrectly setting \
the proxy may make volc unavailable or affect performance."
)
self.has_warning = True
assert backend in StorageManager.BACKEND_TYPE, f"Unknown backend: {backend}"
# boto3 backend need special treatment.
# boto3 and volc backend need special treatment.
if backend_key not in StorageManager.CLI_DICT:
StorageManager.CLI_DICT.update({backend_key: StorageManager.BACKEND_INIT_METHOD[backend](*init_args)})
@ -527,11 +763,10 @@ class StorageManager(metaclass=SingletonMeta):
return meta.client.get_fns(*unpack_nosave_meta(meta))
def save(self, save_path: str, to_save_obj: Any, async_upload=None, **kwargs):
if async_upload is None:
async_upload = self.async_mode
if not save_path.startswith("boto3:"):
if not save_path.startswith("boto3:") and not save_path.startswith("volc:"):
async_upload = False
meta = self._get_client(save_path, async_upload)
@ -554,6 +789,7 @@ class StorageManager(metaclass=SingletonMeta):
def load(self, load_path: str, **kwargs) -> Any:
self.wait()
meta = self._get_client(path=load_path)
return meta.client.load(*unpack_nosave_meta(meta), **kwargs)
def delete_obj(self, fp: str):

View File

@ -42,8 +42,8 @@ def init_tb_writer(
# dir of the last task by 'make_launch_script.sh'.
# If we load ckpt, 'resume_tb_folder' will be overwritten as the
# reloaded 'train_state.resume_tb_folder'.
if resume_tb_folder is not None:
assert len(resume_tb_folder) > 0 and resume_tb_folder != "/"
if resume_tb_folder is not None and len(resume_tb_folder) > 0:
assert resume_tb_folder != "/"
if not os.path.exists(resume_tb_folder):
logger.error(
f"Can't found resume_tb_folder{resume_tb_folder}, \


@ -0,0 +1,323 @@
import multiprocessing as mp
import random
import numpy as np
import pytest
import torch
from torch import nn
from torch.testing import assert_close
import internlm
from internlm.core.context import ParallelMode
from internlm.core.context import global_context as gpc
from internlm.core.context.parallel_context import Config
from internlm.core.engine import Engine
from internlm.core.gradient_handler import PipelineSharedModuleGradientHandler
from internlm.core.scheduler import (
InterleavedPipelineScheduler,
PipelineScheduler,
SchedulerMetricHook,
)
from internlm.solver.pipeline_utils import partition_uniform
from internlm.train import initialize_optimizer
class MlpModel(nn.Module):
"""
Custom model
"""
def __init__(self, start, end, model_type=None):
super().__init__()
self.part = [start, end]
self.blocks = nn.ModuleList([nn.Linear(8, 8, bias=False) for lid in range(end - start)])
self.model_type = model_type
def forward(self, hidden_states=None, input_ids=None):
if self.model_type != "torch" and self.part[0] != 0:
input_ids = hidden_states
for i in range(self.part[1] - self.part[0]):
input_ids = self.blocks[i](input_ids)
return input_ids
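MlpModel stands in for one pipeline stage: the first stage (part[0] == 0) consumes input_ids, while every later stage treats whatever it receives as hidden_states. A small illustration of chaining two stages by hand, assuming the imports at the top of this test file; names with a _demo suffix are not part of the test.

# Illustrative manual chaining of two stages (the _demo names are not used by the test itself).
stage0_demo = MlpModel(0, 2)               # layers [0, 2): reads input_ids directly
stage1_demo = MlpModel(2, 4)               # layers [2, 4): reads the previous stage's hidden_states
demo_x = torch.randn(1, 8)
demo_h = stage0_demo(input_ids=demo_x)
demo_out = stage1_demo(hidden_states=demo_h)
assert demo_out.shape == (1, 8)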
class MyLoss(nn.Module):
"""
Custom loss
"""
def __init__(self):
super().__init__()
def forward(self, logits, labels):
loss = torch.nn.MSELoss(reduction="sum")
return loss(logits, labels)
config = Config(
dict(
gradient_handler=[dict(type="PipelineSharedModuleGradientHandler")],
parallel=dict(
zero1=dict(size=1, fsdp=False),
pipeline=dict(size=8, interleaved_overlap=False),
sequence_parallel=False,
tensor=1,
),
model_type="INTERNLM",
data=dict(seq_len=8, micro_num=16, micro_bsz=1, pack_sample_into_one=False, min_length=0, total_steps=9999),
model=dict(
dtype=torch.bfloat16,
num_chunks=2,
use_flash_attn=True,
),
resume_tb_folder="",
tensorboard_folder="",
alert_address=None,
monitor=dict(alert=dict(enable_feishu_alert=False, feishu_alert_address=None, light_monitor_address=None)),
grad_scaler=dict(
fp16=dict(
initial_scale=1,
min_scale=1,
growth_interval=1,
),
growth_factor=1.1,
backoff_factor=0.9,
max_scale=1,
hysteresis=1,
),
adam=dict(
lr=1e-4,
adam_beta1=0.9,
adam_beta2=0.95,
adam_beta2_c=0,
adam_eps=1e-8,
weight_decay=0.01,
),
hybrid_zero_optimizer=dict(
overlap_sync_grad=False,
overlap_sync_param=False,
reduce_bucket_size=512 * 1024 * 1024,
clip_grad_norm=1.0,
),
beta2_scheduler=dict(
init_beta2=0.95,
c=0,
cur_iter=-1,
),
lr_scheduler=dict(
total_steps=100,
init_steps=0,
warmup_ratio=0.01,
eta_min=1e-5,
last_epoch=-1,
),
)
)
def build_environment(rank, world_size):
import os
os.environ["RANK"] = str(rank)
os.environ["LOCAL_RANK"] = str(rank)
os.environ["WORLD_SIZE"] = str(world_size)
os.environ["MASTER_ADDR"] = "127.0.0.1"
os.environ["MASTER_PORT"] = "33333"
torch.cuda.empty_cache()
# launcher="torch"
internlm.launch_from_torch(config=config, seed=1024)
def loose_close(a, b, dtype: torch.dtype = torch.float32):
if dtype is torch.float32:
rtol = 1.3e-6
atol = 1e-5
elif dtype is torch.bfloat16:
rtol = 2e-2
atol = 2e-2
if isinstance(a, torch.Tensor):
a = a.detach().to(dtype)
b = b.detach().to(dtype)
assert_close(a, b, rtol=rtol, atol=atol)
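As a usage note for the helper above, the tolerances are deliberately loose (much looser for bfloat16) so the pipeline result only needs to roughly match the single-process reference; the values below are illustrative and pass under those tolerances.

# Illustrative calls to loose_close (both pass under the tolerances defined above).
loose_close(torch.tensor(1.0), torch.tensor(1.0 + 5e-6), dtype=torch.float32)
loose_close(torch.tensor([2.0, 3.0]), torch.tensor([2.01, 3.01]), dtype=torch.bfloat16)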
def seed_all(seed, cuda_deterministic=False):
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)
if torch.cuda.is_available():
torch.cuda.manual_seed(seed)
torch.cuda.manual_seed_all(seed)
if cuda_deterministic: # slower, more reproducible
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False
else:
torch.backends.cudnn.deterministic = False
torch.backends.cudnn.benchmark = True
def _build_generic_model_1d(num_layers, num_chunks):
pipeline_size = gpc.get_world_size(ParallelMode.PIPELINE)
pipeline_rank = gpc.get_local_rank(ParallelMode.PIPELINE)
all_parts = partition_uniform(num_layers, pipeline_size, num_chunks)
parts = all_parts[pipeline_rank]
if gpc.is_rank_for_log():
print(f"The layer sharding is {all_parts}.", flush=True)
models = []
for start, end in parts:
models.append(MlpModel(start, end).cuda())
torch.distributed.barrier()
if len(models) == 1:
model = models[0]
else:
model = nn.ModuleList(models)
return model
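With the values used below (32 layers over a pipeline of 8 ranks and, for example, 2 chunks), a uniform split works out to 2 layers per chunk. A quick arithmetic check, assuming partition_uniform divides the layers evenly:

# Back-of-the-envelope check of the uniform layer split (assumes an even partition).
num_layers_demo, pipeline_size_demo, num_chunks_demo = 32, 8, 2
layers_per_chunk = num_layers_demo // (pipeline_size_demo * num_chunks_demo)
assert layers_per_chunk == 2
print(f"{pipeline_size_demo} ranks x {num_chunks_demo} chunks x {layers_per_chunk} layers per chunk")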
def exam_pipeline_parallel(args):
# init
rank, world_size, micro_num, num_chunks, interleaved_overlap = args
config.data.micro_num = micro_num
config.model.num_chunks = num_chunks
config.parallel.pipeline.interleaved_overlap = interleaved_overlap
build_environment(rank, world_size)
device = torch.device(f"cuda:{rank}")
dtype = config.model["dtype"]
# set seed
seed_all(1024)
# pp model
pp_model = _build_generic_model_1d(num_layers=32, num_chunks=num_chunks)
pp_model = pp_model.to(dtype)
# pp scheduler
scheduler_hooks = [
SchedulerMetricHook(skip=True),
]
seq_len = gpc.config.data.seq_len
gpc.config.NUM_MICRO_BATCHES = micro_num
communication_overlap = interleaved_overlap
if num_chunks == 1:
# noninterleaved pp
scheduler = PipelineScheduler(
data_process_func=None,
num_microbatches=micro_num,
dtype=dtype,
tensor_shape=[1, 8],
scatter_gather_tensors=False,
scheduler_hooks=scheduler_hooks,
)
else:
# interleaved pp
if micro_num < gpc.get_world_size(ParallelMode.PIPELINE):
try:
scheduler = InterleavedPipelineScheduler(
num_microbatches=micro_num,
num_chunks=gpc.config.model.num_chunks,
dtype=dtype,
tensor_shape=[1, 8],
scatter_gather_tensors=False,
scheduler_hooks=scheduler_hooks,
communication_overlap=communication_overlap,
)
except AssertionError:
return
else:
raise RuntimeError("Error: AssertionError should occur when micro_num < Pipeline parrallel world size")
else:
scheduler = InterleavedPipelineScheduler(
num_microbatches=micro_num,
num_chunks=gpc.config.model.num_chunks,
dtype=dtype,
tensor_shape=[1, 8],
scatter_gather_tensors=False,
scheduler_hooks=scheduler_hooks,
communication_overlap=communication_overlap,
)
# pp optimizer and engine
optimizer, beta2_scheduler, lr_scheduler = initialize_optimizer(model=pp_model)
engine = Engine(
model=pp_model,
optimizer=optimizer,
lr_scheduler=lr_scheduler,
beta2_scheduler=beta2_scheduler,
criterion=MyLoss().to(dtype),
gradient_handlers=[PipelineSharedModuleGradientHandler(model=pp_model, optimizer=optimizer)],
clip_grad_norm=gpc.config.hybrid_zero_optimizer.get("clip_grad_norm", 0.0),
)
scheduler.pre_processing(engine)
engine.train()
# create input
x_list = []
y_list = []
for _ in range(micro_num):
x_list.append(list(range(seq_len)))
y_list.append(list(range(seq_len)))
xs = torch.tensor(x_list).to(device).to(dtype)
yx = torch.tensor(y_list).to(device).to(dtype)
input_list = [{"input_ids": xs}, yx]
# pp forward and backward
output, _, loss = scheduler.forward_backward_step(
engine, input_list, forward_only=False, return_loss=True, return_output_label=True
)
engine.step()
# torch related
if gpc.is_last_rank(ParallelMode.PIPELINE):
torch_xs = torch.tensor(x_list).to(device).to(torch.float32)
torch_ys = torch.tensor(y_list).to(device).to(torch.float32)
torch_model = MlpModel(0, 32, "torch").to(device)
torch_optimizer = torch.optim.AdamW(
params=[{"params": torch_model.parameters(), "weight_decay": config.adam.weight_decay}],
lr=config.adam.lr,
betas=(config.adam.adam_beta1, config.adam.adam_beta2),
eps=config.adam.adam_eps,
)
# check output
torch_output = torch_model(input_ids=torch_xs) # pylint: disable=E1102
loose_close(torch_output, output, dtype=dtype)
torch_criterion = MyLoss().to(torch.float32)
torch_loss = torch_criterion(torch_output, torch_ys) / micro_num # pylint: disable=E1102
torch_loss.backward()
torch_optimizer.step()
# check loss
loose_close(torch_loss, loss[0], dtype=dtype)
@pytest.mark.parametrize("micro_num", [4, 8, 16])
@pytest.mark.parametrize("num_chunks", [1, 2, 4])
@pytest.mark.parametrize("interleaved_overlap", [True, False])
def test_pipeline_parallel(micro_num, num_chunks, interleaved_overlap):
ctx = mp.get_context("spawn")
with ctx.Pool(processes=8) as pool:
pool.map(
exam_pipeline_parallel,
[[rank, 8, micro_num, num_chunks, interleaved_overlap] for rank in range(8)],
)
pool.close()
pool.join()
if __name__ == "__main__":
pytest.main(["-s", "-q", "test_pipeline.py"])


@ -16,7 +16,12 @@ from internlm.model.utils import gather_forward_split_backward
config = Config(
dict(
parallel=dict(zero1=1, pipeline=dict(size=1, interleaved_overlap=False), sequence_parallel=False, tensor=1),
parallel=dict(
zero1=dict(size=1, fsdp=False),
pipeline=dict(size=1, interleaved_overlap=False),
sequence_parallel=False,
tensor=1,
),
model_type="INTERNLM",
data=dict(seq_len=2048, micro_num=1, micro_bsz=1, pack_sample_into_one=False, min_length=0, total_steps=9999),
model=dict(


@ -10,7 +10,7 @@ from torch.nn.parallel import DistributedDataParallel as DDP
from torch.testing import assert_close
import internlm
from internlm.core.context.parallel_context import Config
from internlm.core.context.parallel_context import Config, ParallelMode
from internlm.solver.optimizer import HybridZeroOptimizer
from internlm.solver.optimizer.utils import ParamBcastSyncHandler
@ -29,7 +29,12 @@ class MlpModel(nn.Module):
config = Config(
dict(
parallel=dict(zero1=1, pipeline=dict(size=1, interleaved_overlap=False), sequence_parallel=False, tensor=1),
parallel=dict(
zero1=dict(size=1, fsdp=False),
pipeline=dict(size=1, interleaved_overlap=False),
sequence_parallel=False,
tensor=1,
),
model_type="INTERNLM",
data=dict(seq_len=2048, micro_num=1, micro_bsz=1, pack_sample_into_one=False, min_length=0, total_steps=9999),
model=dict(
@ -103,14 +108,22 @@ def init_optimizer_grouped_parameters(check_group, model):
{
"params": list(model.parameters())[:2],
"weight_decay": config.adam.weight_decay,
"dp_mode": ParallelMode.DATA,
},
{
"params": list(model.parameters())[2:],
"weight_decay": config.adam.weight_decay,
"dp_mode": ParallelMode.DATA,
},
]
else:
optimizer_grouped_parameters = [{"params": model.parameters(), "weight_decay": config.adam.weight_decay}]
optimizer_grouped_parameters = [
{
"params": model.parameters(),
"weight_decay": config.adam.weight_decay,
"dp_mode": ParallelMode.DATA,
}
]
return optimizer_grouped_parameters
@ -137,7 +150,7 @@ def exam_hybrid_zero_optim_with_ddp(args):
# ParamBcastSyncHandler does not currently consider parameters in different optimizer groups
if overlap_sync_param and check_group:
return
config.parallel.zero1 = zero_parallel
config.parallel.zero1.size = zero_parallel
config.hybrid_zero_optimizer.overlap_sync_param = overlap_sync_param
config.hybrid_zero_optimizer.overlap_sync_grad = overlap_sync_grad
config.data.micro_num = micro_num
@ -253,7 +266,7 @@ def exam_hybrid_zero_optim_with_ddp(args):
def exam_hybrid_zero_optim_with_ckpt_load_save(args):
# init
rank, world_size, zero_parallel, check_group, dtype = args
config.parallel.zero1 = zero_parallel
config.parallel.zero1.size = zero_parallel
config.parallel.dtype = dtype
build_environment(rank, world_size)
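The recurring edit in these tests is that parallel.zero1 changed from a bare integer to a dict, so its size field is what gets overridden at runtime. A small before/after sketch with made-up values:

# Illustrative before/after of the zero1 config shape (values are made up).
old_style = dict(parallel=dict(zero1=8))                          # previously: the int was the zero1 size
new_style = dict(parallel=dict(zero1=dict(size=8, fsdp=False)))   # now: explicit size plus an fsdp switch
assert new_style["parallel"]["zero1"]["size"] == old_style["parallel"]["zero1"]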


@ -10,15 +10,18 @@ from internlm.core.context.parallel_context import Config
from internlm.solver.optimizer.hybrid_zero_optim import HybridZeroOptimizer
from internlm.utils.common import SingletonMeta
OSS_NAME = os.environ["OSS_BUCKET_NAME"]
OSS_IP = os.environ["OSS_IP"]
USER = os.environ["USER"]
OSS_NAME = os.environ.get("OSS_BUCKET_NAME")
OSS_IP = os.environ.get("OSS_IP")
USER = os.environ.get("USER")
JOB_NAME = "CI_TEST"
LOCAL_SAVE_PATH = "local:local_ckpt"
BOTO_SAVE_PATH = f"boto3:s3://{OSS_NAME}.{OSS_IP}/{USER}/{JOB_NAME}"
BOTO_SAVE_PATH_NO_PRFIX = f"s3://{OSS_NAME}.{OSS_IP}/{USER}/{JOB_NAME}/"
VOLC_SAVE_PATH = f"volc:vc://{OSS_NAME}.{OSS_IP}/{USER}/{JOB_NAME}"
VOLC_SAVE_PATH_NO_PRFIX = f"vc://{OSS_NAME}.{OSS_IP}/{USER}/{JOB_NAME}/"
ASYNC_TMP_FOLDER = "./async_tmp_folder"
@ -172,13 +175,25 @@ def del_tmp_file():
except FileNotFoundError:
pass
try:
cmd = r"/mnt/petrelfs/share/sensesync --dryrun --deleteSrc cp " + BOTO_SAVE_PATH_NO_PRFIX + " / "
with Popen(cmd, stdout=PIPE, stderr=STDOUT, shell=True) as output:
results, presults = "", ""
for line in iter(output.stdout.readline, b""):
results += str(line.rstrip())
presults += line.rstrip().decode() + "\n"
print(presults, flush=True)
except: # noqa # pylint: disable=bare-except
pass
if OSS_NAME is not None:
try:
cmd = r"/mnt/petrelfs/share/sensesync --dryrun --deleteSrc cp " + BOTO_SAVE_PATH_NO_PRFIX + " / "
with Popen(cmd, stdout=PIPE, stderr=STDOUT, shell=True) as output:
results, presults = "", ""
for line in iter(output.stdout.readline, b""):
results += str(line.rstrip())
presults += line.rstrip().decode() + "\n"
print(presults, flush=True)
except: # noqa # pylint: disable=bare-except
pass
try:
cmd = r"/mnt/petrelfs/share/sensesync --dryrun --deleteSrc cp " + VOLC_SAVE_PATH_NO_PRFIX + " / "
with Popen(cmd, stdout=PIPE, stderr=STDOUT, shell=True) as output:
results, presults = "", ""
for line in iter(output.stdout.readline, b""):
results += str(line.rstrip())
presults += line.rstrip().decode() + "\n"
print(presults, flush=True)
except: # noqa # pylint: disable=bare-except
pass


@ -6,24 +6,15 @@ import torch
from internlm.core.context.parallel_context import Config
from internlm.initialize.launch import get_config_value
from tests.test_utils.common_fixture import ( # noqa # pylint: disable=unused-import
ASYNC_TMP_FOLDER,
BOTO_SAVE_PATH,
LOCAL_SAVE_PATH,
VOLC_SAVE_PATH,
del_tmp_file,
init_dist_and_model,
reset_singletons,
)
ASYNC_TMP_FOLDER = "./async_tmp_folder"
ckpt_config_list = [
# async boto
dict(
enable_save_ckpt=True,
async_upload_tmp_folder=ASYNC_TMP_FOLDER,
async_upload=True,
save_folder=BOTO_SAVE_PATH,
test_id=0,
),
# sync local
dict(
enable_save_ckpt=True,
@ -32,22 +23,46 @@ ckpt_config_list = [
save_folder=LOCAL_SAVE_PATH,
test_id=1,
),
# sync boto
dict(
enable_save_ckpt=True,
async_upload_tmp_folder=None,
async_upload=False,
save_folder=BOTO_SAVE_PATH,
test_id=2,
),
# async local
dict(
enable_save_ckpt=True,
async_upload_tmp_folder=ASYNC_TMP_FOLDER,
async_upload=True,
save_folder=LOCAL_SAVE_PATH,
test_id=2,
),
# async boto
dict(
enable_save_ckpt=True,
async_upload_tmp_folder=ASYNC_TMP_FOLDER,
async_upload=True,
save_folder=BOTO_SAVE_PATH,
test_id=3,
),
# sync boto
dict(
enable_save_ckpt=True,
async_upload_tmp_folder=None,
async_upload=False,
save_folder=BOTO_SAVE_PATH,
test_id=4,
),
# async volc
dict(
enable_save_ckpt=True,
async_upload_tmp_folder=ASYNC_TMP_FOLDER,
async_upload=True,
save_folder=VOLC_SAVE_PATH,
test_id=5,
),
# sync volc
dict(
enable_save_ckpt=True,
async_upload_tmp_folder=None,
async_upload=False,
save_folder=VOLC_SAVE_PATH,
test_id=6,
),
]
@ -61,7 +76,7 @@ def del_tmp():
@pytest.mark.usefixtures("del_tmp")
@pytest.mark.usefixtures("reset_singletons")
@pytest.mark.parametrize("ckpt_config", ckpt_config_list)
def test_storage_mm_save_load(ckpt_config, init_dist_and_model): # noqa # pylint: disable=unused-argument
def test_storage_mm_save_load(ckpt_config): # noqa # pylint: disable=unused-argument
from internlm.utils.storage_manager import (
check_folder,
get_fns,
@ -72,6 +87,11 @@ def test_storage_mm_save_load(ckpt_config, init_dist_and_model): # noqa # pylin
)
ckpt_config = Config(ckpt_config)
if os.environ.get("OSS_BUCKET_NAME") is None:
if ckpt_config.test_id > 2:
print("Pass boto3 and volc", flush=True)
return
enable_save_ckpt = get_config_value(ckpt_config, "enable_save_ckpt", False)
async_upload_tmp_folder = get_config_value(ckpt_config, "async_upload_tmp_folder", False)
async_upload = get_config_value(ckpt_config, "async_upload", False)
@ -97,6 +117,9 @@ internlm_ckpt_path = [
("/mnt/ckpt/", "local", "/mnt/ckpt/"),
("./ckpt/", "local", "./ckpt/"),
("s3://oss_bucket/", "boto3", "s3://oss_bucket/"),
("volc:vc://oss_bucket/", "volc", "vc://oss_bucket/"),
("volc:oss_bucket/", "volc", "oss_bucket/"),
("vc://oss_bucket/", "volc", "vc://oss_bucket/"),
]