mirror of https://github.com/hpcaitech/ColossalAI
import argparse
import os
import warnings
from typing import Any, Callable, Dict, List, Tuple, Type, Union

import torch
import torch.distributed.rpc as rpc
import torch.multiprocessing as mp
from torch._C._distributed_rpc import _is_current_rpc_agent_set
from torch.futures import Future

from colossalai.initialize import launch
from colossalai.pipeline.pipeline_process_group import ppg


def pyobj_map(obj: Any, fn: Callable, process_types: Union[Type, Tuple[Type]] = ()) -> Any:
    """Apply `fn` to every element of `obj` whose type is in `process_types`.

    Unlike `pytree_map`, the type check happens before recursing into
    dicts/tuples/lists, and unmatched leaves are returned unchanged.
    """
    if isinstance(obj, process_types):
        return fn(obj)
    elif type(obj) is dict:
        return {k: pyobj_map(obj[k], fn, process_types) for k in obj}
    elif type(obj) is tuple:
        return tuple(pyobj_map(o, fn, process_types) for o in obj)
    elif type(obj) is list:
        return list(pyobj_map(o, fn, process_types) for o in obj)
    else:
        return obj
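# Illustrative usage (not in the original file): cast every tensor leaf in a nested
# structure to half precision while leaving all other leaves untouched, e.g.
#   data = {'x': torch.ones(2), 'meta': [1, 'tag']}
#   pyobj_map(data, fn=lambda t: t.half(), process_types=torch.Tensor)
#   # -> {'x': tensor([1., 1.], dtype=torch.float16), 'meta': [1, 'tag']}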


def pytree_map(obj: Any, fn: Callable, process_types: Union[Type, Tuple[Type]] = (), map_all: bool = False) -> Any:
    """Process an object recursively, like a pytree.

    Args:
        obj (:class:`Any`): object to process
        fn (:class:`Callable`): a function applied to each matching sub-object in `obj`
        process_types (:class:`type | tuple[type]`): the leaf types that `fn` is applied to
        map_all (:class:`bool`): if True, apply `fn` to every leaf regardless of its type

    Returns:
        :class:`Any`: an object with the same structure as `obj`, in which every leaf of a
        type in `process_types` (or every leaf, if `map_all` is True) has been mapped by `fn`
    """
    if isinstance(obj, dict):
        return {k: pytree_map(obj[k], fn, process_types, map_all) for k in obj}
    elif isinstance(obj, tuple):
        return tuple(pytree_map(o, fn, process_types, map_all) for o in obj)
    elif isinstance(obj, list):
        return list(pytree_map(o, fn, process_types, map_all) for o in obj)
    elif isinstance(obj, process_types):
        return fn(obj)
    else:
        return fn(obj) if map_all else obj
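# Illustrative usage (not in the original file): with process_types=torch.Tensor only
# tensor leaves are touched; with map_all=True every leaf goes through `fn`, e.g.
#   batch = {'input': torch.randn(4, 8), 'label': torch.randint(0, 2, (4,))}
#   pytree_map(batch, fn=lambda t: t.shape, process_types=torch.Tensor)
#   # -> {'input': torch.Size([4, 8]), 'label': torch.Size([4])}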


def tensor_shape_list(obj):
    # Replace every tensor in `obj` with its shape, keeping the container structure.
    return pytree_map(obj, fn=lambda x: x.shape, process_types=torch.Tensor)


def get_batch_lengths(batch):
    # Collect len(x) of every tensor in `batch` (typically the batch dimension).
    lengths = []
    pytree_map(batch, fn=lambda x: lengths.append(len(x)), process_types=torch.Tensor)
    return lengths


def split_batch(batch: Any, start, stop, device: str):
    # Slice every tensor in `batch` along dim 0 to [start, stop), moving it to GPU if requested.
    if device == 'cuda':
        fn = lambda x: x[start:stop].cuda()
    else:
        fn = lambda x: x[start:stop]
    return pytree_map(batch, fn=fn, process_types=torch.Tensor)
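# Illustrative usage (not in the original file): a caller can carve a batch into
# microbatches by slicing successive [start, stop) ranges, e.g. for 16 samples
# split into two microbatches on CPU:
#   micro0 = split_batch(batch, 0, 8, device='cpu')
#   micro1 = split_batch(batch, 8, 16, device='cpu')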


def type_detail(obj):
    # Replace every leaf with its type; useful for debugging nested structures.
    return pytree_map(obj, lambda x: type(x), map_all=True)


def pytree_filter(fn, obj, process_types):
    # Collect every leaf of a type in `process_types` for which `fn` returns True.
    if obj is None:
        return None

    filters = []

    def condition_append(obj):
        if fn(obj):
            filters.append(obj)

    pytree_map(obj, fn=condition_append, process_types=process_types)
    return filters
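# Illustrative usage (not in the original file): collect every tensor leaf that
# requires a gradient from an arbitrarily nested output structure, e.g.
#   grad_tensors = pytree_filter(lambda t: t.requires_grad, outputs, process_types=torch.Tensor)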


def get_real_args_kwargs(args_or_kwargs):
    # Wait on any Future placeholders so that downstream code sees real values.
    args_or_kwargs = pytree_map(args_or_kwargs, fn=lambda x: x.wait(), process_types=Future)
    # TODO : combine producer and consumer
    # by default, merge all args in the output args or kwargs
    if args_or_kwargs is not None:
        if isinstance(args_or_kwargs, dict):
            pass
        else:
            flatten_args = []
            pytree_map(args_or_kwargs, fn=lambda x: flatten_args.append(x), map_all=True)
            args_or_kwargs = flatten_args

    return args_or_kwargs
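# Illustrative usage (not in the original file): upstream stages may hand over
# torch.futures.Future placeholders; this helper waits on them and flattens
# positional args, e.g.
#   get_real_args_kwargs([fut_a, fut_b])  # -> [fut_a.wait(), fut_b.wait()]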


def run_worker(rank, args, master_func):
    os.environ['MASTER_ADDR'] = args.master_addr
    os.environ['MASTER_PORT'] = args.master_port

    device = args.device
    world_size = args.world_size
    dp_degree = args.dp_degree
    tp_degree = args.tp_degree
    num_worker_threads = args.num_worker_threads
    host = args.master_addr
    port = args.master_port
    backend = 'nccl' if device == 'cuda' else 'gloo'

    launch(dict(), rank, world_size, host, int(port), backend, verbose=False)
    ppg.set_global_info(rank=rank,
                        world_size=world_size,
                        dp_degree=dp_degree,
                        tp_degree=tp_degree,
                        num_worker_threads=num_worker_threads,
                        device=device)
    ppg.args = args
    # in RPC mode, only rank 0 needs to run the master function
    if rank == 0:
        master_func(args)
    # barrier here: rpc.shutdown() blocks until all outstanding RPC work is done
    if _is_current_rpc_agent_set():
        rpc.shutdown()
    else:
        warnings.warn("RPC has not been initialized")


def rpc_run(args, master_func):
    # Spawn one process per rank; every process enters run_worker.
    world_size = args.world_size
    mp.spawn(run_worker, args=(args, master_func), nprocs=world_size)


def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument('--epoch', type=int, default=1)
    parser.add_argument('--world_size', type=int, default=2)
    parser.add_argument('--batch_size', type=int, default=16)
    parser.add_argument('--dp_degree', type=int, default=1)
    parser.add_argument('--tp_degree', type=int, default=1)
    parser.add_argument('--num_microbatches', type=int, default=2)
    parser.add_argument('--chunk', type=int, default=1)
    parser.add_argument('--use_checkpoint', action='store_true')
    parser.add_argument('--optimizer', type=str, choices=['SGD', 'Adam', 'RMSprop'], default='SGD')
    parser.add_argument('--device', type=str, choices=['cpu', 'cuda'], default='cuda')
    parser.add_argument('--master_addr', type=str, default='localhost')
    parser.add_argument('--master_port', type=str, default='29020')
    parser.add_argument('--num_worker_threads', type=int, default=128)
    return parser.parse_args()
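# Illustrative usage (not in the original file): a driver script built on this module
# would define a master function (hypothetical here) and launch one worker per rank:
#
#   def master_func(args):
#       ...  # build the pipeline model and train; runs on rank 0 only
#
#   if __name__ == '__main__':
#       rpc_run(parse_args(), master_func)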