Cloudreve/application/migrator/file.go

190 lines
5.3 KiB
Go

package migrator
import (
"context"
"encoding/json"
"fmt"
"os"
"strconv"
"github.com/cloudreve/Cloudreve/v4/application/migrator/model"
"github.com/cloudreve/Cloudreve/v4/ent"
"github.com/cloudreve/Cloudreve/v4/inventory/types"
"github.com/cloudreve/Cloudreve/v4/pkg/conf"
)
// migrateFile copies v3 file records into the v4 database in batches.
//
// Each batch is read from model.DB with Limit/Offset and written inside a
// single v4 transaction. For every file a version entity is inserted (plus a
// thumbnail entity when the v3 metadata marks a thumbnail as existing), then
// the v4 file row is created and linked to those entities. After a batch is
// committed, the new offset is stored in m.state.FileOffset and persisted via
// saveState so a later run can resume from it.
//
// Files whose folder, user or policy ID is absent from the migration state
// are skipped with a warning. When creating a file hits a constraint error,
// the transaction is rolled back, a "<id>_<name>" rename is recorded in
// m.state.FileConflictRename and the same batch is retried; a second
// constraint error for an already renamed file is returned to the caller.
//
// It returns an error when listing v3 files, starting or committing a
// transaction, inserting an entity, or creating a file fails.
func (m *Migrator) migrateFile() error {
	m.l.Info("Migrating files...")
	batchSize := 1000
	offset := m.state.FileOffset
	ctx := context.Background()

	// State maps may be nil when no earlier run populated them.
	if m.state.FileConflictRename == nil {
		m.state.FileConflictRename = make(map[uint]string)
	}
	if m.state.EntitySources == nil {
		m.state.EntitySources = make(map[string]int)
	}
	if offset > 0 {
		m.l.Info("Resuming file migration from offset %d", offset)
	}

out:
	for {
		m.l.Info("Migrating files with offset %d", offset)

		// NOTE(review): the query requests no explicit ordering, so stable
		// pagination relies on the database's default row order — confirm.
		var files []model.File
		if err := model.DB.Limit(batchSize).Offset(offset).Find(&files).Error; err != nil {
			return fmt.Errorf("failed to list v3 files: %w", err)
		}

		if len(files) == 0 {
			// Rows were inserted with explicit IDs, so on Postgres the ID
			// sequence is moved past the highest migrated ID.
			if m.dep.ConfigProvider().Database().Type == conf.PostgresDB {
				m.l.Info("Resetting file ID sequence for postgres...")
				// Best-effort: a failure is reported but does not abort the migration.
				if _, err := m.v4client.File.ExecContext(ctx, "SELECT SETVAL('files_id_seq', (SELECT MAX(id) FROM files))"); err != nil {
					m.l.Warning("Failed to reset file ID sequence for postgres: %s", err)
				}
			}
			break
		}

		tx, err := m.v4client.Tx(ctx)
		if err != nil {
			// No transaction was started, so there is nothing to roll back.
			return fmt.Errorf("failed to start transaction: %w", err)
		}

		for _, f := range files {
			// Skip files whose owner, parent folder or storage policy was not migrated.
			if _, ok := m.state.FolderIDs[int(f.FolderID)]; !ok {
				m.l.Warning("Folder ID %d for file %d not found, skipping", f.FolderID, f.ID)
				continue
			}
			if _, ok := m.state.UserIDs[int(f.UserID)]; !ok {
				m.l.Warning("User ID %d for file %d not found, skipping", f.UserID, f.ID)
				continue
			}
			if _, ok := m.state.PolicyIDs[int(f.PolicyID)]; !ok {
				m.l.Warning("Policy ID %d for file %d not found, skipping", f.PolicyID, f.ID)
				continue
			}

			metadata := make(map[string]string)
			if f.Metadata != "" {
				// Malformed metadata is not fatal: whatever was decoded is
				// used and the file is migrated without the rest.
				if err := json.Unmarshal([]byte(f.Metadata), &metadata); err != nil {
					m.l.Warning("Failed to parse metadata of file %d: %s", f.ID, err)
				}
			}

			var thumbnail *ent.Entity
			if metadata[model.ThumbStatusMetadataKey] == model.ThumbStatusExist {
				thumbSource := f.SourceName + m.state.ThumbSuffix
				size := int64(0)
				// The thumbnail size is only looked up on disk for policies
				// flagged in LocalPolicyIDs; otherwise it stays 0.
				if m.state.LocalPolicyIDs[int(f.PolicyID)] {
					if info, statErr := os.Stat(thumbSource); statErr == nil {
						size = info.Size()
					} else {
						m.l.Warning("Thumbnail file %s for file %d not found, use 0 size", thumbSource, f.ID)
					}
				}

				// Insert thumbnail entity
				var thumbErr error
				thumbnail, thumbErr = m.insertEntity(tx, thumbSource, int(types.EntityTypeThumbnail), int(f.PolicyID), int(f.UserID), size)
				if thumbErr != nil {
					_ = tx.Rollback()
					return fmt.Errorf("failed to insert thumbnail entity: %w", thumbErr)
				}
			}

			// Insert file version entity
			entity, err := m.insertEntity(tx, f.SourceName, int(types.EntityTypeVersion), int(f.PolicyID), int(f.UserID), int64(f.Size))
			if err != nil {
				_ = tx.Rollback()
				return fmt.Errorf("failed to insert file version entity: %w", err)
			}

			// Use the replacement name recorded by an earlier conflict, if any.
			fname := f.Name
			renamed, hasRename := m.state.FileConflictRename[f.ID]
			if hasRename {
				fname = renamed
			}

			// The v4 ID is the v3 file ID shifted by LastFolderID.
			// NOTE(review): presumably this keeps file IDs clear of the
			// migrated folder IDs — confirm against the folder migration.
			stm := tx.File.Create().
				SetCreatedAt(formatTime(f.CreatedAt)).
				SetUpdatedAt(formatTime(f.UpdatedAt)).
				SetName(fname).
				SetRawID(int(f.ID) + m.state.LastFolderID).
				SetOwnerID(int(f.UserID)).
				SetSize(int64(f.Size)).
				SetPrimaryEntity(entity.ID).
				SetFileChildren(int(f.FolderID)).
				SetType(int(types.FileTypeFile)).
				SetStoragePoliciesID(int(f.PolicyID)).
				AddEntities(entity)
			if thumbnail != nil {
				stm.AddEntities(thumbnail)
			}

			if _, err := stm.Save(ctx); err != nil {
				_ = tx.Rollback()
				if ent.IsConstraintError(err) {
					if hasRename {
						return fmt.Errorf("file %d already exists, but new name is already in conflict rename map, please resolve this manually", f.ID)
					}

					// Record a rename and restart the same batch (offset is
					// unchanged) in a fresh transaction.
					// NOTE(review): EntitySources entries added during the
					// rolled-back transaction are left in place; insertEntity
					// falls back to creating a new entity when updating a
					// recorded ID fails.
					m.l.Warning("File %d already exists, will retry with new name in next batch", f.ID)
					m.state.FileConflictRename[f.ID] = fmt.Sprintf("%d_%s", f.ID, f.Name)
					continue out
				}
				return fmt.Errorf("failed to create file %d: %w", f.ID, err)
			}
		}

		if err := tx.Commit(); err != nil {
			return fmt.Errorf("failed to commit transaction: %w", err)
		}

		// Persist progress so an interrupted migration can resume at this offset.
		offset += batchSize
		m.state.FileOffset = offset
		if err := m.saveState(); err != nil {
			m.l.Warning("Failed to save state after file batch: %s", err)
		} else {
			m.l.Info("Saved migration state after processing this batch")
		}
	}

	return nil
}
// insertEntity returns the entity for the given storage policy and source,
// using tx for all writes.
//
// Entities are tracked in m.state.EntitySources under the key
// "<policyID>+<source>". When the key is already recorded, the reference count
// of the recorded entity is incremented by one and the updated entity is
// returned. If that update fails, a warning is logged and a new entity is
// created instead. A newly created entity starts with a reference count of 1
// and its ID is recorded under the key.
//
// An error is returned only when creating the new entity fails.
func (m *Migrator) insertEntity(tx *ent.Tx, source string, entityType, policyID, createdBy int, size int64) (*ent.Entity, error) {
	ctx := context.Background()
	key := strconv.Itoa(policyID) + "+" + source

	// Reuse an entity recorded earlier for this policy/source pair.
	if knownID, seen := m.state.EntitySources[key]; seen {
		bumped, updateErr := tx.Entity.UpdateOneID(knownID).AddReferenceCount(1).Save(ctx)
		if updateErr == nil {
			return bumped, nil
		}

		m.l.Warning("Failed to update existing entity %d: %s, fallback to create new one.", knownID, updateErr)
	}

	// Nothing reusable: create a fresh entity.
	creator := tx.Entity.Create().
		SetSource(source).
		SetType(entityType).
		SetSize(size).
		SetStoragePolicyEntities(policyID).
		SetCreatedBy(createdBy).
		SetReferenceCount(1)

	created, createErr := creator.Save(ctx)
	if createErr != nil {
		return nil, fmt.Errorf("failed to create new entity: %w", createErr)
	}

	// Record the new ID so later calls with the same key reuse it.
	m.state.EntitySources[key] = created.ID
	return created, nil
}