diff --git a/testing/scenarios/data/test_4_client.json b/testing/scenarios/data/test_4_client.json index 6c6d5878..311f0e44 100644 --- a/testing/scenarios/data/test_4_client.json +++ b/testing/scenarios/data/test_4_client.json @@ -1,7 +1,7 @@ { "port": 50030, "log": { - "loglevel": "info" + "loglevel": "debug" }, "inbound": { "listen": "127.0.0.1", @@ -15,6 +15,9 @@ }, "outbound": { "protocol": "vmess", + "streamSettings": { + "network": "kcp" + }, "settings": { "vnext": [ { diff --git a/testing/scenarios/data/test_4_server.json b/testing/scenarios/data/test_4_server.json index 3d0fc550..015d5d66 100644 --- a/testing/scenarios/data/test_4_server.json +++ b/testing/scenarios/data/test_4_server.json @@ -1,11 +1,14 @@ { "port": 50031, "log": { - "loglevel": "info" + "loglevel": "debug" }, "inbound": { "listen": "127.0.0.1", "protocol": "vmess", + "streamSettings": { + "network": "kcp" + }, "settings": { "clients": [ { @@ -32,6 +35,9 @@ "port": "50035-50039", "tag": "detour", "settings": {}, + "streamSettings": { + "network": "kcp" + }, "allocate": { "strategy": "random", "concurrency": 2, diff --git a/transport/internet/kcp/config.go b/transport/internet/kcp/config.go index 8f2055c3..ef9d1e46 100644 --- a/transport/internet/kcp/config.go +++ b/transport/internet/kcp/config.go @@ -1,47 +1,10 @@ package kcp -/*AdvancedConfig define behavior of KCP in detail - -MaximumTransmissionUnit: -Largest protocol data unit that the layer can pass onwards -can be discovered by running tracepath - -SendingWindowSize , ReceivingWindowSize: -the size of Tx/Rx window, by packet - -ForwardErrorCorrectionGroupSize: -The the size of packet to generate a Forward Error Correction -packet, this is used to counteract packet loss. - -AcknowledgeNoDelay: -Do not wait a certain of time before sending the ACK packet, -increase bandwich cost and might increase performance - -Dscp: -Differentiated services code point, -be used to reconized to discriminate packet. -It is recommanded to keep it 0 to avoid being detected. - -ReadTimeout,WriteTimeout: -Close the Socket if it have been silent for too long, -Small value can recycle zombie socket faster but -can cause v2ray to kill the proxy connection it is relaying, -Higher value can prevent server from closing zombie socket and -waste resources. -*/ - -/*Config define basic behavior of KCP -Mode: -can be one of these values: -fast3,fast2,fast,normal -<<<<<<- less delay -->>>>>> less bandwich wasted -*/ type Config struct { - Mtu int - Sndwnd int - Rcvwnd int - Acknodelay bool + Mtu int // Maximum transmission unit + Sndwnd int // Sending window size + Rcvwnd int // Receiving window size + Acknodelay bool // Acknoledge without delay } func (this *Config) Apply() { diff --git a/transport/internet/kcp/connection.go b/transport/internet/kcp/connection.go index 6de1b687..c62d35ee 100644 --- a/transport/internet/kcp/connection.go +++ b/transport/internet/kcp/connection.go @@ -70,7 +70,6 @@ type UDPSession struct { wd time.Time // write deadline chReadEvent chan struct{} chWriteEvent chan struct{} - ackNoDelay bool writer io.WriteCloser since int64 } @@ -88,7 +87,6 @@ func newUDPSession(conv uint32, writerCloser io.WriteCloser, local *net.UDPAddr, mtu := uint32(effectiveConfig.Mtu - block.HeaderSize() - headerSize) sess.kcp = NewKCP(conv, mtu, func(buf []byte, size int) { - log.Info(sess.local, " kcp output: ", buf[:size]) if size >= IKCP_OVERHEAD { ext := alloc.NewBuffer().Clear().Append(buf[:size]) cmd := cmdData @@ -102,12 +100,10 @@ func newUDPSession(conv uint32, writerCloser io.WriteCloser, local *net.UDPAddr, }) sess.kcp.WndSize(effectiveConfig.Sndwnd, effectiveConfig.Rcvwnd) sess.kcp.NoDelay(1, 20, 2, 1) - sess.ackNoDelay = effectiveConfig.Acknodelay sess.kcp.current = sess.Elapsed() go sess.updateTask() - log.Info("Created KCP conn to ", sess.RemoteAddr()) return sess } @@ -158,7 +154,6 @@ func (s *UDPSession) Read(b []byte) (int, error) { // Write implements the Conn Write method. func (s *UDPSession) Write(b []byte) (int, error) { - log.Info("Trying to write ", len(b), " bytes. ", s.local) if s.state == ConnStateReadyToClose || s.state == ConnStatePeerClosed || s.state == ConnStateClosed { @@ -166,44 +161,28 @@ func (s *UDPSession) Write(b []byte) (int, error) { } for { - s.Lock() if s.state == ConnStateReadyToClose || s.state == ConnStatePeerClosed || s.state == ConnStateClosed { - s.Unlock() return 0, io.ErrClosedPipe } - if !s.wd.IsZero() { - if time.Now().After(s.wd) { // timeout - s.Unlock() - return 0, errTimeout - } - } - + s.kcpAccess.Lock() if s.kcp.WaitSnd() < int(s.kcp.snd_wnd) { nBytes := len(b) - log.Info("Writing ", nBytes, " bytes.", s.local) s.kcp.Send(b) s.kcp.current = s.Elapsed() s.kcp.flush() - s.Unlock() + s.kcpAccess.Unlock() return nBytes, nil } + s.kcpAccess.Unlock() - var timeout <-chan time.Time - if !s.wd.IsZero() { - delay := s.wd.Sub(time.Now()) - timeout = time.After(delay) - } - s.Unlock() - - // wait for write event or timeout - select { - case <-s.chWriteEvent: - case <-timeout: + if !s.wd.IsZero() && s.wd.Before(time.Now()) { return 0, errTimeout } + + time.Sleep(time.Duration(s.kcp.WaitSnd()*5) * time.Millisecond) } } @@ -213,6 +192,9 @@ func (this *UDPSession) Terminate() { } this.Lock() defer this.Unlock() + if this.state == ConnStateClosed { + return + } this.state = ConnStateClosed this.writer.Close() @@ -223,7 +205,7 @@ func (this *UDPSession) NotifyTermination() { this.Lock() if this.state == ConnStateClosed { this.Unlock() - return + break } buffer := alloc.NewSmallBuffer().Clear() buffer.AppendBytes(byte(CommandTerminate), byte(OptionClose), byte(0), byte(0), byte(0), byte(0)) @@ -236,7 +218,7 @@ func (this *UDPSession) NotifyTermination() { // Close closes the connection. func (s *UDPSession) Close() error { - log.Info("Closed ", s.local) + log.Debug("KCP|Connection: Closing connection to ", s.remote) s.Lock() defer s.Unlock() @@ -300,31 +282,11 @@ func (s *UDPSession) output(payload *alloc.Buffer) { // kcp update, input loop func (s *UDPSession) updateTask() { - ticker := time.NewTicker(20 * time.Millisecond) - defer ticker.Stop() - - var nextupdate uint32 = 0 - for range ticker.C { - s.Lock() - if s.state == ConnStateClosed { - s.Unlock() - return - } + for s.state != ConnStateClosed { current := s.Elapsed() - if !s.needUpdate && nextupdate == 0 { - nextupdate = s.kcp.Check(current) - } - current = s.Elapsed() - if s.needUpdate || current >= nextupdate { - log.Info("Updating KCP: ", current, " addr ", s.LocalAddr()) - s.kcp.Update(current) - nextupdate = s.kcp.Check(current) - s.needUpdate = false - } - if s.kcp.WaitSnd() < int(s.kcp.snd_wnd) { - s.notifyWriteEvent() - } - s.Unlock() + s.kcp.Update(current) + interval := s.kcp.Check(s.Elapsed()) + time.Sleep(time.Duration(interval) * time.Millisecond) } } @@ -367,16 +329,8 @@ func (s *UDPSession) kcpInput(data []byte) { } s.kcpAccess.Lock() s.kcp.current = s.Elapsed() - log.Info(s.local, " kcp input: ", data[2:]) - ret := s.kcp.Input(data[2:]) - log.Info("kcp input returns ", ret) + s.kcp.Input(data[2:]) - if s.ackNoDelay { - s.kcp.current = s.Elapsed() - s.kcp.flush() - } else { - s.needUpdate = true - } s.kcpAccess.Unlock() s.notifyReadEvent() } @@ -391,8 +345,9 @@ func (this *UDPSession) FetchInputFrom(conn net.Conn) { } payload.Slice(0, nBytes) if this.block.Open(payload) { - log.Info("Client fetching ", payload.Len(), " bytes.") this.kcpInput(payload.Value) + } else { + log.Info("KCP|Connection: Invalid response from ", conn.RemoteAddr()) } payload.Release() } diff --git a/transport/internet/kcp/listener.go b/transport/internet/kcp/listener.go index 386727c8..497a1e7f 100644 --- a/transport/internet/kcp/listener.go +++ b/transport/internet/kcp/listener.go @@ -25,7 +25,6 @@ type Listener struct { } func NewListener(address v2net.Address, port v2net.Port) (*Listener, error) { - log.Info("Creating listener on ", address, ":", port) l := &Listener{ block: NewSimpleAuthenticator(), sessions: make(map[string]*UDPSession), @@ -41,16 +40,15 @@ func NewListener(address v2net.Address, port v2net.Port) (*Listener, error) { return nil, err } l.hub = hub - log.Info("Listener created.") + log.Info("KCP|Listener: listening on ", address, ":", port) return l, nil } func (this *Listener) OnReceive(payload *alloc.Buffer, src v2net.Destination) { - log.Info("Listener on receive from ", src) defer payload.Release() if valid := this.block.Open(payload); !valid { - log.Info("Listern discarding invalid payload.") + log.Info("KCP|Listener: discarding invalid payload from ", src) return } if !this.running { @@ -74,7 +72,6 @@ func (this *Listener) OnReceive(payload *alloc.Buffer, src v2net.Destination) { IP: src.Address().IP(), Port: int(src.Port()), } - log.Info("Listener creating new connection.") conn = newUDPSession(conv, writer, this.localAddr, srcAddr, this.block) select { case this.awaitingConns <- conn: @@ -107,7 +104,6 @@ func (this *Listener) Accept() (internet.Connection, error) { } select { case conn := <-this.awaitingConns: - log.Info("Accepting connection from ", conn.RemoteAddr()) return conn, nil case <-time.After(time.Second): @@ -142,7 +138,6 @@ type Writer struct { } func (this *Writer) Write(payload []byte) (int, error) { - log.Info("Writer writing to ", this.dest, " with ", len(payload), " bytes.") return this.hub.WriteTo(payload, this.dest) }