Use node out of disk condition in the scheduler while scheduling pods.

Set the out of disk node condition to unknown in the node controller if
the kubelet does not report its node condition in a long time. Update
node controller unit tests.

Implement a node condition predicate function that checks if a given
node satisfies the conditions defined by the predicate and if it
does, use that node for scheduling pods. The predicate function takes
both NodeReady and NodeOutOfDisk into consideration to determine if a
node is fit for scheduling pods.

The predicate is then passed to the node lister in the scheduler factory
so that the node lister can run the predicate function on the nodes when
schedling pods thereby omitting nodes that does not satisfy the
predicate.

Also update listers test.
pull/6/head
Madhusudan.C.S 2015-10-22 12:47:43 -07:00
parent b74b2aa43e
commit ce257b5a0e
5 changed files with 171 additions and 42 deletions

View File

@ -95,6 +95,10 @@ func (s *StoreToPodLister) Exists(pod *api.Pod) (bool, error) {
return exists, nil
}
// NodeConditionPredicate is a function that indicates whether the given node's conditions meet
// some set of criteria defined by the function.
type NodeConditionPredicate func(node api.Node) bool
// StoreToNodeLister makes a Store have the List method of the client.NodeInterface
// The Store must contain (only) Nodes.
type StoreToNodeLister struct {
@ -109,50 +113,26 @@ func (s *StoreToNodeLister) List() (machines api.NodeList, err error) {
}
// NodeCondition returns a storeToNodeConditionLister
func (s *StoreToNodeLister) NodeCondition(conditionType api.NodeConditionType, conditionStatus api.ConditionStatus) storeToNodeConditionLister {
func (s *StoreToNodeLister) NodeCondition(predicate NodeConditionPredicate) storeToNodeConditionLister {
// TODO: Move this filtering server side. Currently our selectors don't facilitate searching through a list so we
// have the reflector filter out the Unschedulable field and sift through node conditions in the lister.
return storeToNodeConditionLister{s.Store, conditionType, conditionStatus}
return storeToNodeConditionLister{s.Store, predicate}
}
// storeToNodeConditionLister filters and returns nodes matching the given type and status from the store.
type storeToNodeConditionLister struct {
store Store
conditionType api.NodeConditionType
conditionStatus api.ConditionStatus
store Store
predicate NodeConditionPredicate
}
// List returns a list of nodes that match the condition type/status in the storeToNodeConditionLister.
// List returns a list of nodes that match the conditions defined by the predicate functions in the storeToNodeConditionLister.
func (s storeToNodeConditionLister) List() (nodes api.NodeList, err error) {
for _, m := range s.store.List() {
node := *m.(*api.Node)
// We currently only use a conditionType of "Ready". If the kubelet doesn't
// periodically report the status of a node, the nodecontroller sets its
// ConditionStatus to "Unknown". If the kubelet thinks a node is unhealthy
// it can (in theory) set its ConditionStatus to "False".
var nodeCondition *api.NodeCondition
// Get the last condition of the required type
for _, cond := range node.Status.Conditions {
if cond.Type == s.conditionType {
condCopy := cond
nodeCondition = &condCopy
break
} else {
glog.V(4).Infof("Ignoring condition type %v for node %v", cond.Type, node.Name)
}
}
// Check that the condition has the required status
if nodeCondition != nil {
if nodeCondition.Status == s.conditionStatus {
nodes.Items = append(nodes.Items, node)
} else {
glog.V(4).Infof("Ignoring node %v with condition status %v", node.Name, nodeCondition.Status)
}
if s.predicate(node) {
nodes.Items = append(nodes.Items, node)
} else {
glog.V(2).Infof("Node %s doesn't have conditions of type %v", node.Name, s.conditionType)
glog.V(2).Infof("Node %s matches none of the conditions", node.Name)
}
}
return

View File

@ -46,6 +46,81 @@ func TestStoreToNodeLister(t *testing.T) {
}
}
func TestStoreToNodeConditionLister(t *testing.T) {
store := NewStore(MetaNamespaceKeyFunc)
nodes := []*api.Node{
{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Status: api.NodeStatus{
Conditions: []api.NodeCondition{
{
Type: api.NodeReady,
Status: api.ConditionTrue,
},
{
Type: api.NodeOutOfDisk,
Status: api.ConditionFalse,
},
},
},
},
{
ObjectMeta: api.ObjectMeta{Name: "bar"},
Status: api.NodeStatus{
Conditions: []api.NodeCondition{
{
Type: api.NodeOutOfDisk,
Status: api.ConditionTrue,
},
},
},
},
{
ObjectMeta: api.ObjectMeta{Name: "baz"},
Status: api.NodeStatus{
Conditions: []api.NodeCondition{
{
Type: api.NodeReady,
Status: api.ConditionFalse,
},
{
Type: api.NodeOutOfDisk,
Status: api.ConditionUnknown,
},
},
},
},
}
for _, n := range nodes {
store.Add(n)
}
predicate := func(node api.Node) bool {
for _, cond := range node.Status.Conditions {
if cond.Type == api.NodeOutOfDisk && cond.Status == api.ConditionTrue {
return false
}
}
return true
}
snl := StoreToNodeLister{store}
sncl := snl.NodeCondition(predicate)
want := sets.NewString("foo", "baz")
gotNodes, err := sncl.List()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
got := make([]string, len(gotNodes.Items))
for ix := range gotNodes.Items {
got[ix] = gotNodes.Items[ix].Name
}
if !want.HasAll(got...) || len(got) != len(want) {
t.Errorf("Expected %v, got %v", want, got)
}
}
func TestStoreToReplicationControllerLister(t *testing.T) {
store := NewStore(MetaNamespaceKeyFunc)
lister := StoreToReplicationControllerLister{store}

View File

@ -614,7 +614,7 @@ func (nc *NodeController) tryUpdateNodeStatus(node *api.Node) (time.Duration, ap
if nc.now().After(savedNodeStatus.probeTimestamp.Add(gracePeriod)) {
// NodeReady condition was last set longer ago than gracePeriod, so update it to Unknown
// (regardless of its current value) in the master, without contacting kubelet.
// (regardless of its current value) in the master.
if readyCondition == nil {
glog.V(2).Infof("node %v is never updated by kubelet", node.Name)
node.Status.Conditions = append(node.Status.Conditions, api.NodeCondition{
@ -637,6 +637,32 @@ func (nc *NodeController) tryUpdateNodeStatus(node *api.Node) (time.Duration, ap
readyCondition.LastTransitionTime = nc.now()
}
}
// Like NodeReady condition, NodeOutOfDisk was last set longer ago than gracePeriod, so update
// it to Unknown (regardless of its current value) in the master.
// TODO(madhusudancs): Refactor this with readyCondition to remove duplicated code.
oodCondition := nc.getCondition(&node.Status, api.NodeOutOfDisk)
if oodCondition == nil {
glog.V(2).Infof("Out of disk condition of node %v is never updated by kubelet", node.Name)
node.Status.Conditions = append(node.Status.Conditions, api.NodeCondition{
Type: api.NodeOutOfDisk,
Status: api.ConditionUnknown,
Reason: "NodeStatusNeverUpdated",
Message: fmt.Sprintf("Kubelet never posted node status."),
LastHeartbeatTime: node.CreationTimestamp,
LastTransitionTime: nc.now(),
})
} else {
glog.V(2).Infof("node %v hasn't been updated for %+v. Last out of disk condition is: %+v",
node.Name, nc.now().Time.Sub(savedNodeStatus.probeTimestamp.Time), oodCondition)
if oodCondition.Status != api.ConditionUnknown {
oodCondition.Status = api.ConditionUnknown
oodCondition.Reason = "NodeStatusUnknown"
oodCondition.Message = fmt.Sprintf("Kubelet stopped posting node status.")
oodCondition.LastTransitionTime = nc.now()
}
}
if !api.Semantic.DeepEqual(nc.getCondition(&node.Status, api.NodeReady), lastReadyCondition) {
if _, err = nc.kubeClient.Nodes().UpdateStatus(node); err != nil {
glog.Errorf("Error updating node %s: %v", node.Name, err)

View File

@ -18,7 +18,6 @@ package node
import (
"errors"
"fmt"
"sync"
"testing"
"time"
@ -402,7 +401,15 @@ func TestMonitorNodeStatusUpdateStatus(t *testing.T) {
Type: api.NodeReady,
Status: api.ConditionUnknown,
Reason: "NodeStatusNeverUpdated",
Message: fmt.Sprintf("Kubelet never posted node status."),
Message: "Kubelet never posted node status.",
LastHeartbeatTime: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
LastTransitionTime: fakeNow,
},
{
Type: api.NodeOutOfDisk,
Status: api.ConditionUnknown,
Reason: "NodeStatusNeverUpdated",
Message: "Kubelet never posted node status.",
LastHeartbeatTime: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
LastTransitionTime: fakeNow,
},
@ -447,6 +454,13 @@ func TestMonitorNodeStatusUpdateStatus(t *testing.T) {
LastHeartbeatTime: unversioned.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
LastTransitionTime: unversioned.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
},
{
Type: api.NodeOutOfDisk,
Status: api.ConditionFalse,
// Node status hasn't been updated for 1hr.
LastHeartbeatTime: unversioned.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
LastTransitionTime: unversioned.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
},
},
Capacity: api.ResourceList{
api.ResourceName(api.ResourceCPU): resource.MustParse("10"),
@ -471,6 +485,13 @@ func TestMonitorNodeStatusUpdateStatus(t *testing.T) {
LastHeartbeatTime: unversioned.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
LastTransitionTime: unversioned.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
},
{
Type: api.NodeOutOfDisk,
Status: api.ConditionFalse,
// Node status hasn't been updated for 1hr.
LastHeartbeatTime: unversioned.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
LastTransitionTime: unversioned.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
},
},
Capacity: api.ResourceList{
api.ResourceName(api.ResourceCPU): resource.MustParse("10"),
@ -488,8 +509,16 @@ func TestMonitorNodeStatusUpdateStatus(t *testing.T) {
{
Type: api.NodeReady,
Status: api.ConditionUnknown,
Reason: "NodeStatusStopUpdated",
Message: fmt.Sprintf("Kubelet stopped posting node status."),
Reason: "NodeStatusUnknown",
Message: "Kubelet stopped posting node status.",
LastHeartbeatTime: unversioned.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
LastTransitionTime: unversioned.Time{Time: unversioned.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC).Add(time.Hour)},
},
{
Type: api.NodeOutOfDisk,
Status: api.ConditionUnknown,
Reason: "NodeStatusUnknown",
Message: "Kubelet stopped posting node status.",
LastHeartbeatTime: unversioned.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
LastTransitionTime: unversioned.Time{Time: unversioned.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC).Add(time.Hour)},
},
@ -542,7 +571,7 @@ func TestMonitorNodeStatusUpdateStatus(t *testing.T) {
},
}
for _, item := range table {
for i, item := range table {
nodeController := NewNodeController(nil, item.fakeNodeHandler, 5*time.Minute, util.NewFakeRateLimiter(),
util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false)
nodeController.now = func() unversioned.Time { return fakeNow }
@ -560,8 +589,10 @@ func TestMonitorNodeStatusUpdateStatus(t *testing.T) {
t.Errorf("expected %v call, but got %v.", item.expectedRequestCount, item.fakeNodeHandler.RequestCount)
}
if len(item.fakeNodeHandler.UpdatedNodes) > 0 && !api.Semantic.DeepEqual(item.expectedNodes, item.fakeNodeHandler.UpdatedNodes) {
t.Errorf("expected nodes %+v, got %+v", item.expectedNodes[0],
item.fakeNodeHandler.UpdatedNodes[0])
t.Errorf("Case[%d] unexpected nodes: %s", i, util.ObjectDiff(item.expectedNodes[0], item.fakeNodeHandler.UpdatedNodes[0]))
}
if len(item.fakeNodeHandler.UpdatedNodeStatuses) > 0 && !api.Semantic.DeepEqual(item.expectedNodes, item.fakeNodeHandler.UpdatedNodeStatuses) {
t.Errorf("Case[%d] unexpected nodes: %s", i, util.ObjectDiff(item.expectedNodes[0], item.fakeNodeHandler.UpdatedNodeStatuses[0]))
}
}
}

View File

@ -164,7 +164,7 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String)
ServiceLister: f.ServiceLister,
ControllerLister: f.ControllerLister,
// All fit predicates only need to consider schedulable nodes.
NodeLister: f.NodeLister.NodeCondition(api.NodeReady, api.ConditionTrue),
NodeLister: f.NodeLister.NodeCondition(getNodeConditionPredicate()),
NodeInfo: f.NodeLister,
}
predicateFuncs, err := getFitPredicateFunctions(predicateKeys, pluginArgs)
@ -212,7 +212,7 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String)
return &scheduler.Config{
Modeler: f.modeler,
// The scheduler only needs to consider schedulable nodes.
NodeLister: f.NodeLister.NodeCondition(api.NodeReady, api.ConditionTrue),
NodeLister: f.NodeLister.NodeCondition(getNodeConditionPredicate()),
Algorithm: algo,
Binder: &binder{f.Client},
NextPod: func() *api.Pod {
@ -226,6 +226,23 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String)
}, nil
}
func getNodeConditionPredicate() cache.NodeConditionPredicate {
return func(node api.Node) bool {
for _, cond := range node.Status.Conditions {
// We consider the node for scheduling only when its NodeReady condition status
// is ConditionTrue and its NodeOutOfDisk condition status is ConditionFalse.
if cond.Type == api.NodeReady && cond.Status != api.ConditionTrue {
glog.V(4).Infof("Ignoring node %v with %v condition status %v", node.Name, cond.Type, cond.Status)
return false
} else if cond.Type == api.NodeOutOfDisk && cond.Status != api.ConditionFalse {
glog.V(4).Infof("Ignoring node %v with %v condition status %v", node.Name, cond.Type, cond.Status)
return false
}
}
return true
}
}
// Returns a cache.ListWatch that finds all pods that need to be
// scheduled.
func (factory *ConfigFactory) createUnassignedPodLW() *cache.ListWatch {