mirror of https://github.com/XTLS/Xray-core
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
172 lines
4.1 KiB
172 lines
4.1 KiB
package stats |
|
|
|
import ( |
|
"context" |
|
"sync" |
|
|
|
"github.com/xtls/xray-core/common" |
|
) |
|
|
|
// Channel is an implementation of stats.Channel. |
|
type Channel struct { |
|
channel chan channelMessage |
|
subscribers []chan interface{} |
|
|
|
// Synchronization components |
|
access sync.RWMutex |
|
closed chan struct{} |
|
|
|
// Channel options |
|
blocking bool // Set blocking state if channel buffer reaches limit |
|
bufferSize int // Set to 0 as no buffering |
|
subsLimit int // Set to 0 as no subscriber limit |
|
} |
|
|
|
// NewChannel creates an instance of Statistics Channel. |
|
func NewChannel(config *ChannelConfig) *Channel { |
|
return &Channel{ |
|
channel: make(chan channelMessage, config.BufferSize), |
|
subsLimit: int(config.SubscriberLimit), |
|
bufferSize: int(config.BufferSize), |
|
blocking: config.Blocking, |
|
} |
|
} |
|
|
|
// Subscribers implements stats.Channel. |
|
func (c *Channel) Subscribers() []chan interface{} { |
|
c.access.RLock() |
|
defer c.access.RUnlock() |
|
return c.subscribers |
|
} |
|
|
|
// Subscribe implements stats.Channel. |
|
func (c *Channel) Subscribe() (chan interface{}, error) { |
|
c.access.Lock() |
|
defer c.access.Unlock() |
|
if c.subsLimit > 0 && len(c.subscribers) >= c.subsLimit { |
|
return nil, newError("Number of subscribers has reached limit") |
|
} |
|
subscriber := make(chan interface{}, c.bufferSize) |
|
c.subscribers = append(c.subscribers, subscriber) |
|
return subscriber, nil |
|
} |
|
|
|
// Unsubscribe implements stats.Channel. |
|
func (c *Channel) Unsubscribe(subscriber chan interface{}) error { |
|
c.access.Lock() |
|
defer c.access.Unlock() |
|
for i, s := range c.subscribers { |
|
if s == subscriber { |
|
// Copy to new memory block to prevent modifying original data |
|
subscribers := make([]chan interface{}, len(c.subscribers)-1) |
|
copy(subscribers[:i], c.subscribers[:i]) |
|
copy(subscribers[i:], c.subscribers[i+1:]) |
|
c.subscribers = subscribers |
|
} |
|
} |
|
return nil |
|
} |
|
|
|
// Publish implements stats.Channel. |
|
func (c *Channel) Publish(ctx context.Context, msg interface{}) { |
|
select { // Early exit if channel closed |
|
case <-c.closed: |
|
return |
|
default: |
|
pub := channelMessage{context: ctx, message: msg} |
|
if c.blocking { |
|
pub.publish(c.channel) |
|
} else { |
|
pub.publishNonBlocking(c.channel) |
|
} |
|
} |
|
} |
|
|
|
// Running returns whether the channel is running. |
|
func (c *Channel) Running() bool { |
|
select { |
|
case <-c.closed: // Channel closed |
|
default: // Channel running or not initialized |
|
if c.closed != nil { // Channel initialized |
|
return true |
|
} |
|
} |
|
return false |
|
} |
|
|
|
// Start implements common.Runnable. |
|
func (c *Channel) Start() error { |
|
c.access.Lock() |
|
defer c.access.Unlock() |
|
if !c.Running() { |
|
c.closed = make(chan struct{}) // Reset close signal |
|
go func() { |
|
for { |
|
select { |
|
case pub := <-c.channel: // Published message received |
|
for _, sub := range c.Subscribers() { // Concurrency-safe subscribers retrievement |
|
if c.blocking { |
|
pub.broadcast(sub) |
|
} else { |
|
pub.broadcastNonBlocking(sub) |
|
} |
|
} |
|
case <-c.closed: // Channel closed |
|
for _, sub := range c.Subscribers() { // Remove all subscribers |
|
common.Must(c.Unsubscribe(sub)) |
|
close(sub) |
|
} |
|
return |
|
} |
|
} |
|
}() |
|
} |
|
return nil |
|
} |
|
|
|
// Close implements common.Closable. |
|
func (c *Channel) Close() error { |
|
c.access.Lock() |
|
defer c.access.Unlock() |
|
if c.Running() { |
|
close(c.closed) // Send closed signal |
|
} |
|
return nil |
|
} |
|
|
|
// channelMessage pairs a published payload with the context it was published
// under. Delivery is guaranteed: the message is discarded only when that
// context is cancelled before the send completes.
type channelMessage struct {
	// context is the publisher's context; its Done channel aborts pending sends.
	context context.Context
	// message is the payload delivered to subscribers.
	message interface{}
}
|
|
|
func (c channelMessage) publish(publisher chan channelMessage) { |
|
select { |
|
case publisher <- c: |
|
case <-c.context.Done(): |
|
} |
|
} |
|
|
|
func (c channelMessage) publishNonBlocking(publisher chan channelMessage) { |
|
select { |
|
case publisher <- c: |
|
default: // Create another goroutine to keep sending message |
|
go c.publish(publisher) |
|
} |
|
} |
|
|
|
func (c channelMessage) broadcast(subscriber chan interface{}) { |
|
select { |
|
case subscriber <- c.message: |
|
case <-c.context.Done(): |
|
} |
|
} |
|
|
|
func (c channelMessage) broadcastNonBlocking(subscriber chan interface{}) { |
|
select { |
|
case subscriber <- c.message: |
|
default: // Create another goroutine to keep sending message |
|
go c.broadcast(subscriber) |
|
} |
|
}
|
|
|