diff --git a/app/proxyman/inbound/worker.go b/app/proxyman/inbound/worker.go index 5fee1cf9..647c8089 100644 --- a/app/proxyman/inbound/worker.go +++ b/app/proxyman/inbound/worker.go @@ -2,7 +2,6 @@ package inbound import ( "context" - "io" "sync" "sync/atomic" "time" @@ -20,6 +19,7 @@ import ( "v2ray.com/core/transport/internet" "v2ray.com/core/transport/internet/tcp" "v2ray.com/core/transport/internet/udp" + "v2ray.com/core/transport/pipe" ) type worker interface { @@ -121,7 +121,8 @@ func (w *tcpWorker) Port() net.Port { type udpConn struct { lastActivityTime int64 // in seconds - input chan *buf.Buffer + reader buf.Reader + writer buf.Writer output func([]byte) (int, error) remote net.Addr local net.Addr @@ -136,52 +137,21 @@ func (c *udpConn) updateActivity() { // ReadMultiBuffer implements buf.Reader func (c *udpConn) ReadMultiBuffer() (buf.MultiBuffer, error) { - var payload buf.MultiBuffer - - select { - case in := <-c.input: - payload.Append(in) - default: - select { - case in := <-c.input: - payload.Append(in) - case <-c.done.Wait(): - return nil, io.EOF - } - } - -L: - for { - select { - case in := <-c.input: - payload.Append(in) - default: - break L - } + mb, err := c.reader.ReadMultiBuffer() + if err != nil { + return nil, err } - c.updateActivity() if c.uplink != nil { - c.uplink.Add(int64(payload.Len())) + c.uplink.Add(int64(mb.Len())) } - return payload, nil + return mb, nil } func (c *udpConn) Read(buf []byte) (int, error) { - select { - case in := <-c.input: - defer in.Release() - c.updateActivity() - nBytes := copy(buf, in.Bytes()) - if c.uplink != nil { - c.uplink.Add(int64(nBytes)) - } - return nBytes, nil - case <-c.done.Wait(): - return 0, io.EOF - } + panic("not implemented") } // Write implements io.Writer. @@ -198,6 +168,7 @@ func (c *udpConn) Write(buf []byte) (int, error) { func (c *udpConn) Close() error { common.Must(c.done.Close()) + common.Must(common.Close(c.writer)) return nil } @@ -251,8 +222,10 @@ func (w *udpWorker) getConnection(id connID) (*udpConn, bool) { return conn, true } + pReader, pWriter := pipe.New(pipe.DiscardOverflow(), pipe.WithSizeLimit(16*1024)) conn := &udpConn{ - input: make(chan *buf.Buffer, 32), + reader: pReader, + writer: pWriter, output: func(b []byte) (int, error) { return w.hub.WriteTo(b, id.src) }, @@ -282,13 +255,9 @@ func (w *udpWorker) callback(b *buf.Buffer, source net.Destination, originalDest id.dest = originalDest } conn, existing := w.getConnection(id) - select { - case conn.input <- b: - case <-conn.done.Wait(): - b.Release() - default: - b.Release() - } + + // payload will be discarded in pipe is full. + conn.writer.WriteMultiBuffer(buf.NewMultiBufferValue(b)) // nolint: errcheck if !existing { common.Must(w.checker.Start()) diff --git a/transport/pipe/impl.go b/transport/pipe/impl.go index 1d1035e6..be69dd53 100644 --- a/transport/pipe/impl.go +++ b/transport/pipe/impl.go @@ -22,12 +22,13 @@ const ( type pipe struct { sync.Mutex - data buf.MultiBuffer - readSignal *signal.Notifier - writeSignal *signal.Notifier - done *done.Instance - limit int32 - state state + data buf.MultiBuffer + readSignal *signal.Notifier + writeSignal *signal.Notifier + done *done.Instance + limit int32 + state state + discardOverflow bool } var errBufferFull = errors.New("buffer full") @@ -121,10 +122,14 @@ func (p *pipe) WriteMultiBuffer(mb buf.MultiBuffer) error { for { err := p.writeMultiBufferInternal(mb) - if err == nil { + switch { + case err == nil: p.readSignal.Signal() return nil - } else if err != errBufferFull { + case err == errBufferFull && p.discardOverflow: + mb.Release() + return nil + case err != errBufferFull: mb.Release() p.readSignal.Signal() return err diff --git a/transport/pipe/pipe.go b/transport/pipe/pipe.go index 7de6c665..a09ce7cc 100644 --- a/transport/pipe/pipe.go +++ b/transport/pipe/pipe.go @@ -11,18 +11,28 @@ import ( // Option for creating new Pipes. type Option func(*pipe) +// WithoutSizeLimit returns an Option for Pipe to have no size limit. func WithoutSizeLimit() Option { return func(p *pipe) { p.limit = -1 } } +// WithSizeLimit returns an Option for Pipe to have the given size limit. func WithSizeLimit(limit int32) Option { return func(p *pipe) { p.limit = limit } } +// DiscardOverflow returns an Option for Pipe to discard writes if full. +func DiscardOverflow() Option { + return func(p *pipe) { + p.discardOverflow = true + } +} + +// OptionsFromContext returns a list of Options from context. func OptionsFromContext(ctx context.Context) []Option { var opt []Option