NOTE: the added lines of this patch were revised during review, so the post-image
blob id on the "index" line below is stale; apply with plain `git apply` / `patch`.

diff --git a/internlm/utils/storage_manager.py b/internlm/utils/storage_manager.py
index c76b570..86adaaf 100644
--- a/internlm/utils/storage_manager.py
+++ b/internlm/utils/storage_manager.py
@@ -30,6 +30,8 @@ except ImportError:
 
 try:
     import tos
+    from tos import DataTransferType
+    from tos.utils import MergeProcess, SizeAdapter
 except ImportError:
     pass
 
@@ -66,6 +68,57 @@ def llm_save(save_path: str, saved_obj: Any, **kwargs):
     storage_manager.save(save_path, to_save_obj=saved_obj, **kwargs)
 
 
+def percentage(consumed_bytes: int, total_bytes: int, rw_once_bytes: int, type: "DataTransferType"):
+    """Log the progress of a TOS transfer; used as the ``MergeProcess`` callback.
+
+    Logs only when ``total_bytes`` is non-zero and this rank is the logging rank.
+    ``DataTransferType`` is annotated as a string so that defining this function
+    does not raise ``NameError`` when the optional ``tos`` import above failed.
+    """
+    if total_bytes and gpc.is_rank_for_log():
+        rate = int(100 * float(consumed_bytes) / float(total_bytes))
+        logger.info(
+            f"rate:{rate}, consumed_bytes:{consumed_bytes},total_bytes:{total_bytes}, "
+            f"rw_once_bytes:{rw_once_bytes}, type:{type}"
+        )
+
+
+def _upload_file_in_parts(
+    client, bucket_name: str, key: str, local_path: str, total_size: int, part_size: int = 5 * 1024 * 1024
+):
+    """Upload ``local_path`` to ``key`` in ``bucket_name`` as a TOS multipart upload.
+
+    Progress is reported through ``percentage``. If uploading a part or completing
+    the upload raises, the multipart upload is aborted before the error is re-raised,
+    so a failed attempt does not leave already-uploaded parts in the bucket.
+    """
+    part_count = (total_size + part_size - 1) // part_size
+    data_transfer_listener = MergeProcess(percentage, total_size, part_count, 0)
+    upload_id = client.create_multipart_upload(bucket_name, key).upload_id
+    parts = []
+    try:
+        with open(local_path, "rb") as f:
+            part_number = 1
+            offset = 0
+            while offset < total_size:
+                num_to_upload = min(part_size, total_size - offset)
+                out = client.upload_part(
+                    bucket_name,
+                    key,
+                    upload_id,
+                    part_number,
+                    content=SizeAdapter(f, num_to_upload, init_offset=offset),
+                    data_transfer_listener=data_transfer_listener,
+                )
+                parts.append(out)
+                offset += num_to_upload
+                part_number += 1
+        client.complete_multipart_upload(bucket_name, key, upload_id, parts)
+    except Exception:
+        client.abort_multipart_upload(bucket_name, key, upload_id)
+        raise
+
+
 class StorageClient:
     """
     StorageClient as a client for s3 storage access.
@@ -537,7 +590,13 @@ class VolcClient(StorageClient):
     @staticmethod
     def async_upload_fileobj(handler, bucket_name: str, fp: str, local_nvme_path: str):
         try:
-            handler.client.put_object_from_file(bucket_name, fp, local_nvme_path)
+            total_size = os.path.getsize(local_nvme_path)
+            if total_size == 0:
+                # An empty file yields no parts, so a multipart upload would have
+                # nothing to complete; keep the single-request upload for it.
+                handler.client.put_object_from_file(bucket_name, fp, local_nvme_path)
+            else:
+                _upload_file_in_parts(handler.client, bucket_name, fp, local_nvme_path, total_size)
         except handler.handler.exceptions.TosClientError as exc:
             raise RuntimeError(
                 f"Volc Network Error: fail with client error, message:{exc.message}, cause: {exc.cause}"
@@ -548,6 +607,8 @@ class VolcClient(StorageClient):
                 f"error with request id: {exec.request_id}",
                 f"error with message: {exec.message}",
                 f"error with http code: {exec.status_code}",
+                f"error with ec: {exec.ec}",
+                f"error with request url: {exec.request_url}",
             ) from exc
         except Exception as e:
             raise e