Cloudreve/pkg/filemanager/chunk/chunk.go

192 lines
4.6 KiB
Go

package chunk
import (
"context"
"fmt"
"io"
"os"
"github.com/cloudreve/Cloudreve/v4/pkg/filemanager/chunk/backoff"
"github.com/cloudreve/Cloudreve/v4/pkg/filemanager/fs"
"github.com/cloudreve/Cloudreve/v4/pkg/logging"
"github.com/cloudreve/Cloudreve/v4/pkg/request"
"github.com/cloudreve/Cloudreve/v4/pkg/util"
)
const bufferTempPattern = "cdChunk.*.tmp"
// ChunkProcessFunc callback function for processing a chunk
type ChunkProcessFunc func(c *ChunkGroup, chunk io.Reader) error
// ChunkGroup manage groups of chunks
type ChunkGroup struct {
file *fs.UploadRequest
chunkSize int64
backoff backoff.Backoff
enableRetryBuffer bool
l logging.Logger
currentIndex int
chunkNum int64
bufferTemp *os.File
tempPath string
}
func NewChunkGroup(file *fs.UploadRequest, chunkSize int64, backoff backoff.Backoff, useBuffer bool, l logging.Logger, tempPath string) *ChunkGroup {
c := &ChunkGroup{
file: file,
chunkSize: chunkSize,
backoff: backoff,
currentIndex: -1,
enableRetryBuffer: useBuffer,
l: l,
tempPath: tempPath,
}
if c.chunkSize == 0 {
c.chunkSize = c.file.Props.Size
}
if c.file.Props.Size == 0 {
c.chunkNum = 1
} else {
c.chunkNum = c.file.Props.Size / c.chunkSize
if c.file.Props.Size%c.chunkSize != 0 {
c.chunkNum++
}
}
return c
}
// TempAvailable returns if current chunk temp file is available to be read
func (c *ChunkGroup) TempAvailable() bool {
if c.bufferTemp != nil {
state, _ := c.bufferTemp.Stat()
return state != nil && state.Size() == c.Length()
}
return false
}
// Process a chunk with retry logic
func (c *ChunkGroup) Process(processor ChunkProcessFunc) error {
reader := io.LimitReader(c.file, c.Length())
// If useBuffer is enabled, tee the reader to a temp file
if c.enableRetryBuffer && c.bufferTemp == nil && !c.file.Seekable() {
var err error
c.bufferTemp, err = os.CreateTemp(util.DataPath(c.tempPath), bufferTempPattern)
if err != nil {
c.l.Warning("Failed to create temp chunk buffer file: %s", err)
}
reader = &omitErrorTeeReader{
r: reader,
w: c.bufferTemp,
}
}
if c.bufferTemp != nil {
defer func() {
if c.bufferTemp != nil {
c.bufferTemp.Close()
os.Remove(c.bufferTemp.Name())
c.bufferTemp = nil
}
}()
// if temp buffer file is available, use it
if c.TempAvailable() {
if _, err := c.bufferTemp.Seek(0, io.SeekStart); err != nil {
return fmt.Errorf("failed to seek temp file back to chunk start: %w", err)
}
c.l.Debug("Chunk %d will be read from temp file %q.", c.Index(), c.bufferTemp.Name())
reader = io.NopCloser(c.bufferTemp)
}
}
err := processor(c, reader)
if err != nil {
if c.enableRetryBuffer {
request.BlackHole(reader)
}
if err != context.Canceled && (c.file.Seekable() || c.TempAvailable()) && c.backoff.Next(err) {
if c.file.Seekable() {
if _, seekErr := c.file.Seek(c.Start(), io.SeekStart); seekErr != nil {
return fmt.Errorf("failed to seek back to chunk start: %w, last error: %s", seekErr, err)
}
}
c.l.Debug("Retrying chunk %d, last error: %s", c.currentIndex, err)
return c.Process(processor)
}
return err
}
c.l.Debug("Chunk %d processed", c.currentIndex)
return nil
}
// Start returns the byte index of current chunk
func (c *ChunkGroup) Start() int64 {
return int64(int64(c.Index()) * c.chunkSize)
}
// Total returns the total length
func (c *ChunkGroup) Total() int64 {
return int64(c.file.Props.Size)
}
// Num returns the total chunk number
func (c *ChunkGroup) Num() int {
return int(c.chunkNum)
}
// RangeHeader returns header value of Content-Range
func (c *ChunkGroup) RangeHeader() string {
return fmt.Sprintf("bytes %d-%d/%d", c.Start(), c.Start()+c.Length()-1, c.Total())
}
// Index returns current chunk index, starts from 0
func (c *ChunkGroup) Index() int {
return c.currentIndex
}
// Next switch to next chunk, returns whether all chunks are processed
func (c *ChunkGroup) Next() bool {
c.currentIndex++
c.backoff.Reset()
return c.currentIndex < int(c.chunkNum)
}
// Length returns the length of current chunk
func (c *ChunkGroup) Length() int64 {
contentLength := c.chunkSize
if c.Index() == int(c.chunkNum-1) {
contentLength = c.file.Props.Size - c.chunkSize*(c.chunkNum-1)
}
return int64(contentLength)
}
// IsLast returns if current chunk is the last one
func (c *ChunkGroup) IsLast() bool {
return c.Index() == int(c.chunkNum-1)
}
type omitErrorTeeReader struct {
r io.Reader
w io.Writer
}
func (t *omitErrorTeeReader) Read(p []byte) (n int, err error) {
n, err = t.r.Read(p)
if n > 0 {
_, _ = t.w.Write(p[:n])
}
return
}