[misc] resolve code factor issues (#4433)

pull/4445/head
Hongxin Liu 2023-08-14 17:43:33 +08:00
parent 328a791d10
commit 172f7fa3cf
20 changed files with 31 additions and 205 deletions
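Most of the hunks below make one mechanical change: each bare `# TODO:` comment gains an owner tag such as `# TODO(jianghai):`, the annotation style that CodeFactor expects. As a minimal illustrative sketch (not part of this commit), a regex-based check for unowned TODOs could look like this:

```python
import re
import sys

# A TODO counts as "owned" when it names someone in parentheses,
# e.g. "# TODO(jianghai): ...". A bare "# TODO: ..." is flagged.
OWNED_TODO = re.compile(r"#\s*TODO\([^)]+\)")
BARE_TODO = re.compile(r"#\s*TODO\b")


def find_unowned_todos(path: str) -> list:
    """Return (line_number, text) pairs for TODO comments without an owner."""
    hits = []
    with open(path, encoding="utf-8") as f:
        for lineno, line in enumerate(f, start=1):
            if BARE_TODO.search(line) and not OWNED_TODO.search(line):
                hits.append((lineno, line.rstrip()))
    return hits


if __name__ == "__main__":
    for filename in sys.argv[1:]:
        for lineno, text in find_unowned_todos(filename):
            print(f"{filename}:{lineno}: unowned TODO -> {text}")
```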

View File

@@ -139,7 +139,7 @@ class Booster:
             loss (torch.Tensor): The loss to be backpropagated.
             optimizer (Optimizer): The optimizer to be updated.
         """
-        # TODO: implement this method with plugin
+        # TODO(frank lee): implement this method with plugin
         optimizer.backward(loss)

     def execute_pipeline(self,

View File

@@ -29,8 +29,6 @@ class Randomizer:
     _INDEX = 0

     def __init__(self, seed: int):
-        # TODO: remove colossalai.context.random
         self.seed = seed
         # Handle CUDA rng state

View File

@@ -57,7 +57,7 @@ class BertPipelineForwards:
         hidden_states: Optional[torch.FloatTensor] = None,    # this is from the previous stage
         stage_index: Optional[List[int]] = None,
     ):
-        # TODO: add explaination of the output here.
+        # TODO(jianghai): add explaination of the output here.
         r"""
         encoder_hidden_states (`torch.FloatTensor` of shape `(batch_size, sequence_length, hidden_size)`, *optional*):
             Sequence of hidden-states at the output of the last layer of the encoder. Used in the cross-attention if
@@ -113,7 +113,7 @@ class BertPipelineForwards:
         batch_size, seq_length = input_shape
         device = hidden_states.device

-        # TODO: left the recording kv-value tensors as () or None type, this feature may be added in the future.
+        # TODO(jianghai): left the recording kv-value tensors as () or None type, this feature may be added in the future.
         if output_attentions:
             logger.warning_once('output_attentions=True is not supported for pipeline models at the moment.')
             output_attentions = False
@@ -272,7 +272,7 @@ class BertPipelineForwards:
         logger = logging.get_logger(__name__)
         return_dict = return_dict if return_dict is not None else self.config.use_return_dict

-        # TODO: left the recording kv-value tensors as () or None type, this feature may be added in the future.
+        # TODO(jianghai) left the recording kv-value tensors as () or None type, this feature may be added in the future.
         if output_attentions:
             logger.warning_once('output_attentions=True is not supported for pipeline models at the moment.')
             output_attentions = False
@@ -534,7 +534,7 @@ class BertPipelineForwards:
         stage_index: Optional[List[int]] = None,
         **kwargs,
     ):
-        #-> Union[Tuple[torch.Tensor], NextSentencePredictorOutput]:
+        # -> Union[Tuple[torch.Tensor], NextSentencePredictorOutput]:
         r"""
         labels (`torch.LongTensor` of shape `(batch_size,)`, *optional*):
             Labels for computing the next sequence prediction (classification) loss. Input should be a sequence pair

View File

@@ -252,7 +252,7 @@ class BloomPipelineForwards:
         # Add last hidden state
         hidden_states = self.ln_f(hidden_states)

-        # TODO: deal with all_hidden_states, all_self_attentions, presents
+        # TODO(jianghai): deal with all_hidden_states, all_self_attentions, presents
         if output_hidden_states:
             all_hidden_states = all_hidden_states + (hidden_states,)
@@ -307,7 +307,7 @@ class BloomPipelineForwards:
             raise ValueError(f"Got unexpected arguments: {deprecated_arguments}")
         return_dict = return_dict if return_dict is not None else self.config.use_return_dict

-        # TODO: left the recording kv-value tensors as () or None type, this feature may be added in the future.
+        # TODO(jianghai): left the recording kv-value tensors as () or None type, this feature may be added in the future.
         if output_attentions:
             logger.warning_once('output_attentions=True is not supported for pipeline models at the moment.')
             output_attentions = False
@@ -402,7 +402,7 @@ class BloomPipelineForwards:
         return_dict = return_dict if return_dict is not None else self.config.use_return_dict

-        # TODO: left the recording kv-value tensors as () or None type, this feature may be added in the future.
+        # TODO(jianghai): left the recording kv-value tensors as () or None type, this feature may be added in the future.
         if output_attentions:
             logger.warning_once('output_attentions=True is not supported for pipeline models at the moment.')
             output_attentions = False
@@ -431,7 +431,7 @@ class BloomPipelineForwards:
         all_cross_attentions = None
         if stage_manager.is_last_stage():
             batch_size = hidden_states.shape[0]
-            #update batch size
+            # update batch size
             hidden_states = transformer_outputs[0]
             logits = self.score(hidden_states)
@@ -525,7 +525,7 @@ class BloomPipelineForwards:
         return_dict = return_dict if return_dict is not None else self.config.use_return_dict

-        # TODO: left the recording kv-value tensors as () or None type, this feature may be added in the future.
+        # TODO(jianghai): left the recording kv-value tensors as () or None type, this feature may be added in the future.
         if output_attentions:
             logger.warning_once('output_attentions=True is not supported for pipeline models at the moment.')
             output_attentions = False
@@ -611,7 +611,7 @@ class BloomPipelineForwards:
         logger = logging.get_logger(__name__)
         return_dict = return_dict if return_dict is not None else self.config.use_return_dict

-        # TODO: left the recording kv-value tensors as () or None type, this feature may be added in the future.
+        # TODO(jianghai): left the recording kv-value tensors as () or None type, this feature may be added in the future.
         if output_attentions:
             logger.warning_once('output_attentions=True is not supported for pipeline models at the moment.')
             output_attentions = False

View File

@@ -152,7 +152,7 @@ class ChatGLMPipelineForwards:
                                 if output_hidden_states is not None else self.config.output_hidden_states)
         use_cache = use_cache if use_cache is not None else self.config.use_cache
         return_dict = return_dict if return_dict is not None else self.config.use_return_dict
-        # TODO: left the recording kv-value tensors as () or None type, this feature may be added in the future.
+        # TODO(jianghai): left the recording kv-value tensors as () or None type, this feature may be added in the future.
         if past_key_values:
             logger.warning_once('Non-empty past_key_values is not supported for pipeline models at the moment.')
             past_key_values = None

View File

@@ -57,7 +57,7 @@ class GPT2PipelineForwards:
         logger = logging.get_logger(__name__)

         # Preprocess passed in arguments
-        # TODO: left the recording kv-value tensors as () or None type, this feature may be added in the future.
+        # TODO(baizhou): left the recording kv-value tensors as () or None type, this feature may be added in the future.
         if past_key_values:
             logger.warning_once('Non-empty past_key_values is not supported for pipeline models at the moment.')
             past_key_values = None

View File

@@ -65,7 +65,7 @@ class LlamaPipelineForwards:
         seq_length_with_past = seq_length
         past_key_values_length = 0

-        # TODO: left the recording kv-value tensors as () or None type, this feature may be added in the future.
+        # TODO(jianghai): left the recording kv-value tensors as () or None type, this feature may be added in the future.
         if output_attentions:
             logger.warning_once('output_attentions=True is not supported for pipeline models at the moment.')
             output_attentions = False
@@ -216,7 +216,7 @@ class LlamaPipelineForwards:
                                 if output_hidden_states is not None else self.config.output_hidden_states)
         return_dict = return_dict if return_dict is not None else self.config.use_return_dict

-        # TODO: left the recording kv-value tensors as () or None type, this feature may be added in the future.
+        # TODO(jianghai): left the recording kv-value tensors as () or None type, this feature may be added in the future.
         if output_attentions:
             logger.warning_once('output_attentions=True is not supported for pipeline models at the moment.')
             output_attentions = False
@@ -301,7 +301,7 @@ class LlamaPipelineForwards:
         logger = logging.get_logger(__name__)
         return_dict = return_dict if return_dict is not None else self.config.use_return_dict

-        # TODO: left the recording kv-value tensors as () or None type, this feature may be added in the future.
+        # TODO(jianghai): left the recording kv-value tensors as () or None type, this feature may be added in the future.
         if output_attentions:
             logger.warning_once('output_attentions=True is not supported for pipeline models at the moment.')
             output_attentions = False

View File

@@ -148,7 +148,7 @@ class OPTPipelineForwards:
                     "`use_cache=True` is incompatible with gradient checkpointing. Setting `use_cache=False`...")
                 use_cache = False

-        # TODO: left the recording kv-value tensors as () or None type, this feature may be added in the future.
+        # TODO(baizhou): left the recording kv-value tensors as () or None type, this feature may be added in the future.
         if past_key_values:
             logger.warning_once('Non-empty past_key_values is not supported for pipeline models at the moment.')
             past_key_values = None

View File

@@ -50,7 +50,7 @@ class T5PipelineForwards:
         logger = logging.get_logger(__name__)

-        # TODO: left the recording kv-value tensors as () or None type, this feature may be added in the future.
+        # TODO(baizhou): left the recording kv-value tensors as () or None type, this feature may be added in the future.
         if past_key_values:
             logger.warning_once('Non-empty past_key_values is not supported for pipeline models at the moment.')
             past_key_values = None
@@ -285,7 +285,7 @@ class T5PipelineForwards:
         logger = logging.get_logger(__name__)

-        # TODO: left the recording kv-value tensors as () or None type, this feature may be added in the future.
+        # TODO(baizhou): left the recording kv-value tensors as () or None type, this feature may be added in the future.
         if past_key_values:
             logger.warning_once('Non-empty past_key_values is not supported for pipeline models at the moment.')
             past_key_values = None
@@ -422,7 +422,7 @@ class T5PipelineForwards:
         logger = logging.get_logger(__name__)

-        # TODO: left the recording kv-value tensors as () or None type, this feature may be added in the future.
+        # TODO(baizhou): left the recording kv-value tensors as () or None type, this feature may be added in the future.
         if past_key_values:
             logger.warning_once('Non-empty past_key_values is not supported for pipeline models at the moment.')
             past_key_values = None

View File

@@ -96,7 +96,7 @@ def ViTModel_pipeline_forward(stage_manager: PipelineStageManager, stage_index:
         if pixel_values is None:
             raise ValueError("You have to specify pixel_values")

-        # TODO: maybe have a cleaner way to cast the input (from `ImageProcessor` side?)
+        # TODO(FoolPlayer): maybe have a cleaner way to cast the input (from `ImageProcessor` side?)
         expected_dtype = self.embeddings.patch_embeddings.projection.weight.dtype
         if pixel_values.dtype != expected_dtype:
             pixel_values = pixel_values.to(expected_dtype)

View File

@@ -29,7 +29,6 @@ class ShardConfig:
    enable_flash_attention: bool = False
    enable_jit_fused: bool = False

-    # TODO: add support for tensor parallel
    # pipeline_parallel_size: int
    # data_parallel_size: int
    # tensor_parallel_mode: Literal['1d', '2d', '2.5d', '3d']

View File

@@ -15,7 +15,7 @@ def test_gpt():
     for name, (model_fn, data_gen_fn, _, _, _) in sub_registry.items():
         model = model_fn()

-        # TODO: support the following models
+        # TODO(ver217): support the following models
         # 1. GPT2DoubleHeadsModel
         # as they are not supported, let's skip them
         if model.__class__.__name__ in ['GPT2DoubleHeadsModel', 'GPT2ForQuestionAnswering']:

View File

@@ -1,171 +0,0 @@
-import copy
-import random
-from typing import Any, Callable, Iterator, List, Optional, Tuple
-
-import numpy as np
-import pytest
-import torch
-import torch.distributed as dist
-from torch.nn import Module
-from torch.optim import Optimizer
-from torch.optim.lr_scheduler import _LRScheduler as LRScheduler
-from torch.utils.data import DataLoader
-from torch.utils.data.distributed import DistributedSampler
-
-import colossalai
-from colossalai.cluster import ProcessGroupMesh
-from colossalai.interface import ModelWrapper, OptimizerWrapper
-from colossalai.logging import disable_existing_loggers
-from colossalai.pipeline.schedule import OneForwardOneBackwardSchedule
-from colossalai.pipeline.stage_manager import PipelineStageManager
-from colossalai.shardformer import ShardConfig, ShardFormer
-from colossalai.testing import (
-    assert_hf_output_close,
-    clear_cache_before_run,
-    parameterize,
-    rerun_if_address_is_in_use,
-    spawn,
-)
-from tests.kit.model_zoo import model_zoo
-from tests.test_shardformer.test_model._utils import build_model, build_pipeline_model, run_forward
-
-DP_AXIS, PP_AXIS, TP_AXIS = 0, 1, 2
-
-
-class PipelineOptimizer(OptimizerWrapper):
-
-    def __init__(self, optim: Optimizer, model: Module):
-        super().__init__(optim)
-        params = set(model.parameters())
-        new_param_groups = []
-        for group in optim.param_groups:
-            params = [p for p in group['params'] if p in params]
-            new_param_groups.append({**group, 'params': params})
-        optim.__setstate__({'param_groups': new_param_groups})
-        # TODO: support amp
-
-
-class PipelinedModel(ModelWrapper):
-
-    def __init__(self, module: Module, shard_config: ShardConfig, stage_manager: PipelineStageManager) -> None:
-        self.stage_manager = stage_manager
-        shardformer = ShardFormer(shard_config)
-        module, self.shared_params = shardformer.optimize(module)
-        self.shared_param_process_groups = []
-        super().__init__(module)
-
-
-def prepare_dataloader(dataset, batch_size, shuffle=False, seed=1024, drop_last=False, pin_memory=False, num_workers=0):
-    sampler = DistributedSampler(
-        dataset,
-        # rank=self.pg_mesh.coordinate(DP_AXIS),
-        shuffle=shuffle)
-
-    # Deterministic dataloader
-    def seed_worker(worker_id):
-        worker_seed = seed
-        np.random.seed(worker_seed)
-        torch.manual_seed(worker_seed)
-        random.seed(worker_seed)
-
-    return DataLoader(
-        dataset,
-        batch_size=batch_size,
-        sampler=sampler,
-        worker_init_fn=seed_worker,
-        drop_last=drop_last,
-        pin_memory=pin_memory,
-        num_workers=num_workers,
-    )
-
-
-def execute_pipeline(
-    data_iter: Iterator,
-    model: PipelinedModel,
-    criterion: Callable[[Any, Any], torch.Tensor],
-    optimizer: PipelineOptimizer,
-    return_loss: bool = True,
-    return_outputs: bool = False,
-    schedule: OneForwardOneBackwardSchedule = None,
-) -> dict:
-    # return loss or outputs if needed
-    outputs = schedule.forward_backward_step(model, optimizer, data_iter, criterion, return_loss, return_outputs)
-    return outputs
-
-
-class data_loader():
-
-    def __getitem__(self, x):
-        return torch.ones((4, 128), dtype=torch.int).cuda() * 10
-
-
-def loss(y, x):
-    return (y[0].float().mean() - x[0].float().mean())
-
-
-@parameterize('enable_fused_normalization', [False])
-@parameterize('enable_tensor_parallelism', [False])
-@parameterize('use_lazy_init', [False])
-def run_llama_test(enable_fused_normalization, enable_tensor_parallelism, use_lazy_init):
-    PP_DIM = 0
-    PP_SIZE = 2
-    RANK_TO_COORDINATE = {
-        0: (0, 0),
-        1: (0, 1),
-        2: (1, 0),
-        3: (1, 1),
-    }
-    PP_RANKS_IN_GROUP = {
-        0: [0, 1],
-        1: [0, 1],
-        2: [2, 3],
-        3: [2, 3],
-    }
-
-    pg_mesh = ProcessGroupMesh(PP_SIZE)
-    stage_manager = PipelineStageManager(pg_mesh, PP_DIM)
-
-    sub_model_zoo = model_zoo.get_sub_registry('transformers_llama')
-    for name, (model_fn, data_gen_fn, output_transform_fn, loss_fn, _) in sub_model_zoo.items():
-        if name != 'transformers_llama':
-            continue
-        num_microbatches = 2
-        org_model = model_fn().cuda()
-        data_iter = iter(data_loader())
-
-        model_copy = copy.deepcopy(org_model)
-        batch = next(data_iter)
-        with torch.no_grad():
-            y = model_copy(batch)
-            org_loss = loss(y, batch)
-
-        optimizer = torch.optim.AdamW(org_model.parameters(), lr=1e-3)
-        schedule = OneForwardOneBackwardSchedule(num_microbatches, stage_manager)
-        shard_config = ShardConfig(enable_fused_normalization=enable_fused_normalization,
-                                   enable_tensor_parallelism=enable_tensor_parallelism,
-                                   pipeline_stage_manager=stage_manager)
-        pipelined_model = PipelinedModel(org_model, shard_config, stage_manager)
-        pp_optimizer = PipelineOptimizer(optimizer, pipelined_model)
-        results = execute_pipeline(data_iter, pipelined_model, loss, pp_optimizer, schedule=schedule)
-
-        if stage_manager.is_last_stage():
-            assert results['loss'] == org_loss
-        else:
-            assert results['loss'] is None
-        assert results['outputs'] is None
-        torch.cuda.empty_cache()
-
-
-def check_llama(rank, world_size, port):
-    disable_existing_loggers()
-    colossalai.launch(config={}, rank=rank, world_size=world_size, host='localhost', port=port, backend='nccl')
-    run_llama_test()
-
-
-@pytest.mark.dist
-@rerun_if_address_is_in_use()
-@clear_cache_before_run()
-def test_llama():
-    spawn(check_llama, 2)
-
-
-if __name__ == "__main__":
-    test_llama()

View File

@@ -101,7 +101,7 @@ def check_forward_backward(model_fn, data_gen_fn, output_transform_fn, loss_fn,
 }])
 def run_bloom_test(test_config):

-    # TODO: add test_config for TP+DP after supporting & debugging it
+    # TODO(baizhou): add test_config for TP+DP after supporting & debugging it

     sub_model_zoo = model_zoo.get_sub_registry('transformers_bloom')

View File

@@ -125,7 +125,7 @@ def check_forward_backward(model_fn, data_gen_fn, output_transform_fn, loss_fn,
 }])
 def run_chatglm_test(test_config):

-    # TODO: add test_config for TP+DP after supporting & debugging it
+    # TODO(baizhou): add test_config for TP+DP after supporting & debugging it

     sub_model_zoo = model_zoo.get_sub_registry('transformers_chatglm')

View File

@@ -110,7 +110,7 @@ def check_forward_backward(model_fn, data_gen_fn, output_transform_fn, loss_fn,
 @clear_cache_before_run()
 def run_gpt2_test(test_config):

-    # TODO: add test_config for TP+DP after supporting & debugging it
+    # TODO(baizhou): add test_config for TP+DP after supporting & debugging it

     sub_model_zoo = model_zoo.get_sub_registry('transformers_gpt')

View File

@@ -133,7 +133,7 @@ def check_forward_backward(model_fn, data_gen_fn, output_transform_fn, loss_fn,
 }])
 def run_llama_test(test_config):

-    # TODO: add test_config for TP+DP after supporting & debugging it
+    # TODO(baizhou): add test_config for TP+DP after supporting & debugging it

     sub_model_zoo = model_zoo.get_sub_registry('transformers_llama')

View File

@@ -127,7 +127,7 @@ def check_forward_backward(model_fn, data_gen_fn, output_transform_fn, loss_fn,
 }])
 def run_opt_test(test_config):

-    # TODO: add test_config for TP+DP after supporting & debugging it
+    # TODO(baizhou): add test_config for TP+DP after supporting & debugging it

     sub_model_zoo = model_zoo.get_sub_registry('transformers_opt')

View File

@@ -105,10 +105,10 @@ def check_forward_backward(model_fn, data_gen_fn, output_transform_fn, loss_fn,
 @clear_cache_before_run()
 def run_t5_test(test_config):

-    # TODO: add plugin_config for TP+DP after supporting & debugging it
+    # TODO(baizhou): add plugin_config for TP+DP after supporting & debugging it
     # {'tp_size': 2, 'pp_size': 1, 'enable_fused_normalization': True}

-    # TODO: add test_config for flash attention & jit operator after supporting
+    # TODO(baizhou): add test_config for flash attention & jit operator after supporting

     sub_model_zoo = model_zoo.get_sub_registry('transformers_t5')

View File

@@ -124,8 +124,8 @@ def check_forward_backward(model_fn, data_gen_fn, output_transform_fn, loss_fn,
 }])
 def run_vit_test(test_config):

-    # TODO: add test_config for TP+DP after supporting & debugging it
-    # TODO: fix bug when settign lazy_init for Conv2D Layers in ViT models
+    # TODO(baizhou): add test_config for TP+DP after supporting & debugging it
+    # TODO(baizhou): fix bug when settign lazy_init for Conv2D Layers in ViT models

     sub_model_zoo = model_zoo.get_sub_registry('transformers_vit')