Merge pull request #28843 from gmarek/limiterPerZone

Automatic merge from submit-queue

Separate rate limiters for Pod evictions for different zones in NodeController

Ref. #28832

NodeController needs to be able to adjust eviction rate limits for each zone independently. This PR splits the single shared rate limiter into per-zone rate limiters.
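Conceptually, the controller moves from one global pair of eviction queues behind a single token bucket to one pair of queues per zone, each behind its own token bucket, so an outage in one zone cannot consume the eviction budget of another. A minimal, self-contained sketch of the idea (illustrative only: golang.org/x/time/rate stands in for pkg/util/flowcontrol's token bucket, and names like zoneQueues/forZone are hypothetical, not part of this PR):

package main

import (
	"fmt"

	"golang.org/x/time/rate" // stand-in for pkg/util/flowcontrol's token bucket
)

// zoneQueues maps a zone key to its own rate limiter, so a storm of
// evictions in one zone cannot starve evictions in another.
type zoneQueues struct {
	qps      rate.Limit
	burst    int
	limiters map[string]*rate.Limiter
}

func newZoneQueues(qps float64, burst int) *zoneQueues {
	return &zoneQueues{qps: rate.Limit(qps), burst: burst, limiters: map[string]*rate.Limiter{}}
}

// forZone lazily creates the limiter for a zone the first time it is seen,
// mirroring how NodeController adds evictors when a new zone appears.
func (z *zoneQueues) forZone(zone string) *rate.Limiter {
	if l, ok := z.limiters[zone]; ok {
		return l
	}
	l := rate.NewLimiter(z.qps, z.burst)
	z.limiters[zone] = l
	return l
}

func main() {
	q := newZoneQueues(0.1, 1) // one eviction per 10s per zone, burst 1
	fmt.Println(q.forZone("region1:zone1").Allow()) // true: fresh bucket
	fmt.Println(q.forZone("region1:zone1").Allow()) // false: zone1 is throttled
	fmt.Println(q.forZone("region1:zone2").Allow()) // true: zone2 is unaffected
}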

cc @davidopp
k8s-merge-robot 2016-07-13 06:42:11 -07:00 committed by GitHub
commit 7e6a856078
5 changed files with 184 additions and 81 deletions

View File

@@ -72,7 +72,6 @@ import (
 	"k8s.io/kubernetes/pkg/serviceaccount"
 	"k8s.io/kubernetes/pkg/util/configz"
 	"k8s.io/kubernetes/pkg/util/crypto"
-	"k8s.io/kubernetes/pkg/util/flowcontrol"
 	"k8s.io/kubernetes/pkg/util/wait"

 	"github.com/golang/glog"
@@ -239,8 +238,7 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
 		glog.Warningf("Unsuccessful parsing of service CIDR %v: %v", s.ServiceCIDR, err)
 	}
 	nodeController := nodecontroller.NewNodeController(cloud, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "node-controller")),
-		s.PodEvictionTimeout.Duration, flowcontrol.NewTokenBucketRateLimiter(s.DeletingPodsQps, int(s.DeletingPodsBurst)),
-		flowcontrol.NewTokenBucketRateLimiter(s.DeletingPodsQps, int(s.DeletingPodsBurst)),
+		s.PodEvictionTimeout.Duration, s.DeletingPodsQps, int(s.DeletingPodsBurst),
 		s.NodeMonitorGracePeriod.Duration, s.NodeStartupGracePeriod.Duration, s.NodeMonitorPeriod.Duration, clusterCIDR, serviceCIDR, int(s.NodeCIDRMaskSize), s.AllocateNodeCIDRs)
 	nodeController.Run(s.NodeSyncPeriod.Duration)

 	time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))

View File

@@ -59,7 +59,6 @@ import (
 	quotainstall "k8s.io/kubernetes/pkg/quota/install"
 	"k8s.io/kubernetes/pkg/serviceaccount"
 	"k8s.io/kubernetes/pkg/util/crypto"
-	"k8s.io/kubernetes/pkg/util/flowcontrol"
 	"k8s.io/kubernetes/pkg/util/wait"

 	"k8s.io/kubernetes/contrib/mesos/pkg/profile"
@@ -155,8 +154,7 @@ func (s *CMServer) Run(_ []string) error {
 	_, clusterCIDR, _ := net.ParseCIDR(s.ClusterCIDR)
 	_, serviceCIDR, _ := net.ParseCIDR(s.ServiceCIDR)
 	nodeController := nodecontroller.NewNodeController(cloud, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "node-controller")),
-		s.PodEvictionTimeout.Duration, flowcontrol.NewTokenBucketRateLimiter(s.DeletingPodsQps, int(s.DeletingPodsBurst)),
-		flowcontrol.NewTokenBucketRateLimiter(s.DeletingPodsQps, int(s.DeletingPodsBurst)),
+		s.PodEvictionTimeout.Duration, s.DeletingPodsQps, int(s.DeletingPodsBurst),
 		s.NodeMonitorGracePeriod.Duration, s.NodeStartupGracePeriod.Duration, s.NodeMonitorPeriod.Duration, clusterCIDR, serviceCIDR, int(s.NodeCIDRMaskSize), s.AllocateNodeCIDRs)
 	nodeController.Run(s.NodeSyncPeriod.Duration)

View File

@@ -77,7 +77,6 @@ type NodeController struct {
 	cloud       cloudprovider.Interface
 	clusterCIDR *net.IPNet
 	serviceCIDR *net.IPNet
-	deletingPodsRateLimiter flowcontrol.RateLimiter
 	knownNodeSet map[string]*api.Node
 	kubeClient   clientset.Interface
 	// Method for easy mocking in unittest.
@@ -112,8 +111,10 @@ type NodeController struct {
 	// Lock to access evictor workers
 	evictorLock *sync.Mutex
 	// workers that evict pods from unresponsive nodes.
-	podEvictor         *RateLimitedTimedQueue
-	terminationEvictor *RateLimitedTimedQueue
+	zonePodEvictor         map[string]*RateLimitedTimedQueue
+	zoneTerminationEvictor map[string]*RateLimitedTimedQueue
+	evictionLimiterQPS     float32
+	evictionLimiterBurst   int
 	podEvictionTimeout time.Duration
 	// The maximum duration before a pod evicted from a node can be forcefully terminated.
 	maximumGracePeriod time.Duration
@@ -142,8 +143,8 @@ func NewNodeController(
 	cloud cloudprovider.Interface,
 	kubeClient clientset.Interface,
 	podEvictionTimeout time.Duration,
-	deletionEvictionLimiter flowcontrol.RateLimiter,
-	terminationEvictionLimiter flowcontrol.RateLimiter,
+	evictionLimiterQPS float32,
+	evictionLimiterBurst int,
 	nodeMonitorGracePeriod time.Duration,
 	nodeStartupGracePeriod time.Duration,
 	nodeMonitorPeriod time.Duration,
@@ -184,8 +185,8 @@ func NewNodeController(
 		podEvictionTimeout: podEvictionTimeout,
 		maximumGracePeriod: 5 * time.Minute,
 		evictorLock:        &evictorLock,
-		podEvictor:         NewRateLimitedTimedQueue(deletionEvictionLimiter),
-		terminationEvictor: NewRateLimitedTimedQueue(terminationEvictionLimiter),
+		zonePodEvictor:         make(map[string]*RateLimitedTimedQueue),
+		zoneTerminationEvictor: make(map[string]*RateLimitedTimedQueue),
 		nodeStatusMap:          make(map[string]nodeStatusData),
 		nodeMonitorGracePeriod: nodeMonitorGracePeriod,
 		nodeMonitorPeriod:      nodeMonitorPeriod,
@@ -198,6 +199,8 @@ func NewNodeController(
 		forcefullyDeletePod:       func(p *api.Pod) error { return forcefullyDeletePod(kubeClient, p) },
 		nodeExistsInCloudProvider: func(nodeName string) (bool, error) { return nodeExistsInCloudProvider(cloud, nodeName) },
 		computeZoneStateFunc:      ComputeZoneState,
+		evictionLimiterQPS:        evictionLimiterQPS,
+		evictionLimiterBurst:      evictionLimiterBurst,
 		zoneStates:                make(map[string]zoneState),
 	}
@@ -309,7 +312,8 @@ func (nc *NodeController) Run(period time.Duration) {
 	go wait.Until(func() {
 		nc.evictorLock.Lock()
 		defer nc.evictorLock.Unlock()
-		nc.podEvictor.Try(func(value TimedValue) (bool, time.Duration) {
+		for k := range nc.zonePodEvictor {
+			nc.zonePodEvictor[k].Try(func(value TimedValue) (bool, time.Duration) {
 			remaining, err := deletePods(nc.kubeClient, nc.recorder, value.Value, nc.daemonSetStore)
 			if err != nil {
 				utilruntime.HandleError(fmt.Errorf("unable to evict node %q: %v", value.Value, err))
@@ -317,10 +321,11 @@ func (nc *NodeController) Run(period time.Duration) {
 			}
 			if remaining {
-				nc.terminationEvictor.Add(value.Value)
+				nc.zoneTerminationEvictor[k].Add(value.Value)
 			}
 			return true, 0
 		})
+		}
 	}, nodeEvictionPeriod, wait.NeverStop)
@@ -328,7 +333,8 @@ func (nc *NodeController) Run(period time.Duration) {
 	// TODO: replace with a controller that ensures pods that are terminating complete
 	go wait.Until(func() {
 		nc.evictorLock.Lock()
 		defer nc.evictorLock.Unlock()
-		nc.terminationEvictor.Try(func(value TimedValue) (bool, time.Duration) {
+		for k := range nc.zoneTerminationEvictor {
+			nc.zoneTerminationEvictor[k].Try(func(value TimedValue) (bool, time.Duration) {
 			completed, remaining, err := terminatePods(nc.kubeClient, nc.recorder, value.Value, value.AddedAt, nc.maximumGracePeriod)
 			if err != nil {
 				utilruntime.HandleError(fmt.Errorf("unable to terminate pods on node %q: %v", value.Value, err))
@@ -348,6 +354,7 @@ func (nc *NodeController) Run(period time.Duration) {
 			}
 			return false, remaining
 		})
+		}
 	}, nodeEvictionPeriod, wait.NeverStop)

 	go wait.Until(func() {
@@ -372,8 +379,19 @@ func (nc *NodeController) monitorNodeStatus() error {
 	for i := range added {
 		glog.V(1).Infof("NodeController observed a new Node: %#v", added[i].Name)
 		recordNodeEvent(nc.recorder, added[i].Name, api.EventTypeNormal, "RegisteredNode", fmt.Sprintf("Registered Node %v in NodeController", added[i].Name))
-		nc.cancelPodEviction(added[i])
 		nc.knownNodeSet[added[i].Name] = added[i]
+		// When adding new Nodes we need to check whether a new zone appeared, and if so add a new evictor.
+		zone := utilnode.GetZoneKey(added[i])
+		if _, found := nc.zonePodEvictor[zone]; !found {
+			nc.zonePodEvictor[zone] =
+				NewRateLimitedTimedQueue(
+					flowcontrol.NewTokenBucketRateLimiter(nc.evictionLimiterQPS, nc.evictionLimiterBurst))
+		}
+		if _, found := nc.zoneTerminationEvictor[zone]; !found {
+			nc.zoneTerminationEvictor[zone] = NewRateLimitedTimedQueue(
+				flowcontrol.NewTokenBucketRateLimiter(nc.evictionLimiterQPS, nc.evictionLimiterBurst))
+		}
+		nc.cancelPodEviction(added[i])
 	}

 	for i := range deleted {
@@ -689,10 +707,11 @@ func (nc *NodeController) checkForNodeAddedDeleted(nodes *api.NodeList) (added,
 // cancelPodEviction removes any queued evictions, typically because the node is available again. It
 // returns true if an eviction was queued.
 func (nc *NodeController) cancelPodEviction(node *api.Node) bool {
+	zone := utilnode.GetZoneKey(node)
 	nc.evictorLock.Lock()
 	defer nc.evictorLock.Unlock()
-	wasDeleting := nc.podEvictor.Remove(node.Name)
-	wasTerminating := nc.terminationEvictor.Remove(node.Name)
+	wasDeleting := nc.zonePodEvictor[zone].Remove(node.Name)
+	wasTerminating := nc.zoneTerminationEvictor[zone].Remove(node.Name)
 	if wasDeleting || wasTerminating {
 		glog.V(2).Infof("Cancelling pod Eviction on Node: %v", node.Name)
 		return true
@@ -703,10 +722,18 @@ func (nc *NodeController) cancelPodEviction(node *api.Node) bool {
 // evictPods queues an eviction for the provided node name, and returns false if the node is already
 // queued for eviction.
 func (nc *NodeController) evictPods(node *api.Node) bool {
+	if nc.zoneStates[utilnode.GetZoneKey(node)] == stateFullSegmentation {
+		return false
+	}
 	nc.evictorLock.Lock()
 	defer nc.evictorLock.Unlock()
-	return nc.podEvictor.Add(node.Name)
+	foundHealthy := false
+	for _, state := range nc.zoneStates {
+		if state != stateFullSegmentation {
+			foundHealthy = true
+			break
+		}
+	}
+	if !foundHealthy {
+		return false
+	}
+	zone := utilnode.GetZoneKey(node)
+	return nc.zonePodEvictor[zone].Add(node.Name)
 }
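A note on the Try callbacks used throughout the hunks above: the contract, as inferred from these call sites rather than from the RateLimitedTimedQueue implementation itself, is that returning (true, 0) marks the item as handled and drops it from the queue, while (false, remaining) keeps it queued for a retry after the given delay (the real queue also applies its token-bucket limiter before each attempt). A self-contained toy model of that contract, with rate limiting omitted and all names (toyQueue, timedValue) hypothetical:

package main

import (
	"fmt"
	"time"
)

// timedValue mirrors the role of TimedValue above: a work item plus the
// time it was first enqueued.
type timedValue struct {
	value   string
	addedAt time.Time
}

// toyQueue is a minimal stand-in for RateLimitedTimedQueue.
type toyQueue struct {
	items []timedValue
}

func (q *toyQueue) add(v string) {
	q.items = append(q.items, timedValue{value: v, addedAt: time.Now()})
}

// try calls fn on each queued item. fn returns (true, 0) to drop the item
// as done, or (false, wait) to keep it queued for a later retry.
func (q *toyQueue) try(fn func(timedValue) (bool, time.Duration)) {
	var kept []timedValue
	for _, item := range q.items {
		if done, _ := fn(item); !done {
			kept = append(kept, item)
		}
	}
	q.items = kept
}

func main() {
	q := &toyQueue{}
	q.add("node0")
	q.add("node1")
	q.try(func(v timedValue) (bool, time.Duration) {
		if v.value == "node1" {
			return false, 10 * time.Second // not finished; retry later
		}
		return true, 0 // processed; remove from queue
	})
	fmt.Println(len(q.items)) // 1: node1 is still queued
}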

View File

@@ -28,7 +28,6 @@ import (
 	"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
 	fakecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/fake"
 	"k8s.io/kubernetes/pkg/util/diff"
-	"k8s.io/kubernetes/pkg/util/flowcontrol"
 	"k8s.io/kubernetes/pkg/util/wait"
 )
@@ -36,6 +35,8 @@ const (
 	testNodeMonitorGracePeriod = 40 * time.Second
 	testNodeStartupGracePeriod = 60 * time.Second
 	testNodeMonitorPeriod      = 5 * time.Second
+	testRateLimiterBurst       = 10000
+	testRateLimiterQPS         = float32(10000)
 )

 func TestMonitorNodeStatusEvictPods(t *testing.T) {
@@ -74,12 +75,20 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
 			ObjectMeta: api.ObjectMeta{
 				Name:              "node0",
 				CreationTimestamp: fakeNow,
+				Labels: map[string]string{
+					unversioned.LabelZoneRegion:        "region1",
+					unversioned.LabelZoneFailureDomain: "zone1",
+				},
 			},
 		},
 		{
 			ObjectMeta: api.ObjectMeta{
 				Name:              "node1",
 				CreationTimestamp: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
+				Labels: map[string]string{
+					unversioned.LabelZoneRegion:        "region1",
+					unversioned.LabelZoneFailureDomain: "zone1",
+				},
 			},
 			Status: api.NodeStatus{
 				Conditions: []api.NodeCondition{
@@ -110,6 +119,10 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
 			ObjectMeta: api.ObjectMeta{
 				Name:              "node0",
 				CreationTimestamp: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
+				Labels: map[string]string{
+					unversioned.LabelZoneRegion:        "region1",
+					unversioned.LabelZoneFailureDomain: "zone1",
+				},
 			},
 			Status: api.NodeStatus{
 				Conditions: []api.NodeCondition{
@@ -126,6 +139,10 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
 			ObjectMeta: api.ObjectMeta{
 				Name:              "node1",
 				CreationTimestamp: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
+				Labels: map[string]string{
+					unversioned.LabelZoneRegion:        "region1",
+					unversioned.LabelZoneFailureDomain: "zone1",
+				},
 			},
 			Status: api.NodeStatus{
 				Conditions: []api.NodeCondition{
@@ -166,6 +183,10 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
 			ObjectMeta: api.ObjectMeta{
 				Name:              "node0",
 				CreationTimestamp: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
+				Labels: map[string]string{
+					unversioned.LabelZoneRegion:        "region1",
+					unversioned.LabelZoneFailureDomain: "zone1",
+				},
 			},
 			Status: api.NodeStatus{
 				Conditions: []api.NodeCondition{
@@ -182,6 +203,10 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
 			ObjectMeta: api.ObjectMeta{
 				Name:              "node1",
 				CreationTimestamp: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
+				Labels: map[string]string{
+					unversioned.LabelZoneRegion:        "region1",
+					unversioned.LabelZoneFailureDomain: "zone1",
+				},
 			},
 			Status: api.NodeStatus{
 				Conditions: []api.NodeCondition{
@@ -249,6 +274,10 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
 			ObjectMeta: api.ObjectMeta{
 				Name:              "node0",
 				CreationTimestamp: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
+				Labels: map[string]string{
+					unversioned.LabelZoneRegion:        "region1",
+					unversioned.LabelZoneFailureDomain: "zone1",
+				},
 			},
 			Status: api.NodeStatus{
 				Conditions: []api.NodeCondition{
@@ -265,6 +294,10 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
 			ObjectMeta: api.ObjectMeta{
 				Name:              "node1",
 				CreationTimestamp: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
+				Labels: map[string]string{
+					unversioned.LabelZoneRegion:        "region1",
+					unversioned.LabelZoneFailureDomain: "zone1",
+				},
 			},
 			Status: api.NodeStatus{
 				Conditions: []api.NodeCondition{
@@ -305,6 +338,10 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
 			ObjectMeta: api.ObjectMeta{
 				Name:              "node0",
 				CreationTimestamp: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
+				Labels: map[string]string{
+					unversioned.LabelZoneRegion:        "region1",
+					unversioned.LabelZoneFailureDomain: "zone1",
+				},
 			},
 			Status: api.NodeStatus{
 				Conditions: []api.NodeCondition{
@@ -321,6 +358,10 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
 			ObjectMeta: api.ObjectMeta{
 				Name:              "node1",
 				CreationTimestamp: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
+				Labels: map[string]string{
+					unversioned.LabelZoneRegion:        "region1",
+					unversioned.LabelZoneFailureDomain: "zone1",
+				},
 			},
 			Status: api.NodeStatus{
 				Conditions: []api.NodeCondition{
@@ -361,6 +402,10 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
 			ObjectMeta: api.ObjectMeta{
 				Name:              "node0",
 				CreationTimestamp: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
+				Labels: map[string]string{
+					unversioned.LabelZoneRegion:        "region1",
+					unversioned.LabelZoneFailureDomain: "zone1",
+				},
 			},
 			Status: api.NodeStatus{
 				Conditions: []api.NodeCondition{
@@ -377,6 +422,10 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
 			ObjectMeta: api.ObjectMeta{
 				Name:              "node1",
 				CreationTimestamp: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
+				Labels: map[string]string{
+					unversioned.LabelZoneRegion:        "region1",
+					unversioned.LabelZoneFailureDomain: "zone1",
+				},
 			},
 			Status: api.NodeStatus{
 				Conditions: []api.NodeCondition{
@@ -417,6 +466,10 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
 			ObjectMeta: api.ObjectMeta{
 				Name:              "node0",
 				CreationTimestamp: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
+				Labels: map[string]string{
+					unversioned.LabelZoneRegion:        "region1",
+					unversioned.LabelZoneFailureDomain: "zone1",
+				},
 			},
 			Status: api.NodeStatus{
 				Conditions: []api.NodeCondition{
@@ -433,6 +486,10 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
 			ObjectMeta: api.ObjectMeta{
 				Name:              "node1",
 				CreationTimestamp: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
+				Labels: map[string]string{
+					unversioned.LabelZoneRegion:        "region2",
+					unversioned.LabelZoneFailureDomain: "zone2",
+				},
 			},
 			Status: api.NodeStatus{
 				Conditions: []api.NodeCondition{
@@ -484,6 +541,10 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
 			ObjectMeta: api.ObjectMeta{
 				Name:              "node0",
 				CreationTimestamp: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
+				Labels: map[string]string{
+					unversioned.LabelZoneRegion:        "region1",
+					unversioned.LabelZoneFailureDomain: "zone1",
+				},
 			},
 			Status: api.NodeStatus{
 				Conditions: []api.NodeCondition{
@@ -500,6 +561,10 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
 			ObjectMeta: api.ObjectMeta{
 				Name:              "node-master",
 				CreationTimestamp: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
+				Labels: map[string]string{
+					unversioned.LabelZoneRegion:        "region1",
+					unversioned.LabelZoneFailureDomain: "zone1",
+				},
 			},
 			Status: api.NodeStatus{
 				Conditions: []api.NodeCondition{
@@ -536,7 +601,7 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
 	for _, item := range table {
 		nodeController := NewNodeController(nil, item.fakeNodeHandler,
-			evictionTimeout, flowcontrol.NewFakeAlwaysRateLimiter(), flowcontrol.NewFakeAlwaysRateLimiter(), testNodeMonitorGracePeriod,
+			evictionTimeout, testRateLimiterQPS, testRateLimiterBurst, testNodeMonitorGracePeriod,
 			testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false)
 		nodeController.now = func() unversioned.Time { return fakeNow }
 		for _, ds := range item.daemonSets {
@@ -553,18 +618,21 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
 		if err := nodeController.monitorNodeStatus(); err != nil {
 			t.Errorf("unexpected error: %v", err)
 		}
-		nodeController.podEvictor.Try(func(value TimedValue) (bool, time.Duration) {
+		zones := getZones(item.fakeNodeHandler)
+		for _, zone := range zones {
+			nodeController.zonePodEvictor[zone].Try(func(value TimedValue) (bool, time.Duration) {
 			remaining, _ := deletePods(item.fakeNodeHandler, nodeController.recorder, value.Value, nodeController.daemonSetStore)
 			if remaining {
-				nodeController.terminationEvictor.Add(value.Value)
+				nodeController.zoneTerminationEvictor[zone].Add(value.Value)
 			}
 			return true, 0
 		})
-		nodeController.podEvictor.Try(func(value TimedValue) (bool, time.Duration) {
+			nodeController.zonePodEvictor[zone].Try(func(value TimedValue) (bool, time.Duration) {
 			terminatePods(item.fakeNodeHandler, nodeController.recorder, value.Value, value.AddedAt, nodeController.maximumGracePeriod)
 			return true, 0
 		})
+		}

 		podEvicted := false
 		for _, action := range item.fakeNodeHandler.Actions() {
 			if action.GetVerb() == "delete" && action.GetResource().Resource == "pods" {
@@ -606,7 +674,7 @@ func TestCloudProviderNoRateLimit(t *testing.T) {
 		deleteWaitChan: make(chan struct{}),
 	}
 	nodeController := NewNodeController(nil, fnh, 10*time.Minute,
-		flowcontrol.NewFakeAlwaysRateLimiter(), flowcontrol.NewFakeAlwaysRateLimiter(),
+		testRateLimiterQPS, testRateLimiterBurst,
 		testNodeMonitorGracePeriod, testNodeStartupGracePeriod,
 		testNodeMonitorPeriod, nil, nil, 0, false)
 	nodeController.cloud = &fakecloud.FakeCloud{}
@@ -626,7 +694,7 @@ func TestCloudProviderNoRateLimit(t *testing.T) {
 	if len(fnh.DeletedNodes) != 1 || fnh.DeletedNodes[0].Name != "node0" {
 		t.Errorf("Node was not deleted")
 	}
-	if nodeOnQueue := nodeController.podEvictor.Remove("node0"); nodeOnQueue {
+	if nodeOnQueue := nodeController.zonePodEvictor[""].Remove("node0"); nodeOnQueue {
 		t.Errorf("Node was queued for eviction. Should have been immediately deleted.")
 	}
 }
@@ -839,8 +907,8 @@ func TestMonitorNodeStatusUpdateStatus(t *testing.T) {
 	}
 	for i, item := range table {
-		nodeController := NewNodeController(nil, item.fakeNodeHandler, 5*time.Minute, flowcontrol.NewFakeAlwaysRateLimiter(),
-			flowcontrol.NewFakeAlwaysRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false)
+		nodeController := NewNodeController(nil, item.fakeNodeHandler, 5*time.Minute, testRateLimiterQPS, testRateLimiterBurst,
+			testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false)
 		nodeController.now = func() unversioned.Time { return fakeNow }
 		if err := nodeController.monitorNodeStatus(); err != nil {
 			t.Errorf("unexpected error: %v", err)
@@ -989,8 +1057,8 @@ func TestMonitorNodeStatusMarkPodsNotReady(t *testing.T) {
 	}
 	for i, item := range table {
-		nodeController := NewNodeController(nil, item.fakeNodeHandler, 5*time.Minute, flowcontrol.NewFakeAlwaysRateLimiter(),
-			flowcontrol.NewFakeAlwaysRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false)
+		nodeController := NewNodeController(nil, item.fakeNodeHandler, 5*time.Minute, testRateLimiterQPS, testRateLimiterBurst,
+			testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false)
 		nodeController.now = func() unversioned.Time { return fakeNow }
 		if err := nodeController.monitorNodeStatus(); err != nil {
 			t.Errorf("Case[%d] unexpected error: %v", i, err)
@@ -1071,8 +1139,9 @@ func TestNodeDeletion(t *testing.T) {
 		Clientset: fake.NewSimpleClientset(&api.PodList{Items: []api.Pod{*newPod("pod0", "node0"), *newPod("pod1", "node1")}}),
 	}
-	nodeController := NewNodeController(nil, fakeNodeHandler, 5*time.Minute, flowcontrol.NewFakeAlwaysRateLimiter(), flowcontrol.NewFakeAlwaysRateLimiter(),
-		testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false)
+	nodeController := NewNodeController(nil, fakeNodeHandler, 5*time.Minute, testRateLimiterQPS, testRateLimiterBurst,
+		testNodeMonitorGracePeriod, testNodeStartupGracePeriod,
+		testNodeMonitorPeriod, nil, nil, 0, false)
 	nodeController.now = func() unversioned.Time { return fakeNow }
 	if err := nodeController.monitorNodeStatus(); err != nil {
 		t.Errorf("unexpected error: %v", err)
@@ -1081,7 +1150,7 @@ func TestNodeDeletion(t *testing.T) {
 	if err := nodeController.monitorNodeStatus(); err != nil {
 		t.Errorf("unexpected error: %v", err)
 	}
-	nodeController.podEvictor.Try(func(value TimedValue) (bool, time.Duration) {
+	nodeController.zonePodEvictor[""].Try(func(value TimedValue) (bool, time.Duration) {
 		deletePods(fakeNodeHandler, nodeController.recorder, value.Value, nodeController.daemonSetStore)
 		return true, 0
 	})
@@ -1097,7 +1166,6 @@ func TestNodeDeletion(t *testing.T) {
 	}
 }
-
 func TestCheckPod(t *testing.T) {
 	tcs := []struct {
 		pod   api.Pod
 		prune bool
@@ -1175,7 +1243,7 @@ func TestCheckPod(t *testing.T) {
 		},
 	}
-	nc := NewNodeController(nil, nil, 0, nil, nil, 0, 0, 0, nil, nil, 0, false)
+	nc := NewNodeController(nil, nil, 0, 0, 0, 0, 0, 0, nil, nil, 0, false)
 	nc.nodeStore.Store = cache.NewStore(cache.MetaNamespaceKeyFunc)
 	nc.nodeStore.Store.Add(&api.Node{
 		ObjectMeta: api.ObjectMeta{
@@ -1242,7 +1310,7 @@ func TestCleanupOrphanedPods(t *testing.T) {
 		newPod("b", "bar"),
 		newPod("c", "gone"),
 	}
-	nc := NewNodeController(nil, nil, 0, nil, nil, 0, 0, 0, nil, nil, 0, false)
+	nc := NewNodeController(nil, nil, 0, 0, 0, 0, 0, 0, nil, nil, 0, false)
 	nc.nodeStore.Store.Add(newNode("foo"))
 	nc.nodeStore.Store.Add(newNode("bar"))

View File

@@ -25,6 +25,8 @@ import (
 	"k8s.io/kubernetes/pkg/api/resource"
 	"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
 	unversionedcore "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned"
+	utilnode "k8s.io/kubernetes/pkg/util/node"
+	"k8s.io/kubernetes/pkg/util/sets"
 	"k8s.io/kubernetes/pkg/watch"
 )
@@ -235,3 +237,13 @@ func contains(node *api.Node, nodes []*api.Node) bool {
 	}
 	return false
 }
+
+// getZones returns the list of zone keys for all Nodes stored in the FakeNodeHandler.
+func getZones(nodeHandler *FakeNodeHandler) []string {
+	nodes, _ := nodeHandler.List(api.ListOptions{})
+	zones := sets.NewString()
+	for _, node := range nodes.Items {
+		zones.Insert(utilnode.GetZoneKey(&node))
+	}
+	return zones.List()
+}
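getZones leans on utilnode.GetZoneKey to bucket nodes by zone. A rough sketch of what such a helper plausibly does (an assumption: the exact separator and label constants in pkg/util/node may differ): it joins the region and failure-domain labels into one key, and yields "" for unlabeled nodes, which is why tests that never set zone labels index nodeController.zonePodEvictor[""] above.

package main

import "fmt"

// Assumed label keys; in the real tree these are the values behind
// unversioned.LabelZoneRegion and unversioned.LabelZoneFailureDomain.
const (
	labelZoneRegion        = "failure-domain.beta.kubernetes.io/region"
	labelZoneFailureDomain = "failure-domain.beta.kubernetes.io/zone"
)

// getZoneKey sketches the assumed behavior of utilnode.GetZoneKey:
// combine the region and failure-domain labels into one key, and fall
// back to "" when the node carries neither label.
func getZoneKey(labels map[string]string) string {
	region := labels[labelZoneRegion]
	zone := labels[labelZoneFailureDomain]
	if region == "" && zone == "" {
		return ""
	}
	return region + ":" + zone
}

func main() {
	fmt.Println(getZoneKey(map[string]string{
		labelZoneRegion:        "region1",
		labelZoneFailureDomain: "zone1",
	})) // "region1:zone1"
	fmt.Println(getZoneKey(nil) == "") // true: unlabeled nodes share one zone
}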