diff --git a/staging/src/k8s.io/kube-apiextensions-server/pkg/apiserver/customresource_handler.go b/staging/src/k8s.io/kube-apiextensions-server/pkg/apiserver/customresource_handler.go index 435d21c4f6..9c82edd09b 100644 --- a/staging/src/k8s.io/kube-apiextensions-server/pkg/apiserver/customresource_handler.go +++ b/staging/src/k8s.io/kube-apiextensions-server/pkg/apiserver/customresource_handler.go @@ -275,17 +275,6 @@ func (r *crdHandler) getServingInfoFor(crd *apiextensions.CustomResourceDefiniti r.restOptionsGetter, ) - // When new REST storage is created, the storage cacher for the CR starts asynchronously. - // REST API operations return like list use the RV of etcd, but the storage cacher's reflector's list - // can get a different RV because etcd can be touched in between the initial list operation (if that's what you're doing first) - // and the storage cache reflector starting. - // Later, you can issue a watch with the REST apis list.RV and end up earlier than the storage cacher. - // The time window is really narrow, but it can happen. The simplest "solution" is to wait - // briefly for the storage cache to start before we return out new storage so its more likely that we'll have valid - // resource versions for the watch cache. We don't expose cache status outside of the caching layer - // so I can't think of way to determine it reliably. - time.Sleep(1 * time.Second) - parameterScheme := runtime.NewScheme() parameterScheme.AddUnversionedTypes(schema.GroupVersion{Group: crd.Spec.Group, Version: crd.Spec.Version}, &metav1.ListOptions{}, diff --git a/staging/src/k8s.io/kube-apiextensions-server/test/integration/testserver/BUILD b/staging/src/k8s.io/kube-apiextensions-server/test/integration/testserver/BUILD index dc39a16b28..5ffc1e193b 100644 --- a/staging/src/k8s.io/kube-apiextensions-server/test/integration/testserver/BUILD +++ b/staging/src/k8s.io/kube-apiextensions-server/test/integration/testserver/BUILD @@ -17,10 +17,12 @@ go_library( deps = [ "//vendor/github.com/pborman/uuid:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/api/meta:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/watch:go_default_library", "//vendor/k8s.io/apiserver/pkg/authorization/authorizerfactory:go_default_library", "//vendor/k8s.io/apiserver/pkg/server:go_default_library", "//vendor/k8s.io/client-go/dynamic:go_default_library", diff --git a/staging/src/k8s.io/kube-apiextensions-server/test/integration/testserver/resources.go b/staging/src/k8s.io/kube-apiextensions-server/test/integration/testserver/resources.go index 262631e7a0..c9a1a82305 100644 --- a/staging/src/k8s.io/kube-apiextensions-server/test/integration/testserver/resources.go +++ b/staging/src/k8s.io/kube-apiextensions-server/test/integration/testserver/resources.go @@ -17,13 +17,16 @@ limitations under the License. package testserver import ( + "fmt" "time" "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/dynamic" apiextensionsv1beta1 "k8s.io/kube-apiextensions-server/pkg/apis/apiextensions/v1beta1" "k8s.io/kube-apiextensions-server/pkg/client/clientset/clientset" @@ -145,9 +148,87 @@ func CreateNewCustomResourceDefinition(crd *apiextensionsv1beta1.CustomResourceD if err != nil { return nil, err } + + // This is only for a test. We need the watch cache to have a resource version that works for the test. + // When new REST storage is created, the storage cacher for the CR starts asynchronously. + // REST API operations return like list use the RV of etcd, but the storage cacher's reflector's list + // can get a different RV because etcd can be touched in between the initial list operation (if that's what you're doing first) + // and the storage cache reflector starting. + // Later, you can issue a watch with the REST apis list.RV and end up earlier than the storage cacher. + // The general working model is that if you get a "resourceVersion to old" message, you re-list and rewatch. + // For this test, we'll actually cycle, "list/watch/create/delete" until we get an RV from list that observes the create and not an error. + // This way all the tests that are checking for watches don't have to worry about RV too old problems because crazy things *could* happen + // before like the created RV could be too old to watch. + var primingErr error + wait.PollImmediate(500*time.Millisecond, 30*time.Second, func() (bool, error) { + primingErr = checkForWatchCachePrimed(crd, dynamicClient) + if primingErr == nil { + return true, nil + } + return false, nil + }) + if primingErr != nil { + return nil, primingErr + } + return dynamicClient, nil } +func checkForWatchCachePrimed(crd *apiextensionsv1beta1.CustomResourceDefinition, dynamicClient *dynamic.Client) error { + ns := "" + if crd.Spec.Scope != apiextensionsv1beta1.ClusterScoped { + ns = "aval" + } + resourceClient := dynamicClient.Resource(&metav1.APIResource{ + Name: crd.Spec.Names.Plural, + Namespaced: crd.Spec.Scope != apiextensionsv1beta1.ClusterScoped, + }, ns) + initialList, err := resourceClient.List(metav1.ListOptions{}) + if err != nil { + return err + } + initialListListMeta, err := meta.ListAccessor(initialList) + if err != nil { + return err + } + + instanceName := "foo" + instance := &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": crd.Spec.Group + "/" + crd.Spec.Version, + "kind": crd.Spec.Names.Kind, + "metadata": map[string]interface{}{ + "namespace": ns, + "name": instanceName, + }, + }, + } + if _, err := resourceClient.Create(instance); err != nil { + return err + } + // we created something, clean it up + defer func() { + resourceClient.Delete(instanceName, nil) + }() + + noxuWatch, err := resourceClient.Watch(metav1.ListOptions{ResourceVersion: initialListListMeta.GetResourceVersion()}) + if err != nil { + return err + } + defer noxuWatch.Stop() + + select { + case watchEvent := <-noxuWatch.ResultChan(): + if watch.Added == watchEvent.Type { + return nil + } + return fmt.Errorf("expected add, but got %#v", watchEvent) + + case <-time.After(5 * time.Second): + return fmt.Errorf("gave up waiting for watch event") + } +} + func DeleteCustomResourceDefinition(crd *apiextensionsv1beta1.CustomResourceDefinition, apiExtensionsClient clientset.Interface) error { if err := apiExtensionsClient.Apiextensions().CustomResourceDefinitions().Delete(crd.Name, nil); err != nil { return err