mirror of https://github.com/Xhofe/alist
parent
a4de04528a
commit
d4ea8787c9
|
@ -34,6 +34,25 @@ func (d *Pan123) getS3PreSignedUrls(ctx context.Context, upReq *UploadResp, star
|
|||
return &s3PreSignedUrls, nil
|
||||
}
|
||||
|
||||
func (d *Pan123) getS3Auth(ctx context.Context, upReq *UploadResp, start, end int) (*S3PreSignedURLs, error) {
|
||||
data := base.Json{
|
||||
"StorageNode": upReq.Data.StorageNode,
|
||||
"bucket": upReq.Data.Bucket,
|
||||
"key": upReq.Data.Key,
|
||||
"partNumberEnd": end,
|
||||
"partNumberStart": start,
|
||||
"uploadId": upReq.Data.UploadId,
|
||||
}
|
||||
var s3PreSignedUrls S3PreSignedURLs
|
||||
_, err := d.request(S3Auth, http.MethodPost, func(req *resty.Request) {
|
||||
req.SetBody(data).SetContext(ctx)
|
||||
}, &s3PreSignedUrls)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &s3PreSignedUrls, nil
|
||||
}
|
||||
|
||||
func (d *Pan123) completeS3(ctx context.Context, upReq *UploadResp, file model.FileStreamer, isMultipart bool) error {
|
||||
data := base.Json{
|
||||
"StorageNode": upReq.Data.StorageNode,
|
||||
|
@ -51,11 +70,17 @@ func (d *Pan123) completeS3(ctx context.Context, upReq *UploadResp, file model.F
|
|||
}
|
||||
|
||||
func (d *Pan123) newUpload(ctx context.Context, upReq *UploadResp, file model.FileStreamer, reader io.Reader, up driver.UpdateProgress) error {
|
||||
chunkSize := int64(1024 * 1024 * 5)
|
||||
chunkSize := int64(1024 * 1024 * 16)
|
||||
// fetch s3 pre signed urls
|
||||
chunkCount := int(math.Ceil(float64(file.GetSize()) / float64(chunkSize)))
|
||||
// upload 10 chunks each batch
|
||||
batchSize := 10
|
||||
// only 1 batch is allowed
|
||||
isMultipart := chunkCount > 1
|
||||
batchSize := 1
|
||||
getS3UploadUrl := d.getS3Auth
|
||||
if isMultipart {
|
||||
batchSize = 10
|
||||
getS3UploadUrl = d.getS3PreSignedUrls
|
||||
}
|
||||
for i := 1; i <= chunkCount; i += batchSize {
|
||||
if utils.IsCanceled(ctx) {
|
||||
return ctx.Err()
|
||||
|
@ -65,7 +90,7 @@ func (d *Pan123) newUpload(ctx context.Context, upReq *UploadResp, file model.Fi
|
|||
if end > chunkCount+1 {
|
||||
end = chunkCount + 1
|
||||
}
|
||||
s3PreSignedUrls, err := d.getS3PreSignedUrls(ctx, upReq, start, end)
|
||||
s3PreSignedUrls, err := getS3UploadUrl(ctx, upReq, start, end)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -78,7 +103,7 @@ func (d *Pan123) newUpload(ctx context.Context, upReq *UploadResp, file model.Fi
|
|||
if j == chunkCount {
|
||||
curSize = file.GetSize() - (int64(chunkCount)-1)*chunkSize
|
||||
}
|
||||
err = d.uploadS3Chunk(ctx, upReq, s3PreSignedUrls, j, end, io.LimitReader(reader, chunkSize), curSize, false)
|
||||
err = d.uploadS3Chunk(ctx, upReq, s3PreSignedUrls, j, end, io.LimitReader(reader, chunkSize), curSize, false, getS3UploadUrl)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -89,7 +114,7 @@ func (d *Pan123) newUpload(ctx context.Context, upReq *UploadResp, file model.Fi
|
|||
return d.completeS3(ctx, upReq, file, chunkCount > 1)
|
||||
}
|
||||
|
||||
func (d *Pan123) uploadS3Chunk(ctx context.Context, upReq *UploadResp, s3PreSignedUrls *S3PreSignedURLs, cur, end int, reader io.Reader, curSize int64, retry bool) error {
|
||||
func (d *Pan123) uploadS3Chunk(ctx context.Context, upReq *UploadResp, s3PreSignedUrls *S3PreSignedURLs, cur, end int, reader io.Reader, curSize int64, retry bool, getS3UploadUrl func(ctx context.Context, upReq *UploadResp, start int, end int) (*S3PreSignedURLs, error)) error {
|
||||
uploadUrl := s3PreSignedUrls.Data.PreSignedUrls[strconv.Itoa(cur)]
|
||||
if uploadUrl == "" {
|
||||
return fmt.Errorf("upload url is empty, s3PreSignedUrls: %+v", s3PreSignedUrls)
|
||||
|
@ -111,13 +136,13 @@ func (d *Pan123) uploadS3Chunk(ctx context.Context, upReq *UploadResp, s3PreSign
|
|||
return fmt.Errorf("upload s3 chunk %d failed, status code: %d", cur, res.StatusCode)
|
||||
}
|
||||
// refresh s3 pre signed urls
|
||||
newS3PreSignedUrls, err := d.getS3PreSignedUrls(ctx, upReq, cur, end)
|
||||
newS3PreSignedUrls, err := getS3UploadUrl(ctx, upReq, cur, end)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
s3PreSignedUrls.Data.PreSignedUrls = newS3PreSignedUrls.Data.PreSignedUrls
|
||||
// retry
|
||||
return d.uploadS3Chunk(ctx, upReq, s3PreSignedUrls, cur, end, reader, curSize, true)
|
||||
return d.uploadS3Chunk(ctx, upReq, s3PreSignedUrls, cur, end, reader, curSize, true, getS3UploadUrl)
|
||||
}
|
||||
if res.StatusCode != http.StatusOK {
|
||||
body, err := io.ReadAll(res.Body)
|
||||
|
|
Loading…
Reference in New Issue