mirror of https://github.com/k3s-io/k3s
Merge pull request #68084 from bowei/backoff-node-ipam
Automatic merge from submit-queue. If you want to cherry-pick this change to another branch, please follow the instructions here: https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md.

Make CIDR allocation retries back off exponentially. This also sets the retry time to be less aggressive.

Fixes https://github.com/kubernetes/kubernetes/issues/67348

```release-note
NONE
```
commit
bd2370d2ae
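For reference, the following standalone sketch (not part of the commit) illustrates the capped exponential backoff this change introduces. The constants and the doubling loop mirror the `nodeUpdateRetryTimeout` helper added in the diff below; the `backoff` name and the `main` driver are purely illustrative.

```go
package main

import (
	"fmt"
	"time"
)

const (
	updateRetryTimeout    = 250 * time.Millisecond // initial retry delay introduced by this change
	maxUpdateRetryTimeout = 5 * time.Second        // upper bound on the retry delay
	updateMaxRetries      = 10                     // failed attempts before the node is dropped
)

// backoff doubles the delay for each failed attempt, never exceeding the cap.
func backoff(count int) time.Duration {
	timeout := updateRetryTimeout
	for i := 0; i < count && timeout < maxUpdateRetryTimeout; i++ {
		timeout *= 2
	}
	if timeout > maxUpdateRetryTimeout {
		return maxUpdateRetryTimeout
	}
	return timeout
}

func main() {
	// Delay schedule a repeatedly failing node would see: 500ms, 1s, 2s, 4s,
	// then capped at 5s, until updateMaxRetries is exceeded and it is dropped.
	for count := 1; count <= updateMaxRetries; count++ {
		fmt.Printf("retry %2d after %v\n", count, backoff(count))
	}
}
```

Previously every retry was requeued after a flat 100ms; with the doubled-and-capped delay, repeated failures back off to at most 5s, and the node is still dropped from the queue after ten attempts.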
@@ -71,7 +71,10 @@ const (
 	cidrUpdateRetries = 3

 	// updateRetryTimeout is the time to wait before requeing a failed node for retry
-	updateRetryTimeout = 100 * time.Millisecond
+	updateRetryTimeout = 250 * time.Millisecond

+	// maxUpdateRetryTimeout is the maximum amount of time between timeouts.
+	maxUpdateRetryTimeout = 5 * time.Second
+
 	// updateMaxRetries is the max retries for a failed node
 	updateMaxRetries = 10
@@ -24,6 +24,7 @@ import (
 	"github.com/golang/glog"

+	"k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
@@ -33,7 +34,6 @@ import (
 	"k8s.io/client-go/tools/cache"
 	"k8s.io/client-go/tools/record"

-	"k8s.io/api/core/v1"
 	clientset "k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/kubernetes/scheme"
 	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
@ -155,16 +155,21 @@ func (ca *cloudCIDRAllocator) worker(stopChan <-chan struct{}) {
|
|||
glog.Warning("Channel nodeCIDRUpdateChannel was unexpectedly closed")
|
||||
return
|
||||
}
|
||||
if err := ca.updateCIDRAllocation(workItem); err != nil {
|
||||
if ca.canRetry(workItem) {
|
||||
time.AfterFunc(updateRetryTimeout, func() {
|
||||
if err := ca.updateCIDRAllocation(workItem); err == nil {
|
||||
glog.V(3).Infof("Updated CIDR for %q", workItem)
|
||||
ca.removeNodeFromProcessing(workItem)
|
||||
} else {
|
||||
glog.Errorf("Error updating CIDR for %q: %v", workItem, err)
|
||||
if canRetry, timeout := ca.retryParams(workItem); canRetry {
|
||||
glog.V(2).Infof("Retrying update for %q after %v", workItem, timeout)
|
||||
time.AfterFunc(timeout, func() {
|
||||
// Requeue the failed node for update again.
|
||||
ca.nodeUpdateChannel <- workItem
|
||||
})
|
||||
continue
|
||||
}
|
||||
glog.Errorf("Exceeded retry count for %q, dropping from queue", workItem)
|
||||
}
|
||||
ca.removeNodeFromProcessing(workItem)
|
||||
case <-stopChan:
|
||||
return
|
||||
}
|
||||
|
@ -181,15 +186,34 @@ func (ca *cloudCIDRAllocator) insertNodeToProcessing(nodeName string) bool {
|
|||
return true
|
||||
}
|
||||
|
||||
func (ca *cloudCIDRAllocator) canRetry(nodeName string) bool {
|
||||
func (ca *cloudCIDRAllocator) retryParams(nodeName string) (bool, time.Duration) {
|
||||
ca.lock.Lock()
|
||||
defer ca.lock.Unlock()
|
||||
count := ca.nodesInProcessing[nodeName].retries + 1
|
||||
|
||||
entry, ok := ca.nodesInProcessing[nodeName]
|
||||
if !ok {
|
||||
glog.Errorf("Cannot get retryParams for %q as entry does not exist", nodeName)
|
||||
return false, 0
|
||||
}
|
||||
|
||||
count := entry.retries + 1
|
||||
if count > updateMaxRetries {
|
||||
return false
|
||||
return false, 0
|
||||
}
|
||||
ca.nodesInProcessing[nodeName].retries = count
|
||||
return true
|
||||
|
||||
return true, nodeUpdateRetryTimeout(count)
|
||||
}
|
||||
|
||||
func nodeUpdateRetryTimeout(count int) time.Duration {
|
||||
timeout := updateRetryTimeout
|
||||
for i := 0; i < count && timeout < maxUpdateRetryTimeout; i++ {
|
||||
timeout *= 2
|
||||
}
|
||||
if timeout > maxUpdateRetryTimeout {
|
||||
return maxUpdateRetryTimeout
|
||||
}
|
||||
return timeout
|
||||
}
|
||||
|
||||
func (ca *cloudCIDRAllocator) removeNodeFromProcessing(nodeName string) {
|
||||
|
|
|
@@ -17,6 +17,7 @@ limitations under the License.
 package ipam

 import (
+	"fmt"
 	"testing"
 	"time"

@@ -57,3 +58,22 @@ func TestBoundedRetries(t *testing.T) {
 		// wait for node to finish processing (should terminate and not time out)
 	}
 }
+
+func TestNodeUpdateRetryTimeout(t *testing.T) {
+	for _, tc := range []struct {
+		count int
+		want  time.Duration
+	}{
+		{count: 0, want: 250 * time.Millisecond},
+		{count: 1, want: 500 * time.Millisecond},
+		{count: 2, want: 1000 * time.Millisecond},
+		{count: 3, want: 2000 * time.Millisecond},
+		{count: 50, want: 5000 * time.Millisecond},
+	} {
+		t.Run(fmt.Sprintf("count %d", tc.count), func(t *testing.T) {
+			if got := nodeUpdateRetryTimeout(tc.count); got != tc.want {
+				t.Errorf("nodeUpdateRetryTimeout(tc.count) = %v; want %v", got, tc.want)
+			}
+		})
+	}
+}