mirror of https://github.com/hpcaitech/ColossalAI
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
68 lines
2.2 KiB
68 lines
2.2 KiB
4 months ago
|
import torch
|
||
|
import torch.distributed as dist
|
||
|
from torch.distributed.distributed_c10d import _get_default_group
|
||
|
from torch.testing import assert_close
|
||
|
|
||
|
from colossalai import launch
|
||
|
from colossalai.accelerator import get_accelerator
|
||
|
from colossalai.quantization.fp8 import all_to_all_single_fp8
|
||
|
from colossalai.testing import parameterize, rerun_if_address_is_in_use, spawn
|
||
|
|
||
|
|
||
|
@parameterize("shape", [(4,), (1, 8, 16), (4, 8, 16)])
|
||
|
@parameterize("dtype", [torch.bfloat16])
|
||
|
def check_all2all(shape, dtype):
|
||
|
x = torch.rand(shape, dtype=dtype, device=get_accelerator().get_current_device())
|
||
|
output = torch.empty_like(x)
|
||
|
output_fp8 = torch.empty_like(x)
|
||
|
dist.all_to_all_single(output, x, group=_get_default_group(), async_op=False)
|
||
|
all_to_all_single_fp8(output_fp8, x, group=_get_default_group(), async_op=False)
|
||
|
assert_close(output, output_fp8, rtol=0.1, atol=0.1)
|
||
|
|
||
|
|
||
|
@parameterize("shape", [(8, 8, 16)])
|
||
|
@parameterize("dtype", [torch.bfloat16, torch.float16])
|
||
|
def check_all2all_uneven(shape, dtype):
|
||
|
x = torch.rand(shape, dtype=dtype, device=get_accelerator().get_current_device())
|
||
|
input_split_sizes = [3, 3, 1, 1]
|
||
|
if dist.get_rank() in [0, 1]:
|
||
|
output_split_sizes = [3, 3, 3, 3]
|
||
|
else:
|
||
|
output_split_sizes = [1, 1, 1, 1]
|
||
|
output_shape = list(shape)
|
||
|
output_shape[0] = sum(output_split_sizes)
|
||
|
output = torch.empty(output_shape, device=x.device, dtype=x.dtype)
|
||
|
output_fp8 = torch.empty(output_shape, device=x.device, dtype=x.dtype)
|
||
|
dist.all_to_all_single(
|
||
|
output,
|
||
|
x,
|
||
|
output_split_sizes=output_split_sizes,
|
||
|
input_split_sizes=input_split_sizes,
|
||
|
group=_get_default_group(),
|
||
|
async_op=False,
|
||
|
)
|
||
|
all_to_all_single_fp8(
|
||
|
output_fp8,
|
||
|
x,
|
||
|
output_split_sizes=output_split_sizes,
|
||
|
input_split_sizes=input_split_sizes,
|
||
|
group=_get_default_group(),
|
||
|
async_op=False,
|
||
|
)
|
||
|
assert_close(output, output_fp8, rtol=0.1, atol=0.1)
|
||
|
|
||
|
|
||
|
def run_dist(rank, world_size, port):
|
||
|
launch(rank=rank, world_size=world_size, port=port, host="localhost")
|
||
|
check_all2all()
|
||
|
check_all2all_uneven()
|
||
|
|
||
|
|
||
|
@rerun_if_address_is_in_use()
|
||
|
def test_all_to_all_single():
|
||
|
spawn(run_dist, 4)
|
||
|
|
||
|
|
||
|
if __name__ == "__main__":
|
||
|
test_all_to_all_single()
|