mirror of https://github.com/k3s-io/k3s
Add PV protection controller
parent
b99580ba3f
commit
3fee293607
|
@ -76,6 +76,7 @@ go_library(
|
|||
"//pkg/controller/volume/expand:go_default_library",
|
||||
"//pkg/controller/volume/persistentvolume:go_default_library",
|
||||
"//pkg/controller/volume/pvcprotection:go_default_library",
|
||||
"//pkg/controller/volume/pvprotection:go_default_library",
|
||||
"//pkg/features:go_default_library",
|
||||
"//pkg/quota/generic:go_default_library",
|
||||
"//pkg/quota/install:go_default_library",
|
||||
|
|
|
@ -384,6 +384,7 @@ func NewControllerInitializers(loopMode ControllerLoopMode) map[string]InitFunc
|
|||
controllers["persistentvolume-expander"] = startVolumeExpandController
|
||||
controllers["clusterrole-aggregation"] = startClusterRoleAggregrationController
|
||||
controllers["pvc-protection"] = startPVCProtectionController
|
||||
controllers["pv-protection"] = startPVProtectionController
|
||||
|
||||
return controllers
|
||||
}
|
||||
|
|
|
@ -55,6 +55,7 @@ import (
|
|||
"k8s.io/kubernetes/pkg/controller/volume/expand"
|
||||
persistentvolumecontroller "k8s.io/kubernetes/pkg/controller/volume/persistentvolume"
|
||||
"k8s.io/kubernetes/pkg/controller/volume/pvcprotection"
|
||||
"k8s.io/kubernetes/pkg/controller/volume/pvprotection"
|
||||
"k8s.io/kubernetes/pkg/features"
|
||||
"k8s.io/kubernetes/pkg/quota/generic"
|
||||
quotainstall "k8s.io/kubernetes/pkg/quota/install"
|
||||
|
@ -403,3 +404,14 @@ func startPVCProtectionController(ctx ControllerContext) (bool, error) {
|
|||
}
|
||||
return false, nil
|
||||
}
|
||||
|
||||
func startPVProtectionController(ctx ControllerContext) (bool, error) {
|
||||
if utilfeature.DefaultFeatureGate.Enabled(features.StorageProtection) {
|
||||
go pvprotection.NewPVProtectionController(
|
||||
ctx.InformerFactory.Core().V1().PersistentVolumes(),
|
||||
ctx.ClientBuilder.ClientOrDie("pv-protection-controller"),
|
||||
).Run(1, ctx.Stop)
|
||||
return true, nil
|
||||
}
|
||||
return false, nil
|
||||
}
|
||||
|
|
|
@ -136,6 +136,7 @@ filegroup(
|
|||
"//pkg/controller/volume/expand:all-srcs",
|
||||
"//pkg/controller/volume/persistentvolume:all-srcs",
|
||||
"//pkg/controller/volume/pvcprotection:all-srcs",
|
||||
"//pkg/controller/volume/pvprotection:all-srcs",
|
||||
],
|
||||
tags = ["automanaged"],
|
||||
)
|
||||
|
|
|
@ -0,0 +1,60 @@
|
|||
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
|
||||
|
||||
go_library(
|
||||
name = "go_default_library",
|
||||
srcs = ["pv_protection_controller.go"],
|
||||
importpath = "k8s.io/kubernetes/pkg/controller/volume/pvprotection",
|
||||
visibility = ["//visibility:public"],
|
||||
deps = [
|
||||
"//pkg/controller:go_default_library",
|
||||
"//pkg/util/metrics:go_default_library",
|
||||
"//pkg/util/slice:go_default_library",
|
||||
"//pkg/volume/util:go_default_library",
|
||||
"//vendor/github.com/golang/glog:go_default_library",
|
||||
"//vendor/k8s.io/api/core/v1:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
||||
"//vendor/k8s.io/client-go/informers/core/v1:go_default_library",
|
||||
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
|
||||
"//vendor/k8s.io/client-go/listers/core/v1:go_default_library",
|
||||
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
|
||||
"//vendor/k8s.io/client-go/util/workqueue:go_default_library",
|
||||
],
|
||||
)
|
||||
|
||||
filegroup(
|
||||
name = "package-srcs",
|
||||
srcs = glob(["**"]),
|
||||
tags = ["automanaged"],
|
||||
visibility = ["//visibility:private"],
|
||||
)
|
||||
|
||||
filegroup(
|
||||
name = "all-srcs",
|
||||
srcs = [":package-srcs"],
|
||||
tags = ["automanaged"],
|
||||
visibility = ["//visibility:public"],
|
||||
)
|
||||
|
||||
go_test(
|
||||
name = "go_default_test",
|
||||
srcs = ["pv_protection_controller_test.go"],
|
||||
embed = [":go_default_library"],
|
||||
importpath = "k8s.io/kubernetes/pkg/controller/volume/pvprotection",
|
||||
deps = [
|
||||
"//pkg/controller:go_default_library",
|
||||
"//pkg/volume/util:go_default_library",
|
||||
"//vendor/github.com/davecgh/go-spew/spew:go_default_library",
|
||||
"//vendor/github.com/golang/glog:go_default_library",
|
||||
"//vendor/k8s.io/api/core/v1:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/api/meta:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
|
||||
"//vendor/k8s.io/client-go/informers:go_default_library",
|
||||
"//vendor/k8s.io/client-go/kubernetes/fake:go_default_library",
|
||||
"//vendor/k8s.io/client-go/testing:go_default_library",
|
||||
],
|
||||
)
|
|
@ -0,0 +1,208 @@
|
|||
/*
|
||||
Copyright 2018 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package pvprotection
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/golang/glog"
|
||||
"k8s.io/api/core/v1"
|
||||
apierrs "k8s.io/apimachinery/pkg/api/errors"
|
||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
coreinformers "k8s.io/client-go/informers/core/v1"
|
||||
clientset "k8s.io/client-go/kubernetes"
|
||||
corelisters "k8s.io/client-go/listers/core/v1"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
"k8s.io/client-go/util/workqueue"
|
||||
"k8s.io/kubernetes/pkg/controller"
|
||||
"k8s.io/kubernetes/pkg/util/metrics"
|
||||
"k8s.io/kubernetes/pkg/util/slice"
|
||||
volumeutil "k8s.io/kubernetes/pkg/volume/util"
|
||||
)
|
||||
|
||||
// Controller is controller that removes PVProtectionFinalizer
|
||||
// from PVs that are not bound to PVCs.
|
||||
type Controller struct {
|
||||
client clientset.Interface
|
||||
|
||||
pvLister corelisters.PersistentVolumeLister
|
||||
pvListerSynced cache.InformerSynced
|
||||
|
||||
queue workqueue.RateLimitingInterface
|
||||
}
|
||||
|
||||
// NewPVProtectionController returns a new *Controller.
|
||||
func NewPVProtectionController(pvInformer coreinformers.PersistentVolumeInformer, cl clientset.Interface) *Controller {
|
||||
e := &Controller{
|
||||
client: cl,
|
||||
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "pvprotection"),
|
||||
}
|
||||
if cl != nil && cl.CoreV1().RESTClient().GetRateLimiter() != nil {
|
||||
metrics.RegisterMetricAndTrackRateLimiterUsage("persistentvolume_protection_controller", cl.CoreV1().RESTClient().GetRateLimiter())
|
||||
}
|
||||
|
||||
e.pvLister = pvInformer.Lister()
|
||||
e.pvListerSynced = pvInformer.Informer().HasSynced
|
||||
pvInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: e.pvAddedUpdated,
|
||||
UpdateFunc: func(old, new interface{}) {
|
||||
e.pvAddedUpdated(new)
|
||||
},
|
||||
})
|
||||
|
||||
return e
|
||||
}
|
||||
|
||||
// Run runs the controller goroutines.
|
||||
func (c *Controller) Run(workers int, stopCh <-chan struct{}) {
|
||||
defer utilruntime.HandleCrash()
|
||||
defer c.queue.ShutDown()
|
||||
|
||||
glog.Infof("Starting PV protection controller")
|
||||
defer glog.Infof("Shutting down PV protection controller")
|
||||
|
||||
if !controller.WaitForCacheSync("PV protection", stopCh, c.pvListerSynced) {
|
||||
return
|
||||
}
|
||||
|
||||
for i := 0; i < workers; i++ {
|
||||
go wait.Until(c.runWorker, time.Second, stopCh)
|
||||
}
|
||||
|
||||
<-stopCh
|
||||
}
|
||||
|
||||
func (c *Controller) runWorker() {
|
||||
for c.processNextWorkItem() {
|
||||
}
|
||||
}
|
||||
|
||||
// processNextWorkItem deals with one pvcKey off the queue. It returns false when it's time to quit.
|
||||
func (c *Controller) processNextWorkItem() bool {
|
||||
pvKey, quit := c.queue.Get()
|
||||
if quit {
|
||||
return false
|
||||
}
|
||||
defer c.queue.Done(pvKey)
|
||||
|
||||
pvName := pvKey.(string)
|
||||
|
||||
err := c.processPV(pvName)
|
||||
if err == nil {
|
||||
c.queue.Forget(pvKey)
|
||||
return true
|
||||
}
|
||||
|
||||
utilruntime.HandleError(fmt.Errorf("PV %v failed with : %v", pvKey, err))
|
||||
c.queue.AddRateLimited(pvKey)
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
func (c *Controller) processPV(pvName string) error {
|
||||
glog.V(4).Infof("Processing PV %s", pvName)
|
||||
startTime := time.Now()
|
||||
defer func() {
|
||||
glog.V(4).Infof("Finished processing PV %s (%v)", pvName, time.Now().Sub(startTime))
|
||||
}()
|
||||
|
||||
pv, err := c.pvLister.Get(pvName)
|
||||
if apierrs.IsNotFound(err) {
|
||||
glog.V(4).Infof("PV %s not found, ignoring", pvName)
|
||||
return nil
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if isDeletionCandidate(pv) {
|
||||
// PV should be deleted. Check if it's used and remove finalizer if
|
||||
// it's not.
|
||||
isUsed := c.isBeingUsed(pv)
|
||||
if !isUsed {
|
||||
return c.removeFinalizer(pv)
|
||||
}
|
||||
}
|
||||
|
||||
if needToAddFinalizer(pv) {
|
||||
// PV is not being deleted -> it should have the finalizer. The
|
||||
// finalizer should be added by admission plugin, this is just to add
|
||||
// the finalizer to old PVs that were created before the admission
|
||||
// plugin was enabled.
|
||||
return c.addFinalizer(pv)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Controller) addFinalizer(pv *v1.PersistentVolume) error {
|
||||
pvClone := pv.DeepCopy()
|
||||
pvClone.ObjectMeta.Finalizers = append(pvClone.ObjectMeta.Finalizers, volumeutil.PVProtectionFinalizer)
|
||||
_, err := c.client.CoreV1().PersistentVolumes().Update(pvClone)
|
||||
if err != nil {
|
||||
glog.V(3).Infof("Error adding protection finalizer to PV %s: %v", pv.Name)
|
||||
return err
|
||||
}
|
||||
glog.V(3).Infof("Added protection finalizer to PV %s", pv.Name)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Controller) removeFinalizer(pv *v1.PersistentVolume) error {
|
||||
pvClone := pv.DeepCopy()
|
||||
pvClone.ObjectMeta.Finalizers = slice.RemoveString(pvClone.ObjectMeta.Finalizers, volumeutil.PVProtectionFinalizer, nil)
|
||||
_, err := c.client.CoreV1().PersistentVolumes().Update(pvClone)
|
||||
if err != nil {
|
||||
glog.V(3).Infof("Error removing protection finalizer from PV %s: %v", pv.Name, err)
|
||||
return err
|
||||
}
|
||||
glog.V(3).Infof("Removed protection finalizer from PV %s", pv.Name)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Controller) isBeingUsed(pv *v1.PersistentVolume) bool {
|
||||
// check if PV is being bound to a PVC by its status
|
||||
// the status will be updated by PV controller
|
||||
if pv.Status.Phase == v1.VolumeBound {
|
||||
// the PV is being used now
|
||||
return true
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
// pvAddedUpdated reacts to pv added/updated events
|
||||
func (c *Controller) pvAddedUpdated(obj interface{}) {
|
||||
pv, ok := obj.(*v1.PersistentVolume)
|
||||
if !ok {
|
||||
utilruntime.HandleError(fmt.Errorf("PV informer returned non-PV object: %#v", obj))
|
||||
return
|
||||
}
|
||||
glog.V(4).Infof("Got event on PV %s", pv.Name)
|
||||
|
||||
if needToAddFinalizer(pv) || isDeletionCandidate(pv) {
|
||||
c.queue.Add(pv.Name)
|
||||
}
|
||||
}
|
||||
|
||||
func isDeletionCandidate(pv *v1.PersistentVolume) bool {
|
||||
return pv.ObjectMeta.DeletionTimestamp != nil && slice.ContainsString(pv.ObjectMeta.Finalizers, volumeutil.PVProtectionFinalizer, nil)
|
||||
}
|
||||
|
||||
func needToAddFinalizer(pv *v1.PersistentVolume) bool {
|
||||
return pv.ObjectMeta.DeletionTimestamp == nil && !slice.ContainsString(pv.ObjectMeta.Finalizers, volumeutil.PVProtectionFinalizer, nil)
|
||||
}
|
|
@ -0,0 +1,257 @@
|
|||
/*
|
||||
Copyright 2018 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package pvprotection
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/davecgh/go-spew/spew"
|
||||
"github.com/golang/glog"
|
||||
|
||||
"k8s.io/api/core/v1"
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
"k8s.io/apimachinery/pkg/api/meta"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/client-go/informers"
|
||||
"k8s.io/client-go/kubernetes/fake"
|
||||
clienttesting "k8s.io/client-go/testing"
|
||||
"k8s.io/kubernetes/pkg/controller"
|
||||
volumeutil "k8s.io/kubernetes/pkg/volume/util"
|
||||
)
|
||||
|
||||
const defaultPVName = "default-pv"
|
||||
|
||||
type reaction struct {
|
||||
verb string
|
||||
resource string
|
||||
reactorfn clienttesting.ReactionFunc
|
||||
}
|
||||
|
||||
func pv() *v1.PersistentVolume {
|
||||
return &v1.PersistentVolume{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: defaultPVName,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func boundPV() *v1.PersistentVolume {
|
||||
return &v1.PersistentVolume{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: defaultPVName,
|
||||
},
|
||||
Status: v1.PersistentVolumeStatus{
|
||||
Phase: v1.VolumeBound,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func withProtectionFinalizer(pv *v1.PersistentVolume) *v1.PersistentVolume {
|
||||
pv.Finalizers = append(pv.Finalizers, volumeutil.PVProtectionFinalizer)
|
||||
return pv
|
||||
}
|
||||
|
||||
func generateUpdateErrorFunc(t *testing.T, failures int) clienttesting.ReactionFunc {
|
||||
i := 0
|
||||
return func(action clienttesting.Action) (bool, runtime.Object, error) {
|
||||
i++
|
||||
if i <= failures {
|
||||
// Update fails
|
||||
update, ok := action.(clienttesting.UpdateAction)
|
||||
|
||||
if !ok {
|
||||
t.Fatalf("Reactor got non-update action: %+v", action)
|
||||
}
|
||||
acc, _ := meta.Accessor(update.GetObject())
|
||||
return true, nil, apierrors.NewForbidden(update.GetResource().GroupResource(), acc.GetName(), errors.New("Mock error"))
|
||||
}
|
||||
// Update succeeds
|
||||
return false, nil, nil
|
||||
}
|
||||
}
|
||||
|
||||
func deleted(pv *v1.PersistentVolume) *v1.PersistentVolume {
|
||||
pv.DeletionTimestamp = &metav1.Time{}
|
||||
return pv
|
||||
}
|
||||
|
||||
func TestPVProtectionController(t *testing.T) {
|
||||
pvVer := schema.GroupVersionResource{
|
||||
Group: v1.GroupName,
|
||||
Version: "v1",
|
||||
Resource: "persistentvolumes",
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
// Object to insert into fake kubeclient before the test starts.
|
||||
initialObjects []runtime.Object
|
||||
// Optional client reactors.
|
||||
reactors []reaction
|
||||
// PV event to simulate. This PV will be automatically added to
|
||||
// initalObjects.
|
||||
updatedPV *v1.PersistentVolume
|
||||
// List of expected kubeclient actions that should happen during the
|
||||
// test.
|
||||
expectedActions []clienttesting.Action
|
||||
}{
|
||||
// PV events
|
||||
//
|
||||
{
|
||||
name: "PV without finalizer -> finalizer is added",
|
||||
updatedPV: pv(),
|
||||
expectedActions: []clienttesting.Action{
|
||||
clienttesting.NewUpdateAction(pvVer, "", withProtectionFinalizer(pv())),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "PVC with finalizer -> no action",
|
||||
updatedPV: withProtectionFinalizer(pv()),
|
||||
expectedActions: []clienttesting.Action{},
|
||||
},
|
||||
{
|
||||
name: "saving PVC finalizer fails -> controller retries",
|
||||
updatedPV: pv(),
|
||||
reactors: []reaction{
|
||||
{
|
||||
verb: "update",
|
||||
resource: "persistentvolumes",
|
||||
reactorfn: generateUpdateErrorFunc(t, 2 /* update fails twice*/),
|
||||
},
|
||||
},
|
||||
expectedActions: []clienttesting.Action{
|
||||
// This fails
|
||||
clienttesting.NewUpdateAction(pvVer, "", withProtectionFinalizer(pv())),
|
||||
// This fails too
|
||||
clienttesting.NewUpdateAction(pvVer, "", withProtectionFinalizer(pv())),
|
||||
// This succeeds
|
||||
clienttesting.NewUpdateAction(pvVer, "", withProtectionFinalizer(pv())),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "deleted PV with finalizer -> finalizer is removed",
|
||||
updatedPV: deleted(withProtectionFinalizer(pv())),
|
||||
expectedActions: []clienttesting.Action{
|
||||
clienttesting.NewUpdateAction(pvVer, "", deleted(pv())),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "finalizer removal fails -> controller retries",
|
||||
updatedPV: deleted(withProtectionFinalizer(pv())),
|
||||
reactors: []reaction{
|
||||
{
|
||||
verb: "update",
|
||||
resource: "persistentvolumes",
|
||||
reactorfn: generateUpdateErrorFunc(t, 2 /* update fails twice*/),
|
||||
},
|
||||
},
|
||||
expectedActions: []clienttesting.Action{
|
||||
// Fails
|
||||
clienttesting.NewUpdateAction(pvVer, "", deleted(pv())),
|
||||
// Fails too
|
||||
clienttesting.NewUpdateAction(pvVer, "", deleted(pv())),
|
||||
// Succeeds
|
||||
clienttesting.NewUpdateAction(pvVer, "", deleted(pv())),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "deleted PVC with finalizer + PV is bound -> finalizer is not removed",
|
||||
updatedPV: deleted(withProtectionFinalizer(boundPV())),
|
||||
expectedActions: []clienttesting.Action{},
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
// Create client with initial data
|
||||
objs := test.initialObjects
|
||||
if test.updatedPV != nil {
|
||||
objs = append(objs, test.updatedPV)
|
||||
}
|
||||
|
||||
client := fake.NewSimpleClientset(objs...)
|
||||
|
||||
// Create informers
|
||||
informers := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
|
||||
pvInformer := informers.Core().V1().PersistentVolumes()
|
||||
|
||||
// Populate the informers with initial objects so the controller can
|
||||
// Get() it.
|
||||
for _, obj := range objs {
|
||||
switch obj.(type) {
|
||||
case *v1.PersistentVolume:
|
||||
pvInformer.Informer().GetStore().Add(obj)
|
||||
default:
|
||||
t.Fatalf("Unknown initalObject type: %+v", obj)
|
||||
}
|
||||
}
|
||||
|
||||
// Add reactor to inject test errors.
|
||||
for _, reactor := range test.reactors {
|
||||
client.Fake.PrependReactor(reactor.verb, reactor.resource, reactor.reactorfn)
|
||||
}
|
||||
|
||||
// Create the controller
|
||||
ctrl := NewPVProtectionController(pvInformer, client)
|
||||
|
||||
// Start the test by simulating an event
|
||||
if test.updatedPV != nil {
|
||||
ctrl.pvAddedUpdated(test.updatedPV)
|
||||
}
|
||||
|
||||
// Process the controller queue until we get expected results
|
||||
timeout := time.Now().Add(10 * time.Second)
|
||||
lastReportedActionCount := 0
|
||||
for {
|
||||
if time.Now().After(timeout) {
|
||||
t.Errorf("Test %q: timed out", test.name)
|
||||
break
|
||||
}
|
||||
if ctrl.queue.Len() > 0 {
|
||||
glog.V(5).Infof("Test %q: %d events queue, processing one", test.name, ctrl.queue.Len())
|
||||
ctrl.processNextWorkItem()
|
||||
}
|
||||
if ctrl.queue.Len() > 0 {
|
||||
// There is still some work in the queue, process it now
|
||||
continue
|
||||
}
|
||||
currentActionCount := len(client.Actions())
|
||||
if currentActionCount < len(test.expectedActions) {
|
||||
// Do not log evey wait, only when the action count changes.
|
||||
if lastReportedActionCount < currentActionCount {
|
||||
glog.V(5).Infof("Test %q: got %d actions out of %d, waiting for the rest", test.name, currentActionCount, len(test.expectedActions))
|
||||
lastReportedActionCount = currentActionCount
|
||||
}
|
||||
// The test expected more to happen, wait for the actions.
|
||||
// Most probably it's exponential backoff
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
continue
|
||||
}
|
||||
break
|
||||
}
|
||||
actions := client.Actions()
|
||||
|
||||
if !reflect.DeepEqual(actions, test.expectedActions) {
|
||||
t.Errorf("Test %q: action not expected\nExpected:\n%s\ngot:\n%s", test.name, spew.Sdump(test.expectedActions), spew.Sdump(actions))
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue