From fa37f82b89abcebee452595b7ab795eb93162e3c Mon Sep 17 00:00:00 2001 From: Vigilans Date: Wed, 23 Sep 2020 22:35:13 +0800 Subject: [PATCH] Stats: Add ChannelConfig & Return error on subscription --- app/stats/channel.go | 81 +++++++++++++++----------- app/stats/channel_test.go | 118 ++++++++++++++++++++++---------------- app/stats/config.go | 15 ----- app/stats/config.pb.go | 103 ++++++++++++++++++++++++++++++--- app/stats/config.proto | 8 ++- app/stats/stats.go | 30 ++++++++-- app/stats/stats_test.go | 3 +- features/stats/stats.go | 25 +++++++- 8 files changed, 267 insertions(+), 116 deletions(-) delete mode 100644 app/stats/config.go diff --git a/app/stats/channel.go b/app/stats/channel.go index 478994ce..dd484fab 100644 --- a/app/stats/channel.go +++ b/app/stats/channel.go @@ -5,15 +5,33 @@ package stats import ( "sync" "time" + + "v2ray.com/core/common" ) // Channel is an implementation of stats.Channel. type Channel struct { + channel chan interface{} + subscribers []chan interface{} + + // Synchronization components access sync.RWMutex closed chan struct{} - channel chan interface{} - subscribers []chan interface{} + // Channel options + subscriberLimit int // Set to 0 as no subscriber limit + channelBufferSize int // Set to 0 as no buffering + broadcastTimeout time.Duration // Set to 0 as non-blocking immediate timeout +} + +// NewChannel creates an instance of Statistics Channel. +func NewChannel(config *ChannelConfig) *Channel { + return &Channel{ + channel: make(chan interface{}, config.BufferSize), + subscriberLimit: int(config.SubscriberLimit), + channelBufferSize: int(config.BufferSize), + broadcastTimeout: time.Duration(config.BroadcastTimeout+1) * time.Millisecond, + } } // Channel returns the underlying go channel. @@ -31,16 +49,19 @@ func (c *Channel) Subscribers() []chan interface{} { } // Subscribe implements stats.Channel. -func (c *Channel) Subscribe() chan interface{} { +func (c *Channel) Subscribe() (chan interface{}, error) { c.access.Lock() defer c.access.Unlock() - subscriber := make(chan interface{}) + if c.subscriberLimit > 0 && len(c.subscribers) >= c.subscriberLimit { + return nil, newError("Number of subscribers has reached limit") + } + subscriber := make(chan interface{}, c.channelBufferSize) c.subscribers = append(c.subscribers, subscriber) - return subscriber + return subscriber, nil } // Unsubscribe implements stats.Channel. -func (c *Channel) Unsubscribe(subscriber chan interface{}) { +func (c *Channel) Unsubscribe(subscriber chan interface{}) error { c.access.Lock() defer c.access.Unlock() for i, s := range c.subscribers { @@ -50,9 +71,9 @@ func (c *Channel) Unsubscribe(subscriber chan interface{}) { copy(subscribers[:i], c.subscribers[:i]) copy(subscribers[i:], c.subscribers[i+1:]) c.subscribers = subscribers - return } } + return nil } // Publish implements stats.Channel. @@ -85,34 +106,30 @@ func (c *Channel) Running() bool { func (c *Channel) Start() error { c.access.Lock() defer c.access.Unlock() - if c.Running() { - return nil - } - if c.channel == nil { // Initialize publisher channel - c.channel = make(chan interface{}, 16) - } - c.closed = make(chan struct{}) // Reset close signal - go func() { - for { - select { - case message := <-c.channel: // Broadcast message - for _, sub := range c.Subscribers() { // Concurrency-safe subscribers retreivement - select { - case sub <- message: // Successfully sent message - case <-time.After(100 * time.Millisecond): - c.Unsubscribe(sub) // Remove timeout subscriber - close(sub) // Actively close subscriber as notification + if !c.Running() { + c.closed = make(chan struct{}) // Reset close signal + go func() { + for { + select { + case message := <-c.channel: // Broadcast message + for _, sub := range c.Subscribers() { // Concurrency-safe subscribers retreivement + select { + case sub <- message: // Successfully sent message + case <-time.After(c.broadcastTimeout): // Remove timeout subscriber + common.Must(c.Unsubscribe(sub)) + close(sub) // Actively close subscriber as notification + } } + case <-c.closed: // Channel closed + for _, sub := range c.Subscribers() { // Remove all subscribers + common.Must(c.Unsubscribe(sub)) + close(sub) + } + return } - case <-c.closed: // Channel closed - for _, sub := range c.Subscribers() { // Remove all subscribers - c.Unsubscribe(sub) - close(sub) - } - return } - } - }() + }() + } return nil } diff --git a/app/stats/channel_test.go b/app/stats/channel_test.go index 8e78ddb7..6458711b 100644 --- a/app/stats/channel_test.go +++ b/app/stats/channel_test.go @@ -1,7 +1,6 @@ package stats_test import ( - "context" "fmt" "testing" "time" @@ -12,25 +11,30 @@ import ( ) func TestStatsChannel(t *testing.T) { - raw, err := common.CreateObject(context.Background(), &Config{}) + // At most 2 subscribers could be registered + c := NewChannel(&ChannelConfig{SubscriberLimit: 2}) + source := c.Channel() + + a, err := stats.SubscribeRunnableChannel(c) + common.Must(err) + if !c.Running() { + t.Fatal("unexpected failure in running channel after first subscription") + } + + b, err := c.Subscribe() common.Must(err) - m := raw.(stats.Manager) - c, err := m.RegisterChannel("test.channel") - common.Must(err) - common.Must(m.Start()) - defer m.Close() - - source := c.(*Channel).Channel() - a := c.Subscribe() - b := c.Subscribe() - defer c.Unsubscribe(a) - defer c.Unsubscribe(b) + // Test that third subscriber is forbidden + _, err = c.Subscribe() + if err == nil { + t.Fatal("unexpected successful subscription") + } + t.Log("expected error: ", err) stopCh := make(chan struct{}) errCh := make(chan string) - go func() { + go func() { // Blocking publish source <- 1 source <- 2 source <- "3" @@ -84,22 +88,31 @@ func TestStatsChannel(t *testing.T) { t.Fatal(e) case <-stopCh: } + + // Test the unsubscription of channel + common.Must(c.Unsubscribe(b)) + + // Test the last subscriber will close channel with `UnsubscribeClosableChannel` + common.Must(stats.UnsubscribeClosableChannel(c, a)) + if c.Running() { + t.Fatal("unexpected running channel after unsubscribing the last subscriber") + } } func TestStatsChannelUnsubcribe(t *testing.T) { - raw, err := common.CreateObject(context.Background(), &Config{}) - common.Must(err) + c := NewChannel(&ChannelConfig{}) + common.Must(c.Start()) + defer c.Close() - m := raw.(stats.Manager) - c, err := m.RegisterChannel("test.channel") - common.Must(err) - common.Must(m.Start()) - defer m.Close() + source := c.Channel() - a := c.Subscribe() - b := c.Subscribe() + a, err := c.Subscribe() + common.Must(err) defer c.Unsubscribe(a) + b, err := c.Subscribe() + common.Must(err) + pauseCh := make(chan struct{}) stopCh := make(chan struct{}) errCh := make(chan string) @@ -119,10 +132,10 @@ func TestStatsChannelUnsubcribe(t *testing.T) { } } - go func() { - c.Publish(1) + go func() { // Blocking publish + source <- 1 <-pauseCh // Wait for `b` goroutine to resume sending message - c.Publish(2) + source <- 2 }() go func() { @@ -179,26 +192,27 @@ func TestStatsChannelUnsubcribe(t *testing.T) { } func TestStatsChannelTimeout(t *testing.T) { - raw, err := common.CreateObject(context.Background(), &Config{}) - common.Must(err) + // Do not use buffer so as to create blocking scenario + c := NewChannel(&ChannelConfig{BufferSize: 0, BroadcastTimeout: 50}) + common.Must(c.Start()) + defer c.Close() - m := raw.(stats.Manager) - c, err := m.RegisterChannel("test.channel") - common.Must(err) - common.Must(m.Start()) - defer m.Close() + source := c.Channel() - a := c.Subscribe() - b := c.Subscribe() + a, err := c.Subscribe() + common.Must(err) defer c.Unsubscribe(a) + + b, err := c.Subscribe() + common.Must(err) defer c.Unsubscribe(b) stopCh := make(chan struct{}) errCh := make(chan string) - go func() { - c.Publish(1) - c.Publish(2) + go func() { // Blocking publish + source <- 1 + source <- 2 }() go func() { @@ -229,7 +243,7 @@ func TestStatsChannelTimeout(t *testing.T) { errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 1) } // Block `b` channel for a time longer than `source`'s timeout - <-time.After(150 * time.Millisecond) + <-time.After(200 * time.Millisecond) { // Test `b` has been unsubscribed by source var aSet, bSet bool for _, s := range c.Subscribers() { @@ -264,25 +278,27 @@ func TestStatsChannelTimeout(t *testing.T) { } func TestStatsChannelConcurrency(t *testing.T) { - raw, err := common.CreateObject(context.Background(), &Config{}) - common.Must(err) + // Do not use buffer so as to create blocking scenario + c := NewChannel(&ChannelConfig{BufferSize: 0, BroadcastTimeout: 100}) + common.Must(c.Start()) + defer c.Close() - m := raw.(stats.Manager) - c, err := m.RegisterChannel("test.channel") - common.Must(err) - common.Must(m.Start()) - defer m.Close() + source := c.Channel() - a := c.Subscribe() - b := c.Subscribe() + a, err := c.Subscribe() + common.Must(err) defer c.Unsubscribe(a) + b, err := c.Subscribe() + common.Must(err) + defer c.Unsubscribe(b) + stopCh := make(chan struct{}) errCh := make(chan string) - go func() { - c.Publish(1) - c.Publish(2) + go func() { // Blocking publish + source <- 1 + source <- 2 }() go func() { diff --git a/app/stats/config.go b/app/stats/config.go deleted file mode 100644 index e124b17a..00000000 --- a/app/stats/config.go +++ /dev/null @@ -1,15 +0,0 @@ -// +build !confonly - -package stats - -import ( - "context" - - "v2ray.com/core/common" -) - -func init() { - common.Must(common.RegisterConfig((*Config)(nil), func(ctx context.Context, config interface{}) (interface{}, error) { - return NewManager(ctx, config.(*Config)) - })) -} diff --git a/app/stats/config.pb.go b/app/stats/config.pb.go index f430b641..f9402fc7 100644 --- a/app/stats/config.pb.go +++ b/app/stats/config.pb.go @@ -63,18 +63,90 @@ func (*Config) Descriptor() ([]byte, []int) { return file_app_stats_config_proto_rawDescGZIP(), []int{0} } +type ChannelConfig struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + SubscriberLimit int32 `protobuf:"varint,1,opt,name=SubscriberLimit,proto3" json:"SubscriberLimit,omitempty"` + BufferSize int32 `protobuf:"varint,2,opt,name=BufferSize,proto3" json:"BufferSize,omitempty"` + BroadcastTimeout int32 `protobuf:"varint,3,opt,name=BroadcastTimeout,proto3" json:"BroadcastTimeout,omitempty"` +} + +func (x *ChannelConfig) Reset() { + *x = ChannelConfig{} + if protoimpl.UnsafeEnabled { + mi := &file_app_stats_config_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ChannelConfig) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ChannelConfig) ProtoMessage() {} + +func (x *ChannelConfig) ProtoReflect() protoreflect.Message { + mi := &file_app_stats_config_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ChannelConfig.ProtoReflect.Descriptor instead. +func (*ChannelConfig) Descriptor() ([]byte, []int) { + return file_app_stats_config_proto_rawDescGZIP(), []int{1} +} + +func (x *ChannelConfig) GetSubscriberLimit() int32 { + if x != nil { + return x.SubscriberLimit + } + return 0 +} + +func (x *ChannelConfig) GetBufferSize() int32 { + if x != nil { + return x.BufferSize + } + return 0 +} + +func (x *ChannelConfig) GetBroadcastTimeout() int32 { + if x != nil { + return x.BroadcastTimeout + } + return 0 +} + var File_app_stats_config_proto protoreflect.FileDescriptor var file_app_stats_config_proto_rawDesc = []byte{ 0x0a, 0x16, 0x61, 0x70, 0x70, 0x2f, 0x73, 0x74, 0x61, 0x74, 0x73, 0x2f, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x14, 0x76, 0x32, 0x72, 0x61, 0x79, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x61, 0x70, 0x70, 0x2e, 0x73, 0x74, 0x61, 0x74, 0x73, 0x22, 0x08, - 0x0a, 0x06, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x42, 0x4d, 0x0a, 0x18, 0x63, 0x6f, 0x6d, 0x2e, - 0x76, 0x32, 0x72, 0x61, 0x79, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x61, 0x70, 0x70, 0x2e, 0x73, - 0x74, 0x61, 0x74, 0x73, 0x50, 0x01, 0x5a, 0x18, 0x76, 0x32, 0x72, 0x61, 0x79, 0x2e, 0x63, 0x6f, - 0x6d, 0x2f, 0x63, 0x6f, 0x72, 0x65, 0x2f, 0x61, 0x70, 0x70, 0x2f, 0x73, 0x74, 0x61, 0x74, 0x73, - 0xaa, 0x02, 0x14, 0x56, 0x32, 0x52, 0x61, 0x79, 0x2e, 0x43, 0x6f, 0x72, 0x65, 0x2e, 0x41, 0x70, - 0x70, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x73, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x0a, 0x06, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x22, 0x85, 0x01, 0x0a, 0x0d, 0x43, 0x68, 0x61, + 0x6e, 0x6e, 0x65, 0x6c, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x28, 0x0a, 0x0f, 0x53, 0x75, + 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x72, 0x4c, 0x69, 0x6d, 0x69, 0x74, 0x18, 0x01, 0x20, + 0x01, 0x28, 0x05, 0x52, 0x0f, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x72, 0x4c, + 0x69, 0x6d, 0x69, 0x74, 0x12, 0x1e, 0x0a, 0x0a, 0x42, 0x75, 0x66, 0x66, 0x65, 0x72, 0x53, 0x69, + 0x7a, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x0a, 0x42, 0x75, 0x66, 0x66, 0x65, 0x72, + 0x53, 0x69, 0x7a, 0x65, 0x12, 0x2a, 0x0a, 0x10, 0x42, 0x72, 0x6f, 0x61, 0x64, 0x63, 0x61, 0x73, + 0x74, 0x54, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x05, 0x52, 0x10, + 0x42, 0x72, 0x6f, 0x61, 0x64, 0x63, 0x61, 0x73, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, + 0x42, 0x4d, 0x0a, 0x18, 0x63, 0x6f, 0x6d, 0x2e, 0x76, 0x32, 0x72, 0x61, 0x79, 0x2e, 0x63, 0x6f, + 0x72, 0x65, 0x2e, 0x61, 0x70, 0x70, 0x2e, 0x73, 0x74, 0x61, 0x74, 0x73, 0x50, 0x01, 0x5a, 0x18, + 0x76, 0x32, 0x72, 0x61, 0x79, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x63, 0x6f, 0x72, 0x65, 0x2f, 0x61, + 0x70, 0x70, 0x2f, 0x73, 0x74, 0x61, 0x74, 0x73, 0xaa, 0x02, 0x14, 0x56, 0x32, 0x52, 0x61, 0x79, + 0x2e, 0x43, 0x6f, 0x72, 0x65, 0x2e, 0x41, 0x70, 0x70, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x73, 0x62, + 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -89,9 +161,10 @@ func file_app_stats_config_proto_rawDescGZIP() []byte { return file_app_stats_config_proto_rawDescData } -var file_app_stats_config_proto_msgTypes = make([]protoimpl.MessageInfo, 1) +var file_app_stats_config_proto_msgTypes = make([]protoimpl.MessageInfo, 2) var file_app_stats_config_proto_goTypes = []interface{}{ - (*Config)(nil), // 0: v2ray.core.app.stats.Config + (*Config)(nil), // 0: v2ray.core.app.stats.Config + (*ChannelConfig)(nil), // 1: v2ray.core.app.stats.ChannelConfig } var file_app_stats_config_proto_depIdxs = []int32{ 0, // [0:0] is the sub-list for method output_type @@ -119,6 +192,18 @@ func file_app_stats_config_proto_init() { return nil } } + file_app_stats_config_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ChannelConfig); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } } type x struct{} out := protoimpl.TypeBuilder{ @@ -126,7 +211,7 @@ func file_app_stats_config_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_app_stats_config_proto_rawDesc, NumEnums: 0, - NumMessages: 1, + NumMessages: 2, NumExtensions: 0, NumServices: 0, }, diff --git a/app/stats/config.proto b/app/stats/config.proto index bcbf847f..0ea911fd 100644 --- a/app/stats/config.proto +++ b/app/stats/config.proto @@ -7,5 +7,11 @@ option java_package = "com.v2ray.core.app.stats"; option java_multiple_files = true; message Config { - + +} + +message ChannelConfig { + int32 SubscriberLimit = 1; + int32 BufferSize = 2; + int32 BroadcastTimeout = 3; } diff --git a/app/stats/stats.go b/app/stats/stats.go index 0dd91ea4..8a5a1eb0 100644 --- a/app/stats/stats.go +++ b/app/stats/stats.go @@ -8,6 +8,8 @@ import ( "context" "sync" + "v2ray.com/core/common" + "v2ray.com/core/common/errors" "v2ray.com/core/features/stats" ) @@ -92,10 +94,10 @@ func (m *Manager) RegisterChannel(name string) (stats.Channel, error) { return nil, newError("Channel ", name, " already registered.") } newError("create new channel ", name).AtDebug().WriteToLog() - c := new(Channel) + c := NewChannel(&ChannelConfig{BufferSize: 16, BroadcastTimeout: 100}) m.channels[name] = c if m.running { - c.Start() + return c, c.Start() } return c, nil } @@ -108,7 +110,7 @@ func (m *Manager) UnregisterChannel(name string) error { if c, found := m.channels[name]; found { newError("remove channel ", name).AtDebug().WriteToLog() delete(m.channels, name) - c.Close() + return c.Close() } return nil } @@ -129,8 +131,14 @@ func (m *Manager) Start() error { m.access.Lock() defer m.access.Unlock() m.running = true + errs := []error{} for _, channel := range m.channels { - channel.Start() + if err := channel.Start(); err != nil { + errs = append(errs, err) + } + } + if len(errs) != 0 { + return errors.Combine(errs...) } return nil } @@ -140,10 +148,22 @@ func (m *Manager) Close() error { m.access.Lock() defer m.access.Unlock() m.running = false + errs := []error{} for name, channel := range m.channels { newError("remove channel ", name).AtDebug().WriteToLog() delete(m.channels, name) - channel.Close() + if err := channel.Close(); err != nil { + errs = append(errs, err) + } + } + if len(errs) != 0 { + return errors.Combine(errs...) } return nil } + +func init() { + common.Must(common.RegisterConfig((*Config)(nil), func(ctx context.Context, config interface{}) (interface{}, error) { + return NewManager(ctx, config.(*Config)) + })) +} diff --git a/app/stats/stats_test.go b/app/stats/stats_test.go index f4079b80..1641021d 100644 --- a/app/stats/stats_test.go +++ b/app/stats/stats_test.go @@ -42,7 +42,8 @@ func TestStatsChannelRunnable(t *testing.T) { t.Fatalf("unexpected non-running channel: test.channel.%d", 2) } - s1 := c1.Subscribe() + s1, err := c1.Subscribe() + common.Must(err) common.Must(c1.Close()) if c1.Running() { diff --git a/features/stats/stats.go b/features/stats/stats.go index cfe6d307..73fae0f4 100644 --- a/features/stats/stats.go +++ b/features/stats/stats.go @@ -30,9 +30,30 @@ type Channel interface { // SubscriberCount returns the number of the subscribers. Subscribers() []chan interface{} // Subscribe registers for listening to channel stream and returns a new listener channel. - Subscribe() chan interface{} + Subscribe() (chan interface{}, error) // Unsubscribe unregisters a listener channel from current Channel object. - Unsubscribe(chan interface{}) + Unsubscribe(chan interface{}) error +} + +// SubscribeRunnableChannel subscribes the channel and starts it if there is first subscriber coming. +func SubscribeRunnableChannel(c Channel) (chan interface{}, error) { + if len(c.Subscribers()) == 0 { + if err := c.Start(); err != nil { + return nil, err + } + } + return c.Subscribe() +} + +// UnsubscribeClosableChannel unsubcribes the channel and close it if there is no more subscriber. +func UnsubscribeClosableChannel(c Channel, sub chan interface{}) error { + if err := c.Unsubscribe(sub); err != nil { + return err + } + if len(c.Subscribers()) == 0 { + return c.Close() + } + return nil } // Manager is the interface for stats manager.