Taint controller - first commit

pull/6/head
gmarek 2017-01-23 10:28:51 +01:00
parent e538adcd00
commit 2f0e436677
13 changed files with 1670 additions and 5 deletions

View File

@ -433,6 +433,7 @@ func StartControllers(controllers map[string]InitFunc, s *options.CMServer, root
serviceCIDR,
int(s.NodeCIDRMaskSize),
s.AllocateNodeCIDRs,
s.EnableTaintManager,
)
if err != nil {
return fmt.Errorf("failed to initialize nodecontroller: %v", err)

View File

@ -104,6 +104,7 @@ func NewCMServer() *CMServer {
ClusterSigningCertFile: "/etc/kubernetes/ca/ca.pem",
ClusterSigningKeyFile: "/etc/kubernetes/ca/ca.key",
ReconcilerSyncLoopPeriod: metav1.Duration{Duration: 5 * time.Second},
EnableTaintManager: true,
},
}
s.LeaderElection.LeaderElect = true
@ -196,6 +197,7 @@ func (s *CMServer) AddFlags(fs *pflag.FlagSet, allControllers []string, disabled
fs.Float32Var(&s.UnhealthyZoneThreshold, "unhealthy-zone-threshold", 0.55, "Fraction of Nodes in a zone which needs to be not Ready (minimum 3) for zone to be treated as unhealthy. ")
fs.BoolVar(&s.DisableAttachDetachReconcilerSync, "disable-attach-detach-reconcile-sync", false, "Disable volume attach detach reconciler sync. Disabling this may cause volumes to be mismatched with pods. Use wisely.")
fs.DurationVar(&s.ReconcilerSyncLoopPeriod.Duration, "attach-detach-reconcile-sync-period", s.ReconcilerSyncLoopPeriod.Duration, "The reconciler sync wait time between volume attach detach. This duration must be larger than one second, and increasing this value from the default may allow for volumes to be mismatched with pods.")
fs.BoolVar(&s.EnableTaintManager, "enable-taint-manager", s.EnableTaintManager, "WARNING: Beta feature. If set to true enables NoExecute Taints and will evict all not-tolerating Pods running on Nodes tainted with Taints of this kind.")
leaderelection.BindFlags(&s.LeaderElection, fs)

View File

@ -168,6 +168,7 @@ enable-garbage-collector
enable-hostpath-provisioner
enable-server
enable-swagger-ui
enable-taint-manager
etcd-address
etcd-cafile
etcd-certfile

View File

@ -396,6 +396,28 @@ func DeleteTaint(taints []Taint, taintToDelete *Taint) ([]Taint, bool) {
return newTaints, deleted
}
// GetMatchingTolerations returns true and the list of Tolerations matching all Taints if all of them are tolerated, or false otherwise.
func GetMatchingTolerations(taints []Taint, tolerations []Toleration) (bool, []Toleration) {
if len(tolerations) == 0 && len(taints) > 0 {
return false, []Toleration{}
}
result := []Toleration{}
for i := range taints {
tolerated := false
for j := range tolerations {
if tolerations[j].ToleratesTaint(&taints[i]) {
result = append(result, tolerations[j])
tolerated = true
break
}
}
if !tolerated {
return false, []Toleration{}
}
}
return true, result
}
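Illustrative usage, not part of this commit: a small hypothetical caller showing how GetMatchingTolerations combines with the existing GetNodeTaints/GetPodTolerations helpers; the NoExecuteTaintManager introduced later in this commit follows the same pattern. podToleratesNode is an assumed name.
// Hypothetical caller (illustration only): does pod tolerate every taint on node?
func podToleratesNode(node *v1.Node, pod *v1.Pod) (bool, []v1.Toleration, error) {
	taints, err := v1.GetNodeTaints(node)
	if err != nil {
		return false, nil, err
	}
	tolerations, err := v1.GetPodTolerations(pod)
	if err != nil {
		return false, nil, err
	}
	// One matching Toleration is returned per Taint (useful later for reading
	// TolerationSeconds); false means at least one taint is not tolerated.
	ok, used := v1.GetMatchingTolerations(taints, tolerations)
	return ok, used, nil
}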
// MatchTaint checks if the taint matches taintToMatch. Taints are unique by key:effect,
// if the two taints have same key:effect, regard as they match.
func (t *Taint) MatchTaint(taintToMatch Taint) bool {

View File

@ -790,6 +790,9 @@ type KubeControllerManagerConfiguration struct {
// ReconcilerSyncLoopPeriod is the amount of time the reconciler sync states loop
// wait between successive executions. Is set to 5 sec by default.
ReconcilerSyncLoopPeriod metav1.Duration
// EnableTaintManager, if set to true, enables NoExecute Taints and will evict all
// not-tolerating Pods running on Nodes tainted with Taints of this kind.
EnableTaintManager bool
}
// VolumeConfiguration contains *all* enumerated flags meant to configure all volume

View File

@ -148,6 +148,8 @@ type NodeController struct {
// allocate/recycle CIDRs for node if allocateNodeCIDRs == true
cidrAllocator CIDRAllocator
// manages taints
taintManager *NoExecuteTaintManager
forcefullyDeletePod func(*v1.Pod) error
nodeExistsInCloudProvider func(types.NodeName) (bool, error)
@ -160,6 +162,10 @@ type NodeController struct {
secondaryEvictionLimiterQPS float32
largeClusterThreshold int32
unhealthyZoneThreshold float32
// If set to true, NodeController will start the TaintManager that will evict Pods from
// tainted Nodes if the taints are not tolerated.
runTaintManager bool
}
// NewNodeController returns a new node controller to sync instances from cloudprovider.
@ -183,7 +189,8 @@ func NewNodeController(
clusterCIDR *net.IPNet,
serviceCIDR *net.IPNet,
nodeCIDRMaskSize int,
allocateNodeCIDRs bool) (*NodeController, error) {
allocateNodeCIDRs bool,
runTaintManager bool) (*NodeController, error) {
eventBroadcaster := record.NewBroadcaster()
recorder := eventBroadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: "controllermanager"})
eventBroadcaster.StartLogging(glog.Infof)
@ -232,14 +239,47 @@ func NewNodeController(
largeClusterThreshold: largeClusterThreshold,
unhealthyZoneThreshold: unhealthyZoneThreshold,
zoneStates: make(map[string]zoneState),
runTaintManager: runTaintManager,
}
nc.enterPartialDisruptionFunc = nc.ReducedQPSFunc
nc.enterFullDisruptionFunc = nc.HealthyQPSFunc
nc.computeZoneStateFunc = nc.ComputeZoneState
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: nc.maybeDeleteTerminatingPod,
UpdateFunc: func(_, obj interface{}) { nc.maybeDeleteTerminatingPod(obj) },
AddFunc: func(obj interface{}) {
nc.maybeDeleteTerminatingPod(obj)
pod := obj.(*v1.Pod)
if nc.taintManager != nil {
nc.taintManager.PodUpdated(nil, pod)
}
},
UpdateFunc: func(prev, obj interface{}) {
nc.maybeDeleteTerminatingPod(obj)
prevPod := prev.(*v1.Pod)
newPod := obj.(*v1.Pod)
if nc.taintManager != nil {
nc.taintManager.PodUpdated(prevPod, newPod)
}
},
DeleteFunc: func(obj interface{}) {
pod, isPod := obj.(*v1.Pod)
// We can get DeletedFinalStateUnknown instead of *v1.Pod here and we need to handle that correctly. #34692
if !isPod {
deletedState, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
glog.Errorf("Received unexpected object: %v", obj)
return
}
pod, ok = deletedState.Obj.(*v1.Pod)
if !ok {
glog.Errorf("DeletedFinalStateUnknown contained non-Node object: %v", deletedState.Obj)
return
}
}
if nc.taintManager != nil {
nc.taintManager.PodUpdated(pod, nil)
}
},
})
nc.podInformerSynced = podInformer.Informer().HasSynced
@ -279,9 +319,13 @@ func NewNodeController(
if err := nc.cidrAllocator.AllocateOrOccupyCIDR(node); err != nil {
utilruntime.HandleError(fmt.Errorf("Error allocating CIDR: %v", err))
}
if nc.taintManager != nil {
nc.taintManager.NodeUpdated(nil, node)
}
},
UpdateFunc: func(_, obj interface{}) {
node := obj.(*v1.Node)
UpdateFunc: func(oldNode, newNode interface{}) {
node := newNode.(*v1.Node)
prevNode := oldNode.(*v1.Node)
// If the PodCIDR is not empty we either:
// - already processed a Node that already had a CIDR after NC restarted
// (cidr is marked as used),
@ -312,6 +356,9 @@ func NewNodeController(
utilruntime.HandleError(fmt.Errorf("Error allocating CIDR: %v", err))
}
}
if nc.taintManager != nil {
nc.taintManager.NodeUpdated(prevNode, node)
}
},
DeleteFunc: func(originalObj interface{}) {
obj, err := api.Scheme.DeepCopy(originalObj)
@ -334,6 +381,9 @@ func NewNodeController(
return
}
}
if nc.taintManager != nil {
nc.taintManager.NodeUpdated(node, nil)
}
if err := nc.cidrAllocator.ReleaseCIDR(node); err != nil {
glog.Errorf("Error releasing CIDR: %v", err)
}
@ -341,6 +391,10 @@ func NewNodeController(
}
}
if nc.runTaintManager {
nc.taintManager = NewNoExecuteTaintManager(kubeClient)
}
nodeInformer.Informer().AddEventHandler(nodeEventHandlerFuncs)
nc.nodeLister = nodeInformer.Lister()
nc.nodeInformerSynced = nodeInformer.Informer().HasSynced
@ -368,6 +422,10 @@ func (nc *NodeController) Run() {
}
}, nc.nodeMonitorPeriod, wait.NeverStop)
if nc.runTaintManager {
go nc.taintManager.Run(wait.NeverStop)
}
// Managing eviction of nodes:
// When we delete pods off a node, if the node was not empty at the time we then
// queue an eviction watcher. If we hit an error, retry deletion.

View File

@ -99,6 +99,7 @@ func NewNodeControllerFromClient(
serviceCIDR,
nodeCIDRMaskSize,
allocateNodeCIDRs,
false,
)
if err != nil {
return nil, err

View File

@ -0,0 +1,432 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package node
import (
"fmt"
"sync"
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/workqueue"
"github.com/golang/glog"
)
const (
nodeUpdateChannelSize = 10
podUpdateChannelSize = 1
retries = 5
)
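// computeTaintDifference returns the taints present in `left` but absent from `right`.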
func computeTaintDifference(left []v1.Taint, right []v1.Taint) []v1.Taint {
result := []v1.Taint{}
for i := range left {
found := false
for j := range right {
if left[i] == right[j] {
found = true
break
}
}
if !found {
result = append(result, left[i])
}
}
return result
}
// copy of 'computeTaintDifference' - long live lack of generics...
func computeTolerationDifference(left []v1.Toleration, right []v1.Toleration) []v1.Toleration {
result := []v1.Toleration{}
for i := range left {
found := false
for j := range right {
if left[i] == right[j] {
found = true
break
}
}
if !found {
result = append(result, left[i])
}
}
return result
}
// Needed to make workqueue work
type updateItemInterface interface{}
type nodeUpdateItem struct {
oldNode *v1.Node
newNode *v1.Node
newTaints []v1.Taint
}
type podUpdateItem struct {
oldPod *v1.Pod
newPod *v1.Pod
newTolerations []v1.Toleration
}
// NoExecuteTaintManager listens to Taint/Toleration changes and is responsible for removing Pods
// from Nodes tainted with NoExecute Taints.
type NoExecuteTaintManager struct {
client clientset.Interface
taintEvictionQueue *TimedWorkerQueue
// keeps a map from nodeName to all noExecute taints on that Node
taintedNodesLock sync.Mutex
taintedNodes map[string][]v1.Taint
nodeUpdateChannel chan *nodeUpdateItem
podUpdateChannel chan *podUpdateItem
nodeUpdateQueue workqueue.Interface
podUpdateQueue workqueue.Interface
}
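// deletePodHandler returns a function that deletes the Pod named in the WorkArgs,
// retrying up to `retries` times with a short sleep between attempts.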
func deletePodHandler(c clientset.Interface) func(args *WorkArgs) error {
return func(args *WorkArgs) error {
ns := args.NamespacedName.Namespace
name := args.NamespacedName.Name
glog.V(0).Infof("NoExecuteTaintManager is deleting Pod: %v", args.NamespacedName.String())
var err error
for i := 0; i < retries; i++ {
err = c.Core().Pods(ns).Delete(name, &metav1.DeleteOptions{})
if err == nil {
break
}
time.Sleep(10 * time.Millisecond)
}
return err
}
}
func getNoExecuteTaints(taints []v1.Taint) []v1.Taint {
result := []v1.Taint{}
for i := range taints {
if taints[i].Effect == v1.TaintEffectNoExecute {
result = append(result, taints[i])
}
}
return result
}
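// getPodsAssignedToNode lists all Pods whose spec.nodeName matches the given Node,
// retrying a few times before giving up.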
func getPodsAssignedToNode(c clientset.Interface, nodeName string) ([]v1.Pod, error) {
selector := fields.SelectorFromSet(fields.Set{"spec.nodeName": nodeName})
pods, err := c.Core().Pods(v1.NamespaceAll).List(metav1.ListOptions{
FieldSelector: selector.String(),
LabelSelector: labels.Everything().String(),
})
for i := 0; i < retries && err != nil; i++ {
pods, err = c.Core().Pods(v1.NamespaceAll).List(metav1.ListOptions{
FieldSelector: selector.String(),
LabelSelector: labels.Everything().String(),
})
time.Sleep(100 * time.Millisecond)
}
if err != nil {
return []v1.Pod{}, fmt.Errorf("Failed to get Pods assigned to node %v. Skipping update.", nodeName)
}
return pods.Items, nil
}
// getMinTolerationTime returns the minimal toleration time from the given slice, or a negative duration to denote infinite toleration.
func getMinTolerationTime(tolerations []v1.Toleration) time.Duration {
minTolerationTime := int64(-1)
for i := range tolerations {
if tolerations[i].TolerationSeconds != nil {
if minTolerationTime < 0 {
minTolerationTime = *(tolerations[i].TolerationSeconds)
} else {
tolerationSeconds := *(tolerations[i].TolerationSeconds)
if tolerationSeconds < minTolerationTime {
if tolerationSeconds < 0 {
minTolerationTime = 0
} else {
minTolerationTime = tolerationSeconds
}
}
}
}
}
return time.Duration(minTolerationTime) * time.Second
}
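A few illustrative inputs and their expected results (not part of the commit; the values are hypothetical):
// Illustration only: expected results of getMinTolerationTime for sample inputs.
sixty, five := int64(60), int64(5)
_ = getMinTolerationTime(nil)                                                                      // -1s: no entries, tolerate forever
_ = getMinTolerationTime([]v1.Toleration{{TolerationSeconds: nil}, {TolerationSeconds: &sixty}})   // 60s: nil entries are skipped
_ = getMinTolerationTime([]v1.Toleration{{TolerationSeconds: &sixty}, {TolerationSeconds: &five}}) // 5s: the minimum wins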
// NewNoExecuteTaintManager creates a new NoExecuteTaintManager that will use passed clientset to
// communicate with the API server.
func NewNoExecuteTaintManager(c clientset.Interface) *NoExecuteTaintManager {
return &NoExecuteTaintManager{
client: c,
taintEvictionQueue: CreateWorkerQueue(deletePodHandler(c)),
taintedNodes: make(map[string][]v1.Taint),
nodeUpdateChannel: make(chan *nodeUpdateItem, nodeUpdateChannelSize),
podUpdateChannel: make(chan *podUpdateItem, podUpdateChannelSize),
nodeUpdateQueue: workqueue.New(),
podUpdateQueue: workqueue.New(),
}
}
// Run starts the NoExecuteTaintManager which will run in a loop until `stopCh` is closed.
func (tc *NoExecuteTaintManager) Run(stopCh <-chan struct{}) {
// Functions that are responsible for taking work items out of the workqueues and putting them
// into channels.
go func(stopCh <-chan struct{}) {
for {
item, shutdown := tc.nodeUpdateQueue.Get()
if shutdown {
break
}
nodeUpdate := item.(*nodeUpdateItem)
select {
case <-stopCh:
return
case tc.nodeUpdateChannel <- nodeUpdate:
}
}
}(stopCh)
go func(stopCh <-chan struct{}) {
for {
item, shutdown := tc.podUpdateQueue.Get()
if shutdown {
break
}
podUpdate := item.(*podUpdateItem)
select {
case <-stopCh:
return
case tc.podUpdateChannel <- podUpdate:
}
}
}(stopCh)
// When processing events we want to prioritize Node updates over Pod updates,
// as NodeUpdates that interest the NoExecuteTaintManager should be handled as soon as possible -
// we don't want the user (or the system) to wait until the PodUpdate queue is drained before
// starting to evict Pods from tainted Nodes.
for {
select {
case <-stopCh:
return
case nodeUpdate := <-tc.nodeUpdateChannel:
tc.handleNodeUpdate(nodeUpdate)
case podUpdate := <-tc.podUpdateChannel:
// If we found a Pod update we need to empty Node queue first.
priority:
for {
select {
case nodeUpdate := <-tc.nodeUpdateChannel:
tc.handleNodeUpdate(nodeUpdate)
default:
break priority
}
}
// After Node queue is emptied we process podUpdate.
tc.handlePodUpdate(podUpdate)
}
}
}
// PodUpdated is used to notify NoExecuteTaintManager about Pod changes.
func (tc *NoExecuteTaintManager) PodUpdated(oldPod *v1.Pod, newPod *v1.Pod) {
var err error
oldTolerations := []v1.Toleration{}
if oldPod != nil {
oldTolerations, err = v1.GetPodTolerations(oldPod)
if err != nil {
glog.Errorf("Failed to get Tolerations from the old Pod: %v", err)
return
}
}
newTolerations := []v1.Toleration{}
if newPod != nil {
newTolerations, err = v1.GetPodTolerations(newPod)
if err != nil {
glog.Errorf("Failed to get Tolerations from the new Pod: %v", err)
return
}
}
if oldPod != nil && newPod != nil && api.Semantic.DeepEqual(oldTolerations, newTolerations) && oldPod.Spec.NodeName == newPod.Spec.NodeName {
return
}
updateItem := &podUpdateItem{
oldPod: oldPod,
newPod: newPod,
newTolerations: newTolerations,
}
tc.podUpdateQueue.Add(updateItemInterface(updateItem))
}
// NodeUpdated is used to notify NoExecuteTaintManager about Node changes.
func (tc *NoExecuteTaintManager) NodeUpdated(oldNode *v1.Node, newNode *v1.Node) {
var err error
oldTaints := []v1.Taint{}
if oldNode != nil {
oldTaints, err = v1.GetNodeTaints(oldNode)
if err != nil {
glog.Errorf("Failed to get Taints from the old Node: %v", err)
return
}
}
oldTaints = getNoExecuteTaints(oldTaints)
newTaints := []v1.Taint{}
if newNode != nil {
newTaints, err = v1.GetNodeTaints(newNode)
if err != nil {
glog.Errorf("Failed to get Taints from the new Node: %v", err)
return
}
}
newTaints = getNoExecuteTaints(newTaints)
if oldNode != nil && newNode != nil && api.Semantic.DeepEqual(oldTaints, newTaints) {
return
}
updateItem := &nodeUpdateItem{
oldNode: oldNode,
newNode: newNode,
newTaints: newTaints,
}
tc.nodeUpdateQueue.Add(updateItemInterface(updateItem))
}
func (tc *NoExecuteTaintManager) processPodOnNode(
podNamespacedName types.NamespacedName,
nodeName string,
tolerations []v1.Toleration,
taints []v1.Taint,
now time.Time,
) {
allTolerated, usedTolerations := v1.GetMatchingTolerations(taints, tolerations)
if !allTolerated {
glog.V(2).Infof("Not all taints are tolerated after upgrade for Pod %v on %v", podNamespacedName.String(), nodeName)
// We're canceling scheduled work (if any), as we're going to delete the Pod right away.
tc.taintEvictionQueue.CancelWork(podNamespacedName.String())
tc.taintEvictionQueue.AddWork(NewWorkArgs(podNamespacedName.Name, podNamespacedName.Namespace), time.Now(), time.Now())
return
}
minTolerationTime := getMinTolerationTime(usedTolerations)
// getMinTolerationTime returns a negative value to denote infinite toleration.
if minTolerationTime < 0 {
glog.V(4).Infof("New tolerations for %v tolerate forever. Scheduled deletion won't be cancelled if already scheduled.", podNamespacedName.String())
return
}
startTime := now
triggerTime := startTime.Add(minTolerationTime)
scheduledEviction := tc.taintEvictionQueue.GetWorkerUnsafe(podNamespacedName.String())
if scheduledEviction != nil {
startTime = scheduledEviction.CreatedAt
if startTime.Add(minTolerationTime).Before(triggerTime) {
return
} else {
tc.taintEvictionQueue.CancelWork(podNamespacedName.String())
}
}
tc.taintEvictionQueue.AddWork(NewWorkArgs(podNamespacedName.Name, podNamespacedName.Namespace), startTime, triggerTime)
}
func (tc *NoExecuteTaintManager) handlePodUpdate(podUpdate *podUpdateItem) {
// Delete
if podUpdate.newPod == nil {
pod := podUpdate.oldPod
podNamespacedName := types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}
glog.V(4).Infof("Noticed pod deletion: %v", podNamespacedName.String())
tc.taintEvictionQueue.CancelWork(podNamespacedName.String())
return
}
// Create or Update
pod := podUpdate.newPod
podNamespacedName := types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}
glog.V(4).Infof("Noticed pod update: %v", podNamespacedName.String())
nodeName := pod.Spec.NodeName
if nodeName == "" {
return
}
taints, ok := func() ([]v1.Taint, bool) {
tc.taintedNodesLock.Lock()
defer tc.taintedNodesLock.Unlock()
taints, ok := tc.taintedNodes[nodeName]
return taints, ok
}()
if !ok {
return
}
tc.processPodOnNode(podNamespacedName, nodeName, podUpdate.newTolerations, taints, time.Now())
}
func (tc *NoExecuteTaintManager) handleNodeUpdate(nodeUpdate *nodeUpdateItem) {
// Delete
if nodeUpdate.newNode == nil {
node := nodeUpdate.oldNode
glog.V(4).Infof("Noticed node deletion: %v", node.Name)
tc.taintedNodesLock.Lock()
defer tc.taintedNodesLock.Unlock()
delete(tc.taintedNodes, node.Name)
return
}
// Create or Update
glog.V(4).Infof("Noticed node update: %v", nodeUpdate)
node := nodeUpdate.newNode
taints := nodeUpdate.newTaints
func() {
tc.taintedNodesLock.Lock()
defer tc.taintedNodesLock.Unlock()
tc.taintedNodes[node.Name] = taints
}()
pods, err := getPodsAssignedToNode(tc.client, node.Name)
if err != nil {
glog.Errorf(err.Error())
return
}
if len(pods) == 0 {
return
}
if len(taints) == 0 {
glog.V(4).Infof("All taints were removed from the Node. Cancelling all evictions...")
for i := range pods {
tc.taintEvictionQueue.CancelWork(types.NamespacedName{Namespace: pods[i].Namespace, Name: pods[i].Name}.String())
}
return
}
now := time.Now()
for i := range pods {
pod := &pods[i]
podNamespacedName := types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}
tolerations, err := v1.GetPodTolerations(pod)
if err != nil {
glog.Errorf("Failed to get Tolerations from Pod %v: %v", podNamespacedName.String(), err)
continue
}
tc.processPodOnNode(podNamespacedName, node.Name, tolerations, taints, now)
}
}
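For orientation, a condensed sketch of how the NodeController hunks earlier in this commit drive this manager (informer plumbing elided; kubeClient and the node/pod variables are placeholders, wait.NeverStop is used as in the NodeController):
tm := NewNoExecuteTaintManager(kubeClient)
go tm.Run(wait.NeverStop)
// A nil argument marks an add (old == nil) or a delete (new == nil).
tm.NodeUpdated(nil, newNode)     // Node added
tm.NodeUpdated(oldNode, newNode) // Node updated
tm.NodeUpdated(oldNode, nil)     // Node deleted
tm.PodUpdated(nil, newPod)       // Pod added
tm.PodUpdated(oldPod, newPod)    // Pod updated
tm.PodUpdated(oldPod, nil)       // Pod deleted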

View File

@ -0,0 +1,492 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package node
import (
"encoding/json"
"fmt"
"testing"
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake"
"k8s.io/kubernetes/pkg/controller/node/testutil"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
func TestComputeTaintDifference(t *testing.T) {
testCases := []struct {
lhs []v1.Taint
rhs []v1.Taint
expectedDifference []v1.Taint
description string
}{
{
lhs: []v1.Taint{
{
Key: "one",
Value: "one",
},
{
Key: "two",
Value: "two",
},
},
rhs: []v1.Taint{
{
Key: "one",
Value: "one",
},
{
Key: "two",
Value: "two",
},
},
description: "Equal sets",
},
{
lhs: []v1.Taint{
{
Key: "one",
Value: "one",
},
},
expectedDifference: []v1.Taint{
{
Key: "one",
Value: "one",
},
},
description: "Right is empty",
},
{
rhs: []v1.Taint{
{
Key: "one",
Value: "one",
},
},
description: "Left is empty",
},
{
lhs: []v1.Taint{
{
Key: "one",
Value: "one",
},
{
Key: "two",
Value: "two",
},
},
rhs: []v1.Taint{
{
Key: "two",
Value: "two",
},
{
Key: "three",
Value: "three",
},
},
expectedDifference: []v1.Taint{
{
Key: "one",
Value: "one",
},
},
description: "Intersecting arrays",
},
}
for _, item := range testCases {
difference := computeTaintDifference(item.lhs, item.rhs)
if !api.Semantic.DeepEqual(difference, item.expectedDifference) {
t.Errorf("%v: difference in not what expected. Got %v, expected %v", item.description, difference, item.expectedDifference)
}
}
}
func createNoExecuteTaint(index int) v1.Taint {
return v1.Taint{
Key: "testTaint" + fmt.Sprintf("%v", index),
Value: "test" + fmt.Sprintf("%v", index),
Effect: v1.TaintEffectNoExecute,
TimeAdded: metav1.Now(),
}
}
func addToleration(pod *v1.Pod, index int, duration int64) *v1.Pod {
if pod.Annotations == nil {
pod.Annotations = map[string]string{}
}
if duration < 0 {
pod.Annotations["scheduler.alpha.kubernetes.io/tolerations"] = `
[
{
"key": "testTaint` + fmt.Sprintf("%v", index) + `",
"value": "test` + fmt.Sprintf("%v", index) + `",
"effect": "` + string(v1.TaintEffectNoExecute) + `"
}
]`
} else {
pod.Annotations["scheduler.alpha.kubernetes.io/tolerations"] = `
[
{
"key": "testTaint` + fmt.Sprintf("%v", index) + `",
"value": "test` + fmt.Sprintf("%v", index) + `",
"effect": "` + string(v1.TaintEffectNoExecute) + `",
"tolerationSeconds": ` + fmt.Sprintf("%v", duration) + `
}
]`
}
return pod
}
func TestCreatePod(t *testing.T) {
testCases := []struct {
description string
pod *v1.Pod
taintedNodes map[string][]v1.Taint
expectDelete bool
}{
{
description: "not scheduled - ignore",
pod: testutil.NewPod("pod1", ""),
taintedNodes: map[string][]v1.Taint{},
expectDelete: false,
},
{
description: "scheduled on untainted Node",
pod: testutil.NewPod("pod1", "node1"),
taintedNodes: map[string][]v1.Taint{},
expectDelete: false,
},
{
description: "schedule on tainted Node",
pod: testutil.NewPod("pod1", "node1"),
taintedNodes: map[string][]v1.Taint{
"node1": {createNoExecuteTaint(1)},
},
expectDelete: true,
},
{
description: "schedule on tainted Node with finite toleration",
pod: addToleration(testutil.NewPod("pod1", "node1"), 1, 100),
taintedNodes: map[string][]v1.Taint{
"node1": {createNoExecuteTaint(1)},
},
expectDelete: false,
},
{
description: "schedule on tainted Node with infinite toleration",
pod: addToleration(testutil.NewPod("pod1", "node1"), 1, -1),
taintedNodes: map[string][]v1.Taint{
"node1": {createNoExecuteTaint(1)},
},
expectDelete: false,
},
{
description: "schedule on tainted Node with infinite ivalid toleration",
pod: addToleration(testutil.NewPod("pod1", "node1"), 2, -1),
taintedNodes: map[string][]v1.Taint{
"node1": {createNoExecuteTaint(1)},
},
expectDelete: true,
},
}
for _, item := range testCases {
stopCh := make(chan struct{})
fakeClientset := fake.NewSimpleClientset()
controller := NewNoExecuteTaintManager(fakeClientset)
go controller.Run(stopCh)
controller.taintedNodes = item.taintedNodes
controller.PodUpdated(nil, item.pod)
// wait a bit
time.Sleep(200 * time.Millisecond)
podDeleted := false
for _, action := range fakeClientset.Actions() {
if action.GetVerb() == "delete" && action.GetResource().Resource == "pods" {
podDeleted = true
}
}
if podDeleted != item.expectDelete {
t.Errorf("%v: Unexepected test result. Expected delete %v, got %v", item.description, item.expectDelete, podDeleted)
}
close(stopCh)
}
}
func TestDeletePod(t *testing.T) {
stopCh := make(chan struct{})
fakeClientset := fake.NewSimpleClientset()
controller := NewNoExecuteTaintManager(fakeClientset)
go controller.Run(stopCh)
controller.taintedNodes = map[string][]v1.Taint{
"node1": {createNoExecuteTaint(1)},
}
controller.PodUpdated(testutil.NewPod("pod1", "node1"), nil)
// wait a bit to see if nothing will panic
time.Sleep(200 * time.Millisecond)
close(stopCh)
}
func TestUpdatePod(t *testing.T) {
testCases := []struct {
description string
prevPod *v1.Pod
newPod *v1.Pod
taintedNodes map[string][]v1.Taint
expectDelete bool
additionalSleep time.Duration
}{
{
description: "scheduling onto tainted Node",
prevPod: testutil.NewPod("pod1", ""),
newPod: testutil.NewPod("pod1", "node1"),
taintedNodes: map[string][]v1.Taint{
"node1": {createNoExecuteTaint(1)},
},
expectDelete: true,
},
{
description: "scheduling onto tainted Node with toleration",
prevPod: addToleration(testutil.NewPod("pod1", ""), 1, -1),
newPod: addToleration(testutil.NewPod("pod1", "node1"), 1, -1),
taintedNodes: map[string][]v1.Taint{
"node1": {createNoExecuteTaint(1)},
},
expectDelete: false,
},
{
description: "removing toleration",
prevPod: addToleration(testutil.NewPod("pod1", "node1"), 1, 100),
newPod: testutil.NewPod("pod1", "node1"),
taintedNodes: map[string][]v1.Taint{
"node1": {createNoExecuteTaint(1)},
},
expectDelete: true,
},
{
description: "lengthening toleration shouldn't work",
prevPod: addToleration(testutil.NewPod("pod1", "node1"), 1, 1),
newPod: addToleration(testutil.NewPod("pod1", "node1"), 1, 100),
taintedNodes: map[string][]v1.Taint{
"node1": {createNoExecuteTaint(1)},
},
expectDelete: true,
additionalSleep: 1500 * time.Millisecond,
},
}
for _, item := range testCases {
stopCh := make(chan struct{})
fakeClientset := fake.NewSimpleClientset()
controller := NewNoExecuteTaintManager(fakeClientset)
go controller.Run(stopCh)
controller.taintedNodes = item.taintedNodes
controller.PodUpdated(nil, item.prevPod)
fakeClientset.ClearActions()
time.Sleep(200 * time.Millisecond)
controller.PodUpdated(item.prevPod, item.newPod)
// wait a bit
time.Sleep(200 * time.Millisecond)
if item.additionalSleep > 0 {
time.Sleep(item.additionalSleep)
}
podDeleted := false
for _, action := range fakeClientset.Actions() {
if action.GetVerb() == "delete" && action.GetResource().Resource == "pods" {
podDeleted = true
}
}
if podDeleted != item.expectDelete {
t.Errorf("%v: Unexepected test result. Expected delete %v, got %v", item.description, item.expectDelete, podDeleted)
}
close(stopCh)
}
}
func addTaintsToNode(node *v1.Node, key, value string, indices []int) *v1.Node {
taints := []v1.Taint{}
for _, index := range indices {
taints = append(taints, createNoExecuteTaint(index))
}
taintsData, err := json.Marshal(taints)
if err != nil {
panic(err)
}
if node.Annotations == nil {
node.Annotations = make(map[string]string)
}
node.Annotations[v1.TaintsAnnotationKey] = string(taintsData)
return node
}
func TestCreateNode(t *testing.T) {
testCases := []struct {
description string
pods []v1.Pod
node *v1.Node
expectDelete bool
}{
{
description: "Creating Node maching already assigned Pod",
pods: []v1.Pod{
*testutil.NewPod("pod1", "node1"),
},
node: testutil.NewNode("node1"),
expectDelete: false,
},
{
description: "Creating tainted Node maching already assigned Pod",
pods: []v1.Pod{
*testutil.NewPod("pod1", "node1"),
},
node: addTaintsToNode(testutil.NewNode("node1"), "testTaint1", "taint1", []int{1}),
expectDelete: true,
},
{
description: "Creating tainted Node maching already assigned tolerating Pod",
pods: []v1.Pod{
*addToleration(testutil.NewPod("pod1", "node1"), 1, -1),
},
node: addTaintsToNode(testutil.NewNode("node1"), "testTaint1", "taint1", []int{1}),
expectDelete: false,
},
}
for _, item := range testCases {
stopCh := make(chan struct{})
fakeClientset := fake.NewSimpleClientset(&v1.PodList{Items: item.pods})
controller := NewNoExecuteTaintManager(fakeClientset)
go controller.Run(stopCh)
controller.NodeUpdated(nil, item.node)
// wait a bit
time.Sleep(200 * time.Millisecond)
podDeleted := false
for _, action := range fakeClientset.Actions() {
if action.GetVerb() == "delete" && action.GetResource().Resource == "pods" {
podDeleted = true
}
}
if podDeleted != item.expectDelete {
t.Errorf("%v: Unexepected test result. Expected delete %v, got %v", item.description, item.expectDelete, podDeleted)
}
close(stopCh)
}
}
func TestDeleteNode(t *testing.T) {
stopCh := make(chan struct{})
fakeClientset := fake.NewSimpleClientset()
controller := NewNoExecuteTaintManager(fakeClientset)
controller.taintedNodes = map[string][]v1.Taint{
"node1": {createNoExecuteTaint(1)},
}
go controller.Run(stopCh)
controller.NodeUpdated(testutil.NewNode("node1"), nil)
// wait a bit to see if nothing will panic
time.Sleep(200 * time.Millisecond)
controller.taintedNodesLock.Lock()
if _, ok := controller.taintedNodes["node1"]; ok {
t.Error("Node should have been deleted from taintedNodes list")
}
controller.taintedNodesLock.Unlock()
close(stopCh)
}
func TestUpdateNode(t *testing.T) {
testCases := []struct {
description string
pods []v1.Pod
oldNode *v1.Node
newNode *v1.Node
expectDelete bool
additionalSleep time.Duration
}{
{
description: "Added taint",
pods: []v1.Pod{
*testutil.NewPod("pod1", "node1"),
},
oldNode: testutil.NewNode("node1"),
newNode: addTaintsToNode(testutil.NewNode("node1"), "testTaint1", "taint1", []int{1}),
expectDelete: true,
},
{
description: "Added tolerated taint",
pods: []v1.Pod{
*addToleration(testutil.NewPod("pod1", "node1"), 1, 100),
},
oldNode: testutil.NewNode("node1"),
newNode: addTaintsToNode(testutil.NewNode("node1"), "testTaint1", "taint1", []int{1}),
expectDelete: false,
},
{
description: "Only one added taint tolerated",
pods: []v1.Pod{
*addToleration(testutil.NewPod("pod1", "node1"), 1, 100),
},
oldNode: testutil.NewNode("node1"),
newNode: addTaintsToNode(testutil.NewNode("node1"), "testTaint1", "taint1", []int{1, 2}),
expectDelete: true,
},
{
description: "Taint removed",
pods: []v1.Pod{
*addToleration(testutil.NewPod("pod1", "node1"), 1, 1),
},
oldNode: addTaintsToNode(testutil.NewNode("node1"), "testTaint1", "taint1", []int{1}),
newNode: testutil.NewNode("node1"),
expectDelete: false,
additionalSleep: 1500 * time.Millisecond,
},
}
for _, item := range testCases {
stopCh := make(chan struct{})
fakeClientset := fake.NewSimpleClientset(&v1.PodList{Items: item.pods})
controller := NewNoExecuteTaintManager(fakeClientset)
go controller.Run(stopCh)
controller.NodeUpdated(item.oldNode, item.newNode)
// wait a bit
time.Sleep(200 * time.Millisecond)
if item.additionalSleep > 0 {
time.Sleep(item.additionalSleep)
}
podDeleted := false
for _, action := range fakeClientset.Actions() {
if action.GetVerb() == "delete" && action.GetResource().Resource == "pods" {
podDeleted = true
}
}
if podDeleted != item.expectDelete {
t.Errorf("%v: Unexepected test result. Expected delete %v, got %v", item.description, item.expectDelete, podDeleted)
}
close(stopCh)
}
}

View File

@ -0,0 +1,138 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package node
import (
"sync"
"time"
"k8s.io/apimachinery/pkg/types"
"github.com/golang/glog"
)
// WorkArgs keeps arguments that will be passed to the function executed by the worker.
type WorkArgs struct {
NamespacedName types.NamespacedName
}
// KeyFromWorkArgs creates a key for the given `WorkArgs`
func (w *WorkArgs) KeyFromWorkArgs() string {
return w.NamespacedName.String()
}
// NewWorkArgs is a helper function to create new `WorkArgs`
func NewWorkArgs(name, namespace string) *WorkArgs {
return &WorkArgs{types.NamespacedName{Namespace: namespace, Name: name}}
}
// TimedWorker is responsible for executing a function no earlier than at FireAt time.
type TimedWorker struct {
WorkItem *WorkArgs
CreatedAt time.Time
FireAt time.Time
Timer *time.Timer
}
// CreateWorker creates a TimedWorker that will execute `f` not earlier than `fireAt`.
func CreateWorker(args *WorkArgs, createdAt time.Time, fireAt time.Time, f func(args *WorkArgs) error) *TimedWorker {
delay := fireAt.Sub(time.Now())
if delay <= 0 {
go f(args)
return nil
}
timer := time.AfterFunc(delay, func() { f(args) })
return &TimedWorker{
WorkItem: args,
CreatedAt: createdAt,
FireAt: fireAt,
Timer: timer,
}
}
// Cancel cancels the execution of the function by the `TimedWorker`.
func (w *TimedWorker) Cancel() {
if w != nil {
w.Timer.Stop()
}
}
// TimedWorkerQueue keeps a set of TimedWorkers that still wait for execution.
type TimedWorkerQueue struct {
sync.Mutex
workers map[string]*TimedWorker
workFunc func(args *WorkArgs) error
}
// CreateWorkerQueue creates a new TimedWorkerQueue for workers that will execute
// the given function `f`.
func CreateWorkerQueue(f func(args *WorkArgs) error) *TimedWorkerQueue {
return &TimedWorkerQueue{
workers: make(map[string]*TimedWorker),
workFunc: f,
}
}
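// getWrappedWorkerFunc wraps workFunc so that, once it has run, the queue entry for `key`
// is marked as processed (set to nil) on success, or removed on failure so the work can be re-added.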
func (q *TimedWorkerQueue) getWrappedWorkerFunc(key string) func(args *WorkArgs) error {
return func(args *WorkArgs) error {
err := q.workFunc(args)
q.Lock()
defer q.Unlock()
if err == nil {
q.workers[key] = nil
} else {
delete(q.workers, key)
}
return err
}
}
// AddWork adds a work item to the TimedWorkerQueue that will be executed not earlier than `fireAt`.
func (q *TimedWorkerQueue) AddWork(args *WorkArgs, createdAt time.Time, fireAt time.Time) {
key := args.KeyFromWorkArgs()
q.Lock()
defer q.Unlock()
if _, exists := q.workers[key]; exists {
glog.Warningf("Trying to add already existing work for %+v. Skipping.", args)
return
}
worker := CreateWorker(args, createdAt, fireAt, q.getWrappedWorkerFunc(key))
if worker == nil {
return
}
q.workers[key] = worker
}
// CancelWork removes scheduled function execution from the queue.
func (q *TimedWorkerQueue) CancelWork(key string) {
q.Lock()
defer q.Unlock()
worker, found := q.workers[key]
if found {
worker.Cancel()
delete(q.workers, key)
}
}
// GetWorkerUnsafe returns a TimedWorker corresponding to the given key.
// Unsafe method - workers have attached goroutines which can fire after this function is called.
func (q *TimedWorkerQueue) GetWorkerUnsafe(key string) *TimedWorker {
q.Lock()
defer q.Unlock()
return q.workers[key]
}
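A short usage sketch (not part of the commit) of the queue's contract: delayed execution keyed by namespace/name, duplicate keys ignored, pending work cancellable. The handler below is a stand-in, not the real eviction handler.
q := CreateWorkerQueue(func(args *WorkArgs) error {
	glog.Infof("would evict %v", args.NamespacedName) // stand-in handler
	return nil
})
now := time.Now()
q.AddWork(NewWorkArgs("pod1", "ns1"), now, now.Add(30*time.Second)) // fires in ~30s
q.AddWork(NewWorkArgs("pod1", "ns1"), now, now.Add(time.Minute))    // ignored: key already queued
q.CancelWork(NewWorkArgs("pod1", "ns1").KeyFromWorkArgs())          // stops the pending timer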

View File

@ -0,0 +1,140 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package node
import (
"sync"
"sync/atomic"
"testing"
"time"
)
func TestExecute(t *testing.T) {
testVal := int32(0)
wg := sync.WaitGroup{}
wg.Add(10)
queue := CreateWorkerQueue(func(args *WorkArgs) error {
atomic.AddInt32(&testVal, 1)
wg.Done()
return nil
})
now := time.Now()
queue.AddWork(NewWorkArgs("1", "1"), now, now)
queue.AddWork(NewWorkArgs("2", "2"), now, now)
queue.AddWork(NewWorkArgs("3", "3"), now, now)
queue.AddWork(NewWorkArgs("4", "4"), now, now)
queue.AddWork(NewWorkArgs("5", "5"), now, now)
queue.AddWork(NewWorkArgs("1", "1"), now, now)
queue.AddWork(NewWorkArgs("2", "2"), now, now)
queue.AddWork(NewWorkArgs("3", "3"), now, now)
queue.AddWork(NewWorkArgs("4", "4"), now, now)
queue.AddWork(NewWorkArgs("5", "5"), now, now)
wg.Wait()
lastVal := atomic.LoadInt32(&testVal)
if lastVal != 10 {
t.Errorf("Espected testVal = 10, got %v", lastVal)
}
}
func TestExecuteDelayed(t *testing.T) {
testVal := int32(0)
wg := sync.WaitGroup{}
wg.Add(5)
queue := CreateWorkerQueue(func(args *WorkArgs) error {
atomic.AddInt32(&testVal, 1)
wg.Done()
return nil
})
now := time.Now()
then := now.Add(time.Second)
queue.AddWork(NewWorkArgs("1", "1"), now, then)
queue.AddWork(NewWorkArgs("2", "2"), now, then)
queue.AddWork(NewWorkArgs("3", "3"), now, then)
queue.AddWork(NewWorkArgs("4", "4"), now, then)
queue.AddWork(NewWorkArgs("5", "5"), now, then)
queue.AddWork(NewWorkArgs("1", "1"), now, then)
queue.AddWork(NewWorkArgs("2", "2"), now, then)
queue.AddWork(NewWorkArgs("3", "3"), now, then)
queue.AddWork(NewWorkArgs("4", "4"), now, then)
queue.AddWork(NewWorkArgs("5", "5"), now, then)
wg.Wait()
lastVal := atomic.LoadInt32(&testVal)
if lastVal != 5 {
t.Errorf("Espected testVal = 5, got %v", lastVal)
}
}
func TestCancel(t *testing.T) {
testVal := int32(0)
wg := sync.WaitGroup{}
wg.Add(3)
queue := CreateWorkerQueue(func(args *WorkArgs) error {
atomic.AddInt32(&testVal, 1)
wg.Done()
return nil
})
now := time.Now()
then := now.Add(time.Second)
queue.AddWork(NewWorkArgs("1", "1"), now, then)
queue.AddWork(NewWorkArgs("2", "2"), now, then)
queue.AddWork(NewWorkArgs("3", "3"), now, then)
queue.AddWork(NewWorkArgs("4", "4"), now, then)
queue.AddWork(NewWorkArgs("5", "5"), now, then)
queue.AddWork(NewWorkArgs("1", "1"), now, then)
queue.AddWork(NewWorkArgs("2", "2"), now, then)
queue.AddWork(NewWorkArgs("3", "3"), now, then)
queue.AddWork(NewWorkArgs("4", "4"), now, then)
queue.AddWork(NewWorkArgs("5", "5"), now, then)
queue.CancelWork(NewWorkArgs("2", "2").KeyFromWorkArgs())
queue.CancelWork(NewWorkArgs("4", "4").KeyFromWorkArgs())
wg.Wait()
lastVal := atomic.LoadInt32(&testVal)
if lastVal != 3 {
t.Errorf("Espected testVal = 3, got %v", lastVal)
}
}
func TestCancelAndReadd(t *testing.T) {
testVal := int32(0)
wg := sync.WaitGroup{}
wg.Add(4)
queue := CreateWorkerQueue(func(args *WorkArgs) error {
atomic.AddInt32(&testVal, 1)
wg.Done()
return nil
})
now := time.Now()
then := now.Add(time.Second)
queue.AddWork(NewWorkArgs("1", "1"), now, then)
queue.AddWork(NewWorkArgs("2", "2"), now, then)
queue.AddWork(NewWorkArgs("3", "3"), now, then)
queue.AddWork(NewWorkArgs("4", "4"), now, then)
queue.AddWork(NewWorkArgs("5", "5"), now, then)
queue.AddWork(NewWorkArgs("1", "1"), now, then)
queue.AddWork(NewWorkArgs("2", "2"), now, then)
queue.AddWork(NewWorkArgs("3", "3"), now, then)
queue.AddWork(NewWorkArgs("4", "4"), now, then)
queue.AddWork(NewWorkArgs("5", "5"), now, then)
queue.CancelWork(NewWorkArgs("2", "2").KeyFromWorkArgs())
queue.CancelWork(NewWorkArgs("4", "4").KeyFromWorkArgs())
queue.AddWork(NewWorkArgs("2", "2"), now, then)
wg.Wait()
lastVal := atomic.LoadInt32(&testVal)
if lastVal != 4 {
t.Errorf("Espected testVal = 4, got %v", lastVal)
}
}

test/e2e/taints_test.go (new file, 335 lines)
View File

@ -0,0 +1,335 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package e2e
import (
"fmt"
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/cache"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
"k8s.io/kubernetes/test/e2e/framework"
testutils "k8s.io/kubernetes/test/utils"
. "github.com/onsi/ginkgo"
_ "github.com/stretchr/testify/assert"
)
func getTestTaint() v1.Taint {
return v1.Taint{
Key: "kubernetes.io/e2e-evict-taint-key",
Value: "evictTaintVal",
Effect: v1.TaintEffectNoExecute,
TimeAdded: metav1.Now(),
}
}
// Creates a default pod for this test, with an argument saying if the Pod should have
// a toleration for the Taints used in this test.
func createPodForTaintsTest(hasToleration bool, tolerationSeconds int, podName, ns string) *v1.Pod {
grace := int64(1)
if !hasToleration {
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: podName,
Namespace: ns,
Labels: map[string]string{"name": podName},
DeletionGracePeriodSeconds: &grace,
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "pause",
Image: "kubernetes/pause",
},
},
},
}
} else {
if tolerationSeconds <= 0 {
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: podName,
Namespace: ns,
Labels: map[string]string{"name": podName},
DeletionGracePeriodSeconds: &grace,
// default - tolerate forever
Annotations: map[string]string{
"scheduler.alpha.kubernetes.io/tolerations": `
[
{
"key": "kubernetes.io/e2e-evict-taint-key",
"value": "evictTaintVal",
"effect": "` + string(v1.TaintEffectNoExecute) + `"
}
]`,
},
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "pause",
Image: "kubernetes/pause",
},
},
},
}
} else {
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: podName,
Namespace: ns,
Labels: map[string]string{"name": podName},
DeletionGracePeriodSeconds: &grace,
// tolerate the taint only for tolerationSeconds
Annotations: map[string]string{
"scheduler.alpha.kubernetes.io/tolerations": `
[
{
"key": "kubernetes.io/e2e-evict-taint-key",
"value": "evictTaintVal",
"effect": "` + string(v1.TaintEffectNoExecute) + `",
"tolerationSeconds": ` + fmt.Sprintf("%v", tolerationSeconds) + `
}
]`,
},
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "pause",
Image: "kubernetes/pause",
},
},
},
}
}
}
}
// Creates and starts a controller (informer) that watches updates on a pod in the given namespace with the given name. It puts a new
// struct into the observedDeletions channel for every deletion it sees.
func createTestController(cs clientset.Interface, observedDeletions chan struct{}, stopCh chan struct{}, podName, ns string) {
_, controller := cache.NewInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
options.FieldSelector = fields.SelectorFromSet(fields.Set{"metadata.name": podName}).String()
obj, err := cs.Core().Pods(ns).List(options)
return runtime.Object(obj), err
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
options.FieldSelector = fields.SelectorFromSet(fields.Set{"metadata.name": podName}).String()
return cs.Core().Pods(ns).Watch(options)
},
},
&v1.Pod{},
0,
cache.ResourceEventHandlerFuncs{
DeleteFunc: func(oldObj interface{}) { observedDeletions <- struct{}{} },
},
)
framework.Logf("Starting informer...")
go controller.Run(stopCh)
}
// Tests the behavior of NoExecuteTaintManager. The following scenarios are included:
// - eviction of non-tolerating pods from a tainted node,
// - lack of eviction of tolerating pods from a tainted node,
// - delayed eviction of short-tolerating pod from a tainted node,
// - lack of eviction of short-tolerating pod after taint removal.
var _ = framework.KubeDescribe("NoExecuteTaintManager [Serial]", func() {
var cs clientset.Interface
var nodeList *v1.NodeList
var ns string
f := framework.NewDefaultFramework("taint-control")
BeforeEach(func() {
cs = f.ClientSet
ns = f.Namespace.Name
nodeList = &v1.NodeList{}
framework.WaitForAllNodesHealthy(cs, time.Minute)
err := framework.CheckTestingNSDeletedExcept(cs, ns)
framework.ExpectNoError(err)
})
// 1. Run a pod
// 2. Taint the node running this pod with a no-execute taint
// 3. See if pod will get evicted
It("evicts pods from tainted nodes", func() {
podName := "taint-eviction-1"
pod := createPodForTaintsTest(false, 0, podName, ns)
observedDeletions := make(chan struct{}, 100)
stopCh := make(chan struct{})
createTestController(cs, observedDeletions, stopCh, podName, ns)
By("Staring pod...")
nodeName, err := testutils.RunPodAndGetNodeName(cs, pod, 2*time.Minute)
framework.ExpectNoError(err)
framework.Logf("Pod is running on %v. Tainting Node", nodeName)
By("Trying to apply a taint on the Node")
testTaint := getTestTaint()
framework.AddOrUpdateTaintOnNode(cs, nodeName, testTaint)
framework.ExpectNodeHasTaint(cs, nodeName, testTaint)
defer framework.RemoveTaintOffNode(cs, nodeName, testTaint)
// Wait a bit
By("Waiting for Pod to be deleted")
timeoutChannel := time.NewTimer(10 * time.Second).C
select {
case <-timeoutChannel:
framework.Failf("Failed to evict Pod")
case <-observedDeletions:
framework.Logf("Noticed Pod eviction. Test successful")
}
})
// 1. Run a pod with toleration
// 2. Taint the node running this pod with a no-execute taint
// 3. See if pod won't get evicted
It("doesn't evict pod with tolerations from tainted nodes", func() {
podName := "taint-eviction-2"
pod := createPodForTaintsTest(true, 0, podName, ns)
observedDeletions := make(chan struct{}, 100)
stopCh := make(chan struct{})
createTestController(cs, observedDeletions, stopCh, podName, ns)
By("Staring pod...")
nodeName, err := testutils.RunPodAndGetNodeName(cs, pod, 2*time.Minute)
framework.ExpectNoError(err)
framework.Logf("Pod is running on %v. Tainting Node", nodeName)
By("Trying to apply a taint on the Node")
testTaint := getTestTaint()
framework.AddOrUpdateTaintOnNode(cs, nodeName, testTaint)
framework.ExpectNodeHasTaint(cs, nodeName, testTaint)
defer framework.RemoveTaintOffNode(cs, nodeName, testTaint)
// Wait a bit
By("Waiting for Pod to be deleted")
timeoutChannel := time.NewTimer(10 * time.Second).C
select {
case <-timeoutChannel:
framework.Logf("Pod wasn't evicted. Test successful")
case <-observedDeletions:
framework.Failf("Pod was evicted despite toleration")
}
})
// 1. Run a pod with a finite toleration
// 2. Taint the node running this pod with a no-execute taint
// 3. See if pod won't get evicted before toleration time runs out
// 4. See if pod will get evicted after toleration time runs out
It("eventually evict pod with finite tolerations from tainted nodes", func() {
podName := "taint-eviction-3"
pod := createPodForTaintsTest(true, 5, podName, ns)
observedDeletions := make(chan struct{}, 100)
stopCh := make(chan struct{})
createTestController(cs, observedDeletions, stopCh, podName, ns)
By("Staring pod...")
nodeName, err := testutils.RunPodAndGetNodeName(cs, pod, 2*time.Minute)
framework.ExpectNoError(err)
framework.Logf("Pod is running on %v. Tainting Node", nodeName)
By("Trying to apply a taint on the Node")
testTaint := getTestTaint()
framework.AddOrUpdateTaintOnNode(cs, nodeName, testTaint)
framework.ExpectNodeHasTaint(cs, nodeName, testTaint)
defer framework.RemoveTaintOffNode(cs, nodeName, testTaint)
// Wait a bit
By("Waiting to see if a Pod won't be deleted")
timeoutChannel := time.NewTimer(2 * time.Second).C
select {
case <-timeoutChannel:
framework.Logf("Pod wasn't evicted")
case <-observedDeletions:
framework.Failf("Pod was evicted despite toleration")
return
}
By("Waiting for Pod to be deleted")
timeoutChannel = time.NewTimer(10 * time.Second).C
select {
case <-timeoutChannel:
framework.Failf("Pod wasn't evicted")
case <-observedDeletions:
framework.Logf("Pod was evicted after toleration time run out. Test successful")
return
}
})
// 1. Run a pod with short toleration
// 2. Taint the node running this pod with a no-execute taint
// 3. Wait some time
// 4. Remove the taint
// 5. See if Pod won't be evicted.
It("removing taint cancels eviction", func() {
podName := "taint-eviction-4"
pod := createPodForTaintsTest(true, 5, podName, ns)
observedDeletions := make(chan struct{}, 100)
stopCh := make(chan struct{})
createTestController(cs, observedDeletions, stopCh, podName, ns)
By("Staring pod...")
nodeName, err := testutils.RunPodAndGetNodeName(cs, pod, 2*time.Minute)
framework.ExpectNoError(err)
framework.Logf("Pod is running on %v. Tainting Node", nodeName)
By("Trying to apply a taint on the Node")
testTaint := getTestTaint()
framework.AddOrUpdateTaintOnNode(cs, nodeName, testTaint)
framework.ExpectNodeHasTaint(cs, nodeName, testTaint)
taintRemoved := false
defer func() {
if !taintRemoved {
framework.RemoveTaintOffNode(cs, nodeName, testTaint)
}
}()
// Wait a bit
By("Waiting short time to make sure Pod is queued for deletion")
timeoutChannel := time.NewTimer(2 * time.Second).C
select {
case <-timeoutChannel:
framework.Logf("Pod wasn't evicted. Proceeding")
case <-observedDeletions:
framework.Failf("Pod was evicted despite toleration")
return
}
framework.Logf("Removing taint from Node")
framework.RemoveTaintOffNode(cs, nodeName, testTaint)
taintRemoved = true
By("Waiting some time to make sure that toleration time passed.")
timeoutChannel = time.NewTimer(10 * time.Second).C
select {
case <-timeoutChannel:
framework.Logf("Pod wasn't evicted. Test successful")
case <-observedDeletions:
framework.Failf("Pod was evicted despite toleration")
}
})
})

View File

@ -52,6 +52,46 @@ const (
nonExist = "NonExist"
)
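// WaitUntilPodIsScheduled polls the API server every 200ms until the named Pod has a non-empty Spec.NodeName, or fails after `timeout`.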
func WaitUntilPodIsScheduled(c clientset.Interface, name, namespace string, timeout time.Duration) (*v1.Pod, error) {
// Wait until it's scheduled
p, err := c.Core().Pods(namespace).Get(name, metav1.GetOptions{ResourceVersion: "0"})
if err == nil && p.Spec.NodeName != "" {
return p, nil
}
pollingPeriod := 200 * time.Millisecond
startTime := time.Now()
for startTime.Add(timeout).After(time.Now()) {
time.Sleep(pollingPeriod)
p, err := c.Core().Pods(namespace).Get(name, metav1.GetOptions{ResourceVersion: "0"})
if err == nil && p.Spec.NodeName != "" {
return p, nil
}
}
return nil, fmt.Errorf("Timed out after %v when waiting for pod %v/%v to start.", timeout, namespace, name)
}
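// RunPodAndGetNodeName creates the given Pod (retrying transient errors), waits until it is scheduled, and returns the name of the Node it landed on.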
func RunPodAndGetNodeName(c clientset.Interface, pod *v1.Pod, timeout time.Duration) (string, error) {
retries := 5
name := pod.Name
namespace := pod.Namespace
var err error
// Create a Pod
for i := 0; i < retries; i++ {
_, err = c.Core().Pods(namespace).Create(pod)
if err == nil || apierrs.IsAlreadyExists(err) {
break
}
}
if err != nil && !apierrs.IsAlreadyExists(err) {
return "", err
}
p, err := WaitUntilPodIsScheduled(c, name, namespace, timeout)
if err != nil {
return "", err
}
return p.Spec.NodeName, nil
}
type RunObjectConfig interface {
Run() error
GetName() string