Refactoring of watch etcd tests.

pull/6/head
Wojciech Tyczynski 2015-08-28 11:24:34 +02:00
parent 2f9652c7f1
commit 6dfe5b4b5a
9 changed files with 381 additions and 713 deletions

View File

@ -21,6 +21,7 @@ import (
"reflect"
"strings"
"testing"
"time"
"github.com/coreos/go-etcd/etcd"
"k8s.io/kubernetes/pkg/api"
@ -120,7 +121,10 @@ func copyOrDie(obj runtime.Object) runtime.Object {
}
type AssignFunc func([]runtime.Object) []runtime.Object
type EmitFunc func(runtime.Object, string) error
type GetFunc func(api.Context, runtime.Object) (runtime.Object, error)
type InitWatchFunc func()
type InjectErrFunc func(err error)
type SetFunc func(api.Context, runtime.Object) error
type SetRVFunc func(uint64)
type UpdateFunc func(runtime.Object) runtime.Object
@ -185,7 +189,7 @@ func (t *Tester) TestGet(obj runtime.Object) {
}
}
// Test listing object.
// Test listing objects.
func (t *Tester) TestList(obj runtime.Object, assignFn AssignFunc, setRVFn SetRVFunc) {
t.testListError()
t.testListFound(obj, assignFn)
@ -193,6 +197,15 @@ func (t *Tester) TestList(obj runtime.Object, assignFn AssignFunc, setRVFn SetRV
t.testListMatchLabels(obj, assignFn)
}
// Test watching objects.
func (t *Tester) TestWatch(
obj runtime.Object, initWatchFn InitWatchFunc, injectErrFn InjectErrFunc, emitFn EmitFunc,
labelsPass, labelsFail []labels.Set, fieldsPass, fieldsFail []fields.Set, actions []string) {
t.testWatch(initWatchFn, injectErrFn)
t.testWatchLabels(copyOrDie(obj), initWatchFn, emitFn, labelsPass, labelsFail, actions)
t.testWatchFields(copyOrDie(obj), initWatchFn, emitFn, fieldsPass, fieldsFail, actions)
}
// =============================================================================
// Creation tests.
@ -914,3 +927,125 @@ func (t *Tester) testListNotFound(assignFn AssignFunc, setRVFn SetRVFunc) {
t.Errorf("unexpected resource version: %d", meta.ResourceVersion)
}
}
// =============================================================================
// Watching tests.
func (t *Tester) testWatch(initWatchFn InitWatchFunc, injectErrFn InjectErrFunc) {
ctx := t.TestContext()
watcher, err := t.storage.(rest.Watcher).Watch(ctx, labels.Everything(), fields.Everything(), "1")
if err != nil {
t.Errorf("unexpected error: %v", err)
}
initWatchFn()
select {
case _, ok := <-watcher.ResultChan():
if !ok {
t.Errorf("watch channel should be open")
}
default:
}
injectErrFn(nil)
if _, ok := <-watcher.ResultChan(); ok {
t.Errorf("watch channel should be closed")
}
watcher.Stop()
}
func (t *Tester) testWatchFields(obj runtime.Object, initWatchFn InitWatchFunc, emitFn EmitFunc, fieldsPass, fieldsFail []fields.Set, actions []string) {
ctx := t.TestContext()
for _, field := range fieldsPass {
for _, action := range actions {
watcher, err := t.storage.(rest.Watcher).Watch(ctx, labels.Everything(), field.AsSelector(), "1")
if err != nil {
t.Errorf("unexpected error: %v", err)
}
initWatchFn()
if err := emitFn(obj, action); err != nil {
t.Errorf("unexpected error: %v", err)
}
select {
case _, ok := <-watcher.ResultChan():
if !ok {
t.Errorf("watch channel should be open")
}
case <-time.After(time.Millisecond * 100):
t.Errorf("unexpected timeout from result channel")
}
watcher.Stop()
}
}
for _, field := range fieldsFail {
for _, action := range actions {
watcher, err := t.storage.(rest.Watcher).Watch(ctx, labels.Everything(), field.AsSelector(), "1")
if err != nil {
t.Errorf("unexpected error: %v", err)
}
initWatchFn()
if err := emitFn(obj, action); err != nil {
t.Errorf("unexpected error: %v", err)
}
select {
case <-watcher.ResultChan():
t.Errorf("unexpected result from result channel")
case <-time.After(time.Millisecond * 100):
// expected case
}
watcher.Stop()
}
}
}
func (t *Tester) testWatchLabels(obj runtime.Object, initWatchFn InitWatchFunc, emitFn EmitFunc, labelsPass, labelsFail []labels.Set, actions []string) {
ctx := t.TestContext()
for _, label := range labelsPass {
for _, action := range actions {
watcher, err := t.storage.(rest.Watcher).Watch(ctx, label.AsSelector(), fields.Everything(), "1")
if err != nil {
t.Errorf("unexpected error: %v", err)
}
initWatchFn()
if err := emitFn(obj, action); err != nil {
t.Errorf("unexpected error: %v", err)
}
select {
case _, ok := <-watcher.ResultChan():
if !ok {
t.Errorf("watch channel should be open")
}
case <-time.After(time.Millisecond * 100):
t.Errorf("unexpected timeout from result channel")
}
watcher.Stop()
}
}
for _, label := range labelsFail {
for _, action := range actions {
watcher, err := t.storage.(rest.Watcher).Watch(ctx, label.AsSelector(), fields.Everything(), "1")
if err != nil {
t.Errorf("unexpected error: %v", err)
}
initWatchFn()
if err := emitFn(obj, action); err != nil {
t.Errorf("unexpected error: %v", err)
}
select {
case <-watcher.ResultChan():
t.Errorf("unexpected result from result channel")
case <-time.After(time.Millisecond * 100):
// expected case
}
watcher.Stop()
}
}
}

View File

@ -18,7 +18,6 @@ package etcd
import (
"testing"
"time"
"github.com/coreos/go-etcd/etcd"
"k8s.io/kubernetes/pkg/api"
@ -26,19 +25,12 @@ import (
"k8s.io/kubernetes/pkg/api/testapi"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/labels"
etcdgeneric "k8s.io/kubernetes/pkg/registry/generic/etcd"
"k8s.io/kubernetes/pkg/registry/registrytest"
"k8s.io/kubernetes/pkg/runtime"
etcdstorage "k8s.io/kubernetes/pkg/storage/etcd"
"k8s.io/kubernetes/pkg/tools"
"k8s.io/kubernetes/pkg/tools/etcdtest"
)
const (
PASS = iota
FAIL
)
func newStorage(t *testing.T) (*REST, *tools.FakeEtcdClient) {
etcdStorage, fakeClient := registrytest.NewEtcdStorage(t)
return NewREST(etcdStorage), fakeClient
@ -83,7 +75,7 @@ func validNewController() *api.ReplicationController {
}
}
var validController = *validNewController()
var validController = validNewController()
func TestCreate(t *testing.T) {
storage, fakeClient := newStorage(t)
@ -219,13 +211,54 @@ func TestEtcdListControllers(t *testing.T) {
})
}
func TestWatchControllers(t *testing.T) {
storage, fakeClient := newStorage(t)
test := resttest.New(t, storage, fakeClient.SetError)
test.TestWatch(
validController,
func() {
fakeClient.WaitForWatchCompletion()
},
func(err error) {
fakeClient.WatchInjectError <- err
},
func(obj runtime.Object, action string) error {
return registrytest.EmitObject(fakeClient, obj, action)
},
// matching labels
[]labels.Set{
{"a": "b"},
},
// not matching labels
[]labels.Set{
{"a": "c"},
{"foo": "bar"},
},
// matching fields
[]fields.Set{
{"status.replicas": "0"},
{"metadata.name": "foo"},
{"status.replicas": "0", "metadata.name": "foo"},
},
// not matchin fields
[]fields.Set{
{"status.replicas": "10"},
{"metadata.name": "bar"},
{"name": "foo"},
{"status.replicas": "10", "metadata.name": "foo"},
{"status.replicas": "0", "metadata.name": "bar"},
},
registrytest.WatchActions,
)
}
func TestEtcdDeleteController(t *testing.T) {
ctx := api.NewDefaultContext()
storage, fakeClient := newStorage(t)
key, _ := storage.KeyFunc(ctx, validController.Name)
key = etcdtest.AddPrefix(key)
fakeClient.Set(key, runtime.EncodeOrDie(testapi.Codec(), validNewController()), 0)
fakeClient.Set(key, runtime.EncodeOrDie(testapi.Codec(), validController), 0)
obj, err := storage.Delete(ctx, validController.Name, nil)
if err != nil {
t.Errorf("unexpected error: %v", err)
@ -243,196 +276,6 @@ func TestEtcdDeleteController(t *testing.T) {
}
}
func TestEtcdWatchController(t *testing.T) {
ctx := api.NewDefaultContext()
storage, fakeClient := newStorage(t)
watching, err := storage.Watch(ctx,
labels.Everything(),
fields.Everything(),
"1",
)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
fakeClient.WaitForWatchCompletion()
select {
case _, ok := <-watching.ResultChan():
if !ok {
t.Errorf("watching channel should be open")
}
default:
}
fakeClient.WatchInjectError <- nil
if _, ok := <-watching.ResultChan(); ok {
t.Errorf("watching channel should be closed")
}
watching.Stop()
}
func TestEtcdWatchControllersMatch(t *testing.T) {
ctx := api.WithNamespace(api.NewDefaultContext(), validController.Namespace)
storage, fakeClient := newStorage(t)
fakeClient.ExpectNotFoundGet(etcdgeneric.NamespaceKeyRootFunc(ctx, "/registry/pods"))
watching, err := storage.Watch(ctx,
labels.SelectorFromSet(validController.Spec.Selector),
fields.Everything(),
"1",
)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
fakeClient.WaitForWatchCompletion()
// The watcher above is waiting for these Labels, on receiving them it should
// apply the ControllerStatus decorator, which lists pods, causing a query against
// the /registry/pods endpoint of the etcd client.
controller := &api.ReplicationController{
ObjectMeta: api.ObjectMeta{
Name: "foo",
Labels: validController.Spec.Selector,
Namespace: "default",
},
}
controllerBytes, _ := testapi.Codec().Encode(controller)
fakeClient.WatchResponse <- &etcd.Response{
Action: "create",
Node: &etcd.Node{
Value: string(controllerBytes),
},
}
select {
case _, ok := <-watching.ResultChan():
if !ok {
t.Errorf("watching channel should be open")
}
case <-time.After(time.Millisecond * 100):
t.Error("unexpected timeout from result channel")
}
watching.Stop()
}
func TestEtcdWatchControllersFields(t *testing.T) {
ctx := api.WithNamespace(api.NewDefaultContext(), validController.Namespace)
storage, fakeClient := newStorage(t)
fakeClient.ExpectNotFoundGet(etcdgeneric.NamespaceKeyRootFunc(ctx, "/registry/pods"))
testFieldMap := map[int][]fields.Set{
PASS: {
{"status.replicas": "0"},
{"metadata.name": "foo"},
{"status.replicas": "0", "metadata.name": "foo"},
},
FAIL: {
{"status.replicas": "10"},
{"metadata.name": "bar"},
{"name": "foo"},
{"status.replicas": "10", "metadata.name": "foo"},
{"status.replicas": "0", "metadata.name": "bar"},
},
}
testEtcdActions := []string{
etcdstorage.EtcdCreate,
etcdstorage.EtcdSet,
etcdstorage.EtcdCAS,
etcdstorage.EtcdDelete}
controller := &api.ReplicationController{
ObjectMeta: api.ObjectMeta{
Name: "foo",
Labels: validController.Spec.Selector,
Namespace: "default",
},
Status: api.ReplicationControllerStatus{
Replicas: 0,
},
}
controllerBytes, _ := testapi.Codec().Encode(controller)
for expectedResult, fieldSet := range testFieldMap {
for _, field := range fieldSet {
for _, action := range testEtcdActions {
watching, err := storage.Watch(ctx,
labels.Everything(),
field.AsSelector(),
"1",
)
var prevNode *etcd.Node = nil
node := &etcd.Node{
Value: string(controllerBytes),
}
if action == etcdstorage.EtcdDelete {
prevNode = node
}
fakeClient.WaitForWatchCompletion()
fakeClient.WatchResponse <- &etcd.Response{
Action: action,
Node: node,
PrevNode: prevNode,
}
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
select {
case r, ok := <-watching.ResultChan():
if expectedResult == FAIL {
t.Errorf("Unexpected result from channel %#v", r)
}
if !ok {
t.Errorf("watching channel should be open")
}
case <-time.After(time.Millisecond * 100):
if expectedResult == PASS {
t.Error("unexpected timeout from result channel")
}
}
watching.Stop()
}
}
}
}
func TestEtcdWatchControllersNotMatch(t *testing.T) {
ctx := api.NewDefaultContext()
storage, fakeClient := newStorage(t)
fakeClient.ExpectNotFoundGet(etcdgeneric.NamespaceKeyRootFunc(ctx, "/registry/pods"))
watching, err := storage.Watch(ctx,
labels.SelectorFromSet(labels.Set{"name": "foo"}),
fields.Everything(),
"1",
)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
fakeClient.WaitForWatchCompletion()
controller := &api.ReplicationController{
ObjectMeta: api.ObjectMeta{
Name: "bar",
Labels: map[string]string{
"name": "bar",
},
},
}
controllerBytes, _ := testapi.Codec().Encode(controller)
fakeClient.WatchResponse <- &etcd.Response{
Action: "create",
Node: &etcd.Node{
Value: string(controllerBytes),
},
}
select {
case <-watching.ResultChan():
t.Error("unexpected result from result channel")
case <-time.After(time.Millisecond * 100):
// expected case
}
}
func TestDelete(t *testing.T) {
ctx := api.NewDefaultContext()
storage, fakeClient := newStorage(t)

View File

@ -18,7 +18,6 @@ package etcd
import (
"testing"
"time"
"github.com/coreos/go-etcd/etcd"
"k8s.io/kubernetes/pkg/api"
@ -27,19 +26,12 @@ import (
"k8s.io/kubernetes/pkg/expapi"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/labels"
etcdgeneric "k8s.io/kubernetes/pkg/registry/generic/etcd"
"k8s.io/kubernetes/pkg/registry/registrytest"
"k8s.io/kubernetes/pkg/runtime"
etcdstorage "k8s.io/kubernetes/pkg/storage/etcd"
"k8s.io/kubernetes/pkg/tools"
"k8s.io/kubernetes/pkg/tools/etcdtest"
)
const (
PASS = iota
FAIL
)
func newStorage(t *testing.T) (*REST, *tools.FakeEtcdClient) {
etcdStorage, fakeClient := registrytest.NewEtcdStorage(t)
return NewREST(etcdStorage), fakeClient
@ -84,16 +76,16 @@ func validNewDaemon() *expapi.Daemon {
}
}
var validDaemon = *validNewDaemon()
var validDaemon = validNewDaemon()
func TestCreate(t *testing.T) {
storage, fakeClient := newStorage(t)
test := resttest.New(t, storage, fakeClient.SetError)
controller := validNewDaemon()
controller.ObjectMeta = api.ObjectMeta{}
daemon := validNewDaemon()
daemon.ObjectMeta = api.ObjectMeta{}
test.TestCreate(
// valid
controller,
daemon,
func(ctx api.Context, obj runtime.Object) error {
return registrytest.SetObject(fakeClient, storage.KeyFunc, ctx, obj)
},
@ -175,13 +167,49 @@ func TestEtcdListControllers(t *testing.T) {
})
}
func TestWatchControllers(t *testing.T) {
storage, fakeClient := newStorage(t)
test := resttest.New(t, storage, fakeClient.SetError)
test.TestWatch(
validDaemon,
func() {
fakeClient.WaitForWatchCompletion()
},
func(err error) {
fakeClient.WatchInjectError <- err
},
func(obj runtime.Object, action string) error {
return registrytest.EmitObject(fakeClient, obj, action)
},
// matching labels
[]labels.Set{
{"a": "b"},
},
// not matching labels
[]labels.Set{
{"a": "c"},
{"foo": "bar"},
},
// matching fields
[]fields.Set{
{"metadata.name": "foo"},
},
// notmatching fields
[]fields.Set{
{"metadata.name": "bar"},
{"name": "foo"},
},
registrytest.WatchActions,
)
}
func TestEtcdDeleteController(t *testing.T) {
ctx := api.NewDefaultContext()
storage, fakeClient := newStorage(t)
key, err := storage.KeyFunc(ctx, validDaemon.Name)
key = etcdtest.AddPrefix(key)
fakeClient.Set(key, runtime.EncodeOrDie(testapi.Codec(), validNewDaemon()), 0)
fakeClient.Set(key, runtime.EncodeOrDie(testapi.Codec(), validDaemon), 0)
obj, err := storage.Delete(ctx, validDaemon.Name, nil)
if err != nil {
t.Errorf("unexpected error: %v", err)
@ -199,195 +227,6 @@ func TestEtcdDeleteController(t *testing.T) {
}
}
func TestEtcdWatchController(t *testing.T) {
ctx := api.NewDefaultContext()
storage, fakeClient := newStorage(t)
watching, err := storage.Watch(ctx,
labels.Everything(),
fields.Everything(),
"1",
)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
fakeClient.WaitForWatchCompletion()
select {
case _, ok := <-watching.ResultChan():
if !ok {
t.Errorf("watching channel should be open")
}
default:
}
fakeClient.WatchInjectError <- nil
if _, ok := <-watching.ResultChan(); ok {
t.Errorf("watching channel should be closed")
}
watching.Stop()
}
// Tests that we can watch for the creation of daemon controllers with specified labels.
func TestEtcdWatchControllersMatch(t *testing.T) {
ctx := api.WithNamespace(api.NewDefaultContext(), validDaemon.Namespace)
storage, fakeClient := newStorage(t)
fakeClient.ExpectNotFoundGet(etcdgeneric.NamespaceKeyRootFunc(ctx, "/registry/pods"))
watching, err := storage.Watch(ctx,
labels.SelectorFromSet(validDaemon.Spec.Selector),
fields.Everything(),
"1",
)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
fakeClient.WaitForWatchCompletion()
// The watcher above is waiting for these Labels, on receiving them it should
// apply the ControllerStatus decorator, which lists pods, causing a query against
// the /registry/pods endpoint of the etcd client.
controller := &expapi.Daemon{
ObjectMeta: api.ObjectMeta{
Name: "foo",
Labels: validDaemon.Spec.Selector,
Namespace: "default",
},
}
controllerBytes, _ := testapi.Codec().Encode(controller)
fakeClient.WatchResponse <- &etcd.Response{
Action: "create",
Node: &etcd.Node{
Value: string(controllerBytes),
},
}
select {
case _, ok := <-watching.ResultChan():
if !ok {
t.Errorf("watching channel should be open")
}
case <-time.After(time.Millisecond * 100):
t.Error("unexpected timeout from result channel")
}
watching.Stop()
}
// Tests that we can watch for daemon controllers with specified fields.
func TestEtcdWatchControllersFields(t *testing.T) {
ctx := api.WithNamespace(api.NewDefaultContext(), validDaemon.Namespace)
storage, fakeClient := newStorage(t)
fakeClient.ExpectNotFoundGet(etcdgeneric.NamespaceKeyRootFunc(ctx, "/registry/pods"))
testFieldMap := map[int][]fields.Set{
PASS: {
{"metadata.name": "foo"},
},
FAIL: {
{"metadata.name": "bar"},
{"name": "foo"},
},
}
testEtcdActions := []string{
etcdstorage.EtcdCreate,
etcdstorage.EtcdSet,
etcdstorage.EtcdCAS,
etcdstorage.EtcdDelete}
controller := &expapi.Daemon{
ObjectMeta: api.ObjectMeta{
Name: "foo",
Labels: validDaemon.Spec.Selector,
Namespace: "default",
},
Status: expapi.DaemonStatus{
CurrentNumberScheduled: 2,
NumberMisscheduled: 1,
DesiredNumberScheduled: 4,
},
}
controllerBytes, _ := testapi.Codec().Encode(controller)
for expectedResult, fieldSet := range testFieldMap {
for _, field := range fieldSet {
for _, action := range testEtcdActions {
watching, err := storage.Watch(ctx,
labels.Everything(),
field.AsSelector(),
"1",
)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
var prevNode *etcd.Node = nil
node := &etcd.Node{
Value: string(controllerBytes),
}
if action == etcdstorage.EtcdDelete {
prevNode = node
}
fakeClient.WaitForWatchCompletion()
fakeClient.WatchResponse <- &etcd.Response{
Action: action,
Node: node,
PrevNode: prevNode,
}
select {
case r, ok := <-watching.ResultChan():
if expectedResult == FAIL {
t.Errorf("Unexpected result from channel %#v", r)
}
if !ok {
t.Errorf("watching channel should be open")
}
case <-time.After(time.Millisecond * 100):
if expectedResult == PASS {
t.Error("unexpected timeout from result channel")
}
}
watching.Stop()
}
}
}
}
func TestEtcdWatchControllersNotMatch(t *testing.T) {
ctx := api.NewDefaultContext()
storage, fakeClient := newStorage(t)
fakeClient.ExpectNotFoundGet(etcdgeneric.NamespaceKeyRootFunc(ctx, "/registry/pods"))
watching, err := storage.Watch(ctx,
labels.SelectorFromSet(labels.Set{"name": "foo"}),
fields.Everything(),
"1",
)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
fakeClient.WaitForWatchCompletion()
controller := &expapi.Daemon{
ObjectMeta: api.ObjectMeta{
Name: "bar",
Labels: map[string]string{
"name": "bar",
},
},
}
controllerBytes, _ := testapi.Codec().Encode(controller)
fakeClient.WatchResponse <- &etcd.Response{
Action: "create",
Node: &etcd.Node{
Value: string(controllerBytes),
},
}
select {
case <-watching.ResultChan():
t.Error("unexpected result from result channel")
case <-time.After(time.Millisecond * 100):
// expected case
}
}
func TestDelete(t *testing.T) {
ctx := api.NewDefaultContext()
storage, fakeClient := newStorage(t)

View File

@ -19,7 +19,6 @@ package etcd
import (
"net/http"
"testing"
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/resource"
@ -169,6 +168,41 @@ func TestEtcdListNodes(t *testing.T) {
})
}
func TestWatchNodes(t *testing.T) {
storage, fakeClient := newStorage(t)
test := resttest.New(t, storage, fakeClient.SetError).ClusterScope()
test.TestWatch(
validNewNode(),
func() {
fakeClient.WaitForWatchCompletion()
},
func(err error) {
fakeClient.WatchInjectError <- err
},
func(obj runtime.Object, action string) error {
return registrytest.EmitObject(fakeClient, obj, action)
},
// matching labels
[]labels.Set{
{"name": "foo"},
},
// not matching labels
[]labels.Set{
{"name": "bar"},
{"foo": "bar"},
},
// matching fields
[]fields.Set{
{"metadata.name": "foo"},
},
// not matchin fields
[]fields.Set{
{"metadata.name": "bar"},
},
registrytest.WatchActions,
)
}
func TestEtcdDeleteNode(t *testing.T) {
ctx := api.NewContext()
storage, fakeClient := newStorage(t)
@ -188,94 +222,3 @@ func TestEtcdDeleteNode(t *testing.T) {
t.Errorf("Unexpected key: %s, expected %s", fakeClient.DeletedKeys[0], key)
}
}
func TestEtcdWatchNode(t *testing.T) {
ctx := api.NewDefaultContext()
storage, fakeClient := newStorage(t)
watching, err := storage.Watch(ctx,
labels.Everything(),
fields.Everything(),
"1",
)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
fakeClient.WaitForWatchCompletion()
select {
case _, ok := <-watching.ResultChan():
if !ok {
t.Errorf("watching channel should be open")
}
default:
}
fakeClient.WatchInjectError <- nil
if _, ok := <-watching.ResultChan(); ok {
t.Errorf("watching channel should be closed")
}
watching.Stop()
}
func TestEtcdWatchNodesMatch(t *testing.T) {
ctx := api.NewDefaultContext()
storage, fakeClient := newStorage(t)
node := validNewNode()
watching, err := storage.Watch(ctx,
labels.SelectorFromSet(labels.Set{"name": node.Name}),
fields.Everything(),
"1",
)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
fakeClient.WaitForWatchCompletion()
nodeBytes, _ := testapi.Codec().Encode(node)
fakeClient.WatchResponse <- &etcd.Response{
Action: "create",
Node: &etcd.Node{
Value: string(nodeBytes),
},
}
select {
case _, ok := <-watching.ResultChan():
if !ok {
t.Errorf("watching channel should be open")
}
case <-time.After(time.Millisecond * 100):
t.Error("unexpected timeout from result channel")
}
watching.Stop()
}
func TestEtcdWatchNodesNotMatch(t *testing.T) {
ctx := api.NewDefaultContext()
storage, fakeClient := newStorage(t)
node := validNewNode()
watching, err := storage.Watch(ctx,
labels.SelectorFromSet(labels.Set{"name": "bar"}),
fields.Everything(),
"1",
)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
fakeClient.WaitForWatchCompletion()
nodeBytes, _ := testapi.Codec().Encode(node)
fakeClient.WatchResponse <- &etcd.Response{
Action: "create",
Node: &etcd.Node{
Value: string(nodeBytes),
},
}
select {
case <-watching.ResultChan():
t.Error("unexpected result from result channel")
case <-time.After(time.Millisecond * 100):
// expected case
}
}

View File

@ -20,7 +20,6 @@ import (
"fmt"
"strings"
"testing"
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
@ -376,17 +375,15 @@ func TestDeletePod(t *testing.T) {
func TestEtcdGet(t *testing.T) {
storage, _, _, fakeClient := newStorage(t)
test := resttest.New(t, storage, fakeClient.SetError)
pod := validNewPod()
test.TestGet(pod)
test.TestGet(validNewPod())
}
func TestEtcdList(t *testing.T) {
storage, _, _, fakeClient := newStorage(t)
test := resttest.New(t, storage, fakeClient.SetError)
key := etcdtest.AddPrefix(storage.Etcd.KeyRootFunc(test.TestContext()))
pod := validNewPod()
test.TestList(
pod,
validNewPod(),
func(objects []runtime.Object) []runtime.Object {
return registrytest.SetObjectsForKey(fakeClient, key, objects)
},
@ -395,6 +392,38 @@ func TestEtcdList(t *testing.T) {
})
}
func TestEtcdWatch(t *testing.T) {
storage, _, _, fakeClient := newStorage(t)
test := resttest.New(t, storage, fakeClient.SetError)
test.TestWatch(
validNewPod(),
func() {
fakeClient.WaitForWatchCompletion()
},
func(err error) {
fakeClient.WatchInjectError <- err
},
func(obj runtime.Object, action string) error {
return registrytest.EmitObject(fakeClient, obj, action)
},
// matching labels
[]labels.Set{},
// not matching labels
[]labels.Set{
{"foo": "bar"},
},
// matching fields
[]fields.Set{
{"metadata.name": "foo"},
},
// not matchin fields
[]fields.Set{
{"metadata.name": "bar"},
},
registrytest.WatchActions,
)
}
func TestEtcdCreate(t *testing.T) {
storage, bindingStorage, _, fakeClient := newStorage(t)
ctx := api.NewDefaultContext()
@ -893,107 +922,3 @@ func TestEtcdDeletePodMultipleContainers(t *testing.T) {
t.Errorf("Unexpected key: %s, expected %s", fakeClient.DeletedKeys[0], key)
}
}
func TestEtcdWatchPods(t *testing.T) {
storage, _, _, fakeClient := newStorage(t)
ctx := api.NewDefaultContext()
watching, err := storage.Watch(ctx,
labels.Everything(),
fields.Everything(),
"1",
)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
fakeClient.WaitForWatchCompletion()
select {
case _, ok := <-watching.ResultChan():
if !ok {
t.Errorf("watching channel should be open")
}
default:
}
fakeClient.WatchInjectError <- nil
if _, ok := <-watching.ResultChan(); ok {
t.Errorf("watching channel should be closed")
}
watching.Stop()
}
func TestEtcdWatchPodsMatch(t *testing.T) {
storage, _, _, fakeClient := newStorage(t)
ctx := api.NewDefaultContext()
watching, err := storage.Watch(ctx,
labels.SelectorFromSet(labels.Set{"name": "foo"}),
fields.Everything(),
"1",
)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
fakeClient.WaitForWatchCompletion()
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "foo",
Namespace: "default",
Labels: map[string]string{
"name": "foo",
},
},
}
podBytes, _ := testapi.Codec().Encode(pod)
fakeClient.WatchResponse <- &etcd.Response{
Action: "create",
Node: &etcd.Node{
Value: string(podBytes),
},
}
select {
case _, ok := <-watching.ResultChan():
if !ok {
t.Errorf("watching channel should be open")
}
case <-time.After(time.Millisecond * 100):
t.Error("unexpected timeout from result channel")
}
watching.Stop()
}
func TestEtcdWatchPodsNotMatch(t *testing.T) {
storage, _, _, fakeClient := newStorage(t)
ctx := api.NewDefaultContext()
watching, err := storage.Watch(ctx,
labels.SelectorFromSet(labels.Set{"name": "foo"}),
fields.Everything(),
"1",
)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
fakeClient.WaitForWatchCompletion()
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "bar",
Labels: map[string]string{
"name": "bar",
},
},
}
podBytes, _ := testapi.Codec().Encode(pod)
fakeClient.WatchResponse <- &etcd.Response{
Action: "create",
Node: &etcd.Node{
Value: string(podBytes),
},
}
select {
case <-watching.ResultChan():
t.Error("unexpected result from result channel")
case <-time.After(time.Millisecond * 100):
// expected case
}
}

View File

@ -37,9 +37,31 @@ func NewEtcdStorage(t *testing.T) (storage.Interface, *tools.FakeEtcdClient) {
return etcdStorage, fakeClient
}
var WatchActions = []string{etcdstorage.EtcdCreate, etcdstorage.EtcdSet, etcdstorage.EtcdCAS, etcdstorage.EtcdDelete}
type keyFunc func(api.Context, string) (string, error)
type newFunc func() runtime.Object
func EmitObject(fakeClient *tools.FakeEtcdClient, obj runtime.Object, action string) error {
encoded, err := testapi.Codec().Encode(obj)
if err != nil {
return err
}
node := &etcd.Node{
Value: string(encoded),
}
var prevNode *etcd.Node = nil
if action == etcdstorage.EtcdDelete {
prevNode = node
}
fakeClient.WatchResponse <- &etcd.Response{
Action: action,
Node: node,
PrevNode: prevNode,
}
return nil
}
func GetObject(fakeClient *tools.FakeEtcdClient, keyFn keyFunc, newFn newFunc, ctx api.Context, obj runtime.Object) (runtime.Object, error) {
meta, err := api.ObjectMetaFor(obj)
if err != nil {

View File

@ -19,7 +19,6 @@ package etcd
import (
"fmt"
"testing"
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/resource"
@ -181,17 +180,15 @@ func TestDeleteResourceQuota(t *testing.T) {
func TestEtcdGet(t *testing.T) {
storage, _, fakeClient := newStorage(t)
test := resttest.New(t, storage, fakeClient.SetError)
resourcequota := validNewResourceQuota()
test.TestGet(resourcequota)
test.TestGet(validNewResourceQuota())
}
func TestEtcdList(t *testing.T) {
storage, _, fakeClient := newStorage(t)
test := resttest.New(t, storage, fakeClient.SetError)
key := etcdtest.AddPrefix(storage.Etcd.KeyRootFunc(test.TestContext()))
resourcequota := validNewResourceQuota()
test.TestList(
resourcequota,
validNewResourceQuota(),
func(objects []runtime.Object) []runtime.Object {
return registrytest.SetObjectsForKey(fakeClient, key, objects)
},
@ -200,6 +197,38 @@ func TestEtcdList(t *testing.T) {
})
}
func TestEtcdWatch(t *testing.T) {
storage, _, fakeClient := newStorage(t)
test := resttest.New(t, storage, fakeClient.SetError)
test.TestWatch(
validNewResourceQuota(),
func() {
fakeClient.WaitForWatchCompletion()
},
func(err error) {
fakeClient.WatchInjectError <- err
},
func(obj runtime.Object, action string) error {
return registrytest.EmitObject(fakeClient, obj, action)
},
// matching labels
[]labels.Set{},
// not matching labels
[]labels.Set{
{"foo": "bar"},
},
// matching fields
[]fields.Set{
{"metadata.name": "foo"},
},
// not matchin fields
[]fields.Set{
{"metadata.name": "bar"},
},
registrytest.WatchActions,
)
}
func TestEtcdUpdateStatus(t *testing.T) {
storage, status, fakeClient := newStorage(t)
ctx := api.NewDefaultContext()
@ -249,106 +278,3 @@ func TestEtcdUpdateStatus(t *testing.T) {
t.Errorf("unexpected object: %s", util.ObjectDiff(&expected, rqOut))
}
}
func TestEtcdWatchResourceQuotas(t *testing.T) {
storage, _, fakeClient := newStorage(t)
ctx := api.NewDefaultContext()
watching, err := storage.Watch(ctx,
labels.Everything(),
fields.Everything(),
"1",
)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
fakeClient.WaitForWatchCompletion()
select {
case _, ok := <-watching.ResultChan():
if !ok {
t.Errorf("watching channel should be open")
}
default:
}
fakeClient.WatchInjectError <- nil
if _, ok := <-watching.ResultChan(); ok {
t.Errorf("watching channel should be closed")
}
watching.Stop()
}
func TestEtcdWatchResourceQuotasMatch(t *testing.T) {
storage, _, fakeClient := newStorage(t)
ctx := api.NewDefaultContext()
watching, err := storage.Watch(ctx,
labels.SelectorFromSet(labels.Set{"name": "foo"}),
fields.Everything(),
"1",
)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
fakeClient.WaitForWatchCompletion()
resourcequota := &api.ResourceQuota{
ObjectMeta: api.ObjectMeta{
Name: "foo",
Labels: map[string]string{
"name": "foo",
},
},
}
resourcequotaBytes, _ := testapi.Codec().Encode(resourcequota)
fakeClient.WatchResponse <- &etcd.Response{
Action: "create",
Node: &etcd.Node{
Value: string(resourcequotaBytes),
},
}
select {
case _, ok := <-watching.ResultChan():
if !ok {
t.Errorf("watching channel should be open")
}
case <-time.After(time.Millisecond * 100):
t.Error("unexpected timeout from result channel")
}
watching.Stop()
}
func TestEtcdWatchResourceQuotasNotMatch(t *testing.T) {
storage, _, fakeClient := newStorage(t)
ctx := api.NewDefaultContext()
watching, err := storage.Watch(ctx,
labels.SelectorFromSet(labels.Set{"name": "foo"}),
fields.Everything(),
"1",
)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
fakeClient.WaitForWatchCompletion()
resourcequota := &api.ResourceQuota{
ObjectMeta: api.ObjectMeta{
Name: "bar",
Labels: map[string]string{
"name": "bar",
},
},
}
resourcequotaBytes, _ := testapi.Codec().Encode(resourcequota)
fakeClient.WatchResponse <- &etcd.Response{
Action: "create",
Node: &etcd.Node{
Value: string(resourcequotaBytes),
},
}
select {
case <-watching.ResultChan():
t.Error("unexpected result from result channel")
case <-time.After(time.Millisecond * 100):
// expected case
}
}

View File

@ -106,9 +106,10 @@ func MatchResourceQuota(label labels.Selector, field fields.Selector) generic.Ma
}
// ResourceQuotaToSelectableFields returns a label set that represents the object
// TODO: fields are not labels, and the validation rules for them do not apply.
func ResourceQuotaToSelectableFields(resourcequota *api.ResourceQuota) labels.Set {
return labels.Set{
"metadata.name": resourcequota.Name,
// Having "name" is a bug, but it must be supported for v1 API backward compatibility.
"name": resourcequota.Name,
}
}

View File

@ -21,6 +21,8 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/rest/resttest"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/registry/registrytest"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/tools"
@ -119,3 +121,35 @@ func TestUpdate(t *testing.T) {
},
)
}
func TestWatch(t *testing.T) {
storage, fakeClient := newStorage(t)
test := resttest.New(t, storage, fakeClient.SetError)
test.TestWatch(
validService(),
func() {
fakeClient.WaitForWatchCompletion()
},
func(err error) {
fakeClient.WatchInjectError <- err
},
func(obj runtime.Object, action string) error {
return registrytest.EmitObject(fakeClient, obj, action)
},
// matching labels
[]labels.Set{},
// not matching labels
[]labels.Set{
{"foo": "bar"},
},
// matching fields
[]fields.Set{
{"metadata.name": "foo"},
},
// not matchin fields
[]fields.Set{
{"metadata.name": "bar"},
},
registrytest.WatchActions,
)
}