diff --git a/go.mod b/go.mod index c6a6bc21..e81d863f 100644 --- a/go.mod +++ b/go.mod @@ -123,6 +123,7 @@ require ( github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect github.com/jackc/pgx/v5 v5.3.0 // indirect + github.com/jaevor/go-nanoid v1.3.0 // indirect github.com/jinzhu/inflection v1.0.0 // indirect github.com/jinzhu/now v1.1.5 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect @@ -183,6 +184,8 @@ require ( github.com/u2takey/go-utils v0.3.1 // indirect github.com/ugorji/go/codec v1.2.11 // indirect github.com/x448/float16 v0.8.4 // indirect + github.com/xhofe/gsync v0.0.0-20230917091818-2111ceb38a25 // indirect + github.com/xhofe/tache v0.0.0-20231120085916-722855be0521 // indirect github.com/yusufpapurcu/wmi v1.2.3 // indirect go.etcd.io/bbolt v1.3.7 // indirect golang.org/x/arch v0.3.0 // indirect diff --git a/go.sum b/go.sum index 5abe5aef..7d1ccf7b 100644 --- a/go.sum +++ b/go.sum @@ -221,6 +221,8 @@ github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZ github.com/jackc/pgx/v5 v5.3.0 h1:/NQi8KHMpKWHInxXesC8yD4DhkXPrVhmnwYkjp9AmBA= github.com/jackc/pgx/v5 v5.3.0/go.mod h1:t3JDKnCBlYIc0ewLF0Q7B8MXmoIaBOZj/ic7iHozM/8= github.com/jackc/puddle/v2 v2.2.0/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= +github.com/jaevor/go-nanoid v1.3.0 h1:nD+iepesZS6pr3uOVf20vR9GdGgJW1HPaR46gtrxzkg= +github.com/jaevor/go-nanoid v1.3.0/go.mod h1:SI+jFaPuddYkqkVQoNGHs81navCtH388TcrH0RqFKgY= github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E= github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc= github.com/jinzhu/now v1.1.4/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8= @@ -433,6 +435,16 @@ github.com/winfsp/cgofuse v1.5.1-0.20230130140708-f87f5db493b5 h1:jxZvjx8Ve5sOXo github.com/winfsp/cgofuse v1.5.1-0.20230130140708-f87f5db493b5/go.mod h1:uxjoF2jEYT3+x+vC2KJddEGdk/LU8pRowXmyVMHSV5I= github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM= github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg= +github.com/xhofe/gsync v0.0.0-20230917091818-2111ceb38a25 h1:eDfebW/yfq9DtG9RO3KP7BT2dot2CvJGIvrB0NEoDXI= +github.com/xhofe/gsync v0.0.0-20230917091818-2111ceb38a25/go.mod h1:fH4oNm5F9NfI5dLi0oIMtsLNKQOirUDbEMCIBb/7SU0= +github.com/xhofe/tache v0.0.0-20231110075853-2bd4b52dad9b h1:958N/31ioR0QSg6RarX1aqBsfmlOI2JeYiVzxeGdUAA= +github.com/xhofe/tache v0.0.0-20231110075853-2bd4b52dad9b/go.mod h1:1ISbKrHZNMMrXvgCdaFV0Vkc9Wbo7WV1q7Teovm4Huc= +github.com/xhofe/tache v0.0.0-20231119124711-c417893fc267 h1:MC271sH8UHYqr/IDz9PsqTlyD51HyFvxtQRTemwxR9s= +github.com/xhofe/tache v0.0.0-20231119124711-c417893fc267/go.mod h1:iKumPFvywf30FRpAHHCt64G0JHLMzT0K+wyGedHsmTQ= +github.com/xhofe/tache v0.0.0-20231120064353-a3585a237e25 h1:XZBuEzDB9Kqni/+zAKxl30iOdp80/GavUsCkPMiQMjg= +github.com/xhofe/tache v0.0.0-20231120064353-a3585a237e25/go.mod h1:iKumPFvywf30FRpAHHCt64G0JHLMzT0K+wyGedHsmTQ= +github.com/xhofe/tache v0.0.0-20231120085916-722855be0521 h1:m7O+xOqQRysjFngMhQ39RzCFdiCouFLvsrV7N2ScbUY= +github.com/xhofe/tache v0.0.0-20231120085916-722855be0521/go.mod h1:iKumPFvywf30FRpAHHCt64G0JHLMzT0K+wyGedHsmTQ= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= github.com/yusufpapurcu/wmi v1.2.3 h1:E1ctvB7uKFMOJw3fdOW32DwGE9I7t++CRUEMKvFoFiw= github.com/yusufpapurcu/wmi v1.2.3/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= diff --git a/internal/fs/copy.go b/internal/fs/copy.go index c3e387cb..911e528a 100644 --- a/internal/fs/copy.go +++ b/internal/fs/copy.go @@ -3,24 +3,39 @@ package fs import ( "context" "fmt" - "net/http" - stdpath "path" - "sync/atomic" - "github.com/alist-org/alist/v3/internal/conf" "github.com/alist-org/alist/v3/internal/driver" "github.com/alist-org/alist/v3/internal/model" "github.com/alist-org/alist/v3/internal/op" "github.com/alist-org/alist/v3/internal/stream" - "github.com/alist-org/alist/v3/pkg/task" "github.com/alist-org/alist/v3/pkg/utils" "github.com/pkg/errors" - log "github.com/sirupsen/logrus" + "github.com/xhofe/tache" + "net/http" + stdpath "path" ) -var CopyTaskManager = task.NewTaskManager(3, func(tid *uint64) { - atomic.AddUint64(tid, 1) -}) +type CopyTask struct { + tache.Base + Status string `json:"status"` + srcStorage, dstStorage driver.Driver + srcObjPath, dstDirPath string +} + +func (t *CopyTask) GetName() string { + return fmt.Sprintf("copy [%s](%s) to [%s](%s)", + t.srcStorage.GetStorage().MountPath, t.srcObjPath, t.dstStorage.GetStorage().MountPath, t.dstDirPath) +} + +func (t *CopyTask) GetStatus() string { + return t.Status +} + +func (t *CopyTask) Run() error { + return copyBetween2Storages(t, t.srcStorage, t.dstStorage, t.srcObjPath, t.dstDirPath) +} + +var CopyTaskManager = tache.NewManager[*CopyTask]() // Copy if in the same storage, call move method // if not, add copy task @@ -63,59 +78,52 @@ func _copy(ctx context.Context, srcObjPath, dstDirPath string, lazyCache ...bool } } // not in the same storage - CopyTaskManager.Submit(task.WithCancelCtx(&task.Task[uint64]{ - Name: fmt.Sprintf("copy [%s](%s) to [%s](%s)", srcStorage.GetStorage().MountPath, srcObjActualPath, dstStorage.GetStorage().MountPath, dstDirActualPath), - Func: func(task *task.Task[uint64]) error { - return copyBetween2Storages(task, srcStorage, dstStorage, srcObjActualPath, dstDirActualPath) - }, - })) + CopyTaskManager.Add(&CopyTask{ + srcStorage: srcStorage, + dstStorage: dstStorage, + srcObjPath: srcObjActualPath, + dstDirPath: dstDirActualPath, + }) return true, nil } -func copyBetween2Storages(t *task.Task[uint64], srcStorage, dstStorage driver.Driver, srcObjPath, dstDirPath string) error { - t.SetStatus("getting src object") - srcObj, err := op.Get(t.Ctx, srcStorage, srcObjPath) +func copyBetween2Storages(t *CopyTask, srcStorage, dstStorage driver.Driver, srcObjPath, dstDirPath string) error { + t.Status = "getting src object" + srcObj, err := op.Get(t.Ctx(), srcStorage, srcObjPath) if err != nil { return errors.WithMessagef(err, "failed get src [%s] file", srcObjPath) } if srcObj.IsDir() { - t.SetStatus("src object is dir, listing objs") - objs, err := op.List(t.Ctx, srcStorage, srcObjPath, model.ListArgs{}) + t.Status = "src object is dir, listing objs" + objs, err := op.List(t.Ctx(), srcStorage, srcObjPath, model.ListArgs{}) if err != nil { return errors.WithMessagef(err, "failed list src [%s] objs", srcObjPath) } for _, obj := range objs { - if utils.IsCanceled(t.Ctx) { + if utils.IsCanceled(t.Ctx()) { return nil } srcObjPath := stdpath.Join(srcObjPath, obj.GetName()) dstObjPath := stdpath.Join(dstDirPath, srcObj.GetName()) - CopyTaskManager.Submit(task.WithCancelCtx(&task.Task[uint64]{ - Name: fmt.Sprintf("copy [%s](%s) to [%s](%s)", srcStorage.GetStorage().MountPath, srcObjPath, dstStorage.GetStorage().MountPath, dstObjPath), - Func: func(t *task.Task[uint64]) error { - return copyBetween2Storages(t, srcStorage, dstStorage, srcObjPath, dstObjPath) - }, - })) + CopyTaskManager.Add(&CopyTask{ + srcStorage: srcStorage, + dstStorage: dstStorage, + srcObjPath: srcObjPath, + dstDirPath: dstObjPath, + }) } - } else { - CopyTaskManager.Submit(task.WithCancelCtx(&task.Task[uint64]{ - Name: fmt.Sprintf("copy [%s](%s) to [%s](%s)", srcStorage.GetStorage().MountPath, srcObjPath, dstStorage.GetStorage().MountPath, dstDirPath), - Func: func(t *task.Task[uint64]) error { - err := copyFileBetween2Storages(t, srcStorage, dstStorage, srcObjPath, dstDirPath) - log.Debugf("copy file between storages: %+v", err) - return err - }, - })) + t.Status = "src object is dir, added all copy tasks of objs" + return nil } - return nil + return copyFileBetween2Storages(t, srcStorage, dstStorage, srcObjPath, dstDirPath) } -func copyFileBetween2Storages(tsk *task.Task[uint64], srcStorage, dstStorage driver.Driver, srcFilePath, dstDirPath string) error { - srcFile, err := op.Get(tsk.Ctx, srcStorage, srcFilePath) +func copyFileBetween2Storages(tsk *CopyTask, srcStorage, dstStorage driver.Driver, srcFilePath, dstDirPath string) error { + srcFile, err := op.Get(tsk.Ctx(), srcStorage, srcFilePath) if err != nil { return errors.WithMessagef(err, "failed get src [%s] file", srcFilePath) } - link, _, err := op.Link(tsk.Ctx, srcStorage, srcFilePath, model.LinkArgs{ + link, _, err := op.Link(tsk.Ctx(), srcStorage, srcFilePath, model.LinkArgs{ Header: http.Header{}, }) if err != nil { @@ -123,12 +131,12 @@ func copyFileBetween2Storages(tsk *task.Task[uint64], srcStorage, dstStorage dri } fs := stream.FileStream{ Obj: srcFile, - Ctx: tsk.Ctx, + Ctx: tsk.Ctx(), } // any link provided is seekable ss, err := stream.NewSeekableStream(fs, link) if err != nil { return errors.WithMessagef(err, "failed get [%s] stream", srcFilePath) } - return op.Put(tsk.Ctx, dstStorage, dstDirPath, ss, tsk.SetProgress, true) + return op.Put(tsk.Ctx(), dstStorage, dstDirPath, ss, tsk.SetProgress, true) } diff --git a/internal/fs/put.go b/internal/fs/put.go index ab6d24bf..5c154756 100644 --- a/internal/fs/put.go +++ b/internal/fs/put.go @@ -3,18 +3,34 @@ package fs import ( "context" "fmt" - "github.com/alist-org/alist/v3/internal/model" - "sync/atomic" - + "github.com/alist-org/alist/v3/internal/driver" "github.com/alist-org/alist/v3/internal/errs" + "github.com/alist-org/alist/v3/internal/model" "github.com/alist-org/alist/v3/internal/op" - "github.com/alist-org/alist/v3/pkg/task" "github.com/pkg/errors" + "github.com/xhofe/tache" ) -var UploadTaskManager = task.NewTaskManager(3, func(tid *uint64) { - atomic.AddUint64(tid, 1) -}) +type UploadTask struct { + tache.Base + storage driver.Driver + dstDirActualPath string + file model.FileStreamer +} + +func (t *UploadTask) GetName() string { + return fmt.Sprintf("upload %s to [%s](%s)", t.file.GetName(), t.storage.GetStorage().MountPath, t.dstDirActualPath) +} + +func (t *UploadTask) GetStatus() string { + return "uploading" +} + +func (t *UploadTask) Run() error { + return op.Put(t.Ctx(), t.storage, t.dstDirActualPath, t.file, t.SetProgress, true) +} + +var UploadTaskManager = tache.NewManager[*UploadTask]() // putAsTask add as a put task and return immediately func putAsTask(dstDirPath string, file model.FileStreamer) error { @@ -33,12 +49,11 @@ func putAsTask(dstDirPath string, file model.FileStreamer) error { //file.SetReader(tempFile) //file.SetTmpFile(tempFile) } - UploadTaskManager.Submit(task.WithCancelCtx(&task.Task[uint64]{ - Name: fmt.Sprintf("upload %s to [%s](%s)", file.GetName(), storage.GetStorage().MountPath, dstDirActualPath), - Func: func(task *task.Task[uint64]) error { - return op.Put(task.Ctx, storage, dstDirActualPath, file, task.SetProgress, true) - }, - })) + UploadTaskManager.Add(&UploadTask{ + storage: storage, + dstDirActualPath: dstDirActualPath, + file: file, + }) return nil } diff --git a/internal/offline_download/aria2/aria2.go b/internal/offline_download/aria2/aria2.go index f2b9628c..4cdad64b 100644 --- a/internal/offline_download/aria2/aria2.go +++ b/internal/offline_download/aria2/aria2.go @@ -21,6 +21,10 @@ type Aria2 struct { client rpc.Client } +func (a *Aria2) Name() string { + return "aria2" +} + func (a *Aria2) Items() []model.SettingItem { // aria2 settings return []model.SettingItem{ @@ -58,16 +62,17 @@ func (a *Aria2) AddURL(args *tool.AddUrlArgs) (string, error) { if err != nil { return "", err } + notify.Signals.Store(gid, args.Signal) return gid, nil } -func (a *Aria2) Remove(tid string) error { - _, err := a.client.Remove(tid) +func (a *Aria2) Remove(task *tool.DownloadTask) error { + _, err := a.client.Remove(task.GID) return err } -func (a *Aria2) Status(tid string) (*tool.Status, error) { - info, err := a.client.TellStatus(tid) +func (a *Aria2) Status(task *tool.DownloadTask) (*tool.Status, error) { + info, err := a.client.TellStatus(task.GID) if err != nil { return nil, err } @@ -85,15 +90,15 @@ func (a *Aria2) Status(tid string) (*tool.Status, error) { } s.Progress = float64(downloaded) / float64(total) * 100 if len(info.FollowedBy) != 0 { - s.NewTID = info.FollowedBy[0] - notify.Signals.Delete(tid) - //notify.Signals.Store(gid, m.c) + s.NewGID = info.FollowedBy[0] + notify.Signals.Delete(task.GID) + notify.Signals.Store(s.NewGID, task.Signal) } switch info.Status { case "complete": s.Completed = true case "error": - s.Err = errors.Errorf("failed to download %s, error: %s", tid, info.ErrorMessage) + s.Err = errors.Errorf("failed to download %s, error: %s", task.GID, info.ErrorMessage) case "active": s.Status = "aria2: " + info.Status if info.Seeder == "true" { @@ -102,32 +107,15 @@ func (a *Aria2) Status(tid string) (*tool.Status, error) { case "waiting", "paused": s.Status = "aria2: " + info.Status case "removed": - s.Err = errors.Errorf("failed to download %s, removed", tid) + s.Err = errors.Errorf("failed to download %s, removed", task.GID) default: return nil, errors.Errorf("[aria2] unknown status %s", info.Status) } return s, nil } -func (a *Aria2) GetFiles(tid string) []tool.File { - //files, err := a.client.GetFiles(tid) - //if err != nil { - // return nil - //} - //return utils.MustSliceConvert(files, func(f rpc.FileInfo) tool.File { - // return tool.File{ - // //ReadCloser: nil, - // Name: path.Base(f.Path), - // Size: f.Length, - // Path: "", - // Modified: time.Time{}, - // } - //}) - return nil -} - var _ tool.Tool = (*Aria2)(nil) func init() { - tool.Tools.Add("aria2", &Aria2{}) + tool.Tools.Add(&Aria2{}) } diff --git a/internal/offline_download/qbit/qbit.go b/internal/offline_download/qbit/qbit.go index 388ce22e..28a5170e 100644 --- a/internal/offline_download/qbit/qbit.go +++ b/internal/offline_download/qbit/qbit.go @@ -13,6 +13,10 @@ type QBittorrent struct { client qbittorrent.Client } +func (a *QBittorrent) Name() string { + return "qBittorrent" +} + func (a *QBittorrent) Items() []model.SettingItem { // qBittorrent settings return []model.SettingItem{ @@ -44,13 +48,13 @@ func (a *QBittorrent) AddURL(args *tool.AddUrlArgs) (string, error) { return args.UID, nil } -func (a *QBittorrent) Remove(tid string) error { - err := a.client.Delete(tid, true) +func (a *QBittorrent) Remove(task *tool.DownloadTask) error { + err := a.client.Delete(task.GID, true) return err } -func (a *QBittorrent) Status(tid string) (*tool.Status, error) { - info, err := a.client.GetInfo(tid) +func (a *QBittorrent) Status(task *tool.DownloadTask) (*tool.Status, error) { + info, err := a.client.GetInfo(task.GID) if err != nil { return nil, err } @@ -62,19 +66,15 @@ func (a *QBittorrent) Status(tid string) (*tool.Status, error) { case qbittorrent.ALLOCATING, qbittorrent.DOWNLOADING, qbittorrent.METADL, qbittorrent.PAUSEDDL, qbittorrent.QUEUEDDL, qbittorrent.STALLEDDL, qbittorrent.CHECKINGDL, qbittorrent.FORCEDDL, qbittorrent.CHECKINGRESUMEDATA, qbittorrent.MOVING: s.Status = "[qBittorrent] downloading" case qbittorrent.ERROR, qbittorrent.MISSINGFILES, qbittorrent.UNKNOWN: - s.Err = errors.Errorf("[qBittorrent] failed to download %s, error: %s", tid, info.State) + s.Err = errors.Errorf("[qBittorrent] failed to download %s, error: %s", task.GID, info.State) default: - s.Err = errors.Errorf("[qBittorrent] unknown error occurred downloading %s", tid) + s.Err = errors.Errorf("[qBittorrent] unknown error occurred downloading %s", task.GID) } return s, nil } -func (a *QBittorrent) GetFiles(tid string) []tool.File { - return nil -} - var _ tool.Tool = (*QBittorrent)(nil) func init() { - tool.Tools.Add("qBittorrent", &QBittorrent{}) + tool.Tools.Add(&QBittorrent{}) } diff --git a/internal/offline_download/tool/add.go b/internal/offline_download/tool/add.go index ceaf92d3..9ad8d055 100644 --- a/internal/offline_download/tool/add.go +++ b/internal/offline_download/tool/add.go @@ -2,21 +2,27 @@ package tool import ( "context" - "fmt" - "path/filepath" - "github.com/alist-org/alist/v3/internal/conf" "github.com/alist-org/alist/v3/internal/errs" "github.com/alist-org/alist/v3/internal/op" - "github.com/alist-org/alist/v3/pkg/task" "github.com/google/uuid" "github.com/pkg/errors" + "path/filepath" +) + +type DeletePolicy string + +const ( + DeleteOnUploadSucceed DeletePolicy = "delete_on_upload_succeed" + DeleteOnUploadFailed DeletePolicy = "delete_on_upload_failed" + DeleteNever DeletePolicy = "delete_never" ) type AddURLArgs struct { - URL string - DstDirPath string - Tool string + URL string + DstDirPath string + Tool string + DeletePolicy DeletePolicy } func AddURL(ctx context.Context, args *AddURLArgs) error { @@ -56,29 +62,13 @@ func AddURL(ctx context.Context, args *AddURLArgs) error { uid := uuid.NewString() tempDir := filepath.Join(conf.Conf.TempDir, args.Tool, uid) - signal := make(chan int) - gid, err := tool.AddURL(&AddUrlArgs{ - Url: args.URL, - UID: uid, - TempDir: tempDir, - Signal: signal, - }) - if err != nil { - return errors.Wrapf(err, "[%s] failed to add uri %s", args.Tool, args.URL) + t := &DownloadTask{ + Url: args.URL, + DstDirPath: args.DstDirPath, + TempDir: tempDir, + DeletePolicy: args.DeletePolicy, + tool: tool, } - DownTaskManager.Submit(task.WithCancelCtx(&task.Task[string]{ - ID: gid, - Name: fmt.Sprintf("download %s to [%s](%s)", args.URL, storage.GetStorage().MountPath, dstDirActualPath), - Func: func(tsk *task.Task[string]) error { - m := &Monitor{ - tool: tool, - tsk: tsk, - tempDir: tempDir, - dstDirPath: args.DstDirPath, - signal: signal, - } - return m.Loop() - }, - })) + DownloadTaskManager.Add(t) return nil } diff --git a/internal/offline_download/tool/base.go b/internal/offline_download/tool/base.go index 4689635b..1dd8e82b 100644 --- a/internal/offline_download/tool/base.go +++ b/internal/offline_download/tool/base.go @@ -17,13 +17,14 @@ type AddUrlArgs struct { type Status struct { Progress float64 - NewTID string + NewGID string Completed bool Status string Err error } type Tool interface { + Name() string // Items return the setting items the tool need Items() []model.SettingItem Init() (string, error) @@ -31,20 +32,23 @@ type Tool interface { // AddURL add an uri to download, return the task id AddURL(args *AddUrlArgs) (string, error) // Remove the download if task been canceled - Remove(tid string) error + Remove(task *DownloadTask) error // Status return the status of the download task, if an error occurred, return the error in Status.Err - Status(tid string) (*Status, error) + Status(task *DownloadTask) (*Status, error) +} + +type GetFileser interface { // GetFiles return the files of the download task, if nil, means walk the temp dir to get the files - GetFiles(tid string) []File + GetFiles(task *DownloadTask) []File } type File struct { // ReadCloser for http client - io.ReadCloser - Name string - Size int64 - Path string - Modified time.Time + ReadCloser io.ReadCloser + Name string + Size int64 + Path string + Modified time.Time } func (f *File) GetReadCloser() (io.ReadCloser, error) { diff --git a/internal/offline_download/tool/download.go b/internal/offline_download/tool/download.go new file mode 100644 index 00000000..7b536762 --- /dev/null +++ b/internal/offline_download/tool/download.go @@ -0,0 +1,147 @@ +package tool + +import ( + "fmt" + "github.com/pkg/errors" + log "github.com/sirupsen/logrus" + "github.com/xhofe/tache" + "sync" + "time" +) + +type DownloadTask struct { + tache.Base + Url string `json:"url"` + DstDirPath string `json:"dst_dir_path"` + TempDir string `json:"temp_dir"` + DeletePolicy DeletePolicy `json:"delete_policy"` + + Status string `json:"status"` + Signal chan int `json:"-"` + GID string `json:"-"` + finish chan struct{} + tool Tool + callStatusRetried int +} + +func (t *DownloadTask) Run() error { + t.Signal = make(chan int) + t.finish = make(chan struct{}) + defer func() { + t.Signal = nil + t.finish = nil + }() + gid, err := t.tool.AddURL(&AddUrlArgs{ + Url: t.Url, + UID: t.ID, + TempDir: t.TempDir, + Signal: t.Signal, + }) + if err != nil { + return err + } + t.GID = gid + var ( + ok bool + ) +outer: + for { + select { + case <-t.CtxDone(): + err := t.tool.Remove(t) + return err + case <-t.Signal: + ok, err = t.Update() + if ok { + break outer + } + case <-time.After(time.Second * 3): + ok, err = t.Update() + if ok { + break outer + } + } + } + if err != nil { + return err + } + t.Status = "aria2 download completed, maybe transferring" + t.finish <- struct{}{} + t.Status = "offline download completed" + return nil +} + +// Update download status, return true if download completed +func (t *DownloadTask) Update() (bool, error) { + info, err := t.tool.Status(t) + if err != nil { + t.callStatusRetried++ + log.Errorf("failed to get status of %s, retried %d times", t.ID, t.callStatusRetried) + return false, nil + } + if t.callStatusRetried > 5 { + return true, errors.Errorf("failed to get status of %s, retried %d times", t.ID, t.callStatusRetried) + } + t.callStatusRetried = 0 + t.SetProgress(info.Progress) + t.Status = fmt.Sprintf("[%s]: %s", t.tool.Name(), info.Status) + if info.NewGID != "" { + log.Debugf("followen by: %+v", info.NewGID) + t.GID = info.NewGID + return false, nil + } + // if download completed + if info.Completed { + err := t.Complete() + return true, errors.WithMessage(err, "failed to transfer file") + } + // if download failed + if info.Err != nil { + return true, errors.Errorf("failed to download %s, error: %s", t.ID, info.Err.Error()) + } + return false, nil +} + +func (t *DownloadTask) Complete() error { + var ( + files []File + err error + ) + if getFileser, ok := t.tool.(GetFileser); ok { + files = getFileser.GetFiles(t) + } else { + files, err = GetFiles(t.TempDir) + if err != nil { + return errors.Wrapf(err, "failed to get files") + } + } + // upload files + var wg sync.WaitGroup + wg.Add(len(files)) + go func() { + wg.Wait() + t.finish <- struct{}{} + }() + for i, _ := range files { + file := files[i] + TransferTaskManager.Add(&TransferTask{ + file: file, + dstDirPath: t.DstDirPath, + wg: &wg, + tempDir: t.TempDir, + }) + } + return nil +} + +func (t *DownloadTask) GetName() string { + return fmt.Sprintf("download %s to (%s)", t.Url, t.DstDirPath) +} + +func (t *DownloadTask) GetStatus() string { + return t.Status +} + +var ( + DownloadTaskManager *tache.Manager[*DownloadTask] = tache.NewManager[*DownloadTask]() +) diff --git a/internal/offline_download/tool/monitor.go b/internal/offline_download/tool/monitor.go deleted file mode 100644 index 984bda17..00000000 --- a/internal/offline_download/tool/monitor.go +++ /dev/null @@ -1,159 +0,0 @@ -package tool - -import ( - "fmt" - "os" - "path/filepath" - "sync" - "sync/atomic" - "time" - - "github.com/alist-org/alist/v3/internal/model" - "github.com/alist-org/alist/v3/internal/op" - "github.com/alist-org/alist/v3/internal/stream" - "github.com/alist-org/alist/v3/pkg/task" - "github.com/alist-org/alist/v3/pkg/utils" - "github.com/pkg/errors" - log "github.com/sirupsen/logrus" -) - -type Monitor struct { - tool Tool - tsk *task.Task[string] - tempDir string - retried int - dstDirPath string - finish chan struct{} - signal chan int -} - -func (m *Monitor) Loop() error { - m.finish = make(chan struct{}) - var ( - err error - ok bool - ) -outer: - for { - select { - case <-m.tsk.Ctx.Done(): - err := m.tool.Remove(m.tsk.ID) - return err - case <-m.signal: - ok, err = m.Update() - if ok { - break outer - } - case <-time.After(time.Second * 2): - ok, err = m.Update() - if ok { - break outer - } - } - } - if err != nil { - return err - } - m.tsk.SetStatus("aria2 download completed, transferring") - <-m.finish - m.tsk.SetStatus("completed") - return nil -} - -// Update download status, return true if download completed -func (m *Monitor) Update() (bool, error) { - info, err := m.tool.Status(m.tsk.ID) - if err != nil { - m.retried++ - log.Errorf("failed to get status of %s, retried %d times", m.tsk.ID, m.retried) - return false, nil - } - if m.retried > 5 { - return true, errors.Errorf("failed to get status of %s, retried %d times", m.tsk.ID, m.retried) - } - m.retried = 0 - m.tsk.SetProgress(info.Progress) - m.tsk.SetStatus("tool: " + info.Status) - if info.NewTID != "" { - log.Debugf("followen by: %+v", info.NewTID) - DownTaskManager.RawTasks().Delete(m.tsk.ID) - m.tsk.ID = info.NewTID - DownTaskManager.RawTasks().Store(m.tsk.ID, m.tsk) - return false, nil - } - // if download completed - if info.Completed { - err := m.Complete() - return true, errors.WithMessage(err, "failed to transfer file") - } - // if download failed - if info.Err != nil { - return true, errors.Errorf("failed to download %s, error: %s", m.tsk.ID, info.Err.Error()) - } - return false, nil -} - -var TransferTaskManager = task.NewTaskManager(3, func(k *uint64) { - atomic.AddUint64(k, 1) -}) - -func (m *Monitor) Complete() error { - // check dstDir again - storage, dstDirActualPath, err := op.GetStorageAndActualPath(m.dstDirPath) - if err != nil { - return errors.WithMessage(err, "failed get storage") - } - var files []File - if f := m.tool.GetFiles(m.tsk.ID); f != nil { - files = f - } else { - files, err = GetFiles(m.tempDir) - if err != nil { - return errors.Wrapf(err, "failed to get files") - } - } - // upload files - var wg sync.WaitGroup - wg.Add(len(files)) - go func() { - wg.Wait() - err := os.RemoveAll(m.tempDir) - m.finish <- struct{}{} - if err != nil { - log.Errorf("failed to remove aria2 temp dir: %+v", err.Error()) - } - }() - for i, _ := range files { - file := files[i] - TransferTaskManager.Submit(task.WithCancelCtx(&task.Task[uint64]{ - Name: fmt.Sprintf("transfer %s to [%s](%s)", file.Path, storage.GetStorage().MountPath, dstDirActualPath), - Func: func(tsk *task.Task[uint64]) error { - defer wg.Done() - mimetype := utils.GetMimeType(file.Path) - rc, err := file.GetReadCloser() - if err != nil { - return errors.Wrapf(err, "failed to open file %s", file.Path) - } - s := &stream.FileStream{ - Ctx: nil, - Obj: &model.Object{ - Name: filepath.Base(file.Path), - Size: file.Size, - Modified: file.Modified, - IsFolder: false, - }, - Reader: rc, - Mimetype: mimetype, - Closers: utils.NewClosers(rc), - } - relDir, err := filepath.Rel(m.tempDir, filepath.Dir(file.Path)) - if err != nil { - log.Errorf("find relation directory error: %v", err) - } - newDistDir := filepath.Join(dstDirActualPath, relDir) - return op.Put(tsk.Ctx, storage, newDistDir, s, tsk.SetProgress) - }, - })) - } - return nil -} diff --git a/internal/offline_download/tool/tools.go b/internal/offline_download/tool/tools.go index b7eacbd2..9de7d526 100644 --- a/internal/offline_download/tool/tools.go +++ b/internal/offline_download/tool/tools.go @@ -2,14 +2,11 @@ package tool import ( "fmt" - "github.com/alist-org/alist/v3/internal/model" - "github.com/alist-org/alist/v3/pkg/task" ) var ( - Tools = make(ToolsManager) - DownTaskManager = task.NewTaskManager[string](3) + Tools = make(ToolsManager) ) type ToolsManager map[string]Tool @@ -21,8 +18,8 @@ func (t ToolsManager) Get(name string) (Tool, error) { return nil, fmt.Errorf("tool %s not found", name) } -func (t ToolsManager) Add(name string, tool Tool) { - t[name] = tool +func (t ToolsManager) Add(tool Tool) { + t[tool.Name()] = tool } func (t ToolsManager) Names() []string { diff --git a/internal/offline_download/tool/transfer.go b/internal/offline_download/tool/transfer.go new file mode 100644 index 00000000..f7d1791d --- /dev/null +++ b/internal/offline_download/tool/transfer.go @@ -0,0 +1,66 @@ +package tool + +import ( + "fmt" + "github.com/alist-org/alist/v3/internal/model" + "github.com/alist-org/alist/v3/internal/op" + "github.com/alist-org/alist/v3/internal/stream" + "github.com/alist-org/alist/v3/pkg/utils" + "github.com/pkg/errors" + log "github.com/sirupsen/logrus" + "github.com/xhofe/tache" + "path/filepath" + "sync" +) + +type TransferTask struct { + tache.Base + file File + dstDirPath string + wg *sync.WaitGroup + tempDir string +} + +func (t *TransferTask) Run() error { + defer t.wg.Done() + // check dstDir again + storage, dstDirActualPath, err := op.GetStorageAndActualPath(t.dstDirPath) + if err != nil { + return errors.WithMessage(err, "failed get storage") + } + mimetype := utils.GetMimeType(t.file.Path) + rc, err := t.file.GetReadCloser() + if err != nil { + return errors.Wrapf(err, "failed to open file %s", t.file.Path) + } + s := &stream.FileStream{ + Ctx: nil, + Obj: &model.Object{ + Name: filepath.Base(t.file.Path), + Size: t.file.Size, + Modified: t.file.Modified, + IsFolder: false, + }, + Reader: rc, + Mimetype: mimetype, + Closers: utils.NewClosers(rc), + } + relDir, err := filepath.Rel(t.tempDir, filepath.Dir(t.file.Path)) + if err != nil { + log.Errorf("find relation directory error: %v", err) + } + newDistDir := filepath.Join(dstDirActualPath, relDir) + return op.Put(t.Ctx(), storage, newDistDir, s, t.SetProgress) +} + +func (t *TransferTask) GetName() string { + return fmt.Sprintf("transfer %s to [%s]", t.file.Path, t.dstDirPath) +} + +func (t *TransferTask) GetStatus() string { + return "transferring" +} + +var ( + TransferTaskManager *tache.Manager[*TransferTask] = tache.NewManager[*TransferTask]() +) diff --git a/server/handles/offline_download.go b/server/handles/offline_download.go index cf9c1775..fdee063d 100644 --- a/server/handles/offline_download.go +++ b/server/handles/offline_download.go @@ -74,9 +74,10 @@ func OfflineDownloadTools(c *gin.Context) { } type AddOfflineDownloadReq struct { - Urls []string `json:"urls"` - Path string `json:"path"` - Tool string `json:"tool"` + Urls []string `json:"urls"` + Path string `json:"path"` + Tool string `json:"tool"` + DeletePolicy string `json:"delete_policy"` } func AddOfflineDownload(c *gin.Context) { @@ -98,9 +99,10 @@ func AddOfflineDownload(c *gin.Context) { } for _, url := range req.Urls { err := tool.AddURL(c, &tool.AddURLArgs{ - URL: url, - DstDirPath: reqPath, - Tool: req.Tool, + URL: url, + DstDirPath: reqPath, + Tool: req.Tool, + DeletePolicy: tool.DeletePolicy(req.DeletePolicy), }) if err != nil { common.ErrorResp(c, err, 500) diff --git a/server/handles/task.go b/server/handles/task.go index 821f7d56..1193ce05 100644 --- a/server/handles/task.go +++ b/server/handles/task.go @@ -1,122 +1,77 @@ package handles import ( - "strconv" - "github.com/alist-org/alist/v3/internal/fs" "github.com/alist-org/alist/v3/internal/offline_download/tool" - "github.com/alist-org/alist/v3/pkg/task" "github.com/alist-org/alist/v3/server/common" "github.com/gin-gonic/gin" + "github.com/xhofe/tache" ) type TaskInfo struct { - ID string `json:"id"` - Name string `json:"name"` - State string `json:"state"` - Status string `json:"status"` - Progress float64 `json:"progress"` - Error string `json:"error"` + ID string `json:"id"` + Name string `json:"name"` + State tache.State `json:"state"` + Status string `json:"status"` + Progress float64 `json:"progress"` + Error string `json:"error"` } -type K2Str[K comparable] func(k K) string - -func uint64K2Str(k uint64) string { - return strconv.FormatUint(k, 10) -} - -func strK2Str(str string) string { - return str -} - -func getTaskInfo[K comparable](task *task.Task[K], k2Str K2Str[K]) TaskInfo { +func getTaskInfo[T tache.TaskWithInfo](task T) TaskInfo { return TaskInfo{ - ID: k2Str(task.ID), - Name: task.Name, + ID: task.GetID(), + Name: task.GetName(), State: task.GetState(), Status: task.GetStatus(), Progress: task.GetProgress(), - Error: task.GetErrMsg(), + Error: task.GetErr().Error(), } } -func getTaskInfos[K comparable](tasks []*task.Task[K], k2Str K2Str[K]) []TaskInfo { +func getTaskInfos[T tache.TaskWithInfo](tasks []T) []TaskInfo { var infos []TaskInfo for _, t := range tasks { - infos = append(infos, getTaskInfo(t, k2Str)) + infos = append(infos, getTaskInfo(t)) } return infos } -type Str2K[K comparable] func(str string) (K, error) - -func str2Uint64K(str string) (uint64, error) { - return strconv.ParseUint(str, 10, 64) -} - -func str2StrK(str string) (string, error) { - return str, nil -} - -func taskRoute[K comparable](g *gin.RouterGroup, manager *task.Manager[K], k2Str K2Str[K], str2K Str2K[K]) { +func taskRoute[T tache.TaskWithInfo](g *gin.RouterGroup, manager *tache.Manager[T]) { g.GET("/undone", func(c *gin.Context) { - common.SuccessResp(c, getTaskInfos(manager.ListUndone(), k2Str)) + common.SuccessResp(c, getTaskInfos(manager.GetByState(tache.StatePending, tache.StateRunning, + tache.StateCanceling, tache.StateErrored, tache.StateFailing, tache.StateWaitingRetry, tache.StateBeforeRetry))) }) g.GET("/done", func(c *gin.Context) { - common.SuccessResp(c, getTaskInfos(manager.ListDone(), k2Str)) + common.SuccessResp(c, getTaskInfos(manager.GetByState(tache.StateCanceled, tache.StateFailed, tache.StateSucceeded))) }) g.POST("/cancel", func(c *gin.Context) { tid := c.Query("tid") - id, err := str2K(tid) - if err != nil { - common.ErrorResp(c, err, 400) - return - } - if err := manager.Cancel(id); err != nil { - common.ErrorResp(c, err, 500) - } else { - common.SuccessResp(c) - } + manager.Cancel(tid) + common.SuccessResp(c) }) g.POST("/delete", func(c *gin.Context) { tid := c.Query("tid") - id, err := str2K(tid) - if err != nil { - common.ErrorResp(c, err, 400) - return - } - if err := manager.Remove(id); err != nil { - common.ErrorResp(c, err, 500) - } else { - common.SuccessResp(c) - } + manager.Remove(tid) + common.SuccessResp(c) }) g.POST("/retry", func(c *gin.Context) { tid := c.Query("tid") - id, err := str2K(tid) - if err != nil { - common.ErrorResp(c, err, 400) - return - } - if err := manager.Retry(id); err != nil { - common.ErrorResp(c, err, 500) - } else { - common.SuccessResp(c) - } + manager.Retry(tid) + common.SuccessResp(c) }) g.POST("/clear_done", func(c *gin.Context) { - manager.ClearDone() + manager.RemoveByState(tache.StateCanceled, tache.StateFailed, tache.StateSucceeded) common.SuccessResp(c) }) g.POST("/clear_succeeded", func(c *gin.Context) { - manager.ClearSucceeded() + manager.RemoveByState(tache.StateSucceeded) common.SuccessResp(c) }) } func SetupTaskRoute(g *gin.RouterGroup) { - taskRoute(g.Group("/upload"), fs.UploadTaskManager, uint64K2Str, str2Uint64K) - taskRoute(g.Group("/copy"), fs.CopyTaskManager, uint64K2Str, str2Uint64K) - taskRoute(g.Group("/offline_download"), tool.DownTaskManager, strK2Str, str2StrK) - taskRoute(g.Group("/offline_download_transfer"), tool.TransferTaskManager, uint64K2Str, str2Uint64K) + taskRoute(g.Group("/upload"), fs.UploadTaskManager) + taskRoute(g.Group("/copy"), fs.CopyTaskManager) + taskRoute(g.Group("/offline_download"), tool.DownloadTaskManager) + taskRoute(g.Group("/offline_download_transfer"), tool.TransferTaskManager) }