From a74573277f81565b3aeff8c5d4bfe2a3110c1ccd Mon Sep 17 00:00:00 2001 From: James DeFelice Date: Thu, 28 Apr 2016 07:24:06 +0000 Subject: [PATCH] attempt to fix #23822 --- contrib/mesos/pkg/queue/delay.go | 28 ++++++++++++++++++++++++++-- 1 file changed, 26 insertions(+), 2 deletions(-) diff --git a/contrib/mesos/pkg/queue/delay.go b/contrib/mesos/pkg/queue/delay.go index 327ba10edb..3609dd1f91 100644 --- a/contrib/mesos/pkg/queue/delay.go +++ b/contrib/mesos/pkg/queue/delay.go @@ -18,6 +18,7 @@ package queue import ( "container/heap" + "runtime" "sync" "time" @@ -138,6 +139,29 @@ func (q *DelayQueue) Pop() interface{} { }, nil) } +func finishWaiting(cond *sync.Cond, waitFinished <-chan struct{}) { + runtime.Gosched() + select { + // avoid creating a timer if we can help it... + case <-waitFinished: + return + default: + const spinTimeout = 100 * time.Millisecond + t := time.NewTimer(spinTimeout) + defer t.Stop() + for { + runtime.Gosched() + cond.Broadcast() + select { + case <-waitFinished: + return + case <-t.C: + t.Reset(spinTimeout) + } + } + } +} + // returns a non-nil value from the queue, or else nil if/when cancelled; if cancel // is nil then cancellation is disabled and this func must return a non-nil value. func (q *DelayQueue) pop(next func() *qitem, cancel <-chan struct{}) interface{} { @@ -164,6 +188,7 @@ func (q *DelayQueue) pop(next func() *qitem, cancel <-chan struct{}) interface{} select { case <-cancel: item.readd(item) + finishWaiting(&q.cond, ch) return nil case <-ch: // we may no longer have the earliest deadline, re-try @@ -353,8 +378,7 @@ func (q *DelayFIFO) pop(cancel <-chan struct{}) interface{} { // we may not have the lock yet, so // broadcast to abort Wait, then // return after lock re-acquisition - q.cond().Broadcast() - <-signal + finishWaiting(q.cond(), signal) return nil case <-signal: // we have the lock, re-check