import torch from colossalai.legacy.context import ParallelMode from colossalai.legacy.core import global_context as gpc from colossalai.legacy.nn.layer.utils import CheckpointModule from colossalai.tensor import ColoParameter from colossalai.utils.model.utils import InsertPostInitMethodToModuleSubClasses from .layer_spec import LayerSpec from .utils import ( build_kwargs_for_module, call_module, customized_partition, exec_funcs_with_kwargs, partition_balanced, partition_uniform, ) class PipelinableContext(InsertPostInitMethodToModuleSubClasses): """ A context manager to split the model into pipeline stages. """ def __init__(self, policy: str = "balanced"): super().__init__() self._layer_spec_dict = {} self._root_children = None self._model = None self._layer_spec_list = [] self._func_dict = {} self._policy = policy @property def policy(self): return self._policy @policy.setter def policy(self, policy: str): self._policy = policy @property def layers_count(self): return len(self._layer_spec_list) @property def funcs_count(self): return len(self._func_dict) def _pre_context_exec(self): """ The Callback function when entering the context """ # reserve rng states self.cpu_rng_state = torch.get_rng_state() self.cuda_rng_state = torch.cuda.get_rng_state() def _post_context_exec(self): """ The callback function when exiting context. """ # reset rng states torch.set_rng_state(self.cpu_rng_state) torch.cuda.set_rng_state(self.cuda_rng_state) def _post_init_method(self, module: torch.nn.Module, *args, **kwargs): """ The function to call at the end of the constructor of each module. NOTE() The module may be passed to this function multiple times. """ # iterate over the positional arguments # to check if an argument is a torch Module # if found any torch Module, replace it with its layer spec # for storage purpose modified_args = [] for arg in args: if isinstance(arg, torch.nn.Module): # if nn.Module is an argument of a non-root module, then we should convert it to layer spec, which make sure the correct init method used in the real build. # if nn.Module is an argument of the root module, then we should just record the module instance itself, because those instance has been built outside of the context. if id(arg) in self._layer_spec_dict: arg = self._layer_spec_dict[id(arg)] modified_args.append(arg) # to the same for the keyword arguments modified_kwargs = {} for k, v in kwargs.items(): if isinstance(v, torch.nn.Module): v = self._layer_spec_dict[id(v)] # (lyl)TODO: analyze ColoTensor as well modified_kwargs[k] = v # keep track of the module children # as torch.nn.Module.__init__ is called from inner module to outer module, # the final value of self._model will be the outermost model # e.g. if the model is torchvision.models.resnet18, then the final value of self._model # will be the ``ResNet`` object. self._root_children = list(module.children()) self._model = module # store the children to keep the module hierarchy layer_spec = LayerSpec(module.__class__, *modified_args, **modified_kwargs) layer_spec.set_children(module.children()) # store the layer spec in this context module_id = id(module) self._layer_spec_dict[module_id] = layer_spec # convert all torch.nn.Parameter to colossalai.tensor.ColoParameter name_list = [] for name, param in module.named_parameters(): if isinstance(param, ColoParameter): continue name_list.append((name, param)) for name, param in name_list: if hasattr(module, name): delattr(module, name) setattr(module, name, ColoParameter.from_torch_tensor(tensor=param.data, requires_grad=param.requires_grad)) def to_layer_list(self, exec_seq=None): """ Create a layer spec list and func list with execution sequence given by user. If exec_seq is None, we will take the module initializing order as execution order. """ self._exec_seq = exec_seq if exec_seq is None: # if user do not provide the model executing sequence, we use the initialization order as the executing order. children_name = [] for child in self._root_children: layer_spec = self._layer_spec_dict[id(child)] if layer_spec.typename in ( torch.nn.modules.container.ModuleList, torch.nn.modules.container.Sequential, ): for child_in_container in layer_spec.children: self._layer_spec_list.append(self._layer_spec_dict[id(child_in_container)]) for name, module in self._model.named_modules(): if id(module) == id(child_in_container): children_name.append(name) break else: self._layer_spec_list.append(layer_spec) for name, module in self._model.named_modules(): if id(module) == id(child): children_name.append(name) break else: front_funcs_list = [] named_modules = dict(self._model.named_modules()) for index, element in enumerate(exec_seq): if isinstance(element, str): if element == "SPLIT_NODE": continue assert ( element in named_modules ), f"Found invalid module name {element}, please check if you spell the module name correctly." # get the layer spec based on the module ID module = named_modules[element] layer_spec = self._layer_spec_dict[id(module)] # check whether there are functions which should be executed before this module if len(front_funcs_list) != 0: func_key = (layer_spec, "front") if func_key not in self._func_dict: self._func_dict[func_key] = [] for f in front_funcs_list: self._func_dict[func_key].append(f) front_funcs_list = [] func_key = (layer_spec, "behind") self._layer_spec_list.append(layer_spec) elif isinstance(element, tuple) and element[1] == "front": front_funcs_list.append(element[0]) else: if func_key not in self._func_dict: self._func_dict[func_key] = [] if isinstance(element, tuple): self._func_dict[func_key].append(element[0]) else: self._func_dict[func_key].append(element) def partition(self, num_chunks, pipeline_size, rank): """ Partitioned model will be built respect to partition policy. The real module instance will be built in this method. """ if isinstance(self._policy, str): if self._policy == "uniform": parts = partition_uniform(len(self._layer_spec_list), pipeline_size, num_chunks)[rank] elif self._policy == "balanced": param_counts = [] for layer_spec in self._layer_spec_list: param_counts.append(layer_spec.count_params()) parts = partition_balanced(param_counts, pipeline_size, num_chunks)[rank] elif self._policy == "customized": assert ( self._exec_seq is not None ), f"An explicit exec_seq must be defined by user in customized policy mode." self.customized_parts = customized_partition(self._exec_seq) assert len(self.customized_parts) == gpc.get_world_size( ParallelMode.PIPELINE ), f"World size is {gpc.get_world_size(ParallelMode.PIPELINE)}, but the number of partitions is {len(self.customized_parts)}" parts = self.customized_parts[rank] else: raise ValueError("A string partition policy should be one of ['uniform', 'balanced', 'customized'].") elif isinstance(self._policy, dict): parts = self._policy[rank] else: raise ValueError("A partition policy should be either a string or a dictionary.") layers_to_build = [] for start, end in parts: layers_to_build += self._layer_spec_list[start:end] behind_func_dict_in_partition = {} front_func_dict_in_partition = {} module_list_in_partition = [] for layer in layers_to_build: module = layer.build() module_list_in_partition.append(module) if (layer, "front") in self._func_dict: front_func_dict_in_partition[id(module)] = self._func_dict[(layer, "front")] elif (layer, "behind") in self._func_dict: behind_func_dict_in_partition[id(module)] = self._func_dict[(layer, "behind")] module_list_in_partition = torch.nn.ModuleList(module_list_in_partition) pipeline_model = PipelinableModel( module_list_in_partition, front_func_dict_in_partition, behind_func_dict_in_partition ) return pipeline_model class PipelinableModel(torch.nn.Module): def __init__(self, module_list, front_func_dict, behind_func_dict): super().__init__() self._module_list = module_list self._front_func_dict = front_func_dict self._behind_func_dict = behind_func_dict def forward(self, *input_tensor, **kwargs): for module in self._module_list: if id(module) in self._front_func_dict: input_tensor = exec_funcs_with_kwargs(self._front_func_dict, id(module), input_tensor, kwargs) if isinstance(module, CheckpointModule): forward_func = module._forward else: forward_func = module.forward module_kwargs = build_kwargs_for_module(forward_func, input_tensor, kwargs) if input_tensor is None: input_tensor = call_module(module, kwargs=module_kwargs) elif isinstance(input_tensor, torch.Tensor): input_tensor = call_module(module, args=(input_tensor,), kwargs=module_kwargs) else: input_tensor = call_module(module, args=input_tensor, kwargs=module_kwargs) if id(module) in self._behind_func_dict: input_tensor = exec_funcs_with_kwargs(self._behind_func_dict, id(module), input_tensor, kwargs) return input_tensor