Merge pull request #24376 from resouer/fix-cache

Automatic merge from submit-queue

Do not update cache with so much effort

Fixes: #24298
1. Remove automatic update
2. Every time we check if we can get valid value from cache, if not, get the value directly from api

cc @Random-Liu
pull/6/head
k8s-merge-robot 2016-04-28 01:00:33 -07:00
commit 04b70bc6c7
5 changed files with 218 additions and 89 deletions

View File

@ -24,6 +24,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/network"
proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/kubelet/util/cache"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util/flowcontrol"
"k8s.io/kubernetes/pkg/util/oom"
@ -52,6 +53,14 @@ func NewFakeDockerManager(
burst, containerLogsDir, osInterface, networkPlugin, runtimeHelper, httpClient, &NativeExecHandler{},
fakeOOMAdjuster, fakeProcFs, false, imageBackOff, false, false, true)
dm.dockerPuller = &FakeDockerPuller{}
// ttl of version cache is set to 0 so we always call version api directly in tests.
dm.versionCache = cache.NewObjectCache(
func() (interface{}, error) {
return dm.getVersionInfo()
},
0,
)
return dm
}

View File

@ -87,6 +87,9 @@ const (
// Remote API version for docker daemon version v1.10
// https://docs.docker.com/engine/reference/api/docker_remote_api/
dockerV110APIVersion = "1.22"
// The expiration time of version cache.
versionCacheTTL = 60 * time.Second
)
var (
@ -161,11 +164,11 @@ type DockerManager struct {
// it might already be true.
configureHairpinMode bool
// The api version cache of docker daemon.
versionCache *cache.VersionCache
// Provides image stats
*imageStatsProvider
// The version cache of docker daemon.
versionCache *cache.ObjectCache
}
// A subset of the pod.Manager interface extracted for testing purposes.
@ -253,6 +256,13 @@ func NewDockerManager(
}
dm.containerGC = NewContainerGC(client, podGetter, containerLogsDir)
dm.versionCache = cache.NewObjectCache(
func() (interface{}, error) {
return dm.getVersionInfo()
},
versionCacheTTL,
)
// apply optional settings..
for _, optf := range options {
optf(dm)
@ -1554,16 +1564,24 @@ func (dm *DockerManager) calculateOomScoreAdj(container *api.Container) int {
return oomScoreAdj
}
// versionInfo wraps api version and daemon version.
type versionInfo struct {
apiVersion kubecontainer.Version
daemonVersion kubecontainer.Version
}
// checkDockerAPIVersion checks current docker API version against expected version.
// Return:
// 1 : newer than expected version
// -1: older than expected version
// 0 : same version
func (dm *DockerManager) checkDockerAPIVersion(expectedVersion string) (int, error) {
apiVersion, _, err := dm.getVersionInfo()
value, err := dm.versionCache.Get(dm.machineInfo.MachineID)
if err != nil {
return 0, err
}
apiVersion := value.(versionInfo).apiVersion
result, err := apiVersion.Compare(expectedVersion)
if err != nil {
return 0, fmt.Errorf("Failed to compare current docker api version %v with OOMScoreAdj supported Docker version %q - %v",
@ -2156,15 +2174,17 @@ func (dm *DockerManager) GetPodStatus(uid types.UID, name, namespace string) (*k
}
// getVersionInfo returns apiVersion & daemonVersion of docker runtime
func (dm *DockerManager) getVersionInfo() (kubecontainer.Version, kubecontainer.Version, error) {
func (dm *DockerManager) getVersionInfo() (versionInfo, error) {
apiVersion, err := dm.APIVersion()
if err != nil {
return nil, nil, err
return versionInfo{}, err
}
daemonVersion, err := dm.Version()
if err != nil {
return nil, nil, err
return versionInfo{}, err
}
return apiVersion, daemonVersion, nil
return versionInfo{
apiVersion: apiVersion,
daemonVersion: daemonVersion,
}, nil
}

84
pkg/kubelet/util/cache/object_cache.go vendored Normal file
View File

@ -0,0 +1,84 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cache
import (
"time"
expirationCache "k8s.io/kubernetes/pkg/client/cache"
)
// ObjectCache is a simple wrapper of expiration cache that
// 1. use string type key
// 2. has a updater to get value directly if it is expired
// 3. then update the cache
type ObjectCache struct {
cache expirationCache.Store
updater func() (interface{}, error)
}
// objectEntry is a object with string type key.
type objectEntry struct {
key string
obj interface{}
}
// NewObjectCache creates ObjectCache with a updater.
// updater returns a object to cache.
func NewObjectCache(f func() (interface{}, error), ttl time.Duration) *ObjectCache {
return &ObjectCache{
updater: f,
cache: expirationCache.NewTTLStore(stringKeyFunc, ttl),
}
}
// stringKeyFunc is a string as cache key function
func stringKeyFunc(obj interface{}) (string, error) {
key := obj.(objectEntry).key
return key, nil
}
// Get gets cached objectEntry by using a unique string as the key.
func (c *ObjectCache) Get(key string) (interface{}, error) {
value, ok, err := c.cache.Get(objectEntry{key: key})
if err != nil {
return nil, err
}
if !ok {
obj, err := c.updater()
if err != nil {
return nil, err
}
err = c.cache.Add(objectEntry{
key: key,
obj: obj,
})
if err != nil {
return nil, err
}
return obj, nil
}
return value.(objectEntry).obj, nil
}
func (c *ObjectCache) Add(key string, obj interface{}) error {
err := c.cache.Add(objectEntry{key: key, obj: obj})
if err != nil {
return err
}
return nil
}

View File

@ -0,0 +1,96 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cache
import (
"fmt"
"testing"
"time"
expirationCache "k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/util"
)
type testObject struct {
key string
val string
}
// A fake objectCache for unit test.
func NewFakeObjectCache(f func() (interface{}, error), ttl time.Duration, clock util.Clock) *ObjectCache {
ttlPolicy := &expirationCache.TTLPolicy{Ttl: ttl, Clock: clock}
deleteChan := make(chan string, 1)
return &ObjectCache{
updater: f,
cache: expirationCache.NewFakeExpirationStore(stringKeyFunc, deleteChan, ttlPolicy, clock),
}
}
func TestAddAndGet(t *testing.T) {
testObj := testObject{
key: "foo",
val: "bar",
}
objectCache := NewFakeObjectCache(func() (interface{}, error) {
return nil, fmt.Errorf("Unexpected Error: updater should never be called in this test!")
}, 1*time.Hour, util.NewFakeClock(time.Now()))
err := objectCache.Add(testObj.key, testObj.val)
if err != nil {
t.Errorf("Unable to add obj %#v by key: %s", testObj, testObj.key)
}
value, err := objectCache.Get(testObj.key)
if err != nil {
t.Errorf("Unable to get obj %#v by key: %s", testObj, testObj.key)
}
if value.(string) != testObj.val {
t.Errorf("Expected to get cached value: %#v, but got: %s", testObj.val, value.(string))
}
}
func TestExpirationBasic(t *testing.T) {
unexpectedVal := "bar"
expectedVal := "bar2"
testObj := testObject{
key: "foo",
val: unexpectedVal,
}
fakeClock := util.NewFakeClock(time.Now())
objectCache := NewFakeObjectCache(func() (interface{}, error) {
return expectedVal, nil
}, 1*time.Second, fakeClock)
err := objectCache.Add(testObj.key, testObj.val)
if err != nil {
t.Errorf("Unable to add obj %#v by key: %s", testObj, testObj.key)
}
// sleep 2s so cache should be expired.
fakeClock.Sleep(2 * time.Second)
value, err := objectCache.Get(testObj.key)
if err != nil {
t.Errorf("Unable to get obj %#v by key: %s", testObj, testObj.key)
}
if value.(string) != expectedVal {
t.Errorf("Expected to get cached value: %#v, but got: %s", expectedVal, value.(string))
}
}

View File

@ -1,80 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cache
import (
"fmt"
"sync"
"time"
"github.com/golang/glog"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/util/wait"
)
type VersionCache struct {
lock sync.RWMutex
cache map[string]versionInfo
updater func() (kubecontainer.Version, kubecontainer.Version, error)
}
// versionInfo caches api version and daemon version.
type versionInfo struct {
apiVersion kubecontainer.Version
version kubecontainer.Version
}
const maxVersionCacheEntries = 1000
func NewVersionCache(f func() (kubecontainer.Version, kubecontainer.Version, error)) *VersionCache {
return &VersionCache{
cache: map[string]versionInfo{},
updater: f,
}
}
// Update updates cached versionInfo by using a unique string (e.g. machineInfo) as the key.
func (c *VersionCache) Update(key string) {
apiVersion, daemonVersion, err := c.updater()
if err != nil {
glog.Errorf("Fail to get version info from container runtime: %v", err)
} else {
c.lock.Lock()
defer c.lock.Unlock()
c.cache[key] = versionInfo{apiVersion, daemonVersion}
}
}
// Get gets cached versionInfo by using a unique string (e.g. machineInfo) as the key.
// It returns apiVersion first and followed by daemon version.
func (c *VersionCache) Get(key string) (kubecontainer.Version, kubecontainer.Version, error) {
c.lock.RLock()
defer c.lock.RUnlock()
value, ok := c.cache[key]
if !ok {
return nil, nil, fmt.Errorf("Failed to get version info from cache by key: %v", key)
}
return value.apiVersion, value.version, nil
}
func (c *VersionCache) UpdateCachePeriodly(key string) {
go wait.Until(func() {
c.Update(key)
}, 1*time.Minute, wait.NeverStop)
}