Cloudreve/pkg/filemanager/manager/mediameta.go

194 lines
5.6 KiB
Go

package manager
import (
"context"
"encoding/json"
"fmt"
"github.com/cloudreve/Cloudreve/v4/application/dependency"
"github.com/cloudreve/Cloudreve/v4/ent"
"github.com/cloudreve/Cloudreve/v4/ent/task"
"github.com/cloudreve/Cloudreve/v4/inventory"
"github.com/cloudreve/Cloudreve/v4/inventory/types"
"github.com/cloudreve/Cloudreve/v4/pkg/filemanager/driver"
"github.com/cloudreve/Cloudreve/v4/pkg/filemanager/fs"
"github.com/cloudreve/Cloudreve/v4/pkg/filemanager/fs/dbfs"
"github.com/cloudreve/Cloudreve/v4/pkg/logging"
"github.com/cloudreve/Cloudreve/v4/pkg/queue"
"github.com/cloudreve/Cloudreve/v4/pkg/util"
"github.com/samber/lo"
)
type (
MediaMetaTask struct {
*queue.DBTask
}
MediaMetaTaskState struct {
Uri *fs.URI `json:"uri"`
EntityID int `json:"entity_id"`
}
)
func init() {
queue.RegisterResumableTaskFactory(queue.MediaMetaTaskType, NewMediaMetaTaskFromModel)
}
// NewMediaMetaTask creates a new MediaMetaTask to
func NewMediaMetaTask(ctx context.Context, uri *fs.URI, entityID int, creator *ent.User) (*MediaMetaTask, error) {
state := &MediaMetaTaskState{
Uri: uri,
EntityID: entityID,
}
stateBytes, err := json.Marshal(state)
if err != nil {
return nil, fmt.Errorf("failed to marshal state: %w", err)
}
return &MediaMetaTask{
DBTask: &queue.DBTask{
DirectOwner: creator,
Task: &ent.Task{
Type: queue.MediaMetaTaskType,
CorrelationID: logging.CorrelationID(ctx),
PrivateState: string(stateBytes),
PublicState: &types.TaskPublicState{},
},
},
}, nil
}
func NewMediaMetaTaskFromModel(task *ent.Task) queue.Task {
return &MediaMetaTask{
DBTask: &queue.DBTask{
Task: task,
},
}
}
func (m *MediaMetaTask) Do(ctx context.Context) (task.Status, error) {
dep := dependency.FromContext(ctx)
fm := NewFileManager(dep, inventory.UserFromContext(ctx)).(*manager)
// unmarshal state
var state MediaMetaTaskState
if err := json.Unmarshal([]byte(m.State()), &state); err != nil {
return task.StatusError, fmt.Errorf("failed to unmarshal state: %s (%w)", err, queue.CriticalErr)
}
err := fm.ExtractAndSaveMediaMeta(ctx, state.Uri, state.EntityID)
if err != nil {
return task.StatusError, err
}
return task.StatusCompleted, nil
}
func (m *manager) ExtractAndSaveMediaMeta(ctx context.Context, uri *fs.URI, entityID int) error {
// 1. retrieve file info
file, err := m.fs.Get(ctx, uri, dbfs.WithFileEntities())
if err != nil {
return fmt.Errorf("failed to get file: %w", err)
}
versions := lo.Filter(file.Entities(), func(i fs.Entity, index int) bool {
return i.Type() == types.EntityTypeVersion
})
targetVersion, versionIndex, found := lo.FindIndexOf(versions, func(i fs.Entity) bool {
return i.ID() == entityID
})
if !found {
return fmt.Errorf("failed to find version: %s (%w)", err, queue.CriticalErr)
}
if versionIndex != 0 {
m.l.Debug("Skip media meta task for non-latest version.")
return nil
}
var (
metas []driver.MediaMeta
)
// 2. try using native driver
_, d, err := m.getEntityPolicyDriver(ctx, targetVersion, nil)
if err != nil {
return fmt.Errorf("failed to get storage driver: %s (%w)", err, queue.CriticalErr)
}
driverCaps := d.Capabilities()
if util.IsInExtensionList(driverCaps.MediaMetaSupportedExts, file.Name()) {
m.l.Debug("Using native driver to generate media meta.")
metas, err = d.MediaMeta(ctx, targetVersion.Source(), file.Ext())
if err != nil {
return fmt.Errorf("failed to get media meta using native driver: %w", err)
}
} else if driverCaps.MediaMetaProxy && util.IsInExtensionList(m.dep.MediaMetaExtractor(ctx).Exts(), file.Name()) {
m.l.Debug("Using local extractor to generate media meta.")
extractor := m.dep.MediaMetaExtractor(ctx)
source, err := m.GetEntitySource(ctx, targetVersion.ID())
defer source.Close()
if err != nil {
return fmt.Errorf("failed to get entity source: %w", err)
}
metas, err = extractor.Extract(ctx, file.Ext(), source)
if err != nil {
return fmt.Errorf("failed to extract media meta using local extractor: %w", err)
}
} else {
m.l.Debug("No available generator for media meta.")
return nil
}
m.l.Debug("%d media meta generated.", len(metas))
m.l.Debug("Media meta: %v", metas)
// 3. save meta
if len(metas) > 0 {
if err := m.fs.PatchMetadata(ctx, []*fs.URI{uri}, lo.Map(metas, func(i driver.MediaMeta, index int) fs.MetadataPatch {
return fs.MetadataPatch{
Key: fmt.Sprintf("%s:%s", i.Type, i.Key),
Value: i.Value,
}
})...); err != nil {
return fmt.Errorf("failed to save media meta: %s (%w)", err, queue.CriticalErr)
}
}
return nil
}
func (m *manager) shouldGenerateMediaMeta(ctx context.Context, d driver.Handler, fileName string) bool {
driverCaps := d.Capabilities()
if util.IsInExtensionList(driverCaps.MediaMetaSupportedExts, fileName) {
// Handler support it natively
return true
}
if driverCaps.MediaMetaProxy && util.IsInExtensionList(m.dep.MediaMetaExtractor(ctx).Exts(), fileName) {
// Handler does not support. but proxy is enabled.
return true
}
return false
}
func (m *manager) mediaMetaForNewEntity(ctx context.Context, session *fs.UploadSession, d driver.Handler) {
if session.Props.EntityType == nil || *session.Props.EntityType == types.EntityTypeVersion {
if !m.shouldGenerateMediaMeta(ctx, d, session.Props.Uri.Name()) {
return
}
mediaMetaTask, err := NewMediaMetaTask(ctx, session.Props.Uri, session.EntityID, m.user)
if err != nil {
m.l.Warning("Failed to create media meta task: %s", err)
return
}
if err := m.dep.MediaMetaQueue(ctx).QueueTask(ctx, mediaMetaTask); err != nil {
m.l.Warning("Failed to queue media meta task: %s", err)
}
return
}
}