From 14cad206f5b4a56276ba7f35ce2710266abb8c0f Mon Sep 17 00:00:00 2001 From: Jing Xu Date: Thu, 15 Sep 2016 11:01:11 -0700 Subject: [PATCH] Fix race conditino in setting node statusUpdateNeeded flag This PR fixes the race condition in setting node statusUpdateNeeded flag in master's attachdetach controller. This flag is used to indicate whether a node status has been updated by the node_status_updater or not. When updater finishes update a node status, it is set to false. When the node status is changed such as volume is detached or new volume is attached to the node, the flag is set to true so that updater can update the status again. The previous workflow has a race condition as follows 1. updater gets the currently attached volume list from the node which needs to be updated. 2. A new volume A is attached to the same node right after 1 and set the flag to TRUE 3. updater updates the node attached volume list (which does not include volume A) and then set the flag to FALSE. The result is that volume A will be never added to the attached volume list so at node side, this volume is never attached. So in this PR, the flag is set to FALSE when updater tries to get the attached volume list (as in an atomic operation). So in the above example, after step 2, the flag will be TRUE again, in step 3, updater does not set the flag if updates is sucessful. So after that, flag is still TRUE and in next round of update, the node status will be updated. This PR also changes a unit test due to the workflow changes --- .../cache/actual_state_of_world.go | 43 ++++++++++++------- .../cache/actual_state_of_world_test.go | 19 ++++---- .../statusupdater/node_status_updater.go | 11 ++--- 3 files changed, 40 insertions(+), 33 deletions(-) diff --git a/pkg/controller/volume/attachdetach/cache/actual_state_of_world.go b/pkg/controller/volume/attachdetach/cache/actual_state_of_world.go index 55bff28d2d..9b965bbeae 100644 --- a/pkg/controller/volume/attachdetach/cache/actual_state_of_world.go +++ b/pkg/controller/volume/attachdetach/cache/actual_state_of_world.go @@ -66,12 +66,12 @@ type ActualStateOfWorld interface { // the specified volume, an error is returned. SetVolumeMountedByNode(volumeName api.UniqueVolumeName, nodeName string, mounted bool) error - // ResetNodeStatusUpdateNeeded resets statusUpdateNeeded for the specified - // node to false indicating the AttachedVolume field of the Node's Status - // object has been updated. - // If no node with the name nodeName exists in list of attached nodes for - // the specified volume, an error is returned. - ResetNodeStatusUpdateNeeded(nodeName string) error + // SetNodeStatusUpdateNeeded sets statusUpdateNeeded for the specified + // node to true indicating the AttachedVolume field in the Node's Status + // object needs to be updated by the node updater again. + // If the specifed node does not exist in the nodesToUpdateStatusFor list, + // log the error and return + SetNodeStatusUpdateNeeded(nodeName string) // ResetDetachRequestTime resets the detachRequestTime to 0 which indicates there is no detach // request any more for the volume @@ -433,21 +433,28 @@ func (asw *actualStateOfWorld) addVolumeToReportAsAttached( } } -func (asw *actualStateOfWorld) ResetNodeStatusUpdateNeeded( - nodeName string) error { - asw.Lock() - defer asw.Unlock() - // Remove volume from volumes to report as attached +// Update the flag statusUpdateNeeded to indicate whether node status is already updated or +// needs to be updated again by the node status updater. +// If the specifed node does not exist in the nodesToUpdateStatusFor list, log the error and return +// This is an internal function and caller should acquire and release the lock +func (asw *actualStateOfWorld) updateNodeStatusUpdateNeeded(nodeName string, needed bool) { nodeToUpdate, nodeToUpdateExists := asw.nodesToUpdateStatusFor[nodeName] if !nodeToUpdateExists { - return fmt.Errorf( - "failed to ResetNodeStatusUpdateNeeded(nodeName=%q) nodeName does not exist", + // should not happen + glog.Errorf( + "Failed to set statusUpdateNeeded to needed %t because nodeName=%q does not exist", + needed, nodeName) } - nodeToUpdate.statusUpdateNeeded = false + nodeToUpdate.statusUpdateNeeded = needed asw.nodesToUpdateStatusFor[nodeName] = nodeToUpdate - return nil +} + +func (asw *actualStateOfWorld) SetNodeStatusUpdateNeeded(nodeName string) { + asw.Lock() + defer asw.Unlock() + asw.updateNodeStatusUpdateNeeded(nodeName, true) } func (asw *actualStateOfWorld) DeleteVolumeNode( @@ -529,7 +536,7 @@ func (asw *actualStateOfWorld) GetVolumesToReportAttached() map[string][]api.Att defer asw.RUnlock() volumesToReportAttached := make(map[string][]api.AttachedVolume) - for _, nodeToUpdateObj := range asw.nodesToUpdateStatusFor { + for nodeName, nodeToUpdateObj := range asw.nodesToUpdateStatusFor { if nodeToUpdateObj.statusUpdateNeeded { attachedVolumes := make( []api.AttachedVolume, @@ -544,6 +551,10 @@ func (asw *actualStateOfWorld) GetVolumesToReportAttached() map[string][]api.Att } volumesToReportAttached[nodeToUpdateObj.nodeName] = attachedVolumes } + // When GetVolumesToReportAttached is called by node status updater, the current status + // of this node will be updated, so set the flag statusUpdateNeeded to false indicating + // the current status is already updated. + asw.updateNodeStatusUpdateNeeded(nodeName, false) } return volumesToReportAttached diff --git a/pkg/controller/volume/attachdetach/cache/actual_state_of_world_test.go b/pkg/controller/volume/attachdetach/cache/actual_state_of_world_test.go index 8379fab83c..aca4693754 100644 --- a/pkg/controller/volume/attachdetach/cache/actual_state_of_world_test.go +++ b/pkg/controller/volume/attachdetach/cache/actual_state_of_world_test.go @@ -851,7 +851,7 @@ func Test_RemoveVolumeFromReportAsAttached_AddVolumeToReportAsAttached_Positive( reportAsAttachedVolumesMap := asw.GetVolumesToReportAttached() volumes, exists := reportAsAttachedVolumesMap[nodeName] if !exists { - t.Fatalf("MarkDesireToDetach_UnmarkDesireToDetach failed. Expected: Actual: Actual: 0 { t.Fatalf("len(reportAsAttachedVolumes) Expected: <0> Actual: <%v>", len(volumes)) @@ -861,7 +861,7 @@ func Test_RemoveVolumeFromReportAsAttached_AddVolumeToReportAsAttached_Positive( reportAsAttachedVolumesMap = asw.GetVolumesToReportAttached() volumes, exists = reportAsAttachedVolumesMap[nodeName] if !exists { - t.Fatalf("MarkDesireToDetach_UnmarkDesireToDetach failed. Expected: Actual: Actual: Actual: <%v>", len(volumes)) @@ -871,9 +871,9 @@ func Test_RemoveVolumeFromReportAsAttached_AddVolumeToReportAsAttached_Positive( // Populates data struct with one volume/node entry. // Calls RemoveVolumeFromReportAsAttached // Calls DeleteVolumeNode -// Calls AddVolumeToReportAsAttached +// Calls AddVolumeNode // Verifyies there is no volume as reported as attached -func Test_RemoveVolumeFromReportAsAttached_AddVolumeToReportAsAttached_Negative(t *testing.T) { +func Test_RemoveVolumeFromReportAsAttached_Delete_AddVolumeNode(t *testing.T) { // Arrange volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t) asw := NewActualStateOfWorld(volumePluginMgr) @@ -894,22 +894,23 @@ func Test_RemoveVolumeFromReportAsAttached_AddVolumeToReportAsAttached_Negative( reportAsAttachedVolumesMap := asw.GetVolumesToReportAttached() volumes, exists := reportAsAttachedVolumesMap[nodeName] if !exists { - t.Fatalf("MarkDesireToDetach_UnmarkDesireToDetach failed. Expected: Actual: Actual: 0 { t.Fatalf("len(reportAsAttachedVolumes) Expected: <0> Actual: <%v>", len(volumes)) } asw.DeleteVolumeNode(generatedVolumeName, nodeName) - asw.AddVolumeToReportAsAttached(generatedVolumeName, nodeName) + + asw.AddVolumeNode(volumeSpec, nodeName, "" /*device path*/) reportAsAttachedVolumesMap = asw.GetVolumesToReportAttached() volumes, exists = reportAsAttachedVolumesMap[nodeName] if !exists { - t.Fatalf("MarkDesireToDetach_UnmarkDesireToDetach failed. Expected: Actual: Actual: 0 { - t.Fatalf("len(reportAsAttachedVolumes) Expected: <0> Actual: <%v>", len(volumes)) + if len(volumes) != 1 { + t.Fatalf("len(reportAsAttachedVolumes) Expected: <1> Actual: <%v>", len(volumes)) } } diff --git a/pkg/controller/volume/attachdetach/statusupdater/node_status_updater.go b/pkg/controller/volume/attachdetach/statusupdater/node_status_updater.go index 0e1de23a68..000fd20fcc 100644 --- a/pkg/controller/volume/attachdetach/statusupdater/node_status_updater.go +++ b/pkg/controller/volume/attachdetach/statusupdater/node_status_updater.go @@ -107,20 +107,15 @@ func (nsu *nodeStatusUpdater) UpdateNodeStatuses() error { _, err = nsu.kubeClient.Core().Nodes().PatchStatus(nodeName, patchBytes) if err != nil { + // If update node status fails, reset flag statusUpdateNeeded back to true + // to indicate this node status needs to be udpated again + nsu.actualStateOfWorld.SetNodeStatusUpdateNeeded(nodeName) return fmt.Errorf( "failed to kubeClient.Core().Nodes().Patch for node %q. %v", nodeName, err) } - err = nsu.actualStateOfWorld.ResetNodeStatusUpdateNeeded(nodeName) - if err != nil { - return fmt.Errorf( - "failed to ResetNodeStatusUpdateNeeded for node %q. %v", - nodeName, - err) - } - glog.V(3).Infof( "Updating status for node %q succeeded. patchBytes: %q", nodeName,