diff --git a/transport/ray/direct.go b/transport/ray/direct.go index 30f5b70c..802c756e 100644 --- a/transport/ray/direct.go +++ b/transport/ray/direct.go @@ -1,8 +1,10 @@ package ray import ( + "errors" "io" "sync" + "time" "github.com/v2ray/v2ray-core/common/alloc" ) @@ -11,6 +13,10 @@ const ( bufferSize = 128 ) +var ( + ErrorIOTimeout = errors.New("IO Timeout") +) + // NewRay creates a new Ray for direct traffic transport. func NewRay() Ray { return &directRay{ @@ -74,13 +80,26 @@ func (this *Stream) Write(data *alloc.Buffer) error { if this.closed { return io.EOF } + for { + err := this.TryWriteOnce(data) + if err != ErrorIOTimeout { + return err + } + } +} + +func (this *Stream) TryWriteOnce(data *alloc.Buffer) error { this.access.RLock() defer this.access.RUnlock() if this.closed { return io.EOF } - this.buffer <- data - return nil + select { + case this.buffer <- data: + return nil + case <-time.Tick(time.Second): + return ErrorIOTimeout + } } func (this *Stream) Close() {