From 954e25465d80f36d4e5ead6bf2d99a0c9ff3934f Mon Sep 17 00:00:00 2001 From: James DeFelice Date: Fri, 4 Mar 2016 03:58:34 +0000 Subject: [PATCH] fix several bugs: - properly remove the leading non-flag args from the command line before passing the rest to the executor - fix missed "add" after "delete" in merge() when objects are identical - properly test for tombstone instead of *api.Node in delete handler - basic tests for node registrator --- contrib/mesos/pkg/minion/server.go | 2 +- contrib/mesos/pkg/node/registrator.go | 8 +- contrib/mesos/pkg/node/registrator_test.go | 160 ++++++++++++++++++ contrib/mesos/pkg/queue/historical.go | 33 ++-- contrib/mesos/pkg/queue/historical_test.go | 26 +++ .../mesos/pkg/scheduler/service/service.go | 14 +- 6 files changed, 215 insertions(+), 28 deletions(-) create mode 100644 contrib/mesos/pkg/node/registrator_test.go diff --git a/contrib/mesos/pkg/minion/server.go b/contrib/mesos/pkg/minion/server.go index 5129cd1920..9f295ad6fe 100644 --- a/contrib/mesos/pkg/minion/server.go +++ b/contrib/mesos/pkg/minion/server.go @@ -164,7 +164,7 @@ func (ms *MinionServer) launchProxyServer() { // executor doesn't support failover right now, the right thing to do is to fail completely since all // pods will be lost upon restart and we want mesos to recover the resources from them. 
func (ms *MinionServer) launchExecutorServer(containerID string) <-chan struct{} { - allArgs := os.Args[1:] + allArgs := os.Args[2:] // filter out minion flags, leaving those for the executor executorFlags := pflag.NewFlagSet("executor", pflag.ContinueOnError) diff --git a/contrib/mesos/pkg/node/registrator.go b/contrib/mesos/pkg/node/registrator.go index 75a04d3305..7145e3280a 100644 --- a/contrib/mesos/pkg/node/registrator.go +++ b/contrib/mesos/pkg/node/registrator.go @@ -20,7 +20,7 @@ import ( "fmt" "time" - clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" + unversionedcore "k8s.io/kubernetes/pkg/client/typed/generated/core/unversioned" log "github.com/golang/glog" "k8s.io/kubernetes/contrib/mesos/pkg/queue" @@ -63,11 +63,11 @@ type LookupFunc func(hostName string) *api.Node type clientRegistrator struct { lookupNode LookupFunc - client *clientset.Clientset + client unversionedcore.NodesGetter queue *queue.HistoricalFIFO } -func NewRegistrator(client *clientset.Clientset, lookupNode LookupFunc) *clientRegistrator { +func NewRegistrator(client unversionedcore.NodesGetter, lookupNode LookupFunc) *clientRegistrator { return &clientRegistrator{ lookupNode: lookupNode, client: client, @@ -80,6 +80,7 @@ func (r *clientRegistrator) Run(terminate <-chan struct{}) error { RegistrationLoop: for { obj := r.queue.Pop(terminate) + log.V(3).Infof("registration event observed") if obj == nil { break RegistrationLoop } @@ -92,6 +93,7 @@ func (r *clientRegistrator) Run(terminate <-chan struct{}) error { rg := obj.(*registration) n, needsUpdate := r.updateNecessary(rg.hostName, rg.labels) if !needsUpdate { + log.V(2).Infof("no update needed, skipping for %s: %v", rg.hostName, rg.labels) continue } diff --git a/contrib/mesos/pkg/node/registrator_test.go b/contrib/mesos/pkg/node/registrator_test.go new file mode 100644 index 0000000000..64beb3aaee --- /dev/null +++ b/contrib/mesos/pkg/node/registrator_test.go @@ -0,0 +1,160 @@ +/* +Copyright 2016 The 
Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package node + +import ( + "testing" + + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/errors" + "k8s.io/kubernetes/pkg/api/unversioned" + "k8s.io/kubernetes/pkg/client/testing/core" + unversionedcore "k8s.io/kubernetes/pkg/client/typed/generated/core/unversioned" + "k8s.io/kubernetes/pkg/client/typed/generated/core/unversioned/fake" + "k8s.io/kubernetes/pkg/runtime" +) + +type fakeNodes struct { + *fake.FakeNodes +} + +func (f *fakeNodes) Nodes() unversionedcore.NodeInterface { + return f +} + +func calledOnce(h bool, ret runtime.Object, err error) (<-chan struct{}, func(core.Action) (bool, runtime.Object, error)) { + ch := make(chan struct{}) + return ch, func(_ core.Action) (bool, runtime.Object, error) { + select { + case <-ch: + panic("called more than once") + default: + close(ch) + } + return h, ret, err + } +} + +func TestRegister_withUnknownNode(t *testing.T) { + fc := &core.Fake{} + nodes := &fakeNodes{&fake.FakeNodes{&fake.FakeCore{fc}}} + createCalled, createOnce := calledOnce(true, nil, nil) + fc.AddReactor("create", "nodes", createOnce) + + lookup := func(hostName string) *api.Node { + select { + case <-createCalled: + return &api.Node{ObjectMeta: api.ObjectMeta{Name: "foo"}} + default: + return nil + } + } + + r := NewRegistrator(nodes, lookup) + ch := make(chan struct{}) + defer close(ch) + r.Run(ch) + + t.Logf("registering node foo") + ok, err := r.Register("foo", 
nil) + if !ok { + t.Fatalf("registration failed without error") + } else if err != nil { + t.Fatalf("registration failed with error %v", err) + } + + // wait for node creation + t.Logf("awaiting node creation") + <-createCalled +} + +func TestRegister_withKnownNode(t *testing.T) { + fc := &core.Fake{} + nodes := &fakeNodes{&fake.FakeNodes{&fake.FakeCore{fc}}} + updateCalled, updateOnce := calledOnce(true, nil, nil) + fc.AddReactor("update", "nodes", updateOnce) + + lookup := func(hostName string) *api.Node { + select { + case <-updateCalled: + return &api.Node{ObjectMeta: api.ObjectMeta{Name: "foo"}} + default: + // this node needs an update because it has labels: the updated version doesn't + return &api.Node{ObjectMeta: api.ObjectMeta{Name: "foo", Labels: map[string]string{"a": "b"}}} + } + } + + r := NewRegistrator(nodes, lookup) + ch := make(chan struct{}) + defer close(ch) + r.Run(ch) + + t.Logf("registering node foo") + ok, err := r.Register("foo", nil) + if !ok { + t.Fatalf("registration failed without error") + } else if err != nil { + t.Fatalf("registration failed with error %v", err) + } + + // wait for node update + t.Logf("awaiting node update") + <-updateCalled +} + +func TestRegister_withSemiKnownNode(t *testing.T) { + // semi-known because the lookup func doesn't see a very newly created node + // but our apiserver "create" call returns an already-exists error. in this case + // CreateOrUpdate should proceed to attempt an update. 
+ + fc := &core.Fake{} + nodes := &fakeNodes{&fake.FakeNodes{&fake.FakeCore{fc}}} + + createCalled, createOnce := calledOnce(true, nil, errors.NewAlreadyExists(unversioned.GroupResource{"", ""}, "nodes")) + fc.AddReactor("create", "nodes", createOnce) + + updateCalled, updateOnce := calledOnce(true, nil, nil) + fc.AddReactor("update", "nodes", updateOnce) + + lookup := func(hostName string) *api.Node { + select { + case <-updateCalled: + return &api.Node{ObjectMeta: api.ObjectMeta{Name: "foo"}} + default: + // this makes the node semi-known: apiserver knows it but the store/cache doesn't + return nil + } + } + + r := NewRegistrator(nodes, lookup) + ch := make(chan struct{}) + defer close(ch) + r.Run(ch) + + t.Logf("registering node foo") + ok, err := r.Register("foo", nil) + if !ok { + t.Fatalf("registration failed without error") + } else if err != nil { + t.Fatalf("registration failed with error %v", err) + } + + // wait for node update + t.Logf("awaiting node update") + <-createCalled + <-updateCalled +} diff --git a/contrib/mesos/pkg/queue/historical.go b/contrib/mesos/pkg/queue/historical.go index 79304c511d..901dbb6ad8 100644 --- a/contrib/mesos/pkg/queue/historical.go +++ b/contrib/mesos/pkg/queue/historical.go @@ -18,7 +18,6 @@ package queue import ( "fmt" - "reflect" "sync" "time" @@ -352,30 +351,22 @@ func (f *HistoricalFIFO) gc() { // Assumes that the caller has acquired the state lock. func (f *HistoricalFIFO) merge(id string, obj UniqueCopyable) (notifications []Entry) { item, exists := f.items[id] - now := time.Now() - if !exists { + if !exists || item.Is(POP_EVENT|DELETE_EVENT) { + // no prior history for this UID, or else it was popped/removed by the client. 
e := &entry{obj.Copy().(UniqueCopyable), ADD_EVENT} f.items[id] = e notifications = append(notifications, e) + } else if item.Value().GetUID() != obj.GetUID() { + // sanity check, please + panic(fmt.Sprintf("historical UID %q != current UID %v", item.Value().GetUID(), obj.GetUID())) } else { - if !item.Is(DELETE_EVENT) && item.Value().GetUID() != obj.GetUID() { - // hidden DELETE! - // (1) append a DELETE - // (2) append an ADD - // .. and notify listeners in that order - ent := item.(*entry) - ent.event = DELETE_EVENT - e1 := &deletedEntry{ent, now.Add(f.lingerTTL)} - e2 := &entry{obj.Copy().(UniqueCopyable), ADD_EVENT} - f.items[id] = e2 - notifications = append(notifications, e1, e2) - } else if !reflect.DeepEqual(obj, item.Value()) { - //TODO(jdef): it would be nice if we could rely on resource versions - //instead of doing a DeepEqual. Maybe someday we'll be able to. - e := &entry{obj.Copy().(UniqueCopyable), UPDATE_EVENT} - f.items[id] = e - notifications = append(notifications, e) - } + // exists && !(popped | deleted). so either the prior event was an add or an + // update. reflect.DeepEqual is expensive. it won't help us determine if + // we missed a hidden delete along the way. + e := &entry{obj.Copy().(UniqueCopyable), UPDATE_EVENT} + f.items[id] = e + notifications = append(notifications, e) + // else objects are the same, no work to do. 
} // check for garbage collection f.gcc++ diff --git a/contrib/mesos/pkg/queue/historical_test.go b/contrib/mesos/pkg/queue/historical_test.go index 8597526c68..094502bdec 100644 --- a/contrib/mesos/pkg/queue/historical_test.go +++ b/contrib/mesos/pkg/queue/historical_test.go @@ -119,6 +119,32 @@ func TestFIFO_addUpdate(t *testing.T) { } } +func TestFIFO_addDeleteAdd(t *testing.T) { + f := NewHistorical(nil) + testobj := &testObj{"foo", 10} + f.Add(testobj) + f.Delete(testobj) + f.Add(testobj) + + _, exists, _ := f.GetByKey("foo") + if !exists { + t.Errorf("item did not get readded") + } +} + +func TestFIFO_addPopAdd(t *testing.T) { + f := NewHistorical(nil) + testobj := &testObj{"foo", 10} + f.Add(testobj) + f.Pop(nil) + f.Add(testobj) + + _, exists, _ := f.GetByKey("foo") + if !exists { + t.Errorf("item did not get readded") + } +} + func TestFIFO_addReplace(t *testing.T) { f := NewHistorical(nil) f.Add(&testObj{"foo", 10}) diff --git a/contrib/mesos/pkg/scheduler/service/service.go b/contrib/mesos/pkg/scheduler/service/service.go index 102ddf4d8f..8af5d42953 100644 --- a/contrib/mesos/pkg/scheduler/service/service.go +++ b/contrib/mesos/pkg/scheduler/service/service.go @@ -758,10 +758,18 @@ func (s *SchedulerServer) bootstrap(hks hyperkube.Interface, sc *schedcfg.Config nodeLW := cache.NewListWatchFromClient(nodesClient.CoreClient, "nodes", api.NamespaceAll, fields.Everything()) nodeStore, nodeCtl := controllerfw.NewInformer(nodeLW, &api.Node{}, s.nodeRelistPeriod, &controllerfw.ResourceEventHandlerFuncs{ DeleteFunc: func(obj interface{}) { - node := obj.(*api.Node) if eiRegistry != nil { - log.V(2).Infof("deleting node %q from registry", node.Name) - eiRegistry.Invalidate(node.Name) + // TODO(jdef) use controllerfw.DeletionHandlingMetaNamespaceKeyFunc at some point? 
+ nodeName := "" + if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok { + nodeName = tombstone.Key + } else if node, ok := obj.(*api.Node); ok { + nodeName = node.Name + } + if nodeName != "" { + log.V(2).Infof("deleting node %q from registry", nodeName) + eiRegistry.Invalidate(nodeName) + } } }, })