From 31315c86ee34d1bdc7e1990c5ddebb37181988bc Mon Sep 17 00:00:00 2001
From: HFO4 <912394456@qq.com>
Date: Sat, 26 Mar 2022 15:33:31 +0800
Subject: [PATCH] Feat: support option to cache streamed chunk data into a
 temp file for potential retry

---
 models/defaults.go                     |  1 +
 pkg/filesystem/chunk/chunk.go          | 79 ++++++++++++++++++++++++++----
 pkg/filesystem/driver/onedrive/api.go  |  2 +-
 pkg/filesystem/driver/oss/handler.go   |  5 +++--
 pkg/filesystem/driver/remote/client.go |  2 +-
 pkg/filesystem/driver/s3/handler.go    |  2 +-
 6 files changed, 73 insertions(+), 18 deletions(-)

diff --git a/models/defaults.go b/models/defaults.go
index bd7d4a4..ecb8428 100644
--- a/models/defaults.go
+++ b/models/defaults.go
@@ -45,6 +45,7 @@ var defaultSettings = []Setting{
 	{Name: "chunk_retries", Value: `5`, Type: "retry"},
 	{Name: "onedrive_source_timeout", Value: `1800`, Type: "timeout"},
 	{Name: "reset_after_upload_failed", Value: `0`, Type: "upload"},
+	{Name: "use_temp_chunk_buffer", Value: `1`, Type: "upload"},
 	{Name: "login_captcha", Value: `0`, Type: "login"},
 	{Name: "reg_captcha", Value: `0`, Type: "login"},
 	{Name: "email_active", Value: `0`, Type: "register"},
diff --git a/pkg/filesystem/chunk/chunk.go b/pkg/filesystem/chunk/chunk.go
index 82e6cac..5313558 100644
--- a/pkg/filesystem/chunk/chunk.go
+++ b/pkg/filesystem/chunk/chunk.go
@@ -7,29 +7,35 @@ import (
 	"github.com/cloudreve/Cloudreve/v3/pkg/filesystem/fsctx"
 	"github.com/cloudreve/Cloudreve/v3/pkg/util"
 	"io"
+	"os"
 )
 
+const bufferTempPattern = "cdChunk.*.tmp"
+
 // ChunkProcessFunc callback function for processing a chunk
 type ChunkProcessFunc func(c *ChunkGroup, chunk io.Reader) error
 
 // ChunkGroup manage groups of chunks
 type ChunkGroup struct {
-	file      fsctx.FileHeader
-	chunkSize uint64
-	backoff   backoff.Backoff
+	file              fsctx.FileHeader
+	chunkSize         uint64
+	backoff           backoff.Backoff
+	enableRetryBuffer bool
 
 	fileInfo     *fsctx.UploadTaskInfo
 	currentIndex int
 	chunkNum     uint64
+	bufferTemp   *os.File
 }
 
-func NewChunkGroup(file fsctx.FileHeader, chunkSize uint64, backoff backoff.Backoff) *ChunkGroup {
+func NewChunkGroup(file fsctx.FileHeader, chunkSize uint64, backoff backoff.Backoff, useBuffer bool) *ChunkGroup {
 	c := &ChunkGroup{
-		file:         file,
-		chunkSize:    chunkSize,
-		backoff:      backoff,
-		fileInfo:     file.Info(),
-		currentIndex: -1,
+		file:              file,
+		chunkSize:         chunkSize,
+		backoff:           backoff,
+		fileInfo:          file.Info(),
+		currentIndex:      -1,
+		enableRetryBuffer: useBuffer,
 	}
 
 	if c.chunkSize == 0 {
@@ -44,13 +50,60 @@ func NewChunkGroup(file fsctx.FileHeader, chunkSize uint64, backoff backoff.Back
 	return c
 }
 
+// TempAvailable returns whether the current chunk has been fully buffered
+// into the temp file and can be re-read from it
+func (c *ChunkGroup) TempAvailable() bool {
+	if c.bufferTemp != nil {
+		state, _ := c.bufferTemp.Stat()
+		return state != nil && state.Size() == c.Length()
+	}
+
+	return false
+}
+
 // Process a chunk with retry logic
 func (c *ChunkGroup) Process(processor ChunkProcessFunc) error {
-	err := processor(c, io.LimitReader(c.file, int64(c.chunkSize)))
+	reader := io.LimitReader(c.file, int64(c.chunkSize))
+
+	// If the retry buffer is enabled and the source cannot seek, tee the
+	// chunk into a temp file so a failed upload can be replayed; fall back
+	// to plain streaming if the temp file cannot be created.
+	if c.enableRetryBuffer && c.bufferTemp == nil && !c.file.Seekable() {
+		if bufferTemp, err := os.CreateTemp("", bufferTempPattern); err == nil {
+			c.bufferTemp = bufferTemp
+			reader = io.TeeReader(reader, c.bufferTemp)
+		}
+	}
+
+	if c.bufferTemp != nil {
+		defer func() {
+			if c.bufferTemp != nil {
+				c.bufferTemp.Close()
+				os.Remove(c.bufferTemp.Name())
+				c.bufferTemp = nil
+			}
+		}()
+
+		// if the chunk was fully buffered by a previous attempt, replay it
+		// from the temp file instead of the already-consumed stream
+		if c.TempAvailable() {
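+			// rewind the temp file so this retry re-reads the chunk from the start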
+			if _, err := c.bufferTemp.Seek(0, io.SeekStart); err != nil {
+				return fmt.Errorf("failed to seek temp file back to chunk start: %w", err)
+			}
+
+			util.Log().Debug("Chunk %d will be read from temp file %q.", c.Index(), c.bufferTemp.Name())
+			reader = c.bufferTemp
+		}
+	}
+
+	err := processor(c, reader)
 	if err != nil {
-		if err != context.Canceled && c.file.Seekable() && c.backoff.Next() {
-			if _, seekErr := c.file.Seek(c.Start(), io.SeekStart); seekErr != nil {
-				return fmt.Errorf("failed to seek back to chunk start: %w, last error: %w", seekErr, err)
+		if err != context.Canceled && (c.file.Seekable() || c.TempAvailable()) && c.backoff.Next() {
+			if c.file.Seekable() {
+				if _, seekErr := c.file.Seek(c.Start(), io.SeekStart); seekErr != nil {
+					return fmt.Errorf("failed to seek back to chunk start: %w, last error: %w", seekErr, err)
+				}
 			}
 
 			util.Log().Debug("Retrying chunk %d, last error: %s", c.currentIndex, err)
diff --git a/pkg/filesystem/driver/onedrive/api.go b/pkg/filesystem/driver/onedrive/api.go
index 14c1278..2723626 100644
--- a/pkg/filesystem/driver/onedrive/api.go
+++ b/pkg/filesystem/driver/onedrive/api.go
@@ -281,7 +281,7 @@ func (client *Client) Upload(ctx context.Context, file fsctx.FileHeader) error {
 	chunks := chunk.NewChunkGroup(file, client.Policy.OptionsSerialized.ChunkSize, &backoff.ConstantBackoff{
 		Max:   model.GetIntSetting("chunk_retries", 5),
 		Sleep: chunkRetrySleep,
-	})
+	}, model.IsTrueVal(model.GetSettingByName("use_temp_chunk_buffer")))
 
 	uploadFunc := func(current *chunk.ChunkGroup, content io.Reader) error {
 		_, err := client.UploadChunk(ctx, uploadURL, content, current)
diff --git a/pkg/filesystem/driver/oss/handler.go b/pkg/filesystem/driver/oss/handler.go
index 2e15674..c7eadc2 100644
--- a/pkg/filesystem/driver/oss/handler.go
+++ b/pkg/filesystem/driver/oss/handler.go
@@ -252,7 +252,7 @@ func (handler *Driver) Put(ctx context.Context, file fsctx.FileHeader) error {
 	chunks := chunk.NewChunkGroup(file, handler.Policy.OptionsSerialized.ChunkSize, &backoff.ConstantBackoff{
 		Max:   model.GetIntSetting("chunk_retries", 5),
 		Sleep: chunkRetrySleep,
-	})
+	}, model.IsTrueVal(model.GetSettingByName("use_temp_chunk_buffer")))
 
 	uploadFunc := func(current *chunk.ChunkGroup, content io.Reader) error {
 		_, err := handler.bucket.UploadPart(imur, content, current.Length(), current.Index()+1)
@@ -435,7 +435,8 @@ func (handler *Driver) Token(ctx context.Context, ttl int64, uploadSession *seri
 	uploadSession.UploadID = imur.UploadID
 
 	// 为每个分片签名上传 URL
-	chunks := chunk.NewChunkGroup(file, handler.Policy.OptionsSerialized.ChunkSize, &backoff.ConstantBackoff{})
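+	// the retry buffer is not needed here: Process only presigns part URLs, no chunk data is streamed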
+	chunks := chunk.NewChunkGroup(file, handler.Policy.OptionsSerialized.ChunkSize, &backoff.ConstantBackoff{}, false)
 	urls := make([]string, chunks.Num())
 	for chunks.Next() {
 		err := chunks.Process(func(c *chunk.ChunkGroup, chunk io.Reader) error {
diff --git a/pkg/filesystem/driver/remote/client.go b/pkg/filesystem/driver/remote/client.go
index b6759f1..2f267eb 100644
--- a/pkg/filesystem/driver/remote/client.go
+++ b/pkg/filesystem/driver/remote/client.go
@@ -92,7 +92,7 @@ func (c *remoteClient) Upload(ctx context.Context, file fsctx.FileHeader) error
 	chunks := chunk.NewChunkGroup(file, c.policy.OptionsSerialized.ChunkSize, &backoff.ConstantBackoff{
 		Max:   model.GetIntSetting("chunk_retries", 5),
 		Sleep: chunkRetrySleep,
-	})
+	}, model.IsTrueVal(model.GetSettingByName("use_temp_chunk_buffer")))
 
 	uploadFunc := func(current *chunk.ChunkGroup, content io.Reader) error {
 		return c.uploadChunk(ctx, session.Key, current.Index(), content, overwrite, current.Length())
diff --git a/pkg/filesystem/driver/s3/handler.go b/pkg/filesystem/driver/s3/handler.go
index a84178f..ba9ce60 100644
--- a/pkg/filesystem/driver/s3/handler.go
+++ b/pkg/filesystem/driver/s3/handler.go
@@ -342,7 +342,7 @@ func (handler *Driver) Token(ctx context.Context, ttl int64, uploadSession *seri
 	uploadSession.UploadID = *res.UploadId
 
 	// 为每个分片签名上传 URL
-	chunks := chunk.NewChunkGroup(file, handler.Policy.OptionsSerialized.ChunkSize, &backoff.ConstantBackoff{})
+	chunks := chunk.NewChunkGroup(file, handler.Policy.OptionsSerialized.ChunkSize, &backoff.ConstantBackoff{}, false)
 	urls := make([]string, chunks.Num())
 	for chunks.Next() {
 		err := chunks.Process(func(c *chunk.ChunkGroup, chunk io.Reader) error {
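
Note: a minimal driver-side sketch of how the new option is consumed,
mirroring the call sites in this patch. uploadAllChunks and sendChunk are
hypothetical names, the Sleep value stands in for the drivers'
chunkRetrySleep constant, and the import paths follow the repository layout.

package example

import (
	"io"
	"time"

	model "github.com/cloudreve/Cloudreve/v3/models"
	"github.com/cloudreve/Cloudreve/v3/pkg/filesystem/chunk"
	"github.com/cloudreve/Cloudreve/v3/pkg/filesystem/chunk/backoff"
	"github.com/cloudreve/Cloudreve/v3/pkg/filesystem/fsctx"
)

// uploadAllChunks streams every chunk of file through sendChunk, retrying
// failed chunks. With "use_temp_chunk_buffer" enabled, a non-seekable source
// is teed into a temp file on the first attempt so a failed chunk can be
// replayed without re-reading the consumed stream.
func uploadAllChunks(file fsctx.FileHeader, chunkSize uint64,
	sendChunk func(index int, size int64, data io.Reader) error) error {
	chunks := chunk.NewChunkGroup(file, chunkSize, &backoff.ConstantBackoff{
		Max:   model.GetIntSetting("chunk_retries", 5),
		Sleep: 5 * time.Second,
	}, model.IsTrueVal(model.GetSettingByName("use_temp_chunk_buffer")))

	for chunks.Next() {
		err := chunks.Process(func(current *chunk.ChunkGroup, content io.Reader) error {
			// on retry, Process hands this callback a fresh reader over the
			// buffered (or re-seeked) chunk, not the consumed source stream
			return sendChunk(current.Index(), current.Length(), content)
		})
		if err != nil {
			return err
		}
	}

	return nil
}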