move pipe option into dedicated struct

pull/1435/head
Darien Raymond 2018-11-14 12:31:59 +01:00
parent 8c12fb6ff1
commit e8faa7d4e3
No known key found for this signature in database
GPG Key ID: 7251FFA14BB18169
2 changed files with 28 additions and 18 deletions

View File

@ -21,15 +21,23 @@ const (
errord errord
) )
type pipeOption struct {
limit int32 // maximum buffer size in bytes
discardOverflow bool
}
func (o *pipeOption) isFull(curSize int32) bool {
return o.limit >= 0 && curSize > o.limit
}
type pipe struct { type pipe struct {
sync.Mutex sync.Mutex
data buf.MultiBuffer data buf.MultiBuffer
readSignal *signal.Notifier readSignal *signal.Notifier
writeSignal *signal.Notifier writeSignal *signal.Notifier
done *done.Instance done *done.Instance
limit int32 option pipeOption
state state state state
discardOverflow bool
} }
var errBufferFull = errors.New("buffer full") var errBufferFull = errors.New("buffer full")
@ -37,7 +45,7 @@ var errBufferFull = errors.New("buffer full")
func (p *pipe) getState(forRead bool) error { func (p *pipe) getState(forRead bool) error {
switch p.state { switch p.state {
case open: case open:
if !forRead && p.limit >= 0 && p.data.Len() > p.limit { if !forRead && p.option.isFull(p.data.Len()) {
return errBufferFull return errBufferFull
} }
return nil return nil
@ -130,7 +138,7 @@ func (p *pipe) WriteMultiBuffer(mb buf.MultiBuffer) error {
// Yield current goroutine. Hopefully the reading counterpart can pick up the payload. // Yield current goroutine. Hopefully the reading counterpart can pick up the payload.
runtime.Gosched() runtime.Gosched()
return nil return nil
case err == errBufferFull && p.discardOverflow: case err == errBufferFull && p.option.discardOverflow:
mb.Release() mb.Release()
return nil return nil
case err != errBufferFull: case err != errBufferFull:

View File

@ -9,26 +9,26 @@ import (
) )
// Option for creating new Pipes. // Option for creating new Pipes.
type Option func(*pipe) type Option func(*pipeOption)
// WithoutSizeLimit returns an Option for Pipe to have no size limit. // WithoutSizeLimit returns an Option for Pipe to have no size limit.
func WithoutSizeLimit() Option { func WithoutSizeLimit() Option {
return func(p *pipe) { return func(opt *pipeOption) {
p.limit = -1 opt.limit = -1
} }
} }
// WithSizeLimit returns an Option for Pipe to have the given size limit. // WithSizeLimit returns an Option for Pipe to have the given size limit.
func WithSizeLimit(limit int32) Option { func WithSizeLimit(limit int32) Option {
return func(p *pipe) { return func(opt *pipeOption) {
p.limit = limit opt.limit = limit
} }
} }
// DiscardOverflow returns an Option for Pipe to discard writes if full. // DiscardOverflow returns an Option for Pipe to discard writes if full.
func DiscardOverflow() Option { func DiscardOverflow() Option {
return func(p *pipe) { return func(opt *pipeOption) {
p.discardOverflow = true opt.discardOverflow = true
} }
} }
@ -49,14 +49,16 @@ func OptionsFromContext(ctx context.Context) []Option {
// New creates a new Reader and Writer that connects to each other. // New creates a new Reader and Writer that connects to each other.
func New(opts ...Option) (*Reader, *Writer) { func New(opts ...Option) (*Reader, *Writer) {
p := &pipe{ p := &pipe{
limit: -1,
readSignal: signal.NewNotifier(), readSignal: signal.NewNotifier(),
writeSignal: signal.NewNotifier(), writeSignal: signal.NewNotifier(),
done: done.New(), done: done.New(),
option: pipeOption{
limit: -1,
},
} }
for _, opt := range opts { for _, opt := range opts {
opt(p) opt(&(p.option))
} }
return &Reader{ return &Reader{