From d5ec998699dd592e3ee7f54cf5bcce7dc697c173 Mon Sep 17 00:00:00 2001 From: KirCute_ECT <951206789@qq.com> Date: Mon, 27 Jan 2025 20:18:10 +0800 Subject: [PATCH] feat(task): allow retry canceled (#7852) --- internal/conf/config.go | 14 ++++++++------ internal/fs/archive.go | 6 +++++- internal/fs/copy.go | 1 + internal/offline_download/tool/download.go | 1 + internal/offline_download/tool/transfer.go | 1 + internal/task/base.go | 15 +++++++++++++++ 6 files changed, 31 insertions(+), 7 deletions(-) diff --git a/internal/conf/config.go b/internal/conf/config.go index 39b23227..1766ae84 100644 --- a/internal/conf/config.go +++ b/internal/conf/config.go @@ -53,12 +53,13 @@ type TaskConfig struct { } type TasksConfig struct { - Download TaskConfig `json:"download" envPrefix:"DOWNLOAD_"` - Transfer TaskConfig `json:"transfer" envPrefix:"TRANSFER_"` - Upload TaskConfig `json:"upload" envPrefix:"UPLOAD_"` - Copy TaskConfig `json:"copy" envPrefix:"COPY_"` - Decompress TaskConfig `json:"decompress" envPrefix:"DECOMPRESS_"` - DecompressUpload TaskConfig `json:"decompress_upload" envPrefix:"DECOMPRESS_UPLOAD_"` + Download TaskConfig `json:"download" envPrefix:"DOWNLOAD_"` + Transfer TaskConfig `json:"transfer" envPrefix:"TRANSFER_"` + Upload TaskConfig `json:"upload" envPrefix:"UPLOAD_"` + Copy TaskConfig `json:"copy" envPrefix:"COPY_"` + Decompress TaskConfig `json:"decompress" envPrefix:"DECOMPRESS_"` + DecompressUpload TaskConfig `json:"decompress_upload" envPrefix:"DECOMPRESS_UPLOAD_"` + AllowRetryCanceled bool `json:"allow_retry_canceled" env:"ALLOW_RETRY_CANCELED"` } type Cors struct { @@ -182,6 +183,7 @@ func DefaultConfig() *Config { Workers: 5, MaxRetry: 2, }, + AllowRetryCanceled: false, }, Cors: Cors{ AllowOrigins: []string{"*"}, diff --git a/internal/fs/archive.go b/internal/fs/archive.go index f3e05926..39131827 100644 --- a/internal/fs/archive.go +++ b/internal/fs/archive.go @@ -50,6 +50,7 @@ func (t *ArchiveDownloadTask) GetStatus() string { } func (t *ArchiveDownloadTask) Run() error { + t.ReinitCtx() t.ClearEndTime() t.SetStartTime(time.Now()) defer func() { t.SetEndTime(time.Now()) }() @@ -144,6 +145,7 @@ func (t *ArchiveContentUploadTask) GetStatus() string { } func (t *ArchiveContentUploadTask) Run() error { + t.ReinitCtx() t.ClearEndTime() t.SetStartTime(time.Now()) defer func() { t.SetEndTime(time.Now()) }() @@ -235,7 +237,9 @@ func (t *ArchiveContentUploadTask) RunWithNextTaskCallback(f func(nextTsk *Archi func (t *ArchiveContentUploadTask) Cancel() { t.TaskExtension.Cancel() - t.deleteSrcFile() + if !conf.Conf.Tasks.AllowRetryCanceled { + t.deleteSrcFile() + } } func (t *ArchiveContentUploadTask) deleteSrcFile() { diff --git a/internal/fs/copy.go b/internal/fs/copy.go index 977f7280..155e3cf7 100644 --- a/internal/fs/copy.go +++ b/internal/fs/copy.go @@ -39,6 +39,7 @@ func (t *CopyTask) GetStatus() string { } func (t *CopyTask) Run() error { + t.ReinitCtx() t.ClearEndTime() t.SetStartTime(time.Now()) defer func() { t.SetEndTime(time.Now()) }() diff --git a/internal/offline_download/tool/download.go b/internal/offline_download/tool/download.go index c3b30f1b..42b2dbfb 100644 --- a/internal/offline_download/tool/download.go +++ b/internal/offline_download/tool/download.go @@ -28,6 +28,7 @@ type DownloadTask struct { } func (t *DownloadTask) Run() error { + t.ReinitCtx() t.ClearEndTime() t.SetStartTime(time.Now()) defer func() { t.SetEndTime(time.Now()) }() diff --git a/internal/offline_download/tool/transfer.go b/internal/offline_download/tool/transfer.go index 8c7ab244..1d5ece61 100644 --- a/internal/offline_download/tool/transfer.go +++ b/internal/offline_download/tool/transfer.go @@ -32,6 +32,7 @@ type TransferTask struct { } func (t *TransferTask) Run() error { + t.ReinitCtx() t.ClearEndTime() t.SetStartTime(time.Now()) defer func() { t.SetEndTime(time.Now()) }() diff --git a/internal/task/base.go b/internal/task/base.go index 22b16741..c3703bd1 100644 --- a/internal/task/base.go +++ b/internal/task/base.go @@ -2,6 +2,7 @@ package task import ( "context" + "github.com/alist-org/alist/v3/internal/conf" "github.com/alist-org/alist/v3/internal/model" "github.com/xhofe/tache" "sync" @@ -66,6 +67,20 @@ func (t *TaskExtension) Ctx() context.Context { return t.ctx } +func (t *TaskExtension) ReinitCtx() { + if !conf.Conf.Tasks.AllowRetryCanceled { + return + } + select { + case <-t.Base.Ctx().Done(): + ctx, cancel := context.WithCancel(context.Background()) + t.SetCtx(ctx) + t.SetCancelFunc(cancel) + t.ctx = nil + default: + } +} + type TaskExtensionInfo interface { tache.TaskWithInfo GetCreator() *model.User