diff --git a/common/protocol/raw/server.go b/common/protocol/raw/server.go index 78c8ae1b..7449030e 100644 --- a/common/protocol/raw/server.go +++ b/common/protocol/raw/server.go @@ -48,8 +48,8 @@ func (this *ServerSession) DecodeRequestHeader(reader io.Reader) (*protocol.Requ _, err := io.ReadFull(reader, buffer.Value[:protocol.IDBytesLen]) if err != nil { - log.Error("Raw: Failed to read request header: ", err) - return nil, err + log.Info("Raw: Failed to read request header: ", err) + return nil, io.EOF } user, timestamp, valid := this.userValidator.Get(buffer.Value[:protocol.IDBytesLen]) @@ -77,7 +77,7 @@ func (this *ServerSession) DecodeRequestHeader(reader io.Reader) (*protocol.Requ } if request.Version != Version { - log.Warning("Raw: Invalid protocol version ", request.Version) + log.Info("Raw: Invalid protocol version ", request.Version) return nil, protocol.ErrorInvalidVersion } diff --git a/proxy/freedom/freedom.go b/proxy/freedom/freedom.go index c697e6cc..f5710e48 100644 --- a/proxy/freedom/freedom.go +++ b/proxy/freedom/freedom.go @@ -15,7 +15,7 @@ import ( "github.com/v2ray/v2ray-core/common/retry" "github.com/v2ray/v2ray-core/proxy" "github.com/v2ray/v2ray-core/proxy/internal" - "github.com/v2ray/v2ray-core/transport/dialer" + "github.com/v2ray/v2ray-core/transport/hub" "github.com/v2ray/v2ray-core/transport/ray" ) @@ -77,7 +77,7 @@ func (this *FreedomConnection) Dispatch(destination v2net.Destination, payload * destination = this.ResolveIP(destination) } err := retry.Timed(5, 100).On(func() error { - rawConn, err := dialer.Dial(destination) + rawConn, err := hub.DialWithoutCache(destination) if err != nil { return err } diff --git a/proxy/vmess/inbound/inbound.go b/proxy/vmess/inbound/inbound.go index 6a708403..335841b6 100644 --- a/proxy/vmess/inbound/inbound.go +++ b/proxy/vmess/inbound/inbound.go @@ -1,6 +1,7 @@ package inbound import ( + "io" "sync" "github.com/v2ray/v2ray-core/app" @@ -124,7 +125,7 @@ func (this *VMessInboundHandler) Listen(address v2net.Address, port v2net.Port) func (this *VMessInboundHandler) HandleConnection(connection *hub.Connection) { defer connection.Close() - connReader := v2net.NewTimeOutReader(16, connection) + connReader := v2net.NewTimeOutReader(8, connection) defer connReader.Release() reader := v2io.NewBufferedReader(connReader) @@ -135,13 +136,19 @@ func (this *VMessInboundHandler) HandleConnection(connection *hub.Connection) { request, err := session.DecodeRequestHeader(reader) if err != nil { - log.Access(connection.RemoteAddr(), "", log.AccessRejected, err) - log.Warning("VMessIn: Invalid request from ", connection.RemoteAddr(), ": ", err) + if err != io.EOF { + log.Access(connection.RemoteAddr(), "", log.AccessRejected, err) + log.Warning("VMessIn: Invalid request from ", connection.RemoteAddr(), ": ", err) + } return } log.Access(connection.RemoteAddr(), request.Destination(), log.AccessAccepted, "") log.Debug("VMessIn: Received request for ", request.Destination()) + if request.Option.IsChunkStream() { + connection.SetReusable(true) + } + ray := this.packetDispatcher.DispatchToOutbound(request.Destination()) input := ray.InboundInput() output := ray.InboundOutput() diff --git a/proxy/vmess/outbound/outbound.go b/proxy/vmess/outbound/outbound.go index 2134874d..60870d9d 100644 --- a/proxy/vmess/outbound/outbound.go +++ b/proxy/vmess/outbound/outbound.go @@ -14,7 +14,7 @@ import ( "github.com/v2ray/v2ray-core/proxy" "github.com/v2ray/v2ray-core/proxy/internal" vmessio "github.com/v2ray/v2ray-core/proxy/vmess/io" - "github.com/v2ray/v2ray-core/transport/dialer" + "github.com/v2ray/v2ray-core/transport/hub" "github.com/v2ray/v2ray-core/transport/ray" ) @@ -41,7 +41,7 @@ func (this *VMessOutboundHandler) Dispatch(target v2net.Destination, payload *al Option: protocol.RequestOptionChunkStream, } - conn, err := dialer.Dial(destination) + conn, err := hub.Dial(destination) if err != nil { log.Error("Failed to open ", destination, ": ", err) return err @@ -49,6 +49,9 @@ func (this *VMessOutboundHandler) Dispatch(target v2net.Destination, payload *al log.Info("VMessOut: Tunneling request to ", request.Address, " via ", destination) defer conn.Close() + if request.Option.IsChunkStream() { + conn.SetReusable(true) + } input := ray.OutboundInput() output := ray.OutboundOutput() diff --git a/testing/scenarios/data/test_4_client.json b/testing/scenarios/data/test_4_client.json index 0c4801a7..70598ab0 100644 --- a/testing/scenarios/data/test_4_client.json +++ b/testing/scenarios/data/test_4_client.json @@ -1,6 +1,9 @@ { "port": 50030, "listen": "127.0.0.1", + "log": { + "loglevel": "debug" + }, "inbound": { "protocol": "dokodemo-door", "settings": { diff --git a/testing/scenarios/data/test_4_server.json b/testing/scenarios/data/test_4_server.json index 44891d0e..e723277c 100644 --- a/testing/scenarios/data/test_4_server.json +++ b/testing/scenarios/data/test_4_server.json @@ -2,7 +2,7 @@ "port": 50031, "listen": "127.0.0.1", "log": { - "loglevel": "warning" + "loglevel": "debug" }, "inbound": { "protocol": "vmess", diff --git a/transport/dialer/dialer.go b/transport/dialer/dialer.go deleted file mode 100644 index c0f46031..00000000 --- a/transport/dialer/dialer.go +++ /dev/null @@ -1,40 +0,0 @@ -package dialer - -import ( - "errors" - "net" - "time" - - v2net "github.com/v2ray/v2ray-core/common/net" -) - -var ( - ErrorInvalidHost = errors.New("Invalid Host.") -) - -func Dial(dest v2net.Destination) (net.Conn, error) { - if dest.Address().IsDomain() { - dialer := &net.Dialer{ - Timeout: time.Second * 60, - DualStack: true, - } - network := "tcp" - if dest.IsUDP() { - network = "udp" - } - return dialer.Dial(network, dest.NetAddr()) - } - - ip := dest.Address().IP() - if dest.IsTCP() { - return net.DialTCP("tcp", nil, &net.TCPAddr{ - IP: ip, - Port: int(dest.Port()), - }) - } else { - return net.DialUDP("udp", nil, &net.UDPAddr{ - IP: ip, - Port: int(dest.Port()), - }) - } -} diff --git a/transport/hub/connection.go b/transport/hub/connection.go index 0bfc11ad..47283cf4 100644 --- a/transport/hub/connection.go +++ b/transport/hub/connection.go @@ -7,9 +7,14 @@ import ( type ConnectionHandler func(*Connection) +type ConnectionManager interface { + Recycle(string, net.Conn) +} + type Connection struct { + dest string conn net.Conn - listener *TCPHub + listener ConnectionManager reusable bool } @@ -33,22 +38,12 @@ func (this *Connection) Close() error { return ErrorClosedConnection } if this.Reusable() { - this.listener.Recycle(this.conn) + this.listener.Recycle(this.dest, this.conn) return nil } return this.conn.Close() } -func (this *Connection) Release() { - if this == nil || this.listener == nil { - return - } - - this.Close() - this.conn = nil - this.listener = nil -} - func (this *Connection) LocalAddr() net.Addr { return this.conn.LocalAddr() } diff --git a/transport/hub/connection_cache.go b/transport/hub/connection_cache.go new file mode 100644 index 00000000..76524a20 --- /dev/null +++ b/transport/hub/connection_cache.go @@ -0,0 +1,107 @@ +package hub + +import ( + "net" + "sync" + "time" +) + +type AwaitingConnection struct { + conn net.Conn + expire time.Time +} + +func (this *AwaitingConnection) Expired() bool { + return this.expire.Before(time.Now()) +} + +type ConnectionCache struct { + sync.Mutex + cache map[string][]*AwaitingConnection +} + +func NewConnectionCache() *ConnectionCache { + c := &ConnectionCache{ + cache: make(map[string][]*AwaitingConnection), + } + go c.Cleanup() + return c +} + +func (this *ConnectionCache) Cleanup() { + for { + time.Sleep(time.Second * 4) + this.Lock() + for key, value := range this.cache { + size := len(value) + changed := false + for i := 0; i < size; { + if value[i].Expired() { + value[i].conn.Close() + value[i] = value[size-1] + size-- + changed = true + } else { + i++ + } + } + if changed { + for i := size; i < len(value); i++ { + value[i] = nil + } + value = value[:size] + this.cache[key] = value + } + } + this.Unlock() + } +} + +func (this *ConnectionCache) Recycle(dest string, conn net.Conn) { + this.Lock() + defer this.Unlock() + + aconn := &AwaitingConnection{ + conn: conn, + expire: time.Now().Add(time.Second * 4), + } + + var list []*AwaitingConnection + if v, found := this.cache[dest]; found { + v = append(v, aconn) + list = v + } else { + list = []*AwaitingConnection{aconn} + } + this.cache[dest] = list +} + +func FindFirstValid(list []*AwaitingConnection) int { + for idx, conn := range list { + if !conn.Expired() { + return idx + } + conn.conn.Close() + } + return -1 +} + +func (this *ConnectionCache) Get(dest string) net.Conn { + this.Lock() + defer this.Unlock() + + list, found := this.cache[dest] + if !found { + return nil + } + + firstValid := FindFirstValid(list) + if firstValid == -1 { + delete(this.cache, dest) + return nil + } + res := list[firstValid].conn + list = list[firstValid+1:] + this.cache[dest] = list + return res +} diff --git a/transport/hub/dialer.go b/transport/hub/dialer.go new file mode 100644 index 00000000..c841029d --- /dev/null +++ b/transport/hub/dialer.go @@ -0,0 +1,63 @@ +package hub + +import ( + "errors" + "net" + "time" + + "github.com/v2ray/v2ray-core/common/log" + v2net "github.com/v2ray/v2ray-core/common/net" +) + +var ( + ErrorInvalidHost = errors.New("Invalid Host.") + + globalCache = NewConnectionCache() +) + +func Dial(dest v2net.Destination) (*Connection, error) { + destStr := dest.String() + conn := globalCache.Get(destStr) + if conn == nil { + var err error + log.Debug("Hub: Dialling new connection to ", dest) + conn, err = DialWithoutCache(dest) + if err != nil { + return nil, err + } + } else { + log.Debug("Hub: Reusing connection to ", dest) + } + return &Connection{ + dest: destStr, + conn: conn, + listener: globalCache, + }, nil +} + +func DialWithoutCache(dest v2net.Destination) (net.Conn, error) { + if dest.Address().IsDomain() { + dialer := &net.Dialer{ + Timeout: time.Second * 60, + DualStack: true, + } + network := "tcp" + if dest.IsUDP() { + network = "udp" + } + return dialer.Dial(network, dest.NetAddr()) + } + + ip := dest.Address().IP() + if dest.IsTCP() { + return net.DialTCP("tcp", nil, &net.TCPAddr{ + IP: ip, + Port: int(dest.Port()), + }) + } + + return net.DialUDP("udp", nil, &net.UDPAddr{ + IP: ip, + Port: int(dest.Port()), + }) +} diff --git a/transport/dialer/dialer_test.go b/transport/hub/dialer_test.go similarity index 90% rename from transport/dialer/dialer_test.go rename to transport/hub/dialer_test.go index 66ef3cf3..cce772a0 100644 --- a/transport/dialer/dialer_test.go +++ b/transport/hub/dialer_test.go @@ -1,4 +1,4 @@ -package dialer_test +package hub_test import ( "testing" @@ -7,7 +7,7 @@ import ( v2nettesting "github.com/v2ray/v2ray-core/common/net/testing" "github.com/v2ray/v2ray-core/testing/assert" "github.com/v2ray/v2ray-core/testing/servers/tcp" - . "github.com/v2ray/v2ray-core/transport/dialer" + . "github.com/v2ray/v2ray-core/transport/hub" ) func TestDialDomain(t *testing.T) { diff --git a/transport/hub/tcp.go b/transport/hub/tcp.go index 778fe55e..49b59d00 100644 --- a/transport/hub/tcp.go +++ b/transport/hub/tcp.go @@ -49,24 +49,14 @@ func ListenTCP(address v2net.Address, port v2net.Port, callback ConnectionHandle } func (this *TCPHub) Close() { - this.Lock() - defer this.Unlock() - this.accepting = false this.listener.Close() - this.listener = nil } func (this *TCPHub) start() { this.accepting = true for this.accepting { - this.Lock() - if !this.accepting { - this.Unlock() - break - } conn, err := this.listener.Accept() - this.Unlock() if err != nil { if this.accepting { @@ -75,6 +65,7 @@ func (this *TCPHub) start() { continue } go this.connCallback(&Connection{ + dest: conn.RemoteAddr().String(), conn: conn, listener: this, }) @@ -82,9 +73,10 @@ func (this *TCPHub) start() { } // @Private -func (this *TCPHub) Recycle(conn net.Conn) { +func (this *TCPHub) Recycle(dest string, conn net.Conn) { if this.accepting { go this.connCallback(&Connection{ + dest: dest, conn: conn, listener: this, })