From d5f17ab4fc7ef26602fba794b073da4484e59f44 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=A3=8E=E6=89=87=E6=BB=91=E7=BF=94=E7=BF=BC?= Date: Thu, 11 Dec 2025 11:59:22 +0000 Subject: [PATCH] Try to optimize pipe performance --- transport/pipe/impl.go | 45 +++++++++++++++++------------------------- 1 file changed, 18 insertions(+), 27 deletions(-) diff --git a/transport/pipe/impl.go b/transport/pipe/impl.go index e5d67827..81172906 100644 --- a/transport/pipe/impl.go +++ b/transport/pipe/impl.go @@ -3,7 +3,6 @@ package pipe import ( "errors" "io" - "runtime" "sync" "time" @@ -136,11 +135,10 @@ func (p *pipe) writeMultiBufferInternal(mb buf.MultiBuffer) error { if p.data == nil { p.data = mb - return nil + } else { + p.data, _ = buf.MergeMulti(p.data, mb) } - - p.data, _ = buf.MergeMulti(p.data, mb) - return errSlowDown + return nil } func (p *pipe) WriteMultiBuffer(mb buf.MultiBuffer) error { @@ -155,30 +153,23 @@ func (p *pipe) WriteMultiBuffer(mb buf.MultiBuffer) error { return nil } - if err == errSlowDown { - p.readSignal.Signal() - - // Yield current goroutine. Hopefully the reading counterpart can pick up the payload. - runtime.Gosched() - return nil + if err == errBufferFull { + if p.option.discardOverflow { + buf.ReleaseMulti(mb) + return nil + } + select { + case <-p.writeSignal.Wait(): + continue + case <-p.done.Wait(): + buf.ReleaseMulti(mb) + return io.ErrClosedPipe + } } - if err == errBufferFull && p.option.discardOverflow { - buf.ReleaseMulti(mb) - return nil - } - - if err != errBufferFull { - buf.ReleaseMulti(mb) - p.readSignal.Signal() - return err - } - - select { - case <-p.writeSignal.Wait(): - case <-p.done.Wait(): - return io.ErrClosedPipe - } + buf.ReleaseMulti(mb) + p.readSignal.Signal() + return err } }