From ba9410621fe048da1483cd1cc889e3b6db23e8dc Mon Sep 17 00:00:00 2001 From: Connor Doyle Date: Fri, 10 Mar 2017 11:10:59 -0800 Subject: [PATCH] Move node and event observer helpers to e2e/common --- test/e2e/common/BUILD | 4 + test/e2e/common/events.go | 144 ++++++++++++++++++++++++++++++++++++ test/e2e/opaque_resource.go | 137 +++------------------------------- 3 files changed, 159 insertions(+), 126 deletions(-) create mode 100644 test/e2e/common/events.go diff --git a/test/e2e/common/BUILD b/test/e2e/common/BUILD index baaf0072e7..d582fcbfe1 100644 --- a/test/e2e/common/BUILD +++ b/test/e2e/common/BUILD @@ -17,6 +17,7 @@ go_library( "downward_api.go", "downwardapi_volume.go", "empty_dir.go", + "events.go", "expansion.go", "host_path.go", "init_container.go", @@ -50,12 +51,15 @@ go_library( "//vendor:k8s.io/apimachinery/pkg/api/errors", "//vendor:k8s.io/apimachinery/pkg/api/resource", "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", + "//vendor:k8s.io/apimachinery/pkg/fields", "//vendor:k8s.io/apimachinery/pkg/labels", + "//vendor:k8s.io/apimachinery/pkg/runtime", "//vendor:k8s.io/apimachinery/pkg/util/intstr", "//vendor:k8s.io/apimachinery/pkg/util/sets", "//vendor:k8s.io/apimachinery/pkg/util/uuid", "//vendor:k8s.io/apimachinery/pkg/util/wait", "//vendor:k8s.io/apimachinery/pkg/watch", + "//vendor:k8s.io/client-go/tools/cache", ], ) diff --git a/test/e2e/common/events.go b/test/e2e/common/events.go new file mode 100644 index 0000000000..b8e03c0f56 --- /dev/null +++ b/test/e2e/common/events.go @@ -0,0 +1,144 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package common + +import ( + "fmt" + "sync" + "time" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/tools/cache" + "k8s.io/kubernetes/pkg/api/v1" + "k8s.io/kubernetes/test/e2e/framework" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +// Returns true if a node update matching the predicate was emitted from the +// system after performing the supplied action. +func ObserveNodeUpdateAfterAction(f *framework.Framework, nodeName string, nodePredicate func(*v1.Node) bool, action func() error) (bool, error) { + observedMatchingNode := false + nodeSelector := fields.OneTermEqualSelector("metadata.name", nodeName) + informerStartedChan := make(chan struct{}) + var informerStartedGuard sync.Once + + _, controller := cache.NewInformer( + &cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + options.FieldSelector = nodeSelector.String() + ls, err := f.ClientSet.Core().Nodes().List(options) + return ls, err + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + options.FieldSelector = nodeSelector.String() + w, err := f.ClientSet.Core().Nodes().Watch(options) + // Signal parent goroutine that watching has begun. + informerStartedGuard.Do(func() { close(informerStartedChan) }) + return w, err + }, + }, + &v1.Node{}, + 0, + cache.ResourceEventHandlerFuncs{ + UpdateFunc: func(oldObj, newObj interface{}) { + n, ok := newObj.(*v1.Node) + Expect(ok).To(Equal(true)) + if nodePredicate(n) { + observedMatchingNode = true + } + }, + }, + ) + + // Start the informer and block this goroutine waiting for the started signal. + informerStopChan := make(chan struct{}) + defer func() { close(informerStopChan) }() + go controller.Run(informerStopChan) + <-informerStartedChan + + // Invoke the action function. + err := action() + if err != nil { + return false, err + } + + // Poll whether the informer has found a matching node update with a timeout. + // Wait up 2 minutes polling every second. + timeout := 2 * time.Minute + interval := 1 * time.Second + err = wait.Poll(interval, timeout, func() (bool, error) { + return observedMatchingNode, nil + }) + return err == nil, err +} + +// Returns true if an event matching the predicate was emitted from the system +// after performing the supplied action. +func ObserveEventAfterAction(f *framework.Framework, eventPredicate func(*v1.Event) bool, action func() error) (bool, error) { + observedMatchingEvent := false + + // Create an informer to list/watch events from the test framework namespace. + _, controller := cache.NewInformer( + &cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + ls, err := f.ClientSet.Core().Events(f.Namespace.Name).List(options) + return ls, err + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + w, err := f.ClientSet.Core().Events(f.Namespace.Name).Watch(options) + return w, err + }, + }, + &v1.Event{}, + 0, + cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + e, ok := obj.(*v1.Event) + By(fmt.Sprintf("Considering event: \nType = [%s], Name = [%s], Reason = [%s], Message = [%s]", e.Type, e.Name, e.Reason, e.Message)) + Expect(ok).To(Equal(true)) + if ok && eventPredicate(e) { + observedMatchingEvent = true + } + }, + }, + ) + + informerStopChan := make(chan struct{}) + defer func() { close(informerStopChan) }() + go controller.Run(informerStopChan) + + // Invoke the action function. + err := action() + if err != nil { + return false, err + } + + // Poll whether the informer has found a matching event with a timeout. + // Wait up 2 minutes polling every second. + timeout := 2 * time.Minute + interval := 1 * time.Second + err = wait.Poll(interval, timeout, func() (bool, error) { + return observedMatchingEvent, nil + }) + return err == nil, err +} diff --git a/test/e2e/opaque_resource.go b/test/e2e/opaque_resource.go index 24bb8e3c01..7e4e4e64ae 100644 --- a/test/e2e/opaque_resource.go +++ b/test/e2e/opaque_resource.go @@ -19,19 +19,13 @@ package e2e import ( "fmt" "strings" - "sync" - "time" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/fields" - "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/apimachinery/pkg/watch" - "k8s.io/client-go/tools/cache" "k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/util/system" + "k8s.io/kubernetes/test/e2e/common" "k8s.io/kubernetes/test/e2e/framework" . "github.com/onsi/ginkgo" @@ -85,7 +79,7 @@ var _ = framework.KubeDescribe("Opaque resources [Feature:OpaqueResources]", fun // Here we don't check for the bound node name since it can land on // any one (this pod doesn't require any of the opaque resource.) predicate := scheduleSuccess(pod.Name, "") - success, err := observeEventAfterAction(f, predicate, action) + success, err := common.ObserveEventAfterAction(f, predicate, action) Expect(err).NotTo(HaveOccurred()) Expect(success).To(Equal(true)) }) @@ -110,7 +104,7 @@ var _ = framework.KubeDescribe("Opaque resources [Feature:OpaqueResources]", fun return err } predicate := scheduleSuccess(pod.Name, node.Name) - success, err := observeEventAfterAction(f, predicate, action) + success, err := common.ObserveEventAfterAction(f, predicate, action) Expect(err).NotTo(HaveOccurred()) Expect(success).To(Equal(true)) }) @@ -128,7 +122,7 @@ var _ = framework.KubeDescribe("Opaque resources [Feature:OpaqueResources]", fun return err } predicate := scheduleFailure("over-max-oir") - success, err := observeEventAfterAction(f, predicate, action) + success, err := common.ObserveEventAfterAction(f, predicate, action) Expect(err).NotTo(HaveOccurred()) Expect(success).To(Equal(true)) }) @@ -173,7 +167,7 @@ var _ = framework.KubeDescribe("Opaque resources [Feature:OpaqueResources]", fun return err } predicate := scheduleSuccess(pod.Name, node.Name) - success, err := observeEventAfterAction(f, predicate, action) + success, err := common.ObserveEventAfterAction(f, predicate, action) Expect(err).NotTo(HaveOccurred()) Expect(success).To(Equal(true)) @@ -213,7 +207,7 @@ var _ = framework.KubeDescribe("Opaque resources [Feature:OpaqueResources]", fun return err } predicate = scheduleFailure(pod.Name) - success, err = observeEventAfterAction(f, predicate, action) + success, err = common.ObserveEventAfterAction(f, predicate, action) Expect(err).NotTo(HaveOccurred()) Expect(success).To(Equal(true)) }) @@ -239,7 +233,7 @@ var _ = framework.KubeDescribe("Opaque resources [Feature:OpaqueResources]", fun return err } predicate := scheduleSuccess(pod1.Name, node.Name) - success, err := observeEventAfterAction(f, predicate, action) + success, err := common.ObserveEventAfterAction(f, predicate, action) Expect(err).NotTo(HaveOccurred()) Expect(success).To(Equal(true)) @@ -249,7 +243,7 @@ var _ = framework.KubeDescribe("Opaque resources [Feature:OpaqueResources]", fun return err } predicate = scheduleFailure(pod2.Name) - success, err = observeEventAfterAction(f, predicate, action) + success, err = common.ObserveEventAfterAction(f, predicate, action) Expect(err).NotTo(HaveOccurred()) Expect(success).To(Equal(true)) @@ -259,7 +253,7 @@ var _ = framework.KubeDescribe("Opaque resources [Feature:OpaqueResources]", fun return err } predicate = scheduleSuccess(pod2.Name, node.Name) - success, err = observeEventAfterAction(f, predicate, action) + success, err = common.ObserveEventAfterAction(f, predicate, action) Expect(err).NotTo(HaveOccurred()) Expect(success).To(Equal(true)) }) @@ -279,7 +273,7 @@ func addOpaqueResource(f *framework.Framework, nodeName string, opaqueResName v1 return foundCap && capacity.MilliValue() == int64(5000) && foundAlloc && allocatable.MilliValue() == int64(5000) } - success, err := observeNodeUpdateAfterAction(f, nodeName, predicate, action) + success, err := common.ObserveNodeUpdateAfterAction(f, nodeName, predicate, action) Expect(err).NotTo(HaveOccurred()) Expect(success).To(Equal(true)) } @@ -298,7 +292,7 @@ func removeOpaqueResource(f *framework.Framework, nodeName string, opaqueResName By(fmt.Sprintf("Node [%s] has OIR capacity: [%t] (%s), has OIR allocatable: [%t] (%s)", n.Name, foundCap, capacity.String(), foundAlloc, allocatable.String())) return (!foundCap || capacity.IsZero()) && (!foundAlloc || allocatable.IsZero()) } - success, err := observeNodeUpdateAfterAction(f, nodeName, predicate, action) + success, err := common.ObserveNodeUpdateAfterAction(f, nodeName, predicate, action) Expect(err).NotTo(HaveOccurred()) Expect(success).To(Equal(true)) } @@ -309,115 +303,6 @@ func escapeForJSONPatch(resName v1.ResourceName) string { return strings.Replace(string(resName), "/", "~1", -1) } -// Returns true if a node update matching the predicate was emitted from the -// system after performing the supplied action. -func observeNodeUpdateAfterAction(f *framework.Framework, nodeName string, nodePredicate func(*v1.Node) bool, action func() error) (bool, error) { - observedMatchingNode := false - nodeSelector := fields.OneTermEqualSelector("metadata.name", nodeName) - informerStartedChan := make(chan struct{}) - var informerStartedGuard sync.Once - - _, controller := cache.NewInformer( - &cache.ListWatch{ - ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { - options.FieldSelector = nodeSelector.String() - ls, err := f.ClientSet.Core().Nodes().List(options) - return ls, err - }, - WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - options.FieldSelector = nodeSelector.String() - w, err := f.ClientSet.Core().Nodes().Watch(options) - // Signal parent goroutine that watching has begun. - informerStartedGuard.Do(func() { close(informerStartedChan) }) - return w, err - }, - }, - &v1.Node{}, - 0, - cache.ResourceEventHandlerFuncs{ - UpdateFunc: func(oldObj, newObj interface{}) { - n, ok := newObj.(*v1.Node) - Expect(ok).To(Equal(true)) - if nodePredicate(n) { - observedMatchingNode = true - } - }, - }, - ) - - // Start the informer and block this goroutine waiting for the started signal. - informerStopChan := make(chan struct{}) - defer func() { close(informerStopChan) }() - go controller.Run(informerStopChan) - <-informerStartedChan - - // Invoke the action function. - err := action() - if err != nil { - return false, err - } - - // Poll whether the informer has found a matching node update with a timeout. - // Wait up 2 minutes polling every second. - timeout := 2 * time.Minute - interval := 1 * time.Second - err = wait.Poll(interval, timeout, func() (bool, error) { - return observedMatchingNode, nil - }) - return err == nil, err -} - -// Returns true if an event matching the predicate was emitted from the system -// after performing the supplied action. -func observeEventAfterAction(f *framework.Framework, eventPredicate func(*v1.Event) bool, action func() error) (bool, error) { - observedMatchingEvent := false - - // Create an informer to list/watch events from the test framework namespace. - _, controller := cache.NewInformer( - &cache.ListWatch{ - ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { - ls, err := f.ClientSet.Core().Events(f.Namespace.Name).List(options) - return ls, err - }, - WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - w, err := f.ClientSet.Core().Events(f.Namespace.Name).Watch(options) - return w, err - }, - }, - &v1.Event{}, - 0, - cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - e, ok := obj.(*v1.Event) - By(fmt.Sprintf("Considering event: \nType = [%s], Name = [%s], Reason = [%s], Message = [%s]", e.Type, e.Name, e.Reason, e.Message)) - Expect(ok).To(Equal(true)) - if ok && eventPredicate(e) { - observedMatchingEvent = true - } - }, - }, - ) - - informerStopChan := make(chan struct{}) - defer func() { close(informerStopChan) }() - go controller.Run(informerStopChan) - - // Invoke the action function. - err := action() - if err != nil { - return false, err - } - - // Poll whether the informer has found a matching event with a timeout. - // Wait up 2 minutes polling every second. - timeout := 2 * time.Minute - interval := 1 * time.Second - err = wait.Poll(interval, timeout, func() (bool, error) { - return observedMatchingEvent, nil - }) - return err == nil, err -} - func scheduleSuccess(podName, nodeName string) func(*v1.Event) bool { return func(e *v1.Event) bool { return e.Type == v1.EventTypeNormal &&