mirror of https://github.com/k3s-io/k3s
Azure cloudprovider retry using flowcontrol
An initial attempt at engaging exponential backoff for API error responses. Uses k8s.io/client-go/util/flowcontrol; implementation inspired by GCE cloudprovider backoff. (pull/6/head)
parent
db5c21f328
commit
f200f9a1e8
|
@ -22,6 +22,7 @@ import (
|
|||
"io/ioutil"
|
||||
"time"
|
||||
|
||||
"k8s.io/client-go/util/flowcontrol"
|
||||
"k8s.io/kubernetes/pkg/cloudprovider"
|
||||
"k8s.io/kubernetes/pkg/controller"
|
||||
"k8s.io/kubernetes/pkg/version"
|
||||
|
@ -74,16 +75,17 @@ type Config struct {
|
|||
// Cloud holds the config and clients
|
||||
type Cloud struct {
|
||||
Config
|
||||
Environment azure.Environment
|
||||
RoutesClient network.RoutesClient
|
||||
SubnetsClient network.SubnetsClient
|
||||
InterfacesClient network.InterfacesClient
|
||||
RouteTablesClient network.RouteTablesClient
|
||||
LoadBalancerClient network.LoadBalancersClient
|
||||
PublicIPAddressesClient network.PublicIPAddressesClient
|
||||
SecurityGroupsClient network.SecurityGroupsClient
|
||||
VirtualMachinesClient compute.VirtualMachinesClient
|
||||
StorageAccountClient storage.AccountsClient
|
||||
Environment azure.Environment
|
||||
RoutesClient network.RoutesClient
|
||||
SubnetsClient network.SubnetsClient
|
||||
InterfacesClient network.InterfacesClient
|
||||
RouteTablesClient network.RouteTablesClient
|
||||
LoadBalancerClient network.LoadBalancersClient
|
||||
PublicIPAddressesClient network.PublicIPAddressesClient
|
||||
SecurityGroupsClient network.SecurityGroupsClient
|
||||
VirtualMachinesClient compute.VirtualMachinesClient
|
||||
StorageAccountClient storage.AccountsClient
|
||||
operationPollRateLimiter flowcontrol.RateLimiter
|
||||
}
|
||||
|
||||
func init() {
|
||||
|
@ -177,6 +179,9 @@ func NewCloud(configReader io.Reader) (cloudprovider.Interface, error) {
|
|||
az.StorageAccountClient = storage.NewAccountsClientWithBaseURI(az.Environment.ResourceManagerEndpoint, az.SubscriptionID)
|
||||
az.StorageAccountClient.Authorizer = servicePrincipalToken
|
||||
|
||||
// 1 qps, up to 5 burst when in flowcontrol; i.e., aggressive backoff enforcement
|
||||
az.operationPollRateLimiter = flowcontrol.NewTokenBucketRateLimiter(1, 5)
|
||||
|
||||
return &az, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,149 @@
|
|||
/*
|
||||
Copyright 2016 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package azure
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
|
||||
"github.com/Azure/azure-sdk-for-go/arm/compute"
|
||||
"github.com/Azure/azure-sdk-for-go/arm/network"
|
||||
"github.com/Azure/go-autorest/autorest"
|
||||
)
|
||||
|
||||
// Polling parameters handed to wait.Poll by the *WithRetry helpers in this file.
const (
	// operationPollInterval is the pause between two consecutive attempts.
	operationPollInterval = 3 * time.Second
	// operationPollTimeoutDuration is the total time budget for one retried operation.
	operationPollTimeoutDuration = 60 * time.Minute
)
|
||||
|
||||
// CreateOrUpdateSGWithRetry invokes az.SecurityGroupsClient.CreateOrUpdate with exponential backoff retry
|
||||
func (az *Cloud) CreateOrUpdateSGWithRetry(sg network.SecurityGroup) error {
|
||||
return wait.Poll(operationPollInterval, operationPollTimeoutDuration, func() (bool, error) {
|
||||
resp, err := az.SecurityGroupsClient.CreateOrUpdate(az.ResourceGroup, *sg.Name, sg, nil)
|
||||
return processRetryResponse(resp, err)
|
||||
})
|
||||
}
|
||||
|
||||
// CreateOrUpdateLBWithRetry invokes az.LoadBalancerClient.CreateOrUpdate with exponential backoff retry
|
||||
func (az *Cloud) CreateOrUpdateLBWithRetry(lb network.LoadBalancer) error {
|
||||
return wait.Poll(operationPollInterval, operationPollTimeoutDuration, func() (bool, error) {
|
||||
resp, err := az.LoadBalancerClient.CreateOrUpdate(az.ResourceGroup, *lb.Name, lb, nil)
|
||||
return processRetryResponse(resp, err)
|
||||
})
|
||||
}
|
||||
|
||||
// CreateOrUpdatePIPWithRetry invokes az.PublicIPAddressesClient.CreateOrUpdate with exponential backoff retry
|
||||
func (az *Cloud) CreateOrUpdatePIPWithRetry(pip network.PublicIPAddress) error {
|
||||
return wait.Poll(operationPollInterval, operationPollTimeoutDuration, func() (bool, error) {
|
||||
resp, err := az.PublicIPAddressesClient.CreateOrUpdate(az.ResourceGroup, *pip.Name, pip, nil)
|
||||
return processRetryResponse(resp, err)
|
||||
})
|
||||
}
|
||||
|
||||
// CreateOrUpdateInterfaceWithRetry invokes az.PublicIPAddressesClient.CreateOrUpdate with exponential backoff retry
|
||||
func (az *Cloud) CreateOrUpdateInterfaceWithRetry(nic network.Interface) error {
|
||||
return wait.Poll(operationPollInterval, operationPollTimeoutDuration, func() (bool, error) {
|
||||
resp, err := az.InterfacesClient.CreateOrUpdate(az.ResourceGroup, *nic.Name, nic, nil)
|
||||
return processRetryResponse(resp, err)
|
||||
})
|
||||
}
|
||||
|
||||
// DeletePublicIPWithRetry invokes az.PublicIPAddressesClient.Delete with exponential backoff retry
|
||||
func (az *Cloud) DeletePublicIPWithRetry(pipName string) error {
|
||||
return wait.Poll(operationPollInterval, operationPollTimeoutDuration, func() (bool, error) {
|
||||
resp, err := az.PublicIPAddressesClient.Delete(az.ResourceGroup, pipName, nil)
|
||||
return processRetryResponse(resp, err)
|
||||
})
|
||||
}
|
||||
|
||||
// DeleteLBWithRetry invokes az.LoadBalancerClient.Delete with exponential backoff retry
|
||||
func (az *Cloud) DeleteLBWithRetry(lbName string) error {
|
||||
return wait.Poll(operationPollInterval, operationPollTimeoutDuration, func() (bool, error) {
|
||||
resp, err := az.LoadBalancerClient.Delete(az.ResourceGroup, lbName, nil)
|
||||
return processRetryResponse(resp, err)
|
||||
})
|
||||
}
|
||||
|
||||
// CreateOrUpdateRouteTableWithRetry invokes az.RouteTablesClient.CreateOrUpdate with exponential backoff retry
|
||||
func (az *Cloud) CreateOrUpdateRouteTableWithRetry(routeTable network.RouteTable) error {
|
||||
return wait.Poll(operationPollInterval, operationPollTimeoutDuration, func() (bool, error) {
|
||||
resp, err := az.RouteTablesClient.CreateOrUpdate(az.ResourceGroup, az.RouteTableName, routeTable, nil)
|
||||
return processRetryResponse(resp, err)
|
||||
})
|
||||
}
|
||||
|
||||
// CreateOrUpdateRouteWithRetry invokes az.RoutesClient.CreateOrUpdate with exponential backoff retry
|
||||
func (az *Cloud) CreateOrUpdateRouteWithRetry(route network.Route) error {
|
||||
return wait.Poll(operationPollInterval, operationPollTimeoutDuration, func() (bool, error) {
|
||||
resp, err := az.RoutesClient.CreateOrUpdate(az.ResourceGroup, az.RouteTableName, *route.Name, route, nil)
|
||||
return processRetryResponse(resp, err)
|
||||
})
|
||||
}
|
||||
|
||||
// DeleteRouteWithRetry invokes az.RoutesClient.Delete with exponential backoff retry
|
||||
func (az *Cloud) DeleteRouteWithRetry(routeName string) error {
|
||||
return wait.Poll(operationPollInterval, operationPollTimeoutDuration, func() (bool, error) {
|
||||
resp, err := az.RoutesClient.Delete(az.ResourceGroup, az.RouteTableName, routeName, nil)
|
||||
return processRetryResponse(resp, err)
|
||||
})
|
||||
}
|
||||
|
||||
// CreateOrUpdateVMWithRetry invokes az.VirtualMachinesClient.CreateOrUpdate with exponential backoff retry
|
||||
func (az *Cloud) CreateOrUpdateVMWithRetry(vmName string, newVM compute.VirtualMachine) error {
|
||||
return wait.Poll(operationPollInterval, operationPollTimeoutDuration, func() (bool, error) {
|
||||
resp, err := az.VirtualMachinesClient.CreateOrUpdate(az.ResourceGroup, vmName, newVM, nil)
|
||||
return processRetryResponse(resp, err)
|
||||
})
|
||||
}
|
||||
|
||||
// An in-progress convenience function to deal with common HTTP backoff response conditions
|
||||
func processRetryResponse(resp autorest.Response, err error) (bool, error) {
|
||||
if isSuccessHTTPResponse(resp) {
|
||||
return true, nil
|
||||
}
|
||||
if shouldRetryAPIRequest(resp, err) {
|
||||
return false, err
|
||||
}
|
||||
// TODO determine the complete set of short-circuit conditions
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
// Fall-through: stop periodic backoff, return error object from most recent request
|
||||
return true, err
|
||||
}
|
||||
|
||||
// shouldRetryAPIRequest determines if the response from an HTTP request suggests periodic retry behavior
|
||||
func shouldRetryAPIRequest(resp autorest.Response, err error) bool {
|
||||
// TODO determine the complete set of retry conditions
|
||||
if err != nil {
|
||||
return true
|
||||
}
|
||||
if resp.StatusCode == 429 || resp.StatusCode == 500 {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// isSuccessHTTPResponse determines if the response from an HTTP request suggests success
|
||||
func isSuccessHTTPResponse(resp autorest.Response) bool {
|
||||
// TODO determine the complete set of success conditions
|
||||
if resp.StatusCode == 200 || resp.StatusCode == 201 || resp.StatusCode == 202 {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
|
@ -149,7 +149,13 @@ func (az *Cloud) EnsureLoadBalancer(clusterName string, service *v1.Service, nod
|
|||
// to nil. This is a workaround until https://github.com/Azure/go-autorest/issues/112 is fixed
|
||||
sg.SecurityGroupPropertiesFormat.NetworkInterfaces = nil
|
||||
sg.SecurityGroupPropertiesFormat.Subnets = nil
|
||||
_, err := az.SecurityGroupsClient.CreateOrUpdate(az.ResourceGroup, *sg.Name, sg, nil)
|
||||
resp, err := az.SecurityGroupsClient.CreateOrUpdate(az.ResourceGroup, *sg.Name, sg, nil)
|
||||
if shouldRetryAPIRequest(resp, err) {
|
||||
retryErr := az.CreateOrUpdateSGWithRetry(sg)
|
||||
if retryErr != nil {
|
||||
return nil, retryErr
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -219,7 +225,13 @@ func (az *Cloud) EnsureLoadBalancer(clusterName string, service *v1.Service, nod
|
|||
}
|
||||
if !existsLb || lbNeedsUpdate {
|
||||
glog.V(3).Infof("ensure(%s): lb(%s) - updating", serviceName, lbName)
|
||||
_, err = az.LoadBalancerClient.CreateOrUpdate(az.ResourceGroup, *lb.Name, lb, nil)
|
||||
resp, err := az.LoadBalancerClient.CreateOrUpdate(az.ResourceGroup, *lb.Name, lb, nil)
|
||||
if shouldRetryAPIRequest(resp, err) {
|
||||
retryErr := az.CreateOrUpdateLBWithRetry(lb)
|
||||
if retryErr != nil {
|
||||
return nil, retryErr
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -310,7 +322,13 @@ func (az *Cloud) EnsureLoadBalancerDeleted(clusterName string, service *v1.Servi
|
|||
// to nil. This is a workaround until https://github.com/Azure/go-autorest/issues/112 is fixed
|
||||
sg.SecurityGroupPropertiesFormat.NetworkInterfaces = nil
|
||||
sg.SecurityGroupPropertiesFormat.Subnets = nil
|
||||
_, err := az.SecurityGroupsClient.CreateOrUpdate(az.ResourceGroup, *reconciledSg.Name, reconciledSg, nil)
|
||||
resp, err := az.SecurityGroupsClient.CreateOrUpdate(az.ResourceGroup, *reconciledSg.Name, reconciledSg, nil)
|
||||
if shouldRetryAPIRequest(resp, err) {
|
||||
retryErr := az.CreateOrUpdateSGWithRetry(reconciledSg)
|
||||
if retryErr != nil {
|
||||
return retryErr
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -339,14 +357,26 @@ func (az *Cloud) cleanupLoadBalancer(clusterName string, service *v1.Service, is
|
|||
if lbNeedsUpdate {
|
||||
if len(*lb.FrontendIPConfigurations) > 0 {
|
||||
glog.V(3).Infof("delete(%s): lb(%s) - updating", serviceName, lbName)
|
||||
_, err = az.LoadBalancerClient.CreateOrUpdate(az.ResourceGroup, *lb.Name, lb, nil)
|
||||
resp, err := az.LoadBalancerClient.CreateOrUpdate(az.ResourceGroup, *lb.Name, lb, nil)
|
||||
if shouldRetryAPIRequest(resp, err) {
|
||||
retryErr := az.CreateOrUpdateLBWithRetry(lb)
|
||||
if retryErr != nil {
|
||||
return retryErr
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
glog.V(3).Infof("delete(%s): lb(%s) - deleting; no remaining frontendipconfigs", serviceName, lbName)
|
||||
|
||||
_, err = az.LoadBalancerClient.Delete(az.ResourceGroup, lbName, nil)
|
||||
resp, err := az.LoadBalancerClient.Delete(az.ResourceGroup, lbName, nil)
|
||||
if shouldRetryAPIRequest(resp, err) {
|
||||
retryErr := az.DeleteLBWithRetry(lbName)
|
||||
if retryErr != nil {
|
||||
return retryErr
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -392,7 +422,13 @@ func (az *Cloud) ensurePublicIPExists(serviceName, pipName string) (*network.Pub
|
|||
pip.Tags = &map[string]*string{"service": &serviceName}
|
||||
|
||||
glog.V(3).Infof("ensure(%s): pip(%s) - creating", serviceName, *pip.Name)
|
||||
_, err = az.PublicIPAddressesClient.CreateOrUpdate(az.ResourceGroup, *pip.Name, pip, nil)
|
||||
resp, err := az.PublicIPAddressesClient.CreateOrUpdate(az.ResourceGroup, *pip.Name, pip, nil)
|
||||
if shouldRetryAPIRequest(resp, err) {
|
||||
retryErr := az.CreateOrUpdatePIPWithRetry(pip)
|
||||
if retryErr != nil {
|
||||
return nil, retryErr
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -407,7 +443,10 @@ func (az *Cloud) ensurePublicIPExists(serviceName, pipName string) (*network.Pub
|
|||
}
|
||||
|
||||
func (az *Cloud) ensurePublicIPDeleted(serviceName, pipName string) error {
|
||||
_, deleteErr := az.PublicIPAddressesClient.Delete(az.ResourceGroup, pipName, nil)
|
||||
resp, deleteErr := az.PublicIPAddressesClient.Delete(az.ResourceGroup, pipName, nil)
|
||||
if shouldRetryAPIRequest(resp, deleteErr) {
|
||||
deleteErr = az.DeletePublicIPWithRetry(pipName)
|
||||
}
|
||||
_, realErr := checkResourceExistsFromError(deleteErr)
|
||||
if realErr != nil {
|
||||
return nil
|
||||
|
@ -848,7 +887,13 @@ func (az *Cloud) ensureHostInPool(serviceName string, nodeName types.NodeName, b
|
|||
primaryIPConfig.LoadBalancerBackendAddressPools = &newBackendPools
|
||||
|
||||
glog.V(3).Infof("nicupdate(%s): nic(%s) - updating", serviceName, nicName)
|
||||
_, err := az.InterfacesClient.CreateOrUpdate(az.ResourceGroup, *nic.Name, nic, nil)
|
||||
resp, err := az.InterfacesClient.CreateOrUpdate(az.ResourceGroup, *nic.Name, nic, nil)
|
||||
if shouldRetryAPIRequest(resp, err) {
|
||||
retryErr := az.CreateOrUpdateInterfaceWithRetry(nic)
|
||||
if retryErr != nil {
|
||||
return retryErr
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -76,7 +76,13 @@ func (az *Cloud) CreateRoute(clusterName string, nameHint string, kubeRoute *clo
|
|||
}
|
||||
|
||||
glog.V(3).Infof("create: creating routetable. routeTableName=%q", az.RouteTableName)
|
||||
_, err = az.RouteTablesClient.CreateOrUpdate(az.ResourceGroup, az.RouteTableName, routeTable, nil)
|
||||
resp, err := az.RouteTablesClient.CreateOrUpdate(az.ResourceGroup, az.RouteTableName, routeTable, nil)
|
||||
if shouldRetryAPIRequest(resp, err) {
|
||||
retryErr := az.CreateOrUpdateRouteTableWithRetry(routeTable)
|
||||
if retryErr != nil {
|
||||
return retryErr
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -103,7 +109,13 @@ func (az *Cloud) CreateRoute(clusterName string, nameHint string, kubeRoute *clo
|
|||
}
|
||||
|
||||
glog.V(3).Infof("create: creating route: instance=%q cidr=%q", kubeRoute.TargetNode, kubeRoute.DestinationCIDR)
|
||||
_, err = az.RoutesClient.CreateOrUpdate(az.ResourceGroup, az.RouteTableName, *route.Name, route, nil)
|
||||
resp, err := az.RoutesClient.CreateOrUpdate(az.ResourceGroup, az.RouteTableName, *route.Name, route, nil)
|
||||
if shouldRetryAPIRequest(resp, err) {
|
||||
retryErr := az.CreateOrUpdateRouteWithRetry(route)
|
||||
if retryErr != nil {
|
||||
return retryErr
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -118,7 +130,13 @@ func (az *Cloud) DeleteRoute(clusterName string, kubeRoute *cloudprovider.Route)
|
|||
glog.V(2).Infof("delete: deleting route. clusterName=%q instance=%q cidr=%q", clusterName, kubeRoute.TargetNode, kubeRoute.DestinationCIDR)
|
||||
|
||||
routeName := mapNodeNameToRouteName(kubeRoute.TargetNode)
|
||||
_, err := az.RoutesClient.Delete(az.ResourceGroup, az.RouteTableName, routeName, nil)
|
||||
resp, err := az.RoutesClient.Delete(az.ResourceGroup, az.RouteTableName, routeName, nil)
|
||||
if shouldRetryAPIRequest(resp, err) {
|
||||
retryErr := az.DeleteRouteWithRetry(routeName)
|
||||
if retryErr != nil {
|
||||
return retryErr
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -64,7 +64,13 @@ func (az *Cloud) AttachDisk(diskName, diskURI string, nodeName types.NodeName, l
|
|||
},
|
||||
}
|
||||
vmName := mapNodeNameToVMName(nodeName)
|
||||
_, err = az.VirtualMachinesClient.CreateOrUpdate(az.ResourceGroup, vmName, newVM, nil)
|
||||
resp, err := az.VirtualMachinesClient.CreateOrUpdate(az.ResourceGroup, vmName, newVM, nil)
|
||||
if shouldRetryAPIRequest(resp, err) {
|
||||
retryErr := az.CreateOrUpdateVMWithRetry(vmName, newVM)
|
||||
if retryErr != nil {
|
||||
return retryErr
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
glog.Errorf("azure attach failed, err: %v", err)
|
||||
detail := err.Error()
|
||||
|
@ -135,7 +141,13 @@ func (az *Cloud) DetachDiskByName(diskName, diskURI string, nodeName types.NodeN
|
|||
},
|
||||
}
|
||||
vmName := mapNodeNameToVMName(nodeName)
|
||||
_, err = az.VirtualMachinesClient.CreateOrUpdate(az.ResourceGroup, vmName, newVM, nil)
|
||||
resp, err := az.VirtualMachinesClient.CreateOrUpdate(az.ResourceGroup, vmName, newVM, nil)
|
||||
if shouldRetryAPIRequest(resp, err) {
|
||||
retryErr := az.CreateOrUpdateVMWithRetry(vmName, newVM)
|
||||
if retryErr != nil {
|
||||
return retryErr
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
glog.Errorf("azure disk detach failed, err: %v", err)
|
||||
} else {
|
||||
|
|
Loading…
Reference in New Issue