connection reuse for mkcp

pull/314/head
Darien Raymond 8 years ago
parent b2e084c78c
commit d19ee4d408
No known key found for this signature in database
GPG Key ID: 7251FFA14BB18169

@ -88,6 +88,13 @@ func (this *Config) GetReceivingBufferSize() uint32 {
return this.ReadBuffer.GetSize() / this.Mtu.GetValue()
}
func (o *ConnectionReuse) IsEnabled() bool {
if o == nil {
return true
}
return o.Enable
}
func init() {
internet.RegisterNetworkConfigCreator(v2net.Network_KCP, func() interface{} {
return new(Config)

@ -15,6 +15,7 @@ It has these top-level messages:
DownlinkCapacity
WriteBuffer
ReadBuffer
ConnectionReuse
Config
*/
package kcp
@ -95,6 +96,15 @@ func (m *ReadBuffer) String() string { return proto.CompactTextString
func (*ReadBuffer) ProtoMessage() {}
func (*ReadBuffer) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{5} }
type ConnectionReuse struct {
Enable bool `protobuf:"varint,1,opt,name=enable" json:"enable,omitempty"`
}
func (m *ConnectionReuse) Reset() { *m = ConnectionReuse{} }
func (m *ConnectionReuse) String() string { return proto.CompactTextString(m) }
func (*ConnectionReuse) ProtoMessage() {}
func (*ConnectionReuse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{6} }
type Config struct {
Mtu *MTU `protobuf:"bytes,1,opt,name=mtu" json:"mtu,omitempty"`
Tti *TTI `protobuf:"bytes,2,opt,name=tti" json:"tti,omitempty"`
@ -104,12 +114,13 @@ type Config struct {
WriteBuffer *WriteBuffer `protobuf:"bytes,6,opt,name=write_buffer,json=writeBuffer" json:"write_buffer,omitempty"`
ReadBuffer *ReadBuffer `protobuf:"bytes,7,opt,name=read_buffer,json=readBuffer" json:"read_buffer,omitempty"`
HeaderConfig *v2ray_core_common_loader.TypedSettings `protobuf:"bytes,8,opt,name=header_config,json=headerConfig" json:"header_config,omitempty"`
ConnectionReuse *ConnectionReuse `protobuf:"bytes,9,opt,name=connection_reuse,json=connectionReuse" json:"connection_reuse,omitempty"`
}
func (m *Config) Reset() { *m = Config{} }
func (m *Config) String() string { return proto.CompactTextString(m) }
func (*Config) ProtoMessage() {}
func (*Config) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{6} }
func (*Config) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{7} }
func (m *Config) GetMtu() *MTU {
if m != nil {
@ -160,6 +171,13 @@ func (m *Config) GetHeaderConfig() *v2ray_core_common_loader.TypedSettings {
return nil
}
func (m *Config) GetConnectionReuse() *ConnectionReuse {
if m != nil {
return m.ConnectionReuse
}
return nil
}
func init() {
proto.RegisterType((*MTU)(nil), "v2ray.core.transport.internet.kcp.MTU")
proto.RegisterType((*TTI)(nil), "v2ray.core.transport.internet.kcp.TTI")
@ -167,38 +185,42 @@ func init() {
proto.RegisterType((*DownlinkCapacity)(nil), "v2ray.core.transport.internet.kcp.DownlinkCapacity")
proto.RegisterType((*WriteBuffer)(nil), "v2ray.core.transport.internet.kcp.WriteBuffer")
proto.RegisterType((*ReadBuffer)(nil), "v2ray.core.transport.internet.kcp.ReadBuffer")
proto.RegisterType((*ConnectionReuse)(nil), "v2ray.core.transport.internet.kcp.ConnectionReuse")
proto.RegisterType((*Config)(nil), "v2ray.core.transport.internet.kcp.Config")
}
func init() { proto.RegisterFile("v2ray.com/core/transport/internet/kcp/config.proto", fileDescriptor0) }
var fileDescriptor0 = []byte{
// 426 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x8c, 0x93, 0xc1, 0x6f, 0xd3, 0x30,
0x18, 0xc5, 0x55, 0xd2, 0x95, 0xe9, 0xcb, 0x36, 0x46, 0xc4, 0x21, 0x02, 0x09, 0x75, 0x93, 0xd8,
0x7a, 0xc1, 0x11, 0xdd, 0x05, 0xae, 0x1d, 0x97, 0x49, 0x0c, 0x81, 0x49, 0x85, 0xd4, 0x4b, 0x71,
0x1d, 0xb7, 0x58, 0x69, 0x6c, 0xcb, 0x71, 0x5a, 0x85, 0x7f, 0x91, 0x7f, 0x0a, 0xc5, 0x6e, 0x68,
0x5a, 0xa9, 0x6b, 0x6e, 0x6d, 0xfc, 0xde, 0xcf, 0xc9, 0x7b, 0xdf, 0x07, 0xc3, 0xd5, 0x50, 0x93,
0x12, 0x51, 0x99, 0x45, 0x54, 0x6a, 0x16, 0x19, 0x4d, 0x44, 0xae, 0xa4, 0x36, 0x11, 0x17, 0x86,
0x69, 0xc1, 0x4c, 0x94, 0x52, 0x15, 0x51, 0x29, 0xe6, 0x7c, 0x81, 0x94, 0x96, 0x46, 0x06, 0x57,
0xb5, 0x47, 0x33, 0xf4, 0x5f, 0x8f, 0x6a, 0x3d, 0x4a, 0xa9, 0x7a, 0x7d, 0xbb, 0x87, 0xa5, 0x32,
0xcb, 0xa4, 0x88, 0x96, 0x92, 0x24, 0x4c, 0x47, 0xa6, 0x54, 0xcc, 0xb1, 0xae, 0xdf, 0x80, 0xf7,
0x18, 0x8f, 0x83, 0x57, 0x70, 0xb2, 0x22, 0xcb, 0x82, 0x85, 0x9d, 0x7e, 0x67, 0x70, 0x8e, 0xdd,
0x9f, 0xea, 0x30, 0x8e, 0x1f, 0x0e, 0x1c, 0xde, 0xc0, 0xc5, 0x58, 0x2d, 0xb9, 0x48, 0xef, 0x89,
0x22, 0x94, 0x9b, 0xf2, 0x80, 0x6e, 0x00, 0x97, 0x9f, 0xe5, 0x5a, 0xb4, 0x50, 0x5e, 0x81, 0xff,
0x53, 0x73, 0xc3, 0x46, 0xc5, 0x7c, 0xce, 0x74, 0x10, 0x40, 0x37, 0xe7, 0x7f, 0x6a, 0x8d, 0xfd,
0x7d, 0xdd, 0x07, 0xc0, 0x8c, 0x24, 0x4f, 0x28, 0xfe, 0x76, 0xa1, 0x77, 0x6f, 0xd3, 0x0a, 0x3e,
0x82, 0x97, 0x99, 0xc2, 0x9e, 0xfa, 0xc3, 0x1b, 0x74, 0x34, 0x35, 0xf4, 0x18, 0x8f, 0x71, 0x65,
0xa9, 0x9c, 0xc6, 0xf0, 0xf0, 0x59, 0x6b, 0x67, 0x1c, 0x3f, 0xe0, 0xca, 0x12, 0x4c, 0xe0, 0x45,
0x61, 0x53, 0x99, 0xd2, 0xcd, 0xc7, 0x86, 0x9e, 0xa5, 0x7c, 0x68, 0x41, 0xd9, 0xcd, 0x13, 0x5f,
0x14, 0xbb, 0xf9, 0xfe, 0x82, 0x97, 0xc9, 0x26, 0xc9, 0x2d, 0xbd, 0x6b, 0xe9, 0x77, 0x2d, 0xe8,
0xfb, 0x2d, 0xe0, 0xcb, 0x64, 0xbf, 0x97, 0xb7, 0x00, 0x54, 0x8a, 0x05, 0xcb, 0x0d, 0x97, 0x22,
0x3c, 0xe9, 0x77, 0x06, 0xa7, 0xb8, 0xf1, 0x24, 0xf8, 0x0e, 0x67, 0xeb, 0xaa, 0xa1, 0xe9, 0xcc,
0x16, 0x10, 0xf6, 0xec, 0xe5, 0xa8, 0xc5, 0xe5, 0x8d, 0x62, 0xb1, 0xbf, 0x6e, 0xb4, 0xfc, 0x15,
0x7c, 0xcd, 0x48, 0x52, 0x13, 0x9f, 0x5b, 0xe2, 0xfb, 0x16, 0xc4, 0xed, 0x1c, 0x60, 0xd0, 0xdb,
0x99, 0xf8, 0x02, 0xe7, 0xbf, 0x59, 0x35, 0xe5, 0x53, 0xb7, 0x33, 0xe1, 0xa9, 0x25, 0xde, 0x36,
0x89, 0x6e, 0x1b, 0x90, 0xdb, 0x06, 0x14, 0x97, 0x8a, 0x25, 0x3f, 0x98, 0x31, 0x5c, 0x2c, 0x72,
0x7c, 0xe6, 0xdc, 0x6e, 0x84, 0x46, 0x9f, 0xe0, 0x1d, 0x95, 0xd9, 0xf1, 0xb7, 0x19, 0xf9, 0xce,
0xf0, 0xad, 0x5a, 0xaa, 0x89, 0x97, 0x52, 0x35, 0xeb, 0xd9, 0x05, 0xbb, 0xfb, 0x17, 0x00, 0x00,
0xff, 0xff, 0x02, 0xb7, 0x81, 0x10, 0xe2, 0x03, 0x00, 0x00,
// 473 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x8c, 0x93, 0xd1, 0x6f, 0xd3, 0x30,
0x10, 0xc6, 0x35, 0xba, 0x96, 0x71, 0xd9, 0xd6, 0x12, 0x21, 0x14, 0x81, 0x84, 0xb6, 0x4a, 0x6c,
0xe3, 0x01, 0x47, 0x74, 0x2f, 0xf0, 0xda, 0xf2, 0x32, 0x89, 0x21, 0x30, 0xa9, 0x90, 0x26, 0xa1,
0xe2, 0x3a, 0xd7, 0x62, 0xb5, 0xb1, 0x2d, 0xc7, 0x59, 0x55, 0xfe, 0x24, 0xfe, 0x4a, 0x64, 0xa7,
0x5d, 0xdb, 0x48, 0xdb, 0xf2, 0x96, 0xf8, 0xbe, 0xfb, 0xd9, 0xfa, 0xee, 0x3b, 0xe8, 0xdd, 0xf6,
0x0c, 0x5b, 0x12, 0xae, 0xb2, 0x98, 0x2b, 0x83, 0xb1, 0x35, 0x4c, 0xe6, 0x5a, 0x19, 0x1b, 0x0b,
0x69, 0xd1, 0x48, 0xb4, 0xf1, 0x8c, 0xeb, 0x98, 0x2b, 0x39, 0x11, 0x53, 0xa2, 0x8d, 0xb2, 0x2a,
0x3c, 0x5d, 0xf7, 0x18, 0x24, 0x77, 0x7a, 0xb2, 0xd6, 0x93, 0x19, 0xd7, 0xaf, 0xce, 0x2b, 0x58,
0xae, 0xb2, 0x4c, 0xc9, 0x78, 0xae, 0x58, 0x8a, 0x26, 0xb6, 0x4b, 0x8d, 0x25, 0xab, 0xfb, 0x1a,
0x1a, 0xd7, 0xc9, 0x30, 0x7c, 0x01, 0xcd, 0x5b, 0x36, 0x2f, 0x30, 0xda, 0x3b, 0xd9, 0xbb, 0x38,
0xa2, 0xe5, 0x8f, 0x2b, 0x26, 0xc9, 0xd5, 0x3d, 0xc5, 0x33, 0x38, 0x1e, 0xea, 0xb9, 0x90, 0xb3,
0x01, 0xd3, 0x8c, 0x0b, 0xbb, 0xbc, 0x47, 0x77, 0x01, 0x9d, 0xcf, 0x6a, 0x21, 0x6b, 0x28, 0x4f,
0x21, 0xf8, 0x69, 0x84, 0xc5, 0x7e, 0x31, 0x99, 0xa0, 0x09, 0x43, 0xd8, 0xcf, 0xc5, 0xdf, 0xb5,
0xc6, 0x7f, 0x77, 0x4f, 0x00, 0x28, 0xb2, 0xf4, 0x01, 0xc5, 0x3b, 0x68, 0x0f, 0x94, 0x94, 0xc8,
0xad, 0x50, 0x92, 0x62, 0x91, 0x63, 0xf8, 0x12, 0x5a, 0x28, 0xd9, 0x78, 0x5e, 0x0a, 0x0f, 0xe8,
0xea, 0xaf, 0xfb, 0xaf, 0x09, 0xad, 0x81, 0x37, 0x36, 0xfc, 0x08, 0x8d, 0xcc, 0x16, 0xbe, 0x1e,
0xf4, 0xce, 0xc8, 0xa3, 0x06, 0x93, 0xeb, 0x64, 0x48, 0x5d, 0x8b, 0xeb, 0xb4, 0x56, 0x44, 0x4f,
0x6a, 0x77, 0x26, 0xc9, 0x15, 0x75, 0x2d, 0xe1, 0x0d, 0xb4, 0x0b, 0x6f, 0xe0, 0x88, 0xaf, 0x7c,
0x89, 0x1a, 0x9e, 0xf2, 0xa1, 0x06, 0x65, 0xd7, 0x7a, 0x7a, 0x5c, 0xec, 0x8e, 0xe2, 0x37, 0x3c,
0x4f, 0x57, 0xa6, 0x6f, 0xe8, 0xfb, 0x9e, 0x7e, 0x59, 0x83, 0x5e, 0x1d, 0x18, 0xed, 0xa4, 0xd5,
0x11, 0xbe, 0x01, 0xe0, 0x4a, 0x4e, 0x31, 0x77, 0x3e, 0x47, 0x4d, 0x6f, 0xec, 0xd6, 0x49, 0xf8,
0x1d, 0x0e, 0x17, 0x6e, 0x98, 0xa3, 0xb1, 0x9f, 0x55, 0xd4, 0xf2, 0x97, 0x93, 0x1a, 0x97, 0x6f,
0x65, 0x80, 0x06, 0x8b, 0xad, 0x40, 0x7c, 0x85, 0xc0, 0x20, 0x4b, 0xd7, 0xc4, 0xa7, 0x9e, 0xf8,
0xbe, 0x06, 0x71, 0x13, 0x19, 0x0a, 0x66, 0x13, 0x9f, 0x2f, 0x70, 0xf4, 0x07, 0xdd, 0x42, 0x8c,
0xca, 0xf5, 0x8a, 0x0e, 0x3c, 0xf1, 0x7c, 0x9b, 0x58, 0x2e, 0x0e, 0x29, 0x17, 0x87, 0x24, 0x4b,
0x8d, 0xe9, 0x0f, 0xb4, 0x56, 0xc8, 0x69, 0x4e, 0x0f, 0xcb, 0xee, 0x55, 0x84, 0x7e, 0x41, 0x87,
0xdf, 0x05, 0x6f, 0x64, 0x5c, 0xf2, 0xa2, 0x67, 0x1e, 0xd8, 0xab, 0xf1, 0xc4, 0x4a, 0x66, 0x69,
0x9b, 0xef, 0x1e, 0xf4, 0x3f, 0xc1, 0x5b, 0xae, 0xb2, 0xc7, 0x49, 0xfd, 0xa0, 0x7c, 0xcf, 0x37,
0xb7, 0xde, 0x37, 0x8d, 0x19, 0xd7, 0xe3, 0x96, 0x5f, 0xf5, 0xcb, 0xff, 0x01, 0x00, 0x00, 0xff,
0xff, 0x76, 0x78, 0x1a, 0x9c, 0x6c, 0x04, 0x00, 0x00,
}

@ -37,6 +37,10 @@ message ReadBuffer {
uint32 size = 1;
}
message ConnectionReuse {
bool enable = 1;
}
message Config {
MTU mtu = 1;
TTI tti = 2;
@ -46,4 +50,5 @@ message Config {
WriteBuffer write_buffer = 6;
ReadBuffer read_buffer = 7;
v2ray.core.common.loader.TypedSettings header_config = 8;
ConnectionReuse connection_reuse = 9;
}

@ -8,10 +8,10 @@ import (
"sync/atomic"
"time"
"v2ray.com/core/common/alloc"
"v2ray.com/core/common/log"
"v2ray.com/core/common/predicate"
"v2ray.com/core/transport/internet"
"v2ray.com/core/transport/internet/internal"
)
var (
@ -159,13 +159,19 @@ func (this *Updater) Run() {
}
}
type SystemConnection interface {
net.Conn
Id() internal.ConnectionId
Reset(internet.Authenticator, func([]byte))
}
// Connection is a KCP connection over UDP.
type Connection struct {
conn SystemConnection
connRecycler internal.ConnectionRecyler
block internet.Authenticator
local, remote net.Addr
rd time.Time
wd time.Time // write deadline
writer io.WriteCloser
since int64
dataInputCond *sync.Cond
dataOutputCond *sync.Cond
@ -179,48 +185,47 @@ type Connection struct {
mss uint32
roundTrip *RoundTripInfo
interval uint32
receivingWorker *ReceivingWorker
sendingWorker *SendingWorker
congestionControl bool
output *BufferedSegmentWriter
dataUpdater *Updater
pingUpdater *Updater
reusable bool
}
// NewConnection create a new KCP connection between local and remote.
func NewConnection(conv uint16, writerCloser io.WriteCloser, local *net.UDPAddr, remote *net.UDPAddr, block internet.Authenticator, config *Config) *Connection {
func NewConnection(conv uint16, sysConn SystemConnection, recycler internal.ConnectionRecyler, block internet.Authenticator, config *Config) *Connection {
log.Info("KCP|Connection: creating connection ", conv)
conn := new(Connection)
conn.local = local
conn.remote = remote
conn.block = block
conn.writer = writerCloser
conn.since = nowMillisec()
conn.dataInputCond = sync.NewCond(new(sync.Mutex))
conn.dataOutputCond = sync.NewCond(new(sync.Mutex))
conn.Config = config
authWriter := &AuthenticationWriter{
Authenticator: block,
Writer: writerCloser,
Writer: sysConn,
Config: config,
}
conn.conv = conv
conn.output = NewSegmentWriter(authWriter)
conn.mss = authWriter.Mtu() - DataSegmentOverhead
conn.roundTrip = &RoundTripInfo{
conn := &Connection{
conv: conv,
conn: sysConn,
connRecycler: recycler,
block: block,
since: nowMillisec(),
dataInputCond: sync.NewCond(new(sync.Mutex)),
dataOutputCond: sync.NewCond(new(sync.Mutex)),
Config: config,
output: NewSegmentWriter(authWriter),
mss: authWriter.Mtu() - DataSegmentOverhead,
roundTrip: &RoundTripInfo{
rto: 100,
minRtt: config.Tti.GetValue(),
},
}
conn.interval = config.Tti.GetValue()
sysConn.Reset(block, conn.Input)
conn.receivingWorker = NewReceivingWorker(conn)
conn.congestionControl = config.Congestion
conn.sendingWorker = NewSendingWorker(conn)
isTerminating := func() bool {
@ -230,7 +235,7 @@ func NewConnection(conv uint16, writerCloser io.WriteCloser, local *net.UDPAddr,
return conn.State() == StateTerminated
}
conn.dataUpdater = NewUpdater(
conn.interval,
config.Tti.GetValue(),
predicate.Not(isTerminating).And(predicate.Any(conn.sendingWorker.UpdateNecessary, conn.receivingWorker.UpdateNecessary)),
isTerminating,
conn.updateTask)
@ -368,7 +373,7 @@ func (this *Connection) Close() error {
if state.Is(StateReadyToClose, StateTerminating, StateTerminated) {
return ErrClosedConnection
}
log.Info("KCP|Connection: Closing connection to ", this.remote)
log.Info("KCP|Connection: Closing connection to ", this.conn.RemoteAddr())
if state == StateActive {
this.SetState(StateReadyToClose)
@ -388,7 +393,7 @@ func (this *Connection) LocalAddr() net.Addr {
if this == nil {
return nil
}
return this.local
return this.conn.LocalAddr()
}
// RemoteAddr returns the remote network address. The Addr returned is shared by all invocations of RemoteAddr, so do not modify it.
@ -396,7 +401,7 @@ func (this *Connection) RemoteAddr() net.Addr {
if this == nil {
return nil
}
return this.remote
return this.conn.RemoteAddr()
}
// SetDeadline sets the deadline associated with the listener. A zero time value disables the deadline.
@ -433,24 +438,6 @@ func (this *Connection) updateTask() {
this.flush()
}
func (this *Connection) FetchInputFrom(conn io.Reader) {
go func() {
payload := alloc.NewLocalBuffer(2048)
defer payload.Release()
for this.State() != StateTerminated {
payload.Reset()
nBytes, err := conn.Read(payload.Value)
if err != nil {
return
}
payload.Slice(0, nBytes)
if this.block.Open(payload) {
this.Input(payload.Value)
}
}
}()
}
func (this *Connection) Reusable() bool {
return false
}
@ -458,7 +445,7 @@ func (this *Connection) Reusable() bool {
func (this *Connection) SetReusable(b bool) {}
func (this *Connection) Terminate() {
if this == nil || this.writer == nil {
if this == nil {
return
}
log.Info("KCP|Connection: Terminating connection to ", this.RemoteAddr())
@ -466,7 +453,11 @@ func (this *Connection) Terminate() {
//this.SetState(StateTerminated)
this.dataInputCond.Broadcast()
this.dataOutputCond.Broadcast()
this.writer.Close()
if this.Config.ConnectionReuse.IsEnabled() && this.reusable {
this.connRecycler.Put(this.conn.Id(), this.conn)
} else {
this.conn.Close()
}
this.sendingWorker.Release()
this.receivingWorker.Release()
}
@ -488,7 +479,7 @@ func (this *Connection) OnPeerClosed() {
}
// Input when you received a low level packet (eg. UDP packet), call it
func (this *Connection) Input(data []byte) int {
func (this *Connection) Input(data []byte) {
current := this.Elapsed()
atomic.StoreUint32(&this.lastIncomingTime, current)
@ -498,6 +489,9 @@ func (this *Connection) Input(data []byte) int {
if seg == nil {
break
}
if seg.Conversation() != this.conv {
return
}
switch seg := seg.(type) {
case *DataSegment:
@ -530,8 +524,6 @@ func (this *Connection) Input(data []byte) int {
default:
}
}
return 0
}
func (this *Connection) flush() {

@ -1,27 +1,63 @@
package kcp_test
import (
"net"
"testing"
"time"
"v2ray.com/core/testing/assert"
"v2ray.com/core/transport/internet"
"v2ray.com/core/transport/internet/internal"
. "v2ray.com/core/transport/internet/kcp"
)
type NoOpWriteCloser struct{}
type NoOpConn struct{}
func (this *NoOpWriteCloser) Write(b []byte) (int, error) {
func (o *NoOpConn) Write(b []byte) (int, error) {
return len(b), nil
}
func (this *NoOpWriteCloser) Close() error {
func (o *NoOpConn) Close() error {
return nil
}
func (o *NoOpConn) Read([]byte) (int, error) {
panic("Should not be called.")
}
func (o *NoOpConn) LocalAddr() net.Addr {
return nil
}
func (o *NoOpConn) RemoteAddr() net.Addr {
return nil
}
func (o *NoOpConn) SetDeadline(time.Time) error {
return nil
}
func (o *NoOpConn) SetReadDeadline(time.Time) error {
return nil
}
func (o *NoOpConn) SetWriteDeadline(time.Time) error {
return nil
}
func (o *NoOpConn) Id() internal.ConnectionId {
return internal.ConnectionId{}
}
func (o *NoOpConn) Reset(auth internet.Authenticator, input func([]byte)) {}
type NoOpRecycler struct{}
func (o *NoOpRecycler) Put(internal.ConnectionId, net.Conn) {}
func TestConnectionReadTimeout(t *testing.T) {
assert := assert.On(t)
conn := NewConnection(1, &NoOpWriteCloser{}, nil, nil, NewSimpleAuthenticator(), &Config{})
conn := NewConnection(1, &NoOpConn{}, &NoOpRecycler{}, NewSimpleAuthenticator(), &Config{})
conn.SetReadDeadline(time.Now().Add(time.Second))
b := make([]byte, 1024)

@ -3,27 +3,88 @@ package kcp
import (
"crypto/tls"
"net"
"sync"
"sync/atomic"
"v2ray.com/core/common/alloc"
"v2ray.com/core/common/dice"
"v2ray.com/core/common/log"
v2net "v2ray.com/core/common/net"
"v2ray.com/core/transport/internet"
"v2ray.com/core/transport/internet/internal"
v2tls "v2ray.com/core/transport/internet/tls"
)
var (
globalConv = uint32(dice.Roll(65536))
globalPool = internal.NewConnectionPool()
)
type ClientConnection struct {
sync.Mutex
net.Conn
id internal.ConnectionId
input func([]byte)
auth internet.Authenticator
}
func (o *ClientConnection) Read([]byte) (int, error) {
panic("KCP|ClientConnection: Read should not be called.")
}
func (o *ClientConnection) Id() internal.ConnectionId {
return o.id
}
func (o *ClientConnection) Close() error {
return o.Conn.Close()
}
func (o *ClientConnection) Reset(auth internet.Authenticator, inputCallback func([]byte)) {
o.Lock()
o.input = inputCallback
o.auth = auth
o.Unlock()
}
func (o *ClientConnection) Run() {
payload := alloc.NewSmallBuffer()
defer payload.Release()
for {
nBytes, err := o.Conn.Read(payload.Value)
if err != nil {
payload.Release()
return
}
payload.Slice(0, nBytes)
o.Lock()
if o.input != nil && o.auth.Open(payload) {
o.input(payload.Value)
}
o.Unlock()
payload.Reset()
}
}
func DialKCP(src v2net.Address, dest v2net.Destination, options internet.DialerOptions) (internet.Connection, error) {
dest.Network = v2net.Network_UDP
log.Info("KCP|Dialer: Dialing KCP to ", dest)
conn, err := internet.DialToDest(src, dest)
id := internal.NewConnectionId(src, dest)
conn := globalPool.Get(id)
if conn == nil {
rawConn, err := internet.DialToDest(src, dest)
if err != nil {
log.Error("KCP|Dialer: Failed to dial to dest: ", err)
return nil, err
}
c := &ClientConnection{
Conn: rawConn,
id: id,
}
go c.Run()
conn = c
}
networkSettings, err := options.Stream.GetEffectiveNetworkSettings()
if err != nil {
@ -38,8 +99,7 @@ func DialKCP(src v2net.Address, dest v2net.Destination, options internet.DialerO
return nil, err
}
conv := uint16(atomic.AddUint32(&globalConv, 1))
session := NewConnection(conv, conn, conn.LocalAddr().(*net.UDPAddr), conn.RemoteAddr().(*net.UDPAddr), cpip, kcpSettings)
session.FetchInputFrom(conn)
session := NewConnection(conv, conn.(*ClientConnection), globalPool, cpip, kcpSettings)
var iConn internet.Connection
iConn = session

@ -12,16 +12,81 @@ import (
"v2ray.com/core/common/serial"
"v2ray.com/core/proxy"
"v2ray.com/core/transport/internet"
"v2ray.com/core/transport/internet/internal"
v2tls "v2ray.com/core/transport/internet/tls"
"v2ray.com/core/transport/internet/udp"
)
type ConnectionId struct {
Remote v2net.Address
Port v2net.Port
Conv uint16
}
type ServerConnection struct {
id internal.ConnectionId
writer *Writer
local net.Addr
remote net.Addr
auth internet.Authenticator
input func([]byte)
}
func (o *ServerConnection) Read([]byte) (int, error) {
panic("KCP|ServerConnection: Read should not be called.")
}
func (o *ServerConnection) Write(b []byte) (int, error) {
return o.writer.Write(b)
}
func (o *ServerConnection) Close() error {
return o.writer.Close()
}
func (o *ServerConnection) Reset(auth internet.Authenticator, input func([]byte)) {
o.auth = auth
o.input = input
}
func (o *ServerConnection) Input(b *alloc.Buffer) {
defer b.Release()
if o.auth.Open(b) {
o.input(b.Value)
}
}
func (o *ServerConnection) LocalAddr() net.Addr {
return o.local
}
func (o *ServerConnection) RemoteAddr() net.Addr {
return o.remote
}
func (o *ServerConnection) SetDeadline(time.Time) error {
return nil
}
func (o *ServerConnection) SetReadDeadline(time.Time) error {
return nil
}
func (o *ServerConnection) SetWriteDeadline(time.Time) error {
return nil
}
func (o *ServerConnection) Id() internal.ConnectionId {
return o.id
}
// Listener defines a server listening for connections
type Listener struct {
sync.Mutex
running bool
authenticator internet.Authenticator
sessions map[string]*Connection
sessions map[ConnectionId]*Connection
awaitingConns chan *Connection
hub *udp.UDPHub
tlsConfig *tls.Config
@ -42,7 +107,7 @@ func NewListener(address v2net.Address, port v2net.Port, options internet.Listen
}
l := &Listener{
authenticator: auth,
sessions: make(map[string]*Connection),
sessions: make(map[ConnectionId]*Connection),
awaitingConns: make(chan *Connection, 64),
running: true,
config: kcpSettings,
@ -89,40 +154,51 @@ func (this *Listener) OnReceive(payload *alloc.Buffer, session *proxy.SessionInf
}
conv := serial.BytesToUint16(payload.Value)
cmd := Command(payload.Value[2])
sourceId := src.NetAddr() + "|" + serial.Uint16ToString(conv)
conn, found := this.sessions[sourceId]
id := ConnectionId{
Remote: src.Address,
Port: src.Port,
Conv: conv,
}
conn, found := this.sessions[id]
if !found {
if cmd == CommandTerminate {
return
}
log.Debug("KCP|Listener: Creating session with id(", sourceId, ") from ", src)
writer := &Writer{
id: sourceId,
id: id,
hub: this.hub,
dest: src,
listener: this,
}
srcAddr := &net.UDPAddr{
remoteAddr := &net.UDPAddr{
IP: src.Address.IP(),
Port: int(src.Port),
}
localAddr := this.hub.Addr()
auth, err := this.config.GetAuthenticator()
if err != nil {
log.Error("KCP|Listener: Failed to create authenticator: ", err)
}
conn = NewConnection(conv, writer, this.Addr().(*net.UDPAddr), srcAddr, auth, this.config)
sConn := &ServerConnection{
id: internal.NewConnectionId(v2net.LocalHostIP, src),
local: localAddr,
remote: remoteAddr,
writer: writer,
}
conn = NewConnection(conv, sConn, this, auth, this.config)
select {
case this.awaitingConns <- conn:
case <-time.After(time.Second * 5):
conn.Close()
return
}
this.sessions[sourceId] = conn
this.sessions[id] = conn
}
conn.Input(payload.Value)
}
func (this *Listener) Remove(dest string) {
func (this *Listener) Remove(id ConnectionId) {
if !this.running {
return
}
@ -131,8 +207,7 @@ func (this *Listener) Remove(dest string) {
if !this.running {
return
}
log.Debug("KCP|Listener: Removing session ", dest)
delete(this.sessions, dest)
delete(this.sessions, id)
}
// Accept implements the Accept method in the Listener interface; it waits for the next call and returns a generic Conn.
@ -187,8 +262,10 @@ func (this *Listener) Addr() net.Addr {
return this.hub.Addr()
}
func (this *Listener) Put(internal.ConnectionId, net.Conn) {}
type Writer struct {
id string
id ConnectionId
dest v2net.Destination
hub *udp.UDPHub
listener *Listener

@ -24,6 +24,7 @@ const (
type Segment interface {
common.Releasable
Conversation() uint16
ByteSize() int
Bytes([]byte) []byte
}
@ -48,6 +49,10 @@ func NewDataSegment() *DataSegment {
return new(DataSegment)
}
func (this *DataSegment) Conversation() uint16 {
return this.Conv
}
func (this *DataSegment) SetData(b []byte) {
if this.Data == nil {
this.Data = alloc.NewSmallBuffer()
@ -89,6 +94,10 @@ func NewAckSegment() *AckSegment {
return new(AckSegment)
}
func (this *AckSegment) Conversation() uint16 {
return this.Conv
}
func (this *AckSegment) PutTimestamp(timestamp uint32) {
if timestamp-this.Timestamp < 0x7FFFFFFF {
this.Timestamp = timestamp
@ -138,6 +147,10 @@ func NewCmdOnlySegment() *CmdOnlySegment {
return new(CmdOnlySegment)
}
func (this *CmdOnlySegment) Conversation() uint16 {
return this.Conv
}
func (this *CmdOnlySegment) ByteSize() int {
return 2 + 1 + 1 + 4 + 4 + 4
}

@ -2,7 +2,6 @@ package udp
import (
"net"
v2net "v2ray.com/core/common/net"
"v2ray.com/core/transport/internet"
)

Loading…
Cancel
Save