From e8faa7d4e37998aa3993ff08e8938983bf93eef0 Mon Sep 17 00:00:00 2001 From: Darien Raymond Date: Wed, 14 Nov 2018 12:31:59 +0100 Subject: [PATCH] move pipe option into dedicated struct --- transport/pipe/impl.go | 26 +++++++++++++++++--------- transport/pipe/pipe.go | 20 +++++++++++--------- 2 files changed, 28 insertions(+), 18 deletions(-) diff --git a/transport/pipe/impl.go b/transport/pipe/impl.go index 1553e252..1484506a 100644 --- a/transport/pipe/impl.go +++ b/transport/pipe/impl.go @@ -21,15 +21,23 @@ const ( errord ) +type pipeOption struct { + limit int32 // maximum buffer size in bytes + discardOverflow bool +} + +func (o *pipeOption) isFull(curSize int32) bool { + return o.limit >= 0 && curSize > o.limit +} + type pipe struct { sync.Mutex - data buf.MultiBuffer - readSignal *signal.Notifier - writeSignal *signal.Notifier - done *done.Instance - limit int32 - state state - discardOverflow bool + data buf.MultiBuffer + readSignal *signal.Notifier + writeSignal *signal.Notifier + done *done.Instance + option pipeOption + state state } var errBufferFull = errors.New("buffer full") @@ -37,7 +45,7 @@ var errBufferFull = errors.New("buffer full") func (p *pipe) getState(forRead bool) error { switch p.state { case open: - if !forRead && p.limit >= 0 && p.data.Len() > p.limit { + if !forRead && p.option.isFull(p.data.Len()) { return errBufferFull } return nil @@ -130,7 +138,7 @@ func (p *pipe) WriteMultiBuffer(mb buf.MultiBuffer) error { // Yield current goroutine. Hopefully the reading counterpart can pick up the payload. runtime.Gosched() return nil - case err == errBufferFull && p.discardOverflow: + case err == errBufferFull && p.option.discardOverflow: mb.Release() return nil case err != errBufferFull: diff --git a/transport/pipe/pipe.go b/transport/pipe/pipe.go index 4ad251ca..9641436d 100644 --- a/transport/pipe/pipe.go +++ b/transport/pipe/pipe.go @@ -9,26 +9,26 @@ import ( ) // Option for creating new Pipes. -type Option func(*pipe) +type Option func(*pipeOption) // WithoutSizeLimit returns an Option for Pipe to have no size limit. func WithoutSizeLimit() Option { - return func(p *pipe) { - p.limit = -1 + return func(opt *pipeOption) { + opt.limit = -1 } } // WithSizeLimit returns an Option for Pipe to have the given size limit. func WithSizeLimit(limit int32) Option { - return func(p *pipe) { - p.limit = limit + return func(opt *pipeOption) { + opt.limit = limit } } // DiscardOverflow returns an Option for Pipe to discard writes if full. func DiscardOverflow() Option { - return func(p *pipe) { - p.discardOverflow = true + return func(opt *pipeOption) { + opt.discardOverflow = true } } @@ -49,14 +49,16 @@ func OptionsFromContext(ctx context.Context) []Option { // New creates a new Reader and Writer that connects to each other. func New(opts ...Option) (*Reader, *Writer) { p := &pipe{ - limit: -1, readSignal: signal.NewNotifier(), writeSignal: signal.NewNotifier(), done: done.New(), + option: pipeOption{ + limit: -1, + }, } for _, opt := range opts { - opt(p) + opt(&(p.option)) } return &Reader{