Merge pull request #65288 from mbohlool/crd_flaky_test

Automatic merge from submit-queue (batch tested with PRs 64122, 64936, 65288, 65383). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

Consume watch event for all versions of CRD

The new test code added to fix the flaky test took the CRD version into account, but failed to consume the watch cache events for all versions.

Fixes #64571

@sttts @liggitt
pull/8/head
Kubernetes Submit Queue 2018-06-22 19:03:19 -07:00 committed by GitHub
commit 21912b396c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 47 additions and 30 deletions

View File

@ -258,35 +258,41 @@ func CreateNewCustomResourceDefinition(crd *apiextensionsv1beta1.CustomResourceD
// For this test, we'll actually cycle, "list/watch/create/delete" until we get an RV from list that observes the create and not an error.
// This way all the tests that are checking for watches don't have to worry about "RV too old" problems, because crazy things *could* happen
// beforehand, such as the created RV being too old to watch.
for _, version := range servedVersions(crd) {
err := wait.PollImmediate(500*time.Millisecond, 30*time.Second, func() (bool, error) {
return isWatchCachePrimed(crd, dynamicClientSet, version)
})
if err != nil {
return nil, err
}
err = wait.PollImmediate(500*time.Millisecond, 30*time.Second, func() (bool, error) {
return isWatchCachePrimed(crd, dynamicClientSet)
})
if err != nil {
return nil, err
}
return crd, nil
}
// resourceClientForVersion builds a dynamic resource client for the given CRD
// at the requested API version.
//
// The GroupVersionResource is assembled from the CRD's spec group, the supplied
// version, and the CRD's plural resource name. For a cluster-scoped CRD the
// returned client is not bound to a namespace and the namespace argument is
// unused; for any other scope the client is bound to namespace.
func resourceClientForVersion(crd *apiextensionsv1beta1.CustomResourceDefinition, dynamicClientSet dynamic.Interface, namespace, version string) dynamic.ResourceInterface {
	gvr := schema.GroupVersionResource{Group: crd.Spec.Group, Version: version, Resource: crd.Spec.Names.Plural}
	if crd.Spec.Scope == apiextensionsv1beta1.ClusterScoped {
		// Cluster-scoped resources are addressed without a namespace.
		return dynamicClientSet.Resource(gvr)
	}
	return dynamicClientSet.Resource(gvr).Namespace(namespace)
}
// isWatchCachePrimed returns true if the watch cache is primed for a specified version of the CRD.
func isWatchCachePrimed(crd *apiextensionsv1beta1.CustomResourceDefinition, dynamicClientSet dynamic.Interface, version string) (bool, error) {
func isWatchCachePrimed(crd *apiextensionsv1beta1.CustomResourceDefinition, dynamicClientSet dynamic.Interface) (bool, error) {
ns := ""
if crd.Spec.Scope != apiextensionsv1beta1.ClusterScoped {
ns = "aval"
}
gvr := schema.GroupVersionResource{Group: crd.Spec.Group, Version: version, Resource: crd.Spec.Names.Plural}
var resourceClient dynamic.ResourceInterface
if crd.Spec.Scope != apiextensionsv1beta1.ClusterScoped {
resourceClient = dynamicClientSet.Resource(gvr).Namespace(ns)
} else {
resourceClient = dynamicClientSet.Resource(gvr)
versions := servedVersions(crd)
if len(versions) == 0 {
return true, nil
}
resourceClient := resourceClientForVersion(crd, dynamicClientSet, ns, versions[0])
instanceName := "setup-instance"
instance := &unstructured.Unstructured{
Object: map[string]interface{}{
"apiVersion": crd.Spec.Group + "/" + version,
"apiVersion": crd.Spec.Group + "/" + versions[0],
"kind": crd.Spec.Names.Kind,
"metadata": map[string]interface{}{
"namespace": ns,
@ -309,24 +315,35 @@ func isWatchCachePrimed(crd *apiextensionsv1beta1.CustomResourceDefinition, dyna
return false, err
}
noxuWatch, err := resourceClient.Watch(metav1.ListOptions{ResourceVersion: createdInstance.GetResourceVersion()})
if err != nil {
return false, err
}
defer noxuWatch.Stop()
// Wait for the watch cache of all versions to be primed, and also make sure we have consumed the DELETE event for all
// versions so that any new watch with ResourceVersion=0 does not get those events. This is a source of some flaky tests.
// When a client creates a watch with resourceVersion=0, it will get an ADD event for any existing objects,
// but because it specified resourceVersion=0, there is no starting point in the cache buffer to return existing events
// from, thus the server will return anything from the current head of the cache to the end. By consuming the DELETE
// events for all versions here, we make sure that the head of the cache has moved past those events and they will not be
// delivered to any future watch with resourceVersion=0.
for _, v := range versions {
noxuWatch, err := resourceClientForVersion(crd, dynamicClientSet, ns, v).Watch(
metav1.ListOptions{ResourceVersion: createdInstance.GetResourceVersion()})
if err != nil {
return false, err
}
defer noxuWatch.Stop()
select {
case watchEvent := <-noxuWatch.ResultChan():
if watch.Error == watchEvent.Type {
return false, nil
select {
case watchEvent := <-noxuWatch.ResultChan():
if watch.Error == watchEvent.Type {
return false, nil
}
if watch.Deleted != watchEvent.Type {
return false, fmt.Errorf("expected DELETE, but got %#v", watchEvent)
}
case <-time.After(5 * time.Second):
return false, fmt.Errorf("gave up waiting for watch event")
}
if watch.Deleted != watchEvent.Type {
return false, fmt.Errorf("expected DELETE, but got %#v", watchEvent)
}
return true, nil
case <-time.After(5 * time.Second):
return false, fmt.Errorf("gave up waiting for watch event")
}
return true, nil
}
func DeleteCustomResourceDefinition(crd *apiextensionsv1beta1.CustomResourceDefinition, apiExtensionsClient clientset.Interface) error {