Merge pull request #56198 from dixudx/remove_FilterFunc_use_SelectionPredicate

Automatic merge from submit-queue (batch tested with PRs 55977, 56198, 57202, 57254, 57214). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

remove FilterFunc and use SelectionPredicate everywhere

**What this PR does / why we need it**:
> // FilterFunc takes an API object and returns true if the object satisfies some requirements.
// TODO: We will remove this type and use SelectionPredicate everywhere.
type FilterFunc func(obj runtime.Object) bool

**Which issue(s) this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close the issue(s) when PR gets merged)*:
Fixes #

**Special notes for your reviewer**:
/assign @liggitt @wojtek-t 
**Release note**:

```release-note
None
```
pull/6/head
Kubernetes Submit Queue 2017-12-17 08:26:48 -08:00 committed by GitHub
commit 402456991f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 78 additions and 69 deletions

View File

@ -682,12 +682,16 @@ func forgetWatcher(c *Cacher, index int, triggerValue string, triggerSupported b
}
func filterFunction(key string, p SelectionPredicate) func(string, runtime.Object) bool {
f := SimpleFilter(p)
filterFunc := func(objKey string, obj runtime.Object) bool {
if !hasPathPrefix(objKey, key) {
return false
}
return f(obj)
matches, err := p.Matches(obj)
if err != nil {
glog.Errorf("invalid object for matching. Obj: %v. Err: %v", obj, err)
return false
}
return matches
}
return filterFunc
}

View File

@ -240,7 +240,7 @@ func (h *etcdHelper) Watch(ctx context.Context, key string, resourceVersion stri
return nil, err
}
key = path.Join(h.pathPrefix, key)
w := newEtcdWatcher(false, h.quorum, nil, storage.SimpleFilter(pred), h.codec, h.versioner, nil, h.transformer, h)
w := newEtcdWatcher(false, h.quorum, nil, pred, h.codec, h.versioner, nil, h.transformer, h)
go w.etcdWatch(ctx, h.etcdKeysAPI, key, watchRV)
return w, nil
}
@ -255,7 +255,7 @@ func (h *etcdHelper) WatchList(ctx context.Context, key string, resourceVersion
return nil, err
}
key = path.Join(h.pathPrefix, key)
w := newEtcdWatcher(true, h.quorum, exceptKey(key), storage.SimpleFilter(pred), h.codec, h.versioner, nil, h.transformer, h)
w := newEtcdWatcher(true, h.quorum, exceptKey(key), pred, h.codec, h.versioner, nil, h.transformer, h)
go w.etcdWatch(ctx, h.etcdKeysAPI, key, watchRV)
return w, nil
}
@ -359,7 +359,7 @@ func (h *etcdHelper) GetToList(ctx context.Context, key string, resourceVersion
nodes := make([]*etcd.Node, 0)
nodes = append(nodes, response.Node)
if err := h.decodeNodeList(nodes, storage.SimpleFilter(pred), listPtr); err != nil {
if err := h.decodeNodeList(nodes, pred, listPtr); err != nil {
return err
}
trace.Step("Object decoded")
@ -370,7 +370,7 @@ func (h *etcdHelper) GetToList(ctx context.Context, key string, resourceVersion
}
// decodeNodeList walks the tree of each node in the list and decodes into the specified object
func (h *etcdHelper) decodeNodeList(nodes []*etcd.Node, filter storage.FilterFunc, slicePtr interface{}) error {
func (h *etcdHelper) decodeNodeList(nodes []*etcd.Node, pred storage.SelectionPredicate, slicePtr interface{}) error {
trace := utiltrace.New("decodeNodeList " + getTypeName(slicePtr))
defer trace.LogIfLong(400 * time.Millisecond)
v, err := conversion.EnforcePtr(slicePtr)
@ -383,13 +383,13 @@ func (h *etcdHelper) decodeNodeList(nodes []*etcd.Node, filter storage.FilterFun
// IMPORTANT: do not log each key as a discrete step in the trace log
// as it produces an immense amount of log spam when there is a large
// amount of content in the list.
if err := h.decodeNodeList(node.Nodes, filter, slicePtr); err != nil {
if err := h.decodeNodeList(node.Nodes, pred, slicePtr); err != nil {
return err
}
continue
}
if obj, found := h.getFromCache(node.ModifiedIndex, filter); found {
// obj != nil iff it matches the filter function.
if obj, found := h.getFromCache(node.ModifiedIndex, pred); found {
// obj != nil iff it matches the pred function.
if obj != nil {
v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem()))
}
@ -407,7 +407,7 @@ func (h *etcdHelper) decodeNodeList(nodes []*etcd.Node, filter storage.FilterFun
}
// being unable to set the version does not prevent the object from being extracted
_ = h.versioner.UpdateObject(obj, node.ModifiedIndex)
if filter(obj) {
if matched, err := pred.Matches(obj); err == nil && matched {
v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem()))
}
if node.ModifiedIndex != 0 {
@ -439,7 +439,7 @@ func (h *etcdHelper) List(ctx context.Context, key string, resourceVersion strin
if err != nil {
return err
}
if err := h.decodeNodeList(nodes, storage.SimpleFilter(pred), listPtr); err != nil {
if err := h.decodeNodeList(nodes, pred, listPtr); err != nil {
return err
}
trace.Step("Node list decoded")
@ -590,7 +590,7 @@ func (h *etcdHelper) GuaranteedUpdate(
// their Node.ModifiedIndex, which is unique across all types.
// All implementations must be thread-safe.
type etcdCache interface {
getFromCache(index uint64, filter storage.FilterFunc) (runtime.Object, bool)
getFromCache(index uint64, pred storage.SelectionPredicate) (runtime.Object, bool)
addToCache(index uint64, obj runtime.Object)
}
@ -598,14 +598,14 @@ func getTypeName(obj interface{}) string {
return reflect.TypeOf(obj).String()
}
func (h *etcdHelper) getFromCache(index uint64, filter storage.FilterFunc) (runtime.Object, bool) {
func (h *etcdHelper) getFromCache(index uint64, pred storage.SelectionPredicate) (runtime.Object, bool) {
startTime := time.Now()
defer func() {
metrics.ObserveGetCache(startTime)
}()
obj, found := h.cache.Get(index)
if found {
if !filter(obj.(runtime.Object)) {
if matched, err := pred.Matches(obj.(runtime.Object)); err != nil || !matched {
return nil, true
}
// We should not return the object itself to avoid polluting the cache if someone

View File

@ -78,7 +78,7 @@ type etcdWatcher struct {
list bool // If we're doing a recursive watch, should be true.
quorum bool // If we enable quorum, shoule be true
include includeFunc
filter storage.FilterFunc
pred storage.SelectionPredicate
etcdIncoming chan *etcd.Response
etcdError chan error
@ -105,11 +105,9 @@ const watchWaitDuration = 100 * time.Millisecond
// newEtcdWatcher returns a new etcdWatcher; if list is true, watch sub-nodes.
// The versioner must be able to handle the objects that transform creates.
func newEtcdWatcher(
list bool, quorum bool, include includeFunc, filter storage.FilterFunc,
func newEtcdWatcher(list bool, quorum bool, include includeFunc, pred storage.SelectionPredicate,
encoding runtime.Codec, versioner storage.Versioner, transform TransformFunc,
valueTransformer ValueTransformer,
cache etcdCache) *etcdWatcher {
valueTransformer ValueTransformer, cache etcdCache) *etcdWatcher {
w := &etcdWatcher{
encoding: encoding,
versioner: versioner,
@ -119,7 +117,7 @@ func newEtcdWatcher(
list: list,
quorum: quorum,
include: include,
filter: filter,
pred: pred,
// Buffer this channel, so that the etcd client is not forced
// to context switch with every object it gets, and so that a
// long time spent decoding an object won't block the *next*
@ -315,7 +313,7 @@ func (w *etcdWatcher) translate() {
// decodeObject extracts an object from the provided etcd node or returns an error.
func (w *etcdWatcher) decodeObject(node *etcd.Node) (runtime.Object, error) {
if obj, found := w.cache.getFromCache(node.ModifiedIndex, storage.SimpleFilter(storage.Everything)); found {
if obj, found := w.cache.getFromCache(node.ModifiedIndex, storage.Everything); found {
return obj, nil
}
@ -365,7 +363,7 @@ func (w *etcdWatcher) sendAdd(res *etcd.Response) {
// the resourceVersion to resume will never be able to get past a bad value.
return
}
if !w.filter(obj) {
if matched, err := w.pred.Matches(obj); err != nil || !matched {
return
}
action := watch.Added
@ -391,7 +389,10 @@ func (w *etcdWatcher) sendModify(res *etcd.Response) {
// the resourceVersion to resume will never be able to get past a bad value.
return
}
curObjPasses := w.filter(curObj)
curObjPasses := false
if matched, err := w.pred.Matches(curObj); err == nil && matched {
curObjPasses = true
}
oldObjPasses := false
var oldObj runtime.Object
if res.PrevNode != nil && res.PrevNode.Value != "" {
@ -400,10 +401,12 @@ func (w *etcdWatcher) sendModify(res *etcd.Response) {
if err := w.versioner.UpdateObject(oldObj, res.Node.ModifiedIndex); err != nil {
utilruntime.HandleError(fmt.Errorf("failure to version api object (%d) %#v: %v", res.Node.ModifiedIndex, oldObj, err))
}
oldObjPasses = w.filter(oldObj)
if matched, err := w.pred.Matches(oldObj); err == nil && matched {
oldObjPasses = true
}
}
}
// Some changes to an object may cause it to start or stop matching a filter.
// Some changes to an object may cause it to start or stop matching a pred.
// We need to report those as adds/deletes. So we have to check both the previous
// and current value of the object.
switch {
@ -423,7 +426,7 @@ func (w *etcdWatcher) sendModify(res *etcd.Response) {
Object: oldObj,
})
}
// Do nothing if neither new nor old object passed the filter.
// Do nothing if neither new nor old object passed the pred.
}
func (w *etcdWatcher) sendDelete(res *etcd.Response) {
@ -449,7 +452,7 @@ func (w *etcdWatcher) sendDelete(res *etcd.Response) {
// the resourceVersion to resume will never be able to get past a bad value.
return
}
if !w.filter(obj) {
if matched, err := w.pred.Matches(obj); err != nil || !matched {
return
}
w.emit(watch.Event{

View File

@ -23,6 +23,8 @@ import (
apiequality "k8s.io/apimachinery/pkg/api/equality"
apitesting "k8s.io/apimachinery/pkg/api/testing"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"
@ -41,7 +43,7 @@ var versioner = APIObjectVersioner{}
// Implements etcdCache interface as empty methods (i.e. does not cache any objects)
type fakeEtcdCache struct{}
func (f *fakeEtcdCache) getFromCache(index uint64, filter storage.FilterFunc) (runtime.Object, bool) {
func (f *fakeEtcdCache) getFromCache(index uint64, pred storage.SelectionPredicate) (runtime.Object, bool) {
return nil, false
}
@ -58,7 +60,7 @@ func TestWatchInterpretations(t *testing.T) {
podBar := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}}
podBaz := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "baz"}}
// All of these test cases will be run with the firstLetterIsB Filter.
// All of these test cases will be run with the firstLetterIsB SelectionPredicate.
table := map[string]struct {
actions []string // Run this test item for every action here.
prevNodeValue string
@ -128,8 +130,21 @@ func TestWatchInterpretations(t *testing.T) {
expectEmit: false,
},
}
firstLetterIsB := func(obj runtime.Object) bool {
return obj.(*example.Pod).Name[0] == 'b'
// Should use fieldSelector here.
// But for the sake of tests (simplifying the codes), use labelSelector to support set-based requirements
selector, err := labels.Parse("metadata.name in (bar, baz)")
if err != nil {
t.Fatal(err)
}
firstLetterIsB := storage.SelectionPredicate{
Label: selector,
Field: fields.Everything(),
IncludeUninitialized: true,
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, bool, error) {
pod := obj.(*example.Pod)
return labels.Set{"metadata.name": pod.Name}, nil, pod.Initializers != nil, nil
},
}
for name, item := range table {
for _, action := range item.actions {
@ -173,7 +188,7 @@ func TestWatchInterpretations(t *testing.T) {
func TestWatchInterpretation_ResponseNotSet(t *testing.T) {
_, codecs := testScheme(t)
codec := codecs.LegacyCodec(schema.GroupVersion{Version: "v1"})
w := newEtcdWatcher(false, false, nil, storage.SimpleFilter(storage.Everything), codec, versioner, nil, prefixTransformer{prefix: "test!"}, &fakeEtcdCache{})
w := newEtcdWatcher(false, false, nil, storage.Everything, codec, versioner, nil, prefixTransformer{prefix: "test!"}, &fakeEtcdCache{})
w.emit = func(e watch.Event) {
t.Errorf("Unexpected emit: %v", e)
}
@ -189,7 +204,7 @@ func TestWatchInterpretation_ResponseNoNode(t *testing.T) {
codec := codecs.LegacyCodec(schema.GroupVersion{Version: "v1"})
actions := []string{"create", "set", "compareAndSwap", "delete"}
for _, action := range actions {
w := newEtcdWatcher(false, false, nil, storage.SimpleFilter(storage.Everything), codec, versioner, nil, prefixTransformer{prefix: "test!"}, &fakeEtcdCache{})
w := newEtcdWatcher(false, false, nil, storage.Everything, codec, versioner, nil, prefixTransformer{prefix: "test!"}, &fakeEtcdCache{})
w.emit = func(e watch.Event) {
t.Errorf("Unexpected emit: %v", e)
}
@ -205,7 +220,7 @@ func TestWatchInterpretation_ResponseBadData(t *testing.T) {
codec := codecs.LegacyCodec(schema.GroupVersion{Version: "v1"})
actions := []string{"create", "set", "compareAndSwap", "delete"}
for _, action := range actions {
w := newEtcdWatcher(false, false, nil, storage.SimpleFilter(storage.Everything), codec, versioner, nil, prefixTransformer{prefix: "test!"}, &fakeEtcdCache{})
w := newEtcdWatcher(false, false, nil, storage.Everything, codec, versioner, nil, prefixTransformer{prefix: "test!"}, &fakeEtcdCache{})
w.emit = func(e watch.Event) {
t.Errorf("Unexpected emit: %v", e)
}
@ -228,10 +243,17 @@ func TestWatchInterpretation_ResponseBadData(t *testing.T) {
func TestSendResultDeleteEventHaveLatestIndex(t *testing.T) {
_, codecs := testScheme(t)
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
filter := func(obj runtime.Object) bool {
return obj.(*example.Pod).Name != "bar"
selector, _ := fields.ParseSelector("metadata.name!=bar")
pred := storage.SelectionPredicate{
Label: labels.Everything(),
Field: selector,
IncludeUninitialized: true,
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, bool, error) {
pod := obj.(*example.Pod)
return nil, fields.Set{"metadata.name": pod.Name}, pod.Initializers != nil, nil
},
}
w := newEtcdWatcher(false, false, nil, filter, codec, versioner, nil, prefixTransformer{prefix: "test!"}, &fakeEtcdCache{})
w := newEtcdWatcher(false, false, nil, pred, codec, versioner, nil, prefixTransformer{prefix: "test!"}, &fakeEtcdCache{})
eventChan := make(chan watch.Event, 1)
w.emit = func(e watch.Event) {

View File

@ -403,7 +403,7 @@ func (s *store) GetToList(ctx context.Context, key string, resourceVersion strin
if err != nil || v.Kind() != reflect.Slice {
panic("need ptr to slice")
}
if err := appendListItem(v, data, uint64(getResp.Kvs[0].ModRevision), storage.SimpleFilter(pred), s.codec, s.versioner); err != nil {
if err := appendListItem(v, data, uint64(getResp.Kvs[0].ModRevision), pred, s.codec, s.versioner); err != nil {
return err
}
// update version with cluster level revision
@ -492,8 +492,6 @@ func (s *store) List(ctx context.Context, key, resourceVersion string, pred stor
}
keyPrefix := key
filter := storage.SimpleFilter(pred)
// set the appropriate clientv3 options to filter the returned data set
var paging bool
options := make([]clientv3.OpOption, 0, 4)
@ -587,7 +585,7 @@ func (s *store) List(ctx context.Context, key, resourceVersion string, pred stor
continue
}
if err := appendListItem(v, data, uint64(kv.ModRevision), filter, s.codec, s.versioner); err != nil {
if err := appendListItem(v, data, uint64(kv.ModRevision), pred, s.codec, s.versioner); err != nil {
return err
}
}
@ -774,14 +772,14 @@ func decode(codec runtime.Codec, versioner storage.Versioner, value []byte, objP
}
// appendListItem decodes and appends the object (if it passes filter) to v, which must be a slice.
func appendListItem(v reflect.Value, data []byte, rev uint64, filter storage.FilterFunc, codec runtime.Codec, versioner storage.Versioner) error {
func appendListItem(v reflect.Value, data []byte, rev uint64, pred storage.SelectionPredicate, codec runtime.Codec, versioner storage.Versioner) error {
obj, _, err := codec.Decode(data, nil, reflect.New(v.Type().Elem()).Interface().(runtime.Object))
if err != nil {
return err
}
// being unable to set the version does not prevent the object from being extracted
versioner.UpdateObject(obj, rev)
if filter(obj) {
if matched, err := pred.Matches(obj); err == nil && matched {
v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem()))
}
return nil

View File

@ -72,7 +72,7 @@ type watchChan struct {
key string
initialRev int64
recursive bool
internalFilter storage.FilterFunc
internalPred storage.SelectionPredicate
ctx context.Context
cancel context.CancelFunc
incomingEventChan chan *event
@ -111,14 +111,14 @@ func (w *watcher) createWatchChan(ctx context.Context, key string, rev int64, re
key: key,
initialRev: rev,
recursive: recursive,
internalFilter: storage.SimpleFilter(pred),
internalPred: pred,
incomingEventChan: make(chan *event, incomingBufSize),
resultChan: make(chan watch.Event, outgoingBufSize),
errChan: make(chan error, 1),
}
if pred.Empty() {
// The filter doesn't filter out any object.
wc.internalFilter = nil
wc.internalPred = storage.Everything
}
wc.ctx, wc.cancel = context.WithCancel(ctx)
return wc
@ -250,14 +250,15 @@ func (wc *watchChan) processEvent(wg *sync.WaitGroup) {
}
func (wc *watchChan) filter(obj runtime.Object) bool {
if wc.internalFilter == nil {
if wc.internalPred.Empty() {
return true
}
return wc.internalFilter(obj)
matched, err := wc.internalPred.Matches(obj)
return err == nil && matched
}
func (wc *watchChan) acceptAll() bool {
return wc.internalFilter == nil
return wc.internalPred.Empty()
}
// transform transforms an event into a result for user if not filtered.

View File

@ -70,10 +70,6 @@ type MatchValue struct {
// to that function.
type TriggerPublisherFunc func(obj runtime.Object) []MatchValue
// FilterFunc takes an API object and returns true if the object satisfies some requirements.
// TODO: We will remove this type and use SelectionPredicate everywhere.
type FilterFunc func(obj runtime.Object) bool
// Everything accepts all objects.
var Everything = SelectionPredicate{
Label: labels.Everything(),

View File

@ -96,7 +96,7 @@ func (s *SelectionPredicate) Matches(obj runtime.Object) (bool, error) {
}
matched := s.Label.Matches(labels)
if matched && s.Field != nil {
matched = (matched && s.Field.Matches(fields))
matched = matched && s.Field.Matches(fields)
}
return matched, nil
}

View File

@ -22,8 +22,6 @@ import (
"strings"
"sync/atomic"
"github.com/golang/glog"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/api/validation/path"
"k8s.io/apimachinery/pkg/runtime"
@ -40,19 +38,6 @@ func SimpleUpdate(fn SimpleUpdateFunc) UpdateFunc {
}
}
// SimpleFilter converts a selection predicate into a FilterFunc.
// It ignores any error from Matches().
func SimpleFilter(p SelectionPredicate) FilterFunc {
return func(obj runtime.Object) bool {
matches, err := p.Matches(obj)
if err != nil {
glog.Errorf("invalid object for matching. Obj: %v. Err: %v", obj, err)
return false
}
return matches
}
}
func EverythingFunc(runtime.Object) bool {
return true
}