mirror of https://github.com/cloudreve/Cloudreve
223 lines
6.4 KiB
Go
223 lines
6.4 KiB
Go
package explorer
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"github.com/cloudreve/Cloudreve/v4/application/dependency"
|
|
"github.com/cloudreve/Cloudreve/v4/inventory"
|
|
"github.com/cloudreve/Cloudreve/v4/inventory/types"
|
|
"github.com/cloudreve/Cloudreve/v4/pkg/cluster"
|
|
"github.com/cloudreve/Cloudreve/v4/pkg/filemanager/fs"
|
|
"github.com/cloudreve/Cloudreve/v4/pkg/filemanager/manager"
|
|
"github.com/cloudreve/Cloudreve/v4/pkg/hashid"
|
|
"github.com/cloudreve/Cloudreve/v4/pkg/request"
|
|
"github.com/cloudreve/Cloudreve/v4/pkg/serializer"
|
|
"github.com/gin-gonic/gin"
|
|
"strconv"
|
|
"time"
|
|
)
|
|
|
|
// CreateUploadSessionService 获取上传凭证服务
|
|
type (
|
|
CreateUploadSessionParameterCtx struct{}
|
|
CreateUploadSessionService struct {
|
|
Uri string `json:"uri" binding:"required"`
|
|
Size int64 `json:"size" binding:"min=0"`
|
|
LastModified int64 `json:"last_modified"`
|
|
MimeType string `json:"mime_type"`
|
|
PolicyID string `json:"policy_id"`
|
|
Metadata map[string]string `json:"metadata" binding:"max=256"`
|
|
EntityType string `json:"entity_type" binding:"eq=|eq=live_photo|eq=version"`
|
|
}
|
|
)
|
|
|
|
// Create 创建新的上传会话
|
|
func (service *CreateUploadSessionService) Create(c context.Context) (*UploadSessionResponse, error) {
|
|
dep := dependency.FromContext(c)
|
|
user := inventory.UserFromContext(c)
|
|
m := manager.NewFileManager(dep, user)
|
|
defer m.Recycle()
|
|
|
|
uri, err := fs.NewUriFromString(service.Uri)
|
|
if err != nil {
|
|
return nil, serializer.NewError(serializer.CodeParamErr, "unknown uri", err)
|
|
}
|
|
|
|
var entityType *types.EntityType
|
|
switch service.EntityType {
|
|
case "live_photo":
|
|
livePhoto := types.EntityTypeLivePhoto
|
|
entityType = &livePhoto
|
|
case "version":
|
|
version := types.EntityTypeVersion
|
|
entityType = &version
|
|
}
|
|
|
|
hasher := dep.HashIDEncoder()
|
|
policyId, err := hasher.Decode(service.PolicyID, hashid.PolicyID)
|
|
if err != nil {
|
|
return nil, serializer.NewError(serializer.CodeParamErr, "unknown policy id", err)
|
|
}
|
|
|
|
uploadRequest := &fs.UploadRequest{
|
|
Props: &fs.UploadProps{
|
|
Uri: uri,
|
|
Size: service.Size,
|
|
|
|
MimeType: service.MimeType,
|
|
Metadata: service.Metadata,
|
|
EntityType: entityType,
|
|
PreferredStoragePolicy: policyId,
|
|
},
|
|
}
|
|
|
|
if service.LastModified > 0 {
|
|
lastModified := time.UnixMilli(service.LastModified)
|
|
uploadRequest.Props.LastModified = &lastModified
|
|
}
|
|
|
|
credential, err := m.CreateUploadSession(c, uploadRequest)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return BuildUploadSessionResponse(credential, hasher), nil
|
|
}
|
|
|
|
type (
|
|
UploadParameterCtx struct{}
|
|
// UploadService 本机及从机策略上传服务
|
|
UploadService struct {
|
|
ID string `uri:"sessionId" binding:"required"`
|
|
Index int `uri:"index" form:"index" binding:"min=0"`
|
|
}
|
|
)
|
|
|
|
// LocalUpload 处理本机文件分片上传
|
|
func (service *UploadService) LocalUpload(c *gin.Context) error {
|
|
dep := dependency.FromContext(c)
|
|
kv := dep.KV()
|
|
|
|
uploadSessionRaw, ok := kv.Get(manager.UploadSessionCachePrefix + service.ID)
|
|
if !ok {
|
|
return serializer.NewError(serializer.CodeUploadSessionExpired, "", nil)
|
|
}
|
|
|
|
uploadSession := uploadSessionRaw.(fs.UploadSession)
|
|
|
|
user := inventory.UserFromContext(c)
|
|
m := manager.NewFileManager(dep, user)
|
|
defer m.Recycle()
|
|
|
|
if uploadSession.UID != user.ID {
|
|
return serializer.NewError(serializer.CodeUploadSessionExpired, "", nil)
|
|
}
|
|
|
|
// Confirm upload session and chunk index
|
|
placeholder, err := m.ConfirmUploadSession(c, &uploadSession, service.Index)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return processChunkUpload(c, m, &uploadSession, service.Index, placeholder, fs.ModeOverwrite)
|
|
}
|
|
|
|
// SlaveUpload 处理从机文件分片上传
|
|
func (service *UploadService) SlaveUpload(c *gin.Context) error {
|
|
dep := dependency.FromContext(c)
|
|
kv := dep.KV()
|
|
|
|
uploadSessionRaw, ok := kv.Get(manager.UploadSessionCachePrefix + service.ID)
|
|
if !ok {
|
|
return serializer.NewError(serializer.CodeUploadSessionExpired, "", nil)
|
|
}
|
|
|
|
uploadSession := uploadSessionRaw.(fs.UploadSession)
|
|
|
|
// Parse chunk index from query
|
|
service.Index, _ = strconv.Atoi(c.Query("chunk"))
|
|
|
|
m := manager.NewFileManager(dep, nil)
|
|
defer m.Recycle()
|
|
|
|
return processChunkUpload(c, m, &uploadSession, service.Index, nil, fs.ModeOverwrite)
|
|
}
|
|
|
|
func processChunkUpload(c *gin.Context, m manager.FileManager, session *fs.UploadSession, index int, file fs.File, mode fs.WriteMode) error {
|
|
// 取得并校验文件大小是否符合分片要求
|
|
chunkSize := session.ChunkSize
|
|
isLastChunk := session.ChunkSize == 0 || int64(index+1)*chunkSize >= session.Props.Size
|
|
expectedLength := chunkSize
|
|
if isLastChunk {
|
|
expectedLength = session.Props.Size - int64(index)*chunkSize
|
|
}
|
|
|
|
rc, fileSize, err := request.SniffContentLength(c.Request)
|
|
if err != nil || (expectedLength != fileSize) {
|
|
return serializer.NewError(
|
|
serializer.CodeInvalidContentLength,
|
|
fmt.Sprintf("Invalid Content-Length (expected: %d)", expectedLength),
|
|
err,
|
|
)
|
|
}
|
|
|
|
// 非首个分片时需要允许覆盖
|
|
if index > 0 {
|
|
mode |= fs.ModeOverwrite
|
|
}
|
|
|
|
req := &fs.UploadRequest{
|
|
File: rc,
|
|
Offset: chunkSize * int64(index),
|
|
Props: session.Props.Copy(),
|
|
Mode: mode,
|
|
}
|
|
|
|
// 执行上传
|
|
ctx := context.WithValue(c, cluster.SlaveNodeIDCtx{}, strconv.Itoa(session.Policy.NodeID))
|
|
err = m.Upload(ctx, req, session.Policy)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if rc, ok := req.File.(request.LimitReaderCloser); ok {
|
|
if rc.Count() != expectedLength {
|
|
err := fmt.Errorf("uploaded data(%d) does not match purposed size(%d)", rc.Count(), req.Props.Size)
|
|
return serializer.NewError(serializer.CodeIOFailed, "Uploaded data does not match purposed size", err)
|
|
}
|
|
}
|
|
|
|
// Finish upload
|
|
if isLastChunk {
|
|
_, err := m.CompleteUpload(ctx, session)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to complete upload: %w", err)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
type (
|
|
DeleteUploadSessionParameterCtx struct{}
|
|
DeleteUploadSessionService struct {
|
|
ID string `json:"id" binding:"required"`
|
|
Uri string `json:"uri" binding:"required"`
|
|
}
|
|
)
|
|
|
|
// Delete deletes the specified upload session
|
|
func (service *DeleteUploadSessionService) Delete(c *gin.Context) error {
|
|
dep := dependency.FromContext(c)
|
|
user := inventory.UserFromContext(c)
|
|
m := manager.NewFileManager(dep, user)
|
|
defer m.Recycle()
|
|
|
|
uri, err := fs.NewUriFromString(service.Uri)
|
|
if err != nil {
|
|
return serializer.NewError(serializer.CodeParamErr, "unknown uri", err)
|
|
}
|
|
|
|
return m.CancelUploadSession(c, uri, service.ID)
|
|
}
|