multipart upload

pull/520/head
lijiaxing 2023-11-28 15:37:26 +08:00
parent 06e8301861
commit 4e4fb52898
1 changed files with 34 additions and 1 deletions

View File

@ -30,6 +30,8 @@ except ImportError:
try: try:
import tos import tos
from tos import DataTransferType
from tos.utils import SizeAdapter, MergeProcess
except ImportError: except ImportError:
pass pass
@ -66,6 +68,12 @@ def llm_save(save_path: str, saved_obj: Any, **kwargs):
storage_manager.save(save_path, to_save_obj=saved_obj, **kwargs) storage_manager.save(save_path, to_save_obj=saved_obj, **kwargs)
def percentage(consumed_bytes: int, total_bytes: int, rw_once_bytes: int, type: DataTransferType):
if total_bytes and gpc.is_rank_for_log():
rate = int(100 * float(consumed_bytes) / float(total_bytes))
logger.info(f"rate:{rate}, consumed_bytes:{consumed_bytes},total_bytes{total_bytes}, rw_once_bytes:{rw_once_bytes}, type:{type}")
class StorageClient: class StorageClient:
""" """
StorageClient as a client for s3 storage access. StorageClient as a client for s3 storage access.
@ -537,7 +545,30 @@ class VolcClient(StorageClient):
@staticmethod @staticmethod
def async_upload_fileobj(handler, bucket_name: str, fp: str, local_nvme_path: str): def async_upload_fileobj(handler, bucket_name: str, fp: str, local_nvme_path: str):
try: try:
handler.client.put_object_from_file(bucket_name, fp, local_nvme_path) total_size = os.path.getsize(local_nvme_path)
part_size = 5 * 1024 * 1024
data_transfer_listener = MergeProcess(percentage, total_size, (total_size + part_size - 1) // part_size, 0)
multi_result = handler.client.create_multipart_upload(bucket_name, fp)
upload_id = multi_result.upload_id
parts = []
# 上传分片数据
with open(local_nvme_path, 'rb') as f:
part_number = 1
offset = 0
while offset < total_size:
num_to_upload = min(part_size, total_size - offset)
out = handler.client.upload_part(bucket_name, fp, upload_id, part_number,
content=SizeAdapter(f, num_to_upload, init_offset=offset),
data_transfer_listener=data_transfer_listener)
parts.append(out)
offset += num_to_upload
part_number += 1
# 完成分片上传任务
handler.client.complete_multipart_upload(bucket_name, fp, upload_id, parts)
except handler.handler.exceptions.TosClientError as exc: except handler.handler.exceptions.TosClientError as exc:
raise RuntimeError( raise RuntimeError(
f"Volc Network Error: fail with client error, message:{exc.message}, cause: {exc.cause}" f"Volc Network Error: fail with client error, message:{exc.message}, cause: {exc.cause}"
@ -548,6 +579,8 @@ class VolcClient(StorageClient):
f"error with request id: {exec.request_id}", f"error with request id: {exec.request_id}",
f"error with message: {exec.message}", f"error with message: {exec.message}",
f"error with http code: {exec.status_code}", f"error with http code: {exec.status_code}",
f"error with ec: {exec.ec}",
f"error with request url: {exec.request_url}",
) from exc ) from exc
except Exception as e: except Exception as e:
raise e raise e