Remove pods from failed node

pull/6/head
Deyuan Deng 2015-02-07 14:53:42 -05:00
parent f5bc43a46c
commit 55b9944cfe
9 changed files with 363 additions and 75 deletions


@ -202,7 +202,8 @@ func startComponents(manifestURL string) (apiServerURL string) {
controllerManager.Run(10 * time.Minute)
nodeResources := &api.NodeResources{}
nodeController := nodeControllerPkg.NewNodeController(nil, "", machineList, nodeResources, cl, fakeKubeletClient{})
nodeController := nodeControllerPkg.NewNodeController(nil, "", machineList, nodeResources, cl, fakeKubeletClient{}, 5*time.Minute)
nodeController.Run(5*time.Second, 10, true)
// Kubelet (localhost)


@ -123,7 +123,8 @@ func runControllerManager(machineList []string, cl *client.Client, nodeMilliCPU,
},
}
kubeClient := &client.HTTPKubeletClient{Client: http.DefaultClient, Port: ports.KubeletPort}
nodeController := nodeControllerPkg.NewNodeController(nil, "", machineList, nodeResources, cl, kubeClient)
nodeController := nodeControllerPkg.NewNodeController(nil, "", machineList, nodeResources, cl, kubeClient, 5*time.Minute)
nodeController.Run(10*time.Second, 10, true)
endpoints := service.NewEndpointController(cl)


@ -774,6 +774,7 @@ const (
type NodeCondition struct {
Kind NodeConditionKind `json:"kind"`
Status NodeConditionStatus `json:"status"`
LastProbeTime util.Time `json:"lastProbeTime,omitempty"`
LastTransitionTime util.Time `json:"lastTransitionTime,omitempty"`
Reason string `json:"reason,omitempty"`
Message string `json:"message,omitempty"`

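The semantic split between the two timestamps added above is easy to miss when skimming the diff: LastProbeTime is refreshed on every health check, while LastTransitionTime only moves when the condition's Status actually changes. A minimal, self-contained sketch of that bookkeeping, using simplified stand-in types rather than the real api and util packages, might look like this:

package main

import (
	"fmt"
	"time"
)

// Simplified stand-ins for api.NodeConditionStatus and api.NodeCondition; the real
// types live in pkg/api and carry the extra Kind, Reason and Message fields.
type ConditionStatus string

type NodeCondition struct {
	Status             ConditionStatus
	LastProbeTime      time.Time
	LastTransitionTime time.Time
}

// updateCondition mirrors the bookkeeping in DoCheck: the probe time advances on
// every check, while the transition time only moves when Status flips.
func updateCondition(old *NodeCondition, status ConditionStatus, now time.Time) NodeCondition {
	cond := NodeCondition{Status: status, LastProbeTime: now}
	if old != nil && old.Status == status {
		cond.LastTransitionTime = old.LastTransitionTime
	} else {
		cond.LastTransitionTime = now
	}
	return cond
}

func main() {
	t0 := time.Date(2015, 2, 7, 12, 0, 0, 0, time.UTC)
	c := updateCondition(nil, "Full", t0)                   // first probe: both timestamps are t0
	c = updateCondition(&c, "Full", t0.Add(10*time.Second)) // same status: only the probe time moves
	fmt.Println(c.LastProbeTime, c.LastTransitionTime)
	c = updateCondition(&c, "None", t0.Add(20*time.Second)) // status flips: both timestamps move
	fmt.Println(c.LastProbeTime, c.LastTransitionTime)
}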

@ -619,6 +619,7 @@ const (
type NodeCondition struct {
Kind NodeConditionKind `json:"kind" description:"kind of the condition, one of reachable, ready"`
Status NodeConditionStatus `json:"status" description:"status of the condition, one of full, none, unknown"`
LastProbeTime util.Time `json:"lastProbeTime,omitempty" description:"last time the condition was probed"`
LastTransitionTime util.Time `json:"lastTransitionTime,omitempty" description:"last time the condition transitioned from one status to another"`
Reason string `json:"reason,omitempty" description:"(brief) reason for the condition's last transition"`
Message string `json:"message,omitempty" description:"human readable message indicating details about last transition"`


@ -583,6 +583,7 @@ const (
type NodeCondition struct {
Kind NodeConditionKind `json:"kind" description:"kind of the condition, one of reachable, ready"`
Status NodeConditionStatus `json:"status" description:"status of the condition, one of full, none, unknown"`
LastProbeTime util.Time `json:"lastProbeTime,omitempty" description:"last time the condition was probed"`
LastTransitionTime util.Time `json:"lastTransitionTime,omitempty" description:"last time the condition transitioned from one status to another"`
Reason string `json:"reason,omitempty" description:"(brief) reason for the condition's last transition"`
Message string `json:"message,omitempty" description:"human readable message indicating details about last transition"`


@ -808,6 +808,7 @@ const (
type NodeCondition struct {
Kind NodeConditionKind `json:"kind"`
Status NodeConditionStatus `json:"status"`
LastProbeTime util.Time `json:"lastProbeTime,omitempty"`
LastTransitionTime util.Time `json:"lastTransitionTime,omitempty"`
Reason string `json:"reason,omitempty"`
Message string `json:"message,omitempty"`


@ -20,7 +20,6 @@ import (
"errors"
"fmt"
"net"
"reflect"
"strings"
"sync"
"time"
@ -29,6 +28,7 @@ import (
apierrors "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/probe"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/golang/glog"
@ -41,12 +41,13 @@ var (
)
type NodeController struct {
cloud cloudprovider.Interface
matchRE string
staticResources *api.NodeResources
nodes []string
kubeClient client.Interface
kubeletClient client.KubeletHealthChecker
cloud cloudprovider.Interface
matchRE string
staticResources *api.NodeResources
nodes []string
kubeClient client.Interface
kubeletClient client.KubeletHealthChecker
podEvictionTimeout time.Duration
}
// NewNodeController returns a new node controller to sync instances from cloudprovider.
@ -58,14 +59,16 @@ func NewNodeController(
nodes []string,
staticResources *api.NodeResources,
kubeClient client.Interface,
kubeletClient client.KubeletHealthChecker) *NodeController {
kubeletClient client.KubeletHealthChecker,
podEvictionTimeout time.Duration) *NodeController {
return &NodeController{
cloud: cloud,
matchRE: matchRE,
nodes: nodes,
staticResources: staticResources,
kubeClient: kubeClient,
kubeletClient: kubeletClient,
cloud: cloud,
matchRE: matchRE,
nodes: nodes,
staticResources: staticResources,
kubeClient: kubeClient,
kubeletClient: kubeletClient,
podEvictionTimeout: podEvictionTimeout,
}
}
@ -180,6 +183,7 @@ func (s *NodeController) SyncCloud() error {
if err != nil {
glog.Errorf("Delete node error: %s", nodeID)
}
s.deletePods(nodeID)
}
return nil
@ -191,20 +195,15 @@ func (s *NodeController) SyncNodeStatus() error {
if err != nil {
return err
}
oldNodes := make(map[string]api.Node)
for _, node := range nodes.Items {
oldNodes[node.Name] = node
}
nodes = s.DoChecks(nodes)
nodes, err = s.PopulateIPs(nodes)
if err != nil {
return err
}
for _, node := range nodes.Items {
if reflect.DeepEqual(node, oldNodes[node.Name]) {
glog.V(2).Infof("skip updating node %v", node.Name)
continue
}
// We used to skip updating the node when its status did not change. That optimization is no longer
// useful now that per-probe status fields such as 'LastProbeTime' have been introduced, since they
// differ on every iteration of the sync loop.
glog.V(2).Infof("updating node %v", node.Name)
_, err = s.kubeClient.Nodes().Update(&node)
if err != nil {
@ -273,40 +272,78 @@ func (s *NodeController) DoCheck(node *api.Node) []api.NodeCondition {
oldReadyCondition := s.getCondition(node, api.NodeReady)
newReadyCondition := s.checkNodeReady(node)
if oldReadyCondition != nil && oldReadyCondition.Status == newReadyCondition.Status {
// If node status doesn't change, transition time is same as last time.
newReadyCondition.LastTransitionTime = oldReadyCondition.LastTransitionTime
} else {
// Set transition time to Now() if node status changes or `oldReadyCondition` is nil, which
// happens only when the node is checked for the first time.
newReadyCondition.LastTransitionTime = util.Now()
}
if newReadyCondition.Status != api.ConditionFull {
// The node is not ready for this probe; check whether its pods need to be deleted.
if newReadyCondition.LastProbeTime.After(newReadyCondition.LastTransitionTime.Add(s.podEvictionTimeout)) {
// As long as the node stays failed, we keep calling deletePods to delete all of its pods. The node
// controller sync is not a closed-loop process and there is no feedback from other components about
// pod status, so repeatedly listing pods and sanity-checking that they are all deleted makes more sense.
s.deletePods(node.Name)
}
}
conditions = append(conditions, *newReadyCondition)
return conditions
}
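Concretely, the eviction rule introduced in DoCheck is "the node has been continuously not-ready for longer than podEvictionTimeout", evaluated as the latest probe time falling after the last transition time plus the timeout. A rough, self-contained illustration of that predicate (shouldEvict is a hypothetical helper for this sketch, not part of the change):

package main

import (
	"fmt"
	"time"
)

// shouldEvict illustrates the check above: evict only when the most recent probe is
// more than podEvictionTimeout past the last transition into the not-ready state.
func shouldEvict(lastTransition, lastProbe time.Time, podEvictionTimeout time.Duration) bool {
	return lastProbe.After(lastTransition.Add(podEvictionTimeout))
}

func main() {
	transition := time.Date(2015, 2, 7, 12, 0, 0, 0, time.UTC)
	timeout := 5 * time.Minute

	// Probed one minute after the node went not-ready: still inside the grace period.
	fmt.Println(shouldEvict(transition, transition.Add(1*time.Minute), timeout)) // false

	// Probed six minutes after the transition: past the grace period, pods get deleted.
	fmt.Println(shouldEvict(transition, transition.Add(6*time.Minute), timeout)) // true
}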
// checkNodeReady checks raw node ready condition, without timestamp set.
// checkNodeReady checks raw node ready condition, without transition timestamp set.
func (s *NodeController) checkNodeReady(node *api.Node) *api.NodeCondition {
switch status, err := s.kubeletClient.HealthCheck(node.Name); {
case err != nil:
glog.V(2).Infof("NodeController: node %s health check error: %v", node.Name, err)
return &api.NodeCondition{
Kind: api.NodeReady,
Status: api.ConditionUnknown,
Reason: fmt.Sprintf("Node health check error: %v", err),
Kind: api.NodeReady,
Status: api.ConditionUnknown,
Reason: fmt.Sprintf("Node health check error: %v", err),
LastProbeTime: util.Now(),
}
case status == probe.Failure:
return &api.NodeCondition{
Kind: api.NodeReady,
Status: api.ConditionNone,
Reason: fmt.Sprintf("Node health check failed: kubelet /healthz endpoint returns not ok"),
Kind: api.NodeReady,
Status: api.ConditionNone,
Reason: fmt.Sprintf("Node health check failed: kubelet /healthz endpoint returns not ok"),
LastProbeTime: util.Now(),
}
default:
return &api.NodeCondition{
Kind: api.NodeReady,
Status: api.ConditionFull,
Reason: fmt.Sprintf("Node health check succeeded: kubelet /healthz endpoint returns ok"),
Kind: api.NodeReady,
Status: api.ConditionFull,
Reason: fmt.Sprintf("Node health check succeeded: kubelet /healthz endpoint returns ok"),
LastProbeTime: util.Now(),
}
}
}
// deletePods deletes from the master all pods that are running on the given node.
func (s *NodeController) deletePods(nodeID string) error {
glog.V(2).Infof("Delete all pods from %v", nodeID)
// TODO: We don't yet have field selectors from client, see issue #1362.
pods, err := s.kubeClient.Pods(api.NamespaceAll).List(labels.Set{}.AsSelector())
if err != nil {
return err
}
for _, pod := range pods.Items {
if pod.Status.Host != nodeID {
continue
}
glog.V(2).Infof("Delete pod %v", pod.Name)
if err := s.kubeClient.Pods(api.NamespaceAll).Delete(pod.Name); err != nil {
glog.Errorf("Error deleting pod %v", pod.Name)
}
}
return nil
}
// StaticNodes constructs and returns api.NodeList for static nodes. If error
// occurs, an empty NodeList will be returned with a non-nil error info.
func (s *NodeController) StaticNodes() (*api.NodeList, error) {


@ -33,10 +33,12 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
)
// FakeNodeHandler is a fake implementation of NodesInterface and NodeInterface.
// FakeNodeHandler is a fake implementation of NodesInterface and NodeInterface. It
// allows test cases to have fine-grained control over mock behaviors. We also need
// PodsInterface and PodInterface to test listing & deleting pods, which are implemented in
// the embedded client.Fake field.
type FakeNodeHandler struct {
client.Fake
client.FakeNodes
// Input: Hooks determine if request is valid or not
CreateHook func(*FakeNodeHandler, *api.Node) bool
@ -69,6 +71,10 @@ func (m *FakeNodeHandler) Create(node *api.Node) (*api.Node, error) {
}
}
func (m *FakeNodeHandler) Get(name string) (*api.Node, error) {
return nil, nil
}
func (m *FakeNodeHandler) List() (*api.NodeList, error) {
defer func() { m.RequestCount++ }()
var nodes []*api.Node
@ -224,7 +230,7 @@ func TestRegisterNodes(t *testing.T) {
for _, machine := range item.machines {
nodes.Items = append(nodes.Items, *newNode(machine))
}
nodeController := NewNodeController(nil, "", item.machines, &api.NodeResources{}, item.fakeNodeHandler, nil)
nodeController := NewNodeController(nil, "", item.machines, &api.NodeResources{}, item.fakeNodeHandler, nil, time.Minute)
err := nodeController.RegisterNodes(&nodes, item.retryCount, time.Millisecond)
if !item.expectedFail && err != nil {
t.Errorf("unexpected error: %v", err)
@ -265,7 +271,7 @@ func TestCreateStaticNodes(t *testing.T) {
}
for _, item := range table {
nodeController := NewNodeController(nil, "", item.machines, &api.NodeResources{}, nil, nil)
nodeController := NewNodeController(nil, "", item.machines, &api.NodeResources{}, nil, nil, time.Minute)
nodes, err := nodeController.StaticNodes()
if err != nil {
t.Errorf("unexpected error: %v", err)
@ -308,7 +314,7 @@ func TestCreateCloudNodes(t *testing.T) {
}
for _, item := range table {
nodeController := NewNodeController(item.fakeCloud, ".*", nil, &api.NodeResources{}, nil, nil)
nodeController := NewNodeController(item.fakeCloud, ".*", nil, &api.NodeResources{}, nil, nil, time.Minute)
nodes, err := nodeController.CloudNodes()
if err != nil {
t.Errorf("unexpected error: %v", err)
@ -367,7 +373,7 @@ func TestSyncCloud(t *testing.T) {
}
for _, item := range table {
nodeController := NewNodeController(item.fakeCloud, item.matchRE, nil, &api.NodeResources{}, item.fakeNodeHandler, nil)
nodeController := NewNodeController(item.fakeCloud, item.matchRE, nil, &api.NodeResources{}, item.fakeNodeHandler, nil, time.Minute)
if err := nodeController.SyncCloud(); err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -385,6 +391,83 @@ func TestSyncCloud(t *testing.T) {
}
}
func TestSyncCloudDeletePods(t *testing.T) {
table := []struct {
fakeNodeHandler *FakeNodeHandler
fakeCloud *fake_cloud.FakeCloud
matchRE string
expectedRequestCount int
expectedDeleted []string
expectedActions []client.FakeAction
}{
{
// No node to delete: do nothing.
fakeNodeHandler: &FakeNodeHandler{
Existing: []*api.Node{newNode("node0"), newNode("node1")},
Fake: client.Fake{
PodsList: api.PodList{Items: []api.Pod{*newPod("pod0", "node0"), *newPod("pod1", "node1")}},
},
},
fakeCloud: &fake_cloud.FakeCloud{
Machines: []string{"node0", "node1"},
},
matchRE: ".*",
expectedRequestCount: 1, // List
expectedDeleted: []string{},
expectedActions: nil,
},
{
// Delete node1, and pod0 is running on it.
fakeNodeHandler: &FakeNodeHandler{
Existing: []*api.Node{newNode("node0"), newNode("node1")},
Fake: client.Fake{
PodsList: api.PodList{Items: []api.Pod{*newPod("pod0", "node1")}},
},
},
fakeCloud: &fake_cloud.FakeCloud{
Machines: []string{"node0"},
},
matchRE: ".*",
expectedRequestCount: 2, // List + Delete
expectedDeleted: []string{"node1"},
expectedActions: []client.FakeAction{{Action: "list-pods"}, {Action: "delete-pod", Value: "pod0"}},
},
{
// Delete node1, but pod0 is running on node0.
fakeNodeHandler: &FakeNodeHandler{
Existing: []*api.Node{newNode("node0"), newNode("node1")},
Fake: client.Fake{
PodsList: api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}},
},
},
fakeCloud: &fake_cloud.FakeCloud{
Machines: []string{"node0"},
},
matchRE: ".*",
expectedRequestCount: 2, // List + Delete
expectedDeleted: []string{"node1"},
expectedActions: []client.FakeAction{{Action: "list-pods"}},
},
}
for _, item := range table {
nodeController := NewNodeController(item.fakeCloud, item.matchRE, nil, &api.NodeResources{}, item.fakeNodeHandler, nil, time.Minute)
if err := nodeController.SyncCloud(); err != nil {
t.Errorf("unexpected error: %v", err)
}
if item.fakeNodeHandler.RequestCount != item.expectedRequestCount {
t.Errorf("expected %v call, but got %v.", item.expectedRequestCount, item.fakeNodeHandler.RequestCount)
}
nodes := sortedNodeNames(item.fakeNodeHandler.DeletedNodes)
if !reflect.DeepEqual(item.expectedDeleted, nodes) {
t.Errorf("expected node list %+v, got %+v", item.expectedDeleted, nodes)
}
if !reflect.DeepEqual(item.expectedActions, item.fakeNodeHandler.Actions) {
t.Errorf("time out waiting for deleting pods, expected %+v, got %+v", item.expectedActions, item.fakeNodeHandler.Actions)
}
}
}
func TestHealthCheckNode(t *testing.T) {
table := []struct {
node *api.Node
@ -436,13 +519,17 @@ func TestHealthCheckNode(t *testing.T) {
}
for _, item := range table {
nodeController := NewNodeController(nil, "", nil, nil, nil, item.fakeKubeletClient)
nodeController := NewNodeController(nil, "", nil, nil, nil, item.fakeKubeletClient, time.Minute)
conditions := nodeController.DoCheck(item.node)
for i := range conditions {
if conditions[i].LastTransitionTime.IsZero() {
t.Errorf("unexpected zero timestamp")
t.Errorf("unexpected zero last transition timestamp")
}
if conditions[i].LastProbeTime.IsZero() {
t.Errorf("unexpected zero last probe timestamp")
}
conditions[i].LastTransitionTime = util.Time{}
conditions[i].LastProbeTime = util.Time{}
}
if !reflect.DeepEqual(item.expectedConditions, conditions) {
t.Errorf("expected conditions %+v, got %+v", item.expectedConditions, conditions)
@ -470,7 +557,7 @@ func TestPopulateNodeIPs(t *testing.T) {
}
for _, item := range table {
nodeController := NewNodeController(item.fakeCloud, ".*", nil, nil, nil, nil)
nodeController := NewNodeController(item.fakeCloud, ".*", nil, nil, nil, nil, time.Minute)
result, err := nodeController.PopulateIPs(item.nodes)
// In case of IP querying error, we should continue.
if err != nil {
@ -484,14 +571,15 @@ func TestPopulateNodeIPs(t *testing.T) {
}
}
func TestNodeStatusTransitionTime(t *testing.T) {
func TestSyncNodeStatusTransitionTime(t *testing.T) {
table := []struct {
fakeNodeHandler *FakeNodeHandler
fakeKubeletClient *FakeKubeletClient
expectedNodes []*api.Node
expectedRequestCount int
fakeNodeHandler *FakeNodeHandler
fakeKubeletClient *FakeKubeletClient
expectedRequestCount int
expectedTransitionTimeChange bool
}{
{
// Existing node is healthy, and the current probe is healthy too.
fakeNodeHandler: &FakeNodeHandler{
Existing: []*api.Node{
{
@ -513,10 +601,11 @@ func TestNodeStatusTransitionTime(t *testing.T) {
Status: probe.Success,
Err: nil,
},
expectedNodes: []*api.Node{},
expectedRequestCount: 1,
expectedRequestCount: 2, // List+Update
expectedTransitionTimeChange: false,
},
{
// Existing node is healthy, but the current probe is unhealthy.
fakeNodeHandler: &FakeNodeHandler{
Existing: []*api.Node{
{
@ -538,27 +627,13 @@ func TestNodeStatusTransitionTime(t *testing.T) {
Status: probe.Failure,
Err: nil,
},
expectedNodes: []*api.Node{
{
ObjectMeta: api.ObjectMeta{Name: "node0"},
Status: api.NodeStatus{
Conditions: []api.NodeCondition{
{
Kind: api.NodeReady,
Status: api.ConditionFull,
Reason: "Node health check failed: kubelet /healthz endpoint returns not ok",
LastTransitionTime: util.Now(), // Placeholder expected transition time, due to inability to mock time.
},
},
},
},
},
expectedRequestCount: 2,
expectedRequestCount: 2, // List+Update
expectedTransitionTimeChange: true,
},
}
for _, item := range table {
nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, item.fakeKubeletClient)
nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, item.fakeKubeletClient, time.Minute)
if err := nodeController.SyncNodeStatus(); err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -568,14 +643,168 @@ func TestNodeStatusTransitionTime(t *testing.T) {
for i := range item.fakeNodeHandler.UpdatedNodes {
conditions := item.fakeNodeHandler.UpdatedNodes[i].Status.Conditions
for j := range conditions {
if !conditions[j].LastTransitionTime.After(time.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC)) {
t.Errorf("unexpected timestamp %v", conditions[j].LastTransitionTime)
condition := conditions[j]
if item.expectedTransitionTimeChange {
if !condition.LastTransitionTime.After(time.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC)) {
t.Errorf("unexpected last transition timestamp %v", condition.LastTransitionTime)
}
} else {
if !condition.LastTransitionTime.Equal(time.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC)) {
t.Errorf("unexpected last transition timestamp %v", condition.LastTransitionTime)
}
}
}
}
}
}
func TestSyncNodeStatusDeletePods(t *testing.T) {
table := []struct {
fakeNodeHandler *FakeNodeHandler
fakeKubeletClient *FakeKubeletClient
expectedRequestCount int
expectedActions []client.FakeAction
}{
{
// Existing node is healthy, and the current probe is healthy too.
fakeNodeHandler: &FakeNodeHandler{
Existing: []*api.Node{
{
ObjectMeta: api.ObjectMeta{Name: "node0"},
Status: api.NodeStatus{
Conditions: []api.NodeCondition{
{
Kind: api.NodeReady,
Status: api.ConditionFull,
Reason: "Node health check succeeded: kubelet /healthz endpoint returns ok",
LastTransitionTime: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
},
},
},
},
},
Fake: client.Fake{
PodsList: api.PodList{Items: []api.Pod{*newPod("pod0", "node1")}},
},
},
fakeKubeletClient: &FakeKubeletClient{
Status: probe.Success,
Err: nil,
},
expectedRequestCount: 2, // List+Update
expectedActions: nil,
},
{
// Existing node is healthy, but the current probe is unhealthy, i.e. the node has just become unhealthy.
// Do not delete pods.
fakeNodeHandler: &FakeNodeHandler{
Existing: []*api.Node{
{
ObjectMeta: api.ObjectMeta{Name: "node0"},
Status: api.NodeStatus{
Conditions: []api.NodeCondition{
{
Kind: api.NodeReady,
Status: api.ConditionFull,
Reason: "Node health check succeeded: kubelet /healthz endpoint returns ok",
LastTransitionTime: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
},
},
},
},
},
Fake: client.Fake{
PodsList: api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}},
},
},
fakeKubeletClient: &FakeKubeletClient{
Status: probe.Failure,
Err: nil,
},
expectedRequestCount: 2, // List+Update
expectedActions: nil,
},
{
// Existing node is unhealthy and the current probe is unhealthy. The node is still within the grace period.
fakeNodeHandler: &FakeNodeHandler{
Existing: []*api.Node{
{
ObjectMeta: api.ObjectMeta{Name: "node0"},
Status: api.NodeStatus{
Conditions: []api.NodeCondition{
{
Kind: api.NodeReady,
Status: api.ConditionNone,
Reason: "Node health check failed: kubelet /healthz endpoint returns not ok",
// Here, the last transition time is Now(). In the node controller, the new condition's probe time is
// also Now(). The two calls to Now() yield different times due to test execution, but the
// difference is within 5 minutes, which is the grace period.
LastTransitionTime: util.Now(),
},
},
},
},
},
Fake: client.Fake{
PodsList: api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}},
},
},
fakeKubeletClient: &FakeKubeletClient{
Status: probe.Failure,
Err: nil,
},
expectedRequestCount: 2, // List+Update
expectedActions: nil,
},
{
// Existing node is unhealthy and the current probe is unhealthy. The node exceeds the grace period.
fakeNodeHandler: &FakeNodeHandler{
Existing: []*api.Node{
{
ObjectMeta: api.ObjectMeta{Name: "node0"},
Status: api.NodeStatus{
Conditions: []api.NodeCondition{
{
Kind: api.NodeReady,
Status: api.ConditionNone,
Reason: "Node health check failed: kubelet /healthz endpoint returns not ok",
// Here, the last transition time is in the past, and in the node controller the
// new condition's probe time is Now(). The time difference is larger than
// 5 minutes. The test will fail if the system clock is wrong, but we don't yet
// have a way to mock time in our tests.
LastTransitionTime: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
},
},
},
},
},
Fake: client.Fake{
PodsList: api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}},
},
},
fakeKubeletClient: &FakeKubeletClient{
Status: probe.Failure,
Err: nil,
},
expectedRequestCount: 2, // List+Update
expectedActions: []client.FakeAction{{Action: "list-pods"}, {Action: "delete-pod", Value: "pod0"}},
},
}
for _, item := range table {
nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, item.fakeKubeletClient, 5*time.Minute)
if err := nodeController.SyncNodeStatus(); err != nil {
t.Errorf("unexpected error: %v", err)
}
if item.expectedRequestCount != item.fakeNodeHandler.RequestCount {
t.Errorf("expected %v call, but got %v.", item.expectedRequestCount, item.fakeNodeHandler.RequestCount)
}
if !reflect.DeepEqual(item.expectedActions, item.fakeNodeHandler.Actions) {
t.Errorf("time out waiting for deleting pods, expected %+v, got %+v", item.expectedActions, item.fakeNodeHandler.Actions)
}
}
}
func TestSyncNodeStatus(t *testing.T) {
table := []struct {
fakeNodeHandler *FakeNodeHandler
@ -628,7 +857,7 @@ func TestSyncNodeStatus(t *testing.T) {
}
for _, item := range table {
nodeController := NewNodeController(item.fakeCloud, ".*", nil, nil, item.fakeNodeHandler, item.fakeKubeletClient)
nodeController := NewNodeController(item.fakeCloud, ".*", nil, nil, item.fakeNodeHandler, item.fakeKubeletClient, time.Minute)
if err := nodeController.SyncNodeStatus(); err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -639,20 +868,25 @@ func TestSyncNodeStatus(t *testing.T) {
conditions := item.fakeNodeHandler.UpdatedNodes[i].Status.Conditions
for j := range conditions {
if conditions[j].LastTransitionTime.IsZero() {
t.Errorf("unexpected zero timestamp")
t.Errorf("unexpected zero last transition timestamp")
}
if conditions[j].LastProbeTime.IsZero() {
t.Errorf("unexpected zero last probe timestamp")
}
conditions[j].LastTransitionTime = util.Time{}
conditions[j].LastProbeTime = util.Time{}
}
}
if !reflect.DeepEqual(item.expectedNodes, item.fakeNodeHandler.UpdatedNodes) {
t.Errorf("expected nodes %+v, got %+v", item.expectedNodes[0], item.fakeNodeHandler.UpdatedNodes[0])
}
// Second sync will also update the node.
item.fakeNodeHandler.RequestCount = 0
if err := nodeController.SyncNodeStatus(); err != nil {
t.Errorf("unexpected error: %v", err)
}
if item.fakeNodeHandler.RequestCount != 1 {
t.Errorf("expected one list for updating same status, but got %v.", item.fakeNodeHandler.RequestCount)
if item.fakeNodeHandler.RequestCount != item.expectedRequestCount {
t.Errorf("expected %v call, but got %v.", item.expectedRequestCount, item.fakeNodeHandler.RequestCount)
}
}
}
@ -661,6 +895,10 @@ func newNode(name string) *api.Node {
return &api.Node{ObjectMeta: api.ObjectMeta{Name: name}}
}
func newPod(name, host string) *api.Pod {
return &api.Pod{ObjectMeta: api.ObjectMeta{Name: name}, Status: api.PodStatus{Host: host}}
}
func sortedNodeNames(nodes []*api.Node) []string {
nodeNames := []string{}
for _, node := range nodes {


@ -54,7 +54,11 @@ type CMServer struct {
ResourceQuotaSyncPeriod time.Duration
RegisterRetryCount int
MachineList util.StringList
SyncNodeList bool
PodEvictionTimeout time.Duration
// TODO: Discover these by pinging the host machines, and rip out these params.
NodeMilliCPU int64
@ -63,7 +67,7 @@ type CMServer struct {
KubeletConfig client.KubeletConfig
}
// NewCMServer creates a new CMServer with default a default config.
// NewCMServer creates a new CMServer with a default config.
func NewCMServer() *CMServer {
s := CMServer{
Port: ports.ControllerManagerPort,
@ -71,6 +75,7 @@ func NewCMServer() *CMServer {
NodeSyncPeriod: 10 * time.Second,
ResourceQuotaSyncPeriod: 10 * time.Second,
RegisterRetryCount: 10,
PodEvictionTimeout: 5 * time.Minute,
NodeMilliCPU: 1000,
NodeMemory: resource.MustParse("3Gi"),
SyncNodeList: true,
@ -110,6 +115,7 @@ func (s *CMServer) AddFlags(fs *pflag.FlagSet) {
"The period for syncing nodes from cloudprovider. Longer periods will result in "+
"fewer calls to cloud provider, but may delay addition of new nodes to cluster.")
fs.DurationVar(&s.ResourceQuotaSyncPeriod, "resource_quota_sync_period", s.ResourceQuotaSyncPeriod, "The period for syncing quota usage status in the system")
fs.DurationVar(&s.PodEvictionTimeout, "pod_eviction_timeout", s.PodEvictionTimeout, "The grace period for deleting pods on failed nodes.")
fs.IntVar(&s.RegisterRetryCount, "register_retry_count", s.RegisterRetryCount, ""+
"The number of retries for initial node registration. Retry interval equals node_sync_period.")
fs.Var(&s.MachineList, "machines", "List of machines to schedule onto, comma separated.")
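As a usage note, the 5 minute default can be overridden at startup through the new flag; the binary name and value below are assumptions for illustration and are not part of this diff:

kube-controller-manager --pod_eviction_timeout=10m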
@ -169,7 +175,8 @@ func (s *CMServer) Run(_ []string) error {
api.ResourceMemory: s.NodeMemory,
},
}
nodeController := nodeControllerPkg.NewNodeController(cloud, s.MinionRegexp, s.MachineList, nodeResources, kubeClient, kubeletClient)
nodeController := nodeControllerPkg.NewNodeController(cloud, s.MinionRegexp, s.MachineList, nodeResources, kubeClient, kubeletClient, s.PodEvictionTimeout)
nodeController.Run(s.NodeSyncPeriod, s.RegisterRetryCount, s.SyncNodeList)
resourceQuotaManager := resourcequota.NewResourceQuotaManager(kubeClient)