diff --git a/.github/workflows/weekly_test.yaml b/.github/workflows/weekly_test.yaml index 6251459..880d097 100644 --- a/.github/workflows/weekly_test.yaml +++ b/.github/workflows/weekly_test.yaml @@ -3,6 +3,7 @@ on: push: branches: - "main" + - "develop" env: SLURM_PARTITION: llm_s diff --git a/tests/test_core/test_pipeline.py b/tests/test_core/test_pipeline.py index 0b4bbdd..72bc52f 100644 --- a/tests/test_core/test_pipeline.py +++ b/tests/test_core/test_pipeline.py @@ -58,7 +58,12 @@ class MyLoss(nn.Module): config = Config( dict( gradient_handler=[dict(type="PipelineSharedModuleGradientHandler")], - parallel=dict(zero1=1, pipeline=dict(size=8, interleaved_overlap=False), sequence_parallel=False, tensor=1), + parallel=dict( + zero1=dict(size=1, fsdp=False), + pipeline=dict(size=8, interleaved_overlap=False), + sequence_parallel=False, + tensor=1, + ), model_type="INTERNLM", data=dict(seq_len=8, micro_num=16, micro_bsz=1, pack_sample_into_one=False, min_length=0, total_steps=9999), model=dict( diff --git a/tests/test_model/test_model_internlm.py b/tests/test_model/test_model_internlm.py index fb9c678..c002a96 100644 --- a/tests/test_model/test_model_internlm.py +++ b/tests/test_model/test_model_internlm.py @@ -16,7 +16,12 @@ from internlm.model.utils import gather_forward_split_backward config = Config( dict( - parallel=dict(zero1=1, pipeline=dict(size=1, interleaved_overlap=False), sequence_parallel=False, tensor=1), + parallel=dict( + zero1=dict(size=1, fsdp=False), + pipeline=dict(size=1, interleaved_overlap=False), + sequence_parallel=False, + tensor=1, + ), model_type="INTERNLM", data=dict(seq_len=2048, micro_num=1, micro_bsz=1, pack_sample_into_one=False, min_length=0, total_steps=9999), model=dict( diff --git a/tests/test_solver/test_optimizer.py b/tests/test_solver/test_optimizer.py index 6a22797..569b7b4 100644 --- a/tests/test_solver/test_optimizer.py +++ b/tests/test_solver/test_optimizer.py @@ -10,7 +10,7 @@ from torch.nn.parallel import DistributedDataParallel as DDP from torch.testing import assert_close import internlm -from internlm.core.context.parallel_context import Config +from internlm.core.context.parallel_context import Config, ParallelMode from internlm.solver.optimizer import HybridZeroOptimizer from internlm.solver.optimizer.utils import ParamBcastSyncHandler @@ -29,7 +29,12 @@ class MlpModel(nn.Module): config = Config( dict( - parallel=dict(zero1=1, pipeline=dict(size=1, interleaved_overlap=False), sequence_parallel=False, tensor=1), + parallel=dict( + zero1=dict(size=1, fsdp=False), + pipeline=dict(size=1, interleaved_overlap=False), + sequence_parallel=False, + tensor=1, + ), model_type="INTERNLM", data=dict(seq_len=2048, micro_num=1, micro_bsz=1, pack_sample_into_one=False, min_length=0, total_steps=9999), model=dict( @@ -103,14 +108,22 @@ def init_optimizer_grouped_parameters(check_group, model): { "params": list(model.parameters())[:2], "weight_decay": config.adam.weight_decay, + "dp_mode": ParallelMode.DATA, }, { "params": list(model.parameters())[2:], "weight_decay": config.adam.weight_decay, + "dp_mode": ParallelMode.DATA, }, ] else: - optimizer_grouped_parameters = [{"params": model.parameters(), "weight_decay": config.adam.weight_decay}] + optimizer_grouped_parameters = [ + { + "params": model.parameters(), + "weight_decay": config.adam.weight_decay, + "dp_mode": ParallelMode.DATA, + } + ] return optimizer_grouped_parameters @@ -137,7 +150,7 @@ def exam_hybrid_zero_optim_with_ddp(args): # ParamBcastSyncHandler does not consider paramters in different optimizer group currently if overlap_sync_param and check_group: return - config.parallel.zero1 = zero_parallel + config.parallel.zero1.size = zero_parallel config.hybrid_zero_optimizer.overlap_sync_param = overlap_sync_param config.hybrid_zero_optimizer.overlap_sync_grad = overlap_sync_grad config.data.micro_num = micro_num @@ -253,7 +266,7 @@ def exam_hybrid_zero_optim_with_ddp(args): def exam_hybrid_zero_optim_with_ckpt_load_save(args): # init rank, world_size, zero_parallel, check_group, dtype = args - config.parallel.zero1 = zero_parallel + config.parallel.zero1.size = zero_parallel config.parallel.dtype = dtype build_environment(rank, world_size) diff --git a/tests/test_utils/common_fixture.py b/tests/test_utils/common_fixture.py index 379a3e0..5d6d7da 100644 --- a/tests/test_utils/common_fixture.py +++ b/tests/test_utils/common_fixture.py @@ -10,15 +10,18 @@ from internlm.core.context.parallel_context import Config from internlm.solver.optimizer.hybrid_zero_optim import HybridZeroOptimizer from internlm.utils.common import SingletonMeta -OSS_NAME = os.environ["OSS_BUCKET_NAME"] -OSS_IP = os.environ["OSS_IP"] -USER = os.environ["USER"] +OSS_NAME = os.environ.get("OSS_BUCKET_NAME") +OSS_IP = os.environ.get("OSS_IP") +USER = os.environ.get("USER") JOB_NAME = "CI_TEST" LOCAL_SAVE_PATH = "local:local_ckpt" BOTO_SAVE_PATH = f"boto3:s3://{OSS_NAME}.{OSS_IP}/{USER}/{JOB_NAME}" BOTO_SAVE_PATH_NO_PRFIX = f"s3://{OSS_NAME}.{OSS_IP}/{USER}/{JOB_NAME}/" +VOLC_SAVE_PATH = f"volc:vc://{OSS_NAME}.{OSS_IP}/{USER}/{JOB_NAME}" +VOLC_SAVE_PATH_NO_PRFIX = f"vc://{OSS_NAME}.{OSS_IP}/{USER}/{JOB_NAME}/" + ASYNC_TMP_FOLDER = "./async_tmp_folder" @@ -172,13 +175,25 @@ def del_tmp_file(): except FileNotFoundError: pass - try: - cmd = r"/mnt/petrelfs/share/sensesync --dryrun --deleteSrc cp " + BOTO_SAVE_PATH_NO_PRFIX + " / " - with Popen(cmd, stdout=PIPE, stderr=STDOUT, shell=True) as output: - results, presults = "", "" - for line in iter(output.stdout.readline, b""): - results += str(line.rstrip()) - presults += line.rstrip().decode() + "\n" - print(presults, flush=True) - except: # noqa # pylint: disable=bare-except - pass + if OSS_NAME is not None: + try: + cmd = r"/mnt/petrelfs/share/sensesync --dryrun --deleteSrc cp " + BOTO_SAVE_PATH_NO_PRFIX + " / " + with Popen(cmd, stdout=PIPE, stderr=STDOUT, shell=True) as output: + results, presults = "", "" + for line in iter(output.stdout.readline, b""): + results += str(line.rstrip()) + presults += line.rstrip().decode() + "\n" + print(presults, flush=True) + except: # noqa # pylint: disable=bare-except + pass + + try: + cmd = r"/mnt/petrelfs/share/sensesync --dryrun --deleteSrc cp " + VOLC_SAVE_PATH_NO_PRFIX + " / " + with Popen(cmd, stdout=PIPE, stderr=STDOUT, shell=True) as output: + results, presults = "", "" + for line in iter(output.stdout.readline, b""): + results += str(line.rstrip()) + presults += line.rstrip().decode() + "\n" + print(presults, flush=True) + except: # noqa # pylint: disable=bare-except + pass diff --git a/tests/test_utils/test_storage_manager.py b/tests/test_utils/test_storage_manager.py index 949c5ef..e96374e 100644 --- a/tests/test_utils/test_storage_manager.py +++ b/tests/test_utils/test_storage_manager.py @@ -10,20 +10,11 @@ from tests.test_utils.common_fixture import ( # noqa # pylint: disable=unused-i LOCAL_SAVE_PATH, VOLC_SAVE_PATH, del_tmp_file, - init_dist_and_model, reset_singletons, ) ASYNC_TMP_FOLDER = "./async_tmp_folder" ckpt_config_list = [ - # async boto - dict( - enable_save_ckpt=True, - async_upload_tmp_folder=ASYNC_TMP_FOLDER, - async_upload=True, - save_folder=BOTO_SAVE_PATH, - test_id=0, - ), # sync local dict( enable_save_ckpt=True, @@ -32,29 +23,37 @@ ckpt_config_list = [ save_folder=LOCAL_SAVE_PATH, test_id=1, ), - # sync boto - dict( - enable_save_ckpt=True, - async_upload_tmp_folder=None, - async_upload=False, - save_folder=BOTO_SAVE_PATH, - test_id=2, - ), # async local dict( enable_save_ckpt=True, async_upload_tmp_folder=ASYNC_TMP_FOLDER, async_upload=True, save_folder=LOCAL_SAVE_PATH, + test_id=2, + ), + # async boto + dict( + enable_save_ckpt=True, + async_upload_tmp_folder=ASYNC_TMP_FOLDER, + async_upload=True, + save_folder=BOTO_SAVE_PATH, test_id=3, ), + # sync boto + dict( + enable_save_ckpt=True, + async_upload_tmp_folder=None, + async_upload=False, + save_folder=BOTO_SAVE_PATH, + test_id=4, + ), # async volc dict( enable_save_ckpt=True, async_upload_tmp_folder=ASYNC_TMP_FOLDER, async_upload=True, save_folder=VOLC_SAVE_PATH, - test_id=4, + test_id=5, ), # sync volc dict( @@ -62,7 +61,7 @@ ckpt_config_list = [ async_upload_tmp_folder=None, async_upload=False, save_folder=VOLC_SAVE_PATH, - test_id=5, + test_id=6, ), ] @@ -77,7 +76,7 @@ def del_tmp(): @pytest.mark.usefixtures("del_tmp") @pytest.mark.usefixtures("reset_singletons") @pytest.mark.parametrize("ckpt_config", ckpt_config_list) -def test_storage_mm_save_load(ckpt_config, init_dist_and_model): # noqa # pylint: disable=unused-argument +def test_storage_mm_save_load(ckpt_config): # noqa # pylint: disable=unused-argument from internlm.utils.storage_manager import ( check_folder, get_fns, @@ -88,6 +87,11 @@ def test_storage_mm_save_load(ckpt_config, init_dist_and_model): # noqa # pylin ) ckpt_config = Config(ckpt_config) + if os.environ.get("OSS_BUCKET_NAME") is None: + if ckpt_config.test_id > 2: + print("Pass boto3 and volc", flush=True) + return + enable_save_ckpt = get_config_value(ckpt_config, "enable_save_ckpt", False) async_upload_tmp_folder = get_config_value(ckpt_config, "async_upload_tmp_folder", False) async_upload = get_config_value(ckpt_config, "async_upload", False)