Merge pull request #57492 from cheftako/node-controller

Automatic merge from submit-queue. If you want to cherry-pick this change to another branch, please follow the instructions [here](https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md).

Split the NodeController into lifecycle and ipam pieces.

**What this PR does / why we need it**: Separates the node controller into IPAM and lifecycle components.

    Preparatory work for removing the cloud provider dependency from the node
    controller running in the Kube Controller Manager. This splits the node
    controller into its two major pieces: lifecycle and CIDR/IP management.
    Both pieces currently need the cloud system to do their work. Removing the
    lifecycle controller's dependency on the cloud will be addressed in a follow-up PR.
    
    Moved the node scheduler code to live with the node lifecycle controller.
    Completed the IPAM/lifecycle split; some pieces still need renaming.
    Made changes to the utils and tests so they sit in the appropriate
    packages.
    Moved the node-based IPAM code to nodeipam.
    Made the relevant tests pass.
    Moved common node controller util code to nodeutil.
    Removed the unneeded pod informer sync from the node IPAM controller.
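
At a high level, the monolithic `pkg/controller/node` package is replaced by two controller packages plus a shared helper package. A rough sketch of the new layout, using only names that appear in the diffs below (the `sketch` package name and the blank references are only there so the snippet compiles on its own):

```go
package sketch

import (
	nodeipamcontroller "k8s.io/kubernetes/pkg/controller/nodeipam"       // CIDR/IP address management
	lifecyclecontroller "k8s.io/kubernetes/pkg/controller/nodelifecycle" // node health monitoring, taints, evictions
	nodeutil "k8s.io/kubernetes/pkg/controller/util/node"                // helpers shared by both, moved out of controller/node/util
)

// Reference the relocated entry points so the example builds on its own.
var (
	_ = nodeipamcontroller.NewNodeIpamController
	_ = lifecyclecontroller.NewNodeLifecycleController
	_ = nodeutil.RecordNodeStatusChange
)
```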


**Which issue(s) this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close the issue(s) when PR gets merged)*:
Fixes #52369

**Special notes for your reviewer**:

**Release note**:
```release-note
None
```
Kubernetes Submit Queue 2018-01-05 07:52:43 -08:00 committed by GitHub
commit e7070354fe
49 changed files with 972 additions and 738 deletions

@ -58,8 +58,9 @@ go_library(
"//pkg/controller/garbagecollector:go_default_library",
"//pkg/controller/job:go_default_library",
"//pkg/controller/namespace:go_default_library",
"//pkg/controller/node:go_default_library",
"//pkg/controller/node/ipam:go_default_library",
"//pkg/controller/nodeipam:go_default_library",
"//pkg/controller/nodeipam/ipam:go_default_library",
"//pkg/controller/nodelifecycle:go_default_library",
"//pkg/controller/podautoscaler:go_default_library",
"//pkg/controller/podautoscaler/metrics:go_default_library",
"//pkg/controller/podgc:go_default_library",

@ -368,10 +368,12 @@ func NewControllerInitializers(loopMode ControllerLoopMode) map[string]InitFunc
controllers["tokencleaner"] = startTokenCleanerController
if loopMode == IncludeCloudLoops {
controllers["service"] = startServiceController
controllers["nodeipam"] = startNodeIpamController
controllers["route"] = startRouteController
// TODO: Move node controller and volume controller into the IncludeCloudLoops only set.
// TODO: volume controller into the IncludeCloudLoops only set.
// TODO: Separate cluster in cloud check from node lifecycle controller.
}
controllers["node"] = startNodeController
controllers["nodelifecycle"] = startNodeLifecycleController
controllers["persistentvolume-binder"] = startPersistentVolumeBinderController
controllers["attachdetach"] = startAttachDetachController
controllers["persistentvolume-expander"] = startVolumeExpandController
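The net effect of this hunk: node IPAM registration moves behind the cloud-loops gate, while node lifecycle is always registered. A minimal, self-contained sketch of that logic (the stand-in types and the `registerNodeControllers` helper are hypothetical; the real types live in `cmd/kube-controller-manager/app`):

```go
package sketch

// Minimal stand-ins so the sketch compiles on its own.
type ControllerContext struct{}
type InitFunc func(ctx ControllerContext) (bool, error)
type ControllerLoopMode int

const IncludeCloudLoops ControllerLoopMode = iota

func startNodeIpamController(ctx ControllerContext) (bool, error)      { return true, nil }
func startNodeLifecycleController(ctx ControllerContext) (bool, error) { return true, nil }

// After this PR, IPAM only runs when the controller manager owns the cloud loops;
// the lifecycle controller still runs unconditionally (its cloud dependency is
// slated for removal in a follow-up).
func registerNodeControllers(controllers map[string]InitFunc, loopMode ControllerLoopMode) {
	if loopMode == IncludeCloudLoops {
		controllers["nodeipam"] = startNodeIpamController
	}
	controllers["nodelifecycle"] = startNodeLifecycleController
}
```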

@ -41,8 +41,9 @@ import (
endpointcontroller "k8s.io/kubernetes/pkg/controller/endpoint"
"k8s.io/kubernetes/pkg/controller/garbagecollector"
namespacecontroller "k8s.io/kubernetes/pkg/controller/namespace"
nodecontroller "k8s.io/kubernetes/pkg/controller/node"
"k8s.io/kubernetes/pkg/controller/node/ipam"
nodeipamcontroller "k8s.io/kubernetes/pkg/controller/nodeipam"
"k8s.io/kubernetes/pkg/controller/nodeipam/ipam"
lifecyclecontroller "k8s.io/kubernetes/pkg/controller/nodelifecycle"
"k8s.io/kubernetes/pkg/controller/podgc"
replicationcontroller "k8s.io/kubernetes/pkg/controller/replication"
resourcequotacontroller "k8s.io/kubernetes/pkg/controller/resourcequota"
@ -77,7 +78,7 @@ func startServiceController(ctx ControllerContext) (bool, error) {
return true, nil
}
func startNodeController(ctx ControllerContext) (bool, error) {
func startNodeIpamController(ctx ControllerContext) (bool, error) {
var clusterCIDR *net.IPNet = nil
var serviceCIDR *net.IPNet = nil
if ctx.Options.AllocateNodeCIDRs {
@ -97,25 +98,38 @@ func startNodeController(ctx ControllerContext) (bool, error) {
}
}
nodeController, err := nodecontroller.NewNodeController(
ctx.InformerFactory.Core().V1().Pods(),
nodeIpamController, err := nodeipamcontroller.NewNodeIpamController(
ctx.InformerFactory.Core().V1().Nodes(),
ctx.InformerFactory.Extensions().V1beta1().DaemonSets(),
ctx.Cloud,
ctx.ClientBuilder.ClientOrDie("node-controller"),
ctx.Options.PodEvictionTimeout.Duration,
ctx.Options.NodeEvictionRate,
ctx.Options.SecondaryNodeEvictionRate,
ctx.Options.LargeClusterSizeThreshold,
ctx.Options.UnhealthyZoneThreshold,
ctx.Options.NodeMonitorGracePeriod.Duration,
ctx.Options.NodeStartupGracePeriod.Duration,
ctx.Options.NodeMonitorPeriod.Duration,
clusterCIDR,
serviceCIDR,
int(ctx.Options.NodeCIDRMaskSize),
ctx.Options.AllocateNodeCIDRs,
ipam.CIDRAllocatorType(ctx.Options.CIDRAllocatorType),
)
if err != nil {
return true, err
}
go nodeIpamController.Run(ctx.Stop)
return true, nil
}
func startNodeLifecycleController(ctx ControllerContext) (bool, error) {
lifecycleController, err := lifecyclecontroller.NewNodeLifecycleController(
ctx.InformerFactory.Core().V1().Pods(),
ctx.InformerFactory.Core().V1().Nodes(),
ctx.InformerFactory.Extensions().V1beta1().DaemonSets(),
ctx.Cloud,
ctx.ClientBuilder.ClientOrDie("node-controller"),
ctx.Options.NodeMonitorPeriod.Duration,
ctx.Options.NodeStartupGracePeriod.Duration,
ctx.Options.NodeMonitorGracePeriod.Duration,
ctx.Options.PodEvictionTimeout.Duration,
ctx.Options.NodeEvictionRate,
ctx.Options.SecondaryNodeEvictionRate,
ctx.Options.LargeClusterSizeThreshold,
ctx.Options.UnhealthyZoneThreshold,
ctx.Options.EnableTaintManager,
utilfeature.DefaultFeatureGate.Enabled(features.TaintBasedEvictions),
utilfeature.DefaultFeatureGate.Enabled(features.TaintNodesByCondition),
@ -123,7 +137,7 @@ func startNodeController(ctx ControllerContext) (bool, error) {
if err != nil {
return true, err
}
go nodeController.Run(ctx.Stop)
go lifecycleController.Run(ctx.Stop)
return true, nil
}

@ -117,7 +117,8 @@ filegroup(
"//pkg/controller/history:all-srcs",
"//pkg/controller/job:all-srcs",
"//pkg/controller/namespace:all-srcs",
"//pkg/controller/node:all-srcs",
"//pkg/controller/nodeipam:all-srcs",
"//pkg/controller/nodelifecycle:all-srcs",
"//pkg/controller/podautoscaler:all-srcs",
"//pkg/controller/podgc:all-srcs",
"//pkg/controller/replicaset:all-srcs",
@ -129,6 +130,7 @@ filegroup(
"//pkg/controller/statefulset:all-srcs",
"//pkg/controller/testutil:all-srcs",
"//pkg/controller/ttl:all-srcs",
"//pkg/controller/util/node:all-srcs",
"//pkg/controller/volume/attachdetach:all-srcs",
"//pkg/controller/volume/events:all-srcs",
"//pkg/controller/volume/expand:all-srcs",

@ -0,0 +1,48 @@
package(default_visibility = ["//visibility:public"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
)
go_library(
name = "go_default_library",
srcs = [
"doc.go",
"metrics.go",
"node_ipam_controller.go",
],
importpath = "k8s.io/kubernetes/pkg/controller/nodeipam",
deps = [
"//pkg/cloudprovider:go_default_library",
"//pkg/controller:go_default_library",
"//pkg/controller/nodeipam/ipam:go_default_library",
"//pkg/controller/nodeipam/ipam/sync:go_default_library",
"//pkg/util/metrics:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//vendor/k8s.io/client-go/informers/core/v1:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
"//vendor/k8s.io/client-go/listers/core/v1:go_default_library",
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
"//vendor/k8s.io/client-go/tools/record:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [
":package-srcs",
"//pkg/controller/nodeipam/ipam:all-srcs",
],
tags = ["automanaged"],
)

@ -14,6 +14,6 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
// Package node contains code for syncing cloud instances with
// Package nodeipam contains code for syncing cloud instances with
// node registry
package node // import "k8s.io/kubernetes/pkg/controller/node"
package nodeipam // import "k8s.io/kubernetes/pkg/controller/nodeipam"

@ -14,11 +14,11 @@ go_test(
"timeout_test.go",
],
embed = [":go_default_library"],
importpath = "k8s.io/kubernetes/pkg/controller/node/ipam",
importpath = "k8s.io/kubernetes/pkg/controller/nodeipam/ipam",
deps = [
"//pkg/controller:go_default_library",
"//pkg/controller/node/ipam/cidrset:go_default_library",
"//pkg/controller/node/ipam/test:go_default_library",
"//pkg/controller/nodeipam/ipam/cidrset:go_default_library",
"//pkg/controller/nodeipam/ipam/test:go_default_library",
"//pkg/controller/testutil:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
@ -40,15 +40,15 @@ go_library(
"range_allocator.go",
"timeout.go",
],
importpath = "k8s.io/kubernetes/pkg/controller/node/ipam",
importpath = "k8s.io/kubernetes/pkg/controller/nodeipam/ipam",
deps = [
"//pkg/api/v1/node:go_default_library",
"//pkg/cloudprovider:go_default_library",
"//pkg/cloudprovider/providers/gce:go_default_library",
"//pkg/controller:go_default_library",
"//pkg/controller/node/ipam/cidrset:go_default_library",
"//pkg/controller/node/ipam/sync:go_default_library",
"//pkg/controller/node/util:go_default_library",
"//pkg/controller/nodeipam/ipam/cidrset:go_default_library",
"//pkg/controller/nodeipam/ipam/sync:go_default_library",
"//pkg/controller/util/node:go_default_library",
"//pkg/util/node:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
@ -82,9 +82,9 @@ filegroup(
name = "all-srcs",
srcs = [
":package-srcs",
"//pkg/controller/node/ipam/cidrset:all-srcs",
"//pkg/controller/node/ipam/sync:all-srcs",
"//pkg/controller/node/ipam/test:all-srcs",
"//pkg/controller/nodeipam/ipam/cidrset:all-srcs",
"//pkg/controller/nodeipam/ipam/sync:all-srcs",
"//pkg/controller/nodeipam/ipam/test:all-srcs",
],
tags = ["automanaged"],
)

@ -10,14 +10,14 @@ go_test(
name = "go_default_test",
srcs = ["cidr_set_test.go"],
embed = [":go_default_library"],
importpath = "k8s.io/kubernetes/pkg/controller/node/ipam/cidrset",
importpath = "k8s.io/kubernetes/pkg/controller/nodeipam/ipam/cidrset",
deps = ["//vendor/github.com/golang/glog:go_default_library"],
)
go_library(
name = "go_default_library",
srcs = ["cidr_set.go"],
importpath = "k8s.io/kubernetes/pkg/controller/node/ipam/cidrset",
importpath = "k8s.io/kubernetes/pkg/controller/nodeipam/ipam/cidrset",
)
filegroup(

@ -40,7 +40,7 @@ import (
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/cloudprovider/providers/gce"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/node/util"
nodeutil "k8s.io/kubernetes/pkg/controller/util/node"
utilnode "k8s.io/kubernetes/pkg/util/node"
)
@ -101,8 +101,8 @@ func NewCloudCIDRAllocator(client clientset.Interface, cloud cloudprovider.Inter
}
nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: util.CreateAddNodeHandler(ca.AllocateOrOccupyCIDR),
UpdateFunc: util.CreateUpdateNodeHandler(func(_, newNode *v1.Node) error {
AddFunc: nodeutil.CreateAddNodeHandler(ca.AllocateOrOccupyCIDR),
UpdateFunc: nodeutil.CreateUpdateNodeHandler(func(_, newNode *v1.Node) error {
if newNode.Spec.PodCIDR == "" {
return ca.AllocateOrOccupyCIDR(newNode)
}
@ -114,7 +114,7 @@ func NewCloudCIDRAllocator(client clientset.Interface, cloud cloudprovider.Inter
}
return nil
}),
DeleteFunc: util.CreateDeleteNodeHandler(ca.ReleaseCIDR),
DeleteFunc: nodeutil.CreateDeleteNodeHandler(ca.ReleaseCIDR),
})
glog.V(0).Infof("Using cloud CIDR allocator (provider: %v)", cloud.ProviderName())
@ -197,11 +197,11 @@ func (ca *cloudCIDRAllocator) updateCIDRAllocation(nodeName string) error {
cidrs, err := ca.cloud.AliasRanges(types.NodeName(nodeName))
if err != nil {
util.RecordNodeStatusChange(ca.recorder, node, "CIDRNotAvailable")
nodeutil.RecordNodeStatusChange(ca.recorder, node, "CIDRNotAvailable")
return fmt.Errorf("failed to allocate cidr: %v", err)
}
if len(cidrs) == 0 {
util.RecordNodeStatusChange(ca.recorder, node, "CIDRNotAvailable")
nodeutil.RecordNodeStatusChange(ca.recorder, node, "CIDRNotAvailable")
return fmt.Errorf("failed to allocate cidr: Node %v has no CIDRs", node.Name)
}
_, cidr, err := net.ParseCIDR(cidrs[0])
@ -237,7 +237,7 @@ func (ca *cloudCIDRAllocator) updateCIDRAllocation(nodeName string) error {
glog.Errorf("Failed to update node %v PodCIDR to %v (%d retries left): %v", node.Name, podCIDR, cidrUpdateRetries-rep-1, err)
}
if err != nil {
util.RecordNodeStatusChange(ca.recorder, node, "CIDRAssignmentFailed")
nodeutil.RecordNodeStatusChange(ca.recorder, node, "CIDRAssignmentFailed")
glog.Errorf("CIDR assignment for node %v failed: %v.", nodeName, err)
return err
}
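Two things worth noting when reading the hunks above: the `nodeutil` alias now refers to the shared controller helpers in `pkg/controller/util/node` (while `utilnode` keeps pointing at `pkg/util/node`), and the `CreateAddNodeHandler`-style helpers simply adapt a typed node callback to the untyped informer handler signature. A minimal sketch of that adapter pattern, assuming the helper behaves the way its call sites here suggest:

```go
package sketch

import (
	"fmt"

	"k8s.io/api/core/v1"
	"k8s.io/client-go/tools/cache"
)

// createAddNodeHandler sketches what nodeutil.CreateAddNodeHandler is used for above:
// wrap a func(*v1.Node) error so it can be handed to an informer's AddFunc.
// (Sketch only; the real helper lives in pkg/controller/util/node.)
func createAddNodeHandler(f func(node *v1.Node) error) func(obj interface{}) {
	return func(obj interface{}) {
		node, ok := obj.(*v1.Node)
		if !ok {
			return
		}
		if err := f(node); err != nil {
			fmt.Printf("error handling node add: %v\n", err)
		}
	}
}

// wire shows the registration shape used by the CIDR allocators above.
func wire(informer cache.SharedIndexInformer, allocate func(*v1.Node) error) {
	informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: createAddNodeHandler(allocate),
	})
}
```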

@ -30,9 +30,9 @@ import (
"k8s.io/client-go/tools/cache"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/cloudprovider/providers/gce"
"k8s.io/kubernetes/pkg/controller/node/ipam/cidrset"
nodesync "k8s.io/kubernetes/pkg/controller/node/ipam/sync"
"k8s.io/kubernetes/pkg/controller/node/util"
"k8s.io/kubernetes/pkg/controller/nodeipam/ipam/cidrset"
nodesync "k8s.io/kubernetes/pkg/controller/nodeipam/ipam/sync"
nodeutil "k8s.io/kubernetes/pkg/controller/util/node"
)
// Config for the IPAM controller.
@ -128,9 +128,9 @@ func (c *Controller) Start(nodeInformer informers.NodeInformer) error {
}
nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: util.CreateAddNodeHandler(c.onAdd),
UpdateFunc: util.CreateUpdateNodeHandler(c.onUpdate),
DeleteFunc: util.CreateDeleteNodeHandler(c.onDelete),
AddFunc: nodeutil.CreateAddNodeHandler(c.onAdd),
UpdateFunc: nodeutil.CreateUpdateNodeHandler(c.onUpdate),
DeleteFunc: nodeutil.CreateDeleteNodeHandler(c.onDelete),
})
return nil

@ -20,8 +20,8 @@ import (
"net"
"testing"
"k8s.io/kubernetes/pkg/controller/node/ipam/cidrset"
"k8s.io/kubernetes/pkg/controller/node/ipam/test"
"k8s.io/kubernetes/pkg/controller/nodeipam/ipam/cidrset"
"k8s.io/kubernetes/pkg/controller/nodeipam/ipam/test"
)
func TestOccupyServiceCIDR(t *testing.T) {

@ -36,9 +36,9 @@ import (
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/node/ipam/cidrset"
"k8s.io/kubernetes/pkg/controller/node/util"
nodeutil "k8s.io/kubernetes/pkg/util/node"
"k8s.io/kubernetes/pkg/controller/nodeipam/ipam/cidrset"
nodeutil "k8s.io/kubernetes/pkg/controller/util/node"
utilnode "k8s.io/kubernetes/pkg/util/node"
)
type rangeAllocator struct {
@ -119,8 +119,8 @@ func NewCIDRRangeAllocator(client clientset.Interface, nodeInformer informers.No
}
nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: util.CreateAddNodeHandler(ra.AllocateOrOccupyCIDR),
UpdateFunc: util.CreateUpdateNodeHandler(func(_, newNode *v1.Node) error {
AddFunc: nodeutil.CreateAddNodeHandler(ra.AllocateOrOccupyCIDR),
UpdateFunc: nodeutil.CreateUpdateNodeHandler(func(_, newNode *v1.Node) error {
// If the PodCIDR is not empty we either:
// - already processed a Node that already had a CIDR after NC restarted
// (cidr is marked as used),
@ -145,7 +145,7 @@ func NewCIDRRangeAllocator(client clientset.Interface, nodeInformer informers.No
}
return nil
}),
DeleteFunc: util.CreateDeleteNodeHandler(ra.ReleaseCIDR),
DeleteFunc: nodeutil.CreateDeleteNodeHandler(ra.ReleaseCIDR),
})
return ra, nil
@ -234,7 +234,7 @@ func (r *rangeAllocator) AllocateOrOccupyCIDR(node *v1.Node) error {
podCIDR, err := r.cidrs.AllocateNext()
if err != nil {
r.removeNodeFromProcessing(node.Name)
util.RecordNodeStatusChange(r.recorder, node, "CIDRNotAvailable")
nodeutil.RecordNodeStatusChange(r.recorder, node, "CIDRNotAvailable")
return fmt.Errorf("failed to allocate cidr: %v", err)
}
@ -303,14 +303,14 @@ func (r *rangeAllocator) updateCIDRAllocation(data nodeAndCIDR) error {
}
return nil
}
if err = nodeutil.PatchNodeCIDR(r.client, types.NodeName(node.Name), podCIDR); err == nil {
if err = utilnode.PatchNodeCIDR(r.client, types.NodeName(node.Name), podCIDR); err == nil {
glog.Infof("Set node %v PodCIDR to %v", node.Name, podCIDR)
break
}
glog.Errorf("Failed to update node %v PodCIDR to %v (%d retries left): %v", node.Name, podCIDR, cidrUpdateRetries-rep-1, err)
}
if err != nil {
util.RecordNodeStatusChange(r.recorder, node, "CIDRAssignmentFailed")
nodeutil.RecordNodeStatusChange(r.recorder, node, "CIDRAssignmentFailed")
// We accept the fact that we may leek CIDRs here. This is safer than releasing
// them in case when we don't know if request went through.
// NodeController restart will return all falsely allocated CIDRs to the pool.

@ -3,10 +3,10 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = ["sync.go"],
importpath = "k8s.io/kubernetes/pkg/controller/node/ipam/sync",
importpath = "k8s.io/kubernetes/pkg/controller/nodeipam/ipam/sync",
visibility = ["//visibility:public"],
deps = [
"//pkg/controller/node/ipam/cidrset:go_default_library",
"//pkg/controller/nodeipam/ipam/cidrset:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
],
@ -16,10 +16,10 @@ go_test(
name = "go_default_test",
srcs = ["sync_test.go"],
embed = [":go_default_library"],
importpath = "k8s.io/kubernetes/pkg/controller/node/ipam/sync",
importpath = "k8s.io/kubernetes/pkg/controller/nodeipam/ipam/sync",
deps = [
"//pkg/controller/node/ipam/cidrset:go_default_library",
"//pkg/controller/node/ipam/test:go_default_library",
"//pkg/controller/nodeipam/ipam/cidrset:go_default_library",
"//pkg/controller/nodeipam/ipam/test:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",

@ -25,7 +25,7 @@ import (
"github.com/golang/glog"
"k8s.io/api/core/v1"
"k8s.io/kubernetes/pkg/controller/node/ipam/cidrset"
"k8s.io/kubernetes/pkg/controller/nodeipam/ipam/cidrset"
)
const (

@ -26,8 +26,8 @@ import (
"github.com/golang/glog"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/kubernetes/pkg/controller/node/ipam/cidrset"
"k8s.io/kubernetes/pkg/controller/node/ipam/test"
"k8s.io/kubernetes/pkg/controller/nodeipam/ipam/cidrset"
"k8s.io/kubernetes/pkg/controller/nodeipam/ipam/test"
"k8s.io/api/core/v1"
)

@ -3,7 +3,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "go_default_library",
srcs = ["utils.go"],
importpath = "k8s.io/kubernetes/pkg/controller/node/ipam/test",
importpath = "k8s.io/kubernetes/pkg/controller/nodeipam/ipam/test",
visibility = ["//visibility:public"],
)

@ -0,0 +1,21 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package nodeipam
// Register the metrics that are to be monitored.
func Register() {
}

@ -0,0 +1,187 @@
/*
Copyright 2014 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package nodeipam
import (
"net"
"time"
"github.com/golang/glog"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/api/core/v1"
coreinformers "k8s.io/client-go/informers/core/v1"
clientset "k8s.io/client-go/kubernetes"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/nodeipam/ipam"
nodesync "k8s.io/kubernetes/pkg/controller/nodeipam/ipam/sync"
"k8s.io/kubernetes/pkg/util/metrics"
)
func init() {
// Register prometheus metrics
Register()
}
const (
// ipamResyncInterval is the amount of time between when the cloud and node
// CIDR range assignments are synchronized.
ipamResyncInterval = 30 * time.Second
// ipamMaxBackoff is the maximum backoff for retrying synchronization of a
// given in the error state.
ipamMaxBackoff = 10 * time.Second
// ipamInitialRetry is the initial retry interval for retrying synchronization of a
// given in the error state.
ipamInitialBackoff = 250 * time.Millisecond
)
// Controller is the controller that manages node ipam state.
type Controller struct {
allocateNodeCIDRs bool
allocatorType ipam.CIDRAllocatorType
cloud cloudprovider.Interface
clusterCIDR *net.IPNet
serviceCIDR *net.IPNet
kubeClient clientset.Interface
// Method for easy mocking in unittest.
lookupIP func(host string) ([]net.IP, error)
nodeLister corelisters.NodeLister
nodeInformerSynced cache.InformerSynced
cidrAllocator ipam.CIDRAllocator
forcefullyDeletePod func(*v1.Pod) error
}
// NewNodeIpamController returns a new node IP Address Management controller to
// sync instances from cloudprovider.
// This method returns an error if it is unable to initialize the CIDR bitmap with
// podCIDRs it has already allocated to nodes. Since we don't allow podCIDR changes
// currently, this should be handled as a fatal error.
func NewNodeIpamController(
nodeInformer coreinformers.NodeInformer,
cloud cloudprovider.Interface,
kubeClient clientset.Interface,
clusterCIDR *net.IPNet,
serviceCIDR *net.IPNet,
nodeCIDRMaskSize int,
allocateNodeCIDRs bool,
allocatorType ipam.CIDRAllocatorType) (*Controller, error) {
if kubeClient == nil {
glog.Fatalf("kubeClient is nil when starting Controller")
}
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
glog.V(0).Infof("Sending events to api server.")
eventBroadcaster.StartRecordingToSink(
&v1core.EventSinkImpl{
Interface: v1core.New(kubeClient.CoreV1().RESTClient()).Events(""),
})
if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil {
metrics.RegisterMetricAndTrackRateLimiterUsage("node_ipam_controller", kubeClient.CoreV1().RESTClient().GetRateLimiter())
}
if allocateNodeCIDRs {
if clusterCIDR == nil {
glog.Fatal("Controller: Must specify clusterCIDR if allocateNodeCIDRs == true.")
}
mask := clusterCIDR.Mask
if maskSize, _ := mask.Size(); maskSize > nodeCIDRMaskSize {
glog.Fatal("Controller: Invalid clusterCIDR, mask size of clusterCIDR must be less than nodeCIDRMaskSize.")
}
}
ic := &Controller{
cloud: cloud,
kubeClient: kubeClient,
lookupIP: net.LookupIP,
clusterCIDR: clusterCIDR,
serviceCIDR: serviceCIDR,
allocateNodeCIDRs: allocateNodeCIDRs,
allocatorType: allocatorType,
}
// TODO: Abstract this check into a generic controller manager should run method.
if ic.allocateNodeCIDRs {
if ic.allocatorType == ipam.IPAMFromClusterAllocatorType || ic.allocatorType == ipam.IPAMFromCloudAllocatorType {
cfg := &ipam.Config{
Resync: ipamResyncInterval,
MaxBackoff: ipamMaxBackoff,
InitialRetry: ipamInitialBackoff,
}
switch ic.allocatorType {
case ipam.IPAMFromClusterAllocatorType:
cfg.Mode = nodesync.SyncFromCluster
case ipam.IPAMFromCloudAllocatorType:
cfg.Mode = nodesync.SyncFromCloud
}
ipamc, err := ipam.NewController(cfg, kubeClient, cloud, clusterCIDR, serviceCIDR, nodeCIDRMaskSize)
if err != nil {
glog.Fatalf("Error creating ipam controller: %v", err)
}
if err := ipamc.Start(nodeInformer); err != nil {
glog.Fatalf("Error trying to Init(): %v", err)
}
} else {
var err error
ic.cidrAllocator, err = ipam.New(
kubeClient, cloud, nodeInformer, ic.allocatorType, ic.clusterCIDR, ic.serviceCIDR, nodeCIDRMaskSize)
if err != nil {
return nil, err
}
}
}
ic.nodeLister = nodeInformer.Lister()
ic.nodeInformerSynced = nodeInformer.Informer().HasSynced
return ic, nil
}
// Run starts an asynchronous loop that monitors the status of cluster nodes.
func (nc *Controller) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
glog.Infof("Starting ipam controller")
defer glog.Infof("Shutting down ipam controller")
if !controller.WaitForCacheSync("node", stopCh, nc.nodeInformerSynced) {
return
}
// TODO: Abstract this check into a generic controller manager should run method.
if nc.allocateNodeCIDRs {
if nc.allocatorType != ipam.IPAMFromClusterAllocatorType && nc.allocatorType != ipam.IPAMFromCloudAllocatorType {
go nc.cidrAllocator.Run(stopCh)
}
}
<-stopCh
}
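For reference, wiring the new controller looks essentially like `startNodeIpamController` earlier in this PR. A hedged, self-contained sketch (CIDR values are placeholders; `RangeAllocatorType` is the default allocator constant from the `ipam` package, which does not appear in this diff). With a /16 cluster CIDR and a node mask size of 24, each node receives a /24, leaving room for 2^8 = 256 node ranges:

```go
package sketch

import (
	"net"

	"k8s.io/client-go/informers"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/kubernetes/pkg/controller/nodeipam"
	"k8s.io/kubernetes/pkg/controller/nodeipam/ipam"
)

// runNodeIpam mirrors the wiring in startNodeIpamController; in the controller
// manager the values below come from flags such as --cluster-cidr,
// --allocate-node-cidrs and --node-cidr-mask-size.
func runNodeIpam(client clientset.Interface, factory informers.SharedInformerFactory, stop <-chan struct{}) error {
	_, clusterCIDR, _ := net.ParseCIDR("10.244.0.0/16") // placeholder
	_, serviceCIDR, _ := net.ParseCIDR("10.96.0.0/12")  // placeholder

	nc, err := nodeipam.NewNodeIpamController(
		factory.Core().V1().Nodes(),
		nil, // cloud provider; the plain range allocator does not need one (sketch assumption)
		client,
		clusterCIDR,
		serviceCIDR,
		24,   // node CIDR mask size; must not be smaller than the cluster CIDR mask
		true, // allocate node CIDRs
		ipam.RangeAllocatorType,
	)
	if err != nil {
		return err
	}
	go nc.Run(stop)
	return nil
}
```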

@ -1,24 +1,76 @@
package(default_visibility = ["//visibility:public"])
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
"go_test",
go_library(
name = "go_default_library",
srcs = [
"metrics.go",
"node_lifecycle_controller.go",
],
importpath = "k8s.io/kubernetes/pkg/controller/nodelifecycle",
visibility = ["//visibility:public"],
deps = [
"//pkg/api/v1/node:go_default_library",
"//pkg/cloudprovider:go_default_library",
"//pkg/controller:go_default_library",
"//pkg/controller/nodelifecycle/scheduler:go_default_library",
"//pkg/controller/util/node:go_default_library",
"//pkg/util/metrics:go_default_library",
"//pkg/util/node:go_default_library",
"//pkg/util/system:go_default_library",
"//pkg/util/taints:go_default_library",
"//pkg/util/version:go_default_library",
"//plugin/pkg/scheduler/algorithm:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/github.com/prometheus/client_golang/prometheus:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/equality:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/client-go/informers/core/v1:go_default_library",
"//vendor/k8s.io/client-go/informers/extensions/v1beta1:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/k8s.io/client-go/kubernetes/scheme:go_default_library",
"//vendor/k8s.io/client-go/listers/core/v1:go_default_library",
"//vendor/k8s.io/client-go/listers/extensions/v1beta1:go_default_library",
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
"//vendor/k8s.io/client-go/tools/record:go_default_library",
"//vendor/k8s.io/client-go/util/flowcontrol:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [
":package-srcs",
"//pkg/controller/nodelifecycle/scheduler:all-srcs",
],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)
go_test(
name = "go_default_test",
srcs = ["nodecontroller_test.go"],
srcs = ["node_lifecycle_controller_test.go"],
embed = [":go_default_library"],
importpath = "k8s.io/kubernetes/pkg/controller/node",
importpath = "k8s.io/kubernetes/pkg/controller/nodelifecycle",
deps = [
"//pkg/cloudprovider:go_default_library",
"//pkg/cloudprovider/providers/fake:go_default_library",
"//pkg/controller:go_default_library",
"//pkg/controller/node/ipam:go_default_library",
"//pkg/controller/node/scheduler:go_default_library",
"//pkg/controller/node/util:go_default_library",
"//pkg/controller/nodelifecycle/scheduler:go_default_library",
"//pkg/controller/testutil:go_default_library",
"//pkg/controller/util/node:go_default_library",
"//pkg/kubelet/apis:go_default_library",
"//pkg/util/node:go_default_library",
"//pkg/util/taints:go_default_library",
@ -39,65 +91,3 @@ go_test(
"//vendor/k8s.io/client-go/testing:go_default_library",
],
)
go_library(
name = "go_default_library",
srcs = [
"doc.go",
"metrics.go",
"node_controller.go",
],
importpath = "k8s.io/kubernetes/pkg/controller/node",
deps = [
"//pkg/api/v1/node:go_default_library",
"//pkg/cloudprovider:go_default_library",
"//pkg/controller:go_default_library",
"//pkg/controller/node/ipam:go_default_library",
"//pkg/controller/node/ipam/sync:go_default_library",
"//pkg/controller/node/scheduler:go_default_library",
"//pkg/controller/node/util:go_default_library",
"//pkg/util/metrics:go_default_library",
"//pkg/util/node:go_default_library",
"//pkg/util/system:go_default_library",
"//pkg/util/taints:go_default_library",
"//plugin/pkg/scheduler/algorithm:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/github.com/prometheus/client_golang/prometheus:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/equality:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/client-go/informers/core/v1:go_default_library",
"//vendor/k8s.io/client-go/informers/extensions/v1beta1:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/k8s.io/client-go/kubernetes/scheme:go_default_library",
"//vendor/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
"//vendor/k8s.io/client-go/listers/core/v1:go_default_library",
"//vendor/k8s.io/client-go/listers/extensions/v1beta1:go_default_library",
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
"//vendor/k8s.io/client-go/tools/record:go_default_library",
"//vendor/k8s.io/client-go/util/flowcontrol:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [
":package-srcs",
"//pkg/controller/node/ipam:all-srcs",
"//pkg/controller/node/scheduler:all-srcs",
"//pkg/controller/node/util:all-srcs",
],
tags = ["automanaged"],
)

@ -1,5 +1,5 @@
/*
Copyright 2016 The Kubernetes Authors.
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package node
package nodelifecycle
import (
"sync"

@ -1,5 +1,5 @@
/*
Copyright 2014 The Kubernetes Authors.
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@ -14,16 +14,15 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package node
// The Controller sets tainted annotations on nodes.
// Tainted nodes should not be used for new work loads and
// some effort should be given to getting existing work
// loads off of tainted nodes.
package nodelifecycle
import (
"fmt"
"net"
"sync"
"time"
"github.com/golang/glog"
"k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -31,31 +30,31 @@ import (
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/flowcontrol"
"k8s.io/api/core/v1"
coreinformers "k8s.io/client-go/informers/core/v1"
extensionsinformers "k8s.io/client-go/informers/extensions/v1beta1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
corelisters "k8s.io/client-go/listers/core/v1"
extensionslisters "k8s.io/client-go/listers/extensions/v1beta1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/flowcontrol"
v1node "k8s.io/kubernetes/pkg/api/v1/node"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/node/ipam"
nodesync "k8s.io/kubernetes/pkg/controller/node/ipam/sync"
"k8s.io/kubernetes/pkg/controller/node/scheduler"
"k8s.io/kubernetes/pkg/controller/node/util"
"k8s.io/kubernetes/pkg/controller/nodelifecycle/scheduler"
nodeutil "k8s.io/kubernetes/pkg/controller/util/node"
"k8s.io/kubernetes/pkg/util/metrics"
utilnode "k8s.io/kubernetes/pkg/util/node"
"k8s.io/kubernetes/pkg/util/system"
taintutils "k8s.io/kubernetes/pkg/util/taints"
utilversion "k8s.io/kubernetes/pkg/util/version"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
"fmt"
"github.com/golang/glog"
"sync"
"time"
)
func init() {
@ -64,11 +63,14 @@ func init() {
}
var (
gracefulDeletionVersion = utilversion.MustParseSemantic("v1.1.0")
// UnreachableTaintTemplate is the taint for when a node becomes unreachable.
UnreachableTaintTemplate = &v1.Taint{
Key: algorithm.TaintNodeUnreachable,
Effect: v1.TaintEffectNoExecute,
}
// NotReadyTaintTemplate is the taint for when a node is not ready for
// executing pods
NotReadyTaintTemplate = &v1.Taint{
@ -91,23 +93,6 @@ var (
}
)
const (
// The amount of time the nodecontroller polls on the list nodes endpoint.
apiserverStartupGracePeriod = 10 * time.Minute
// The amount of time the nodecontroller should sleep between retrying NodeStatus updates
retrySleepTime = 20 * time.Millisecond
// ipamResyncInterval is the amount of time between when the cloud and node
// CIDR range assignments are synchronized.
ipamResyncInterval = 30 * time.Second
// ipamMaxBackoff is the maximum backoff for retrying synchronization of a
// given in the error state.
ipamMaxBackoff = 10 * time.Second
// ipamInitialRetry is the initial retry interval for retrying synchronization of a
// given in the error state.
ipamInitialBackoff = 250 * time.Millisecond
)
// ZoneState is the state of a given zone.
type ZoneState string
@ -118,24 +103,68 @@ const (
statePartialDisruption = ZoneState("PartialDisruption")
)
const (
// The amount of time the nodecontroller polls on the list nodes endpoint.
apiserverStartupGracePeriod = 10 * time.Minute
// The amount of time the nodecontroller should sleep between retrying NodeStatus updates
retrySleepTime = 20 * time.Millisecond
)
type nodeStatusData struct {
probeTimestamp metav1.Time
readyTransitionTimestamp metav1.Time
status v1.NodeStatus
}
// Controller is the controller that manages node related cluster state.
// Controller is the controller that manages node's life cycle.
type Controller struct {
allocateNodeCIDRs bool
allocatorType ipam.CIDRAllocatorType
taintManager *scheduler.NoExecuteTaintManager
podInformerSynced cache.InformerSynced
cloud cloudprovider.Interface
kubeClient clientset.Interface
// This timestamp is to be used instead of LastProbeTime stored in Condition. We do this
// to aviod the problem with time skew across the cluster.
now func() metav1.Time
enterPartialDisruptionFunc func(nodeNum int) float32
enterFullDisruptionFunc func(nodeNum int) float32
computeZoneStateFunc func(nodeConditions []*v1.NodeCondition) (int, ZoneState)
cloud cloudprovider.Interface
clusterCIDR *net.IPNet
serviceCIDR *net.IPNet
knownNodeSet map[string]*v1.Node
kubeClient clientset.Interface
// Method for easy mocking in unittest.
lookupIP func(host string) ([]net.IP, error)
// per Node map storing last observed Status together with a local time when it was observed.
nodeStatusMap map[string]nodeStatusData
// Lock to access evictor workers
evictorLock sync.Mutex
// workers that evicts pods from unresponsive nodes.
zonePodEvictor map[string]*scheduler.RateLimitedTimedQueue
// workers that are responsible for tainting nodes.
zoneNoExecuteTainter map[string]*scheduler.RateLimitedTimedQueue
zoneStates map[string]ZoneState
daemonSetStore extensionslisters.DaemonSetLister
daemonSetInformerSynced cache.InformerSynced
nodeLister corelisters.NodeLister
nodeInformerSynced cache.InformerSynced
nodeExistsInCloudProvider func(types.NodeName) (bool, error)
recorder record.EventRecorder
// Value controlling Controller monitoring period, i.e. how often does Controller
// check node status posted from kubelet. This value should be lower than nodeMonitorGracePeriod.
// TODO: Change node status monitor to watch based.
nodeMonitorPeriod time.Duration
// Value used if sync_nodes_status=False, only for node startup. When node
// is just created, e.g. cluster bootstrap or node creation, we give a longer grace period.
nodeStartupGracePeriod time.Duration
// Value used if sync_nodes_status=False. Controller will not proactively
// sync node status in this case, but will monitor node status updated from kubelet. If
// it doesn't receive update for this amount of time, it will start posting "NodeReady==
@ -151,45 +180,8 @@ type Controller struct {
// 2. nodeMonitorGracePeriod can't be too large for user experience - larger value takes
// longer for user to see up-to-date node status.
nodeMonitorGracePeriod time.Duration
// Value controlling Controller monitoring period, i.e. how often does Controller
// check node status posted from kubelet. This value should be lower than nodeMonitorGracePeriod.
// TODO: Change node status monitor to watch based.
nodeMonitorPeriod time.Duration
// Value used if sync_nodes_status=False, only for node startup. When node
// is just created, e.g. cluster bootstrap or node creation, we give a longer grace period.
nodeStartupGracePeriod time.Duration
// per Node map storing last observed Status together with a local time when it was observed.
nodeStatusMap map[string]nodeStatusData
// This timestamp is to be used instead of LastProbeTime stored in Condition. We do this
// to aviod the problem with time skew across the cluster.
now func() metav1.Time
// Lock to access evictor workers
evictorLock sync.Mutex
// workers that evicts pods from unresponsive nodes.
zonePodEvictor map[string]*scheduler.RateLimitedTimedQueue
// workers that are responsible for tainting nodes.
zoneNoExecuteTainer map[string]*scheduler.RateLimitedTimedQueue
podEvictionTimeout time.Duration
// The maximum duration before a pod evicted from a node can be forcefully terminated.
maximumGracePeriod time.Duration
recorder record.EventRecorder
nodeLister corelisters.NodeLister
nodeInformerSynced cache.InformerSynced
daemonSetStore extensionslisters.DaemonSetLister
daemonSetInformerSynced cache.InformerSynced
podInformerSynced cache.InformerSynced
cidrAllocator ipam.CIDRAllocator
taintManager *scheduler.NoExecuteTaintManager
nodeExistsInCloudProvider func(types.NodeName) (bool, error)
computeZoneStateFunc func(nodeConditions []*v1.NodeCondition) (int, ZoneState)
enterPartialDisruptionFunc func(nodeNum int) float32
enterFullDisruptionFunc func(nodeNum int) float32
zoneStates map[string]ZoneState
podEvictionTimeout time.Duration
evictionLimiterQPS float32
secondaryEvictionLimiterQPS float32
largeClusterThreshold int32
@ -208,29 +200,20 @@ type Controller struct {
taintNodeByCondition bool
}
// NewNodeController returns a new node controller to sync instances from cloudprovider.
// This method returns an error if it is unable to initialize the CIDR bitmap with
// podCIDRs it has already allocated to nodes. Since we don't allow podCIDR changes
// currently, this should be handled as a fatal error.
func NewNodeController(
podInformer coreinformers.PodInformer,
// NewNodeLifecycleController returns a new taint controller.
func NewNodeLifecycleController(podInformer coreinformers.PodInformer,
nodeInformer coreinformers.NodeInformer,
daemonSetInformer extensionsinformers.DaemonSetInformer,
cloud cloudprovider.Interface,
kubeClient clientset.Interface,
nodeMonitorPeriod time.Duration,
nodeStartupGracePeriod time.Duration,
nodeMonitorGracePeriod time.Duration,
podEvictionTimeout time.Duration,
evictionLimiterQPS float32,
secondaryEvictionLimiterQPS float32,
largeClusterThreshold int32,
unhealthyZoneThreshold float32,
nodeMonitorGracePeriod time.Duration,
nodeStartupGracePeriod time.Duration,
nodeMonitorPeriod time.Duration,
clusterCIDR *net.IPNet,
serviceCIDR *net.IPNet,
nodeCIDRMaskSize int,
allocateNodeCIDRs bool,
allocatorType ipam.CIDRAllocatorType,
runTaintManager bool,
useTaintBasedEvictions bool,
taintNodeByCondition bool) (*Controller, error) {
@ -241,55 +224,32 @@ func NewNodeController(
eventBroadcaster := record.NewBroadcaster()
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "node-controller"})
eventBroadcaster.StartLogging(glog.Infof)
glog.V(0).Infof("Sending events to api server.")
eventBroadcaster.StartRecordingToSink(
&v1core.EventSinkImpl{
Interface: v1core.New(kubeClient.CoreV1().RESTClient()).Events(""),
})
if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil {
metrics.RegisterMetricAndTrackRateLimiterUsage("node_controller", kubeClient.CoreV1().RESTClient().GetRateLimiter())
}
if allocateNodeCIDRs {
if clusterCIDR == nil {
glog.Fatal("Controller: Must specify clusterCIDR if allocateNodeCIDRs == true.")
}
mask := clusterCIDR.Mask
if maskSize, _ := mask.Size(); maskSize > nodeCIDRMaskSize {
glog.Fatalf("Controller: Invalid clusterCIDR, mask size of clusterCIDR(%d) must be less than nodeCIDRMaskSize(%d).", maskSize, nodeCIDRMaskSize)
}
metrics.RegisterMetricAndTrackRateLimiterUsage("node_lifecycle_controller", kubeClient.CoreV1().RESTClient().GetRateLimiter())
}
nc := &Controller{
cloud: cloud,
knownNodeSet: make(map[string]*v1.Node),
kubeClient: kubeClient,
recorder: recorder,
podEvictionTimeout: podEvictionTimeout,
maximumGracePeriod: 5 * time.Minute,
zonePodEvictor: make(map[string]*scheduler.RateLimitedTimedQueue),
zoneNoExecuteTainer: make(map[string]*scheduler.RateLimitedTimedQueue),
nodeStatusMap: make(map[string]nodeStatusData),
nodeMonitorGracePeriod: nodeMonitorGracePeriod,
nodeMonitorPeriod: nodeMonitorPeriod,
nodeStartupGracePeriod: nodeStartupGracePeriod,
lookupIP: net.LookupIP,
now: metav1.Now,
clusterCIDR: clusterCIDR,
serviceCIDR: serviceCIDR,
allocateNodeCIDRs: allocateNodeCIDRs,
allocatorType: allocatorType,
cloud: cloud,
kubeClient: kubeClient,
now: metav1.Now,
knownNodeSet: make(map[string]*v1.Node),
nodeStatusMap: make(map[string]nodeStatusData),
nodeExistsInCloudProvider: func(nodeName types.NodeName) (bool, error) {
return util.NodeExistsInCloudProvider(cloud, nodeName)
return nodeutil.ExistsInCloudProvider(cloud, nodeName)
},
recorder: recorder,
nodeMonitorPeriod: nodeMonitorPeriod,
nodeStartupGracePeriod: nodeStartupGracePeriod,
nodeMonitorGracePeriod: nodeMonitorGracePeriod,
zonePodEvictor: make(map[string]*scheduler.RateLimitedTimedQueue),
zoneNoExecuteTainter: make(map[string]*scheduler.RateLimitedTimedQueue),
zoneStates: make(map[string]ZoneState),
podEvictionTimeout: podEvictionTimeout,
evictionLimiterQPS: evictionLimiterQPS,
secondaryEvictionLimiterQPS: secondaryEvictionLimiterQPS,
largeClusterThreshold: largeClusterThreshold,
unhealthyZoneThreshold: unhealthyZoneThreshold,
zoneStates: make(map[string]ZoneState),
runTaintManager: runTaintManager,
useTaintBasedEvictions: useTaintBasedEvictions && runTaintManager,
taintNodeByCondition: taintNodeByCondition,
@ -297,6 +257,7 @@ func NewNodeController(
if useTaintBasedEvictions {
glog.Infof("Controller is using taint based evictions.")
}
nc.enterPartialDisruptionFunc = nc.ReducedQPSFunc
nc.enterFullDisruptionFunc = nc.HealthyQPSFunc
nc.computeZoneStateFunc = nc.ComputeZoneState
@ -337,48 +298,18 @@ func NewNodeController(
})
nc.podInformerSynced = podInformer.Informer().HasSynced
if nc.allocateNodeCIDRs {
if nc.allocatorType == ipam.IPAMFromClusterAllocatorType || nc.allocatorType == ipam.IPAMFromCloudAllocatorType {
cfg := &ipam.Config{
Resync: ipamResyncInterval,
MaxBackoff: ipamMaxBackoff,
InitialRetry: ipamInitialBackoff,
}
switch nc.allocatorType {
case ipam.IPAMFromClusterAllocatorType:
cfg.Mode = nodesync.SyncFromCluster
case ipam.IPAMFromCloudAllocatorType:
cfg.Mode = nodesync.SyncFromCloud
}
ipamc, err := ipam.NewController(cfg, kubeClient, cloud, clusterCIDR, serviceCIDR, nodeCIDRMaskSize)
if err != nil {
glog.Fatalf("Error creating ipam controller: %v", err)
}
if err := ipamc.Start(nodeInformer); err != nil {
glog.Fatalf("Error trying to Init(): %v", err)
}
} else {
var err error
nc.cidrAllocator, err = ipam.New(
kubeClient, cloud, nodeInformer, nc.allocatorType, nc.clusterCIDR, nc.serviceCIDR, nodeCIDRMaskSize)
if err != nil {
return nil, err
}
}
}
if nc.runTaintManager {
nc.taintManager = scheduler.NewNoExecuteTaintManager(kubeClient)
nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: util.CreateAddNodeHandler(func(node *v1.Node) error {
AddFunc: nodeutil.CreateAddNodeHandler(func(node *v1.Node) error {
nc.taintManager.NodeUpdated(nil, node)
return nil
}),
UpdateFunc: util.CreateUpdateNodeHandler(func(oldNode, newNode *v1.Node) error {
UpdateFunc: nodeutil.CreateUpdateNodeHandler(func(oldNode, newNode *v1.Node) error {
nc.taintManager.NodeUpdated(oldNode, newNode)
return nil
}),
DeleteFunc: util.CreateDeleteNodeHandler(func(node *v1.Node) error {
DeleteFunc: nodeutil.CreateDeleteNodeHandler(func(node *v1.Node) error {
nc.taintManager.NodeUpdated(node, nil)
return nil
}),
@ -388,10 +319,10 @@ func NewNodeController(
if nc.taintNodeByCondition {
glog.Infof("Controller will taint node by condition.")
nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: util.CreateAddNodeHandler(func(node *v1.Node) error {
AddFunc: nodeutil.CreateAddNodeHandler(func(node *v1.Node) error {
return nc.doNoScheduleTaintingPass(node)
}),
UpdateFunc: util.CreateUpdateNodeHandler(func(_, newNode *v1.Node) error {
UpdateFunc: nodeutil.CreateUpdateNodeHandler(func(_, newNode *v1.Node) error {
return nc.doNoScheduleTaintingPass(newNode)
}),
})
@ -400,10 +331,10 @@ func NewNodeController(
// NOTE(resouer): nodeInformer to substitute deprecated taint key (notReady -> not-ready).
// Remove this logic when we don't need this backwards compatibility
nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: util.CreateAddNodeHandler(func(node *v1.Node) error {
AddFunc: nodeutil.CreateAddNodeHandler(func(node *v1.Node) error {
return nc.doFixDeprecatedTaintKeyPass(node)
}),
UpdateFunc: util.CreateUpdateNodeHandler(func(_, newNode *v1.Node) error {
UpdateFunc: nodeutil.CreateUpdateNodeHandler(func(_, newNode *v1.Node) error {
return nc.doFixDeprecatedTaintKeyPass(newNode)
}),
})
@ -417,33 +348,40 @@ func NewNodeController(
return nc, nil
}
func (nc *Controller) doEvictionPass() {
nc.evictorLock.Lock()
defer nc.evictorLock.Unlock()
for k := range nc.zonePodEvictor {
// Function should return 'false' and a time after which it should be retried, or 'true' if it shouldn't (it succeeded).
nc.zonePodEvictor[k].Try(func(value scheduler.TimedValue) (bool, time.Duration) {
node, err := nc.nodeLister.Get(value.Value)
if apierrors.IsNotFound(err) {
glog.Warningf("Node %v no longer present in nodeLister!", value.Value)
} else if err != nil {
glog.Warningf("Failed to get Node %v from the nodeLister: %v", value.Value, err)
} else {
zone := utilnode.GetZoneKey(node)
evictionsNumber.WithLabelValues(zone).Inc()
}
nodeUID, _ := value.UID.(string)
remaining, err := util.DeletePods(nc.kubeClient, nc.recorder, value.Value, nodeUID, nc.daemonSetStore)
if err != nil {
utilruntime.HandleError(fmt.Errorf("unable to evict node %q: %v", value.Value, err))
return false, 0
}
if remaining {
glog.Infof("Pods awaiting deletion due to Controller eviction")
}
return true, 0
})
// Run starts an asynchronous loop that monitors the status of cluster nodes.
func (nc *Controller) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
glog.Infof("Starting node controller")
defer glog.Infof("Shutting down node controller")
if !controller.WaitForCacheSync("taint", stopCh, nc.nodeInformerSynced, nc.podInformerSynced, nc.daemonSetInformerSynced) {
return
}
if nc.runTaintManager {
go nc.taintManager.Run(wait.NeverStop)
}
if nc.useTaintBasedEvictions {
// Handling taint based evictions. Because we don't want a dedicated logic in TaintManager for NC-originated
// taints and we normally don't rate limit evictions caused by taints, we need to rate limit adding taints.
go wait.Until(nc.doNoExecuteTaintingPass, scheduler.NodeEvictionPeriod, wait.NeverStop)
} else {
// Managing eviction of nodes:
// When we delete pods off a node, if the node was not empty at the time we then
// queue an eviction watcher. If we hit an error, retry deletion.
go wait.Until(nc.doEvictionPass, scheduler.NodeEvictionPeriod, wait.NeverStop)
}
// Incorporate the results of node status pushed from kubelet to master.
go wait.Until(func() {
if err := nc.monitorNodeStatus(); err != nil {
glog.Errorf("Error monitoring node status: %v", err)
}
}, nc.nodeMonitorPeriod, wait.NeverStop)
<-stopCh
}
// doFixDeprecatedTaintKeyPass checks and replaces deprecated taint key with proper key name if needed.
@ -478,7 +416,7 @@ func (nc *Controller) doFixDeprecatedTaintKeyPass(node *v1.Node) error {
glog.Warningf("Detected deprecated taint keys: %v on node: %v, will substitute them with %v",
taintsToDel, node.GetName(), taintsToAdd)
if !util.SwapNodeControllerTaint(nc.kubeClient, taintsToAdd, taintsToDel, node) {
if !nodeutil.SwapNodeControllerTaint(nc.kubeClient, taintsToAdd, taintsToDel, node) {
return fmt.Errorf("failed to swap taints of node %+v", node)
}
return nil
@ -506,7 +444,7 @@ func (nc *Controller) doNoScheduleTaintingPass(node *v1.Node) error {
if len(taintsToAdd) == 0 && len(taintsToDel) == 0 {
return nil
}
if !util.SwapNodeControllerTaint(nc.kubeClient, taintsToAdd, taintsToDel, node) {
if !nodeutil.SwapNodeControllerTaint(nc.kubeClient, taintsToAdd, taintsToDel, node) {
return fmt.Errorf("failed to swap taints of node %+v", node)
}
return nil
@ -515,9 +453,9 @@ func (nc *Controller) doNoScheduleTaintingPass(node *v1.Node) error {
func (nc *Controller) doNoExecuteTaintingPass() {
nc.evictorLock.Lock()
defer nc.evictorLock.Unlock()
for k := range nc.zoneNoExecuteTainer {
for k := range nc.zoneNoExecuteTainter {
// Function should return 'false' and a time after which it should be retried, or 'true' if it shouldn't (it succeeded).
nc.zoneNoExecuteTainer[k].Try(func(value scheduler.TimedValue) (bool, time.Duration) {
nc.zoneNoExecuteTainter[k].Try(func(value scheduler.TimedValue) (bool, time.Duration) {
node, err := nc.nodeLister.Get(value.Value)
if apierrors.IsNotFound(err) {
glog.Warningf("Node %v no longer present in nodeLister!", value.Value)
@ -546,70 +484,37 @@ func (nc *Controller) doNoExecuteTaintingPass() {
return true, 0
}
return util.SwapNodeControllerTaint(nc.kubeClient, []*v1.Taint{&taintToAdd}, []*v1.Taint{&oppositeTaint}, node), 0
return nodeutil.SwapNodeControllerTaint(nc.kubeClient, []*v1.Taint{&taintToAdd}, []*v1.Taint{&oppositeTaint}, node), 0
})
}
}
// Run starts an asynchronous loop that monitors the status of cluster nodes.
func (nc *Controller) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
glog.Infof("Starting node controller")
defer glog.Infof("Shutting down node controller")
if !controller.WaitForCacheSync("node", stopCh, nc.nodeInformerSynced, nc.podInformerSynced, nc.daemonSetInformerSynced) {
return
}
// Incorporate the results of node status pushed from kubelet to master.
go wait.Until(func() {
if err := nc.monitorNodeStatus(); err != nil {
glog.Errorf("Error monitoring node status: %v", err)
}
}, nc.nodeMonitorPeriod, wait.NeverStop)
if nc.runTaintManager {
go nc.taintManager.Run(wait.NeverStop)
}
if nc.useTaintBasedEvictions {
// Handling taint based evictions. Because we don't want a dedicated logic in TaintManager for NC-originated
// taints and we normally don't rate limit evictions caused by taints, we need to rate limit adding taints.
go wait.Until(nc.doNoExecuteTaintingPass, scheduler.NodeEvictionPeriod, wait.NeverStop)
} else {
// Managing eviction of nodes:
// When we delete pods off a node, if the node was not empty at the time we then
// queue an eviction watcher. If we hit an error, retry deletion.
go wait.Until(nc.doEvictionPass, scheduler.NodeEvictionPeriod, wait.NeverStop)
}
if nc.allocateNodeCIDRs {
if nc.allocatorType != ipam.IPAMFromClusterAllocatorType && nc.allocatorType != ipam.IPAMFromCloudAllocatorType {
go nc.cidrAllocator.Run(wait.NeverStop)
}
}
<-stopCh
}
// addPodEvictorForNewZone checks if new zone appeared, and if so add new evictor.
func (nc *Controller) addPodEvictorForNewZone(node *v1.Node) {
zone := utilnode.GetZoneKey(node)
if _, found := nc.zoneStates[zone]; !found {
nc.zoneStates[zone] = stateInitial
if !nc.useTaintBasedEvictions {
nc.zonePodEvictor[zone] =
scheduler.NewRateLimitedTimedQueue(
flowcontrol.NewTokenBucketRateLimiter(nc.evictionLimiterQPS, scheduler.EvictionRateLimiterBurst))
} else {
nc.zoneNoExecuteTainer[zone] =
scheduler.NewRateLimitedTimedQueue(
flowcontrol.NewTokenBucketRateLimiter(nc.evictionLimiterQPS, scheduler.EvictionRateLimiterBurst))
}
// Init the metric for the new zone.
glog.Infof("Initializing eviction metric for zone: %v", zone)
evictionsNumber.WithLabelValues(zone).Add(0)
func (nc *Controller) doEvictionPass() {
nc.evictorLock.Lock()
defer nc.evictorLock.Unlock()
for k := range nc.zonePodEvictor {
// Function should return 'false' and a time after which it should be retried, or 'true' if it shouldn't (it succeeded).
nc.zonePodEvictor[k].Try(func(value scheduler.TimedValue) (bool, time.Duration) {
node, err := nc.nodeLister.Get(value.Value)
if apierrors.IsNotFound(err) {
glog.Warningf("Node %v no longer present in nodeLister!", value.Value)
} else if err != nil {
glog.Warningf("Failed to get Node %v from the nodeLister: %v", value.Value, err)
} else {
zone := utilnode.GetZoneKey(node)
evictionsNumber.WithLabelValues(zone).Inc()
}
nodeUID, _ := value.UID.(string)
remaining, err := nodeutil.DeletePods(nc.kubeClient, nc.recorder, value.Value, nodeUID, nc.daemonSetStore)
if err != nil {
utilruntime.HandleError(fmt.Errorf("unable to evict node %q: %v", value.Value, err))
return false, 0
}
if remaining {
glog.Infof("Pods awaiting deletion due to Controller eviction")
}
return true, 0
})
}
}
@ -631,7 +536,7 @@ func (nc *Controller) monitorNodeStatus() error {
for i := range added {
glog.V(1).Infof("Controller observed a new Node: %#v", added[i].Name)
util.RecordNodeEvent(nc.recorder, added[i].Name, string(added[i].UID), v1.EventTypeNormal, "RegisteredNode", fmt.Sprintf("Registered Node %v in Controller", added[i].Name))
nodeutil.RecordNodeEvent(nc.recorder, added[i].Name, string(added[i].UID), v1.EventTypeNormal, "RegisteredNode", fmt.Sprintf("Registered Node %v in Controller", added[i].Name))
nc.knownNodeSet[added[i].Name] = added[i]
nc.addPodEvictorForNewZone(added[i])
if nc.useTaintBasedEvictions {
@ -643,7 +548,7 @@ func (nc *Controller) monitorNodeStatus() error {
for i := range deleted {
glog.V(1).Infof("Controller observed a Node deletion: %v", deleted[i].Name)
util.RecordNodeEvent(nc.recorder, deleted[i].Name, string(deleted[i].UID), v1.EventTypeNormal, "RemovingNode", fmt.Sprintf("Removing Node %v from Controller", deleted[i].Name))
nodeutil.RecordNodeEvent(nc.recorder, deleted[i].Name, string(deleted[i].UID), v1.EventTypeNormal, "RemovingNode", fmt.Sprintf("Removing Node %v from Controller", deleted[i].Name))
delete(nc.knownNodeSet, deleted[i].Name)
}
@ -684,7 +589,7 @@ func (nc *Controller) monitorNodeStatus() error {
// We want to update the taint straight away if Node is already tainted with the UnreachableTaint
if taintutils.TaintExists(node.Spec.Taints, UnreachableTaintTemplate) {
taintToAdd := *NotReadyTaintTemplate
if !util.SwapNodeControllerTaint(nc.kubeClient, []*v1.Taint{&taintToAdd}, []*v1.Taint{UnreachableTaintTemplate}, node) {
if !nodeutil.SwapNodeControllerTaint(nc.kubeClient, []*v1.Taint{&taintToAdd}, []*v1.Taint{UnreachableTaintTemplate}, node) {
glog.Errorf("Failed to instantly swap UnreachableTaint to NotReadyTaint. Will try again in the next cycle.")
}
} else if nc.markNodeForTainting(node) {
@ -711,7 +616,7 @@ func (nc *Controller) monitorNodeStatus() error {
// We want to update the taint straight away if Node is already tainted with the NotReadyTaint
if taintutils.TaintExists(node.Spec.Taints, NotReadyTaintTemplate) {
taintToAdd := *UnreachableTaintTemplate
if !util.SwapNodeControllerTaint(nc.kubeClient, []*v1.Taint{&taintToAdd}, []*v1.Taint{NotReadyTaintTemplate}, node) {
if !nodeutil.SwapNodeControllerTaint(nc.kubeClient, []*v1.Taint{&taintToAdd}, []*v1.Taint{NotReadyTaintTemplate}, node) {
glog.Errorf("Failed to instantly swap UnreachableTaint to NotReadyTaint. Will try again in the next cycle.")
}
} else if nc.markNodeForTainting(node) {
@ -751,8 +656,8 @@ func (nc *Controller) monitorNodeStatus() error {
// Report node event.
if currentReadyCondition.Status != v1.ConditionTrue && observedReadyCondition.Status == v1.ConditionTrue {
util.RecordNodeStatusChange(nc.recorder, node, "NodeNotReady")
if err = util.MarkAllPodsNotReady(nc.kubeClient, node); err != nil {
nodeutil.RecordNodeStatusChange(nc.recorder, node, "NodeNotReady")
if err = nodeutil.MarkAllPodsNotReady(nc.kubeClient, node); err != nil {
utilruntime.HandleError(fmt.Errorf("Unable to mark all pods NotReady on node %v: %v", node.Name, err))
}
}
@ -767,13 +672,13 @@ func (nc *Controller) monitorNodeStatus() error {
}
if !exists {
glog.V(2).Infof("Deleting node (no longer present in cloud provider): %s", node.Name)
util.RecordNodeEvent(nc.recorder, node.Name, string(node.UID), v1.EventTypeNormal, "DeletingNode", fmt.Sprintf("Deleting Node %v because it's not present according to cloud provider", node.Name))
nodeutil.RecordNodeEvent(nc.recorder, node.Name, string(node.UID), v1.EventTypeNormal, "DeletingNode", fmt.Sprintf("Deleting Node %v because it's not present according to cloud provider", node.Name))
go func(nodeName string) {
defer utilruntime.HandleCrash()
// Kubelet is not reporting and Cloud Provider says node
// is gone. Delete it without worrying about grace
// periods.
if err := util.ForcefullyDeleteNode(nc.kubeClient, nodeName); err != nil {
if err := nodeutil.ForcefullyDeleteNode(nc.kubeClient, nodeName); err != nil {
glog.Errorf("Unable to forcefully delete node %q: %v", nodeName, err)
}
}(node.Name)
@ -786,131 +691,6 @@ func (nc *Controller) monitorNodeStatus() error {
return nil
}
func (nc *Controller) handleDisruption(zoneToNodeConditions map[string][]*v1.NodeCondition, nodes []*v1.Node) {
newZoneStates := map[string]ZoneState{}
allAreFullyDisrupted := true
for k, v := range zoneToNodeConditions {
zoneSize.WithLabelValues(k).Set(float64(len(v)))
unhealthy, newState := nc.computeZoneStateFunc(v)
zoneHealth.WithLabelValues(k).Set(float64(100*(len(v)-unhealthy)) / float64(len(v)))
unhealthyNodes.WithLabelValues(k).Set(float64(unhealthy))
if newState != stateFullDisruption {
allAreFullyDisrupted = false
}
newZoneStates[k] = newState
if _, had := nc.zoneStates[k]; !had {
glog.Errorf("Setting initial state for unseen zone: %v", k)
nc.zoneStates[k] = stateInitial
}
}
allWasFullyDisrupted := true
for k, v := range nc.zoneStates {
if _, have := zoneToNodeConditions[k]; !have {
zoneSize.WithLabelValues(k).Set(0)
zoneHealth.WithLabelValues(k).Set(100)
unhealthyNodes.WithLabelValues(k).Set(0)
delete(nc.zoneStates, k)
continue
}
if v != stateFullDisruption {
allWasFullyDisrupted = false
break
}
}
// At least one node was responding in the previous pass or in the current pass. The semantics are as follows:
// - if the new state is "partialDisruption" we call a user defined function that returns a new limiter to use,
// - if the new state is "normal" we resume normal operation (go back to default limiter settings),
// - if the new state is "fullDisruption" we restore the normal eviction rate,
//   - unless all zones in the cluster are in "fullDisruption" - in that case we stop all evictions.
if !allAreFullyDisrupted || !allWasFullyDisrupted {
// We're switching to full disruption mode
if allAreFullyDisrupted {
glog.V(0).Info("Controller detected that all Nodes are not-Ready. Entering master disruption mode.")
for i := range nodes {
if nc.useTaintBasedEvictions {
_, err := nc.markNodeAsReachable(nodes[i])
if err != nil {
glog.Errorf("Failed to remove taints from Node %v", nodes[i].Name)
}
} else {
nc.cancelPodEviction(nodes[i])
}
}
// We stop all evictions.
for k := range nc.zoneStates {
if nc.useTaintBasedEvictions {
nc.zoneNoExecuteTainer[k].SwapLimiter(0)
} else {
nc.zonePodEvictor[k].SwapLimiter(0)
}
}
for k := range nc.zoneStates {
nc.zoneStates[k] = stateFullDisruption
}
// All rate limiters are updated, so we can return early here.
return
}
// We're exiting full disruption mode
if allWasFullyDisrupted {
glog.V(0).Info("Controller detected that some Nodes are Ready. Exiting master disruption mode.")
// When exiting disruption mode update probe timestamps on all Nodes.
now := nc.now()
for i := range nodes {
v := nc.nodeStatusMap[nodes[i].Name]
v.probeTimestamp = now
v.readyTransitionTimestamp = now
nc.nodeStatusMap[nodes[i].Name] = v
}
// We reset all rate limiters to settings appropriate for the given state.
for k := range nc.zoneStates {
nc.setLimiterInZone(k, len(zoneToNodeConditions[k]), newZoneStates[k])
nc.zoneStates[k] = newZoneStates[k]
}
return
}
// We know that there's at least one zone that is not fully disrupted, so
// we can use the default behavior for rate limiters.
for k, v := range nc.zoneStates {
newState := newZoneStates[k]
if v == newState {
continue
}
glog.V(0).Infof("Controller detected that zone %v is now in state %v.", k, newState)
nc.setLimiterInZone(k, len(zoneToNodeConditions[k]), newState)
nc.zoneStates[k] = newState
}
}
}
func (nc *Controller) setLimiterInZone(zone string, zoneSize int, state ZoneState) {
switch state {
case stateNormal:
if nc.useTaintBasedEvictions {
nc.zoneNoExecuteTainer[zone].SwapLimiter(nc.evictionLimiterQPS)
} else {
nc.zonePodEvictor[zone].SwapLimiter(nc.evictionLimiterQPS)
}
case statePartialDisruption:
if nc.useTaintBasedEvictions {
nc.zoneNoExecuteTainer[zone].SwapLimiter(
nc.enterPartialDisruptionFunc(zoneSize))
} else {
nc.zonePodEvictor[zone].SwapLimiter(
nc.enterPartialDisruptionFunc(zoneSize))
}
case stateFullDisruption:
if nc.useTaintBasedEvictions {
nc.zoneNoExecuteTainer[zone].SwapLimiter(
nc.enterFullDisruptionFunc(zoneSize))
} else {
nc.zonePodEvictor[zone].SwapLimiter(
nc.enterFullDisruptionFunc(zoneSize))
}
}
}
// tryUpdateNodeStatus checks a given node's conditions and tries to update it. Returns the grace period to
// which the given node is entitled, the state of the current and last observed Ready Condition, and an error if one occurred.
func (nc *Controller) tryUpdateNodeStatus(node *v1.Node) (time.Duration, v1.NodeCondition, *v1.NodeCondition, error) {
@ -1082,6 +862,131 @@ func (nc *Controller) tryUpdateNodeStatus(node *v1.Node) (time.Duration, v1.Node
return gracePeriod, observedReadyCondition, currentReadyCondition, err
}
func (nc *Controller) handleDisruption(zoneToNodeConditions map[string][]*v1.NodeCondition, nodes []*v1.Node) {
newZoneStates := map[string]ZoneState{}
allAreFullyDisrupted := true
for k, v := range zoneToNodeConditions {
zoneSize.WithLabelValues(k).Set(float64(len(v)))
unhealthy, newState := nc.computeZoneStateFunc(v)
zoneHealth.WithLabelValues(k).Set(float64(100*(len(v)-unhealthy)) / float64(len(v)))
unhealthyNodes.WithLabelValues(k).Set(float64(unhealthy))
if newState != stateFullDisruption {
allAreFullyDisrupted = false
}
newZoneStates[k] = newState
if _, had := nc.zoneStates[k]; !had {
glog.Errorf("Setting initial state for unseen zone: %v", k)
nc.zoneStates[k] = stateInitial
}
}
allWasFullyDisrupted := true
for k, v := range nc.zoneStates {
if _, have := zoneToNodeConditions[k]; !have {
zoneSize.WithLabelValues(k).Set(0)
zoneHealth.WithLabelValues(k).Set(100)
unhealthyNodes.WithLabelValues(k).Set(0)
delete(nc.zoneStates, k)
continue
}
if v != stateFullDisruption {
allWasFullyDisrupted = false
break
}
}
// At least one node was responding in the previous pass or in the current pass. The semantics are as follows:
// - if the new state is "partialDisruption" we call a user defined function that returns a new limiter to use,
// - if the new state is "normal" we resume normal operation (go back to default limiter settings),
// - if the new state is "fullDisruption" we restore the normal eviction rate,
//   - unless all zones in the cluster are in "fullDisruption" - in that case we stop all evictions.
if !allAreFullyDisrupted || !allWasFullyDisrupted {
// We're switching to full disruption mode
if allAreFullyDisrupted {
glog.V(0).Info("Controller detected that all Nodes are not-Ready. Entering master disruption mode.")
for i := range nodes {
if nc.useTaintBasedEvictions {
_, err := nc.markNodeAsReachable(nodes[i])
if err != nil {
glog.Errorf("Failed to remove taints from Node %v", nodes[i].Name)
}
} else {
nc.cancelPodEviction(nodes[i])
}
}
// We stop all evictions.
for k := range nc.zoneStates {
if nc.useTaintBasedEvictions {
nc.zoneNoExecuteTainter[k].SwapLimiter(0)
} else {
nc.zonePodEvictor[k].SwapLimiter(0)
}
}
for k := range nc.zoneStates {
nc.zoneStates[k] = stateFullDisruption
}
// All rate limiters are updated, so we can return early here.
return
}
// We're exiting full disruption mode
if allWasFullyDisrupted {
glog.V(0).Info("Controller detected that some Nodes are Ready. Exiting master disruption mode.")
// When exiting disruption mode update probe timestamps on all Nodes.
now := nc.now()
for i := range nodes {
v := nc.nodeStatusMap[nodes[i].Name]
v.probeTimestamp = now
v.readyTransitionTimestamp = now
nc.nodeStatusMap[nodes[i].Name] = v
}
// We reset all rate limiters to settings appropriate for the given state.
for k := range nc.zoneStates {
nc.setLimiterInZone(k, len(zoneToNodeConditions[k]), newZoneStates[k])
nc.zoneStates[k] = newZoneStates[k]
}
return
}
// We know that there's at least one zone that is not fully disrupted, so
// we can use the default behavior for rate limiters.
for k, v := range nc.zoneStates {
newState := newZoneStates[k]
if v == newState {
continue
}
glog.V(0).Infof("Controller detected that zone %v is now in state %v.", k, newState)
nc.setLimiterInZone(k, len(zoneToNodeConditions[k]), newState)
nc.zoneStates[k] = newState
}
}
}
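// Worked example (assumed numbers, purely illustrative): for a zone with 10
// nodes of which 3 report a not-Ready condition, the gauge above is set to
// zoneHealth = 100*(10-3)/10 = 70. Assuming computeZoneStateFunc reports at
// least one such zone as something other than fullDisruption,
// allAreFullyDisrupted stays false and evictions continue under the per-zone
// limiters chosen by setLimiterInZone below; only when every zone is reported
// as fullDisruption are all limiters swapped to 0 and evictions stopped.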
func (nc *Controller) setLimiterInZone(zone string, zoneSize int, state ZoneState) {
switch state {
case stateNormal:
if nc.useTaintBasedEvictions {
nc.zoneNoExecuteTainter[zone].SwapLimiter(nc.evictionLimiterQPS)
} else {
nc.zonePodEvictor[zone].SwapLimiter(nc.evictionLimiterQPS)
}
case statePartialDisruption:
if nc.useTaintBasedEvictions {
nc.zoneNoExecuteTainter[zone].SwapLimiter(
nc.enterPartialDisruptionFunc(zoneSize))
} else {
nc.zonePodEvictor[zone].SwapLimiter(
nc.enterPartialDisruptionFunc(zoneSize))
}
case stateFullDisruption:
if nc.useTaintBasedEvictions {
nc.zoneNoExecuteTainter[zone].SwapLimiter(
nc.enterFullDisruptionFunc(zoneSize))
} else {
nc.zonePodEvictor[zone].SwapLimiter(
nc.enterFullDisruptionFunc(zoneSize))
}
}
}
// classifyNodes classifies allNodes into three categories:
// 1. added: the nodes that are in 'allNodes', but not in 'knownNodeSet'
// 2. deleted: the nodes that are in 'knownNodeSet', but not in 'allNodes'
@ -1116,6 +1021,41 @@ func (nc *Controller) classifyNodes(allNodes []*v1.Node) (added, deleted, newZon
return
}
// HealthyQPSFunc returns the default value for cluster eviction rate - we take
// nodeNum for consistency with ReducedQPSFunc.
func (nc *Controller) HealthyQPSFunc(nodeNum int) float32 {
return nc.evictionLimiterQPS
}
// ReducedQPSFunc returns the QPS to use when the cluster is large: make
// evictions slower; if the cluster is small, stop evictions altogether.
func (nc *Controller) ReducedQPSFunc(nodeNum int) float32 {
if int32(nodeNum) > nc.largeClusterThreshold {
return nc.secondaryEvictionLimiterQPS
}
return 0
}
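// Worked example (assumed values, purely illustrative, and assuming
// enterPartialDisruptionFunc is wired to ReducedQPSFunc as the names suggest):
// with evictionLimiterQPS = 0.1, secondaryEvictionLimiterQPS = 0.01 and
// largeClusterThreshold = 50, a zone of 60 nodes entering partialDisruption is
// rate-limited to ReducedQPSFunc(60) = 0.01 evictions per second, while a zone
// of 30 nodes gets ReducedQPSFunc(30) = 0, i.e. evictions there stop entirely;
// in the normal state both zones run at evictionLimiterQPS = 0.1.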
// addPodEvictorForNewZone checks if a new zone appeared and, if so, adds a new evictor.
func (nc *Controller) addPodEvictorForNewZone(node *v1.Node) {
zone := utilnode.GetZoneKey(node)
if _, found := nc.zoneStates[zone]; !found {
nc.zoneStates[zone] = stateInitial
if !nc.useTaintBasedEvictions {
nc.zonePodEvictor[zone] =
scheduler.NewRateLimitedTimedQueue(
flowcontrol.NewTokenBucketRateLimiter(nc.evictionLimiterQPS, scheduler.EvictionRateLimiterBurst))
} else {
nc.zoneNoExecuteTainter[zone] =
scheduler.NewRateLimitedTimedQueue(
flowcontrol.NewTokenBucketRateLimiter(nc.evictionLimiterQPS, scheduler.EvictionRateLimiterBurst))
}
// Init the metric for the new zone.
glog.Infof("Initializing eviction metric for zone: %v", zone)
evictionsNumber.WithLabelValues(zone).Add(0)
}
}
// cancelPodEviction removes any queued evictions, typically because the node is available again. It
// returns true if an eviction was queued.
func (nc *Controller) cancelPodEviction(node *v1.Node) bool {
@ -1141,7 +1081,7 @@ func (nc *Controller) evictPods(node *v1.Node) bool {
func (nc *Controller) markNodeForTainting(node *v1.Node) bool {
nc.evictorLock.Lock()
defer nc.evictorLock.Unlock()
return nc.zoneNoExecuteTainer[utilnode.GetZoneKey(node)].Add(node.Name, string(node.UID))
return nc.zoneNoExecuteTainter[utilnode.GetZoneKey(node)].Add(node.Name, string(node.UID))
}
func (nc *Controller) markNodeAsReachable(node *v1.Node) (bool, error) {
@ -1157,22 +1097,7 @@ func (nc *Controller) markNodeAsReachable(node *v1.Node) (bool, error) {
glog.Errorf("Failed to remove taint from node %v: %v", node.Name, err)
return false, err
}
return nc.zoneNoExecuteTainer[utilnode.GetZoneKey(node)].Remove(node.Name), nil
}
// HealthyQPSFunc returns the default value for cluster eviction rate - we take
// nodeNum for consistency with ReducedQPSFunc.
func (nc *Controller) HealthyQPSFunc(nodeNum int) float32 {
return nc.evictionLimiterQPS
}
// ReducedQPSFunc returns the QPS to use when the cluster is large: make
// evictions slower; if the cluster is small, stop evictions altogether.
func (nc *Controller) ReducedQPSFunc(nodeNum int) float32 {
if int32(nodeNum) > nc.largeClusterThreshold {
return nc.secondaryEvictionLimiterQPS
}
return 0
return nc.zoneNoExecuteTainter[utilnode.GetZoneKey(node)].Remove(node.Name), nil
}
// ComputeZoneState returns a slice of NodeReadyConditions for all Nodes in a given zone.

View File

@ -1,5 +1,5 @@
/*
Copyright 2014 The Kubernetes Authors.
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@ -14,10 +14,9 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package node
package nodelifecycle
import (
"net"
"strings"
"testing"
"time"
@ -39,10 +38,9 @@ import (
"k8s.io/kubernetes/pkg/cloudprovider"
fakecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/fake"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/node/ipam"
"k8s.io/kubernetes/pkg/controller/node/scheduler"
"k8s.io/kubernetes/pkg/controller/node/util"
"k8s.io/kubernetes/pkg/controller/nodelifecycle/scheduler"
"k8s.io/kubernetes/pkg/controller/testutil"
nodeutil "k8s.io/kubernetes/pkg/controller/util/node"
kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis"
"k8s.io/kubernetes/pkg/util/node"
taintutils "k8s.io/kubernetes/pkg/util/taints"
@ -60,13 +58,46 @@ const (
func alwaysReady() bool { return true }
type nodeController struct {
type nodeLifecycleController struct {
*Controller
nodeInformer coreinformers.NodeInformer
daemonSetInformer extensionsinformers.DaemonSetInformer
}
func newNodeControllerFromClient(
// doEviction does the fake eviction and returns the status of the eviction operation.
func (nc *nodeLifecycleController) doEviction(fakeNodeHandler *testutil.FakeNodeHandler) bool {
var podEvicted bool
zones := testutil.GetZones(fakeNodeHandler)
for _, zone := range zones {
nc.zonePodEvictor[zone].Try(func(value scheduler.TimedValue) (bool, time.Duration) {
uid, _ := value.UID.(string)
nodeutil.DeletePods(fakeNodeHandler, nc.recorder, value.Value, uid, nc.daemonSetStore)
return true, 0
})
}
for _, action := range fakeNodeHandler.Actions() {
if action.GetVerb() == "delete" && action.GetResource().Resource == "pods" {
podEvicted = true
return podEvicted
}
}
return podEvicted
}
func (nc *nodeLifecycleController) syncNodeStore(fakeNodeHandler *testutil.FakeNodeHandler) error {
nodes, err := fakeNodeHandler.List(metav1.ListOptions{})
if err != nil {
return err
}
newElems := make([]interface{}, 0, len(nodes.Items))
for i := range nodes.Items {
newElems = append(newElems, &nodes.Items[i])
}
return nc.nodeInformer.Informer().GetStore().Replace(newElems, "newRV")
}
func newNodeLifecycleControllerFromClient(
cloud cloudprovider.Interface,
kubeClient clientset.Interface,
podEvictionTimeout time.Duration,
@ -77,37 +108,28 @@ func newNodeControllerFromClient(
nodeMonitorGracePeriod time.Duration,
nodeStartupGracePeriod time.Duration,
nodeMonitorPeriod time.Duration,
clusterCIDR *net.IPNet,
serviceCIDR *net.IPNet,
nodeCIDRMaskSize int,
allocateNodeCIDRs bool,
useTaints bool,
) (*nodeController, error) {
) (*nodeLifecycleController, error) {
factory := informers.NewSharedInformerFactory(kubeClient, controller.NoResyncPeriodFunc())
nodeInformer := factory.Core().V1().Nodes()
daemonSetInformer := factory.Extensions().V1beta1().DaemonSets()
nc, err := NewNodeController(
nc, err := NewNodeLifecycleController(
factory.Core().V1().Pods(),
nodeInformer,
daemonSetInformer,
cloud,
kubeClient,
nodeMonitorPeriod,
nodeStartupGracePeriod,
nodeMonitorGracePeriod,
podEvictionTimeout,
evictionLimiterQPS,
secondaryEvictionLimiterQPS,
largeClusterThreshold,
unhealthyZoneThreshold,
nodeMonitorGracePeriod,
nodeStartupGracePeriod,
nodeMonitorPeriod,
clusterCIDR,
serviceCIDR,
nodeCIDRMaskSize,
allocateNodeCIDRs,
ipam.RangeAllocatorType,
useTaints,
useTaints,
useTaints,
@ -120,19 +142,7 @@ func newNodeControllerFromClient(
nc.nodeInformerSynced = alwaysReady
nc.daemonSetInformerSynced = alwaysReady
return &nodeController{nc, nodeInformer, daemonSetInformer}, nil
}
func syncNodeStore(nc *nodeController, fakeNodeHandler *testutil.FakeNodeHandler) error {
nodes, err := fakeNodeHandler.List(metav1.ListOptions{})
if err != nil {
return err
}
newElems := make([]interface{}, 0, len(nodes.Items))
for i := range nodes.Items {
newElems = append(newElems, &nodes.Items[i])
}
return nc.nodeInformer.Informer().GetStore().Replace(newElems, "newRV")
return &nodeLifecycleController{nc, nodeInformer, daemonSetInformer}, nil
}
func TestMonitorNodeStatusEvictPods(t *testing.T) {
@ -597,7 +607,7 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
}
for _, item := range table {
nodeController, _ := newNodeControllerFromClient(
nodeController, _ := newNodeLifecycleControllerFromClient(
nil,
item.fakeNodeHandler,
evictionTimeout,
@ -608,17 +618,13 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
testNodeMonitorGracePeriod,
testNodeStartupGracePeriod,
testNodeMonitorPeriod,
nil,
nil,
0,
false,
false)
nodeController.now = func() metav1.Time { return fakeNow }
nodeController.recorder = testutil.NewFakeRecorder()
for _, ds := range item.daemonSets {
nodeController.daemonSetInformer.Informer().GetStore().Add(&ds)
}
if err := syncNodeStore(nodeController, item.fakeNodeHandler); err != nil {
if err := nodeController.syncNodeStore(item.fakeNodeHandler); err != nil {
t.Errorf("unexpected error: %v", err)
}
if err := nodeController.monitorNodeStatus(); err != nil {
@ -633,7 +639,7 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
item.fakeNodeHandler.Existing[0].Labels = labels
item.fakeNodeHandler.Existing[1].Labels = labels
}
if err := syncNodeStore(nodeController, item.fakeNodeHandler); err != nil {
if err := nodeController.syncNodeStore(item.fakeNodeHandler); err != nil {
t.Errorf("unexpected error: %v", err)
}
if err := nodeController.monitorNodeStatus(); err != nil {
@ -644,7 +650,7 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
if _, ok := nodeController.zonePodEvictor[zone]; ok {
nodeController.zonePodEvictor[zone].Try(func(value scheduler.TimedValue) (bool, time.Duration) {
nodeUID, _ := value.UID.(string)
util.DeletePods(item.fakeNodeHandler, nodeController.recorder, value.Value, nodeUID, nodeController.daemonSetInformer.Lister())
nodeutil.DeletePods(item.fakeNodeHandler, nodeController.recorder, value.Value, nodeUID, nodeController.daemonSetInformer.Lister())
return true, 0
})
} else {
@ -763,12 +769,21 @@ func TestPodStatusChange(t *testing.T) {
}
for _, item := range table {
nodeController, _ := newNodeControllerFromClient(nil, item.fakeNodeHandler,
evictionTimeout, testRateLimiterQPS, testRateLimiterQPS, testLargeClusterThreshold, testUnhealthyThreshold, testNodeMonitorGracePeriod,
testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false, false)
nodeController, _ := newNodeLifecycleControllerFromClient(
nil,
item.fakeNodeHandler,
evictionTimeout,
testRateLimiterQPS,
testRateLimiterQPS,
testLargeClusterThreshold,
testUnhealthyThreshold,
testNodeMonitorGracePeriod,
testNodeStartupGracePeriod,
testNodeMonitorPeriod,
false)
nodeController.now = func() metav1.Time { return fakeNow }
nodeController.recorder = testutil.NewFakeRecorder()
if err := syncNodeStore(nodeController, item.fakeNodeHandler); err != nil {
if err := nodeController.syncNodeStore(item.fakeNodeHandler); err != nil {
t.Errorf("unexpected error: %v", err)
}
if err := nodeController.monitorNodeStatus(); err != nil {
@ -779,7 +794,7 @@ func TestPodStatusChange(t *testing.T) {
item.fakeNodeHandler.Existing[0].Status = item.newNodeStatus
item.fakeNodeHandler.Existing[1].Status = item.secondNodeNewStatus
}
if err := syncNodeStore(nodeController, item.fakeNodeHandler); err != nil {
if err := nodeController.syncNodeStore(item.fakeNodeHandler); err != nil {
t.Errorf("unexpected error: %v", err)
}
if err := nodeController.monitorNodeStatus(); err != nil {
@ -789,7 +804,7 @@ func TestPodStatusChange(t *testing.T) {
for _, zone := range zones {
nodeController.zonePodEvictor[zone].Try(func(value scheduler.TimedValue) (bool, time.Duration) {
nodeUID, _ := value.UID.(string)
util.DeletePods(item.fakeNodeHandler, nodeController.recorder, value.Value, nodeUID, nodeController.daemonSetStore)
nodeutil.DeletePods(item.fakeNodeHandler, nodeController.recorder, value.Value, nodeUID, nodeController.daemonSetStore)
return true, 0
})
}
@ -809,7 +824,6 @@ func TestPodStatusChange(t *testing.T) {
t.Errorf("expected pod update: %+v, got %+v for %+v", podReasonUpdate, item.expectedPodUpdate, item.description)
}
}
}
func TestMonitorNodeStatusEvictPodsWithDisruption(t *testing.T) {
@ -1280,9 +1294,18 @@ func TestMonitorNodeStatusEvictPodsWithDisruption(t *testing.T) {
Existing: item.nodeList,
Clientset: fake.NewSimpleClientset(&v1.PodList{Items: item.podList}),
}
nodeController, _ := newNodeControllerFromClient(nil, fakeNodeHandler,
evictionTimeout, testRateLimiterQPS, testRateLimiterQPS, testLargeClusterThreshold, testUnhealthyThreshold, testNodeMonitorGracePeriod,
testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false, false)
nodeController, _ := newNodeLifecycleControllerFromClient(
nil,
fakeNodeHandler,
evictionTimeout,
testRateLimiterQPS,
testRateLimiterQPS,
testLargeClusterThreshold,
testUnhealthyThreshold,
testNodeMonitorGracePeriod,
testNodeStartupGracePeriod,
testNodeMonitorPeriod,
false)
nodeController.now = func() metav1.Time { return fakeNow }
nodeController.enterPartialDisruptionFunc = func(nodeNum int) float32 {
return testRateLimiterQPS
@ -1291,7 +1314,7 @@ func TestMonitorNodeStatusEvictPodsWithDisruption(t *testing.T) {
nodeController.enterFullDisruptionFunc = func(nodeNum int) float32 {
return testRateLimiterQPS
}
if err := syncNodeStore(nodeController, fakeNodeHandler); err != nil {
if err := nodeController.syncNodeStore(fakeNodeHandler); err != nil {
t.Errorf("unexpected error: %v", err)
}
if err := nodeController.monitorNodeStatus(); err != nil {
@ -1309,7 +1332,7 @@ func TestMonitorNodeStatusEvictPodsWithDisruption(t *testing.T) {
fakeNodeHandler.Existing[i].Status = item.updatedNodeStatuses[i]
}
if err := syncNodeStore(nodeController, fakeNodeHandler); err != nil {
if err := nodeController.syncNodeStore(fakeNodeHandler); err != nil {
t.Errorf("unexpected error: %v", err)
}
if err := nodeController.monitorNodeStatus(); err != nil {
@ -1337,27 +1360,6 @@ func TestMonitorNodeStatusEvictPodsWithDisruption(t *testing.T) {
}
}
// doEviction does the fake eviction and returns the status of the eviction operation.
func (nc *nodeController) doEviction(fakeNodeHandler *testutil.FakeNodeHandler) bool {
var podEvicted bool
zones := testutil.GetZones(fakeNodeHandler)
for _, zone := range zones {
nc.zonePodEvictor[zone].Try(func(value scheduler.TimedValue) (bool, time.Duration) {
uid, _ := value.UID.(string)
util.DeletePods(fakeNodeHandler, nc.recorder, value.Value, uid, nc.daemonSetStore)
return true, 0
})
}
for _, action := range fakeNodeHandler.Actions() {
if action.GetVerb() == "delete" && action.GetResource().Resource == "pods" {
podEvicted = true
return podEvicted
}
}
return podEvicted
}
// TestCloudProviderNoRateLimit tests that monitorNodes() immediately deletes
// pods and the node when kubelet has not reported, and the cloudprovider says
// the node is gone.
@ -1384,10 +1386,18 @@ func TestCloudProviderNoRateLimit(t *testing.T) {
Clientset: fake.NewSimpleClientset(&v1.PodList{Items: []v1.Pod{*testutil.NewPod("pod0", "node0"), *testutil.NewPod("pod1", "node0")}}),
DeleteWaitChan: make(chan struct{}),
}
nodeController, _ := newNodeControllerFromClient(nil, fnh, 10*time.Minute,
testRateLimiterQPS, testRateLimiterQPS, testLargeClusterThreshold, testUnhealthyThreshold,
testNodeMonitorGracePeriod, testNodeStartupGracePeriod,
testNodeMonitorPeriod, nil, nil, 0, false, false)
nodeController, _ := newNodeLifecycleControllerFromClient(
nil,
fnh,
10*time.Minute,
testRateLimiterQPS,
testRateLimiterQPS,
testLargeClusterThreshold,
testUnhealthyThreshold,
testNodeMonitorGracePeriod,
testNodeStartupGracePeriod,
testNodeMonitorPeriod,
false)
nodeController.cloud = &fakecloud.FakeCloud{}
nodeController.now = func() metav1.Time { return metav1.Date(2016, 1, 1, 12, 0, 0, 0, time.UTC) }
nodeController.recorder = testutil.NewFakeRecorder()
@ -1395,7 +1405,7 @@ func TestCloudProviderNoRateLimit(t *testing.T) {
return false, nil
}
// monitorNodeStatus should allow this node to be immediately deleted
if err := syncNodeStore(nodeController, fnh); err != nil {
if err := nodeController.syncNodeStore(fnh); err != nil {
t.Errorf("unexpected error: %v", err)
}
if err := nodeController.monitorNodeStatus(); err != nil {
@ -1624,12 +1634,21 @@ func TestMonitorNodeStatusUpdateStatus(t *testing.T) {
}
for i, item := range table {
nodeController, _ := newNodeControllerFromClient(nil, item.fakeNodeHandler, 5*time.Minute,
testRateLimiterQPS, testRateLimiterQPS, testLargeClusterThreshold, testUnhealthyThreshold,
testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false, false)
nodeController, _ := newNodeLifecycleControllerFromClient(
nil,
item.fakeNodeHandler,
5*time.Minute,
testRateLimiterQPS,
testRateLimiterQPS,
testLargeClusterThreshold,
testUnhealthyThreshold,
testNodeMonitorGracePeriod,
testNodeStartupGracePeriod,
testNodeMonitorPeriod,
false)
nodeController.now = func() metav1.Time { return fakeNow }
nodeController.recorder = testutil.NewFakeRecorder()
if err := syncNodeStore(nodeController, item.fakeNodeHandler); err != nil {
if err := nodeController.syncNodeStore(item.fakeNodeHandler); err != nil {
t.Errorf("unexpected error: %v", err)
}
if err := nodeController.monitorNodeStatus(); err != nil {
@ -1638,7 +1657,7 @@ func TestMonitorNodeStatusUpdateStatus(t *testing.T) {
if item.timeToPass > 0 {
nodeController.now = func() metav1.Time { return metav1.Time{Time: fakeNow.Add(item.timeToPass)} }
item.fakeNodeHandler.Existing[0].Status = item.newNodeStatus
if err := syncNodeStore(nodeController, item.fakeNodeHandler); err != nil {
if err := nodeController.syncNodeStore(item.fakeNodeHandler); err != nil {
t.Errorf("unexpected error: %v", err)
}
if err := nodeController.monitorNodeStatus(); err != nil {
@ -1768,12 +1787,21 @@ func TestMonitorNodeStatusMarkPodsNotReady(t *testing.T) {
}
for i, item := range table {
nodeController, _ := newNodeControllerFromClient(nil, item.fakeNodeHandler, 5*time.Minute,
testRateLimiterQPS, testRateLimiterQPS, testLargeClusterThreshold, testUnhealthyThreshold,
testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false, false)
nodeController, _ := newNodeLifecycleControllerFromClient(
nil,
item.fakeNodeHandler,
5*time.Minute,
testRateLimiterQPS,
testRateLimiterQPS,
testLargeClusterThreshold,
testUnhealthyThreshold,
testNodeMonitorGracePeriod,
testNodeStartupGracePeriod,
testNodeMonitorPeriod,
false)
nodeController.now = func() metav1.Time { return fakeNow }
nodeController.recorder = testutil.NewFakeRecorder()
if err := syncNodeStore(nodeController, item.fakeNodeHandler); err != nil {
if err := nodeController.syncNodeStore(item.fakeNodeHandler); err != nil {
t.Errorf("unexpected error: %v", err)
}
if err := nodeController.monitorNodeStatus(); err != nil {
@ -1782,7 +1810,7 @@ func TestMonitorNodeStatusMarkPodsNotReady(t *testing.T) {
if item.timeToPass > 0 {
nodeController.now = func() metav1.Time { return metav1.Time{Time: fakeNow.Add(item.timeToPass)} }
item.fakeNodeHandler.Existing[0].Status = item.newNodeStatus
if err := syncNodeStore(nodeController, item.fakeNodeHandler); err != nil {
if err := nodeController.syncNodeStore(item.fakeNodeHandler); err != nil {
t.Errorf("unexpected error: %v", err)
}
if err := nodeController.monitorNodeStatus(); err != nil {
@ -1879,12 +1907,21 @@ func TestSwapUnreachableNotReadyTaints(t *testing.T) {
originalTaint := UnreachableTaintTemplate
updatedTaint := NotReadyTaintTemplate
nodeController, _ := newNodeControllerFromClient(nil, fakeNodeHandler,
evictionTimeout, testRateLimiterQPS, testRateLimiterQPS, testLargeClusterThreshold, testUnhealthyThreshold, testNodeMonitorGracePeriod,
testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false, true)
nodeController, _ := newNodeLifecycleControllerFromClient(
nil,
fakeNodeHandler,
evictionTimeout,
testRateLimiterQPS,
testRateLimiterQPS,
testLargeClusterThreshold,
testUnhealthyThreshold,
testNodeMonitorGracePeriod,
testNodeStartupGracePeriod,
testNodeMonitorPeriod,
true)
nodeController.now = func() metav1.Time { return fakeNow }
nodeController.recorder = testutil.NewFakeRecorder()
if err := syncNodeStore(nodeController, fakeNodeHandler); err != nil {
if err := nodeController.syncNodeStore(fakeNodeHandler); err != nil {
t.Errorf("unexpected error: %v", err)
}
if err := nodeController.monitorNodeStatus(); err != nil {
@ -1922,7 +1959,7 @@ func TestSwapUnreachableNotReadyTaints(t *testing.T) {
return
}
if err := syncNodeStore(nodeController, fakeNodeHandler); err != nil {
if err := nodeController.syncNodeStore(fakeNodeHandler); err != nil {
t.Errorf("unexpected error: %v", err)
}
if err := nodeController.monitorNodeStatus(); err != nil {
@ -1972,9 +2009,18 @@ func TestTaintsNodeByCondition(t *testing.T) {
Clientset: fake.NewSimpleClientset(&v1.PodList{Items: []v1.Pod{*testutil.NewPod("pod0", "node0")}}),
}
nodeController, _ := newNodeControllerFromClient(nil, fakeNodeHandler, evictionTimeout,
testRateLimiterQPS, testRateLimiterQPS, testLargeClusterThreshold, testUnhealthyThreshold, testNodeMonitorGracePeriod,
testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false, true)
nodeController, _ := newNodeLifecycleControllerFromClient(
nil,
fakeNodeHandler,
evictionTimeout,
testRateLimiterQPS,
testRateLimiterQPS,
testLargeClusterThreshold,
testUnhealthyThreshold,
testNodeMonitorGracePeriod,
testNodeStartupGracePeriod,
testNodeMonitorPeriod,
true)
nodeController.now = func() metav1.Time { return fakeNow }
nodeController.recorder = testutil.NewFakeRecorder()
@ -2098,11 +2144,11 @@ func TestTaintsNodeByCondition(t *testing.T) {
for _, test := range tests {
fakeNodeHandler.Update(test.Node)
if err := syncNodeStore(nodeController, fakeNodeHandler); err != nil {
if err := nodeController.syncNodeStore(fakeNodeHandler); err != nil {
t.Errorf("unexpected error: %v", err)
}
nodeController.doNoScheduleTaintingPass(test.Node)
if err := syncNodeStore(nodeController, fakeNodeHandler); err != nil {
if err := nodeController.syncNodeStore(fakeNodeHandler); err != nil {
t.Errorf("unexpected error: %v", err)
}
node0, err := nodeController.nodeLister.Get("node0")
@ -2150,10 +2196,18 @@ func TestNodeEventGeneration(t *testing.T) {
Clientset: fake.NewSimpleClientset(&v1.PodList{Items: []v1.Pod{*testutil.NewPod("pod0", "node0")}}),
}
nodeController, _ := newNodeControllerFromClient(nil, fakeNodeHandler, 5*time.Minute,
testRateLimiterQPS, testRateLimiterQPS, testLargeClusterThreshold, testUnhealthyThreshold,
testNodeMonitorGracePeriod, testNodeStartupGracePeriod,
testNodeMonitorPeriod, nil, nil, 0, false, false)
nodeController, _ := newNodeLifecycleControllerFromClient(
nil,
fakeNodeHandler,
5*time.Minute,
testRateLimiterQPS,
testRateLimiterQPS,
testLargeClusterThreshold,
testUnhealthyThreshold,
testNodeMonitorGracePeriod,
testNodeStartupGracePeriod,
testNodeMonitorPeriod,
false)
nodeController.cloud = &fakecloud.FakeCloud{}
nodeController.nodeExistsInCloudProvider = func(nodeName types.NodeName) (bool, error) {
return false, nil
@ -2161,7 +2215,7 @@ func TestNodeEventGeneration(t *testing.T) {
nodeController.now = func() metav1.Time { return fakeNow }
fakeRecorder := testutil.NewFakeRecorder()
nodeController.recorder = fakeRecorder
if err := syncNodeStore(nodeController, fakeNodeHandler); err != nil {
if err := nodeController.syncNodeStore(fakeNodeHandler); err != nil {
t.Errorf("unexpected error: %v", err)
}
if err := nodeController.monitorNodeStatus(); err != nil {
@ -2208,9 +2262,18 @@ func TestFixDeprecatedTaintKey(t *testing.T) {
Clientset: fake.NewSimpleClientset(&v1.PodList{Items: []v1.Pod{*testutil.NewPod("pod0", "node0")}}),
}
nodeController, _ := newNodeControllerFromClient(nil, fakeNodeHandler, evictionTimeout,
testRateLimiterQPS, testRateLimiterQPS, testLargeClusterThreshold, testUnhealthyThreshold, testNodeMonitorGracePeriod,
testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false, true)
nodeController, _ := newNodeLifecycleControllerFromClient(
nil,
fakeNodeHandler,
evictionTimeout,
testRateLimiterQPS,
testRateLimiterQPS,
testLargeClusterThreshold,
testUnhealthyThreshold,
testNodeMonitorGracePeriod,
testNodeStartupGracePeriod,
testNodeMonitorPeriod,
true)
nodeController.now = func() metav1.Time { return fakeNow }
nodeController.recorder = testutil.NewFakeRecorder()
@ -2319,11 +2382,11 @@ func TestFixDeprecatedTaintKey(t *testing.T) {
for _, test := range tests {
fakeNodeHandler.Update(test.Node)
if err := syncNodeStore(nodeController, fakeNodeHandler); err != nil {
if err := nodeController.syncNodeStore(fakeNodeHandler); err != nil {
t.Errorf("unexpected error: %v", err)
}
nodeController.doFixDeprecatedTaintKeyPass(test.Node)
if err := syncNodeStore(nodeController, fakeNodeHandler); err != nil {
if err := nodeController.syncNodeStore(fakeNodeHandler); err != nil {
t.Errorf("unexpected error: %v", err)
}
node, err := nodeController.nodeLister.Get(test.Node.GetName())

View File

@ -1,39 +1,14 @@
package(default_visibility = ["//visibility:public"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
"go_test",
)
go_test(
name = "go_default_test",
srcs = [
"rate_limited_queue_test.go",
"taint_controller_test.go",
"timed_workers_test.go",
],
embed = [":go_default_library"],
importpath = "k8s.io/kubernetes/pkg/controller/node/scheduler",
deps = [
"//pkg/controller/testutil:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//vendor/k8s.io/client-go/kubernetes/fake:go_default_library",
"//vendor/k8s.io/client-go/testing:go_default_library",
"//vendor/k8s.io/client-go/util/flowcontrol:go_default_library",
],
)
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = [
"rate_limited_queue.go",
"taint_controller.go",
"taint_manager.go",
"timed_workers.go",
],
importpath = "k8s.io/kubernetes/pkg/controller/node/scheduler",
importpath = "k8s.io/kubernetes/pkg/controller/nodelifecycle/scheduler",
visibility = ["//visibility:public"],
deps = [
"//pkg/apis/core/helper:go_default_library",
"//pkg/apis/core/v1/helper:go_default_library",
@ -53,6 +28,26 @@ go_library(
],
)
go_test(
name = "go_default_test",
srcs = [
"rate_limited_queue_test.go",
"taint_manager_test.go",
"timed_workers_test.go",
],
embed = [":go_default_library"],
importpath = "k8s.io/kubernetes/pkg/controller/nodelifecycle/scheduler",
deps = [
"//pkg/controller/testutil:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//vendor/k8s.io/client-go/kubernetes/fake:go_default_library",
"//vendor/k8s.io/client-go/testing:go_default_library",
"//vendor/k8s.io/client-go/util/flowcontrol:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
@ -64,4 +59,5 @@ filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

View File

@ -18,9 +18,6 @@ package scheduler
import (
"fmt"
"sync"
"time"
"k8s.io/api/core/v1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/kubernetes/pkg/apis/core/helper"
@ -30,6 +27,8 @@ import (
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"sync"
"time"
"k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"

View File

@ -1,14 +1,10 @@
package(default_visibility = ["//visibility:public"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
)
load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "go_default_library",
srcs = ["controller_utils.go"],
importpath = "k8s.io/kubernetes/pkg/controller/node/util",
importpath = "k8s.io/kubernetes/pkg/controller/util/node",
visibility = ["//visibility:public"],
deps = [
"//pkg/apis/core:go_default_library",
"//pkg/cloudprovider:go_default_library",
@ -41,4 +37,5 @@ filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package util
package node
import (
"errors"
@ -170,9 +170,9 @@ func MarkAllPodsNotReady(kubeClient clientset.Interface, node *v1.Node) error {
return fmt.Errorf("%v", strings.Join(errMsg, "; "))
}
// NodeExistsInCloudProvider returns true if the node exists in the
// ExistsInCloudProvider returns true if the node exists in the
// cloud provider.
func NodeExistsInCloudProvider(cloud cloudprovider.Interface, nodeName types.NodeName) (bool, error) {
func ExistsInCloudProvider(cloud cloudprovider.Interface, nodeName types.NodeName) (bool, error) {
instances, ok := cloud.Instances()
if !ok {
return false, fmt.Errorf("%v", ErrCloudInstance)
@ -198,7 +198,7 @@ func RecordNodeEvent(recorder record.EventRecorder, nodeName, nodeUID, eventtype
recorder.Eventf(ref, eventtype, reason, "Node %s event: %s", nodeName, event)
}
// RecordNodeStatusChange records an event related to a node status change.
// RecordNodeStatusChange records an event related to a node status change. (Common to lifecycle and ipam)
func RecordNodeStatusChange(recorder record.EventRecorder, node *v1.Node, newStatus string) {
ref := &v1.ObjectReference{
Kind: "Node",
@ -257,7 +257,7 @@ func CreateAddNodeHandler(f func(node *v1.Node) error) func(obj interface{}) {
}
}
// CreateUpdateNodeHandler creates a node update handler.
// CreateUpdateNodeHandler creates a node update handler. (Common to lifecycle and ipam)
func CreateUpdateNodeHandler(f func(oldNode, newNode *v1.Node) error) func(oldObj, newObj interface{}) {
return func(origOldObj, origNewObj interface{}) {
node := origNewObj.(*v1.Node).DeepCopy()
@ -269,7 +269,7 @@ func CreateUpdateNodeHandler(f func(oldNode, newNode *v1.Node) error) func(oldOb
}
}
// CreateDeleteNodeHandler creates a delete node handler.
// CreateDeleteNodeHandler creates a delete node handler. (Common to lifecycle and ipam)
func CreateDeleteNodeHandler(f func(node *v1.Node) error) func(obj interface{}) {
return func(originalObj interface{}) {
originalNode, isNode := originalObj.(*v1.Node)

View File

@ -33,7 +33,7 @@ go_library(
"//pkg/controller/daemon:go_default_library",
"//pkg/controller/deployment/util:go_default_library",
"//pkg/controller/job:go_default_library",
"//pkg/controller/node:go_default_library",
"//pkg/controller/nodelifecycle:go_default_library",
"//pkg/controller/replicaset:go_default_library",
"//pkg/controller/replication:go_default_library",
"//pkg/kubectl:go_default_library",

View File

@ -33,7 +33,7 @@ import (
"k8s.io/api/core/v1"
clientset "k8s.io/client-go/kubernetes"
api "k8s.io/kubernetes/pkg/apis/core"
nodepkg "k8s.io/kubernetes/pkg/controller/node"
nodepkg "k8s.io/kubernetes/pkg/controller/nodelifecycle"
"k8s.io/kubernetes/test/e2e/common"
"k8s.io/kubernetes/test/e2e/framework"
testutils "k8s.io/kubernetes/test/utils"

View File

@ -57,7 +57,7 @@ go_library(
"//pkg/cloudprovider/providers/gce:go_default_library",
"//pkg/controller:go_default_library",
"//pkg/controller/deployment/util:go_default_library",
"//pkg/controller/node:go_default_library",
"//pkg/controller/nodelifecycle:go_default_library",
"//pkg/features:go_default_library",
"//pkg/kubectl:go_default_library",
"//pkg/kubelet/apis/kubeletconfig:go_default_library",

View File

@ -86,7 +86,7 @@ import (
"k8s.io/kubernetes/pkg/cloudprovider/providers/azure"
gcecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/gce"
"k8s.io/kubernetes/pkg/controller"
nodectlr "k8s.io/kubernetes/pkg/controller/node"
nodectlr "k8s.io/kubernetes/pkg/controller/nodelifecycle"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/kubectl"
"k8s.io/kubernetes/pkg/kubelet/util/format"

View File

@ -1,9 +1,4 @@
package(default_visibility = ["//visibility:public"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_test",
)
load("@io_bazel_rules_go//go:def.bzl", "go_test")
go_test(
name = "go_default_test",
@ -52,4 +47,5 @@ filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

View File

@ -30,8 +30,7 @@ go_test(
"//pkg/apis/core/v1/helper:go_default_library",
"//pkg/client/clientset_generated/internalclientset:go_default_library",
"//pkg/client/informers/informers_generated/internalversion:go_default_library",
"//pkg/controller/node:go_default_library",
"//pkg/controller/node/ipam:go_default_library",
"//pkg/controller/nodelifecycle:go_default_library",
"//pkg/controller/volume/persistentvolume:go_default_library",
"//pkg/features:go_default_library",
"//pkg/kubeapiserver/admission:go_default_library",

View File

@ -37,8 +37,7 @@ import (
"k8s.io/kubernetes/pkg/api/testapi"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
internalinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion"
"k8s.io/kubernetes/pkg/controller/node"
"k8s.io/kubernetes/pkg/controller/node/ipam"
"k8s.io/kubernetes/pkg/controller/nodelifecycle"
kubeadmission "k8s.io/kubernetes/pkg/kubeapiserver/admission"
"k8s.io/kubernetes/plugin/pkg/admission/podtolerationrestriction"
pluginapi "k8s.io/kubernetes/plugin/pkg/admission/podtolerationrestriction/apis/podtolerationrestriction"
@ -85,29 +84,24 @@ func TestTaintNodeByCondition(t *testing.T) {
controllerCh := make(chan struct{})
defer close(controllerCh)
// Start NodeController for taint.
nc, err := node.NewNodeController(
// Start NodeLifecycleController for taint.
nc, err := nodelifecycle.NewNodeLifecycleController(
informers.Core().V1().Pods(),
informers.Core().V1().Nodes(),
informers.Extensions().V1beta1().DaemonSets(),
nil, // CloudProvider
clientset,
time.Second, // Node monitor grace period
time.Second, // Node startup grace period
time.Second, // Node monitor period
time.Second, // Pod eviction timeout
100, // Eviction limiter QPS
100, // Secondary eviction limiter QPS
100, // Large cluster threshold
100, // Unhealthy zone threshold
time.Second, // Node monitor grace period
time.Second, // Node startup grace period
time.Second, // Node monitor period
nil, // Cluster CIDR
nil, // Service CIDR
0, // Node CIDR mask size
false, // Allocate node CIDRs
ipam.RangeAllocatorType, // Allocator type
true, // Run taint manger
true, // Enabled taint based eviction
true, // Enabled TaintNodeByCondition feature
true, // Run taint manager
true, // Use taint based evictions
true, // Enabled TaintNodeByCondition feature
)
if err != nil {
t.Errorf("Failed to create node controller: %v", err)