feat(crypt): force stream upload for supported drivers (#6270)

pull/6283/head
NewbieOrange 2024-03-29 14:42:01 +08:00 committed by GitHub
parent d517adde71
commit e37465e67e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 32 additions and 17 deletions

View File

@@ -312,13 +312,17 @@ func (y *Cloud189PC) Remove(ctx context.Context, obj model.Obj) error {
func (y *Cloud189PC) Put(ctx context.Context, dstDir model.Obj, stream model.FileStreamer, up driver.UpdateProgress) (model.Obj, error) { func (y *Cloud189PC) Put(ctx context.Context, dstDir model.Obj, stream model.FileStreamer, up driver.UpdateProgress) (model.Obj, error) {
// 响应时间长,按需启用 // 响应时间长,按需启用
if y.Addition.RapidUpload { if y.Addition.RapidUpload && !stream.IsForceStreamUpload() {
if newObj, err := y.RapidUpload(ctx, dstDir, stream); err == nil { if newObj, err := y.RapidUpload(ctx, dstDir, stream); err == nil {
return newObj, nil return newObj, nil
} }
} }
switch y.UploadMethod { uploadMethod := y.UploadMethod
if stream.IsForceStreamUpload() {
uploadMethod = "stream"
}
switch uploadMethod {
case "old": case "old":
return y.OldUpload(ctx, dstDir, stream, up) return y.OldUpload(ctx, dstDir, stream, up)
case "rapid": case "rapid":

View File

@@ -164,7 +164,7 @@ func (d *AliyundriveOpen) upload(ctx context.Context, dstDir model.Obj, stream m
count := int(math.Ceil(float64(stream.GetSize()) / float64(partSize))) count := int(math.Ceil(float64(stream.GetSize()) / float64(partSize)))
createData["part_info_list"] = makePartInfos(count) createData["part_info_list"] = makePartInfos(count)
// rapid upload // rapid upload
rapidUpload := stream.GetSize() > 100*utils.KB && d.RapidUpload rapidUpload := !stream.IsForceStreamUpload() && stream.GetSize() > 100*utils.KB && d.RapidUpload
if rapidUpload { if rapidUpload {
log.Debugf("[aliyundrive_open] start cal pre_hash") log.Debugf("[aliyundrive_open] start cal pre_hash")
// read 1024 bytes to calculate pre hash // read 1024 bytes to calculate pre hash
@@ -242,13 +242,16 @@ func (d *AliyundriveOpen) upload(ctx context.Context, dstDir model.Obj, stream m
if remain := stream.GetSize() - offset; length > remain { if remain := stream.GetSize() - offset; length > remain {
length = remain length = remain
} }
//rd := utils.NewMultiReadable(io.LimitReader(stream, partSize)) rd := utils.NewMultiReadable(io.LimitReader(stream, partSize))
rd, err := stream.RangeRead(http_range.Range{Start: offset, Length: length}) if rapidUpload {
srd, err := stream.RangeRead(http_range.Range{Start: offset, Length: length})
if err != nil { if err != nil {
return nil, err return nil, err
} }
rd = utils.NewMultiReadable(srd)
}
err = retry.Do(func() error { err = retry.Do(func() error {
//rd.Reset() rd.Reset()
return d.uploadPart(ctx, rd, createResp.PartInfoList[i]) return d.uploadPart(ctx, rd, createResp.PartInfoList[i])
}, },
retry.Attempts(3), retry.Attempts(3),

View File

@@ -3,7 +3,6 @@ package crypt
import ( import (
"context" "context"
"fmt" "fmt"
"github.com/alist-org/alist/v3/internal/stream"
"io" "io"
stdpath "path" stdpath "path"
"regexp" "regexp"
@@ -14,6 +13,7 @@ import (
"github.com/alist-org/alist/v3/internal/fs" "github.com/alist-org/alist/v3/internal/fs"
"github.com/alist-org/alist/v3/internal/model" "github.com/alist-org/alist/v3/internal/model"
"github.com/alist-org/alist/v3/internal/op" "github.com/alist-org/alist/v3/internal/op"
"github.com/alist-org/alist/v3/internal/stream"
"github.com/alist-org/alist/v3/pkg/http_range" "github.com/alist-org/alist/v3/pkg/http_range"
"github.com/alist-org/alist/v3/pkg/utils" "github.com/alist-org/alist/v3/pkg/utils"
"github.com/alist-org/alist/v3/server/common" "github.com/alist-org/alist/v3/server/common"
@@ -392,6 +392,7 @@ func (d *Crypt) Put(ctx context.Context, dstDir model.Obj, streamer model.FileSt
Reader: wrappedIn, Reader: wrappedIn,
Mimetype: "application/octet-stream", Mimetype: "application/octet-stream",
WebPutAsTask: streamer.NeedStore(), WebPutAsTask: streamer.NeedStore(),
ForceStreamUpload: true,
Exist: streamer.GetExist(), Exist: streamer.GetExist(),
} }
err = op.Put(ctx, d.remoteStorage, dstDirActualPath, streamOut, up, false) err = op.Put(ctx, d.remoteStorage, dstDirActualPath, streamOut, up, false)

View File

@@ -41,6 +41,7 @@ type FileStreamer interface {
GetMimetype() string GetMimetype() string
//SetReader(io.Reader) //SetReader(io.Reader)
NeedStore() bool NeedStore() bool
IsForceStreamUpload() bool
GetExist() Obj GetExist() Obj
SetExist(Obj) SetExist(Obj)
//for a non-seekable Stream, RangeRead supports peeking some data, and CacheFullInTempFile still works //for a non-seekable Stream, RangeRead supports peeking some data, and CacheFullInTempFile still works

View File

@@ -20,6 +20,7 @@ type FileStream struct {
io.Reader io.Reader
Mimetype string Mimetype string
WebPutAsTask bool WebPutAsTask bool
ForceStreamUpload bool
Exist model.Obj //the file existed in the destination, we can reuse some info since we wil overwrite it Exist model.Obj //the file existed in the destination, we can reuse some info since we wil overwrite it
utils.Closers utils.Closers
tmpFile *os.File //if present, tmpFile has full content, it will be deleted at last tmpFile *os.File //if present, tmpFile has full content, it will be deleted at last
@@ -43,6 +44,11 @@ func (f *FileStream) GetMimetype() string {
func (f *FileStream) NeedStore() bool { func (f *FileStream) NeedStore() bool {
return f.WebPutAsTask return f.WebPutAsTask
} }
func (f *FileStream) IsForceStreamUpload() bool {
return f.ForceStreamUpload
}
func (f *FileStream) Close() error { func (f *FileStream) Close() error {
var err1, err2 error var err1, err2 error
err1 = f.Closers.Close() err1 = f.Closers.Close()