Browse Source

MemoryStreamSettings

pull/1269/head
Darien Raymond 6 years ago
parent
commit
b3847fb7c0
No known key found for this signature in database
GPG Key ID: 7251FFA14BB18169
  1. 8
      app/proxyman/inbound/always.go
  2. 11
      app/proxyman/inbound/dynamic.go
  3. 2
      app/proxyman/inbound/worker.go
  4. 10
      app/proxyman/outbound/handler.go
  5. 2
      transport/config.proto
  6. 22
      transport/internet/context.go
  7. 21
      transport/internet/dialer.go
  8. 4
      transport/internet/domainsocket/dial.go
  9. 16
      transport/internet/domainsocket/listener_test.go
  10. 4
      transport/internet/http/dialer.go
  11. 24
      transport/internet/http/http_test.go
  12. 4
      transport/internet/http/hub.go
  13. 2
      transport/internet/kcp/dialer.go
  14. 11
      transport/internet/kcp/kcp_test.go
  15. 4
      transport/internet/kcp/listener.go
  16. 33
      transport/internet/memory_settings.go
  17. 2
      transport/internet/sockopt_linux_test.go
  18. 4
      transport/internet/tcp/dialer.go
  19. 14
      transport/internet/tcp_hub.go
  20. 6
      transport/internet/tls/config.go
  21. 2
      transport/internet/websocket/dialer.go
  22. 4
      transport/internet/websocket/hub.go
  23. 48
      transport/internet/websocket/ws_test.go

8
app/proxyman/inbound/always.go

@ -11,6 +11,7 @@ import (
"v2ray.com/core/common/net"
"v2ray.com/core/common/serial"
"v2ray.com/core/proxy"
"v2ray.com/core/transport/internet"
)
func getStatCounter(v *core.Instance, tag string) (core.StatCounter, core.StatCounter) {
@ -71,11 +72,16 @@ func NewAlwaysOnInboundHandler(ctx context.Context, tag string, receiverConfig *
for port := pr.From; port <= pr.To; port++ {
if nl.HasNetwork(net.Network_TCP) {
newError("creating stream worker on ", address, ":", port).AtDebug().WriteToLog()
mss, err := internet.ToMemoryStreamConfig(receiverConfig.StreamSettings)
if err != nil {
return nil, newError("failed to parse stream config").Base(err).AtWarning()
}
worker := &tcpWorker{
address: address,
port: net.Port(port),
proxy: p,
stream: receiverConfig.StreamSettings,
stream: mss,
recvOrigDest: receiverConfig.ReceiveOriginalDestination,
tag: tag,
dispatcher: h.mux,

11
app/proxyman/inbound/dynamic.go

@ -12,6 +12,7 @@ import (
"v2ray.com/core/common/net"
"v2ray.com/core/common/task"
"v2ray.com/core/proxy"
"v2ray.com/core/transport/internet"
)
type DynamicInboundHandler struct {
@ -19,6 +20,7 @@ type DynamicInboundHandler struct {
v *core.Instance
proxyConfig interface{}
receiverConfig *proxyman.ReceiverConfig
streamSettings *internet.MemoryStreamConfig
portMutex sync.Mutex
portsInUse map[net.Port]bool
workerMutex sync.RWMutex
@ -39,6 +41,13 @@ func NewDynamicInboundHandler(ctx context.Context, tag string, receiverConfig *p
v: v,
}
mss, err := internet.ToMemoryStreamConfig(receiverConfig.StreamSettings)
if err != nil {
return nil, newError("failed to parse stream settings").Base(err).AtWarning()
}
h.streamSettings = mss
h.task = &task.Periodic{
Interval: time.Minute * time.Duration(h.receiverConfig.AllocationStrategy.GetRefreshValue()),
Execute: h.refresh,
@ -110,7 +119,7 @@ func (h *DynamicInboundHandler) refresh() error {
address: address,
port: port,
proxy: p,
stream: h.receiverConfig.StreamSettings,
stream: h.streamSettings,
recvOrigDest: h.receiverConfig.ReceiveOriginalDestination,
dispatcher: h.mux,
sniffingConfig: h.receiverConfig.GetEffectiveSniffingSettings(),

2
app/proxyman/inbound/worker.go

@ -33,7 +33,7 @@ type tcpWorker struct {
address net.Address
port net.Port
proxy proxy.Inbound
stream *internet.StreamConfig
stream *internet.MemoryStreamConfig
recvOrigDest bool
tag string
dispatcher core.Dispatcher

10
app/proxyman/outbound/handler.go

@ -17,6 +17,7 @@ import (
type Handler struct {
config *core.OutboundHandlerConfig
senderSettings *proxyman.SenderConfig
streamSettings *internet.MemoryStreamConfig
proxy proxy.Outbound
outboundManager core.OutboundHandlerManager
mux *mux.ClientManager
@ -37,6 +38,11 @@ func NewHandler(ctx context.Context, config *core.OutboundHandlerConfig) (core.O
switch s := senderSettings.(type) {
case *proxyman.SenderConfig:
h.senderSettings = s
mss, err := internet.ToMemoryStreamConfig(s.StreamSettings)
if err != nil {
return nil, newError("failed to parse stream settings").Base(err).AtWarning()
}
h.streamSettings = mss
default:
return nil, newError("settings is not SenderConfig")
}
@ -118,9 +124,7 @@ func (h *Handler) Dial(ctx context.Context, dest net.Destination) (internet.Conn
ctx = internet.ContextWithDialerSource(ctx, h.senderSettings.Via.AsAddress())
}
if h.senderSettings.StreamSettings != nil {
ctx = internet.ContextWithStreamSettings(ctx, h.senderSettings.StreamSettings)
}
ctx = internet.ContextWithStreamSettings(ctx, h.streamSettings)
}
return internet.Dial(ctx, dest)

2
transport/config.proto

@ -11,4 +11,4 @@ import "v2ray.com/core/transport/internet/config.proto";
// Global transport settings. This affects all type of connections that go through V2Ray.
message Config {
repeated v2ray.core.transport.internet.TransportConfig transport_settings = 1;
}
}

22
transport/internet/context.go

@ -15,16 +15,16 @@ const (
securitySettingsKey
)
func ContextWithStreamSettings(ctx context.Context, streamSettings *StreamConfig) context.Context {
func ContextWithStreamSettings(ctx context.Context, streamSettings *MemoryStreamConfig) context.Context {
return context.WithValue(ctx, streamSettingsKey, streamSettings)
}
func StreamSettingsFromContext(ctx context.Context) *StreamConfig {
func StreamSettingsFromContext(ctx context.Context) *MemoryStreamConfig {
ss := ctx.Value(streamSettingsKey)
if ss == nil {
return nil
}
return ss.(*StreamConfig)
return ss.(*MemoryStreamConfig)
}
func ContextWithDialerSource(ctx context.Context, addr net.Address) context.Context {
@ -37,19 +37,3 @@ func DialerSourceFromContext(ctx context.Context) net.Address {
}
return net.AnyIP
}
func ContextWithTransportSettings(ctx context.Context, transportSettings interface{}) context.Context {
return context.WithValue(ctx, transportSettingsKey, transportSettings)
}
func TransportSettingsFromContext(ctx context.Context) interface{} {
return ctx.Value(transportSettingsKey)
}
func ContextWithSecuritySettings(ctx context.Context, securitySettings interface{}) context.Context {
return context.WithValue(ctx, securitySettingsKey, securitySettings)
}
func SecuritySettingsFromContext(ctx context.Context) interface{} {
return ctx.Value(securitySettingsKey)
}

21
transport/internet/dialer.go

@ -24,18 +24,19 @@ func RegisterTransportDialer(protocol string, dialer Dialer) error {
func Dial(ctx context.Context, dest net.Destination) (Connection, error) {
if dest.Network == net.Network_TCP {
streamSettings := StreamSettingsFromContext(ctx)
protocol := streamSettings.GetEffectiveProtocol()
transportSettings, err := streamSettings.GetEffectiveTransportSettings()
if err != nil {
return nil, err
}
ctx = ContextWithTransportSettings(ctx, transportSettings)
if streamSettings != nil && streamSettings.HasSecuritySettings() {
securitySettings, err := streamSettings.GetEffectiveSecuritySettings()
var protocol string
if streamSettings != nil {
protocol = streamSettings.ProtocolName
} else {
protocol = "tcp"
pSettings, err := CreateTransportConfigByName(protocol)
if err != nil {
return nil, err
return nil, newError("failed to create default config for protocol: ", protocol).Base(err)
}
ctx = ContextWithSecuritySettings(ctx, securitySettings)
ctx = ContextWithStreamSettings(ctx, &MemoryStreamConfig{
ProtocolName: protocol,
ProtocolSettings: pSettings,
})
}
dialer := transportDialerCache[protocol]
if dialer == nil {

4
transport/internet/domainsocket/dial.go

@ -13,11 +13,11 @@ import (
)
func getSettingsFromContext(ctx context.Context) *Config {
rawSettings := internet.TransportSettingsFromContext(ctx)
rawSettings := internet.StreamSettingsFromContext(ctx)
if rawSettings == nil {
return nil
}
return rawSettings.(*Config)
return rawSettings.ProtocolSettings.(*Config)
}
func Dial(ctx context.Context, dest net.Destination) (internet.Connection, error) {

16
transport/internet/domainsocket/listener_test.go

@ -18,8 +18,11 @@ import (
func TestListen(t *testing.T) {
assert := With(t)
ctx := internet.ContextWithTransportSettings(context.Background(), &Config{
Path: "/tmp/ts3",
ctx := internet.ContextWithStreamSettings(context.Background(), &internet.MemoryStreamConfig{
ProtocolName: "domainsocket",
ProtocolSettings: &Config{
Path: "/tmp/ts3",
},
})
listener, err := Listen(ctx, nil, net.Port(0), func(conn internet.Connection) {
defer conn.Close()
@ -53,9 +56,12 @@ func TestListenAbstract(t *testing.T) {
assert := With(t)
ctx := internet.ContextWithTransportSettings(context.Background(), &Config{
Path: "/tmp/ts3",
Abstract: true,
ctx := internet.ContextWithStreamSettings(context.Background(), &internet.MemoryStreamConfig{
ProtocolName: "domainsocket",
ProtocolSettings: &Config{
Path: "/tmp/ts3",
Abstract: true,
},
})
listener, err := Listen(ctx, nil, net.Port(0), func(conn internet.Connection) {
defer conn.Close()

4
transport/internet/http/dialer.go

@ -72,8 +72,8 @@ func getHTTPClient(ctx context.Context, dest net.Destination) (*http.Client, err
// Dial dials a new TCP connection to the given destination.
func Dial(ctx context.Context, dest net.Destination) (internet.Connection, error) {
rawSettings := internet.TransportSettingsFromContext(ctx)
httpSettings, ok := rawSettings.(*Config)
rawSettings := internet.StreamSettingsFromContext(ctx)
httpSettings, ok := rawSettings.ProtocolSettings.(*Config)
if !ok {
return nil, newError("HTTP config is not set.").AtError()
}

24
transport/internet/http/http_test.go

@ -22,11 +22,14 @@ func TestHTTPConnection(t *testing.T) {
port := tcp.PickPort()
lctx := context.Background()
lctx = internet.ContextWithSecuritySettings(lctx, &tls.Config{
Certificate: []*tls.Certificate{tls.ParseCertificate(cert.MustGenerate(nil, cert.CommonName("www.v2ray.com")))},
lctx := internet.ContextWithStreamSettings(context.Background(), &internet.MemoryStreamConfig{
ProtocolName: "http",
ProtocolSettings: &Config{},
SecurityType: "tls",
SecuritySettings: &tls.Config{
Certificate: []*tls.Certificate{tls.ParseCertificate(cert.MustGenerate(nil, cert.CommonName("www.v2ray.com")))},
},
})
lctx = internet.ContextWithTransportSettings(lctx, &Config{})
listener, err := Listen(lctx, net.LocalHostIP, port, func(conn internet.Connection) {
go func() {
@ -51,12 +54,15 @@ func TestHTTPConnection(t *testing.T) {
time.Sleep(time.Second)
dctx := context.Background()
dctx = internet.ContextWithSecuritySettings(dctx, &tls.Config{
ServerName: "www.v2ray.com",
AllowInsecure: true,
dctx := internet.ContextWithStreamSettings(context.Background(), &internet.MemoryStreamConfig{
ProtocolName: "http",
ProtocolSettings: &Config{},
SecurityType: "tls",
SecuritySettings: &tls.Config{
ServerName: "www.v2ray.com",
AllowInsecure: true,
},
})
dctx = internet.ContextWithTransportSettings(dctx, &Config{})
conn, err := Dial(dctx, net.TCPDestination(net.LocalHostIP, port))
assert(err, IsNil)
defer conn.Close()

4
transport/internet/http/hub.go

@ -88,8 +88,8 @@ func (l *Listener) ServeHTTP(writer http.ResponseWriter, request *http.Request)
}
func Listen(ctx context.Context, address net.Address, port net.Port, handler internet.ConnHandler) (internet.Listener, error) {
rawSettings := internet.TransportSettingsFromContext(ctx)
httpSettings, ok := rawSettings.(*Config)
rawSettings := internet.StreamSettingsFromContext(ctx)
httpSettings, ok := rawSettings.ProtocolSettings.(*Config)
if !ok {
return nil, newError("HTTP config is not set.").AtError()
}

2
transport/internet/kcp/dialer.go

@ -55,7 +55,7 @@ func DialKCP(ctx context.Context, dest net.Destination) (internet.Connection, er
return nil, newError("failed to dial to dest: ", err).AtWarning().Base(err)
}
kcpSettings := internet.TransportSettingsFromContext(ctx).(*Config)
kcpSettings := internet.StreamSettingsFromContext(ctx).ProtocolSettings.(*Config)
header, err := kcpSettings.GetPackerHeader()
if err != nil {

11
transport/internet/kcp/kcp_test.go

@ -17,7 +17,11 @@ import (
func TestDialAndListen(t *testing.T) {
assert := With(t)
listerner, err := NewListener(internet.ContextWithTransportSettings(context.Background(), &Config{}), net.LocalHostIP, net.Port(0), func(conn internet.Connection) {
lctx := internet.ContextWithStreamSettings(context.Background(), &internet.MemoryStreamConfig{
ProtocolName: "mkcp",
ProtocolSettings: &Config{},
})
listerner, err := NewListener(lctx, net.LocalHostIP, net.Port(0), func(conn internet.Connection) {
go func(c internet.Connection) {
payload := make([]byte, 4096)
for {
@ -36,7 +40,10 @@ func TestDialAndListen(t *testing.T) {
assert(err, IsNil)
port := net.Port(listerner.Addr().(*net.UDPAddr).Port)
ctx := internet.ContextWithTransportSettings(context.Background(), &Config{})
ctx := internet.ContextWithStreamSettings(context.Background(), &internet.MemoryStreamConfig{
ProtocolName: "mkcp",
ProtocolSettings: &Config{},
})
wg := new(sync.WaitGroup)
for i := 0; i < 10; i++ {
clientConn, err := DialKCP(ctx, net.UDPDestination(net.LocalHostIP, port))

4
transport/internet/kcp/listener.go

@ -34,8 +34,8 @@ type Listener struct {
}
func NewListener(ctx context.Context, address net.Address, port net.Port, addConn internet.ConnHandler) (*Listener, error) {
networkSettings := internet.TransportSettingsFromContext(ctx)
kcpSettings := networkSettings.(*Config)
networkSettings := internet.StreamSettingsFromContext(ctx)
kcpSettings := networkSettings.ProtocolSettings.(*Config)
header, err := kcpSettings.GetPackerHeader()
if err != nil {

33
transport/internet/memory_settings.go

@ -0,0 +1,33 @@
package internet
type MemoryStreamConfig struct {
ProtocolName string
ProtocolSettings interface{}
SecurityType string
SecuritySettings interface{}
SocketSettings *SocketConfig
}
func ToMemoryStreamConfig(s *StreamConfig) (*MemoryStreamConfig, error) {
ets, err := s.GetEffectiveTransportSettings()
if err != nil {
return nil, err
}
mss := &MemoryStreamConfig{
ProtocolName: s.GetEffectiveProtocol(),
ProtocolSettings: ets,
SocketSettings: s.SocketSettings,
}
if s != nil && s.HasSecuritySettings() {
ess, err := s.GetEffectiveSecuritySettings()
if err != nil {
return nil, err
}
mss.SecurityType = s.SecurityType
mss.SecuritySettings = ess
}
return mss, nil
}

2
transport/internet/sockopt_linux_test.go

@ -25,7 +25,7 @@ func TestSockOptMark(t *testing.T) {
const mark = 1
ctx := context.Background()
ctx = ContextWithStreamSettings(ctx, &StreamConfig{
ctx = ContextWithStreamSettings(ctx, &MemoryStreamConfig{
SocketSettings: &SocketConfig{
Mark: mark,
},

4
transport/internet/tcp/dialer.go

@ -11,11 +11,11 @@ import (
)
func getTCPSettingsFromContext(ctx context.Context) *Config {
rawTCPSettings := internet.TransportSettingsFromContext(ctx)
rawTCPSettings := internet.StreamSettingsFromContext(ctx)
if rawTCPSettings == nil {
return nil
}
return rawTCPSettings.(*Config)
return rawTCPSettings.ProtocolSettings.(*Config)
}
// Dial dials a new TCP connection to the given destination.

14
transport/internet/tcp_hub.go

@ -29,19 +29,7 @@ type Listener interface {
func ListenTCP(ctx context.Context, address net.Address, port net.Port, handler ConnHandler) (Listener, error) {
settings := StreamSettingsFromContext(ctx)
protocol := settings.GetEffectiveProtocol()
transportSettings, err := settings.GetEffectiveTransportSettings()
if err != nil {
return nil, err
}
ctx = ContextWithTransportSettings(ctx, transportSettings)
if settings != nil && settings.HasSecuritySettings() {
securitySettings, err := settings.GetEffectiveSecuritySettings()
if err != nil {
return nil, err
}
ctx = ContextWithSecuritySettings(ctx, securitySettings)
}
protocol := settings.ProtocolName
listenFunc := transportListenerCache[protocol]
if listenFunc == nil {
return nil, newError(protocol, " listener not registered.").AtError()

6
transport/internet/tls/config.go

@ -217,11 +217,11 @@ func WithNextProto(protocol ...string) Option {
// ConfigFromContext fetches Config from context. Nil if not found.
func ConfigFromContext(ctx context.Context) *Config {
securitySettings := internet.SecuritySettingsFromContext(ctx)
if securitySettings == nil {
streamSettings := internet.StreamSettingsFromContext(ctx)
if streamSettings == nil {
return nil
}
config, ok := securitySettings.(*Config)
config, ok := streamSettings.SecuritySettings.(*Config)
if !ok {
return nil
}

2
transport/internet/websocket/dialer.go

@ -30,7 +30,7 @@ func init() {
func dialWebsocket(ctx context.Context, dest net.Destination) (net.Conn, error) {
src := internet.DialerSourceFromContext(ctx)
wsSettings := internet.TransportSettingsFromContext(ctx).(*Config)
wsSettings := internet.StreamSettingsFromContext(ctx).ProtocolSettings.(*Config)
dialer := &websocket.Dialer{
NetDial: func(network, addr string) (net.Conn, error) {

4
transport/internet/websocket/hub.go

@ -57,8 +57,8 @@ type Listener struct {
}
func ListenWS(ctx context.Context, address net.Address, port net.Port, addConn internet.ConnHandler) (internet.Listener, error) {
networkSettings := internet.TransportSettingsFromContext(ctx)
wsSettings := networkSettings.(*Config)
networkSettings := internet.StreamSettingsFromContext(ctx)
wsSettings := networkSettings.ProtocolSettings.(*Config)
l := &Listener{
config: wsSettings,

48
transport/internet/websocket/ws_test.go

@ -17,9 +17,14 @@ import (
func Test_listenWSAndDial(t *testing.T) {
assert := With(t)
listen, err := ListenWS(internet.ContextWithTransportSettings(context.Background(), &Config{
Path: "ws",
}), net.DomainAddress("localhost"), 13146, func(conn internet.Connection) {
lctx := internet.ContextWithStreamSettings(context.Background(), &internet.MemoryStreamConfig{
ProtocolName: "websocket",
ProtocolSettings: &Config{
Path: "ws",
},
})
listen, err := ListenWS(lctx, net.DomainAddress("localhost"), 13146, func(conn internet.Connection) {
go func(c internet.Connection) {
defer c.Close()
@ -37,7 +42,10 @@ func Test_listenWSAndDial(t *testing.T) {
})
assert(err, IsNil)
ctx := internet.ContextWithTransportSettings(context.Background(), &Config{Path: "ws"})
ctx := internet.ContextWithStreamSettings(context.Background(), &internet.MemoryStreamConfig{
ProtocolName: "websocket",
ProtocolSettings: &Config{Path: "ws"},
})
conn, err := Dial(ctx, net.TCPDestination(net.DomainAddress("localhost"), 13146))
assert(err, IsNil)
@ -65,9 +73,13 @@ func Test_listenWSAndDial(t *testing.T) {
func TestDialWithRemoteAddr(t *testing.T) {
assert := With(t)
listen, err := ListenWS(internet.ContextWithTransportSettings(context.Background(), &Config{
Path: "ws",
}), net.DomainAddress("localhost"), 13148, func(conn internet.Connection) {
lctx := internet.ContextWithStreamSettings(context.Background(), &internet.MemoryStreamConfig{
ProtocolName: "websocket",
ProtocolSettings: &Config{
Path: "ws",
},
})
listen, err := ListenWS(lctx, net.DomainAddress("localhost"), 13148, func(conn internet.Connection) {
go func(c internet.Connection) {
defer c.Close()
@ -87,7 +99,10 @@ func TestDialWithRemoteAddr(t *testing.T) {
})
assert(err, IsNil)
ctx := internet.ContextWithTransportSettings(context.Background(), &Config{Path: "ws", Header: []*Header{{Key: "X-Forwarded-For", Value: "1.1.1.1"}}})
ctx := internet.ContextWithStreamSettings(context.Background(), &internet.MemoryStreamConfig{
ProtocolName: "websocket",
ProtocolSettings: &Config{Path: "ws", Header: []*Header{{Key: "X-Forwarded-For", Value: "1.1.1.1"}}},
})
conn, err := Dial(ctx, net.TCPDestination(net.DomainAddress("localhost"), 13148))
assert(err, IsNil)
@ -111,13 +126,18 @@ func Test_listenWSAndDial_TLS(t *testing.T) {
start := time.Now()
ctx := internet.ContextWithTransportSettings(context.Background(), &Config{
Path: "wss",
})
ctx = internet.ContextWithSecuritySettings(ctx, &tls.Config{
AllowInsecure: true,
Certificate: []*tls.Certificate{tls.ParseCertificate(cert.MustGenerate(nil, cert.CommonName("localhost")))},
ctx := internet.ContextWithStreamSettings(context.Background(), &internet.MemoryStreamConfig{
ProtocolName: "websocket",
ProtocolSettings: &Config{
Path: "wss",
},
SecurityType: "tls",
SecuritySettings: &tls.Config{
AllowInsecure: true,
Certificate: []*tls.Certificate{tls.ParseCertificate(cert.MustGenerate(nil, cert.CommonName("localhost")))},
},
})
listen, err := ListenWS(ctx, net.DomainAddress("localhost"), 13143, func(conn internet.Connection) {
go func() {
_ = conn.Close()

Loading…
Cancel
Save