From 4ec274e748557c62a402bd5ed11c7a03eedf3d3c Mon Sep 17 00:00:00 2001 From: NewbieOrange Date: Sat, 1 Apr 2023 14:54:29 +0800 Subject: [PATCH] fix(aliyundrive_open): refresh upload url if expired (#3999 close #3823) * fix(aliyundrive_open): refresh upload url for large files * fix(aliyundrive_open): retry upload on url expiry * fix(aliyundrive_open): ignore 409 error * feat(aliyundrive): cleanup upload retry logic * feat(util): add multireadable io utility * feat(aliyundrive_open): make upload fully stream * feat(aliyundrive_open): refresh upload url every 20 puts * fix(aliyundrive_open): part info panic * chore: change refresh upload url strategy --------- Co-authored-by: Andy Hsu --- drivers/aliyundrive_open/driver.go | 32 +++++++--------- drivers/aliyundrive_open/types.go | 18 +++++---- drivers/aliyundrive_open/util.go | 59 ++++++++++++++++++++++++++++++ pkg/utils/io.go | 44 ++++++++++++++++++++++ 4 files changed, 126 insertions(+), 27 deletions(-) diff --git a/drivers/aliyundrive_open/driver.go b/drivers/aliyundrive_open/driver.go index ace2d4cc..4e85bceb 100644 --- a/drivers/aliyundrive_open/driver.go +++ b/drivers/aliyundrive_open/driver.go @@ -5,7 +5,7 @@ import ( "io" "math" "net/http" - "strings" + "time" "github.com/alist-org/alist/v3/drivers/base" "github.com/alist-org/alist/v3/internal/driver" @@ -148,11 +148,7 @@ func (d *AliyundriveOpen) Put(ctx context.Context, dstDir model.Obj, stream mode count := 1 if stream.GetSize() > DEFAULT { count = int(math.Ceil(float64(stream.GetSize()) / float64(DEFAULT))) - partInfoList := make([]base.Json, 0, count) - for i := 1; i <= count; i++ { - partInfoList = append(partInfoList, base.Json{"part_number": i}) - } - createData["part_info_list"] = partInfoList + createData["part_info_list"] = makePartInfos(count) } var createResp CreateResp _, err := d.request("/adrive/v1.0/openFile/create", http.MethodPost, func(req *resty.Request) { @@ -162,28 +158,26 @@ func (d *AliyundriveOpen) Put(ctx context.Context, dstDir model.Obj, stream mode return err } // 2. upload - for i, partInfo := range createResp.PartInfoList { + preTime := time.Now() + for i := 1; i <= len(createResp.PartInfoList); i++ { if utils.IsCanceled(ctx) { return ctx.Err() } - uploadUrl := partInfo.UploadUrl - if d.InternalUpload { - //Replace a known public Host with an internal Host - uploadUrl = strings.ReplaceAll(uploadUrl, "https://cn-beijing-data.aliyundrive.net/", "http://ccp-bj29-bj-1592982087.oss-cn-beijing-internal.aliyuncs.com/") - } - req, err := http.NewRequest("PUT", uploadUrl, io.LimitReader(stream, DEFAULT)) + err = d.uploadPart(ctx, i, count, utils.NewMultiReadable(io.LimitReader(stream, DEFAULT)), &createResp, true) if err != nil { return err } - req = req.WithContext(ctx) - res, err := base.HttpClient.Do(req) - if err != nil { - return err - } - res.Body.Close() if count > 0 { up(i * 100 / count) } + // refresh upload url if 50 minutes passed + if time.Since(preTime) > 50*time.Minute { + createResp.PartInfoList, err = d.getUploadUrl(count, createResp.FileId, createResp.UploadId) + if err != nil { + return err + } + preTime = time.Now() + } } // 3. complete _, err = d.request("/adrive/v1.0/openFile/complete", http.MethodPost, func(req *resty.Request) { diff --git a/drivers/aliyundrive_open/types.go b/drivers/aliyundrive_open/types.go index 3aceb16e..6980effd 100644 --- a/drivers/aliyundrive_open/types.go +++ b/drivers/aliyundrive_open/types.go @@ -45,6 +45,14 @@ func fileToObj(f File) *model.ObjThumb { } } +type PartInfo struct { + Etag interface{} `json:"etag"` + PartNumber int `json:"part_number"` + PartSize interface{} `json:"part_size"` + UploadUrl string `json:"upload_url"` + ContentType string `json:"content_type"` +} + type CreateResp struct { //Type string `json:"type"` //ParentFileId string `json:"parent_file_id"` @@ -56,12 +64,6 @@ type CreateResp struct { //FileName string `json:"file_name"` UploadId string `json:"upload_id"` //Location string `json:"location"` - RapidUpload bool `json:"rapid_upload"` - PartInfoList []struct { - Etag interface{} `json:"etag"` - PartNumber int `json:"part_number"` - PartSize interface{} `json:"part_size"` - UploadUrl string `json:"upload_url"` - ContentType string `json:"content_type"` - } `json:"part_info_list"` + RapidUpload bool `json:"rapid_upload"` + PartInfoList []PartInfo `json:"part_info_list"` } diff --git a/drivers/aliyundrive_open/util.go b/drivers/aliyundrive_open/util.go index e2687d81..55e2b27c 100644 --- a/drivers/aliyundrive_open/util.go +++ b/drivers/aliyundrive_open/util.go @@ -1,12 +1,15 @@ package aliyundrive_open import ( + "context" "errors" "fmt" "net/http" + "strings" "github.com/alist-org/alist/v3/drivers/base" "github.com/alist-org/alist/v3/internal/op" + "github.com/alist-org/alist/v3/pkg/utils" "github.com/go-resty/resty/v2" ) @@ -106,3 +109,59 @@ func (d *AliyundriveOpen) getFiles(fileId string) ([]File, error) { } return res, nil } + +func makePartInfos(size int) []base.Json { + partInfoList := make([]base.Json, size) + for i := 0; i < size; i++ { + partInfoList[i] = base.Json{"part_number": 1 + i} + } + return partInfoList +} + +func (d *AliyundriveOpen) getUploadUrl(count int, fileId, uploadId string) ([]PartInfo, error) { + partInfoList := makePartInfos(count) + var resp CreateResp + _, err := d.request("/adrive/v1.0/openFile/getUploadUrl", http.MethodPost, func(req *resty.Request) { + req.SetBody(base.Json{ + "drive_id": d.DriveId, + "file_id": fileId, + "part_info_list": partInfoList, + "upload_id": uploadId, + }).SetResult(&resp) + }) + return resp.PartInfoList, err +} + +func (d *AliyundriveOpen) uploadPart(ctx context.Context, i, count int, reader *utils.MultiReadable, resp *CreateResp, retry bool) error { + partInfo := resp.PartInfoList[i-1] + uploadUrl := partInfo.UploadUrl + if d.InternalUpload { + uploadUrl = strings.ReplaceAll(uploadUrl, "https://cn-beijing-data.aliyundrive.net/", "http://ccp-bj29-bj-1592982087.oss-cn-beijing-internal.aliyuncs.com/") + } + req, err := http.NewRequest("PUT", uploadUrl, reader) + if err != nil { + return err + } + req = req.WithContext(ctx) + res, err := base.HttpClient.Do(req) + if err != nil { + if retry { + reader.Reset() + return d.uploadPart(ctx, i, count, reader, resp, false) + } + return err + } + res.Body.Close() + if retry && res.StatusCode == http.StatusForbidden { + resp.PartInfoList, err = d.getUploadUrl(count, resp.FileId, resp.UploadId) + if err != nil { + return err + } + reader.Reset() + return d.uploadPart(ctx, i, count, reader, resp, false) + } + if res.StatusCode != http.StatusOK && res.StatusCode != http.StatusConflict { + return fmt.Errorf("upload status: %d", res.StatusCode) + } + return nil +} diff --git a/pkg/utils/io.go b/pkg/utils/io.go index 76b514a5..f3831a73 100644 --- a/pkg/utils/io.go +++ b/pkg/utils/io.go @@ -1,6 +1,7 @@ package utils import ( + "bytes" "context" "io" ) @@ -91,3 +92,46 @@ func NewReadCloser(reader io.Reader, close CloseFunc) io.ReadCloser { func NewLimitReadCloser(reader io.Reader, close CloseFunc, limit int64) io.ReadCloser { return NewReadCloser(io.LimitReader(reader, limit), close) } + +type MultiReadable struct { + originReader io.Reader + reader io.Reader + cache *bytes.Buffer +} + +func NewMultiReadable(reader io.Reader) *MultiReadable { + return &MultiReadable{ + originReader: reader, + reader: reader, + } +} + +func (mr *MultiReadable) Read(p []byte) (int, error) { + n, err := mr.reader.Read(p) + if _, ok := mr.reader.(io.Seeker); !ok && n > 0 { + if mr.cache == nil { + mr.cache = &bytes.Buffer{} + } + mr.cache.Write(p[:n]) + } + return n, err +} + +func (mr *MultiReadable) Reset() error { + if seeker, ok := mr.reader.(io.Seeker); ok { + _, err := seeker.Seek(0, io.SeekStart) + return err + } + if mr.cache != nil && mr.cache.Len() > 0 { + mr.reader = io.MultiReader(mr.cache, mr.reader) + mr.cache = nil + } + return nil +} + +func (mr *MultiReadable) Close() error { + if closer, ok := mr.originReader.(io.Closer); ok { + return closer.Close() + } + return nil +}