diff --git a/pkg/kubelet/kubelet_node_status.go b/pkg/kubelet/kubelet_node_status.go index 87e39a0b6d..cb13ec735f 100644 --- a/pkg/kubelet/kubelet_node_status.go +++ b/pkg/kubelet/kubelet_node_status.go @@ -544,7 +544,7 @@ func (kl *Kubelet) defaultNodeStatusFuncs() []func(*v1.Node) error { nodestatus.MemoryPressureCondition(kl.clock.Now, kl.evictionManager.IsUnderMemoryPressure, kl.recordNodeStatusEvent), nodestatus.DiskPressureCondition(kl.clock.Now, kl.evictionManager.IsUnderDiskPressure, kl.recordNodeStatusEvent), nodestatus.PIDPressureCondition(kl.clock.Now, kl.evictionManager.IsUnderPIDPressure, kl.recordNodeStatusEvent), - nodestatus.ReadyCondition(kl.clock.Now, kl.runtimeState.runtimeErrors, kl.runtimeState.networkErrors, validateHostFunc, kl.containerManager.Status, kl.recordNodeStatusEvent), + nodestatus.ReadyCondition(kl.clock.Now, kl.runtimeState.runtimeErrors, kl.runtimeState.networkErrors, kl.runtimeState.storageErrors, validateHostFunc, kl.containerManager.Status, kl.recordNodeStatusEvent), nodestatus.VolumesInUse(kl.volumeManager.ReconcilerStatesHasBeenSynced, kl.volumeManager.GetVolumesInUse), nodestatus.RemoveOutOfDiskCondition(), // TODO(mtaufen): I decided not to move this setter for now, since all it does is send an event diff --git a/pkg/kubelet/nodestatus/setters.go b/pkg/kubelet/nodestatus/setters.go index 71bd5e4a58..becdb0f12d 100644 --- a/pkg/kubelet/nodestatus/setters.go +++ b/pkg/kubelet/nodestatus/setters.go @@ -440,6 +440,7 @@ func ReadyCondition( nowFunc func() time.Time, // typically Kubelet.clock.Now runtimeErrorsFunc func() error, // typically Kubelet.runtimeState.runtimeErrors networkErrorsFunc func() error, // typically Kubelet.runtimeState.networkErrors + storageErrorsFunc func() error, // typically Kubelet.runtimeState.storageErrors appArmorValidateHostFunc func() error, // typically Kubelet.appArmorValidator.ValidateHost, might be nil depending on whether there was an appArmorValidator cmStatusFunc func() cm.Status, // typically Kubelet.containerManager.Status recordEventFunc func(eventType, event string), // typically Kubelet.recordNodeStatusEvent @@ -456,7 +457,7 @@ func ReadyCondition( Message: "kubelet is posting ready status", LastHeartbeatTime: currentTime, } - errs := []error{runtimeErrorsFunc(), networkErrorsFunc()} + errs := []error{runtimeErrorsFunc(), networkErrorsFunc(), storageErrorsFunc()} requiredCapacities := []v1.ResourceName{v1.ResourceCPU, v1.ResourceMemory, v1.ResourcePods} if utilfeature.DefaultFeatureGate.Enabled(features.LocalStorageCapacityIsolation) { requiredCapacities = append(requiredCapacities, v1.ResourceEphemeralStorage) diff --git a/pkg/kubelet/nodestatus/setters_test.go b/pkg/kubelet/nodestatus/setters_test.go index c0267a686f..6a3e73cd9d 100644 --- a/pkg/kubelet/nodestatus/setters_test.go +++ b/pkg/kubelet/nodestatus/setters_test.go @@ -895,6 +895,7 @@ func TestReadyCondition(t *testing.T) { node *v1.Node runtimeErrors error networkErrors error + storageErrors error appArmorValidateHostFunc func() error cmStatus cm.Status expectConditions []v1.NodeCondition @@ -929,6 +930,12 @@ func TestReadyCondition(t *testing.T) { }, expectConditions: []v1.NodeCondition{*makeReadyCondition(true, "kubelet is posting ready status. WARNING: foo", now, now)}, }, + { + desc: "new, not ready: storage errors", + node: withCapacity.DeepCopy(), + storageErrors: errors.New("some storage error"), + expectConditions: []v1.NodeCondition{*makeReadyCondition(false, "some storage error", now, now)}, + }, { desc: "new, not ready: runtime and network errors", node: withCapacity.DeepCopy(), @@ -1003,6 +1010,9 @@ func TestReadyCondition(t *testing.T) { networkErrorsFunc := func() error { return tc.networkErrors } + storageErrorsFunc := func() error { + return tc.storageErrors + } cmStatusFunc := func() cm.Status { return tc.cmStatus } @@ -1014,7 +1024,7 @@ func TestReadyCondition(t *testing.T) { }) } // construct setter - setter := ReadyCondition(nowFunc, runtimeErrorsFunc, networkErrorsFunc, tc.appArmorValidateHostFunc, cmStatusFunc, recordEventFunc) + setter := ReadyCondition(nowFunc, runtimeErrorsFunc, networkErrorsFunc, storageErrorsFunc, tc.appArmorValidateHostFunc, cmStatusFunc, recordEventFunc) // call setter on node if err := setter(tc.node); err != nil { t.Fatalf("unexpected error: %v", err) diff --git a/pkg/kubelet/runtime.go b/pkg/kubelet/runtime.go index 53c85a2887..d51b3b5b29 100644 --- a/pkg/kubelet/runtime.go +++ b/pkg/kubelet/runtime.go @@ -30,6 +30,7 @@ type runtimeState struct { lastBaseRuntimeSync time.Time baseRuntimeSyncThreshold time.Duration networkError error + storageError error cidr string healthChecks []*healthCheck } @@ -61,6 +62,12 @@ func (s *runtimeState) setNetworkState(err error) { s.networkError = err } +func (s *runtimeState) setStorageState(err error) { + s.Lock() + defer s.Unlock() + s.storageError = err +} + func (s *runtimeState) setPodCIDR(cidr string) { s.Lock() defer s.Unlock() @@ -101,6 +108,16 @@ func (s *runtimeState) networkErrors() error { return utilerrors.NewAggregate(errs) } +func (s *runtimeState) storageErrors() error { + s.RLock() + defer s.RUnlock() + errs := []error{} + if s.storageError != nil { + errs = append(errs, s.storageError) + } + return utilerrors.NewAggregate(errs) +} + func newRuntimeState( runtimeSyncThreshold time.Duration, ) *runtimeState { diff --git a/pkg/kubelet/volume_host.go b/pkg/kubelet/volume_host.go index e269c023b5..2efe38a720 100644 --- a/pkg/kubelet/volume_host.go +++ b/pkg/kubelet/volume_host.go @@ -80,6 +80,7 @@ func NewInitializedVolumePluginMgr( // Compile-time check to ensure kubeletVolumeHost implements the VolumeHost interface var _ volume.VolumeHost = &kubeletVolumeHost{} +var _ volume.KubeletVolumeHost = &kubeletVolumeHost{} func (kvh *kubeletVolumeHost) GetPluginDir(pluginName string) string { return kvh.kubelet.getPluginDir(pluginName) @@ -94,6 +95,11 @@ type kubeletVolumeHost struct { mountPodManager mountpod.Manager } +func (kvh *kubeletVolumeHost) SetKubeletError(err error) { + kvh.kubelet.runtimeState.setStorageState(err) + return +} + func (kvh *kubeletVolumeHost) GetVolumeDevicePluginDir(pluginName string) string { return kvh.kubelet.getVolumeDevicePluginDir(pluginName) } diff --git a/pkg/volume/csi/BUILD b/pkg/volume/csi/BUILD index ffcf3f7251..a51d3cb8a7 100644 --- a/pkg/volume/csi/BUILD +++ b/pkg/volume/csi/BUILD @@ -24,6 +24,7 @@ go_library( "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/version:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library", @@ -32,6 +33,7 @@ go_library( "//staging/src/k8s.io/client-go/informers/storage/v1beta1:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/listers/storage/v1beta1:go_default_library", + "//staging/src/k8s.io/csi-translation-lib/plugins:go_default_library", "//vendor/github.com/container-storage-interface/spec/lib/go/csi:go_default_library", "//vendor/google.golang.org/grpc:go_default_library", "//vendor/k8s.io/klog:go_default_library", diff --git a/pkg/volume/csi/csi_plugin.go b/pkg/volume/csi/csi_plugin.go index 54bdc20ac0..26bfa64f33 100644 --- a/pkg/volume/csi/csi_plugin.go +++ b/pkg/volume/csi/csi_plugin.go @@ -33,6 +33,7 @@ import ( apierrs "k8s.io/apimachinery/pkg/api/errors" meta "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" utilversion "k8s.io/apimachinery/pkg/util/version" "k8s.io/apimachinery/pkg/util/wait" utilfeature "k8s.io/apiserver/pkg/util/feature" @@ -40,6 +41,7 @@ import ( csiinformer "k8s.io/client-go/informers/storage/v1beta1" clientset "k8s.io/client-go/kubernetes" csilister "k8s.io/client-go/listers/storage/v1beta1" + csitranslationplugins "k8s.io/csi-translation-lib/plugins" "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/volume" "k8s.io/kubernetes/pkg/volume/csi/nodeinfomanager" @@ -216,15 +218,77 @@ func (p *csiPlugin) Init(host volume.VolumeHost) error { go factory.Start(wait.NeverStop) } + var migratedPlugins = map[string](func() bool){ + csitranslationplugins.GCEPDInTreePluginName: func() bool { + return utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) && utilfeature.DefaultFeatureGate.Enabled(features.CSIMigrationGCE) + }, + // TODO(leakingtpan): Add AWS migration feature gates and place them here + } + // Initializing the label management channels - nim = nodeinfomanager.NewNodeInfoManager(host.GetNodeName(), host) + nim = nodeinfomanager.NewNodeInfoManager(host.GetNodeName(), host, migratedPlugins) - // TODO(#70514) Init CSINodeInfo object if the CRD exists and create Driver - // objects for migrated drivers. + if utilfeature.DefaultFeatureGate.Enabled(features.CSINodeInfo) && + utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) { + // This function prevents Kubelet from posting Ready status until CSINodeInfo + // is both installed and initialized + err := initializeCSINodeInfo(host) + if err != nil { + return fmt.Errorf("failed to initialize CSINodeInfo: %v", err) + } + } return nil } +func initializeCSINodeInfo(host volume.VolumeHost) error { + kvh, ok := host.(volume.KubeletVolumeHost) + if !ok { + klog.V(4).Infof("Skipping CSINodeInfo initialization, not running on Kubelet") + return nil + } + kubeClient := host.GetKubeClient() + if kubeClient == nil { + // Kubelet running in standalone mode. Skip CSINodeInfo initialization + klog.Warningf("Skipping CSINodeInfo initialization, Kubelet running in standalone mode") + return nil + } + + kvh.SetKubeletError(fmt.Errorf("CSINodeInfo is not yet intialized")) + + go func() { + defer utilruntime.HandleCrash() + + // Backoff parameters tuned to retry over 140 seconds. Will fail and restart the Kubelet + // after max retry steps. + initBackoff := wait.Backoff{ + Steps: 6, + Duration: 15 * time.Millisecond, + Factor: 6.0, + Jitter: 0.1, + } + err := wait.ExponentialBackoff(initBackoff, func() (bool, error) { + klog.V(4).Infof("Initializing migrated drivers on CSINodeInfo") + // TODO(dyzz): Just augment CreateCSINodeInfo to create the annotation on itself. Also update all updating functions to double check that the annotation is correct (yes) + _, err := nim.CreateCSINode() + if err != nil { + kvh.SetKubeletError(fmt.Errorf("Failed to initialize CSINodeInfo: %v", err)) + klog.Errorf("Failed to initialize CSINodeInfo: %v", err) + return false, nil + } + + // Successfully initialized drivers, allow Kubelet to post Ready + kvh.SetKubeletError(nil) + return true, nil + }) + if err != nil { + // Kill the Kubelet process and allow it to restart to retry initialization + klog.Fatalf("Failed to initialize CSINodeInfo after retrying") + } + }() + return nil +} + func (p *csiPlugin) GetPluginName() string { return CSIPluginName } diff --git a/pkg/volume/csi/nodeinfomanager/nodeinfomanager.go b/pkg/volume/csi/nodeinfomanager/nodeinfomanager.go index dfea45f51e..99a610d5cb 100644 --- a/pkg/volume/csi/nodeinfomanager/nodeinfomanager.go +++ b/pkg/volume/csi/nodeinfomanager/nodeinfomanager.go @@ -21,6 +21,7 @@ package nodeinfomanager // import "k8s.io/kubernetes/pkg/volume/csi/nodeinfomana import ( "encoding/json" "fmt" + "strings" "time" @@ -60,8 +61,9 @@ var ( // nodeInfoManager contains necessary common dependencies to update node info on both // the Node and CSINode objects. type nodeInfoManager struct { - nodeName types.NodeName - volumeHost volume.VolumeHost + nodeName types.NodeName + volumeHost volume.VolumeHost + migratedPlugins map[string](func() bool) } // If no updates is needed, the function must return the same Node object as the input. @@ -85,10 +87,12 @@ type Interface interface { // NewNodeInfoManager initializes nodeInfoManager func NewNodeInfoManager( nodeName types.NodeName, - volumeHost volume.VolumeHost) Interface { + volumeHost volume.VolumeHost, + migratedPlugins map[string](func() bool)) Interface { return &nodeInfoManager{ - nodeName: nodeName, - volumeHost: volumeHost, + nodeName: nodeName, + volumeHost: volumeHost, + migratedPlugins: migratedPlugins, } } @@ -418,9 +422,51 @@ func (nim *nodeInfoManager) CreateCSINode() (*storage.CSINode, error) { }, } + setMigrationAnnotation(nim.migratedPlugins, nodeInfo) + return csiKubeClient.StorageV1beta1().CSINodes().Create(nodeInfo) } +const ( + migratedPluginsAnnotationKey = "storage.alpha.kubernetes.io/migrated-plugins" +) + +func setMigrationAnnotation(migratedPlugins map[string](func() bool), nodeInfo *storage.CSINode) (modified bool) { + if migratedPlugins == nil { + return false + } + + nodeInfoAnnotations := nodeInfo.GetAnnotations() + if nodeInfoAnnotations == nil { + nodeInfoAnnotations = map[string]string{} + } + + mpa := nodeInfoAnnotations[migratedPluginsAnnotationKey] + tok := strings.Split(mpa, ",") + oldAnnotationSet := sets.NewString(tok...) + + newAnnotationSet := sets.NewString() + for pluginName, migratedFunc := range migratedPlugins { + if migratedFunc() { + newAnnotationSet.Insert(pluginName) + } + } + + if oldAnnotationSet.Equal(newAnnotationSet) { + return false + } + + nas := strings.Join(newAnnotationSet.List(), ",") + if len(nas) != 0 { + nodeInfoAnnotations[migratedPluginsAnnotationKey] = nas + } else { + delete(nodeInfoAnnotations, migratedPluginsAnnotationKey) + } + + nodeInfo.Annotations = nodeInfoAnnotations + return true +} + func (nim *nodeInfoManager) installDriverToCSINode( nodeInfo *storage.CSINode, driverName string, @@ -453,7 +499,9 @@ func (nim *nodeInfoManager) installDriverToCSINode( } } - if !specModified && !statusModified { + annotationModified := setMigrationAnnotation(nim.migratedPlugins, nodeInfo) + + if !specModified && !statusModified && !annotationModified { return nil } @@ -517,7 +565,10 @@ func (nim *nodeInfoManager) tryUninstallDriverFromCSINode( hasModified = true } } - if !hasModified { + + annotationModified := setMigrationAnnotation(nim.migratedPlugins, nodeInfo) + + if !hasModified && !annotationModified { // No changes, don't update return nil } diff --git a/pkg/volume/csi/nodeinfomanager/nodeinfomanager_test.go b/pkg/volume/csi/nodeinfomanager/nodeinfomanager_test.go index 6b5831e9f7..7146a391e8 100644 --- a/pkg/volume/csi/nodeinfomanager/nodeinfomanager_test.go +++ b/pkg/volume/csi/nodeinfomanager/nodeinfomanager_test.go @@ -19,6 +19,7 @@ package nodeinfomanager import ( "encoding/json" "fmt" + "reflect" "testing" "github.com/stretchr/testify/assert" @@ -545,6 +546,156 @@ func TestUninstallCSIDriverCSINodeInfoDisabled(t *testing.T) { test(t, false /* addNodeInfo */, false /* csiNodeInfoEnabled */, testcases) } +func TestSetMigrationAnnotation(t *testing.T) { + testcases := []struct { + name string + migratedPlugins map[string](func() bool) + existingNode *storage.CSINode + expectedNode *storage.CSINode + expectModified bool + }{ + { + name: "nil migrated plugins", + existingNode: &storage.CSINode{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node1", + }, + }, + expectedNode: &storage.CSINode{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node1", + }, + }, + }, + { + name: "one modified plugin", + migratedPlugins: map[string](func() bool){ + "test": func() bool { return true }, + }, + existingNode: &storage.CSINode{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node1", + }, + }, + expectedNode: &storage.CSINode{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node1", + Annotations: map[string]string{migratedPluginsAnnotationKey: "test"}, + }, + }, + expectModified: true, + }, + { + name: "existing plugin", + migratedPlugins: map[string](func() bool){ + "test": func() bool { return true }, + }, + existingNode: &storage.CSINode{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node1", + Annotations: map[string]string{migratedPluginsAnnotationKey: "test"}, + }, + }, + expectedNode: &storage.CSINode{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node1", + Annotations: map[string]string{migratedPluginsAnnotationKey: "test"}, + }, + }, + expectModified: false, + }, + { + name: "remove plugin", + migratedPlugins: map[string](func() bool){}, + existingNode: &storage.CSINode{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node1", + Annotations: map[string]string{migratedPluginsAnnotationKey: "test"}, + }, + }, + expectedNode: &storage.CSINode{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node1", + Annotations: map[string]string{}, + }, + }, + expectModified: true, + }, + { + name: "one modified plugin, other annotations stable", + migratedPlugins: map[string](func() bool){ + "test": func() bool { return true }, + }, + existingNode: &storage.CSINode{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node1", + Annotations: map[string]string{"other": "annotation"}, + }, + }, + expectedNode: &storage.CSINode{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node1", + Annotations: map[string]string{migratedPluginsAnnotationKey: "test", "other": "annotation"}, + }, + }, + expectModified: true, + }, + { + name: "multiple plugins modified, other annotations stable", + migratedPlugins: map[string](func() bool){ + "test": func() bool { return true }, + "foo": func() bool { return false }, + }, + existingNode: &storage.CSINode{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node1", + Annotations: map[string]string{"other": "annotation", migratedPluginsAnnotationKey: "foo"}, + }, + }, + expectedNode: &storage.CSINode{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node1", + Annotations: map[string]string{migratedPluginsAnnotationKey: "test", "other": "annotation"}, + }, + }, + expectModified: true, + }, + { + name: "multiple plugins added, other annotations stable", + migratedPlugins: map[string](func() bool){ + "test": func() bool { return true }, + "foo": func() bool { return true }, + }, + existingNode: &storage.CSINode{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node1", + Annotations: map[string]string{"other": "annotation"}, + }, + }, + expectedNode: &storage.CSINode{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node1", + Annotations: map[string]string{migratedPluginsAnnotationKey: "foo,test", "other": "annotation"}, + }, + }, + expectModified: true, + }, + } + + for _, tc := range testcases { + t.Logf("test case: %s", tc.name) + + modified := setMigrationAnnotation(tc.migratedPlugins, tc.existingNode) + if modified != tc.expectModified { + t.Errorf("Expected modified to be %v but got %v instead", tc.expectModified, modified) + } + + if !reflect.DeepEqual(tc.expectedNode, tc.existingNode) { + t.Errorf("Expected CSINode: %v, but got: %v", tc.expectedNode, tc.existingNode) + } + } +} + func TestInstallCSIDriverExistingAnnotation(t *testing.T) { defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSINodeInfo, true)() @@ -591,7 +742,7 @@ func TestInstallCSIDriverExistingAnnotation(t *testing.T) { nodeName, ) - nim := NewNodeInfoManager(types.NodeName(nodeName), host) + nim := NewNodeInfoManager(types.NodeName(nodeName), host, nil) // Act _, err = nim.CreateCSINode() @@ -649,7 +800,7 @@ func test(t *testing.T, addNodeInfo bool, csiNodeInfoEnabled bool, testcases []t nil, nodeName, ) - nim := NewNodeInfoManager(types.NodeName(nodeName), host) + nim := NewNodeInfoManager(types.NodeName(nodeName), host, nil) //// Act nim.CreateCSINode() diff --git a/pkg/volume/plugins.go b/pkg/volume/plugins.go index 4228bc694e..1470156dfb 100644 --- a/pkg/volume/plugins.go +++ b/pkg/volume/plugins.go @@ -278,6 +278,13 @@ type BlockVolumePlugin interface { ConstructBlockVolumeSpec(podUID types.UID, volumeName, volumePath string) (*Spec, error) } +// KubeletVolumeHost is a Kubelet specific interface that plugins can use to access the kubelet. +type KubeletVolumeHost interface { + // SetKubeletError lets plugins set an error on the Kubelet runtime status + // that will cause the Kubelet to post NotReady status with the error message provided + SetKubeletError(err error) +} + // VolumeHost is an interface that plugins can use to access the kubelet. type VolumeHost interface { // GetPluginDir returns the absolute path to a directory under which