
fix optim

pull/5190/head
Xuanlei Zhao · 11 months ago · commit 44014faa67
  1. applications/ColossalMoE/README.md (BIN)
  2. colossalai/zero/low_level/low_level_optim.py (49 changed lines)
  3. tests/test_moe/moe_utils.py (20 changed lines)
  4. tests/test_moe/test_moe_zero_fwd_bwd.py (64 changed lines)
  5. tests/test_moe/test_moe_zero_optim.py (119 changed lines)

applications/ColossalMoE/README.md (BIN)

Binary file not shown.

colossalai/zero/low_level/low_level_optim.py (49 changed lines)

@@ -144,7 +144,7 @@ class LowLevelZeroOptimizer(OptimizerWrapper):
# because they have different parallel strategy
# so we need to store them separately in param_groups
# instead of working_groups
moe_params = list()
self.working_moe_params = list()
# iterate over the param group in the optimizer
# partition these param groups for data parallel training
@@ -156,7 +156,7 @@ class LowLevelZeroOptimizer(OptimizerWrapper):
if self.moe_extra_dp_pg is None:
# skip moe param
if is_moe_tensor(param):
moe_params.append(param)
self.working_moe_params.append(param)
continue
group_params.append(param)
@@ -172,12 +172,15 @@ class LowLevelZeroOptimizer(OptimizerWrapper):
param_group["params"] = master_param_current_rank
# if there are moe params, store in addtional group in optim
if len(moe_params) > 0:
if len(self.working_moe_params) > 0:
param_group = dict()
for key, value in self.optim.param_groups[0].items():
if key != "params":
param_group[key] = value
param_group["params"] = moe_params
self.master_moe_params = []
for param in self.working_moe_params:
self.master_moe_params.append(param.to(torch.float32))
param_group["params"] = self.master_moe_params
self.optim.param_groups.append(param_group)
# intialize communication stream for
@@ -596,24 +599,40 @@ class LowLevelZeroOptimizer(OptimizerWrapper):
# update the params in the optimizer
self.optim.param_groups[group_id]["params"] = real_master_params[group_id]
# update param for moe ep
# move grad to master param and compute norm
if len(self.working_moe_params) > 0:
moe_grads = []
for master_moe_param, working_moe_param in zip(self.master_moe_params, self.working_moe_params):
if master_moe_param.grad is not None:
raise RuntimeError("Moe param should not have grad here")
grad = working_moe_param.grad
# no need to copy fp32 grad if master_weights is False
if self._master_weights:
grad = grad.to(master_moe_param.dtype).to(master_moe_param.device)
master_moe_param.grad = grad
working_moe_param.grad = None
moe_grads.append(grad)
grad_partition_groups.append(grad)
norm_group = self._compute_grad_norm(gradients=moe_grads)
norm_groups.append(norm_group)
self.optim.param_groups[-1]["params"] = self.master_moe_params
del moe_grads
# unscale and clip grads
global_norm = calculate_global_norm_from_list(norm_list=norm_groups)
self._unscale_and_clip_grads(grad_partition_groups, global_norm)
# TODO: we should store master param for ep
if len(self.param_groups) > len(self._working_param_groups):
for param in self.param_groups[-1]["params"]:
param.data = param.data.to(torch.float32)
param.grad = param.grad.to(torch.float32)
# update the parameters
self.optim.step()
# release the moe gradm
if len(self.param_groups) > len(self._working_param_groups):
for param in self.param_groups[-1]["params"]:
param.grad = None
param.data = param.data.to(self._dtype)
# release moe grad
if len(self.working_moe_params) > 0:
for master_moe_param, working_moe_param in zip(self.master_moe_params, self.working_moe_params):
master_moe_param.grad = None
working_moe_param.data = (
master_moe_param.data.to(working_moe_param.device).to(working_moe_param.dtype).detach()
)
# release the grad
grad_partition_groups = []
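
Taken together, the low_level_optim.py changes replace the old "cast the MoE param group to fp32 in place around the step" approach with persistent fp32 master copies of the bf16 working MoE params: gradients are moved from the working params onto the masters (and counted in the grad norm) before the step, and the updated master data is copied back into the working params afterwards. The sketch below illustrates that pattern in plain PyTorch; it is a standalone approximation with made-up names and shapes, not ColossalAI's actual optimizer code.

```python
import torch

# Standalone sketch of the master/working MoE parameter pattern introduced above.
# Assumptions: plain PyTorch, made-up shapes, no ZeRO sharding or process groups.
working_moe_params = [torch.nn.Parameter(torch.randn(4, 4, dtype=torch.bfloat16)) for _ in range(2)]
# fp32 master copies kept by the optimizer (mirrors self.master_moe_params in the diff)
master_moe_params = [p.detach().to(torch.float32).clone().requires_grad_(True) for p in working_moe_params]
optim = torch.optim.Adam(master_moe_params, lr=1e-3)

# pretend backward produced bf16 grads on the working params
for p in working_moe_params:
    p.grad = torch.randn_like(p)

# step 1: move grads onto the fp32 masters and release them on the working side
for master, working in zip(master_moe_params, working_moe_params):
    master.grad = working.grad.to(master.dtype).to(master.device)
    working.grad = None

# step 2: update the fp32 masters
optim.step()

# step 3: drop master grads and copy the updated data back into the bf16 working params
for master, working in zip(master_moe_params, working_moe_params):
    master.grad = None
    working.data = master.data.to(working.device).to(working.dtype).detach()
```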

tests/test_moe/moe_utils.py (20 changed lines)

@@ -1,6 +1,7 @@
import torch
import torch.distributed as dist
import torch.nn as nn
from torch.testing import assert_close
from colossalai.booster.plugin.low_level_zero_plugin import LowLevelZeroModel
from colossalai.legacy.engine.gradient_handler._base_gradient_handler import BaseGradientHandler
@@ -10,7 +11,6 @@ from colossalai.moe import SparseMLP
from colossalai.moe.manager import MOE_MANAGER
from colossalai.moe.utils import get_moe_epsize_param_dict
from colossalai.tensor.moe_tensor.api import get_ep_group, get_ep_size
from tests.test_moe.moe_utils import MoeModel
def delete_moe_info(model):
@@ -126,7 +126,7 @@ def sync_local_from_ep(local_model: SparseMLP, ep_model: SparseMLP, assert_grad_
for (local_name, local_param), (ep_name, ep_param) in zip(
local_model.named_parameters(), ep_model.named_parameters()
):
assert local_name == ep_name, print(f"{local_name} != {ep_name}")
assert local_name in ep_name, print(f"{local_name} != {ep_name}")
if "experts" not in local_name:
if assert_grad_flag:
assert torch.allclose(local_param, ep_param), f"local_param: {local_param}, ep_param: {ep_param}"
@@ -149,3 +149,19 @@ def sync_local_from_ep(local_model: SparseMLP, ep_model: SparseMLP, assert_grad_
assert torch.allclose(local_param.grad, all_grad)
else:
local_param.data.copy_(all_param.data)
def loose_close(a, b, dtype: torch.dtype = torch.float32):
rtol = None
atol = None
if dtype is torch.float16:
rtol = 5e-2
atol = 5e-4
elif dtype is torch.bfloat16:
rtol = 4e-3
atol = 4e-3
a = a.detach().to(dtype)
b = b.detach().to(dtype).to(a.device)
assert_close(a, b, rtol=rtol, atol=atol)
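
The new loose_close helper compares tensors under dtype-aware tolerances (rtol=5e-2, atol=5e-4 for fp16; rtol=atol=4e-3 for bf16) instead of assert_close's defaults. A short usage sketch, assuming the definition above is importable from tests/test_moe/moe_utils.py as shown:

```python
import torch

from tests.test_moe.moe_utils import loose_close  # helper defined in the diff above

# A bf16 round-trip loses precision, so an exact comparison against the fp32 source
# would be too strict; loose_close casts both sides to bf16 and checks with rtol=atol=4e-3.
ref = torch.randn(8, 8)
out = ref.to(torch.bfloat16)
loose_close(out, ref, dtype=torch.bfloat16)
```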

tests/test_moe/test_moe_zero_fwd_bwd.py (64 changed lines)

@@ -15,64 +15,64 @@ def run_zero_test(local_rank, stage=1):
MOE_MANAGER.__init__()
MOE_MANAGER.setup(parallel="EP")
moe_model = MoeModel().bfloat16()
moe_optimizer = torch.optim.Adam(moe_model.parameters())
moe_plugin = LowLevelZeroPlugin(stage=stage, precision="bf16")
moe_booster = Booster(plugin=moe_plugin)
moe_model, moe_optimizer, _, _, _ = moe_booster.boost(moe_model, moe_optimizer)
MOE_MANAGER.__init__()
MOE_MANAGER.setup(parallel=None)
zero_model = MoeModel().bfloat16()
delete_moe_info(zero_model)
zero_optimizer = torch.optim.Adam(zero_model.parameters())
zero_plugin = LowLevelZeroPlugin(stage=stage, precision="bf16")
zero_booster = Booster(plugin=zero_plugin)
zero_model, zero_optimizer, _, _, _ = zero_booster.boost(zero_model, zero_optimizer)
MOE_MANAGER.__init__()
MOE_MANAGER.setup(parallel=None)
torch_model = MoeModel().bfloat16()
delete_moe_info(torch_model)
torch_optimizer = torch.optim.Adam(torch_model.parameters())
torch_plugin = LowLevelZeroPlugin(stage=stage, precision="bf16")
torch_booster = Booster(plugin=torch_plugin)
torch_model, torch_optimizer, _, _, _ = torch_booster.boost(torch_model, torch_optimizer)
sync_local_from_ep(torch_model, zero_model)
sync_local_from_ep(zero_model, moe_model)
data = torch.randn(16, 4).bfloat16().cuda()
label = torch.randint(0, 4, (16,)).cuda()
torch_out = run_fwd_bwd(torch_model, data, label, criterion, torch_optimizer)
zero_out = run_fwd_bwd(zero_model, data, label, criterion, zero_optimizer)
assert torch.allclose(torch_out, zero_out)
moe_out = run_fwd_bwd(moe_model, data, label, criterion, moe_optimizer)
assert torch.allclose(zero_out, moe_out)
for (zero_name, zero_param), (torch_name, torch_param) in zip(
zero_model.module.named_parameters(), torch_model.module.named_parameters()
for (moe_name, moe_param), (zero_name, zero_param) in zip(
moe_model.module.named_parameters(), zero_model.module.named_parameters()
):
assert zero_name == torch_name
assert moe_name == zero_name
moe_grad_list = moe_optimizer._grad_store.get_partitioned_gradients_by_param_id(0, id(moe_param))
zero_grad_list = zero_optimizer._grad_store.get_partitioned_gradients_by_param_id(0, id(zero_param))
torch_grad_list = torch_optimizer._grad_store.get_partitioned_gradients_by_param_id(0, id(torch_param))
if hasattr(zero_param, "moe_info"):
assert len(zero_grad_list) == 0
if hasattr(moe_param, "moe_info"):
assert len(moe_grad_list) == 0
if stage == 1:
torch_grad = torch_grad_list[local_rank].view(zero_param.grad.shape)
zero_grad = zero_grad_list[local_rank].view(moe_param.grad.shape)
else:
torch_grad = torch_grad_list[0].view(zero_param.grad.shape)
zero_grad = zero_grad_list[0].view(moe_param.grad.shape)
assert torch.allclose(
zero_param.grad, torch_grad, atol=1e-5
), f"zero grad:\n{zero_param.grad}\ntorch grad:\n{torch_grad}\nmax diff: {(zero_param.grad - torch_grad).abs().max()}, mean diff: {(zero_param.grad - torch_grad).abs().mean()}"
moe_param.grad, zero_grad, atol=1e-5
), f"zero grad:\n{moe_param.grad}\ntorch grad:\n{zero_grad}\nmax diff: {(moe_param.grad - zero_grad).abs().max()}, mean diff: {(moe_param.grad - zero_grad).abs().mean()}"
else:
assert len(zero_grad_list) > 0
assert len(zero_grad_list) == len(torch_grad_list)
for zero_grad, torch_grad in zip(zero_grad_list, torch_grad_list):
assert torch.allclose(zero_grad, torch_grad)
assert len(moe_grad_list) > 0
assert len(moe_grad_list) == len(zero_grad_list)
for moe_grad, zero_grad in zip(moe_grad_list, zero_grad_list):
assert torch.allclose(moe_grad, zero_grad)
def run_dist(rank, world_size, port):
def run_dist(rank, world_size, port, stage):
colossalai.launch(config=dict(), rank=rank, world_size=world_size, host="localhost", port=port, backend="nccl")
seed_all(42 + rank)
run_zero_test(rank, stage=1)
run_zero_test(rank, stage=2)
run_zero_test(rank, stage=stage)
@pytest.mark.dist
@pytest.mark.parametrize("world_size", [2])
@pytest.mark.parametrize("stage", [1, 2])
@rerun_if_address_is_in_use()
def test_moe_zero_model(world_size):
spawn(run_dist, world_size)
def test_moe_zero_model(world_size, stage):
spawn(run_dist, world_size, stage=stage)
if __name__ == "__main__":
test_moe_zero_model(world_size=2)
test_moe_zero_model(world_size=2, stage=1)
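
The scaffolding change here (and mirrored in test_moe_zero_optim.py below) is that the ZeRO stage is no longer hard-coded inside the worker; it becomes a pytest parameter that spawn forwards to every rank. A minimal, framework-free sketch of that pattern, using torch.multiprocessing directly instead of colossalai.testing.spawn and a dummy worker in place of run_zero_test:

```python
import pytest
import torch.multiprocessing as mp

def _worker(rank, world_size, stage):
    # placeholder for run_zero_test(rank, stage=stage); the real tests also launch colossalai here
    print(f"rank {rank}/{world_size}: running ZeRO stage {stage}")

@pytest.mark.parametrize("world_size", [2])
@pytest.mark.parametrize("stage", [1, 2])
def test_spawn_pattern(world_size, stage):
    # each (world_size, stage) combination spawns its own set of worker processes
    mp.spawn(_worker, args=(world_size, stage), nprocs=world_size)
```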

tests/test_moe/test_moe_zero_optim.py (119 changed lines)

@@ -4,89 +4,80 @@ import torch
import colossalai
from colossalai.booster import Booster
from colossalai.booster.plugin import LowLevelZeroPlugin
from colossalai.booster.plugin.low_level_zero_plugin import LowLevelZeroModel
from colossalai.moe.manager import MOE_MANAGER
from colossalai.tensor.moe_tensor.api import is_moe_tensor
from colossalai.testing import rerun_if_address_is_in_use, spawn
from tests.test_moe.moe_utils import MoeGradientHandler, MoeModel
from colossalai.testing.random import seed_all
from tests.test_moe.moe_utils import MoeModel, delete_moe_info, loose_close, run_fwd_bwd, sync_local_from_ep
def split_ddp_grad(grad, world_size):
with torch.no_grad():
grad = grad.clone().detach().flatten()
padding_size = (world_size - grad.numel() % world_size) % world_size
if padding_size > 0:
grad = torch.nn.functional.pad(grad, [0, padding_size])
splited_grad = grad.split(grad.numel() // world_size)
return splited_grad
def run_fwd_bwd(model, data, label, criterion, optimizer, enable_autocast=False):
model.train()
with torch.cuda.amp.autocast(enabled=enable_autocast):
if criterion:
y = model(data)
loss = criterion(y, label)
else:
loss = model(data, label)
loss = loss.float()
if isinstance(model, LowLevelZeroModel):
optimizer.backward(loss)
else:
loss.backward()
return y
def run_zero_optim_test(local_rank, world_size, stage=1):
def run_zero_test(local_rank, stage=1):
criterion = torch.nn.CrossEntropyLoss()
zero_model = MoeModel()
zero_optimizer = torch.optim.Adam(zero_model.parameters())
plugin = LowLevelZeroPlugin(stage=stage, precision="fp32")
booster = Booster(plugin=plugin)
zero_model, zero_optimizer, _, _, _ = booster.boost(zero_model, zero_optimizer)
torch_model = MoeModel()
for zero_param, torch_param in zip(zero_model.parameters(), torch_model.parameters()):
torch_param.data.copy_(zero_param.data)
torch_optimizer = torch.optim.Adam(torch_model.parameters())
torch_model = torch_model.cuda()
grad_handler = MoeGradientHandler(torch_model)
for _ in range(2):
data = torch.randn(16, 4).cuda() / (local_rank + 1)
label = torch.randint(0, 4, (16,)).cuda()
run_fwd_bwd(torch_model, data, label, criterion, None)
run_fwd_bwd(zero_model, data, label, criterion, zero_optimizer)
grad_handler.handle_gradient()
torch_optimizer.step()
MOE_MANAGER.__init__()
MOE_MANAGER.setup(parallel="EP")
moe_model = MoeModel().bfloat16()
moe_optimizer = torch.optim.Adam(moe_model.parameters(), lr=1.0)
moe_plugin = LowLevelZeroPlugin(stage=stage, precision="bf16")
moe_booster = Booster(plugin=moe_plugin)
moe_model, moe_optimizer, _, _, _ = moe_booster.boost(moe_model, moe_optimizer)
MOE_MANAGER.__init__()
MOE_MANAGER.setup(parallel=None)
zero_model = MoeModel().bfloat16()
delete_moe_info(zero_model)
sync_local_from_ep(zero_model, moe_model)
zero_optimizer = torch.optim.Adam(zero_model.parameters(), lr=1.0)
zero_plugin = LowLevelZeroPlugin(stage=stage, precision="bf16")
zero_booster = Booster(plugin=zero_plugin)
zero_model, zero_optimizer, _, _, _ = zero_booster.boost(zero_model, zero_optimizer)
for (moe_name, moe_param), (zero_name, zero_param) in zip(
moe_model.named_parameters(), zero_model.named_parameters()
):
if ".experts." in moe_name:
continue
assert moe_name == zero_name
assert torch.allclose(
moe_param.data, zero_param.data
), f"{moe_name}\ntorch_param {moe_param.data}\nzero_param {zero_param.data}"
for _ in range(1):
data = torch.randn(2, 4).bfloat16().cuda()
label = torch.randint(0, 4, (2,)).cuda()
moe_out = run_fwd_bwd(moe_model, data, label, criterion, moe_optimizer)
zero_out = run_fwd_bwd(zero_model, data, label, criterion, zero_optimizer)
assert torch.allclose(zero_out, moe_out)
moe_optimizer.step()
zero_optimizer.step()
for (torch_name, torch_param), (zero_name, zero_param) in zip(
torch_model.named_parameters(), zero_model.named_parameters()
for (moe_name, moe_param), (zero_name, zero_param) in zip(
moe_model.named_parameters(), zero_model.named_parameters()
):
assert torch.allclose(
torch_param.data, zero_param.data
), f"{torch_name}\ntorch_param {torch_param.data}\nzero_param {zero_param.data}"
assert moe_name == zero_name
if is_moe_tensor(moe_param):
param_size = moe_param.shape[0]
zero_param = zero_param[local_rank * param_size : (local_rank + 1) * param_size]
loose_close(moe_param.data, zero_param.data, dtype=moe_param.dtype)
torch_optimizer.zero_grad()
moe_optimizer.zero_grad()
zero_optimizer.zero_grad()
def run_dist(rank, world_size, port):
def run_dist(rank, world_size, port, stage):
colossalai.launch(config=dict(), rank=rank, world_size=world_size, host="localhost", port=port, backend="nccl")
MOE_MANAGER.setup(parallel="EP")
run_zero_optim_test(rank, world_size, stage=1)
run_zero_optim_test(rank, world_size, stage=2)
seed_all(42 + rank)
run_zero_test(rank, stage=stage)
@pytest.mark.dist
@pytest.mark.parametrize("world_size", [2])
@pytest.mark.parametrize("stage", [1, 2])
@rerun_if_address_is_in_use()
def test_moe_zero_optim(world_size):
spawn(run_dist, world_size)
def test_moe_zero_optim(world_size, stage):
spawn(run_dist, world_size, stage=stage)
if __name__ == "__main__":
test_moe_zero_optim(world_size=2)
test_moe_zero_optim(world_size=2, stage=1)
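
The final comparison in this test accounts for expert parallelism: each EP rank holds only a rank-sized slice of every expert tensor, so the unsharded zero_model parameter is sliced by local_rank before being compared to the moe_model parameter. A toy single-process illustration of that slicing, with made-up sizes and no distributed setup:

```python
import torch

world_size = 2
full_param = torch.randn(8, 16)                  # unsharded expert weight (the "zero" side)
ep_shards = full_param.chunk(world_size, dim=0)  # what each EP rank would hold (the "moe" side)

for local_rank, ep_param in enumerate(ep_shards):
    rows = ep_param.shape[0]
    # same slicing as the test: zero_param[local_rank * param_size : (local_rank + 1) * param_size]
    ref = full_param[local_rank * rows : (local_rank + 1) * rows]
    assert torch.allclose(ep_param, ref)
```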
