combine syncs in rootcacertpublisher

and some misc simplifications.
pull/58/head
Mike Danese 2018-11-08 19:14:12 -08:00
parent 8307fb2fb3
commit 206f5892a7
4 changed files with 88 additions and 149 deletions

View File

@ -143,7 +143,6 @@ func startRootCACertPublisher(ctx ControllerContext) (http.Handler, bool, error)
sac, err := rootcacertpublisher.NewPublisher(
ctx.InformerFactory.Core().V1().ConfigMaps(),
ctx.InformerFactory.Core().V1().Namespaces(),
ctx.ClientBuilder.ClientOrDie("root-ca-cert-publisher"),
rootCA,
)

View File

@ -44,8 +44,8 @@ go_test(
"//pkg/controller:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
"//staging/src/k8s.io/client-go/testing:go_default_library",
],
)

View File

@ -36,22 +36,17 @@ import (
"k8s.io/kubernetes/pkg/util/metrics"
)
// RootCACertCofigMapName is name of the configmap which stores certificates to access api-server
// RootCACertCofigMapName is name of the configmap which stores certificates to
// access api-server
const RootCACertCofigMapName = "kube-root-ca.crt"
// NewPublisher construct a new controller which would manage the configmap which stores
// certificates in each namespace. It will make sure certificate configmap exists in each namespace.
func NewPublisher(cmInformer coreinformers.ConfigMapInformer, nsInformer coreinformers.NamespaceInformer, cl clientset.Interface, rootCA []byte) (*Publisher, error) {
// NewPublisher construct a new controller which would manage the configmap
// which stores certificates in each namespace. It will make sure certificate
// configmap exists in each namespace.
func NewPublisher(cmInformer coreinformers.ConfigMapInformer, cl clientset.Interface, rootCA []byte) (*Publisher, error) {
e := &Publisher{
client: cl,
configMap: v1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: RootCACertCofigMapName,
},
Data: map[string]string{
"ca.crt": string(rootCA),
},
},
rootCA: rootCA,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "root-ca-cert-publisher"),
}
if cl.CoreV1().RESTClient().GetRateLimiter() != nil {
@ -67,13 +62,6 @@ func NewPublisher(cmInformer coreinformers.ConfigMapInformer, nsInformer coreinf
e.cmLister = cmInformer.Lister()
e.cmListerSynced = cmInformer.Informer().HasSynced
nsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: e.namespaceAdded,
UpdateFunc: e.namespaceUpdated,
})
e.nsLister = nsInformer.Lister()
e.nsListerSynced = nsInformer.Informer().HasSynced
e.syncHandler = e.syncNamespace
return e, nil
@ -83,7 +71,7 @@ func NewPublisher(cmInformer coreinformers.ConfigMapInformer, nsInformer coreinf
// Publisher manages certificate ConfigMap objects inside Namespaces
type Publisher struct {
client clientset.Interface
configMap v1.ConfigMap
rootCA []byte
// To allow injection for testing.
syncHandler func(key string) error
@ -91,9 +79,6 @@ type Publisher struct {
cmLister corelisters.ConfigMapLister
cmListerSynced cache.InformerSynced
nsLister corelisters.NamespaceLister
nsListerSynced cache.InformerSynced
queue workqueue.RateLimitingInterface
}
@ -105,7 +90,7 @@ func (c *Publisher) Run(workers int, stopCh <-chan struct{}) {
klog.Infof("Starting root CA certificate configmap publisher")
defer klog.Infof("Shutting down root CA certificate configmap publisher")
if !controller.WaitForCacheSync("crt configmap", stopCh, c.cmListerSynced, c.nsListerSynced) {
if !controller.WaitForCacheSync("crt configmap", stopCh, c.cmListerSynced) {
return
}
@ -129,24 +114,15 @@ func (c *Publisher) configMapDeleted(obj interface{}) {
}
func (c *Publisher) configMapUpdated(_, newObj interface{}) {
newConfigMap, err := convertToCM(newObj)
cm, err := convertToCM(newObj)
if err != nil {
utilruntime.HandleError(err)
return
}
if newConfigMap.Name != RootCACertCofigMapName {
if cm.Name != RootCACertCofigMapName {
return
}
if reflect.DeepEqual(c.configMap.Data, newConfigMap.Data) {
return
}
newConfigMap.Data = make(map[string]string)
newConfigMap.Data["ca.crt"] = c.configMap.Data["ca.crt"]
if _, err := c.client.CoreV1().ConfigMaps(newConfigMap.Namespace).Update(newConfigMap); err != nil && !apierrs.IsAlreadyExists(err) {
utilruntime.HandleError(fmt.Errorf("configmap creation failure:%v", err))
}
c.queue.Add(cm.Namespace)
}
func (c *Publisher) namespaceAdded(obj interface{}) {
@ -167,7 +143,8 @@ func (c *Publisher) runWorker() {
}
}
// processNextWorkItem deals with one key off the queue. It returns false when it's time to quit.
// processNextWorkItem deals with one key off the queue. It returns false when
// it's time to quit.
func (c *Publisher) processNextWorkItem() bool {
key, quit := c.queue.Get()
if quit {
@ -175,47 +152,52 @@ func (c *Publisher) processNextWorkItem() bool {
}
defer c.queue.Done(key)
err := c.syncHandler(key.(string))
if err == nil {
if err := c.syncHandler(key.(string)); err != nil {
utilruntime.HandleError(fmt.Errorf("syncing %q failed: %v", key, err))
c.queue.AddRateLimited(key)
return true
}
c.queue.Forget(key)
return true
}
utilruntime.HandleError(fmt.Errorf("%v failed with : %v", key, err))
c.queue.AddRateLimited(key)
return true
}
func (c *Publisher) syncNamespace(key string) error {
func (c *Publisher) syncNamespace(ns string) error {
startTime := time.Now()
defer func() {
klog.V(4).Infof("Finished syncing namespace %q (%v)", key, time.Since(startTime))
klog.V(4).Infof("Finished syncing namespace %q (%v)", ns, time.Since(startTime))
}()
ns, err := c.nsLister.Get(key)
if apierrs.IsNotFound(err) {
return nil
}
if err != nil {
return err
}
switch _, err := c.cmLister.ConfigMaps(ns.Name).Get(c.configMap.Name); {
case err == nil:
return nil
cm, err := c.cmLister.ConfigMaps(ns).Get(RootCACertCofigMapName)
switch {
case apierrs.IsNotFound(err):
_, err := c.client.CoreV1().ConfigMaps(ns).Create(&v1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: RootCACertCofigMapName,
},
Data: map[string]string{
"ca.crt": string(c.rootCA),
},
})
return err
case err != nil:
return err
}
cm := c.configMap.DeepCopy()
if _, err := c.client.CoreV1().ConfigMaps(ns.Name).Create(cm); err != nil && !apierrs.IsAlreadyExists(err) {
return err
data := map[string]string{
"ca.crt": string(c.rootCA),
}
if reflect.DeepEqual(cm.Data, data) {
return nil
}
cm.Data = data
_, err = c.client.CoreV1().ConfigMaps(ns).Update(cm)
return err
}
func convertToCM(obj interface{}) (*v1.ConfigMap, error) {
cm, ok := obj.(*v1.ConfigMap)
if !ok {

View File

@ -17,14 +17,14 @@ limitations under the License.
package rootcacertpublisher
import (
"reflect"
"testing"
"time"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/diff"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
core "k8s.io/client-go/testing"
"k8s.io/kubernetes/pkg/controller"
)
@ -76,94 +76,68 @@ func TestConfigMapCreation(t *testing.T) {
name string
}
testcases := map[string]struct {
ExistingNamespace *v1.Namespace
ExistingConfigMaps []*v1.ConfigMap
AddedNamespace *v1.Namespace
UpdatedNamespace *v1.Namespace
DeletedConfigMap *v1.ConfigMap
UpdatedConfigMap []*v1.ConfigMap
UpdatedConfigMap *v1.ConfigMap
ExpectActions []action
}{
"create new namesapce": {
AddedNamespace: newNs,
ExpectActions: []action{{verb: "create", name: RootCACertCofigMapName}},
},
"delete other configmap": {
ExistingNamespace: existNS,
ExistingConfigMaps: []*v1.ConfigMap{otherConfigMap, caConfigMap},
DeletedConfigMap: otherConfigMap,
},
"delete ca configmap": {
ExistingNamespace: existNS,
ExistingConfigMaps: []*v1.ConfigMap{otherConfigMap, caConfigMap},
DeletedConfigMap: caConfigMap,
ExpectActions: []action{{verb: "create", name: RootCACertCofigMapName}},
},
"update ca configmap with adding field": {
ExistingNamespace: existNS,
ExistingConfigMaps: []*v1.ConfigMap{caConfigMap},
UpdatedConfigMap: []*v1.ConfigMap{caConfigMap, addFieldCM},
UpdatedConfigMap: addFieldCM,
ExpectActions: []action{{verb: "update", name: RootCACertCofigMapName}},
},
"update ca configmap with modifying field": {
ExistingNamespace: existNS,
ExistingConfigMaps: []*v1.ConfigMap{caConfigMap},
UpdatedConfigMap: []*v1.ConfigMap{caConfigMap, modifyFieldCM},
UpdatedConfigMap: modifyFieldCM,
ExpectActions: []action{{verb: "update", name: RootCACertCofigMapName}},
},
"update with other configmap": {
ExistingNamespace: existNS,
ExistingConfigMaps: []*v1.ConfigMap{caConfigMap, otherConfigMap},
UpdatedConfigMap: []*v1.ConfigMap{otherConfigMap, updateOtherConfigMap},
UpdatedConfigMap: updateOtherConfigMap,
},
"update namespace with terminating state": {
ExistingNamespace: existNS,
UpdatedNamespace: terminatingNS,
},
}
for k, tc := range testcases {
t.Run(k, func(t *testing.T) {
client := fake.NewSimpleClientset(caConfigMap, existNS)
informers := informers.NewSharedInformerFactory(fake.NewSimpleClientset(), controller.NoResyncPeriodFunc())
cmInformer := informers.Core().V1().ConfigMaps()
nsInformer := informers.Core().V1().Namespaces()
controller, err := NewPublisher(cmInformer, nsInformer, client, fakeRootCA)
controller, err := NewPublisher(cmInformer, client, fakeRootCA)
if err != nil {
t.Fatalf("error creating ServiceAccounts controller: %v", err)
}
controller.cmListerSynced = alwaysReady
controller.nsListerSynced = alwaysReady
cmStore := cmInformer.Informer().GetStore()
nsStore := nsInformer.Informer().GetStore()
syncCalls := make(chan struct{})
controller.syncHandler = func(key string) error {
err := controller.syncNamespace(key)
if err != nil {
t.Logf("%s: %v", k, err)
}
syncCalls <- struct{}{}
return err
}
stopCh := make(chan struct{})
defer close(stopCh)
go controller.Run(1, stopCh)
controller.syncHandler = controller.syncNamespace
if tc.ExistingNamespace != nil {
nsStore.Add(tc.ExistingNamespace)
}
for _, s := range tc.ExistingConfigMaps {
cmStore.Add(s)
}
if tc.AddedNamespace != nil {
nsStore.Add(tc.AddedNamespace)
controller.namespaceAdded(tc.AddedNamespace)
}
if tc.UpdatedNamespace != nil {
controller.namespaceUpdated(tc.ExistingNamespace, tc.UpdatedNamespace)
controller.namespaceUpdated(nil, tc.UpdatedNamespace)
}
if tc.DeletedConfigMap != nil {
@ -172,37 +146,21 @@ func TestConfigMapCreation(t *testing.T) {
}
if tc.UpdatedConfigMap != nil {
old := tc.UpdatedConfigMap[0]
new := tc.UpdatedConfigMap[1]
controller.configMapUpdated(old, new)
cmStore.Add(tc.UpdatedConfigMap)
controller.configMapUpdated(nil, tc.UpdatedConfigMap)
}
// wait to be called
select {
case <-syncCalls:
case <-time.After(5 * time.Second):
for controller.queue.Len() != 0 {
controller.processNextWorkItem()
}
actions := client.Actions()
if len(tc.ExpectActions) != len(actions) {
t.Errorf("%s: Expected to create configmap %#v. Actual actions were: %#v", k, tc.ExpectActions, actions)
continue
if reflect.DeepEqual(actions, tc.ExpectActions) {
t.Errorf("Unexpected actions:\n%s", diff.ObjectGoPrintDiff(actions, tc.ExpectActions))
}
for i, expectAction := range tc.ExpectActions {
action := actions[i]
if !action.Matches(expectAction.verb, "configmaps") {
t.Errorf("%s: Unexpected action %s", k, action)
break
}
cm := action.(core.CreateAction).GetObject().(*v1.ConfigMap)
if cm.Name != expectAction.name {
t.Errorf("%s: Expected %s to be %s, got %s be %s", k, expectAction.name, expectAction.verb, cm.Name, action.GetVerb())
})
}
}
}
}
var alwaysReady = func() bool { return true }
func defaultCrtConfigMapPtr(rootCA []byte) *v1.ConfigMap {
tmp := v1.ConfigMap{