diff --git a/colossalai/auto_parallel/solver/op_handler/node_handler.py b/colossalai/auto_parallel/solver/op_handler/node_handler.py
index ba13591a6..e667049db 100644
--- a/colossalai/auto_parallel/solver/op_handler/node_handler.py
+++ b/colossalai/auto_parallel/solver/op_handler/node_handler.py
@@ -5,6 +5,7 @@ from colossalai.tensor.shape_consistency import ShapeConsistencyManager
 from typing import Dict, List, Union
 from ..sharding_strategy import ShardingStrategy_V2, StrategiesVector, OperationData, TrainCycleItem
 from ..strategy import StrategyGenerator_V2
+from .._utils import generate_resharding_costs


 class NodeHandler(ABC):
@@ -52,19 +53,22 @@ class NodeHandler(ABC):
             # create data structure to store costs
             if op_data not in resharding_costs:
-                resharding_costs[op_data] = {}
+                resharding_costs[node] = []

             # for each sharding spec generated by the predecessor's node handler,
             # compute the resharding cost to switch to the sharding spec generated
             # by the current node handler
             for prev_sharding_spec in prev_sharding_specs:
-                fwd_cost = shape_consistency_manager.shape_consistency(prev_sharding_spec, current_sharding_spec)
-                bwd_cost = shape_consistency_manager.shape_consistency(current_sharding_spec, prev_sharding_spec)
-                resharding_cost = TrainCycleItem(fwd=fwd_cost, bwd=bwd_cost, total=fwd_cost + bwd_cost)
-                resharding_costs[op_data][prev_sharding_spec] = resharding_cost
+                _, _, resharding_cost = shape_consistency_manager.shape_consistency(prev_sharding_spec,
+                                                                                    current_sharding_spec)
+                resharding_cost = TrainCycleItem(fwd=resharding_cost["forward"],
+                                                 bwd=resharding_cost["backward"],
+                                                 total=resharding_cost["total"])
+                resharding_costs[node].append(resharding_cost)
         strategy.resharding_costs = resharding_costs
+        return strategy

-    def register_strategy(self, compute_resharding_cost: bool = False) -> StrategiesVector:
+    def register_strategy(self, compute_resharding_cost: bool = True) -> StrategiesVector:
         """
         Register different sharding strategies for the current node.
         """
@@ -86,7 +90,8 @@ class NodeHandler(ABC):
         # compute the resharding costs based on the previous node
         # strategies if specified
         if compute_resharding_cost:
-            post_processed_strategies = list(map(self.update_resharding_cost, post_processed_strategies))
+            updated_strategies = map(self.update_resharding_cost, strategies)
+            strategies = list(updated_strategies)

         self.strategies_vector.extend(post_processed_strategies)
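To make the refactor above concrete: `shape_consistency` returns a 3-tuple whose last element is a cost dict keyed by `"forward"`, `"backward"`, and `"total"` (the first two elements, ignored here, describe the transform path). A minimal hedged sketch of how that dict becomes a `TrainCycleItem`, as `update_resharding_cost` now does; `src_spec` and `dst_spec` stand for two `ShardingSpec` objects of the same tensor:

```python
from colossalai.tensor.shape_consistency import ShapeConsistencyManager
from colossalai.auto_parallel.solver.sharding_strategy import TrainCycleItem


def compute_resharding_cost(src_spec, dst_spec):
    """Sketch only: pack shape-consistency costs into a TrainCycleItem."""
    manager = ShapeConsistencyManager()
    # the transform path and comm action sequence are discarded, as in the diff
    _, _, cost_dict = manager.shape_consistency(src_spec, dst_spec)
    return TrainCycleItem(fwd=cost_dict["forward"],
                          bwd=cost_dict["backward"],
                          total=cost_dict["total"])
```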
+ """ + + def get_strategy_generator(self) -> List[StrategyGenerator_V2]: + op_data_mapping = self.get_operation_data_mapping() + generators = [] + generators.append(OutputGenerator(op_data_mapping, self.device_mesh, self.predecessor_node)) + return generators + + def get_operation_data_mapping(self) -> Dict[str, OperationData]: + # use transposed shape for strategies + # the strategies will be transformed back to its original shape in self.post_process + dummy_output = torch.empty(1,).to("meta") + physical_output = OperationData(name=str(self.node), type=OperationDataType.OUTPUT, data=dummy_output) + + mapping = {"output": physical_output} + for index, input_node in enumerate(self.predecessor_node): + if not hasattr(input_node, "_meta_data"): + print(input_node.name) + physical_inputs = OperationData(name=str(input_node), + type=OperationDataType.ARG, + data=input_node._meta_data) + name_key = f'input_{index}' + mapping[name_key] = physical_inputs + + return mapping diff --git a/colossalai/auto_parallel/solver/op_handler/placeholder_handler.py b/colossalai/auto_parallel/solver/op_handler/placeholder_handler.py new file mode 100644 index 000000000..4c4f0a83a --- /dev/null +++ b/colossalai/auto_parallel/solver/op_handler/placeholder_handler.py @@ -0,0 +1,30 @@ +import torch +from .node_handler import NodeHandler +from ..sharding_strategy import ShardingStrategy_V2, OperationDataType, OperationData +from colossalai.auto_parallel.solver.strategy import StrategyGenerator_V2 +from colossalai.auto_parallel.solver.strategy.placeholder_generator import PlaceholderGenerator +from typing import List, Dict +from .registry import operator_registry + +__all__ = ['PlacehodlerHandler'] + + +class PlacehodlerHandler(NodeHandler): + """ + A PlacehodlerHandler which deals with the sharding strategies for Placeholder Node. 
+ """ + + def get_strategy_generator(self) -> List[StrategyGenerator_V2]: + op_data_mapping = self.get_operation_data_mapping() + generators = [] + generators.append(PlaceholderGenerator(op_data_mapping, self.device_mesh)) + return generators + + def get_operation_data_mapping(self) -> Dict[str, OperationData]: + # use transposed shape for strategies + # the strategies will be transformed back to its original shape in self.post_process + physical_output = OperationData(name=str(self.node), type=OperationDataType.OUTPUT, data=self.node._meta_data) + + mapping = {"output": physical_output} + + return mapping diff --git a/colossalai/auto_parallel/solver/sharding_strategy.py b/colossalai/auto_parallel/solver/sharding_strategy.py index 64e0ea779..990710aef 100644 --- a/colossalai/auto_parallel/solver/sharding_strategy.py +++ b/colossalai/auto_parallel/solver/sharding_strategy.py @@ -129,7 +129,7 @@ class ShardingStrategy_V2: communication_cost: TrainCycleItem = None memory_cost: TrainCycleItem = None communication_actions: Dict[OperationData, CommSpec] = None - resharding_costs: Dict[OperationData, Dict[ShardingSpec, TrainCycleItem]] = None + resharding_costs: Dict[Node, List[TrainCycleItem]] = None @property def input_sharding_specs(self) -> Dict[OperationData, ShardingSpec]: diff --git a/colossalai/auto_parallel/solver/strategy/output_generator.py b/colossalai/auto_parallel/solver/strategy/output_generator.py new file mode 100644 index 000000000..a6cde6b64 --- /dev/null +++ b/colossalai/auto_parallel/solver/strategy/output_generator.py @@ -0,0 +1,59 @@ +import operator +from functools import reduce +from ..sharding_strategy import ShardingStrategy_V2, TrainCycleItem, MemoryCost +from colossalai.tensor.shape_consistency import CollectiveCommPattern +from .strategy_generator import OutputStrategyGenerator +from typing import List +from .._utils import exception_handler +import copy + +__all__ = ['OutputGenerator'] + + +class OutputGenerator(OutputStrategyGenerator): + """ + OutputGenerator is a generic class to generate strategies for Output Node. + """ + + def validate(self) -> bool: + return super().validate() + + def update_compute_cost(self, strategy: ShardingStrategy_V2): + compute_cost = TrainCycleItem(fwd=10, bwd=10, total=20) + strategy.compute_cost = compute_cost + + def update_memory_cost(self, strategy: ShardingStrategy_V2): + ''' + Compute the memory cost per device with this specific strategy. 
+ ''' + fwd_mem_cost = MemoryCost(activation=0, parameter=0) + + bwd_mem_cost = MemoryCost(activation=0, parameter=0) + + # compute total cost + total_mem_cost = MemoryCost(activation=0, parameter=0) + memory_cost = TrainCycleItem(fwd=fwd_mem_cost, bwd=bwd_mem_cost, total=total_mem_cost) + strategy.memory_cost = memory_cost + + def generate(self): + dim_partition_dict_mapping = { + "output": {}, + } + for index, _ in enumerate(self.predecessor_nodes): + mapping_name = f"input_{index}" + dim_partition_dict_mapping[mapping_name] = {} + + communication_action_mapping = {} + sharding_spec_mapping = self.to_sharding_spec_mapping(dim_partition_dict_mapping) + + name = f'Replica Output' + + strategy = self.get_sharding_strategy(name=name, + sharding_spec_mapping=sharding_spec_mapping, + communication_action_mapping=communication_action_mapping) + + self.update_communication_cost(strategy) + self.update_compute_cost(strategy) + self.update_memory_cost(strategy) + + return [strategy] diff --git a/colossalai/auto_parallel/solver/strategy/placeholder_generator.py b/colossalai/auto_parallel/solver/strategy/placeholder_generator.py new file mode 100644 index 000000000..b5c65e615 --- /dev/null +++ b/colossalai/auto_parallel/solver/strategy/placeholder_generator.py @@ -0,0 +1,60 @@ +import operator +from functools import reduce +from ..sharding_strategy import ShardingStrategy_V2, TrainCycleItem, MemoryCost +from colossalai.tensor.shape_consistency import CollectiveCommPattern +from .strategy_generator import StrategyGenerator_V2 +from typing import List +from .._utils import exception_handler +import copy + +__all__ = ['PlaceholderGenerator'] + + +class PlaceholderGenerator(StrategyGenerator_V2): + """ + PlaceholderGenerator is a generic class to generate strategies for placeholder node. + """ + + def validate(self) -> bool: + return super().validate() + + def update_compute_cost(self, strategy: ShardingStrategy_V2): + compute_cost = TrainCycleItem(fwd=10, bwd=10, total=20) + strategy.compute_cost = compute_cost + + def update_memory_cost(self, strategy: ShardingStrategy_V2): + ''' + Compute the memory cost per device with this specific strategy. 
+ ''' + forward_size_mapping = {'output': self._compute_size_in_bytes(strategy, "output")} + + # compute fwd cost incurred + # fwd_cost = output + fwd_activation_cost = sum([v for k, v in forward_size_mapping.items()]) + fwd_mem_cost = MemoryCost(activation=fwd_activation_cost, parameter=0) + + bwd_mem_cost = MemoryCost(activation=0, parameter=0) + + # compute total cost + total_mem_cost = MemoryCost(activation=fwd_activation_cost, parameter=0) + memory_cost = TrainCycleItem(fwd=fwd_mem_cost, bwd=bwd_mem_cost, total=total_mem_cost) + strategy.memory_cost = memory_cost + + def generate(self): + dim_partition_dict_mapping = { + "output": {}, + } + communication_action_mapping = {} + sharding_spec_mapping = self.to_sharding_spec_mapping(dim_partition_dict_mapping) + + name = f'Replica Placeholder' + + strategy = self.get_sharding_strategy(name=name, + sharding_spec_mapping=sharding_spec_mapping, + communication_action_mapping=communication_action_mapping) + + self.update_communication_cost(strategy) + self.update_compute_cost(strategy) + self.update_memory_cost(strategy) + + return [strategy] diff --git a/colossalai/auto_parallel/solver/strategy/strategy_generator.py b/colossalai/auto_parallel/solver/strategy/strategy_generator.py index 5bf3a8327..ec7d96298 100644 --- a/colossalai/auto_parallel/solver/strategy/strategy_generator.py +++ b/colossalai/auto_parallel/solver/strategy/strategy_generator.py @@ -169,3 +169,15 @@ class FollowingStrategyGenerator(StrategyGenerator_V2): self.op_data = operation_data_mapping self.device_mesh = device_mesh self.predecessor_node = predecessor_node + + +class OutputStrategyGenerator(StrategyGenerator_V2): + """ + OutputStrategyGenerator is used to generate the sharding strategies for Output Node. + """ + + def __init__(self, operation_data_mapping: Dict[str, OperationData], device_mesh: DeviceMesh, + predecessor_nodes: List[Node]): + self.op_data = operation_data_mapping + self.device_mesh = device_mesh + self.predecessor_nodes = predecessor_nodes diff --git a/tests/test_auto_parallel/test_node_handler/test_batch_norm_handler_v2.py b/tests/test_auto_parallel/test_node_handler/test_batch_norm_handler_v2.py index 8b33431de..e0d98c758 100644 --- a/tests/test_auto_parallel/test_node_handler/test_batch_norm_handler_v2.py +++ b/tests/test_auto_parallel/test_node_handler/test_batch_norm_handler_v2.py @@ -58,7 +58,7 @@ def test_bn_module_handler(): assert mapping['output'].data.shape == torch.Size([4, 16, 64, 64]) assert mapping['output'].type == OperationDataType.OUTPUT - strategies_vector = handler.register_strategy() + strategies_vector = handler.register_strategy(compute_resharding_cost=False) strategy_name_list = [val.name for val in strategies_vector] # RS = RS x S diff --git a/tests/test_auto_parallel/test_node_handler/test_bmm_handler.py b/tests/test_auto_parallel/test_node_handler/test_bmm_handler.py index 75988e5b8..dfe50a0e7 100644 --- a/tests/test_auto_parallel/test_node_handler/test_bmm_handler.py +++ b/tests/test_auto_parallel/test_node_handler/test_bmm_handler.py @@ -68,7 +68,7 @@ def test_2d_device_mesh(module): assert mapping['output'].data.shape == torch.Size([4, 8, 8]) assert mapping['output'].type == OperationDataType.OUTPUT - strategies_vector = handler.register_strategy() + strategies_vector = handler.register_strategy(compute_resharding_cost=False) strategy_name_list = [val.name for val in strategies_vector] # one batch dim @@ -138,7 +138,7 @@ def test_1d_device_mesh(module): assert mapping['output'].data.shape == torch.Size([4, 8, 8]) assert 
diff --git a/tests/test_auto_parallel/test_node_handler/test_batch_norm_handler_v2.py b/tests/test_auto_parallel/test_node_handler/test_batch_norm_handler_v2.py
index 8b33431de..e0d98c758 100644
--- a/tests/test_auto_parallel/test_node_handler/test_batch_norm_handler_v2.py
+++ b/tests/test_auto_parallel/test_node_handler/test_batch_norm_handler_v2.py
@@ -58,7 +58,7 @@ def test_bn_module_handler():
     assert mapping['output'].data.shape == torch.Size([4, 16, 64, 64])
     assert mapping['output'].type == OperationDataType.OUTPUT

-    strategies_vector = handler.register_strategy()
+    strategies_vector = handler.register_strategy(compute_resharding_cost=False)
     strategy_name_list = [val.name for val in strategies_vector]

     # RS = RS x S
diff --git a/tests/test_auto_parallel/test_node_handler/test_bmm_handler.py b/tests/test_auto_parallel/test_node_handler/test_bmm_handler.py
index 75988e5b8..dfe50a0e7 100644
--- a/tests/test_auto_parallel/test_node_handler/test_bmm_handler.py
+++ b/tests/test_auto_parallel/test_node_handler/test_bmm_handler.py
@@ -68,7 +68,7 @@ def test_2d_device_mesh(module):
     assert mapping['output'].data.shape == torch.Size([4, 8, 8])
     assert mapping['output'].type == OperationDataType.OUTPUT

-    strategies_vector = handler.register_strategy()
+    strategies_vector = handler.register_strategy(compute_resharding_cost=False)
     strategy_name_list = [val.name for val in strategies_vector]

     # one batch dim
@@ -138,7 +138,7 @@ def test_1d_device_mesh(module):
     assert mapping['output'].data.shape == torch.Size([4, 8, 8])
     assert mapping['output'].type == OperationDataType.OUTPUT

-    strategies_vector = handler.register_strategy()
+    strategies_vector = handler.register_strategy(compute_resharding_cost=False)
     strategy_name_list = [val.name for val in strategies_vector]
     assert len(strategy_name_list) == 1
     # one batch dim
diff --git a/tests/test_auto_parallel/test_node_handler/test_conv_handler_v2.py b/tests/test_auto_parallel/test_node_handler/test_conv_handler_v2.py
index 8fb21e91d..56bae372a 100644
--- a/tests/test_auto_parallel/test_node_handler/test_conv_handler_v2.py
+++ b/tests/test_auto_parallel/test_node_handler/test_conv_handler_v2.py
@@ -58,7 +58,7 @@ def test_conv_module_handler():
     assert mapping['output'].data.shape == torch.Size([4, 16, 64, 64])
     assert mapping['output'].type == OperationDataType.OUTPUT

-    strategies_vector = handler.register_strategy()
+    strategies_vector = handler.register_strategy(compute_resharding_cost=False)
     strategy_name_list = [val.name for val in strategies_vector]

     # SS = SR x RS
@@ -165,7 +165,7 @@ def test_conv_function_handler():
     assert mapping['output'].data.shape == torch.Size([4, 16, 64, 64])
     assert mapping['output'].type == OperationDataType.OUTPUT

-    handler.register_strategy()
+    handler.register_strategy(compute_resharding_cost=False)
     strategy_name_list = [val.name for val in strategies_vector]

     # SS = SR x RS
diff --git a/tests/test_auto_parallel/test_node_handler/test_getitem_handler.py b/tests/test_auto_parallel/test_node_handler/test_getitem_handler.py
index c4ef16fc1..08877eb25 100644
--- a/tests/test_auto_parallel/test_node_handler/test_getitem_handler.py
+++ b/tests/test_auto_parallel/test_node_handler/test_getitem_handler.py
@@ -47,13 +47,13 @@ def test_getitem_function_handler():
     conv_handler = ConvFunctionHandler(node=conv_mod_node,
                                        device_mesh=device_mesh,
                                        strategies_vector=conv_strategies_vector)
-    conv_handler.register_strategy()
+    conv_handler.register_strategy(compute_resharding_cost=False)
     setattr(conv_mod_node, 'strategies_vector', conv_strategies_vector)
     getitem_handler = GetItemHandler(node=getitem_mod_node,
                                      device_mesh=device_mesh,
                                      strategies_vector=getitem_strategies_vector)

-    getitem_handler.register_strategy()
+    getitem_handler.register_strategy(compute_resharding_cost=False)

     # check operation data mapping
     mapping = getitem_handler.get_operation_data_mapping()
diff --git a/tests/test_auto_parallel/test_node_handler/test_layer_norm_handler_v2.py b/tests/test_auto_parallel/test_node_handler/test_layer_norm_handler_v2.py
index 628ee51ba..9bb7882bd 100644
--- a/tests/test_auto_parallel/test_node_handler/test_layer_norm_handler_v2.py
+++ b/tests/test_auto_parallel/test_node_handler/test_layer_norm_handler_v2.py
@@ -58,7 +58,7 @@ def test_ln_module_handler():
    assert mapping['output'].data.shape == torch.Size([4, 16])
    assert mapping['output'].type == OperationDataType.OUTPUT

-    strategies_vector = handler.register_strategy()
+    strategies_vector = handler.register_strategy(compute_resharding_cost=False)
     strategy_name_list = [val.name for val in strategies_vector]

     # SR = SR x R
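A note on the recurring test change: with the new default (`compute_resharding_cost=True`), `register_strategy()` reads sharding specs from each predecessor's `strategies_vector`, which these single-handler unit tests never populate, so they now opt out explicitly. A sketch of the pattern, presumably the reason behind every edit above:

```python
# In isolation tests the predecessor nodes carry no registered strategies,
# so resharding-cost computation would have nothing to compare against.
strategies_vector = handler.register_strategy(compute_resharding_cost=False)
```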
diff --git a/tests/test_auto_parallel/test_node_handler/test_linear_handler_v2.py b/tests/test_auto_parallel/test_node_handler/test_linear_handler_v2.py
index 7ef8b9e68..059e661da 100644
--- a/tests/test_auto_parallel/test_node_handler/test_linear_handler_v2.py
+++ b/tests/test_auto_parallel/test_node_handler/test_linear_handler_v2.py
@@ -57,7 +57,7 @@ def test_linear_module_handler():
     assert mapping['output'].type == OperationDataType.OUTPUT
     assert mapping['output'].logical_shape == torch.Size([16, 32])

-    strategies_vector = handler.register_strategy()
+    strategies_vector = handler.register_strategy(compute_resharding_cost=False)
     strategy_name_list = [val.name for val in strategies_vector]
     # one strategy will be converted to different physical sharding spec
     assert len(strategy_name_list) > 8
@@ -138,7 +138,7 @@ def test_linear_function_handler():
     assert mapping['output'].data.shape == torch.Size([4, 32])
     assert mapping['output'].type == OperationDataType.OUTPUT

-    strategies_vector = handler.register_strategy()
+    strategies_vector = handler.register_strategy(compute_resharding_cost=False)
     strategy_name_list = [val.name for val in strategies_vector]
     # one strategy will be converted to different physical sharding spec
     assert len(strategy_name_list) > 8
diff --git a/tests/test_auto_parallel/test_node_handler/test_output_handler.py b/tests/test_auto_parallel/test_node_handler/test_output_handler.py
new file mode 100644
index 000000000..48fd4d2c5
--- /dev/null
+++ b/tests/test_auto_parallel/test_node_handler/test_output_handler.py
@@ -0,0 +1,57 @@
+import torch
+import torch.nn as nn
+from colossalai.fx import ColoTracer, ColoGraphModule
+from colossalai.auto_parallel.solver.op_handler.output_handler import OuputHandler
+from colossalai.auto_parallel.solver.sharding_strategy import OperationData, OperationDataType, StrategiesVector
+from colossalai.device.device_mesh import DeviceMesh
+
+
+class OutputModel(nn.Module):
+
+    def __init__(self):
+        super().__init__()
+
+    def forward(self, x):
+        y = x * 2
+        return x, y
+
+
+def test_output_handler():
+    model = OutputModel()
+    tracer = ColoTracer()
+    # graph():
+    #     %x : torch.Tensor [#users=2] = placeholder[target=x]
+    #     %mul : [#users=1] = call_function[target=operator.mul](args = (%x, 2), kwargs = {})
+    #     return (x, mul)
+    graph = tracer.trace(model, meta_args={
+        "x": torch.rand(4, 4, 64, 64).to('meta'),
+    })
+    gm = ColoGraphModule(model, graph)
+    physical_mesh_id = torch.arange(0, 4)
+
+    mesh_shape = (2, 2)
+    device_mesh = DeviceMesh(physical_mesh_id, mesh_shape)
+    output_node = list(graph.nodes)[2]
+    output_strategies_vector = StrategiesVector(output_node)
+
+    # build handler
+    output_handler = OuputHandler(node=output_node, device_mesh=device_mesh, strategies_vector=output_strategies_vector)
+
+    output_handler.register_strategy(compute_resharding_cost=False)
+    # check operation data mapping
+    mapping = output_handler.get_operation_data_mapping()
+
+    for name, op_data in mapping.items():
+        op_data: OperationData
+        # make sure they have valid values
+        assert op_data.data is not None
+
+    assert mapping['output'].name == "output"
+    assert mapping['output'].data.is_meta
+    assert mapping['output'].type == OperationDataType.OUTPUT
+    strategy_name_list = [val.name for val in output_handler.strategies_vector]
+    assert "Replica Output" in strategy_name_list
+
+
+if __name__ == '__main__':
+    test_output_handler()
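Taken together, the two new handlers let a strategies constructor cover the remaining node `op` kinds in an FX graph. A hedged sketch of the dispatch; the surrounding constructor loop and its local names (`node`, `device_mesh`, `strategies_vector`) are assumed, not part of this diff:

```python
# Sketch: route placeholder/output nodes to the new handlers inside a
# hypothetical per-node loop of the strategies constructor.
if node.op == 'placeholder':
    handler = PlacehodlerHandler(node=node,
                                 device_mesh=device_mesh,
                                 strategies_vector=strategies_vector)
elif node.op == 'output':
    handler = OuputHandler(node=node,
                           device_mesh=device_mesh,
                           strategies_vector=strategies_vector)
handler.register_strategy()
```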
diff --git a/tests/test_auto_parallel/test_node_handler/test_placeholder_handler.py b/tests/test_auto_parallel/test_node_handler/test_placeholder_handler.py
new file mode 100644
index 000000000..68f9aff14
--- /dev/null
+++ b/tests/test_auto_parallel/test_node_handler/test_placeholder_handler.py
@@ -0,0 +1,58 @@
+import torch
+import torch.nn as nn
+from colossalai.fx import ColoTracer, ColoGraphModule
+from colossalai.auto_parallel.solver.op_handler.placeholder_handler import PlacehodlerHandler
+from colossalai.auto_parallel.solver.sharding_strategy import OperationData, OperationDataType, StrategiesVector
+from colossalai.device.device_mesh import DeviceMesh
+
+
+class PlaceholderModel(nn.Module):
+
+    def __init__(self):
+        super().__init__()
+
+    def forward(self, input):
+        return input
+
+
+def test_placeholder_handler():
+    model = PlaceholderModel()
+    tracer = ColoTracer()
+    # graph():
+    #     %input_1 : torch.Tensor [#users=1] = placeholder[target=input]
+    #     return input_1
+    graph = tracer.trace(model, meta_args={
+        "input": torch.rand(4, 4, 64, 64).to('meta'),
+    })
+    gm = ColoGraphModule(model, graph)
+    physical_mesh_id = torch.arange(0, 4)
+
+    mesh_shape = (2, 2)
+    device_mesh = DeviceMesh(physical_mesh_id, mesh_shape)
+    placeholder_node = list(graph.nodes)[0]
+    placeholder_strategies_vector = StrategiesVector(placeholder_node)
+
+    # build handler
+    placeholder_handler = PlacehodlerHandler(node=placeholder_node,
+                                             device_mesh=device_mesh,
+                                             strategies_vector=placeholder_strategies_vector)
+
+    placeholder_handler.register_strategy(compute_resharding_cost=False)
+    # check operation data mapping
+    mapping = placeholder_handler.get_operation_data_mapping()
+
+    for name, op_data in mapping.items():
+        op_data: OperationData
+        # make sure they have valid values
+        assert op_data.data is not None
+
+    assert mapping['output'].name == "input_1"
+    assert mapping['output'].data.is_meta
+    assert mapping['output'].data.shape == torch.Size((4, 4, 64, 64))
+    assert mapping['output'].type == OperationDataType.OUTPUT
+    strategy_name_list = [val.name for val in placeholder_handler.strategies_vector]
+    assert "Replica Placeholder" in strategy_name_list
+
+
+if __name__ == '__main__':
+    test_placeholder_handler()
diff --git a/tests/test_auto_parallel/test_node_handler/test_reshape_handler_v2.py b/tests/test_auto_parallel/test_node_handler/test_reshape_handler_v2.py
index e9a77b65b..758337ef0 100644
--- a/tests/test_auto_parallel/test_node_handler/test_reshape_handler_v2.py
+++ b/tests/test_auto_parallel/test_node_handler/test_reshape_handler_v2.py
@@ -46,13 +46,13 @@ def test_reshape_handler():
     conv_handler = ConvFunctionHandler(node=conv_mod_node,
                                        device_mesh=device_mesh,
                                        strategies_vector=conv_strategies_vector)
-    conv_handler.register_strategy()
+    conv_handler.register_strategy(compute_resharding_cost=False)
     setattr(conv_mod_node, 'strategies_vector', conv_strategies_vector)
     reshape_handler = ReshapeHandler(node=reshape_node,
                                      device_mesh=device_mesh,
                                      strategies_vector=reshape_strategies_vector)

-    reshape_handler.register_strategy()
+    reshape_handler.register_strategy(compute_resharding_cost=False)

     # check operation data mapping
     mapping = reshape_handler.get_operation_data_mapping()
diff --git a/tests/test_auto_parallel/test_node_handler/test_unary_element_wise_handler_v2.py b/tests/test_auto_parallel/test_node_handler/test_unary_element_wise_handler_v2.py
index 10516f81f..62265f6cb 100644
--- a/tests/test_auto_parallel/test_node_handler/test_unary_element_wise_handler_v2.py
+++ b/tests/test_auto_parallel/test_node_handler/test_unary_element_wise_handler_v2.py
@@ -48,13 +48,13 @@ def test_elementwise_handler():
     conv_handler = ConvFunctionHandler(node=conv_mod_node,
                                        device_mesh=device_mesh,
                                        strategies_vector=conv_strategies_vector)
-    conv_handler.register_strategy()
+    conv_handler.register_strategy(compute_resharding_cost=False)
     setattr(conv_mod_node, 'strategies_vector', conv_strategies_vector)
     relu_handler = UnaryElementwiseHandler(node=relu_mod_node,
                                            device_mesh=device_mesh,
                                            strategies_vector=relu_strategies_vector)

-    relu_handler.register_strategy()
+    relu_handler.register_strategy(compute_resharding_cost=False)

     # check operation data mapping
     mapping = relu_handler.get_operation_data_mapping()
diff --git a/tests/test_auto_parallel/test_node_handler/test_where_handler_v2.py b/tests/test_auto_parallel/test_node_handler/test_where_handler_v2.py
index eee32a9ea..8e039472f 100644
--- a/tests/test_auto_parallel/test_node_handler/test_where_handler_v2.py
+++ b/tests/test_auto_parallel/test_node_handler/test_where_handler_v2.py
@@ -75,7 +75,7 @@ def test_where_handler():
     assert mapping['output'].data.shape == torch.Size([4, 4, 64, 64])
     assert mapping['output'].type == OperationDataType.OUTPUT

-    handler.register_strategy()
+    handler.register_strategy(compute_resharding_cost=False)
     strategy_name_list = [val.name for val in strategies_vector]
     # 4*3 + 4*3/2*2 + 1
     assert len(strategy_name_list) == 25
diff --git a/tests/test_auto_parallel/test_solver_with_resnet.py b/tests/test_auto_parallel/test_solver_with_resnet.py
deleted file mode 100644
index a46ceb700..000000000
--- a/tests/test_auto_parallel/test_solver_with_resnet.py
+++ /dev/null
@@ -1,121 +0,0 @@
-import torch
-from torch.fx import GraphModule
-import torch.nn as nn
-import pytest
-
-from colossalai.fx.tracer.tracer import ColoTracer
-from colossalai.auto_parallel.solver.sharding_strategy import ShardingStrategy, StrategiesVector
-from colossalai.tensor.shape_consistency import ShapeConsistencyManager
-from colossalai.device.device_mesh import DeviceMesh
-from colossalai.auto_parallel.solver.strategies_constructor import StrategiesConstructor
-from colossalai.auto_parallel.solver.cost_graph import CostGraph
-from copy import deepcopy
-from colossalai.auto_parallel.solver import Solver
-from torchvision.models import resnet34, resnet50
-from colossalai.auto_parallel.solver.constants import *
-from colossalai.auto_parallel.solver.graph_analysis import GraphAnalyser
-from colossalai.auto_parallel.solver.options import SolverOptions
-
-
-class ConvModel(nn.Module):
-
-    def __init__(self, c_in, c_out):
-        super().__init__()
-        self.conv1 = nn.Conv2d(c_in, c_out, kernel_size=3)
-        self.conv2 = nn.Conv2d(c_out, c_out, kernel_size=3)
-        self.conv3 = nn.Conv2d(c_out, c_out, kernel_size=3)
-        self.relu = nn.ReLU()
-
-    def forward(self, x):
-        x = x * 2
-        x = self.conv1(x)
-        x = self.conv2(x)
-        x = x / 2
-        x = self.conv3(x)
-        x = self.relu(x)
-        return x
-
-
-@pytest.mark.skip("for higher testing speed")
-def test_cost_graph():
-    physical_mesh_id = torch.arange(0, 8)
-    mesh_shape = (2, 4)
-    # [[0, 1]
-    #  [2, 3]]
-    device_mesh = DeviceMesh(physical_mesh_id, mesh_shape)
-    shape_consistency_manager = ShapeConsistencyManager()
-
-    tracer = ColoTracer()
-    # model = ConvModel(16, 32)
-    # input_sample = {'x': torch.rand(4, 16, 64, 64).to('meta')}
-    model = resnet50(num_classes=100000)
-    input_sample = {'x': torch.rand(128, 3, 224, 224).to('meta')}
-
-    graph = tracer.trace(root=model, meta_args=input_sample)
-    # graph():
-    #     %x : torch.Tensor [#users=1] = placeholder[target=x]
-    #     %conv1 : [#users=1] = call_module[target=conv1](args = (%x,), kwargs = {})
-    #     %bn1 : [#users=1] = call_module[target=bn1](args = (%conv1,), kwargs = {})
-    #     %relu : [#users=1] = call_module[target=relu](args = (%bn1,), kwargs = {})
-    #     %maxpool : [#users=2] = call_module[target=maxpool](args = (%relu,), kwargs = {})
-    #     %layer1_0_conv1 : [#users=1] = call_module[target=layer1.0.conv1](args = (%maxpool,), kwargs = {})
-    #     %layer1_0_bn1 : [#users=1] = call_module[target=layer1.0.bn1](args = (%layer1_0_conv1,), kwargs = {})
-    #     %layer1_0_relu : [#users=1] = call_module[target=layer1.0.relu](args = (%layer1_0_bn1,), kwargs = {})
-    #     %layer1_0_conv2 : [#users=1] = call_module[target=layer1.0.conv2](args = (%layer1_0_relu,), kwargs = {})
-    #     %layer1_0_bn2 : [#users=1] = call_module[target=layer1.0.bn2](args = (%layer1_0_conv2,), kwargs = {})
-    #     %add : [#users=1] = call_function[target=operator.add](args = (%layer1_0_bn2, %maxpool), kwargs = {})
-    #     %layer1_0_relu_1 : [#users=2] = call_module[target=layer1.0.relu](args = (%add,), kwargs = {})
-    #     %layer1_1_conv1 : [#users=1] = call_module[target=layer1.1.conv1](args = (%layer1_0_relu_1,), kwargs = {})
-    #     %layer1_1_bn1 : [#users=1] = call_module[target=layer1.1.bn1](args = (%layer1_1_conv1,), kwargs = {})
-    #     %layer1_1_relu : [#users=1] = call_module[target=layer1.1.relu](args = (%layer1_1_bn1,), kwargs = {})
-    #     %layer1_1_conv2 : [#users=1] = call_module[target=layer1.1.conv2](args = (%layer1_1_relu,), kwargs = {})
-    #     %layer1_1_bn2 : [#users=1] = call_module[target=layer1.1.bn2](args = (%layer1_1_conv2,), kwargs = {})
-    #     %add_1 : [#users=1] = call_function[target=operator.add](args = (%layer1_1_bn2, %layer1_0_relu_1), kwargs = {})
-    #     ...
-    #     %avgpool : [#users=1] = call_module[target=avgpool](args = (%layer4_2_relu_1,), kwargs = {})
-    #     %flatten : [#users=1] = call_function[target=torch.flatten](args = (%avgpool, 1), kwargs = {})
-    #     %fc : [#users=1] = call_module[target=fc](args = (%flatten,), kwargs = {})
-    #     return fc
-    gm = GraphModule(model, graph, model.__class__.__name__)
-    gm.recompile()
-    graph_analyser = GraphAnalyser(gm)
-    liveness_list = graph_analyser.liveness_analysis()
-    solver_options = SolverOptions(fast=True)
-    strategies_constructor = StrategiesConstructor(graph, device_mesh, solver_options)
-    strategies_constructor.build_strategies_and_cost()
-
-    cost_graph = CostGraph(strategies_constructor.leaf_strategies)
-    cost_graph.simplify_graph()
-    solver = Solver(gm.graph, strategies_constructor, cost_graph, graph_analyser)
-
-    ret = solver.call_solver_serialized_args()
-    print(ret[0])
-    solver._recover_merged_node_strategy()
-    print(solver.last_s_val)
-    strategies_list = solver.last_s_val
-
-    computation_cost = 0
-    communication_cost = 0
-    communication_cost_bn = 0
-    memory_cost = 0
-    for index, node in enumerate(graph.nodes):
-        if node.op == 'call_module':
-            submod = node.graph.owning_module.get_submodule(node.target)
-            if type(submod) in BATCHNORM_MODULE_OP:
-                communication_cost_bn += node.strategies_vector[strategies_list[index]].communication_cost
-        print(node.name, node.strategies_vector[strategies_list[index]].name)
-        computation_cost += node.strategies_vector[strategies_list[index]].compute_cost
-        communication_cost += node.strategies_vector[strategies_list[index]].communication_cost
-        node_memory_cost = node.strategies_vector[strategies_list[index]].memory_cost
-        if isinstance(node_memory_cost, tuple):
-            node_memory_cost = node_memory_cost[0]
-        memory_cost += node_memory_cost
-
-    print(f'computation cost is {computation_cost}')
-    print(f'communication cost is {communication_cost}')
-    print(f'memory cost is {memory_cost}')
-    print(f'bn communication cost is {communication_cost_bn}')
-
-
-if __name__ == '__main__':
-    test_cost_graph()