diff --git a/pkg/controller/controller_utils.go b/pkg/controller/controller_utils.go index ef1bca9df8..e276b69d15 100644 --- a/pkg/controller/controller_utils.go +++ b/pkg/controller/controller_utils.go @@ -30,6 +30,7 @@ import ( "k8s.io/kubernetes/pkg/client/record" client "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/controller/framework" + "k8s.io/kubernetes/pkg/fields" "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/runtime" ) @@ -418,11 +419,48 @@ func (s ActivePods) Less(i, j int) bool { func FilterActivePods(pods []api.Pod) []*api.Pod { var result []*api.Pod for i := range pods { - if api.PodSucceeded != pods[i].Status.Phase && - api.PodFailed != pods[i].Status.Phase && - pods[i].DeletionTimestamp == nil { - result = append(result, &pods[i]) + p := pods[i] + if api.PodSucceeded != p.Status.Phase && + api.PodFailed != p.Status.Phase && + p.DeletionTimestamp == nil { + result = append(result, &p) + } else { + glog.V(4).Infof("Ignoring inactive pod %v/%v in state %v, deletion time %v", + p.Namespace, p.Name, p.Status.Phase, p.DeletionTimestamp) } } return result } + +// SyncAllPodsWithStore lists all pods and inserts them into the given store. +// Though this function is written in a generic manner, it is only used by the +// controllers for a specific purpose, to synchronously populate the store +// with the first batch of pods that would otherwise be sent by the Informer. +// Doing this avoids more complicated forms of synchronization with the +// Informer, though it also means that the controller calling this function +// will receive "OnUpdate" events for all the pods in the store, instead of +// "OnAdd". This should be ok, since most controllers are level triggered +// and make decisions based on the contents of the store. +// +// TODO: Extend this logic to load arbitrary local state for the controllers +// instead of just pods. +func SyncAllPodsWithStore(kubeClient client.Interface, store cache.Store) { + var allPods *api.PodList + var err error + listOptions := api.ListOptions{LabelSelector: labels.Everything(), FieldSelector: fields.Everything()} + for { + if allPods, err = kubeClient.Pods(api.NamespaceAll).List(listOptions); err != nil { + glog.Warningf("Retrying pod list: %v", err) + continue + } + break + } + pods := []interface{}{} + for i := range allPods.Items { + p := allPods.Items[i] + glog.V(4).Infof("Initializing store with pod %v/%v", p.Namespace, p.Name) + pods = append(pods, &p) + } + store.Replace(pods, allPods.ResourceVersion) + return +} diff --git a/pkg/controller/daemon/controller.go b/pkg/controller/daemon/controller.go index 41cf4af0d0..0c4907ecc3 100644 --- a/pkg/controller/daemon/controller.go +++ b/pkg/controller/daemon/controller.go @@ -177,6 +177,8 @@ func NewDaemonSetsController(kubeClient client.Interface, resyncPeriod controlle // Run begins watching and syncing daemon sets. func (dsc *DaemonSetsController) Run(workers int, stopCh <-chan struct{}) { defer util.HandleCrash() + glog.Infof("Starting Daemon Sets controller manager") + controller.SyncAllPodsWithStore(dsc.kubeClient, dsc.podStore.Store) go dsc.dsController.Run(stopCh) go dsc.podController.Run(stopCh) go dsc.nodeController.Run(stopCh) @@ -461,7 +463,6 @@ func storeDaemonSetStatus(dsClient client.DaemonSetInterface, ds *extensions.Dae if ds.Status.DesiredNumberScheduled == desiredNumberScheduled && ds.Status.CurrentNumberScheduled == currentNumberScheduled && ds.Status.NumberMisscheduled == numberMisscheduled { return nil } - var updateErr, getErr error for i := 0; i <= StatusUpdateRetries; i++ { ds.Status.DesiredNumberScheduled = desiredNumberScheduled diff --git a/pkg/controller/daemon/controller_test.go b/pkg/controller/daemon/controller_test.go index d86505c8ef..072e9e0b32 100644 --- a/pkg/controller/daemon/controller_test.go +++ b/pkg/controller/daemon/controller_test.go @@ -18,6 +18,7 @@ package daemon import ( "fmt" + "net/http/httptest" "testing" "k8s.io/kubernetes/pkg/api" @@ -27,7 +28,9 @@ import ( "k8s.io/kubernetes/pkg/client/cache" client "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/controller" + "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/securitycontext" + utiltesting "k8s.io/kubernetes/pkg/util/testing" ) var ( @@ -342,3 +345,39 @@ func TestDSManagerNotReady(t *testing.T) { manager.podStoreSynced = alwaysReady syncAndValidateDaemonSets(t, manager, ds, podControl, 1, 0) } + +func TestDSManagerInit(t *testing.T) { + // Insert a stable daemon set and make sure we don't create an extra pod + // for the one node which already has a daemon after a simulated restart. + ds := newDaemonSet("test") + ds.Status = extensions.DaemonSetStatus{ + CurrentNumberScheduled: 1, + NumberMisscheduled: 0, + DesiredNumberScheduled: 1, + } + nodeName := "only-node" + podList := &api.PodList{ + Items: []api.Pod{ + *newPod("podname", nodeName, simpleDaemonSetLabel), + }} + response := runtime.EncodeOrDie(testapi.Default.Codec(), podList) + fakeHandler := utiltesting.FakeHandler{ + StatusCode: 200, + ResponseBody: response, + } + testServer := httptest.NewServer(&fakeHandler) + // TODO: Uncomment when fix #19254 + // defer testServer.Close() + + client := client.NewOrDie(&client.Config{Host: testServer.URL, GroupVersion: testapi.Default.GroupVersion()}) + manager := NewDaemonSetsController(client, controller.NoResyncPeriodFunc) + manager.dsStore.Add(ds) + manager.nodeStore.Add(newNode(nodeName, nil)) + manager.podStoreSynced = alwaysReady + controller.SyncAllPodsWithStore(manager.kubeClient, manager.podStore.Store) + + fakePodControl := &controller.FakePodControl{} + manager.podControl = fakePodControl + manager.syncHandler(getKey(ds, t)) + validateSyncDaemonSets(t, fakePodControl, 0, 0) +} diff --git a/pkg/controller/replication/replication_controller.go b/pkg/controller/replication/replication_controller.go index 33b2385414..24f50bde31 100644 --- a/pkg/controller/replication/replication_controller.go +++ b/pkg/controller/replication/replication_controller.go @@ -182,6 +182,8 @@ func (rm *ReplicationManager) SetEventRecorder(recorder record.EventRecorder) { // Run begins watching and syncing. func (rm *ReplicationManager) Run(workers int, stopCh <-chan struct{}) { defer util.HandleCrash() + glog.Infof("Starting RC Manager") + controller.SyncAllPodsWithStore(rm.kubeClient, rm.podStore.Store) go rm.rcController.Run(stopCh) go rm.podController.Run(stopCh) for i := 0; i < workers; i++ { diff --git a/pkg/controller/replication/replication_controller_test.go b/pkg/controller/replication/replication_controller_test.go index 796b64d365..644a55d747 100644 --- a/pkg/controller/replication/replication_controller_test.go +++ b/pkg/controller/replication/replication_controller_test.go @@ -894,3 +894,27 @@ func TestOverlappingRCs(t *testing.T) { } } } + +func TestRCManagerInit(t *testing.T) { + // Insert a stable rc into the replication manager's store and make sure + // it syncs pods with the apiserver before making any decisions. + rc := newReplicationController(2) + response := runtime.EncodeOrDie(testapi.Default.Codec(), newPodList(nil, 2, api.PodRunning, rc)) + fakeHandler := utiltesting.FakeHandler{ + StatusCode: 200, + ResponseBody: response, + } + testServer := httptest.NewServer(&fakeHandler) + // TODO: Uncomment when fix #19254 + // defer testServer.Close() + + client := client.NewOrDie(&client.Config{Host: testServer.URL, GroupVersion: testapi.Default.GroupVersion()}) + manager := NewReplicationManager(client, controller.NoResyncPeriodFunc, BurstReplicas) + manager.rcStore.Store.Add(rc) + manager.podStoreSynced = alwaysReady + controller.SyncAllPodsWithStore(manager.kubeClient, manager.podStore.Store) + fakePodControl := &controller.FakePodControl{} + manager.podControl = fakePodControl + manager.syncReplicationController(getKey(rc, t)) + validateSyncReplication(t, fakePodControl, 0, 0) +}