mirror of https://github.com/cloudreve/Cloudreve
414 lines
12 KiB
Go
414 lines
12 KiB
Go
package cluster
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"github.com/cloudreve/Cloudreve/v4/application/constants"
|
|
"github.com/cloudreve/Cloudreve/v4/ent"
|
|
"github.com/cloudreve/Cloudreve/v4/ent/node"
|
|
"github.com/cloudreve/Cloudreve/v4/ent/task"
|
|
"github.com/cloudreve/Cloudreve/v4/inventory/types"
|
|
"github.com/cloudreve/Cloudreve/v4/pkg/auth"
|
|
"github.com/cloudreve/Cloudreve/v4/pkg/cluster/routes"
|
|
"github.com/cloudreve/Cloudreve/v4/pkg/conf"
|
|
"github.com/cloudreve/Cloudreve/v4/pkg/downloader"
|
|
"github.com/cloudreve/Cloudreve/v4/pkg/downloader/aria2"
|
|
"github.com/cloudreve/Cloudreve/v4/pkg/downloader/qbittorrent"
|
|
"github.com/cloudreve/Cloudreve/v4/pkg/downloader/slave"
|
|
"github.com/cloudreve/Cloudreve/v4/pkg/filemanager/fs"
|
|
"github.com/cloudreve/Cloudreve/v4/pkg/logging"
|
|
"github.com/cloudreve/Cloudreve/v4/pkg/queue"
|
|
"github.com/cloudreve/Cloudreve/v4/pkg/request"
|
|
"github.com/cloudreve/Cloudreve/v4/pkg/serializer"
|
|
"github.com/cloudreve/Cloudreve/v4/pkg/setting"
|
|
"strconv"
|
|
)
|
|
|
|
type (
|
|
Node interface {
|
|
fs.StatelessUploadManager
|
|
ID() int
|
|
Name() string
|
|
IsMaster() bool
|
|
// CreateTask creates a task on the node. It does not have effect on master node.
|
|
CreateTask(ctx context.Context, taskType string, state string) (int, error)
|
|
// GetTask returns the task summary of the task with the given id.
|
|
GetTask(ctx context.Context, id int, clearOnComplete bool) (*SlaveTaskSummary, error)
|
|
// CleanupFolders cleans up the given folders on the node.
|
|
CleanupFolders(ctx context.Context, folders ...string) error
|
|
// AuthInstance returns the auth instance for the node.
|
|
AuthInstance() auth.Auth
|
|
// CreateDownloader creates a downloader instance from the node for remote download tasks.
|
|
CreateDownloader(ctx context.Context, c request.Client, settings setting.Provider) (downloader.Downloader, error)
|
|
// Settings returns the settings of the node.
|
|
Settings(ctx context.Context) *types.NodeSetting
|
|
}
|
|
|
|
// Request body for creating tasks on slave node
|
|
CreateSlaveTask struct {
|
|
Type string `json:"type"`
|
|
State string `json:"state"`
|
|
}
|
|
|
|
// Request body for cleaning up folders on slave node
|
|
FolderCleanup struct {
|
|
Path []string `json:"path" binding:"required"`
|
|
}
|
|
|
|
SlaveTaskSummary struct {
|
|
Status task.Status `json:"status"`
|
|
Error string `json:"error"`
|
|
PrivateState string `json:"private_state"`
|
|
Progress queue.Progresses `json:"progress,omitempty"`
|
|
}
|
|
|
|
MasterSiteUrlCtx struct{}
|
|
MasterSiteVersionCtx struct{}
|
|
MasterSiteIDCtx struct{}
|
|
SlaveNodeIDCtx struct{}
|
|
masterNode struct {
|
|
nodeBase
|
|
client request.Client
|
|
}
|
|
)
|
|
|
|
func newNode(ctx context.Context, model *ent.Node, config conf.ConfigProvider, settings setting.Provider) Node {
|
|
if model.Type == node.TypeMaster {
|
|
return newMasterNode(model, config, settings)
|
|
}
|
|
return newSlaveNode(ctx, model, config, settings)
|
|
}
|
|
|
|
func newMasterNode(model *ent.Node, config conf.ConfigProvider, settings setting.Provider) *masterNode {
|
|
n := &masterNode{
|
|
nodeBase: nodeBase{
|
|
model: model,
|
|
},
|
|
}
|
|
|
|
if config.System().Mode == conf.SlaveMode {
|
|
n.client = request.NewClient(config,
|
|
request.WithCorrelationID(),
|
|
request.WithCredential(auth.HMACAuth{
|
|
[]byte(config.Slave().Secret),
|
|
}, int64(config.Slave().SignatureTTL)),
|
|
)
|
|
}
|
|
|
|
return n
|
|
}
|
|
|
|
func (b *masterNode) PrepareUpload(ctx context.Context, args *fs.StatelessPrepareUploadService) (*fs.StatelessPrepareUploadResponse, error) {
|
|
reqBody, err := json.Marshal(args)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to marshal request body: %w", err)
|
|
}
|
|
|
|
requestDst := routes.MasterStatelessUrl(MasterSiteUrlFromContext(ctx), "prepare")
|
|
resp, err := b.client.Request(
|
|
"PUT",
|
|
requestDst.String(),
|
|
bytes.NewReader(reqBody),
|
|
request.WithContext(ctx),
|
|
request.WithSlaveMeta(NodeIdFromContext(ctx)),
|
|
request.WithLogger(logging.FromContext(ctx)),
|
|
).CheckHTTPResponse(200).DecodeResponse()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// 处理列取结果
|
|
if resp.Code != 0 {
|
|
return nil, serializer.NewErrorFromResponse(resp)
|
|
}
|
|
|
|
uploadRequest := &fs.StatelessPrepareUploadResponse{}
|
|
resp.GobDecode(uploadRequest)
|
|
|
|
return uploadRequest, nil
|
|
}
|
|
|
|
func (b *masterNode) CompleteUpload(ctx context.Context, args *fs.StatelessCompleteUploadService) error {
|
|
reqBody, err := json.Marshal(args)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to marshal request body: %w", err)
|
|
}
|
|
|
|
requestDst := routes.MasterStatelessUrl(MasterSiteUrlFromContext(ctx), "complete")
|
|
resp, err := b.client.Request(
|
|
"POST",
|
|
requestDst.String(),
|
|
bytes.NewReader(reqBody),
|
|
request.WithContext(ctx),
|
|
request.WithSlaveMeta(NodeIdFromContext(ctx)),
|
|
request.WithLogger(logging.FromContext(ctx)),
|
|
).CheckHTTPResponse(200).DecodeResponse()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// 处理列取结果
|
|
if resp.Code != 0 {
|
|
return serializer.NewErrorFromResponse(resp)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (b *masterNode) OnUploadFailed(ctx context.Context, args *fs.StatelessOnUploadFailedService) error {
|
|
reqBody, err := json.Marshal(args)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to marshal request body: %w", err)
|
|
}
|
|
|
|
requestDst := routes.MasterStatelessUrl(MasterSiteUrlFromContext(ctx), "failed")
|
|
resp, err := b.client.Request(
|
|
"POST",
|
|
requestDst.String(),
|
|
bytes.NewReader(reqBody),
|
|
request.WithContext(ctx),
|
|
request.WithSlaveMeta(NodeIdFromContext(ctx)),
|
|
request.WithLogger(logging.FromContext(ctx)),
|
|
).CheckHTTPResponse(200).DecodeResponse()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// 处理列取结果
|
|
if resp.Code != 0 {
|
|
return serializer.NewErrorFromResponse(resp)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (b *masterNode) CreateFile(ctx context.Context, args *fs.StatelessCreateFileService) error {
|
|
reqBody, err := json.Marshal(args)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to marshal request body: %w", err)
|
|
}
|
|
|
|
requestDst := routes.MasterStatelessUrl(MasterSiteUrlFromContext(ctx), "create")
|
|
resp, err := b.client.Request(
|
|
"POST",
|
|
requestDst.String(),
|
|
bytes.NewReader(reqBody),
|
|
request.WithContext(ctx),
|
|
request.WithSlaveMeta(NodeIdFromContext(ctx)),
|
|
request.WithLogger(logging.FromContext(ctx)),
|
|
).CheckHTTPResponse(200).DecodeResponse()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// 处理列取结果
|
|
if resp.Code != 0 {
|
|
return serializer.NewErrorFromResponse(resp)
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
func (b *masterNode) CreateDownloader(ctx context.Context, c request.Client, settings setting.Provider) (downloader.Downloader, error) {
|
|
return NewDownloader(ctx, c, settings, b.Settings(ctx))
|
|
}
|
|
|
|
// NewDownloader creates a new downloader instance from the node for remote download tasks.
|
|
func NewDownloader(ctx context.Context, c request.Client, settings setting.Provider, options *types.NodeSetting) (downloader.Downloader, error) {
|
|
if options.Provider == types.DownloaderProviderQBittorrent {
|
|
return qbittorrent.NewClient(logging.FromContext(ctx), c, settings, options.QBittorrentSetting)
|
|
} else if options.Provider == types.DownloaderProviderAria2 {
|
|
return aria2.New(logging.FromContext(ctx), settings, options.Aria2Setting), nil
|
|
} else if options.Provider == "" {
|
|
return nil, errors.New("downloader not configured for this node")
|
|
} else {
|
|
return nil, errors.New("unknown downloader provider")
|
|
}
|
|
}
|
|
|
|
type slaveNode struct {
|
|
nodeBase
|
|
client request.Client
|
|
}
|
|
|
|
func newSlaveNode(ctx context.Context, model *ent.Node, config conf.ConfigProvider, settings setting.Provider) *slaveNode {
|
|
siteBasic := settings.SiteBasic(ctx)
|
|
return &slaveNode{
|
|
nodeBase: nodeBase{
|
|
model: model,
|
|
},
|
|
client: request.NewClient(config,
|
|
request.WithCorrelationID(),
|
|
request.WithSlaveMeta(model.ID),
|
|
request.WithMasterMeta(siteBasic.ID, settings.SiteURL(setting.UseFirstSiteUrl(ctx)).String()),
|
|
request.WithCredential(auth.HMACAuth{[]byte(model.SlaveKey)}, int64(settings.SlaveRequestSignTTL(ctx))),
|
|
request.WithEndpoint(model.Server)),
|
|
}
|
|
}
|
|
|
|
func (n *slaveNode) CreateTask(ctx context.Context, taskType string, state string) (int, error) {
|
|
reqBody, err := json.Marshal(&CreateSlaveTask{
|
|
Type: taskType,
|
|
State: state,
|
|
})
|
|
if err != nil {
|
|
return 0, fmt.Errorf("failed to marshal request body: %w", err)
|
|
}
|
|
|
|
resp, err := n.client.Request(
|
|
"PUT",
|
|
constants.APIPrefixSlave+"/task",
|
|
bytes.NewReader(reqBody),
|
|
request.WithContext(ctx),
|
|
request.WithLogger(logging.FromContext(ctx)),
|
|
).CheckHTTPResponse(200).DecodeResponse()
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
// 处理列取结果
|
|
if resp.Code != 0 {
|
|
return 0, serializer.NewErrorFromResponse(resp)
|
|
}
|
|
|
|
taskId := 0
|
|
if resp.GobDecode(&taskId); taskId > 0 {
|
|
return taskId, nil
|
|
}
|
|
|
|
return 0, fmt.Errorf("unexpected response data: %v", resp.Data)
|
|
}
|
|
|
|
func (n *slaveNode) GetTask(ctx context.Context, id int, clearOnComplete bool) (*SlaveTaskSummary, error) {
|
|
resp, err := n.client.Request(
|
|
"GET",
|
|
routes.SlaveGetTaskRoute(id, clearOnComplete),
|
|
nil,
|
|
request.WithContext(ctx),
|
|
request.WithLogger(logging.FromContext(ctx)),
|
|
).CheckHTTPResponse(200).DecodeResponse()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// 处理列取结果
|
|
if resp.Code != 0 {
|
|
return nil, serializer.NewErrorFromResponse(resp)
|
|
}
|
|
|
|
summary := &SlaveTaskSummary{}
|
|
resp.GobDecode(summary)
|
|
|
|
return summary, nil
|
|
}
|
|
|
|
func (b *slaveNode) CleanupFolders(ctx context.Context, folders ...string) error {
|
|
args := &FolderCleanup{
|
|
Path: folders,
|
|
}
|
|
reqBody, err := json.Marshal(args)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to marshal request body: %w", err)
|
|
}
|
|
|
|
resp, err := b.client.Request(
|
|
"POST",
|
|
constants.APIPrefixSlave+"/task/cleanup",
|
|
bytes.NewReader(reqBody),
|
|
request.WithContext(ctx),
|
|
request.WithLogger(logging.FromContext(ctx)),
|
|
).CheckHTTPResponse(200).DecodeResponse()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// 处理列取结果
|
|
if resp.Code != 0 {
|
|
return serializer.NewErrorFromResponse(resp)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (b *slaveNode) CreateDownloader(ctx context.Context, c request.Client, settings setting.Provider) (downloader.Downloader, error) {
|
|
return slave.NewSlaveDownloader(b.client, b.Settings(ctx)), nil
|
|
}
|
|
|
|
type nodeBase struct {
|
|
model *ent.Node
|
|
}
|
|
|
|
func (b *nodeBase) ID() int {
|
|
return b.model.ID
|
|
}
|
|
|
|
func (b *nodeBase) Name() string {
|
|
return b.model.Name
|
|
}
|
|
|
|
func (b *nodeBase) IsMaster() bool {
|
|
return b.model.Type == node.TypeMaster
|
|
}
|
|
|
|
func (b *nodeBase) CreateTask(ctx context.Context, taskType string, state string) (int, error) {
|
|
return 0, errors.New("not implemented")
|
|
}
|
|
|
|
func (b *nodeBase) AuthInstance() auth.Auth {
|
|
return auth.HMACAuth{[]byte(b.model.SlaveKey)}
|
|
}
|
|
|
|
func (b *nodeBase) GetTask(ctx context.Context, id int, clearOnComplete bool) (*SlaveTaskSummary, error) {
|
|
return nil, errors.New("not implemented")
|
|
}
|
|
|
|
func (b *nodeBase) CleanupFolders(ctx context.Context, folders ...string) error {
|
|
return errors.New("not implemented")
|
|
}
|
|
|
|
func (b *nodeBase) PrepareUpload(ctx context.Context, args *fs.StatelessPrepareUploadService) (*fs.StatelessPrepareUploadResponse, error) {
|
|
return nil, errors.New("not implemented")
|
|
}
|
|
|
|
func (b *nodeBase) CompleteUpload(ctx context.Context, args *fs.StatelessCompleteUploadService) error {
|
|
return errors.New("not implemented")
|
|
}
|
|
|
|
func (b *nodeBase) OnUploadFailed(ctx context.Context, args *fs.StatelessOnUploadFailedService) error {
|
|
return errors.New("not implemented")
|
|
}
|
|
|
|
func (b *nodeBase) CreateFile(ctx context.Context, args *fs.StatelessCreateFileService) error {
|
|
return errors.New("not implemented")
|
|
}
|
|
|
|
func (b *nodeBase) CreateDownloader(ctx context.Context, c request.Client, settings setting.Provider) (downloader.Downloader, error) {
|
|
return nil, errors.New("not implemented")
|
|
}
|
|
|
|
func (b *nodeBase) Settings(ctx context.Context) *types.NodeSetting {
|
|
return b.model.Settings
|
|
}
|
|
|
|
func NodeIdFromContext(ctx context.Context) int {
|
|
nodeIdStr, ok := ctx.Value(SlaveNodeIDCtx{}).(string)
|
|
if !ok {
|
|
return 0
|
|
}
|
|
|
|
nodeId, _ := strconv.Atoi(nodeIdStr)
|
|
return nodeId
|
|
}
|
|
|
|
func MasterSiteUrlFromContext(ctx context.Context) string {
|
|
if u, ok := ctx.Value(MasterSiteUrlCtx{}).(string); ok {
|
|
return u
|
|
}
|
|
|
|
return ""
|
|
}
|