mirror of https://github.com/cloudreve/Cloudreve
416 lines
13 KiB
Go
416 lines
13 KiB
Go
package manager
|
|
|
|
import (
|
|
"context"
|
|
"crypto/sha1"
|
|
"encoding/hex"
|
|
"fmt"
|
|
"time"
|
|
|
|
"github.com/cloudreve/Cloudreve/v4/ent"
|
|
"github.com/cloudreve/Cloudreve/v4/inventory/types"
|
|
"github.com/cloudreve/Cloudreve/v4/pkg/cluster/routes"
|
|
"github.com/cloudreve/Cloudreve/v4/pkg/filemanager/driver"
|
|
"github.com/cloudreve/Cloudreve/v4/pkg/filemanager/fs"
|
|
"github.com/cloudreve/Cloudreve/v4/pkg/filemanager/fs/dbfs"
|
|
"github.com/cloudreve/Cloudreve/v4/pkg/filemanager/manager/entitysource"
|
|
"github.com/cloudreve/Cloudreve/v4/pkg/hashid"
|
|
"github.com/cloudreve/Cloudreve/v4/pkg/serializer"
|
|
"github.com/gofrs/uuid"
|
|
"github.com/samber/lo"
|
|
)
|
|
|
|
type (
|
|
EntityManagement interface {
|
|
// GetEntityUrls gets download urls of given entities, return URLs and the earliest expiry time
|
|
GetEntityUrls(ctx context.Context, args []GetEntityUrlArgs, opts ...fs.Option) ([]EntityUrl, *time.Time, error)
|
|
// GetUrlForRedirectedDirectLink gets redirected direct download link of given direct link
|
|
GetUrlForRedirectedDirectLink(ctx context.Context, dl *ent.DirectLink, opts ...fs.Option) (string, *time.Time, error)
|
|
// GetDirectLink gets permanent direct download link of given files
|
|
GetDirectLink(ctx context.Context, urls ...*fs.URI) ([]DirectLink, error)
|
|
// GetEntitySource gets source of given entity
|
|
GetEntitySource(ctx context.Context, entityID int, opts ...fs.Option) (entitysource.EntitySource, error)
|
|
// Thumbnail gets thumbnail entity of given file
|
|
Thumbnail(ctx context.Context, uri *fs.URI) (entitysource.EntitySource, error)
|
|
// SubmitAndAwaitThumbnailTask submits a thumbnail task and waits for result
|
|
SubmitAndAwaitThumbnailTask(ctx context.Context, uri *fs.URI, ext string, entity fs.Entity) (fs.Entity, error)
|
|
// SetCurrentVersion sets current version of given file
|
|
SetCurrentVersion(ctx context.Context, path *fs.URI, version int) error
|
|
// DeleteVersion deletes a version of given file
|
|
DeleteVersion(ctx context.Context, path *fs.URI, version int) error
|
|
// ExtractAndSaveMediaMeta extracts and saves media meta into file metadata of given file.
|
|
ExtractAndSaveMediaMeta(ctx context.Context, uri *fs.URI, entityID int) error
|
|
// RecycleEntities recycles a group of entities
|
|
RecycleEntities(ctx context.Context, force bool, entityIDs ...int) error
|
|
// ListPhysical lists physical files in a path
|
|
ListPhysical(ctx context.Context, path string, policyID int, recursive bool, progress driver.ListProgressFunc) ([]fs.PhysicalObject, error)
|
|
// ImportPhysical imports a physical file to a Cloudreve file
|
|
ImportPhysical(ctx context.Context, dst *fs.URI, policyId int, src fs.PhysicalObject, completeHook bool) error
|
|
}
|
|
DirectLink struct {
|
|
File fs.File
|
|
Url string
|
|
}
|
|
|
|
EntityUrl struct {
|
|
Url string `json:"url"`
|
|
BrowserDownloadDisplayName string `json:"stream_saver_display_name,omitempty"`
|
|
}
|
|
)
|
|
|
|
func (m *manager) GetDirectLink(ctx context.Context, urls ...*fs.URI) ([]DirectLink, error) {
|
|
ae := serializer.NewAggregateError()
|
|
res := make([]DirectLink, 0, len(urls))
|
|
useRedirect := m.user.Edges.Group.Settings.RedirectedSource
|
|
fileClient := m.dep.FileClient()
|
|
siteUrl := m.settings.SiteURL(ctx)
|
|
|
|
for _, url := range urls {
|
|
file, err := m.fs.Get(
|
|
ctx, url,
|
|
dbfs.WithFileEntities(),
|
|
dbfs.WithRequiredCapabilities(dbfs.NavigatorCapabilityDownloadFile),
|
|
)
|
|
if err != nil {
|
|
ae.Add(url.String(), err)
|
|
continue
|
|
}
|
|
|
|
if file.OwnerID() != m.user.ID && !m.user.Edges.Group.Permissions.Enabled(int(types.GroupPermissionIsAdmin)) {
|
|
ae.Add(url.String(), fs.ErrOwnerOnly)
|
|
continue
|
|
}
|
|
|
|
if file.Type() != types.FileTypeFile {
|
|
ae.Add(url.String(), fs.ErrEntityNotExist)
|
|
continue
|
|
}
|
|
|
|
target := file.PrimaryEntity()
|
|
if target == nil {
|
|
ae.Add(url.String(), fs.ErrEntityNotExist)
|
|
continue
|
|
}
|
|
|
|
// Hooks for entity download
|
|
if err := m.fs.ExecuteNavigatorHooks(ctx, fs.HookTypeBeforeDownload, file); err != nil {
|
|
m.l.Warning("Failed to execute navigator hooks: %s", err)
|
|
}
|
|
|
|
if useRedirect {
|
|
reuseExisting := !m.user.Edges.Group.Permissions.Enabled(int(types.GroupPermissionUniqueRedirectDirectLink))
|
|
// Use redirect source
|
|
link, err := fileClient.CreateDirectLink(ctx, file.ID(), file.Name(), m.user.Edges.Group.SpeedLimit, reuseExisting)
|
|
if err != nil {
|
|
ae.Add(url.String(), err)
|
|
continue
|
|
}
|
|
|
|
linkHashID := hashid.EncodeSourceLinkID(m.hasher, link.ID)
|
|
res = append(res, DirectLink{
|
|
File: file,
|
|
Url: routes.MasterDirectLink(siteUrl, linkHashID, link.Name).String(),
|
|
})
|
|
} else {
|
|
// Use direct source
|
|
policy, d, err := m.getEntityPolicyDriver(ctx, target, nil)
|
|
if err != nil {
|
|
ae.Add(url.String(), err)
|
|
continue
|
|
}
|
|
|
|
source := entitysource.NewEntitySource(target, d, policy, m.auth, m.settings, m.hasher, m.dep.RequestClient(),
|
|
m.l, m.config, m.dep.MimeDetector(ctx))
|
|
sourceUrl, err := source.Url(ctx,
|
|
entitysource.WithSpeedLimit(int64(m.user.Edges.Group.SpeedLimit)),
|
|
entitysource.WithDisplayName(file.Name()),
|
|
)
|
|
if err != nil {
|
|
ae.Add(url.String(), err)
|
|
continue
|
|
}
|
|
|
|
res = append(res, DirectLink{
|
|
File: file,
|
|
Url: sourceUrl.Url,
|
|
})
|
|
}
|
|
|
|
}
|
|
|
|
return res, ae.Aggregate()
|
|
}
|
|
|
|
func (m *manager) GetUrlForRedirectedDirectLink(ctx context.Context, dl *ent.DirectLink, opts ...fs.Option) (string, *time.Time, error) {
|
|
o := newOption()
|
|
for _, opt := range opts {
|
|
opt.Apply(o)
|
|
}
|
|
|
|
file, err := m.fs.GetFileFromDirectLink(ctx, dl)
|
|
if err != nil {
|
|
return "", nil, err
|
|
}
|
|
|
|
// Find primary entity
|
|
target, found := lo.Find(file.Entities(), func(entity fs.Entity) bool {
|
|
return entity.ID() == file.PrimaryEntityID()
|
|
})
|
|
if !found {
|
|
return "", nil, fs.ErrDirectLinkInvalid.WithError(fmt.Errorf("primary entity not found"))
|
|
}
|
|
primaryEntity := target
|
|
|
|
// Generate url
|
|
var (
|
|
res string
|
|
expire *time.Time
|
|
)
|
|
|
|
// Try to read from cache.
|
|
cacheKey := entityUrlCacheKey(primaryEntity.ID(), int64(dl.Speed), dl.Name, o.IsDownload,
|
|
m.settings.SiteURL(ctx).String())
|
|
if cached, ok := m.kv.Get(cacheKey); ok {
|
|
cachedItem := cached.(EntityUrlCache)
|
|
res = cachedItem.Url
|
|
expire = cachedItem.ExpireAt
|
|
} else {
|
|
// Cache miss, Generate new url
|
|
policy, d, err := m.getEntityPolicyDriver(ctx, primaryEntity, nil)
|
|
if err != nil {
|
|
return "", nil, err
|
|
}
|
|
|
|
source := entitysource.NewEntitySource(primaryEntity, d, policy, m.auth, m.settings, m.hasher, m.dep.RequestClient(),
|
|
m.l, m.config, m.dep.MimeDetector(ctx))
|
|
downloadUrl, err := source.Url(ctx,
|
|
entitysource.WithExpire(o.Expire),
|
|
entitysource.WithDownload(o.IsDownload),
|
|
entitysource.WithSpeedLimit(int64(dl.Speed)),
|
|
entitysource.WithDisplayName(dl.Name),
|
|
)
|
|
if err != nil {
|
|
return "", nil, err
|
|
}
|
|
|
|
// Save into kv
|
|
cacheValidDuration := expireTimeToTTL(o.Expire) - m.settings.EntityUrlCacheMargin(ctx)
|
|
if cacheValidDuration > 0 {
|
|
m.kv.Set(cacheKey, EntityUrlCache{
|
|
Url: downloadUrl.Url,
|
|
ExpireAt: downloadUrl.ExpireAt,
|
|
}, cacheValidDuration)
|
|
}
|
|
|
|
res = downloadUrl.Url
|
|
expire = downloadUrl.ExpireAt
|
|
}
|
|
|
|
return res, expire, nil
|
|
}
|
|
|
|
func (m *manager) GetEntityUrls(ctx context.Context, args []GetEntityUrlArgs, opts ...fs.Option) ([]EntityUrl, *time.Time, error) {
|
|
o := newOption()
|
|
for _, opt := range opts {
|
|
opt.Apply(o)
|
|
}
|
|
|
|
var earliestExpireAt *time.Time
|
|
res := make([]EntityUrl, len(args))
|
|
ae := serializer.NewAggregateError()
|
|
for i, arg := range args {
|
|
file, err := m.fs.Get(
|
|
ctx, arg.URI,
|
|
dbfs.WithFileEntities(),
|
|
dbfs.WithRequiredCapabilities(dbfs.NavigatorCapabilityDownloadFile),
|
|
)
|
|
if err != nil {
|
|
ae.Add(arg.URI.String(), err)
|
|
continue
|
|
}
|
|
|
|
if file.Type() != types.FileTypeFile {
|
|
ae.Add(arg.URI.String(), fs.ErrEntityNotExist)
|
|
continue
|
|
}
|
|
|
|
var (
|
|
target fs.Entity
|
|
found bool
|
|
)
|
|
if arg.PreferredEntityID != "" {
|
|
found, target = fs.FindDesiredEntity(file, arg.PreferredEntityID, m.hasher, nil)
|
|
if !found {
|
|
ae.Add(arg.URI.String(), fs.ErrEntityNotExist)
|
|
continue
|
|
}
|
|
} else {
|
|
// No preferred entity ID, use the primary version entity
|
|
target = file.PrimaryEntity()
|
|
if target == nil {
|
|
ae.Add(arg.URI.String(), fs.ErrEntityNotExist)
|
|
continue
|
|
}
|
|
}
|
|
|
|
// Hooks for entity download
|
|
if err := m.fs.ExecuteNavigatorHooks(ctx, fs.HookTypeBeforeDownload, file); err != nil {
|
|
m.l.Warning("Failed to execute navigator hooks: %s", err)
|
|
}
|
|
|
|
policy, d, err := m.getEntityPolicyDriver(ctx, target, nil)
|
|
if err != nil {
|
|
ae.Add(arg.URI.String(), err)
|
|
continue
|
|
}
|
|
|
|
// Try to read from cache.
|
|
cacheKey := entityUrlCacheKey(target.ID(), o.DownloadSpeed, getEntityDisplayName(file, target), o.IsDownload,
|
|
m.settings.SiteURL(ctx).String())
|
|
if cached, ok := m.kv.Get(cacheKey); ok && !o.NoCache {
|
|
cachedItem := cached.(EntityUrlCache)
|
|
// Find the earliest expiry time
|
|
if cachedItem.ExpireAt != nil && (earliestExpireAt == nil || cachedItem.ExpireAt.Before(*earliestExpireAt)) {
|
|
earliestExpireAt = cachedItem.ExpireAt
|
|
}
|
|
res[i] = EntityUrl{
|
|
Url: cachedItem.Url,
|
|
BrowserDownloadDisplayName: cachedItem.BrowserDownloadDisplayName,
|
|
}
|
|
continue
|
|
}
|
|
|
|
// Cache miss, Generate new url
|
|
source := entitysource.NewEntitySource(target, d, policy, m.auth, m.settings, m.hasher, m.dep.RequestClient(),
|
|
m.l, m.config, m.dep.MimeDetector(ctx))
|
|
downloadUrl, err := source.Url(ctx,
|
|
entitysource.WithExpire(o.Expire),
|
|
entitysource.WithDownload(o.IsDownload),
|
|
entitysource.WithSpeedLimit(o.DownloadSpeed),
|
|
entitysource.WithDisplayName(getEntityDisplayName(file, target)),
|
|
)
|
|
if err != nil {
|
|
ae.Add(arg.URI.String(), err)
|
|
continue
|
|
}
|
|
|
|
// Find the earliest expiry time
|
|
if downloadUrl.ExpireAt != nil && (earliestExpireAt == nil || downloadUrl.ExpireAt.Before(*earliestExpireAt)) {
|
|
earliestExpireAt = downloadUrl.ExpireAt
|
|
}
|
|
|
|
// Save into kv
|
|
cacheValidDuration := expireTimeToTTL(o.Expire) - m.settings.EntityUrlCacheMargin(ctx)
|
|
if cacheValidDuration > 0 {
|
|
m.kv.Set(cacheKey, EntityUrlCache{
|
|
Url: downloadUrl.Url,
|
|
ExpireAt: downloadUrl.ExpireAt,
|
|
}, cacheValidDuration)
|
|
}
|
|
|
|
res[i] = EntityUrl{
|
|
Url: downloadUrl.Url,
|
|
}
|
|
if d.Capabilities().BrowserRelayedDownload {
|
|
res[i].BrowserDownloadDisplayName = getEntityDisplayName(file, target)
|
|
}
|
|
}
|
|
|
|
return res, earliestExpireAt, ae.Aggregate()
|
|
}
|
|
|
|
func (m *manager) GetEntitySource(ctx context.Context, entityID int, opts ...fs.Option) (entitysource.EntitySource, error) {
|
|
o := newOption()
|
|
for _, opt := range opts {
|
|
opt.Apply(o)
|
|
}
|
|
|
|
var (
|
|
entity fs.Entity
|
|
err error
|
|
)
|
|
|
|
if o.Entity != nil {
|
|
entity = o.Entity
|
|
} else {
|
|
entity, err = m.fs.GetEntity(ctx, entityID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if entity.ReferenceCount() == 0 {
|
|
return nil, fs.ErrEntityNotExist
|
|
}
|
|
}
|
|
|
|
policy, handler, err := m.getEntityPolicyDriver(ctx, entity, o.Policy)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return entitysource.NewEntitySource(entity, handler, policy, m.auth, m.settings, m.hasher, m.dep.RequestClient(), m.l,
|
|
m.config, m.dep.MimeDetector(ctx), entitysource.WithContext(ctx), entitysource.WithThumb(o.IsThumb)), nil
|
|
}
|
|
|
|
func (l *manager) SetCurrentVersion(ctx context.Context, path *fs.URI, version int) error {
|
|
return l.fs.VersionControl(ctx, path, version, false)
|
|
}
|
|
|
|
func (l *manager) DeleteVersion(ctx context.Context, path *fs.URI, version int) error {
|
|
return l.fs.VersionControl(ctx, path, version, true)
|
|
}
|
|
|
|
func (l *manager) ListPhysical(ctx context.Context, path string, policyID int, recursive bool, progress driver.ListProgressFunc) ([]fs.PhysicalObject, error) {
|
|
policy, err := l.dep.StoragePolicyClient().GetPolicyByID(ctx, policyID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
driver, err := l.GetStorageDriver(ctx, policy)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return driver.List(ctx, path, progress, recursive)
|
|
}
|
|
|
|
func (l *manager) ImportPhysical(ctx context.Context, dst *fs.URI, policyId int, src fs.PhysicalObject, completeHook bool) error {
|
|
targetUri := dst.Join(src.RelativePath)
|
|
req := &fs.UploadRequest{
|
|
Props: &fs.UploadProps{
|
|
Uri: targetUri,
|
|
UploadSessionID: uuid.Must(uuid.NewV4()).String(),
|
|
Size: src.Size,
|
|
PreferredStoragePolicy: policyId,
|
|
SavePath: src.Source,
|
|
LastModified: &src.LastModify,
|
|
},
|
|
ImportFrom: &src,
|
|
}
|
|
|
|
// Prepare for upload
|
|
uploadSession, err := l.fs.PrepareUpload(ctx, req)
|
|
if err != nil {
|
|
return fmt.Errorf("faield to prepare uplaod: %w", err)
|
|
}
|
|
if completeHook {
|
|
d, err := l.GetStorageDriver(ctx, l.CastStoragePolicyOnSlave(ctx, uploadSession.Policy))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
l.onNewEntityUploaded(ctx, uploadSession, d)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func entityUrlCacheKey(id int, speed int64, displayName string, download bool, siteUrl string) string {
|
|
hash := sha1.New()
|
|
hash.Write([]byte(fmt.Sprintf("%d_%d_%s_%t_%s", id,
|
|
speed, displayName, download, siteUrl)))
|
|
hashRes := hex.EncodeToString(hash.Sum(nil))
|
|
|
|
return fmt.Sprintf("%s_%s", EntityUrlCacheKeyPrefix, hashRes)
|
|
}
|