Merge pull request #49257 from k82cn/k8s_42001

Automatic merge from submit-queue (batch tested with PRs 51574, 51534, 49257, 44680, 48836)

Task 1: Taint nodes by condition.

**What this PR does / why we need it**:
Taint nodes by condition for MemoryPressure, OutOfDisk, and so on.

**Which issue this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close that issue when PR gets merged)*: part of #42001 

**Release note**:
```release-note
Taint nodes by condition as follows:
  * 'node.kubernetes.io/network-unavailable=:NoSchedule' if NetworkUnavailable is true
  * 'node.kubernetes.io/disk-pressure=:NoSchedule' if DiskPressure is true
  * 'node.kubernetes.io/memory-pressure=:NoSchedule' if MemoryPressure is true
  * 'node.kubernetes.io/out-of-disk=:NoSchedule' if OutOfDisk is true
```
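
For illustration only (not part of this PR's diff): a pod that must still schedule onto a node carrying one of these condition taints needs a matching toleration. A minimal Go sketch using the `k8s.io/api/core/v1` types, with the taint key taken from the release note above:

```go
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
)

// networkUnavailableToleration builds a toleration matching the
// 'node.kubernetes.io/network-unavailable:NoSchedule' taint from the release
// note. Illustrative sketch only; this PR adds the taints, not tolerations.
func networkUnavailableToleration() v1.Toleration {
	return v1.Toleration{
		Key:      "node.kubernetes.io/network-unavailable",
		Operator: v1.TolerationOpExists, // tolerate regardless of the taint's value
		Effect:   v1.TaintEffectNoSchedule,
	}
}

func main() {
	fmt.Printf("%+v\n", networkUnavailableToleration())
}
```

A pod would carry this in `spec.tolerations`; without it, the scheduler skips nodes tainted for NetworkUnavailable once the TaintNodesByCondition feature gate is enabled.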
Merged by Kubernetes Submit Queue on 2017-08-31 23:13:20 -07:00 (committed by GitHub)
Commit b832992fc6
6 changed files with 302 additions and 25 deletions

@@ -112,6 +112,7 @@ func startNodeController(ctx ControllerContext) (bool, error) {
ipam.CIDRAllocatorType(ctx.Options.CIDRAllocatorType),
ctx.Options.EnableTaintManager,
utilfeature.DefaultFeatureGate.Enabled(features.TaintBasedEvictions),
utilfeature.DefaultFeatureGate.Enabled(features.TaintNodesByCondition),
)
if err != nil {
return true, err

@@ -21,6 +21,7 @@ go_test(
"//pkg/kubelet/apis:go_default_library",
"//pkg/util/node:go_default_library",
"//pkg/util/taints:go_default_library",
"//plugin/pkg/scheduler/algorithm:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/api/extensions/v1beta1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/equality:go_default_library",

@@ -76,6 +76,20 @@ var (
Key: algorithm.TaintNodeNotReady,
Effect: v1.TaintEffectNoExecute,
}
nodeConditionToTaintKeyMap = map[v1.NodeConditionType]string{
v1.NodeMemoryPressure: algorithm.TaintNodeMemoryPressure,
v1.NodeOutOfDisk: algorithm.TaintNodeOutOfDisk,
v1.NodeDiskPressure: algorithm.TaintNodeDiskPressure,
v1.NodeNetworkUnavailable: algorithm.TaintNodeNetworkUnavailable,
}
taintKeyToNodeConditionMap = map[string]v1.NodeConditionType{
algorithm.TaintNodeNetworkUnavailable: v1.NodeNetworkUnavailable,
algorithm.TaintNodeMemoryPressure: v1.NodeMemoryPressure,
algorithm.TaintNodeOutOfDisk: v1.NodeOutOfDisk,
algorithm.TaintNodeDiskPressure: v1.NodeDiskPressure,
}
)
const (
@@ -180,6 +194,10 @@ type NodeController struct {
// if set to true NodeController will taint Nodes with 'TaintNodeNotReady' and 'TaintNodeUnreachable'
// taints instead of evicting Pods itself.
useTaintBasedEvictions bool
// if set to true, NodeController will taint Nodes based on its condition for 'NetworkUnavailable',
// 'MemoryPressure', 'OutOfDisk' and 'DiskPressure'.
taintNodeByCondition bool
}
// NewNodeController returns a new node controller to sync instances from cloudprovider.
@@ -206,7 +224,9 @@ func NewNodeController(
allocateNodeCIDRs bool,
allocatorType ipam.CIDRAllocatorType,
runTaintManager bool,
useTaintBasedEvictions bool) (*NodeController, error) {
useTaintBasedEvictions bool,
taintNodeByCondition bool,
) (*NodeController, error) {
eventBroadcaster := record.NewBroadcaster()
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "controllermanager"})
eventBroadcaster.StartLogging(glog.Infof)
@@ -387,6 +407,17 @@ func NewNodeController(
nc.taintManager = scheduler.NewNoExecuteTaintManager(kubeClient)
}
if nc.taintNodeByCondition {
nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: util.CreateAddNodeHandler(func(node *v1.Node) error {
return nc.doNoScheduleTaintingPass(node)
}),
UpdateFunc: util.CreateUpdateNodeHandler(func(_, newNode *v1.Node) error {
return nc.doNoScheduleTaintingPass(newNode)
}),
})
}
nc.nodeLister = nodeInformer.Lister()
nc.nodeInformerSynced = nodeInformer.Informer().HasSynced
@@ -425,6 +456,34 @@ func (nc *NodeController) doEvictionPass() {
}
}
func (nc *NodeController) doNoScheduleTaintingPass(node *v1.Node) error {
// Map node's condition to Taints.
taints := []v1.Taint{}
for _, condition := range node.Status.Conditions {
if _, found := nodeConditionToTaintKeyMap[condition.Type]; found {
if condition.Status == v1.ConditionTrue {
taints = append(taints, v1.Taint{
Key: nodeConditionToTaintKeyMap[condition.Type],
Effect: v1.TaintEffectNoSchedule,
})
}
}
}
nodeTaints := taintutils.TaintSetFilter(node.Spec.Taints, func(t *v1.Taint) bool {
_, found := taintKeyToNodeConditionMap[t.Key]
return found
})
taintsToAdd, taintsToDel := taintutils.TaintSetDiff(taints, nodeTaints)
// If there is nothing to add or delete, return directly.
if len(taintsToAdd) == 0 && len(taintsToDel) == 0 {
return nil
}
if !util.SwapNodeControllerTaint(nc.kubeClient, taintsToAdd, taintsToDel, node) {
return fmt.Errorf("failed to swap taints of node %+v", node)
}
return nil
}
func (nc *NodeController) doNoExecuteTaintingPass() {
nc.evictorLock.Lock()
defer nc.evictorLock.Unlock()
@@ -459,7 +518,7 @@ func (nc *NodeController) doNoExecuteTaintingPass() {
return true, 0
}
return util.SwapNodeControllerTaint(nc.kubeClient, &taintToAdd, &oppositeTaint, node), 0
return util.SwapNodeControllerTaint(nc.kubeClient, []*v1.Taint{&taintToAdd}, []*v1.Taint{&oppositeTaint}, node), 0
})
}
}
@@ -542,7 +601,7 @@ func (nc *NodeController) monitorNodeStatus() error {
nc.knownNodeSet[added[i].Name] = added[i]
nc.addPodEvictorForNewZone(added[i])
if nc.useTaintBasedEvictions {
nc.markNodeAsHealthy(added[i])
nc.markNodeAsReachable(added[i])
} else {
nc.cancelPodEviction(added[i])
}
@@ -591,7 +650,7 @@ func (nc *NodeController) monitorNodeStatus() error {
// We want to update the taint straight away if Node is already tainted with the UnreachableTaint
if taintutils.TaintExists(node.Spec.Taints, UnreachableTaintTemplate) {
taintToAdd := *NotReadyTaintTemplate
if !util.SwapNodeControllerTaint(nc.kubeClient, &taintToAdd, UnreachableTaintTemplate, node) {
if !util.SwapNodeControllerTaint(nc.kubeClient, []*v1.Taint{&taintToAdd}, []*v1.Taint{UnreachableTaintTemplate}, node) {
glog.Errorf("Failed to instantly swap UnreachableTaint to NotReadyTaint. Will try again in the next cycle.")
}
} else if nc.markNodeForTainting(node) {
@@ -618,7 +677,7 @@ func (nc *NodeController) monitorNodeStatus() error {
// We want to update the taint straight away if Node is already tainted with the UnreachableTaint
if taintutils.TaintExists(node.Spec.Taints, NotReadyTaintTemplate) {
taintToAdd := *UnreachableTaintTemplate
if !util.SwapNodeControllerTaint(nc.kubeClient, &taintToAdd, NotReadyTaintTemplate, node) {
if !util.SwapNodeControllerTaint(nc.kubeClient, []*v1.Taint{&taintToAdd}, []*v1.Taint{NotReadyTaintTemplate}, node) {
glog.Errorf("Failed to instantly swap UnreachableTaint to NotReadyTaint. Will try again in the next cycle.")
}
} else if nc.markNodeForTainting(node) {
@@ -642,7 +701,7 @@ func (nc *NodeController) monitorNodeStatus() error {
}
if observedReadyCondition.Status == v1.ConditionTrue {
if nc.useTaintBasedEvictions {
removed, err := nc.markNodeAsHealthy(node)
removed, err := nc.markNodeAsReachable(node)
if err != nil {
glog.Errorf("Failed to remove taints from node %v. Will retry in next iteration.", node.Name)
}
@@ -737,7 +796,7 @@ func (nc *NodeController) handleDisruption(zoneToNodeConditions map[string][]*v1
glog.V(0).Info("NodeController detected that all Nodes are not-Ready. Entering master disruption mode.")
for i := range nodes {
if nc.useTaintBasedEvictions {
_, err := nc.markNodeAsHealthy(nodes[i])
_, err := nc.markNodeAsReachable(nodes[i])
if err != nil {
glog.Errorf("Failed to remove taints from Node %v", nodes[i].Name)
}
@@ -1053,7 +1112,7 @@ func (nc *NodeController) markNodeForTainting(node *v1.Node) bool {
return nc.zoneNoExecuteTainer[utilnode.GetZoneKey(node)].Add(node.Name, string(node.UID))
}
func (nc *NodeController) markNodeAsHealthy(node *v1.Node) (bool, error) {
func (nc *NodeController) markNodeAsReachable(node *v1.Node) (bool, error) {
nc.evictorLock.Lock()
defer nc.evictorLock.Unlock()
err := controller.RemoveTaintOffNode(nc.kubeClient, node.Name, node, UnreachableTaintTemplate)

@@ -46,6 +46,7 @@ import (
kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis"
"k8s.io/kubernetes/pkg/util/node"
taintutils "k8s.io/kubernetes/pkg/util/taints"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
)
const (
@@ -109,6 +110,7 @@ func NewNodeControllerFromClient(
ipam.RangeAllocatorType,
useTaints,
useTaints,
useTaints,
)
if err != nil {
return nil, err
@@ -2060,6 +2062,186 @@ func TestSwapUnreachableNotReadyTaints(t *testing.T) {
}
}
func TestTaintsNodeByCondition(t *testing.T) {
fakeNow := metav1.Date(2017, 1, 1, 12, 0, 0, 0, time.UTC)
evictionTimeout := 10 * time.Minute
fakeNodeHandler := &testutil.FakeNodeHandler{
Existing: []*v1.Node{
{
ObjectMeta: metav1.ObjectMeta{
Name: "node0",
CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
Labels: map[string]string{
kubeletapis.LabelZoneRegion: "region1",
kubeletapis.LabelZoneFailureDomain: "zone1",
},
},
Status: v1.NodeStatus{
Conditions: []v1.NodeCondition{
{
Type: v1.NodeReady,
Status: v1.ConditionTrue,
LastHeartbeatTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
LastTransitionTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
},
},
},
},
},
Clientset: fake.NewSimpleClientset(&v1.PodList{Items: []v1.Pod{*testutil.NewPod("pod0", "node0")}}),
}
nodeController, _ := NewNodeControllerFromClient(nil, fakeNodeHandler, evictionTimeout,
testRateLimiterQPS, testRateLimiterQPS, testLargeClusterThreshold, testUnhealthyThreshold, testNodeMonitorGracePeriod,
testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false, true)
nodeController.now = func() metav1.Time { return fakeNow }
nodeController.recorder = testutil.NewFakeRecorder()
outOfDiskTaint := &v1.Taint{
Key: algorithm.TaintNodeOutOfDisk,
Effect: v1.TaintEffectNoSchedule,
}
networkUnavailableTaint := &v1.Taint{
Key: algorithm.TaintNodeNetworkUnavailable,
Effect: v1.TaintEffectNoSchedule,
}
tests := []struct {
Name string
Node *v1.Node
ExpectedTaints []*v1.Taint
}{
{
Name: "NetworkUnavailable is true",
Node: &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "node0",
CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
Labels: map[string]string{
kubeletapis.LabelZoneRegion: "region1",
kubeletapis.LabelZoneFailureDomain: "zone1",
},
},
Status: v1.NodeStatus{
Conditions: []v1.NodeCondition{
{
Type: v1.NodeReady,
Status: v1.ConditionTrue,
LastHeartbeatTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
LastTransitionTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
},
{
Type: v1.NodeNetworkUnavailable,
Status: v1.ConditionTrue,
LastHeartbeatTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
LastTransitionTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
},
},
},
},
ExpectedTaints: []*v1.Taint{networkUnavailableTaint},
},
{
Name: "NetworkUnavailable and OutOfDisk are true",
Node: &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "node0",
CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
Labels: map[string]string{
kubeletapis.LabelZoneRegion: "region1",
kubeletapis.LabelZoneFailureDomain: "zone1",
},
},
Status: v1.NodeStatus{
Conditions: []v1.NodeCondition{
{
Type: v1.NodeReady,
Status: v1.ConditionTrue,
LastHeartbeatTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
LastTransitionTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
},
{
Type: v1.NodeNetworkUnavailable,
Status: v1.ConditionTrue,
LastHeartbeatTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
LastTransitionTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
},
{
Type: v1.NodeOutOfDisk,
Status: v1.ConditionTrue,
LastHeartbeatTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
LastTransitionTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
},
},
},
},
ExpectedTaints: []*v1.Taint{networkUnavailableTaint, outOfDiskTaint},
},
{
Name: "NetworkUnavailable is true, OutOfDisk is unknown",
Node: &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "node0",
CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
Labels: map[string]string{
kubeletapis.LabelZoneRegion: "region1",
kubeletapis.LabelZoneFailureDomain: "zone1",
},
},
Status: v1.NodeStatus{
Conditions: []v1.NodeCondition{
{
Type: v1.NodeReady,
Status: v1.ConditionTrue,
LastHeartbeatTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
LastTransitionTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
},
{
Type: v1.NodeNetworkUnavailable,
Status: v1.ConditionTrue,
LastHeartbeatTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
LastTransitionTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
},
{
Type: v1.NodeOutOfDisk,
Status: v1.ConditionUnknown,
LastHeartbeatTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
LastTransitionTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
},
},
},
},
ExpectedTaints: []*v1.Taint{networkUnavailableTaint},
},
}
for _, test := range tests {
fakeNodeHandler.Update(test.Node)
if err := syncNodeStore(nodeController, fakeNodeHandler); err != nil {
t.Errorf("unexpected error: %v", err)
}
nodeController.doNoScheduleTaintingPass(test.Node)
if err := syncNodeStore(nodeController, fakeNodeHandler); err != nil {
t.Errorf("unexpected error: %v", err)
}
node0, err := nodeController.nodeLister.Get("node0")
if err != nil {
t.Errorf("Can't get current node0...")
return
}
if len(node0.Spec.Taints) != len(test.ExpectedTaints) {
t.Errorf("%s: Unexpected number of taints: expected %d, got %d",
test.Name, len(test.ExpectedTaints), len(node0.Spec.Taints))
}
for _, taint := range test.ExpectedTaints {
if !taintutils.TaintExists(node0.Spec.Taints, taint) {
t.Errorf("%s: Can't find taint %v in %v", test.Name, taint, node0.Spec.Taints)
}
}
}
}
func TestNodeEventGeneration(t *testing.T) {
fakeNow := metav1.Date(2016, 9, 10, 12, 0, 0, 0, time.UTC)
fakeNodeHandler := &testutil.FakeNodeHandler{

@@ -254,31 +254,35 @@ func RecordNodeStatusChange(recorder record.EventRecorder, node *v1.Node, newSta
// SwapNodeControllerTaint returns true in case of success and false
// otherwise.
func SwapNodeControllerTaint(kubeClient clientset.Interface, taintToAdd, taintToRemove *v1.Taint, node *v1.Node) bool {
taintToAdd.TimeAdded = metav1.Now()
err := controller.AddOrUpdateTaintOnNode(kubeClient, node.Name, taintToAdd)
if err != nil {
utilruntime.HandleError(
fmt.Errorf(
"unable to taint %v unresponsive Node %q: %v",
taintToAdd.Key,
node.Name,
err))
return false
func SwapNodeControllerTaint(kubeClient clientset.Interface, taintsToAdd, taintsToRemove []*v1.Taint, node *v1.Node) bool {
for _, taintToAdd := range taintsToAdd {
taintToAdd.TimeAdded = metav1.Now()
}
glog.V(4).Infof("Added %v Taint to Node %v", taintToAdd, node.Name)
err = controller.RemoveTaintOffNode(kubeClient, node.Name, node, taintToRemove)
err := controller.AddOrUpdateTaintOnNode(kubeClient, node.Name, taintsToAdd...)
if err != nil {
utilruntime.HandleError(
fmt.Errorf(
"unable to remove %v unneeded taint from unresponsive Node %q: %v",
taintToRemove.Key,
"unable to taint %+v unresponsive Node %q: %v",
taintsToAdd,
node.Name,
err))
return false
}
glog.V(4).Infof("Made sure that Node %v has no %v Taint", node.Name, taintToRemove)
glog.V(4).Infof("Added %+v Taint to Node %v", taintsToAdd, node.Name)
err = controller.RemoveTaintOffNode(kubeClient, node.Name, node, taintsToRemove...)
if err != nil {
utilruntime.HandleError(
fmt.Errorf(
"unable to remove %+v unneeded taint from unresponsive Node %q: %v",
taintsToRemove,
node.Name,
err))
return false
}
glog.V(4).Infof("Made sure that Node %+v has no %v Taint", node.Name, taintsToRemove)
return true
}

@@ -19,11 +19,11 @@ package taints
import (
"fmt"
"k8s.io/apimachinery/pkg/util/sets"
"strings"
"k8s.io/api/core/v1"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/validation"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/helper"
@@ -299,3 +299,33 @@ func TaintExists(taints []v1.Taint, taintToFind *v1.Taint) bool {
}
return false
}
func TaintSetDiff(t1, t2 []v1.Taint) (taintsToAdd []*v1.Taint, taintsToRemove []*v1.Taint) {
for _, taint := range t1 {
if !TaintExists(t2, &taint) {
t := taint
taintsToAdd = append(taintsToAdd, &t)
}
}
for _, taint := range t2 {
if !TaintExists(t1, &taint) {
t := taint
taintsToRemove = append(taintsToRemove, &t)
}
}
return
}
func TaintSetFilter(taints []v1.Taint, fn func(*v1.Taint) bool) []v1.Taint {
res := []v1.Taint{}
for _, taint := range taints {
if fn(&taint) {
res = append(res, taint)
}
}
return res
}
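
Usage sketch (not part of the diff): the two helpers above compose the way `doNoScheduleTaintingPass` uses them — filter the node's existing taints down to the condition-derived ones, then diff against the desired set. The hard-coded predicate below stands in for the controller's `taintKeyToNodeConditionMap` lookup, and the import paths assume building inside the kubernetes repo with its vendored dependencies:

```go
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	taintutils "k8s.io/kubernetes/pkg/util/taints"
)

func main() {
	// Desired taints derived from node conditions (assume only MemoryPressure is true).
	desired := []v1.Taint{
		{Key: "node.kubernetes.io/memory-pressure", Effect: v1.TaintEffectNoSchedule},
	}

	// Existing node taints: one stale condition taint plus an unrelated user taint.
	existing := []v1.Taint{
		{Key: "node.kubernetes.io/out-of-disk", Effect: v1.TaintEffectNoSchedule},
		{Key: "dedicated", Value: "gpu", Effect: v1.TaintEffectNoSchedule},
	}

	// Keep only condition-derived taints; stand-in predicate for the key map.
	conditionTaints := taintutils.TaintSetFilter(existing, func(t *v1.Taint) bool {
		return t.Key != "dedicated"
	})

	// memory-pressure ends up in toAdd, out-of-disk in toRemove; 'dedicated' is untouched.
	toAdd, toRemove := taintutils.TaintSetDiff(desired, conditionTaints)
	for _, t := range toAdd {
		fmt.Printf("add: %s:%s\n", t.Key, t.Effect)
	}
	for _, t := range toRemove {
		fmt.Printf("remove: %s:%s\n", t.Key, t.Effect)
	}
}
```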