Merge pull request #70344 from andrewsykim/consolidate-node-delete

consolidate node deletion logic between kube-controller-manager and cloud-controller-manager
Kubernetes Prow Robot 2018-12-17 15:49:16 -08:00 committed by GitHub
commit e2be7c91d9
18 changed files with 750 additions and 702 deletions
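For orientation, the end state of this consolidation can be sketched in a few lines: the cloud node controller keeps only the node-status sync loop, while deletion and shutdown tainting move into the new cloud node lifecycle controller. The sketch below follows the constructor signatures visible in the diff, but the informer-factory setup, the package aliases, and both periods are illustrative assumptions rather than values from this PR.

package app

import (
	"time"

	"k8s.io/client-go/informers"
	clientset "k8s.io/client-go/kubernetes"
	cloudprovider "k8s.io/cloud-provider"
	cloudcontrollers "k8s.io/kubernetes/pkg/controller/cloud"
)

// startNodeControllers sketches the post-PR wiring: status sync and node
// lifecycle (delete/taint) now run as two independent controllers.
func startNodeControllers(client clientset.Interface, cloud cloudprovider.Interface, stop <-chan struct{}) error {
	factory := informers.NewSharedInformerFactory(client, 0)

	// Status-only controller; it no longer takes a nodeMonitorPeriod.
	nodeController := cloudcontrollers.NewCloudNodeController(
		factory.Core().V1().Nodes(), client, cloud,
		10*time.Second, // nodeStatusUpdateFrequency (illustrative value)
	)
	go nodeController.Run(stop)

	// Lifecycle controller; construction fails up front if the cloud
	// provider does not implement the Instances interface.
	lifecycleController, err := cloudcontrollers.NewCloudNodeLifecycleController(
		factory.Core().V1().Nodes(), client, cloud,
		5*time.Second, // nodeMonitorPeriod (illustrative value)
	)
	if err != nil {
		return err
	}
	go lifecycleController.Run(stop)

	factory.Start(stop)
	return nil
}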


@ -226,12 +226,23 @@ func startControllers(c *cloudcontrollerconfig.CompletedConfig, stop <-chan stru
nodeController := cloudcontrollers.NewCloudNodeController(
c.SharedInformers.Core().V1().Nodes(),
client("cloud-node-controller"), cloud,
c.ComponentConfig.KubeCloudShared.NodeMonitorPeriod.Duration,
c.ComponentConfig.NodeStatusUpdateFrequency.Duration)
nodeController.Run(stop)
go nodeController.Run(stop)
time.Sleep(wait.Jitter(c.ComponentConfig.Generic.ControllerStartInterval.Duration, ControllerStartJitter))
cloudNodeLifecycleController, err := cloudcontrollers.NewCloudNodeLifecycleController(
c.SharedInformers.Core().V1().Nodes(),
client("cloud-node-lifecycle-controller"), cloud,
c.ComponentConfig.KubeCloudShared.NodeMonitorPeriod.Duration,
)
if err != nil {
klog.Errorf("failed to start cloud node lifecycle controller: %s", err)
} else {
go cloudNodeLifecycleController.Run(stop)
time.Sleep(wait.Jitter(c.ComponentConfig.Generic.ControllerStartInterval.Duration, ControllerStartJitter))
}
// Start the PersistentVolumeLabelController
pvlController := cloudcontrollers.NewPersistentVolumeLabelController(client("pvl-controller"), cloud)
go pvlController.Run(5, stop)
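The time.Sleep(wait.Jitter(...)) calls above stagger controller start-up. As a standalone illustration, wait.Jitter returns the base duration plus a random extra of up to maxFactor times the base, so a jitter factor of 1.0 spreads starts over up to twice the configured interval; the base interval below is an arbitrary example:

package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	base := 100 * time.Millisecond
	for i := 0; i < 3; i++ {
		// Jitter(d, f) returns a duration in [d, d+f*d).
		fmt.Println(wait.Jitter(base, 1.0))
	}
}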


@ -46,6 +46,7 @@ go_library(
"//pkg/controller/certificates/cleaner:go_default_library",
"//pkg/controller/certificates/rootcacertpublisher:go_default_library",
"//pkg/controller/certificates/signer:go_default_library",
"//pkg/controller/cloud:go_default_library",
"//pkg/controller/clusterroleaggregation:go_default_library",
"//pkg/controller/cronjob:go_default_library",
"//pkg/controller/daemon:go_default_library",


@ -383,13 +383,13 @@ func NewControllerInitializers(loopMode ControllerLoopMode) map[string]InitFunc
controllers["bootstrapsigner"] = startBootstrapSignerController
controllers["tokencleaner"] = startTokenCleanerController
controllers["nodeipam"] = startNodeIpamController
controllers["nodelifecycle"] = startNodeLifecycleController
if loopMode == IncludeCloudLoops {
controllers["service"] = startServiceController
controllers["route"] = startRouteController
controllers["cloudnodelifecycle"] = startCloudNodeLifecycleController
// TODO: volume controller into the IncludeCloudLoops only set.
// TODO: Separate cluster in cloud check from node lifecycle controller.
}
controllers["nodelifecycle"] = startNodeLifecycleController
controllers["persistentvolume-binder"] = startPersistentVolumeBinderController
controllers["attachdetach"] = startAttachDetachController
controllers["persistentvolume-expander"] = startVolumeExpandController


@ -38,6 +38,7 @@ import (
clientset "k8s.io/client-go/kubernetes"
csiclientset "k8s.io/csi-api/pkg/client/clientset/versioned"
"k8s.io/kubernetes/pkg/controller"
cloudcontroller "k8s.io/kubernetes/pkg/controller/cloud"
endpointcontroller "k8s.io/kubernetes/pkg/controller/endpoint"
"k8s.io/kubernetes/pkg/controller/garbagecollector"
namespacecontroller "k8s.io/kubernetes/pkg/controller/namespace"
@ -125,7 +126,6 @@ func startNodeLifecycleController(ctx ControllerContext) (http.Handler, bool, er
ctx.InformerFactory.Core().V1().Pods(),
ctx.InformerFactory.Core().V1().Nodes(),
ctx.InformerFactory.Extensions().V1beta1().DaemonSets(),
ctx.Cloud,
ctx.ClientBuilder.ClientOrDie("node-controller"),
ctx.ComponentConfig.KubeCloudShared.NodeMonitorPeriod.Duration,
ctx.ComponentConfig.NodeLifecycleController.NodeStartupGracePeriod.Duration,
@ -146,6 +146,24 @@ func startNodeLifecycleController(ctx ControllerContext) (http.Handler, bool, er
return nil, true, nil
}
func startCloudNodeLifecycleController(ctx ControllerContext) (http.Handler, bool, error) {
cloudNodeLifecycleController, err := cloudcontroller.NewCloudNodeLifecycleController(
ctx.InformerFactory.Core().V1().Nodes(),
ctx.ClientBuilder.ClientOrDie("cloud-node-lifecycle-controller"),
ctx.Cloud,
ctx.ComponentConfig.KubeCloudShared.NodeMonitorPeriod.Duration,
)
if err != nil {
// the controller manager should continue to run if the "Instances" interface is not
// supported, though it's unlikely for a cloud provider to not support it
klog.Errorf("failed to start cloud node lifecycle controller: %v", err)
return nil, false, nil
}
go cloudNodeLifecycleController.Run(ctx.Stop)
return nil, true, nil
}
func startRouteController(ctx ControllerContext) (http.Handler, bool, error) {
if !ctx.ComponentConfig.KubeCloudShared.AllocateNodeCIDRs || !ctx.ComponentConfig.KubeCloudShared.ConfigureCloudRoutes {
klog.Infof("Will not configure cloud provider routes for allocate-node-cidrs: %v, configure-cloud-routes: %v.", ctx.ComponentConfig.KubeCloudShared.AllocateNodeCIDRs, ctx.ComponentConfig.KubeCloudShared.ConfigureCloudRoutes)


@ -55,7 +55,6 @@ go_library(
"//pkg/apis/core:go_default_library",
"//pkg/apis/core/install:go_default_library",
"//pkg/apis/core/validation:go_default_library",
"//pkg/scheduler/api:go_default_library",
"//pkg/serviceaccount:go_default_library",
"//pkg/util/hash:go_default_library",
"//pkg/util/taints:go_default_library",


@ -10,6 +10,7 @@ go_library(
name = "go_default_library",
srcs = [
"node_controller.go",
"node_lifecycle_controller.go",
"pvlcontroller.go",
],
importpath = "k8s.io/kubernetes/pkg/controller/cloud",
@ -17,7 +18,6 @@ go_library(
"//pkg/api/v1/node:go_default_library",
"//pkg/apis/core/v1/helper:go_default_library",
"//pkg/controller:go_default_library",
"//pkg/controller/util/node:go_default_library",
"//pkg/features:go_default_library",
"//pkg/kubelet/apis:go_default_library",
"//pkg/scheduler/api:go_default_library",
@ -26,6 +26,7 @@ go_library(
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
@ -51,6 +52,7 @@ go_test(
name = "go_default_test",
srcs = [
"node_controller_test.go",
"node_lifecycle_controller_test.go",
"pvlcontroller_test.go",
],
embed = [":go_default_library"],
@ -67,10 +69,10 @@ go_test(
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature/testing:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/informers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library",
"//staging/src/k8s.io/client-go/testing:go_default_library",


@ -37,9 +37,6 @@ import (
"k8s.io/client-go/tools/record"
clientretry "k8s.io/client-go/util/retry"
cloudprovider "k8s.io/cloud-provider"
nodeutilv1 "k8s.io/kubernetes/pkg/api/v1/node"
"k8s.io/kubernetes/pkg/controller"
nodectrlutil "k8s.io/kubernetes/pkg/controller/util/node"
kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
nodeutil "k8s.io/kubernetes/pkg/util/node"
@ -58,11 +55,6 @@ type CloudNodeController struct {
cloud cloudprovider.Interface
// Value controlling NodeController monitoring period, i.e. how often does NodeController
// check node status posted from kubelet. This value should be lower than nodeMonitorGracePeriod
// set in controller-manager
nodeMonitorPeriod time.Duration
nodeStatusUpdateFrequency time.Duration
}
@ -79,7 +71,6 @@ func NewCloudNodeController(
nodeInformer coreinformers.NodeInformer,
kubeClient clientset.Interface,
cloud cloudprovider.Interface,
nodeMonitorPeriod time.Duration,
nodeStatusUpdateFrequency time.Duration) *CloudNodeController {
eventBroadcaster := record.NewBroadcaster()
@ -97,7 +88,6 @@ func NewCloudNodeController(
kubeClient: kubeClient,
recorder: recorder,
cloud: cloud,
nodeMonitorPeriod: nodeMonitorPeriod,
nodeStatusUpdateFrequency: nodeStatusUpdateFrequency,
}
@ -111,8 +101,9 @@ func NewCloudNodeController(
return cnc
}
// This controller deletes a node if kubelet is not reporting
// and the node is gone from the cloud provider.
// This controller updates newly registered nodes with information
// from the cloud provider. This call is blocking, so it should be called
// via a goroutine.
func (cnc *CloudNodeController) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
@ -121,10 +112,7 @@ func (cnc *CloudNodeController) Run(stopCh <-chan struct{}) {
// very infrequently. DO NOT MODIFY this to perform frequent operations.
// Start a loop to periodically update the node addresses obtained from the cloud
go wait.Until(cnc.UpdateNodeStatus, cnc.nodeStatusUpdateFrequency, stopCh)
// Start a loop to periodically check if any nodes have been deleted from cloudprovider
go wait.Until(cnc.MonitorNode, cnc.nodeMonitorPeriod, stopCh)
wait.Until(cnc.UpdateNodeStatus, cnc.nodeStatusUpdateFrequency, stopCh)
}
// UpdateNodeStatus updates the node status, such as node addresses
@ -210,108 +198,6 @@ func (cnc *CloudNodeController) updateNodeAddress(node *v1.Node, instances cloud
}
}
// Monitor node queries the cloudprovider for non-ready nodes and deletes them
// if they cannot be found in the cloud provider
func (cnc *CloudNodeController) MonitorNode() {
instances, ok := cnc.cloud.Instances()
if !ok {
utilruntime.HandleError(fmt.Errorf("failed to get instances from cloud provider"))
return
}
nodes, err := cnc.kubeClient.CoreV1().Nodes().List(metav1.ListOptions{ResourceVersion: "0"})
if err != nil {
klog.Errorf("Error monitoring node status: %v", err)
return
}
for i := range nodes.Items {
var currentReadyCondition *v1.NodeCondition
node := &nodes.Items[i]
// Try to get the current node status
// If node status is empty, then kubelet has not posted ready status yet. In this case, process next node
for rep := 0; rep < nodeStatusUpdateRetry; rep++ {
_, currentReadyCondition = nodeutilv1.GetNodeCondition(&node.Status, v1.NodeReady)
if currentReadyCondition != nil {
break
}
name := node.Name
node, err = cnc.kubeClient.CoreV1().Nodes().Get(name, metav1.GetOptions{})
if err != nil {
klog.Errorf("Failed while getting a Node to retry updating NodeStatus. Probably Node %s was deleted.", name)
break
}
time.Sleep(retrySleepTime)
}
if currentReadyCondition == nil {
klog.Errorf("Update status of Node %v from CloudNodeController exceeds retry count or the Node was deleted.", node.Name)
continue
}
// If the known node status says that Node is NotReady, then check if the node has been removed
// from the cloud provider. If node cannot be found in cloudprovider, then delete the node immediately
if currentReadyCondition != nil {
if currentReadyCondition.Status != v1.ConditionTrue {
// we need to check this first to get taint working in similar in all cloudproviders
// current problem is that shutdown nodes are not working in similar way ie. all cloudproviders
// does not delete node from kubernetes cluster when instance it is shutdown see issue #46442
shutdown, err := nodectrlutil.ShutdownInCloudProvider(context.TODO(), cnc.cloud, node)
if err != nil {
klog.Errorf("Error checking if node %s is shutdown: %v", node.Name, err)
}
if shutdown && err == nil {
// if node is shutdown add shutdown taint
err = controller.AddOrUpdateTaintOnNode(cnc.kubeClient, node.Name, controller.ShutdownTaint)
if err != nil {
klog.Errorf("Error patching node taints: %v", err)
}
// Continue checking the remaining nodes since the current one is shutdown.
continue
}
// Check with the cloud provider to see if the node still exists. If it
// doesn't, delete the node immediately.
exists, err := ensureNodeExistsByProviderID(instances, node)
if err != nil {
klog.Errorf("Error checking if node %s exists: %v", node.Name, err)
continue
}
if exists {
// Continue checking the remaining nodes since the current one is fine.
continue
}
klog.V(2).Infof("Deleting node since it is no longer present in cloud provider: %s", node.Name)
ref := &v1.ObjectReference{
Kind: "Node",
Name: node.Name,
UID: types.UID(node.UID),
Namespace: "",
}
klog.V(2).Infof("Recording %s event message for node %s", "DeletingNode", node.Name)
cnc.recorder.Eventf(ref, v1.EventTypeNormal, fmt.Sprintf("Deleting Node %v because it's not present according to cloud provider", node.Name), "Node %s event: %s", node.Name, "DeletingNode")
go func(nodeName string) {
defer utilruntime.HandleCrash()
if err := cnc.kubeClient.CoreV1().Nodes().Delete(nodeName, nil); err != nil {
klog.Errorf("unable to delete node %q: %v", nodeName, err)
}
}(node.Name)
} else {
// if taint exist remove taint
err = controller.RemoveTaintOffNode(cnc.kubeClient, node.Name, node, controller.ShutdownTaint)
if err != nil {
klog.Errorf("Error patching node taints: %v", err)
}
}
}
}
}
func (cnc *CloudNodeController) UpdateCloudNode(_, newObj interface{}) {
if _, ok := newObj.(*v1.Node); !ok {
utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", newObj))


@ -27,7 +27,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
"k8s.io/client-go/tools/record"
cloudprovider "k8s.io/cloud-provider"
@ -164,205 +163,6 @@ func TestEnsureNodeExistsByProviderID(t *testing.T) {
}
func TestNodeShutdown(t *testing.T) {
testCases := []struct {
testName string
node *v1.Node
existsByProviderID bool
shutdown bool
}{
{
testName: "node shutdowned add taint",
existsByProviderID: true,
shutdown: true,
node: &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "node0",
CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
},
Spec: v1.NodeSpec{
ProviderID: "node0",
},
Status: v1.NodeStatus{
Conditions: []v1.NodeCondition{
{
Type: v1.NodeReady,
Status: v1.ConditionUnknown,
LastHeartbeatTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
LastTransitionTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
},
},
},
},
},
{
testName: "node started after shutdown remove taint",
existsByProviderID: true,
shutdown: false,
node: &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "node0",
CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
},
Spec: v1.NodeSpec{
ProviderID: "node0",
Taints: []v1.Taint{
{
Key: schedulerapi.TaintNodeShutdown,
Effect: v1.TaintEffectNoSchedule,
},
},
},
Status: v1.NodeStatus{
Conditions: []v1.NodeCondition{
{
Type: v1.NodeReady,
Status: v1.ConditionTrue,
LastHeartbeatTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
LastTransitionTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
},
},
},
},
},
}
for _, tc := range testCases {
t.Run(tc.testName, func(t *testing.T) {
fc := &fakecloud.FakeCloud{
ExistsByProviderID: tc.existsByProviderID,
NodeShutdown: tc.shutdown,
}
fnh := &testutil.FakeNodeHandler{
Existing: []*v1.Node{tc.node},
Clientset: fake.NewSimpleClientset(),
PatchWaitChan: make(chan struct{}),
}
factory := informers.NewSharedInformerFactory(fnh, controller.NoResyncPeriodFunc())
eventBroadcaster := record.NewBroadcaster()
cloudNodeController := &CloudNodeController{
kubeClient: fnh,
nodeInformer: factory.Core().V1().Nodes(),
cloud: fc,
nodeMonitorPeriod: 1 * time.Second,
recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cloud-node-controller"}),
nodeStatusUpdateFrequency: 1 * time.Second,
}
eventBroadcaster.StartLogging(klog.Infof)
cloudNodeController.Run(wait.NeverStop)
select {
case <-fnh.PatchWaitChan:
case <-time.After(1 * time.Second):
t.Errorf("Timed out waiting %v for node to be updated", wait.ForeverTestTimeout)
}
assert.Equal(t, 1, len(fnh.UpdatedNodes), "Node was not updated")
if tc.shutdown {
assert.Equal(t, 1, len(fnh.UpdatedNodes[0].Spec.Taints), "Node Taint was not added")
assert.Equal(t, "node.cloudprovider.kubernetes.io/shutdown", fnh.UpdatedNodes[0].Spec.Taints[0].Key, "Node Taint key is not correct")
} else {
assert.Equal(t, 0, len(fnh.UpdatedNodes[0].Spec.Taints), "Node Taint was not removed after node is back in ready state")
}
})
}
}
// This test checks that the node is deleted when kubelet stops reporting
// and cloud provider says node is gone
func TestNodeDeleted(t *testing.T) {
pod0 := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "pod0",
},
Spec: v1.PodSpec{
NodeName: "node0",
},
Status: v1.PodStatus{
Conditions: []v1.PodCondition{
{
Type: v1.PodReady,
Status: v1.ConditionTrue,
},
},
},
}
pod1 := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "pod1",
},
Spec: v1.PodSpec{
NodeName: "node0",
},
Status: v1.PodStatus{
Conditions: []v1.PodCondition{
{
Type: v1.PodReady,
Status: v1.ConditionTrue,
},
},
},
}
fnh := &testutil.FakeNodeHandler{
Existing: []*v1.Node{
{
ObjectMeta: metav1.ObjectMeta{
Name: "node0",
CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
},
Status: v1.NodeStatus{
Conditions: []v1.NodeCondition{
{
Type: v1.NodeReady,
Status: v1.ConditionUnknown,
LastHeartbeatTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
LastTransitionTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
},
},
},
},
},
Clientset: fake.NewSimpleClientset(&v1.PodList{Items: []v1.Pod{*pod0, *pod1}}),
DeleteWaitChan: make(chan struct{}),
}
factory := informers.NewSharedInformerFactory(fnh, controller.NoResyncPeriodFunc())
eventBroadcaster := record.NewBroadcaster()
cloudNodeController := &CloudNodeController{
kubeClient: fnh,
nodeInformer: factory.Core().V1().Nodes(),
cloud: &fakecloud.FakeCloud{
ExistsByProviderID: false,
Err: nil,
},
nodeMonitorPeriod: 1 * time.Second,
recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cloud-node-controller"}),
nodeStatusUpdateFrequency: 1 * time.Second,
}
eventBroadcaster.StartLogging(klog.Infof)
cloudNodeController.Run(wait.NeverStop)
select {
case <-fnh.DeleteWaitChan:
case <-time.After(wait.ForeverTestTimeout):
t.Errorf("Timed out waiting %v for node to be deleted", wait.ForeverTestTimeout)
}
assert.Equal(t, 1, len(fnh.DeletedNodes), "Node was not deleted")
assert.Equal(t, "node0", fnh.DeletedNodes[0].Name, "Node was not deleted")
}
// This test checks that a node with the external cloud provider taint is cloudprovider initialized
func TestNodeInitialized(t *testing.T) {
fnh := &testutil.FakeNodeHandler{
@ -425,7 +225,6 @@ func TestNodeInitialized(t *testing.T) {
kubeClient: fnh,
nodeInformer: factory.Core().V1().Nodes(),
cloud: fakeCloud,
nodeMonitorPeriod: 1 * time.Second,
recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cloud-node-controller"}),
nodeStatusUpdateFrequency: 1 * time.Second,
}
@ -491,7 +290,6 @@ func TestNodeIgnored(t *testing.T) {
kubeClient: fnh,
nodeInformer: factory.Core().V1().Nodes(),
cloud: fakeCloud,
nodeMonitorPeriod: 5 * time.Second,
recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cloud-node-controller"}),
}
eventBroadcaster.StartLogging(klog.Infof)
@ -565,7 +363,6 @@ func TestGCECondition(t *testing.T) {
kubeClient: fnh,
nodeInformer: factory.Core().V1().Nodes(),
cloud: fakeCloud,
nodeMonitorPeriod: 1 * time.Second,
recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cloud-node-controller"}),
}
eventBroadcaster.StartLogging(klog.Infof)
@ -655,7 +452,6 @@ func TestZoneInitialized(t *testing.T) {
kubeClient: fnh,
nodeInformer: factory.Core().V1().Nodes(),
cloud: fakeCloud,
nodeMonitorPeriod: 5 * time.Second,
recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cloud-node-controller"}),
}
eventBroadcaster.StartLogging(klog.Infof)
@ -745,7 +541,6 @@ func TestNodeAddresses(t *testing.T) {
kubeClient: fnh,
nodeInformer: factory.Core().V1().Nodes(),
cloud: fakeCloud,
nodeMonitorPeriod: 5 * time.Second,
nodeStatusUpdateFrequency: 1 * time.Second,
recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cloud-node-controller"}),
}
@ -768,9 +563,7 @@ func TestNodeAddresses(t *testing.T) {
},
}
cloudNodeController.Run(wait.NeverStop)
<-time.After(2 * time.Second)
cloudNodeController.UpdateNodeStatus()
updatedNodes := fnh.GetUpdatedNodesCopy()
@ -860,7 +653,6 @@ func TestNodeProvidedIPAddresses(t *testing.T) {
kubeClient: fnh,
nodeInformer: factory.Core().V1().Nodes(),
cloud: fakeCloud,
nodeMonitorPeriod: 5 * time.Second,
nodeStatusUpdateFrequency: 1 * time.Second,
recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cloud-node-controller"}),
}
@ -872,9 +664,7 @@ func TestNodeProvidedIPAddresses(t *testing.T) {
assert.Equal(t, "node0", fnh.UpdatedNodes[0].Name, "Node was not updated")
assert.Equal(t, 3, len(fnh.UpdatedNodes[0].Status.Addresses), "Node status unexpectedly updated")
cloudNodeController.Run(wait.NeverStop)
<-time.After(2 * time.Second)
cloudNodeController.UpdateNodeStatus()
updatedNodes := fnh.GetUpdatedNodesCopy()
@ -1152,7 +942,6 @@ func TestNodeProviderID(t *testing.T) {
kubeClient: fnh,
nodeInformer: factory.Core().V1().Nodes(),
cloud: fakeCloud,
nodeMonitorPeriod: 5 * time.Second,
nodeStatusUpdateFrequency: 1 * time.Second,
recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cloud-node-controller"}),
}
@ -1236,7 +1025,6 @@ func TestNodeProviderIDAlreadySet(t *testing.T) {
kubeClient: fnh,
nodeInformer: factory.Core().V1().Nodes(),
cloud: fakeCloud,
nodeMonitorPeriod: 5 * time.Second,
nodeStatusUpdateFrequency: 1 * time.Second,
recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cloud-node-controller"}),
}


@ -0,0 +1,224 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cloud
import (
"context"
"errors"
"fmt"
"time"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
coreinformers "k8s.io/client-go/informers/core/v1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
v1lister "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/record"
cloudprovider "k8s.io/cloud-provider"
"k8s.io/klog"
nodeutilv1 "k8s.io/kubernetes/pkg/api/v1/node"
"k8s.io/kubernetes/pkg/controller"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
)
const (
deleteNodeEvent = "DeletingNode"
)
var ShutdownTaint = &v1.Taint{
Key: schedulerapi.TaintNodeShutdown,
Effect: v1.TaintEffectNoSchedule,
}
// CloudNodeLifecycleController is responsible for deleting/updating Kubernetes
// nodes that have been deleted or shut down on the cloud provider
type CloudNodeLifecycleController struct {
kubeClient clientset.Interface
nodeLister v1lister.NodeLister
recorder record.EventRecorder
cloud cloudprovider.Interface
// Value controlling NodeController monitoring period, i.e. how often does NodeController
// check node status posted from kubelet. This value should be lower than nodeMonitorGracePeriod
// set in controller-manager
nodeMonitorPeriod time.Duration
}
func NewCloudNodeLifecycleController(
nodeInformer coreinformers.NodeInformer,
kubeClient clientset.Interface,
cloud cloudprovider.Interface,
nodeMonitorPeriod time.Duration) (*CloudNodeLifecycleController, error) {
eventBroadcaster := record.NewBroadcaster()
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cloud-node-lifecycle-controller"})
eventBroadcaster.StartLogging(klog.Infof)
klog.Info("Sending events to api server")
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
if kubeClient == nil {
return nil, errors.New("kubernetes client is nil")
}
if cloud == nil {
return nil, errors.New("no cloud provider provided")
}
if _, ok := cloud.Instances(); !ok {
return nil, errors.New("cloud provider does not support instances")
}
c := &CloudNodeLifecycleController{
kubeClient: kubeClient,
nodeLister: nodeInformer.Lister(),
recorder: recorder,
cloud: cloud,
nodeMonitorPeriod: nodeMonitorPeriod,
}
return c, nil
}
// Run starts the main loop for this controller. Run is blocking, so it should
// be called via a goroutine.
func (c *CloudNodeLifecycleController) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
// The following loops communicate with the API server with a worst-case complexity
// of O(num_nodes) per cycle. These functions are justified here because these events fire
// very infrequently. DO NOT MODIFY this to perform frequent operations.
// Start a loop to periodically check if any nodes have been
// deleted or shutdown from the cloudprovider
wait.Until(c.MonitorNodes, c.nodeMonitorPeriod, stopCh)
}
// MonitorNodes checks to see if nodes in the cluster have been deleted
// or shut down. If deleted, it deletes the node resource; if shut down,
// it applies a shutdown taint to the node
func (c *CloudNodeLifecycleController) MonitorNodes() {
instances, ok := c.cloud.Instances()
if !ok {
utilruntime.HandleError(fmt.Errorf("failed to get instances from cloud provider"))
return
}
nodes, err := c.nodeLister.List(labels.Everything())
if err != nil {
klog.Errorf("error listing nodes from cache: %s", err)
return
}
for _, node := range nodes {
// Default NodeReady status to v1.ConditionUnknown
status := v1.ConditionUnknown
if _, c := nodeutilv1.GetNodeCondition(&node.Status, v1.NodeReady); c != nil {
status = c.Status
}
if status == v1.ConditionTrue {
// if taint exist remove taint
err = controller.RemoveTaintOffNode(c.kubeClient, node.Name, node, ShutdownTaint)
if err != nil {
klog.Errorf("error patching node taints: %v", err)
}
continue
}
// node condition is unknown so we should not try to update it
if status == v1.ConditionUnknown {
continue
}
// status for NodeReady should be false at this point, this check
// is here as a fail safe in the case a new node condition is added
// without consideration of this controller
if status != v1.ConditionFalse {
continue
}
// we need to check shutdown state first so the shutdown taint behaves consistently
// across cloud providers; the current problem is that providers do not handle shutdown
// nodes uniformly, i.e. not all of them delete the node from the cluster when its instance is shut down (see issue #46442)
shutdown, err := shutdownInCloudProvider(context.TODO(), c.cloud, node)
if err != nil {
klog.Errorf("error checking if node %s is shutdown: %v", node.Name, err)
}
if shutdown && err == nil {
// if node is shutdown add shutdown taint
err = controller.AddOrUpdateTaintOnNode(c.kubeClient, node.Name, ShutdownTaint)
if err != nil {
klog.Errorf("failed to apply shutdown taint to node %s, it may have been deleted.", node.Name)
}
// Continue checking the remaining nodes since the current one is shutdown.
continue
}
// At this point the node has NotReady status; we need to check whether it has been
// removed from the cloud provider. If the node cannot be found there, delete it
exists, err := ensureNodeExistsByProviderID(instances, node)
if err != nil {
klog.Errorf("error checking if node %s exists: %v", node.Name, err)
continue
}
if exists {
// Continue checking the remaining nodes since the current one is fine.
continue
}
klog.V(2).Infof("deleting node since it is no longer present in cloud provider: %s", node.Name)
ref := &v1.ObjectReference{
Kind: "Node",
Name: node.Name,
UID: types.UID(node.UID),
Namespace: "",
}
c.recorder.Eventf(ref, v1.EventTypeNormal,
fmt.Sprintf("Deleting node %v because it does not exist in the cloud provider", node.Name),
"Node %s event: %s", node.Name, deleteNodeEvent)
if err := c.kubeClient.CoreV1().Nodes().Delete(node.Name, nil); err != nil {
klog.Errorf("unable to delete node %q: %v", node.Name, err)
}
}
}
// shutdownInCloudProvider returns true if the node is shut down on the cloud provider
func shutdownInCloudProvider(ctx context.Context, cloud cloudprovider.Interface, node *v1.Node) (bool, error) {
instances, ok := cloud.Instances()
if !ok {
return false, errors.New("cloud provider does not support instances")
}
shutdown, err := instances.InstanceShutdownByProviderID(ctx, node.Spec.ProviderID)
if err == cloudprovider.NotImplemented {
return false, nil
}
return shutdown, err
}
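To summarize the branching in MonitorNodes above: the per-node outcome depends only on the NodeReady status and on the cloud provider's view of the instance. The helper below is a hypothetical restatement of that decision table, not code from the PR:

package main

import "fmt"

// decide is a hypothetical summary of MonitorNodes' per-node behavior.
func decide(ready string, shutdown, exists bool) string {
	switch ready {
	case "True":
		return "remove shutdown taint, if present"
	case "Unknown":
		return "skip: kubelet state is unknown"
	}
	if ready != "False" {
		return "skip: fail-safe for any new/unrecognized condition value"
	}
	// ready == "False": consult the cloud provider.
	if shutdown {
		return "add shutdown taint"
	}
	if !exists {
		return "delete the node object"
	}
	return "skip: instance still exists and is running"
}

func main() {
	fmt.Println(decide("False", false, false)) // delete the node object
	fmt.Println(decide("False", true, true))   // add shutdown taint
	fmt.Println(decide("True", false, true))   // remove shutdown taint, if present
}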


@ -0,0 +1,453 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cloud
import (
"errors"
"reflect"
"testing"
"time"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/informers"
coreinformers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/record"
"k8s.io/klog"
fakecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/fake"
"k8s.io/kubernetes/pkg/controller/testutil"
)
func Test_NodesDeleted(t *testing.T) {
testcases := []struct {
name string
fnh *testutil.FakeNodeHandler
fakeCloud *fakecloud.FakeCloud
deleteNodes []*v1.Node
}{
{
name: "node is not ready and does not exist",
fnh: &testutil.FakeNodeHandler{
Existing: []*v1.Node{
{
ObjectMeta: metav1.ObjectMeta{
Name: "node0",
CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
},
Status: v1.NodeStatus{
Conditions: []v1.NodeCondition{
{
Type: v1.NodeReady,
Status: v1.ConditionFalse,
LastHeartbeatTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
LastTransitionTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
},
},
},
},
},
DeletedNodes: []*v1.Node{},
Clientset: fake.NewSimpleClientset(),
},
fakeCloud: &fakecloud.FakeCloud{
ExistsByProviderID: false,
},
deleteNodes: []*v1.Node{
testutil.NewNode("node0"),
},
},
{
name: "node is not ready and provider returns err",
fnh: &testutil.FakeNodeHandler{
Existing: []*v1.Node{
{
ObjectMeta: metav1.ObjectMeta{
Name: "node0",
CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
},
Spec: v1.NodeSpec{
ProviderID: "node0",
},
Status: v1.NodeStatus{
Conditions: []v1.NodeCondition{
{
Type: v1.NodeReady,
Status: v1.ConditionFalse,
LastHeartbeatTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
LastTransitionTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
},
},
},
},
},
DeletedNodes: []*v1.Node{},
Clientset: fake.NewSimpleClientset(),
},
fakeCloud: &fakecloud.FakeCloud{
ExistsByProviderID: false,
ErrByProviderID: errors.New("err!"),
},
deleteNodes: []*v1.Node{},
},
{
name: "node is not ready but still exists",
fnh: &testutil.FakeNodeHandler{
Existing: []*v1.Node{
{
ObjectMeta: metav1.ObjectMeta{
Name: "node0",
CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
},
Spec: v1.NodeSpec{
ProviderID: "node0",
},
Status: v1.NodeStatus{
Conditions: []v1.NodeCondition{
{
Type: v1.NodeReady,
Status: v1.ConditionFalse,
LastHeartbeatTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
LastTransitionTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
},
},
},
},
},
DeletedNodes: []*v1.Node{},
Clientset: fake.NewSimpleClientset(),
},
fakeCloud: &fakecloud.FakeCloud{
ExistsByProviderID: true,
},
deleteNodes: []*v1.Node{},
},
{
name: "node ready condition is unknown, node doesn't exist",
fnh: &testutil.FakeNodeHandler{
Existing: []*v1.Node{
{
ObjectMeta: metav1.ObjectMeta{
Name: "node0",
CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
},
Status: v1.NodeStatus{
Conditions: []v1.NodeCondition{
{
Type: v1.NodeReady,
Status: v1.ConditionUnknown,
LastHeartbeatTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
LastTransitionTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
},
},
},
},
},
DeletedNodes: []*v1.Node{},
Clientset: fake.NewSimpleClientset(),
},
fakeCloud: &fakecloud.FakeCloud{
ExistsByProviderID: false,
},
deleteNodes: []*v1.Node{},
},
{
name: "node ready condition is unknown, node exists",
fnh: &testutil.FakeNodeHandler{
Existing: []*v1.Node{
{
ObjectMeta: metav1.ObjectMeta{
Name: "node0",
CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
},
Status: v1.NodeStatus{
Conditions: []v1.NodeCondition{
{
Type: v1.NodeReady,
Status: v1.ConditionUnknown,
LastHeartbeatTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
LastTransitionTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
},
},
},
},
},
DeletedNodes: []*v1.Node{},
Clientset: fake.NewSimpleClientset(),
},
fakeCloud: &fakecloud.FakeCloud{
ExistsByProviderID: true,
},
deleteNodes: []*v1.Node{},
},
{
name: "node is ready, but provider said it is deleted (maybe a bug in provider)",
fnh: &testutil.FakeNodeHandler{
Existing: []*v1.Node{
{
ObjectMeta: metav1.ObjectMeta{
Name: "node0",
CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
},
Spec: v1.NodeSpec{
ProviderID: "node0",
},
Status: v1.NodeStatus{
Conditions: []v1.NodeCondition{
{
Type: v1.NodeReady,
Status: v1.ConditionTrue,
LastHeartbeatTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
LastTransitionTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
},
},
},
},
},
DeletedNodes: []*v1.Node{},
Clientset: fake.NewSimpleClientset(),
},
fakeCloud: &fakecloud.FakeCloud{
ExistsByProviderID: false,
},
deleteNodes: []*v1.Node{},
},
}
for _, testcase := range testcases {
t.Run(testcase.name, func(t *testing.T) {
informer := informers.NewSharedInformerFactory(testcase.fnh.Clientset, time.Second)
nodeInformer := informer.Core().V1().Nodes()
if err := syncNodeStore(nodeInformer, testcase.fnh); err != nil {
t.Errorf("unexpected error: %v", err)
}
eventBroadcaster := record.NewBroadcaster()
cloudNodeLifecycleController := &CloudNodeLifecycleController{
nodeLister: nodeInformer.Lister(),
kubeClient: testcase.fnh,
cloud: testcase.fakeCloud,
recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cloud-node-lifecycle-controller"}),
nodeMonitorPeriod: 1 * time.Second,
}
eventBroadcaster.StartLogging(klog.Infof)
cloudNodeLifecycleController.MonitorNodes()
if !reflect.DeepEqual(testcase.fnh.DeletedNodes, testcase.deleteNodes) {
t.Logf("actual nodes: %v", testcase.fnh.DeletedNodes)
t.Logf("expected nodes: %v", testcase.deleteNodes)
t.Error("unexpected deleted nodes")
}
})
}
}
func Test_NodesShutdown(t *testing.T) {
testcases := []struct {
name string
fnh *testutil.FakeNodeHandler
fakeCloud *fakecloud.FakeCloud
updatedNodes []*v1.Node
}{
{
name: "node is not ready and was shutdown",
fnh: &testutil.FakeNodeHandler{
Existing: []*v1.Node{
{
ObjectMeta: metav1.ObjectMeta{
Name: "node0",
CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.Local),
},
Status: v1.NodeStatus{
Conditions: []v1.NodeCondition{
{
Type: v1.NodeReady,
Status: v1.ConditionFalse,
LastHeartbeatTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.Local),
LastTransitionTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.Local),
},
},
},
},
},
UpdatedNodes: []*v1.Node{},
Clientset: fake.NewSimpleClientset(),
},
fakeCloud: &fakecloud.FakeCloud{
NodeShutdown: true,
ErrShutdownByProviderID: nil,
},
updatedNodes: []*v1.Node{
{
ObjectMeta: metav1.ObjectMeta{
Name: "node0",
CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.Local),
},
Spec: v1.NodeSpec{
Taints: []v1.Taint{
*ShutdownTaint,
},
},
Status: v1.NodeStatus{
Conditions: []v1.NodeCondition{
{
Type: v1.NodeReady,
Status: v1.ConditionFalse,
LastHeartbeatTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.Local),
LastTransitionTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.Local),
},
},
},
},
},
},
{
name: "node is not ready, but there is error checking if node is shutdown",
fnh: &testutil.FakeNodeHandler{
Existing: []*v1.Node{
{
ObjectMeta: metav1.ObjectMeta{
Name: "node0",
CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.Local),
},
Status: v1.NodeStatus{
Conditions: []v1.NodeCondition{
{
Type: v1.NodeReady,
Status: v1.ConditionFalse,
LastHeartbeatTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.Local),
LastTransitionTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.Local),
},
},
},
},
},
UpdatedNodes: []*v1.Node{},
Clientset: fake.NewSimpleClientset(),
},
fakeCloud: &fakecloud.FakeCloud{
NodeShutdown: false,
ErrShutdownByProviderID: errors.New("err!"),
},
updatedNodes: []*v1.Node{},
},
{
name: "node is not ready and is not shutdown",
fnh: &testutil.FakeNodeHandler{
Existing: []*v1.Node{
{
ObjectMeta: metav1.ObjectMeta{
Name: "node0",
CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.Local),
},
Status: v1.NodeStatus{
Conditions: []v1.NodeCondition{
{
Type: v1.NodeReady,
Status: v1.ConditionFalse,
LastHeartbeatTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.Local),
LastTransitionTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.Local),
},
},
},
},
},
UpdatedNodes: []*v1.Node{},
Clientset: fake.NewSimpleClientset(),
},
fakeCloud: &fakecloud.FakeCloud{
NodeShutdown: false,
ErrShutdownByProviderID: nil,
},
updatedNodes: []*v1.Node{},
},
{
name: "node is ready but provider says it's shutdown (maybe a bug by provider)",
fnh: &testutil.FakeNodeHandler{
Existing: []*v1.Node{
{
ObjectMeta: metav1.ObjectMeta{
Name: "node0",
CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.Local),
},
Status: v1.NodeStatus{
Conditions: []v1.NodeCondition{
{
Type: v1.NodeReady,
Status: v1.ConditionTrue,
LastHeartbeatTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.Local),
LastTransitionTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.Local),
},
},
},
},
},
UpdatedNodes: []*v1.Node{},
Clientset: fake.NewSimpleClientset(),
},
fakeCloud: &fakecloud.FakeCloud{
NodeShutdown: true,
ErrShutdownByProviderID: nil,
},
updatedNodes: []*v1.Node{},
},
}
for _, testcase := range testcases {
t.Run(testcase.name, func(t *testing.T) {
informer := informers.NewSharedInformerFactory(testcase.fnh.Clientset, time.Second)
nodeInformer := informer.Core().V1().Nodes()
if err := syncNodeStore(nodeInformer, testcase.fnh); err != nil {
t.Errorf("unexpected error: %v", err)
}
eventBroadcaster := record.NewBroadcaster()
cloudNodeLifecycleController := &CloudNodeLifecycleController{
nodeLister: nodeInformer.Lister(),
kubeClient: testcase.fnh,
cloud: testcase.fakeCloud,
recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cloud-node-lifecycle-controller"}),
nodeMonitorPeriod: 1 * time.Second,
}
eventBroadcaster.StartLogging(klog.Infof)
cloudNodeLifecycleController.MonitorNodes()
if !reflect.DeepEqual(testcase.fnh.UpdatedNodes, testcase.updatedNodes) {
t.Logf("actual nodes: %v", testcase.fnh.UpdatedNodes)
t.Logf("expected nodes: %v", testcase.updatedNodes)
t.Error("unexpected updated nodes")
}
})
}
}
func syncNodeStore(nodeinformer coreinformers.NodeInformer, f *testutil.FakeNodeHandler) error {
nodes, err := f.List(metav1.ListOptions{})
if err != nil {
return err
}
newElems := make([]interface{}, 0, len(nodes.Items))
for i := range nodes.Items {
newElems = append(newElems, &nodes.Items[i])
}
return nodeinformer.Informer().GetStore().Replace(newElems, "newRV")
}
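Note that these tests never start the shared informer: syncNodeStore above primes the informer's local store directly, so nodeLister.List sees the fake nodes synchronously before MonitorNodes runs. A minimal sketch of that priming pattern against a fake clientset, using the pre-context client-go call style seen in the diff (the node name is illustrative):

package main

import (
	"fmt"
	"time"

	"k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes/fake"
)

func main() {
	client := fake.NewSimpleClientset(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node0"}})
	factory := informers.NewSharedInformerFactory(client, time.Second)
	nodeInformer := factory.Core().V1().Nodes()

	// Copy the fake client's nodes into the informer store by hand,
	// instead of starting the informer and waiting for a cache sync.
	nodes, _ := client.CoreV1().Nodes().List(metav1.ListOptions{})
	objs := make([]interface{}, 0, len(nodes.Items))
	for i := range nodes.Items {
		objs = append(objs, &nodes.Items[i])
	}
	_ = nodeInformer.Informer().GetStore().Replace(objs, "test-rv")

	listed, _ := nodeInformer.Lister().List(labels.Everything())
	fmt.Println(len(listed)) // 1: the lister reads from the primed store
}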


@ -25,7 +25,6 @@ import (
"k8s.io/klog"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
@ -35,18 +34,16 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
utilfeature "k8s.io/apiserver/pkg/util/feature"
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
"k8s.io/kubernetes/pkg/features"
kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis"
volumeutil "k8s.io/kubernetes/pkg/volume/util"
"k8s.io/client-go/kubernetes"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
cloudprovider "k8s.io/cloud-provider"
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/features"
kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis"
volumeutil "k8s.io/kubernetes/pkg/volume/util"
)
const initializerName = "pvlabel.kubernetes.io"


@ -47,7 +47,6 @@ import (
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
_ "k8s.io/kubernetes/pkg/apis/core/install"
"k8s.io/kubernetes/pkg/apis/core/validation"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
hashutil "k8s.io/kubernetes/pkg/util/hash"
taintutils "k8s.io/kubernetes/pkg/util/taints"
@ -89,11 +88,6 @@ var UpdateTaintBackoff = wait.Backoff{
Jitter: 1.0,
}
var ShutdownTaint = &v1.Taint{
Key: schedulerapi.TaintNodeShutdown,
Effect: v1.TaintEffectNoSchedule,
}
var (
KeyFunc = cache.DeletionHandlingMetaNamespaceKeyFunc
)


@ -25,7 +25,6 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
@ -42,7 +41,6 @@ go_library(
"//staging/src/k8s.io/client-go/tools/record:go_default_library",
"//staging/src/k8s.io/client-go/util/flowcontrol:go_default_library",
"//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
"//staging/src/k8s.io/cloud-provider:go_default_library",
"//vendor/github.com/prometheus/client_golang/prometheus:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],
@ -70,7 +68,6 @@ go_test(
srcs = ["node_lifecycle_controller_test.go"],
embed = [":go_default_library"],
deps = [
"//pkg/cloudprovider/providers/fake:go_default_library",
"//pkg/controller:go_default_library",
"//pkg/controller/nodelifecycle/scheduler:go_default_library",
"//pkg/controller/testutil:go_default_library",
@ -86,9 +83,7 @@ go_test(
"//staging/src/k8s.io/apimachinery/pkg/api/equality:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature/testing:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
@ -98,7 +93,6 @@ go_test(
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
"//staging/src/k8s.io/client-go/testing:go_default_library",
"//staging/src/k8s.io/cloud-provider:go_default_library",
"//vendor/k8s.io/utils/pointer:go_default_library",
],
)


@ -22,7 +22,6 @@ limitations under the License.
package nodelifecycle
import (
"context"
"fmt"
"hash/fnv"
"io"
@ -37,7 +36,6 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
@ -54,7 +52,6 @@ import (
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/flowcontrol"
"k8s.io/client-go/util/workqueue"
cloudprovider "k8s.io/cloud-provider"
v1node "k8s.io/kubernetes/pkg/api/v1/node"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/nodelifecycle/scheduler"
@ -150,7 +147,6 @@ type Controller struct {
taintManager *scheduler.NoExecuteTaintManager
podInformerSynced cache.InformerSynced
cloud cloudprovider.Interface
kubeClient clientset.Interface
// This timestamp is to be used instead of LastProbeTime stored in Condition. We do this
@ -183,8 +179,6 @@ type Controller struct {
leaseInformerSynced cache.InformerSynced
nodeLister corelisters.NodeLister
nodeInformerSynced cache.InformerSynced
nodeExistsInCloudProvider func(types.NodeName) (bool, error)
nodeShutdownInCloudProvider func(context.Context, *v1.Node) (bool, error)
recorder record.EventRecorder
@ -247,7 +241,6 @@ func NewNodeLifecycleController(
podInformer coreinformers.PodInformer,
nodeInformer coreinformers.NodeInformer,
daemonSetInformer extensionsinformers.DaemonSetInformer,
cloud cloudprovider.Interface,
kubeClient clientset.Interface,
nodeMonitorPeriod time.Duration,
nodeStartupGracePeriod time.Duration,
@ -280,17 +273,10 @@ func NewNodeLifecycleController(
}
nc := &Controller{
cloud: cloud,
kubeClient: kubeClient,
now: metav1.Now,
knownNodeSet: make(map[string]*v1.Node),
nodeHealthMap: make(map[string]*nodeHealthData),
nodeExistsInCloudProvider: func(nodeName types.NodeName) (bool, error) {
return nodeutil.ExistsInCloudProvider(cloud, nodeName)
},
nodeShutdownInCloudProvider: func(ctx context.Context, node *v1.Node) (bool, error) {
return nodeutil.ShutdownInCloudProvider(ctx, cloud, node)
},
recorder: recorder,
nodeMonitorPeriod: nodeMonitorPeriod,
nodeStartupGracePeriod: nodeStartupGracePeriod,
@ -779,11 +765,6 @@ func (nc *Controller) monitorNodeHealth() error {
klog.V(2).Infof("Node %s is ready again, cancelled pod eviction", node.Name)
}
}
// remove shutdown taint this is needed always depending do we use taintbased or not
err := nc.markNodeAsNotShutdown(node)
if err != nil {
klog.Errorf("Failed to remove taints from node %v. Will retry in next iteration.", node.Name)
}
}
// Report node event.
@ -793,42 +774,6 @@ func (nc *Controller) monitorNodeHealth() error {
utilruntime.HandleError(fmt.Errorf("Unable to mark all pods NotReady on node %v: %v", node.Name, err))
}
}
// Check with the cloud provider to see if the node still exists. If it
// doesn't, delete the node immediately.
if currentReadyCondition.Status != v1.ConditionTrue && nc.cloud != nil {
// check is node shutdowned, if yes do not deleted it. Instead add taint
shutdown, err := nc.nodeShutdownInCloudProvider(context.TODO(), node)
if err != nil {
klog.Errorf("Error determining if node %v shutdown in cloud: %v", node.Name, err)
}
// node shutdown
if shutdown && err == nil {
err = controller.AddOrUpdateTaintOnNode(nc.kubeClient, node.Name, controller.ShutdownTaint)
if err != nil {
klog.Errorf("Error patching node taints: %v", err)
}
continue
}
exists, err := nc.nodeExistsInCloudProvider(types.NodeName(node.Name))
if err != nil {
klog.Errorf("Error determining if node %v exists in cloud: %v", node.Name, err)
continue
}
if !exists {
klog.V(2).Infof("Deleting node (no longer present in cloud provider): %s", node.Name)
nodeutil.RecordNodeEvent(nc.recorder, node.Name, string(node.UID), v1.EventTypeNormal, "DeletingNode", fmt.Sprintf("Deleting Node %v because it's not present according to cloud provider", node.Name))
go func(nodeName string) {
defer utilruntime.HandleCrash()
// Kubelet is not reporting and Cloud Provider says node
// is gone. Delete it without worrying about grace
// periods.
if err := nodeutil.ForcefullyDeleteNode(nc.kubeClient, nodeName); err != nil {
klog.Errorf("Unable to forcefully delete node %q: %v", nodeName, err)
}
}(node.Name)
}
}
}
}
nc.handleDisruption(zoneToNodeConditions, nodes)
@ -1268,17 +1213,6 @@ func (nc *Controller) markNodeAsReachable(node *v1.Node) (bool, error) {
return nc.zoneNoExecuteTainter[utilnode.GetZoneKey(node)].Remove(node.Name), nil
}
func (nc *Controller) markNodeAsNotShutdown(node *v1.Node) error {
nc.evictorLock.Lock()
defer nc.evictorLock.Unlock()
err := controller.RemoveTaintOffNode(nc.kubeClient, node.Name, node, controller.ShutdownTaint)
if err != nil {
klog.Errorf("Failed to remove taint from node %v: %v", node.Name, err)
return err
}
return nil
}
// ComputeZoneState returns a slice of NodeReadyConditions for all Nodes in a given zone.
// The zone is considered:
// - fullyDisrupted if there're no Ready Nodes,


@ -17,7 +17,6 @@ limitations under the License.
package nodelifecycle
import (
"context"
"strings"
"testing"
"time"
@ -28,9 +27,7 @@ import (
apiequality "k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/diff"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
utilfeaturetesting "k8s.io/apiserver/pkg/util/feature/testing"
"k8s.io/client-go/informers"
@ -40,8 +37,6 @@ import (
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
testcore "k8s.io/client-go/testing"
cloudprovider "k8s.io/cloud-provider"
fakecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/fake"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/nodelifecycle/scheduler"
"k8s.io/kubernetes/pkg/controller/testutil"
@ -128,7 +123,6 @@ func (nc *nodeLifecycleController) syncNodeStore(fakeNodeHandler *testutil.FakeN
}
func newNodeLifecycleControllerFromClient(
cloud cloudprovider.Interface,
kubeClient clientset.Interface,
podEvictionTimeout time.Duration,
evictionLimiterQPS float32,
@ -152,7 +146,6 @@ func newNodeLifecycleControllerFromClient(
factory.Core().V1().Pods(),
nodeInformer,
daemonSetInformer,
cloud,
kubeClient,
nodeMonitorPeriod,
nodeStartupGracePeriod,
@ -641,7 +634,6 @@ func TestMonitorNodeHealthEvictPods(t *testing.T) {
for _, item := range table {
nodeController, _ := newNodeLifecycleControllerFromClient(
nil,
item.fakeNodeHandler,
evictionTimeout,
testRateLimiterQPS,
@ -802,7 +794,6 @@ func TestPodStatusChange(t *testing.T) {
for _, item := range table {
nodeController, _ := newNodeLifecycleControllerFromClient(
nil,
item.fakeNodeHandler,
evictionTimeout,
testRateLimiterQPS,
@ -1327,7 +1318,6 @@ func TestMonitorNodeHealthEvictPodsWithDisruption(t *testing.T) {
Clientset: fake.NewSimpleClientset(&v1.PodList{Items: item.podList}),
}
nodeController, _ := newNodeLifecycleControllerFromClient(
nil,
fakeNodeHandler,
evictionTimeout,
testRateLimiterQPS,
@ -1392,185 +1382,6 @@ func TestMonitorNodeHealthEvictPodsWithDisruption(t *testing.T) {
}
}
func TestCloudProviderNodeShutdown(t *testing.T) {
testCases := []struct {
testName string
node *v1.Node
shutdown bool
}{
{
testName: "node shutdowned add taint",
shutdown: true,
node: &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "node0",
CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
},
Spec: v1.NodeSpec{
ProviderID: "node0",
},
Status: v1.NodeStatus{
Conditions: []v1.NodeCondition{
{
Type: v1.NodeReady,
Status: v1.ConditionUnknown,
LastHeartbeatTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
LastTransitionTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
},
},
},
},
},
{
testName: "node started after shutdown remove taint",
shutdown: false,
node: &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "node0",
CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
},
Spec: v1.NodeSpec{
ProviderID: "node0",
Taints: []v1.Taint{
{
Key: schedulerapi.TaintNodeShutdown,
Effect: v1.TaintEffectNoSchedule,
},
},
},
Status: v1.NodeStatus{
Conditions: []v1.NodeCondition{
{
Type: v1.NodeReady,
Status: v1.ConditionTrue,
LastHeartbeatTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
LastTransitionTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
},
},
},
},
},
}
for _, tc := range testCases {
t.Run(tc.testName, func(t *testing.T) {
fnh := &testutil.FakeNodeHandler{
Existing: []*v1.Node{tc.node},
Clientset: fake.NewSimpleClientset(),
}
nodeController, _ := newNodeLifecycleControllerFromClient(
nil,
fnh,
10*time.Minute,
testRateLimiterQPS,
testRateLimiterQPS,
testLargeClusterThreshold,
testUnhealthyThreshold,
testNodeMonitorGracePeriod,
testNodeStartupGracePeriod,
testNodeMonitorPeriod,
false)
nodeController.cloud = &fakecloud.FakeCloud{}
nodeController.now = func() metav1.Time { return metav1.Date(2016, 1, 1, 12, 0, 0, 0, time.UTC) }
nodeController.recorder = testutil.NewFakeRecorder()
nodeController.nodeShutdownInCloudProvider = func(ctx context.Context, node *v1.Node) (bool, error) {
return tc.shutdown, nil
}
if err := nodeController.syncNodeStore(fnh); err != nil {
t.Errorf("unexpected error: %v", err)
}
if err := nodeController.monitorNodeHealth(); err != nil {
t.Errorf("unexpected error: %v", err)
}
if len(fnh.UpdatedNodes) != 1 {
t.Errorf("Node was not updated")
}
if tc.shutdown {
if len(fnh.UpdatedNodes[0].Spec.Taints) != 1 {
t.Errorf("Node Taint was not added")
}
if fnh.UpdatedNodes[0].Spec.Taints[0].Key != "node.cloudprovider.kubernetes.io/shutdown" {
t.Errorf("Node Taint key is not correct")
}
} else {
if len(fnh.UpdatedNodes[0].Spec.Taints) != 0 {
t.Errorf("Node Taint was not removed after node is back in ready state")
}
}
})
}
}
// TestCloudProviderNoRateLimit tests that monitorNodes() immediately deletes
// pods and the node when kubelet has not reported, and the cloudprovider says
// the node is gone.
func TestCloudProviderNoRateLimit(t *testing.T) {
fnh := &testutil.FakeNodeHandler{
Existing: []*v1.Node{
{
ObjectMeta: metav1.ObjectMeta{
Name: "node0",
CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
},
Status: v1.NodeStatus{
Conditions: []v1.NodeCondition{
{
Type: v1.NodeReady,
Status: v1.ConditionUnknown,
LastHeartbeatTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
LastTransitionTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
},
},
},
},
},
Clientset: fake.NewSimpleClientset(&v1.PodList{Items: []v1.Pod{*testutil.NewPod("pod0", "node0"), *testutil.NewPod("pod1", "node0")}}),
DeleteWaitChan: make(chan struct{}),
}
nodeController, _ := newNodeLifecycleControllerFromClient(
nil,
fnh,
10*time.Minute,
testRateLimiterQPS,
testRateLimiterQPS,
testLargeClusterThreshold,
testUnhealthyThreshold,
testNodeMonitorGracePeriod,
testNodeStartupGracePeriod,
testNodeMonitorPeriod,
false)
nodeController.cloud = &fakecloud.FakeCloud{}
nodeController.now = func() metav1.Time { return metav1.Date(2016, 1, 1, 12, 0, 0, 0, time.UTC) }
nodeController.recorder = testutil.NewFakeRecorder()
nodeController.nodeExistsInCloudProvider = func(nodeName types.NodeName) (bool, error) {
return false, nil
}
nodeController.nodeShutdownInCloudProvider = func(ctx context.Context, node *v1.Node) (bool, error) {
return false, nil
}
// monitorNodeHealth should allow this node to be immediately deleted
if err := nodeController.syncNodeStore(fnh); err != nil {
t.Errorf("unexpected error: %v", err)
}
if err := nodeController.monitorNodeHealth(); err != nil {
t.Errorf("unexpected error: %v", err)
}
select {
case <-fnh.DeleteWaitChan:
case <-time.After(wait.ForeverTestTimeout):
t.Errorf("Timed out waiting %v for node to be deleted", wait.ForeverTestTimeout)
}
if len(fnh.DeletedNodes) != 1 || fnh.DeletedNodes[0].Name != "node0" {
t.Errorf("Node was not deleted")
}
if nodeOnQueue := nodeController.zonePodEvictor[""].Remove("node0"); nodeOnQueue {
t.Errorf("Node was queued for eviction. Should have been immediately deleted.")
}
}
func TestMonitorNodeHealthUpdateStatus(t *testing.T) {
fakeNow := metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC)
table := []struct {
@ -1822,7 +1633,6 @@ func TestMonitorNodeHealthUpdateStatus(t *testing.T) {
}
for i, item := range table {
nodeController, _ := newNodeLifecycleControllerFromClient(
nil,
item.fakeNodeHandler,
5*time.Minute,
testRateLimiterQPS,
@ -2403,7 +2213,6 @@ func TestMonitorNodeHealthUpdateNodeAndPodStatusWithLease(t *testing.T) {
for _, item := range testcases {
t.Run(item.description, func(t *testing.T) {
nodeController, _ := newNodeLifecycleControllerFromClient(
nil,
item.fakeNodeHandler,
5*time.Minute,
testRateLimiterQPS,
@ -2567,7 +2376,6 @@ func TestMonitorNodeHealthMarkPodsNotReady(t *testing.T) {
for i, item := range table {
nodeController, _ := newNodeLifecycleControllerFromClient(
nil,
item.fakeNodeHandler,
5*time.Minute,
testRateLimiterQPS,
@ -2697,7 +2505,6 @@ func TestApplyNoExecuteTaints(t *testing.T) {
}
originalTaint := UnreachableTaintTemplate
nodeController, _ := newNodeLifecycleControllerFromClient(
nil,
fakeNodeHandler,
evictionTimeout,
testRateLimiterQPS,
@ -2838,7 +2645,6 @@ func TestSwapUnreachableNotReadyTaints(t *testing.T) {
updatedTaint := NotReadyTaintTemplate
nodeController, _ := newNodeLifecycleControllerFromClient(
nil,
fakeNodeHandler,
evictionTimeout,
testRateLimiterQPS,
@ -2940,7 +2746,6 @@ func TestTaintsNodeByCondition(t *testing.T) {
}
nodeController, _ := newNodeLifecycleControllerFromClient(
nil,
fakeNodeHandler,
evictionTimeout,
testRateLimiterQPS,
@ -3180,7 +2985,6 @@ func TestNodeEventGeneration(t *testing.T) {
}
nodeController, _ := newNodeLifecycleControllerFromClient(
nil,
fakeNodeHandler,
5*time.Minute,
testRateLimiterQPS,
@ -3191,26 +2995,20 @@ func TestNodeEventGeneration(t *testing.T) {
testNodeStartupGracePeriod,
testNodeMonitorPeriod,
false)
nodeController.cloud = &fakecloud.FakeCloud{}
nodeController.nodeExistsInCloudProvider = func(nodeName types.NodeName) (bool, error) {
return false, nil
}
nodeController.nodeShutdownInCloudProvider = func(ctx context.Context, node *v1.Node) (bool, error) {
return false, nil
}
nodeController.now = func() metav1.Time { return fakeNow }
fakeRecorder := testutil.NewFakeRecorder()
nodeController.recorder = fakeRecorder
if err := nodeController.syncNodeStore(fakeNodeHandler); err != nil {
t.Errorf("unexpected error: %v", err)
}
if err := nodeController.monitorNodeHealth(); err != nil {
t.Errorf("unexpected error: %v", err)
}
if len(fakeRecorder.Events) != 2 {
t.Fatalf("unexpected events, got %v, expected %v: %+v", len(fakeRecorder.Events), 2, fakeRecorder.Events)
if len(fakeRecorder.Events) != 1 {
t.Fatalf("unexpected events, got %v, expected %v: %+v", len(fakeRecorder.Events), 1, fakeRecorder.Events)
}
if fakeRecorder.Events[0].Reason != "RegisteredNode" || fakeRecorder.Events[1].Reason != "DeletingNode" {
if fakeRecorder.Events[0].Reason != "RegisteredNode" {
var reasons []string
for _, event := range fakeRecorder.Events {
reasons = append(reasons, event.Reason)
@ -3249,7 +3047,6 @@ func TestFixDeprecatedTaintKey(t *testing.T) {
}
nodeController, _ := newNodeLifecycleControllerFromClient(
nil,
fakeNodeHandler,
evictionTimeout,
testRateLimiterQPS,


@ -21,7 +21,6 @@ go_library(
"//staging/src/k8s.io/client-go/listers/extensions/v1beta1:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//staging/src/k8s.io/client-go/tools/record:go_default_library",
"//staging/src/k8s.io/cloud-provider:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],
)


@ -17,8 +17,6 @@ limitations under the License.
package node
import (
"context"
"errors"
"fmt"
"strings"
@ -34,7 +32,6 @@ import (
"k8s.io/api/core/v1"
clientset "k8s.io/client-go/kubernetes"
extensionslisters "k8s.io/client-go/listers/extensions/v1beta1"
cloudprovider "k8s.io/cloud-provider"
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/kubelet/util/format"
@ -43,12 +40,6 @@ import (
"k8s.io/klog"
)
var (
// ErrCloudInstance occurs when the cloud provider does not support
// the Instances API.
ErrCloudInstance = errors.New("cloud provider doesn't support instances")
)
// DeletePods will delete all pods from master running on given node,
// and return true if any pods were deleted, or were found pending
// deletion.
@ -125,15 +116,6 @@ func SetPodTerminationReason(kubeClient clientset.Interface, pod *v1.Pod, nodeNa
return updatedPod, nil
}
// ForcefullyDeleteNode deletes the node immediately. The pods on the
// node are cleaned up by the podGC.
func ForcefullyDeleteNode(kubeClient clientset.Interface, nodeName string) error {
if err := kubeClient.CoreV1().Nodes().Delete(nodeName, nil); err != nil {
return fmt.Errorf("unable to delete node %q: %v", nodeName, err)
}
return nil
}
// MarkAllPodsNotReady updates ready status of all pods running on
// given node from master return true if success
func MarkAllPodsNotReady(kubeClient clientset.Interface, node *v1.Node) error {
@ -171,36 +153,6 @@ func MarkAllPodsNotReady(kubeClient clientset.Interface, node *v1.Node) error {
return fmt.Errorf("%v", strings.Join(errMsg, "; "))
}
// ExistsInCloudProvider returns true if the node exists in the
// cloud provider.
func ExistsInCloudProvider(cloud cloudprovider.Interface, nodeName types.NodeName) (bool, error) {
instances, ok := cloud.Instances()
if !ok {
return false, fmt.Errorf("%v", ErrCloudInstance)
}
if _, err := instances.InstanceID(context.TODO(), nodeName); err != nil {
if err == cloudprovider.InstanceNotFound {
return false, nil
}
return false, err
}
return true, nil
}
// ShutdownInCloudProvider returns true if the node is shutdowned in
// cloud provider.
func ShutdownInCloudProvider(ctx context.Context, cloud cloudprovider.Interface, node *v1.Node) (bool, error) {
instances, ok := cloud.Instances()
if !ok {
return false, fmt.Errorf("%v", ErrCloudInstance)
}
shutdown, err := instances.InstanceShutdownByProviderID(ctx, node.Spec.ProviderID)
if err == cloudprovider.NotImplemented {
return false, nil
}
return shutdown, err
}
// RecordNodeEvent records an event related to a node.
func RecordNodeEvent(recorder record.EventRecorder, nodeName, nodeUID, eventtype, reason, event string) {
ref := &v1.ObjectReference{
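The helpers deleted above mapped cloud-provider sentinel errors onto plain booleans: InstanceNotFound meant "the node is definitively gone", and NotImplemented meant "assume not shut down", the same treatment the new private shutdownInCloudProvider keeps. A minimal, dependency-free sketch of that sentinel-error pattern, with stand-in error values:

package main

import (
	"errors"
	"fmt"
)

// Stand-ins for cloudprovider.InstanceNotFound and cloudprovider.NotImplemented.
var (
	errInstanceNotFound = errors.New("instance not found")
	errNotImplemented   = errors.New("unimplemented")
)

func existsInCloud(lookup func() error) (bool, error) {
	if err := lookup(); err != nil {
		if err == errInstanceNotFound {
			return false, nil // definitively gone: a result, not an error
		}
		return false, err // transient or real errors propagate to the caller
	}
	return true, nil
}

func shutdownInCloud(check func() (bool, error)) (bool, error) {
	shutdown, err := check()
	if err == errNotImplemented {
		// Providers that cannot answer are treated as "not shut down",
		// so the controller falls through to the existence check.
		return false, nil
	}
	return shutdown, err
}

func main() {
	fmt.Println(existsInCloud(func() error { return errInstanceNotFound }))                // false <nil>
	fmt.Println(shutdownInCloud(func() (bool, error) { return false, errNotImplemented })) // false <nil>
}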


@ -95,7 +95,6 @@ func TestTaintNodeByCondition(t *testing.T) {
informers.Core().V1().Pods(),
informers.Core().V1().Nodes(),
informers.Extensions().V1beta1().DaemonSets(),
nil, // CloudProvider
cs,
time.Hour, // Node monitor grace period
time.Second, // Node startup grace period