import argparse import os import warnings import torch import torch.distributed.rpc as rpc import torch.multiprocessing as mp from torch import nn from torch._C._distributed_rpc import _is_current_rpc_agent_set from colossalai.legacy import launch from colossalai.legacy.pipeline.pipeline_process_group import ppg from colossalai.logging import disable_existing_loggers rpc_is_initialized = _is_current_rpc_agent_set def color_debug(text, prefix=" ", color="blue"): color = color.upper() print(getattr(Back, color), prefix, Style.RESET_ALL, text) class MLP(nn.Module): def __init__(self, dim: int, layers: int): super().__init__() self.layers = torch.nn.ModuleList() for _ in range(layers): self.layers.append(nn.Linear(dim, dim, bias=False)) def forward(self, x): for layer in self.layers: x = layer(x) return x.sum() class DAG_MLP(nn.Module): def __init__(self, dim: int, layers: int): super().__init__() self.layers = torch.nn.ModuleList() self.dag_layer = nn.Linear(dim, dim, bias=False) for _ in range(layers): self.layers.append(nn.Linear(dim, dim, bias=False)) def forward(self, x, y): for layer in self.layers: x = layer(x) y = self.dag_layer(y) return x.sum(), y.sum() class RpcTestModel(nn.Module): def __init__(self, stage_id, actual_stage_num, feat_num, h) -> None: super().__init__() self.rank = stage_id self.is_last_rank = stage_id == actual_stage_num - 1 self.linear_name = f"linear_{stage_id}" if stage_id == 0: linear = nn.Linear(feat_num, h) elif stage_id == actual_stage_num - 1: linear = nn.Linear(h, 1) else: linear = nn.Linear(h, h) setattr(self, self.linear_name, linear) def forward(self, x) -> torch.Tensor: linear: nn.Module = getattr(self, self.linear_name) out: torch.Tensor = linear(x) if self.is_last_rank: out = out.sum() return out def parse_args(): parser = argparse.ArgumentParser() parser.add_argument("--epoch", type=int, default=1) parser.add_argument("--world_size", type=int, default=2) parser.add_argument("--batch_size", type=int, default=16) parser.add_argument("--dp_degree", type=int, default=1) parser.add_argument("--tp_degree", type=int, default=1) parser.add_argument("--num_microbatches", type=int, default=2) parser.add_argument("--chunk", type=int, default=1) parser.add_argument("--use_checkpoint", action="store_true") parser.add_argument("--optimizer", type=str, choices=["SGD", "Adam", "RMSprop"], default="SGD") parser.add_argument("--device", type=str, choices=["cpu", "cuda"], default="cuda") parser.add_argument("--master_addr", type=str, default="localhost") parser.add_argument("--master_port", type=str, default="29020") parser.add_argument("--num_worker_threads", type=str, default=128) return parser.parse_args() def pg_parse_args(): parser = argparse.ArgumentParser() parser.add_argument("--world_size", type=int, default=4) parser.add_argument("--dp_degree", type=int, default=2) parser.add_argument("--tp_degree", type=int, default=1) parser.add_argument("--chunk", type=int, default=1) parser.add_argument("--num_worker_threads", type=str, default=128) parser.add_argument("--device", type=str, choices=["cpu", "cuda"], default="cuda") parser.add_argument("--master_addr", type=str, default="localhost") parser.add_argument("--master_port", type=str, default="29020") return parser.parse_args() def run_worker(rank, args, master_func): os.environ["MASTER_ADDR"] = args.master_addr os.environ["MASTER_PORT"] = args.master_port device = args.device world_size = args.world_size dp_degree = args.dp_degree tp_degree = args.tp_degree num_worker_threads = args.num_worker_threads host = args.master_addr port = args.master_port backend = "nccl" if device == "cuda" else "gloo" disable_existing_loggers() launch(dict(), rank, world_size, host, int(port), backend, verbose=False) ppg.set_global_info( rank=rank, world_size=world_size, dp_degree=dp_degree, tp_degree=tp_degree, num_worker_threads=num_worker_threads, device=device, ) # in rpc mode, only rank 0 is needed to be coded if rank == 0: master_func(args) # barrier here if rpc_is_initialized(): rpc.shutdown() else: warnings.warn("RPC has not been initialized") def rpc_run(args, master_func): world_size = args.world_size assert args.num_microbatches >= args.world_size, "num_microbatches cannot be fewer than world_size!" mp.spawn(run_worker, args=(args, master_func), nprocs=world_size)