mirror of https://github.com/v2ray/v2ray-core
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
175 lines
2.8 KiB
175 lines
2.8 KiB
package pipe |
|
|
|
import ( |
|
"errors" |
|
"io" |
|
"sync" |
|
"time" |
|
|
|
"v2ray.com/core/common" |
|
"v2ray.com/core/common/buf" |
|
"v2ray.com/core/common/signal" |
|
"v2ray.com/core/common/signal/done" |
|
) |
|
|
|
type state byte |
|
|
|
const ( |
|
open state = iota |
|
closed |
|
errord |
|
) |
|
|
|
type pipe struct { |
|
sync.Mutex |
|
data buf.MultiBuffer |
|
readSignal *signal.Notifier |
|
writeSignal *signal.Notifier |
|
done *done.Instance |
|
limit int32 |
|
state state |
|
discardOverflow bool |
|
} |
|
|
|
var errBufferFull = errors.New("buffer full") |
|
|
|
func (p *pipe) getState(forRead bool) error { |
|
switch p.state { |
|
case open: |
|
if !forRead && p.limit >= 0 && p.data.Len() > p.limit { |
|
return errBufferFull |
|
} |
|
return nil |
|
case closed: |
|
if !forRead { |
|
return io.ErrClosedPipe |
|
} |
|
if !p.data.IsEmpty() { |
|
return nil |
|
} |
|
return io.EOF |
|
case errord: |
|
return io.ErrClosedPipe |
|
default: |
|
panic("impossible case") |
|
} |
|
} |
|
|
|
func (p *pipe) readMultiBufferInternal() (buf.MultiBuffer, error) { |
|
p.Lock() |
|
defer p.Unlock() |
|
|
|
if err := p.getState(true); err != nil { |
|
return nil, err |
|
} |
|
|
|
data := p.data |
|
p.data = nil |
|
return data, nil |
|
} |
|
|
|
func (p *pipe) ReadMultiBuffer() (buf.MultiBuffer, error) { |
|
for { |
|
data, err := p.readMultiBufferInternal() |
|
if data != nil || err != nil { |
|
p.writeSignal.Signal() |
|
return data, err |
|
} |
|
|
|
select { |
|
case <-p.readSignal.Wait(): |
|
case <-p.done.Wait(): |
|
} |
|
} |
|
} |
|
|
|
func (p *pipe) ReadMultiBufferTimeout(d time.Duration) (buf.MultiBuffer, error) { |
|
timer := time.NewTimer(d) |
|
defer timer.Stop() |
|
|
|
for { |
|
data, err := p.readMultiBufferInternal() |
|
if data != nil || err != nil { |
|
p.writeSignal.Signal() |
|
return data, err |
|
} |
|
|
|
select { |
|
case <-p.readSignal.Wait(): |
|
case <-p.done.Wait(): |
|
case <-timer.C: |
|
return nil, buf.ErrReadTimeout |
|
} |
|
} |
|
} |
|
|
|
func (p *pipe) writeMultiBufferInternal(mb buf.MultiBuffer) error { |
|
p.Lock() |
|
defer p.Unlock() |
|
|
|
if err := p.getState(false); err != nil { |
|
return err |
|
} |
|
|
|
p.data.AppendMulti(mb) |
|
return nil |
|
} |
|
|
|
func (p *pipe) WriteMultiBuffer(mb buf.MultiBuffer) error { |
|
if mb.IsEmpty() { |
|
return nil |
|
} |
|
|
|
for { |
|
err := p.writeMultiBufferInternal(mb) |
|
switch { |
|
case err == nil: |
|
p.readSignal.Signal() |
|
return nil |
|
case err == errBufferFull && p.discardOverflow: |
|
mb.Release() |
|
return nil |
|
case err != errBufferFull: |
|
mb.Release() |
|
p.readSignal.Signal() |
|
return err |
|
} |
|
|
|
select { |
|
case <-p.writeSignal.Wait(): |
|
case <-p.done.Wait(): |
|
return io.ErrClosedPipe |
|
} |
|
} |
|
} |
|
|
|
func (p *pipe) Close() error { |
|
p.Lock() |
|
defer p.Unlock() |
|
|
|
if p.state == closed || p.state == errord { |
|
return nil |
|
} |
|
|
|
p.state = closed |
|
common.Must(p.done.Close()) |
|
return nil |
|
} |
|
|
|
func (p *pipe) CloseError() { |
|
p.Lock() |
|
defer p.Unlock() |
|
|
|
if p.state == closed || p.state == errord { |
|
return |
|
} |
|
|
|
p.state = errord |
|
|
|
if !p.data.IsEmpty() { |
|
p.data.Release() |
|
p.data = nil |
|
} |
|
|
|
common.Must(p.done.Close()) |
|
}
|
|
|