k3s/pkg/storage/etcd/etcd_watcher_test.go

465 lines
13 KiB
Go
Raw Normal View History

2014-08-25 00:29:30 +00:00
/*
Copyright 2014 The Kubernetes Authors All rights reserved.
2014-08-25 00:29:30 +00:00
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
2015-07-30 11:27:18 +00:00
package etcd
2014-08-25 00:29:30 +00:00
import (
rt "runtime"
2014-08-25 00:29:30 +00:00
"testing"
2015-08-05 22:05:17 +00:00
"github.com/coreos/go-etcd/etcd"
2015-08-05 22:03:47 +00:00
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/testapi"
2015-08-05 22:03:47 +00:00
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
"k8s.io/kubernetes/pkg/tools/etcdtest"
"k8s.io/kubernetes/pkg/watch"
"golang.org/x/net/context"
2014-08-25 00:29:30 +00:00
)
var versioner = APIObjectVersioner{}
// Implements etcdCache interface as empty methods (i.e. does not cache any objects)
type fakeEtcdCache struct{}
2015-10-14 11:17:00 +00:00
func (f *fakeEtcdCache) getFromCache(index uint64, filter storage.FilterFunc) (runtime.Object, bool) {
return nil, false
}
func (f *fakeEtcdCache) addToCache(index uint64, obj runtime.Object) {
}
var _ etcdCache = &fakeEtcdCache{}
2014-08-25 00:29:30 +00:00
func TestWatchInterpretations(t *testing.T) {
codec := testapi.Default.Codec()
2014-08-25 00:29:30 +00:00
// Declare some pods to make the test cases compact.
podFoo := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}
podBar := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "bar"}}
podBaz := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "baz"}}
2014-09-08 04:14:18 +00:00
firstLetterIsB := func(obj runtime.Object) bool {
2014-10-22 17:02:02 +00:00
return obj.(*api.Pod).Name[0] == 'b'
2014-08-25 00:29:30 +00:00
}
// All of these test cases will be run with the firstLetterIsB FilterFunc.
table := map[string]struct {
actions []string // Run this test item for every action here.
prevNodeValue string
nodeValue string
expectEmit bool
expectType watch.EventType
2014-09-08 04:14:18 +00:00
expectObject runtime.Object
2014-08-25 00:29:30 +00:00
}{
"create": {
actions: []string{"create", "get"},
nodeValue: runtime.EncodeOrDie(codec, podBar),
2014-08-25 00:29:30 +00:00
expectEmit: true,
expectType: watch.Added,
expectObject: podBar,
},
"create but filter blocks": {
actions: []string{"create", "get"},
nodeValue: runtime.EncodeOrDie(codec, podFoo),
2014-08-25 00:29:30 +00:00
expectEmit: false,
},
"delete": {
actions: []string{"delete"},
prevNodeValue: runtime.EncodeOrDie(codec, podBar),
2014-08-25 00:29:30 +00:00
expectEmit: true,
expectType: watch.Deleted,
expectObject: podBar,
},
"delete but filter blocks": {
actions: []string{"delete"},
nodeValue: runtime.EncodeOrDie(codec, podFoo),
2014-08-25 00:29:30 +00:00
expectEmit: false,
},
"modify appears to create 1": {
actions: []string{"set", "compareAndSwap"},
nodeValue: runtime.EncodeOrDie(codec, podBar),
2014-08-25 00:29:30 +00:00
expectEmit: true,
expectType: watch.Added,
expectObject: podBar,
},
"modify appears to create 2": {
actions: []string{"set", "compareAndSwap"},
prevNodeValue: runtime.EncodeOrDie(codec, podFoo),
nodeValue: runtime.EncodeOrDie(codec, podBar),
2014-08-25 00:29:30 +00:00
expectEmit: true,
expectType: watch.Added,
expectObject: podBar,
},
"modify appears to delete": {
actions: []string{"set", "compareAndSwap"},
prevNodeValue: runtime.EncodeOrDie(codec, podBar),
nodeValue: runtime.EncodeOrDie(codec, podFoo),
2014-08-25 00:29:30 +00:00
expectEmit: true,
expectType: watch.Deleted,
expectObject: podBar, // Should return last state that passed the filter!
},
"modify modifies": {
actions: []string{"set", "compareAndSwap"},
prevNodeValue: runtime.EncodeOrDie(codec, podBar),
nodeValue: runtime.EncodeOrDie(codec, podBaz),
2014-08-25 00:29:30 +00:00
expectEmit: true,
expectType: watch.Modified,
expectObject: podBaz,
},
"modify ignores": {
actions: []string{"set", "compareAndSwap"},
nodeValue: runtime.EncodeOrDie(codec, podFoo),
2014-08-25 00:29:30 +00:00
expectEmit: false,
},
}
for name, item := range table {
for _, action := range item.actions {
w := newEtcdWatcher(true, nil, firstLetterIsB, codec, versioner, nil, &fakeEtcdCache{})
2014-08-25 00:29:30 +00:00
emitCalled := false
w.emit = func(event watch.Event) {
emitCalled = true
if !item.expectEmit {
return
}
if e, a := item.expectType, event.Type; e != a {
t.Errorf("'%v - %v': expected %v, got %v", name, action, e, a)
}
if e, a := item.expectObject, event.Object; !api.Semantic.DeepDerivative(e, a) {
2014-08-25 00:29:30 +00:00
t.Errorf("'%v - %v': expected %v, got %v", name, action, e, a)
}
}
var n, pn *etcd.Node
if item.nodeValue != "" {
n = &etcd.Node{Value: item.nodeValue}
}
if item.prevNodeValue != "" {
pn = &etcd.Node{Value: item.prevNodeValue}
}
w.sendResult(&etcd.Response{
Action: action,
Node: n,
PrevNode: pn,
})
if e, a := item.expectEmit, emitCalled; e != a {
t.Errorf("'%v - %v': expected %v, got %v", name, action, e, a)
}
w.Stop()
}
}
}
func TestWatchInterpretation_ResponseNotSet(t *testing.T) {
2015-07-30 07:27:38 +00:00
w := newEtcdWatcher(false, nil, storage.Everything, codec, versioner, nil, &fakeEtcdCache{})
2014-08-25 00:29:30 +00:00
w.emit = func(e watch.Event) {
t.Errorf("Unexpected emit: %v", e)
}
w.sendResult(&etcd.Response{
Action: "update",
})
w.Stop()
}
func TestWatchInterpretation_ResponseNoNode(t *testing.T) {
actions := []string{"create", "set", "compareAndSwap", "delete"}
for _, action := range actions {
2015-07-30 07:27:38 +00:00
w := newEtcdWatcher(false, nil, storage.Everything, codec, versioner, nil, &fakeEtcdCache{})
2014-08-25 00:29:30 +00:00
w.emit = func(e watch.Event) {
t.Errorf("Unexpected emit: %v", e)
}
w.sendResult(&etcd.Response{
Action: action,
})
w.Stop()
}
}
func TestWatchInterpretation_ResponseBadData(t *testing.T) {
actions := []string{"create", "set", "compareAndSwap", "delete"}
for _, action := range actions {
2015-07-30 07:27:38 +00:00
w := newEtcdWatcher(false, nil, storage.Everything, codec, versioner, nil, &fakeEtcdCache{})
2014-08-25 00:29:30 +00:00
w.emit = func(e watch.Event) {
t.Errorf("Unexpected emit: %v", e)
}
w.sendResult(&etcd.Response{
Action: action,
Node: &etcd.Node{
Value: "foobar",
},
})
w.sendResult(&etcd.Response{
Action: action,
PrevNode: &etcd.Node{
Value: "foobar",
},
})
w.Stop()
}
}
/* TODO: So believe it or not... but this test is flakey with the go-etcd client library
* which I'm surprised by. Apprently you can close the client that is performing the watch
* and the watch *never returns.* I would like to still keep this test here and re-enable
* with the new 2.2+ client library.
func TestWatchEtcdError(t *testing.T) {
codec := testapi.Default.Codec()
server := NewEtcdTestClientServer(t)
h := newEtcdHelper(server.client, codec, etcdtest.PathPrefix())
watching, err := h.Watch(context.TODO(), "/some/key", 4, storage.Everything)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
server.Terminate(t)
got := <-watching.ResultChan()
2014-09-22 23:12:32 +00:00
if got.Type != watch.Error {
t.Fatalf("Unexpected non-error")
}
watching.Stop()
} */
2014-08-25 00:29:30 +00:00
func TestWatch(t *testing.T) {
codec := testapi.Default.Codec()
server := NewEtcdTestClientServer(t)
defer server.Terminate(t)
key := "/some/key"
h := newEtcdHelper(server.client, codec, etcdtest.PathPrefix())
2014-08-25 00:29:30 +00:00
watching, err := h.Watch(context.TODO(), key, 0, storage.Everything)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
2014-08-25 00:29:30 +00:00
// Test normal case
pod := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}
returnObj := &api.Pod{}
err = h.Set(context.TODO(), key, pod, returnObj, 0)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
2014-08-25 00:29:30 +00:00
}
event := <-watching.ResultChan()
if e, a := watch.Added, event.Type; e != a {
t.Errorf("Expected %v, got %v", e, a)
}
if e, a := pod, event.Object; !api.Semantic.DeepDerivative(e, a) {
2014-08-25 00:29:30 +00:00
t.Errorf("Expected %v, got %v", e, a)
}
watching.Stop()
2014-09-22 23:12:32 +00:00
2014-08-25 00:29:30 +00:00
if _, open := <-watching.ResultChan(); open {
t.Errorf("An injected error did not cause a graceful shutdown")
}
}
func emptySubsets() []api.EndpointSubset {
return []api.EndpointSubset{}
}
func makeSubsets(ip string, port int) []api.EndpointSubset {
return []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: ip}},
Ports: []api.EndpointPort{{Port: port}},
}}
}
func TestWatchEtcdState(t *testing.T) {
codec := testapi.Default.Codec()
key := etcdtest.AddPrefix("/somekey/foo")
server := NewEtcdTestClientServer(t)
defer server.Terminate(t)
h := newEtcdHelper(server.client, codec, etcdtest.PathPrefix())
watching, err := h.Watch(context.TODO(), key, 0, storage.Everything)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
endpoint := &api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Subsets: emptySubsets(),
}
err = h.Set(context.TODO(), key, endpoint, endpoint, 0)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
event := <-watching.ResultChan()
if event.Type != watch.Added {
t.Errorf("Unexpected event %#v", event)
}
subset := makeSubsets("127.0.0.1", 9000)
endpoint.Subsets = subset
// CAS the previous value
err = h.Set(context.TODO(), key, endpoint, endpoint, 0)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
event = <-watching.ResultChan()
if event.Type != watch.Modified {
t.Errorf("Unexpected event %#v", event)
}
if e, a := endpoint, event.Object; !api.Semantic.DeepDerivative(e, a) {
t.Errorf("%s: expected %v, got %v", e, a)
}
watching.Stop()
}
2014-08-25 00:29:30 +00:00
func TestWatchFromZeroIndex(t *testing.T) {
codec := testapi.Default.Codec()
pod := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}
2014-08-25 00:29:30 +00:00
key := etcdtest.AddPrefix("/somekey/foo")
server := NewEtcdTestClientServer(t)
defer server.Terminate(t)
2014-08-25 00:29:30 +00:00
h := newEtcdHelper(server.client, codec, etcdtest.PathPrefix())
2014-08-25 00:29:30 +00:00
// set before the watch and verify events
err := h.Set(context.TODO(), key, pod, pod, 0)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
2014-08-25 00:29:30 +00:00
// check for concatenation on watch event with CAS
pod.Name = "bar"
err = h.Set(context.TODO(), key, pod, pod, 0)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
2014-08-25 00:29:30 +00:00
watching, err := h.Watch(context.TODO(), key, 0, storage.Everything)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
2014-08-25 00:29:30 +00:00
}
// marked as modified b/c of concatenation
event := <-watching.ResultChan()
if event.Type != watch.Modified {
t.Errorf("Unexpected event %#v", event)
2014-08-25 00:29:30 +00:00
}
err = h.Set(context.TODO(), key, pod, pod, 0)
2014-08-25 00:29:30 +00:00
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
event = <-watching.ResultChan()
if event.Type != watch.Modified {
t.Errorf("Unexpected event %#v", event)
2014-08-25 00:29:30 +00:00
}
if e, a := pod, event.Object; !api.Semantic.DeepDerivative(e, a) {
t.Errorf("%s: expected %v, got %v", e, a)
2014-08-25 00:29:30 +00:00
}
watching.Stop()
}
func TestWatchListFromZeroIndex(t *testing.T) {
codec := testapi.Default.Codec()
key := etcdtest.AddPrefix("/some/key")
server := NewEtcdTestClientServer(t)
defer server.Terminate(t)
h := newEtcdHelper(server.client, codec, key)
watching, err := h.WatchList(context.TODO(), key, 0, storage.Everything)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// creates key/foo which should trigger the WatchList for "key"
pod := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}
err = h.Create(context.TODO(), pod.Name, pod, pod, 0)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
event, _ := <-watching.ResultChan()
if event.Type != watch.Added {
t.Errorf("Unexpected event %#v", event)
}
if e, a := pod, event.Object; !api.Semantic.DeepDerivative(e, a) {
t.Errorf("%s: expected %v, got %v", e, a)
}
watching.Stop()
}
func TestWatchListIgnoresRootKey(t *testing.T) {
codec := testapi.Default.Codec()
pod := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}
key := etcdtest.AddPrefix("/some/key")
server := NewEtcdTestClientServer(t)
defer server.Terminate(t)
h := newEtcdHelper(server.client, codec, key)
2014-08-25 00:29:30 +00:00
watching, err := h.WatchList(context.TODO(), key, 0, storage.Everything)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
2014-08-25 00:29:30 +00:00
// creates key/foo which should trigger the WatchList for "key"
err = h.Create(context.TODO(), key, pod, pod, 0)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
2014-09-22 23:12:32 +00:00
// force context switch to ensure watches would catch and notify.
rt.Gosched()
2014-08-25 00:29:30 +00:00
select {
case event, _ := <-watching.ResultChan():
t.Fatalf("Unexpected event: %#v", event)
default:
// fall through, expected behavior
2014-08-25 00:29:30 +00:00
}
watching.Stop()
2014-08-25 00:29:30 +00:00
}
func TestWatchPurposefulShutdown(t *testing.T) {
server := NewEtcdTestClientServer(t)
defer server.Terminate(t)
key := "/some/key"
h := newEtcdHelper(server.client, codec, etcdtest.PathPrefix())
2014-08-25 00:29:30 +00:00
// Test purposeful shutdown
watching, err := h.Watch(context.TODO(), key, 0, storage.Everything)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
2014-08-25 00:29:30 +00:00
watching.Stop()
rt.Gosched()
2014-08-25 00:29:30 +00:00
if _, open := <-watching.ResultChan(); open {
t.Errorf("Channel should be closed")
2014-08-25 00:29:30 +00:00
}
}