From 6dfe5b4b5a6c8eb50ebdab151af10e5fea6c4519 Mon Sep 17 00:00:00 2001 From: Wojciech Tyczynski Date: Fri, 28 Aug 2015 11:24:34 +0200 Subject: [PATCH] Refactoring of watch etcd tests. --- pkg/api/rest/resttest/resttest.go | 137 ++++++++++- pkg/registry/controller/etcd/etcd_test.go | 243 ++++--------------- pkg/registry/daemon/etcd/etcd_test.go | 243 ++++--------------- pkg/registry/minion/etcd/etcd_test.go | 127 +++------- pkg/registry/pod/etcd/etcd_test.go | 143 +++-------- pkg/registry/registrytest/etcd.go | 22 ++ pkg/registry/resourcequota/etcd/etcd_test.go | 142 +++-------- pkg/registry/resourcequota/strategy.go | 3 +- pkg/registry/service/etcd/etcd_test.go | 34 +++ 9 files changed, 381 insertions(+), 713 deletions(-) diff --git a/pkg/api/rest/resttest/resttest.go b/pkg/api/rest/resttest/resttest.go index 0a232be3cb..23b5a87221 100644 --- a/pkg/api/rest/resttest/resttest.go +++ b/pkg/api/rest/resttest/resttest.go @@ -21,6 +21,7 @@ import ( "reflect" "strings" "testing" + "time" "github.com/coreos/go-etcd/etcd" "k8s.io/kubernetes/pkg/api" @@ -120,7 +121,10 @@ func copyOrDie(obj runtime.Object) runtime.Object { } type AssignFunc func([]runtime.Object) []runtime.Object +type EmitFunc func(runtime.Object, string) error type GetFunc func(api.Context, runtime.Object) (runtime.Object, error) +type InitWatchFunc func() +type InjectErrFunc func(err error) type SetFunc func(api.Context, runtime.Object) error type SetRVFunc func(uint64) type UpdateFunc func(runtime.Object) runtime.Object @@ -185,7 +189,7 @@ func (t *Tester) TestGet(obj runtime.Object) { } } -// Test listing object. +// Test listing objects. func (t *Tester) TestList(obj runtime.Object, assignFn AssignFunc, setRVFn SetRVFunc) { t.testListError() t.testListFound(obj, assignFn) @@ -193,6 +197,15 @@ func (t *Tester) TestList(obj runtime.Object, assignFn AssignFunc, setRVFn SetRV t.testListMatchLabels(obj, assignFn) } +// Test watching objects. +func (t *Tester) TestWatch( + obj runtime.Object, initWatchFn InitWatchFunc, injectErrFn InjectErrFunc, emitFn EmitFunc, + labelsPass, labelsFail []labels.Set, fieldsPass, fieldsFail []fields.Set, actions []string) { + t.testWatch(initWatchFn, injectErrFn) + t.testWatchLabels(copyOrDie(obj), initWatchFn, emitFn, labelsPass, labelsFail, actions) + t.testWatchFields(copyOrDie(obj), initWatchFn, emitFn, fieldsPass, fieldsFail, actions) +} + // ============================================================================= // Creation tests. @@ -914,3 +927,125 @@ func (t *Tester) testListNotFound(assignFn AssignFunc, setRVFn SetRVFunc) { t.Errorf("unexpected resource version: %d", meta.ResourceVersion) } } + +// ============================================================================= +// Watching tests. + +func (t *Tester) testWatch(initWatchFn InitWatchFunc, injectErrFn InjectErrFunc) { + ctx := t.TestContext() + watcher, err := t.storage.(rest.Watcher).Watch(ctx, labels.Everything(), fields.Everything(), "1") + if err != nil { + t.Errorf("unexpected error: %v", err) + } + initWatchFn() + + select { + case _, ok := <-watcher.ResultChan(): + if !ok { + t.Errorf("watch channel should be open") + } + default: + } + + injectErrFn(nil) + if _, ok := <-watcher.ResultChan(); ok { + t.Errorf("watch channel should be closed") + } + watcher.Stop() +} + +func (t *Tester) testWatchFields(obj runtime.Object, initWatchFn InitWatchFunc, emitFn EmitFunc, fieldsPass, fieldsFail []fields.Set, actions []string) { + ctx := t.TestContext() + + for _, field := range fieldsPass { + for _, action := range actions { + watcher, err := t.storage.(rest.Watcher).Watch(ctx, labels.Everything(), field.AsSelector(), "1") + if err != nil { + t.Errorf("unexpected error: %v", err) + } + initWatchFn() + if err := emitFn(obj, action); err != nil { + t.Errorf("unexpected error: %v", err) + } + + select { + case _, ok := <-watcher.ResultChan(): + if !ok { + t.Errorf("watch channel should be open") + } + case <-time.After(time.Millisecond * 100): + t.Errorf("unexpected timeout from result channel") + } + watcher.Stop() + } + } + + for _, field := range fieldsFail { + for _, action := range actions { + watcher, err := t.storage.(rest.Watcher).Watch(ctx, labels.Everything(), field.AsSelector(), "1") + if err != nil { + t.Errorf("unexpected error: %v", err) + } + initWatchFn() + if err := emitFn(obj, action); err != nil { + t.Errorf("unexpected error: %v", err) + } + + select { + case <-watcher.ResultChan(): + t.Errorf("unexpected result from result channel") + case <-time.After(time.Millisecond * 100): + // expected case + } + watcher.Stop() + } + } +} + +func (t *Tester) testWatchLabels(obj runtime.Object, initWatchFn InitWatchFunc, emitFn EmitFunc, labelsPass, labelsFail []labels.Set, actions []string) { + ctx := t.TestContext() + + for _, label := range labelsPass { + for _, action := range actions { + watcher, err := t.storage.(rest.Watcher).Watch(ctx, label.AsSelector(), fields.Everything(), "1") + if err != nil { + t.Errorf("unexpected error: %v", err) + } + initWatchFn() + if err := emitFn(obj, action); err != nil { + t.Errorf("unexpected error: %v", err) + } + + select { + case _, ok := <-watcher.ResultChan(): + if !ok { + t.Errorf("watch channel should be open") + } + case <-time.After(time.Millisecond * 100): + t.Errorf("unexpected timeout from result channel") + } + watcher.Stop() + } + } + + for _, label := range labelsFail { + for _, action := range actions { + watcher, err := t.storage.(rest.Watcher).Watch(ctx, label.AsSelector(), fields.Everything(), "1") + if err != nil { + t.Errorf("unexpected error: %v", err) + } + initWatchFn() + if err := emitFn(obj, action); err != nil { + t.Errorf("unexpected error: %v", err) + } + + select { + case <-watcher.ResultChan(): + t.Errorf("unexpected result from result channel") + case <-time.After(time.Millisecond * 100): + // expected case + } + watcher.Stop() + } + } +} diff --git a/pkg/registry/controller/etcd/etcd_test.go b/pkg/registry/controller/etcd/etcd_test.go index 01f2bd8cf2..05a7415665 100644 --- a/pkg/registry/controller/etcd/etcd_test.go +++ b/pkg/registry/controller/etcd/etcd_test.go @@ -18,7 +18,6 @@ package etcd import ( "testing" - "time" "github.com/coreos/go-etcd/etcd" "k8s.io/kubernetes/pkg/api" @@ -26,19 +25,12 @@ import ( "k8s.io/kubernetes/pkg/api/testapi" "k8s.io/kubernetes/pkg/fields" "k8s.io/kubernetes/pkg/labels" - etcdgeneric "k8s.io/kubernetes/pkg/registry/generic/etcd" "k8s.io/kubernetes/pkg/registry/registrytest" "k8s.io/kubernetes/pkg/runtime" - etcdstorage "k8s.io/kubernetes/pkg/storage/etcd" "k8s.io/kubernetes/pkg/tools" "k8s.io/kubernetes/pkg/tools/etcdtest" ) -const ( - PASS = iota - FAIL -) - func newStorage(t *testing.T) (*REST, *tools.FakeEtcdClient) { etcdStorage, fakeClient := registrytest.NewEtcdStorage(t) return NewREST(etcdStorage), fakeClient @@ -83,7 +75,7 @@ func validNewController() *api.ReplicationController { } } -var validController = *validNewController() +var validController = validNewController() func TestCreate(t *testing.T) { storage, fakeClient := newStorage(t) @@ -219,13 +211,54 @@ func TestEtcdListControllers(t *testing.T) { }) } +func TestWatchControllers(t *testing.T) { + storage, fakeClient := newStorage(t) + test := resttest.New(t, storage, fakeClient.SetError) + test.TestWatch( + validController, + func() { + fakeClient.WaitForWatchCompletion() + }, + func(err error) { + fakeClient.WatchInjectError <- err + }, + func(obj runtime.Object, action string) error { + return registrytest.EmitObject(fakeClient, obj, action) + }, + // matching labels + []labels.Set{ + {"a": "b"}, + }, + // not matching labels + []labels.Set{ + {"a": "c"}, + {"foo": "bar"}, + }, + // matching fields + []fields.Set{ + {"status.replicas": "0"}, + {"metadata.name": "foo"}, + {"status.replicas": "0", "metadata.name": "foo"}, + }, + // not matchin fields + []fields.Set{ + {"status.replicas": "10"}, + {"metadata.name": "bar"}, + {"name": "foo"}, + {"status.replicas": "10", "metadata.name": "foo"}, + {"status.replicas": "0", "metadata.name": "bar"}, + }, + registrytest.WatchActions, + ) +} + func TestEtcdDeleteController(t *testing.T) { ctx := api.NewDefaultContext() storage, fakeClient := newStorage(t) key, _ := storage.KeyFunc(ctx, validController.Name) key = etcdtest.AddPrefix(key) - fakeClient.Set(key, runtime.EncodeOrDie(testapi.Codec(), validNewController()), 0) + fakeClient.Set(key, runtime.EncodeOrDie(testapi.Codec(), validController), 0) obj, err := storage.Delete(ctx, validController.Name, nil) if err != nil { t.Errorf("unexpected error: %v", err) @@ -243,196 +276,6 @@ func TestEtcdDeleteController(t *testing.T) { } } -func TestEtcdWatchController(t *testing.T) { - ctx := api.NewDefaultContext() - storage, fakeClient := newStorage(t) - watching, err := storage.Watch(ctx, - labels.Everything(), - fields.Everything(), - "1", - ) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - fakeClient.WaitForWatchCompletion() - - select { - case _, ok := <-watching.ResultChan(): - if !ok { - t.Errorf("watching channel should be open") - } - default: - } - fakeClient.WatchInjectError <- nil - if _, ok := <-watching.ResultChan(); ok { - t.Errorf("watching channel should be closed") - } - watching.Stop() -} - -func TestEtcdWatchControllersMatch(t *testing.T) { - ctx := api.WithNamespace(api.NewDefaultContext(), validController.Namespace) - storage, fakeClient := newStorage(t) - fakeClient.ExpectNotFoundGet(etcdgeneric.NamespaceKeyRootFunc(ctx, "/registry/pods")) - - watching, err := storage.Watch(ctx, - labels.SelectorFromSet(validController.Spec.Selector), - fields.Everything(), - "1", - ) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - fakeClient.WaitForWatchCompletion() - - // The watcher above is waiting for these Labels, on receiving them it should - // apply the ControllerStatus decorator, which lists pods, causing a query against - // the /registry/pods endpoint of the etcd client. - controller := &api.ReplicationController{ - ObjectMeta: api.ObjectMeta{ - Name: "foo", - Labels: validController.Spec.Selector, - Namespace: "default", - }, - } - controllerBytes, _ := testapi.Codec().Encode(controller) - fakeClient.WatchResponse <- &etcd.Response{ - Action: "create", - Node: &etcd.Node{ - Value: string(controllerBytes), - }, - } - select { - case _, ok := <-watching.ResultChan(): - if !ok { - t.Errorf("watching channel should be open") - } - case <-time.After(time.Millisecond * 100): - t.Error("unexpected timeout from result channel") - } - watching.Stop() -} - -func TestEtcdWatchControllersFields(t *testing.T) { - ctx := api.WithNamespace(api.NewDefaultContext(), validController.Namespace) - storage, fakeClient := newStorage(t) - fakeClient.ExpectNotFoundGet(etcdgeneric.NamespaceKeyRootFunc(ctx, "/registry/pods")) - - testFieldMap := map[int][]fields.Set{ - PASS: { - {"status.replicas": "0"}, - {"metadata.name": "foo"}, - {"status.replicas": "0", "metadata.name": "foo"}, - }, - FAIL: { - {"status.replicas": "10"}, - {"metadata.name": "bar"}, - {"name": "foo"}, - {"status.replicas": "10", "metadata.name": "foo"}, - {"status.replicas": "0", "metadata.name": "bar"}, - }, - } - testEtcdActions := []string{ - etcdstorage.EtcdCreate, - etcdstorage.EtcdSet, - etcdstorage.EtcdCAS, - etcdstorage.EtcdDelete} - - controller := &api.ReplicationController{ - ObjectMeta: api.ObjectMeta{ - Name: "foo", - Labels: validController.Spec.Selector, - Namespace: "default", - }, - Status: api.ReplicationControllerStatus{ - Replicas: 0, - }, - } - controllerBytes, _ := testapi.Codec().Encode(controller) - - for expectedResult, fieldSet := range testFieldMap { - for _, field := range fieldSet { - for _, action := range testEtcdActions { - watching, err := storage.Watch(ctx, - labels.Everything(), - field.AsSelector(), - "1", - ) - var prevNode *etcd.Node = nil - node := &etcd.Node{ - Value: string(controllerBytes), - } - if action == etcdstorage.EtcdDelete { - prevNode = node - } - fakeClient.WaitForWatchCompletion() - fakeClient.WatchResponse <- &etcd.Response{ - Action: action, - Node: node, - PrevNode: prevNode, - } - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - - select { - case r, ok := <-watching.ResultChan(): - if expectedResult == FAIL { - t.Errorf("Unexpected result from channel %#v", r) - } - if !ok { - t.Errorf("watching channel should be open") - } - case <-time.After(time.Millisecond * 100): - if expectedResult == PASS { - t.Error("unexpected timeout from result channel") - } - } - watching.Stop() - } - } - } -} - -func TestEtcdWatchControllersNotMatch(t *testing.T) { - ctx := api.NewDefaultContext() - storage, fakeClient := newStorage(t) - fakeClient.ExpectNotFoundGet(etcdgeneric.NamespaceKeyRootFunc(ctx, "/registry/pods")) - - watching, err := storage.Watch(ctx, - labels.SelectorFromSet(labels.Set{"name": "foo"}), - fields.Everything(), - "1", - ) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - fakeClient.WaitForWatchCompletion() - - controller := &api.ReplicationController{ - ObjectMeta: api.ObjectMeta{ - Name: "bar", - Labels: map[string]string{ - "name": "bar", - }, - }, - } - controllerBytes, _ := testapi.Codec().Encode(controller) - fakeClient.WatchResponse <- &etcd.Response{ - Action: "create", - Node: &etcd.Node{ - Value: string(controllerBytes), - }, - } - - select { - case <-watching.ResultChan(): - t.Error("unexpected result from result channel") - case <-time.After(time.Millisecond * 100): - // expected case - } -} - func TestDelete(t *testing.T) { ctx := api.NewDefaultContext() storage, fakeClient := newStorage(t) diff --git a/pkg/registry/daemon/etcd/etcd_test.go b/pkg/registry/daemon/etcd/etcd_test.go index 61d3aaae23..f77d10a761 100755 --- a/pkg/registry/daemon/etcd/etcd_test.go +++ b/pkg/registry/daemon/etcd/etcd_test.go @@ -18,7 +18,6 @@ package etcd import ( "testing" - "time" "github.com/coreos/go-etcd/etcd" "k8s.io/kubernetes/pkg/api" @@ -27,19 +26,12 @@ import ( "k8s.io/kubernetes/pkg/expapi" "k8s.io/kubernetes/pkg/fields" "k8s.io/kubernetes/pkg/labels" - etcdgeneric "k8s.io/kubernetes/pkg/registry/generic/etcd" "k8s.io/kubernetes/pkg/registry/registrytest" "k8s.io/kubernetes/pkg/runtime" - etcdstorage "k8s.io/kubernetes/pkg/storage/etcd" "k8s.io/kubernetes/pkg/tools" "k8s.io/kubernetes/pkg/tools/etcdtest" ) -const ( - PASS = iota - FAIL -) - func newStorage(t *testing.T) (*REST, *tools.FakeEtcdClient) { etcdStorage, fakeClient := registrytest.NewEtcdStorage(t) return NewREST(etcdStorage), fakeClient @@ -84,16 +76,16 @@ func validNewDaemon() *expapi.Daemon { } } -var validDaemon = *validNewDaemon() +var validDaemon = validNewDaemon() func TestCreate(t *testing.T) { storage, fakeClient := newStorage(t) test := resttest.New(t, storage, fakeClient.SetError) - controller := validNewDaemon() - controller.ObjectMeta = api.ObjectMeta{} + daemon := validNewDaemon() + daemon.ObjectMeta = api.ObjectMeta{} test.TestCreate( // valid - controller, + daemon, func(ctx api.Context, obj runtime.Object) error { return registrytest.SetObject(fakeClient, storage.KeyFunc, ctx, obj) }, @@ -175,13 +167,49 @@ func TestEtcdListControllers(t *testing.T) { }) } +func TestWatchControllers(t *testing.T) { + storage, fakeClient := newStorage(t) + test := resttest.New(t, storage, fakeClient.SetError) + test.TestWatch( + validDaemon, + func() { + fakeClient.WaitForWatchCompletion() + }, + func(err error) { + fakeClient.WatchInjectError <- err + }, + func(obj runtime.Object, action string) error { + return registrytest.EmitObject(fakeClient, obj, action) + }, + // matching labels + []labels.Set{ + {"a": "b"}, + }, + // not matching labels + []labels.Set{ + {"a": "c"}, + {"foo": "bar"}, + }, + // matching fields + []fields.Set{ + {"metadata.name": "foo"}, + }, + // notmatching fields + []fields.Set{ + {"metadata.name": "bar"}, + {"name": "foo"}, + }, + registrytest.WatchActions, + ) +} + func TestEtcdDeleteController(t *testing.T) { ctx := api.NewDefaultContext() storage, fakeClient := newStorage(t) key, err := storage.KeyFunc(ctx, validDaemon.Name) key = etcdtest.AddPrefix(key) - fakeClient.Set(key, runtime.EncodeOrDie(testapi.Codec(), validNewDaemon()), 0) + fakeClient.Set(key, runtime.EncodeOrDie(testapi.Codec(), validDaemon), 0) obj, err := storage.Delete(ctx, validDaemon.Name, nil) if err != nil { t.Errorf("unexpected error: %v", err) @@ -199,195 +227,6 @@ func TestEtcdDeleteController(t *testing.T) { } } -func TestEtcdWatchController(t *testing.T) { - ctx := api.NewDefaultContext() - storage, fakeClient := newStorage(t) - watching, err := storage.Watch(ctx, - labels.Everything(), - fields.Everything(), - "1", - ) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - fakeClient.WaitForWatchCompletion() - - select { - case _, ok := <-watching.ResultChan(): - if !ok { - t.Errorf("watching channel should be open") - } - default: - } - fakeClient.WatchInjectError <- nil - if _, ok := <-watching.ResultChan(); ok { - t.Errorf("watching channel should be closed") - } - watching.Stop() -} - -// Tests that we can watch for the creation of daemon controllers with specified labels. -func TestEtcdWatchControllersMatch(t *testing.T) { - ctx := api.WithNamespace(api.NewDefaultContext(), validDaemon.Namespace) - storage, fakeClient := newStorage(t) - fakeClient.ExpectNotFoundGet(etcdgeneric.NamespaceKeyRootFunc(ctx, "/registry/pods")) - - watching, err := storage.Watch(ctx, - labels.SelectorFromSet(validDaemon.Spec.Selector), - fields.Everything(), - "1", - ) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - fakeClient.WaitForWatchCompletion() - - // The watcher above is waiting for these Labels, on receiving them it should - // apply the ControllerStatus decorator, which lists pods, causing a query against - // the /registry/pods endpoint of the etcd client. - controller := &expapi.Daemon{ - ObjectMeta: api.ObjectMeta{ - Name: "foo", - Labels: validDaemon.Spec.Selector, - Namespace: "default", - }, - } - controllerBytes, _ := testapi.Codec().Encode(controller) - fakeClient.WatchResponse <- &etcd.Response{ - Action: "create", - Node: &etcd.Node{ - Value: string(controllerBytes), - }, - } - select { - case _, ok := <-watching.ResultChan(): - if !ok { - t.Errorf("watching channel should be open") - } - case <-time.After(time.Millisecond * 100): - t.Error("unexpected timeout from result channel") - } - watching.Stop() -} - -// Tests that we can watch for daemon controllers with specified fields. -func TestEtcdWatchControllersFields(t *testing.T) { - ctx := api.WithNamespace(api.NewDefaultContext(), validDaemon.Namespace) - storage, fakeClient := newStorage(t) - fakeClient.ExpectNotFoundGet(etcdgeneric.NamespaceKeyRootFunc(ctx, "/registry/pods")) - - testFieldMap := map[int][]fields.Set{ - PASS: { - {"metadata.name": "foo"}, - }, - FAIL: { - {"metadata.name": "bar"}, - {"name": "foo"}, - }, - } - testEtcdActions := []string{ - etcdstorage.EtcdCreate, - etcdstorage.EtcdSet, - etcdstorage.EtcdCAS, - etcdstorage.EtcdDelete} - - controller := &expapi.Daemon{ - ObjectMeta: api.ObjectMeta{ - Name: "foo", - Labels: validDaemon.Spec.Selector, - Namespace: "default", - }, - Status: expapi.DaemonStatus{ - CurrentNumberScheduled: 2, - NumberMisscheduled: 1, - DesiredNumberScheduled: 4, - }, - } - controllerBytes, _ := testapi.Codec().Encode(controller) - - for expectedResult, fieldSet := range testFieldMap { - for _, field := range fieldSet { - for _, action := range testEtcdActions { - watching, err := storage.Watch(ctx, - labels.Everything(), - field.AsSelector(), - "1", - ) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - var prevNode *etcd.Node = nil - node := &etcd.Node{ - Value: string(controllerBytes), - } - if action == etcdstorage.EtcdDelete { - prevNode = node - } - fakeClient.WaitForWatchCompletion() - fakeClient.WatchResponse <- &etcd.Response{ - Action: action, - Node: node, - PrevNode: prevNode, - } - - select { - case r, ok := <-watching.ResultChan(): - if expectedResult == FAIL { - t.Errorf("Unexpected result from channel %#v", r) - } - if !ok { - t.Errorf("watching channel should be open") - } - case <-time.After(time.Millisecond * 100): - if expectedResult == PASS { - t.Error("unexpected timeout from result channel") - } - } - watching.Stop() - } - } - } -} - -func TestEtcdWatchControllersNotMatch(t *testing.T) { - ctx := api.NewDefaultContext() - storage, fakeClient := newStorage(t) - fakeClient.ExpectNotFoundGet(etcdgeneric.NamespaceKeyRootFunc(ctx, "/registry/pods")) - - watching, err := storage.Watch(ctx, - labels.SelectorFromSet(labels.Set{"name": "foo"}), - fields.Everything(), - "1", - ) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - fakeClient.WaitForWatchCompletion() - - controller := &expapi.Daemon{ - ObjectMeta: api.ObjectMeta{ - Name: "bar", - Labels: map[string]string{ - "name": "bar", - }, - }, - } - controllerBytes, _ := testapi.Codec().Encode(controller) - fakeClient.WatchResponse <- &etcd.Response{ - Action: "create", - Node: &etcd.Node{ - Value: string(controllerBytes), - }, - } - - select { - case <-watching.ResultChan(): - t.Error("unexpected result from result channel") - case <-time.After(time.Millisecond * 100): - // expected case - } -} - func TestDelete(t *testing.T) { ctx := api.NewDefaultContext() storage, fakeClient := newStorage(t) diff --git a/pkg/registry/minion/etcd/etcd_test.go b/pkg/registry/minion/etcd/etcd_test.go index 435f467d1d..b1ceb8bbee 100644 --- a/pkg/registry/minion/etcd/etcd_test.go +++ b/pkg/registry/minion/etcd/etcd_test.go @@ -19,7 +19,6 @@ package etcd import ( "net/http" "testing" - "time" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/resource" @@ -169,6 +168,41 @@ func TestEtcdListNodes(t *testing.T) { }) } +func TestWatchNodes(t *testing.T) { + storage, fakeClient := newStorage(t) + test := resttest.New(t, storage, fakeClient.SetError).ClusterScope() + test.TestWatch( + validNewNode(), + func() { + fakeClient.WaitForWatchCompletion() + }, + func(err error) { + fakeClient.WatchInjectError <- err + }, + func(obj runtime.Object, action string) error { + return registrytest.EmitObject(fakeClient, obj, action) + }, + // matching labels + []labels.Set{ + {"name": "foo"}, + }, + // not matching labels + []labels.Set{ + {"name": "bar"}, + {"foo": "bar"}, + }, + // matching fields + []fields.Set{ + {"metadata.name": "foo"}, + }, + // not matchin fields + []fields.Set{ + {"metadata.name": "bar"}, + }, + registrytest.WatchActions, + ) +} + func TestEtcdDeleteNode(t *testing.T) { ctx := api.NewContext() storage, fakeClient := newStorage(t) @@ -188,94 +222,3 @@ func TestEtcdDeleteNode(t *testing.T) { t.Errorf("Unexpected key: %s, expected %s", fakeClient.DeletedKeys[0], key) } } - -func TestEtcdWatchNode(t *testing.T) { - ctx := api.NewDefaultContext() - storage, fakeClient := newStorage(t) - watching, err := storage.Watch(ctx, - labels.Everything(), - fields.Everything(), - "1", - ) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - fakeClient.WaitForWatchCompletion() - - select { - case _, ok := <-watching.ResultChan(): - if !ok { - t.Errorf("watching channel should be open") - } - default: - } - fakeClient.WatchInjectError <- nil - if _, ok := <-watching.ResultChan(); ok { - t.Errorf("watching channel should be closed") - } - watching.Stop() -} - -func TestEtcdWatchNodesMatch(t *testing.T) { - ctx := api.NewDefaultContext() - storage, fakeClient := newStorage(t) - node := validNewNode() - - watching, err := storage.Watch(ctx, - labels.SelectorFromSet(labels.Set{"name": node.Name}), - fields.Everything(), - "1", - ) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - fakeClient.WaitForWatchCompletion() - - nodeBytes, _ := testapi.Codec().Encode(node) - fakeClient.WatchResponse <- &etcd.Response{ - Action: "create", - Node: &etcd.Node{ - Value: string(nodeBytes), - }, - } - select { - case _, ok := <-watching.ResultChan(): - if !ok { - t.Errorf("watching channel should be open") - } - case <-time.After(time.Millisecond * 100): - t.Error("unexpected timeout from result channel") - } - watching.Stop() -} - -func TestEtcdWatchNodesNotMatch(t *testing.T) { - ctx := api.NewDefaultContext() - storage, fakeClient := newStorage(t) - node := validNewNode() - - watching, err := storage.Watch(ctx, - labels.SelectorFromSet(labels.Set{"name": "bar"}), - fields.Everything(), - "1", - ) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - fakeClient.WaitForWatchCompletion() - - nodeBytes, _ := testapi.Codec().Encode(node) - fakeClient.WatchResponse <- &etcd.Response{ - Action: "create", - Node: &etcd.Node{ - Value: string(nodeBytes), - }, - } - - select { - case <-watching.ResultChan(): - t.Error("unexpected result from result channel") - case <-time.After(time.Millisecond * 100): - // expected case - } -} diff --git a/pkg/registry/pod/etcd/etcd_test.go b/pkg/registry/pod/etcd/etcd_test.go index 38c05e8c8d..fe977a48ed 100644 --- a/pkg/registry/pod/etcd/etcd_test.go +++ b/pkg/registry/pod/etcd/etcd_test.go @@ -20,7 +20,6 @@ import ( "fmt" "strings" "testing" - "time" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/errors" @@ -376,17 +375,15 @@ func TestDeletePod(t *testing.T) { func TestEtcdGet(t *testing.T) { storage, _, _, fakeClient := newStorage(t) test := resttest.New(t, storage, fakeClient.SetError) - pod := validNewPod() - test.TestGet(pod) + test.TestGet(validNewPod()) } func TestEtcdList(t *testing.T) { storage, _, _, fakeClient := newStorage(t) test := resttest.New(t, storage, fakeClient.SetError) key := etcdtest.AddPrefix(storage.Etcd.KeyRootFunc(test.TestContext())) - pod := validNewPod() test.TestList( - pod, + validNewPod(), func(objects []runtime.Object) []runtime.Object { return registrytest.SetObjectsForKey(fakeClient, key, objects) }, @@ -395,6 +392,38 @@ func TestEtcdList(t *testing.T) { }) } +func TestEtcdWatch(t *testing.T) { + storage, _, _, fakeClient := newStorage(t) + test := resttest.New(t, storage, fakeClient.SetError) + test.TestWatch( + validNewPod(), + func() { + fakeClient.WaitForWatchCompletion() + }, + func(err error) { + fakeClient.WatchInjectError <- err + }, + func(obj runtime.Object, action string) error { + return registrytest.EmitObject(fakeClient, obj, action) + }, + // matching labels + []labels.Set{}, + // not matching labels + []labels.Set{ + {"foo": "bar"}, + }, + // matching fields + []fields.Set{ + {"metadata.name": "foo"}, + }, + // not matchin fields + []fields.Set{ + {"metadata.name": "bar"}, + }, + registrytest.WatchActions, + ) +} + func TestEtcdCreate(t *testing.T) { storage, bindingStorage, _, fakeClient := newStorage(t) ctx := api.NewDefaultContext() @@ -893,107 +922,3 @@ func TestEtcdDeletePodMultipleContainers(t *testing.T) { t.Errorf("Unexpected key: %s, expected %s", fakeClient.DeletedKeys[0], key) } } - -func TestEtcdWatchPods(t *testing.T) { - storage, _, _, fakeClient := newStorage(t) - ctx := api.NewDefaultContext() - watching, err := storage.Watch(ctx, - labels.Everything(), - fields.Everything(), - "1", - ) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - fakeClient.WaitForWatchCompletion() - - select { - case _, ok := <-watching.ResultChan(): - if !ok { - t.Errorf("watching channel should be open") - } - default: - } - fakeClient.WatchInjectError <- nil - if _, ok := <-watching.ResultChan(); ok { - t.Errorf("watching channel should be closed") - } - watching.Stop() -} - -func TestEtcdWatchPodsMatch(t *testing.T) { - storage, _, _, fakeClient := newStorage(t) - ctx := api.NewDefaultContext() - watching, err := storage.Watch(ctx, - labels.SelectorFromSet(labels.Set{"name": "foo"}), - fields.Everything(), - "1", - ) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - fakeClient.WaitForWatchCompletion() - - pod := &api.Pod{ - ObjectMeta: api.ObjectMeta{ - Name: "foo", - Namespace: "default", - Labels: map[string]string{ - "name": "foo", - }, - }, - } - podBytes, _ := testapi.Codec().Encode(pod) - fakeClient.WatchResponse <- &etcd.Response{ - Action: "create", - Node: &etcd.Node{ - Value: string(podBytes), - }, - } - select { - case _, ok := <-watching.ResultChan(): - if !ok { - t.Errorf("watching channel should be open") - } - case <-time.After(time.Millisecond * 100): - t.Error("unexpected timeout from result channel") - } - watching.Stop() -} - -func TestEtcdWatchPodsNotMatch(t *testing.T) { - storage, _, _, fakeClient := newStorage(t) - ctx := api.NewDefaultContext() - watching, err := storage.Watch(ctx, - labels.SelectorFromSet(labels.Set{"name": "foo"}), - fields.Everything(), - "1", - ) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - fakeClient.WaitForWatchCompletion() - - pod := &api.Pod{ - ObjectMeta: api.ObjectMeta{ - Name: "bar", - Labels: map[string]string{ - "name": "bar", - }, - }, - } - podBytes, _ := testapi.Codec().Encode(pod) - fakeClient.WatchResponse <- &etcd.Response{ - Action: "create", - Node: &etcd.Node{ - Value: string(podBytes), - }, - } - - select { - case <-watching.ResultChan(): - t.Error("unexpected result from result channel") - case <-time.After(time.Millisecond * 100): - // expected case - } -} diff --git a/pkg/registry/registrytest/etcd.go b/pkg/registry/registrytest/etcd.go index 628b10fa4c..dfcee12bd7 100644 --- a/pkg/registry/registrytest/etcd.go +++ b/pkg/registry/registrytest/etcd.go @@ -37,9 +37,31 @@ func NewEtcdStorage(t *testing.T) (storage.Interface, *tools.FakeEtcdClient) { return etcdStorage, fakeClient } +var WatchActions = []string{etcdstorage.EtcdCreate, etcdstorage.EtcdSet, etcdstorage.EtcdCAS, etcdstorage.EtcdDelete} + type keyFunc func(api.Context, string) (string, error) type newFunc func() runtime.Object +func EmitObject(fakeClient *tools.FakeEtcdClient, obj runtime.Object, action string) error { + encoded, err := testapi.Codec().Encode(obj) + if err != nil { + return err + } + node := &etcd.Node{ + Value: string(encoded), + } + var prevNode *etcd.Node = nil + if action == etcdstorage.EtcdDelete { + prevNode = node + } + fakeClient.WatchResponse <- &etcd.Response{ + Action: action, + Node: node, + PrevNode: prevNode, + } + return nil +} + func GetObject(fakeClient *tools.FakeEtcdClient, keyFn keyFunc, newFn newFunc, ctx api.Context, obj runtime.Object) (runtime.Object, error) { meta, err := api.ObjectMetaFor(obj) if err != nil { diff --git a/pkg/registry/resourcequota/etcd/etcd_test.go b/pkg/registry/resourcequota/etcd/etcd_test.go index 1034f4628c..39c8d1a6da 100644 --- a/pkg/registry/resourcequota/etcd/etcd_test.go +++ b/pkg/registry/resourcequota/etcd/etcd_test.go @@ -19,7 +19,6 @@ package etcd import ( "fmt" "testing" - "time" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/resource" @@ -181,17 +180,15 @@ func TestDeleteResourceQuota(t *testing.T) { func TestEtcdGet(t *testing.T) { storage, _, fakeClient := newStorage(t) test := resttest.New(t, storage, fakeClient.SetError) - resourcequota := validNewResourceQuota() - test.TestGet(resourcequota) + test.TestGet(validNewResourceQuota()) } func TestEtcdList(t *testing.T) { storage, _, fakeClient := newStorage(t) test := resttest.New(t, storage, fakeClient.SetError) key := etcdtest.AddPrefix(storage.Etcd.KeyRootFunc(test.TestContext())) - resourcequota := validNewResourceQuota() test.TestList( - resourcequota, + validNewResourceQuota(), func(objects []runtime.Object) []runtime.Object { return registrytest.SetObjectsForKey(fakeClient, key, objects) }, @@ -200,6 +197,38 @@ func TestEtcdList(t *testing.T) { }) } +func TestEtcdWatch(t *testing.T) { + storage, _, fakeClient := newStorage(t) + test := resttest.New(t, storage, fakeClient.SetError) + test.TestWatch( + validNewResourceQuota(), + func() { + fakeClient.WaitForWatchCompletion() + }, + func(err error) { + fakeClient.WatchInjectError <- err + }, + func(obj runtime.Object, action string) error { + return registrytest.EmitObject(fakeClient, obj, action) + }, + // matching labels + []labels.Set{}, + // not matching labels + []labels.Set{ + {"foo": "bar"}, + }, + // matching fields + []fields.Set{ + {"metadata.name": "foo"}, + }, + // not matchin fields + []fields.Set{ + {"metadata.name": "bar"}, + }, + registrytest.WatchActions, + ) +} + func TestEtcdUpdateStatus(t *testing.T) { storage, status, fakeClient := newStorage(t) ctx := api.NewDefaultContext() @@ -249,106 +278,3 @@ func TestEtcdUpdateStatus(t *testing.T) { t.Errorf("unexpected object: %s", util.ObjectDiff(&expected, rqOut)) } } - -func TestEtcdWatchResourceQuotas(t *testing.T) { - storage, _, fakeClient := newStorage(t) - ctx := api.NewDefaultContext() - watching, err := storage.Watch(ctx, - labels.Everything(), - fields.Everything(), - "1", - ) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - fakeClient.WaitForWatchCompletion() - - select { - case _, ok := <-watching.ResultChan(): - if !ok { - t.Errorf("watching channel should be open") - } - default: - } - fakeClient.WatchInjectError <- nil - if _, ok := <-watching.ResultChan(); ok { - t.Errorf("watching channel should be closed") - } - watching.Stop() -} - -func TestEtcdWatchResourceQuotasMatch(t *testing.T) { - storage, _, fakeClient := newStorage(t) - ctx := api.NewDefaultContext() - watching, err := storage.Watch(ctx, - labels.SelectorFromSet(labels.Set{"name": "foo"}), - fields.Everything(), - "1", - ) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - fakeClient.WaitForWatchCompletion() - - resourcequota := &api.ResourceQuota{ - ObjectMeta: api.ObjectMeta{ - Name: "foo", - Labels: map[string]string{ - "name": "foo", - }, - }, - } - resourcequotaBytes, _ := testapi.Codec().Encode(resourcequota) - fakeClient.WatchResponse <- &etcd.Response{ - Action: "create", - Node: &etcd.Node{ - Value: string(resourcequotaBytes), - }, - } - select { - case _, ok := <-watching.ResultChan(): - if !ok { - t.Errorf("watching channel should be open") - } - case <-time.After(time.Millisecond * 100): - t.Error("unexpected timeout from result channel") - } - watching.Stop() -} - -func TestEtcdWatchResourceQuotasNotMatch(t *testing.T) { - storage, _, fakeClient := newStorage(t) - ctx := api.NewDefaultContext() - watching, err := storage.Watch(ctx, - labels.SelectorFromSet(labels.Set{"name": "foo"}), - fields.Everything(), - "1", - ) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - fakeClient.WaitForWatchCompletion() - - resourcequota := &api.ResourceQuota{ - ObjectMeta: api.ObjectMeta{ - Name: "bar", - Labels: map[string]string{ - "name": "bar", - }, - }, - } - resourcequotaBytes, _ := testapi.Codec().Encode(resourcequota) - fakeClient.WatchResponse <- &etcd.Response{ - Action: "create", - Node: &etcd.Node{ - Value: string(resourcequotaBytes), - }, - } - - select { - case <-watching.ResultChan(): - t.Error("unexpected result from result channel") - case <-time.After(time.Millisecond * 100): - // expected case - } -} diff --git a/pkg/registry/resourcequota/strategy.go b/pkg/registry/resourcequota/strategy.go index 5bcc36d86c..e7e5e71f1b 100644 --- a/pkg/registry/resourcequota/strategy.go +++ b/pkg/registry/resourcequota/strategy.go @@ -106,9 +106,10 @@ func MatchResourceQuota(label labels.Selector, field fields.Selector) generic.Ma } // ResourceQuotaToSelectableFields returns a label set that represents the object -// TODO: fields are not labels, and the validation rules for them do not apply. func ResourceQuotaToSelectableFields(resourcequota *api.ResourceQuota) labels.Set { return labels.Set{ + "metadata.name": resourcequota.Name, + // Having "name" is a bug, but it must be supported for v1 API backward compatibility. "name": resourcequota.Name, } } diff --git a/pkg/registry/service/etcd/etcd_test.go b/pkg/registry/service/etcd/etcd_test.go index c17b1f8e71..475ee2d9e1 100644 --- a/pkg/registry/service/etcd/etcd_test.go +++ b/pkg/registry/service/etcd/etcd_test.go @@ -21,6 +21,8 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/rest/resttest" + "k8s.io/kubernetes/pkg/fields" + "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/registry/registrytest" "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/tools" @@ -119,3 +121,35 @@ func TestUpdate(t *testing.T) { }, ) } + +func TestWatch(t *testing.T) { + storage, fakeClient := newStorage(t) + test := resttest.New(t, storage, fakeClient.SetError) + test.TestWatch( + validService(), + func() { + fakeClient.WaitForWatchCompletion() + }, + func(err error) { + fakeClient.WatchInjectError <- err + }, + func(obj runtime.Object, action string) error { + return registrytest.EmitObject(fakeClient, obj, action) + }, + // matching labels + []labels.Set{}, + // not matching labels + []labels.Set{ + {"foo": "bar"}, + }, + // matching fields + []fields.Set{ + {"metadata.name": "foo"}, + }, + // not matchin fields + []fields.Set{ + {"metadata.name": "bar"}, + }, + registrytest.WatchActions, + ) +}