/* Copyright 2016 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package kubelet import ( "encoding/json" "fmt" "net" goruntime "runtime" "sort" "strconv" "sync/atomic" "testing" "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" cadvisorapi "github.com/google/cadvisor/info/v1" "k8s.io/api/core/v1" apiequality "k8s.io/apimachinery/pkg/api/equality" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/diff" "k8s.io/apimachinery/pkg/util/rand" "k8s.io/apimachinery/pkg/util/strategicpatch" "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/apimachinery/pkg/util/wait" utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/kubernetes/fake" v1core "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/rest" core "k8s.io/client-go/testing" "k8s.io/kubernetes/pkg/features" kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis" cadvisortest "k8s.io/kubernetes/pkg/kubelet/cadvisor/testing" "k8s.io/kubernetes/pkg/kubelet/cm" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/nodestatus" "k8s.io/kubernetes/pkg/kubelet/util/sliceutils" "k8s.io/kubernetes/pkg/scheduler/algorithm" taintutil "k8s.io/kubernetes/pkg/util/taints" "k8s.io/kubernetes/pkg/version" "k8s.io/kubernetes/pkg/volume/util" ) const ( maxImageTagsForTest = 20 ) // generateTestingImageLists generate randomly 
generated image list and corresponding expectedImageList. func generateTestingImageLists(count int, maxImages int) ([]kubecontainer.Image, []v1.ContainerImage) { // imageList is randomly generated image list var imageList []kubecontainer.Image for ; count > 0; count-- { imageItem := kubecontainer.Image{ ID: string(uuid.NewUUID()), RepoTags: generateImageTags(), Size: rand.Int63nRange(minImgSize, maxImgSize+1), } imageList = append(imageList, imageItem) } expectedImageList := makeExpectedImageList(imageList, maxImages) return imageList, expectedImageList } func makeExpectedImageList(imageList []kubecontainer.Image, maxImages int) []v1.ContainerImage { // expectedImageList is generated by imageList according to size and maxImages // 1. sort the imageList by size sort.Sort(sliceutils.ByImageSize(imageList)) // 2. convert sorted imageList to v1.ContainerImage list var expectedImageList []v1.ContainerImage for _, kubeImage := range imageList { apiImage := v1.ContainerImage{ Names: kubeImage.RepoTags[0:nodestatus.MaxNamesPerImageInNodeStatus], SizeBytes: kubeImage.Size, } expectedImageList = append(expectedImageList, apiImage) } // 3. only returns the top maxImages images in expectedImageList if maxImages == -1 { // -1 means no limit return expectedImageList } return expectedImageList[0:maxImages] } func generateImageTags() []string { var tagList []string // Generate > MaxNamesPerImageInNodeStatus tags so that the test can verify // that kubelet report up to MaxNamesPerImageInNodeStatus tags. 
count := rand.IntnRange(nodestatus.MaxNamesPerImageInNodeStatus+1, maxImageTagsForTest+1) for ; count > 0; count-- { tagList = append(tagList, "k8s.gcr.io:v"+strconv.Itoa(count)) } return tagList } func applyNodeStatusPatch(originalNode *v1.Node, patch []byte) (*v1.Node, error) { original, err := json.Marshal(originalNode) if err != nil { return nil, fmt.Errorf("failed to marshal original node %#v: %v", originalNode, err) } updated, err := strategicpatch.StrategicMergePatch(original, patch, v1.Node{}) if err != nil { return nil, fmt.Errorf("failed to apply strategic merge patch %q on node %#v: %v", patch, originalNode, err) } updatedNode := &v1.Node{} if err := json.Unmarshal(updated, updatedNode); err != nil { return nil, fmt.Errorf("failed to unmarshal updated node %q: %v", updated, err) } return updatedNode, nil } func notImplemented(action core.Action) (bool, runtime.Object, error) { return true, nil, fmt.Errorf("no reaction implemented for %s", action) } func addNotImplatedReaction(kubeClient *fake.Clientset) { if kubeClient == nil { return } kubeClient.AddReactor("*", "*", notImplemented) } type localCM struct { cm.ContainerManager allocatableReservation v1.ResourceList capacity v1.ResourceList } func (lcm *localCM) GetNodeAllocatableReservation() v1.ResourceList { return lcm.allocatableReservation } func (lcm *localCM) GetCapacity() v1.ResourceList { return lcm.capacity } // sortableNodeAddress is a type for sorting []v1.NodeAddress type sortableNodeAddress []v1.NodeAddress func (s sortableNodeAddress) Len() int { return len(s) } func (s sortableNodeAddress) Less(i, j int) bool { return (string(s[i].Type) + s[i].Address) < (string(s[j].Type) + s[j].Address) } func (s sortableNodeAddress) Swap(i, j int) { s[j], s[i] = s[i], s[j] } func sortNodeAddresses(addrs sortableNodeAddress) { sort.Sort(addrs) } func TestUpdateNewNodeStatus(t *testing.T) { cases := []struct { desc string nodeStatusMaxImages int32 }{ { desc: "5 image limit", nodeStatusMaxImages: 5, }, { 
desc: "no image limit", nodeStatusMaxImages: -1, }, } for _, tc := range cases { t.Run(tc.desc, func(t *testing.T) { // generate one more in inputImageList than we configure the Kubelet to report, // or 5 images if unlimited numTestImages := int(tc.nodeStatusMaxImages) + 1 if tc.nodeStatusMaxImages == -1 { numTestImages = 5 } inputImageList, expectedImageList := generateTestingImageLists(numTestImages, int(tc.nodeStatusMaxImages)) testKubelet := newTestKubeletWithImageList( t, inputImageList, false /* controllerAttachDetachEnabled */, true /*initFakeVolumePlugin*/) defer testKubelet.Cleanup() kubelet := testKubelet.kubelet kubelet.nodeStatusMaxImages = tc.nodeStatusMaxImages kubelet.kubeClient = nil // ensure only the heartbeat client is used kubelet.containerManager = &localCM{ ContainerManager: cm.NewStubContainerManager(), allocatableReservation: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(200, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(100E6, resource.BinarySI), v1.ResourceEphemeralStorage: *resource.NewQuantity(2000, resource.BinarySI), }, capacity: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(10E9, resource.BinarySI), v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI), }, } // Since this test retroactively overrides the stub container manager, // we have to regenerate default status setters. 
kubelet.setNodeStatusFuncs = kubelet.defaultNodeStatusFuncs() kubeClient := testKubelet.fakeKubeClient existingNode := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname}} kubeClient.ReactionChain = fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{existingNode}}).ReactionChain machineInfo := &cadvisorapi.MachineInfo{ MachineID: "123", SystemUUID: "abc", BootID: "1b3", NumCores: 2, MemoryCapacity: 10E9, // 10G } kubelet.machineInfo = machineInfo expectedNode := &v1.Node{ ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname}, Spec: v1.NodeSpec{}, Status: v1.NodeStatus{ Conditions: []v1.NodeCondition{ { Type: v1.NodeOutOfDisk, Status: v1.ConditionFalse, Reason: "KubeletHasSufficientDisk", Message: fmt.Sprintf("kubelet has sufficient disk space available"), LastHeartbeatTime: metav1.Time{}, LastTransitionTime: metav1.Time{}, }, { Type: v1.NodeMemoryPressure, Status: v1.ConditionFalse, Reason: "KubeletHasSufficientMemory", Message: fmt.Sprintf("kubelet has sufficient memory available"), LastHeartbeatTime: metav1.Time{}, LastTransitionTime: metav1.Time{}, }, { Type: v1.NodeDiskPressure, Status: v1.ConditionFalse, Reason: "KubeletHasNoDiskPressure", Message: fmt.Sprintf("kubelet has no disk pressure"), LastHeartbeatTime: metav1.Time{}, LastTransitionTime: metav1.Time{}, }, { Type: v1.NodePIDPressure, Status: v1.ConditionFalse, Reason: "KubeletHasSufficientPID", Message: fmt.Sprintf("kubelet has sufficient PID available"), LastHeartbeatTime: metav1.Time{}, LastTransitionTime: metav1.Time{}, }, { Type: v1.NodeReady, Status: v1.ConditionTrue, Reason: "KubeletReady", Message: fmt.Sprintf("kubelet is posting ready status"), LastHeartbeatTime: metav1.Time{}, LastTransitionTime: metav1.Time{}, }, }, NodeInfo: v1.NodeSystemInfo{ MachineID: "123", SystemUUID: "abc", BootID: "1b3", KernelVersion: cadvisortest.FakeKernelVersion, OSImage: cadvisortest.FakeContainerOsVersion, OperatingSystem: goruntime.GOOS, Architecture: goruntime.GOARCH, ContainerRuntimeVersion: 
"test://1.5.0", KubeletVersion: version.Get().String(), KubeProxyVersion: version.Get().String(), }, Capacity: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(10E9, resource.BinarySI), v1.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI), v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI), }, Allocatable: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(1800, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(9900E6, resource.BinarySI), v1.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI), v1.ResourceEphemeralStorage: *resource.NewQuantity(3000, resource.BinarySI), }, Addresses: []v1.NodeAddress{ {Type: v1.NodeInternalIP, Address: "127.0.0.1"}, {Type: v1.NodeHostName, Address: testKubeletHostname}, }, Images: expectedImageList, }, } kubelet.updateRuntimeUp() assert.NoError(t, kubelet.updateNodeStatus()) actions := kubeClient.Actions() require.Len(t, actions, 2) require.True(t, actions[1].Matches("patch", "nodes")) require.Equal(t, actions[1].GetSubresource(), "status") updatedNode, err := applyNodeStatusPatch(&existingNode, actions[1].(core.PatchActionImpl).GetPatch()) assert.NoError(t, err) for i, cond := range updatedNode.Status.Conditions { assert.False(t, cond.LastHeartbeatTime.IsZero(), "LastHeartbeatTime for %v condition is zero", cond.Type) assert.False(t, cond.LastTransitionTime.IsZero(), "LastTransitionTime for %v condition is zero", cond.Type) updatedNode.Status.Conditions[i].LastHeartbeatTime = metav1.Time{} updatedNode.Status.Conditions[i].LastTransitionTime = metav1.Time{} } // Version skew workaround. 
See: https://github.com/kubernetes/kubernetes/issues/16961 assert.Equal(t, v1.NodeReady, updatedNode.Status.Conditions[len(updatedNode.Status.Conditions)-1].Type, "NotReady should be last") assert.Len(t, updatedNode.Status.Images, len(expectedImageList)) assert.True(t, apiequality.Semantic.DeepEqual(expectedNode, updatedNode), "%s", diff.ObjectDiff(expectedNode, updatedNode)) }) } } func TestUpdateExistingNodeStatus(t *testing.T) { testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) defer testKubelet.Cleanup() kubelet := testKubelet.kubelet kubelet.nodeStatusMaxImages = 5 // don't truncate the image list that gets constructed by hand for this test kubelet.kubeClient = nil // ensure only the heartbeat client is used kubelet.containerManager = &localCM{ ContainerManager: cm.NewStubContainerManager(), allocatableReservation: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(200, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(100E6, resource.BinarySI), }, capacity: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(20E9, resource.BinarySI), v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI), }, } // Since this test retroactively overrides the stub container manager, // we have to regenerate default status setters. 
kubelet.setNodeStatusFuncs = kubelet.defaultNodeStatusFuncs() kubeClient := testKubelet.fakeKubeClient existingNode := v1.Node{ ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname}, Spec: v1.NodeSpec{}, Status: v1.NodeStatus{ Conditions: []v1.NodeCondition{ { Type: v1.NodeOutOfDisk, Status: v1.ConditionFalse, Reason: "KubeletHasSufficientDisk", Message: fmt.Sprintf("kubelet has sufficient disk space available"), LastHeartbeatTime: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), LastTransitionTime: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), }, { Type: v1.NodeMemoryPressure, Status: v1.ConditionFalse, Reason: "KubeletHasSufficientMemory", Message: fmt.Sprintf("kubelet has sufficient memory available"), LastHeartbeatTime: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), LastTransitionTime: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), }, { Type: v1.NodeDiskPressure, Status: v1.ConditionFalse, Reason: "KubeletHasSufficientDisk", Message: fmt.Sprintf("kubelet has sufficient disk space available"), LastHeartbeatTime: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), LastTransitionTime: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), }, { Type: v1.NodePIDPressure, Status: v1.ConditionFalse, Reason: "KubeletHasSufficientPID", Message: fmt.Sprintf("kubelet has sufficient PID available"), LastHeartbeatTime: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), LastTransitionTime: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), }, { Type: v1.NodeReady, Status: v1.ConditionTrue, Reason: "KubeletReady", Message: fmt.Sprintf("kubelet is posting ready status"), LastHeartbeatTime: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), LastTransitionTime: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), }, }, Capacity: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(3000, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(20E9, resource.BinarySI), v1.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI), }, Allocatable: v1.ResourceList{ v1.ResourceCPU: 
*resource.NewMilliQuantity(2800, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(19900E6, resource.BinarySI), v1.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI), }, }, } kubeClient.ReactionChain = fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{existingNode}}).ReactionChain machineInfo := &cadvisorapi.MachineInfo{ MachineID: "123", SystemUUID: "abc", BootID: "1b3", NumCores: 2, MemoryCapacity: 20E9, } kubelet.machineInfo = machineInfo expectedNode := &v1.Node{ ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname}, Spec: v1.NodeSpec{}, Status: v1.NodeStatus{ Conditions: []v1.NodeCondition{ { Type: v1.NodeOutOfDisk, Status: v1.ConditionFalse, Reason: "KubeletHasSufficientDisk", Message: fmt.Sprintf("kubelet has sufficient disk space available"), LastHeartbeatTime: metav1.Time{}, LastTransitionTime: metav1.Time{}, }, { Type: v1.NodeMemoryPressure, Status: v1.ConditionFalse, Reason: "KubeletHasSufficientMemory", Message: fmt.Sprintf("kubelet has sufficient memory available"), LastHeartbeatTime: metav1.Time{}, LastTransitionTime: metav1.Time{}, }, { Type: v1.NodeDiskPressure, Status: v1.ConditionFalse, Reason: "KubeletHasSufficientDisk", Message: fmt.Sprintf("kubelet has sufficient disk space available"), LastHeartbeatTime: metav1.Time{}, LastTransitionTime: metav1.Time{}, }, { Type: v1.NodePIDPressure, Status: v1.ConditionFalse, Reason: "KubeletHasSufficientPID", Message: fmt.Sprintf("kubelet has sufficient PID available"), LastHeartbeatTime: metav1.Time{}, LastTransitionTime: metav1.Time{}, }, { Type: v1.NodeReady, Status: v1.ConditionTrue, Reason: "KubeletReady", Message: fmt.Sprintf("kubelet is posting ready status"), LastHeartbeatTime: metav1.Time{}, // placeholder LastTransitionTime: metav1.Time{}, // placeholder }, }, NodeInfo: v1.NodeSystemInfo{ MachineID: "123", SystemUUID: "abc", BootID: "1b3", KernelVersion: cadvisortest.FakeKernelVersion, OSImage: cadvisortest.FakeContainerOsVersion, OperatingSystem: goruntime.GOOS, 
Architecture: goruntime.GOARCH, ContainerRuntimeVersion: "test://1.5.0", KubeletVersion: version.Get().String(), KubeProxyVersion: version.Get().String(), }, Capacity: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(20E9, resource.BinarySI), v1.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI), v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI), }, Allocatable: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(1800, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(19900E6, resource.BinarySI), v1.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI), v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI), }, Addresses: []v1.NodeAddress{ {Type: v1.NodeInternalIP, Address: "127.0.0.1"}, {Type: v1.NodeHostName, Address: testKubeletHostname}, }, // images will be sorted from max to min in node status. Images: []v1.ContainerImage{ { Names: []string{"k8s.gcr.io:v1", "k8s.gcr.io:v2"}, SizeBytes: 123, }, { Names: []string{"k8s.gcr.io:v3", "k8s.gcr.io:v4"}, SizeBytes: 456, }, }, }, } kubelet.updateRuntimeUp() assert.NoError(t, kubelet.updateNodeStatus()) actions := kubeClient.Actions() assert.Len(t, actions, 2) assert.IsType(t, core.PatchActionImpl{}, actions[1]) patchAction := actions[1].(core.PatchActionImpl) updatedNode, err := applyNodeStatusPatch(&existingNode, patchAction.GetPatch()) require.NoError(t, err) for i, cond := range updatedNode.Status.Conditions { old := metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC).Time // Expect LastHearbeat to be updated to Now, while LastTransitionTime to be the same. 
assert.NotEqual(t, old, cond.LastHeartbeatTime.Rfc3339Copy().UTC(), "LastHeartbeatTime for condition %v", cond.Type) assert.EqualValues(t, old, cond.LastTransitionTime.Rfc3339Copy().UTC(), "LastTransitionTime for condition %v", cond.Type) updatedNode.Status.Conditions[i].LastHeartbeatTime = metav1.Time{} updatedNode.Status.Conditions[i].LastTransitionTime = metav1.Time{} } // Version skew workaround. See: https://github.com/kubernetes/kubernetes/issues/16961 assert.Equal(t, v1.NodeReady, updatedNode.Status.Conditions[len(updatedNode.Status.Conditions)-1].Type, "NodeReady should be the last condition") assert.True(t, apiequality.Semantic.DeepEqual(expectedNode, updatedNode), "%s", diff.ObjectDiff(expectedNode, updatedNode)) } func TestUpdateExistingNodeStatusTimeout(t *testing.T) { attempts := int64(0) failureCallbacks := int64(0) // set up a listener that hangs connections ln, err := net.Listen("tcp", "127.0.0.1:0") assert.NoError(t, err) defer ln.Close() go func() { // accept connections and just let them hang for { _, err := ln.Accept() if err != nil { t.Log(err) return } t.Log("accepted connection") atomic.AddInt64(&attempts, 1) } }() config := &rest.Config{ Host: "http://" + ln.Addr().String(), QPS: -1, Timeout: time.Second, } assert.NoError(t, err) testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) defer testKubelet.Cleanup() kubelet := testKubelet.kubelet kubelet.kubeClient = nil // ensure only the heartbeat client is used kubelet.heartbeatClient, err = v1core.NewForConfig(config) kubelet.onRepeatedHeartbeatFailure = func() { atomic.AddInt64(&failureCallbacks, 1) } kubelet.containerManager = &localCM{ ContainerManager: cm.NewStubContainerManager(), allocatableReservation: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(200, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(100E6, resource.BinarySI), }, capacity: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), 
v1.ResourceMemory: *resource.NewQuantity(20E9, resource.BinarySI), }, } // should return an error, but not hang assert.Error(t, kubelet.updateNodeStatus()) // should have attempted multiple times if actualAttempts := atomic.LoadInt64(&attempts); actualAttempts < nodeStatusUpdateRetry { t.Errorf("Expected at least %d attempts, got %d", nodeStatusUpdateRetry, actualAttempts) } // should have gotten multiple failure callbacks if actualFailureCallbacks := atomic.LoadInt64(&failureCallbacks); actualFailureCallbacks < (nodeStatusUpdateRetry - 1) { t.Errorf("Expected %d failure callbacks, got %d", (nodeStatusUpdateRetry - 1), actualFailureCallbacks) } } func TestUpdateNodeStatusWithRuntimeStateError(t *testing.T) { testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) defer testKubelet.Cleanup() kubelet := testKubelet.kubelet kubelet.nodeStatusMaxImages = 5 // don't truncate the image list that gets constructed by hand for this test kubelet.kubeClient = nil // ensure only the heartbeat client is used kubelet.containerManager = &localCM{ ContainerManager: cm.NewStubContainerManager(), allocatableReservation: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(200, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(100E6, resource.BinarySI), v1.ResourceEphemeralStorage: *resource.NewQuantity(10E9, resource.BinarySI), }, capacity: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(10E9, resource.BinarySI), v1.ResourceEphemeralStorage: *resource.NewQuantity(20E9, resource.BinarySI), }, } // Since this test retroactively overrides the stub container manager, // we have to regenerate default status setters. 
kubelet.setNodeStatusFuncs = kubelet.defaultNodeStatusFuncs() clock := testKubelet.fakeClock kubeClient := testKubelet.fakeKubeClient existingNode := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname}} kubeClient.ReactionChain = fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{existingNode}}).ReactionChain machineInfo := &cadvisorapi.MachineInfo{ MachineID: "123", SystemUUID: "abc", BootID: "1b3", NumCores: 2, MemoryCapacity: 10E9, } kubelet.machineInfo = machineInfo expectedNode := &v1.Node{ ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname}, Spec: v1.NodeSpec{}, Status: v1.NodeStatus{ Conditions: []v1.NodeCondition{ { Type: v1.NodeOutOfDisk, Status: v1.ConditionFalse, Reason: "KubeletHasSufficientDisk", Message: fmt.Sprintf("kubelet has sufficient disk space available"), LastHeartbeatTime: metav1.Time{}, LastTransitionTime: metav1.Time{}, }, { Type: v1.NodeMemoryPressure, Status: v1.ConditionFalse, Reason: "KubeletHasSufficientMemory", Message: fmt.Sprintf("kubelet has sufficient memory available"), LastHeartbeatTime: metav1.Time{}, LastTransitionTime: metav1.Time{}, }, { Type: v1.NodeDiskPressure, Status: v1.ConditionFalse, Reason: "KubeletHasNoDiskPressure", Message: fmt.Sprintf("kubelet has no disk pressure"), LastHeartbeatTime: metav1.Time{}, LastTransitionTime: metav1.Time{}, }, { Type: v1.NodePIDPressure, Status: v1.ConditionFalse, Reason: "KubeletHasSufficientPID", Message: fmt.Sprintf("kubelet has sufficient PID available"), LastHeartbeatTime: metav1.Time{}, LastTransitionTime: metav1.Time{}, }, {}, //placeholder }, NodeInfo: v1.NodeSystemInfo{ MachineID: "123", SystemUUID: "abc", BootID: "1b3", KernelVersion: cadvisortest.FakeKernelVersion, OSImage: cadvisortest.FakeContainerOsVersion, OperatingSystem: goruntime.GOOS, Architecture: goruntime.GOARCH, ContainerRuntimeVersion: "test://1.5.0", KubeletVersion: version.Get().String(), KubeProxyVersion: version.Get().String(), }, Capacity: v1.ResourceList{ v1.ResourceCPU: 
*resource.NewMilliQuantity(2000, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(10E9, resource.BinarySI), v1.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI), v1.ResourceEphemeralStorage: *resource.NewQuantity(20E9, resource.BinarySI), }, Allocatable: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(1800, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(9900E6, resource.BinarySI), v1.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI), v1.ResourceEphemeralStorage: *resource.NewQuantity(10E9, resource.BinarySI), }, Addresses: []v1.NodeAddress{ {Type: v1.NodeInternalIP, Address: "127.0.0.1"}, {Type: v1.NodeHostName, Address: testKubeletHostname}, }, Images: []v1.ContainerImage{ { Names: []string{"k8s.gcr.io:v1", "k8s.gcr.io:v2"}, SizeBytes: 123, }, { Names: []string{"k8s.gcr.io:v3", "k8s.gcr.io:v4"}, SizeBytes: 456, }, }, }, } checkNodeStatus := func(status v1.ConditionStatus, reason string) { kubeClient.ClearActions() assert.NoError(t, kubelet.updateNodeStatus()) actions := kubeClient.Actions() require.Len(t, actions, 2) require.True(t, actions[1].Matches("patch", "nodes")) require.Equal(t, actions[1].GetSubresource(), "status") updatedNode, err := kubeClient.CoreV1().Nodes().Get(testKubeletHostname, metav1.GetOptions{}) require.NoError(t, err, "can't apply node status patch") for i, cond := range updatedNode.Status.Conditions { assert.False(t, cond.LastHeartbeatTime.IsZero(), "LastHeartbeatTime for %v condition is zero", cond.Type) assert.False(t, cond.LastTransitionTime.IsZero(), "LastTransitionTime for %v condition is zero", cond.Type) updatedNode.Status.Conditions[i].LastHeartbeatTime = metav1.Time{} updatedNode.Status.Conditions[i].LastTransitionTime = metav1.Time{} } // Version skew workaround. 
See: https://github.com/kubernetes/kubernetes/issues/16961 lastIndex := len(updatedNode.Status.Conditions) - 1 assert.Equal(t, v1.NodeReady, updatedNode.Status.Conditions[lastIndex].Type, "NodeReady should be the last condition") assert.NotEmpty(t, updatedNode.Status.Conditions[lastIndex].Message) updatedNode.Status.Conditions[lastIndex].Message = "" expectedNode.Status.Conditions[lastIndex] = v1.NodeCondition{ Type: v1.NodeReady, Status: status, Reason: reason, LastHeartbeatTime: metav1.Time{}, LastTransitionTime: metav1.Time{}, } assert.True(t, apiequality.Semantic.DeepEqual(expectedNode, updatedNode), "%s", diff.ObjectDiff(expectedNode, updatedNode)) } // TODO(random-liu): Refactor the unit test to be table driven test. // Should report kubelet not ready if the runtime check is out of date clock.SetTime(time.Now().Add(-maxWaitForContainerRuntime)) kubelet.updateRuntimeUp() checkNodeStatus(v1.ConditionFalse, "KubeletNotReady") // Should report kubelet ready if the runtime check is updated clock.SetTime(time.Now()) kubelet.updateRuntimeUp() checkNodeStatus(v1.ConditionTrue, "KubeletReady") // Should report kubelet not ready if the runtime check is out of date clock.SetTime(time.Now().Add(-maxWaitForContainerRuntime)) kubelet.updateRuntimeUp() checkNodeStatus(v1.ConditionFalse, "KubeletNotReady") // Should report kubelet not ready if the runtime check failed fakeRuntime := testKubelet.fakeRuntime // Inject error into fake runtime status check, node should be NotReady fakeRuntime.StatusErr = fmt.Errorf("injected runtime status error") clock.SetTime(time.Now()) kubelet.updateRuntimeUp() checkNodeStatus(v1.ConditionFalse, "KubeletNotReady") fakeRuntime.StatusErr = nil // Should report node not ready if runtime status is nil. fakeRuntime.RuntimeStatus = nil kubelet.updateRuntimeUp() checkNodeStatus(v1.ConditionFalse, "KubeletNotReady") // Should report node not ready if runtime status is empty. 
fakeRuntime.RuntimeStatus = &kubecontainer.RuntimeStatus{} kubelet.updateRuntimeUp() checkNodeStatus(v1.ConditionFalse, "KubeletNotReady") // Should report node not ready if RuntimeReady is false. fakeRuntime.RuntimeStatus = &kubecontainer.RuntimeStatus{ Conditions: []kubecontainer.RuntimeCondition{ {Type: kubecontainer.RuntimeReady, Status: false}, {Type: kubecontainer.NetworkReady, Status: true}, }, } kubelet.updateRuntimeUp() checkNodeStatus(v1.ConditionFalse, "KubeletNotReady") // Should report node ready if RuntimeReady is true. fakeRuntime.RuntimeStatus = &kubecontainer.RuntimeStatus{ Conditions: []kubecontainer.RuntimeCondition{ {Type: kubecontainer.RuntimeReady, Status: true}, {Type: kubecontainer.NetworkReady, Status: true}, }, } kubelet.updateRuntimeUp() checkNodeStatus(v1.ConditionTrue, "KubeletReady") // Should report node not ready if NetworkReady is false. fakeRuntime.RuntimeStatus = &kubecontainer.RuntimeStatus{ Conditions: []kubecontainer.RuntimeCondition{ {Type: kubecontainer.RuntimeReady, Status: true}, {Type: kubecontainer.NetworkReady, Status: false}, }, } kubelet.updateRuntimeUp() checkNodeStatus(v1.ConditionFalse, "KubeletNotReady") } func TestUpdateNodeStatusError(t *testing.T) { testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) defer testKubelet.Cleanup() kubelet := testKubelet.kubelet kubelet.kubeClient = nil // ensure only the heartbeat client is used // No matching node for the kubelet testKubelet.fakeKubeClient.ReactionChain = fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{}}).ReactionChain assert.Error(t, kubelet.updateNodeStatus()) assert.Len(t, testKubelet.fakeKubeClient.Actions(), nodeStatusUpdateRetry) } func TestRegisterWithApiServer(t *testing.T) { testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) defer testKubelet.Cleanup() kubelet := testKubelet.kubelet kubeClient := testKubelet.fakeKubeClient kubeClient.AddReactor("create", "nodes", func(action core.Action) (bool, 
runtime.Object, error) {
		// Return an error on create.
		return true, &v1.Node{}, &apierrors.StatusError{
			ErrStatus: metav1.Status{Reason: metav1.StatusReasonAlreadyExists},
		}
	})
	kubeClient.AddReactor("get", "nodes", func(action core.Action) (bool, runtime.Object, error) {
		// Return an existing (matching) node on get.
		return true, &v1.Node{
			ObjectMeta: metav1.ObjectMeta{
				Name: testKubeletHostname,
				Labels: map[string]string{
					kubeletapis.LabelHostname: testKubeletHostname,
					kubeletapis.LabelOS:       goruntime.GOOS,
					kubeletapis.LabelArch:     goruntime.GOARCH,
				},
			},
		}, nil
	})

	addNotImplatedReaction(kubeClient)

	machineInfo := &cadvisorapi.MachineInfo{
		MachineID:      "123",
		SystemUUID:     "abc",
		BootID:         "1b3",
		NumCores:       2,
		MemoryCapacity: 1024,
	}
	kubelet.machineInfo = machineInfo

	done := make(chan struct{})
	go func() {
		kubelet.registerWithAPIServer()
		done <- struct{}{}
	}()
	select {
	case <-time.After(wait.ForeverTestTimeout):
		assert.Fail(t, "timed out waiting for registration")
	case <-done:
		return
	}
}

// TestTryRegisterWithApiServer drives kubelet.tryRegisterWithAPIServer against
// a fake client whose create/get/patch/delete reactors return the errors and
// objects configured for each case. For every case it checks the boolean
// result and the number of recorded client actions; when testSavedNode is set
// it additionally checks the controller-managed-attach annotation on the node
// recovered from the action at savedNodeIndex.
func TestTryRegisterWithApiServer(t *testing.T) {
	alreadyExists := &apierrors.StatusError{
		ErrStatus: metav1.Status{Reason: metav1.StatusReasonAlreadyExists},
	}

	conflict := &apierrors.StatusError{
		ErrStatus: metav1.Status{Reason: metav1.StatusReasonConflict},
	}

	// newNode builds a node carrying the hostname/OS/arch labels; when cmad is
	// true the controller-managed-attach annotation is set to "true".
	newNode := func(cmad bool) *v1.Node {
		node := &v1.Node{
			ObjectMeta: metav1.ObjectMeta{
				Labels: map[string]string{
					kubeletapis.LabelHostname: testKubeletHostname,
					kubeletapis.LabelOS:       goruntime.GOOS,
					kubeletapis.LabelArch:     goruntime.GOARCH,
				},
			},
		}

		if cmad {
			node.Annotations = make(map[string]string)
			node.Annotations[util.ControllerManagedAttachAnnotation] = "true"
		}

		return node
	}

	cases := []struct {
		name            string
		newNode         *v1.Node
		existingNode    *v1.Node
		createError     error
		getError        error
		patchError      error
		deleteError     error
		expectedResult  bool
		expectedActions int
		testSavedNode   bool
		savedNodeIndex  int
		savedNodeCMAD   bool
	}{
		{
			name:            "success case - new node",
			newNode:         &v1.Node{},
			expectedResult:  true,
			expectedActions: 1,
		},
		{
			name:            "success case - existing node - no change in CMAD",
			newNode:         newNode(true),
			createError:     alreadyExists,
			existingNode:    newNode(true),
			expectedResult:  true,
			expectedActions: 2,
		},
		{
			name:            "success case - existing node - CMAD disabled",
			newNode:         newNode(false),
			createError:     alreadyExists,
			existingNode:    newNode(true),
			expectedResult:  true,
			expectedActions: 3,
			testSavedNode:   true,
			savedNodeIndex:  2,
			savedNodeCMAD:   false,
		},
		{
			name:            "success case - existing node - CMAD enabled",
			newNode:         newNode(true),
			createError:     alreadyExists,
			existingNode:    newNode(false),
			expectedResult:  true,
			expectedActions: 3,
			testSavedNode:   true,
			savedNodeIndex:  2,
			savedNodeCMAD:   true,
		},
		{
			name:            "create failed",
			newNode:         newNode(false),
			createError:     conflict,
			expectedResult:  false,
			expectedActions: 1,
		},
		{
			name:            "get existing node failed",
			newNode:         newNode(false),
			createError:     alreadyExists,
			getError:        conflict,
			expectedResult:  false,
			expectedActions: 2,
		},
		{
			name:            "update existing node failed",
			newNode:         newNode(false),
			createError:     alreadyExists,
			existingNode:    newNode(true),
			patchError:      conflict,
			expectedResult:  false,
			expectedActions: 3,
		},
	}

	for _, tc := range cases {
		tc := tc
		// Each case runs as a subtest so that its deferred Cleanup fires when
		// the case ends instead of piling up until the whole test returns.
		t.Run(tc.name, func(t *testing.T) {
			testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled is a don't-care for this test */)
			defer testKubelet.Cleanup()
			kubelet := testKubelet.kubelet
			kubeClient := testKubelet.fakeKubeClient

			kubeClient.AddReactor("create", "nodes", func(action core.Action) (bool, runtime.Object, error) {
				return true, nil, tc.createError
			})
			kubeClient.AddReactor("get", "nodes", func(action core.Action) (bool, runtime.Object, error) {
				// Return an existing (matching) node on get.
				return true, tc.existingNode, tc.getError
			})
			kubeClient.AddReactor("patch", "nodes", func(action core.Action) (bool, runtime.Object, error) {
				// Only patches of the "status" subresource are answered here;
				// anything else is handed to notImplemented.
				if action.GetSubresource() == "status" {
					return true, nil, tc.patchError
				}
				return notImplemented(action)
			})
			kubeClient.AddReactor("delete", "nodes", func(action core.Action) (bool, runtime.Object, error) {
				return true, nil, tc.deleteError
			})
			addNotImplatedReaction(kubeClient)

			result := kubelet.tryRegisterWithAPIServer(tc.newNode)
			require.Equal(t, tc.expectedResult, result, "test [%s]", tc.name)

			actions := kubeClient.Actions()
			// Fatal on mismatch: actions is indexed by savedNodeIndex below.
			require.Len(t, actions, tc.expectedActions, "test [%s]", tc.name)

			if !tc.testSavedNode {
				return
			}

			var savedNode *v1.Node

			t.Logf("actions: %v: %+v", len(actions), actions)
			action := actions[tc.savedNodeIndex]
			switch action.GetVerb() {
			case "create":
				createAction := action.(core.CreateAction)
				obj := createAction.GetObject()
				require.IsType(t, &v1.Node{}, obj)
				savedNode = obj.(*v1.Node)
			case "patch":
				patchAction := action.(core.PatchActionImpl)
				var err error
				savedNode, err = applyNodeStatusPatch(tc.existingNode, patchAction.GetPatch())
				require.NoError(t, err)
			}
			// Any other verb leaves savedNode nil; fail clearly instead of
			// panicking on the dereference below.
			require.NotNil(t, savedNode, "test [%s]: no node recovered from action with verb %q", tc.name, action.GetVerb())

			// A missing or unparsable annotation is treated as "false".
			actualCMAD, _ := strconv.ParseBool(savedNode.Annotations[util.ControllerManagedAttachAnnotation])
			assert.Equal(t, tc.savedNodeCMAD, actualCMAD, "test [%s]", tc.name)
		})
	}
}

// TestUpdateNewNodeStatusTooLargeReservation configures a container manager
// whose CPU reservation (40000m) exceeds the node's CPU capacity (2000m), runs
// a node status update, and compares the Allocatable list in the resulting
// status patch with the expected one (CPU 0, ephemeral storage 2000).
func TestUpdateNewNodeStatusTooLargeReservation(t *testing.T) {
	const nodeStatusMaxImages = 5

	// generate one more in inputImageList than we configure the Kubelet to report
	inputImageList, _ := generateTestingImageLists(nodeStatusMaxImages+1, nodeStatusMaxImages)
	testKubelet := newTestKubeletWithImageList(
		t, inputImageList, false /* controllerAttachDetachEnabled */, true /* initFakeVolumePlugin */)
	defer testKubelet.Cleanup()
	kubelet := testKubelet.kubelet
	kubelet.nodeStatusMaxImages = nodeStatusMaxImages
	kubelet.kubeClient = nil // ensure only the heartbeat client is used
	kubelet.containerManager = &localCM{
		ContainerManager: cm.NewStubContainerManager(),
		allocatableReservation: v1.ResourceList{
			v1.ResourceCPU:              *resource.NewMilliQuantity(40000, resource.DecimalSI),
			v1.ResourceEphemeralStorage: *resource.NewQuantity(1000, resource.BinarySI),
		},
		capacity: v1.ResourceList{
			v1.ResourceCPU:              *resource.NewMilliQuantity(2000, resource.DecimalSI),
			v1.ResourceMemory:           *resource.NewQuantity(10E9, resource.BinarySI),
			v1.ResourceEphemeralStorage: *resource.NewQuantity(3000, resource.BinarySI),
		},
	}
	// Since this test retroactively overrides the stub container manager,
	// we have to regenerate default status setters.
	kubelet.setNodeStatusFuncs = kubelet.defaultNodeStatusFuncs()

	kubeClient := testKubelet.fakeKubeClient
	existingNode := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname}}
	kubeClient.ReactionChain = fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{existingNode}}).ReactionChain
	machineInfo := &cadvisorapi.MachineInfo{
		MachineID:      "123",
		SystemUUID:     "abc",
		BootID:         "1b3",
		NumCores:       2,
		MemoryCapacity: 10E9, // 10G
	}
	kubelet.machineInfo = machineInfo

	// Only Status.Allocatable of expectedNode is compared below.
	expectedNode := &v1.Node{
		ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname},
		Spec:       v1.NodeSpec{},
		Status: v1.NodeStatus{
			Capacity: v1.ResourceList{
				v1.ResourceCPU:              *resource.NewMilliQuantity(2000, resource.DecimalSI),
				v1.ResourceMemory:           *resource.NewQuantity(10E9, resource.BinarySI),
				v1.ResourcePods:             *resource.NewQuantity(0, resource.DecimalSI),
				v1.ResourceEphemeralStorage: *resource.NewQuantity(3000, resource.BinarySI),
			},
			Allocatable: v1.ResourceList{
				v1.ResourceCPU:              *resource.NewMilliQuantity(0, resource.DecimalSI),
				v1.ResourceMemory:           *resource.NewQuantity(10E9, resource.BinarySI),
				v1.ResourcePods:             *resource.NewQuantity(0, resource.DecimalSI),
				v1.ResourceEphemeralStorage: *resource.NewQuantity(2000, resource.BinarySI),
			},
		},
	}

	kubelet.updateRuntimeUp()
	assert.NoError(t, kubelet.updateNodeStatus())
	actions := kubeClient.Actions()
	require.Len(t, actions, 2)
	require.True(t, actions[1].Matches("patch", "nodes"))
	// testify takes (expected, actual).
	require.Equal(t, "status", actions[1].GetSubresource())

	updatedNode, err := applyNodeStatusPatch(&existingNode, actions[1].(core.PatchActionImpl).GetPatch())
	// Fatal: updatedNode is dereferenced immediately below.
	require.NoError(t, err)
	assert.True(t, apiequality.Semantic.DeepEqual(expectedNode.Status.Allocatable, updatedNode.Status.Allocatable), "%s", diff.ObjectDiff(expectedNode.Status.Allocatable, updatedNode.Status.Allocatable))
}

// TestUpdateDefaultLabels calls kubelet.updateDefaultLabels with an initial
// node and an existing node and checks both the returned needsUpdate flag and
// the labels left on the existing node afterwards.
func TestUpdateDefaultLabels(t *testing.T) {
	testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
	// Registered once, outside the case loop, so Cleanup runs a single time.
	defer testKubelet.Cleanup()
	testKubelet.kubelet.kubeClient = nil // ensure only the heartbeat client is used
	kubelet := testKubelet.kubelet

	// defaultLabels returns a fresh map with the six default node labels, each
	// valued "<prefix>-<suffix>", plus a copy of every entry in extra. A new
	// map is built on every call because the existing node's labels are
	// inspected after updateDefaultLabels has had the chance to modify them.
	defaultLabels := func(prefix string, extra map[string]string) map[string]string {
		labels := map[string]string{
			kubeletapis.LabelHostname:          prefix + "-hostname",
			kubeletapis.LabelZoneFailureDomain: prefix + "-zone-failure-domain",
			kubeletapis.LabelZoneRegion:        prefix + "-zone-region",
			kubeletapis.LabelInstanceType:      prefix + "-instance-type",
			kubeletapis.LabelOS:                prefix + "-os",
			kubeletapis.LabelArch:              prefix + "-arch",
		}
		for k, v := range extra {
			labels[k] = v
		}
		return labels
	}
	// nodeWithLabels wraps labels (which may be nil) in a node.
	nodeWithLabels := func(labels map[string]string) *v1.Node {
		return &v1.Node{ObjectMeta: metav1.ObjectMeta{Labels: labels}}
	}
	// persisted is a user-owned label that must survive the update.
	persisted := map[string]string{"please-persist": "foo"}

	cases := []struct {
		name         string
		initialNode  *v1.Node
		existingNode *v1.Node
		needsUpdate  bool
		finalLabels  map[string]string
	}{
		{
			name:         "make sure default labels exist",
			initialNode:  nodeWithLabels(defaultLabels("new", nil)),
			existingNode: nodeWithLabels(map[string]string{}),
			needsUpdate:  true,
			finalLabels:  defaultLabels("new", nil),
		},
		{
			name:         "make sure default labels are up to date",
			initialNode:  nodeWithLabels(defaultLabels("new", nil)),
			existingNode: nodeWithLabels(defaultLabels("old", nil)),
			needsUpdate:  true,
			finalLabels:  defaultLabels("new", nil),
		},
		{
			name:         "make sure existing labels do not get deleted",
			initialNode:  nodeWithLabels(defaultLabels("new", nil)),
			existingNode: nodeWithLabels(defaultLabels("new", persisted)),
			needsUpdate:  false,
			finalLabels:  defaultLabels("new", persisted),
		},
		{
			name:         "make sure existing labels do not get deleted when initial node has no opinion",
			initialNode:  nodeWithLabels(map[string]string{}),
			existingNode: nodeWithLabels(defaultLabels("new", persisted)),
			needsUpdate:  false,
			finalLabels:  defaultLabels("new", persisted),
		},
		{
			name:         "no update needed",
			initialNode:  nodeWithLabels(defaultLabels("new", nil)),
			existingNode: nodeWithLabels(defaultLabels("new", nil)),
			needsUpdate:  false,
			finalLabels:  defaultLabels("new", nil),
		},
		{
			name:         "not panic when existing node has nil labels",
			initialNode:  nodeWithLabels(defaultLabels("new", nil)),
			existingNode: nodeWithLabels(nil),
			needsUpdate:  true,
			finalLabels:  defaultLabels("new", nil),
		},
	}

	for _, tc := range cases {
		needsUpdate := kubelet.updateDefaultLabels(tc.initialNode, tc.existingNode)
		assert.Equal(t, tc.needsUpdate, needsUpdate, tc.name)
		assert.Equal(t, tc.finalLabels, tc.existingNode.Labels, tc.name)
	}
}

// TestReconcileExtendedResource calls kubelet.reconcileExtendedResource with an
// empty initial node and an existing node, then checks the returned
// needsUpdate flag and the resulting state of the existing node. The second
// case expects the test.com/* capacities and allocatables to end up as zero.
func TestReconcileExtendedResource(t *testing.T) {
	testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
	// Registered once, outside the case loop, so Cleanup runs a single time.
	defer testKubelet.Cleanup()
	testKubelet.kubelet.kubeClient = nil // ensure only the heartbeat client is used
	kubelet := testKubelet.kubelet
	extendedResourceName1 := v1.ResourceName("test.com/resource1")
	extendedResourceName2 := v1.ResourceName("test.com/resource2")

	// newResources returns a fresh list with the CPU/memory/ephemeral-storage
	// values shared by every case, plus one DecimalSI quantity per entry in
	// extended.
	newResources := func(extended map[v1.ResourceName]int64) v1.ResourceList {
		list := v1.ResourceList{
			v1.ResourceCPU:              *resource.NewMilliQuantity(2000, resource.DecimalSI),
			v1.ResourceMemory:           *resource.NewQuantity(10E9, resource.BinarySI),
			v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI),
		}
		for name, value := range extended {
			list[name] = *resource.NewQuantity(value, resource.DecimalSI)
		}
		return list
	}
	// nodeWithResources builds a node whose Capacity and Allocatable are
	// separate but identical lists.
	nodeWithResources := func(extended map[v1.ResourceName]int64) *v1.Node {
		return &v1.Node{
			Status: v1.NodeStatus{
				Capacity:    newResources(extended),
				Allocatable: newResources(extended),
			},
		}
	}

	cases := []struct {
		name         string
		existingNode *v1.Node
		expectedNode *v1.Node
		needsUpdate  bool
	}{
		{
			name:         "no update needed without extended resource",
			existingNode: nodeWithResources(nil),
			expectedNode: nodeWithResources(nil),
			needsUpdate:  false,
		},
		{
			name: "extended resource capacity is zeroed",
			existingNode: nodeWithResources(map[v1.ResourceName]int64{
				extendedResourceName1: 2,
				extendedResourceName2: 10,
			}),
			expectedNode: nodeWithResources(map[v1.ResourceName]int64{
				extendedResourceName1: 0,
				extendedResourceName2: 0,
			}),
			needsUpdate: true,
		},
	}

	for _, tc := range cases {
		initialNode := &v1.Node{}

		needsUpdate := kubelet.reconcileExtendedResource(initialNode, tc.existingNode)
		assert.Equal(t, tc.needsUpdate, needsUpdate, tc.name)
		assert.Equal(t, tc.expectedNode, tc.existingNode, tc.name)
	}
}

// TestValidateNodeIPParam checks validateNodeIP against a fixed list of
// addresses that must be rejected and, for every address reported by
// net.InterfaceAddrs that is neither loopback nor link-local unicast, against
// an address that must be accepted.
func TestValidateNodeIPParam(t *testing.T) {
	type test struct {
		nodeIP   string
		success  bool
		testName string
	}
	tests := []test{
		{
			nodeIP:   "",
			success:  false,
			testName: "IP not set",
		},
		{
			nodeIP:   "127.0.0.1",
			success:  false,
			testName: "IPv4 loopback address",
		},
		{
			nodeIP:   "::1",
			success:  false,
			testName: "IPv6 loopback address",
		},
		{
			nodeIP:   "224.0.0.1",
			success:  false,
			testName: "multicast IPv4 address",
		},
		{
			nodeIP:   "ff00::1",
			success:  false,
			testName: "multicast IPv6 address",
		},
		{
			nodeIP:   "169.254.0.1",
			success:  false,
			testName: "IPv4 link-local unicast address",
		},
		{
			nodeIP:   "fe80::0202:b3ff:fe1e:8329",
			success:  false,
			testName: "IPv6 link-local unicast address",
		},
		{
			nodeIP:   "0.0.0.0",
			success:  false,
			testName: "Unspecified IPv4 address",
		},
		{
			nodeIP:   "::",
			success:  false,
			testName: "Unspecified IPv6 address",
		},
		{
			nodeIP:   "1.2.3.4",
			success:  false,
			testName: "IPv4 address that doesn't belong to host",
		},
	}

	addrs, err := net.InterfaceAddrs()
	// Non-fatal: without the address list only the success cases are lost, so
	// report the failure and still run the rejection cases above.
	assert.NoError(t, err, "Unable to obtain a list of the node's unicast interface addresses.")
	for _, addr := range addrs {
		var ip net.IP
		switch v := addr.(type) {
		case *net.IPNet:
			ip = v.IP
		case *net.IPAddr:
			ip = v.IP
		}
		// Skip (rather than stop at) addresses that cannot yield a success
		// case: unknown address types leave ip nil, and loopback/link-local
		// addresses are expected to be rejected.
		if ip == nil || ip.IsLoopback() || ip.IsLinkLocalUnicast() {
			continue
		}
		tests = append(tests, test{
			nodeIP:   ip.String(),
			success:  true,
			testName: fmt.Sprintf("Success test case for address %s", ip.String()),
		})
	}

	for _, tc := range tests {
		err := validateNodeIP(net.ParseIP(tc.nodeIP))
		if tc.success {
			assert.NoError(t, err, "test %s", tc.testName)
		} else {
			assert.Error(t, err, "test %s", tc.testName)
		}
	}
}

// TestRegisterWithApiServerWithTaint registers a kubelet with
// registerSchedulable set to false and, for each state of the
// TaintNodesByCondition feature gate supplied by forEachFeatureGate, checks
// that the created node carries the unschedulable NoSchedule taint exactly
// when the gate is enabled.
func TestRegisterWithApiServerWithTaint(t *testing.T) {
	testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
	defer testKubelet.Cleanup()
	kubelet := testKubelet.kubelet
	kubeClient := testKubelet.fakeKubeClient

	machineInfo := &cadvisorapi.MachineInfo{
		MachineID:      "123",
		SystemUUID:     "abc",
		BootID:         "1b3",
		NumCores:       2,
		MemoryCapacity: 1024,
	}
	kubelet.machineInfo = machineInfo

	// gotNode captures the object passed to the most recent node create.
	var gotNode runtime.Object
	kubeClient.AddReactor("create", "nodes", func(action core.Action) (bool, runtime.Object, error) {
		createAction := action.(core.CreateAction)
		gotNode = createAction.GetObject()
		return true, gotNode, nil
	})

	addNotImplatedReaction(kubeClient)

	// Make node to be unschedulable.
	kubelet.registerSchedulable = false

	forEachFeatureGate(t, []utilfeature.Feature{features.TaintNodesByCondition}, func(t *testing.T) {
		// Reset kubelet status for each test.
		kubelet.registrationCompleted = false
		// Drop the node captured by a previous run so a stale object cannot
		// satisfy this run's assertions.
		gotNode = nil

		// Register node to apiserver.
		kubelet.registerWithAPIServer()

		// Check the unschedulable taint.
		got, ok := gotNode.(*v1.Node)
		require.True(t, ok, "expected a *v1.Node to be created, got %T", gotNode)
		unschedulableTaint := &v1.Taint{
			Key:    algorithm.TaintNodeUnschedulable,
			Effect: v1.TaintEffectNoSchedule,
		}

		require.Equal(t,
			utilfeature.DefaultFeatureGate.Enabled(features.TaintNodesByCondition),
			taintutil.TaintExists(got.Spec.Taints, unschedulableTaint),
			"test unschedulable taint for TaintNodesByCondition")
	})
}