diff --git a/cmd/server.go b/cmd/server.go index 0678e3e1..d03a9d80 100644 --- a/cmd/server.go +++ b/cmd/server.go @@ -2,6 +2,7 @@ package cmd import ( "context" + "errors" "fmt" "net" "net/http" @@ -36,6 +37,7 @@ the address is defined in config file`, } bootstrap.InitOfflineDownloadTools() bootstrap.LoadStorages() + bootstrap.InitTaskManager() if !flags.Debug && !flags.Dev { gin.SetMode(gin.ReleaseMode) } @@ -49,7 +51,7 @@ the address is defined in config file`, httpSrv = &http.Server{Addr: httpBase, Handler: r} go func() { err := httpSrv.ListenAndServe() - if err != nil && err != http.ErrServerClosed { + if err != nil && !errors.Is(err, http.ErrServerClosed) { utils.Log.Fatalf("failed to start http: %s", err.Error()) } }() @@ -60,7 +62,7 @@ the address is defined in config file`, httpsSrv = &http.Server{Addr: httpsBase, Handler: r} go func() { err := httpsSrv.ListenAndServeTLS(conf.Conf.Scheme.CertFile, conf.Conf.Scheme.KeyFile) - if err != nil && err != http.ErrServerClosed { + if err != nil && !errors.Is(err, http.ErrServerClosed) { utils.Log.Fatalf("failed to start https: %s", err.Error()) } }() @@ -84,7 +86,7 @@ the address is defined in config file`, } } err = unixSrv.Serve(listener) - if err != nil && err != http.ErrServerClosed { + if err != nil && !errors.Is(err, http.ErrServerClosed) { utils.Log.Fatalf("failed to start unix: %s", err.Error()) } }() diff --git a/internal/bootstrap/task.go b/internal/bootstrap/task.go new file mode 100644 index 00000000..5d52e9d2 --- /dev/null +++ b/internal/bootstrap/task.go @@ -0,0 +1,15 @@ +package bootstrap + +import ( + "github.com/alist-org/alist/v3/internal/conf" + "github.com/alist-org/alist/v3/internal/fs" + "github.com/alist-org/alist/v3/internal/offline_download/tool" + "github.com/xhofe/tache" +) + +func InitTaskManager() { + fs.UploadTaskManager = tache.NewManager[*fs.UploadTask](tache.WithWorks(conf.Conf.Tasks.Upload.Workers), tache.WithMaxRetry(conf.Conf.Tasks.Upload.MaxRetry)) + fs.CopyTaskManager = tache.NewManager[*fs.CopyTask](tache.WithWorks(conf.Conf.Tasks.Copy.Workers), tache.WithMaxRetry(conf.Conf.Tasks.Copy.MaxRetry)) + tool.DownloadTaskManager = tache.NewManager[*tool.DownloadTask](tache.WithWorks(conf.Conf.Tasks.Download.Workers), tache.WithMaxRetry(conf.Conf.Tasks.Download.MaxRetry)) + tool.TransferTaskManager = tache.NewManager[*tool.TransferTask](tache.WithWorks(conf.Conf.Tasks.Transfer.Workers), tache.WithMaxRetry(conf.Conf.Tasks.Transfer.MaxRetry)) +} diff --git a/internal/conf/config.go b/internal/conf/config.go index de26e1fe..2754064c 100644 --- a/internal/conf/config.go +++ b/internal/conf/config.go @@ -8,15 +8,15 @@ import ( ) type Database struct { - Type string `json:"type" env:"DB_TYPE"` - Host string `json:"host" env:"DB_HOST"` - Port int `json:"port" env:"DB_PORT"` - User string `json:"user" env:"DB_USER"` - Password string `json:"password" env:"DB_PASS"` - Name string `json:"name" env:"DB_NAME"` - DBFile string `json:"db_file" env:"DB_FILE"` - TablePrefix string `json:"table_prefix" env:"DB_TABLE_PREFIX"` - SSLMode string `json:"ssl_mode" env:"DB_SSL_MODE"` + Type string `json:"type" env:"TYPE"` + Host string `json:"host" env:"HOST"` + Port int `json:"port" env:"PORT"` + User string `json:"user" env:"USER"` + Password string `json:"password" env:"PASS"` + Name string `json:"name" env:"NAME"` + DBFile string `json:"db_file" env:"FILE"` + TablePrefix string `json:"table_prefix" env:"TABLE_PREFIX"` + SSLMode string `json:"ssl_mode" env:"SSL_MODE"` } type Scheme struct { @@ -39,21 +39,34 @@ type LogConfig struct { Compress bool `json:"compress" env:"COMPRESS"` } +type TaskConfig struct { + Workers int `json:"workers" env:"WORKERS"` + MaxRetry int `json:"max_retry" env:"MAX_RETRY"` +} + +type TasksConfig struct { + Download TaskConfig `json:"download" envPrefix:"DOWNLOAD_"` + Transfer TaskConfig `json:"transfer" envPrefix:"TRANSFER_"` + Upload TaskConfig `json:"upload" envPrefix:"UPLOAD_"` + Copy TaskConfig `json:"copy" envPrefix:"COPY_"` +} + type Config struct { - Force bool `json:"force" env:"FORCE"` - SiteURL string `json:"site_url" env:"SITE_URL"` - Cdn string `json:"cdn" env:"CDN"` - JwtSecret string `json:"jwt_secret" env:"JWT_SECRET"` - TokenExpiresIn int `json:"token_expires_in" env:"TOKEN_EXPIRES_IN"` - Database Database `json:"database"` - Scheme Scheme `json:"scheme"` - TempDir string `json:"temp_dir" env:"TEMP_DIR"` - BleveDir string `json:"bleve_dir" env:"BLEVE_DIR"` - DistDir string `json:"dist_dir"` - Log LogConfig `json:"log"` - DelayedStart int `json:"delayed_start" env:"DELAYED_START"` - MaxConnections int `json:"max_connections" env:"MAX_CONNECTIONS"` - TlsInsecureSkipVerify bool `json:"tls_insecure_skip_verify" env:"TLS_INSECURE_SKIP_VERIFY"` + Force bool `json:"force" env:"FORCE"` + SiteURL string `json:"site_url" env:"SITE_URL"` + Cdn string `json:"cdn" env:"CDN"` + JwtSecret string `json:"jwt_secret" env:"JWT_SECRET"` + TokenExpiresIn int `json:"token_expires_in" env:"TOKEN_EXPIRES_IN"` + Database Database `json:"database" envPrefix:"DB_"` + Scheme Scheme `json:"scheme"` + TempDir string `json:"temp_dir" env:"TEMP_DIR"` + BleveDir string `json:"bleve_dir" env:"BLEVE_DIR"` + DistDir string `json:"dist_dir"` + Log LogConfig `json:"log"` + DelayedStart int `json:"delayed_start" env:"DELAYED_START"` + MaxConnections int `json:"max_connections" env:"MAX_CONNECTIONS"` + TlsInsecureSkipVerify bool `json:"tls_insecure_skip_verify" env:"TLS_INSECURE_SKIP_VERIFY"` + Tasks TasksConfig `json:"tasks" envPrefix:"TASKS_"` } func DefaultConfig() *Config { @@ -90,5 +103,22 @@ func DefaultConfig() *Config { }, MaxConnections: 0, TlsInsecureSkipVerify: true, + Tasks: TasksConfig{ + Download: TaskConfig{ + Workers: 5, + MaxRetry: 1, + }, + Transfer: TaskConfig{ + Workers: 5, + MaxRetry: 2, + }, + Upload: TaskConfig{ + Workers: 5, + }, + Copy: TaskConfig{ + Workers: 5, + MaxRetry: 2, + }, + }, } } diff --git a/internal/fs/copy.go b/internal/fs/copy.go index 911e528a..43e16396 100644 --- a/internal/fs/copy.go +++ b/internal/fs/copy.go @@ -35,7 +35,7 @@ func (t *CopyTask) Run() error { return copyBetween2Storages(t, t.srcStorage, t.dstStorage, t.srcObjPath, t.dstDirPath) } -var CopyTaskManager = tache.NewManager[*CopyTask]() +var CopyTaskManager *tache.Manager[*CopyTask] // Copy if in the same storage, call move method // if not, add copy task diff --git a/internal/fs/put.go b/internal/fs/put.go index 5c154756..43d41acf 100644 --- a/internal/fs/put.go +++ b/internal/fs/put.go @@ -30,7 +30,7 @@ func (t *UploadTask) Run() error { return op.Put(t.Ctx(), t.storage, t.dstDirActualPath, t.file, t.SetProgress, true) } -var UploadTaskManager = tache.NewManager[*UploadTask]() +var UploadTaskManager *tache.Manager[*UploadTask] // putAsTask add as a put task and return immediately func putAsTask(dstDirPath string, file model.FileStreamer) error { diff --git a/internal/offline_download/tool/download.go b/internal/offline_download/tool/download.go index 7b536762..36ab6c82 100644 --- a/internal/offline_download/tool/download.go +++ b/internal/offline_download/tool/download.go @@ -143,5 +143,5 @@ func (t *DownloadTask) GetStatus() string { } var ( - DownloadTaskManager *tache.Manager[*DownloadTask] = tache.NewManager[*DownloadTask]() + DownloadTaskManager *tache.Manager[*DownloadTask] ) diff --git a/internal/offline_download/tool/transfer.go b/internal/offline_download/tool/transfer.go index f7d1791d..c39e4ba0 100644 --- a/internal/offline_download/tool/transfer.go +++ b/internal/offline_download/tool/transfer.go @@ -62,5 +62,5 @@ func (t *TransferTask) GetStatus() string { } var ( - TransferTaskManager *tache.Manager[*TransferTask] = tache.NewManager[*TransferTask]() + TransferTaskManager *tache.Manager[*TransferTask] ) diff --git a/server/handles/task.go b/server/handles/task.go index 1193ce05..acfa1b02 100644 --- a/server/handles/task.go +++ b/server/handles/task.go @@ -18,13 +18,17 @@ type TaskInfo struct { } func getTaskInfo[T tache.TaskWithInfo](task T) TaskInfo { + errMsg := "" + if task.GetErr() != nil { + errMsg = task.GetErr().Error() + } return TaskInfo{ ID: task.GetID(), Name: task.GetName(), State: task.GetState(), Status: task.GetStatus(), Progress: task.GetProgress(), - Error: task.GetErr().Error(), + Error: errMsg, } }