improve kcp performance

pull/215/head
v2ray 2016-06-17 17:38:42 +02:00
parent f5d613f10a
commit 233b73d1db
5 changed files with 35 additions and 113 deletions

View File

@ -1,7 +1,7 @@
{ {
"port": 50030, "port": 50030,
"log": { "log": {
"loglevel": "info" "loglevel": "debug"
}, },
"inbound": { "inbound": {
"listen": "127.0.0.1", "listen": "127.0.0.1",
@ -15,6 +15,9 @@
}, },
"outbound": { "outbound": {
"protocol": "vmess", "protocol": "vmess",
"streamSettings": {
"network": "kcp"
},
"settings": { "settings": {
"vnext": [ "vnext": [
{ {

View File

@ -1,11 +1,14 @@
{ {
"port": 50031, "port": 50031,
"log": { "log": {
"loglevel": "info" "loglevel": "debug"
}, },
"inbound": { "inbound": {
"listen": "127.0.0.1", "listen": "127.0.0.1",
"protocol": "vmess", "protocol": "vmess",
"streamSettings": {
"network": "kcp"
},
"settings": { "settings": {
"clients": [ "clients": [
{ {
@ -32,6 +35,9 @@
"port": "50035-50039", "port": "50035-50039",
"tag": "detour", "tag": "detour",
"settings": {}, "settings": {},
"streamSettings": {
"network": "kcp"
},
"allocate": { "allocate": {
"strategy": "random", "strategy": "random",
"concurrency": 2, "concurrency": 2,

View File

@ -1,47 +1,10 @@
package kcp package kcp
/*AdvancedConfig define behavior of KCP in detail
MaximumTransmissionUnit:
Largest protocol data unit that the layer can pass onwards
can be discovered by running tracepath
SendingWindowSize , ReceivingWindowSize:
the size of Tx/Rx window, by packet
ForwardErrorCorrectionGroupSize:
The the size of packet to generate a Forward Error Correction
packet, this is used to counteract packet loss.
AcknowledgeNoDelay:
Do not wait a certain of time before sending the ACK packet,
increase bandwich cost and might increase performance
Dscp:
Differentiated services code point,
be used to reconized to discriminate packet.
It is recommanded to keep it 0 to avoid being detected.
ReadTimeout,WriteTimeout:
Close the Socket if it have been silent for too long,
Small value can recycle zombie socket faster but
can cause v2ray to kill the proxy connection it is relaying,
Higher value can prevent server from closing zombie socket and
waste resources.
*/
/*Config define basic behavior of KCP
Mode:
can be one of these values:
fast3,fast2,fast,normal
<<<<<<- less delay
->>>>>> less bandwich wasted
*/
type Config struct { type Config struct {
Mtu int Mtu int // Maximum transmission unit
Sndwnd int Sndwnd int // Sending window size
Rcvwnd int Rcvwnd int // Receiving window size
Acknodelay bool Acknodelay bool // Acknoledge without delay
} }
func (this *Config) Apply() { func (this *Config) Apply() {

View File

@ -70,7 +70,6 @@ type UDPSession struct {
wd time.Time // write deadline wd time.Time // write deadline
chReadEvent chan struct{} chReadEvent chan struct{}
chWriteEvent chan struct{} chWriteEvent chan struct{}
ackNoDelay bool
writer io.WriteCloser writer io.WriteCloser
since int64 since int64
} }
@ -88,7 +87,6 @@ func newUDPSession(conv uint32, writerCloser io.WriteCloser, local *net.UDPAddr,
mtu := uint32(effectiveConfig.Mtu - block.HeaderSize() - headerSize) mtu := uint32(effectiveConfig.Mtu - block.HeaderSize() - headerSize)
sess.kcp = NewKCP(conv, mtu, func(buf []byte, size int) { sess.kcp = NewKCP(conv, mtu, func(buf []byte, size int) {
log.Info(sess.local, " kcp output: ", buf[:size])
if size >= IKCP_OVERHEAD { if size >= IKCP_OVERHEAD {
ext := alloc.NewBuffer().Clear().Append(buf[:size]) ext := alloc.NewBuffer().Clear().Append(buf[:size])
cmd := cmdData cmd := cmdData
@ -102,12 +100,10 @@ func newUDPSession(conv uint32, writerCloser io.WriteCloser, local *net.UDPAddr,
}) })
sess.kcp.WndSize(effectiveConfig.Sndwnd, effectiveConfig.Rcvwnd) sess.kcp.WndSize(effectiveConfig.Sndwnd, effectiveConfig.Rcvwnd)
sess.kcp.NoDelay(1, 20, 2, 1) sess.kcp.NoDelay(1, 20, 2, 1)
sess.ackNoDelay = effectiveConfig.Acknodelay
sess.kcp.current = sess.Elapsed() sess.kcp.current = sess.Elapsed()
go sess.updateTask() go sess.updateTask()
log.Info("Created KCP conn to ", sess.RemoteAddr())
return sess return sess
} }
@ -158,7 +154,6 @@ func (s *UDPSession) Read(b []byte) (int, error) {
// Write implements the Conn Write method. // Write implements the Conn Write method.
func (s *UDPSession) Write(b []byte) (int, error) { func (s *UDPSession) Write(b []byte) (int, error) {
log.Info("Trying to write ", len(b), " bytes. ", s.local)
if s.state == ConnStateReadyToClose || if s.state == ConnStateReadyToClose ||
s.state == ConnStatePeerClosed || s.state == ConnStatePeerClosed ||
s.state == ConnStateClosed { s.state == ConnStateClosed {
@ -166,44 +161,28 @@ func (s *UDPSession) Write(b []byte) (int, error) {
} }
for { for {
s.Lock()
if s.state == ConnStateReadyToClose || if s.state == ConnStateReadyToClose ||
s.state == ConnStatePeerClosed || s.state == ConnStatePeerClosed ||
s.state == ConnStateClosed { s.state == ConnStateClosed {
s.Unlock()
return 0, io.ErrClosedPipe return 0, io.ErrClosedPipe
} }
if !s.wd.IsZero() { s.kcpAccess.Lock()
if time.Now().After(s.wd) { // timeout
s.Unlock()
return 0, errTimeout
}
}
if s.kcp.WaitSnd() < int(s.kcp.snd_wnd) { if s.kcp.WaitSnd() < int(s.kcp.snd_wnd) {
nBytes := len(b) nBytes := len(b)
log.Info("Writing ", nBytes, " bytes.", s.local)
s.kcp.Send(b) s.kcp.Send(b)
s.kcp.current = s.Elapsed() s.kcp.current = s.Elapsed()
s.kcp.flush() s.kcp.flush()
s.Unlock() s.kcpAccess.Unlock()
return nBytes, nil return nBytes, nil
} }
s.kcpAccess.Unlock()
var timeout <-chan time.Time if !s.wd.IsZero() && s.wd.Before(time.Now()) {
if !s.wd.IsZero() {
delay := s.wd.Sub(time.Now())
timeout = time.After(delay)
}
s.Unlock()
// wait for write event or timeout
select {
case <-s.chWriteEvent:
case <-timeout:
return 0, errTimeout return 0, errTimeout
} }
time.Sleep(time.Duration(s.kcp.WaitSnd()*5) * time.Millisecond)
} }
} }
@ -213,6 +192,9 @@ func (this *UDPSession) Terminate() {
} }
this.Lock() this.Lock()
defer this.Unlock() defer this.Unlock()
if this.state == ConnStateClosed {
return
}
this.state = ConnStateClosed this.state = ConnStateClosed
this.writer.Close() this.writer.Close()
@ -223,7 +205,7 @@ func (this *UDPSession) NotifyTermination() {
this.Lock() this.Lock()
if this.state == ConnStateClosed { if this.state == ConnStateClosed {
this.Unlock() this.Unlock()
return break
} }
buffer := alloc.NewSmallBuffer().Clear() buffer := alloc.NewSmallBuffer().Clear()
buffer.AppendBytes(byte(CommandTerminate), byte(OptionClose), byte(0), byte(0), byte(0), byte(0)) buffer.AppendBytes(byte(CommandTerminate), byte(OptionClose), byte(0), byte(0), byte(0), byte(0))
@ -236,7 +218,7 @@ func (this *UDPSession) NotifyTermination() {
// Close closes the connection. // Close closes the connection.
func (s *UDPSession) Close() error { func (s *UDPSession) Close() error {
log.Info("Closed ", s.local) log.Debug("KCP|Connection: Closing connection to ", s.remote)
s.Lock() s.Lock()
defer s.Unlock() defer s.Unlock()
@ -300,31 +282,11 @@ func (s *UDPSession) output(payload *alloc.Buffer) {
// kcp update, input loop // kcp update, input loop
func (s *UDPSession) updateTask() { func (s *UDPSession) updateTask() {
ticker := time.NewTicker(20 * time.Millisecond) for s.state != ConnStateClosed {
defer ticker.Stop()
var nextupdate uint32 = 0
for range ticker.C {
s.Lock()
if s.state == ConnStateClosed {
s.Unlock()
return
}
current := s.Elapsed() current := s.Elapsed()
if !s.needUpdate && nextupdate == 0 {
nextupdate = s.kcp.Check(current)
}
current = s.Elapsed()
if s.needUpdate || current >= nextupdate {
log.Info("Updating KCP: ", current, " addr ", s.LocalAddr())
s.kcp.Update(current) s.kcp.Update(current)
nextupdate = s.kcp.Check(current) interval := s.kcp.Check(s.Elapsed())
s.needUpdate = false time.Sleep(time.Duration(interval) * time.Millisecond)
}
if s.kcp.WaitSnd() < int(s.kcp.snd_wnd) {
s.notifyWriteEvent()
}
s.Unlock()
} }
} }
@ -367,16 +329,8 @@ func (s *UDPSession) kcpInput(data []byte) {
} }
s.kcpAccess.Lock() s.kcpAccess.Lock()
s.kcp.current = s.Elapsed() s.kcp.current = s.Elapsed()
log.Info(s.local, " kcp input: ", data[2:]) s.kcp.Input(data[2:])
ret := s.kcp.Input(data[2:])
log.Info("kcp input returns ", ret)
if s.ackNoDelay {
s.kcp.current = s.Elapsed()
s.kcp.flush()
} else {
s.needUpdate = true
}
s.kcpAccess.Unlock() s.kcpAccess.Unlock()
s.notifyReadEvent() s.notifyReadEvent()
} }
@ -391,8 +345,9 @@ func (this *UDPSession) FetchInputFrom(conn net.Conn) {
} }
payload.Slice(0, nBytes) payload.Slice(0, nBytes)
if this.block.Open(payload) { if this.block.Open(payload) {
log.Info("Client fetching ", payload.Len(), " bytes.")
this.kcpInput(payload.Value) this.kcpInput(payload.Value)
} else {
log.Info("KCP|Connection: Invalid response from ", conn.RemoteAddr())
} }
payload.Release() payload.Release()
} }

View File

@ -25,7 +25,6 @@ type Listener struct {
} }
func NewListener(address v2net.Address, port v2net.Port) (*Listener, error) { func NewListener(address v2net.Address, port v2net.Port) (*Listener, error) {
log.Info("Creating listener on ", address, ":", port)
l := &Listener{ l := &Listener{
block: NewSimpleAuthenticator(), block: NewSimpleAuthenticator(),
sessions: make(map[string]*UDPSession), sessions: make(map[string]*UDPSession),
@ -41,16 +40,15 @@ func NewListener(address v2net.Address, port v2net.Port) (*Listener, error) {
return nil, err return nil, err
} }
l.hub = hub l.hub = hub
log.Info("Listener created.") log.Info("KCP|Listener: listening on ", address, ":", port)
return l, nil return l, nil
} }
func (this *Listener) OnReceive(payload *alloc.Buffer, src v2net.Destination) { func (this *Listener) OnReceive(payload *alloc.Buffer, src v2net.Destination) {
log.Info("Listener on receive from ", src)
defer payload.Release() defer payload.Release()
if valid := this.block.Open(payload); !valid { if valid := this.block.Open(payload); !valid {
log.Info("Listern discarding invalid payload.") log.Info("KCP|Listener: discarding invalid payload from ", src)
return return
} }
if !this.running { if !this.running {
@ -74,7 +72,6 @@ func (this *Listener) OnReceive(payload *alloc.Buffer, src v2net.Destination) {
IP: src.Address().IP(), IP: src.Address().IP(),
Port: int(src.Port()), Port: int(src.Port()),
} }
log.Info("Listener creating new connection.")
conn = newUDPSession(conv, writer, this.localAddr, srcAddr, this.block) conn = newUDPSession(conv, writer, this.localAddr, srcAddr, this.block)
select { select {
case this.awaitingConns <- conn: case this.awaitingConns <- conn:
@ -107,7 +104,6 @@ func (this *Listener) Accept() (internet.Connection, error) {
} }
select { select {
case conn := <-this.awaitingConns: case conn := <-this.awaitingConns:
log.Info("Accepting connection from ", conn.RemoteAddr())
return conn, nil return conn, nil
case <-time.After(time.Second): case <-time.After(time.Second):
@ -142,7 +138,6 @@ type Writer struct {
} }
func (this *Writer) Write(payload []byte) (int, error) { func (this *Writer) Write(payload []byte) (int, error) {
log.Info("Writer writing to ", this.dest, " with ", len(payload), " bytes.")
return this.hub.WriteTo(payload, this.dest) return this.hub.WriteTo(payload, this.dest)
} }