mirror of https://github.com/k3s-io/k3s
Evict terminating pods
parent 5dfc904c18
commit 8a62f1828d
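The change below replaces the one-shot PodEvictor with a rate-limited, timed queue that the node controller drains in two phases (delete pods, then force-terminate stragglers). For orientation only, here is a hypothetical standalone sketch of how that queue is driven; the nodecontroller import path and the rate-limiter parameters are assumptions, while NewRateLimitedTimedQueue, Add, Try, and TimedValue match the diff that follows.

package main

import (
	"fmt"
	"time"

	// Import path assumed for illustration; the package itself appears in full further down.
	"k8s.io/kubernetes/pkg/cloudprovider/nodecontroller"
	"k8s.io/kubernetes/pkg/util"
)

func main() {
	// Queue node names, processing each at most once until it is handled or removed,
	// and pace the work through a token-bucket rate limiter (values are made up).
	q := nodecontroller.NewRateLimitedTimedQueue(util.NewTokenBucketRateLimiter(0.1, 10), false)

	q.Add("node-1")
	q.Add("node-2")
	q.Add("node-1") // duplicate: ignored until "node-1" is processed or removed

	// Try drains the queue; returning (true, 0) marks the item done, while
	// returning (false, wait) requeues it to run no sooner than `wait` from now.
	q.Try(func(value nodecontroller.TimedValue) (bool, time.Duration) {
		fmt.Printf("evicting pods from %s (queued at %s)\n", value.Value, value.Added)
		return true, 0
	})
}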
@@ -191,7 +191,7 @@ func startComponents(firstManifestURL, secondManifestURL, apiVersion string) (st
// TODO: Write an integration test for the replication controllers watch.
go controllerManager.Run(3, util.NeverStop)

nodeController := nodecontroller.NewNodeController(nil, cl, 5*time.Minute, nodecontroller.NewPodEvictor(util.NewFakeRateLimiter()),
nodeController := nodecontroller.NewNodeController(nil, cl, 5*time.Minute, util.NewFakeRateLimiter(),
40*time.Second, 60*time.Second, 5*time.Second, nil, false)
nodeController.Run(5 * time.Second)
cadvisorInterface := new(cadvisor.Fake)
@@ -199,7 +199,7 @@ func (s *CMServer) Run(_ []string) error {
}

nodeController := nodecontroller.NewNodeController(cloud, kubeClient,
s.PodEvictionTimeout, nodecontroller.NewPodEvictor(util.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst)),
s.PodEvictionTimeout, util.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst),
s.NodeMonitorGracePeriod, s.NodeStartupGracePeriod, s.NodeMonitorPeriod, &s.ClusterCIDR, s.AllocateNodeCIDRs)
nodeController.Run(s.NodeSyncPeriod)
@@ -123,7 +123,7 @@ func (s *CMServer) Run(_ []string) error {
}

nodeController := nodecontroller.NewNodeController(cloud, kubeClient,
s.PodEvictionTimeout, nodecontroller.NewPodEvictor(util.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst)),
s.PodEvictionTimeout, util.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst),
s.NodeMonitorGracePeriod, s.NodeStartupGracePeriod, s.NodeMonitorPeriod, (*net.IPNet)(&s.ClusterCIDR), s.AllocateNodeCIDRs)
nodeController.Run(s.NodeSyncPeriod)
@@ -89,8 +89,11 @@ type NodeController struct {
nodeStatusMap map[string]nodeStatusData
now func() util.Time
// worker that evicts pods from unresponsive nodes.
podEvictor *PodEvictor
podEvictor *RateLimitedTimedQueue
terminationEvictor *RateLimitedTimedQueue
podEvictionTimeout time.Duration
// The maximum duration before a pod evicted from a node can be forcefully terminated.
maximumGracePeriod time.Duration
recorder record.EventRecorder
}
@@ -99,7 +102,7 @@ func NewNodeController(
cloud cloudprovider.Interface,
kubeClient client.Interface,
podEvictionTimeout time.Duration,
podEvictor *PodEvictor,
podEvictionLimiter util.RateLimiter,
nodeMonitorGracePeriod time.Duration,
nodeStartupGracePeriod time.Duration,
nodeMonitorPeriod time.Duration,
@@ -123,7 +126,9 @@ func NewNodeController(
kubeClient: kubeClient,
recorder: recorder,
podEvictionTimeout: podEvictionTimeout,
podEvictor: podEvictor,
maximumGracePeriod: 5 * time.Minute,
podEvictor: NewRateLimitedTimedQueue(podEvictionLimiter, false),
terminationEvictor: NewRateLimitedTimedQueue(podEvictionLimiter, false),
nodeStatusMap: make(map[string]nodeStatusData),
nodeMonitorGracePeriod: nodeMonitorGracePeriod,
nodeMonitorPeriod: nodeMonitorPeriod,
@@ -145,38 +150,43 @@ func (nc *NodeController) Run(period time.Duration) {
}, nc.nodeMonitorPeriod, util.NeverStop)

go util.Until(func() {
nc.podEvictor.TryEvict(func(nodeName string) { nc.deletePods(nodeName) })
nc.podEvictor.Try(func(value TimedValue) (bool, time.Duration) {
remaining, err := nc.deletePods(value.Value)
if err != nil {
util.HandleError(fmt.Errorf("unable to evict node %q: %v", value.Value, err))
return false, 0
}
if remaining {
nc.terminationEvictor.Add(value.Value)
}
return true, 0
})
}, nodeEvictionPeriod, util.NeverStop)
}

// We observed a Node deletion in etcd. Currently we only need to remove Pods that
// were assigned to it.
func (nc *NodeController) deleteNode(nodeID string) error {
return nc.deletePods(nodeID)
}
// TODO: replace with a controller that ensures pods that are terminating complete
// in a particular time period
go util.Until(func() {
nc.terminationEvictor.Try(func(value TimedValue) (bool, time.Duration) {
completed, remaining, err := nc.terminatePods(value.Value, value.Added)
if err != nil {
util.HandleError(fmt.Errorf("unable to terminate pods on node %q: %v", value.Value, err))
return false, 0
}

// deletePods will delete all pods from master running on given node.
func (nc *NodeController) deletePods(nodeID string) error {
glog.V(2).Infof("Delete all pods from %v", nodeID)
pods, err := nc.kubeClient.Pods(api.NamespaceAll).List(labels.Everything(),
fields.OneTermEqualSelector(client.PodHost, nodeID))
if err != nil {
return err
}
nc.recordNodeEvent(nodeID, "DeletingAllPods", fmt.Sprintf("Deleting all Pods from Node %v.", nodeID))
for _, pod := range pods.Items {
// Defensive check, also needed for tests.
if pod.Spec.NodeName != nodeID {
continue
}
glog.V(2).Infof("Delete pod %v", pod.Name)
nc.recorder.Eventf(&pod, "NodeControllerEviction", "Deleting Pod %s from Node %s", pod.Name, nodeID)
if err := nc.kubeClient.Pods(pod.Namespace).Delete(pod.Name, nil); err != nil {
glog.Errorf("Error deleting pod %v: %v", pod.Name, err)
}
}
if completed {
glog.V(2).Infof("All pods terminated on %s", value.Value)
nc.recordNodeEvent(value.Value, "TerminatedAllPods", fmt.Sprintf("Terminated all Pods on Node %s.", value.Value))
return true, 0
}

return nil
glog.V(2).Infof("Pods terminating since %s on %q, estimated completion %s", value.Added, value.Value, remaining)
// clamp very short intervals
if remaining < nodeEvictionPeriod {
remaining = nodeEvictionPeriod
}
return false, remaining
})
}, nodeEvictionPeriod, util.NeverStop)
}

// Generates num pod CIDRs that could be assigned to nodes.
@@ -232,7 +242,7 @@ func (nc *NodeController) monitorNodeStatus() error {
for node := range deleted {
glog.V(1).Infof("NodeController observed a Node deletion: %v", node)
nc.recordNodeEvent(node, "RemovingNode", fmt.Sprintf("Removing Node %v from NodeController", node))
nc.deleteNode(node)
nc.podEvictor.Add(node)
nc.knownNodeSet.Delete(node)
}
}
@@ -274,18 +284,20 @@ func (nc *NodeController) monitorNodeStatus() error {
// Check eviction timeout against decisionTimestamp
if lastReadyCondition.Status == api.ConditionFalse &&
decisionTimestamp.After(nc.nodeStatusMap[node.Name].readyTransitionTimestamp.Add(nc.podEvictionTimeout)) {
if nc.podEvictor.AddNodeToEvict(node.Name) {
if nc.podEvictor.Add(node.Name) {
glog.Infof("Adding pods to evict: %v is later than %v + %v", decisionTimestamp, nc.nodeStatusMap[node.Name].readyTransitionTimestamp, nc.podEvictionTimeout)
}
}
if lastReadyCondition.Status == api.ConditionUnknown &&
decisionTimestamp.After(nc.nodeStatusMap[node.Name].probeTimestamp.Add(nc.podEvictionTimeout-gracePeriod)) {
if nc.podEvictor.AddNodeToEvict(node.Name) {
if nc.podEvictor.Add(node.Name) {
glog.Infof("Adding pods to evict2: %v is later than %v + %v", decisionTimestamp, nc.nodeStatusMap[node.Name].readyTransitionTimestamp, nc.podEvictionTimeout-gracePeriod)
}
}
if lastReadyCondition.Status == api.ConditionTrue {
if nc.podEvictor.RemoveNodeToEvict(node.Name) {
wasDeleting := nc.podEvictor.Remove(node.Name)
wasTerminating := nc.terminationEvictor.Remove(node.Name)
if wasDeleting || wasTerminating {
glog.Infof("Pods on %v won't be evicted", node.Name)
}
}
@@ -305,11 +317,20 @@ func (nc *NodeController) monitorNodeStatus() error {
}
if _, err := instances.ExternalID(node.Name); err != nil && err == cloudprovider.InstanceNotFound {
glog.Infof("Deleting node (no longer present in cloud provider): %s", node.Name)
nc.recordNodeEvent(node.Name, "DeleteingNode", fmt.Sprintf("Deleting Node %v because it's not present according to cloud provider", node.Name))
if err := nc.deletePods(node.Name); err != nil {
glog.Errorf("Unable to delete pods from node %s: %v", node.Name, err)
nc.recordNodeEvent(node.Name, "DeletingNode", fmt.Sprintf("Deleting Node %v because it's not present according to cloud provider", node.Name))

remaining, err := nc.hasPods(node.Name)
if err != nil {
glog.Errorf("Unable to determine whether node %s has pods, will retry: %v", node.Name, err)
continue
}
if remaining {
// queue eviction of the pods on the node
glog.Infof("Deleting node %s is delayed while pods are evicted", node.Name)
nc.podEvictor.Add(node.Name)
continue
}

if err := nc.kubeClient.Nodes().Delete(node.Name); err != nil {
glog.Errorf("Unable to delete node %s: %v", node.Name, err)
continue
@@ -505,3 +526,96 @@ func (nc *NodeController) tryUpdateNodeStatus(node *api.Node) (time.Duration, ap

return gracePeriod, lastReadyCondition, readyCondition, err
}

// returns true if the provided node still has pods scheduled to it, or an error if
// the server could not be contacted.
func (nc *NodeController) hasPods(nodeID string) (bool, error) {
pods, err := nc.kubeClient.Pods(api.NamespaceAll).List(labels.Everything(), fields.OneTermEqualSelector(client.PodHost, nodeID))
if err != nil {
return false, err
}
return len(pods.Items) > 0, nil
}

// deletePods will delete all pods from master running on given node, and return true
// if any pods were deleted.
func (nc *NodeController) deletePods(nodeID string) (bool, error) {
remaining := false
pods, err := nc.kubeClient.Pods(api.NamespaceAll).List(labels.Everything(), fields.OneTermEqualSelector(client.PodHost, nodeID))
if err != nil {
return remaining, err
}

if len(pods.Items) > 0 {
nc.recordNodeEvent(nodeID, "DeletingAllPods", fmt.Sprintf("Deleting all Pods from Node %v.", nodeID))
}

for _, pod := range pods.Items {
// Defensive check, also needed for tests.
if pod.Spec.NodeName != nodeID {
continue
}
// if the pod has already been deleted, ignore it
if pod.DeletionGracePeriodSeconds != nil {
continue
}

glog.V(2).Infof("Delete pod %v", pod.Name)
nc.recorder.Eventf(&pod, "NodeControllerEviction", "Deleting Pod %s from Node %s", pod.Name, nodeID)
if err := nc.kubeClient.Pods(pod.Namespace).Delete(pod.Name, nil); err != nil {
return false, err
}
remaining = true
}
return remaining, nil
}

// terminatePods will ensure all pods on the given node that are in terminating state are eventually
// cleaned up
func (nc *NodeController) terminatePods(nodeID string, since time.Time) (bool, time.Duration, error) {
remaining := time.Duration(0)
complete := true

pods, err := nc.kubeClient.Pods(api.NamespaceAll).List(labels.Everything(),
fields.OneTermEqualSelector(client.PodHost, nodeID))
if err != nil {
return false, remaining, err
}

now := time.Now()
elapsed := now.Sub(since)
for _, pod := range pods.Items {
// Defensive check, also needed for tests.
if pod.Spec.NodeName != nodeID {
continue
}
// only clean terminated pods
if pod.DeletionGracePeriodSeconds == nil {
continue
}

grace := time.Duration(*pod.DeletionGracePeriodSeconds) * time.Second
if grace > nc.maximumGracePeriod {
grace = nc.maximumGracePeriod
}
next := grace - elapsed

if next < 0 {
next = 0
glog.V(2).Infof("Removing pod %v after %s grace period", pod.Name, grace)
nc.recordNodeEvent(nodeID, "TerminatingEvictedPod", fmt.Sprintf("Pod %s has exceeded the grace period for deletion after being evicted from Node %q and is being force killed", pod.Name, nodeID))
if err := nc.kubeClient.Pods(pod.Namespace).Delete(pod.Name, api.NewDeleteOptions(0)); err != nil {
glog.Errorf("Error completing deletion of pod %s: %v", pod.Name, err)
complete = false
}
} else {
glog.V(2).Infof("Pod %v still terminating with %s remaining", pod.Name, next)
complete = false
}

if remaining < next {
remaining = next
}
}
return complete, remaining, nil
}
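The force-deletion wait that terminatePods computes above boils down to a small piece of arithmetic; the helper below is an illustrative extraction under that reading (the function name is hypothetical, not part of the commit).

package main

import (
	"fmt"
	"time"
)

// Mirrors the 5-minute maximumGracePeriod set in NewNodeController above.
const maximumGracePeriod = 5 * time.Minute

// forceDeleteWait returns how long to wait before force deleting a terminating
// pod: its grace period, capped at the controller-wide maximum, minus the time
// it has already spent terminating (never negative).
func forceDeleteWait(podGrace, elapsed time.Duration) time.Duration {
	grace := podGrace
	if grace > maximumGracePeriod {
		grace = maximumGracePeriod
	}
	next := grace - elapsed
	if next < 0 {
		next = 0 // grace exhausted: delete with a zero-second grace period right away
	}
	return next
}

func main() {
	// A pod that asked for a 1h grace period and has been terminating for 2m
	// is clamped to the 5m cap, so about 3m remain before it is force killed.
	fmt.Println(forceDeleteWait(time.Hour, 2*time.Minute)) // 3m0s
}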
@@ -324,9 +324,8 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
}

for _, item := range table {
podEvictor := NewPodEvictor(util.NewFakeRateLimiter())
nodeController := NewNodeController(nil, item.fakeNodeHandler,
evictionTimeout, podEvictor, testNodeMonitorGracePeriod,
evictionTimeout, util.NewFakeRateLimiter(), testNodeMonitorGracePeriod,
testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false)
nodeController.now = func() util.Time { return fakeNow }
if err := nodeController.monitorNodeStatus(); err != nil {
@@ -340,7 +339,17 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
t.Errorf("unexpected error: %v", err)
}

podEvictor.TryEvict(func(nodeName string) { nodeController.deletePods(nodeName) })
nodeController.podEvictor.Try(func(value TimedValue) (bool, time.Duration) {
remaining, _ := nodeController.deletePods(value.Value)
if remaining {
nodeController.terminationEvictor.Add(value.Value)
}
return true, 0
})
nodeController.podEvictor.Try(func(value TimedValue) (bool, time.Duration) {
nodeController.terminatePods(value.Value, value.Added)
return true, 0
})
podEvicted := false
for _, action := range item.fakeNodeHandler.Actions() {
if action.GetVerb() == "delete" && action.GetResource() == "pods" {
@@ -531,7 +540,7 @@ func TestMonitorNodeStatusUpdateStatus(t *testing.T) {
}

for _, item := range table {
nodeController := NewNodeController(nil, item.fakeNodeHandler, 5*time.Minute, NewPodEvictor(util.NewFakeRateLimiter()),
nodeController := NewNodeController(nil, item.fakeNodeHandler, 5*time.Minute, util.NewFakeRateLimiter(),
testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false)
nodeController.now = func() util.Time { return fakeNow }
if err := nodeController.monitorNodeStatus(); err != nil {
@@ -610,7 +619,7 @@ func TestNodeDeletion(t *testing.T) {
Fake: testclient.NewSimpleFake(&api.PodList{Items: []api.Pod{*newPod("pod0", "node0"), *newPod("pod1", "node1")}}),
}

nodeController := NewNodeController(nil, fakeNodeHandler, 5*time.Minute, NewPodEvictor(util.NewFakeRateLimiter()),
nodeController := NewNodeController(nil, fakeNodeHandler, 5*time.Minute, util.NewFakeRateLimiter(),
testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false)
nodeController.now = func() util.Time { return fakeNow }
if err := nodeController.monitorNodeStatus(); err != nil {
@@ -620,6 +629,10 @@ func TestNodeDeletion(t *testing.T) {
if err := nodeController.monitorNodeStatus(); err != nil {
t.Errorf("unexpected error: %v", err)
}
nodeController.podEvictor.Try(func(value TimedValue) (bool, time.Duration) {
nodeController.deletePods(value.Value)
return true, 0
})
podEvicted := false
for _, action := range fakeNodeHandler.Actions() {
if action.GetVerb() == "delete" && action.GetResource() == "pods" {
@@ -1,129 +0,0 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package nodecontroller

import (
"sync"

"k8s.io/kubernetes/pkg/util"

"github.com/golang/glog"
)

// A FIFO queue which additionally guarantees that any element can be added only once until
// it is removed.
type UniqueQueue struct {
lock sync.Mutex
queue []string
set util.StringSet
}

// Entity responsible for evicting Pods from inserted Nodes. It uses RateLimiter to avoid
// evicting everything at once. Note that we rate limit eviction of Nodes not individual Pods.
type PodEvictor struct {
queue UniqueQueue
deletingPodsRateLimiter util.RateLimiter
}

// Adds a new value to the queue if it wasn't added before, or was explicitly removed by the
// Remove call. Returns true if new value was added.
func (q *UniqueQueue) Add(value string) bool {
q.lock.Lock()
defer q.lock.Unlock()

if !q.set.Has(value) {
q.queue = append(q.queue, value)
q.set.Insert(value)
return true
} else {
return false
}
}

// Removes the value from the queue, so Get() call won't return it, and allow subsequent addition
// of the given value. If the value is not present does nothing and returns false.
func (q *UniqueQueue) Remove(value string) bool {
q.lock.Lock()
defer q.lock.Unlock()

q.set.Delete(value)
for i, val := range q.queue {
if val == value {
if i > 0 && i < len(q.queue)-1 {
q.queue = append(q.queue[0:i], q.queue[i+1:len(q.queue)]...)
} else if i > 0 {
q.queue = q.queue[0 : len(q.queue)-1]
} else {
q.queue = q.queue[1:len(q.queue)]
}
return true
}
}
return false
}

// Returns the oldest added value that wasn't returned yet.
func (q *UniqueQueue) Get() (string, bool) {
q.lock.Lock()
defer q.lock.Unlock()
if len(q.queue) == 0 {
return "", false
}

result := q.queue[0]
q.queue = q.queue[1:len(q.queue)]
return result, true
}

// Creates new PodEvictor which will use given RateLimiter to oversee eviction.
func NewPodEvictor(deletingPodsRateLimiter util.RateLimiter) *PodEvictor {
return &PodEvictor{
queue: UniqueQueue{
queue: make([]string, 0),
set: util.NewStringSet(),
},
deletingPodsRateLimiter: deletingPodsRateLimiter,
}
}

// Tries to evict all Pods from previously inserted Nodes. Ends prematurely if RateLimiter forbids any eviction.
// Each Node is processed only once, as long as it's not Removed, i.e. calling multiple AddNodeToEvict does not result
// with multiple evictions as long as RemoveNodeToEvict is not called.
func (pe *PodEvictor) TryEvict(delFunc func(string)) {
val, ok := pe.queue.Get()
for ok {
if pe.deletingPodsRateLimiter.CanAccept() {
glog.Infof("PodEvictor is evicting Pods on Node: %v", val)
delFunc(val)
} else {
glog.V(1).Info("PodEvictor is rate limitted.")
break
}
val, ok = pe.queue.Get()
}
}

// Adds Node to the Evictor to be processed later. Won't add the same Node second time if it was already
// added and not removed.
func (pe *PodEvictor) AddNodeToEvict(nodeName string) bool {
return pe.queue.Add(nodeName)
}

// Removes Node from the Evictor. The Node won't be processed until added again.
func (pe *PodEvictor) RemoveNodeToEvict(nodeName string) bool {
return pe.queue.Remove(nodeName)
}
@@ -0,0 +1,177 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package nodecontroller

import (
"container/heap"
"sync"
"time"

"k8s.io/kubernetes/pkg/util"
)

// TimedValue is a value that should be processed at a designated time.
type TimedValue struct {
Value string
Added time.Time
Next time.Time
}

// now is used to test time
var now func() time.Time = time.Now

// TimedQueue is a priority heap where the lowest Next is at the front of the queue
type TimedQueue []*TimedValue

func (h TimedQueue) Len() int { return len(h) }
func (h TimedQueue) Less(i, j int) bool { return h[i].Next.Before(h[j].Next) }
func (h TimedQueue) Swap(i, j int) { h[i], h[j] = h[j], h[i] }

func (h *TimedQueue) Push(x interface{}) {
*h = append(*h, x.(*TimedValue))
}

func (h *TimedQueue) Pop() interface{} {
old := *h
n := len(old)
x := old[n-1]
*h = old[0 : n-1]
return x
}

// A FIFO queue which additionally guarantees that any element can be added only once until
// it is removed.
type UniqueQueue struct {
lock sync.Mutex
queue TimedQueue
set util.StringSet
}

// Adds a new value to the queue if it wasn't added before, or was explicitly removed by the
// Remove call. Returns true if new value was added.
func (q *UniqueQueue) Add(value TimedValue) bool {
q.lock.Lock()
defer q.lock.Unlock()

if q.set.Has(value.Value) {
return false
}
heap.Push(&q.queue, &value)
q.set.Insert(value.Value)
return true
}

// Removes the value from the queue, so Get() call won't return it, and allow subsequent addition
// of the given value. If the value is not present does nothing and returns false.
func (q *UniqueQueue) Remove(value string) bool {
q.lock.Lock()
defer q.lock.Unlock()

q.set.Delete(value)
for i, val := range q.queue {
if val.Value == value {
heap.Remove(&q.queue, i)
return true
}
}
return false
}

// Returns the oldest added value that wasn't returned yet.
func (q *UniqueQueue) Get() (TimedValue, bool) {
q.lock.Lock()
defer q.lock.Unlock()
if len(q.queue) == 0 {
return TimedValue{}, false
}
result := heap.Pop(&q.queue).(*TimedValue)
q.set.Delete(result.Value)
return *result, true
}

// RateLimitedTimedQueue is a unique item priority queue ordered by the expected next time
// of execution. It is also rate limited.
type RateLimitedTimedQueue struct {
queue UniqueQueue
limiter util.RateLimiter
leak bool
}

// Creates new queue which will use given RateLimiter to oversee execution. If leak is true,
// items which are rate limited will be leaked. Otherwise, rate limited items will be requeued.
func NewRateLimitedTimedQueue(limiter util.RateLimiter, leak bool) *RateLimitedTimedQueue {
return &RateLimitedTimedQueue{
queue: UniqueQueue{
queue: TimedQueue{},
set: util.NewStringSet(),
},
limiter: limiter,
leak: leak,
}
}

// ActionFunc takes a timed value and returns false if the item must be retried, with an optional
// time.Duration if some minimum wait interval should be used.
type ActionFunc func(TimedValue) (bool, time.Duration)

// Try processes the queue. Ends prematurely if RateLimiter forbids an action and leak is true.
// Otherwise, requeues the item to be processed. Each value is processed once if fn returns true,
// otherwise it is added back to the queue. The returned remaining is used to identify the minimum
// time to execute the next item in the queue.
func (q *RateLimitedTimedQueue) Try(fn ActionFunc) {
val, ok := q.queue.Get()
for ok {
// rate limit the queue checking
if q.leak {
if !q.limiter.CanAccept() {
break
}
} else {
q.limiter.Accept()
}

now := now()
if now.Before(val.Next) {
q.queue.Add(val)
val, ok = q.queue.Get()
// we do not sleep here because other values may be added at the front of the queue
continue
}

if ok, wait := fn(val); !ok {
val.Next = now.Add(wait + 1)
q.queue.Add(val)
}
val, ok = q.queue.Get()
}
}

// Adds value to the queue to be processed. Won't add the same value a second time if it was already
// added and not removed.
func (q *RateLimitedTimedQueue) Add(value string) bool {
now := now()
return q.queue.Add(TimedValue{
Value: value,
Added: now,
Next: now,
})
}

// Removes Node from the Evictor. The Node won't be processed until added again.
func (q *RateLimitedTimedQueue) Remove(value string) bool {
return q.queue.Remove(value)
}
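As a usage note on Try's retry path: returning false defers an item by pushing its Next timestamp into the future, so other queued values are handed out first; the test changes below verify exactly this ordering. A hypothetical standalone sketch (import path assumed, node names made up):

package main

import (
	"fmt"
	"time"

	"k8s.io/kubernetes/pkg/cloudprovider/nodecontroller" // assumed import path
	"k8s.io/kubernetes/pkg/util"
)

func main() {
	q := nodecontroller.NewRateLimitedTimedQueue(util.NewFakeRateLimiter(), false)
	q.Add("slow-node")
	q.Add("fast-node")

	order := []string{}
	deferred := false
	q.Try(func(value nodecontroller.TimedValue) (bool, time.Duration) {
		if value.Value == "slow-node" && !deferred {
			deferred = true
			return false, time.Millisecond // requeue: retried only after the other item
		}
		order = append(order, value.Value)
		return true, 0
	})
	fmt.Println(order) // [fast-node slow-node]
}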
@@ -17,14 +17,16 @@ limitations under the License.
package nodecontroller

import (
"reflect"
"testing"
"time"

"k8s.io/kubernetes/pkg/util"
)

func CheckQueueEq(lhs, rhs []string) bool {
func CheckQueueEq(lhs []string, rhs TimedQueue) bool {
for i := 0; i < len(lhs); i++ {
if rhs[i] != lhs[i] {
if rhs[i].Value != lhs[i] {
return false
}
}
@@ -36,10 +38,10 @@ func CheckSetEq(lhs, rhs util.StringSet) bool {
}

func TestAddNode(t *testing.T) {
evictor := NewPodEvictor(util.NewFakeRateLimiter())
evictor.AddNodeToEvict("first")
evictor.AddNodeToEvict("second")
evictor.AddNodeToEvict("third")
evictor := NewRateLimitedTimedQueue(util.NewFakeRateLimiter(), true)
evictor.Add("first")
evictor.Add("second")
evictor.Add("third")

queuePattern := []string{"first", "second", "third"}
if len(evictor.queue.queue) != len(queuePattern) {
@@ -59,11 +61,11 @@ func TestAddNode(t *testing.T) {
}

func TestDelNode(t *testing.T) {
evictor := NewPodEvictor(util.NewFakeRateLimiter())
evictor.AddNodeToEvict("first")
evictor.AddNodeToEvict("second")
evictor.AddNodeToEvict("third")
evictor.RemoveNodeToEvict("first")
evictor := NewRateLimitedTimedQueue(util.NewFakeRateLimiter(), true)
evictor.Add("first")
evictor.Add("second")
evictor.Add("third")
evictor.Remove("first")

queuePattern := []string{"second", "third"}
if len(evictor.queue.queue) != len(queuePattern) {
@@ -81,11 +83,11 @@ func TestDelNode(t *testing.T) {
t.Errorf("Invalid map. Got %v, expected %v", evictor.queue.set, setPattern)
}

evictor = NewPodEvictor(util.NewFakeRateLimiter())
evictor.AddNodeToEvict("first")
evictor.AddNodeToEvict("second")
evictor.AddNodeToEvict("third")
evictor.RemoveNodeToEvict("second")
evictor = NewRateLimitedTimedQueue(util.NewFakeRateLimiter(), true)
evictor.Add("first")
evictor.Add("second")
evictor.Add("third")
evictor.Remove("second")

queuePattern = []string{"first", "third"}
if len(evictor.queue.queue) != len(queuePattern) {
@@ -103,11 +105,11 @@ func TestDelNode(t *testing.T) {
t.Errorf("Invalid map. Got %v, expected %v", evictor.queue.set, setPattern)
}

evictor = NewPodEvictor(util.NewFakeRateLimiter())
evictor.AddNodeToEvict("first")
evictor.AddNodeToEvict("second")
evictor.AddNodeToEvict("third")
evictor.RemoveNodeToEvict("third")
evictor = NewRateLimitedTimedQueue(util.NewFakeRateLimiter(), true)
evictor.Add("first")
evictor.Add("second")
evictor.Add("third")
evictor.Remove("third")

queuePattern = []string{"first", "second"}
if len(evictor.queue.queue) != len(queuePattern) {
@@ -126,15 +128,18 @@ func TestDelNode(t *testing.T) {
}
}

func TestEvictNode(t *testing.T) {
evictor := NewPodEvictor(util.NewFakeRateLimiter())
evictor.AddNodeToEvict("first")
evictor.AddNodeToEvict("second")
evictor.AddNodeToEvict("third")
evictor.RemoveNodeToEvict("second")
func TestTry(t *testing.T) {
evictor := NewRateLimitedTimedQueue(util.NewFakeRateLimiter(), true)
evictor.Add("first")
evictor.Add("second")
evictor.Add("third")
evictor.Remove("second")

deletedMap := util.NewStringSet()
evictor.TryEvict(func(nodeName string) { deletedMap.Insert(nodeName) })
evictor.Try(func(value TimedValue) (bool, time.Duration) {
deletedMap.Insert(value.Value)
return true, 0
})

setPattern := util.NewStringSet("first", "third")
if len(deletedMap) != len(setPattern) {
@@ -144,3 +149,35 @@ func TestEvictNode(t *testing.T) {
t.Errorf("Invalid map. Got %v, expected %v", deletedMap, setPattern)
}
}

func TestTryOrdering(t *testing.T) {
evictor := NewRateLimitedTimedQueue(util.NewFakeRateLimiter(), false)
evictor.Add("first")
evictor.Add("second")
evictor.Add("third")

order := []string{}
count := 0
queued := false
evictor.Try(func(value TimedValue) (bool, time.Duration) {
count++
if value.Added.IsZero() {
t.Fatalf("added should not be zero")
}
if value.Next.IsZero() {
t.Fatalf("next should not be zero")
}
if !queued && value.Value == "second" {
queued = true
return false, time.Millisecond
}
order = append(order, value.Value)
return true, 0
})
if !reflect.DeepEqual(order, []string{"first", "third", "second"}) {
t.Fatalf("order was wrong: %v", order)
}
if count != 4 {
t.Fatalf("unexpected iterations: %d", count)
}
}