Move evicting pods to a separate thread to allow for correct rate limiting.

pull/6/head
gmarek 2015-05-19 13:23:59 +02:00
parent 0bb78fe6c5
commit 1490543d54
7 changed files with 321 additions and 36 deletions
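In short: instead of deleting pods inline from the node-status sync whenever the rate limiter happens to have a token, the controller now pushes node names into a deduplicated FIFO and lets a dedicated goroutine drain that queue under the rate limiter. A minimal, standalone Go sketch of that shape follows; the names limiter, evictor and drain are illustrative, not this commit's API, and the real queue is additionally mutex-guarded because producer and consumer run on different goroutines.

package main

import (
	"fmt"
	"time"
)

// limiter is a toy stand-in for util.RateLimiter: it hands out at most one token per 'gap'.
type limiter struct {
	last time.Time
	gap  time.Duration
}

func (l *limiter) canAccept() bool {
	if time.Since(l.last) >= l.gap {
		l.last = time.Now()
		return true
	}
	return false
}

// evictor keeps a deduplicated FIFO of node names; a single worker drains it.
type evictor struct {
	queue []string
	seen  map[string]bool
	lim   *limiter
}

// add enqueues a node unless it is already tracked.
func (e *evictor) add(node string) bool {
	if e.seen[node] {
		return false
	}
	e.seen[node] = true
	e.queue = append(e.queue, node)
	return true
}

// drain evicts queued nodes until the limiter refuses; in this sketch a
// refused node simply stays queued for the next pass.
func (e *evictor) drain(evict func(string)) {
	for len(e.queue) > 0 && e.lim.canAccept() {
		node := e.queue[0]
		e.queue = e.queue[1:]
		evict(node)
	}
}

func main() {
	ev := &evictor{seen: map[string]bool{}, lim: &limiter{gap: 30 * time.Millisecond}}
	ev.add("node-a")
	ev.add("node-b")
	ev.add("node-a") // duplicate, ignored

	// The real controller runs the drain every 100ms via util.Forever.
	for i := 0; i < 4; i++ {
		ev.drain(func(n string) { fmt.Println("evicting pods on", n) })
		time.Sleep(30 * time.Millisecond)
	}
}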

View File

@@ -188,7 +188,7 @@ func startComponents(firstManifestURL, secondManifestURL, apiVersion string) (st
// TODO: Write an integration test for the replication controllers watch.
go controllerManager.Run(3, util.NeverStop)
nodeController := nodecontroller.NewNodeController(nil, cl, 10, 5*time.Minute, util.NewFakeRateLimiter(),
nodeController := nodecontroller.NewNodeController(nil, cl, 10, 5*time.Minute, nodecontroller.NewPodEvictor(util.NewFakeRateLimiter()),
40*time.Second, 60*time.Second, 5*time.Second, nil, false)
nodeController.Run(5 * time.Second)
cadvisorInterface := new(cadvisor.Fake)

View File

@@ -208,7 +208,7 @@ func (s *CMServer) Run(_ []string) error {
}
nodeController := nodecontroller.NewNodeController(cloud, kubeClient, s.RegisterRetryCount,
s.PodEvictionTimeout, util.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst),
s.PodEvictionTimeout, nodecontroller.NewPodEvictor(util.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst)),
s.NodeMonitorGracePeriod, s.NodeStartupGracePeriod, s.NodeMonitorPeriod, (*net.IPNet)(&s.ClusterCIDR), s.AllocateNodeCIDRs)
nodeController.Run(s.NodeSyncPeriod)

View File

@@ -122,7 +122,7 @@ func runScheduler(cl *client.Client) {
func runControllerManager(cl *client.Client) {
const nodeSyncPeriod = 10 * time.Second
nodeController := nodecontroller.NewNodeController(
nil, cl, 10, 5*time.Minute, util.NewTokenBucketRateLimiter(*deletingPodsQps, *deletingPodsBurst),
nil, cl, 10, 5*time.Minute, nodecontroller.NewPodEvictor(util.NewTokenBucketRateLimiter(*deletingPodsQps, *deletingPodsBurst)),
40*time.Second, 60*time.Second, 5*time.Second, nil, false)
nodeController.Run(nodeSyncPeriod)

View File

@@ -39,8 +39,12 @@ var (
ErrCloudInstance = errors.New("cloud provider doesn't support instances.")
)
// nodeStatusUpdateRetry controls the number of retries of writing NodeStatus update.
const nodeStatusUpdateRetry = 5
const (
// nodeStatusUpdateRetry controls the number of retries of writing NodeStatus update.
nodeStatusUpdateRetry = 5
// controls how often NodeController will try to evict Pods from non-responsive Nodes.
nodeEvictionPeriod = 100 * time.Millisecond
)
type nodeStatusData struct {
probeTimestamp util.Time
@@ -55,6 +59,9 @@ type NodeController struct {
registerRetryCount int
podEvictionTimeout time.Duration
deletingPodsRateLimiter util.RateLimiter
// worker that evicts pods from unresponsive nodes.
podEvictor *PodEvictor
// per Node map storing last observed Status together with a local time when it was observed.
// This timestamp is to be used instead of LastProbeTime stored in Condition. We do this
// to avoid the problem with time skew across the cluster.
@@ -94,7 +101,7 @@ func NewNodeController(
kubeClient client.Interface,
registerRetryCount int,
podEvictionTimeout time.Duration,
deletingPodsRateLimiter util.RateLimiter,
podEvictor *PodEvictor,
nodeMonitorGracePeriod time.Duration,
nodeStartupGracePeriod time.Duration,
nodeMonitorPeriod time.Duration,
@@ -112,20 +119,20 @@ func NewNodeController(
glog.Fatal("NodeController: Must specify clusterCIDR if allocateNodeCIDRs == true.")
}
return &NodeController{
cloud: cloud,
kubeClient: kubeClient,
recorder: recorder,
registerRetryCount: registerRetryCount,
podEvictionTimeout: podEvictionTimeout,
deletingPodsRateLimiter: deletingPodsRateLimiter,
nodeStatusMap: make(map[string]nodeStatusData),
nodeMonitorGracePeriod: nodeMonitorGracePeriod,
nodeMonitorPeriod: nodeMonitorPeriod,
nodeStartupGracePeriod: nodeStartupGracePeriod,
lookupIP: net.LookupIP,
now: util.Now,
clusterCIDR: clusterCIDR,
allocateNodeCIDRs: allocateNodeCIDRs,
cloud: cloud,
kubeClient: kubeClient,
recorder: recorder,
registerRetryCount: registerRetryCount,
podEvictionTimeout: podEvictionTimeout,
podEvictor: podEvictor,
nodeStatusMap: make(map[string]nodeStatusData),
nodeMonitorGracePeriod: nodeMonitorGracePeriod,
nodeMonitorPeriod: nodeMonitorPeriod,
nodeStartupGracePeriod: nodeStartupGracePeriod,
lookupIP: net.LookupIP,
now: util.Now,
clusterCIDR: clusterCIDR,
allocateNodeCIDRs: allocateNodeCIDRs,
}
}
@@ -179,6 +186,10 @@ func (nc *NodeController) Run(period time.Duration) {
glog.Errorf("Error monitoring node status: %v", err)
}
}, nc.nodeMonitorPeriod)
go util.Forever(func() {
nc.podEvictor.TryEvict(func(nodeName string) { nc.deletePods(nodeName) })
}, nodeEvictionPeriod)
}
func (nc *NodeController) recordNodeEvent(node *api.Node, event string) {
@@ -366,24 +377,19 @@ func (nc *NodeController) monitorNodeStatus() error {
// Check eviction timeout.
if lastReadyCondition.Status == api.ConditionFalse &&
nc.now().After(nc.nodeStatusMap[node.Name].readyTransitionTimestamp.Add(nc.podEvictionTimeout)) {
// Node has stayed not ready for at least 'podEvictionTimeout' - evict all pods on the unhealthy node.
// Make sure we are not removing pods from too many nodes at the same time.
glog.Infof("Evicting pods: %v is later than %v + %v", nc.now(), nc.nodeStatusMap[node.Name].readyTransitionTimestamp, nc.podEvictionTimeout)
if nc.deletingPodsRateLimiter.CanAccept() {
if err := nc.deletePods(node.Name); err != nil {
glog.Errorf("Unable to delete pods from node %s: %v", node.Name, err)
}
if nc.podEvictor.AddNodeToEvict(node.Name) {
glog.Infof("Adding pods to evict: %v is later than %v + %v", nc.now(), nc.nodeStatusMap[node.Name].readyTransitionTimestamp, nc.podEvictionTimeout)
}
}
if lastReadyCondition.Status == api.ConditionUnknown &&
nc.now().After(nc.nodeStatusMap[node.Name].probeTimestamp.Add(nc.podEvictionTimeout-gracePeriod)) {
// Same as above. Note, however, that since the Unknown condition is posted by the node controller itself, we
// need to subtract the monitoring grace period in order to get the real 'podEvictionTimeout'.
glog.Infof("Evicting pods2: %v is later than %v + %v", nc.now(), nc.nodeStatusMap[node.Name].readyTransitionTimestamp, nc.podEvictionTimeout-gracePeriod)
if nc.deletingPodsRateLimiter.CanAccept() {
if err := nc.deletePods(node.Name); err != nil {
glog.Errorf("Unable to delete pods from node %s: %v", node.Name, err)
}
if nc.podEvictor.AddNodeToEvict(node.Name) {
glog.Infof("Adding pods to evict2: %v is later than %v + %v", nc.now(), nc.nodeStatusMap[node.Name].readyTransitionTimestamp, nc.podEvictionTimeout-gracePeriod)
}
}
if lastReadyCondition.Status == api.ConditionTrue {
if nc.podEvictor.RemoveNodeToEvict(node.Name) {
glog.Infof("Pods on %v won't be evicted", node.Name)
}
}
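The new ConditionTrue branch matters because eviction is now deferred: a node that reports Ready again before the eviction worker reaches it is pulled back out of the queue and keeps its pods. A hedged snippet of that interaction, using only identifiers introduced later in this commit (NewPodEvictor, AddNodeToEvict, RemoveNodeToEvict, TryEvict) together with util.NewFakeRateLimiter; the node name is made up.

evictor := NewPodEvictor(util.NewFakeRateLimiter())
evictor.AddNodeToEvict("flapping-node")    // node was NotReady longer than podEvictionTimeout
evictor.RemoveNodeToEvict("flapping-node") // node reported Ready again before the worker ran
evictor.TryEvict(func(nodeName string) {
	// Never reached: RemoveNodeToEvict already dequeued the node.
})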

View File

@@ -324,8 +324,9 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
}
for _, item := range table {
podEvictor := NewPodEvictor(util.NewFakeRateLimiter())
nodeController := NewNodeController(nil, item.fakeNodeHandler, 10,
evictionTimeout, util.NewFakeRateLimiter(), testNodeMonitorGracePeriod,
evictionTimeout, podEvictor, testNodeMonitorGracePeriod,
testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false)
nodeController.now = func() util.Time { return fakeNow }
if err := nodeController.monitorNodeStatus(); err != nil {
@@ -338,12 +339,15 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
if err := nodeController.monitorNodeStatus(); err != nil {
t.Errorf("unexpected error: %v", err)
}
podEvictor.TryEvict(func(nodeName string) { nodeController.deletePods(nodeName) })
podEvicted := false
for _, action := range item.fakeNodeHandler.Actions {
if action.Action == "delete-pod" {
podEvicted = true
}
}
if item.expectedEvictPods != podEvicted {
t.Errorf("expected pod eviction: %+v, got %+v for %+v", item.expectedEvictPods,
podEvicted, item.description)
@@ -527,7 +531,7 @@ func TestMonitorNodeStatusUpdateStatus(t *testing.T) {
}
for _, item := range table {
nodeController := NewNodeController(nil, item.fakeNodeHandler, 10, 5*time.Minute, util.NewFakeRateLimiter(),
nodeController := NewNodeController(nil, item.fakeNodeHandler, 10, 5*time.Minute, NewPodEvictor(util.NewFakeRateLimiter()),
testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false)
nodeController.now = func() util.Time { return fakeNow }
if err := nodeController.monitorNodeStatus(); err != nil {

View File

@@ -0,0 +1,129 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package nodecontroller
import (
"sync"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/golang/glog"
)
// A FIFO queue which additionally guarantees that any element can be added only once until
// it is removed.
type UniqueQueue struct {
lock sync.Mutex
queue []string
set util.StringSet
}
// Entity responsible for evicting Pods from inserted Nodes. It uses RateLimiter to avoid
// evicting everything at once. Note that we rate limit eviction of Nodes, not of individual Pods.
type PodEvictor struct {
queue UniqueQueue
deletingPodsRateLimiter util.RateLimiter
}
// Adds a new value to the queue if it wasn't added before, or was explicitly removed by the
// Remove call. Returns true if the value was added.
func (q *UniqueQueue) Add(value string) bool {
q.lock.Lock()
defer q.lock.Unlock()
if !q.set.Has(value) {
q.queue = append(q.queue, value)
q.set.Insert(value)
return true
} else {
return false
}
}
// Removes the value from the queue, so that a later Get() won't return it, and allows the value
// to be added again. Returns false if the value was not present in the queue.
func (q *UniqueQueue) Remove(value string) bool {
q.lock.Lock()
defer q.lock.Unlock()
q.set.Delete(value)
for i, val := range q.queue {
if val == value {
if i > 0 && i < len(q.queue)-1 {
q.queue = append(q.queue[0:i], q.queue[i+1:len(q.queue)]...)
} else if i > 0 {
q.queue = q.queue[0 : len(q.queue)-1]
} else {
q.queue = q.queue[1:len(q.queue)]
}
return true
}
}
return false
}
// Returns the oldest added value that wasn't returned yet.
func (q *UniqueQueue) Get() (string, bool) {
q.lock.Lock()
defer q.lock.Unlock()
if len(q.queue) == 0 {
return "", false
}
result := q.queue[0]
q.queue = q.queue[1:len(q.queue)]
return result, true
}
// Creates a new PodEvictor that will use the given RateLimiter to oversee eviction.
func NewPodEvictor(deletingPodsRateLimiter util.RateLimiter) *PodEvictor {
return &PodEvictor{
queue: UniqueQueue{
queue: make([]string, 0),
set: util.NewStringSet(),
},
deletingPodsRateLimiter: deletingPodsRateLimiter,
}
}
// Tries to evict all Pods from previously inserted Nodes. Ends prematurely if the RateLimiter forbids any eviction.
// Each Node is processed only once as long as it is not Removed, i.e. calling AddNodeToEvict multiple times does not
// result in multiple evictions as long as RemoveNodeToEvict is not called.
func (pe *PodEvictor) TryEvict(delFunc func(string)) {
val, ok := pe.queue.Get()
for ok {
if pe.deletingPodsRateLimiter.CanAccept() {
glog.Infof("PodEvictor is evicting Pods on Node: %v", val)
delFunc(val)
} else {
glog.V(1).Info("PodEvictor is rate limitted.")
break
}
val, ok = pe.queue.Get()
}
}
// Adds a Node to the Evictor to be processed later. Won't add the same Node a second time if it was
// already added and not removed.
func (pe *PodEvictor) AddNodeToEvict(nodeName string) bool {
return pe.queue.Add(nodeName)
}
// Removes Node from the Evictor. The Node won't be processed until added again.
func (pe *PodEvictor) RemoveNodeToEvict(nodeName string) bool {
return pe.queue.Remove(nodeName)
}
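For orientation, a hedged sketch of how a caller wires this type up, mirroring the controller-manager change earlier in this commit. util.NewTokenBucketRateLimiter, util.Forever and nodeEvictionPeriod all appear elsewhere in this commit; the exampleWiring helper, the node name and the qps/burst values are illustrative only.

package nodecontroller

import (
	"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
	"github.com/golang/glog"
)

// exampleWiring shows the producer/consumer split; it is not part of the commit.
func exampleWiring(deletePods func(nodeName string)) {
	// Placeholder qps/burst values, not the controller-manager defaults.
	evictor := NewPodEvictor(util.NewTokenBucketRateLimiter(0.1, 10))

	// Producer side: monitorNodeStatus calls this once a node has been unready
	// longer than podEvictionTimeout.
	if evictor.AddNodeToEvict("node-1") {
		glog.Infof("queued node-1 for pod eviction")
	}

	// Consumer side: a single background worker drains the queue every
	// nodeEvictionPeriod, never evicting faster than the rate limiter allows.
	go util.Forever(func() {
		evictor.TryEvict(deletePods)
	}, nodeEvictionPeriod)
}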

View File

@@ -0,0 +1,146 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package nodecontroller
import (
"testing"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
)
func CheckQueueEq(lhs, rhs []string) bool {
if len(lhs) != len(rhs) {
return false
}
for i := 0; i < len(lhs); i++ {
if rhs[i] != lhs[i] {
return false
}
}
return true
}
func CheckSetEq(lhs, rhs util.StringSet) bool {
return lhs.HasAll(rhs.List()...) && rhs.HasAll(lhs.List()...)
}
func TestAddNode(t *testing.T) {
evictor := NewPodEvictor(util.NewFakeRateLimiter())
evictor.AddNodeToEvict("first")
evictor.AddNodeToEvict("second")
evictor.AddNodeToEvict("third")
queuePattern := []string{"first", "second", "third"}
if len(evictor.queue.queue) != len(queuePattern) {
t.Fatalf("Queue %v should have lenght %d", evictor.queue.queue, len(queuePattern))
}
if !CheckQueueEq(queuePattern, evictor.queue.queue) {
t.Errorf("Invalid queue. Got %v, expected %v", evictor.queue.queue, queuePattern)
}
setPattern := util.NewStringSet("first", "second", "third")
if len(evictor.queue.set) != len(setPattern) {
t.Fatalf("Map %v should have length %d", evictor.queue.set, len(setPattern))
}
if !CheckSetEq(setPattern, evictor.queue.set) {
t.Errorf("Invalid map. Got %v, expected %v", evictor.queue.set, setPattern)
}
}
func TestDelNode(t *testing.T) {
evictor := NewPodEvictor(util.NewFakeRateLimiter())
evictor.AddNodeToEvict("first")
evictor.AddNodeToEvict("second")
evictor.AddNodeToEvict("third")
evictor.RemoveNodeToEvict("first")
queuePattern := []string{"second", "third"}
if len(evictor.queue.queue) != len(queuePattern) {
t.Fatalf("Queue %v should have length %d", evictor.queue.queue, len(queuePattern))
}
if !CheckQueueEq(queuePattern, evictor.queue.queue) {
t.Errorf("Invalid queue. Got %v, expected %v", evictor.queue.queue, queuePattern)
}
setPattern := util.NewStringSet("second", "third")
if len(evictor.queue.set) != len(setPattern) {
t.Fatalf("Map %v should have length %d", evictor.queue.set, len(setPattern))
}
if !CheckSetEq(setPattern, evictor.queue.set) {
t.Errorf("Invalid map. Got %v, expected %v", evictor.queue.set, setPattern)
}
evictor = NewPodEvictor(util.NewFakeRateLimiter())
evictor.AddNodeToEvict("first")
evictor.AddNodeToEvict("second")
evictor.AddNodeToEvict("third")
evictor.RemoveNodeToEvict("second")
queuePattern = []string{"first", "third"}
if len(evictor.queue.queue) != len(queuePattern) {
t.Fatalf("Queue %v should have lenght %d", evictor.queue.queue, len(queuePattern))
}
if !CheckQueueEq(queuePattern, evictor.queue.queue) {
t.Errorf("Invalid queue. Got %v, expected %v", evictor.queue.queue, queuePattern)
}
setPattern = util.NewStringSet("first", "third")
if len(evictor.queue.set) != len(setPattern) {
t.Fatalf("Map %v should have length %d", evictor.queue.set, len(setPattern))
}
if !CheckSetEq(setPattern, evictor.queue.set) {
t.Errorf("Invalid map. Got %v, expected %v", evictor.queue.set, setPattern)
}
evictor = NewPodEvictor(util.NewFakeRateLimiter())
evictor.AddNodeToEvict("first")
evictor.AddNodeToEvict("second")
evictor.AddNodeToEvict("third")
evictor.RemoveNodeToEvict("third")
queuePattern = []string{"first", "second"}
if len(evictor.queue.queue) != len(queuePattern) {
t.Fatalf("Queue %v should have lenght %d", evictor.queue.queue, len(queuePattern))
}
if !CheckQueueEq(queuePattern, evictor.queue.queue) {
t.Errorf("Invalid queue. Got %v, expected %v", evictor.queue.queue, queuePattern)
}
setPattern = util.NewStringSet("first", "second")
if len(evictor.queue.set) != len(setPattern) {
t.Fatalf("Map %v should have length %d", evictor.queue.set, len(setPattern))
}
if !CheckSetEq(setPattern, evictor.queue.set) {
t.Errorf("Invalid map. Got %v, expected %v", evictor.queue.set, setPattern)
}
}
func TestEvictNode(t *testing.T) {
evictor := NewPodEvictor(util.NewFakeRateLimiter())
evictor.AddNodeToEvict("first")
evictor.AddNodeToEvict("second")
evictor.AddNodeToEvict("third")
evictor.RemoveNodeToEvict("second")
deletedMap := util.NewStringSet()
evictor.TryEvict(func(nodeName string) { deletedMap.Insert(nodeName) })
setPattern := util.NewStringSet("first", "third")
if len(deletedMap) != len(setPattern) {
t.Fatalf("Map %v should have length %d", evictor.queue.set, len(setPattern))
}
if !CheckSetEq(setPattern, deletedMap) {
t.Errorf("Invalid map. Got %v, expected %v", deletedMap, setPattern)
}
}
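One more property worth pinning down: Get() pops a value from the queue but leaves it in the set, so a node that TryEvict has already drained cannot be re-queued until RemoveNodeToEvict clears it. A hedged sketch of such a test (not part of the commit; it relies only on identifiers defined in this package and in pkg/util):

func TestEvictNodeOnlyOnceUntilRemoved(t *testing.T) {
	evictor := NewPodEvictor(util.NewFakeRateLimiter())
	evictor.AddNodeToEvict("first")

	deleted := util.NewStringSet()
	evictor.TryEvict(func(nodeName string) { deleted.Insert(nodeName) })
	if !deleted.Has("first") {
		t.Fatalf("expected 'first' to be evicted, got %v", deleted)
	}

	// Still tracked in the set, so a second Add is a no-op and TryEvict is idle.
	if evictor.AddNodeToEvict("first") {
		t.Errorf("expected second AddNodeToEvict to return false")
	}
	deleted = util.NewStringSet()
	evictor.TryEvict(func(nodeName string) { deleted.Insert(nodeName) })
	if len(deleted) != 0 {
		t.Errorf("expected no evictions, got %v", deleted)
	}

	// RemoveNodeToEvict returns false here (the queue entry was already consumed
	// by TryEvict) but it clears the set, so the node can be queued again.
	if evictor.RemoveNodeToEvict("first") {
		t.Errorf("expected RemoveNodeToEvict to return false for an already drained node")
	}
	if !evictor.AddNodeToEvict("first") {
		t.Errorf("expected AddNodeToEvict to succeed after removal")
	}
}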