// Cloudreve/pkg/filemanager/workflows/upload.go
package workflows
import (
"context"
"encoding/json"
"fmt"
"os"
"path"
"path/filepath"
"sync"
"sync/atomic"
"github.com/cloudreve/Cloudreve/v4/application/dependency"
"github.com/cloudreve/Cloudreve/v4/ent"
"github.com/cloudreve/Cloudreve/v4/ent/task"
"github.com/cloudreve/Cloudreve/v4/inventory/types"
"github.com/cloudreve/Cloudreve/v4/pkg/cluster"
"github.com/cloudreve/Cloudreve/v4/pkg/filemanager/fs"
"github.com/cloudreve/Cloudreve/v4/pkg/filemanager/manager"
"github.com/cloudreve/Cloudreve/v4/pkg/logging"
"github.com/cloudreve/Cloudreve/v4/pkg/queue"
"github.com/cloudreve/Cloudreve/v4/pkg/serializer"
)
type (
	// SlaveUploadEntity describes one local file queued for upload: the
	// on-disk source path paired with its destination URI on the master.
	SlaveUploadEntity struct {
		Uri *fs.URI `json:"uri"` // destination URI on the master node
		Src string `json:"src"` // local source path (slash-separated; converted via filepath.FromSlash before open)
		Size int64 `json:"size"` // expected file size in bytes, used for progress totals
		Index int `json:"index"`
	}

	// SlaveUploadTaskState is the JSON-persisted private state of a
	// SlaveUploadTask. It survives restarts so finished files can be skipped
	// on retry via the Transferred set.
	SlaveUploadTaskState struct {
		MaxParallel int `json:"max_parallel"` // max concurrent upload workers
		Files []SlaveUploadEntity `json:"files"` // all files to transfer; indexed by position
		Transferred map[int]interface{} `json:"transferred"` // set of Files indices already uploaded (value unused)
		UserID int `json:"user_id"` // stateless user ID the upload is performed as
		First5TransferErrors string `json:"first_5_transfer_errors,omitempty"` // human-readable summary of the first 5 failures
	}

	// SlaveUploadTask uploads a batch of local files from a slave node back
	// to the master. Shared fields (progress, state) are guarded by the
	// embedded task's mutex; counters inside Progress values are updated
	// atomically.
	SlaveUploadTask struct {
		*queue.InMemoryTask

		progress queue.Progresses // per-key progress, snapshotted by Progress()
		l logging.Logger
		state *SlaveUploadTaskState
		node cluster.Node // master node resolved in Do
	}
)
// NewSlaveUploadTask creates a new SlaveUploadTask from raw private state.
func NewSlaveUploadTask(ctx context.Context, props *types.SlaveTaskProps, id int, state string) queue.Task {
	// Build the backing task model first, then wrap it in the in-memory queue task.
	model := &ent.Task{
		ID:            id,
		CorrelationID: logging.CorrelationID(ctx),
		PublicState: &types.TaskPublicState{
			SlaveTaskProps: props,
		},
		PrivateState: state,
	}

	return &SlaveUploadTask{
		InMemoryTask: &queue.InMemoryTask{
			DBTask: &queue.DBTask{Task: model},
		},
		progress: make(queue.Progresses),
	}
}
// Do executes the upload task: it transfers every file listed in the task
// state to the master node, running at most MaxParallel workers concurrently.
// Per-worker byte progress, aggregate byte progress and an aggregate file
// count are maintained; files recorded in state.Transferred are skipped so a
// retried task resumes where it left off. It returns StatusCompleted when at
// least one file transferred, and an error only when all files failed (or on
// setup/cancellation errors).
func (t *SlaveUploadTask) Do(ctx context.Context) (task.Status, error) {
	ctx = prepareSlaveTaskCtx(ctx, t.Model().PublicState.SlaveTaskProps)
	dep := dependency.FromContext(ctx)
	t.l = dep.Logger()

	np, err := dep.NodePool(ctx)
	if err != nil {
		return task.StatusError, fmt.Errorf("failed to get node pool: %w", err)
	}

	// Files are uploaded back to the master node; anything else is an error.
	t.node, err = np.Get(ctx, types.NodeCapabilityNone, 0)
	if err != nil || !t.node.IsMaster() {
		return task.StatusError, fmt.Errorf("failed to get master node: %w", err)
	}

	fm := manager.NewFileManager(dep, nil)

	// Unmarshal persisted private state.
	state := &SlaveUploadTaskState{}
	if err := json.Unmarshal([]byte(t.State()), state); err != nil {
		return task.StatusError, fmt.Errorf("failed to unmarshal state: %w", err)
	}
	t.state = state
	if t.state.Transferred == nil {
		t.state.Transferred = make(map[int]interface{})
	}

	// Worker pool: the channel holds free worker IDs, bounding concurrency.
	wg := sync.WaitGroup{}
	worker := make(chan int, t.state.MaxParallel)
	for i := 0; i < t.state.MaxParallel; i++ {
		worker <- i
	}

	// Sum up totals for the aggregate progress bars.
	totalCount := 0
	totalSize := int64(0)
	for _, res := range state.Files {
		totalSize += res.Size
		totalCount++
	}
	t.Lock()
	t.progress[ProgressTypeUploadCount] = &queue.Progress{}
	t.progress[ProgressTypeUpload] = &queue.Progress{}
	t.Unlock()
	atomic.StoreInt64(&t.progress[ProgressTypeUploadCount].Total, int64(totalCount))
	atomic.StoreInt64(&t.progress[ProgressTypeUpload].Total, totalSize)

	ae := serializer.NewAggregateError()
	transferFunc := func(workerId, fileId int, file SlaveUploadEntity) {
		t.l.Info("Uploading file %s to %s...", file.Src, file.Uri.String())
		progressKey := fmt.Sprintf("%s%d", ProgressTypeUploadSinglePrefix, workerId)
		t.Lock()
		t.progress[progressKey] = &queue.Progress{Identifier: file.Uri.String(), Total: file.Size}
		fileProgress := t.progress[progressKey]
		uploadProgress := t.progress[ProgressTypeUpload]
		uploadCountProgress := t.progress[ProgressTypeUploadCount]
		t.Unlock()

		defer func() {
			// One file handled (success or failure); free the worker slot.
			atomic.AddInt64(&uploadCountProgress.Current, 1)
			worker <- workerId
			wg.Done()
		}()

		handle, err := os.Open(filepath.FromSlash(file.Src))
		if err != nil {
			t.l.Warning("Failed to open file %s: %s", file.Src, err.Error())
			// Fix: account the skipped bytes into the AGGREGATE progress
			// (not the per-worker one) so the overall bar can reach its total,
			// matching the upload-failure path below.
			atomic.AddInt64(&uploadProgress.Current, file.Size)
			ae.Add(path.Base(file.Src), fmt.Errorf("failed to open file: %w", err))
			return
		}
		defer handle.Close()

		stat, err := handle.Stat()
		if err != nil {
			t.l.Warning("Failed to get file stat for %s: %s", file.Src, err.Error())
			atomic.AddInt64(&uploadProgress.Current, file.Size)
			ae.Add(path.Base(file.Src), fmt.Errorf("failed to get file stat: %w", err))
			return
		}

		fileData := &fs.UploadRequest{
			Props: &fs.UploadProps{
				Uri:  file.Uri,
				Size: stat.Size(),
			},
			ProgressFunc: func(current, diff int64, total int64) {
				// Fix: advance both the per-worker and the aggregate BYTE
				// counters by diff. The previous code bumped the file COUNT
				// here on every callback, double-counting files (the count is
				// already incremented once per file in the defer above) and
				// never advancing aggregate bytes on success.
				atomic.AddInt64(&fileProgress.Current, diff)
				atomic.AddInt64(&uploadProgress.Current, diff)
				atomic.StoreInt64(&fileProgress.Total, total)
			},
			File:   handle,
			Seeker: handle,
		}

		_, err = fm.Update(ctx, fileData, fs.WithNode(t.node), fs.WithStatelessUserID(t.state.UserID), fs.WithNoEntityType())
		if err != nil {
			t.l.Warning("Failed to upload file %s: %s", file.Src, err.Error())
			atomic.AddInt64(&uploadProgress.Current, file.Size)
			ae.Add(path.Base(file.Src), fmt.Errorf("failed to upload file: %w", err))
			return
		}

		// Mark file as transferred so a retried task skips it.
		t.Lock()
		t.state.Transferred[fileId] = nil
		t.Unlock()
	}

	// Dispatch files to free workers.
	for fileId, file := range t.state.Files {
		// Check if file is already transferred in a previous attempt.
		if _, ok := t.state.Transferred[fileId]; ok {
			t.l.Info("File %s already transferred, skipping...", file.Src)
			t.Lock()
			atomic.AddInt64(&t.progress[ProgressTypeUpload].Current, file.Size)
			atomic.AddInt64(&t.progress[ProgressTypeUploadCount].Current, 1)
			t.Unlock()
			continue
		}

		select {
		case <-ctx.Done():
			// NOTE(review): in-flight workers are not awaited on cancellation
			// (preserved from the original behavior) — confirm the queue
			// framework tolerates goroutines outliving Do.
			return task.StatusError, ctx.Err()
		case workerId := <-worker:
			wg.Add(1)
			go transferFunc(workerId, fileId, file)
		}
	}

	wg.Wait()

	// Persist the updated state: transferred set plus a short error summary.
	t.state.First5TransferErrors = ae.FormatFirstN(5)
	newStateStr, marshalErr := json.Marshal(t.state)
	if marshalErr != nil {
		return task.StatusError, fmt.Errorf("failed to marshal state: %w", marshalErr)
	}
	t.Lock()
	t.Task.PrivateState = string(newStateStr)
	t.Unlock()

	// Partial failure still completes; only a total failure is an error.
	if len(t.state.Transferred) != len(t.state.Files) {
		t.l.Warning("%d files not transferred", len(t.state.Files)-len(t.state.Transferred))
		if len(t.state.Transferred) == 0 {
			return task.StatusError, fmt.Errorf("all file failed to transfer")
		}
	}

	return task.StatusCompleted, nil
}
// Progress returns a shallow snapshot of the task's progress map so callers
// can iterate it without racing against concurrent map writes from upload
// workers. The *queue.Progress values themselves are shared; their counters
// are mutated atomically by the workers, so readers should load them with
// sync/atomic as well.
func (t *SlaveUploadTask) Progress(ctx context.Context) queue.Progresses {
	// Consistency fix: receiver renamed m -> t to match every other method
	// on SlaveUploadTask.
	t.Lock()
	defer t.Unlock()

	res := make(queue.Progresses, len(t.progress)) // pre-sized copy
	for k, v := range t.progress {
		res[k] = v
	}
	return res
}