[fx] add balanced policy v2 (#1251)

* [CLI] add CLI launcher

* Revert "[CLI] add CLI launcher"

This reverts commit df7e6506d4.

* [fx] add balanced policy v2

* add unittest
pull/1325/head
YuliangLiu0306 2 years ago committed by GitHub
parent ca2d3f284f
commit e8acf55e8b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -10,7 +10,9 @@ def pipe_split():
def balanced_split_pass(gm: torch.fx.GraphModule, pp_size: int):
# TODO(lyl): balanced policy V2, split module by node size(weight+bias+output)
"""
In balanced_split_pass, we split module by the size of parameters(weights+bias).
"""
mod_graph = gm.graph
total_param_amount = 0
for param in mod_graph.owning_module.parameters():
@ -39,6 +41,36 @@ def balanced_split_pass(gm: torch.fx.GraphModule, pp_size: int):
return gm
def balanced_split_pass_v2(gm: torch.fx.GraphModule, pp_size: int):
"""
In balanced_split_pass_v12, we split module by the size of nodes(weights+bias+outputs).
"""
mod_graph = gm.graph
# To use balanced_split_pass_v2, we need run meta_info_prop interpreter first.
# If nodes don't have meta info, this pass will fall back to normal balanced split pass.
check_node = list(mod_graph.nodes)[0]
if 'tensor_meta' not in check_node.meta:
return balanced_split_pass(gm, pp_size)
total_element_size = 0
for node in mod_graph.nodes:
total_element_size += node.node_size
partition_size = total_element_size // pp_size
accumulate_node_size = 0
for node in mod_graph.nodes:
if pp_size <= 1:
break
accumulate_node_size += node.node_size
if accumulate_node_size >= partition_size:
accumulate_node_size = 0
pp_size -= 1
with mod_graph.inserting_after(node):
split_node = mod_graph.create_node('call_function', pipe_split)
gm.recompile()
return gm
def uniform_split_pass(gm: torch.fx.GraphModule, pp_size: int):
mod_graph = gm.graph
valid_children_size = 0

@ -67,7 +67,6 @@ class MetaInfoProp(torch.fx.Interpreter):
def run_node(self, n: Node) -> Any:
result = super().run_node(n)
found_tensor = False
def extract_tensor_meta(obj):
@ -83,7 +82,25 @@ class MetaInfoProp(torch.fx.Interpreter):
n.meta['tensor_meta'] = meta
else:
n.meta['tensor_meta'] = TensorMetadata(None, None, False, None, 0)
# counting the total size of node outputs
total_node_size = 0
if isinstance(n.meta['tensor_meta'], TensorMetadata):
total_node_size += n.meta['tensor_meta'].numel
else:
for element in n.meta['tensor_meta']:
assert isinstance(
element, TensorMetadata
), f"``n.meta['tensor_meta']`` should be either TensorMetadata or a tuple of TensorMetadata."
total_node_size += element.numel
# counting the total size of parameters
total_param_size = 0
if n.op == 'call_module':
target_module = n.graph.owning_module.get_submodule(n.target)
for param in target_module.parameters():
total_param_size += param.numel()
total_node_size += total_param_size
n.node_size = total_node_size
n.meta['type'] = type(result)
return result

@ -4,7 +4,8 @@ import colossalai
import colossalai.nn as col_nn
from torch.fx import symbolic_trace
from colossalai.fx.passes.adding_split_node_pass import split_with_split_nodes_pass, balanced_split_pass, \
uniform_split_pass
uniform_split_pass, balanced_split_pass_v2
import pytest
MODEL_DIM = 16
@ -43,6 +44,7 @@ def test_pipeline_passes():
model = MLP(MODEL_DIM)
data = torch.rand(BATCH_SIZE, MODEL_DIM)
pipeline_pass_test_helper(model, data, balanced_split_pass)
pipeline_pass_test_helper(model, data, balanced_split_pass_v2)
pipeline_pass_test_helper(model, data, uniform_split_pass)

Loading…
Cancel
Save