From e31a6cbcb31fd4fb6787f34a160320b979eb1e80 Mon Sep 17 00:00:00 2001 From: Aaron Liu Date: Tue, 5 Aug 2025 12:02:17 +0800 Subject: [PATCH] fix(workflow): concurrent read&write to progress map while transfer files in batch (#2737) --- assets | 2 +- pkg/filemanager/fs/dbfs/manage.go | 2 +- pkg/filemanager/workflows/remote_download.go | 40 +++++++++++--------- pkg/filemanager/workflows/upload.go | 35 ++++++++++------- 4 files changed, 46 insertions(+), 33 deletions(-) diff --git a/assets b/assets index 0b49582..c4a6593 160000 --- a/assets +++ b/assets @@ -1 +1 @@ -Subproject commit 0b49582a07eccfd63896dd18f7d944f7e96ed47d +Subproject commit c4a6593921d34ec47d78d5288c2d1c0865d435b6 diff --git a/pkg/filemanager/fs/dbfs/manage.go b/pkg/filemanager/fs/dbfs/manage.go index 914b1f0..18657e4 100644 --- a/pkg/filemanager/fs/dbfs/manage.go +++ b/pkg/filemanager/fs/dbfs/manage.go @@ -121,7 +121,7 @@ func (f *DBFS) Create(ctx context.Context, path *fs.URI, fileType types.FileType ancestor = newFile(ancestor, newFolder) } else { // valide file name - policy, err := f.getPreferredPolicy(ctx, ancestor, 0) + policy, err := f.getPreferredPolicy(ctx, ancestor) if err != nil { return nil, err } diff --git a/pkg/filemanager/workflows/remote_download.go b/pkg/filemanager/workflows/remote_download.go index efe9bfd..4425c4d 100644 --- a/pkg/filemanager/workflows/remote_download.go +++ b/pkg/filemanager/workflows/remote_download.go @@ -432,12 +432,6 @@ func (m *RemoteDownloadTask) masterTransfer(ctx context.Context, dep dependency. 
ae := serializer.NewAggregateError() transferFunc := func(workerId int, file downloader.TaskFile) { - defer func() { - atomic.AddInt64(&m.progress[ProgressTypeUploadCount].Current, 1) - worker <- workerId - wg.Done() - }() - sanitizedName := sanitizeFileName(file.Name) dst := dstUri.JoinRaw(sanitizedName) src := filepath.FromSlash(path.Join(m.state.Status.SavePath, file.Name)) @@ -446,12 +440,21 @@ func (m *RemoteDownloadTask) masterTransfer(ctx context.Context, dep dependency. progressKey := fmt.Sprintf("%s%d", ProgressTypeUploadSinglePrefix, workerId) m.Lock() m.progress[progressKey] = &queue.Progress{Identifier: dst.String(), Total: file.Size} + fileProgress := m.progress[progressKey] + uploadProgress := m.progress[ProgressTypeUpload] + uploadCountProgress := m.progress[ProgressTypeUploadCount] m.Unlock() + defer func() { + atomic.AddInt64(&uploadCountProgress.Current, 1) + worker <- workerId + wg.Done() + }() + fileStream, err := os.Open(src) if err != nil { m.l.Warning("Failed to open file %s: %s", src, err.Error()) - atomic.AddInt64(&m.progress[ProgressTypeUpload].Current, file.Size) + atomic.AddInt64(&uploadProgress.Current, file.Size) atomic.AddInt64(&failed, 1) ae.Add(file.Name, fmt.Errorf("failed to open file: %w", err)) return @@ -465,8 +468,8 @@ func (m *RemoteDownloadTask) masterTransfer(ctx context.Context, dep dependency. Size: file.Size, }, ProgressFunc: func(current, diff int64, total int64) { - atomic.AddInt64(&m.progress[progressKey].Current, diff) - atomic.AddInt64(&m.progress[ProgressTypeUpload].Current, diff) + atomic.AddInt64(&fileProgress.Current, diff) + atomic.AddInt64(&uploadProgress.Current, diff) }, File: fileStream, } @@ -475,7 +478,7 @@ func (m *RemoteDownloadTask) masterTransfer(ctx context.Context, dep dependency. 
if err != nil { m.l.Warning("Failed to upload file %s: %s", src, err.Error()) atomic.AddInt64(&failed, 1) - atomic.AddInt64(&m.progress[ProgressTypeUpload].Current, file.Size) + atomic.AddInt64(&uploadProgress.Current, file.Size) ae.Add(file.Name, fmt.Errorf("failed to upload file: %w", err)) return } @@ -490,8 +493,10 @@ func (m *RemoteDownloadTask) masterTransfer(ctx context.Context, dep dependency. // Check if file is already transferred if _, ok := m.state.Transferred[file.Index]; ok { m.l.Info("File %s already transferred, skipping...", file.Name) + m.Lock() atomic.AddInt64(&m.progress[ProgressTypeUpload].Current, file.Size) atomic.AddInt64(&m.progress[ProgressTypeUploadCount].Current, 1) + m.Unlock() continue } @@ -625,19 +630,18 @@ func (m *RemoteDownloadTask) Progress(ctx context.Context) queue.Progresses { m.Lock() defer m.Unlock() - if m.state.NodeState.progress != nil { - merged := make(queue.Progresses) - for k, v := range m.progress { - merged[k] = v - } + merged := make(queue.Progresses) + for k, v := range m.progress { + merged[k] = v + } + if m.state.NodeState.progress != nil { for k, v := range m.state.NodeState.progress { merged[k] = v } - - return merged } - return m.progress + + return merged } func sanitizeFileName(name string) string { diff --git a/pkg/filemanager/workflows/upload.go b/pkg/filemanager/workflows/upload.go index 65b36ee..b50a035 100644 --- a/pkg/filemanager/workflows/upload.go +++ b/pkg/filemanager/workflows/upload.go @@ -115,23 +115,26 @@ func (t *SlaveUploadTask) Do(ctx context.Context) (task.Status, error) { atomic.StoreInt64(&t.progress[ProgressTypeUpload].Total, totalSize) ae := serializer.NewAggregateError() transferFunc := func(workerId, fileId int, file SlaveUploadEntity) { - defer func() { - atomic.AddInt64(&t.progress[ProgressTypeUploadCount].Current, 1) - worker <- workerId - wg.Done() - }() - t.l.Info("Uploading file %s to %s...", file.Src, file.Uri.String()) progressKey := fmt.Sprintf("%s%d", 
ProgressTypeUploadSinglePrefix, workerId) t.Lock() t.progress[progressKey] = &queue.Progress{Identifier: file.Uri.String(), Total: file.Size} + fileProgress := t.progress[progressKey] + uploadProgress := t.progress[ProgressTypeUpload] + uploadCountProgress := t.progress[ProgressTypeUploadCount] t.Unlock() + defer func() { + atomic.AddInt64(&uploadCountProgress.Current, 1) + worker <- workerId + wg.Done() + }() + handle, err := os.Open(filepath.FromSlash(file.Src)) if err != nil { t.l.Warning("Failed to open file %s: %s", file.Src, err.Error()) - atomic.AddInt64(&t.progress[ProgressTypeUpload].Current, file.Size) + atomic.AddInt64(&uploadProgress.Current, file.Size) ae.Add(path.Base(file.Src), fmt.Errorf("failed to open file: %w", err)) return } @@ -140,7 +143,7 @@ func (t *SlaveUploadTask) Do(ctx context.Context) (task.Status, error) { if err != nil { t.l.Warning("Failed to get file stat for %s: %s", file.Src, err.Error()) handle.Close() - atomic.AddInt64(&t.progress[ProgressTypeUpload].Current, file.Size) + atomic.AddInt64(&uploadProgress.Current, file.Size) ae.Add(path.Base(file.Src), fmt.Errorf("failed to get file stat: %w", err)) return } @@ -151,9 +154,9 @@ func (t *SlaveUploadTask) Do(ctx context.Context) (task.Status, error) { Size: stat.Size(), }, ProgressFunc: func(current, diff int64, total int64) { - atomic.AddInt64(&t.progress[progressKey].Current, diff) - atomic.AddInt64(&t.progress[ProgressTypeUpload].Current, diff) - atomic.StoreInt64(&t.progress[progressKey].Total, total) + atomic.AddInt64(&fileProgress.Current, diff) + atomic.AddInt64(&uploadProgress.Current, diff) + atomic.StoreInt64(&fileProgress.Total, total) }, File: handle, Seeker: handle, @@ -163,7 +166,7 @@ func (t *SlaveUploadTask) Do(ctx context.Context) (task.Status, error) { if err != nil { handle.Close() t.l.Warning("Failed to upload file %s: %s", file.Src, err.Error()) - atomic.AddInt64(&t.progress[ProgressTypeUpload].Current, file.Size) + atomic.AddInt64(&uploadProgress.Current, 
file.Size) ae.Add(path.Base(file.Src), fmt.Errorf("failed to upload file: %w", err)) return } @@ -179,8 +182,10 @@ func (t *SlaveUploadTask) Do(ctx context.Context) (task.Status, error) { // Check if file is already transferred if _, ok := t.state.Transferred[fileId]; ok { t.l.Info("File %s already transferred, skipping...", file.Src) + t.Lock() atomic.AddInt64(&t.progress[ProgressTypeUpload].Current, file.Size) atomic.AddInt64(&t.progress[ProgressTypeUploadCount].Current, 1) + t.Unlock() continue } @@ -221,5 +226,9 @@ func (m *SlaveUploadTask) Progress(ctx context.Context) queue.Progresses { m.Lock() defer m.Unlock() - return m.progress + res := make(queue.Progresses) + for k, v := range m.progress { + res[k] = v + } + return res }