kubelet: fix all sources ready condition

The current implementation considers a source seen when it receives a SET at
kubelet/config/config.go. However, the main kubelet sync loop may not have
received the pod update from the source via the channel. This change ensures
that kubelet would consider all sources are ready only after the sync loop has
seen all the sources.
pull/6/head
Yu-Ju Hong 2015-10-05 18:20:57 -07:00
parent 608244fbb0
commit 5abdfcdfe6
3 changed files with 25 additions and 10 deletions

View File

@ -90,14 +90,14 @@ func (c *PodConfig) Channel(source string) chan<- interface{} {
return c.mux.Channel(source)
}
// SeenAllSources returns true if this config has received a SET
// message from all configured sources, false otherwise.
func (c *PodConfig) SeenAllSources() bool {
// SeenAllSources returns true if seenSources contains all sources in the
// config, and also this config has received a SET message from each source.
func (c *PodConfig) SeenAllSources(seenSources sets.String) bool {
if c.pods == nil {
return false
}
glog.V(6).Infof("Looking for %v, have seen %v", c.sources.List(), c.pods.sourcesSeen)
return c.pods.seenSources(c.sources.List()...)
glog.V(6).Infof("Looking for %v, have seen %v", c.sources.List(), seenSources)
return seenSources.HasAll(c.sources.List()...) && c.pods.seenSources(c.sources.List()...)
}
// Updates returns a channel of updates to the configuration, properly denormalized.

View File

@ -118,7 +118,7 @@ type SyncHandler interface {
HandlePodCleanups() error
}
type SourcesReadyFn func() bool
type SourcesReadyFn func(sourcesSeen sets.String) bool
// Wait for the container runtime to be up with a timeout.
func waitUntilRuntimeIsUp(cr kubecontainer.Runtime, timeout time.Duration) error {
@ -418,6 +418,7 @@ func NewMainKubelet(
klet.backOff = util.NewBackOff(resyncInterval, maxContainerBackOff)
klet.podKillingCh = make(chan *kubecontainer.Pod, podKillingChannelCapacity)
klet.sourcesSeen = sets.NewString()
return klet, nil
}
@ -441,6 +442,9 @@ type Kubelet struct {
podWorkers PodWorkers
resyncInterval time.Duration
sourcesReady SourcesReadyFn
// sourcesSeen records the sources seen by kubelet. This set is not thread
// safe and should only be access by the main kubelet syncloop goroutine.
sourcesSeen sets.String
podManager podManager
@ -597,6 +601,15 @@ type Kubelet struct {
daemonEndpoints *api.NodeDaemonEndpoints
}
func (kl *Kubelet) allSourcesReady() bool {
// Make a copy of the sourcesSeen list because it's not thread-safe.
return kl.sourcesReady(sets.NewString(kl.sourcesSeen.List()...))
}
func (kl *Kubelet) addSource(source string) {
kl.sourcesSeen.Insert(source)
}
// getRootDir returns the full path to the directory under which kubelet can
// store data. These functions are useful to pass interfaces to other modules
// that may need to know where to write data without getting a whole kubelet
@ -1618,7 +1631,7 @@ func (kl *Kubelet) removeOrphanedPodStatuses(pods []*api.Pod, mirrorPods []*api.
}
func (kl *Kubelet) deletePod(uid types.UID) error {
if !kl.sourcesReady() {
if !kl.allSourcesReady() {
// If the sources aren't ready, skip deletion, as we may accidentally delete pods
// for sources that haven't reported yet.
return fmt.Errorf("skipping delete because sources aren't ready yet")
@ -1906,7 +1919,7 @@ func (kl *Kubelet) syncLoop(updates <-chan PodUpdate, handler SyncHandler) {
// housekeepingMinimumPeriod.
// TODO (#13418): Investigate whether we can/should spawn a dedicated
// goroutine for housekeeping
if !kl.sourcesReady() {
if !kl.allSourcesReady() {
// If the sources aren't ready, skip housekeeping, as we may
// accidentally delete pods from unready sources.
glog.V(4).Infof("Skipping cleanup, sources aren't ready yet.")
@ -1930,6 +1943,7 @@ func (kl *Kubelet) syncLoopIteration(updates <-chan PodUpdate, handler SyncHandl
glog.Errorf("Update channel is closed. Exiting the sync loop.")
return false
}
kl.addSource(u.Source)
switch u.Op {
case ADD:
glog.V(2).Infof("SyncLoop (ADD): %q", kubeletUtil.FormatPodNames(u.Pods))

View File

@ -51,6 +51,7 @@ import (
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/bandwidth"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/version"
"k8s.io/kubernetes/pkg/volume"
_ "k8s.io/kubernetes/pkg/volume/host_path"
@ -102,7 +103,7 @@ func newTestKubelet(t *testing.T) *TestKubelet {
if err := os.MkdirAll(kubelet.rootDirectory, 0750); err != nil {
t.Fatalf("can't mkdir(%q): %v", kubelet.rootDirectory, err)
}
kubelet.sourcesReady = func() bool { return true }
kubelet.sourcesReady = func(_ sets.String) bool { return true }
kubelet.masterServiceNamespace = api.NamespaceDefault
kubelet.serviceLister = testServiceLister{}
kubelet.nodeLister = testNodeLister{}
@ -393,7 +394,7 @@ func TestSyncPodsDeletesWhenSourcesAreReady(t *testing.T) {
testKubelet.fakeCadvisor.On("DockerImagesFsInfo").Return(cadvisorApiv2.FsInfo{}, nil)
testKubelet.fakeCadvisor.On("RootFsInfo").Return(cadvisorApiv2.FsInfo{}, nil)
kubelet := testKubelet.kubelet
kubelet.sourcesReady = func() bool { return ready }
kubelet.sourcesReady = func(_ sets.String) bool { return ready }
fakeRuntime.PodList = []*kubecontainer.Pod{
{