diff --git a/drivers/ftp/driver.go b/drivers/ftp/driver.go index b8deae8a..70fbabdc 100644 --- a/drivers/ftp/driver.go +++ b/drivers/ftp/driver.go @@ -64,7 +64,7 @@ func (d *FTP) Link(ctx context.Context, file model.Obj, args model.LinkArgs) (*m return nil, err } - r := NewFTPFileReader(d.conn, file.GetPath()) + r := NewFileReader(d.conn, file.GetPath(), file.GetSize()) link := &model.Link{ MFile: r, } diff --git a/drivers/ftp/util.go b/drivers/ftp/util.go index 8699ab6f..196d874c 100644 --- a/drivers/ftp/util.go +++ b/drivers/ftp/util.go @@ -4,6 +4,7 @@ import ( "io" "os" "sync" + "sync/atomic" "time" "github.com/jlaffaye/ftp" @@ -30,31 +31,32 @@ func (d *FTP) login() error { return nil } -// An FTP file reader that implements io.MFile for seeking. -type FTPFileReader struct { +// FileReader An FTP file reader that implements io.MFile for seeking. +type FileReader struct { conn *ftp.ServerConn resp *ftp.Response - offset int64 + offset atomic.Int64 readAtOffset int64 mu sync.Mutex path string + size int64 } -func NewFTPFileReader(conn *ftp.ServerConn, path string) *FTPFileReader { - return &FTPFileReader{ +func NewFileReader(conn *ftp.ServerConn, path string, size int64) *FileReader { + return &FileReader{ conn: conn, path: path, + size: size, } } -func (r *FTPFileReader) Read(buf []byte) (n int, err error) { - n, err = r.ReadAt(buf, r.offset) - r.mu.Lock() - defer r.mu.Unlock() - r.offset += int64(n) +func (r *FileReader) Read(buf []byte) (n int, err error) { + n, err = r.ReadAt(buf, r.offset.Load()) + r.offset.Add(int64(n)) return } -func (r *FTPFileReader) ReadAt(buf []byte, off int64) (n int, err error) { + +func (r *FileReader) ReadAt(buf []byte, off int64) (n int, err error) { if off < 0 { return -1, os.ErrInvalid } @@ -80,11 +82,8 @@ func (r *FTPFileReader) ReadAt(buf []byte, off int64) (n int, err error) { return } -func (r *FTPFileReader) Seek(offset int64, whence int) (int64, error) { - r.mu.Lock() - defer r.mu.Unlock() - - oldOffset := r.offset +func (r *FileReader) Seek(offset int64, whence int) (int64, error) { + oldOffset := r.offset.Load() var newOffset int64 switch whence { case io.SeekStart: @@ -92,11 +91,7 @@ func (r *FTPFileReader) Seek(offset int64, whence int) (int64, error) { case io.SeekCurrent: newOffset = oldOffset + offset case io.SeekEnd: - size, err := r.conn.FileSize(r.path) - if err != nil { - return oldOffset, err - } - newOffset = offset + int64(size) + return r.size, nil default: return -1, os.ErrInvalid } @@ -109,11 +104,11 @@ func (r *FTPFileReader) Seek(offset int64, whence int) (int64, error) { // offset not changed, so return directly return oldOffset, nil } - r.offset = newOffset + r.offset.Store(newOffset) return newOffset, nil } -func (r *FTPFileReader) Close() error { +func (r *FileReader) Close() error { if r.resp != nil { return r.resp.Close() }