2015-07-28 06:26:53 +00:00
|
|
|
/*
|
2016-06-03 00:25:58 +00:00
|
|
|
Copyright 2015 The Kubernetes Authors.
|
2015-07-28 06:26:53 +00:00
|
|
|
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
|
you may not use this file except in compliance with the License.
|
|
|
|
You may obtain a copy of the License at
|
|
|
|
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
See the License for the specific language governing permissions and
|
|
|
|
limitations under the License.
|
|
|
|
*/
|
|
|
|
|
|
|
|
package storage_test
|
|
|
|
|
|
|
|
import (
|
2016-02-20 01:45:02 +00:00
|
|
|
"fmt"
|
2015-07-28 06:26:53 +00:00
|
|
|
"reflect"
|
2016-04-07 23:53:41 +00:00
|
|
|
goruntime "runtime"
|
2015-11-02 13:51:56 +00:00
|
|
|
"strconv"
|
2015-07-28 06:26:53 +00:00
|
|
|
"testing"
|
|
|
|
"time"
|
|
|
|
|
|
|
|
"k8s.io/kubernetes/pkg/api"
|
2016-05-09 21:01:20 +00:00
|
|
|
"k8s.io/kubernetes/pkg/api/errors"
|
2015-08-14 10:14:09 +00:00
|
|
|
"k8s.io/kubernetes/pkg/api/meta"
|
2015-07-28 06:26:53 +00:00
|
|
|
"k8s.io/kubernetes/pkg/api/testapi"
|
2015-09-14 21:56:51 +00:00
|
|
|
apitesting "k8s.io/kubernetes/pkg/api/testing"
|
2016-12-03 18:57:26 +00:00
|
|
|
metav1 "k8s.io/kubernetes/pkg/apis/meta/v1"
|
2016-08-23 03:41:21 +00:00
|
|
|
"k8s.io/kubernetes/pkg/fields"
|
2015-08-14 10:14:09 +00:00
|
|
|
"k8s.io/kubernetes/pkg/labels"
|
2016-11-16 09:19:55 +00:00
|
|
|
corepod "k8s.io/kubernetes/pkg/registry/core/pod"
|
2015-07-28 06:26:53 +00:00
|
|
|
"k8s.io/kubernetes/pkg/runtime"
|
|
|
|
"k8s.io/kubernetes/pkg/storage"
|
|
|
|
etcdstorage "k8s.io/kubernetes/pkg/storage/etcd"
|
2015-11-10 11:23:51 +00:00
|
|
|
"k8s.io/kubernetes/pkg/storage/etcd/etcdtest"
|
2015-11-04 04:11:31 +00:00
|
|
|
etcdtesting "k8s.io/kubernetes/pkg/storage/etcd/testing"
|
2016-11-13 04:48:19 +00:00
|
|
|
"k8s.io/kubernetes/pkg/storage/etcd3"
|
2015-09-09 17:45:01 +00:00
|
|
|
"k8s.io/kubernetes/pkg/util/sets"
|
2016-02-02 10:57:06 +00:00
|
|
|
"k8s.io/kubernetes/pkg/util/wait"
|
2015-07-28 06:26:53 +00:00
|
|
|
"k8s.io/kubernetes/pkg/watch"
|
2015-10-09 14:49:01 +00:00
|
|
|
|
|
|
|
"golang.org/x/net/context"
|
2015-07-28 06:26:53 +00:00
|
|
|
)
|
|
|
|
|
2015-11-04 04:11:31 +00:00
|
|
|
func newEtcdTestStorage(t *testing.T, codec runtime.Codec, prefix string) (*etcdtesting.EtcdTestServer, storage.Interface) {
|
2016-11-13 04:48:19 +00:00
|
|
|
server, _ := etcdtesting.NewUnsecuredEtcd3TestClientServer(t)
|
|
|
|
storage := etcd3.New(server.V3Client, codec, prefix)
|
2015-11-04 04:11:31 +00:00
|
|
|
return server, storage
|
|
|
|
}
|
|
|
|
|
2016-11-13 04:48:19 +00:00
|
|
|
func newTestCacher(s storage.Interface, cap int) *storage.Cacher {
|
2015-07-28 06:26:53 +00:00
|
|
|
prefix := "pods"
|
|
|
|
config := storage.CacherConfig{
|
2016-11-13 04:48:19 +00:00
|
|
|
CacheCapacity: cap,
|
2015-11-02 13:51:56 +00:00
|
|
|
Storage: s,
|
2015-10-23 14:13:21 +00:00
|
|
|
Versioner: etcdstorage.APIObjectVersioner{},
|
2015-07-28 06:26:53 +00:00
|
|
|
Type: &api.Pod{},
|
|
|
|
ResourcePrefix: prefix,
|
|
|
|
KeyFunc: func(obj runtime.Object) (string, error) { return storage.NamespaceKeyFunc(prefix, obj) },
|
2016-11-16 09:19:55 +00:00
|
|
|
GetAttrsFunc: corepod.GetAttrs,
|
2015-07-28 06:26:53 +00:00
|
|
|
NewListFunc: func() runtime.Object { return &api.PodList{} },
|
2016-08-15 23:54:13 +00:00
|
|
|
Codec: testapi.Default.Codec(),
|
2015-07-28 06:26:53 +00:00
|
|
|
}
|
2015-10-30 09:17:09 +00:00
|
|
|
return storage.NewCacherFromConfig(config)
|
2015-07-28 06:26:53 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func makeTestPod(name string) *api.Pod {
|
|
|
|
return &api.Pod{
|
|
|
|
ObjectMeta: api.ObjectMeta{Namespace: "ns", Name: name},
|
2015-09-14 21:56:51 +00:00
|
|
|
Spec: apitesting.DeepEqualSafePodSpec(),
|
2015-07-28 06:26:53 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2015-11-02 13:51:56 +00:00
|
|
|
func updatePod(t *testing.T, s storage.Interface, obj, old *api.Pod) *api.Pod {
|
2016-03-25 13:24:58 +00:00
|
|
|
updateFn := func(input runtime.Object, res storage.ResponseMeta) (runtime.Object, *uint64, error) {
|
|
|
|
newObj, err := api.Scheme.DeepCopy(obj)
|
|
|
|
if err != nil {
|
2015-11-02 13:51:56 +00:00
|
|
|
t.Errorf("unexpected error: %v", err)
|
2016-03-25 13:24:58 +00:00
|
|
|
return nil, nil, err
|
2015-11-02 13:51:56 +00:00
|
|
|
}
|
2016-03-25 13:24:58 +00:00
|
|
|
return newObj.(*api.Pod), nil, nil
|
|
|
|
}
|
2016-11-13 00:04:23 +00:00
|
|
|
key := "pods/" + obj.Namespace + "/" + obj.Name
|
2016-03-25 13:24:58 +00:00
|
|
|
if err := s.GuaranteedUpdate(context.TODO(), key, &api.Pod{}, old == nil, nil, updateFn); err != nil {
|
|
|
|
t.Errorf("unexpected error: %v", err)
|
|
|
|
}
|
|
|
|
obj.ResourceVersion = ""
|
|
|
|
result := &api.Pod{}
|
2016-11-30 13:59:02 +00:00
|
|
|
if err := s.Get(context.TODO(), key, "", result, false); err != nil {
|
2016-03-25 13:24:58 +00:00
|
|
|
t.Errorf("unexpected error: %v", err)
|
2015-11-02 13:51:56 +00:00
|
|
|
}
|
|
|
|
return result
|
|
|
|
}
|
|
|
|
|
2016-11-30 13:59:02 +00:00
|
|
|
func TestGet(t *testing.T) {
|
|
|
|
server, etcdStorage := newEtcdTestStorage(t, testapi.Default.Codec(), etcdtest.PathPrefix())
|
|
|
|
defer server.Terminate(t)
|
|
|
|
cacher := newTestCacher(etcdStorage, 10)
|
|
|
|
defer cacher.Stop()
|
|
|
|
|
|
|
|
podFoo := makeTestPod("foo")
|
|
|
|
fooCreated := updatePod(t, etcdStorage, podFoo, nil)
|
|
|
|
|
|
|
|
// We pass the ResourceVersion from the above Create() operation.
|
|
|
|
result := &api.Pod{}
|
|
|
|
if err := cacher.Get(context.TODO(), "pods/ns/foo", fooCreated.ResourceVersion, result, true); err != nil {
|
|
|
|
t.Errorf("Unexpected error: %v", err)
|
|
|
|
}
|
|
|
|
if e, a := *fooCreated, *result; !reflect.DeepEqual(e, a) {
|
|
|
|
t.Errorf("Expected: %#v, got: %#v", e, a)
|
|
|
|
}
|
|
|
|
|
|
|
|
if err := cacher.Get(context.TODO(), "pods/ns/bar", fooCreated.ResourceVersion, result, true); err != nil {
|
|
|
|
t.Errorf("Unexpected error: %v", err)
|
|
|
|
}
|
|
|
|
emptyPod := api.Pod{}
|
|
|
|
if e, a := emptyPod, *result; !reflect.DeepEqual(e, a) {
|
|
|
|
t.Errorf("Expected: %#v, got: %#v", e, a)
|
|
|
|
}
|
|
|
|
|
|
|
|
if err := cacher.Get(context.TODO(), "pods/ns/bar", fooCreated.ResourceVersion, result, false); !storage.IsNotFound(err) {
|
|
|
|
t.Errorf("Unexpected error: %v", err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2015-10-23 14:13:21 +00:00
|
|
|
func TestList(t *testing.T) {
|
2015-11-04 04:11:31 +00:00
|
|
|
server, etcdStorage := newEtcdTestStorage(t, testapi.Default.Codec(), etcdtest.PathPrefix())
|
2015-11-02 13:51:56 +00:00
|
|
|
defer server.Terminate(t)
|
2016-11-13 04:48:19 +00:00
|
|
|
cacher := newTestCacher(etcdStorage, 10)
|
2015-12-28 09:35:12 +00:00
|
|
|
defer cacher.Stop()
|
2015-07-28 06:26:53 +00:00
|
|
|
|
|
|
|
podFoo := makeTestPod("foo")
|
|
|
|
podBar := makeTestPod("bar")
|
|
|
|
podBaz := makeTestPod("baz")
|
|
|
|
|
|
|
|
podFooPrime := makeTestPod("foo")
|
|
|
|
podFooPrime.Spec.NodeName = "fakeNode"
|
|
|
|
|
2015-11-02 13:51:56 +00:00
|
|
|
fooCreated := updatePod(t, etcdStorage, podFoo, nil)
|
|
|
|
_ = updatePod(t, etcdStorage, podBar, nil)
|
|
|
|
_ = updatePod(t, etcdStorage, podBaz, nil)
|
|
|
|
|
|
|
|
_ = updatePod(t, etcdStorage, podFooPrime, fooCreated)
|
2015-07-28 06:26:53 +00:00
|
|
|
|
2016-07-14 02:21:25 +00:00
|
|
|
// Create a pod in a namespace that contains "ns" as a prefix
|
|
|
|
// Make sure it is not returned in a watch of "ns"
|
|
|
|
podFooNS2 := makeTestPod("foo")
|
|
|
|
podFooNS2.Namespace += "2"
|
|
|
|
updatePod(t, etcdStorage, podFooNS2, nil)
|
|
|
|
|
2015-11-02 13:51:56 +00:00
|
|
|
deleted := api.Pod{}
|
2016-11-13 00:04:23 +00:00
|
|
|
if err := etcdStorage.Delete(context.TODO(), "pods/ns/bar", &deleted, nil); err != nil {
|
2015-11-02 13:51:56 +00:00
|
|
|
t.Errorf("Unexpected error: %v", err)
|
2015-07-28 06:26:53 +00:00
|
|
|
}
|
|
|
|
|
2016-02-18 08:56:08 +00:00
|
|
|
// We first List directly from etcd by passing empty resourceVersion,
|
|
|
|
// to get the current etcd resourceVersion.
|
|
|
|
rvResult := &api.PodList{}
|
|
|
|
if err := cacher.List(context.TODO(), "pods/ns", "", storage.Everything, rvResult); err != nil {
|
|
|
|
t.Errorf("Unexpected error: %v", err)
|
|
|
|
}
|
|
|
|
deletedPodRV := rvResult.ListMeta.ResourceVersion
|
|
|
|
|
2015-07-28 06:26:53 +00:00
|
|
|
result := &api.PodList{}
|
2016-02-18 08:56:08 +00:00
|
|
|
// We pass the current etcd ResourceVersion received from the above List() operation,
|
|
|
|
// since there is not easy way to get ResourceVersion of barPod deletion operation.
|
|
|
|
if err := cacher.List(context.TODO(), "pods/ns", deletedPodRV, storage.Everything, result); err != nil {
|
2015-11-02 13:51:56 +00:00
|
|
|
t.Errorf("Unexpected error: %v", err)
|
2015-07-28 06:26:53 +00:00
|
|
|
}
|
2016-02-18 08:56:08 +00:00
|
|
|
if result.ListMeta.ResourceVersion != deletedPodRV {
|
2015-11-02 13:51:56 +00:00
|
|
|
t.Errorf("Incorrect resource version: %v", result.ListMeta.ResourceVersion)
|
2015-07-28 06:26:53 +00:00
|
|
|
}
|
|
|
|
if len(result.Items) != 2 {
|
2015-11-02 13:51:56 +00:00
|
|
|
t.Errorf("Unexpected list result: %d", len(result.Items))
|
2015-07-28 06:26:53 +00:00
|
|
|
}
|
2015-09-09 17:45:01 +00:00
|
|
|
keys := sets.String{}
|
2015-07-28 06:26:53 +00:00
|
|
|
for _, item := range result.Items {
|
2015-11-02 13:51:56 +00:00
|
|
|
keys.Insert(item.Name)
|
2015-07-28 06:26:53 +00:00
|
|
|
}
|
|
|
|
if !keys.HasAll("foo", "baz") {
|
2015-11-02 13:51:56 +00:00
|
|
|
t.Errorf("Unexpected list result: %#v", result)
|
2015-07-28 06:26:53 +00:00
|
|
|
}
|
|
|
|
for _, item := range result.Items {
|
|
|
|
// unset fields that are set by the infrastructure
|
2015-11-02 13:51:56 +00:00
|
|
|
item.ResourceVersion = ""
|
2016-12-03 18:57:26 +00:00
|
|
|
item.CreationTimestamp = metav1.Time{}
|
2015-07-28 06:26:53 +00:00
|
|
|
|
2016-07-14 02:21:25 +00:00
|
|
|
if item.Namespace != "ns" {
|
|
|
|
t.Errorf("Unexpected namespace: %s", item.Namespace)
|
|
|
|
}
|
|
|
|
|
2015-07-28 06:26:53 +00:00
|
|
|
var expected *api.Pod
|
2015-11-02 13:51:56 +00:00
|
|
|
switch item.Name {
|
2015-07-28 06:26:53 +00:00
|
|
|
case "foo":
|
|
|
|
expected = podFooPrime
|
|
|
|
case "baz":
|
|
|
|
expected = podBaz
|
|
|
|
default:
|
2015-11-02 13:51:56 +00:00
|
|
|
t.Errorf("Unexpected item: %v", item)
|
2015-07-28 06:26:53 +00:00
|
|
|
}
|
|
|
|
if e, a := *expected, item; !reflect.DeepEqual(e, a) {
|
2015-11-02 13:51:56 +00:00
|
|
|
t.Errorf("Expected: %#v, got: %#v", e, a)
|
2015-07-28 06:26:53 +00:00
|
|
|
}
|
|
|
|
}
|
2015-11-02 13:51:56 +00:00
|
|
|
}
|
2015-07-28 06:26:53 +00:00
|
|
|
|
2015-11-02 13:51:56 +00:00
|
|
|
func verifyWatchEvent(t *testing.T, w watch.Interface, eventType watch.EventType, eventObject runtime.Object) {
|
2016-04-07 23:53:41 +00:00
|
|
|
_, _, line, _ := goruntime.Caller(1)
|
2015-11-02 13:51:56 +00:00
|
|
|
select {
|
|
|
|
case event := <-w.ResultChan():
|
|
|
|
if e, a := eventType, event.Type; e != a {
|
2016-04-07 23:53:41 +00:00
|
|
|
t.Logf("(called from line %d)", line)
|
2015-11-02 13:51:56 +00:00
|
|
|
t.Errorf("Expected: %s, got: %s", eventType, event.Type)
|
|
|
|
}
|
|
|
|
if e, a := eventObject, event.Object; !api.Semantic.DeepDerivative(e, a) {
|
2016-04-07 23:53:41 +00:00
|
|
|
t.Logf("(called from line %d)", line)
|
2015-11-02 13:51:56 +00:00
|
|
|
t.Errorf("Expected (%s): %#v, got: %#v", eventType, e, a)
|
|
|
|
}
|
2016-02-02 10:57:06 +00:00
|
|
|
case <-time.After(wait.ForeverTestTimeout):
|
2016-04-07 23:53:41 +00:00
|
|
|
t.Logf("(called from line %d)", line)
|
2015-11-02 13:51:56 +00:00
|
|
|
t.Errorf("Timed out waiting for an event")
|
|
|
|
}
|
2015-07-28 06:26:53 +00:00
|
|
|
}
|
|
|
|
|
2016-02-20 01:45:02 +00:00
|
|
|
type injectListError struct {
|
|
|
|
errors int
|
|
|
|
storage.Interface
|
|
|
|
}
|
|
|
|
|
2016-08-23 03:41:21 +00:00
|
|
|
func (self *injectListError) List(ctx context.Context, key string, resourceVersion string, p storage.SelectionPredicate, listObj runtime.Object) error {
|
2016-02-20 01:45:02 +00:00
|
|
|
if self.errors > 0 {
|
|
|
|
self.errors--
|
|
|
|
return fmt.Errorf("injected error")
|
|
|
|
}
|
2016-08-23 03:41:21 +00:00
|
|
|
return self.Interface.List(ctx, key, resourceVersion, p, listObj)
|
2016-02-20 01:45:02 +00:00
|
|
|
}
|
|
|
|
|
2015-07-28 06:26:53 +00:00
|
|
|
func TestWatch(t *testing.T) {
|
2015-11-04 04:11:31 +00:00
|
|
|
server, etcdStorage := newEtcdTestStorage(t, testapi.Default.Codec(), etcdtest.PathPrefix())
|
2016-02-20 01:45:02 +00:00
|
|
|
// Inject one list error to make sure we test the relist case.
|
|
|
|
etcdStorage = &injectListError{errors: 1, Interface: etcdStorage}
|
2015-11-02 13:51:56 +00:00
|
|
|
defer server.Terminate(t)
|
2016-11-13 04:48:19 +00:00
|
|
|
cacher := newTestCacher(etcdStorage, 3) // small capacity to trigger "too old version" error
|
2015-12-28 09:35:12 +00:00
|
|
|
defer cacher.Stop()
|
2015-07-28 06:26:53 +00:00
|
|
|
|
|
|
|
podFoo := makeTestPod("foo")
|
|
|
|
podBar := makeTestPod("bar")
|
|
|
|
|
2015-11-02 13:51:56 +00:00
|
|
|
podFooPrime := makeTestPod("foo")
|
|
|
|
podFooPrime.Spec.NodeName = "fakeNode"
|
|
|
|
|
|
|
|
podFooBis := makeTestPod("foo")
|
|
|
|
podFooBis.Spec.NodeName = "anotherFakeNode"
|
2015-07-28 06:26:53 +00:00
|
|
|
|
2016-07-14 02:21:25 +00:00
|
|
|
podFooNS2 := makeTestPod("foo")
|
|
|
|
podFooNS2.Namespace += "2"
|
|
|
|
|
2015-12-28 14:28:07 +00:00
|
|
|
// initialVersion is used to initate the watcher at the beginning of the world,
|
|
|
|
// which is not defined precisely in etcd.
|
|
|
|
initialVersion, err := cacher.LastSyncResourceVersion()
|
|
|
|
if err != nil {
|
|
|
|
t.Fatalf("Unexpected error: %v", err)
|
|
|
|
}
|
|
|
|
startVersion := strconv.Itoa(int(initialVersion))
|
|
|
|
|
2015-07-28 06:26:53 +00:00
|
|
|
// Set up Watch for object "podFoo".
|
2015-12-28 14:28:07 +00:00
|
|
|
watcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", startVersion, storage.Everything)
|
2015-07-28 06:26:53 +00:00
|
|
|
if err != nil {
|
2015-11-02 13:51:56 +00:00
|
|
|
t.Fatalf("Unexpected error: %v", err)
|
2015-07-28 06:26:53 +00:00
|
|
|
}
|
2015-12-28 09:35:12 +00:00
|
|
|
defer watcher.Stop()
|
2015-07-28 06:26:53 +00:00
|
|
|
|
2016-07-14 02:21:25 +00:00
|
|
|
// Create in another namespace first to make sure events from other namespaces don't get delivered
|
|
|
|
updatePod(t, etcdStorage, podFooNS2, nil)
|
|
|
|
|
2015-11-02 13:51:56 +00:00
|
|
|
fooCreated := updatePod(t, etcdStorage, podFoo, nil)
|
|
|
|
_ = updatePod(t, etcdStorage, podBar, nil)
|
|
|
|
fooUpdated := updatePod(t, etcdStorage, podFooPrime, fooCreated)
|
|
|
|
|
|
|
|
verifyWatchEvent(t, watcher, watch.Added, podFoo)
|
|
|
|
verifyWatchEvent(t, watcher, watch.Modified, podFooPrime)
|
2015-07-28 06:26:53 +00:00
|
|
|
|
2016-05-09 21:01:20 +00:00
|
|
|
// Check whether we get too-old error via the watch channel
|
|
|
|
tooOldWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", "1", storage.Everything)
|
|
|
|
if err != nil {
|
|
|
|
t.Fatalf("Expected no direct error, got %v", err)
|
2015-07-28 06:26:53 +00:00
|
|
|
}
|
2016-05-09 21:01:20 +00:00
|
|
|
defer tooOldWatcher.Stop()
|
|
|
|
// Ensure we get a "Gone" error
|
2016-05-14 18:25:47 +00:00
|
|
|
expectedGoneError := errors.NewGone("").ErrStatus
|
2016-05-09 21:01:20 +00:00
|
|
|
verifyWatchEvent(t, tooOldWatcher, watch.Error, &expectedGoneError)
|
2015-07-28 06:26:53 +00:00
|
|
|
|
2015-12-10 14:03:59 +00:00
|
|
|
initialWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", fooCreated.ResourceVersion, storage.Everything)
|
2015-11-02 13:51:56 +00:00
|
|
|
if err != nil {
|
|
|
|
t.Fatalf("Unexpected error: %v", err)
|
2015-08-14 10:14:09 +00:00
|
|
|
}
|
2015-12-28 09:35:12 +00:00
|
|
|
defer initialWatcher.Stop()
|
2015-08-14 10:14:09 +00:00
|
|
|
|
2015-11-02 13:51:56 +00:00
|
|
|
verifyWatchEvent(t, initialWatcher, watch.Modified, podFooPrime)
|
|
|
|
|
2015-09-14 08:36:13 +00:00
|
|
|
// Now test watch from "now".
|
2015-12-04 08:58:24 +00:00
|
|
|
nowWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", "0", storage.Everything)
|
2015-09-14 08:36:13 +00:00
|
|
|
if err != nil {
|
2015-11-02 13:51:56 +00:00
|
|
|
t.Fatalf("Unexpected error: %v", err)
|
2015-09-14 08:36:13 +00:00
|
|
|
}
|
2015-12-28 09:35:12 +00:00
|
|
|
defer nowWatcher.Stop()
|
2015-09-14 08:36:13 +00:00
|
|
|
|
2015-11-02 13:51:56 +00:00
|
|
|
verifyWatchEvent(t, nowWatcher, watch.Added, podFooPrime)
|
|
|
|
|
|
|
|
_ = updatePod(t, etcdStorage, podFooBis, fooUpdated)
|
|
|
|
|
|
|
|
verifyWatchEvent(t, nowWatcher, watch.Modified, podFooBis)
|
2015-08-14 10:14:09 +00:00
|
|
|
}
|
|
|
|
|
2015-11-06 12:40:21 +00:00
|
|
|
func TestWatcherTimeout(t *testing.T) {
|
|
|
|
server, etcdStorage := newEtcdTestStorage(t, testapi.Default.Codec(), etcdtest.PathPrefix())
|
|
|
|
defer server.Terminate(t)
|
2016-11-13 04:48:19 +00:00
|
|
|
cacher := newTestCacher(etcdStorage, 10)
|
2015-12-28 09:35:12 +00:00
|
|
|
defer cacher.Stop()
|
2015-11-06 12:40:21 +00:00
|
|
|
|
2015-12-28 14:28:07 +00:00
|
|
|
// initialVersion is used to initate the watcher at the beginning of the world,
|
|
|
|
// which is not defined precisely in etcd.
|
|
|
|
initialVersion, err := cacher.LastSyncResourceVersion()
|
|
|
|
if err != nil {
|
|
|
|
t.Fatalf("Unexpected error: %v", err)
|
|
|
|
}
|
|
|
|
startVersion := strconv.Itoa(int(initialVersion))
|
|
|
|
|
2016-11-21 08:46:30 +00:00
|
|
|
// Create a number of watchers that will not be reading any result.
|
|
|
|
nonReadingWatchers := 50
|
|
|
|
for i := 0; i < nonReadingWatchers; i++ {
|
|
|
|
watcher, err := cacher.WatchList(context.TODO(), "pods/ns", startVersion, storage.Everything)
|
|
|
|
if err != nil {
|
|
|
|
t.Fatalf("Unexpected error: %v", err)
|
|
|
|
}
|
|
|
|
defer watcher.Stop()
|
2015-11-06 12:40:21 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Create a second watcher that will be reading result.
|
2015-12-28 14:28:07 +00:00
|
|
|
readingWatcher, err := cacher.WatchList(context.TODO(), "pods/ns", startVersion, storage.Everything)
|
2015-11-06 12:40:21 +00:00
|
|
|
if err != nil {
|
|
|
|
t.Fatalf("Unexpected error: %v", err)
|
|
|
|
}
|
|
|
|
defer readingWatcher.Stop()
|
|
|
|
|
2016-11-21 08:46:30 +00:00
|
|
|
startTime := time.Now()
|
2015-11-06 12:40:21 +00:00
|
|
|
for i := 1; i <= 22; i++ {
|
|
|
|
pod := makeTestPod(strconv.Itoa(i))
|
|
|
|
_ = updatePod(t, etcdStorage, pod, nil)
|
|
|
|
verifyWatchEvent(t, readingWatcher, watch.Added, pod)
|
|
|
|
}
|
2016-11-21 08:46:30 +00:00
|
|
|
if time.Since(startTime) > time.Duration(250*nonReadingWatchers)*time.Millisecond {
|
|
|
|
t.Errorf("waiting for events took too long: %v", time.Since(startTime))
|
|
|
|
}
|
2015-11-06 12:40:21 +00:00
|
|
|
}
|
|
|
|
|
2015-08-14 10:14:09 +00:00
|
|
|
func TestFiltering(t *testing.T) {
|
2015-11-04 04:11:31 +00:00
|
|
|
server, etcdStorage := newEtcdTestStorage(t, testapi.Default.Codec(), etcdtest.PathPrefix())
|
2015-11-02 13:51:56 +00:00
|
|
|
defer server.Terminate(t)
|
2016-11-13 04:48:19 +00:00
|
|
|
cacher := newTestCacher(etcdStorage, 10)
|
2015-12-28 09:35:12 +00:00
|
|
|
defer cacher.Stop()
|
2015-08-14 10:14:09 +00:00
|
|
|
|
2015-12-31 06:53:41 +00:00
|
|
|
// Ensure that the cacher is initialized, before creating any pods,
|
|
|
|
// so that we are sure that all events will be present in cacher.
|
|
|
|
syncWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", "0", storage.Everything)
|
|
|
|
if err != nil {
|
|
|
|
t.Fatalf("Unexpected error: %v", err)
|
|
|
|
}
|
|
|
|
syncWatcher.Stop()
|
|
|
|
|
2015-08-14 10:14:09 +00:00
|
|
|
podFoo := makeTestPod("foo")
|
2015-11-02 13:51:56 +00:00
|
|
|
podFoo.Labels = map[string]string{"filter": "foo"}
|
2015-08-14 10:14:09 +00:00
|
|
|
podFooFiltered := makeTestPod("foo")
|
2015-11-02 13:51:56 +00:00
|
|
|
podFooPrime := makeTestPod("foo")
|
|
|
|
podFooPrime.Labels = map[string]string{"filter": "foo"}
|
|
|
|
podFooPrime.Spec.NodeName = "fakeNode"
|
2015-08-14 10:14:09 +00:00
|
|
|
|
2016-07-14 02:21:25 +00:00
|
|
|
podFooNS2 := makeTestPod("foo")
|
|
|
|
podFooNS2.Namespace += "2"
|
|
|
|
podFooNS2.Labels = map[string]string{"filter": "foo"}
|
|
|
|
|
|
|
|
// Create in another namespace first to make sure events from other namespaces don't get delivered
|
|
|
|
updatePod(t, etcdStorage, podFooNS2, nil)
|
|
|
|
|
2015-11-02 13:51:56 +00:00
|
|
|
fooCreated := updatePod(t, etcdStorage, podFoo, nil)
|
|
|
|
fooFiltered := updatePod(t, etcdStorage, podFooFiltered, fooCreated)
|
|
|
|
fooUnfiltered := updatePod(t, etcdStorage, podFoo, fooFiltered)
|
|
|
|
_ = updatePod(t, etcdStorage, podFooPrime, fooUnfiltered)
|
|
|
|
|
|
|
|
deleted := api.Pod{}
|
2016-11-13 00:04:23 +00:00
|
|
|
if err := etcdStorage.Delete(context.TODO(), "pods/ns/foo", &deleted, nil); err != nil {
|
2015-11-02 13:51:56 +00:00
|
|
|
t.Errorf("Unexpected error: %v", err)
|
2015-08-14 10:14:09 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Set up Watch for object "podFoo" with label filter set.
|
2016-08-23 03:41:21 +00:00
|
|
|
pred := storage.SelectionPredicate{
|
|
|
|
Label: labels.SelectorFromSet(labels.Set{"filter": "foo"}),
|
|
|
|
Field: fields.Everything(),
|
|
|
|
GetAttrs: func(obj runtime.Object) (label labels.Set, field fields.Set, err error) {
|
|
|
|
metadata, err := meta.Accessor(obj)
|
|
|
|
if err != nil {
|
|
|
|
t.Fatalf("Unexpected error: %v", err)
|
|
|
|
}
|
|
|
|
return labels.Set(metadata.GetLabels()), nil, nil
|
|
|
|
},
|
2015-08-14 10:14:09 +00:00
|
|
|
}
|
2016-08-23 03:41:21 +00:00
|
|
|
watcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", fooCreated.ResourceVersion, pred)
|
2015-11-02 13:51:56 +00:00
|
|
|
if err != nil {
|
|
|
|
t.Fatalf("Unexpected error: %v", err)
|
2015-07-28 06:26:53 +00:00
|
|
|
}
|
2015-12-28 09:35:12 +00:00
|
|
|
defer watcher.Stop()
|
2015-07-28 06:26:53 +00:00
|
|
|
|
2015-11-02 13:51:56 +00:00
|
|
|
verifyWatchEvent(t, watcher, watch.Deleted, podFooFiltered)
|
|
|
|
verifyWatchEvent(t, watcher, watch.Added, podFoo)
|
|
|
|
verifyWatchEvent(t, watcher, watch.Modified, podFooPrime)
|
|
|
|
verifyWatchEvent(t, watcher, watch.Deleted, podFooPrime)
|
2015-07-28 06:26:53 +00:00
|
|
|
}
|
2016-04-13 16:51:50 +00:00
|
|
|
|
|
|
|
func TestStartingResourceVersion(t *testing.T) {
|
|
|
|
server, etcdStorage := newEtcdTestStorage(t, testapi.Default.Codec(), etcdtest.PathPrefix())
|
|
|
|
defer server.Terminate(t)
|
2016-11-13 04:48:19 +00:00
|
|
|
cacher := newTestCacher(etcdStorage, 10)
|
2016-04-13 16:51:50 +00:00
|
|
|
defer cacher.Stop()
|
|
|
|
|
|
|
|
// add 1 object
|
|
|
|
podFoo := makeTestPod("foo")
|
|
|
|
fooCreated := updatePod(t, etcdStorage, podFoo, nil)
|
|
|
|
|
|
|
|
// Set up Watch starting at fooCreated.ResourceVersion + 10
|
|
|
|
rv, err := storage.ParseWatchResourceVersion(fooCreated.ResourceVersion)
|
|
|
|
if err != nil {
|
|
|
|
t.Fatalf("Unexpected error: %v", err)
|
|
|
|
}
|
|
|
|
rv += 10
|
|
|
|
startVersion := strconv.Itoa(int(rv))
|
|
|
|
|
|
|
|
watcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", startVersion, storage.Everything)
|
|
|
|
if err != nil {
|
|
|
|
t.Fatalf("Unexpected error: %v", err)
|
|
|
|
}
|
|
|
|
defer watcher.Stop()
|
|
|
|
|
|
|
|
lastFoo := fooCreated
|
|
|
|
for i := 0; i < 11; i++ {
|
|
|
|
podFooForUpdate := makeTestPod("foo")
|
|
|
|
podFooForUpdate.Labels = map[string]string{"foo": strconv.Itoa(i)}
|
|
|
|
lastFoo = updatePod(t, etcdStorage, podFooForUpdate, lastFoo)
|
|
|
|
}
|
|
|
|
|
|
|
|
select {
|
|
|
|
case e := <-watcher.ResultChan():
|
|
|
|
pod := e.Object.(*api.Pod)
|
|
|
|
podRV, err := storage.ParseWatchResourceVersion(pod.ResourceVersion)
|
|
|
|
if err != nil {
|
|
|
|
t.Fatalf("unexpected error: %v", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
// event should have at least rv + 1, since we're starting the watch at rv
|
|
|
|
if podRV <= rv {
|
|
|
|
t.Errorf("expected event with resourceVersion of at least %d, got %d", rv+1, podRV)
|
|
|
|
}
|
|
|
|
case <-time.After(wait.ForeverTestTimeout):
|
|
|
|
t.Errorf("timed out waiting for event")
|
|
|
|
}
|
|
|
|
}
|