fix ray stream

pull/255/merge
Darien Raymond 8 years ago
parent 56f08afd9c
commit 06c92e492d
No known key found for this signature in database
GPG Key ID: 7251FFA14BB18169

@ -2,8 +2,6 @@ package ray
import ( import (
"io" "io"
"sync"
"time"
"v2ray.com/core/common/buf" "v2ray.com/core/common/buf"
) )
@ -42,8 +40,6 @@ func (v *directRay) InboundOutput() InputStream {
} }
type Stream struct { type Stream struct {
access sync.RWMutex
closed bool
buffer chan *buf.Buffer buffer chan *buf.Buffer
} }
@ -54,72 +50,40 @@ func NewStream() *Stream {
} }
func (v *Stream) Read() (*buf.Buffer, error) { func (v *Stream) Read() (*buf.Buffer, error) {
if v.buffer == nil { buffer, open := <-v.buffer
return nil, io.EOF
}
v.access.RLock()
if v.buffer == nil {
v.access.RUnlock()
return nil, io.EOF
}
channel := v.buffer
v.access.RUnlock()
result, open := <-channel
if !open { if !open {
return nil, io.EOF return nil, io.EOF
} }
return result, nil return buffer, nil
} }
func (v *Stream) Write(data *buf.Buffer) error { func (v *Stream) Write(data *buf.Buffer) (err error) {
for !v.closed { defer func() {
err := v.TryWriteOnce(data) if r := recover(); r != nil {
if err != io.ErrNoProgress { err = io.ErrClosedPipe
return err
}
} }
return io.ErrClosedPipe }()
}
func (v *Stream) TryWriteOnce(data *buf.Buffer) error { v.buffer <- data
v.access.RLock()
defer v.access.RUnlock()
if v.closed {
return io.ErrClosedPipe
}
select {
case v.buffer <- data:
return nil return nil
case <-time.After(2 * time.Second):
return io.ErrNoProgress
}
} }
func (v *Stream) Close() { func (v *Stream) Close() {
if v.closed { defer swallowPanic()
return
}
v.access.Lock()
defer v.access.Unlock()
if v.closed {
return
}
v.closed = true
close(v.buffer) close(v.buffer)
} }
func (v *Stream) Release() { func (v *Stream) Release() {
if v.buffer == nil { defer swallowPanic()
return
} close(v.buffer)
v.Close()
v.access.Lock() for b := range v.buffer {
defer v.access.Unlock() b.Release()
if v.buffer == nil {
return
}
for data := range v.buffer {
data.Release()
} }
v.buffer = nil }
func swallowPanic() {
recover()
} }

@ -0,0 +1,42 @@
package ray_test
import (
"io"
"testing"
"v2ray.com/core/common/buf"
"v2ray.com/core/testing/assert"
. "v2ray.com/core/transport/ray"
)
func TestStreamIO(t *testing.T) {
assert := assert.On(t)
stream := NewStream()
assert.Error(stream.Write(buf.New())).IsNil()
_, err := stream.Read()
assert.Error(err).IsNil()
stream.Close()
_, err = stream.Read()
assert.Error(err).Equals(io.EOF)
err = stream.Write(buf.New())
assert.Error(err).Equals(io.ErrClosedPipe)
}
func TestStreamClose(t *testing.T) {
assert := assert.On(t)
stream := NewStream()
assert.Error(stream.Write(buf.New())).IsNil()
stream.Close()
_, err := stream.Read()
assert.Error(err).IsNil()
_, err = stream.Read()
assert.Error(err).Equals(io.EOF)
}
Loading…
Cancel
Save