From 6a2602a51bf3f52298f0644769d63832b9d95615 Mon Sep 17 00:00:00 2001 From: "Dr. Stefan Schimanski" Date: Tue, 20 Oct 2015 14:39:19 +0100 Subject: [PATCH 1/4] Make cloud provider return disappearance of slave to the node controller - implement ExternalID in Mesos cloud provider. This is used by the node controller to detect disappeared nodes. - add test case for ExternalID --- pkg/cloudprovider/providers/mesos/client.go | 10 ++++---- pkg/cloudprovider/providers/mesos/mesos.go | 16 +++++++++++- .../providers/mesos/mesos_test.go | 25 +++++++++++++++++++ 3 files changed, 45 insertions(+), 6 deletions(-) diff --git a/pkg/cloudprovider/providers/mesos/client.go b/pkg/cloudprovider/providers/mesos/client.go index c9f1ac222f..da7d5e9855 100644 --- a/pkg/cloudprovider/providers/mesos/client.go +++ b/pkg/cloudprovider/providers/mesos/client.go @@ -55,7 +55,7 @@ type slaveNode struct { type mesosState struct { clusterName string - nodes []*slaveNode + nodes map[string]*slaveNode } type stateCache struct { @@ -94,7 +94,7 @@ func (c *stateCache) clusterName(ctx context.Context) (string, error) { } // nodes returns the cached list of slave nodes. -func (c *stateCache) nodes(ctx context.Context) ([]*slaveNode, error) { +func (c *stateCache) nodes(ctx context.Context) (map[string]*slaveNode, error) { cached, err := c.cachedState(ctx) return cached.nodes, err } @@ -162,7 +162,7 @@ func unpackIPv4(ip uint32) string { // listSlaves returns a (possibly cached) list of slave nodes. // Callers must not mutate the contents of the returned slice. 
-func (c *mesosClient) listSlaves(ctx context.Context) ([]*slaveNode, error) { +func (c *mesosClient) listSlaves(ctx context.Context) (map[string]*slaveNode, error) { return c.state.nodes(ctx) } @@ -230,7 +230,7 @@ func parseMesosState(blob []byte) (*mesosState, error) { if err := json.Unmarshal(blob, state); err != nil { return nil, err } - nodes := []*slaveNode{} + nodes := map[string]*slaveNode{} for _, slave := range state.Slaves { if slave.Hostname == "" { continue @@ -264,7 +264,7 @@ func parseMesosState(blob []byte) (*mesosState, error) { } log.V(4).Infof("node %q reporting capacity %v", node.hostname, cap) } - nodes = append(nodes, node) + nodes[node.hostname] = node } result := &mesosState{ diff --git a/pkg/cloudprovider/providers/mesos/mesos.go b/pkg/cloudprovider/providers/mesos/mesos.go index fb1404e310..e376f1a0a1 100644 --- a/pkg/cloudprovider/providers/mesos/mesos.go +++ b/pkg/cloudprovider/providers/mesos/mesos.go @@ -182,7 +182,21 @@ func ipAddress(name string) (net.IP, error) { // ExternalID returns the cloud provider ID of the specified instance (deprecated). func (c *MesosCloud) ExternalID(instance string) (string, error) { - ip, err := ipAddress(instance) + //TODO(jdef) use a timeout here? 15s? 
+ ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() + + nodes, err := c.client.listSlaves(ctx) + if err != nil { + return "", err + } + + node := nodes[instance] + if node == nil { + return "", cloudprovider.InstanceNotFound + } + + ip, err := ipAddress(node.hostname) if err != nil { return "", err } diff --git a/pkg/cloudprovider/providers/mesos/mesos_test.go b/pkg/cloudprovider/providers/mesos/mesos_test.go index 748cbd25ad..b7f909bf1e 100644 --- a/pkg/cloudprovider/providers/mesos/mesos_test.go +++ b/pkg/cloudprovider/providers/mesos/mesos_test.go @@ -24,6 +24,7 @@ import ( "time" log "github.com/golang/glog" + "k8s.io/kubernetes/pkg/cloudprovider" ) func TestIPAddress(t *testing.T) { @@ -252,3 +253,27 @@ func Test_List(t *testing.T) { t.Fatalf("List with a reject-all filter should return a list of size 0: (actual: %#v)", clusters) } } + +func Test_ExternalID(t *testing.T) { + defer log.Flush() + md := FakeMasterDetector{} + httpServer, httpClient, httpTransport := makeHttpMocks() + defer httpServer.Close() + cacheTTL := 500 * time.Millisecond + mesosClient, err := createMesosClient(md, httpClient, httpTransport, cacheTTL) + mesosCloud := &MesosCloud{client: mesosClient, config: createDefaultConfig()} + + _, err = mesosCloud.ExternalID("unknown") + if err != cloudprovider.InstanceNotFound { + t.Fatalf("ExternalID did not return InstanceNotFound on an unknown instance") + } + + slaveName := "mesos3.internal.company.com" + id, err := mesosCloud.ExternalID(slaveName) + if id != "" { + t.Fatalf("ExternalID should not be able to resolve %q", slaveName) + } + if err == cloudprovider.InstanceNotFound { + t.Fatalf("ExternalID should find %q", slaveName) + } +} From bc7523a775440a6294c48758e1f6e324496c5afa Mon Sep 17 00:00:00 2001 From: "Dr. 
Stefan Schimanski" Date: Tue, 20 Oct 2015 20:04:09 +0100 Subject: [PATCH 2/4] Make controller manager node monitor constant public --- cmd/kube-controller-manager/app/controllermanager.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index 24e56a3991..3b3bf145de 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -124,6 +124,9 @@ func NewCMServer() *CMServer { MinResyncPeriod: 12 * time.Hour, RegisterRetryCount: 10, PodEvictionTimeout: 5 * time.Minute, + NodeMonitorGracePeriod: 40 * time.Second, + NodeStartupGracePeriod: 60 * time.Second, + NodeMonitorPeriod: 5 * time.Second, ClusterName: "kubernetes", TerminatedPodGCThreshold: 12500, VolumeConfigFlags: VolumeConfigFlags{ @@ -204,13 +207,13 @@ func (s *CMServer) AddFlags(fs *pflag.FlagSet) { fs.IntVar(&s.RegisterRetryCount, "register-retry-count", s.RegisterRetryCount, ""+ "The number of retries for initial node registration. Retry interval equals node-sync-period.") fs.MarkDeprecated("register-retry-count", "This flag is currently no-op and will be deleted.") - fs.DurationVar(&s.NodeMonitorGracePeriod, "node-monitor-grace-period", 40*time.Second, + fs.DurationVar(&s.NodeMonitorGracePeriod, "node-monitor-grace-period", s.NodeMonitorGracePeriod, "Amount of time which we allow running Node to be unresponsive before marking it unhealty. 
"+ "Must be N times more than kubelet's nodeStatusUpdateFrequency, "+ "where N means number of retries allowed for kubelet to post node status.") - fs.DurationVar(&s.NodeStartupGracePeriod, "node-startup-grace-period", 60*time.Second, + fs.DurationVar(&s.NodeStartupGracePeriod, "node-startup-grace-period", s.NodeStartupGracePeriod, "Amount of time which we allow starting Node to be unresponsive before marking it unhealty.") - fs.DurationVar(&s.NodeMonitorPeriod, "node-monitor-period", 5*time.Second, + fs.DurationVar(&s.NodeMonitorPeriod, "node-monitor-period", s.NodeMonitorPeriod, "The period for syncing NodeStatus in NodeController.") fs.StringVar(&s.ServiceAccountKeyFile, "service-account-private-key-file", s.ServiceAccountKeyFile, "Filename containing a PEM-encoded private RSA key used to sign service account tokens.") fs.BoolVar(&s.EnableProfiling, "profiling", true, "Enable profiling via web interface host:port/debug/pprof/") From 0c1d90bf5f23ff2c8b070e8488659f006381d3ae Mon Sep 17 00:00:00 2001 From: "Dr. 
Stefan Schimanski" Date: Thu, 5 Nov 2015 12:29:39 +0100 Subject: [PATCH 3/4] Add ListWithoutKubelet to mesos cloud provider --- pkg/cloudprovider/providers/mesos/client.go | 38 ++++++++++++++++--- pkg/cloudprovider/providers/mesos/mesos.go | 42 ++++++++++++++++++--- 2 files changed, 70 insertions(+), 10 deletions(-) diff --git a/pkg/cloudprovider/providers/mesos/client.go b/pkg/cloudprovider/providers/mesos/client.go index da7d5e9855..70d8ada92c 100644 --- a/pkg/cloudprovider/providers/mesos/client.go +++ b/pkg/cloudprovider/providers/mesos/client.go @@ -49,13 +49,14 @@ type mesosClient struct { } type slaveNode struct { - hostname string - resources *api.NodeResources + hostname string + kubeletRunning bool + resources *api.NodeResources } type mesosState struct { clusterName string - nodes map[string]*slaveNode + nodes map[string]*slaveNode // by hostname } type stateCache struct { @@ -160,7 +161,7 @@ func unpackIPv4(ip uint32) string { return ipv4.String() } -// listSlaves returns a (possibly cached) list of slave nodes. +// listSlaves returns a (possibly cached) map of slave nodes by hostname. // Callers must not mutate the contents of the returned slice. 
func (c *mesosClient) listSlaves(ctx context.Context) (map[string]*slaveNode, error) { return c.state.nodes(ctx) @@ -225,12 +226,36 @@ func parseMesosState(blob []byte) (*mesosState, error) { Hostname string `json:"hostname"` // ex: 10.22.211.18, or slave-123.nowhere.com Resources map[string]interface{} `json:"resources"` // ex: {"mem": 123, "ports": "[31000-3200]"} } `json:"slaves"` + Frameworks []*struct { + Id string `json:"id"` // ex: 20151105-093752-3745622208-5050-1-0000 + Pid string `json:"pid"` // ex: scheduler(1)@192.168.65.228:57124 + Executors []*struct { + SlaveId string `json:"slave_id"` // ex: 20151105-093752-3745622208-5050-1-S1 + ExecutorId string `json:"executor_id"` // ex: 6704d375c68fee1e_k8sm-executor + Name string `json:"name"` // ex: Kubelet-Executor + } `json:"executors"` + } `json:"frameworks"` } + state := &State{ClusterName: defaultClusterName} if err := json.Unmarshal(blob, state); err != nil { return nil, err } - nodes := map[string]*slaveNode{} + + executorSlaveIds := map[string]struct{}{} + for _, f := range state.Frameworks { + for _, e := range f.Executors { + // Note that this simple comparison breaks when we support more than one + // k8s instance in a cluster. At the moment this is not possible for + // a number of reasons. 
+ // TODO(sttts): find way to detect executors of this k8s instance + if e.Name == "Kubelet-Executor" { + executorSlaveIds[e.SlaveId] = struct{}{} + } + } + } + + nodes := map[string]*slaveNode{} // by hostname for _, slave := range state.Slaves { if slave.Hostname == "" { continue @@ -264,6 +289,9 @@ func parseMesosState(blob []byte) (*mesosState, error) { } log.V(4).Infof("node %q reporting capacity %v", node.hostname, cap) } + if _, ok := executorSlaveIds[slave.Id]; ok { + node.kubeletRunning = true + } nodes[node.hostname] = node } diff --git a/pkg/cloudprovider/providers/mesos/mesos.go b/pkg/cloudprovider/providers/mesos/mesos.go index e376f1a0a1..538c3de5a1 100644 --- a/pkg/cloudprovider/providers/mesos/mesos.go +++ b/pkg/cloudprovider/providers/mesos/mesos.go @@ -30,8 +30,16 @@ import ( "k8s.io/kubernetes/pkg/cloudprovider" ) +const ( + ProviderName = "mesos" + + // KubernetesExecutorName is shared between contrib/mesos and Mesos cloud provider. + // Because cloud provider -> contrib dependencies are forbidden, this constant + // is defined here, not in contrib. + KubernetesExecutorName = "Kubelet-Executor" +) + var ( - ProviderName = "mesos" CloudProvider *MesosCloud noHostNameSpecified = errors.New("No hostname specified") @@ -208,9 +216,7 @@ func (c *MesosCloud) InstanceID(name string) (string, error) { return "", nil } -// List lists instances that match 'filter' which is a regular expression -// which must match the entire instance name (fqdn). -func (c *MesosCloud) List(filter string) ([]string, error) { +func (c *MesosCloud) listNodes() (map[string]*slaveNode, error) { //TODO(jdef) use a timeout here? 15s? 
ctx, cancel := context.WithCancel(context.TODO()) defer cancel() @@ -223,6 +229,16 @@ func (c *MesosCloud) List(filter string) ([]string, error) { log.V(2).Info("no slaves found, are any running?") return nil, nil } + return nodes, nil +} + +// List lists instances that match 'filter' which is a regular expression +// which must match the entire instance name (fqdn). +func (c *MesosCloud) List(filter string) ([]string, error) { + nodes, err := c.listNodes() + if err != nil { + return nil, err + } filterRegex, err := regexp.Compile(filter) if err != nil { return nil, err @@ -233,7 +249,23 @@ func (c *MesosCloud) List(filter string) ([]string, error) { addr = append(addr, node.hostname) } } - return addr, err + return addr, nil +} + +// ListWithoutKubelet lists those instances which have no running kubelet, i.e. the +// Kubernetes executor. +func (c *MesosCloud) ListWithoutKubelet() ([]string, error) { + nodes, err := c.listNodes() + if err != nil { + return nil, err + } + addr := make([]string, 0, len(nodes)) + for _, n := range nodes { + if !n.kubeletRunning { + addr = append(addr, n.hostname) + } + } + return addr, nil } // NodeAddresses returns the addresses of the specified instance. From 2f45d5706b15ccc0b5681128170fa0e1ff92b55f Mon Sep 17 00:00:00 2001 From: "Dr. 
Stefan Schimanski" Date: Thu, 22 Oct 2015 14:48:49 -0700 Subject: [PATCH 4/4] Add node status update controller --- .../controllermanager/controllermanager.go | 6 + contrib/mesos/pkg/executor/config/config.go | 1 - contrib/mesos/pkg/node/statusupdater.go | 195 ++++++++++++++++++ contrib/mesos/pkg/node/statusupdater_test.go | 77 +++++++ .../mesos/pkg/scheduler/service/service.go | 3 +- pkg/cloudprovider/providers/mesos/client.go | 2 +- 6 files changed, 281 insertions(+), 3 deletions(-) create mode 100644 contrib/mesos/pkg/node/statusupdater.go create mode 100644 contrib/mesos/pkg/node/statusupdater_test.go diff --git a/contrib/mesos/pkg/controllermanager/controllermanager.go b/contrib/mesos/pkg/controllermanager/controllermanager.go index 055d6876e8..6c3e6a3918 100644 --- a/contrib/mesos/pkg/controllermanager/controllermanager.go +++ b/contrib/mesos/pkg/controllermanager/controllermanager.go @@ -26,6 +26,7 @@ import ( "time" "k8s.io/kubernetes/cmd/kube-controller-manager/app" + "k8s.io/kubernetes/contrib/mesos/pkg/node" "k8s.io/kubernetes/pkg/api/unversioned" client "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/client/unversioned/clientcmd" @@ -140,6 +141,11 @@ func (s *CMServer) Run(_ []string) error { s.NodeMonitorGracePeriod, s.NodeStartupGracePeriod, s.NodeMonitorPeriod, (*net.IPNet)(&s.ClusterCIDR), s.AllocateNodeCIDRs) nodeController.Run(s.NodeSyncPeriod) + nodeStatusUpdaterController := node.NewStatusUpdater(kubeClient, s.NodeMonitorPeriod, time.Now) + if err := nodeStatusUpdaterController.Run(util.NeverStop); err != nil { + glog.Fatalf("Failed to start node status update controller: %v", err) + } + serviceController := servicecontroller.New(cloud, kubeClient, s.ClusterName) if err := serviceController.Run(s.ServiceSyncPeriod, s.NodeSyncPeriod); err != nil { glog.Errorf("Failed to start service controller: %v", err) diff --git a/contrib/mesos/pkg/executor/config/config.go b/contrib/mesos/pkg/executor/config/config.go index 
e61ce500f2..55af9fe97f 100644 --- a/contrib/mesos/pkg/executor/config/config.go +++ b/contrib/mesos/pkg/executor/config/config.go @@ -24,7 +24,6 @@ import ( const ( DefaultInfoID = "k8sm-executor" DefaultInfoSource = "kubernetes" - DefaultInfoName = "Kubelet-Executor" DefaultSuicideTimeout = 20 * time.Minute DefaultLaunchGracePeriod = 5 * time.Minute ) diff --git a/contrib/mesos/pkg/node/statusupdater.go b/contrib/mesos/pkg/node/statusupdater.go new file mode 100644 index 0000000000..c9d8172473 --- /dev/null +++ b/contrib/mesos/pkg/node/statusupdater.go @@ -0,0 +1,195 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package node + +import ( + "fmt" + "time" + + log "github.com/golang/glog" + kubelet "k8s.io/kubernetes/cmd/kubelet/app" + "k8s.io/kubernetes/contrib/mesos/pkg/runtime" + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/errors" + "k8s.io/kubernetes/pkg/api/unversioned" + "k8s.io/kubernetes/pkg/client/cache" + client "k8s.io/kubernetes/pkg/client/unversioned" + "k8s.io/kubernetes/pkg/cloudprovider/providers/mesos" + "k8s.io/kubernetes/pkg/fields" +) + +const ( + nodeStatusUpdateRetry = 5 + slaveReadyReason = "SlaveReady" + slaveReadyMessage = "mesos reports ready status" +) + +type StatusUpdater struct { + client *client.Client + relistPeriod time.Duration + heartBeatPeriod time.Duration + nowFunc func() time.Time +} + +func NewStatusUpdater(client *client.Client, relistPeriod time.Duration, nowFunc func() time.Time) *StatusUpdater { + kubecfg := kubelet.NewKubeletServer() // only create to get the config, this is without side-effects + return &StatusUpdater{ + client: client, + relistPeriod: relistPeriod, + heartBeatPeriod: kubecfg.NodeStatusUpdateFrequency, + nowFunc: nowFunc, + } +} + +func (u *StatusUpdater) Run(terminate <-chan struct{}) error { + nodeStore := cache.NewStore(cache.MetaNamespaceKeyFunc) + nodeLW := cache.NewListWatchFromClient(u.client, "nodes", api.NamespaceAll, fields.Everything()) + cache.NewReflector(nodeLW, &api.Node{}, nodeStore, u.relistPeriod).Run() + + monitor := func() { + // build up a slave set of nodes without kubelet + slavesWithoutKubeletList, err := mesos.CloudProvider.ListWithoutKubelet() + if err != nil { + log.Errorf("Error while updating slave nodes: %v", err) + return + } + slavesWithoutKubelet := make(map[string]struct{}, len(slavesWithoutKubeletList)) + for _, s := range slavesWithoutKubeletList { + slavesWithoutKubelet[s] = struct{}{} + } + + // update status for nodes which do not have a kubelet running and + // which are still existing as slave. 
This status update must be done + // before the node controller counts down the NodeMonitorGracePeriod + obj, err := nodeLW.List() + if err != nil { + log.Errorf("Error listing the nodes for status updates: %v", err) + } + nl, _ := obj.(*api.NodeList) + nodes := nl.Items + + for i := range nodes { + if _, ok := slavesWithoutKubelet[nodes[i].Spec.ExternalID]; !ok { + // let the kubelet do its job updating the status, or the + // node controller will remove this node if the node does not even + // exist anymore + continue + } + + err := u.updateStatus(&nodes[i]) + if err != nil { + log.Errorf("Error updating node status: %v", err) + } + } + } + go runtime.Until(monitor, u.heartBeatPeriod, terminate) + + return nil +} + +func (u *StatusUpdater) updateStatus(n *api.Node) error { + for i := 0; i < nodeStatusUpdateRetry; i++ { + if err := u.tryUpdateStatus(n); err != nil && !errors.IsConflict(err) { + log.Errorf("Error updating node status, will retry: %v", err) + } else { + return nil + } + } + return fmt.Errorf("Update node status exceeds retry count") +} + +// nodeWithUpdatedStatus clones the given node and updates the NodeReady condition. +// The updated node is returned along with a boolean indicating whether the node was changed +// at all. +func (u *StatusUpdater) nodeWithUpdatedStatus(n *api.Node) (*api.Node, bool, error) { + readyCondition := getCondition(&n.Status, api.NodeReady) + currentTime := unversioned.NewTime(u.nowFunc()) + + // avoid flapping by waiting at least twice the kubelet update frequency, i.e. + // give the kubelet the chance twice to update the heartbeat. This is necessary + // because we only poll the Mesos master state.json once in a while and we + // know that the information from there can easily be outdated. 
+ gracePeriod := u.heartBeatPeriod * 2 + if readyCondition != nil && !currentTime.After(readyCondition.LastHeartbeatTime.Add(gracePeriod)) { + return n, false, nil + } + + clone, err := api.Scheme.DeepCopy(n) + if err != nil { + return nil, false, err + } + n = clone.(*api.Node) + + newNodeReadyCondition := api.NodeCondition{ + Type: api.NodeReady, + Status: api.ConditionTrue, + Reason: slaveReadyReason, + Message: slaveReadyMessage, + LastHeartbeatTime: currentTime, + } + + found := false + for i := range n.Status.Conditions { + c := &n.Status.Conditions[i] + if c.Type == api.NodeReady { + if c.Status == newNodeReadyCondition.Status { + newNodeReadyCondition.LastTransitionTime = c.LastTransitionTime + } else { + newNodeReadyCondition.LastTransitionTime = currentTime + } + n.Status.Conditions[i] = newNodeReadyCondition + found = true + break + } + } + + if !found { + newNodeReadyCondition.LastTransitionTime = currentTime + n.Status.Conditions = append(n.Status.Conditions, newNodeReadyCondition) + } + + return n, true, nil +} + +// tryUpdateStatus updates the status of the given node and tries to persist that +// on the apiserver +func (u *StatusUpdater) tryUpdateStatus(n *api.Node) error { + n, updated, err := u.nodeWithUpdatedStatus(n) + if err != nil { + return err + } + if !updated { + return nil + } + + _, err = u.client.Nodes().UpdateStatus(n) + return err +} + +// getCondition returns a condition object for the specific condition +// type, nil if the condition is not set. 
+func getCondition(status *api.NodeStatus, conditionType api.NodeConditionType) *api.NodeCondition { + if status == nil { + return nil + } + for i := range status.Conditions { + if status.Conditions[i].Type == conditionType { + return &status.Conditions[i] + } + } + return nil +} diff --git a/contrib/mesos/pkg/node/statusupdater_test.go b/contrib/mesos/pkg/node/statusupdater_test.go new file mode 100644 index 0000000000..ea3b831802 --- /dev/null +++ b/contrib/mesos/pkg/node/statusupdater_test.go @@ -0,0 +1,77 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package node + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "k8s.io/kubernetes/cmd/kube-controller-manager/app" + kubelet "k8s.io/kubernetes/cmd/kubelet/app" + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/unversioned" +) + +func Test_nodeWithUpdatedStatus(t *testing.T) { + now := time.Now() + testNode := func(d time.Duration, s api.ConditionStatus, r string) *api.Node { + return &api.Node{ + Status: api.NodeStatus{ + Conditions: []api.NodeCondition{{ + Type: api.NodeOutOfDisk, + }, { + Type: api.NodeReady, + Status: s, + Reason: r, + Message: "some message we don't care about here", + LastTransitionTime: unversioned.Time{now.Add(-time.Minute)}, + LastHeartbeatTime: unversioned.Time{now.Add(d)}, + }}, + }, + } + } + + cm := app.NewCMServer() + kubecfg := kubelet.NewKubeletServer() + assert.True(t, kubecfg.NodeStatusUpdateFrequency*3 < cm.NodeMonitorGracePeriod) // sanity check for defaults + + n := testNode(0, api.ConditionTrue, "KubeletReady") + su := NewStatusUpdater(nil, cm.NodeMonitorPeriod, func() time.Time { return now }) + _, updated, err := su.nodeWithUpdatedStatus(n) + assert.NoError(t, err) + assert.False(t, updated, "no update expected b/c kubelet updated heartbeat just now") + + n = testNode(-cm.NodeMonitorGracePeriod, api.ConditionTrue, "KubeletReady") + n2, updated, err := su.nodeWithUpdatedStatus(n) + assert.NoError(t, err) + assert.True(t, updated, "update expected b/c kubelet's update is older than DefaultNodeMonitorGracePeriod") + assert.Equal(t, getCondition(&n2.Status, api.NodeReady).Reason, slaveReadyReason) + assert.Equal(t, getCondition(&n2.Status, api.NodeReady).Message, slaveReadyMessage) + + n = testNode(-kubecfg.NodeStatusUpdateFrequency, api.ConditionTrue, "KubeletReady") + n2, updated, err = su.nodeWithUpdatedStatus(n) + assert.NoError(t, err) + assert.False(t, updated, "no update expected b/c kubelet's update was missed only once") + + n = testNode(-kubecfg.NodeStatusUpdateFrequency*3, 
api.ConditionTrue, "KubeletReady") + n2, updated, err = su.nodeWithUpdatedStatus(n) + assert.NoError(t, err) + assert.True(t, updated, "update expected b/c kubelet's update is older than 3*DefaultNodeStatusUpdateFrequency") + assert.Equal(t, getCondition(&n2.Status, api.NodeReady).Reason, slaveReadyReason) + assert.Equal(t, getCondition(&n2.Status, api.NodeReady).Message, slaveReadyMessage) +} diff --git a/contrib/mesos/pkg/scheduler/service/service.go b/contrib/mesos/pkg/scheduler/service/service.go index 244f7e6a91..6e08417dab 100644 --- a/contrib/mesos/pkg/scheduler/service/service.go +++ b/contrib/mesos/pkg/scheduler/service/service.go @@ -70,6 +70,7 @@ import ( "k8s.io/kubernetes/pkg/client/record" client "k8s.io/kubernetes/pkg/client/unversioned" clientauth "k8s.io/kubernetes/pkg/client/unversioned/auth" + cloud "k8s.io/kubernetes/pkg/cloudprovider/providers/mesos" "k8s.io/kubernetes/pkg/fields" "k8s.io/kubernetes/pkg/healthz" "k8s.io/kubernetes/pkg/master/ports" @@ -426,7 +427,7 @@ func (s *SchedulerServer) prepareExecutorInfo(hks hyperkube.Interface) (*mesos.E // Create mesos scheduler driver. execInfo := &mesos.ExecutorInfo{ Command: ci, - Name: proto.String(execcfg.DefaultInfoName), + Name: proto.String(cloud.KubernetesExecutorName), Source: proto.String(execcfg.DefaultInfoSource), } diff --git a/pkg/cloudprovider/providers/mesos/client.go b/pkg/cloudprovider/providers/mesos/client.go index 70d8ada92c..26426ef556 100644 --- a/pkg/cloudprovider/providers/mesos/client.go +++ b/pkg/cloudprovider/providers/mesos/client.go @@ -249,7 +249,7 @@ func parseMesosState(blob []byte) (*mesosState, error) { // k8s instance in a cluster. At the moment this is not possible for // a number of reasons. // TODO(sttts): find way to detect executors of this k8s instance - if e.Name == "Kubelet-Executor" { + if e.Name == KubernetesExecutorName { executorSlaveIds[e.SlaveId] = struct{}{} } }