diff --git a/proxy/vmess/outbound/outbound.go b/proxy/vmess/outbound/outbound.go index 1917b7f8..7699996b 100644 --- a/proxy/vmess/outbound/outbound.go +++ b/proxy/vmess/outbound/outbound.go @@ -11,6 +11,7 @@ import ( v2net "github.com/v2ray/v2ray-core/common/net" "github.com/v2ray/v2ray-core/common/protocol" "github.com/v2ray/v2ray-core/common/protocol/raw" + "github.com/v2ray/v2ray-core/common/retry" "github.com/v2ray/v2ray-core/proxy" "github.com/v2ray/v2ray-core/proxy/internal" vmessio "github.com/v2ray/v2ray-core/proxy/vmess/io" @@ -28,7 +29,24 @@ func (this *VMessOutboundHandler) Dispatch(target v2net.Destination, payload *al defer ray.OutboundInput().Release() defer ray.OutboundOutput().Close() - destination, vNextUser := this.receiverManager.PickReceiver() + var rec *Receiver + var conn *hub.Connection + + err := retry.Timed(5, 100).On(func() error { + rec = this.receiverManager.PickReceiver() + rawConn, err := hub.Dial(this.meta.Address, rec.Destination) + if err != nil { + return err + } + conn = rawConn + + return nil + }) + if err != nil { + log.Error("Failed to find an available destination:", err) + return err + } + log.Info("VMessOut: Tunneling request to ", target, " via ", rec.Destination) command := protocol.RequestCommandTCP if target.IsUDP() { @@ -36,20 +54,13 @@ func (this *VMessOutboundHandler) Dispatch(target v2net.Destination, payload *al } request := &protocol.RequestHeader{ Version: raw.Version, - User: vNextUser, + User: rec.PickUser(), Command: command, Address: target.Address(), Port: target.Port(), Option: protocol.RequestOptionChunkStream, } - conn, err := hub.Dial(this.meta.Address, destination) - if err != nil { - log.Error("Failed to open ", destination, ": ", err) - return err - } - log.Info("VMessOut: Tunneling request to ", request.Address, " via ", destination) - defer conn.Close() if transport.IsConnectionReusable() { @@ -67,7 +78,7 @@ func (this *VMessOutboundHandler) Dispatch(target v2net.Destination, payload *al session := raw.NewClientSession(protocol.DefaultIDHash) go this.handleRequest(session, conn, request, payload, input, &requestFinish) - go this.handleResponse(session, conn, request, destination, output, &responseFinish) + go this.handleResponse(session, conn, request, rec.Destination, output, &responseFinish) requestFinish.Lock() responseFinish.Lock() @@ -86,7 +97,9 @@ func (this *VMessOutboundHandler) handleRequest(session *raw.ClientSession, conn if request.Option.Has(protocol.RequestOptionChunkStream) { streamWriter = vmessio.NewAuthChunkWriter(streamWriter) } - streamWriter.Write(payload) + if err := streamWriter.Write(payload); err != nil { + conn.SetReusable(false) + } writer.SetCached(false) err := v2io.Pipe(input, streamWriter) @@ -112,6 +125,7 @@ func (this *VMessOutboundHandler) handleResponse(session *raw.ClientSession, con header, err := session.DecodeResponseHeader(reader) if err != nil { + conn.SetReusable(false) log.Warning("VMessOut: Failed to read response: ", err) return } diff --git a/proxy/vmess/outbound/receiver.go b/proxy/vmess/outbound/receiver.go index 78a960b6..d4689944 100644 --- a/proxy/vmess/outbound/receiver.go +++ b/proxy/vmess/outbound/receiver.go @@ -127,12 +127,10 @@ func (this *ReceiverManager) pickStdReceiver() *Receiver { return this.receivers[dice.Roll(len(this.receivers))] } -func (this *ReceiverManager) PickReceiver() (v2net.Destination, *protocol.User) { +func (this *ReceiverManager) PickReceiver() *Receiver { rec := this.pickDetour() if rec == nil { rec = this.pickStdReceiver() } - user := rec.PickUser() - - return rec.Destination, user + return rec }