mirror of https://github.com/k3s-io/k3s
Merge pull request #44781 from wongma7/outervolumespec
Automatic merge from submit-queue (batch tested with PRs 45382, 45384, 44781, 45333, 45543). Ensure desired state of world populator runs before volume reconstructor. If the kubelet's volumemanager reconstructor for actual state of world runs before the desired state of world has been populated, the pods in the actual state of world will have some incorrect volume information, namely outerVolumeSpecName. An incorrect outerVolumeSpecName leads to part of the issue described in https://github.com/kubernetes/kubernetes/issues/43515, because WaitForVolumeAttachAndMount searches the actual state of world with the correct outerVolumeSpecName, does not find it, and so reports 'timeout waiting....', etc. forever for existing pods. The comments acknowledge that this is a known issue. The all-sources-ready check doesn't work because the sources being ready doesn't necessarily mean the desired state of world populator has added pods from the sources. So instead let's put the all-sources-ready check in the *populator*: when the sources are ready, it will be able to populate the desired state of world and make "HasAddedPods()" return true. THEN, the reconstructor may run. @jingxu97 PTAL, you wrote all of the reconstruction stuff. ```release-note NONE``` pull/6/head
commit
aacc9729f1
|
@ -16,6 +16,7 @@ go_library(
|
|||
"//pkg/api:go_default_library",
|
||||
"//pkg/api/v1:go_default_library",
|
||||
"//pkg/client/clientset_generated/clientset:go_default_library",
|
||||
"//pkg/kubelet/config:go_default_library",
|
||||
"//pkg/kubelet/container:go_default_library",
|
||||
"//pkg/kubelet/pod:go_default_library",
|
||||
"//pkg/kubelet/status:go_default_library",
|
||||
|
|
|
@ -33,6 +33,7 @@ import (
|
|||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/api/v1"
|
||||
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
|
||||
"k8s.io/kubernetes/pkg/kubelet/config"
|
||||
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
|
||||
"k8s.io/kubernetes/pkg/kubelet/pod"
|
||||
"k8s.io/kubernetes/pkg/kubelet/status"
|
||||
|
@ -48,13 +49,21 @@ import (
|
|||
// if it has volumes. It also verifies that the pods in the desired state of the
|
||||
// world cache still exist, if not, it removes them.
|
||||
type DesiredStateOfWorldPopulator interface {
|
||||
Run(stopCh <-chan struct{})
|
||||
Run(sourcesReady config.SourcesReady, stopCh <-chan struct{})
|
||||
|
||||
// ReprocessPod removes the specified pod from the list of processedPods
|
||||
// (if it exists) forcing it to be reprocessed. This is required to enable
|
||||
// remounting volumes on pod updates (volumes like Downward API volumes
|
||||
// depend on this behavior to ensure volume content is updated).
|
||||
ReprocessPod(podName volumetypes.UniquePodName)
|
||||
|
||||
// HasAddedPods returns whether the populator has looped through the list
|
||||
// of active pods and added them to the desired state of the world cache,
|
||||
// at a time after sources are all ready, at least once. It does not
|
||||
// return true before sources are all ready because before then, there is
|
||||
// a chance many or all pods are missing from the list of active pods and
|
||||
// so few to none will have been added.
|
||||
HasAddedPods() bool
|
||||
}
|
||||
|
||||
// NewDesiredStateOfWorldPopulator returns a new instance of
|
||||
|
@ -86,6 +95,8 @@ func NewDesiredStateOfWorldPopulator(
|
|||
processedPods: make(map[volumetypes.UniquePodName]bool)},
|
||||
kubeContainerRuntime: kubeContainerRuntime,
|
||||
keepTerminatedPodVolumes: keepTerminatedPodVolumes,
|
||||
hasAddedPods: false,
|
||||
hasAddedPodsLock: sync.RWMutex{},
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -100,6 +111,8 @@ type desiredStateOfWorldPopulator struct {
|
|||
kubeContainerRuntime kubecontainer.Runtime
|
||||
timeOfLastGetPodStatus time.Time
|
||||
keepTerminatedPodVolumes bool
|
||||
hasAddedPods bool
|
||||
hasAddedPodsLock sync.RWMutex
|
||||
}
|
||||
|
||||
type processedPods struct {
|
||||
|
@ -107,7 +120,16 @@ type processedPods struct {
|
|||
sync.RWMutex
|
||||
}
|
||||
|
||||
func (dswp *desiredStateOfWorldPopulator) Run(stopCh <-chan struct{}) {
|
||||
// Run drives the populator in two phases.
//
// Phase 1 polls every loopSleepDuration: each tick samples
// sourcesReady.AllReady(), runs one populator pass, and reports the sampled
// value as the poll's completion condition. Once polling ends, hasAddedPods
// is set to true under hasAddedPodsLock.
//
// Phase 2 keeps running the populator pass every loopSleepDuration via
// wait.Until, bounded by stopCh.
//
// NOTE(review): the result of wait.PollUntil is not checked, so hasAddedPods
// is set to true regardless of why polling ended (presumably this includes
// stopCh being closed before sources were ready — confirm against the wait
// package documentation).
func (dswp *desiredStateOfWorldPopulator) Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) {
|
||||
// Wait for the completion of a loop that started after sources are all ready, then set hasAddedPods accordingly
|
||||
wait.PollUntil(dswp.loopSleepDuration, func() (bool, error) {
|
||||
// Readiness is sampled BEFORE the pass so that a true result means the
// pass that just finished began with all sources already ready.
done := sourcesReady.AllReady()
|
||||
dswp.populatorLoopFunc()()
|
||||
return done, nil
|
||||
}, stopCh)
|
||||
// Publish the flag under the write lock; HasAddedPods reads it under the read lock.
dswp.hasAddedPodsLock.Lock()
|
||||
dswp.hasAddedPods = true
|
||||
dswp.hasAddedPodsLock.Unlock()
|
||||
// Steady state: keep running populator passes at the same interval.
wait.Until(dswp.populatorLoopFunc(), dswp.loopSleepDuration, stopCh)
|
||||
}
|
||||
|
||||
|
@ -116,6 +138,12 @@ func (dswp *desiredStateOfWorldPopulator) ReprocessPod(
|
|||
dswp.deleteProcessedPod(podName)
|
||||
}
|
||||
|
||||
// HasAddedPods returns the current value of the hasAddedPods flag, read
// while holding hasAddedPodsLock's read lock. The flag is initialised to
// false by the constructor; in the code visible here it is set to true only
// by Run, after Run's initial polling phase has ended.
func (dswp *desiredStateOfWorldPopulator) HasAddedPods() bool {
|
||||
dswp.hasAddedPodsLock.RLock()
|
||||
defer dswp.hasAddedPodsLock.RUnlock()
|
||||
return dswp.hasAddedPods
|
||||
}
|
||||
|
||||
func (dswp *desiredStateOfWorldPopulator) populatorLoopFunc() func() {
|
||||
return func() {
|
||||
dswp.findAndAddNewPods()
|
||||
|
|
|
@ -16,7 +16,6 @@ go_library(
|
|||
"//cmd/kubelet/app/options:go_default_library",
|
||||
"//pkg/api/v1:go_default_library",
|
||||
"//pkg/client/clientset_generated/clientset:go_default_library",
|
||||
"//pkg/kubelet/config:go_default_library",
|
||||
"//pkg/kubelet/volumemanager/cache:go_default_library",
|
||||
"//pkg/util:go_default_library",
|
||||
"//pkg/util/goroutinemap/exponentialbackoff:go_default_library",
|
||||
|
@ -42,7 +41,6 @@ go_test(
|
|||
deps = [
|
||||
"//pkg/api/v1:go_default_library",
|
||||
"//pkg/client/clientset_generated/clientset/fake:go_default_library",
|
||||
"//pkg/kubelet/config:go_default_library",
|
||||
"//pkg/kubelet/volumemanager/cache:go_default_library",
|
||||
"//pkg/util/mount:go_default_library",
|
||||
"//pkg/volume:go_default_library",
|
||||
|
@ -53,7 +51,6 @@ go_test(
|
|||
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
||||
"//vendor/k8s.io/client-go/testing:go_default_library",
|
||||
"//vendor/k8s.io/client-go/tools/record:go_default_library",
|
||||
|
|
|
@ -32,7 +32,6 @@ import (
|
|||
"k8s.io/kubernetes/cmd/kubelet/app/options"
|
||||
"k8s.io/kubernetes/pkg/api/v1"
|
||||
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
|
||||
"k8s.io/kubernetes/pkg/kubelet/config"
|
||||
"k8s.io/kubernetes/pkg/kubelet/volumemanager/cache"
|
||||
"k8s.io/kubernetes/pkg/util"
|
||||
"k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff"
|
||||
|
@ -59,7 +58,7 @@ type Reconciler interface {
|
|||
// If attach/detach management is enabled, the manager will also check if
|
||||
// volumes that should be attached are attached and volumes that should
|
||||
// be detached are detached and trigger attach/detach operations as needed.
|
||||
Run(sourcesReady config.SourcesReady, stopCh <-chan struct{})
|
||||
Run(stopCh <-chan struct{})
|
||||
|
||||
// StatesHasBeenSynced returns true only after syncStates process starts to sync
|
||||
// states at least once after kubelet starts
|
||||
|
@ -80,6 +79,9 @@ type Reconciler interface {
|
|||
// nodeName - the Name for this node, used by Attach and Detach methods
|
||||
// desiredStateOfWorld - cache containing the desired state of the world
|
||||
// actualStateOfWorld - cache containing the actual state of the world
|
||||
// populatorHasAddedPods - checker for whether the populator has finished
|
||||
// adding pods to the desiredStateOfWorld cache at least once after sources
|
||||
// are all ready (before sources are ready, pods are probably missing)
|
||||
// operationExecutor - used to trigger attach/detach/mount/unmount operations
|
||||
// safely (prevents more than one operation from being triggered on the same
|
||||
// volume)
|
||||
|
@ -94,6 +96,7 @@ func NewReconciler(
|
|||
nodeName types.NodeName,
|
||||
desiredStateOfWorld cache.DesiredStateOfWorld,
|
||||
actualStateOfWorld cache.ActualStateOfWorld,
|
||||
populatorHasAddedPods func() bool,
|
||||
operationExecutor operationexecutor.OperationExecutor,
|
||||
mounter mount.Interface,
|
||||
volumePluginMgr *volumepkg.VolumePluginMgr,
|
||||
|
@ -107,6 +110,7 @@ func NewReconciler(
|
|||
nodeName: nodeName,
|
||||
desiredStateOfWorld: desiredStateOfWorld,
|
||||
actualStateOfWorld: actualStateOfWorld,
|
||||
populatorHasAddedPods: populatorHasAddedPods,
|
||||
operationExecutor: operationExecutor,
|
||||
mounter: mounter,
|
||||
volumePluginMgr: volumePluginMgr,
|
||||
|
@ -124,6 +128,7 @@ type reconciler struct {
|
|||
nodeName types.NodeName
|
||||
desiredStateOfWorld cache.DesiredStateOfWorld
|
||||
actualStateOfWorld cache.ActualStateOfWorld
|
||||
populatorHasAddedPods func() bool
|
||||
operationExecutor operationexecutor.OperationExecutor
|
||||
mounter mount.Interface
|
||||
volumePluginMgr *volumepkg.VolumePluginMgr
|
||||
|
@ -131,21 +136,27 @@ type reconciler struct {
|
|||
timeOfLastSync time.Time
|
||||
}
|
||||
|
||||
func (rc *reconciler) Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) {
|
||||
wait.Until(rc.reconciliationLoopFunc(sourcesReady), rc.loopSleepDuration, stopCh)
|
||||
// Run drives the reconciler in two phases.
//
// Phase 1 polls every loopSleepDuration: each tick runs one reconciliation
// pass, passing it the current value of populatorHasAddedPods(), and then
// calls populatorHasAddedPods() again to decide whether polling is complete.
//
// Phase 2 runs the reconciliation pass every loopSleepDuration with the
// populator check fixed to true, via wait.Until, bounded by stopCh.
func (rc *reconciler) Run(stopCh <-chan struct{}) {
|
||||
// Wait for the populator to indicate that it has actually populated the desired state of world, meaning it has
|
||||
// completed a populate loop that started after sources are all ready. After, there's no need to keep checking.
|
||||
wait.PollUntil(rc.loopSleepDuration, func() (bool, error) {
|
||||
// The checker is invoked twice per tick: once as the argument to this
// pass, and once below as the poll's completion condition. The two
// results can differ if the populator finishes in between; in that
// case the first pass that receives true is the one run by wait.Until.
rc.reconciliationLoopFunc(rc.populatorHasAddedPods())()
|
||||
return rc.populatorHasAddedPods(), nil
|
||||
}, stopCh)
|
||||
wait.Until(rc.reconciliationLoopFunc(true), rc.loopSleepDuration, stopCh)
|
||||
}
|
||||
|
||||
func (rc *reconciler) reconciliationLoopFunc(sourcesReady config.SourcesReady) func() {
|
||||
func (rc *reconciler) reconciliationLoopFunc(populatorHasAddedPods bool) func() {
|
||||
return func() {
|
||||
rc.reconcile()
|
||||
|
||||
// Add all sources ready check so that reconciler's reconstruct process will start after
|
||||
// desired state of world is populated with pod volume information from different sources. Otherwise,
|
||||
// reconciler's reconstruct process may add incomplete volume information and cause confusion.
|
||||
// In addition, if some sources are not ready, the reconstruct process may clean up pods' volumes
|
||||
// that are still in use because desired states could not get a complete list of pods.
|
||||
if sourcesReady.AllReady() && time.Since(rc.timeOfLastSync) > rc.syncDuration {
|
||||
glog.V(5).Infof("Sources are all ready, starting reconstruct state function")
|
||||
// Add a check that the populator has added pods so that reconciler's reconstruct process will start
|
||||
// after desired state of world is populated with pod volume information. Otherwise, reconciler's
|
||||
// reconstruct process may add incomplete volume information and cause confusion. In addition, if the
|
||||
// desired state of world has not been populated yet, the reconstruct process may clean up pods' volumes
|
||||
// that are still in use because desired state of world does not contain a complete list of pods.
|
||||
if populatorHasAddedPods && time.Since(rc.timeOfLastSync) > rc.syncDuration {
|
||||
glog.V(5).Infof("Desired state of world has been populated with pods, starting reconstruct state function")
|
||||
rc.sync()
|
||||
}
|
||||
}
|
||||
|
|
|
@ -25,13 +25,11 @@ import (
|
|||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
k8stypes "k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
core "k8s.io/client-go/testing"
|
||||
"k8s.io/client-go/tools/record"
|
||||
"k8s.io/kubernetes/pkg/api/v1"
|
||||
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake"
|
||||
"k8s.io/kubernetes/pkg/kubelet/config"
|
||||
"k8s.io/kubernetes/pkg/kubelet/volumemanager/cache"
|
||||
"k8s.io/kubernetes/pkg/util/mount"
|
||||
"k8s.io/kubernetes/pkg/volume"
|
||||
|
@ -43,7 +41,7 @@ import (
|
|||
const (
|
||||
// reconcilerLoopSleepDuration is the amount of time the reconciler loop
|
||||
// waits between successive executions
|
||||
reconcilerLoopSleepDuration time.Duration = 0 * time.Millisecond
|
||||
reconcilerLoopSleepDuration time.Duration = 1 * time.Nanosecond
|
||||
reconcilerSyncStatesSleepPeriod time.Duration = 10 * time.Minute
|
||||
// waitForAttachTimeout is the maximum amount of time a
|
||||
// operationexecutor.Mount call will wait for a volume to be attached.
|
||||
|
@ -52,6 +50,8 @@ const (
|
|||
kubeletPodsDir string = "fake-dir"
|
||||
)
|
||||
|
||||
// hasAddedPods is a test stub for the reconciler's populatorHasAddedPods
// checker; it unconditionally returns true.
func hasAddedPods() bool { return true }
|
||||
|
||||
// Calls Run()
|
||||
// Verifies there are no calls to attach, detach, mount, unmount, etc.
|
||||
func Test_Run_Positive_DoNothing(t *testing.T) {
|
||||
|
@ -71,6 +71,7 @@ func Test_Run_Positive_DoNothing(t *testing.T) {
|
|||
nodeName,
|
||||
dsw,
|
||||
asw,
|
||||
hasAddedPods,
|
||||
oex,
|
||||
&mount.FakeMounter{},
|
||||
volumePluginMgr,
|
||||
|
@ -108,6 +109,7 @@ func Test_Run_Positive_VolumeAttachAndMount(t *testing.T) {
|
|||
nodeName,
|
||||
dsw,
|
||||
asw,
|
||||
hasAddedPods,
|
||||
oex,
|
||||
&mount.FakeMounter{},
|
||||
volumePluginMgr,
|
||||
|
@ -179,6 +181,7 @@ func Test_Run_Positive_VolumeMountControllerAttachEnabled(t *testing.T) {
|
|||
nodeName,
|
||||
dsw,
|
||||
asw,
|
||||
hasAddedPods,
|
||||
oex,
|
||||
&mount.FakeMounter{},
|
||||
volumePluginMgr,
|
||||
|
@ -251,6 +254,7 @@ func Test_Run_Positive_VolumeAttachMountUnmountDetach(t *testing.T) {
|
|||
nodeName,
|
||||
dsw,
|
||||
asw,
|
||||
hasAddedPods,
|
||||
oex,
|
||||
&mount.FakeMounter{},
|
||||
volumePluginMgr,
|
||||
|
@ -334,6 +338,7 @@ func Test_Run_Positive_VolumeUnmountControllerAttachEnabled(t *testing.T) {
|
|||
nodeName,
|
||||
dsw,
|
||||
asw,
|
||||
hasAddedPods,
|
||||
oex,
|
||||
&mount.FakeMounter{},
|
||||
volumePluginMgr,
|
||||
|
@ -472,6 +477,5 @@ func createTestClient() *fake.Clientset {
|
|||
}
|
||||
|
||||
func runReconciler(reconciler Reconciler) {
|
||||
sourcesReady := config.NewSourcesReady(func(_ sets.String) bool { return false })
|
||||
go reconciler.Run(sourcesReady, wait.NeverStop)
|
||||
go reconciler.Run(wait.NeverStop)
|
||||
}
|
||||
|
|
|
@ -171,19 +171,6 @@ func NewVolumeManager(
|
|||
),
|
||||
}
|
||||
|
||||
vm.reconciler = reconciler.NewReconciler(
|
||||
kubeClient,
|
||||
controllerAttachDetachEnabled,
|
||||
reconcilerLoopSleepPeriod,
|
||||
reconcilerSyncStatesSleepPeriod,
|
||||
waitForAttachTimeout,
|
||||
nodeName,
|
||||
vm.desiredStateOfWorld,
|
||||
vm.actualStateOfWorld,
|
||||
vm.operationExecutor,
|
||||
mounter,
|
||||
volumePluginMgr,
|
||||
kubeletPodsDir)
|
||||
vm.desiredStateOfWorldPopulator = populator.NewDesiredStateOfWorldPopulator(
|
||||
kubeClient,
|
||||
desiredStateOfWorldPopulatorLoopSleepPeriod,
|
||||
|
@ -193,6 +180,20 @@ func NewVolumeManager(
|
|||
vm.desiredStateOfWorld,
|
||||
kubeContainerRuntime,
|
||||
keepTerminatedPodVolumes)
|
||||
vm.reconciler = reconciler.NewReconciler(
|
||||
kubeClient,
|
||||
controllerAttachDetachEnabled,
|
||||
reconcilerLoopSleepPeriod,
|
||||
reconcilerSyncStatesSleepPeriod,
|
||||
waitForAttachTimeout,
|
||||
nodeName,
|
||||
vm.desiredStateOfWorld,
|
||||
vm.actualStateOfWorld,
|
||||
vm.desiredStateOfWorldPopulator.HasAddedPods,
|
||||
vm.operationExecutor,
|
||||
mounter,
|
||||
volumePluginMgr,
|
||||
kubeletPodsDir)
|
||||
|
||||
return vm, nil
|
||||
}
|
||||
|
@ -238,11 +239,11 @@ type volumeManager struct {
|
|||
func (vm *volumeManager) Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) {
|
||||
defer runtime.HandleCrash()
|
||||
|
||||
go vm.desiredStateOfWorldPopulator.Run(stopCh)
|
||||
go vm.desiredStateOfWorldPopulator.Run(sourcesReady, stopCh)
|
||||
glog.V(2).Infof("The desired_state_of_world populator starts")
|
||||
|
||||
glog.Infof("Starting Kubelet Volume Manager")
|
||||
go vm.reconciler.Run(sourcesReady, stopCh)
|
||||
go vm.reconciler.Run(stopCh)
|
||||
|
||||
<-stopCh
|
||||
glog.Infof("Shutting down Kubelet Volume Manager")
|
||||
|
|
Loading…
Reference in New Issue