Xray-core/app/stats/channel.go

174 lines
4.2 KiB
Go

package stats
import (
"context"
"sync"
"github.com/xtls/xray-core/common"
"github.com/xtls/xray-core/common/errors"
)
// Channel is an implementation of stats.Channel.
type Channel struct {
channel chan channelMessage
subscribers []chan interface{}
// Synchronization components
access sync.RWMutex
closed chan struct{}
// Channel options
blocking bool // Set blocking state if channel buffer reaches limit
bufferSize int // Set to 0 as no buffering
subsLimit int // Set to 0 as no subscriber limit
}
// NewChannel creates an instance of Statistics Channel.
func NewChannel(config *ChannelConfig) *Channel {
return &Channel{
channel: make(chan channelMessage, config.BufferSize),
subsLimit: int(config.SubscriberLimit),
bufferSize: int(config.BufferSize),
blocking: config.Blocking,
}
}
// Subscribers implements stats.Channel.
func (c *Channel) Subscribers() []chan interface{} {
c.access.RLock()
defer c.access.RUnlock()
return c.subscribers
}
// Subscribe implements stats.Channel.
func (c *Channel) Subscribe() (chan interface{}, error) {
c.access.Lock()
defer c.access.Unlock()
if c.subsLimit > 0 && len(c.subscribers) >= c.subsLimit {
return nil, errors.New("Number of subscribers has reached limit")
}
subscriber := make(chan interface{}, c.bufferSize)
c.subscribers = append(c.subscribers, subscriber)
return subscriber, nil
}
// Unsubscribe implements stats.Channel.
func (c *Channel) Unsubscribe(subscriber chan interface{}) error {
c.access.Lock()
defer c.access.Unlock()
for i, s := range c.subscribers {
if s == subscriber {
// Copy to new memory block to prevent modifying original data
subscribers := make([]chan interface{}, len(c.subscribers)-1)
copy(subscribers[:i], c.subscribers[:i])
copy(subscribers[i:], c.subscribers[i+1:])
c.subscribers = subscribers
}
}
return nil
}
// Publish implements stats.Channel.
func (c *Channel) Publish(ctx context.Context, msg interface{}) {
select { // Early exit if channel closed
case <-c.closed:
return
default:
pub := channelMessage{context: ctx, message: msg}
if c.blocking {
pub.publish(c.channel)
} else {
pub.publishNonBlocking(c.channel)
}
}
}
// Running returns whether the channel is running.
func (c *Channel) Running() bool {
select {
case <-c.closed: // Channel closed
default: // Channel running or not initialized
if c.closed != nil { // Channel initialized
return true
}
}
return false
}
// Start implements common.Runnable.
func (c *Channel) Start() error {
c.access.Lock()
defer c.access.Unlock()
if !c.Running() {
c.closed = make(chan struct{}) // Reset close signal
go func() {
for {
select {
case pub := <-c.channel: // Published message received
for _, sub := range c.Subscribers() { // Concurrency-safe subscribers retrievement
if c.blocking {
pub.broadcast(sub)
} else {
pub.broadcastNonBlocking(sub)
}
}
case <-c.closed: // Channel closed
for _, sub := range c.Subscribers() { // Remove all subscribers
common.Must(c.Unsubscribe(sub))
close(sub)
}
return
}
}
}()
}
return nil
}
// Close implements common.Closable.
func (c *Channel) Close() error {
c.access.Lock()
defer c.access.Unlock()
if c.Running() {
close(c.closed) // Send closed signal
}
return nil
}
// channelMessage is the published message with guaranteed delivery.
// message is discarded only when the context is early cancelled.
type channelMessage struct {
context context.Context
message interface{}
}
func (c channelMessage) publish(publisher chan channelMessage) {
select {
case publisher <- c:
case <-c.context.Done():
}
}
func (c channelMessage) publishNonBlocking(publisher chan channelMessage) {
select {
case publisher <- c:
default: // Create another goroutine to keep sending message
go c.publish(publisher)
}
}
func (c channelMessage) broadcast(subscriber chan interface{}) {
select {
case subscriber <- c.message:
case <-c.context.Done():
}
}
func (c channelMessage) broadcastNonBlocking(subscriber chan interface{}) {
select {
case subscriber <- c.message:
default: // Create another goroutine to keep sending message
go c.broadcast(subscriber)
}
}