From c19b88a3fa62f21c6e656e17a6891d34a5ad59bd Mon Sep 17 00:00:00 2001 From: li126com Date: Sun, 8 Oct 2023 15:07:02 +0800 Subject: [PATCH] feat: support volc tos --- internlm/utils/storage_manager.py | 242 +++++++++++++++++++++++++++++- 1 file changed, 234 insertions(+), 8 deletions(-) diff --git a/internlm/utils/storage_manager.py b/internlm/utils/storage_manager.py index a3f9122..d244034 100644 --- a/internlm/utils/storage_manager.py +++ b/internlm/utils/storage_manager.py @@ -25,6 +25,7 @@ from internlm.utils.logger import get_logger try: import boto3 import botocore + import tos except ImportError: pass @@ -32,6 +33,7 @@ except ImportError: logger = get_logger(__file__) boto3_url_re = re.compile(r"([^\.]+)\.([\d\.]+)") +volc_url_re = re.compile(r"^(.*?)\.(.*)$") MB = 1024**2 @@ -122,6 +124,45 @@ local_nvme_path: {self.local_nvme_path}" return meta.client, meta.bucket_name, meta.file_path +class VolcMetaInfo: + """Volc meta info for save/load etc.""" + + def __init__( + self, + is_async, + handler: StorageClient, + bucket_name: str, + endpoint: str, + file_path: str, + async_upload_fn: callable, + local_nvme_path=None, + ) -> None: + # all need info. + self.client = handler + self.bucket_name = bucket_name + self.file_path = file_path + # only save need info. + self.local_nvme_path = local_nvme_path + self.is_async = is_async + self.endpoint = endpoint + self.async_upload_fn = async_upload_fn + + def __str__(self) -> str: + return f"is_async: {self.is_async}, bucket_name:{self.bucket_name}, endpoint:{self.endpoint}, \ +local_nvme_path: {self.local_nvme_path}" + + @staticmethod + def unpack_volc_save_meta(meta): + if meta.is_async: + return meta.client, meta.bucket_name, meta.file_path, meta.local_nvme_path + else: + return meta.client, meta.bucket_name, meta.file_path + + @staticmethod + def unpack_volc_nosave_meta(meta): + return meta.client, meta.bucket_name, meta.file_path + + class LocalMetaInfo: """Local meta info for save/load etc.""" @@ -139,18 +180,22 @@ class LocalMetaInfo: return (meta.file_path,) -def unpack_save_meta(meta: Union[Boto3MetaInfo, LocalMetaInfo]): +def unpack_save_meta(meta: Union[Boto3MetaInfo, VolcMetaInfo, LocalMetaInfo]): if isinstance(meta, Boto3MetaInfo): return Boto3MetaInfo.unpack_boto3_save_meta(meta) + elif isinstance(meta, VolcMetaInfo): + return VolcMetaInfo.unpack_volc_save_meta(meta) elif isinstance(meta, LocalMetaInfo): return LocalMetaInfo.unpack_local_save_meta(meta) else: raise ValueError(f"unkonwn meta info: {type(meta)}") -def unpack_nosave_meta(meta: Union[Boto3MetaInfo, LocalMetaInfo]): +def unpack_nosave_meta(meta: Union[Boto3MetaInfo, VolcMetaInfo, LocalMetaInfo]): if isinstance(meta, Boto3MetaInfo): return Boto3MetaInfo.unpack_boto3_nosave_meta(meta) + elif isinstance(meta, VolcMetaInfo): + return VolcMetaInfo.unpack_volc_nosave_meta(meta) elif isinstance(meta, LocalMetaInfo): return LocalMetaInfo.unpack_local_nosave_meta(meta) else: @@ -170,6 +215,10 @@ def try_get_storage_backend(path: str): if gpc.is_rank_for_log(): logger.warning(f"path: '{path}' not start with backend prefix, guess it is the backend of boto3.") return "boto3", path + elif path.startswith("vc:"): + if gpc.is_rank_for_log(): + logger.warning(f"path: '{path}' not start with backend prefix, guess it is the backend of volc.") + return "volc", path else: sre = path.split(":", maxsplit=1) if len(sre) == 1: @@ -312,6 +361,143 @@ class Boto3Client(StorageClient): raise NotImplementedError("boto3 not support delete_obj") +class VolcClient(StorageClient): + """ + VolcClient + """ + + def __init__( + self, + ) -> None: + """Volc object/file storage management class + + Env variables: + access_key (str): Access key ID get from "VOLC_ACCESS_KEY_ID". + secret_key (str): Secret access key get from "VOLC_SECRET_ACCESS_KEY_ID". + endpoint (str): Get from "VOLC_ENDPOINT". + region (str): Get from "VOLC_REGION". + + """ + super().__init__(tos) + + try: + access_key = os.environ["VOLC_ACCESS_KEY_ID"] + secret_key = os.environ["VOLC_SECRET_ACCESS_KEY_ID"] + endpoint = os.environ["VOLC_ENDPOINT"] + region = os.environ["VOLC_REGION"] + except KeyError as exc: + raise RuntimeError( + "Please set 'VOLC_ACCESS_KEY_ID' and 'VOLC_SECRET_ACCESS_KEY_ID'", + "and 'VOLC_ENDPOINT' and 'VOLC_REGION' using environment variable!", + ) from exc + + self.client = self.handler.TosClientV2(access_key, secret_key, endpoint, region) + + @staticmethod + def sync_upload_fileobj(handler, bucket_name: str, fp: str, saved_obj=None, **kwargs): + assert saved_obj is not None, "saved_obj is None!" + try: + with io.BytesIO() as f: + torch.save(saved_obj, f, **kwargs) + f.seek(0) + handler.client.put_object(bucket_name, fp, content=f) + except handler.handler.exceptions.TosClientError as exc: + raise RuntimeError( + f"Volc Network Error: fail with client error, message:{exc.message}, cause: {exc.cause}" + ) from exc + except handler.handler.exceptions.TosServerError as exc: + raise RuntimeError( + f"Volc Network Error: fail with server error, code: {exec.code}", + f"error with request id: {exec.request_id}", + f"error with message: {exec.message}", + f"error with http code: {exec.status_code}", + ) from exc + + @staticmethod + def load(handler, bucket_name: str, fp: str, **kwargs) -> Dict: + """ + Args: + fp (str): Path to save, eg. vc://opennlplab/model_weights/xxx/ddd.pt + """ + try: + object_stream = handler.client.get_object(bucket_name, fp) + buffer = io.BytesIO(object_stream.read()) + states = torch.load(buffer, **kwargs) + except handler.handler.exceptions.TosClientError as exc: + raise RuntimeError( + f"Volc Network Error: fail with client error, message:{exc.message}, cause: {exc.cause}" + ) from exc + except handler.handler.exceptions.TosServerError as exc: + raise RuntimeError( + f"Volc Network Error: fail with server error, code: {exec.code}", + f"error with request id: {exec.request_id}", + f"error with message: {exec.message}", + f"error with http code: {exec.status_code}", + ) from exc + + return states + + @staticmethod + def assert_fp_exists(handler, bucket_name: str, fp: str): # pylint: disable=W0613 + assert len(list(handler.client.list_objects_type2(bucket_name, prefix=fp).contents)) > 0, fp + + @staticmethod + def is_fp_exists(handler, bucket_name: str, fp: str): # pylint: disable=W0613 + re = handler.client.list_objects_type2(bucket_name, prefix=fp) + if hasattr(re, "contents"): + return len(list(re.contents)) > 0 + else: + return False + + @staticmethod + def get_fns(handler, bucket_name: str, fp: str): + if VolcClient.is_fp_exists(handler, bucket_name, fp): + folder_name_list = [] + result = handler.client.list_objects_type2(bucket_name, prefix=fp) + if hasattr(result, "contents"): + for iterm in result.contents: + pth = iterm.key + folder_name_list.append(pth.split(fp, maxsplit=1)[1].strip("/").split("/", maxsplit=1)[0]) + + while result.is_truncated: + result = handler.client.list_objects_type2( + bucket_name, prefix=fp, continuation_token=result.next_continuation_token + ) + if hasattr(result, "contents"): + for iterm in result.contents: + pth = iterm.key + folder_name_list.append(pth.split(fp, maxsplit=1)[1].strip("/").split("/", maxsplit=1)[0]) + + return list(set(folder_name_list)) + + else: + if gpc.is_rank_for_log(): + logger.warning(f"'{fp}' not found!") + return None + + @staticmethod + def async_upload_fileobj(handler, bucket_name: str, fp: str, local_nvme_path: str): + try: + handler.client.put_object_from_file(bucket_name, fp, local_nvme_path) + except handler.handler.exceptions.TosClientError as exc: + raise RuntimeError( + f"Volc Network Error: fail with client error, message:{exc.message}, cause: {exc.cause}" + ) from exc + except handler.handler.exceptions.TosServerError as exc: + raise RuntimeError( + f"Volc Network Error: fail with server error, code: {exec.code}", + f"error with request id: {exec.request_id}", + f"error with message: {exec.message}", + f"error with http code: {exec.status_code}", + ) from exc + except Exception as e: + raise e + + @staticmethod + def delete_obj(handler, fp: str): + raise NotImplementedError("volc not support delete_obj") + + class LocalClient(StorageClient): """ Storage Client for local NFS. @@ -388,8 +574,30 @@ def get_boto3_meta(fp: str, tmp_local_folder: str, is_async: bool) -> Boto3MetaI ) +def get_volc_meta(fp: str, tmp_local_folder: str, is_async: bool) -> VolcMetaInfo: + assert fp.startswith("vc://"), f"Path '{fp}' is not a volc url" + parts = fp.lstrip("vc://").split(os.path.sep) + match = volc_url_re.match(parts[0]) + assert match is not None, f"url '{fp}' is not a valid volc url" + bucket_name, endpoint = match.group(1), match.group(2) + endpoint = "http://" + endpoint + ":80" + if is_async: + tmp_step_file = get_tmp_file_name(tmp_local_folder, fp) + else: + tmp_step_file = None + return VolcMetaInfo( + is_async=is_async, + handler=None, + bucket_name=bucket_name, + endpoint=endpoint, + file_path=os.path.sep.join(parts[1:]), + async_upload_fn=VolcClient.async_upload_fileobj, + local_nvme_path=tmp_step_file, + ) + + def get_local_meta(fp: str) -> LocalMetaInfo: - assert not fp.startswith("s3://"), f"Path '{fp}' is not a local path" + assert not fp.startswith("s3://") and not fp.startswith("vc://"), f"Path '{fp}' is not a local path" return LocalMetaInfo(fp) @@ -430,10 +638,11 @@ class StorageManager(metaclass=SingletonMeta): TODO: add a thread to poll the asynchronous storage state. """ - BACKEND_TYPE = {"boto3", "local"} + BACKEND_TYPE = {"boto3", "local", "volc"} BACKEND_INIT_METHOD = { "boto3": Boto3Client, "local": LocalClient, + "volc": VolcClient, } CLI_DICT = {} @@ -476,11 +685,12 @@ class StorageManager(metaclass=SingletonMeta): logger.error(f'tmp_local_folder only have "{free_size}" GB free space, less then 100 GB!') raise RuntimeError(f"Insufficient temporary storage space on {socket.gethostname()}") - def _get_client(self, path: str, async_mode: bool = False) -> Union[Boto3MetaInfo, LocalMetaInfo]: + def _get_client(self, path: str, async_mode: bool = False) -> Union[Boto3MetaInfo, VolcMetaInfo, LocalMetaInfo]: """ example: local:/path/to/checkpoint boto3:s3://model_weights/0331/120bi + volc:vc://model_weights/0331/120bi Args: path (str): _description_ @@ -507,10 +717,26 @@ class StorageManager(metaclass=SingletonMeta): the proxy may make boto3 unavailable or affect performance." ) self.has_warning = True + elif backend == "volc": + meta_info = get_volc_meta(path, self.tmp_local_folder, async_mode) + backend_key = backend + ":" + meta_info.endpoint + init_args = (meta_info.endpoint,) + if ( + "http_proxy" in os.environ + or "https_proxy" in os.environ + or "HTTP_PROXY" in os.environ + or "HTTPS_PROXY" in os.environ + ): + if not self.has_warning and gpc.is_rank_for_log(): + logger.warning( + "HTTP/HTTPS proxy is detected when using volc, incorrectly setting \ + the proxy may make volc unavailable or affect performance." + ) + self.has_warning = True assert backend in StorageManager.BACKEND_TYPE, f"Unkown backend: {backend}" - # boto3 backend need special treatment. + # boto3 and volc backend need special treatment. if backend_key not in StorageManager.CLI_DICT: StorageManager.CLI_DICT.update({backend_key: StorageManager.BACKEND_INIT_METHOD[backend](*init_args)}) @@ -527,11 +753,10 @@ class StorageManager(metaclass=SingletonMeta): return meta.client.get_fns(*unpack_nosave_meta(meta)) def save(self, save_path: str, to_save_obj: Any, async_upload=None, **kwargs): - if async_upload is None: async_upload = self.async_mode - if not save_path.startswith("boto3:"): + if not save_path.startswith("boto3:") and not save_path.startswith("volc:"): async_upload = False meta = self._get_client(save_path, async_upload) @@ -554,6 +779,7 @@ class StorageManager(metaclass=SingletonMeta): def load(self, load_path: str, **kwargs) -> Any: self.wait() meta = self._get_client(path=load_path) + return meta.client.load(*unpack_nosave_meta(meta), **kwargs) def delete_obj(self, fp: str):