mirror of https://github.com/Xhofe/alist
				
				
				
			* fix(aliyundrive_open): refresh upload url for large files * fix(aliyundrive_open): retry upload on url expiry * fix(aliyundrive_open): ignore 409 error * feat(aliyundrive): cleanup upload retry logic * feat(util): add multireadable io utility * feat(aliyundrive_open): make upload fully stream * feat(aliyundrive_open): refresh upload url every 20 puts * fix(aliyundrive_open): part info panic * chore: change refresh upload url strategy --------- Co-authored-by: Andy Hsu <i@nn.ci>pull/4027/head
							parent
							
								
									3b07c72f88
								
							
						
					
					
						commit
						4ec274e748
					
				| 
						 | 
				
			
			@ -5,7 +5,7 @@ import (
 | 
			
		|||
	"io"
 | 
			
		||||
	"math"
 | 
			
		||||
	"net/http"
 | 
			
		||||
	"strings"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"github.com/alist-org/alist/v3/drivers/base"
 | 
			
		||||
	"github.com/alist-org/alist/v3/internal/driver"
 | 
			
		||||
| 
						 | 
				
			
			@ -148,11 +148,7 @@ func (d *AliyundriveOpen) Put(ctx context.Context, dstDir model.Obj, stream mode
 | 
			
		|||
	count := 1
 | 
			
		||||
	if stream.GetSize() > DEFAULT {
 | 
			
		||||
		count = int(math.Ceil(float64(stream.GetSize()) / float64(DEFAULT)))
 | 
			
		||||
		partInfoList := make([]base.Json, 0, count)
 | 
			
		||||
		for i := 1; i <= count; i++ {
 | 
			
		||||
			partInfoList = append(partInfoList, base.Json{"part_number": i})
 | 
			
		||||
		}
 | 
			
		||||
		createData["part_info_list"] = partInfoList
 | 
			
		||||
		createData["part_info_list"] = makePartInfos(count)
 | 
			
		||||
	}
 | 
			
		||||
	var createResp CreateResp
 | 
			
		||||
	_, err := d.request("/adrive/v1.0/openFile/create", http.MethodPost, func(req *resty.Request) {
 | 
			
		||||
| 
						 | 
				
			
			@ -162,28 +158,26 @@ func (d *AliyundriveOpen) Put(ctx context.Context, dstDir model.Obj, stream mode
 | 
			
		|||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	// 2. upload
 | 
			
		||||
	for i, partInfo := range createResp.PartInfoList {
 | 
			
		||||
	preTime := time.Now()
 | 
			
		||||
	for i := 1; i <= len(createResp.PartInfoList); i++ {
 | 
			
		||||
		if utils.IsCanceled(ctx) {
 | 
			
		||||
			return ctx.Err()
 | 
			
		||||
		}
 | 
			
		||||
		uploadUrl := partInfo.UploadUrl
 | 
			
		||||
		if d.InternalUpload {
 | 
			
		||||
			//Replace a known public Host with an internal Host
 | 
			
		||||
			uploadUrl = strings.ReplaceAll(uploadUrl, "https://cn-beijing-data.aliyundrive.net/", "http://ccp-bj29-bj-1592982087.oss-cn-beijing-internal.aliyuncs.com/")
 | 
			
		||||
		}
 | 
			
		||||
		req, err := http.NewRequest("PUT", uploadUrl, io.LimitReader(stream, DEFAULT))
 | 
			
		||||
		err = d.uploadPart(ctx, i, count, utils.NewMultiReadable(io.LimitReader(stream, DEFAULT)), &createResp, true)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return err
 | 
			
		||||
		}
 | 
			
		||||
		req = req.WithContext(ctx)
 | 
			
		||||
		res, err := base.HttpClient.Do(req)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return err
 | 
			
		||||
		}
 | 
			
		||||
		res.Body.Close()
 | 
			
		||||
		if count > 0 {
 | 
			
		||||
			up(i * 100 / count)
 | 
			
		||||
		}
 | 
			
		||||
		// refresh upload url if 50 minutes passed
 | 
			
		||||
		if time.Since(preTime) > 50*time.Minute {
 | 
			
		||||
			createResp.PartInfoList, err = d.getUploadUrl(count, createResp.FileId, createResp.UploadId)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				return err
 | 
			
		||||
			}
 | 
			
		||||
			preTime = time.Now()
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	// 3. complete
 | 
			
		||||
	_, err = d.request("/adrive/v1.0/openFile/complete", http.MethodPost, func(req *resty.Request) {
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -45,6 +45,14 @@ func fileToObj(f File) *model.ObjThumb {
 | 
			
		|||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type PartInfo struct {
 | 
			
		||||
	Etag        interface{} `json:"etag"`
 | 
			
		||||
	PartNumber  int         `json:"part_number"`
 | 
			
		||||
	PartSize    interface{} `json:"part_size"`
 | 
			
		||||
	UploadUrl   string      `json:"upload_url"`
 | 
			
		||||
	ContentType string      `json:"content_type"`
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type CreateResp struct {
 | 
			
		||||
	//Type         string `json:"type"`
 | 
			
		||||
	//ParentFileId string `json:"parent_file_id"`
 | 
			
		||||
| 
						 | 
				
			
			@ -56,12 +64,6 @@ type CreateResp struct {
 | 
			
		|||
	//FileName     string `json:"file_name"`
 | 
			
		||||
	UploadId string `json:"upload_id"`
 | 
			
		||||
	//Location     string `json:"location"`
 | 
			
		||||
	RapidUpload  bool `json:"rapid_upload"`
 | 
			
		||||
	PartInfoList []struct {
 | 
			
		||||
		Etag        interface{} `json:"etag"`
 | 
			
		||||
		PartNumber  int         `json:"part_number"`
 | 
			
		||||
		PartSize    interface{} `json:"part_size"`
 | 
			
		||||
		UploadUrl   string      `json:"upload_url"`
 | 
			
		||||
		ContentType string      `json:"content_type"`
 | 
			
		||||
	} `json:"part_info_list"`
 | 
			
		||||
	RapidUpload  bool       `json:"rapid_upload"`
 | 
			
		||||
	PartInfoList []PartInfo `json:"part_info_list"`
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1,12 +1,15 @@
 | 
			
		|||
package aliyundrive_open
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"context"
 | 
			
		||||
	"errors"
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"net/http"
 | 
			
		||||
	"strings"
 | 
			
		||||
 | 
			
		||||
	"github.com/alist-org/alist/v3/drivers/base"
 | 
			
		||||
	"github.com/alist-org/alist/v3/internal/op"
 | 
			
		||||
	"github.com/alist-org/alist/v3/pkg/utils"
 | 
			
		||||
	"github.com/go-resty/resty/v2"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -106,3 +109,59 @@ func (d *AliyundriveOpen) getFiles(fileId string) ([]File, error) {
 | 
			
		|||
	}
 | 
			
		||||
	return res, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func makePartInfos(size int) []base.Json {
 | 
			
		||||
	partInfoList := make([]base.Json, size)
 | 
			
		||||
	for i := 0; i < size; i++ {
 | 
			
		||||
		partInfoList[i] = base.Json{"part_number": 1 + i}
 | 
			
		||||
	}
 | 
			
		||||
	return partInfoList
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (d *AliyundriveOpen) getUploadUrl(count int, fileId, uploadId string) ([]PartInfo, error) {
 | 
			
		||||
	partInfoList := makePartInfos(count)
 | 
			
		||||
	var resp CreateResp
 | 
			
		||||
	_, err := d.request("/adrive/v1.0/openFile/getUploadUrl", http.MethodPost, func(req *resty.Request) {
 | 
			
		||||
		req.SetBody(base.Json{
 | 
			
		||||
			"drive_id":       d.DriveId,
 | 
			
		||||
			"file_id":        fileId,
 | 
			
		||||
			"part_info_list": partInfoList,
 | 
			
		||||
			"upload_id":      uploadId,
 | 
			
		||||
		}).SetResult(&resp)
 | 
			
		||||
	})
 | 
			
		||||
	return resp.PartInfoList, err
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (d *AliyundriveOpen) uploadPart(ctx context.Context, i, count int, reader *utils.MultiReadable, resp *CreateResp, retry bool) error {
 | 
			
		||||
	partInfo := resp.PartInfoList[i-1]
 | 
			
		||||
	uploadUrl := partInfo.UploadUrl
 | 
			
		||||
	if d.InternalUpload {
 | 
			
		||||
		uploadUrl = strings.ReplaceAll(uploadUrl, "https://cn-beijing-data.aliyundrive.net/", "http://ccp-bj29-bj-1592982087.oss-cn-beijing-internal.aliyuncs.com/")
 | 
			
		||||
	}
 | 
			
		||||
	req, err := http.NewRequest("PUT", uploadUrl, reader)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	req = req.WithContext(ctx)
 | 
			
		||||
	res, err := base.HttpClient.Do(req)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		if retry {
 | 
			
		||||
			reader.Reset()
 | 
			
		||||
			return d.uploadPart(ctx, i, count, reader, resp, false)
 | 
			
		||||
		}
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	res.Body.Close()
 | 
			
		||||
	if retry && res.StatusCode == http.StatusForbidden {
 | 
			
		||||
		resp.PartInfoList, err = d.getUploadUrl(count, resp.FileId, resp.UploadId)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return err
 | 
			
		||||
		}
 | 
			
		||||
		reader.Reset()
 | 
			
		||||
		return d.uploadPart(ctx, i, count, reader, resp, false)
 | 
			
		||||
	}
 | 
			
		||||
	if res.StatusCode != http.StatusOK && res.StatusCode != http.StatusConflict {
 | 
			
		||||
		return fmt.Errorf("upload status: %d", res.StatusCode)
 | 
			
		||||
	}
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1,6 +1,7 @@
 | 
			
		|||
package utils
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"bytes"
 | 
			
		||||
	"context"
 | 
			
		||||
	"io"
 | 
			
		||||
)
 | 
			
		||||
| 
						 | 
				
			
			@ -91,3 +92,46 @@ func NewReadCloser(reader io.Reader, close CloseFunc) io.ReadCloser {
 | 
			
		|||
func NewLimitReadCloser(reader io.Reader, close CloseFunc, limit int64) io.ReadCloser {
 | 
			
		||||
	return NewReadCloser(io.LimitReader(reader, limit), close)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type MultiReadable struct {
 | 
			
		||||
	originReader io.Reader
 | 
			
		||||
	reader       io.Reader
 | 
			
		||||
	cache        *bytes.Buffer
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func NewMultiReadable(reader io.Reader) *MultiReadable {
 | 
			
		||||
	return &MultiReadable{
 | 
			
		||||
		originReader: reader,
 | 
			
		||||
		reader:       reader,
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (mr *MultiReadable) Read(p []byte) (int, error) {
 | 
			
		||||
	n, err := mr.reader.Read(p)
 | 
			
		||||
	if _, ok := mr.reader.(io.Seeker); !ok && n > 0 {
 | 
			
		||||
		if mr.cache == nil {
 | 
			
		||||
			mr.cache = &bytes.Buffer{}
 | 
			
		||||
		}
 | 
			
		||||
		mr.cache.Write(p[:n])
 | 
			
		||||
	}
 | 
			
		||||
	return n, err
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (mr *MultiReadable) Reset() error {
 | 
			
		||||
	if seeker, ok := mr.reader.(io.Seeker); ok {
 | 
			
		||||
		_, err := seeker.Seek(0, io.SeekStart)
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	if mr.cache != nil && mr.cache.Len() > 0 {
 | 
			
		||||
		mr.reader = io.MultiReader(mr.cache, mr.reader)
 | 
			
		||||
		mr.cache = nil
 | 
			
		||||
	}
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (mr *MultiReadable) Close() error {
 | 
			
		||||
	if closer, ok := mr.originReader.(io.Closer); ok {
 | 
			
		||||
		return closer.Close()
 | 
			
		||||
	}
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue