mirror of https://github.com/hpcaitech/ColossalAI
aibig-modeldata-parallelismdeep-learningdistributed-computingfoundation-modelsheterogeneous-traininghpcinferencelarge-scalemodel-parallelismpipeline-parallelism
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
55 lines
1.5 KiB
55 lines
1.5 KiB
import torch |
|
import torch.distributed as dist |
|
from torch.testing import assert_close |
|
|
|
from colossalai import launch |
|
from colossalai.accelerator import get_accelerator |
|
from colossalai.quantization.fp8 import all_reduce_fp8 |
|
from colossalai.testing import parameterize, rerun_if_address_is_in_use, spawn |
|
|
|
|
|
@parameterize( |
|
"shape", |
|
[ |
|
(3, 7), |
|
(4, 7), |
|
(7, 4), |
|
(8, 9), |
|
(3), |
|
(7,), |
|
(8,), |
|
], |
|
) |
|
@parameterize("dtype", [torch.float16, torch.bfloat16]) |
|
@parameterize("fp8_format", ["e4m3", "e5m2"]) |
|
@parameterize("async_op", [True, False]) |
|
def check_4gpu(shape, dtype, fp8_format, async_op): |
|
x = torch.rand(shape, dtype=dtype, device=get_accelerator().get_current_device()) |
|
x_fp8 = x.clone() |
|
origin_handle = dist.all_reduce(x, async_op=async_op) |
|
fp8_handle = all_reduce_fp8(x_fp8, fp8_format=fp8_format, async_op=async_op) |
|
if async_op: |
|
origin_handle.wait() |
|
fp8_handle.wait() |
|
assert_close(x, x_fp8, rtol=0.1, atol=0.1) |
|
|
|
origin_handle = dist.all_reduce(x, op=dist.ReduceOp.AVG, async_op=async_op) |
|
fp8_handle = all_reduce_fp8(x_fp8, op=dist.ReduceOp.AVG, fp8_format=fp8_format, async_op=async_op) |
|
if async_op: |
|
origin_handle.wait() |
|
fp8_handle.wait() |
|
assert_close(x, x_fp8, rtol=0.1, atol=0.1) |
|
|
|
|
|
def run_dist(rank, world_size, port): |
|
launch(rank=rank, world_size=world_size, port=port, host="localhost") |
|
check_4gpu() |
|
|
|
|
|
@rerun_if_address_is_in_use() |
|
def test_all_reduce(): |
|
spawn(run_dist, 4) |
|
|
|
|
|
if __name__ == "__main__": |
|
test_all_reduce()
|
|
|