diff --git a/applications/ColossalMoE/colossal_moe/models/mixtral_checkpoint.py b/applications/ColossalMoE/colossal_moe/models/mixtral_checkpoint.py
index 078994628..f27b787e5 100644
--- a/applications/ColossalMoE/colossal_moe/models/mixtral_checkpoint.py
+++ b/applications/ColossalMoE/colossal_moe/models/mixtral_checkpoint.py
@@ -2,52 +2,14 @@
 import logging
 import os
 from pathlib import Path
-import torch.distributed as dist
-import torch.nn as nn
-import copy
-import logging
-import os
-from pathlib import Path
-from shutil import rmtree
-from typing import Dict, Iterator, Optional, OrderedDict, Tuple
-
 import torch
 import torch.distributed as dist
 import torch.nn as nn
-from torch.distributed import ProcessGroup
 
-from colossalai.checkpoint_io import CheckpointIndexFile, HybridParallelCheckpointIO
-from colossalai.checkpoint_io.utils import (
-    StateDictSharder,
-    gather_distributed_param,
-    get_model_base_filenames,
-    get_optimizer_base_filenames,
-    is_safetensors_available,
-    load_shard_state_dict,
-    load_state_dict,
-    load_state_dict_into_model,
-    load_states_into_optimizer,
-    save_config_file,
-    save_param_groups,
-    save_state_dict,
-    save_state_dict_shards,
-    sharded_optimizer_loading_epilogue,
-)
-from colossalai.interface import OptimizerWrapper
-from colossalai.moe.manager import MOE_MANAGER
-from colossalai.tensor.moe_tensor.api import (
-    get_dp_group,
-    get_dp_rank,
-    get_dp_size,
-    get_ep_group,
-    get_ep_rank,
-    get_ep_size,
-    is_moe_tensor,
-)
 from colossalai.checkpoint_io import CheckpointIndexFile
 from colossalai.checkpoint_io.utils import is_safetensors_available, load_shard_state_dict, load_state_dict_into_model
 from colossalai.moe import MoECheckpintIO
-from colossalai.tensor.moe_tensor.api import get_ep_rank, get_ep_size, is_moe_tensor
+from colossalai.tensor.moe_tensor.api import get_dp_rank, get_ep_group, get_ep_rank, get_ep_size, is_moe_tensor
 
 
 class MixtralMoECheckpointIO(MoECheckpintIO):
@@ -62,8 +24,8 @@ class MixtralMoECheckpointIO(MoECheckpintIO):
         model_param_dict = dict(model.named_parameters())
         for name, param in list(state_dict.items()):
             if ".gate.weight" in name:
-                new_name = "module." + name.replace(".gate.weight", ".gate_weight")
-                state_dict[new_name] = state_dict.pop(name)
+                new_name = "module." + name.replace(".gate.weight", ".gate_weight")
+                state_dict[new_name] = state_dict.pop(name)
             elif ".experts." in name:
                 # if is moe tensor
                 # in our moe module, expert is cat as one tensor
@@ -94,7 +56,7 @@ class MixtralMoECheckpointIO(MoECheckpintIO):
                     state_dict[model_param_name] = new_param
                 state_dict.pop(name)
             else:
-                new_name = "module." + name
+                new_name = "module." + name
                 state_dict[new_name] = state_dict.pop(name)
 
         for name, param in list(state_dict.items()):
diff --git a/applications/ColossalMoE/colossal_moe/models/mixtral_layer.py b/applications/ColossalMoE/colossal_moe/models/mixtral_layer.py
index 9d5854991..724aaeda5 100644
--- a/applications/ColossalMoE/colossal_moe/models/mixtral_layer.py
+++ b/applications/ColossalMoE/colossal_moe/models/mixtral_layer.py
@@ -1,10 +1,9 @@
 import torch
 import torch.nn as nn
-from transformers.models.mixtral.modeling_mixtral import MixtralSparseMoeBlock, MixtralDecoderLayer
+from transformers.models.mixtral.modeling_mixtral import MixtralDecoderLayer, MixtralSparseMoeBlock
 
 from colossalai.lazy import LazyInitContext
 from colossalai.moe import SparseMLP
-from colossalai.tensor.moe_tensor.api import get_ep_rank, is_moe_tensor
 
 
 class MixtralSparseMLP:
diff --git a/applications/ColossalMoE/infer.py b/applications/ColossalMoE/infer.py
index 0131c5a38..7467ec397 100644
--- a/applications/ColossalMoE/infer.py
+++ b/applications/ColossalMoE/infer.py
@@ -1,12 +1,12 @@
 import argparse
+import os
 
 import torch
 import torch.distributed as dist
 from colossal_moe.models.mixtral_checkpoint import MixtralMoECheckpointIO
-from colossal_moe.models.mixtral_policy import MixtralForCausalLMPolicy
 from colossal_moe.models.mixtral_layer import replace_moe_layer
-from torch.utils.data import Dataset
-from tqdm import tqdm
+from colossal_moe.models.mixtral_policy import MixtralForCausalLMPolicy
+from huggingface_hub import snapshot_download
 from transformers import AutoTokenizer
 from transformers.models.mixtral import MixtralConfig, MixtralForCausalLM
 
@@ -14,29 +14,7 @@ import colossalai
 from colossalai.booster import Booster
 from colossalai.booster.plugin.moe_hybrid_parallel_plugin import MoeHybridParallelPlugin
 from colossalai.cluster import DistCoordinator
-from colossalai.lazy import LazyInitContext
-from colossalai.moe import MOE_MANAGER, apply_load_balance
-from colossalai.utils import get_current_device
-import argparse
-import os
-from functools import partial
-from typing import Dict
-
-import torch
-import torch.distributed as dist
-from datasets import load_dataset
-from huggingface_hub import snapshot_download
-from torch.utils.data import Dataset
-from tqdm import tqdm
-from transformers import T5Tokenizer
-from transformers.models.llama import LlamaConfig
-
-import colossalai
-from colossalai.booster import Booster
-from colossalai.booster.plugin.moe_hybrid_parallel_plugin import MoeHybridParallelPlugin
-from colossalai.cluster import DistCoordinator
-from colossalai.moe.layers import apply_load_balance
-from colossalai.moe.manager import MOE_MANAGER
+from colossalai.moe import MOE_MANAGER
 from colossalai.moe.utils import skip_init
 from colossalai.utils import get_current_device
 
@@ -88,9 +66,7 @@ def parse_args():
         choices=["fp32", "bf16", "fp16"],
         help="The mixed precision training.",
     )
-    parser.add_argument(
-        "--seed", type=int, default=42, help="A seed for reproducible training."
-    )
+    parser.add_argument("--seed", type=int, default=42, help="A seed for reproducible training.")
 
     # kernel
     parser.add_argument(
@@ -147,11 +123,7 @@ def main():
     config.num_local_experts = 1  # dont change this. it will not affect model
     with skip_init():
         model = MixtralForCausalLM(config)
-    model = (
-        model.to(torch.bfloat16)
-        if args.precision == "bf16"
-        else model.to(torch.float16)
-    )
+    model = model.to(torch.bfloat16) if args.precision == "bf16" else model.to(torch.float16)
     model = model.to(get_current_device())
     coordinator.print_on_master(f"Finish init model with config:\n{config}")
 
diff --git a/applications/ColossalMoE/tests/test_moe_checkpoint.py b/applications/ColossalMoE/tests/test_moe_checkpoint.py
index e97e4a845..d365b7a54 100644
--- a/applications/ColossalMoE/tests/test_moe_checkpoint.py
+++ b/applications/ColossalMoE/tests/test_moe_checkpoint.py
@@ -1,4 +1,3 @@
-import importlib
 import os
 import shutil
 import sys
@@ -6,7 +5,9 @@ import sys
 import pytest
 import torch
 import torch.distributed as dist
-from transformers.models.llama import LlamaConfig
+from colossal_moe.models.mixtral_checkpoint import MixtralMoECheckpointIO
+from colossal_moe.models.mixtral_policy import MixtralForCausalLMPolicy
+from transformers.models.mixtral import MixtralConfig, MixtralForCausalLM
 
 import colossalai
 from colossalai.booster import Booster
@@ -14,9 +15,6 @@ from colossalai.booster.plugin.moe_hybrid_parallel_plugin import MoeHybridParall
 from colossalai.moe.manager import MOE_MANAGER
 from colossalai.testing import DummyDataloader, check_state_dict_equal, rerun_if_address_is_in_use, spawn
 from colossalai.utils import get_current_device
-from transformers.models.mixtral import MixtralConfig, MixtralForCausalLM
-from colossal_moe.models.mixtral_checkpoint import MixtralMoECheckpointIO
-from colossal_moe.models.mixtral_policy import MixtralForCausalLMPolicy
 
 sys.path.append(
     os.path.join(
diff --git a/applications/ColossalMoE/train.py b/applications/ColossalMoE/train.py
index 7f6bd3b8c..c7af67172 100644
--- a/applications/ColossalMoE/train.py
+++ b/applications/ColossalMoE/train.py
@@ -1,10 +1,15 @@
 import argparse
+import os
 
 import torch
 import torch.distributed as dist
 from colossal_moe.models.mixtral_checkpoint import MixtralMoECheckpointIO
-from colossal_moe.models.mixtral_policy import MixtralForCausalLMPolicy
 from colossal_moe.models.mixtral_layer import replace_moe_layer
+from colossal_moe.models.mixtral_policy import MixtralForCausalLMPolicy
+from huggingface_hub import snapshot_download
+
+# from colossalai.nn.optimizer import HybridAdam
+from torch.optim import Adam as HybridAdam
 from torch.utils.data import Dataset
 from tqdm import tqdm
 from transformers import AutoTokenizer
@@ -14,37 +19,16 @@ import colossalai
 from colossalai.booster import Booster
 from colossalai.booster.plugin.moe_hybrid_parallel_plugin import MoeHybridParallelPlugin
 from colossalai.cluster import DistCoordinator
-from colossalai.lazy import LazyInitContext
 from colossalai.moe import MOE_MANAGER, apply_load_balance
-# from colossalai.nn.optimizer import HybridAdam
-from torch.optim import Adam as HybridAdam
-from colossalai.utils import get_current_device
-import argparse
-import os
-from functools import partial
-from typing import Dict
-
-import torch
-import torch.distributed as dist
-from datasets import load_dataset
-from huggingface_hub import snapshot_download
-from torch.utils.data import Dataset
-from tqdm import tqdm
-from transformers import T5Tokenizer
-from transformers.models.llama import LlamaConfig
-
-import colossalai
-from colossalai.booster import Booster
-from colossalai.booster.plugin.moe_hybrid_parallel_plugin import MoeHybridParallelPlugin
-from colossalai.cluster import DistCoordinator
 from colossalai.moe.layers import apply_load_balance
 from colossalai.moe.manager import MOE_MANAGER
-from colossalai.moe.utils import skip_init
 from colossalai.utils import get_current_device
 
+
 def move_to_cuda(batch, device):
     return {k: v.to(device) for k, v in batch.items()}
 
+
 def load_ckpt(repo_name: str, model, booster: Booster):
     ckpt_path = snapshot_download(repo_name)
     # single ckpt
@@ -284,7 +268,9 @@ def main():
     model = model.to(get_current_device())
     replace_moe_layer(model)
     # torch.set_default_tensor_type(torch.float32)
-    print(f"0-2 param num: {sum(p.numel() for p in model.parameters())/ 1000.0 ** 3}GB, memory: {torch.cuda.memory_allocated()/ 1000.0 ** 3}GB")
+    print(
+        f"0-2 param num: {sum(p.numel() for p in model.parameters())/ 1000.0 ** 3}GB, memory: {torch.cuda.memory_allocated()/ 1000.0 ** 3}GB"
+    )
     coordinator.print_on_master(f"Finish init model with config:\n{config}")
 
     # Enable gradient checkpointing
@@ -298,7 +284,9 @@ def main():
         dataset, batch_size=args.batch_size, shuffle=True, drop_last=True, collate_fn=collate_fn
     )
     torch.cuda.synchronize()
-    print(f"1 param num: {sum(p.numel() for p in model.parameters())/ 1000.0 ** 3}GB, memory: {torch.cuda.memory_allocated()/ 1000.0 ** 3}GB")
+    print(
+        f"1 param num: {sum(p.numel() for p in model.parameters())/ 1000.0 ** 3}GB, memory: {torch.cuda.memory_allocated()/ 1000.0 ** 3}GB"
+    )
 
     # Set optimizer
     optimizer = HybridAdam(model.parameters(), lr=args.lr, weight_decay=args.weight_decay)
@@ -307,10 +295,14 @@ def main():
     booster = Booster(plugin=plugin, **booster_kwargs)
     model, optimizer, _, dataloader, _ = booster.boost(model=model, optimizer=optimizer, dataloader=dataloader)
     torch.cuda.synchronize()
-    print(f"2-1 param num: {sum(p.numel() for p in model.parameters())/ 1000.0 ** 3}GB, memory: {torch.cuda.memory_allocated()/ 1000.0 ** 3}GB")
+    print(
+        f"2-1 param num: {sum(p.numel() for p in model.parameters())/ 1000.0 ** 3}GB, memory: {torch.cuda.memory_allocated()/ 1000.0 ** 3}GB"
+    )
     load_ckpt("mistralai/Mixtral-8x7B-v0.1", model, booster)
     torch.cuda.synchronize()
-    print(f"2 param num: {sum(p.numel() for p in model.parameters())/ 1000.0 ** 3}GB, memory: {torch.cuda.memory_allocated()/ 1000.0 ** 3}GB")
+    print(
+        f"2 param num: {sum(p.numel() for p in model.parameters())/ 1000.0 ** 3}GB, memory: {torch.cuda.memory_allocated()/ 1000.0 ** 3}GB"
+    )
 
     use_pipeline = isinstance(booster.plugin, MoeHybridParallelPlugin) and booster.plugin.pp_size > 1
     is_pp_last_stage = use_pipeline and booster.plugin.stage_manager.is_last_stage()
@@ -348,11 +340,15 @@ def main():
                 data = move_to_cuda(data, torch.cuda.current_device())
                 outputs = model(**data)
                 loss = outputs["loss"]
-                print(f"3 param num: {sum(p.numel() for p in model.parameters())/ 1000.0 ** 3}GB, memory: {torch.cuda.memory_allocated()/ 1000.0 ** 3}GB")
+                print(
+                    f"3 param num: {sum(p.numel() for p in model.parameters())/ 1000.0 ** 3}GB, memory: {torch.cuda.memory_allocated()/ 1000.0 ** 3}GB"
+                )
 
                 # Backward
                 booster.backward(loss, optimizer)
-                print(f"4 param num: {sum(p.numel() for p in model.parameters())/ 1000.0 ** 3}GB, memory: {torch.cuda.memory_allocated()/ 1000.0 ** 3}GB")
+                print(
+                    f"4 param num: {sum(p.numel() for p in model.parameters())/ 1000.0 ** 3}GB, memory: {torch.cuda.memory_allocated()/ 1000.0 ** 3}GB"
+                )
 
                 pbar.set_postfix({"loss": loss.item()})
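Reviewer note (not part of the patch): the mixtral_checkpoint.py hunks above only show fragments of the key-remapping logic, so below is a minimal, self-contained sketch of the renaming rule they exercise. The helper name `remap_mixtral_keys` is hypothetical, and the expert-weight concatenation branch is intentionally left out because the diff does not show it.

```python
# Illustration only: the key-remapping rule visible in the mixtral_checkpoint.py hunks,
# extracted as a standalone helper. `remap_mixtral_keys` is a made-up name; the real
# logic lives inside MixtralMoECheckpointIO and also merges expert weights (omitted here).
from typing import Dict

import torch


def remap_mixtral_keys(state_dict: Dict[str, torch.Tensor]) -> Dict[str, torch.Tensor]:
    """Rename HF Mixtral checkpoint keys to the wrapped ColossalAI module layout."""
    for name in list(state_dict.keys()):
        if ".gate.weight" in name:
            # router gate: ".gate.weight" becomes ".gate_weight" under the "module." prefix
            new_name = "module." + name.replace(".gate.weight", ".gate_weight")
            state_dict[new_name] = state_dict.pop(name)
        elif ".experts." in name:
            # expert weights are concatenated into a single MoE tensor in the real
            # checkpoint IO; that branch is not reproduced in this sketch
            continue
        else:
            # every other parameter just gains the "module." prefix
            new_name = "module." + name
            state_dict[new_name] = state_dict.pop(name)
    return state_dict
```

Run over a Hugging Face Mixtral state dict, this leaves expert tensors untouched, renames the router gate to `gate_weight`, and prefixes everything else with `module.`, matching the behavior shown in the diff.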