You've already forked v2ray-core
Compare commits
10 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3d6200dc64 | ||
|
|
e00c424341 | ||
|
|
ec610494ea | ||
|
|
54f98fe11b | ||
|
|
8b88f63280 | ||
|
|
1b71c7d68d | ||
|
|
f5f13d801f | ||
|
|
3ded18a75b | ||
|
|
7765fedd78 | ||
|
|
09ea65687c |
@@ -14,26 +14,3 @@ const (
|
||||
type PacketDispatcher interface {
|
||||
DispatchToOutbound(destination v2net.Destination) ray.InboundRay
|
||||
}
|
||||
|
||||
type packetDispatcherWithContext interface {
|
||||
DispatchToOutbound(context app.Context, destination v2net.Destination) ray.InboundRay
|
||||
}
|
||||
|
||||
type contextedPacketDispatcher struct {
|
||||
context app.Context
|
||||
packetDispatcher packetDispatcherWithContext
|
||||
}
|
||||
|
||||
func (this *contextedPacketDispatcher) DispatchToOutbound(destination v2net.Destination) ray.InboundRay {
|
||||
return this.packetDispatcher.DispatchToOutbound(this.context, destination)
|
||||
}
|
||||
|
||||
func init() {
|
||||
app.Register(APP_ID, func(context app.Context, obj interface{}) interface{} {
|
||||
packetDispatcher := obj.(packetDispatcherWithContext)
|
||||
return &contextedPacketDispatcher{
|
||||
context: context,
|
||||
packetDispatcher: packetDispatcher,
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
77
app/dispatcher/impl/default.go
Normal file
77
app/dispatcher/impl/default.go
Normal file
@@ -0,0 +1,77 @@
|
||||
package impl
|
||||
|
||||
import (
|
||||
"github.com/v2ray/v2ray-core/app"
|
||||
"github.com/v2ray/v2ray-core/app/proxyman"
|
||||
"github.com/v2ray/v2ray-core/app/router"
|
||||
"github.com/v2ray/v2ray-core/common/log"
|
||||
v2net "github.com/v2ray/v2ray-core/common/net"
|
||||
"github.com/v2ray/v2ray-core/proxy"
|
||||
"github.com/v2ray/v2ray-core/transport/ray"
|
||||
)
|
||||
|
||||
type DefaultDispatcher struct {
|
||||
ohm proxyman.OutboundHandlerManager
|
||||
router router.Router
|
||||
}
|
||||
|
||||
func NewDefaultDispatcher(space app.Space) *DefaultDispatcher {
|
||||
d := &DefaultDispatcher{}
|
||||
space.InitializeApplication(func() error {
|
||||
return d.Initialize(space)
|
||||
})
|
||||
return d
|
||||
}
|
||||
|
||||
// @Private
|
||||
func (this *DefaultDispatcher) Initialize(space app.Space) error {
|
||||
if !space.HasApp(proxyman.APP_ID_OUTBOUND_MANAGER) {
|
||||
log.Error("DefaultDispatcher: OutboundHandlerManager is not found in the space.")
|
||||
return app.ErrorMissingApplication
|
||||
}
|
||||
this.ohm = space.GetApp(proxyman.APP_ID_OUTBOUND_MANAGER).(proxyman.OutboundHandlerManager)
|
||||
|
||||
if space.HasApp(router.APP_ID) {
|
||||
this.router = space.GetApp(router.APP_ID).(router.Router)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (this *DefaultDispatcher) Release() {
|
||||
|
||||
}
|
||||
|
||||
func (this *DefaultDispatcher) DispatchToOutbound(destination v2net.Destination) ray.InboundRay {
|
||||
direct := ray.NewRay()
|
||||
dispatcher := this.ohm.GetDefaultHandler()
|
||||
|
||||
if this.router != nil {
|
||||
if tag, err := this.router.TakeDetour(destination); err == nil {
|
||||
if handler := this.ohm.GetHandler(tag); handler != nil {
|
||||
log.Info("DefaultDispatcher: Taking detour [", tag, "] for [", destination, "].")
|
||||
dispatcher = handler
|
||||
} else {
|
||||
log.Warning("DefaultDispatcher: Nonexisting tag: ", tag)
|
||||
}
|
||||
} else {
|
||||
log.Info("DefaultDispatcher: Default route for ", destination)
|
||||
}
|
||||
}
|
||||
|
||||
go this.FilterPacketAndDispatch(destination, direct, dispatcher)
|
||||
|
||||
return direct
|
||||
}
|
||||
|
||||
// @Private
|
||||
func (this *DefaultDispatcher) FilterPacketAndDispatch(destination v2net.Destination, link ray.OutboundRay, dispatcher proxy.OutboundHandler) {
|
||||
payload, err := link.OutboundInput().Read()
|
||||
if err != nil {
|
||||
log.Info("DefaultDispatcher: No payload to dispatch, stopping now.")
|
||||
link.OutboundInput().Release()
|
||||
link.OutboundOutput().Release()
|
||||
return
|
||||
}
|
||||
dispatcher.Dispatch(destination, payload, link)
|
||||
}
|
||||
@@ -14,26 +14,3 @@ const (
|
||||
type Server interface {
|
||||
Get(domain string) []net.IP
|
||||
}
|
||||
|
||||
type dnsServerWithContext interface {
|
||||
Get(context app.Context, domain string) []net.IP
|
||||
}
|
||||
|
||||
type contextedDnsServer struct {
|
||||
context app.Context
|
||||
dnsCache dnsServerWithContext
|
||||
}
|
||||
|
||||
func (this *contextedDnsServer) Get(domain string) []net.IP {
|
||||
return this.dnsCache.Get(this.context, domain)
|
||||
}
|
||||
|
||||
func init() {
|
||||
app.Register(APP_ID, func(context app.Context, obj interface{}) interface{} {
|
||||
dcContext := obj.(dnsServerWithContext)
|
||||
return &contextedDnsServer{
|
||||
context: context,
|
||||
dnsCache: dcContext,
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
@@ -16,7 +16,9 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
DefaultTTL = uint32(3600)
|
||||
DefaultTTL = uint32(3600)
|
||||
CleanupInterval = time.Second * 120
|
||||
CleanupThreshold = 512
|
||||
)
|
||||
|
||||
type ARecord struct {
|
||||
@@ -35,9 +37,10 @@ type PendingRequest struct {
|
||||
|
||||
type UDPNameServer struct {
|
||||
sync.Mutex
|
||||
address v2net.Destination
|
||||
requests map[uint16]*PendingRequest
|
||||
udpServer *hub.UDPServer
|
||||
address v2net.Destination
|
||||
requests map[uint16]*PendingRequest
|
||||
udpServer *hub.UDPServer
|
||||
nextCleanup time.Time
|
||||
}
|
||||
|
||||
func NewUDPNameServer(address v2net.Destination, dispatcher dispatcher.PacketDispatcher) *UDPNameServer {
|
||||
@@ -46,35 +49,36 @@ func NewUDPNameServer(address v2net.Destination, dispatcher dispatcher.PacketDis
|
||||
requests: make(map[uint16]*PendingRequest),
|
||||
udpServer: hub.NewUDPServer(dispatcher),
|
||||
}
|
||||
go s.Cleanup()
|
||||
return s
|
||||
}
|
||||
|
||||
// @Private
|
||||
func (this *UDPNameServer) Cleanup() {
|
||||
for {
|
||||
time.Sleep(time.Second * 60)
|
||||
expiredRequests := make([]uint16, 0, 16)
|
||||
now := time.Now()
|
||||
this.Lock()
|
||||
for id, r := range this.requests {
|
||||
if r.expire.Before(now) {
|
||||
expiredRequests = append(expiredRequests, id)
|
||||
close(r.response)
|
||||
}
|
||||
expiredRequests := make([]uint16, 0, 16)
|
||||
now := time.Now()
|
||||
this.Lock()
|
||||
for id, r := range this.requests {
|
||||
if r.expire.Before(now) {
|
||||
expiredRequests = append(expiredRequests, id)
|
||||
close(r.response)
|
||||
}
|
||||
for _, id := range expiredRequests {
|
||||
delete(this.requests, id)
|
||||
}
|
||||
this.Unlock()
|
||||
expiredRequests = nil
|
||||
}
|
||||
for _, id := range expiredRequests {
|
||||
delete(this.requests, id)
|
||||
}
|
||||
this.Unlock()
|
||||
expiredRequests = nil
|
||||
}
|
||||
|
||||
// @Private
|
||||
func (this *UDPNameServer) AssignUnusedID(response chan<- *ARecord) uint16 {
|
||||
var id uint16
|
||||
this.Lock()
|
||||
if len(this.requests) > CleanupThreshold && this.nextCleanup.Before(time.Now()) {
|
||||
this.nextCleanup = time.Now().Add(CleanupInterval)
|
||||
go this.Cleanup()
|
||||
}
|
||||
|
||||
for {
|
||||
id = uint16(rand.Intn(65536))
|
||||
if _, found := this.requests[id]; found {
|
||||
|
||||
@@ -22,6 +22,7 @@ type DomainRecord struct {
|
||||
|
||||
type CacheServer struct {
|
||||
sync.RWMutex
|
||||
space app.Space
|
||||
records map[string]*DomainRecord
|
||||
servers []NameServer
|
||||
}
|
||||
@@ -31,17 +32,29 @@ func NewCacheServer(space app.Space, config *Config) *CacheServer {
|
||||
records: make(map[string]*DomainRecord),
|
||||
servers: make([]NameServer, len(config.NameServers)),
|
||||
}
|
||||
dispatcher := space.GetApp(dispatcher.APP_ID).(dispatcher.PacketDispatcher)
|
||||
for idx, ns := range config.NameServers {
|
||||
if ns.Address().IsDomain() && ns.Address().Domain() == "localhost" {
|
||||
server.servers[idx] = &LocalNameServer{}
|
||||
} else {
|
||||
server.servers[idx] = NewUDPNameServer(ns, dispatcher)
|
||||
space.InitializeApplication(func() error {
|
||||
if !space.HasApp(dispatcher.APP_ID) {
|
||||
log.Error("DNS: Dispatcher is not found in the space.")
|
||||
return app.ErrorMissingApplication
|
||||
}
|
||||
}
|
||||
|
||||
dispatcher := space.GetApp(dispatcher.APP_ID).(dispatcher.PacketDispatcher)
|
||||
for idx, ns := range config.NameServers {
|
||||
if ns.Address().IsDomain() && ns.Address().Domain() == "localhost" {
|
||||
server.servers[idx] = &LocalNameServer{}
|
||||
} else {
|
||||
server.servers[idx] = NewUDPNameServer(ns, dispatcher)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
return server
|
||||
}
|
||||
|
||||
func (this *CacheServer) Release() {
|
||||
|
||||
}
|
||||
|
||||
//@Private
|
||||
func (this *CacheServer) GetCached(domain string) []net.IP {
|
||||
this.RLock()
|
||||
@@ -53,7 +66,7 @@ func (this *CacheServer) GetCached(domain string) []net.IP {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (this *CacheServer) Get(context app.Context, domain string) []net.IP {
|
||||
func (this *CacheServer) Get(domain string) []net.IP {
|
||||
domain = dns.Fqdn(domain)
|
||||
ips := this.GetCached(domain)
|
||||
if ips != nil {
|
||||
|
||||
@@ -6,44 +6,25 @@ import (
|
||||
|
||||
"github.com/v2ray/v2ray-core/app"
|
||||
"github.com/v2ray/v2ray-core/app/dispatcher"
|
||||
dispatchers "github.com/v2ray/v2ray-core/app/dispatcher/impl"
|
||||
. "github.com/v2ray/v2ray-core/app/dns"
|
||||
apptesting "github.com/v2ray/v2ray-core/app/testing"
|
||||
"github.com/v2ray/v2ray-core/app/proxyman"
|
||||
v2net "github.com/v2ray/v2ray-core/common/net"
|
||||
netassert "github.com/v2ray/v2ray-core/common/net/testing/assert"
|
||||
"github.com/v2ray/v2ray-core/proxy/freedom"
|
||||
v2testing "github.com/v2ray/v2ray-core/testing"
|
||||
"github.com/v2ray/v2ray-core/testing/assert"
|
||||
"github.com/v2ray/v2ray-core/transport/ray"
|
||||
)
|
||||
|
||||
type TestDispatcher struct {
|
||||
freedom *freedom.FreedomConnection
|
||||
}
|
||||
|
||||
func (this *TestDispatcher) DispatchToOutbound(context app.Context, dest v2net.Destination) ray.InboundRay {
|
||||
direct := ray.NewRay()
|
||||
|
||||
go func() {
|
||||
payload, err := direct.OutboundInput().Read()
|
||||
if err != nil {
|
||||
direct.OutboundInput().Release()
|
||||
direct.OutboundOutput().Release()
|
||||
return
|
||||
}
|
||||
this.freedom.Dispatch(dest, payload, direct)
|
||||
}()
|
||||
return direct
|
||||
}
|
||||
|
||||
func TestDnsAdd(t *testing.T) {
|
||||
v2testing.Current(t)
|
||||
|
||||
d := &TestDispatcher{
|
||||
freedom: &freedom.FreedomConnection{},
|
||||
}
|
||||
spaceController := app.NewController()
|
||||
spaceController.Bind(dispatcher.APP_ID, d)
|
||||
space := spaceController.ForContext("test")
|
||||
space := app.NewSpace()
|
||||
|
||||
outboundHandlerManager := proxyman.NewDefaultOutboundHandlerManager()
|
||||
outboundHandlerManager.SetDefaultHandler(&freedom.FreedomConnection{})
|
||||
space.BindApp(proxyman.APP_ID_OUTBOUND_MANAGER, outboundHandlerManager)
|
||||
space.BindApp(dispatcher.APP_ID, dispatchers.NewDefaultDispatcher(space))
|
||||
|
||||
domain := "local.v2ray.com"
|
||||
server := NewCacheServer(space, &Config{
|
||||
@@ -51,9 +32,10 @@ func TestDnsAdd(t *testing.T) {
|
||||
v2net.UDPDestination(v2net.IPAddress([]byte{8, 8, 8, 8}), v2net.Port(53)),
|
||||
},
|
||||
})
|
||||
ips := server.Get(&apptesting.Context{
|
||||
CallerTagValue: "a",
|
||||
}, domain)
|
||||
space.BindApp(APP_ID, server)
|
||||
space.Initialize()
|
||||
|
||||
ips := server.Get(domain)
|
||||
assert.Int(len(ips)).Equals(1)
|
||||
netassert.IP(ips[0].To4()).Equals(net.IP([]byte{127, 0, 0, 1}))
|
||||
}
|
||||
|
||||
@@ -1,37 +1,69 @@
|
||||
package proxyman
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/v2ray/v2ray-core/app"
|
||||
"github.com/v2ray/v2ray-core/proxy"
|
||||
)
|
||||
|
||||
const (
|
||||
APP_ID_INBOUND_MANAGER = app.ID(4)
|
||||
APP_ID_INBOUND_MANAGER = app.ID(4)
|
||||
APP_ID_OUTBOUND_MANAGER = app.ID(6)
|
||||
)
|
||||
|
||||
type InboundHandlerManager interface {
|
||||
GetHandler(tag string) (proxy.InboundHandler, int)
|
||||
}
|
||||
|
||||
type inboundHandlerManagerWithContext interface {
|
||||
GetHandler(context app.Context, tag string) (proxy.InboundHandler, int)
|
||||
type OutboundHandlerManager interface {
|
||||
GetHandler(tag string) proxy.OutboundHandler
|
||||
GetDefaultHandler() proxy.OutboundHandler
|
||||
}
|
||||
|
||||
type inboundHandlerManagerWithContextImpl struct {
|
||||
context app.Context
|
||||
manager inboundHandlerManagerWithContext
|
||||
type DefaultOutboundHandlerManager struct {
|
||||
sync.RWMutex
|
||||
defaultHandler proxy.OutboundHandler
|
||||
taggedHandler map[string]proxy.OutboundHandler
|
||||
}
|
||||
|
||||
func (this *inboundHandlerManagerWithContextImpl) GetHandler(tag string) (proxy.InboundHandler, int) {
|
||||
return this.manager.GetHandler(this.context, tag)
|
||||
func NewDefaultOutboundHandlerManager() *DefaultOutboundHandlerManager {
|
||||
return &DefaultOutboundHandlerManager{
|
||||
taggedHandler: make(map[string]proxy.OutboundHandler),
|
||||
}
|
||||
}
|
||||
|
||||
func init() {
|
||||
app.Register(APP_ID_INBOUND_MANAGER, func(context app.Context, obj interface{}) interface{} {
|
||||
manager := obj.(inboundHandlerManagerWithContext)
|
||||
return &inboundHandlerManagerWithContextImpl{
|
||||
context: context,
|
||||
manager: manager,
|
||||
}
|
||||
})
|
||||
func (this *DefaultOutboundHandlerManager) Release() {
|
||||
|
||||
}
|
||||
|
||||
func (this *DefaultOutboundHandlerManager) GetDefaultHandler() proxy.OutboundHandler {
|
||||
this.RLock()
|
||||
defer this.RUnlock()
|
||||
if this.defaultHandler == nil {
|
||||
return nil
|
||||
}
|
||||
return this.defaultHandler
|
||||
}
|
||||
|
||||
func (this *DefaultOutboundHandlerManager) SetDefaultHandler(handler proxy.OutboundHandler) {
|
||||
this.Lock()
|
||||
defer this.Unlock()
|
||||
this.defaultHandler = handler
|
||||
}
|
||||
|
||||
func (this *DefaultOutboundHandlerManager) GetHandler(tag string) proxy.OutboundHandler {
|
||||
this.RLock()
|
||||
defer this.RUnlock()
|
||||
if handler, found := this.taggedHandler[tag]; found {
|
||||
return handler
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (this *DefaultOutboundHandlerManager) SetHandler(tag string, handler proxy.OutboundHandler) {
|
||||
this.Lock()
|
||||
defer this.Unlock()
|
||||
|
||||
this.taggedHandler[tag] = handler
|
||||
}
|
||||
|
||||
@@ -2,10 +2,16 @@ package router
|
||||
|
||||
import (
|
||||
"github.com/v2ray/v2ray-core/app"
|
||||
"github.com/v2ray/v2ray-core/common"
|
||||
v2net "github.com/v2ray/v2ray-core/common/net"
|
||||
)
|
||||
|
||||
const (
|
||||
APP_ID = app.ID(3)
|
||||
)
|
||||
|
||||
type Router interface {
|
||||
common.Releasable
|
||||
TakeDetour(v2net.Destination) (string, error)
|
||||
}
|
||||
|
||||
|
||||
@@ -1,31 +0,0 @@
|
||||
package router_test
|
||||
|
||||
import (
|
||||
"net"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
|
||||
. "github.com/v2ray/v2ray-core/app/router"
|
||||
_ "github.com/v2ray/v2ray-core/app/router/rules"
|
||||
v2net "github.com/v2ray/v2ray-core/common/net"
|
||||
"github.com/v2ray/v2ray-core/shell/point"
|
||||
v2testing "github.com/v2ray/v2ray-core/testing"
|
||||
"github.com/v2ray/v2ray-core/testing/assert"
|
||||
)
|
||||
|
||||
func TestRouter(t *testing.T) {
|
||||
v2testing.Current(t)
|
||||
|
||||
baseDir := "$GOPATH/src/github.com/v2ray/v2ray-core/release/config"
|
||||
|
||||
pointConfig, err := point.LoadConfig(filepath.Join(baseDir, "vpoint_socks_vmess.json"))
|
||||
assert.Error(err).IsNil()
|
||||
|
||||
router, err := CreateRouter(pointConfig.RouterConfig.Strategy, pointConfig.RouterConfig.Settings, nil)
|
||||
assert.Error(err).IsNil()
|
||||
|
||||
dest := v2net.TCPDestination(v2net.IPAddress(net.ParseIP("120.135.126.1")), 80)
|
||||
tag, err := router.TakeDetour(dest)
|
||||
assert.Error(err).IsNil()
|
||||
assert.StringLiteral(tag).Equals("direct")
|
||||
}
|
||||
@@ -81,6 +81,7 @@ func init() {
|
||||
anySubDomain + "9xu" + dotCom,
|
||||
anySubDomain + "abchina" + dotCom,
|
||||
anySubDomain + "acfun" + dotTv,
|
||||
anySubDomain + "acgvideo" + dotCom,
|
||||
anySubDomain + "agrantsem" + dotCom,
|
||||
anySubDomain + "aicdn" + dotCom,
|
||||
anySubDomain + "aixifan" + dotCom,
|
||||
|
||||
@@ -41,23 +41,34 @@ func (this *cacheEntry) Extend() {
|
||||
}
|
||||
|
||||
type Router struct {
|
||||
config *RouterRuleConfig
|
||||
cache *collect.ValidityMap
|
||||
space app.Space
|
||||
config *RouterRuleConfig
|
||||
cache *collect.ValidityMap
|
||||
dnsServer dns.Server
|
||||
}
|
||||
|
||||
func NewRouter(config *RouterRuleConfig, space app.Space) *Router {
|
||||
return &Router{
|
||||
r := &Router{
|
||||
config: config,
|
||||
cache: collect.NewValidityMap(3600),
|
||||
space: space,
|
||||
}
|
||||
space.InitializeApplication(func() error {
|
||||
if !space.HasApp(dns.APP_ID) {
|
||||
log.Error("DNS: Router is not found in the space.")
|
||||
return app.ErrorMissingApplication
|
||||
}
|
||||
r.dnsServer = space.GetApp(dns.APP_ID).(dns.Server)
|
||||
return nil
|
||||
})
|
||||
return r
|
||||
}
|
||||
|
||||
func (this *Router) Release() {
|
||||
|
||||
}
|
||||
|
||||
// @Private
|
||||
func (this *Router) ResolveIP(dest v2net.Destination) []v2net.Destination {
|
||||
dnsServer := this.space.GetApp(dns.APP_ID).(dns.Server)
|
||||
ips := dnsServer.Get(dest.Address().Domain())
|
||||
ips := this.dnsServer.Get(dest.Address().Domain())
|
||||
if len(ips) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -3,6 +3,12 @@ package rules_test
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/v2ray/v2ray-core/app"
|
||||
"github.com/v2ray/v2ray-core/app/dispatcher"
|
||||
dispatchers "github.com/v2ray/v2ray-core/app/dispatcher/impl"
|
||||
"github.com/v2ray/v2ray-core/app/dns"
|
||||
"github.com/v2ray/v2ray-core/app/proxyman"
|
||||
"github.com/v2ray/v2ray-core/app/router"
|
||||
. "github.com/v2ray/v2ray-core/app/router/rules"
|
||||
v2net "github.com/v2ray/v2ray-core/common/net"
|
||||
v2testing "github.com/v2ray/v2ray-core/testing"
|
||||
@@ -21,9 +27,15 @@ func TestSimpleRouter(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
router := NewRouter(config, nil)
|
||||
space := app.NewSpace()
|
||||
space.BindApp(dns.APP_ID, dns.NewCacheServer(space, &dns.Config{}))
|
||||
space.BindApp(dispatcher.APP_ID, dispatchers.NewDefaultDispatcher(space))
|
||||
space.BindApp(proxyman.APP_ID_OUTBOUND_MANAGER, proxyman.NewDefaultOutboundHandlerManager())
|
||||
r := NewRouter(config, space)
|
||||
space.BindApp(router.APP_ID, r)
|
||||
assert.Error(space.Initialize()).IsNil()
|
||||
|
||||
tag, err := router.TakeDetour(v2net.TCPDestination(v2net.DomainAddress("v2ray.com"), 80))
|
||||
tag, err := r.TakeDetour(v2net.TCPDestination(v2net.DomainAddress("v2ray.com"), 80))
|
||||
assert.Error(err).IsNil()
|
||||
assert.StringLiteral(tag).Equals("test")
|
||||
}
|
||||
|
||||
99
app/space.go
99
app/space.go
@@ -1,5 +1,15 @@
|
||||
package app
|
||||
|
||||
import (
|
||||
"errors"
|
||||
|
||||
"github.com/v2ray/v2ray-core/common"
|
||||
)
|
||||
|
||||
var (
|
||||
ErrorMissingApplication = errors.New("App: Failed to found one or more applications.")
|
||||
)
|
||||
|
||||
type ID int
|
||||
|
||||
// Context of a function call from proxy to app.
|
||||
@@ -7,52 +17,51 @@ type Context interface {
|
||||
CallerTag() string
|
||||
}
|
||||
|
||||
type Caller interface {
|
||||
Tag() string
|
||||
}
|
||||
|
||||
type Application interface {
|
||||
common.Releasable
|
||||
}
|
||||
|
||||
type ApplicationInitializer func() error
|
||||
|
||||
// A Space contains all apps that may be available in a V2Ray runtime.
|
||||
// Caller must check the availability of an app by calling HasXXX before getting its instance.
|
||||
type Space interface {
|
||||
Initialize() error
|
||||
InitializeApplication(ApplicationInitializer)
|
||||
|
||||
HasApp(ID) bool
|
||||
GetApp(ID) interface{}
|
||||
}
|
||||
|
||||
type ForContextCreator func(Context, interface{}) interface{}
|
||||
|
||||
var (
|
||||
metadataCache = make(map[ID]ForContextCreator)
|
||||
)
|
||||
|
||||
func Register(id ID, creator ForContextCreator) {
|
||||
// TODO: check id
|
||||
metadataCache[id] = creator
|
||||
}
|
||||
|
||||
type contextImpl struct {
|
||||
callerTag string
|
||||
}
|
||||
|
||||
func (this *contextImpl) CallerTag() string {
|
||||
return this.callerTag
|
||||
GetApp(ID) Application
|
||||
BindApp(ID, Application)
|
||||
}
|
||||
|
||||
type spaceImpl struct {
|
||||
cache map[ID]interface{}
|
||||
tag string
|
||||
cache map[ID]Application
|
||||
appInit []ApplicationInitializer
|
||||
}
|
||||
|
||||
func newSpaceImpl(tag string, cache map[ID]interface{}) *spaceImpl {
|
||||
space := &spaceImpl{
|
||||
tag: tag,
|
||||
cache: make(map[ID]interface{}),
|
||||
func NewSpace() Space {
|
||||
return &spaceImpl{
|
||||
cache: make(map[ID]Application),
|
||||
appInit: make([]ApplicationInitializer, 0, 32),
|
||||
}
|
||||
context := &contextImpl{
|
||||
callerTag: tag,
|
||||
}
|
||||
for id, object := range cache {
|
||||
creator, found := metadataCache[id]
|
||||
if found {
|
||||
space.cache[id] = creator(context, object)
|
||||
}
|
||||
|
||||
func (this *spaceImpl) InitializeApplication(f ApplicationInitializer) {
|
||||
this.appInit = append(this.appInit, f)
|
||||
}
|
||||
|
||||
func (this *spaceImpl) Initialize() error {
|
||||
for _, f := range this.appInit {
|
||||
err := f()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return space
|
||||
return nil
|
||||
}
|
||||
|
||||
func (this *spaceImpl) HasApp(id ID) bool {
|
||||
@@ -60,7 +69,7 @@ func (this *spaceImpl) HasApp(id ID) bool {
|
||||
return found
|
||||
}
|
||||
|
||||
func (this *spaceImpl) GetApp(id ID) interface{} {
|
||||
func (this *spaceImpl) GetApp(id ID) Application {
|
||||
obj, found := this.cache[id]
|
||||
if !found {
|
||||
return nil
|
||||
@@ -68,22 +77,6 @@ func (this *spaceImpl) GetApp(id ID) interface{} {
|
||||
return obj
|
||||
}
|
||||
|
||||
// A SpaceController is supposed to be used by a shell to create Spaces. It should not be used
|
||||
// directly by proxies.
|
||||
type SpaceController struct {
|
||||
objectCache map[ID]interface{}
|
||||
}
|
||||
|
||||
func NewController() *SpaceController {
|
||||
return &SpaceController{
|
||||
objectCache: make(map[ID]interface{}),
|
||||
}
|
||||
}
|
||||
|
||||
func (this *SpaceController) Bind(id ID, object interface{}) {
|
||||
this.objectCache[id] = object
|
||||
}
|
||||
|
||||
func (this *SpaceController) ForContext(tag string) Space {
|
||||
return newSpaceImpl(tag, this.objectCache)
|
||||
func (this *spaceImpl) BindApp(id ID, application Application) {
|
||||
this.cache[id] = application
|
||||
}
|
||||
|
||||
@@ -1,9 +0,0 @@
|
||||
package testing
|
||||
|
||||
type Context struct {
|
||||
CallerTagValue string
|
||||
}
|
||||
|
||||
func (this *Context) CallerTag() string {
|
||||
return this.CallerTagValue
|
||||
}
|
||||
@@ -2,11 +2,13 @@ package io
|
||||
|
||||
import (
|
||||
"io"
|
||||
"sync"
|
||||
|
||||
"github.com/v2ray/v2ray-core/common/alloc"
|
||||
)
|
||||
|
||||
type BufferedReader struct {
|
||||
sync.Mutex
|
||||
reader io.Reader
|
||||
buffer *alloc.Buffer
|
||||
cached bool
|
||||
@@ -21,6 +23,9 @@ func NewBufferedReader(rawReader io.Reader) *BufferedReader {
|
||||
}
|
||||
|
||||
func (this *BufferedReader) Release() {
|
||||
this.Lock()
|
||||
defer this.Unlock()
|
||||
|
||||
this.buffer.Release()
|
||||
this.buffer = nil
|
||||
this.reader = nil
|
||||
@@ -35,6 +40,13 @@ func (this *BufferedReader) SetCached(cached bool) {
|
||||
}
|
||||
|
||||
func (this *BufferedReader) Read(b []byte) (int, error) {
|
||||
this.Lock()
|
||||
defer this.Unlock()
|
||||
|
||||
if this.reader == nil {
|
||||
return 0, io.EOF
|
||||
}
|
||||
|
||||
if !this.cached {
|
||||
if !this.buffer.IsEmpty() {
|
||||
return this.buffer.Read(b)
|
||||
|
||||
@@ -2,11 +2,13 @@ package io
|
||||
|
||||
import (
|
||||
"io"
|
||||
"sync"
|
||||
|
||||
"github.com/v2ray/v2ray-core/common/alloc"
|
||||
)
|
||||
|
||||
type BufferedWriter struct {
|
||||
sync.Mutex
|
||||
writer io.Writer
|
||||
buffer *alloc.Buffer
|
||||
cached bool
|
||||
@@ -21,20 +23,30 @@ func NewBufferedWriter(rawWriter io.Writer) *BufferedWriter {
|
||||
}
|
||||
|
||||
func (this *BufferedWriter) Write(b []byte) (int, error) {
|
||||
this.Lock()
|
||||
defer this.Unlock()
|
||||
|
||||
if this.writer == nil {
|
||||
return 0, io.EOF
|
||||
}
|
||||
|
||||
if !this.cached {
|
||||
return this.writer.Write(b)
|
||||
}
|
||||
nBytes, _ := this.buffer.Write(b)
|
||||
if this.buffer.IsFull() {
|
||||
err := this.Flush()
|
||||
if err != nil {
|
||||
return nBytes, err
|
||||
}
|
||||
go this.Flush()
|
||||
}
|
||||
return nBytes, nil
|
||||
}
|
||||
|
||||
func (this *BufferedWriter) Flush() error {
|
||||
this.Lock()
|
||||
defer this.Unlock()
|
||||
if this.writer == nil {
|
||||
return io.EOF
|
||||
}
|
||||
|
||||
defer this.buffer.Clear()
|
||||
for !this.buffer.IsEmpty() {
|
||||
nBytes, err := this.writer.Write(this.buffer.Value)
|
||||
@@ -59,6 +71,10 @@ func (this *BufferedWriter) SetCached(cached bool) {
|
||||
|
||||
func (this *BufferedWriter) Release() {
|
||||
this.Flush()
|
||||
|
||||
this.Lock()
|
||||
defer this.Unlock()
|
||||
|
||||
this.buffer.Release()
|
||||
this.buffer = nil
|
||||
this.writer = nil
|
||||
|
||||
@@ -3,12 +3,14 @@ package dokodemo
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/v2ray/v2ray-core/app"
|
||||
"github.com/v2ray/v2ray-core/app/dispatcher"
|
||||
"github.com/v2ray/v2ray-core/common/alloc"
|
||||
v2io "github.com/v2ray/v2ray-core/common/io"
|
||||
"github.com/v2ray/v2ray-core/common/log"
|
||||
v2net "github.com/v2ray/v2ray-core/common/net"
|
||||
"github.com/v2ray/v2ray-core/proxy"
|
||||
"github.com/v2ray/v2ray-core/proxy/internal"
|
||||
"github.com/v2ray/v2ray-core/transport/hub"
|
||||
)
|
||||
|
||||
@@ -26,13 +28,21 @@ type DokodemoDoor struct {
|
||||
listeningPort v2net.Port
|
||||
}
|
||||
|
||||
func NewDokodemoDoor(config *Config, packetDispatcher dispatcher.PacketDispatcher) *DokodemoDoor {
|
||||
return &DokodemoDoor{
|
||||
config: config,
|
||||
packetDispatcher: packetDispatcher,
|
||||
address: config.Address,
|
||||
port: config.Port,
|
||||
func NewDokodemoDoor(config *Config, space app.Space) *DokodemoDoor {
|
||||
d := &DokodemoDoor{
|
||||
config: config,
|
||||
address: config.Address,
|
||||
port: config.Port,
|
||||
}
|
||||
space.InitializeApplication(func() error {
|
||||
if !space.HasApp(dispatcher.APP_ID) {
|
||||
log.Error("Dokodemo: Dispatcher is not found in the space.")
|
||||
return app.ErrorMissingApplication
|
||||
}
|
||||
d.packetDispatcher = space.GetApp(dispatcher.APP_ID).(dispatcher.PacketDispatcher)
|
||||
return nil
|
||||
})
|
||||
return d
|
||||
}
|
||||
|
||||
func (this *DokodemoDoor) Port() v2net.Port {
|
||||
@@ -153,3 +163,10 @@ func (this *DokodemoDoor) HandleTCPConnection(conn *hub.Connection) {
|
||||
outputFinish.Lock()
|
||||
inputFinish.Lock()
|
||||
}
|
||||
|
||||
func init() {
|
||||
internal.MustRegisterInboundHandlerCreator("dokodemo-door",
|
||||
func(space app.Space, rawConfig interface{}) (proxy.InboundHandler, error) {
|
||||
return NewDokodemoDoor(rawConfig.(*Config), space), nil
|
||||
})
|
||||
}
|
||||
|
||||
@@ -1,21 +0,0 @@
|
||||
package dokodemo
|
||||
|
||||
import (
|
||||
"github.com/v2ray/v2ray-core/app"
|
||||
"github.com/v2ray/v2ray-core/app/dispatcher"
|
||||
"github.com/v2ray/v2ray-core/proxy"
|
||||
"github.com/v2ray/v2ray-core/proxy/internal"
|
||||
)
|
||||
|
||||
func init() {
|
||||
internal.MustRegisterInboundHandlerCreator("dokodemo-door",
|
||||
func(space app.Space, rawConfig interface{}) (proxy.InboundHandler, error) {
|
||||
config := rawConfig.(*Config)
|
||||
if !space.HasApp(dispatcher.APP_ID) {
|
||||
return nil, internal.ErrorBadConfiguration
|
||||
}
|
||||
return NewDokodemoDoor(
|
||||
config,
|
||||
space.GetApp(dispatcher.APP_ID).(dispatcher.PacketDispatcher)), nil
|
||||
})
|
||||
}
|
||||
@@ -4,32 +4,58 @@ import (
|
||||
"net"
|
||||
"testing"
|
||||
|
||||
testdispatcher "github.com/v2ray/v2ray-core/app/dispatcher/testing"
|
||||
"github.com/v2ray/v2ray-core/app"
|
||||
"github.com/v2ray/v2ray-core/app/dispatcher"
|
||||
dispatchers "github.com/v2ray/v2ray-core/app/dispatcher/impl"
|
||||
"github.com/v2ray/v2ray-core/app/proxyman"
|
||||
v2net "github.com/v2ray/v2ray-core/common/net"
|
||||
v2nettesting "github.com/v2ray/v2ray-core/common/net/testing"
|
||||
netassert "github.com/v2ray/v2ray-core/common/net/testing/assert"
|
||||
. "github.com/v2ray/v2ray-core/proxy/dokodemo"
|
||||
"github.com/v2ray/v2ray-core/proxy/freedom"
|
||||
v2testing "github.com/v2ray/v2ray-core/testing"
|
||||
"github.com/v2ray/v2ray-core/testing/assert"
|
||||
"github.com/v2ray/v2ray-core/testing/servers/tcp"
|
||||
"github.com/v2ray/v2ray-core/testing/servers/udp"
|
||||
)
|
||||
|
||||
func TestDokodemoTCP(t *testing.T) {
|
||||
v2testing.Current(t)
|
||||
|
||||
testPacketDispatcher := testdispatcher.NewTestPacketDispatcher(nil)
|
||||
tcpServer := &tcp.Server{
|
||||
Port: v2nettesting.PickPort(),
|
||||
MsgProcessor: func(data []byte) []byte {
|
||||
buffer := make([]byte, 0, 2048)
|
||||
buffer = append(buffer, []byte("Processed: ")...)
|
||||
buffer = append(buffer, data...)
|
||||
return buffer
|
||||
},
|
||||
}
|
||||
_, err := tcpServer.Start()
|
||||
assert.Error(err).IsNil()
|
||||
|
||||
defer tcpServer.Close()
|
||||
|
||||
space := app.NewSpace()
|
||||
space.BindApp(dispatcher.APP_ID, dispatchers.NewDefaultDispatcher(space))
|
||||
ohm := proxyman.NewDefaultOutboundHandlerManager()
|
||||
ohm.SetDefaultHandler(&freedom.FreedomConnection{})
|
||||
space.BindApp(proxyman.APP_ID_OUTBOUND_MANAGER, ohm)
|
||||
|
||||
data2Send := "Data to be sent to remote."
|
||||
|
||||
dokodemo := NewDokodemoDoor(&Config{
|
||||
Address: v2net.IPAddress([]byte{1, 2, 3, 4}),
|
||||
Port: 128,
|
||||
Address: v2net.LocalHostIP,
|
||||
Port: tcpServer.Port,
|
||||
Network: v2net.TCPNetwork.AsList(),
|
||||
Timeout: 600,
|
||||
}, testPacketDispatcher)
|
||||
}, space)
|
||||
defer dokodemo.Close()
|
||||
|
||||
assert.Error(space.Initialize()).IsNil()
|
||||
|
||||
port := v2nettesting.PickPort()
|
||||
err := dokodemo.Listen(port)
|
||||
err = dokodemo.Listen(port)
|
||||
assert.Error(err).IsNil()
|
||||
netassert.Port(port).Equals(dokodemo.Port())
|
||||
|
||||
@@ -43,36 +69,51 @@ func TestDokodemoTCP(t *testing.T) {
|
||||
tcpClient.Write([]byte(data2Send))
|
||||
tcpClient.CloseWrite()
|
||||
|
||||
destination := <-testPacketDispatcher.Destination
|
||||
|
||||
response := make([]byte, 1024)
|
||||
nBytes, err := tcpClient.Read(response)
|
||||
assert.Error(err).IsNil()
|
||||
tcpClient.Close()
|
||||
|
||||
assert.StringLiteral("Processed: " + data2Send).Equals(string(response[:nBytes]))
|
||||
assert.Bool(destination.IsTCP()).IsTrue()
|
||||
netassert.Address(destination.Address()).Equals(v2net.IPAddress([]byte{1, 2, 3, 4}))
|
||||
netassert.Port(destination.Port()).Equals(128)
|
||||
}
|
||||
|
||||
func TestDokodemoUDP(t *testing.T) {
|
||||
v2testing.Current(t)
|
||||
|
||||
testPacketDispatcher := testdispatcher.NewTestPacketDispatcher(nil)
|
||||
udpServer := &udp.Server{
|
||||
Port: v2nettesting.PickPort(),
|
||||
MsgProcessor: func(data []byte) []byte {
|
||||
buffer := make([]byte, 0, 2048)
|
||||
buffer = append(buffer, []byte("Processed: ")...)
|
||||
buffer = append(buffer, data...)
|
||||
return buffer
|
||||
},
|
||||
}
|
||||
_, err := udpServer.Start()
|
||||
assert.Error(err).IsNil()
|
||||
|
||||
defer udpServer.Close()
|
||||
|
||||
space := app.NewSpace()
|
||||
space.BindApp(dispatcher.APP_ID, dispatchers.NewDefaultDispatcher(space))
|
||||
ohm := proxyman.NewDefaultOutboundHandlerManager()
|
||||
ohm.SetDefaultHandler(&freedom.FreedomConnection{})
|
||||
space.BindApp(proxyman.APP_ID_OUTBOUND_MANAGER, ohm)
|
||||
|
||||
data2Send := "Data to be sent to remote."
|
||||
|
||||
dokodemo := NewDokodemoDoor(&Config{
|
||||
Address: v2net.IPAddress([]byte{5, 6, 7, 8}),
|
||||
Port: 256,
|
||||
Address: v2net.LocalHostIP,
|
||||
Port: udpServer.Port,
|
||||
Network: v2net.UDPNetwork.AsList(),
|
||||
Timeout: 600,
|
||||
}, testPacketDispatcher)
|
||||
}, space)
|
||||
defer dokodemo.Close()
|
||||
|
||||
assert.Error(space.Initialize()).IsNil()
|
||||
|
||||
port := v2nettesting.PickPort()
|
||||
err := dokodemo.Listen(port)
|
||||
err = dokodemo.Listen(port)
|
||||
assert.Error(err).IsNil()
|
||||
netassert.Port(port).Equals(dokodemo.Port())
|
||||
|
||||
@@ -82,13 +123,13 @@ func TestDokodemoUDP(t *testing.T) {
|
||||
Zone: "",
|
||||
})
|
||||
assert.Error(err).IsNil()
|
||||
defer udpClient.Close()
|
||||
|
||||
udpClient.Write([]byte(data2Send))
|
||||
udpClient.Close()
|
||||
|
||||
destination := <-testPacketDispatcher.Destination
|
||||
|
||||
assert.Bool(destination.IsUDP()).IsTrue()
|
||||
netassert.Address(destination.Address()).Equals(v2net.IPAddress([]byte{5, 6, 7, 8}))
|
||||
netassert.Port(destination.Port()).Equals(256)
|
||||
response := make([]byte, 1024)
|
||||
nBytes, addr, err := udpClient.ReadFromUDP(response)
|
||||
assert.Error(err).IsNil()
|
||||
netassert.IP(addr.IP).Equals(v2net.LocalHostIP.IP())
|
||||
assert.Bytes(response[:nBytes]).Equals([]byte("Processed: " + data2Send))
|
||||
}
|
||||
|
||||
@@ -10,6 +10,8 @@ import (
|
||||
"golang.org/x/net/proxy"
|
||||
|
||||
"github.com/v2ray/v2ray-core/app"
|
||||
"github.com/v2ray/v2ray-core/app/dns"
|
||||
v2net "github.com/v2ray/v2ray-core/common/net"
|
||||
v2nettesting "github.com/v2ray/v2ray-core/common/net/testing"
|
||||
v2proxy "github.com/v2ray/v2ray-core/proxy"
|
||||
proxytesting "github.com/v2ray/v2ray-core/proxy/testing"
|
||||
@@ -44,6 +46,11 @@ func TestSocksTcpConnect(t *testing.T) {
|
||||
"auth": "noauth"
|
||||
}`),
|
||||
},
|
||||
DNSConfig: &dns.Config{
|
||||
NameServers: []v2net.Destination{
|
||||
v2net.UDPDestination(v2net.DomainAddress("localhost"), v2net.Port(53)),
|
||||
},
|
||||
},
|
||||
OutboundConfig: &point.ConnectionConfig{
|
||||
Protocol: protocol,
|
||||
Settings: nil,
|
||||
@@ -106,6 +113,11 @@ func TestSocksTcpConnectWithUserPass(t *testing.T) {
|
||||
]
|
||||
}`),
|
||||
},
|
||||
DNSConfig: &dns.Config{
|
||||
NameServers: []v2net.Destination{
|
||||
v2net.UDPDestination(v2net.DomainAddress("localhost"), v2net.Port(53)),
|
||||
},
|
||||
},
|
||||
OutboundConfig: &point.ConnectionConfig{
|
||||
Protocol: protocol,
|
||||
Settings: nil,
|
||||
@@ -168,6 +180,11 @@ func TestSocksTcpConnectWithWrongUserPass(t *testing.T) {
|
||||
]
|
||||
}`),
|
||||
},
|
||||
DNSConfig: &dns.Config{
|
||||
NameServers: []v2net.Destination{
|
||||
v2net.UDPDestination(v2net.DomainAddress("localhost"), v2net.Port(53)),
|
||||
},
|
||||
},
|
||||
OutboundConfig: &point.ConnectionConfig{
|
||||
Protocol: protocol,
|
||||
Settings: nil,
|
||||
@@ -216,6 +233,11 @@ func TestSocksTcpConnectWithWrongAuthMethod(t *testing.T) {
|
||||
]
|
||||
}`),
|
||||
},
|
||||
DNSConfig: &dns.Config{
|
||||
NameServers: []v2net.Destination{
|
||||
v2net.UDPDestination(v2net.DomainAddress("localhost"), v2net.Port(53)),
|
||||
},
|
||||
},
|
||||
OutboundConfig: &point.ConnectionConfig{
|
||||
Protocol: protocol,
|
||||
Settings: nil,
|
||||
|
||||
@@ -6,6 +6,7 @@ import (
|
||||
|
||||
"github.com/v2ray/v2ray-core/app"
|
||||
"github.com/v2ray/v2ray-core/app/dispatcher"
|
||||
"github.com/v2ray/v2ray-core/app/dns"
|
||||
v2net "github.com/v2ray/v2ray-core/common/net"
|
||||
v2nettesting "github.com/v2ray/v2ray-core/common/net/testing"
|
||||
"github.com/v2ray/v2ray-core/common/protocol"
|
||||
@@ -46,6 +47,11 @@ func TestVMessInAndOut(t *testing.T) {
|
||||
|
||||
configA := &point.Config{
|
||||
Port: portA,
|
||||
DNSConfig: &dns.Config{
|
||||
NameServers: []v2net.Destination{
|
||||
v2net.UDPDestination(v2net.DomainAddress("localhost"), v2net.Port(53)),
|
||||
},
|
||||
},
|
||||
InboundConfig: &point.ConnectionConfig{
|
||||
Protocol: protocol,
|
||||
Settings: nil,
|
||||
@@ -86,6 +92,11 @@ func TestVMessInAndOut(t *testing.T) {
|
||||
|
||||
configB := &point.Config{
|
||||
Port: portB,
|
||||
DNSConfig: &dns.Config{
|
||||
NameServers: []v2net.Destination{
|
||||
v2net.UDPDestination(v2net.DomainAddress("localhost"), v2net.Port(53)),
|
||||
},
|
||||
},
|
||||
InboundConfig: &point.ConnectionConfig{
|
||||
Protocol: "vmess",
|
||||
Settings: []byte(`{
|
||||
|
||||
@@ -7,6 +7,7 @@ package point
|
||||
import (
|
||||
"github.com/v2ray/v2ray-core/app"
|
||||
"github.com/v2ray/v2ray-core/app/dispatcher"
|
||||
dispatchers "github.com/v2ray/v2ray-core/app/dispatcher/impl"
|
||||
"github.com/v2ray/v2ray-core/app/dns"
|
||||
"github.com/v2ray/v2ray-core/app/proxyman"
|
||||
"github.com/v2ray/v2ray-core/app/router"
|
||||
@@ -15,7 +16,6 @@ import (
|
||||
"github.com/v2ray/v2ray-core/common/retry"
|
||||
"github.com/v2ray/v2ray-core/proxy"
|
||||
proxyrepo "github.com/v2ray/v2ray-core/proxy/repo"
|
||||
"github.com/v2ray/v2ray-core/transport/ray"
|
||||
)
|
||||
|
||||
// Point shell of V2Ray.
|
||||
@@ -27,7 +27,7 @@ type Point struct {
|
||||
taggedIdh map[string]InboundDetourHandler
|
||||
odh map[string]proxy.OutboundHandler
|
||||
router router.Router
|
||||
space *app.SpaceController
|
||||
space app.Space
|
||||
}
|
||||
|
||||
// NewPoint returns a new Point server based on given configuration.
|
||||
@@ -55,12 +55,33 @@ func NewPoint(pConfig *Config) (*Point, error) {
|
||||
log.SetLogLevel(logConfig.LogLevel)
|
||||
}
|
||||
|
||||
vpoint.space = app.NewController()
|
||||
vpoint.space.Bind(dispatcher.APP_ID, vpoint)
|
||||
vpoint.space.Bind(proxyman.APP_ID_INBOUND_MANAGER, vpoint)
|
||||
vpoint.space = app.NewSpace()
|
||||
vpoint.space.BindApp(proxyman.APP_ID_INBOUND_MANAGER, vpoint)
|
||||
|
||||
outboundHandlerManager := proxyman.NewDefaultOutboundHandlerManager()
|
||||
vpoint.space.BindApp(proxyman.APP_ID_OUTBOUND_MANAGER, outboundHandlerManager)
|
||||
|
||||
dnsConfig := pConfig.DNSConfig
|
||||
if dnsConfig != nil {
|
||||
dnsServer := dns.NewCacheServer(vpoint.space, dnsConfig)
|
||||
vpoint.space.BindApp(dns.APP_ID, dnsServer)
|
||||
}
|
||||
|
||||
routerConfig := pConfig.RouterConfig
|
||||
if routerConfig != nil {
|
||||
r, err := router.CreateRouter(routerConfig.Strategy, routerConfig.Settings, vpoint.space)
|
||||
if err != nil {
|
||||
log.Error("Failed to create router: ", err)
|
||||
return nil, ErrorBadConfiguration
|
||||
}
|
||||
vpoint.space.BindApp(router.APP_ID, r)
|
||||
vpoint.router = r
|
||||
}
|
||||
|
||||
vpoint.space.BindApp(dispatcher.APP_ID, dispatchers.NewDefaultDispatcher(vpoint.space))
|
||||
|
||||
ichConfig := pConfig.InboundConfig.Settings
|
||||
ich, err := proxyrepo.CreateInboundHandler(pConfig.InboundConfig.Protocol, vpoint.space.ForContext("vpoint-default-inbound"), ichConfig)
|
||||
ich, err := proxyrepo.CreateInboundHandler(pConfig.InboundConfig.Protocol, vpoint.space, ichConfig)
|
||||
if err != nil {
|
||||
log.Error("Failed to create inbound connection handler: ", err)
|
||||
return nil, err
|
||||
@@ -68,12 +89,13 @@ func NewPoint(pConfig *Config) (*Point, error) {
|
||||
vpoint.ich = ich
|
||||
|
||||
ochConfig := pConfig.OutboundConfig.Settings
|
||||
och, err := proxyrepo.CreateOutboundHandler(pConfig.OutboundConfig.Protocol, vpoint.space.ForContext("vpoint-default-outbound"), ochConfig)
|
||||
och, err := proxyrepo.CreateOutboundHandler(pConfig.OutboundConfig.Protocol, vpoint.space, ochConfig)
|
||||
if err != nil {
|
||||
log.Error("Failed to create outbound connection handler: ", err)
|
||||
return nil, err
|
||||
}
|
||||
vpoint.och = och
|
||||
outboundHandlerManager.SetDefaultHandler(och)
|
||||
|
||||
vpoint.taggedIdh = make(map[string]InboundDetourHandler)
|
||||
detours := pConfig.InboundDetours
|
||||
@@ -84,14 +106,14 @@ func NewPoint(pConfig *Config) (*Point, error) {
|
||||
var detourHandler InboundDetourHandler
|
||||
switch allocConfig.Strategy {
|
||||
case AllocationStrategyAlways:
|
||||
dh, err := NewInboundDetourHandlerAlways(vpoint.space.ForContext(detourConfig.Tag), detourConfig)
|
||||
dh, err := NewInboundDetourHandlerAlways(vpoint.space, detourConfig)
|
||||
if err != nil {
|
||||
log.Error("Point: Failed to create detour handler: ", err)
|
||||
return nil, ErrorBadConfiguration
|
||||
}
|
||||
detourHandler = dh
|
||||
case AllocationStrategyRandom:
|
||||
dh, err := NewInboundDetourHandlerDynamic(vpoint.space.ForContext(detourConfig.Tag), detourConfig)
|
||||
dh, err := NewInboundDetourHandlerDynamic(vpoint.space, detourConfig)
|
||||
if err != nil {
|
||||
log.Error("Point: Failed to create detour handler: ", err)
|
||||
return nil, ErrorBadConfiguration
|
||||
@@ -112,29 +134,18 @@ func NewPoint(pConfig *Config) (*Point, error) {
|
||||
if len(outboundDetours) > 0 {
|
||||
vpoint.odh = make(map[string]proxy.OutboundHandler)
|
||||
for _, detourConfig := range outboundDetours {
|
||||
detourHandler, err := proxyrepo.CreateOutboundHandler(detourConfig.Protocol, vpoint.space.ForContext(detourConfig.Tag), detourConfig.Settings)
|
||||
detourHandler, err := proxyrepo.CreateOutboundHandler(detourConfig.Protocol, vpoint.space, detourConfig.Settings)
|
||||
if err != nil {
|
||||
log.Error("Failed to create detour outbound connection handler: ", err)
|
||||
return nil, err
|
||||
}
|
||||
vpoint.odh[detourConfig.Tag] = detourHandler
|
||||
outboundHandlerManager.SetHandler(detourConfig.Tag, detourHandler)
|
||||
}
|
||||
}
|
||||
|
||||
dnsConfig := pConfig.DNSConfig
|
||||
if dnsConfig != nil {
|
||||
dnsServer := dns.NewCacheServer(vpoint.space.ForContext("system.dns"), dnsConfig)
|
||||
vpoint.space.Bind(dns.APP_ID, dnsServer)
|
||||
}
|
||||
|
||||
routerConfig := pConfig.RouterConfig
|
||||
if routerConfig != nil {
|
||||
r, err := router.CreateRouter(routerConfig.Strategy, routerConfig.Settings, vpoint.space.ForContext("system.router"))
|
||||
if err != nil {
|
||||
log.Error("Failed to create router: ", err)
|
||||
return nil, ErrorBadConfiguration
|
||||
}
|
||||
vpoint.router = r
|
||||
if err := vpoint.space.Initialize(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return vpoint, nil
|
||||
@@ -177,40 +188,7 @@ func (this *Point) Start() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Dispatches a Packet to an OutboundConnection.
|
||||
// The packet will be passed through the router (if configured), and then sent to an outbound
|
||||
// connection with matching tag.
|
||||
func (this *Point) DispatchToOutbound(context app.Context, destination v2net.Destination) ray.InboundRay {
|
||||
direct := ray.NewRay()
|
||||
dispatcher := this.och
|
||||
|
||||
if this.router != nil {
|
||||
if tag, err := this.router.TakeDetour(destination); err == nil {
|
||||
if handler, found := this.odh[tag]; found {
|
||||
log.Info("Point: Taking detour [", tag, "] for [", destination, "]")
|
||||
dispatcher = handler
|
||||
} else {
|
||||
log.Warning("Point: Unable to find routing destination: ", tag)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
go this.FilterPacketAndDispatch(destination, direct, dispatcher)
|
||||
return direct
|
||||
}
|
||||
|
||||
func (this *Point) FilterPacketAndDispatch(destination v2net.Destination, link ray.OutboundRay, dispatcher proxy.OutboundHandler) {
|
||||
payload, err := link.OutboundInput().Read()
|
||||
if err != nil {
|
||||
log.Info("Point: No payload to dispatch, stopping dispatching now.")
|
||||
link.OutboundOutput().Release()
|
||||
link.OutboundInput().Release()
|
||||
return
|
||||
}
|
||||
dispatcher.Dispatch(destination, payload, link)
|
||||
}
|
||||
|
||||
func (this *Point) GetHandler(context app.Context, tag string) (proxy.InboundHandler, int) {
|
||||
func (this *Point) GetHandler(tag string) (proxy.InboundHandler, int) {
|
||||
handler, found := this.taggedIdh[tag]
|
||||
if !found {
|
||||
log.Warning("Point: Unable to find an inbound handler with tag: ", tag)
|
||||
@@ -218,3 +196,7 @@ func (this *Point) GetHandler(context app.Context, tag string) (proxy.InboundHan
|
||||
}
|
||||
return handler.GetConnectionHandler()
|
||||
}
|
||||
|
||||
func (this *Point) Release() {
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user