mirror of https://github.com/Xhofe/alist

perf: use io copy with buffer pool (#6389)

* feat: add io methods with buffer
* chore: move io.Copy calls to utils.CopyWithBuffer

parent ec08ecdf6c
commit b95df1d745
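Background for the change below (a gloss, not part of the commit message): when neither end of a copy implements io.ReaderFrom or io.WriterTo, io.Copy allocates a fresh 32 KiB buffer on every call. The helpers this commit adds reuse 64 KiB buffers from a sync.Pool via io.CopyBuffer, amortizing that allocation across all the copy sites touched below. A minimal standalone sketch of the pattern (bufPool and copyPooled are illustrative names; the real helpers are utils.CopyWithBuffer and utils.CopyWithBufferN, defined in the final hunk):

package main

import (
	"io"
	"os"
	"strings"
	"sync"
)

// bufPool hands out reusable copy buffers; 64 KiB is twice io.Copy's default.
var bufPool = &sync.Pool{
	New: func() interface{} {
		return make([]byte, 64*1024)
	},
}

// copyPooled borrows a buffer for the duration of one copy and returns it,
// so concurrent copies draw distinct buffers instead of allocating new ones.
func copyPooled(dst io.Writer, src io.Reader) (int64, error) {
	buf := bufPool.Get().([]byte)
	defer bufPool.Put(buf)
	return io.CopyBuffer(dst, src, buf)
}

func main() {
	if _, err := copyPooled(os.Stdout, strings.NewReader("pooled copy\n")); err != nil {
		panic(err)
	}
}
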
@@ -194,7 +194,7 @@ func (d *Pan123) Put(ctx context.Context, dstDir model.Obj, stream model.FileStr
 	defer func() {
 		_ = tempFile.Close()
 	}()
-	if _, err = io.Copy(h, tempFile); err != nil {
+	if _, err = utils.CopyWithBuffer(h, tempFile); err != nil {
 		return err
 	}
 	_, err = tempFile.Seek(0, io.SeekStart)

@@ -595,7 +595,7 @@ func (y *Cloud189PC) FastUpload(ctx context.Context, dstDir model.Obj, file mode
 		}

 		silceMd5.Reset()
-		if _, err := io.CopyN(io.MultiWriter(fileMd5, silceMd5), tempFile, byteSize); err != nil && err != io.EOF {
+		if _, err := utils.CopyWithBufferN(io.MultiWriter(fileMd5, silceMd5), tempFile, byteSize); err != nil && err != io.EOF {
 			return nil, err
 		}
 		md5Byte := silceMd5.Sum(nil)

@@ -194,7 +194,7 @@ func (d *AliDrive) Put(ctx context.Context, dstDir model.Obj, streamer model.Fil
 	}
 	if d.RapidUpload {
 		buf := bytes.NewBuffer(make([]byte, 0, 1024))
-		io.CopyN(buf, file, 1024)
+		utils.CopyWithBufferN(buf, file, 1024)
 		reqBody["pre_hash"] = utils.HashData(utils.SHA1, buf.Bytes())
 		if localFile != nil {
 			if _, err := localFile.Seek(0, io.SeekStart); err != nil {

@@ -136,7 +136,7 @@ func (d *AliyundriveOpen) calProofCode(stream model.FileStreamer) (string, error
 	if err != nil {
 		return "", err
 	}
-	_, err = io.CopyN(buf, reader, length)
+	_, err = utils.CopyWithBufferN(buf, reader, length)
 	if err != nil {
 		return "", err
 	}

@@ -211,7 +211,7 @@ func (d *BaiduNetdisk) Put(ctx context.Context, dstDir model.Obj, stream model.F
 		if i == count {
 			byteSize = lastBlockSize
 		}
-		_, err := io.CopyN(io.MultiWriter(fileMd5H, sliceMd5H, slicemd5H2Write), tempFile, byteSize)
+		_, err := utils.CopyWithBufferN(io.MultiWriter(fileMd5H, sliceMd5H, slicemd5H2Write), tempFile, byteSize)
 		if err != nil && err != io.EOF {
 			return nil, err
 		}

@@ -261,7 +261,7 @@ func (d *BaiduPhoto) Put(ctx context.Context, dstDir model.Obj, stream model.Fil
 		if i == count {
 			byteSize = lastBlockSize
 		}
-		_, err := io.CopyN(io.MultiWriter(fileMd5H, sliceMd5H, slicemd5H2Write), tempFile, byteSize)
+		_, err := utils.CopyWithBufferN(io.MultiWriter(fileMd5H, sliceMd5H, slicemd5H2Write), tempFile, byteSize)
 		if err != nil && err != io.EOF {
 			return nil, err
 		}

@@ -229,7 +229,7 @@ func (d *ChaoXing) Put(ctx context.Context, dstDir model.Obj, stream model.FileS
 	if err != nil {
 		return err
 	}
-	_, err = io.Copy(filePart, stream)
+	_, err = utils.CopyWithBuffer(filePart, stream)
 	if err != nil {
 		return err
 	}

@@ -271,7 +271,7 @@ func (d *ILanZou) Put(ctx context.Context, dstDir model.Obj, stream model.FileSt
 	defer func() {
 		_ = tempFile.Close()
 	}()
-	if _, err = io.Copy(h, tempFile); err != nil {
+	if _, err = utils.CopyWithBuffer(h, tempFile); err != nil {
 		return nil, err
 	}
 	_, err = tempFile.Seek(0, io.SeekStart)

@@ -206,7 +206,7 @@ func (d *MediaTrack) Put(ctx context.Context, dstDir model.Obj, stream model.Fil
 		return err
 	}
 	h := md5.New()
-	_, err = io.Copy(h, tempFile)
+	_, err = utils.CopyWithBuffer(h, tempFile)
 	if err != nil {
 		return err
 	}

@@ -4,6 +4,7 @@ import (
 	"crypto/sha1"
 	"encoding/hex"
 	"errors"
+	"github.com/alist-org/alist/v3/pkg/utils"
 	"io"
 	"net/http"

@@ -141,7 +142,7 @@ func getGcid(r io.Reader, size int64) (string, error) {
 	readSize := calcBlockSize(size)
 	for {
 		hash2.Reset()
-		if n, err := io.CopyN(hash2, r, readSize); err != nil && n == 0 {
+		if n, err := utils.CopyWithBufferN(hash2, r, readSize); err != nil && n == 0 {
 			if err != io.EOF {
 				return "", err
 			}

@@ -143,7 +143,7 @@ func (d *QuarkOrUC) Put(ctx context.Context, dstDir model.Obj, stream model.File
 		_ = tempFile.Close()
 	}()
 	m := md5.New()
-	_, err = io.Copy(m, tempFile)
+	_, err = utils.CopyWithBuffer(m, tempFile)
 	if err != nil {
 		return err
 	}

@@ -153,7 +153,7 @@ func (d *QuarkOrUC) Put(ctx context.Context, dstDir model.Obj, stream model.File
 	}
 	md5Str := hex.EncodeToString(m.Sum(nil))
 	s := sha1.New()
-	_, err = io.Copy(s, tempFile)
+	_, err = utils.CopyWithBuffer(s, tempFile)
 	if err != nil {
 		return err
 	}

@@ -1,7 +1,7 @@
 package smb

 import (
-	"io"
+	"github.com/alist-org/alist/v3/pkg/utils"
 	"io/fs"
 	"net"
 	"os"

@@ -74,7 +74,7 @@ func (d *SMB) CopyFile(src, dst string) error {
 	}
 	defer dstfd.Close()

-	if _, err = io.Copy(dstfd, srcfd); err != nil {
+	if _, err = utils.CopyWithBuffer(dstfd, srcfd); err != nil {
 		return err
 	}
 	if srcinfo, err = d.fs.Stat(src); err != nil {

@@ -190,7 +190,7 @@ func getGcid(r io.Reader, size int64) (string, error) {
 	readSize := calcBlockSize(size)
 	for {
 		hash2.Reset()
-		if n, err := io.CopyN(hash2, r, readSize); err != nil && n == 0 {
+		if n, err := utils.CopyWithBufferN(hash2, r, readSize); err != nil && n == 0 {
 			if err != io.EOF {
 				return "", err
 			}

@@ -4,6 +4,7 @@ import (
 	"bytes"
 	"context"
 	"fmt"
+	"github.com/alist-org/alist/v3/pkg/utils"
 	"io"
 	"math"
 	"net/http"

@@ -271,7 +272,7 @@ func (d *downloader) tryDownloadChunk(params *HttpRequestParams, ch *chunk) (int
 		}
 	}

-	n, err := io.Copy(ch.buf, resp.Body)
+	n, err := utils.CopyWithBuffer(ch.buf, resp.Body)

 	if err != nil {
 		return n, &errReadingBody{err: err}

@@ -162,7 +162,7 @@ func ServeHTTP(w http.ResponseWriter, r *http.Request, name string, modTime time
 				pw.CloseWithError(err)
 				return
 			}
-			if _, err := io.CopyN(part, reader, ra.Length); err != nil {
+			if _, err := utils.CopyWithBufferN(part, reader, ra.Length); err != nil {
 				pw.CloseWithError(err)
 				return
 			}

@@ -182,7 +182,7 @@ func ServeHTTP(w http.ResponseWriter, r *http.Request, name string, modTime time
 	w.WriteHeader(code)

 	if r.Method != "HEAD" {
-		written, err := io.CopyN(w, sendContent, sendSize)
+		written, err := utils.CopyWithBufferN(w, sendContent, sendSize)
 		if err != nil {
 			log.Warnf("ServeHttp error. err: %s ", err)
 			if written != sendSize {

@@ -2,6 +2,7 @@ package net

 import (
 	"fmt"
+	"github.com/alist-org/alist/v3/pkg/utils"
 	"io"
 	"math"
 	"mime/multipart"

@@ -330,7 +331,7 @@ func GetRangedHttpReader(readCloser io.ReadCloser, offset, length int64) (io.Rea
 		log.Warnf("offset is more than 100MB, if loading data from internet, high-latency and wasting of bandwidth is expected")
 	}

-	if _, err := io.Copy(io.Discard, io.LimitReader(readCloser, offset)); err != nil {
+	if _, err := utils.CopyWithBuffer(io.Discard, io.LimitReader(readCloser, offset)); err != nil {
 		return nil, err
 	}

@@ -104,7 +104,7 @@ func (f *FileStream) RangeRead(httpRange http_range.Range) (io.Reader, error) {
 	if httpRange.Start == 0 && httpRange.Length <= InMemoryBufMaxSizeBytes && f.peekBuff == nil {
 		bufSize := utils.Min(httpRange.Length, f.GetSize())
 		newBuf := bytes.NewBuffer(make([]byte, 0, bufSize))
-		n, err := io.CopyN(newBuf, f.Reader, bufSize)
+		n, err := utils.CopyWithBufferN(newBuf, f.Reader, bufSize)
 		if err != nil {
 			return nil, err
 		}

@@ -4,6 +4,7 @@ import (
 	"bytes"
 	"encoding/xml"
 	"fmt"
+	"github.com/alist-org/alist/v3/pkg/utils"
 	"io"
 	"net/http"
 	"net/url"

@@ -419,7 +420,7 @@ func (c *Client) ReadStreamRange(path string, offset, length int64) (io.ReadClos
 	// stream in rs.Body
 	if rs.StatusCode == 200 {
 		// discard first 'offset' bytes.
-		if _, err := io.Copy(io.Discard, io.LimitReader(rs.Body, offset)); err != nil {
+		if _, err := utils.CopyWithBuffer(io.Discard, io.LimitReader(rs.Body, offset)); err != nil {
 			return nil, newPathErrorErr("ReadStreamRange", path, err)
 		}

@@ -32,7 +32,7 @@ func CopyFile(src, dst string) error {
 	}
 	defer dstfd.Close()

-	if _, err = io.Copy(dstfd, srcfd); err != nil {
+	if _, err = CopyWithBuffer(dstfd, srcfd); err != nil {
 		return err
 	}
 	if srcinfo, err = os.Stat(src); err != nil {

@@ -121,7 +121,7 @@ func CreateTempFile(r io.Reader, size int64) (*os.File, error) {
 	if err != nil {
 		return nil, err
 	}
-	readBytes, err := io.Copy(f, r)
+	readBytes, err := CopyWithBuffer(f, r)
 	if err != nil {
 		_ = os.Remove(f.Name())
 		return nil, errs.NewErr(err, "CreateTempFile failed")

@@ -96,7 +96,7 @@ func HashData(hashType *HashType, data []byte, params ...any) string {
 // HashReader get hash of one hashType from a reader
 func HashReader(hashType *HashType, reader io.Reader, params ...any) (string, error) {
 	h := hashType.NewFunc(params...)
-	_, err := io.Copy(h, reader)
+	_, err := CopyWithBuffer(h, reader)
 	if err != nil {
 		return "", errs.NewErr(err, "HashReader error")
 	}

@@ -4,7 +4,6 @@ import (
 	"bytes"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
-	"io"
 	"testing"
 )

@@ -36,7 +35,7 @@ var hashTestSet = []hashTest{
 func TestMultiHasher(t *testing.T) {
 	for _, test := range hashTestSet {
 		mh := NewMultiHasher([]*HashType{MD5, SHA1, SHA256})
-		n, err := io.Copy(mh, bytes.NewBuffer(test.input))
+		n, err := CopyWithBuffer(mh, bytes.NewBuffer(test.input))
 		require.NoError(t, err)
 		assert.Len(t, test.input, int(n))
 		hashInfo := mh.GetHashInfo()

@@ -6,6 +6,7 @@ import (
 	"errors"
 	"fmt"
 	"io"
+	"sync"
 	"time"

 	"golang.org/x/exp/constraints"

@@ -29,7 +30,7 @@ func CopyWithCtx(ctx context.Context, out io.Writer, in io.Reader, size int64, p
 	// possible in the call process.
 	var finish int64 = 0
 	s := size / 100
-	_, err := io.Copy(out, readerFunc(func(p []byte) (int, error) {
+	_, err := CopyWithBuffer(out, readerFunc(func(p []byte) (int, error) {
 		// golang non-blocking channel: https://gobyexample.com/non-blocking-channel-operations
 		select {
 		// if context has been canceled

@@ -204,3 +205,31 @@ func Max[T constraints.Ordered](a, b T) T {
 	}
 	return a
 }
+
+var IoBuffPool = &sync.Pool{
+	New: func() interface{} {
+		return make([]byte, 32*1024*2) // Two times of size in io package
+	},
+}
+
+func CopyWithBuffer(dst io.Writer, src io.Reader) (written int64, err error) {
+	buff := IoBuffPool.Get().([]byte)
+	defer IoBuffPool.Put(buff)
+	written, err = io.CopyBuffer(dst, src, buff)
+	if err != nil {
+		return
+	}
+	return written, nil
+}
+
+func CopyWithBufferN(dst io.Writer, src io.Reader, n int64) (written int64, err error) {
+	written, err = CopyWithBuffer(dst, io.LimitReader(src, n))
+	if written == n {
+		return n, nil
+	}
+	if written < n && err == nil {
+		// src stopped early; must have been EOF.
+		err = io.EOF
+	}
+	return
+}
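A usage sketch against the new helpers (assuming only what the hunks above show: the import path github.com/alist-org/alist/v3/pkg/utils and the two signatures). CopyWithBufferN keeps io.CopyN's contract, returning io.EOF when the source ends before n bytes, which is why call sites above retain their err != io.EOF checks unchanged:

package main

import (
	"crypto/md5"
	"encoding/hex"
	"fmt"
	"io"
	"strings"

	"github.com/alist-org/alist/v3/pkg/utils"
)

func main() {
	// Drop-in replacement for io.Copy when hashing a stream.
	h := md5.New()
	if _, err := utils.CopyWithBuffer(h, strings.NewReader("example payload")); err != nil {
		panic(err)
	}
	fmt.Println(hex.EncodeToString(h.Sum(nil)))

	// Like io.CopyN, a short source yields io.EOF alongside the byte count.
	var sb strings.Builder
	n, err := utils.CopyWithBufferN(&sb, strings.NewReader("abc"), 10)
	fmt.Println(n, err == io.EOF) // 3 true
}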