diff --git a/staging/src/k8s.io/legacy-cloud-providers/azure/BUILD b/staging/src/k8s.io/legacy-cloud-providers/azure/BUILD index 11fc4b92d1..14ba5e38c4 100644 --- a/staging/src/k8s.io/legacy-cloud-providers/azure/BUILD +++ b/staging/src/k8s.io/legacy-cloud-providers/azure/BUILD @@ -100,6 +100,7 @@ go_test( "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", + "//staging/src/k8s.io/client-go/tools/record:go_default_library", "//staging/src/k8s.io/cloud-provider:go_default_library", "//staging/src/k8s.io/cloud-provider/service/helpers:go_default_library", "//staging/src/k8s.io/legacy-cloud-providers/azure/auth:go_default_library", diff --git a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_backoff.go b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_backoff.go index 2bf9668d28..d7d6c537ce 100644 --- a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_backoff.go +++ b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_backoff.go @@ -22,6 +22,7 @@ import ( "github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2019-03-01/compute" "github.com/Azure/azure-sdk-for-go/services/network/mgmt/2017-09-01/network" + "github.com/Azure/go-autorest/autorest/to" "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" @@ -146,7 +147,7 @@ func (az *Cloud) CreateOrUpdateSecurityGroup(service *v1.Service, sg network.Sec ctx, cancel := getContextWithCancel() defer cancel() - resp, err := az.SecurityGroupsClient.CreateOrUpdate(ctx, az.ResourceGroup, *sg.Name, sg) + resp, err := az.SecurityGroupsClient.CreateOrUpdate(ctx, az.ResourceGroup, *sg.Name, sg, to.String(sg.Etag)) klog.V(10).Infof("SecurityGroupsClient.CreateOrUpdate(%s): end", *sg.Name) if err == nil { if isSuccessHTTPResponse(resp) { @@ -156,6 +157,11 @@ func (az *Cloud) CreateOrUpdateSecurityGroup(service *v1.Service, sg network.Sec return fmt.Errorf("HTTP response %q", resp.Status) } } + + // Invalidate the cache because ETAG precondition mismatch. + if resp != nil && resp.StatusCode == http.StatusPreconditionFailed { + az.nsgCache.Delete(*sg.Name) + } return err } @@ -168,14 +174,20 @@ func (az *Cloud) CreateOrUpdateSGWithRetry(service *v1.Service, sg network.Secur ctx, cancel := getContextWithCancel() defer cancel() - resp, err := az.SecurityGroupsClient.CreateOrUpdate(ctx, az.ResourceGroup, *sg.Name, sg) + resp, err := az.SecurityGroupsClient.CreateOrUpdate(ctx, az.ResourceGroup, *sg.Name, sg, to.String(sg.Etag)) klog.V(10).Infof("SecurityGroupsClient.CreateOrUpdate(%s): end", *sg.Name) - done, err := az.processHTTPRetryResponse(service, "CreateOrUpdateSecurityGroup", resp, err) + done, retryError := az.processHTTPRetryResponse(service, "CreateOrUpdateSecurityGroup", resp, err) if done && err == nil { // Invalidate the cache right after updating az.nsgCache.Delete(*sg.Name) } - return done, err + + // Invalidate the cache and abort backoff because ETAG precondition mismatch. + if resp != nil && resp.StatusCode == http.StatusPreconditionFailed { + az.nsgCache.Delete(*sg.Name) + return true, err + } + return done, retryError }) } @@ -538,17 +550,22 @@ func isSuccessHTTPResponse(resp *http.Response) bool { } func shouldRetryHTTPRequest(resp *http.Response, err error) bool { - if err != nil { - return true - } - if resp != nil { - // HTTP 4xx or 5xx suggests we should retry + // HTTP 412 (StatusPreconditionFailed) means etag mismatch, hence we shouldn't retry. + if resp.StatusCode == http.StatusPreconditionFailed { + return false + } + + // HTTP 4xx (except 412) or 5xx suggests we should retry. if 399 < resp.StatusCode && resp.StatusCode < 600 { return true } } + if err != nil { + return true + } + return false } diff --git a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_cache.go b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_cache.go index fd69eadbff..ac959515a8 100644 --- a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_cache.go +++ b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_cache.go @@ -122,3 +122,12 @@ func (t *timedCache) Delete(key string) error { key: key, }) } + +// Set sets the data cache for the key. +// It is only used for testing. +func (t *timedCache) Set(key string, data interface{}) { + t.store.Add(&cacheEntry{ + key: key, + data: data, + }) +} diff --git a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_client.go b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_client.go index 7870285348..8d7130eac0 100644 --- a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_client.go +++ b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_client.go @@ -81,7 +81,7 @@ type SubnetsClient interface { // SecurityGroupsClient defines needed functions for azure network.SecurityGroupsClient type SecurityGroupsClient interface { - CreateOrUpdate(ctx context.Context, resourceGroupName string, networkSecurityGroupName string, parameters network.SecurityGroup) (resp *http.Response, err error) + CreateOrUpdate(ctx context.Context, resourceGroupName string, networkSecurityGroupName string, parameters network.SecurityGroup, etag string) (resp *http.Response, err error) Delete(ctx context.Context, resourceGroupName string, networkSecurityGroupName string) (resp *http.Response, err error) Get(ctx context.Context, resourceGroupName string, networkSecurityGroupName string, expand string) (result network.SecurityGroup, err error) List(ctx context.Context, resourceGroupName string) (result []network.SecurityGroup, err error) @@ -714,7 +714,7 @@ func newAzSecurityGroupsClient(config *azClientConfig) *azSecurityGroupsClient { } } -func (az *azSecurityGroupsClient) CreateOrUpdate(ctx context.Context, resourceGroupName string, networkSecurityGroupName string, parameters network.SecurityGroup) (resp *http.Response, err error) { +func (az *azSecurityGroupsClient) CreateOrUpdate(ctx context.Context, resourceGroupName string, networkSecurityGroupName string, parameters network.SecurityGroup, etag string) (resp *http.Response, err error) { /* Write rate limiting */ if !az.rateLimiterWriter.TryAccept() { err = createRateLimitErr(true, "NSGCreateOrUpdate") @@ -727,7 +727,13 @@ func (az *azSecurityGroupsClient) CreateOrUpdate(ctx context.Context, resourceGr }() mc := newMetricContext("security_groups", "create_or_update", resourceGroupName, az.client.SubscriptionID) - future, err := az.client.CreateOrUpdate(ctx, resourceGroupName, networkSecurityGroupName, parameters) + req, err := az.createOrUpdatePreparer(ctx, resourceGroupName, networkSecurityGroupName, parameters, etag) + if err != nil { + mc.Observe(err) + return nil, err + } + + future, err := az.client.CreateOrUpdateSender(req) if err != nil { mc.Observe(err) return future.Response(), err @@ -738,6 +744,34 @@ func (az *azSecurityGroupsClient) CreateOrUpdate(ctx context.Context, resourceGr return future.Response(), err } +// createOrUpdatePreparer prepares the CreateOrUpdate request. +func (az *azSecurityGroupsClient) createOrUpdatePreparer(ctx context.Context, resourceGroupName string, networkSecurityGroupName string, parameters network.SecurityGroup, etag string) (*http.Request, error) { + pathParameters := map[string]interface{}{ + "networkSecurityGroupName": autorest.Encode("path", networkSecurityGroupName), + "resourceGroupName": autorest.Encode("path", resourceGroupName), + "subscriptionId": autorest.Encode("path", az.client.SubscriptionID), + } + + const APIVersion = "2017-09-01" + queryParameters := map[string]interface{}{ + "api-version": APIVersion, + } + + preparerDecorators := []autorest.PrepareDecorator{ + autorest.AsContentType("application/json; charset=utf-8"), + autorest.AsPut(), + autorest.WithBaseURL(az.client.BaseURI), + autorest.WithPathParameters("/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Network/networkSecurityGroups/{networkSecurityGroupName}", pathParameters), + autorest.WithJSON(parameters), + autorest.WithQueryParameters(queryParameters), + } + if etag != "" { + preparerDecorators = append(preparerDecorators, autorest.WithHeader("If-Match", autorest.String(etag))) + } + preparer := autorest.CreatePreparer(preparerDecorators...) + return preparer.Prepare((&http.Request{}).WithContext(ctx)) +} + func (az *azSecurityGroupsClient) Delete(ctx context.Context, resourceGroupName string, networkSecurityGroupName string) (resp *http.Response, err error) { /* Write rate limiting */ if !az.rateLimiterWriter.TryAccept() { diff --git a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_fakes.go b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_fakes.go index 17bbb8f99a..2ce921be1b 100644 --- a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_fakes.go +++ b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_fakes.go @@ -36,6 +36,10 @@ import ( "github.com/Azure/go-autorest/autorest/to" ) +var ( + errPreconditionFailedEtagMismatch = fmt.Errorf("PreconditionFailedEtagMismatch") +) + type fakeAzureLBClient struct { mutex *sync.Mutex FakeStore map[string]map[string]network.LoadBalancer @@ -417,13 +421,21 @@ func newFakeAzureNSGClient() *fakeAzureNSGClient { return fNSG } -func (fNSG *fakeAzureNSGClient) CreateOrUpdate(ctx context.Context, resourceGroupName string, networkSecurityGroupName string, parameters network.SecurityGroup) (resp *http.Response, err error) { +func (fNSG *fakeAzureNSGClient) CreateOrUpdate(ctx context.Context, resourceGroupName string, networkSecurityGroupName string, parameters network.SecurityGroup, etag string) (resp *http.Response, err error) { fNSG.mutex.Lock() defer fNSG.mutex.Unlock() if _, ok := fNSG.FakeStore[resourceGroupName]; !ok { fNSG.FakeStore[resourceGroupName] = make(map[string]network.SecurityGroup) } + + if nsg, ok := fNSG.FakeStore[resourceGroupName][networkSecurityGroupName]; ok { + if etag != "" && to.String(nsg.Etag) != "" && etag != to.String(nsg.Etag) { + return &http.Response{ + StatusCode: http.StatusPreconditionFailed, + }, errPreconditionFailedEtagMismatch + } + } fNSG.FakeStore[resourceGroupName][networkSecurityGroupName] = parameters return nil, nil diff --git a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_test.go b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_test.go index aae754303b..efcfbad560 100644 --- a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_test.go +++ b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_test.go @@ -30,6 +30,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/client-go/tools/record" servicehelpers "k8s.io/cloud-provider/service/helpers" "k8s.io/legacy-cloud-providers/azure/auth" @@ -855,6 +856,25 @@ func TestReconcileSecurityWithSourceRanges(t *testing.T) { validateSecurityGroup(t, sg, svc) } +func TestReconcileSecurityGroupEtagMismatch(t *testing.T) { + az := getTestCloud() + + sg := getTestSecurityGroup(az) + cachedSG := *sg + cachedSG.Etag = to.StringPtr("1111111-0000-0000-0000-000000000000") + az.nsgCache.Set(to.String(sg.Name), &cachedSG) + + svc1 := getTestService("servicea", v1.ProtocolTCP, 80) + clusterResources := getClusterResources(az, 1, 1) + lb, _ := az.reconcileLoadBalancer(testClusterName, &svc1, clusterResources.nodes, true) + lbStatus, _ := az.getServiceLoadBalancerStatus(&svc1, lb) + + newSG, err := az.reconcileSecurityGroup(testClusterName, &svc1, &lbStatus.Ingress[0].IP, true /* wantLb */) + assert.Nil(t, newSG) + assert.NotNil(t, err) + assert.Equal(t, err, errPreconditionFailedEtagMismatch) +} + func TestReconcilePublicIPWithNewService(t *testing.T) { az := getTestCloud() svc := getTestService("servicea", v1.ProtocolTCP, 80, 443) @@ -958,6 +978,7 @@ func getTestCloud() (az *Cloud) { nodeResourceGroups: map[string]string{}, unmanagedNodes: sets.NewString(), routeCIDRs: map[string]string{}, + eventRecorder: &record.FakeRecorder{}, } az.DisksClient = newFakeDisksClient() az.InterfacesClient = newFakeAzureInterfacesClient() @@ -1186,6 +1207,7 @@ func getTestSecurityGroup(az *Cloud, services ...v1.Service) *network.SecurityGr sg := network.SecurityGroup{ Name: &az.SecurityGroupName, + Etag: to.StringPtr("0000000-0000-0000-0000-000000000000"), SecurityGroupPropertiesFormat: &network.SecurityGroupPropertiesFormat{ SecurityRules: &rules, }, @@ -1197,7 +1219,8 @@ func getTestSecurityGroup(az *Cloud, services ...v1.Service) *network.SecurityGr ctx, az.ResourceGroup, az.SecurityGroupName, - sg) + sg, + "") return &sg }