diff --git a/transport/internet/udp/hub.go b/transport/internet/udp/hub.go index d81e6392..628549ef 100644 --- a/transport/internet/udp/hub.go +++ b/transport/internet/udp/hub.go @@ -1,14 +1,13 @@ package udp import ( + "context" "net" - "sync" "v2ray.com/core/app/log" "v2ray.com/core/common/buf" "v2ray.com/core/common/dice" v2net "v2ray.com/core/common/net" - "v2ray.com/core/common/signal" "v2ray.com/core/transport/internet/internal" ) @@ -76,9 +75,8 @@ type ListenOption struct { } type Hub struct { - sync.RWMutex conn *net.UDPConn - cancel *signal.CancelSignal + cancel context.CancelFunc queue *PayloadQueue option ListenOption } @@ -107,24 +105,20 @@ func ListenUDP(address v2net.Address, port v2net.Port, option ListenOption) (*Hu return nil, err } } + ctx, cancel := context.WithCancel(context.Background()) hub := &Hub{ conn: udpConn, queue: NewPayloadQueue(option), option: option, - cancel: signal.NewCloseSignal(), + cancel: cancel, } - go hub.start() + go hub.start(ctx) return hub, nil } func (v *Hub) Close() { - v.Lock() - defer v.Unlock() - - v.cancel.Cancel() + v.cancel() v.conn.Close() - v.cancel.WaitForDone() - v.queue.Close() } func (v *Hub) WriteTo(payload []byte, dest v2net.Destination) (int, error) { @@ -134,12 +128,9 @@ func (v *Hub) WriteTo(payload []byte, dest v2net.Destination) (int, error) { }) } -func (v *Hub) start() { - v.cancel.WaitThread() - defer v.cancel.FinishThread() - +func (v *Hub) start(ctx context.Context) { oobBytes := make([]byte, 256) - for v.Running() { + for range ctx.Done() { buffer := buf.NewSmall() var noob int var addr *net.UDPAddr @@ -165,13 +156,10 @@ func (v *Hub) start() { } v.queue.Enqueue(payload) } + v.queue.Close() } -func (v *Hub) Running() bool { - return !v.cancel.Cancelled() -} - -// Connection return the net.Conn underneath this hub. +// Connection returns the net.Conn underneath this hub. // Private: Visible for testing only func (v *Hub) Connection() net.Conn { return v.conn