modulize dispatcher and proxyman

pull/168/head
v2ray 2016-05-17 23:05:52 -07:00
parent 09ea65687c
commit 7765fedd78
10 changed files with 167 additions and 213 deletions

View File

@ -14,26 +14,3 @@ const (
type PacketDispatcher interface { type PacketDispatcher interface {
DispatchToOutbound(destination v2net.Destination) ray.InboundRay DispatchToOutbound(destination v2net.Destination) ray.InboundRay
} }
type packetDispatcherWithContext interface {
DispatchToOutbound(context app.Context, destination v2net.Destination) ray.InboundRay
}
type contextedPacketDispatcher struct {
context app.Context
packetDispatcher packetDispatcherWithContext
}
func (this *contextedPacketDispatcher) DispatchToOutbound(destination v2net.Destination) ray.InboundRay {
return this.packetDispatcher.DispatchToOutbound(this.context, destination)
}
func init() {
app.Register(APP_ID, func(context app.Context, obj interface{}) interface{} {
packetDispatcher := obj.(packetDispatcherWithContext)
return &contextedPacketDispatcher{
context: context,
packetDispatcher: packetDispatcher,
}
})
}

View File

@ -0,0 +1,71 @@
package impl
import (
"github.com/v2ray/v2ray-core/app"
"github.com/v2ray/v2ray-core/app/dns"
"github.com/v2ray/v2ray-core/app/proxyman"
"github.com/v2ray/v2ray-core/app/router"
"github.com/v2ray/v2ray-core/common/log"
v2net "github.com/v2ray/v2ray-core/common/net"
"github.com/v2ray/v2ray-core/proxy"
"github.com/v2ray/v2ray-core/transport/ray"
)
type DefaultDispatcher struct {
ohm proxyman.OutboundHandlerManager
router router.Router
dns dns.Server
}
func NewDefaultDispatcher(space app.Space) *DefaultDispatcher {
if !space.HasApp(proxyman.APP_ID_OUTBOUND_MANAGER) {
log.Error("DefaultDispatcher: OutboundHandlerManager is not found in the space.")
return nil
}
if !space.HasApp(dns.APP_ID) {
log.Error("DefaultDispatcher: DNS is not found in the space.")
return nil
}
d := &DefaultDispatcher{
ohm: space.GetApp(proxyman.APP_ID_OUTBOUND_MANAGER).(proxyman.OutboundHandlerManager),
dns: space.GetApp(dns.APP_ID).(dns.Server),
}
if space.HasApp(router.APP_ID) {
d.router = space.GetApp(router.APP_ID).(router.Router)
}
return d
}
func (this *DefaultDispatcher) DispatchToOutbound(destination v2net.Destination) ray.InboundRay {
direct := ray.NewRay()
dispatcher := this.ohm.GetDefaultHandler()
if this.router != nil {
if tag, err := this.router.TakeDetour(destination); err == nil {
if handler := this.ohm.GetHandler(tag); handler != nil {
log.Info("DefaultDispatcher: Taking detour [", tag, "] for [", destination, "].")
dispatcher = handler
} else {
log.Warning("DefaultDispatcher: Nonexisting tag: ", tag)
}
} else {
log.Info("DefaultDispatcher: Default route for ", destination)
}
}
go this.FilterPacketAndDispatch(destination, direct, dispatcher)
return direct
}
// @Private
func (this *DefaultDispatcher) FilterPacketAndDispatch(destination v2net.Destination, link ray.OutboundRay, dispatcher proxy.OutboundHandler) {
payload, err := link.OutboundInput().Read()
if err != nil {
log.Info("DefaultDispatcher: No payload to dispatch, stopping now.")
link.OutboundInput().Release()
link.OutboundOutput().Release()
return
}
dispatcher.Dispatch(destination, payload, link)
}

View File

@ -14,26 +14,3 @@ const (
type Server interface { type Server interface {
Get(domain string) []net.IP Get(domain string) []net.IP
} }
type dnsServerWithContext interface {
Get(context app.Context, domain string) []net.IP
}
type contextedDnsServer struct {
context app.Context
dnsCache dnsServerWithContext
}
func (this *contextedDnsServer) Get(domain string) []net.IP {
return this.dnsCache.Get(this.context, domain)
}
func init() {
app.Register(APP_ID, func(context app.Context, obj interface{}) interface{} {
dcContext := obj.(dnsServerWithContext)
return &contextedDnsServer{
context: context,
dnsCache: dcContext,
}
})
}

View File

@ -53,7 +53,7 @@ func (this *CacheServer) GetCached(domain string) []net.IP {
return nil return nil
} }
func (this *CacheServer) Get(context app.Context, domain string) []net.IP { func (this *CacheServer) Get(domain string) []net.IP {
domain = dns.Fqdn(domain) domain = dns.Fqdn(domain)
ips := this.GetCached(domain) ips := this.GetCached(domain)
if ips != nil { if ips != nil {

View File

@ -6,44 +6,26 @@ import (
"github.com/v2ray/v2ray-core/app" "github.com/v2ray/v2ray-core/app"
"github.com/v2ray/v2ray-core/app/dispatcher" "github.com/v2ray/v2ray-core/app/dispatcher"
dispatchers "github.com/v2ray/v2ray-core/app/dispatcher/impl"
. "github.com/v2ray/v2ray-core/app/dns" . "github.com/v2ray/v2ray-core/app/dns"
apptesting "github.com/v2ray/v2ray-core/app/testing" "github.com/v2ray/v2ray-core/app/proxyman"
v2net "github.com/v2ray/v2ray-core/common/net" v2net "github.com/v2ray/v2ray-core/common/net"
netassert "github.com/v2ray/v2ray-core/common/net/testing/assert" netassert "github.com/v2ray/v2ray-core/common/net/testing/assert"
"github.com/v2ray/v2ray-core/proxy/freedom" "github.com/v2ray/v2ray-core/proxy/freedom"
v2testing "github.com/v2ray/v2ray-core/testing" v2testing "github.com/v2ray/v2ray-core/testing"
"github.com/v2ray/v2ray-core/testing/assert" "github.com/v2ray/v2ray-core/testing/assert"
"github.com/v2ray/v2ray-core/transport/ray"
) )
type TestDispatcher struct {
freedom *freedom.FreedomConnection
}
func (this *TestDispatcher) DispatchToOutbound(context app.Context, dest v2net.Destination) ray.InboundRay {
direct := ray.NewRay()
go func() {
payload, err := direct.OutboundInput().Read()
if err != nil {
direct.OutboundInput().Release()
direct.OutboundOutput().Release()
return
}
this.freedom.Dispatch(dest, payload, direct)
}()
return direct
}
func TestDnsAdd(t *testing.T) { func TestDnsAdd(t *testing.T) {
v2testing.Current(t) v2testing.Current(t)
d := &TestDispatcher{ space := app.NewSpace()
freedom: &freedom.FreedomConnection{},
} outboundHandlerManager := &proxyman.DefaultOutboundHandlerManager{}
spaceController := app.NewController() outboundHandlerManager.SetDefaultHandler(&freedom.FreedomConnection{})
spaceController.Bind(dispatcher.APP_ID, d) space.BindApp(proxyman.APP_ID_OUTBOUND_MANAGER, outboundHandlerManager)
space := spaceController.ForContext("test")
space.BindApp(dispatcher.APP_ID, dispatchers.NewDefaultDispatcher(space))
domain := "local.v2ray.com" domain := "local.v2ray.com"
server := NewCacheServer(space, &Config{ server := NewCacheServer(space, &Config{
@ -51,9 +33,7 @@ func TestDnsAdd(t *testing.T) {
v2net.UDPDestination(v2net.IPAddress([]byte{8, 8, 8, 8}), v2net.Port(53)), v2net.UDPDestination(v2net.IPAddress([]byte{8, 8, 8, 8}), v2net.Port(53)),
}, },
}) })
ips := server.Get(&apptesting.Context{ ips := server.Get(domain)
CallerTagValue: "a",
}, domain)
assert.Int(len(ips)).Equals(1) assert.Int(len(ips)).Equals(1)
netassert.IP(ips[0].To4()).Equals(net.IP([]byte{127, 0, 0, 1})) netassert.IP(ips[0].To4()).Equals(net.IP([]byte{127, 0, 0, 1}))
} }

View File

@ -1,37 +1,59 @@
package proxyman package proxyman
import ( import (
"sync"
"github.com/v2ray/v2ray-core/app" "github.com/v2ray/v2ray-core/app"
"github.com/v2ray/v2ray-core/proxy" "github.com/v2ray/v2ray-core/proxy"
) )
const ( const (
APP_ID_INBOUND_MANAGER = app.ID(4) APP_ID_INBOUND_MANAGER = app.ID(4)
APP_ID_OUTBOUND_MANAGER = app.ID(6)
) )
type InboundHandlerManager interface { type InboundHandlerManager interface {
GetHandler(tag string) (proxy.InboundHandler, int) GetHandler(tag string) (proxy.InboundHandler, int)
} }
type inboundHandlerManagerWithContext interface { type OutboundHandlerManager interface {
GetHandler(context app.Context, tag string) (proxy.InboundHandler, int) GetHandler(tag string) proxy.OutboundHandler
GetDefaultHandler() proxy.OutboundHandler
} }
type inboundHandlerManagerWithContextImpl struct { type DefaultOutboundHandlerManager struct {
context app.Context sync.RWMutex
manager inboundHandlerManagerWithContext defaultHandler proxy.OutboundHandler
taggedHandler map[string]proxy.OutboundHandler
} }
func (this *inboundHandlerManagerWithContextImpl) GetHandler(tag string) (proxy.InboundHandler, int) { func (this *DefaultOutboundHandlerManager) GetDefaultHandler() proxy.OutboundHandler {
return this.manager.GetHandler(this.context, tag) this.RLock()
} defer this.RUnlock()
if this.defaultHandler == nil {
func init() { return nil
app.Register(APP_ID_INBOUND_MANAGER, func(context app.Context, obj interface{}) interface{} {
manager := obj.(inboundHandlerManagerWithContext)
return &inboundHandlerManagerWithContextImpl{
context: context,
manager: manager,
} }
}) return this.defaultHandler
}
func (this *DefaultOutboundHandlerManager) SetDefaultHandler(handler proxy.OutboundHandler) {
this.Lock()
defer this.Unlock()
this.defaultHandler = handler
}
func (this *DefaultOutboundHandlerManager) GetHandler(tag string) proxy.OutboundHandler {
this.RLock()
defer this.RUnlock()
if handler, found := this.taggedHandler[tag]; found {
return handler
}
return nil
}
func (this *DefaultOutboundHandlerManager) SetHandler(tag string, handler proxy.OutboundHandler) {
this.Lock()
defer this.Unlock()
this.taggedHandler[tag] = handler
} }

View File

@ -5,6 +5,10 @@ import (
v2net "github.com/v2ray/v2ray-core/common/net" v2net "github.com/v2ray/v2ray-core/common/net"
) )
const (
APP_ID = app.ID(3)
)
type Router interface { type Router interface {
TakeDetour(v2net.Destination) (string, error) TakeDetour(v2net.Destination) (string, error)
} }

View File

@ -7,52 +7,26 @@ type Context interface {
CallerTag() string CallerTag() string
} }
type Caller interface {
Tag() string
}
// A Space contains all apps that may be available in a V2Ray runtime. // A Space contains all apps that may be available in a V2Ray runtime.
// Caller must check the availability of an app by calling HasXXX before getting its instance. // Caller must check the availability of an app by calling HasXXX before getting its instance.
type Space interface { type Space interface {
HasApp(ID) bool HasApp(ID) bool
GetApp(ID) interface{} GetApp(ID) interface{}
} BindApp(ID, interface{})
type ForContextCreator func(Context, interface{}) interface{}
var (
metadataCache = make(map[ID]ForContextCreator)
)
func Register(id ID, creator ForContextCreator) {
// TODO: check id
metadataCache[id] = creator
}
type contextImpl struct {
callerTag string
}
func (this *contextImpl) CallerTag() string {
return this.callerTag
} }
type spaceImpl struct { type spaceImpl struct {
cache map[ID]interface{} cache map[ID]interface{}
tag string
} }
func newSpaceImpl(tag string, cache map[ID]interface{}) *spaceImpl { func NewSpace() Space {
space := &spaceImpl{ return &spaceImpl{
tag: tag,
cache: make(map[ID]interface{}), cache: make(map[ID]interface{}),
} }
context := &contextImpl{
callerTag: tag,
}
for id, object := range cache {
creator, found := metadataCache[id]
if found {
space.cache[id] = creator(context, object)
}
}
return space
} }
func (this *spaceImpl) HasApp(id ID) bool { func (this *spaceImpl) HasApp(id ID) bool {
@ -68,22 +42,6 @@ func (this *spaceImpl) GetApp(id ID) interface{} {
return obj return obj
} }
// A SpaceController is supposed to be used by a shell to create Spaces. It should not be used func (this *spaceImpl) BindApp(id ID, object interface{}) {
// directly by proxies. this.cache[id] = object
type SpaceController struct {
objectCache map[ID]interface{}
}
func NewController() *SpaceController {
return &SpaceController{
objectCache: make(map[ID]interface{}),
}
}
func (this *SpaceController) Bind(id ID, object interface{}) {
this.objectCache[id] = object
}
func (this *SpaceController) ForContext(tag string) Space {
return newSpaceImpl(tag, this.objectCache)
} }

View File

@ -1,9 +0,0 @@
package testing
type Context struct {
CallerTagValue string
}
func (this *Context) CallerTag() string {
return this.CallerTagValue
}

View File

@ -7,6 +7,7 @@ package point
import ( import (
"github.com/v2ray/v2ray-core/app" "github.com/v2ray/v2ray-core/app"
"github.com/v2ray/v2ray-core/app/dispatcher" "github.com/v2ray/v2ray-core/app/dispatcher"
dispatchers "github.com/v2ray/v2ray-core/app/dispatcher/impl"
"github.com/v2ray/v2ray-core/app/dns" "github.com/v2ray/v2ray-core/app/dns"
"github.com/v2ray/v2ray-core/app/proxyman" "github.com/v2ray/v2ray-core/app/proxyman"
"github.com/v2ray/v2ray-core/app/router" "github.com/v2ray/v2ray-core/app/router"
@ -15,7 +16,6 @@ import (
"github.com/v2ray/v2ray-core/common/retry" "github.com/v2ray/v2ray-core/common/retry"
"github.com/v2ray/v2ray-core/proxy" "github.com/v2ray/v2ray-core/proxy"
proxyrepo "github.com/v2ray/v2ray-core/proxy/repo" proxyrepo "github.com/v2ray/v2ray-core/proxy/repo"
"github.com/v2ray/v2ray-core/transport/ray"
) )
// Point shell of V2Ray. // Point shell of V2Ray.
@ -27,7 +27,7 @@ type Point struct {
taggedIdh map[string]InboundDetourHandler taggedIdh map[string]InboundDetourHandler
odh map[string]proxy.OutboundHandler odh map[string]proxy.OutboundHandler
router router.Router router router.Router
space *app.SpaceController space app.Space
} }
// NewPoint returns a new Point server based on given configuration. // NewPoint returns a new Point server based on given configuration.
@ -55,12 +55,33 @@ func NewPoint(pConfig *Config) (*Point, error) {
log.SetLogLevel(logConfig.LogLevel) log.SetLogLevel(logConfig.LogLevel)
} }
vpoint.space = app.NewController() vpoint.space = app.NewSpace()
vpoint.space.Bind(dispatcher.APP_ID, vpoint) vpoint.space.BindApp(proxyman.APP_ID_INBOUND_MANAGER, vpoint)
vpoint.space.Bind(proxyman.APP_ID_INBOUND_MANAGER, vpoint)
outboundHandlerManager := &proxyman.DefaultOutboundHandlerManager{}
vpoint.space.BindApp(proxyman.APP_ID_OUTBOUND_MANAGER, outboundHandlerManager)
dnsConfig := pConfig.DNSConfig
if dnsConfig != nil {
dnsServer := dns.NewCacheServer(vpoint.space, dnsConfig)
vpoint.space.BindApp(dns.APP_ID, dnsServer)
}
routerConfig := pConfig.RouterConfig
if routerConfig != nil {
r, err := router.CreateRouter(routerConfig.Strategy, routerConfig.Settings, vpoint.space)
if err != nil {
log.Error("Failed to create router: ", err)
return nil, ErrorBadConfiguration
}
vpoint.space.BindApp(router.APP_ID, r)
vpoint.router = r
}
vpoint.space.BindApp(dispatcher.APP_ID, dispatchers.NewDefaultDispatcher(vpoint.space))
ichConfig := pConfig.InboundConfig.Settings ichConfig := pConfig.InboundConfig.Settings
ich, err := proxyrepo.CreateInboundHandler(pConfig.InboundConfig.Protocol, vpoint.space.ForContext("vpoint-default-inbound"), ichConfig) ich, err := proxyrepo.CreateInboundHandler(pConfig.InboundConfig.Protocol, vpoint.space, ichConfig)
if err != nil { if err != nil {
log.Error("Failed to create inbound connection handler: ", err) log.Error("Failed to create inbound connection handler: ", err)
return nil, err return nil, err
@ -68,12 +89,13 @@ func NewPoint(pConfig *Config) (*Point, error) {
vpoint.ich = ich vpoint.ich = ich
ochConfig := pConfig.OutboundConfig.Settings ochConfig := pConfig.OutboundConfig.Settings
och, err := proxyrepo.CreateOutboundHandler(pConfig.OutboundConfig.Protocol, vpoint.space.ForContext("vpoint-default-outbound"), ochConfig) och, err := proxyrepo.CreateOutboundHandler(pConfig.OutboundConfig.Protocol, vpoint.space, ochConfig)
if err != nil { if err != nil {
log.Error("Failed to create outbound connection handler: ", err) log.Error("Failed to create outbound connection handler: ", err)
return nil, err return nil, err
} }
vpoint.och = och vpoint.och = och
outboundHandlerManager.SetDefaultHandler(och)
vpoint.taggedIdh = make(map[string]InboundDetourHandler) vpoint.taggedIdh = make(map[string]InboundDetourHandler)
detours := pConfig.InboundDetours detours := pConfig.InboundDetours
@ -84,14 +106,14 @@ func NewPoint(pConfig *Config) (*Point, error) {
var detourHandler InboundDetourHandler var detourHandler InboundDetourHandler
switch allocConfig.Strategy { switch allocConfig.Strategy {
case AllocationStrategyAlways: case AllocationStrategyAlways:
dh, err := NewInboundDetourHandlerAlways(vpoint.space.ForContext(detourConfig.Tag), detourConfig) dh, err := NewInboundDetourHandlerAlways(vpoint.space, detourConfig)
if err != nil { if err != nil {
log.Error("Point: Failed to create detour handler: ", err) log.Error("Point: Failed to create detour handler: ", err)
return nil, ErrorBadConfiguration return nil, ErrorBadConfiguration
} }
detourHandler = dh detourHandler = dh
case AllocationStrategyRandom: case AllocationStrategyRandom:
dh, err := NewInboundDetourHandlerDynamic(vpoint.space.ForContext(detourConfig.Tag), detourConfig) dh, err := NewInboundDetourHandlerDynamic(vpoint.space, detourConfig)
if err != nil { if err != nil {
log.Error("Point: Failed to create detour handler: ", err) log.Error("Point: Failed to create detour handler: ", err)
return nil, ErrorBadConfiguration return nil, ErrorBadConfiguration
@ -112,31 +134,16 @@ func NewPoint(pConfig *Config) (*Point, error) {
if len(outboundDetours) > 0 { if len(outboundDetours) > 0 {
vpoint.odh = make(map[string]proxy.OutboundHandler) vpoint.odh = make(map[string]proxy.OutboundHandler)
for _, detourConfig := range outboundDetours { for _, detourConfig := range outboundDetours {
detourHandler, err := proxyrepo.CreateOutboundHandler(detourConfig.Protocol, vpoint.space.ForContext(detourConfig.Tag), detourConfig.Settings) detourHandler, err := proxyrepo.CreateOutboundHandler(detourConfig.Protocol, vpoint.space, detourConfig.Settings)
if err != nil { if err != nil {
log.Error("Failed to create detour outbound connection handler: ", err) log.Error("Failed to create detour outbound connection handler: ", err)
return nil, err return nil, err
} }
vpoint.odh[detourConfig.Tag] = detourHandler vpoint.odh[detourConfig.Tag] = detourHandler
outboundHandlerManager.SetHandler(detourConfig.Tag, detourHandler)
} }
} }
dnsConfig := pConfig.DNSConfig
if dnsConfig != nil {
dnsServer := dns.NewCacheServer(vpoint.space.ForContext("system.dns"), dnsConfig)
vpoint.space.Bind(dns.APP_ID, dnsServer)
}
routerConfig := pConfig.RouterConfig
if routerConfig != nil {
r, err := router.CreateRouter(routerConfig.Strategy, routerConfig.Settings, vpoint.space.ForContext("system.router"))
if err != nil {
log.Error("Failed to create router: ", err)
return nil, ErrorBadConfiguration
}
vpoint.router = r
}
return vpoint, nil return vpoint, nil
} }
@ -177,40 +184,7 @@ func (this *Point) Start() error {
return nil return nil
} }
// Dispatches a Packet to an OutboundConnection. func (this *Point) GetHandler(tag string) (proxy.InboundHandler, int) {
// The packet will be passed through the router (if configured), and then sent to an outbound
// connection with matching tag.
func (this *Point) DispatchToOutbound(context app.Context, destination v2net.Destination) ray.InboundRay {
direct := ray.NewRay()
dispatcher := this.och
if this.router != nil {
if tag, err := this.router.TakeDetour(destination); err == nil {
if handler, found := this.odh[tag]; found {
log.Info("Point: Taking detour [", tag, "] for [", destination, "]")
dispatcher = handler
} else {
log.Warning("Point: Unable to find routing destination: ", tag)
}
}
}
go this.FilterPacketAndDispatch(destination, direct, dispatcher)
return direct
}
func (this *Point) FilterPacketAndDispatch(destination v2net.Destination, link ray.OutboundRay, dispatcher proxy.OutboundHandler) {
payload, err := link.OutboundInput().Read()
if err != nil {
log.Info("Point: No payload to dispatch, stopping dispatching now.")
link.OutboundOutput().Release()
link.OutboundInput().Release()
return
}
dispatcher.Dispatch(destination, payload, link)
}
func (this *Point) GetHandler(context app.Context, tag string) (proxy.InboundHandler, int) {
handler, found := this.taggedIdh[tag] handler, found := this.taggedIdh[tag]
if !found { if !found {
log.Warning("Point: Unable to find an inbound handler with tag: ", tag) log.Warning("Point: Unable to find an inbound handler with tag: ", tag)