mirror of https://github.com/XTLS/Xray-core
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
405 lines
9.5 KiB
405 lines
9.5 KiB
package stats_test |
|
|
|
import ( |
|
"context" |
|
"fmt" |
|
"testing" |
|
"time" |
|
|
|
. "github.com/xtls/xray-core/app/stats" |
|
"github.com/xtls/xray-core/common" |
|
"github.com/xtls/xray-core/features/stats" |
|
) |
|
|
|
func TestStatsChannel(t *testing.T) { |
|
// At most 2 subscribers could be registered |
|
c := NewChannel(&ChannelConfig{SubscriberLimit: 2, Blocking: true}) |
|
|
|
a, err := stats.SubscribeRunnableChannel(c) |
|
common.Must(err) |
|
if !c.Running() { |
|
t.Fatal("unexpected failure in running channel after first subscription") |
|
} |
|
|
|
b, err := c.Subscribe() |
|
common.Must(err) |
|
|
|
// Test that third subscriber is forbidden |
|
_, err = c.Subscribe() |
|
if err == nil { |
|
t.Fatal("unexpected successful subscription") |
|
} |
|
t.Log("expected error: ", err) |
|
|
|
stopCh := make(chan struct{}) |
|
errCh := make(chan string) |
|
|
|
go func() { |
|
c.Publish(context.Background(), 1) |
|
c.Publish(context.Background(), 2) |
|
c.Publish(context.Background(), "3") |
|
c.Publish(context.Background(), []int{4}) |
|
stopCh <- struct{}{} |
|
}() |
|
|
|
go func() { |
|
if v, ok := (<-a).(int); !ok || v != 1 { |
|
errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 1) |
|
} |
|
if v, ok := (<-a).(int); !ok || v != 2 { |
|
errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 2) |
|
} |
|
if v, ok := (<-a).(string); !ok || v != "3" { |
|
errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", "3") |
|
} |
|
if v, ok := (<-a).([]int); !ok || v[0] != 4 { |
|
errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", []int{4}) |
|
} |
|
stopCh <- struct{}{} |
|
}() |
|
|
|
go func() { |
|
if v, ok := (<-b).(int); !ok || v != 1 { |
|
errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 1) |
|
} |
|
if v, ok := (<-b).(int); !ok || v != 2 { |
|
errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 2) |
|
} |
|
if v, ok := (<-b).(string); !ok || v != "3" { |
|
errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", "3") |
|
} |
|
if v, ok := (<-b).([]int); !ok || v[0] != 4 { |
|
errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", []int{4}) |
|
} |
|
stopCh <- struct{}{} |
|
}() |
|
|
|
timeout := time.After(2 * time.Second) |
|
for i := 0; i < 3; i++ { |
|
select { |
|
case <-timeout: |
|
t.Fatal("Test timeout after 2s") |
|
case e := <-errCh: |
|
t.Fatal(e) |
|
case <-stopCh: |
|
} |
|
} |
|
|
|
// Test the unsubscription of channel |
|
common.Must(c.Unsubscribe(b)) |
|
|
|
// Test the last subscriber will close channel with `UnsubscribeClosableChannel` |
|
common.Must(stats.UnsubscribeClosableChannel(c, a)) |
|
if c.Running() { |
|
t.Fatal("unexpected running channel after unsubscribing the last subscriber") |
|
} |
|
} |
|
|
|
func TestStatsChannelUnsubscribe(t *testing.T) { |
|
c := NewChannel(&ChannelConfig{Blocking: true}) |
|
common.Must(c.Start()) |
|
defer c.Close() |
|
|
|
a, err := c.Subscribe() |
|
common.Must(err) |
|
defer c.Unsubscribe(a) |
|
|
|
b, err := c.Subscribe() |
|
common.Must(err) |
|
|
|
pauseCh := make(chan struct{}) |
|
stopCh := make(chan struct{}) |
|
errCh := make(chan string) |
|
|
|
{ |
|
var aSet, bSet bool |
|
for _, s := range c.Subscribers() { |
|
if s == a { |
|
aSet = true |
|
} |
|
if s == b { |
|
bSet = true |
|
} |
|
} |
|
if !(aSet && bSet) { |
|
t.Fatal("unexpected subscribers: ", c.Subscribers()) |
|
} |
|
} |
|
|
|
go func() { // Blocking publish |
|
c.Publish(context.Background(), 1) |
|
<-pauseCh // Wait for `b` goroutine to resume sending message |
|
c.Publish(context.Background(), 2) |
|
}() |
|
|
|
go func() { |
|
if v, ok := (<-a).(int); !ok || v != 1 { |
|
errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 1) |
|
} |
|
if v, ok := (<-a).(int); !ok || v != 2 { |
|
errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 2) |
|
} |
|
}() |
|
|
|
go func() { |
|
if v, ok := (<-b).(int); !ok || v != 1 { |
|
errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 1) |
|
} |
|
// Unsubscribe `b` while publishing is paused |
|
c.Unsubscribe(b) |
|
{ // Test `b` is not in subscribers |
|
var aSet, bSet bool |
|
for _, s := range c.Subscribers() { |
|
if s == a { |
|
aSet = true |
|
} |
|
if s == b { |
|
bSet = true |
|
} |
|
} |
|
if !(aSet && !bSet) { |
|
errCh <- fmt.Sprint("unexpected subscribers: ", c.Subscribers()) |
|
} |
|
} |
|
// Resume publishing progress |
|
close(pauseCh) |
|
// Test `b` is neither closed nor able to receive any data |
|
select { |
|
case v, ok := <-b: |
|
if ok { |
|
errCh <- fmt.Sprint("unexpected data received: ", v) |
|
} else { |
|
errCh <- fmt.Sprint("unexpected closed channel: ", b) |
|
} |
|
default: |
|
} |
|
close(stopCh) |
|
}() |
|
|
|
select { |
|
case <-time.After(2 * time.Second): |
|
t.Fatal("Test timeout after 2s") |
|
case e := <-errCh: |
|
t.Fatal(e) |
|
case <-stopCh: |
|
} |
|
} |
|
|
|
func TestStatsChannelBlocking(t *testing.T) { |
|
// Do not use buffer so as to create blocking scenario |
|
c := NewChannel(&ChannelConfig{BufferSize: 0, Blocking: true}) |
|
common.Must(c.Start()) |
|
defer c.Close() |
|
|
|
a, err := c.Subscribe() |
|
common.Must(err) |
|
defer c.Unsubscribe(a) |
|
|
|
pauseCh := make(chan struct{}) |
|
stopCh := make(chan struct{}) |
|
errCh := make(chan string) |
|
|
|
ctx, cancel := context.WithCancel(context.Background()) |
|
|
|
// Test blocking channel publishing |
|
go func() { |
|
// Dummy message with no subscriber receiving, will block broadcasting goroutine |
|
c.Publish(context.Background(), nil) |
|
|
|
<-pauseCh |
|
|
|
// Publishing should be blocked here, for last message was not cleared and buffer was full |
|
c.Publish(context.Background(), nil) |
|
|
|
pauseCh <- struct{}{} |
|
|
|
// Publishing should still be blocked here |
|
c.Publish(ctx, nil) |
|
|
|
// Check publishing is done because context is canceled |
|
select { |
|
case <-ctx.Done(): |
|
if ctx.Err() != context.Canceled { |
|
errCh <- fmt.Sprint("unexpected error: ", ctx.Err()) |
|
} |
|
default: |
|
errCh <- "unexpected non-blocked publishing" |
|
} |
|
close(stopCh) |
|
}() |
|
|
|
go func() { |
|
pauseCh <- struct{}{} |
|
|
|
select { |
|
case <-pauseCh: |
|
errCh <- "unexpected non-blocked publishing" |
|
case <-time.After(100 * time.Millisecond): |
|
} |
|
|
|
// Receive first published message |
|
<-a |
|
|
|
select { |
|
case <-pauseCh: |
|
case <-time.After(100 * time.Millisecond): |
|
errCh <- "unexpected blocking publishing" |
|
} |
|
|
|
// Manually cancel the context to end publishing |
|
cancel() |
|
}() |
|
|
|
select { |
|
case <-time.After(2 * time.Second): |
|
t.Fatal("Test timeout after 2s") |
|
case e := <-errCh: |
|
t.Fatal(e) |
|
case <-stopCh: |
|
} |
|
} |
|
|
|
func TestStatsChannelNonBlocking(t *testing.T) { |
|
// Do not use buffer so as to create blocking scenario |
|
c := NewChannel(&ChannelConfig{BufferSize: 0, Blocking: false}) |
|
common.Must(c.Start()) |
|
defer c.Close() |
|
|
|
a, err := c.Subscribe() |
|
common.Must(err) |
|
defer c.Unsubscribe(a) |
|
|
|
pauseCh := make(chan struct{}) |
|
stopCh := make(chan struct{}) |
|
errCh := make(chan string) |
|
|
|
ctx, cancel := context.WithCancel(context.Background()) |
|
|
|
// Test blocking channel publishing |
|
go func() { |
|
c.Publish(context.Background(), nil) |
|
c.Publish(context.Background(), nil) |
|
pauseCh <- struct{}{} |
|
<-pauseCh |
|
c.Publish(ctx, nil) |
|
c.Publish(ctx, nil) |
|
// Check publishing is done because context is canceled |
|
select { |
|
case <-ctx.Done(): |
|
if ctx.Err() != context.Canceled { |
|
errCh <- fmt.Sprint("unexpected error: ", ctx.Err()) |
|
} |
|
case <-time.After(100 * time.Millisecond): |
|
errCh <- "unexpected non-cancelled publishing" |
|
} |
|
}() |
|
|
|
go func() { |
|
// Check publishing won't block even if there is no subscriber receiving message |
|
select { |
|
case <-pauseCh: |
|
case <-time.After(100 * time.Millisecond): |
|
errCh <- "unexpected blocking publishing" |
|
} |
|
|
|
// Receive first and second published message |
|
<-a |
|
<-a |
|
|
|
pauseCh <- struct{}{} |
|
|
|
// Manually cancel the context to end publishing |
|
cancel() |
|
|
|
// Check third and forth published message is cancelled and cannot receive |
|
<-time.After(100 * time.Millisecond) |
|
select { |
|
case <-a: |
|
errCh <- "unexpected non-cancelled publishing" |
|
default: |
|
} |
|
select { |
|
case <-a: |
|
errCh <- "unexpected non-cancelled publishing" |
|
default: |
|
} |
|
close(stopCh) |
|
}() |
|
|
|
select { |
|
case <-time.After(2 * time.Second): |
|
t.Fatal("Test timeout after 2s") |
|
case e := <-errCh: |
|
t.Fatal(e) |
|
case <-stopCh: |
|
} |
|
} |
|
|
|
func TestStatsChannelConcurrency(t *testing.T) { |
|
// Do not use buffer so as to create blocking scenario |
|
c := NewChannel(&ChannelConfig{BufferSize: 0, Blocking: true}) |
|
common.Must(c.Start()) |
|
defer c.Close() |
|
|
|
a, err := c.Subscribe() |
|
common.Must(err) |
|
defer c.Unsubscribe(a) |
|
|
|
b, err := c.Subscribe() |
|
common.Must(err) |
|
defer c.Unsubscribe(b) |
|
|
|
stopCh := make(chan struct{}) |
|
errCh := make(chan string) |
|
|
|
go func() { // Blocking publish |
|
c.Publish(context.Background(), 1) |
|
c.Publish(context.Background(), 2) |
|
}() |
|
|
|
go func() { |
|
if v, ok := (<-a).(int); !ok || v != 1 { |
|
errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 1) |
|
} |
|
if v, ok := (<-a).(int); !ok || v != 2 { |
|
errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 2) |
|
} |
|
}() |
|
|
|
go func() { |
|
// Block `b` for a time so as to ensure source channel is trying to send message to `b`. |
|
<-time.After(25 * time.Millisecond) |
|
// This causes concurrency scenario: unsubscribe `b` while trying to send message to it |
|
c.Unsubscribe(b) |
|
// Test `b` is not closed and can still receive data 1: |
|
// Because unsubscribe won't affect the ongoing process of sending message. |
|
select { |
|
case v, ok := <-b: |
|
if v1, ok1 := v.(int); !(ok && ok1 && v1 == 1) { |
|
errCh <- fmt.Sprint("unexpected failure in receiving data: ", 1) |
|
} |
|
default: |
|
errCh <- fmt.Sprint("unexpected block from receiving data: ", 1) |
|
} |
|
// Test `b` is not closed but cannot receive data 2: |
|
// Because in a new round of messaging, `b` has been unsubscribed. |
|
select { |
|
case v, ok := <-b: |
|
if ok { |
|
errCh <- fmt.Sprint("unexpected receiving: ", v) |
|
} else { |
|
errCh <- "unexpected closing of channel" |
|
} |
|
default: |
|
} |
|
close(stopCh) |
|
}() |
|
|
|
select { |
|
case <-time.After(2 * time.Second): |
|
t.Fatal("Test timeout after 2s") |
|
case e := <-errCh: |
|
t.Fatal(e) |
|
case <-stopCh: |
|
} |
|
}
|
|
|