v2ray-core/common/buf/copy.go

122 lines
2.4 KiB
Go
Raw Normal View History

2017-06-04 19:32:01 +00:00
package buf
import (
"io"
"time"
2017-06-04 19:32:01 +00:00
"v2ray.com/core/common/errors"
"v2ray.com/core/common/signal"
)
type dataHandler func(MultiBuffer)
type copyHandler struct {
2018-11-15 18:59:23 +00:00
onData []dataHandler
2017-06-04 19:32:01 +00:00
}
2018-02-13 10:15:04 +00:00
// SizeCounter is for counting bytes copied by Copy().
2017-11-09 21:33:15 +00:00
type SizeCounter struct {
Size int64
}
2017-11-21 16:02:55 +00:00
// CopyOption is an option for copying data.
2017-06-04 19:32:01 +00:00
type CopyOption func(*copyHandler)
2017-11-21 16:02:55 +00:00
// UpdateActivity is a CopyOption to update activity on each data copy operation.
func UpdateActivity(timer signal.ActivityUpdater) CopyOption {
2017-06-04 19:32:01 +00:00
return func(handler *copyHandler) {
handler.onData = append(handler.onData, func(MultiBuffer) {
timer.Update()
})
}
}
2017-11-21 16:02:55 +00:00
// CountSize is a CopyOption that sums the total size of data copied into the given SizeCounter.
2017-11-09 21:33:15 +00:00
func CountSize(sc *SizeCounter) CopyOption {
return func(handler *copyHandler) {
handler.onData = append(handler.onData, func(b MultiBuffer) {
sc.Size += int64(b.Len())
})
}
}
2018-10-24 10:02:02 +00:00
type readError struct {
error
}
func (e readError) Error() string {
return e.error.Error()
}
func (e readError) Inner() error {
return e.error
}
func IsReadError(err error) bool {
_, ok := err.(readError)
return ok
}
type writeError struct {
error
}
func (e writeError) Error() string {
return e.error.Error()
}
func (e writeError) Inner() error {
return e.error
}
func IsWriteError(err error) bool {
_, ok := err.(writeError)
return ok
}
2017-06-04 19:32:01 +00:00
func copyInternal(reader Reader, writer Writer, handler *copyHandler) error {
for {
2018-11-15 18:59:23 +00:00
buffer, err := reader.ReadMultiBuffer()
if !buffer.IsEmpty() {
for _, handler := range handler.onData {
handler(buffer)
}
2018-11-15 18:59:23 +00:00
if werr := writer.WriteMultiBuffer(buffer); werr != nil {
2018-10-24 10:02:02 +00:00
return writeError{werr}
}
2018-02-19 16:50:53 +00:00
}
if err != nil {
2018-10-24 10:02:02 +00:00
return readError{err}
2017-06-04 19:32:01 +00:00
}
}
}
2017-11-21 16:02:55 +00:00
// Copy dumps all payload from reader to writer or stops when an error occurs. It returns nil when EOF.
2017-06-04 19:32:01 +00:00
func Copy(reader Reader, writer Writer, options ...CopyOption) error {
2018-08-09 11:30:44 +00:00
var handler copyHandler
2017-06-04 19:32:01 +00:00
for _, option := range options {
2018-11-15 20:16:54 +00:00
option(&handler)
2017-06-04 19:32:01 +00:00
}
2018-11-15 20:16:54 +00:00
err := copyInternal(reader, writer, &handler)
2017-06-04 19:32:01 +00:00
if err != nil && errors.Cause(err) != io.EOF {
return err
}
return nil
}
var ErrNotTimeoutReader = newError("not a TimeoutReader")
func CopyOnceTimeout(reader Reader, writer Writer, timeout time.Duration) error {
timeoutReader, ok := reader.(TimeoutReader)
if !ok {
return ErrNotTimeoutReader
}
mb, err := timeoutReader.ReadMultiBufferTimeout(timeout)
if err != nil {
return err
}
return writer.WriteMultiBuffer(mb)
}