feat(task): add speed monitor (#7655)

pull/7673/head
KirCute_ECT 2024-12-25 21:09:54 +08:00 committed by GitHub
parent db99224126
commit d7aa1608ac
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
18 changed files with 116 additions and 44 deletions

View File

@ -5,6 +5,7 @@ import (
"fmt"
"net/http"
stdpath "path"
"time"
"github.com/alist-org/alist/v3/internal/conf"
"github.com/alist-org/alist/v3/internal/driver"
@ -18,7 +19,7 @@ import (
)
type CopyTask struct {
task.TaskWithCreator
task.TaskExtension
Status string `json:"-"` //don't save status to save space
SrcObjPath string `json:"src_path"`
DstDirPath string `json:"dst_path"`
@ -37,6 +38,9 @@ func (t *CopyTask) GetStatus() string {
}
func (t *CopyTask) Run() error {
t.ClearEndTime()
t.SetStartTime(time.Now())
defer func() { t.SetEndTime(time.Now()) }()
var err error
if t.srcStorage == nil {
t.srcStorage, err = op.GetStorageByMountPath(t.SrcStorageMp)
@ -54,7 +58,7 @@ var CopyTaskManager *tache.Manager[*CopyTask]
// Copy if in the same storage, call move method
// if not, add copy task
func _copy(ctx context.Context, srcObjPath, dstDirPath string, lazyCache ...bool) (task.TaskInfoWithCreator, error) {
func _copy(ctx context.Context, srcObjPath, dstDirPath string, lazyCache ...bool) (task.TaskExtensionInfo, error) {
srcStorage, srcObjActualPath, err := op.GetStorageAndActualPath(srcObjPath)
if err != nil {
return nil, errors.WithMessage(err, "failed get src storage")
@ -93,9 +97,9 @@ func _copy(ctx context.Context, srcObjPath, dstDirPath string, lazyCache ...bool
}
}
// not in the same storage
taskCreator, _ := ctx.Value("user").(*model.User) // taskCreator is nil when convert failed
taskCreator, _ := ctx.Value("user").(*model.User)
t := &CopyTask{
TaskWithCreator: task.TaskWithCreator{
TaskExtension: task.TaskExtension{
Creator: taskCreator,
},
srcStorage: srcStorage,
@ -128,8 +132,8 @@ func copyBetween2Storages(t *CopyTask, srcStorage, dstStorage driver.Driver, src
srcObjPath := stdpath.Join(srcObjPath, obj.GetName())
dstObjPath := stdpath.Join(dstDirPath, srcObj.GetName())
CopyTaskManager.Add(&CopyTask{
TaskWithCreator: task.TaskWithCreator{
Creator: t.Creator,
TaskExtension: task.TaskExtension{
Creator: t.GetCreator(),
},
srcStorage: srcStorage,
dstStorage: dstStorage,
@ -150,6 +154,7 @@ func copyFileBetween2Storages(tsk *CopyTask, srcStorage, dstStorage driver.Drive
if err != nil {
return errors.WithMessagef(err, "failed get src [%s] file", srcFilePath)
}
tsk.SetTotalBytes(srcFile.GetSize())
link, _, err := op.Link(tsk.Ctx(), srcStorage, srcFilePath, model.LinkArgs{
Header: http.Header{},
})

View File

@ -69,7 +69,7 @@ func Move(ctx context.Context, srcPath, dstDirPath string, lazyCache ...bool) er
return err
}
func Copy(ctx context.Context, srcObjPath, dstDirPath string, lazyCache ...bool) (task.TaskInfoWithCreator, error) {
func Copy(ctx context.Context, srcObjPath, dstDirPath string, lazyCache ...bool) (task.TaskExtensionInfo, error) {
res, err := _copy(ctx, srcObjPath, dstDirPath, lazyCache...)
if err != nil {
log.Errorf("failed copy %s to %s: %+v", srcObjPath, dstDirPath, err)
@ -101,7 +101,7 @@ func PutDirectly(ctx context.Context, dstDirPath string, file model.FileStreamer
return err
}
func PutAsTask(ctx context.Context, dstDirPath string, file model.FileStreamer) (task.TaskInfoWithCreator, error) {
func PutAsTask(ctx context.Context, dstDirPath string, file model.FileStreamer) (task.TaskExtensionInfo, error) {
t, err := putAsTask(ctx, dstDirPath, file)
if err != nil {
log.Errorf("failed put %s: %+v", dstDirPath, err)

View File

@ -10,10 +10,11 @@ import (
"github.com/alist-org/alist/v3/internal/task"
"github.com/pkg/errors"
"github.com/xhofe/tache"
"time"
)
type UploadTask struct {
task.TaskWithCreator
task.TaskExtension
storage driver.Driver
dstDirActualPath string
file model.FileStreamer
@ -28,13 +29,16 @@ func (t *UploadTask) GetStatus() string {
}
func (t *UploadTask) Run() error {
t.ClearEndTime()
t.SetStartTime(time.Now())
defer func() { t.SetEndTime(time.Now()) }()
return op.Put(t.Ctx(), t.storage, t.dstDirActualPath, t.file, t.SetProgress, true)
}
var UploadTaskManager *tache.Manager[*UploadTask]
// putAsTask add as a put task and return immediately
func putAsTask(ctx context.Context, dstDirPath string, file model.FileStreamer) (task.TaskInfoWithCreator, error) {
func putAsTask(ctx context.Context, dstDirPath string, file model.FileStreamer) (task.TaskExtensionInfo, error) {
storage, dstDirActualPath, err := op.GetStorageAndActualPath(dstDirPath)
if err != nil {
return nil, errors.WithMessage(err, "failed get storage")
@ -52,13 +56,14 @@ func putAsTask(ctx context.Context, dstDirPath string, file model.FileStreamer)
}
taskCreator, _ := ctx.Value("user").(*model.User) // taskCreator is nil when convert failed
t := &UploadTask{
TaskWithCreator: task.TaskWithCreator{
TaskExtension: task.TaskExtension{
Creator: taskCreator,
},
storage: storage,
dstDirActualPath: dstDirActualPath,
file: file,
}
t.SetTotalBytes(file.GetSize())
UploadTaskManager.Add(t)
return t, nil
}

View File

@ -107,6 +107,7 @@ func (p *Cloud115) Status(task *tool.DownloadTask) (*tool.Status, error) {
s.Progress = t.Percent
s.Status = t.GetStatus()
s.Completed = t.IsDone()
s.TotalBytes = t.Size
if t.IsFailed() {
s.Err = fmt.Errorf(t.GetStatus())
}

View File

@ -82,7 +82,7 @@ func (a *Aria2) Status(task *tool.DownloadTask) (*tool.Status, error) {
if err != nil {
return nil, err
}
total, err := strconv.ParseUint(info.TotalLength, 10, 64)
total, err := strconv.ParseInt(info.TotalLength, 10, 64)
if err != nil {
total = 0
}
@ -91,8 +91,9 @@ func (a *Aria2) Status(task *tool.DownloadTask) (*tool.Status, error) {
downloaded = 0
}
s := &tool.Status{
Completed: info.Status == "complete",
Err: err,
Completed: info.Status == "complete",
Err: err,
TotalBytes: total,
}
s.Progress = float64(downloaded) / float64(total) * 100
if len(info.FollowedBy) != 0 {

View File

@ -83,6 +83,7 @@ func (s SimpleHttp) Run(task *tool.DownloadTask) error {
}
defer file.Close()
fileSize := resp.ContentLength
task.SetTotalBytes(fileSize)
err = utils.CopyWithCtx(task.Ctx(), file, resp.Body, fileSize, task.SetProgress)
return err
}

View File

@ -3,6 +3,7 @@ package pikpak
import (
"context"
"fmt"
"strconv"
"github.com/alist-org/alist/v3/drivers/pikpak"
"github.com/alist-org/alist/v3/internal/errs"
@ -105,6 +106,10 @@ func (p *PikPak) Status(task *tool.DownloadTask) (*tool.Status, error) {
s.Progress = float64(t.Progress)
s.Status = t.Message
s.Completed = (t.Phase == "PHASE_TYPE_COMPLETE")
s.TotalBytes, err = strconv.ParseInt(t.FileSize, 10, 64)
if err != nil {
s.TotalBytes = 0
}
if t.Phase == "PHASE_TYPE_ERROR" {
s.Err = fmt.Errorf(t.Message)
}

View File

@ -64,6 +64,7 @@ func (a *QBittorrent) Status(task *tool.DownloadTask) (*tool.Status, error) {
return nil, err
}
s := &tool.Status{}
s.TotalBytes = info.Size
s.Progress = float64(info.Completed) / float64(info.Size) * 100
switch info.State {
case qbittorrent.UPLOADING, qbittorrent.PAUSEDUP, qbittorrent.QUEUEDUP, qbittorrent.STALLEDUP, qbittorrent.FORCEDUP, qbittorrent.CHECKINGUP:

View File

@ -29,7 +29,7 @@ type AddURLArgs struct {
DeletePolicy DeletePolicy
}
func AddURL(ctx context.Context, args *AddURLArgs) (task.TaskInfoWithCreator, error) {
func AddURL(ctx context.Context, args *AddURLArgs) (task.TaskExtensionInfo, error) {
// get tool
tool, err := Tools.Get(args.Tool)
if err != nil {
@ -81,7 +81,7 @@ func AddURL(ctx context.Context, args *AddURLArgs) (task.TaskInfoWithCreator, er
taskCreator, _ := ctx.Value("user").(*model.User) // taskCreator is nil when convert failed
t := &DownloadTask{
TaskWithCreator: task.TaskWithCreator{
TaskExtension: task.TaskExtension{
Creator: taskCreator,
},
Url: args.URL,

View File

@ -16,11 +16,12 @@ type AddUrlArgs struct {
}
type Status struct {
Progress float64
NewGID string
Completed bool
Status string
Err error
TotalBytes int64
Progress float64
NewGID string
Completed bool
Status string
Err error
}
type Tool interface {

View File

@ -14,7 +14,7 @@ import (
)
type DownloadTask struct {
task.TaskWithCreator
task.TaskExtension
Url string `json:"url"`
DstDirPath string `json:"dst_dir_path"`
TempDir string `json:"temp_dir"`
@ -28,6 +28,9 @@ type DownloadTask struct {
}
func (t *DownloadTask) Run() error {
t.ClearEndTime()
t.SetStartTime(time.Now())
defer func() { t.SetEndTime(time.Now()) }()
if t.tool == nil {
tool, err := Tools.Get(t.Toolname)
if err != nil {
@ -131,6 +134,7 @@ func (t *DownloadTask) Update() (bool, error) {
}
t.callStatusRetried = 0
t.SetProgress(info.Progress)
t.SetTotalBytes(info.TotalBytes)
t.Status = fmt.Sprintf("[%s]: %s", t.tool.Name(), info.Status)
if info.NewGID != "" {
log.Debugf("followen by: %+v", info.NewGID)
@ -171,16 +175,18 @@ func (t *DownloadTask) Complete() error {
// upload files
for i := range files {
file := files[i]
TransferTaskManager.Add(&TransferTask{
TaskWithCreator: task.TaskWithCreator{
Creator: t.Creator,
tsk := &TransferTask{
TaskExtension: task.TaskExtension{
Creator: t.GetCreator(),
},
file: file,
DstDirPath: t.DstDirPath,
TempDir: t.TempDir,
DeletePolicy: t.DeletePolicy,
FileDir: file.Path,
})
}
tsk.SetTotalBytes(file.Size)
TransferTaskManager.Add(tsk)
}
return nil
}

View File

@ -4,6 +4,7 @@ import (
"fmt"
"os"
"path/filepath"
"time"
"github.com/alist-org/alist/v3/internal/model"
"github.com/alist-org/alist/v3/internal/op"
@ -16,7 +17,7 @@ import (
)
type TransferTask struct {
task.TaskWithCreator
task.TaskExtension
FileDir string `json:"file_dir"`
DstDirPath string `json:"dst_dir_path"`
TempDir string `json:"temp_dir"`
@ -25,6 +26,9 @@ type TransferTask struct {
}
func (t *TransferTask) Run() error {
t.ClearEndTime()
t.SetStartTime(time.Now())
defer func() { t.SetEndTime(time.Now()) }()
// check dstDir again
var err error
if (t.file == File{}) {

View File

@ -150,6 +150,7 @@ func (t *Transmission) Status(task *tool.DownloadTask) (*tool.Status, error) {
Err: err,
}
s.Progress = *info.PercentDone * 100
s.TotalBytes = int64(*info.SizeWhenDone / 8)
switch *info.Status {
case transmissionrpc.TorrentStatusCheckWait,

View File

@ -3,24 +3,58 @@ package task
import (
"github.com/alist-org/alist/v3/internal/model"
"github.com/xhofe/tache"
"time"
)
type TaskWithCreator struct {
type TaskExtension struct {
tache.Base
Creator *model.User
Creator *model.User
startTime *time.Time
endTime *time.Time
totalBytes int64
}
func (t *TaskWithCreator) SetCreator(creator *model.User) {
func (t *TaskExtension) SetCreator(creator *model.User) {
t.Creator = creator
t.Persist()
}
func (t *TaskWithCreator) GetCreator() *model.User {
func (t *TaskExtension) GetCreator() *model.User {
return t.Creator
}
type TaskInfoWithCreator interface {
tache.TaskWithInfo
SetCreator(creator *model.User)
GetCreator() *model.User
func (t *TaskExtension) SetStartTime(startTime time.Time) {
t.startTime = &startTime
}
func (t *TaskExtension) GetStartTime() *time.Time {
return t.startTime
}
func (t *TaskExtension) SetEndTime(endTime time.Time) {
t.endTime = &endTime
}
func (t *TaskExtension) GetEndTime() *time.Time {
return t.endTime
}
func (t *TaskExtension) ClearEndTime() {
t.endTime = nil
}
func (t *TaskExtension) SetTotalBytes(totalBytes int64) {
t.totalBytes = totalBytes
}
func (t *TaskExtension) GetTotalBytes() int64 {
return t.totalBytes
}
type TaskExtensionInfo interface {
tache.TaskWithInfo
GetCreator() *model.User
GetStartTime() *time.Time
GetEndTime() *time.Time
GetTotalBytes() int64
}

View File

@ -121,7 +121,7 @@ func FsCopy(c *gin.Context) {
common.ErrorResp(c, err, 403)
return
}
var addedTasks []task.TaskInfoWithCreator
var addedTasks []task.TaskExtensionInfo
for i, name := range req.Names {
t, err := fs.Copy(c, stdpath.Join(srcDir, name), dstDir, len(req.Names) > i+1)
if t != nil {

View File

@ -57,7 +57,7 @@ func FsStream(c *gin.Context) {
Mimetype: c.GetHeader("Content-Type"),
WebPutAsTask: asTask,
}
var t task.TaskInfoWithCreator
var t task.TaskExtensionInfo
if asTask {
t, err = fs.PutAsTask(c, dir, s)
} else {
@ -122,7 +122,7 @@ func FsForm(c *gin.Context) {
Mimetype: file.Header.Get("Content-Type"),
WebPutAsTask: asTask,
}
var t task.TaskInfoWithCreator
var t task.TaskExtensionInfo
if asTask {
s.Reader = struct {
io.Reader

View File

@ -133,7 +133,7 @@ func AddOfflineDownload(c *gin.Context) {
common.ErrorResp(c, err, 403)
return
}
var tasks []task.TaskInfoWithCreator
var tasks []task.TaskExtensionInfo
for _, url := range req.Urls {
t, err := tool.AddURL(c, &tool.AddURLArgs{
URL: url,

View File

@ -4,6 +4,7 @@ import (
"github.com/alist-org/alist/v3/internal/model"
"github.com/alist-org/alist/v3/internal/task"
"math"
"time"
"github.com/alist-org/alist/v3/internal/fs"
"github.com/alist-org/alist/v3/internal/offline_download/tool"
@ -21,10 +22,13 @@ type TaskInfo struct {
State tache.State `json:"state"`
Status string `json:"status"`
Progress float64 `json:"progress"`
StartTime *time.Time `json:"start_time"`
EndTime *time.Time `json:"end_time"`
TotalBytes int64 `json:"total_bytes"`
Error string `json:"error"`
}
func getTaskInfo[T task.TaskInfoWithCreator](task T) TaskInfo {
func getTaskInfo[T task.TaskExtensionInfo](task T) TaskInfo {
errMsg := ""
if task.GetErr() != nil {
errMsg = task.GetErr().Error()
@ -48,11 +52,14 @@ func getTaskInfo[T task.TaskInfoWithCreator](task T) TaskInfo {
State: task.GetState(),
Status: task.GetStatus(),
Progress: progress,
StartTime: task.GetStartTime(),
EndTime: task.GetEndTime(),
TotalBytes: task.GetTotalBytes(),
Error: errMsg,
}
}
func getTaskInfos[T task.TaskInfoWithCreator](tasks []T) []TaskInfo {
func getTaskInfos[T task.TaskExtensionInfo](tasks []T) []TaskInfo {
return utils.MustSliceConvert(tasks, getTaskInfo[T])
}
@ -68,7 +75,7 @@ func getUserInfo(c *gin.Context) (bool, uint, bool) {
}
}
func getTargetedHandler[T task.TaskInfoWithCreator](manager *tache.Manager[T], callback func(c *gin.Context, task T)) gin.HandlerFunc {
func getTargetedHandler[T task.TaskExtensionInfo](manager *tache.Manager[T], callback func(c *gin.Context, task T)) gin.HandlerFunc {
return func(c *gin.Context) {
isAdmin, uid, ok := getUserInfo(c)
if !ok {
@ -90,7 +97,7 @@ func getTargetedHandler[T task.TaskInfoWithCreator](manager *tache.Manager[T], c
}
}
func getBatchHandler[T task.TaskInfoWithCreator](manager *tache.Manager[T], callback func(task T)) gin.HandlerFunc {
func getBatchHandler[T task.TaskExtensionInfo](manager *tache.Manager[T], callback func(task T)) gin.HandlerFunc {
return func(c *gin.Context) {
isAdmin, uid, ok := getUserInfo(c)
if !ok {
@ -115,7 +122,7 @@ func getBatchHandler[T task.TaskInfoWithCreator](manager *tache.Manager[T], call
}
}
func taskRoute[T task.TaskInfoWithCreator](g *gin.RouterGroup, manager *tache.Manager[T]) {
func taskRoute[T task.TaskExtensionInfo](g *gin.RouterGroup, manager *tache.Manager[T]) {
g.GET("/undone", func(c *gin.Context) {
isAdmin, uid, ok := getUserInfo(c)
if !ok {