From bbe82a26888431b49ff79ec961c17f87f950f282 Mon Sep 17 00:00:00 2001 From: Matthew Wong Date: Wed, 26 Apr 2017 18:23:20 -0400 Subject: [PATCH] Ensure desired state of world populator runs before volume reconstructor --- pkg/kubelet/volumemanager/populator/BUILD | 1 + .../desired_state_of_world_populator.go | 32 +++++++++++++++-- pkg/kubelet/volumemanager/reconciler/BUILD | 3 -- .../volumemanager/reconciler/reconciler.go | 35 ++++++++++++------- .../reconciler/reconciler_test.go | 14 +++++--- pkg/kubelet/volumemanager/volume_manager.go | 31 ++++++++-------- 6 files changed, 79 insertions(+), 37 deletions(-) diff --git a/pkg/kubelet/volumemanager/populator/BUILD b/pkg/kubelet/volumemanager/populator/BUILD index 8caac5b67d..e4596291bd 100644 --- a/pkg/kubelet/volumemanager/populator/BUILD +++ b/pkg/kubelet/volumemanager/populator/BUILD @@ -16,6 +16,7 @@ go_library( "//pkg/api:go_default_library", "//pkg/api/v1:go_default_library", "//pkg/client/clientset_generated/clientset:go_default_library", + "//pkg/kubelet/config:go_default_library", "//pkg/kubelet/container:go_default_library", "//pkg/kubelet/pod:go_default_library", "//pkg/kubelet/status:go_default_library", diff --git a/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator.go b/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator.go index 613cc9d2d9..d0dd565fb5 100644 --- a/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator.go +++ b/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator.go @@ -33,6 +33,7 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" + "k8s.io/kubernetes/pkg/kubelet/config" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/pod" "k8s.io/kubernetes/pkg/kubelet/status" @@ -48,13 +49,21 @@ import ( // if it has volumes. It also verifies that the pods in the desired state of the // world cache still exist, if not, it removes them. type DesiredStateOfWorldPopulator interface { - Run(stopCh <-chan struct{}) + Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) // ReprocessPod removes the specified pod from the list of processedPods // (if it exists) forcing it to be reprocessed. This is required to enable // remounting volumes on pod updates (volumes like Downward API volumes // depend on this behavior to ensure volume content is updated). ReprocessPod(podName volumetypes.UniquePodName) + + // HasAddedPods returns whether the populator has looped through the list + // of active pods and added them to the desired state of the world cache, + // at a time after sources are all ready, at least once. It does not + // return true before sources are all ready because before then, there is + // a chance many or all pods are missing from the list of active pods and + // so few to none will have been added. + HasAddedPods() bool } // NewDesiredStateOfWorldPopulator returns a new instance of @@ -86,6 +95,8 @@ func NewDesiredStateOfWorldPopulator( processedPods: make(map[volumetypes.UniquePodName]bool)}, kubeContainerRuntime: kubeContainerRuntime, keepTerminatedPodVolumes: keepTerminatedPodVolumes, + hasAddedPods: false, + hasAddedPodsLock: sync.RWMutex{}, } } @@ -100,6 +111,8 @@ type desiredStateOfWorldPopulator struct { kubeContainerRuntime kubecontainer.Runtime timeOfLastGetPodStatus time.Time keepTerminatedPodVolumes bool + hasAddedPods bool + hasAddedPodsLock sync.RWMutex } type processedPods struct { @@ -107,7 +120,16 @@ type processedPods struct { sync.RWMutex } -func (dswp *desiredStateOfWorldPopulator) Run(stopCh <-chan struct{}) { +func (dswp *desiredStateOfWorldPopulator) Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) { + // Wait for the completion of a loop that started after sources are all ready, then set hasAddedPods accordingly + wait.PollUntil(dswp.loopSleepDuration, func() (bool, error) { + done := sourcesReady.AllReady() + dswp.populatorLoopFunc()() + return done, nil + }, stopCh) + dswp.hasAddedPodsLock.Lock() + dswp.hasAddedPods = true + dswp.hasAddedPodsLock.Unlock() wait.Until(dswp.populatorLoopFunc(), dswp.loopSleepDuration, stopCh) } @@ -116,6 +138,12 @@ func (dswp *desiredStateOfWorldPopulator) ReprocessPod( dswp.deleteProcessedPod(podName) } +func (dswp *desiredStateOfWorldPopulator) HasAddedPods() bool { + dswp.hasAddedPodsLock.RLock() + defer dswp.hasAddedPodsLock.RUnlock() + return dswp.hasAddedPods +} + func (dswp *desiredStateOfWorldPopulator) populatorLoopFunc() func() { return func() { dswp.findAndAddNewPods() diff --git a/pkg/kubelet/volumemanager/reconciler/BUILD b/pkg/kubelet/volumemanager/reconciler/BUILD index dfd4efc413..b3fef07d69 100644 --- a/pkg/kubelet/volumemanager/reconciler/BUILD +++ b/pkg/kubelet/volumemanager/reconciler/BUILD @@ -16,7 +16,6 @@ go_library( "//cmd/kubelet/app/options:go_default_library", "//pkg/api/v1:go_default_library", "//pkg/client/clientset_generated/clientset:go_default_library", - "//pkg/kubelet/config:go_default_library", "//pkg/kubelet/volumemanager/cache:go_default_library", "//pkg/util:go_default_library", "//pkg/util/goroutinemap/exponentialbackoff:go_default_library", @@ -42,7 +41,6 @@ go_test( deps = [ "//pkg/api/v1:go_default_library", "//pkg/client/clientset_generated/clientset/fake:go_default_library", - "//pkg/kubelet/config:go_default_library", "//pkg/kubelet/volumemanager/cache:go_default_library", "//pkg/util/mount:go_default_library", "//pkg/volume:go_default_library", @@ -53,7 +51,6 @@ go_test( "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//vendor/k8s.io/client-go/testing:go_default_library", "//vendor/k8s.io/client-go/tools/record:go_default_library", diff --git a/pkg/kubelet/volumemanager/reconciler/reconciler.go b/pkg/kubelet/volumemanager/reconciler/reconciler.go index ebfdd412f8..32ce326195 100644 --- a/pkg/kubelet/volumemanager/reconciler/reconciler.go +++ b/pkg/kubelet/volumemanager/reconciler/reconciler.go @@ -32,7 +32,6 @@ import ( "k8s.io/kubernetes/cmd/kubelet/app/options" "k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" - "k8s.io/kubernetes/pkg/kubelet/config" "k8s.io/kubernetes/pkg/kubelet/volumemanager/cache" "k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff" @@ -59,7 +58,7 @@ type Reconciler interface { // If attach/detach management is enabled, the manager will also check if // volumes that should be attached are attached and volumes that should // be detached are detached and trigger attach/detach operations as needed. - Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) + Run(stopCh <-chan struct{}) // StatesHasBeenSynced returns true only after syncStates process starts to sync // states at least once after kubelet starts @@ -80,6 +79,9 @@ type Reconciler interface { // nodeName - the Name for this node, used by Attach and Detach methods // desiredStateOfWorld - cache containing the desired state of the world // actualStateOfWorld - cache containing the actual state of the world +// populatorHasAddedPods - checker for whether the populator has finished +// adding pods to the desiredStateOfWorld cache at least once after sources +// are all ready (before sources are ready, pods are probably missing) // operationExecutor - used to trigger attach/detach/mount/unmount operations // safely (prevents more than one operation from being triggered on the same // volume) @@ -94,6 +96,7 @@ func NewReconciler( nodeName types.NodeName, desiredStateOfWorld cache.DesiredStateOfWorld, actualStateOfWorld cache.ActualStateOfWorld, + populatorHasAddedPods func() bool, operationExecutor operationexecutor.OperationExecutor, mounter mount.Interface, volumePluginMgr *volumepkg.VolumePluginMgr, @@ -107,6 +110,7 @@ func NewReconciler( nodeName: nodeName, desiredStateOfWorld: desiredStateOfWorld, actualStateOfWorld: actualStateOfWorld, + populatorHasAddedPods: populatorHasAddedPods, operationExecutor: operationExecutor, mounter: mounter, volumePluginMgr: volumePluginMgr, @@ -124,6 +128,7 @@ type reconciler struct { nodeName types.NodeName desiredStateOfWorld cache.DesiredStateOfWorld actualStateOfWorld cache.ActualStateOfWorld + populatorHasAddedPods func() bool operationExecutor operationexecutor.OperationExecutor mounter mount.Interface volumePluginMgr *volumepkg.VolumePluginMgr @@ -131,21 +136,27 @@ type reconciler struct { timeOfLastSync time.Time } -func (rc *reconciler) Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) { - wait.Until(rc.reconciliationLoopFunc(sourcesReady), rc.loopSleepDuration, stopCh) +func (rc *reconciler) Run(stopCh <-chan struct{}) { + // Wait for the populator to indicate that it has actually populated the desired state of world, meaning it has + // completed a populate loop that started after sources are all ready. After, there's no need to keep checking. + wait.PollUntil(rc.loopSleepDuration, func() (bool, error) { + rc.reconciliationLoopFunc(rc.populatorHasAddedPods())() + return rc.populatorHasAddedPods(), nil + }, stopCh) + wait.Until(rc.reconciliationLoopFunc(true), rc.loopSleepDuration, stopCh) } -func (rc *reconciler) reconciliationLoopFunc(sourcesReady config.SourcesReady) func() { +func (rc *reconciler) reconciliationLoopFunc(populatorHasAddedPods bool) func() { return func() { rc.reconcile() - // Add all sources ready check so that reconciler's reconstruct process will start after - // desired state of world is populated with pod volume information from different sources. Otherwise, - // reconciler's reconstruct process may add incomplete volume information and cause confusion. - // In addition, if some sources are not ready, the reconstruct process may clean up pods' volumes - // that are still in use because desired states could not get a complete list of pods. - if sourcesReady.AllReady() && time.Since(rc.timeOfLastSync) > rc.syncDuration { - glog.V(5).Infof("Sources are all ready, starting reconstruct state function") + // Add a check that the populator has added pods so that reconciler's reconstruct process will start + // after desired state of world is populated with pod volume information. Otherwise, reconciler's + // reconstruct process may add incomplete volume information and cause confusion. In addition, if the + // desired state of world has not been populated yet, the reconstruct process may clean up pods' volumes + // that are still in use because desired state of world does not contain a complete list of pods. + if populatorHasAddedPods && time.Since(rc.timeOfLastSync) > rc.syncDuration { + glog.V(5).Infof("Desired state of world has been populated with pods, starting reconstruct state function") rc.sync() } } diff --git a/pkg/kubelet/volumemanager/reconciler/reconciler_test.go b/pkg/kubelet/volumemanager/reconciler/reconciler_test.go index 6b027e2b93..5fcb4f4d38 100644 --- a/pkg/kubelet/volumemanager/reconciler/reconciler_test.go +++ b/pkg/kubelet/volumemanager/reconciler/reconciler_test.go @@ -25,13 +25,11 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" k8stypes "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" core "k8s.io/client-go/testing" "k8s.io/client-go/tools/record" "k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake" - "k8s.io/kubernetes/pkg/kubelet/config" "k8s.io/kubernetes/pkg/kubelet/volumemanager/cache" "k8s.io/kubernetes/pkg/util/mount" "k8s.io/kubernetes/pkg/volume" @@ -43,7 +41,7 @@ import ( const ( // reconcilerLoopSleepDuration is the amount of time the reconciler loop // waits between successive executions - reconcilerLoopSleepDuration time.Duration = 0 * time.Millisecond + reconcilerLoopSleepDuration time.Duration = 1 * time.Nanosecond reconcilerSyncStatesSleepPeriod time.Duration = 10 * time.Minute // waitForAttachTimeout is the maximum amount of time a // operationexecutor.Mount call will wait for a volume to be attached. @@ -52,6 +50,8 @@ const ( kubeletPodsDir string = "fake-dir" ) +func hasAddedPods() bool { return true } + // Calls Run() // Verifies there are no calls to attach, detach, mount, unmount, etc. func Test_Run_Positive_DoNothing(t *testing.T) { @@ -71,6 +71,7 @@ func Test_Run_Positive_DoNothing(t *testing.T) { nodeName, dsw, asw, + hasAddedPods, oex, &mount.FakeMounter{}, volumePluginMgr, @@ -108,6 +109,7 @@ func Test_Run_Positive_VolumeAttachAndMount(t *testing.T) { nodeName, dsw, asw, + hasAddedPods, oex, &mount.FakeMounter{}, volumePluginMgr, @@ -179,6 +181,7 @@ func Test_Run_Positive_VolumeMountControllerAttachEnabled(t *testing.T) { nodeName, dsw, asw, + hasAddedPods, oex, &mount.FakeMounter{}, volumePluginMgr, @@ -251,6 +254,7 @@ func Test_Run_Positive_VolumeAttachMountUnmountDetach(t *testing.T) { nodeName, dsw, asw, + hasAddedPods, oex, &mount.FakeMounter{}, volumePluginMgr, @@ -334,6 +338,7 @@ func Test_Run_Positive_VolumeUnmountControllerAttachEnabled(t *testing.T) { nodeName, dsw, asw, + hasAddedPods, oex, &mount.FakeMounter{}, volumePluginMgr, @@ -472,6 +477,5 @@ func createTestClient() *fake.Clientset { } func runReconciler(reconciler Reconciler) { - sourcesReady := config.NewSourcesReady(func(_ sets.String) bool { return false }) - go reconciler.Run(sourcesReady, wait.NeverStop) + go reconciler.Run(wait.NeverStop) } diff --git a/pkg/kubelet/volumemanager/volume_manager.go b/pkg/kubelet/volumemanager/volume_manager.go index 47e7c4b582..b69b94d8c9 100644 --- a/pkg/kubelet/volumemanager/volume_manager.go +++ b/pkg/kubelet/volumemanager/volume_manager.go @@ -169,19 +169,6 @@ func NewVolumeManager( ), } - vm.reconciler = reconciler.NewReconciler( - kubeClient, - controllerAttachDetachEnabled, - reconcilerLoopSleepPeriod, - reconcilerSyncStatesSleepPeriod, - waitForAttachTimeout, - nodeName, - vm.desiredStateOfWorld, - vm.actualStateOfWorld, - vm.operationExecutor, - mounter, - volumePluginMgr, - kubeletPodsDir) vm.desiredStateOfWorldPopulator = populator.NewDesiredStateOfWorldPopulator( kubeClient, desiredStateOfWorldPopulatorLoopSleepPeriod, @@ -191,6 +178,20 @@ func NewVolumeManager( vm.desiredStateOfWorld, kubeContainerRuntime, keepTerminatedPodVolumes) + vm.reconciler = reconciler.NewReconciler( + kubeClient, + controllerAttachDetachEnabled, + reconcilerLoopSleepPeriod, + reconcilerSyncStatesSleepPeriod, + waitForAttachTimeout, + nodeName, + vm.desiredStateOfWorld, + vm.actualStateOfWorld, + vm.desiredStateOfWorldPopulator.HasAddedPods, + vm.operationExecutor, + mounter, + volumePluginMgr, + kubeletPodsDir) return vm, nil } @@ -236,11 +237,11 @@ type volumeManager struct { func (vm *volumeManager) Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) { defer runtime.HandleCrash() - go vm.desiredStateOfWorldPopulator.Run(stopCh) + go vm.desiredStateOfWorldPopulator.Run(sourcesReady, stopCh) glog.V(2).Infof("The desired_state_of_world populator starts") glog.Infof("Starting Kubelet Volume Manager") - go vm.reconciler.Run(sourcesReady, stopCh) + go vm.reconciler.Run(stopCh) <-stopCh glog.Infof("Shutting down Kubelet Volume Manager")