From d343cb1ee6778bbbffca02b8854eb6ece30182b4 Mon Sep 17 00:00:00 2001 From: Darien Raymond Date: Wed, 9 Nov 2016 00:17:09 +0100 Subject: [PATCH] proxy connection --- app/proxy/proxy.go | 131 +++++++++++++++++++++++++++++++ proxy/freedom/freedom.go | 4 +- proxy/shadowsocks/client.go | 4 +- proxy/vmess/outbound/outbound.go | 4 +- transport/internet/dialer.go | 24 +++--- 5 files changed, 153 insertions(+), 14 deletions(-) create mode 100644 app/proxy/proxy.go diff --git a/app/proxy/proxy.go b/app/proxy/proxy.go new file mode 100644 index 00000000..412c18ce --- /dev/null +++ b/app/proxy/proxy.go @@ -0,0 +1,131 @@ +package proxy + +import ( + "errors" + "io" + "net" + "time" + + "v2ray.com/core/app" + "v2ray.com/core/app/proxyman" + "v2ray.com/core/common/alloc" + v2io "v2ray.com/core/common/io" + "v2ray.com/core/common/log" + v2net "v2ray.com/core/common/net" + "v2ray.com/core/transport/internet" + "v2ray.com/core/transport/ray" +) + +const ( + APP_ID = 7 +) + +type OutboundProxy struct { + outboundManager proxyman.OutboundHandlerManager +} + +func NewOutboundProxy(space app.Space) *OutboundProxy { + proxy := new(OutboundProxy) + space.InitializeApplication(func() error { + if !space.HasApp(proxyman.APP_ID_OUTBOUND_MANAGER) { + return errors.New("Proxy: Outbound handler manager not found.") + } + proxy.outboundManager = space.GetApp(proxyman.APP_ID_OUTBOUND_MANAGER).(proxyman.OutboundHandlerManager) + return nil + }) + return proxy +} + +func (this *OutboundProxy) Dial(src v2net.Address, dest v2net.Destination, options internet.DialerOptions) (internet.Connection, error) { + handler := this.outboundManager.GetHandler(options.ProxyTag) + if handler == nil { + log.Warning("Proxy: Failed to get outbound handler with tag: ", options.ProxyTag) + return internet.Dial(src, dest, internet.DialerOptions{ + Stream: options.Stream, + }) + } + stream := ray.NewRay() + if err := handler.Dispatch(dest, alloc.NewLocalBuffer(32).Clear(), stream); err != nil { + return nil, err + } + return NewProxyConnection(src, dest, stream), nil +} + +func (this *OutboundProxy) Release() { + +} + +type ProxyConnection struct { + stream ray.Ray + closed bool + localAddr net.Addr + remoteAddr net.Addr + + reader *v2io.ChanReader + writer *v2io.ChainWriter +} + +func NewProxyConnection(src v2net.Address, dest v2net.Destination, stream ray.Ray) *ProxyConnection { + return &ProxyConnection{ + stream: stream, + localAddr: &net.TCPAddr{ + IP: []byte{0, 0, 0, 0}, + Port: 0, + }, + remoteAddr: &net.TCPAddr{ + IP: []byte{0, 0, 0, 0}, + Port: 0, + }, + reader: v2io.NewChanReader(stream.InboundOutput()), + writer: v2io.NewChainWriter(stream.InboundInput()), + } +} + +func (this *ProxyConnection) Read(b []byte) (int, error) { + if this.closed { + return 0, io.EOF + } + return this.reader.Read(b) +} + +func (this *ProxyConnection) Write(b []byte) (int, error) { + if this.closed { + return 0, io.EOF + } + return this.writer.Write(b) +} + +func (this *ProxyConnection) Close() error { + this.closed = true + this.stream.InboundInput().Close() + this.stream.InboundOutput().Release() + return nil +} + +func (this *ProxyConnection) LocalAddr() net.Addr { + return this.localAddr +} + +func (this *ProxyConnection) RemoteAddr() net.Addr { + return this.remoteAddr +} + +func (this *ProxyConnection) SetDeadline(t time.Time) error { + return nil +} + +func (this *ProxyConnection) SetReadDeadline(t time.Time) error { + return nil +} + +func (this *ProxyConnection) SetWriteDeadline(t time.Time) error { + return nil +} + +func (this *ProxyConnection) Reusable() bool { + return false +} + +func (this *ProxyConnection) SetReusable(bool) { + +} diff --git a/proxy/freedom/freedom.go b/proxy/freedom/freedom.go index 2220bece..0bb42166 100644 --- a/proxy/freedom/freedom.go +++ b/proxy/freedom/freedom.go @@ -80,7 +80,9 @@ func (this *FreedomConnection) Dispatch(destination v2net.Destination, payload * destination = this.ResolveIP(destination) } err := retry.Timed(5, 100).On(func() error { - rawConn, err := internet.Dial(this.meta.Address, destination, this.meta.StreamSettings) + rawConn, err := internet.Dial(this.meta.Address, destination, internet.DialerOptions{ + Stream: this.meta.StreamSettings, + }) if err != nil { return err } diff --git a/proxy/shadowsocks/client.go b/proxy/shadowsocks/client.go index d02b024e..8cdb3096 100644 --- a/proxy/shadowsocks/client.go +++ b/proxy/shadowsocks/client.go @@ -48,7 +48,9 @@ func (this *Client) Dispatch(destination v2net.Destination, payload *alloc.Buffe server = this.serverPicker.PickServer() dest := server.Destination() dest.Network = network - rawConn, err := internet.Dial(this.meta.Address, dest, this.meta.StreamSettings) + rawConn, err := internet.Dial(this.meta.Address, dest, internet.DialerOptions{ + Stream: this.meta.StreamSettings, + }) if err != nil { return err } diff --git a/proxy/vmess/outbound/outbound.go b/proxy/vmess/outbound/outbound.go index fd870325..f8da7b60 100644 --- a/proxy/vmess/outbound/outbound.go +++ b/proxy/vmess/outbound/outbound.go @@ -35,7 +35,9 @@ func (this *VMessOutboundHandler) Dispatch(target v2net.Destination, payload *al err := retry.Timed(5, 100).On(func() error { rec = this.serverPicker.PickServer() - rawConn, err := internet.Dial(this.meta.Address, rec.Destination(), this.meta.StreamSettings) + rawConn, err := internet.Dial(this.meta.Address, rec.Destination(), internet.DialerOptions{ + Stream: this.meta.StreamSettings, + }) if err != nil { return err } diff --git a/transport/internet/dialer.go b/transport/internet/dialer.go index c1f07f2b..45413b88 100644 --- a/transport/internet/dialer.go +++ b/transport/internet/dialer.go @@ -12,7 +12,8 @@ var ( ) type DialerOptions struct { - Stream *StreamConfig + Stream *StreamConfig + ProxyTag string } type Dialer func(src v2net.Address, dest v2net.Destination, options DialerOptions) (Connection, error) @@ -23,27 +24,28 @@ var ( RawTCPDialer Dialer UDPDialer Dialer WSDialer Dialer + ProxyDialer Dialer ) -func Dial(src v2net.Address, dest v2net.Destination, settings *StreamConfig) (Connection, error) { +func Dial(src v2net.Address, dest v2net.Destination, options DialerOptions) (Connection, error) { + if len(options.ProxyTag) > 0 && ProxyDialer != nil { + return ProxyDialer(src, dest, options) + } var connection Connection var err error - dialerOptions := DialerOptions{ - Stream: settings, - } if dest.Network == v2net.Network_TCP { - switch settings.Network { + switch options.Stream.Network { case v2net.Network_TCP: - connection, err = TCPDialer(src, dest, dialerOptions) + connection, err = TCPDialer(src, dest, options) case v2net.Network_KCP: - connection, err = KCPDialer(src, dest, dialerOptions) + connection, err = KCPDialer(src, dest, options) case v2net.Network_WebSocket: - connection, err = WSDialer(src, dest, dialerOptions) + connection, err = WSDialer(src, dest, options) // This check has to be the last one. case v2net.Network_RawTCP: - connection, err = RawTCPDialer(src, dest, dialerOptions) + connection, err = RawTCPDialer(src, dest, options) default: return nil, ErrUnsupportedStreamType } @@ -54,7 +56,7 @@ func Dial(src v2net.Address, dest v2net.Destination, settings *StreamConfig) (Co return connection, nil } - return UDPDialer(src, dest, dialerOptions) + return UDPDialer(src, dest, options) } func DialToDest(src v2net.Address, dest v2net.Destination) (net.Conn, error) {