k3s/pkg/storage/cacher_test.go

344 lines
11 KiB
Go
Raw Normal View History

/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package storage_test
import (
2016-02-20 01:45:02 +00:00
"fmt"
"reflect"
2015-11-02 13:51:56 +00:00
"strconv"
"testing"
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/meta"
"k8s.io/kubernetes/pkg/api/testapi"
apitesting "k8s.io/kubernetes/pkg/api/testing"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
etcdstorage "k8s.io/kubernetes/pkg/storage/etcd"
"k8s.io/kubernetes/pkg/storage/etcd/etcdtest"
etcdtesting "k8s.io/kubernetes/pkg/storage/etcd/testing"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/watch"
"golang.org/x/net/context"
)
func newEtcdTestStorage(t *testing.T, codec runtime.Codec, prefix string) (*etcdtesting.EtcdTestServer, storage.Interface) {
server := etcdtesting.NewEtcdTestClientServer(t)
2016-01-26 06:17:11 +00:00
storage := etcdstorage.NewEtcdStorage(server.Client, codec, prefix, false)
return server, storage
}
2015-11-02 13:51:56 +00:00
func newTestCacher(s storage.Interface) *storage.Cacher {
prefix := "pods"
config := storage.CacherConfig{
CacheCapacity: 10,
2015-11-02 13:51:56 +00:00
Storage: s,
Versioner: etcdstorage.APIObjectVersioner{},
Type: &api.Pod{},
ResourcePrefix: prefix,
KeyFunc: func(obj runtime.Object) (string, error) { return storage.NamespaceKeyFunc(prefix, obj) },
NewListFunc: func() runtime.Object { return &api.PodList{} },
}
2015-10-30 09:17:09 +00:00
return storage.NewCacherFromConfig(config)
}
func makeTestPod(name string) *api.Pod {
return &api.Pod{
ObjectMeta: api.ObjectMeta{Namespace: "ns", Name: name},
Spec: apitesting.DeepEqualSafePodSpec(),
}
}
2015-11-02 13:51:56 +00:00
func updatePod(t *testing.T, s storage.Interface, obj, old *api.Pod) *api.Pod {
key := etcdtest.AddPrefix("pods/ns/" + obj.Name)
result := &api.Pod{}
if old == nil {
if err := s.Create(context.TODO(), key, obj, result, 0); err != nil {
t.Errorf("unexpected error: %v", err)
}
} else {
// To force "update" behavior of Set() we need to set ResourceVersion of
// previous version of object.
obj.ResourceVersion = old.ResourceVersion
if err := s.Set(context.TODO(), key, obj, result, 0); err != nil {
t.Errorf("unexpected error: %v", err)
}
obj.ResourceVersion = ""
}
return result
}
func TestList(t *testing.T) {
server, etcdStorage := newEtcdTestStorage(t, testapi.Default.Codec(), etcdtest.PathPrefix())
2015-11-02 13:51:56 +00:00
defer server.Terminate(t)
cacher := newTestCacher(etcdStorage)
2015-12-28 09:35:12 +00:00
defer cacher.Stop()
podFoo := makeTestPod("foo")
podBar := makeTestPod("bar")
podBaz := makeTestPod("baz")
podFooPrime := makeTestPod("foo")
podFooPrime.Spec.NodeName = "fakeNode"
2015-11-02 13:51:56 +00:00
fooCreated := updatePod(t, etcdStorage, podFoo, nil)
_ = updatePod(t, etcdStorage, podBar, nil)
_ = updatePod(t, etcdStorage, podBaz, nil)
_ = updatePod(t, etcdStorage, podFooPrime, fooCreated)
2015-11-02 13:51:56 +00:00
deleted := api.Pod{}
if err := etcdStorage.Delete(context.TODO(), etcdtest.AddPrefix("pods/ns/bar"), &deleted); err != nil {
t.Errorf("Unexpected error: %v", err)
}
2016-02-18 08:56:08 +00:00
// We first List directly from etcd by passing empty resourceVersion,
// to get the current etcd resourceVersion.
rvResult := &api.PodList{}
if err := cacher.List(context.TODO(), "pods/ns", "", storage.Everything, rvResult); err != nil {
t.Errorf("Unexpected error: %v", err)
}
deletedPodRV := rvResult.ListMeta.ResourceVersion
result := &api.PodList{}
2016-02-18 08:56:08 +00:00
// We pass the current etcd ResourceVersion received from the above List() operation,
// since there is not easy way to get ResourceVersion of barPod deletion operation.
if err := cacher.List(context.TODO(), "pods/ns", deletedPodRV, storage.Everything, result); err != nil {
2015-11-02 13:51:56 +00:00
t.Errorf("Unexpected error: %v", err)
}
2016-02-18 08:56:08 +00:00
if result.ListMeta.ResourceVersion != deletedPodRV {
2015-11-02 13:51:56 +00:00
t.Errorf("Incorrect resource version: %v", result.ListMeta.ResourceVersion)
}
if len(result.Items) != 2 {
2015-11-02 13:51:56 +00:00
t.Errorf("Unexpected list result: %d", len(result.Items))
}
keys := sets.String{}
for _, item := range result.Items {
2015-11-02 13:51:56 +00:00
keys.Insert(item.Name)
}
if !keys.HasAll("foo", "baz") {
2015-11-02 13:51:56 +00:00
t.Errorf("Unexpected list result: %#v", result)
}
for _, item := range result.Items {
// unset fields that are set by the infrastructure
2015-11-02 13:51:56 +00:00
item.ResourceVersion = ""
item.CreationTimestamp = unversioned.Time{}
var expected *api.Pod
2015-11-02 13:51:56 +00:00
switch item.Name {
case "foo":
expected = podFooPrime
case "baz":
expected = podBaz
default:
2015-11-02 13:51:56 +00:00
t.Errorf("Unexpected item: %v", item)
}
if e, a := *expected, item; !reflect.DeepEqual(e, a) {
2015-11-02 13:51:56 +00:00
t.Errorf("Expected: %#v, got: %#v", e, a)
}
}
2015-11-02 13:51:56 +00:00
}
2015-11-02 13:51:56 +00:00
func verifyWatchEvent(t *testing.T, w watch.Interface, eventType watch.EventType, eventObject runtime.Object) {
select {
case event := <-w.ResultChan():
if e, a := eventType, event.Type; e != a {
t.Errorf("Expected: %s, got: %s", eventType, event.Type)
}
if e, a := eventObject, event.Object; !api.Semantic.DeepDerivative(e, a) {
t.Errorf("Expected (%s): %#v, got: %#v", eventType, e, a)
}
case <-time.After(wait.ForeverTestTimeout):
2015-11-02 13:51:56 +00:00
t.Errorf("Timed out waiting for an event")
}
}
2016-02-20 01:45:02 +00:00
type injectListError struct {
errors int
storage.Interface
}
func (self *injectListError) List(ctx context.Context, key string, resourceVersion string, filter storage.FilterFunc, listObj runtime.Object) error {
if self.errors > 0 {
self.errors--
return fmt.Errorf("injected error")
}
return self.Interface.List(ctx, key, resourceVersion, filter, listObj)
}
func TestWatch(t *testing.T) {
server, etcdStorage := newEtcdTestStorage(t, testapi.Default.Codec(), etcdtest.PathPrefix())
2016-02-20 01:45:02 +00:00
// Inject one list error to make sure we test the relist case.
etcdStorage = &injectListError{errors: 1, Interface: etcdStorage}
2015-11-02 13:51:56 +00:00
defer server.Terminate(t)
cacher := newTestCacher(etcdStorage)
2015-12-28 09:35:12 +00:00
defer cacher.Stop()
podFoo := makeTestPod("foo")
podBar := makeTestPod("bar")
2015-11-02 13:51:56 +00:00
podFooPrime := makeTestPod("foo")
podFooPrime.Spec.NodeName = "fakeNode"
podFooBis := makeTestPod("foo")
podFooBis.Spec.NodeName = "anotherFakeNode"
2015-12-28 14:28:07 +00:00
// initialVersion is used to initate the watcher at the beginning of the world,
// which is not defined precisely in etcd.
initialVersion, err := cacher.LastSyncResourceVersion()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
startVersion := strconv.Itoa(int(initialVersion))
// Set up Watch for object "podFoo".
2015-12-28 14:28:07 +00:00
watcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", startVersion, storage.Everything)
if err != nil {
2015-11-02 13:51:56 +00:00
t.Fatalf("Unexpected error: %v", err)
}
2015-12-28 09:35:12 +00:00
defer watcher.Stop()
2015-11-02 13:51:56 +00:00
fooCreated := updatePod(t, etcdStorage, podFoo, nil)
_ = updatePod(t, etcdStorage, podBar, nil)
fooUpdated := updatePod(t, etcdStorage, podFooPrime, fooCreated)
verifyWatchEvent(t, watcher, watch.Added, podFoo)
verifyWatchEvent(t, watcher, watch.Modified, podFooPrime)
// Check whether we get too-old error.
_, err = cacher.Watch(context.TODO(), "pods/ns/foo", "1", storage.Everything)
if err == nil {
2015-11-02 13:51:56 +00:00
t.Errorf("Expected 'error too old' error")
}
initialWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", fooCreated.ResourceVersion, storage.Everything)
2015-11-02 13:51:56 +00:00
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
2015-12-28 09:35:12 +00:00
defer initialWatcher.Stop()
2015-11-02 13:51:56 +00:00
verifyWatchEvent(t, initialWatcher, watch.Added, podFoo)
verifyWatchEvent(t, initialWatcher, watch.Modified, podFooPrime)
2015-09-14 08:36:13 +00:00
// Now test watch from "now".
nowWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", "0", storage.Everything)
2015-09-14 08:36:13 +00:00
if err != nil {
2015-11-02 13:51:56 +00:00
t.Fatalf("Unexpected error: %v", err)
2015-09-14 08:36:13 +00:00
}
2015-12-28 09:35:12 +00:00
defer nowWatcher.Stop()
2015-09-14 08:36:13 +00:00
2015-11-02 13:51:56 +00:00
verifyWatchEvent(t, nowWatcher, watch.Added, podFooPrime)
_ = updatePod(t, etcdStorage, podFooBis, fooUpdated)
verifyWatchEvent(t, nowWatcher, watch.Modified, podFooBis)
}
2015-11-06 12:40:21 +00:00
func TestWatcherTimeout(t *testing.T) {
server, etcdStorage := newEtcdTestStorage(t, testapi.Default.Codec(), etcdtest.PathPrefix())
defer server.Terminate(t)
cacher := newTestCacher(etcdStorage)
2015-12-28 09:35:12 +00:00
defer cacher.Stop()
2015-11-06 12:40:21 +00:00
2015-12-28 14:28:07 +00:00
// initialVersion is used to initate the watcher at the beginning of the world,
// which is not defined precisely in etcd.
initialVersion, err := cacher.LastSyncResourceVersion()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
startVersion := strconv.Itoa(int(initialVersion))
2015-11-06 12:40:21 +00:00
// Create a watcher that will not be reading any result.
2015-12-28 14:28:07 +00:00
watcher, err := cacher.WatchList(context.TODO(), "pods/ns", startVersion, storage.Everything)
2015-11-06 12:40:21 +00:00
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer watcher.Stop()
// Create a second watcher that will be reading result.
2015-12-28 14:28:07 +00:00
readingWatcher, err := cacher.WatchList(context.TODO(), "pods/ns", startVersion, storage.Everything)
2015-11-06 12:40:21 +00:00
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer readingWatcher.Stop()
for i := 1; i <= 22; i++ {
pod := makeTestPod(strconv.Itoa(i))
_ = updatePod(t, etcdStorage, pod, nil)
verifyWatchEvent(t, readingWatcher, watch.Added, pod)
}
}
func TestFiltering(t *testing.T) {
server, etcdStorage := newEtcdTestStorage(t, testapi.Default.Codec(), etcdtest.PathPrefix())
2015-11-02 13:51:56 +00:00
defer server.Terminate(t)
cacher := newTestCacher(etcdStorage)
2015-12-28 09:35:12 +00:00
defer cacher.Stop()
2015-12-31 06:53:41 +00:00
// Ensure that the cacher is initialized, before creating any pods,
// so that we are sure that all events will be present in cacher.
syncWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", "0", storage.Everything)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
syncWatcher.Stop()
podFoo := makeTestPod("foo")
2015-11-02 13:51:56 +00:00
podFoo.Labels = map[string]string{"filter": "foo"}
podFooFiltered := makeTestPod("foo")
2015-11-02 13:51:56 +00:00
podFooPrime := makeTestPod("foo")
podFooPrime.Labels = map[string]string{"filter": "foo"}
podFooPrime.Spec.NodeName = "fakeNode"
2015-11-02 13:51:56 +00:00
fooCreated := updatePod(t, etcdStorage, podFoo, nil)
fooFiltered := updatePod(t, etcdStorage, podFooFiltered, fooCreated)
fooUnfiltered := updatePod(t, etcdStorage, podFoo, fooFiltered)
_ = updatePod(t, etcdStorage, podFooPrime, fooUnfiltered)
deleted := api.Pod{}
if err := etcdStorage.Delete(context.TODO(), etcdtest.AddPrefix("pods/ns/foo"), &deleted); err != nil {
t.Errorf("Unexpected error: %v", err)
}
// Set up Watch for object "podFoo" with label filter set.
selector := labels.SelectorFromSet(labels.Set{"filter": "foo"})
filter := func(obj runtime.Object) bool {
metadata, err := meta.Accessor(obj)
if err != nil {
2015-11-02 13:51:56 +00:00
t.Errorf("Unexpected error: %v", err)
return false
}
2015-12-08 03:01:12 +00:00
return selector.Matches(labels.Set(metadata.GetLabels()))
}
watcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", fooCreated.ResourceVersion, filter)
2015-11-02 13:51:56 +00:00
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
2015-12-28 09:35:12 +00:00
defer watcher.Stop()
2015-11-02 13:51:56 +00:00
verifyWatchEvent(t, watcher, watch.Added, podFoo)
verifyWatchEvent(t, watcher, watch.Deleted, podFooFiltered)
verifyWatchEvent(t, watcher, watch.Added, podFoo)
verifyWatchEvent(t, watcher, watch.Modified, podFooPrime)
verifyWatchEvent(t, watcher, watch.Deleted, podFooPrime)
}