mirror of https://github.com/k3s-io/k3s
Merge pull request #46967 from deads2k/crd-11-ns-wait
Automatic merge from submit-queue (batch tested with PRs 46967, 46992, 43338, 46717, 46672)

deflake CRD watch tests

Fixes https://github.com/kubernetes/kubernetes/issues/46446. Again...

This flake window is caused by the watch cache starting late. This pull updates the test to do fancy list/create/watch/delete semantics to catch the problem. In the field, this should be treated the same as any other "resourceVersion too old" error and handled with a list/watch. The test cannot be level driven; it is actually testing the edge behavior, so we have to do something weird like this.

@sttts @liggitt let's try this again...
commit f1dfda1fe6
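The "handled with a list/watch" remark in the description is the standard client-side pattern: a watch started from an expired resource version fails with HTTP 410 Gone, and the client recovers by listing again and watching from the fresh list's RV. A minimal sketch of that pattern (not code from this PR; the listWatcher interface is a hypothetical stand-in for whatever resource client is in use, shaped like the dynamic client calls in the diff below):

package fieldwatch

import (
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/watch"
)

// listWatcher is a hypothetical stand-in for whichever resource client is in
// use; it matches the shape of the dynamic client calls in the diff below.
type listWatcher interface {
	List(opts metav1.ListOptions) (runtime.Object, error)
	Watch(opts metav1.ListOptions) (watch.Interface, error)
}

// watchFromFreshList lists, then watches from the list's resource version.
// If the server reports that RV as already expired (HTTP 410 Gone), it
// lists again and re-watches instead of failing.
func watchFromFreshList(c listWatcher) (watch.Interface, error) {
	for {
		list, err := c.List(metav1.ListOptions{})
		if err != nil {
			return nil, err
		}
		listMeta, err := meta.ListAccessor(list)
		if err != nil {
			return nil, err
		}
		w, err := c.Watch(metav1.ListOptions{ResourceVersion: listMeta.GetResourceVersion()})
		if apierrors.IsGone(err) {
			continue // "resourceVersion too old": re-list and re-watch
		}
		return w, err
	}
}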
@@ -275,17 +275,6 @@ func (r *crdHandler) getServingInfoFor(crd *apiextensions.CustomResourceDefinition
 		r.restOptionsGetter,
 	)

-	// When new REST storage is created, the storage cacher for the CR starts asynchronously.
-	// REST API operations like list return the RV of etcd, but the storage cacher's reflector's list
-	// can get a different RV because etcd can be touched in between the initial list operation (if that's what you're doing first)
-	// and the storage cache reflector starting.
-	// Later, you can issue a watch with the REST API's list.RV and end up earlier than the storage cacher.
-	// The time window is really narrow, but it can happen. The simplest "solution" is to wait
-	// briefly for the storage cache to start before we return our new storage, so it's more likely that we'll have valid
-	// resource versions for the watch cache. We don't expose cache status outside of the caching layer,
-	// so I can't think of a way to determine it reliably.
-	time.Sleep(1 * time.Second)
-
 	parameterScheme := runtime.NewScheme()
 	parameterScheme.AddUnversionedTypes(schema.GroupVersion{Group: crd.Spec.Group, Version: crd.Spec.Version},
 		&metav1.ListOptions{},
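The race the deleted comment describes is easier to see as a timeline (the resource versions are illustrative, not taken from the PR):

    t0  REST storage for the CR is created; its cacher's reflector starts asynchronously
    t1  client LIST is served straight from etcd          -> returns resourceVersion=100
    t2  an unrelated write lands in etcd                  -> etcd head moves to resourceVersion=101
    t3  the cacher's reflector performs its initial LIST  -> the watch cache window begins at 101
    t4  client WATCH from resourceVersion=100             -> below the cache window: "resourceVersion too old"

The fixed one-second sleep only shrank the odds of reaching t4; the test helper added below replaces it by polling until a watch has demonstrably observed an event.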
@@ -17,10 +17,12 @@ go_library(
    deps = [
        "//vendor/github.com/pborman/uuid:go_default_library",
        "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
        "//vendor/k8s.io/apimachinery/pkg/api/meta:go_default_library",
        "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
        "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured:go_default_library",
        "//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
        "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
        "//vendor/k8s.io/apimachinery/pkg/watch:go_default_library",
        "//vendor/k8s.io/apiserver/pkg/authorization/authorizerfactory:go_default_library",
        "//vendor/k8s.io/apiserver/pkg/server:go_default_library",
        "//vendor/k8s.io/client-go/dynamic:go_default_library",
@@ -17,13 +17,16 @@ limitations under the License.
package testserver

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/dynamic"
	apiextensionsv1beta1 "k8s.io/kube-apiextensions-server/pkg/apis/apiextensions/v1beta1"
	"k8s.io/kube-apiextensions-server/pkg/client/clientset/clientset"
@@ -145,9 +148,87 @@ func CreateNewCustomResourceDefinition(crd *apiextensionsv1beta1.CustomResourceDefinition
 	if err != nil {
 		return nil, err
 	}
+
+	// This is only for a test. We need the watch cache to have a resource version that works for the test.
+	// When new REST storage is created, the storage cacher for the CR starts asynchronously.
+	// REST API operations like list return the RV of etcd, but the storage cacher's reflector's list
+	// can get a different RV because etcd can be touched in between the initial list operation (if that's what you're doing first)
+	// and the storage cache reflector starting.
+	// Later, you can issue a watch with the REST API's list.RV and end up earlier than the storage cacher.
+	// The general working model is that if you get a "resourceVersion too old" message, you re-list and re-watch.
+	// For this test, we'll actually cycle "list/watch/create/delete" until we get an RV from list that observes the create and not an error.
+	// This way all the tests that are checking for watches don't have to worry about RV-too-old problems, because crazy things *could* happen
+	// before, like the created RV being too old to watch.
+	var primingErr error
+	wait.PollImmediate(500*time.Millisecond, 30*time.Second, func() (bool, error) {
+		primingErr = checkForWatchCachePrimed(crd, dynamicClient)
+		if primingErr == nil {
+			return true, nil
+		}
+		return false, nil
+	})
+	if primingErr != nil {
+		return nil, primingErr
+	}
+
 	return dynamicClient, nil
 }
+
+func checkForWatchCachePrimed(crd *apiextensionsv1beta1.CustomResourceDefinition, dynamicClient *dynamic.Client) error {
+	ns := ""
+	if crd.Spec.Scope != apiextensionsv1beta1.ClusterScoped {
+		ns = "aval"
+	}
+	resourceClient := dynamicClient.Resource(&metav1.APIResource{
+		Name:       crd.Spec.Names.Plural,
+		Namespaced: crd.Spec.Scope != apiextensionsv1beta1.ClusterScoped,
+	}, ns)
+	initialList, err := resourceClient.List(metav1.ListOptions{})
+	if err != nil {
+		return err
+	}
+	initialListListMeta, err := meta.ListAccessor(initialList)
+	if err != nil {
+		return err
+	}
+
+	instanceName := "foo"
+	instance := &unstructured.Unstructured{
+		Object: map[string]interface{}{
+			"apiVersion": crd.Spec.Group + "/" + crd.Spec.Version,
+			"kind":       crd.Spec.Names.Kind,
+			"metadata": map[string]interface{}{
+				"namespace": ns,
+				"name":      instanceName,
+			},
+		},
+	}
+	if _, err := resourceClient.Create(instance); err != nil {
+		return err
+	}
+	// we created something, clean it up
+	defer func() {
+		resourceClient.Delete(instanceName, nil)
+	}()
+
+	noxuWatch, err := resourceClient.Watch(metav1.ListOptions{ResourceVersion: initialListListMeta.GetResourceVersion()})
+	if err != nil {
+		return err
+	}
+	defer noxuWatch.Stop()
+
+	select {
+	case watchEvent := <-noxuWatch.ResultChan():
+		if watch.Added == watchEvent.Type {
+			return nil
+		}
+		return fmt.Errorf("expected add, but got %#v", watchEvent)
+
+	case <-time.After(5 * time.Second):
+		return fmt.Errorf("gave up waiting for watch event")
+	}
+}
+
 func DeleteCustomResourceDefinition(crd *apiextensionsv1beta1.CustomResourceDefinition, apiExtensionsClient clientset.Interface) error {
 	if err := apiExtensionsClient.Apiextensions().CustomResourceDefinitions().Delete(crd.Name, nil); err != nil {
 		return err
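The priming loop leans on wait.PollImmediate, which runs the condition once immediately and then on every interval until it returns true, returns an error, or the timeout lapses. Note the design choice above: PollImmediate's own return value is discarded, so on timeout the caller sees the last real probe failure captured in primingErr rather than a generic wait.ErrWaitTimeout. A minimal, self-contained illustration of the same shape (the probe function is a toy stand-in for checkForWatchCachePrimed, not code from the PR):

package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

// probe is a toy condition standing in for checkForWatchCachePrimed.
func probe(start time.Time) error {
	if time.Since(start) < 2*time.Second {
		return fmt.Errorf("not primed yet")
	}
	return nil
}

func main() {
	var probeErr error
	start := time.Now()
	// Poll every 500ms for up to 30s; the first probe runs immediately.
	wait.PollImmediate(500*time.Millisecond, 30*time.Second, func() (bool, error) {
		probeErr = probe(start) // capture the most recent failure
		return probeErr == nil, nil
	})
	// On timeout this reports the last real failure, not wait.ErrWaitTimeout.
	fmt.Println("result:", probeErr)
}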