Merge pull request #46463 from wongma7/getinstances

Automatic merge from submit-queue (batch tested with PRs 46489, 46281, 46463, 46114, 43946)

AWS: consider instances of all states in DisksAreAttached, not just "running"

Require callers of `getInstancesByNodeNames(Cached)` to specify the states they want to filter instances by, if any. DisksAreAttached cannot restrict itself to "running" instances because of the following attach/detach bug we discovered:

1. Node A stops (or reboots) and stays down for some amount of time.
2. Kube reschedules all of its pods to different nodes; the pods using EBS volumes cannot run because their volumes are still attached to node A.
3. The "verify volumes are attached" check runs while node A is down.
4. Because the AWS EBS bulk verification filters for "running" instances only, it assumes the volumes attached to node A have been detached and removes them all from the actual state of the world (ASW), as sketched below.
5. Node A comes back. Its volumes are still attached to it, but the attach/detach controller has removed them from the ASW, so it will never detach them even though they are no longer desired on this node and are in fact desired elsewhere.
6. The rescheduled pods cannot run because their volumes are still attached to node A.
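
To make step 4 concrete, here is a minimal, hypothetical sketch of the failure mode; it is not the real controller code, and the names `verifyAttachments`, `attachedVolumes`, and `fetchedInstances` are invented for illustration:

```go
package main

import "fmt"

// verifyAttachments is a stand-in for the bulk "verify volumes are attached"
// pass: any node that the instance lookup did not return is treated as having
// no attached volumes.
func verifyAttachments(attachedVolumes map[string][]string, fetchedInstances map[string]bool) map[string][]string {
	stillAttached := map[string][]string{}
	for node, volumes := range attachedVolumes {
		if !fetchedInstances[node] {
			// Node was filtered out (e.g. it is stopped), so its volumes are
			// wrongly reported as detached and dropped from the ASW.
			continue
		}
		stillAttached[node] = volumes
	}
	return stillAttached
}

func main() {
	attached := map[string][]string{"node-a": {"vol-1", "vol-2"}}

	// Old behavior: a stopped node-a is missing from the "running"-only
	// lookup, so both volumes disappear from the actual state of the world.
	fmt.Println(verifyAttachments(attached, map[string]bool{}))

	// New behavior: node-a is still returned (all states), nothing is dropped.
	fmt.Println(verifyAttachments(attached, map[string]bool{"node-a": true}))
}
```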

So the idea here is to remove the incorrect assumption that callers of `getInstancesByNodeNames(Cached)` only ever want "running" instances.
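
As a standalone sketch of the new call contract (assuming the aws-sdk-go `ec2` package; `buildInstanceFilters` and the `private-dns-name` filter key are illustrative stand-ins, not the exact provider code), the state filter is only applied when the caller asks for it:

```go
package main

import (
	"fmt"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/service/ec2"
)

// newEC2Filter mirrors the provider's newEc2Filter helper: a named EC2 filter
// with one or more values.
func newEC2Filter(name string, values ...string) *ec2.Filter {
	return &ec2.Filter{
		Name:   aws.String(name),
		Values: aws.StringSlice(values),
	}
}

// buildInstanceFilters is a hypothetical stand-in for the filter construction
// in getInstancesByNodeNames after this change: passing no states means no
// instance-state-name filter at all.
func buildInstanceFilters(nodeNames []string, states ...string) []*ec2.Filter {
	filters := []*ec2.Filter{
		// "private-dns-name" is assumed here for illustration.
		newEC2Filter("private-dns-name", nodeNames...),
	}
	if len(states) > 0 {
		filters = append(filters, newEC2Filter("instance-state-name", states...))
	}
	return filters
}

func main() {
	// Load-balancer call sites still restrict to running instances.
	fmt.Println(buildInstanceFilters([]string{"node-a"}, "running"))

	// The attach/detach verification path passes no states and so sees
	// instances in every state, including a stopped node A.
	fmt.Println(buildInstanceFilters([]string{"node-a"}))
}
```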

I hope this isn't too confusing; I'm open to alternative ways of fixing the bug while keeping the code clean.

ping @gnufied @kubernetes/sig-storage-bugs

```release-note
Fix AWS EBS volumes not getting detached from node if routine to verify volumes are attached runs while the node is down
```
Kubernetes Submit Queue 2017-05-30 11:59:04 -07:00 committed by GitHub
commit 222d247489
2 changed files with 11 additions and 10 deletions


@@ -2599,7 +2599,7 @@ func (c *Cloud) EnsureLoadBalancer(clusterName string, apiService *v1.Service, n
 		return nil, fmt.Errorf("LoadBalancerIP cannot be specified for AWS ELB")
 	}
-	instances, err := c.getInstancesByNodeNamesCached(nodeNames(nodes))
+	instances, err := c.getInstancesByNodeNamesCached(nodeNames(nodes), "running")
 	if err != nil {
 		return nil, err
 	}
@@ -3176,7 +3176,7 @@ func (c *Cloud) EnsureLoadBalancerDeleted(clusterName string, service *v1.Servic
 // UpdateLoadBalancer implements LoadBalancer.UpdateLoadBalancer
 func (c *Cloud) UpdateLoadBalancer(clusterName string, service *v1.Service, nodes []*v1.Node) error {
-	instances, err := c.getInstancesByNodeNamesCached(nodeNames(nodes))
+	instances, err := c.getInstancesByNodeNamesCached(nodeNames(nodes), "running")
 	if err != nil {
 		return err
 	}
@@ -3248,10 +3248,11 @@ func (c *Cloud) getInstancesByIDs(instanceIDs []*string) (map[string]*ec2.Instan
 	return instancesByID, nil
 }
 
-// Fetches and caches instances by node names; returns an error if any cannot be found.
+// Fetches and caches instances in the given state, by node names; returns an error if any cannot be found. If no states
+// are given, no state filter is used and instances of all states are fetched.
 // This is implemented with a multi value filter on the node names, fetching the desired instances with a single query.
 // TODO(therc): make all the caching more rational during the 1.4 timeframe
-func (c *Cloud) getInstancesByNodeNamesCached(nodeNames sets.String) ([]*ec2.Instance, error) {
+func (c *Cloud) getInstancesByNodeNamesCached(nodeNames sets.String, states ...string) ([]*ec2.Instance, error) {
 	c.mutex.Lock()
 	defer c.mutex.Unlock()
 	if nodeNames.Equal(c.lastNodeNames) {
@@ -3262,7 +3263,7 @@ func (c *Cloud) getInstancesByNodeNamesCached(nodeNames sets.String) ([]*ec2.Ins
 			return c.lastInstancesByNodeNames, nil
 		}
 	}
-	instances, err := c.getInstancesByNodeNames(nodeNames.List())
+	instances, err := c.getInstancesByNodeNames(nodeNames.List(), states...)
 	if err != nil {
 		return nil, err
@@ -3278,7 +3279,7 @@ func (c *Cloud) getInstancesByNodeNamesCached(nodeNames sets.String) ([]*ec2.Ins
 	return instances, nil
 }
 
-func (c *Cloud) getInstancesByNodeNames(nodeNames []string) ([]*ec2.Instance, error) {
+func (c *Cloud) getInstancesByNodeNames(nodeNames []string, states ...string) ([]*ec2.Instance, error) {
 	names := aws.StringSlice(nodeNames)
 	nodeNameFilter := &ec2.Filter{
@@ -3286,9 +3287,9 @@ func (c *Cloud) getInstancesByNodeNames(nodeNames []string) ([]*ec2.Instance, er
 		Values: names,
 	}
-	filters := []*ec2.Filter{
-		nodeNameFilter,
-		newEc2Filter("instance-state-name", "running"),
+	filters := []*ec2.Filter{nodeNameFilter}
+	if len(states) > 0 {
+		filters = append(filters, newEc2Filter("instance-state-name", states...))
 	}
 	instances, err := c.describeInstances(filters)


@@ -1050,7 +1050,7 @@ func TestFindInstancesByNodeNameCached(t *testing.T) {
 	}
 
 	nodeNames := sets.NewString(nodeNameOne)
-	returnedInstances, errr := c.getInstancesByNodeNamesCached(nodeNames)
+	returnedInstances, errr := c.getInstancesByNodeNamesCached(nodeNames, "running")
 	if errr != nil {
 		t.Errorf("Failed to find instance: %v", err)