Stats: Implements blocking/non-blocking messaging of Channel (#250)

pull/2757/head
Ye Zhihao 2020-10-03 03:06:32 +08:00 committed by GitHub
parent 9f344fa1c9
commit 67f409de04
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 401 additions and 283 deletions

View File

@ -6,6 +6,7 @@ package command
import ( import (
"context" "context"
"time"
"google.golang.org/grpc" "google.golang.org/grpc"
@ -38,7 +39,8 @@ func (s *routingServer) TestRoute(ctx context.Context, request *TestRouteRequest
return nil, err return nil, err
} }
if request.PublishResult && s.routingStats != nil { if request.PublishResult && s.routingStats != nil {
s.routingStats.Publish(route) ctx, _ := context.WithTimeout(context.Background(), 4*time.Second) // nolint: lostcancel
s.routingStats.Publish(ctx, route)
} }
return AsProtobufMessage(request.FieldSelectors)(route), nil return AsProtobufMessage(request.FieldSelectors)(route), nil
} }
@ -55,10 +57,13 @@ func (s *routingServer) SubscribeRoutingStats(request *SubscribeRoutingStatsRequ
defer stats.UnsubscribeClosableChannel(s.routingStats, subscriber) // nolint: errcheck defer stats.UnsubscribeClosableChannel(s.routingStats, subscriber) // nolint: errcheck
for { for {
select { select {
case value, received := <-subscriber: case value, ok := <-subscriber:
if !ok {
return newError("Upstream closed the subscriber channel.")
}
route, ok := value.(routing.Route) route, ok := value.(routing.Route)
if !(received && ok) { if !ok {
return newError("Receiving upstream statistics failed.") return newError("Upstream sent malformed statistics.")
} }
err := stream.Send(genMessage(route)) err := stream.Send(genMessage(route))
if err != nil { if err != nil {

View File

@ -21,9 +21,9 @@ import (
func TestServiceSubscribeRoutingStats(t *testing.T) { func TestServiceSubscribeRoutingStats(t *testing.T) {
c := stats.NewChannel(&stats.ChannelConfig{ c := stats.NewChannel(&stats.ChannelConfig{
SubscriberLimit: 1, SubscriberLimit: 1,
BufferSize: 16, BufferSize: 0,
BroadcastTimeout: 100, Blocking: true,
}) })
common.Must(c.Start()) common.Must(c.Start())
defer c.Close() defer c.Close()
@ -55,122 +55,138 @@ func TestServiceSubscribeRoutingStats(t *testing.T) {
// Publisher goroutine // Publisher goroutine
go func() { go func() {
ctx, cancel := context.WithTimeout(context.Background(), time.Second) publishTestCases := func() error {
defer cancel() ctx, cancel := context.WithTimeout(context.Background(), time.Second)
for { // Wait until there's one subscriber in routing stats channel defer cancel()
if len(c.Subscribers()) > 0 { for { // Wait until there's one subscriber in routing stats channel
break if len(c.Subscribers()) > 0 {
break
}
if ctx.Err() != nil {
return ctx.Err()
}
} }
if ctx.Err() != nil { for _, tc := range testCases {
errCh <- ctx.Err() c.Publish(context.Background(), AsRoutingRoute(tc))
time.Sleep(time.Millisecond)
} }
return nil
} }
for _, tc := range testCases {
c.Publish(AsRoutingRoute(tc)) if err := publishTestCases(); err != nil {
errCh <- err
} }
// Wait for next round of publishing // Wait for next round of publishing
<-nextPub <-nextPub
ctx, cancel = context.WithTimeout(context.Background(), time.Second) if err := publishTestCases(); err != nil {
defer cancel() errCh <- err
for { // Wait until there's one subscriber in routing stats channel
if len(c.Subscribers()) > 0 {
break
}
if ctx.Err() != nil {
errCh <- ctx.Err()
}
}
for _, tc := range testCases {
c.Publish(AsRoutingRoute(tc))
} }
}() }()
// Client goroutine // Client goroutine
go func() { go func() {
defer lis.Close()
conn, err := grpc.DialContext(context.Background(), "bufnet", grpc.WithContextDialer(bufDialer), grpc.WithInsecure()) conn, err := grpc.DialContext(context.Background(), "bufnet", grpc.WithContextDialer(bufDialer), grpc.WithInsecure())
if err != nil { if err != nil {
errCh <- err errCh <- err
return
} }
defer lis.Close()
defer conn.Close() defer conn.Close()
client := NewRoutingServiceClient(conn) client := NewRoutingServiceClient(conn)
// Test retrieving all fields // Test retrieving all fields
streamCtx, streamClose := context.WithCancel(context.Background()) testRetrievingAllFields := func() error {
stream, err := client.SubscribeRoutingStats(streamCtx, &SubscribeRoutingStatsRequest{}) streamCtx, streamClose := context.WithCancel(context.Background())
if err != nil {
errCh <- err
}
for _, tc := range testCases { // Test the unsubscription of stream works well
msg, err := stream.Recv() defer func() {
streamClose()
timeOutCtx, timeout := context.WithTimeout(context.Background(), time.Second)
defer timeout()
for { // Wait until there's no subscriber in routing stats channel
if len(c.Subscribers()) == 0 {
break
}
if timeOutCtx.Err() != nil {
t.Error("unexpected subscribers not decreased in channel", timeOutCtx.Err())
}
}
}()
stream, err := client.SubscribeRoutingStats(streamCtx, &SubscribeRoutingStatsRequest{})
if err != nil { if err != nil {
errCh <- err return err
} }
if r := cmp.Diff(msg, tc, cmpopts.IgnoreUnexported(RoutingContext{})); r != "" {
t.Error(r)
}
}
// Test that double subscription will fail for _, tc := range testCases {
errStream, err := client.SubscribeRoutingStats(context.Background(), &SubscribeRoutingStatsRequest{ msg, err := stream.Recv()
FieldSelectors: []string{"ip", "port", "domain", "outbound"}, if err != nil {
}) return err
if err != nil { }
errCh <- err if r := cmp.Diff(msg, tc, cmpopts.IgnoreUnexported(RoutingContext{})); r != "" {
} t.Error(r)
if _, err := errStream.Recv(); err == nil { }
t.Error("unexpected successful subscription") }
}
// Test the unsubscription of stream works well // Test that double subscription will fail
streamClose() errStream, err := client.SubscribeRoutingStats(context.Background(), &SubscribeRoutingStatsRequest{
timeOutCtx, timeout := context.WithTimeout(context.Background(), time.Second) FieldSelectors: []string{"ip", "port", "domain", "outbound"},
defer timeout() })
for { // Wait until there's no subscriber in routing stats channel if err != nil {
if len(c.Subscribers()) == 0 { return err
break
} }
if timeOutCtx.Err() != nil { if _, err := errStream.Recv(); err == nil {
t.Error("unexpected subscribers not decreased in channel") t.Error("unexpected successful subscription")
errCh <- timeOutCtx.Err()
} }
return nil
} }
// Test retrieving only a subset of fields // Test retrieving only a subset of fields
streamCtx, streamClose = context.WithCancel(context.Background()) testRetrievingSubsetOfFields := func() error {
stream, err = client.SubscribeRoutingStats(streamCtx, &SubscribeRoutingStatsRequest{ streamCtx, streamClose := context.WithCancel(context.Background())
FieldSelectors: []string{"ip", "port", "domain", "outbound"}, defer streamClose()
}) stream, err := client.SubscribeRoutingStats(streamCtx, &SubscribeRoutingStatsRequest{
if err != nil { FieldSelectors: []string{"ip", "port", "domain", "outbound"},
})
if err != nil {
return err
}
// Send nextPub signal to start next round of publishing
close(nextPub)
for _, tc := range testCases {
msg, err := stream.Recv()
if err != nil {
return err
}
stat := &RoutingContext{ // Only a subset of stats is retrieved
SourceIPs: tc.SourceIPs,
TargetIPs: tc.TargetIPs,
SourcePort: tc.SourcePort,
TargetPort: tc.TargetPort,
TargetDomain: tc.TargetDomain,
OutboundGroupTags: tc.OutboundGroupTags,
OutboundTag: tc.OutboundTag,
}
if r := cmp.Diff(msg, stat, cmpopts.IgnoreUnexported(RoutingContext{})); r != "" {
t.Error(r)
}
}
return nil
}
if err := testRetrievingAllFields(); err != nil {
errCh <- err errCh <- err
} }
if err := testRetrievingSubsetOfFields(); err != nil {
close(nextPub) // Send nextPub signal to start next round of publishing errCh <- err
for _, tc := range testCases {
msg, err := stream.Recv()
stat := &RoutingContext{ // Only a subset of stats is retrieved
SourceIPs: tc.SourceIPs,
TargetIPs: tc.TargetIPs,
SourcePort: tc.SourcePort,
TargetPort: tc.TargetPort,
TargetDomain: tc.TargetDomain,
OutboundGroupTags: tc.OutboundGroupTags,
OutboundTag: tc.OutboundTag,
}
if err != nil {
errCh <- err
}
if r := cmp.Diff(msg, stat, cmpopts.IgnoreUnexported(RoutingContext{})); r != "" {
t.Error(r)
}
} }
streamClose() errCh <- nil // Client passed all tests successfully
// Client passed all tests successfully
errCh <- nil
}() }()
// Wait for goroutines to complete // Wait for goroutines to complete
@ -186,9 +202,9 @@ func TestServiceSubscribeRoutingStats(t *testing.T) {
func TestSerivceTestRoute(t *testing.T) { func TestSerivceTestRoute(t *testing.T) {
c := stats.NewChannel(&stats.ChannelConfig{ c := stats.NewChannel(&stats.ChannelConfig{
SubscriberLimit: 1, SubscriberLimit: 1,
BufferSize: 16, BufferSize: 16,
BroadcastTimeout: 100, Blocking: true,
}) })
common.Must(c.Start()) common.Must(c.Start())
defer c.Close() defer c.Close()
@ -249,11 +265,11 @@ func TestSerivceTestRoute(t *testing.T) {
// Client goroutine // Client goroutine
go func() { go func() {
defer lis.Close()
conn, err := grpc.DialContext(context.Background(), "bufnet", grpc.WithContextDialer(bufDialer), grpc.WithInsecure()) conn, err := grpc.DialContext(context.Background(), "bufnet", grpc.WithContextDialer(bufDialer), grpc.WithInsecure())
if err != nil { if err != nil {
errCh <- err errCh <- err
} }
defer lis.Close()
defer conn.Close() defer conn.Close()
client := NewRoutingServiceClient(conn) client := NewRoutingServiceClient(conn)
@ -268,58 +284,69 @@ func TestSerivceTestRoute(t *testing.T) {
} }
// Test simple TestRoute // Test simple TestRoute
for _, tc := range testCases { testSimple := func() error {
route, err := client.TestRoute(context.Background(), &TestRouteRequest{RoutingContext: tc}) for _, tc := range testCases {
if err != nil { route, err := client.TestRoute(context.Background(), &TestRouteRequest{RoutingContext: tc})
errCh <- err if err != nil {
} return err
if r := cmp.Diff(route, tc, cmpopts.IgnoreUnexported(RoutingContext{})); r != "" { }
t.Error(r) if r := cmp.Diff(route, tc, cmpopts.IgnoreUnexported(RoutingContext{})); r != "" {
t.Error(r)
}
} }
return nil
} }
// Test TestRoute with special options // Test TestRoute with special options
sub, err := c.Subscribe() testOptions := func() error {
if err != nil { sub, err := c.Subscribe()
errCh <- err
}
for _, tc := range testCases {
route, err := client.TestRoute(context.Background(), &TestRouteRequest{
RoutingContext: tc,
FieldSelectors: []string{"ip", "port", "domain", "outbound"},
PublishResult: true,
})
stat := &RoutingContext{ // Only a subset of stats is retrieved
SourceIPs: tc.SourceIPs,
TargetIPs: tc.TargetIPs,
SourcePort: tc.SourcePort,
TargetPort: tc.TargetPort,
TargetDomain: tc.TargetDomain,
OutboundGroupTags: tc.OutboundGroupTags,
OutboundTag: tc.OutboundTag,
}
if err != nil { if err != nil {
errCh <- err return err
} }
if r := cmp.Diff(route, stat, cmpopts.IgnoreUnexported(RoutingContext{})); r != "" { for _, tc := range testCases {
t.Error(r) route, err := client.TestRoute(context.Background(), &TestRouteRequest{
} RoutingContext: tc,
select { // Check that routing result has been published to statistics channel FieldSelectors: []string{"ip", "port", "domain", "outbound"},
case msg, received := <-sub: PublishResult: true,
if route, ok := msg.(routing.Route); received && ok { })
if r := cmp.Diff(AsProtobufMessage(nil)(route), tc, cmpopts.IgnoreUnexported(RoutingContext{})); r != "" { if err != nil {
t.Error(r) return err
} }
} else { stat := &RoutingContext{ // Only a subset of stats is retrieved
t.Error("unexpected failure in receiving published routing result") SourceIPs: tc.SourceIPs,
TargetIPs: tc.TargetIPs,
SourcePort: tc.SourcePort,
TargetPort: tc.TargetPort,
TargetDomain: tc.TargetDomain,
OutboundGroupTags: tc.OutboundGroupTags,
OutboundTag: tc.OutboundTag,
}
if r := cmp.Diff(route, stat, cmpopts.IgnoreUnexported(RoutingContext{})); r != "" {
t.Error(r)
}
select { // Check that routing result has been published to statistics channel
case msg, received := <-sub:
if route, ok := msg.(routing.Route); received && ok {
if r := cmp.Diff(AsProtobufMessage(nil)(route), tc, cmpopts.IgnoreUnexported(RoutingContext{})); r != "" {
t.Error(r)
}
} else {
t.Error("unexpected failure in receiving published routing result for testcase", tc)
}
case <-time.After(100 * time.Millisecond):
t.Error("unexpected failure in receiving published routing result", tc)
} }
case <-time.After(100 * time.Millisecond):
t.Error("unexpected failure in receiving published routing result")
} }
return nil
} }
// Client passed all tests successfully if err := testSimple(); err != nil {
errCh <- nil errCh <- err
}
if err := testOptions(); err != nil {
errCh <- err
}
errCh <- nil // Client passed all tests successfully
}() }()
// Wait for goroutines to complete // Wait for goroutines to complete

View File

@ -3,15 +3,15 @@
package stats package stats
import ( import (
"context"
"sync" "sync"
"time"
"v2ray.com/core/common" "v2ray.com/core/common"
) )
// Channel is an implementation of stats.Channel. // Channel is an implementation of stats.Channel.
type Channel struct { type Channel struct {
channel chan interface{} channel chan channelMessage
subscribers []chan interface{} subscribers []chan interface{}
// Synchronization components // Synchronization components
@ -19,28 +19,21 @@ type Channel struct {
closed chan struct{} closed chan struct{}
// Channel options // Channel options
subscriberLimit int // Set to 0 as no subscriber limit blocking bool // Set blocking state if channel buffer reaches limit
channelBufferSize int // Set to 0 as no buffering bufferSize int // Set to 0 as no buffering
broadcastTimeout time.Duration // Set to 0 as non-blocking immediate timeout subsLimit int // Set to 0 as no subscriber limit
} }
// NewChannel creates an instance of Statistics Channel. // NewChannel creates an instance of Statistics Channel.
func NewChannel(config *ChannelConfig) *Channel { func NewChannel(config *ChannelConfig) *Channel {
return &Channel{ return &Channel{
channel: make(chan interface{}, config.BufferSize), channel: make(chan channelMessage, config.BufferSize),
subscriberLimit: int(config.SubscriberLimit), subsLimit: int(config.SubscriberLimit),
channelBufferSize: int(config.BufferSize), bufferSize: int(config.BufferSize),
broadcastTimeout: time.Duration(config.BroadcastTimeout+1) * time.Millisecond, blocking: config.Blocking,
} }
} }
// Channel returns the underlying go channel.
func (c *Channel) Channel() chan interface{} {
c.access.RLock()
defer c.access.RUnlock()
return c.channel
}
// Subscribers implements stats.Channel. // Subscribers implements stats.Channel.
func (c *Channel) Subscribers() []chan interface{} { func (c *Channel) Subscribers() []chan interface{} {
c.access.RLock() c.access.RLock()
@ -52,10 +45,10 @@ func (c *Channel) Subscribers() []chan interface{} {
func (c *Channel) Subscribe() (chan interface{}, error) { func (c *Channel) Subscribe() (chan interface{}, error) {
c.access.Lock() c.access.Lock()
defer c.access.Unlock() defer c.access.Unlock()
if c.subscriberLimit > 0 && len(c.subscribers) >= c.subscriberLimit { if c.subsLimit > 0 && len(c.subscribers) >= c.subsLimit {
return nil, newError("Number of subscribers has reached limit") return nil, newError("Number of subscribers has reached limit")
} }
subscriber := make(chan interface{}, c.channelBufferSize) subscriber := make(chan interface{}, c.bufferSize)
c.subscribers = append(c.subscribers, subscriber) c.subscribers = append(c.subscribers, subscriber)
return subscriber, nil return subscriber, nil
} }
@ -77,16 +70,17 @@ func (c *Channel) Unsubscribe(subscriber chan interface{}) error {
} }
// Publish implements stats.Channel. // Publish implements stats.Channel.
func (c *Channel) Publish(message interface{}) { func (c *Channel) Publish(ctx context.Context, msg interface{}) {
select { // Early exit if channel closed select { // Early exit if channel closed
case <-c.closed: case <-c.closed:
return return
default: default:
} pub := channelMessage{context: ctx, message: msg}
select { // Drop message if not successfully sent if c.blocking {
case c.channel <- message: pub.publish(c.channel)
default: } else {
return pub.publishNonBlocking(c.channel)
}
} }
} }
@ -111,13 +105,12 @@ func (c *Channel) Start() error {
go func() { go func() {
for { for {
select { select {
case message := <-c.channel: // Broadcast message case pub := <-c.channel: // Published message received
for _, sub := range c.Subscribers() { // Concurrency-safe subscribers retreivement for _, sub := range c.Subscribers() { // Concurrency-safe subscribers retrievement
select { if c.blocking {
case sub <- message: // Successfully sent message pub.broadcast(sub)
case <-time.After(c.broadcastTimeout): // Remove timeout subscriber } else {
common.Must(c.Unsubscribe(sub)) pub.broadcastNonBlocking(sub)
close(sub) // Actively close subscriber as notification
} }
} }
case <-c.closed: // Channel closed case <-c.closed: // Channel closed
@ -142,3 +135,40 @@ func (c *Channel) Close() error {
} }
return nil return nil
} }
// channelMessage is the published message with guaranteed delivery.
// message is discarded only when the context is early cancelled.
type channelMessage struct {
context context.Context
message interface{}
}
func (c channelMessage) publish(publisher chan channelMessage) {
select {
case publisher <- c:
case <-c.context.Done():
}
}
func (c channelMessage) publishNonBlocking(publisher chan channelMessage) {
select {
case publisher <- c:
default: // Create another goroutine to keep sending message
go c.publish(publisher)
}
}
func (c channelMessage) broadcast(subscriber chan interface{}) {
select {
case subscriber <- c.message:
case <-c.context.Done():
}
}
func (c channelMessage) broadcastNonBlocking(subscriber chan interface{}) {
select {
case subscriber <- c.message:
default: // Create another goroutine to keep sending message
go c.broadcast(subscriber)
}
}

View File

@ -1,6 +1,7 @@
package stats_test package stats_test
import ( import (
"context"
"fmt" "fmt"
"testing" "testing"
"time" "time"
@ -12,8 +13,7 @@ import (
func TestStatsChannel(t *testing.T) { func TestStatsChannel(t *testing.T) {
// At most 2 subscribers could be registered // At most 2 subscribers could be registered
c := NewChannel(&ChannelConfig{SubscriberLimit: 2}) c := NewChannel(&ChannelConfig{SubscriberLimit: 2, Blocking: true})
source := c.Channel()
a, err := stats.SubscribeRunnableChannel(c) a, err := stats.SubscribeRunnableChannel(c)
common.Must(err) common.Must(err)
@ -34,21 +34,12 @@ func TestStatsChannel(t *testing.T) {
stopCh := make(chan struct{}) stopCh := make(chan struct{})
errCh := make(chan string) errCh := make(chan string)
go func() { // Blocking publish go func() {
source <- 1 c.Publish(context.Background(), 1)
source <- 2 c.Publish(context.Background(), 2)
source <- "3" c.Publish(context.Background(), "3")
source <- []int{4} c.Publish(context.Background(), []int{4})
source <- nil // Dummy messsage with no subscriber receiving, will block reading goroutine stopCh <- struct{}{}
for i := 0; i < cap(source); i++ {
source <- nil // Fill source channel's buffer
}
select {
case source <- nil: // Source writing should be blocked here, for last message was not cleared and buffer was full
errCh <- fmt.Sprint("unexpected non-blocked source channel")
default:
close(stopCh)
}
}() }()
go func() { go func() {
@ -64,6 +55,7 @@ func TestStatsChannel(t *testing.T) {
if v, ok := (<-a).([]int); !ok || v[0] != 4 { if v, ok := (<-a).([]int); !ok || v[0] != 4 {
errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", []int{4}) errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", []int{4})
} }
stopCh <- struct{}{}
}() }()
go func() { go func() {
@ -79,14 +71,18 @@ func TestStatsChannel(t *testing.T) {
if v, ok := (<-b).([]int); !ok || v[0] != 4 { if v, ok := (<-b).([]int); !ok || v[0] != 4 {
errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", []int{4}) errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", []int{4})
} }
stopCh <- struct{}{}
}() }()
select { timeout := time.After(2 * time.Second)
case <-time.After(2 * time.Second): for i := 0; i < 3; i++ {
t.Fatal("Test timeout after 2s") select {
case e := <-errCh: case <-timeout:
t.Fatal(e) t.Fatal("Test timeout after 2s")
case <-stopCh: case e := <-errCh:
t.Fatal(e)
case <-stopCh:
}
} }
// Test the unsubscription of channel // Test the unsubscription of channel
@ -100,12 +96,10 @@ func TestStatsChannel(t *testing.T) {
} }
func TestStatsChannelUnsubcribe(t *testing.T) { func TestStatsChannelUnsubcribe(t *testing.T) {
c := NewChannel(&ChannelConfig{}) c := NewChannel(&ChannelConfig{Blocking: true})
common.Must(c.Start()) common.Must(c.Start())
defer c.Close() defer c.Close()
source := c.Channel()
a, err := c.Subscribe() a, err := c.Subscribe()
common.Must(err) common.Must(err)
defer c.Unsubscribe(a) defer c.Unsubscribe(a)
@ -133,9 +127,9 @@ func TestStatsChannelUnsubcribe(t *testing.T) {
} }
go func() { // Blocking publish go func() { // Blocking publish
source <- 1 c.Publish(context.Background(), 1)
<-pauseCh // Wait for `b` goroutine to resume sending message <-pauseCh // Wait for `b` goroutine to resume sending message
source <- 2 c.Publish(context.Background(), 2)
}() }()
go func() { go func() {
@ -151,7 +145,7 @@ func TestStatsChannelUnsubcribe(t *testing.T) {
if v, ok := (<-b).(int); !ok || v != 1 { if v, ok := (<-b).(int); !ok || v != 1 {
errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 1) errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 1)
} }
// Unsubscribe `b` while `source`'s messaging is paused // Unsubscribe `b` while publishing is paused
c.Unsubscribe(b) c.Unsubscribe(b)
{ // Test `b` is not in subscribers { // Test `b` is not in subscribers
var aSet, bSet bool var aSet, bSet bool
@ -167,7 +161,7 @@ func TestStatsChannelUnsubcribe(t *testing.T) {
errCh <- fmt.Sprint("unexpected subscribers: ", c.Subscribers()) errCh <- fmt.Sprint("unexpected subscribers: ", c.Subscribers())
} }
} }
// Resume `source`'s progress // Resume publishing progress
close(pauseCh) close(pauseCh)
// Test `b` is neither closed nor able to receive any data // Test `b` is neither closed nor able to receive any data
select { select {
@ -191,78 +185,142 @@ func TestStatsChannelUnsubcribe(t *testing.T) {
} }
} }
func TestStatsChannelTimeout(t *testing.T) { func TestStatsChannelBlocking(t *testing.T) {
// Do not use buffer so as to create blocking scenario // Do not use buffer so as to create blocking scenario
c := NewChannel(&ChannelConfig{BufferSize: 0, BroadcastTimeout: 50}) c := NewChannel(&ChannelConfig{BufferSize: 0, Blocking: true})
common.Must(c.Start()) common.Must(c.Start())
defer c.Close() defer c.Close()
source := c.Channel()
a, err := c.Subscribe() a, err := c.Subscribe()
common.Must(err) common.Must(err)
defer c.Unsubscribe(a) defer c.Unsubscribe(a)
b, err := c.Subscribe() pauseCh := make(chan struct{})
common.Must(err)
defer c.Unsubscribe(b)
stopCh := make(chan struct{}) stopCh := make(chan struct{})
errCh := make(chan string) errCh := make(chan string)
go func() { // Blocking publish ctx, cancel := context.WithCancel(context.Background())
source <- 1
source <- 2 // Test blocking channel publishing
go func() {
// Dummy messsage with no subscriber receiving, will block broadcasting goroutine
c.Publish(context.Background(), nil)
<-pauseCh
// Publishing should be blocked here, for last message was not cleared and buffer was full
c.Publish(context.Background(), nil)
pauseCh <- struct{}{}
// Publishing should still be blocked here
c.Publish(ctx, nil)
// Check publishing is done because context is canceled
select {
case <-ctx.Done():
if ctx.Err() != context.Canceled {
errCh <- fmt.Sprint("unexpected error: ", ctx.Err())
}
default:
errCh <- "unexpected non-blocked publishing"
}
close(stopCh)
}() }()
go func() { go func() {
if v, ok := (<-a).(int); !ok || v != 1 { pauseCh <- struct{}{}
errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 1)
select {
case <-pauseCh:
errCh <- "unexpected non-blocked publishing"
case <-time.After(100 * time.Millisecond):
} }
if v, ok := (<-a).(int); !ok || v != 2 {
errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 2) // Receive first published message
<-a
select {
case <-pauseCh:
case <-time.After(100 * time.Millisecond):
errCh <- "unexpected blocking publishing"
} }
{ // Test `b` is still in subscribers yet (because `a` receives 2 first)
var aSet, bSet bool // Manually cancel the context to end publishing
for _, s := range c.Subscribers() { cancel()
if s == a { }()
aSet = true
} select {
if s == b { case <-time.After(2 * time.Second):
bSet = true t.Fatal("Test timeout after 2s")
} case e := <-errCh:
} t.Fatal(e)
if !(aSet && bSet) { case <-stopCh:
errCh <- fmt.Sprint("unexpected subscribers: ", c.Subscribers()) }
}
func TestStatsChannelNonBlocking(t *testing.T) {
// Do not use buffer so as to create blocking scenario
c := NewChannel(&ChannelConfig{BufferSize: 0, Blocking: false})
common.Must(c.Start())
defer c.Close()
a, err := c.Subscribe()
common.Must(err)
defer c.Unsubscribe(a)
pauseCh := make(chan struct{})
stopCh := make(chan struct{})
errCh := make(chan string)
ctx, cancel := context.WithCancel(context.Background())
// Test blocking channel publishing
go func() {
c.Publish(context.Background(), nil)
c.Publish(context.Background(), nil)
pauseCh <- struct{}{}
<-pauseCh
c.Publish(ctx, nil)
c.Publish(ctx, nil)
// Check publishing is done because context is canceled
select {
case <-ctx.Done():
if ctx.Err() != context.Canceled {
errCh <- fmt.Sprint("unexpected error: ", ctx.Err())
} }
case <-time.After(100 * time.Millisecond):
errCh <- "unexpected non-cancelled publishing"
} }
}() }()
go func() { go func() {
if v, ok := (<-b).(int); !ok || v != 1 { // Check publishing won't block even if there is no subscriber receiving message
errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 1) select {
case <-pauseCh:
case <-time.After(100 * time.Millisecond):
errCh <- "unexpected blocking publishing"
} }
// Block `b` channel for a time longer than `source`'s timeout
<-time.After(200 * time.Millisecond) // Receive first and second published message
{ // Test `b` has been unsubscribed by source <-a
var aSet, bSet bool <-a
for _, s := range c.Subscribers() {
if s == a { pauseCh <- struct{}{}
aSet = true
} // Manually cancel the context to end publishing
if s == b { cancel()
bSet = true
} // Check third and forth published message is cancelled and cannot receive
} <-time.After(100 * time.Millisecond)
if !(aSet && !bSet) { select {
errCh <- fmt.Sprint("unexpected subscribers: ", c.Subscribers()) case <-a:
} errCh <- "unexpected non-cancelled publishing"
default:
} }
select { // Test `b` has been closed by source select {
case v, ok := <-b: case <-a:
if ok { errCh <- "unexpected non-cancelled publishing"
errCh <- fmt.Sprint("unexpected data received: ", v)
}
default: default:
} }
close(stopCh) close(stopCh)
@ -279,12 +337,10 @@ func TestStatsChannelTimeout(t *testing.T) {
func TestStatsChannelConcurrency(t *testing.T) { func TestStatsChannelConcurrency(t *testing.T) {
// Do not use buffer so as to create blocking scenario // Do not use buffer so as to create blocking scenario
c := NewChannel(&ChannelConfig{BufferSize: 0, BroadcastTimeout: 100}) c := NewChannel(&ChannelConfig{BufferSize: 0, Blocking: true})
common.Must(c.Start()) common.Must(c.Start())
defer c.Close() defer c.Close()
source := c.Channel()
a, err := c.Subscribe() a, err := c.Subscribe()
common.Must(err) common.Must(err)
defer c.Unsubscribe(a) defer c.Unsubscribe(a)
@ -297,8 +353,8 @@ func TestStatsChannelConcurrency(t *testing.T) {
errCh := make(chan string) errCh := make(chan string)
go func() { // Blocking publish go func() { // Blocking publish
source <- 1 c.Publish(context.Background(), 1)
source <- 2 c.Publish(context.Background(), 2)
}() }()
go func() { go func() {
@ -311,8 +367,7 @@ func TestStatsChannelConcurrency(t *testing.T) {
}() }()
go func() { go func() {
// Block `b` for a time shorter than `source`'s timeout // Block `b` for a time so as to ensure source channel is trying to send message to `b`.
// So as to ensure source channel is trying to send message to `b`.
<-time.After(25 * time.Millisecond) <-time.After(25 * time.Millisecond)
// This causes concurrency scenario: unsubscribe `b` while trying to send message to it // This causes concurrency scenario: unsubscribe `b` while trying to send message to it
c.Unsubscribe(b) c.Unsubscribe(b)

View File

@ -68,9 +68,9 @@ type ChannelConfig struct {
sizeCache protoimpl.SizeCache sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields unknownFields protoimpl.UnknownFields
SubscriberLimit int32 `protobuf:"varint,1,opt,name=SubscriberLimit,proto3" json:"SubscriberLimit,omitempty"` Blocking bool `protobuf:"varint,1,opt,name=Blocking,proto3" json:"Blocking,omitempty"`
BufferSize int32 `protobuf:"varint,2,opt,name=BufferSize,proto3" json:"BufferSize,omitempty"` SubscriberLimit int32 `protobuf:"varint,2,opt,name=SubscriberLimit,proto3" json:"SubscriberLimit,omitempty"`
BroadcastTimeout int32 `protobuf:"varint,3,opt,name=BroadcastTimeout,proto3" json:"BroadcastTimeout,omitempty"` BufferSize int32 `protobuf:"varint,3,opt,name=BufferSize,proto3" json:"BufferSize,omitempty"`
} }
func (x *ChannelConfig) Reset() { func (x *ChannelConfig) Reset() {
@ -105,6 +105,13 @@ func (*ChannelConfig) Descriptor() ([]byte, []int) {
return file_app_stats_config_proto_rawDescGZIP(), []int{1} return file_app_stats_config_proto_rawDescGZIP(), []int{1}
} }
func (x *ChannelConfig) GetBlocking() bool {
if x != nil {
return x.Blocking
}
return false
}
func (x *ChannelConfig) GetSubscriberLimit() int32 { func (x *ChannelConfig) GetSubscriberLimit() int32 {
if x != nil { if x != nil {
return x.SubscriberLimit return x.SubscriberLimit
@ -119,34 +126,26 @@ func (x *ChannelConfig) GetBufferSize() int32 {
return 0 return 0
} }
func (x *ChannelConfig) GetBroadcastTimeout() int32 {
if x != nil {
return x.BroadcastTimeout
}
return 0
}
var File_app_stats_config_proto protoreflect.FileDescriptor var File_app_stats_config_proto protoreflect.FileDescriptor
var file_app_stats_config_proto_rawDesc = []byte{ var file_app_stats_config_proto_rawDesc = []byte{
0x0a, 0x16, 0x61, 0x70, 0x70, 0x2f, 0x73, 0x74, 0x61, 0x74, 0x73, 0x2f, 0x63, 0x6f, 0x6e, 0x66, 0x0a, 0x16, 0x61, 0x70, 0x70, 0x2f, 0x73, 0x74, 0x61, 0x74, 0x73, 0x2f, 0x63, 0x6f, 0x6e, 0x66,
0x69, 0x67, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x14, 0x76, 0x32, 0x72, 0x61, 0x79, 0x2e, 0x69, 0x67, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x14, 0x76, 0x32, 0x72, 0x61, 0x79, 0x2e,
0x63, 0x6f, 0x72, 0x65, 0x2e, 0x61, 0x70, 0x70, 0x2e, 0x73, 0x74, 0x61, 0x74, 0x73, 0x22, 0x08, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x61, 0x70, 0x70, 0x2e, 0x73, 0x74, 0x61, 0x74, 0x73, 0x22, 0x08,
0x0a, 0x06, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x22, 0x85, 0x01, 0x0a, 0x0d, 0x43, 0x68, 0x61, 0x0a, 0x06, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x22, 0x75, 0x0a, 0x0d, 0x43, 0x68, 0x61, 0x6e,
0x6e, 0x6e, 0x65, 0x6c, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x28, 0x0a, 0x0f, 0x53, 0x75, 0x6e, 0x65, 0x6c, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x1a, 0x0a, 0x08, 0x42, 0x6c, 0x6f,
0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x72, 0x4c, 0x69, 0x6d, 0x69, 0x74, 0x18, 0x01, 0x20, 0x63, 0x6b, 0x69, 0x6e, 0x67, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x08, 0x42, 0x6c, 0x6f,
0x01, 0x28, 0x05, 0x52, 0x0f, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x72, 0x4c, 0x63, 0x6b, 0x69, 0x6e, 0x67, 0x12, 0x28, 0x0a, 0x0f, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69,
0x69, 0x6d, 0x69, 0x74, 0x12, 0x1e, 0x0a, 0x0a, 0x42, 0x75, 0x66, 0x66, 0x65, 0x72, 0x53, 0x69, 0x62, 0x65, 0x72, 0x4c, 0x69, 0x6d, 0x69, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x0f,
0x7a, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x0a, 0x42, 0x75, 0x66, 0x66, 0x65, 0x72, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x72, 0x4c, 0x69, 0x6d, 0x69, 0x74, 0x12,
0x53, 0x69, 0x7a, 0x65, 0x12, 0x2a, 0x0a, 0x10, 0x42, 0x72, 0x6f, 0x61, 0x64, 0x63, 0x61, 0x73, 0x1e, 0x0a, 0x0a, 0x42, 0x75, 0x66, 0x66, 0x65, 0x72, 0x53, 0x69, 0x7a, 0x65, 0x18, 0x03, 0x20,
0x74, 0x54, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x05, 0x52, 0x10, 0x01, 0x28, 0x05, 0x52, 0x0a, 0x42, 0x75, 0x66, 0x66, 0x65, 0x72, 0x53, 0x69, 0x7a, 0x65, 0x42,
0x42, 0x72, 0x6f, 0x61, 0x64, 0x63, 0x61, 0x73, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x4d, 0x0a, 0x18, 0x63, 0x6f, 0x6d, 0x2e, 0x76, 0x32, 0x72, 0x61, 0x79, 0x2e, 0x63, 0x6f, 0x72,
0x42, 0x4d, 0x0a, 0x18, 0x63, 0x6f, 0x6d, 0x2e, 0x76, 0x32, 0x72, 0x61, 0x79, 0x2e, 0x63, 0x6f, 0x65, 0x2e, 0x61, 0x70, 0x70, 0x2e, 0x73, 0x74, 0x61, 0x74, 0x73, 0x50, 0x01, 0x5a, 0x18, 0x76,
0x72, 0x65, 0x2e, 0x61, 0x70, 0x70, 0x2e, 0x73, 0x74, 0x61, 0x74, 0x73, 0x50, 0x01, 0x5a, 0x18, 0x32, 0x72, 0x61, 0x79, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x63, 0x6f, 0x72, 0x65, 0x2f, 0x61, 0x70,
0x76, 0x32, 0x72, 0x61, 0x79, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x63, 0x6f, 0x72, 0x65, 0x2f, 0x61, 0x70, 0x2f, 0x73, 0x74, 0x61, 0x74, 0x73, 0xaa, 0x02, 0x14, 0x56, 0x32, 0x52, 0x61, 0x79, 0x2e,
0x70, 0x70, 0x2f, 0x73, 0x74, 0x61, 0x74, 0x73, 0xaa, 0x02, 0x14, 0x56, 0x32, 0x52, 0x61, 0x79, 0x43, 0x6f, 0x72, 0x65, 0x2e, 0x41, 0x70, 0x70, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x73, 0x62, 0x06,
0x2e, 0x43, 0x6f, 0x72, 0x65, 0x2e, 0x41, 0x70, 0x70, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x73, 0x62, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
} }
var ( var (

View File

@ -11,7 +11,7 @@ message Config {
} }
message ChannelConfig { message ChannelConfig {
int32 SubscriberLimit = 1; bool Blocking = 1;
int32 BufferSize = 2; int32 SubscriberLimit = 2;
int32 BroadcastTimeout = 3; int32 BufferSize = 3;
} }

View File

@ -94,7 +94,7 @@ func (m *Manager) RegisterChannel(name string) (stats.Channel, error) {
return nil, newError("Channel ", name, " already registered.") return nil, newError("Channel ", name, " already registered.")
} }
newError("create new channel ", name).AtDebug().WriteToLog() newError("create new channel ", name).AtDebug().WriteToLog()
c := NewChannel(&ChannelConfig{BufferSize: 16, BroadcastTimeout: 100}) c := NewChannel(&ChannelConfig{BufferSize: 64, Blocking: false})
m.channels[name] = c m.channels[name] = c
if m.running { if m.running {
return c, c.Start() return c, c.Start()

View File

@ -3,6 +3,8 @@ package stats
//go:generate errorgen //go:generate errorgen
import ( import (
"context"
"v2ray.com/core/common" "v2ray.com/core/common"
"v2ray.com/core/features" "v2ray.com/core/features"
) )
@ -25,8 +27,8 @@ type Counter interface {
type Channel interface { type Channel interface {
// Channel is a runnable unit. // Channel is a runnable unit.
common.Runnable common.Runnable
// Publish broadcasts a message through the channel. // Publish broadcasts a message through the channel with a controlling context.
Publish(interface{}) Publish(context.Context, interface{})
// SubscriberCount returns the number of the subscribers. // SubscriberCount returns the number of the subscribers.
Subscribers() []chan interface{} Subscribers() []chan interface{}
// Subscribe registers for listening to channel stream and returns a new listener channel. // Subscribe registers for listening to channel stream and returns a new listener channel.