Cloudreve/service/admin/task.go

283 lines
7.6 KiB
Go

package admin
import (
"context"
"strconv"
"time"
"github.com/cloudreve/Cloudreve/v4/application/dependency"
"github.com/cloudreve/Cloudreve/v4/ent"
"github.com/cloudreve/Cloudreve/v4/ent/task"
"github.com/cloudreve/Cloudreve/v4/inventory"
"github.com/cloudreve/Cloudreve/v4/pkg/hashid"
"github.com/cloudreve/Cloudreve/v4/pkg/queue"
"github.com/cloudreve/Cloudreve/v4/pkg/serializer"
"github.com/cloudreve/Cloudreve/v4/pkg/setting"
"github.com/gin-gonic/gin"
"github.com/gofrs/uuid"
"github.com/samber/lo"
)
func GetQueueMetrics(c *gin.Context) ([]QueueMetric, error) {
res := []QueueMetric{}
dep := dependency.FromContext(c)
mediaMeta := dep.MediaMetaQueue(c)
entityRecycle := dep.EntityRecycleQueue(c)
ioIntense := dep.IoIntenseQueue(c)
remoteDownload := dep.RemoteDownloadQueue(c)
thumb := dep.ThumbQueue(c)
res = append(res, QueueMetric{
Name: setting.QueueTypeMediaMeta,
BusyWorkers: mediaMeta.BusyWorkers(),
SuccessTasks: mediaMeta.SuccessTasks(),
FailureTasks: mediaMeta.FailureTasks(),
SubmittedTasks: mediaMeta.SubmittedTasks(),
SuspendingTasks: mediaMeta.SuspendingTasks(),
})
res = append(res, QueueMetric{
Name: setting.QueueTypeEntityRecycle,
BusyWorkers: entityRecycle.BusyWorkers(),
SuccessTasks: entityRecycle.SuccessTasks(),
FailureTasks: entityRecycle.FailureTasks(),
SubmittedTasks: entityRecycle.SubmittedTasks(),
SuspendingTasks: entityRecycle.SuspendingTasks(),
})
res = append(res, QueueMetric{
Name: setting.QueueTypeIOIntense,
BusyWorkers: ioIntense.BusyWorkers(),
SuccessTasks: ioIntense.SuccessTasks(),
FailureTasks: ioIntense.FailureTasks(),
SubmittedTasks: ioIntense.SubmittedTasks(),
SuspendingTasks: ioIntense.SuspendingTasks(),
})
res = append(res, QueueMetric{
Name: setting.QueueTypeRemoteDownload,
BusyWorkers: remoteDownload.BusyWorkers(),
SuccessTasks: remoteDownload.SuccessTasks(),
FailureTasks: remoteDownload.FailureTasks(),
SubmittedTasks: remoteDownload.SubmittedTasks(),
SuspendingTasks: remoteDownload.SuspendingTasks(),
})
res = append(res, QueueMetric{
Name: setting.QueueTypeThumb,
BusyWorkers: thumb.BusyWorkers(),
SuccessTasks: thumb.SuccessTasks(),
FailureTasks: thumb.FailureTasks(),
SubmittedTasks: thumb.SubmittedTasks(),
SuspendingTasks: thumb.SuspendingTasks(),
})
return res, nil
}
const (
taskTypeCondition = "task_type"
taskStatusCondition = "task_status"
taskCorrelationIDCondition = "task_correlation_id"
taskUserIDCondition = "task_user_id"
)
func (s *AdminListService) Tasks(c *gin.Context) (*ListTaskResponse, error) {
dep := dependency.FromContext(c)
taskClient := dep.TaskClient()
hasher := dep.HashIDEncoder()
var (
err error
userID int
correlationID *uuid.UUID
status []task.Status
taskType []string
)
if s.Conditions[taskTypeCondition] != "" {
taskType = []string{s.Conditions[taskTypeCondition]}
}
if s.Conditions[taskStatusCondition] != "" {
status = []task.Status{task.Status(s.Conditions[taskStatusCondition])}
}
if s.Conditions[taskCorrelationIDCondition] != "" {
cid, err := uuid.FromString(s.Conditions[taskCorrelationIDCondition])
if err != nil {
return nil, serializer.NewError(serializer.CodeParamErr, "Invalid task correlation ID", err)
}
correlationID = &cid
}
if s.Conditions[taskUserIDCondition] != "" {
userID, err = strconv.Atoi(s.Conditions[taskUserIDCondition])
if err != nil {
return nil, serializer.NewError(serializer.CodeParamErr, "Invalid task user ID", err)
}
}
ctx := context.WithValue(c, inventory.LoadTaskUser{}, true)
res, err := taskClient.List(ctx, &inventory.ListTaskArgs{
PaginationArgs: &inventory.PaginationArgs{
Page: s.Page - 1,
PageSize: s.PageSize,
OrderBy: s.OrderBy,
Order: inventory.OrderDirection(s.OrderDirection),
},
UserID: userID,
CorrelationID: correlationID,
Types: taskType,
Status: status,
})
if err != nil {
return nil, serializer.NewError(serializer.CodeDBError, "Failed to list tasks", err)
}
tasks := make([]queue.Task, 0, len(res.Tasks))
nodeMap := make(map[int]*ent.Node)
for _, t := range res.Tasks {
task, err := queue.NewTaskFromModel(t)
if err != nil {
return nil, serializer.NewError(serializer.CodeDBError, "Failed to parse task", err)
}
summary := task.Summarize(hasher)
if summary != nil && summary.NodeID > 0 {
if _, ok := nodeMap[summary.NodeID]; !ok {
nodeMap[summary.NodeID] = nil
}
}
tasks = append(tasks, task)
}
// Get nodes
nodes, err := dep.NodeClient().GetNodeByIds(c, lo.Keys(nodeMap))
if err != nil {
return nil, serializer.NewError(serializer.CodeDBError, "Failed to query nodes", err)
}
for _, n := range nodes {
nodeMap[n.ID] = n
}
return &ListTaskResponse{
Pagination: res.PaginationResults,
Tasks: lo.Map(res.Tasks, func(task *ent.Task, i int) GetTaskResponse {
var (
uid string
node *ent.Node
summary *queue.Summary
)
if task.Edges.User != nil {
uid = hashid.EncodeUserID(hasher, task.Edges.User.ID)
}
t := tasks[i]
summary = t.Summarize(hasher)
if summary != nil && summary.NodeID > 0 {
node = nodeMap[summary.NodeID]
}
return GetTaskResponse{
Task: task,
TaskHashID: hashid.EncodeTaskID(hasher, task.ID),
UserHashID: uid,
Node: node,
Summary: summary,
}
}),
}, nil
}
type (
SingleTaskService struct {
ID int `uri:"id" json:"id" binding:"required"`
}
SingleTaskParamCtx struct{}
)
func (s *SingleTaskService) Get(c *gin.Context) (*GetTaskResponse, error) {
dep := dependency.FromContext(c)
taskClient := dep.TaskClient()
hasher := dep.HashIDEncoder()
ctx := context.WithValue(c, inventory.LoadTaskUser{}, true)
task, err := taskClient.GetTaskByID(ctx, s.ID)
if err != nil {
return nil, serializer.NewError(serializer.CodeDBError, "Failed to get task", err)
}
t, err := queue.NewTaskFromModel(task)
if err != nil {
return nil, serializer.NewError(serializer.CodeDBError, "Failed to parse task", err)
}
summary := t.Summarize(hasher)
var (
node *ent.Node
userHashID string
)
if summary != nil && summary.NodeID > 0 {
node, _ = dep.NodeClient().GetNodeById(c, summary.NodeID)
}
if task.Edges.User != nil {
userHashID = hashid.EncodeUserID(hasher, task.Edges.User.ID)
}
return &GetTaskResponse{
Task: task,
Summary: summary,
Node: node,
UserHashID: userHashID,
TaskHashID: hashid.EncodeTaskID(hasher, task.ID),
}, nil
}
type (
BatchTaskService struct {
IDs []int `json:"ids" binding:"required"`
}
BatchTaskParamCtx struct{}
)
func (s *BatchTaskService) Delete(c *gin.Context) error {
dep := dependency.FromContext(c)
taskClient := dep.TaskClient()
err := taskClient.DeleteByIDs(c, s.IDs...)
if err != nil {
return serializer.NewError(serializer.CodeDBError, "Failed to delete tasks", err)
}
return nil
}
type (
CleanupTaskService struct {
NotAfter time.Time `json:"not_after" binding:"required"`
Types []string `json:"types"`
Status []task.Status `json:"status"`
}
CleanupTaskParameterCtx struct{}
)
func (s *CleanupTaskService) CleanupTask(c *gin.Context) error {
dep := dependency.FromContext(c)
taskClient := dep.TaskClient()
if len(s.Status) == 0 {
s.Status = []task.Status{task.StatusCanceled, task.StatusCompleted, task.StatusError}
}
if err := taskClient.DeleteBy(c, &inventory.DeleteTaskArgs{
NotAfter: s.NotAfter,
Types: s.Types,
Status: s.Status,
}); err != nil {
return serializer.NewError(serializer.CodeDBError, "Failed to cleanup tasks", err)
}
return nil
}