mirror of https://github.com/k3s-io/k3s
Merge pull request #6349 from wojtek-t/node_watch_fields
Kubelet watching only its own Node objectpull/6/head
commit
f8f14b1cc1
|
@ -1545,6 +1545,19 @@ func init() {
|
||||||
// If one of the conversion functions is malformed, detect it immediately.
|
// If one of the conversion functions is malformed, detect it immediately.
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
err = newer.Scheme.AddFieldLabelConversionFunc("v1beta1", "Minion",
|
||||||
|
func(label, value string) (string, string, error) {
|
||||||
|
switch label {
|
||||||
|
case "name":
|
||||||
|
return "name", value, nil
|
||||||
|
default:
|
||||||
|
return "", "", fmt.Errorf("field label not supported: %s", label)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
// If one of the conversion functions is malformed, detect it immediately.
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
err = newer.Scheme.AddFieldLabelConversionFunc("v1beta1", "ReplicationController",
|
err = newer.Scheme.AddFieldLabelConversionFunc("v1beta1", "ReplicationController",
|
||||||
func(label, value string) (string, string, error) {
|
func(label, value string) (string, string, error) {
|
||||||
switch label {
|
switch label {
|
||||||
|
|
|
@ -1470,6 +1470,19 @@ func init() {
|
||||||
// If one of the conversion functions is malformed, detect it immediately.
|
// If one of the conversion functions is malformed, detect it immediately.
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
err = newer.Scheme.AddFieldLabelConversionFunc("v1beta2", "Minion",
|
||||||
|
func(label, value string) (string, string, error) {
|
||||||
|
switch label {
|
||||||
|
case "name":
|
||||||
|
return "name", value, nil
|
||||||
|
default:
|
||||||
|
return "", "", fmt.Errorf("field label not supported: %s", label)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
// if one of the conversion functions is malformed, detect it immediately.
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
err = newer.Scheme.AddFieldLabelConversionFunc("v1beta2", "ReplicationController",
|
err = newer.Scheme.AddFieldLabelConversionFunc("v1beta2", "ReplicationController",
|
||||||
func(label, value string) (string, string, error) {
|
func(label, value string) (string, string, error) {
|
||||||
switch label {
|
switch label {
|
||||||
|
|
|
@ -39,6 +39,19 @@ func init() {
|
||||||
// If one of the conversion functions is malformed, detect it immediately.
|
// If one of the conversion functions is malformed, detect it immediately.
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
err = newer.Scheme.AddFieldLabelConversionFunc("v1beta3", "Minion",
|
||||||
|
func(label, value string) (string, string, error) {
|
||||||
|
switch label {
|
||||||
|
case "name":
|
||||||
|
return label, value, nil
|
||||||
|
default:
|
||||||
|
return "", "", fmt.Errorf("field label not supported: %s", label)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
// If one of the conversion functions is malformed, detect it immediately.
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
err = newer.Scheme.AddFieldLabelConversionFunc("v1beta3", "ReplicationController",
|
err = newer.Scheme.AddFieldLabelConversionFunc("v1beta3", "ReplicationController",
|
||||||
func(label, value string) (string, string, error) {
|
func(label, value string) (string, string, error) {
|
||||||
switch label {
|
switch label {
|
||||||
|
|
|
@ -179,17 +179,14 @@ func NewMainKubelet(
|
||||||
if kubeClient != nil {
|
if kubeClient != nil {
|
||||||
// TODO: cache.NewListWatchFromClient is limited as it takes a client implementation rather
|
// TODO: cache.NewListWatchFromClient is limited as it takes a client implementation rather
|
||||||
// than an interface. There is no way to construct a list+watcher using resource name.
|
// than an interface. There is no way to construct a list+watcher using resource name.
|
||||||
|
fieldSelector := fields.Set{"name": hostname}.AsSelector()
|
||||||
listWatch := &cache.ListWatch{
|
listWatch := &cache.ListWatch{
|
||||||
// TODO: currently, we are watching all nodes. To make it more efficient,
|
|
||||||
// we should be watching only a node with Name equal to kubelet's Hostname.
|
|
||||||
// To make it possible, we need to add field selector to ListFunc and WatchFunc,
|
|
||||||
// and selection by field needs to be implemented in WatchMinions function in pkg/registry/etcd.
|
|
||||||
ListFunc: func() (runtime.Object, error) {
|
ListFunc: func() (runtime.Object, error) {
|
||||||
|
// TODO: Use List() with fieldSelector when it is supported.
|
||||||
return kubeClient.Nodes().List()
|
return kubeClient.Nodes().List()
|
||||||
},
|
},
|
||||||
WatchFunc: func(resourceVersion string) (watch.Interface, error) {
|
WatchFunc: func(resourceVersion string) (watch.Interface, error) {
|
||||||
return kubeClient.Nodes().Watch(
|
return kubeClient.Nodes().Watch(labels.Everything(), fieldSelector, resourceVersion)
|
||||||
labels.Everything(), fields.Everything(), resourceVersion)
|
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
cache.NewReflector(listWatch, &api.Node{}, nodeStore, 0).Run()
|
cache.NewReflector(listWatch, &api.Node{}, nodeStore, 0).Run()
|
||||||
|
|
|
@ -65,6 +65,10 @@ type Etcd struct {
|
||||||
// Called for Create/Update/Get/Delete
|
// Called for Create/Update/Get/Delete
|
||||||
KeyFunc func(ctx api.Context, name string) (string, error)
|
KeyFunc func(ctx api.Context, name string) (string, error)
|
||||||
|
|
||||||
|
// If field.Selector of Watch contains a label with such name, this will be
|
||||||
|
// translated to watching a single object (not all objects of that type).
|
||||||
|
WatchSingleFieldName string
|
||||||
|
|
||||||
// Called to get the name of an object
|
// Called to get the name of an object
|
||||||
ObjectNameFunc func(obj runtime.Object) (string, error)
|
ObjectNameFunc func(obj runtime.Object) (string, error)
|
||||||
|
|
||||||
|
@ -404,19 +408,29 @@ func (e *Etcd) finalizeDelete(obj runtime.Object, runHooks bool) (runtime.Object
|
||||||
}
|
}
|
||||||
|
|
||||||
// WatchPredicate starts a watch for the items that m matches.
|
// WatchPredicate starts a watch for the items that m matches.
|
||||||
// TODO: Detect if m references a single object instead of a list.
|
|
||||||
func (e *Etcd) Watch(ctx api.Context, label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) {
|
func (e *Etcd) Watch(ctx api.Context, label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) {
|
||||||
return e.WatchPredicate(ctx, e.PredicateFunc(label, field), resourceVersion)
|
if value, found := field.RequiresExactMatch(e.WatchSingleFieldName); found && len(e.WatchSingleFieldName) > 0 {
|
||||||
|
key, err := e.KeyFunc(ctx, value)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return e.watchPredicate(key, e.PredicateFunc(label, field), resourceVersion)
|
||||||
|
}
|
||||||
|
return e.watchPredicate(e.KeyRootFunc(ctx), e.PredicateFunc(label, field), resourceVersion)
|
||||||
}
|
}
|
||||||
|
|
||||||
// WatchPredicate starts a watch for the items that m matches.
|
// WatchPredicate starts a watch for the items that m matches.
|
||||||
// TODO: Detect if m references a single object instead of a list.
|
// TODO: Detect if m references a single object instead of a list.
|
||||||
func (e *Etcd) WatchPredicate(ctx api.Context, m generic.Matcher, resourceVersion string) (watch.Interface, error) {
|
func (e *Etcd) WatchPredicate(ctx api.Context, m generic.Matcher, resourceVersion string) (watch.Interface, error) {
|
||||||
|
return e.watchPredicate(e.KeyRootFunc(ctx), m, resourceVersion)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *Etcd) watchPredicate(key string, m generic.Matcher, resourceVersion string) (watch.Interface, error) {
|
||||||
version, err := tools.ParseWatchResourceVersion(resourceVersion, e.EndpointName)
|
version, err := tools.ParseWatchResourceVersion(resourceVersion, e.EndpointName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return e.Helper.WatchList(e.KeyRootFunc(ctx), version, func(obj runtime.Object) bool {
|
return e.Helper.WatchList(key, version, func(obj runtime.Object) bool {
|
||||||
matches, err := m.Matches(obj)
|
matches, err := m.Matches(obj)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Errorf("unable to match watch: %v", err)
|
glog.Errorf("unable to match watch: %v", err)
|
||||||
|
|
|
@ -49,6 +49,7 @@ func NewStorage(h tools.EtcdHelper, connection client.ConnectionInfoGetter) *RES
|
||||||
KeyFunc: func(ctx api.Context, name string) (string, error) {
|
KeyFunc: func(ctx api.Context, name string) (string, error) {
|
||||||
return prefix + "/" + name, nil
|
return prefix + "/" + name, nil
|
||||||
},
|
},
|
||||||
|
WatchSingleFieldName: "name",
|
||||||
ObjectNameFunc: func(obj runtime.Object) (string, error) {
|
ObjectNameFunc: func(obj runtime.Object) (string, error) {
|
||||||
return obj.(*api.Node).Name, nil
|
return obj.(*api.Node).Name, nil
|
||||||
},
|
},
|
||||||
|
|
|
@ -34,6 +34,11 @@ import (
|
||||||
"github.com/coreos/go-etcd/etcd"
|
"github.com/coreos/go-etcd/etcd"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
PASS = iota
|
||||||
|
FAIL
|
||||||
|
)
|
||||||
|
|
||||||
type fakeConnectionInfoGetter struct {
|
type fakeConnectionInfoGetter struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -342,6 +347,59 @@ func TestEtcdWatchNodesMatch(t *testing.T) {
|
||||||
watching.Stop()
|
watching.Stop()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestEtcdWatchNodesFields(t *testing.T) {
|
||||||
|
ctx := api.NewDefaultContext()
|
||||||
|
storage, fakeClient := newStorage(t)
|
||||||
|
node := validNewNode()
|
||||||
|
nodeBytes, _ := latest.Codec.Encode(node)
|
||||||
|
|
||||||
|
testFieldMap := map[int][]fields.Set{
|
||||||
|
PASS: {
|
||||||
|
{"name": "foo"},
|
||||||
|
},
|
||||||
|
FAIL: {
|
||||||
|
{"name": "bar"},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, singleWatchField := range []string{"", "name"} {
|
||||||
|
storage.WatchSingleFieldName = singleWatchField
|
||||||
|
for expectedResult, fieldSet := range testFieldMap {
|
||||||
|
for _, field := range fieldSet {
|
||||||
|
watching, err := storage.Watch(ctx,
|
||||||
|
labels.Everything(),
|
||||||
|
field.AsSelector(),
|
||||||
|
"1",
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
fakeClient.WaitForWatchCompletion()
|
||||||
|
fakeClient.WatchResponse <- &etcd.Response{
|
||||||
|
Action: "create",
|
||||||
|
Node: &etcd.Node{
|
||||||
|
Value: string(nodeBytes),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
select {
|
||||||
|
case r, ok := <-watching.ResultChan():
|
||||||
|
if expectedResult == FAIL {
|
||||||
|
t.Errorf("unexpected result from channel %#v", r)
|
||||||
|
}
|
||||||
|
if !ok {
|
||||||
|
t.Errorf("watching channel should be open")
|
||||||
|
}
|
||||||
|
case <-time.After(time.Millisecond * 100):
|
||||||
|
if expectedResult == PASS {
|
||||||
|
t.Errorf("unexpected timeout from result channel")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
watching.Stop()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestEtcdWatchNodesNotMatch(t *testing.T) {
|
func TestEtcdWatchNodesNotMatch(t *testing.T) {
|
||||||
ctx := api.NewDefaultContext()
|
ctx := api.NewDefaultContext()
|
||||||
storage, fakeClient := newStorage(t)
|
storage, fakeClient := newStorage(t)
|
||||||
|
|
|
@ -82,6 +82,13 @@ type ResourceGetter interface {
|
||||||
Get(api.Context, string) (runtime.Object, error)
|
Get(api.Context, string) (runtime.Object, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NodeToSelectableFields returns a label set that represents the object.
|
||||||
|
func NodeToSelectableFields(node *api.Node) labels.Set {
|
||||||
|
return labels.Set{
|
||||||
|
"name": node.Name,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// MatchNode returns a generic matcher for a given label and field selector.
|
// MatchNode returns a generic matcher for a given label and field selector.
|
||||||
func MatchNode(label labels.Selector, field fields.Selector) generic.Matcher {
|
func MatchNode(label labels.Selector, field fields.Selector) generic.Matcher {
|
||||||
return generic.MatcherFunc(func(obj runtime.Object) (bool, error) {
|
return generic.MatcherFunc(func(obj runtime.Object) (bool, error) {
|
||||||
|
@ -89,8 +96,8 @@ func MatchNode(label labels.Selector, field fields.Selector) generic.Matcher {
|
||||||
if !ok {
|
if !ok {
|
||||||
return false, fmt.Errorf("not a node")
|
return false, fmt.Errorf("not a node")
|
||||||
}
|
}
|
||||||
// TODO: Add support for filtering based on field, once NodeStatus is defined.
|
fields := NodeToSelectableFields(nodeObj)
|
||||||
return label.Matches(labels.Set(nodeObj.Labels)), nil
|
return label.Matches(labels.Set(nodeObj.Labels)) && field.Matches(fields), nil
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue