Refactor version cache into kubelet util

pull/6/head
Harry Zhang 2016-03-14 16:35:49 +08:00
parent f9e2f522b4
commit c31ec5607a
5 changed files with 83 additions and 50 deletions

View File

@ -402,6 +402,8 @@ func (f *FakeDockerClient) PullImage(opts docker.PullImageOptions, auth docker.A
} }
func (f *FakeDockerClient) Version() (*docker.Env, error) { func (f *FakeDockerClient) Version() (*docker.Env, error) {
f.Lock()
defer f.Unlock()
return &f.VersionInfo, f.popError("version") return &f.VersionInfo, f.popError("version")
} }

View File

@ -24,6 +24,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/network" "k8s.io/kubernetes/pkg/kubelet/network"
proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results" proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types" kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/kubelet/util/cache"
"k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util/flowcontrol" "k8s.io/kubernetes/pkg/util/flowcontrol"
"k8s.io/kubernetes/pkg/util/oom" "k8s.io/kubernetes/pkg/util/oom"
@ -52,6 +53,9 @@ func NewFakeDockerManager(
burst, containerLogsDir, osInterface, networkPlugin, runtimeHelper, httpClient, &NativeExecHandler{}, burst, containerLogsDir, osInterface, networkPlugin, runtimeHelper, httpClient, &NativeExecHandler{},
fakeOOMAdjuster, fakeProcFs, false, imageBackOff, false, false, true) fakeOOMAdjuster, fakeProcFs, false, imageBackOff, false, false, true)
dm.dockerPuller = &FakeDockerPuller{} dm.dockerPuller = &FakeDockerPuller{}
dm.versionCache = cache.NewVersionCache(func() (kubecontainer.Version, kubecontainer.Version, error) {
return dm.getVersionInfo()
})
return dm return dm
} }

View File

@ -46,6 +46,7 @@ import (
proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results" proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results"
"k8s.io/kubernetes/pkg/kubelet/qos" "k8s.io/kubernetes/pkg/kubelet/qos"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types" kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/kubelet/util/cache"
"k8s.io/kubernetes/pkg/kubelet/util/format" "k8s.io/kubernetes/pkg/kubelet/util/format"
"k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/securitycontext" "k8s.io/kubernetes/pkg/securitycontext"
@ -55,7 +56,6 @@ import (
"k8s.io/kubernetes/pkg/util/procfs" "k8s.io/kubernetes/pkg/util/procfs"
utilruntime "k8s.io/kubernetes/pkg/util/runtime" utilruntime "k8s.io/kubernetes/pkg/util/runtime"
utilstrings "k8s.io/kubernetes/pkg/util/strings" utilstrings "k8s.io/kubernetes/pkg/util/strings"
"k8s.io/kubernetes/pkg/util/wait"
) )
const ( const (
@ -161,7 +161,7 @@ type DockerManager struct {
configureHairpinMode bool configureHairpinMode bool
// The api version cache of docker daemon. // The api version cache of docker daemon.
versionCache *VersionCache versionCache *cache.VersionCache
} }
// A subset of the pod.Manager interface extracted for testing purposes. // A subset of the pod.Manager interface extracted for testing purposes.
@ -239,7 +239,6 @@ func NewDockerManager(
cpuCFSQuota: cpuCFSQuota, cpuCFSQuota: cpuCFSQuota,
enableCustomMetrics: enableCustomMetrics, enableCustomMetrics: enableCustomMetrics,
configureHairpinMode: hairpinMode, configureHairpinMode: hairpinMode,
versionCache: NewVersionCache(),
} }
dm.runner = lifecycle.NewHandlerRunner(httpClient, dm, dm) dm.runner = lifecycle.NewHandlerRunner(httpClient, dm, dm)
if serializeImagePulls { if serializeImagePulls {
@ -254,21 +253,15 @@ func NewDockerManager(
optf(dm) optf(dm)
} }
apiVersion, err := dm.APIVersion() // initialize versionCache with a updater
if err != nil { dm.versionCache = cache.NewVersionCache(func() (kubecontainer.Version, kubecontainer.Version, error) {
glog.Errorf("Failed to get api version from docker %v", err) return dm.getVersionInfo()
})
// update version cache periodically.
if dm.machineInfo != nil {
dm.versionCache.UpdateCachePeriodly(dm.machineInfo.MachineID)
} }
daemonVersion, err := dm.Version()
if err != nil {
glog.Errorf("Failed to get daemon version from docker %v", err)
}
// Update version cache periodically
go wait.Until(func() {
dm.versionCache.Update(dm.machineInfo.MachineID, apiVersion, daemonVersion)
}, 5*time.Second, wait.NeverStop)
return dm return dm
} }
@ -608,7 +601,10 @@ func (dm *DockerManager) runContainer(
} }
// If current api version is newer than docker 1.10 requested, set OomScoreAdj to HostConfig // If current api version is newer than docker 1.10 requested, set OomScoreAdj to HostConfig
if dm.checkDockerAPIVersion(dockerv110APIVersion) >= 0 { result, err := dm.checkDockerAPIVersion(dockerv110APIVersion)
if err != nil {
glog.Errorf("Failed to check docker api version: %v", err)
} else if result >= 0 {
hc.OomScoreAdj = oomScoreAdj hc.OomScoreAdj = oomScoreAdj
} }
@ -1554,12 +1550,15 @@ func (dm *DockerManager) runContainerInPod(pod *api.Pod, container *api.Containe
} }
func (dm *DockerManager) applyOOMScoreAdjIfNeeded(container *api.Container, containerInfo *docker.Container) error { func (dm *DockerManager) applyOOMScoreAdjIfNeeded(container *api.Container, containerInfo *docker.Container) error {
// Compare current API version with expected api version // Compare current API version with expected api version.
result := dm.checkDockerAPIVersion(dockerv110APIVersion) result, err := dm.checkDockerAPIVersion(dockerv110APIVersion)
if err != nil {
return fmt.Errorf("Failed to check docker api version: %v", err)
}
// If current api version is older than OOMScoreAdj requested, use the old way. // If current api version is older than OOMScoreAdj requested, use the old way.
if result < 0 { if result < 0 {
if err := dm.applyOOMScoreAdj(container, containerInfo); err != nil { if err := dm.applyOOMScoreAdj(container, containerInfo); err != nil {
return fmt.Errorf("failed to apply oom-score-adj to container %q- %v", err, containerInfo.Name) return fmt.Errorf("Failed to apply oom-score-adj to container %q- %v", err, containerInfo.Name)
} }
} }
@ -1582,21 +1581,17 @@ func (dm *DockerManager) calculateOomScoreAdj(container *api.Container) int {
return oomScoreAdj return oomScoreAdj
} }
// getCachedApiVersion gets cached api version of docker runtime. // getCachedVersionInfo gets cached version info of docker runtime.
func (dm *DockerManager) getCachedApiVersion() (kubecontainer.Version, error) { func (dm *DockerManager) getCachedVersionInfo() (kubecontainer.Version, kubecontainer.Version, error) {
apiVersion, _, err := dm.versionCache.Get(dm.machineInfo.MachineID) apiVersion, daemonVersion, err := dm.versionCache.Get(dm.machineInfo.MachineID)
if err != nil { if err != nil {
glog.Errorf("Failed to get cached docker api version %v ", err) glog.Errorf("Failed to get cached docker api version %v ", err)
} }
// If we got nil apiVersion, try to get api version directly. // If we got nil versions, try to update version info.
if apiVersion == nil { if apiVersion == nil || daemonVersion == nil {
apiVersion, err = dm.APIVersion() dm.versionCache.Update(dm.machineInfo.MachineID)
if err != nil {
glog.Errorf("Failed to get docker api version directly %v ", err)
}
dm.versionCache.Update(dm.machineInfo.MachineID, apiVersion, nil)
} }
return apiVersion, err return apiVersion, daemonVersion, err
} }
// checkDockerAPIVersion checks current docker API version against expected version. // checkDockerAPIVersion checks current docker API version against expected version.
@ -1604,17 +1599,17 @@ func (dm *DockerManager) getCachedApiVersion() (kubecontainer.Version, error) {
// 1 : newer than expected version // 1 : newer than expected version
// -1: older than expected version // -1: older than expected version
// 0 : same version // 0 : same version
func (dm *DockerManager) checkDockerAPIVersion(expectedVersion string) int { func (dm *DockerManager) checkDockerAPIVersion(expectedVersion string) (int, error) {
apiVersion, err := dm.getCachedApiVersion() apiVersion, _, err := dm.getCachedVersionInfo()
if err != nil { if err != nil {
glog.Errorf("Failed to get cached docker api version %v ", err) return 0, err
} }
result, err := apiVersion.Compare(expectedVersion) result, err := apiVersion.Compare(expectedVersion)
if err != nil { if err != nil {
glog.Errorf("Failed to compare current docker api version %v with OOMScoreAdj supported Docker version %q - %v", return 0, fmt.Errorf("Failed to compare current docker api version %v with OOMScoreAdj supported Docker version %q - %v",
apiVersion, expectedVersion, err) apiVersion, expectedVersion, err)
} }
return result return result, nil
} }
func addNDotsOption(resolvFilePath string) error { func addNDotsOption(resolvFilePath string) error {
@ -2200,3 +2195,17 @@ func (dm *DockerManager) GetPodStatus(uid types.UID, name, namespace string) (*k
podStatus.ContainerStatuses = containerStatuses podStatus.ContainerStatuses = containerStatuses
return podStatus, nil return podStatus, nil
} }
// getVersionInfo returns apiVersion & daemonVersion of docker runtime
func (dm *DockerManager) getVersionInfo() (kubecontainer.Version, kubecontainer.Version, error) {
apiVersion, err := dm.APIVersion()
if err != nil {
return nil, nil, err
}
daemonVersion, err := dm.Version()
if err != nil {
return nil, nil, err
}
return apiVersion, daemonVersion, nil
}

View File

@ -1933,7 +1933,7 @@ func TestGetPodStatusNoSuchContainer(t *testing.T) {
}, },
}}) }})
fakeDocker.Errors = map[string]error{"inspect": &docker.NoSuchContainer{}} fakeDocker.InjectErrors(map[string]error{"inspect": &docker.NoSuchContainer{}})
runSyncPod(t, dm, fakeDocker, pod, nil, false) runSyncPod(t, dm, fakeDocker, pod, nil, false)
// Verify that we will try to start new contrainers even if the inspections // Verify that we will try to start new contrainers even if the inspections

View File

@ -14,19 +14,23 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
*/ */
package dockertools package cache
import ( import (
"fmt" "fmt"
"sync" "sync"
"time"
"github.com/golang/glog"
"github.com/golang/groupcache/lru"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/util/wait"
) )
type VersionCache struct { type VersionCache struct {
lock sync.RWMutex lock sync.RWMutex
cache *lru.Cache cache map[string]versionInfo
updater func() (kubecontainer.Version, kubecontainer.Version, error)
} }
// versionInfo caches api version and daemon version. // versionInfo caches api version and daemon version.
@ -37,15 +41,24 @@ type versionInfo struct {
const maxVersionCacheEntries = 1000 const maxVersionCacheEntries = 1000
func NewVersionCache() *VersionCache { func NewVersionCache(f func() (kubecontainer.Version, kubecontainer.Version, error)) *VersionCache {
return &VersionCache{cache: lru.New(maxVersionCacheEntries)} return &VersionCache{
cache: map[string]versionInfo{},
updater: f,
}
} }
// Update updates cached versionInfo by using a unique string (e.g. machineInfo) as the key. // Update updates cached versionInfo by using a unique string (e.g. machineInfo) as the key.
func (c *VersionCache) Update(key string, apiVersion kubecontainer.Version, version kubecontainer.Version) { func (c *VersionCache) Update(key string) {
c.lock.Lock() apiVersion, daemonVersion, err := c.updater()
defer c.lock.Unlock()
c.cache.Add(key, versionInfo{apiVersion, version}) if err != nil {
glog.Errorf("Fail to get version info from container runtime: %v", err)
} else {
c.lock.Lock()
defer c.lock.Unlock()
c.cache[key] = versionInfo{apiVersion, daemonVersion}
}
} }
// Get gets cached versionInfo by using a unique string (e.g. machineInfo) as the key. // Get gets cached versionInfo by using a unique string (e.g. machineInfo) as the key.
@ -53,10 +66,15 @@ func (c *VersionCache) Update(key string, apiVersion kubecontainer.Version, vers
func (c *VersionCache) Get(key string) (kubecontainer.Version, kubecontainer.Version, error) { func (c *VersionCache) Get(key string) (kubecontainer.Version, kubecontainer.Version, error) {
c.lock.RLock() c.lock.RLock()
defer c.lock.RUnlock() defer c.lock.RUnlock()
value, ok := c.cache.Get(key) value, ok := c.cache[key]
if !ok { if !ok {
return nil, nil, fmt.Errorf("Failed to get version info from cache by key: ", key) return nil, nil, fmt.Errorf("Failed to get version info from cache by key: ", key)
} }
versions := value.(versionInfo) return value.apiVersion, value.version, nil
return versions.apiVersion, versions.version, nil }
func (c *VersionCache) UpdateCachePeriodly(key string) {
go wait.Until(func() {
c.Update(key)
}, 1*time.Minute, wait.NeverStop)
} }