feat: customize workers and retry of task (close #5493 fix #5274)

pull/5554/head
Andy Hsu 2023-11-21 15:51:08 +08:00
parent 11a30c5044
commit 7583c4d734
8 changed files with 82 additions and 31 deletions

View File

@ -2,6 +2,7 @@ package cmd
import (
"context"
"errors"
"fmt"
"net"
"net/http"
@ -36,6 +37,7 @@ the address is defined in config file`,
}
bootstrap.InitOfflineDownloadTools()
bootstrap.LoadStorages()
bootstrap.InitTaskManager()
if !flags.Debug && !flags.Dev {
gin.SetMode(gin.ReleaseMode)
}
@ -49,7 +51,7 @@ the address is defined in config file`,
httpSrv = &http.Server{Addr: httpBase, Handler: r}
go func() {
err := httpSrv.ListenAndServe()
if err != nil && err != http.ErrServerClosed {
if err != nil && !errors.Is(err, http.ErrServerClosed) {
utils.Log.Fatalf("failed to start http: %s", err.Error())
}
}()
@ -60,7 +62,7 @@ the address is defined in config file`,
httpsSrv = &http.Server{Addr: httpsBase, Handler: r}
go func() {
err := httpsSrv.ListenAndServeTLS(conf.Conf.Scheme.CertFile, conf.Conf.Scheme.KeyFile)
if err != nil && err != http.ErrServerClosed {
if err != nil && !errors.Is(err, http.ErrServerClosed) {
utils.Log.Fatalf("failed to start https: %s", err.Error())
}
}()
@ -84,7 +86,7 @@ the address is defined in config file`,
}
}
err = unixSrv.Serve(listener)
if err != nil && err != http.ErrServerClosed {
if err != nil && !errors.Is(err, http.ErrServerClosed) {
utils.Log.Fatalf("failed to start unix: %s", err.Error())
}
}()

View File

@ -0,0 +1,15 @@
package bootstrap
import (
"github.com/alist-org/alist/v3/internal/conf"
"github.com/alist-org/alist/v3/internal/fs"
"github.com/alist-org/alist/v3/internal/offline_download/tool"
"github.com/xhofe/tache"
)
func InitTaskManager() {
fs.UploadTaskManager = tache.NewManager[*fs.UploadTask](tache.WithWorks(conf.Conf.Tasks.Upload.Workers), tache.WithMaxRetry(conf.Conf.Tasks.Upload.MaxRetry))
fs.CopyTaskManager = tache.NewManager[*fs.CopyTask](tache.WithWorks(conf.Conf.Tasks.Copy.Workers), tache.WithMaxRetry(conf.Conf.Tasks.Copy.MaxRetry))
tool.DownloadTaskManager = tache.NewManager[*tool.DownloadTask](tache.WithWorks(conf.Conf.Tasks.Download.Workers), tache.WithMaxRetry(conf.Conf.Tasks.Download.MaxRetry))
tool.TransferTaskManager = tache.NewManager[*tool.TransferTask](tache.WithWorks(conf.Conf.Tasks.Transfer.Workers), tache.WithMaxRetry(conf.Conf.Tasks.Transfer.MaxRetry))
}

View File

@ -8,15 +8,15 @@ import (
)
type Database struct {
Type string `json:"type" env:"DB_TYPE"`
Host string `json:"host" env:"DB_HOST"`
Port int `json:"port" env:"DB_PORT"`
User string `json:"user" env:"DB_USER"`
Password string `json:"password" env:"DB_PASS"`
Name string `json:"name" env:"DB_NAME"`
DBFile string `json:"db_file" env:"DB_FILE"`
TablePrefix string `json:"table_prefix" env:"DB_TABLE_PREFIX"`
SSLMode string `json:"ssl_mode" env:"DB_SSL_MODE"`
Type string `json:"type" env:"TYPE"`
Host string `json:"host" env:"HOST"`
Port int `json:"port" env:"PORT"`
User string `json:"user" env:"USER"`
Password string `json:"password" env:"PASS"`
Name string `json:"name" env:"NAME"`
DBFile string `json:"db_file" env:"FILE"`
TablePrefix string `json:"table_prefix" env:"TABLE_PREFIX"`
SSLMode string `json:"ssl_mode" env:"SSL_MODE"`
}
type Scheme struct {
@ -39,21 +39,34 @@ type LogConfig struct {
Compress bool `json:"compress" env:"COMPRESS"`
}
type TaskConfig struct {
Workers int `json:"workers" env:"WORKERS"`
MaxRetry int `json:"max_retry" env:"MAX_RETRY"`
}
type TasksConfig struct {
Download TaskConfig `json:"download" envPrefix:"DOWNLOAD_"`
Transfer TaskConfig `json:"transfer" envPrefix:"TRANSFER_"`
Upload TaskConfig `json:"upload" envPrefix:"UPLOAD_"`
Copy TaskConfig `json:"copy" envPrefix:"COPY_"`
}
type Config struct {
Force bool `json:"force" env:"FORCE"`
SiteURL string `json:"site_url" env:"SITE_URL"`
Cdn string `json:"cdn" env:"CDN"`
JwtSecret string `json:"jwt_secret" env:"JWT_SECRET"`
TokenExpiresIn int `json:"token_expires_in" env:"TOKEN_EXPIRES_IN"`
Database Database `json:"database"`
Scheme Scheme `json:"scheme"`
TempDir string `json:"temp_dir" env:"TEMP_DIR"`
BleveDir string `json:"bleve_dir" env:"BLEVE_DIR"`
DistDir string `json:"dist_dir"`
Log LogConfig `json:"log"`
DelayedStart int `json:"delayed_start" env:"DELAYED_START"`
MaxConnections int `json:"max_connections" env:"MAX_CONNECTIONS"`
TlsInsecureSkipVerify bool `json:"tls_insecure_skip_verify" env:"TLS_INSECURE_SKIP_VERIFY"`
Force bool `json:"force" env:"FORCE"`
SiteURL string `json:"site_url" env:"SITE_URL"`
Cdn string `json:"cdn" env:"CDN"`
JwtSecret string `json:"jwt_secret" env:"JWT_SECRET"`
TokenExpiresIn int `json:"token_expires_in" env:"TOKEN_EXPIRES_IN"`
Database Database `json:"database" envPrefix:"DB_"`
Scheme Scheme `json:"scheme"`
TempDir string `json:"temp_dir" env:"TEMP_DIR"`
BleveDir string `json:"bleve_dir" env:"BLEVE_DIR"`
DistDir string `json:"dist_dir"`
Log LogConfig `json:"log"`
DelayedStart int `json:"delayed_start" env:"DELAYED_START"`
MaxConnections int `json:"max_connections" env:"MAX_CONNECTIONS"`
TlsInsecureSkipVerify bool `json:"tls_insecure_skip_verify" env:"TLS_INSECURE_SKIP_VERIFY"`
Tasks TasksConfig `json:"tasks" envPrefix:"TASKS_"`
}
func DefaultConfig() *Config {
@ -90,5 +103,22 @@ func DefaultConfig() *Config {
},
MaxConnections: 0,
TlsInsecureSkipVerify: true,
Tasks: TasksConfig{
Download: TaskConfig{
Workers: 5,
MaxRetry: 1,
},
Transfer: TaskConfig{
Workers: 5,
MaxRetry: 2,
},
Upload: TaskConfig{
Workers: 5,
},
Copy: TaskConfig{
Workers: 5,
MaxRetry: 2,
},
},
}
}

View File

@ -35,7 +35,7 @@ func (t *CopyTask) Run() error {
return copyBetween2Storages(t, t.srcStorage, t.dstStorage, t.srcObjPath, t.dstDirPath)
}
var CopyTaskManager = tache.NewManager[*CopyTask]()
var CopyTaskManager *tache.Manager[*CopyTask]
// Copy if in the same storage, call move method
// if not, add copy task

View File

@ -30,7 +30,7 @@ func (t *UploadTask) Run() error {
return op.Put(t.Ctx(), t.storage, t.dstDirActualPath, t.file, t.SetProgress, true)
}
var UploadTaskManager = tache.NewManager[*UploadTask]()
var UploadTaskManager *tache.Manager[*UploadTask]
// putAsTask add as a put task and return immediately
func putAsTask(dstDirPath string, file model.FileStreamer) error {

View File

@ -143,5 +143,5 @@ func (t *DownloadTask) GetStatus() string {
}
var (
DownloadTaskManager *tache.Manager[*DownloadTask] = tache.NewManager[*DownloadTask]()
DownloadTaskManager *tache.Manager[*DownloadTask]
)

View File

@ -62,5 +62,5 @@ func (t *TransferTask) GetStatus() string {
}
var (
TransferTaskManager *tache.Manager[*TransferTask] = tache.NewManager[*TransferTask]()
TransferTaskManager *tache.Manager[*TransferTask]
)

View File

@ -18,13 +18,17 @@ type TaskInfo struct {
}
func getTaskInfo[T tache.TaskWithInfo](task T) TaskInfo {
errMsg := ""
if task.GetErr() != nil {
errMsg = task.GetErr().Error()
}
return TaskInfo{
ID: task.GetID(),
Name: task.GetName(),
State: task.GetState(),
Status: task.GetStatus(),
Progress: task.GetProgress(),
Error: task.GetErr().Error(),
Error: errMsg,
}
}