mirror of https://github.com/cloudreve/Cloudreve
257 lines
7.2 KiB
Go
257 lines
7.2 KiB
Go
package admin
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/json"
|
|
"net/http"
|
|
"net/url"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/cloudreve/Cloudreve/v4/application/dependency"
|
|
"github.com/cloudreve/Cloudreve/v4/ent"
|
|
"github.com/cloudreve/Cloudreve/v4/ent/node"
|
|
"github.com/cloudreve/Cloudreve/v4/inventory"
|
|
"github.com/cloudreve/Cloudreve/v4/pkg/auth"
|
|
"github.com/cloudreve/Cloudreve/v4/pkg/cluster"
|
|
"github.com/cloudreve/Cloudreve/v4/pkg/cluster/routes"
|
|
"github.com/cloudreve/Cloudreve/v4/pkg/downloader"
|
|
"github.com/cloudreve/Cloudreve/v4/pkg/downloader/slave"
|
|
"github.com/cloudreve/Cloudreve/v4/pkg/request"
|
|
"github.com/cloudreve/Cloudreve/v4/pkg/serializer"
|
|
"github.com/cloudreve/Cloudreve/v4/pkg/setting"
|
|
"github.com/gin-gonic/gin"
|
|
"github.com/samber/lo"
|
|
)
|
|
|
|
// nodeStatusCondition is the key looked up in a list request's Conditions
// to obtain the node status filter.
const nodeStatusCondition = "node_status"
|
|
|
|
func (service *AdminListService) Nodes(c *gin.Context) (*ListNodeResponse, error) {
|
|
dep := dependency.FromContext(c)
|
|
nodeClient := dep.NodeClient()
|
|
|
|
ctx := context.WithValue(c, inventory.LoadNodeStoragePolicy{}, true)
|
|
res, err := nodeClient.ListNodes(ctx, &inventory.ListNodeParameters{
|
|
PaginationArgs: &inventory.PaginationArgs{
|
|
Page: service.Page - 1,
|
|
PageSize: service.PageSize,
|
|
OrderBy: service.OrderBy,
|
|
Order: inventory.OrderDirection(service.OrderDirection),
|
|
},
|
|
Status: node.Status(service.Conditions[nodeStatusCondition]),
|
|
})
|
|
|
|
if err != nil {
|
|
return nil, serializer.NewError(serializer.CodeDBError, "Failed to list nodes", err)
|
|
}
|
|
|
|
return &ListNodeResponse{Nodes: res.Nodes, Pagination: res.PaginationResults}, nil
|
|
}
|
|
|
|
// SingleNodeService identifies the single node a request operates on.
// The ID is required and is tagged for both URI and JSON binding.
type SingleNodeService struct {
	ID int `uri:"id" json:"id" binding:"required"`
}

// SingleNodeParamCtx is an empty marker type (presumably the context key under
// which a bound SingleNodeService is stored; confirm at the call sites).
type SingleNodeParamCtx struct{}
|
|
|
|
func (service *SingleNodeService) Get(c *gin.Context) (*GetNodeResponse, error) {
|
|
dep := dependency.FromContext(c)
|
|
nodeClient := dep.NodeClient()
|
|
|
|
ctx := context.WithValue(c, inventory.LoadNodeStoragePolicy{}, true)
|
|
node, err := nodeClient.GetNodeById(ctx, service.ID)
|
|
if err != nil {
|
|
return nil, serializer.NewError(serializer.CodeDBError, "Failed to get node", err)
|
|
}
|
|
|
|
return &GetNodeResponse{Node: node}, nil
|
|
}
|
|
|
|
type (
|
|
TestNodeService struct {
|
|
Node *ent.Node `json:"node" binding:"required"`
|
|
}
|
|
TestNodeParamCtx struct{}
|
|
)
|
|
|
|
func (service *TestNodeService) Test(c *gin.Context) error {
|
|
dep := dependency.FromContext(c)
|
|
settings := dep.SettingProvider()
|
|
|
|
slave, err := url.Parse(service.Node.Server)
|
|
if err != nil {
|
|
return serializer.NewError(serializer.CodeParamErr, "Failed to parse node URL", err)
|
|
}
|
|
|
|
primaryURL := settings.SiteURL(setting.UseFirstSiteUrl(c)).String()
|
|
body := map[string]string{
|
|
"callback": primaryURL,
|
|
}
|
|
bodyByte, _ := json.Marshal(body)
|
|
|
|
r := dep.RequestClient()
|
|
res, err := r.Request(
|
|
"POST",
|
|
routes.SlavePingRoute(slave),
|
|
bytes.NewReader(bodyByte),
|
|
request.WithTimeout(time.Duration(10)*time.Second),
|
|
request.WithCredential(
|
|
auth.HMACAuth{SecretKey: []byte(service.Node.SlaveKey)},
|
|
int64(settings.SlaveRequestSignTTL(c)),
|
|
),
|
|
request.WithSlaveMeta(int(service.Node.ID)),
|
|
request.WithMasterMeta(settings.SiteBasic(c).ID, primaryURL),
|
|
request.WithCorrelationID(),
|
|
).CheckHTTPResponse(http.StatusOK).DecodeResponse()
|
|
|
|
if err != nil {
|
|
return serializer.NewError(serializer.CodeParamErr, "Failed to connect to node: "+err.Error(), nil)
|
|
}
|
|
|
|
if res.Code != 0 {
|
|
return serializer.NewError(serializer.CodeParamErr, "Successfully connected to slave node, but slave returns: "+res.Msg, nil)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
type (
|
|
TestNodeDownloaderService struct {
|
|
Node *ent.Node `json:"node" binding:"required"`
|
|
}
|
|
TestNodeDownloaderParamCtx struct{}
|
|
)
|
|
|
|
func (service *TestNodeDownloaderService) Test(c *gin.Context) (string, error) {
|
|
dep := dependency.FromContext(c)
|
|
settings := dep.SettingProvider()
|
|
var (
|
|
dl downloader.Downloader
|
|
err error
|
|
)
|
|
if service.Node.Type == node.TypeMaster {
|
|
dl, err = cluster.NewDownloader(c, dep.RequestClient(request.WithContext(c)), dep.SettingProvider(), service.Node.Settings)
|
|
} else {
|
|
dl = slave.NewSlaveDownloader(dep.RequestClient(
|
|
request.WithContext(c),
|
|
request.WithCorrelationID(),
|
|
request.WithSlaveMeta(service.Node.ID),
|
|
request.WithMasterMeta(settings.SiteBasic(c).ID, settings.SiteURL(setting.UseFirstSiteUrl(c)).String()),
|
|
request.WithCredential(auth.HMACAuth{[]byte(service.Node.SlaveKey)}, int64(settings.SlaveRequestSignTTL(c))),
|
|
request.WithEndpoint(service.Node.Server),
|
|
), service.Node.Settings)
|
|
}
|
|
|
|
if err != nil {
|
|
return "", serializer.NewError(serializer.CodeParamErr, "Failed to create downloader", err)
|
|
}
|
|
|
|
version, err := dl.Test(c)
|
|
if err != nil {
|
|
return "", serializer.NewError(serializer.CodeParamErr, "Failed to test downloader: "+err.Error(), nil)
|
|
}
|
|
|
|
return version, nil
|
|
}
|
|
|
|
type (
|
|
UpsertNodeService struct {
|
|
Node *ent.Node `json:"node" binding:"required"`
|
|
}
|
|
UpsertNodeParamCtx struct{}
|
|
)
|
|
|
|
func (s *UpsertNodeService) Update(c *gin.Context) (*GetNodeResponse, error) {
|
|
dep := dependency.FromContext(c)
|
|
nodeClient := dep.NodeClient()
|
|
|
|
if s.Node.ID == 0 {
|
|
return nil, serializer.NewError(serializer.CodeParamErr, "ID is required", nil)
|
|
}
|
|
|
|
node, err := nodeClient.Upsert(c, s.Node)
|
|
if err != nil {
|
|
return nil, serializer.NewError(serializer.CodeDBError, "Failed to update node", err)
|
|
}
|
|
|
|
// reload node pool
|
|
np, err := dep.NodePool(c)
|
|
if err != nil {
|
|
return nil, serializer.NewError(serializer.CodeInternalSetting, "Failed to get node pool", err)
|
|
}
|
|
np.Upsert(c, node)
|
|
|
|
// Clear policy cache since some this node maybe cached by some storage policy
|
|
kv := dep.KV()
|
|
kv.Delete(inventory.StoragePolicyCacheKey)
|
|
|
|
service := &SingleNodeService{ID: node.ID}
|
|
return service.Get(c)
|
|
}
|
|
|
|
func (s *UpsertNodeService) Create(c *gin.Context) (*GetNodeResponse, error) {
|
|
dep := dependency.FromContext(c)
|
|
nodeClient := dep.NodeClient()
|
|
|
|
if s.Node.ID > 0 {
|
|
return nil, serializer.NewError(serializer.CodeParamErr, "ID must be 0", nil)
|
|
}
|
|
|
|
node, err := nodeClient.Upsert(c, s.Node)
|
|
if err != nil {
|
|
return nil, serializer.NewError(serializer.CodeDBError, "Failed to create node", err)
|
|
}
|
|
|
|
// reload node pool
|
|
np, err := dep.NodePool(c)
|
|
if err != nil {
|
|
return nil, serializer.NewError(serializer.CodeInternalSetting, "Failed to get node pool", err)
|
|
}
|
|
np.Upsert(c, node)
|
|
|
|
service := &SingleNodeService{ID: node.ID}
|
|
return service.Get(c)
|
|
}
|
|
|
|
func (s *SingleNodeService) Delete(c *gin.Context) error {
|
|
dep := dependency.FromContext(c)
|
|
nodeClient := dep.NodeClient()
|
|
|
|
ctx := context.WithValue(c, inventory.LoadNodeStoragePolicy{}, true)
|
|
existing, err := nodeClient.GetNodeById(ctx, s.ID)
|
|
if err != nil {
|
|
return serializer.NewError(serializer.CodeDBError, "Failed to get node", err)
|
|
}
|
|
|
|
if existing.Type == node.TypeMaster {
|
|
return serializer.NewError(serializer.CodeInvalidActionOnSystemNode, "", nil)
|
|
}
|
|
|
|
if len(existing.Edges.StoragePolicy) > 0 {
|
|
return serializer.NewError(
|
|
serializer.CodeNodeUsedByStoragePolicy,
|
|
strings.Join(lo.Map(existing.Edges.StoragePolicy, func(i *ent.StoragePolicy, _ int) string {
|
|
return i.Name
|
|
}), ", "),
|
|
nil,
|
|
)
|
|
}
|
|
|
|
// insert dummpy disabled node in nodepool to evict it
|
|
disabledNode := &ent.Node{
|
|
ID: s.ID,
|
|
Type: node.TypeSlave,
|
|
Status: node.StatusSuspended,
|
|
}
|
|
np, err := dep.NodePool(c)
|
|
if err != nil {
|
|
return serializer.NewError(serializer.CodeInternalSetting, "Failed to get node pool", err)
|
|
}
|
|
np.Upsert(c, disabledNode)
|
|
return nodeClient.Delete(c, s.ID)
|
|
}
|