move policy and dnsClient

pull/1331/head^2
Darien Raymond 2018-10-11 22:34:31 +02:00
parent 273342d0b9
commit b4821c5ed5
No known key found for this signature in database
GPG Key ID: 7251FFA14BB18169
21 changed files with 245 additions and 220 deletions

View File

@ -18,6 +18,7 @@ import (
"v2ray.com/core/common/stats" "v2ray.com/core/common/stats"
"v2ray.com/core/common/vio" "v2ray.com/core/common/vio"
"v2ray.com/core/features/outbound" "v2ray.com/core/features/outbound"
"v2ray.com/core/features/policy"
"v2ray.com/core/features/routing" "v2ray.com/core/features/routing"
feature_stats "v2ray.com/core/features/stats" feature_stats "v2ray.com/core/features/stats"
"v2ray.com/core/transport/pipe" "v2ray.com/core/transport/pipe"
@ -85,7 +86,7 @@ func (r *cachedReader) CloseError() {
type DefaultDispatcher struct { type DefaultDispatcher struct {
ohm outbound.HandlerManager ohm outbound.HandlerManager
router routing.Router router routing.Router
policy core.PolicyManager policy policy.Manager
stats feature_stats.Manager stats feature_stats.Manager
} }

View File

@ -7,6 +7,7 @@ import (
"sync" "sync"
"time" "time"
"github.com/miekg/dns"
"v2ray.com/core" "v2ray.com/core"
"v2ray.com/core/common" "v2ray.com/core/common"
"v2ray.com/core/common/net" "v2ray.com/core/common/net"
@ -40,7 +41,7 @@ func New(ctx context.Context, config *Config) (*Server, error) {
server.hosts = hosts server.hosts = hosts
v := core.MustFromContext(ctx) v := core.MustFromContext(ctx)
if err := v.RegisterFeature((*core.DNSClient)(nil), server); err != nil { if err := v.RegisterFeature((*dns.Client)(nil), server); err != nil {
return nil, newError("unable to register DNSClient.").Base(err) return nil, newError("unable to register DNSClient.").Base(err)
} }

View File

@ -3,7 +3,7 @@ package policy
import ( import (
"time" "time"
"v2ray.com/core" "v2ray.com/core/features/policy"
) )
// Duration converts Second to time.Duration. // Duration converts Second to time.Duration.
@ -15,7 +15,7 @@ func (s *Second) Duration() time.Duration {
} }
func defaultPolicy() *Policy { func defaultPolicy() *Policy {
p := core.DefaultPolicy() p := policy.SessionDefault()
return &Policy{ return &Policy{
Timeout: &Policy_Timeout{ Timeout: &Policy_Timeout{
@ -60,9 +60,9 @@ func (p *Policy) overrideWith(another *Policy) {
} }
} }
// ToCorePolicy converts this Policy to core.Policy. // ToCorePolicy converts this Policy to policy.Session.
func (p *Policy) ToCorePolicy() core.Policy { func (p *Policy) ToCorePolicy() policy.Session {
cp := core.DefaultPolicy() cp := policy.SessionDefault()
if p.Timeout != nil { if p.Timeout != nil {
cp.Timeouts.ConnectionIdle = p.Timeout.ConnectionIdle.Duration() cp.Timeouts.ConnectionIdle = p.Timeout.ConnectionIdle.Duration()
@ -80,10 +80,10 @@ func (p *Policy) ToCorePolicy() core.Policy {
return cp return cp
} }
// ToCorePolicy converts this SystemPolicy to core.SystemPolicy. // ToCorePolicy converts this SystemPolicy to policy.System.
func (p *SystemPolicy) ToCorePolicy() core.SystemPolicy { func (p *SystemPolicy) ToCorePolicy() policy.System {
return core.SystemPolicy{ return policy.System{
Stats: core.SystemStatsPolicy{ Stats: policy.SystemStats{
InboundUplink: p.Stats.InboundUplink, InboundUplink: p.Stats.InboundUplink,
InboundDownlink: p.Stats.InboundDownlink, InboundDownlink: p.Stats.InboundDownlink,
}, },

View File

@ -5,6 +5,7 @@ import (
"v2ray.com/core" "v2ray.com/core"
"v2ray.com/core/common" "v2ray.com/core/common"
"v2ray.com/core/features/policy"
) )
// Instance is an instance of Policy manager. // Instance is an instance of Policy manager.
@ -29,7 +30,7 @@ func New(ctx context.Context, config *Config) (*Instance, error) {
v := core.FromContext(ctx) v := core.FromContext(ctx)
if v != nil { if v != nil {
if err := v.RegisterFeature((*core.PolicyManager)(nil), m); err != nil { if err := v.RegisterFeature((*policy.Manager)(nil), m); err != nil {
return nil, newError("unable to register PolicyManager in core").Base(err).AtError() return nil, newError("unable to register PolicyManager in core").Base(err).AtError()
} }
} }
@ -37,18 +38,18 @@ func New(ctx context.Context, config *Config) (*Instance, error) {
return m, nil return m, nil
} }
// ForLevel implements core.PolicyManager. // ForLevel implements policy.Manager.
func (m *Instance) ForLevel(level uint32) core.Policy { func (m *Instance) ForLevel(level uint32) policy.Session {
if p, ok := m.levels[level]; ok { if p, ok := m.levels[level]; ok {
return p.ToCorePolicy() return p.ToCorePolicy()
} }
return core.DefaultPolicy() return policy.SessionDefault()
} }
// ForSystem implements core.PolicyManager. // ForSystem implements policy.Manager.
func (m *Instance) ForSystem() core.SystemPolicy { func (m *Instance) ForSystem() policy.System {
if m.system == nil { if m.system == nil {
return core.SystemPolicy{} return policy.System{}
} }
return m.system.ToCorePolicy() return m.system.ToCorePolicy()
} }

View File

@ -5,8 +5,8 @@ import (
"testing" "testing"
"time" "time"
"v2ray.com/core"
. "v2ray.com/core/app/policy" . "v2ray.com/core/app/policy"
"v2ray.com/core/features/policy"
. "v2ray.com/ext/assert" . "v2ray.com/ext/assert"
) )
@ -26,7 +26,7 @@ func TestPolicy(t *testing.T) {
}) })
assert(err, IsNil) assert(err, IsNil)
pDefault := core.DefaultPolicy() pDefault := policy.SessionDefault()
p0 := manager.ForLevel(0) p0 := manager.ForLevel(0)
assert(p0.Timeouts.Handshake, Equals, 2*time.Second) assert(p0.Timeouts.Handshake, Equals, 2*time.Second)

View File

@ -1,4 +1,4 @@
// Package policy is an implementation of core.PolicyManager feature. // Package policy is an implementation of policy.Manager feature.
package policy package policy
//go:generate errorgen //go:generate errorgen

View File

@ -5,12 +5,12 @@ package router
import ( import (
"context" "context"
"v2ray.com/core/common/session"
"v2ray.com/core/features/routing"
"v2ray.com/core" "v2ray.com/core"
"v2ray.com/core/common" "v2ray.com/core/common"
"v2ray.com/core/common/net" "v2ray.com/core/common/net"
"v2ray.com/core/common/session"
"v2ray.com/core/features/dns"
"v2ray.com/core/features/routing"
"v2ray.com/core/proxy" "v2ray.com/core/proxy"
) )
@ -18,7 +18,7 @@ import (
type Router struct { type Router struct {
domainStrategy Config_DomainStrategy domainStrategy Config_DomainStrategy
rules []Rule rules []Rule
dns core.DNSClient dns dns.Client
} }
// NewRouter creates a new Router based on the given config. // NewRouter creates a new Router based on the given config.
@ -46,7 +46,7 @@ func NewRouter(ctx context.Context, config *Config) (*Router, error) {
} }
type ipResolver struct { type ipResolver struct {
dns core.DNSClient dns dns.Client
ip []net.Address ip []net.Address
domain string domain string
resolved bool resolved bool

25
dns.go
View File

@ -5,49 +5,44 @@ import (
"sync" "sync"
"v2ray.com/core/common" "v2ray.com/core/common"
"v2ray.com/core/features/dns"
) )
// DNSClient is a V2Ray feature for querying DNS information.
type DNSClient interface {
Feature
LookupIP(host string) ([]net.IP, error)
}
type syncDNSClient struct { type syncDNSClient struct {
sync.RWMutex sync.RWMutex
DNSClient dns.Client
} }
func (d *syncDNSClient) LookupIP(host string) ([]net.IP, error) { func (d *syncDNSClient) LookupIP(host string) ([]net.IP, error) {
d.RLock() d.RLock()
defer d.RUnlock() defer d.RUnlock()
if d.DNSClient == nil { if d.Client == nil {
return net.LookupIP(host) return net.LookupIP(host)
} }
return d.DNSClient.LookupIP(host) return d.Client.LookupIP(host)
} }
func (d *syncDNSClient) Start() error { func (d *syncDNSClient) Start() error {
d.RLock() d.RLock()
defer d.RUnlock() defer d.RUnlock()
if d.DNSClient == nil { if d.Client == nil {
return nil return nil
} }
return d.DNSClient.Start() return d.Client.Start()
} }
func (d *syncDNSClient) Close() error { func (d *syncDNSClient) Close() error {
d.RLock() d.RLock()
defer d.RUnlock() defer d.RUnlock()
return common.Close(d.DNSClient) return common.Close(d.Client)
} }
func (d *syncDNSClient) Set(client DNSClient) { func (d *syncDNSClient) Set(client dns.Client) {
if client == nil { if client == nil {
return return
} }
@ -55,6 +50,6 @@ func (d *syncDNSClient) Set(client DNSClient) {
d.Lock() d.Lock()
defer d.Unlock() defer d.Unlock()
common.Close(d.DNSClient) // nolint: errcheck common.Close(d.Client) // nolint: errcheck
d.DNSClient = client d.Client = client
} }

13
features/dns/client.go Normal file
View File

@ -0,0 +1,13 @@
package dns
import (
"net"
"v2ray.com/core/features"
)
// Client is a V2Ray feature for querying DNS information.
type Client interface {
features.Feature
LookupIP(host string) ([]net.IP, error)
}

134
features/policy/policy.go Normal file
View File

@ -0,0 +1,134 @@
package policy
import (
"context"
"runtime"
"time"
"v2ray.com/core/common/platform"
"v2ray.com/core/features"
)
// Timeout contains limits for connection timeout.
type Timeout struct {
// Timeout for handshake phase in a connection.
Handshake time.Duration
// Timeout for connection being idle, i.e., there is no egress or ingress traffic in this connection.
ConnectionIdle time.Duration
// Timeout for an uplink only connection, i.e., the downlink of the connection has been closed.
UplinkOnly time.Duration
// Timeout for an downlink only connection, i.e., the uplink of the connection has been closed.
DownlinkOnly time.Duration
}
// Stats contains settings for stats counters.
type Stats struct {
// Whether or not to enable stat counter for user uplink traffic.
UserUplink bool
// Whether or not to enable stat counter for user downlink traffic.
UserDownlink bool
}
// Buffer contains settings for internal buffer.
type Buffer struct {
// Size of buffer per connection, in bytes. -1 for unlimited buffer.
PerConnection int32
}
// SystemStats contains stat policy settings on system level.
type SystemStats struct {
// Whether or not to enable stat counter for uplink traffic in inbound handlers.
InboundUplink bool
// Whether or not to enable stat counter for downlink traffic in inbound handlers.
InboundDownlink bool
}
// System contains policy settings at system level.
type System struct {
Stats SystemStats
Buffer Buffer
}
// Session is session based settings for controlling V2Ray requests. It contains various settings (or limits) that may differ for different users in the context.
type Session struct {
Timeouts Timeout // Timeout settings
Stats Stats
Buffer Buffer
}
// Manager is a feature that provides Policy for the given user by its id or level.
type Manager interface {
features.Feature
// ForLevel returns the Session policy for the given user level.
ForLevel(level uint32) Session
// ForSystem returns the System policy for V2Ray system.
ForSystem() System
}
var defaultBufferSize int32
func init() {
const key = "v2ray.ray.buffer.size"
const defaultValue = -17
size := platform.EnvFlag{
Name: key,
AltName: platform.NormalizeEnvName(key),
}.GetValueAsInt(defaultValue)
switch size {
case 0:
defaultBufferSize = -1 // For pipe to use unlimited size
case defaultValue: // Env flag not defined. Use default values per CPU-arch.
switch runtime.GOARCH {
case "arm", "arm64", "mips", "mipsle", "mips64", "mips64le":
defaultBufferSize = 16 * 1024 // 16k cache for low-end devices
default:
defaultBufferSize = 2 * 1024 * 1024
}
default:
defaultBufferSize = int32(size) * 1024 * 1024
}
}
func defaultBufferPolicy() Buffer {
return Buffer{
PerConnection: defaultBufferSize,
}
}
// SessionDefault returns the Policy when user is not specified.
func SessionDefault() Session {
return Session{
Timeouts: Timeout{
Handshake: time.Second * 4,
ConnectionIdle: time.Second * 300,
UplinkOnly: time.Second * 2,
DownlinkOnly: time.Second * 5,
},
Stats: Stats{
UserUplink: false,
UserDownlink: false,
},
Buffer: defaultBufferPolicy(),
}
}
type policyKey int32
const (
bufferPolicyKey policyKey = 0
)
func ContextWithBufferPolicy(ctx context.Context, p Buffer) context.Context {
return context.WithValue(ctx, bufferPolicyKey, p)
}
func BufferPolicyFromContext(ctx context.Context) Buffer {
pPolicy := ctx.Value(bufferPolicyKey)
if pPolicy == nil {
return defaultBufferPolicy()
}
return pPolicy.(Buffer)
}

158
policy.go
View File

@ -1,189 +1,63 @@
package core package core
import ( import (
"context"
"runtime"
"sync" "sync"
"time" "time"
"v2ray.com/core/common" "v2ray.com/core/common"
"v2ray.com/core/common/platform" "v2ray.com/core/features/policy"
) )
// TimeoutPolicy contains limits for connection timeout.
type TimeoutPolicy struct {
// Timeout for handshake phase in a connection.
Handshake time.Duration
// Timeout for connection being idle, i.e., there is no egress or ingress traffic in this connection.
ConnectionIdle time.Duration
// Timeout for an uplink only connection, i.e., the downlink of the connection has been closed.
UplinkOnly time.Duration
// Timeout for an downlink only connection, i.e., the uplink of the connection has been closed.
DownlinkOnly time.Duration
}
// StatsPolicy contains settings for stats counters.
type StatsPolicy struct {
// Whether or not to enable stat counter for user uplink traffic.
UserUplink bool
// Whether or not to enable stat counter for user downlink traffic.
UserDownlink bool
}
// BufferPolicy contains settings for internal buffer.
type BufferPolicy struct {
// Size of buffer per connection, in bytes. -1 for unlimited buffer.
PerConnection int32
}
// SystemStatsPolicy contains stat policy settings on system level.
type SystemStatsPolicy struct {
// Whether or not to enable stat counter for uplink traffic in inbound handlers.
InboundUplink bool
// Whether or not to enable stat counter for downlink traffic in inbound handlers.
InboundDownlink bool
}
// SystemPolicy contains policy settings at system level.
type SystemPolicy struct {
Stats SystemStatsPolicy
Buffer BufferPolicy
}
// Policy is session based settings for controlling V2Ray requests. It contains various settings (or limits) that may differ for different users in the context.
type Policy struct {
Timeouts TimeoutPolicy // Timeout settings
Stats StatsPolicy
Buffer BufferPolicy
}
// PolicyManager is a feature that provides Policy for the given user by its id or level.
type PolicyManager interface {
Feature
// ForLevel returns the Policy for the given user level.
ForLevel(level uint32) Policy
// ForSystem returns the Policy for V2Ray system.
ForSystem() SystemPolicy
}
var defaultBufferSize int32
func init() {
const key = "v2ray.ray.buffer.size"
const defaultValue = -17
size := platform.EnvFlag{
Name: key,
AltName: platform.NormalizeEnvName(key),
}.GetValueAsInt(defaultValue)
switch size {
case 0:
defaultBufferSize = -1 // For pipe to use unlimited size
case defaultValue: // Env flag not defined. Use default values per CPU-arch.
switch runtime.GOARCH {
case "arm", "arm64", "mips", "mipsle", "mips64", "mips64le":
defaultBufferSize = 16 * 1024 // 16k cache for low-end devices
default:
defaultBufferSize = 2 * 1024 * 1024
}
default:
defaultBufferSize = int32(size) * 1024 * 1024
}
}
func defaultBufferPolicy() BufferPolicy {
return BufferPolicy{
PerConnection: defaultBufferSize,
}
}
// DefaultPolicy returns the Policy when user is not specified.
func DefaultPolicy() Policy {
return Policy{
Timeouts: TimeoutPolicy{
Handshake: time.Second * 4,
ConnectionIdle: time.Second * 300,
UplinkOnly: time.Second * 2,
DownlinkOnly: time.Second * 5,
},
Stats: StatsPolicy{
UserUplink: false,
UserDownlink: false,
},
Buffer: defaultBufferPolicy(),
}
}
type policyKey int
const (
bufferPolicyKey policyKey = 0
)
func ContextWithBufferPolicy(ctx context.Context, p BufferPolicy) context.Context {
return context.WithValue(ctx, bufferPolicyKey, p)
}
func BufferPolicyFromContext(ctx context.Context) BufferPolicy {
pPolicy := ctx.Value(bufferPolicyKey)
if pPolicy == nil {
return defaultBufferPolicy()
}
return pPolicy.(BufferPolicy)
}
type syncPolicyManager struct { type syncPolicyManager struct {
sync.RWMutex sync.RWMutex
PolicyManager policy.Manager
} }
func (m *syncPolicyManager) ForLevel(level uint32) Policy { func (m *syncPolicyManager) ForLevel(level uint32) policy.Session {
m.RLock() m.RLock()
defer m.RUnlock() defer m.RUnlock()
if m.PolicyManager == nil { if m.Manager == nil {
p := DefaultPolicy() p := policy.SessionDefault()
if level == 1 { if level == 1 {
p.Timeouts.ConnectionIdle = time.Second * 600 p.Timeouts.ConnectionIdle = time.Second * 600
} }
return p return p
} }
return m.PolicyManager.ForLevel(level) return m.Manager.ForLevel(level)
} }
func (m *syncPolicyManager) ForSystem() SystemPolicy { func (m *syncPolicyManager) ForSystem() policy.System {
m.RLock() m.RLock()
defer m.RUnlock() defer m.RUnlock()
if m.PolicyManager == nil { if m.Manager == nil {
return SystemPolicy{} return policy.System{}
} }
return m.PolicyManager.ForSystem() return m.Manager.ForSystem()
} }
func (m *syncPolicyManager) Start() error { func (m *syncPolicyManager) Start() error {
m.RLock() m.RLock()
defer m.RUnlock() defer m.RUnlock()
if m.PolicyManager == nil { if m.Manager == nil {
return nil return nil
} }
return m.PolicyManager.Start() return m.Manager.Start()
} }
func (m *syncPolicyManager) Close() error { func (m *syncPolicyManager) Close() error {
m.RLock() m.RLock()
defer m.RUnlock() defer m.RUnlock()
return common.Close(m.PolicyManager) return common.Close(m.Manager)
} }
func (m *syncPolicyManager) Set(manager PolicyManager) { func (m *syncPolicyManager) Set(manager policy.Manager) {
if manager == nil { if manager == nil {
return return
} }
@ -191,6 +65,6 @@ func (m *syncPolicyManager) Set(manager PolicyManager) {
m.Lock() m.Lock()
defer m.Unlock() defer m.Unlock()
common.Close(m.PolicyManager) // nolint: errcheck common.Close(m.Manager) // nolint: errcheck
m.PolicyManager = manager m.Manager = manager
} }

View File

@ -13,13 +13,14 @@ import (
"v2ray.com/core/common/session" "v2ray.com/core/common/session"
"v2ray.com/core/common/signal" "v2ray.com/core/common/signal"
"v2ray.com/core/common/task" "v2ray.com/core/common/task"
"v2ray.com/core/features/policy"
"v2ray.com/core/features/routing" "v2ray.com/core/features/routing"
"v2ray.com/core/transport/internet" "v2ray.com/core/transport/internet"
"v2ray.com/core/transport/pipe" "v2ray.com/core/transport/pipe"
) )
type DokodemoDoor struct { type DokodemoDoor struct {
policyManager core.PolicyManager policyManager policy.Manager
config *Config config *Config
address net.Address address net.Address
port net.Port port net.Port
@ -44,7 +45,7 @@ func (d *DokodemoDoor) Network() net.NetworkList {
return *(d.config.NetworkList) return *(d.config.NetworkList)
} }
func (d *DokodemoDoor) policy() core.Policy { func (d *DokodemoDoor) policy() policy.Session {
config := d.config config := d.config
p := d.policyManager.ForLevel(config.UserLevel) p := d.policyManager.ForLevel(config.UserLevel)
if config.Timeout > 0 && config.UserLevel == 0 { if config.Timeout > 0 && config.UserLevel == 0 {
@ -82,7 +83,7 @@ func (d *DokodemoDoor) Process(ctx context.Context, network net.Network, conn in
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
timer := signal.CancelAfterInactivity(ctx, cancel, plcy.Timeouts.ConnectionIdle) timer := signal.CancelAfterInactivity(ctx, cancel, plcy.Timeouts.ConnectionIdle)
ctx = core.ContextWithBufferPolicy(ctx, plcy.Buffer) ctx = policy.ContextWithBufferPolicy(ctx, plcy.Buffer)
link, err := dispatcher.Dispatch(ctx, dest) link, err := dispatcher.Dispatch(ctx, dest)
if err != nil { if err != nil {
return newError("failed to dispatch request").Base(err) return newError("failed to dispatch request").Base(err)

View File

@ -6,6 +6,7 @@ import (
"context" "context"
"time" "time"
"github.com/miekg/dns"
"v2ray.com/core" "v2ray.com/core"
"v2ray.com/core/common" "v2ray.com/core/common"
"v2ray.com/core/common/buf" "v2ray.com/core/common/buf"
@ -16,14 +17,15 @@ import (
"v2ray.com/core/common/signal" "v2ray.com/core/common/signal"
"v2ray.com/core/common/task" "v2ray.com/core/common/task"
"v2ray.com/core/common/vio" "v2ray.com/core/common/vio"
"v2ray.com/core/features/policy"
"v2ray.com/core/proxy" "v2ray.com/core/proxy"
"v2ray.com/core/transport/internet" "v2ray.com/core/transport/internet"
) )
// Handler handles Freedom connections. // Handler handles Freedom connections.
type Handler struct { type Handler struct {
policyManager core.PolicyManager policyManager policy.Manager
dns core.DNSClient dns dns.Client
config Config config Config
} }
@ -39,7 +41,7 @@ func New(ctx context.Context, config *Config) (*Handler, error) {
return f, nil return f, nil
} }
func (h *Handler) policy() core.Policy { func (h *Handler) policy() policy.Session {
p := h.policyManager.ForLevel(h.config.UserLevel) p := h.policyManager.ForLevel(h.config.UserLevel)
if h.config.Timeout > 0 && h.config.UserLevel == 0 { if h.config.Timeout > 0 && h.config.UserLevel == 0 {
p.Timeouts.ConnectionIdle = time.Duration(h.config.Timeout) * time.Second p.Timeouts.ConnectionIdle = time.Duration(h.config.Timeout) * time.Second

View File

@ -20,6 +20,7 @@ import (
"v2ray.com/core/common/session" "v2ray.com/core/common/session"
"v2ray.com/core/common/signal" "v2ray.com/core/common/signal"
"v2ray.com/core/common/task" "v2ray.com/core/common/task"
"v2ray.com/core/features/policy"
"v2ray.com/core/features/routing" "v2ray.com/core/features/routing"
"v2ray.com/core/transport/internet" "v2ray.com/core/transport/internet"
"v2ray.com/core/transport/pipe" "v2ray.com/core/transport/pipe"
@ -41,7 +42,7 @@ func NewServer(ctx context.Context, config *ServerConfig) (*Server, error) {
return s, nil return s, nil
} }
func (s *Server) policy() core.Policy { func (s *Server) policy() policy.Session {
config := s.config config := s.config
p := s.v.PolicyManager().ForLevel(config.UserLevel) p := s.v.PolicyManager().ForLevel(config.UserLevel)
if config.Timeout > 0 && config.UserLevel == 0 { if config.Timeout > 0 && config.UserLevel == 0 {
@ -176,7 +177,7 @@ func (s *Server) handleConnect(ctx context.Context, request *http.Request, reade
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
timer := signal.CancelAfterInactivity(ctx, cancel, plcy.Timeouts.ConnectionIdle) timer := signal.CancelAfterInactivity(ctx, cancel, plcy.Timeouts.ConnectionIdle)
ctx = core.ContextWithBufferPolicy(ctx, plcy.Buffer) ctx = policy.ContextWithBufferPolicy(ctx, plcy.Buffer)
link, err := dispatcher.Dispatch(ctx, dest) link, err := dispatcher.Dispatch(ctx, dest)
if err != nil { if err != nil {
return err return err

View File

@ -14,6 +14,7 @@ import (
"v2ray.com/core/common/session" "v2ray.com/core/common/session"
"v2ray.com/core/common/signal" "v2ray.com/core/common/signal"
"v2ray.com/core/common/task" "v2ray.com/core/common/task"
"v2ray.com/core/features/policy"
"v2ray.com/core/features/routing" "v2ray.com/core/features/routing"
"v2ray.com/core/transport/internet" "v2ray.com/core/transport/internet"
"v2ray.com/core/transport/pipe" "v2ray.com/core/transport/pipe"
@ -32,7 +33,7 @@ var (
type Server struct { type Server struct {
user *protocol.User user *protocol.User
account *Account account *Account
policy core.PolicyManager policy policy.Manager
} }
func NewServer(ctx context.Context, config *ServerConfig) (*Server, error) { func NewServer(ctx context.Context, config *ServerConfig) (*Server, error) {
@ -114,7 +115,7 @@ func (s *Server) Process(ctx context.Context, network net.Network, conn internet
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
timer := signal.CancelAfterInactivity(ctx, cancel, sPolicy.Timeouts.ConnectionIdle) timer := signal.CancelAfterInactivity(ctx, cancel, sPolicy.Timeouts.ConnectionIdle)
ctx = core.ContextWithBufferPolicy(ctx, sPolicy.Buffer) ctx = policy.ContextWithBufferPolicy(ctx, sPolicy.Buffer)
sc := SessionContext{ sc := SessionContext{
ConnectionType: ct, ConnectionType: ct,

View File

@ -13,6 +13,7 @@ import (
"v2ray.com/core/common/session" "v2ray.com/core/common/session"
"v2ray.com/core/common/signal" "v2ray.com/core/common/signal"
"v2ray.com/core/common/task" "v2ray.com/core/common/task"
"v2ray.com/core/features/policy"
"v2ray.com/core/features/routing" "v2ray.com/core/features/routing"
"v2ray.com/core/transport/internet" "v2ray.com/core/transport/internet"
"v2ray.com/core/transport/internet/udp" "v2ray.com/core/transport/internet/udp"
@ -175,7 +176,7 @@ func (s *Server) handleConnection(ctx context.Context, conn internet.Connection,
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
timer := signal.CancelAfterInactivity(ctx, cancel, sessionPolicy.Timeouts.ConnectionIdle) timer := signal.CancelAfterInactivity(ctx, cancel, sessionPolicy.Timeouts.ConnectionIdle)
ctx = core.ContextWithBufferPolicy(ctx, sessionPolicy.Buffer) ctx = policy.ContextWithBufferPolicy(ctx, sessionPolicy.Buffer)
link, err := dispatcher.Dispatch(ctx, dest) link, err := dispatcher.Dispatch(ctx, dest)
if err != nil { if err != nil {
return err return err

View File

@ -7,6 +7,7 @@ import (
"v2ray.com/core/common/session" "v2ray.com/core/common/session"
"v2ray.com/core/common/task" "v2ray.com/core/common/task"
"v2ray.com/core/common/vio" "v2ray.com/core/common/vio"
"v2ray.com/core/features/policy"
"v2ray.com/core" "v2ray.com/core"
"v2ray.com/core/common" "v2ray.com/core/common"
@ -22,7 +23,7 @@ import (
// Client is a Socks5 client. // Client is a Socks5 client.
type Client struct { type Client struct {
serverPicker protocol.ServerPicker serverPicker protocol.ServerPicker
policyManager core.PolicyManager policyManager policy.Manager
} }
// NewClient create a new Socks5 client based on the given config. // NewClient create a new Socks5 client based on the given config.

View File

@ -14,6 +14,7 @@ import (
"v2ray.com/core/common/session" "v2ray.com/core/common/session"
"v2ray.com/core/common/signal" "v2ray.com/core/common/signal"
"v2ray.com/core/common/task" "v2ray.com/core/common/task"
"v2ray.com/core/features/policy"
"v2ray.com/core/features/routing" "v2ray.com/core/features/routing"
"v2ray.com/core/transport/internet" "v2ray.com/core/transport/internet"
"v2ray.com/core/transport/internet/udp" "v2ray.com/core/transport/internet/udp"
@ -35,7 +36,7 @@ func NewServer(ctx context.Context, config *ServerConfig) (*Server, error) {
return s, nil return s, nil
} }
func (s *Server) policy() core.Policy { func (s *Server) policy() policy.Session {
config := s.config config := s.config
p := s.v.PolicyManager().ForLevel(config.UserLevel) p := s.v.PolicyManager().ForLevel(config.UserLevel)
if config.Timeout > 0 { if config.Timeout > 0 {
@ -137,7 +138,7 @@ func (s *Server) transport(ctx context.Context, reader io.Reader, writer io.Writ
timer := signal.CancelAfterInactivity(ctx, cancel, s.policy().Timeouts.ConnectionIdle) timer := signal.CancelAfterInactivity(ctx, cancel, s.policy().Timeouts.ConnectionIdle)
plcy := s.policy() plcy := s.policy()
ctx = core.ContextWithBufferPolicy(ctx, plcy.Buffer) ctx = policy.ContextWithBufferPolicy(ctx, plcy.Buffer)
link, err := dispatcher.Dispatch(ctx, dest) link, err := dispatcher.Dispatch(ctx, dest)
if err != nil { if err != nil {
return err return err

View File

@ -21,6 +21,7 @@ import (
"v2ray.com/core/common/task" "v2ray.com/core/common/task"
"v2ray.com/core/common/uuid" "v2ray.com/core/common/uuid"
feature_inbound "v2ray.com/core/features/inbound" feature_inbound "v2ray.com/core/features/inbound"
"v2ray.com/core/features/policy"
"v2ray.com/core/features/routing" "v2ray.com/core/features/routing"
"v2ray.com/core/proxy/vmess" "v2ray.com/core/proxy/vmess"
"v2ray.com/core/proxy/vmess/encoding" "v2ray.com/core/proxy/vmess/encoding"
@ -100,7 +101,7 @@ func (v *userByEmail) Remove(email string) bool {
// Handler is an inbound connection handler that handles messages in VMess protocol. // Handler is an inbound connection handler that handles messages in VMess protocol.
type Handler struct { type Handler struct {
policyManager core.PolicyManager policyManager policy.Manager
inboundHandlerManager feature_inbound.Manager inboundHandlerManager feature_inbound.Manager
clients *vmess.TimedUserValidator clients *vmess.TimedUserValidator
usersByEmail *userByEmail usersByEmail *userByEmail
@ -269,7 +270,7 @@ func (h *Handler) Process(ctx context.Context, network net.Network, connection i
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
timer := signal.CancelAfterInactivity(ctx, cancel, sessionPolicy.Timeouts.ConnectionIdle) timer := signal.CancelAfterInactivity(ctx, cancel, sessionPolicy.Timeouts.ConnectionIdle)
ctx = core.ContextWithBufferPolicy(ctx, sessionPolicy.Buffer) ctx = policy.ContextWithBufferPolicy(ctx, sessionPolicy.Buffer)
link, err := dispatcher.Dispatch(ctx, request.Destination()) link, err := dispatcher.Dispatch(ctx, request.Destination())
if err != nil { if err != nil {
return newError("failed to dispatch request to ", request.Destination()).Base(err) return newError("failed to dispatch request to ", request.Destination()).Base(err)

View File

@ -3,9 +3,9 @@ package pipe
import ( import (
"context" "context"
"v2ray.com/core"
"v2ray.com/core/common/signal" "v2ray.com/core/common/signal"
"v2ray.com/core/common/signal/done" "v2ray.com/core/common/signal/done"
"v2ray.com/core/features/policy"
) )
// Option for creating new Pipes. // Option for creating new Pipes.
@ -36,7 +36,7 @@ func DiscardOverflow() Option {
func OptionsFromContext(ctx context.Context) []Option { func OptionsFromContext(ctx context.Context) []Option {
var opt []Option var opt []Option
bp := core.BufferPolicyFromContext(ctx) bp := policy.BufferPolicyFromContext(ctx)
if bp.PerConnection >= 0 { if bp.PerConnection >= 0 {
opt = append(opt, WithSizeLimit(bp.PerConnection)) opt = append(opt, WithSizeLimit(bp.PerConnection))
} else { } else {

View File

@ -7,8 +7,11 @@ import (
"v2ray.com/core/common" "v2ray.com/core/common"
"v2ray.com/core/common/serial" "v2ray.com/core/common/serial"
"v2ray.com/core/common/uuid" "v2ray.com/core/common/uuid"
"v2ray.com/core/features"
"v2ray.com/core/features/dns"
"v2ray.com/core/features/inbound" "v2ray.com/core/features/inbound"
"v2ray.com/core/features/outbound" "v2ray.com/core/features/outbound"
"v2ray.com/core/features/policy"
"v2ray.com/core/features/routing" "v2ray.com/core/features/routing"
"v2ray.com/core/features/stats" "v2ray.com/core/features/stats"
) )
@ -19,12 +22,6 @@ type Server interface {
common.Runnable common.Runnable
} }
// Feature is the interface for V2Ray features. All features must implement this interface.
// All existing features have an implementation in app directory. These features can be replaced by third-party ones.
type Feature interface {
common.Runnable
}
// Instance combines all functionalities in V2Ray. // Instance combines all functionalities in V2Ray.
type Instance struct { type Instance struct {
dnsClient syncDNSClient dnsClient syncDNSClient
@ -36,7 +33,7 @@ type Instance struct {
stats syncStatManager stats syncStatManager
access sync.Mutex access sync.Mutex
features []Feature features []features.Feature
id uuid.UUID id uuid.UUID
running bool running bool
} }
@ -143,14 +140,14 @@ func (s *Instance) Start() error {
// RegisterFeature registers the given feature into V2Ray. // RegisterFeature registers the given feature into V2Ray.
// If feature is one of the following types, the corresponding feature in this Instance // If feature is one of the following types, the corresponding feature in this Instance
// will be replaced: DNSClient, PolicyManager, Router, Dispatcher, InboundHandlerManager, OutboundHandlerManager. // will be replaced: DNSClient, PolicyManager, Router, Dispatcher, InboundHandlerManager, OutboundHandlerManager.
func (s *Instance) RegisterFeature(feature interface{}, instance Feature) error { func (s *Instance) RegisterFeature(feature interface{}, instance features.Feature) error {
running := false running := false
switch feature.(type) { switch feature.(type) {
case DNSClient, *DNSClient: case dns.Client, *dns.Client:
s.dnsClient.Set(instance.(DNSClient)) s.dnsClient.Set(instance.(dns.Client))
case PolicyManager, *PolicyManager: case policy.Manager, *policy.Manager:
s.policyManager.Set(instance.(PolicyManager)) s.policyManager.Set(instance.(policy.Manager))
case routing.Router, *routing.Router: case routing.Router, *routing.Router:
s.router.Set(instance.(routing.Router)) s.router.Set(instance.(routing.Router))
case routing.Dispatcher, *routing.Dispatcher: case routing.Dispatcher, *routing.Dispatcher:
@ -174,13 +171,13 @@ func (s *Instance) RegisterFeature(feature interface{}, instance Feature) error
return nil return nil
} }
func (s *Instance) allFeatures() []Feature { func (s *Instance) allFeatures() []features.Feature {
return append([]Feature{s.DNSClient(), s.PolicyManager(), s.Dispatcher(), s.Router(), s.InboundHandlerManager(), s.OutboundHandlerManager(), s.Stats()}, s.features...) return append([]features.Feature{s.DNSClient(), s.PolicyManager(), s.Dispatcher(), s.Router(), s.InboundHandlerManager(), s.OutboundHandlerManager(), s.Stats()}, s.features...)
} }
// GetFeature returns a feature that was registered in this Instance. Nil if not found. // GetFeature returns a feature that was registered in this Instance. Nil if not found.
// The returned Feature must implement common.HasType and whose type equals to the given feature type. // The returned Feature must implement common.HasType and whose type equals to the given feature type.
func (s *Instance) GetFeature(featureType interface{}) Feature { func (s *Instance) GetFeature(featureType interface{}) features.Feature {
for _, f := range s.features { for _, f := range s.features {
if hasType, ok := f.(common.HasType); ok { if hasType, ok := f.(common.HasType); ok {
if hasType.Type() == featureType { if hasType.Type() == featureType {
@ -191,13 +188,13 @@ func (s *Instance) GetFeature(featureType interface{}) Feature {
return nil return nil
} }
// DNSClient returns the DNSClient used by this Instance. The returned DNSClient is always functional. // DNSClient returns the dns.Client used by this Instance. The returned dns.Client is always functional.
func (s *Instance) DNSClient() DNSClient { func (s *Instance) DNSClient() dns.Client {
return &(s.dnsClient) return &(s.dnsClient)
} }
// PolicyManager returns the PolicyManager used by this Instance. The returned PolicyManager is always functional. // PolicyManager returns the policy.Manager used by this Instance. The returned policy.Manager is always functional.
func (s *Instance) PolicyManager() PolicyManager { func (s *Instance) PolicyManager() policy.Manager {
return &(s.policyManager) return &(s.policyManager)
} }