From e12e5a0ecb16bce41c3b38cbdb2cb274e68ef870 Mon Sep 17 00:00:00 2001 From: v2ray Date: Tue, 15 Dec 2015 16:00:47 +0100 Subject: [PATCH] barely working http proxy --- proxy/freedom/freedom.go | 6 +- proxy/freedom/freedom_test.go | 5 +- proxy/http/http.go | 102 +++++++++++++++++++++++----- proxy/http/http_factory.go | 17 +++++ release/server/main.go | 2 + testing/scenarios/dokodemo_test.go | 1 + testing/scenarios/router_test.go | 2 + testing/scenarios/socks_end_test.go | 5 +- testing/servers/tcp/tcp.go | 23 ++++--- testing/servers/udp/udp.go | 7 ++ 10 files changed, 139 insertions(+), 31 deletions(-) create mode 100644 proxy/http/http_factory.go diff --git a/proxy/freedom/freedom.go b/proxy/freedom/freedom.go index 9a235eca..e9f29035 100644 --- a/proxy/freedom/freedom.go +++ b/proxy/freedom/freedom.go @@ -78,9 +78,9 @@ func (this *FreedomConnection) Dispatch(firstPacket v2net.Packet, ray ray.Outbou } writeMutex.Lock() - if tcpConn, ok := conn.(*net.TCPConn); ok { - tcpConn.CloseWrite() - } + //if tcpConn, ok := conn.(*net.TCPConn); ok { + // tcpConn.CloseWrite() + //} readMutex.Lock() conn.Close() diff --git a/proxy/freedom/freedom_test.go b/proxy/freedom/freedom_test.go index 30ea273c..0c46db98 100644 --- a/proxy/freedom/freedom_test.go +++ b/proxy/freedom/freedom_test.go @@ -3,7 +3,6 @@ package freedom import ( "bytes" "fmt" - "io/ioutil" "net" "testing" @@ -127,9 +126,9 @@ func TestSocksTcpConnect(t *testing.T) { tcpConn.CloseWrite() } - dataReturned, err := ioutil.ReadAll(conn) + dataReturned, err := v2net.ReadFrom(conn, nil) assert.Error(err).IsNil() conn.Close() - assert.Bytes(dataReturned).Equals([]byte("Processed: Data to be sent to remote")) + assert.Bytes(dataReturned.Value).Equals([]byte("Processed: Data to be sent to remote")) } diff --git a/proxy/http/http.go b/proxy/http/http.go index b4c94b4a..00a24463 100644 --- a/proxy/http/http.go +++ b/proxy/http/http.go @@ -2,15 +2,18 @@ package http import ( "bufio" + "io" "net" "net/http" "strconv" "strings" + "sync" "github.com/v2ray/v2ray-core/app" "github.com/v2ray/v2ray-core/common/alloc" "github.com/v2ray/v2ray-core/common/log" v2net "github.com/v2ray/v2ray-core/common/net" + "github.com/v2ray/v2ray-core/transport/ray" ) type HttpProxyServer struct { @@ -50,22 +53,23 @@ func (this *HttpProxyServer) accept(listener *net.TCPListener) { } } -func parseHost(rawHost string) (v2net.Address, error) { - port := v2net.Port(80) +func parseHost(rawHost string, defaultPort v2net.Port) (v2net.Address, error) { + port := defaultPort host, rawPort, err := net.SplitHostPort(rawHost) if err != nil { if addrError, ok := err.(*net.AddrError); ok && strings.Contains(addrError.Err, "missing port") { host = rawHost - port = v2net.Port(80) } else { return nil, err } + } else { + intPort, err := strconv.Atoi(rawPort) + if err != nil { + return nil, err + } + port = v2net.Port(intPort) } - intPort, err := strconv.Atoi(rawPort) - if err != nil { - return nil, err - } - port = v2net.Port(intPort) + if ip := net.ParseIP(host); ip != nil { return v2net.IPAddress(ip, port), nil } @@ -73,14 +77,25 @@ func parseHost(rawHost string) (v2net.Address, error) { } func (this *HttpProxyServer) handleConnection(conn *net.TCPConn) { - httpReader := bufio.NewReader(conn) - request, err := http.ReadRequest(httpReader) - if err != nil { - log.Warning("Malformed HTTP request: %v", err) - return + defer conn.Close() + reader := bufio.NewReader(conn) + for true { + request, err := http.ReadRequest(reader) + if err != nil { + break + } + this.handleRequest(request, reader, conn) + } +} + +func (this *HttpProxyServer) handleRequest(request *http.Request, reader io.Reader, writer io.Writer) { + log.Info("Request to Method [%s] Host [%s] with URL [%s]", request.Method, request.Host, request.URL.String()) + defaultPort := v2net.Port(80) + if strings.ToLower(request.URL.Scheme) == "https" { + defaultPort = v2net.Port(443) } if strings.ToUpper(request.Method) == "CONNECT" { - address, err := parseHost(request.Host) + address, err := parseHost(request.Host, defaultPort) if err != nil { log.Warning("Malformed proxy host: %v", err) return @@ -99,9 +114,64 @@ func (this *HttpProxyServer) handleConnection(conn *net.TCPConn) { buffer := alloc.NewSmallBuffer().Clear() response.Write(buffer) - conn.Write(buffer.Value) + writer.Write(buffer.Value) packet := v2net.NewPacket(v2net.NewTCPDestination(address), nil, true) - this.space.PacketDispatcher().DispatchToOutbound(packet) + ray := this.space.PacketDispatcher().DispatchToOutbound(packet) + this.transport(reader, writer, ray) + } else if len(request.URL.Host) > 0 { + address, err := parseHost(request.URL.Host, defaultPort) + if err != nil { + log.Warning("Malformed proxy host: %v", err) + return + } + request.Host = request.URL.Host + request.Header.Set("Connection", "keep-alive") + request.Header.Del("Proxy-Connection") + buffer := alloc.NewBuffer().Clear() + request.Write(buffer) + log.Info("Request to remote: %s", string(buffer.Value)) + packet := v2net.NewPacket(v2net.NewTCPDestination(address), buffer, false) + ray := this.space.PacketDispatcher().DispatchToOutbound(packet) + this.transport(nil, writer, ray) + } else { + response := &http.Response{ + Status: "400 Bad Request", + StatusCode: 400, + Proto: "HTTP/1.1", + ProtoMajor: 1, + ProtoMinor: 1, + Header: http.Header(make(map[string][]string)), + Body: nil, + ContentLength: 0, + Close: false, + } + + buffer := alloc.NewSmallBuffer().Clear() + response.Write(buffer) + writer.Write(buffer.Value) } } + +func (this *HttpProxyServer) transport(input io.Reader, output io.Writer, ray ray.InboundRay) { + var inputFinish, outputFinish sync.Mutex + inputFinish.Lock() + outputFinish.Lock() + + if input != nil { + go func() { + v2net.ReaderToChan(ray.InboundInput(), input) + inputFinish.Unlock() + close(ray.InboundInput()) + }() + } else { + close(ray.InboundInput()) + } + + go func() { + v2net.ChanToWriter(output, ray.InboundOutput()) + outputFinish.Unlock() + }() + + outputFinish.Lock() +} diff --git a/proxy/http/http_factory.go b/proxy/http/http_factory.go new file mode 100644 index 00000000..6853933e --- /dev/null +++ b/proxy/http/http_factory.go @@ -0,0 +1,17 @@ +package http + +import ( + "github.com/v2ray/v2ray-core/app" + "github.com/v2ray/v2ray-core/proxy/common/connhandler" +) + +type HttpProxyServerFactory struct { +} + +func (this HttpProxyServerFactory) Create(space app.Space, rawConfig interface{}) (connhandler.InboundConnectionHandler, error) { + return NewHttpProxyServer(space, rawConfig.(Config)), nil +} + +func init() { + connhandler.RegisterInboundConnectionHandlerFactory("http", HttpProxyServerFactory{}) +} diff --git a/release/server/main.go b/release/server/main.go index 2785c82a..f837ff79 100644 --- a/release/server/main.go +++ b/release/server/main.go @@ -21,6 +21,8 @@ import ( _ "github.com/v2ray/v2ray-core/proxy/dokodemo/json" _ "github.com/v2ray/v2ray-core/proxy/freedom" _ "github.com/v2ray/v2ray-core/proxy/freedom/json" + _ "github.com/v2ray/v2ray-core/proxy/http" + _ "github.com/v2ray/v2ray-core/proxy/http/json" _ "github.com/v2ray/v2ray-core/proxy/socks" _ "github.com/v2ray/v2ray-core/proxy/socks/json" _ "github.com/v2ray/v2ray-core/proxy/vmess/inbound" diff --git a/testing/scenarios/dokodemo_test.go b/testing/scenarios/dokodemo_test.go index 26eaf938..b581c369 100644 --- a/testing/scenarios/dokodemo_test.go +++ b/testing/scenarios/dokodemo_test.go @@ -24,6 +24,7 @@ func TestDokodemoTCP(t *testing.T) { } _, err := tcpServer.Start() assert.Error(err).IsNil() + defer tcpServer.Close() assert.Error(InitializeServerSetOnce("test_2")).IsNil() diff --git a/testing/scenarios/router_test.go b/testing/scenarios/router_test.go index 831fb2e8..3226fd4b 100644 --- a/testing/scenarios/router_test.go +++ b/testing/scenarios/router_test.go @@ -24,6 +24,7 @@ func TestRouter(t *testing.T) { } _, err := tcpServer.Start() assert.Error(err).IsNil() + defer tcpServer.Close() tcpServer2Accessed := false tcpServer2 := &tcp.Server{ @@ -35,6 +36,7 @@ func TestRouter(t *testing.T) { } _, err = tcpServer2.Start() assert.Error(err).IsNil() + defer tcpServer2.Close() assert.Error(InitializeServerSetOnce("test_3")).IsNil() diff --git a/testing/scenarios/socks_end_test.go b/testing/scenarios/socks_end_test.go index c8b58e02..be0ba895 100644 --- a/testing/scenarios/socks_end_test.go +++ b/testing/scenarios/socks_end_test.go @@ -31,12 +31,13 @@ func TestTCPConnection(t *testing.T) { } _, err := tcpServer.Start() assert.Error(err).IsNil() + defer tcpServer.Close() assert.Error(InitializeServerSetOnce("test_1")).IsNil() socksPort := v2net.Port(50000) - for i := 0; i < 100; i++ { + for i := 0; i < 10; i++ { conn, err := net.DialTCP("tcp", nil, &net.TCPAddr{ IP: []byte{127, 0, 0, 1}, Port: int(socksPort), @@ -98,6 +99,7 @@ func TestTCPBind(t *testing.T) { } _, err := tcpServer.Start() assert.Error(err).IsNil() + defer tcpServer.Close() assert.Error(InitializeServerSetOnce("test_1")).IsNil() @@ -146,6 +148,7 @@ func TestUDPAssociate(t *testing.T) { } _, err := udpServer.Start() assert.Error(err).IsNil() + defer udpServer.Close() assert.Error(InitializeServerSetOnce("test_1")).IsNil() diff --git a/testing/servers/tcp/tcp.go b/testing/servers/tcp/tcp.go index 92445480..0ebea452 100644 --- a/testing/servers/tcp/tcp.go +++ b/testing/servers/tcp/tcp.go @@ -2,7 +2,6 @@ package tcp import ( "fmt" - "io/ioutil" "net" v2net "github.com/v2ray/v2ray-core/common/net" @@ -11,6 +10,7 @@ import ( type Server struct { Port v2net.Port MsgProcessor func(msg []byte) []byte + accepting bool } func (server *Server) Start() (v2net.Address, error) { @@ -28,7 +28,9 @@ func (server *Server) Start() (v2net.Address, error) { } func (server *Server) acceptConnections(listener *net.TCPListener) { - for { + server.accepting = true + defer listener.Close() + for server.accepting { conn, err := listener.Accept() if err != nil { fmt.Printf("Failed accept TCP connection: %v", err) @@ -40,12 +42,17 @@ func (server *Server) acceptConnections(listener *net.TCPListener) { } func (server *Server) handleConnection(conn net.Conn) { - request, err := ioutil.ReadAll(conn) - if err != nil { - fmt.Printf("Failed to read request: %v", err) - return + for true { + request, err := v2net.ReadFrom(conn, nil) + if err != nil { + break + } + response := server.MsgProcessor(request.Value) + conn.Write(response) } - response := server.MsgProcessor(request) - conn.Write(response) conn.Close() } + +func (this *Server) Close() { + this.accepting = true +} diff --git a/testing/servers/udp/udp.go b/testing/servers/udp/udp.go index ecfe5975..91199255 100644 --- a/testing/servers/udp/udp.go +++ b/testing/servers/udp/udp.go @@ -10,6 +10,7 @@ import ( type Server struct { Port v2net.Port MsgProcessor func(msg []byte) []byte + accepting bool } func (server *Server) Start() (v2net.Address, error) { @@ -27,6 +28,8 @@ func (server *Server) Start() (v2net.Address, error) { } func (server *Server) handleConnection(conn *net.UDPConn) { + server.accepting = true + defer conn.Close() for { buffer := make([]byte, 2*1024) nBytes, addr, err := conn.ReadFromUDP(buffer) @@ -39,3 +42,7 @@ func (server *Server) handleConnection(conn *net.UDPConn) { conn.WriteToUDP(response, addr) } } + +func (server *Server) Close() { + server.accepting = false +}