Browse Source

Synchronization in timed queue

pull/40/head
V2Ray 9 years ago
parent
commit
054f56820a
  1. 19
      common/collect/timed_queue.go

19
common/collect/timed_queue.go

@ -42,7 +42,7 @@ func (queue *timedQueueImpl) Pop() interface{} {
type TimedQueue struct { type TimedQueue struct {
queue timedQueueImpl queue timedQueueImpl
access sync.Mutex access sync.RWMutex
removed chan interface{} removed chan interface{}
} }
@ -50,7 +50,7 @@ func NewTimedQueue(updateInterval int) *TimedQueue {
queue := &TimedQueue{ queue := &TimedQueue{
queue: make([]*timedQueueEntry, 0, 256), queue: make([]*timedQueueEntry, 0, 256),
removed: make(chan interface{}, 16), removed: make(chan interface{}, 16),
access: sync.Mutex{}, access: sync.RWMutex{},
} }
go queue.cleanup(time.Tick(time.Duration(updateInterval) * time.Second)) go queue.cleanup(time.Tick(time.Duration(updateInterval) * time.Second))
return queue return queue
@ -72,18 +72,23 @@ func (queue *TimedQueue) RemovedEntries() <-chan interface{} {
func (queue *TimedQueue) cleanup(tick <-chan time.Time) { func (queue *TimedQueue) cleanup(tick <-chan time.Time) {
for { for {
now := <-tick now := <-tick
if queue.queue.Len() == 0 { queue.access.RLock()
queueLen := queue.queue.Len()
queue.access.RUnlock()
if queueLen == 0 {
continue continue
} }
nowSec := now.UTC().Unix() nowSec := now.UTC().Unix()
entry := queue.queue[0] queue.access.RLock()
if entry.timeSec > nowSec { firstEntryTime := queue.queue[0].timeSec
queue.access.RUnlock()
if firstEntryTime > nowSec {
continue continue
} }
queue.access.Lock() queue.access.Lock()
entry = heap.Pop(&queue.queue).(*timedQueueEntry) firstEntryValue := heap.Pop(&queue.queue).(*timedQueueEntry).value
queue.access.Unlock() queue.access.Unlock()
queue.removed <- entry.value queue.removed <- firstEntryValue
} }
} }

Loading…
Cancel
Save