fix(pikpak): webdav upload issue (#7050)

renovate/github.com-meilisearch-meilisearch-go-0.x
YangXu 2024-08-22 00:35:52 +08:00 committed by GitHub
parent 489b28bdf7
commit ef5e192c3b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 282 additions and 43 deletions

View File

@ -4,24 +4,18 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"github.com/alist-org/alist/v3/internal/op"
"golang.org/x/oauth2"
"io"
"net/http"
"strconv"
"strings"
"github.com/alist-org/alist/v3/drivers/base" "github.com/alist-org/alist/v3/drivers/base"
"github.com/alist-org/alist/v3/internal/driver" "github.com/alist-org/alist/v3/internal/driver"
"github.com/alist-org/alist/v3/internal/model" "github.com/alist-org/alist/v3/internal/model"
"github.com/alist-org/alist/v3/internal/op"
"github.com/alist-org/alist/v3/pkg/utils" "github.com/alist-org/alist/v3/pkg/utils"
hash_extend "github.com/alist-org/alist/v3/pkg/utils/hash" hash_extend "github.com/alist-org/alist/v3/pkg/utils/hash"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/go-resty/resty/v2" "github.com/go-resty/resty/v2"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"golang.org/x/oauth2"
"net/http"
"strconv"
"strings"
) )
type PikPak struct { type PikPak struct {
@ -123,10 +117,16 @@ func (d *PikPak) Init(ctx context.Context) (err error) {
d.AccessToken = token.AccessToken d.AccessToken = token.AccessToken
// 获取CaptchaToken // 获取CaptchaToken
err = d.RefreshCaptchaTokenAtLogin(GetAction(http.MethodGet, "https://api-drive.mypikpak.com/drive/v1/files"), d.Common.UserID) err = d.RefreshCaptchaTokenAtLogin(GetAction(http.MethodGet, "https://api-drive.mypikpak.com/drive/v1/files"), d.Username)
if err != nil { if err != nil {
return err return err
} }
// 获取用户ID
userID := token.Extra("sub").(string)
if userID != "" {
d.Common.SetUserID(userID)
}
// 更新UserAgent // 更新UserAgent
if d.Platform == "android" { if d.Platform == "android" {
d.Common.UserAgent = BuildCustomUserAgent(utils.GetMD5EncodeStr(d.Username+d.Password), AndroidClientID, AndroidPackageName, AndroidSdkVersion, AndroidClientVersion, AndroidPackageName, d.Common.UserID) d.Common.UserAgent = BuildCustomUserAgent(utils.GetMD5EncodeStr(d.Username+d.Password), AndroidClientID, AndroidPackageName, AndroidSdkVersion, AndroidClientVersion, AndroidPackageName, d.Common.UserID)
@ -271,27 +271,17 @@ func (d *PikPak) Put(ctx context.Context, dstDir model.Obj, stream model.FileStr
} }
params := resp.Resumable.Params params := resp.Resumable.Params
endpoint := strings.Join(strings.Split(params.Endpoint, ".")[1:], ".") //endpoint := strings.Join(strings.Split(params.Endpoint, ".")[1:], ".")
cfg := &aws.Config{ // web 端上传 返回的endpoint 为 `mypikpak.com` | android 端上传 返回的endpoint 为 `vip-lixian-07.mypikpak.com`·
Credentials: credentials.NewStaticCredentials(params.AccessKeyID, params.AccessKeySecret, params.SecurityToken), if d.Addition.Platform == "android" {
Region: aws.String("pikpak"), params.Endpoint = "mypikpak.com"
Endpoint: &endpoint,
} }
ss, err := session.NewSession(cfg)
if err != nil { if stream.GetSize() <= 10*utils.MB { // 文件大小 小于10MB改用普通模式上传
return err return d.UploadByOSS(&params, stream, up)
} }
uploader := s3manager.NewUploader(ss) // 分片上传
if stream.GetSize() > s3manager.MaxUploadParts*s3manager.DefaultUploadPartSize { return d.UploadByMultipart(&params, stream.GetSize(), stream, up)
uploader.PartSize = stream.GetSize() / (s3manager.MaxUploadParts - 1)
}
input := &s3manager.UploadInput{
Bucket: &params.Bucket,
Key: &params.Key,
Body: io.TeeReader(stream, driver.NewProgress(stream.GetSize(), up)),
}
_, err = uploader.UploadWithContext(ctx, input)
return err
} }
// 离线下载文件 // 离线下载文件

View File

@ -80,22 +80,24 @@ type UploadTaskData struct {
UploadType string `json:"upload_type"` UploadType string `json:"upload_type"`
//UPLOAD_TYPE_RESUMABLE //UPLOAD_TYPE_RESUMABLE
Resumable *struct { Resumable *struct {
Kind string `json:"kind"` Kind string `json:"kind"`
Params struct { Params S3Params `json:"params"`
AccessKeyID string `json:"access_key_id"` Provider string `json:"provider"`
AccessKeySecret string `json:"access_key_secret"`
Bucket string `json:"bucket"`
Endpoint string `json:"endpoint"`
Expiration time.Time `json:"expiration"`
Key string `json:"key"`
SecurityToken string `json:"security_token"`
} `json:"params"`
Provider string `json:"provider"`
} `json:"resumable"` } `json:"resumable"`
File File `json:"file"` File File `json:"file"`
} }
type S3Params struct {
AccessKeyID string `json:"access_key_id"`
AccessKeySecret string `json:"access_key_secret"`
Bucket string `json:"bucket"`
Endpoint string `json:"endpoint"`
Expiration time.Time `json:"expiration"`
Key string `json:"key"`
SecurityToken string `json:"security_token"`
}
// 添加离线下载响应 // 添加离线下载响应
type OfflineDownloadResp struct { type OfflineDownloadResp struct {
File *string `json:"file"` File *string `json:"file"`

View File

@ -1,17 +1,24 @@
package pikpak package pikpak
import ( import (
"bytes"
"crypto/md5" "crypto/md5"
"crypto/sha1" "crypto/sha1"
"encoding/hex" "encoding/hex"
"errors"
"fmt" "fmt"
"github.com/alist-org/alist/v3/internal/driver"
"github.com/alist-org/alist/v3/internal/model"
"github.com/alist-org/alist/v3/internal/op" "github.com/alist-org/alist/v3/internal/op"
"github.com/alist-org/alist/v3/pkg/utils" "github.com/alist-org/alist/v3/pkg/utils"
"github.com/aliyun/aliyun-oss-go-sdk/oss"
jsoniter "github.com/json-iterator/go" jsoniter "github.com/json-iterator/go"
"github.com/pkg/errors"
"io"
"net/http" "net/http"
"path/filepath"
"regexp" "regexp"
"strings" "strings"
"sync"
"time" "time"
"github.com/alist-org/alist/v3/drivers/base" "github.com/alist-org/alist/v3/drivers/base"
@ -56,6 +63,12 @@ var WebAlgorithms = []string{
"NhXXU9rg4XXdzo7u5o", "NhXXU9rg4XXdzo7u5o",
} }
const (
OSSUserAgent = "aliyun-sdk-android/2.9.13(Linux/Android 14/M2004j7ac;UKQ1.231108.001)"
OssSecurityTokenHeaderName = "X-OSS-Security-Token"
ThreadsNum = 10
)
const ( const (
AndroidClientID = "YNxT9w7GMdWvEOKa" AndroidClientID = "YNxT9w7GMdWvEOKa"
AndroidClientSecret = "dbw2OtmVEeuUvIptb1Coyg" AndroidClientSecret = "dbw2OtmVEeuUvIptb1Coyg"
@ -393,3 +406,237 @@ func (d *PikPak) refreshCaptchaToken(action string, metas map[string]string) err
d.Common.SetCaptchaToken(resp.CaptchaToken) d.Common.SetCaptchaToken(resp.CaptchaToken)
return nil return nil
} }
func (d *PikPak) UploadByOSS(params *S3Params, stream model.FileStreamer, up driver.UpdateProgress) error {
ossClient, err := oss.New(params.Endpoint, params.AccessKeyID, params.AccessKeySecret)
if err != nil {
return err
}
bucket, err := ossClient.Bucket(params.Bucket)
if err != nil {
return err
}
err = bucket.PutObject(params.Key, stream, OssOption(params)...)
if err != nil {
return err
}
return nil
}
func (d *PikPak) UploadByMultipart(params *S3Params, fileSize int64, stream model.FileStreamer, up driver.UpdateProgress) error {
var (
chunks []oss.FileChunk
parts []oss.UploadPart
imur oss.InitiateMultipartUploadResult
ossClient *oss.Client
bucket *oss.Bucket
err error
)
tmpF, err := stream.CacheFullInTempFile()
if err != nil {
return err
}
if ossClient, err = oss.New(params.Endpoint, params.AccessKeyID, params.AccessKeySecret); err != nil {
return err
}
if bucket, err = ossClient.Bucket(params.Bucket); err != nil {
return err
}
ticker := time.NewTicker(time.Hour * 12)
defer ticker.Stop()
// 设置超时
timeout := time.NewTimer(time.Hour * 24)
if chunks, err = SplitFile(fileSize); err != nil {
return err
}
if imur, err = bucket.InitiateMultipartUpload(params.Key,
oss.SetHeader(OssSecurityTokenHeaderName, params.SecurityToken),
oss.UserAgentHeader(OSSUserAgent),
); err != nil {
return err
}
wg := sync.WaitGroup{}
wg.Add(len(chunks))
chunksCh := make(chan oss.FileChunk)
errCh := make(chan error)
UploadedPartsCh := make(chan oss.UploadPart)
quit := make(chan struct{})
// producer
go chunksProducer(chunksCh, chunks)
go func() {
wg.Wait()
quit <- struct{}{}
}()
// consumers
for i := 0; i < ThreadsNum; i++ {
go func(threadId int) {
defer func() {
if r := recover(); r != nil {
errCh <- fmt.Errorf("recovered in %v", r)
}
}()
for chunk := range chunksCh {
var part oss.UploadPart // 出现错误就继续尝试共尝试3次
for retry := 0; retry < 3; retry++ {
select {
case <-ticker.C:
errCh <- errors.Wrap(err, "ossToken 过期")
default:
}
buf := make([]byte, chunk.Size)
if _, err = tmpF.ReadAt(buf, chunk.Offset); err != nil && !errors.Is(err, io.EOF) {
continue
}
b := bytes.NewBuffer(buf)
if part, err = bucket.UploadPart(imur, b, chunk.Size, chunk.Number, OssOption(params)...); err == nil {
break
}
}
if err != nil {
errCh <- errors.Wrap(err, fmt.Sprintf("上传 %s 的第%d个分片时出现错误%v", stream.GetName(), chunk.Number, err))
}
UploadedPartsCh <- part
}
}(i)
}
go func() {
for part := range UploadedPartsCh {
parts = append(parts, part)
wg.Done()
}
}()
LOOP:
for {
select {
case <-ticker.C:
// ossToken 过期
return err
case <-quit:
break LOOP
case <-errCh:
return err
case <-timeout.C:
return fmt.Errorf("time out")
}
}
// EOF错误是xml的Unmarshal导致的响应其实是json格式所以实际上上传是成功的
if _, err = bucket.CompleteMultipartUpload(imur, parts, OssOption(params)...); err != nil && !errors.Is(err, io.EOF) {
// 当文件名含有 &< 这两个字符之一时响应的xml解析会出现错误实际上上传是成功的
if filename := filepath.Base(stream.GetName()); !strings.ContainsAny(filename, "&<") {
return err
}
}
return nil
}
func chunksProducer(ch chan oss.FileChunk, chunks []oss.FileChunk) {
for _, chunk := range chunks {
ch <- chunk
}
}
func SplitFile(fileSize int64) (chunks []oss.FileChunk, err error) {
for i := int64(1); i < 10; i++ {
if fileSize < i*utils.GB { // 文件大小小于iGB时分为i*100片
if chunks, err = SplitFileByPartNum(fileSize, int(i*100)); err != nil {
return
}
break
}
}
if fileSize > 9*utils.GB { // 文件大小大于9GB时分为1000片
if chunks, err = SplitFileByPartNum(fileSize, 1000); err != nil {
return
}
}
// 单个分片大小不能小于1MB
if chunks[0].Size < 1*utils.MB {
if chunks, err = SplitFileByPartSize(fileSize, 1*utils.MB); err != nil {
return
}
}
return
}
// SplitFileByPartNum splits big file into parts by the num of parts.
// Split the file with specified parts count, returns the split result when error is nil.
func SplitFileByPartNum(fileSize int64, chunkNum int) ([]oss.FileChunk, error) {
if chunkNum <= 0 || chunkNum > 10000 {
return nil, errors.New("chunkNum invalid")
}
if int64(chunkNum) > fileSize {
return nil, errors.New("oss: chunkNum invalid")
}
var chunks []oss.FileChunk
chunk := oss.FileChunk{}
chunkN := (int64)(chunkNum)
for i := int64(0); i < chunkN; i++ {
chunk.Number = int(i + 1)
chunk.Offset = i * (fileSize / chunkN)
if i == chunkN-1 {
chunk.Size = fileSize/chunkN + fileSize%chunkN
} else {
chunk.Size = fileSize / chunkN
}
chunks = append(chunks, chunk)
}
return chunks, nil
}
// SplitFileByPartSize splits big file into parts by the size of parts.
// Splits the file by the part size. Returns the FileChunk when error is nil.
func SplitFileByPartSize(fileSize int64, chunkSize int64) ([]oss.FileChunk, error) {
if chunkSize <= 0 {
return nil, errors.New("chunkSize invalid")
}
chunkN := fileSize / chunkSize
if chunkN >= 10000 {
return nil, errors.New("Too many parts, please increase part size")
}
var chunks []oss.FileChunk
chunk := oss.FileChunk{}
for i := int64(0); i < chunkN; i++ {
chunk.Number = int(i + 1)
chunk.Offset = i * chunkSize
chunk.Size = chunkSize
chunks = append(chunks, chunk)
}
if fileSize%chunkSize > 0 {
chunk.Number = len(chunks) + 1
chunk.Offset = int64(len(chunks)) * chunkSize
chunk.Size = fileSize % chunkSize
chunks = append(chunks, chunk)
}
return chunks, nil
}
// OssOption get options
func OssOption(params *S3Params) []oss.Option {
options := []oss.Option{
oss.SetHeader(OssSecurityTokenHeaderName, params.SecurityToken),
oss.UserAgentHeader(OSSUserAgent),
}
return options
}