mirror of https://github.com/v2ray/v2ray-core
implement inbound connection handler manager in point
parent
23b6b987ca
commit
51695df239
|
@ -32,7 +32,7 @@ const (
|
|||
type InboundDetourAllocationConfig struct {
|
||||
Strategy string // Allocation strategy of this inbound detour.
|
||||
Concurrency int // Number of handlers (ports) running in parallel.
|
||||
Refresh int // Number of seconds before a handler is regenerated.
|
||||
Refresh int // Number of minutes before a handler is regenerated.
|
||||
}
|
||||
|
||||
type InboundDetourConfig struct {
|
||||
|
|
|
@ -13,6 +13,10 @@ import (
|
|||
v2net "github.com/v2ray/v2ray-core/common/net"
|
||||
)
|
||||
|
||||
const (
|
||||
DefaultRefreshMinute = int(9999)
|
||||
)
|
||||
|
||||
func (this *Config) UnmarshalJSON(data []byte) error {
|
||||
type JsonConfig struct {
|
||||
Port v2net.Port `json:"port"` // Port of this Point server.
|
||||
|
@ -82,7 +86,7 @@ func (this *InboundDetourAllocationConfig) UnmarshalJSON(data []byte) error {
|
|||
type JsonInboundDetourAllocationConfig struct {
|
||||
Strategy string `json:"strategy"`
|
||||
Concurrency int `json:"concurrency"`
|
||||
RefreshSec int `json:"refresh"`
|
||||
RefreshMin int `json:"refresh"`
|
||||
}
|
||||
jsonConfig := new(JsonInboundDetourAllocationConfig)
|
||||
if err := json.Unmarshal(data, jsonConfig); err != nil {
|
||||
|
@ -90,7 +94,10 @@ func (this *InboundDetourAllocationConfig) UnmarshalJSON(data []byte) error {
|
|||
}
|
||||
this.Strategy = jsonConfig.Strategy
|
||||
this.Concurrency = jsonConfig.Concurrency
|
||||
this.Refresh = jsonConfig.RefreshSec
|
||||
this.Refresh = jsonConfig.RefreshMin
|
||||
if this.Refresh == 0 {
|
||||
this.Refresh = DefaultRefreshMinute
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -115,6 +122,12 @@ func (this *InboundDetourConfig) UnmarshalJSON(data []byte) error {
|
|||
this.Settings = jsonConfig.Settings
|
||||
this.Tag = jsonConfig.Tag
|
||||
this.Allocation = jsonConfig.Allocation
|
||||
if this.Allocation == nil {
|
||||
this.Allocation = &InboundDetourAllocationConfig{
|
||||
Strategy: AllocationStrategyAlways,
|
||||
Refresh: DefaultRefreshMinute,
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -1,64 +1,11 @@
|
|||
package point
|
||||
|
||||
import (
|
||||
"github.com/v2ray/v2ray-core/app"
|
||||
"github.com/v2ray/v2ray-core/common/log"
|
||||
v2net "github.com/v2ray/v2ray-core/common/net"
|
||||
"github.com/v2ray/v2ray-core/common/retry"
|
||||
"github.com/v2ray/v2ray-core/proxy"
|
||||
proxyrepo "github.com/v2ray/v2ray-core/proxy/repo"
|
||||
)
|
||||
|
||||
type InboundConnectionHandlerWithPort struct {
|
||||
port v2net.Port
|
||||
handler proxy.InboundConnectionHandler
|
||||
}
|
||||
|
||||
// Handler for inbound detour connections.
|
||||
type InboundDetourHandler struct {
|
||||
space app.Space
|
||||
config *InboundDetourConfig
|
||||
ich []*InboundConnectionHandlerWithPort
|
||||
}
|
||||
|
||||
func (this *InboundDetourHandler) Initialize() error {
|
||||
ports := this.config.PortRange
|
||||
this.ich = make([]*InboundConnectionHandlerWithPort, 0, ports.To-ports.From+1)
|
||||
for i := ports.From; i <= ports.To; i++ {
|
||||
ichConfig := this.config.Settings
|
||||
ich, err := proxyrepo.CreateInboundConnectionHandler(this.config.Protocol, this.space, ichConfig)
|
||||
if err != nil {
|
||||
log.Error("Failed to create inbound connection handler: ", err)
|
||||
return err
|
||||
}
|
||||
this.ich = append(this.ich, &InboundConnectionHandlerWithPort{
|
||||
port: i,
|
||||
handler: ich,
|
||||
})
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (this *InboundDetourHandler) Close() {
|
||||
for _, ich := range this.ich {
|
||||
ich.handler.Close()
|
||||
}
|
||||
}
|
||||
|
||||
// Starts the inbound connection handler.
|
||||
func (this *InboundDetourHandler) Start() error {
|
||||
for _, ich := range this.ich {
|
||||
err := retry.Timed(100 /* times */, 100 /* ms */).On(func() error {
|
||||
err := ich.handler.Listen(ich.port)
|
||||
if err != nil {
|
||||
log.Error("Failed to start inbound detour on port ", ich.port, ": ", err)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
type InboundDetourHandler interface {
|
||||
Start() error
|
||||
Close()
|
||||
GetConnectionHandler() (proxy.InboundConnectionHandler, int)
|
||||
}
|
||||
|
|
|
@ -0,0 +1,76 @@
|
|||
package point
|
||||
|
||||
import (
|
||||
"math/rand"
|
||||
|
||||
"github.com/v2ray/v2ray-core/app"
|
||||
"github.com/v2ray/v2ray-core/common/log"
|
||||
v2net "github.com/v2ray/v2ray-core/common/net"
|
||||
"github.com/v2ray/v2ray-core/common/retry"
|
||||
"github.com/v2ray/v2ray-core/proxy"
|
||||
proxyrepo "github.com/v2ray/v2ray-core/proxy/repo"
|
||||
)
|
||||
|
||||
type InboundConnectionHandlerWithPort struct {
|
||||
port v2net.Port
|
||||
handler proxy.InboundConnectionHandler
|
||||
}
|
||||
|
||||
// Handler for inbound detour connections.
|
||||
type InboundDetourHandlerAlways struct {
|
||||
space app.Space
|
||||
config *InboundDetourConfig
|
||||
ich []*InboundConnectionHandlerWithPort
|
||||
}
|
||||
|
||||
func NewInboundDetourHandlerAlways(space app.Space, config *InboundDetourConfig) (*InboundDetourHandlerAlways, error) {
|
||||
handler := &InboundDetourHandlerAlways{
|
||||
space: space,
|
||||
config: config,
|
||||
}
|
||||
ports := config.PortRange
|
||||
handler.ich = make([]*InboundConnectionHandlerWithPort, 0, ports.To-ports.From+1)
|
||||
for i := ports.From; i <= ports.To; i++ {
|
||||
ichConfig := config.Settings
|
||||
ich, err := proxyrepo.CreateInboundConnectionHandler(config.Protocol, space, ichConfig)
|
||||
if err != nil {
|
||||
log.Error("Failed to create inbound connection handler: ", err)
|
||||
return nil, err
|
||||
}
|
||||
handler.ich = append(handler.ich, &InboundConnectionHandlerWithPort{
|
||||
port: i,
|
||||
handler: ich,
|
||||
})
|
||||
}
|
||||
return handler, nil
|
||||
}
|
||||
|
||||
func (this *InboundDetourHandlerAlways) GetConnectionHandler() (proxy.InboundConnectionHandler, int) {
|
||||
idx := rand.Intn(len(this.ich))
|
||||
ich := this.ich[idx]
|
||||
return ich.handler, this.config.Allocation.Refresh
|
||||
}
|
||||
|
||||
func (this *InboundDetourHandlerAlways) Close() {
|
||||
for _, ich := range this.ich {
|
||||
ich.handler.Close()
|
||||
}
|
||||
}
|
||||
|
||||
// Starts the inbound connection handler.
|
||||
func (this *InboundDetourHandlerAlways) Start() error {
|
||||
for _, ich := range this.ich {
|
||||
err := retry.Timed(100 /* times */, 100 /* ms */).On(func() error {
|
||||
err := ich.handler.Listen(ich.port)
|
||||
if err != nil {
|
||||
log.Error("Failed to start inbound detour on port ", ich.port, ": ", err)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
|
@ -0,0 +1,3 @@
|
|||
package point
|
||||
|
||||
import ()
|
|
@ -18,13 +18,14 @@ import (
|
|||
|
||||
// Point shell of V2Ray.
|
||||
type Point struct {
|
||||
port v2net.Port
|
||||
ich proxy.InboundConnectionHandler
|
||||
och proxy.OutboundConnectionHandler
|
||||
idh []*InboundDetourHandler
|
||||
odh map[string]proxy.OutboundConnectionHandler
|
||||
router router.Router
|
||||
space *controller.SpaceController
|
||||
port v2net.Port
|
||||
ich proxy.InboundConnectionHandler
|
||||
och proxy.OutboundConnectionHandler
|
||||
idh []InboundDetourHandler
|
||||
taggedIdh map[string]InboundDetourHandler
|
||||
odh map[string]proxy.OutboundConnectionHandler
|
||||
router router.Router
|
||||
space *controller.SpaceController
|
||||
}
|
||||
|
||||
// NewPoint returns a new Point server based on given configuration.
|
||||
|
@ -71,19 +72,29 @@ func NewPoint(pConfig *Config) (*Point, error) {
|
|||
}
|
||||
vpoint.och = och
|
||||
|
||||
vpoint.taggedIdh = make(map[string]InboundDetourHandler)
|
||||
detours := pConfig.InboundDetours
|
||||
if len(detours) > 0 {
|
||||
vpoint.idh = make([]*InboundDetourHandler, len(detours))
|
||||
vpoint.idh = make([]InboundDetourHandler, len(detours))
|
||||
for idx, detourConfig := range detours {
|
||||
detourHandler := &InboundDetourHandler{
|
||||
space: vpoint.space.ForContext(detourConfig.Tag),
|
||||
config: detourConfig,
|
||||
}
|
||||
err := detourHandler.Initialize()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
allocConfig := detourConfig.Allocation
|
||||
var detourHandler InboundDetourHandler
|
||||
switch allocConfig.Strategy {
|
||||
case AllocationStrategyAlways:
|
||||
dh, err := NewInboundDetourHandlerAlways(vpoint.space.ForContext(detourConfig.Tag), detourConfig)
|
||||
if err != nil {
|
||||
log.Error("Point: Failed to create detour handler: ", err)
|
||||
return nil, BadConfiguration
|
||||
}
|
||||
detourHandler = dh
|
||||
default:
|
||||
log.Error("Point: Unknown allocation strategy: ", allocConfig.Strategy)
|
||||
return nil, BadConfiguration
|
||||
}
|
||||
vpoint.idh[idx] = detourHandler
|
||||
if len(detourConfig.Tag) > 0 {
|
||||
vpoint.taggedIdh[detourConfig.Tag] = detourHandler
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -193,3 +204,7 @@ func (this *Point) FilterPacketAndDispatch(packet v2net.Packet, link ray.Outboun
|
|||
|
||||
dispatcher.Dispatch(packet, link)
|
||||
}
|
||||
|
||||
func (this *Point) GetHandler(tag string) (proxy.InboundConnectionHandler, int) {
|
||||
return nil, 0
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue