From 98d99a1fd0315b25213efffb34ab4a27b3d1f510 Mon Sep 17 00:00:00 2001 From: v2ray Date: Fri, 22 Jan 2016 16:25:01 +0100 Subject: [PATCH] dynamic allocation of inbounds --- common/net/testing/port.go | 9 +- shell/point/inbound_detour_dynamic.go | 135 +++++++++++++++++++++- shell/point/point.go | 7 ++ testing/scenarios/data/test_4_client.json | 29 +++++ testing/scenarios/data/test_4_server.json | 48 ++++++++ testing/scenarios/dynamic_vmess_test.go | 54 +++++++++ 6 files changed, 274 insertions(+), 8 deletions(-) create mode 100644 testing/scenarios/data/test_4_client.json create mode 100644 testing/scenarios/data/test_4_server.json create mode 100644 testing/scenarios/dynamic_vmess_test.go diff --git a/common/net/testing/port.go b/common/net/testing/port.go index 21fce177..b49d60a7 100644 --- a/common/net/testing/port.go +++ b/common/net/testing/port.go @@ -1,15 +1,10 @@ package testing import ( - "sync/atomic" - + "github.com/v2ray/v2ray-core/common/dice" v2net "github.com/v2ray/v2ray-core/common/net" ) -var ( - port = int32(30000) -) - func PickPort() v2net.Port { - return v2net.Port(atomic.AddInt32(&port, 1)) + return v2net.Port(30000 + dice.Roll(10000)) } diff --git a/shell/point/inbound_detour_dynamic.go b/shell/point/inbound_detour_dynamic.go index 45a149af..e204f185 100644 --- a/shell/point/inbound_detour_dynamic.go +++ b/shell/point/inbound_detour_dynamic.go @@ -1,3 +1,136 @@ package point -import () +import ( + "sync" + "time" + + "github.com/v2ray/v2ray-core/app" + "github.com/v2ray/v2ray-core/common/dice" + "github.com/v2ray/v2ray-core/common/log" + v2net "github.com/v2ray/v2ray-core/common/net" + "github.com/v2ray/v2ray-core/common/retry" + "github.com/v2ray/v2ray-core/proxy" + proxyrepo "github.com/v2ray/v2ray-core/proxy/repo" +) + +type InboundDetourHandlerDynamic struct { + sync.RWMutex + space app.Space + config *InboundDetourConfig + portsInUse map[v2net.Port]bool + ichInUse []*InboundConnectionHandlerWithPort + ich2Recycle []*InboundConnectionHandlerWithPort + lastRefresh time.Time + started bool +} + +func NewInboundDetourHandlerDynamic(space app.Space, config *InboundDetourConfig) (*InboundDetourHandlerDynamic, error) { + handler := &InboundDetourHandlerDynamic{ + space: space, + config: config, + portsInUse: make(map[v2net.Port]bool), + } + if err := handler.refresh(); err != nil { + return nil, err + } + return handler, nil +} + +func (this *InboundDetourHandlerDynamic) refresh() error { + this.Lock() + defer this.Unlock() + + this.ich2Recycle = this.ichInUse + if this.ich2Recycle != nil { + time.AfterFunc(10*time.Second, func() { + for _, ich := range this.ich2Recycle { + if ich != nil { + ich.handler.Close() + delete(this.portsInUse, ich.port) + } + } + }) + } + + ichCount := this.config.Allocation.Concurrency + // TODO: check ichCount + if this.ichInUse == nil { + this.ichInUse = make([]*InboundConnectionHandlerWithPort, ichCount) + } + for idx, _ := range this.ichInUse { + port := this.pickUnusedPort() + ich, err := proxyrepo.CreateInboundConnectionHandler(this.config.Protocol, this.space, this.config.Settings) + if err != nil { + log.Error("Point: Failed to create inbound connection handler: ", err) + return err + } + this.ichInUse[idx] = &InboundConnectionHandlerWithPort{ + port: port, + handler: ich, + } + } + if this.started { + this.Start() + } + + this.lastRefresh = time.Now() + time.AfterFunc(time.Duration(this.config.Allocation.Refresh)*time.Minute, func() { + this.refresh() + }) + + return nil +} + +func (this *InboundDetourHandlerDynamic) pickUnusedPort() v2net.Port { + delta := int(this.config.PortRange.To) - int(this.config.PortRange.From) + 1 + for { + r := dice.Roll(delta) + port := this.config.PortRange.From + v2net.Port(r) + _, used := this.portsInUse[port] + if !used { + this.portsInUse[port] = true + return port + } + } +} + +func (this *InboundDetourHandlerDynamic) GetConnectionHandler() (proxy.InboundConnectionHandler, int) { + this.RLock() + defer this.RUnlock() + ich := this.ichInUse[dice.Roll(len(this.ichInUse))] + until := (time.Now().Unix() - this.lastRefresh.Unix()) / 60 / 1000 + return ich.handler, int(until) +} + +func (this *InboundDetourHandlerDynamic) Close() { + this.Lock() + defer this.Unlock() + for _, ich := range this.ichInUse { + ich.handler.Close() + } + if this.ich2Recycle != nil { + for _, ich := range this.ich2Recycle { + if ich != nil && ich.handler != nil { + ich.handler.Close() + } + } + } +} + +func (this *InboundDetourHandlerDynamic) Start() error { + for _, ich := range this.ichInUse { + err := retry.Timed(100 /* times */, 100 /* ms */).On(func() error { + err := ich.handler.Listen(ich.port) + if err != nil { + log.Error("Point: Failed to start inbound detour on port ", ich.port, ": ", err) + return err + } + return nil + }) + if err != nil { + return err + } + } + this.started = true + return nil +} diff --git a/shell/point/point.go b/shell/point/point.go index 97efecdf..f7d3923e 100644 --- a/shell/point/point.go +++ b/shell/point/point.go @@ -87,6 +87,13 @@ func NewPoint(pConfig *Config) (*Point, error) { return nil, BadConfiguration } detourHandler = dh + case AllocationStrategyRandom: + dh, err := NewInboundDetourHandlerDynamic(vpoint.space.ForContext(detourConfig.Tag), detourConfig) + if err != nil { + log.Error("Point: Failed to create detour handler: ", err) + return nil, BadConfiguration + } + detourHandler = dh default: log.Error("Point: Unknown allocation strategy: ", allocConfig.Strategy) return nil, BadConfiguration diff --git a/testing/scenarios/data/test_4_client.json b/testing/scenarios/data/test_4_client.json new file mode 100644 index 00000000..38db452b --- /dev/null +++ b/testing/scenarios/data/test_4_client.json @@ -0,0 +1,29 @@ +{ + "port": 50030, + "inbound": { + "protocol": "dokodemo-door", + "settings": { + "address": "127.0.0.1", + "port": 50032, + "network": "tcp", + "timeout": 0 + } + }, + "outbound": { + "protocol": "vmess", + "settings": { + "vnext": [ + { + "address": "127.0.0.1", + "port": 50031, + "users": [ + { + "id": "d17a1af7-efa5-42ca-b7e9-6a35282d737f", + "alterId": 10 + } + ] + } + ] + } + } +} diff --git a/testing/scenarios/data/test_4_server.json b/testing/scenarios/data/test_4_server.json new file mode 100644 index 00000000..2f1f38b7 --- /dev/null +++ b/testing/scenarios/data/test_4_server.json @@ -0,0 +1,48 @@ +{ + "port": 50031, + "log": { + "loglevel": "warning" + }, + "inbound": { + "protocol": "vmess", + "settings": { + "clients": [ + { + "id": "d17a1af7-efa5-42ca-b7e9-6a35282d737f", + "level": 1, + "alterId": 10 + } + ], + "features": { + "detour": { + "to": "detour" + } + } + } + }, + "outbound": { + "protocol": "freedom", + "settings": {} + }, + "inboundDetour": [ + { + "protocol": "vmess", + "port": "50035-50039", + "tag": "detour", + "settings": { + "clients": [ + { + "id": "a12f49ba-466c-4dd5-8438-5c315143bc96", + "alterId": 100, + "level": 1 + } + ] + }, + "allocate": { + "strategy": "random", + "concurrency": 3, + "refresh": 5 + } + } + ] +} diff --git a/testing/scenarios/dynamic_vmess_test.go b/testing/scenarios/dynamic_vmess_test.go new file mode 100644 index 00000000..6dbbb64f --- /dev/null +++ b/testing/scenarios/dynamic_vmess_test.go @@ -0,0 +1,54 @@ +package scenarios + +import ( + "net" + "testing" + + v2net "github.com/v2ray/v2ray-core/common/net" + v2testing "github.com/v2ray/v2ray-core/testing" + "github.com/v2ray/v2ray-core/testing/assert" + "github.com/v2ray/v2ray-core/testing/servers/tcp" +) + +func TestDynamicVMess(t *testing.T) { + v2testing.Current(t) + + tcpServer := &tcp.Server{ + Port: v2net.Port(50032), + MsgProcessor: func(data []byte) []byte { + buffer := make([]byte, 0, 2048) + buffer = append(buffer, []byte("Processed: ")...) + buffer = append(buffer, data...) + return buffer + }, + } + _, err := tcpServer.Start() + assert.Error(err).IsNil() + defer tcpServer.Close() + + assert.Error(InitializeServerSetOnce("test_4")).IsNil() + + for i := 0; i < 100; i++ { + conn, err := net.DialTCP("tcp", nil, &net.TCPAddr{ + IP: []byte{127, 0, 0, 1}, + Port: 50030, + }) + assert.Error(err).IsNil() + + payload := "dokodemo request." + nBytes, err := conn.Write([]byte(payload)) + assert.Error(err).IsNil() + assert.Int(nBytes).Equals(len(payload)) + + conn.CloseWrite() + + response := make([]byte, 1024) + nBytes, err = conn.Read(response) + assert.Error(err).IsNil() + assert.StringLiteral("Processed: " + payload).Equals(string(response[:nBytes])) + + conn.Close() + } + + CloseAllServers() +}