Merge pull request #32807 from jingxu97/stateupdateNeeded-9-15

Automatic merge from submit-queue

Fix race condition in setting node statusUpdateNeeded flag

This PR fixes the race condition in setting node statusUpdateNeeded flag
in master's attachdetach controller. This flag is used to indicate
whether a node status has been updated by the node_status_updater or
not. When updater finishes update a node status, it is set to false.
When the node status is changed such as volume is detached or new volume
is attached to the node, the flag is set to true so that updater can
update the status again. The previous workflow has a race condition as
follows
1. updater gets the currently attached volume list from the node which needs to be
updated.
2. A new volume A is attached to the same node right after 1 and set the
flag to TRUE
3. updater updates the node attached volume list (which does not include volume A) and then set the flag to FALSE.
The result is that volume A will be never added to the attached volume
list so at node side, this volume is never attached.

So in this PR, the flag is set to FALSE when updater tries to get the
attached volume list (as in an atomic operation). So in the above
example, after step 2, the flag will be TRUE again, in step 3, updater
does not set the flag if updates is sucessful. So after that, flag is
still TRUE and in next round of update, the node status will be updated.
pull/6/head
Kubernetes Submit Queue 2016-09-23 11:25:16 -07:00 committed by GitHub
commit 0a4316f11e
3 changed files with 40 additions and 33 deletions

View File

@ -66,12 +66,12 @@ type ActualStateOfWorld interface {
// the specified volume, an error is returned. // the specified volume, an error is returned.
SetVolumeMountedByNode(volumeName api.UniqueVolumeName, nodeName string, mounted bool) error SetVolumeMountedByNode(volumeName api.UniqueVolumeName, nodeName string, mounted bool) error
// ResetNodeStatusUpdateNeeded resets statusUpdateNeeded for the specified // SetNodeStatusUpdateNeeded sets statusUpdateNeeded for the specified
// node to false indicating the AttachedVolume field of the Node's Status // node to true indicating the AttachedVolume field in the Node's Status
// object has been updated. // object needs to be updated by the node updater again.
// If no node with the name nodeName exists in list of attached nodes for // If the specifed node does not exist in the nodesToUpdateStatusFor list,
// the specified volume, an error is returned. // log the error and return
ResetNodeStatusUpdateNeeded(nodeName string) error SetNodeStatusUpdateNeeded(nodeName string)
// ResetDetachRequestTime resets the detachRequestTime to 0 which indicates there is no detach // ResetDetachRequestTime resets the detachRequestTime to 0 which indicates there is no detach
// request any more for the volume // request any more for the volume
@ -433,21 +433,28 @@ func (asw *actualStateOfWorld) addVolumeToReportAsAttached(
} }
} }
func (asw *actualStateOfWorld) ResetNodeStatusUpdateNeeded( // Update the flag statusUpdateNeeded to indicate whether node status is already updated or
nodeName string) error { // needs to be updated again by the node status updater.
asw.Lock() // If the specifed node does not exist in the nodesToUpdateStatusFor list, log the error and return
defer asw.Unlock() // This is an internal function and caller should acquire and release the lock
// Remove volume from volumes to report as attached func (asw *actualStateOfWorld) updateNodeStatusUpdateNeeded(nodeName string, needed bool) {
nodeToUpdate, nodeToUpdateExists := asw.nodesToUpdateStatusFor[nodeName] nodeToUpdate, nodeToUpdateExists := asw.nodesToUpdateStatusFor[nodeName]
if !nodeToUpdateExists { if !nodeToUpdateExists {
return fmt.Errorf( // should not happen
"failed to ResetNodeStatusUpdateNeeded(nodeName=%q) nodeName does not exist", glog.Errorf(
"Failed to set statusUpdateNeeded to needed %t because nodeName=%q does not exist",
needed,
nodeName) nodeName)
} }
nodeToUpdate.statusUpdateNeeded = false nodeToUpdate.statusUpdateNeeded = needed
asw.nodesToUpdateStatusFor[nodeName] = nodeToUpdate asw.nodesToUpdateStatusFor[nodeName] = nodeToUpdate
return nil }
func (asw *actualStateOfWorld) SetNodeStatusUpdateNeeded(nodeName string) {
asw.Lock()
defer asw.Unlock()
asw.updateNodeStatusUpdateNeeded(nodeName, true)
} }
func (asw *actualStateOfWorld) DeleteVolumeNode( func (asw *actualStateOfWorld) DeleteVolumeNode(
@ -529,7 +536,7 @@ func (asw *actualStateOfWorld) GetVolumesToReportAttached() map[string][]api.Att
defer asw.RUnlock() defer asw.RUnlock()
volumesToReportAttached := make(map[string][]api.AttachedVolume) volumesToReportAttached := make(map[string][]api.AttachedVolume)
for _, nodeToUpdateObj := range asw.nodesToUpdateStatusFor { for nodeName, nodeToUpdateObj := range asw.nodesToUpdateStatusFor {
if nodeToUpdateObj.statusUpdateNeeded { if nodeToUpdateObj.statusUpdateNeeded {
attachedVolumes := make( attachedVolumes := make(
[]api.AttachedVolume, []api.AttachedVolume,
@ -544,6 +551,10 @@ func (asw *actualStateOfWorld) GetVolumesToReportAttached() map[string][]api.Att
} }
volumesToReportAttached[nodeToUpdateObj.nodeName] = attachedVolumes volumesToReportAttached[nodeToUpdateObj.nodeName] = attachedVolumes
} }
// When GetVolumesToReportAttached is called by node status updater, the current status
// of this node will be updated, so set the flag statusUpdateNeeded to false indicating
// the current status is already updated.
asw.updateNodeStatusUpdateNeeded(nodeName, false)
} }
return volumesToReportAttached return volumesToReportAttached

View File

@ -851,7 +851,7 @@ func Test_RemoveVolumeFromReportAsAttached_AddVolumeToReportAsAttached_Positive(
reportAsAttachedVolumesMap := asw.GetVolumesToReportAttached() reportAsAttachedVolumesMap := asw.GetVolumesToReportAttached()
volumes, exists := reportAsAttachedVolumesMap[nodeName] volumes, exists := reportAsAttachedVolumesMap[nodeName]
if !exists { if !exists {
t.Fatalf("MarkDesireToDetach_UnmarkDesireToDetach failed. Expected: <node %q exist> Actual: <node does not exist in the reportedAsAttached map", nodeName) t.Fatalf("Test_RemoveVolumeFromReportAsAttached_AddVolumeToReportAsAttached_Positive failed. Expected: <node %q exist> Actual: <node does not exist in the reportedAsAttached map", nodeName)
} }
if len(volumes) > 0 { if len(volumes) > 0 {
t.Fatalf("len(reportAsAttachedVolumes) Expected: <0> Actual: <%v>", len(volumes)) t.Fatalf("len(reportAsAttachedVolumes) Expected: <0> Actual: <%v>", len(volumes))
@ -861,7 +861,7 @@ func Test_RemoveVolumeFromReportAsAttached_AddVolumeToReportAsAttached_Positive(
reportAsAttachedVolumesMap = asw.GetVolumesToReportAttached() reportAsAttachedVolumesMap = asw.GetVolumesToReportAttached()
volumes, exists = reportAsAttachedVolumesMap[nodeName] volumes, exists = reportAsAttachedVolumesMap[nodeName]
if !exists { if !exists {
t.Fatalf("MarkDesireToDetach_UnmarkDesireToDetach failed. Expected: <node %q exist> Actual: <node does not exist in the reportedAsAttached map", nodeName) t.Fatalf("Test_RemoveVolumeFromReportAsAttached_AddVolumeToReportAsAttached_Positive failed. Expected: <node %q exist> Actual: <node does not exist in the reportedAsAttached map", nodeName)
} }
if len(volumes) != 1 { if len(volumes) != 1 {
t.Fatalf("len(reportAsAttachedVolumes) Expected: <1> Actual: <%v>", len(volumes)) t.Fatalf("len(reportAsAttachedVolumes) Expected: <1> Actual: <%v>", len(volumes))
@ -871,9 +871,9 @@ func Test_RemoveVolumeFromReportAsAttached_AddVolumeToReportAsAttached_Positive(
// Populates data struct with one volume/node entry. // Populates data struct with one volume/node entry.
// Calls RemoveVolumeFromReportAsAttached // Calls RemoveVolumeFromReportAsAttached
// Calls DeleteVolumeNode // Calls DeleteVolumeNode
// Calls AddVolumeToReportAsAttached // Calls AddVolumeNode
// Verifyies there is no volume as reported as attached // Verifyies there is no volume as reported as attached
func Test_RemoveVolumeFromReportAsAttached_AddVolumeToReportAsAttached_Negative(t *testing.T) { func Test_RemoveVolumeFromReportAsAttached_Delete_AddVolumeNode(t *testing.T) {
// Arrange // Arrange
volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t) volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t)
asw := NewActualStateOfWorld(volumePluginMgr) asw := NewActualStateOfWorld(volumePluginMgr)
@ -894,22 +894,23 @@ func Test_RemoveVolumeFromReportAsAttached_AddVolumeToReportAsAttached_Negative(
reportAsAttachedVolumesMap := asw.GetVolumesToReportAttached() reportAsAttachedVolumesMap := asw.GetVolumesToReportAttached()
volumes, exists := reportAsAttachedVolumesMap[nodeName] volumes, exists := reportAsAttachedVolumesMap[nodeName]
if !exists { if !exists {
t.Fatalf("MarkDesireToDetach_UnmarkDesireToDetach failed. Expected: <node %q exist> Actual: <node does not exist in the reportedAsAttached map", nodeName) t.Fatalf("Test_RemoveVolumeFromReportAsAttached_Delete_AddVolumeNode failed. Expected: <node %q exists> Actual: <node does not exist in the reportedAsAttached map", nodeName)
} }
if len(volumes) > 0 { if len(volumes) > 0 {
t.Fatalf("len(reportAsAttachedVolumes) Expected: <0> Actual: <%v>", len(volumes)) t.Fatalf("len(reportAsAttachedVolumes) Expected: <0> Actual: <%v>", len(volumes))
} }
asw.DeleteVolumeNode(generatedVolumeName, nodeName) asw.DeleteVolumeNode(generatedVolumeName, nodeName)
asw.AddVolumeToReportAsAttached(generatedVolumeName, nodeName)
asw.AddVolumeNode(volumeSpec, nodeName, "" /*device path*/)
reportAsAttachedVolumesMap = asw.GetVolumesToReportAttached() reportAsAttachedVolumesMap = asw.GetVolumesToReportAttached()
volumes, exists = reportAsAttachedVolumesMap[nodeName] volumes, exists = reportAsAttachedVolumesMap[nodeName]
if !exists { if !exists {
t.Fatalf("MarkDesireToDetach_UnmarkDesireToDetach failed. Expected: <node %q exist> Actual: <node does not exist in the reportedAsAttached map", nodeName) t.Fatalf("Test_RemoveVolumeFromReportAsAttached_Delete_AddVolumeNode failed. Expected: <node %q exists> Actual: <node does not exist in the reportedAsAttached map", nodeName)
} }
if len(volumes) > 0 { if len(volumes) != 1 {
t.Fatalf("len(reportAsAttachedVolumes) Expected: <0> Actual: <%v>", len(volumes)) t.Fatalf("len(reportAsAttachedVolumes) Expected: <1> Actual: <%v>", len(volumes))
} }
} }

View File

@ -107,20 +107,15 @@ func (nsu *nodeStatusUpdater) UpdateNodeStatuses() error {
_, err = nsu.kubeClient.Core().Nodes().PatchStatus(nodeName, patchBytes) _, err = nsu.kubeClient.Core().Nodes().PatchStatus(nodeName, patchBytes)
if err != nil { if err != nil {
// If update node status fails, reset flag statusUpdateNeeded back to true
// to indicate this node status needs to be udpated again
nsu.actualStateOfWorld.SetNodeStatusUpdateNeeded(nodeName)
return fmt.Errorf( return fmt.Errorf(
"failed to kubeClient.Core().Nodes().Patch for node %q. %v", "failed to kubeClient.Core().Nodes().Patch for node %q. %v",
nodeName, nodeName,
err) err)
} }
err = nsu.actualStateOfWorld.ResetNodeStatusUpdateNeeded(nodeName)
if err != nil {
return fmt.Errorf(
"failed to ResetNodeStatusUpdateNeeded for node %q. %v",
nodeName,
err)
}
glog.V(3).Infof( glog.V(3).Infof(
"Updating status for node %q succeeded. patchBytes: %q", "Updating status for node %q succeeded. patchBytes: %q",
nodeName, nodeName,