Merge pull request #22500 from mesosphere/jdef_multi_bugfix

Auto commit by PR queue bot
k8s-merge-robot 2016-03-04 00:32:28 -08:00
commit 679e27c157
6 changed files with 215 additions and 28 deletions

View File

@@ -164,7 +164,7 @@ func (ms *MinionServer) launchProxyServer() {
 
 // executor doesn't support failover right now, the right thing to do is to fail completely since all
 // pods will be lost upon restart and we want mesos to recover the resources from them.
 func (ms *MinionServer) launchExecutorServer(containerID string) <-chan struct{} {
-	allArgs := os.Args[1:]
+	allArgs := os.Args[2:]
 	// filter out minion flags, leaving those for the executor
 	executorFlags := pflag.NewFlagSet("executor", pflag.ContinueOnError)

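The one-character fix above changes which arguments the minion forwards to the executor child process. A minimal sketch of the difference, under the assumption (not shown in the hunk) that the binary is invoked as a subcommand, e.g. km minion --some-flag, so os.Args[0] is the binary path and os.Args[1] the subcommand:

package main

import (
	"fmt"
	"os"
)

// skip returns args without its first n entries, tolerating short arg lists.
func skip(args []string, n int) []string {
	if len(args) <= n {
		return nil
	}
	return args[n:]
}

func main() {
	// Hypothetical invocation: km minion --some-flag
	// os.Args[0] is the binary path, os.Args[1] the subcommand.
	fmt.Println(skip(os.Args, 1)) // [minion --some-flag]: subcommand leaks through
	fmt.Println(skip(os.Args, 2)) // [--some-flag]: flags only
}

With os.Args[1:], the child would have seen the subcommand name as its first flag-like argument; os.Args[2:] hands it the flags alone.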
View File

@@ -20,7 +20,7 @@ import (
 	"fmt"
 	"time"
 
-	clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
+	unversionedcore "k8s.io/kubernetes/pkg/client/typed/generated/core/unversioned"
 
 	log "github.com/golang/glog"
 	"k8s.io/kubernetes/contrib/mesos/pkg/queue"
@@ -63,11 +63,11 @@ type LookupFunc func(hostName string) *api.Node
 
 type clientRegistrator struct {
 	lookupNode LookupFunc
-	client     *clientset.Clientset
+	client     unversionedcore.NodesGetter
 	queue      *queue.HistoricalFIFO
 }
 
-func NewRegistrator(client *clientset.Clientset, lookupNode LookupFunc) *clientRegistrator {
+func NewRegistrator(client unversionedcore.NodesGetter, lookupNode LookupFunc) *clientRegistrator {
 	return &clientRegistrator{
 		lookupNode: lookupNode,
 		client:     client,
@@ -80,6 +80,7 @@ func (r *clientRegistrator) Run(terminate <-chan struct{}) error {
 	RegistrationLoop:
 		for {
 			obj := r.queue.Pop(terminate)
+			log.V(3).Infof("registration event observed")
 			if obj == nil {
 				break RegistrationLoop
 			}
@@ -92,6 +93,7 @@ func (r *clientRegistrator) Run(terminate <-chan struct{}) error {
 			rg := obj.(*registration)
 			n, needsUpdate := r.updateNecessary(rg.hostName, rg.labels)
 			if !needsUpdate {
+				log.V(2).Infof("no update needed, skipping for %s: %v", rg.hostName, rg.labels)
 				continue
 			}
 

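The signature change above narrows the registrator's dependency from the concrete *clientset.Clientset to the unversionedcore.NodesGetter interface, which is what makes the fake-backed tests in the next file possible. A hedged, self-contained sketch of the idea; Node, NodeInterface, NodesGetter, and fakeNodes below are illustrative stand-ins, not the real Kubernetes definitions:

package main

import "fmt"

// Illustrative stand-ins for the real types.
type Node struct{ Name string }

type NodeInterface interface {
	Create(n *Node) (*Node, error)
}

// NodesGetter mirrors the shape of unversionedcore.NodesGetter: callers only
// need a way to reach the nodes sub-client, not the whole clientset.
type NodesGetter interface {
	Nodes() NodeInterface
}

// fakeNodes satisfies both interfaces, so a test can inject it directly.
type fakeNodes struct{ created []string }

func (f *fakeNodes) Nodes() NodeInterface { return f }

func (f *fakeNodes) Create(n *Node) (*Node, error) {
	f.created = append(f.created, n.Name)
	return n, nil
}

// register depends only on the narrow interface.
func register(c NodesGetter, name string) error {
	_, err := c.Nodes().Create(&Node{Name: name})
	return err
}

func main() {
	f := &fakeNodes{}
	_ = register(f, "foo")
	fmt.Println(f.created) // [foo]
}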
View File

@@ -0,0 +1,160 @@
+/*
+Copyright 2016 The Kubernetes Authors All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package node
+
+import (
+	"testing"
+
+	"k8s.io/kubernetes/pkg/api"
+	"k8s.io/kubernetes/pkg/api/errors"
+	"k8s.io/kubernetes/pkg/api/unversioned"
+	"k8s.io/kubernetes/pkg/client/testing/core"
+	unversionedcore "k8s.io/kubernetes/pkg/client/typed/generated/core/unversioned"
+	"k8s.io/kubernetes/pkg/client/typed/generated/core/unversioned/fake"
+	"k8s.io/kubernetes/pkg/runtime"
+)
+
+type fakeNodes struct {
+	*fake.FakeNodes
+}
+
+func (f *fakeNodes) Nodes() unversionedcore.NodeInterface {
+	return f
+}
+
+func calledOnce(h bool, ret runtime.Object, err error) (<-chan struct{}, func(core.Action) (bool, runtime.Object, error)) {
+	ch := make(chan struct{})
+	return ch, func(_ core.Action) (bool, runtime.Object, error) {
+		select {
+		case <-ch:
+			panic("called more than once")
+		default:
+			close(ch)
+		}
+		return h, ret, err
+	}
+}
+
+func TestRegister_withUnknownNode(t *testing.T) {
+	fc := &core.Fake{}
+	nodes := &fakeNodes{&fake.FakeNodes{&fake.FakeCore{fc}}}
+
+	createCalled, createOnce := calledOnce(true, nil, nil)
+	fc.AddReactor("create", "nodes", createOnce)
+
+	lookup := func(hostName string) *api.Node {
+		select {
+		case <-createCalled:
+			return &api.Node{ObjectMeta: api.ObjectMeta{Name: "foo"}}
+		default:
+			return nil
+		}
+	}
+
+	r := NewRegistrator(nodes, lookup)
+	ch := make(chan struct{})
+	defer close(ch)
+	r.Run(ch)
+
+	t.Logf("registering node foo")
+	ok, err := r.Register("foo", nil)
+	if !ok {
+		t.Fatalf("registration failed without error")
+	} else if err != nil {
+		t.Fatalf("registration failed with error %v", err)
+	}
+
+	// wait for node creation
+	t.Logf("awaiting node creation")
+	<-createCalled
+}
+
+func TestRegister_withKnownNode(t *testing.T) {
+	fc := &core.Fake{}
+	nodes := &fakeNodes{&fake.FakeNodes{&fake.FakeCore{fc}}}
+
+	updateCalled, updateOnce := calledOnce(true, nil, nil)
+	fc.AddReactor("update", "nodes", updateOnce)
+
+	lookup := func(hostName string) *api.Node {
+		select {
+		case <-updateCalled:
+			return &api.Node{ObjectMeta: api.ObjectMeta{Name: "foo"}}
+		default:
+			// this node needs an update because it has labels: the updated version doesn't
+			return &api.Node{ObjectMeta: api.ObjectMeta{Name: "foo", Labels: map[string]string{"a": "b"}}}
+		}
+	}
+
+	r := NewRegistrator(nodes, lookup)
+	ch := make(chan struct{})
+	defer close(ch)
+	r.Run(ch)
+
+	t.Logf("registering node foo")
+	ok, err := r.Register("foo", nil)
+	if !ok {
+		t.Fatalf("registration failed without error")
+	} else if err != nil {
+		t.Fatalf("registration failed with error %v", err)
+	}
+
+	// wait for node update
+	t.Logf("awaiting node update")
+	<-updateCalled
+}
+
+func TestRegister_withSemiKnownNode(t *testing.T) {
+	// semi-known because the lookup func doesn't see a very newly created node
+	// but our apiserver "create" call returns an already-exists error. in this case
+	// CreateOrUpdate should proceed to attempt an update.
+	fc := &core.Fake{}
+	nodes := &fakeNodes{&fake.FakeNodes{&fake.FakeCore{fc}}}
+
+	createCalled, createOnce := calledOnce(true, nil, errors.NewAlreadyExists(unversioned.GroupResource{"", ""}, "nodes"))
+	fc.AddReactor("create", "nodes", createOnce)
+
+	updateCalled, updateOnce := calledOnce(true, nil, nil)
+	fc.AddReactor("update", "nodes", updateOnce)
+
+	lookup := func(hostName string) *api.Node {
+		select {
+		case <-updateCalled:
+			return &api.Node{ObjectMeta: api.ObjectMeta{Name: "foo"}}
+		default:
+			// this makes the node semi-known: apiserver knows it but the store/cache doesn't
+			return nil
+		}
+	}
+
+	r := NewRegistrator(nodes, lookup)
+	ch := make(chan struct{})
+	defer close(ch)
+	r.Run(ch)
+
+	t.Logf("registering node foo")
+	ok, err := r.Register("foo", nil)
+	if !ok {
+		t.Fatalf("registration failed without error")
+	} else if err != nil {
+		t.Fatalf("registration failed with error %v", err)
+	}
+
+	// wait for node update
+	t.Logf("awaiting node update")
+	<-createCalled
+	<-updateCalled
+}

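The calledOnce helper above is doing double duty: the returned channel is closed on the first invocation, so tests can block on it as a completion signal, and a second invocation panics. A condensed sketch of the same pattern, using a hypothetical once wrapper:

package main

import "fmt"

// once wraps fn so that a second call panics; the returned channel is closed
// on the first call, letting a test block until the call has happened.
func once(fn func()) (<-chan struct{}, func()) {
	ch := make(chan struct{})
	return ch, func() {
		select {
		case <-ch:
			panic("called more than once")
		default:
			close(ch)
		}
		fn()
	}
}

func main() {
	done, f := once(func() { fmt.Println("ran") })
	f()
	<-done // a receive on the closed channel returns immediately
	fmt.Println("observed exactly one call")
}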
View File

@@ -18,7 +18,6 @@ package queue
 
 import (
 	"fmt"
-	"reflect"
 	"sync"
 	"time"
@@ -352,30 +351,22 @@ func (f *HistoricalFIFO) gc() {
 // Assumes that the caller has acquired the state lock.
 func (f *HistoricalFIFO) merge(id string, obj UniqueCopyable) (notifications []Entry) {
 	item, exists := f.items[id]
-	now := time.Now()
-	if !exists {
+	if !exists || item.Is(POP_EVENT|DELETE_EVENT) {
+		// no prior history for this UID, or else it was popped/removed by the client.
 		e := &entry{obj.Copy().(UniqueCopyable), ADD_EVENT}
 		f.items[id] = e
 		notifications = append(notifications, e)
+	} else if item.Value().GetUID() != obj.GetUID() {
+		// sanity check, please
+		panic(fmt.Sprintf("historical UID %q != current UID %v", item.Value().GetUID(), obj.GetUID()))
 	} else {
-		if !item.Is(DELETE_EVENT) && item.Value().GetUID() != obj.GetUID() {
-			// hidden DELETE!
-			// (1) append a DELETE
-			// (2) append an ADD
-			// .. and notify listeners in that order
-			ent := item.(*entry)
-			ent.event = DELETE_EVENT
-			e1 := &deletedEntry{ent, now.Add(f.lingerTTL)}
-			e2 := &entry{obj.Copy().(UniqueCopyable), ADD_EVENT}
-			f.items[id] = e2
-			notifications = append(notifications, e1, e2)
-		} else if !reflect.DeepEqual(obj, item.Value()) {
-			//TODO(jdef): it would be nice if we could rely on resource versions
-			//instead of doing a DeepEqual. Maybe someday we'll be able to.
-			e := &entry{obj.Copy().(UniqueCopyable), UPDATE_EVENT}
-			f.items[id] = e
-			notifications = append(notifications, e)
-		}
+		// exists && !(popped | deleted). so either the prior event was an add or an
+		// update. reflect.DeepEqual is expensive. it won't help us determine if
+		// we missed a hidden delete along the way.
+		e := &entry{obj.Copy().(UniqueCopyable), UPDATE_EVENT}
+		f.items[id] = e
+		notifications = append(notifications, e)
+		// else objects are the same, no work to do.
 	}
 	// check for garbage collection
 	f.gcc++

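Net effect of the rewritten merge: an item whose last event was a POP or DELETE is treated like an unknown item, so re-adding it emits a fresh ADD rather than an UPDATE, and the old hidden-DELETE/DeepEqual machinery goes away. A reduced sketch of just that decision; eventType, the constants, and nextEvent below are stand-ins for the real entry/Entry machinery:

package main

import "fmt"

type eventType int

const (
	ADD_EVENT eventType = 1 << iota
	UPDATE_EVENT
	POP_EVENT
	DELETE_EVENT
)

// is reports whether e matches any bit in mask, mirroring item.Is above.
func (e eventType) is(mask eventType) bool { return e&mask != 0 }

// nextEvent condenses the merge rule: no history, popped, or deleted means
// the object re-enters as an ADD; anything still live becomes an UPDATE.
func nextEvent(prior eventType, exists bool) eventType {
	if !exists || prior.is(POP_EVENT|DELETE_EVENT) {
		return ADD_EVENT
	}
	return UPDATE_EVENT
}

func main() {
	fmt.Println(nextEvent(0, false) == ADD_EVENT)              // unknown id -> ADD
	fmt.Println(nextEvent(POP_EVENT, true) == ADD_EVENT)       // re-add after pop -> ADD
	fmt.Println(nextEvent(UPDATE_EVENT, true) == UPDATE_EVENT) // live item -> UPDATE
}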
View File

@@ -119,6 +119,32 @@ func TestFIFO_addUpdate(t *testing.T) {
 	}
 }
 
+func TestFIFO_addDeleteAdd(t *testing.T) {
+	f := NewHistorical(nil)
+	testobj := &testObj{"foo", 10}
+	f.Add(testobj)
+	f.Delete(testobj)
+	f.Add(testobj)
+	_, exists, _ := f.GetByKey("foo")
+	if !exists {
+		t.Errorf("item did not get readded")
+	}
+}
+
+func TestFIFO_addPopAdd(t *testing.T) {
+	f := NewHistorical(nil)
+	testobj := &testObj{"foo", 10}
+	f.Add(testobj)
+	f.Pop(nil)
+	f.Add(testobj)
+	_, exists, _ := f.GetByKey("foo")
+	if !exists {
+		t.Errorf("item did not get readded")
+	}
+}
+
 func TestFIFO_addReplace(t *testing.T) {
 	f := NewHistorical(nil)
 	f.Add(&testObj{"foo", 10})

View File

@@ -760,10 +760,18 @@ func (s *SchedulerServer) bootstrap(hks hyperkube.Interface, sc *schedcfg.Config
 	nodeLW := cache.NewListWatchFromClient(nodesClient.CoreClient, "nodes", api.NamespaceAll, fields.Everything())
 	nodeStore, nodeCtl := controllerfw.NewInformer(nodeLW, &api.Node{}, s.nodeRelistPeriod, &controllerfw.ResourceEventHandlerFuncs{
 		DeleteFunc: func(obj interface{}) {
-			node := obj.(*api.Node)
 			if eiRegistry != nil {
-				log.V(2).Infof("deleting node %q from registry", node.Name)
-				eiRegistry.Invalidate(node.Name)
+				// TODO(jdef) use controllerfw.DeletionHandlingMetaNamespaceKeyFunc at some point?
+				nodeName := ""
+				if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
+					nodeName = tombstone.Key
+				} else if node, ok := obj.(*api.Node); ok {
+					nodeName = node.Name
+				}
+				if nodeName != "" {
+					log.V(2).Infof("deleting node %q from registry", nodeName)
+					eiRegistry.Invalidate(nodeName)
+				}
 			}
 		},
 	})
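The new DeleteFunc tolerates tombstones: when an informer's watch misses an object's final state, the handler receives a cache.DeletedFinalStateUnknown wrapper whose Key carries the name, instead of a *api.Node. A self-contained sketch of the same type switch, with local stand-ins for the cache types:

package main

import "fmt"

// Local stand-ins; the real DeletedFinalStateUnknown lives in pkg/client/cache.
type Node struct{ Name string }

type DeletedFinalStateUnknown struct {
	Key string
	Obj interface{}
}

// nodeNameFromDelete mirrors the handler above: a delete callback may see the
// object itself or a tombstone when the watch missed the final state.
func nodeNameFromDelete(obj interface{}) string {
	if tombstone, ok := obj.(DeletedFinalStateUnknown); ok {
		return tombstone.Key
	}
	if node, ok := obj.(*Node); ok {
		return node.Name
	}
	return "" // unknown shape: nothing to invalidate
}

func main() {
	fmt.Println(nodeNameFromDelete(&Node{Name: "node-1"}))                   // node-1
	fmt.Println(nodeNameFromDelete(DeletedFinalStateUnknown{Key: "node-2"})) // node-2
	fmt.Println(nodeNameFromDelete(42) == "")                                // true
}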