diff --git a/pkg/registry/generic/etcd/etcd.go b/pkg/registry/generic/etcd/etcd.go index 3242d20bc7..343b6d1464 100644 --- a/pkg/registry/generic/etcd/etcd.go +++ b/pkg/registry/generic/etcd/etcd.go @@ -150,6 +150,7 @@ func (e *Etcd) List(ctx api.Context, label labels.Selector, field fields.Selecto func (e *Etcd) ListPredicate(ctx api.Context, m generic.Matcher) (runtime.Object, error) { list := e.NewListFunc() trace := util.NewTrace("List " + reflect.TypeOf(list).String()) + filterFunc := e.filterAndDecorateFunction(m) defer trace.LogIfLong(600 * time.Millisecond) if name, ok := m.MatchesSingle(); ok { trace.Step("About to read single object") @@ -157,21 +158,20 @@ func (e *Etcd) ListPredicate(ctx api.Context, m generic.Matcher) (runtime.Object if err != nil { return nil, err } - err = e.Storage.GetToList(key, list) + err = e.Storage.GetToList(key, filterFunc, list) trace.Step("Object extracted") if err != nil { return nil, err } } else { trace.Step("About to list directory") - err := e.Storage.List(e.KeyRootFunc(ctx), list) + err := e.Storage.List(e.KeyRootFunc(ctx), filterFunc, list) trace.Step("List extracted") if err != nil { return nil, err } } - defer trace.Step("List filtered") - return generic.FilterList(list, m, generic.DecoratorFunc(e.Decorator)) + return list, nil } // Create inserts a new item according to the unique key from the object. @@ -449,8 +449,21 @@ func (e *Etcd) WatchPredicate(ctx api.Context, m generic.Matcher, resourceVersio if err != nil { return nil, err } + filterFunc := e.filterAndDecorateFunction(m) - filterFunc := func(obj runtime.Object) bool { + if name, ok := m.MatchesSingle(); ok { + key, err := e.KeyFunc(ctx, name) + if err != nil { + return nil, err + } + return e.Storage.Watch(key, version, filterFunc) + } + + return e.Storage.WatchList(e.KeyRootFunc(ctx), version, filterFunc) +} + +func (e *Etcd) filterAndDecorateFunction(m generic.Matcher) func(runtime.Object) bool { + return func(obj runtime.Object) bool { matches, err := m.Matches(obj) if err != nil { glog.Errorf("unable to match watch: %v", err) @@ -464,16 +477,6 @@ func (e *Etcd) WatchPredicate(ctx api.Context, m generic.Matcher, resourceVersio } return matches } - - if name, ok := m.MatchesSingle(); ok { - key, err := e.KeyFunc(ctx, name) - if err != nil { - return nil, err - } - return e.Storage.Watch(key, version, filterFunc) - } - - return e.Storage.WatchList(e.KeyRootFunc(ctx), version, filterFunc) } // calculateTTL is a helper for retrieving the updated TTL for an object or returning an error diff --git a/pkg/registry/generic/matcher.go b/pkg/registry/generic/matcher.go index b3870747b3..111be647b0 100644 --- a/pkg/registry/generic/matcher.go +++ b/pkg/registry/generic/matcher.go @@ -118,38 +118,3 @@ var ( _ = Matcher(&SelectionPredicate{}) _ = Matcher(matcherFunc(nil)) ) - -// DecoratorFunc can mutate the provided object prior to being returned. -type DecoratorFunc func(obj runtime.Object) error - -// FilterList filters any list object that conforms to the api conventions, -// provided that 'm' works with the concrete type of list. d is an optional -// decorator for the returned functions. Only matching items are decorated. -func FilterList(list runtime.Object, m Matcher, d DecoratorFunc) (filtered runtime.Object, err error) { - // TODO: push a matcher down into tools.etcdHelper to avoid all this - // nonsense. This is a lot of unnecessary copies. - items, err := runtime.ExtractList(list) - if err != nil { - return nil, err - } - var filteredItems []runtime.Object - for _, obj := range items { - match, err := m.Matches(obj) - if err != nil { - return nil, err - } - if match { - if d != nil { - if err := d(obj); err != nil { - return nil, err - } - } - filteredItems = append(filteredItems, obj) - } - } - err = runtime.SetList(list, filteredItems) - if err != nil { - return nil, err - } - return list, nil -} diff --git a/pkg/registry/generic/matcher_test.go b/pkg/registry/generic/matcher_test.go index 669f15b5f8..5cd99cf0a1 100644 --- a/pkg/registry/generic/matcher_test.go +++ b/pkg/registry/generic/matcher_test.go @@ -18,7 +18,6 @@ package generic import ( "errors" - "reflect" "testing" "k8s.io/kubernetes/pkg/fields" @@ -118,44 +117,6 @@ func TestSelectionPredicate(t *testing.T) { } } -func TestFilterList(t *testing.T) { - try := &IgnoredList{ - Items: []Ignored{ - {"foo"}, - {"bar"}, - {"baz"}, - {"qux"}, - {"zot"}, - }, - } - expect := &IgnoredList{ - Items: []Ignored{ - {"bar"}, - {"baz"}, - }, - } - - m := MatcherFunc(func(obj runtime.Object) (bool, error) { - i, ok := obj.(*Ignored) - if !ok { - return false, errors.New("wrong type") - } - return i.ID[0] == 'b', nil - }) - if _, matchesSingleObject := m.MatchesSingle(); matchesSingleObject { - t.Errorf("matcher unexpectedly matches only a single object.") - } - - got, err := FilterList(try, m, nil) - if err != nil { - t.Fatalf("Unexpected error %v", err) - } - - if e, a := expect, got; !reflect.DeepEqual(e, a) { - t.Errorf("Expected %#v, got %#v", e, a) - } -} - func TestSingleMatch(t *testing.T) { m := MatchOnKey("pod-name-here", func(obj runtime.Object) (bool, error) { return true, nil }) got, ok := m.MatchesSingle() diff --git a/pkg/storage/cacher.go b/pkg/storage/cacher.go index ee76b029a1..8c0901b396 100644 --- a/pkg/storage/cacher.go +++ b/pkg/storage/cacher.go @@ -213,13 +213,13 @@ func (c *Cacher) Get(key string, objPtr runtime.Object, ignoreNotFound bool) err } // Implements storage.Interface. -func (c *Cacher) GetToList(key string, listObj runtime.Object) error { - return c.storage.GetToList(key, listObj) +func (c *Cacher) GetToList(key string, filter FilterFunc, listObj runtime.Object) error { + return c.storage.GetToList(key, filter, listObj) } // Implements storage.Interface. -func (c *Cacher) List(key string, listObj runtime.Object) error { - return c.storage.List(key, listObj) +func (c *Cacher) List(key string, filter FilterFunc, listObj runtime.Object) error { + return c.storage.List(key, filter, listObj) } // ListFromMemory implements list operation (the same signature as List method) @@ -303,7 +303,7 @@ func filterFunction(key string, keyFunc func(runtime.Object) (string, error), fi return func(obj runtime.Object) bool { objKey, err := keyFunc(obj) if err != nil { - glog.Errorf("Invalid object for filter: %v", obj) + glog.Errorf("invalid object for filter: %v", obj) return false } if !strings.HasPrefix(objKey, key) { @@ -343,7 +343,7 @@ func newCacherListerWatcher(storage Interface, resourcePrefix string, newListFun // Implements cache.ListerWatcher interface. func (lw *cacherListerWatcher) List() (runtime.Object, error) { list := lw.newListFunc() - if err := lw.storage.List(lw.resourcePrefix, list); err != nil { + if err := lw.storage.List(lw.resourcePrefix, Everything, list); err != nil { return nil, err } return list, nil diff --git a/pkg/storage/etcd/etcd_helper.go b/pkg/storage/etcd/etcd_helper.go index f4a0747235..254846ea31 100644 --- a/pkg/storage/etcd/etcd_helper.go +++ b/pkg/storage/etcd/etcd_helper.go @@ -243,7 +243,7 @@ func (h *etcdHelper) extractObj(response *etcd.Response, inErr error, objPtr run } // Implements storage.Interface. -func (h *etcdHelper) GetToList(key string, listObj runtime.Object) error { +func (h *etcdHelper) GetToList(key string, filter storage.FilterFunc, listObj runtime.Object) error { trace := util.NewTrace("GetToList " + getTypeName(listObj)) listPtr, err := runtime.GetItemsPtr(listObj) if err != nil { @@ -265,7 +265,7 @@ func (h *etcdHelper) GetToList(key string, listObj runtime.Object) error { nodes := make([]*etcd.Node, 0) nodes = append(nodes, response.Node) - if err := h.decodeNodeList(nodes, listPtr); err != nil { + if err := h.decodeNodeList(nodes, filter, listPtr); err != nil { return err } trace.Step("Object decoded") @@ -278,7 +278,7 @@ func (h *etcdHelper) GetToList(key string, listObj runtime.Object) error { } // decodeNodeList walks the tree of each node in the list and decodes into the specified object -func (h *etcdHelper) decodeNodeList(nodes []*etcd.Node, slicePtr interface{}) error { +func (h *etcdHelper) decodeNodeList(nodes []*etcd.Node, filter storage.FilterFunc, slicePtr interface{}) error { trace := util.NewTrace("decodeNodeList " + getTypeName(slicePtr)) defer trace.LogIfLong(500 * time.Millisecond) v, err := conversion.EnforcePtr(slicePtr) @@ -289,14 +289,16 @@ func (h *etcdHelper) decodeNodeList(nodes []*etcd.Node, slicePtr interface{}) er for _, node := range nodes { if node.Dir { trace.Step("Decoding dir " + node.Key + " START") - if err := h.decodeNodeList(node.Nodes, slicePtr); err != nil { + if err := h.decodeNodeList(node.Nodes, filter, slicePtr); err != nil { return err } trace.Step("Decoding dir " + node.Key + " END") continue } if obj, found := h.getFromCache(node.ModifiedIndex); found { - v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem())) + if filter(obj) { + v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem())) + } } else { obj := reflect.New(v.Type().Elem()) if err := h.codec.DecodeInto([]byte(node.Value), obj.Interface().(runtime.Object)); err != nil { @@ -306,7 +308,9 @@ func (h *etcdHelper) decodeNodeList(nodes []*etcd.Node, slicePtr interface{}) er // being unable to set the version does not prevent the object from being extracted _ = h.versioner.UpdateObject(obj.Interface().(runtime.Object), node.Expiration, node.ModifiedIndex) } - v.Set(reflect.Append(v, obj.Elem())) + if filter(obj.Interface().(runtime.Object)) { + v.Set(reflect.Append(v, obj.Elem())) + } if node.ModifiedIndex != 0 { h.addToCache(node.ModifiedIndex, obj.Interface().(runtime.Object)) } @@ -317,7 +321,7 @@ func (h *etcdHelper) decodeNodeList(nodes []*etcd.Node, slicePtr interface{}) er } // Implements storage.Interface. -func (h *etcdHelper) List(key string, listObj runtime.Object) error { +func (h *etcdHelper) List(key string, filter storage.FilterFunc, listObj runtime.Object) error { trace := util.NewTrace("List " + getTypeName(listObj)) defer trace.LogIfLong(time.Second) listPtr, err := runtime.GetItemsPtr(listObj) @@ -333,7 +337,7 @@ func (h *etcdHelper) List(key string, listObj runtime.Object) error { if err != nil { return err } - if err := h.decodeNodeList(nodes, listPtr); err != nil { + if err := h.decodeNodeList(nodes, filter, listPtr); err != nil { return err } trace.Step("Node list decoded") diff --git a/pkg/storage/etcd/etcd_helper_test.go b/pkg/storage/etcd/etcd_helper_test.go index 99820304b9..5011cbf65e 100644 --- a/pkg/storage/etcd/etcd_helper_test.go +++ b/pkg/storage/etcd/etcd_helper_test.go @@ -155,7 +155,69 @@ func TestList(t *testing.T) { } var got api.PodList - err := helper.List("/some/key", &got) + err := helper.List("/some/key", storage.Everything, &got) + if err != nil { + t.Errorf("Unexpected error %v", err) + } + if e, a := expect, got; !reflect.DeepEqual(e, a) { + t.Errorf("Expected %#v, got %#v", e, a) + } +} + +func TestListFiltered(t *testing.T) { + fakeClient := tools.NewFakeEtcdClient(t) + helper := newEtcdHelper(fakeClient, testapi.Default.Codec(), etcdtest.PathPrefix()) + key := etcdtest.AddPrefix("/some/key") + fakeClient.Data[key] = tools.EtcdResponseWithError{ + R: &etcd.Response{ + EtcdIndex: 10, + Node: &etcd.Node{ + Dir: true, + Nodes: []*etcd.Node{ + { + Key: "/foo", + Value: getEncodedPod("foo"), + Dir: false, + ModifiedIndex: 1, + }, + { + Key: "/bar", + Value: getEncodedPod("bar"), + Dir: false, + ModifiedIndex: 2, + }, + { + Key: "/baz", + Value: getEncodedPod("baz"), + Dir: false, + ModifiedIndex: 3, + }, + }, + }, + }, + } + grace := int64(30) + expect := api.PodList{ + ListMeta: api.ListMeta{ResourceVersion: "10"}, + Items: []api.Pod{ + { + ObjectMeta: api.ObjectMeta{Name: "bar", ResourceVersion: "2"}, + Spec: api.PodSpec{ + RestartPolicy: api.RestartPolicyAlways, + DNSPolicy: api.DNSClusterFirst, + TerminationGracePeriodSeconds: &grace, + }, + }, + }, + } + + filter := func(obj runtime.Object) bool { + pod := obj.(*api.Pod) + return pod.Name == "bar" + } + + var got api.PodList + err := helper.List("/some/key", filter, &got) if err != nil { t.Errorf("Unexpected error %v", err) } @@ -243,7 +305,7 @@ func TestListAcrossDirectories(t *testing.T) { } var got api.PodList - err := helper.List("/some/key", &got) + err := helper.List("/some/key", storage.Everything, &got) if err != nil { t.Errorf("Unexpected error %v", err) } @@ -318,7 +380,7 @@ func TestListExcludesDirectories(t *testing.T) { } var got api.PodList - err := helper.List("/some/key", &got) + err := helper.List("/some/key", storage.Everything, &got) if err != nil { t.Errorf("Unexpected error %v", err) } diff --git a/pkg/storage/interfaces.go b/pkg/storage/interfaces.go index 3ba685ab57..d591a5b8e7 100644 --- a/pkg/storage/interfaces.go +++ b/pkg/storage/interfaces.go @@ -111,11 +111,11 @@ type Interface interface { // GetToList unmarshals json found at key and opaque it into *List api object // (an object that satisfies the runtime.IsList definition). - GetToList(key string, listObj runtime.Object) error + GetToList(key string, filter FilterFunc, listObj runtime.Object) error // List unmarshalls jsons found at directory defined by key and opaque them // into *List api object (an object that satisfies runtime.IsList definition). - List(key string, listObj runtime.Object) error + List(key string, filter FilterFunc, listObj runtime.Object) error // GuaranteedUpdate keeps calling 'tryUpdate()' to update key 'key' (of type 'ptrToType') // retrying the update until success if there is index conflict.