mirror of https://github.com/k3s-io/k3s
Merge pull request #36019 from mwielgus/fed-secret-nn
Automatic merge from submit-queue Switch federated secret controller to use NamespacedName To make it cleaner and consistent with other controllers. cc: @quinton-hoolepull/6/head
commit
ec6a5d279f
|
@ -26,6 +26,7 @@ go_library(
|
||||||
"//pkg/client/record:go_default_library",
|
"//pkg/client/record:go_default_library",
|
||||||
"//pkg/controller:go_default_library",
|
"//pkg/controller:go_default_library",
|
||||||
"//pkg/runtime:go_default_library",
|
"//pkg/runtime:go_default_library",
|
||||||
|
"//pkg/types:go_default_library",
|
||||||
"//pkg/util/flowcontrol:go_default_library",
|
"//pkg/util/flowcontrol:go_default_library",
|
||||||
"//pkg/watch:go_default_library",
|
"//pkg/watch:go_default_library",
|
||||||
"//vendor:github.com/golang/glog",
|
"//vendor:github.com/golang/glog",
|
||||||
|
@ -45,6 +46,7 @@ go_test(
|
||||||
"//pkg/client/clientset_generated/release_1_5:go_default_library",
|
"//pkg/client/clientset_generated/release_1_5:go_default_library",
|
||||||
"//pkg/client/clientset_generated/release_1_5/fake:go_default_library",
|
"//pkg/client/clientset_generated/release_1_5/fake:go_default_library",
|
||||||
"//pkg/runtime:go_default_library",
|
"//pkg/runtime:go_default_library",
|
||||||
|
"//pkg/types:go_default_library",
|
||||||
"//pkg/util/wait:go_default_library",
|
"//pkg/util/wait:go_default_library",
|
||||||
"//vendor:github.com/stretchr/testify/assert",
|
"//vendor:github.com/stretchr/testify/assert",
|
||||||
],
|
],
|
||||||
|
|
|
@ -17,7 +17,6 @@ limitations under the License.
|
||||||
package secret
|
package secret
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
federation_api "k8s.io/kubernetes/federation/apis/federation/v1beta1"
|
federation_api "k8s.io/kubernetes/federation/apis/federation/v1beta1"
|
||||||
|
@ -31,6 +30,7 @@ import (
|
||||||
"k8s.io/kubernetes/pkg/client/record"
|
"k8s.io/kubernetes/pkg/client/record"
|
||||||
"k8s.io/kubernetes/pkg/controller"
|
"k8s.io/kubernetes/pkg/controller"
|
||||||
pkg_runtime "k8s.io/kubernetes/pkg/runtime"
|
pkg_runtime "k8s.io/kubernetes/pkg/runtime"
|
||||||
|
"k8s.io/kubernetes/pkg/types"
|
||||||
"k8s.io/kubernetes/pkg/util/flowcontrol"
|
"k8s.io/kubernetes/pkg/util/flowcontrol"
|
||||||
"k8s.io/kubernetes/pkg/watch"
|
"k8s.io/kubernetes/pkg/watch"
|
||||||
|
|
||||||
|
@ -173,8 +173,8 @@ func (secretcontroller *SecretController) Run(stopChan <-chan struct{}) {
|
||||||
secretcontroller.secretFederatedInformer.Stop()
|
secretcontroller.secretFederatedInformer.Stop()
|
||||||
}()
|
}()
|
||||||
secretcontroller.secretDeliverer.StartWithHandler(func(item *util.DelayingDelivererItem) {
|
secretcontroller.secretDeliverer.StartWithHandler(func(item *util.DelayingDelivererItem) {
|
||||||
secret := item.Value.(*secretItem)
|
secret := item.Value.(*types.NamespacedName)
|
||||||
secretcontroller.reconcileSecret(secret.namespace, secret.name)
|
secretcontroller.reconcileSecret(*secret)
|
||||||
})
|
})
|
||||||
secretcontroller.clusterDeliverer.StartWithHandler(func(_ *util.DelayingDelivererItem) {
|
secretcontroller.clusterDeliverer.StartWithHandler(func(_ *util.DelayingDelivererItem) {
|
||||||
secretcontroller.reconcileSecretsOnClusterChange()
|
secretcontroller.reconcileSecretsOnClusterChange()
|
||||||
|
@ -182,32 +182,21 @@ func (secretcontroller *SecretController) Run(stopChan <-chan struct{}) {
|
||||||
util.StartBackoffGC(secretcontroller.secretBackoff, stopChan)
|
util.StartBackoffGC(secretcontroller.secretBackoff, stopChan)
|
||||||
}
|
}
|
||||||
|
|
||||||
func getSecretKey(namespace, name string) string {
|
|
||||||
return fmt.Sprintf("%s/%s", namespace, name)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Internal structure for data in delaying deliverer.
|
|
||||||
type secretItem struct {
|
|
||||||
namespace string
|
|
||||||
name string
|
|
||||||
}
|
|
||||||
|
|
||||||
func (secretcontroller *SecretController) deliverSecretObj(obj interface{}, delay time.Duration, failed bool) {
|
func (secretcontroller *SecretController) deliverSecretObj(obj interface{}, delay time.Duration, failed bool) {
|
||||||
secret := obj.(*api_v1.Secret)
|
secret := obj.(*api_v1.Secret)
|
||||||
secretcontroller.deliverSecret(secret.Namespace, secret.Name, delay, failed)
|
secretcontroller.deliverSecret(types.NamespacedName{Namespace: secret.Namespace, Name: secret.Name}, delay, failed)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Adds backoff to delay if this delivery is related to some failure. Resets backoff if there was no failure.
|
// Adds backoff to delay if this delivery is related to some failure. Resets backoff if there was no failure.
|
||||||
func (secretcontroller *SecretController) deliverSecret(namespace string, name string, delay time.Duration, failed bool) {
|
func (secretcontroller *SecretController) deliverSecret(secret types.NamespacedName, delay time.Duration, failed bool) {
|
||||||
key := getSecretKey(namespace, name)
|
key := secret.String()
|
||||||
if failed {
|
if failed {
|
||||||
secretcontroller.secretBackoff.Next(key, time.Now())
|
secretcontroller.secretBackoff.Next(key, time.Now())
|
||||||
delay = delay + secretcontroller.secretBackoff.Get(key)
|
delay = delay + secretcontroller.secretBackoff.Get(key)
|
||||||
} else {
|
} else {
|
||||||
secretcontroller.secretBackoff.Reset(key)
|
secretcontroller.secretBackoff.Reset(key)
|
||||||
}
|
}
|
||||||
secretcontroller.secretDeliverer.DeliverAfter(key,
|
secretcontroller.secretDeliverer.DeliverAfter(key, &secret, delay)
|
||||||
&secretItem{namespace: namespace, name: name}, delay)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check whether all data stores are in sync. False is returned if any of the informer/stores is not yet
|
// Check whether all data stores are in sync. False is returned if any of the informer/stores is not yet
|
||||||
|
@ -235,22 +224,22 @@ func (secretcontroller *SecretController) reconcileSecretsOnClusterChange() {
|
||||||
}
|
}
|
||||||
for _, obj := range secretcontroller.secretInformerStore.List() {
|
for _, obj := range secretcontroller.secretInformerStore.List() {
|
||||||
secret := obj.(*api_v1.Secret)
|
secret := obj.(*api_v1.Secret)
|
||||||
secretcontroller.deliverSecret(secret.Namespace, secret.Name, secretcontroller.smallDelay, false)
|
secretcontroller.deliverSecret(types.NamespacedName{Namespace: secret.Namespace, Name: secret.Name}, secretcontroller.smallDelay, false)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (secretcontroller *SecretController) reconcileSecret(namespace string, secretName string) {
|
func (secretcontroller *SecretController) reconcileSecret(secret types.NamespacedName) {
|
||||||
|
|
||||||
if !secretcontroller.isSynced() {
|
if !secretcontroller.isSynced() {
|
||||||
secretcontroller.deliverSecret(namespace, secretName, secretcontroller.clusterAvailableDelay, false)
|
secretcontroller.deliverSecret(secret, secretcontroller.clusterAvailableDelay, false)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
key := getSecretKey(namespace, secretName)
|
key := secret.String()
|
||||||
baseSecretObj, exist, err := secretcontroller.secretInformerStore.GetByKey(key)
|
baseSecretObj, exist, err := secretcontroller.secretInformerStore.GetByKey(key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Errorf("Failed to query main secret store for %v: %v", key, err)
|
glog.Errorf("Failed to query main secret store for %v: %v", key, err)
|
||||||
secretcontroller.deliverSecret(namespace, secretName, 0, true)
|
secretcontroller.deliverSecret(secret, 0, true)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -263,7 +252,7 @@ func (secretcontroller *SecretController) reconcileSecret(namespace string, secr
|
||||||
clusters, err := secretcontroller.secretFederatedInformer.GetReadyClusters()
|
clusters, err := secretcontroller.secretFederatedInformer.GetReadyClusters()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Errorf("Failed to get cluster list: %v", err)
|
glog.Errorf("Failed to get cluster list: %v", err)
|
||||||
secretcontroller.deliverSecret(namespace, secretName, secretcontroller.clusterAvailableDelay, false)
|
secretcontroller.deliverSecret(secret, secretcontroller.clusterAvailableDelay, false)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -272,7 +261,7 @@ func (secretcontroller *SecretController) reconcileSecret(namespace string, secr
|
||||||
clusterSecretObj, found, err := secretcontroller.secretFederatedInformer.GetTargetStore().GetByKey(cluster.Name, key)
|
clusterSecretObj, found, err := secretcontroller.secretFederatedInformer.GetTargetStore().GetByKey(cluster.Name, key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Errorf("Failed to get %s from %s: %v", key, cluster.Name, err)
|
glog.Errorf("Failed to get %s from %s: %v", key, cluster.Name, err)
|
||||||
secretcontroller.deliverSecret(namespace, secretName, 0, true)
|
secretcontroller.deliverSecret(secret, 0, true)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -320,10 +309,10 @@ func (secretcontroller *SecretController) reconcileSecret(namespace string, secr
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Errorf("Failed to execute updates for %s: %v", key, err)
|
glog.Errorf("Failed to execute updates for %s: %v", key, err)
|
||||||
secretcontroller.deliverSecret(namespace, secretName, 0, true)
|
secretcontroller.deliverSecret(secret, 0, true)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Evertyhing is in order but lets be double sure
|
// Evertyhing is in order but lets be double sure
|
||||||
secretcontroller.deliverSecret(namespace, secretName, secretcontroller.secretReviewDelay, false)
|
secretcontroller.deliverSecret(secret, secretcontroller.secretReviewDelay, false)
|
||||||
}
|
}
|
||||||
|
|
|
@ -29,6 +29,7 @@ import (
|
||||||
kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5"
|
kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5"
|
||||||
fake_kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5/fake"
|
fake_kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5/fake"
|
||||||
"k8s.io/kubernetes/pkg/runtime"
|
"k8s.io/kubernetes/pkg/runtime"
|
||||||
|
"k8s.io/kubernetes/pkg/types"
|
||||||
"k8s.io/kubernetes/pkg/util/wait"
|
"k8s.io/kubernetes/pkg/util/wait"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
|
@ -100,7 +101,7 @@ func TestSecretController(t *testing.T) {
|
||||||
// Wait for the secret to appear in the informer store
|
// Wait for the secret to appear in the informer store
|
||||||
err := WaitForStoreUpdate(
|
err := WaitForStoreUpdate(
|
||||||
secretController.secretFederatedInformer.GetTargetStore(),
|
secretController.secretFederatedInformer.GetTargetStore(),
|
||||||
cluster1.Name, getSecretKey(secret1.Namespace, secret1.Name), wait.ForeverTestTimeout)
|
cluster1.Name, types.NamespacedName{Namespace: secret1.Namespace, Name: secret1.Name}.String(), wait.ForeverTestTimeout)
|
||||||
assert.Nil(t, err, "secret should have appeared in the informer store")
|
assert.Nil(t, err, "secret should have appeared in the informer store")
|
||||||
|
|
||||||
// Test update federated secret.
|
// Test update federated secret.
|
||||||
|
|
Loading…
Reference in New Issue