feat: persistant Task (#6925 close #5313)

pull/6958/head
itsHenry 2024-08-07 12:16:21 +08:00 committed by GitHub
parent f2727095d9
commit 74f8295960
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
15 changed files with 201 additions and 56 deletions

2
go.mod
View File

@ -55,7 +55,7 @@ require (
github.com/u2takey/ffmpeg-go v0.5.0 github.com/u2takey/ffmpeg-go v0.5.0
github.com/upyun/go-sdk/v3 v3.0.4 github.com/upyun/go-sdk/v3 v3.0.4
github.com/winfsp/cgofuse v1.5.1-0.20230130140708-f87f5db493b5 github.com/winfsp/cgofuse v1.5.1-0.20230130140708-f87f5db493b5
github.com/xhofe/tache v0.1.1 github.com/xhofe/tache v0.1.2
github.com/xhofe/wopan-sdk-go v0.1.3 github.com/xhofe/wopan-sdk-go v0.1.3
github.com/zzzhr1990/go-common-entity v0.0.0-20221216044934-fd1c571e3a22 github.com/zzzhr1990/go-common-entity v0.0.0-20221216044934-fd1c571e3a22
golang.org/x/crypto v0.25.0 golang.org/x/crypto v0.25.0

2
go.sum
View File

@ -506,6 +506,8 @@ github.com/xhofe/gsync v0.0.0-20230917091818-2111ceb38a25 h1:eDfebW/yfq9DtG9RO3K
github.com/xhofe/gsync v0.0.0-20230917091818-2111ceb38a25/go.mod h1:fH4oNm5F9NfI5dLi0oIMtsLNKQOirUDbEMCIBb/7SU0= github.com/xhofe/gsync v0.0.0-20230917091818-2111ceb38a25/go.mod h1:fH4oNm5F9NfI5dLi0oIMtsLNKQOirUDbEMCIBb/7SU0=
github.com/xhofe/tache v0.1.1 h1:O5QY4cVjIGELx3UGh6LbVAc18MWGXgRNQjMt72x6w/8= github.com/xhofe/tache v0.1.1 h1:O5QY4cVjIGELx3UGh6LbVAc18MWGXgRNQjMt72x6w/8=
github.com/xhofe/tache v0.1.1/go.mod h1:iKumPFvywf30FRpAHHCt64G0JHLMzT0K+wyGedHsmTQ= github.com/xhofe/tache v0.1.1/go.mod h1:iKumPFvywf30FRpAHHCt64G0JHLMzT0K+wyGedHsmTQ=
github.com/xhofe/tache v0.1.2 h1:pHrXlrWcbTb4G7hVUDW7Rc+YTUnLJvnLBrdktVE1Fqg=
github.com/xhofe/tache v0.1.2/go.mod h1:iKumPFvywf30FRpAHHCt64G0JHLMzT0K+wyGedHsmTQ=
github.com/xhofe/wopan-sdk-go v0.1.3 h1:J58X6v+n25ewBZjb05pKOr7AWGohb+Rdll4CThGh6+A= github.com/xhofe/wopan-sdk-go v0.1.3 h1:J58X6v+n25ewBZjb05pKOr7AWGohb+Rdll4CThGh6+A=
github.com/xhofe/wopan-sdk-go v0.1.3/go.mod h1:dcY9yA28fnaoZPnXZiVTFSkcd7GnIPTpTIIlfSI5z5Q= github.com/xhofe/wopan-sdk-go v0.1.3/go.mod h1:dcY9yA28fnaoZPnXZiVTFSkcd7GnIPTpTIIlfSI5z5Q=
github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e h1:JVG44RsyaB9T2KIHavMF/ppJZNG9ZpyihvCd0w101no= github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e h1:JVG44RsyaB9T2KIHavMF/ppJZNG9ZpyihvCd0w101no=

View File

@ -68,11 +68,7 @@ func InitConfig() {
} }
conf.Conf.TempDir = absPath conf.Conf.TempDir = absPath
} }
err := os.RemoveAll(filepath.Join(conf.Conf.TempDir)) err := os.MkdirAll(conf.Conf.TempDir, 0o777)
if err != nil {
log.Errorln("failed delete temp file:", err)
}
err = os.MkdirAll(conf.Conf.TempDir, 0o777)
if err != nil { if err != nil {
log.Fatalf("create temp dir error: %+v", err) log.Fatalf("create temp dir error: %+v", err)
} }
@ -104,3 +100,9 @@ func initURL() {
} }
conf.URL = u conf.URL = u
} }
func CleanTempDir() {
if err := os.RemoveAll(conf.Conf.TempDir); err != nil {
log.Errorln("failed delete temp file: ", err)
}
}

View File

@ -5,6 +5,7 @@ import "github.com/alist-org/alist/v3/cmd/flags"
func InitData() { func InitData() {
initUser() initUser()
initSettings() initSettings()
initTasks()
if flags.Dev { if flags.Dev {
initDevData() initDevData()
initDevDo() initDevDo()

View File

@ -0,0 +1,29 @@
package data
import (
"github.com/alist-org/alist/v3/internal/db"
"github.com/alist-org/alist/v3/internal/model"
)
var initialTaskItems []model.TaskItem
func initTasks() {
InitialTasks()
for i := range initialTaskItems {
item := &initialTaskItems[i]
taskitem, _ := db.GetTaskDataByType(item.Key)
if taskitem == nil {
db.CreateTaskData(item)
}
}
}
func InitialTasks() []model.TaskItem {
initialTaskItems = []model.TaskItem{
{Key: "copy", PersistData: "[]"},
{Key: "download", PersistData: "[]"},
{Key: "transfer", PersistData: "[]"},
}
return initialTaskItems
}

View File

@ -2,14 +2,18 @@ package bootstrap
import ( import (
"github.com/alist-org/alist/v3/internal/conf" "github.com/alist-org/alist/v3/internal/conf"
"github.com/alist-org/alist/v3/internal/db"
"github.com/alist-org/alist/v3/internal/fs" "github.com/alist-org/alist/v3/internal/fs"
"github.com/alist-org/alist/v3/internal/offline_download/tool" "github.com/alist-org/alist/v3/internal/offline_download/tool"
"github.com/xhofe/tache" "github.com/xhofe/tache"
) )
func InitTaskManager() { func InitTaskManager() {
fs.UploadTaskManager = tache.NewManager[*fs.UploadTask](tache.WithWorks(conf.Conf.Tasks.Upload.Workers), tache.WithMaxRetry(conf.Conf.Tasks.Upload.MaxRetry)) fs.UploadTaskManager = tache.NewManager[*fs.UploadTask](tache.WithWorks(conf.Conf.Tasks.Upload.Workers), tache.WithMaxRetry(conf.Conf.Tasks.Upload.MaxRetry)) //upload will not support persist
fs.CopyTaskManager = tache.NewManager[*fs.CopyTask](tache.WithWorks(conf.Conf.Tasks.Copy.Workers), tache.WithMaxRetry(conf.Conf.Tasks.Copy.MaxRetry)) fs.CopyTaskManager = tache.NewManager[*fs.CopyTask](tache.WithWorks(conf.Conf.Tasks.Copy.Workers), tache.WithPersistFunction(db.GetTaskDataFunc("copy", conf.Conf.Tasks.Copy.TaskPersistant), db.UpdateTaskDataFunc("copy", conf.Conf.Tasks.Copy.TaskPersistant)), tache.WithMaxRetry(conf.Conf.Tasks.Copy.MaxRetry))
tool.DownloadTaskManager = tache.NewManager[*tool.DownloadTask](tache.WithWorks(conf.Conf.Tasks.Download.Workers), tache.WithMaxRetry(conf.Conf.Tasks.Download.MaxRetry)) tool.DownloadTaskManager = tache.NewManager[*tool.DownloadTask](tache.WithWorks(conf.Conf.Tasks.Download.Workers), tache.WithPersistFunction(db.GetTaskDataFunc("download", conf.Conf.Tasks.Download.TaskPersistant), db.UpdateTaskDataFunc("download", conf.Conf.Tasks.Download.TaskPersistant)), tache.WithMaxRetry(conf.Conf.Tasks.Download.MaxRetry))
tool.TransferTaskManager = tache.NewManager[*tool.TransferTask](tache.WithWorks(conf.Conf.Tasks.Transfer.Workers), tache.WithMaxRetry(conf.Conf.Tasks.Transfer.MaxRetry)) tool.TransferTaskManager = tache.NewManager[*tool.TransferTask](tache.WithWorks(conf.Conf.Tasks.Transfer.Workers), tache.WithPersistFunction(db.GetTaskDataFunc("transfer", conf.Conf.Tasks.Transfer.TaskPersistant), db.UpdateTaskDataFunc("transfer", conf.Conf.Tasks.Transfer.TaskPersistant)), tache.WithMaxRetry(conf.Conf.Tasks.Transfer.MaxRetry))
if len(tool.TransferTaskManager.GetAll()) == 0 { //prevent offline downloaded files from being deleted
CleanTempDir()
}
} }

View File

@ -49,6 +49,7 @@ type LogConfig struct {
type TaskConfig struct { type TaskConfig struct {
Workers int `json:"workers" env:"WORKERS"` Workers int `json:"workers" env:"WORKERS"`
MaxRetry int `json:"max_retry" env:"MAX_RETRY"` MaxRetry int `json:"max_retry" env:"MAX_RETRY"`
TaskPersistant bool `json:"task_persistant" env:"TASK_PERSISTANT"`
} }
type TasksConfig struct { type TasksConfig struct {
@ -132,10 +133,12 @@ func DefaultConfig() *Config {
Download: TaskConfig{ Download: TaskConfig{
Workers: 5, Workers: 5,
MaxRetry: 1, MaxRetry: 1,
TaskPersistant: true,
}, },
Transfer: TaskConfig{ Transfer: TaskConfig{
Workers: 5, Workers: 5,
MaxRetry: 2, MaxRetry: 2,
TaskPersistant: true,
}, },
Upload: TaskConfig{ Upload: TaskConfig{
Workers: 5, Workers: 5,
@ -143,6 +146,7 @@ func DefaultConfig() *Config {
Copy: TaskConfig{ Copy: TaskConfig{
Workers: 5, Workers: 5,
MaxRetry: 2, MaxRetry: 2,
TaskPersistant: true,
}, },
}, },
Cors: Cors{ Cors: Cors{

View File

@ -12,7 +12,7 @@ var db *gorm.DB
func Init(d *gorm.DB) { func Init(d *gorm.DB) {
db = d db = d
err := AutoMigrate(new(model.Storage), new(model.User), new(model.Meta), new(model.SettingItem), new(model.SearchNode)) err := AutoMigrate(new(model.Storage), new(model.User), new(model.Meta), new(model.SettingItem), new(model.SearchNode), new(model.TaskItem))
if err != nil { if err != nil {
log.Fatalf("failed migrate database: %s", err.Error()) log.Fatalf("failed migrate database: %s", err.Error())
} }

48
internal/db/tasks.go Normal file
View File

@ -0,0 +1,48 @@
package db
import (
"github.com/alist-org/alist/v3/internal/model"
"github.com/pkg/errors"
)
func GetTaskDataByType(type_s string) (*model.TaskItem, error) {
task := model.TaskItem{Key: type_s}
if err := db.Where(task).First(&task).Error; err != nil {
return nil, errors.Wrapf(err, "failed find task")
}
return &task, nil
}
func UpdateTaskData(t *model.TaskItem) error {
return errors.WithStack(db.Model(&model.TaskItem{}).Where("key = ?", t.Key).Update("persist_data", t.PersistData).Error)
}
func CreateTaskData(t *model.TaskItem) error {
return errors.WithStack(db.Create(t).Error)
}
func GetTaskDataFunc(type_s string, enabled bool) func() ([]byte, error) {
if !enabled {
return nil
}
task, err := GetTaskDataByType(type_s)
if err != nil {
return nil
}
return func() ([]byte, error) {
return []byte(task.PersistData), nil
}
}
func UpdateTaskDataFunc(type_s string, enabled bool) func([]byte) error {
if !enabled {
return nil
}
return func(data []byte) error {
s := string(data)
if s == "null" || s == "" {
s = "[]"
}
return UpdateTaskData(&model.TaskItem{Key: type_s, PersistData: s})
}
}

View File

@ -3,6 +3,9 @@ package fs
import ( import (
"context" "context"
"fmt" "fmt"
"net/http"
stdpath "path"
"github.com/alist-org/alist/v3/internal/conf" "github.com/alist-org/alist/v3/internal/conf"
"github.com/alist-org/alist/v3/internal/driver" "github.com/alist-org/alist/v3/internal/driver"
"github.com/alist-org/alist/v3/internal/model" "github.com/alist-org/alist/v3/internal/model"
@ -11,20 +14,21 @@ import (
"github.com/alist-org/alist/v3/pkg/utils" "github.com/alist-org/alist/v3/pkg/utils"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/xhofe/tache" "github.com/xhofe/tache"
"net/http"
stdpath "path"
) )
type CopyTask struct { type CopyTask struct {
tache.Base tache.Base
Status string `json:"status"` Status string `json:"-"` //don't save status to save space
srcStorage, dstStorage driver.Driver SrcObjPath string `json:"src_path"`
srcObjPath, dstDirPath string DstDirPath string `json:"dst_path"`
srcStorage driver.Driver `json:"-"`
dstStorage driver.Driver `json:"-"`
SrcStorageMp string `json:"src_storage_mp"`
DstStorageMp string `json:"dst_storage_mp"`
} }
func (t *CopyTask) GetName() string { func (t *CopyTask) GetName() string {
return fmt.Sprintf("copy [%s](%s) to [%s](%s)", return fmt.Sprintf("copy [%s](%s) to [%s](%s)", t.SrcStorageMp, t.SrcObjPath, t.DstStorageMp, t.DstDirPath)
t.srcStorage.GetStorage().MountPath, t.srcObjPath, t.dstStorage.GetStorage().MountPath, t.dstDirPath)
} }
func (t *CopyTask) GetStatus() string { func (t *CopyTask) GetStatus() string {
@ -32,7 +36,17 @@ func (t *CopyTask) GetStatus() string {
} }
func (t *CopyTask) Run() error { func (t *CopyTask) Run() error {
return copyBetween2Storages(t, t.srcStorage, t.dstStorage, t.srcObjPath, t.dstDirPath) var err error
if t.srcStorage == nil {
t.srcStorage, err = op.GetStorageByMountPath(t.SrcStorageMp)
}
if t.dstStorage == nil {
t.dstStorage, err = op.GetStorageByMountPath(t.DstStorageMp)
}
if err != nil {
return errors.WithMessage(err, "failed get storage")
}
return copyBetween2Storages(t, t.srcStorage, t.dstStorage, t.SrcObjPath, t.DstDirPath)
} }
var CopyTaskManager *tache.Manager[*CopyTask] var CopyTaskManager *tache.Manager[*CopyTask]
@ -81,8 +95,10 @@ func _copy(ctx context.Context, srcObjPath, dstDirPath string, lazyCache ...bool
t := &CopyTask{ t := &CopyTask{
srcStorage: srcStorage, srcStorage: srcStorage,
dstStorage: dstStorage, dstStorage: dstStorage,
srcObjPath: srcObjActualPath, SrcObjPath: srcObjActualPath,
dstDirPath: dstDirActualPath, DstDirPath: dstDirActualPath,
SrcStorageMp: srcStorage.GetStorage().MountPath,
DstStorageMp: dstStorage.GetStorage().MountPath,
} }
CopyTaskManager.Add(t) CopyTaskManager.Add(t)
return t, nil return t, nil
@ -109,8 +125,10 @@ func copyBetween2Storages(t *CopyTask, srcStorage, dstStorage driver.Driver, src
CopyTaskManager.Add(&CopyTask{ CopyTaskManager.Add(&CopyTask{
srcStorage: srcStorage, srcStorage: srcStorage,
dstStorage: dstStorage, dstStorage: dstStorage,
srcObjPath: srcObjPath, SrcObjPath: srcObjPath,
dstDirPath: dstObjPath, DstDirPath: dstObjPath,
SrcStorageMp: srcStorage.GetStorage().MountPath,
DstStorageMp: dstStorage.GetStorage().MountPath,
}) })
} }
t.Status = "src object is dir, added all copy tasks of objs" t.Status = "src object is dir, added all copy tasks of objs"

6
internal/model/task.go Normal file
View File

@ -0,0 +1,6 @@
package model
type TaskItem struct {
Key string `json:"key"`
PersistData string `gorm:"type:text" json:"persist_data"`
}

View File

@ -76,6 +76,7 @@ func AddURL(ctx context.Context, args *AddURLArgs) (tache.TaskWithInfo, error) {
DstDirPath: args.DstDirPath, DstDirPath: args.DstDirPath,
TempDir: tempDir, TempDir: tempDir,
DeletePolicy: deletePolicy, DeletePolicy: deletePolicy,
Toolname: args.Tool,
tool: tool, tool: tool,
} }
DownloadTaskManager.Add(t) DownloadTaskManager.Add(t)

View File

@ -18,8 +18,8 @@ type DownloadTask struct {
DstDirPath string `json:"dst_dir_path"` DstDirPath string `json:"dst_dir_path"`
TempDir string `json:"temp_dir"` TempDir string `json:"temp_dir"`
DeletePolicy DeletePolicy `json:"delete_policy"` DeletePolicy DeletePolicy `json:"delete_policy"`
Toolname string `json:"toolname"`
Status string `json:"status"` Status string `json:"-"`
Signal chan int `json:"-"` Signal chan int `json:"-"`
GID string `json:"-"` GID string `json:"-"`
tool Tool tool Tool
@ -27,6 +27,13 @@ type DownloadTask struct {
} }
func (t *DownloadTask) Run() error { func (t *DownloadTask) Run() error {
if t.tool == nil {
tool, err := Tools.Get(t.Toolname)
if err != nil {
return errors.WithMessage(err, "failed get tool")
}
t.tool = tool
}
if err := t.tool.Run(t); !errs.IsNotSupportError(err) { if err := t.tool.Run(t); !errs.IsNotSupportError(err) {
if err == nil { if err == nil {
return t.Complete() return t.Complete()
@ -142,9 +149,10 @@ func (t *DownloadTask) Complete() error {
file := files[i] file := files[i]
TransferTaskManager.Add(&TransferTask{ TransferTaskManager.Add(&TransferTask{
file: file, file: file,
dstDirPath: t.DstDirPath, DstDirPath: t.DstDirPath,
tempDir: t.TempDir, TempDir: t.TempDir,
deletePolicy: t.DeletePolicy, DeletePolicy: t.DeletePolicy,
FileDir: file.Path,
}) })
} }
return nil return nil

View File

@ -2,6 +2,9 @@ package tool
import ( import (
"fmt" "fmt"
"os"
"path/filepath"
"github.com/alist-org/alist/v3/internal/model" "github.com/alist-org/alist/v3/internal/model"
"github.com/alist-org/alist/v3/internal/op" "github.com/alist-org/alist/v3/internal/op"
"github.com/alist-org/alist/v3/internal/stream" "github.com/alist-org/alist/v3/internal/stream"
@ -9,21 +12,27 @@ import (
"github.com/pkg/errors" "github.com/pkg/errors"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/xhofe/tache" "github.com/xhofe/tache"
"os"
"path/filepath"
) )
type TransferTask struct { type TransferTask struct {
tache.Base tache.Base
FileDir string `json:"file_dir"`
DstDirPath string `json:"dst_dir_path"`
TempDir string `json:"temp_dir"`
DeletePolicy DeletePolicy `json:"delete_policy"`
file File file File
dstDirPath string
tempDir string
deletePolicy DeletePolicy
} }
func (t *TransferTask) Run() error { func (t *TransferTask) Run() error {
// check dstDir again // check dstDir again
storage, dstDirActualPath, err := op.GetStorageAndActualPath(t.dstDirPath) var err error
if (t.file == File{}) {
t.file, err = GetFile(t.FileDir)
if err != nil {
return errors.Wrapf(err, "failed to get file %s", t.FileDir)
}
}
storage, dstDirActualPath, err := op.GetStorageAndActualPath(t.DstDirPath)
if err != nil { if err != nil {
return errors.WithMessage(err, "failed get storage") return errors.WithMessage(err, "failed get storage")
} }
@ -44,7 +53,7 @@ func (t *TransferTask) Run() error {
Mimetype: mimetype, Mimetype: mimetype,
Closers: utils.NewClosers(rc), Closers: utils.NewClosers(rc),
} }
relDir, err := filepath.Rel(t.tempDir, filepath.Dir(t.file.Path)) relDir, err := filepath.Rel(t.TempDir, filepath.Dir(t.file.Path))
if err != nil { if err != nil {
log.Errorf("find relation directory error: %v", err) log.Errorf("find relation directory error: %v", err)
} }
@ -53,7 +62,7 @@ func (t *TransferTask) Run() error {
} }
func (t *TransferTask) GetName() string { func (t *TransferTask) GetName() string {
return fmt.Sprintf("transfer %s to [%s]", t.file.Path, t.dstDirPath) return fmt.Sprintf("transfer %s to [%s]", t.file.Path, t.DstDirPath)
} }
func (t *TransferTask) GetStatus() string { func (t *TransferTask) GetStatus() string {
@ -61,7 +70,7 @@ func (t *TransferTask) GetStatus() string {
} }
func (t *TransferTask) OnSucceeded() { func (t *TransferTask) OnSucceeded() {
if t.deletePolicy == DeleteOnUploadSucceed || t.deletePolicy == DeleteAlways { if t.DeletePolicy == DeleteOnUploadSucceed || t.DeletePolicy == DeleteAlways {
err := os.Remove(t.file.Path) err := os.Remove(t.file.Path)
if err != nil { if err != nil {
log.Errorf("failed to delete file %s, error: %s", t.file.Path, err.Error()) log.Errorf("failed to delete file %s, error: %s", t.file.Path, err.Error())
@ -70,7 +79,7 @@ func (t *TransferTask) OnSucceeded() {
} }
func (t *TransferTask) OnFailed() { func (t *TransferTask) OnFailed() {
if t.deletePolicy == DeleteOnUploadFailed || t.deletePolicy == DeleteAlways { if t.DeletePolicy == DeleteOnUploadFailed || t.DeletePolicy == DeleteAlways {
err := os.Remove(t.file.Path) err := os.Remove(t.file.Path)
if err != nil { if err != nil {
log.Errorf("failed to delete file %s, error: %s", t.file.Path, err.Error()) log.Errorf("failed to delete file %s, error: %s", t.file.Path, err.Error())

View File

@ -26,3 +26,16 @@ func GetFiles(dir string) ([]File, error) {
} }
return files, nil return files, nil
} }
func GetFile(path string) (File, error) {
info, err := os.Stat(path)
if err != nil {
return File{}, err
}
return File{
Name: info.Name(),
Size: info.Size(),
Path: path,
Modified: info.ModTime(),
}, nil
}