mirror of https://github.com/cloudreve/Cloudreve
190 lines
5.3 KiB
Go
190 lines
5.3 KiB
Go
package migrator
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"os"
|
|
"strconv"
|
|
|
|
"github.com/cloudreve/Cloudreve/v4/application/migrator/model"
|
|
"github.com/cloudreve/Cloudreve/v4/ent"
|
|
"github.com/cloudreve/Cloudreve/v4/inventory/types"
|
|
"github.com/cloudreve/Cloudreve/v4/pkg/conf"
|
|
)
|
|
|
|
func (m *Migrator) migrateFile() error {
|
|
m.l.Info("Migrating files...")
|
|
batchSize := 1000
|
|
offset := m.state.FileOffset
|
|
ctx := context.Background()
|
|
|
|
if m.state.FileConflictRename == nil {
|
|
m.state.FileConflictRename = make(map[uint]string)
|
|
}
|
|
|
|
if m.state.EntitySources == nil {
|
|
m.state.EntitySources = make(map[string]int)
|
|
}
|
|
|
|
if offset > 0 {
|
|
m.l.Info("Resuming file migration from offset %d", offset)
|
|
}
|
|
|
|
out:
|
|
for {
|
|
m.l.Info("Migrating files with offset %d", offset)
|
|
var files []model.File
|
|
if err := model.DB.Limit(batchSize).Offset(offset).Find(&files).Error; err != nil {
|
|
return fmt.Errorf("failed to list v3 files: %w", err)
|
|
}
|
|
|
|
if len(files) == 0 {
|
|
if m.dep.ConfigProvider().Database().Type == conf.PostgresDB {
|
|
m.l.Info("Resetting file ID sequence for postgres...")
|
|
m.v4client.File.ExecContext(ctx, "SELECT SETVAL('files_id_seq', (SELECT MAX(id) FROM files))")
|
|
}
|
|
break
|
|
}
|
|
|
|
tx, err := m.v4client.Tx(ctx)
|
|
if err != nil {
|
|
_ = tx.Rollback()
|
|
return fmt.Errorf("failed to start transaction: %w", err)
|
|
}
|
|
|
|
for _, f := range files {
|
|
if _, ok := m.state.FolderIDs[int(f.FolderID)]; !ok {
|
|
m.l.Warning("Folder ID %d for file %d not found, skipping", f.FolderID, f.ID)
|
|
continue
|
|
}
|
|
|
|
if _, ok := m.state.UserIDs[int(f.UserID)]; !ok {
|
|
m.l.Warning("User ID %d for file %d not found, skipping", f.UserID, f.ID)
|
|
continue
|
|
}
|
|
|
|
if _, ok := m.state.PolicyIDs[int(f.PolicyID)]; !ok {
|
|
m.l.Warning("Policy ID %d for file %d not found, skipping", f.PolicyID, f.ID)
|
|
continue
|
|
}
|
|
|
|
metadata := make(map[string]string)
|
|
if f.Metadata != "" {
|
|
json.Unmarshal([]byte(f.Metadata), &metadata)
|
|
}
|
|
|
|
var (
|
|
thumbnail *ent.Entity
|
|
entity *ent.Entity
|
|
err error
|
|
)
|
|
|
|
if metadata[model.ThumbStatusMetadataKey] == model.ThumbStatusExist {
|
|
size := int64(0)
|
|
if m.state.LocalPolicyIDs[int(f.PolicyID)] {
|
|
thumbFile, err := os.Stat(f.SourceName + m.state.ThumbSuffix)
|
|
if err == nil {
|
|
size = thumbFile.Size()
|
|
}
|
|
m.l.Warning("Thumbnail file %s for file %d not found, use 0 size", f.SourceName+m.state.ThumbSuffix, f.ID)
|
|
}
|
|
// Insert thumbnail entity
|
|
thumbnail, err = m.insertEntity(tx, f.SourceName+m.state.ThumbSuffix, int(types.EntityTypeThumbnail), int(f.PolicyID), int(f.UserID), size)
|
|
if err != nil {
|
|
_ = tx.Rollback()
|
|
return fmt.Errorf("failed to insert thumbnail entity: %w", err)
|
|
}
|
|
}
|
|
|
|
// Insert file version entity
|
|
entity, err = m.insertEntity(tx, f.SourceName, int(types.EntityTypeVersion), int(f.PolicyID), int(f.UserID), int64(f.Size))
|
|
if err != nil {
|
|
_ = tx.Rollback()
|
|
return fmt.Errorf("failed to insert file version entity: %w", err)
|
|
}
|
|
|
|
fname := f.Name
|
|
if _, ok := m.state.FileConflictRename[f.ID]; ok {
|
|
fname = m.state.FileConflictRename[f.ID]
|
|
}
|
|
|
|
stm := tx.File.Create().
|
|
SetCreatedAt(formatTime(f.CreatedAt)).
|
|
SetUpdatedAt(formatTime(f.UpdatedAt)).
|
|
SetName(fname).
|
|
SetRawID(int(f.ID) + m.state.LastFolderID).
|
|
SetOwnerID(int(f.UserID)).
|
|
SetSize(int64(f.Size)).
|
|
SetPrimaryEntity(entity.ID).
|
|
SetFileChildren(int(f.FolderID)).
|
|
SetType(int(types.FileTypeFile)).
|
|
SetStoragePoliciesID(int(f.PolicyID)).
|
|
AddEntities(entity)
|
|
|
|
if thumbnail != nil {
|
|
stm.AddEntities(thumbnail)
|
|
}
|
|
|
|
if _, err := stm.Save(ctx); err != nil {
|
|
_ = tx.Rollback()
|
|
if ent.IsConstraintError(err) {
|
|
if _, ok := m.state.FileConflictRename[f.ID]; ok {
|
|
return fmt.Errorf("file %d already exists, but new name is already in conflict rename map, please resolve this manually", f.ID)
|
|
}
|
|
|
|
m.l.Warning("File %d already exists, will retry with new name in next batch", f.ID)
|
|
m.state.FileConflictRename[f.ID] = fmt.Sprintf("%d_%s", f.ID, f.Name)
|
|
continue out
|
|
}
|
|
return fmt.Errorf("failed to create file %d: %w", f.ID, err)
|
|
}
|
|
}
|
|
|
|
if err := tx.Commit(); err != nil {
|
|
return fmt.Errorf("failed to commit transaction: %w", err)
|
|
}
|
|
|
|
offset += batchSize
|
|
m.state.FileOffset = offset
|
|
if err := m.saveState(); err != nil {
|
|
m.l.Warning("Failed to save state after file batch: %s", err)
|
|
} else {
|
|
m.l.Info("Saved migration state after processing this batch")
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (m *Migrator) insertEntity(tx *ent.Tx, source string, entityType, policyID, createdBy int, size int64) (*ent.Entity, error) {
|
|
|
|
// find existing one
|
|
entityKey := strconv.Itoa(policyID) + "+" + source
|
|
if existingId, ok := m.state.EntitySources[entityKey]; ok {
|
|
existing, err := tx.Entity.UpdateOneID(existingId).
|
|
AddReferenceCount(1).
|
|
Save(context.Background())
|
|
if err == nil {
|
|
return existing, nil
|
|
}
|
|
m.l.Warning("Failed to update existing entity %d: %s, fallback to create new one.", existingId, err)
|
|
}
|
|
|
|
// create new one
|
|
e, err := tx.Entity.Create().
|
|
SetSource(source).
|
|
SetType(entityType).
|
|
SetSize(size).
|
|
SetStoragePolicyEntities(policyID).
|
|
SetCreatedBy(createdBy).
|
|
SetReferenceCount(1).
|
|
Save(context.Background())
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to create new entity: %w", err)
|
|
}
|
|
|
|
m.state.EntitySources[entityKey] = e.ID
|
|
return e, nil
|
|
}
|