mirror of https://github.com/hpcaitech/ColossalAI
55 lines
1.5 KiB
Python
55 lines
1.5 KiB
Python
|
from functools import partial
|
||
|
from typing import List
|
||
|
|
||
|
import pytest
|
||
|
import torch
|
||
|
import torch.distributed as dist
|
||
|
import torch.multiprocessing as mp
|
||
|
from colossalai.communication.p2p_v2 import _send_object, _recv_object, init_process_group
|
||
|
from colossalai.context import ParallelMode
|
||
|
from colossalai.core import global_context as gpc
|
||
|
from colossalai.initialize import launch
|
||
|
from colossalai.utils import free_port, get_current_device
|
||
|
from colossalai.testing import rerun_if_address_is_in_use
|
||
|
from colossalai.logging import disable_existing_loggers
|
||
|
|
||
|
disable_existing_loggers()
|
||
|
world_size = 4
|
||
|
CONFIG = dict(parallel=dict(pipeline=world_size))
|
||
|
torch.manual_seed(123)
|
||
|
|
||
|
|
||
|
def check_layer(rank, world_size, port):
|
||
|
disable_existing_loggers()
|
||
|
launch(config=CONFIG, rank=rank, world_size=world_size, host='localhost', port=port, backend='nccl', verbose=False)
|
||
|
rank = gpc.get_local_rank(ParallelMode.PIPELINE)
|
||
|
|
||
|
if rank == 0:
|
||
|
obj = [torch.randn(3,)]
|
||
|
_send_object(obj, 1)
|
||
|
|
||
|
if rank == 1:
|
||
|
_recv_object(0)
|
||
|
|
||
|
if rank == 2:
|
||
|
_recv_object(3)
|
||
|
|
||
|
if rank == 3:
|
||
|
obj = [torch.randn(3,)]
|
||
|
_send_object(obj, 2)
|
||
|
|
||
|
gpc.destroy()
|
||
|
torch.cuda.empty_cache()
|
||
|
|
||
|
|
||
|
@pytest.mark.dist
|
||
|
@rerun_if_address_is_in_use()
|
||
|
def test_object_list_p2p():
|
||
|
disable_existing_loggers()
|
||
|
run_func = partial(check_layer, world_size=world_size, port=free_port())
|
||
|
mp.spawn(run_func, nprocs=world_size)
|
||
|
|
||
|
|
||
|
if __name__ == '__main__':
|
||
|
test_object_list_p2p()
|