diff --git a/drivers/189pc/help.go b/drivers/189pc/help.go index c7b2e12a..ba1f3f08 100644 --- a/drivers/189pc/help.go +++ b/drivers/189pc/help.go @@ -160,9 +160,8 @@ func toDesc(o string) string { func ParseHttpHeader(str string) map[string]string { header := make(map[string]string) for _, value := range strings.Split(str, "&") { - i := strings.Index(value, "=") - if i > 0 { - header[strings.TrimSpace(value[0:i])] = strings.TrimSpace(value[i+1:]) + if k, v, found := strings.Cut(value, "="); found { + header[k] = v } } return header diff --git a/drivers/189pc/types.go b/drivers/189pc/types.go index a3ce0145..1087d33b 100644 --- a/drivers/189pc/types.go +++ b/drivers/189pc/types.go @@ -239,14 +239,25 @@ type InitMultiUploadResp struct { } `json:"data"` } type UploadUrlsResp struct { - Code string `json:"code"` - UploadUrls map[string]Part `json:"uploadUrls"` + Code string `json:"code"` + Data map[string]UploadUrlsData `json:"uploadUrls"` } -type Part struct { +type UploadUrlsData struct { RequestURL string `json:"requestURL"` RequestHeader string `json:"requestHeader"` } +type UploadUrlInfo struct { + PartNumber int + Headers map[string]string + UploadUrlsData +} + +type UploadProgress struct { + UploadInfo InitMultiUploadResp + UploadParts []string +} + /* 第二种上传方式 */ type CreateUploadFileResp struct { // 上传文件请求ID diff --git a/drivers/189pc/utils.go b/drivers/189pc/utils.go index d9049c36..a35a0efd 100644 --- a/drivers/189pc/utils.go +++ b/drivers/189pc/utils.go @@ -15,6 +15,7 @@ import ( "net/url" "os" "regexp" + "sort" "strconv" "strings" "time" @@ -471,15 +472,21 @@ func (y *Cloud189PC) StreamUpload(ctx context.Context, dstDir model.Obj, file mo return nil, err } + threadG, upCtx := errgroup.NewGroupWithContext(ctx, y.uploadThread, + retry.Attempts(3), + retry.Delay(time.Second), + retry.DelayType(retry.BackOffDelay)) + fileMd5 := md5.New() silceMd5 := md5.New() silceMd5Hexs := make([]string, 0, count) - byteData := make([]byte, sliceSize) + for i := 1; i <= count; i++ { - if utils.IsCanceled(ctx) { - return nil, ctx.Err() + if utils.IsCanceled(upCtx) { + break } + byteData := make([]byte, sliceSize) if i == count { byteData = byteData[:lastPartSize] } @@ -493,36 +500,26 @@ func (y *Cloud189PC) StreamUpload(ctx context.Context, dstDir model.Obj, file mo // 计算块md5并进行hex和base64编码 md5Bytes := silceMd5.Sum(nil) silceMd5Hexs = append(silceMd5Hexs, strings.ToUpper(hex.EncodeToString(md5Bytes))) - silceMd5Base64 := base64.StdEncoding.EncodeToString(md5Bytes) + partInfo := fmt.Sprintf("%d-%s", i, base64.StdEncoding.EncodeToString(md5Bytes)) - // 获取上传链接 - var uploadUrl UploadUrlsResp - _, err = y.request(fullUrl+"/getMultiUploadUrls", http.MethodGet, - func(req *resty.Request) { - req.SetContext(ctx) - }, Params{ - "partInfo": fmt.Sprintf("%d-%s", i, silceMd5Base64), - "uploadFileId": initMultiUpload.Data.UploadFileID, - }, &uploadUrl) - if err != nil { - return nil, err - } + threadG.Go(func(ctx context.Context) error { + uploadUrls, err := y.GetMultiUploadUrls(ctx, initMultiUpload.Data.UploadFileID, partInfo) + if err != nil { + return err + } - // 开始上传 - uploadData := uploadUrl.UploadUrls[fmt.Sprint("partNumber_", i)] - - err = retry.Do(func() error { - _, err := y.put(ctx, uploadData.RequestURL, ParseHttpHeader(uploadData.RequestHeader), false, bytes.NewReader(byteData)) - return err - }, - retry.Context(ctx), - retry.Attempts(3), - retry.Delay(time.Second), - retry.DelayType(retry.BackOffDelay)) - if err != nil { - return nil, err - } - up(int(i * 100 / count)) + // step.4 上传切片 + uploadUrl := uploadUrls[0] + _, err = y.put(ctx, uploadUrl.RequestURL, uploadUrl.Headers, false, bytes.NewReader(byteData)) + if err != nil { + return err + } + up(int(threadG.Success()) * 100 / count) + return nil + }) + } + if err = threadG.Wait(); err != nil { + return nil, err } fileMd5Hex := strings.ToUpper(hex.EncodeToString(fileMd5.Sum(nil))) @@ -564,9 +561,9 @@ func (y *Cloud189PC) FastUpload(ctx context.Context, dstDir model.Obj, file mode var sliceSize = partSize(file.GetSize()) count := int(math.Ceil(float64(file.GetSize()) / float64(sliceSize))) - lastPartSize := file.GetSize() % sliceSize - if file.GetSize() > 0 && lastPartSize == 0 { - lastPartSize = sliceSize + lastSliceSize := file.GetSize() % sliceSize + if file.GetSize() > 0 && lastSliceSize == 0 { + lastSliceSize = sliceSize } //step.1 优先计算所需信息 @@ -574,14 +571,14 @@ func (y *Cloud189PC) FastUpload(ctx context.Context, dstDir model.Obj, file mode fileMd5 := md5.New() silceMd5 := md5.New() silceMd5Hexs := make([]string, 0, count) - silceMd5Base64s := make([]string, 0, count) + partInfos := make([]string, 0, count) for i := 1; i <= count; i++ { if utils.IsCanceled(ctx) { return nil, ctx.Err() } if i == count { - byteSize = lastPartSize + byteSize = lastSliceSize } silceMd5.Reset() @@ -590,7 +587,7 @@ func (y *Cloud189PC) FastUpload(ctx context.Context, dstDir model.Obj, file mode } md5Byte := silceMd5.Sum(nil) silceMd5Hexs = append(silceMd5Hexs, strings.ToUpper(hex.EncodeToString(md5Byte))) - silceMd5Base64s = append(silceMd5Base64s, fmt.Sprint(i, "-", base64.StdEncoding.EncodeToString(md5Byte))) + partInfos = append(partInfos, fmt.Sprint(i, "-", base64.StdEncoding.EncodeToString(md5Byte))) } fileMd5Hex := strings.ToUpper(hex.EncodeToString(fileMd5.Sum(nil))) @@ -599,88 +596,95 @@ func (y *Cloud189PC) FastUpload(ctx context.Context, dstDir model.Obj, file mode sliceMd5Hex = strings.ToUpper(utils.GetMD5EncodeStr(strings.Join(silceMd5Hexs, "\n"))) } - //step.2 预上传 - params := Params{ - "parentFolderId": dstDir.GetID(), - "fileName": url.QueryEscape(file.GetName()), - "fileSize": fmt.Sprint(file.GetSize()), - "fileMd5": fileMd5Hex, - "sliceSize": fmt.Sprint(sliceSize), - "sliceMd5": sliceMd5Hex, - } - fullUrl := UPLOAD_URL if y.isFamily() { - params.Set("familyId", y.FamilyID) fullUrl += "/family" } else { //params.Set("extend", `{"opScene":"1","relativepath":"","rootfolderid":""}`) fullUrl += "/person" } - var uploadInfo InitMultiUploadResp - _, err = y.request(fullUrl+"/initMultiUpload", http.MethodGet, func(req *resty.Request) { - req.SetContext(ctx) - }, params, &uploadInfo) - if err != nil { - return nil, err - } - - // 网盘中不存在该文件,开始上传 - if uploadInfo.Data.FileDataExists != 1 { - // step.3 获取上传切片信息 - var uploadUrls UploadUrlsResp - _, err = y.request(fullUrl+"/getMultiUploadUrls", http.MethodGet, - func(req *resty.Request) { - req.SetContext(ctx) - }, Params{ - "uploadFileId": uploadInfo.Data.UploadFileID, - "partInfo": strings.Join(silceMd5Base64s, ","), - }, &uploadUrls) + // 尝试恢复进度 + uploadProgress, ok := base.GetUploadProgress[*UploadProgress](y, y.tokenInfo.SessionKey, fileMd5Hex) + if !ok { + //step.2 预上传 + params := Params{ + "parentFolderId": dstDir.GetID(), + "fileName": url.QueryEscape(file.GetName()), + "fileSize": fmt.Sprint(file.GetSize()), + "fileMd5": fileMd5Hex, + "sliceSize": fmt.Sprint(sliceSize), + "sliceMd5": sliceMd5Hex, + } + if y.isFamily() { + params.Set("familyId", y.FamilyID) + } + var uploadInfo InitMultiUploadResp + _, err = y.request(fullUrl+"/initMultiUpload", http.MethodGet, func(req *resty.Request) { + req.SetContext(ctx) + }, params, &uploadInfo) if err != nil { return nil, err } + uploadProgress = &UploadProgress{ + UploadInfo: uploadInfo, + UploadParts: partInfos, + } + } - // step.4 上传切片 + uploadInfo := uploadProgress.UploadInfo.Data + // 网盘中不存在该文件,开始上传 + if uploadInfo.FileDataExists != 1 { threadG, upCtx := errgroup.NewGroupWithContext(ctx, y.uploadThread, retry.Attempts(3), retry.Delay(time.Second), retry.DelayType(retry.BackOffDelay)) - for k, part := range uploadUrls.UploadUrls { + for i, uploadPart := range uploadProgress.UploadParts { if utils.IsCanceled(upCtx) { break } - partNumber, err := strconv.Atoi(strings.TrimPrefix(k, "partNumber_")) - if err != nil { - return nil, err - } - - part, byteSize, offset := part, sliceSize, int64(partNumber-1)*sliceSize - if partNumber == count { - byteSize = lastPartSize - } + i, uploadPart := i, uploadPart threadG.Go(func(ctx context.Context) error { - _, err := y.put(ctx, part.RequestURL, ParseHttpHeader(part.RequestHeader), false, io.NewSectionReader(tempFile, offset, byteSize)) + // step.3 获取上传链接 + uploadUrls, err := y.GetMultiUploadUrls(ctx, uploadInfo.UploadFileID, uploadPart) if err != nil { return err } - up(int(threadG.Success()) * 100 / len(uploadUrls.UploadUrls)) + uploadUrl := uploadUrls[0] + + byteSize, offset := sliceSize, int64(uploadUrl.PartNumber-1)*sliceSize + if uploadUrl.PartNumber == count { + byteSize = lastSliceSize + } + + // step.4 上传切片 + _, err = y.put(ctx, uploadUrl.RequestURL, uploadUrl.Headers, false, io.NewSectionReader(tempFile, offset, byteSize)) + if err != nil { + return err + } + + up(int(threadG.Success()) * 100 / len(uploadUrls)) + uploadProgress.UploadParts[i] = "" return nil }) } if err = threadG.Wait(); err != nil { + if errors.Is(err, context.Canceled) { + uploadProgress.UploadParts = utils.SliceFilter(uploadProgress.UploadParts, func(s string) bool { return s != "" }) + base.SaveUploadProgress(y, uploadProgress, y.tokenInfo.SessionKey, fileMd5Hex) + } return nil, err } } - // 提交 + // step.5 提交 var resp CommitMultiUploadFileResp _, err = y.request(fullUrl+"/commitMultiUploadFile", http.MethodGet, func(req *resty.Request) { req.SetContext(ctx) }, Params{ - "uploadFileId": uploadInfo.Data.UploadFileID, + "uploadFileId": uploadInfo.UploadFileID, "isLog": "0", "opertype": "3", }, &resp) @@ -690,6 +694,51 @@ func (y *Cloud189PC) FastUpload(ctx context.Context, dstDir model.Obj, file mode return resp.toFile(), nil } +// 获取上传切片信息 +// 对http body有大小限制,分片信息太多会出错 +func (y *Cloud189PC) GetMultiUploadUrls(ctx context.Context, uploadFileId string, partInfo ...string) ([]UploadUrlInfo, error) { + fullUrl := UPLOAD_URL + if y.isFamily() { + fullUrl += "/family" + } else { + fullUrl += "/person" + } + + var uploadUrlsResp UploadUrlsResp + _, err := y.request(fullUrl+"/getMultiUploadUrls", http.MethodGet, + func(req *resty.Request) { + req.SetContext(ctx) + }, Params{ + "uploadFileId": uploadFileId, + "partInfo": strings.Join(partInfo, ","), + }, &uploadUrlsResp) + if err != nil { + return nil, err + } + uploadUrls := uploadUrlsResp.Data + + if len(uploadUrls) != len(partInfo) { + return nil, fmt.Errorf("uploadUrls get error, due to get length %d, real length %d", len(partInfo), len(uploadUrls)) + } + + uploadUrlInfos := make([]UploadUrlInfo, 0, len(uploadUrls)) + for k, uploadUrl := range uploadUrls { + partNumber, err := strconv.Atoi(strings.TrimPrefix(k, "partNumber_")) + if err != nil { + return nil, err + } + uploadUrlInfos = append(uploadUrlInfos, UploadUrlInfo{ + PartNumber: partNumber, + Headers: ParseHttpHeader(uploadUrl.RequestHeader), + UploadUrlsData: uploadUrl, + }) + } + sort.Slice(uploadUrlInfos, func(i, j int) bool { + return uploadUrlInfos[i].PartNumber < uploadUrlInfos[j].PartNumber + }) + return uploadUrlInfos, nil +} + // 旧版本上传,家庭云不支持覆盖 func (y *Cloud189PC) OldUpload(ctx context.Context, dstDir model.Obj, file model.FileStreamer, up driver.UpdateProgress) (model.Obj, error) { // 需要获取完整文件md5,必须支持 io.Seek diff --git a/drivers/baidu_netdisk/driver.go b/drivers/baidu_netdisk/driver.go index 2b837887..3066843e 100644 --- a/drivers/baidu_netdisk/driver.go +++ b/drivers/baidu_netdisk/driver.go @@ -4,9 +4,11 @@ import ( "context" "crypto/md5" "encoding/hex" + "errors" "fmt" "io" "math" + "net/url" "os" stdpath "path" "strconv" @@ -29,7 +31,6 @@ type BaiduNetdisk struct { uploadThread int } -const BaiduFileAPI = "https://d.pcs.baidu.com/rest/2.0/pcs/superfile2" const DefaultSliceSize int64 = 4 * 1024 * 1024 func (d *BaiduNetdisk) Config() driver.Config { @@ -45,6 +46,11 @@ func (d *BaiduNetdisk) Init(ctx context.Context) error { if d.uploadThread < 1 || d.uploadThread > 32 { d.uploadThread, d.UploadThread = 3, "3" } + + if _, err := url.Parse(d.UploadAPI); d.UploadAPI == "" || err != nil { + d.UploadAPI = "https://d.pcs.baidu.com" + } + res, err := d.get("/xpan/nas", map[string]string{ "method": "uinfo", }, nil) @@ -186,43 +192,45 @@ func (d *BaiduNetdisk) Put(ctx context.Context, dstDir model.Obj, stream model.F sliceMd5 := hex.EncodeToString(sliceMd5H2.Sum(nil)) blockListStr, _ := utils.Json.MarshalToString(blockList) - // step.1 预上传 rawPath := stdpath.Join(dstDir.GetPath(), stream.GetName()) path := encodeURIComponent(rawPath) - data := fmt.Sprintf("path=%s&size=%d&isdir=0&autoinit=1&rtype=3&block_list=%s&content-md5=%s&slice-md5=%s", - path, streamSize, - blockListStr, - contentMd5, sliceMd5) - params := map[string]string{ - "method": "precreate", - } - log.Debugf("[baidu_netdisk] precreate data: %s", data) - var precreateResp PrecreateResp - _, err = d.post("/xpan/file", params, data, &precreateResp) - if err != nil { - return nil, err - } - log.Debugf("%+v", precreateResp) - if precreateResp.ReturnType == 2 { - //rapid upload, since got md5 match from baidu server + // step.1 预上传 + // 尝试获取之前的进度 + precreateResp, ok := base.GetUploadProgress[*PrecreateResp](d, d.AccessToken, contentMd5) + if !ok { + data := fmt.Sprintf("path=%s&size=%d&isdir=0&autoinit=1&rtype=3&block_list=%s&content-md5=%s&slice-md5=%s", + path, streamSize, + blockListStr, + contentMd5, sliceMd5) + params := map[string]string{ + "method": "precreate", + } + log.Debugf("[baidu_netdisk] precreate data: %s", data) + _, err = d.post("/xpan/file", params, data, &precreateResp) if err != nil { return nil, err } - return fileToObj(precreateResp.File), nil + log.Debugf("%+v", precreateResp) + if precreateResp.ReturnType == 2 { + //rapid upload, since got md5 match from baidu server + if err != nil { + return nil, err + } + return fileToObj(precreateResp.File), nil + } } - // step.2 上传分片 threadG, upCtx := errgroup.NewGroupWithContext(ctx, d.uploadThread, retry.Attempts(3), retry.Delay(time.Second), retry.DelayType(retry.BackOffDelay)) - for _, partseq := range precreateResp.BlockList { + for i, partseq := range precreateResp.BlockList { if utils.IsCanceled(upCtx) { break } - partseq, offset, byteSize := partseq, int64(partseq)*DefaultSliceSize, DefaultSliceSize + i, partseq, offset, byteSize := i, partseq, int64(partseq)*DefaultSliceSize, DefaultSliceSize if partseq+1 == count { byteSize = lastBlockSize } @@ -240,10 +248,16 @@ func (d *BaiduNetdisk) Put(ctx context.Context, dstDir model.Obj, stream model.F return err } up(int(threadG.Success()) * 100 / len(precreateResp.BlockList)) + precreateResp.BlockList[i] = -1 return nil }) } if err = threadG.Wait(); err != nil { + // 如果属于用户主动取消,则保存上传进度 + if errors.Is(err, context.Canceled) { + precreateResp.BlockList = utils.SliceFilter(precreateResp.BlockList, func(s int) bool { return s >= 0 }) + base.SaveUploadProgress(d, precreateResp, d.AccessToken, contentMd5) + } return nil, err } @@ -260,7 +274,7 @@ func (d *BaiduNetdisk) uploadSlice(ctx context.Context, params map[string]string SetContext(ctx). SetQueryParams(params). SetFileReader("file", fileName, file). - Post(BaiduFileAPI) + Post(d.UploadAPI + "/rest/2.0/pcs/superfile2") if err != nil { return err } diff --git a/drivers/baidu_netdisk/meta.go b/drivers/baidu_netdisk/meta.go index d8c0d19f..b257986b 100644 --- a/drivers/baidu_netdisk/meta.go +++ b/drivers/baidu_netdisk/meta.go @@ -16,6 +16,7 @@ type Addition struct { CustomCrackUA string `json:"custom_crack_ua" required:"true" default:"netdisk"` AccessToken string UploadThread string `json:"upload_thread" default:"3" help:"1<=thread<=32"` + UploadAPI string `json:"upload_api" default:"https://d.pcs.baidu.com"` } var config = driver.Config{ diff --git a/drivers/baidu_photo/driver.go b/drivers/baidu_photo/driver.go index c92c48c5..3ff3bc6e 100644 --- a/drivers/baidu_photo/driver.go +++ b/drivers/baidu_photo/driver.go @@ -4,6 +4,7 @@ import ( "context" "crypto/md5" "encoding/hex" + "errors" "fmt" "io" "math" @@ -13,6 +14,7 @@ import ( "strings" "time" + "github.com/alist-org/alist/v3/drivers/base" "github.com/alist-org/alist/v3/internal/driver" "github.com/alist-org/alist/v3/internal/errs" "github.com/alist-org/alist/v3/internal/model" @@ -285,13 +287,16 @@ func (d *BaiduPhoto) Put(ctx context.Context, dstDir model.Obj, stream model.Fil "block_list": blockListStr, } - var precreateResp PrecreateResp - _, err = d.Post(FILE_API_URL_V1+"/precreate", func(r *resty.Request) { - r.SetContext(ctx) - r.SetFormData(params) - }, &precreateResp) - if err != nil { - return nil, err + // 尝试获取之前的进度 + precreateResp, ok := base.GetUploadProgress[*PrecreateResp](d, d.AccessToken, contentMd5) + if !ok { + _, err = d.Post(FILE_API_URL_V1+"/precreate", func(r *resty.Request) { + r.SetContext(ctx) + r.SetFormData(params) + }, &precreateResp) + if err != nil { + return nil, err + } } switch precreateResp.ReturnType { @@ -300,12 +305,12 @@ func (d *BaiduPhoto) Put(ctx context.Context, dstDir model.Obj, stream model.Fil retry.Attempts(3), retry.Delay(time.Second), retry.DelayType(retry.BackOffDelay)) - for _, partseq := range precreateResp.BlockList { + for i, partseq := range precreateResp.BlockList { if utils.IsCanceled(upCtx) { break } - partseq, offset, byteSize := partseq, int64(partseq)*DEFAULT, DEFAULT + i, partseq, offset, byteSize := i, partseq, int64(partseq)*DEFAULT, DEFAULT if partseq+1 == count { byteSize = lastBlockSize } @@ -327,10 +332,15 @@ func (d *BaiduPhoto) Put(ctx context.Context, dstDir model.Obj, stream model.Fil return err } up(int(threadG.Success()) * 100 / len(precreateResp.BlockList)) + precreateResp.BlockList[i] = -1 return nil }) } if err = threadG.Wait(); err != nil { + if errors.Is(err, context.Canceled) { + precreateResp.BlockList = utils.SliceFilter(precreateResp.BlockList, func(s int) bool { return s >= 0 }) + base.SaveUploadProgress(d, precreateResp, d.AccessToken, contentMd5) + } return nil, err } fallthrough diff --git a/drivers/base/upload.go b/drivers/base/upload.go new file mode 100644 index 00000000..881a256e --- /dev/null +++ b/drivers/base/upload.go @@ -0,0 +1,31 @@ +package base + +import ( + "fmt" + "strings" + "time" + + "github.com/Xhofe/go-cache" + "github.com/alist-org/alist/v3/internal/driver" +) + +// storage upload progress, for upload recovery +var UploadStateCache = cache.NewMemCache(cache.WithShards[any](32)) + +// Save upload progress for 20 minutes +func SaveUploadProgress(driver driver.Driver, state any, keys ...string) bool { + return UploadStateCache.Set( + fmt.Sprint(driver.Config().Name, "-upload-", strings.Join(keys, "-")), + state, + cache.WithEx[any](time.Minute*20)) +} + +// An upload progress can only be made by one process alone, +// so here you need to get it and then delete it. +func GetUploadProgress[T any](driver driver.Driver, keys ...string) (state T, ok bool) { + v, ok := UploadStateCache.GetDel(fmt.Sprint(driver.Config().Name, "-upload-", strings.Join(keys, "-"))) + if ok { + state, ok = v.(T) + } + return +} diff --git a/drivers/mopan/driver.go b/drivers/mopan/driver.go index 18b34516..55879005 100644 --- a/drivers/mopan/driver.go +++ b/drivers/mopan/driver.go @@ -229,41 +229,52 @@ func (d *MoPan) Put(ctx context.Context, dstDir model.Obj, stream model.FileStre }() // step.1 - initUpdload, err := d.client.InitMultiUpload(ctx, mopan.UpdloadFileParam{ + uploadPartData, err := mopan.InitUploadPartData(ctx, mopan.UpdloadFileParam{ ParentFolderId: dstDir.GetID(), FileName: stream.GetName(), FileSize: stream.GetSize(), File: file, - }, mopan.WarpParamOption( - mopan.ParamOptionShareFile(d.CloudID), - )) + }) if err != nil { return nil, err } - if !initUpdload.FileDataExists { + // 尝试恢复进度 + initUpdload, ok := base.GetUploadProgress[*mopan.InitMultiUploadData](d, d.client.Authorization, uploadPartData.FileMd5) + if !ok { // step.2 - parts, err := d.client.GetAllMultiUploadUrls(initUpdload.UploadFileID, initUpdload.PartInfo) + initUpdload, err = d.client.InitMultiUpload(ctx, *uploadPartData, mopan.WarpParamOption( + mopan.ParamOptionShareFile(d.CloudID), + )) if err != nil { return nil, err } - d.client.CloudDiskStartBusiness() + } + + if !initUpdload.FileDataExists { + fmt.Println(d.client.CloudDiskStartBusiness()) - // step.3 threadG, upCtx := errgroup.NewGroupWithContext(ctx, d.uploadThread, retry.Attempts(3), retry.Delay(time.Second), retry.DelayType(retry.BackOffDelay)) - for _, part := range parts { + + // step.3 + parts, err := d.client.GetAllMultiUploadUrls(initUpdload.UploadFileID, initUpdload.PartInfos) + if err != nil { + return nil, err + } + + for i, part := range parts { if utils.IsCanceled(upCtx) { break } - - part, byteSize := part, initUpdload.PartSize - if part.PartNumber == len(initUpdload.PartInfo) { + i, part, byteSize := i, part, initUpdload.PartSize + if part.PartNumber == uploadPartData.PartTotal { byteSize = initUpdload.LastPartSize } + // step.4 threadG.Go(func(ctx context.Context) error { req, err := part.NewRequest(ctx, io.NewSectionReader(file, int64(part.PartNumber-1)*initUpdload.PartSize, byteSize)) if err != nil { @@ -278,14 +289,19 @@ func (d *MoPan) Put(ctx context.Context, dstDir model.Obj, stream model.FileStre return fmt.Errorf("upload err,code=%d", resp.StatusCode) } up(100 * int(threadG.Success()) / len(parts)) + initUpdload.PartInfos[i] = "" return nil }) } if err = threadG.Wait(); err != nil { + if errors.Is(err, context.Canceled) { + initUpdload.PartInfos = utils.SliceFilter(initUpdload.PartInfos, func(s string) bool { return s != "" }) + base.SaveUploadProgress(d, initUpdload, d.client.Authorization, uploadPartData.FileMd5) + } return nil, err } } - //step.4 + //step.5 uFile, err := d.client.CommitMultiUploadFile(initUpdload.UploadFileID, nil) if err != nil { return nil, err diff --git a/go.mod b/go.mod index aaea9010..e5a94df0 100644 --- a/go.mod +++ b/go.mod @@ -15,7 +15,7 @@ require ( github.com/deckarep/golang-set/v2 v2.3.0 github.com/disintegration/imaging v1.6.2 github.com/dustinxie/ecc v0.0.0-20210511000915-959544187564 - github.com/foxxorcat/mopan-sdk-go v0.1.2 + github.com/foxxorcat/mopan-sdk-go v0.1.3 github.com/foxxorcat/weiyun-sdk-go v0.1.2 github.com/gin-contrib/cors v1.4.0 github.com/gin-gonic/gin v1.9.1 diff --git a/go.sum b/go.sum index ab65353b..3e18409e 100644 --- a/go.sum +++ b/go.sum @@ -114,8 +114,8 @@ github.com/disintegration/imaging v1.6.2 h1:w1LecBlG2Lnp8B3jk5zSuNqd7b4DXhcjwek1 github.com/disintegration/imaging v1.6.2/go.mod h1:44/5580QXChDfwIclfc/PCwrr44amcmDAg8hxG0Ewe4= github.com/dustinxie/ecc v0.0.0-20210511000915-959544187564 h1:I6KUy4CI6hHjqnyJLNCEi7YHVMkwwtfSr2k9splgdSM= github.com/dustinxie/ecc v0.0.0-20210511000915-959544187564/go.mod h1:yekO+3ZShy19S+bsmnERmznGy9Rfg6dWWWpiGJjNAz8= -github.com/foxxorcat/mopan-sdk-go v0.1.2 h1:1QM/JUWQpjbu53pVvbMfp3PLz09duGY5Mid3Qlc9WOk= -github.com/foxxorcat/mopan-sdk-go v0.1.2/go.mod h1:iWHA2JFhzmKR28ySp1ON0g6DjLaYtvb5jhTqPVTDW9A= +github.com/foxxorcat/mopan-sdk-go v0.1.3 h1:6ww0ulyLDh6neXZBqUM2PDbxQ6lfdkQbr0FCh9BTY0Y= +github.com/foxxorcat/mopan-sdk-go v0.1.3/go.mod h1:iWHA2JFhzmKR28ySp1ON0g6DjLaYtvb5jhTqPVTDW9A= github.com/foxxorcat/weiyun-sdk-go v0.1.2 h1:waRWIBmjL9GCcndJ8HvOYrrVB4hhoPYzRrn3I/Cnzqw= github.com/foxxorcat/weiyun-sdk-go v0.1.2/go.mod h1:AKsLFuWhWlClpGrg1zxTdMejugZEZtmhIuElAk3W83s= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= diff --git a/pkg/errgroup/errgroup.go b/pkg/errgroup/errgroup.go index 5fc63d66..858df044 100644 --- a/pkg/errgroup/errgroup.go +++ b/pkg/errgroup/errgroup.go @@ -23,7 +23,7 @@ type Group struct { func NewGroupWithContext(ctx context.Context, limit int, retryOpts ...retry.Option) (*Group, context.Context) { ctx, cancel := context.WithCancelCause(ctx) - return (&Group{cancel: cancel, ctx: ctx, opts: retryOpts}).SetLimit(limit), ctx + return (&Group{cancel: cancel, ctx: ctx, opts: append(retryOpts, retry.Context(ctx))}).SetLimit(limit), ctx } func (g *Group) done() { diff --git a/pkg/utils/file.go b/pkg/utils/file.go index 1eb00cc7..6dd78164 100644 --- a/pkg/utils/file.go +++ b/pkg/utils/file.go @@ -2,7 +2,6 @@ package utils import ( "fmt" - "github.com/alist-org/alist/v3/internal/errs" "io" "mime" "os" @@ -10,6 +9,8 @@ import ( "path/filepath" "strings" + "github.com/alist-org/alist/v3/internal/errs" + "github.com/alist-org/alist/v3/internal/conf" log "github.com/sirupsen/logrus" ) @@ -127,7 +128,7 @@ func CreateTempFile(r io.ReadCloser, size int64) (*os.File, error) { } if size != 0 && readBytes != size { _ = os.Remove(f.Name()) - return nil, errs.NewErr(err, "CreateTempFile failed, incoming stream actual size= %s, expect = %s ", readBytes, size) + return nil, errs.NewErr(err, "CreateTempFile failed, incoming stream actual size= %d, expect = %d ", readBytes, size) } _, err = f.Seek(0, io.SeekStart) if err != nil { diff --git a/pkg/utils/io.go b/pkg/utils/io.go index e0fea4a6..936461a7 100644 --- a/pkg/utils/io.go +++ b/pkg/utils/io.go @@ -52,14 +52,15 @@ type limitWriter struct { } func (l *limitWriter) Write(p []byte) (n int, err error) { + lp := len(p) if l.limit > 0 { - if int64(len(p)) > l.limit { + if int64(lp) > l.limit { p = p[:l.limit] } l.limit -= int64(len(p)) _, err = l.w.Write(p) } - return len(p), err + return lp, err } func LimitWriter(w io.Writer, limit int64) io.Writer { diff --git a/pkg/utils/slice.go b/pkg/utils/slice.go index d9adaeed..73bac93b 100644 --- a/pkg/utils/slice.go +++ b/pkg/utils/slice.go @@ -69,3 +69,13 @@ func SliceMeet[T1, T2 any](arr []T1, v T2, meet func(item T1, v T2) bool) bool { } return false } + +func SliceFilter[T any](arr []T, filter func(src T) bool) []T { + res := make([]T, 0, len(arr)) + for _, src := range arr { + if filter(src) { + res = append(res, src) + } + } + return res +}