mirror of https://github.com/cloudreve/Cloudreve
Feat: use new ChunkManager for OneDrive API client
parent
5802161102
commit
015ccd5026
|
@ -60,6 +60,7 @@ func (c *ChunkGroup) Process(processor ChunkProcessFunc) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
util.Log().Debug("Chunk %d processed", c.currentIndex)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -68,6 +69,16 @@ func (c *ChunkGroup) Start() int64 {
|
||||||
return int64(uint64(c.Index()) * c.chunkSize)
|
return int64(uint64(c.Index()) * c.chunkSize)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Total returns the total length current chunk
|
||||||
|
func (c *ChunkGroup) Total() int64 {
|
||||||
|
return int64(c.fileInfo.Size)
|
||||||
|
}
|
||||||
|
|
||||||
|
// RangeHeader returns header value of Content-Range
|
||||||
|
func (c *ChunkGroup) RangeHeader() string {
|
||||||
|
return fmt.Sprintf("bytes %d-%d/%d", c.Start(), c.Start()+c.Length()-1, c.Total())
|
||||||
|
}
|
||||||
|
|
||||||
// Index returns current chunk index, starts from 0
|
// Index returns current chunk index, starts from 0
|
||||||
func (c *ChunkGroup) Index() int {
|
func (c *ChunkGroup) Index() int {
|
||||||
return c.currentIndex
|
return c.currentIndex
|
||||||
|
@ -89,3 +100,8 @@ func (c *ChunkGroup) Length() int64 {
|
||||||
|
|
||||||
return int64(contentLength)
|
return int64(contentLength)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// IsLast returns if current chunk is the last one
|
||||||
|
func (c *ChunkGroup) IsLast() bool {
|
||||||
|
return c.Index() == int(c.chunkNum-1)
|
||||||
|
}
|
||||||
|
|
|
@ -1,7 +1,6 @@
|
||||||
package onedrive
|
package onedrive
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
|
@ -18,6 +17,8 @@ import (
|
||||||
|
|
||||||
model "github.com/cloudreve/Cloudreve/v3/models"
|
model "github.com/cloudreve/Cloudreve/v3/models"
|
||||||
"github.com/cloudreve/Cloudreve/v3/pkg/cache"
|
"github.com/cloudreve/Cloudreve/v3/pkg/cache"
|
||||||
|
"github.com/cloudreve/Cloudreve/v3/pkg/filesystem/chunk"
|
||||||
|
"github.com/cloudreve/Cloudreve/v3/pkg/filesystem/chunk/backoff"
|
||||||
"github.com/cloudreve/Cloudreve/v3/pkg/filesystem/fsctx"
|
"github.com/cloudreve/Cloudreve/v3/pkg/filesystem/fsctx"
|
||||||
"github.com/cloudreve/Cloudreve/v3/pkg/mq"
|
"github.com/cloudreve/Cloudreve/v3/pkg/mq"
|
||||||
"github.com/cloudreve/Cloudreve/v3/pkg/request"
|
"github.com/cloudreve/Cloudreve/v3/pkg/request"
|
||||||
|
@ -31,6 +32,7 @@ const (
|
||||||
ChunkSize uint64 = 10 * 1024 * 1024
|
ChunkSize uint64 = 10 * 1024 * 1024
|
||||||
// ListRetry 列取请求重试次数
|
// ListRetry 列取请求重试次数
|
||||||
ListRetry = 1
|
ListRetry = 1
|
||||||
|
chunkRetrySleep = time.Second * 5
|
||||||
)
|
)
|
||||||
|
|
||||||
// GetSourcePath 获取文件的绝对路径
|
// GetSourcePath 获取文件的绝对路径
|
||||||
|
@ -220,28 +222,21 @@ func (client *Client) GetUploadSessionStatus(ctx context.Context, uploadURL stri
|
||||||
}
|
}
|
||||||
|
|
||||||
// UploadChunk 上传分片
|
// UploadChunk 上传分片
|
||||||
func (client *Client) UploadChunk(ctx context.Context, uploadURL string, chunk *Chunk) (*UploadSessionResponse, error) {
|
func (client *Client) UploadChunk(ctx context.Context, uploadURL string, content io.Reader, current *chunk.ChunkGroup) (*UploadSessionResponse, error) {
|
||||||
res, err := client.request(
|
res, err := client.request(
|
||||||
ctx, "PUT", uploadURL, bytes.NewReader(chunk.Data[0:chunk.ChunkSize]),
|
ctx, "PUT", uploadURL, content,
|
||||||
request.WithContentLength(int64(chunk.ChunkSize)),
|
request.WithContentLength(current.Length()),
|
||||||
request.WithHeader(http.Header{
|
request.WithHeader(http.Header{
|
||||||
"Content-Range": {fmt.Sprintf("bytes %d-%d/%d", chunk.Offset, chunk.Offset+chunk.ChunkSize-1, chunk.Total)},
|
"Content-Range": {current.RangeHeader()},
|
||||||
}),
|
}),
|
||||||
request.WithoutHeader([]string{"Authorization", "Content-Type"}),
|
request.WithoutHeader([]string{"Authorization", "Content-Type"}),
|
||||||
request.WithTimeout(time.Duration(300)*time.Second),
|
request.WithTimeout(time.Duration(300)*time.Second),
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// 如果重试次数小于限制,5秒后重试
|
return nil, fmt.Errorf("failed to upload OneDrive chunk #%d: %w", current.Index(), err)
|
||||||
if chunk.Retried < model.GetIntSetting("onedrive_chunk_retries", 1) {
|
|
||||||
chunk.Retried++
|
|
||||||
util.Log().Debug("分片偏移%d上传失败[%s],5秒钟后重试", chunk.Offset, err)
|
|
||||||
time.Sleep(time.Duration(5) * time.Second)
|
|
||||||
return client.UploadChunk(ctx, uploadURL, chunk)
|
|
||||||
}
|
|
||||||
return nil, err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if chunk.IsLast() {
|
if current.IsLast() {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -282,46 +277,24 @@ func (client *Client) Upload(ctx context.Context, file fsctx.FileHeader) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
offset := 0
|
// Initial chunk groups
|
||||||
chunkNum := size / int(ChunkSize)
|
chunks := chunk.NewChunkGroup(file, client.Policy.OptionsSerialized.ChunkSize, &backoff.ConstantBackoff{
|
||||||
if size%int(ChunkSize) != 0 {
|
Max: model.GetIntSetting("onedrive_chunk_retries", 5),
|
||||||
chunkNum++
|
Sleep: chunkRetrySleep,
|
||||||
}
|
})
|
||||||
|
|
||||||
chunkData := make([]byte, ChunkSize)
|
uploadFunc := func(current *chunk.ChunkGroup, content io.Reader) error {
|
||||||
|
_, err := client.UploadChunk(ctx, uploadURL, content, current)
|
||||||
for i := 0; i < chunkNum; i++ {
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
util.Log().Debug("OneDrive 客户端取消")
|
|
||||||
return ErrClientCanceled
|
|
||||||
default:
|
|
||||||
// 分块
|
|
||||||
chunkSize := int(ChunkSize)
|
|
||||||
if size-offset < chunkSize {
|
|
||||||
chunkSize = size - offset
|
|
||||||
}
|
|
||||||
|
|
||||||
// 因为后面需要错误重试,这里要把分片内容读到内存中
|
|
||||||
chunkContent := chunkData[:chunkSize]
|
|
||||||
_, err := io.ReadFull(file, chunkContent)
|
|
||||||
|
|
||||||
chunk := Chunk{
|
|
||||||
Offset: offset,
|
|
||||||
ChunkSize: chunkSize,
|
|
||||||
Total: size,
|
|
||||||
Data: chunkContent,
|
|
||||||
}
|
|
||||||
|
|
||||||
// 上传
|
|
||||||
_, err = client.UploadChunk(ctx, uploadURL, &chunk)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
offset += chunkSize
|
|
||||||
|
// upload chunks
|
||||||
|
for chunks.Next() {
|
||||||
|
if err := chunks.Process(uploadFunc); err != nil {
|
||||||
|
return fmt.Errorf("failed to upload chunk #%d: %w", chunks.Index(), err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -354,7 +327,7 @@ func (client *Client) SimpleUpload(ctx context.Context, dst string, body io.Read
|
||||||
if v, ok := ctx.Value(fsctx.RetryCtx).(int); ok {
|
if v, ok := ctx.Value(fsctx.RetryCtx).(int); ok {
|
||||||
retried = v
|
retried = v
|
||||||
}
|
}
|
||||||
if retried < model.GetIntSetting("onedrive_chunk_retries", 1) {
|
if retried < model.GetIntSetting("onedrive_chunk_retries", 5) {
|
||||||
retried++
|
retried++
|
||||||
util.Log().Debug("文件[%s]上传失败[%s],5秒钟后重试", dst, err)
|
util.Log().Debug("文件[%s]上传失败[%s],5秒钟后重试", dst, err)
|
||||||
time.Sleep(time.Duration(5) * time.Second)
|
time.Sleep(time.Duration(5) * time.Second)
|
||||||
|
|
Loading…
Reference in New Issue