Cloudreve/pkg/filemanager/driver/remote/client.go

292 lines
8.1 KiB
Go

package remote
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"strings"
"time"
"github.com/cloudreve/Cloudreve/v4/application/constants"
"github.com/cloudreve/Cloudreve/v4/ent"
"github.com/cloudreve/Cloudreve/v4/pkg/auth"
"github.com/cloudreve/Cloudreve/v4/pkg/cluster/routes"
"github.com/cloudreve/Cloudreve/v4/pkg/conf"
"github.com/cloudreve/Cloudreve/v4/pkg/filemanager/chunk"
"github.com/cloudreve/Cloudreve/v4/pkg/filemanager/chunk/backoff"
"github.com/cloudreve/Cloudreve/v4/pkg/filemanager/driver"
"github.com/cloudreve/Cloudreve/v4/pkg/filemanager/fs"
"github.com/cloudreve/Cloudreve/v4/pkg/logging"
"github.com/cloudreve/Cloudreve/v4/pkg/request"
"github.com/cloudreve/Cloudreve/v4/pkg/serializer"
"github.com/cloudreve/Cloudreve/v4/pkg/setting"
"github.com/gofrs/uuid"
)
const (
OverwriteHeader = constants.CrHeaderPrefix + "Overwrite"
chunkRetrySleep = time.Duration(5) * time.Second
)
// Client to operate uploading to remote slave server
type Client interface {
// CreateUploadSession creates remote upload session
CreateUploadSession(ctx context.Context, session *fs.UploadSession, overwrite bool) error
// GetUploadURL signs an url for uploading file
GetUploadURL(ctx context.Context, expires time.Time, sessionID string) (string, string, error)
// Upload uploads file to remote server
Upload(ctx context.Context, file *fs.UploadRequest) error
// DeleteUploadSession deletes remote upload session
DeleteUploadSession(ctx context.Context, sessionID string) error
// MediaMeta gets media meta from remote server
MediaMeta(ctx context.Context, src, ext string) ([]driver.MediaMeta, error)
// DeleteFiles deletes files from remote server
DeleteFiles(ctx context.Context, files ...string) ([]string, error)
// List lists files from remote server
List(ctx context.Context, path string, recursive bool) ([]fs.PhysicalObject, error)
}
type DeleteFileRequest struct {
Files []string `json:"files"`
}
// NewClient creates new Client from given policy
func NewClient(ctx context.Context, policy *ent.StoragePolicy, settings setting.Provider, config conf.ConfigProvider, l logging.Logger) (Client, error) {
if policy.Edges.Node == nil {
return nil, fmt.Errorf("remote storage policy %d has no node", policy.ID)
}
authInstance := auth.HMACAuth{[]byte(policy.Edges.Node.SlaveKey)}
serverURL, err := url.Parse(policy.Edges.Node.Server)
if err != nil {
return nil, err
}
base, _ := url.Parse(constants.APIPrefixSlave)
return &remoteClient{
policy: policy,
authInstance: authInstance,
httpClient: request.NewClient(
config,
request.WithEndpoint(serverURL.ResolveReference(base).String()),
request.WithCredential(authInstance, int64(settings.SlaveRequestSignTTL(ctx))),
request.WithSlaveMeta(policy.Edges.Node.ID),
request.WithMasterMeta(settings.SiteBasic(ctx).ID, settings.SiteURL(setting.UseFirstSiteUrl(ctx)).String()),
request.WithCorrelationID(),
),
settings: settings,
l: l,
}, nil
}
type remoteClient struct {
policy *ent.StoragePolicy
authInstance auth.Auth
httpClient request.Client
settings setting.Provider
l logging.Logger
}
func (c *remoteClient) Upload(ctx context.Context, file *fs.UploadRequest) error {
ttl := c.settings.UploadSessionTTL(ctx)
session := &fs.UploadSession{
Props: file.Props.Copy(),
Policy: c.policy,
}
session.Props.UploadSessionID = uuid.Must(uuid.NewV4()).String()
session.Props.ExpireAt = time.Now().Add(ttl)
// Create upload session
overwrite := file.Mode&fs.ModeOverwrite == fs.ModeOverwrite
if err := c.CreateUploadSession(ctx, session, overwrite); err != nil {
return fmt.Errorf("failed to create upload session: %w", err)
}
// Initial chunk groups
chunks := chunk.NewChunkGroup(file, c.policy.Settings.ChunkSize, &backoff.ConstantBackoff{
Max: c.settings.ChunkRetryLimit(ctx),
Sleep: chunkRetrySleep,
}, c.settings.UseChunkBuffer(ctx), c.l, c.settings.TempPath(ctx))
uploadFunc := func(current *chunk.ChunkGroup, content io.Reader) error {
return c.uploadChunk(ctx, session.Props.UploadSessionID, current.Index(), content, overwrite, current.Length())
}
// upload chunks
for chunks.Next() {
if err := chunks.Process(uploadFunc); err != nil {
if err := c.DeleteUploadSession(ctx, session.Props.UploadSessionID); err != nil {
c.l.Warning("failed to delete upload session: %s", err)
}
return fmt.Errorf("failed to upload chunk #%d: %w", chunks.Index(), err)
}
}
return nil
}
func (c *remoteClient) DeleteUploadSession(ctx context.Context, sessionID string) error {
resp, err := c.httpClient.Request(
"DELETE",
"upload/"+sessionID,
nil,
request.WithContext(ctx),
request.WithLogger(logging.FromContext(ctx)),
).CheckHTTPResponse(200).DecodeResponse()
if err != nil {
return err
}
if resp.Code != 0 {
return serializer.NewErrorFromResponse(resp)
}
return nil
}
func (c *remoteClient) DeleteFiles(ctx context.Context, files ...string) ([]string, error) {
req := &DeleteFileRequest{
Files: files,
}
reqStr, err := json.Marshal(req)
if err != nil {
return files, fmt.Errorf("failed to marshal delete request: %w", err)
}
resp, err := c.httpClient.Request(
"DELETE",
"file",
bytes.NewReader(reqStr),
request.WithContext(ctx),
request.WithLogger(logging.FromContext(ctx)),
).CheckHTTPResponse(200).DecodeResponse()
if err != nil {
return files, err
}
if resp.Code != 0 {
var failed []string
failed = files
if resp.Code == serializer.CodeNotFullySuccess {
resp.GobDecode(&failed)
}
return failed, fmt.Errorf(resp.Error)
}
return nil, nil
}
func (c *remoteClient) MediaMeta(ctx context.Context, src, ext string) ([]driver.MediaMeta, error) {
resp, err := c.httpClient.Request(
http.MethodGet,
routes.SlaveMediaMetaRoute(src, ext),
nil,
request.WithContext(ctx),
request.WithLogger(c.l),
).CheckHTTPResponse(200).DecodeResponse()
if err != nil {
return nil, err
}
if resp.Code != 0 {
return nil, fmt.Errorf(resp.Error)
}
var metas []driver.MediaMeta
resp.GobDecode(&metas)
return metas, nil
}
func (c *remoteClient) CreateUploadSession(ctx context.Context, session *fs.UploadSession, overwrite bool) error {
reqBodyEncoded, err := json.Marshal(map[string]interface{}{
"session": session,
"overwrite": overwrite,
})
if err != nil {
return err
}
bodyReader := strings.NewReader(string(reqBodyEncoded))
resp, err := c.httpClient.Request(
"PUT",
"upload",
bodyReader,
request.WithContext(ctx),
request.WithLogger(c.l),
).CheckHTTPResponse(200).DecodeResponse()
if err != nil {
return err
}
if resp.Code != 0 {
return serializer.NewErrorFromResponse(resp)
}
return nil
}
func (c *remoteClient) List(ctx context.Context, path string, recursive bool) ([]fs.PhysicalObject, error) {
resp, err := c.httpClient.Request(
http.MethodGet,
routes.SlaveFileListRoute(path, recursive),
nil,
request.WithContext(ctx),
request.WithLogger(c.l),
).CheckHTTPResponse(200).DecodeResponse()
if err != nil {
return nil, err
}
if resp.Code != 0 {
return nil, serializer.NewErrorFromResponse(resp)
}
var objects []fs.PhysicalObject
resp.GobDecode(&objects)
return objects, nil
}
func (c *remoteClient) GetUploadURL(ctx context.Context, expires time.Time, sessionID string) (string, string, error) {
base, err := url.Parse(c.policy.Edges.Node.Server)
if err != nil {
return "", "", err
}
req, err := http.NewRequest("POST", routes.SlaveUploadUrl(base, sessionID).String(), nil)
if err != nil {
return "", "", err
}
req = auth.SignRequest(ctx, c.authInstance, req, &expires)
return req.URL.String(), req.Header["Authorization"][0], nil
}
func (c *remoteClient) uploadChunk(ctx context.Context, sessionID string, index int, chunk io.Reader, overwrite bool, size int64) error {
resp, err := c.httpClient.Request(
"POST",
fmt.Sprintf("upload/%s?chunk=%d", sessionID, index),
chunk,
request.WithContext(ctx),
request.WithTimeout(time.Duration(0)),
request.WithContentLength(size),
request.WithHeader(map[string][]string{OverwriteHeader: {fmt.Sprintf("%t", overwrite)}}),
).CheckHTTPResponse(200).DecodeResponse()
if err != nil {
return err
}
if resp.Code != 0 {
return serializer.NewErrorFromResponse(resp)
}
return nil
}