Ensure desired state of world populator runs before volume reconstructor

pull/6/head
Matthew Wong 2017-04-26 18:23:20 -04:00
parent 936137d22b
commit bbe82a2688
6 changed files with 79 additions and 37 deletions

View File

@ -16,6 +16,7 @@ go_library(
"//pkg/api:go_default_library",
"//pkg/api/v1:go_default_library",
"//pkg/client/clientset_generated/clientset:go_default_library",
"//pkg/kubelet/config:go_default_library",
"//pkg/kubelet/container:go_default_library",
"//pkg/kubelet/pod:go_default_library",
"//pkg/kubelet/status:go_default_library",

View File

@ -33,6 +33,7 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
"k8s.io/kubernetes/pkg/kubelet/config"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/pod"
"k8s.io/kubernetes/pkg/kubelet/status"
@ -48,13 +49,21 @@ import (
// if it has volumes. It also verifies that the pods in the desired state of the
// world cache still exist, if not, it removes them.
type DesiredStateOfWorldPopulator interface {
Run(stopCh <-chan struct{})
Run(sourcesReady config.SourcesReady, stopCh <-chan struct{})
// ReprocessPod removes the specified pod from the list of processedPods
// (if it exists) forcing it to be reprocessed. This is required to enable
// remounting volumes on pod updates (volumes like Downward API volumes
// depend on this behavior to ensure volume content is updated).
ReprocessPod(podName volumetypes.UniquePodName)
// HasAddedPods returns whether the populator has looped through the list
// of active pods and added them to the desired state of the world cache,
// at a time after sources are all ready, at least once. It does not
// return true before sources are all ready because before then, there is
// a chance many or all pods are missing from the list of active pods and
// so few to none will have been added.
HasAddedPods() bool
}
// NewDesiredStateOfWorldPopulator returns a new instance of
@ -86,6 +95,8 @@ func NewDesiredStateOfWorldPopulator(
processedPods: make(map[volumetypes.UniquePodName]bool)},
kubeContainerRuntime: kubeContainerRuntime,
keepTerminatedPodVolumes: keepTerminatedPodVolumes,
hasAddedPods: false,
hasAddedPodsLock: sync.RWMutex{},
}
}
@ -100,6 +111,8 @@ type desiredStateOfWorldPopulator struct {
kubeContainerRuntime kubecontainer.Runtime
timeOfLastGetPodStatus time.Time
keepTerminatedPodVolumes bool
hasAddedPods bool
hasAddedPodsLock sync.RWMutex
}
type processedPods struct {
@ -107,7 +120,16 @@ type processedPods struct {
sync.RWMutex
}
func (dswp *desiredStateOfWorldPopulator) Run(stopCh <-chan struct{}) {
func (dswp *desiredStateOfWorldPopulator) Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) {
// Wait for the completion of a loop that started after sources are all ready, then set hasAddedPods accordingly
wait.PollUntil(dswp.loopSleepDuration, func() (bool, error) {
done := sourcesReady.AllReady()
dswp.populatorLoopFunc()()
return done, nil
}, stopCh)
dswp.hasAddedPodsLock.Lock()
dswp.hasAddedPods = true
dswp.hasAddedPodsLock.Unlock()
wait.Until(dswp.populatorLoopFunc(), dswp.loopSleepDuration, stopCh)
}
@ -116,6 +138,12 @@ func (dswp *desiredStateOfWorldPopulator) ReprocessPod(
dswp.deleteProcessedPod(podName)
}
func (dswp *desiredStateOfWorldPopulator) HasAddedPods() bool {
dswp.hasAddedPodsLock.RLock()
defer dswp.hasAddedPodsLock.RUnlock()
return dswp.hasAddedPods
}
func (dswp *desiredStateOfWorldPopulator) populatorLoopFunc() func() {
return func() {
dswp.findAndAddNewPods()

View File

@ -16,7 +16,6 @@ go_library(
"//cmd/kubelet/app/options:go_default_library",
"//pkg/api/v1:go_default_library",
"//pkg/client/clientset_generated/clientset:go_default_library",
"//pkg/kubelet/config:go_default_library",
"//pkg/kubelet/volumemanager/cache:go_default_library",
"//pkg/util:go_default_library",
"//pkg/util/goroutinemap/exponentialbackoff:go_default_library",
@ -42,7 +41,6 @@ go_test(
deps = [
"//pkg/api/v1:go_default_library",
"//pkg/client/clientset_generated/clientset/fake:go_default_library",
"//pkg/kubelet/config:go_default_library",
"//pkg/kubelet/volumemanager/cache:go_default_library",
"//pkg/util/mount:go_default_library",
"//pkg/volume:go_default_library",
@ -53,7 +51,6 @@ go_test(
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/client-go/testing:go_default_library",
"//vendor/k8s.io/client-go/tools/record:go_default_library",

View File

@ -32,7 +32,6 @@ import (
"k8s.io/kubernetes/cmd/kubelet/app/options"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
"k8s.io/kubernetes/pkg/kubelet/config"
"k8s.io/kubernetes/pkg/kubelet/volumemanager/cache"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff"
@ -59,7 +58,7 @@ type Reconciler interface {
// If attach/detach management is enabled, the manager will also check if
// volumes that should be attached are attached and volumes that should
// be detached are detached and trigger attach/detach operations as needed.
Run(sourcesReady config.SourcesReady, stopCh <-chan struct{})
Run(stopCh <-chan struct{})
// StatesHasBeenSynced returns true only after syncStates process starts to sync
// states at least once after kubelet starts
@ -80,6 +79,9 @@ type Reconciler interface {
// nodeName - the Name for this node, used by Attach and Detach methods
// desiredStateOfWorld - cache containing the desired state of the world
// actualStateOfWorld - cache containing the actual state of the world
// populatorHasAddedPods - checker for whether the populator has finished
// adding pods to the desiredStateOfWorld cache at least once after sources
// are all ready (before sources are ready, pods are probably missing)
// operationExecutor - used to trigger attach/detach/mount/unmount operations
// safely (prevents more than one operation from being triggered on the same
// volume)
@ -94,6 +96,7 @@ func NewReconciler(
nodeName types.NodeName,
desiredStateOfWorld cache.DesiredStateOfWorld,
actualStateOfWorld cache.ActualStateOfWorld,
populatorHasAddedPods func() bool,
operationExecutor operationexecutor.OperationExecutor,
mounter mount.Interface,
volumePluginMgr *volumepkg.VolumePluginMgr,
@ -107,6 +110,7 @@ func NewReconciler(
nodeName: nodeName,
desiredStateOfWorld: desiredStateOfWorld,
actualStateOfWorld: actualStateOfWorld,
populatorHasAddedPods: populatorHasAddedPods,
operationExecutor: operationExecutor,
mounter: mounter,
volumePluginMgr: volumePluginMgr,
@ -124,6 +128,7 @@ type reconciler struct {
nodeName types.NodeName
desiredStateOfWorld cache.DesiredStateOfWorld
actualStateOfWorld cache.ActualStateOfWorld
populatorHasAddedPods func() bool
operationExecutor operationexecutor.OperationExecutor
mounter mount.Interface
volumePluginMgr *volumepkg.VolumePluginMgr
@ -131,21 +136,27 @@ type reconciler struct {
timeOfLastSync time.Time
}
func (rc *reconciler) Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) {
wait.Until(rc.reconciliationLoopFunc(sourcesReady), rc.loopSleepDuration, stopCh)
func (rc *reconciler) Run(stopCh <-chan struct{}) {
// Wait for the populator to indicate that it has actually populated the desired state of world, meaning it has
// completed a populate loop that started after sources are all ready. After, there's no need to keep checking.
wait.PollUntil(rc.loopSleepDuration, func() (bool, error) {
rc.reconciliationLoopFunc(rc.populatorHasAddedPods())()
return rc.populatorHasAddedPods(), nil
}, stopCh)
wait.Until(rc.reconciliationLoopFunc(true), rc.loopSleepDuration, stopCh)
}
func (rc *reconciler) reconciliationLoopFunc(sourcesReady config.SourcesReady) func() {
func (rc *reconciler) reconciliationLoopFunc(populatorHasAddedPods bool) func() {
return func() {
rc.reconcile()
// Add all sources ready check so that reconciler's reconstruct process will start after
// desired state of world is populated with pod volume information from different sources. Otherwise,
// reconciler's reconstruct process may add incomplete volume information and cause confusion.
// In addition, if some sources are not ready, the reconstruct process may clean up pods' volumes
// that are still in use because desired states could not get a complete list of pods.
if sourcesReady.AllReady() && time.Since(rc.timeOfLastSync) > rc.syncDuration {
glog.V(5).Infof("Sources are all ready, starting reconstruct state function")
// Add a check that the populator has added pods so that reconciler's reconstruct process will start
// after desired state of world is populated with pod volume information. Otherwise, reconciler's
// reconstruct process may add incomplete volume information and cause confusion. In addition, if the
// desired state of world has not been populated yet, the reconstruct process may clean up pods' volumes
// that are still in use because desired state of world does not contain a complete list of pods.
if populatorHasAddedPods && time.Since(rc.timeOfLastSync) > rc.syncDuration {
glog.V(5).Infof("Desired state of world has been populated with pods, starting reconstruct state function")
rc.sync()
}
}

View File

@ -25,13 +25,11 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
k8stypes "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
core "k8s.io/client-go/testing"
"k8s.io/client-go/tools/record"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake"
"k8s.io/kubernetes/pkg/kubelet/config"
"k8s.io/kubernetes/pkg/kubelet/volumemanager/cache"
"k8s.io/kubernetes/pkg/util/mount"
"k8s.io/kubernetes/pkg/volume"
@ -43,7 +41,7 @@ import (
const (
// reconcilerLoopSleepDuration is the amount of time the reconciler loop
// waits between successive executions
reconcilerLoopSleepDuration time.Duration = 0 * time.Millisecond
reconcilerLoopSleepDuration time.Duration = 1 * time.Nanosecond
reconcilerSyncStatesSleepPeriod time.Duration = 10 * time.Minute
// waitForAttachTimeout is the maximum amount of time a
// operationexecutor.Mount call will wait for a volume to be attached.
@ -52,6 +50,8 @@ const (
kubeletPodsDir string = "fake-dir"
)
func hasAddedPods() bool { return true }
// Calls Run()
// Verifies there are no calls to attach, detach, mount, unmount, etc.
func Test_Run_Positive_DoNothing(t *testing.T) {
@ -71,6 +71,7 @@ func Test_Run_Positive_DoNothing(t *testing.T) {
nodeName,
dsw,
asw,
hasAddedPods,
oex,
&mount.FakeMounter{},
volumePluginMgr,
@ -108,6 +109,7 @@ func Test_Run_Positive_VolumeAttachAndMount(t *testing.T) {
nodeName,
dsw,
asw,
hasAddedPods,
oex,
&mount.FakeMounter{},
volumePluginMgr,
@ -179,6 +181,7 @@ func Test_Run_Positive_VolumeMountControllerAttachEnabled(t *testing.T) {
nodeName,
dsw,
asw,
hasAddedPods,
oex,
&mount.FakeMounter{},
volumePluginMgr,
@ -251,6 +254,7 @@ func Test_Run_Positive_VolumeAttachMountUnmountDetach(t *testing.T) {
nodeName,
dsw,
asw,
hasAddedPods,
oex,
&mount.FakeMounter{},
volumePluginMgr,
@ -334,6 +338,7 @@ func Test_Run_Positive_VolumeUnmountControllerAttachEnabled(t *testing.T) {
nodeName,
dsw,
asw,
hasAddedPods,
oex,
&mount.FakeMounter{},
volumePluginMgr,
@ -472,6 +477,5 @@ func createTestClient() *fake.Clientset {
}
func runReconciler(reconciler Reconciler) {
sourcesReady := config.NewSourcesReady(func(_ sets.String) bool { return false })
go reconciler.Run(sourcesReady, wait.NeverStop)
go reconciler.Run(wait.NeverStop)
}

View File

@ -169,19 +169,6 @@ func NewVolumeManager(
),
}
vm.reconciler = reconciler.NewReconciler(
kubeClient,
controllerAttachDetachEnabled,
reconcilerLoopSleepPeriod,
reconcilerSyncStatesSleepPeriod,
waitForAttachTimeout,
nodeName,
vm.desiredStateOfWorld,
vm.actualStateOfWorld,
vm.operationExecutor,
mounter,
volumePluginMgr,
kubeletPodsDir)
vm.desiredStateOfWorldPopulator = populator.NewDesiredStateOfWorldPopulator(
kubeClient,
desiredStateOfWorldPopulatorLoopSleepPeriod,
@ -191,6 +178,20 @@ func NewVolumeManager(
vm.desiredStateOfWorld,
kubeContainerRuntime,
keepTerminatedPodVolumes)
vm.reconciler = reconciler.NewReconciler(
kubeClient,
controllerAttachDetachEnabled,
reconcilerLoopSleepPeriod,
reconcilerSyncStatesSleepPeriod,
waitForAttachTimeout,
nodeName,
vm.desiredStateOfWorld,
vm.actualStateOfWorld,
vm.desiredStateOfWorldPopulator.HasAddedPods,
vm.operationExecutor,
mounter,
volumePluginMgr,
kubeletPodsDir)
return vm, nil
}
@ -236,11 +237,11 @@ type volumeManager struct {
func (vm *volumeManager) Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) {
defer runtime.HandleCrash()
go vm.desiredStateOfWorldPopulator.Run(stopCh)
go vm.desiredStateOfWorldPopulator.Run(sourcesReady, stopCh)
glog.V(2).Infof("The desired_state_of_world populator starts")
glog.Infof("Starting Kubelet Volume Manager")
go vm.reconciler.Run(sourcesReady, stopCh)
go vm.reconciler.Run(stopCh)
<-stopCh
glog.Infof("Shutting down Kubelet Volume Manager")