Cloudreve/pkg/filemanager/driver/ks3/ks3.go

552 lines
14 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

package ks3
import (
"context"
"errors"
"fmt"
"github.com/aws/aws-sdk-go/aws/request"
"io"
"net/url"
"os"
"path"
"path/filepath"
"strings"
"time"
"strconv"
"github.com/cloudreve/Cloudreve/v4/ent"
"github.com/cloudreve/Cloudreve/v4/inventory/types"
"github.com/cloudreve/Cloudreve/v4/pkg/boolset"
"github.com/cloudreve/Cloudreve/v4/pkg/cluster/routes"
"github.com/cloudreve/Cloudreve/v4/pkg/conf"
"github.com/cloudreve/Cloudreve/v4/pkg/filemanager/chunk"
"github.com/cloudreve/Cloudreve/v4/pkg/filemanager/chunk/backoff"
"github.com/cloudreve/Cloudreve/v4/pkg/filemanager/driver"
"github.com/cloudreve/Cloudreve/v4/pkg/filemanager/fs"
"github.com/cloudreve/Cloudreve/v4/pkg/filemanager/fs/mime"
"github.com/cloudreve/Cloudreve/v4/pkg/logging"
"github.com/cloudreve/Cloudreve/v4/pkg/serializer"
"github.com/cloudreve/Cloudreve/v4/pkg/setting"
"github.com/ks3sdklib/aws-sdk-go/aws/awserr"
"github.com/ks3sdklib/aws-sdk-go/service/s3/s3manager"
"github.com/samber/lo"
"github.com/ks3sdklib/aws-sdk-go/aws"
"github.com/ks3sdklib/aws-sdk-go/aws/credentials"
"github.com/ks3sdklib/aws-sdk-go/service/s3"
)
// Driver KS3 compatible driver
type Driver struct {
policy *ent.StoragePolicy
chunkSize int64
settings setting.Provider
l logging.Logger
config conf.ConfigProvider
mime mime.MimeDetector
sess *aws.Config
svc *s3.S3
}
// UploadPolicy KS3上传策略
type UploadPolicy struct {
Expiration string `json:"expiration"`
Conditions []interface{} `json:"conditions"`
}
type Session struct {
Config *aws.Config
Handlers request.Handlers
}
// MetaData 文件信息
type MetaData struct {
Size int64
Etag string
}
var (
features = &boolset.BooleanSet{}
)
func init() {
boolset.Sets(map[driver.HandlerCapability]bool{
driver.HandlerCapabilityUploadSentinelRequired: true,
}, features)
}
func Int64(v int64) *int64 {
return &v
}
func New(ctx context.Context, policy *ent.StoragePolicy, settings setting.Provider,
config conf.ConfigProvider, l logging.Logger, mime mime.MimeDetector) (*Driver, error) {
chunkSize := policy.Settings.ChunkSize
if policy.Settings.ChunkSize == 0 {
chunkSize = 25 << 20 // 25 MB
}
driver := &Driver{
policy: policy,
settings: settings,
chunkSize: chunkSize,
config: config,
l: l,
mime: mime,
}
sess := aws.Config{
Credentials: credentials.NewStaticCredentials(policy.AccessKey, policy.SecretKey, ""),
Endpoint: policy.Server,
Region: policy.Settings.Region,
S3ForcePathStyle: policy.Settings.S3ForcePathStyle,
}
driver.sess = &sess
driver.svc = s3.New(&sess)
return driver, nil
}
// List 列出给定路径下的文件
func (handler *Driver) List(ctx context.Context, base string, onProgress driver.ListProgressFunc, recursive bool) ([]fs.PhysicalObject, error) {
// 初始化列目录参数
base = strings.TrimPrefix(base, "/")
if base != "" {
base += "/"
}
opt := &s3.ListObjectsInput{
Bucket: &handler.policy.BucketName,
Prefix: &base,
MaxKeys: Int64(1000),
}
// 是否为递归列出
if !recursive {
opt.Delimiter = aws.String("/")
}
var (
objects []*s3.Object
commons []*s3.CommonPrefix
)
for {
res, err := handler.svc.ListObjectsWithContext(ctx, opt)
if err != nil {
return nil, err
}
objects = append(objects, res.Contents...)
commons = append(commons, res.CommonPrefixes...)
// 如果本次未列取完则继续使用marker获取结果
if *res.IsTruncated {
opt.Marker = res.NextMarker
} else {
break
}
}
// 处理列取结果
res := make([]fs.PhysicalObject, 0, len(objects)+len(commons))
// 处理目录
for _, object := range commons {
rel, err := filepath.Rel(*opt.Prefix, *object.Prefix)
if err != nil {
continue
}
res = append(res, fs.PhysicalObject{
Name: path.Base(*object.Prefix),
RelativePath: filepath.ToSlash(rel),
Size: 0,
IsDir: true,
LastModify: time.Now(),
})
}
onProgress(len(commons))
// 处理文件
for _, object := range objects {
rel, err := filepath.Rel(*opt.Prefix, *object.Key)
if err != nil {
continue
}
res = append(res, fs.PhysicalObject{
Name: path.Base(*object.Key),
Source: *object.Key,
RelativePath: filepath.ToSlash(rel),
Size: *object.Size,
IsDir: false,
LastModify: time.Now(),
})
}
onProgress(len(objects))
return res, nil
}
// Open 打开文件
func (handler *Driver) Open(ctx context.Context, path string) (*os.File, error) {
return nil, errors.New("not implemented")
}
// Put 将文件流保存到指定目录
func (handler *Driver) Put(ctx context.Context, file *fs.UploadRequest) error {
defer file.Close()
// 是否允许覆盖
overwrite := file.Mode&fs.ModeOverwrite == fs.ModeOverwrite
if !overwrite {
// Check for duplicated file
if _, err := handler.Meta(ctx, file.Props.SavePath); err == nil {
return fs.ErrFileExisted
}
}
// 初始化配置
uploader := s3manager.NewUploader(&s3manager.UploadOptions{
S3: handler.svc, // S3Client实例必填
PartSize: handler.chunkSize, // 分块大小默认为5MB非必填
})
mimeType := file.Props.MimeType
if mimeType == "" {
handler.mime.TypeByName(file.Props.Uri.Name())
}
_, err := uploader.UploadWithContext(ctx, &s3manager.UploadInput{
Bucket: &handler.policy.BucketName,
Key: &file.Props.SavePath,
Body: io.LimitReader(file, file.Props.Size),
ContentType: aws.String(mimeType),
})
if err != nil {
return err
}
return nil
}
// Delete 删除文件
func (handler *Driver) Delete(ctx context.Context, files ...string) ([]string, error) {
failed := make([]string, 0, len(files))
batchSize := handler.policy.Settings.S3DeleteBatchSize
if batchSize == 0 {
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html
// The request can contain a list of up to 1000 keys that you want to delete.
batchSize = 1000
}
var lastErr error
groups := lo.Chunk(files, batchSize)
for _, group := range groups {
if len(group) == 1 {
// Invoke single file delete API
_, err := handler.svc.DeleteObjectWithContext(ctx, &s3.DeleteObjectInput{
Bucket: &handler.policy.BucketName,
Key: &group[0],
})
if err != nil {
if aerr, ok := err.(awserr.Error); ok {
// Ignore NoSuchKey error
if aerr.Code() == s3.ErrCodeNoSuchKey {
continue
}
}
failed = append(failed, group[0])
lastErr = err
}
} else {
// Invoke batch delete API
res, err := handler.svc.DeleteObjects(
&s3.DeleteObjectsInput{
Bucket: &handler.policy.BucketName,
Delete: &s3.Delete{
Objects: lo.Map(group, func(s string, i int) *s3.ObjectIdentifier {
return &s3.ObjectIdentifier{Key: &s}
}),
},
})
if err != nil {
failed = append(failed, group...)
lastErr = err
continue
}
for _, v := range res.Errors {
handler.l.Debug("Failed to delete file: %s, Code:%s, Message:%s", v.Key, v.Code, v.Key)
failed = append(failed, *v.Key)
}
}
}
return failed, lastErr
}
// Thumb 获取缩略图URL
func (handler *Driver) Thumb(ctx context.Context, expire *time.Time, ext string, e fs.Entity) (string, error) {
return "", errors.New("not implemented")
}
// Source 获取文件外链
func (handler *Driver) Source(ctx context.Context, e fs.Entity, args *driver.GetSourceArgs) (string, error) {
var contentDescription *string
if args.IsDownload {
encodedFilename := url.PathEscape(args.DisplayName)
contentDescription = aws.String(fmt.Sprintf(`attachment; filename="%s"`, encodedFilename))
}
// 确保过期时间不小于 0 ,如果小于则设置为 7 天
var ttl int64
if args.Expire != nil {
ttl = int64(time.Until(*args.Expire).Seconds())
} else {
ttl = 604800
}
downloadUrl, err := handler.svc.GeneratePresignedUrl(&s3.GeneratePresignedUrlInput{
HTTPMethod: s3.GET, // 请求方法
Bucket: &handler.policy.BucketName, // 存储空间名称
Key: aws.String(e.Source()), // 对象的key
Expires: ttl, // 过期时间,转换为秒数
ResponseContentDisposition: contentDescription, // 设置响应头部 Content-Disposition
})
if err != nil {
return "", err
}
// 将最终生成的签名URL域名换成用户自定义的加速域名如果有
finalURL, err := url.Parse(downloadUrl)
if err != nil {
return "", err
}
// 公有空间替换掉Key及不支持的头
if !handler.policy.IsPrivate {
finalURL.RawQuery = ""
}
return finalURL.String(), nil
}
// Token 获取上传凭证
func (handler *Driver) Token(ctx context.Context, uploadSession *fs.UploadSession, file *fs.UploadRequest) (*fs.UploadCredential, error) {
// Check for duplicated file
if _, err := handler.Meta(ctx, file.Props.SavePath); err == nil {
return nil, fs.ErrFileExisted
}
// 生成回调地址
siteURL := handler.settings.SiteURL(setting.UseFirstSiteUrl(ctx))
// 在从机端创建上传会话
uploadSession.ChunkSize = handler.chunkSize
uploadSession.Callback = routes.MasterSlaveCallbackUrl(siteURL, types.PolicyTypeKs3, uploadSession.Props.UploadSessionID, uploadSession.CallbackSecret).String()
mimeType := file.Props.MimeType
if mimeType == "" {
handler.mime.TypeByName(file.Props.Uri.Name())
}
// 创建分片上传
res, err := handler.svc.CreateMultipartUploadWithContext(ctx, &s3.CreateMultipartUploadInput{
Bucket: &handler.policy.BucketName,
Key: &uploadSession.Props.SavePath,
Expires: &uploadSession.Props.ExpireAt,
ContentType: aws.String(mimeType),
})
if err != nil {
return nil, fmt.Errorf("failed to create multipart upload: %w", err)
}
uploadSession.UploadID = *res.UploadID
// 为每个分片签名上传 URL
chunks := chunk.NewChunkGroup(file, handler.chunkSize, &backoff.ConstantBackoff{}, false, handler.l, "")
urls := make([]string, chunks.Num())
for chunks.Next() {
err := chunks.Process(func(c *chunk.ChunkGroup, chunk io.Reader) error {
// 计算过期时间(秒)
expireSeconds := int(time.Until(uploadSession.Props.ExpireAt).Seconds())
partNumber := c.Index() + 1
// 生成预签名URL
signedURL, err := handler.svc.GeneratePresignedUrl(&s3.GeneratePresignedUrlInput{
HTTPMethod: s3.PUT,
Bucket: &handler.policy.BucketName,
Key: &uploadSession.Props.SavePath,
Expires: int64(expireSeconds),
Parameters: map[string]*string{
"partNumber": aws.String(strconv.Itoa(partNumber)),
"uploadId": res.UploadID,
},
ContentType: aws.String("application/octet-stream"),
})
if err != nil {
return fmt.Errorf("failed to generate presigned upload url for chunk %d: %w", partNumber, err)
}
urls[c.Index()] = signedURL
return nil
})
if err != nil {
return nil, err
}
}
// 签名完成分片上传的请求URL
expireSeconds := int(time.Until(uploadSession.Props.ExpireAt).Seconds())
signedURL, err := handler.svc.GeneratePresignedUrl(&s3.GeneratePresignedUrlInput{
HTTPMethod: s3.POST,
Bucket: &handler.policy.BucketName,
Key: &file.Props.SavePath,
Expires: int64(expireSeconds),
Parameters: map[string]*string{
"uploadId": res.UploadID,
},
ContentType: aws.String("application/octet-stream"),
})
if err != nil {
return nil, err
}
// 生成上传凭证
return &fs.UploadCredential{
UploadID: *res.UploadID,
UploadURLs: urls,
CompleteURL: signedURL,
SessionID: uploadSession.Props.UploadSessionID,
ChunkSize: handler.chunkSize,
}, nil
}
// CancelToken 取消上传凭证
func (handler *Driver) CancelToken(ctx context.Context, uploadSession *fs.UploadSession) error {
_, err := handler.svc.AbortMultipartUploadWithContext(ctx, &s3.AbortMultipartUploadInput{
UploadID: &uploadSession.UploadID,
Bucket: &handler.policy.BucketName,
Key: &uploadSession.Props.SavePath,
})
return err
}
// cancelUpload 取消分片上传
func (handler *Driver) cancelUpload(key, id *string) {
if _, err := handler.svc.AbortMultipartUpload(&s3.AbortMultipartUploadInput{
Bucket: &handler.policy.BucketName,
UploadID: id,
Key: key,
}); err != nil {
handler.l.Warning("failed to abort multipart upload: %s", err)
}
}
// Capabilities 获取存储能力
func (handler *Driver) Capabilities() *driver.Capabilities {
return &driver.Capabilities{
StaticFeatures: features,
MediaMetaProxy: handler.policy.Settings.MediaMetaGeneratorProxy,
ThumbProxy: handler.policy.Settings.ThumbGeneratorProxy,
MaxSourceExpire: time.Duration(604800) * time.Second,
}
}
// MediaMeta 获取媒体元信息
func (handler *Driver) MediaMeta(ctx context.Context, path, ext string) ([]driver.MediaMeta, error) {
return nil, errors.New("not implemented")
}
// LocalPath 获取本地路径
func (handler *Driver) LocalPath(ctx context.Context, path string) string {
return ""
}
// CompleteUpload 完成上传
func (handler *Driver) CompleteUpload(ctx context.Context, session *fs.UploadSession) error {
if session.SentinelTaskID == 0 {
return nil
}
// Make sure uploaded file size is correct
res, err := handler.Meta(ctx, session.Props.SavePath)
if err != nil {
return fmt.Errorf("failed to get uploaded file size: %w", err)
}
if res.Size != session.Props.Size {
return serializer.NewError(
serializer.CodeMetaMismatch,
fmt.Sprintf("File size not match, expected: %d, actual: %d", session.Props.Size, res.Size),
nil,
)
}
return nil
}
// Meta 获取文件元信息
func (handler *Driver) Meta(ctx context.Context, path string) (*MetaData, error) {
res, err := handler.svc.HeadObjectWithContext(ctx,
&s3.HeadObjectInput{
Bucket: &handler.policy.BucketName,
Key: &path,
})
if err != nil {
return nil, err
}
return &MetaData{
Size: *res.ContentLength,
Etag: *res.ETag,
}, nil
}
// CORS 设置CORS规则
func (handler *Driver) CORS() error {
rule := s3.CORSRule{
AllowedMethod: []string{
"GET",
"POST",
"PUT",
"DELETE",
"HEAD",
},
AllowedOrigin: []string{"*"},
AllowedHeader: []string{"*"},
ExposeHeader: []string{"ETag"},
MaxAgeSeconds: 3600,
}
_, err := handler.svc.PutBucketCORS(&s3.PutBucketCORSInput{
Bucket: &handler.policy.BucketName,
CORSConfiguration: &s3.CORSConfiguration{
Rules: []*s3.CORSRule{&rule},
},
})
return err
}
// Reader 读取器
type Reader struct {
r io.Reader
}
// Read 读取数据
func (r Reader) Read(p []byte) (int, error) {
return r.r.Read(p)
}