Cloudreve/application/dependency/dependency.go

876 lines
25 KiB
Go

package dependency
import (
"context"
"errors"
iofs "io/fs"
"net/url"
"sync"
"time"
"github.com/cloudreve/Cloudreve/v4/application/statics"
"github.com/cloudreve/Cloudreve/v4/ent"
"github.com/cloudreve/Cloudreve/v4/inventory"
"github.com/cloudreve/Cloudreve/v4/pkg/auth"
"github.com/cloudreve/Cloudreve/v4/pkg/cache"
"github.com/cloudreve/Cloudreve/v4/pkg/cluster"
"github.com/cloudreve/Cloudreve/v4/pkg/conf"
"github.com/cloudreve/Cloudreve/v4/pkg/credmanager"
"github.com/cloudreve/Cloudreve/v4/pkg/email"
"github.com/cloudreve/Cloudreve/v4/pkg/filemanager/fs/mime"
"github.com/cloudreve/Cloudreve/v4/pkg/filemanager/lock"
"github.com/cloudreve/Cloudreve/v4/pkg/hashid"
"github.com/cloudreve/Cloudreve/v4/pkg/logging"
"github.com/cloudreve/Cloudreve/v4/pkg/mediameta"
"github.com/cloudreve/Cloudreve/v4/pkg/queue"
"github.com/cloudreve/Cloudreve/v4/pkg/request"
"github.com/cloudreve/Cloudreve/v4/pkg/setting"
"github.com/cloudreve/Cloudreve/v4/pkg/thumb"
"github.com/cloudreve/Cloudreve/v4/pkg/util"
"github.com/gin-contrib/static"
"github.com/go-webauthn/webauthn/webauthn"
"github.com/robfig/cron/v3"
"github.com/samber/lo"
"github.com/ua-parser/uap-go/uaparser"
)
// ErrorConfigPathNotSet is raised when a config provider has to be built but
// no config file path has been supplied.
var ErrorConfigPathNotSet = errors.New("config path not set")
// DepCtx is the context key under which the dependency manager is stored.
type DepCtx struct{}

// ReloadCtx is the context key whose boolean value asks a reloadable
// dependency to be rebuilt instead of reusing the existing instance.
type ReloadCtx struct{}
// Dep manages all dependencies of the server application. The default implementation is not
// concurrent safe, so all inner deps should be initialized before any goroutine starts.
type Dep interface {
// ConfigProvider Get a singleton conf.ConfigProvider instance.
ConfigProvider() conf.ConfigProvider
// Logger Get a singleton logging.Logger instance.
Logger() logging.Logger
// Statics Get a singleton fs.FS instance for embedded static resources.
Statics() iofs.FS
// ServerStaticFS Get a singleton static.ServeFileSystem instance for serving static resources.
ServerStaticFS() static.ServeFileSystem
// DBClient Get a singleton ent.Client instance for database access.
DBClient() *ent.Client
// KV Get a singleton cache.Driver instance for KV store.
KV() cache.Driver
// NavigatorStateKV Get a singleton cache.Driver instance for navigator state store. It forces use in-memory
// map instead of Redis to get better performance for complex nested linked list.
NavigatorStateKV() cache.Driver
// SettingClient Get a singleton inventory.SettingClient instance for access DB setting store.
SettingClient() inventory.SettingClient
// SettingProvider Get a singleton setting.Provider instance for access setting store in strong type.
SettingProvider() setting.Provider
// UserClient Creates a new inventory.UserClient instance for access DB user store.
UserClient() inventory.UserClient
// GroupClient Creates a new inventory.GroupClient instance for access DB group store.
GroupClient() inventory.GroupClient
// EmailClient Get a singleton email.Driver instance for sending emails.
EmailClient(ctx context.Context) email.Driver
// GeneralAuth Get a singleton auth.Auth instance for general authentication.
GeneralAuth() auth.Auth
// Shutdown the dependencies gracefully.
Shutdown(ctx context.Context) error
// FileClient Creates a new inventory.FileClient instance for access DB file store.
FileClient() inventory.FileClient
// NodeClient Creates a new inventory.NodeClient instance for access DB node store.
NodeClient() inventory.NodeClient
// DavAccountClient Creates a new inventory.DavAccountClient instance for access DB dav account store.
DavAccountClient() inventory.DavAccountClient
// DirectLinkClient Creates a new inventory.DirectLinkClient instance for access DB direct link store.
DirectLinkClient() inventory.DirectLinkClient
// HashIDEncoder Get a singleton hashid.Encoder instance for encoding/decoding hashids.
HashIDEncoder() hashid.Encoder
// TokenAuth Get a singleton auth.TokenAuth instance for token authentication.
TokenAuth() auth.TokenAuth
// LockSystem Get a singleton lock.LockSystem instance for file lock management.
LockSystem() lock.LockSystem
// StoragePolicyClient Creates a new inventory.StoragePolicyClient instance for access DB storage policy store.
StoragePolicyClient() inventory.StoragePolicyClient
// RequestClient Creates a new request.Client instance for HTTP requests.
RequestClient(opts ...request.Option) request.Client
// ShareClient Creates a new inventory.ShareClient instance for access DB share store.
ShareClient() inventory.ShareClient
// TaskClient Creates a new inventory.TaskClient instance for access DB task store.
TaskClient() inventory.TaskClient
// ForkWithLogger wraps this dependency with a new correlated logger and returns a context derived
// from ctx that carries the wrapper under DepCtx, used as per-request dep.
ForkWithLogger(ctx context.Context, l logging.Logger) context.Context
// MediaMetaQueue Get a singleton queue.Queue instance for media metadata processing.
MediaMetaQueue(ctx context.Context) queue.Queue
// SlaveQueue Get a singleton queue.Queue instance for slave tasks.
SlaveQueue(ctx context.Context) queue.Queue
// MediaMetaExtractor Get a singleton mediameta.Extractor instance for media metadata extraction.
MediaMetaExtractor(ctx context.Context) mediameta.Extractor
// ThumbPipeline Get a singleton thumb.Generator instance for chained thumbnail generation.
ThumbPipeline() thumb.Generator
// ThumbQueue Get a singleton queue.Queue instance for thumbnail generation.
ThumbQueue(ctx context.Context) queue.Queue
// EntityRecycleQueue Get a singleton queue.Queue instance for entity recycle.
EntityRecycleQueue(ctx context.Context) queue.Queue
// MimeDetector Get a singleton fs.MimeDetector instance for MIME type detection.
MimeDetector(ctx context.Context) mime.MimeDetector
// CredManager Get a singleton credmanager.CredManager instance for credential management.
CredManager() credmanager.CredManager
// IoIntenseQueue Get a singleton queue.Queue instance for IO intense tasks.
IoIntenseQueue(ctx context.Context) queue.Queue
// RemoteDownloadQueue Get a singleton queue.Queue instance for remote download tasks.
RemoteDownloadQueue(ctx context.Context) queue.Queue
// NodePool Get a singleton cluster.NodePool instance for node pool management.
NodePool(ctx context.Context) (cluster.NodePool, error)
// TaskRegistry Get a singleton queue.TaskRegistry instance for task registration.
TaskRegistry() queue.TaskRegistry
// WebAuthn Get a webauthn.WebAuthn instance for WebAuthn authentication.
WebAuthn(ctx context.Context) (*webauthn.WebAuthn, error)
// UAParser Get a singleton uaparser.Parser instance for user agent parsing.
UAParser() *uaparser.Parser
}
// dependency is the default Dep implementation. Most fields hold the instance
// returned by the accessor method of the matching name: the accessor returns
// the field when it is non-nil and otherwise builds a value on demand.
type dependency struct {
// Core infrastructure.
configProvider conf.ConfigProvider
logger logging.Logger
statics iofs.FS
serverStaticFS static.ServeFileSystem
// dbClient is the initialized client built on top of rawEntClient.
dbClient *ent.Client
rawEntClient *ent.Client
kv cache.Driver
navigatorStateKv cache.Driver
// Inventory clients and setting access. Several of these are only read by
// their accessors in this file — presumably preset through an Option.
settingClient inventory.SettingClient
fileClient inventory.FileClient
shareClient inventory.ShareClient
settingProvider setting.Provider
userClient inventory.UserClient
groupClient inventory.GroupClient
storagePolicyClient inventory.StoragePolicyClient
taskClient inventory.TaskClient
nodeClient inventory.NodeClient
davAccountClient inventory.DavAccountClient
directLinkClient inventory.DirectLinkClient
// Services.
emailClient email.Driver
generalAuth auth.Auth
hashidEncoder hashid.Encoder
tokenAuth auth.TokenAuth
lockSystem lock.LockSystem
requestClient request.Client
// Task queues.
ioIntenseQueue queue.Queue
thumbQueue queue.Queue
mediaMetaQueue queue.Queue
entityRecycleQueue queue.Queue
slaveQueue queue.Queue
remoteDownloadQueue queue.Queue
ioIntenseQueueTask queue.Task
mediaMeta mediameta.Extractor
thumbPipeline thumb.Generator
mimeDetector mime.MimeDetector
credManager credmanager.CredManager
nodePool cluster.NodePool
taskRegistry queue.TaskRegistry
webauthn *webauthn.WebAuthn
parser *uaparser.Parser
cron *cron.Cron
// Construction-time parameters.
// configPath is the INI config file path used to build configProvider.
configPath string
// isPro selects the "-pro" suffix of the required DB version and is passed to the static FS builder.
isPro bool
requiredDbVersion string
licenseKey string
// Protects inner deps that can be reloaded at runtime.
mu sync.Mutex
}
// NewDependency builds a Dep backed by an empty dependency container and
// applies every supplied Option to it, in the order given.
func NewDependency(opts ...Option) Dep {
	dep := new(dependency)
	for i := range opts {
		opts[i].apply(dep)
	}

	return dep
}
// FromContext returns the Dep stored in ctx under the DepCtx key.
// It panics when ctx holds no value for that key or the value does not
// implement Dep, because the type assertion is unchecked.
func FromContext(ctx context.Context) Dep {
	stored := ctx.Value(DepCtx{})
	return stored.(Dep)
}
// RequestClient returns the preset request.Client when one is set on the
// dependency; in that case opts are ignored. Otherwise it builds a new client
// from the config provider and opts on every call.
func (d *dependency) RequestClient(opts ...request.Option) request.Client {
	if d.requestClient == nil {
		return request.NewClient(d.ConfigProvider(), opts...)
	}

	return d.requestClient
}
// WebAuthn returns the preset *webauthn.WebAuthn when one is set on the
// dependency. Otherwise it builds a new instance from the current settings on
// every call; the result is not stored on the dependency.
//
// The relying party display name is the site name, the RP ID is the hostname
// of settings.SiteURL, and the allowed origins are all site URLs with their
// path removed. Any error from webauthn.New is returned to the caller.
func (d *dependency) WebAuthn(ctx context.Context) (*webauthn.WebAuthn, error) {
	if d.webauthn != nil {
		return d.webauthn, nil
	}

	settings := d.SettingProvider()
	siteBasic := settings.SiteBasic(ctx)
	wConfig := &webauthn.Config{
		RPDisplayName: siteBasic.Name,
		RPID:          settings.SiteURL(ctx).Hostname(),
		RPOrigins: lo.Map(settings.AllSiteURLs(ctx), func(item *url.URL, index int) string {
			// Strip the path on a copy so the URL handed out by the setting
			// provider is not modified as a side effect.
			origin := *item
			origin.Path = ""
			return origin.String()
		}), // The origin URLs allowed for WebAuthn requests
	}

	return webauthn.New(wConfig)
}
// UAParser returns the user agent parser, creating it with
// uaparser.NewFromSaved on first use and reusing it afterwards.
func (d *dependency) UAParser() *uaparser.Parser {
	if d.parser == nil {
		d.parser = uaparser.NewFromSaved()
	}

	return d.parser
}
// ConfigProvider returns the config provider, loading it from the INI file at
// configPath on first use. It panics through panicError when configPath is
// empty or when the file cannot be loaded.
func (d *dependency) ConfigProvider() conf.ConfigProvider {
	if d.configProvider != nil {
		return d.configProvider
	}

	if d.configPath == "" {
		d.panicError(ErrorConfigPathNotSet)
	}

	// The regular logger depends on the config, so a standalone console logger is used while loading it.
	bootLogger := logging.NewConsoleLogger(logging.LevelInformational)
	provider, err := conf.NewIniConfigProvider(d.configPath, bootLogger)
	d.configProvider = provider
	if err != nil {
		d.panicError(err)
	}

	return d.configProvider
}
// Logger returns the console logger, creating it on first use. The level
// comes from the system config, and is forced to debug when Debug is enabled.
func (d *dependency) Logger() logging.Logger {
	if d.logger == nil {
		cfg := d.ConfigProvider()
		level := logging.LogLevel(cfg.System().LogLevel)
		if cfg.System().Debug {
			level = logging.LevelDebug
		}

		d.logger = logging.NewConsoleLogger(level)
		d.logger.Info("Logger initialized with LogLevel=%q.", level)
	}

	return d.logger
}
// Statics returns the static resource file system, creating it with
// statics.NewStaticFS on first use.
func (d *dependency) Statics() iofs.FS {
	if d.statics == nil {
		d.statics = statics.NewStaticFS(d.Logger())
	}

	return d.statics
}
// ServerStaticFS returns the file system used to serve static resources,
// building it from Statics on first use. It panics through panicError when
// construction fails.
func (d *dependency) ServerStaticFS() static.ServeFileSystem {
	if d.serverStaticFS == nil {
		served, err := statics.NewServerStaticFS(d.Logger(), d.Statics(), d.isPro)
		if err != nil {
			d.panicError(err)
		}

		d.serverStaticFS = served
	}

	return d.serverStaticFS
}
// DBClient returns the initialized database client, creating it on first use.
// A raw ent client is built first unless one is already set, then passed to
// inventory.InitializeDBClient together with the required DB version (with a
// "-pro" suffix when isPro is set). Any failure panics through panicError.
func (d *dependency) DBClient() *ent.Client {
	if d.dbClient != nil {
		return d.dbClient
	}

	if d.rawEntClient == nil {
		raw, err := inventory.NewRawEntClient(d.Logger(), d.ConfigProvider())
		if err != nil {
			d.panicError(err)
		}

		d.rawEntClient = raw
	}

	requiredVersion := d.requiredDbVersion
	if d.isPro {
		requiredVersion += "-pro"
	}

	initialized, err := inventory.InitializeDBClient(d.Logger(), d.rawEntClient, d.KV(), requiredVersion)
	if err != nil {
		d.panicError(err)
	}

	d.dbClient = initialized
	return d.dbClient
}
// KV returns the key-value store, creating it on first use. A Redis store is
// used when a Redis server is configured; otherwise a memory store is created
// with the default cache file under the data path.
func (d *dependency) KV() cache.Driver {
	if d.kv != nil {
		return d.kv
	}

	redisConf := d.ConfigProvider().Redis()
	if redisConf.Server == "" {
		d.kv = cache.NewMemoStore(util.DataPath(cache.DefaultCacheFile), d.Logger())
		return d.kv
	}

	d.kv = cache.NewRedisStore(d.Logger(), 10, redisConf)
	return d.kv
}
// NavigatorStateKV returns the store for navigator state, creating a memory
// store with an empty file path on first use.
func (d *dependency) NavigatorStateKV() cache.Driver {
	if d.navigatorStateKv == nil {
		d.navigatorStateKv = cache.NewMemoStore("", d.Logger())
	}

	return d.navigatorStateKv
}
// SettingClient returns the setting inventory client, creating it from the DB
// client and the KV store on first use.
func (d *dependency) SettingClient() inventory.SettingClient {
	if d.settingClient == nil {
		d.settingClient = inventory.NewSettingClient(d.DBClient(), d.KV())
	}

	return d.settingClient
}
// SettingProvider returns the typed setting provider, creating it on first
// use. The chain of stores it reads from depends on the system mode.
func (d *dependency) SettingProvider() setting.Provider {
	if d.settingProvider != nil {
		return d.settingProvider
	}

	if d.ConfigProvider().System().Mode != conf.MasterMode {
		// For slave mode, setting value will be retrieved in order:
		// Env overwrite -> Config file overwrites -> Setting defaults in DB schema
		cfg := d.ConfigProvider()
		defaults := setting.NewDbDefaultStore(nil)
		confStore := setting.NewConfSettingStore(cfg, defaults)
		logger := d.Logger()
		d.settingProvider = setting.NewProvider(setting.NewEnvOverrideStore(confStore, logger))
		return d.settingProvider
	}

	// For master mode, setting value will be retrieved in order:
	// Env overwrite -> KV Store -> DB Setting Store
	kv := d.KV()
	dbStore := setting.NewDbSettingStore(d.SettingClient(), nil)
	kvStore := setting.NewKvSettingStore(kv, dbStore)
	logger := d.Logger()
	d.settingProvider = setting.NewProvider(setting.NewEnvOverrideStore(kvStore, logger))
	return d.settingProvider
}
// UserClient returns the preset user client when one is set; otherwise it
// builds a new client on the DB client for every call.
func (d *dependency) UserClient() inventory.UserClient {
	if d.userClient == nil {
		return inventory.NewUserClient(d.DBClient())
	}

	return d.userClient
}
// GroupClient returns the preset group client when one is set; otherwise it
// builds a new client from the DB client, database type and KV store for
// every call.
func (d *dependency) GroupClient() inventory.GroupClient {
	if d.groupClient == nil {
		return inventory.NewGroupClient(d.DBClient(), d.ConfigProvider().Database().Type, d.KV())
	}

	return d.groupClient
}
// NodeClient returns the preset node client when one is set; otherwise it
// builds a new client on the DB client for every call.
func (d *dependency) NodeClient() inventory.NodeClient {
	if d.nodeClient == nil {
		return inventory.NewNodeClient(d.DBClient())
	}

	return d.nodeClient
}
// NodePool returns the node pool, building it on first use or when ctx carries
// ReloadCtx set to true. Master mode builds a real pool and returns any
// construction error without replacing the current pool; other modes use the
// slave dummy pool. This method does not take the dependency mutex.
func (d *dependency) NodePool(ctx context.Context) (cluster.NodePool, error) {
	forceReload, _ := ctx.Value(ReloadCtx{}).(bool)
	if !forceReload && d.nodePool != nil {
		return d.nodePool, nil
	}

	if d.ConfigProvider().System().Mode != conf.MasterMode {
		d.nodePool = cluster.NewSlaveDummyNodePool(ctx, d.ConfigProvider(), d.SettingProvider())
		return d.nodePool, nil
	}

	pool, err := cluster.NewNodePool(ctx, d.Logger(), d.ConfigProvider(), d.SettingProvider(), d.NodeClient())
	if err != nil {
		return nil, err
	}

	d.nodePool = pool
	return d.nodePool, nil
}
// EmailClient returns the email driver, building an SMTP pool on first use or
// when ctx carries ReloadCtx set to true. An existing driver is closed before
// it is replaced. The dependency mutex is held for the whole call.
func (d *dependency) EmailClient(ctx context.Context) email.Driver {
	d.mu.Lock()
	defer d.mu.Unlock()

	forceReload, _ := ctx.Value(ReloadCtx{}).(bool)
	if !forceReload && d.emailClient != nil {
		return d.emailClient
	}

	if d.emailClient != nil {
		d.emailClient.Close()
	}

	d.emailClient = email.NewSMTPPool(d.SettingProvider(), d.Logger())
	return d.emailClient
}
// MimeDetector returns the MIME detector, building it on first use or when ctx
// carries ReloadCtx set to true. The dependency mutex is held for the whole
// call.
func (d *dependency) MimeDetector(ctx context.Context) mime.MimeDetector {
	d.mu.Lock()
	defer d.mu.Unlock()

	// Read the stored boolean itself, not merely whether the key is present,
	// so that an explicit ReloadCtx=false does not trigger a rebuild.
	reload, _ := ctx.Value(ReloadCtx{}).(bool)
	if d.mimeDetector != nil && !reload {
		return d.mimeDetector
	}

	d.mimeDetector = mime.NewMimeDetector(ctx, d.SettingProvider(), d.Logger())
	return d.mimeDetector
}
// MediaMetaExtractor returns the media metadata extractor, building it on
// first use or when ctx carries ReloadCtx set to true. The dependency mutex is
// held for the whole call.
func (d *dependency) MediaMetaExtractor(ctx context.Context) mediameta.Extractor {
	d.mu.Lock()
	defer d.mu.Unlock()

	// Read the stored boolean itself, not merely whether the key is present,
	// so that an explicit ReloadCtx=false does not trigger a rebuild.
	reload, _ := ctx.Value(ReloadCtx{}).(bool)
	if d.mediaMeta != nil && !reload {
		return d.mediaMeta
	}

	d.mediaMeta = mediameta.NewExtractorManager(ctx, d.SettingProvider(), d.Logger())
	return d.mediaMeta
}
// ThumbQueue returns the thumbnail queue, building it on first use or when ctx
// carries ReloadCtx set to true. On reload the existing queue is shut down
// before it is replaced. A task client is attached only in master mode. The
// dependency mutex is held for the whole call.
func (d *dependency) ThumbQueue(ctx context.Context) queue.Queue {
	d.mu.Lock()
	defer d.mu.Unlock()

	// Read the stored boolean itself, not merely whether the key is present,
	// so that an explicit ReloadCtx=false does not trigger a rebuild.
	reload, _ := ctx.Value(ReloadCtx{}).(bool)
	if d.thumbQueue != nil && !reload {
		return d.thumbQueue
	}

	if d.thumbQueue != nil {
		d.thumbQueue.Shutdown()
	}

	settings := d.SettingProvider()
	queueSetting := settings.Queue(context.Background(), setting.QueueTypeThumb)

	// Stays nil outside master mode.
	var t inventory.TaskClient
	if d.ConfigProvider().System().Mode == conf.MasterMode {
		t = d.TaskClient()
	}

	d.thumbQueue = queue.New(d.Logger(), t, nil, d,
		queue.WithBackoffFactor(queueSetting.BackoffFactor),
		queue.WithMaxRetry(queueSetting.MaxRetry),
		queue.WithBackoffMaxDuration(queueSetting.BackoffMaxDuration),
		queue.WithRetryDelay(queueSetting.RetryDelay),
		queue.WithWorkerCount(queueSetting.WorkerNum),
		queue.WithName("ThumbQueue"),
		queue.WithMaxTaskExecution(queueSetting.MaxExecution),
	)
	return d.thumbQueue
}
// MediaMetaQueue returns the media metadata queue, building it on first use or
// when ctx carries ReloadCtx set to true. On reload the existing queue is shut
// down before it is replaced. The dependency mutex is held for the whole call.
func (d *dependency) MediaMetaQueue(ctx context.Context) queue.Queue {
	d.mu.Lock()
	defer d.mu.Unlock()

	// Read the stored boolean itself, not merely whether the key is present,
	// so that an explicit ReloadCtx=false does not trigger a rebuild.
	reload, _ := ctx.Value(ReloadCtx{}).(bool)
	if d.mediaMetaQueue != nil && !reload {
		return d.mediaMetaQueue
	}

	if d.mediaMetaQueue != nil {
		d.mediaMetaQueue.Shutdown()
	}

	settings := d.SettingProvider()
	queueSetting := settings.Queue(context.Background(), setting.QueueTypeMediaMeta)

	d.mediaMetaQueue = queue.New(d.Logger(), d.TaskClient(), nil, d,
		queue.WithBackoffFactor(queueSetting.BackoffFactor),
		queue.WithMaxRetry(queueSetting.MaxRetry),
		queue.WithBackoffMaxDuration(queueSetting.BackoffMaxDuration),
		queue.WithRetryDelay(queueSetting.RetryDelay),
		queue.WithWorkerCount(queueSetting.WorkerNum),
		queue.WithName("MediaMetadataQueue"),
		queue.WithMaxTaskExecution(queueSetting.MaxExecution),
		queue.WithResumeTaskType(queue.MediaMetaTaskType),
	)
	return d.mediaMetaQueue
}
// IoIntenseQueue returns the queue for IO intense tasks, building it on first
// use or when ctx carries ReloadCtx set to true. On reload the existing queue
// is shut down before it is replaced. The dependency mutex is held for the
// whole call.
func (d *dependency) IoIntenseQueue(ctx context.Context) queue.Queue {
	d.mu.Lock()
	defer d.mu.Unlock()

	// Read the stored boolean itself, not merely whether the key is present,
	// so that an explicit ReloadCtx=false does not trigger a rebuild.
	reload, _ := ctx.Value(ReloadCtx{}).(bool)
	if d.ioIntenseQueue != nil && !reload {
		return d.ioIntenseQueue
	}

	if d.ioIntenseQueue != nil {
		d.ioIntenseQueue.Shutdown()
	}

	settings := d.SettingProvider()
	queueSetting := settings.Queue(context.Background(), setting.QueueTypeIOIntense)

	d.ioIntenseQueue = queue.New(d.Logger(), d.TaskClient(), d.TaskRegistry(), d,
		queue.WithBackoffFactor(queueSetting.BackoffFactor),
		queue.WithMaxRetry(queueSetting.MaxRetry),
		queue.WithBackoffMaxDuration(queueSetting.BackoffMaxDuration),
		queue.WithRetryDelay(queueSetting.RetryDelay),
		queue.WithWorkerCount(queueSetting.WorkerNum),
		queue.WithName("IoIntenseQueue"),
		queue.WithMaxTaskExecution(queueSetting.MaxExecution),
		queue.WithResumeTaskType(queue.CreateArchiveTaskType, queue.ExtractArchiveTaskType, queue.RelocateTaskType, queue.ImportTaskType),
		queue.WithTaskPullInterval(10*time.Second),
	)
	return d.ioIntenseQueue
}
// RemoteDownloadQueue returns the queue for remote download tasks, building it
// on first use or when ctx carries ReloadCtx set to true. On reload the
// existing queue is shut down before it is replaced. The dependency mutex is
// held for the whole call.
func (d *dependency) RemoteDownloadQueue(ctx context.Context) queue.Queue {
	d.mu.Lock()
	defer d.mu.Unlock()

	// Read the stored boolean itself, not merely whether the key is present,
	// so that an explicit ReloadCtx=false does not trigger a rebuild.
	reload, _ := ctx.Value(ReloadCtx{}).(bool)
	if d.remoteDownloadQueue != nil && !reload {
		return d.remoteDownloadQueue
	}

	if d.remoteDownloadQueue != nil {
		d.remoteDownloadQueue.Shutdown()
	}

	settings := d.SettingProvider()
	queueSetting := settings.Queue(context.Background(), setting.QueueTypeRemoteDownload)

	d.remoteDownloadQueue = queue.New(d.Logger(), d.TaskClient(), d.TaskRegistry(), d,
		queue.WithBackoffFactor(queueSetting.BackoffFactor),
		queue.WithMaxRetry(queueSetting.MaxRetry),
		queue.WithBackoffMaxDuration(queueSetting.BackoffMaxDuration),
		queue.WithRetryDelay(queueSetting.RetryDelay),
		queue.WithWorkerCount(queueSetting.WorkerNum),
		queue.WithName("RemoteDownloadQueue"),
		queue.WithMaxTaskExecution(queueSetting.MaxExecution),
		queue.WithResumeTaskType(queue.RemoteDownloadTaskType),
		queue.WithTaskPullInterval(10*time.Second),
	)
	return d.remoteDownloadQueue
}
// EntityRecycleQueue returns the queue for entity recycle tasks, building it
// on first use or when ctx carries ReloadCtx set to true. On reload the
// existing queue is shut down before it is replaced. The dependency mutex is
// held for the whole call.
func (d *dependency) EntityRecycleQueue(ctx context.Context) queue.Queue {
	d.mu.Lock()
	defer d.mu.Unlock()

	// Read the stored boolean itself, not merely whether the key is present,
	// so that an explicit ReloadCtx=false does not trigger a rebuild.
	reload, _ := ctx.Value(ReloadCtx{}).(bool)
	if d.entityRecycleQueue != nil && !reload {
		return d.entityRecycleQueue
	}

	if d.entityRecycleQueue != nil {
		d.entityRecycleQueue.Shutdown()
	}

	settings := d.SettingProvider()
	queueSetting := settings.Queue(context.Background(), setting.QueueTypeEntityRecycle)

	d.entityRecycleQueue = queue.New(d.Logger(), d.TaskClient(), nil, d,
		queue.WithBackoffFactor(queueSetting.BackoffFactor),
		queue.WithMaxRetry(queueSetting.MaxRetry),
		queue.WithBackoffMaxDuration(queueSetting.BackoffMaxDuration),
		queue.WithRetryDelay(queueSetting.RetryDelay),
		queue.WithWorkerCount(queueSetting.WorkerNum),
		queue.WithName("EntityRecycleQueue"),
		queue.WithMaxTaskExecution(queueSetting.MaxExecution),
		queue.WithResumeTaskType(queue.EntityRecycleRoutineTaskType, queue.ExplicitEntityRecycleTaskType, queue.UploadSentinelCheckTaskType),
		queue.WithTaskPullInterval(10*time.Second),
	)
	return d.entityRecycleQueue
}
// SlaveQueue returns the queue for slave tasks, building it on first use or
// when ctx carries ReloadCtx set to true. On reload the existing queue is shut
// down before it is replaced. The queue is created without a task client or
// task registry. The dependency mutex is held for the whole call.
func (d *dependency) SlaveQueue(ctx context.Context) queue.Queue {
	d.mu.Lock()
	defer d.mu.Unlock()

	// Read the stored boolean itself, not merely whether the key is present,
	// so that an explicit ReloadCtx=false does not trigger a rebuild.
	reload, _ := ctx.Value(ReloadCtx{}).(bool)
	if d.slaveQueue != nil && !reload {
		return d.slaveQueue
	}

	if d.slaveQueue != nil {
		d.slaveQueue.Shutdown()
	}

	settings := d.SettingProvider()
	queueSetting := settings.Queue(context.Background(), setting.QueueTypeSlave)

	d.slaveQueue = queue.New(d.Logger(), nil, nil, d,
		queue.WithBackoffFactor(queueSetting.BackoffFactor),
		queue.WithMaxRetry(queueSetting.MaxRetry),
		queue.WithBackoffMaxDuration(queueSetting.BackoffMaxDuration),
		queue.WithRetryDelay(queueSetting.RetryDelay),
		queue.WithWorkerCount(queueSetting.WorkerNum),
		queue.WithName("SlaveQueue"),
		queue.WithMaxTaskExecution(queueSetting.MaxExecution),
	)
	return d.slaveQueue
}
// GeneralAuth returns the HMAC authenticator, creating it on first use. In
// master mode the secret comes from the setting provider; otherwise it is the
// slave secret from the config file, and an empty value panics through
// panicError.
func (d *dependency) GeneralAuth() auth.Auth {
	if d.generalAuth != nil {
		return d.generalAuth
	}

	var secret string
	switch d.ConfigProvider().System().Mode {
	case conf.MasterMode:
		secret = d.SettingProvider().SecretKey(context.Background())
	default:
		secret = d.ConfigProvider().Slave().Secret
		if secret == "" {
			d.panicError(errors.New("SlaveSecret is not set, please specify it in config file"))
		}
	}

	d.generalAuth = auth.HMACAuth{SecretKey: []byte(secret)}
	return d.generalAuth
}
// FileClient returns the preset file client when one is set; otherwise it
// builds a new client from the DB client, database type and hash ID encoder
// for every call.
func (d *dependency) FileClient() inventory.FileClient {
	if d.fileClient == nil {
		return inventory.NewFileClient(d.DBClient(), d.ConfigProvider().Database().Type, d.HashIDEncoder())
	}

	return d.fileClient
}
// ShareClient returns the preset share client when one is set; otherwise it
// builds a new client from the DB client, database type and hash ID encoder
// for every call.
func (d *dependency) ShareClient() inventory.ShareClient {
	if d.shareClient == nil {
		return inventory.NewShareClient(d.DBClient(), d.ConfigProvider().Database().Type, d.HashIDEncoder())
	}

	return d.shareClient
}
// TaskClient returns the preset task client when one is set; otherwise it
// builds a new client from the DB client, database type and hash ID encoder
// for every call.
func (d *dependency) TaskClient() inventory.TaskClient {
	if d.taskClient == nil {
		return inventory.NewTaskClient(d.DBClient(), d.ConfigProvider().Database().Type, d.HashIDEncoder())
	}

	return d.taskClient
}
// DavAccountClient returns the preset DAV account client when one is set;
// otherwise it builds a new client from the DB client, database type and hash
// ID encoder for every call.
func (d *dependency) DavAccountClient() inventory.DavAccountClient {
	if d.davAccountClient == nil {
		return inventory.NewDavAccountClient(d.DBClient(), d.ConfigProvider().Database().Type, d.HashIDEncoder())
	}

	return d.davAccountClient
}
// DirectLinkClient returns the preset direct link client when one is set;
// otherwise it builds a new client from the DB client, database type and hash
// ID encoder for every call.
func (d *dependency) DirectLinkClient() inventory.DirectLinkClient {
	if d.directLinkClient == nil {
		return inventory.NewDirectLinkClient(d.DBClient(), d.ConfigProvider().Database().Type, d.HashIDEncoder())
	}

	return d.directLinkClient
}
// HashIDEncoder returns the hash ID encoder, creating it on first use from the
// salt held by the setting provider. It panics through panicError when the
// encoder cannot be created.
func (d *dependency) HashIDEncoder() hashid.Encoder {
	if d.hashidEncoder == nil {
		salt := d.SettingProvider().HashIDSalt(context.Background())
		enc, err := hashid.New(salt)
		if err != nil {
			d.panicError(err)
		}

		d.hashidEncoder = enc
	}

	return d.hashidEncoder
}
// CredManager returns the credential manager, creating it on first use.
// Master mode uses credmanager.New on the KV store; other modes use the slave
// manager, which also receives the config provider.
func (d *dependency) CredManager() credmanager.CredManager {
	if d.credManager != nil {
		return d.credManager
	}

	if d.ConfigProvider().System().Mode != conf.MasterMode {
		d.credManager = credmanager.NewSlaveManager(d.KV(), d.ConfigProvider())
		return d.credManager
	}

	d.credManager = credmanager.New(d.KV())
	return d.credManager
}
// TokenAuth returns the token authenticator, creating it on first use from the
// hash ID encoder, setting provider, secret key, user client, logger and KV
// store.
func (d *dependency) TokenAuth() auth.TokenAuth {
	if d.tokenAuth == nil {
		encoder := d.HashIDEncoder()
		settings := d.SettingProvider()
		secret := []byte(d.SettingProvider().SecretKey(context.Background()))
		d.tokenAuth = auth.NewTokenAuth(encoder, settings, secret, d.UserClient(), d.Logger(), d.KV())
	}

	return d.tokenAuth
}
// LockSystem returns the file lock system, creating it with lock.NewMemLS on
// first use.
func (d *dependency) LockSystem() lock.LockSystem {
	if d.lockSystem == nil {
		d.lockSystem = lock.NewMemLS(d.HashIDEncoder(), d.Logger())
	}

	return d.lockSystem
}
// StoragePolicyClient returns the preset storage policy client when one is
// set; otherwise it builds a new client from the DB client and KV store for
// every call.
func (d *dependency) StoragePolicyClient() inventory.StoragePolicyClient {
	if d.storagePolicyClient == nil {
		return inventory.NewStoragePolicyClient(d.DBClient(), d.KV())
	}

	return d.storagePolicyClient
}
// ThumbPipeline returns the thumbnail generator pipeline, creating it from the
// setting provider and logger on first use.
func (d *dependency) ThumbPipeline() thumb.Generator {
	if d.thumbPipeline == nil {
		d.thumbPipeline = thumb.NewPipeline(d.SettingProvider(), d.Logger())
	}

	return d.thumbPipeline
}
// TaskRegistry returns the task registry, creating it on first use.
func (d *dependency) TaskRegistry() queue.TaskRegistry {
	if d.taskRegistry == nil {
		d.taskRegistry = queue.NewTaskRegistry()
	}

	return d.taskRegistry
}
// Shutdown closes the email client and shuts down every queue that has been
// created, running the queue shutdowns concurrently and waiting for all of
// them to finish. It always returns nil.
//
// NOTE(review): ctx is accepted to satisfy Dep but is not consulted, so the
// wait is not bounded by ctx cancellation.
func (d *dependency) Shutdown(ctx context.Context) error {
	d.mu.Lock()
	if d.emailClient != nil {
		d.emailClient.Close()
	}

	// Snapshot the queues while the lock is held. The shutdown goroutines must
	// not re-read the fields afterwards: a concurrent reload may replace them.
	queues := []queue.Queue{
		d.mediaMetaQueue,
		d.thumbQueue,
		d.ioIntenseQueue,
		d.entityRecycleQueue,
		d.slaveQueue,
		d.remoteDownloadQueue,
	}
	d.mu.Unlock()

	var wg sync.WaitGroup
	for _, q := range queues {
		if q == nil {
			continue
		}

		wg.Add(1)
		go func(q queue.Queue) {
			// Registered before the blocking call so the counter is always released.
			defer wg.Done()
			q.Shutdown()
		}(q)
	}

	wg.Wait()
	return nil
}
// panicError aborts dependency initialization with err. When a logger is
// already set the error is first reported through its Panic method; the
// function then panics with err itself.
func (d *dependency) panicError(err error) {
	if logger := d.logger; logger != nil {
		logger.Panic("Fatal error in dependency initialization: %s", err)
	}

	panic(err)
}
// ForkWithLogger wraps d in a dependencyCorrelated that uses l as its logger
// and returns a context derived from ctx carrying that wrapper under DepCtx.
func (d *dependency) ForkWithLogger(ctx context.Context, l logging.Logger) context.Context {
	return context.WithValue(ctx, DepCtx{}, &dependencyCorrelated{
		dependency: d,
		l:          l,
	})
}
type dependencyCorrelated struct {
l logging.Logger
*dependency
}
// Logger returns the correlated logger held by this wrapper, shadowing the
// Logger method promoted from the embedded dependency.
func (dc *dependencyCorrelated) Logger() logging.Logger {
	correlated := dc.l
	return correlated
}