mirror of https://github.com/cloudreve/Cloudreve
876 lines
25 KiB
Go
876 lines
25 KiB
Go
package dependency
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
iofs "io/fs"
|
|
"net/url"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/cloudreve/Cloudreve/v4/application/statics"
|
|
"github.com/cloudreve/Cloudreve/v4/ent"
|
|
"github.com/cloudreve/Cloudreve/v4/inventory"
|
|
"github.com/cloudreve/Cloudreve/v4/pkg/auth"
|
|
"github.com/cloudreve/Cloudreve/v4/pkg/cache"
|
|
"github.com/cloudreve/Cloudreve/v4/pkg/cluster"
|
|
"github.com/cloudreve/Cloudreve/v4/pkg/conf"
|
|
"github.com/cloudreve/Cloudreve/v4/pkg/credmanager"
|
|
"github.com/cloudreve/Cloudreve/v4/pkg/email"
|
|
"github.com/cloudreve/Cloudreve/v4/pkg/filemanager/fs/mime"
|
|
"github.com/cloudreve/Cloudreve/v4/pkg/filemanager/lock"
|
|
"github.com/cloudreve/Cloudreve/v4/pkg/hashid"
|
|
"github.com/cloudreve/Cloudreve/v4/pkg/logging"
|
|
"github.com/cloudreve/Cloudreve/v4/pkg/mediameta"
|
|
"github.com/cloudreve/Cloudreve/v4/pkg/queue"
|
|
"github.com/cloudreve/Cloudreve/v4/pkg/request"
|
|
"github.com/cloudreve/Cloudreve/v4/pkg/setting"
|
|
"github.com/cloudreve/Cloudreve/v4/pkg/thumb"
|
|
"github.com/cloudreve/Cloudreve/v4/pkg/util"
|
|
"github.com/gin-contrib/static"
|
|
"github.com/go-webauthn/webauthn/webauthn"
|
|
"github.com/robfig/cron/v3"
|
|
"github.com/samber/lo"
|
|
"github.com/ua-parser/uap-go/uaparser"
|
|
)
|
|
|
|
var (
|
|
ErrorConfigPathNotSet = errors.New("config path not set")
|
|
)
|
|
|
|
type (
|
|
// DepCtx defines keys for dependency manager
|
|
DepCtx struct{}
|
|
// ReloadCtx force reload new dependency
|
|
ReloadCtx struct{}
|
|
)
|
|
|
|
// Dep manages all dependencies of the server application. The default implementation is not
|
|
// concurrent safe, so all inner deps should be initialized before any goroutine starts.
|
|
type Dep interface {
|
|
// ConfigProvider Get a singleton conf.ConfigProvider instance.
|
|
ConfigProvider() conf.ConfigProvider
|
|
// Logger Get a singleton logging.Logger instance.
|
|
Logger() logging.Logger
|
|
// Statics Get a singleton fs.FS instance for embedded static resources.
|
|
Statics() iofs.FS
|
|
// ServerStaticFS Get a singleton static.ServeFileSystem instance for serving static resources.
|
|
ServerStaticFS() static.ServeFileSystem
|
|
// DBClient Get a singleton ent.Client instance for database access.
|
|
DBClient() *ent.Client
|
|
// KV Get a singleton cache.Driver instance for KV store.
|
|
KV() cache.Driver
|
|
// NavigatorStateKV Get a singleton cache.Driver instance for navigator state store. It forces use in-memory
|
|
// map instead of Redis to get better performance for complex nested linked list.
|
|
NavigatorStateKV() cache.Driver
|
|
// SettingClient Get a singleton inventory.SettingClient instance for access DB setting store.
|
|
SettingClient() inventory.SettingClient
|
|
// SettingProvider Get a singleton setting.Provider instance for access setting store in strong type.
|
|
SettingProvider() setting.Provider
|
|
// UserClient Creates a new inventory.UserClient instance for access DB user store.
|
|
UserClient() inventory.UserClient
|
|
// GroupClient Creates a new inventory.GroupClient instance for access DB group store.
|
|
GroupClient() inventory.GroupClient
|
|
// EmailClient Get a singleton email.Driver instance for sending emails.
|
|
EmailClient(ctx context.Context) email.Driver
|
|
// GeneralAuth Get a singleton auth.Auth instance for general authentication.
|
|
GeneralAuth() auth.Auth
|
|
// Shutdown the dependencies gracefully.
|
|
Shutdown(ctx context.Context) error
|
|
// FileClient Creates a new inventory.FileClient instance for access DB file store.
|
|
FileClient() inventory.FileClient
|
|
// NodeClient Creates a new inventory.NodeClient instance for access DB node store.
|
|
NodeClient() inventory.NodeClient
|
|
// DavAccountClient Creates a new inventory.DavAccountClient instance for access DB dav account store.
|
|
DavAccountClient() inventory.DavAccountClient
|
|
// DirectLinkClient Creates a new inventory.DirectLinkClient instance for access DB direct link store.
|
|
DirectLinkClient() inventory.DirectLinkClient
|
|
// HashIDEncoder Get a singleton hashid.Encoder instance for encoding/decoding hashids.
|
|
HashIDEncoder() hashid.Encoder
|
|
// TokenAuth Get a singleton auth.TokenAuth instance for token authentication.
|
|
TokenAuth() auth.TokenAuth
|
|
// LockSystem Get a singleton lock.LockSystem instance for file lock management.
|
|
LockSystem() lock.LockSystem
|
|
// ShareClient Creates a new inventory.ShareClient instance for access DB share store.
|
|
StoragePolicyClient() inventory.StoragePolicyClient
|
|
// RequestClient Creates a new request.Client instance for HTTP requests.
|
|
RequestClient(opts ...request.Option) request.Client
|
|
// ShareClient Creates a new inventory.ShareClient instance for access DB share store.
|
|
ShareClient() inventory.ShareClient
|
|
// TaskClient Creates a new inventory.TaskClient instance for access DB task store.
|
|
TaskClient() inventory.TaskClient
|
|
// ForkWithLogger create a shallow copy of dependency with a new correlated logger, used as per-request dep.
|
|
ForkWithLogger(ctx context.Context, l logging.Logger) context.Context
|
|
// MediaMetaQueue Get a singleton queue.Queue instance for media metadata processing.
|
|
MediaMetaQueue(ctx context.Context) queue.Queue
|
|
// SlaveQueue Get a singleton queue.Queue instance for slave tasks.
|
|
SlaveQueue(ctx context.Context) queue.Queue
|
|
// MediaMetaExtractor Get a singleton mediameta.Extractor instance for media metadata extraction.
|
|
MediaMetaExtractor(ctx context.Context) mediameta.Extractor
|
|
// ThumbPipeline Get a singleton thumb.Generator instance for chained thumbnail generation.
|
|
ThumbPipeline() thumb.Generator
|
|
// ThumbQueue Get a singleton queue.Queue instance for thumbnail generation.
|
|
ThumbQueue(ctx context.Context) queue.Queue
|
|
// EntityRecycleQueue Get a singleton queue.Queue instance for entity recycle.
|
|
EntityRecycleQueue(ctx context.Context) queue.Queue
|
|
// MimeDetector Get a singleton fs.MimeDetector instance for MIME type detection.
|
|
MimeDetector(ctx context.Context) mime.MimeDetector
|
|
// CredManager Get a singleton credmanager.CredManager instance for credential management.
|
|
CredManager() credmanager.CredManager
|
|
// IoIntenseQueue Get a singleton queue.Queue instance for IO intense tasks.
|
|
IoIntenseQueue(ctx context.Context) queue.Queue
|
|
// RemoteDownloadQueue Get a singleton queue.Queue instance for remote download tasks.
|
|
RemoteDownloadQueue(ctx context.Context) queue.Queue
|
|
// NodePool Get a singleton cluster.NodePool instance for node pool management.
|
|
NodePool(ctx context.Context) (cluster.NodePool, error)
|
|
// TaskRegistry Get a singleton queue.TaskRegistry instance for task registration.
|
|
TaskRegistry() queue.TaskRegistry
|
|
// WebAuthn Get a singleton webauthn.WebAuthn instance for WebAuthn authentication.
|
|
WebAuthn(ctx context.Context) (*webauthn.WebAuthn, error)
|
|
// UAParser Get a singleton uaparser.Parser instance for user agent parsing.
|
|
UAParser() *uaparser.Parser
|
|
}
|
|
|
|
type dependency struct {
|
|
configProvider conf.ConfigProvider
|
|
logger logging.Logger
|
|
statics iofs.FS
|
|
serverStaticFS static.ServeFileSystem
|
|
dbClient *ent.Client
|
|
rawEntClient *ent.Client
|
|
kv cache.Driver
|
|
navigatorStateKv cache.Driver
|
|
settingClient inventory.SettingClient
|
|
fileClient inventory.FileClient
|
|
shareClient inventory.ShareClient
|
|
settingProvider setting.Provider
|
|
userClient inventory.UserClient
|
|
groupClient inventory.GroupClient
|
|
storagePolicyClient inventory.StoragePolicyClient
|
|
taskClient inventory.TaskClient
|
|
nodeClient inventory.NodeClient
|
|
davAccountClient inventory.DavAccountClient
|
|
directLinkClient inventory.DirectLinkClient
|
|
emailClient email.Driver
|
|
generalAuth auth.Auth
|
|
hashidEncoder hashid.Encoder
|
|
tokenAuth auth.TokenAuth
|
|
lockSystem lock.LockSystem
|
|
requestClient request.Client
|
|
ioIntenseQueue queue.Queue
|
|
thumbQueue queue.Queue
|
|
mediaMetaQueue queue.Queue
|
|
entityRecycleQueue queue.Queue
|
|
slaveQueue queue.Queue
|
|
remoteDownloadQueue queue.Queue
|
|
ioIntenseQueueTask queue.Task
|
|
mediaMeta mediameta.Extractor
|
|
thumbPipeline thumb.Generator
|
|
mimeDetector mime.MimeDetector
|
|
credManager credmanager.CredManager
|
|
nodePool cluster.NodePool
|
|
taskRegistry queue.TaskRegistry
|
|
webauthn *webauthn.WebAuthn
|
|
parser *uaparser.Parser
|
|
cron *cron.Cron
|
|
|
|
configPath string
|
|
isPro bool
|
|
requiredDbVersion string
|
|
licenseKey string
|
|
|
|
// Protects inner deps that can be reloaded at runtime.
|
|
mu sync.Mutex
|
|
}
|
|
|
|
// NewDependency creates a new Dep instance for construct dependencies.
|
|
func NewDependency(opts ...Option) Dep {
|
|
d := &dependency{}
|
|
for _, o := range opts {
|
|
o.apply(d)
|
|
}
|
|
|
|
return d
|
|
}
|
|
|
|
// FromContext retrieves a Dep instance from context.
|
|
func FromContext(ctx context.Context) Dep {
|
|
return ctx.Value(DepCtx{}).(Dep)
|
|
}
|
|
|
|
func (d *dependency) RequestClient(opts ...request.Option) request.Client {
|
|
if d.requestClient != nil {
|
|
return d.requestClient
|
|
}
|
|
|
|
return request.NewClient(d.ConfigProvider(), opts...)
|
|
}
|
|
|
|
func (d *dependency) WebAuthn(ctx context.Context) (*webauthn.WebAuthn, error) {
|
|
if d.webauthn != nil {
|
|
return d.webauthn, nil
|
|
}
|
|
|
|
settings := d.SettingProvider()
|
|
siteBasic := settings.SiteBasic(ctx)
|
|
wConfig := &webauthn.Config{
|
|
RPDisplayName: siteBasic.Name,
|
|
RPID: settings.SiteURL(ctx).Hostname(),
|
|
RPOrigins: lo.Map(settings.AllSiteURLs(ctx), func(item *url.URL, index int) string {
|
|
item.Path = ""
|
|
return item.String()
|
|
}), // The origin URLs allowed for WebAuthn requests
|
|
}
|
|
|
|
return webauthn.New(wConfig)
|
|
}
|
|
|
|
func (d *dependency) UAParser() *uaparser.Parser {
|
|
if d.parser != nil {
|
|
return d.parser
|
|
}
|
|
|
|
d.parser = uaparser.NewFromSaved()
|
|
return d.parser
|
|
}
|
|
|
|
func (d *dependency) ConfigProvider() conf.ConfigProvider {
|
|
if d.configProvider != nil {
|
|
return d.configProvider
|
|
}
|
|
|
|
if d.configPath == "" {
|
|
d.panicError(ErrorConfigPathNotSet)
|
|
}
|
|
|
|
var err error
|
|
d.configProvider, err = conf.NewIniConfigProvider(d.configPath, logging.NewConsoleLogger(logging.LevelInformational))
|
|
if err != nil {
|
|
d.panicError(err)
|
|
}
|
|
|
|
return d.configProvider
|
|
}
|
|
|
|
func (d *dependency) Logger() logging.Logger {
|
|
if d.logger != nil {
|
|
return d.logger
|
|
}
|
|
|
|
config := d.ConfigProvider()
|
|
logLevel := logging.LogLevel(config.System().LogLevel)
|
|
if config.System().Debug {
|
|
logLevel = logging.LevelDebug
|
|
}
|
|
|
|
d.logger = logging.NewConsoleLogger(logLevel)
|
|
d.logger.Info("Logger initialized with LogLevel=%q.", logLevel)
|
|
return d.logger
|
|
}
|
|
|
|
func (d *dependency) Statics() iofs.FS {
|
|
if d.statics != nil {
|
|
return d.statics
|
|
}
|
|
|
|
d.statics = statics.NewStaticFS(d.Logger())
|
|
return d.statics
|
|
}
|
|
|
|
func (d *dependency) ServerStaticFS() static.ServeFileSystem {
|
|
if d.serverStaticFS != nil {
|
|
return d.serverStaticFS
|
|
}
|
|
|
|
sfs, err := statics.NewServerStaticFS(d.Logger(), d.Statics(), d.isPro)
|
|
if err != nil {
|
|
d.panicError(err)
|
|
}
|
|
|
|
d.serverStaticFS = sfs
|
|
return d.serverStaticFS
|
|
}
|
|
|
|
func (d *dependency) DBClient() *ent.Client {
|
|
if d.dbClient != nil {
|
|
return d.dbClient
|
|
}
|
|
|
|
if d.rawEntClient == nil {
|
|
client, err := inventory.NewRawEntClient(d.Logger(), d.ConfigProvider())
|
|
if err != nil {
|
|
d.panicError(err)
|
|
}
|
|
|
|
d.rawEntClient = client
|
|
}
|
|
|
|
proSuffix := ""
|
|
if d.isPro {
|
|
proSuffix = "-pro"
|
|
}
|
|
|
|
client, err := inventory.InitializeDBClient(d.Logger(), d.rawEntClient, d.KV(), d.requiredDbVersion+proSuffix)
|
|
if err != nil {
|
|
d.panicError(err)
|
|
}
|
|
|
|
d.dbClient = client
|
|
return d.dbClient
|
|
}
|
|
|
|
func (d *dependency) KV() cache.Driver {
|
|
if d.kv != nil {
|
|
return d.kv
|
|
}
|
|
|
|
config := d.ConfigProvider().Redis()
|
|
if config.Server != "" {
|
|
d.kv = cache.NewRedisStore(
|
|
d.Logger(),
|
|
10,
|
|
config,
|
|
)
|
|
} else {
|
|
d.kv = cache.NewMemoStore(util.DataPath(cache.DefaultCacheFile), d.Logger())
|
|
}
|
|
|
|
return d.kv
|
|
}
|
|
|
|
func (d *dependency) NavigatorStateKV() cache.Driver {
|
|
if d.navigatorStateKv != nil {
|
|
return d.navigatorStateKv
|
|
}
|
|
d.navigatorStateKv = cache.NewMemoStore("", d.Logger())
|
|
return d.navigatorStateKv
|
|
}
|
|
|
|
func (d *dependency) SettingClient() inventory.SettingClient {
|
|
if d.settingClient != nil {
|
|
return d.settingClient
|
|
}
|
|
|
|
d.settingClient = inventory.NewSettingClient(d.DBClient(), d.KV())
|
|
return d.settingClient
|
|
}
|
|
|
|
func (d *dependency) SettingProvider() setting.Provider {
|
|
if d.settingProvider != nil {
|
|
return d.settingProvider
|
|
}
|
|
|
|
if d.ConfigProvider().System().Mode == conf.MasterMode {
|
|
// For master mode, setting value will be retrieved in order:
|
|
// Env overwrite -> KV Store -> DB Setting Store
|
|
d.settingProvider = setting.NewProvider(
|
|
setting.NewEnvOverrideStore(
|
|
setting.NewKvSettingStore(d.KV(),
|
|
setting.NewDbSettingStore(d.SettingClient(), nil),
|
|
),
|
|
d.Logger(),
|
|
),
|
|
)
|
|
} else {
|
|
// For slave mode, setting value will be retrieved in order:
|
|
// Env overwrite -> Config file overwrites -> Setting defaults in DB schema
|
|
d.settingProvider = setting.NewProvider(
|
|
setting.NewEnvOverrideStore(
|
|
setting.NewConfSettingStore(d.ConfigProvider(),
|
|
setting.NewDbDefaultStore(nil),
|
|
),
|
|
d.Logger(),
|
|
),
|
|
)
|
|
}
|
|
|
|
return d.settingProvider
|
|
}
|
|
|
|
func (d *dependency) UserClient() inventory.UserClient {
|
|
if d.userClient != nil {
|
|
return d.userClient
|
|
}
|
|
|
|
return inventory.NewUserClient(d.DBClient())
|
|
}
|
|
|
|
func (d *dependency) GroupClient() inventory.GroupClient {
|
|
if d.groupClient != nil {
|
|
return d.groupClient
|
|
}
|
|
|
|
return inventory.NewGroupClient(d.DBClient(), d.ConfigProvider().Database().Type, d.KV())
|
|
}
|
|
|
|
func (d *dependency) NodeClient() inventory.NodeClient {
|
|
if d.nodeClient != nil {
|
|
return d.nodeClient
|
|
}
|
|
|
|
return inventory.NewNodeClient(d.DBClient())
|
|
}
|
|
|
|
func (d *dependency) NodePool(ctx context.Context) (cluster.NodePool, error) {
|
|
reload, _ := ctx.Value(ReloadCtx{}).(bool)
|
|
if d.nodePool != nil && !reload {
|
|
return d.nodePool, nil
|
|
}
|
|
|
|
if d.ConfigProvider().System().Mode == conf.MasterMode {
|
|
np, err := cluster.NewNodePool(ctx, d.Logger(), d.ConfigProvider(), d.SettingProvider(), d.NodeClient())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
d.nodePool = np
|
|
} else {
|
|
d.nodePool = cluster.NewSlaveDummyNodePool(ctx, d.ConfigProvider(), d.SettingProvider())
|
|
}
|
|
|
|
return d.nodePool, nil
|
|
}
|
|
|
|
func (d *dependency) EmailClient(ctx context.Context) email.Driver {
|
|
d.mu.Lock()
|
|
defer d.mu.Unlock()
|
|
|
|
if reload, _ := ctx.Value(ReloadCtx{}).(bool); reload || d.emailClient == nil {
|
|
if d.emailClient != nil {
|
|
d.emailClient.Close()
|
|
}
|
|
d.emailClient = email.NewSMTPPool(d.SettingProvider(), d.Logger())
|
|
}
|
|
|
|
return d.emailClient
|
|
}
|
|
|
|
func (d *dependency) MimeDetector(ctx context.Context) mime.MimeDetector {
|
|
d.mu.Lock()
|
|
defer d.mu.Unlock()
|
|
|
|
_, reload := ctx.Value(ReloadCtx{}).(bool)
|
|
if d.mimeDetector != nil && !reload {
|
|
return d.mimeDetector
|
|
}
|
|
|
|
d.mimeDetector = mime.NewMimeDetector(ctx, d.SettingProvider(), d.Logger())
|
|
return d.mimeDetector
|
|
}
|
|
|
|
func (d *dependency) MediaMetaExtractor(ctx context.Context) mediameta.Extractor {
|
|
d.mu.Lock()
|
|
defer d.mu.Unlock()
|
|
|
|
_, reload := ctx.Value(ReloadCtx{}).(bool)
|
|
if d.mediaMeta != nil && !reload {
|
|
return d.mediaMeta
|
|
}
|
|
|
|
d.mediaMeta = mediameta.NewExtractorManager(ctx, d.SettingProvider(), d.Logger())
|
|
return d.mediaMeta
|
|
}
|
|
|
|
func (d *dependency) ThumbQueue(ctx context.Context) queue.Queue {
|
|
d.mu.Lock()
|
|
defer d.mu.Unlock()
|
|
|
|
_, reload := ctx.Value(ReloadCtx{}).(bool)
|
|
if d.thumbQueue != nil && !reload {
|
|
return d.thumbQueue
|
|
}
|
|
|
|
if d.thumbQueue != nil {
|
|
d.thumbQueue.Shutdown()
|
|
}
|
|
|
|
settings := d.SettingProvider()
|
|
queueSetting := settings.Queue(context.Background(), setting.QueueTypeThumb)
|
|
var (
|
|
t inventory.TaskClient
|
|
)
|
|
if d.ConfigProvider().System().Mode == conf.MasterMode {
|
|
t = d.TaskClient()
|
|
}
|
|
|
|
d.thumbQueue = queue.New(d.Logger(), t, nil, d,
|
|
queue.WithBackoffFactor(queueSetting.BackoffFactor),
|
|
queue.WithMaxRetry(queueSetting.MaxRetry),
|
|
queue.WithBackoffMaxDuration(queueSetting.BackoffMaxDuration),
|
|
queue.WithRetryDelay(queueSetting.RetryDelay),
|
|
queue.WithWorkerCount(queueSetting.WorkerNum),
|
|
queue.WithName("ThumbQueue"),
|
|
queue.WithMaxTaskExecution(queueSetting.MaxExecution),
|
|
)
|
|
return d.thumbQueue
|
|
}
|
|
|
|
func (d *dependency) MediaMetaQueue(ctx context.Context) queue.Queue {
|
|
d.mu.Lock()
|
|
defer d.mu.Unlock()
|
|
|
|
_, reload := ctx.Value(ReloadCtx{}).(bool)
|
|
if d.mediaMetaQueue != nil && !reload {
|
|
return d.mediaMetaQueue
|
|
}
|
|
|
|
if d.mediaMetaQueue != nil {
|
|
d.mediaMetaQueue.Shutdown()
|
|
}
|
|
|
|
settings := d.SettingProvider()
|
|
queueSetting := settings.Queue(context.Background(), setting.QueueTypeMediaMeta)
|
|
|
|
d.mediaMetaQueue = queue.New(d.Logger(), d.TaskClient(), nil, d,
|
|
queue.WithBackoffFactor(queueSetting.BackoffFactor),
|
|
queue.WithMaxRetry(queueSetting.MaxRetry),
|
|
queue.WithBackoffMaxDuration(queueSetting.BackoffMaxDuration),
|
|
queue.WithRetryDelay(queueSetting.RetryDelay),
|
|
queue.WithWorkerCount(queueSetting.WorkerNum),
|
|
queue.WithName("MediaMetadataQueue"),
|
|
queue.WithMaxTaskExecution(queueSetting.MaxExecution),
|
|
queue.WithResumeTaskType(queue.MediaMetaTaskType),
|
|
)
|
|
return d.mediaMetaQueue
|
|
}
|
|
|
|
func (d *dependency) IoIntenseQueue(ctx context.Context) queue.Queue {
|
|
d.mu.Lock()
|
|
defer d.mu.Unlock()
|
|
|
|
_, reload := ctx.Value(ReloadCtx{}).(bool)
|
|
if d.ioIntenseQueue != nil && !reload {
|
|
return d.ioIntenseQueue
|
|
}
|
|
|
|
if d.ioIntenseQueue != nil {
|
|
d.ioIntenseQueue.Shutdown()
|
|
}
|
|
|
|
settings := d.SettingProvider()
|
|
queueSetting := settings.Queue(context.Background(), setting.QueueTypeIOIntense)
|
|
|
|
d.ioIntenseQueue = queue.New(d.Logger(), d.TaskClient(), d.TaskRegistry(), d,
|
|
queue.WithBackoffFactor(queueSetting.BackoffFactor),
|
|
queue.WithMaxRetry(queueSetting.MaxRetry),
|
|
queue.WithBackoffMaxDuration(queueSetting.BackoffMaxDuration),
|
|
queue.WithRetryDelay(queueSetting.RetryDelay),
|
|
queue.WithWorkerCount(queueSetting.WorkerNum),
|
|
queue.WithName("IoIntenseQueue"),
|
|
queue.WithMaxTaskExecution(queueSetting.MaxExecution),
|
|
queue.WithResumeTaskType(queue.CreateArchiveTaskType, queue.ExtractArchiveTaskType, queue.RelocateTaskType, queue.ImportTaskType),
|
|
queue.WithTaskPullInterval(10*time.Second),
|
|
)
|
|
return d.ioIntenseQueue
|
|
}
|
|
|
|
func (d *dependency) RemoteDownloadQueue(ctx context.Context) queue.Queue {
|
|
d.mu.Lock()
|
|
defer d.mu.Unlock()
|
|
|
|
_, reload := ctx.Value(ReloadCtx{}).(bool)
|
|
if d.remoteDownloadQueue != nil && !reload {
|
|
return d.remoteDownloadQueue
|
|
}
|
|
|
|
if d.remoteDownloadQueue != nil {
|
|
d.remoteDownloadQueue.Shutdown()
|
|
}
|
|
|
|
settings := d.SettingProvider()
|
|
queueSetting := settings.Queue(context.Background(), setting.QueueTypeRemoteDownload)
|
|
|
|
d.remoteDownloadQueue = queue.New(d.Logger(), d.TaskClient(), d.TaskRegistry(), d,
|
|
queue.WithBackoffFactor(queueSetting.BackoffFactor),
|
|
queue.WithMaxRetry(queueSetting.MaxRetry),
|
|
queue.WithBackoffMaxDuration(queueSetting.BackoffMaxDuration),
|
|
queue.WithRetryDelay(queueSetting.RetryDelay),
|
|
queue.WithWorkerCount(queueSetting.WorkerNum),
|
|
queue.WithName("RemoteDownloadQueue"),
|
|
queue.WithMaxTaskExecution(queueSetting.MaxExecution),
|
|
queue.WithResumeTaskType(queue.RemoteDownloadTaskType),
|
|
queue.WithTaskPullInterval(10*time.Second),
|
|
)
|
|
return d.remoteDownloadQueue
|
|
}
|
|
|
|
func (d *dependency) EntityRecycleQueue(ctx context.Context) queue.Queue {
|
|
d.mu.Lock()
|
|
defer d.mu.Unlock()
|
|
|
|
_, reload := ctx.Value(ReloadCtx{}).(bool)
|
|
if d.entityRecycleQueue != nil && !reload {
|
|
return d.entityRecycleQueue
|
|
}
|
|
|
|
if d.entityRecycleQueue != nil {
|
|
d.entityRecycleQueue.Shutdown()
|
|
}
|
|
|
|
settings := d.SettingProvider()
|
|
queueSetting := settings.Queue(context.Background(), setting.QueueTypeEntityRecycle)
|
|
|
|
d.entityRecycleQueue = queue.New(d.Logger(), d.TaskClient(), nil, d,
|
|
queue.WithBackoffFactor(queueSetting.BackoffFactor),
|
|
queue.WithMaxRetry(queueSetting.MaxRetry),
|
|
queue.WithBackoffMaxDuration(queueSetting.BackoffMaxDuration),
|
|
queue.WithRetryDelay(queueSetting.RetryDelay),
|
|
queue.WithWorkerCount(queueSetting.WorkerNum),
|
|
queue.WithName("EntityRecycleQueue"),
|
|
queue.WithMaxTaskExecution(queueSetting.MaxExecution),
|
|
queue.WithResumeTaskType(queue.EntityRecycleRoutineTaskType, queue.ExplicitEntityRecycleTaskType, queue.UploadSentinelCheckTaskType),
|
|
queue.WithTaskPullInterval(10*time.Second),
|
|
)
|
|
return d.entityRecycleQueue
|
|
}
|
|
|
|
func (d *dependency) SlaveQueue(ctx context.Context) queue.Queue {
|
|
d.mu.Lock()
|
|
defer d.mu.Unlock()
|
|
|
|
_, reload := ctx.Value(ReloadCtx{}).(bool)
|
|
if d.slaveQueue != nil && !reload {
|
|
return d.slaveQueue
|
|
}
|
|
|
|
if d.slaveQueue != nil {
|
|
d.slaveQueue.Shutdown()
|
|
}
|
|
|
|
settings := d.SettingProvider()
|
|
queueSetting := settings.Queue(context.Background(), setting.QueueTypeSlave)
|
|
|
|
d.slaveQueue = queue.New(d.Logger(), nil, nil, d,
|
|
queue.WithBackoffFactor(queueSetting.BackoffFactor),
|
|
queue.WithMaxRetry(queueSetting.MaxRetry),
|
|
queue.WithBackoffMaxDuration(queueSetting.BackoffMaxDuration),
|
|
queue.WithRetryDelay(queueSetting.RetryDelay),
|
|
queue.WithWorkerCount(queueSetting.WorkerNum),
|
|
queue.WithName("SlaveQueue"),
|
|
queue.WithMaxTaskExecution(queueSetting.MaxExecution),
|
|
)
|
|
return d.slaveQueue
|
|
}
|
|
|
|
func (d *dependency) GeneralAuth() auth.Auth {
|
|
if d.generalAuth != nil {
|
|
return d.generalAuth
|
|
}
|
|
|
|
var secretKey string
|
|
if d.ConfigProvider().System().Mode == conf.MasterMode {
|
|
secretKey = d.SettingProvider().SecretKey(context.Background())
|
|
} else {
|
|
secretKey = d.ConfigProvider().Slave().Secret
|
|
if secretKey == "" {
|
|
d.panicError(errors.New("SlaveSecret is not set, please specify it in config file"))
|
|
}
|
|
}
|
|
|
|
d.generalAuth = auth.HMACAuth{
|
|
SecretKey: []byte(secretKey),
|
|
}
|
|
|
|
return d.generalAuth
|
|
}
|
|
|
|
func (d *dependency) FileClient() inventory.FileClient {
|
|
if d.fileClient != nil {
|
|
return d.fileClient
|
|
}
|
|
|
|
return inventory.NewFileClient(d.DBClient(), d.ConfigProvider().Database().Type, d.HashIDEncoder())
|
|
}
|
|
|
|
func (d *dependency) ShareClient() inventory.ShareClient {
|
|
if d.shareClient != nil {
|
|
return d.shareClient
|
|
}
|
|
|
|
return inventory.NewShareClient(d.DBClient(), d.ConfigProvider().Database().Type, d.HashIDEncoder())
|
|
}
|
|
|
|
func (d *dependency) TaskClient() inventory.TaskClient {
|
|
if d.taskClient != nil {
|
|
return d.taskClient
|
|
}
|
|
|
|
return inventory.NewTaskClient(d.DBClient(), d.ConfigProvider().Database().Type, d.HashIDEncoder())
|
|
}
|
|
|
|
func (d *dependency) DavAccountClient() inventory.DavAccountClient {
|
|
if d.davAccountClient != nil {
|
|
return d.davAccountClient
|
|
}
|
|
|
|
return inventory.NewDavAccountClient(d.DBClient(), d.ConfigProvider().Database().Type, d.HashIDEncoder())
|
|
}
|
|
|
|
func (d *dependency) DirectLinkClient() inventory.DirectLinkClient {
|
|
if d.directLinkClient != nil {
|
|
return d.directLinkClient
|
|
}
|
|
|
|
return inventory.NewDirectLinkClient(d.DBClient(), d.ConfigProvider().Database().Type, d.HashIDEncoder())
|
|
}
|
|
|
|
func (d *dependency) HashIDEncoder() hashid.Encoder {
|
|
if d.hashidEncoder != nil {
|
|
return d.hashidEncoder
|
|
}
|
|
|
|
encoder, err := hashid.New(d.SettingProvider().HashIDSalt(context.Background()))
|
|
if err != nil {
|
|
d.panicError(err)
|
|
}
|
|
|
|
d.hashidEncoder = encoder
|
|
return d.hashidEncoder
|
|
}
|
|
|
|
func (d *dependency) CredManager() credmanager.CredManager {
|
|
if d.credManager != nil {
|
|
return d.credManager
|
|
}
|
|
|
|
if d.ConfigProvider().System().Mode == conf.MasterMode {
|
|
d.credManager = credmanager.New(d.KV())
|
|
} else {
|
|
d.credManager = credmanager.NewSlaveManager(d.KV(), d.ConfigProvider())
|
|
}
|
|
return d.credManager
|
|
}
|
|
|
|
func (d *dependency) TokenAuth() auth.TokenAuth {
|
|
if d.tokenAuth != nil {
|
|
return d.tokenAuth
|
|
}
|
|
|
|
d.tokenAuth = auth.NewTokenAuth(d.HashIDEncoder(), d.SettingProvider(),
|
|
[]byte(d.SettingProvider().SecretKey(context.Background())), d.UserClient(), d.Logger(), d.KV())
|
|
return d.tokenAuth
|
|
}
|
|
|
|
func (d *dependency) LockSystem() lock.LockSystem {
|
|
if d.lockSystem != nil {
|
|
return d.lockSystem
|
|
}
|
|
|
|
d.lockSystem = lock.NewMemLS(d.HashIDEncoder(), d.Logger())
|
|
return d.lockSystem
|
|
}
|
|
|
|
func (d *dependency) StoragePolicyClient() inventory.StoragePolicyClient {
|
|
if d.storagePolicyClient != nil {
|
|
return d.storagePolicyClient
|
|
}
|
|
|
|
return inventory.NewStoragePolicyClient(d.DBClient(), d.KV())
|
|
}
|
|
|
|
func (d *dependency) ThumbPipeline() thumb.Generator {
|
|
if d.thumbPipeline != nil {
|
|
return d.thumbPipeline
|
|
}
|
|
|
|
d.thumbPipeline = thumb.NewPipeline(d.SettingProvider(), d.Logger())
|
|
return d.thumbPipeline
|
|
}
|
|
|
|
func (d *dependency) TaskRegistry() queue.TaskRegistry {
|
|
if d.taskRegistry != nil {
|
|
return d.taskRegistry
|
|
}
|
|
|
|
d.taskRegistry = queue.NewTaskRegistry()
|
|
return d.taskRegistry
|
|
}
|
|
|
|
func (d *dependency) Shutdown(ctx context.Context) error {
|
|
d.mu.Lock()
|
|
|
|
if d.emailClient != nil {
|
|
d.emailClient.Close()
|
|
}
|
|
|
|
wg := sync.WaitGroup{}
|
|
|
|
if d.mediaMetaQueue != nil {
|
|
wg.Add(1)
|
|
go func() {
|
|
d.mediaMetaQueue.Shutdown()
|
|
defer wg.Done()
|
|
}()
|
|
}
|
|
|
|
if d.thumbQueue != nil {
|
|
wg.Add(1)
|
|
go func() {
|
|
d.thumbQueue.Shutdown()
|
|
defer wg.Done()
|
|
}()
|
|
}
|
|
|
|
if d.ioIntenseQueue != nil {
|
|
wg.Add(1)
|
|
go func() {
|
|
d.ioIntenseQueue.Shutdown()
|
|
defer wg.Done()
|
|
}()
|
|
}
|
|
|
|
if d.entityRecycleQueue != nil {
|
|
wg.Add(1)
|
|
go func() {
|
|
d.entityRecycleQueue.Shutdown()
|
|
defer wg.Done()
|
|
}()
|
|
}
|
|
|
|
if d.slaveQueue != nil {
|
|
wg.Add(1)
|
|
go func() {
|
|
d.slaveQueue.Shutdown()
|
|
defer wg.Done()
|
|
}()
|
|
}
|
|
|
|
if d.remoteDownloadQueue != nil {
|
|
wg.Add(1)
|
|
go func() {
|
|
d.remoteDownloadQueue.Shutdown()
|
|
defer wg.Done()
|
|
}()
|
|
}
|
|
|
|
d.mu.Unlock()
|
|
wg.Wait()
|
|
|
|
return nil
|
|
}
|
|
|
|
func (d *dependency) panicError(err error) {
|
|
if d.logger != nil {
|
|
d.logger.Panic("Fatal error in dependency initialization: %s", err)
|
|
}
|
|
|
|
panic(err)
|
|
}
|
|
|
|
func (d *dependency) ForkWithLogger(ctx context.Context, l logging.Logger) context.Context {
|
|
dep := &dependencyCorrelated{
|
|
l: l,
|
|
dependency: d,
|
|
}
|
|
return context.WithValue(ctx, DepCtx{}, dep)
|
|
}
|
|
|
|
type dependencyCorrelated struct {
|
|
l logging.Logger
|
|
*dependency
|
|
}
|
|
|
|
func (d *dependencyCorrelated) Logger() logging.Logger {
|
|
return d.l
|
|
}
|