mirror of https://github.com/k3s-io/k3s
Add CRD integration test for dropping watches
parent
d304c9ecbb
commit
ea464235a8
|
@ -27,6 +27,8 @@ import (
|
||||||
"k8s.io/apiextensions-apiserver/test/integration/fixtures"
|
"k8s.io/apiextensions-apiserver/test/integration/fixtures"
|
||||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
|
"k8s.io/apimachinery/pkg/watch"
|
||||||
"k8s.io/client-go/dynamic"
|
"k8s.io/client-go/dynamic"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -91,7 +93,7 @@ func TestChangeCRD(t *testing.T) {
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
// Set up 100 loops creating and reading custom resources
|
// Set up 100 loops creating and reading and watching custom resources
|
||||||
for i := 0; i < 100; i++ {
|
for i := 0; i < 100; i++ {
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go func(i int) {
|
go func(i int) {
|
||||||
|
@ -112,6 +114,31 @@ func TestChangeCRD(t *testing.T) {
|
||||||
time.Sleep(10 * time.Millisecond)
|
time.Sleep(10 * time.Millisecond)
|
||||||
}
|
}
|
||||||
}(i)
|
}(i)
|
||||||
|
|
||||||
|
wg.Add(1)
|
||||||
|
go func(i int) {
|
||||||
|
defer wg.Done()
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-stopChan:
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
w, err := noxuNamespacedResourceClient.Watch(metav1.ListOptions{})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error establishing watch: %v", err)
|
||||||
|
}
|
||||||
|
for event := range w.ResultChan() {
|
||||||
|
switch event.Type {
|
||||||
|
case watch.Added, watch.Modified, watch.Deleted:
|
||||||
|
// all expected
|
||||||
|
default:
|
||||||
|
t.Errorf("unexpected watch event: %#v", event)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
time.Sleep(10 * time.Millisecond)
|
||||||
|
}
|
||||||
|
}(i)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Let all the established get request loops soak
|
// Let all the established get request loops soak
|
||||||
|
@ -121,5 +148,15 @@ func TestChangeCRD(t *testing.T) {
|
||||||
close(stopChan)
|
close(stopChan)
|
||||||
|
|
||||||
// Let loops drain
|
// Let loops drain
|
||||||
wg.Wait()
|
drained := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
defer close(drained)
|
||||||
|
wg.Wait()
|
||||||
|
}()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-drained:
|
||||||
|
case <-time.After(wait.ForeverTestTimeout):
|
||||||
|
t.Error("timed out waiting for clients to complete")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue