diff --git a/drivers/189pc/driver.go b/drivers/189pc/driver.go index cce1e861..c64c0463 100644 --- a/drivers/189pc/driver.go +++ b/drivers/189pc/driver.go @@ -3,6 +3,7 @@ package _189pc import ( "context" "net/http" + "strconv" "strings" "time" @@ -24,6 +25,8 @@ type Cloud189PC struct { loginParam *LoginParam tokenInfo *AppSessionResp + + uploadThread int } func (y *Cloud189PC) Config() driver.Config { @@ -44,6 +47,12 @@ func (y *Cloud189PC) Init(ctx context.Context) (err error) { y.FamilyID = "" } + // 限制上传线程数 + y.uploadThread, _ = strconv.Atoi(y.UploadThread) + if y.uploadThread < 1 || y.uploadThread > 32 { + y.uploadThread, y.UploadThread = 3, "3" + } + // 初始化请求客户端 if y.client == nil { y.client = base.NewRestyClient().SetHeaders(map[string]string{ diff --git a/drivers/189pc/meta.go b/drivers/189pc/meta.go index dde17842..e4280186 100644 --- a/drivers/189pc/meta.go +++ b/drivers/189pc/meta.go @@ -15,6 +15,7 @@ type Addition struct { Type string `json:"type" type:"select" options:"personal,family" default:"personal"` FamilyID string `json:"family_id"` UploadMethod string `json:"upload_method" type:"select" options:"stream,rapid,old" default:"stream"` + UploadThread string `json:"upload_thread" default:"3" help:"1<=thread<=32"` NoUseOcr bool `json:"no_use_ocr"` } diff --git a/drivers/189pc/utils.go b/drivers/189pc/utils.go index 96bf3ace..d9049c36 100644 --- a/drivers/189pc/utils.go +++ b/drivers/189pc/utils.go @@ -15,6 +15,7 @@ import ( "net/url" "os" "regexp" + "strconv" "strings" "time" @@ -24,6 +25,7 @@ import ( "github.com/alist-org/alist/v3/internal/model" "github.com/alist-org/alist/v3/internal/op" "github.com/alist-org/alist/v3/internal/setting" + "github.com/alist-org/alist/v3/pkg/errgroup" "github.com/alist-org/alist/v3/pkg/utils" "github.com/avast/retry-go" @@ -436,14 +438,18 @@ func (y *Cloud189PC) refreshSession() (err error) { // 普通上传 // 无法上传大小为0的文件 func (y *Cloud189PC) StreamUpload(ctx context.Context, dstDir model.Obj, file model.FileStreamer, up driver.UpdateProgress) (model.Obj, error) { - var DEFAULT = partSize(file.GetSize()) - var count = int(math.Ceil(float64(file.GetSize()) / float64(DEFAULT))) + var sliceSize = partSize(file.GetSize()) + count := int(math.Ceil(float64(file.GetSize()) / float64(sliceSize))) + lastPartSize := file.GetSize() % sliceSize + if file.GetSize() > 0 && lastPartSize == 0 { + lastPartSize = sliceSize + } params := Params{ "parentFolderId": dstDir.GetID(), "fileName": url.QueryEscape(file.GetName()), "fileSize": fmt.Sprint(file.GetSize()), - "sliceSize": fmt.Sprint(DEFAULT), + "sliceSize": fmt.Sprint(sliceSize), "lazyCheck": "1", } @@ -468,17 +474,19 @@ func (y *Cloud189PC) StreamUpload(ctx context.Context, dstDir model.Obj, file mo fileMd5 := md5.New() silceMd5 := md5.New() silceMd5Hexs := make([]string, 0, count) - byteData := bytes.NewBuffer(make([]byte, DEFAULT)) + byteData := make([]byte, sliceSize) for i := 1; i <= count; i++ { if utils.IsCanceled(ctx) { return nil, ctx.Err() } + if i == count { + byteData = byteData[:lastPartSize] + } + // 读取块 - byteData.Reset() silceMd5.Reset() - _, err := io.CopyN(io.MultiWriter(fileMd5, silceMd5, byteData), file, DEFAULT) - if err != io.EOF && err != io.ErrUnexpectedEOF && err != nil { + if _, err := io.ReadFull(io.TeeReader(file, io.MultiWriter(fileMd5, silceMd5)), byteData); err != io.EOF && err != nil { return nil, err } @@ -504,13 +512,13 @@ func (y *Cloud189PC) StreamUpload(ctx context.Context, dstDir model.Obj, file mo uploadData := uploadUrl.UploadUrls[fmt.Sprint("partNumber_", i)] err = retry.Do(func() error { - _, err := y.put(ctx, uploadData.RequestURL, ParseHttpHeader(uploadData.RequestHeader), false, bytes.NewReader(byteData.Bytes())) + _, err := y.put(ctx, uploadData.RequestURL, ParseHttpHeader(uploadData.RequestHeader), false, bytes.NewReader(byteData)) return err }, retry.Context(ctx), retry.Attempts(3), retry.Delay(time.Second), - retry.MaxDelay(5*time.Second)) + retry.DelayType(retry.BackOffDelay)) if err != nil { return nil, err } @@ -519,7 +527,7 @@ func (y *Cloud189PC) StreamUpload(ctx context.Context, dstDir model.Obj, file mo fileMd5Hex := strings.ToUpper(hex.EncodeToString(fileMd5.Sum(nil))) sliceMd5Hex := fileMd5Hex - if file.GetSize() > DEFAULT { + if file.GetSize() > sliceSize { sliceMd5Hex = strings.ToUpper(utils.GetMD5EncodeStr(strings.Join(silceMd5Hexs, "\n"))) } @@ -554,10 +562,15 @@ func (y *Cloud189PC) FastUpload(ctx context.Context, dstDir model.Obj, file mode _ = os.Remove(tempFile.Name()) }() - var DEFAULT = partSize(file.GetSize()) - count := int(math.Ceil(float64(file.GetSize()) / float64(DEFAULT))) + var sliceSize = partSize(file.GetSize()) + count := int(math.Ceil(float64(file.GetSize()) / float64(sliceSize))) + lastPartSize := file.GetSize() % sliceSize + if file.GetSize() > 0 && lastPartSize == 0 { + lastPartSize = sliceSize + } - // 优先计算所需信息 + //step.1 优先计算所需信息 + byteSize := sliceSize fileMd5 := md5.New() silceMd5 := md5.New() silceMd5Hexs := make([]string, 0, count) @@ -567,31 +580,32 @@ func (y *Cloud189PC) FastUpload(ctx context.Context, dstDir model.Obj, file mode return nil, ctx.Err() } + if i == count { + byteSize = lastPartSize + } + silceMd5.Reset() - if _, err := io.CopyN(io.MultiWriter(fileMd5, silceMd5), tempFile, DEFAULT); err != nil && err != io.EOF && err != io.ErrUnexpectedEOF { + if _, err := io.CopyN(io.MultiWriter(fileMd5, silceMd5), tempFile, byteSize); err != nil && err != io.EOF { return nil, err } md5Byte := silceMd5.Sum(nil) silceMd5Hexs = append(silceMd5Hexs, strings.ToUpper(hex.EncodeToString(md5Byte))) silceMd5Base64s = append(silceMd5Base64s, fmt.Sprint(i, "-", base64.StdEncoding.EncodeToString(md5Byte))) } - if _, err = tempFile.Seek(0, io.SeekStart); err != nil { - return nil, err - } fileMd5Hex := strings.ToUpper(hex.EncodeToString(fileMd5.Sum(nil))) sliceMd5Hex := fileMd5Hex - if file.GetSize() > DEFAULT { + if file.GetSize() > sliceSize { sliceMd5Hex = strings.ToUpper(utils.GetMD5EncodeStr(strings.Join(silceMd5Hexs, "\n"))) } - // 检测是否支持快传 + //step.2 预上传 params := Params{ "parentFolderId": dstDir.GetID(), "fileName": url.QueryEscape(file.GetName()), "fileSize": fmt.Sprint(file.GetSize()), "fileMd5": fileMd5Hex, - "sliceSize": fmt.Sprint(DEFAULT), + "sliceSize": fmt.Sprint(sliceSize), "sliceMd5": sliceMd5Hex, } @@ -614,6 +628,7 @@ func (y *Cloud189PC) FastUpload(ctx context.Context, dstDir model.Obj, file mode // 网盘中不存在该文件,开始上传 if uploadInfo.Data.FileDataExists != 1 { + // step.3 获取上传切片信息 var uploadUrls UploadUrlsResp _, err = y.request(fullUrl+"/getMultiUploadUrls", http.MethodGet, func(req *resty.Request) { @@ -626,30 +641,36 @@ func (y *Cloud189PC) FastUpload(ctx context.Context, dstDir model.Obj, file mode return nil, err } - buf := make([]byte, DEFAULT) - for i := 1; i <= count; i++ { - if utils.IsCanceled(ctx) { - return nil, ctx.Err() + // step.4 上传切片 + threadG, upCtx := errgroup.NewGroupWithContext(ctx, y.uploadThread, + retry.Attempts(3), + retry.Delay(time.Second), + retry.DelayType(retry.BackOffDelay)) + for k, part := range uploadUrls.UploadUrls { + if utils.IsCanceled(upCtx) { + break } - - n, err := io.ReadFull(tempFile, buf) - if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF { - return nil, err - } - uploadData := uploadUrls.UploadUrls[fmt.Sprint("partNumber_", i)] - err = retry.Do(func() error { - _, err := y.put(ctx, uploadData.RequestURL, ParseHttpHeader(uploadData.RequestHeader), false, bytes.NewReader(buf[:n])) - return err - }, - retry.Context(ctx), - retry.Attempts(3), - retry.Delay(time.Second), - retry.MaxDelay(5*time.Second)) + partNumber, err := strconv.Atoi(strings.TrimPrefix(k, "partNumber_")) if err != nil { return nil, err } - up(int(i * 100 / count)) + part, byteSize, offset := part, sliceSize, int64(partNumber-1)*sliceSize + if partNumber == count { + byteSize = lastPartSize + } + + threadG.Go(func(ctx context.Context) error { + _, err := y.put(ctx, part.RequestURL, ParseHttpHeader(part.RequestHeader), false, io.NewSectionReader(tempFile, offset, byteSize)) + if err != nil { + return err + } + up(int(threadG.Success()) * 100 / len(uploadUrls.UploadUrls)) + return nil + }) + } + if err = threadG.Wait(); err != nil { + return nil, err } } diff --git a/drivers/aliyundrive_open/driver.go b/drivers/aliyundrive_open/driver.go index c941acef..bc41e56b 100644 --- a/drivers/aliyundrive_open/driver.go +++ b/drivers/aliyundrive_open/driver.go @@ -107,7 +107,9 @@ func (d *AliyundriveOpen) Link(ctx context.Context, file model.Obj, args model.L return d.limitLink(ctx, file) } -func (d *AliyundriveOpen) MakeDir(ctx context.Context, parentDir model.Obj, dirName string) error { +func (d *AliyundriveOpen) MakeDir(ctx context.Context, parentDir model.Obj, dirName string) (model.Obj, error) { + nowTime, _ := getNowTime() + newDir := File{CreatedAt: nowTime, UpdatedAt: nowTime} _, err := d.request("/adrive/v1.0/openFile/create", http.MethodPost, func(req *resty.Request) { req.SetBody(base.Json{ "drive_id": d.DriveId, @@ -115,12 +117,16 @@ func (d *AliyundriveOpen) MakeDir(ctx context.Context, parentDir model.Obj, dirN "name": dirName, "type": "folder", "check_name_mode": "refuse", - }) + }).SetResult(&newDir) }) - return err + if err != nil { + return nil, err + } + return fileToObj(newDir), nil } -func (d *AliyundriveOpen) Move(ctx context.Context, srcObj, dstDir model.Obj) error { +func (d *AliyundriveOpen) Move(ctx context.Context, srcObj, dstDir model.Obj) (model.Obj, error) { + var resp MoveOrCopyResp _, err := d.request("/adrive/v1.0/openFile/move", http.MethodPost, func(req *resty.Request) { req.SetBody(base.Json{ "drive_id": d.DriveId, @@ -128,20 +134,36 @@ func (d *AliyundriveOpen) Move(ctx context.Context, srcObj, dstDir model.Obj) er "to_parent_file_id": dstDir.GetID(), "check_name_mode": "refuse", // optional:ignore,auto_rename,refuse //"new_name": "newName", // The new name to use when a file of the same name exists - }) + }).SetResult(&resp) }) - return err + if err != nil { + return nil, err + } + if resp.Exist { + return nil, errors.New("existence of files with the same name") + } + + if srcObj, ok := srcObj.(*model.ObjThumb); ok { + srcObj.ID = resp.FileID + srcObj.Modified = time.Now() + return srcObj, nil + } + return nil, nil } -func (d *AliyundriveOpen) Rename(ctx context.Context, srcObj model.Obj, newName string) error { +func (d *AliyundriveOpen) Rename(ctx context.Context, srcObj model.Obj, newName string) (model.Obj, error) { + var newFile File _, err := d.request("/adrive/v1.0/openFile/update", http.MethodPost, func(req *resty.Request) { req.SetBody(base.Json{ "drive_id": d.DriveId, "file_id": srcObj.GetID(), "name": newName, - }) + }).SetResult(&newFile) }) - return err + if err != nil { + return nil, err + } + return fileToObj(newFile), nil } func (d *AliyundriveOpen) Copy(ctx context.Context, srcObj, dstDir model.Obj) error { @@ -170,7 +192,7 @@ func (d *AliyundriveOpen) Remove(ctx context.Context, obj model.Obj) error { return err } -func (d *AliyundriveOpen) Put(ctx context.Context, dstDir model.Obj, stream model.FileStreamer, up driver.UpdateProgress) error { +func (d *AliyundriveOpen) Put(ctx context.Context, dstDir model.Obj, stream model.FileStreamer, up driver.UpdateProgress) (model.Obj, error) { return d.upload(ctx, dstDir, stream, up) } @@ -199,3 +221,7 @@ func (d *AliyundriveOpen) Other(ctx context.Context, args model.OtherArgs) (inte } var _ driver.Driver = (*AliyundriveOpen)(nil) +var _ driver.MkdirResult = (*AliyundriveOpen)(nil) +var _ driver.MoveResult = (*AliyundriveOpen)(nil) +var _ driver.RenameResult = (*AliyundriveOpen)(nil) +var _ driver.PutResult = (*AliyundriveOpen)(nil) diff --git a/drivers/aliyundrive_open/types.go b/drivers/aliyundrive_open/types.go index 6980effd..3ae5961c 100644 --- a/drivers/aliyundrive_open/types.go +++ b/drivers/aliyundrive_open/types.go @@ -17,22 +17,28 @@ type Files struct { } type File struct { - DriveId string `json:"drive_id"` - FileId string `json:"file_id"` - ParentFileId string `json:"parent_file_id"` - Name string `json:"name"` - Size int64 `json:"size"` - FileExtension string `json:"file_extension"` - ContentHash string `json:"content_hash"` - Category string `json:"category"` - Type string `json:"type"` - Thumbnail string `json:"thumbnail"` - Url string `json:"url"` - CreatedAt *time.Time `json:"created_at"` - UpdatedAt time.Time `json:"updated_at"` + DriveId string `json:"drive_id"` + FileId string `json:"file_id"` + ParentFileId string `json:"parent_file_id"` + Name string `json:"name"` + Size int64 `json:"size"` + FileExtension string `json:"file_extension"` + ContentHash string `json:"content_hash"` + Category string `json:"category"` + Type string `json:"type"` + Thumbnail string `json:"thumbnail"` + Url string `json:"url"` + CreatedAt time.Time `json:"created_at"` + UpdatedAt time.Time `json:"updated_at"` + + // create only + FileName string `json:"file_name"` } func fileToObj(f File) *model.ObjThumb { + if f.Name == "" { + f.Name = f.FileName + } return &model.ObjThumb{ Object: model.Object{ ID: f.FileId, @@ -67,3 +73,9 @@ type CreateResp struct { RapidUpload bool `json:"rapid_upload"` PartInfoList []PartInfo `json:"part_info_list"` } + +type MoveOrCopyResp struct { + Exist bool `json:"exist"` + DriveID string `json:"drive_id"` + FileID string `json:"file_id"` +} diff --git a/drivers/aliyundrive_open/upload.go b/drivers/aliyundrive_open/upload.go index f7bb7f28..9a69c7ca 100644 --- a/drivers/aliyundrive_open/upload.go +++ b/drivers/aliyundrive_open/upload.go @@ -19,6 +19,7 @@ import ( "github.com/alist-org/alist/v3/internal/driver" "github.com/alist-org/alist/v3/internal/model" "github.com/alist-org/alist/v3/pkg/utils" + "github.com/avast/retry-go" "github.com/go-resty/resty/v2" log "github.com/sirupsen/logrus" ) @@ -65,73 +66,40 @@ func (d *AliyundriveOpen) getUploadUrl(count int, fileId, uploadId string) ([]Pa return resp.PartInfoList, err } -func (d *AliyundriveOpen) uploadPart(ctx context.Context, i, count int, reader *utils.MultiReadable, resp *CreateResp, retry bool) error { - partInfo := resp.PartInfoList[i-1] +func (d *AliyundriveOpen) uploadPart(ctx context.Context, r io.Reader, partInfo PartInfo) error { uploadUrl := partInfo.UploadUrl if d.InternalUpload { uploadUrl = strings.ReplaceAll(uploadUrl, "https://cn-beijing-data.aliyundrive.net/", "http://ccp-bj29-bj-1592982087.oss-cn-beijing-internal.aliyuncs.com/") } - req, err := http.NewRequest("PUT", uploadUrl, reader) + req, err := http.NewRequestWithContext(ctx, "PUT", uploadUrl, r) if err != nil { return err } - req = req.WithContext(ctx) res, err := base.HttpClient.Do(req) if err != nil { - if retry { - reader.Reset() - return d.uploadPart(ctx, i, count, reader, resp, false) - } return err } res.Body.Close() - if retry && res.StatusCode == http.StatusForbidden { - resp.PartInfoList, err = d.getUploadUrl(count, resp.FileId, resp.UploadId) - if err != nil { - return err - } - reader.Reset() - return d.uploadPart(ctx, i, count, reader, resp, false) - } if res.StatusCode != http.StatusOK && res.StatusCode != http.StatusConflict { return fmt.Errorf("upload status: %d", res.StatusCode) } return nil } -func (d *AliyundriveOpen) normalUpload(ctx context.Context, stream model.FileStreamer, up driver.UpdateProgress, createResp CreateResp, count int, partSize int64) error { - log.Debugf("[aliyundive_open] normal upload") - // 2. upload - preTime := time.Now() - for i := 1; i <= len(createResp.PartInfoList); i++ { - if utils.IsCanceled(ctx) { - return ctx.Err() - } - err := d.uploadPart(ctx, i, count, utils.NewMultiReadable(io.LimitReader(stream, partSize)), &createResp, true) - if err != nil { - return err - } - if count > 0 { - up(i * 100 / count) - } - // refresh upload url if 50 minutes passed - if time.Since(preTime) > 50*time.Minute { - createResp.PartInfoList, err = d.getUploadUrl(count, createResp.FileId, createResp.UploadId) - if err != nil { - return err - } - preTime = time.Now() - } - } +func (d *AliyundriveOpen) completeUpload(fileId, uploadId string) (model.Obj, error) { // 3. complete + var newFile File _, err := d.request("/adrive/v1.0/openFile/complete", http.MethodPost, func(req *resty.Request) { req.SetBody(base.Json{ "drive_id": d.DriveId, - "file_id": createResp.FileId, - "upload_id": createResp.UploadId, - }) + "file_id": fileId, + "upload_id": uploadId, + }).SetResult(&newFile) }) - return err + if err != nil { + return nil, err + } + return fileToObj(newFile), nil } type ProofRange struct { @@ -172,7 +140,7 @@ func (d *AliyundriveOpen) calProofCode(file *os.File, fileSize int64) (string, e return base64.StdEncoding.EncodeToString(buf), nil } -func (d *AliyundriveOpen) upload(ctx context.Context, dstDir model.Obj, stream model.FileStreamer, up driver.UpdateProgress) error { +func (d *AliyundriveOpen) upload(ctx context.Context, dstDir model.Obj, stream model.FileStreamer, up driver.UpdateProgress) (model.Obj, error) { // 1. create // Part Size Unit: Bytes, Default: 20MB, // Maximum number of slices 10,000, ≈195.3125GB @@ -194,14 +162,14 @@ func (d *AliyundriveOpen) upload(ctx context.Context, dstDir model.Obj, stream m buf := bytes.NewBuffer(make([]byte, 0, 1024)) _, err := io.CopyN(buf, stream, 1024) if err != nil { - return err + return nil, err } createData["size"] = stream.GetSize() createData["pre_hash"] = utils.GetSHA1Encode(buf.Bytes()) // if support seek, seek to start if localFile, ok := stream.(io.Seeker); ok { if _, err := localFile.Seek(0, io.SeekStart); err != nil { - return err + return nil, err } } else { // Put spliced head back to stream @@ -220,13 +188,13 @@ func (d *AliyundriveOpen) upload(ctx context.Context, dstDir model.Obj, stream m }) if err != nil { if e.Code != "PreHashMatched" || !rapidUpload { - return err + return nil, err } log.Debugf("[aliyundrive_open] pre_hash matched, start rapid upload") // convert to local file file, err := utils.CreateTempFile(stream, stream.GetSize()) if err != nil { - return err + return nil, err } _ = stream.GetReadCloser().Close() stream.SetReadCloser(file) @@ -234,35 +202,62 @@ func (d *AliyundriveOpen) upload(ctx context.Context, dstDir model.Obj, stream m h := sha1.New() _, err = io.Copy(h, file) if err != nil { - return err + return nil, err } delete(createData, "pre_hash") createData["proof_version"] = "v1" createData["content_hash_name"] = "sha1" createData["content_hash"] = hex.EncodeToString(h.Sum(nil)) - // seek to start - if _, err = file.Seek(0, io.SeekStart); err != nil { - return err - } createData["proof_code"], err = d.calProofCode(file, stream.GetSize()) if err != nil { - return fmt.Errorf("cal proof code error: %s", err.Error()) + return nil, fmt.Errorf("cal proof code error: %s", err.Error()) } _, err = d.request("/adrive/v1.0/openFile/create", http.MethodPost, func(req *resty.Request) { req.SetBody(createData).SetResult(&createResp) }) if err != nil { - return err + return nil, err } - if createResp.RapidUpload { - log.Debugf("[aliyundrive_open] rapid upload success, file id: %s", createResp.FileId) - return nil - } - // failed to rapid upload, try normal upload + // seek to start if _, err = file.Seek(0, io.SeekStart); err != nil { - return err + return nil, err } } + + if !createResp.RapidUpload { + // 2. upload + log.Debugf("[aliyundive_open] normal upload") + + preTime := time.Now() + for i := 0; i < len(createResp.PartInfoList); i++ { + if utils.IsCanceled(ctx) { + return nil, ctx.Err() + } + // refresh upload url if 50 minutes passed + if time.Since(preTime) > 50*time.Minute { + createResp.PartInfoList, err = d.getUploadUrl(count, createResp.FileId, createResp.UploadId) + if err != nil { + return nil, err + } + preTime = time.Now() + } + rd := utils.NewMultiReadable(io.LimitReader(stream, partSize)) + err = retry.Do(func() error { + rd.Reset() + return d.uploadPart(ctx, rd, createResp.PartInfoList[i]) + }, + retry.Attempts(3), + retry.DelayType(retry.BackOffDelay), + retry.Delay(time.Second)) + if err != nil { + return nil, err + } + } + } else { + log.Debugf("[aliyundrive_open] rapid upload success, file id: %s", createResp.FileId) + } + log.Debugf("[aliyundrive_open] create file success, resp: %+v", createResp) - return d.normalUpload(ctx, stream, up, createResp, count, partSize) + // 3. complete + return d.completeUpload(createResp.FileId, createResp.UploadId) } diff --git a/drivers/aliyundrive_open/util.go b/drivers/aliyundrive_open/util.go index f23b6a1e..29db07df 100644 --- a/drivers/aliyundrive_open/util.go +++ b/drivers/aliyundrive_open/util.go @@ -7,6 +7,7 @@ import ( "fmt" "net/http" "strings" + "time" "github.com/alist-org/alist/v3/drivers/base" "github.com/alist-org/alist/v3/internal/op" @@ -169,3 +170,9 @@ func (d *AliyundriveOpen) getFiles(ctx context.Context, fileId string) ([]File, } return res, nil } + +func getNowTime() (time.Time, string) { + nowTime := time.Now() + nowTimeStr := nowTime.Format("2006-01-02T15:04:05.000Z") + return nowTime, nowTimeStr +} diff --git a/drivers/baidu_netdisk/driver.go b/drivers/baidu_netdisk/driver.go index 470d3b06..2b837887 100644 --- a/drivers/baidu_netdisk/driver.go +++ b/drivers/baidu_netdisk/driver.go @@ -5,24 +5,28 @@ import ( "crypto/md5" "encoding/hex" "fmt" - "github.com/alist-org/alist/v3/drivers/base" - "github.com/alist-org/alist/v3/internal/driver" - "github.com/alist-org/alist/v3/internal/errs" - "github.com/alist-org/alist/v3/internal/model" - "github.com/alist-org/alist/v3/pkg/utils" - "github.com/avast/retry-go" - log "github.com/sirupsen/logrus" "io" "math" "os" stdpath "path" "strconv" - "strings" + "time" + + "github.com/alist-org/alist/v3/drivers/base" + "github.com/alist-org/alist/v3/internal/driver" + "github.com/alist-org/alist/v3/internal/errs" + "github.com/alist-org/alist/v3/internal/model" + "github.com/alist-org/alist/v3/pkg/errgroup" + "github.com/alist-org/alist/v3/pkg/utils" + "github.com/avast/retry-go" + log "github.com/sirupsen/logrus" ) type BaiduNetdisk struct { model.Storage Addition + + uploadThread int } const BaiduFileAPI = "https://d.pcs.baidu.com/rest/2.0/pcs/superfile2" @@ -37,6 +41,10 @@ func (d *BaiduNetdisk) GetAddition() driver.Additional { } func (d *BaiduNetdisk) Init(ctx context.Context) error { + d.uploadThread, _ = strconv.Atoi(d.UploadThread) + if d.uploadThread < 1 || d.uploadThread > 32 { + d.uploadThread, d.UploadThread = 3, "3" + } res, err := d.get("/xpan/nas", map[string]string{ "method": "uinfo", }, nil) @@ -65,12 +73,16 @@ func (d *BaiduNetdisk) Link(ctx context.Context, file model.Obj, args model.Link return d.linkOfficial(file, args) } -func (d *BaiduNetdisk) MakeDir(ctx context.Context, parentDir model.Obj, dirName string) error { - _, err := d.create(stdpath.Join(parentDir.GetPath(), dirName), 0, 1, "", "") - return err +func (d *BaiduNetdisk) MakeDir(ctx context.Context, parentDir model.Obj, dirName string) (model.Obj, error) { + var newDir File + _, err := d.create(stdpath.Join(parentDir.GetPath(), dirName), 0, 1, "", "", &newDir) + if err != nil { + return nil, err + } + return fileToObj(newDir), nil } -func (d *BaiduNetdisk) Move(ctx context.Context, srcObj, dstDir model.Obj) error { +func (d *BaiduNetdisk) Move(ctx context.Context, srcObj, dstDir model.Obj) (model.Obj, error) { data := []base.Json{ { "path": srcObj.GetPath(), @@ -79,10 +91,18 @@ func (d *BaiduNetdisk) Move(ctx context.Context, srcObj, dstDir model.Obj) error }, } _, err := d.manage("move", data) - return err + if err != nil { + return nil, err + } + if srcObj, ok := srcObj.(*model.ObjThumb); ok { + srcObj.SetPath(stdpath.Join(dstDir.GetPath(), srcObj.GetName())) + srcObj.Modified = time.Now() + return srcObj, nil + } + return nil, nil } -func (d *BaiduNetdisk) Rename(ctx context.Context, srcObj model.Obj, newName string) error { +func (d *BaiduNetdisk) Rename(ctx context.Context, srcObj model.Obj, newName string) (model.Obj, error) { data := []base.Json{ { "path": srcObj.GetPath(), @@ -90,7 +110,17 @@ func (d *BaiduNetdisk) Rename(ctx context.Context, srcObj model.Obj, newName str }, } _, err := d.manage("rename", data) - return err + if err != nil { + return nil, err + } + + if srcObj, ok := srcObj.(*model.ObjThumb); ok { + srcObj.SetPath(stdpath.Join(stdpath.Dir(srcObj.GetPath()), newName)) + srcObj.Name = newName + srcObj.Modified = time.Now() + return srcObj, nil + } + return nil, nil } func (d *BaiduNetdisk) Copy(ctx context.Context, srcObj, dstDir model.Obj) error { @@ -111,63 +141,58 @@ func (d *BaiduNetdisk) Remove(ctx context.Context, obj model.Obj) error { return err } -func (d *BaiduNetdisk) Put(ctx context.Context, dstDir model.Obj, stream model.FileStreamer, up driver.UpdateProgress) error { - streamSize := stream.GetSize() - +func (d *BaiduNetdisk) Put(ctx context.Context, dstDir model.Obj, stream model.FileStreamer, up driver.UpdateProgress) (model.Obj, error) { tempFile, err := utils.CreateTempFile(stream.GetReadCloser(), stream.GetSize()) if err != nil { - return err + return nil, err } defer func() { _ = tempFile.Close() _ = os.Remove(tempFile.Name()) }() - count := int(math.Ceil(float64(streamSize) / float64(DefaultSliceSize))) + streamSize := stream.GetSize() + count := int(math.Max(math.Ceil(float64(streamSize)/float64(DefaultSliceSize)), 1)) + lastBlockSize := streamSize % DefaultSliceSize + if streamSize > 0 && lastBlockSize == 0 { + lastBlockSize = DefaultSliceSize + } + //cal md5 for first 256k data const SliceSize int64 = 256 * 1024 // cal md5 - h1 := md5.New() - h2 := md5.New() - blockList := make([]string, 0) - contentMd5 := "" - sliceMd5 := "" - left := streamSize - for i := 0; i < count; i++ { - byteSize := DefaultSliceSize - if left < DefaultSliceSize { - byteSize = left + blockList := make([]string, 0, count) + byteSize := DefaultSliceSize + fileMd5H := md5.New() + sliceMd5H := md5.New() + sliceMd5H2 := md5.New() + slicemd5H2Write := utils.LimitWriter(sliceMd5H2, SliceSize) + + for i := 1; i <= count; i++ { + if utils.IsCanceled(ctx) { + return nil, ctx.Err() } - left -= byteSize - _, err = io.Copy(io.MultiWriter(h1, h2), io.LimitReader(tempFile, byteSize)) - if err != nil { - return err + if i == count { + byteSize = lastBlockSize } - blockList = append(blockList, fmt.Sprintf("\"%s\"", hex.EncodeToString(h2.Sum(nil)))) - h2.Reset() - } - contentMd5 = hex.EncodeToString(h1.Sum(nil)) - _, err = tempFile.Seek(0, io.SeekStart) - if err != nil { - return err - } - if streamSize <= SliceSize { - sliceMd5 = contentMd5 - } else { - sliceData := make([]byte, SliceSize) - _, err = io.ReadFull(tempFile, sliceData) - if err != nil { - return err + _, err := io.CopyN(io.MultiWriter(fileMd5H, sliceMd5H, slicemd5H2Write), tempFile, byteSize) + if err != nil && err != io.EOF { + return nil, err } - h2.Write(sliceData) - sliceMd5 = hex.EncodeToString(h2.Sum(nil)) + blockList = append(blockList, hex.EncodeToString(sliceMd5H.Sum(nil))) + sliceMd5H.Reset() } + contentMd5 := hex.EncodeToString(fileMd5H.Sum(nil)) + sliceMd5 := hex.EncodeToString(sliceMd5H2.Sum(nil)) + blockListStr, _ := utils.Json.MarshalToString(blockList) + + // step.1 预上传 rawPath := stdpath.Join(dstDir.GetPath(), stream.GetName()) path := encodeURIComponent(rawPath) - block_list_str := fmt.Sprintf("[%s]", strings.Join(blockList, ",")) - data := fmt.Sprintf("path=%s&size=%d&isdir=0&autoinit=1&block_list=%s&content-md5=%s&slice-md5=%s", + + data := fmt.Sprintf("path=%s&size=%d&isdir=0&autoinit=1&rtype=3&block_list=%s&content-md5=%s&slice-md5=%s", path, streamSize, - block_list_str, + blockListStr, contentMd5, sliceMd5) params := map[string]string{ "method": "precreate", @@ -176,52 +201,65 @@ func (d *BaiduNetdisk) Put(ctx context.Context, dstDir model.Obj, stream model.F var precreateResp PrecreateResp _, err = d.post("/xpan/file", params, data, &precreateResp) if err != nil { - return err + return nil, err } log.Debugf("%+v", precreateResp) if precreateResp.ReturnType == 2 { //rapid upload, since got md5 match from baidu server - return nil - } - params = map[string]string{ - "method": "upload", - "access_token": d.AccessToken, - "type": "tmpfile", - "path": path, - "uploadid": precreateResp.Uploadid, - } - - var offset int64 = 0 - for i, partseq := range precreateResp.BlockList { - params["partseq"] = strconv.Itoa(partseq) - byteSize := int64(math.Min(float64(streamSize-offset), float64(DefaultSliceSize))) - err := retry.Do(func() error { - return d.uploadSlice(ctx, ¶ms, stream.GetName(), tempFile, offset, byteSize) - }, - retry.Context(ctx), - retry.Attempts(3)) if err != nil { - return err - } - offset += byteSize - - if len(precreateResp.BlockList) > 0 { - up(i * 100 / len(precreateResp.BlockList)) + return nil, err } + return fileToObj(precreateResp.File), nil } - _, err = d.create(rawPath, streamSize, 0, precreateResp.Uploadid, block_list_str) - return err -} -func (d *BaiduNetdisk) uploadSlice(ctx context.Context, params *map[string]string, fileName string, file *os.File, offset int64, byteSize int64) error { - _, err := file.Seek(offset, io.SeekStart) + + // step.2 上传分片 + threadG, upCtx := errgroup.NewGroupWithContext(ctx, d.uploadThread, + retry.Attempts(3), + retry.Delay(time.Second), + retry.DelayType(retry.BackOffDelay)) + for _, partseq := range precreateResp.BlockList { + if utils.IsCanceled(upCtx) { + break + } + + partseq, offset, byteSize := partseq, int64(partseq)*DefaultSliceSize, DefaultSliceSize + if partseq+1 == count { + byteSize = lastBlockSize + } + threadG.Go(func(ctx context.Context) error { + params := map[string]string{ + "method": "upload", + "access_token": d.AccessToken, + "type": "tmpfile", + "path": path, + "uploadid": precreateResp.Uploadid, + "partseq": strconv.Itoa(partseq), + } + err := d.uploadSlice(ctx, params, stream.GetName(), io.NewSectionReader(tempFile, offset, byteSize)) + if err != nil { + return err + } + up(int(threadG.Success()) * 100 / len(precreateResp.BlockList)) + return nil + }) + } + if err = threadG.Wait(); err != nil { + return nil, err + } + + // step.3 创建文件 + var newFile File + _, err = d.create(rawPath, streamSize, 0, precreateResp.Uploadid, blockListStr, &newFile) if err != nil { - return err + return nil, err } - + return fileToObj(newFile), nil +} +func (d *BaiduNetdisk) uploadSlice(ctx context.Context, params map[string]string, fileName string, file io.Reader) error { res, err := base.RestyClient.R(). SetContext(ctx). - SetQueryParams(*params). - SetFileReader("file", fileName, io.LimitReader(file, byteSize)). + SetQueryParams(params). + SetFileReader("file", fileName, file). Post(BaiduFileAPI) if err != nil { return err diff --git a/drivers/baidu_netdisk/meta.go b/drivers/baidu_netdisk/meta.go index 4a7bfc8b..d8c0d19f 100644 --- a/drivers/baidu_netdisk/meta.go +++ b/drivers/baidu_netdisk/meta.go @@ -15,6 +15,7 @@ type Addition struct { ClientSecret string `json:"client_secret" required:"true" default:"jXiFMOPVPCWlO2M5CwWQzffpNPaGTRBG"` CustomCrackUA string `json:"custom_crack_ua" required:"true" default:"netdisk"` AccessToken string + UploadThread string `json:"upload_thread" default:"3" help:"1<=thread<=32"` } var config = driver.Config{ diff --git a/drivers/baidu_netdisk/types.go b/drivers/baidu_netdisk/types.go index 6833c780..4effe70f 100644 --- a/drivers/baidu_netdisk/types.go +++ b/drivers/baidu_netdisk/types.go @@ -1,6 +1,7 @@ package baidu_netdisk import ( + "path" "strconv" "time" @@ -17,10 +18,8 @@ type File struct { //OwnerType int `json:"owner_type"` //Category int `json:"category"` //RealCategory string `json:"real_category"` - FsId int64 `json:"fs_id"` - ServerMtime int64 `json:"server_mtime"` + FsId int64 `json:"fs_id"` //OperId int `json:"oper_id"` - //ServerCtime int `json:"server_ctime"` Thumbs struct { //Icon string `json:"icon"` Url3 string `json:"url3"` @@ -28,25 +27,41 @@ type File struct { //Url1 string `json:"url1"` } `json:"thumbs"` //Wpfile int `json:"wpfile"` - //LocalMtime int `json:"local_mtime"` + Size int64 `json:"size"` //ExtentTinyint7 int `json:"extent_tinyint7"` Path string `json:"path"` //Share int `json:"share"` - //ServerAtime int `json:"server_atime"` //Pl int `json:"pl"` - //LocalCtime int `json:"local_ctime"` ServerFilename string `json:"server_filename"` - //Md5 string `json:"md5"` + Md5 string `json:"md5"` //OwnerId int `json:"owner_id"` //Unlist int `json:"unlist"` Isdir int `json:"isdir"` + + // list resp + //ServerCtime int64 `json:"server_ctime"` + ServerMtime int64 `json:"server_mtime"` + //ServerAtime int64 `json:"server_atime"` + //LocalCtime int64 `json:"local_ctime"` + //LocalMtime int64 `json:"local_mtime"` + + // only create and precreate resp + Ctime int64 `json:"ctime"` + Mtime int64 `json:"mtime"` } func fileToObj(f File) *model.ObjThumb { + if f.ServerFilename == "" { + f.ServerFilename = path.Base(f.Path) + } + if f.ServerMtime == 0 { + f.ServerMtime = int64(f.Mtime) + } return &model.ObjThumb{ Object: model.Object{ ID: strconv.FormatInt(f.FsId, 10), + Path: f.Path, Name: f.ServerFilename, Size: f.Size, Modified: time.Unix(f.ServerMtime, 0), @@ -154,10 +169,15 @@ type DownloadResp2 struct { } type PrecreateResp struct { - Path string `json:"path"` - Uploadid string `json:"uploadid"` - ReturnType int `json:"return_type"` - BlockList []int `json:"block_list"` - Errno int `json:"errno"` - RequestId int64 `json:"request_id"` + Errno int `json:"errno"` + RequestId int64 `json:"request_id"` + ReturnType int `json:"return_type"` + + // return_type=1 + Path string `json:"path"` + Uploadid string `json:"uploadid"` + BlockList []int `json:"block_list"` + + // return_type=2 + File File `json:"info"` } diff --git a/drivers/baidu_netdisk/util.go b/drivers/baidu_netdisk/util.go index bb344967..81b798e5 100644 --- a/drivers/baidu_netdisk/util.go +++ b/drivers/baidu_netdisk/util.go @@ -2,17 +2,18 @@ package baidu_netdisk import ( "fmt" - "github.com/avast/retry-go" "net/http" "net/url" "strconv" "strings" + "time" "github.com/alist-org/alist/v3/drivers/base" "github.com/alist-org/alist/v3/internal/errs" "github.com/alist-org/alist/v3/internal/model" "github.com/alist-org/alist/v3/internal/op" "github.com/alist-org/alist/v3/pkg/utils" + "github.com/avast/retry-go" "github.com/go-resty/resty/v2" log "github.com/sirupsen/logrus" ) @@ -76,12 +77,20 @@ func (d *BaiduNetdisk) request(furl string, method string, callback base.ReqCall return err2 } } - return fmt.Errorf("req: [%s] ,errno: %d, refer to https://pan.baidu.com/union/doc/", furl, errno) + + err2 := fmt.Errorf("req: [%s] ,errno: %d, refer to https://pan.baidu.com/union/doc/", furl, errno) + if !utils.SliceContains([]int{2}, errno) { + err2 = retry.Unrecoverable(err2) + } + return err2 } result = res.Body() return nil }, - retry.Attempts(3)) + retry.LastErrorOnly(true), + retry.Attempts(5), + retry.Delay(time.Second), + retry.DelayType(retry.BackOffDelay)) return result, err } @@ -179,20 +188,17 @@ func (d *BaiduNetdisk) linkCrack(file model.Obj, args model.LinkArgs) (*model.Li }, nil } -func (d *BaiduNetdisk) manage(opera string, filelist interface{}) ([]byte, error) { +func (d *BaiduNetdisk) manage(opera string, filelist any) ([]byte, error) { params := map[string]string{ "method": "filemanager", "opera": opera, } - marshal, err := utils.Json.Marshal(filelist) - if err != nil { - return nil, err - } - data := fmt.Sprintf("async=0&filelist=%s&ondup=newcopy", string(marshal)) + marshal, _ := utils.Json.MarshalToString(filelist) + data := fmt.Sprintf("async=0&filelist=%s&ondup=fail", marshal) return d.post("/xpan/file", params, data, nil) } -func (d *BaiduNetdisk) create(path string, size int64, isdir int, uploadid, block_list string) ([]byte, error) { +func (d *BaiduNetdisk) create(path string, size int64, isdir int, uploadid, block_list string, resp any) ([]byte, error) { params := map[string]string{ "method": "create", } @@ -200,7 +206,7 @@ func (d *BaiduNetdisk) create(path string, size int64, isdir int, uploadid, bloc if uploadid != "" { data += fmt.Sprintf("&uploadid=%s&block_list=%s", uploadid, block_list) } - return d.post("/xpan/file", params, data, nil) + return d.post("/xpan/file", params, data, resp) } func encodeURIComponent(str string) string { diff --git a/drivers/baidu_photo/driver.go b/drivers/baidu_photo/driver.go index 760a5976..c92c48c5 100644 --- a/drivers/baidu_photo/driver.go +++ b/drivers/baidu_photo/driver.go @@ -11,11 +11,14 @@ import ( "regexp" "strconv" "strings" + "time" "github.com/alist-org/alist/v3/internal/driver" "github.com/alist-org/alist/v3/internal/errs" "github.com/alist-org/alist/v3/internal/model" + "github.com/alist-org/alist/v3/pkg/errgroup" "github.com/alist-org/alist/v3/pkg/utils" + "github.com/avast/retry-go" "github.com/go-resty/resty/v2" ) @@ -26,6 +29,8 @@ type BaiduPhoto struct { AccessToken string Uk int64 root model.Obj + + uploadThread int } func (d *BaiduPhoto) Config() driver.Config { @@ -37,6 +42,11 @@ func (d *BaiduPhoto) GetAddition() driver.Additional { } func (d *BaiduPhoto) Init(ctx context.Context) error { + d.uploadThread, _ = strconv.Atoi(d.UploadThread) + if d.uploadThread < 1 || d.uploadThread > 32 { + d.uploadThread, d.UploadThread = 3, "3" + } + if err := d.refreshToken(); err != nil { return err } @@ -211,6 +221,11 @@ func (d *BaiduPhoto) Remove(ctx context.Context, obj model.Obj) error { } func (d *BaiduPhoto) Put(ctx context.Context, dstDir model.Obj, stream model.FileStreamer, up driver.UpdateProgress) (model.Obj, error) { + // 不支持大小为0的文件 + if stream.GetSize() == 0 { + return nil, fmt.Errorf("file size cannot be zero") + } + // 需要获取完整文件md5,必须支持 io.Seek tempFile, err := utils.CreateTempFile(stream.GetReadCloser(), stream.GetSize()) if err != nil { @@ -221,35 +236,43 @@ func (d *BaiduPhoto) Put(ctx context.Context, dstDir model.Obj, stream model.Fil _ = os.Remove(tempFile.Name()) }() - // 计算需要的数据 - const DEFAULT = 1 << 22 - const SliceSize = 1 << 18 - count := int(math.Ceil(float64(stream.GetSize()) / float64(DEFAULT))) + const DEFAULT int64 = 1 << 22 + const SliceSize int64 = 1 << 18 + // 计算需要的数据 + streamSize := stream.GetSize() + count := int(math.Ceil(float64(streamSize) / float64(DEFAULT))) + lastBlockSize := streamSize % DEFAULT + if lastBlockSize == 0 { + lastBlockSize = DEFAULT + } + + // step.1 计算MD5 sliceMD5List := make([]string, 0, count) - fileMd5 := md5.New() - sliceMd5 := md5.New() - sliceMd52 := md5.New() - slicemd52Write := utils.LimitWriter(sliceMd52, SliceSize) + byteSize := int64(DEFAULT) + fileMd5H := md5.New() + sliceMd5H := md5.New() + sliceMd5H2 := md5.New() + slicemd5H2Write := utils.LimitWriter(sliceMd5H2, SliceSize) for i := 1; i <= count; i++ { if utils.IsCanceled(ctx) { return nil, ctx.Err() } - - _, err := io.CopyN(io.MultiWriter(fileMd5, sliceMd5, slicemd52Write), tempFile, DEFAULT) - if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF { + if i == count { + byteSize = lastBlockSize + } + _, err := io.CopyN(io.MultiWriter(fileMd5H, sliceMd5H, slicemd5H2Write), tempFile, byteSize) + if err != nil && err != io.EOF { return nil, err } - sliceMD5List = append(sliceMD5List, hex.EncodeToString(sliceMd5.Sum(nil))) - sliceMd5.Reset() + sliceMD5List = append(sliceMD5List, hex.EncodeToString(sliceMd5H.Sum(nil))) + sliceMd5H.Reset() } - if _, err = tempFile.Seek(0, io.SeekStart); err != nil { - return nil, err - } - content_md5 := hex.EncodeToString(fileMd5.Sum(nil)) - slice_md5 := hex.EncodeToString(sliceMd52.Sum(nil)) + contentMd5 := hex.EncodeToString(fileMd5H.Sum(nil)) + sliceMd5 := hex.EncodeToString(sliceMd5H2.Sum(nil)) + blockListStr, _ := utils.Json.MarshalToString(sliceMD5List) - // 开始执行上传 + // step.2 预上传 params := map[string]string{ "autoinit": "1", "isdir": "0", @@ -257,12 +280,11 @@ func (d *BaiduPhoto) Put(ctx context.Context, dstDir model.Obj, stream model.Fil "ctype": "11", "path": fmt.Sprintf("/%s", stream.GetName()), "size": fmt.Sprint(stream.GetSize()), - "slice-md5": slice_md5, - "content-md5": content_md5, - "block_list": MustString(utils.Json.MarshalToString(sliceMD5List)), + "slice-md5": sliceMd5, + "content-md5": contentMd5, + "block_list": blockListStr, } - // 预上传 var precreateResp PrecreateResp _, err = d.Post(FILE_API_URL_V1+"/precreate", func(r *resty.Request) { r.SetContext(ctx) @@ -273,30 +295,46 @@ func (d *BaiduPhoto) Put(ctx context.Context, dstDir model.Obj, stream model.Fil } switch precreateResp.ReturnType { - case 1: // 上传文件 - uploadParams := map[string]string{ - "method": "upload", - "path": params["path"], - "uploadid": precreateResp.UploadID, - } + case 1: //step.3 上传文件切片 + threadG, upCtx := errgroup.NewGroupWithContext(ctx, d.uploadThread, + retry.Attempts(3), + retry.Delay(time.Second), + retry.DelayType(retry.BackOffDelay)) + for _, partseq := range precreateResp.BlockList { + if utils.IsCanceled(upCtx) { + break + } - for i := 0; i < count; i++ { - if utils.IsCanceled(ctx) { - return nil, ctx.Err() + partseq, offset, byteSize := partseq, int64(partseq)*DEFAULT, DEFAULT + if partseq+1 == count { + byteSize = lastBlockSize } - uploadParams["partseq"] = fmt.Sprint(i) - _, err = d.Post("https://c3.pcs.baidu.com/rest/2.0/pcs/superfile2", func(r *resty.Request) { - r.SetContext(ctx) - r.SetQueryParams(uploadParams) - r.SetFileReader("file", stream.GetName(), io.LimitReader(tempFile, DEFAULT)) - }, nil) - if err != nil { - return nil, err - } - up(i * 100 / count) + + threadG.Go(func(ctx context.Context) error { + uploadParams := map[string]string{ + "method": "upload", + "path": params["path"], + "partseq": fmt.Sprint(partseq), + "uploadid": precreateResp.UploadID, + } + + _, err = d.Post("https://c3.pcs.baidu.com/rest/2.0/pcs/superfile2", func(r *resty.Request) { + r.SetContext(ctx) + r.SetQueryParams(uploadParams) + r.SetFileReader("file", stream.GetName(), io.NewSectionReader(tempFile, offset, byteSize)) + }, nil) + if err != nil { + return err + } + up(int(threadG.Success()) * 100 / len(precreateResp.BlockList)) + return nil + }) + } + if err = threadG.Wait(); err != nil { + return nil, err } fallthrough - case 2: // 创建文件 + case 2: //step.4 创建文件 params["uploadid"] = precreateResp.UploadID _, err = d.Post(FILE_API_URL_V1+"/create", func(r *resty.Request) { r.SetContext(ctx) @@ -306,7 +344,7 @@ func (d *BaiduPhoto) Put(ctx context.Context, dstDir model.Obj, stream model.Fil return nil, err } fallthrough - case 3: // 增加到相册 + case 3: //step.5 增加到相册 rootfile := precreateResp.Data.toFile() if album, ok := dstDir.(*Album); ok { return d.AddAlbumFile(ctx, album, rootfile) diff --git a/drivers/baidu_photo/meta.go b/drivers/baidu_photo/meta.go index e562b540..da2229f5 100644 --- a/drivers/baidu_photo/meta.go +++ b/drivers/baidu_photo/meta.go @@ -13,6 +13,7 @@ type Addition struct { DeleteOrigin bool `json:"delete_origin"` ClientID string `json:"client_id" required:"true" default:"iYCeC9g08h5vuP9UqvPHKKSVrKFXGa1v"` ClientSecret string `json:"client_secret" required:"true" default:"jXiFMOPVPCWlO2M5CwWQzffpNPaGTRBG"` + UploadThread string `json:"upload_thread" default:"3" help:"1<=thread<=32"` } var config = driver.Config{ diff --git a/drivers/baidu_photo/types.go b/drivers/baidu_photo/types.go index b701a4da..7ac66570 100644 --- a/drivers/baidu_photo/types.go +++ b/drivers/baidu_photo/types.go @@ -160,9 +160,9 @@ type ( CreateFileResp //不存在返回 - Path string `json:"path"` - UploadID string `json:"uploadid"` - Blocklist []int64 `json:"block_list"` + Path string `json:"path"` + UploadID string `json:"uploadid"` + BlockList []int `json:"block_list"` } ) diff --git a/drivers/mopan/driver.go b/drivers/mopan/driver.go index edbcfe3a..18b34516 100644 --- a/drivers/mopan/driver.go +++ b/drivers/mopan/driver.go @@ -7,12 +7,14 @@ import ( "io" "net/http" "os" + "strconv" "time" "github.com/alist-org/alist/v3/drivers/base" "github.com/alist-org/alist/v3/internal/driver" "github.com/alist-org/alist/v3/internal/model" "github.com/alist-org/alist/v3/internal/op" + "github.com/alist-org/alist/v3/pkg/errgroup" "github.com/alist-org/alist/v3/pkg/utils" "github.com/avast/retry-go" "github.com/foxxorcat/mopan-sdk-go" @@ -23,7 +25,8 @@ type MoPan struct { Addition client *mopan.MoClient - userID string + userID string + uploadThread int } func (d *MoPan) Config() driver.Config { @@ -35,6 +38,10 @@ func (d *MoPan) GetAddition() driver.Additional { } func (d *MoPan) Init(ctx context.Context) error { + d.uploadThread, _ = strconv.Atoi(d.UploadThread) + if d.uploadThread < 1 || d.uploadThread > 32 { + d.uploadThread, d.UploadThread = 3, "3" + } login := func() error { data, err := d.client.Login(d.Phone, d.Password) if err != nil { @@ -49,7 +56,7 @@ func (d *MoPan) Init(ctx context.Context) error { d.userID = info.UserID return nil } - d.client = mopan.NewMoClient(). + d.client = mopan.NewMoClientWithRestyClient(base.NewRestyClient()). SetRestyClient(base.RestyClient). SetOnAuthorizationExpired(func(_ error) error { err := login() @@ -221,6 +228,7 @@ func (d *MoPan) Put(ctx context.Context, dstDir model.Obj, stream model.FileStre _ = os.Remove(file.Name()) }() + // step.1 initUpdload, err := d.client.InitMultiUpload(ctx, mopan.UpdloadFileParam{ ParentFolderId: dstDir.GetID(), FileName: stream.GetName(), @@ -234,46 +242,50 @@ func (d *MoPan) Put(ctx context.Context, dstDir model.Obj, stream model.FileStre } if !initUpdload.FileDataExists { + // step.2 parts, err := d.client.GetAllMultiUploadUrls(initUpdload.UploadFileID, initUpdload.PartInfo) if err != nil { return nil, err } d.client.CloudDiskStartBusiness() - for i, part := range parts { - if utils.IsCanceled(ctx) { - return nil, ctx.Err() + + // step.3 + threadG, upCtx := errgroup.NewGroupWithContext(ctx, d.uploadThread, + retry.Attempts(3), + retry.Delay(time.Second), + retry.DelayType(retry.BackOffDelay)) + for _, part := range parts { + if utils.IsCanceled(upCtx) { + break } - err := retry.Do(func() error { - if _, err := file.Seek(int64(part.PartNumber-1)*int64(initUpdload.PartSize), io.SeekStart); err != nil { - return retry.Unrecoverable(err) - } + part, byteSize := part, initUpdload.PartSize + if part.PartNumber == len(initUpdload.PartInfo) { + byteSize = initUpdload.LastPartSize + } - req, err := part.NewRequest(ctx, io.LimitReader(file, int64(initUpdload.PartSize))) + threadG.Go(func(ctx context.Context) error { + req, err := part.NewRequest(ctx, io.NewSectionReader(file, int64(part.PartNumber-1)*initUpdload.PartSize, byteSize)) if err != nil { return err } - resp, err := base.HttpClient.Do(req) if err != nil { return err } - + resp.Body.Close() if resp.StatusCode != http.StatusOK { return fmt.Errorf("upload err,code=%d", resp.StatusCode) } + up(100 * int(threadG.Success()) / len(parts)) return nil - }, - retry.Context(ctx), - retry.Attempts(3), - retry.Delay(time.Second), - retry.MaxDelay(5*time.Second)) - if err != nil { - return nil, err - } - up(100 * (i + 1) / len(parts)) + }) + } + if err = threadG.Wait(); err != nil { + return nil, err } } + //step.4 uFile, err := d.client.CommitMultiUploadFile(initUpdload.UploadFileID, nil) if err != nil { return nil, err diff --git a/drivers/mopan/meta.go b/drivers/mopan/meta.go index 2b64955b..ecea3bbd 100644 --- a/drivers/mopan/meta.go +++ b/drivers/mopan/meta.go @@ -17,6 +17,8 @@ type Addition struct { OrderDirection string `json:"order_direction" type:"select" options:"asc,desc" default:"asc"` DeviceInfo string `json:"device_info"` + + UploadThread string `json:"upload_thread" default:"3" help:"1<=thread<=32"` } func (a *Addition) GetRootId() string { @@ -24,7 +26,7 @@ func (a *Addition) GetRootId() string { } var config = driver.Config{ - Name: "MoPan", + Name: "MoPan", // DefaultRoot: "root, / or other", CheckStatus: true, Alert: "warning|This network disk may store your password in clear text. Please set your password carefully", diff --git a/drivers/weiyun/driver.go b/drivers/weiyun/driver.go index 3bd622a2..628536f0 100644 --- a/drivers/weiyun/driver.go +++ b/drivers/weiyun/driver.go @@ -2,11 +2,12 @@ package weiyun import ( "context" + "fmt" "io" "math" "net/http" "os" - "sync" + "strconv" "time" "github.com/alist-org/alist/v3/drivers/base" @@ -15,7 +16,9 @@ import ( "github.com/alist-org/alist/v3/internal/model" "github.com/alist-org/alist/v3/internal/op" "github.com/alist-org/alist/v3/pkg/cron" + "github.com/alist-org/alist/v3/pkg/errgroup" "github.com/alist-org/alist/v3/pkg/utils" + "github.com/avast/retry-go" weiyunsdkgo "github.com/foxxorcat/weiyun-sdk-go" ) @@ -26,6 +29,8 @@ type WeiYun struct { client *weiyunsdkgo.WeiYunClient cron *cron.Cron rootFolder *Folder + + uploadThread int } func (d *WeiYun) Config() driver.Config { @@ -37,7 +42,13 @@ func (d *WeiYun) GetAddition() driver.Additional { } func (d *WeiYun) Init(ctx context.Context) error { - d.client = weiyunsdkgo.NewWeiYunClientWithRestyClient(base.RestyClient) + // 限制上传线程数 + d.uploadThread, _ = strconv.Atoi(d.UploadThread) + if d.uploadThread < 4 || d.uploadThread > 32 { + d.uploadThread, d.UploadThread = 4, "4" + } + + d.client = weiyunsdkgo.NewWeiYunClientWithRestyClient(base.NewRestyClient()) err := d.client.SetCookiesStr(d.Cookies).RefreshCtoken() if err != nil { return err @@ -77,6 +88,10 @@ func (d *WeiYun) Init(ctx context.Context) error { if err != nil { return err } + if len(folders) == 0 { + return fmt.Errorf("invalid directory ID") + } + folder := folders[len(folders)-1] d.rootFolder = &Folder{ PFolder: &Folder{ @@ -187,6 +202,7 @@ func (d *WeiYun) MakeDir(ctx context.Context, parentDir model.Obj, dirName strin } func (d *WeiYun) Move(ctx context.Context, srcObj, dstDir model.Obj) (model.Obj, error) { + // TODO: 默认策略为重命名,使用缓存可能出现冲突。微云app也有这个冲突,不知道腾讯怎么搞的 if dstDir, ok := dstDir.(*Folder); ok { dstParam := weiyunsdkgo.FolderParam{ PdirKey: dstDir.GetPKey(), @@ -204,7 +220,6 @@ func (d *WeiYun) Move(ctx context.Context, srcObj, dstDir model.Obj) (model.Obj, if err != nil { return nil, err } - return &File{ PFolder: dstDir, File: srcObj.File, @@ -219,7 +234,6 @@ func (d *WeiYun) Move(ctx context.Context, srcObj, dstDir model.Obj) (model.Obj, if err != nil { return nil, err } - return &Folder{ PFolder: dstDir, Folder: srcObj.Folder, @@ -271,7 +285,6 @@ func (d *WeiYun) Rename(ctx context.Context, srcObj model.Obj, newName string) ( } func (d *WeiYun) Copy(ctx context.Context, srcObj, dstDir model.Obj) error { - // TODO copy obj, optional return errs.NotImplement } @@ -292,7 +305,6 @@ func (d *WeiYun) Remove(ctx context.Context, obj model.Obj) error { DirName: obj.GetName(), }) } - // TODO remove obj, optional return errs.NotSupport } @@ -325,35 +337,44 @@ func (d *WeiYun) Put(ctx context.Context, dstDir model.Obj, stream model.FileStr // fast upload if !preData.FileExist { - // step 2. - upCtx, cancel := context.WithCancelCause(ctx) - var wg sync.WaitGroup + // step.2 增加上传通道 + if len(preData.ChannelList) < d.uploadThread { + newCh, err := d.client.AddUploadChannel(len(preData.ChannelList), d.uploadThread, preData.UploadAuthData) + if err != nil { + return nil, err + } + preData.ChannelList = append(preData.ChannelList, newCh.AddChannels...) + } + // step.3 上传 + threadG, upCtx := errgroup.NewGroupWithContext(ctx, len(preData.ChannelList), + retry.Attempts(3), + retry.Delay(time.Second), + retry.DelayType(retry.BackOffDelay)) + for _, channel := range preData.ChannelList { - wg.Add(1) - go func(channel weiyunsdkgo.UploadChannelData) { - defer wg.Done() - if utils.IsCanceled(upCtx) { - return - } + if utils.IsCanceled(upCtx) { + break + } + + var channel = channel + threadG.Go(func(ctx context.Context) error { for { channel.Len = int(math.Min(float64(stream.GetSize()-channel.Offset), float64(channel.Len))) upData, err := d.client.UploadFile(upCtx, channel, preData.UploadAuthData, io.NewSectionReader(file, channel.Offset, int64(channel.Len))) if err != nil { - cancel(err) - return + return err } // 上传完成 if upData.UploadState != 1 { - return + return nil } channel = upData.Channel } - }(channel) + }) } - wg.Wait() - if utils.IsCanceled(upCtx) { - return nil, context.Cause(upCtx) + if err = threadG.Wait(); err != nil { + return nil, err } } diff --git a/drivers/weiyun/meta.go b/drivers/weiyun/meta.go index c9bbbf7b..11200b6b 100644 --- a/drivers/weiyun/meta.go +++ b/drivers/weiyun/meta.go @@ -10,6 +10,7 @@ type Addition struct { Cookies string `json:"cookies" required:"true"` OrderBy string `json:"order_by" type:"select" options:"name,size,updated_at" default:"name"` OrderDirection string `json:"order_direction" type:"select" options:"asc,desc" default:"asc"` + UploadThread string `json:"upload_thread" default:"4" help:"4<=thread<=32"` } var config = driver.Config{ diff --git a/go.mod b/go.mod index 60f73db2..aaea9010 100644 --- a/go.mod +++ b/go.mod @@ -15,8 +15,8 @@ require ( github.com/deckarep/golang-set/v2 v2.3.0 github.com/disintegration/imaging v1.6.2 github.com/dustinxie/ecc v0.0.0-20210511000915-959544187564 - github.com/foxxorcat/mopan-sdk-go v0.1.1 - github.com/foxxorcat/weiyun-sdk-go v0.1.1 + github.com/foxxorcat/mopan-sdk-go v0.1.2 + github.com/foxxorcat/weiyun-sdk-go v0.1.2 github.com/gin-contrib/cors v1.4.0 github.com/gin-gonic/gin v1.9.1 github.com/go-resty/resty/v2 v2.7.0 diff --git a/go.sum b/go.sum index 9654023f..ab65353b 100644 --- a/go.sum +++ b/go.sum @@ -114,10 +114,10 @@ github.com/disintegration/imaging v1.6.2 h1:w1LecBlG2Lnp8B3jk5zSuNqd7b4DXhcjwek1 github.com/disintegration/imaging v1.6.2/go.mod h1:44/5580QXChDfwIclfc/PCwrr44amcmDAg8hxG0Ewe4= github.com/dustinxie/ecc v0.0.0-20210511000915-959544187564 h1:I6KUy4CI6hHjqnyJLNCEi7YHVMkwwtfSr2k9splgdSM= github.com/dustinxie/ecc v0.0.0-20210511000915-959544187564/go.mod h1:yekO+3ZShy19S+bsmnERmznGy9Rfg6dWWWpiGJjNAz8= -github.com/foxxorcat/mopan-sdk-go v0.1.1 h1:JYMeCu4PFpqgHapvOz4jPMT7CxR6Yebu3aWkgGMDeIU= -github.com/foxxorcat/mopan-sdk-go v0.1.1/go.mod h1:LpBPmwezjQNyhaNo3HGzgFtQbhvxmF5ZybSVuKi7OVA= -github.com/foxxorcat/weiyun-sdk-go v0.1.1 h1:m4qcJk0adr+bpM4es2zCqP3jhMEwEPyTMGICsamygEQ= -github.com/foxxorcat/weiyun-sdk-go v0.1.1/go.mod h1:AKsLFuWhWlClpGrg1zxTdMejugZEZtmhIuElAk3W83s= +github.com/foxxorcat/mopan-sdk-go v0.1.2 h1:1QM/JUWQpjbu53pVvbMfp3PLz09duGY5Mid3Qlc9WOk= +github.com/foxxorcat/mopan-sdk-go v0.1.2/go.mod h1:iWHA2JFhzmKR28ySp1ON0g6DjLaYtvb5jhTqPVTDW9A= +github.com/foxxorcat/weiyun-sdk-go v0.1.2 h1:waRWIBmjL9GCcndJ8HvOYrrVB4hhoPYzRrn3I/Cnzqw= +github.com/foxxorcat/weiyun-sdk-go v0.1.2/go.mod h1:AKsLFuWhWlClpGrg1zxTdMejugZEZtmhIuElAk3W83s= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/gabriel-vasile/mimetype v1.4.2 h1:w5qFW6JKBz9Y393Y4q372O9A7cUSequkh1Q7OhCmWKU= github.com/gabriel-vasile/mimetype v1.4.2/go.mod h1:zApsH/mKG4w07erKIaJPFiX0Tsq9BFQgN3qGY5GnNgA= diff --git a/pkg/errgroup/errgroup.go b/pkg/errgroup/errgroup.go new file mode 100644 index 00000000..5fc63d66 --- /dev/null +++ b/pkg/errgroup/errgroup.go @@ -0,0 +1,93 @@ +package errgroup + +import ( + "context" + "fmt" + "sync" + "sync/atomic" + + "github.com/avast/retry-go" +) + +type token struct{} +type Group struct { + cancel func(error) + ctx context.Context + opts []retry.Option + + success uint64 + + wg sync.WaitGroup + sem chan token +} + +func NewGroupWithContext(ctx context.Context, limit int, retryOpts ...retry.Option) (*Group, context.Context) { + ctx, cancel := context.WithCancelCause(ctx) + return (&Group{cancel: cancel, ctx: ctx, opts: retryOpts}).SetLimit(limit), ctx +} + +func (g *Group) done() { + if g.sem != nil { + <-g.sem + } + g.wg.Done() + atomic.AddUint64(&g.success, 1) +} + +func (g *Group) Wait() error { + g.wg.Wait() + return context.Cause(g.ctx) +} + +func (g *Group) Go(f func(ctx context.Context) error) { + if g.sem != nil { + g.sem <- token{} + } + + g.wg.Add(1) + go func() { + defer g.done() + if err := retry.Do(func() error { return f(g.ctx) }, g.opts...); err != nil { + g.cancel(err) + } + }() +} + +func (g *Group) TryGo(f func(ctx context.Context) error) bool { + if g.sem != nil { + select { + case g.sem <- token{}: + default: + return false + } + } + + g.wg.Add(1) + go func() { + defer g.done() + if err := retry.Do(func() error { return f(g.ctx) }, g.opts...); err != nil { + g.cancel(err) + } + }() + return true +} + +func (g *Group) SetLimit(n int) *Group { + if len(g.sem) != 0 { + panic(fmt.Errorf("errgroup: modify limit while %v goroutines in the group are still active", len(g.sem))) + } + if n > 0 { + g.sem = make(chan token, n) + } else { + g.sem = nil + } + return g +} + +func (g *Group) Success() uint64 { + return atomic.LoadUint64(&g.success) +} + +func (g *Group) Err() error { + return context.Cause(g.ctx) +} diff --git a/pkg/utils/io.go b/pkg/utils/io.go index 7af7136a..e0fea4a6 100644 --- a/pkg/utils/io.go +++ b/pkg/utils/io.go @@ -4,9 +4,10 @@ import ( "bytes" "context" "fmt" - log "github.com/sirupsen/logrus" "io" "time" + + log "github.com/sirupsen/logrus" ) // here is some syntaxic sugar inspired by the Tomas Senart's video, @@ -47,31 +48,22 @@ func CopyWithCtx(ctx context.Context, out io.Writer, in io.Reader, size int64, p type limitWriter struct { w io.Writer - count int64 limit int64 } -func (l limitWriter) Write(p []byte) (n int, err error) { - wn := int(l.limit - l.count) - if wn > len(p) { - wn = len(p) - } - if wn > 0 { - if n, err = l.w.Write(p[:wn]); err != nil { - return - } - if n < wn { - err = io.ErrShortWrite +func (l *limitWriter) Write(p []byte) (n int, err error) { + if l.limit > 0 { + if int64(len(p)) > l.limit { + p = p[:l.limit] } + l.limit -= int64(len(p)) + _, err = l.w.Write(p) } - if err == nil { - n = len(p) - } - return + return len(p), err } -func LimitWriter(w io.Writer, size int64) io.Writer { - return &limitWriter{w: w, limit: size} +func LimitWriter(w io.Writer, limit int64) io.Writer { + return &limitWriter{w: w, limit: limit} } type ReadCloser struct {