fix data race in ray

pull/642/merge
Darien Raymond 7 years ago
parent 60f3562ac1
commit 5ae8bfbda1

@ -143,26 +143,38 @@ func (s *Stream) ReadTimeout(timeout time.Duration) (buf.MultiBuffer, error) {
}
}
func (s *Stream) Write(data buf.MultiBuffer) error {
if data.IsEmpty() {
func (s *Stream) waitForStreamSize() error {
if streamSizeLimit == 0 {
return nil
}
s.access.RLock()
defer s.access.RUnlock()
for streamSizeLimit > 0 && s.size >= streamSizeLimit {
select {
case <-s.ctx.Done():
return io.ErrClosedPipe
case <-s.readSignal:
s.access.RLock()
if s.err || s.close {
data.Release()
s.access.RUnlock()
return io.ErrClosedPipe
}
s.access.RUnlock()
}
}
return nil
}
func (s *Stream) Write(data buf.MultiBuffer) error {
if data.IsEmpty() {
return nil
}
if err := s.waitForStreamSize(); err != nil {
data.Release()
return err
}
s.access.Lock()
defer s.access.Unlock()

Loading…
Cancel
Save