From 6b089582649b4d0a34a838acc8ce1b8dece15a90 Mon Sep 17 00:00:00 2001 From: Wojciech Tyczynski Date: Thu, 2 Apr 2015 10:57:28 +0200 Subject: [PATCH] Kubelet watching only its own Node --- pkg/api/v1beta1/conversion.go | 13 ++++++ pkg/api/v1beta2/conversion.go | 13 ++++++ pkg/api/v1beta3/conversion.go | 13 ++++++ pkg/kubelet/kubelet.go | 9 ++--- pkg/registry/generic/etcd/etcd.go | 20 +++++++-- pkg/registry/minion/etcd/etcd.go | 1 + pkg/registry/minion/etcd/etcd_test.go | 58 +++++++++++++++++++++++++++ pkg/registry/minion/rest.go | 11 ++++- 8 files changed, 127 insertions(+), 11 deletions(-) diff --git a/pkg/api/v1beta1/conversion.go b/pkg/api/v1beta1/conversion.go index a39a854d78..8e399f2b37 100644 --- a/pkg/api/v1beta1/conversion.go +++ b/pkg/api/v1beta1/conversion.go @@ -1545,6 +1545,19 @@ func init() { // If one of the conversion functions is malformed, detect it immediately. panic(err) } + err = newer.Scheme.AddFieldLabelConversionFunc("v1beta1", "Minion", + func(label, value string) (string, string, error) { + switch label { + case "name": + return "name", value, nil + default: + return "", "", fmt.Errorf("field label not supported: %s", label) + } + }) + if err != nil { + // If one of the conversion functions is malformed, detect it immediately. + panic(err) + } err = newer.Scheme.AddFieldLabelConversionFunc("v1beta1", "ReplicationController", func(label, value string) (string, string, error) { switch label { diff --git a/pkg/api/v1beta2/conversion.go b/pkg/api/v1beta2/conversion.go index e6ffbc6952..55811fdf82 100644 --- a/pkg/api/v1beta2/conversion.go +++ b/pkg/api/v1beta2/conversion.go @@ -1470,6 +1470,19 @@ func init() { // If one of the conversion functions is malformed, detect it immediately. panic(err) } + err = newer.Scheme.AddFieldLabelConversionFunc("v1beta2", "Minion", + func(label, value string) (string, string, error) { + switch label { + case "name": + return "name", value, nil + default: + return "", "", fmt.Errorf("field label not supported: %s", label) + } + }) + if err != nil { + // if one of the conversion functions is malformed, detect it immediately. + panic(err) + } err = newer.Scheme.AddFieldLabelConversionFunc("v1beta2", "ReplicationController", func(label, value string) (string, string, error) { switch label { diff --git a/pkg/api/v1beta3/conversion.go b/pkg/api/v1beta3/conversion.go index 1f1b38d611..546b70db39 100644 --- a/pkg/api/v1beta3/conversion.go +++ b/pkg/api/v1beta3/conversion.go @@ -39,6 +39,19 @@ func init() { // If one of the conversion functions is malformed, detect it immediately. panic(err) } + err = newer.Scheme.AddFieldLabelConversionFunc("v1beta3", "Minion", + func(label, value string) (string, string, error) { + switch label { + case "name": + return label, value, nil + default: + return "", "", fmt.Errorf("field label not supported: %s", label) + } + }) + if err != nil { + // If one of the conversion functions is malformed, detect it immediately. + panic(err) + } err = newer.Scheme.AddFieldLabelConversionFunc("v1beta3", "ReplicationController", func(label, value string) (string, string, error) { switch label { diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 1036c7b111..21c29ef5b8 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -179,17 +179,14 @@ func NewMainKubelet( if kubeClient != nil { // TODO: cache.NewListWatchFromClient is limited as it takes a client implementation rather // than an interface. There is no way to construct a list+watcher using resource name. + fieldSelector := fields.Set{"name": hostname}.AsSelector() listWatch := &cache.ListWatch{ - // TODO: currently, we are watching all nodes. To make it more efficient, - // we should be watching only a node with Name equal to kubelet's Hostname. - // To make it possible, we need to add field selector to ListFunc and WatchFunc, - // and selection by field needs to be implemented in WatchMinions function in pkg/registry/etcd. ListFunc: func() (runtime.Object, error) { + // TODO: Use List() with fieldSelector when it is supported. return kubeClient.Nodes().List() }, WatchFunc: func(resourceVersion string) (watch.Interface, error) { - return kubeClient.Nodes().Watch( - labels.Everything(), fields.Everything(), resourceVersion) + return kubeClient.Nodes().Watch(labels.Everything(), fieldSelector, resourceVersion) }, } cache.NewReflector(listWatch, &api.Node{}, nodeStore, 0).Run() diff --git a/pkg/registry/generic/etcd/etcd.go b/pkg/registry/generic/etcd/etcd.go index 1860fbbf6d..463b5acd2e 100644 --- a/pkg/registry/generic/etcd/etcd.go +++ b/pkg/registry/generic/etcd/etcd.go @@ -65,6 +65,10 @@ type Etcd struct { // Called for Create/Update/Get/Delete KeyFunc func(ctx api.Context, name string) (string, error) + // If field.Selector of Watch contains a label with such name, this will be + // translated to watching a single object (not all objects of that type). + WatchSingleFieldName string + // Called to get the name of an object ObjectNameFunc func(obj runtime.Object) (string, error) @@ -404,19 +408,29 @@ func (e *Etcd) finalizeDelete(obj runtime.Object, runHooks bool) (runtime.Object } // WatchPredicate starts a watch for the items that m matches. -// TODO: Detect if m references a single object instead of a list. func (e *Etcd) Watch(ctx api.Context, label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) { - return e.WatchPredicate(ctx, e.PredicateFunc(label, field), resourceVersion) + if value, found := field.RequiresExactMatch(e.WatchSingleFieldName); found && len(e.WatchSingleFieldName) > 0 { + key, err := e.KeyFunc(ctx, value) + if err != nil { + return nil, err + } + return e.watchPredicate(key, e.PredicateFunc(label, field), resourceVersion) + } + return e.watchPredicate(e.KeyRootFunc(ctx), e.PredicateFunc(label, field), resourceVersion) } // WatchPredicate starts a watch for the items that m matches. // TODO: Detect if m references a single object instead of a list. func (e *Etcd) WatchPredicate(ctx api.Context, m generic.Matcher, resourceVersion string) (watch.Interface, error) { + return e.watchPredicate(e.KeyRootFunc(ctx), m, resourceVersion) +} + +func (e *Etcd) watchPredicate(key string, m generic.Matcher, resourceVersion string) (watch.Interface, error) { version, err := tools.ParseWatchResourceVersion(resourceVersion, e.EndpointName) if err != nil { return nil, err } - return e.Helper.WatchList(e.KeyRootFunc(ctx), version, func(obj runtime.Object) bool { + return e.Helper.WatchList(key, version, func(obj runtime.Object) bool { matches, err := m.Matches(obj) if err != nil { glog.Errorf("unable to match watch: %v", err) diff --git a/pkg/registry/minion/etcd/etcd.go b/pkg/registry/minion/etcd/etcd.go index 192e9da235..f90510cb4b 100644 --- a/pkg/registry/minion/etcd/etcd.go +++ b/pkg/registry/minion/etcd/etcd.go @@ -49,6 +49,7 @@ func NewStorage(h tools.EtcdHelper, connection client.ConnectionInfoGetter) *RES KeyFunc: func(ctx api.Context, name string) (string, error) { return prefix + "/" + name, nil }, + WatchSingleFieldName: "name", ObjectNameFunc: func(obj runtime.Object) (string, error) { return obj.(*api.Node).Name, nil }, diff --git a/pkg/registry/minion/etcd/etcd_test.go b/pkg/registry/minion/etcd/etcd_test.go index e64a17b81a..8f23d87a4e 100644 --- a/pkg/registry/minion/etcd/etcd_test.go +++ b/pkg/registry/minion/etcd/etcd_test.go @@ -34,6 +34,11 @@ import ( "github.com/coreos/go-etcd/etcd" ) +const ( + PASS = iota + FAIL +) + type fakeConnectionInfoGetter struct { } @@ -342,6 +347,59 @@ func TestEtcdWatchNodesMatch(t *testing.T) { watching.Stop() } +func TestEtcdWatchNodesFields(t *testing.T) { + ctx := api.NewDefaultContext() + storage, fakeClient := newStorage(t) + node := validNewNode() + nodeBytes, _ := latest.Codec.Encode(node) + + testFieldMap := map[int][]fields.Set{ + PASS: { + {"name": "foo"}, + }, + FAIL: { + {"name": "bar"}, + }, + } + + for _, singleWatchField := range []string{"", "name"} { + storage.WatchSingleFieldName = singleWatchField + for expectedResult, fieldSet := range testFieldMap { + for _, field := range fieldSet { + watching, err := storage.Watch(ctx, + labels.Everything(), + field.AsSelector(), + "1", + ) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + fakeClient.WaitForWatchCompletion() + fakeClient.WatchResponse <- &etcd.Response{ + Action: "create", + Node: &etcd.Node{ + Value: string(nodeBytes), + }, + } + select { + case r, ok := <-watching.ResultChan(): + if expectedResult == FAIL { + t.Errorf("unexpected result from channel %#v", r) + } + if !ok { + t.Errorf("watching channel should be open") + } + case <-time.After(time.Millisecond * 100): + if expectedResult == PASS { + t.Errorf("unexpected timeout from result channel") + } + } + watching.Stop() + } + } + } +} + func TestEtcdWatchNodesNotMatch(t *testing.T) { ctx := api.NewDefaultContext() storage, fakeClient := newStorage(t) diff --git a/pkg/registry/minion/rest.go b/pkg/registry/minion/rest.go index 797c5f781b..0590e0a2bc 100644 --- a/pkg/registry/minion/rest.go +++ b/pkg/registry/minion/rest.go @@ -82,6 +82,13 @@ type ResourceGetter interface { Get(api.Context, string) (runtime.Object, error) } +// NodeToSelectableFields returns a label set that represents the object. +func NodeToSelectableFields(node *api.Node) labels.Set { + return labels.Set{ + "name": node.Name, + } +} + // MatchNode returns a generic matcher for a given label and field selector. func MatchNode(label labels.Selector, field fields.Selector) generic.Matcher { return generic.MatcherFunc(func(obj runtime.Object) (bool, error) { @@ -89,8 +96,8 @@ func MatchNode(label labels.Selector, field fields.Selector) generic.Matcher { if !ok { return false, fmt.Errorf("not a node") } - // TODO: Add support for filtering based on field, once NodeStatus is defined. - return label.Matches(labels.Set(nodeObj.Labels)), nil + fields := NodeToSelectableFields(nodeObj) + return label.Matches(labels.Set(nodeObj.Labels)) && field.Matches(fields), nil }) }