feat: support volc oss

pull/397/head
li126com 2023-10-09 14:52:14 +08:00
parent c19b88a3fa
commit 67fad5c894
4 changed files with 46 additions and 15 deletions

View File

@ -39,7 +39,7 @@ CheckpointManager
load_ckpt_folder=dict(path="local:/mnt/mfs/ckpt", content=["all",], ckpt_type="internlm"),
auto_resume=False, # disable auto-resume, internlm will load model checkpoint from the path of 'load_ckpt_folder'.
checkpoint_every=CHECKPOINT_EVERY,
async_upload=True, # async ckpt upload. (only works for boto3 ckpt)
async_upload=True, # async ckpt upload. (only works for boto3 and volc ckpt)
async_upload_tmp_folder="/dev/shm/internlm_tmp_ckpt/", # path for temporary files during asynchronous upload.
oss_snapshot_freq=int(CHECKPOINT_EVERY / 2), # snapshot ckpt save frequency.
)
@ -67,7 +67,9 @@ InternLM对config中出现的所有存储路径都遵循以下的路径格式约
1. 如果需要使用boto3的路径，需要在运行前提前导入 ``S3_ACCESS_KEY_ID`` 和 ``S3_SECRET_ACCESS_KEY_ID`` 这两个环境变量。
2. bucket的endpoint一般分为Inside IP和Outside IP，如果可以尽量使用inside IP，会获得更佳的存储速度。
2. 如果需要使用volc的路径，需要在运行前提前导入 ``VOLC_ACCESS_KEY_ID`` 和 ``VOLC_SECRET_ACCESS_KEY_ID`` 这两个环境变量。
3. bucket的endpoint一般分为Inside IP和Outside IP，如果可以尽量使用inside IP，会获得更佳的存储速度。
@ -114,7 +116,7 @@ config.ckpt 中相关的参数:
- ``async_upload_tmp_folder``: 异步上传临时存储路径。参数类型 ``str/None``, 默认值为 ``/dev/shm/{JOB_NAME}_tmp_ckpt/``
需要注意的是，异步上传功能仅在backend为boto3时才会有效果，backend为local时只支持同步存储。
需要注意的是，异步上传功能仅在backend为boto3或volc时才会有效果，backend为local时只支持同步存储。
``async_upload_tmp_folder`` 设置的原则为尽量设置为计算节点的local目录，这样才可以获得最佳的异步上传速度，一般来说建议为 ``/dev/shm`` 或 ``/nvme`` 下的路径，如果使用同步上传，则该路径可不给。

Binary file not shown.

Before

Width:  |  Height:  |  Size: 153 KiB

After

Width:  |  Height:  |  Size: 212 KiB

View File

@ -133,6 +133,7 @@ class VolcMetaInfo:
handler: StorageClient,
bucket_name: str,
endpoint: str,
region: str,
file_path: str,
async_upload_fn: callable,
local_nvme_path=None,
@ -145,11 +146,12 @@ class VolcMetaInfo:
self.local_nvme_path = local_nvme_path
self.is_async = is_async
self.endpoint = endpoint
self.region = region
self.async_upload_fn = async_upload_fn
def __str__(self) -> str:
return f"is_async: {self.is_async}, bucket_name:{self.bucket_name}, endpoint:{self.endpoint}, \
local_nvme_path: {self.local_nvme_path}"
region:{self.region}, local_nvme_path: {self.local_nvme_path}"
@staticmethod
def unpack_volc_save_meta(meta):
@ -368,14 +370,16 @@ class VolcClient(StorageClient):
def __init__(
self,
endpoint: str,
region: str,
) -> None:
"""Volc object/file storage management class
Env variables:
access_key (str): Access key ID get from "VOLC_ACCESS_KEY_ID".
secret_key (str): Secret access key get from "VOLC_SECRET_ACCESS_KEY_ID".
endpoint (str): Get from "VOLC_ENDPOINT".
region (str): Get from "VOLC_REGION".
Args:
access_key (str): Volc access key ID.
secret_key (str): Volc secret access key.
endpoint (str): Volc tos endpoint.
region (str): Volc tos region.
"""
super().__init__(tos)
@ -383,12 +387,10 @@ class VolcClient(StorageClient):
try:
access_key = os.environ["VOLC_ACCESS_KEY_ID"]
secret_key = os.environ["VOLC_SECRET_ACCESS_KEY_ID"]
endpoint = os.environ["VOLC_ENDPOINT"]
region = os.environ["VOLC_REGION"]
except KeyError as exc:
raise RuntimeError(
"Please set 'VOLC_ACCESS_KEY_ID' and 'VOLC_SECRET_ACCESS_KEY_ID'",
"and 'VOLC_ENDPOINT' and 'VOLC_REGION' using environment variable!",
"using environment variable!",
) from exc
self.client = self.handler.TosClientV2(access_key, secret_key, endpoint, region)
@ -580,7 +582,11 @@ def get_volc_meta(fp: str, tmp_local_folder: str, is_async: bool) -> VolcMetaInf
match = volc_url_re.match(parts[0])
assert match is not None, f"url '{fp}' is not a valid volc url"
bucket_name, endpoint = match.group(1), match.group(2)
endpoint = "http://" + endpoint + ":80"
temp_part = endpoint.split(".")
endpoint = ".".join(temp_part[1:])
region = temp_part[1].split("-")
region = "-".join(region[1:])
if is_async:
tmp_step_file = get_tmp_file_name(tmp_local_folder, fp)
else:
@ -590,6 +596,7 @@ def get_volc_meta(fp: str, tmp_local_folder: str, is_async: bool) -> VolcMetaInf
handler=None,
bucket_name=bucket_name,
endpoint=endpoint,
region=region,
file_path=os.path.sep.join(parts[1:]),
async_upload_fn=VolcClient.async_upload_fileobj,
local_nvme_path=tmp_step_file,
@ -720,7 +727,10 @@ class StorageManager(metaclass=SingletonMeta):
elif backend == "volc":
meta_info = get_volc_meta(path, self.tmp_local_folder, async_mode)
backend_key = backend + ":" + meta_info.endpoint
init_args = (meta_info.endpoint,)
init_args = (
meta_info.endpoint,
meta_info.region,
)
if (
"http_proxy" in os.environ
or "https_proxy" in os.environ

View File

@ -6,9 +6,9 @@ import torch
from internlm.core.context.parallel_context import Config
from internlm.initialize.launch import get_config_value
from tests.test_utils.common_fixture import ( # noqa # pylint: disable=unused-import
ASYNC_TMP_FOLDER,
BOTO_SAVE_PATH,
LOCAL_SAVE_PATH,
VOLC_SAVE_PATH,
del_tmp_file,
init_dist_and_model,
reset_singletons,
@ -48,6 +48,22 @@ ckpt_config_list = [
save_folder=LOCAL_SAVE_PATH,
test_id=3,
),
# async volc
dict(
enable_save_ckpt=True,
async_upload_tmp_folder=ASYNC_TMP_FOLDER,
async_upload=True,
save_folder=VOLC_SAVE_PATH,
test_id=4,
),
# sync volc
dict(
enable_save_ckpt=True,
async_upload_tmp_folder=None,
async_upload=False,
save_folder=VOLC_SAVE_PATH,
test_id=5,
),
]
@ -97,6 +113,9 @@ internlm_ckpt_path = [
("/mnt/ckpt/", "local", "/mnt/ckpt/"),
("./ckpt/", "local", "./ckpt/"),
("s3://oss_bucket/", "boto3", "s3://oss_bucket/"),
("volc:vc://oss_bucket/", "volc", "vc://oss_bucket/"),
("volc:oss_bucket/", "volc", "oss_bucket/"),
("vc://oss_bucket/", "volc", "vc://oss_bucket/"),
]