From 233b73d1db330e121510183bc8853ba754db6162 Mon Sep 17 00:00:00 2001
From: v2ray <admin@v2ray.com>
Date: Fri, 17 Jun 2016 17:38:42 +0200
Subject: [PATCH] improve kcp performance

---
 testing/scenarios/data/test_4_client.json |  5 +-
 testing/scenarios/data/test_4_server.json |  8 ++-
 transport/internet/kcp/config.go          | 45 ++-----------
 transport/internet/kcp/connection.go      | 81 +++++------------------
 transport/internet/kcp/listener.go        |  9 +--
 5 files changed, 35 insertions(+), 113 deletions(-)

diff --git a/testing/scenarios/data/test_4_client.json b/testing/scenarios/data/test_4_client.json
index 6c6d5878..311f0e44 100644
--- a/testing/scenarios/data/test_4_client.json
+++ b/testing/scenarios/data/test_4_client.json
@@ -1,7 +1,7 @@
 {
   "port": 50030,
   "log": {
-    "loglevel": "info"
+    "loglevel": "debug"
   },
   "inbound": {
     "listen": "127.0.0.1",
@@ -15,6 +15,9 @@
   },
   "outbound": {
     "protocol": "vmess",
+    "streamSettings": {
+      "network": "kcp"
+    },
     "settings": {
       "vnext": [
         {
diff --git a/testing/scenarios/data/test_4_server.json b/testing/scenarios/data/test_4_server.json
index 3d0fc550..015d5d66 100644
--- a/testing/scenarios/data/test_4_server.json
+++ b/testing/scenarios/data/test_4_server.json
@@ -1,11 +1,14 @@
 {
   "port": 50031,
   "log": {
-    "loglevel": "info"
+    "loglevel": "debug"
   },
   "inbound": {
     "listen": "127.0.0.1",
     "protocol": "vmess",
+    "streamSettings": {
+      "network": "kcp"
+    },
     "settings": {
       "clients": [
         {
@@ -32,6 +35,9 @@
       "port": "50035-50039",
       "tag": "detour",
       "settings": {},
+      "streamSettings": {
+      "network": "kcp"
+    },
       "allocate": {
         "strategy": "random",
         "concurrency": 2,
diff --git a/transport/internet/kcp/config.go b/transport/internet/kcp/config.go
index 8f2055c3..ef9d1e46 100644
--- a/transport/internet/kcp/config.go
+++ b/transport/internet/kcp/config.go
@@ -1,47 +1,10 @@
 package kcp
 
-/*AdvancedConfig define behavior of KCP in detail
-
-MaximumTransmissionUnit:
-Largest protocol data unit that the layer can pass onwards
-can be discovered by running tracepath
-
-SendingWindowSize , ReceivingWindowSize:
-the size of Tx/Rx window, by packet
-
-ForwardErrorCorrectionGroupSize:
-The the size of packet to generate a Forward Error Correction
-packet, this is used to counteract packet loss.
-
-AcknowledgeNoDelay:
-Do not wait a certain of time before sending the ACK packet,
-increase bandwich cost and might increase performance
-
-Dscp:
-Differentiated services code point,
-be used to reconized to discriminate packet.
-It is recommanded to keep it 0 to avoid being detected.
-
-ReadTimeout,WriteTimeout:
-Close the Socket if it have been silent for too long,
-Small value can recycle zombie socket faster but
-can cause v2ray to kill the proxy connection it is relaying,
-Higher value can prevent server from closing zombie socket and
-waste resources.
-*/
-
-/*Config define basic behavior of KCP
-Mode:
-can be one of these values:
-fast3,fast2,fast,normal
-<<<<<<- less delay
-->>>>>> less bandwich wasted
-*/
 type Config struct {
-	Mtu        int
-	Sndwnd     int
-	Rcvwnd     int
-	Acknodelay bool
+	Mtu        int  // Maximum transmission unit
+	Sndwnd     int  // Sending window size
+	Rcvwnd     int  // Receiving window size
+	Acknodelay bool // Acknoledge without delay
 }
 
 func (this *Config) Apply() {
diff --git a/transport/internet/kcp/connection.go b/transport/internet/kcp/connection.go
index 6de1b687..c62d35ee 100644
--- a/transport/internet/kcp/connection.go
+++ b/transport/internet/kcp/connection.go
@@ -70,7 +70,6 @@ type UDPSession struct {
 	wd            time.Time // write deadline
 	chReadEvent   chan struct{}
 	chWriteEvent  chan struct{}
-	ackNoDelay    bool
 	writer        io.WriteCloser
 	since         int64
 }
@@ -88,7 +87,6 @@ func newUDPSession(conv uint32, writerCloser io.WriteCloser, local *net.UDPAddr,
 
 	mtu := uint32(effectiveConfig.Mtu - block.HeaderSize() - headerSize)
 	sess.kcp = NewKCP(conv, mtu, func(buf []byte, size int) {
-		log.Info(sess.local, " kcp output: ", buf[:size])
 		if size >= IKCP_OVERHEAD {
 			ext := alloc.NewBuffer().Clear().Append(buf[:size])
 			cmd := cmdData
@@ -102,12 +100,10 @@ func newUDPSession(conv uint32, writerCloser io.WriteCloser, local *net.UDPAddr,
 	})
 	sess.kcp.WndSize(effectiveConfig.Sndwnd, effectiveConfig.Rcvwnd)
 	sess.kcp.NoDelay(1, 20, 2, 1)
-	sess.ackNoDelay = effectiveConfig.Acknodelay
 	sess.kcp.current = sess.Elapsed()
 
 	go sess.updateTask()
 
-	log.Info("Created KCP conn to ", sess.RemoteAddr())
 	return sess
 }
 
@@ -158,7 +154,6 @@ func (s *UDPSession) Read(b []byte) (int, error) {
 
 // Write implements the Conn Write method.
 func (s *UDPSession) Write(b []byte) (int, error) {
-	log.Info("Trying to write ", len(b), " bytes. ", s.local)
 	if s.state == ConnStateReadyToClose ||
 		s.state == ConnStatePeerClosed ||
 		s.state == ConnStateClosed {
@@ -166,44 +161,28 @@ func (s *UDPSession) Write(b []byte) (int, error) {
 	}
 
 	for {
-		s.Lock()
 		if s.state == ConnStateReadyToClose ||
 			s.state == ConnStatePeerClosed ||
 			s.state == ConnStateClosed {
-			s.Unlock()
 			return 0, io.ErrClosedPipe
 		}
 
-		if !s.wd.IsZero() {
-			if time.Now().After(s.wd) { // timeout
-				s.Unlock()
-				return 0, errTimeout
-			}
-		}
-
+		s.kcpAccess.Lock()
 		if s.kcp.WaitSnd() < int(s.kcp.snd_wnd) {
 			nBytes := len(b)
-			log.Info("Writing ", nBytes, " bytes.", s.local)
 			s.kcp.Send(b)
 			s.kcp.current = s.Elapsed()
 			s.kcp.flush()
-			s.Unlock()
+			s.kcpAccess.Unlock()
 			return nBytes, nil
 		}
+		s.kcpAccess.Unlock()
 
-		var timeout <-chan time.Time
-		if !s.wd.IsZero() {
-			delay := s.wd.Sub(time.Now())
-			timeout = time.After(delay)
-		}
-		s.Unlock()
-
-		// wait for write event or timeout
-		select {
-		case <-s.chWriteEvent:
-		case <-timeout:
+		if !s.wd.IsZero() && s.wd.Before(time.Now()) {
 			return 0, errTimeout
 		}
+
+		time.Sleep(time.Duration(s.kcp.WaitSnd()*5) * time.Millisecond)
 	}
 }
 
@@ -213,6 +192,9 @@ func (this *UDPSession) Terminate() {
 	}
 	this.Lock()
 	defer this.Unlock()
+	if this.state == ConnStateClosed {
+		return
+	}
 
 	this.state = ConnStateClosed
 	this.writer.Close()
@@ -223,7 +205,7 @@ func (this *UDPSession) NotifyTermination() {
 		this.Lock()
 		if this.state == ConnStateClosed {
 			this.Unlock()
-			return
+			break
 		}
 		buffer := alloc.NewSmallBuffer().Clear()
 		buffer.AppendBytes(byte(CommandTerminate), byte(OptionClose), byte(0), byte(0), byte(0), byte(0))
@@ -236,7 +218,7 @@ func (this *UDPSession) NotifyTermination() {
 
 // Close closes the connection.
 func (s *UDPSession) Close() error {
-	log.Info("Closed ", s.local)
+	log.Debug("KCP|Connection: Closing connection to ", s.remote)
 	s.Lock()
 	defer s.Unlock()
 
@@ -300,31 +282,11 @@ func (s *UDPSession) output(payload *alloc.Buffer) {
 
 // kcp update, input loop
 func (s *UDPSession) updateTask() {
-	ticker := time.NewTicker(20 * time.Millisecond)
-	defer ticker.Stop()
-
-	var nextupdate uint32 = 0
-	for range ticker.C {
-		s.Lock()
-		if s.state == ConnStateClosed {
-			s.Unlock()
-			return
-		}
+	for s.state != ConnStateClosed {
 		current := s.Elapsed()
-		if !s.needUpdate && nextupdate == 0 {
-			nextupdate = s.kcp.Check(current)
-		}
-		current = s.Elapsed()
-		if s.needUpdate || current >= nextupdate {
-			log.Info("Updating KCP: ", current, " addr ", s.LocalAddr())
-			s.kcp.Update(current)
-			nextupdate = s.kcp.Check(current)
-			s.needUpdate = false
-		}
-		if s.kcp.WaitSnd() < int(s.kcp.snd_wnd) {
-			s.notifyWriteEvent()
-		}
-		s.Unlock()
+		s.kcp.Update(current)
+		interval := s.kcp.Check(s.Elapsed())
+		time.Sleep(time.Duration(interval) * time.Millisecond)
 	}
 }
 
@@ -367,16 +329,8 @@ func (s *UDPSession) kcpInput(data []byte) {
 	}
 	s.kcpAccess.Lock()
 	s.kcp.current = s.Elapsed()
-	log.Info(s.local, " kcp input: ", data[2:])
-	ret := s.kcp.Input(data[2:])
-	log.Info("kcp input returns ", ret)
+	s.kcp.Input(data[2:])
 
-	if s.ackNoDelay {
-		s.kcp.current = s.Elapsed()
-		s.kcp.flush()
-	} else {
-		s.needUpdate = true
-	}
 	s.kcpAccess.Unlock()
 	s.notifyReadEvent()
 }
@@ -391,8 +345,9 @@ func (this *UDPSession) FetchInputFrom(conn net.Conn) {
 			}
 			payload.Slice(0, nBytes)
 			if this.block.Open(payload) {
-				log.Info("Client fetching ", payload.Len(), " bytes.")
 				this.kcpInput(payload.Value)
+			} else {
+				log.Info("KCP|Connection: Invalid response from ", conn.RemoteAddr())
 			}
 			payload.Release()
 		}
diff --git a/transport/internet/kcp/listener.go b/transport/internet/kcp/listener.go
index 386727c8..497a1e7f 100644
--- a/transport/internet/kcp/listener.go
+++ b/transport/internet/kcp/listener.go
@@ -25,7 +25,6 @@ type Listener struct {
 }
 
 func NewListener(address v2net.Address, port v2net.Port) (*Listener, error) {
-	log.Info("Creating listener on ", address, ":", port)
 	l := &Listener{
 		block:         NewSimpleAuthenticator(),
 		sessions:      make(map[string]*UDPSession),
@@ -41,16 +40,15 @@ func NewListener(address v2net.Address, port v2net.Port) (*Listener, error) {
 		return nil, err
 	}
 	l.hub = hub
-	log.Info("Listener created.")
+	log.Info("KCP|Listener: listening on ", address, ":", port)
 	return l, nil
 }
 
 func (this *Listener) OnReceive(payload *alloc.Buffer, src v2net.Destination) {
-	log.Info("Listener on receive from ", src)
 	defer payload.Release()
 
 	if valid := this.block.Open(payload); !valid {
-		log.Info("Listern discarding invalid payload.")
+		log.Info("KCP|Listener: discarding invalid payload from ", src)
 		return
 	}
 	if !this.running {
@@ -74,7 +72,6 @@ func (this *Listener) OnReceive(payload *alloc.Buffer, src v2net.Destination) {
 			IP:   src.Address().IP(),
 			Port: int(src.Port()),
 		}
-		log.Info("Listener creating new connection.")
 		conn = newUDPSession(conv, writer, this.localAddr, srcAddr, this.block)
 		select {
 		case this.awaitingConns <- conn:
@@ -107,7 +104,6 @@ func (this *Listener) Accept() (internet.Connection, error) {
 		}
 		select {
 		case conn := <-this.awaitingConns:
-			log.Info("Accepting connection from ", conn.RemoteAddr())
 			return conn, nil
 		case <-time.After(time.Second):
 
@@ -142,7 +138,6 @@ type Writer struct {
 }
 
 func (this *Writer) Write(payload []byte) (int, error) {
-	log.Info("Writer writing to ", this.dest, " with ", len(payload), " bytes.")
 	return this.hub.WriteTo(payload, this.dest)
 }