mirror of https://github.com/k3s-io/k3s
Fix waitForScheduler in scheduer predicates e2e tests
parent
355f576c0b
commit
2775a52e7a
|
@ -34,9 +34,11 @@ import (
|
|||
. "github.com/onsi/gomega"
|
||||
)
|
||||
|
||||
type Action func() error
|
||||
|
||||
// Returns true if a node update matching the predicate was emitted from the
|
||||
// system after performing the supplied action.
|
||||
func ObserveNodeUpdateAfterAction(f *framework.Framework, nodeName string, nodePredicate func(*v1.Node) bool, action func() error) (bool, error) {
|
||||
func ObserveNodeUpdateAfterAction(f *framework.Framework, nodeName string, nodePredicate func(*v1.Node) bool, action Action) (bool, error) {
|
||||
observedMatchingNode := false
|
||||
nodeSelector := fields.OneTermEqualSelector("metadata.name", nodeName)
|
||||
informerStartedChan := make(chan struct{})
|
||||
|
@ -94,7 +96,7 @@ func ObserveNodeUpdateAfterAction(f *framework.Framework, nodeName string, nodeP
|
|||
|
||||
// Returns true if an event matching the predicate was emitted from the system
|
||||
// after performing the supplied action.
|
||||
func ObserveEventAfterAction(f *framework.Framework, eventPredicate func(*v1.Event) bool, action func() error) (bool, error) {
|
||||
func ObserveEventAfterAction(f *framework.Framework, eventPredicate func(*v1.Event) bool, action Action) (bool, error) {
|
||||
observedMatchingEvent := false
|
||||
informerStartedChan := make(chan struct{})
|
||||
var informerStartedGuard sync.Once
|
||||
|
|
|
@ -27,6 +27,7 @@ import (
|
|||
"k8s.io/apimachinery/pkg/util/uuid"
|
||||
"k8s.io/kubernetes/pkg/api/v1"
|
||||
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
|
||||
"k8s.io/kubernetes/test/e2e/common"
|
||||
"k8s.io/kubernetes/test/e2e/framework"
|
||||
testutils "k8s.io/kubernetes/test/utils"
|
||||
|
||||
|
@ -133,11 +134,10 @@ var _ = framework.KubeDescribe("SchedulerPredicates [Serial]", func() {
|
|||
}), true, framework.Logf))
|
||||
}
|
||||
podName := "additional-pod"
|
||||
createPausePod(f, pausePodConfig{
|
||||
WaitForSchedulerAfterAction(f, createPausePodAction(f, pausePodConfig{
|
||||
Name: podName,
|
||||
Labels: map[string]string{"name": "additional"},
|
||||
})
|
||||
waitForScheduler()
|
||||
}), podName, false)
|
||||
verifyResult(cs, podsNeededForSaturation, 1, ns)
|
||||
})
|
||||
|
||||
|
@ -202,7 +202,7 @@ var _ = framework.KubeDescribe("SchedulerPredicates [Serial]", func() {
|
|||
}), true, framework.Logf))
|
||||
}
|
||||
podName := "additional-pod"
|
||||
createPausePod(f, pausePodConfig{
|
||||
conf := pausePodConfig{
|
||||
Name: podName,
|
||||
Labels: map[string]string{"name": "additional"},
|
||||
Resources: &v1.ResourceRequirements{
|
||||
|
@ -210,8 +210,8 @@ var _ = framework.KubeDescribe("SchedulerPredicates [Serial]", func() {
|
|||
"cpu": *resource.NewMilliQuantity(milliCpuPerPod, "DecimalSI"),
|
||||
},
|
||||
},
|
||||
})
|
||||
waitForScheduler()
|
||||
}
|
||||
WaitForSchedulerAfterAction(f, createPausePodAction(f, conf), podName, false)
|
||||
verifyResult(cs, podsNeededForSaturation, 1, ns)
|
||||
})
|
||||
|
||||
|
@ -223,22 +223,22 @@ var _ = framework.KubeDescribe("SchedulerPredicates [Serial]", func() {
|
|||
|
||||
framework.WaitForStableCluster(cs, masterNodes)
|
||||
|
||||
createPausePod(f, pausePodConfig{
|
||||
conf := pausePodConfig{
|
||||
Name: podName,
|
||||
Labels: map[string]string{"name": "restricted"},
|
||||
NodeSelector: map[string]string{
|
||||
"label": "nonempty",
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
waitForScheduler()
|
||||
WaitForSchedulerAfterAction(f, createPausePodAction(f, conf), podName, false)
|
||||
verifyResult(cs, 0, 1, ns)
|
||||
})
|
||||
|
||||
It("validates that a pod with an invalid NodeAffinity is rejected", func() {
|
||||
By("Trying to launch a pod with an invalid Affinity data.")
|
||||
podName := "without-label"
|
||||
_, err := cs.Core().Pods(ns).Create(initPausePod(f, pausePodConfig{
|
||||
_, err := cs.CoreV1().Pods(ns).Create(initPausePod(f, pausePodConfig{
|
||||
Name: podName,
|
||||
Affinity: &v1.Affinity{
|
||||
NodeAffinity: &v1.NodeAffinity{
|
||||
|
@ -256,9 +256,6 @@ var _ = framework.KubeDescribe("SchedulerPredicates [Serial]", func() {
|
|||
if err == nil || !errors.IsInvalid(err) {
|
||||
framework.Failf("Expect error of invalid, got : %v", err)
|
||||
}
|
||||
|
||||
// Wait a bit to allow scheduler to do its thing if the pod is not rejected.
|
||||
waitForScheduler()
|
||||
})
|
||||
|
||||
It("validates that NodeSelector is respected if matching [Conformance]", func() {
|
||||
|
@ -300,7 +297,7 @@ var _ = framework.KubeDescribe("SchedulerPredicates [Serial]", func() {
|
|||
|
||||
framework.WaitForStableCluster(cs, masterNodes)
|
||||
|
||||
createPausePod(f, pausePodConfig{
|
||||
conf := pausePodConfig{
|
||||
Name: podName,
|
||||
Affinity: &v1.Affinity{
|
||||
NodeAffinity: &v1.NodeAffinity{
|
||||
|
@ -328,8 +325,8 @@ var _ = framework.KubeDescribe("SchedulerPredicates [Serial]", func() {
|
|||
},
|
||||
},
|
||||
Labels: map[string]string{"name": "restricted"},
|
||||
})
|
||||
waitForScheduler()
|
||||
}
|
||||
WaitForSchedulerAfterAction(f, createPausePodAction(f, conf), podName, false)
|
||||
verifyResult(cs, 0, 1, ns)
|
||||
})
|
||||
|
||||
|
@ -378,7 +375,7 @@ var _ = framework.KubeDescribe("SchedulerPredicates [Serial]", func() {
|
|||
// already when the kubelet does not know about its new label yet. The
|
||||
// kubelet will then refuse to launch the pod.
|
||||
framework.ExpectNoError(framework.WaitForPodNotPending(cs, ns, labelPodName))
|
||||
labelPod, err := cs.Core().Pods(ns).Get(labelPodName, metav1.GetOptions{})
|
||||
labelPod, err := cs.CoreV1().Pods(ns).Get(labelPodName, metav1.GetOptions{})
|
||||
framework.ExpectNoError(err)
|
||||
Expect(labelPod.Spec.NodeName).To(Equal(nodeName))
|
||||
})
|
||||
|
@ -388,7 +385,7 @@ var _ = framework.KubeDescribe("SchedulerPredicates [Serial]", func() {
|
|||
It("validates that a pod with an invalid podAffinity is rejected because of the LabelSelectorRequirement is invalid", func() {
|
||||
By("Trying to launch a pod with an invalid pod Affinity data.")
|
||||
podName := "without-label-" + string(uuid.NewUUID())
|
||||
_, err := cs.Core().Pods(ns).Create(initPausePod(f, pausePodConfig{
|
||||
_, err := cs.CoreV1().Pods(ns).Create(initPausePod(f, pausePodConfig{
|
||||
Name: podName,
|
||||
Labels: map[string]string{"name": "without-label"},
|
||||
Affinity: &v1.Affinity{
|
||||
|
@ -414,9 +411,6 @@ var _ = framework.KubeDescribe("SchedulerPredicates [Serial]", func() {
|
|||
if err == nil || !errors.IsInvalid(err) {
|
||||
framework.Failf("Expect error of invalid, got : %v", err)
|
||||
}
|
||||
|
||||
// Wait a bit to allow scheduler to do its thing if the pod is not rejected.
|
||||
waitForScheduler()
|
||||
})
|
||||
|
||||
// Test Nodes does not have any pod, hence it should be impossible to schedule a Pod with pod affinity.
|
||||
|
@ -424,7 +418,7 @@ var _ = framework.KubeDescribe("SchedulerPredicates [Serial]", func() {
|
|||
By("Trying to schedule Pod with nonempty Pod Affinity.")
|
||||
framework.WaitForStableCluster(cs, masterNodes)
|
||||
podName := "without-label-" + string(uuid.NewUUID())
|
||||
createPausePod(f, pausePodConfig{
|
||||
conf := pausePodConfig{
|
||||
Name: podName,
|
||||
Affinity: &v1.Affinity{
|
||||
PodAffinity: &v1.PodAffinity{
|
||||
|
@ -444,9 +438,9 @@ var _ = framework.KubeDescribe("SchedulerPredicates [Serial]", func() {
|
|||
},
|
||||
},
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
waitForScheduler()
|
||||
WaitForSchedulerAfterAction(f, createPausePodAction(f, conf), podName, false)
|
||||
verifyResult(cs, 0, 1, ns)
|
||||
})
|
||||
|
||||
|
@ -492,7 +486,7 @@ var _ = framework.KubeDescribe("SchedulerPredicates [Serial]", func() {
|
|||
// already when the kubelet does not know about its new label yet. The
|
||||
// kubelet will then refuse to launch the pod.
|
||||
framework.ExpectNoError(framework.WaitForPodNotPending(cs, ns, labelPodName))
|
||||
labelPod, err := cs.Core().Pods(ns).Get(labelPodName, metav1.GetOptions{})
|
||||
labelPod, err := cs.CoreV1().Pods(ns).Get(labelPodName, metav1.GetOptions{})
|
||||
framework.ExpectNoError(err)
|
||||
Expect(labelPod.Spec.NodeName).To(Equal(nodeName))
|
||||
})
|
||||
|
@ -506,7 +500,7 @@ var _ = framework.KubeDescribe("SchedulerPredicates [Serial]", func() {
|
|||
By("Launching two pods on two distinct nodes to get two node names")
|
||||
CreateHostPortPods(f, "host-port", 2, true)
|
||||
defer framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, ns, "host-port")
|
||||
podList, err := cs.Core().Pods(ns).List(metav1.ListOptions{})
|
||||
podList, err := cs.CoreV1().Pods(ns).List(metav1.ListOptions{})
|
||||
framework.ExpectNoError(err)
|
||||
Expect(len(podList.Items)).To(Equal(2))
|
||||
nodeNames := []string{podList.Items[0].Spec.NodeName, podList.Items[1].Spec.NodeName}
|
||||
|
@ -532,7 +526,7 @@ var _ = framework.KubeDescribe("SchedulerPredicates [Serial]", func() {
|
|||
|
||||
By("Trying to launch another pod, now with podAntiAffinity with same Labels.")
|
||||
labelPodName := "with-podantiaffinity-" + string(uuid.NewUUID())
|
||||
createPausePod(f, pausePodConfig{
|
||||
conf := pausePodConfig{
|
||||
Name: labelPodName,
|
||||
Labels: map[string]string{"service": "Diff"},
|
||||
NodeSelector: map[string]string{k: v}, // only launch on our two nodes, contradicting the podAntiAffinity
|
||||
|
@ -555,9 +549,9 @@ var _ = framework.KubeDescribe("SchedulerPredicates [Serial]", func() {
|
|||
},
|
||||
},
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
waitForScheduler()
|
||||
WaitForSchedulerAfterAction(f, createPausePodAction(f, conf), labelPodName, false)
|
||||
verifyResult(cs, 3, 1, ns)
|
||||
})
|
||||
|
||||
|
@ -609,7 +603,7 @@ var _ = framework.KubeDescribe("SchedulerPredicates [Serial]", func() {
|
|||
// already when the kubelet does not know about its new label yet. The
|
||||
// kubelet will then refuse to launch the pod.
|
||||
framework.ExpectNoError(framework.WaitForPodNotPending(cs, ns, labelPodName))
|
||||
labelPod, err := cs.Core().Pods(ns).Get(labelPodName, metav1.GetOptions{})
|
||||
labelPod, err := cs.CoreV1().Pods(ns).Get(labelPodName, metav1.GetOptions{})
|
||||
framework.ExpectNoError(err)
|
||||
Expect(labelPod.Spec.NodeName).To(Equal(nodeName))
|
||||
})
|
||||
|
@ -732,18 +726,16 @@ var _ = framework.KubeDescribe("SchedulerPredicates [Serial]", func() {
|
|||
|
||||
By("Trying to relaunch the pod, still no tolerations.")
|
||||
podNameNoTolerations := "still-no-tolerations"
|
||||
createPausePod(f, pausePodConfig{
|
||||
conf := pausePodConfig{
|
||||
Name: podNameNoTolerations,
|
||||
NodeSelector: map[string]string{labelKey: labelValue},
|
||||
})
|
||||
}
|
||||
|
||||
waitForScheduler()
|
||||
WaitForSchedulerAfterAction(f, createPausePodAction(f, conf), podNameNoTolerations, false)
|
||||
verifyResult(cs, 0, 1, ns)
|
||||
|
||||
By("Removing taint off the node")
|
||||
framework.RemoveTaintOffNode(cs, nodeName, testTaint)
|
||||
|
||||
waitForScheduler()
|
||||
WaitForSchedulerAfterAction(f, removeTaintFromNodeAction(cs, nodeName, testTaint), podNameNoTolerations, true)
|
||||
verifyResult(cs, 1, 0, ns)
|
||||
})
|
||||
})
|
||||
|
@ -900,16 +892,38 @@ func getRequestedCPU(pod v1.Pod) int64 {
|
|||
return result
|
||||
}
|
||||
|
||||
func waitForScheduler() {
|
||||
// Wait a bit to allow scheduler to do its thing
|
||||
// TODO: this is brittle; there's no guarantee the scheduler will have run in 10 seconds.
|
||||
framework.Logf("Sleeping 10 seconds and crossing our fingers that scheduler will run in that time.")
|
||||
time.Sleep(10 * time.Second)
|
||||
// removeTaintFromNodeAction returns a closure that removes the given taint
|
||||
// from the given node upon invocation.
|
||||
func removeTaintFromNodeAction(cs clientset.Interface, nodeName string, testTaint v1.Taint) common.Action {
|
||||
return func() error {
|
||||
framework.RemoveTaintOffNode(cs, nodeName, testTaint)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// createPausePodAction returns a closure that creates a pause pod upon invocation.
|
||||
func createPausePodAction(f *framework.Framework, conf pausePodConfig) common.Action {
|
||||
return func() error {
|
||||
_, err := f.ClientSet.CoreV1().Pods(f.Namespace.Name).Create(initPausePod(f, conf))
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// WaitForSchedulerAfterAction performs the provided action and then waits for
|
||||
// scheduler to act on the given pod.
|
||||
func WaitForSchedulerAfterAction(f *framework.Framework, action common.Action, podName string, expectSuccess bool) {
|
||||
predicate := scheduleFailureEvent(podName)
|
||||
if expectSuccess {
|
||||
predicate = scheduleSuccessEvent(podName, "" /* any node */)
|
||||
}
|
||||
success, err := common.ObserveEventAfterAction(f, predicate, action)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(success).To(Equal(true))
|
||||
}
|
||||
|
||||
// TODO: upgrade calls in PodAffinity tests when we're able to run them
|
||||
func verifyResult(c clientset.Interface, expectedScheduled int, expectedNotScheduled int, ns string) {
|
||||
allPods, err := c.Core().Pods(ns).List(metav1.ListOptions{})
|
||||
allPods, err := c.CoreV1().Pods(ns).List(metav1.ListOptions{})
|
||||
framework.ExpectNoError(err)
|
||||
scheduledPods, notScheduledPods := framework.GetPodsScheduled(masterNodes, allPods)
|
||||
|
||||
|
|
Loading…
Reference in New Issue