mirror of https://github.com/k3s-io/k3s
Merge pull request #57432 from karataliu/azure_vmget_cache
Automatic merge from submit-queue (batch tested with PRs 49856, 56257, 57027, 57695, 57432). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>. Add cache for VirtualMachinesClient.Get in azure cloud provider **What this PR does / why we need it**: Add a timed cache for 'VirtualMachinesClient.Get' Currently cloud provider will send several get calls to same URL in short period, which is not necessary. **Which issue(s) this PR fixes**: Fixes #57031 **Special notes for your reviewer**: **Release note**: NONEpull/6/head
commit
f918d18acb
|
@ -22,6 +22,7 @@ go_library(
|
|||
"azure_storage.go",
|
||||
"azure_storageaccount.go",
|
||||
"azure_util.go",
|
||||
"azure_util_cache.go",
|
||||
"azure_util_vmss.go",
|
||||
"azure_vmsets.go",
|
||||
"azure_wrap.go",
|
||||
|
@ -52,6 +53,7 @@ go_library(
|
|||
"//vendor/k8s.io/apimachinery/pkg/util/errors:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
||||
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
|
||||
"//vendor/k8s.io/client-go/util/flowcontrol:go_default_library",
|
||||
],
|
||||
)
|
||||
|
@ -61,6 +63,7 @@ go_test(
|
|||
srcs = [
|
||||
"azure_loadbalancer_test.go",
|
||||
"azure_test.go",
|
||||
"azure_util_cache_test.go",
|
||||
"azure_util_test.go",
|
||||
"azure_wrap_test.go",
|
||||
],
|
||||
|
|
|
@ -134,6 +134,8 @@ func (c *controllerCommon) AttachDisk(isManagedDisk bool, diskName, diskURI stri
|
|||
}
|
||||
} else {
|
||||
glog.V(4).Infof("azureDisk - azure attach succeeded")
|
||||
// Invalidate the cache right after updating
|
||||
vmCache.Delete(vmName)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
@ -192,6 +194,8 @@ func (c *controllerCommon) DetachDiskByName(diskName, diskURI string, nodeName t
|
|||
glog.Errorf("azureDisk - azure disk detach failed, err: %v", err)
|
||||
} else {
|
||||
glog.V(4).Infof("azureDisk - azure disk detach succeeded")
|
||||
// Invalidate the cache right after updating
|
||||
vmCache.Delete(vmName)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -435,11 +435,15 @@ func (as *availabilitySet) GetInstanceTypeByNodeName(name string) (string, error
|
|||
|
||||
// GetZoneByNodeName gets zone from instance view.
|
||||
func (as *availabilitySet) GetZoneByNodeName(name string) (cloudprovider.Zone, error) {
|
||||
vm, err := as.VirtualMachinesClient.Get(as.ResourceGroup, name, compute.InstanceView)
|
||||
vm, exists, err := as.getVirtualMachine(types.NodeName(name))
|
||||
if err != nil {
|
||||
return cloudprovider.Zone{}, err
|
||||
}
|
||||
|
||||
if !exists {
|
||||
return cloudprovider.Zone{}, cloudprovider.InstanceNotFound
|
||||
}
|
||||
|
||||
failureDomain := strconv.Itoa(int(*vm.VirtualMachineProperties.InstanceView.PlatformFaultDomain))
|
||||
zone := cloudprovider.Zone{
|
||||
FailureDomain: failureDomain,
|
||||
|
|
|
@ -0,0 +1,81 @@
|
|||
/*
|
||||
Copyright 2017 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package azure
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"k8s.io/client-go/tools/cache"
|
||||
)
|
||||
|
||||
type timedcacheEntry struct {
|
||||
key string
|
||||
data interface{}
|
||||
}
|
||||
|
||||
type timedcache struct {
|
||||
store cache.Store
|
||||
lock sync.Mutex
|
||||
}
|
||||
|
||||
// ttl time.Duration
|
||||
func newTimedcache(ttl time.Duration) timedcache {
|
||||
return timedcache{
|
||||
store: cache.NewTTLStore(cacheKeyFunc, ttl),
|
||||
}
|
||||
}
|
||||
|
||||
func cacheKeyFunc(obj interface{}) (string, error) {
|
||||
return obj.(*timedcacheEntry).key, nil
|
||||
}
|
||||
|
||||
func (t *timedcache) GetOrCreate(key string, createFunc func() interface{}) (interface{}, error) {
|
||||
entry, exists, err := t.store.GetByKey(key)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if exists {
|
||||
return (entry.(*timedcacheEntry)).data, nil
|
||||
}
|
||||
|
||||
t.lock.Lock()
|
||||
defer t.lock.Unlock()
|
||||
entry, exists, err = t.store.GetByKey(key)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if exists {
|
||||
return (entry.(*timedcacheEntry)).data, nil
|
||||
}
|
||||
|
||||
if createFunc == nil {
|
||||
return nil, nil
|
||||
}
|
||||
created := createFunc()
|
||||
t.store.Add(&timedcacheEntry{
|
||||
key: key,
|
||||
data: created,
|
||||
})
|
||||
return created, nil
|
||||
}
|
||||
|
||||
func (t *timedcache) Delete(key string) {
|
||||
_ = t.store.Delete(&timedcacheEntry{
|
||||
key: key,
|
||||
})
|
||||
}
|
|
@ -0,0 +1,96 @@
|
|||
/*
|
||||
Copyright 2017 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package azure
|
||||
|
||||
import (
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestCacheReturnsSameObject(t *testing.T) {
|
||||
type cacheTestingStruct struct{}
|
||||
c := newTimedcache(1 * time.Minute)
|
||||
o1 := cacheTestingStruct{}
|
||||
get1, _ := c.GetOrCreate("b1", func() interface{} {
|
||||
return o1
|
||||
})
|
||||
o2 := cacheTestingStruct{}
|
||||
get2, _ := c.GetOrCreate("b1", func() interface{} {
|
||||
return o2
|
||||
})
|
||||
if get1 != get2 {
|
||||
t.Error("Get not equal")
|
||||
}
|
||||
}
|
||||
|
||||
func TestCacheCallsCreateFuncOnce(t *testing.T) {
|
||||
var callsCount uint32
|
||||
f1 := func() interface{} {
|
||||
atomic.AddUint32(&callsCount, 1)
|
||||
return 1
|
||||
}
|
||||
c := newTimedcache(500 * time.Millisecond)
|
||||
for index := 0; index < 20; index++ {
|
||||
_, _ = c.GetOrCreate("b1", f1)
|
||||
}
|
||||
|
||||
if callsCount != 1 {
|
||||
t.Error("Count not match")
|
||||
}
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
c.GetOrCreate("b1", f1)
|
||||
if callsCount != 2 {
|
||||
t.Error("Count not match")
|
||||
}
|
||||
}
|
||||
|
||||
func TestCacheExpires(t *testing.T) {
|
||||
f1 := func() interface{} {
|
||||
return 1
|
||||
}
|
||||
c := newTimedcache(500 * time.Millisecond)
|
||||
get1, _ := c.GetOrCreate("b1", f1)
|
||||
if get1 != 1 {
|
||||
t.Error("Value not equal")
|
||||
}
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
get1, _ = c.GetOrCreate("b1", nil)
|
||||
if get1 != nil {
|
||||
t.Error("value not expired")
|
||||
}
|
||||
}
|
||||
|
||||
func TestCacheDelete(t *testing.T) {
|
||||
f1 := func() interface{} {
|
||||
return 1
|
||||
}
|
||||
c := newTimedcache(500 * time.Millisecond)
|
||||
get1, _ := c.GetOrCreate("b1", f1)
|
||||
if get1 != 1 {
|
||||
t.Error("Value not equal")
|
||||
}
|
||||
get1, _ = c.GetOrCreate("b1", nil)
|
||||
if get1 != 1 {
|
||||
t.Error("Value not equal")
|
||||
}
|
||||
c.Delete("b1")
|
||||
get1, _ = c.GetOrCreate("b1", nil)
|
||||
if get1 != nil {
|
||||
t.Error("value not deleted")
|
||||
}
|
||||
}
|
|
@ -18,6 +18,8 @@ package azure
|
|||
|
||||
import (
|
||||
"net/http"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/Azure/azure-sdk-for-go/arm/compute"
|
||||
"github.com/Azure/azure-sdk-for-go/arm/network"
|
||||
|
@ -57,25 +59,65 @@ func ignoreStatusNotFoundFromError(err error) error {
|
|||
return err
|
||||
}
|
||||
|
||||
// cache used by getVirtualMachine
|
||||
// 15s for expiration duration
|
||||
var vmCache = newTimedcache(15 * time.Second)
|
||||
|
||||
type vmRequest struct {
|
||||
lock *sync.Mutex
|
||||
vm *compute.VirtualMachine
|
||||
}
|
||||
|
||||
/// getVirtualMachine calls 'VirtualMachinesClient.Get' with a timed cache
|
||||
/// The service side has throttling control that delays responses if there're multiple requests onto certain vm
|
||||
/// resource request in short period.
|
||||
func (az *Cloud) getVirtualMachine(nodeName types.NodeName) (vm compute.VirtualMachine, exists bool, err error) {
|
||||
var realErr error
|
||||
|
||||
vmName := string(nodeName)
|
||||
az.operationPollRateLimiter.Accept()
|
||||
glog.V(10).Infof("VirtualMachinesClient.Get(%s): start", vmName)
|
||||
vm, err = az.VirtualMachinesClient.Get(az.ResourceGroup, vmName, "")
|
||||
glog.V(10).Infof("VirtualMachinesClient.Get(%s): end", vmName)
|
||||
|
||||
exists, realErr = checkResourceExistsFromError(err)
|
||||
if realErr != nil {
|
||||
return vm, false, realErr
|
||||
cachedRequest, err := vmCache.GetOrCreate(vmName, func() interface{} {
|
||||
return &vmRequest{
|
||||
lock: &sync.Mutex{},
|
||||
vm: nil,
|
||||
}
|
||||
})
|
||||
if err != nil {
|
||||
return compute.VirtualMachine{}, false, err
|
||||
}
|
||||
request := cachedRequest.(*vmRequest)
|
||||
|
||||
if request.vm == nil {
|
||||
request.lock.Lock()
|
||||
defer request.lock.Unlock()
|
||||
if request.vm == nil {
|
||||
// Currently InstanceView request are used by azure_zones, while the calls come after non-InstanceView
|
||||
// request. If we first send an InstanceView request and then a non InstanceView request, the second
|
||||
// request will still hit throttling. This is what happens now for cloud controller manager: In this
|
||||
// case we do get instance view every time to fulfill the azure_zones requirement without hitting
|
||||
// throttling.
|
||||
// Consider adding separate parameter for controlling 'InstanceView' once node update issue #56276 is fixed
|
||||
az.operationPollRateLimiter.Accept()
|
||||
glog.V(10).Infof("VirtualMachinesClient.Get(%s): start", vmName)
|
||||
vm, err = az.VirtualMachinesClient.Get(az.ResourceGroup, vmName, compute.InstanceView)
|
||||
glog.V(10).Infof("VirtualMachinesClient.Get(%s): end", vmName)
|
||||
|
||||
exists, realErr = checkResourceExistsFromError(err)
|
||||
if realErr != nil {
|
||||
return vm, false, realErr
|
||||
}
|
||||
|
||||
if !exists {
|
||||
return vm, false, nil
|
||||
}
|
||||
|
||||
request.vm = &vm
|
||||
}
|
||||
return vm, exists, err
|
||||
}
|
||||
|
||||
if !exists {
|
||||
return vm, false, nil
|
||||
}
|
||||
|
||||
return vm, exists, err
|
||||
glog.V(6).Infof("getVirtualMachine hits cache for(%s)", vmName)
|
||||
return *request.vm, true, nil
|
||||
}
|
||||
|
||||
func (az *Cloud) getRouteTable() (routeTable network.RouteTable, exists bool, err error) {
|
||||
|
|
Loading…
Reference in New Issue