Merge pull request #19973 from bprashanth/replication_controllers

Sync pods with store before starting controllers
pull/6/head
Brendan Burns 2016-01-26 12:47:51 -08:00
commit f919c3bb66
5 changed files with 109 additions and 5 deletions

View File

@ -30,6 +30,7 @@ import (
"k8s.io/kubernetes/pkg/client/record"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/runtime"
)
@ -418,11 +419,48 @@ func (s ActivePods) Less(i, j int) bool {
func FilterActivePods(pods []api.Pod) []*api.Pod {
var result []*api.Pod
for i := range pods {
if api.PodSucceeded != pods[i].Status.Phase &&
api.PodFailed != pods[i].Status.Phase &&
pods[i].DeletionTimestamp == nil {
result = append(result, &pods[i])
p := pods[i]
if api.PodSucceeded != p.Status.Phase &&
api.PodFailed != p.Status.Phase &&
p.DeletionTimestamp == nil {
result = append(result, &p)
} else {
glog.V(4).Infof("Ignoring inactive pod %v/%v in state %v, deletion time %v",
p.Namespace, p.Name, p.Status.Phase, p.DeletionTimestamp)
}
}
return result
}
// SyncAllPodsWithStore lists all pods and inserts them into the given store.
// Though this function is written in a generic manner, it is only used by the
// controllers for a specific purpose, to synchronously populate the store
// with the first batch of pods that would otherwise be sent by the Informer.
// Doing this avoids more complicated forms of synchronization with the
// Informer, though it also means that the controller calling this function
// will receive "OnUpdate" events for all the pods in the store, instead of
// "OnAdd". This should be ok, since most controllers are level triggered
// and make decisions based on the contents of the store.
//
// TODO: Extend this logic to load arbitrary local state for the controllers
// instead of just pods.
func SyncAllPodsWithStore(kubeClient client.Interface, store cache.Store) {
var allPods *api.PodList
var err error
listOptions := api.ListOptions{LabelSelector: labels.Everything(), FieldSelector: fields.Everything()}
for {
if allPods, err = kubeClient.Pods(api.NamespaceAll).List(listOptions); err != nil {
glog.Warningf("Retrying pod list: %v", err)
continue
}
break
}
pods := []interface{}{}
for i := range allPods.Items {
p := allPods.Items[i]
glog.V(4).Infof("Initializing store with pod %v/%v", p.Namespace, p.Name)
pods = append(pods, &p)
}
store.Replace(pods, allPods.ResourceVersion)
return
}

View File

@ -177,6 +177,8 @@ func NewDaemonSetsController(kubeClient client.Interface, resyncPeriod controlle
// Run begins watching and syncing daemon sets.
func (dsc *DaemonSetsController) Run(workers int, stopCh <-chan struct{}) {
defer util.HandleCrash()
glog.Infof("Starting Daemon Sets controller manager")
controller.SyncAllPodsWithStore(dsc.kubeClient, dsc.podStore.Store)
go dsc.dsController.Run(stopCh)
go dsc.podController.Run(stopCh)
go dsc.nodeController.Run(stopCh)
@ -461,7 +463,6 @@ func storeDaemonSetStatus(dsClient client.DaemonSetInterface, ds *extensions.Dae
if ds.Status.DesiredNumberScheduled == desiredNumberScheduled && ds.Status.CurrentNumberScheduled == currentNumberScheduled && ds.Status.NumberMisscheduled == numberMisscheduled {
return nil
}
var updateErr, getErr error
for i := 0; i <= StatusUpdateRetries; i++ {
ds.Status.DesiredNumberScheduled = desiredNumberScheduled

View File

@ -18,6 +18,7 @@ package daemon
import (
"fmt"
"net/http/httptest"
"testing"
"k8s.io/kubernetes/pkg/api"
@ -27,7 +28,9 @@ import (
"k8s.io/kubernetes/pkg/client/cache"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/securitycontext"
utiltesting "k8s.io/kubernetes/pkg/util/testing"
)
var (
@ -342,3 +345,39 @@ func TestDSManagerNotReady(t *testing.T) {
manager.podStoreSynced = alwaysReady
syncAndValidateDaemonSets(t, manager, ds, podControl, 1, 0)
}
func TestDSManagerInit(t *testing.T) {
// Insert a stable daemon set and make sure we don't create an extra pod
// for the one node which already has a daemon after a simulated restart.
ds := newDaemonSet("test")
ds.Status = extensions.DaemonSetStatus{
CurrentNumberScheduled: 1,
NumberMisscheduled: 0,
DesiredNumberScheduled: 1,
}
nodeName := "only-node"
podList := &api.PodList{
Items: []api.Pod{
*newPod("podname", nodeName, simpleDaemonSetLabel),
}}
response := runtime.EncodeOrDie(testapi.Default.Codec(), podList)
fakeHandler := utiltesting.FakeHandler{
StatusCode: 200,
ResponseBody: response,
}
testServer := httptest.NewServer(&fakeHandler)
// TODO: Uncomment when fix #19254
// defer testServer.Close()
client := client.NewOrDie(&client.Config{Host: testServer.URL, GroupVersion: testapi.Default.GroupVersion()})
manager := NewDaemonSetsController(client, controller.NoResyncPeriodFunc)
manager.dsStore.Add(ds)
manager.nodeStore.Add(newNode(nodeName, nil))
manager.podStoreSynced = alwaysReady
controller.SyncAllPodsWithStore(manager.kubeClient, manager.podStore.Store)
fakePodControl := &controller.FakePodControl{}
manager.podControl = fakePodControl
manager.syncHandler(getKey(ds, t))
validateSyncDaemonSets(t, fakePodControl, 0, 0)
}

View File

@ -182,6 +182,8 @@ func (rm *ReplicationManager) SetEventRecorder(recorder record.EventRecorder) {
// Run begins watching and syncing.
func (rm *ReplicationManager) Run(workers int, stopCh <-chan struct{}) {
defer util.HandleCrash()
glog.Infof("Starting RC Manager")
controller.SyncAllPodsWithStore(rm.kubeClient, rm.podStore.Store)
go rm.rcController.Run(stopCh)
go rm.podController.Run(stopCh)
for i := 0; i < workers; i++ {

View File

@ -894,3 +894,27 @@ func TestOverlappingRCs(t *testing.T) {
}
}
}
func TestRCManagerInit(t *testing.T) {
// Insert a stable rc into the replication manager's store and make sure
// it syncs pods with the apiserver before making any decisions.
rc := newReplicationController(2)
response := runtime.EncodeOrDie(testapi.Default.Codec(), newPodList(nil, 2, api.PodRunning, rc))
fakeHandler := utiltesting.FakeHandler{
StatusCode: 200,
ResponseBody: response,
}
testServer := httptest.NewServer(&fakeHandler)
// TODO: Uncomment when fix #19254
// defer testServer.Close()
client := client.NewOrDie(&client.Config{Host: testServer.URL, GroupVersion: testapi.Default.GroupVersion()})
manager := NewReplicationManager(client, controller.NoResyncPeriodFunc, BurstReplicas)
manager.rcStore.Store.Add(rc)
manager.podStoreSynced = alwaysReady
controller.SyncAllPodsWithStore(manager.kubeClient, manager.podStore.Store)
fakePodControl := &controller.FakePodControl{}
manager.podControl = fakePodControl
manager.syncReplicationController(getKey(rc, t))
validateSyncReplication(t, fakePodControl, 0, 0)
}