Cloudreve/pkg/filemanager/workflows/worfklows.go

63 lines
1.9 KiB
Go

package workflows
import (
"context"
"fmt"
"path"
"strconv"
"time"
"github.com/cloudreve/Cloudreve/v4/application/dependency"
"github.com/cloudreve/Cloudreve/v4/inventory/types"
"github.com/cloudreve/Cloudreve/v4/pkg/cluster"
"github.com/cloudreve/Cloudreve/v4/pkg/queue"
"github.com/cloudreve/Cloudreve/v4/pkg/util"
)
const (
TaskTempPath = "fm_workflows"
slaveProgressRefreshInterval = 5 * time.Second
)
type NodeState struct {
NodeID int `json:"node_id"`
progress queue.Progresses
}
// allocateNode allocates a node for the task.
func allocateNode(ctx context.Context, dep dependency.Dep, state *NodeState, capability types.NodeCapability) (cluster.Node, error) {
np, err := dep.NodePool(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get node pool: %w", err)
}
node, err := np.Get(ctx, capability, state.NodeID)
if err != nil {
return nil, fmt.Errorf("failed to get node: %w", err)
}
state.NodeID = node.ID()
return node, nil
}
// prepareSlaveTaskCtx prepares the context for the slave task.
func prepareSlaveTaskCtx(ctx context.Context, props *types.SlaveTaskProps) context.Context {
ctx = context.WithValue(ctx, cluster.SlaveNodeIDCtx{}, strconv.Itoa(props.NodeID))
ctx = context.WithValue(ctx, cluster.MasterSiteUrlCtx{}, props.MasterSiteURl)
ctx = context.WithValue(ctx, cluster.MasterSiteVersionCtx{}, props.MasterSiteVersion)
ctx = context.WithValue(ctx, cluster.MasterSiteIDCtx{}, props.MasterSiteID)
return ctx
}
func prepareTempFolder(ctx context.Context, dep dependency.Dep, t queue.Task) (string, error) {
settings := dep.SettingProvider()
tempPath := util.DataPath(path.Join(settings.TempPath(ctx), TaskTempPath, strconv.Itoa(t.ID())))
if err := util.CreatNestedFolder(tempPath); err != nil {
return "", fmt.Errorf("failed to create temp folder: %w", err)
}
dep.Logger().Info("Temp folder created: %s", tempPath)
return tempPath, nil
}