mirror of https://github.com/Xhofe/alist
- Add support for remote and OneDrive storage types - Implement new upload methods for different storage types - Update driver to handle various storage policies - Add error handling and session cleanup for failed uploadspull/7591/head^2
parent
016e169c41
commit
2a035302b2
|
@ -10,6 +10,7 @@ import (
|
||||||
|
|
||||||
"github.com/alist-org/alist/v3/drivers/base"
|
"github.com/alist-org/alist/v3/drivers/base"
|
||||||
"github.com/alist-org/alist/v3/internal/driver"
|
"github.com/alist-org/alist/v3/internal/driver"
|
||||||
|
"github.com/alist-org/alist/v3/internal/errs"
|
||||||
"github.com/alist-org/alist/v3/internal/model"
|
"github.com/alist-org/alist/v3/internal/model"
|
||||||
"github.com/alist-org/alist/v3/pkg/utils"
|
"github.com/alist-org/alist/v3/pkg/utils"
|
||||||
"github.com/go-resty/resty/v2"
|
"github.com/go-resty/resty/v2"
|
||||||
|
@ -134,6 +135,8 @@ func (d *Cloudreve) Put(ctx context.Context, dstDir model.Obj, stream model.File
|
||||||
if io.ReadCloser(stream) == http.NoBody {
|
if io.ReadCloser(stream) == http.NoBody {
|
||||||
return d.create(ctx, dstDir, stream)
|
return d.create(ctx, dstDir, stream)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 获取存储策略
|
||||||
var r DirectoryResp
|
var r DirectoryResp
|
||||||
err := d.request(http.MethodGet, "/directory"+dstDir.GetPath(), nil, &r)
|
err := d.request(http.MethodGet, "/directory"+dstDir.GetPath(), nil, &r)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -146,6 +149,8 @@ func (d *Cloudreve) Put(ctx context.Context, dstDir model.Obj, stream model.File
|
||||||
"policy_id": r.Policy.Id,
|
"policy_id": r.Policy.Id,
|
||||||
"last_modified": stream.ModTime().Unix(),
|
"last_modified": stream.ModTime().Unix(),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 获取上传会话信息
|
||||||
var u UploadInfo
|
var u UploadInfo
|
||||||
err = d.request(http.MethodPut, "/file/upload", func(req *resty.Request) {
|
err = d.request(http.MethodPut, "/file/upload", func(req *resty.Request) {
|
||||||
req.SetBody(uploadBody)
|
req.SetBody(uploadBody)
|
||||||
|
@ -153,6 +158,14 @@ func (d *Cloudreve) Put(ctx context.Context, dstDir model.Obj, stream model.File
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 根据存储方式选择分片上传的方法
|
||||||
|
switch r.Policy.Type {
|
||||||
|
case "onedrive":
|
||||||
|
err = d.upOneDrive(ctx, stream, u, up)
|
||||||
|
case "remote": // 从机存储
|
||||||
|
err = d.upRemote(ctx, stream, u, up)
|
||||||
|
case "local": // 本机存储
|
||||||
var chunkSize = u.ChunkSize
|
var chunkSize = u.ChunkSize
|
||||||
var buf []byte
|
var buf []byte
|
||||||
var chunk int
|
var chunk int
|
||||||
|
@ -166,7 +179,6 @@ func (d *Cloudreve) Put(ctx context.Context, dstDir model.Obj, stream model.File
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if n == 0 {
|
if n == 0 {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
@ -180,10 +192,17 @@ func (d *Cloudreve) Put(ctx context.Context, dstDir model.Obj, stream model.File
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
chunk++
|
chunk++
|
||||||
|
|
||||||
}
|
}
|
||||||
|
default:
|
||||||
|
err = errs.NotImplement
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
// 删除失败的会话
|
||||||
|
err = d.request(http.MethodDelete, "/file/upload/"+u.SessionID, nil, nil)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (d *Cloudreve) create(ctx context.Context, dir model.Obj, file model.Obj) error {
|
func (d *Cloudreve) create(ctx context.Context, dir model.Obj, file model.Obj) error {
|
||||||
body := base.Json{"path": dir.GetPath() + "/" + file.GetName()}
|
body := base.Json{"path": dir.GetPath() + "/" + file.GetName()}
|
||||||
|
|
|
@ -24,6 +24,8 @@ type UploadInfo struct {
|
||||||
SessionID string `json:"sessionID"`
|
SessionID string `json:"sessionID"`
|
||||||
ChunkSize int `json:"chunkSize"`
|
ChunkSize int `json:"chunkSize"`
|
||||||
Expires int `json:"expires"`
|
Expires int `json:"expires"`
|
||||||
|
UploadURLs []string `json:"uploadURLs"`
|
||||||
|
Credential string `json:"credential,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type DirectoryResp struct {
|
type DirectoryResp struct {
|
||||||
|
|
|
@ -1,16 +1,23 @@
|
||||||
package cloudreve
|
package cloudreve
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
|
"context"
|
||||||
"encoding/base64"
|
"encoding/base64"
|
||||||
"errors"
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"github.com/alist-org/alist/v3/drivers/base"
|
"github.com/alist-org/alist/v3/drivers/base"
|
||||||
"github.com/alist-org/alist/v3/internal/conf"
|
"github.com/alist-org/alist/v3/internal/conf"
|
||||||
|
"github.com/alist-org/alist/v3/internal/driver"
|
||||||
"github.com/alist-org/alist/v3/internal/model"
|
"github.com/alist-org/alist/v3/internal/model"
|
||||||
"github.com/alist-org/alist/v3/internal/setting"
|
"github.com/alist-org/alist/v3/internal/setting"
|
||||||
"github.com/alist-org/alist/v3/pkg/cookie"
|
"github.com/alist-org/alist/v3/pkg/cookie"
|
||||||
|
"github.com/alist-org/alist/v3/pkg/utils"
|
||||||
"github.com/go-resty/resty/v2"
|
"github.com/go-resty/resty/v2"
|
||||||
json "github.com/json-iterator/go"
|
json "github.com/json-iterator/go"
|
||||||
jsoniter "github.com/json-iterator/go"
|
jsoniter "github.com/json-iterator/go"
|
||||||
|
@ -172,3 +179,95 @@ func (d *Cloudreve) GetThumb(file Object) (model.Thumbnail, error) {
|
||||||
Thumbnail: resp.Header().Get("Location"),
|
Thumbnail: resp.Header().Get("Location"),
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (d *Cloudreve) upRemote(ctx context.Context, stream model.FileStreamer, u UploadInfo, up driver.UpdateProgress) error {
|
||||||
|
uploadUrl := u.UploadURLs[0]
|
||||||
|
credential := u.Credential
|
||||||
|
var finish int64 = 0
|
||||||
|
var chunk int = 0
|
||||||
|
DEFAULT := int64(u.ChunkSize)
|
||||||
|
for finish < stream.GetSize() {
|
||||||
|
if utils.IsCanceled(ctx) {
|
||||||
|
return ctx.Err()
|
||||||
|
}
|
||||||
|
utils.Log.Debugf("[Cloudreve-Remote] upload: %d", finish)
|
||||||
|
var byteSize = DEFAULT
|
||||||
|
left := stream.GetSize() - finish
|
||||||
|
if left < DEFAULT {
|
||||||
|
byteSize = left
|
||||||
|
}
|
||||||
|
byteData := make([]byte, byteSize)
|
||||||
|
n, err := io.ReadFull(stream, byteData)
|
||||||
|
utils.Log.Debug(err, n)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
req, err := http.NewRequest("POST", uploadUrl+"?chunk="+strconv.Itoa(chunk), bytes.NewBuffer(byteData))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
req = req.WithContext(ctx)
|
||||||
|
req.Header.Set("Content-Length", strconv.Itoa(int(byteSize)))
|
||||||
|
req.Header.Set("Authorization", fmt.Sprint(credential))
|
||||||
|
finish += byteSize
|
||||||
|
res, err := base.HttpClient.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
res.Body.Close()
|
||||||
|
up(float64(finish) * 100 / float64(stream.GetSize()))
|
||||||
|
chunk++
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *Cloudreve) upOneDrive(ctx context.Context, stream model.FileStreamer, u UploadInfo, up driver.UpdateProgress) error {
|
||||||
|
uploadUrl := u.UploadURLs[0]
|
||||||
|
var finish int64 = 0
|
||||||
|
DEFAULT := int64(u.ChunkSize)
|
||||||
|
for finish < stream.GetSize() {
|
||||||
|
if utils.IsCanceled(ctx) {
|
||||||
|
return ctx.Err()
|
||||||
|
}
|
||||||
|
utils.Log.Debugf("[Cloudreve-OneDrive] upload: %d", finish)
|
||||||
|
var byteSize = DEFAULT
|
||||||
|
left := stream.GetSize() - finish
|
||||||
|
if left < DEFAULT {
|
||||||
|
byteSize = left
|
||||||
|
}
|
||||||
|
byteData := make([]byte, byteSize)
|
||||||
|
n, err := io.ReadFull(stream, byteData)
|
||||||
|
utils.Log.Debug(err, n)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
req, err := http.NewRequest("PUT", uploadUrl, bytes.NewBuffer(byteData))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
req = req.WithContext(ctx)
|
||||||
|
req.Header.Set("Content-Length", strconv.Itoa(int(byteSize)))
|
||||||
|
req.Header.Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", finish, finish+byteSize-1, stream.GetSize()))
|
||||||
|
finish += byteSize
|
||||||
|
res, err := base.HttpClient.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
// https://learn.microsoft.com/zh-cn/onedrive/developer/rest-api/api/driveitem_createuploadsession
|
||||||
|
if res.StatusCode != 201 && res.StatusCode != 202 && res.StatusCode != 200 {
|
||||||
|
data, _ := io.ReadAll(res.Body)
|
||||||
|
res.Body.Close()
|
||||||
|
return errors.New(string(data))
|
||||||
|
}
|
||||||
|
res.Body.Close()
|
||||||
|
up(float64(finish) * 100 / float64(stream.GetSize()))
|
||||||
|
}
|
||||||
|
// 上传成功发送回调请求
|
||||||
|
err := d.request(http.MethodPost, "/callback/onedrive/finish/"+u.SessionID, func(req *resty.Request) {
|
||||||
|
req.SetBody("{}")
|
||||||
|
}, nil)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue