Use backoff from util/flowcontroll in federated namespace controller and other minor fixes

pull/6/head
Marcin Wielgus 2016-08-16 16:30:19 +02:00
parent 378a49613f
commit c1cbe4771b
3 changed files with 50 additions and 47 deletions

View File

@ -148,7 +148,7 @@ func StartControllers(s *options.CMServer, restClientCfg *restclient.Config) err
nsClientset := federationclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, "namespace-controller")) nsClientset := federationclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, "namespace-controller"))
namespaceController := namespacecontroller.NewNamespaceController(nsClientset) namespaceController := namespacecontroller.NewNamespaceController(nsClientset)
namespaceController.Start() namespaceController.Run(wait.NeverStop)
select {} select {}
} }

View File

@ -29,6 +29,7 @@ import (
"k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/framework" "k8s.io/kubernetes/pkg/controller/framework"
pkg_runtime "k8s.io/kubernetes/pkg/runtime" pkg_runtime "k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/flowcontrol"
"k8s.io/kubernetes/pkg/watch" "k8s.io/kubernetes/pkg/watch"
"github.com/golang/glog" "github.com/golang/glog"
@ -60,7 +61,8 @@ type NamespaceController struct {
// Client to federated api server. // Client to federated api server.
federatedApiClient federation_release_1_4.Interface federatedApiClient federation_release_1_4.Interface
stopChan chan struct{} // Backoff manager for namespaces
namespaceBackoff *flowcontrol.Backoff
namespaceReviewDelay time.Duration namespaceReviewDelay time.Duration
clusterAvailableDelay time.Duration clusterAvailableDelay time.Duration
@ -68,23 +70,15 @@ type NamespaceController struct {
updateTimeout time.Duration updateTimeout time.Duration
} }
// A structure passed by delying deliver. It contains a namespace that should be reconciled and
// the number of trials that were made previously and ended up in some kind of namespace-related
// error (like failure to create).
type namespaceItem struct {
namespace string
trial int64
}
// NewNamespaceController returns a new namespace controller // NewNamespaceController returns a new namespace controller
func NewNamespaceController(client federation_release_1_4.Interface) *NamespaceController { func NewNamespaceController(client federation_release_1_4.Interface) *NamespaceController {
nc := &NamespaceController{ nc := &NamespaceController{
federatedApiClient: client, federatedApiClient: client,
stopChan: make(chan struct{}),
namespaceReviewDelay: time.Second * 10, namespaceReviewDelay: time.Second * 10,
clusterAvailableDelay: time.Second * 20, clusterAvailableDelay: time.Second * 20,
smallDelay: time.Second * 3, smallDelay: time.Second * 3,
updateTimeout: time.Second * 30, updateTimeout: time.Second * 30,
namespaceBackoff: flowcontrol.NewBackOff(5*time.Second, time.Minute),
} }
// Build delivereres for triggering reconcilations. // Build delivereres for triggering reconcilations.
@ -103,7 +97,7 @@ func NewNamespaceController(client federation_release_1_4.Interface) *NamespaceC
}, },
&api_v1.Namespace{}, &api_v1.Namespace{},
controller.NoResyncPeriodFunc(), controller.NoResyncPeriodFunc(),
util.NewTriggerOnAllChanges(func(obj pkg_runtime.Object) { nc.deliverNamespaceObj(obj, 0, 0) })) util.NewTriggerOnAllChanges(func(obj pkg_runtime.Object) { nc.deliverNamespaceObj(obj, 0, false) }))
// Federated informer on namespaces in members of federation. // Federated informer on namespaces in members of federation.
nc.namespaceFederatedInformer = util.NewFederatedInformer( nc.namespaceFederatedInformer = util.NewFederatedInformer(
@ -123,7 +117,7 @@ func NewNamespaceController(client federation_release_1_4.Interface) *NamespaceC
// Trigger reconcilation whenever something in federated cluster is changed. In most cases it // Trigger reconcilation whenever something in federated cluster is changed. In most cases it
// would be just confirmation that some namespace opration suceeded. // would be just confirmation that some namespace opration suceeded.
util.NewTriggerOnMetaAndSpecChangesPreproc( util.NewTriggerOnMetaAndSpecChangesPreproc(
func(obj pkg_runtime.Object) { nc.deliverNamespaceObj(obj, nc.namespaceReviewDelay, 0) }, func(obj pkg_runtime.Object) { nc.deliverNamespaceObj(obj, nc.namespaceReviewDelay, false) },
func(obj pkg_runtime.Object) { util.SetClusterName(obj, cluster.Name) }, func(obj pkg_runtime.Object) { util.SetClusterName(obj, cluster.Name) },
)) ))
}, },
@ -131,7 +125,7 @@ func NewNamespaceController(client federation_release_1_4.Interface) *NamespaceC
&util.ClusterLifecycleHandlerFuncs{ &util.ClusterLifecycleHandlerFuncs{
ClusterAvailable: func(cluster *federation_api.Cluster) { ClusterAvailable: func(cluster *federation_api.Cluster) {
// When new cluster becomes available process all the namespaces again. // When new cluster becomes available process all the namespaces again.
nc.clusterDeliverer.DeliverAt(allClustersKey, nil, time.Now().Add(nc.clusterAvailableDelay)) nc.clusterDeliverer.DeliverAfter(allClustersKey, nil, nc.clusterAvailableDelay)
}, },
}, },
) )
@ -156,30 +150,44 @@ func NewNamespaceController(client federation_release_1_4.Interface) *NamespaceC
return nc return nc
} }
func (nc *NamespaceController) Start() { func (nc *NamespaceController) Run(stopChan <-chan struct{}) {
go nc.namespaceInformerController.Run(nc.stopChan) go nc.namespaceInformerController.Run(stopChan)
nc.namespaceFederatedInformer.Start() nc.namespaceFederatedInformer.Start()
go func() {
<-stopChan
nc.namespaceFederatedInformer.Stop()
}()
nc.namespaceDeliverer.StartWithHandler(func(item *util.DelayingDelivererItem) { nc.namespaceDeliverer.StartWithHandler(func(item *util.DelayingDelivererItem) {
ni := item.Value.(*namespaceItem) namespace := item.Value.(string)
nc.reconcileNamespace(ni.namespace, ni.trial) nc.reconcileNamespace(namespace)
}) })
nc.clusterDeliverer.StartWithHandler(func(_ *util.DelayingDelivererItem) { nc.clusterDeliverer.StartWithHandler(func(_ *util.DelayingDelivererItem) {
nc.reconcileNamespacesOnClusterChange() nc.reconcileNamespacesOnClusterChange()
}) })
go func() {
select {
case <-time.After(time.Minute):
nc.namespaceBackoff.GC()
case <-stopChan:
return
}
}()
} }
func (nc *NamespaceController) Stop() { func (nc *NamespaceController) deliverNamespaceObj(obj interface{}, delay time.Duration, failed bool) {
nc.namespaceFederatedInformer.Stop()
close(nc.stopChan)
}
func (nc *NamespaceController) deliverNamespaceObj(obj interface{}, delay time.Duration, trial int64) {
namespace := obj.(*api_v1.Namespace) namespace := obj.(*api_v1.Namespace)
nc.deliverNamespace(namespace.Name, delay, trial) nc.deliverNamespace(namespace.Name, delay, failed)
} }
func (nc *NamespaceController) deliverNamespace(namespace string, delay time.Duration, trial int64) { // Adds backoff to delay if this delivery is related to some failure. Resets backoff if there was no failure.
nc.namespaceDeliverer.DeliverAfter(namespace, &namespaceItem{namespace: namespace, trial: trial}, delay) func (nc *NamespaceController) deliverNamespace(namespace string, delay time.Duration, failed bool) {
if failed {
nc.namespaceBackoff.Next(namespace, time.Now())
delay = delay + nc.namespaceBackoff.Get(namespace)
} else {
nc.namespaceBackoff.Reset(namespace)
}
nc.namespaceDeliverer.DeliverAfter(namespace, namespace, delay)
} }
// Check whether all data stores are in sync. False is returned if any of the informer/stores is not yet // Check whether all data stores are in sync. False is returned if any of the informer/stores is not yet
@ -203,30 +211,23 @@ func (nc *NamespaceController) isSynced() bool {
// The function triggers reconcilation of all federated namespaces. // The function triggers reconcilation of all federated namespaces.
func (nc *NamespaceController) reconcileNamespacesOnClusterChange() { func (nc *NamespaceController) reconcileNamespacesOnClusterChange() {
if !nc.isSynced() { if !nc.isSynced() {
nc.clusterDeliverer.DeliverAt(allClustersKey, nil, time.Now().Add(nc.clusterAvailableDelay)) nc.clusterDeliverer.DeliverAfter(allClustersKey, nil, nc.clusterAvailableDelay)
} }
for _, obj := range nc.namespaceInformerStore.List() { for _, obj := range nc.namespaceInformerStore.List() {
namespace := obj.(*api_v1.Namespace) namespace := obj.(*api_v1.Namespace)
nc.deliverNamespace(namespace.Name, nc.smallDelay, 0) nc.deliverNamespace(namespace.Name, nc.smallDelay, false)
} }
} }
func backoff(trial int64) time.Duration { func (nc *NamespaceController) reconcileNamespace(namespace string) {
if trial > 12 {
return 12 * 5 * time.Second
}
return time.Duration(trial) * 5 * time.Second
}
func (nc *NamespaceController) reconcileNamespace(namespace string, trial int64) {
if !nc.isSynced() { if !nc.isSynced() {
nc.deliverNamespace(namespace, nc.clusterAvailableDelay, trial) nc.deliverNamespace(namespace, nc.clusterAvailableDelay, false)
} }
baseNamespaceObj, exist, err := nc.namespaceInformerStore.GetByKey(namespace) baseNamespaceObj, exist, err := nc.namespaceInformerStore.GetByKey(namespace)
if err != nil { if err != nil {
glog.Errorf("Failed to query main namespace store for %v: %v", namespace, err) glog.Errorf("Failed to query main namespace store for %v: %v", namespace, err)
nc.deliverNamespace(namespace, backoff(trial+1), trial+1) nc.deliverNamespace(namespace, 0, true)
return return
} }
@ -240,7 +241,7 @@ func (nc *NamespaceController) reconcileNamespace(namespace string, trial int64)
err = nc.federatedApiClient.Core().Namespaces().Delete(baseNamespace.Name, &api.DeleteOptions{}) err = nc.federatedApiClient.Core().Namespaces().Delete(baseNamespace.Name, &api.DeleteOptions{})
if err != nil { if err != nil {
glog.Errorf("Failed to delete namespace %s: %v", baseNamespace.Name, err) glog.Errorf("Failed to delete namespace %s: %v", baseNamespace.Name, err)
nc.deliverNamespace(namespace, backoff(trial+1), trial+1) nc.deliverNamespace(namespace, 0, true)
} }
return return
} }
@ -248,7 +249,7 @@ func (nc *NamespaceController) reconcileNamespace(namespace string, trial int64)
clusters, err := nc.namespaceFederatedInformer.GetReadyClusters() clusters, err := nc.namespaceFederatedInformer.GetReadyClusters()
if err != nil { if err != nil {
glog.Errorf("Failed to get cluster list: %v", err) glog.Errorf("Failed to get cluster list: %v", err)
nc.deliverNamespace(namespace, nc.clusterAvailableDelay, trial) nc.deliverNamespace(namespace, nc.clusterAvailableDelay, false)
return return
} }
@ -257,7 +258,7 @@ func (nc *NamespaceController) reconcileNamespace(namespace string, trial int64)
clusterNamespaceObj, found, err := nc.namespaceFederatedInformer.GetTargetStore().GetByKey(cluster.Name, namespace) clusterNamespaceObj, found, err := nc.namespaceFederatedInformer.GetTargetStore().GetByKey(cluster.Name, namespace)
if err != nil { if err != nil {
glog.Errorf("Failed to get %s from %s: %v", namespace, cluster.Name, err) glog.Errorf("Failed to get %s from %s: %v", namespace, cluster.Name, err)
nc.deliverNamespace(namespace, backoff(trial+1), trial+1) nc.deliverNamespace(namespace, 0, true)
return return
} }
desiredNamespace := &api_v1.Namespace{ desiredNamespace := &api_v1.Namespace{
@ -292,10 +293,10 @@ func (nc *NamespaceController) reconcileNamespace(namespace string, trial int64)
err = nc.federatedUpdater.Update(operations, nc.updateTimeout) err = nc.federatedUpdater.Update(operations, nc.updateTimeout)
if err != nil { if err != nil {
glog.Errorf("Failed to execute updates for %s: %v", namespace, err) glog.Errorf("Failed to execute updates for %s: %v", namespace, err)
nc.deliverNamespace(namespace, backoff(trial+1), trial+1) nc.deliverNamespace(namespace, 0, true)
return return
} }
// Evertyhing is in order but lets be double sure // Evertyhing is in order but lets be double sure
nc.deliverNamespace(namespace, nc.namespaceReviewDelay, 0) nc.deliverNamespace(namespace, nc.namespaceReviewDelay, false)
} }

View File

@ -66,11 +66,13 @@ func TestNamespaceController(t *testing.T) {
return nil, fmt.Errorf("Unknown cluster") return nil, fmt.Errorf("Unknown cluster")
} }
}) })
namespaceController.clusterAvailableDelay = 1000 * time.Millisecond namespaceController.clusterAvailableDelay = time.Second
namespaceController.namespaceReviewDelay = 50 * time.Millisecond namespaceController.namespaceReviewDelay = 50 * time.Millisecond
namespaceController.smallDelay = 20 * time.Millisecond namespaceController.smallDelay = 20 * time.Millisecond
namespaceController.updateTimeout = 5 * time.Second namespaceController.updateTimeout = 5 * time.Second
namespaceController.Start()
stop := make(chan struct{})
namespaceController.Run(stop)
ns1 := api_v1.Namespace{ ns1 := api_v1.Namespace{
ObjectMeta: api_v1.ObjectMeta{ ObjectMeta: api_v1.ObjectMeta{
@ -101,7 +103,7 @@ func TestNamespaceController(t *testing.T) {
assert.Equal(t, ns1.Name, createdNamespace2.Name) assert.Equal(t, ns1.Name, createdNamespace2.Name)
// assert.Contains(t, createdNamespace2.Annotations, "A") // assert.Contains(t, createdNamespace2.Annotations, "A")
namespaceController.Stop() close(stop)
} }
func toFederatedInformerForTestOnly(informer util.FederatedInformer) util.FederatedInformerForTestOnly { func toFederatedInformerForTestOnly(informer util.FederatedInformer) util.FederatedInformerForTestOnly {