From d02f40a5e725d70f59e9f9e96b22ddad5c0426f7 Mon Sep 17 00:00:00 2001 From: Janet Kuo Date: Wed, 17 May 2017 16:53:46 -0700 Subject: [PATCH] Implement DaemonSet history logic in controller 1. Create controllerrevisions (history) and label pods with template hash for both RollingUpdate and OnDelete update strategy 2. Clean up old, non-live history based on revisionHistoryLimit 3. Remove duplicate controllerrevisions (the ones with the same template) and relabel their pods 4. Update RBAC to allow DaemonSet controller to manage controllerrevisions 5. In DaemonSet controller unit tests, create new pods with hash labels --- cmd/kube-controller-manager/app/extensions.go | 1 + pkg/client/listers/extensions/v1beta1/BUILD | 1 + .../extensions/v1beta1/daemonset_expansion.go | 36 ++ pkg/controller/controller_utils.go | 22 +- pkg/controller/controller_utils_test.go | 36 ++ pkg/controller/daemon/BUILD | 5 + pkg/controller/daemon/daemoncontroller.go | 207 ++++++++++- .../daemon/daemoncontroller_test.go | 19 +- pkg/controller/daemon/update.go | 328 +++++++++++++++++- pkg/controller/daemon/util/daemonset_util.go | 22 +- .../daemon/util/daemonset_util_test.go | 102 +++++- pkg/controller/deployment/sync.go | 2 +- pkg/controller/deployment/util/hash_test.go | 5 +- pkg/controller/deployment/util/pod_util.go | 18 - .../deployment/util/pod_util_test.go | 59 ---- .../deployment/util/replicaset_util.go | 3 +- .../rbac/bootstrappolicy/controller_policy.go | 1 + .../testdata/controller-roles.yaml | 11 + 18 files changed, 742 insertions(+), 136 deletions(-) delete mode 100644 pkg/controller/deployment/util/pod_util_test.go diff --git a/cmd/kube-controller-manager/app/extensions.go b/cmd/kube-controller-manager/app/extensions.go index a3218496fa..f1b6544079 100644 --- a/cmd/kube-controller-manager/app/extensions.go +++ b/cmd/kube-controller-manager/app/extensions.go @@ -33,6 +33,7 @@ func startDaemonSetController(ctx ControllerContext) (bool, error) { } go daemon.NewDaemonSetsController( ctx.InformerFactory.Extensions().V1beta1().DaemonSets(), + ctx.InformerFactory.Apps().V1beta1().ControllerRevisions(), ctx.InformerFactory.Core().V1().Pods(), ctx.InformerFactory.Core().V1().Nodes(), ctx.ClientBuilder.ClientOrDie("daemon-set-controller"), diff --git a/pkg/client/listers/extensions/v1beta1/BUILD b/pkg/client/listers/extensions/v1beta1/BUILD index 2292129b3b..973ba0bea3 100644 --- a/pkg/client/listers/extensions/v1beta1/BUILD +++ b/pkg/client/listers/extensions/v1beta1/BUILD @@ -26,6 +26,7 @@ go_library( tags = ["automanaged"], deps = [ "//pkg/api/v1:go_default_library", + "//pkg/apis/apps/v1beta1:go_default_library", "//pkg/apis/extensions/v1beta1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", diff --git a/pkg/client/listers/extensions/v1beta1/daemonset_expansion.go b/pkg/client/listers/extensions/v1beta1/daemonset_expansion.go index 6e913e4770..09c0ca738c 100644 --- a/pkg/client/listers/extensions/v1beta1/daemonset_expansion.go +++ b/pkg/client/listers/extensions/v1beta1/daemonset_expansion.go @@ -22,6 +22,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/kubernetes/pkg/api/v1" + apps "k8s.io/kubernetes/pkg/apis/apps/v1beta1" "k8s.io/kubernetes/pkg/apis/extensions/v1beta1" ) @@ -29,6 +30,7 @@ import ( // DaemonSetLister. type DaemonSetListerExpansion interface { GetPodDaemonSets(pod *v1.Pod) ([]*v1beta1.DaemonSet, error) + GetHistoryDaemonSets(history *apps.ControllerRevision) ([]*v1beta1.DaemonSet, error) } // DaemonSetNamespaceListerExpansion allows custom methods to be added to @@ -76,3 +78,37 @@ func (s *daemonSetLister) GetPodDaemonSets(pod *v1.Pod) ([]*v1beta1.DaemonSet, e return daemonSets, nil } + +// GetHistoryDaemonSets returns a list of DaemonSets that potentially +// match a ControllerRevision. Only the one specified in the ControllerRevision's ControllerRef +// will actually manage it. +// Returns an error only if no matching DaemonSets are found. +func (s *daemonSetLister) GetHistoryDaemonSets(history *apps.ControllerRevision) ([]*v1beta1.DaemonSet, error) { + if len(history.Labels) == 0 { + return nil, fmt.Errorf("no DaemonSet found for ControllerRevision %s because it has no labels", history.Name) + } + + list, err := s.DaemonSets(history.Namespace).List(labels.Everything()) + if err != nil { + return nil, err + } + + var daemonSets []*v1beta1.DaemonSet + for _, ds := range list { + selector, err := metav1.LabelSelectorAsSelector(ds.Spec.Selector) + if err != nil { + return nil, fmt.Errorf("invalid label selector: %v", err) + } + // If a DaemonSet with a nil or empty selector creeps in, it should match nothing, not everything. + if selector.Empty() || !selector.Matches(labels.Set(history.Labels)) { + continue + } + daemonSets = append(daemonSets, ds) + } + + if len(daemonSets) == 0 { + return nil, fmt.Errorf("could not find DaemonSets for ControllerRevision %s in namespace %s with labels: %v", history.Name, history.Namespace, history.Labels) + } + + return daemonSets, nil +} diff --git a/pkg/controller/controller_utils.go b/pkg/controller/controller_utils.go index f7e29fc171..f65e697d61 100644 --- a/pkg/controller/controller_utils.go +++ b/pkg/controller/controller_utils.go @@ -17,8 +17,10 @@ limitations under the License. package controller import ( + "encoding/binary" "encoding/json" "fmt" + "hash/fnv" "sync" "sync/atomic" "time" @@ -29,16 +31,14 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/clock" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/strategicpatch" "k8s.io/apimachinery/pkg/util/wait" - - "k8s.io/apimachinery/pkg/util/clock" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" "k8s.io/client-go/util/integer" - "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/v1" v1helper "k8s.io/kubernetes/pkg/api/v1/helper" @@ -48,6 +48,7 @@ import ( extensions "k8s.io/kubernetes/pkg/apis/extensions/v1beta1" "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" clientretry "k8s.io/kubernetes/pkg/client/retry" + hashutil "k8s.io/kubernetes/pkg/util/hash" "github.com/golang/glog" ) @@ -980,3 +981,18 @@ func WaitForCacheSync(controllerName string, stopCh <-chan struct{}, cacheSyncs glog.Infof("Caches are synced for %s controller", controllerName) return true } + +// ComputeHash returns a hash value calculated from pod template and a collisionCount to avoid hash collision +func ComputeHash(template *v1.PodTemplateSpec, collisionCount *int64) uint32 { + podTemplateSpecHasher := fnv.New32a() + hashutil.DeepHashObject(podTemplateSpecHasher, *template) + + // Add collisionCount in the hash if it exists. + if collisionCount != nil { + collisionCountBytes := make([]byte, 8) + binary.LittleEndian.PutUint64(collisionCountBytes, uint64(*collisionCount)) + podTemplateSpecHasher.Write(collisionCountBytes) + } + + return podTemplateSpecHasher.Sum32() +} diff --git a/pkg/controller/controller_utils_test.go b/pkg/controller/controller_utils_test.go index 8bfe343878..e4dbbd93e3 100644 --- a/pkg/controller/controller_utils_test.go +++ b/pkg/controller/controller_utils_test.go @@ -19,6 +19,7 @@ package controller import ( "encoding/json" "fmt" + "math" "math/rand" "net/http/httptest" "reflect" @@ -443,3 +444,38 @@ func TestActiveReplicaSetsFiltering(t *testing.T) { t.Errorf("expected %v, got %v", expectedNames.List(), gotNames.List()) } } + +func int64P(num int64) *int64 { + return &num +} + +func TestComputeHash(t *testing.T) { + tests := []struct { + name string + template *v1.PodTemplateSpec + collisionCount *int64 + otherCollisionCount *int64 + }{ + { + name: "simple", + template: &v1.PodTemplateSpec{}, + collisionCount: int64P(1), + otherCollisionCount: int64P(2), + }, + { + name: "using math.MaxInt64", + template: &v1.PodTemplateSpec{}, + collisionCount: nil, + otherCollisionCount: int64P(int64(math.MaxInt64)), + }, + } + + for _, test := range tests { + hash := ComputeHash(test.template, test.collisionCount) + otherHash := ComputeHash(test.template, test.otherCollisionCount) + + if hash == otherHash { + t.Errorf("expected different hashes but got the same: %d", hash) + } + } +} diff --git a/pkg/controller/daemon/BUILD b/pkg/controller/daemon/BUILD index 74c714d0c7..9305e1ac27 100644 --- a/pkg/controller/daemon/BUILD +++ b/pkg/controller/daemon/BUILD @@ -21,11 +21,14 @@ go_library( "//pkg/api/v1:go_default_library", "//pkg/api/v1/helper:go_default_library", "//pkg/api/v1/pod:go_default_library", + "//pkg/apis/apps/v1beta1:go_default_library", "//pkg/apis/extensions/v1beta1:go_default_library", "//pkg/client/clientset_generated/clientset:go_default_library", "//pkg/client/clientset_generated/clientset/typed/extensions/v1beta1:go_default_library", + "//pkg/client/informers/informers_generated/externalversions/apps/v1beta1:go_default_library", "//pkg/client/informers/informers_generated/externalversions/core/v1:go_default_library", "//pkg/client/informers/informers_generated/externalversions/extensions/v1beta1:go_default_library", + "//pkg/client/listers/apps/v1beta1:go_default_library", "//pkg/client/listers/core/v1:go_default_library", "//pkg/client/listers/extensions/v1beta1:go_default_library", "//pkg/controller:go_default_library", @@ -38,9 +41,11 @@ go_library( "//plugin/pkg/scheduler/algorithm/predicates:go_default_library", "//plugin/pkg/scheduler/schedulercache:go_default_library", "//vendor/github.com/golang/glog:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/api/equality:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/labels:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/errors:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/intstr:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library", diff --git a/pkg/controller/daemon/daemoncontroller.go b/pkg/controller/daemon/daemoncontroller.go index 2a7e83cfc8..71e35e31bb 100644 --- a/pkg/controller/daemon/daemoncontroller.go +++ b/pkg/controller/daemon/daemoncontroller.go @@ -39,11 +39,14 @@ import ( "k8s.io/kubernetes/pkg/api/v1" v1helper "k8s.io/kubernetes/pkg/api/v1/helper" podutil "k8s.io/kubernetes/pkg/api/v1/pod" + apps "k8s.io/kubernetes/pkg/apis/apps/v1beta1" extensions "k8s.io/kubernetes/pkg/apis/extensions/v1beta1" "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" unversionedextensions "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/typed/extensions/v1beta1" + appsinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions/apps/v1beta1" coreinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions/core/v1" extensionsinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions/extensions/v1beta1" + appslisters "k8s.io/kubernetes/pkg/client/listers/apps/v1beta1" corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1" extensionslisters "k8s.io/kubernetes/pkg/client/listers/extensions/v1beta1" "k8s.io/kubernetes/pkg/controller" @@ -99,6 +102,11 @@ type DaemonSetsController struct { // dsStoreSynced returns true if the daemonset store has been synced at least once. // Added as a member to the struct to allow injection for testing. dsStoreSynced cache.InformerSynced + // historyLister get list/get history from the shared informers's store + historyLister appslisters.ControllerRevisionLister + // historyStoreSynced returns true if the history store has been synced at least once. + // Added as a member to the struct to allow injection for testing. + historyStoreSynced cache.InformerSynced // podLister get list/get pods from the shared informers's store podLister corelisters.PodLister // podStoreSynced returns true if the pod store has been synced at least once. @@ -114,7 +122,7 @@ type DaemonSetsController struct { queue workqueue.RateLimitingInterface } -func NewDaemonSetsController(daemonSetInformer extensionsinformers.DaemonSetInformer, podInformer coreinformers.PodInformer, nodeInformer coreinformers.NodeInformer, kubeClient clientset.Interface) *DaemonSetsController { +func NewDaemonSetsController(daemonSetInformer extensionsinformers.DaemonSetInformer, historyInformer appsinformers.ControllerRevisionInformer, podInformer coreinformers.PodInformer, nodeInformer coreinformers.NodeInformer, kubeClient clientset.Interface) *DaemonSetsController { eventBroadcaster := record.NewBroadcaster() eventBroadcaster.StartLogging(glog.Infof) // TODO: remove the wrapper when every clients have moved to use the clientset. @@ -152,6 +160,14 @@ func NewDaemonSetsController(daemonSetInformer extensionsinformers.DaemonSetInfo dsc.dsLister = daemonSetInformer.Lister() dsc.dsStoreSynced = daemonSetInformer.Informer().HasSynced + historyInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: dsc.addHistory, + UpdateFunc: dsc.updateHistory, + DeleteFunc: dsc.deleteHistory, + }) + dsc.historyLister = historyInformer.Lister() + dsc.historyStoreSynced = historyInformer.Informer().HasSynced + // Watch for creation/deletion of pods. The reason we watch is that we don't want a daemon set to create/delete // more pods until all the effects (expectations) of a daemon set's create/delete have been observed. podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ @@ -273,6 +289,138 @@ func (dsc *DaemonSetsController) getPodDaemonSets(pod *v1.Pod) []*extensions.Dae return sets } +// getDaemonSetsForHistory returns a list of DaemonSets that potentially +// match a ControllerRevision. +func (dsc *DaemonSetsController) getDaemonSetsForHistory(history *apps.ControllerRevision) []*extensions.DaemonSet { + daemonSets, err := dsc.dsLister.GetHistoryDaemonSets(history) + if err != nil || len(daemonSets) == 0 { + return nil + } + if len(daemonSets) > 1 { + // ControllerRef will ensure we don't do anything crazy, but more than one + // item in this list nevertheless constitutes user error. + glog.V(4).Infof("User error! more than one DaemonSets is selecting ControllerRevision %s/%s with labels: %#v", + history.Namespace, history.Name, history.Labels) + } + return daemonSets +} + +// addHistory enqueues the DaemonSet that manages a ControllerRevision when the ControllerRevision is created +// or when the controller manager is restarted. +func (dsc *DaemonSetsController) addHistory(obj interface{}) { + history := obj.(*apps.ControllerRevision) + if history.DeletionTimestamp != nil { + // On a restart of the controller manager, it's possible for an object to + // show up in a state that is already pending deletion. + dsc.deleteHistory(history) + return + } + + // If it has a ControllerRef, that's all that matters. + if controllerRef := controller.GetControllerOf(history); controllerRef != nil { + ds := dsc.resolveControllerRef(history.Namespace, controllerRef) + if ds == nil { + return + } + glog.V(4).Infof("ControllerRevision %s added.", history.Name) + return + } + + // Otherwise, it's an orphan. Get a list of all matching DaemonSets and sync + // them to see if anyone wants to adopt it. + daemonSets := dsc.getDaemonSetsForHistory(history) + if len(daemonSets) == 0 { + return + } + glog.V(4).Infof("Orphan ControllerRevision %s added.", history.Name) + for _, ds := range daemonSets { + dsc.enqueueDaemonSet(ds) + } +} + +// updateHistory figures out what DaemonSet(s) manage a ControllerRevision when the ControllerRevision +// is updated and wake them up. If the anything of the ControllerRevision have changed, we need to +// awaken both the old and new DaemonSets. +func (dsc *DaemonSetsController) updateHistory(old, cur interface{}) { + curHistory := cur.(*apps.ControllerRevision) + oldHistory := old.(*apps.ControllerRevision) + if curHistory.ResourceVersion == oldHistory.ResourceVersion { + // Periodic resync will send update events for all known ControllerRevisions. + return + } + + curControllerRef := controller.GetControllerOf(curHistory) + oldControllerRef := controller.GetControllerOf(oldHistory) + controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef) + if controllerRefChanged && oldControllerRef != nil { + // The ControllerRef was changed. Sync the old controller, if any. + if ds := dsc.resolveControllerRef(oldHistory.Namespace, oldControllerRef); ds != nil { + dsc.enqueueDaemonSet(ds) + } + } + + // If it has a ControllerRef, that's all that matters. + if curControllerRef != nil { + ds := dsc.resolveControllerRef(curHistory.Namespace, curControllerRef) + if ds == nil { + return + } + glog.V(4).Infof("ControllerRevision %s updated.", curHistory.Name) + dsc.enqueueDaemonSet(ds) + return + } + + // Otherwise, it's an orphan. If anything changed, sync matching controllers + // to see if anyone wants to adopt it now. + labelChanged := !reflect.DeepEqual(curHistory.Labels, oldHistory.Labels) + if labelChanged || controllerRefChanged { + daemonSets := dsc.getDaemonSetsForHistory(curHistory) + if len(daemonSets) == 0 { + return + } + glog.V(4).Infof("Orphan ControllerRevision %s updated.", curHistory.Name) + for _, ds := range daemonSets { + dsc.enqueueDaemonSet(ds) + } + } +} + +// deleteHistory enqueues the DaemonSet that manages a ControllerRevision when +// the ControllerRevision is deleted. obj could be an *app.ControllerRevision, or +// a DeletionFinalStateUnknown marker item. +func (dsc *DaemonSetsController) deleteHistory(obj interface{}) { + history, ok := obj.(*apps.ControllerRevision) + + // When a delete is dropped, the relist will notice a ControllerRevision in the store not + // in the list, leading to the insertion of a tombstone object which contains + // the deleted key/value. Note that this value might be stale. If the ControllerRevision + // changed labels the new DaemonSet will not be woken up till the periodic resync. + if !ok { + tombstone, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj)) + return + } + history, ok = tombstone.Obj.(*apps.ControllerRevision) + if !ok { + utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a ControllerRevision %#v", obj)) + return + } + } + + controllerRef := controller.GetControllerOf(history) + if controllerRef == nil { + // No controller should care about orphans being deleted. + return + } + ds := dsc.resolveControllerRef(history.Namespace, controllerRef) + if ds == nil { + return + } + glog.V(4).Infof("ControllerRevision %s deleted.", history.Name) + dsc.enqueueDaemonSet(ds) +} + func (dsc *DaemonSetsController) addPod(obj interface{}) { pod := obj.(*v1.Pod) @@ -486,11 +634,11 @@ func (dsc *DaemonSetsController) updateNode(old, cur interface{}) { } } -// getNodesToDaemonSetPods returns a map from nodes to daemon pods (corresponding to ds) running on the nodes. +// getDaemonPods returns daemon pods owned by the given ds. // This also reconciles ControllerRef by adopting/orphaning. // Note that returned Pods are pointers to objects in the cache. // If you want to modify one, you need to deep-copy it first. -func (dsc *DaemonSetsController) getNodesToDaemonPods(ds *extensions.DaemonSet) (map[string][]*v1.Pod, error) { +func (dsc *DaemonSetsController) getDaemonPods(ds *extensions.DaemonSet) ([]*v1.Pod, error) { selector, err := metav1.LabelSelectorAsSelector(ds.Spec.Selector) if err != nil { return nil, err @@ -516,7 +664,15 @@ func (dsc *DaemonSetsController) getNodesToDaemonPods(ds *extensions.DaemonSet) }) // Use ControllerRefManager to adopt/orphan as needed. cm := controller.NewPodControllerRefManager(dsc.podControl, ds, selector, controllerKind, canAdoptFunc) - claimedPods, err := cm.ClaimPods(pods) + return cm.ClaimPods(pods) +} + +// getNodesToDaemonPods returns a map from nodes to daemon pods (corresponding to ds) running on the nodes. +// This also reconciles ControllerRef by adopting/orphaning. +// Note that returned Pods are pointers to objects in the cache. +// If you want to modify one, you need to deep-copy it first. +func (dsc *DaemonSetsController) getNodesToDaemonPods(ds *extensions.DaemonSet) (map[string][]*v1.Pod, error) { + claimedPods, err := dsc.getDaemonPods(ds) if err != nil { return nil, err } @@ -554,18 +710,18 @@ func (dsc *DaemonSetsController) resolveControllerRef(namespace string, controll return ds } -func (dsc *DaemonSetsController) manage(ds *extensions.DaemonSet) error { +func (dsc *DaemonSetsController) manage(ds *extensions.DaemonSet) (string, error) { // Find out which nodes are running the daemon pods controlled by ds. nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ds) if err != nil { - return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err) + return "", fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err) } // For each node, if the node is running the daemon pod but isn't supposed to, kill the daemon // pod. If the node is supposed to run the daemon pod, but isn't, create the daemon pod on the node. nodeList, err := dsc.nodeLister.List(labels.Everything()) if err != nil { - return fmt.Errorf("couldn't get list of nodes when syncing daemon set %#v: %v", ds, err) + return "", fmt.Errorf("couldn't get list of nodes when syncing daemon set %#v: %v", ds, err) } var nodesNeedingDaemonPods, podsToDelete []string var failedPodsObserved int @@ -612,23 +768,33 @@ func (dsc *DaemonSetsController) manage(ds *extensions.DaemonSet) error { } } } - errors := dsc.syncNodes(ds, podsToDelete, nodesNeedingDaemonPods) + + // Find current history of the DaemonSet, and label new pods using the hash label value of the current history when creating them + cur, _, err := dsc.constructHistory(ds) + if err != nil { + return "", fmt.Errorf("failed to construct revisions of DaemonSet: %v", err) + } + + hash := cur.Labels[extensions.DefaultDaemonSetUniqueLabelKey] + if err = dsc.syncNodes(ds, podsToDelete, nodesNeedingDaemonPods, hash); err != nil { + return "", err + } // Throw an error when the daemon pods fail, to use ratelimiter to prevent kill-recreate hot loop if failedPodsObserved > 0 { - errors = append(errors, fmt.Errorf("deleted %d failed pods of DaemonSet %s/%s", failedPodsObserved, ds.Namespace, ds.Name)) + return "", fmt.Errorf("deleted %d failed pods of DaemonSet %s/%s", failedPodsObserved, ds.Namespace, ds.Name) } - return utilerrors.NewAggregate(errors) + return hash, nil } // syncNodes deletes given pods and creates new daemon set pods on the given nodes // returns slice with erros if any -func (dsc *DaemonSetsController) syncNodes(ds *extensions.DaemonSet, podsToDelete, nodesNeedingDaemonPods []string) []error { +func (dsc *DaemonSetsController) syncNodes(ds *extensions.DaemonSet, podsToDelete, nodesNeedingDaemonPods []string, hash string) error { // We need to set expectations before creating/deleting pods to avoid race conditions. dsKey, err := controller.KeyFunc(ds) if err != nil { - return []error{fmt.Errorf("couldn't get key for object %#v: %v", ds, err)} + return fmt.Errorf("couldn't get key for object %#v: %v", ds, err) } createDiff := len(nodesNeedingDaemonPods) @@ -649,7 +815,7 @@ func (dsc *DaemonSetsController) syncNodes(ds *extensions.DaemonSet, podsToDelet glog.V(4).Infof("Nodes needing daemon pods for daemon set %s: %+v, creating %d", ds.Name, nodesNeedingDaemonPods, createDiff) createWait := sync.WaitGroup{} createWait.Add(createDiff) - template := util.GetPodTemplateWithGeneration(ds.Spec.Template, ds.Spec.TemplateGeneration) + template := util.CreatePodTemplate(ds.Spec.Template, ds.Spec.TemplateGeneration, hash) for i := 0; i < createDiff; i++ { go func(ix int) { defer createWait.Done() @@ -685,7 +851,7 @@ func (dsc *DaemonSetsController) syncNodes(ds *extensions.DaemonSet, podsToDelet for err := range errCh { errors = append(errors, err) } - return errors + return utilerrors.NewAggregate(errors) } func storeDaemonSetStatus(dsClient unversionedextensions.DaemonSetInterface, ds *extensions.DaemonSet, desiredNumberScheduled, currentNumberScheduled, numberMisscheduled, numberReady, updatedNumberScheduled, numberAvailable, numberUnavailable int) error { @@ -767,7 +933,7 @@ func (dsc *DaemonSetsController) updateDaemonSetStatus(ds *extensions.DaemonSet) numberAvailable++ } } - if util.IsPodUpdated(ds.Spec.TemplateGeneration, pod) { + if util.IsPodUpdated(ds.Spec.TemplateGeneration, pod, ds.Labels[extensions.DefaultDaemonSetUniqueLabelKey]) { updatedNumberScheduled++ } } @@ -825,21 +991,28 @@ func (dsc *DaemonSetsController) syncDaemonSet(key string) error { return dsc.updateDaemonSetStatus(ds) } - if err := dsc.manage(ds); err != nil { + hash, err := dsc.manage(ds) + if err != nil { return err } // Process rolling updates if we're ready. if dsc.expectations.SatisfiedExpectations(dsKey) { switch ds.Spec.UpdateStrategy.Type { + case extensions.OnDeleteDaemonSetStrategyType: case extensions.RollingUpdateDaemonSetStrategyType: - err = dsc.rollingUpdate(ds) + err = dsc.rollingUpdate(ds, hash) } if err != nil { return err } } + err = dsc.cleanupHistory(ds) + if err != nil { + return fmt.Errorf("failed to clean up revisions of DaemonSet: %v", err) + } + return dsc.updateDaemonSetStatus(ds) } diff --git a/pkg/controller/daemon/daemoncontroller_test.go b/pkg/controller/daemon/daemoncontroller_test.go index cf30584e25..51cdab1194 100644 --- a/pkg/controller/daemon/daemoncontroller_test.go +++ b/pkg/controller/daemon/daemoncontroller_test.go @@ -44,6 +44,7 @@ import ( "k8s.io/kubernetes/pkg/controller" kubelettypes "k8s.io/kubernetes/pkg/kubelet/types" "k8s.io/kubernetes/pkg/securitycontext" + labelsutil "k8s.io/kubernetes/pkg/util/labels" "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm" ) @@ -84,6 +85,7 @@ func getKey(ds *extensions.DaemonSet, t *testing.T) string { } func newDaemonSet(name string) *extensions.DaemonSet { + two := int32(2) return &extensions.DaemonSet{ TypeMeta: metav1.TypeMeta{APIVersion: testapi.Extensions.GroupVersion().String()}, ObjectMeta: metav1.ObjectMeta{ @@ -92,6 +94,10 @@ func newDaemonSet(name string) *extensions.DaemonSet { Namespace: metav1.NamespaceDefault, }, Spec: extensions.DaemonSetSpec{ + RevisionHistoryLimit: &two, + UpdateStrategy: extensions.DaemonSetUpdateStrategy{ + Type: extensions.OnDeleteDaemonSetStrategyType, + }, Selector: &metav1.LabelSelector{MatchLabels: simpleDaemonSetLabel}, Template: v1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ @@ -139,11 +145,18 @@ func addNodes(nodeStore cache.Store, startIndex, numNodes int, label map[string] } func newPod(podName string, nodeName string, label map[string]string, ds *extensions.DaemonSet) *v1.Pod { + // Add hash unique label to the pod + newLabels := label + if ds != nil { + hash := fmt.Sprint(controller.ComputeHash(&ds.Spec.Template, ds.Status.CollisionCount)) + newLabels = labelsutil.CloneAndAddLabel(label, extensions.DefaultDaemonSetUniqueLabelKey, hash) + } + pod := &v1.Pod{ TypeMeta: metav1.TypeMeta{APIVersion: api.Registry.GroupOrDie(v1.GroupName).GroupVersion.String()}, ObjectMeta: metav1.ObjectMeta{ GenerateName: podName, - Labels: label, + Labels: newLabels, Namespace: metav1.NamespaceDefault, }, Spec: v1.PodSpec{ @@ -168,7 +181,8 @@ func newPod(podName string, nodeName string, label map[string]string, ds *extens func addPods(podStore cache.Store, nodeName string, label map[string]string, ds *extensions.DaemonSet, number int) { for i := 0; i < number; i++ { - podStore.Add(newPod(fmt.Sprintf("%s-", nodeName), nodeName, label, ds)) + pod := newPod(fmt.Sprintf("%s-", nodeName), nodeName, label, ds) + podStore.Add(pod) } } @@ -251,6 +265,7 @@ func newTestController(initialObjects ...runtime.Object) (*daemonSetsController, manager := NewDaemonSetsController( informerFactory.Extensions().V1beta1().DaemonSets(), + informerFactory.Apps().V1beta1().ControllerRevisions(), informerFactory.Core().V1().Pods(), informerFactory.Core().V1().Nodes(), clientset, diff --git a/pkg/controller/daemon/update.go b/pkg/controller/daemon/update.go index d474fb2d31..9be6960e43 100644 --- a/pkg/controller/daemon/update.go +++ b/pkg/controller/daemon/update.go @@ -17,28 +17,37 @@ limitations under the License. package daemon import ( + "bytes" + "encoding/json" "fmt" + "sort" "github.com/golang/glog" + apiequality "k8s.io/apimachinery/pkg/api/equality" + "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" - utilerrors "k8s.io/apimachinery/pkg/util/errors" + "k8s.io/apimachinery/pkg/runtime" intstrutil "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/v1" podutil "k8s.io/kubernetes/pkg/api/v1/pod" + apps "k8s.io/kubernetes/pkg/apis/apps/v1beta1" extensions "k8s.io/kubernetes/pkg/apis/extensions/v1beta1" + "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller/daemon/util" + labelsutil "k8s.io/kubernetes/pkg/util/labels" ) // rollingUpdate deletes old daemon set pods making sure that no more than // ds.Spec.UpdateStrategy.RollingUpdate.MaxUnavailable pods are unavailable -func (dsc *DaemonSetsController) rollingUpdate(ds *extensions.DaemonSet) error { +func (dsc *DaemonSetsController) rollingUpdate(ds *extensions.DaemonSet, hash string) error { nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ds) if err != nil { return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err) } - _, oldPods := dsc.getAllDaemonSetPods(ds, nodeToDaemonPods) + _, oldPods := dsc.getAllDaemonSetPods(ds, nodeToDaemonPods, hash) maxUnavailable, numUnavailable, err := dsc.getUnavailableNumbers(ds, nodeToDaemonPods) if err != nil { return fmt.Errorf("Couldn't get unavailable numbers: %v", err) @@ -46,7 +55,7 @@ func (dsc *DaemonSetsController) rollingUpdate(ds *extensions.DaemonSet) error { oldAvailablePods, oldUnavailablePods := util.SplitByAvailablePods(ds.Spec.MinReadySeconds, oldPods) // for oldPods delete all not running pods - var podsToDelete []string + var oldPodsToDelete []string glog.V(4).Infof("Marking all unavailable old pods for deletion") for _, pod := range oldUnavailablePods { // Skip terminating pods. We won't delete them again @@ -54,7 +63,7 @@ func (dsc *DaemonSetsController) rollingUpdate(ds *extensions.DaemonSet) error { continue } glog.V(4).Infof("Marking pod %s/%s for deletion", ds.Name, pod.Name) - podsToDelete = append(podsToDelete, pod.Name) + oldPodsToDelete = append(oldPodsToDelete, pod.Name) } glog.V(4).Infof("Marking old pods for deletion") @@ -64,20 +73,311 @@ func (dsc *DaemonSetsController) rollingUpdate(ds *extensions.DaemonSet) error { break } glog.V(4).Infof("Marking pod %s/%s for deletion", ds.Name, pod.Name) - podsToDelete = append(podsToDelete, pod.Name) + oldPodsToDelete = append(oldPodsToDelete, pod.Name) numUnavailable++ } - errors := dsc.syncNodes(ds, podsToDelete, []string{}) - return utilerrors.NewAggregate(errors) + return dsc.syncNodes(ds, oldPodsToDelete, []string{}, hash) } -func (dsc *DaemonSetsController) getAllDaemonSetPods(ds *extensions.DaemonSet, nodeToDaemonPods map[string][]*v1.Pod) ([]*v1.Pod, []*v1.Pod) { +func (dsc *DaemonSetsController) constructHistory(ds *extensions.DaemonSet) (cur *apps.ControllerRevision, old []*apps.ControllerRevision, err error) { + var histories []*apps.ControllerRevision + var currentHistories []*apps.ControllerRevision + histories, err = dsc.controlledHistories(ds) + if err != nil { + return nil, nil, err + } + for _, history := range histories { + // Add the unique label if it's not already added to the history + // We use history name instead of computing hash, so that we don't need to worry about hash collision + if _, ok := history.Labels[extensions.DefaultDaemonSetUniqueLabelKey]; !ok { + var clone interface{} + clone, err = api.Scheme.DeepCopy(history) + if err != nil { + return nil, nil, err + } + toUpdate := clone.(*apps.ControllerRevision) + toUpdate.Labels[extensions.DefaultDaemonSetUniqueLabelKey] = toUpdate.Name + history, err = dsc.kubeClient.AppsV1beta1().ControllerRevisions(ds.Namespace).Update(toUpdate) + if err != nil { + return nil, nil, err + } + } + // Compare histories with ds to separate cur and old history + found := false + found, err = match(ds, history) + if err != nil { + return nil, nil, err + } + if found { + currentHistories = append(currentHistories, history) + } else { + old = append(old, history) + } + } + + currRevision := maxRevision(old) + 1 + switch len(currentHistories) { + case 0: + // Create a new history if the current one isn't found + cur, err = dsc.snapshot(ds, currRevision) + if err != nil { + return nil, nil, err + } + default: + cur, err = dsc.dedupCurHistories(ds, currentHistories) + if err != nil { + return nil, nil, err + } + // Update revision number if necessary + if cur.Revision < currRevision { + var clone interface{} + clone, err = api.Scheme.DeepCopy(cur) + if err != nil { + return nil, nil, err + } + toUpdate := clone.(*apps.ControllerRevision) + toUpdate.Revision = currRevision + _, err = dsc.kubeClient.AppsV1beta1().ControllerRevisions(ds.Namespace).Update(toUpdate) + if err != nil { + return nil, nil, err + } + } + } + // Label ds with current history's unique label as well + if ds.Labels[extensions.DefaultDaemonSetUniqueLabelKey] != cur.Labels[extensions.DefaultDaemonSetUniqueLabelKey] { + var clone interface{} + clone, err = api.Scheme.DeepCopy(ds) + if err != nil { + return nil, nil, err + } + toUpdate := clone.(*extensions.DaemonSet) + if toUpdate.Labels == nil { + toUpdate.Labels = make(map[string]string) + } + toUpdate.Labels[extensions.DefaultDaemonSetUniqueLabelKey] = cur.Labels[extensions.DefaultDaemonSetUniqueLabelKey] + _, err = dsc.kubeClient.ExtensionsV1beta1().DaemonSets(ds.Namespace).UpdateStatus(toUpdate) + } + return cur, old, err +} + +func (dsc *DaemonSetsController) cleanupHistory(ds *extensions.DaemonSet) error { + nodesToDaemonPods, err := dsc.getNodesToDaemonPods(ds) + if err != nil { + return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err) + } + _, old, err := dsc.constructHistory(ds) + if err != nil { + return fmt.Errorf("failed to construct revisions of DaemonSet: %v", err) + } + + toKeep := int(*ds.Spec.RevisionHistoryLimit) + toKill := len(old) - toKeep + if toKill <= 0 { + return nil + } + + // Find all hashes of live pods + liveHashes := make(map[string]bool) + for _, pods := range nodesToDaemonPods { + for _, pod := range pods { + if hash := pod.Labels[extensions.DefaultDaemonSetUniqueLabelKey]; len(hash) > 0 { + liveHashes[hash] = true + } + } + } + + // Find all live history with the above hashes + liveHistory := make(map[string]bool) + for _, history := range old { + if hash := history.Labels[extensions.DefaultDaemonSetUniqueLabelKey]; liveHashes[hash] { + liveHistory[history.Name] = true + } + } + + // Clean up old history from smallest to highest revision (from oldest to newest) + sort.Sort(historiesByRevision(old)) + for _, history := range old { + if toKill <= 0 { + break + } + if liveHistory[history.Name] { + continue + } + // Clean up + err := dsc.kubeClient.AppsV1beta1().ControllerRevisions(ds.Namespace).Delete(history.Name, nil) + if err != nil { + return err + } + toKill-- + } + return nil +} + +// maxRevision returns the max revision number of the given list of histories +func maxRevision(histories []*apps.ControllerRevision) int64 { + max := int64(0) + for _, history := range histories { + if history.Revision > max { + max = history.Revision + } + } + return max +} + +func (dsc *DaemonSetsController) dedupCurHistories(ds *extensions.DaemonSet, curHistories []*apps.ControllerRevision) (*apps.ControllerRevision, error) { + if len(curHistories) == 1 { + return curHistories[0], nil + } + var maxRevision int64 + var keepCur *apps.ControllerRevision + for _, cur := range curHistories { + if cur.Revision >= maxRevision { + keepCur = cur + maxRevision = cur.Revision + } + } + // Clean up duplicates and relabel pods + for _, cur := range curHistories { + if cur.Name == keepCur.Name { + continue + } + // Relabel pods before dedup + pods, err := dsc.getDaemonPods(ds) + if err != nil { + return nil, err + } + for _, pod := range pods { + if pod.Labels[extensions.DefaultDaemonSetUniqueLabelKey] != keepCur.Labels[extensions.DefaultDaemonSetUniqueLabelKey] { + clone, err := api.Scheme.DeepCopy(pod) + if err != nil { + return nil, err + } + toUpdate := clone.(*v1.Pod) + if toUpdate.Labels == nil { + toUpdate.Labels = make(map[string]string) + } + toUpdate.Labels[extensions.DefaultDaemonSetUniqueLabelKey] = keepCur.Labels[extensions.DefaultDaemonSetUniqueLabelKey] + _, err = dsc.kubeClient.Core().Pods(ds.Namespace).Update(toUpdate) + if err != nil { + return nil, err + } + } + } + // Remove duplicates + err = dsc.kubeClient.AppsV1beta1().ControllerRevisions(ds.Namespace).Delete(cur.Name, nil) + if err != nil { + return nil, err + } + } + return keepCur, nil +} + +// controlledHistories returns all ControllerRevisions controlled by the given DaemonSet +// Note that returned histories are pointers to objects in the cache. +// If you want to modify one, you need to deep-copy it first. +func (dsc *DaemonSetsController) controlledHistories(ds *extensions.DaemonSet) ([]*apps.ControllerRevision, error) { + var result []*apps.ControllerRevision + selector, err := metav1.LabelSelectorAsSelector(ds.Spec.Selector) + if err != nil { + return nil, err + } + histories, err := dsc.historyLister.List(selector) + if err != nil { + return nil, err + } + for _, history := range histories { + // Skip history that doesn't belong to the DaemonSet + if controllerRef := controller.GetControllerOf(history); controllerRef == nil || controllerRef.UID != ds.UID { + continue + } + result = append(result, history) + } + return result, nil +} + +// match check if ds template is semantically equal to the template stored in history +func match(ds *extensions.DaemonSet, history *apps.ControllerRevision) (bool, error) { + template, err := decodeHistory(history) + return apiequality.Semantic.DeepEqual(&ds.Spec.Template, template), err +} + +func decodeHistory(history *apps.ControllerRevision) (*v1.PodTemplateSpec, error) { + raw := history.Data.Raw + decoder := json.NewDecoder(bytes.NewBuffer(raw)) + template := v1.PodTemplateSpec{} + err := decoder.Decode(&template) + return &template, err +} + +func encodeTemplate(template *v1.PodTemplateSpec) ([]byte, error) { + buffer := new(bytes.Buffer) + encoder := json.NewEncoder(buffer) + err := encoder.Encode(template) + return buffer.Bytes(), err +} + +func (dsc *DaemonSetsController) snapshot(ds *extensions.DaemonSet, revision int64) (*apps.ControllerRevision, error) { + encodedTemplate, err := encodeTemplate(&ds.Spec.Template) + if err != nil { + return nil, err + } + hash := fmt.Sprint(controller.ComputeHash(&ds.Spec.Template, ds.Status.CollisionCount)) + name := ds.Name + "-" + hash + history := &apps.ControllerRevision{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: ds.Namespace, + Labels: labelsutil.CloneAndAddLabel(ds.Spec.Template.Labels, extensions.DefaultDaemonSetUniqueLabelKey, hash), + Annotations: ds.Annotations, + OwnerReferences: []metav1.OwnerReference{*newControllerRef(ds)}, + }, + Data: runtime.RawExtension{Raw: encodedTemplate}, + Revision: revision, + } + + history, err = dsc.kubeClient.AppsV1beta1().ControllerRevisions(ds.Namespace).Create(history) + if errors.IsAlreadyExists(err) { + // TODO: Is it okay to get from historyLister? + existedHistory, getErr := dsc.kubeClient.AppsV1beta1().ControllerRevisions(ds.Namespace).Get(name, metav1.GetOptions{}) + if getErr != nil { + return nil, getErr + } + // Check if we already created it + done, err := match(ds, existedHistory) + if err != nil { + return nil, err + } + if done { + return existedHistory, nil + } + + // Handle name collisions between different history + // TODO: Is it okay to get from dsLister? + currDS, getErr := dsc.kubeClient.ExtensionsV1beta1().DaemonSets(ds.Namespace).Get(ds.Name, metav1.GetOptions{}) + if getErr != nil { + return nil, getErr + } + if currDS.Status.CollisionCount == nil { + currDS.Status.CollisionCount = new(int64) + } + *currDS.Status.CollisionCount++ + _, updateErr := dsc.kubeClient.ExtensionsV1beta1().DaemonSets(ds.Namespace).UpdateStatus(currDS) + if updateErr != nil { + return nil, updateErr + } + glog.V(2).Infof("Found a hash collision for DaemonSet %q - bumping collisionCount to %d to resolve it", ds.Name, *currDS.Status.CollisionCount) + return nil, err + } + return history, err +} + +func (dsc *DaemonSetsController) getAllDaemonSetPods(ds *extensions.DaemonSet, nodeToDaemonPods map[string][]*v1.Pod, hash string) ([]*v1.Pod, []*v1.Pod) { var newPods []*v1.Pod var oldPods []*v1.Pod for _, pods := range nodeToDaemonPods { for _, pod := range pods { - if util.IsPodUpdated(ds.Spec.TemplateGeneration, pod) { + if util.IsPodUpdated(ds.Spec.TemplateGeneration, pod, hash) { newPods = append(newPods, pod) } else { oldPods = append(oldPods, pod) @@ -129,3 +429,11 @@ func (dsc *DaemonSetsController) getUnavailableNumbers(ds *extensions.DaemonSet, glog.V(4).Infof(" DaemonSet %s/%s, maxUnavailable: %d, numUnavailable: %d", ds.Namespace, ds.Name, maxUnavailable, numUnavailable) return maxUnavailable, numUnavailable, nil } + +type historiesByRevision []*apps.ControllerRevision + +func (h historiesByRevision) Len() int { return len(h) } +func (h historiesByRevision) Swap(i, j int) { h[i], h[j] = h[j], h[i] } +func (h historiesByRevision) Less(i, j int) bool { + return h[i].Revision < h[j].Revision +} diff --git a/pkg/controller/daemon/util/daemonset_util.go b/pkg/controller/daemon/util/daemonset_util.go index 0b101e769c..a6acf97086 100644 --- a/pkg/controller/daemon/util/daemonset_util.go +++ b/pkg/controller/daemon/util/daemonset_util.go @@ -29,9 +29,10 @@ import ( "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm" ) -// GetPodTemplateWithHash returns copy of provided template with additional -// label which contains hash of provided template and sets default daemon tolerations. -func GetPodTemplateWithGeneration(template v1.PodTemplateSpec, generation int64) v1.PodTemplateSpec { +// CreatePodTemplate returns copy of provided template with additional +// label which contains templateGeneration (for backward compatibility), +// hash of provided template and sets default daemon tolerations. +func CreatePodTemplate(template v1.PodTemplateSpec, generation int64, hash string) v1.PodTemplateSpec { obj, _ := api.Scheme.DeepCopy(template) newTemplate := obj.(v1.PodTemplateSpec) // DaemonSet pods shouldn't be deleted by NodeController in case of node problems. @@ -60,14 +61,19 @@ func GetPodTemplateWithGeneration(template v1.PodTemplateSpec, generation int64) extensions.DaemonSetTemplateGenerationKey, templateGenerationStr, ) + // TODO: do we need to validate if the DaemonSet is RollingUpdate or not? + if len(hash) > 0 { + newTemplate.ObjectMeta.Labels[extensions.DefaultDaemonSetUniqueLabelKey] = hash + } return newTemplate } -// IsPodUpdate checks if pod contains label with provided hash -func IsPodUpdated(dsTemplateGeneration int64, pod *v1.Pod) bool { - podTemplateGeneration, generationExists := pod.ObjectMeta.Labels[extensions.DaemonSetTemplateGenerationKey] - dsTemplateGenerationStr := fmt.Sprint(dsTemplateGeneration) - return generationExists && podTemplateGeneration == dsTemplateGenerationStr +// IsPodUpdate checks if pod contains label value that either matches templateGeneration or hash +func IsPodUpdated(dsTemplateGeneration int64, pod *v1.Pod, hash string) bool { + // Compare with hash to see if the pod is updated, need to maintain backward compatibility of templateGeneration + templateMatches := pod.Labels[extensions.DaemonSetTemplateGenerationKey] == fmt.Sprint(dsTemplateGeneration) + hashMatches := len(hash) > 0 && pod.Labels[extensions.DefaultDaemonSetUniqueLabelKey] == hash + return hashMatches || templateMatches } // SplitByAvailablePods splits provided daemon set pods by availabilty diff --git a/pkg/controller/daemon/util/daemonset_util_test.go b/pkg/controller/daemon/util/daemonset_util_test.go index 630b60ebdd..e4fa22c494 100644 --- a/pkg/controller/daemon/util/daemonset_util_test.go +++ b/pkg/controller/daemon/util/daemonset_util_test.go @@ -47,46 +47,118 @@ func newPod(podName string, nodeName string, label map[string]string) *v1.Pod { } func TestIsPodUpdated(t *testing.T) { + templateGeneration := int64(12345) + hash := "55555" + labels := map[string]string{extensions.DaemonSetTemplateGenerationKey: fmt.Sprint(templateGeneration), extensions.DefaultDaemonSetUniqueLabelKey: hash} + labelsNoHash := map[string]string{extensions.DaemonSetTemplateGenerationKey: fmt.Sprint(templateGeneration)} tests := []struct { + test string templateGeneration int64 pod *v1.Pod + hash string isUpdated bool }{ { - int64(12345), - newPod("pod1", "node1", map[string]string{extensions.DaemonSetTemplateGenerationKey: "12345"}), + "templateGeneration and hash both match", + templateGeneration, + newPod("pod1", "node1", labels), + hash, true, }, { - int64(12355), - newPod("pod1", "node1", map[string]string{extensions.DaemonSetTemplateGenerationKey: "12345"}), + "templateGeneration matches, hash doesn't", + templateGeneration, + newPod("pod1", "node1", labels), + hash + "123", + true, + }, + { + "templateGeneration matches, no hash label, has hash", + templateGeneration, + newPod("pod1", "node1", labelsNoHash), + hash, + true, + }, + { + "templateGeneration matches, no hash label, no hash", + templateGeneration, + newPod("pod1", "node1", labelsNoHash), + "", + true, + }, + { + "templateGeneration matches, has hash label, no hash", + templateGeneration, + newPod("pod1", "node1", labels), + "", + true, + }, + { + "templateGeneration doesn't match, hash does", + templateGeneration + 1, + newPod("pod1", "node1", labels), + hash, + true, + }, + { + "templateGeneration and hash don't match", + templateGeneration + 1, + newPod("pod1", "node1", labels), + hash + "123", false, }, { - int64(12355), + "empty labels, no hash", + templateGeneration, newPod("pod1", "node1", map[string]string{}), + "", false, }, { - int64(12355), + "empty labels", + templateGeneration, + newPod("pod1", "node1", map[string]string{}), + hash, + false, + }, + { + "no labels", + templateGeneration, newPod("pod1", "node1", nil), + hash, false, }, } for _, test := range tests { - updated := IsPodUpdated(test.templateGeneration, test.pod) + updated := IsPodUpdated(test.templateGeneration, test.pod, test.hash) if updated != test.isUpdated { - t.Errorf("IsPodUpdated returned wrong value. Expected %t, got %t. TemplateGeneration: %d", test.isUpdated, updated, test.templateGeneration) + t.Errorf("%s: IsPodUpdated returned wrong value. Expected %t, got %t", test.test, test.isUpdated, updated) } } } -func TestGetPodTemplateWithGeneration(t *testing.T) { - generation := int64(1) - podTemplateSpec := v1.PodTemplateSpec{} - newPodTemplate := GetPodTemplateWithGeneration(podTemplateSpec, generation) - label, exists := newPodTemplate.ObjectMeta.Labels[extensions.DaemonSetTemplateGenerationKey] - if !exists || label != fmt.Sprint(generation) { - t.Errorf("Error in getting podTemplateSpec with label generation. Exists: %t, label: %s", exists, label) +func TestCreatePodTemplate(t *testing.T) { + tests := []struct { + templateGeneration int64 + hash string + expectUniqueLabel bool + }{ + {int64(1), "", false}, + {int64(2), "3242341807", true}, + } + for _, test := range tests { + podTemplateSpec := v1.PodTemplateSpec{} + newPodTemplate := CreatePodTemplate(podTemplateSpec, test.templateGeneration, test.hash) + val, exists := newPodTemplate.ObjectMeta.Labels[extensions.DaemonSetTemplateGenerationKey] + if !exists || val != fmt.Sprint(test.templateGeneration) { + t.Errorf("Expected podTemplateSpec to have generation label value: %d, got: %s", test.templateGeneration, val) + } + val, exists = newPodTemplate.ObjectMeta.Labels[extensions.DefaultDaemonSetUniqueLabelKey] + if test.expectUniqueLabel && (!exists || val != test.hash) { + t.Errorf("Expected podTemplateSpec to have hash label value: %s, got: %s", test.hash, val) + } + if !test.expectUniqueLabel && exists { + t.Errorf("Expected podTemplateSpec to have no hash label, got: %s", val) + } } } diff --git a/pkg/controller/deployment/sync.go b/pkg/controller/deployment/sync.go index 0268572fcd..7e43d636ca 100644 --- a/pkg/controller/deployment/sync.go +++ b/pkg/controller/deployment/sync.go @@ -288,7 +288,7 @@ func (dc *DeploymentController) getNewReplicaSet(d *extensions.Deployment, rsLis return nil, err } newRSTemplate := templateCopy.(v1.PodTemplateSpec) - podTemplateSpecHash := fmt.Sprintf("%d", deploymentutil.GetPodTemplateSpecHash(&newRSTemplate, d.Status.CollisionCount)) + podTemplateSpecHash := fmt.Sprintf("%d", controller.ComputeHash(&newRSTemplate, d.Status.CollisionCount)) newRSTemplate.Labels = labelsutil.CloneAndAddLabel(d.Spec.Template.Labels, extensions.DefaultDeploymentUniqueLabelKey, podTemplateSpecHash) // Add podTemplateHash label to selector. newRSSelector := labelsutil.CloneSelectorAndAddLabel(d.Spec.Selector, extensions.DefaultDeploymentUniqueLabelKey, podTemplateSpecHash) diff --git a/pkg/controller/deployment/util/hash_test.go b/pkg/controller/deployment/util/hash_test.go index d1d944df6d..9020f92218 100644 --- a/pkg/controller/deployment/util/hash_test.go +++ b/pkg/controller/deployment/util/hash_test.go @@ -24,6 +24,7 @@ import ( "testing" "k8s.io/kubernetes/pkg/api/v1" + "k8s.io/kubernetes/pkg/controller" hashutil "k8s.io/kubernetes/pkg/util/hash" ) @@ -110,7 +111,7 @@ func TestPodTemplateSpecHash(t *testing.T) { specJson := strings.Replace(podSpec, "@@VERSION@@", strconv.Itoa(i), 1) spec := v1.PodTemplateSpec{} json.Unmarshal([]byte(specJson), &spec) - hash := GetPodTemplateSpecHash(&spec, nil) + hash := controller.ComputeHash(&spec, nil) if v, ok := seenHashes[hash]; ok { t.Errorf("Hash collision, old: %d new: %d", v, i) break @@ -139,6 +140,6 @@ func BenchmarkFnv(b *testing.B) { json.Unmarshal([]byte(podSpec), &spec) for i := 0; i < b.N; i++ { - GetPodTemplateSpecHash(&spec, nil) + controller.ComputeHash(&spec, nil) } } diff --git a/pkg/controller/deployment/util/pod_util.go b/pkg/controller/deployment/util/pod_util.go index 1954ef7076..9b54953d12 100644 --- a/pkg/controller/deployment/util/pod_util.go +++ b/pkg/controller/deployment/util/pod_util.go @@ -17,9 +17,6 @@ limitations under the License. package util import ( - "encoding/binary" - "hash/fnv" - "github.com/golang/glog" errorsutil "k8s.io/apimachinery/pkg/util/errors" @@ -28,23 +25,8 @@ import ( v1core "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/typed/core/v1" corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1" "k8s.io/kubernetes/pkg/client/retry" - hashutil "k8s.io/kubernetes/pkg/util/hash" ) -func GetPodTemplateSpecHash(template *v1.PodTemplateSpec, uniquifier *int64) uint32 { - podTemplateSpecHasher := fnv.New32a() - hashutil.DeepHashObject(podTemplateSpecHasher, *template) - - // Add uniquifier in the hash if it exists. - if uniquifier != nil { - uniquifierBytes := make([]byte, 8) - binary.LittleEndian.PutUint64(uniquifierBytes, uint64(*uniquifier)) - podTemplateSpecHasher.Write(uniquifierBytes) - } - - return podTemplateSpecHasher.Sum32() -} - // TODO: use client library instead when it starts to support update retries // see https://github.com/kubernetes/kubernetes/issues/21479 type updatePodFunc func(pod *v1.Pod) error diff --git a/pkg/controller/deployment/util/pod_util_test.go b/pkg/controller/deployment/util/pod_util_test.go deleted file mode 100644 index c312a372fb..0000000000 --- a/pkg/controller/deployment/util/pod_util_test.go +++ /dev/null @@ -1,59 +0,0 @@ -/* -Copyright 2017 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package util - -import ( - "math" - "testing" - - "k8s.io/kubernetes/pkg/api/v1" -) - -func int64P(num int64) *int64 { - return &num -} - -func TestGetPodTemplateSpecHash(t *testing.T) { - tests := []struct { - name string - template *v1.PodTemplateSpec - collisionCount *int64 - otherCollisionCount *int64 - }{ - { - name: "simple", - template: &v1.PodTemplateSpec{}, - collisionCount: int64P(1), - otherCollisionCount: int64P(2), - }, - { - name: "using math.MaxInt64", - template: &v1.PodTemplateSpec{}, - collisionCount: nil, - otherCollisionCount: int64P(int64(math.MaxInt64)), - }, - } - - for _, test := range tests { - hash := GetPodTemplateSpecHash(test.template, test.collisionCount) - otherHash := GetPodTemplateSpecHash(test.template, test.otherCollisionCount) - - if hash == otherHash { - t.Errorf("expected different hashes but got the same: %d", hash) - } - } -} diff --git a/pkg/controller/deployment/util/replicaset_util.go b/pkg/controller/deployment/util/replicaset_util.go index a816e2522c..5a58cd2bc9 100644 --- a/pkg/controller/deployment/util/replicaset_util.go +++ b/pkg/controller/deployment/util/replicaset_util.go @@ -28,6 +28,7 @@ import ( unversionedextensions "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/typed/extensions/v1beta1" extensionslisters "k8s.io/kubernetes/pkg/client/listers/extensions/v1beta1" "k8s.io/kubernetes/pkg/client/retry" + "k8s.io/kubernetes/pkg/controller" labelsutil "k8s.io/kubernetes/pkg/util/labels" ) @@ -76,5 +77,5 @@ func GetReplicaSetHash(rs *extensions.ReplicaSet, uniquifier *int64) (string, er } rsTemplate := template.(v1.PodTemplateSpec) rsTemplate.Labels = labelsutil.CloneAndRemoveLabel(rsTemplate.Labels, extensions.DefaultDeploymentUniqueLabelKey) - return fmt.Sprintf("%d", GetPodTemplateSpecHash(&rsTemplate, uniquifier)), nil + return fmt.Sprintf("%d", controller.ComputeHash(&rsTemplate, uniquifier)), nil } diff --git a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go index ddf8342917..d14e8a46f3 100644 --- a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go +++ b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go @@ -86,6 +86,7 @@ func init() { rbac.NewRule("list", "watch").Groups(legacyGroup).Resources("nodes").RuleOrDie(), rbac.NewRule("list", "watch", "create", "delete", "patch").Groups(legacyGroup).Resources("pods").RuleOrDie(), rbac.NewRule("create").Groups(legacyGroup).Resources("pods/binding").RuleOrDie(), + rbac.NewRule("list", "watch", "create", "delete", "update", "patch").Groups(appsGroup).Resources("controllerrevisions").RuleOrDie(), eventsRule(), }, }) diff --git a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/controller-roles.yaml b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/controller-roles.yaml index 9a9e031bdd..894734ffd3 100644 --- a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/controller-roles.yaml +++ b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/controller-roles.yaml @@ -180,6 +180,17 @@ items: - pods/binding verbs: - create + - apiGroups: + - apps + resources: + - controllerrevisions + verbs: + - create + - delete + - list + - patch + - update + - watch - apiGroups: - "" resources: