diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go
index f7aaecd862..999351c50b 100644
--- a/cmd/kube-controller-manager/app/controllermanager.go
+++ b/cmd/kube-controller-manager/app/controllermanager.go
@@ -238,8 +238,9 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
 		glog.Warningf("Unsuccessful parsing of service CIDR %v: %v", s.ServiceCIDR, err)
 	}
 	nodeController := nodecontroller.NewNodeController(cloud, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "node-controller")),
-		s.PodEvictionTimeout.Duration, s.DeletingPodsQps, int(s.DeletingPodsBurst),
-		s.NodeMonitorGracePeriod.Duration, s.NodeStartupGracePeriod.Duration, s.NodeMonitorPeriod.Duration, clusterCIDR, serviceCIDR, int(s.NodeCIDRMaskSize), s.AllocateNodeCIDRs)
+		s.PodEvictionTimeout.Duration, s.DeletingPodsQps, s.NodeMonitorGracePeriod.Duration,
+		s.NodeStartupGracePeriod.Duration, s.NodeMonitorPeriod.Duration, clusterCIDR, serviceCIDR,
+		int(s.NodeCIDRMaskSize), s.AllocateNodeCIDRs)
 	nodeController.Run(s.NodeSyncPeriod.Duration)
 	time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
 
diff --git a/cmd/kube-controller-manager/app/options/options.go b/cmd/kube-controller-manager/app/options/options.go
index efba4340ef..e6ce2caeac 100644
--- a/cmd/kube-controller-manager/app/options/options.go
+++ b/cmd/kube-controller-manager/app/options/options.go
@@ -136,7 +136,8 @@ func (s *CMServer) AddFlags(fs *pflag.FlagSet) {
 	fs.DurationVar(&s.DeploymentControllerSyncPeriod.Duration, "deployment-controller-sync-period", s.DeploymentControllerSyncPeriod.Duration, "Period for syncing the deployments.")
 	fs.DurationVar(&s.PodEvictionTimeout.Duration, "pod-eviction-timeout", s.PodEvictionTimeout.Duration, "The grace period for deleting pods on failed nodes.")
 	fs.Float32Var(&s.DeletingPodsQps, "deleting-pods-qps", 0.1, "Number of nodes per second on which pods are deleted in case of node failure.")
-	fs.Int32Var(&s.DeletingPodsBurst, "deleting-pods-burst", 1, "Number of nodes on which pods are bursty deleted in case of node failure. For more details look into RateLimiter.")
+	fs.Int32Var(&s.DeletingPodsBurst, "deleting-pods-burst", 0, "Number of nodes on which pods are bursty deleted in case of node failure. For more details look into RateLimiter.")
+	fs.MarkDeprecated("deleting-pods-burst", "This flag is currently no-op and will be deleted.")
 	fs.Int32Var(&s.RegisterRetryCount, "register-retry-count", s.RegisterRetryCount, ""+
 		"The number of retries for initial node registration. Retry interval equals node-sync-period.")
 	fs.MarkDeprecated("register-retry-count", "This flag is currently no-op and will be deleted.")
diff --git a/contrib/mesos/pkg/controllermanager/controllermanager.go b/contrib/mesos/pkg/controllermanager/controllermanager.go
index 332453c7c9..44076ace6d 100644
--- a/contrib/mesos/pkg/controllermanager/controllermanager.go
+++ b/contrib/mesos/pkg/controllermanager/controllermanager.go
@@ -154,7 +154,7 @@ func (s *CMServer) Run(_ []string) error {
 	_, clusterCIDR, _ := net.ParseCIDR(s.ClusterCIDR)
 	_, serviceCIDR, _ := net.ParseCIDR(s.ServiceCIDR)
 	nodeController := nodecontroller.NewNodeController(cloud, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "node-controller")),
-		s.PodEvictionTimeout.Duration, s.DeletingPodsQps, int(s.DeletingPodsBurst),
+		s.PodEvictionTimeout.Duration, s.DeletingPodsQps,
 		s.NodeMonitorGracePeriod.Duration, s.NodeStartupGracePeriod.Duration, s.NodeMonitorPeriod.Duration, clusterCIDR, serviceCIDR,
 		int(s.NodeCIDRMaskSize), s.AllocateNodeCIDRs)
 	nodeController.Run(s.NodeSyncPeriod.Duration)
diff --git a/pkg/apis/componentconfig/types.go b/pkg/apis/componentconfig/types.go
index ebbc79da4f..66c675070c 100644
--- a/pkg/apis/componentconfig/types.go
+++ b/pkg/apis/componentconfig/types.go
@@ -516,7 +516,7 @@ type KubeControllerManagerConfiguration struct {
 	// deletingPodsQps is the number of nodes per second on which pods are deleted in
 	// case of node failure.
 	DeletingPodsQps float32 `json:"deletingPodsQps"`
-	// deletingPodsBurst is the number of nodes on which pods are bursty deleted in
+	// DEPRECATED: deletingPodsBurst is the number of nodes on which pods are bursty deleted in
 	// case of node failure. For more details look into RateLimiter.
 	DeletingPodsBurst int32 `json:"deletingPodsBurst"`
 	// nodeMontiorGracePeriod is the amount of time which we allow a running node to be
diff --git a/pkg/controller/node/nodecontroller.go b/pkg/controller/node/nodecontroller.go
index ecd11cd6e1..face4bc543 100644
--- a/pkg/controller/node/nodecontroller.go
+++ b/pkg/controller/node/nodecontroller.go
@@ -56,6 +56,8 @@ const (
 	nodeStatusUpdateRetry = 5
 	// controls how often NodeController will try to evict Pods from non-responsive Nodes.
 	nodeEvictionPeriod = 100 * time.Millisecond
+	// Burst value for all eviction rate limiters
+	evictionRateLimiterBurst = 1
 )
 
 type zoneState string
@@ -114,7 +116,6 @@ type NodeController struct {
 	zonePodEvictor         map[string]*RateLimitedTimedQueue
 	zoneTerminationEvictor map[string]*RateLimitedTimedQueue
 	evictionLimiterQPS     float32
-	evictionLimiterBurst   int
 	podEvictionTimeout     time.Duration
 	// The maximum duration before a pod evicted from a node can be forcefully terminated.
 	maximumGracePeriod time.Duration
@@ -144,7 +145,6 @@ func NewNodeController(
 	kubeClient clientset.Interface,
 	podEvictionTimeout time.Duration,
 	evictionLimiterQPS float32,
-	evictionLimiterBurst int,
 	nodeMonitorGracePeriod time.Duration,
 	nodeStartupGracePeriod time.Duration,
 	nodeMonitorPeriod time.Duration,
@@ -200,7 +200,6 @@ func NewNodeController(
 		nodeExistsInCloudProvider: func(nodeName string) (bool, error) { return nodeExistsInCloudProvider(cloud, nodeName) },
 		computeZoneStateFunc:      ComputeZoneState,
 		evictionLimiterQPS:        evictionLimiterQPS,
-		evictionLimiterBurst:      evictionLimiterBurst,
 		zoneStates:                make(map[string]zoneState),
 	}
 
@@ -385,11 +384,11 @@ func (nc *NodeController) monitorNodeStatus() error {
 			if _, found := nc.zonePodEvictor[zone]; !found {
 				nc.zonePodEvictor[zone] =
 					NewRateLimitedTimedQueue(
-						flowcontrol.NewTokenBucketRateLimiter(nc.evictionLimiterQPS, nc.evictionLimiterBurst))
+						flowcontrol.NewTokenBucketRateLimiter(nc.evictionLimiterQPS, evictionRateLimiterBurst))
 			}
 			if _, found := nc.zoneTerminationEvictor[zone]; !found {
 				nc.zoneTerminationEvictor[zone] = NewRateLimitedTimedQueue(
-					flowcontrol.NewTokenBucketRateLimiter(nc.evictionLimiterQPS, nc.evictionLimiterBurst))
+					flowcontrol.NewTokenBucketRateLimiter(nc.evictionLimiterQPS, evictionRateLimiterBurst))
 			}
 			nc.cancelPodEviction(added[i])
 		}
diff --git a/pkg/controller/node/nodecontroller_test.go b/pkg/controller/node/nodecontroller_test.go
index 31f8881404..48b73a210c 100644
--- a/pkg/controller/node/nodecontroller_test.go
+++ b/pkg/controller/node/nodecontroller_test.go
@@ -601,7 +601,7 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
 
 	for _, item := range table {
 		nodeController := NewNodeController(nil, item.fakeNodeHandler,
-			evictionTimeout, testRateLimiterQPS, testRateLimiterBurst, testNodeMonitorGracePeriod,
+			evictionTimeout, testRateLimiterQPS, testNodeMonitorGracePeriod,
 			testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false)
 		nodeController.now = func() unversioned.Time { return fakeNow }
 		for _, ds := range item.daemonSets {
@@ -674,7 +674,7 @@ func TestCloudProviderNoRateLimit(t *testing.T) {
 		deleteWaitChan: make(chan struct{}),
 	}
 	nodeController := NewNodeController(nil, fnh, 10*time.Minute,
-		testRateLimiterQPS, testRateLimiterBurst,
+		testRateLimiterQPS,
 		testNodeMonitorGracePeriod, testNodeStartupGracePeriod,
 		testNodeMonitorPeriod, nil, nil, 0, false)
 	nodeController.cloud = &fakecloud.FakeCloud{}
@@ -907,7 +907,7 @@ func TestMonitorNodeStatusUpdateStatus(t *testing.T) {
 	}
 
 	for i, item := range table {
-		nodeController := NewNodeController(nil, item.fakeNodeHandler, 5*time.Minute, testRateLimiterQPS, testRateLimiterBurst,
+		nodeController := NewNodeController(nil, item.fakeNodeHandler, 5*time.Minute, testRateLimiterQPS,
 			testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false)
 		nodeController.now = func() unversioned.Time { return fakeNow }
 		if err := nodeController.monitorNodeStatus(); err != nil {
@@ -1057,7 +1057,7 @@ func TestMonitorNodeStatusMarkPodsNotReady(t *testing.T) {
 	}
 
 	for i, item := range table {
-		nodeController := NewNodeController(nil, item.fakeNodeHandler, 5*time.Minute, testRateLimiterQPS, testRateLimiterBurst,
+		nodeController := NewNodeController(nil, item.fakeNodeHandler, 5*time.Minute, testRateLimiterQPS,
 			testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false)
 		nodeController.now = func() unversioned.Time { return fakeNow }
 		if err := nodeController.monitorNodeStatus(); err != nil {
@@ -1139,7 +1139,7 @@ func TestNodeDeletion(t *testing.T) {
 		Clientset: fake.NewSimpleClientset(&api.PodList{Items: []api.Pod{*newPod("pod0", "node0"), *newPod("pod1", "node1")}}),
 	}
 
-	nodeController := NewNodeController(nil, fakeNodeHandler, 5*time.Minute, testRateLimiterQPS, testRateLimiterBurst,
+	nodeController := NewNodeController(nil, fakeNodeHandler, 5*time.Minute, testRateLimiterQPS,
 		testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false)
 	nodeController.now = func() unversioned.Time { return fakeNow }
 
@@ -1243,7 +1243,7 @@ func TestCheckPod(t *testing.T) {
 		},
 	}
 
-	nc := NewNodeController(nil, nil, 0, 0, 0, 0, 0, 0, nil, nil, 0, false)
+	nc := NewNodeController(nil, nil, 0, 0, 0, 0, 0, nil, nil, 0, false)
 	nc.nodeStore.Store = cache.NewStore(cache.MetaNamespaceKeyFunc)
 	nc.nodeStore.Store.Add(&api.Node{
 		ObjectMeta: api.ObjectMeta{
@@ -1310,7 +1310,7 @@ func TestCleanupOrphanedPods(t *testing.T) {
 		newPod("b", "bar"),
 		newPod("c", "gone"),
 	}
-	nc := NewNodeController(nil, nil, 0, 0, 0, 0, 0, 0, nil, nil, 0, false)
+	nc := NewNodeController(nil, nil, 0, 0, 0, 0, 0, nil, nil, 0, false)
 
 	nc.nodeStore.Store.Add(newNode("foo"))
 	nc.nodeStore.Store.Add(newNode("bar"))
diff --git a/pkg/controller/node/rate_limited_queue.go b/pkg/controller/node/rate_limited_queue.go
index 2cf933534f..305ba129fe 100644
--- a/pkg/controller/node/rate_limited_queue.go
+++ b/pkg/controller/node/rate_limited_queue.go
@@ -148,8 +148,9 @@ func (q *UniqueQueue) Clear() {
 // RateLimitedTimedQueue is a unique item priority queue ordered by the expected next time
 // of execution. It is also rate limited.
 type RateLimitedTimedQueue struct {
-	queue   UniqueQueue
-	limiter flowcontrol.RateLimiter
+	queue       UniqueQueue
+	limiterLock sync.Mutex
+	limiter     flowcontrol.RateLimiter
 }
 
 // Creates new queue which will use given RateLimiter to oversee execution.
@@ -173,6 +174,8 @@ type ActionFunc func(TimedValue) (bool, time.Duration)
 // time to execute the next item in the queue.
 func (q *RateLimitedTimedQueue) Try(fn ActionFunc) {
 	val, ok := q.queue.Head()
+	q.limiterLock.Lock()
+	defer q.limiterLock.Unlock()
 	for ok {
 		// rate limit the queue checking
 		if !q.limiter.TryAccept() {
@@ -216,3 +219,35 @@ func (q *RateLimitedTimedQueue) Remove(value string) bool {
 func (q *RateLimitedTimedQueue) Clear() {
 	q.queue.Clear()
 }
+
+// SwapLimiter safely swaps current limiter for this queue with the passed one if capacities or qps's differ.
+func (q *RateLimitedTimedQueue) SwapLimiter(newQPS float32) {
+	q.limiterLock.Lock()
+	defer q.limiterLock.Unlock()
+	if q.limiter.QPS() == newQPS {
+		return
+	}
+	var newLimiter flowcontrol.RateLimiter
+	if newQPS <= 0 {
+		newLimiter = flowcontrol.NewFakeNeverRateLimiter()
+	} else {
+		newLimiter = flowcontrol.NewTokenBucketRateLimiter(newQPS, evictionRateLimiterBurst)
+	}
+	// If we're currently waiting on limiter, we drain the new one - this is a good approach when Burst value is 1
+	// TODO: figure out if we need to support higher Burst values and decide on the drain logic, should we keep:
+	// - saturation (percentage of used tokens)
+	// - number of used tokens
+	// - number of available tokens
+	// - something else
+	for q.limiter.Saturation() > newLimiter.Saturation() {
+		// Check if we're not using fake limiter
+		previousSaturation := newLimiter.Saturation()
+		newLimiter.TryAccept()
+		// It's a fake limiter
+		if newLimiter.Saturation() == previousSaturation {
+			break
+		}
+	}
+	q.limiter.Stop()
+	q.limiter = newLimiter
+}
diff --git a/pkg/controller/node/rate_limited_queue_test.go b/pkg/controller/node/rate_limited_queue_test.go
index 0b2016f9d8..44a24b13f1 100644
--- a/pkg/controller/node/rate_limited_queue_test.go
+++ b/pkg/controller/node/rate_limited_queue_test.go
@@ -281,3 +281,27 @@ func TestClear(t *testing.T) {
 		t.Fatalf("Clear should remove all elements from the queue.")
 	}
 }
+
+func TestSwapLimiter(t *testing.T) {
+	evictor := NewRateLimitedTimedQueue(flowcontrol.NewFakeAlwaysRateLimiter())
+	fakeAlways := flowcontrol.NewFakeAlwaysRateLimiter()
+	qps := evictor.limiter.QPS()
+	if qps != fakeAlways.QPS() {
+		t.Fatalf("QPS does not match create one: %v instead of %v", qps, fakeAlways.QPS())
+	}
+
+	evictor.SwapLimiter(0)
+	qps = evictor.limiter.QPS()
+	fakeNever := flowcontrol.NewFakeNeverRateLimiter()
+	if qps != fakeNever.QPS() {
+		t.Fatalf("QPS does not match create one: %v instead of %v", qps, fakeNever.QPS())
+	}
+
+	createdQPS := float32(5.5)
+	evictor.SwapLimiter(createdQPS)
+	qps = evictor.limiter.QPS()
+	if qps != createdQPS {
+		t.Fatalf("QPS does not match create one: %v instead of %v", qps, createdQPS)
+	}
+
+}
diff --git a/pkg/util/flowcontrol/throttle.go b/pkg/util/flowcontrol/throttle.go
index 482ba7d14a..232ef6e58c 100644
--- a/pkg/util/flowcontrol/throttle.go
+++ b/pkg/util/flowcontrol/throttle.go
@@ -35,10 +35,13 @@ type RateLimiter interface {
 	// Usually we use token bucket rate limiter. In that case,
 	// 1.0 means no tokens are available; 0.0 means we have a full bucket of tokens to use.
 	Saturation() float64
+	// QPS returns QPS of this rate limiter
+	QPS() float32
 }
 
 type tokenBucketRateLimiter struct {
 	limiter *ratelimit.Bucket
+	qps     float32
 }
 
 // NewTokenBucketRateLimiter creates a rate limiter which implements a token bucket approach.
@@ -48,7 +51,10 @@ type tokenBucketRateLimiter struct {
 // The maximum number of tokens in the bucket is capped at 'burst'.
 func NewTokenBucketRateLimiter(qps float32, burst int) RateLimiter {
 	limiter := ratelimit.NewBucketWithRate(float64(qps), int64(burst))
-	return &tokenBucketRateLimiter{limiter}
+	return &tokenBucketRateLimiter{
+		limiter: limiter,
+		qps:     qps,
+	}
 }
 
 func (t *tokenBucketRateLimiter) TryAccept() bool {
@@ -69,6 +75,10 @@ func (t *tokenBucketRateLimiter) Accept() {
 func (t *tokenBucketRateLimiter) Stop() {
 }
 
+func (t *tokenBucketRateLimiter) QPS() float32 {
+	return t.qps
+}
+
 type fakeAlwaysRateLimiter struct{}
 
 func NewFakeAlwaysRateLimiter() RateLimiter {
@@ -87,6 +97,10 @@ func (t *fakeAlwaysRateLimiter) Stop() {}
 
 func (t *fakeAlwaysRateLimiter) Accept() {}
 
+func (t *fakeAlwaysRateLimiter) QPS() float32 {
+	return 1
+}
+
 type fakeNeverRateLimiter struct {
 	wg sync.WaitGroup
 }
@@ -114,3 +128,7 @@ func (t *fakeNeverRateLimiter) Stop() {
 func (t *fakeNeverRateLimiter) Accept() {
 	t.wg.Wait()
 }
+
+func (t *fakeNeverRateLimiter) QPS() float32 {
+	return 1
+}
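
A note on the drain loop added in SwapLimiter: as its TODO comment says, matching the new limiter's saturation to the old one is only a sound hand-off while evictionRateLimiterBurst is 1, since at most one in-flight token needs to be carried over. The standalone sketch below mirrors that logic outside the NodeController. It is illustrative only: swapSketch is a hypothetical helper, the burst value of 1 mirrors the constant in this patch, and it assumes the k8s.io/kubernetes/pkg/util/flowcontrol import path used by this tree, relying solely on the RateLimiter methods visible in this diff.

package main

import (
	"fmt"

	"k8s.io/kubernetes/pkg/util/flowcontrol"
)

// swapSketch mirrors SwapLimiter's drain step: before the new limiter replaces
// the old one, tokens are consumed from the new limiter until its saturation is
// at least as high as the old limiter's, so swapping cannot grant an extra burst.
func swapSketch(old flowcontrol.RateLimiter, newQPS float32) flowcontrol.RateLimiter {
	if old.QPS() == newQPS {
		return old
	}
	var next flowcontrol.RateLimiter
	if newQPS <= 0 {
		next = flowcontrol.NewFakeNeverRateLimiter()
	} else {
		// Burst of 1, matching evictionRateLimiterBurst in the patch above.
		next = flowcontrol.NewTokenBucketRateLimiter(newQPS, 1)
	}
	for old.Saturation() > next.Saturation() {
		before := next.Saturation()
		next.TryAccept()
		if next.Saturation() == before {
			// Saturation did not move, so this is a fake limiter; stop draining.
			break
		}
	}
	old.Stop()
	return next
}

func main() {
	// A saturated 0.1 QPS limiter swapped for a faster one: the used token is
	// carried over instead of being handed back as a fresh burst.
	old := flowcontrol.NewTokenBucketRateLimiter(0.1, 1)
	old.TryAccept()
	next := swapSketch(old, 10)
	fmt.Printf("new QPS: %v, saturation after swap: %v\n", next.QPS(), next.Saturation())
}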