Merge pull request #42715 from DirectXMan12/bug/infinite-hpa

Automatic merge from submit-queue

Rate limit HPA controller to sync period

Since the HPA controller pulls information from an external source that
makes no guarantees about consistency, it's possible for the HPA
to get into an infinite update loop -- if the metrics change with
every query, the HPA controller will run its normal reconciliation,
post a status update, see that status update itself, fetch new metrics,
and if those metrics are different, post another status update, and
repeat.  This can lead to continuously updating a single HPA.
    
By rate-limiting each HPA to once per sync interval, we prevent this
from happening.

**Release note**:
```release-note
NONE
```
pull/6/head
Kubernetes Submit Queue 2017-03-21 14:26:16 -07:00 committed by GitHub
commit eb77144474
4 changed files with 145 additions and 20 deletions

View File

@ -13,6 +13,7 @@ go_library(
srcs = [ srcs = [
"doc.go", "doc.go",
"horizontal.go", "horizontal.go",
"rate_limitters.go",
"replica_calculator.go", "replica_calculator.go",
], ],
tags = ["automanaged"], tags = ["automanaged"],
@ -27,8 +28,10 @@ go_library(
"//pkg/client/clientset_generated/clientset/typed/extensions/v1beta1:go_default_library", "//pkg/client/clientset_generated/clientset/typed/extensions/v1beta1:go_default_library",
"//pkg/client/informers/informers_generated/externalversions/autoscaling/v1:go_default_library", "//pkg/client/informers/informers_generated/externalversions/autoscaling/v1:go_default_library",
"//pkg/client/listers/autoscaling/v1:go_default_library", "//pkg/client/listers/autoscaling/v1:go_default_library",
"//pkg/controller:go_default_library",
"//pkg/controller/podautoscaler/metrics:go_default_library", "//pkg/controller/podautoscaler/metrics:go_default_library",
"//vendor:github.com/golang/glog", "//vendor:github.com/golang/glog",
"//vendor:k8s.io/apimachinery/pkg/api/errors",
"//vendor:k8s.io/apimachinery/pkg/api/resource", "//vendor:k8s.io/apimachinery/pkg/api/resource",
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
"//vendor:k8s.io/apimachinery/pkg/labels", "//vendor:k8s.io/apimachinery/pkg/labels",
@ -36,10 +39,12 @@ go_library(
"//vendor:k8s.io/apimachinery/pkg/runtime/schema", "//vendor:k8s.io/apimachinery/pkg/runtime/schema",
"//vendor:k8s.io/apimachinery/pkg/util/runtime", "//vendor:k8s.io/apimachinery/pkg/util/runtime",
"//vendor:k8s.io/apimachinery/pkg/util/sets", "//vendor:k8s.io/apimachinery/pkg/util/sets",
"//vendor:k8s.io/apimachinery/pkg/util/wait",
"//vendor:k8s.io/client-go/kubernetes/typed/core/v1", "//vendor:k8s.io/client-go/kubernetes/typed/core/v1",
"//vendor:k8s.io/client-go/pkg/api/v1", "//vendor:k8s.io/client-go/pkg/api/v1",
"//vendor:k8s.io/client-go/tools/cache", "//vendor:k8s.io/client-go/tools/cache",
"//vendor:k8s.io/client-go/tools/record", "//vendor:k8s.io/client-go/tools/record",
"//vendor:k8s.io/client-go/util/workqueue",
], ],
) )

View File

@ -22,16 +22,19 @@ import (
"time" "time"
"github.com/golang/glog" "github.com/golang/glog"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
utilruntime "k8s.io/apimachinery/pkg/util/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
v1core "k8s.io/client-go/kubernetes/typed/core/v1" v1core "k8s.io/client-go/kubernetes/typed/core/v1"
clientv1 "k8s.io/client-go/pkg/api/v1" clientv1 "k8s.io/client-go/pkg/api/v1"
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record" "k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/api/v1"
autoscalingv1 "k8s.io/kubernetes/pkg/apis/autoscaling/v1" autoscalingv1 "k8s.io/kubernetes/pkg/apis/autoscaling/v1"
@ -41,6 +44,7 @@ import (
extensionsclient "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/typed/extensions/v1beta1" extensionsclient "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/typed/extensions/v1beta1"
autoscalinginformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions/autoscaling/v1" autoscalinginformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions/autoscaling/v1"
autoscalinglisters "k8s.io/kubernetes/pkg/client/listers/autoscaling/v1" autoscalinglisters "k8s.io/kubernetes/pkg/client/listers/autoscaling/v1"
"k8s.io/kubernetes/pkg/controller"
) )
const ( const (
@ -85,6 +89,9 @@ type HorizontalController struct {
// NewHorizontalController. // NewHorizontalController.
hpaLister autoscalinglisters.HorizontalPodAutoscalerLister hpaLister autoscalinglisters.HorizontalPodAutoscalerLister
hpaListerSynced cache.InformerSynced hpaListerSynced cache.InformerSynced
// Controllers that need to be synced
queue workqueue.RateLimitingInterface
} }
var downscaleForbiddenWindow = 5 * time.Minute var downscaleForbiddenWindow = 5 * time.Minute
@ -103,41 +110,31 @@ func NewHorizontalController(
broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: evtNamespacer.Events("")}) broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: evtNamespacer.Events("")})
recorder := broadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: "horizontal-pod-autoscaler"}) recorder := broadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: "horizontal-pod-autoscaler"})
controller := &HorizontalController{ hpaController := &HorizontalController{
replicaCalc: replicaCalc, replicaCalc: replicaCalc,
eventRecorder: recorder, eventRecorder: recorder,
scaleNamespacer: scaleNamespacer, scaleNamespacer: scaleNamespacer,
hpaNamespacer: hpaNamespacer, hpaNamespacer: hpaNamespacer,
queue: workqueue.NewNamedRateLimitingQueue(NewDefaultHPARateLimiter(resyncPeriod), "horizontalpodautoscaler"),
} }
hpaInformer.Informer().AddEventHandlerWithResyncPeriod( hpaInformer.Informer().AddEventHandlerWithResyncPeriod(
cache.ResourceEventHandlerFuncs{ cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { AddFunc: hpaController.enqueueHPA,
hpa := obj.(*autoscalingv1.HorizontalPodAutoscaler) UpdateFunc: hpaController.updateHPA,
err := controller.reconcileAutoscaler(hpa) DeleteFunc: hpaController.deleteHPA,
if err != nil {
glog.Warningf("Failed to reconcile %s: %v", hpa.Name, err)
}
},
UpdateFunc: func(old, cur interface{}) {
hpa := cur.(*autoscalingv1.HorizontalPodAutoscaler)
err := controller.reconcileAutoscaler(hpa)
if err != nil {
glog.Warningf("Failed to reconcile %s: %v", hpa.Name, err)
}
},
// We are not interested in deletions.
}, },
resyncPeriod, resyncPeriod,
) )
controller.hpaLister = hpaInformer.Lister() hpaController.hpaLister = hpaInformer.Lister()
controller.hpaListerSynced = hpaInformer.Informer().HasSynced hpaController.hpaListerSynced = hpaInformer.Informer().HasSynced
return controller return hpaController
} }
func (a *HorizontalController) Run(stopCh <-chan struct{}) { func (a *HorizontalController) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash() defer utilruntime.HandleCrash()
defer a.queue.ShutDown()
glog.Infof("Starting HPA Controller") glog.Infof("Starting HPA Controller")
@ -146,10 +143,65 @@ func (a *HorizontalController) Run(stopCh <-chan struct{}) {
return return
} }
// start a single worker (we may wish to start more in the future)
go wait.Until(a.worker, time.Second, stopCh)
<-stopCh <-stopCh
glog.Infof("Shutting down HPA Controller") glog.Infof("Shutting down HPA Controller")
} }
// updateHPA is the informer update handler. It ignores the old version of the
// object and enqueues the current one via enqueueHPA.
// cur could be an *v1.HorizontalPodAutoscaler, or a DeletionFinalStateUnknown marker item.
func (a *HorizontalController) updateHPA(old, cur interface{}) {
a.enqueueHPA(cur)
}
// enqueueHPA computes the work-queue key for obj and schedules it for
// reconciliation through the queue's rate limiter. If no key can be derived,
// the failure is reported via utilruntime.HandleError and nothing is queued.
// obj could be an *v1.HorizontalPodAutoscaler, or a DeletionFinalStateUnknown marker item.
func (a *HorizontalController) enqueueHPA(obj interface{}) {
	hpaKey, keyErr := controller.KeyFunc(obj)
	if keyErr != nil {
		wrapped := fmt.Errorf("couldn't get key for object %+v: %v", obj, keyErr)
		utilruntime.HandleError(wrapped)
		return
	}

	// Always add rate-limited so that metrics are not fetched more than once
	// per resync interval for a given HPA.
	a.queue.AddRateLimited(hpaKey)
}
// deleteHPA is the informer delete handler. It derives the work-queue key for
// obj and asks the queue to forget any rate-limiting state held for that key.
// If no key can be derived, the failure is reported via
// utilruntime.HandleError and nothing else happens.
func (a *HorizontalController) deleteHPA(obj interface{}) {
	hpaKey, keyErr := controller.KeyFunc(obj)
	if keyErr != nil {
		wrapped := fmt.Errorf("couldn't get key for object %+v: %v", obj, keyErr)
		utilruntime.HandleError(wrapped)
		return
	}

	// TODO: could we leak if we fail to get the key?
	a.queue.Forget(hpaKey)
}
// worker drains the work queue one item at a time until processNextWorkItem
// reports that processing should stop, then logs that it is exiting.
func (a *HorizontalController) worker() {
	for {
		if !a.processNextWorkItem() {
			break
		}
	}
	glog.Infof("horizontal pod autoscaler controller worker shutting down")
}
// processNextWorkItem takes a single key off the queue and reconciles it.
// It returns false only when the queue signals that it is shutting down;
// otherwise it returns true, whether or not reconciliation succeeded.
func (a *HorizontalController) processNextWorkItem() bool {
	item, shuttingDown := a.queue.Get()
	if shuttingDown {
		return false
	}
	defer a.queue.Done(item)

	if err := a.reconcileKey(item.(string)); err != nil {
		// Reconciliation failed: schedule a rate-limited retry and report the error.
		a.queue.AddRateLimited(item)
		utilruntime.HandleError(err)
	}

	// Deliberately no Forget on success: a given HPA should only be processed
	// once per resync interval.
	return true
}
// Computes the desired number of replicas for the metric specifications listed in the HPA, returning the maximum // Computes the desired number of replicas for the metric specifications listed in the HPA, returning the maximum
// of the computed replica counts, a description of the associated metric, and the statuses of all metrics // of the computed replica counts, a description of the associated metric, and the statuses of all metrics
// computed. // computed.
@ -275,6 +327,21 @@ func (a *HorizontalController) computeReplicasForMetrics(hpa *autoscalingv2.Hori
return replicas, metric, statuses, timestamp, nil return replicas, metric, statuses, timestamp, nil
} }
// reconcileKey looks up the HPA identified by a "namespace/name" work-queue
// key in the lister and reconciles it.
//
// It returns an error if the key cannot be split, if the lister lookup fails
// for any reason other than the object being absent, or if reconciliation
// itself fails. An HPA that is no longer present is logged and treated as
// success, since there is nothing left to reconcile.
func (a *HorizontalController) reconcileKey(key string) error {
	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return err
	}

	hpa, err := a.hpaLister.HorizontalPodAutoscalers(namespace).Get(name)
	if errors.IsNotFound(err) {
		glog.Infof("Horizontal Pod Autoscaler has been deleted %v", key)
		return nil
	}
	if err != nil {
		// Any other lookup failure leaves hpa unusable; surface the error
		// instead of reconciling a nil object so the caller can retry.
		return err
	}

	return a.reconcileAutoscaler(hpa)
}
func (a *HorizontalController) reconcileAutoscaler(hpav1Shared *autoscalingv1.HorizontalPodAutoscaler) error { func (a *HorizontalController) reconcileAutoscaler(hpav1Shared *autoscalingv1.HorizontalPodAutoscaler) error {
// make a copy so that we never mutate the shared informer cache (conversion can mutate the object) // make a copy so that we never mutate the shared informer cache (conversion can mutate the object)
hpav1Raw, err := api.Scheme.DeepCopy(hpav1Shared) hpav1Raw, err := api.Scheme.DeepCopy(hpav1Shared)

View File

@ -490,7 +490,7 @@ func (tc *testCase) runTest(t *testing.T) {
) )
eventClient := &clientfake.Clientset{} eventClient := &clientfake.Clientset{}
eventClient.AddReactor("*", "events", func(action core.Action) (handled bool, ret runtime.Object, err error) { eventClient.AddReactor("create", "events", func(action core.Action) (handled bool, ret runtime.Object, err error) {
tc.Lock() tc.Lock()
defer tc.Unlock() defer tc.Unlock()

View File

@ -0,0 +1,53 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package podautoscaler
import (
"time"
"k8s.io/client-go/util/workqueue"
)
// FixedItemIntervalRateLimiter is a workqueue.RateLimiter that reports the
// same fixed delay for every item, regardless of how often it has been queued.
type FixedItemIntervalRateLimiter struct {
	// interval is the delay reported by When for every item.
	interval time.Duration
}

// Compile-time check that *FixedItemIntervalRateLimiter satisfies workqueue.RateLimiter.
var _ workqueue.RateLimiter = (*FixedItemIntervalRateLimiter)(nil)
// NewFixedItemIntervalRateLimiter returns a rate limiter that reports the
// given interval as the delay for every item.
func NewFixedItemIntervalRateLimiter(interval time.Duration) workqueue.RateLimiter {
	limiter := new(FixedItemIntervalRateLimiter)
	limiter.interval = interval
	return limiter
}
// When returns the configured fixed interval. The item is not inspected, so
// every item gets the same delay.
func (r *FixedItemIntervalRateLimiter) When(item interface{}) time.Duration {
return r.interval
}
// NumRequeues always returns 1; this limiter keeps no per-item requeue count.
func (r *FixedItemIntervalRateLimiter) NumRequeues(item interface{}) int {
return 1
}
// Forget is a no-op: this limiter keeps no per-item state to discard.
func (r *FixedItemIntervalRateLimiter) Forget(item interface{}) {
}
// NewDefaultHPARateLimiter creates the rate limiter used for the HPA work
// queue. It currently returns only a fixed per-item interval limiter built
// from the given interval; no separate overall rate limit is applied here.
func NewDefaultHPARateLimiter(interval time.Duration) workqueue.RateLimiter {
return NewFixedItemIntervalRateLimiter(interval)
}