mirror of https://github.com/cloudreve/Cloudreve
Fix: capacity should be returned back when put failed
parent
8d437a451c
commit
dc32e85492
|
@ -50,7 +50,27 @@ func (handler Driver) Put(ctx context.Context, file io.ReadCloser, dst string, s
|
||||||
// Delete 删除一个或多个文件,
|
// Delete 删除一个或多个文件,
|
||||||
// 返回未删除的文件,及遇到的最后一个错误
|
// 返回未删除的文件,及遇到的最后一个错误
|
||||||
func (handler Driver) Delete(ctx context.Context, files []string) ([]string, error) {
|
func (handler Driver) Delete(ctx context.Context, files []string) ([]string, error) {
|
||||||
return []string{}, errors.New("未实现")
|
obs := []cossdk.Object{}
|
||||||
|
for _, v := range files {
|
||||||
|
obs = append(obs, cossdk.Object{Key: v})
|
||||||
|
}
|
||||||
|
opt := &cossdk.ObjectDeleteMultiOptions{
|
||||||
|
Objects: obs,
|
||||||
|
Quiet: true,
|
||||||
|
}
|
||||||
|
|
||||||
|
res, _, err := handler.Client.Object.DeleteMulti(context.Background(), opt)
|
||||||
|
if err != nil {
|
||||||
|
return files, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// 整理删除结果
|
||||||
|
failed := make([]string, 0, len(files))
|
||||||
|
for _, v := range res.Errors {
|
||||||
|
failed = append(failed, v.Key)
|
||||||
|
}
|
||||||
|
|
||||||
|
return failed, errors.New("删除失败")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Thumb 获取文件缩略图
|
// Thumb 获取文件缩略图
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package onedrive
|
package onedrive
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
|
@ -9,7 +10,6 @@ import (
|
||||||
"github.com/HFO4/cloudreve/pkg/cache"
|
"github.com/HFO4/cloudreve/pkg/cache"
|
||||||
"github.com/HFO4/cloudreve/pkg/request"
|
"github.com/HFO4/cloudreve/pkg/request"
|
||||||
"github.com/HFO4/cloudreve/pkg/util"
|
"github.com/HFO4/cloudreve/pkg/util"
|
||||||
"github.com/cloudflare/cfssl/log"
|
|
||||||
"io"
|
"io"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
@ -142,7 +142,7 @@ func (client *Client) GetUploadSessionStatus(ctx context.Context, uploadURL stri
|
||||||
// UploadChunk 上传分片
|
// UploadChunk 上传分片
|
||||||
func (client *Client) UploadChunk(ctx context.Context, uploadURL string, chunk *Chunk) (*UploadSessionResponse, error) {
|
func (client *Client) UploadChunk(ctx context.Context, uploadURL string, chunk *Chunk) (*UploadSessionResponse, error) {
|
||||||
res, err := client.request(
|
res, err := client.request(
|
||||||
ctx, "PUT", uploadURL, chunk.Reader,
|
ctx, "PUT", uploadURL, bytes.NewReader(chunk.Data[0:chunk.ChunkSize]),
|
||||||
request.WithContentLength(int64(chunk.ChunkSize)),
|
request.WithContentLength(int64(chunk.ChunkSize)),
|
||||||
request.WithHeader(http.Header{
|
request.WithHeader(http.Header{
|
||||||
"Content-Range": {fmt.Sprintf("bytes %d-%d/%d", chunk.Offset, chunk.Offset+chunk.ChunkSize-1, chunk.Total)},
|
"Content-Range": {fmt.Sprintf("bytes %d-%d/%d", chunk.Offset, chunk.Offset+chunk.ChunkSize-1, chunk.Total)},
|
||||||
|
@ -153,7 +153,7 @@ func (client *Client) UploadChunk(ctx context.Context, uploadURL string, chunk *
|
||||||
// 如果重试次数小于限制,5秒后重试
|
// 如果重试次数小于限制,5秒后重试
|
||||||
if chunk.Retried < model.GetIntSetting("onedrive_chunk_retries", 1) {
|
if chunk.Retried < model.GetIntSetting("onedrive_chunk_retries", 1) {
|
||||||
chunk.Retried++
|
chunk.Retried++
|
||||||
log.Debug("分片偏移%d上传失败,5秒钟后重试", chunk.Offset)
|
util.Log().Debug("分片偏移%d上传失败,5秒钟后重试", chunk.Offset)
|
||||||
time.Sleep(time.Duration(5) * time.Second)
|
time.Sleep(time.Duration(5) * time.Second)
|
||||||
return client.UploadChunk(ctx, uploadURL, chunk)
|
return client.UploadChunk(ctx, uploadURL, chunk)
|
||||||
}
|
}
|
||||||
|
@ -196,6 +196,9 @@ func (client *Client) Upload(ctx context.Context, dst string, size int, file io.
|
||||||
if size%int(ChunkSize) != 0 {
|
if size%int(ChunkSize) != 0 {
|
||||||
chunkNum++
|
chunkNum++
|
||||||
}
|
}
|
||||||
|
|
||||||
|
chunkData := make([]byte, ChunkSize)
|
||||||
|
|
||||||
for i := 0; i < chunkNum; i++ {
|
for i := 0; i < chunkNum; i++ {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
|
@ -207,17 +210,23 @@ func (client *Client) Upload(ctx context.Context, dst string, size int, file io.
|
||||||
if size-offset < chunkSize {
|
if size-offset < chunkSize {
|
||||||
chunkSize = size - offset
|
chunkSize = size - offset
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 因为后面需要错误重试,这里要把分片内容读到内存中
|
||||||
|
chunkContent := chunkData[:chunkSize]
|
||||||
|
_, err := io.ReadFull(file, chunkContent)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
chunk := Chunk{
|
chunk := Chunk{
|
||||||
Offset: offset,
|
Offset: offset,
|
||||||
ChunkSize: chunkSize,
|
ChunkSize: chunkSize,
|
||||||
Total: size,
|
Total: size,
|
||||||
Reader: &io.LimitedReader{
|
Data: chunkContent,
|
||||||
R: file,
|
|
||||||
N: int64(chunkSize),
|
|
||||||
},
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// 上传
|
// 上传
|
||||||
_, err := client.UploadChunk(ctx, uploadURL, &chunk)
|
_, err = client.UploadChunk(ctx, uploadURL, &chunk)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -388,7 +388,7 @@ func TestClient_UploadChunk(t *testing.T) {
|
||||||
ChunkSize: 10,
|
ChunkSize: 10,
|
||||||
Total: 100,
|
Total: 100,
|
||||||
Retried: 0,
|
Retried: 0,
|
||||||
Reader: strings.NewReader("1231312"),
|
Data: strings.NewReader("1231312"),
|
||||||
})
|
})
|
||||||
clientMock.AssertExpectations(t)
|
clientMock.AssertExpectations(t)
|
||||||
asserts.NoError(err)
|
asserts.NoError(err)
|
||||||
|
@ -418,7 +418,7 @@ func TestClient_UploadChunk(t *testing.T) {
|
||||||
ChunkSize: 10,
|
ChunkSize: 10,
|
||||||
Total: 100,
|
Total: 100,
|
||||||
Retried: 0,
|
Retried: 0,
|
||||||
Reader: strings.NewReader("1231312"),
|
Data: strings.NewReader("1231312"),
|
||||||
})
|
})
|
||||||
clientMock.AssertExpectations(t)
|
clientMock.AssertExpectations(t)
|
||||||
asserts.Error(err)
|
asserts.Error(err)
|
||||||
|
@ -448,7 +448,7 @@ func TestClient_UploadChunk(t *testing.T) {
|
||||||
ChunkSize: 5,
|
ChunkSize: 5,
|
||||||
Total: 100,
|
Total: 100,
|
||||||
Retried: 0,
|
Retried: 0,
|
||||||
Reader: strings.NewReader("1231312"),
|
Data: strings.NewReader("1231312"),
|
||||||
})
|
})
|
||||||
clientMock.AssertExpectations(t)
|
clientMock.AssertExpectations(t)
|
||||||
asserts.NoError(err)
|
asserts.NoError(err)
|
||||||
|
@ -483,7 +483,7 @@ func TestClient_UploadChunk(t *testing.T) {
|
||||||
ChunkSize: 5,
|
ChunkSize: 5,
|
||||||
Total: 100,
|
Total: 100,
|
||||||
Retried: 0,
|
Retried: 0,
|
||||||
Reader: strings.NewReader("1231312"),
|
Data: strings.NewReader("1231312"),
|
||||||
}
|
}
|
||||||
res, err := client.UploadChunk(context.Background(), "http://dev.com", chunk)
|
res, err := client.UploadChunk(context.Background(), "http://dev.com", chunk)
|
||||||
clientMock.AssertExpectations(t)
|
clientMock.AssertExpectations(t)
|
||||||
|
|
|
@ -2,7 +2,6 @@ package onedrive
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/gob"
|
"encoding/gob"
|
||||||
"io"
|
|
||||||
"net/url"
|
"net/url"
|
||||||
"sync"
|
"sync"
|
||||||
)
|
)
|
||||||
|
@ -90,7 +89,7 @@ type Chunk struct {
|
||||||
ChunkSize int
|
ChunkSize int
|
||||||
Total int
|
Total int
|
||||||
Retried int
|
Retried int
|
||||||
Reader io.Reader
|
Data []byte
|
||||||
}
|
}
|
||||||
|
|
||||||
// oauthEndpoint OAuth接口地址
|
// oauthEndpoint OAuth接口地址
|
||||||
|
|
|
@ -43,6 +43,7 @@ func (fs *FileSystem) Upload(ctx context.Context, file FileHeader) (err error) {
|
||||||
// 保存文件
|
// 保存文件
|
||||||
err = fs.Handler.Put(ctx, file, savePath, file.GetSize())
|
err = fs.Handler.Put(ctx, file, savePath, file.GetSize())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
fs.Trigger(ctx, "AfterUploadFailed")
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -131,7 +131,7 @@ const (
|
||||||
)
|
)
|
||||||
|
|
||||||
func lex(s string) (tokenType rune, tokenStr string, remaining string) {
|
func lex(s string) (tokenType rune, tokenStr string, remaining string) {
|
||||||
// The net/textproto Reader that parses the HTTP header will collapse
|
// The net/textproto Data that parses the HTTP header will collapse
|
||||||
// Linear White Space that spans multiple "\r\n" lines to a single " ",
|
// Linear White Space that spans multiple "\r\n" lines to a single " ",
|
||||||
// so we don't need to look for '\r' or '\n'.
|
// so we don't need to look for '\r' or '\n'.
|
||||||
for len(s) > 0 && (s[0] == '\t' || s[0] == ' ') {
|
for len(s) > 0 && (s[0] == '\t' || s[0] == ' ') {
|
||||||
|
|
|
@ -346,6 +346,7 @@ func (h *Handler) handlePut(w http.ResponseWriter, r *http.Request, fs *filesyst
|
||||||
fs.Use("AfterUpload", filesystem.GenericAfterUpload)
|
fs.Use("AfterUpload", filesystem.GenericAfterUpload)
|
||||||
fs.Use("AfterValidateFailed", filesystem.HookDeleteTempFile)
|
fs.Use("AfterValidateFailed", filesystem.HookDeleteTempFile)
|
||||||
fs.Use("AfterValidateFailed", filesystem.HookGiveBackCapacity)
|
fs.Use("AfterValidateFailed", filesystem.HookGiveBackCapacity)
|
||||||
|
fs.Use("AfterUploadFailed", filesystem.HookGiveBackCapacity)
|
||||||
}
|
}
|
||||||
|
|
||||||
// 执行上传
|
// 执行上传
|
||||||
|
|
|
@ -288,6 +288,7 @@ func FileUploadStream(c *gin.Context) {
|
||||||
fs.Use("AfterUpload", filesystem.GenericAfterUpload)
|
fs.Use("AfterUpload", filesystem.GenericAfterUpload)
|
||||||
fs.Use("AfterValidateFailed", filesystem.HookDeleteTempFile)
|
fs.Use("AfterValidateFailed", filesystem.HookDeleteTempFile)
|
||||||
fs.Use("AfterValidateFailed", filesystem.HookGiveBackCapacity)
|
fs.Use("AfterValidateFailed", filesystem.HookGiveBackCapacity)
|
||||||
|
fs.Use("AfterUploadFailed", filesystem.HookGiveBackCapacity)
|
||||||
|
|
||||||
// 执行上传
|
// 执行上传
|
||||||
uploadCtx := context.WithValue(ctx, fsctx.GinCtx, c)
|
uploadCtx := context.WithValue(ctx, fsctx.GinCtx, c)
|
||||||
|
|
Loading…
Reference in New Issue