diff --git a/common/mux/client.go b/common/mux/client.go index 5cb58191..7940c1aa 100644 --- a/common/mux/client.go +++ b/common/mux/client.go @@ -35,7 +35,7 @@ func (m *ClientManager) Dispatch(ctx context.Context, link *transport.Link) erro } } - return newError("unable to find an available mux client") + return newError("unable to find an available mux client").AtWarning() } type WorkerPicker interface { @@ -274,7 +274,7 @@ func (m *ClientWorker) IsClosing() bool { } func (m *ClientWorker) IsFull() bool { - if m.IsClosing() { + if m.IsClosing() || m.Closed() { return true } diff --git a/common/mux/client_test.go b/common/mux/client_test.go index 898ffcc2..b39b2c6c 100644 --- a/common/mux/client_test.go +++ b/common/mux/client_test.go @@ -9,6 +9,8 @@ import ( "v2ray.com/core/common" "v2ray.com/core/common/errors" "v2ray.com/core/common/mux" + "v2ray.com/core/common/net" + "v2ray.com/core/common/session" "v2ray.com/core/testing/mocks" "v2ray.com/core/transport" "v2ray.com/core/transport/pipe" @@ -45,3 +47,70 @@ func TestClientWorkerEOF(t *testing.T) { t.Error("expected failed dispatching, but actually not") } } + +func TestClientWorkerClose(t *testing.T) { + mockCtl := gomock.NewController(t) + defer mockCtl.Finish() + + r1, w1 := pipe.New(pipe.WithoutSizeLimit()) + worker1, err := mux.NewClientWorker(transport.Link{ + Reader: r1, + Writer: w1, + }, mux.ClientStrategy{ + MaxConcurrency: 4, + MaxConnection: 4, + }) + common.Must(err) + + r2, w2 := pipe.New(pipe.WithoutSizeLimit()) + worker2, err := mux.NewClientWorker(transport.Link{ + Reader: r2, + Writer: w2, + }, mux.ClientStrategy{ + MaxConcurrency: 4, + MaxConnection: 4, + }) + common.Must(err) + + factory := mocks.NewMuxClientWorkerFactory(mockCtl) + gomock.InOrder( + factory.EXPECT().Create().Return(worker1, nil), + factory.EXPECT().Create().Return(worker2, nil), + ) + + picker := &mux.IncrementalWorkerPicker{ + Factory: factory, + } + manager := &mux.ClientManager{ + Picker: picker, + } + + tr1, tw1 := pipe.New(pipe.WithoutSizeLimit()) + ctx1 := session.ContextWithOutbound(context.Background(), &session.Outbound{ + Target: net.TCPDestination(net.DomainAddress("www.v2ray.com"), 80), + }) + common.Must(manager.Dispatch(ctx1, &transport.Link{ + Reader: tr1, + Writer: tw1, + })) + defer tw1.Close() + + common.Must(w1.Close()) + + time.Sleep(time.Millisecond * 500) + if !worker1.Closed() { + t.Error("worker1 is not finished") + } + + tr2, tw2 := pipe.New(pipe.WithoutSizeLimit()) + ctx2 := session.ContextWithOutbound(context.Background(), &session.Outbound{ + Target: net.TCPDestination(net.DomainAddress("www.v2ray.com"), 80), + }) + common.Must(manager.Dispatch(ctx2, &transport.Link{ + Reader: tr2, + Writer: tw2, + })) + defer tw2.Close() + + common.Must(w2.Close()) +}