mirror of https://github.com/cloudreve/Cloudreve
63 lines
1.9 KiB
Go
63 lines
1.9 KiB
Go
package workflows
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"path"
|
|
"strconv"
|
|
"time"
|
|
|
|
"github.com/cloudreve/Cloudreve/v4/application/dependency"
|
|
"github.com/cloudreve/Cloudreve/v4/inventory/types"
|
|
"github.com/cloudreve/Cloudreve/v4/pkg/cluster"
|
|
"github.com/cloudreve/Cloudreve/v4/pkg/queue"
|
|
"github.com/cloudreve/Cloudreve/v4/pkg/util"
|
|
)
|
|
|
|
const (
|
|
TaskTempPath = "fm_workflows"
|
|
slaveProgressRefreshInterval = 5 * time.Second
|
|
)
|
|
|
|
type NodeState struct {
|
|
NodeID int `json:"node_id"`
|
|
|
|
progress queue.Progresses
|
|
}
|
|
|
|
// allocateNode allocates a node for the task.
|
|
func allocateNode(ctx context.Context, dep dependency.Dep, state *NodeState, capability types.NodeCapability) (cluster.Node, error) {
|
|
np, err := dep.NodePool(ctx)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to get node pool: %w", err)
|
|
}
|
|
|
|
node, err := np.Get(ctx, capability, state.NodeID)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to get node: %w", err)
|
|
}
|
|
|
|
state.NodeID = node.ID()
|
|
return node, nil
|
|
}
|
|
|
|
// prepareSlaveTaskCtx prepares the context for the slave task.
|
|
func prepareSlaveTaskCtx(ctx context.Context, props *types.SlaveTaskProps) context.Context {
|
|
ctx = context.WithValue(ctx, cluster.SlaveNodeIDCtx{}, strconv.Itoa(props.NodeID))
|
|
ctx = context.WithValue(ctx, cluster.MasterSiteUrlCtx{}, props.MasterSiteURl)
|
|
ctx = context.WithValue(ctx, cluster.MasterSiteVersionCtx{}, props.MasterSiteVersion)
|
|
ctx = context.WithValue(ctx, cluster.MasterSiteIDCtx{}, props.MasterSiteID)
|
|
return ctx
|
|
}
|
|
|
|
func prepareTempFolder(ctx context.Context, dep dependency.Dep, t queue.Task) (string, error) {
|
|
settings := dep.SettingProvider()
|
|
tempPath := util.DataPath(path.Join(settings.TempPath(ctx), TaskTempPath, strconv.Itoa(t.ID())))
|
|
if err := util.CreatNestedFolder(tempPath); err != nil {
|
|
return "", fmt.Errorf("failed to create temp folder: %w", err)
|
|
}
|
|
|
|
dep.Logger().Info("Temp folder created: %s", tempPath)
|
|
return tempPath, nil
|
|
}
|