Merge pull request #16034 from mesosphere/sttts-node-lifecycle

Auto commit by PR queue bot
k8s-merge-robot 2015-11-16 07:27:25 -08:00
commit 6d8296a312
9 changed files with 399 additions and 19 deletions


@@ -124,6 +124,9 @@ func NewCMServer() *CMServer {
MinResyncPeriod: 12 * time.Hour,
RegisterRetryCount: 10,
PodEvictionTimeout: 5 * time.Minute,
NodeMonitorGracePeriod: 40 * time.Second,
NodeStartupGracePeriod: 60 * time.Second,
NodeMonitorPeriod: 5 * time.Second,
ClusterName: "kubernetes",
TerminatedPodGCThreshold: 12500,
VolumeConfigFlags: VolumeConfigFlags{
@@ -204,13 +207,13 @@ func (s *CMServer) AddFlags(fs *pflag.FlagSet) {
fs.IntVar(&s.RegisterRetryCount, "register-retry-count", s.RegisterRetryCount, ""+
"The number of retries for initial node registration. Retry interval equals node-sync-period.")
fs.MarkDeprecated("register-retry-count", "This flag is currently no-op and will be deleted.")
fs.DurationVar(&s.NodeMonitorGracePeriod, "node-monitor-grace-period", 40*time.Second,
fs.DurationVar(&s.NodeMonitorGracePeriod, "node-monitor-grace-period", s.NodeMonitorGracePeriod,
"Amount of time which we allow running Node to be unresponsive before marking it unhealty. "+
"Must be N times more than kubelet's nodeStatusUpdateFrequency, "+
"where N means number of retries allowed for kubelet to post node status.")
fs.DurationVar(&s.NodeStartupGracePeriod, "node-startup-grace-period", 60*time.Second,
fs.DurationVar(&s.NodeStartupGracePeriod, "node-startup-grace-period", s.NodeStartupGracePeriod,
"Amount of time which we allow starting Node to be unresponsive before marking it unhealty.")
fs.DurationVar(&s.NodeMonitorPeriod, "node-monitor-period", 5*time.Second,
fs.DurationVar(&s.NodeMonitorPeriod, "node-monitor-period", s.NodeMonitorPeriod,
"The period for syncing NodeStatus in NodeController.")
fs.StringVar(&s.ServiceAccountKeyFile, "service-account-private-key-file", s.ServiceAccountKeyFile, "Filename containing a PEM-encoded private RSA key used to sign service account tokens.")
fs.BoolVar(&s.EnableProfiling, "profiling", true, "Enable profiling via web interface host:port/debug/pprof/")


@@ -26,6 +26,7 @@ import (
"time"
"k8s.io/kubernetes/cmd/kube-controller-manager/app"
"k8s.io/kubernetes/contrib/mesos/pkg/node"
"k8s.io/kubernetes/pkg/api/unversioned"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/client/unversioned/clientcmd"
@@ -140,6 +141,11 @@ func (s *CMServer) Run(_ []string) error {
s.NodeMonitorGracePeriod, s.NodeStartupGracePeriod, s.NodeMonitorPeriod, (*net.IPNet)(&s.ClusterCIDR), s.AllocateNodeCIDRs)
nodeController.Run(s.NodeSyncPeriod)
nodeStatusUpdaterController := node.NewStatusUpdater(kubeClient, s.NodeMonitorPeriod, time.Now)
if err := nodeStatusUpdaterController.Run(util.NeverStop); err != nil {
glog.Fatalf("Failed to start node status update controller: %v", err)
}
serviceController := servicecontroller.New(cloud, kubeClient, s.ClusterName)
if err := serviceController.Run(s.ServiceSyncPeriod, s.NodeSyncPeriod); err != nil {
glog.Errorf("Failed to start service controller: %v", err)
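
util.NeverStop, passed to Run above, is simply a channel that is never closed, so the monitor loop that Run starts via runtime.Until runs for the life of the process. A simplified, self-contained re-implementation of that pattern for illustration (until and NeverStop below are toy stand-ins, not the real k8s.io/kubernetes util code):

package main

import (
	"fmt"
	"time"
)

// NeverStop mimics util.NeverStop: a receive-only channel that is never closed.
var NeverStop <-chan struct{} = make(chan struct{})

// until calls f every period until stop is closed (simplified runtime.Until).
func until(f func(), period time.Duration, stop <-chan struct{}) {
	for {
		select {
		case <-stop:
			return
		default:
		}
		f()
		select {
		case <-stop:
			return
		case <-time.After(period):
		}
	}
}

func main() {
	stop := make(chan struct{})
	go until(func() { fmt.Println("monitor tick") }, 10*time.Millisecond, stop)
	time.Sleep(35 * time.Millisecond)
	close(stop) // with NeverStop instead of stop, the loop would never exit
	time.Sleep(10 * time.Millisecond)
}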


@@ -24,7 +24,6 @@ import (
const (
DefaultInfoID = "k8sm-executor"
DefaultInfoSource = "kubernetes"
DefaultInfoName = "Kubelet-Executor"
DefaultSuicideTimeout = 20 * time.Minute
DefaultLaunchGracePeriod = 5 * time.Minute
)


@@ -0,0 +1,195 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package node
import (
"fmt"
"time"
log "github.com/golang/glog"
kubelet "k8s.io/kubernetes/cmd/kubelet/app"
"k8s.io/kubernetes/contrib/mesos/pkg/runtime"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/client/cache"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/cloudprovider/providers/mesos"
"k8s.io/kubernetes/pkg/fields"
)
const (
nodeStatusUpdateRetry = 5
slaveReadyReason = "SlaveReady"
slaveReadyMessage = "mesos reports ready status"
)
type StatusUpdater struct {
client *client.Client
relistPeriod time.Duration
heartBeatPeriod time.Duration
nowFunc func() time.Time
}
func NewStatusUpdater(client *client.Client, relistPeriod time.Duration, nowFunc func() time.Time) *StatusUpdater {
kubecfg := kubelet.NewKubeletServer() // created only to read the default config; this has no side effects
return &StatusUpdater{
client: client,
relistPeriod: relistPeriod,
heartBeatPeriod: kubecfg.NodeStatusUpdateFrequency,
nowFunc: nowFunc,
}
}
func (u *StatusUpdater) Run(terminate <-chan struct{}) error {
nodeStore := cache.NewStore(cache.MetaNamespaceKeyFunc)
nodeLW := cache.NewListWatchFromClient(u.client, "nodes", api.NamespaceAll, fields.Everything())
cache.NewReflector(nodeLW, &api.Node{}, nodeStore, u.relistPeriod).Run()
monitor := func() {
// build up the set of slave nodes that have no kubelet running
slavesWithoutKubeletList, err := mesos.CloudProvider.ListWithoutKubelet()
if err != nil {
log.Errorf("Error while updating slave nodes: %v", err)
return
}
slavesWithoutKubelet := make(map[string]struct{}, len(slavesWithoutKubeletList))
for _, s := range slavesWithoutKubeletList {
slavesWithoutKubelet[s] = struct{}{}
}
// update status for nodes which do not have a kubelet running and
// which still exist as slaves. This status update must be done
// before the node controller counts down the NodeMonitorGracePeriod
obj, err := nodeLW.List()
if err != nil {
log.Errorf("Error listing the nodes for status updates: %v", err)
return
}
// bail out if the result is not a NodeList; nl.Items below would panic on nil
nl, ok := obj.(*api.NodeList)
if !ok {
return
}
nodes := nl.Items
for i := range nodes {
if _, ok := slavesWithoutKubelet[nodes[i].Spec.ExternalID]; !ok {
// let the kubelet do its job of updating the status; if the node
// does not even exist anymore, the node controller will remove it
continue
}
err := u.updateStatus(&nodes[i])
if err != nil {
log.Errorf("Error updating node status: %v", err)
}
}
}
go runtime.Until(monitor, u.heartBeatPeriod, terminate)
return nil
}
func (u *StatusUpdater) updateStatus(n *api.Node) error {
for i := 0; i < nodeStatusUpdateRetry; i++ {
if err := u.tryUpdateStatus(n); err != nil && !errors.IsConflict(err) {
log.Errorf("Error updating node status, will retry: %v", err)
} else {
return nil
}
}
return fmt.Errorf("update node status exceeded retry count")
}
// nodeWithUpdatedStatus clones the given node and updates the NodeReady condition.
// The updated node is returned, along with a boolean indicating whether the node
// was changed at all.
func (u *StatusUpdater) nodeWithUpdatedStatus(n *api.Node) (*api.Node, bool, error) {
readyCondition := getCondition(&n.Status, api.NodeReady)
currentTime := unversioned.NewTime(u.nowFunc())
// avoid flapping by waiting at least twice the kubelet update frequency, i.e.
// give the kubelet two chances to update the heartbeat. This is necessary
// because we only poll the Mesos master's state.json once in a while, and we
// know that the information from there can easily be outdated.
gracePeriod := u.heartBeatPeriod * 2
if readyCondition != nil && !currentTime.After(readyCondition.LastHeartbeatTime.Add(gracePeriod)) {
return n, false, nil
}
clone, err := api.Scheme.DeepCopy(n)
if err != nil {
return nil, false, err
}
n = clone.(*api.Node)
newNodeReadyCondition := api.NodeCondition{
Type: api.NodeReady,
Status: api.ConditionTrue,
Reason: slaveReadyReason,
Message: slaveReadyMessage,
LastHeartbeatTime: currentTime,
}
found := false
for i := range n.Status.Conditions {
c := &n.Status.Conditions[i]
if c.Type == api.NodeReady {
if c.Status == newNodeReadyCondition.Status {
newNodeReadyCondition.LastTransitionTime = c.LastTransitionTime
} else {
newNodeReadyCondition.LastTransitionTime = currentTime
}
n.Status.Conditions[i] = newNodeReadyCondition
found = true
break
}
}
if !found {
newNodeReadyCondition.LastTransitionTime = currentTime
n.Status.Conditions = append(n.Status.Conditions, newNodeReadyCondition)
}
return n, true, nil
}
// tryUpdateStatus updates the status of the given node and tries to persist it
// on the apiserver
func (u *StatusUpdater) tryUpdateStatus(n *api.Node) error {
n, updated, err := u.nodeWithUpdatedStatus(n)
if err != nil {
return err
}
if !updated {
return nil
}
_, err = u.client.Nodes().UpdateStatus(n)
return err
}
// getCondition returns the condition object for the given condition
// type, or nil if the condition is not set.
func getCondition(status *api.NodeStatus, conditionType api.NodeConditionType) *api.NodeCondition {
if status == nil {
return nil
}
for i := range status.Conditions {
if status.Conditions[i].Type == conditionType {
return &status.Conditions[i]
}
}
return nil
}
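
The anti-flapping logic in nodeWithUpdatedStatus is easiest to check with concrete numbers. Assuming the kubelet's default NodeStatusUpdateFrequency of 10s, the updater only touches a node whose heartbeat is more than 20s old, which stays comfortably inside the controller manager's default 40s NodeMonitorGracePeriod. A small sketch of the same check in isolation (needsUpdate is an illustrative helper, not part of this PR):

package main

import (
	"fmt"
	"time"
)

// needsUpdate reproduces the grace check from nodeWithUpdatedStatus: only
// update if the last heartbeat is older than twice the kubelet's update
// frequency, i.e. after the kubelet has missed two chances to report in.
func needsUpdate(lastHeartbeat, now time.Time, heartBeatPeriod time.Duration) bool {
	gracePeriod := 2 * heartBeatPeriod
	return now.After(lastHeartbeat.Add(gracePeriod))
}

func main() {
	const heartBeatPeriod = 10 * time.Second // assumed kubelet default
	now := time.Now()

	fmt.Println(needsUpdate(now.Add(-15*time.Second), now, heartBeatPeriod)) // false: one missed beat
	fmt.Println(needsUpdate(now.Add(-25*time.Second), now, heartBeatPeriod)) // true: two missed beats
}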


@@ -0,0 +1,77 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package node
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
"k8s.io/kubernetes/cmd/kube-controller-manager/app"
kubelet "k8s.io/kubernetes/cmd/kubelet/app"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
)
func Test_nodeWithUpdatedStatus(t *testing.T) {
now := time.Now()
testNode := func(d time.Duration, s api.ConditionStatus, r string) *api.Node {
return &api.Node{
Status: api.NodeStatus{
Conditions: []api.NodeCondition{{
Type: api.NodeOutOfDisk,
}, {
Type: api.NodeReady,
Status: s,
Reason: r,
Message: "some message we don't care about here",
LastTransitionTime: unversioned.Time{Time: now.Add(-time.Minute)},
LastHeartbeatTime: unversioned.Time{Time: now.Add(d)},
}},
},
}
}
cm := app.NewCMServer()
kubecfg := kubelet.NewKubeletServer()
assert.True(t, kubecfg.NodeStatusUpdateFrequency*3 < cm.NodeMonitorGracePeriod) // sanity check for defaults
n := testNode(0, api.ConditionTrue, "KubeletReady")
su := NewStatusUpdater(nil, cm.NodeMonitorPeriod, func() time.Time { return now })
_, updated, err := su.nodeWithUpdatedStatus(n)
assert.NoError(t, err)
assert.False(t, updated, "no update expected b/c kubelet updated heartbeat just now")
n = testNode(-cm.NodeMonitorGracePeriod, api.ConditionTrue, "KubeletReady")
n2, updated, err := su.nodeWithUpdatedStatus(n)
assert.NoError(t, err)
assert.True(t, updated, "update expected b/c kubelet's update is older than DefaultNodeMonitorGracePeriod")
assert.Equal(t, getCondition(&n2.Status, api.NodeReady).Reason, slaveReadyReason)
assert.Equal(t, getCondition(&n2.Status, api.NodeReady).Message, slaveReadyMessage)
n = testNode(-kubecfg.NodeStatusUpdateFrequency, api.ConditionTrue, "KubeletReady")
n2, updated, err = su.nodeWithUpdatedStatus(n)
assert.NoError(t, err)
assert.False(t, updated, "no update expected b/c kubelet's update was missed only once")
n = testNode(-kubecfg.NodeStatusUpdateFrequency*3, api.ConditionTrue, "KubeletReady")
n2, updated, err = su.nodeWithUpdatedStatus(n)
assert.NoError(t, err)
assert.True(t, updated, "update expected b/c kubelet's update is older than 3*DefaultNodeStatusUpdateFrequency")
assert.Equal(t, getCondition(&n2.Status, api.NodeReady).Reason, slaveReadyReason)
assert.Equal(t, getCondition(&n2.Status, api.NodeReady).Message, slaveReadyMessage)
}


@@ -70,6 +70,7 @@ import (
"k8s.io/kubernetes/pkg/client/record"
client "k8s.io/kubernetes/pkg/client/unversioned"
clientauth "k8s.io/kubernetes/pkg/client/unversioned/auth"
cloud "k8s.io/kubernetes/pkg/cloudprovider/providers/mesos"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/healthz"
"k8s.io/kubernetes/pkg/master/ports"
@@ -426,7 +427,7 @@ func (s *SchedulerServer) prepareExecutorInfo(hks hyperkube.Interface) (*mesos.E
// Create mesos scheduler driver.
execInfo := &mesos.ExecutorInfo{
Command: ci,
Name: proto.String(execcfg.DefaultInfoName),
Name: proto.String(cloud.KubernetesExecutorName),
Source: proto.String(execcfg.DefaultInfoSource),
}


@@ -49,13 +49,14 @@ type mesosClient struct {
}
type slaveNode struct {
hostname string
resources *api.NodeResources
hostname string
kubeletRunning bool
resources *api.NodeResources
}
type mesosState struct {
clusterName string
nodes []*slaveNode
nodes map[string]*slaveNode // by hostname
}
type stateCache struct {
@@ -94,7 +95,7 @@ func (c *stateCache) clusterName(ctx context.Context) (string, error) {
}
// nodes returns the cached list of slave nodes.
func (c *stateCache) nodes(ctx context.Context) ([]*slaveNode, error) {
func (c *stateCache) nodes(ctx context.Context) (map[string]*slaveNode, error) {
cached, err := c.cachedState(ctx)
return cached.nodes, err
}
@@ -160,9 +161,9 @@ func unpackIPv4(ip uint32) string {
return ipv4.String()
}
// listSlaves returns a (possibly cached) list of slave nodes.
// listSlaves returns a (possibly cached) map of slave nodes by hostname.
// Callers must not mutate the contents of the returned map.
func (c *mesosClient) listSlaves(ctx context.Context) ([]*slaveNode, error) {
func (c *mesosClient) listSlaves(ctx context.Context) (map[string]*slaveNode, error) {
return c.state.nodes(ctx)
}
@@ -225,12 +226,36 @@ func parseMesosState(blob []byte) (*mesosState, error) {
Hostname string `json:"hostname"` // ex: 10.22.211.18, or slave-123.nowhere.com
Resources map[string]interface{} `json:"resources"` // ex: {"mem": 123, "ports": "[31000-3200]"}
} `json:"slaves"`
Frameworks []*struct {
Id string `json:"id"` // ex: 20151105-093752-3745622208-5050-1-0000
Pid string `json:"pid"` // ex: scheduler(1)@192.168.65.228:57124
Executors []*struct {
SlaveId string `json:"slave_id"` // ex: 20151105-093752-3745622208-5050-1-S1
ExecutorId string `json:"executor_id"` // ex: 6704d375c68fee1e_k8sm-executor
Name string `json:"name"` // ex: Kubelet-Executor
} `json:"executors"`
} `json:"frameworks"`
}
state := &State{ClusterName: defaultClusterName}
if err := json.Unmarshal(blob, state); err != nil {
return nil, err
}
nodes := []*slaveNode{}
executorSlaveIds := map[string]struct{}{}
for _, f := range state.Frameworks {
for _, e := range f.Executors {
// Note that this simple comparison breaks when we support more than one
// k8s instance in a cluster. At the moment this is not possible for
// a number of reasons.
// TODO(sttts): find a way to detect executors of this k8s instance
if e.Name == KubernetesExecutorName {
executorSlaveIds[e.SlaveId] = struct{}{}
}
}
}
nodes := map[string]*slaveNode{} // by hostname
for _, slave := range state.Slaves {
if slave.Hostname == "" {
continue
@@ -264,7 +289,10 @@ func parseMesosState(blob []byte) (*mesosState, error) {
}
log.V(4).Infof("node %q reporting capacity %v", node.hostname, cap)
}
nodes = append(nodes, node)
if _, ok := executorSlaveIds[slave.Id]; ok {
node.kubeletRunning = true
}
nodes[node.hostname] = node
}
result := &mesosState{


@@ -30,8 +30,16 @@ import (
"k8s.io/kubernetes/pkg/cloudprovider"
)
const (
ProviderName = "mesos"
// KubernetesExecutorName is shared between contrib/mesos and Mesos cloud provider.
// Because cloud provider -> contrib dependencies are forbidden, this constant
// is defined here, not in contrib.
KubernetesExecutorName = "Kubelet-Executor"
)
var (
ProviderName = "mesos"
CloudProvider *MesosCloud
noHostNameSpecified = errors.New("No hostname specified")
@@ -182,7 +190,21 @@ func ipAddress(name string) (net.IP, error) {
// ExternalID returns the cloud provider ID of the specified instance (deprecated).
func (c *MesosCloud) ExternalID(instance string) (string, error) {
ip, err := ipAddress(instance)
//TODO(jdef) use a timeout here? 15s?
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
nodes, err := c.client.listSlaves(ctx)
if err != nil {
return "", err
}
node := nodes[instance]
if node == nil {
return "", cloudprovider.InstanceNotFound
}
ip, err := ipAddress(node.hostname)
if err != nil {
return "", err
}
@@ -194,9 +216,7 @@ func (c *MesosCloud) InstanceID(name string) (string, error) {
return "", nil
}
// List lists instances that match 'filter' which is a regular expression
// which must match the entire instance name (fqdn).
func (c *MesosCloud) List(filter string) ([]string, error) {
func (c *MesosCloud) listNodes() (map[string]*slaveNode, error) {
//TODO(jdef) use a timeout here? 15s?
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
@@ -209,6 +229,16 @@ func (c *MesosCloud) List(filter string) ([]string, error) {
log.V(2).Info("no slaves found, are any running?")
return nil, nil
}
return nodes, nil
}
// List lists instances that match 'filter' which is a regular expression
// which must match the entire instance name (fqdn).
func (c *MesosCloud) List(filter string) ([]string, error) {
nodes, err := c.listNodes()
if err != nil {
return nil, err
}
filterRegex, err := regexp.Compile(filter)
if err != nil {
return nil, err
@@ -219,7 +249,23 @@ func (c *MesosCloud) List(filter string) ([]string, error) {
addr = append(addr, node.hostname)
}
}
return addr, err
return addr, nil
}
// ListWithoutKubelet lists those instances which have no running kubelet,
// i.e. no Kubernetes executor.
func (c *MesosCloud) ListWithoutKubelet() ([]string, error) {
nodes, err := c.listNodes()
if err != nil {
return nil, err
}
addr := make([]string, 0, len(nodes))
for _, n := range nodes {
if !n.kubeletRunning {
addr = append(addr, n.hostname)
}
}
return addr, nil
}
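
For context, ListWithoutKubelet is what the StatusUpdater's monitor consumes to decide which nodes it must heartbeat on the kubelet's behalf. A hedged usage sketch (reportSlaves is an illustrative helper, not part of this PR; it assumes an initialized *MesosCloud and this file's log alias for glog):

// reportSlaves logs how many Mesos slaves lack a running kubelet.
func reportSlaves(cloud *MesosCloud) error {
	all, err := cloud.List(".*") // the filter regexp must match the entire hostname
	if err != nil {
		return err
	}
	withoutKubelet, err := cloud.ListWithoutKubelet()
	if err != nil {
		return err
	}
	log.Infof("%d slaves total, %d without a kubelet: %v",
		len(all), len(withoutKubelet), withoutKubelet)
	return nil
}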
// NodeAddresses returns the addresses of the specified instance.


@@ -24,6 +24,7 @@ import (
"time"
log "github.com/golang/glog"
"k8s.io/kubernetes/pkg/cloudprovider"
)
func TestIPAddress(t *testing.T) {
@@ -252,3 +253,27 @@ func Test_List(t *testing.T) {
t.Fatalf("List with a reject-all filter should return a list of size 0: (actual: %#v)", clusters)
}
}
func Test_ExternalID(t *testing.T) {
defer log.Flush()
md := FakeMasterDetector{}
httpServer, httpClient, httpTransport := makeHttpMocks()
defer httpServer.Close()
cacheTTL := 500 * time.Millisecond
mesosClient, err := createMesosClient(md, httpClient, httpTransport, cacheTTL)
if err != nil {
t.Fatalf("createMesosClient failed: %v", err)
}
mesosCloud := &MesosCloud{client: mesosClient, config: createDefaultConfig()}
_, err = mesosCloud.ExternalID("unknown")
if err != cloudprovider.InstanceNotFound {
t.Fatalf("ExternalID did not return InstanceNotFound on an unknown instance")
}
slaveName := "mesos3.internal.company.com"
id, err := mesosCloud.ExternalID(slaveName)
if id != "" {
t.Fatalf("ExternalID should not be able to resolve %q", slaveName)
}
if err == cloudprovider.InstanceNotFound {
t.Fatalf("ExternalID should find %q", slaveName)
}
}