diff --git a/cmd/integration/integration.go b/cmd/integration/integration.go
index 08b6eaecb0..936972680e 100644
--- a/cmd/integration/integration.go
+++ b/cmd/integration/integration.go
@@ -206,7 +206,7 @@ func startComponents(manifestURL string) (apiServerURL string) {
 	nodeResources := &api.NodeResources{}
 	nodeController := nodeControllerPkg.NewNodeController(nil, "", machineList, nodeResources, cl, fakeKubeletClient{}, 10, 5*time.Minute)
-	nodeController.Run(5*time.Second, true)
+	nodeController.Run(5*time.Second, true, true)
 
 	// Kubelet (localhost)
 	testRootDir := makeTempDirOrDie("kubelet_integ_1.")
diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go
index 2b201652fb..dc6e295395 100644
--- a/cmd/kube-controller-manager/app/controllermanager.go
+++ b/cmd/kube-controller-manager/app/controllermanager.go
@@ -54,6 +54,7 @@ type CMServer struct {
 	RegisterRetryCount int
 	MachineList        util.StringList
 	SyncNodeList       bool
+	SyncNodeStatus     bool
 	PodEvictionTimeout time.Duration
 
 	// TODO: Discover these by pinging the host machines, and rip out these params.
@@ -75,6 +76,7 @@ func NewCMServer() *CMServer {
 		NodeMilliCPU:   1000,
 		NodeMemory:     resource.MustParse("3Gi"),
 		SyncNodeList:   true,
+		SyncNodeStatus: true,
 		KubeletConfig: client.KubeletConfig{
 			Port:        ports.KubeletPort,
 			EnableHttps: false,
@@ -100,6 +102,7 @@ func (s *CMServer) AddFlags(fs *pflag.FlagSet) {
 		"The number of retries for initial node registration. Retry interval equals node_sync_period.")
 	fs.Var(&s.MachineList, "machines", "List of machines to schedule onto, comma separated.")
 	fs.BoolVar(&s.SyncNodeList, "sync_nodes", s.SyncNodeList, "If true, and --cloud_provider is specified, sync nodes from the cloud provider. Default true.")
+	fs.BoolVar(&s.SyncNodeStatus, "sync_node_status", s.SyncNodeStatus, "If true, node controller sends probes to kubelets and updates NodeStatus.")
 	// TODO: Discover these by pinging the host machines, and rip out these flags.
 	// TODO: in the meantime, use resource.QuantityFlag() instead of these
 	fs.Int64Var(&s.NodeMilliCPU, "node_milli_cpu", s.NodeMilliCPU, "The amount of MilliCPU provisioned on each node")
@@ -158,7 +161,7 @@ func (s *CMServer) Run(_ []string) error {
 	nodeController := nodeControllerPkg.NewNodeController(cloud, s.MinionRegexp, s.MachineList,
 		nodeResources, kubeClient, kubeletClient, s.RegisterRetryCount, s.PodEvictionTimeout)
-	nodeController.Run(s.NodeSyncPeriod, s.SyncNodeList)
+	nodeController.Run(s.NodeSyncPeriod, s.SyncNodeList, s.SyncNodeStatus)
 
 	resourceQuotaManager := resourcequota.NewResourceQuotaManager(kubeClient)
 	resourceQuotaManager.Run(s.ResourceQuotaSyncPeriod)
diff --git a/cmd/kubernetes/kubernetes.go b/cmd/kubernetes/kubernetes.go
index 5528ab2433..307abbb85c 100644
--- a/cmd/kubernetes/kubernetes.go
+++ b/cmd/kubernetes/kubernetes.go
@@ -127,7 +127,7 @@ func runControllerManager(machineList []string, cl *client.Client, nodeMilliCPU,
 	kubeClient := &client.HTTPKubeletClient{Client: http.DefaultClient, Port: ports.KubeletPort}
 
 	nodeController := nodeControllerPkg.NewNodeController(nil, "", machineList, nodeResources, cl, kubeClient, 10, 5*time.Minute)
-	nodeController.Run(10*time.Second, true)
+	nodeController.Run(10*time.Second, true, true)
 
 	endpoints := service.NewEndpointController(cl)
 	go util.Forever(func() { endpoints.SyncServiceEndpoints() }, time.Second*10)
diff --git a/pkg/cloudprovider/controller/nodecontroller.go b/pkg/cloudprovider/controller/nodecontroller.go
index 6771a3ee15..075029c27a 100644
--- a/pkg/cloudprovider/controller/nodecontroller.go
+++ b/pkg/cloudprovider/controller/nodecontroller.go
@@ -77,7 +77,7 @@ func NewNodeController(
 
 // Run creates the initial node list and starts syncing instances from the cloud provider, if any.
 // It also starts syncing cluster node status.
-func (s *NodeController) Run(period time.Duration, syncNodeList bool) {
+func (s *NodeController) Run(period time.Duration, syncNodeList, syncNodeStatus bool) {
 	// Register initial set of nodes with their status set.
 	var nodes *api.NodeList
 	var err error
@@ -96,7 +96,6 @@ func (s *NodeController) Run(period time.Duration, syncNodeList bool) {
 			glog.Errorf("Error loading initial static nodes: %v", err)
 		}
 	}
-	nodes = s.DoChecks(nodes)
 	nodes, err = s.PopulateIPs(nodes)
 	if err != nil {
 		glog.Errorf("Error getting nodes ips: %v", err)
@@ -114,12 +113,21 @@ func (s *NodeController) Run(period time.Duration, syncNodeList bool) {
 		}, period)
 	}
 
-	// Start syncing node status.
-	go util.Forever(func() {
-		if err = s.SyncNodeStatus(); err != nil {
-			glog.Errorf("Error syncing status: %v", err)
-		}
-	}, period)
+	if syncNodeStatus {
+		// Start syncing node status.
+		go util.Forever(func() {
+			if err = s.SyncNodeStatus(); err != nil {
+				glog.Errorf("Error syncing status: %v", err)
+			}
+		}, period)
+	} else {
+		// Start checking node reachability and evicting pods from nodes that have timed out.
+		go util.Forever(func() {
+			if err = s.EvictTimeoutedPods(); err != nil {
+				glog.Errorf("Error evicting timed-out pods: %v", err)
+			}
+		}, period)
+	}
 }
 
 // RegisterNodes registers the given list of nodes; it keeps retrying for `retryCount` times.
@@ -216,6 +224,33 @@ func (s *NodeController) SyncNodeStatus() error {
 	return nil
 }
 
+// EvictTimeoutedPods checks whether nodes are reachable based on the time of their
+// last Ready probe, and deletes pods from nodes that are not reachable.
+func (s *NodeController) EvictTimeoutedPods() error {
+	nodes, err := s.kubeClient.Nodes().List()
+	if err != nil {
+		return err
+	}
+	for _, node := range nodes.Items {
+		if util.Now().After(latestReadyTime(&node).Add(s.podEvictionTimeout)) {
+			s.deletePods(node.Name)
+		}
+	}
+	return nil
+}
+
+func latestReadyTime(node *api.Node) util.Time {
+	readyTime := node.ObjectMeta.CreationTimestamp
+	for _, condition := range node.Status.Conditions {
+		if condition.Type == api.NodeReady &&
+			condition.Status == api.ConditionFull &&
+			condition.LastProbeTime.After(readyTime.Time) {
+			readyTime = condition.LastProbeTime
+		}
+	}
+	return readyTime
+}
+
 // PopulateIPs queries IPs for given list of nodes.
 func (s *NodeController) PopulateIPs(nodes *api.NodeList) (*api.NodeList, error) {
 	if s.isRunningCloudProvider() {
diff --git a/pkg/cloudprovider/controller/nodecontroller_test.go b/pkg/cloudprovider/controller/nodecontroller_test.go
index 1001cdabe7..ffec918881 100644
--- a/pkg/cloudprovider/controller/nodecontroller_test.go
+++ b/pkg/cloudprovider/controller/nodecontroller_test.go
@@ -709,6 +709,118 @@ func TestSyncNodeStatusTransitionTime(t *testing.T) {
 	}
 }
 
+func TestEvictTimeoutedPods(t *testing.T) {
+	table := []struct {
+		fakeNodeHandler      *FakeNodeHandler
+		expectedRequestCount int
+		expectedActions      []client.FakeAction
+	}{
+		// Node created a long time ago, with no status.
+		{
+			fakeNodeHandler: &FakeNodeHandler{
+				Existing: []*api.Node{
+					{
+						ObjectMeta: api.ObjectMeta{
+							Name:              "node0",
+							CreationTimestamp: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
+						},
+					},
+				},
+				Fake: client.Fake{
+					PodsList: api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}},
+				},
+			},
+			expectedRequestCount: 1, // List
+			expectedActions:      []client.FakeAction{{Action: "list-pods"}, {Action: "delete-pod", Value: "pod0"}},
+		},
+		// Node created recently, with no status.
+		{
+			fakeNodeHandler: &FakeNodeHandler{
+				Existing: []*api.Node{
+					{
+						ObjectMeta: api.ObjectMeta{
+							Name:              "node0",
+							CreationTimestamp: util.Now(),
+						},
+					},
+				},
+				Fake: client.Fake{
+					PodsList: api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}},
+				},
+			},
+			expectedRequestCount: 1, // List
+			expectedActions:      nil,
+		},
+		// Node created a long time ago, with status updated a long time ago.
+		{
+			fakeNodeHandler: &FakeNodeHandler{
+				Existing: []*api.Node{
+					{
+						ObjectMeta: api.ObjectMeta{
+							Name:              "node0",
+							CreationTimestamp: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
+						},
+						Status: api.NodeStatus{
+							Conditions: []api.NodeCondition{
+								{
+									Type:          api.NodeReady,
+									Status:        api.ConditionFull,
+									LastProbeTime: util.Date(2013, 1, 1, 0, 0, 0, 0, time.UTC),
+								},
+							},
+						},
+					},
+				},
+				Fake: client.Fake{
+					PodsList: api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}},
+				},
+			},
+			expectedRequestCount: 1, // List
+			expectedActions:      []client.FakeAction{{Action: "list-pods"}, {Action: "delete-pod", Value: "pod0"}},
+		},
+		// Node created a long time ago, with status updated recently.
+		{
+			fakeNodeHandler: &FakeNodeHandler{
+				Existing: []*api.Node{
+					{
+						ObjectMeta: api.ObjectMeta{
+							Name:              "node0",
+							CreationTimestamp: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
+						},
+						Status: api.NodeStatus{
+							Conditions: []api.NodeCondition{
+								{
+									Type:          api.NodeReady,
+									Status:        api.ConditionFull,
+									LastProbeTime: util.Now(),
+								},
+							},
+						},
+					},
+				},
+				Fake: client.Fake{
+					PodsList: api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}},
+				},
+			},
+			expectedRequestCount: 1, // List
+			expectedActions:      nil,
+		},
+	}
+
+	for _, item := range table {
+		nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, nil, 10, 5*time.Minute)
+		if err := nodeController.EvictTimeoutedPods(); err != nil {
+			t.Errorf("unexpected error: %v", err)
+		}
+		if item.expectedRequestCount != item.fakeNodeHandler.RequestCount {
+			t.Errorf("expected %v calls, but got %v.", item.expectedRequestCount, item.fakeNodeHandler.RequestCount)
+		}
+		if !reflect.DeepEqual(item.expectedActions, item.fakeNodeHandler.Actions) {
+			t.Errorf("actions differ: expected %+v, got %+v", item.expectedActions, item.fakeNodeHandler.Actions)
+		}
+	}
+}
+
 func TestSyncNodeStatusDeletePods(t *testing.T) {
 	table := []struct {
 		fakeNodeHandler *FakeNodeHandler
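
For reference, here is a minimal, self-contained sketch of the eviction decision that EvictTimeoutedPods and latestReadyTime implement above. The node and condition types are simplified stand-ins, not the real api.Node and api.NodeCondition, and the 5-minute timeout merely mirrors the pod eviction timeout passed by the callers in this patch:

// Standalone sketch of the timeout-based eviction check (assumed,
// simplified types; not the real Kubernetes API objects).
package main

import (
	"fmt"
	"time"
)

type condition struct {
	typ           string // e.g. "Ready"
	status        string // e.g. "Full"
	lastProbeTime time.Time
}

type node struct {
	name              string
	creationTimestamp time.Time
	conditions        []condition
}

// latestReadyTime returns the most recent time the node was known to be
// ready, falling back to its creation timestamp when no Ready condition
// carries a newer probe time.
func latestReadyTime(n *node) time.Time {
	readyTime := n.creationTimestamp
	for _, c := range n.conditions {
		if c.typ == "Ready" && c.status == "Full" && c.lastProbeTime.After(readyTime) {
			readyTime = c.lastProbeTime
		}
	}
	return readyTime
}

// shouldEvict reports whether the node has gone unprobed for longer than
// the pod eviction timeout, i.e. whether its pods should be deleted.
func shouldEvict(n *node, timeout time.Duration, now time.Time) bool {
	return now.After(latestReadyTime(n).Add(timeout))
}

func main() {
	now := time.Now()
	timeout := 5 * time.Minute // mirrors the 5*time.Minute used by the callers

	stale := &node{
		name:              "node0",
		creationTimestamp: now.Add(-24 * time.Hour),
		conditions: []condition{
			{typ: "Ready", status: "Full", lastProbeTime: now.Add(-10 * time.Minute)},
		},
	}
	fresh := &node{
		name:              "node1",
		creationTimestamp: now.Add(-24 * time.Hour),
		conditions: []condition{
			{typ: "Ready", status: "Full", lastProbeTime: now},
		},
	}

	fmt.Println(stale.name, "evict:", shouldEvict(stale, timeout, now)) // true
	fmt.Println(fresh.name, "evict:", shouldEvict(fresh, timeout, now)) // false
}

Falling back to the creation timestamp means a node that has never reported Ready still gets a full eviction window before its pods are deleted, which is exactly the behavior exercised by the first two cases in TestEvictTimeoutedPods.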