fix: unittest (#424)

pull/427/head
jiaxingli 2023-10-19 15:19:40 +08:00 committed by GitHub
parent 2c5395fdfd
commit 3ea46324dd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 83 additions and 40 deletions

View File

@ -3,6 +3,7 @@ on:
push: push:
branches: branches:
- "main" - "main"
- "develop"
env: env:
SLURM_PARTITION: llm_s SLURM_PARTITION: llm_s

View File

@ -58,7 +58,12 @@ class MyLoss(nn.Module):
config = Config( config = Config(
dict( dict(
gradient_handler=[dict(type="PipelineSharedModuleGradientHandler")], gradient_handler=[dict(type="PipelineSharedModuleGradientHandler")],
parallel=dict(zero1=1, pipeline=dict(size=8, interleaved_overlap=False), sequence_parallel=False, tensor=1), parallel=dict(
zero1=dict(size=1, fsdp=False),
pipeline=dict(size=8, interleaved_overlap=False),
sequence_parallel=False,
tensor=1,
),
model_type="INTERNLM", model_type="INTERNLM",
data=dict(seq_len=8, micro_num=16, micro_bsz=1, pack_sample_into_one=False, min_length=0, total_steps=9999), data=dict(seq_len=8, micro_num=16, micro_bsz=1, pack_sample_into_one=False, min_length=0, total_steps=9999),
model=dict( model=dict(

View File

@ -16,7 +16,12 @@ from internlm.model.utils import gather_forward_split_backward
config = Config( config = Config(
dict( dict(
parallel=dict(zero1=1, pipeline=dict(size=1, interleaved_overlap=False), sequence_parallel=False, tensor=1), parallel=dict(
zero1=dict(size=1, fsdp=False),
pipeline=dict(size=1, interleaved_overlap=False),
sequence_parallel=False,
tensor=1,
),
model_type="INTERNLM", model_type="INTERNLM",
data=dict(seq_len=2048, micro_num=1, micro_bsz=1, pack_sample_into_one=False, min_length=0, total_steps=9999), data=dict(seq_len=2048, micro_num=1, micro_bsz=1, pack_sample_into_one=False, min_length=0, total_steps=9999),
model=dict( model=dict(

View File

@ -10,7 +10,7 @@ from torch.nn.parallel import DistributedDataParallel as DDP
from torch.testing import assert_close from torch.testing import assert_close
import internlm import internlm
from internlm.core.context.parallel_context import Config from internlm.core.context.parallel_context import Config, ParallelMode
from internlm.solver.optimizer import HybridZeroOptimizer from internlm.solver.optimizer import HybridZeroOptimizer
from internlm.solver.optimizer.utils import ParamBcastSyncHandler from internlm.solver.optimizer.utils import ParamBcastSyncHandler
@ -29,7 +29,12 @@ class MlpModel(nn.Module):
config = Config( config = Config(
dict( dict(
parallel=dict(zero1=1, pipeline=dict(size=1, interleaved_overlap=False), sequence_parallel=False, tensor=1), parallel=dict(
zero1=dict(size=1, fsdp=False),
pipeline=dict(size=1, interleaved_overlap=False),
sequence_parallel=False,
tensor=1,
),
model_type="INTERNLM", model_type="INTERNLM",
data=dict(seq_len=2048, micro_num=1, micro_bsz=1, pack_sample_into_one=False, min_length=0, total_steps=9999), data=dict(seq_len=2048, micro_num=1, micro_bsz=1, pack_sample_into_one=False, min_length=0, total_steps=9999),
model=dict( model=dict(
@ -103,14 +108,22 @@ def init_optimizer_grouped_parameters(check_group, model):
{ {
"params": list(model.parameters())[:2], "params": list(model.parameters())[:2],
"weight_decay": config.adam.weight_decay, "weight_decay": config.adam.weight_decay,
"dp_mode": ParallelMode.DATA,
}, },
{ {
"params": list(model.parameters())[2:], "params": list(model.parameters())[2:],
"weight_decay": config.adam.weight_decay, "weight_decay": config.adam.weight_decay,
"dp_mode": ParallelMode.DATA,
}, },
] ]
else: else:
optimizer_grouped_parameters = [{"params": model.parameters(), "weight_decay": config.adam.weight_decay}] optimizer_grouped_parameters = [
{
"params": model.parameters(),
"weight_decay": config.adam.weight_decay,
"dp_mode": ParallelMode.DATA,
}
]
return optimizer_grouped_parameters return optimizer_grouped_parameters
@ -137,7 +150,7 @@ def exam_hybrid_zero_optim_with_ddp(args):
# ParamBcastSyncHandler does not consider parameters in different optimizer group currently # ParamBcastSyncHandler does not consider parameters in different optimizer group currently
if overlap_sync_param and check_group: if overlap_sync_param and check_group:
return return
config.parallel.zero1 = zero_parallel config.parallel.zero1.size = zero_parallel
config.hybrid_zero_optimizer.overlap_sync_param = overlap_sync_param config.hybrid_zero_optimizer.overlap_sync_param = overlap_sync_param
config.hybrid_zero_optimizer.overlap_sync_grad = overlap_sync_grad config.hybrid_zero_optimizer.overlap_sync_grad = overlap_sync_grad
config.data.micro_num = micro_num config.data.micro_num = micro_num
@ -253,7 +266,7 @@ def exam_hybrid_zero_optim_with_ddp(args):
def exam_hybrid_zero_optim_with_ckpt_load_save(args): def exam_hybrid_zero_optim_with_ckpt_load_save(args):
# init # init
rank, world_size, zero_parallel, check_group, dtype = args rank, world_size, zero_parallel, check_group, dtype = args
config.parallel.zero1 = zero_parallel config.parallel.zero1.size = zero_parallel
config.parallel.dtype = dtype config.parallel.dtype = dtype
build_environment(rank, world_size) build_environment(rank, world_size)

View File

@ -10,15 +10,18 @@ from internlm.core.context.parallel_context import Config
from internlm.solver.optimizer.hybrid_zero_optim import HybridZeroOptimizer from internlm.solver.optimizer.hybrid_zero_optim import HybridZeroOptimizer
from internlm.utils.common import SingletonMeta from internlm.utils.common import SingletonMeta
OSS_NAME = os.environ["OSS_BUCKET_NAME"] OSS_NAME = os.environ.get("OSS_BUCKET_NAME")
OSS_IP = os.environ["OSS_IP"] OSS_IP = os.environ.get("OSS_IP")
USER = os.environ["USER"] USER = os.environ.get("USER")
JOB_NAME = "CI_TEST" JOB_NAME = "CI_TEST"
LOCAL_SAVE_PATH = "local:local_ckpt" LOCAL_SAVE_PATH = "local:local_ckpt"
BOTO_SAVE_PATH = f"boto3:s3://{OSS_NAME}.{OSS_IP}/{USER}/{JOB_NAME}" BOTO_SAVE_PATH = f"boto3:s3://{OSS_NAME}.{OSS_IP}/{USER}/{JOB_NAME}"
BOTO_SAVE_PATH_NO_PRFIX = f"s3://{OSS_NAME}.{OSS_IP}/{USER}/{JOB_NAME}/" BOTO_SAVE_PATH_NO_PRFIX = f"s3://{OSS_NAME}.{OSS_IP}/{USER}/{JOB_NAME}/"
VOLC_SAVE_PATH = f"volc:vc://{OSS_NAME}.{OSS_IP}/{USER}/{JOB_NAME}"
VOLC_SAVE_PATH_NO_PRFIX = f"vc://{OSS_NAME}.{OSS_IP}/{USER}/{JOB_NAME}/"
ASYNC_TMP_FOLDER = "./async_tmp_folder" ASYNC_TMP_FOLDER = "./async_tmp_folder"
@ -172,13 +175,25 @@ def del_tmp_file():
except FileNotFoundError: except FileNotFoundError:
pass pass
try: if OSS_NAME is not None:
cmd = r"/mnt/petrelfs/share/sensesync --dryrun --deleteSrc cp " + BOTO_SAVE_PATH_NO_PRFIX + " / " try:
with Popen(cmd, stdout=PIPE, stderr=STDOUT, shell=True) as output: cmd = r"/mnt/petrelfs/share/sensesync --dryrun --deleteSrc cp " + BOTO_SAVE_PATH_NO_PRFIX + " / "
results, presults = "", "" with Popen(cmd, stdout=PIPE, stderr=STDOUT, shell=True) as output:
for line in iter(output.stdout.readline, b""): results, presults = "", ""
results += str(line.rstrip()) for line in iter(output.stdout.readline, b""):
presults += line.rstrip().decode() + "\n" results += str(line.rstrip())
print(presults, flush=True) presults += line.rstrip().decode() + "\n"
except: # noqa # pylint: disable=bare-except print(presults, flush=True)
pass except: # noqa # pylint: disable=bare-except
pass
try:
cmd = r"/mnt/petrelfs/share/sensesync --dryrun --deleteSrc cp " + VOLC_SAVE_PATH_NO_PRFIX + " / "
with Popen(cmd, stdout=PIPE, stderr=STDOUT, shell=True) as output:
results, presults = "", ""
for line in iter(output.stdout.readline, b""):
results += str(line.rstrip())
presults += line.rstrip().decode() + "\n"
print(presults, flush=True)
except: # noqa # pylint: disable=bare-except
pass

View File

@ -10,20 +10,11 @@ from tests.test_utils.common_fixture import ( # noqa # pylint: disable=unused-i
LOCAL_SAVE_PATH, LOCAL_SAVE_PATH,
VOLC_SAVE_PATH, VOLC_SAVE_PATH,
del_tmp_file, del_tmp_file,
init_dist_and_model,
reset_singletons, reset_singletons,
) )
ASYNC_TMP_FOLDER = "./async_tmp_folder" ASYNC_TMP_FOLDER = "./async_tmp_folder"
ckpt_config_list = [ ckpt_config_list = [
# async boto
dict(
enable_save_ckpt=True,
async_upload_tmp_folder=ASYNC_TMP_FOLDER,
async_upload=True,
save_folder=BOTO_SAVE_PATH,
test_id=0,
),
# sync local # sync local
dict( dict(
enable_save_ckpt=True, enable_save_ckpt=True,
@ -32,29 +23,37 @@ ckpt_config_list = [
save_folder=LOCAL_SAVE_PATH, save_folder=LOCAL_SAVE_PATH,
test_id=1, test_id=1,
), ),
# sync boto
dict(
enable_save_ckpt=True,
async_upload_tmp_folder=None,
async_upload=False,
save_folder=BOTO_SAVE_PATH,
test_id=2,
),
# async local # async local
dict( dict(
enable_save_ckpt=True, enable_save_ckpt=True,
async_upload_tmp_folder=ASYNC_TMP_FOLDER, async_upload_tmp_folder=ASYNC_TMP_FOLDER,
async_upload=True, async_upload=True,
save_folder=LOCAL_SAVE_PATH, save_folder=LOCAL_SAVE_PATH,
test_id=2,
),
# async boto
dict(
enable_save_ckpt=True,
async_upload_tmp_folder=ASYNC_TMP_FOLDER,
async_upload=True,
save_folder=BOTO_SAVE_PATH,
test_id=3, test_id=3,
), ),
# sync boto
dict(
enable_save_ckpt=True,
async_upload_tmp_folder=None,
async_upload=False,
save_folder=BOTO_SAVE_PATH,
test_id=4,
),
# async volc # async volc
dict( dict(
enable_save_ckpt=True, enable_save_ckpt=True,
async_upload_tmp_folder=ASYNC_TMP_FOLDER, async_upload_tmp_folder=ASYNC_TMP_FOLDER,
async_upload=True, async_upload=True,
save_folder=VOLC_SAVE_PATH, save_folder=VOLC_SAVE_PATH,
test_id=4, test_id=5,
), ),
# sync volc # sync volc
dict( dict(
@ -62,7 +61,7 @@ ckpt_config_list = [
async_upload_tmp_folder=None, async_upload_tmp_folder=None,
async_upload=False, async_upload=False,
save_folder=VOLC_SAVE_PATH, save_folder=VOLC_SAVE_PATH,
test_id=5, test_id=6,
), ),
] ]
@ -77,7 +76,7 @@ def del_tmp():
@pytest.mark.usefixtures("del_tmp") @pytest.mark.usefixtures("del_tmp")
@pytest.mark.usefixtures("reset_singletons") @pytest.mark.usefixtures("reset_singletons")
@pytest.mark.parametrize("ckpt_config", ckpt_config_list) @pytest.mark.parametrize("ckpt_config", ckpt_config_list)
def test_storage_mm_save_load(ckpt_config, init_dist_and_model): # noqa # pylint: disable=unused-argument def test_storage_mm_save_load(ckpt_config): # noqa # pylint: disable=unused-argument
from internlm.utils.storage_manager import ( from internlm.utils.storage_manager import (
check_folder, check_folder,
get_fns, get_fns,
@ -88,6 +87,11 @@ def test_storage_mm_save_load(ckpt_config, init_dist_and_model): # noqa # pylin
) )
ckpt_config = Config(ckpt_config) ckpt_config = Config(ckpt_config)
if os.environ.get("OSS_BUCKET_NAME") is None:
if ckpt_config.test_id > 2:
print("Pass boto3 and volc", flush=True)
return
enable_save_ckpt = get_config_value(ckpt_config, "enable_save_ckpt", False) enable_save_ckpt = get_config_value(ckpt_config, "enable_save_ckpt", False)
async_upload_tmp_folder = get_config_value(ckpt_config, "async_upload_tmp_folder", False) async_upload_tmp_folder = get_config_value(ckpt_config, "async_upload_tmp_folder", False)
async_upload = get_config_value(ckpt_config, "async_upload", False) async_upload = get_config_value(ckpt_config, "async_upload", False)