mirror of https://github.com/v2ray/v2ray-core
Merge pull request #99 from darhwa/http-outbound
Reduce extra RTT when setting up CONNECT tunnel in http outbound (not vmess through http)pull/2714/head
commit
ea5d1ee664
|
@ -16,6 +16,7 @@ import (
|
||||||
"v2ray.com/core"
|
"v2ray.com/core"
|
||||||
"v2ray.com/core/common"
|
"v2ray.com/core/common"
|
||||||
"v2ray.com/core/common/buf"
|
"v2ray.com/core/common/buf"
|
||||||
|
"v2ray.com/core/common/bytespool"
|
||||||
"v2ray.com/core/common/net"
|
"v2ray.com/core/common/net"
|
||||||
"v2ray.com/core/common/protocol"
|
"v2ray.com/core/common/protocol"
|
||||||
"v2ray.com/core/common/retry"
|
"v2ray.com/core/common/retry"
|
||||||
|
@ -80,12 +81,21 @@ func (c *Client) Process(ctx context.Context, link *transport.Link, dialer inter
|
||||||
var user *protocol.MemoryUser
|
var user *protocol.MemoryUser
|
||||||
var conn internet.Connection
|
var conn internet.Connection
|
||||||
|
|
||||||
|
mbuf, _ := link.Reader.ReadMultiBuffer()
|
||||||
|
len := mbuf.Len()
|
||||||
|
firstPayload := bytespool.Alloc(len)
|
||||||
|
mbuf, _ = buf.SplitBytes(mbuf, firstPayload)
|
||||||
|
firstPayload = firstPayload[:len]
|
||||||
|
|
||||||
|
buf.ReleaseMulti(mbuf)
|
||||||
|
defer bytespool.Free(firstPayload)
|
||||||
|
|
||||||
if err := retry.ExponentialBackoff(5, 100).On(func() error {
|
if err := retry.ExponentialBackoff(5, 100).On(func() error {
|
||||||
server := c.serverPicker.PickServer()
|
server := c.serverPicker.PickServer()
|
||||||
dest := server.Destination()
|
dest := server.Destination()
|
||||||
user = server.PickUser()
|
user = server.PickUser()
|
||||||
|
|
||||||
netConn, err := setUpHTTPTunnel(ctx, dest, targetAddr, user, dialer)
|
netConn, err := setUpHTTPTunnel(ctx, dest, targetAddr, user, dialer, firstPayload)
|
||||||
if netConn != nil {
|
if netConn != nil {
|
||||||
conn = internet.Connection(netConn)
|
conn = internet.Connection(netConn)
|
||||||
}
|
}
|
||||||
|
@ -126,11 +136,11 @@ func (c *Client) Process(ctx context.Context, link *transport.Link, dialer inter
|
||||||
}
|
}
|
||||||
|
|
||||||
// setUpHTTPTunnel will create a socket tunnel via HTTP CONNECT method
|
// setUpHTTPTunnel will create a socket tunnel via HTTP CONNECT method
|
||||||
func setUpHTTPTunnel(ctx context.Context, dest net.Destination, target string, user *protocol.MemoryUser, dialer internet.Dialer) (net.Conn, error) {
|
func setUpHTTPTunnel(ctx context.Context, dest net.Destination, target string, user *protocol.MemoryUser, dialer internet.Dialer, firstPayload []byte) (net.Conn, error) {
|
||||||
req := &http.Request{
|
req := &http.Request{
|
||||||
Method: http.MethodConnect,
|
Method: http.MethodConnect,
|
||||||
URL: &url.URL{Host: target},
|
URL: &url.URL{Host: target},
|
||||||
Header: http.Header{"Proxy-Connection": []string{"Keep-Alive"}},
|
Header: make(http.Header),
|
||||||
Host: target,
|
Host: target,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -141,12 +151,19 @@ func setUpHTTPTunnel(ctx context.Context, dest net.Destination, target string, u
|
||||||
}
|
}
|
||||||
|
|
||||||
connectHTTP1 := func(rawConn net.Conn) (net.Conn, error) {
|
connectHTTP1 := func(rawConn net.Conn) (net.Conn, error) {
|
||||||
|
req.Header.Set("Proxy-Connection", "Keep-Alive")
|
||||||
|
|
||||||
err := req.Write(rawConn)
|
err := req.Write(rawConn)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
rawConn.Close()
|
rawConn.Close()
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if _, err := rawConn.Write(firstPayload); err != nil {
|
||||||
|
rawConn.Close()
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
resp, err := http.ReadResponse(bufio.NewReader(rawConn), req)
|
resp, err := http.ReadResponse(bufio.NewReader(rawConn), req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
rawConn.Close()
|
rawConn.Close()
|
||||||
|
@ -164,12 +181,27 @@ func setUpHTTPTunnel(ctx context.Context, dest net.Destination, target string, u
|
||||||
pr, pw := io.Pipe()
|
pr, pw := io.Pipe()
|
||||||
req.Body = pr
|
req.Body = pr
|
||||||
|
|
||||||
|
var pErr error
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
wg.Add(1)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
_, pErr = pw.Write(firstPayload)
|
||||||
|
wg.Done()
|
||||||
|
}()
|
||||||
|
|
||||||
resp, err := h2clientConn.RoundTrip(req)
|
resp, err := h2clientConn.RoundTrip(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
rawConn.Close()
|
rawConn.Close()
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
wg.Wait()
|
||||||
|
if pErr != nil {
|
||||||
|
rawConn.Close()
|
||||||
|
return nil, pErr
|
||||||
|
}
|
||||||
|
|
||||||
if resp.StatusCode != http.StatusOK {
|
if resp.StatusCode != http.StatusOK {
|
||||||
rawConn.Close()
|
rawConn.Close()
|
||||||
return nil, newError("Proxy responded with non 200 code: " + resp.Status)
|
return nil, newError("Proxy responded with non 200 code: " + resp.Status)
|
||||||
|
|
Loading…
Reference in New Issue