Feat: support option to cache streamed chunk data into a temp file for potential retry.

pull/1107/head^2
HFO4 2022-03-26 15:33:31 +08:00
parent 636ac52a3f
commit 31315c86ee
6 changed files with 73 additions and 18 deletions

View File

@@ -45,6 +45,7 @@ var defaultSettings = []Setting{
{Name: "chunk_retries", Value: `5`, Type: "retry"}, {Name: "chunk_retries", Value: `5`, Type: "retry"},
{Name: "onedrive_source_timeout", Value: `1800`, Type: "timeout"}, {Name: "onedrive_source_timeout", Value: `1800`, Type: "timeout"},
{Name: "reset_after_upload_failed", Value: `0`, Type: "upload"}, {Name: "reset_after_upload_failed", Value: `0`, Type: "upload"},
{Name: "use_temp_chunk_buffer", Value: `1`, Type: "upload"},
{Name: "login_captcha", Value: `0`, Type: "login"}, {Name: "login_captcha", Value: `0`, Type: "login"},
{Name: "reg_captcha", Value: `0`, Type: "login"}, {Name: "reg_captcha", Value: `0`, Type: "login"},
{Name: "email_active", Value: `0`, Type: "register"}, {Name: "email_active", Value: `0`, Type: "register"},

View File

@@ -7,8 +7,11 @@ import (
"github.com/cloudreve/Cloudreve/v3/pkg/filesystem/fsctx" "github.com/cloudreve/Cloudreve/v3/pkg/filesystem/fsctx"
"github.com/cloudreve/Cloudreve/v3/pkg/util" "github.com/cloudreve/Cloudreve/v3/pkg/util"
"io" "io"
"os"
) )
const bufferTempPattern = "cdChunk.*.tmp"
// ChunkProcessFunc callback function for processing a chunk // ChunkProcessFunc callback function for processing a chunk
type ChunkProcessFunc func(c *ChunkGroup, chunk io.Reader) error type ChunkProcessFunc func(c *ChunkGroup, chunk io.Reader) error
@@ -17,19 +20,22 @@ type ChunkGroup struct {
file fsctx.FileHeader file fsctx.FileHeader
chunkSize uint64 chunkSize uint64
backoff backoff.Backoff backoff backoff.Backoff
enableRetryBuffer bool
fileInfo *fsctx.UploadTaskInfo fileInfo *fsctx.UploadTaskInfo
currentIndex int currentIndex int
chunkNum uint64 chunkNum uint64
bufferTemp *os.File
} }
func NewChunkGroup(file fsctx.FileHeader, chunkSize uint64, backoff backoff.Backoff) *ChunkGroup { func NewChunkGroup(file fsctx.FileHeader, chunkSize uint64, backoff backoff.Backoff, useBuffer bool) *ChunkGroup {
c := &ChunkGroup{ c := &ChunkGroup{
file: file, file: file,
chunkSize: chunkSize, chunkSize: chunkSize,
backoff: backoff, backoff: backoff,
fileInfo: file.Info(), fileInfo: file.Info(),
currentIndex: -1, currentIndex: -1,
enableRetryBuffer: useBuffer,
} }
if c.chunkSize == 0 { if c.chunkSize == 0 {
@@ -44,14 +50,54 @@ func NewChunkGroup(file fsctx.FileHeader, chunkSize uint64, backoff backoff.Back
return c return c
} }
// TempAvailable reports whether the chunk's temp buffer file can be
// replayed for a retry: it must exist and contain exactly one full
// chunk's worth of data.
func (c *ChunkGroup) TempAvailable() bool {
	if c.bufferTemp == nil {
		return false
	}
	// A Stat failure is treated the same as an unusable buffer.
	info, _ := c.bufferTemp.Stat()
	return info != nil && info.Size() == c.Length()
}
// Process a chunk with retry logic // Process a chunk with retry logic
func (c *ChunkGroup) Process(processor ChunkProcessFunc) error { func (c *ChunkGroup) Process(processor ChunkProcessFunc) error {
err := processor(c, io.LimitReader(c.file, int64(c.chunkSize))) reader := io.LimitReader(c.file, int64(c.chunkSize))
// If useBuffer is enabled, tee the reader to a temp file
if c.enableRetryBuffer && c.bufferTemp == nil && !c.file.Seekable() {
c.bufferTemp, _ = os.CreateTemp("", bufferTempPattern)
reader = io.TeeReader(reader, c.bufferTemp)
}
if c.bufferTemp != nil {
defer func() {
if c.bufferTemp != nil {
c.bufferTemp.Close()
os.Remove(c.bufferTemp.Name())
c.bufferTemp = nil
}
}()
// if temp buffer file is available, use it
if c.TempAvailable() {
if _, err := c.bufferTemp.Seek(0, io.SeekStart); err != nil {
return fmt.Errorf("failed to seek temp file back to chunk start: %w", err)
}
util.Log().Debug("Chunk %d will be read from temp file %q.", c.Index(), c.bufferTemp.Name())
reader = c.bufferTemp
}
}
err := processor(c, reader)
if err != nil { if err != nil {
if err != context.Canceled && c.file.Seekable() && c.backoff.Next() { if err != context.Canceled && (c.file.Seekable() || c.TempAvailable()) && c.backoff.Next() {
if c.file.Seekable() {
if _, seekErr := c.file.Seek(c.Start(), io.SeekStart); seekErr != nil { if _, seekErr := c.file.Seek(c.Start(), io.SeekStart); seekErr != nil {
return fmt.Errorf("failed to seek back to chunk start: %w, last error: %w", seekErr, err) return fmt.Errorf("failed to seek back to chunk start: %w, last error: %w", seekErr, err)
} }
}
util.Log().Debug("Retrying chunk %d, last error: %s", c.currentIndex, err) util.Log().Debug("Retrying chunk %d, last error: %s", c.currentIndex, err)
return c.Process(processor) return c.Process(processor)

View File

@@ -221,8 +221,16 @@ func (client *Client) GetUploadSessionStatus(ctx context.Context, uploadURL stri
return &uploadSession, nil return &uploadSession, nil
} }
var index = 0
// UploadChunk 上传分片 // UploadChunk 上传分片
func (client *Client) UploadChunk(ctx context.Context, uploadURL string, content io.Reader, current *chunk.ChunkGroup) (*UploadSessionResponse, error) { func (client *Client) UploadChunk(ctx context.Context, uploadURL string, content io.Reader, current *chunk.ChunkGroup) (*UploadSessionResponse, error) {
index++
if index == 1 || index == 2 {
request.BlackHole(content)
return nil, errors.New("error")
}
res, err := client.request( res, err := client.request(
ctx, "PUT", uploadURL, content, ctx, "PUT", uploadURL, content,
request.WithContentLength(current.Length()), request.WithContentLength(current.Length()),
@@ -281,7 +289,7 @@ func (client *Client) Upload(ctx context.Context, file fsctx.FileHeader) error {
chunks := chunk.NewChunkGroup(file, client.Policy.OptionsSerialized.ChunkSize, &backoff.ConstantBackoff{ chunks := chunk.NewChunkGroup(file, client.Policy.OptionsSerialized.ChunkSize, &backoff.ConstantBackoff{
Max: model.GetIntSetting("chunk_retries", 5), Max: model.GetIntSetting("chunk_retries", 5),
Sleep: chunkRetrySleep, Sleep: chunkRetrySleep,
}) }, model.IsTrueVal(model.GetSettingByName("use_temp_chunk_buffer")))
uploadFunc := func(current *chunk.ChunkGroup, content io.Reader) error { uploadFunc := func(current *chunk.ChunkGroup, content io.Reader) error {
_, err := client.UploadChunk(ctx, uploadURL, content, current) _, err := client.UploadChunk(ctx, uploadURL, content, current)

View File

@@ -252,7 +252,7 @@ func (handler *Driver) Put(ctx context.Context, file fsctx.FileHeader) error {
chunks := chunk.NewChunkGroup(file, handler.Policy.OptionsSerialized.ChunkSize, &backoff.ConstantBackoff{ chunks := chunk.NewChunkGroup(file, handler.Policy.OptionsSerialized.ChunkSize, &backoff.ConstantBackoff{
Max: model.GetIntSetting("chunk_retries", 5), Max: model.GetIntSetting("chunk_retries", 5),
Sleep: chunkRetrySleep, Sleep: chunkRetrySleep,
}) }, model.IsTrueVal(model.GetSettingByName("use_temp_chunk_buffer")))
uploadFunc := func(current *chunk.ChunkGroup, content io.Reader) error { uploadFunc := func(current *chunk.ChunkGroup, content io.Reader) error {
_, err := handler.bucket.UploadPart(imur, content, current.Length(), current.Index()+1) _, err := handler.bucket.UploadPart(imur, content, current.Length(), current.Index()+1)
@@ -435,7 +435,7 @@ func (handler *Driver) Token(ctx context.Context, ttl int64, uploadSession *seri
uploadSession.UploadID = imur.UploadID uploadSession.UploadID = imur.UploadID
// 为每个分片签名上传 URL // 为每个分片签名上传 URL
chunks := chunk.NewChunkGroup(file, handler.Policy.OptionsSerialized.ChunkSize, &backoff.ConstantBackoff{}) chunks := chunk.NewChunkGroup(file, handler.Policy.OptionsSerialized.ChunkSize, &backoff.ConstantBackoff{}, false)
urls := make([]string, chunks.Num()) urls := make([]string, chunks.Num())
for chunks.Next() { for chunks.Next() {
err := chunks.Process(func(c *chunk.ChunkGroup, chunk io.Reader) error { err := chunks.Process(func(c *chunk.ChunkGroup, chunk io.Reader) error {

View File

@@ -92,7 +92,7 @@ func (c *remoteClient) Upload(ctx context.Context, file fsctx.FileHeader) error
chunks := chunk.NewChunkGroup(file, c.policy.OptionsSerialized.ChunkSize, &backoff.ConstantBackoff{ chunks := chunk.NewChunkGroup(file, c.policy.OptionsSerialized.ChunkSize, &backoff.ConstantBackoff{
Max: model.GetIntSetting("chunk_retries", 5), Max: model.GetIntSetting("chunk_retries", 5),
Sleep: chunkRetrySleep, Sleep: chunkRetrySleep,
}) }, model.IsTrueVal(model.GetSettingByName("use_temp_chunk_buffer")))
uploadFunc := func(current *chunk.ChunkGroup, content io.Reader) error { uploadFunc := func(current *chunk.ChunkGroup, content io.Reader) error {
return c.uploadChunk(ctx, session.Key, current.Index(), content, overwrite, current.Length()) return c.uploadChunk(ctx, session.Key, current.Index(), content, overwrite, current.Length())

View File

@@ -342,7 +342,7 @@ func (handler *Driver) Token(ctx context.Context, ttl int64, uploadSession *seri
uploadSession.UploadID = *res.UploadId uploadSession.UploadID = *res.UploadId
// 为每个分片签名上传 URL // 为每个分片签名上传 URL
chunks := chunk.NewChunkGroup(file, handler.Policy.OptionsSerialized.ChunkSize, &backoff.ConstantBackoff{}) chunks := chunk.NewChunkGroup(file, handler.Policy.OptionsSerialized.ChunkSize, &backoff.ConstantBackoff{}, false)
urls := make([]string, chunks.Num()) urls := make([]string, chunks.Num())
for chunks.Next() { for chunks.Next() {
err := chunks.Process(func(c *chunk.ChunkGroup, chunk io.Reader) error { err := chunks.Process(func(c *chunk.ChunkGroup, chunk io.Reader) error {