fix(workflow): concurrent read&write to progress map while transfer files in batch (#2737)

pull/2724/head^2
Aaron Liu 2025-08-05 12:02:17 +08:00
parent 51d9e06f21
commit e31a6cbcb3
4 changed files with 46 additions and 33 deletions

2
assets

@ -1 +1 @@
Subproject commit 0b49582a07eccfd63896dd18f7d944f7e96ed47d Subproject commit c4a6593921d34ec47d78d5288c2d1c0865d435b6

View File

@ -121,7 +121,7 @@ func (f *DBFS) Create(ctx context.Context, path *fs.URI, fileType types.FileType
ancestor = newFile(ancestor, newFolder) ancestor = newFile(ancestor, newFolder)
} else { } else {
// valide file name // valide file name
policy, err := f.getPreferredPolicy(ctx, ancestor, 0) policy, err := f.getPreferredPolicy(ctx, ancestor)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -432,12 +432,6 @@ func (m *RemoteDownloadTask) masterTransfer(ctx context.Context, dep dependency.
ae := serializer.NewAggregateError() ae := serializer.NewAggregateError()
transferFunc := func(workerId int, file downloader.TaskFile) { transferFunc := func(workerId int, file downloader.TaskFile) {
defer func() {
atomic.AddInt64(&m.progress[ProgressTypeUploadCount].Current, 1)
worker <- workerId
wg.Done()
}()
sanitizedName := sanitizeFileName(file.Name) sanitizedName := sanitizeFileName(file.Name)
dst := dstUri.JoinRaw(sanitizedName) dst := dstUri.JoinRaw(sanitizedName)
src := filepath.FromSlash(path.Join(m.state.Status.SavePath, file.Name)) src := filepath.FromSlash(path.Join(m.state.Status.SavePath, file.Name))
@ -446,12 +440,21 @@ func (m *RemoteDownloadTask) masterTransfer(ctx context.Context, dep dependency.
progressKey := fmt.Sprintf("%s%d", ProgressTypeUploadSinglePrefix, workerId) progressKey := fmt.Sprintf("%s%d", ProgressTypeUploadSinglePrefix, workerId)
m.Lock() m.Lock()
m.progress[progressKey] = &queue.Progress{Identifier: dst.String(), Total: file.Size} m.progress[progressKey] = &queue.Progress{Identifier: dst.String(), Total: file.Size}
fileProgress := m.progress[progressKey]
uploadProgress := m.progress[ProgressTypeUpload]
uploadCountProgress := m.progress[ProgressTypeUploadCount]
m.Unlock() m.Unlock()
defer func() {
atomic.AddInt64(&uploadCountProgress.Current, 1)
worker <- workerId
wg.Done()
}()
fileStream, err := os.Open(src) fileStream, err := os.Open(src)
if err != nil { if err != nil {
m.l.Warning("Failed to open file %s: %s", src, err.Error()) m.l.Warning("Failed to open file %s: %s", src, err.Error())
atomic.AddInt64(&m.progress[ProgressTypeUpload].Current, file.Size) atomic.AddInt64(&uploadProgress.Current, file.Size)
atomic.AddInt64(&failed, 1) atomic.AddInt64(&failed, 1)
ae.Add(file.Name, fmt.Errorf("failed to open file: %w", err)) ae.Add(file.Name, fmt.Errorf("failed to open file: %w", err))
return return
@ -465,8 +468,8 @@ func (m *RemoteDownloadTask) masterTransfer(ctx context.Context, dep dependency.
Size: file.Size, Size: file.Size,
}, },
ProgressFunc: func(current, diff int64, total int64) { ProgressFunc: func(current, diff int64, total int64) {
atomic.AddInt64(&m.progress[progressKey].Current, diff) atomic.AddInt64(&fileProgress.Current, diff)
atomic.AddInt64(&m.progress[ProgressTypeUpload].Current, diff) atomic.AddInt64(&uploadProgress.Current, diff)
}, },
File: fileStream, File: fileStream,
} }
@ -475,7 +478,7 @@ func (m *RemoteDownloadTask) masterTransfer(ctx context.Context, dep dependency.
if err != nil { if err != nil {
m.l.Warning("Failed to upload file %s: %s", src, err.Error()) m.l.Warning("Failed to upload file %s: %s", src, err.Error())
atomic.AddInt64(&failed, 1) atomic.AddInt64(&failed, 1)
atomic.AddInt64(&m.progress[ProgressTypeUpload].Current, file.Size) atomic.AddInt64(&uploadProgress.Current, file.Size)
ae.Add(file.Name, fmt.Errorf("failed to upload file: %w", err)) ae.Add(file.Name, fmt.Errorf("failed to upload file: %w", err))
return return
} }
@ -490,8 +493,10 @@ func (m *RemoteDownloadTask) masterTransfer(ctx context.Context, dep dependency.
// Check if file is already transferred // Check if file is already transferred
if _, ok := m.state.Transferred[file.Index]; ok { if _, ok := m.state.Transferred[file.Index]; ok {
m.l.Info("File %s already transferred, skipping...", file.Name) m.l.Info("File %s already transferred, skipping...", file.Name)
m.Lock()
atomic.AddInt64(&m.progress[ProgressTypeUpload].Current, file.Size) atomic.AddInt64(&m.progress[ProgressTypeUpload].Current, file.Size)
atomic.AddInt64(&m.progress[ProgressTypeUploadCount].Current, 1) atomic.AddInt64(&m.progress[ProgressTypeUploadCount].Current, 1)
m.Unlock()
continue continue
} }
@ -625,19 +630,18 @@ func (m *RemoteDownloadTask) Progress(ctx context.Context) queue.Progresses {
m.Lock() m.Lock()
defer m.Unlock() defer m.Unlock()
if m.state.NodeState.progress != nil { merged := make(queue.Progresses)
merged := make(queue.Progresses) for k, v := range m.progress {
for k, v := range m.progress { merged[k] = v
merged[k] = v }
}
if m.state.NodeState.progress != nil {
for k, v := range m.state.NodeState.progress { for k, v := range m.state.NodeState.progress {
merged[k] = v merged[k] = v
} }
return merged
} }
return m.progress
return merged
} }
func sanitizeFileName(name string) string { func sanitizeFileName(name string) string {

View File

@ -115,23 +115,26 @@ func (t *SlaveUploadTask) Do(ctx context.Context) (task.Status, error) {
atomic.StoreInt64(&t.progress[ProgressTypeUpload].Total, totalSize) atomic.StoreInt64(&t.progress[ProgressTypeUpload].Total, totalSize)
ae := serializer.NewAggregateError() ae := serializer.NewAggregateError()
transferFunc := func(workerId, fileId int, file SlaveUploadEntity) { transferFunc := func(workerId, fileId int, file SlaveUploadEntity) {
defer func() {
atomic.AddInt64(&t.progress[ProgressTypeUploadCount].Current, 1)
worker <- workerId
wg.Done()
}()
t.l.Info("Uploading file %s to %s...", file.Src, file.Uri.String()) t.l.Info("Uploading file %s to %s...", file.Src, file.Uri.String())
progressKey := fmt.Sprintf("%s%d", ProgressTypeUploadSinglePrefix, workerId) progressKey := fmt.Sprintf("%s%d", ProgressTypeUploadSinglePrefix, workerId)
t.Lock() t.Lock()
t.progress[progressKey] = &queue.Progress{Identifier: file.Uri.String(), Total: file.Size} t.progress[progressKey] = &queue.Progress{Identifier: file.Uri.String(), Total: file.Size}
fileProgress := t.progress[progressKey]
uploadProgress := t.progress[ProgressTypeUpload]
uploadCountProgress := t.progress[ProgressTypeUploadCount]
t.Unlock() t.Unlock()
defer func() {
atomic.AddInt64(&uploadCountProgress.Current, 1)
worker <- workerId
wg.Done()
}()
handle, err := os.Open(filepath.FromSlash(file.Src)) handle, err := os.Open(filepath.FromSlash(file.Src))
if err != nil { if err != nil {
t.l.Warning("Failed to open file %s: %s", file.Src, err.Error()) t.l.Warning("Failed to open file %s: %s", file.Src, err.Error())
atomic.AddInt64(&t.progress[ProgressTypeUpload].Current, file.Size) atomic.AddInt64(&fileProgress.Current, file.Size)
ae.Add(path.Base(file.Src), fmt.Errorf("failed to open file: %w", err)) ae.Add(path.Base(file.Src), fmt.Errorf("failed to open file: %w", err))
return return
} }
@ -140,7 +143,7 @@ func (t *SlaveUploadTask) Do(ctx context.Context) (task.Status, error) {
if err != nil { if err != nil {
t.l.Warning("Failed to get file stat for %s: %s", file.Src, err.Error()) t.l.Warning("Failed to get file stat for %s: %s", file.Src, err.Error())
handle.Close() handle.Close()
atomic.AddInt64(&t.progress[ProgressTypeUpload].Current, file.Size) atomic.AddInt64(&fileProgress.Current, file.Size)
ae.Add(path.Base(file.Src), fmt.Errorf("failed to get file stat: %w", err)) ae.Add(path.Base(file.Src), fmt.Errorf("failed to get file stat: %w", err))
return return
} }
@ -151,9 +154,9 @@ func (t *SlaveUploadTask) Do(ctx context.Context) (task.Status, error) {
Size: stat.Size(), Size: stat.Size(),
}, },
ProgressFunc: func(current, diff int64, total int64) { ProgressFunc: func(current, diff int64, total int64) {
atomic.AddInt64(&t.progress[progressKey].Current, diff) atomic.AddInt64(&fileProgress.Current, diff)
atomic.AddInt64(&t.progress[ProgressTypeUpload].Current, diff) atomic.AddInt64(&uploadCountProgress.Current, 1)
atomic.StoreInt64(&t.progress[progressKey].Total, total) atomic.StoreInt64(&fileProgress.Total, total)
}, },
File: handle, File: handle,
Seeker: handle, Seeker: handle,
@ -163,7 +166,7 @@ func (t *SlaveUploadTask) Do(ctx context.Context) (task.Status, error) {
if err != nil { if err != nil {
handle.Close() handle.Close()
t.l.Warning("Failed to upload file %s: %s", file.Src, err.Error()) t.l.Warning("Failed to upload file %s: %s", file.Src, err.Error())
atomic.AddInt64(&t.progress[ProgressTypeUpload].Current, file.Size) atomic.AddInt64(&uploadProgress.Current, file.Size)
ae.Add(path.Base(file.Src), fmt.Errorf("failed to upload file: %w", err)) ae.Add(path.Base(file.Src), fmt.Errorf("failed to upload file: %w", err))
return return
} }
@ -179,8 +182,10 @@ func (t *SlaveUploadTask) Do(ctx context.Context) (task.Status, error) {
// Check if file is already transferred // Check if file is already transferred
if _, ok := t.state.Transferred[fileId]; ok { if _, ok := t.state.Transferred[fileId]; ok {
t.l.Info("File %s already transferred, skipping...", file.Src) t.l.Info("File %s already transferred, skipping...", file.Src)
t.Lock()
atomic.AddInt64(&t.progress[ProgressTypeUpload].Current, file.Size) atomic.AddInt64(&t.progress[ProgressTypeUpload].Current, file.Size)
atomic.AddInt64(&t.progress[ProgressTypeUploadCount].Current, 1) atomic.AddInt64(&t.progress[ProgressTypeUploadCount].Current, 1)
t.Unlock()
continue continue
} }
@ -221,5 +226,9 @@ func (m *SlaveUploadTask) Progress(ctx context.Context) queue.Progresses {
m.Lock() m.Lock()
defer m.Unlock() defer m.Unlock()
return m.progress res := make(queue.Progresses)
for k, v := range m.progress {
res[k] = v
}
return res
} }