split uplink and downlink traffic

pull/1008/head
Darien Raymond 2018-03-31 20:30:49 +02:00
parent 0975e26ed1
commit 6d98bc4607
No known key found for this signature in database
GPG Key ID: 7251FFA14BB18169
7 changed files with 105 additions and 65 deletions

View File

@ -56,6 +56,40 @@ func getStatsName(u *protocol.User) string {
return "user>traffic>" + u.Email return "user>traffic>" + u.Email
} }
func (d *DefaultDispatcher) getStatCounter(name string) core.StatCounter {
c := d.stats.GetCounter(name)
if c != nil {
return c
}
c, err := d.stats.RegisterCounter(name)
if err != nil {
return nil
}
return c
}
func (d *DefaultDispatcher) getRayOption(user *protocol.User) []ray.Option {
var rayOptions []ray.Option
if user != nil && len(user.Email) > 0 {
p := d.policy.ForLevel(user.Level)
if p.Stats.UserUplink {
name := "user>>>" + user.Email + ">>>traffic>>>uplink"
if c := d.getStatCounter(name); c != nil {
rayOptions = append(rayOptions, ray.WithUplinkStatCounter(c))
}
}
if p.Stats.UserDownlink {
name := "user>>>" + user.Email + ">>>traffic>>>downlink"
if c := d.getStatCounter(name); c != nil {
rayOptions = append(rayOptions, ray.WithDownlinkStatCounter(c))
}
}
}
return rayOptions
}
// Dispatch implements core.Dispatcher. // Dispatch implements core.Dispatcher.
func (d *DefaultDispatcher) Dispatch(ctx context.Context, destination net.Destination) (ray.InboundRay, error) { func (d *DefaultDispatcher) Dispatch(ctx context.Context, destination net.Destination) (ray.InboundRay, error) {
if !destination.IsValid() { if !destination.IsValid() {
@ -63,24 +97,8 @@ func (d *DefaultDispatcher) Dispatch(ctx context.Context, destination net.Destin
} }
ctx = proxy.ContextWithTarget(ctx, destination) ctx = proxy.ContextWithTarget(ctx, destination)
var rayOptions []ray.Option
user := protocol.UserFromContext(ctx) user := protocol.UserFromContext(ctx)
if user != nil && len(user.Email) > 0 { rayOptions := d.getRayOption(user)
name := getStatsName(user)
c, err := d.stats.RegisterCounter(name)
if err != nil {
c = d.stats.GetCounter(name)
}
if c == nil {
newError("failed to get stats counter ", name).AtWarning().WithContext(ctx).WriteToLog()
}
p := d.policy.ForLevel(user.Level)
if p.Stats.EnablePerUser {
rayOptions = append(rayOptions, ray.WithStatCounter(c))
}
}
outbound := ray.New(ctx, rayOptions...) outbound := ray.New(ctx, rayOptions...)
snifferList := proxyman.ProtocolSniffersFromContext(ctx) snifferList := proxyman.ProtocolSniffersFromContext(ctx)

View File

@ -61,7 +61,8 @@ func (p *Policy) ToCorePolicy() core.Policy {
cp.Timeouts.UplinkOnly = p.Timeout.UplinkOnly.Duration() cp.Timeouts.UplinkOnly = p.Timeout.UplinkOnly.Duration()
} }
if p.Stats != nil { if p.Stats != nil {
cp.Stats.EnablePerUser = p.Stats.EnablePerUser cp.Stats.UserUplink = p.Stats.UserUplink
cp.Stats.UserDownlink = p.Stats.UserDownlink
} }
return cp return cp
} }

View File

@ -97,7 +97,8 @@ func (m *Policy_Timeout) GetDownlinkOnly() *Second {
} }
type Policy_Stats struct { type Policy_Stats struct {
EnablePerUser bool `protobuf:"varint,1,opt,name=enable_per_user,json=enablePerUser" json:"enable_per_user,omitempty"` UserUplink bool `protobuf:"varint,1,opt,name=user_uplink,json=userUplink" json:"user_uplink,omitempty"`
UserDownlink bool `protobuf:"varint,2,opt,name=user_downlink,json=userDownlink" json:"user_downlink,omitempty"`
} }
func (m *Policy_Stats) Reset() { *m = Policy_Stats{} } func (m *Policy_Stats) Reset() { *m = Policy_Stats{} }
@ -105,9 +106,16 @@ func (m *Policy_Stats) String() string { return proto.CompactTextStri
func (*Policy_Stats) ProtoMessage() {} func (*Policy_Stats) ProtoMessage() {}
func (*Policy_Stats) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1, 1} } func (*Policy_Stats) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1, 1} }
func (m *Policy_Stats) GetEnablePerUser() bool { func (m *Policy_Stats) GetUserUplink() bool {
if m != nil { if m != nil {
return m.EnablePerUser return m.UserUplink
}
return false
}
func (m *Policy_Stats) GetUserDownlink() bool {
if m != nil {
return m.UserDownlink
} }
return false return false
} }
@ -139,31 +147,31 @@ func init() {
func init() { proto.RegisterFile("v2ray.com/core/app/policy/config.proto", fileDescriptor0) } func init() { proto.RegisterFile("v2ray.com/core/app/policy/config.proto", fileDescriptor0) }
var fileDescriptor0 = []byte{ var fileDescriptor0 = []byte{
// 403 bytes of a gzipped FileDescriptorProto // 410 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x92, 0xdd, 0x8a, 0xd3, 0x40, 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x93, 0x5d, 0xab, 0xd3, 0x30,
0x14, 0xc7, 0x49, 0x62, 0xb2, 0x7a, 0x6a, 0x5d, 0x19, 0x5c, 0x88, 0x05, 0x65, 0xa9, 0xb8, 0xf4, 0x18, 0xc7, 0x69, 0x6b, 0x7b, 0x8e, 0x4f, 0xcf, 0x54, 0x82, 0x07, 0xea, 0x40, 0x3d, 0x6c, 0x28,
0x6a, 0x02, 0xd9, 0x1b, 0x3f, 0x70, 0xc5, 0x15, 0x05, 0x41, 0xb0, 0xa4, 0x7e, 0x80, 0x37, 0x61, 0xbb, 0x4a, 0xa1, 0xbb, 0xf1, 0x05, 0x27, 0xce, 0x17, 0x10, 0x14, 0x47, 0xe6, 0x0b, 0x78, 0x33,
0x3a, 0x39, 0xda, 0xd0, 0xe9, 0xcc, 0x30, 0x49, 0x2a, 0x79, 0x0d, 0xdf, 0xc0, 0x5b, 0x1f, 0xc5, 0x62, 0x1a, 0x5d, 0x59, 0x96, 0x84, 0xbe, 0x4c, 0xfa, 0x35, 0xfc, 0x06, 0xde, 0xfa, 0xc9, 0xfc,
0xa7, 0x92, 0x64, 0x12, 0x72, 0xd3, 0x76, 0x7b, 0x37, 0x19, 0x7e, 0xff, 0x5f, 0x4e, 0x4e, 0xfe, 0x18, 0xd2, 0xa4, 0xa5, 0x37, 0xdb, 0xdc, 0x5d, 0xfa, 0xf0, 0xfb, 0xff, 0x78, 0x12, 0xfe, 0x85,
0x70, 0xb1, 0x8d, 0x0d, 0xab, 0x29, 0x57, 0x9b, 0x88, 0x2b, 0x83, 0x11, 0xd3, 0x3a, 0xd2, 0x4a, 0x87, 0xbb, 0x24, 0xa7, 0x35, 0x66, 0x6a, 0x1b, 0x33, 0x95, 0xf3, 0x98, 0x6a, 0x1d, 0x6b, 0x25,
0xe4, 0xbc, 0x8e, 0xb8, 0x92, 0x3f, 0xf2, 0x9f, 0x54, 0x1b, 0x55, 0x2a, 0x72, 0xd6, 0x73, 0x06, 0x32, 0x56, 0xc7, 0x4c, 0xc9, 0xef, 0xd9, 0x0f, 0xac, 0x73, 0x55, 0x2a, 0x74, 0xd9, 0x71, 0x39,
0x29, 0xd3, 0x9a, 0x5a, 0x66, 0xfa, 0x18, 0x82, 0x05, 0x72, 0x25, 0x33, 0xf2, 0x00, 0xfc, 0x2d, 0xc7, 0x54, 0x6b, 0x6c, 0x99, 0xd1, 0x3d, 0x08, 0x96, 0x9c, 0x29, 0x99, 0xa2, 0xdb, 0xe0, 0xef,
0x13, 0x15, 0x86, 0xce, 0xb9, 0x33, 0x1b, 0x27, 0xf6, 0x61, 0xfa, 0xcf, 0x83, 0x60, 0xde, 0xa2, 0xa8, 0xa8, 0x78, 0xe4, 0x5c, 0x39, 0x93, 0x01, 0xb1, 0x1f, 0xa3, 0xbf, 0x1e, 0x04, 0x0b, 0x83,
0xe4, 0x35, 0x9c, 0x94, 0xf9, 0x06, 0x55, 0x55, 0xb6, 0xc8, 0x28, 0x7e, 0x4a, 0x77, 0x3a, 0xa9, 0xa2, 0xe7, 0x70, 0x56, 0x66, 0x5b, 0xae, 0xaa, 0xd2, 0x20, 0x61, 0xf2, 0x00, 0xef, 0x75, 0x62,
0xe5, 0xe9, 0x67, 0x0b, 0x27, 0x7d, 0x8a, 0x3c, 0x07, 0xbf, 0x28, 0x59, 0x59, 0x84, 0x6e, 0x1b, 0xcb, 0xe3, 0x8f, 0x16, 0x26, 0x5d, 0x0a, 0x3d, 0x06, 0xbf, 0x28, 0x69, 0x59, 0x44, 0xae, 0x89,
0x7f, 0x72, 0x38, 0xbe, 0x68, 0xd0, 0xc4, 0x26, 0x26, 0xbf, 0x5d, 0x38, 0xe9, 0x7c, 0xe4, 0x25, 0x8f, 0x8f, 0xc7, 0x97, 0x0d, 0x4a, 0x6c, 0x62, 0xf8, 0xcb, 0x85, 0xb3, 0xd6, 0x87, 0x9e, 0xc2,
0xdc, 0x59, 0x31, 0x99, 0x15, 0x2b, 0xb6, 0xc6, 0x6e, 0x92, 0x47, 0x7b, 0x54, 0xf6, 0xd3, 0x92, 0xf5, 0x35, 0x95, 0x69, 0xb1, 0xa6, 0x1b, 0xde, 0x6e, 0x72, 0xf7, 0x80, 0xca, 0x5e, 0x8d, 0xf4,
0x81, 0x27, 0xef, 0xe1, 0x94, 0x2b, 0x29, 0x91, 0x97, 0xb9, 0x92, 0x69, 0x9e, 0x09, 0xec, 0xa6, 0x3c, 0x7a, 0x03, 0x37, 0x99, 0x92, 0x92, 0xb3, 0x32, 0x53, 0x72, 0x95, 0xa5, 0x82, 0xb7, 0xdb,
0xb9, 0x41, 0x71, 0x6f, 0x48, 0x7d, 0xc8, 0x04, 0x92, 0x2b, 0x18, 0x55, 0x5a, 0xe4, 0x72, 0x9d, 0xfc, 0x47, 0x71, 0xa3, 0x4f, 0xbd, 0x4d, 0x05, 0x47, 0x33, 0x08, 0x2b, 0x2d, 0x32, 0xb9, 0x59,
0x2a, 0x29, 0xea, 0xd0, 0x3b, 0xc6, 0x01, 0x36, 0xf1, 0x49, 0x8a, 0x9a, 0x5c, 0xc3, 0x38, 0x53, 0x29, 0x29, 0xea, 0xc8, 0x3b, 0xc5, 0x01, 0x36, 0xf1, 0x41, 0x8a, 0x1a, 0xcd, 0x61, 0x90, 0xaa,
0xbf, 0xe4, 0x60, 0xb8, 0x75, 0x8c, 0xe1, 0x6e, 0x9f, 0x69, 0x1c, 0x93, 0x08, 0xfc, 0x76, 0x49, 0x9f, 0xb2, 0x37, 0x5c, 0x3b, 0xc5, 0x70, 0xd1, 0x65, 0x1a, 0xc7, 0xf0, 0x3d, 0xf8, 0xe6, 0x91,
0xe4, 0x02, 0x4e, 0x51, 0xb2, 0xa5, 0xc0, 0x54, 0xa3, 0x49, 0xab, 0x02, 0x4d, 0xbb, 0x97, 0xdb, 0xd0, 0x7d, 0x08, 0xab, 0x82, 0xe7, 0x2b, 0xeb, 0x37, 0x6f, 0x72, 0x4e, 0xa0, 0x19, 0x7d, 0x32,
0xc9, 0xd8, 0x5e, 0xcf, 0xd1, 0x7c, 0x29, 0xd0, 0x4c, 0xff, 0x38, 0x10, 0xbc, 0x6d, 0x4b, 0x41, 0x13, 0x34, 0x86, 0x81, 0x01, 0xba, 0xb8, 0xb9, 0xf3, 0x39, 0xb9, 0x68, 0x86, 0xaf, 0xda, 0xd9,
0xae, 0xc0, 0x17, 0xb8, 0x45, 0x11, 0x3a, 0xe7, 0xde, 0x6c, 0x14, 0xcf, 0xf6, 0xbc, 0xd7, 0xd2, 0xe8, 0xb7, 0x03, 0xc1, 0x4b, 0x53, 0x19, 0x34, 0x03, 0x5f, 0xf0, 0x1d, 0x17, 0x91, 0x73, 0xe5,
0xf4, 0x63, 0x83, 0xbe, 0x93, 0xa5, 0xa9, 0x13, 0x1b, 0x9b, 0x7c, 0x03, 0x18, 0x2e, 0xc9, 0x7d, 0x4d, 0xc2, 0x64, 0x72, 0x60, 0x2b, 0x4b, 0xe3, 0x77, 0x0d, 0xfa, 0x5a, 0x96, 0x79, 0x4d, 0x6c,
0xf0, 0xd6, 0x58, 0x77, 0xcd, 0x69, 0x8e, 0xe4, 0xb2, 0x6f, 0xd3, 0xe1, 0xed, 0xda, 0x7f, 0xdd, 0x6c, 0xf8, 0x05, 0xa0, 0x1f, 0xa2, 0x5b, 0xe0, 0x6d, 0x78, 0xdd, 0xf6, 0xaa, 0x39, 0xa2, 0x69,
0x95, 0xed, 0x85, 0xfb, 0xcc, 0xb9, 0x7e, 0x05, 0x0f, 0xb9, 0xda, 0xec, 0xc6, 0xe7, 0xce, 0xf7, 0xd7, 0xb5, 0xe3, 0x6f, 0x6f, 0x9b, 0xd0, 0x56, 0xf1, 0x89, 0xfb, 0xc8, 0x99, 0x3f, 0x83, 0x3b,
0xc0, 0x9e, 0xfe, 0xba, 0x67, 0x5f, 0xe3, 0x84, 0x35, 0x03, 0x1a, 0xa4, 0x6f, 0xb4, 0xee, 0x4c, 0x4c, 0x6d, 0xf7, 0xe3, 0x0b, 0xe7, 0x6b, 0x60, 0x4f, 0x7f, 0xdc, 0xcb, 0xcf, 0x09, 0xa1, 0xcd,
0xcb, 0xa0, 0x6d, 0xfb, 0xe5, 0xff, 0x00, 0x00, 0x00, 0xff, 0xff, 0x1f, 0xad, 0x5f, 0x54, 0x17, 0x82, 0x39, 0xc7, 0x2f, 0xb4, 0x6e, 0x4d, 0xdf, 0x02, 0xf3, 0x2f, 0x4c, 0xff, 0x05, 0x00, 0x00,
0x03, 0x00, 0x00, 0xff, 0xff, 0x98, 0xa8, 0x2f, 0xbe, 0x35, 0x03, 0x00, 0x00,
} }

View File

@ -20,7 +20,8 @@ message Policy {
} }
message Stats { message Stats {
bool enable_per_user = 1; bool user_uplink = 1;
bool user_downlink = 2;
} }
Timeout timeout = 1; Timeout timeout = 1;

View File

@ -20,7 +20,8 @@ type TimeoutPolicy struct {
} }
type StatsPolicy struct { type StatsPolicy struct {
EnablePerUser bool UserUplink bool
UserDownlink bool
} }
// Policy is session based settings for controlling V2Ray requests. It contains various settings (or limits) that may differ for different users in the context. // Policy is session based settings for controlling V2Ray requests. It contains various settings (or limits) that may differ for different users in the context.
@ -47,7 +48,8 @@ func DefaultPolicy() Policy {
DownlinkOnly: time.Second * 30, DownlinkOnly: time.Second * 30,
}, },
Stats: StatsPolicy{ Stats: StatsPolicy{
EnablePerUser: false, UserUplink: false,
UserDownlink: false,
}, },
} }
} }

View File

@ -390,7 +390,8 @@ func TestCommanderStats(t *testing.T) {
}, },
1: { 1: {
Stats: &policy.Policy_Stats{ Stats: &policy.Policy_Stats{
EnablePerUser: true, UserUplink: true,
UserDownlink: true,
}, },
}, },
}, },
@ -502,7 +503,7 @@ func TestCommanderStats(t *testing.T) {
cmdConn, err := grpc.Dial(fmt.Sprintf("127.0.0.1:%d", cmdPort), grpc.WithInsecure(), grpc.WithBlock()) cmdConn, err := grpc.Dial(fmt.Sprintf("127.0.0.1:%d", cmdPort), grpc.WithInsecure(), grpc.WithBlock())
assert(err, IsNil) assert(err, IsNil)
const name = "user>traffic>test" const name = "user>>>test>>>traffic>>>uplink"
sClient := statscmd.NewStatsServiceClient(cmdConn) sClient := statscmd.NewStatsServiceClient(cmdConn)
sresp, err := sClient.GetStats(context.Background(), &statscmd.GetStatsRequest{ sresp, err := sClient.GetStats(context.Background(), &statscmd.GetStatsRequest{
@ -511,7 +512,7 @@ func TestCommanderStats(t *testing.T) {
}) })
assert(err, IsNil) assert(err, IsNil)
assert(sresp.Stat.Name, Equals, name) assert(sresp.Stat.Name, Equals, name)
assert(sresp.Stat.Value, Equals, int64(10240*1024*2)) assert(sresp.Stat.Value, Equals, int64(10240*1024))
sresp, err = sClient.GetStats(context.Background(), &statscmd.GetStatsRequest{ sresp, err = sClient.GetStats(context.Background(), &statscmd.GetStatsRequest{
Name: name, Name: name,

View File

@ -12,15 +12,23 @@ import (
"v2ray.com/core/common/signal" "v2ray.com/core/common/signal"
) )
type Option func(*Stream) type Option func(*directRay)
type addInt64 interface { type addInt64 interface {
Add(int64) int64 Add(int64) int64
} }
func WithStatCounter(c addInt64) Option { func WithUplinkStatCounter(c addInt64) Option {
return func(s *Stream) { return func(s *directRay) {
s.onDataSize = append(s.onDataSize, func(delta uint64) { s.Input.onDataSize = append(s.Input.onDataSize, func(delta uint64) {
c.Add(int64(delta))
})
}
}
func WithDownlinkStatCounter(c addInt64) Option {
return func(s *directRay) {
s.Output.onDataSize = append(s.Output.onDataSize, func(delta uint64) {
c.Add(int64(delta)) c.Add(int64(delta))
}) })
} }
@ -28,10 +36,14 @@ func WithStatCounter(c addInt64) Option {
// New creates a new Ray for direct traffic transport. // New creates a new Ray for direct traffic transport.
func New(ctx context.Context, opts ...Option) Ray { func New(ctx context.Context, opts ...Option) Ray {
return &directRay{ r := &directRay{
Input: NewStream(ctx, opts...), Input: NewStream(ctx),
Output: NewStream(ctx, opts...), Output: NewStream(ctx),
} }
for _, opt := range opts {
opt(r)
}
return r
} }
type directRay struct { type directRay struct {
@ -80,16 +92,13 @@ type Stream struct {
} }
// NewStream creates a new Stream. // NewStream creates a new Stream.
func NewStream(ctx context.Context, opts ...Option) *Stream { func NewStream(ctx context.Context) *Stream {
s := &Stream{ s := &Stream{
ctx: ctx, ctx: ctx,
readSignal: signal.NewNotifier(), readSignal: signal.NewNotifier(),
writeSignal: signal.NewNotifier(), writeSignal: signal.NewNotifier(),
size: 0, size: 0,
} }
for _, opt := range opts {
opt(s)
}
return s return s
} }