Merge pull request #68212 from feiskyer/az-events

Add service events for azure cloud provider
pull/8/head
k8s-ci-robot 2018-09-16 08:40:39 -07:00 committed by GitHub
commit e763a3c56d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 109 additions and 111 deletions

View File

@ -46,13 +46,18 @@ go_library(
"//pkg/volume/util:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//staging/src/k8s.io/client-go/tools/record:go_default_library",
"//staging/src/k8s.io/client-go/util/flowcontrol:go_default_library",
"//vendor/github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2018-04-01/compute:go_default_library",
"//vendor/github.com/Azure/azure-sdk-for-go/services/network/mgmt/2017-09-01/network:go_default_library",

View File

@ -28,7 +28,11 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/flowcontrol"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/cloudprovider/providers/azure/auth"
@ -183,6 +187,10 @@ type Cloud struct {
// client for vm sizes list
VirtualMachineSizesClient VirtualMachineSizesClient
kubeClient clientset.Interface
eventBroadcaster record.EventBroadcaster
eventRecorder record.EventRecorder
vmCache *timedCache
lbCache *timedCache
nsgCache *timedCache
@ -383,7 +391,12 @@ func parseConfig(configReader io.Reader) (*Config, error) {
}
// Initialize passes a Kubernetes clientBuilder interface to the cloud provider
func (az *Cloud) Initialize(clientBuilder controller.ControllerClientBuilder) {}
func (az *Cloud) Initialize(clientBuilder controller.ControllerClientBuilder) {
az.kubeClient = clientBuilder.ClientOrDie("azure-cloud-provider")
az.eventBroadcaster = record.NewBroadcaster()
az.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: az.kubeClient.CoreV1().Events("")})
az.eventRecorder = az.eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "azure-cloud-provider"})
}
// LoadBalancer returns a balancer interface. Also returns true if the interface is supported, false otherwise.
func (az *Cloud) LoadBalancer() (cloudprovider.LoadBalancer, bool) {

View File

@ -18,13 +18,15 @@ package azure
import (
"context"
"fmt"
"net/http"
"github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2018-04-01/compute"
"github.com/Azure/azure-sdk-for-go/services/network/mgmt/2017-09-01/network"
"github.com/Azure/go-autorest/autorest"
"github.com/golang/glog"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/kubernetes/pkg/cloudprovider"
@ -45,6 +47,13 @@ func (az *Cloud) requestBackoff() (resourceRequestBackoff wait.Backoff) {
return resourceRequestBackoff
}
// Event creates a event for the specified object.
func (az *Cloud) Event(obj runtime.Object, eventtype, reason, message string) {
if obj != nil && reason != "" {
az.eventRecorder.Event(obj, eventtype, reason, message)
}
}
// GetVirtualMachineWithRetry invokes az.getVirtualMachine with exponential backoff retry
func (az *Cloud) GetVirtualMachineWithRetry(name types.NodeName) (compute.VirtualMachine, error) {
var machine compute.VirtualMachine
@ -109,14 +118,14 @@ func (az *Cloud) GetIPForMachineWithRetry(name types.NodeName) (string, string,
}
// CreateOrUpdateSGWithRetry invokes az.SecurityGroupsClient.CreateOrUpdate with exponential backoff retry
func (az *Cloud) CreateOrUpdateSGWithRetry(sg network.SecurityGroup) error {
func (az *Cloud) CreateOrUpdateSGWithRetry(service *v1.Service, sg network.SecurityGroup) error {
return wait.ExponentialBackoff(az.requestBackoff(), func() (bool, error) {
ctx, cancel := getContextWithCancel()
defer cancel()
resp, err := az.SecurityGroupsClient.CreateOrUpdate(ctx, az.ResourceGroup, *sg.Name, sg)
glog.V(10).Infof("SecurityGroupsClient.CreateOrUpdate(%s): end", *sg.Name)
done, err := processHTTPRetryResponse(resp, err)
done, err := az.processHTTPRetryResponse(service, "CreateOrUpdateSecurityGroup", resp, err)
if done && err == nil {
// Invalidate the cache right after updating
az.nsgCache.Delete(*sg.Name)
@ -126,14 +135,14 @@ func (az *Cloud) CreateOrUpdateSGWithRetry(sg network.SecurityGroup) error {
}
// CreateOrUpdateLBWithRetry invokes az.LoadBalancerClient.CreateOrUpdate with exponential backoff retry
func (az *Cloud) CreateOrUpdateLBWithRetry(lb network.LoadBalancer) error {
func (az *Cloud) CreateOrUpdateLBWithRetry(service *v1.Service, lb network.LoadBalancer) error {
return wait.ExponentialBackoff(az.requestBackoff(), func() (bool, error) {
ctx, cancel := getContextWithCancel()
defer cancel()
resp, err := az.LoadBalancerClient.CreateOrUpdate(ctx, az.ResourceGroup, *lb.Name, lb)
glog.V(10).Infof("LoadBalancerClient.CreateOrUpdate(%s): end", *lb.Name)
done, err := processHTTPRetryResponse(resp, err)
done, err := az.processHTTPRetryResponse(service, "CreateOrUpdateLoadBalancer", resp, err)
if done && err == nil {
// Invalidate the cache right after updating
az.lbCache.Delete(*lb.Name)
@ -143,7 +152,7 @@ func (az *Cloud) CreateOrUpdateLBWithRetry(lb network.LoadBalancer) error {
}
// ListLBWithRetry invokes az.LoadBalancerClient.List with exponential backoff retry
func (az *Cloud) ListLBWithRetry() ([]network.LoadBalancer, error) {
func (az *Cloud) ListLBWithRetry(service *v1.Service) ([]network.LoadBalancer, error) {
var allLBs []network.LoadBalancer
err := wait.ExponentialBackoff(az.requestBackoff(), func() (bool, error) {
@ -153,6 +162,7 @@ func (az *Cloud) ListLBWithRetry() ([]network.LoadBalancer, error) {
allLBs, retryErr = az.LoadBalancerClient.List(ctx, az.ResourceGroup)
if retryErr != nil {
az.Event(service, v1.EventTypeWarning, "ListLoadBalancers", retryErr.Error())
glog.Errorf("LoadBalancerClient.List(%v) - backoff: failure, will retry,err=%v",
az.ResourceGroup,
retryErr)
@ -169,7 +179,7 @@ func (az *Cloud) ListLBWithRetry() ([]network.LoadBalancer, error) {
}
// ListPIPWithRetry list the PIP resources in the given resource group
func (az *Cloud) ListPIPWithRetry(pipResourceGroup string) ([]network.PublicIPAddress, error) {
func (az *Cloud) ListPIPWithRetry(service *v1.Service, pipResourceGroup string) ([]network.PublicIPAddress, error) {
var allPIPs []network.PublicIPAddress
err := wait.ExponentialBackoff(az.requestBackoff(), func() (bool, error) {
@ -179,6 +189,7 @@ func (az *Cloud) ListPIPWithRetry(pipResourceGroup string) ([]network.PublicIPAd
allPIPs, retryErr = az.PublicIPAddressesClient.List(ctx, pipResourceGroup)
if retryErr != nil {
az.Event(service, v1.EventTypeWarning, "ListPublicIPs", retryErr.Error())
glog.Errorf("PublicIPAddressesClient.List(%v) - backoff: failure, will retry,err=%v",
pipResourceGroup,
retryErr)
@ -195,48 +206,48 @@ func (az *Cloud) ListPIPWithRetry(pipResourceGroup string) ([]network.PublicIPAd
}
// CreateOrUpdatePIPWithRetry invokes az.PublicIPAddressesClient.CreateOrUpdate with exponential backoff retry
func (az *Cloud) CreateOrUpdatePIPWithRetry(pipResourceGroup string, pip network.PublicIPAddress) error {
func (az *Cloud) CreateOrUpdatePIPWithRetry(service *v1.Service, pipResourceGroup string, pip network.PublicIPAddress) error {
return wait.ExponentialBackoff(az.requestBackoff(), func() (bool, error) {
ctx, cancel := getContextWithCancel()
defer cancel()
resp, err := az.PublicIPAddressesClient.CreateOrUpdate(ctx, pipResourceGroup, *pip.Name, pip)
glog.V(10).Infof("PublicIPAddressesClient.CreateOrUpdate(%s, %s): end", pipResourceGroup, *pip.Name)
return processHTTPRetryResponse(resp, err)
return az.processHTTPRetryResponse(service, "CreateOrUpdatePublicIPAddress", resp, err)
})
}
// CreateOrUpdateInterfaceWithRetry invokes az.PublicIPAddressesClient.CreateOrUpdate with exponential backoff retry
func (az *Cloud) CreateOrUpdateInterfaceWithRetry(nic network.Interface) error {
func (az *Cloud) CreateOrUpdateInterfaceWithRetry(service *v1.Service, nic network.Interface) error {
return wait.ExponentialBackoff(az.requestBackoff(), func() (bool, error) {
ctx, cancel := getContextWithCancel()
defer cancel()
resp, err := az.InterfacesClient.CreateOrUpdate(ctx, az.ResourceGroup, *nic.Name, nic)
glog.V(10).Infof("InterfacesClient.CreateOrUpdate(%s): end", *nic.Name)
return processHTTPRetryResponse(resp, err)
return az.processHTTPRetryResponse(service, "CreateOrUpdateInterface", resp, err)
})
}
// DeletePublicIPWithRetry invokes az.PublicIPAddressesClient.Delete with exponential backoff retry
func (az *Cloud) DeletePublicIPWithRetry(pipResourceGroup string, pipName string) error {
func (az *Cloud) DeletePublicIPWithRetry(service *v1.Service, pipResourceGroup string, pipName string) error {
return wait.ExponentialBackoff(az.requestBackoff(), func() (bool, error) {
ctx, cancel := getContextWithCancel()
defer cancel()
resp, err := az.PublicIPAddressesClient.Delete(ctx, pipResourceGroup, pipName)
return processHTTPRetryResponse(resp, err)
return az.processHTTPRetryResponse(service, "DeletePublicIPAddress", resp, err)
})
}
// DeleteLBWithRetry invokes az.LoadBalancerClient.Delete with exponential backoff retry
func (az *Cloud) DeleteLBWithRetry(lbName string) error {
func (az *Cloud) DeleteLBWithRetry(service *v1.Service, lbName string) error {
return wait.ExponentialBackoff(az.requestBackoff(), func() (bool, error) {
ctx, cancel := getContextWithCancel()
defer cancel()
resp, err := az.LoadBalancerClient.Delete(ctx, az.ResourceGroup, lbName)
done, err := processHTTPRetryResponse(resp, err)
done, err := az.processHTTPRetryResponse(service, "DeleteLoadBalancer", resp, err)
if done && err == nil {
// Invalidate the cache right after deleting
az.lbCache.Delete(lbName)
@ -252,7 +263,7 @@ func (az *Cloud) CreateOrUpdateRouteTableWithRetry(routeTable network.RouteTable
defer cancel()
resp, err := az.RouteTablesClient.CreateOrUpdate(ctx, az.ResourceGroup, az.RouteTableName, routeTable)
return processHTTPRetryResponse(resp, err)
return az.processHTTPRetryResponse(nil, "", resp, err)
})
}
@ -264,7 +275,7 @@ func (az *Cloud) CreateOrUpdateRouteWithRetry(route network.Route) error {
resp, err := az.RoutesClient.CreateOrUpdate(ctx, az.ResourceGroup, az.RouteTableName, *route.Name, route)
glog.V(10).Infof("RoutesClient.CreateOrUpdate(%s): end", *route.Name)
return processHTTPRetryResponse(resp, err)
return az.processHTTPRetryResponse(nil, "", resp, err)
})
}
@ -276,7 +287,7 @@ func (az *Cloud) DeleteRouteWithRetry(routeName string) error {
resp, err := az.RoutesClient.Delete(ctx, az.ResourceGroup, az.RouteTableName, routeName)
glog.V(10).Infof("RoutesClient.Delete(%s): end", az.RouteTableName)
return processHTTPRetryResponse(resp, err)
return az.processHTTPRetryResponse(nil, "", resp, err)
})
}
@ -288,7 +299,7 @@ func (az *Cloud) CreateOrUpdateVMWithRetry(resourceGroup, vmName string, newVM c
resp, err := az.VirtualMachinesClient.CreateOrUpdate(ctx, resourceGroup, vmName, newVM)
glog.V(10).Infof("VirtualMachinesClient.CreateOrUpdate(%s): end", vmName)
return processHTTPRetryResponse(resp, err)
return az.processHTTPRetryResponse(nil, "", resp, err)
})
}
@ -297,39 +308,12 @@ func (az *Cloud) UpdateVmssVMWithRetry(ctx context.Context, resourceGroupName st
return wait.ExponentialBackoff(az.requestBackoff(), func() (bool, error) {
resp, err := az.VirtualMachineScaleSetVMsClient.Update(ctx, resourceGroupName, VMScaleSetName, instanceID, parameters)
glog.V(10).Infof("VirtualMachinesClient.CreateOrUpdate(%s,%s): end", VMScaleSetName, instanceID)
return processHTTPRetryResponse(resp, err)
return az.processHTTPRetryResponse(nil, "", resp, err)
})
}
// A wait.ConditionFunc function to deal with common HTTP backoff response conditions
func processRetryResponse(resp autorest.Response, err error) (bool, error) {
if isSuccessHTTPResponse(resp) {
glog.V(2).Infof("processRetryResponse: backoff success, HTTP response=%d", resp.StatusCode)
return true, nil
}
if shouldRetryAPIRequest(resp, err) {
glog.Errorf("processRetryResponse: backoff failure, will retry, HTTP response=%d, err=%v", resp.StatusCode, err)
// suppress the error object so that backoff process continues
return false, nil
}
// Fall-through: stop periodic backoff
return true, nil
}
// shouldRetryAPIRequest determines if the response from an HTTP request suggests periodic retry behavior
func shouldRetryAPIRequest(resp autorest.Response, err error) bool {
if err != nil {
return true
}
// HTTP 4xx or 5xx suggests we should retry
if 399 < resp.StatusCode && resp.StatusCode < 600 {
return true
}
return false
}
// isSuccessHTTPResponse determines if the response from an HTTP request suggests success
func isSuccessHTTPResponse(resp autorest.Response) bool {
func isSuccessHTTPResponse(resp http.Response) bool {
// HTTP 2xx suggests a successful response
if 199 < resp.StatusCode && resp.StatusCode < 300 {
return true
@ -352,18 +336,18 @@ func shouldRetryHTTPRequest(resp *http.Response, err error) bool {
return false
}
func processHTTPRetryResponse(resp *http.Response, err error) (bool, error) {
if resp != nil {
func (az *Cloud) processHTTPRetryResponse(service *v1.Service, reason string, resp *http.Response, err error) (bool, error) {
if resp != nil && isSuccessHTTPResponse(*resp) {
// HTTP 2xx suggests a successful response
if 199 < resp.StatusCode && resp.StatusCode < 300 {
return true, nil
}
return true, nil
}
if shouldRetryHTTPRequest(resp, err) {
if err != nil {
az.Event(service, v1.EventTypeWarning, reason, err.Error())
glog.Errorf("processHTTPRetryResponse: backoff failure, will retry, err=%v", err)
} else {
az.Event(service, v1.EventTypeWarning, reason, fmt.Sprintf("Azure HTTP response %d", resp.StatusCode))
glog.Errorf("processHTTPRetryResponse: backoff failure, will retry, HTTP response=%d", resp.StatusCode)
}

View File

@ -20,11 +20,9 @@ import (
"fmt"
"net/http"
"testing"
"github.com/Azure/go-autorest/autorest"
)
func TestShouldRetry(t *testing.T) {
func TestShouldRetryHTTPRequest(t *testing.T) {
tests := []struct {
code int
err error
@ -54,12 +52,10 @@ func TestShouldRetry(t *testing.T) {
}
for _, test := range tests {
resp := autorest.Response{
Response: &http.Response{
StatusCode: test.code,
},
resp := &http.Response{
StatusCode: test.code,
}
res := shouldRetryAPIRequest(resp, test.err)
res := shouldRetryHTTPRequest(resp, test.err)
if res != test.expected {
t.Errorf("expected: %v, saw: %v", test.expected, res)
}
@ -86,10 +82,8 @@ func TestIsSuccessResponse(t *testing.T) {
}
for _, test := range tests {
resp := autorest.Response{
Response: &http.Response{
StatusCode: test.code,
},
resp := http.Response{
StatusCode: test.code,
}
res := isSuccessHTTPResponse(resp)
if res != test.expected {
@ -99,6 +93,7 @@ func TestIsSuccessResponse(t *testing.T) {
}
func TestProcessRetryResponse(t *testing.T) {
az := &Cloud{}
tests := []struct {
code int
err error
@ -132,12 +127,10 @@ func TestProcessRetryResponse(t *testing.T) {
}
for _, test := range tests {
resp := autorest.Response{
Response: &http.Response{
StatusCode: test.code,
},
resp := &http.Response{
StatusCode: test.code,
}
res, err := processRetryResponse(resp, test.err)
res, err := az.processHTTPRetryResponse(nil, "", resp, test.err)
if res != test.stop {
t.Errorf("expected: %v, saw: %v", test.stop, res)
}

View File

@ -874,11 +874,11 @@ func (f *fakeVMSet) GetVMSetNames(service *v1.Service, nodes []*v1.Node) (availa
return nil, fmt.Errorf("unimplemented")
}
func (f *fakeVMSet) EnsureHostsInPool(serviceName string, nodes []*v1.Node, backendPoolID string, vmSetName string, isInternal bool) error {
func (f *fakeVMSet) EnsureHostsInPool(service *v1.Service, nodes []*v1.Node, backendPoolID string, vmSetName string, isInternal bool) error {
return fmt.Errorf("unimplemented")
}
func (f *fakeVMSet) EnsureBackendPoolDeleted(poolID, vmSetName string, backendAddressPools *[]network.BackendAddressPool) error {
func (f *fakeVMSet) EnsureBackendPoolDeleted(service *v1.Service, poolID, vmSetName string, backendAddressPools *[]network.BackendAddressPool) error {
return fmt.Errorf("unimplemented")
}

View File

@ -210,7 +210,7 @@ func (az *Cloud) getServiceLoadBalancer(service *v1.Service, clusterName string,
primaryVMSetName := az.vmSet.GetPrimaryVMSetName()
defaultLBName := az.getAzureLoadBalancerName(clusterName, primaryVMSetName, isInternal)
existingLBs, err := az.ListLBWithRetry()
existingLBs, err := az.ListLBWithRetry(service)
if err != nil {
return nil, nil, false, err
}
@ -387,7 +387,7 @@ func (az *Cloud) determinePublicIPName(clusterName string, service *v1.Service)
pipResourceGroup := az.getPublicIPAddressResourceGroup(service)
pips, err := az.ListPIPWithRetry(pipResourceGroup)
pips, err := az.ListPIPWithRetry(service, pipResourceGroup)
if err != nil {
return "", err
}
@ -475,7 +475,7 @@ func (az *Cloud) ensurePublicIPExists(service *v1.Service, pipName string, domai
glog.V(2).Infof("ensurePublicIPExists for service(%s): pip(%s) - creating", serviceName, *pip.Name)
glog.V(10).Infof("CreateOrUpdatePIPWithRetry(%s, %q): start", pipResourceGroup, *pip.Name)
err = az.CreateOrUpdatePIPWithRetry(pipResourceGroup, pip)
err = az.CreateOrUpdatePIPWithRetry(service, pipResourceGroup, pip)
if err != nil {
glog.V(2).Infof("ensure(%s) abort backoff: pip(%s) - creating", serviceName, *pip.Name)
return nil, err
@ -488,7 +488,6 @@ func (az *Cloud) ensurePublicIPExists(service *v1.Service, pipName string, domai
if err != nil {
return nil, err
}
return &pip, nil
}
@ -811,7 +810,7 @@ func (az *Cloud) reconcileLoadBalancer(clusterName string, service *v1.Service,
// Remove backend pools from vmSets. This is required for virtual machine scale sets before removing the LB.
vmSetName := az.mapLoadBalancerNameToVMSet(lbName, clusterName)
glog.V(10).Infof("EnsureBackendPoolDeleted(%s, %s): start", lbBackendPoolID, vmSetName)
err := az.vmSet.EnsureBackendPoolDeleted(lbBackendPoolID, vmSetName, lb.BackendAddressPools)
err := az.vmSet.EnsureBackendPoolDeleted(service, lbBackendPoolID, vmSetName, lb.BackendAddressPools)
if err != nil {
glog.Errorf("EnsureBackendPoolDeleted(%s, %s) failed: %v", lbBackendPoolID, vmSetName, err)
return nil, err
@ -820,7 +819,7 @@ func (az *Cloud) reconcileLoadBalancer(clusterName string, service *v1.Service,
// Remove the LB.
glog.V(10).Infof("reconcileLoadBalancer: az.DeleteLBWithRetry(%q): start", lbName)
err = az.DeleteLBWithRetry(lbName)
err = az.DeleteLBWithRetry(service, lbName)
if err != nil {
glog.V(2).Infof("reconcileLoadBalancer for service(%s) abort backoff: lb(%s) - deleting; no remaining frontendIPConfigurations", serviceName, lbName)
return nil, err
@ -828,7 +827,7 @@ func (az *Cloud) reconcileLoadBalancer(clusterName string, service *v1.Service,
glog.V(10).Infof("az.DeleteLBWithRetry(%q): end", lbName)
} else {
glog.V(2).Infof("reconcileLoadBalancer: reconcileLoadBalancer for service(%s): lb(%s) - updating", serviceName, lbName)
err := az.CreateOrUpdateLBWithRetry(*lb)
err := az.CreateOrUpdateLBWithRetry(service, *lb)
if err != nil {
glog.V(2).Infof("reconcileLoadBalancer for service(%s) abort backoff: lb(%s) - updating", serviceName, lbName)
return nil, err
@ -852,7 +851,7 @@ func (az *Cloud) reconcileLoadBalancer(clusterName string, service *v1.Service,
if wantLb && nodes != nil {
// Add the machines to the backend pool if they're not already
vmSetName := az.mapLoadBalancerNameToVMSet(lbName, clusterName)
err := az.vmSet.EnsureHostsInPool(serviceName, nodes, lbBackendPoolID, vmSetName, isInternal)
err := az.vmSet.EnsureHostsInPool(service, nodes, lbBackendPoolID, vmSetName, isInternal)
if err != nil {
return nil, err
}
@ -1145,7 +1144,7 @@ func (az *Cloud) reconcileSecurityGroup(clusterName string, service *v1.Service,
sg.SecurityRules = &updatedRules
glog.V(2).Infof("reconcileSecurityGroup for service(%s): sg(%s) - updating", serviceName, *sg.Name)
glog.V(10).Infof("CreateOrUpdateSGWithRetry(%q): start", *sg.Name)
err := az.CreateOrUpdateSGWithRetry(sg)
err := az.CreateOrUpdateSGWithRetry(service, sg)
if err != nil {
glog.V(2).Infof("ensure(%s) abort backoff: sg(%s) - updating", serviceName, *sg.Name)
// TODO (Nov 2017): remove when augmented security rules are out of preview
@ -1316,7 +1315,7 @@ func (az *Cloud) reconcilePublicIP(clusterName string, service *v1.Service, want
pipResourceGroup := az.getPublicIPAddressResourceGroup(service)
pips, err := az.ListPIPWithRetry(pipResourceGroup)
pips, err := az.ListPIPWithRetry(service, pipResourceGroup)
if err != nil {
return nil, err
}
@ -1333,7 +1332,7 @@ func (az *Cloud) reconcilePublicIP(clusterName string, service *v1.Service, want
} else {
glog.V(2).Infof("reconcilePublicIP for service(%s): pip(%s) - deleting", serviceName, pipName)
glog.V(10).Infof("DeletePublicIPWithRetry(%s, %q): start", pipResourceGroup, pipName)
err = az.DeletePublicIPWithRetry(pipResourceGroup, pipName)
err = az.DeletePublicIPWithRetry(service, pipResourceGroup, pipName)
if err != nil {
glog.V(2).Infof("ensure(%s) abort backoff: pip(%s) - deleting", serviceName, pipName)
// We let err to pass through

View File

@ -620,8 +620,9 @@ func (as *availabilitySet) getPrimaryInterfaceWithVMSet(nodeName, vmSetName stri
// ensureHostInPool ensures the given VM's Primary NIC's Primary IP Configuration is
// participating in the specified LoadBalancer Backend Pool.
func (as *availabilitySet) ensureHostInPool(serviceName string, nodeName types.NodeName, backendPoolID string, vmSetName string, isInternal bool) error {
func (as *availabilitySet) ensureHostInPool(service *v1.Service, nodeName types.NodeName, backendPoolID string, vmSetName string, isInternal bool) error {
vmName := mapNodeNameToVMName(nodeName)
serviceName := getServiceName(service)
nic, err := as.getPrimaryInterfaceWithVMSet(vmName, vmSetName)
if err != nil {
if err == errNotInVMSet {
@ -689,7 +690,7 @@ func (as *availabilitySet) ensureHostInPool(serviceName string, nodeName types.N
glog.V(10).Infof("InterfacesClient.CreateOrUpdate(%q): end", *nic.Name)
if as.CloudProviderBackoff && shouldRetryHTTPRequest(resp, err) {
glog.V(2).Infof("nicupdate(%s) backing off: nic(%s) - updating, err=%v", serviceName, nicName, err)
retryErr := as.CreateOrUpdateInterfaceWithRetry(nic)
retryErr := as.CreateOrUpdateInterfaceWithRetry(service, nic)
if retryErr != nil {
err = retryErr
glog.V(2).Infof("nicupdate(%s) abort backoff: nic(%s) - updating", serviceName, nicName)
@ -704,7 +705,7 @@ func (as *availabilitySet) ensureHostInPool(serviceName string, nodeName types.N
// EnsureHostsInPool ensures the given Node's primary IP configurations are
// participating in the specified LoadBalancer Backend Pool.
func (as *availabilitySet) EnsureHostsInPool(serviceName string, nodes []*v1.Node, backendPoolID string, vmSetName string, isInternal bool) error {
func (as *availabilitySet) EnsureHostsInPool(service *v1.Service, nodes []*v1.Node, backendPoolID string, vmSetName string, isInternal bool) error {
hostUpdates := make([]func() error, 0, len(nodes))
for _, node := range nodes {
localNodeName := node.Name
@ -719,9 +720,9 @@ func (as *availabilitySet) EnsureHostsInPool(serviceName string, nodes []*v1.Nod
}
f := func() error {
err := as.ensureHostInPool(serviceName, types.NodeName(localNodeName), backendPoolID, vmSetName, isInternal)
err := as.ensureHostInPool(service, types.NodeName(localNodeName), backendPoolID, vmSetName, isInternal)
if err != nil {
return fmt.Errorf("ensure(%s): backendPoolID(%s) - failed to ensure host in pool: %q", serviceName, backendPoolID, err)
return fmt.Errorf("ensure(%s): backendPoolID(%s) - failed to ensure host in pool: %q", getServiceName(service), backendPoolID, err)
}
return nil
}
@ -737,7 +738,7 @@ func (as *availabilitySet) EnsureHostsInPool(serviceName string, nodes []*v1.Nod
}
// EnsureBackendPoolDeleted ensures the loadBalancer backendAddressPools deleted from the specified vmSet.
func (as *availabilitySet) EnsureBackendPoolDeleted(poolID, vmSetName string, backendAddressPools *[]network.BackendAddressPool) error {
func (as *availabilitySet) EnsureBackendPoolDeleted(service *v1.Service, poolID, vmSetName string, backendAddressPools *[]network.BackendAddressPool) error {
// Do nothing for availability set.
return nil
}

View File

@ -54,9 +54,9 @@ type VMSet interface {
GetVMSetNames(service *v1.Service, nodes []*v1.Node) (availabilitySetNames *[]string, err error)
// EnsureHostsInPool ensures the given Node's primary IP configurations are
// participating in the specified LoadBalancer Backend Pool.
EnsureHostsInPool(serviceName string, nodes []*v1.Node, backendPoolID string, vmSetName string, isInternal bool) error
EnsureHostsInPool(service *v1.Service, nodes []*v1.Node, backendPoolID string, vmSetName string, isInternal bool) error
// EnsureBackendPoolDeleted ensures the loadBalancer backendAddressPools deleted from the specified vmSet.
EnsureBackendPoolDeleted(poolID, vmSetName string, backendAddressPools *[]network.BackendAddressPool) error
EnsureBackendPoolDeleted(service *v1.Service, poolID, vmSetName string, backendAddressPools *[]network.BackendAddressPool) error
// AttachDisk attaches a vhd to vm. The vhd must exist, can be identified by diskName, diskURI, and lun.
AttachDisk(isManagedDisk bool, diskName, diskURI string, nodeName types.NodeName, lun int32, cachingMode compute.CachingTypes) error

View File

@ -533,13 +533,14 @@ func (ss *scaleSet) GetPrimaryInterface(nodeName string) (network.Interface, err
}
// getScaleSetWithRetry gets scale set with exponential backoff retry
func (ss *scaleSet) getScaleSetWithRetry(name string) (compute.VirtualMachineScaleSet, bool, error) {
func (ss *scaleSet) getScaleSetWithRetry(service *v1.Service, name string) (compute.VirtualMachineScaleSet, bool, error) {
var result compute.VirtualMachineScaleSet
var exists bool
err := wait.ExponentialBackoff(ss.requestBackoff(), func() (bool, error) {
cached, retryErr := ss.vmssCache.Get(name)
if retryErr != nil {
ss.Event(service, v1.EventTypeWarning, "GetVirtualMachineScaleSet", retryErr.Error())
glog.Errorf("backoff: failure for scale set %q, will retry,err=%v", name, retryErr)
return false, nil
}
@ -590,24 +591,24 @@ func (ss *scaleSet) getPrimaryIPConfigForScaleSet(config *compute.VirtualMachine
}
// createOrUpdateVMSSWithRetry invokes ss.VirtualMachineScaleSetsClient.CreateOrUpdate with exponential backoff retry.
func (ss *scaleSet) createOrUpdateVMSSWithRetry(virtualMachineScaleSet compute.VirtualMachineScaleSet) error {
func (ss *scaleSet) createOrUpdateVMSSWithRetry(service *v1.Service, virtualMachineScaleSet compute.VirtualMachineScaleSet) error {
return wait.ExponentialBackoff(ss.requestBackoff(), func() (bool, error) {
ctx, cancel := getContextWithCancel()
defer cancel()
resp, err := ss.VirtualMachineScaleSetsClient.CreateOrUpdate(ctx, ss.ResourceGroup, *virtualMachineScaleSet.Name, virtualMachineScaleSet)
glog.V(10).Infof("VirtualMachineScaleSetsClient.CreateOrUpdate(%s): end", *virtualMachineScaleSet.Name)
return processHTTPRetryResponse(resp, err)
return ss.processHTTPRetryResponse(service, "CreateOrUpdateVMSS", resp, err)
})
}
// updateVMSSInstancesWithRetry invokes ss.VirtualMachineScaleSetsClient.UpdateInstances with exponential backoff retry.
func (ss *scaleSet) updateVMSSInstancesWithRetry(scaleSetName string, vmInstanceIDs compute.VirtualMachineScaleSetVMInstanceRequiredIDs) error {
func (ss *scaleSet) updateVMSSInstancesWithRetry(service *v1.Service, scaleSetName string, vmInstanceIDs compute.VirtualMachineScaleSetVMInstanceRequiredIDs) error {
return wait.ExponentialBackoff(ss.requestBackoff(), func() (bool, error) {
ctx, cancel := getContextWithCancel()
defer cancel()
resp, err := ss.VirtualMachineScaleSetsClient.UpdateInstances(ctx, ss.ResourceGroup, scaleSetName, vmInstanceIDs)
glog.V(10).Infof("VirtualMachineScaleSetsClient.UpdateInstances(%s): end", scaleSetName)
return processHTTPRetryResponse(resp, err)
return ss.processHTTPRetryResponse(service, "CreateOrUpdateVMSSInstance", resp, err)
})
}
@ -652,9 +653,10 @@ func (ss *scaleSet) getNodesScaleSets(nodes []*v1.Node) (map[string]sets.String,
// ensureHostsInVMSetPool ensures the given Node's primary IP configurations are
// participating in the vmSet's LoadBalancer Backend Pool.
func (ss *scaleSet) ensureHostsInVMSetPool(serviceName string, backendPoolID string, vmSetName string, instanceIDs []string, isInternal bool) error {
func (ss *scaleSet) ensureHostsInVMSetPool(service *v1.Service, backendPoolID string, vmSetName string, instanceIDs []string, isInternal bool) error {
glog.V(3).Infof("ensuring hosts %q of scaleset %q in LB backendpool %q", instanceIDs, vmSetName, backendPoolID)
virtualMachineScaleSet, exists, err := ss.getScaleSetWithRetry(vmSetName)
serviceName := getServiceName(service)
virtualMachineScaleSet, exists, err := ss.getScaleSetWithRetry(service, vmSetName)
if err != nil {
glog.Errorf("ss.getScaleSetWithRetry(%s) for service %q failed: %v", vmSetName, serviceName, err)
return err
@ -722,7 +724,7 @@ func (ss *scaleSet) ensureHostsInVMSetPool(serviceName string, backendPoolID str
glog.V(10).Infof("VirtualMachineScaleSetsClient.CreateOrUpdate(%q): end", vmSetName)
if ss.CloudProviderBackoff && shouldRetryHTTPRequest(resp, err) {
glog.V(2).Infof("VirtualMachineScaleSetsClient.CreateOrUpdate for service (%s): scale set (%s) - updating, err=%v", serviceName, vmSetName, err)
retryErr := ss.createOrUpdateVMSSWithRetry(virtualMachineScaleSet)
retryErr := ss.createOrUpdateVMSSWithRetry(service, virtualMachineScaleSet)
if retryErr != nil {
err = retryErr
glog.V(2).Infof("VirtualMachineScaleSetsClient.CreateOrUpdate for service (%s) abort backoff: scale set (%s) - updating", serviceName, vmSetName)
@ -743,7 +745,7 @@ func (ss *scaleSet) ensureHostsInVMSetPool(serviceName string, backendPoolID str
glog.V(10).Infof("VirtualMachineScaleSetsClient.CreateOrUpdate(%q): end", vmSetName)
if ss.CloudProviderBackoff && shouldRetryHTTPRequest(instanceResp, err) {
glog.V(2).Infof("VirtualMachineScaleSetsClient.UpdateInstances for service (%s): scale set (%s) - updating, err=%v", serviceName, vmSetName, err)
retryErr := ss.updateVMSSInstancesWithRetry(vmSetName, vmInstanceIDs)
retryErr := ss.updateVMSSInstancesWithRetry(service, vmSetName, vmInstanceIDs)
if retryErr != nil {
err = retryErr
glog.V(2).Infof("VirtualMachineScaleSetsClient.UpdateInstances for service (%s) abort backoff: scale set (%s) - updating", serviceName, vmSetName)
@ -758,7 +760,8 @@ func (ss *scaleSet) ensureHostsInVMSetPool(serviceName string, backendPoolID str
// EnsureHostsInPool ensures the given Node's primary IP configurations are
// participating in the specified LoadBalancer Backend Pool.
func (ss *scaleSet) EnsureHostsInPool(serviceName string, nodes []*v1.Node, backendPoolID string, vmSetName string, isInternal bool) error {
func (ss *scaleSet) EnsureHostsInPool(service *v1.Service, nodes []*v1.Node, backendPoolID string, vmSetName string, isInternal bool) error {
serviceName := getServiceName(service)
scalesets, standardNodes, err := ss.getNodesScaleSets(nodes)
if err != nil {
glog.Errorf("getNodesScaleSets() for service %q failed: %v", serviceName, err)
@ -778,7 +781,7 @@ func (ss *scaleSet) EnsureHostsInPool(serviceName string, nodes []*v1.Node, back
instanceIDs.Insert("*")
}
err := ss.ensureHostsInVMSetPool(serviceName, backendPoolID, ssName, instanceIDs.List(), isInternal)
err := ss.ensureHostsInVMSetPool(service, backendPoolID, ssName, instanceIDs.List(), isInternal)
if err != nil {
glog.Errorf("ensureHostsInVMSetPool() with scaleSet %q for service %q failed: %v", ssName, serviceName, err)
return err
@ -786,7 +789,7 @@ func (ss *scaleSet) EnsureHostsInPool(serviceName string, nodes []*v1.Node, back
}
if ss.useStandardLoadBalancer() && len(standardNodes) > 0 {
err := ss.availabilitySet.EnsureHostsInPool(serviceName, standardNodes, backendPoolID, "", isInternal)
err := ss.availabilitySet.EnsureHostsInPool(service, standardNodes, backendPoolID, "", isInternal)
if err != nil {
glog.Errorf("availabilitySet.EnsureHostsInPool() for service %q failed: %v", serviceName, err)
return err
@ -797,9 +800,9 @@ func (ss *scaleSet) EnsureHostsInPool(serviceName string, nodes []*v1.Node, back
}
// ensureScaleSetBackendPoolDeleted ensures the loadBalancer backendAddressPools deleted from the specified scaleset.
func (ss *scaleSet) ensureScaleSetBackendPoolDeleted(poolID, ssName string) error {
func (ss *scaleSet) ensureScaleSetBackendPoolDeleted(service *v1.Service, poolID, ssName string) error {
glog.V(3).Infof("ensuring backend pool %q deleted from scaleset %q", poolID, ssName)
virtualMachineScaleSet, exists, err := ss.getScaleSetWithRetry(ssName)
virtualMachineScaleSet, exists, err := ss.getScaleSetWithRetry(service, ssName)
if err != nil {
glog.Errorf("ss.ensureScaleSetBackendPoolDeleted(%s, %s) getScaleSetWithRetry(%s) failed: %v", poolID, ssName, ssName, err)
return err
@ -851,7 +854,7 @@ func (ss *scaleSet) ensureScaleSetBackendPoolDeleted(poolID, ssName string) erro
glog.V(10).Infof("VirtualMachineScaleSetsClient.CreateOrUpdate(%q): end", ssName)
if ss.CloudProviderBackoff && shouldRetryHTTPRequest(resp, err) {
glog.V(2).Infof("VirtualMachineScaleSetsClient.CreateOrUpdate: scale set (%s) - updating, err=%v", ssName, err)
retryErr := ss.createOrUpdateVMSSWithRetry(virtualMachineScaleSet)
retryErr := ss.createOrUpdateVMSSWithRetry(service, virtualMachineScaleSet)
if retryErr != nil {
err = retryErr
glog.V(2).Infof("VirtualMachineScaleSetsClient.CreateOrUpdate abort backoff: scale set (%s) - updating", ssName)
@ -872,7 +875,7 @@ func (ss *scaleSet) ensureScaleSetBackendPoolDeleted(poolID, ssName string) erro
glog.V(10).Infof("VirtualMachineScaleSetsClient.UpdateInstances(%q): end", ssName)
if ss.CloudProviderBackoff && shouldRetryHTTPRequest(instanceResp, err) {
glog.V(2).Infof("VirtualMachineScaleSetsClient.UpdateInstances scale set (%s) - updating, err=%v", ssName, err)
retryErr := ss.updateVMSSInstancesWithRetry(ssName, vmInstanceIDs)
retryErr := ss.updateVMSSInstancesWithRetry(service, ssName, vmInstanceIDs)
if retryErr != nil {
err = retryErr
glog.V(2).Infof("VirtualMachineScaleSetsClient.UpdateInstances abort backoff: scale set (%s) - updating", ssName)
@ -892,7 +895,7 @@ func (ss *scaleSet) ensureScaleSetBackendPoolDeleted(poolID, ssName string) erro
glog.V(10).Infof("VirtualMachineScaleSetsClient.CreateOrUpdate(%q): end", ssName)
if ss.CloudProviderBackoff && shouldRetryHTTPRequest(resp, err) {
glog.V(2).Infof("VirtualMachineScaleSetsClient.CreateOrUpdate: scale set (%s) - updating, err=%v", ssName, err)
retryErr := ss.createOrUpdateVMSSWithRetry(virtualMachineScaleSet)
retryErr := ss.createOrUpdateVMSSWithRetry(service, virtualMachineScaleSet)
if retryErr != nil {
glog.V(2).Infof("VirtualMachineScaleSetsClient.CreateOrUpdate abort backoff: scale set (%s) - updating", ssName)
}
@ -903,7 +906,7 @@ func (ss *scaleSet) ensureScaleSetBackendPoolDeleted(poolID, ssName string) erro
}
// EnsureBackendPoolDeleted ensures the loadBalancer backendAddressPools deleted from the specified vmSet.
func (ss *scaleSet) EnsureBackendPoolDeleted(poolID, vmSetName string, backendAddressPools *[]network.BackendAddressPool) error {
func (ss *scaleSet) EnsureBackendPoolDeleted(service *v1.Service, poolID, vmSetName string, backendAddressPools *[]network.BackendAddressPool) error {
if backendAddressPools == nil {
return nil
}
@ -934,7 +937,7 @@ func (ss *scaleSet) EnsureBackendPoolDeleted(poolID, vmSetName string, backendAd
continue
}
err := ss.ensureScaleSetBackendPoolDeleted(poolID, ssName)
err := ss.ensureScaleSetBackendPoolDeleted(service, poolID, ssName)
if err != nil {
glog.Errorf("ensureScaleSetBackendPoolDeleted() with scaleSet %q failed: %v", ssName, err)
return err