From dde3f60e303d49ce855ee2dbb728022fb8d3dc1d Mon Sep 17 00:00:00 2001 From: v2ray Date: Thu, 28 Jan 2016 21:30:05 +0100 Subject: [PATCH] udp for shadowsocks --- common/alloc/buffer.go | 14 +++++ proxy/shadowsocks/shadowsocks.go | 105 ++++++++++++++++++++++++------- 2 files changed, 97 insertions(+), 22 deletions(-) diff --git a/common/alloc/buffer.go b/common/alloc/buffer.go index 553b4db5..2bc51641 100644 --- a/common/alloc/buffer.go +++ b/common/alloc/buffer.go @@ -1,6 +1,7 @@ package alloc import ( + "io" "sync" ) @@ -72,6 +73,19 @@ func (b *Buffer) Write(data []byte) (int, error) { return len(data), nil } +func (b *Buffer) Read(data []byte) (int, error) { + if b.Len() == 0 { + return 0, io.EOF + } + nBytes := copy(data, b.Value) + if nBytes == b.Len() { + b.Value = b.Value[:0] + } else { + b.Value = b.Value[nBytes:] + } + return nBytes, nil +} + type bufferPool struct { chain chan []byte allocator *sync.Pool diff --git a/proxy/shadowsocks/shadowsocks.go b/proxy/shadowsocks/shadowsocks.go index c48546d1..992dcf7d 100644 --- a/proxy/shadowsocks/shadowsocks.go +++ b/proxy/shadowsocks/shadowsocks.go @@ -16,11 +16,12 @@ import ( ) type Shadowsocks struct { - space app.Space - config *Config - port v2net.Port - accepting bool - tcpListener *hub.TCPHub + space app.Space + config *Config + port v2net.Port + accepting bool + tcpHub *hub.TCPHub + udpHub *hub.UDPHub } func (this *Shadowsocks) Port() v2net.Port { @@ -29,8 +30,8 @@ func (this *Shadowsocks) Port() v2net.Port { func (this *Shadowsocks) Close() { this.accepting = false - this.tcpListener.Close() - this.tcpListener = nil + this.tcpHub.Close() + this.tcpHub = nil } func (this *Shadowsocks) Listen(port v2net.Port) error { @@ -41,17 +42,82 @@ func (this *Shadowsocks) Listen(port v2net.Port) error { return proxy.ErrorAlreadyListening } } + this.accepting = true - tcpListener, err := hub.ListenTCP(port, this.handleConnection) + tcpHub, err := hub.ListenTCP(port, this.handleConnection) if err != nil { - log.Error("Shadowsocks: Failed to listen on port ", port, ": ", err) + log.Error("Shadowsocks: Failed to listen TCP on port ", port, ": ", err) return err } - this.tcpListener = tcpListener - this.accepting = true + this.tcpHub = tcpHub + + if this.config.UDP { + udpHub, err := hub.ListenUDP(port, this.handlerUDPPayload) + if err != nil { + log.Error("Shadowsocks: Failed to listen UDP on port ", port, ": ", err) + } + this.udpHub = udpHub + } + return nil } +func (this *Shadowsocks) handlerUDPPayload(payload *alloc.Buffer, dest v2net.Destination) { + defer payload.Release() + + iv := payload.Value[:this.config.Cipher.IVSize()] + key := this.config.Key + payload.SliceFrom(this.config.Cipher.IVSize()) + + reader, err := this.config.Cipher.NewDecodingStream(key, iv, payload) + if err != nil { + log.Error("Shadowsocks: Failed to create decoding stream: ", err) + return + } + + request, err := ReadRequest(reader) + if err != nil { + return + } + + buffer, _ := v2net.ReadFrom(reader, nil) + + packet := v2net.NewPacket(v2net.TCPDestination(request.Address, request.Port), buffer, false) + ray := this.space.PacketDispatcher().DispatchToOutbound(packet) + close(ray.InboundInput()) + + for respChunk := range ray.InboundOutput() { + + response := alloc.NewBuffer().Slice(0, this.config.Cipher.IVSize()) + rand.Read(response.Value) + + writer, err := this.config.Cipher.NewEncodingStream(key, response.Value, response) + if err != nil { + log.Error("Shadowsocks: Failed to create encoding stream: ", err) + return + } + + switch { + case request.Address.IsIPv4(): + writer.Write([]byte{AddrTypeIPv4}) + writer.Write(request.Address.IP()) + case request.Address.IsIPv6(): + writer.Write([]byte{AddrTypeIPv6}) + writer.Write(request.Address.IP()) + case request.Address.IsDomain(): + writer.Write([]byte{AddrTypeDomain, byte(len(request.Address.Domain()))}) + writer.Write([]byte(request.Address.Domain())) + } + + writer.Write(request.Port.Bytes()) + writer.Write(respChunk.Value) + respChunk.Release() + + this.udpHub.WriteTo(response.Value, dest) + response.Release() + } +} + func (this *Shadowsocks) handleConnection(conn *hub.TCPConn) { defer conn.Close() @@ -81,22 +147,17 @@ func (this *Shadowsocks) handleConnection(conn *hub.TCPConn) { packet := v2net.NewPacket(v2net.TCPDestination(request.Address, request.Port), nil, true) ray := this.space.PacketDispatcher().DispatchToOutbound(packet) - respIv := make([]byte, this.config.Cipher.IVSize()) - rand.Read(respIv) - - writer, err := this.config.Cipher.NewEncodingStream(key, respIv, conn) - if err != nil { - log.Error("Shadowsocks: Failed to create encoding stream: ", err) - return - } - var writeFinish sync.Mutex writeFinish.Lock() go func() { - firstChunk := alloc.NewBuffer().Clear() + firstChunk := alloc.NewBuffer().Slice(0, this.config.Cipher.IVSize()) defer firstChunk.Release() - firstChunk.Append(respIv) + writer, err := this.config.Cipher.NewEncodingStream(key, firstChunk.Value, conn) + if err != nil { + log.Error("Shadowsocks: Failed to create encoding stream: ", err) + return + } if payload, ok := <-ray.InboundOutput(); ok { firstChunk.Append(payload.Value)