Fix DaemonSet controller cache mutation

Add dsStoreSynced so we also wait on this cache when starting the
DaemonSetController.

Switch to using a fake clientset in the unit tests.

Fix TestNumberReadyStatus so it doesn't expect the cache to be mutated.
pull/6/head
Andy Goldstein 2016-12-19 16:39:23 -05:00
parent def8022729
commit febc641cee
3 changed files with 70 additions and 47 deletions

View File

@ -16,6 +16,7 @@ go_library(
],
tags = ["automanaged"],
deps = [
"//pkg/api:go_default_library",
"//pkg/api/v1:go_default_library",
"//pkg/apis/extensions/v1beta1:go_default_library",
"//pkg/apis/meta/v1:go_default_library",
@ -51,11 +52,11 @@ go_test(
"//pkg/apis/extensions/v1beta1:go_default_library",
"//pkg/apis/meta/v1:go_default_library",
"//pkg/client/cache:go_default_library",
"//pkg/client/clientset_generated/clientset:go_default_library",
"//pkg/client/restclient:go_default_library",
"//pkg/client/clientset_generated/clientset/fake:go_default_library",
"//pkg/client/testing/core:go_default_library",
"//pkg/controller:go_default_library",
"//pkg/controller/informers:go_default_library",
"//pkg/runtime:go_default_library",
"//pkg/securitycontext:go_default_library",
"//pkg/util/wait:go_default_library",
],
)

View File

@ -23,6 +23,7 @@ import (
"sync"
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/v1"
extensions "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
metav1 "k8s.io/kubernetes/pkg/apis/meta/v1"
@ -78,6 +79,9 @@ type DaemonSetsController struct {
podStore *cache.StoreToPodLister
// A store of nodes
nodeStore *cache.StoreToNodeLister
// dsStoreSynced returns true if the daemonset store has been synced at least once.
// Added as a member to the struct to allow injection for testing.
dsStoreSynced cache.InformerSynced
// podStoreSynced returns true if the pod store has been synced at least once.
// Added as a member to the struct to allow injection for testing.
podStoreSynced cache.InformerSynced
@ -142,6 +146,7 @@ func NewDaemonSetsController(daemonSetInformer informers.DaemonSetInformer, podI
DeleteFunc: dsc.deleteDaemonset,
})
dsc.dsStore = daemonSetInformer.Lister()
dsc.dsStoreSynced = daemonSetInformer.Informer().HasSynced
// Watch for creation/deletion of pods. The reason we watch is that we don't want a daemon set to create/delete
// more pods until all the effects (expectations) of a daemon set's create/delete have been observed.
@ -191,7 +196,7 @@ func (dsc *DaemonSetsController) Run(workers int, stopCh <-chan struct{}) {
glog.Infof("Starting Daemon Sets controller manager")
if !cache.WaitForCacheSync(stopCh, dsc.podStoreSynced, dsc.nodeStoreSynced) {
if !cache.WaitForCacheSync(stopCh, dsc.podStoreSynced, dsc.nodeStoreSynced, dsc.dsStoreSynced) {
return
}
@ -539,19 +544,26 @@ func storeDaemonSetStatus(dsClient unversionedextensions.DaemonSetInterface, ds
return nil
}
clone, err := api.Scheme.DeepCopy(ds)
if err != nil {
return err
}
toUpdate := clone.(*extensions.DaemonSet)
var updateErr, getErr error
for i := 0; i < StatusUpdateRetries; i++ {
ds.Status.DesiredNumberScheduled = int32(desiredNumberScheduled)
ds.Status.CurrentNumberScheduled = int32(currentNumberScheduled)
ds.Status.NumberMisscheduled = int32(numberMisscheduled)
ds.Status.NumberReady = int32(numberReady)
toUpdate.Status.DesiredNumberScheduled = int32(desiredNumberScheduled)
toUpdate.Status.CurrentNumberScheduled = int32(currentNumberScheduled)
toUpdate.Status.NumberMisscheduled = int32(numberMisscheduled)
toUpdate.Status.NumberReady = int32(numberReady)
if _, updateErr = dsClient.UpdateStatus(ds); updateErr == nil {
if _, updateErr = dsClient.UpdateStatus(toUpdate); updateErr == nil {
return nil
}
// Update the set with the latest resource version for the next poll
if ds, getErr = dsClient.Get(ds.Name, metav1.GetOptions{}); getErr != nil {
if toUpdate, getErr = dsClient.Get(ds.Name, metav1.GetOptions{}); getErr != nil {
// If the GET fails we can't trust status.Replicas anymore. This error
// is bound to be more interesting than the update failure.
return getErr

View File

@ -27,12 +27,12 @@ import (
extensions "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
metav1 "k8s.io/kubernetes/pkg/apis/meta/v1"
"k8s.io/kubernetes/pkg/client/cache"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
"k8s.io/kubernetes/pkg/client/restclient"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake"
"k8s.io/kubernetes/pkg/client/testing/core"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/informers"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/securitycontext"
"k8s.io/kubernetes/pkg/util/wait"
)
var (
@ -137,18 +137,18 @@ func addPods(podStore cache.Store, nodeName string, label map[string]string, num
}
}
func newTestController() (*DaemonSetsController, *controller.FakePodControl) {
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &registered.GroupOrDie(v1.GroupName).GroupVersion}})
func newTestController(initialObjects ...runtime.Object) (*DaemonSetsController, *controller.FakePodControl, *fake.Clientset) {
clientset := fake.NewSimpleClientset(initialObjects...)
informerFactory := informers.NewSharedInformerFactory(clientset, nil, controller.NoResyncPeriodFunc())
manager := NewDaemonSetsController(informerFactory.DaemonSets(), informerFactory.Pods(), informerFactory.Nodes(), clientset, 0)
informerFactory.Start(wait.NeverStop)
manager.podStoreSynced = alwaysReady
manager.nodeStoreSynced = alwaysReady
manager.dsStoreSynced = alwaysReady
podControl := &controller.FakePodControl{}
manager.podControl = podControl
return manager, podControl
return manager, podControl, clientset
}
func validateSyncDaemonSets(t *testing.T, fakePodControl *controller.FakePodControl, expectedCreates, expectedDeletes int) {
@ -170,7 +170,7 @@ func syncAndValidateDaemonSets(t *testing.T, manager *DaemonSetsController, ds *
}
func TestDeleteFinalStateUnknown(t *testing.T) {
manager, _ := newTestController()
manager, _, _ := newTestController()
addNodes(manager.nodeStore.Store, 0, 1, nil)
ds := newDaemonSet("foo")
// DeletedFinalStateUnknown should queue the embedded DS if found.
@ -183,7 +183,7 @@ func TestDeleteFinalStateUnknown(t *testing.T) {
// DaemonSets without node selectors should launch pods on every node.
func TestSimpleDaemonSetLaunchesPods(t *testing.T) {
manager, podControl := newTestController()
manager, podControl, _ := newTestController()
addNodes(manager.nodeStore.Store, 0, 5, nil)
ds := newDaemonSet("foo")
manager.dsStore.Add(ds)
@ -192,7 +192,7 @@ func TestSimpleDaemonSetLaunchesPods(t *testing.T) {
// DaemonSets should do nothing if there aren't any nodes
func TestNoNodesDoesNothing(t *testing.T) {
manager, podControl := newTestController()
manager, podControl, _ := newTestController()
ds := newDaemonSet("foo")
manager.dsStore.Add(ds)
syncAndValidateDaemonSets(t, manager, ds, podControl, 0, 0)
@ -201,7 +201,7 @@ func TestNoNodesDoesNothing(t *testing.T) {
// DaemonSets without node selectors should launch on a single node in a
// single node cluster.
func TestOneNodeDaemonLaunchesPod(t *testing.T) {
manager, podControl := newTestController()
manager, podControl, _ := newTestController()
manager.nodeStore.Add(newNode("only-node", nil))
ds := newDaemonSet("foo")
manager.dsStore.Add(ds)
@ -210,7 +210,7 @@ func TestOneNodeDaemonLaunchesPod(t *testing.T) {
// DaemonSets should place onto NotReady nodes
func TestNotReadNodeDaemonDoesNotLaunchPod(t *testing.T) {
manager, podControl := newTestController()
manager, podControl, _ := newTestController()
node := newNode("not-ready", nil)
node.Status.Conditions = []v1.NodeCondition{
{Type: v1.NodeReady, Status: v1.ConditionFalse},
@ -223,7 +223,7 @@ func TestNotReadNodeDaemonDoesNotLaunchPod(t *testing.T) {
// DaemonSets should not place onto OutOfDisk nodes
func TestOutOfDiskNodeDaemonDoesNotLaunchPod(t *testing.T) {
manager, podControl := newTestController()
manager, podControl, _ := newTestController()
node := newNode("not-enough-disk", nil)
node.Status.Conditions = []v1.NodeCondition{{Type: v1.NodeOutOfDisk, Status: v1.ConditionTrue}}
manager.nodeStore.Add(node)
@ -254,7 +254,7 @@ func allocatableResources(memory, cpu string) v1.ResourceList {
// DaemonSets should not place onto nodes with insufficient free resource
func TestInsufficentCapacityNodeDaemonDoesNotLaunchPod(t *testing.T) {
podSpec := resourcePodSpec("too-much-mem", "75M", "75m")
manager, podControl := newTestController()
manager, podControl, _ := newTestController()
node := newNode("too-much-mem", nil)
node.Status.Allocatable = allocatableResources("100M", "200m")
manager.nodeStore.Add(node)
@ -269,7 +269,7 @@ func TestInsufficentCapacityNodeDaemonDoesNotLaunchPod(t *testing.T) {
func TestSufficentCapacityWithTerminatedPodsDaemonLaunchesPod(t *testing.T) {
podSpec := resourcePodSpec("too-much-mem", "75M", "75m")
manager, podControl := newTestController()
manager, podControl, _ := newTestController()
node := newNode("too-much-mem", nil)
node.Status.Allocatable = allocatableResources("100M", "200m")
manager.nodeStore.Add(node)
@ -286,7 +286,7 @@ func TestSufficentCapacityWithTerminatedPodsDaemonLaunchesPod(t *testing.T) {
// DaemonSets should place onto nodes with sufficient free resource
func TestSufficentCapacityNodeDaemonLaunchesPod(t *testing.T) {
podSpec := resourcePodSpec("not-too-much-mem", "75M", "75m")
manager, podControl := newTestController()
manager, podControl, _ := newTestController()
node := newNode("not-too-much-mem", nil)
node.Status.Allocatable = allocatableResources("200M", "200m")
manager.nodeStore.Add(node)
@ -302,7 +302,7 @@ func TestSufficentCapacityNodeDaemonLaunchesPod(t *testing.T) {
// DaemonSets not take any actions when being deleted
func TestDontDoAnythingIfBeingDeleted(t *testing.T) {
podSpec := resourcePodSpec("not-too-much-mem", "75M", "75m")
manager, podControl := newTestController()
manager, podControl, _ := newTestController()
node := newNode("not-too-much-mem", nil)
node.Status.Allocatable = allocatableResources("200M", "200m")
manager.nodeStore.Add(node)
@ -327,7 +327,7 @@ func TestPortConflictNodeDaemonDoesNotLaunchPod(t *testing.T) {
}},
}},
}
manager, podControl := newTestController()
manager, podControl, _ := newTestController()
node := newNode("port-conflict", nil)
manager.nodeStore.Add(node)
manager.podStore.Indexer.Add(&v1.Pod{
@ -353,7 +353,7 @@ func TestPortConflictWithSameDaemonPodDoesNotDeletePod(t *testing.T) {
}},
}},
}
manager, podControl := newTestController()
manager, podControl, _ := newTestController()
node := newNode("port-conflict", nil)
manager.nodeStore.Add(node)
manager.podStore.Indexer.Add(&v1.Pod{
@ -387,7 +387,7 @@ func TestNoPortConflictNodeDaemonLaunchesPod(t *testing.T) {
}},
}},
}
manager, podControl := newTestController()
manager, podControl, _ := newTestController()
node := newNode("no-port-conflict", nil)
manager.nodeStore.Add(node)
manager.podStore.Indexer.Add(&v1.Pod{
@ -403,7 +403,7 @@ func TestNoPortConflictNodeDaemonLaunchesPod(t *testing.T) {
//
// issue https://github.com/kubernetes/kubernetes/pull/23223
func TestPodIsNotDeletedByDaemonsetWithEmptyLabelSelector(t *testing.T) {
manager, podControl := newTestController()
manager, podControl, _ := newTestController()
manager.nodeStore.Store.Add(newNode("node1", nil))
// Create pod not controlled by a daemonset.
manager.podStore.Indexer.Add(&v1.Pod{
@ -436,7 +436,7 @@ func TestPodIsNotDeletedByDaemonsetWithEmptyLabelSelector(t *testing.T) {
// Controller should not create pods on nodes which have daemon pods, and should remove excess pods from nodes that have extra pods.
func TestDealsWithExistingPods(t *testing.T) {
manager, podControl := newTestController()
manager, podControl, _ := newTestController()
addNodes(manager.nodeStore.Store, 0, 5, nil)
addPods(manager.podStore.Indexer, "node-1", simpleDaemonSetLabel, 1)
addPods(manager.podStore.Indexer, "node-2", simpleDaemonSetLabel, 2)
@ -449,7 +449,7 @@ func TestDealsWithExistingPods(t *testing.T) {
// Daemon with node selector should launch pods on nodes matching selector.
func TestSelectorDaemonLaunchesPods(t *testing.T) {
manager, podControl := newTestController()
manager, podControl, _ := newTestController()
addNodes(manager.nodeStore.Store, 0, 4, nil)
addNodes(manager.nodeStore.Store, 4, 3, simpleNodeLabel)
daemon := newDaemonSet("foo")
@ -460,7 +460,7 @@ func TestSelectorDaemonLaunchesPods(t *testing.T) {
// Daemon with node selector should delete pods from nodes that do not satisfy selector.
func TestSelectorDaemonDeletesUnselectedPods(t *testing.T) {
manager, podControl := newTestController()
manager, podControl, _ := newTestController()
addNodes(manager.nodeStore.Store, 0, 5, nil)
addNodes(manager.nodeStore.Store, 5, 5, simpleNodeLabel)
addPods(manager.podStore.Indexer, "node-0", simpleDaemonSetLabel2, 2)
@ -475,7 +475,7 @@ func TestSelectorDaemonDeletesUnselectedPods(t *testing.T) {
// DaemonSet with node selector should launch pods on nodes matching selector, but also deal with existing pods on nodes.
func TestSelectorDaemonDealsWithExistingPods(t *testing.T) {
manager, podControl := newTestController()
manager, podControl, _ := newTestController()
addNodes(manager.nodeStore.Store, 0, 5, nil)
addNodes(manager.nodeStore.Store, 5, 5, simpleNodeLabel)
addPods(manager.podStore.Indexer, "node-0", simpleDaemonSetLabel, 1)
@ -494,7 +494,7 @@ func TestSelectorDaemonDealsWithExistingPods(t *testing.T) {
// DaemonSet with node selector which does not match any node labels should not launch pods.
func TestBadSelectorDaemonDoesNothing(t *testing.T) {
manager, podControl := newTestController()
manager, podControl, _ := newTestController()
addNodes(manager.nodeStore.Store, 0, 4, nil)
addNodes(manager.nodeStore.Store, 4, 3, simpleNodeLabel)
ds := newDaemonSet("foo")
@ -505,7 +505,7 @@ func TestBadSelectorDaemonDoesNothing(t *testing.T) {
// DaemonSet with node name should launch pod on node with corresponding name.
func TestNameDaemonSetLaunchesPods(t *testing.T) {
manager, podControl := newTestController()
manager, podControl, _ := newTestController()
addNodes(manager.nodeStore.Store, 0, 5, nil)
ds := newDaemonSet("foo")
ds.Spec.Template.Spec.NodeName = "node-0"
@ -515,7 +515,7 @@ func TestNameDaemonSetLaunchesPods(t *testing.T) {
// DaemonSet with node name that does not exist should not launch pods.
func TestBadNameDaemonSetDoesNothing(t *testing.T) {
manager, podControl := newTestController()
manager, podControl, _ := newTestController()
addNodes(manager.nodeStore.Store, 0, 5, nil)
ds := newDaemonSet("foo")
ds.Spec.Template.Spec.NodeName = "node-10"
@ -525,7 +525,7 @@ func TestBadNameDaemonSetDoesNothing(t *testing.T) {
// DaemonSet with node selector, and node name, matching a node, should launch a pod on the node.
func TestNameAndSelectorDaemonSetLaunchesPods(t *testing.T) {
manager, podControl := newTestController()
manager, podControl, _ := newTestController()
addNodes(manager.nodeStore.Store, 0, 4, nil)
addNodes(manager.nodeStore.Store, 4, 3, simpleNodeLabel)
ds := newDaemonSet("foo")
@ -537,7 +537,7 @@ func TestNameAndSelectorDaemonSetLaunchesPods(t *testing.T) {
// DaemonSet with node selector that matches some nodes, and node name that matches a different node, should do nothing.
func TestInconsistentNameSelectorDaemonSetDoesNothing(t *testing.T) {
manager, podControl := newTestController()
manager, podControl, _ := newTestController()
addNodes(manager.nodeStore.Store, 0, 4, nil)
addNodes(manager.nodeStore.Store, 4, 3, simpleNodeLabel)
ds := newDaemonSet("foo")
@ -549,7 +549,7 @@ func TestInconsistentNameSelectorDaemonSetDoesNothing(t *testing.T) {
// Daemon with node affinity should launch pods on nodes matching affinity.
func TestNodeAffinityDaemonLaunchesPods(t *testing.T) {
manager, podControl := newTestController()
manager, podControl, _ := newTestController()
addNodes(manager.nodeStore.Store, 0, 4, nil)
addNodes(manager.nodeStore.Store, 4, 3, simpleNodeLabel)
daemon := newDaemonSet("foo")
@ -575,16 +575,26 @@ func TestNodeAffinityDaemonLaunchesPods(t *testing.T) {
}
func TestNumberReadyStatus(t *testing.T) {
manager, podControl := newTestController()
daemon := newDaemonSet("foo")
manager, podControl, clientset := newTestController()
var updated *extensions.DaemonSet
clientset.PrependReactor("update", "daemonsets", func(action core.Action) (handled bool, ret runtime.Object, err error) {
if action.GetSubresource() != "status" {
return false, nil, nil
}
if u, ok := action.(core.UpdateAction); ok {
updated = u.GetObject().(*extensions.DaemonSet)
}
return false, nil, nil
})
addNodes(manager.nodeStore.Store, 0, 2, simpleNodeLabel)
addPods(manager.podStore.Indexer, "node-0", simpleDaemonSetLabel, 1)
addPods(manager.podStore.Indexer, "node-1", simpleDaemonSetLabel, 1)
daemon := newDaemonSet("foo")
manager.dsStore.Add(daemon)
syncAndValidateDaemonSets(t, manager, daemon, podControl, 0, 0)
if daemon.Status.NumberReady != 0 {
t.Errorf("Wrong daemon %s status: %v", daemon.Name, daemon.Status)
if updated.Status.NumberReady != 0 {
t.Errorf("Wrong daemon %s status: %v", updated.Name, updated.Status)
}
selector, _ := metav1.LabelSelectorAsSelector(daemon.Spec.Selector)
@ -595,7 +605,7 @@ func TestNumberReadyStatus(t *testing.T) {
}
syncAndValidateDaemonSets(t, manager, daemon, podControl, 0, 0)
if daemon.Status.NumberReady != 2 {
t.Errorf("Wrong daemon %s status: %v", daemon.Name, daemon.Status)
if updated.Status.NumberReady != 2 {
t.Errorf("Wrong daemon %s status: %v", updated.Name, updated.Status)
}
}