diff --git a/transport/internet/kcp/connection.go b/transport/internet/kcp/connection.go index ff4cb8eb..69ff703f 100644 --- a/transport/internet/kcp/connection.go +++ b/transport/internet/kcp/connection.go @@ -19,16 +19,7 @@ var ( ) const ( - basePort = 20000 // minimum port for listening - maxPort = 65535 // maximum port for listening - defaultWndSize = 128 // default window size, in packet - mtuLimit = 4096 - rxQueueLimit = 8192 - rxFecLimit = 2048 - - headerSize = 2 - cmdData uint16 = 0 - cmdClose uint16 = 1 + headerSize = 2 ) type Command byte @@ -58,8 +49,8 @@ func nowMillisec() int64 { return now.Unix()*1000 + int64(now.Nanosecond()/1000000) } -// UDPSession defines a KCP session implemented by UDP -type UDPSession struct { +// Connection is a KCP connection over UDP. +type Connection struct { sync.RWMutex state ConnState kcp *KCP // the core ARQ @@ -74,108 +65,108 @@ type UDPSession struct { since int64 } -// newUDPSession create a new udp session for client or server -func newUDPSession(conv uint32, writerCloser io.WriteCloser, local *net.UDPAddr, remote *net.UDPAddr, block Authenticator) *UDPSession { - sess := new(UDPSession) - sess.local = local - sess.chReadEvent = make(chan struct{}, 1) - sess.remote = remote - sess.block = block - sess.writer = writerCloser - sess.since = nowMillisec() +// NewConnection create a new KCP connection between local and remote. +func NewConnection(conv uint32, writerCloser io.WriteCloser, local *net.UDPAddr, remote *net.UDPAddr, block Authenticator) *Connection { + conn := new(Connection) + conn.local = local + conn.chReadEvent = make(chan struct{}, 1) + conn.remote = remote + conn.block = block + conn.writer = writerCloser + conn.since = nowMillisec() mtu := uint32(effectiveConfig.Mtu - block.HeaderSize() - headerSize) - sess.kcp = NewKCP(conv, mtu, func(buf []byte, size int) { + conn.kcp = NewKCP(conv, mtu, func(buf []byte, size int) { if size >= IKCP_OVERHEAD { ext := alloc.NewBuffer().Clear().Append(buf[:size]) - cmd := cmdData + cmd := CommandData opt := Option(0) - if sess.state == ConnStateReadyToClose { + if conn.state == ConnStateReadyToClose { opt = OptionClose } ext.Prepend([]byte{byte(cmd), byte(opt)}) - go sess.output(ext) + go conn.output(ext) } - if sess.state == ConnStateReadyToClose && sess.kcp.WaitSnd() == 0 { - go sess.NotifyTermination() + if conn.state == ConnStateReadyToClose && conn.kcp.WaitSnd() == 0 { + go conn.NotifyTermination() } }) - sess.kcp.WndSize(effectiveConfig.Sndwnd, effectiveConfig.Rcvwnd) - sess.kcp.NoDelay(1, 20, 2, 1) - sess.kcp.current = sess.Elapsed() + conn.kcp.WndSize(effectiveConfig.Sndwnd, effectiveConfig.Rcvwnd) + conn.kcp.NoDelay(1, 20, 2, 1) + conn.kcp.current = conn.Elapsed() - go sess.updateTask() + go conn.updateTask() - return sess + return conn } -func (this *UDPSession) Elapsed() uint32 { +func (this *Connection) Elapsed() uint32 { return uint32(nowMillisec() - this.since) } // Read implements the Conn Read method. -func (s *UDPSession) Read(b []byte) (int, error) { - if s == nil || s.state == ConnStateReadyToClose || s.state == ConnStateClosed { +func (this *Connection) Read(b []byte) (int, error) { + if this == nil || this.state == ConnStateReadyToClose || this.state == ConnStateClosed { return 0, io.EOF } for { - s.RLock() - if s.state == ConnStateReadyToClose || s.state == ConnStateClosed { - s.RUnlock() + this.RLock() + if this.state == ConnStateReadyToClose || this.state == ConnStateClosed { + this.RUnlock() return 0, io.EOF } - if !s.rd.IsZero() && s.rd.Before(time.Now()) { - s.RUnlock() + if !this.rd.IsZero() && this.rd.Before(time.Now()) { + this.RUnlock() return 0, errTimeout } - s.RUnlock() + this.RUnlock() - s.kcpAccess.Lock() - nBytes := s.kcp.Recv(b) - s.kcpAccess.Unlock() + this.kcpAccess.Lock() + nBytes := this.kcp.Recv(b) + this.kcpAccess.Unlock() if nBytes > 0 { return nBytes, nil } select { - case <-s.chReadEvent: + case <-this.chReadEvent: case <-time.After(time.Second): } } } // Write implements the Conn Write method. -func (s *UDPSession) Write(b []byte) (int, error) { - if s == nil || - s.state == ConnStateReadyToClose || - s.state == ConnStatePeerClosed || - s.state == ConnStateClosed { +func (this *Connection) Write(b []byte) (int, error) { + if this == nil || + this.state == ConnStateReadyToClose || + this.state == ConnStatePeerClosed || + this.state == ConnStateClosed { return 0, io.ErrClosedPipe } for { - s.RLock() - if s.state == ConnStateReadyToClose || - s.state == ConnStatePeerClosed || - s.state == ConnStateClosed { - s.RUnlock() + this.RLock() + if this.state == ConnStateReadyToClose || + this.state == ConnStatePeerClosed || + this.state == ConnStateClosed { + this.RUnlock() return 0, io.ErrClosedPipe } - s.RUnlock() + this.RUnlock() - s.kcpAccess.Lock() - if s.kcp.WaitSnd() < int(s.kcp.snd_wnd) { + this.kcpAccess.Lock() + if this.kcp.WaitSnd() < int(this.kcp.snd_wnd) { nBytes := len(b) - s.kcp.Send(b) - s.kcp.current = s.Elapsed() - s.kcp.flush() - s.kcpAccess.Unlock() + this.kcp.Send(b) + this.kcp.current = this.Elapsed() + this.kcp.flush() + this.kcpAccess.Unlock() return nBytes, nil } - s.kcpAccess.Unlock() + this.kcpAccess.Unlock() - if !s.wd.IsZero() && s.wd.Before(time.Now()) { + if !this.wd.IsZero() && this.wd.Before(time.Now()) { return 0, errTimeout } @@ -184,7 +175,7 @@ func (s *UDPSession) Write(b []byte) (int, error) { } } -func (this *UDPSession) Terminate() { +func (this *Connection) Terminate() { if this == nil || this.state == ConnStateClosed { return } @@ -198,7 +189,7 @@ func (this *UDPSession) Terminate() { this.writer.Close() } -func (this *UDPSession) NotifyTermination() { +func (this *Connection) NotifyTermination() { for i := 0; i < 16; i++ { this.RLock() if this.state == ConnStateClosed { @@ -217,102 +208,102 @@ func (this *UDPSession) NotifyTermination() { } // Close closes the connection. -func (s *UDPSession) Close() error { - if s == nil || s.state == ConnStateClosed || s.state == ConnStateReadyToClose { +func (this *Connection) Close() error { + if this == nil || this.state == ConnStateClosed || this.state == ConnStateReadyToClose { return errClosedConnection } - log.Debug("KCP|Connection: Closing connection to ", s.remote) - s.Lock() - defer s.Unlock() + log.Debug("KCP|Connection: Closing connection to ", this.remote) + this.Lock() + defer this.Unlock() - if s.state == ConnStateActive { - s.state = ConnStateReadyToClose - if s.kcp.WaitSnd() == 0 { - go s.NotifyTermination() + if this.state == ConnStateActive { + this.state = ConnStateReadyToClose + if this.kcp.WaitSnd() == 0 { + go this.NotifyTermination() } } - if s.state == ConnStatePeerClosed { - go s.Terminate() + if this.state == ConnStatePeerClosed { + go this.Terminate() } return nil } // LocalAddr returns the local network address. The Addr returned is shared by all invocations of LocalAddr, so do not modify it. -func (s *UDPSession) LocalAddr() net.Addr { - if s == nil { +func (this *Connection) LocalAddr() net.Addr { + if this == nil { return nil } - return s.local + return this.local } // RemoteAddr returns the remote network address. The Addr returned is shared by all invocations of RemoteAddr, so do not modify it. -func (s *UDPSession) RemoteAddr() net.Addr { - if s == nil { +func (this *Connection) RemoteAddr() net.Addr { + if this == nil { return nil } - return s.remote + return this.remote } // SetDeadline sets the deadline associated with the listener. A zero time value disables the deadline. -func (s *UDPSession) SetDeadline(t time.Time) error { - if s == nil || s.state != ConnStateActive { +func (this *Connection) SetDeadline(t time.Time) error { + if this == nil || this.state != ConnStateActive { return errClosedConnection } - s.Lock() - defer s.Unlock() - s.rd = t - s.wd = t + this.Lock() + defer this.Unlock() + this.rd = t + this.wd = t return nil } // SetReadDeadline implements the Conn SetReadDeadline method. -func (s *UDPSession) SetReadDeadline(t time.Time) error { - if s == nil || s.state != ConnStateActive { +func (this *Connection) SetReadDeadline(t time.Time) error { + if this == nil || this.state != ConnStateActive { return errClosedConnection } - s.Lock() - defer s.Unlock() - s.rd = t + this.Lock() + defer this.Unlock() + this.rd = t return nil } // SetWriteDeadline implements the Conn SetWriteDeadline method. -func (s *UDPSession) SetWriteDeadline(t time.Time) error { - if s == nil || s.state != ConnStateActive { +func (this *Connection) SetWriteDeadline(t time.Time) error { + if this == nil || this.state != ConnStateActive { return errClosedConnection } - s.Lock() - defer s.Unlock() - s.wd = t + this.Lock() + defer this.Unlock() + this.wd = t return nil } -func (s *UDPSession) output(payload *alloc.Buffer) { +func (this *Connection) output(payload *alloc.Buffer) { defer payload.Release() - if s == nil { + if this == nil { return } - s.RLock() - defer s.RUnlock() - if s.state == ConnStatePeerClosed || s.state == ConnStateClosed { + this.RLock() + defer this.RUnlock() + if this.state == ConnStatePeerClosed || this.state == ConnStateClosed { return } - s.block.Seal(payload) + this.block.Seal(payload) - s.writer.Write(payload.Value) + this.writer.Write(payload.Value) } // kcp update, input loop -func (s *UDPSession) updateTask() { - for s.state != ConnStateClosed { - current := s.Elapsed() - s.kcpAccess.Lock() - s.kcp.Update(current) - interval := s.kcp.Check(s.Elapsed()) - s.kcpAccess.Unlock() +func (this *Connection) updateTask() { + for this.state != ConnStateClosed { + current := this.Elapsed() + this.kcpAccess.Lock() + this.kcp.Update(current) + interval := this.kcp.Check(this.Elapsed()) + this.kcpAccess.Unlock() sleep := interval - current if sleep < 10 { sleep = 10 @@ -321,14 +312,14 @@ func (s *UDPSession) updateTask() { } } -func (s *UDPSession) notifyReadEvent() { +func (this *Connection) notifyReadEvent() { select { - case s.chReadEvent <- struct{}{}: + case this.chReadEvent <- struct{}{}: default: } } -func (this *UDPSession) MarkPeerClose() { +func (this *Connection) MarkPeerClose() { this.Lock() defer this.Unlock() if this.state == ConnStateReadyToClose { @@ -341,25 +332,25 @@ func (this *UDPSession) MarkPeerClose() { } } -func (s *UDPSession) kcpInput(data []byte) { +func (this *Connection) kcpInput(data []byte) { cmd := Command(data[0]) opt := Option(data[1]) if cmd == CommandTerminate { - go s.Terminate() + go this.Terminate() return } if opt == OptionClose { - go s.MarkPeerClose() + go this.MarkPeerClose() } - s.kcpAccess.Lock() - s.kcp.current = s.Elapsed() - s.kcp.Input(data[2:]) + this.kcpAccess.Lock() + this.kcp.current = this.Elapsed() + this.kcp.Input(data[2:]) - s.kcpAccess.Unlock() - s.notifyReadEvent() + this.kcpAccess.Unlock() + this.notifyReadEvent() } -func (this *UDPSession) FetchInputFrom(conn net.Conn) { +func (this *Connection) FetchInputFrom(conn net.Conn) { go func() { for { payload := alloc.NewBuffer() @@ -378,8 +369,8 @@ func (this *UDPSession) FetchInputFrom(conn net.Conn) { }() } -func (this *UDPSession) Reusable() bool { +func (this *Connection) Reusable() bool { return false } -func (this *UDPSession) SetReusable(b bool) {} +func (this *Connection) SetReusable(b bool) {} diff --git a/transport/internet/kcp/dialer.go b/transport/internet/kcp/dialer.go index 5b22b63e..c5e2d8b7 100644 --- a/transport/internet/kcp/dialer.go +++ b/transport/internet/kcp/dialer.go @@ -23,7 +23,7 @@ func DialKCP(src v2net.Address, dest v2net.Destination) (internet.Connection, er } cpip := NewSimpleAuthenticator() - session := newUDPSession(rand.Uint32(), conn, conn.LocalAddr().(*net.UDPAddr), conn.RemoteAddr().(*net.UDPAddr), cpip) + session := NewConnection(rand.Uint32(), conn, conn.LocalAddr().(*net.UDPAddr), conn.RemoteAddr().(*net.UDPAddr), cpip) session.FetchInputFrom(conn) return session, nil diff --git a/transport/internet/kcp/listener.go b/transport/internet/kcp/listener.go index 497a1e7f..1a173d7c 100644 --- a/transport/internet/kcp/listener.go +++ b/transport/internet/kcp/listener.go @@ -18,8 +18,8 @@ type Listener struct { sync.Mutex running bool block Authenticator - sessions map[string]*UDPSession - awaitingConns chan *UDPSession + sessions map[string]*Connection + awaitingConns chan *Connection hub *udp.UDPHub localAddr *net.UDPAddr } @@ -27,8 +27,8 @@ type Listener struct { func NewListener(address v2net.Address, port v2net.Port) (*Listener, error) { l := &Listener{ block: NewSimpleAuthenticator(), - sessions: make(map[string]*UDPSession), - awaitingConns: make(chan *UDPSession, 64), + sessions: make(map[string]*Connection), + awaitingConns: make(chan *Connection, 64), localAddr: &net.UDPAddr{ IP: address.IP(), Port: int(port), @@ -72,7 +72,7 @@ func (this *Listener) OnReceive(payload *alloc.Buffer, src v2net.Destination) { IP: src.Address().IP(), Port: int(src.Port()), } - conn = newUDPSession(conv, writer, this.localAddr, srcAddr, this.block) + conn = NewConnection(conv, writer, this.localAddr, srcAddr, this.block) select { case this.awaitingConns <- conn: case <-time.After(time.Second * 5):