diff --git a/pkg/controller/nodeipam/ipam/cidr_allocator.go b/pkg/controller/nodeipam/ipam/cidr_allocator.go
index 768d221f9f..426abf68cd 100644
--- a/pkg/controller/nodeipam/ipam/cidr_allocator.go
+++ b/pkg/controller/nodeipam/ipam/cidr_allocator.go
@@ -71,7 +71,10 @@ const (
 	cidrUpdateRetries = 3
 
 	// updateRetryTimeout is the time to wait before requeing a failed node for retry
-	updateRetryTimeout = 100 * time.Millisecond
+	updateRetryTimeout = 250 * time.Millisecond
+
+	// maxUpdateRetryTimeout is the maximum amount of time between timeouts.
+	maxUpdateRetryTimeout = 5 * time.Second
 
 	// updateMaxRetries is the max retries for a failed node
 	updateMaxRetries = 10
diff --git a/pkg/controller/nodeipam/ipam/cloud_cidr_allocator.go b/pkg/controller/nodeipam/ipam/cloud_cidr_allocator.go
index 087e04a392..494d47d79f 100644
--- a/pkg/controller/nodeipam/ipam/cloud_cidr_allocator.go
+++ b/pkg/controller/nodeipam/ipam/cloud_cidr_allocator.go
@@ -24,6 +24,7 @@ import (
 
 	"github.com/golang/glog"
 
+	"k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
@@ -33,7 +34,6 @@ import (
 	"k8s.io/client-go/tools/cache"
 	"k8s.io/client-go/tools/record"
 
-	"k8s.io/api/core/v1"
 	clientset "k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/kubernetes/scheme"
 	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
@@ -155,16 +155,21 @@ func (ca *cloudCIDRAllocator) worker(stopChan <-chan struct{}) {
 				glog.Warning("Channel nodeCIDRUpdateChannel was unexpectedly closed")
 				return
 			}
-			if err := ca.updateCIDRAllocation(workItem); err != nil {
-				if ca.canRetry(workItem) {
-					time.AfterFunc(updateRetryTimeout, func() {
+			if err := ca.updateCIDRAllocation(workItem); err == nil {
+				glog.V(3).Infof("Updated CIDR for %q", workItem)
+				ca.removeNodeFromProcessing(workItem)
+			} else {
+				glog.Errorf("Error updating CIDR for %q: %v", workItem, err)
+				if canRetry, timeout := ca.retryParams(workItem); canRetry {
+					glog.V(2).Infof("Retrying update for %q after %v", workItem, timeout)
+					time.AfterFunc(timeout, func() {
 						// Requeue the failed node for update again.
 						ca.nodeUpdateChannel <- workItem
 					})
 					continue
 				}
+				glog.Errorf("Exceeded retry count for %q, dropping from queue", workItem)
 			}
-			ca.removeNodeFromProcessing(workItem)
 		case <-stopChan:
 			return
 		}
@@ -181,15 +186,34 @@ func (ca *cloudCIDRAllocator) insertNodeToProcessing(nodeName string) bool {
 	return true
 }
 
-func (ca *cloudCIDRAllocator) canRetry(nodeName string) bool {
+func (ca *cloudCIDRAllocator) retryParams(nodeName string) (bool, time.Duration) {
 	ca.lock.Lock()
 	defer ca.lock.Unlock()
-	count := ca.nodesInProcessing[nodeName].retries + 1
+
+	entry, ok := ca.nodesInProcessing[nodeName]
+	if !ok {
+		glog.Errorf("Cannot get retryParams for %q as entry does not exist", nodeName)
+		return false, 0
+	}
+
+	count := entry.retries + 1
 	if count > updateMaxRetries {
-		return false
+		return false, 0
 	}
 	ca.nodesInProcessing[nodeName].retries = count
-	return true
+
+	return true, nodeUpdateRetryTimeout(count)
+}
+
+func nodeUpdateRetryTimeout(count int) time.Duration {
+	timeout := updateRetryTimeout
+	for i := 0; i < count && timeout < maxUpdateRetryTimeout; i++ {
+		timeout *= 2
+	}
+	if timeout > maxUpdateRetryTimeout {
+		return maxUpdateRetryTimeout
+	}
+	return timeout
 }
 
 func (ca *cloudCIDRAllocator) removeNodeFromProcessing(nodeName string) {
diff --git a/pkg/controller/nodeipam/ipam/cloud_cidr_allocator_test.go b/pkg/controller/nodeipam/ipam/cloud_cidr_allocator_test.go
index cf64b0fc31..89714c3d20 100644
--- a/pkg/controller/nodeipam/ipam/cloud_cidr_allocator_test.go
+++ b/pkg/controller/nodeipam/ipam/cloud_cidr_allocator_test.go
@@ -17,6 +17,7 @@ limitations under the License.
 package ipam
 
 import (
+	"fmt"
 	"testing"
 	"time"
 
@@ -57,3 +58,22 @@ func TestBoundedRetries(t *testing.T) {
 		// wait for node to finish processing (should terminate and not time out)
 	}
 }
+
+func TestNodeUpdateRetryTimeout(t *testing.T) {
+	for _, tc := range []struct {
+		count int
+		want  time.Duration
+	}{
+		{count: 0, want: 250 * time.Millisecond},
+		{count: 1, want: 500 * time.Millisecond},
+		{count: 2, want: 1000 * time.Millisecond},
+		{count: 3, want: 2000 * time.Millisecond},
+		{count: 50, want: 5000 * time.Millisecond},
+	} {
+		t.Run(fmt.Sprintf("count %d", tc.count), func(t *testing.T) {
+			if got := nodeUpdateRetryTimeout(tc.count); got != tc.want {
+				t.Errorf("nodeUpdateRetryTimeout(tc.count) = %v; want %v", got, tc.want)
+			}
+		})
+	}
+}