mirror of https://github.com/cloudreve/Cloudreve
151 lines
4.0 KiB
Go
151 lines
4.0 KiB
Go
package node
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"os"
|
|
"strconv"
|
|
|
|
"github.com/cloudreve/Cloudreve/v4/application/dependency"
|
|
"github.com/cloudreve/Cloudreve/v4/ent/task"
|
|
"github.com/cloudreve/Cloudreve/v4/inventory/types"
|
|
"github.com/cloudreve/Cloudreve/v4/pkg/cluster"
|
|
"github.com/cloudreve/Cloudreve/v4/pkg/cluster/routes"
|
|
"github.com/cloudreve/Cloudreve/v4/pkg/filemanager/workflows"
|
|
"github.com/cloudreve/Cloudreve/v4/pkg/logging"
|
|
"github.com/cloudreve/Cloudreve/v4/pkg/queue"
|
|
"github.com/cloudreve/Cloudreve/v4/pkg/serializer"
|
|
"github.com/gin-gonic/gin"
|
|
)
|
|
|
|
type (
|
|
CreateSlaveTaskParamCtx struct{}
|
|
)
|
|
|
|
func CreateTaskInSlave(s *cluster.CreateSlaveTask, c *gin.Context) (int, error) {
|
|
dep := dependency.FromContext(c)
|
|
registry := dep.TaskRegistry()
|
|
|
|
props, err := slaveTaskPropsFromContext(c)
|
|
if err != nil {
|
|
return 0, serializer.NewError(serializer.CodeParamErr, "failed to get master props from header", err)
|
|
}
|
|
|
|
var t queue.Task
|
|
switch s.Type {
|
|
case queue.SlaveUploadTaskType:
|
|
t = workflows.NewSlaveUploadTask(c, props, registry.NextID(), s.State)
|
|
case queue.SlaveCreateArchiveTaskType:
|
|
t = workflows.NewSlaveCreateArchiveTask(c, props, registry.NextID(), s.State)
|
|
case queue.SlaveExtractArchiveType:
|
|
t = workflows.NewSlaveExtractArchiveTask(c, props, registry.NextID(), s.State)
|
|
default:
|
|
return 0, serializer.NewError(serializer.CodeParamErr, "type not supported", nil)
|
|
}
|
|
|
|
if err := dep.SlaveQueue(c).QueueTask(c, t); err != nil {
|
|
return 0, serializer.NewError(serializer.CodeInternalSetting, "failed to queue task", err)
|
|
}
|
|
|
|
registry.Set(t.ID(), t)
|
|
return t.ID(), nil
|
|
}
|
|
|
|
type (
|
|
GetSlaveTaskParamCtx struct{}
|
|
GetSlaveTaskService struct {
|
|
ID int `uri:"id" binding:"required"`
|
|
}
|
|
)
|
|
|
|
func (s *GetSlaveTaskService) Get(c *gin.Context) (*cluster.SlaveTaskSummary, error) {
|
|
dep := dependency.FromContext(c)
|
|
registry := dep.TaskRegistry()
|
|
|
|
t, ok := registry.Get(s.ID)
|
|
if !ok {
|
|
return nil, serializer.NewError(serializer.CodeNotFound, "task not found", nil)
|
|
}
|
|
status := t.Status()
|
|
_, clearOnComplete := c.GetQuery(routes.SlaveClearTaskRegistryQuery)
|
|
if clearOnComplete && status == task.StatusCompleted ||
|
|
status == task.StatusError ||
|
|
status == task.StatusCanceled {
|
|
registry.Delete(s.ID)
|
|
}
|
|
|
|
res := &cluster.SlaveTaskSummary{
|
|
Status: status,
|
|
PrivateState: t.State(),
|
|
Progress: t.Progress(c),
|
|
}
|
|
err := t.Error()
|
|
if err != nil {
|
|
res.Error = err.Error()
|
|
}
|
|
|
|
return res, nil
|
|
}
|
|
|
|
func slaveTaskPropsFromContext(ctx context.Context) (*types.SlaveTaskProps, error) {
|
|
nodeIdStr, ok := ctx.Value(cluster.SlaveNodeIDCtx{}).(string)
|
|
if !ok {
|
|
return nil, fmt.Errorf("failed to get node ID from context")
|
|
}
|
|
|
|
nodeId, err := strconv.Atoi(nodeIdStr)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to convert node ID to int: %w", err)
|
|
}
|
|
|
|
masterSiteUrl := cluster.MasterSiteUrlFromContext(ctx)
|
|
if masterSiteUrl == "" {
|
|
return nil, fmt.Errorf("failed to get master site URL from context")
|
|
}
|
|
|
|
masterSiteVersion, ok := ctx.Value(cluster.MasterSiteVersionCtx{}).(string)
|
|
if !ok {
|
|
return nil, fmt.Errorf("failed to get master site version from context")
|
|
}
|
|
|
|
masterSiteId, ok := ctx.Value(cluster.MasterSiteIDCtx{}).(string)
|
|
if !ok {
|
|
return nil, fmt.Errorf("failed to convert master site ID to int: %w", err)
|
|
}
|
|
|
|
props := &types.SlaveTaskProps{
|
|
NodeID: nodeId,
|
|
MasterSiteID: masterSiteId,
|
|
MasterSiteURl: masterSiteUrl,
|
|
MasterSiteVersion: masterSiteVersion,
|
|
}
|
|
|
|
return props, nil
|
|
}
|
|
|
|
type (
|
|
FolderCleanupParamCtx struct{}
|
|
)
|
|
|
|
func Cleanup(args *cluster.FolderCleanup, c *gin.Context) error {
|
|
l := logging.FromContext(c)
|
|
ae := serializer.NewAggregateError()
|
|
for _, p := range args.Path {
|
|
l.Info("Cleaning up folder %q", p)
|
|
if err := os.RemoveAll(p); err != nil {
|
|
l.Warning("Failed to clean up folder %q: %s", p, err)
|
|
ae.Add(p, err)
|
|
}
|
|
}
|
|
|
|
return ae.Aggregate()
|
|
}
|
|
|
|
type (
|
|
CreateSlaveDownloadTaskParamCtx struct{}
|
|
GetSlaveDownloadTaskParamCtx struct{}
|
|
CancelSlaveDownloadTaskParamCtx struct{}
|
|
SelectSlaveDownloadFilesParamCtx struct{}
|
|
TestSlaveDownloadParamCtx struct{}
|
|
)
|