Cloudreve/service/admin/node.go

257 lines
7.2 KiB
Go

package admin
import (
"bytes"
"context"
"encoding/json"
"net/http"
"net/url"
"strings"
"time"
"github.com/cloudreve/Cloudreve/v4/application/dependency"
"github.com/cloudreve/Cloudreve/v4/ent"
"github.com/cloudreve/Cloudreve/v4/ent/node"
"github.com/cloudreve/Cloudreve/v4/inventory"
"github.com/cloudreve/Cloudreve/v4/pkg/auth"
"github.com/cloudreve/Cloudreve/v4/pkg/cluster"
"github.com/cloudreve/Cloudreve/v4/pkg/cluster/routes"
"github.com/cloudreve/Cloudreve/v4/pkg/downloader"
"github.com/cloudreve/Cloudreve/v4/pkg/downloader/slave"
"github.com/cloudreve/Cloudreve/v4/pkg/request"
"github.com/cloudreve/Cloudreve/v4/pkg/serializer"
"github.com/cloudreve/Cloudreve/v4/pkg/setting"
"github.com/gin-gonic/gin"
"github.com/samber/lo"
)
const (
nodeStatusCondition = "node_status"
)
func (service *AdminListService) Nodes(c *gin.Context) (*ListNodeResponse, error) {
dep := dependency.FromContext(c)
nodeClient := dep.NodeClient()
ctx := context.WithValue(c, inventory.LoadNodeStoragePolicy{}, true)
res, err := nodeClient.ListNodes(ctx, &inventory.ListNodeParameters{
PaginationArgs: &inventory.PaginationArgs{
Page: service.Page - 1,
PageSize: service.PageSize,
OrderBy: service.OrderBy,
Order: inventory.OrderDirection(service.OrderDirection),
},
Status: node.Status(service.Conditions[nodeStatusCondition]),
})
if err != nil {
return nil, serializer.NewError(serializer.CodeDBError, "Failed to list nodes", err)
}
return &ListNodeResponse{Nodes: res.Nodes, Pagination: res.PaginationResults}, nil
}
type (
SingleNodeService struct {
ID int `uri:"id" json:"id" binding:"required"`
}
SingleNodeParamCtx struct{}
)
func (service *SingleNodeService) Get(c *gin.Context) (*GetNodeResponse, error) {
dep := dependency.FromContext(c)
nodeClient := dep.NodeClient()
ctx := context.WithValue(c, inventory.LoadNodeStoragePolicy{}, true)
node, err := nodeClient.GetNodeById(ctx, service.ID)
if err != nil {
return nil, serializer.NewError(serializer.CodeDBError, "Failed to get node", err)
}
return &GetNodeResponse{Node: node}, nil
}
type (
TestNodeService struct {
Node *ent.Node `json:"node" binding:"required"`
}
TestNodeParamCtx struct{}
)
func (service *TestNodeService) Test(c *gin.Context) error {
dep := dependency.FromContext(c)
settings := dep.SettingProvider()
slave, err := url.Parse(service.Node.Server)
if err != nil {
return serializer.NewError(serializer.CodeParamErr, "Failed to parse node URL", err)
}
primaryURL := settings.SiteURL(setting.UseFirstSiteUrl(c)).String()
body := map[string]string{
"callback": primaryURL,
}
bodyByte, _ := json.Marshal(body)
r := dep.RequestClient()
res, err := r.Request(
"POST",
routes.SlavePingRoute(slave),
bytes.NewReader(bodyByte),
request.WithTimeout(time.Duration(10)*time.Second),
request.WithCredential(
auth.HMACAuth{SecretKey: []byte(service.Node.SlaveKey)},
int64(settings.SlaveRequestSignTTL(c)),
),
request.WithSlaveMeta(int(service.Node.ID)),
request.WithMasterMeta(settings.SiteBasic(c).ID, primaryURL),
request.WithCorrelationID(),
).CheckHTTPResponse(http.StatusOK).DecodeResponse()
if err != nil {
return serializer.NewError(serializer.CodeParamErr, "Failed to connect to node: "+err.Error(), nil)
}
if res.Code != 0 {
return serializer.NewError(serializer.CodeParamErr, "Successfully connected to slave node, but slave returns: "+res.Msg, nil)
}
return nil
}
type (
TestNodeDownloaderService struct {
Node *ent.Node `json:"node" binding:"required"`
}
TestNodeDownloaderParamCtx struct{}
)
func (service *TestNodeDownloaderService) Test(c *gin.Context) (string, error) {
dep := dependency.FromContext(c)
settings := dep.SettingProvider()
var (
dl downloader.Downloader
err error
)
if service.Node.Type == node.TypeMaster {
dl, err = cluster.NewDownloader(c, dep.RequestClient(request.WithContext(c)), dep.SettingProvider(), service.Node.Settings)
} else {
dl = slave.NewSlaveDownloader(dep.RequestClient(
request.WithContext(c),
request.WithCorrelationID(),
request.WithSlaveMeta(service.Node.ID),
request.WithMasterMeta(settings.SiteBasic(c).ID, settings.SiteURL(setting.UseFirstSiteUrl(c)).String()),
request.WithCredential(auth.HMACAuth{[]byte(service.Node.SlaveKey)}, int64(settings.SlaveRequestSignTTL(c))),
request.WithEndpoint(service.Node.Server),
), service.Node.Settings)
}
if err != nil {
return "", serializer.NewError(serializer.CodeParamErr, "Failed to create downloader", err)
}
version, err := dl.Test(c)
if err != nil {
return "", serializer.NewError(serializer.CodeParamErr, "Failed to test downloader: "+err.Error(), nil)
}
return version, nil
}
type (
UpsertNodeService struct {
Node *ent.Node `json:"node" binding:"required"`
}
UpsertNodeParamCtx struct{}
)
func (s *UpsertNodeService) Update(c *gin.Context) (*GetNodeResponse, error) {
dep := dependency.FromContext(c)
nodeClient := dep.NodeClient()
if s.Node.ID == 0 {
return nil, serializer.NewError(serializer.CodeParamErr, "ID is required", nil)
}
node, err := nodeClient.Upsert(c, s.Node)
if err != nil {
return nil, serializer.NewError(serializer.CodeDBError, "Failed to update node", err)
}
// reload node pool
np, err := dep.NodePool(c)
if err != nil {
return nil, serializer.NewError(serializer.CodeInternalSetting, "Failed to get node pool", err)
}
np.Upsert(c, node)
// Clear policy cache since some this node maybe cached by some storage policy
kv := dep.KV()
kv.Delete(inventory.StoragePolicyCacheKey)
service := &SingleNodeService{ID: node.ID}
return service.Get(c)
}
func (s *UpsertNodeService) Create(c *gin.Context) (*GetNodeResponse, error) {
dep := dependency.FromContext(c)
nodeClient := dep.NodeClient()
if s.Node.ID > 0 {
return nil, serializer.NewError(serializer.CodeParamErr, "ID must be 0", nil)
}
node, err := nodeClient.Upsert(c, s.Node)
if err != nil {
return nil, serializer.NewError(serializer.CodeDBError, "Failed to create node", err)
}
// reload node pool
np, err := dep.NodePool(c)
if err != nil {
return nil, serializer.NewError(serializer.CodeInternalSetting, "Failed to get node pool", err)
}
np.Upsert(c, node)
service := &SingleNodeService{ID: node.ID}
return service.Get(c)
}
func (s *SingleNodeService) Delete(c *gin.Context) error {
dep := dependency.FromContext(c)
nodeClient := dep.NodeClient()
ctx := context.WithValue(c, inventory.LoadNodeStoragePolicy{}, true)
existing, err := nodeClient.GetNodeById(ctx, s.ID)
if err != nil {
return serializer.NewError(serializer.CodeDBError, "Failed to get node", err)
}
if existing.Type == node.TypeMaster {
return serializer.NewError(serializer.CodeInvalidActionOnSystemNode, "", nil)
}
if len(existing.Edges.StoragePolicy) > 0 {
return serializer.NewError(
serializer.CodeNodeUsedByStoragePolicy,
strings.Join(lo.Map(existing.Edges.StoragePolicy, func(i *ent.StoragePolicy, _ int) string {
return i.Name
}), ", "),
nil,
)
}
// insert dummpy disabled node in nodepool to evict it
disabledNode := &ent.Node{
ID: s.ID,
Type: node.TypeSlave,
Status: node.StatusSuspended,
}
np, err := dep.NodePool(c)
if err != nil {
return serializer.NewError(serializer.CodeInternalSetting, "Failed to get node pool", err)
}
np.Upsert(c, disabledNode)
return nodeClient.Delete(c, s.ID)
}