Cloudreve/pkg/cluster/pool.go

204 lines
5.7 KiB
Go

package cluster
import (
"context"
"fmt"
"sync"
"github.com/cloudreve/Cloudreve/v4/ent"
"github.com/cloudreve/Cloudreve/v4/ent/node"
"github.com/cloudreve/Cloudreve/v4/inventory"
"github.com/cloudreve/Cloudreve/v4/inventory/types"
"github.com/cloudreve/Cloudreve/v4/pkg/conf"
"github.com/cloudreve/Cloudreve/v4/pkg/logging"
"github.com/cloudreve/Cloudreve/v4/pkg/setting"
"github.com/samber/lo"
)
type NodePool interface {
// Upsert updates or inserts a node into the pool.
Upsert(ctx context.Context, node *ent.Node)
// Get returns a node with the given capability and preferred node id. `allowed` is a list of allowed node ids.
// If `allowed` is empty, all nodes with the capability are considered.
Get(ctx context.Context, capability types.NodeCapability, preferred int) (Node, error)
}
type (
weightedNodePool struct {
lock sync.RWMutex
conf conf.ConfigProvider
settings setting.Provider
nodes map[types.NodeCapability][]*nodeItem
}
nodeItem struct {
node Node
weight int
current int
}
)
var (
ErrNoAvailableNode = fmt.Errorf("no available node found")
supportedCapabilities = []types.NodeCapability{
types.NodeCapabilityNone,
types.NodeCapabilityCreateArchive,
types.NodeCapabilityExtractArchive,
types.NodeCapabilityRemoteDownload,
}
)
func NewNodePool(ctx context.Context, l logging.Logger, config conf.ConfigProvider, settings setting.Provider,
client inventory.NodeClient) (NodePool, error) {
nodes, err := client.ListActiveNodes(ctx, nil)
if err != nil {
return nil, fmt.Errorf("failed to list active nodes: %w", err)
}
pool := &weightedNodePool{
nodes: make(map[types.NodeCapability][]*nodeItem),
conf: config,
settings: settings,
}
for _, node := range nodes {
for _, capability := range supportedCapabilities {
// If current capability is enabled, add it to pool slot.
if capability == types.NodeCapabilityNone ||
(node.Capabilities != nil && node.Capabilities.Enabled(int(capability))) {
if _, ok := pool.nodes[capability]; !ok {
pool.nodes[capability] = make([]*nodeItem, 0)
}
l.Debug("Add node %q to capability slot %d with weight %d", node.Name, capability, node.Weight)
pool.nodes[capability] = append(pool.nodes[capability], &nodeItem{
node: newNode(ctx, node, config, settings),
weight: node.Weight,
current: 0,
})
}
}
}
return pool, nil
}
func (p *weightedNodePool) Get(ctx context.Context, capability types.NodeCapability, preferred int) (Node, error) {
l := logging.FromContext(ctx)
p.lock.RLock()
defer p.lock.RUnlock()
nodes, ok := p.nodes[capability]
if !ok || len(nodes) == 0 {
return nil, fmt.Errorf("no node found with capability %d: %w", capability, ErrNoAvailableNode)
}
var selected *nodeItem
if preferred > 0 {
// First try to find the preferred node.
for _, n := range nodes {
if n.node.ID() == preferred {
selected = n
break
}
}
if selected == nil {
l.Debug("Preferred node %d not found, fallback to select a node with the least current weight", preferred)
}
}
if selected == nil {
// If no preferred one, or the preferred one is not available, select a node with the least current weight.
// Total weight of all items.
var total int
// Loop through the list of items and add the item's weight to the current weight.
// Also increment the total weight counter.
var maxNode *nodeItem
for _, item := range nodes {
item.current += max(1, item.weight)
total += max(1, item.weight)
// Select the item with max weight.
if maxNode == nil || item.current > maxNode.current {
maxNode = item
}
}
// Select the item with the max weight.
selected = maxNode
if selected == nil {
return nil, fmt.Errorf("no node found with capability %d: %w", capability, ErrNoAvailableNode)
}
l.Debug("Selected node %q with weight=%d, current=%d, total=%d", selected.node.Name(), selected.weight, maxNode.current, total)
// Reduce the current weight of the selected item by the total weight.
maxNode.current -= total
}
return selected.node, nil
}
func (p *weightedNodePool) Upsert(ctx context.Context, n *ent.Node) {
p.lock.Lock()
defer p.lock.Unlock()
for _, capability := range supportedCapabilities {
_, index, found := lo.FindIndexOf(p.nodes[capability], func(i *nodeItem) bool {
return i.node.ID() == n.ID
})
if capability == types.NodeCapabilityNone ||
(n.Capabilities != nil && n.Capabilities.Enabled(int(capability))) {
if n.Status != node.StatusActive && found {
// Remove inactive node
p.nodes[capability] = append(p.nodes[capability][:index], p.nodes[capability][index+1:]...)
continue
}
if found {
p.nodes[capability][index].node = newNode(ctx, n, p.conf, p.settings)
} else {
p.nodes[capability] = append(p.nodes[capability], &nodeItem{
node: newNode(ctx, n, p.conf, p.settings),
weight: n.Weight,
current: 0,
})
}
} else if found {
// Capability changed, remove the old node.
p.nodes[capability] = append(p.nodes[capability][:index], p.nodes[capability][index+1:]...)
}
}
}
type slaveDummyNodePool struct {
conf conf.ConfigProvider
settings setting.Provider
masterNode Node
}
func NewSlaveDummyNodePool(ctx context.Context, config conf.ConfigProvider, settings setting.Provider) NodePool {
return &slaveDummyNodePool{
conf: config,
settings: settings,
masterNode: newNode(ctx, &ent.Node{
ID: 0,
Name: "Master",
Type: node.TypeMaster,
}, config, settings),
}
}
func (s *slaveDummyNodePool) Upsert(ctx context.Context, node *ent.Node) {
}
func (s *slaveDummyNodePool) Get(ctx context.Context, capability types.NodeCapability, preferred int) (Node, error) {
return s.masterNode, nil
}