From d7aa1608ac2f3834af87b4a81b8f0970b3dadbc0 Mon Sep 17 00:00:00 2001 From: KirCute_ECT <951206789@qq.com> Date: Wed, 25 Dec 2024 21:09:54 +0800 Subject: [PATCH] feat(task): add speed monitor (#7655) --- internal/fs/copy.go | 17 ++++--- internal/fs/fs.go | 4 +- internal/fs/put.go | 11 ++-- internal/offline_download/115/client.go | 1 + internal/offline_download/aria2/aria2.go | 7 +-- internal/offline_download/http/client.go | 1 + internal/offline_download/pikpak/pikpak.go | 5 ++ internal/offline_download/qbit/qbit.go | 1 + internal/offline_download/tool/add.go | 4 +- internal/offline_download/tool/base.go | 11 ++-- internal/offline_download/tool/download.go | 16 ++++-- internal/offline_download/tool/transfer.go | 6 ++- .../offline_download/transmission/client.go | 1 + internal/task/base.go | 50 ++++++++++++++++--- server/handles/fsmanage.go | 2 +- server/handles/fsup.go | 4 +- server/handles/offline_download.go | 2 +- server/handles/task.go | 17 +++++-- 18 files changed, 116 insertions(+), 44 deletions(-) diff --git a/internal/fs/copy.go b/internal/fs/copy.go index d4ad452b..c3fadaab 100644 --- a/internal/fs/copy.go +++ b/internal/fs/copy.go @@ -5,6 +5,7 @@ import ( "fmt" "net/http" stdpath "path" + "time" "github.com/alist-org/alist/v3/internal/conf" "github.com/alist-org/alist/v3/internal/driver" @@ -18,7 +19,7 @@ import ( ) type CopyTask struct { - task.TaskWithCreator + task.TaskExtension Status string `json:"-"` //don't save status to save space SrcObjPath string `json:"src_path"` DstDirPath string `json:"dst_path"` @@ -37,6 +38,9 @@ func (t *CopyTask) GetStatus() string { } func (t *CopyTask) Run() error { + t.ClearEndTime() + t.SetStartTime(time.Now()) + defer func() { t.SetEndTime(time.Now()) }() var err error if t.srcStorage == nil { t.srcStorage, err = op.GetStorageByMountPath(t.SrcStorageMp) @@ -54,7 +58,7 @@ var CopyTaskManager *tache.Manager[*CopyTask] // Copy if in the same storage, call move method // if not, add copy task -func _copy(ctx context.Context, srcObjPath, dstDirPath string, lazyCache ...bool) (task.TaskInfoWithCreator, error) { +func _copy(ctx context.Context, srcObjPath, dstDirPath string, lazyCache ...bool) (task.TaskExtensionInfo, error) { srcStorage, srcObjActualPath, err := op.GetStorageAndActualPath(srcObjPath) if err != nil { return nil, errors.WithMessage(err, "failed get src storage") @@ -93,9 +97,9 @@ func _copy(ctx context.Context, srcObjPath, dstDirPath string, lazyCache ...bool } } // not in the same storage - taskCreator, _ := ctx.Value("user").(*model.User) // taskCreator is nil when convert failed + taskCreator, _ := ctx.Value("user").(*model.User) t := &CopyTask{ - TaskWithCreator: task.TaskWithCreator{ + TaskExtension: task.TaskExtension{ Creator: taskCreator, }, srcStorage: srcStorage, @@ -128,8 +132,8 @@ func copyBetween2Storages(t *CopyTask, srcStorage, dstStorage driver.Driver, src srcObjPath := stdpath.Join(srcObjPath, obj.GetName()) dstObjPath := stdpath.Join(dstDirPath, srcObj.GetName()) CopyTaskManager.Add(&CopyTask{ - TaskWithCreator: task.TaskWithCreator{ - Creator: t.Creator, + TaskExtension: task.TaskExtension{ + Creator: t.GetCreator(), }, srcStorage: srcStorage, dstStorage: dstStorage, @@ -150,6 +154,7 @@ func copyFileBetween2Storages(tsk *CopyTask, srcStorage, dstStorage driver.Drive if err != nil { return errors.WithMessagef(err, "failed get src [%s] file", srcFilePath) } + tsk.SetTotalBytes(srcFile.GetSize()) link, _, err := op.Link(tsk.Ctx(), srcStorage, srcFilePath, model.LinkArgs{ Header: http.Header{}, }) diff --git a/internal/fs/fs.go b/internal/fs/fs.go index 65e5a2c2..24f1d47f 100644 --- a/internal/fs/fs.go +++ b/internal/fs/fs.go @@ -69,7 +69,7 @@ func Move(ctx context.Context, srcPath, dstDirPath string, lazyCache ...bool) er return err } -func Copy(ctx context.Context, srcObjPath, dstDirPath string, lazyCache ...bool) (task.TaskInfoWithCreator, error) { +func Copy(ctx context.Context, srcObjPath, dstDirPath string, lazyCache ...bool) (task.TaskExtensionInfo, error) { res, err := _copy(ctx, srcObjPath, dstDirPath, lazyCache...) if err != nil { log.Errorf("failed copy %s to %s: %+v", srcObjPath, dstDirPath, err) @@ -101,7 +101,7 @@ func PutDirectly(ctx context.Context, dstDirPath string, file model.FileStreamer return err } -func PutAsTask(ctx context.Context, dstDirPath string, file model.FileStreamer) (task.TaskInfoWithCreator, error) { +func PutAsTask(ctx context.Context, dstDirPath string, file model.FileStreamer) (task.TaskExtensionInfo, error) { t, err := putAsTask(ctx, dstDirPath, file) if err != nil { log.Errorf("failed put %s: %+v", dstDirPath, err) diff --git a/internal/fs/put.go b/internal/fs/put.go index 23197f5b..bc33a3ac 100644 --- a/internal/fs/put.go +++ b/internal/fs/put.go @@ -10,10 +10,11 @@ import ( "github.com/alist-org/alist/v3/internal/task" "github.com/pkg/errors" "github.com/xhofe/tache" + "time" ) type UploadTask struct { - task.TaskWithCreator + task.TaskExtension storage driver.Driver dstDirActualPath string file model.FileStreamer @@ -28,13 +29,16 @@ func (t *UploadTask) GetStatus() string { } func (t *UploadTask) Run() error { + t.ClearEndTime() + t.SetStartTime(time.Now()) + defer func() { t.SetEndTime(time.Now()) }() return op.Put(t.Ctx(), t.storage, t.dstDirActualPath, t.file, t.SetProgress, true) } var UploadTaskManager *tache.Manager[*UploadTask] // putAsTask add as a put task and return immediately -func putAsTask(ctx context.Context, dstDirPath string, file model.FileStreamer) (task.TaskInfoWithCreator, error) { +func putAsTask(ctx context.Context, dstDirPath string, file model.FileStreamer) (task.TaskExtensionInfo, error) { storage, dstDirActualPath, err := op.GetStorageAndActualPath(dstDirPath) if err != nil { return nil, errors.WithMessage(err, "failed get storage") @@ -52,13 +56,14 @@ func putAsTask(ctx context.Context, dstDirPath string, file model.FileStreamer) } taskCreator, _ := ctx.Value("user").(*model.User) // taskCreator is nil when convert failed t := &UploadTask{ - TaskWithCreator: task.TaskWithCreator{ + TaskExtension: task.TaskExtension{ Creator: taskCreator, }, storage: storage, dstDirActualPath: dstDirActualPath, file: file, } + t.SetTotalBytes(file.GetSize()) UploadTaskManager.Add(t) return t, nil } diff --git a/internal/offline_download/115/client.go b/internal/offline_download/115/client.go index 0ebf38ff..45f147db 100644 --- a/internal/offline_download/115/client.go +++ b/internal/offline_download/115/client.go @@ -107,6 +107,7 @@ func (p *Cloud115) Status(task *tool.DownloadTask) (*tool.Status, error) { s.Progress = t.Percent s.Status = t.GetStatus() s.Completed = t.IsDone() + s.TotalBytes = t.Size if t.IsFailed() { s.Err = fmt.Errorf(t.GetStatus()) } diff --git a/internal/offline_download/aria2/aria2.go b/internal/offline_download/aria2/aria2.go index d22b32f9..fb212b35 100644 --- a/internal/offline_download/aria2/aria2.go +++ b/internal/offline_download/aria2/aria2.go @@ -82,7 +82,7 @@ func (a *Aria2) Status(task *tool.DownloadTask) (*tool.Status, error) { if err != nil { return nil, err } - total, err := strconv.ParseUint(info.TotalLength, 10, 64) + total, err := strconv.ParseInt(info.TotalLength, 10, 64) if err != nil { total = 0 } @@ -91,8 +91,9 @@ func (a *Aria2) Status(task *tool.DownloadTask) (*tool.Status, error) { downloaded = 0 } s := &tool.Status{ - Completed: info.Status == "complete", - Err: err, + Completed: info.Status == "complete", + Err: err, + TotalBytes: total, } s.Progress = float64(downloaded) / float64(total) * 100 if len(info.FollowedBy) != 0 { diff --git a/internal/offline_download/http/client.go b/internal/offline_download/http/client.go index 6f22fcf7..9b83400e 100644 --- a/internal/offline_download/http/client.go +++ b/internal/offline_download/http/client.go @@ -83,6 +83,7 @@ func (s SimpleHttp) Run(task *tool.DownloadTask) error { } defer file.Close() fileSize := resp.ContentLength + task.SetTotalBytes(fileSize) err = utils.CopyWithCtx(task.Ctx(), file, resp.Body, fileSize, task.SetProgress) return err } diff --git a/internal/offline_download/pikpak/pikpak.go b/internal/offline_download/pikpak/pikpak.go index 618b1442..f07b3de8 100644 --- a/internal/offline_download/pikpak/pikpak.go +++ b/internal/offline_download/pikpak/pikpak.go @@ -3,6 +3,7 @@ package pikpak import ( "context" "fmt" + "strconv" "github.com/alist-org/alist/v3/drivers/pikpak" "github.com/alist-org/alist/v3/internal/errs" @@ -105,6 +106,10 @@ func (p *PikPak) Status(task *tool.DownloadTask) (*tool.Status, error) { s.Progress = float64(t.Progress) s.Status = t.Message s.Completed = (t.Phase == "PHASE_TYPE_COMPLETE") + s.TotalBytes, err = strconv.ParseInt(t.FileSize, 10, 64) + if err != nil { + s.TotalBytes = 0 + } if t.Phase == "PHASE_TYPE_ERROR" { s.Err = fmt.Errorf(t.Message) } diff --git a/internal/offline_download/qbit/qbit.go b/internal/offline_download/qbit/qbit.go index 807ebfef..458de03f 100644 --- a/internal/offline_download/qbit/qbit.go +++ b/internal/offline_download/qbit/qbit.go @@ -64,6 +64,7 @@ func (a *QBittorrent) Status(task *tool.DownloadTask) (*tool.Status, error) { return nil, err } s := &tool.Status{} + s.TotalBytes = info.Size s.Progress = float64(info.Completed) / float64(info.Size) * 100 switch info.State { case qbittorrent.UPLOADING, qbittorrent.PAUSEDUP, qbittorrent.QUEUEDUP, qbittorrent.STALLEDUP, qbittorrent.FORCEDUP, qbittorrent.CHECKINGUP: diff --git a/internal/offline_download/tool/add.go b/internal/offline_download/tool/add.go index 1c9da146..42349e2e 100644 --- a/internal/offline_download/tool/add.go +++ b/internal/offline_download/tool/add.go @@ -29,7 +29,7 @@ type AddURLArgs struct { DeletePolicy DeletePolicy } -func AddURL(ctx context.Context, args *AddURLArgs) (task.TaskInfoWithCreator, error) { +func AddURL(ctx context.Context, args *AddURLArgs) (task.TaskExtensionInfo, error) { // get tool tool, err := Tools.Get(args.Tool) if err != nil { @@ -81,7 +81,7 @@ func AddURL(ctx context.Context, args *AddURLArgs) (task.TaskInfoWithCreator, er taskCreator, _ := ctx.Value("user").(*model.User) // taskCreator is nil when convert failed t := &DownloadTask{ - TaskWithCreator: task.TaskWithCreator{ + TaskExtension: task.TaskExtension{ Creator: taskCreator, }, Url: args.URL, diff --git a/internal/offline_download/tool/base.go b/internal/offline_download/tool/base.go index 3b9fb07a..ae9eac26 100644 --- a/internal/offline_download/tool/base.go +++ b/internal/offline_download/tool/base.go @@ -16,11 +16,12 @@ type AddUrlArgs struct { } type Status struct { - Progress float64 - NewGID string - Completed bool - Status string - Err error + TotalBytes int64 + Progress float64 + NewGID string + Completed bool + Status string + Err error } type Tool interface { diff --git a/internal/offline_download/tool/download.go b/internal/offline_download/tool/download.go index 038baf96..a0f1a81b 100644 --- a/internal/offline_download/tool/download.go +++ b/internal/offline_download/tool/download.go @@ -14,7 +14,7 @@ import ( ) type DownloadTask struct { - task.TaskWithCreator + task.TaskExtension Url string `json:"url"` DstDirPath string `json:"dst_dir_path"` TempDir string `json:"temp_dir"` @@ -28,6 +28,9 @@ type DownloadTask struct { } func (t *DownloadTask) Run() error { + t.ClearEndTime() + t.SetStartTime(time.Now()) + defer func() { t.SetEndTime(time.Now()) }() if t.tool == nil { tool, err := Tools.Get(t.Toolname) if err != nil { @@ -131,6 +134,7 @@ func (t *DownloadTask) Update() (bool, error) { } t.callStatusRetried = 0 t.SetProgress(info.Progress) + t.SetTotalBytes(info.TotalBytes) t.Status = fmt.Sprintf("[%s]: %s", t.tool.Name(), info.Status) if info.NewGID != "" { log.Debugf("followen by: %+v", info.NewGID) @@ -171,16 +175,18 @@ func (t *DownloadTask) Complete() error { // upload files for i := range files { file := files[i] - TransferTaskManager.Add(&TransferTask{ - TaskWithCreator: task.TaskWithCreator{ - Creator: t.Creator, + tsk := &TransferTask{ + TaskExtension: task.TaskExtension{ + Creator: t.GetCreator(), }, file: file, DstDirPath: t.DstDirPath, TempDir: t.TempDir, DeletePolicy: t.DeletePolicy, FileDir: file.Path, - }) + } + tsk.SetTotalBytes(file.Size) + TransferTaskManager.Add(tsk) } return nil } diff --git a/internal/offline_download/tool/transfer.go b/internal/offline_download/tool/transfer.go index 085b4a66..a77c4822 100644 --- a/internal/offline_download/tool/transfer.go +++ b/internal/offline_download/tool/transfer.go @@ -4,6 +4,7 @@ import ( "fmt" "os" "path/filepath" + "time" "github.com/alist-org/alist/v3/internal/model" "github.com/alist-org/alist/v3/internal/op" @@ -16,7 +17,7 @@ import ( ) type TransferTask struct { - task.TaskWithCreator + task.TaskExtension FileDir string `json:"file_dir"` DstDirPath string `json:"dst_dir_path"` TempDir string `json:"temp_dir"` @@ -25,6 +26,9 @@ type TransferTask struct { } func (t *TransferTask) Run() error { + t.ClearEndTime() + t.SetStartTime(time.Now()) + defer func() { t.SetEndTime(time.Now()) }() // check dstDir again var err error if (t.file == File{}) { diff --git a/internal/offline_download/transmission/client.go b/internal/offline_download/transmission/client.go index a6075414..4131f3e1 100644 --- a/internal/offline_download/transmission/client.go +++ b/internal/offline_download/transmission/client.go @@ -150,6 +150,7 @@ func (t *Transmission) Status(task *tool.DownloadTask) (*tool.Status, error) { Err: err, } s.Progress = *info.PercentDone * 100 + s.TotalBytes = int64(*info.SizeWhenDone / 8) switch *info.Status { case transmissionrpc.TorrentStatusCheckWait, diff --git a/internal/task/base.go b/internal/task/base.go index a30e5987..93f413a7 100644 --- a/internal/task/base.go +++ b/internal/task/base.go @@ -3,24 +3,58 @@ package task import ( "github.com/alist-org/alist/v3/internal/model" "github.com/xhofe/tache" + "time" ) -type TaskWithCreator struct { +type TaskExtension struct { tache.Base - Creator *model.User + Creator *model.User + startTime *time.Time + endTime *time.Time + totalBytes int64 } -func (t *TaskWithCreator) SetCreator(creator *model.User) { +func (t *TaskExtension) SetCreator(creator *model.User) { t.Creator = creator t.Persist() } -func (t *TaskWithCreator) GetCreator() *model.User { +func (t *TaskExtension) GetCreator() *model.User { return t.Creator } -type TaskInfoWithCreator interface { - tache.TaskWithInfo - SetCreator(creator *model.User) - GetCreator() *model.User +func (t *TaskExtension) SetStartTime(startTime time.Time) { + t.startTime = &startTime +} + +func (t *TaskExtension) GetStartTime() *time.Time { + return t.startTime +} + +func (t *TaskExtension) SetEndTime(endTime time.Time) { + t.endTime = &endTime +} + +func (t *TaskExtension) GetEndTime() *time.Time { + return t.endTime +} + +func (t *TaskExtension) ClearEndTime() { + t.endTime = nil +} + +func (t *TaskExtension) SetTotalBytes(totalBytes int64) { + t.totalBytes = totalBytes +} + +func (t *TaskExtension) GetTotalBytes() int64 { + return t.totalBytes +} + +type TaskExtensionInfo interface { + tache.TaskWithInfo + GetCreator() *model.User + GetStartTime() *time.Time + GetEndTime() *time.Time + GetTotalBytes() int64 } diff --git a/server/handles/fsmanage.go b/server/handles/fsmanage.go index 42d53d7e..9877b127 100644 --- a/server/handles/fsmanage.go +++ b/server/handles/fsmanage.go @@ -121,7 +121,7 @@ func FsCopy(c *gin.Context) { common.ErrorResp(c, err, 403) return } - var addedTasks []task.TaskInfoWithCreator + var addedTasks []task.TaskExtensionInfo for i, name := range req.Names { t, err := fs.Copy(c, stdpath.Join(srcDir, name), dstDir, len(req.Names) > i+1) if t != nil { diff --git a/server/handles/fsup.go b/server/handles/fsup.go index 3a366d49..a17c50f0 100644 --- a/server/handles/fsup.go +++ b/server/handles/fsup.go @@ -57,7 +57,7 @@ func FsStream(c *gin.Context) { Mimetype: c.GetHeader("Content-Type"), WebPutAsTask: asTask, } - var t task.TaskInfoWithCreator + var t task.TaskExtensionInfo if asTask { t, err = fs.PutAsTask(c, dir, s) } else { @@ -122,7 +122,7 @@ func FsForm(c *gin.Context) { Mimetype: file.Header.Get("Content-Type"), WebPutAsTask: asTask, } - var t task.TaskInfoWithCreator + var t task.TaskExtensionInfo if asTask { s.Reader = struct { io.Reader diff --git a/server/handles/offline_download.go b/server/handles/offline_download.go index ff1fcfa0..9e26030a 100644 --- a/server/handles/offline_download.go +++ b/server/handles/offline_download.go @@ -133,7 +133,7 @@ func AddOfflineDownload(c *gin.Context) { common.ErrorResp(c, err, 403) return } - var tasks []task.TaskInfoWithCreator + var tasks []task.TaskExtensionInfo for _, url := range req.Urls { t, err := tool.AddURL(c, &tool.AddURLArgs{ URL: url, diff --git a/server/handles/task.go b/server/handles/task.go index 5f996505..c7d9ef48 100644 --- a/server/handles/task.go +++ b/server/handles/task.go @@ -4,6 +4,7 @@ import ( "github.com/alist-org/alist/v3/internal/model" "github.com/alist-org/alist/v3/internal/task" "math" + "time" "github.com/alist-org/alist/v3/internal/fs" "github.com/alist-org/alist/v3/internal/offline_download/tool" @@ -21,10 +22,13 @@ type TaskInfo struct { State tache.State `json:"state"` Status string `json:"status"` Progress float64 `json:"progress"` + StartTime *time.Time `json:"start_time"` + EndTime *time.Time `json:"end_time"` + TotalBytes int64 `json:"total_bytes"` Error string `json:"error"` } -func getTaskInfo[T task.TaskInfoWithCreator](task T) TaskInfo { +func getTaskInfo[T task.TaskExtensionInfo](task T) TaskInfo { errMsg := "" if task.GetErr() != nil { errMsg = task.GetErr().Error() @@ -48,11 +52,14 @@ func getTaskInfo[T task.TaskInfoWithCreator](task T) TaskInfo { State: task.GetState(), Status: task.GetStatus(), Progress: progress, + StartTime: task.GetStartTime(), + EndTime: task.GetEndTime(), + TotalBytes: task.GetTotalBytes(), Error: errMsg, } } -func getTaskInfos[T task.TaskInfoWithCreator](tasks []T) []TaskInfo { +func getTaskInfos[T task.TaskExtensionInfo](tasks []T) []TaskInfo { return utils.MustSliceConvert(tasks, getTaskInfo[T]) } @@ -68,7 +75,7 @@ func getUserInfo(c *gin.Context) (bool, uint, bool) { } } -func getTargetedHandler[T task.TaskInfoWithCreator](manager *tache.Manager[T], callback func(c *gin.Context, task T)) gin.HandlerFunc { +func getTargetedHandler[T task.TaskExtensionInfo](manager *tache.Manager[T], callback func(c *gin.Context, task T)) gin.HandlerFunc { return func(c *gin.Context) { isAdmin, uid, ok := getUserInfo(c) if !ok { @@ -90,7 +97,7 @@ func getTargetedHandler[T task.TaskInfoWithCreator](manager *tache.Manager[T], c } } -func getBatchHandler[T task.TaskInfoWithCreator](manager *tache.Manager[T], callback func(task T)) gin.HandlerFunc { +func getBatchHandler[T task.TaskExtensionInfo](manager *tache.Manager[T], callback func(task T)) gin.HandlerFunc { return func(c *gin.Context) { isAdmin, uid, ok := getUserInfo(c) if !ok { @@ -115,7 +122,7 @@ func getBatchHandler[T task.TaskInfoWithCreator](manager *tache.Manager[T], call } } -func taskRoute[T task.TaskInfoWithCreator](g *gin.RouterGroup, manager *tache.Manager[T]) { +func taskRoute[T task.TaskExtensionInfo](g *gin.RouterGroup, manager *tache.Manager[T]) { g.GET("/undone", func(c *gin.Context) { isAdmin, uid, ok := getUserInfo(c) if !ok {