fix node controller event uid issue

pull/6/head
AdoHe 2016-08-14 09:41:20 +08:00
parent 874a186932
commit b2ab4c6d9b
6 changed files with 192 additions and 50 deletions

View File

@ -83,7 +83,7 @@ func cleanupOrphanedPods(pods []*api.Pod, nodeStore cache.Store, forcefulDeleteP
// deletePods will delete all pods from master running on given node, and return true
// if any pods were deleted.
func deletePods(kubeClient clientset.Interface, recorder record.EventRecorder, nodeName string, daemonStore cache.StoreToDaemonSetLister) (bool, error) {
func deletePods(kubeClient clientset.Interface, recorder record.EventRecorder, nodeName, nodeUID string, daemonStore cache.StoreToDaemonSetLister) (bool, error) {
remaining := false
selector := fields.OneTermEqualSelector(api.PodHostField, nodeName)
options := api.ListOptions{FieldSelector: selector}
@ -93,7 +93,7 @@ func deletePods(kubeClient clientset.Interface, recorder record.EventRecorder, n
}
if len(pods.Items) > 0 {
recordNodeEvent(recorder, nodeName, api.EventTypeNormal, "DeletingAllPods", fmt.Sprintf("Deleting all Pods from Node %v.", nodeName))
recordNodeEvent(recorder, nodeName, nodeUID, api.EventTypeNormal, "DeletingAllPods", fmt.Sprintf("Deleting all Pods from Node %v.", nodeName))
}
for _, pod := range pods.Items {
@ -257,11 +257,11 @@ func nodeExistsInCloudProvider(cloud cloudprovider.Interface, nodeName string) (
return true, nil
}
func recordNodeEvent(recorder record.EventRecorder, nodeName, eventtype, reason, event string) {
func recordNodeEvent(recorder record.EventRecorder, nodeName, nodeUID, eventtype, reason, event string) {
ref := &api.ObjectReference{
Kind: "Node",
Name: nodeName,
UID: types.UID(nodeName),
UID: types.UID(nodeUID),
Namespace: "",
}
glog.V(2).Infof("Recording %s event message for node %s", event, nodeName)
@ -272,7 +272,7 @@ func recordNodeStatusChange(recorder record.EventRecorder, node *api.Node, new_s
ref := &api.ObjectReference{
Kind: "Node",
Name: node.Name,
UID: types.UID(node.Name),
UID: node.UID,
Namespace: "",
}
glog.V(2).Infof("Recording status change %s event message for node %s", new_status, node.Name)
@ -284,7 +284,7 @@ func recordNodeStatusChange(recorder record.EventRecorder, node *api.Node, new_s
// terminatePods will ensure all pods on the given node that are in terminating state are eventually
// cleaned up. Returns true if the node has no pods in terminating state, a duration that indicates how
// long before we should check again (the next deadline for a pod to complete), or an error.
func terminatePods(kubeClient clientset.Interface, recorder record.EventRecorder, nodeName string, since time.Time, maxGracePeriod time.Duration) (bool, time.Duration, error) {
func terminatePods(kubeClient clientset.Interface, recorder record.EventRecorder, nodeName string, nodeUID string, since time.Time, maxGracePeriod time.Duration) (bool, time.Duration, error) {
// the time before we should try again
nextAttempt := time.Duration(0)
// have we deleted all pods
@ -320,7 +320,7 @@ func terminatePods(kubeClient clientset.Interface, recorder record.EventRecorder
if remaining < 0 {
remaining = 0
glog.V(2).Infof("Removing pod %v after %s grace period", pod.Name, grace)
recordNodeEvent(recorder, nodeName, api.EventTypeNormal, "TerminatingEvictedPod", fmt.Sprintf("Pod %s has exceeded the grace period for deletion after being evicted from Node %q and is being force killed", pod.Name, nodeName))
recordNodeEvent(recorder, nodeName, nodeUID, api.EventTypeNormal, "TerminatingEvictedPod", fmt.Sprintf("Pod %s has exceeded the grace period for deletion after being evicted from Node %q and is being force killed", pod.Name, nodeName))
if err := kubeClient.Core().Pods(pod.Namespace).Delete(pod.Name, api.NewDeleteOptions(0)); err != nil {
glog.Errorf("Error completing deletion of pod %s: %v", pod.Name, err)
complete = false

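The change in this file threads a nodeUID parameter through deletePods, terminatePods, and recordNodeEvent so that the event's ObjectReference carries the node's real UID instead of reusing its name. A minimal standalone sketch of that before/after difference, using simplified stand-in types rather than the real api.ObjectReference:

```go
package main

import "fmt"

// ObjectReference is a stand-in for the handful of fields that matter here.
type ObjectReference struct {
	Kind string
	Name string
	UID  string
}

// refBefore mirrors the old behaviour: the UID field was filled with the node name.
func refBefore(nodeName string) ObjectReference {
	return ObjectReference{Kind: "Node", Name: nodeName, UID: nodeName}
}

// refAfter mirrors the fix: callers pass the node's actual UID and it is used verbatim.
func refAfter(nodeName, nodeUID string) ObjectReference {
	return ObjectReference{Kind: "Node", Name: nodeName, UID: nodeUID}
}

func main() {
	fmt.Printf("before: %+v\n", refBefore("node0"))              // UID ends up as "node0"
	fmt.Printf("after:  %+v\n", refAfter("node0", "1234567890")) // UID is the node's real UID
}
```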
View File

@ -386,14 +386,15 @@ func (nc *NodeController) Run(period time.Duration) {
defer nc.evictorLock.Unlock()
for k := range nc.zonePodEvictor {
nc.zonePodEvictor[k].Try(func(value TimedValue) (bool, time.Duration) {
remaining, err := deletePods(nc.kubeClient, nc.recorder, value.Value, nc.daemonSetStore)
nodeUid, _ := value.UID.(string)
remaining, err := deletePods(nc.kubeClient, nc.recorder, value.Value, nodeUid, nc.daemonSetStore)
if err != nil {
utilruntime.HandleError(fmt.Errorf("unable to evict node %q: %v", value.Value, err))
return false, 0
}
if remaining {
nc.zoneTerminationEvictor[k].Add(value.Value)
nc.zoneTerminationEvictor[k].Add(value.Value, value.UID)
}
return true, 0
})
@ -407,7 +408,8 @@ func (nc *NodeController) Run(period time.Duration) {
defer nc.evictorLock.Unlock()
for k := range nc.zoneTerminationEvictor {
nc.zoneTerminationEvictor[k].Try(func(value TimedValue) (bool, time.Duration) {
completed, remaining, err := terminatePods(nc.kubeClient, nc.recorder, value.Value, value.AddedAt, nc.maximumGracePeriod)
nodeUid, _ := value.UID.(string)
completed, remaining, err := terminatePods(nc.kubeClient, nc.recorder, value.Value, nodeUid, value.AddedAt, nc.maximumGracePeriod)
if err != nil {
utilruntime.HandleError(fmt.Errorf("unable to terminate pods on node %q: %v", value.Value, err))
return false, 0
@ -415,7 +417,7 @@ func (nc *NodeController) Run(period time.Duration) {
if completed {
glog.V(2).Infof("All pods terminated on %s", value.Value)
recordNodeEvent(nc.recorder, value.Value, api.EventTypeNormal, "TerminatedAllPods", fmt.Sprintf("Terminated all Pods on Node %s.", value.Value))
recordNodeEvent(nc.recorder, value.Value, nodeUid, api.EventTypeNormal, "TerminatedAllPods", fmt.Sprintf("Terminated all Pods on Node %s.", value.Value))
return true, 0
}
@ -450,7 +452,7 @@ func (nc *NodeController) monitorNodeStatus() error {
added, deleted := nc.checkForNodeAddedDeleted(nodes)
for i := range added {
glog.V(1).Infof("NodeController observed a new Node: %#v", added[i].Name)
recordNodeEvent(nc.recorder, added[i].Name, api.EventTypeNormal, "RegisteredNode", fmt.Sprintf("Registered Node %v in NodeController", added[i].Name))
recordNodeEvent(nc.recorder, added[i].Name, string(added[i].UID), api.EventTypeNormal, "RegisteredNode", fmt.Sprintf("Registered Node %v in NodeController", added[i].Name))
nc.knownNodeSet[added[i].Name] = added[i]
// When adding new Nodes we need to check if new zone appeared, and if so add new evictor.
zone := utilnode.GetZoneKey(added[i])
@ -468,7 +470,7 @@ func (nc *NodeController) monitorNodeStatus() error {
for i := range deleted {
glog.V(1).Infof("NodeController observed a Node deletion: %v", deleted[i].Name)
recordNodeEvent(nc.recorder, deleted[i].Name, api.EventTypeNormal, "RemovingNode", fmt.Sprintf("Removing Node %v from NodeController", deleted[i].Name))
recordNodeEvent(nc.recorder, deleted[i].Name, string(deleted[i].UID), api.EventTypeNormal, "RemovingNode", fmt.Sprintf("Removing Node %v from NodeController", deleted[i].Name))
nc.evictPods(deleted[i])
delete(nc.knownNodeSet, deleted[i].Name)
}
@ -540,7 +542,7 @@ func (nc *NodeController) monitorNodeStatus() error {
}
if !exists {
glog.V(2).Infof("Deleting node (no longer present in cloud provider): %s", node.Name)
recordNodeEvent(nc.recorder, node.Name, api.EventTypeNormal, "DeletingNode", fmt.Sprintf("Deleting Node %v because it's not present according to cloud provider", node.Name))
recordNodeEvent(nc.recorder, node.Name, string(node.UID), api.EventTypeNormal, "DeletingNode", fmt.Sprintf("Deleting Node %v because it's not present according to cloud provider", node.Name))
go func(nodeName string) {
defer utilruntime.HandleCrash()
// Kubelet is not reporting and Cloud Provider says node
@ -867,5 +869,5 @@ func (nc *NodeController) cancelPodEviction(node *api.Node) bool {
func (nc *NodeController) evictPods(node *api.Node) bool {
nc.evictorLock.Lock()
defer nc.evictorLock.Unlock()
return nc.zonePodEvictor[utilnode.GetZoneKey(node)].Add(node.Name)
return nc.zonePodEvictor[utilnode.GetZoneKey(node)].Add(node.Name, string(node.UID))
}

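In the controller's Run loop the UID now rides along in the evictor queue entry and is recovered with a type assertion before being handed to deletePods and terminatePods. A small self-contained sketch of that assertion pattern (simplified TimedValue, not the real queue):

```go
package main

import "fmt"

// TimedValue is reduced to the two fields the eviction callbacks look at.
type TimedValue struct {
	Value string      // node name
	UID   interface{} // opaque identifier stored next to the name
}

// evict mimics the callback body: a failed type assertion leaves nodeUID empty,
// so entries added without a UID still work, they just record events with an empty UID.
func evict(v TimedValue) {
	nodeUID, _ := v.UID.(string)
	fmt.Printf("deleting pods on node %q (uid %q)\n", v.Value, nodeUID)
}

func main() {
	evict(TimedValue{Value: "node0", UID: "1234567890"})
	evict(TimedValue{Value: "node1"}) // no UID recorded
}
```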
View File

@ -481,14 +481,16 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
zones := getZones(item.fakeNodeHandler)
for _, zone := range zones {
nodeController.zonePodEvictor[zone].Try(func(value TimedValue) (bool, time.Duration) {
remaining, _ := deletePods(item.fakeNodeHandler, nodeController.recorder, value.Value, nodeController.daemonSetStore)
nodeUid, _ := value.UID.(string)
remaining, _ := deletePods(item.fakeNodeHandler, nodeController.recorder, value.Value, nodeUid, nodeController.daemonSetStore)
if remaining {
nodeController.zoneTerminationEvictor[zone].Add(value.Value)
nodeController.zoneTerminationEvictor[zone].Add(value.Value, nodeUid)
}
return true, 0
})
nodeController.zonePodEvictor[zone].Try(func(value TimedValue) (bool, time.Duration) {
terminatePods(item.fakeNodeHandler, nodeController.recorder, value.Value, value.AddedAt, nodeController.maximumGracePeriod)
nodeUid, _ := value.UID.(string)
terminatePods(item.fakeNodeHandler, nodeController.recorder, value.Value, nodeUid, value.AddedAt, nodeController.maximumGracePeriod)
return true, 0
})
}
@ -1014,14 +1016,16 @@ func TestMonitorNodeStatusEvictPodsWithDisruption(t *testing.T) {
zones := getZones(fakeNodeHandler)
for _, zone := range zones {
nodeController.zonePodEvictor[zone].Try(func(value TimedValue) (bool, time.Duration) {
remaining, _ := deletePods(fakeNodeHandler, nodeController.recorder, value.Value, nodeController.daemonSetStore)
uid, _ := value.UID.(string)
remaining, _ := deletePods(fakeNodeHandler, nodeController.recorder, value.Value, uid, nodeController.daemonSetStore)
if remaining {
nodeController.zoneTerminationEvictor[zone].Add(value.Value)
nodeController.zoneTerminationEvictor[zone].Add(value.Value, value.UID)
}
return true, 0
})
nodeController.zonePodEvictor[zone].Try(func(value TimedValue) (bool, time.Duration) {
terminatePods(fakeNodeHandler, nodeController.recorder, value.Value, value.AddedAt, nodeController.maximumGracePeriod)
uid, _ := value.UID.(string)
terminatePods(fakeNodeHandler, nodeController.recorder, value.Value, uid, value.AddedAt, nodeController.maximumGracePeriod)
return true, 0
})
}
@ -1544,7 +1548,8 @@ func TestNodeDeletion(t *testing.T) {
t.Errorf("unexpected error: %v", err)
}
nodeController.zonePodEvictor[""].Try(func(value TimedValue) (bool, time.Duration) {
deletePods(fakeNodeHandler, nodeController.recorder, value.Value, nodeController.daemonSetStore)
uid, _ := value.UID.(string)
deletePods(fakeNodeHandler, nodeController.recorder, value.Value, uid, nodeController.daemonSetStore)
return true, 0
})
podEvicted := false
@ -1558,6 +1563,72 @@ func TestNodeDeletion(t *testing.T) {
}
}
func TestNodeEventGeneration(t *testing.T) {
fakeNow := unversioned.Date(2016, 8, 10, 12, 0, 0, 0, time.UTC)
fakeNodeHandler := &FakeNodeHandler{
Existing: []*api.Node{
{
ObjectMeta: api.ObjectMeta{
Name: "node0",
UID: "1234567890",
CreationTimestamp: unversioned.Date(2016, 8, 10, 0, 0, 0, 0, time.UTC),
},
Spec: api.NodeSpec{
ExternalID: "node0",
},
Status: api.NodeStatus{
Conditions: []api.NodeCondition{
{
Type: api.NodeReady,
Status: api.ConditionTrue,
// Node status has just been updated.
LastHeartbeatTime: fakeNow,
LastTransitionTime: fakeNow,
},
},
Capacity: api.ResourceList{
api.ResourceName(api.ResourceRequestsCPU): resource.MustParse("10"),
api.ResourceName(api.ResourceMemory): resource.MustParse("20G"),
},
},
},
},
Clientset: fake.NewSimpleClientset(&api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}}),
}
nodeController, _ := NewNodeControllerFromClient(nil, fakeNodeHandler, 5*time.Minute, testRateLimiterQPS,
testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false)
nodeController.now = func() unversioned.Time { return fakeNow }
fakeRecorder := NewFakeRecorder()
nodeController.recorder = fakeRecorder
if err := nodeController.monitorNodeStatus(); err != nil {
t.Errorf("unexpected error: %v", err)
}
fakeNodeHandler.Delete("node0", nil)
if err := nodeController.monitorNodeStatus(); err != nil {
t.Errorf("unexpected error: %v", err)
}
nodeController.zonePodEvictor[""].Try(func(value TimedValue) (bool, time.Duration) {
nodeUid, _ := value.UID.(string)
deletePods(fakeNodeHandler, nodeController.recorder, value.Value, nodeUid, nodeController.daemonSetStore)
return true, 0
})
if len(fakeRecorder.events) != 3 {
t.Fatalf("unexpected events: %v", fakeRecorder.events)
}
if fakeRecorder.events[0].Reason != "RegisteredNode" || fakeRecorder.events[1].Reason != "RemovingNode" || fakeRecorder.events[2].Reason != "DeletingAllPods" {
t.Fatalf("unexpected events generation: %v", fakeRecorder.events)
}
for _, event := range fakeRecorder.events {
involvedObject := event.InvolvedObject
actualUID := string(involvedObject.UID)
if actualUID != "1234567890" {
t.Fatalf("unexpected event uid: %v", actualUID)
}
}
}
func TestCheckPod(t *testing.T) {
tcs := []struct {
pod api.Pod

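The new TestNodeEventGeneration drives the controller against a node with a known UID, deletes the node, drains the pod evictor, and then checks both the event reasons and that every event's InvolvedObject.UID is the node's UID. A stripped-down sketch of that final assertion, with a hypothetical capturedEvent type standing in for what the fake recorder stores:

```go
package node_test

import "testing"

// capturedEvent is a stand-in for the fields inspected per recorded event.
type capturedEvent struct {
	Reason      string
	InvolvedUID string
}

// assertEventUIDs fails the test if any captured event does not reference the expected UID.
func assertEventUIDs(t *testing.T, events []capturedEvent, wantUID string) {
	t.Helper()
	for _, e := range events {
		if e.InvolvedUID != wantUID {
			t.Fatalf("event %q has uid %q, want %q", e.Reason, e.InvolvedUID, wantUID)
		}
	}
}

func TestEventUIDSketch(t *testing.T) {
	events := []capturedEvent{
		{Reason: "RegisteredNode", InvolvedUID: "1234567890"},
		{Reason: "RemovingNode", InvolvedUID: "1234567890"},
		{Reason: "DeletingAllPods", InvolvedUID: "1234567890"},
	}
	assertEventUIDs(t, events, "1234567890")
}
```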
View File

@ -29,7 +29,9 @@ import (
// TimedValue is a value that should be processed at a designated time.
type TimedValue struct {
Value string
Value string
// UID could be anything that helps identify the value
UID interface{}
AddedAt time.Time
ProcessAt time.Time
}
@ -200,12 +202,13 @@ func (q *RateLimitedTimedQueue) Try(fn ActionFunc) {
}
}
// Adds value to the queue to be processed. Won't add the same value a second time if it was already
// added and not removed.
func (q *RateLimitedTimedQueue) Add(value string) bool {
// Adds value to the queue to be processed. Won't add the same value (comparison by value) a second time
// if it was already added and not removed.
func (q *RateLimitedTimedQueue) Add(value string, uid interface{}) bool {
now := now()
return q.queue.Add(TimedValue{
Value: value,
UID: uid,
AddedAt: now,
ProcessAt: now,
})

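The queue itself only gains a pass-through UID field: entries are still de-duplicated by Value, so adding the same node name twice keeps the first entry and its UID. A standalone sketch of that Add semantics, with a toy queue in place of the real RateLimitedTimedQueue:

```go
package main

import (
	"fmt"
	"time"
)

// TimedValue mirrors the updated struct: the UID is carried but never inspected by the queue.
type TimedValue struct {
	Value     string
	UID       interface{}
	AddedAt   time.Time
	ProcessAt time.Time
}

// uniqueQueue keeps at most one entry per Value, like the underlying unique queue.
type uniqueQueue struct {
	set   map[string]bool
	queue []TimedValue
}

// Add returns false (and keeps the existing entry) when the value is already queued.
func (q *uniqueQueue) Add(value string, uid interface{}) bool {
	if q.set[value] {
		return false
	}
	now := time.Now()
	q.set[value] = true
	q.queue = append(q.queue, TimedValue{Value: value, UID: uid, AddedAt: now, ProcessAt: now})
	return true
}

func main() {
	q := &uniqueQueue{set: map[string]bool{}}
	fmt.Println(q.Add("node0", "1111")) // true: first time this node name is queued
	fmt.Println(q.Add("node0", "2222")) // false: same Value, the original UID is kept
}
```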
View File

@ -40,9 +40,9 @@ func CheckSetEq(lhs, rhs sets.String) bool {
func TestAddNode(t *testing.T) {
evictor := NewRateLimitedTimedQueue(flowcontrol.NewFakeAlwaysRateLimiter())
evictor.Add("first")
evictor.Add("second")
evictor.Add("third")
evictor.Add("first", "11111")
evictor.Add("second", "22222")
evictor.Add("third", "33333")
queuePattern := []string{"first", "second", "third"}
if len(evictor.queue.queue) != len(queuePattern) {
@ -70,9 +70,9 @@ func TestDelNode(t *testing.T) {
return t
}
evictor := NewRateLimitedTimedQueue(flowcontrol.NewFakeAlwaysRateLimiter())
evictor.Add("first")
evictor.Add("second")
evictor.Add("third")
evictor.Add("first", "11111")
evictor.Add("second", "22222")
evictor.Add("third", "33333")
evictor.Remove("first")
queuePattern := []string{"second", "third"}
@ -92,9 +92,9 @@ func TestDelNode(t *testing.T) {
}
evictor = NewRateLimitedTimedQueue(flowcontrol.NewFakeAlwaysRateLimiter())
evictor.Add("first")
evictor.Add("second")
evictor.Add("third")
evictor.Add("first", "11111")
evictor.Add("second", "22222")
evictor.Add("third", "33333")
evictor.Remove("second")
queuePattern = []string{"first", "third"}
@ -114,9 +114,9 @@ func TestDelNode(t *testing.T) {
}
evictor = NewRateLimitedTimedQueue(flowcontrol.NewFakeAlwaysRateLimiter())
evictor.Add("first")
evictor.Add("second")
evictor.Add("third")
evictor.Add("first", "11111")
evictor.Add("second", "22222")
evictor.Add("third", "33333")
evictor.Remove("third")
queuePattern = []string{"first", "second"}
@ -138,9 +138,9 @@ func TestDelNode(t *testing.T) {
func TestTry(t *testing.T) {
evictor := NewRateLimitedTimedQueue(flowcontrol.NewFakeAlwaysRateLimiter())
evictor.Add("first")
evictor.Add("second")
evictor.Add("third")
evictor.Add("first", "11111")
evictor.Add("second", "22222")
evictor.Add("third", "33333")
evictor.Remove("second")
deletedMap := sets.NewString()
@ -173,9 +173,9 @@ func TestTryOrdering(t *testing.T) {
return current
}
evictor := NewRateLimitedTimedQueue(flowcontrol.NewFakeAlwaysRateLimiter())
evictor.Add("first")
evictor.Add("second")
evictor.Add("third")
evictor.Add("first", "11111")
evictor.Add("second", "22222")
evictor.Add("third", "33333")
order := []string{}
count := 0
@ -225,9 +225,9 @@ func TestTryOrdering(t *testing.T) {
func TestTryRemovingWhileTry(t *testing.T) {
evictor := NewRateLimitedTimedQueue(flowcontrol.NewFakeAlwaysRateLimiter())
evictor.Add("first")
evictor.Add("second")
evictor.Add("third")
evictor.Add("first", "11111")
evictor.Add("second", "22222")
evictor.Add("third", "33333")
processing := make(chan struct{})
wait := make(chan struct{})
@ -271,9 +271,9 @@ func TestTryRemovingWhileTry(t *testing.T) {
func TestClear(t *testing.T) {
evictor := NewRateLimitedTimedQueue(flowcontrol.NewFakeAlwaysRateLimiter())
evictor.Add("first")
evictor.Add("second")
evictor.Add("third")
evictor.Add("first", "11111")
evictor.Add("second", "22222")
evictor.Add("third", "33333")
evictor.Clear()

View File

@ -18,13 +18,18 @@ package node
import (
"errors"
"fmt"
"sync"
"time"
"k8s.io/kubernetes/pkg/api"
apierrors "k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/resource"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
unversionedcore "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/clock"
utilnode "k8s.io/kubernetes/pkg/util/node"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/watch"
@ -192,6 +197,67 @@ func (m *FakeNodeHandler) Patch(name string, pt api.PatchType, data []byte, subr
return nil, nil
}
// FakeRecorder is used as a fake during testing.
type FakeRecorder struct {
source api.EventSource
events []*api.Event
clock clock.Clock
}
func (f *FakeRecorder) Event(obj runtime.Object, eventtype, reason, message string) {
f.generateEvent(obj, unversioned.Now(), eventtype, reason, message)
}
func (f *FakeRecorder) Eventf(obj runtime.Object, eventtype, reason, messageFmt string, args ...interface{}) {
f.Event(obj, eventtype, reason, fmt.Sprintf(messageFmt, args...))
}
func (f *FakeRecorder) PastEventf(obj runtime.Object, timestamp unversioned.Time, eventtype, reason, messageFmt string, args ...interface{}) {
}
func (f *FakeRecorder) generateEvent(obj runtime.Object, timestamp unversioned.Time, eventtype, reason, message string) {
ref, err := api.GetReference(obj)
if err != nil {
return
}
event := f.makeEvent(ref, eventtype, reason, message)
event.Source = f.source
if f.events != nil {
fmt.Println("write event")
f.events = append(f.events, event)
}
}
func (f *FakeRecorder) makeEvent(ref *api.ObjectReference, eventtype, reason, message string) *api.Event {
fmt.Println("make event")
t := unversioned.Time{Time: f.clock.Now()}
namespace := ref.Namespace
if namespace == "" {
namespace = api.NamespaceDefault
}
return &api.Event{
ObjectMeta: api.ObjectMeta{
Name: fmt.Sprintf("%v.%x", ref.Name, t.UnixNano()),
Namespace: namespace,
},
InvolvedObject: *ref,
Reason: reason,
Message: message,
FirstTimestamp: t,
LastTimestamp: t,
Count: 1,
Type: eventtype,
}
}
func NewFakeRecorder() *FakeRecorder {
return &FakeRecorder{
source: api.EventSource{Component: "nodeControllerTest"},
events: []*api.Event{},
clock: clock.NewFakeClock(time.Now()),
}
}
func newNode(name string) *api.Node {
return &api.Node{
ObjectMeta: api.ObjectMeta{Name: name},