mirror of https://github.com/k3s-io/k3s
attempt to fix #23822
parent
9fd05474c2
commit
a74573277f
|
@ -18,6 +18,7 @@ package queue
|
|||
|
||||
import (
|
||||
"container/heap"
|
||||
"runtime"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
|
@ -138,6 +139,29 @@ func (q *DelayQueue) Pop() interface{} {
|
|||
}, nil)
|
||||
}
|
||||
|
||||
func finishWaiting(cond *sync.Cond, waitFinished <-chan struct{}) {
|
||||
runtime.Gosched()
|
||||
select {
|
||||
// avoid creating a timer if we can help it...
|
||||
case <-waitFinished:
|
||||
return
|
||||
default:
|
||||
const spinTimeout = 100 * time.Millisecond
|
||||
t := time.NewTimer(spinTimeout)
|
||||
defer t.Stop()
|
||||
for {
|
||||
runtime.Gosched()
|
||||
cond.Broadcast()
|
||||
select {
|
||||
case <-waitFinished:
|
||||
return
|
||||
case <-t.C:
|
||||
t.Reset(spinTimeout)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// returns a non-nil value from the queue, or else nil if/when cancelled; if cancel
|
||||
// is nil then cancellation is disabled and this func must return a non-nil value.
|
||||
func (q *DelayQueue) pop(next func() *qitem, cancel <-chan struct{}) interface{} {
|
||||
|
@ -164,6 +188,7 @@ func (q *DelayQueue) pop(next func() *qitem, cancel <-chan struct{}) interface{}
|
|||
select {
|
||||
case <-cancel:
|
||||
item.readd(item)
|
||||
finishWaiting(&q.cond, ch)
|
||||
return nil
|
||||
case <-ch:
|
||||
// we may no longer have the earliest deadline, re-try
|
||||
|
@ -353,8 +378,7 @@ func (q *DelayFIFO) pop(cancel <-chan struct{}) interface{} {
|
|||
// we may not have the lock yet, so
|
||||
// broadcast to abort Wait, then
|
||||
// return after lock re-acquisition
|
||||
q.cond().Broadcast()
|
||||
<-signal
|
||||
finishWaiting(q.cond(), signal)
|
||||
return nil
|
||||
case <-signal:
|
||||
// we have the lock, re-check
|
||||
|
|
Loading…
Reference in New Issue