From 648d4180aa302ce37bb1d6d7a0c2ec1d0952d0d7 Mon Sep 17 00:00:00 2001 From: RPRX <63339210+rprx@users.noreply.github.com> Date: Thu, 24 Sep 2020 01:59:14 +0000 Subject: [PATCH] VLESS PREVIEW 2 --- infra/conf/vless.go | 14 +++- infra/conf/vless_test.go | 6 +- proxy/vless/encoding/addons.go | 136 ++++++++++++++++++++++++++++++- proxy/vless/encoding/encoding.go | 10 +-- proxy/vless/inbound/inbound.go | 83 +++++++++++++------ proxy/vless/outbound/outbound.go | 59 ++++++++++++-- proxy/vless/vless.go | 4 + 7 files changed, 264 insertions(+), 48 deletions(-) diff --git a/infra/conf/vless.go b/infra/conf/vless.go index 7d222bba..00f2f394 100644 --- a/infra/conf/vless.go +++ b/infra/conf/vless.go @@ -48,9 +48,12 @@ func (c *VLessInboundConfig) Build() (proto.Message, error) { return nil, newError(`VLESS clients: invalid user`).Base(err) } - if account.Flow != "" { - return nil, newError(`VLESS clients: "flow" is not available in this version`) + switch account.Flow { + case "", "xtls-rprx-origin": + default: + return nil, newError(`VLESS clients: "flow" only accepts "", "xtls-rprx-origin" in this version`) } + if account.Encryption != "" { return nil, newError(`VLESS clients: "encryption" should not in inbound settings`) } @@ -161,9 +164,12 @@ func (c *VLessOutboundConfig) Build() (proto.Message, error) { return nil, newError(`VLESS users: invalid user`).Base(err) } - if account.Flow != "" { - return nil, newError(`VLESS users: "flow" is not available in this version`) + switch account.Flow { + case "", "xtls-rprx-origin", "xtls-rprx-origin-udp443": + default: + return nil, newError(`VLESS users: "flow" only accepts "", "xtls-rprx-origin", "xtls-rprx-origin-udp443" in this version`) } + if account.Encryption != "none" { return nil, newError(`VLESS users: please add/set "encryption":"none" for every user`) } diff --git a/infra/conf/vless_test.go b/infra/conf/vless_test.go index 12035095..01eb9619 100644 --- a/infra/conf/vless_test.go +++ b/infra/conf/vless_test.go @@ -26,6 +26,7 @@ func TestVLessOutbound(t *testing.T) { "users": [ { "id": "27848739-7e62-4138-9fd3-098a63964b6b", + "flow": "xtls-rprx-origin-udp443", "encryption": "none", "level": 0 } @@ -46,6 +47,7 @@ func TestVLessOutbound(t *testing.T) { { Account: serial.ToTypedMessage(&vless.Account{ Id: "27848739-7e62-4138-9fd3-098a63964b6b", + Flow: "xtls-rprx-origin-udp443", Encryption: "none", }), Level: 0, @@ -69,6 +71,7 @@ func TestVLessInbound(t *testing.T) { "clients": [ { "id": "27848739-7e62-4138-9fd3-098a63964b6b", + "flow": "xtls-rprx-origin", "level": 0, "email": "love@v2fly.org" } @@ -94,7 +97,8 @@ func TestVLessInbound(t *testing.T) { Clients: []*protocol.User{ { Account: serial.ToTypedMessage(&vless.Account{ - Id: "27848739-7e62-4138-9fd3-098a63964b6b", + Id: "27848739-7e62-4138-9fd3-098a63964b6b", + Flow: "xtls-rprx-origin", }), Level: 0, Email: "love@v2fly.org", diff --git a/proxy/vless/encoding/addons.go b/proxy/vless/encoding/addons.go index a69c4109..389511c9 100644 --- a/proxy/vless/encoding/addons.go +++ b/proxy/vless/encoding/addons.go @@ -9,11 +9,25 @@ import ( "v2ray.com/core/common/buf" "v2ray.com/core/common/protocol" + "v2ray.com/core/proxy/vless" ) func EncodeHeaderAddons(buffer *buf.Buffer, addons *Addons) error { switch addons.Flow { + case vless.XRO: + + if bytes, err := proto.Marshal(addons); err != nil { + newError("failed to marshal addons protobuf value").Base(err) + } else { + if err := buffer.WriteByte(byte(len(bytes))); err != nil { + return newError("failed to write addons protobuf length").Base(err) + } + if _, err := buffer.Write(bytes); err != nil { + return newError("failed to write addons protobuf value").Base(err) + } + } + default: if err := buffer.WriteByte(0); err != nil { @@ -62,22 +76,136 @@ func DecodeHeaderAddons(buffer *buf.Buffer, reader io.Reader) (*Addons, error) { func EncodeBodyAddons(writer io.Writer, request *protocol.RequestHeader, addons *Addons) buf.Writer { switch addons.Flow { - default: + case vless.XRO: - return buf.NewWriter(writer) + if request.Command == protocol.RequestCommandUDP { + return NewMultiLengthPacketWriter(writer.(buf.Writer)) + } } + return buf.NewWriter(writer) + } // DecodeBodyAddons returns a Reader from which caller can fetch decrypted body. func DecodeBodyAddons(reader io.Reader, request *protocol.RequestHeader, addons *Addons) buf.Reader { switch addons.Flow { - default: + case vless.XRO: - return buf.NewReader(reader) + if request.Command == protocol.RequestCommandUDP { + return NewLengthPacketReader(reader) + } } + return buf.NewReader(reader) + +} + +func NewMultiLengthPacketWriter(writer buf.Writer) *MultiLengthPacketWriter { + return &MultiLengthPacketWriter{ + Writer: writer, + } +} + +type MultiLengthPacketWriter struct { + buf.Writer +} + +func (w *MultiLengthPacketWriter) WriteMultiBuffer(mb buf.MultiBuffer) error { + defer buf.ReleaseMulti(mb) + mb2Write := make(buf.MultiBuffer, 0, len(mb)+1) + for _, b := range mb { + length := b.Len() + if length == 0 || length+2 > buf.Size { + continue + } + eb := buf.New() + if err := eb.WriteByte(byte(length >> 8)); err != nil { + eb.Release() + continue + } + if err := eb.WriteByte(byte(length)); err != nil { + eb.Release() + continue + } + if _, err := eb.Write(b.Bytes()); err != nil { + eb.Release() + continue + } + mb2Write = append(mb2Write, eb) + } + if mb2Write.IsEmpty() { + return nil + } + return w.Writer.WriteMultiBuffer(mb2Write) +} + +func NewLengthPacketWriter(writer io.Writer) *LengthPacketWriter { + return &LengthPacketWriter{ + Writer: writer, + cache: make([]byte, 0, 65536), + } +} + +type LengthPacketWriter struct { + io.Writer + cache []byte +} + +func (w *LengthPacketWriter) WriteMultiBuffer(mb buf.MultiBuffer) error { + length := mb.Len() // none of mb is nil + //fmt.Println("Write", length) + if length == 0 { + return nil + } + defer func() { + w.cache = w.cache[:0] + }() + w.cache = append(w.cache, byte(length>>8), byte(length)) + for i, b := range mb { + w.cache = append(w.cache, b.Bytes()...) + b.Release() + mb[i] = nil + } + if _, err := w.Write(w.cache); err != nil { + return newError("failed to write a packet").Base(err) + } + return nil +} + +func NewLengthPacketReader(reader io.Reader) *LengthPacketReader { + return &LengthPacketReader{ + Reader: reader, + cache: make([]byte, 2), + } +} + +type LengthPacketReader struct { + io.Reader + cache []byte +} + +func (r *LengthPacketReader) ReadMultiBuffer() (buf.MultiBuffer, error) { + if _, err := io.ReadFull(r.Reader, r.cache); err != nil { // maybe EOF + return nil, newError("failed to read packet length").Base(err) + } + length := int(r.cache[0])<<8 | int(r.cache[1]) + //fmt.Println("Read", length) + mb := make(buf.MultiBuffer, 0, length/buf.Size+1) + for length > 0 { + size := length + if length > buf.Size { + size = buf.Size + } + length -= size + b := buf.New() + if _, err := b.ReadFullFrom(r.Reader, int32(size)); err != nil { + return nil, newError("failed to read packet payload").Base(err) + } + mb = append(mb, b) + } + return mb, nil } diff --git a/proxy/vless/encoding/encoding.go b/proxy/vless/encoding/encoding.go index a0d5b54f..b56b30ef 100644 --- a/proxy/vless/encoding/encoding.go +++ b/proxy/vless/encoding/encoding.go @@ -153,23 +153,23 @@ func EncodeResponseHeader(writer io.Writer, request *protocol.RequestHeader, res } // DecodeResponseHeader decodes and returns (if successful) a ResponseHeader from an input stream. -func DecodeResponseHeader(reader io.Reader, request *protocol.RequestHeader, responseAddons *Addons) error { +func DecodeResponseHeader(reader io.Reader, request *protocol.RequestHeader) (*Addons, error) { buffer := buf.StackNew() defer buffer.Release() if _, err := buffer.ReadFullFrom(reader, 1); err != nil { - return newError("failed to read response version").Base(err) + return nil, newError("failed to read response version").Base(err) } if buffer.Byte(0) != request.Version { - return newError("unexpected response version. Expecting ", int(request.Version), " but actually ", int(buffer.Byte(0))) + return nil, newError("unexpected response version. Expecting ", int(request.Version), " but actually ", int(buffer.Byte(0))) } responseAddons, err := DecodeHeaderAddons(&buffer, reader) if err != nil { - return newError("failed to decode response header addons").Base(err) + return nil, newError("failed to decode response header addons").Base(err) } - return nil + return responseAddons, nil } diff --git a/proxy/vless/inbound/inbound.go b/proxy/vless/inbound/inbound.go index 865aafc1..af7ded4f 100644 --- a/proxy/vless/inbound/inbound.go +++ b/proxy/vless/inbound/inbound.go @@ -6,7 +6,6 @@ package inbound import ( "context" - "encoding/hex" "io" "strconv" "time" @@ -17,6 +16,7 @@ import ( "v2ray.com/core/common/errors" "v2ray.com/core/common/log" "v2ray.com/core/common/net" + "v2ray.com/core/common/platform" "v2ray.com/core/common/protocol" "v2ray.com/core/common/retry" "v2ray.com/core/common/session" @@ -30,6 +30,11 @@ import ( "v2ray.com/core/proxy/vless/encoding" "v2ray.com/core/transport/internet" "v2ray.com/core/transport/internet/tls" + "v2ray.com/core/transport/internet/xtls" +) + +var ( + xtls_show = false ) func init() { @@ -43,6 +48,13 @@ func init() { } return New(ctx, config.(*Config), dc) })) + + const defaultFlagValue = "NOT_DEFINED_AT_ALL" + + xtlsShow := platform.NewEnvFlag("v2ray.vless.xtls.show").GetValue(func() string { return defaultFlagValue }) + if xtlsShow == "true" { + xtls_show = true + } } // Handler is an inbound connection handler that handles messages in VLess protocol. @@ -135,6 +147,11 @@ func (h *Handler) Process(ctx context.Context, network net.Network, connection i sid := session.ExportIDToError(ctx) + iConn := connection + if statConn, ok := iConn.(*internet.StatCouterConnection); ok { + iConn = statConn.Connection + } + sessionPolicy := h.policyManager.ForLevel(0) if err := connection.SetReadDeadline(time.Now().Add(sessionPolicy.Timeouts.Handshake)); err != nil { return newError("unable to set read deadline").Base(err).AtWarning() @@ -183,16 +200,15 @@ func (h *Handler) Process(ctx context.Context, network net.Network, connection i alpn := "" if len(apfb) > 1 || apfb[""] == nil { - iConn := connection - if statConn, ok := iConn.(*internet.StatCouterConnection); ok { - iConn = statConn.Connection - } if tlsConn, ok := iConn.(*tls.Conn); ok { alpn = tlsConn.ConnectionState().NegotiatedProtocol newError("realAlpn = " + alpn).AtInfo().WriteToLog(sid) - if apfb[alpn] == nil { - alpn = "" - } + } else if xtlsConn, ok := iConn.(*xtls.Conn); ok { + alpn = xtlsConn.ConnectionState().NegotiatedProtocol + newError("realAlpn = " + alpn).AtInfo().WriteToLog(sid) + } + if apfb[alpn] == nil { + alpn = "" } } pfb := apfb[alpn] @@ -307,18 +323,9 @@ func (h *Handler) Process(ctx context.Context, network net.Network, connection i pro.Write(net.ParseIP(remoteAddr).To16()) pro.Write(net.ParseIP(localAddr).To16()) } - p1, _ := strconv.ParseInt(remotePort, 10, 64) - b1, _ := hex.DecodeString(strconv.FormatInt(p1, 16)) - p2, _ := strconv.ParseInt(localPort, 10, 64) - b2, _ := hex.DecodeString(strconv.FormatInt(p2, 16)) - if len(b1) == 1 { - pro.WriteByte(0) - } - pro.Write(b1) - if len(b2) == 1 { - pro.WriteByte(0) - } - pro.Write(b2) + p1, _ := strconv.ParseUint(remotePort, 10, 16) + p2, _ := strconv.ParseUint(localPort, 10, 16) + pro.Write([]byte{byte(p1 >> 8), byte(p1), byte(p2 >> 8), byte(p2)}) } if err := serverWriter.WriteMultiBuffer(buf.MultiBuffer{pro}); err != nil { return newError("failed to set PROXY protocol v", fb.Xver).Base(err).AtWarning() @@ -376,6 +383,34 @@ func (h *Handler) Process(ctx context.Context, network net.Network, connection i } inbound.User = request.User + account := request.User.Account.(*vless.MemoryAccount) + + responseAddons := &encoding.Addons{ + Flow: requestAddons.Flow, + } + + switch requestAddons.Flow { + case vless.XRO: + if account.Flow == vless.XRO { + switch request.Command { + case protocol.RequestCommandMux: + return newError(vless.XRO + " doesn't support Mux").AtWarning() + case protocol.RequestCommandUDP: + //return newError(vless.XRO + " doesn't support UDP").AtWarning() + case protocol.RequestCommandTCP: + if xtlsConn, ok := iConn.(*xtls.Conn); ok { + xtlsConn.RPRX = true + xtlsConn.SHOW = xtls_show + xtlsConn.MARK = "XTLS" + } else { + return newError(`failed to use ` + vless.XRO + `, maybe "security" is not "xtls"`).AtWarning() + } + } + } else { + return newError(account.ID.String() + " is not able to use " + vless.XRO).AtWarning() + } + } + if request.Command != protocol.RequestCommandMux { ctx = log.ContextWithAccessMessage(ctx, &log.AccessMessage{ From: connection.RemoteAddr(), @@ -396,8 +431,8 @@ func (h *Handler) Process(ctx context.Context, network net.Network, connection i return newError("failed to dispatch request to ", request.Destination()).Base(err).AtWarning() } - serverReader := link.Reader - serverWriter := link.Writer + serverReader := link.Reader // .(*pipe.Reader) + serverWriter := link.Writer // .(*pipe.Writer) postRequest := func() error { defer timer.SetTimeout(sessionPolicy.Timeouts.DownlinkOnly) @@ -416,10 +451,6 @@ func (h *Handler) Process(ctx context.Context, network net.Network, connection i getResponse := func() error { defer timer.SetTimeout(sessionPolicy.Timeouts.UplinkOnly) - responseAddons := &encoding.Addons{ - Flow: requestAddons.Flow, - } - bufferWriter := buf.NewBufferedWriter(buf.NewWriter(connection)) if err := encoding.EncodeResponseHeader(bufferWriter, request, responseAddons); err != nil { return newError("failed to encode response header").Base(err).AtWarning() diff --git a/proxy/vless/outbound/outbound.go b/proxy/vless/outbound/outbound.go index 161ada2f..a1d6726c 100644 --- a/proxy/vless/outbound/outbound.go +++ b/proxy/vless/outbound/outbound.go @@ -12,6 +12,7 @@ import ( "v2ray.com/core/common" "v2ray.com/core/common/buf" "v2ray.com/core/common/net" + "v2ray.com/core/common/platform" "v2ray.com/core/common/protocol" "v2ray.com/core/common/retry" "v2ray.com/core/common/session" @@ -22,12 +23,24 @@ import ( "v2ray.com/core/proxy/vless/encoding" "v2ray.com/core/transport" "v2ray.com/core/transport/internet" + "v2ray.com/core/transport/internet/xtls" +) + +var ( + xtls_show = false ) func init() { common.Must(common.RegisterConfig((*Config)(nil), func(ctx context.Context, config interface{}) (interface{}, error) { return New(ctx, config.(*Config)) })) + + const defaultFlagValue = "NOT_DEFINED_AT_ALL" + + xtlsShow := platform.NewEnvFlag("v2ray.vless.xtls.show").GetValue(func() string { return defaultFlagValue }) + if xtlsShow == "true" { + xtls_show = true + } } // Handler is an outbound connection handler for VLess protocol. @@ -60,13 +73,13 @@ func New(ctx context.Context, config *Config) (*Handler, error) { } // Process implements proxy.Outbound.Process(). -func (v *Handler) Process(ctx context.Context, link *transport.Link, dialer internet.Dialer) error { +func (h *Handler) Process(ctx context.Context, link *transport.Link, dialer internet.Dialer) error { var rec *protocol.ServerSpec var conn internet.Connection if err := retry.ExponentialBackoff(5, 200).On(func() error { - rec = v.serverPicker.PickServer() + rec = h.serverPicker.PickServer() var err error conn, err = dialer.Dial(ctx, rec.Destination()) if err != nil { @@ -78,6 +91,11 @@ func (v *Handler) Process(ctx context.Context, link *transport.Link, dialer inte } defer conn.Close() // nolint: errcheck + iConn := conn + if statConn, ok := iConn.(*internet.StatCouterConnection); ok { + iConn = statConn.Connection + } + outbound := session.OutboundFromContext(ctx) if outbound == nil || !outbound.Target.IsValid() { return newError("target not specified").AtError() @@ -108,12 +126,38 @@ func (v *Handler) Process(ctx context.Context, link *transport.Link, dialer inte Flow: account.Flow, } - sessionPolicy := v.policyManager.ForLevel(request.User.Level) + switch requestAddons.Flow { + case vless.XRO, vless.XRO + "-udp443": + switch request.Command { + case protocol.RequestCommandMux: + return newError(vless.XRO + " doesn't support Mux").AtWarning() + case protocol.RequestCommandUDP: + if requestAddons.Flow == vless.XRO && request.Port == 443 { + return newError(vless.XRO + " stopped UDP/443").AtWarning() + } + requestAddons.Flow = vless.XRO + case protocol.RequestCommandTCP: + if xtlsConn, ok := iConn.(*xtls.Conn); ok { + xtlsConn.RPRX = true + xtlsConn.SHOW = xtls_show + xtlsConn.MARK = "XTLS" + } else { + return newError(`failed to use ` + vless.XRO + `, maybe "security" is not "xtls"`).AtWarning() + } + requestAddons.Flow = vless.XRO + } + default: + if _, ok := iConn.(*xtls.Conn); ok { + panic(`To avoid misunderstanding, you must fill in VLESS "flow" when using XTLS.`) + } + } + + sessionPolicy := h.policyManager.ForLevel(request.User.Level) ctx, cancel := context.WithCancel(ctx) timer := signal.CancelAfterInactivity(ctx, cancel, sessionPolicy.Timeouts.ConnectionIdle) - clientReader := link.Reader - clientWriter := link.Writer + clientReader := link.Reader // .(*pipe.Reader) + clientWriter := link.Writer // .(*pipe.Writer) postRequest := func() error { defer timer.SetTimeout(sessionPolicy.Timeouts.DownlinkOnly) @@ -151,9 +195,8 @@ func (v *Handler) Process(ctx context.Context, link *transport.Link, dialer inte getResponse := func() error { defer timer.SetTimeout(sessionPolicy.Timeouts.UplinkOnly) - responseAddons := new(encoding.Addons) - - if err := encoding.DecodeResponseHeader(conn, request, responseAddons); err != nil { + responseAddons, err := encoding.DecodeResponseHeader(conn, request) + if err != nil { return newError("failed to decode response header").Base(err).AtWarning() } diff --git a/proxy/vless/vless.go b/proxy/vless/vless.go index 9e6dc7ab..ea51e563 100644 --- a/proxy/vless/vless.go +++ b/proxy/vless/vless.go @@ -6,3 +6,7 @@ package vless //go:generate errorgen + +const ( + XRO = "xtls-rprx-origin" +)