diff --git a/pkg/client/cache/listers.go b/pkg/client/cache/listers.go
index 36ee48d89b..410a88cf79 100644
--- a/pkg/client/cache/listers.go
+++ b/pkg/client/cache/listers.go
@@ -95,6 +95,10 @@ func (s *StoreToPodLister) Exists(pod *api.Pod) (bool, error) {
 	return exists, nil
 }

+// NodeConditionPredicate is a function that indicates whether the given node's conditions meet
+// some set of criteria defined by the function.
+type NodeConditionPredicate func(node api.Node) bool
+
 // StoreToNodeLister makes a Store have the List method of the client.NodeInterface
 // The Store must contain (only) Nodes.
 type StoreToNodeLister struct {
@@ -109,50 +113,26 @@ func (s *StoreToNodeLister) List() (machines api.NodeList, err error) {
 }

 // NodeCondition returns a storeToNodeConditionLister
-func (s *StoreToNodeLister) NodeCondition(conditionType api.NodeConditionType, conditionStatus api.ConditionStatus) storeToNodeConditionLister {
+func (s *StoreToNodeLister) NodeCondition(predicate NodeConditionPredicate) storeToNodeConditionLister {
 	// TODO: Move this filtering server side. Currently our selectors don't facilitate searching through a list so we
 	// have the reflector filter out the Unschedulable field and sift through node conditions in the lister.
-	return storeToNodeConditionLister{s.Store, conditionType, conditionStatus}
+	return storeToNodeConditionLister{s.Store, predicate}
 }

-// storeToNodeConditionLister filters and returns nodes matching the given type and status from the store.
+// storeToNodeConditionLister filters and returns nodes matching the given predicate from the store.
 type storeToNodeConditionLister struct {
-	store           Store
-	conditionType   api.NodeConditionType
-	conditionStatus api.ConditionStatus
+	store     Store
+	predicate NodeConditionPredicate
 }

-// List returns a list of nodes that match the condition type/status in the storeToNodeConditionLister.
+// List returns a list of nodes that match the conditions defined by the predicate function in the storeToNodeConditionLister.
 func (s storeToNodeConditionLister) List() (nodes api.NodeList, err error) {
 	for _, m := range s.store.List() {
 		node := *m.(*api.Node)
-
-		// We currently only use a conditionType of "Ready". If the kubelet doesn't
-		// periodically report the status of a node, the nodecontroller sets its
-		// ConditionStatus to "Unknown". If the kubelet thinks a node is unhealthy
-		// it can (in theory) set its ConditionStatus to "False".
-		var nodeCondition *api.NodeCondition
-
-		// Get the last condition of the required type
-		for _, cond := range node.Status.Conditions {
-			if cond.Type == s.conditionType {
-				condCopy := cond
-				nodeCondition = &condCopy
-				break
-			} else {
-				glog.V(4).Infof("Ignoring condition type %v for node %v", cond.Type, node.Name)
-			}
-		}
-
-		// Check that the condition has the required status
-		if nodeCondition != nil {
-			if nodeCondition.Status == s.conditionStatus {
-				nodes.Items = append(nodes.Items, node)
-			} else {
-				glog.V(4).Infof("Ignoring node %v with condition status %v", node.Name, nodeCondition.Status)
-			}
+		if s.predicate(node) {
+			nodes.Items = append(nodes.Items, node)
 		} else {
-			glog.V(2).Infof("Node %s doesn't have conditions of type %v", node.Name, s.conditionType)
+			glog.V(2).Infof("Node %s does not match the predicate", node.Name)
 		}
 	}
 	return
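Reviewer note: for context, a minimal sketch of how a caller consumes the predicate-based lister above. This is illustrative only and not part of this patch; the isNodeReady helper is hypothetical, and the k8s.io/kubernetes import paths are assumed from the tree layout shown in this diff.

package main

import (
	"fmt"

	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/pkg/client/cache"
)

func main() {
	// Any function with this signature satisfies cache.NodeConditionPredicate.
	isNodeReady := func(node api.Node) bool {
		for _, cond := range node.Status.Conditions {
			if cond.Type == api.NodeReady {
				return cond.Status == api.ConditionTrue
			}
		}
		return false // no Ready condition reported yet
	}

	store := cache.NewStore(cache.MetaNamespaceKeyFunc)
	store.Add(&api.Node{
		ObjectMeta: api.ObjectMeta{Name: "node-a"},
		Status: api.NodeStatus{
			Conditions: []api.NodeCondition{{Type: api.NodeReady, Status: api.ConditionTrue}},
		},
	})

	nodeLister := cache.StoreToNodeLister{Store: store}
	readyNodes, err := nodeLister.NodeCondition(isNodeReady).List()
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("ready nodes: %d\n", len(readyNodes.Items))
}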
diff --git a/pkg/client/cache/listers_test.go b/pkg/client/cache/listers_test.go
index 45570b2327..e92de89a7d 100644
--- a/pkg/client/cache/listers_test.go
+++ b/pkg/client/cache/listers_test.go
@@ -46,6 +46,81 @@ func TestStoreToNodeLister(t *testing.T) {
 	}
 }

+func TestStoreToNodeConditionLister(t *testing.T) {
+	store := NewStore(MetaNamespaceKeyFunc)
+	nodes := []*api.Node{
+		{
+			ObjectMeta: api.ObjectMeta{Name: "foo"},
+			Status: api.NodeStatus{
+				Conditions: []api.NodeCondition{
+					{
+						Type:   api.NodeReady,
+						Status: api.ConditionTrue,
+					},
+					{
+						Type:   api.NodeOutOfDisk,
+						Status: api.ConditionFalse,
+					},
+				},
+			},
+		},
+		{
+			ObjectMeta: api.ObjectMeta{Name: "bar"},
+			Status: api.NodeStatus{
+				Conditions: []api.NodeCondition{
+					{
+						Type:   api.NodeOutOfDisk,
+						Status: api.ConditionTrue,
+					},
+				},
+			},
+		},
+		{
+			ObjectMeta: api.ObjectMeta{Name: "baz"},
+			Status: api.NodeStatus{
+				Conditions: []api.NodeCondition{
+					{
+						Type:   api.NodeReady,
+						Status: api.ConditionFalse,
+					},
+					{
+						Type:   api.NodeOutOfDisk,
+						Status: api.ConditionUnknown,
+					},
+				},
+			},
+		},
+	}
+	for _, n := range nodes {
+		store.Add(n)
+	}
+
+	predicate := func(node api.Node) bool {
+		for _, cond := range node.Status.Conditions {
+			if cond.Type == api.NodeOutOfDisk && cond.Status == api.ConditionTrue {
+				return false
+			}
+		}
+		return true
+	}
+
+	snl := StoreToNodeLister{store}
+	sncl := snl.NodeCondition(predicate)
+
+	want := sets.NewString("foo", "baz")
+	gotNodes, err := sncl.List()
+	if err != nil {
+		t.Fatalf("Unexpected error: %v", err)
+	}
+	got := make([]string, len(gotNodes.Items))
+	for ix := range gotNodes.Items {
+		got[ix] = gotNodes.Items[ix].Name
+	}
+	if !want.HasAll(got...) || len(got) != len(want) {
+		t.Errorf("Expected %v, got %v", want, got)
+	}
+}
+
 func TestStoreToReplicationControllerLister(t *testing.T) {
 	store := NewStore(MetaNamespaceKeyFunc)
 	lister := StoreToReplicationControllerLister{store}
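Reviewer note: one advantage of the predicate signature over the old (conditionType, conditionStatus) pair, visible in the test above, is that arbitrary criteria compose. A hypothetical helper, not part of this patch, could AND several predicates together:

// andPredicates passes a node only if every given predicate passes it.
// Hypothetical helper written against the cache package changed above.
func andPredicates(preds ...cache.NodeConditionPredicate) cache.NodeConditionPredicate {
	return func(node api.Node) bool {
		for _, p := range preds {
			if !p(node) {
				return false
			}
		}
		return true
	}
}

// Usage sketch (isNodeReady and hasDisk are illustrative names):
//   nodeLister.NodeCondition(andPredicates(isNodeReady, hasDisk)).List()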
diff --git a/pkg/controller/node/nodecontroller.go b/pkg/controller/node/nodecontroller.go
index 7feb0f59c9..1bc74f6113 100644
--- a/pkg/controller/node/nodecontroller.go
+++ b/pkg/controller/node/nodecontroller.go
@@ -614,7 +614,7 @@ func (nc *NodeController) tryUpdateNodeStatus(node *api.Node) (time.Duration, ap

 	if nc.now().After(savedNodeStatus.probeTimestamp.Add(gracePeriod)) {
 		// NodeReady condition was last set longer ago than gracePeriod, so update it to Unknown
-		// (regardless of its current value) in the master, without contacting kubelet.
+		// (regardless of its current value) in the master.
 		if readyCondition == nil {
 			glog.V(2).Infof("node %v is never updated by kubelet", node.Name)
 			node.Status.Conditions = append(node.Status.Conditions, api.NodeCondition{
@@ -637,6 +637,32 @@ func (nc *NodeController) tryUpdateNodeStatus(node *api.Node) (time.Duration, ap
 			readyCondition.LastTransitionTime = nc.now()
 		}
 	}
+
+	// Like the NodeReady condition, the NodeOutOfDisk condition was last set longer ago than
+	// gracePeriod, so update it to Unknown (regardless of its current value) in the master.
+	// TODO(madhusudancs): Refactor this with readyCondition to remove duplicated code.
+	oodCondition := nc.getCondition(&node.Status, api.NodeOutOfDisk)
+	if oodCondition == nil {
+		glog.V(2).Infof("Out of disk condition of node %v is never updated by kubelet", node.Name)
+		node.Status.Conditions = append(node.Status.Conditions, api.NodeCondition{
+			Type:               api.NodeOutOfDisk,
+			Status:             api.ConditionUnknown,
+			Reason:             "NodeStatusNeverUpdated",
+			Message:            "Kubelet never posted node status.",
+			LastHeartbeatTime:  node.CreationTimestamp,
+			LastTransitionTime: nc.now(),
+		})
+	} else {
+		glog.V(2).Infof("node %v hasn't been updated for %+v. Last out of disk condition is: %+v",
+			node.Name, nc.now().Time.Sub(savedNodeStatus.probeTimestamp.Time), oodCondition)
+		if oodCondition.Status != api.ConditionUnknown {
+			oodCondition.Status = api.ConditionUnknown
+			oodCondition.Reason = "NodeStatusUnknown"
+			oodCondition.Message = "Kubelet stopped posting node status."
+			oodCondition.LastTransitionTime = nc.now()
+		}
+	}
+
 	if !api.Semantic.DeepEqual(nc.getCondition(&node.Status, api.NodeReady), lastReadyCondition) {
 		if _, err = nc.kubeClient.Nodes().UpdateStatus(node); err != nil {
 			glog.Errorf("Error updating node %s: %v", node.Name, err)
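Reviewer note: the TODO(madhusudancs) above asks for the duplicated grace-period handling to be factored out. A sketch of one possible shape for that refactor, illustrative only and not part of this patch, written as it would sit inside package node and assuming the existing nc.getCondition and nc.now helpers shown in this diff:

// markConditionUnknown collapses the duplicated readyCondition/oodCondition
// logic: if the kubelet never reported the condition, record it as Unknown;
// if it stopped reporting, flip the existing condition to Unknown.
func (nc *NodeController) markConditionUnknown(node *api.Node, condType api.NodeConditionType) {
	cond := nc.getCondition(&node.Status, condType)
	if cond == nil {
		node.Status.Conditions = append(node.Status.Conditions, api.NodeCondition{
			Type:               condType,
			Status:             api.ConditionUnknown,
			Reason:             "NodeStatusNeverUpdated",
			Message:            "Kubelet never posted node status.",
			LastHeartbeatTime:  node.CreationTimestamp,
			LastTransitionTime: nc.now(),
		})
		return
	}
	if cond.Status != api.ConditionUnknown {
		cond.Status = api.ConditionUnknown
		cond.Reason = "NodeStatusUnknown"
		cond.Message = "Kubelet stopped posting node status."
		cond.LastTransitionTime = nc.now()
	}
}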
diff --git a/pkg/controller/node/nodecontroller_test.go b/pkg/controller/node/nodecontroller_test.go
index b9a43f85bb..b26a8bb070 100644
--- a/pkg/controller/node/nodecontroller_test.go
+++ b/pkg/controller/node/nodecontroller_test.go
@@ -18,7 +18,6 @@ package node

 import (
 	"errors"
-	"fmt"
 	"sync"
 	"testing"
 	"time"
@@ -402,7 +401,15 @@ func TestMonitorNodeStatusUpdateStatus(t *testing.T) {
 						Type:               api.NodeReady,
 						Status:             api.ConditionUnknown,
 						Reason:             "NodeStatusNeverUpdated",
-						Message:            fmt.Sprintf("Kubelet never posted node status."),
+						Message:            "Kubelet never posted node status.",
 						LastHeartbeatTime:  unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
 						LastTransitionTime: fakeNow,
 					},
+					{
+						Type:               api.NodeOutOfDisk,
+						Status:             api.ConditionUnknown,
+						Reason:             "NodeStatusNeverUpdated",
+						Message:            "Kubelet never posted node status.",
+						LastHeartbeatTime:  unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
+						LastTransitionTime: fakeNow,
+					},
@@ -447,6 +454,13 @@ func TestMonitorNodeStatusUpdateStatus(t *testing.T) {
 						LastHeartbeatTime:  unversioned.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
 						LastTransitionTime: unversioned.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
 					},
+					{
+						Type:   api.NodeOutOfDisk,
+						Status: api.ConditionFalse,
+						// Node status hasn't been updated for 1hr.
+						LastHeartbeatTime:  unversioned.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
+						LastTransitionTime: unversioned.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
+					},
 				},
 				Capacity: api.ResourceList{
 					api.ResourceName(api.ResourceCPU): resource.MustParse("10"),
@@ -471,6 +485,13 @@ func TestMonitorNodeStatusUpdateStatus(t *testing.T) {
 						LastHeartbeatTime:  unversioned.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
 						LastTransitionTime: unversioned.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
 					},
+					{
+						Type:   api.NodeOutOfDisk,
+						Status: api.ConditionFalse,
+						// Node status hasn't been updated for 1hr.
+						LastHeartbeatTime:  unversioned.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
+						LastTransitionTime: unversioned.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
+					},
 				},
 				Capacity: api.ResourceList{
 					api.ResourceName(api.ResourceCPU): resource.MustParse("10"),
@@ -488,8 +509,16 @@ func TestMonitorNodeStatusUpdateStatus(t *testing.T) {
 					{
 						Type:               api.NodeReady,
 						Status:             api.ConditionUnknown,
-						Reason:             "NodeStatusStopUpdated",
-						Message:            fmt.Sprintf("Kubelet stopped posting node status."),
+						Reason:             "NodeStatusUnknown",
+						Message:            "Kubelet stopped posting node status.",
 						LastHeartbeatTime:  unversioned.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
 						LastTransitionTime: unversioned.Time{Time: unversioned.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC).Add(time.Hour)},
 					},
+					{
+						Type:               api.NodeOutOfDisk,
+						Status:             api.ConditionUnknown,
+						Reason:             "NodeStatusUnknown",
+						Message:            "Kubelet stopped posting node status.",
+						LastHeartbeatTime:  unversioned.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
+						LastTransitionTime: unversioned.Time{Time: unversioned.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC).Add(time.Hour)},
+					},
@@ -542,7 +571,7 @@ func TestMonitorNodeStatusUpdateStatus(t *testing.T) {
 		},
 	}

-	for _, item := range table {
+	for i, item := range table {
 		nodeController := NewNodeController(nil, item.fakeNodeHandler, 5*time.Minute, util.NewFakeRateLimiter(), util.NewFakeRateLimiter(),
 			testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false)
 		nodeController.now = func() unversioned.Time { return fakeNow }
@@ -560,8 +589,10 @@ func TestMonitorNodeStatusUpdateStatus(t *testing.T) {
 			t.Errorf("expected %v call, but got %v.", item.expectedRequestCount, item.fakeNodeHandler.RequestCount)
 		}
 		if len(item.fakeNodeHandler.UpdatedNodes) > 0 && !api.Semantic.DeepEqual(item.expectedNodes, item.fakeNodeHandler.UpdatedNodes) {
-			t.Errorf("expected nodes %+v, got %+v", item.expectedNodes[0],
-				item.fakeNodeHandler.UpdatedNodes[0])
+			t.Errorf("Case[%d] unexpected nodes: %s", i, util.ObjectDiff(item.expectedNodes[0], item.fakeNodeHandler.UpdatedNodes[0]))
+		}
+		if len(item.fakeNodeHandler.UpdatedNodeStatuses) > 0 && !api.Semantic.DeepEqual(item.expectedNodes, item.fakeNodeHandler.UpdatedNodeStatuses) {
+			t.Errorf("Case[%d] unexpected nodes: %s", i, util.ObjectDiff(item.expectedNodes[0], item.fakeNodeHandler.UpdatedNodeStatuses[0]))
 		}
 	}
 }
diff --git a/plugin/pkg/scheduler/factory/factory.go b/plugin/pkg/scheduler/factory/factory.go
index be3bf184ea..7de2e57d8a 100644
--- a/plugin/pkg/scheduler/factory/factory.go
+++ b/plugin/pkg/scheduler/factory/factory.go
@@ -164,7 +164,7 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String)
 		ServiceLister:    f.ServiceLister,
 		ControllerLister: f.ControllerLister,
 		// All fit predicates only need to consider schedulable nodes.
-		NodeLister: f.NodeLister.NodeCondition(api.NodeReady, api.ConditionTrue),
+		NodeLister: f.NodeLister.NodeCondition(getNodeConditionPredicate()),
 		NodeInfo:   f.NodeLister,
 	}
 	predicateFuncs, err := getFitPredicateFunctions(predicateKeys, pluginArgs)
@@ -212,7 +212,7 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String)
 	return &scheduler.Config{
 		Modeler: f.modeler,
 		// The scheduler only needs to consider schedulable nodes.
-		NodeLister: f.NodeLister.NodeCondition(api.NodeReady, api.ConditionTrue),
+		NodeLister: f.NodeLister.NodeCondition(getNodeConditionPredicate()),
 		Algorithm:  algo,
 		Binder:     &binder{f.Client},
 		NextPod: func() *api.Pod {
@@ -226,6 +226,23 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String)
 	}, nil
 }

+func getNodeConditionPredicate() cache.NodeConditionPredicate {
+	return func(node api.Node) bool {
+		for _, cond := range node.Status.Conditions {
+			// We consider the node for scheduling only when its NodeReady condition status
+			// is ConditionTrue and its NodeOutOfDisk condition status is ConditionFalse.
+			if cond.Type == api.NodeReady && cond.Status != api.ConditionTrue {
+				glog.V(4).Infof("Ignoring node %v with %v condition status %v", node.Name, cond.Type, cond.Status)
+				return false
+			} else if cond.Type == api.NodeOutOfDisk && cond.Status != api.ConditionFalse {
+				glog.V(4).Infof("Ignoring node %v with %v condition status %v", node.Name, cond.Type, cond.Status)
+				return false
+			}
+		}
+		return true
+	}
+}
+
 // Returns a cache.ListWatch that finds all pods that need to be
 // scheduled.
 func (factory *ConfigFactory) createUnassignedPodLW() *cache.ListWatch {
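Reviewer note: a quick illustration of the scheduler predicate's behavior, as a hypothetical snippet against the code above. One subtlety worth recording: a node that reports no conditions at all passes the predicate, because neither disqualifying check ever fires.

pred := getNodeConditionPredicate()

outOfDiskNode := api.Node{
	ObjectMeta: api.ObjectMeta{Name: "example"},
	Status: api.NodeStatus{
		Conditions: []api.NodeCondition{
			{Type: api.NodeReady, Status: api.ConditionTrue},
			{Type: api.NodeOutOfDisk, Status: api.ConditionTrue},
		},
	},
}

_ = pred(outOfDiskNode) // false: NodeOutOfDisk status is not ConditionFalse
_ = pred(api.Node{})    // true: no conditions reported, so nothing disqualifies the node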