Cloudreve/pkg/cluster/node.go

414 lines
12 KiB
Go

package cluster
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"github.com/cloudreve/Cloudreve/v4/application/constants"
"github.com/cloudreve/Cloudreve/v4/ent"
"github.com/cloudreve/Cloudreve/v4/ent/node"
"github.com/cloudreve/Cloudreve/v4/ent/task"
"github.com/cloudreve/Cloudreve/v4/inventory/types"
"github.com/cloudreve/Cloudreve/v4/pkg/auth"
"github.com/cloudreve/Cloudreve/v4/pkg/cluster/routes"
"github.com/cloudreve/Cloudreve/v4/pkg/conf"
"github.com/cloudreve/Cloudreve/v4/pkg/downloader"
"github.com/cloudreve/Cloudreve/v4/pkg/downloader/aria2"
"github.com/cloudreve/Cloudreve/v4/pkg/downloader/qbittorrent"
"github.com/cloudreve/Cloudreve/v4/pkg/downloader/slave"
"github.com/cloudreve/Cloudreve/v4/pkg/filemanager/fs"
"github.com/cloudreve/Cloudreve/v4/pkg/logging"
"github.com/cloudreve/Cloudreve/v4/pkg/queue"
"github.com/cloudreve/Cloudreve/v4/pkg/request"
"github.com/cloudreve/Cloudreve/v4/pkg/serializer"
"github.com/cloudreve/Cloudreve/v4/pkg/setting"
"strconv"
)
type (
Node interface {
fs.StatelessUploadManager
ID() int
Name() string
IsMaster() bool
// CreateTask creates a task on the node. It does not have effect on master node.
CreateTask(ctx context.Context, taskType string, state string) (int, error)
// GetTask returns the task summary of the task with the given id.
GetTask(ctx context.Context, id int, clearOnComplete bool) (*SlaveTaskSummary, error)
// CleanupFolders cleans up the given folders on the node.
CleanupFolders(ctx context.Context, folders ...string) error
// AuthInstance returns the auth instance for the node.
AuthInstance() auth.Auth
// CreateDownloader creates a downloader instance from the node for remote download tasks.
CreateDownloader(ctx context.Context, c request.Client, settings setting.Provider) (downloader.Downloader, error)
// Settings returns the settings of the node.
Settings(ctx context.Context) *types.NodeSetting
}
// Request body for creating tasks on slave node
CreateSlaveTask struct {
Type string `json:"type"`
State string `json:"state"`
}
// Request body for cleaning up folders on slave node
FolderCleanup struct {
Path []string `json:"path" binding:"required"`
}
SlaveTaskSummary struct {
Status task.Status `json:"status"`
Error string `json:"error"`
PrivateState string `json:"private_state"`
Progress queue.Progresses `json:"progress,omitempty"`
}
MasterSiteUrlCtx struct{}
MasterSiteVersionCtx struct{}
MasterSiteIDCtx struct{}
SlaveNodeIDCtx struct{}
masterNode struct {
nodeBase
client request.Client
}
)
func newNode(ctx context.Context, model *ent.Node, config conf.ConfigProvider, settings setting.Provider) Node {
if model.Type == node.TypeMaster {
return newMasterNode(model, config, settings)
}
return newSlaveNode(ctx, model, config, settings)
}
func newMasterNode(model *ent.Node, config conf.ConfigProvider, settings setting.Provider) *masterNode {
n := &masterNode{
nodeBase: nodeBase{
model: model,
},
}
if config.System().Mode == conf.SlaveMode {
n.client = request.NewClient(config,
request.WithCorrelationID(),
request.WithCredential(auth.HMACAuth{
[]byte(config.Slave().Secret),
}, int64(config.Slave().SignatureTTL)),
)
}
return n
}
func (b *masterNode) PrepareUpload(ctx context.Context, args *fs.StatelessPrepareUploadService) (*fs.StatelessPrepareUploadResponse, error) {
reqBody, err := json.Marshal(args)
if err != nil {
return nil, fmt.Errorf("failed to marshal request body: %w", err)
}
requestDst := routes.MasterStatelessUrl(MasterSiteUrlFromContext(ctx), "prepare")
resp, err := b.client.Request(
"PUT",
requestDst.String(),
bytes.NewReader(reqBody),
request.WithContext(ctx),
request.WithSlaveMeta(NodeIdFromContext(ctx)),
request.WithLogger(logging.FromContext(ctx)),
).CheckHTTPResponse(200).DecodeResponse()
if err != nil {
return nil, err
}
// 处理列取结果
if resp.Code != 0 {
return nil, serializer.NewErrorFromResponse(resp)
}
uploadRequest := &fs.StatelessPrepareUploadResponse{}
resp.GobDecode(uploadRequest)
return uploadRequest, nil
}
func (b *masterNode) CompleteUpload(ctx context.Context, args *fs.StatelessCompleteUploadService) error {
reqBody, err := json.Marshal(args)
if err != nil {
return fmt.Errorf("failed to marshal request body: %w", err)
}
requestDst := routes.MasterStatelessUrl(MasterSiteUrlFromContext(ctx), "complete")
resp, err := b.client.Request(
"POST",
requestDst.String(),
bytes.NewReader(reqBody),
request.WithContext(ctx),
request.WithSlaveMeta(NodeIdFromContext(ctx)),
request.WithLogger(logging.FromContext(ctx)),
).CheckHTTPResponse(200).DecodeResponse()
if err != nil {
return err
}
// 处理列取结果
if resp.Code != 0 {
return serializer.NewErrorFromResponse(resp)
}
return nil
}
func (b *masterNode) OnUploadFailed(ctx context.Context, args *fs.StatelessOnUploadFailedService) error {
reqBody, err := json.Marshal(args)
if err != nil {
return fmt.Errorf("failed to marshal request body: %w", err)
}
requestDst := routes.MasterStatelessUrl(MasterSiteUrlFromContext(ctx), "failed")
resp, err := b.client.Request(
"POST",
requestDst.String(),
bytes.NewReader(reqBody),
request.WithContext(ctx),
request.WithSlaveMeta(NodeIdFromContext(ctx)),
request.WithLogger(logging.FromContext(ctx)),
).CheckHTTPResponse(200).DecodeResponse()
if err != nil {
return err
}
// 处理列取结果
if resp.Code != 0 {
return serializer.NewErrorFromResponse(resp)
}
return nil
}
func (b *masterNode) CreateFile(ctx context.Context, args *fs.StatelessCreateFileService) error {
reqBody, err := json.Marshal(args)
if err != nil {
return fmt.Errorf("failed to marshal request body: %w", err)
}
requestDst := routes.MasterStatelessUrl(MasterSiteUrlFromContext(ctx), "create")
resp, err := b.client.Request(
"POST",
requestDst.String(),
bytes.NewReader(reqBody),
request.WithContext(ctx),
request.WithSlaveMeta(NodeIdFromContext(ctx)),
request.WithLogger(logging.FromContext(ctx)),
).CheckHTTPResponse(200).DecodeResponse()
if err != nil {
return err
}
// 处理列取结果
if resp.Code != 0 {
return serializer.NewErrorFromResponse(resp)
}
return nil
}
func (b *masterNode) CreateDownloader(ctx context.Context, c request.Client, settings setting.Provider) (downloader.Downloader, error) {
return NewDownloader(ctx, c, settings, b.Settings(ctx))
}
// NewDownloader creates a new downloader instance from the node for remote download tasks.
func NewDownloader(ctx context.Context, c request.Client, settings setting.Provider, options *types.NodeSetting) (downloader.Downloader, error) {
if options.Provider == types.DownloaderProviderQBittorrent {
return qbittorrent.NewClient(logging.FromContext(ctx), c, settings, options.QBittorrentSetting)
} else if options.Provider == types.DownloaderProviderAria2 {
return aria2.New(logging.FromContext(ctx), settings, options.Aria2Setting), nil
} else if options.Provider == "" {
return nil, errors.New("downloader not configured for this node")
} else {
return nil, errors.New("unknown downloader provider")
}
}
type slaveNode struct {
nodeBase
client request.Client
}
func newSlaveNode(ctx context.Context, model *ent.Node, config conf.ConfigProvider, settings setting.Provider) *slaveNode {
siteBasic := settings.SiteBasic(ctx)
return &slaveNode{
nodeBase: nodeBase{
model: model,
},
client: request.NewClient(config,
request.WithCorrelationID(),
request.WithSlaveMeta(model.ID),
request.WithMasterMeta(siteBasic.ID, settings.SiteURL(setting.UseFirstSiteUrl(ctx)).String()),
request.WithCredential(auth.HMACAuth{[]byte(model.SlaveKey)}, int64(settings.SlaveRequestSignTTL(ctx))),
request.WithEndpoint(model.Server)),
}
}
func (n *slaveNode) CreateTask(ctx context.Context, taskType string, state string) (int, error) {
reqBody, err := json.Marshal(&CreateSlaveTask{
Type: taskType,
State: state,
})
if err != nil {
return 0, fmt.Errorf("failed to marshal request body: %w", err)
}
resp, err := n.client.Request(
"PUT",
constants.APIPrefixSlave+"/task",
bytes.NewReader(reqBody),
request.WithContext(ctx),
request.WithLogger(logging.FromContext(ctx)),
).CheckHTTPResponse(200).DecodeResponse()
if err != nil {
return 0, err
}
// 处理列取结果
if resp.Code != 0 {
return 0, serializer.NewErrorFromResponse(resp)
}
taskId := 0
if resp.GobDecode(&taskId); taskId > 0 {
return taskId, nil
}
return 0, fmt.Errorf("unexpected response data: %v", resp.Data)
}
func (n *slaveNode) GetTask(ctx context.Context, id int, clearOnComplete bool) (*SlaveTaskSummary, error) {
resp, err := n.client.Request(
"GET",
routes.SlaveGetTaskRoute(id, clearOnComplete),
nil,
request.WithContext(ctx),
request.WithLogger(logging.FromContext(ctx)),
).CheckHTTPResponse(200).DecodeResponse()
if err != nil {
return nil, err
}
// 处理列取结果
if resp.Code != 0 {
return nil, serializer.NewErrorFromResponse(resp)
}
summary := &SlaveTaskSummary{}
resp.GobDecode(summary)
return summary, nil
}
func (b *slaveNode) CleanupFolders(ctx context.Context, folders ...string) error {
args := &FolderCleanup{
Path: folders,
}
reqBody, err := json.Marshal(args)
if err != nil {
return fmt.Errorf("failed to marshal request body: %w", err)
}
resp, err := b.client.Request(
"POST",
constants.APIPrefixSlave+"/task/cleanup",
bytes.NewReader(reqBody),
request.WithContext(ctx),
request.WithLogger(logging.FromContext(ctx)),
).CheckHTTPResponse(200).DecodeResponse()
if err != nil {
return err
}
// 处理列取结果
if resp.Code != 0 {
return serializer.NewErrorFromResponse(resp)
}
return nil
}
func (b *slaveNode) CreateDownloader(ctx context.Context, c request.Client, settings setting.Provider) (downloader.Downloader, error) {
return slave.NewSlaveDownloader(b.client, b.Settings(ctx)), nil
}
type nodeBase struct {
model *ent.Node
}
func (b *nodeBase) ID() int {
return b.model.ID
}
func (b *nodeBase) Name() string {
return b.model.Name
}
func (b *nodeBase) IsMaster() bool {
return b.model.Type == node.TypeMaster
}
func (b *nodeBase) CreateTask(ctx context.Context, taskType string, state string) (int, error) {
return 0, errors.New("not implemented")
}
func (b *nodeBase) AuthInstance() auth.Auth {
return auth.HMACAuth{[]byte(b.model.SlaveKey)}
}
func (b *nodeBase) GetTask(ctx context.Context, id int, clearOnComplete bool) (*SlaveTaskSummary, error) {
return nil, errors.New("not implemented")
}
func (b *nodeBase) CleanupFolders(ctx context.Context, folders ...string) error {
return errors.New("not implemented")
}
func (b *nodeBase) PrepareUpload(ctx context.Context, args *fs.StatelessPrepareUploadService) (*fs.StatelessPrepareUploadResponse, error) {
return nil, errors.New("not implemented")
}
func (b *nodeBase) CompleteUpload(ctx context.Context, args *fs.StatelessCompleteUploadService) error {
return errors.New("not implemented")
}
func (b *nodeBase) OnUploadFailed(ctx context.Context, args *fs.StatelessOnUploadFailedService) error {
return errors.New("not implemented")
}
func (b *nodeBase) CreateFile(ctx context.Context, args *fs.StatelessCreateFileService) error {
return errors.New("not implemented")
}
func (b *nodeBase) CreateDownloader(ctx context.Context, c request.Client, settings setting.Provider) (downloader.Downloader, error) {
return nil, errors.New("not implemented")
}
func (b *nodeBase) Settings(ctx context.Context) *types.NodeSetting {
return b.model.Settings
}
func NodeIdFromContext(ctx context.Context) int {
nodeIdStr, ok := ctx.Value(SlaveNodeIDCtx{}).(string)
if !ok {
return 0
}
nodeId, _ := strconv.Atoi(nodeIdStr)
return nodeId
}
func MasterSiteUrlFromContext(ctx context.Context) string {
if u, ok := ctx.Value(MasterSiteUrlCtx{}).(string); ok {
return u
}
return ""
}