From a9d583b92f7c115ab4c516398cd402e19dbe4201 Mon Sep 17 00:00:00 2001 From: v2ray Date: Sun, 14 Aug 2016 17:08:01 +0200 Subject: [PATCH] connection session --- app/dispatcher/dispatcher.go | 3 +-- app/dispatcher/impl/default.go | 3 ++- app/dispatcher/testing/dispatcher.go | 6 +++--- app/dns/nameserver.go | 2 +- common/net/destination.go | 12 ++++++++++++ proxy/dokodemo/dokodemo.go | 9 +++++++-- proxy/http/server.go | 16 ++++++++++------ proxy/proxy.go | 7 +++++++ proxy/shadowsocks/server.go | 8 ++++++-- proxy/socks/server.go | 23 ++++++++++++++++------- proxy/socks/server_udp.go | 3 ++- proxy/testing/mocks/inboundhandler.go | 5 ++++- proxy/vmess/inbound/inbound.go | 6 +++++- transport/internet/udp/udp_server.go | 8 ++++++-- 14 files changed, 82 insertions(+), 29 deletions(-) diff --git a/app/dispatcher/dispatcher.go b/app/dispatcher/dispatcher.go index 0447b6a3..7af4c2f4 100644 --- a/app/dispatcher/dispatcher.go +++ b/app/dispatcher/dispatcher.go @@ -2,7 +2,6 @@ package dispatcher import ( "github.com/v2ray/v2ray-core/app" - v2net "github.com/v2ray/v2ray-core/common/net" "github.com/v2ray/v2ray-core/proxy" "github.com/v2ray/v2ray-core/transport/ray" ) @@ -13,5 +12,5 @@ const ( // PacketDispatcher dispatch a packet and possibly further network payload to its destination. type PacketDispatcher interface { - DispatchToOutbound(meta *proxy.InboundHandlerMeta, destination v2net.Destination) ray.InboundRay + DispatchToOutbound(meta *proxy.InboundHandlerMeta, session *proxy.SessionInfo) ray.InboundRay } diff --git a/app/dispatcher/impl/default.go b/app/dispatcher/impl/default.go index cc823d5d..5d601b20 100644 --- a/app/dispatcher/impl/default.go +++ b/app/dispatcher/impl/default.go @@ -43,9 +43,10 @@ func (this *DefaultDispatcher) Release() { } -func (this *DefaultDispatcher) DispatchToOutbound(meta *proxy.InboundHandlerMeta, destination v2net.Destination) ray.InboundRay { +func (this *DefaultDispatcher) DispatchToOutbound(meta *proxy.InboundHandlerMeta, session *proxy.SessionInfo) ray.InboundRay { direct := ray.NewRay() dispatcher := this.ohm.GetDefaultHandler() + destination := session.Destination if this.router != nil { if tag, err := this.router.TakeDetour(destination); err == nil { diff --git a/app/dispatcher/testing/dispatcher.go b/app/dispatcher/testing/dispatcher.go index e9bf69cd..7ff42da7 100644 --- a/app/dispatcher/testing/dispatcher.go +++ b/app/dispatcher/testing/dispatcher.go @@ -30,10 +30,10 @@ func NewTestPacketDispatcher(handler func(destination v2net.Destination, traffic } } -func (this *TestPacketDispatcher) DispatchToOutbound(meta *proxy.InboundHandlerMeta, destination v2net.Destination) ray.InboundRay { +func (this *TestPacketDispatcher) DispatchToOutbound(meta *proxy.InboundHandlerMeta, session *proxy.SessionInfo) ray.InboundRay { traffic := ray.NewRay() - this.Destination <- destination - go this.Handler(destination, traffic) + this.Destination <- session.Destination + go this.Handler(session.Destination, traffic) return traffic } diff --git a/app/dns/nameserver.go b/app/dns/nameserver.go index 475f729d..9fbe2c8b 100644 --- a/app/dns/nameserver.go +++ b/app/dns/nameserver.go @@ -165,7 +165,7 @@ func (this *UDPNameServer) BuildQueryA(domain string, id uint16) *alloc.Buffer { } func (this *UDPNameServer) DispatchQuery(payload *alloc.Buffer) { - this.udpServer.Dispatch(pseudoDestination, this.address, payload, this.HandleResponse) + this.udpServer.Dispatch(&proxy.SessionInfo{Source: pseudoDestination, Destination: this.address}, payload, this.HandleResponse) } func (this *UDPNameServer) QueryA(domain string) <-chan *ARecord { diff --git a/common/net/destination.go b/common/net/destination.go index 0641bc7b..13adb8da 100644 --- a/common/net/destination.go +++ b/common/net/destination.go @@ -1,5 +1,9 @@ package net +import ( + "net" +) + // Destination represents a network destination including address and protocol (tcp / udp). type Destination interface { Network() Network // Protocol of communication (tcp / udp) @@ -13,11 +17,19 @@ type Destination interface { IsUDP() bool // True if destination is reachable via UDP } +func TCPDestinationFromAddr(addr *net.TCPAddr) Destination { + return TCPDestination(IPAddress(addr.IP), Port(addr.Port)) +} + // TCPDestination creates a TCP destination with given address func TCPDestination(address Address, port Port) Destination { return &tcpDestination{address: address, port: port} } +func UDPDestinationFromAddr(addr *net.UDPAddr) Destination { + return UDPDestination(IPAddress(addr.IP), Port(addr.Port)) +} + // UDPDestination creates a UDP destination with given address func UDPDestination(address Address, port Port) Destination { return &udpDestination{address: address, port: port} diff --git a/proxy/dokodemo/dokodemo.go b/proxy/dokodemo/dokodemo.go index db60fbf2..e39cf5b3 100644 --- a/proxy/dokodemo/dokodemo.go +++ b/proxy/dokodemo/dokodemo.go @@ -1,6 +1,7 @@ package dokodemo import ( + "net" "sync" "github.com/v2ray/v2ray-core/app" @@ -102,7 +103,8 @@ func (this *DokodemoDoor) ListenUDP() error { } func (this *DokodemoDoor) handleUDPPackets(payload *alloc.Buffer, dest v2net.Destination) { - this.udpServer.Dispatch(dest, v2net.UDPDestination(this.address, this.port), payload, this.handleUDPResponse) + this.udpServer.Dispatch( + &proxy.SessionInfo{Source: dest, Destination: v2net.UDPDestination(this.address, this.port)}, payload, this.handleUDPResponse) } func (this *DokodemoDoor) handleUDPResponse(dest v2net.Destination, payload *alloc.Buffer) { @@ -148,7 +150,10 @@ func (this *DokodemoDoor) HandleTCPConnection(conn internet.Connection) { } log.Info("Dokodemo: Handling request to ", dest) - ray := this.packetDispatcher.DispatchToOutbound(this.meta, dest) + ray := this.packetDispatcher.DispatchToOutbound(this.meta, &proxy.SessionInfo{ + Source: v2net.TCPDestinationFromAddr(conn.RemoteAddr().(*net.TCPAddr)), + Destination: dest, + }) defer ray.InboundOutput().Release() var inputFinish, outputFinish sync.Mutex diff --git a/proxy/http/server.go b/proxy/http/server.go index 008b092c..1b3699b1 100644 --- a/proxy/http/server.go +++ b/proxy/http/server.go @@ -119,14 +119,18 @@ func (this *Server) handleConnection(conn internet.Connection) { return } log.Access(conn.RemoteAddr(), request.URL, log.AccessAccepted, "") + session := &proxy.SessionInfo{ + Source: v2net.TCPDestinationFromAddr(conn.RemoteAddr().(*net.TCPAddr)), + Destination: dest, + } if strings.ToUpper(request.Method) == "CONNECT" { - this.handleConnect(request, dest, reader, conn) + this.handleConnect(request, session, reader, conn) } else { - this.handlePlainHTTP(request, dest, reader, conn) + this.handlePlainHTTP(request, session, reader, conn) } } -func (this *Server) handleConnect(request *http.Request, destination v2net.Destination, reader io.Reader, writer io.Writer) { +func (this *Server) handleConnect(request *http.Request, session *proxy.SessionInfo, reader io.Reader, writer io.Writer) { response := &http.Response{ Status: "200 OK", StatusCode: 200, @@ -140,7 +144,7 @@ func (this *Server) handleConnect(request *http.Request, destination v2net.Desti } response.Write(writer) - ray := this.packetDispatcher.DispatchToOutbound(this.meta, destination) + ray := this.packetDispatcher.DispatchToOutbound(this.meta, session) this.transport(reader, writer, ray) } @@ -209,7 +213,7 @@ func (this *Server) GenerateResponse(statusCode int, status string) *http.Respon } } -func (this *Server) handlePlainHTTP(request *http.Request, dest v2net.Destination, reader *bufio.Reader, writer io.Writer) { +func (this *Server) handlePlainHTTP(request *http.Request, session *proxy.SessionInfo, reader *bufio.Reader, writer io.Writer) { if len(request.URL.Host) <= 0 { response := this.GenerateResponse(400, "Bad Request") response.Write(writer) @@ -220,7 +224,7 @@ func (this *Server) handlePlainHTTP(request *http.Request, dest v2net.Destinatio request.Host = request.URL.Host StripHopByHopHeaders(request) - ray := this.packetDispatcher.DispatchToOutbound(this.meta, dest) + ray := this.packetDispatcher.DispatchToOutbound(this.meta, session) defer ray.InboundInput().Close() defer ray.InboundOutput().Release() diff --git a/proxy/proxy.go b/proxy/proxy.go index 978a7a33..d6cce056 100644 --- a/proxy/proxy.go +++ b/proxy/proxy.go @@ -4,6 +4,7 @@ package proxy // import "github.com/v2ray/v2ray-core/proxy" import ( "github.com/v2ray/v2ray-core/common/alloc" v2net "github.com/v2ray/v2ray-core/common/net" + "github.com/v2ray/v2ray-core/common/protocol" "github.com/v2ray/v2ray-core/transport/internet" "github.com/v2ray/v2ray-core/transport/ray" ) @@ -15,6 +16,12 @@ const ( HandlerStateRunning = HandlerState(1) ) +type SessionInfo struct { + Source v2net.Destination + Destination v2net.Destination + User *protocol.User +} + type InboundHandlerMeta struct { Tag string Address v2net.Address diff --git a/proxy/shadowsocks/server.go b/proxy/shadowsocks/server.go index d54ff250..2593a6b5 100644 --- a/proxy/shadowsocks/server.go +++ b/proxy/shadowsocks/server.go @@ -4,6 +4,7 @@ package shadowsocks import ( "crypto/rand" "io" + "net" "sync" "github.com/v2ray/v2ray-core/app" @@ -114,7 +115,7 @@ func (this *Server) handlerUDPPayload(payload *alloc.Buffer, source v2net.Destin log.Access(source, dest, log.AccessAccepted, "") log.Info("Shadowsocks: Tunnelling request to ", dest) - this.udpServer.Dispatch(source, dest, request.DetachUDPPayload(), func(destination v2net.Destination, payload *alloc.Buffer) { + this.udpServer.Dispatch(&proxy.SessionInfo{Source: source, Destination: dest}, request.DetachUDPPayload(), func(destination v2net.Destination, payload *alloc.Buffer) { defer payload.Release() response := alloc.NewBuffer().Slice(0, ivLen) @@ -204,7 +205,10 @@ func (this *Server) handleConnection(conn internet.Connection) { log.Access(conn.RemoteAddr(), dest, log.AccessAccepted, "") log.Info("Shadowsocks: Tunnelling request to ", dest) - ray := this.packetDispatcher.DispatchToOutbound(this.meta, dest) + ray := this.packetDispatcher.DispatchToOutbound(this.meta, &proxy.SessionInfo{ + Source: v2net.TCPDestinationFromAddr(conn.RemoteAddr().(*net.TCPAddr)), + Destination: dest, + }) defer ray.InboundOutput().Release() var writeFinish sync.Mutex diff --git a/proxy/socks/server.go b/proxy/socks/server.go index 4bb01d57..733dcbd1 100644 --- a/proxy/socks/server.go +++ b/proxy/socks/server.go @@ -3,6 +3,7 @@ package socks import ( "errors" "io" + "net" "sync" "time" @@ -119,7 +120,7 @@ func (this *Server) handleConnection(connection internet.Connection) { return } - clientAddr := connection.RemoteAddr().String() + clientAddr := v2net.TCPDestinationFromAddr(connection.RemoteAddr().(*net.TCPAddr)) if err != nil && err == protocol.Socks4Downgrade { this.handleSocks4(clientAddr, reader, writer, auth4) } else { @@ -127,7 +128,7 @@ func (this *Server) handleConnection(connection internet.Connection) { } } -func (this *Server) handleSocks5(clientAddr string, reader *v2io.BufferedReader, writer *v2io.BufferedWriter, auth protocol.Socks5AuthenticationRequest) error { +func (this *Server) handleSocks5(clientAddr v2net.Destination, reader *v2io.BufferedReader, writer *v2io.BufferedWriter, auth protocol.Socks5AuthenticationRequest) error { expectedAuthMethod := protocol.AuthNotRequired if this.config.AuthType == AuthTypePassword { expectedAuthMethod = protocol.AuthUserPass @@ -219,10 +220,14 @@ func (this *Server) handleSocks5(clientAddr string, reader *v2io.BufferedReader, writer.SetCached(false) dest := request.Destination() + session := &proxy.SessionInfo{ + Source: clientAddr, + Destination: dest, + } log.Info("Socks: TCP Connect request to ", dest) log.Access(clientAddr, dest, log.AccessAccepted, "") - this.transport(reader, writer, dest) + this.transport(reader, writer, session) return nil } @@ -258,7 +263,7 @@ func (this *Server) handleUDP(reader io.Reader, writer *v2io.BufferedWriter) err return nil } -func (this *Server) handleSocks4(clientAddr string, reader *v2io.BufferedReader, writer *v2io.BufferedWriter, auth protocol.Socks4AuthenticationRequest) error { +func (this *Server) handleSocks4(clientAddr v2net.Destination, reader *v2io.BufferedReader, writer *v2io.BufferedWriter, auth protocol.Socks4AuthenticationRequest) error { result := protocol.Socks4RequestGranted if auth.Command == protocol.CmdBind { result = protocol.Socks4RequestRejected @@ -277,13 +282,17 @@ func (this *Server) handleSocks4(clientAddr string, reader *v2io.BufferedReader, writer.SetCached(false) dest := v2net.TCPDestination(v2net.IPAddress(auth.IP[:]), auth.Port) + session := &proxy.SessionInfo{ + Source: clientAddr, + Destination: dest, + } log.Access(clientAddr, dest, log.AccessAccepted, "") - this.transport(reader, writer, dest) + this.transport(reader, writer, session) return nil } -func (this *Server) transport(reader io.Reader, writer io.Writer, destination v2net.Destination) { - ray := this.packetDispatcher.DispatchToOutbound(this.meta, destination) +func (this *Server) transport(reader io.Reader, writer io.Writer, session *proxy.SessionInfo) { + ray := this.packetDispatcher.DispatchToOutbound(this.meta, session) input := ray.InboundInput() output := ray.InboundOutput() diff --git a/proxy/socks/server_udp.go b/proxy/socks/server_udp.go index 78f1beef..d25eaf9d 100644 --- a/proxy/socks/server_udp.go +++ b/proxy/socks/server_udp.go @@ -4,6 +4,7 @@ import ( "github.com/v2ray/v2ray-core/common/alloc" "github.com/v2ray/v2ray-core/common/log" v2net "github.com/v2ray/v2ray-core/common/net" + "github.com/v2ray/v2ray-core/proxy" "github.com/v2ray/v2ray-core/proxy/socks/protocol" "github.com/v2ray/v2ray-core/transport/internet/udp" ) @@ -44,7 +45,7 @@ func (this *Server) handleUDPPayload(payload *alloc.Buffer, source v2net.Destina log.Info("Socks: Send packet to ", request.Destination(), " with ", request.Data.Len(), " bytes") log.Access(source, request.Destination, log.AccessAccepted, "") - this.udpServer.Dispatch(source, request.Destination(), request.Data, func(destination v2net.Destination, payload *alloc.Buffer) { + this.udpServer.Dispatch(&proxy.SessionInfo{Source: source, Destination: request.Destination()}, request.Data, func(destination v2net.Destination, payload *alloc.Buffer) { response := &protocol.Socks5UDPRequest{ Fragment: 0, Address: request.Destination().Address(), diff --git a/proxy/testing/mocks/inboundhandler.go b/proxy/testing/mocks/inboundhandler.go index e09da586..bdbc83b3 100644 --- a/proxy/testing/mocks/inboundhandler.go +++ b/proxy/testing/mocks/inboundhandler.go @@ -33,7 +33,10 @@ func (this *InboundConnectionHandler) Close() { func (this *InboundConnectionHandler) Communicate(destination v2net.Destination) error { ray := this.PacketDispatcher.DispatchToOutbound(&proxy.InboundHandlerMeta{ AllowPassiveConnection: false, - }, destination) + }, &proxy.SessionInfo{ + Source: v2net.TCPDestination(v2net.LocalHostIP, v2net.Port(0)), + Destination: destination, + }) input := ray.InboundInput() output := ray.InboundOutput() diff --git a/proxy/vmess/inbound/inbound.go b/proxy/vmess/inbound/inbound.go index 1e201343..7dd2b366 100644 --- a/proxy/vmess/inbound/inbound.go +++ b/proxy/vmess/inbound/inbound.go @@ -2,6 +2,7 @@ package inbound import ( "io" + "net" "sync" "github.com/v2ray/v2ray-core/app" @@ -163,7 +164,10 @@ func (this *VMessInboundHandler) HandleConnection(connection internet.Connection connection.SetReusable(request.Option.Has(protocol.RequestOptionConnectionReuse)) - ray := this.packetDispatcher.DispatchToOutbound(this.meta, request.Destination()) + ray := this.packetDispatcher.DispatchToOutbound(this.meta, &proxy.SessionInfo{ + Source: v2net.TCPDestinationFromAddr(connection.RemoteAddr().(*net.TCPAddr)), + Destination: request.Destination(), + }) input := ray.InboundInput() output := ray.InboundOutput() defer input.Close() diff --git a/transport/internet/udp/udp_server.go b/transport/internet/udp/udp_server.go index d8c64def..618540f8 100644 --- a/transport/internet/udp/udp_server.go +++ b/transport/internet/udp/udp_server.go @@ -132,7 +132,11 @@ func (this *UDPServer) locateExistingAndDispatch(name string, payload *alloc.Buf return false } -func (this *UDPServer) Dispatch(source v2net.Destination, destination v2net.Destination, payload *alloc.Buffer, callback UDPResponseCallback) { +func (this *UDPServer) Dispatch(session *proxy.SessionInfo, payload *alloc.Buffer, callback UDPResponseCallback) { + source := session.Source + destination := session.Destination + + // TODO: Add user to destString destString := source.String() + "-" + destination.String() log.Debug("UDP Server: Dispatch request: ", destString) if this.locateExistingAndDispatch(destString, payload) { @@ -140,7 +144,7 @@ func (this *UDPServer) Dispatch(source v2net.Destination, destination v2net.Dest } log.Info("UDP Server: establishing new connection for ", destString) - inboundRay := this.packetDispatcher.DispatchToOutbound(this.meta, destination) + inboundRay := this.packetDispatcher.DispatchToOutbound(this.meta, session) timedInboundRay := NewTimedInboundRay(destString, inboundRay, this) outputStream := timedInboundRay.InboundInput() if outputStream != nil {