Merge pull request #77210 from feiskyer/nsg-race

Add etag for NSG updates so as to fix nsg race condition
k3s-v1.15.3
Kubernetes Prow Robot 2019-05-03 02:56:17 -07:00 committed by GitHub
commit 5f6d9b614e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 110 additions and 14 deletions

View File

@ -100,6 +100,7 @@ go_test(
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/client-go/tools/record:go_default_library",
"//staging/src/k8s.io/cloud-provider:go_default_library", "//staging/src/k8s.io/cloud-provider:go_default_library",
"//staging/src/k8s.io/cloud-provider/service/helpers:go_default_library", "//staging/src/k8s.io/cloud-provider/service/helpers:go_default_library",
"//staging/src/k8s.io/legacy-cloud-providers/azure/auth:go_default_library", "//staging/src/k8s.io/legacy-cloud-providers/azure/auth:go_default_library",

View File

@ -22,6 +22,7 @@ import (
"github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2019-03-01/compute" "github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2019-03-01/compute"
"github.com/Azure/azure-sdk-for-go/services/network/mgmt/2017-09-01/network" "github.com/Azure/azure-sdk-for-go/services/network/mgmt/2017-09-01/network"
"github.com/Azure/go-autorest/autorest/to"
"k8s.io/api/core/v1" "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
@ -146,7 +147,7 @@ func (az *Cloud) CreateOrUpdateSecurityGroup(service *v1.Service, sg network.Sec
ctx, cancel := getContextWithCancel() ctx, cancel := getContextWithCancel()
defer cancel() defer cancel()
resp, err := az.SecurityGroupsClient.CreateOrUpdate(ctx, az.ResourceGroup, *sg.Name, sg) resp, err := az.SecurityGroupsClient.CreateOrUpdate(ctx, az.ResourceGroup, *sg.Name, sg, to.String(sg.Etag))
klog.V(10).Infof("SecurityGroupsClient.CreateOrUpdate(%s): end", *sg.Name) klog.V(10).Infof("SecurityGroupsClient.CreateOrUpdate(%s): end", *sg.Name)
if err == nil { if err == nil {
if isSuccessHTTPResponse(resp) { if isSuccessHTTPResponse(resp) {
@ -156,6 +157,11 @@ func (az *Cloud) CreateOrUpdateSecurityGroup(service *v1.Service, sg network.Sec
return fmt.Errorf("HTTP response %q", resp.Status) return fmt.Errorf("HTTP response %q", resp.Status)
} }
} }
// Invalidate the cache because ETAG precondition mismatch.
if resp != nil && resp.StatusCode == http.StatusPreconditionFailed {
az.nsgCache.Delete(*sg.Name)
}
return err return err
} }
@ -168,14 +174,20 @@ func (az *Cloud) CreateOrUpdateSGWithRetry(service *v1.Service, sg network.Secur
ctx, cancel := getContextWithCancel() ctx, cancel := getContextWithCancel()
defer cancel() defer cancel()
resp, err := az.SecurityGroupsClient.CreateOrUpdate(ctx, az.ResourceGroup, *sg.Name, sg) resp, err := az.SecurityGroupsClient.CreateOrUpdate(ctx, az.ResourceGroup, *sg.Name, sg, to.String(sg.Etag))
klog.V(10).Infof("SecurityGroupsClient.CreateOrUpdate(%s): end", *sg.Name) klog.V(10).Infof("SecurityGroupsClient.CreateOrUpdate(%s): end", *sg.Name)
done, err := az.processHTTPRetryResponse(service, "CreateOrUpdateSecurityGroup", resp, err) done, retryError := az.processHTTPRetryResponse(service, "CreateOrUpdateSecurityGroup", resp, err)
if done && err == nil { if done && err == nil {
// Invalidate the cache right after updating // Invalidate the cache right after updating
az.nsgCache.Delete(*sg.Name) az.nsgCache.Delete(*sg.Name)
} }
return done, err
// Invalidate the cache and abort backoff because ETAG precondition mismatch.
if resp != nil && resp.StatusCode == http.StatusPreconditionFailed {
az.nsgCache.Delete(*sg.Name)
return true, err
}
return done, retryError
}) })
} }
@ -538,17 +550,22 @@ func isSuccessHTTPResponse(resp *http.Response) bool {
} }
func shouldRetryHTTPRequest(resp *http.Response, err error) bool { func shouldRetryHTTPRequest(resp *http.Response, err error) bool {
if err != nil {
return true
}
if resp != nil { if resp != nil {
// HTTP 4xx or 5xx suggests we should retry // HTTP 412 (StatusPreconditionFailed) means etag mismatch, hence we shouldn't retry.
if resp.StatusCode == http.StatusPreconditionFailed {
return false
}
// HTTP 4xx (except 412) or 5xx suggests we should retry.
if 399 < resp.StatusCode && resp.StatusCode < 600 { if 399 < resp.StatusCode && resp.StatusCode < 600 {
return true return true
} }
} }
if err != nil {
return true
}
return false return false
} }

View File

@ -122,3 +122,12 @@ func (t *timedCache) Delete(key string) error {
key: key, key: key,
}) })
} }
// Set sets the data cache for the key.
// It is only used for testing.
func (t *timedCache) Set(key string, data interface{}) {
t.store.Add(&cacheEntry{
key: key,
data: data,
})
}

View File

@ -81,7 +81,7 @@ type SubnetsClient interface {
// SecurityGroupsClient defines needed functions for azure network.SecurityGroupsClient // SecurityGroupsClient defines needed functions for azure network.SecurityGroupsClient
type SecurityGroupsClient interface { type SecurityGroupsClient interface {
CreateOrUpdate(ctx context.Context, resourceGroupName string, networkSecurityGroupName string, parameters network.SecurityGroup) (resp *http.Response, err error) CreateOrUpdate(ctx context.Context, resourceGroupName string, networkSecurityGroupName string, parameters network.SecurityGroup, etag string) (resp *http.Response, err error)
Delete(ctx context.Context, resourceGroupName string, networkSecurityGroupName string) (resp *http.Response, err error) Delete(ctx context.Context, resourceGroupName string, networkSecurityGroupName string) (resp *http.Response, err error)
Get(ctx context.Context, resourceGroupName string, networkSecurityGroupName string, expand string) (result network.SecurityGroup, err error) Get(ctx context.Context, resourceGroupName string, networkSecurityGroupName string, expand string) (result network.SecurityGroup, err error)
List(ctx context.Context, resourceGroupName string) (result []network.SecurityGroup, err error) List(ctx context.Context, resourceGroupName string) (result []network.SecurityGroup, err error)
@ -714,7 +714,7 @@ func newAzSecurityGroupsClient(config *azClientConfig) *azSecurityGroupsClient {
} }
} }
func (az *azSecurityGroupsClient) CreateOrUpdate(ctx context.Context, resourceGroupName string, networkSecurityGroupName string, parameters network.SecurityGroup) (resp *http.Response, err error) { func (az *azSecurityGroupsClient) CreateOrUpdate(ctx context.Context, resourceGroupName string, networkSecurityGroupName string, parameters network.SecurityGroup, etag string) (resp *http.Response, err error) {
/* Write rate limiting */ /* Write rate limiting */
if !az.rateLimiterWriter.TryAccept() { if !az.rateLimiterWriter.TryAccept() {
err = createRateLimitErr(true, "NSGCreateOrUpdate") err = createRateLimitErr(true, "NSGCreateOrUpdate")
@ -727,7 +727,13 @@ func (az *azSecurityGroupsClient) CreateOrUpdate(ctx context.Context, resourceGr
}() }()
mc := newMetricContext("security_groups", "create_or_update", resourceGroupName, az.client.SubscriptionID) mc := newMetricContext("security_groups", "create_or_update", resourceGroupName, az.client.SubscriptionID)
future, err := az.client.CreateOrUpdate(ctx, resourceGroupName, networkSecurityGroupName, parameters) req, err := az.createOrUpdatePreparer(ctx, resourceGroupName, networkSecurityGroupName, parameters, etag)
if err != nil {
mc.Observe(err)
return nil, err
}
future, err := az.client.CreateOrUpdateSender(req)
if err != nil { if err != nil {
mc.Observe(err) mc.Observe(err)
return future.Response(), err return future.Response(), err
@ -738,6 +744,34 @@ func (az *azSecurityGroupsClient) CreateOrUpdate(ctx context.Context, resourceGr
return future.Response(), err return future.Response(), err
} }
// createOrUpdatePreparer prepares the CreateOrUpdate request.
func (az *azSecurityGroupsClient) createOrUpdatePreparer(ctx context.Context, resourceGroupName string, networkSecurityGroupName string, parameters network.SecurityGroup, etag string) (*http.Request, error) {
pathParameters := map[string]interface{}{
"networkSecurityGroupName": autorest.Encode("path", networkSecurityGroupName),
"resourceGroupName": autorest.Encode("path", resourceGroupName),
"subscriptionId": autorest.Encode("path", az.client.SubscriptionID),
}
const APIVersion = "2017-09-01"
queryParameters := map[string]interface{}{
"api-version": APIVersion,
}
preparerDecorators := []autorest.PrepareDecorator{
autorest.AsContentType("application/json; charset=utf-8"),
autorest.AsPut(),
autorest.WithBaseURL(az.client.BaseURI),
autorest.WithPathParameters("/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Network/networkSecurityGroups/{networkSecurityGroupName}", pathParameters),
autorest.WithJSON(parameters),
autorest.WithQueryParameters(queryParameters),
}
if etag != "" {
preparerDecorators = append(preparerDecorators, autorest.WithHeader("If-Match", autorest.String(etag)))
}
preparer := autorest.CreatePreparer(preparerDecorators...)
return preparer.Prepare((&http.Request{}).WithContext(ctx))
}
func (az *azSecurityGroupsClient) Delete(ctx context.Context, resourceGroupName string, networkSecurityGroupName string) (resp *http.Response, err error) { func (az *azSecurityGroupsClient) Delete(ctx context.Context, resourceGroupName string, networkSecurityGroupName string) (resp *http.Response, err error) {
/* Write rate limiting */ /* Write rate limiting */
if !az.rateLimiterWriter.TryAccept() { if !az.rateLimiterWriter.TryAccept() {

View File

@ -36,6 +36,10 @@ import (
"github.com/Azure/go-autorest/autorest/to" "github.com/Azure/go-autorest/autorest/to"
) )
var (
errPreconditionFailedEtagMismatch = fmt.Errorf("PreconditionFailedEtagMismatch")
)
type fakeAzureLBClient struct { type fakeAzureLBClient struct {
mutex *sync.Mutex mutex *sync.Mutex
FakeStore map[string]map[string]network.LoadBalancer FakeStore map[string]map[string]network.LoadBalancer
@ -417,13 +421,21 @@ func newFakeAzureNSGClient() *fakeAzureNSGClient {
return fNSG return fNSG
} }
func (fNSG *fakeAzureNSGClient) CreateOrUpdate(ctx context.Context, resourceGroupName string, networkSecurityGroupName string, parameters network.SecurityGroup) (resp *http.Response, err error) { func (fNSG *fakeAzureNSGClient) CreateOrUpdate(ctx context.Context, resourceGroupName string, networkSecurityGroupName string, parameters network.SecurityGroup, etag string) (resp *http.Response, err error) {
fNSG.mutex.Lock() fNSG.mutex.Lock()
defer fNSG.mutex.Unlock() defer fNSG.mutex.Unlock()
if _, ok := fNSG.FakeStore[resourceGroupName]; !ok { if _, ok := fNSG.FakeStore[resourceGroupName]; !ok {
fNSG.FakeStore[resourceGroupName] = make(map[string]network.SecurityGroup) fNSG.FakeStore[resourceGroupName] = make(map[string]network.SecurityGroup)
} }
if nsg, ok := fNSG.FakeStore[resourceGroupName][networkSecurityGroupName]; ok {
if etag != "" && to.String(nsg.Etag) != "" && etag != to.String(nsg.Etag) {
return &http.Response{
StatusCode: http.StatusPreconditionFailed,
}, errPreconditionFailedEtagMismatch
}
}
fNSG.FakeStore[resourceGroupName][networkSecurityGroupName] = parameters fNSG.FakeStore[resourceGroupName][networkSecurityGroupName] = parameters
return nil, nil return nil, nil

View File

@ -30,6 +30,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/record"
servicehelpers "k8s.io/cloud-provider/service/helpers" servicehelpers "k8s.io/cloud-provider/service/helpers"
"k8s.io/legacy-cloud-providers/azure/auth" "k8s.io/legacy-cloud-providers/azure/auth"
@ -855,6 +856,25 @@ func TestReconcileSecurityWithSourceRanges(t *testing.T) {
validateSecurityGroup(t, sg, svc) validateSecurityGroup(t, sg, svc)
} }
func TestReconcileSecurityGroupEtagMismatch(t *testing.T) {
az := getTestCloud()
sg := getTestSecurityGroup(az)
cachedSG := *sg
cachedSG.Etag = to.StringPtr("1111111-0000-0000-0000-000000000000")
az.nsgCache.Set(to.String(sg.Name), &cachedSG)
svc1 := getTestService("servicea", v1.ProtocolTCP, 80)
clusterResources := getClusterResources(az, 1, 1)
lb, _ := az.reconcileLoadBalancer(testClusterName, &svc1, clusterResources.nodes, true)
lbStatus, _ := az.getServiceLoadBalancerStatus(&svc1, lb)
newSG, err := az.reconcileSecurityGroup(testClusterName, &svc1, &lbStatus.Ingress[0].IP, true /* wantLb */)
assert.Nil(t, newSG)
assert.NotNil(t, err)
assert.Equal(t, err, errPreconditionFailedEtagMismatch)
}
func TestReconcilePublicIPWithNewService(t *testing.T) { func TestReconcilePublicIPWithNewService(t *testing.T) {
az := getTestCloud() az := getTestCloud()
svc := getTestService("servicea", v1.ProtocolTCP, 80, 443) svc := getTestService("servicea", v1.ProtocolTCP, 80, 443)
@ -958,6 +978,7 @@ func getTestCloud() (az *Cloud) {
nodeResourceGroups: map[string]string{}, nodeResourceGroups: map[string]string{},
unmanagedNodes: sets.NewString(), unmanagedNodes: sets.NewString(),
routeCIDRs: map[string]string{}, routeCIDRs: map[string]string{},
eventRecorder: &record.FakeRecorder{},
} }
az.DisksClient = newFakeDisksClient() az.DisksClient = newFakeDisksClient()
az.InterfacesClient = newFakeAzureInterfacesClient() az.InterfacesClient = newFakeAzureInterfacesClient()
@ -1186,6 +1207,7 @@ func getTestSecurityGroup(az *Cloud, services ...v1.Service) *network.SecurityGr
sg := network.SecurityGroup{ sg := network.SecurityGroup{
Name: &az.SecurityGroupName, Name: &az.SecurityGroupName,
Etag: to.StringPtr("0000000-0000-0000-0000-000000000000"),
SecurityGroupPropertiesFormat: &network.SecurityGroupPropertiesFormat{ SecurityGroupPropertiesFormat: &network.SecurityGroupPropertiesFormat{
SecurityRules: &rules, SecurityRules: &rules,
}, },
@ -1197,7 +1219,8 @@ func getTestSecurityGroup(az *Cloud, services ...v1.Service) *network.SecurityGr
ctx, ctx,
az.ResourceGroup, az.ResourceGroup,
az.SecurityGroupName, az.SecurityGroupName,
sg) sg,
"")
return &sg return &sg
} }