diff --git a/app/dispatcher/default.go b/app/dispatcher/default.go index c9689cf9..503d080b 100644 --- a/app/dispatcher/default.go +++ b/app/dispatcher/default.go @@ -52,18 +52,6 @@ func (*DefaultDispatcher) Start() error { // Close implements common.Closable. func (*DefaultDispatcher) Close() error { return nil } -func (d *DefaultDispatcher) getStatCounter(name string) core.StatCounter { - c := d.stats.GetCounter(name) - if c != nil { - return c - } - c, err := d.stats.RegisterCounter(name) - if err != nil { - return nil - } - return c -} - func (d *DefaultDispatcher) getRayOption(user *protocol.User) []ray.Option { var rayOptions []ray.Option @@ -71,13 +59,13 @@ func (d *DefaultDispatcher) getRayOption(user *protocol.User) []ray.Option { p := d.policy.ForLevel(user.Level) if p.Stats.UserUplink { name := "user>>>" + user.Email + ">>>traffic>>>uplink" - if c := d.getStatCounter(name); c != nil { + if c, _ := core.GetOrRegisterStatCounter(d.stats, name); c != nil { rayOptions = append(rayOptions, ray.WithUplinkStatCounter(c)) } } if p.Stats.UserDownlink { name := "user>>>" + user.Email + ">>>traffic>>>downlink" - if c := d.getStatCounter(name); c != nil { + if c, _ := core.GetOrRegisterStatCounter(d.stats, name); c != nil { rayOptions = append(rayOptions, ray.WithDownlinkStatCounter(c)) } } diff --git a/app/policy/config.go b/app/policy/config.go index db5aaa9f..cf65be9a 100644 --- a/app/policy/config.go +++ b/app/policy/config.go @@ -67,3 +67,12 @@ func (p *Policy) ToCorePolicy() core.Policy { } return cp } + +func (p *SystemPolicy) ToCorePolicy() core.SystemPolicy { + return core.SystemPolicy{ + Stats: core.SystemStatsPolicy{ + InboundUplink: p.Stats.InboundUplink, + InboundDownlink: p.Stats.InboundDownlink, + }, + } +} diff --git a/app/policy/config.pb.go b/app/policy/config.pb.go index 10e77c0e..6cde3e14 100644 --- a/app/policy/config.pb.go +++ b/app/policy/config.pb.go @@ -120,14 +120,55 @@ func (m *Policy_Stats) GetUserDownlink() bool { return false } +type SystemPolicy struct { + Stats *SystemPolicy_Stats `protobuf:"bytes,1,opt,name=stats" json:"stats,omitempty"` +} + +func (m *SystemPolicy) Reset() { *m = SystemPolicy{} } +func (m *SystemPolicy) String() string { return proto.CompactTextString(m) } +func (*SystemPolicy) ProtoMessage() {} +func (*SystemPolicy) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} } + +func (m *SystemPolicy) GetStats() *SystemPolicy_Stats { + if m != nil { + return m.Stats + } + return nil +} + +type SystemPolicy_Stats struct { + InboundUplink bool `protobuf:"varint,1,opt,name=inbound_uplink,json=inboundUplink" json:"inbound_uplink,omitempty"` + InboundDownlink bool `protobuf:"varint,2,opt,name=inbound_downlink,json=inboundDownlink" json:"inbound_downlink,omitempty"` +} + +func (m *SystemPolicy_Stats) Reset() { *m = SystemPolicy_Stats{} } +func (m *SystemPolicy_Stats) String() string { return proto.CompactTextString(m) } +func (*SystemPolicy_Stats) ProtoMessage() {} +func (*SystemPolicy_Stats) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2, 0} } + +func (m *SystemPolicy_Stats) GetInboundUplink() bool { + if m != nil { + return m.InboundUplink + } + return false +} + +func (m *SystemPolicy_Stats) GetInboundDownlink() bool { + if m != nil { + return m.InboundDownlink + } + return false +} + type Config struct { - Level map[uint32]*Policy `protobuf:"bytes,1,rep,name=level" json:"level,omitempty" protobuf_key:"varint,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"` + Level map[uint32]*Policy `protobuf:"bytes,1,rep,name=level" json:"level,omitempty" protobuf_key:"varint,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"` + System *SystemPolicy `protobuf:"bytes,2,opt,name=system" json:"system,omitempty"` } func (m *Config) Reset() { *m = Config{} } func (m *Config) String() string { return proto.CompactTextString(m) } func (*Config) ProtoMessage() {} -func (*Config) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} } +func (*Config) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{3} } func (m *Config) GetLevel() map[uint32]*Policy { if m != nil { @@ -136,42 +177,55 @@ func (m *Config) GetLevel() map[uint32]*Policy { return nil } +func (m *Config) GetSystem() *SystemPolicy { + if m != nil { + return m.System + } + return nil +} + func init() { proto.RegisterType((*Second)(nil), "v2ray.core.app.policy.Second") proto.RegisterType((*Policy)(nil), "v2ray.core.app.policy.Policy") proto.RegisterType((*Policy_Timeout)(nil), "v2ray.core.app.policy.Policy.Timeout") proto.RegisterType((*Policy_Stats)(nil), "v2ray.core.app.policy.Policy.Stats") + proto.RegisterType((*SystemPolicy)(nil), "v2ray.core.app.policy.SystemPolicy") + proto.RegisterType((*SystemPolicy_Stats)(nil), "v2ray.core.app.policy.SystemPolicy.Stats") proto.RegisterType((*Config)(nil), "v2ray.core.app.policy.Config") } func init() { proto.RegisterFile("v2ray.com/core/app/policy/config.proto", fileDescriptor0) } var fileDescriptor0 = []byte{ - // 410 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x93, 0x5d, 0xab, 0xd3, 0x30, - 0x18, 0xc7, 0x69, 0x6b, 0x7b, 0x8e, 0x4f, 0xcf, 0x54, 0x82, 0x07, 0xea, 0x40, 0x3d, 0x6c, 0x28, - 0xbb, 0x4a, 0xa1, 0xbb, 0xf1, 0x05, 0x27, 0xce, 0x17, 0x10, 0x14, 0x47, 0xe6, 0x0b, 0x78, 0x33, - 0x62, 0x1a, 0x5d, 0x59, 0x96, 0x84, 0xbe, 0x4c, 0xfa, 0x35, 0xfc, 0x06, 0xde, 0xfa, 0xc9, 0xfc, - 0x18, 0xd2, 0xa4, 0xa5, 0x37, 0xdb, 0xdc, 0x5d, 0xfa, 0xf0, 0xfb, 0xff, 0x78, 0x12, 0xfe, 0x85, - 0x87, 0xbb, 0x24, 0xa7, 0x35, 0x66, 0x6a, 0x1b, 0x33, 0x95, 0xf3, 0x98, 0x6a, 0x1d, 0x6b, 0x25, - 0x32, 0x56, 0xc7, 0x4c, 0xc9, 0xef, 0xd9, 0x0f, 0xac, 0x73, 0x55, 0x2a, 0x74, 0xd9, 0x71, 0x39, - 0xc7, 0x54, 0x6b, 0x6c, 0x99, 0xd1, 0x3d, 0x08, 0x96, 0x9c, 0x29, 0x99, 0xa2, 0xdb, 0xe0, 0xef, - 0xa8, 0xa8, 0x78, 0xe4, 0x5c, 0x39, 0x93, 0x01, 0xb1, 0x1f, 0xa3, 0xbf, 0x1e, 0x04, 0x0b, 0x83, - 0xa2, 0xe7, 0x70, 0x56, 0x66, 0x5b, 0xae, 0xaa, 0xd2, 0x20, 0x61, 0xf2, 0x00, 0xef, 0x75, 0x62, - 0xcb, 0xe3, 0x8f, 0x16, 0x26, 0x5d, 0x0a, 0x3d, 0x06, 0xbf, 0x28, 0x69, 0x59, 0x44, 0xae, 0x89, - 0x8f, 0x8f, 0xc7, 0x97, 0x0d, 0x4a, 0x6c, 0x62, 0xf8, 0xcb, 0x85, 0xb3, 0xd6, 0x87, 0x9e, 0xc2, - 0xf5, 0x35, 0x95, 0x69, 0xb1, 0xa6, 0x1b, 0xde, 0x6e, 0x72, 0xf7, 0x80, 0xca, 0x5e, 0x8d, 0xf4, - 0x3c, 0x7a, 0x03, 0x37, 0x99, 0x92, 0x92, 0xb3, 0x32, 0x53, 0x72, 0x95, 0xa5, 0x82, 0xb7, 0xdb, - 0xfc, 0x47, 0x71, 0xa3, 0x4f, 0xbd, 0x4d, 0x05, 0x47, 0x33, 0x08, 0x2b, 0x2d, 0x32, 0xb9, 0x59, - 0x29, 0x29, 0xea, 0xc8, 0x3b, 0xc5, 0x01, 0x36, 0xf1, 0x41, 0x8a, 0x1a, 0xcd, 0x61, 0x90, 0xaa, - 0x9f, 0xb2, 0x37, 0x5c, 0x3b, 0xc5, 0x70, 0xd1, 0x65, 0x1a, 0xc7, 0xf0, 0x3d, 0xf8, 0xe6, 0x91, - 0xd0, 0x7d, 0x08, 0xab, 0x82, 0xe7, 0x2b, 0xeb, 0x37, 0x6f, 0x72, 0x4e, 0xa0, 0x19, 0x7d, 0x32, - 0x13, 0x34, 0x86, 0x81, 0x01, 0xba, 0xb8, 0xb9, 0xf3, 0x39, 0xb9, 0x68, 0x86, 0xaf, 0xda, 0xd9, - 0xe8, 0xb7, 0x03, 0xc1, 0x4b, 0x53, 0x19, 0x34, 0x03, 0x5f, 0xf0, 0x1d, 0x17, 0x91, 0x73, 0xe5, - 0x4d, 0xc2, 0x64, 0x72, 0x60, 0x2b, 0x4b, 0xe3, 0x77, 0x0d, 0xfa, 0x5a, 0x96, 0x79, 0x4d, 0x6c, - 0x6c, 0xf8, 0x05, 0xa0, 0x1f, 0xa2, 0x5b, 0xe0, 0x6d, 0x78, 0xdd, 0xf6, 0xaa, 0x39, 0xa2, 0x69, - 0xd7, 0xb5, 0xe3, 0x6f, 0x6f, 0x9b, 0xd0, 0x56, 0xf1, 0x89, 0xfb, 0xc8, 0x99, 0x3f, 0x83, 0x3b, - 0x4c, 0x6d, 0xf7, 0xe3, 0x0b, 0xe7, 0x6b, 0x60, 0x4f, 0x7f, 0xdc, 0xcb, 0xcf, 0x09, 0xa1, 0xcd, - 0x82, 0x39, 0xc7, 0x2f, 0xb4, 0x6e, 0x4d, 0xdf, 0x02, 0xf3, 0x2f, 0x4c, 0xff, 0x05, 0x00, 0x00, - 0xff, 0xff, 0x98, 0xa8, 0x2f, 0xbe, 0x35, 0x03, 0x00, 0x00, + // 478 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x93, 0xdf, 0x6e, 0xd3, 0x30, + 0x18, 0xc5, 0x95, 0x96, 0x66, 0xe3, 0x6b, 0xbb, 0x4d, 0x16, 0x93, 0x4a, 0x25, 0x60, 0xea, 0x34, + 0xd4, 0xdd, 0xb8, 0x52, 0x76, 0x03, 0x4c, 0x0c, 0x31, 0xfe, 0x48, 0x48, 0x20, 0x26, 0x97, 0x3f, + 0x82, 0x9b, 0xca, 0x73, 0x0c, 0x8b, 0xea, 0xd8, 0x56, 0xe2, 0x14, 0xe5, 0x35, 0x78, 0x8c, 0x3d, + 0x14, 0xd7, 0x3c, 0x06, 0x8a, 0xed, 0x2c, 0x1b, 0x5a, 0xb7, 0xde, 0x25, 0x47, 0xbf, 0x73, 0x74, + 0x3e, 0xdb, 0x1f, 0x3c, 0x5e, 0x44, 0x19, 0x2d, 0x31, 0x53, 0xe9, 0x84, 0xa9, 0x8c, 0x4f, 0xa8, + 0xd6, 0x13, 0xad, 0x44, 0xc2, 0xca, 0x09, 0x53, 0xf2, 0x47, 0xf2, 0x13, 0xeb, 0x4c, 0x19, 0x85, + 0xb6, 0x6b, 0x2e, 0xe3, 0x98, 0x6a, 0x8d, 0x1d, 0x33, 0x7a, 0x08, 0xe1, 0x94, 0x33, 0x25, 0x63, + 0x74, 0x0f, 0x3a, 0x0b, 0x2a, 0x0a, 0x3e, 0x08, 0x76, 0x82, 0x71, 0x9f, 0xb8, 0x9f, 0xd1, 0xdf, + 0x36, 0x84, 0x27, 0x16, 0x45, 0x2f, 0x60, 0xcd, 0x24, 0x29, 0x57, 0x85, 0xb1, 0x48, 0x37, 0xda, + 0xc3, 0xd7, 0x66, 0x62, 0xc7, 0xe3, 0x4f, 0x0e, 0x26, 0xb5, 0x0b, 0x3d, 0x85, 0x4e, 0x6e, 0xa8, + 0xc9, 0x07, 0x2d, 0x6b, 0xdf, 0xbd, 0xd9, 0x3e, 0xad, 0x50, 0xe2, 0x1c, 0xc3, 0xdf, 0x2d, 0x58, + 0xf3, 0x79, 0xe8, 0x10, 0xee, 0x9e, 0x51, 0x19, 0xe7, 0x67, 0x74, 0xce, 0x7d, 0x93, 0x07, 0x4b, + 0xa2, 0xdc, 0x68, 0xa4, 0xe1, 0xd1, 0x5b, 0xd8, 0x64, 0x4a, 0x4a, 0xce, 0x4c, 0xa2, 0xe4, 0x2c, + 0x89, 0x05, 0xf7, 0x6d, 0x6e, 0x89, 0xd8, 0x68, 0x5c, 0xef, 0x62, 0xc1, 0xd1, 0x11, 0x74, 0x0b, + 0x2d, 0x12, 0x39, 0x9f, 0x29, 0x29, 0xca, 0x41, 0x7b, 0x95, 0x0c, 0x70, 0x8e, 0x8f, 0x52, 0x94, + 0xe8, 0x18, 0xfa, 0xb1, 0xfa, 0x25, 0x9b, 0x84, 0x3b, 0xab, 0x24, 0xf4, 0x6a, 0x4f, 0x95, 0x31, + 0xfc, 0x00, 0x1d, 0x7b, 0x48, 0xe8, 0x11, 0x74, 0x8b, 0x9c, 0x67, 0x33, 0x97, 0x6f, 0xcf, 0x64, + 0x9d, 0x40, 0x25, 0x7d, 0xb6, 0x0a, 0xda, 0x85, 0xbe, 0x05, 0x6a, 0xbb, 0x9d, 0x79, 0x9d, 0xf4, + 0x2a, 0xf1, 0xb5, 0xd7, 0x46, 0xe7, 0x01, 0xf4, 0xa6, 0x65, 0x6e, 0x78, 0x7a, 0x71, 0xe1, 0xfe, + 0xbe, 0xdc, 0x21, 0xef, 0x2f, 0xeb, 0x76, 0xc9, 0x73, 0xf5, 0xd6, 0xbe, 0xd5, 0x05, 0xf7, 0x60, + 0x23, 0x91, 0xa7, 0xaa, 0x90, 0xf1, 0xd5, 0x8e, 0x7d, 0xaf, 0xfa, 0x9a, 0xfb, 0xb0, 0x55, 0x63, + 0xff, 0x35, 0xdd, 0xf4, 0xfa, 0x45, 0xd9, 0x3f, 0x01, 0x84, 0xaf, 0xec, 0xfb, 0x46, 0x47, 0xd0, + 0x11, 0x7c, 0xc1, 0xc5, 0x20, 0xd8, 0x69, 0x8f, 0xbb, 0xd1, 0x78, 0x49, 0x4d, 0x47, 0xe3, 0xf7, + 0x15, 0xfa, 0x46, 0x9a, 0xac, 0x24, 0xce, 0x86, 0x0e, 0x21, 0xcc, 0xed, 0x08, 0xb7, 0xbc, 0xcb, + 0xcb, 0x73, 0x12, 0x6f, 0x19, 0x7e, 0x05, 0x68, 0x12, 0xd1, 0x16, 0xb4, 0xe7, 0xbc, 0xf4, 0x1b, + 0x54, 0x7d, 0xa2, 0x83, 0x7a, 0xab, 0x6e, 0x7e, 0x65, 0x3e, 0xd5, 0xb1, 0xcf, 0x5a, 0x4f, 0x82, + 0xe3, 0xe7, 0x70, 0x9f, 0xa9, 0xf4, 0x7a, 0xfc, 0x24, 0xf8, 0x1e, 0xba, 0xaf, 0xf3, 0xd6, 0xf6, + 0x97, 0x88, 0xd0, 0x6a, 0xba, 0x8c, 0xe3, 0x97, 0x5a, 0xfb, 0xa4, 0xd3, 0xd0, 0x6e, 0xfd, 0xc1, + 0xbf, 0x00, 0x00, 0x00, 0xff, 0xff, 0xa9, 0x9f, 0x00, 0x31, 0x1f, 0x04, 0x00, 0x00, } diff --git a/app/policy/config.proto b/app/policy/config.proto index e82bff3e..cf64db86 100644 --- a/app/policy/config.proto +++ b/app/policy/config.proto @@ -28,6 +28,16 @@ message Policy { Stats stats = 2; } +message SystemPolicy { + message Stats { + bool inbound_uplink = 1; + bool inbound_downlink = 2; + } + + Stats stats = 1; +} + message Config { map level = 1; + SystemPolicy system = 2; } diff --git a/app/policy/manager.go b/app/policy/manager.go index 68ed4e9d..ebfde0e7 100644 --- a/app/policy/manager.go +++ b/app/policy/manager.go @@ -10,12 +10,14 @@ import ( // Instance is an instance of Policy manager. type Instance struct { levels map[uint32]*Policy + system *SystemPolicy } // New creates new Policy manager instance. func New(ctx context.Context, config *Config) (*Instance, error) { m := &Instance{ levels: make(map[uint32]*Policy), + system: config.System, } if len(config.Level) > 0 { for lv, p := range config.Level { @@ -43,6 +45,13 @@ func (m *Instance) ForLevel(level uint32) core.Policy { return core.DefaultPolicy() } +func (m *Instance) ForSystem() core.SystemPolicy { + if m.system == nil { + return core.SystemPolicy{} + } + return m.system.ToCorePolicy() +} + // Start implements common.Runnable.Start(). func (m *Instance) Start() error { return nil diff --git a/app/proxyman/inbound/always.go b/app/proxyman/inbound/always.go index d57e7b09..92a2d604 100644 --- a/app/proxyman/inbound/always.go +++ b/app/proxyman/inbound/always.go @@ -3,6 +3,7 @@ package inbound import ( "context" + "v2ray.com/core" "v2ray.com/core/app/proxyman" "v2ray.com/core/app/proxyman/mux" "v2ray.com/core/common" @@ -11,6 +12,30 @@ import ( "v2ray.com/core/proxy" ) +func getStatCounter(v *core.Instance, tag string) (core.StatCounter, core.StatCounter) { + var uplinkCounter core.StatCounter + var downlinkCounter core.StatCounter + + policy := v.PolicyManager() + stats := v.Stats() + if len(tag) > 0 && policy.ForSystem().Stats.InboundUplink { + name := "inbound>>>" + tag + ">>>traffic>>>uplink" + c, _ := core.GetOrRegisterStatCounter(stats, name) + if c != nil { + uplinkCounter = c + } + } + if len(tag) > 0 && policy.ForSystem().Stats.InboundDownlink { + name := "inbound>>>" + tag + ">>>traffic>>>downlink" + c, _ := core.GetOrRegisterStatCounter(stats, name) + if c != nil { + downlinkCounter = c + } + } + + return uplinkCounter, downlinkCounter +} + type AlwaysOnInboundHandler struct { proxy proxy.Inbound workers []worker @@ -34,6 +59,8 @@ func NewAlwaysOnInboundHandler(ctx context.Context, tag string, receiverConfig * tag: tag, } + uplinkCounter, downlinkCounter := getStatCounter(core.MustFromContext(ctx), tag) + nl := p.Network() pr := receiverConfig.PortRange address := receiverConfig.Listen.AsAddress() @@ -44,26 +71,30 @@ func NewAlwaysOnInboundHandler(ctx context.Context, tag string, receiverConfig * if nl.HasNetwork(net.Network_TCP) { newError("creating stream worker on ", address, ":", port).AtDebug().WriteToLog() worker := &tcpWorker{ - address: address, - port: net.Port(port), - proxy: p, - stream: receiverConfig.StreamSettings, - recvOrigDest: receiverConfig.ReceiveOriginalDestination, - tag: tag, - dispatcher: h.mux, - sniffers: receiverConfig.DomainOverride, + address: address, + port: net.Port(port), + proxy: p, + stream: receiverConfig.StreamSettings, + recvOrigDest: receiverConfig.ReceiveOriginalDestination, + tag: tag, + dispatcher: h.mux, + sniffers: receiverConfig.DomainOverride, + uplinkCounter: uplinkCounter, + downlinkCounter: downlinkCounter, } h.workers = append(h.workers, worker) } if nl.HasNetwork(net.Network_UDP) { worker := &udpWorker{ - tag: tag, - proxy: p, - address: address, - port: net.Port(port), - recvOrigDest: receiverConfig.ReceiveOriginalDestination, - dispatcher: h.mux, + tag: tag, + proxy: p, + address: address, + port: net.Port(port), + recvOrigDest: receiverConfig.ReceiveOriginalDestination, + dispatcher: h.mux, + uplinkCounter: uplinkCounter, + downlinkCounter: downlinkCounter, } h.workers = append(h.workers, worker) } diff --git a/app/proxyman/inbound/dynamic.go b/app/proxyman/inbound/dynamic.go index ca2cc944..957279f3 100644 --- a/app/proxyman/inbound/dynamic.go +++ b/app/proxyman/inbound/dynamic.go @@ -90,6 +90,9 @@ func (h *DynamicInboundHandler) refresh() error { if address == nil { address = net.AnyIP } + + uplinkCounter, downlinkCounter := getStatCounter(h.v, h.tag) + for i := uint32(0); i < concurrency; i++ { port := h.allocatePort() rawProxy, err := h.v.CreateObject(h.proxyConfig) @@ -101,14 +104,16 @@ func (h *DynamicInboundHandler) refresh() error { nl := p.Network() if nl.HasNetwork(net.Network_TCP) { worker := &tcpWorker{ - tag: h.tag, - address: address, - port: port, - proxy: p, - stream: h.receiverConfig.StreamSettings, - recvOrigDest: h.receiverConfig.ReceiveOriginalDestination, - dispatcher: h.mux, - sniffers: h.receiverConfig.DomainOverride, + tag: h.tag, + address: address, + port: port, + proxy: p, + stream: h.receiverConfig.StreamSettings, + recvOrigDest: h.receiverConfig.ReceiveOriginalDestination, + dispatcher: h.mux, + sniffers: h.receiverConfig.DomainOverride, + uplinkCounter: uplinkCounter, + downlinkCounter: downlinkCounter, } if err := worker.Start(); err != nil { newError("failed to create TCP worker").Base(err).AtWarning().WriteToLog() @@ -119,12 +124,14 @@ func (h *DynamicInboundHandler) refresh() error { if nl.HasNetwork(net.Network_UDP) { worker := &udpWorker{ - tag: h.tag, - proxy: p, - address: address, - port: port, - recvOrigDest: h.receiverConfig.ReceiveOriginalDestination, - dispatcher: h.mux, + tag: h.tag, + proxy: p, + address: address, + port: port, + recvOrigDest: h.receiverConfig.ReceiveOriginalDestination, + dispatcher: h.mux, + uplinkCounter: uplinkCounter, + downlinkCounter: downlinkCounter, } if err := worker.Start(); err != nil { newError("failed to create UDP worker").Base(err).AtWarning().WriteToLog() diff --git a/app/proxyman/inbound/worker.go b/app/proxyman/inbound/worker.go index e431c732..fbc10a4a 100644 --- a/app/proxyman/inbound/worker.go +++ b/app/proxyman/inbound/worker.go @@ -7,13 +7,12 @@ import ( "sync/atomic" "time" - "v2ray.com/core/common/session" - "v2ray.com/core" "v2ray.com/core/app/proxyman" "v2ray.com/core/common" "v2ray.com/core/common/buf" "v2ray.com/core/common/net" + "v2ray.com/core/common/session" "v2ray.com/core/common/signal" "v2ray.com/core/proxy" "v2ray.com/core/transport/internet" @@ -29,14 +28,16 @@ type worker interface { } type tcpWorker struct { - address net.Address - port net.Port - proxy proxy.Inbound - stream *internet.StreamConfig - recvOrigDest bool - tag string - dispatcher core.Dispatcher - sniffers []proxyman.KnownProtocols + address net.Address + port net.Port + proxy proxy.Inbound + stream *internet.StreamConfig + recvOrigDest bool + tag string + dispatcher core.Dispatcher + sniffers []proxyman.KnownProtocols + uplinkCounter core.StatCounter + downlinkCounter core.StatCounter hub internet.Listener } @@ -63,6 +64,13 @@ func (w *tcpWorker) callback(conn internet.Connection) { if len(w.sniffers) > 0 { ctx = proxyman.ContextWithProtocolSniffers(ctx, w.sniffers) } + if w.uplinkCounter != nil || w.downlinkCounter != nil { + conn = &internet.StatCouterConnection{ + Connection: conn, + Uplink: w.uplinkCounter, + Downlink: w.downlinkCounter, + } + } if err := w.proxy.Process(ctx, net.Network_TCP, conn, w.dispatcher); err != nil { newError("connection ends").Base(err).WithContext(ctx).WriteToLog() } @@ -108,6 +116,8 @@ type udpConn struct { remote net.Addr local net.Addr done *signal.Done + uplink core.StatCounter + downlink core.StatCounter } func (c *udpConn) updateActivity() { @@ -119,7 +129,11 @@ func (c *udpConn) Read(buf []byte) (int, error) { case in := <-c.input: defer in.Release() c.updateActivity() - return copy(buf, in.Bytes()), nil + nBytes := copy(buf, in.Bytes()) + if c.uplink != nil { + c.uplink.Add(int64(nBytes)) + } + return nBytes, nil case <-c.done.C(): return 0, io.EOF } @@ -128,6 +142,9 @@ func (c *udpConn) Read(buf []byte) (int, error) { // Write implements io.Writer. func (c *udpConn) Write(buf []byte) (int, error) { n, err := c.output(buf) + if c.downlink != nil { + c.downlink.Add(int64(n)) + } if err == nil { c.updateActivity() } @@ -167,13 +184,15 @@ type connID struct { type udpWorker struct { sync.RWMutex - proxy proxy.Inbound - hub *udp.Hub - address net.Address - port net.Port - recvOrigDest bool - tag string - dispatcher core.Dispatcher + proxy proxy.Inbound + hub *udp.Hub + address net.Address + port net.Port + recvOrigDest bool + tag string + dispatcher core.Dispatcher + uplinkCounter core.StatCounter + downlinkCounter core.StatCounter done *signal.Done activeConn map[connID]*udpConn @@ -200,7 +219,9 @@ func (w *udpWorker) getConnection(id connID) (*udpConn, bool) { IP: w.address.IP(), Port: int(w.port), }, - done: signal.NewDone(), + done: signal.NewDone(), + uplink: w.uplinkCounter, + downlink: w.downlinkCounter, } w.activeConn[id] = conn diff --git a/policy.go b/policy.go index f8dab11c..965ade50 100644 --- a/policy.go +++ b/policy.go @@ -27,6 +27,17 @@ type StatsPolicy struct { UserDownlink bool } +type SystemStatsPolicy struct { + // Whether or not to enable stat counter for uplink traffic in inbound handlers. + InboundUplink bool + // Whether or not to enable stat counter for downlink traffic in inbound handlers. + InboundDownlink bool +} + +type SystemPolicy struct { + Stats SystemStatsPolicy +} + // Policy is session based settings for controlling V2Ray requests. It contains various settings (or limits) that may differ for different users in the context. type Policy struct { Timeouts TimeoutPolicy // Timeout settings @@ -39,6 +50,9 @@ type PolicyManager interface { // ForLevel returns the Policy for the given user level. ForLevel(level uint32) Policy + + // ForSystem returns the Policy for V2Ray system. + ForSystem() SystemPolicy } // DefaultPolicy returns the Policy when user is not specified. @@ -47,8 +61,8 @@ func DefaultPolicy() Policy { Timeouts: TimeoutPolicy{ Handshake: time.Second * 4, ConnectionIdle: time.Second * 300, - UplinkOnly: time.Second * 5, - DownlinkOnly: time.Second * 30, + UplinkOnly: time.Second * 2, + DownlinkOnly: time.Second * 5, }, Stats: StatsPolicy{ UserUplink: false, diff --git a/stats.go b/stats.go index 17bab875..7ec0c814 100644 --- a/stats.go +++ b/stats.go @@ -17,6 +17,16 @@ type StatManager interface { GetCounter(string) StatCounter } +// GetOrRegisterStatCounter tries to get the StatCounter first. If not exist, it then tries to create a new counter. +func GetOrRegisterStatCounter(m StatManager, name string) (StatCounter, error) { + counter := m.GetCounter(name) + if counter != nil { + return counter, nil + } + + return m.RegisterCounter(name) +} + type syncStatManager struct { sync.RWMutex StatManager diff --git a/testing/scenarios/command_test.go b/testing/scenarios/command_test.go index 67ef8d07..a2330ab9 100644 --- a/testing/scenarios/command_test.go +++ b/testing/scenarios/command_test.go @@ -395,10 +395,16 @@ func TestCommanderStats(t *testing.T) { }, }, }, + System: &policy.SystemPolicy{ + Stats: &policy.SystemPolicy_Stats{ + InboundUplink: true, + }, + }, }), }, Inbound: []*core.InboundHandlerConfig{ { + Tag: "vmess", ReceiverSettings: serial.ToTypedMessage(&proxyman.ReceiverConfig{ PortRange: net.SinglePortRange(serverPort), Listen: net.NewIPOrDomain(net.LocalHostIP), @@ -521,5 +527,12 @@ func TestCommanderStats(t *testing.T) { assert(sresp.Stat.Name, Equals, name) assert(sresp.Stat.Value, Equals, int64(0)) + sresp, err = sClient.GetStats(context.Background(), &statscmd.GetStatsRequest{ + Name: "inbound>>>vmess>>>traffic>>>uplink", + Reset_: true, + }) + assert(err, IsNil) + assert(sresp.Stat.Value, Equals, int64(10240*1024)) + CloseAllServers(servers) } diff --git a/transport/internet/connection.go b/transport/internet/connection.go index 2405ce1b..354febf3 100644 --- a/transport/internet/connection.go +++ b/transport/internet/connection.go @@ -7,3 +7,30 @@ import ( type Connection interface { net.Conn } + +type addInt64 interface { + Add(int64) int64 +} + +type StatCouterConnection struct { + Connection + Uplink addInt64 + Downlink addInt64 +} + +func (c *StatCouterConnection) Read(b []byte) (int, error) { + nBytes, err := c.Connection.Read(b) + if c.Uplink != nil { + c.Uplink.Add(int64(nBytes)) + } + + return nBytes, err +} + +func (c *StatCouterConnection) Write(b []byte) (int, error) { + nBytes, err := c.Connection.Write(b) + if c.Downlink != nil { + c.Downlink.Add(int64(nBytes)) + } + return nBytes, err +}