From 054f56820a2d8a1eb42ade25b6c1163034c09ad0 Mon Sep 17 00:00:00 2001 From: V2Ray Date: Tue, 6 Oct 2015 23:29:05 +0200 Subject: [PATCH] Synchronization in timed queue --- common/collect/timed_queue.go | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/common/collect/timed_queue.go b/common/collect/timed_queue.go index 4505db47..41ddc87d 100644 --- a/common/collect/timed_queue.go +++ b/common/collect/timed_queue.go @@ -42,7 +42,7 @@ func (queue *timedQueueImpl) Pop() interface{} { type TimedQueue struct { queue timedQueueImpl - access sync.Mutex + access sync.RWMutex removed chan interface{} } @@ -50,7 +50,7 @@ func NewTimedQueue(updateInterval int) *TimedQueue { queue := &TimedQueue{ queue: make([]*timedQueueEntry, 0, 256), removed: make(chan interface{}, 16), - access: sync.Mutex{}, + access: sync.RWMutex{}, } go queue.cleanup(time.Tick(time.Duration(updateInterval) * time.Second)) return queue @@ -72,18 +72,23 @@ func (queue *TimedQueue) RemovedEntries() <-chan interface{} { func (queue *TimedQueue) cleanup(tick <-chan time.Time) { for { now := <-tick - if queue.queue.Len() == 0 { + queue.access.RLock() + queueLen := queue.queue.Len() + queue.access.RUnlock() + if queueLen == 0 { continue } nowSec := now.UTC().Unix() - entry := queue.queue[0] - if entry.timeSec > nowSec { + queue.access.RLock() + firstEntryTime := queue.queue[0].timeSec + queue.access.RUnlock() + if firstEntryTime > nowSec { continue } queue.access.Lock() - entry = heap.Pop(&queue.queue).(*timedQueueEntry) + firstEntryValue := heap.Pop(&queue.queue).(*timedQueueEntry).value queue.access.Unlock() - queue.removed <- entry.value + queue.removed <- firstEntryValue } }