mirror of https://github.com/XTLS/Xray-core
				
				
				
			
		
			
				
	
	
		
			406 lines
		
	
	
		
			9.5 KiB
		
	
	
	
		
			Go
		
	
	
			
		
		
	
	
			406 lines
		
	
	
		
			9.5 KiB
		
	
	
	
		
			Go
		
	
	
package stats_test
 | 
						|
 | 
						|
import (
 | 
						|
	"context"
 | 
						|
	"fmt"
 | 
						|
	"testing"
 | 
						|
	"time"
 | 
						|
 | 
						|
	. "github.com/xtls/xray-core/app/stats"
 | 
						|
	"github.com/xtls/xray-core/common"
 | 
						|
	"github.com/xtls/xray-core/features/stats"
 | 
						|
)
 | 
						|
 | 
						|
func TestStatsChannel(t *testing.T) {
 | 
						|
	// At most 2 subscribers could be registered
 | 
						|
	c := NewChannel(&ChannelConfig{SubscriberLimit: 2, Blocking: true})
 | 
						|
 | 
						|
	a, err := stats.SubscribeRunnableChannel(c)
 | 
						|
	common.Must(err)
 | 
						|
	if !c.Running() {
 | 
						|
		t.Fatal("unexpected failure in running channel after first subscription")
 | 
						|
	}
 | 
						|
 | 
						|
	b, err := c.Subscribe()
 | 
						|
	common.Must(err)
 | 
						|
 | 
						|
	// Test that third subscriber is forbidden
 | 
						|
	_, err = c.Subscribe()
 | 
						|
	if err == nil {
 | 
						|
		t.Fatal("unexpected successful subscription")
 | 
						|
	}
 | 
						|
	t.Log("expected error: ", err)
 | 
						|
 | 
						|
	stopCh := make(chan struct{})
 | 
						|
	errCh := make(chan string)
 | 
						|
 | 
						|
	go func() {
 | 
						|
		c.Publish(context.Background(), 1)
 | 
						|
		c.Publish(context.Background(), 2)
 | 
						|
		c.Publish(context.Background(), "3")
 | 
						|
		c.Publish(context.Background(), []int{4})
 | 
						|
		stopCh <- struct{}{}
 | 
						|
	}()
 | 
						|
 | 
						|
	go func() {
 | 
						|
		if v, ok := (<-a).(int); !ok || v != 1 {
 | 
						|
			errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 1)
 | 
						|
		}
 | 
						|
		if v, ok := (<-a).(int); !ok || v != 2 {
 | 
						|
			errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 2)
 | 
						|
		}
 | 
						|
		if v, ok := (<-a).(string); !ok || v != "3" {
 | 
						|
			errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", "3")
 | 
						|
		}
 | 
						|
		if v, ok := (<-a).([]int); !ok || v[0] != 4 {
 | 
						|
			errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", []int{4})
 | 
						|
		}
 | 
						|
		stopCh <- struct{}{}
 | 
						|
	}()
 | 
						|
 | 
						|
	go func() {
 | 
						|
		if v, ok := (<-b).(int); !ok || v != 1 {
 | 
						|
			errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 1)
 | 
						|
		}
 | 
						|
		if v, ok := (<-b).(int); !ok || v != 2 {
 | 
						|
			errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 2)
 | 
						|
		}
 | 
						|
		if v, ok := (<-b).(string); !ok || v != "3" {
 | 
						|
			errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", "3")
 | 
						|
		}
 | 
						|
		if v, ok := (<-b).([]int); !ok || v[0] != 4 {
 | 
						|
			errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", []int{4})
 | 
						|
		}
 | 
						|
		stopCh <- struct{}{}
 | 
						|
	}()
 | 
						|
 | 
						|
	timeout := time.After(2 * time.Second)
 | 
						|
	for i := 0; i < 3; i++ {
 | 
						|
		select {
 | 
						|
		case <-timeout:
 | 
						|
			t.Fatal("Test timeout after 2s")
 | 
						|
		case e := <-errCh:
 | 
						|
			t.Fatal(e)
 | 
						|
		case <-stopCh:
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	// Test the unsubscription of channel
 | 
						|
	common.Must(c.Unsubscribe(b))
 | 
						|
 | 
						|
	// Test the last subscriber will close channel with `UnsubscribeClosableChannel`
 | 
						|
	common.Must(stats.UnsubscribeClosableChannel(c, a))
 | 
						|
	if c.Running() {
 | 
						|
		t.Fatal("unexpected running channel after unsubscribing the last subscriber")
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func TestStatsChannelUnsubscribe(t *testing.T) {
 | 
						|
	c := NewChannel(&ChannelConfig{Blocking: true})
 | 
						|
	common.Must(c.Start())
 | 
						|
	defer c.Close()
 | 
						|
 | 
						|
	a, err := c.Subscribe()
 | 
						|
	common.Must(err)
 | 
						|
	defer c.Unsubscribe(a)
 | 
						|
 | 
						|
	b, err := c.Subscribe()
 | 
						|
	common.Must(err)
 | 
						|
 | 
						|
	pauseCh := make(chan struct{})
 | 
						|
	stopCh := make(chan struct{})
 | 
						|
	errCh := make(chan string)
 | 
						|
 | 
						|
	{
 | 
						|
		var aSet, bSet bool
 | 
						|
		for _, s := range c.Subscribers() {
 | 
						|
			if s == a {
 | 
						|
				aSet = true
 | 
						|
			}
 | 
						|
			if s == b {
 | 
						|
				bSet = true
 | 
						|
			}
 | 
						|
		}
 | 
						|
		if !(aSet && bSet) {
 | 
						|
			t.Fatal("unexpected subscribers: ", c.Subscribers())
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	go func() { // Blocking publish
 | 
						|
		c.Publish(context.Background(), 1)
 | 
						|
		<-pauseCh // Wait for `b` goroutine to resume sending message
 | 
						|
		c.Publish(context.Background(), 2)
 | 
						|
	}()
 | 
						|
 | 
						|
	go func() {
 | 
						|
		if v, ok := (<-a).(int); !ok || v != 1 {
 | 
						|
			errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 1)
 | 
						|
		}
 | 
						|
		if v, ok := (<-a).(int); !ok || v != 2 {
 | 
						|
			errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 2)
 | 
						|
		}
 | 
						|
	}()
 | 
						|
 | 
						|
	go func() {
 | 
						|
		if v, ok := (<-b).(int); !ok || v != 1 {
 | 
						|
			errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 1)
 | 
						|
		}
 | 
						|
		// Unsubscribe `b` while publishing is paused
 | 
						|
		c.Unsubscribe(b)
 | 
						|
		{ // Test `b` is not in subscribers
 | 
						|
			var aSet, bSet bool
 | 
						|
			for _, s := range c.Subscribers() {
 | 
						|
				if s == a {
 | 
						|
					aSet = true
 | 
						|
				}
 | 
						|
				if s == b {
 | 
						|
					bSet = true
 | 
						|
				}
 | 
						|
			}
 | 
						|
			if !(aSet && !bSet) {
 | 
						|
				errCh <- fmt.Sprint("unexpected subscribers: ", c.Subscribers())
 | 
						|
			}
 | 
						|
		}
 | 
						|
		// Resume publishing progress
 | 
						|
		close(pauseCh)
 | 
						|
		// Test `b` is neither closed nor able to receive any data
 | 
						|
		select {
 | 
						|
		case v, ok := <-b:
 | 
						|
			if ok {
 | 
						|
				errCh <- fmt.Sprint("unexpected data received: ", v)
 | 
						|
			} else {
 | 
						|
				errCh <- fmt.Sprint("unexpected closed channel: ", b)
 | 
						|
			}
 | 
						|
		default:
 | 
						|
		}
 | 
						|
		close(stopCh)
 | 
						|
	}()
 | 
						|
 | 
						|
	select {
 | 
						|
	case <-time.After(2 * time.Second):
 | 
						|
		t.Fatal("Test timeout after 2s")
 | 
						|
	case e := <-errCh:
 | 
						|
		t.Fatal(e)
 | 
						|
	case <-stopCh:
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func TestStatsChannelBlocking(t *testing.T) {
 | 
						|
	// Do not use buffer so as to create blocking scenario
 | 
						|
	c := NewChannel(&ChannelConfig{BufferSize: 0, Blocking: true})
 | 
						|
	common.Must(c.Start())
 | 
						|
	defer c.Close()
 | 
						|
 | 
						|
	a, err := c.Subscribe()
 | 
						|
	common.Must(err)
 | 
						|
	defer c.Unsubscribe(a)
 | 
						|
 | 
						|
	pauseCh := make(chan struct{})
 | 
						|
	stopCh := make(chan struct{})
 | 
						|
	errCh := make(chan string)
 | 
						|
 | 
						|
	ctx, cancel := context.WithCancel(context.Background())
 | 
						|
 | 
						|
	// Test blocking channel publishing
 | 
						|
	go func() {
 | 
						|
		// Dummy message with no subscriber receiving, will block broadcasting goroutine
 | 
						|
		c.Publish(context.Background(), nil)
 | 
						|
 | 
						|
		<-pauseCh
 | 
						|
 | 
						|
		// Publishing should be blocked here, for last message was not cleared and buffer was full
 | 
						|
		c.Publish(context.Background(), nil)
 | 
						|
 | 
						|
		pauseCh <- struct{}{}
 | 
						|
 | 
						|
		// Publishing should still be blocked here
 | 
						|
		c.Publish(ctx, nil)
 | 
						|
 | 
						|
		// Check publishing is done because context is canceled
 | 
						|
		select {
 | 
						|
		case <-ctx.Done():
 | 
						|
			if ctx.Err() != context.Canceled {
 | 
						|
				errCh <- fmt.Sprint("unexpected error: ", ctx.Err())
 | 
						|
			}
 | 
						|
		default:
 | 
						|
			errCh <- "unexpected non-blocked publishing"
 | 
						|
		}
 | 
						|
		close(stopCh)
 | 
						|
	}()
 | 
						|
 | 
						|
	go func() {
 | 
						|
		pauseCh <- struct{}{}
 | 
						|
 | 
						|
		select {
 | 
						|
		case <-pauseCh:
 | 
						|
			errCh <- "unexpected non-blocked publishing"
 | 
						|
		case <-time.After(100 * time.Millisecond):
 | 
						|
		}
 | 
						|
 | 
						|
		// Receive first published message
 | 
						|
		<-a
 | 
						|
 | 
						|
		select {
 | 
						|
		case <-pauseCh:
 | 
						|
		case <-time.After(100 * time.Millisecond):
 | 
						|
			errCh <- "unexpected blocking publishing"
 | 
						|
		}
 | 
						|
 | 
						|
		// Manually cancel the context to end publishing
 | 
						|
		cancel()
 | 
						|
	}()
 | 
						|
 | 
						|
	select {
 | 
						|
	case <-time.After(2 * time.Second):
 | 
						|
		t.Fatal("Test timeout after 2s")
 | 
						|
	case e := <-errCh:
 | 
						|
		t.Fatal(e)
 | 
						|
	case <-stopCh:
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func TestStatsChannelNonBlocking(t *testing.T) {
 | 
						|
	// Do not use buffer so as to create blocking scenario
 | 
						|
	c := NewChannel(&ChannelConfig{BufferSize: 0, Blocking: false})
 | 
						|
	common.Must(c.Start())
 | 
						|
	defer c.Close()
 | 
						|
 | 
						|
	a, err := c.Subscribe()
 | 
						|
	common.Must(err)
 | 
						|
	defer c.Unsubscribe(a)
 | 
						|
 | 
						|
	pauseCh := make(chan struct{})
 | 
						|
	stopCh := make(chan struct{})
 | 
						|
	errCh := make(chan string)
 | 
						|
 | 
						|
	ctx, cancel := context.WithCancel(context.Background())
 | 
						|
 | 
						|
	// Test blocking channel publishing
 | 
						|
	go func() {
 | 
						|
		c.Publish(context.Background(), nil)
 | 
						|
		c.Publish(context.Background(), nil)
 | 
						|
		pauseCh <- struct{}{}
 | 
						|
		<-pauseCh
 | 
						|
		c.Publish(ctx, nil)
 | 
						|
		c.Publish(ctx, nil)
 | 
						|
		// Check publishing is done because context is canceled
 | 
						|
		select {
 | 
						|
		case <-ctx.Done():
 | 
						|
			if ctx.Err() != context.Canceled {
 | 
						|
				errCh <- fmt.Sprint("unexpected error: ", ctx.Err())
 | 
						|
			}
 | 
						|
		case <-time.After(100 * time.Millisecond):
 | 
						|
			errCh <- "unexpected non-cancelled publishing"
 | 
						|
		}
 | 
						|
	}()
 | 
						|
 | 
						|
	go func() {
 | 
						|
		// Check publishing won't block even if there is no subscriber receiving message
 | 
						|
		select {
 | 
						|
		case <-pauseCh:
 | 
						|
		case <-time.After(100 * time.Millisecond):
 | 
						|
			errCh <- "unexpected blocking publishing"
 | 
						|
		}
 | 
						|
 | 
						|
		// Receive first and second published message
 | 
						|
		<-a
 | 
						|
		<-a
 | 
						|
 | 
						|
		pauseCh <- struct{}{}
 | 
						|
 | 
						|
		// Manually cancel the context to end publishing
 | 
						|
		cancel()
 | 
						|
 | 
						|
		// Check third and forth published message is cancelled and cannot receive
 | 
						|
		<-time.After(100 * time.Millisecond)
 | 
						|
		select {
 | 
						|
		case <-a:
 | 
						|
			errCh <- "unexpected non-cancelled publishing"
 | 
						|
		default:
 | 
						|
		}
 | 
						|
		select {
 | 
						|
		case <-a:
 | 
						|
			errCh <- "unexpected non-cancelled publishing"
 | 
						|
		default:
 | 
						|
		}
 | 
						|
		close(stopCh)
 | 
						|
	}()
 | 
						|
 | 
						|
	select {
 | 
						|
	case <-time.After(2 * time.Second):
 | 
						|
		t.Fatal("Test timeout after 2s")
 | 
						|
	case e := <-errCh:
 | 
						|
		t.Fatal(e)
 | 
						|
	case <-stopCh:
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func TestStatsChannelConcurrency(t *testing.T) {
 | 
						|
	// Do not use buffer so as to create blocking scenario
 | 
						|
	c := NewChannel(&ChannelConfig{BufferSize: 0, Blocking: true})
 | 
						|
	common.Must(c.Start())
 | 
						|
	defer c.Close()
 | 
						|
 | 
						|
	a, err := c.Subscribe()
 | 
						|
	common.Must(err)
 | 
						|
	defer c.Unsubscribe(a)
 | 
						|
 | 
						|
	b, err := c.Subscribe()
 | 
						|
	common.Must(err)
 | 
						|
	defer c.Unsubscribe(b)
 | 
						|
 | 
						|
	stopCh := make(chan struct{})
 | 
						|
	errCh := make(chan string)
 | 
						|
 | 
						|
	go func() { // Blocking publish
 | 
						|
		c.Publish(context.Background(), 1)
 | 
						|
		c.Publish(context.Background(), 2)
 | 
						|
	}()
 | 
						|
 | 
						|
	go func() {
 | 
						|
		if v, ok := (<-a).(int); !ok || v != 1 {
 | 
						|
			errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 1)
 | 
						|
		}
 | 
						|
		if v, ok := (<-a).(int); !ok || v != 2 {
 | 
						|
			errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 2)
 | 
						|
		}
 | 
						|
	}()
 | 
						|
 | 
						|
	go func() {
 | 
						|
		// Block `b` for a time so as to ensure source channel is trying to send message to `b`.
 | 
						|
		<-time.After(25 * time.Millisecond)
 | 
						|
		// This causes concurrency scenario: unsubscribe `b` while trying to send message to it
 | 
						|
		c.Unsubscribe(b)
 | 
						|
		// Test `b` is not closed and can still receive data 1:
 | 
						|
		// Because unsubscribe won't affect the ongoing process of sending message.
 | 
						|
		select {
 | 
						|
		case v, ok := <-b:
 | 
						|
			if v1, ok1 := v.(int); !(ok && ok1 && v1 == 1) {
 | 
						|
				errCh <- fmt.Sprint("unexpected failure in receiving data: ", 1)
 | 
						|
			}
 | 
						|
		default:
 | 
						|
			errCh <- fmt.Sprint("unexpected block from receiving data: ", 1)
 | 
						|
		}
 | 
						|
		// Test `b` is not closed but cannot receive data 2:
 | 
						|
		// Because in a new round of messaging, `b` has been unsubscribed.
 | 
						|
		select {
 | 
						|
		case v, ok := <-b:
 | 
						|
			if ok {
 | 
						|
				errCh <- fmt.Sprint("unexpected receiving: ", v)
 | 
						|
			} else {
 | 
						|
				errCh <- "unexpected closing of channel"
 | 
						|
			}
 | 
						|
		default:
 | 
						|
		}
 | 
						|
		close(stopCh)
 | 
						|
	}()
 | 
						|
 | 
						|
	select {
 | 
						|
	case <-time.After(2 * time.Second):
 | 
						|
		t.Fatal("Test timeout after 2s")
 | 
						|
	case e := <-errCh:
 | 
						|
		t.Fatal(e)
 | 
						|
	case <-stopCh:
 | 
						|
	}
 | 
						|
}
 |