Cloudreve/pkg/filemanager/driver/oss/oss.go

554 lines
16 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

package oss
import (
"context"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"io"
"net/url"
"os"
"path"
"path/filepath"
"strconv"
"strings"
"time"
"github.com/aliyun/aliyun-oss-go-sdk/oss"
"github.com/cloudreve/Cloudreve/v4/ent"
"github.com/cloudreve/Cloudreve/v4/inventory/types"
"github.com/cloudreve/Cloudreve/v4/pkg/boolset"
"github.com/cloudreve/Cloudreve/v4/pkg/cluster/routes"
"github.com/cloudreve/Cloudreve/v4/pkg/conf"
"github.com/cloudreve/Cloudreve/v4/pkg/filemanager/chunk"
"github.com/cloudreve/Cloudreve/v4/pkg/filemanager/chunk/backoff"
"github.com/cloudreve/Cloudreve/v4/pkg/filemanager/driver"
"github.com/cloudreve/Cloudreve/v4/pkg/filemanager/fs"
"github.com/cloudreve/Cloudreve/v4/pkg/filemanager/fs/mime"
"github.com/cloudreve/Cloudreve/v4/pkg/logging"
"github.com/cloudreve/Cloudreve/v4/pkg/request"
"github.com/cloudreve/Cloudreve/v4/pkg/setting"
"github.com/cloudreve/Cloudreve/v4/pkg/util"
"github.com/samber/lo"
)
// UploadPolicy is the upload policy document for Alibaba Cloud OSS.
type UploadPolicy struct {
// Expiration is serialized as the "expiration" JSON field.
Expiration string `json:"expiration"`
// Conditions is serialized as the "conditions" JSON array.
Conditions []interface{} `json:"conditions"`
}
// CallbackPolicy is the upload callback policy. Token marshals it to JSON,
// base64-encodes the result and attaches it to the signed completion URL as
// the "callback" query parameter.
type CallbackPolicy struct {
// CallbackURL is the address the callback request is sent to.
CallbackURL string `json:"callbackUrl"`
// CallbackBody is the body template of the callback request.
CallbackBody string `json:"callbackBody"`
// CallbackBodyType is the content type of the callback body.
CallbackBodyType string `json:"callbackBodyType"`
// CallbackSNI is serialized as "callbackSNI"; Token always sets it to true.
CallbackSNI bool `json:"callbackSNI"`
}
// Driver is the storage policy adapter for Alibaba Cloud OSS.
type Driver struct {
// policy is the storage policy this driver serves.
policy *ent.StoragePolicy
// client and bucket are assigned (and re-assigned) by InitOSSClient.
client *oss.Client
bucket *oss.Bucket
settings setting.Provider
l logging.Logger
config conf.ConfigProvider
mime mime.MimeDetector
httpClient request.Client
// chunkSize is the chunk size used for multipart uploads; New falls back to
// 25 MB when policy.Settings.ChunkSize is 0.
chunkSize int64
}
// key is an int-based type. Nothing in the visible part of this file uses it;
// presumably intended for context keys — TODO confirm or remove.
type key int
const (
// chunkRetrySleep is used as the Sleep value of the backoff for chunk uploads in Put.
chunkRetrySleep = time.Duration(5) * time.Second
// uploadIdParam, partNumberParam and callbackParam are query parameter names
// added to the multipart upload URLs signed in Token.
uploadIdParam = "uploadId"
partNumberParam = "partNumber"
callbackParam = "callback"
// completeAllHeader is set to "yes" when Token signs the multipart completion URL.
completeAllHeader = "x-oss-complete-all"
// maxDeleteBatch is the maximum number of keys Delete sends in one DeleteObjects request.
maxDeleteBatch = 1000
// MultiPartUploadThreshold is the file size at which server-side uploads
// switch from a single PutObject to a multipart upload.
MultiPartUploadThreshold int64 = 5 * (1 << 30) // 5GB
)
var (
// features is reported as StaticFeatures by Capabilities. It is declared as
// an empty set here; whether it is populated elsewhere is not visible in
// this part of the file.
features = &boolset.BooleanSet{}
)
// New builds an OSS Driver for the given storage policy and initialises its
// OSS client without forcing the public endpoint.
//
// When policy.Settings.ChunkSize is 0 the chunk size falls back to 25 MB. The
// driver is returned together with any client initialisation error.
func New(ctx context.Context, policy *ent.StoragePolicy, settings setting.Provider,
	config conf.ConfigProvider, l logging.Logger, mime mime.MimeDetector) (*Driver, error) {
	const defaultChunkSize = 25 << 20 // 25 MB

	size := policy.Settings.ChunkSize
	if size == 0 {
		size = defaultChunkSize
	}

	d := &Driver{
		policy:     policy,
		settings:   settings,
		config:     config,
		l:          l,
		mime:       mime,
		chunkSize:  size,
		httpClient: request.NewClient(config, request.WithLogger(l)),
	}

	err := d.InitOSSClient(false)
	return d, err
}
// CORS installs a single cross-origin rule on the policy's bucket: any origin
// and any request header are allowed for GET, POST, PUT, DELETE and HEAD, no
// headers are exposed, and the max age is 3600 seconds.
func (handler *Driver) CORS() error {
	rule := oss.CORSRule{
		AllowedOrigin: []string{"*"},
		AllowedMethod: []string{"GET", "POST", "PUT", "DELETE", "HEAD"},
		ExposeHeader:  []string{},
		AllowedHeader: []string{"*"},
		MaxAgeSeconds: 3600,
	}
	return handler.client.SetBucketCORS(handler.policy.BucketName, []oss.CORSRule{rule})
}
// InitOSSClient creates the authenticated OSS client and bucket handle and
// stores them on the driver.
//
// The endpoint is policy.Server, unless a server-side endpoint is configured
// and forceUsePublicEndpoint is false, in which case the server-side endpoint
// is used. The CNAME option is only applied when the server-side endpoint is
// not selected. An endpoint without a scheme is prefixed with "https://".
//
// NOTE(review): this replaces handler.client and handler.bucket on the shared
// Driver; confirm callers never invoke it concurrently with other methods.
func (handler *Driver) InitOSSClient(forceUsePublicEndpoint bool) error {
	policy := handler.policy
	if policy == nil {
		return errors.New("empty policy")
	}

	var (
		clientOpts []oss.ClientOption
		endpoint   = policy.Server
	)

	// Decide between the server-side (internal) endpoint and the public one.
	switch {
	case policy.Settings.ServerSideEndpoint != "" && !forceUsePublicEndpoint:
		endpoint = policy.Settings.ServerSideEndpoint
	case policy.Settings.UseCname:
		clientOpts = append(clientOpts, oss.UseCname(true))
	}

	hasScheme := strings.HasPrefix(endpoint, "http://") || strings.HasPrefix(endpoint, "https://")
	if !hasScheme {
		endpoint = "https://" + endpoint
	}

	// Create the client.
	client, err := oss.New(endpoint, policy.AccessKey, policy.SecretKey, clientOpts...)
	if err != nil {
		return err
	}
	handler.client = client

	// Create the bucket handle.
	bucket, err := client.Bucket(policy.BucketName)
	if err != nil {
		return err
	}
	handler.bucket = bucket

	return nil
}
// List enumerates the objects stored under base in the OSS bucket.
//
// base is normalised to have no leading slash and, when non-empty, a trailing
// slash. When recursive is false a "/" delimiter is used, so deeper keys are
// returned as directory entries built from the common prefixes. Results are
// fetched in pages of up to 1000 keys until no next marker is returned.
// onProgress is called with the number of common prefixes, and then with the
// total number of entries returned.
func (handler *Driver) List(ctx context.Context, base string, onProgress driver.ListProgressFunc, recursive bool) ([]fs.PhysicalObject, error) {
	base = strings.TrimPrefix(base, "/")
	if base != "" {
		base += "/"
	}

	delimiter := ""
	if !recursive {
		delimiter = "/"
	}

	// Collect every page of the listing.
	var (
		files    []oss.ObjectProperties
		prefixes []string
	)
	marker := ""
	for {
		page, err := handler.bucket.ListObjects(oss.Marker(marker), oss.Prefix(base),
			oss.MaxKeys(1000), oss.Delimiter(delimiter))
		if err != nil {
			return nil, err
		}

		files = append(files, page.Objects...)
		prefixes = append(prefixes, page.CommonPrefixes...)

		if marker = page.NextMarker; marker == "" {
			break
		}
	}

	entries := make([]fs.PhysicalObject, 0, len(files)+len(prefixes))

	// Directories: entries whose relative path cannot be computed are skipped.
	for _, prefix := range prefixes {
		rel, err := filepath.Rel(base, prefix)
		if err == nil {
			entries = append(entries, fs.PhysicalObject{
				Name:         path.Base(prefix),
				RelativePath: filepath.ToSlash(rel),
				Size:         0,
				IsDir:        true,
				LastModify:   time.Now(),
			})
		}
	}
	onProgress(len(prefixes))

	// Files.
	for i := range files {
		obj := &files[i]
		rel, err := filepath.Rel(base, obj.Key)
		if err == nil {
			entries = append(entries, fs.PhysicalObject{
				Name:         path.Base(obj.Key),
				Source:       obj.Key,
				RelativePath: filepath.ToSlash(rel),
				Size:         obj.Size,
				IsDir:        false,
				LastModify:   obj.LastModified,
			})
		}
	}
	onProgress(len(entries))

	return entries, nil
}
// Open is not supported by the OSS driver; it always returns a nil file and a
// "not implemented" error.
func (handler *Driver) Open(ctx context.Context, path string) (*os.File, error) {
return nil, errors.New("not implemented")
}
// Put uploads the content of file to file.Props.SavePath in the bucket.
//
// Files smaller than MultiPartUploadThreshold are sent with a single PutObject
// call. Larger files are split into chunks of handler.chunkSize and sent as a
// multipart upload, which is aborted if a chunk or the final completion fails.
// Overwriting an existing object is only allowed when file.Mode contains
// fs.ModeOverwrite. The request is closed before Put returns.
func (handler *Driver) Put(ctx context.Context, file *fs.UploadRequest) error {
	defer file.Close()

	// Credential lifetime.
	credentialTTL := handler.settings.UploadSessionTTL(ctx)

	// Fall back to a MIME type derived from the file name when the request
	// carries none. The detected value must be assigned, otherwise an empty
	// value is passed to oss.ContentType below.
	mimeType := file.Props.MimeType
	if mimeType == "" {
		mimeType = handler.mime.TypeByName(file.Props.Uri.Name())
	}

	// Whether an existing object may be overwritten.
	overwrite := file.Mode&fs.ModeOverwrite == fs.ModeOverwrite

	options := []oss.Option{
		oss.WithContext(ctx),
		// NOTE(review): assumes UploadSessionTTL yields a count of seconds typed
		// as time.Duration; if it already returns a real duration this
		// multiplication overflows — confirm against setting.Provider.
		oss.Expires(time.Now().Add(credentialTTL * time.Second)),
		oss.ForbidOverWrite(!overwrite),
		oss.ContentType(mimeType),
	}

	// Small files are uploaded in a single request.
	if file.Props.Size < MultiPartUploadThreshold {
		return handler.bucket.PutObject(file.Props.SavePath, file, options...)
	}

	// At or above the threshold, use a multipart upload.
	imur, err := handler.bucket.InitiateMultipartUpload(file.Props.SavePath, options...)
	if err != nil {
		return fmt.Errorf("failed to initiate multipart upload: %w", err)
	}

	parts := make([]oss.UploadPart, 0)

	chunks := chunk.NewChunkGroup(file, handler.chunkSize, &backoff.ConstantBackoff{
		Max:   handler.settings.ChunkRetryLimit(ctx),
		Sleep: chunkRetrySleep,
	}, handler.settings.UseChunkBuffer(ctx), handler.l, handler.settings.TempPath(ctx))

	// Part numbers are 1-based, chunk indexes 0-based. A part is recorded only
	// after it uploaded successfully.
	uploadFunc := func(current *chunk.ChunkGroup, content io.Reader) error {
		part, err := handler.bucket.UploadPart(imur, content, current.Length(), current.Index()+1, oss.WithContext(ctx))
		if err == nil {
			parts = append(parts, part)
		}
		return err
	}

	for chunks.Next() {
		if err := chunks.Process(uploadFunc); err != nil {
			handler.cancelUpload(imur)
			return fmt.Errorf("failed to upload chunk #%d: %w", chunks.Index(), err)
		}
	}

	_, err = handler.bucket.CompleteMultipartUpload(imur, parts, oss.ForbidOverWrite(!overwrite), oss.WithContext(ctx))
	if err != nil {
		handler.cancelUpload(imur)
	}

	return err
}
// Delete removes the given object keys from the bucket in batches of at most
// maxDeleteBatch keys.
//
// It returns the keys that were not deleted. When a batch request fails, all
// keys of that batch are reported and the last such error is returned. When
// every request succeeded but some keys are missing from the results, a
// summary error listing them is returned instead.
func (handler *Driver) Delete(ctx context.Context, files ...string) ([]string, error) {
	groups := lo.Chunk(files, maxDeleteBatch)
	failed := make([]string, 0)
	var lastError error

	for index, group := range groups {
		handler.l.Debug("Process delete group #%d: %v", index, group)

		delRes, err := handler.bucket.DeleteObjects(group)
		if err != nil {
			failed = append(failed, group...)
			lastError = err
			continue
		}

		// Compare against this batch only. Diffing against the full input would
		// report keys from other batches as failed whenever more than one batch
		// is needed.
		failed = append(failed, util.SliceDifference(group, delRes.DeletedObjects)...)
	}

	if len(failed) > 0 && lastError == nil {
		lastError = fmt.Errorf("failed to delete files: %v", failed)
	}

	return failed, lastError
}
// Thumb returns a URL that serves a thumbnail of entity e, produced by an
// "image/resize" process parameter sized from the thumbnail settings.
//
// The client is re-initialised first; the public endpoint is forced unless the
// context carries a driver.ForceUsePublicEndpointCtx boolean that says
// otherwise.
func (handler *Driver) Thumb(ctx context.Context, expire *time.Time, ext string, e fs.Entity) (string, error) {
	publicEndpoint := true
	if override, ok := ctx.Value(driver.ForceUsePublicEndpointCtx{}).(bool); ok {
		publicEndpoint = override
	}

	// Initialise the client.
	if err := handler.InitOSSClient(publicEndpoint); err != nil {
		return "", err
	}

	width, height := handler.settings.ThumbSize(ctx)
	process := fmt.Sprintf("image/resize,m_lfit,h_%d,w_%d", height, width)
	signOptions := []oss.Option{oss.Process(process)}

	return handler.signSourceURL(ctx, e.Source(), expire, signOptions, false)
}
// Source returns an external URL for entity e.
//
// The client is re-initialised first; the public endpoint is forced unless the
// context carries a driver.ForceUsePublicEndpointCtx boolean that says
// otherwise. When args.IsDownload is set, an attachment Content-Disposition
// with the escaped display name is requested. When args.Speed is positive, a
// traffic limit is requested; args itself is never modified.
func (handler *Driver) Source(ctx context.Context, e fs.Entity, args *driver.GetSourceArgs) (string, error) {
	// Initialise the client.
	usePublicEndpoint := true
	if forceUsePublicEndpoint, ok := ctx.Value(driver.ForceUsePublicEndpointCtx{}).(bool); ok {
		usePublicEndpoint = forceUsePublicEndpoint
	}
	if err := handler.InitOSSClient(usePublicEndpoint); err != nil {
		return "", err
	}

	// Collect the signing options.
	signOptions := make([]oss.Option, 0, 2)
	if args.IsDownload {
		encodedFilename := url.PathEscape(args.DisplayName)
		signOptions = append(signOptions, oss.ResponseContentDisposition(fmt.Sprintf(`attachment; filename="%s"; filename*=UTF-8''%s`,
			encodedFilename, encodedFilename)))
	}

	if args.Speed > 0 {
		// Convert bytes to bits on a local copy. Writing the result back into
		// args would compound the factor of 8 if the caller reuses args for
		// another call.
		speed := args.Speed * 8

		// OSS restricts the accepted range of the limit; clamp to it.
		if speed < 819200 {
			speed = 819200
		}
		if speed > 838860800 {
			speed = 838860800
		}

		signOptions = append(signOptions, oss.TrafficLimitParam(speed))
	}

	return handler.signSourceURL(ctx, e.Source(), args.Expire, signOptions, false)
}
// signSourceURL signs a GET URL for the object at path with the given options.
//
// When expire is nil the URL is valid for 86400*365*20 seconds; otherwise the
// lifetime is the number of whole seconds until *expire. For a policy that is
// not private, and unless forceSign is set, the access key, the signature and
// the content-disposition / traffic-limit parameters are removed from the
// query string before the URL is returned.
func (handler *Driver) signSourceURL(ctx context.Context, path string, expire *time.Time, options []oss.Option, forceSign bool) (string, error) {
	var ttl int64 = 86400 * 365 * 20
	if expire != nil {
		ttl = int64(time.Until(*expire).Seconds())
	}

	rawURL, err := handler.bucket.SignURL(path, oss.HTTPGet, ttl, options...)
	if err != nil {
		return "", err
	}

	parsed, err := url.Parse(rawURL)
	if err != nil {
		return "", err
	}

	// Private buckets and forced signatures keep the URL exactly as signed.
	if handler.policy.IsPrivate || forceSign {
		return parsed.String(), nil
	}

	// Public bucket: drop the key, the signature and the unsupported parameters.
	query := parsed.Query()
	for _, name := range []string{
		"OSSAccessKeyId",
		"Signature",
		"response-content-disposition",
		"x-oss-traffic-limit",
	} {
		query.Del(name)
	}
	parsed.RawQuery = query.Encode()

	return parsed.String(), nil
}
// Token prepares a client-side multipart upload and returns its credential.
//
// It re-initialises the client with the public endpoint forced, records the
// chunk size and callback URL on uploadSession, initiates a multipart upload
// for file.Props.SavePath, and signs one PUT URL per chunk plus a POST URL
// that completes the upload and carries the base64-encoded callback policy.
// The signed URLs are valid until uploadSession.Props.ExpireAt.
func (handler *Driver) Token(ctx context.Context, uploadSession *fs.UploadSession, file *fs.UploadRequest) (*fs.UploadCredential, error) {
	// Initialise the client; forcing the public endpoint skips the server-side
	// endpoint for the URLs signed below.
	if err := handler.InitOSSClient(true); err != nil {
		return nil, err
	}

	// Build the callback address.
	siteURL := handler.settings.SiteURL(setting.UseFirstSiteUrl(ctx))

	// Record the chunk size and the callback URL on the upload session.
	uploadSession.ChunkSize = handler.chunkSize
	uploadSession.Callback = routes.MasterSlaveCallbackUrl(siteURL, types.PolicyTypeOss, uploadSession.Props.UploadSessionID, uploadSession.CallbackSecret).String()

	// Callback policy.
	callbackPolicy := CallbackPolicy{
		CallbackURL:      uploadSession.Callback,
		CallbackBody:     `{"name":${x:fname},"source_name":${object},"size":${size},"pic_info":"${imageInfo.width},${imageInfo.height}"}`,
		CallbackBodyType: "application/json",
		CallbackSNI:      true,
	}
	callbackPolicyJSON, err := json.Marshal(callbackPolicy)
	if err != nil {
		return nil, fmt.Errorf("failed to encode callback policy: %w", err)
	}
	callbackPolicyEncoded := base64.StdEncoding.EncodeToString(callbackPolicyJSON)

	// Fall back to a MIME type derived from the file name when the request
	// carries none. The detected value must be assigned, otherwise an empty
	// value is passed to oss.ContentType below.
	mimeType := file.Props.MimeType
	if mimeType == "" {
		mimeType = handler.mime.TypeByName(file.Props.Uri.Name())
	}

	// Initiate the multipart upload.
	options := []oss.Option{
		oss.WithContext(ctx),
		oss.Expires(uploadSession.Props.ExpireAt),
		oss.ForbidOverWrite(true),
		oss.ContentType(mimeType),
	}
	imur, err := handler.bucket.InitiateMultipartUpload(file.Props.SavePath, options...)
	if err != nil {
		return nil, fmt.Errorf("failed to initialize multipart upload: %w", err)
	}
	uploadSession.UploadID = imur.UploadID

	// Sign an upload URL for every chunk. The chunk group is used only to
	// enumerate chunk indexes; the chunk content is not read here.
	chunks := chunk.NewChunkGroup(file, handler.chunkSize, &backoff.ConstantBackoff{}, false, handler.l, "")
	urls := make([]string, chunks.Num())
	ttl := int64(time.Until(uploadSession.Props.ExpireAt).Seconds())
	for chunks.Next() {
		err := chunks.Process(func(c *chunk.ChunkGroup, _ io.Reader) error {
			// Part numbers are 1-based, chunk indexes 0-based.
			signedURL, err := handler.bucket.SignURL(file.Props.SavePath, oss.HTTPPut,
				ttl,
				oss.AddParam(partNumberParam, strconv.Itoa(c.Index()+1)),
				oss.AddParam(uploadIdParam, imur.UploadID),
				oss.ContentType("application/octet-stream"))
			if err != nil {
				return err
			}

			urls[c.Index()] = signedURL
			return nil
		})
		if err != nil {
			return nil, err
		}
	}

	// Sign the URL that completes the multipart upload.
	completeURL, err := handler.bucket.SignURL(file.Props.SavePath, oss.HTTPPost, ttl,
		oss.ContentType("application/octet-stream"),
		oss.AddParam(uploadIdParam, imur.UploadID),
		oss.Expires(time.Now().Add(time.Duration(ttl)*time.Second)),
		oss.SetHeader(completeAllHeader, "yes"),
		oss.ForbidOverWrite(true),
		oss.AddParam(callbackParam, callbackPolicyEncoded))
	if err != nil {
		return nil, err
	}

	return &fs.UploadCredential{
		UploadID:    imur.UploadID,
		UploadURLs:  urls,
		CompleteURL: completeURL,
		SessionID:   uploadSession.Props.UploadSessionID,
		ChunkSize:   handler.chunkSize,
	}, nil
}
// CancelToken revokes an upload credential by aborting the multipart upload
// identified by the session's upload ID and save path.
func (handler *Driver) CancelToken(ctx context.Context, uploadSession *fs.UploadSession) error {
	pending := oss.InitiateMultipartUploadResult{
		Key:      uploadSession.Props.SavePath,
		UploadID: uploadSession.UploadID,
	}
	return handler.bucket.AbortMultipartUpload(pending, oss.WithContext(ctx))
}
// CompleteUpload is a no-op for the OSS driver; it always returns nil.
func (handler *Driver) CompleteUpload(ctx context.Context, session *fs.UploadSession) error {
return nil
}
// Capabilities reports the driver capabilities derived from the policy
// settings. Media metadata extensions are only advertised when native media
// processing is enabled on the policy.
func (handler *Driver) Capabilities() *driver.Capabilities {
	settings := handler.policy.Settings

	caps := &driver.Capabilities{
		StaticFeatures:      features,
		MediaMetaProxy:      settings.MediaMetaGeneratorProxy,
		ThumbSupportedExts:  settings.ThumbExts,
		ThumbProxy:          settings.ThumbGeneratorProxy,
		ThumbSupportAllExts: settings.ThumbSupportAllExts,
		ThumbMaxSize:        settings.ThumbMaxSize,
	}
	if settings.NativeMediaProcessing {
		caps.MediaMetaSupportedExts = settings.MediaMetaExts
	}

	return caps
}
// MediaMeta extracts media metadata for the object at path, choosing the
// extractor by ext: image extensions are checked first, then video, then
// audio. Any other extension yields an "unsupported media type" error.
func (handler *Driver) MediaMeta(ctx context.Context, path, ext string) ([]driver.MediaMeta, error) {
	switch {
	case util.ContainsString(supportedImageExt, ext):
		return handler.extractImageMeta(ctx, path)
	case util.ContainsString(supportedVideoExt, ext):
		return handler.extractIMMMeta(ctx, path, videoInfoProcess)
	case util.ContainsString(supportedAudioExt, ext):
		return handler.extractIMMMeta(ctx, path, audioInfoProcess)
	default:
		return nil, fmt.Errorf("unsupported media type in oss: %s", ext)
	}
}
// LocalPath always returns an empty string for the OSS driver.
func (handler *Driver) LocalPath(ctx context.Context, path string) string {
return ""
}
// cancelUpload aborts the given multipart upload on a best-effort basis: a
// failure is logged as a warning and not returned.
func (handler *Driver) cancelUpload(imur oss.InitiateMultipartUploadResult) {
	err := handler.bucket.AbortMultipartUpload(imur)
	if err == nil {
		return
	}
	handler.l.Warning("failed to abort multipart upload: %s", err)
}